Source code for weaver.wps_restapi.processes.processes

import logging
import os
from time import sleep
from typing import TYPE_CHECKING

import colander
import requests
import six
from celery.utils.log import get_task_logger
from lxml import etree
from owslib.util import clean_ows_url
from owslib.wps import ComplexDataInput, WebProcessingService
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPCreated,
    HTTPForbidden,
    HTTPNotFound,
    HTTPNotImplemented,
    HTTPOk,
    HTTPServiceUnavailable,
    HTTPSuccessful,
    HTTPUnauthorized,
    HTTPUnprocessableEntity
)
from pyramid.request import Request
from pyramid.settings import asbool
from pyramid_celery import celery_app as app

from weaver.config import WEAVER_CONFIGURATION_EMS, get_weaver_configuration
from weaver.database import get_db
from weaver.datatype import Service
from weaver.exceptions import InvalidIdentifierValue, ProcessNotAccessible, ProcessNotFound, log_unhandled_exceptions
from weaver.execute import (
    EXECUTE_MODE_ASYNC,
    EXECUTE_MODE_AUTO,
    EXECUTE_MODE_SYNC,
    EXECUTE_RESPONSE_DOCUMENT,
    EXECUTE_TRANSMISSION_MODE_REFERENCE
)
from weaver.formats import CONTENT_TYPE_APP_JSON
from weaver.owsexceptions import OWSNoApplicableCode
from weaver.processes import opensearch, wps_package
from weaver.processes.constants import WPS_COMPLEX_DATA
from weaver.processes.types import PROCESS_BUILTIN, PROCESS_WORKFLOW
from weaver.processes.utils import convert_process_wps_to_db, deploy_process_from_payload, jsonify_output
from weaver.status import STATUS_ACCEPTED, STATUS_FAILED, STATUS_STARTED, STATUS_SUCCEEDED, map_status
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
from weaver.utils import get_any_id, get_any_value, get_cookie_headers, get_settings, raise_on_xml_exception, wait_secs
from weaver.visibility import VISIBILITY_PUBLIC, VISIBILITY_VALUES
from weaver.wps import get_wps_output_dir, get_wps_output_path, get_wps_output_url, load_pywps_cfg
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.jobs.jobs import check_status
from weaver.wps_restapi.jobs.notify import encrypt_email, notify_job_complete
from weaver.wps_restapi.utils import OUTPUT_FORMAT_JSON, get_wps_restapi_base_url, parse_request_query

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from weaver.datatype import Process as ProcessDB    # noqa: F401
    from weaver.typedefs import JSON, SettingsType      # noqa: F401
    from typing import AnyStr, List, Tuple, Optional    # noqa: F401

LOGGER = logging.getLogger(__name__)
# job process execution progress
JOB_PROGRESS_SETUP = 1
JOB_PROGRESS_DESCRIBE = 2
JOB_PROGRESS_GET_INPUTS = 4
JOB_PROGRESS_GET_OUTPUTS = 6
JOB_PROGRESS_EXECUTE_REQUEST = 8
JOB_PROGRESS_EXECUTE_STATUS_LOCATION = 10
JOB_PROGRESS_EXECUTE_MONITOR_START = 20
JOB_PROGRESS_EXECUTE_MONITOR_LOOP = 30
JOB_PROGRESS_EXECUTE_MONITOR_ERROR = 80
JOB_PROGRESS_EXECUTE_MONITOR_END = 90
JOB_PROGRESS_NOTIFY = 95
JOB_PROGRESS_DONE = 100


@app.task(bind=True)
def execute_process(self, job_id, url, headers=None, notification_email=None):
    LOGGER.debug("Job execute process called.")
    settings = get_settings(app)
    task_logger = get_task_logger(__name__)
    load_pywps_cfg(settings)

    ssl_verify = asbool(settings.get("weaver.ssl_verify", True))
    wps_out_dir = get_wps_output_dir(settings)

    task_logger.debug("Job task setup.")
    store = get_db(app).get_store(StoreJobs)
    job = store.fetch_by_id(job_id)
    job.task_id = self.request.id
    job.progress = JOB_PROGRESS_SETUP
    job.save_log(logger=task_logger, message="Job task setup completed.")
    job = store.update_job(job)

    try:
        try:
            job.progress = JOB_PROGRESS_DESCRIBE
            job.save_log(logger=task_logger, message="Execute WPS request for process [{!s}]".format(job.process))
            wps = WebProcessingService(url=url, headers=get_cookie_headers(headers), verify=ssl_verify)
            set_wps_language(wps, accept_language=job.accept_language)
            raise_on_xml_exception(wps._capabilities)  # noqa
        except Exception as ex:
            raise OWSNoApplicableCode("Failed to retrieve WPS capabilities. Error: [{}].".format(str(ex)))
        try:
            process = wps.describeprocess(job.process)
        except Exception as ex:
            raise OWSNoApplicableCode("Failed to retrieve WPS process description. Error: [{}].".format(str(ex)))

        # prepare inputs
        job.progress = JOB_PROGRESS_GET_INPUTS
        job.save_log(logger=task_logger, message="Fetching job input definitions.")
        complex_inputs = []
        for process_input in process.dataInputs:
            if WPS_COMPLEX_DATA in process_input.dataType:
                complex_inputs.append(process_input.identifier)

        try:
            wps_inputs = list()
            for process_input in job.inputs:
                input_id = get_any_id(process_input)
                process_value = get_any_value(process_input)
                # in case of array inputs, must repeat (id, value)
                input_values = process_value if isinstance(process_value, list) else [process_value]

                # we need to support file:// scheme but PyWPS doesn't like them so remove the scheme file://
                input_values = [val[7:] if str(val).startswith("file://") else val for val in input_values]

                # need to use ComplexDataInput structure for complex input
                # need to use literal String for anything else than complex
                # TODO: BoundingBox not supported
                wps_inputs.extend([
                    (input_id, ComplexDataInput(input_value) if input_id in complex_inputs else str(input_value))
                    for input_value in input_values])
        except KeyError:
            wps_inputs = []

        # prepare outputs
        job.progress = JOB_PROGRESS_GET_OUTPUTS
        job.save_log(logger=task_logger, message="Fetching job output definitions.")
        outputs = [(o.identifier, o.dataType == WPS_COMPLEX_DATA) for o in process.processOutputs]

        mode = EXECUTE_MODE_ASYNC if job.execute_async else EXECUTE_MODE_SYNC
        job.progress = JOB_PROGRESS_EXECUTE_REQUEST
        job.save_log(logger=task_logger, message="Starting job process execution")
        execution = wps.execute(job.process, inputs=wps_inputs, output=outputs, mode=mode, lineage=True)
        if not execution.process and execution.errors:
            raise execution.errors[0]

        # adjust status location
        wps_status_path = execution.statusLocation
        job.progress = JOB_PROGRESS_EXECUTE_STATUS_LOCATION
        job.save_log(logger=task_logger, message="Verifying job status location.")
        if not execution.statusLocation.startswith("http") and not os.path.isfile(execution.statusLocation):
            wps_status_path = "file://{}".format(os.path.join(wps_out_dir, execution.statusLocation))
            if os.path.isfile(wps_status_path):
                execution.statusLocation = wps_status_path
                job.save_log(logger=task_logger, level=logging.INFO,
                             message="WPS status location has been corrected using internal server location.")
            else:
                job.save_log(logger=task_logger, level=logging.WARNING,
                             message="WPS status location could not be found")
        LOGGER.debug("WPS status location that will be queried: [%s]", wps_status_path)

        job.status = map_status(STATUS_STARTED)
        job.status_message = execution.statusMessage or "{} initiation done.".format(str(job))
        job.status_location = wps_status_path
        job.request = execution.request
        job.response = etree.tostring(execution.response)
        job.progress = JOB_PROGRESS_EXECUTE_MONITOR_START
        job.save_log(logger=task_logger, message="Starting monitoring of job execution.")
        job = store.update_job(job)

        max_retries = 5
        num_retries = 0
        run_step = 0
        while execution.isNotComplete() or run_step == 0:
            if num_retries >= max_retries:
                raise Exception("Could not read status document after {} retries. Giving up.".format(max_retries))
            try:
                # NOTE:
                #   Don't actually log anything here until process is completed (success or fail) so that underlying
                #   WPS execution logs can be inserted within the current job log and appear continuously.
                #   Only update internal job fields in case they get referenced elsewhere.
                job.progress = JOB_PROGRESS_EXECUTE_MONITOR_LOOP
                execution = check_status(url=wps_status_path, verify=ssl_verify, sleep_secs=wait_secs(run_step))
                job_msg = (execution.statusMessage or "").strip()
                job.response = etree.tostring(execution.response)
                job.status = map_status(execution.getStatus())
                job.status_message = "Job execution monitoring (progress: {}%, status: {})." \
                    .format(execution.percentCompleted, job_msg or "n/a")
                # job.save_log(logger=task_logger)
                # job = store.update_job(job)

                if execution.isComplete():
                    job.mark_finished()
                    job.progress = JOB_PROGRESS_EXECUTE_MONITOR_END
                    msg_progress = " (status: {})".format(job_msg) if job_msg else ""
                    if execution.isSucceded():
                        job.status = map_status(STATUS_SUCCEEDED)
                        job.status_message = "Job succeeded{}.".format(msg_progress)
                        wps_package.retrieve_package_job_log(execution, job)
                        job.save_log(logger=task_logger)
                        job_results = [jsonify_output(output, process) for output in execution.processOutputs]
                        job.results = make_results_relative(job_results, settings)
                    else:
                        task_logger.debug("Job failed.")
                        job.status_message = "Job failed{}.".format(msg_progress)
                        wps_package.retrieve_package_job_log(execution, job)
                        job.save_log(errors=execution.errors, logger=task_logger)

            except Exception as exc:
                num_retries += 1
                task_logger.debug("Exception raised: %s", repr(exc))
                job.status_message = "Could not read status xml document for {!s}. Trying again...".format(job)
                job.save_log(errors=execution.errors, logger=task_logger)
                sleep(1)
            else:
                # job.status_message = "Update {}...".format(str(job))
                # job.save_log(logger=task_logger)
                num_retries = 0
                run_step += 1
            finally:
                job = store.update_job(job)

    except Exception as exc:
        LOGGER.exception("Failed running [%s]", job)
        job.status = map_status(STATUS_FAILED)
        job.status_message = "Failed to run {!s}.".format(job)
        job.progress = JOB_PROGRESS_EXECUTE_MONITOR_ERROR
        exception_class = "{}.{}".format(type(exc).__module__, type(exc).__name__)
        errors = "{0}: {1!s}".format(exception_class, exc)
        job.save_log(errors=errors, logger=task_logger)
    finally:
        job.progress = JOB_PROGRESS_EXECUTE_MONITOR_END
        job.status_message = "Job {}.".format(job.status)
        job.save_log(logger=task_logger)

        # Send email if requested
        if notification_email is not None:
            job.progress = JOB_PROGRESS_NOTIFY
            try:
                notify_job_complete(job, notification_email, settings)
                message = "Notification email sent successfully."
                job.save_log(logger=task_logger, message=message)
            except Exception as exc:
                exception_class = "{}.{}".format(type(exc).__module__, type(exc).__name__)
                exception = "{0}: {1!s}".format(exception_class, exc)
                message = "Couldn't send notification email ({})".format(exception)
                job.save_log(errors=message, logger=task_logger, message=message)

        job.progress = JOB_PROGRESS_DONE
        job.save_log(logger=task_logger, message="Job task complete.")
        job = store.update_job(job)

    return job.status


def make_results_relative(results, settings):
    # type: (List[JSON], SettingsType) -> List[JSON]
    """
    Redefines job results to be saved in the database as paths relative to the output directory configured in PyWPS
    (i.e.: relative to ``weaver.wps_output_dir``).

    This allows us to easily adjust the exposed result HTTP path according to the server configuration
    (i.e.: relative to ``weaver.wps_output_path`` and/or ``weaver.wps_output_url``), and it also avoids rewriting
    the whole database job results if the setting is changed later on.
    """
    wps_url = get_wps_output_url(settings)
    wps_path = get_wps_output_path(settings)
    for res in results:
        ref = res.get("reference")
        if isinstance(ref, six.string_types) and ref:
            if ref.startswith(wps_url):
                ref = ref.replace(wps_url, "", 1)
            if ref.startswith(wps_path):
                ref = ref.replace(wps_path, "", 1)
            res["reference"] = ref
    return results
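

# Illustrative sketch (not part of the original module): expected effect of ``make_results_relative``,
# assuming the hypothetical settings below and that the output URL/path getters resolve these settings directly.
def _example_make_results_relative():
    example_settings = {
        "weaver.wps_output_url": "https://weaver.example.com/wpsoutputs",   # hypothetical values
        "weaver.wps_output_path": "/wpsoutputs",
    }
    results = [{"identifier": "output", "reference": "https://weaver.example.com/wpsoutputs/job-uuid/result.tif"}]
    # expected reference after the call: "/job-uuid/result.tif" (prefix matching the output URL is stripped)
    return make_results_relative(results, example_settings)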


def set_wps_language(wps, accept_language=None, request=None):
    # type: (WebProcessingService, Optional[str], Optional[Request]) -> None
    """
    Set the :attr:`language` property on the :class:`WebProcessingService` object.

    Given the `Accept-Language` header value, match the best language to the supported languages.
    By default, and if no match is found, the :attr:`WebProcessingService.language` property is set to None.

    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Language
    (q-factor weighting is ignored, only order is considered)

    :param wps: process for which to set the language header if it is accepted
    :param str accept_language: the value of the Accept-Language header
    :param request: request from which to extract Accept-Language header if not provided directly
    """
    if not accept_language and request:
        accept_language = request.accept_language.header_value

    if not accept_language:
        return

    if not hasattr(wps, "languages"):
        # owslib version doesn't support setting a language
        return

    accepted_languages = [lang.strip().split(";")[0] for lang in accept_language.lower().split(",")]

    for accept in accepted_languages:
        for language in wps.languages.supported:  # noqa
            # Accept-Language header could be only 'fr' instead of 'fr-CA'
            if language.lower().startswith(accept):
                wps.language = language
                return
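

# Illustrative sketch (not part of the original module): for a hypothetical WPS advertising ["en-US", "fr-CA"]
# in ``wps.languages.supported``, an ``Accept-Language`` value of "fr,en;q=0.8" should select "fr-CA", since only
# the preference order is considered and partial matches ('fr' vs 'fr-CA') are allowed.
def _example_set_wps_language(wps):
    set_wps_language(wps, accept_language="fr,en;q=0.8")
    return wps.language  # expected: "fr-CA" for the hypothetical capabilities described above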


def validate_supported_submit_job_handler_parameters(json_body):
    """
    Tests supported parameters not automatically validated by colander deserialize.
    """
    if json_body["mode"] not in [EXECUTE_MODE_ASYNC, EXECUTE_MODE_AUTO]:
        raise HTTPNotImplemented(detail="Execution mode '{}' not supported.".format(json_body["mode"]))

    if json_body["response"] != EXECUTE_RESPONSE_DOCUMENT:
        raise HTTPNotImplemented(detail="Execution response type '{}' not supported.".format(json_body["response"]))

    for job_output in json_body["outputs"]:
        if job_output["transmissionMode"] != EXECUTE_TRANSMISSION_MODE_REFERENCE:
            raise HTTPNotImplemented(detail="Execute transmissionMode '{}' not supported."
                                     .format(job_output["transmissionMode"]))
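

# Illustrative sketch (not part of the original module): a minimal job submission body that passes the checks
# above, assuming it already validated against ``sd.Execute()``. Identifiers and href are hypothetical.
_EXAMPLE_EXECUTE_BODY = {
    "mode": EXECUTE_MODE_AUTO,              # only 'auto' or 'async' are accepted, otherwise HTTP 501
    "response": EXECUTE_RESPONSE_DOCUMENT,  # only 'document' responses are supported
    "inputs": [{"id": "input-file", "href": "https://example.com/data.nc"}],
    "outputs": [{"id": "output", "transmissionMode": EXECUTE_TRANSMISSION_MODE_REFERENCE}],
}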


def submit_job_handler(request, service_url, is_workflow=False, visibility=None):
    # type: (Request, AnyStr, bool, Optional[AnyStr]) -> HTTPSuccessful

    # validate body with expected JSON content and schema
    if CONTENT_TYPE_APP_JSON not in request.content_type:
        raise HTTPBadRequest("Request 'Content-Type' header other than '{}' not supported."
                             .format(CONTENT_TYPE_APP_JSON))
    try:
        json_body = request.json_body
    except Exception as ex:
        raise HTTPBadRequest("Invalid JSON body cannot be decoded for job submission. [{}]".format(ex))
    try:
        json_body = sd.Execute().deserialize(json_body)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{}]".format(str(ex)))

    # TODO: remove when all parameter variations are supported
    validate_supported_submit_job_handler_parameters(json_body)

    settings = get_settings(request)
    provider_id = request.matchdict.get("provider_id")          # None OK if local
    process_id = request.matchdict.get("process_id")
    tags = request.params.get("tags", "").split(",")
    is_execute_async = json_body["mode"] != EXECUTE_MODE_SYNC   # convert auto to async
    notification_email = json_body.get("notification_email")
    encrypted_email = encrypt_email(notification_email, settings) if notification_email else None

    store = get_db(request).get_store(StoreJobs)
    job = store.save_job(task_id=STATUS_ACCEPTED, process=process_id, service=provider_id,
                         inputs=json_body.get("inputs"), is_workflow=is_workflow, access=visibility,
                         user_id=request.authenticated_userid, execute_async=is_execute_async, custom_tags=tags,
                         notification_email=encrypted_email, accept_language=request.accept_language.header_value)
    result = execute_process.delay(
        job_id=job.id,
        url=clean_ows_url(service_url),
        # Convert EnvironHeaders to a simple dict (should cherry-pick the required headers)
        headers={k: v for k, v in request.headers.items()},
        notification_email=notification_email)
    LOGGER.debug("Celery pending task [%s] for job [%s].", result.id, job.id)

    # local/provider process location
    location_base = "/providers/{provider_id}".format(provider_id=provider_id) if provider_id else ""
    location = "{base_url}{location_base}/processes/{process_id}/jobs/{job_id}".format(
        base_url=get_wps_restapi_base_url(settings),
        location_base=location_base,
        process_id=process_id,
        job_id=job.id)
    body_data = {
        "jobID": job.id,
        "status": map_status(STATUS_ACCEPTED),
        "location": location
    }
    return HTTPCreated(location=location, json=body_data)
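

# Illustrative sketch (not part of the original module): shape of the JSON body returned by ``submit_job_handler``
# on success (HTTP 201 with a ``Location`` header); the job identifier and host below are hypothetical.
def _example_submitted_job_body(job_id="14c68477-c3ed-4784-9c0f-a4c9e1344db5"):
    return {
        "jobID": job_id,
        "status": map_status(STATUS_ACCEPTED),
        "location": "https://weaver.example.com/processes/my-process/jobs/{}".format(job_id),
    }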


@sd.jobs_full_service.post(tags=[sd.TAG_PROVIDER_PROCESS, sd.TAG_PROVIDERS, sd.TAG_EXECUTE, sd.TAG_JOBS],
                           renderer=OUTPUT_FORMAT_JSON, schema=sd.PostProviderProcessJobRequest(),
                           response_schemas=sd.post_provider_process_job_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorPostProviderProcessJobResponse.description)
def submit_provider_job(request):
    """
    Execute a provider process.
    """
    store = get_db(request).get_store(StoreServices)
    provider_id = request.matchdict.get("provider_id")
    service = store.fetch_by_name(provider_id, request=request)
    return submit_job_handler(request, service.url)


def list_remote_processes(service, request):
    # type: (Service, Request) -> List[ProcessDB]
    """
    Obtains a list of remote service processes in a compatible :class:`weaver.datatype.Process` format.

    Note: remote processes won't be stored to the local process storage.
    """
    wps = WebProcessingService(url=service.url, headers=get_cookie_headers(request.headers))
    set_wps_language(wps, request=request)
    settings = get_settings(request)
    return [convert_process_wps_to_db(service, process, settings) for process in wps.processes]


@sd.provider_processes_service.get(tags=[sd.TAG_PROVIDER_PROCESS, sd.TAG_PROVIDERS, sd.TAG_GETCAPABILITIES],
                                   renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderEndpoint(),
                                   response_schemas=sd.get_provider_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProviderProcessesListResponse.description)
def get_provider_processes(request):
    """
    Retrieve available provider processes (GetCapabilities).
    """
    provider_id = request.matchdict.get("provider_id")
    store = get_db(request).get_store(StoreServices)
    service = store.fetch_by_name(provider_id, request=request)
    processes = list_remote_processes(service, request=request)
    return HTTPOk(json=[p.json() for p in processes])


def describe_provider_process(request):
    # type: (Request) -> ProcessDB
    """
    Obtains a remote service process description in a compatible local process format.

    Note: this process won't be stored to the local process storage.
    """
    provider_id = request.matchdict.get("provider_id")
    process_id = request.matchdict.get("process_id")
    store = get_db(request).get_store(StoreServices)
    service = store.fetch_by_name(provider_id, request=request)
    wps = WebProcessingService(url=service.url, headers=get_cookie_headers(request.headers))
    set_wps_language(wps, request=request)
    process = wps.describeprocess(process_id)
    return convert_process_wps_to_db(service, process, get_settings(request))


@sd.provider_process_service.get(tags=[sd.TAG_PROVIDER_PROCESS, sd.TAG_PROVIDERS, sd.TAG_DESCRIBEPROCESS],
                                 renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderProcessEndpoint(),
                                 response_schemas=sd.get_provider_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProviderProcessResponse.description)
def get_provider_process(request):
    """
    Retrieve a process description (DescribeProcess).
    """
    try:
        process = describe_provider_process(request)
        process_offering = process.process_offering()
        return HTTPOk(json=process_offering)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))


def get_processes_filtered_by_valid_schemas(request):
    # type: (Request) -> Tuple[List[JSON], List[AnyStr]]
    """
    Validates the process summary schemas and splits them into valid and invalid lists.

    :returns: list of valid process summaries and invalid process IDs for manual cleanup.
    """
    store = get_db(request).get_store(StoreProcesses)
    processes = store.list_processes(visibility=VISIBILITY_PUBLIC, request=request)
    valid_processes = list()
    invalid_processes_ids = list()
    for process in processes:
        try:
            valid_processes.append(process.process_summary())
        except colander.Invalid:
            invalid_processes_ids.append(process.identifier)
    return valid_processes, invalid_processes_ids


@sd.processes_service.get(schema=sd.GetProcessesEndpoint(), tags=[sd.TAG_PROCESSES, sd.TAG_GETCAPABILITIES],
                          response_schemas=sd.get_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProcessesListResponse.description)
def get_processes(request):
    """
    List registered processes (GetCapabilities). Optionally list both local and provider processes.
    """
    detail = asbool(request.params.get("detail", True))
    try:
        # get local processes and filter according to schema validity
        # (previously deployed process schemas can become invalid because of modified schema definitions)
        processes, invalid_processes = get_processes_filtered_by_valid_schemas(request)
        if invalid_processes:
            raise HTTPServiceUnavailable(
                "Previously deployed processes are causing invalid schema integrity errors. "
                "Manual cleanup of following processes is required: {}".format(invalid_processes))
        response_body = {"processes": processes if detail else [get_any_id(p) for p in processes]}

        # if 'EMS' and '?providers=True', also fetch each provider's processes
        if get_weaver_configuration(get_settings(request)) == WEAVER_CONFIGURATION_EMS:
            queries = parse_request_query(request)
            if "providers" in queries and asbool(queries["providers"][0]) is True:
                providers_response = requests.request("GET", "{host}/providers".format(host=request.host_url),
                                                      headers=request.headers, cookies=request.cookies)
                providers = providers_response.json()
                response_body.update({"providers": providers})
                for i, provider in enumerate(providers):
                    provider_id = get_any_id(provider)
                    response = requests.request("GET", "{host}/providers/{provider_id}/processes"
                                                .format(host=request.host_url, provider_id=provider_id),
                                                headers=request.headers, cookies=request.cookies)
                    # parse the JSON list of process summaries returned by the provider processes endpoint
                    processes = response.json()
                    response_body["providers"][i].update({
                        "processes": processes if detail else [get_any_id(p) for p in processes]
                    })
        return HTTPOk(json=response_body)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))
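

# Illustrative sketch (not part of the original module): aggregated listing of local and provider processes,
# available only when Weaver runs with the 'EMS' configuration; the host and query usage below are hypothetical.
def _example_list_processes_with_providers(host="https://weaver.example.com"):
    resp = requests.get("{}/processes".format(host), params={"detail": "false", "providers": "true"})
    return resp.json()  # e.g.: {"processes": [...], "providers": [{"id": ..., "processes": [...]}, ...]}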


@sd.processes_service.post(tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY], renderer=OUTPUT_FORMAT_JSON,
                           schema=sd.PostProcessesEndpoint(), response_schemas=sd.post_processes_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorPostProcessesResponse.description)
def add_local_process(request):
    """
    Register a local process.
    """
    return deploy_process_from_payload(request.json, request)


def get_process(request):
    # type: (Request) -> ProcessDB
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, six.string_types):
        raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
    try:
        store = get_db(request).get_store(StoreProcesses)
        process = store.fetch_by_id(process_id, visibility=VISIBILITY_PUBLIC, request=request)
        return process
    except InvalidIdentifierValue as ex:
        raise HTTPBadRequest(str(ex))
    except ProcessNotAccessible:
        raise HTTPUnauthorized("Process with id '{!s}' is not accessible.".format(process_id))
    except ProcessNotFound:
        raise HTTPNotFound("Process with id '{!s}' does not exist.".format(process_id))
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema:\n[{0!r}].".format(ex))


@sd.process_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OUTPUT_FORMAT_JSON,
                        schema=sd.ProcessEndpoint(), response_schemas=sd.get_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProcessResponse.description)
def get_local_process(request):
    """
    Get registered local process information (DescribeProcess).
    """
    try:
        process = get_process(request)
        process["inputs"] = opensearch.replace_inputs_describe_process(process.inputs, process.payload)
        process_offering = process.process_offering()
        return HTTPOk(json=process_offering)
    except colander.Invalid as ex:
        raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))


@sd.process_package_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProcessPackageEndpoint(),
                                response_schemas=sd.get_process_package_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProcessPackageResponse.description)
def get_local_process_package(request):
    """
    Get a registered local process package definition.
    """
    process = get_process(request)
    return HTTPOk(json=process.package or {})


@sd.process_payload_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProcessPayloadEndpoint(),
                                response_schemas=sd.get_process_payload_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProcessPayloadResponse.description)
def get_local_process_payload(request):
    """
    Get a registered local process payload definition.
    """
    process = get_process(request)
    return HTTPOk(json=process.payload or {})


@sd.process_visibility_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY], renderer=OUTPUT_FORMAT_JSON,
                                   schema=sd.ProcessVisibilityGetEndpoint(),
                                   response_schemas=sd.get_process_visibility_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorGetProcessVisibilityResponse.description)
def get_process_visibility(request):
    """
    Get the visibility of a registered local process.
    """
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, six.string_types):
        raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
    try:
        store = get_db(request).get_store(StoreProcesses)
        visibility_value = store.get_visibility(process_id, request=request)
        return HTTPOk(json={u"value": visibility_value})
    except ProcessNotFound as ex:
        raise HTTPNotFound(str(ex))


@sd.process_visibility_service.put(tags=[sd.TAG_PROCESSES, sd.TAG_VISIBILITY], renderer=OUTPUT_FORMAT_JSON,
                                   schema=sd.ProcessVisibilityPutEndpoint(),
                                   response_schemas=sd.put_process_visibility_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorPutProcessVisibilityResponse.description)
def set_process_visibility(request):
    """
    Set the visibility of a registered local process.
    """
    visibility_value = request.json.get("value")
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, six.string_types):
        raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
    if not isinstance(visibility_value, six.string_types):
        raise HTTPUnprocessableEntity("Invalid visibility value specified.")
    if visibility_value not in VISIBILITY_VALUES:
        raise HTTPBadRequest("Invalid visibility value specified.")

    try:
        store = get_db(request).get_store(StoreProcesses)
        process = store.fetch_by_id(process_id)
        if process.type == PROCESS_BUILTIN:
            raise HTTPForbidden("Cannot change the visibility of builtin process.")
        store.set_visibility(process_id, visibility_value, request=request)
        return HTTPOk(json={u"value": visibility_value})
    except TypeError:
        raise HTTPBadRequest("Value of visibility must be a string.")
    except ValueError:
        raise HTTPUnprocessableEntity("Value of visibility must be one of : {!s}".format(list(VISIBILITY_VALUES)))
    except ProcessNotFound as ex:
        raise HTTPNotFound(str(ex))
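

# Illustrative sketch (not part of the original module): the visibility endpoint above expects a JSON body of the
# form ``{"value": "<visibility>"}``, where the value must be one of ``VISIBILITY_VALUES`` (e.g. 'public').
_EXAMPLE_VISIBILITY_BODY = {"value": VISIBILITY_PUBLIC}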


@sd.process_service.delete(tags=[sd.TAG_PROCESSES, sd.TAG_DEPLOY], renderer=OUTPUT_FORMAT_JSON,
                           schema=sd.ProcessEndpoint(), response_schemas=sd.delete_process_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorDeleteProcessResponse.description)
def delete_local_process(request):
    """
    Unregister a local process.
    """
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, six.string_types):
        raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
    try:
        store = get_db(request).get_store(StoreProcesses)
        process = store.fetch_by_id(process_id)
        if process.type == PROCESS_BUILTIN:
            raise HTTPForbidden("Cannot delete a builtin process.")
        if store.delete_process(process_id, visibility=VISIBILITY_PUBLIC, request=request):
            return HTTPOk(json={"undeploymentDone": True, "identifier": process_id})
        LOGGER.error("Existing process [%s] should have been deleted with success status.", process_id)
        raise HTTPForbidden("Deletion of process has been refused by the database or could not have been validated.")
    except InvalidIdentifierValue as ex:
        raise HTTPBadRequest(str(ex))
    except ProcessNotAccessible:
        raise HTTPUnauthorized("Process with id '{!s}' is not accessible.".format(process_id))
    except ProcessNotFound:
        description = "Process with id '{!s}' does not exist.".format(process_id)
        raise HTTPNotFound(description)


@sd.process_jobs_service.post(tags=[sd.TAG_PROCESSES, sd.TAG_EXECUTE, sd.TAG_JOBS], renderer=OUTPUT_FORMAT_JSON,
                              schema=sd.PostProcessJobsEndpoint(), response_schemas=sd.post_process_jobs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorPostProcessJobResponse.description)
def submit_local_job(request):
    """
    Execute a local process.
    """
    process_id = request.matchdict.get("process_id")
    if not isinstance(process_id, six.string_types):
        raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
    try:
        store = get_db(request).get_store(StoreProcesses)
        process = store.fetch_by_id(process_id, visibility=VISIBILITY_PUBLIC, request=request)
        resp = submit_job_handler(request, process.processEndpointWPS1,
                                  is_workflow=process.type == PROCESS_WORKFLOW, visibility=process.visibility)
        return resp
    except InvalidIdentifierValue as ex:
        raise HTTPBadRequest(str(ex))
    except ProcessNotAccessible:
        raise HTTPUnauthorized("Process with id '{!s}' is not accessible.".format(process_id))
    except ProcessNotFound:
        raise HTTPNotFound("The process with id '{!s}' does not exist.".format(process_id))