weaver.processes.wps_package
Representation of WPS process with an internal CWL package definition.
Functions and classes that offer interoperability and conversion between corresponding elements defined as CWL CommandLineTool/Workflow and WPS ProcessDescription in order to generate ADES/EMS deployable Application Package.
See also
weaver.wps_restapi.api conformance details
Module Contents
- weaver.processes.wps_package.get_status_location_log_path(status_location: str, out_dir: str | None = None) str[source]
- weaver.processes.wps_package.retrieve_package_job_log(execution: owslib.wps.WPSExecution, job: weaver.datatype.Job, progress_min: weaver.typedefs.Number = 0, progress_max: weaver.typedefs.Number = 100) None[source]
Obtains the underlying WPS execution log from the status file to add them after existing job log entries.
- weaver.processes.wps_package.get_process_location(process_id_or_url: Dict[str, Any] | str, data_source: str | None = None, container: weaver.typedefs.AnySettingsContainer | None = None) str[source]
Obtains the URL of a WPS REST DescribeProcess given the specified information.
- Parameters:
process_id_or_url – process “identifier” or literal URL to DescribeProcess WPS-REST location.
data_source – identifier of the data source to map to specific ADES, or map to localhost if None.
container – Container that provides access to application settings.
- Returns:
URL of EMS or ADES WPS-REST DescribeProcess.
- weaver.processes.wps_package.get_package_workflow_steps(package_dict_or_url: weaver.typedefs.CWL | str) List[weaver.typedefs.CWL_WorkflowStepReference][source]
Obtain references to intermediate steps of a CWL workflow.
- Parameters:
package_dict_or_url – process package definition or literal URL to DescribeProcess WPS-REST location.
- Returns:
list of workflow steps as {“name”: <name>, “reference”: <reference>} where name is the generic package step name, and reference is the id/url of a registered WPS package.
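As a rough illustration of the returned structure, the sketch below collects step names and string references from a CWL Workflow given as a dictionary. The helper name `list_workflow_steps` and the sample workflow are hypothetical; this is not the Weaver implementation, and it does not resolve URLs or embedded step definitions.

```python
from typing import Any, Dict, List


def list_workflow_steps(package: Dict[str, Any]) -> List[Dict[str, str]]:
    """Collect {"name", "reference"} pairs from the steps of a CWL Workflow."""
    if package.get("class") != "Workflow":
        return []  # an atomic CommandLineTool has no intermediate steps
    steps = package.get("steps", {})
    # CWL allows steps as a mapping {name: step} or as a list of steps with "id"
    if isinstance(steps, dict):
        items = list(steps.items())
    else:
        items = [(step["id"], step) for step in steps]
    result = []
    for name, step in items:
        run = step.get("run")
        # assumption: only string references are kept, embedded definitions are skipped
        if isinstance(run, str):
            result.append({"name": name, "reference": run})
    return result


workflow = {
    "cwlVersion": "v1.2",
    "class": "Workflow",
    "steps": {
        "subset": {"run": "subset-tool.cwl", "in": {}, "out": []},
        "average": {"run": "https://example.com/processes/average", "in": {}, "out": []},
    },
}
print(list_workflow_steps(workflow))
```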
- weaver.processes.wps_package._fetch_process_info(process_info_url: str, fetch_error: Type[Exception], container: weaver.typedefs.AnySettingsContainer | None = None) weaver.typedefs.JSON[source]
Fetches the JSON process information from the specified URL and validates that it contains something.
- Raises:
fetch_error – provided exception with URL message if the process information could not be retrieved.
- weaver.processes.wps_package._get_process_package(process_url: str, container: weaver.typedefs.AnySettingsContainer | None = None) Tuple[weaver.typedefs.CWL, str][source]
Retrieves the WPS process package content from given process ID or literal URL.
- Parameters:
process_url – process literal URL to DescribeProcess WPS-REST location.
- Returns:
tuple of package body as dictionary and package reference name.
- weaver.processes.wps_package._get_process_payload(process_url: str, container: weaver.typedefs.AnySettingsContainer | None = None) weaver.typedefs.JSON[source]
Retrieves the WPS process payload content from given process ID or literal URL.
- Parameters:
process_url – process literal URL to DescribeProcess WPS-REST location.
- Returns:
payload body as dictionary.
- weaver.processes.wps_package._get_package_type(package_dict: weaver.typedefs.CWL) weaver.typedefs.Literal[weaver.processes.types.ProcessType.APPLICATION, weaver.processes.types.ProcessType.WORKFLOW][source]
- weaver.processes.wps_package._get_package_requirements_normalized(requirements: weaver.typedefs.CWL_AnyRequirements, as_dict: bool = False) weaver.typedefs.CWL_AnyRequirements[source]
Converts CWL package requirements or hints into list or dict representation.
Uniformization of CWL requirements or hints into the list representation (default) or as dict if requested, whether the input definitions were provided using the dictionary definition as {"<req-class>": {<params>}} or the list of dictionary requirements [{<req-class + params>}] each with a class key.
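The two representations can be converted back and forth with a few lines of plain Python. The following is a minimal sketch of that idea, assuming a hypothetical helper named `normalize_requirements`; it only illustrates the list and dict shapes described above and is not the function provided by this module.

```python
from typing import Any, Dict, List, Union

Requirements = Union[Dict[str, Dict[str, Any]], List[Dict[str, Any]]]


def normalize_requirements(requirements: Requirements, as_dict: bool = False) -> Requirements:
    """Return requirements as a list of {"class": ...} items, or as a mapping by class."""
    if isinstance(requirements, dict):
        # mapping form: {"<req-class>": {<params>}}
        as_list = [{"class": name, **(params or {})} for name, params in requirements.items()]
    else:
        # list form: [{"class": "<req-class>", <params>}]
        as_list = [dict(req) for req in requirements]
    if not as_dict:
        return as_list
    return {req["class"]: {k: v for k, v in req.items() if k != "class"} for req in as_list}


mapping = {"DockerRequirement": {"dockerPull": "alpine:3"}}
listing = [{"class": "DockerRequirement", "dockerPull": "alpine:3"}]
print(normalize_requirements(mapping))
print(normalize_requirements(listing, as_dict=True))
```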
- weaver.processes.wps_package._patch_cuda_requirement(package: weaver.typedefs.CWL, app_pkg_req: weaver.typedefs.CWL_Requirement, patch_requirement: weaver.processes.constants.CWL_RequirementCUDANameType | weaver.processes.constants.CWL_RequirementDockerGpuType) weaver.typedefs.CWL[source]
Updates legacy CWL definitions for combinations of CUDA and Docker requirements and/or hints.
- weaver.processes.wps_package._update_package_compatibility(package: weaver.typedefs.CWL) weaver.typedefs.CWL[source]
Update a CWL package with backward compatibility changes if applicable.
- weaver.processes.wps_package._load_weaver_extensions_schema() weaver.typedefs.CWL_SchemaSalad[source]
- weaver.processes.wps_package._load_supported_schemas() None[source]
Loads CWL schemas supported by Weaver to avoid validation errors when provided in requirements.
Uses a similar strategy as cwltool.main.setup_schema(), but skipping the CLI context and limiting loaded schema definitions to those that Weaver allows. Drops extensions that could cause misbehaving functionalities when other Process types than CWL-based Application Package are used.
This operation must be called before the CWLFactory attempts loading and validating a CWL document.
- weaver.processes.wps_package._load_package_content(package_dict: weaver.typedefs.CWL, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: str | None = None, only_dump_file: weaver.typedefs.Literal[True] = False, tmp_dir: str | None = None, loading_context: cwltool.context.LoadingContext | None = None, runtime_context: cwltool.context.RuntimeContext | None = None, process_offering: weaver.typedefs.JSON | None = None, container: weaver.typedefs.AnySettingsContainer | None = None) None[source]
- weaver.processes.wps_package._load_package_content(package_dict: weaver.typedefs.CWL, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: str | None = None, only_dump_file: weaver.typedefs.Literal[False] = False, tmp_dir: str | None = None, loading_context: cwltool.context.LoadingContext | None = None, runtime_context: cwltool.context.RuntimeContext | None = None, process_offering: weaver.typedefs.JSON | None = None, container: weaver.typedefs.AnySettingsContainer | None = None) Tuple[cwltool.factory.Callable, str, weaver.typedefs.CWL_WorkflowStepPackageMap]
Loads CWL package definition using various contextual resources.
The following operations are performed to validate the package:
Starts by resolving any intermediate sub-package steps if the parent package is a Workflow in order to recursively generate and validate their process and package, potentially using remote references. Each of the following operations is applied to every step individually.
Package I/O are reordered using any reference process offering hints if provided to generate consistent results.
Perform backward compatibility checks and conversions to the package if applicable.
The resulting package definition is dumped to a temporary JSON file, to validate the content can be serialized.
Optionally, the CWL factory is employed to create the application runner, validating any provided loading and runtime contexts, and considering all resolved Workflow steps if applicable, or the atomic application otherwise.
- Parameters:
package_dict – Package content representation as a dictionary.
package_name – Name to use to create the package file and CWL identifiers.
data_source – Identifier of the Data Source to map to specific ADES, or map to localhost if None.
only_dump_file – Specify if the CWLFactoryCallable should be validated and returned.
tmp_dir – Location of the temporary directory to dump files (deleted on exit).
loading_context – cwltool context used to create the CWL package.
runtime_context – cwltool context used to execute the CWL package.
process_offering – JSON body of the process description payload (used as I/O hint ordering).
container – Container that provides access to application settings.
- Returns:
If only_dump_file is True, returns None. Otherwise, tuple of:
Instance of CWLFactoryCallable
Package type (ProcessType.WORKFLOW or ProcessType.APPLICATION)
Package sub-steps definitions if package represents a ProcessType.WORKFLOW. Otherwise, empty mapping. Mapping of each step name contains their respective package ID and CWL definition that must be run.
Warning
Specified tmp_dir will be deleted on exit.
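The serialization check and the temporary directory clean-up can be pictured with the standard library alone. This is a sketch under stated assumptions: `dump_package` and the sample package are hypothetical, and the CWL factory validation step is not covered.

```python
import json
import os
import tempfile
from typing import Any, Dict


def dump_package(package: Dict[str, Any], package_name: str, tmp_dir: str) -> str:
    """Write the package as JSON under tmp_dir and return the file path."""
    path = os.path.join(tmp_dir, package_name)
    with open(path, mode="w", encoding="utf-8") as pkg_file:
        # json.dump raises TypeError when the content cannot be serialized
        json.dump(package, pkg_file)
    return path


package = {
    "cwlVersion": "v1.2",
    "class": "CommandLineTool",
    "baseCommand": "echo",
    "inputs": {"message": {"type": "string"}},
    "outputs": {},
}

# the directory and everything dumped in it are deleted when the block exits
with tempfile.TemporaryDirectory() as tmp_dir:
    pkg_path = dump_package(package, "package.cwl", tmp_dir)
    with open(pkg_path, encoding="utf-8") as pkg_file:
        reloaded = json.load(pkg_file)
```

Any reference to the dumped file must therefore be consumed before the temporary directory is released.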
- weaver.processes.wps_package._merge_package_inputs_outputs(wps_inputs_defs: List[weaver.processes.convert.ANY_IO_Type] | Dict[str, weaver.processes.convert.ANY_IO_Type], cwl_inputs_list: List[weaver.processes.convert.WPS_Input_Type], wps_outputs_defs: List[weaver.processes.convert.ANY_IO_Type] | Dict[str, weaver.processes.convert.ANY_IO_Type], cwl_outputs_list: List[weaver.processes.convert.WPS_Output_Type]) Tuple[List[weaver.processes.convert.JSON_IO_Type], List[weaver.processes.convert.JSON_IO_Type]][source]
Merges corresponding metadata of I/O definitions from CWL and WPS sources.
Merges I/O definitions to use for process creation and returned by GetCapabilities, DescribeProcess using the WPS specifications (from request POST) and CWL specifications (extracted from file).
Note
Parameters cwl_inputs_list and cwl_outputs_list are expected to be in WPS-like format (i.e.: CWL I/O converted to corresponding WPS I/O objects).
See also
Conversion of CWL to WPS-equivalent objects is handled by _get_package_inputs_outputs() and its underlying functions.
- Parameters:
wps_inputs_defs – list or mapping of provided WPS input definitions.
cwl_inputs_list – processed list of CWL inputs from the Application Package.
wps_outputs_defs – list or mapping of provided WPS output definitions.
cwl_outputs_list – processed list of CWL outputs from the Application Package.
- Returns:
Tuple of (inputs, outputs) consisting of lists of I/O with merged contents between CWL and WPS.
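Conceptually, the merge pairs definitions by I/O identifier. The sketch below works on plain dictionaries rather than WPS objects, and it assumes that explicitly provided WPS metadata takes precedence over CWL-derived values; both the helper name `merge_io` and that precedence rule are assumptions made for illustration, not a description of the actual merge rules.

```python
from typing import Any, Dict, List, Union

IODefs = Union[List[Dict[str, Any]], Dict[str, Dict[str, Any]]]


def merge_io(wps_defs: IODefs, cwl_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge WPS-provided metadata into CWL-derived I/O definitions, matched by ID."""
    if isinstance(wps_defs, dict):
        # mapping form: {"<id>": {<definition>}}
        wps_by_id = {io_id: {"id": io_id, **io_def} for io_id, io_def in wps_defs.items()}
    else:
        # list form: [{"id": "<id>", <definition>}]
        wps_by_id = {io_def["id"]: io_def for io_def in wps_defs}
    merged = []
    for cwl_io in cwl_list:
        wps_io = wps_by_id.get(cwl_io["id"], {})
        # assumption: WPS fields override CWL fields of the same name
        merged.append({**cwl_io, **wps_io})
    return merged


cwl_inputs = [
    {"id": "file", "type": "complex"},
    {"id": "n", "type": "literal", "title": "n"},
]
wps_inputs = {"n": {"title": "Number of items"}}
print(merge_io(wps_inputs, cwl_inputs))
```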
- weaver.processes.wps_package._get_package_io(package_factory: cwltool.factory.Callable, io_select: weaver.processes.constants.IO_Select_Type, as_json: bool) List[weaver.processes.convert.PKG_IO_Type][source]
Retrieves I/O definitions from a validated CWLFactoryCallable.
See also
Factory can be obtained with validation using _load_package_content().
- Parameters:
package_factory – CWL factory that contains I/O references to the package definition.
io_select – either IO_INPUT or IO_OUTPUT according to what needs to be processed.
as_json – toggle to the desired output type. If True, converts the I/O definitions into JSON representation. If False, converts the I/O definitions into WPS objects.
- Returns:
I/O format depending on the value of as_json.
- weaver.processes.wps_package._get_package_inputs_outputs(package_factory: cwltool.factory.Callable, as_json: bool = False) Tuple[List[weaver.processes.convert.PKG_IO_Type], List[weaver.processes.convert.PKG_IO_Type]][source]
Generates WPS-like (inputs, outputs) tuple using parsed CWL package definitions.
- weaver.processes.wps_package._update_package_metadata(wps_metadata: weaver.typedefs.JSON, cwl_package: weaver.typedefs.CWL) None[source]
Updates the package WPS metadata dictionary from extractable CWL package definition.
- weaver.processes.wps_package._patch_wps_process_description_url(reference: str, process_hint: weaver.typedefs.JSON | None) str[source]
Rebuilds a WPS ProcessDescription URL from other details.
A GetCapabilities request can be submitted with an ID in query params directly. Otherwise, check if a process hint can provide the ID.
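The resolution order described above (query parameter first, process hint second) can be sketched with `urllib.parse`. The helper name `describe_process_url`, the `id` field of the hint, and the default WPS version are assumptions for illustration only.

```python
from typing import Any, Dict, Optional
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit


def describe_process_url(reference: str, process_hint: Optional[Dict[str, Any]] = None) -> str:
    """Build a WPS DescribeProcess KVP URL from a reference and an optional hint."""
    parts = urlsplit(reference)
    # WPS KVP parameter names are case-insensitive, so compare them in lowercase
    query = {key.lower(): values[0] for key, values in parse_qs(parts.query).items()}
    # assumption: the hint exposes the process ID under an "id" field
    process_id = query.get("identifier") or (process_hint or {}).get("id")
    if not process_id:
        raise ValueError("Cannot resolve the process ID from the URL or the hint.")
    params = {
        "service": "WPS",
        "request": "DescribeProcess",
        "version": query.get("version", "1.0.0"),  # assumed default version
        "identifier": process_id,
    }
    return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(params), ""))


print(describe_process_url("https://example.com/wps?service=WPS&request=GetCapabilities&identifier=buffer"))
print(describe_process_url("https://example.com/wps?service=WPS&request=GetCapabilities", {"id": "buffer"}))
```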
- weaver.processes.wps_package._generate_process_with_cwl_from_reference(reference: str, process_hint: weaver.typedefs.JSON | None = None) Tuple[weaver.typedefs.CWL, weaver.typedefs.JSON][source]
Resolves the reference type representing a remote Process and generates a CWL package for it.
The reference can point to any definition amongst the following known structures: CWL, WPS-1/2, WPS-REST, OGC API - Processes.
Additionally, provides minimal Process details retrieved from the reference. The amount of detail obtained will depend on available parameters from its description as well as how much metadata can be mapped between it and the generated CWL package.
The resulting Process and its CWL will correspond to a remote instance to which execution should be dispatched and monitored, except if the reference was directly a CWL file.
Warning
Only conversion of the reference into a potential CWL definition is accomplished by this function. Further validations must still be applied to ensure the loaded definition is valid and meets all requirements.
- weaver.processes.wps_package.get_application_requirement(package: weaver.typedefs.CWL, search: weaver.typedefs.CWL_RequirementNames | None = None, default: weaver.typedefs.CWL_Requirement | weaver.typedefs.Default | None = null, validate: bool = True, required: bool = True) weaver.typedefs.CWL_Requirement | weaver.typedefs.Default[source]
Retrieves a requirement or hint from the CWL package definition.
If no search filter is specified (default), retrieve the principal requirement that allows mapping to the appropriate Process implementation. The principal requirement can be extracted for an Application Package of type ProcessType.APPLICATION because only one is permitted simultaneously amongst CWL_REQUIREMENT_APP_TYPES. If the CWL is not of type ProcessType.APPLICATION, the requirement check is skipped regardless of required.
If a search filter is provided, this specific requirement or hint is looked for instead. Regardless of the applied filter, only a unique item can be matched across requirements/hints mapping and/or listing representations.
When validate is enabled, all requirements and hints must also be defined within CWL_REQUIREMENTS_SUPPORTED for the CWL package to be considered valid.
When convert is enabled, any backward compatibility definitions will be converted to their corresponding definition.
- Parameters:
package – CWL definition to parse.
search – Specific requirement/hint name to search and retrieve the definition if available.
default – Default value to return if no match was found. If None, returns an empty {"class": ""}.
validate – Validate supported requirements/hints definition while extracting requested one.
required – Validation will fail if no supported requirements/hints definition could be found.
- Returns:
dictionary that minimally has a class field, and optionally other parameters from that requirement.
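The search behaviour with a filter can be approximated as follows. This sketch assumes a hypothetical helper `find_requirement` that only handles the search case; it leaves out the principal-requirement resolution, the validation against supported requirements, and the backward compatibility conversion.

```python
from typing import Any, Dict, Optional


def find_requirement(
    package: Dict[str, Any],
    search: str,
    default: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Find a unique requirement or hint by class name in a CWL package."""
    matches = []
    for section in ("requirements", "hints"):
        entries = package.get(section, [])
        if isinstance(entries, dict):
            # mapping representation: convert to the listing representation
            entries = [{"class": name, **(params or {})} for name, params in entries.items()]
        matches.extend(req for req in entries if req.get("class") == search)
    if len(matches) > 1:
        # only a unique item can be matched across requirements and hints
        raise ValueError(f"Requirement [{search}] is defined more than once.")
    if matches:
        return matches[0]
    return {"class": ""} if default is None else default


package = {
    "requirements": {"DockerRequirement": {"dockerPull": "alpine:3"}},
    "hints": [{"class": "ResourceRequirement", "coresMin": 2}],
}
print(find_requirement(package, "DockerRequirement"))
print(find_requirement(package, "Missing"))
```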
- weaver.processes.wps_package.check_package_instance_compatible(package: weaver.typedefs.CWL) str | None[source]
Verifies if an Application Package definition is valid for the employed Weaver instance configuration.
Given that the CWL is invalid for the active application, explains the reason why that package always requires remote execution.
When a package can sometimes be executed locally (ADES) or remotely (EMS) depending on the instance configuration, such as in the case of a CWL_REQUIREMENT_APP_DOCKER, return None. This function instead detects cases where a remote server is mandatory without ambiguity related to the current Weaver instance, regardless of whether remote should be an ADES or a remote Provider (WPS or ESGF-CWT).
- Parameters:
package – CWL definition for the process.
- Returns:
reason message if it must be executed remotely or None if it could be executed locally.
- weaver.processes.wps_package.get_auth_requirements(requirement: weaver.typedefs.JSON, headers: weaver.typedefs.AnyHeadersContainer | None) weaver.datatype.Authentication | None[source]
Extract any authentication related definitions provided in request headers corresponding to the application type.
- Parameters:
requirement – Application Package requirement as defined by CWL requirements.
headers – Request headers received during deployment.
- Returns:
Matched authentication details when applicable, otherwise None.
- Raises:
TypeError – When the authentication object cannot be created due to invalid or missing inputs.
ValueError – When the authentication object cannot be created due to incorrectly formed inputs.
- weaver.processes.wps_package.mask_process_inputs(package: weaver.typedefs.CWL, inputs: weaver.typedefs.ExecutionInputs, secret_store: cwltool.secrets.SecretStore | None = None) weaver.typedefs.ExecutionInputs[source]
Obtains a masked representation of the input values as applicable.
See also
CWL_REQUIREMENT_SECRETS
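To give an idea of what a masked representation looks like, the sketch below replaces the values of inputs listed as secrets with a placeholder. It assumes inputs given as a mapping, a list of secret input IDs extracted beforehand from the package, and a hypothetical `"(secret)"` placeholder; the actual masking tokens and the handling of other input structures are not covered.

```python
from typing import Any, Dict, Iterable


def mask_inputs(inputs: Dict[str, Any], secret_ids: Iterable[str], mask: str = "(secret)") -> Dict[str, Any]:
    """Return a copy of the inputs where values of secret inputs are replaced by a mask."""
    secrets = set(secret_ids)
    return {key: (mask if key in secrets else value) for key, value in inputs.items()}


inputs = {"username": "alice", "token": "abc123"}
masked = mask_inputs(inputs, ["token"])
print(masked)
```

The original mapping is left untouched, so the real values remain available for execution while the masked copy can be logged.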
- weaver.processes.wps_package.get_process_identifier(process_info: weaver.typedefs.JSON, package: weaver.typedefs.CWL) str[source]
Obtain a sane name identifier reference from the Process or the Application Package.
- weaver.processes.wps_package.get_process_definition(process_offering: weaver.typedefs.JSON, reference: str | None = None, package: weaver.typedefs.CWL | None = None, data_source: str | None = None, headers: weaver.typedefs.AnyHeadersContainer | None = None, builtin: bool = False, container: weaver.typedefs.AnySettingsContainer | None = None) weaver.typedefs.JSON[source]
Resolve the process definition considering corresponding metadata from the offering, package and references.
Returns an updated process definition dictionary ready for storage using provided WPS process_offering and a package definition passed by reference or package CWL content. The returned process information can be used later on to load an instance of weaver.wps_package.WpsPackage.
- Parameters:
process_offering – WPS REST-API (WPS-3) process offering as JSON.
reference – URL to CWL package, WPS-1 DescribeProcess endpoint or WPS-3 Process endpoint.
package – Literal CWL package definition (YAML or JSON format).
data_source – Where to resolve process IDs (default: localhost if None).
headers – Request headers provided during deployment to retrieve details such as authentication tokens.
builtin – Indicate if the package is expected to be a CWL_REQUIREMENT_APP_BUILTIN definition.
container – Container that provides access to application settings.
- Returns:
Updated process definition with resolved/merged information from package/reference.
- weaver.processes.wps_package.format_extension_validator(data_input: pywps.inout.inputs.ComplexInput | pywps.inout.outputs.ComplexOutput, mode: int) bool[source]
Validator that will only check that the extension matches the selected data format.
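An extension-only check amounts to comparing the file name suffix with the extension declared by the data format. The sketch below uses plain strings instead of PyWPS input/output objects; the helper name `extension_matches` is hypothetical.

```python
def extension_matches(file_path: str, extension: str) -> bool:
    """Check that the file name ends with the expected extension (case-insensitive)."""
    # accept the extension with or without its leading dot
    expected = extension if extension.startswith(".") else "." + extension
    return file_path.lower().endswith(expected.lower())


print(extension_matches("/tmp/result.NC", ".nc"))
print(extension_matches("/tmp/result.json", "nc"))
```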
- class weaver.processes.wps_package.DirectoryNestedStorage(storage: pywps.inout.storage.file.FileStorage | pywps.inout.storage.s3.S3Storage)[source]
Generates a nested storage for a directory where each contained file will be managed by the storage.
Initializes the storage.
- Parameters:
storage – Storage implementation that is employed for storing files in a directory-like structure.
- _do_store(output: pywps.inout.outputs.ComplexOutput) Tuple[pywps.inout.storage.STORE_TYPE, weaver.typedefs.Path, str][source]
Store all files contained in a directory recursively.
Note
This is called from CachedStorage.store() only if not already in storage using cached output ID.
- write(data: AnyStr, destination: str, data_format: pywps.inout.formats.Format | None = None) str[source]
Write data representing the directory itself or dispatch call to base storage for any other file contents.
When the directory itself is targeted, uploads an empty bucket object for S3 base storage, or makes the directory structure for base file storage.
- url(destination: str) str[source]
- Parameters:
destination – the name of the output to calculate the url for
- Returns:
URL where file_name can be reached
- location(destination: str) weaver.typedefs.Path[source]
Provides a location for the specified destination. This may be any path, path-like object, database connection string, URL, etc., and it is not guaranteed to be accessible on the local file system.
- Parameters:
destination – the name of the output to calculate the location for
- Returns:
location where file_name can be found
- class weaver.processes.wps_package.WpsPathMapperFactory(cwl_outdir_prefix: weaver.typedefs.Path, final_stagedir: weaver.typedefs.Path)[source]
Path mapper factory that preserves the original output directory structure for staging.
If a CWL happens to define multiple outputs, which are collected with glob patterns pointing to distinct sub-directories under a common directory, and that their respective nested-files also happen to match by name (see below example), the default PathMapper that collects the outputs combines them (flat list) under a common output/staging directory with .ext_{duplicate} to manage conflicts. This is problematic, since the specific extensions are validated to ensure format integrity between CWL and WPS operations.
Example structure that causes matching duplicates.
    # input to PathMapper
    outputs:
        out1:
            file1.ext
            file2.ext
        out2:
            file1.ext
            file2.ext
    # result from *default* PathMapper
    stagedir:
        file1.ext    # from out1
        file1.ext_1  # from out2
        file2.ext    # from out1
        file2.ext_1  # from out2
See also
The cwltool.executors.JobExecutor.execute() call passes the PathMapper type from the RuntimeContext, which ends up being used in cwltool.process.relocateOutputs(). This relocation instantiates the PathMapper with a common stagedir (the desired pywps working directory configured by "outdir": self.workdir in weaver.wps_package.WpsPackage.setup_runtime()), which leads to the removal of the prefix sub-directories (i.e.: above out1 and out2) when moving the files under the staging directory.
This class fixes the above issues using the following procedure.
Use a factory to pass additional arguments that help resolve the correct source/destination of CWL/PyWPS.
Let the parent PathMapper.setup() do its standard resolution to avoid reimplementing it ourselves.
Detect files from CWL temporary output directories (matching cwl_outdir_prefix).
Extract the relative path from within the original CWL output directory.
Adjust the mapping of the resolved target staging location to inject the relative sub-directory structure.
Note
Since cwltool expects to do PathMapper(...) to instantiate it, using this factory can simulate the PathMapper.__init__() by copying its arguments in WpsPathMapperFactory.__call__() and then applying the custom logic with our extra arguments. This is necessary to “hook” into cwltool and its method of creating PathMapper.
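The path adjustment at the heart of this procedure can be sketched without cwltool. The example below assumes that `cwl_outdir_prefix` is a string prefix of the temporary CWL output directory name (for instance `/tmp/cwl_out_` for a directory `/tmp/cwl_out_abcd`), and uses a hypothetical helper `map_staged_path` with POSIX paths; it does not subclass or call `PathMapper`.

```python
import posixpath


def map_staged_path(source: str, cwl_outdir_prefix: str, final_stagedir: str) -> str:
    """Compute the staging target of a file while keeping its nested sub-directories."""
    if not source.startswith(cwl_outdir_prefix):
        # not a CWL temporary output: flat staging by file name
        return posixpath.join(final_stagedir, posixpath.basename(source))
    # drop the prefix, then the rest of the temporary directory name
    remainder = source[len(cwl_outdir_prefix):]
    _, _, relative = remainder.partition("/")
    # inject the relative sub-directory structure under the staging directory
    return posixpath.join(final_stagedir, relative or posixpath.basename(source))


for path in ("/tmp/cwl_out_abcd/out1/file1.ext", "/tmp/cwl_out_abcd/out2/file1.ext"):
    print(map_staged_path(path, "/tmp/cwl_out_", "/data/stage"))
```

With the sub-directories preserved, the two `file1.ext` outputs no longer collide, so no renamed extension is needed.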
- class weaver.processes.wps_package.WpsPackage(*, identifier: str, title: str | None = None, package: weaver.typedefs.CWL = None, payload: weaver.typedefs.JSON | None = None, settings: weaver.typedefs.AnySettingsContainer | None = None, **kw)[source]
- Parameters:
handler – A callable that gets invoked for each incoming request. It should accept a single pywps.app.WPSRequest argument and return a pywps.app.WPSResponse object.
identifier (string) – Name of this process.
title (string) – Human-readable title of process.
abstract (string) – Brief narrative description of the process.
keywords (list) – Keywords that characterize a process.
inputs – List of inputs accepted by this process. They should be LiteralInput and ComplexInput and BoundingBoxInput objects.
outputs – List of outputs returned by this process. They should be LiteralOutput and ComplexOutput and BoundingBoxOutput objects.
metadata – List of metadata advertised by this process. They should be pywps.app.Common.Metadata objects.
translations (dict[str,dict[str,str]]) – The first key is the RFC 4646 language code, and the nested mapping contains translated strings accessible by a string property. e.g. {“fr-CA”: {“title”: “Mon titre”, “abstract”: “Une description”}}
Creates a WPS-3 Process instance to execute a CWL application package definition.
Process parameters should be loaded from an existing weaver.datatype.Process instance generated using weaver.wps_package.get_process_definition().
Provided kw should correspond to weaver.datatype.Process.params_wps()
- _logger: logging.Logger | None = None[source]
- request: weaver.wps.service.WorkerRequest | None = None[source]
- _job: weaver.datatype.Job | None = None[source]
- property status_filename: str[source]
Obtain the XML status location of this process when executed.
The status location applies the WPS-Output-Context if defined such that any following output or log file references that derive from it will be automatically stored in the same nested context.
- setup_loggers(log_stdout_stderr: bool = True) None[source]
Configures useful loggers to catch most of the common output and/or error messages during package execution.
- insert_package_log(result: weaver.typedefs.CWL_Results | cwltool.factory.WorkflowStatus) List[str][source]
Retrieves additional CWL sub-process log captures to obtain internal application output and/or errors.
After execution of this method, the WPS output log (which can be obtained by retrieve_package_job_log()) will have additional stderr/stdout entries extracted from the underlying application package tool execution.
The outputs and errors are inserted as best as possible in the logical order so that the merged logs read in a natural and chronological order. In the event that both output and errors are available, they are appended one after another, as merging in an orderly fashion cannot be guaranteed from outside the CWL runner.
Note
In case of any exception, log reporting is aborted and ignored.
Todo
Improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)
- Parameters:
result – output results returned by successful CWL package instance execution or raised CWL exception.
- Returns:
captured execution log lines retrieved from files
- setup_docker_image() bool | None[source]
Pre-pull the Docker image locally for running the process if authentication is required to get it.
- Returns:
success status if operation was successful, or None when it does not apply.
- setup_runtime() Dict[str, weaver.typedefs.AnyValueType][source]
Prepares the runtime parameters for the CWL package execution.
Parameter weaver.wps_workdir is the base-dir where sub-dir per application packages will be generated. Parameter workdir is the actual location PyWPS reserved for this process (already with sub-dir). If no weaver.wps_workdir was provided, reuse PyWPS parent workdir since we got access to it. Other steps handling outputs need to consider that CWL<->WPS out dirs could match because of this.
- Returns:
resolved runtime parameters
- setup_provenance(loading_context: cwltool.context.LoadingContext, runtime_context: cwltool.context.RuntimeContext) None[source]
Configure PROV runtime options.
- update_requirements() None[source]
In-place modification of package to adjust invalid items that would break behaviour we must enforce.
- update_cwl_schema_names() None[source]
Detect duplicate CWL schema types not referred by name to provide one and avoid resolution failure.
Doing this resolution avoids reused definitions being considered as “conflicts” because of missing name. To avoid introducing a real conflict, names are injected only under corresponding CWL I/O by ID. The most common type of definition resolved this way is when CWL Enum is reused for single and array-based definitions simultaneously without using an explicit SchemaDefRequirement for them.
Todo
Workaround for https://github.com/common-workflow-language/cwltool/issues/1908.
- update_effective_user(runtime_context: cwltool.context.RuntimeContext) None[source]
Update effective user/group for the Application Package to be executed.
Reducing permissions is safer inside docker application since weaver/cwltool could be running as root but this requires that mounted volumes have the required permissions so euid:egid can use them.
Overrides cwltool’s function to retrieve user/group id for ones we enforce.
Warning
This is an experimental feature. Update of user/group permissions to access inputs/outputs could be required.
Note
When invoking a docker application, appropriate user-namespace mapping (if any) should be consistent with the docker daemon configuration. In other words, if the docker daemon is configured to run in rootless mode, mapping of UID/GID might already be applied by the security feature when invoking docker commands, and might therefore not need additional mapping here by Weaver. However, if the docker daemon is configured to run in root mode with added remapping of user-namespaces, setting UID/GID could be required to avoid root-based docker container runs. Developers might also want to override with predefined UID/GID, regardless of user-namespaces mapping depending on their setup. Either way, ensuring that the resulting UID/GID are consistent between Weaver/cwltool/pywps/docker is left up to developer consideration, since it cannot be entirely automated to handle all possible configurations.
- update_status(message: str, progress: weaver.typedefs.Number, status: weaver.status.AnyStatusType, error: Exception | None = None, step: bool = False) None[source]
Updates the pywps real job status from the specified parameters.
- step_update_status(message: str, progress: weaver.typedefs.Number, start_step_progress: weaver.typedefs.Number, end_step_progress: weaver.typedefs.Number, step_name: str, target_host: str, status: weaver.status.AnyStatusType, error: Exception | None = None) None[source]
- log(level: int, message: str, *args: str, **kwargs: Any) None[source]
Logging interface matching logging.Logger for use by other utilities.
- exception_message(exception_type: Type[Exception], exception: Exception | None = None, message: str = 'no message', status: weaver.status.AnyStatusType = Status.EXCEPTION, progress: weaver.typedefs.Number | None = None, level: int = logging.ERROR) Exception[source]
Logs to the job the specified error message with the provided exception type.
- Returns:
formatted exception with message to be raised by calling function.
- property job: weaver.datatype.Job[source]
Obtain the job associated to this package execution as specified by the provided UUID.
Process must be in “execute” state under pywps for this job to be available.
- classmethod map_step_progress(step_index: int, steps_total: int) weaver.typedefs.Number[source]
Calculates the percentage progression of a single step of the full process.
Note
The step progression is adjusted according to delimited start/end of the underlying CWL execution to provide a continuous progress percentage over the complete execution. Otherwise, we would have values that jump around according to whichever progress the underlying remote WPS or monitored CWL employs, if any is provided.
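A linear interpolation between two bounds is one way to obtain such a continuous percentage. In this sketch, the function is a standalone stand-in for the class method, and the `start` and `end` bounds (10 and 95) are hypothetical values chosen for illustration, not the delimiters actually used.

```python
def map_step_progress(step_index: int, steps_total: int, start: float = 10, end: float = 95) -> float:
    """Map a step index onto the progress range reserved for the CWL execution."""
    # guard against a division by zero when no step is defined
    fraction = step_index / max(steps_total, 1)
    return start + fraction * (end - start)


for index in range(5):
    print(index, map_step_progress(index, 4))
```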
- _handler(request: weaver.wps.service.WorkerRequest, response: pywps.response.execute.ExecuteResponse) pywps.response.execute.ExecuteResponse[source]
Method called when process receives the WPS execution request.
- must_fetch(input_ref: str, input_type: weaver.processes.constants.PACKAGE_COMPLEX_TYPES) bool[source]
Figures out if file reference should be fetched immediately for local execution.
If anything other than a local script/docker is employed, the remote ADES/WPS process will fetch it. S3 references are handled here to avoid errors on remote WPS not supporting them.
- make_inputs(wps_inputs: Dict[str, Deque[weaver.processes.convert.WPS_Input_Type]], cwl_inputs_info: Dict[str, weaver.typedefs.CWL_Input_Type], cwl_schema_names: weaver.typedefs.CWL_SchemaNames) Dict[str, weaver.typedefs.ValueType][source]
Converts WPS input values to corresponding CWL input values for processing by the package.
The WPS inputs must correspond to pywps definitions. Multiple values (repeated objects with corresponding IDs) are adapted to arrays as needed. All WPS Complex types are converted to appropriate locations based on data or reference specification.
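The adaptation of repeated values to arrays can be pictured on plain Python values. The sketch below assumes inputs already reduced to lists of raw values per ID and a known set of CWL inputs declared as arrays; the helper name `to_cwl_inputs` is hypothetical, and the conversion of Complex types to file locations is not covered.

```python
from typing import Any, Dict, List, Set


def to_cwl_inputs(wps_inputs: Dict[str, List[Any]], cwl_array_ids: Set[str]) -> Dict[str, Any]:
    """Convert repeated WPS input values to single values or arrays for CWL."""
    cwl_inputs = {}
    for input_id, values in wps_inputs.items():
        if not values:
            continue  # nothing was provided for this input
        if input_id in cwl_array_ids or len(values) > 1:
            # repeated values, or an input declared as array in the CWL
            cwl_inputs[input_id] = list(values)
        else:
            cwl_inputs[input_id] = values[0]
    return cwl_inputs


wps_values = {"files": ["a.nc", "b.nc"], "tags": ["x"], "count": [3], "unused": []}
print(to_cwl_inputs(wps_values, cwl_array_ids={"tags"}))
```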
- static make_literal_input(input_definition: pywps.inout.inputs.LiteralInput) weaver.typedefs.JSON[source]
Converts Literal Data representations to compatible CWL contents with JSON encodable values.
- static make_location_bbox(input_definition: pywps.inout.inputs.BoundingBoxInput) None[source]
Convert a Bounding Box to a compatible CWL File using corresponding IOHandler of a Complex input.
- make_location_input_security_check(input_scheme: str, input_type: weaver.typedefs.CWL_IO_ComplexType, input_id: str, input_location: str, input_definition: pywps.inout.inputs.ComplexInput) str[source]
Perform security access validation of the reference, and resolve it afterwards if accessible.
Auto-map local file if possible to avoid useless download from current server. Resolve Vault reference with local file stored after decryption.
- Returns:
Updated file location if any resolution occurred.
- make_location_input(cwl_input_def: weaver.processes.convert.CWLIODefinition, input_definition: pywps.inout.inputs.ComplexInput | pywps.inout.inputs.BoundingBoxInput) weaver.typedefs.JSON | None[source]
Generates the JSON content required to specify a CWL File or Directory input from a location.
If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint, implicitly convert the reference to the local WPS output directory to avoid useless download of available file. Since that endpoint could be protected though, perform a minimal HEAD request to validate its accessibility. Otherwise, this operation could incorrectly grant unauthorized access to protected files by forging the URL.
If the process requires OpenSearch references that should be preserved as is, scheme defined by weaver.processes.constants.OpenSearchField.LOCAL_FILE_SCHEME prefix instead of http(s):// is expected.
Any other variant of file reference will be fetched as applicable by the relevant schemes.
If the reference corresponds to a Directory, all files that can be located in it will be fetched as applicable by the relevant scheme of the reference. It is up to the remote location to provide listing capabilities accordingly to view available files.
See also
Documentation details of resolution based on schemes defined in File Reference Types section.
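The implicit conversion of a local WPS output URL to the output directory can be sketched as shown below. The helper `map_output_url_to_local` and its parameters are hypothetical. The `is_accessible` callable stands in for the HEAD request, so that the sketch does not perform network access.

```python
import os
from typing import Callable


def map_output_url_to_local(
    reference: str,
    wps_output_url: str,
    wps_output_dir: str,
    is_accessible: Callable[[str], bool],
) -> str:
    """Return a local path for references under the local WPS output URL.

    Hypothetical helper for illustration, not the module's implementation.
    Any other reference is returned unchanged, to be fetched by its scheme.
    """
    base_url = wps_output_url.rstrip("/") + "/"
    if not reference.startswith(base_url):
        return reference
    if not is_accessible(reference):
        # Stand-in for the HEAD request: never map a protected file.
        raise PermissionError(f"Reference is not accessible: {reference}")
    relative = reference[len(base_url):]
    root = os.path.normpath(wps_output_dir)
    local_path = os.path.normpath(os.path.join(root, relative))
    # Reject forged URLs that try to escape the output directory with '..'.
    if os.path.commonpath([root, local_path]) != root:
        raise PermissionError(f"Reference escapes the output directory: {reference}")
    return local_path


print(map_output_url_to_local(
    "https://weaver.example/wpsoutputs/job-1/result.txt",
    "https://weaver.example/wpsoutputs",
    "/data/wpsoutputs",
    is_accessible=lambda url: True,
))
```

Checking accessibility before mapping the path matters because the local file system applies none of the access rules that protect the HTTP endpoint.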
- make_outputs(cwl_result: weaver.typedefs.CWL_Results) None[source]
Maps CWL result outputs to corresponding WPS outputs.
- make_array_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str) None[source]
Converts an array output into a JSON literal representation.
- make_literal_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: int | None = None) None[source]
Converts Literal Data representations to compatible CWL contents with JSON encodable values.
- make_bbox_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: int | None = None) None[source]
Generates the WPS Bounding Box output from the CWL File.
Assumes that location outputs were resolved beforehand, such that the file is available locally.
- make_location_output(cwl_result: weaver.typedefs.CWL_Results, output_id: str, index: int | None = None) None[source]
Rewrite the WPS output with required location using result path from CWL execution.
Configures the parameters such that pywps will either auto-resolve the local paths to match with URL defined by weaver.wps_output_url or upload it to S3 bucket from weaver.wps_output_s3_bucket and provide reference directly.
See also
weaver.wps.load_pywps_config()
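For the non-S3 case, the correspondence between a local result path and the URL exposed under weaver.wps_output_url can be sketched as follows. The helper `local_path_to_output_url` is hypothetical and assumes POSIX paths. In practice, pywps performs this resolution from its configuration.

```python
import posixpath


def local_path_to_output_url(result_path: str, wps_output_dir: str, wps_output_url: str) -> str:
    """Build the public URL of a result stored under the WPS output directory.

    Hypothetical helper for illustration, not the module's implementation.
    """
    relative = posixpath.relpath(result_path, start=wps_output_dir)
    if relative == ".." or relative.startswith("../"):
        # Only files located under the output directory can be exposed.
        raise ValueError(f"Result is outside the output directory: {result_path}")
    return wps_output_url.rstrip("/") + "/" + relative


print(local_path_to_output_url(
    "/data/wpsoutputs/job-1/result.tif",
    "/data/wpsoutputs",
    "https://weaver.example/wpsoutputs",
))
```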
- static resolve_output_format(output: pywps.inout.outputs.ComplexOutput, result_path: str, result_cwl_format: str | None) None[source]
Resolves the obtained media-type for an output CWL File.
Considers CWL results format, the file path, and the Process description to resolve the best possible match, retaining as much as possible the metadata that can be resolved from their corresponding details.
When the media-type is resolved, ensure that an appropriate format validator is applied to perform relevant checks, or omit them when not implemented.
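A simplified view of such a resolution is sketched below. The helper `resolve_media_type`, the priority order it applies, and the handling of the IANA namespace for CWL formats are assumptions made for illustration. The actual method operates on the pywps output object and also configures format validators.

```python
import mimetypes
from typing import List, Optional

# Assumed namespace for CWL formats expressed as IANA media-type references.
IANA_NAMESPACE = "https://www.iana.org/assignments/media-types/"


def resolve_media_type(result_path: str, cwl_format: Optional[str], described: List[str]) -> str:
    """Pick a media-type from the CWL format, the file extension, then the description.

    Hypothetical helper for illustration, not the module's implementation.
    """
    if cwl_format and cwl_format.startswith(IANA_NAMESPACE):
        candidate = cwl_format[len(IANA_NAMESPACE):]
    else:
        # No usable CWL format: guess from the extension of the result file.
        candidate, _ = mimetypes.guess_type(result_path)
    if candidate and (not described or candidate in described):
        return candidate
    # Fall back to the first media-type of the process description, if any.
    return described[0] if described else "application/octet-stream"


print(resolve_media_type("out.json", IANA_NAMESPACE + "application/json", ["application/json"]))
print(resolve_media_type("out.data", None, ["image/tiff"]))
```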
- make_location_storage(storage_type: pywps.inout.storage.STORE_TYPE, location_type: weaver.processes.constants.PACKAGE_COMPLEX_TYPES, output_id: str) pywps.inout.storage.file.FileStorage | pywps.inout.storage.s3.S3Storage | DirectoryNestedStorage[source]
Generates the relevant storage implementation with requested types and references.
- Parameters:
storage_type – Where to store the outputs.
location_type – Type of output as defined by CWL package type.
output_id – expected output identifier that will employ this storage.
- Returns:
Storage implementation.
- make_tool(toolpath_object: weaver.typedefs.CWL_ToolPathObject, loading_context: cwltool.context.LoadingContext) cwltool.process.Process[source]
Method called by cwltool to generate the tool object from the CWL definition.
- get_job_process_definition(job_name: str, job_order: weaver.typedefs.CWL_WorkflowInputs) WpsPackage[source]
Obtain the execution job definition for the given process (Workflow step implementation).
This function is called before running an ADES Job (either from a workflow step or simple EMS Job dispatching).
It must return a weaver.processes.wps_process.WpsProcess instance configured with the proper CWL package definition, ADES target and cookies to access it (if protected).
- Parameters:
job_name – The workflow step or the package ID that must be executed.
job_order – Execution input values submitted for the job.