weaver.processes.wps_workflow

Module Contents

weaver.processes.wps_workflow.MonitorFunction
weaver.processes.wps_workflow.LOGGER
weaver.processes.wps_workflow.DEFAULT_TMP_PREFIX = 'tmp'
weaver.processes.wps_workflow.default_make_tool(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext, package_process: weaver.processes.wps_package.WpsPackage) -> cwltool.process.Process

Generate the tool class object from the CWL definition to handle its execution.

Warning

The cwltool package introduces explicit typing definitions with mypy_extensions. This can cause TypeError("interpreted classes cannot inherit from compiled") when cwltool.process.Process is used as the base class for our custom definitions below. To avoid the error, we must enforce the type using cast().
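As a reminder of what cast() does, the following minimal sketch uses stand-in classes (CompiledBase and CustomTool are hypothetical and are not part of cwltool or weaver). It only illustrates that typing.cast() returns its argument unchanged at runtime and affects only the type seen by static checkers; it does not reproduce the actual code of this module.

```python
from typing import cast


class CompiledBase:
    """Hypothetical stand-in for a base class exposed by a compiled package."""

    def run(self) -> str:
        return "base"


class CustomTool(CompiledBase):
    """Hypothetical stand-in for a custom definition derived from that base."""

    def run(self) -> str:
        return "custom"


def make_tool() -> CompiledBase:
    tool = CustomTool()
    # cast() performs no conversion or check at runtime: it returns the same
    # object and only tells the static type checker which type to assume.
    return cast(CompiledBase, tool)


tool = make_tool()
```

The returned object is still the CustomTool instance, so its overridden behavior is preserved.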

class weaver.processes.wps_workflow.WpsWorkflow(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext, package_process: weaver.processes.wps_package.WpsPackage)

Definition of a CWL workflow that can execute WPS application packages as intermediate job steps.

Steps are expected to be defined as individual weaver.processes.wps_package.WpsPackage references.

Initialize this CommandLineTool.

prov_obj
package_process
get_job_process_definition
requirements
hints
job(job_order: cwltool.utils.CWLObjectType, output_callbacks: Callable[[Any, Any], Any], runtime_context: cwltool.context.RuntimeContext) -> cwltool.utils.JobsGeneratorType

Workflow job generator.

Parameters:
  • job_order – inputs of the job submission

  • output_callbacks – method to fetch step outputs and corresponding step details

  • runtime_context – configs about execution environment

Returns:

collect_output(schema: cwltool.utils.CWLObjectType, builder: cwltool.builder.Builder, outdir: str, fs_access: cwltool.stdfsaccess.StdFsAccess, compute_checksum: bool = True) -> cwltool.utils.CWLOutputType | None

Collect outputs from the step Process following its execution.

Note

When the CWL runner tries to forward step(i) outputs -> step(i+1) inputs using collect_outputs(), it expects the exact outputBinding locations to be matched. In other words, a definition like outputBinding: {glob: outputs/*.txt} will generate results located in step(i) as "<tmp-workdir>/outputs/file.txt", and step(i+1) will look explicitly in "<tmp-workdir>/outputs" using the glob pattern. Because each Process in the workflow is a distinct/remote entity, each one stages its outputs at a different URL location, without sharing the same root directory. When we stage intermediate results locally, the sub-directories are lost. The steps therefore act like individual CWL runner calls where the final results are moved back to the local directory for convenient access, except that our local directory is the WPS-outputs URL location. To let the CWL Workflow inter-step mapping work as intended, we must remap the locations, ignoring any nested directories, so that the modified outputBinding definition can match as if each step Process had generated its outputs locally.

Note

Because the staging operation that follows remote Process execution nests each output under a directory named after its output ID, globs must also be updated to include that nested directory.
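The effect of the two notes above can be illustrated with a small, self-contained sketch. The helper remap_glob, the output ID "result" and the staging layout <outdir>/<output-id>/<file> are assumptions made for illustration only; the actual remapping performed by collect_output may differ.

```python
import glob
import os
import posixpath
import tempfile


def remap_glob(output_id: str, pattern: str) -> str:
    """Hypothetical helper: drop nested dirs and nest the pattern under the output ID."""
    return posixpath.join(output_id, posixpath.basename(pattern))


with tempfile.TemporaryDirectory() as outdir:
    # Assumed staging layout after the remote execution: <outdir>/<output-id>/<file>
    staged_dir = os.path.join(outdir, "result")
    os.makedirs(staged_dir)
    with open(os.path.join(staged_dir, "file.txt"), "w") as staged_file:
        staged_file.write("data")

    original = "outputs/*.txt"                 # glob as written in the CWL definition
    remapped = remap_glob("result", original)  # glob adjusted to the staged layout

    # The original glob finds nothing because the "outputs" sub-directory was lost.
    original_matches = glob.glob(os.path.join(outdir, original))
    # The remapped glob finds the staged file under the output ID directory.
    remapped_matches = glob.glob(os.path.join(outdir, remapped))
    matched_names = [os.path.basename(path) for path in remapped_matches]
```

In this sketch the original pattern matches no file, while the remapped pattern matches the staged file.txt.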

class weaver.processes.wps_workflow.WpsWorkflowJob(builder: cwltool.builder.Builder, job_order: cwltool.utils.CWLObjectType, make_path_mapper: Callable[..., cwltool.pathmapper.PathMapper], requirements: weaver.typedefs.CWL_RequirementsList, hints: weaver.typedefs.CWL_RequirementsList, name: str, wps_process: weaver.processes.wps_process_base.WpsProcessInterface, expected_outputs: List[weaver.typedefs.CWL_Output_Type])

Base class for get_requirement().

Initialize the job object.

wps_process: weaver.processes.wps_process_base.WpsProcessInterface
expected_outputs: weaver.typedefs.CWL_ExpectedOutputs
_resolve_expected_output_files(expected_outputs: List[weaver.typedefs.CWL_Output_Type]) -> None

Find the {id: glob-pattern} mapping for any output files to collect from the Workflow step.

Parameters:

expected_outputs – All potential output definitions the step defines.

Returns:

Resolved output locations to look for result files, considering the nested Output-ID path.
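A minimal sketch of such a resolution is shown below. It assumes simplified output definitions given as plain dictionaries with an "id" and an optional CWL outputBinding glob; the function name resolve_expected_globs and this input shape are illustrative assumptions, not the actual implementation.

```python
import posixpath
from typing import Any, Dict, List


def resolve_expected_globs(expected_outputs: List[Dict[str, Any]]) -> Dict[str, str]:
    """Hypothetical helper: build {id: glob-pattern} for outputs that define a glob."""
    resolved: Dict[str, str] = {}
    for output in expected_outputs:
        output_id = output["id"]
        pattern = output.get("outputBinding", {}).get("glob")
        if not isinstance(pattern, str):
            # No glob defined (e.g. a literal output): no file to look for.
            continue
        # Keep only the file pattern and nest it under the output ID directory.
        resolved[output_id] = posixpath.join(output_id, posixpath.basename(pattern))
    return resolved


outputs = [
    {"id": "report", "type": "File", "outputBinding": {"glob": "outputs/*.txt"}},
    {"id": "image", "type": "File", "outputBinding": {"glob": "*.png"}},
    {"id": "count", "type": "int"},
]
globs = resolve_expected_globs(outputs)
```

Outputs without a glob, such as "count" here, are left out of the mapping because no file is expected for them.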

_retrieve_secret_inputs(runtime_context: cwltool.context.RuntimeContext) -> weaver.typedefs.CWL_RuntimeInputsMap

Retrieve the job inputs with reverse resolution of any secrets defined in the store.

Because the remote Process (of any type) will not have the local runtime store definition, it cannot resolve the original values by itself.

Warning

Any following steps after this call must be mindful of what they log to avoid leaking secrets.
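The idea of reverse resolution can be sketched with a tiny stand-in store. TinySecretStore below is hypothetical and written only for this example; it is not the store used at runtime, and the placeholder format is an assumption.

```python
import uuid
from typing import Any, Dict


class TinySecretStore:
    """Hypothetical stand-in for a runtime secret store."""

    def __init__(self) -> None:
        self._secrets: Dict[str, str] = {}

    def add(self, value: str) -> str:
        """Store a secret and return the placeholder that replaces it."""
        token = f"(secret-{uuid.uuid4()})"
        self._secrets[token] = value
        return token

    def retrieve(self, value: Any) -> Any:
        """Recursively replace known placeholders by their original values."""
        if isinstance(value, str):
            return self._secrets.get(value, value)
        if isinstance(value, dict):
            return {key: self.retrieve(item) for key, item in value.items()}
        if isinstance(value, list):
            return [self.retrieve(item) for item in value]
        return value


store = TinySecretStore()
# Job inputs as seen locally: the secret is only present as a placeholder.
job_inputs = {"token": store.add("s3cr3t"), "count": 3, "names": ["a", "b"]}
# Inputs to submit to the remote Process: placeholders resolved to real values.
resolved_inputs = store.retrieve(job_inputs)
# Only the placeholder form should ever be written to logs.
loggable_inputs = job_inputs
```

Keeping the unresolved mapping for logging, and passing the resolved one only to the remote submission, is one way to respect the warning above.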

collect_literal_outputs(results: weaver.typedefs.JobResults) -> weaver.typedefs.CWL_Results

Collects the literal values provided as output.

Typical CWL applications (i.e., the ones not implemented by a derived WpsProcessInterface) will only return outputs from File/Directory, or a literal inferred by some JavaScript expression or loaded from the contents of another source. Because of this, literals never need to be collected in these cases. Since a Process derived from WpsProcessInterface can directly return literal data, its values must be explicitly chained for the following Workflow steps.

Parameters:

results – The original Job results obtained from the remote Process.

Returns:

Collected literal outputs.
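As an illustration, the sketch below assumes a simplified results shape where each output ID maps to either {"value": ...} for a literal or {"href": ...} for a file reference. This shape and the function name collect_literals are assumptions for the example; the actual JobResults structure may differ.

```python
from typing import Any, Dict


def collect_literals(results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: keep only outputs that carry a literal value."""
    return {
        output_id: result["value"]
        for output_id, result in results.items()
        if "value" in result
    }


results = {
    "count": {"value": 42},
    "label": {"value": "ok"},
    "report": {"href": "https://example.com/outputs/report.txt"},
}
literals = collect_literals(results)
```

File references such as "report" are skipped here because they are collected through the glob-based file staging instead.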

_execute(runtime: List[str], env: MutableMapping[str, str], runtime_context: cwltool.context.RuntimeContext, monitor_function: MonitorFunction = None) -> None

Execute the WPS Process defined as a Workflow step and chain its intermediate results.