weaver.processes.wps_package

Functions and classes that offer interoperability and conversion between corresponding elements defined as CWL CommandLineTool/Workflow and WPS ProcessDescription, in order to generate an ADES/EMS Application Package.

Module Contents

weaver.processes.wps_package.CWLRequirement
weaver.processes.wps_package.LOGGER
weaver.processes.wps_package.PACKAGE_DEFAULT_FILE_NAME = package
weaver.processes.wps_package.PACKAGE_EXTENSIONS
weaver.processes.wps_package.PACKAGE_OUTPUT_HOOK_LOG_UUID = PACKAGE_OUTPUT_HOOK_LOG_{}
weaver.processes.wps_package.PACKAGE_PROGRESS_PREP_LOG = 1
weaver.processes.wps_package.PACKAGE_PROGRESS_LAUNCHING = 2
weaver.processes.wps_package.PACKAGE_PROGRESS_LOADING = 5
weaver.processes.wps_package.PACKAGE_PROGRESS_GET_INPUT = 6
weaver.processes.wps_package.PACKAGE_PROGRESS_ADD_EO_IMAGES = 7
weaver.processes.wps_package.PACKAGE_PROGRESS_CONVERT_INPUT = 8
weaver.processes.wps_package.PACKAGE_PROGRESS_CWL_RUN = 10
weaver.processes.wps_package.PACKAGE_PROGRESS_CWL_DONE = 95
weaver.processes.wps_package.PACKAGE_PROGRESS_PREP_OUT = 98
weaver.processes.wps_package.PACKAGE_PROGRESS_DONE = 100
weaver.processes.wps_package.get_status_location_log_path(status_location: str, out_dir: Optional[str] = None) -> str
weaver.processes.wps_package.retrieve_package_job_log(execution: owslib.wps.WPSExecution, job: weaver.datatype.Job, progress_min: weaver.typedefs.Number = 0, progress_max: weaver.typedefs.Number = 100) -> None

Obtains the underlying WPS execution logs from the status file and appends them after the existing job log entries.

weaver.processes.wps_package.get_process_location(process_id_or_url: Union[Dict[str, Any], str], data_source: Optional[str] = None) -> str

Obtains the URL of a WPS REST DescribeProcess given the specified information.

Parameters
  • process_id_or_url – process “identifier” or literal URL to DescribeProcess WPS-REST location.

  • data_source – identifier of the data source to map to specific ADES, or map to localhost if None.

Returns

URL of EMS or ADES WPS-REST DescribeProcess.

weaver.processes.wps_package.get_package_workflow_steps(package_dict_or_url: Union[Dict[str, Any], str]) -> List[Dict[str, str]]
Parameters

package_dict_or_url – process package definition or literal URL to DescribeProcess WPS-REST location.

Returns

list of workflow steps as {“name”: <name>, “reference”: <reference>} where name is the generic package step name, and reference is the id/url of a registered WPS package.
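As an illustration of the step/reference pairs described above, the following hypothetical sketch (not Weaver's implementation) extracts them from an already loaded CWL Workflow dictionary. It assumes that each step's run field holds the reference, and it handles both the mapping and the list forms of steps that CWL allows; fetching the package from a URL is left out.

```python
from typing import Any, Dict, List


def workflow_steps(package: Dict[str, Any]) -> List[Dict[str, str]]:
    """Hypothetical helper listing the steps of a CWL Workflow package."""
    if package.get("class") != "Workflow":
        return []  # assumption: a non-workflow package has no steps to report
    steps = package.get("steps", {})
    if isinstance(steps, dict):
        # mapping form: {step_id: {"run": ..., "in": ..., "out": ...}}
        items = list(steps.items())
    else:
        # list form: each step object carries its own "id"
        items = [(step["id"], step) for step in steps]
    # "run" holds the reference to the step's own package (ID, file or URL)
    return [{"name": name, "reference": step["run"]} for name, step in items]
```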

weaver.processes.wps_package._fetch_process_info(process_info_url: str, fetch_error: Type[Exception]) -> weaver.typedefs.JSON

Fetches the JSON process information from the specified URL and validates that it contains something.

Raises

fetch_error – provided exception with URL message if the process information could not be retrieved.

weaver.processes.wps_package._get_process_package(process_url: str) -> Tuple[weaver.typedefs.CWL, str]

Retrieves the WPS process package content from the given process ID or literal URL.

Parameters

process_url – process literal URL to DescribeProcess WPS-REST location.

Returns

tuple of package body as dictionary and package reference name.

weaver.processes.wps_package._get_process_payload(process_url: str) -> weaver.typedefs.JSON

Retrieves the WPS process payload content from the given process ID or literal URL.

Parameters

process_url – process literal URL to DescribeProcess WPS-REST location.

Returns

payload body as dictionary.

weaver.processes.wps_package._get_package_type(package_dict: weaver.typedefs.CWL) -> Union[weaver.processes.types.PROCESS_APPLICATION, weaver.processes.types.PROCESS_WORKFLOW]
weaver.processes.wps_package._get_package_requirements_as_class_list(requirements: AnyCWLRequirements) -> ListCWLRequirements

Converts CWL package requirements or hints, sometimes defined as a mapping {<req>: {<params>}}, to an explicit list of dictionary requirements, each with a class key.
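A minimal sketch of this normalization, assuming plain dictionaries as input; requirements_as_class_list is a hypothetical name, not part of Weaver's API.

```python
from typing import Any, Dict, List, Optional, Union

Requirements = Union[Dict[str, Optional[Dict[str, Any]]], List[Dict[str, Any]]]


def requirements_as_class_list(requirements: Requirements) -> List[Dict[str, Any]]:
    """Normalize CWL requirements/hints to a list of dicts that each have a "class" key."""
    if isinstance(requirements, dict):
        # mapping form: {"DockerRequirement": {"dockerPull": "..."}}
        # a null/empty value simply means "no parameters"
        return [{"class": name, **(params or {})} for name, params in requirements.items()]
    # list form is already explicit; copy each item to avoid mutating the caller's data
    return [dict(req) for req in requirements]
```

The list form is copied rather than returned as-is so that callers can adjust the result without altering the original package.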

weaver.processes.wps_package._get_package_ordered_io(io_section: Union[List[weaver.typedefs.JSON], Dict[str, Union[weaver.typedefs.JSON, str]]], order_hints: Optional[List[weaver.typedefs.JSON]] = None) -> List[weaver.typedefs.JSON]

Converts CWL package I/O definitions defined as a dictionary to an equivalent list representation. The list representation ensures that I/O order is preserved when written to file and reloaded afterwards, regardless of how each server and/or library implements the dict container.

If this function fails to correctly order any I/O, or cannot guarantee such a result because of the provided parameters (e.g.: no hints given when required), the result will neither break nor change the final processing behaviour of the CWL engine. These are merely cosmetic adjustments that ease readability of the I/O by avoiding shuffling their order across successive application package reports.

The important result of this function is to provide the CWL I/O as a consistent list of objects so it is less cumbersome to compare/merge/iterate over the elements with all functions that will follow.

Note

When defined as a dictionary, an OrderedDict is expected as input to ensure that field order is preserved. Prior to Python 3.7 (or CPython 3.6, where it is only an implementation detail), insertion order is not guaranteed for the builtin dict. In that case, order_hints is required to ensure the same order.

Parameters
  • io_section – Definition contained under the CWL inputs or outputs package fields.

  • order_hints – Optional/partial list of WPS I/O definitions hinting an order to sort CWL unsorted-dict I/O.

Returns

I/O specified as list of dictionary definitions with preserved order (as best as possible).
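The following hypothetical sketch shows the conversion and hint-based sorting described above. It assumes that order hints are WPS I/O definitions carrying an id field, and that shorthand CWL entries such as {"input": "string"} denote only the type. I/O not mentioned in the hints keep their original relative order after the hinted ones, relying on the stability of Python's sort.

```python
from typing import Any, Dict, List, Optional, Union

IOSection = Union[List[Dict[str, Any]], Dict[str, Union[Dict[str, Any], str]]]


def ordered_io(io_section: IOSection,
               order_hints: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
    """Hypothetical sketch: convert CWL I/O to a list, sorted by hinted IDs when given."""
    if isinstance(io_section, list):
        return list(io_section)  # list form already preserves the defined order
    io_list = []
    for io_id, io_def in io_section.items():
        if isinstance(io_def, str):
            io_def = {"type": io_def}  # shorthand form: {"input": "string"}
        io_list.append({"id": io_id, **io_def})
    if order_hints:
        hint_ids = [hint["id"] for hint in order_hints if "id" in hint]
        rank = {io_id: idx for idx, io_id in enumerate(hint_ids)}
        # hinted I/O first (in hint order); unhinted ones share the last rank
        # and keep their relative order because list.sort() is stable
        io_list.sort(key=lambda io: rank.get(io["id"], len(rank)))
    return io_list
```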

weaver.processes.wps_package._check_package_file(cwl_file_path_or_url: str) -> Tuple[str, bool]

Validates that the specified CWL file path or URL points to an existing and allowed file format.

Parameters

cwl_file_path_or_url – path on disk to a file of one of the allowed types, or a URL pointing to such a file served remotely.

Returns

absolute_path, is_url: absolute path or URL, and boolean indicating if it is a remote URL file.

Raises

PackageRegistrationError – in case of missing file, invalid format or invalid HTTP status code.
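A simplified sketch of the validation above, under stated assumptions: ALLOWED_EXTENSIONS is an illustrative stand-in for PACKAGE_EXTENSIONS (whose actual values are not listed here), PackageRegistrationError is a local stand-in class, and the HTTP status check for remote URLs is omitted to keep the example offline.

```python
import os
from typing import Tuple
from urllib.parse import urlparse

# Assumption: illustrative values only; the real list is PACKAGE_EXTENSIONS.
ALLOWED_EXTENSIONS = {"cwl", "yaml", "yml", "json"}


class PackageRegistrationError(Exception):
    """Local stand-in for Weaver's exception of the same name."""


def check_package_file(path_or_url: str) -> Tuple[str, bool]:
    parsed = urlparse(path_or_url)
    is_url = parsed.scheme in ("http", "https")
    name = parsed.path if is_url else path_or_url
    ext = os.path.splitext(name)[1].lstrip(".").lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise PackageRegistrationError(f"Invalid package file extension: [{ext}]")
    if is_url:
        # the documented function also validates the HTTP status code; omitted here
        return path_or_url, True
    abs_path = os.path.abspath(path_or_url)
    if not os.path.isfile(abs_path):
        raise PackageRegistrationError(f"Cannot find package file: [{abs_path}]")
    return abs_path, False
```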

weaver.processes.wps_package._load_package_file(file_path: str) -> weaver.typedefs.CWL

Loads the package in YAML/JSON format specified by the file path.

weaver.processes.wps_package._load_package_content(package_dict, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: Optional[str] = None, only_dump_file: bool = False, tmp_dir: Optional[str] = None, loading_context: Optional[cwltool.context.LoadingContext] = None, runtime_context: Optional[cwltool.context.RuntimeContext] = None, process_offering: Optional[weaver.typedefs.JSON] = None) -> Optional[Tuple[cwltool.factory.Callable, str, Dict[str, str]]]

Loads the package content to file in a temporary directory. Recursively processes sub-packages steps if the parent is a Workflow (CWL class).

Parameters
  • package_dict – package content representation as a json dictionary.

  • package_name – name to use to create the package file.

  • data_source – identifier of the data source to map to specific ADES, or map to localhost if None.

  • only_dump_file – if True, only dump the package file; otherwise, the CWLFactoryCallable is also validated and returned.

  • tmp_dir – location of the temporary directory to dump files (deleted on exit).

  • loading_context – cwltool context used to create the cwl package (required if only_dump_file=False)

  • runtime_context – cwltool context used to execute the cwl package (required if only_dump_file=False)

  • process_offering – JSON body of the process description payload (used as I/O hint ordering)

Returns

None if only_dump_file is True; otherwise, a tuple of:

  • instance of CWLFactoryCallable

  • package type (PROCESS_WORKFLOW or PROCESS_APPLICATION)

  • mapping of each step ID with their package name that must be run

Warning

Specified tmp_dir will be deleted on exit.

weaver.processes.wps_package._merge_package_inputs_outputs(wps_inputs_list, cwl_inputs_list: List[weaver.processes.convert.WPS_Input_Type], wps_outputs_list: List[weaver.processes.convert.ANY_IO_Type], cwl_outputs_list: List[weaver.processes.convert.WPS_Output_Type]) -> Tuple[List[weaver.processes.convert.JSON_IO_Type], List[weaver.processes.convert.JSON_IO_Type]]

Merges the I/O definitions from the WPS specification (from the POST request) and from the CWL specification (extracted from the file) into those used for process creation and returned by GetCapabilities and DescribeProcess.

Note

Parameters cwl_inputs_list and cwl_outputs_list are expected to be in WPS-like format (i.e.: CWL I/O converted to the corresponding WPS I/O).

weaver.processes.wps_package._get_package_io(package_factory: cwltool.factory.Callable, io_select: str, as_json: bool) -> List[weaver.processes.convert.PKG_IO_Type]

Retrieves I/O definitions from a validated CWLFactoryCallable. The returned I/O format depends on the value of as_json.

weaver.processes.wps_package._get_package_inputs_outputs(package_factory, as_json: bool = False) -> Tuple[List[weaver.processes.convert.PKG_IO_Type], List[weaver.processes.convert.PKG_IO_Type]]

Generates WPS-like (inputs, outputs) tuple using parsed CWL package definitions.

weaver.processes.wps_package._update_package_metadata(wps_package_metadata: weaver.typedefs.JSON, cwl_package_package: weaver.typedefs.CWL) -> None

Updates the package WPS metadata dictionary with details extractable from the CWL package definition.

weaver.processes.wps_package._generate_process_with_cwl_from_reference(reference: str) -> Tuple[weaver.typedefs.CWL, weaver.typedefs.JSON]

Resolves the reference type (CWL, WPS-1, WPS-2, WPS-3) and generates a CWL package from it. Additionally provides minimal process details retrieved from the reference.

weaver.processes.wps_package.get_process_definition(process_offering: weaver.typedefs.JSON, reference: Optional[str] = None, package: Optional[weaver.typedefs.CWL] = None, data_source: Optional[str] = None) -> weaver.typedefs.JSON

Returns an updated process definition dictionary, ready for storage, built from the provided WPS process_offering and a package definition passed either by reference or as CWL package content. The returned process information can be used later on to load an instance of weaver.processes.wps_package.WpsPackage.

Parameters
  • process_offering – WPS REST-API (WPS-3) process offering as JSON.

  • reference – URL to CWL package definition, WPS-1 DescribeProcess endpoint or WPS-3 Process endpoint.

  • package – literal CWL package definition (YAML or JSON format).

  • data_source – where to resolve process IDs (default: localhost if None).

Returns

updated process definition with resolved/merged information from package/reference.

class weaver.processes.wps_package.WpsPackage(**kw)
Parameters
  • handler – A callable that gets invoked for each incoming request. It should accept a single pywps.app.WPSRequest argument and return a pywps.app.WPSResponse object.

  • identifier (string) – Name of this process.

  • title (string) – Human readable title of process.

  • abstract (string) – Brief narrative description of the process.

  • keywords (list) – Keywords that characterize a process.

  • inputs – List of inputs accepted by this process. They should be LiteralInput and ComplexInput and BoundingBoxInput objects.

  • outputs – List of outputs returned by this process. They should be LiteralOutput and ComplexOutput and BoundingBoxOutput objects.

  • metadata – List of metadata advertised by this process. They should be pywps.app.Common.Metadata objects.

  • translations (dict[str,dict[str,str]]) – The first key is the RFC 4646 language code, and the nested mapping contains translated strings accessible by a string property. e.g. {“fr-CA”: {“title”: “Mon titre”, “abstract”: “Une description”}}

Creates a WPS-3 Process instance to execute a CWL application package definition.

Process parameters should be loaded from an existing weaver.datatype.Process instance generated using weaver.processes.wps_package.get_process_definition().

The provided kw should correspond to the output of weaver.datatype.Process.params_wps().

package: Optional[weaver.typedefs.CWL]
package_id: Optional[str]
package_type: Optional[str]
package_log_hook_stderr: Optional[str]
package_log_hook_stdout: Optional[str]
percent: Optional[weaver.typedefs.Number]
remote_execution: Optional[bool]
log_file: Optional[str]
log_level: Optional[int]
logger: Optional[logging.Logger]
step_packages: Optional[List[weaver.typedefs.CWL]]
step_launched: Optional[List[str]]
request: Optional[pywps.app.WPSRequest]
response: Optional[pywps.response.execute.ExecuteResponse]
setup_loggers(self, log_stdout_stderr: bool = True) -> None

Configures useful loggers to catch most of the common output and/or error messages during package execution.

insert_package_log(self, result: Union[CWLResults, cwltool.factory.WorkflowStatus]) -> List[str]

Retrieves additional CWL sub-process log captures to obtain the internal application output and/or errors.

After execution of this method, the WPS output log (which can be obtained by retrieve_package_job_log()) will have additional stderr/stdout entries extracted from the underlying application package tool execution.

The outputs and errors are inserted as best as possible in logical order, so that the merged logs read in a natural, chronological order. If both output and errors are available, they are appended one after another, since merging them in an orderly fashion cannot be guaranteed from outside the CWL runner.

Note

In case of any exception, log reporting is aborted and ignored.

Todo

Improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)

Parameters

result – output results returned by successful CWL package instance execution or raised CWL exception.

Returns

captured execution log lines retrieved from files

update_requirements(self)

In-place modification of the package to adjust invalid items that would break the behaviour we must enforce.

update_effective_user(self)

Updates the effective user/group for the Application Package to be executed.

FIXME: (experimental) update user/group permissions

Reducing permissions is safer inside a docker application, since weaver/cwltool could be running as root, but this requires that mounted volumes have the required permissions so that euid:egid can use them.

Overrides cwltool’s function that retrieves the user/group IDs with the ones we enforce.

update_status(self, message: str, progress: weaver.typedefs.Number, status: weaver.status.AnyStatusType) -> None

Updates the real PyWPS job status from the specified parameters.

step_update_status(self, message: str, progress: weaver.typedefs.Number, start_step_progress: weaver.typedefs.Number, end_step_progress: weaver.typedefs.Number, step_name: str, target_host: pywps.inout.literaltypes.AnyValue, status: str) -> None
log_message(self, status: weaver.status.AnyStatusType, message: str, progress: Optional[weaver.typedefs.Number] = None, level: int = logging.INFO) -> None
exception_message(self, exception_type: Type[Exception], exception: Optional[Exception] = None, message: str = 'no message', status: weaver.status.AnyStatusType = STATUS_EXCEPTION, level: int = logging.ERROR) -> Exception

Logs to the job the specified error message with the provided exception type.

Returns

formatted exception with message to be raised by calling function.

classmethod map_step_progress(cls, step_index: int, steps_total: int) -> weaver.typedefs.Number

Calculates the percentage progression of a single step of the full process.

Note

The step progression is adjusted according to the delimited start/end of the underlying CWL execution to provide a continuous progress percentage over the complete execution. Otherwise, values would jump around according to whichever progress the underlying remote WPS or monitored CWL reports, if any.
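Assuming a linear mapping (the exact formula Weaver uses is not given here), the documented bounds PACKAGE_PROGRESS_CWL_RUN = 10 and PACKAGE_PROGRESS_CWL_DONE = 95 could be used as follows to compute where each step starts and to rescale a step's own 0-100 progress into its range. Both functions are hypothetical sketches.

```python
# Bounds of the CWL execution phase, as documented for this module.
PACKAGE_PROGRESS_CWL_RUN = 10
PACKAGE_PROGRESS_CWL_DONE = 95


def map_step_progress(step_index: int, steps_total: int) -> float:
    """Hypothetical linear mapping of step i of n into the CWL execution range."""
    span = PACKAGE_PROGRESS_CWL_DONE - PACKAGE_PROGRESS_CWL_RUN
    return PACKAGE_PROGRESS_CWL_RUN + span * step_index / steps_total


def map_sub_progress(progress: float, start: float, end: float) -> float:
    """Rescale a 0-100 progress reported by a step into its [start, end] range."""
    return start + (end - start) * progress / 100.0
```

For example, with 4 steps, the second step (index 1) spans 31.25 to 52.5, so a 50% report from that step maps to 41.875 overall.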

_handler(self, request: pywps.app.WPSRequest, response: pywps.response.execute.ExecuteResponse) -> pywps.response.execute.ExecuteResponse

Method called when process receives the WPS execution request.

must_fetch(self, input_ref: str) -> bool

Figures out if a file reference should be fetched immediately for local execution. For anything other than a local script/docker execution, the remote ADES/WPS process will fetch it. S3 references are handled here to avoid errors on remote WPS servers that do not support them.

make_inputs(self, wps_inputs: Dict[str, Deque[weaver.processes.convert.WPS_Input_Type]], cwl_inputs_info: Dict[str, weaver.processes.convert.CWL_Input_Type]) -> Dict[str, weaver.typedefs.ValueType]

Converts WPS input values to corresponding CWL input values for processing by CWL package instance.

The WPS inputs must correspond to pywps definitions. Multiple values are adapted to arrays as needed. WPS Complex types (files) are converted to appropriate locations based on data or reference specification.

Parameters
  • wps_inputs – actual WPS inputs parsed from execution request

  • cwl_inputs_info – expected CWL input definitions for mapping

Returns

CWL input values
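The conversion rules above can be sketched as follows. This is a simplified, hypothetical model: WPS inputs are plain dictionaries whose type is either literal or complex, and cwl_inputs_info only records whether each CWL input is an array. The real pywps and Weaver types carry much more information.

```python
from typing import Any, Deque, Dict

# Hypothetical simplified WPS input: {"type": "literal", "data": ...} or {"type": "complex", "url": ...}
WpsInput = Dict[str, Any]


def to_cwl_value(wps_input: WpsInput) -> Any:
    if wps_input["type"] == "complex":
        # files become CWL File objects pointing to their location
        return {"class": "File", "location": wps_input["url"]}
    return wps_input["data"]


def make_cwl_inputs(wps_inputs: Dict[str, Deque[WpsInput]],
                    cwl_inputs_info: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    cwl_inputs = {}
    for input_id, values in wps_inputs.items():
        if not values:
            continue  # nothing provided for this input
        converted = [to_cwl_value(value) for value in values]
        # CWL array inputs receive every value; scalar inputs receive the first one
        is_array = cwl_inputs_info.get(input_id, {}).get("array", False)
        cwl_inputs[input_id] = converted if is_array else converted[0]
    return cwl_inputs
```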

make_location_input(self, input_type: str, input_definition: pywps.inout.ComplexInput) -> weaver.typedefs.JSON

Generates the JSON content required to specify a CWL File input definition from a location.

Note

If the process requires OpenSearch references that should be preserved as is, use scheme defined by weaver.processes.constants.OPENSEARCH_LOCAL_FILE_SCHEME prefix instead of http(s)://.

make_outputs(self, cwl_result: CWLResults) -> None

Maps CWL result outputs to corresponding WPS outputs.

make_location_output(self, cwl_result: CWLResults, output_id: str) -> None

Rewrites the WPS output with the required location, using the result path from the CWL execution.

Configures the parameters such that PyWPS will either auto-resolve the local paths to match with URL defined by weaver.wps_output_url or upload it to S3 bucket from weaver.wps_output_s3_bucket and provide reference directly.

See also

  • weaver.wps.load_pywps_config()
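The local-path-to-URL resolution can be pictured with the following hypothetical sketch. It assumes results are written below a local output directory that is served at the weaver.wps_output_url location; the function and its parameter names are illustrative, and the S3 upload path is not shown.

```python
import os
from urllib.parse import urljoin


def local_path_to_url(path: str, output_dir: str, output_url: str) -> str:
    """Hypothetical sketch: map a result file under output_dir to its public URL."""
    rel_path = os.path.relpath(os.path.abspath(path), os.path.abspath(output_dir))
    if rel_path == os.pardir or rel_path.startswith(os.pardir + os.sep):
        raise ValueError(f"Result [{path}] is not located under [{output_dir}]")
    # urljoin drops the last base segment unless the base URL ends with "/"
    return urljoin(output_url.rstrip("/") + "/", rel_path.replace(os.sep, "/"))
```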

make_tool(self, toolpath_object: weaver.typedefs.ToolPathObjectType, loading_context: cwltool.context.LoadingContext) -> cwltool.process.Process
get_application_requirement(self) -> Dict[str, Any]

Obtains the first item in CWL package requirements or hints that corresponds to a Weaver-specific application type as defined in CWL_REQUIREMENT_APP_TYPES.

Returns

dictionary that minimally has class field, and optionally other parameters from that requirement.
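A hypothetical sketch of this lookup, assuming requirements are inspected before hints and that an empty dictionary is returned when nothing matches. APP_TYPES is an illustrative stand-in for CWL_REQUIREMENT_APP_TYPES: DockerRequirement is a standard CWL requirement, while ExampleAppRequirement is made up for the example.

```python
from typing import Any, Dict

# Assumption: illustrative stand-in for CWL_REQUIREMENT_APP_TYPES.
APP_TYPES = {"DockerRequirement", "ExampleAppRequirement"}


def find_application_requirement(package: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sketch: first requirement/hint whose class is an application type."""
    for section in ("requirements", "hints"):
        items = package.get(section, [])
        if isinstance(items, dict):  # mapping form {class: params}
            items = [{"class": cls, **(params or {})} for cls, params in items.items()]
        for item in items:
            if item.get("class") in APP_TYPES:
                return item
    return {}  # assumption: empty dict when no application requirement exists
```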

get_job_process_definition(self, jobname: str, joborder: weaver.typedefs.JSON, tool: weaver.typedefs.CWL) -> WpsPackage

This function is called before running an ADES job (either from a workflow step or a simple EMS dispatch). It must return a weaver.processes.wps_process.WpsProcess instance configured with the proper CWL package definition, ADES target and cookies to access it (if protected).

Parameters
  • jobname – The workflow step or the package ID that must be launched on an ADES (string).

  • joborder – The parameters for the job, as a dict {input_name: input_value}, where input_value is either an input_object or an array [input_object], and input_object is either a string or a dict {class: File, location: string}. In our case, inputs are expected to be File objects.

  • tool – Whole CWL config, including the hints requirement (see: weaver.processes.constants.CWL_REQUIREMENT_APP_TYPES).