import logging
from typing import TYPE_CHECKING
import colander
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPException,
HTTPForbidden,
HTTPNotFound,
HTTPOk,
HTTPServiceUnavailable,
HTTPUnprocessableEntity
)
from pyramid.settings import asbool
from weaver.database import get_db
from weaver.exceptions import ProcessNotFound, ServiceException, log_unhandled_exceptions
from weaver.formats import OutputFormat
from weaver.processes import opensearch
from weaver.processes.execution import submit_job
from weaver.processes.types import ProcessType
from weaver.processes.utils import deploy_process_from_payload, get_job_submission_response, get_process
from weaver.store.base import StoreProcesses
from weaver.utils import fully_qualified_name, get_any_id, repr_json
from weaver.visibility import Visibility
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.processes.utils import get_process_list_links, get_processes_filtered_by_valid_schemas
from weaver.wps_restapi.providers.utils import get_provider_services
if TYPE_CHECKING:
from weaver.typedefs import JSON
[docs]LOGGER = logging.getLogger(__name__)
@sd.processes_service.get(schema=sd.GetProcessesEndpoint(), tags=[sd.TAG_PROCESSES, sd.TAG_GETCAPABILITIES],
response_schemas=sd.get_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_processes(request):
"""
List registered processes (GetCapabilities).
Optionally list both local and provider processes.
"""
try:
params = sd.GetProcessesQuery().deserialize(request.params)
except colander.Invalid as ex:
raise HTTPBadRequest(json={
"code": "ProcessInvalidParameter",
"description": "Process query parameters failed validation.",
"error": colander.Invalid.__name__,
"cause": str(ex),
"value": repr_json(ex.value or dict(request.params), force_string=False),
})
detail = asbool(params.get("detail", True))
ignore = asbool(params.get("ignore", True))
try:
# get local processes and filter according to schema validity
# (previously deployed process schemas can become invalid because of modified schema definitions
results = get_processes_filtered_by_valid_schemas(request)
processes, invalid_processes, paging, with_providers, total_processes = results
if invalid_processes:
raise HTTPServiceUnavailable(
"Previously deployed processes are causing invalid schema integrity errors. "
"Manual cleanup of following processes is required: {}".format(invalid_processes))
body = {"processes": processes if detail else [get_any_id(p) for p in processes]} # type: JSON
if not with_providers:
paging = {"page": paging.get("page"), "limit": paging.get("limit")} # remove other params
body.update(paging)
else:
paging = {} # disable to remove paging-related links
try:
body["links"] = get_process_list_links(request, paging, total_processes)
except IndexError as exc:
raise HTTPBadRequest(json={
"description": str(exc),
"cause": "Invalid paging parameters.",
"error": type(exc).__name__,
"value": repr_json(paging, force_string=False)
})
# if 'EMS/HYBRID' and '?providers=True', also fetch each provider's processes
if with_providers:
# param 'check' enforced because must fetch for listing of available processes (GetCapabilities)
# when 'ignore' is not enabled, any failing definition should raise any derived 'ServiceException'
services = get_provider_services(request, ignore=ignore, check=True)
body.update({
"providers": [svc.summary(request, ignore=ignore) if detail else {"id": svc.name} for svc in services]
})
invalid_services = [False] * len(services)
for i, provider in enumerate(services):
# ignore failing parsing of the service description
if body["providers"][i] is None:
invalid_services[i] = True
continue
# attempt parsing available processes and ignore again failing items
processes = provider.processes(request, ignore=ignore)
if processes is None:
invalid_services[i] = True
continue
total_processes += len(processes)
body["providers"][i].update({
"processes": processes if detail else [get_any_id(proc) for proc in processes]
})
if any(invalid_services):
LOGGER.debug("Invalid providers dropped due to failing parsing and ignore query: %s",
[svc.name for svc, status in zip(services, invalid_services) if status])
body["providers"] = [svc for svc, ignore in zip(body["providers"], invalid_services) if not ignore]
body["total"] = total_processes
body["description"] = sd.OkGetProcessesListResponse.description
LOGGER.debug("Process listing generated, validating schema...")
body = sd.MultiProcessesListing().deserialize(body)
return HTTPOk(json=body)
except ServiceException as exc:
LOGGER.debug("Error when listing provider processes using query parameter raised: [%s]", exc, exc_info=exc)
raise HTTPServiceUnavailable(json={
"description": "At least one provider could not list its processes. "
"Failing provider errors were requested to not be ignored.",
"exception": fully_qualified_name(exc),
"error": str(exc)
})
except HTTPException:
raise
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))
@sd.processes_service.post(tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY], renderer=OutputFormat.JSON,
schema=sd.PostProcessesEndpoint(), response_schemas=sd.post_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def add_local_process(request):
"""
Register a local process.
"""
return deploy_process_from_payload(request.json, request)
@sd.process_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OutputFormat.JSON,
schema=sd.ProcessEndpoint(), response_schemas=sd.get_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_local_process(request):
"""
Get a registered local process information (DescribeProcess).
"""
try:
process = get_process(request=request)
process["inputs"] = opensearch.replace_inputs_describe_process(process.inputs, process.payload)
schema = request.params.get("schema")
offering = process.offering(schema)
return HTTPOk(json=offering)
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{!s}]\nValue: [{!s}]".format(ex, ex.value))
@sd.process_package_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OutputFormat.JSON,
schema=sd.ProcessPackageEndpoint(), response_schemas=sd.get_process_package_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_local_process_package(request):
"""
Get a registered local process package definition.
"""
process = get_process(request=request)
return HTTPOk(json=process.package or {})
@sd.process_payload_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OutputFormat.JSON,
schema=sd.ProcessPayloadEndpoint(), response_schemas=sd.get_process_payload_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_local_process_payload(request):
"""
Get a registered local process payload definition.
"""
process = get_process(request=request)
return HTTPOk(json=process.payload or {})
@sd.process_visibility_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY], renderer=OutputFormat.JSON,
schema=sd.ProcessVisibilityGetEndpoint(),
response_schemas=sd.get_process_visibility_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_process_visibility(request):
"""
Get the visibility of a registered local process.
"""
process = get_process(request=request)
return HTTPOk(json={u"value": process.visibility})
@sd.process_visibility_service.put(tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY], renderer=OutputFormat.JSON,
schema=sd.ProcessVisibilityPutEndpoint(),
response_schemas=sd.put_process_visibility_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def set_process_visibility(request):
"""
Set the visibility of a registered local process.
"""
visibility = Visibility.get(request.json.get("value"))
process_id = request.matchdict.get("process_id")
if not isinstance(process_id, str):
raise HTTPUnprocessableEntity("Invalid process identifier.")
if visibility not in Visibility:
raise HTTPBadRequest("Invalid visibility value specified: {!s}".format(visibility))
try:
store = get_db(request).get_store(StoreProcesses)
process = store.fetch_by_id(process_id)
if process.type == ProcessType.BUILTIN:
raise HTTPForbidden("Cannot change the visibility of builtin process.")
store.set_visibility(process_id, visibility)
return HTTPOk(json={u"value": visibility})
except TypeError:
raise HTTPBadRequest("Value of visibility must be a string.")
except ValueError:
raise HTTPUnprocessableEntity("Value of visibility must be one of : {!s}".format(Visibility.values()))
except ProcessNotFound as ex:
raise HTTPNotFound(str(ex))
@sd.process_service.delete(tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY], renderer=OutputFormat.JSON,
schema=sd.ProcessEndpoint(), response_schemas=sd.delete_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def delete_local_process(request):
"""
Unregister a local process.
"""
store = get_db(request).get_store(StoreProcesses)
process = get_process(request=request, store=store)
process_id = process.id
if process.type == ProcessType.BUILTIN:
raise HTTPForbidden("Cannot delete a builtin process.")
if store.delete_process(process_id, visibility=Visibility.PUBLIC):
return HTTPOk(json={
"description": sd.OkDeleteProcessResponse.description,
"identifier": process_id,
"undeploymentDone": True,
})
LOGGER.error("Existing process [%s] should have been deleted with success status.", process_id)
raise HTTPForbidden("Deletion of process has been refused by the database or could not have been validated.")
@sd.process_execution_service.post(tags=[sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS], renderer=OutputFormat.JSON,
schema=sd.PostProcessJobsEndpoint(), response_schemas=sd.post_process_jobs_responses)
@sd.process_jobs_service.post(tags=[sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS], renderer=OutputFormat.JSON,
schema=sd.PostProcessJobsEndpoint(), response_schemas=sd.post_process_jobs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def submit_local_job(request):
"""
Execute a process registered locally.
Execution location and method is according to deployed Application Package.
"""
process = get_process(request=request)
body = submit_job(request, process, tags=["wps-rest"])
return get_job_submission_response(body)