weaver.processes.wps_process_base

Module Contents

weaver.processes.wps_process_base.LOGGER[source]
class weaver.processes.wps_process_base.RemoteJobProgress[source]

Progress of a remotely monitored job process execution.

Note

Implementations can reuse the same progress values, or use intermediate ones within the range of the relevant sections.

SETUP = 1[source]
PREPARE = 2[source]
READY = 5[source]
STAGE_IN = 10[source]
FORMAT_IO = 12[source]
EXECUTION = 15[source]
MONITORING = 20[source]
RESULTS = 85[source]
STAGE_OUT = 90[source]
CLEANUP = 95[source]
COMPLETED = 100[source]
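As an illustration of the note about intermediate values, the following minimal sketch rescales a remote job's own 0-100 progress into the span between MONITORING and RESULTS. The helper name `scale_remote_progress` is hypothetical and not part of Weaver. Only the two bounds come from the values listed above.

```python
# Section bounds copied from the RemoteJobProgress values listed above.
MONITORING = 20
RESULTS = 85


def scale_remote_progress(remote_percent: float) -> int:
    """Map a remote 0-100 progress into the MONITORING..RESULTS section.

    Hypothetical helper: values outside 0-100 are clamped before scaling.
    """
    clamped = max(0.0, min(100.0, remote_percent))
    return int(MONITORING + (RESULTS - MONITORING) * clamped / 100.0)
```

With this mapping, a remote job that reports being halfway done is shown at 52, which stays inside the monitoring section.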
class weaver.processes.wps_process_base.WpsProcessInterface(request: weaver.wps.service.WorkerRequest, update_status: weaver.typedefs.UpdateStatusPartialFunction)[source]

Common interface for WPS Process to be used for dispatching CWL jobs.

Multiple convenience methods are provided. Processes inheriting from this base should implement required abstract methods and override operations as needed.

Note

For expected operations details and their execution order, please refer to Workflow Step Operations.

See also

execute() for complete details of the operations and ordering.

execute(workflow_inputs: weaver.typedefs.CWL_RuntimeInputsMap, out_dir: str, expected_outputs: weaver.typedefs.CWL_ExpectedOutputs) None[source]

Execute the core operation of the remote Process using the given inputs.

The function is expected to monitor the process and update the status, then retrieve the expected outputs and store them in out_dir.

Parameters:
  • workflow_inputs – CWL job dict

  • out_dir – directory where the outputs must be written

  • expected_outputs – expected value outputs as {"id": "value"}
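The sketch below shows one way the operations documented on this page could be chained by execute(). The ordering is an assumption inferred from the RemoteJobProgress sections and is not copied from Weaver's source. `StepSketch` and `DemoStep` are hypothetical stand-ins, and running cleanup() in a `finally` clause is a choice made for this sketch only.

```python
from abc import ABC, abstractmethod


class StepSketch(ABC):
    """Hypothetical stand-in for the interface; not Weaver's actual code."""

    def __init__(self):
        self.calls = []  # records the order in which operations run

    def execute(self, workflow_inputs, out_dir, expected_outputs):
        # Assumed ordering, following the RemoteJobProgress sections.
        try:
            self.prepare()
            staged = self.stage_inputs(workflow_inputs)
            inputs = self.format_inputs(staged)
            outputs = self.format_outputs(expected_outputs)
            reference = self.dispatch(inputs, outputs)
            if not self.monitor(reference):
                raise RuntimeError("remote job failed")
            results = self.get_results(reference)
            self.stage_results(results, expected_outputs, out_dir)
        finally:
            self.cleanup()

    # Optional steps: defaults pass their argument through unchanged.
    def prepare(self):
        self.calls.append("prepare")

    def stage_inputs(self, workflow_inputs):
        self.calls.append("stage_inputs")
        return workflow_inputs

    def format_inputs(self, workflow_inputs):
        self.calls.append("format_inputs")
        return workflow_inputs

    def format_outputs(self, workflow_outputs):
        self.calls.append("format_outputs")
        return workflow_outputs

    def stage_results(self, results, expected_outputs, out_dir):
        self.calls.append("stage_results")

    def cleanup(self):
        self.calls.append("cleanup")

    # Required steps: every implementation must provide these.
    @abstractmethod
    def dispatch(self, process_inputs, process_outputs): ...

    @abstractmethod
    def monitor(self, monitor_reference): ...

    @abstractmethod
    def get_results(self, monitor_reference): ...


class DemoStep(StepSketch):
    """Toy implementation that succeeds immediately."""

    def dispatch(self, process_inputs, process_outputs):
        self.calls.append("dispatch")
        return "job-123"  # placeholder monitoring reference

    def monitor(self, monitor_reference):
        self.calls.append("monitor")
        return True

    def get_results(self, monitor_reference):
        self.calls.append("get_results")
        return {}
```

Calling `DemoStep().execute({"x": 1}, "/tmp/out", {"out": "*.txt"})` records the nine operations in the order written in `execute`.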

prepare() None[source]

Implementation dependent operations to prepare the Process for Job execution.

This is an optional step that can be omitted entirely if not needed.

format_inputs(workflow_inputs: weaver.typedefs.JobInputs) weaver.typedefs.JobInputs | Any[source]

Implementation dependent operations to configure input values for Job execution.

This is an optional step that will simply pass down the inputs as is if no formatting is required. Otherwise, the implementing Process can override the step to reorganize workflow step inputs into the necessary format required for their dispatch() call.
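As a sketch of the kind of reorganization an override might perform, the function below regroups a list of `{"id": ..., "value": ...}` items into a mapping keyed by input ID. Both the input shape and the target shape are assumptions chosen for illustration. The actual structure of JobInputs and the format a given remote Process expects may differ.

```python
def format_inputs(workflow_inputs):
    """Regroup a list of {"id", "value"} items into an {id: value} mapping.

    Assumes scalar values; repeated IDs are collected into a list.
    """
    formatted = {}
    repeated = set()  # IDs already converted to a list of values
    for item in workflow_inputs:
        key, value = item["id"], item["value"]
        if key not in formatted:
            formatted[key] = value
        elif key in repeated:
            formatted[key].append(value)
        else:
            formatted[key] = [formatted[key], value]
            repeated.add(key)
    return formatted
```

For example, two items with ID `"a"` and one with ID `"b"` become `{"a": [first, second], "b": value}`.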

format_outputs(workflow_outputs: weaver.typedefs.JobOutputs) weaver.typedefs.JobOutputs[source]

Implementation dependent operations to configure expected outputs for Job execution.

This is an optional step that will simply pass down the outputs as is if no formatting is required. Otherwise, the implementing Process can override the step to reorganize workflow step outputs into the necessary format required for their dispatch() call.

abstract dispatch(process_inputs: weaver.typedefs.JobInputs, process_outputs: weaver.typedefs.JobOutputs) weaver.typedefs.JobMonitorReference[source]

Implementation dependent operations to dispatch the Job execution to the remote Process.

Returns:

reference details that will be passed to monitor().

abstract monitor(monitor_reference: weaver.typedefs.JobMonitorReference) bool[source]

Implementation dependent operations to monitor the status of the Job execution that was dispatched.

This step should block execute() until the final status of the remote Job (failed/success) can be obtained.

Returns:

success status
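A blocking monitor() can be sketched as a polling loop. In the example below, `fetch_status` is a hypothetical callable standing in for whatever request obtains the remote status. The status strings and the poll limit are assumptions, not values taken from Weaver.

```python
import time


def monitor_until_done(fetch_status, interval=0.0, max_polls=100):
    """Poll until the remote job reaches a final status.

    Returns True on success and False on failure, blocking in between.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status == "succeeded":
            return True
        if status in ("failed", "dismissed"):
            return False
        time.sleep(interval)  # wait before asking again
    raise TimeoutError("remote job did not reach a final status")
```

A real implementation would likely use a non-zero interval and report progress through the status update callback between polls.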

abstract get_results(monitor_reference: weaver.typedefs.JobMonitorReference) weaver.typedefs.JobResults[source]

Implementation dependent operations to retrieve the results following a successful Job execution.

The operation should NOT fetch (stage) results, but only obtain the locations where they can be retrieved, based on the monitoring reference that was generated from the execution.

Returns:

results locations

cleanup() None[source]

Implementation dependent operations to clean the Process or Job execution.

This is an optional step that doesn’t require any override if not needed by derived classes.

get_auth_headers() weaver.typedefs.AnyHeadersContainer[source]

Implementation dependent operation to retrieve applicable authorization headers.

This method is employed for every make_request() call to avoid manually providing them each time. Any overriding method should consider calling this method to retrieve authorization headers from the WPS request.

get_auth_cookies() weaver.typedefs.CookiesTupleType[source]

Implementation dependent operation to retrieve applicable authorization cookies.

This method is employed for every make_request() call to avoid manually providing them each time. Any overriding method should consider calling this method to retrieve authorization cookies from the WPS request.

make_request(method: str, url: str, retry: bool | int = False, cookies: weaver.typedefs.AnyCookiesContainer | None = None, headers: weaver.typedefs.AnyHeadersContainer | None = None, **kwargs) weaver.typedefs.AnyResponseType[source]

Sends the request with additional parameter handling for the current process definition.
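One plausible form of this parameter handling is to combine the values from get_auth_headers() and get_auth_cookies() with those passed explicitly by the caller. The sketch below assumes that explicit values take precedence over the defaults. That precedence, the plain `dict` containers, and the helper name `merge_request_auth` are all assumptions made for illustration.

```python
def merge_request_auth(default_headers, default_cookies, headers=None, cookies=None):
    """Combine default authorization values with explicit per-call ones.

    Assumption: an explicit header or cookie overrides a default of the same name.
    """
    merged_headers = dict(default_headers)
    merged_headers.update(headers or {})
    merged_cookies = dict(default_cookies)
    merged_cookies.update(cookies or {})
    return merged_headers, merged_cookies
```

Note that this simple merge treats header names as case-sensitive, which real HTTP header containers usually do not.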

host_reference(reference: str) str[source]

Hosts an intermediate reference between Workflow steps for processes that require remote access.

Parameters:

reference – Intermediate file or directory location (local path expected).

Returns:

Hosted temporary HTTP file or directory location.
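The mapping from a local path to a hosted location can be pictured as follows. This is only a sketch: it assumes the reference already lives under a directory that is served over HTTP, and the names `to_hosted_url`, `hosted_dir` and `hosted_url` are hypothetical. The actual method may copy or link the file first.

```python
import posixpath
from pathlib import PurePosixPath


def to_hosted_url(local_path, hosted_dir, hosted_url):
    """Translate a local path under hosted_dir into its HTTP location.

    Raises ValueError if local_path is not located under hosted_dir.
    """
    relative = PurePosixPath(local_path).relative_to(hosted_dir)
    return posixpath.join(hosted_url.rstrip("/"), str(relative))
```

For instance, `/data/out/job/a.txt` with `hosted_dir="/data/out"` and `hosted_url="https://example.com/wpsoutputs"` maps to `https://example.com/wpsoutputs/job/a.txt`.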

stage_results(results: weaver.typedefs.JobResults, expected_outputs: weaver.typedefs.CWL_ExpectedOutputs, out_dir: str) None[source]

Retrieves the remote execution Job results for staging locally into the specified output directory.

This operation should be called by the implementing remote Process definition after execute().

Note

The CWL runner expects the output file(s) to be written matching the definition in expected_outputs, but this definition could be a glob pattern that matches multiple files and/or nested directories. We cannot rely on specific file names to be mapped, since a glob can match many (e.g. "*.txt").

See also

Function weaver.processes.convert._convert_any2cwl_io_complex() defines a generic glob pattern from the expected file extension based on Content-Type format. Since the remote WPS Process does not necessarily produce file names that contain the output ID (they could be anything), staging must patch the locations so that the CWL runtime can resolve the files according to the glob definitions.
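The idea of patching a location so that a glob can resolve it is sketched below. The example assumes a simple pattern of the form `*.ext` and a result that was already fetched to a local path. `stage_file` is a hypothetical helper, not the logic used by stage_results().

```python
import fnmatch
import os
import shutil


def stage_file(source_path, out_dir, output_glob):
    """Copy a fetched result into out_dir under a name the glob will match.

    Assumes output_glob is a simple "*.ext" pattern (no nested directories).
    """
    name = os.path.basename(source_path)
    if not fnmatch.fnmatch(name, output_glob):
        # Keep the base name, but adopt the extension that the glob expects.
        extension = os.path.splitext(output_glob)[1]
        name = os.path.splitext(name)[0] + extension
    os.makedirs(out_dir, exist_ok=True)
    target = os.path.join(out_dir, name)
    shutil.copyfile(source_path, target)
    return target
```

After staging, resolving `output_glob` inside `out_dir` finds the copied file regardless of the name the remote Process gave it.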

stage_inputs(workflow_inputs: weaver.typedefs.CWL_WorkflowInputs) weaver.typedefs.JobInputs[source]

Retrieves inputs for local staging if required for the following Job execution.

class weaver.processes.wps_process_base.OGCAPIRemoteProcessBase(step_payload: weaver.typedefs.JSON, process: str, request: weaver.wps.service.WorkerRequest, update_status: weaver.typedefs.UpdateStatusPartialFunction)[source]

Common interface for WPS Process to be used for dispatching CWL jobs.

Multiple convenience methods are provided. Processes inheriting from this base should implement required abstract methods and override operations as needed.

Note

For expected operations details and their execution order, please refer to Workflow Step Operations.

See also

execute() for complete details of the operations and ordering.

process_type: str[source]
provider: str[source]
url: str[source]
format_outputs(workflow_outputs: weaver.typedefs.JobOutputs) weaver.typedefs.JobOutputs[source]

Implementation dependent operations to configure expected outputs for Job execution.

This is an optional step that will simply pass down the outputs as is if no formatting is required. Otherwise, the implementing Process can override the step to reorganize workflow step outputs into the necessary format required for their dispatch() call.

dispatch(process_inputs: weaver.typedefs.JobInputs, process_outputs: weaver.typedefs.JobOutputs) str[source]

Implementation dependent operations to dispatch the Job execution to the remote Process.

Returns:

reference details that will be passed to monitor().

monitor(monitor_reference: str) bool[source]

Implementation dependent operations to monitor the status of the Job execution that was dispatched.

This step should block execute() until the final status of the remote Job (failed/success) can be obtained.

Returns:

success status

get_job_status(job_status_uri: weaver.typedefs.JobMonitorReference, retry: bool | int = True) weaver.typedefs.JSON[source]

Obtains the contents from the Job status response.

get_results(monitor_reference: str) weaver.typedefs.JobResults[source]

Obtains the output results produced by a successful job, based on its job status ID.