# Source code for weaver.processes.wps1_process

import logging
from time import sleep
from typing import TYPE_CHECKING, AnyStr

from owslib.wps import ComplexDataInput, WebProcessingService

from weaver import status
from weaver.execute import EXECUTE_MODE_ASYNC
from weaver.owsexceptions import OWSNoApplicableCode
from weaver.processes.constants import WPS_COMPLEX_DATA
from weaver.processes.utils import jsonify_output, map_progress
from weaver.processes.wps_process_base import WpsProcessInterface
from weaver.utils import (
    get_any_id,
    get_any_value,
    get_job_log_msg,
    get_log_monitor_msg,
    raise_on_xml_exception,
    request_extra,
    wait_secs
)
from weaver.wps_restapi.jobs.jobs import check_status

if TYPE_CHECKING:
    from weaver.typedefs import UpdateStatusPartialFunction     # noqa: F401
    from pywps.app import WPSRequest                            # noqa: F401

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Progress values reported through the ``update_status`` callback at each stage
# of a remote WPS-1 execution.
REMOTE_JOB_PROGRESS_REQ_PREP = 2        # preparing the execute request
REMOTE_JOB_PROGRESS_EXECUTION = 5       # submitting the job to the provider
REMOTE_JOB_PROGRESS_MONITORING = 10     # monitoring starts; lower bound given to ``map_progress``
REMOTE_JOB_PROGRESS_FETCH_OUT = 90      # fetching outputs; upper bound given to ``map_progress``
REMOTE_JOB_PROGRESS_COMPLETED = 100     # execution completed
class Wps1Process(WpsProcessInterface):
    """
    Runs a process on a remote WPS-1 provider and retrieves its outputs.
    """

    def __init__(self,
                 provider,          # type: AnyStr
                 process,           # type: AnyStr
                 request,           # type: WPSRequest
                 update_status,     # type: UpdateStatusPartialFunction
                 ):
        """
        :param provider: URL of the remote WPS-1 provider; also passed as first argument of ``update_status``.
        :param process: identifier of the process to describe and execute on the provider.
        :param request: request object forwarded to the parent class constructor.
        :param update_status: callback called as ``update_status(provider, message, progress, status)``.
        """
        super(Wps1Process, self).__init__(request)
        self.provider = provider
        self.process = process
        # bind the provider so that callers within this class only supply (message, progress, status)
        self.update_status = lambda _message, _progress, _status: update_status(
            self.provider, _message, _progress, _status)

    def execute(self, workflow_inputs, out_dir, expected_outputs):
        """
        Execute the process on the remote provider, monitor it until completion and fetch its outputs.

        :param workflow_inputs:
            mapping of input identifier to its value. A value is either a single item or a list of items, where
            an item that is a mapping provides its reference under ``location`` and any other item is used as is.
            Inputs whose value is the string ``"null"`` are not submitted.
        :param out_dir: directory under which the fetched outputs are written.
        :param expected_outputs: mapping of output identifier to the file name to write under ``out_dir``.
        :raises Exception:
            for any failure during the operation. The message is formatted as
            ``<module>.<class>: <message>`` of the error that caused it.
        """
        self.update_status("Preparing execute request for remote WPS1 provider.",
                           REMOTE_JOB_PROGRESS_REQ_PREP, status.STATUS_RUNNING)
        LOGGER.debug("Execute process WPS request for %s", self.process)
        try:
            # NOTE(review): 'cookies', 'verify', 'settings' and 'host_file' are not defined in this class,
            #   presumably provided by 'WpsProcessInterface' - confirm
            try:
                wps = WebProcessingService(url=self.provider, headers=self.cookies, verify=self.verify)
                raise_on_xml_exception(wps._capabilities)  # noqa: W0212
            except Exception as ex:
                raise OWSNoApplicableCode("Failed to retrieve WPS capabilities. Error: [{}].".format(str(ex)))
            try:
                process = wps.describeprocess(self.process)
            except Exception as ex:
                raise OWSNoApplicableCode("Failed to retrieve WPS process description. Error: [{}].".format(str(ex)))

            # prepare inputs
            complex_inputs = [process_input.identifier for process_input in process.dataInputs
                              if WPS_COMPLEX_DATA in process_input.dataType]

            # remove any 'null' input, should employ the 'default' of the remote WPS process
            workflow_inputs_provided_keys = [key for key in workflow_inputs if workflow_inputs[key] != "null"]

            try:
                wps_inputs = list()
                for workflow_input_key in workflow_inputs_provided_keys:
                    workflow_input_val = workflow_inputs[workflow_input_key]
                    # in case of array inputs, must repeat (id,value)
                    # in case of complex input (File), obtain location, otherwise get data value
                    input_values = [val["location"] if isinstance(val, dict) else val
                                    for val in (workflow_input_val
                                                if isinstance(workflow_input_val, list)
                                                else [workflow_input_val])]

                    # we need to host file starting with file:// scheme
                    input_values = [self.host_file(val) if str(val).startswith("file://") else val
                                    for val in input_values]

                    # need to use ComplexDataInput structure for complex input
                    # need to use literal String for anything else than complex
                    # TODO: BoundingBox not supported
                    wps_inputs.extend([
                        (workflow_input_key,
                         ComplexDataInput(input_value) if workflow_input_key in complex_inputs else str(input_value))
                        for input_value in input_values])
            except KeyError as exc:
                # best-effort fallback kept from the original behaviour (submit without inputs),
                # but make the dropped inputs visible instead of discarding them silently
                LOGGER.warning("Missing key %s while preparing WPS inputs, submitting execution without inputs.", exc)
                wps_inputs = []

            # prepare outputs
            outputs = [(o.identifier, o.dataType == WPS_COMPLEX_DATA) for o in process.processOutputs
                       if o.identifier in expected_outputs]

            self.update_status("Executing job on remote WPS1 provider.",
                               REMOTE_JOB_PROGRESS_EXECUTION, status.STATUS_RUNNING)

            mode = EXECUTE_MODE_ASYNC
            execution = wps.execute(self.process, inputs=wps_inputs, output=outputs, mode=mode, lineage=True)
            if not execution.process and execution.errors:
                first_error = execution.errors[0]
                if isinstance(first_error, BaseException):
                    raise first_error
                # NOTE(review): owslib appears to report errors as plain objects holding a 'text' attribute
                #   rather than exceptions - confirm. Such objects cannot be raised directly, wrap them.
                raise Exception(getattr(first_error, "text", None) or str(first_error))

            self.update_status("Monitoring job on remote WPS1 provider : [{0}]".format(self.provider),
                               REMOTE_JOB_PROGRESS_MONITORING, status.STATUS_RUNNING)

            max_retries = 5
            num_retries = 0     # consecutive failed status checks, reset after each successful one
            run_step = 0        # successful status checks so far, also drives the wait time between checks
            job_id = "<undefined>"
            # 'run_step == 0' guarantees that the status is read successfully at least once
            while execution.isNotComplete() or run_step == 0:
                if num_retries >= max_retries:
                    raise Exception("Could not read status document after {} retries. Giving up.".format(max_retries))
                try:
                    execution = check_status(url=execution.statusLocation, verify=self.verify,
                                             sleep_secs=wait_secs(run_step))
                    job_id = execution.statusLocation.replace(".xml", "").split("/")[-1]
                    LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()),
                                                     execution.percentCompleted, execution.statusMessage,
                                                     execution.statusLocation))
                    self.update_status(get_job_log_msg(status=status.map_status(execution.getStatus()),
                                                       message=execution.statusMessage,
                                                       progress=execution.percentCompleted,
                                                       duration=None),  # get if available
                                       map_progress(execution.percentCompleted,
                                                    REMOTE_JOB_PROGRESS_MONITORING,
                                                    REMOTE_JOB_PROGRESS_FETCH_OUT),
                                       status.STATUS_RUNNING)
                except Exception as exc:
                    num_retries += 1
                    LOGGER.debug("Exception raised: %r", exc)
                    sleep(1)
                else:
                    num_retries = 0
                    run_step += 1

            if not execution.isSucceded():
                exec_msg = execution.statusMessage or "Job failed."
                LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()),
                                                 execution.percentCompleted, exec_msg, execution.statusLocation))
                raise Exception(exec_msg)

            self.update_status("Fetching job outputs from remote WPS1 provider.",
                               REMOTE_JOB_PROGRESS_FETCH_OUT, status.STATUS_RUNNING)

            results = [jsonify_output(output, process) for output in execution.processOutputs]
            for result in results:
                result_id = get_any_id(result)
                result_val = get_any_value(result)
                if result_id in expected_outputs:
                    # This is where cwl expect the output file to be written
                    # TODO We will probably need to handle multiple output value...
                    dst_fn = "/".join([out_dir.rstrip("/"), expected_outputs[result_id]])

                    # TODO Should we handle other type than File reference?
                    resp = request_extra("get", result_val, allow_redirects=True, settings=self.settings)
                    LOGGER.debug("Fetching result output from [%s] to cwl output destination: [%s]", result_val, dst_fn)
                    with open(dst_fn, mode="wb") as dst_fh:
                        dst_fh.write(resp.content)

        except Exception as exc:
            # re-raise any failure as a generic exception that names the class of the original error
            exception_class = "{}.{}".format(type(exc).__module__, type(exc).__name__)
            errors = "{0}: {1!s}".format(exception_class, exc)
            raise Exception(errors)

        self.update_status("Execution on remote WPS1 provider completed.",
                           REMOTE_JOB_PROGRESS_COMPLETED, status.STATUS_SUCCEEDED)