weaver.processes.esgf_process

Module Contents

weaver.processes.esgf_process.ESGFProcessInputs
weaver.processes.esgf_process.LAST_PERCENT_REGEX
class weaver.processes.esgf_process.Percent
PREPARING = 2
SENDING = 3
COMPUTE_DONE = 98
FINISHED = 100
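
The values of Percent suggest that progress reported by the remote service is reported inside a window between SENDING and COMPUTE_DONE, leaving the last few percent for result retrieval and staging. The following is a minimal sketch under that assumption. The helper `scale_progress` and the linear mapping are hypothetical and are not taken from the module.

```python
class Percent:
    # Values copied from the listing above.
    PREPARING = 2
    SENDING = 3
    COMPUTE_DONE = 98
    FINISHED = 100


def scale_progress(remote_percent: float) -> float:
    """Map a remote 0-100 progress value into the SENDING..COMPUTE_DONE window.

    Hypothetical helper: the linear mapping is an assumption for illustration.
    """
    # Clamp first so out-of-range values cannot leave the window.
    clamped = max(0.0, min(100.0, remote_percent))
    span = Percent.COMPUTE_DONE - Percent.SENDING
    return Percent.SENDING + span * clamped / 100.0
```

With this mapping, a remote value of 0 corresponds to SENDING and a remote value of 100 corresponds to COMPUTE_DONE.
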
class weaver.processes.esgf_process.InputNames
FILES = 'files'
VARIABLE = 'variable'
API_KEY = 'api_key'
TIME = 'time'
LAT = 'lat'
LON = 'lon'
class weaver.processes.esgf_process.InputArguments
START = 'start'
END = 'end'
CRS = 'crs'
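
To illustrate how the InputNames and InputArguments constants could fit together, the sketch below collects the dimension inputs (time, lat, lon) from a workflow inputs mapping. It assumes that each dimension is supplied as a mapping holding the start, end and crs arguments. That layout and the helper `collect_dimensions` are assumptions made for illustration, not the module's documented behaviour.

```python
from typing import Any, Dict, Optional

DIMENSIONS = ("time", "lat", "lon")  # InputNames.TIME, LAT, LON
ARGUMENTS = ("start", "end", "crs")  # InputArguments.START, END, CRS


def collect_dimensions(
    workflow_inputs: Dict[str, Any],
) -> Optional[Dict[str, Dict[str, Any]]]:
    """Gather the dimension bounds found in the inputs (hypothetical helper).

    Assumes each dimension input is a mapping with 'start', 'end' and 'crs' keys.
    Returns None when no dimension is provided.
    """
    dimensions: Dict[str, Dict[str, Any]] = {}
    for name in DIMENSIONS:
        value = workflow_inputs.get(name)
        if not isinstance(value, dict):
            continue  # dimension missing or not in the assumed layout
        dimensions[name] = {arg: value.get(arg) for arg in ARGUMENTS}
    return dimensions or None
```

Returning None when nothing is provided mirrors the optional return type of `_get_domain`, which is the only part of this sketch taken from the listing.
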
class weaver.processes.esgf_process.ESGFProcess(provider: str, process: str, request: weaver.wps.service.WorkerRequest, update_status: weaver.typedefs.UpdateStatusPartialFunction)

Common interface for WPS Process to be used for dispatching CWL jobs.

Multiple convenience methods are provided. Processes inheriting from this base should implement required abstract methods and override operations as needed.

Note

For details on the expected operations and their execution order, please refer to Workflow Step Operations.

See also

execute() for complete details of the operations and ordering.

required_inputs
provider
process
wps_provider: cwt.WPSClient | None = None
wps_process: cwt.Process | None = None
static _get_domain(workflow_inputs: weaver.typedefs.CWL_RuntimeInputsMap) -> cwt.Domain | None
_check_required_inputs(workflow_inputs)
static _get_files_urls(workflow_inputs: weaver.typedefs.JSON) -> List[Tuple[str, str]]

Get all NetCDF files from the CWL inputs.

static _get_variable(workflow_inputs: weaver.typedefs.JSON) -> str

Get the variable name from the CWL inputs.

prepare(workflow_inputs: weaver.typedefs.CWL_RuntimeInputsMap, expected_outputs: weaver.typedefs.CWL_ExpectedOutputs) -> None

Prepare the ESGF-CWT WPS client.

format_inputs(job_inputs: weaver.typedefs.JobInputs) -> ESGFProcessInputs

Convert the CWL inputs to the ESGF-CWT format.

dispatch(process_inputs: ESGFProcessInputs, process_outputs: weaver.typedefs.CWL_ExpectedOutputs) -> cwt.Process

Run an ESGF-CWT process.

monitor(esgf_process: cwt.Process, sleep_time: float = 2) -> bool

Wait for an ESGF-CWT process to finish, while reporting its status.
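
The wait-and-report behaviour described above can be sketched as a simple polling loop. This is not the module's implementation: `FakeProcess`, its `processing`, `succeeded` and `next_status` members, the `report` callback, and the `LAST_PERCENT` pattern (a status message ending with an integer percentage) are all hypothetical stand-ins chosen so the example runs without a remote service.

```python
import re
import time
from typing import Callable, List, Optional

# Assumed pattern: the status message ends with an integer, optionally followed by '%'.
LAST_PERCENT = re.compile(r"(\d{1,3})\s*%?\s*$")


class FakeProcess:
    """Hypothetical stand-in for a remote process handle."""

    def __init__(self, messages: List[str], succeeded: bool = True) -> None:
        self._messages = list(messages)
        self.succeeded = succeeded

    @property
    def processing(self) -> bool:
        # Still running while status messages remain to be consumed.
        return len(self._messages) > 0

    def next_status(self) -> str:
        return self._messages.pop(0)


def monitor(
    process: FakeProcess,
    report: Callable[[str, Optional[int]], None],
    sleep_time: float = 2.0,
) -> bool:
    """Poll until the process stops, reporting each status; return its success flag."""
    while process.processing:
        message = process.next_status()
        match = LAST_PERCENT.search(message)
        percent = int(match.group(1)) if match else None
        report(message, percent)
        time.sleep(sleep_time)  # avoid hammering the remote service
    return process.succeeded
```

A caller would pass a callback that forwards each message and percentage to its own status update mechanism, and use the boolean result to decide whether results can be fetched.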

get_results(esgf_process: cwt.Process) -> weaver.typedefs.JobResults

Process the result of the execution.

stage_results(results: weaver.typedefs.JobResults, expected_outputs: weaver.typedefs.CWL_ExpectedOutputs, out_dir: str) -> None

Retrieves the remote execution Job results for staging locally into the specified output directory.

This operation should be called by the implementing remote Process definition after execute().

Note

The CWL runner expects the output file(s) to be written so that they match the definition in expected_outputs, but this definition could be a glob pattern that matches multiple files and/or nested directories. We cannot rely on specific file names being mapped, since a glob can match many (e.g., "*.txt").

See also

Function weaver.processes.convert._convert_any2cwl_io_complex() defines a generic glob pattern from the expected file extension based on the Content-Type format. Since the remote WPS Process does not necessarily produce file names that contain the output ID (the names could be anything), staging must patch the file locations so that the CWL runtime can resolve the files according to the glob definitions.
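
The location patching described above can be illustrated with a small sketch that picks a local file name guaranteed to match the expected glob. The helper `staged_path` is hypothetical and handles only the simple "*.ext" form of pattern. It is not the logic used by stage_results(), and the example URLs are placeholders.

```python
import fnmatch
import os
import posixpath
from urllib.parse import urlparse


def staged_path(result_url: str, glob_pattern: str, out_dir: str) -> str:
    """Return a local path under out_dir whose file name matches glob_pattern.

    Hypothetical helper: only patterns of the form '*.ext' can be patched.
    """
    name = posixpath.basename(urlparse(result_url).path)
    if not name:
        raise ValueError(f"No file name in result URL: {result_url!r}")
    if not fnmatch.fnmatch(name, glob_pattern):
        suffix = glob_pattern[1:]
        simple = glob_pattern.startswith("*.") and not any(c in suffix for c in "*?[")
        if not simple:
            raise ValueError(f"Cannot patch name for pattern: {glob_pattern!r}")
        # Append the expected extension so the CWL glob can resolve the file.
        name = name + suffix
    return os.path.join(out_dir, name)
```

For instance, a remote result named `result_123` staged against the pattern `*.nc` would be written as `result_123.nc`, while a result already named `data.nc` keeps its name.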