Source code for weaver.processes.utils

import json
import logging
import warnings
from copy import deepcopy
from distutils.version import LooseVersion
from typing import TYPE_CHECKING

import colander
import six
import yaml
from owslib.wps import WebProcessingService, is_reference
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPConflict,
    HTTPException,
    HTTPNotFound,
    HTTPOk,
    HTTPUnprocessableEntity
)
from six.moves.urllib.error import URLError
from six.moves.urllib.parse import parse_qs, urlparse
from six.moves.urllib.request import urlopen

from weaver.config import (
    WEAVER_CONFIGURATION_EMS,
    WEAVER_DEFAULT_WPS_PROCESSES_CONFIG,
    get_weaver_config_file,
    get_weaver_configuration
)
from weaver.database import get_db
from weaver.datatype import Process as ProcessDB
from weaver.datatype import Service
from weaver.exceptions import (
    InvalidIdentifierValue,
    PackageNotFound,
    PackageRegistrationError,
    PackageTypeError,
    ProcessNotFound,
    ProcessRegistrationError,
    log_unhandled_exceptions
)
from weaver.formats import CONTENT_TYPE_APP_JSON, CONTENT_TYPE_TEXT_PLAIN
from weaver.processes.constants import WPS_COMPLEX_DATA
from weaver.processes.types import PROCESS_APPLICATION, PROCESS_WORKFLOW
from weaver.store.base import StoreProcesses
from weaver.utils import get_sane_name, get_settings, get_url_without_query
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
    from weaver.typedefs import AnyContainer, AnySettingsContainer, FileSystemPathType, JSON, Number
    from weaver.store.mongodb import MongodbProcessStore
    from typing import Any, AnyStr, Dict, List, Optional, Union
    from pywps import Process as ProcessWPS
    import owslib.wps
[docs]LOGGER = logging.getLogger(__name__)
[docs]def _get_data(output): # type: (owslib.wps.Output) -> Optional[Any] """ Extract the data from the output value. """ # process output data are append into a list and # WPS standard v1.0.0 specify that Output data field has zero or one value if output.data: return output.data[0] return None
[docs]def _read_reference(url): # type: (AnyStr) -> Optional[AnyStr] """ Read a reference HTTP(S) URL and return the content. """ if not isinstance(url, six.string_types): return None if not url.lower().startswith("http"): LOGGER.warning("URL reading not allowed because of potentially insecure scheme: [%s]", url) return None try: return urlopen(url).read() # nosec: B310 except URLError: return None
[docs]def _get_multi_json_references(output): # type: (owslib.wps.Output) -> Optional[List[JSON]] """ Since WPS standard does not allow to return multiple values for a single output, a lot of process actually return a json array containing references to these outputs. This function goal is to detect this particular format. :return: An array of HTTP(S) references if the specified output is effectively a JSON containing that, ``None`` otherwise. """ # Check for the json datatype and mimetype if output.dataType == WPS_COMPLEX_DATA and output.mimeType == CONTENT_TYPE_APP_JSON: # If the json data is referenced read it's content if output.reference: json_data_str = _read_reference(output.reference) # Else get the data directly else: json_data_str = _get_data(output) # Load the actual json dict json_data = json.loads(json_data_str) if isinstance(json_data, list): for data_value in json_data: if not is_reference(data_value): return None return json_data return None
[docs]def map_progress(progress, range_min, range_max): # type: (Number, Number, Number) -> Number """Calculates the relative progression of the percentage process within min/max values.""" return max(range_min, min(range_max, range_min + (progress * (range_max - range_min)) / 100))
[docs]def jsonify_output(output, process_description): # type: (owslib.wps.Output, owslib.wps.Process) -> JSON """ Utility method to jsonify an output element from a WPS1 process description. """ if not output.dataType: for process_output in getattr(process_description, "processOutputs", []): if getattr(process_output, "identifier", "") == output.identifier: output.dataType = process_output.dataType break json_output = dict(identifier=output.identifier, title=output.title, dataType=output.dataType) # WPS standard v1.0.0 specify that either a reference or a data field has to be provided if output.reference: json_output["reference"] = output.reference # Handle special case where we have a reference to a json array containing dataset reference # Avoid reference to reference by fetching directly the dataset references json_array = _get_multi_json_references(output) if json_array and all(str(ref).startswith("http") for ref in json_array): json_output["data"] = json_array else: # WPS standard v1.0.0 specify that Output data field has Zero or one value json_output["data"] = output.data[0] if output.data else None if json_output["dataType"] == WPS_COMPLEX_DATA: json_output["mimeType"] = output.mimeType return json_output
[docs]def convert_process_wps_to_db(service, process, container): # type: (Union[Service, Dict[{"url": AnyStr, "name": AnyStr}]], ProcessWPS, AnySettingsContainer) -> ProcessDB """ Converts an owslib WPS Process to local storage Process. """ from weaver.processes.wps_package import complex2json as jsonify_value describe_process_url = "{base_url}/providers/{provider_id}/processes/{process_id}".format( base_url=get_wps_restapi_base_url(container), provider_id=service.get("name"), process_id=process.identifier) execute_process_url = "{describe_url}/jobs".format(describe_url=describe_process_url) default_format = {"mimeType": CONTENT_TYPE_TEXT_PLAIN} inputs = [dict( id=getattr(dataInput, "identifier", ""), title=getattr(dataInput, "title", ""), abstract=getattr(dataInput, "abstract", ""), minOccurs=str(getattr(dataInput, "minOccurs", 0)), maxOccurs=str(getattr(dataInput, "maxOccurs", 0)), dataType=dataInput.dataType, defaultValue=jsonify_value(getattr(dataInput, "defaultValue", None)), allowedValues=[jsonify_value(dataValue) for dataValue in getattr(dataInput, "allowedValues", [])], supportedValues=[jsonify_value(dataValue) for dataValue in getattr(dataInput, "supportedValues", [])], formats=[jsonify_value(dataValue) for dataValue in getattr(dataInput, "supportedValues", [default_format])], ) for dataInput in getattr(process, "dataInputs", [])] outputs = [dict( id=getattr(processOutput, "identifier", ""), title=getattr(processOutput, "title", ""), abstract=getattr(processOutput, "abstract", ""), dataType=processOutput.dataType, defaultValue=jsonify_value(getattr(processOutput, "defaultValue", None)), formats=[jsonify_value(dataValue) for dataValue in getattr(processOutput, "supportedValues", [default_format])], ) for processOutput in getattr(process, "processOutputs", [])] return ProcessDB( id=process.identifier, label=getattr(process, "title", ""), title=getattr(process, "title", ""), abstract=getattr(process, "abstract", ""), inputs=inputs, outputs=outputs, url=describe_process_url, processEndpointWPS1=service.get("url"), processDescriptionURL=describe_process_url, executeEndpoint=execute_process_url, package=None,
) @log_unhandled_exceptions(logger=LOGGER, message="Unhandled error occurred during parsing of deploy payload.", is_request=False)
[docs]def _check_deploy(payload): """Validate minimum deploy payload field requirements with exception handling.""" try: sd.Deploy().deserialize(payload) except colander.Invalid as ex: raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))
@log_unhandled_exceptions(logger=LOGGER, message="Unhandled error occurred during parsing of process definition.", is_request=False)
[docs]def _get_deploy_process_info(process_info, reference, package): """Obtain the process definition from deploy payload with exception handling.""" from weaver.processes.wps_package import get_process_definition try: # data_source `None` forces workflow process to search locally for deployed step applications return get_process_definition(process_info, reference, package, data_source=None) except PackageNotFound as ex: # raised when a workflow sub-process is not found (not deployed locally) raise HTTPNotFound(detail=str(ex)) except InvalidIdentifierValue as ex: raise HTTPBadRequest(str(ex)) except (PackageRegistrationError, PackageTypeError) as ex: msg = "Invalid package/reference definition. Loading generated error: [{!s}]".format(ex) LOGGER.exception(msg) raise HTTPUnprocessableEntity(detail=msg)
[docs]def deploy_process_from_payload(payload, container): # type: (JSON, AnyContainer) -> HTTPException """ Adds a :class:`weaver.datatype.Process` instance to storage using the provided JSON ``payload`` matching :class:`weaver.wps_restapi.swagger_definitions.ProcessDescription`. :returns: HTTPOk if the process registration was successful :raises HTTPException: otherwise """ _check_deploy(payload) # use deepcopy of to remove any circular dependencies before writing to mongodb or any updates to the payload payload_copy = deepcopy(payload) # validate identifier naming for unsupported characters process_description = payload.get("processDescription") process_info = process_description.get("process", {}) process_href = process_description.pop("href", None) # retrieve CWL package definition, either via "href" (WPS-1/2), "owsContext" or "executionUnit" (package/reference) deployment_profile_name = payload.get("deploymentProfileName", "").lower() ows_context = process_info.pop("owsContext", None) reference = None package = None if process_href: reference = process_href # reference type handled downstream elif isinstance(ows_context, dict): offering = ows_context.get("offering") if not isinstance(offering, dict): raise HTTPUnprocessableEntity("Invalid parameter 'processDescription.process.owsContext.offering'.") content = offering.get("content") if not isinstance(content, dict): raise HTTPUnprocessableEntity("Invalid parameter 'processDescription.process.owsContext.offering.content'.") package = None reference = content.get("href") elif deployment_profile_name: if not any(deployment_profile_name.endswith(typ) for typ in [PROCESS_APPLICATION, PROCESS_WORKFLOW]): raise HTTPBadRequest("Invalid value for parameter 'deploymentProfileName'.") execution_units = payload.get("executionUnit") if not isinstance(execution_units, list): raise HTTPUnprocessableEntity("Invalid parameter 'executionUnit'.") for execution_unit in execution_units: if not isinstance(execution_unit, dict): raise HTTPUnprocessableEntity("Invalid parameter 'executionUnit'.") package = execution_unit.get("unit") reference = execution_unit.get("href") # stop on first package/reference found, simultaneous usage will raise during package retrieval if package or reference: break else: raise HTTPBadRequest("Missing one of required parameters [href, owsContext, deploymentProfileName].") # obtain updated process information using WPS process offering, CWL/WPS reference or CWL package definition process_info = _get_deploy_process_info(process_info, reference, package) # validate process type against weaver configuration settings = get_settings(container) process_type = process_info["type"] if process_type == PROCESS_WORKFLOW: weaver_config = get_weaver_configuration(settings) if weaver_config != WEAVER_CONFIGURATION_EMS: raise HTTPBadRequest("Invalid [{0}] package deployment on [{1}].".format(process_type, weaver_config)) restapi_url = get_wps_restapi_base_url(settings) description_url = "/".join([restapi_url, "processes", process_info["identifier"]]) execute_endpoint = "/".join([description_url, "jobs"]) # ensure that required "processEndpointWPS1" in db is added, # will be auto-fixed to localhost if not specified in body process_info["processEndpointWPS1"] = process_description.get("processEndpointWPS1") process_info["executeEndpoint"] = execute_endpoint process_info["payload"] = payload_copy process_info["jobControlOptions"] = process_description.get("jobControlOptions", []) process_info["outputTransmission"] = process_description.get("outputTransmission", []) process_info["processDescriptionURL"] = description_url # insert the "resolved" context using details retrieved from "executionUnit"/"href" or directly with "owsContext" if "owsContext" not in process_info and reference: process_info["owsContext"] = {"offering": {"content": {"href": str(reference)}}} elif isinstance(ows_context, dict): process_info["owsContext"] = ows_context try: store = get_db(container).get_store(StoreProcesses) saved_process = store.save_process(ProcessDB(process_info), overwrite=False) except ProcessRegistrationError as ex: raise HTTPConflict(detail=str(ex)) except ValueError as ex: # raised on invalid process name raise HTTPBadRequest(detail=str(ex)) json_response = {"processSummary": saved_process.process_summary(), "deploymentDone": True} return HTTPOk(json=json_response) # FIXME: should be 201 (created), update swagger accordingly
[docs]def register_wps_processes_from_config(wps_processes_file_path, container): # type: (Optional[FileSystemPathType], AnySettingsContainer) -> None """ Loads a `wps_processes.yml` file and registers `WPS-1` providers processes to the current `Weaver` instance as equivalent `WPS-2` processes. .. seealso:: - `weaver.wps_processes.yml.example` for additional file format details """ if wps_processes_file_path is None: warnings.warn("No file specified for WPS-1 providers registration.", RuntimeWarning) wps_processes_file_path = get_weaver_config_file("", WEAVER_DEFAULT_WPS_PROCESSES_CONFIG) elif wps_processes_file_path == "": warnings.warn("Configuration file for WPS-1 providers registration explicitly defined as empty in settings. " "Not loading anything.", RuntimeWarning) return try: with open(wps_processes_file_path, "r") as f: processes_config = yaml.safe_load(f) processes = processes_config.get("processes") if not processes: LOGGER.warning("Nothing to process from file: [%s]", wps_processes_file_path) return process_store = get_db(container).get_store(StoreProcesses) # type: MongodbProcessStore for cfg_service in processes: # parse info if isinstance(cfg_service, dict): svc_url = cfg_service["url"] svc_name = cfg_service.get("name") svc_proc = cfg_service.get("id", []) elif isinstance(cfg_service, six.string_types): svc_url = cfg_service svc_name = None svc_proc = [] else: raise ValueError("Invalid service value: [{!s}].".format(cfg_service)) url_p = urlparse(svc_url) qs_p = parse_qs(url_p.query) svc_url = get_url_without_query(url_p) svc_name = svc_name or get_sane_name(url_p.hostname) svc_proc = svc_proc or qs_p.get("identifier", []) if not isinstance(svc_name, six.string_types): raise ValueError("Invalid service value: [{!s}].".format(svc_name)) if not isinstance(svc_proc, list): raise ValueError("Invalid process value: [{!s}].".format(svc_proc)) # fetch data LOGGER.info("Fetching WPS-1: [%s]", svc_url) wps = WebProcessingService(url=svc_url) if LooseVersion(wps.version) >= LooseVersion("2.0"): LOGGER.warning("Invalid WPS-1 provider, version was [%s]", wps.version) continue wps_processes = [wps.describeprocess(p) for p in svc_proc] or wps.processes for wps_process in wps_processes: proc_id = "{}_{}".format(svc_name, get_sane_name(wps_process.identifier)) try: process_store.fetch_by_id(proc_id) except ProcessNotFound: pass else: LOGGER.warning("Process already registered: [%s]. Skipping...", proc_id) continue proc_url = "{}?service=WPS&request=DescribeProcess&identifier={}&version={}" \ .format(svc_url, wps_process.identifier, wps.version) payload = { "processDescription": {"process": {"id": proc_id}}, "executionUnit": [{"href": proc_url}], "deploymentProfileName": "http://www.opengis.net/profiles/eoc/wpsApplication", } # noinspection PyBroadException try: resp = deploy_process_from_payload(payload, container) if resp.status_code == HTTPOk.status_code: LOGGER.info("Process registered: [%s]", proc_id) else: raise RuntimeError("Process registration failed: [{}]".format(proc_id)) except Exception as ex: LOGGER.exception("Exception during process registration: [%r]", ex) continue except Exception as exc: msg = "Invalid WPS-1 providers configuration file [{!r}].".format(exc) LOGGER.exception(msg) raise RuntimeError(msg)