Source code for weaver.wps_restapi.processes.processes

import logging
from typing import TYPE_CHECKING

import colander
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPException,
    HTTPForbidden,
    HTTPNotFound,
    HTTPOk,
    HTTPServiceUnavailable,
    HTTPUnprocessableEntity
)
from pyramid.settings import asbool

from weaver.database import get_db
from weaver.exceptions import ProcessNotFound, ServiceException, log_unhandled_exceptions
from weaver.formats import OutputFormat
from weaver.processes import opensearch
from weaver.processes.execution import submit_job
from weaver.processes.types import ProcessType
from weaver.processes.utils import deploy_process_from_payload, get_job_submission_response, get_process
from weaver.store.base import StoreProcesses
from weaver.utils import fully_qualified_name, get_any_id, repr_json
from weaver.visibility import Visibility
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.processes.utils import get_process_list_links, get_processes_filtered_by_valid_schemas
from weaver.wps_restapi.providers.utils import get_provider_services

if TYPE_CHECKING:
    from weaver.typedefs import JSON

# Module-level logger shared by every view in this file (named after the module).
LOGGER = logging.getLogger(__name__)
@sd.processes_service.get(
    schema=sd.GetProcessesEndpoint(),
    tags=[sd.TAG_PROCESSES, sd.TAG_GETCAPABILITIES],
    response_schemas=sd.get_processes_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_processes(request):
    """
    List registered processes (GetCapabilities).

    Optionally list both local and provider processes.
    """
    # Outcomes of this view:
    #   - HTTPOk with the listing validated by 'sd.MultiProcessesListing'
    #   - HTTPBadRequest when query parameters, paging values or the generated body fail validation
    #   - HTTPServiceUnavailable when stored processes are invalid, or when a provider raises a
    #     'ServiceException' while listing its processes
    try:
        params = sd.GetProcessesQuery().deserialize(request.params)
    except colander.Invalid as ex:
        raise HTTPBadRequest(json={
            "code": "ProcessInvalidParameter",
            "description": "Process query parameters failed validation.",
            "error": colander.Invalid.__name__,
            "cause": str(ex),
            "value": repr_json(ex.value or dict(request.params), force_string=False),
        }) from ex

    # 'detail' selects full descriptions over bare identifiers
    # 'ignore' is forwarded to provider lookups; providers answering 'None' are dropped from the listing
    detail = asbool(params.get("detail", True))
    ignore = asbool(params.get("ignore", True))
    try:
        # get local processes and filter according to schema validity
        # (previously deployed process schemas can become invalid because of modified schema definitions)
        results = get_processes_filtered_by_valid_schemas(request)
        processes, invalid_processes, paging, with_providers, total_processes = results
        if invalid_processes:
            raise HTTPServiceUnavailable(
                "Previously deployed processes are causing invalid schema integrity errors. "
                "Manual cleanup of following processes is required: {}".format(invalid_processes))

        body = {"processes": processes if detail else [get_any_id(p) for p in processes]}  # type: JSON
        if not with_providers:
            paging = {"page": paging.get("page"), "limit": paging.get("limit")}  # remove other params
            body.update(paging)
        else:
            paging = {}  # disable to remove paging-related links

        try:
            body["links"] = get_process_list_links(request, paging, total_processes)
        except IndexError as exc:
            raise HTTPBadRequest(json={
                "description": str(exc),
                "cause": "Invalid paging parameters.",
                "error": type(exc).__name__,
                "value": repr_json(paging, force_string=False)
            }) from exc

        # if 'EMS/HYBRID' and '?providers=True', also fetch each provider's processes
        if with_providers:
            # param 'check' enforced because must fetch for listing of available processes (GetCapabilities)
            # when 'ignore' is not enabled, any failing definition should raise any derived 'ServiceException'
            services = get_provider_services(request, ignore=ignore, check=True)
            # every summary is resolved first, before any provider is asked for its processes
            summaries = [svc.summary(request, ignore=ignore) if detail else {"id": svc.name} for svc in services]

            valid_providers = []
            dropped_providers = []
            for svc, summary in zip(services, summaries):
                # a missing summary means the service description failed parsing and was ignored,
                # in which case its processes are not requested at all
                provider_processes = None
                if summary is not None:
                    # attempt parsing available processes and ignore again failing items
                    provider_processes = svc.processes(request, ignore=ignore)
                if provider_processes is None:
                    dropped_providers.append(svc.name)
                    continue
                total_processes += len(provider_processes)
                summary.update({
                    "processes": provider_processes if detail else [get_any_id(proc) for proc in provider_processes]
                })
                valid_providers.append(summary)

            if dropped_providers:
                LOGGER.debug("Invalid providers dropped due to failing parsing and ignore query: %s",
                             dropped_providers)
            body["providers"] = valid_providers

        body["total"] = total_processes
        body["description"] = sd.OkGetProcessesListResponse.description
        LOGGER.debug("Process listing generated, validating schema...")
        body = sd.MultiProcessesListing().deserialize(body)
        return HTTPOk(json=body)

    except ServiceException as exc:
        LOGGER.debug("Error when listing provider processes using query parameter raised: [%s]", exc, exc_info=exc)
        raise HTTPServiceUnavailable(json={
            "description": "At least one provider could not list its processes. "
                           "Failing provider errors were requested to not be ignored.",
            "exception": fully_qualified_name(exc),
            "error": str(exc)
        }) from exc
    except HTTPException:
        # HTTP errors raised above are already final responses, let them through unchanged
        raise
    # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex)) from ex
@sd.processes_service.post(
    tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY],
    renderer=OutputFormat.JSON,
    schema=sd.PostProcessesEndpoint(),
    response_schemas=sd.post_processes_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def add_local_process(request):
    """
    Register a local process.
    """
    # The JSON body of the request is the deployment payload; the deployment helper builds the response.
    payload = request.json
    return deploy_process_from_payload(payload, request)
@sd.process_service.get(
    tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessEndpoint(),
    response_schemas=sd.get_process_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_local_process(request):
    """
    Get a registered local process information (DescribeProcess).
    """
    # Responds with HTTPOk carrying the process offering, or HTTPBadRequest when
    # a 'colander.Invalid' error is raised while resolving or rendering it.
    try:
        process = get_process(request=request)
        # inputs are rewritten by the opensearch helper (using the stored payload) before rendering
        process["inputs"] = opensearch.replace_inputs_describe_process(process.inputs, process.payload)
        # optional 'schema' query parameter is handed as-is to the offering renderer
        schema = request.params.get("schema")
        offering = process.offering(schema)
        return HTTPOk(json=offering)
    # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{!s}]\nValue: [{!s}]".format(ex, ex.value)) from ex
@sd.process_package_service.get(
    tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessPackageEndpoint(),
    response_schemas=sd.get_process_package_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_local_process_package(request):
    """
    Get a registered local process package definition.
    """
    package = get_process(request=request).package
    if not package:
        # a missing or empty package is reported as an empty JSON object
        package = {}
    return HTTPOk(json=package)
@sd.process_payload_service.get(
    tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessPayloadEndpoint(),
    response_schemas=sd.get_process_payload_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_local_process_payload(request):
    """
    Get a registered local process payload definition.
    """
    payload = get_process(request=request).payload
    if not payload:
        # a missing or empty payload is reported as an empty JSON object
        payload = {}
    return HTTPOk(json=payload)
@sd.process_visibility_service.get(
    tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessVisibilityGetEndpoint(),
    response_schemas=sd.get_process_visibility_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_process_visibility(request):
    """
    Get the visibility of a registered local process.
    """
    # The response body is a single-key JSON object holding the stored visibility.
    current = get_process(request=request).visibility
    body = {u"value": current}
    return HTTPOk(json=body)
@sd.process_visibility_service.put(
    tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessVisibilityPutEndpoint(),
    response_schemas=sd.put_process_visibility_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def set_process_visibility(request):
    """
    Set the visibility of a registered local process.
    """
    # Outcomes of this view:
    #   - HTTPOk echoing the applied visibility
    #   - HTTPUnprocessableEntity for a non-string process identifier, or a 'ValueError' from the store
    #   - HTTPBadRequest for a visibility not in 'Visibility', or a 'TypeError' from the store
    #   - HTTPForbidden when the target is a builtin process
    #   - HTTPNotFound when the store reports 'ProcessNotFound'
    visibility = Visibility.get(request.json.get("value"))
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, str):
        raise HTTPUnprocessableEntity("Invalid process identifier.")
    if visibility not in Visibility:
        raise HTTPBadRequest("Invalid visibility value specified: {!s}".format(visibility))

    try:
        store = get_db(request).get_store(StoreProcesses)
        process = store.fetch_by_id(process_id)
        if process.type == ProcessType.BUILTIN:
            # not caught by the handlers below, it propagates as the final response
            raise HTTPForbidden("Cannot change the visibility of builtin process.")
        store.set_visibility(process_id, visibility)
        return HTTPOk(json={u"value": visibility})
    except TypeError as exc:
        raise HTTPBadRequest("Value of visibility must be a string.") from exc
    except ValueError as exc:
        raise HTTPUnprocessableEntity(
            "Value of visibility must be one of : {!s}".format(Visibility.values())
        ) from exc
    except ProcessNotFound as ex:
        raise HTTPNotFound(str(ex)) from ex
@sd.process_service.delete(
    tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY],
    renderer=OutputFormat.JSON,
    schema=sd.ProcessEndpoint(),
    response_schemas=sd.delete_process_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def delete_local_process(request):
    """
    Unregister a local process.
    """
    # Responds with HTTPOk confirming the undeployment, or HTTPForbidden when the
    # process is builtin or when the store does not confirm the deletion.
    db = get_db(request)
    store = db.get_store(StoreProcesses)
    process = get_process(request=request, store=store)
    process_id = process.id
    if process.type == ProcessType.BUILTIN:
        raise HTTPForbidden("Cannot delete a builtin process.")

    deleted = store.delete_process(process_id, visibility=Visibility.PUBLIC)
    if not deleted:
        # the process was resolved just above, so a falsy result from the store is unexpected
        LOGGER.error("Existing process [%s] should have been deleted with success status.", process_id)
        raise HTTPForbidden("Deletion of process has been refused by the database or could not have been validated.")

    return HTTPOk(json={
        "description": sd.OkDeleteProcessResponse.description,
        "identifier": process_id,
        "undeploymentDone": True,
    })
@sd.process_execution_service.post(
    tags=[sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS],
    renderer=OutputFormat.JSON,
    schema=sd.PostProcessJobsEndpoint(),
    response_schemas=sd.post_process_jobs_responses,
)
@sd.process_jobs_service.post(
    tags=[sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS],
    renderer=OutputFormat.JSON,
    schema=sd.PostProcessJobsEndpoint(),
    response_schemas=sd.post_process_jobs_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def submit_local_job(request):
    """
    Execute a process registered locally.

    Execution location and method is according to deployed Application Package.
    """
    # The same view is registered on both the execution and the jobs services.
    target = get_process(request=request)
    # the job is submitted with the "wps-rest" tag, and its result body is turned into the HTTP response
    submission = submit_job(request, target, tags=["wps-rest"])
    return get_job_submission_response(submission)