weaver.processes.wps_process_base
=================================

.. py:module:: weaver.processes.wps_process_base


Module Contents
---------------

.. py:data:: LOGGER


.. py:class:: RemoteJobProgress

   Progress of a remotely monitored job process execution.

   .. note::
      Implementations can reuse the same progress values, or intermediate ones within the range of the relevant sections.

   .. py:attribute:: SETUP
      :value: 1

   .. py:attribute:: PREPARE
      :value: 2

   .. py:attribute:: READY
      :value: 5

   .. py:attribute:: STAGE_IN
      :value: 10

   .. py:attribute:: FORMAT_IO
      :value: 12

   .. py:attribute:: EXECUTION
      :value: 15

   .. py:attribute:: MONITORING
      :value: 20

   .. py:attribute:: RESULTS
      :value: 85

   .. py:attribute:: STAGE_OUT
      :value: 90

   .. py:attribute:: CLEANUP
      :value: 95

   .. py:attribute:: COMPLETED
      :value: 100


.. py:class:: WpsProcessInterface(request: Optional[weaver.wps.service.WorkerRequest], update_status: weaver.typedefs.UpdateStatusPartialFunction)

   Common interface for :term:`WPS` :term:`Process` to be used for dispatching :term:`CWL` jobs.

   Multiple convenience methods are provided. Processes inheriting from this base should implement the required
   abstract methods and override operations as needed.

   .. note::
      For the expected operation details and their execution order, please refer to :ref:`proc_workflow_ops`.

   .. seealso::
      :meth:`execute` for complete details of the operations and ordering.

   .. py:attribute:: request

   .. py:attribute:: headers

   .. py:attribute:: settings

   .. py:attribute:: update_status
      :type: weaver.typedefs.UpdateStatusPartialFunction

   .. py:attribute:: temp_staging

   .. py:method:: execute(workflow_inputs: Union[weaver.typedefs.CWL_WorkflowInputs, weaver.typedefs.JobCustomInputs], out_dir: str, expected_outputs: Union[weaver.typedefs.CWL_ExpectedOutputs, weaver.typedefs.JobCustomOutputs]) -> weaver.typedefs.JobOutputs

      Execute the core operation of the remote :term:`Process` using the given inputs.

      The function is expected to monitor the process and update the status.
      Retrieve the expected outputs and store them in ``out_dir``.

      :param workflow_inputs: :term:`CWL` job dictionary.
      :param out_dir: Directory where the outputs must be written.
      :param expected_outputs: Expected outputs to collect from complex references.

   .. py:method:: prepare(workflow_inputs: Union[weaver.typedefs.CWL_WorkflowInputs, weaver.typedefs.JobCustomInputs], expected_outputs: Union[weaver.typedefs.CWL_ExpectedOutputs, weaver.typedefs.JobCustomOutputs]) -> None

      Implementation dependent operations to prepare the :term:`Process` for :term:`Job` execution.

      This is an optional step that can be omitted entirely if not needed.
      This step should be considered for the creation of a reusable client or object handler that does not need to be
      recreated in any subsequent steps, such as for :meth:`dispatch` and :meth:`monitor` calls.

   .. py:method:: format_inputs(job_inputs: weaver.typedefs.JobInputs) -> Union[weaver.typedefs.JobInputs, weaver.typedefs.JobCustomInputs]

      Implementation dependent operations to configure input values for :term:`Job` execution.

      This is an optional step that simply passes the inputs through as-is if no formatting is required.
      Otherwise, the implementing :term:`Process` can override this step to reorganize the workflow step inputs into
      the format required by its :meth:`dispatch` call.

   .. py:method:: format_outputs(job_outputs: weaver.typedefs.JobOutputs) -> Optional[Union[weaver.typedefs.JobOutputs, weaver.typedefs.JobCustomOutputs]]

      Implementation dependent operations to configure expected outputs for :term:`Job` execution.

      This is an optional step that simply passes the outputs through as-is if no formatting is required.
      Otherwise, the implementing :term:`Process` can override this step to reorganize the workflow step outputs into
      the format required by its :meth:`dispatch` call.

   .. py:method:: dispatch(process_inputs: Union[weaver.typedefs.JobInputs, weaver.typedefs.JobCustomInputs], process_outputs: Optional[Union[weaver.typedefs.JobOutputs, weaver.typedefs.JobCustomOutputs]]) -> weaver.typedefs.JobMonitorReference
      :abstractmethod:

      Implementation dependent operations to dispatch the :term:`Job` execution to the remote :term:`Process`.

      :returns: Reference details that will be passed to :meth:`monitor`.

   .. py:method:: monitor(monitor_reference: weaver.typedefs.JobMonitorReference) -> bool
      :abstractmethod:

      Implementation dependent operations to monitor the status of the :term:`Job` execution that was dispatched.

      This step should block :meth:`execute` until the final status of the remote :term:`Job` (failed/success)
      can be obtained.

      :returns: Success status.

   .. py:method:: get_results(monitor_reference: weaver.typedefs.JobMonitorReference) -> weaver.typedefs.JobResults
      :abstractmethod:

      Implementation dependent operations to retrieve the results following a successful :term:`Job` execution.

      The operation should **NOT** fetch (stage) results, but only obtain the locations where they can be retrieved,
      based on the monitoring reference that was generated from the execution.

      :returns: Result locations.

   .. py:method:: cleanup() -> None

      Implementation dependent operations to clean up the :term:`Process` or :term:`Job` execution.

      This is an optional step that does not require any override if not needed by derived classes.

   .. py:method:: get_auth_headers() -> weaver.typedefs.AnyHeadersContainer

      Implementation dependent operation to retrieve applicable authorization headers.

      This method is employed for every :meth:`make_request` call to avoid manually providing them each time.
      Any overriding method should consider calling this method to retrieve authorization headers from the
      WPS request.

   .. py:method:: get_auth_cookies() -> weaver.typedefs.CookiesTupleType

      Implementation dependent operation to retrieve applicable authorization cookies.

      This method is employed for every :meth:`make_request` call to avoid manually providing them each time.
      Any overriding method should consider calling this method to retrieve authorization cookies from the
      WPS request.

   .. py:method:: make_request(method: str, url: str, retry: Union[bool, int] = False, cookies: Optional[weaver.typedefs.AnyCookiesContainer] = None, headers: Optional[weaver.typedefs.AnyHeadersContainer] = None, **kwargs) -> weaver.typedefs.AnyResponseType

      Sends the request with additional parameter handling for the current process definition.

   .. py:method:: host_reference(reference: str) -> str

      Hosts an intermediate reference between :term:`Workflow` steps for processes that require remote access.

      :param reference: Intermediate file or directory location (local path expected).
      :return: Hosted temporary HTTP file or directory location.

   .. py:method:: stage_results(results: weaver.typedefs.JobResults, expected_outputs: weaver.typedefs.CWL_ExpectedOutputs, out_dir: str) -> None

      Retrieves the remote execution :term:`Job` results for staging locally into the specified output directory.

      This operation should be called by the implementing remote :term:`Process` definition after :meth:`execute`.

      .. note::
         The :term:`CWL` runner expects the output file(s) to be written matching the definition in
         ``expected_outputs``, but this definition could be a glob pattern matching multiple files and/or nested
         directories. We cannot rely on specific file names to be mapped, since a glob can match many
         (e.g. ``"*.txt"``).

      .. seealso::
         Function :func:`weaver.processes.convert._convert_any2cwl_io_complex` defines a generic glob pattern from
         the expected file extension based on the Content-Type format. Since the remote :term:`WPS` :term:`Process`
         does not necessarily produce file names matching the output ID (they could be anything), staging must patch
         the locations to let the :term:`CWL` runtime resolve the files according to the glob definitions.

   .. py:method:: stage_inputs(workflow_inputs: weaver.typedefs.CWL_WorkflowInputs) -> weaver.typedefs.JobInputs

      Retrieves inputs for local staging if required for the following :term:`Job` execution.


.. py:class:: OGCAPIRemoteProcessBase(step_payload: weaver.typedefs.JSON, process: str, request: Optional[weaver.wps.service.WorkerRequest], update_status: weaver.typedefs.UpdateStatusPartialFunction)

   Common interface for :term:`WPS` :term:`Process` to be used for dispatching :term:`CWL` jobs.

   Multiple convenience methods are provided. Processes inheriting from this base should implement the required
   abstract methods and override operations as needed.

   .. note::
      For the expected operation details and their execution order, please refer to :ref:`proc_workflow_ops`.

   .. seealso::
      :meth:`execute` for complete details of the operations and ordering.

   .. py:attribute:: process_type
      :type: str
      :value: NotImplemented

   .. py:attribute:: provider
      :type: str
      :value: NotImplemented

   .. py:attribute:: url
      :type: str
      :value: NotImplemented

   .. py:attribute:: deploy_body

   .. py:attribute:: process

   .. py:method:: format_outputs(job_outputs: weaver.typedefs.JobOutputs) -> Optional[weaver.typedefs.JobOutputs]

      Implementation dependent operations to configure expected outputs for :term:`Job` execution.

      This is an optional step that simply passes the outputs through as-is if no formatting is required.
      Otherwise, the implementing :term:`Process` can override this step to reorganize the workflow step outputs into
      the format required by its :meth:`dispatch` call.

   .. py:method:: dispatch(process_inputs: weaver.typedefs.JobInputs, process_outputs: Optional[weaver.typedefs.JobOutputs]) -> str

      Implementation dependent operations to dispatch the :term:`Job` execution to the remote :term:`Process`.

      :returns: Reference details that will be passed to :meth:`monitor`.

   .. py:method:: monitor(monitor_reference: str) -> bool

      Implementation dependent operations to monitor the status of the :term:`Job` execution that was dispatched.

      This step should block :meth:`execute` until the final status of the remote :term:`Job` (failed/success)
      can be obtained.

      :returns: Success status.

   .. py:method:: get_job_status(job_status_uri: weaver.typedefs.JobMonitorReference, retry: Union[bool, int] = True) -> weaver.typedefs.JSON

      Obtains the contents from the :term:`Job` status response.

   .. py:method:: get_results(monitor_reference: str) -> weaver.typedefs.JobResults

      Obtains the produced output results from a successful :term:`Job` status ID.
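
The operation ordering described for :meth:`WpsProcessInterface.execute` and the checkpoints of
:class:`RemoteJobProgress` can be illustrated with a minimal, self-contained sketch. It does not import Weaver:
``SketchRemoteProcess`` and ``EchoProcess`` are hypothetical stand-ins that only reuse the documented method names
and progress values, ``update_status`` is assumed here to be a plain ``(message, progress)`` callback, and the staging
steps (``stage_inputs``, ``stage_results``) are omitted for brevity. The actual sequence implemented by Weaver may
differ; :ref:`proc_workflow_ops` remains the reference for it.

.. code-block:: python

   import abc
   import enum
   from typing import Any, Callable, Dict, List, Tuple


   class RemoteJobProgress(enum.IntEnum):
       # Same checkpoint values as the documented RemoteJobProgress attributes.
       SETUP = 1
       PREPARE = 2
       READY = 5
       STAGE_IN = 10
       FORMAT_IO = 12
       EXECUTION = 15
       MONITORING = 20
       RESULTS = 85
       STAGE_OUT = 90
       CLEANUP = 95
       COMPLETED = 100


   class SketchRemoteProcess(abc.ABC):
       """Hypothetical stand-in for WpsProcessInterface (not Weaver's implementation)."""

       def __init__(self, update_status: Callable[[str, int], None]) -> None:
           self.update_status = update_status

       # Optional hooks: no-op or pass-through by default.
       def prepare(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
           pass

       def format_inputs(self, job_inputs: Dict[str, Any]) -> Dict[str, Any]:
           return job_inputs

       def format_outputs(self, job_outputs: Dict[str, Any]) -> Dict[str, Any]:
           return job_outputs

       def cleanup(self) -> None:
           pass

       # Required hooks, mirroring the documented abstract methods.
       @abc.abstractmethod
       def dispatch(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> str: ...

       @abc.abstractmethod
       def monitor(self, reference: str) -> bool: ...

       @abc.abstractmethod
       def get_results(self, reference: str) -> Dict[str, Any]: ...

       def execute(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> Dict[str, Any]:
           try:
               self.update_status("preparing", RemoteJobProgress.PREPARE)
               self.prepare(inputs, outputs)
               self.update_status("formatting I/O", RemoteJobProgress.FORMAT_IO)
               process_inputs = self.format_inputs(inputs)
               process_outputs = self.format_outputs(outputs)
               self.update_status("dispatching", RemoteJobProgress.EXECUTION)
               reference = self.dispatch(process_inputs, process_outputs)
               self.update_status("monitoring", RemoteJobProgress.MONITORING)
               if not self.monitor(reference):  # blocks until a final status is known
                   raise RuntimeError(f"remote job {reference} failed")
               self.update_status("retrieving results", RemoteJobProgress.RESULTS)
               results = self.get_results(reference)  # locations only, no staging
           finally:
               # Cleanup runs whether the remote job succeeded or not.
               self.update_status("cleaning up", RemoteJobProgress.CLEANUP)
               self.cleanup()
           self.update_status("completed", RemoteJobProgress.COMPLETED)
           return results


   class EchoProcess(SketchRemoteProcess):
       """Hypothetical 'remote' process that simply echoes its inputs back."""

       def dispatch(self, inputs, outputs):
           self.jobs = {"job-1": dict(inputs)}
           return "job-1"

       def monitor(self, reference):
           return reference in self.jobs

       def get_results(self, reference):
           return {"output": self.jobs[reference]}


   statuses: List[Tuple[str, int]] = []
   process = EchoProcess(lambda message, progress: statuses.append((message, int(progress))))
   results = process.execute({"x": 1}, {"output": {}})

Raising when ``monitor`` reports a failure while still calling ``cleanup`` in a ``finally`` clause is only one
possible design; an implementation could instead report the failure through ``update_status`` and return early.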