weaver.processes.wps_workflow
=============================

.. py:module:: weaver.processes.wps_workflow


Module Contents
---------------

.. py:data:: MonitorFunction

.. py:data:: LOGGER

.. py:data:: DEFAULT_TMP_PREFIX
   :value: 'tmp'


.. py:function:: default_make_tool(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext, package_process: weaver.processes.wps_package.WpsPackage) -> cwltool.process.Process

   Generate the tool class object from the :term:`CWL` definition to handle its execution.

   .. warning::
      Package :mod:`cwltool` introduces explicit typing definitions with :mod:`mypy_extensions`.
      This can cause ``TypeError("interpreted classes cannot inherit from compiled")`` when using
      :class:`cwltool.process.Process` as base class for our custom definitions below.
      To avoid the error, we must enforce the type using :func:`cast`.


.. py:class:: WpsWorkflow(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext, package_process: weaver.processes.wps_package.WpsPackage)

   Definition of a :term:`CWL` ``workflow`` that can execute ``WPS`` application packages as intermediate job steps.

   Steps are expected to be defined as individual :class:`weaver.processes.wps_package.WpsPackage` references.

   Initialize this CommandLineTool.

   .. py:attribute:: prov_obj

   .. py:attribute:: package_process

   .. py:attribute:: get_job_process_definition

   .. py:attribute:: requirements

   .. py:attribute:: hints

   .. py:method:: job(job_order: cwltool.utils.CWLObjectType, output_callbacks: Callable[[Any, Any], Any], runtime_context: cwltool.context.RuntimeContext) -> cwltool.utils.JobsGeneratorType

      Workflow job generator.

      :param job_order: inputs of the job submission
      :param output_callbacks: method to fetch step outputs and corresponding step details
      :param runtime_context: configs about execution environment
      :return:


   .. py:method:: collect_output(schema: cwltool.utils.CWLObjectType, builder: cwltool.builder.Builder, outdir: str, fs_access: cwltool.stdfsaccess.StdFsAccess, compute_checksum: bool = True) -> Optional[cwltool.utils.CWLOutputType]

      Collect outputs from the step :term:`Process` following its execution.

      .. note::
         When the :term:`CWL` runner tries to forward ``step(i) outputs -> step(i+1) inputs``
         using :meth:`collect_output`, it expects exact ``outputBindings`` locations to be matched.
         In other words, a definition like ``outputBindings: {glob: outputs/*.txt}`` will generate results
         located in ``step(i)`` as ``"/outputs/file.txt"``, and ``step(i+1)`` will look explicitly
         in ``"/outputs"`` using the ``glob`` pattern.

         Because each :term:`Process` in the workflow is a distinct, remote entity, each one stages its
         outputs at a different URL location, and they do not share the same *root directory*.
         When we stage intermediate results locally, the sub-dirs are lost.
         Therefore, they act like individual :term:`CWL` runner calls where the *final results* are moved
         back to the local directory for convenient access, but our *local directory* is the URL
         WPS-outputs location. To let the :term:`CWL` :term:`Workflow` inter-step mapping work as intended,
         we must remap the locations, ignoring any nested dirs, so that the modified *outputBindings*
         definition can match as if each step :term:`Process` had generated its outputs locally.

      .. note::
         Because the staging operation following remote :term:`Process` execution nests each output under
         a directory name matching the respective output ID, globs must be updated with that modified
         nested directory as well.

      .. seealso::
         :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results`


.. py:class:: WpsWorkflowJob(builder: cwltool.builder.Builder, job_order: cwltool.utils.CWLObjectType, make_path_mapper: Callable[Ellipsis, cwltool.pathmapper.PathMapper], requirements: weaver.typedefs.CWL_RequirementsList, hints: weaver.typedefs.CWL_RequirementsList, name: str, wps_process: weaver.processes.wps_process_base.WpsProcessInterface, expected_outputs: List[weaver.typedefs.CWL_Output_Type])

   Base class for get_requirement().

   Initialize the job object.

   .. py:attribute:: wps_process
      :type: weaver.processes.wps_process_base.WpsProcessInterface

   .. py:attribute:: expected_outputs
      :type: weaver.typedefs.CWL_ExpectedOutputs

   .. py:method:: _resolve_expected_output_files(expected_outputs: List[weaver.typedefs.CWL_Output_Type]) -> None

      Find the ``{id: glob-pattern}`` mapping for any eventual output files to collect from the :term:`Workflow` step.

      :param expected_outputs: All potential output definitions the step defines.
      :return: Resolved output locations to look for result files, considering the nested `Output-ID` path.


   .. py:method:: _retrieve_secret_inputs(runtime_context: cwltool.context.RuntimeContext) -> weaver.typedefs.CWL_RuntimeInputsMap

      Retrieve the job inputs with reverse resolution of any secrets defined in the store.

      Because the remote :term:`Process` (of any type) will not have the local runtime store definition,
      it cannot resolve the original values by itself.

      .. warning::
         Any steps following this call must be mindful of what they log to avoid leaking secrets.


   .. py:method:: collect_literal_outputs(results: weaver.typedefs.JobResults) -> weaver.typedefs.CWL_Results

      Collect the literal values provided as output.

      Typical :term:`CWL` applications (i.e., the ones not implemented by a derived :class:`WpsProcessInterface`)
      will only return outputs from ``File``/``Directory``, or a *literal* inferred by some JavaScript expression
      or loaded by contents from another source. Because of this, *literal* values never need to be collected
      in these cases.

      Since a :term:`Process` derived from :class:`WpsProcessInterface` can directly return literal data,
      its values must be explicitly chained for following :term:`Workflow` steps.

      :param results: The original :term:`Job` results obtained from the remote :term:`Process`.
      :return: Collected literal outputs.


   .. py:method:: _execute(runtime: List[str], env: MutableMapping[str, str], runtime_context: cwltool.context.RuntimeContext, monitor_function: MonitorFunction = None) -> None

      Execute the :term:`WPS` :term:`Process` defined as a :term:`Workflow` step and chain its intermediate results.
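
The glob remapping described for ``collect_output`` and ``_resolve_expected_output_files`` can be pictured with a minimal sketch. It is not Weaver's implementation: the helper name ``remap_output_globs`` and the simplified output dictionaries are hypothetical, and the sketch assumes that each ``glob`` is a plain string and that output IDs are short names rather than full :term:`CWL` URIs.

```python
import posixpath
from typing import Any, Dict, List


def remap_output_globs(expected_outputs: List[Dict[str, Any]]) -> Dict[str, str]:
    """Hypothetical helper: map each output ID to a glob nested under that ID."""
    remapped: Dict[str, str] = {}
    for output in expected_outputs:
        glob = output.get("outputBinding", {}).get("glob")
        if not isinstance(glob, str):
            # Outputs without a string glob (e.g. literals) are skipped in this sketch.
            continue
        output_id = output["id"]
        # Drop the nested directories of the original pattern, since they are lost
        # when results are staged, then nest the pattern under the output ID.
        remapped[output_id] = posixpath.join(output_id, posixpath.basename(glob))
    return remapped


# Illustrative step outputs (assumed shapes, not taken from a real package).
outputs = [
    {"id": "report", "type": "File", "outputBinding": {"glob": "outputs/*.txt"}},
    {"id": "count", "type": "int"},
]
print(remap_output_globs(outputs))
```

With these assumed inputs, the ``report`` pattern ``outputs/*.txt`` becomes ``report/*.txt``, which mirrors the idea of matching files staged under a directory named after the output ID. The literal ``count`` output has no glob and is left for a separate step such as ``collect_literal_outputs``.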