weaver.processes.wps_package
============================
.. py:module:: weaver.processes.wps_package
.. autoapi-nested-parse::
Representation of :term:`WPS` process with an internal :term:`CWL` package definition.
Functions and classes that offer interoperability and conversion between corresponding elements
defined as :term:`CWL` `CommandLineTool`/`Workflow` and :term:`WPS` `ProcessDescription` in order to
generate :term:`ADES`/:term:`EMS` deployable :term:`Application Package`.
.. seealso::
- `CWL specification `_
- `WPS-1/2 XML schemas `_
- `WPS-REST schemas `_
- :mod:`weaver.wps_restapi.api` conformance details
Module Contents
---------------
.. py:data:: LOGGER
.. py:data:: PACKAGE_DEFAULT_FILE_NAME
:value: 'package'
.. py:data:: PACKAGE_OUTPUT_HOOK_LOG_UUID
:value: 'PACKAGE_OUTPUT_HOOK_LOG_{}'
.. py:data:: PACKAGE_PROGRESS_PREP_LOG
:value: 1
.. py:data:: PACKAGE_PROGRESS_LAUNCHING
:value: 2
.. py:data:: PACKAGE_PROGRESS_LOADING
:value: 5
.. py:data:: PACKAGE_PROGRESS_GET_INPUT
:value: 6
.. py:data:: PACKAGE_PROGRESS_ADD_EO_IMAGES
:value: 7
.. py:data:: PACKAGE_PROGRESS_CONVERT_INPUT
:value: 8
.. py:data:: PACKAGE_PROGRESS_PREPARATION
:value: 9
.. py:data:: PACKAGE_PROGRESS_CWL_RUN
:value: 10
.. py:data:: PACKAGE_PROGRESS_CWL_DONE
:value: 95
.. py:data:: PACKAGE_PROGRESS_PREP_OUT
:value: 98
.. py:data:: PACKAGE_PROGRESS_DONE
:value: 100
.. py:data:: PACKAGE_SCHEMA_CACHE
:type: Dict[str, Tuple[str, str]]
.. py:function:: get_status_location_log_path(status_location: str, out_dir: Optional[str] = None) -> str
.. py:function:: retrieve_package_job_log(execution: owslib.wps.WPSExecution, job: weaver.datatype.Job, progress_min: weaver.typedefs.Number = 0, progress_max: weaver.typedefs.Number = 100) -> None
Obtains the underlying WPS execution logs from the status file and appends them after existing job log entries.
.. py:function:: get_process_location(process_id_or_url: Union[Dict[str, Any], str], data_source: Optional[str] = None, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> str
Obtains the URL of a WPS REST DescribeProcess given the specified information.
:param process_id_or_url: process "identifier" or literal URL to DescribeProcess WPS-REST location.
:param data_source: identifier of the data source to map to specific ADES, or map to localhost if ``None``.
:param container: Container that provides access to application settings.
:return: URL of EMS or ADES WPS-REST DescribeProcess.
.. py:function:: get_package_workflow_steps(package_dict_or_url: Union[weaver.typedefs.CWL, str]) -> List[weaver.typedefs.CWL_WorkflowStepReference]
Obtain references to intermediate steps of a CWL workflow.
:param package_dict_or_url: process package definition or literal URL to DescribeProcess WPS-REST location.
:return: list of workflow steps as {"name": , "reference": }
where `name` is the generic package step name, and `reference` is the id/url of a registered WPS package.
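The returned structure can be illustrated with a minimal sketch. The helper name ``list_workflow_steps`` is hypothetical, and the sketch assumes that the ``run`` field of each step already holds the process ID or URL as a string. It does not reproduce the actual resolution performed by `Weaver`.

```python
from typing import Any, Dict, List


def list_workflow_steps(package: Dict[str, Any]) -> List[Dict[str, str]]:
    """Hypothetical helper: list the steps of a CWL workflow as name/reference pairs."""
    if package.get("class") != "Workflow":
        # only workflows define intermediate steps
        return []
    steps = package.get("steps") or {}
    if isinstance(steps, list):
        # list form: each step carries its own "id"
        steps = {step["id"]: step for step in steps}
    results = []
    for name, step in steps.items():
        run = step.get("run")
        if isinstance(run, str):
            # assumption: 'run' is directly the ID or URL of the referenced process
            results.append({"name": name, "reference": run})
    return results


workflow = {
    "class": "Workflow",
    "steps": {"subset": {"run": "subset-process"}, "stack": {"run": "stack-process"}},
}
print(list_workflow_steps(workflow))
```

Steps that embed a nested definition in ``run`` instead of a string reference are skipped by this sketch.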
.. py:function:: _fetch_process_info(process_info_url: str, fetch_error: Type[Exception], container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> weaver.typedefs.JSON
Fetches the JSON process information from the specified URL and validates that it contains something.
:raises fetch_error: provided exception with URL message if the process information could not be retrieved.
.. py:function:: _get_process_package(process_url: str, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> Tuple[weaver.typedefs.CWL, str]
Retrieves the WPS process package content from given process ID or literal URL.
:param process_url: process literal URL to DescribeProcess WPS-REST location.
:return: tuple of package body as dictionary and package reference name.
.. py:function:: _get_process_payload(process_url: str, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> weaver.typedefs.JSON
Retrieves the WPS process payload content from given process ID or literal URL.
:param process_url: process literal URL to DescribeProcess WPS-REST location.
:return: payload body as dictionary.
.. py:function:: _get_package_type(package_dict: weaver.typedefs.CWL) -> weaver.typedefs.Literal[weaver.processes.types.ProcessType.APPLICATION, weaver.processes.types.ProcessType.WORKFLOW]
.. py:function:: _get_package_requirements_normalized(requirements: weaver.typedefs.CWL_AnyRequirements, as_dict: bool = False) -> weaver.typedefs.CWL_AnyRequirements
Converts :term:`CWL` package ``requirements`` or ``hints`` into :class:`list` or :class:`dict` representation.
Uniformization of :term:`CWL` ``requirements`` or ``hints`` into the :class:`list` representation (default)
or as :class:`dict` if requested, whether the input definitions were provided using the dictionary definition
as ``{"": {}}`` or the list of dictionary requirements ``[{}]``
each with a ``class`` key.
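The two representations can be illustrated with a short sketch. The helper name ``normalize_requirements`` is hypothetical and only mirrors the behaviour described above. It is not the actual implementation.

```python
from typing import Any, Dict, List, Union

Requirements = Union[Dict[str, Dict[str, Any]], List[Dict[str, Any]]]


def normalize_requirements(requirements: Requirements, as_dict: bool = False) -> Requirements:
    """Hypothetical helper: convert CWL requirements/hints between list and mapping forms."""
    if isinstance(requirements, dict):
        # mapping form: the key is the requirement class, the value holds its parameters
        items = [{"class": name, **(params or {})} for name, params in requirements.items()]
    else:
        # list form: copy each entry so that the caller's data is not modified
        items = [dict(req) for req in requirements or []]
    if not as_dict:
        return items
    # mapping form: the "class" value becomes the key, remaining fields become the value
    return {req["class"]: {k: v for k, v in req.items() if k != "class"} for req in items}


reqs_map = {"DockerRequirement": {"dockerPull": "debian:stable-slim"}}
print(normalize_requirements(reqs_map))
# → [{'class': 'DockerRequirement', 'dockerPull': 'debian:stable-slim'}]
```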
.. py:function:: _patch_cuda_requirement(package: weaver.typedefs.CWL, app_pkg_req: weaver.typedefs.CWL_Requirement, patch_requirement: Union[weaver.processes.constants.CWL_RequirementCUDANameType, weaver.processes.constants.CWL_RequirementDockerGpuType]) -> weaver.typedefs.CWL
Updates legacy :term:`CWL` definitions for combinations of `CUDA` and `Docker` requirements and/or hints.
.. py:function:: _update_package_compatibility(package: weaver.typedefs.CWL) -> weaver.typedefs.CWL
Update a :term:`CWL` package with backward compatibility changes if applicable.
.. py:function:: _load_weaver_extensions_schema() -> weaver.typedefs.CWL_SchemaSalad
.. py:function:: _load_supported_schemas() -> None
Loads :term:`CWL` schemas supported by `Weaver` to avoid validation errors when provided in requirements.
Use a similar strategy as :func:`cwltool.main.setup_schema`, but skipping the :term:`CLI` context and limiting
loaded schema definitions to those that `Weaver` allows. Drops extensions that could cause misbehaving
functionalities when other :term:`Process` types than :term:`CWL`-based :term:`Application Package` are used.
This operation must be called before the :class:`CWLFactory` attempts loading and validating a :term:`CWL` document.
.. py:function:: _load_package_content(package_dict: weaver.typedefs.CWL, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: Optional[str] = None, only_dump_file: weaver.typedefs.Literal[True] = False, tmp_dir: Optional[str] = None, loading_context: Optional[cwltool.context.LoadingContext] = None, runtime_context: Optional[cwltool.context.RuntimeContext] = None, process_offering: Optional[weaver.typedefs.JSON] = None, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> None
_load_package_content(package_dict: weaver.typedefs.CWL, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: Optional[str] = None, only_dump_file: weaver.typedefs.Literal[False] = False, tmp_dir: Optional[str] = None, loading_context: Optional[cwltool.context.LoadingContext] = None, runtime_context: Optional[cwltool.context.RuntimeContext] = None, process_offering: Optional[weaver.typedefs.JSON] = None, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> Tuple[cwltool.factory.Callable, str, weaver.typedefs.CWL_WorkflowStepPackageMap]
Loads :term:`CWL` package definition using various contextual resources.
Following operations are accomplished to validate the package:
- Starts by resolving any intermediate sub-packages steps if the parent package is a :term:`Workflow`
in order to recursively generate and validate their process and package, potentially using remote reference.
Each of the following operations are applied to every step individually.
- Package I/O are reordered using any reference process offering hints if provided to generate consistent results.
- Perform backward compatibility checks and conversions to the package if applicable.
- The resulting package definition is dumped to a temporary JSON file, to validate the content can be serialized.
- Optionally, the :term:`CWL` factory is employed to create the application runner, validating any provided loading
and runtime contexts, and considering all resolved :term:`Workflow` steps if applicable, or the atomic application
otherwise.
:param package_dict: Package content representation as a dictionary.
:param package_name: Name to use to create the package file and :term:`CWL` identifiers.
:param data_source:
Identifier of the :term:`Data Source` to map to specific :term:`ADES`, or map to ``localhost`` if ``None``.
:param only_dump_file: Specify if the :class:`CWLFactoryCallable` should be validated and returned.
:param tmp_dir: Location of the temporary directory to dump files (deleted on exit).
:param loading_context: :mod:`cwltool` context used to create the :term:`CWL` package.
:param runtime_context: :mod:`cwltool` context used to execute the :term:`CWL` package.
:param process_offering: :term:`JSON` body of the process description payload (used as I/O hint ordering).
:param container: Container that provides access to application settings.
:returns:
If :paramref:`only_dump_file` is ``True``, returns ``None``.
Otherwise, :class:`tuple` of:
- Instance of :class:`CWLFactoryCallable`
- Package type (:attr:`ProcessType.WORKFLOW` or :attr:`ProcessType.APPLICATION`)
- Package sub-steps definitions if package represents a :attr:`ProcessType.WORKFLOW`. Otherwise, empty mapping.
Mapping of each step name contains their respective package ID and :term:`CWL` definition that must be run.
.. warning::
Specified :paramref:`tmp_dir` will be deleted on exit.
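The serialization check listed among the operations above can be sketched as follows. The helper name ``dump_package_file`` and the ``.cwl`` file extension are assumptions made for illustration. Cleanup of the temporary directory is left out.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


def dump_package_file(package: Dict[str, Any], package_name: str = "package",
                      tmp_dir: Optional[str] = None) -> str:
    """Hypothetical helper: write the package as JSON to confirm that it can be serialized."""
    tmp_dir = tmp_dir or tempfile.mkdtemp()
    # assumption: the file is named after the package with a '.cwl' extension
    path = os.path.join(tmp_dir, f"{package_name}.cwl")
    with open(path, mode="w", encoding="utf-8") as pkg_file:
        # raises TypeError if the content holds values that JSON cannot represent
        json.dump(package, pkg_file)
    return path


cwl = {"cwlVersion": "v1.0", "class": "CommandLineTool", "baseCommand": "echo",
       "inputs": {}, "outputs": {}}
print(dump_package_file(cwl))
```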
.. py:function:: _merge_package_inputs_outputs(wps_inputs_defs: Union[List[weaver.processes.convert.ANY_IO_Type], Dict[str, weaver.processes.convert.ANY_IO_Type]], cwl_inputs_list: List[weaver.processes.convert.WPS_Input_Type], wps_outputs_defs: Union[List[weaver.processes.convert.ANY_IO_Type], Dict[str, weaver.processes.convert.ANY_IO_Type]], cwl_outputs_list: List[weaver.processes.convert.WPS_Output_Type]) -> Tuple[List[weaver.processes.convert.JSON_IO_Type], List[weaver.processes.convert.JSON_IO_Type]]
Merges corresponding metadata of I/O definitions from :term:`CWL` and :term:`WPS` sources.
Merges I/O definitions to use for process creation and returned by ``GetCapabilities``, ``DescribeProcess``
using the `WPS` specifications (from request ``POST``) and `CWL` specifications (extracted from file).
.. note::
Parameters :paramref:`cwl_inputs_list` and :paramref:`cwl_outputs_list` are expected to be
in :term:`WPS`-like format (i.e.: :term:`CWL` I/O converted to corresponding :term:`WPS` I/O objects).
.. seealso::
Conversion of :term:`CWL` to :term:`WPS`-equivalent objects is handled by :func:`_get_package_inputs_outputs`
and its underlying functions.
:param wps_inputs_defs: list or mapping of provided :term:`WPS` input definitions.
:param cwl_inputs_list: processed list of :term:`CWL` inputs from the :term:`Application Package`.
:param wps_outputs_defs: list or mapping of provided :term:`WPS` output definitions.
:param cwl_outputs_list: processed list of :term:`CWL` outputs from the :term:`Application Package`.
:returns:
Tuple of (inputs, outputs) consisting of lists of I/O with merged contents between :term:`CWL` and :term:`WPS`.
.. py:function:: _get_package_io(package_factory: cwltool.factory.Callable, io_select: weaver.processes.constants.IO_Select_Type, as_json: bool) -> List[weaver.processes.convert.PKG_IO_Type]
Retrieves I/O definitions from a validated :class:`CWLFactoryCallable`.
.. seealso::
Factory can be obtained with validation using :func:`_load_package_content`.
:param package_factory: :term:`CWL` factory that contains I/O references to the package definition.
:param io_select: either :data:`IO_INPUT` or :data:`IO_OUTPUT` according to what needs to be processed.
:param as_json: toggle to the desired output type.
If ``True``, converts the I/O definitions into :term:`JSON` representation.
If ``False``, converts the I/O definitions into :term:`WPS` objects.
:returns: I/O format depending on value :paramref:`as_json`.
.. py:function:: _get_package_inputs_outputs(package_factory: cwltool.factory.Callable, as_json: bool = False) -> Tuple[List[weaver.processes.convert.PKG_IO_Type], List[weaver.processes.convert.PKG_IO_Type]]
Generates :term:`WPS`-like ``(inputs, outputs)`` tuple using parsed CWL package definitions.
.. py:function:: _update_package_metadata(wps_metadata: weaver.typedefs.JSON, cwl_package: weaver.typedefs.CWL) -> None
Updates the package :term:`WPS` metadata dictionary from extractable `CWL` package definition.
.. py:function:: _patch_wps_process_description_url(reference: str, process_hint: Optional[weaver.typedefs.JSON]) -> str
Rebuilds a :term:`WPS` ``ProcessDescription`` URL from other details.
A ``GetCapabilities`` request can be submitted with an ID in query params directly.
Otherwise, check if a process hint can provide the ID.
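The lookup order described above could resemble the following sketch. The helper name ``to_describe_process_url``, the ``identifier`` query parameter and the ``id`` field of the hint are assumptions for illustration only.

```python
from typing import Any, Dict, Optional
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def to_describe_process_url(reference: str, process_hint: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical helper: turn a GetCapabilities URL into a DescribeProcess URL when possible."""
    parts = urlsplit(reference)
    # query parameter names are compared case-insensitively
    query = {key.lower(): value for key, value in parse_qsl(parts.query)}
    if query.get("request", "").lower() != "getcapabilities":
        return reference
    # first look for the ID in the query, then fall back to the process hint
    process_id = query.get("identifier") or (process_hint or {}).get("id")
    if not process_id:
        return reference
    query.update({"request": "DescribeProcess", "identifier": process_id})
    return urlunsplit(parts._replace(query=urlencode(query)))


url = "https://example.com/wps?service=WPS&request=GetCapabilities"
print(to_describe_process_url(url, {"id": "echo"}))
# → https://example.com/wps?service=WPS&request=DescribeProcess&identifier=echo
```

When no ID can be found, the sketch returns the reference unchanged.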
.. py:function:: _generate_process_with_cwl_from_reference(reference: str, process_hint: Optional[weaver.typedefs.JSON] = None) -> Tuple[weaver.typedefs.CWL, weaver.typedefs.JSON]
Resolves the ``reference`` type representing a remote :term:`Process` and generates a `CWL` ``package`` for it.
The reference can point to any definition amongst below known structures:
- :term:`CWL`
- :term:`WPS`-1/2
- :term:`WPS-REST`
- :term:`OGC API - Processes`
Additionally, provides minimal :term:`Process` details retrieved from the ``reference``.
The number of details obtained will depend on available parameters from its description as well
as the amount of metadata that can be mapped between it and the generated :term:`CWL` package.
The resulting :term:`Process` and its :term:`CWL` will correspond to a remote instance to which execution should
be dispatched and monitored, except if the reference was directly a :term:`CWL` file.
.. warning::
Only conversion of the reference into a potential :term:`CWL` definition is accomplished by this function.
Further validations must still be applied to ensure the loaded definition is valid and meets all requirements.
.. seealso::
- :class:`weaver.processes.ogc_api_process.OGCAPIRemoteProcess`
- :class:`weaver.processes.wps1_process.Wps1Process`
- :class:`weaver.processes.wps3_process.Wps3Process`
.. py:function:: get_application_requirement(package: weaver.typedefs.CWL, search: Optional[weaver.typedefs.CWL_RequirementNames] = None, default: Optional[Union[weaver.typedefs.CWL_Requirement, weaver.typedefs.Default]] = null, validate: bool = True, required: bool = True) -> Union[weaver.typedefs.CWL_Requirement, weaver.typedefs.Default]
Retrieves a requirement or hint from the :term:`CWL` package definition.
If no :paramref:`search` filter is specified (default), retrieve the *principal* requirement that allows
mapping to the appropriate :term:`Process` implementation. The *principal* requirement can be extracted
for an :term:`Application Package` of type :data:`ProcessType.APPLICATION` because only one is permitted
simultaneously amongst :data:`CWL_REQUIREMENT_APP_TYPES`. If the :term:`CWL` is not of type
:data:`ProcessType.APPLICATION`, the requirement check is skipped regardless of :paramref:`required`.
If a :paramref:`search` filter is provided, this specific requirement or hint is looked for instead.
Regardless of the applied filter, only a unique item can be matched across ``requirements``/``hints`` mapping
and/or listing representations.
When :paramref:`validate` is enabled, all ``requirements`` and ``hints`` must also be defined
within :data:`CWL_REQUIREMENTS_SUPPORTED` for the :term:`CWL` package to be considered valid.
When :paramref:`convert` is enabled, any backward compatibility definitions will be converted to their
corresponding definition.
:param package: CWL definition to parse.
:param search: Specific requirement/hint name to search and retrieve the definition if available.
:param default: Default value to return if no match was found. If ``None``, returns an empty ``{"class": ""}``.
:param validate: Validate supported requirements/hints definition while extracting requested one.
:param required: Validation will fail if no supported requirements/hints definition could be found.
:returns: dictionary that minimally has ``class`` field, and optionally other parameters from that requirement.
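The filtered lookup with a :paramref:`search` name can be sketched as below. The helper name ``find_requirement`` is hypothetical, and validation against the supported requirements is left out.

```python
from typing import Any, Dict, Optional


def find_requirement(package: Dict[str, Any], search: str,
                     default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: find a unique requirement or hint by its class name."""
    matches = []
    for section in ("requirements", "hints"):
        entries = package.get(section) or []
        if isinstance(entries, dict):
            # mapping form: convert to the list form with an explicit "class" key
            entries = [{"class": name, **(params or {})} for name, params in entries.items()]
        matches.extend(req for req in entries if req.get("class") == search)
    if len(matches) > 1:
        # only a unique item can be matched across requirements and hints
        raise ValueError(f"Requirement '{search}' is defined more than once.")
    if matches:
        return matches[0]
    return default if default is not None else {"class": ""}


package = {
    "requirements": {"DockerRequirement": {"dockerPull": "debian:stable-slim"}},
    "hints": [{"class": "ResourceRequirement", "coresMin": 1}],
}
print(find_requirement(package, "DockerRequirement"))
# → {'class': 'DockerRequirement', 'dockerPull': 'debian:stable-slim'}
```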
.. py:function:: check_package_instance_compatible(package: weaver.typedefs.CWL) -> Optional[str]
Verifies if an :term:`Application Package` definition is valid for the employed `Weaver` instance configuration.
Given that the :term:`CWL` is invalid for the active application, explains the reason why that package `always`
requires remote execution.
When a package can sometimes be executed locally (:term:`ADES`) or remotely (:term:`EMS`) depending on the instance
configuration, such as in the case of a :data:`CWL_REQUIREMENT_APP_DOCKER`, return ``None``. This function instead
detects cases where a remote server is mandatory without ambiguity related to the current `Weaver` instance,
regardless of whether remote should be an :term:`ADES` or a remote :term:`Provider` (:term:`WPS` or :term:`ESGF-CWT`).
:param package: CWL definition for the process.
:returns: reason message if it must be executed remotely or ``None`` if it *could* be executed locally.
.. py:function:: get_auth_requirements(requirement: weaver.typedefs.JSON, headers: Optional[weaver.typedefs.AnyHeadersContainer]) -> Optional[weaver.datatype.Authentication]
Extract any authentication related definitions provided in request headers corresponding to the application type.
:param requirement: :term:`Application Package` requirement as defined by :term:`CWL` requirements.
:param headers: Request headers received during deployment.
:return: Matched authentication details when applicable, otherwise ``None``.
:raises TypeError: When the authentication object cannot be created due to invalid or missing inputs.
:raises ValueError: When the authentication object cannot be created due to incorrectly formed inputs.
.. py:function:: mask_process_inputs(package: weaver.typedefs.CWL, inputs: weaver.typedefs.ExecutionInputs, secret_store: Optional[cwltool.secrets.SecretStore] = None) -> weaver.typedefs.ExecutionInputs
Obtains a masked representation of the input values as applicable.
.. seealso::
:data:`CWL_REQUIREMENT_SECRETS`
.. py:function:: get_process_identifier(process_info: weaver.typedefs.JSON, package: weaver.typedefs.CWL) -> str
Obtain a sane name identifier reference from the :term:`Process` or the :term:`Application Package`.
.. py:function:: get_process_definition(process_offering: weaver.typedefs.JSON, reference: Optional[str] = None, package: Optional[weaver.typedefs.CWL] = None, data_source: Optional[str] = None, headers: Optional[weaver.typedefs.AnyHeadersContainer] = None, builtin: bool = False, container: Optional[weaver.typedefs.AnySettingsContainer] = None) -> weaver.typedefs.JSON
Resolve the process definition considering corresponding metadata from the offering, package and references.
Returns an updated process definition dictionary ready for storage using provided `WPS` ``process_offering``
and a package definition passed by ``reference`` or ``package`` `CWL` content.
The returned process information can be used later on to load an instance of :class:`weaver.wps_package.WpsPackage`.
:param process_offering: `WPS REST-API` (`WPS-3`) process offering as :term:`JSON`.
:param reference: URL to :term:`CWL` package, `WPS-1 DescribeProcess` endpoint or `WPS-3 Process` endpoint.
:param package: Literal :term:`CWL` package definition (`YAML` or `JSON` format).
:param data_source: Where to resolve process IDs (default: localhost if ``None``).
:param headers: Request headers provided during deployment to retrieve details such as authentication tokens.
:param builtin: Indicate if the package is expected to be a :data:`CWL_REQUIREMENT_APP_BUILTIN` definition.
:param container: Container that provides access to application settings.
:return: Updated process definition with resolved/merged information from ``package``/``reference``.
.. py:function:: format_extension_validator(data_input: Union[pywps.inout.inputs.ComplexInput, pywps.inout.outputs.ComplexOutput], mode: int) -> bool
Validator that will only check that the extension matches the selected data format.
.. py:class:: DirectoryNestedStorage(storage: Union[pywps.inout.storage.file.FileStorage, pywps.inout.storage.s3.S3Storage])
Generates a nested storage for a directory where each contained file will be managed by the storage.
Initializes the storage.
:param storage: Storage implementation that is employed for storing files in a directory-like structure.
.. py:property:: type
:type: pywps.inout.storage.STORE_TYPE
.. py:method:: _patch_destination(destination: str) -> str
.. py:method:: _do_store(output: pywps.inout.outputs.ComplexOutput) -> Tuple[pywps.inout.storage.STORE_TYPE, weaver.typedefs.Path, str]
Store all files contained in a directory recursively.
.. note::
This is called from :meth:`CachedStorage.store` only if not already in storage using cached output ID.
.. py:method:: write(data: AnyStr, destination: str, data_format: Optional[pywps.inout.formats.Format] = None) -> str
Write data representing the directory itself or dispatch call to base storage for any other file contents.
When the directory itself is targeted, uploads an empty bucket object for S3 base storage, or makes the
directory structure for base file storage.
.. py:method:: url(destination: str) -> str
:param destination: the name of the output to calculate
the url for
:returns: URL where file_name can be reached
.. py:method:: location(destination: str) -> weaver.typedefs.Path
Provides a location for the specified destination.
This may be any path, path-like object, database connection string, URL, etc.,
and it is not guaranteed to be accessible on the local file system.
:param destination: the name of the output to calculate
the location for
:returns: location where file_name can be found
.. py:class:: WpsPathMapperFactory(cwl_outdir_prefix: weaver.typedefs.Path, final_stagedir: weaver.typedefs.Path)
Path mapper factory that preserves the original output directory structure for staging.
If a :term:`CWL` happens to define multiple ``outputs``, which are collected with ``glob`` patterns pointing to
distinct sub-directories under a common directory, and that their respective nested-files also happen to match
by name (see below example), the default :class:`PathMapper` that collects the outputs combines them (flat list)
under a common output/staging directory with ``.ext_{duplicate}`` to manage conflicts. This is problematic, since
the specific extensions are validated to ensure ``format`` integrity between :term:`CWL` and :term:`WPS` operations.
.. code-block:: yaml
:caption: Example structure that causes matching duplicates.
# input to PathMapper
outputs:
out1:
file1.ext
file2.ext
out2:
file1.ext
file2.ext
# result from *default* PathMapper
stagedir:
file1.ext # from out1
file1.ext_1 # from out2
file2.ext # from out1
file2.ext_1 # from out2
.. seealso::
The :func:`cwltool.executors.JobExecutor.execute` call passes the :class:`PathMapper` type from
the :class:`RuntimeContext`, which ends up being used in :func:`cwltool.process.relocateOutputs`.
This relocation instantiates the :class:`PathMapper` with a common ``stagedir``
(the desired :mod:`pywps` working directory configured by ``"outdir": self.workdir``
in :func:`weaver.wps_package.WpsPackage.setup_runtime`), which leads to the removal of the prefix
sub-directories (i.e.: above ``out1`` and ``out2``) when moving the files under the staging directory.
This class provides fixes of above issues using the following procedure.
1. Use a factory to pass additional arguments that help resolve the correct source/destination of CWL/PyWPS.
2. Let the parent :func:`PathMapper.setup` do its standard resolution to avoid reimplementing it ourselves.
3. Detect files from :term:`CWL` temporary output directories (matching ``cwl_outdir_prefix``).
4. Extract the relative path from within the original :term:`CWL` output directory.
5. Adjust the mapping of the resolved target staging location to inject the relative sub-directory structure.
.. note::
Since :mod:`cwltool` expects to do ``PathMapper(...)`` to instantiate it, using this factory can simulate
the :func:`PathMapper.__init__` by copying its arguments in :func:`WpsPathMapperFactory.__call__`
and then applying the custom logic with our extra arguments. This is necessary to "hook" into :mod:`cwltool`
and its method of creating :class:`PathMapper`.
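Steps 3 to 5 of the procedure above can be sketched with plain path operations. The helper name ``remap_staged_path`` is hypothetical. The sketch assumes POSIX paths and assumes that ``cwl_outdir_prefix`` is a name prefix completed by a random suffix (for example ``/tmp/cwl_out_`` yielding ``/tmp/cwl_out_abc123``). The actual class adjusts the mapping held by the :class:`PathMapper` instead.

```python
import posixpath


def remap_staged_path(source: str, cwl_outdir_prefix: str, final_stagedir: str) -> str:
    """Hypothetical helper: compute a staging target that keeps the sub-directory of the source."""
    if source.startswith(cwl_outdir_prefix):
        # e.g. "abc123/out1/file1.ext": random suffix, then the path inside the output directory
        remainder = source[len(cwl_outdir_prefix):]
        _, _, relative = remainder.partition("/")
        if relative:
            # inject the relative sub-directory structure under the staging directory
            return posixpath.join(final_stagedir, relative)
    # not a CWL temporary output: keep the flat layout
    return posixpath.join(final_stagedir, posixpath.basename(source))


for src in ("/tmp/cwl_out_abc123/out1/file1.ext", "/tmp/cwl_out_abc123/out2/file1.ext"):
    print(remap_staged_path(src, "/tmp/cwl_out_", "/data/stage"))
# → /data/stage/out1/file1.ext
# → /data/stage/out2/file1.ext
```

Both files keep their original name and extension because the ``out1`` and ``out2`` sub-directories are preserved, so no ``.ext_{duplicate}`` suffix is needed.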
.. py:attribute:: cwl_outdir_prefix
.. py:attribute:: final_stagedir
.. py:class:: WpsPackage(*, identifier: str, title: Optional[str] = None, package: weaver.typedefs.CWL = None, payload: Optional[weaver.typedefs.JSON] = None, settings: Optional[weaver.typedefs.AnySettingsContainer] = None, **kw)
:param handler: A callable that gets invoked for each incoming
request. It should accept a single
:class:`pywps.app.WPSRequest` argument and return a
:class:`pywps.app.WPSResponse` object.
:param string identifier: Name of this process.
:param string title: Human-readable title of process.
:param string abstract: Brief narrative description of the process.
:param list keywords: Keywords that characterize a process.
:param inputs: List of inputs accepted by this process. They
should be :class:`~LiteralInput` and :class:`~ComplexInput`
and :class:`~BoundingBoxInput`
objects.
:param outputs: List of outputs returned by this process. They
should be :class:`~LiteralOutput` and :class:`~ComplexOutput`
and :class:`~BoundingBoxOutput`
objects.
:param metadata: List of metadata advertised by this process. They
should be :class:`pywps.app.Common.Metadata` objects.
:param dict[str,dict[str,str]] translations: The first key is the RFC 4646 language code,
and the nested mapping contains translated strings accessible by a string property.
e.g. {"fr-CA": {"title": "Mon titre", "abstract": "Une description"}}
Creates a `WPS-3 Process` instance to execute a `CWL` application package definition.
Process parameters should be loaded from an existing :class:`weaver.datatype.Process`
instance generated using :func:`weaver.wps_package.get_process_definition`.
Provided ``kw`` should correspond to :meth:`weaver.datatype.Process.params_wps`
.. py:attribute:: package_id
:type: str
.. py:attribute:: package_type
:type: Optional[str]
:value: None
.. py:attribute:: package_requirement
:type: Optional[weaver.typedefs.CWL_RequirementsDict]
:value: None
.. py:attribute:: package_log_hook_stderr
:type: Optional[str]
:value: None
.. py:attribute:: package_log_hook_stdout
:type: Optional[str]
:value: None
.. py:attribute:: percent
:type: Optional[weaver.typedefs.Number]
:value: None
.. py:attribute:: status
:type: Optional[weaver.status.AnyStatusType]
:value: None
.. py:attribute:: remote_execution
:type: Optional[bool]
:value: None
.. py:attribute:: _log_file
:type: Optional[str]
:value: None
.. py:attribute:: _log_level
:type: Optional[int]
:value: None
.. py:attribute:: _logger
:type: Optional[logging.Logger]
:value: None
.. py:attribute:: step_packages
:type: weaver.typedefs.CWL_WorkflowStepPackageMap
.. py:attribute:: step_launched
:type: List[str]
:value: []
.. py:attribute:: request
:type: Optional[weaver.wps.service.WorkerRequest]
:value: None
.. py:attribute:: response
:type: Optional[pywps.response.execute.ExecuteResponse]
:value: None
.. py:attribute:: uuid
:type: Optional[uuid.UUID]
:value: None
.. py:attribute:: _job
:type: Optional[weaver.datatype.Job]
:value: None
.. py:attribute:: _job_status_file
:type: Optional[str]
:value: None
.. py:attribute:: payload
:value: None
.. py:attribute:: package
:value: None
.. py:attribute:: settings
.. py:property:: status_filename
:type: str
Obtain the XML status location of this process when executed.
The status location applies the ``WPS-Output-Context`` if defined such that any following output
or log file references that derive from it will be automatically stored in the same nested context.
.. py:method:: setup_loggers(log_stdout_stderr: bool = True) -> None
Configures useful loggers to catch most of the common output and/or error messages during package execution.
.. seealso::
:meth:`insert_package_log`
:func:`retrieve_package_job_log`
.. py:method:: insert_package_log(result: Union[weaver.typedefs.CWL_Results, cwltool.factory.WorkflowStatus]) -> List[str]
Retrieves additional `CWL` sub-process log captures to obtain internal application output and/or errors.
After execution of this method, the `WPS` output log (which can be obtained by :func:`retrieve_package_job_log`)
will have additional ``stderr/stdout`` entries extracted from the underlying application package tool execution.
The outputs and errors are inserted *as best as possible* in the logical order so that the merged
logs read in a natural and chronological order. In the event that both output and errors are available, they
are appended one after another, since merging in an orderly fashion cannot be guaranteed by the external `CWL` runner.
.. note::
In case of any exception, log reporting is aborted and ignored.
.. todo::
Improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)
.. seealso::
:meth:`setup_loggers`
:func:`retrieve_package_job_log`
:param result: output results returned by successful `CWL` package instance execution or raised CWL exception.
:returns: captured execution log lines retrieved from files
.. py:method:: setup_docker_image() -> Optional[bool]
Pre-pull the :term:`Docker` image locally for running the process if authentication is required to get it.
:returns: success status if operation was successful, or ``None`` when it does not apply.
.. py:method:: setup_runtime() -> Dict[str, weaver.typedefs.AnyValueType]
Prepares the runtime parameters for the :term:`CWL` package execution.
Parameter ``weaver.wps_workdir`` is the base-dir where sub-dir per application packages will be generated.
Parameter :attr:`workdir` is the actual location PyWPS reserved for this process (already with sub-dir).
If no ``weaver.wps_workdir`` was provided, reuse the PyWPS parent workdir since we have access to it.
Other steps handling outputs need to consider that ``CWL<->WPS`` out dirs could match because of this.
:return: resolved runtime parameters
.. py:method:: setup_provenance(loading_context: cwltool.context.LoadingContext, runtime_context: cwltool.context.RuntimeContext) -> None
Configure ``PROV`` runtime options.
.. seealso::
- https://www.w3.org/TR/prov-overview/
- https://cwltool.readthedocs.io/en/latest/CWLProv.html
- https://docs.ogc.org/DRAFTS/24-051.html#_requirements_class_provenance
.. py:method:: finalize_provenance(runtime_context: cwltool.context.RuntimeContext) -> None
.. py:method:: update_requirements() -> None
In-place modification of :attr:`package` to adjust invalid items that would break behaviour we must enforce.
.. py:method:: update_cwl_schema_names() -> None
Detect duplicate :term:`CWL` schema types not referred by name to provide one and avoid resolution failure.
Doing this resolution avoids reused definitions being considered as "conflicts" because of missing ``name``.
To avoid introducing a real conflict, names are injected only under corresponding :term:`CWL` I/O by ID.
The most common type of definition resolved this way is when :term:`CWL` ``Enum`` is reused for single and
array-based definitions simultaneously without using an explicit ``SchemaDefRequirement`` for them.
.. seealso::
- :func:`weaver.processes.convert.resolve_cwl_io_type_schema`
- :meth:`weaver.processes.wps_package.WpsPackage.make_inputs`
.. fixme:
.. todo::
Workaround for https://github.com/common-workflow-language/cwltool/issues/1908.
.. py:method:: update_effective_user(runtime_context: cwltool.context.RuntimeContext) -> None
Update effective user/group for the :term:`Application Package` to be executed.
Reducing permissions is safer inside docker application since weaver/cwltool could be running as root
but this requires that mounted volumes have the required permissions so ``euid:egid`` can use them.
Overrides :mod:`cwltool`'s function to retrieve user/group id for ones we enforce.
.. warning::
This is an experimental feature.
Update of user/group permissions to access inputs/outputs could be required.
.. note::
When invoking a docker application, appropriate user-namespace mapping (if any) should be consistent with
the docker daemon configuration. In other words, if the docker daemon is configured to run in rootless mode,
mapping of UID/GID might already be applied by the security feature when invoking docker commands, and might
therefore not need additional mapping here by Weaver. However, if the docker daemon is configured to run in
root mode with added remapping of user-namespaces, setting UID/GID could be required to avoid root-based
docker container runs. Developers might also want to override with predefined UID/GID, regardless of
user-namespaces mapping depending on their setup. Either way, ensuring that the resulting UID/GID are
consistent between Weaver/cwltool/pywps/docker is left up to developer consideration, since it cannot be
entirely automated to handle all possible configurations.
.. seealso::
- https://docs.docker.com/engine/security/rootless/
- https://docs.docker.com/engine/security/userns-remap/
.. py:method:: update_status(message: str, progress: weaver.typedefs.Number, status: weaver.status.AnyStatusType, error: Optional[Exception] = None, step: bool = False) -> None
Updates the :mod:`pywps` real job status from the specified parameters.
.. py:method:: step_update_status(message: str, progress: weaver.typedefs.Number, start_step_progress: weaver.typedefs.Number, end_step_progress: weaver.typedefs.Number, step_name: str, target_host: str, status: weaver.status.AnyStatusType, error: Optional[Exception] = None) -> None
.. py:method:: log(level: int, message: str, *args: str, **kwargs: Any) -> None
Logging interface matching :class:`logging.Logger` for use by other utilities.
.. py:method:: log_message(message, *args, status=None, progress=None, level=logging.INFO, **kwargs)
.. py:method:: exception_message(exception_type: Type[Exception], exception: Optional[Exception] = None, message: str = 'no message', status: weaver.status.AnyStatusType = Status.EXCEPTION, progress: Optional[weaver.typedefs.Number] = None, level: int = logging.ERROR) -> Exception
Logs to the job the specified error message with the provided exception type.
:returns: formatted exception with message to be raised by calling function.
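The "log, then return the exception for the caller to raise" pattern can be sketched as below. The standalone function ``sketch_exception_message``, its logger and its message format are assumptions for illustration. The actual method also records the message, status and progress on the job.

```python
import logging

LOGGER = logging.getLogger("sketch")  # hypothetical logger standing in for the job log


def sketch_exception_message(exception_type, exception=None, message="no message", level=logging.ERROR):
    """Log the message and return (not raise) an exception of the requested type."""
    details = message if exception is None else f"{message}: {exception!r}"
    LOGGER.log(level, details)
    return exception_type(details)


def run_step():
    try:
        {}["missing"]
    except KeyError as exc:
        # the caller decides to raise, which keeps the traceback at the call site
        raise sketch_exception_message(ValueError, exc, "Step failed") from exc
```

Returning the exception rather than raising it inside the helper leaves control flow explicit in the calling function.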
.. py:property:: job
:type: weaver.datatype.Job
Obtain the job associated to this package execution as specified by the provided UUID.
Process must be in "execute" state under :mod:`pywps` for this job to be available.
.. py:method:: map_step_progress(step_index: int, steps_total: int) -> weaver.typedefs.Number
:classmethod:
Calculates the percentage progression of a single step of the full process.
.. note::
The step progression is adjusted according to delimited start/end of the underlying `CWL` execution to
provide a continuous progress percentage over the complete execution. Otherwise, we would have values
that jump around according to whichever progress the underlying remote `WPS` or monitored `CWL` employs,
if any is provided.
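A minimal sketch of such a mapping, assuming a linear interpolation between ``PACKAGE_PROGRESS_CWL_RUN`` and ``PACKAGE_PROGRESS_CWL_DONE`` (values 10 and 95 as listed in the module data). The function name and the exact formula are assumptions for illustration and may differ from the actual implementation.

```python
PACKAGE_PROGRESS_CWL_RUN = 10   # value listed in the module data
PACKAGE_PROGRESS_CWL_DONE = 95  # value listed in the module data


def sketch_map_step_progress(step_index, steps_total):
    """Linearly map a step index onto the [CWL_RUN, CWL_DONE] progress range (assumed formula)."""
    if steps_total <= 0:
        raise ValueError("steps_total must be positive")
    span = PACKAGE_PROGRESS_CWL_DONE - PACKAGE_PROGRESS_CWL_RUN
    return PACKAGE_PROGRESS_CWL_RUN + span * step_index / steps_total
```

With four steps, this sketch places the start of each step at evenly spaced percentages between the two bounds, so the reported progress never moves backwards.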
.. py:property:: auth
:type: weaver.typedefs.AnyHeadersContainer
.. py:method:: _handler(request: weaver.wps.service.WorkerRequest, response: pywps.response.execute.ExecuteResponse) -> pywps.response.execute.ExecuteResponse
Method called when process receives the WPS execution request.
.. py:method:: must_fetch(input_ref: str, input_type: weaver.processes.constants.PACKAGE_COMPLEX_TYPES) -> bool
Figures out if file reference should be fetched immediately for local execution.
For anything other than a local script/docker execution, the remote ADES/WPS process will fetch it.
S3 references are handled here to avoid errors on a remote WPS that does not support them.
.. seealso::
- :ref:`file_ref_types`
- :ref:`dir_ref_type`
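The decision described above can be sketched as follows, under simplified assumptions. The function name and the ``runs_locally`` flag are hypothetical, and the actual method considers more cases (for example the input type and other reference schemes).

```python
from urllib.parse import urlparse


def sketch_must_fetch(input_ref, runs_locally):
    """Decide whether a reference is downloaded before execution (assumed simplified rules)."""
    scheme = urlparse(input_ref).scheme
    if scheme == "s3":
        # fetched here, since a remote WPS may not support S3 references
        return True
    # a remote ADES/WPS process fetches its own references otherwise
    return runs_locally
```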
.. py:method:: make_inputs(wps_inputs: Dict[str, Deque[weaver.processes.convert.WPS_Input_Type]], cwl_inputs_info: Dict[str, weaver.typedefs.CWL_Input_Type], cwl_schema_names: weaver.typedefs.CWL_SchemaNames) -> Dict[str, weaver.typedefs.ValueType]
Converts :term:`WPS` input values to corresponding :term:`CWL` input values for processing by the package.
The :term:`WPS` inputs must correspond to :mod:`pywps` definitions.
Multiple values (repeated objects with corresponding IDs) are adapted to arrays as needed.
All :term:`WPS` `Complex` types are converted to appropriate locations based on data or reference specification.
:param wps_inputs: Actual :term:`WPS` inputs parsed from execution request.
:param cwl_inputs_info: Expected CWL input definitions for mapping.
:param cwl_schema_names: Mapping of CWL type schema references to resolve 'type: [' if used in a definition.
:return: :term:`CWL` input values.
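The adaptation of repeated inputs to arrays can be sketched as below. Plain Python values stand in for the :mod:`pywps` input objects, and both the function name and the ``cwl_array_ids`` set are hypothetical simplifications of the information carried by ``cwl_inputs_info``.

```python
from collections import deque


def sketch_group_inputs(wps_inputs, cwl_array_ids):
    """Map WPS input deques to CWL values, using lists where the CWL input is an array."""
    cwl_values = {}
    for input_id, occurrences in wps_inputs.items():
        values = list(occurrences)
        if input_id in cwl_array_ids:
            cwl_values[input_id] = values  # repeated objects become an array
        elif values:
            cwl_values[input_id] = values[0]  # single occurrence stays a scalar
    return cwl_values


wps_inputs = {"files": deque(["a.txt", "b.txt"]), "level": deque([3])}
cwl_values = sketch_group_inputs(wps_inputs, cwl_array_ids={"files"})
```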
.. py:method:: make_literal_input(input_definition: pywps.inout.inputs.LiteralInput) -> weaver.typedefs.JSON
:staticmethod:
Converts Literal Data representations to compatible :term:`CWL` contents with :term:`JSON` encodable values.
.. py:method:: make_location_bbox(input_definition: pywps.inout.inputs.BoundingBoxInput) -> None
:staticmethod:
Convert a Bounding Box to a compatible :term:`CWL` ``File`` using corresponding IOHandler of a Complex input.
.. py:method:: make_location_input_security_check(input_scheme: str, input_type: weaver.typedefs.CWL_IO_ComplexType, input_id: str, input_location: str, input_definition: pywps.inout.inputs.ComplexInput) -> str
Perform security access validation of the reference, and resolve it afterwards if accessible.
Auto-map local file if possible to avoid useless download from current server.
Resolve :term:`Vault` reference with local file stored after decryption.
:returns: Updated file location if any resolution occurred.
.. py:method:: make_location_input(cwl_input_def: weaver.processes.convert.CWLIODefinition, input_definition: Union[pywps.inout.inputs.ComplexInput, pywps.inout.inputs.BoundingBoxInput]) -> Optional[weaver.typedefs.JSON]
Generates the JSON content required to specify a `CWL` ``File`` or ``Directory`` input from a location.
If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint,
implicitly convert the reference to the local WPS output directory to avoid useless download of available file.
Since that endpoint could be protected though, perform a minimal HEAD request to validate its accessibility.
Otherwise, this operation could incorrectly grant unauthorized access to protected files by forging the URL.
If the process requires ``OpenSearch`` references that should be preserved as is, scheme defined by
:py:data:`weaver.processes.constants.OpenSearchField.LOCAL_FILE_SCHEME` prefix instead of ``http(s)://``
is expected.
Any other variant of file reference will be fetched as applicable by the relevant schemes.
If the reference corresponds to a ``Directory``, all files that can be located in it will be fetched as
applicable by the relevant scheme of the reference. It is up to the remote location to provide listing
capabilities accordingly to view available files.
.. seealso::
Documentation details of resolution based on schemes defined in :ref:`file_ref_types` section.
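The implicit conversion of a local WPS output URL to its file path can be sketched as follows. The function name and parameters are hypothetical, and the accessibility check (HEAD request) described above is deliberately left out of this sketch. It must still be performed before trusting the mapped path.

```python
import os


def sketch_map_local_output(url, wps_output_url, wps_output_dir):
    """Return the local path for a URL under the WPS output endpoint, or None if it does not match."""
    base = wps_output_url.rstrip("/") + "/"
    if not url.startswith(base):
        return None
    relative = url[len(base):]
    root = os.path.normpath(wps_output_dir)
    local = os.path.normpath(os.path.join(root, relative))
    # reject references that escape the output directory through ".." segments
    if os.path.commonpath([root, local]) != root:
        return None
    return local
```

Any URL that does not match the endpoint yields ``None`` in this sketch, leaving it to be fetched according to its scheme.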
.. py:method:: make_outputs(cwl_result: weaver.typedefs.CWL_Results) -> None
Maps `CWL` result outputs to corresponding `WPS` outputs.
.. py:method:: make_array_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str) -> None
Converts an array output into a :term:`JSON` literal representation.
.. py:method:: make_literal_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: Optional[int] = None) -> None
Converts Literal Data representations to compatible :term:`CWL` contents with :term:`JSON` encodable values.
.. py:method:: make_bbox_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: Optional[int] = None) -> None
Generates the :term:`WPS` Bounding Box output from the :term:`CWL` ``File``.
Assumes that location outputs were resolved beforehand, such that the file is available locally.
.. py:method:: make_location_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: Optional[int] = None) -> None
Rewrite the :term:`WPS` output with required location using result path from :term:`CWL` execution.
Configures the parameters such that :mod:`pywps` will either auto-resolve the local paths to match with URL
defined by ``weaver.wps_output_url`` or upload it to `S3` bucket from ``weaver.wps_output_s3_bucket`` and
provide reference directly.
.. seealso::
- :func:`weaver.wps.load_pywps_config`
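The choice between the two kinds of output reference can be sketched as below. The function and its parameters are hypothetical stand-ins for the ``weaver.wps_output_url`` and ``weaver.wps_output_s3_bucket`` settings. In practice the resolution is delegated to :mod:`pywps` storage classes rather than computed this way.

```python
import posixpath


def sketch_output_reference(relative_path, wps_output_url, s3_bucket=None):
    """Build the public reference of an output from its path relative to the output directory."""
    relative_path = relative_path.lstrip("/")
    if s3_bucket:
        # an S3 bucket is configured: the reference points to the uploaded object
        return f"s3://{s3_bucket}/{relative_path}"
    # otherwise the local path is exposed under the WPS output URL
    return posixpath.join(wps_output_url.rstrip("/"), relative_path)
```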
.. py:method:: resolve_output_format(output: pywps.inout.outputs.ComplexOutput, result_path: str, result_cwl_format: Optional[str]) -> None
:staticmethod:
Resolves the obtained media-type for an output :term:`CWL` ``File``.
Considers :term:`CWL` results ``format``, the file path, and the :term:`Process` description to resolve the
best possible match, retaining as much as possible the metadata that can be resolved from their corresponding
details.
When the media-type is resolved, ensure that an appropriate format validator is applied to perform relevant
checks, or omit them when not implemented.
.. py:method:: make_location_storage(storage_type: pywps.inout.storage.STORE_TYPE, location_type: weaver.processes.constants.PACKAGE_COMPLEX_TYPES, output_id: str) -> Union[pywps.inout.storage.file.FileStorage, pywps.inout.storage.s3.S3Storage, DirectoryNestedStorage]
Generates the relevant storage implementation with requested types and references.
:param storage_type: Where to store the outputs.
:param location_type: Type of output as defined by CWL package type.
:param output_id: expected output identifier that will employ this storage.
:return: Storage implementation.
.. py:method:: make_tool(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext) -> cwltool.process.Process
Method called by :mod:`cwltool` to generate the tool object from the :term:`CWL` definition.
.. py:method:: get_workflow_step_package(job_name: str) -> weaver.typedefs.CWL_WorkflowStepPackage
Resolve the step :term:`CWL` definition under a :term:`Workflow`.
.. py:method:: get_job_process_definition(job_name: str, job_order: weaver.typedefs.CWL_WorkflowInputs) -> WpsPackage
Obtain the execution job definition for the given process (:term:`Workflow` step implementation).
This function is called before running an :term:`ADES` :term:`Job` (either from a :term:`workflow` step or
simple :term:`EMS` :term:`Job` dispatching).
It must return a :class:`weaver.processes.wps_process.WpsProcess` instance configured with the
proper :term:`CWL` package definition, :term:`ADES` target and cookies to access it (if protected).
:param job_name: The workflow step or the package ID that must be executed.
:param job_order: Execution input values submitted for the job.