weaver.processes.wps_package
Representation of WPS process with an internal CWL package definition.
Functions and classes that offer interoperability and conversion between corresponding elements defined as CWL CommandLineTool/Workflow and WPS ProcessDescription in order to generate ADES/EMS deployable Application Package.
See also
weaver.wps_restapi.api
conformance details
Module Contents
-
weaver.processes.wps_package.
get_status_location_log_path
(status_location: str, out_dir: Optional[str] = None) → str[source]¶
-
weaver.processes.wps_package.
retrieve_package_job_log
(execution: owslib.wps.WPSExecution, job: weaver.datatype.Job, progress_min: weaver.typedefs.Number = 0, progress_max: weaver.typedefs.Number = 100) → None Obtains the underlying WPS execution log entries from the status file and adds them after the existing job log entries.
-
weaver.processes.wps_package.
get_process_location
(process_id_or_url: Union[Dict[str, Any], str], data_source: Optional[str] = None) → str[source]¶ Obtains the URL of a WPS REST DescribeProcess given the specified information.
- Parameters
process_id_or_url -- process "identifier" or literal URL to DescribeProcess WPS-REST location.
data_source -- identifier of the data source to map to specific ADES, or map to localhost if None.
- Returns
URL of EMS or ADES WPS-REST DescribeProcess.
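The resolution described above can be pictured with a minimal sketch. It is not the Weaver implementation: the `DATA_SOURCES` registry, the `LOCAL_URL` value, the `resolve_process_location` name and the `/processes/{id}` route layout are all assumptions made for illustration, and only string inputs are handled.

```python
from typing import Dict, Optional
from urllib.parse import urlparse

# Hypothetical registry mapping data source names to ADES base URLs (not from Weaver).
DATA_SOURCES: Dict[str, str] = {"ades-east": "https://ades.example.com/weaver"}
LOCAL_URL = "http://localhost:4001"  # assumed stand-in for the local instance URL


def resolve_process_location(process_id_or_url: str, data_source: Optional[str] = None) -> str:
    """Return a literal URL unchanged, otherwise build one from the process identifier."""
    if urlparse(process_id_or_url).scheme in ("http", "https"):
        return process_id_or_url
    base_url = DATA_SOURCES[data_source] if data_source else LOCAL_URL
    # The "/processes/{id}" layout is an assumption about the WPS-REST route.
    return "{}/processes/{}".format(base_url.rstrip("/"), process_id_or_url)
```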
-
weaver.processes.wps_package.
get_package_workflow_steps
(package_dict_or_url: Union[Dict[str, Any], str]) → List[Dict[str, str]][source]¶ Obtain references to intermediate steps of a CWL workflow.
- Parameters
package_dict_or_url -- process package definition or literal URL to DescribeProcess WPS-REST location.
- Returns
list of workflow steps as {"name": <name>, "reference": <reference>} where name is the generic package step name, and reference is the id/url of a registered WPS package.
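As an illustration of the returned structure, the sketch below collects step references from a CWL Workflow given as a dictionary. The helper name `list_workflow_steps` is hypothetical, and the sketch assumes the reference is the string found under each step's `run` field; it does not fetch packages from a URL.

```python
from typing import Any, Dict, List


def list_workflow_steps(package: Dict[str, Any]) -> List[Dict[str, str]]:
    """Hypothetical helper: collect {"name", "reference"} pairs from a CWL Workflow."""
    if package.get("class") != "Workflow":
        return []  # a CommandLineTool has no intermediate steps
    steps = package.get("steps", {})
    # CWL allows steps as a mapping {name: step} or as a list of steps with an "id".
    if isinstance(steps, dict):
        items = list(steps.items())
    else:
        items = [(step["id"], step) for step in steps]
    # Only string "run" references are kept; embedded tool definitions are skipped.
    return [
        {"name": name, "reference": step["run"]}
        for name, step in items
        if isinstance(step.get("run"), str)
    ]
```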
-
weaver.processes.wps_package.
_fetch_process_info
(process_info_url: str, fetch_error: Type[Exception]) → weaver.typedefs.JSON[source]¶ Fetches the JSON process information from the specified URL and validates that it contains something.
- Raises
fetch_error -- provided exception with URL message if the process information could not be retrieved.
-
weaver.processes.wps_package.
_get_process_package
(process_url: str) → Tuple[weaver.typedefs.CWL, str][source]¶ Retrieves the WPS process package content from given process ID or literal URL.
- Parameters
process_url -- process literal URL to DescribeProcess WPS-REST location.
- Returns
tuple of package body as dictionary and package reference name.
-
weaver.processes.wps_package.
_get_process_payload
(process_url: str) → weaver.typedefs.JSON[source]¶ Retrieves the WPS process payload content from given process ID or literal URL.
- Parameters
process_url -- process literal URL to DescribeProcess WPS-REST location.
- Returns
payload body as dictionary.
-
weaver.processes.wps_package.
_get_package_type
(package_dict: weaver.typedefs.CWL) → Union[weaver.processes.types.PROCESS_APPLICATION, weaver.processes.types.PROCESS_WORKFLOW][source]¶
-
weaver.processes.wps_package.
_get_package_requirements_as_class_list
(requirements: weaver.typedefs.CWL_AnyRequirements) → weaver.typedefs.CWL_RequirementsList Converts CWL package requirements or hints into list representation.
Uniformizes CWL requirements into the list representation, whether the input definitions were provided using the dictionary definition as {"<req-class>": {<params>}} or the list of dictionary requirements [{<req-class+params>}], each with a class key.
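The two accepted forms and their common list representation can be sketched as follows. The function name `requirements_as_class_list` is hypothetical and the code is a simplified illustration, not the module's own implementation.

```python
from typing import Any, Dict, List, Union

Requirements = Union[Dict[str, Dict[str, Any]], List[Dict[str, Any]]]


def requirements_as_class_list(requirements: Requirements) -> List[Dict[str, Any]]:
    """Hypothetical helper: return requirements as a list of {"class": ..., <params>}."""
    if isinstance(requirements, dict):
        # Mapping form: the key is the requirement class, the value holds its parameters.
        return [
            {"class": req_class, **(params or {})}
            for req_class, params in requirements.items()
        ]
    # List form: each item already carries its own "class" key, so copy it as is.
    return [dict(req) for req in requirements]
```

Downstream code can then iterate over one shape only, regardless of how the package author wrote the section.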
-
weaver.processes.wps_package.
_check_package_file
(cwl_file_path_or_url: str) → str[source]¶ Validates that the specified CWL file path or URL points to an existing and allowed file format.
- Parameters
cwl_file_path_or_url -- path on disk to a file of one of the allowed types, or a URL pointing to one served somewhere.
- Returns
validated absolute path or URL of the file reference.
- Raises
PackageRegistrationError -- in case of missing file, invalid format or invalid HTTP status code.
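A rough sketch of this kind of validation is shown below. The list of allowed extensions, the `PackageFileError` class (standing in for `PackageRegistrationError`) and the `check_package_path` name are assumptions; the HTTP status check mentioned above is only noted in a comment, not performed.

```python
import os
from urllib.parse import urlparse

ALLOWED_EXTENSIONS = (".cwl", ".yml", ".yaml", ".json")  # assumed list, not taken from Weaver


class PackageFileError(Exception):
    """Hypothetical stand-in for PackageRegistrationError."""


def check_package_path(path_or_url: str) -> str:
    """Validate the extension, then the existence of a local file; return the reference."""
    parsed = urlparse(path_or_url)
    extension = os.path.splitext(parsed.path)[1].lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise PackageFileError("unsupported file format: {!r}".format(path_or_url))
    if parsed.scheme in ("http", "https"):
        # A real check would also request the URL and verify the HTTP status code.
        return path_or_url
    absolute_path = os.path.abspath(path_or_url)
    if not os.path.isfile(absolute_path):
        raise PackageFileError("missing file: {!r}".format(absolute_path))
    return absolute_path
```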
-
weaver.processes.wps_package.
_load_package_file
(file_path: str) → weaver.typedefs.CWL[source]¶ Loads the package in YAML/JSON format specified by the file path.
-
weaver.processes.wps_package.
_load_package_content
(package_dict: Ellipsis, package_name: str = PACKAGE_DEFAULT_FILE_NAME, data_source: Optional[str] = None, only_dump_file: bool = False, tmp_dir: Optional[str] = None, loading_context: Optional[cwltool.context.LoadingContext] = None, runtime_context: Optional[cwltool.context.RuntimeContext] = None, process_offering: Optional[weaver.typedefs.JSON] = None) → Optional[Tuple[cwltool.factory.Callable, str, Dict[str, str]]][source]¶ Loads CWL package definition using various contextual resources.
The following operations are accomplished to validate the package:
Starts by resolving any intermediate sub-package steps if the parent package is a Workflow (CWL class), in order to recursively generate and validate their process and package, potentially using remote references. Each of those operations is applied to every step.
Package I/O are reordered using any reference process offering hints if provided to generate consistent results.
The resulting package definition is dumped to a temporary JSON file, to validate the content can be serialized.
Optionally, the CWL factory is employed to create the application runner, validating any provided loading and runtime contexts, and considering all Workflow steps if applicable, or the single application otherwise.
- Parameters
package_dict -- package content representation as a json dictionary.
package_name -- name to use to create the package file.
data_source -- identifier of the data source to map to specific ADES, or map to localhost if None.
only_dump_file -- specify if the CWLFactoryCallable should be validated and returned.
tmp_dir -- location of the temporary directory to dump files (deleted on exit).
loading_context -- cwltool context used to create the cwl package (required if only_dump_file=False)
runtime_context -- cwltool context used to execute the cwl package (required if only_dump_file=False)
process_offering -- JSON body of the process description payload (used as I/O hint ordering)
- Returns
If only_dump_file is True: None. Otherwise, tuple of:
- instance of CWLFactoryCallable
- package type (PROCESS_WORKFLOW or PROCESS_APPLICATION)
- mapping of each step ID with their package name that must be run
Warning
Specified tmp_dir will be deleted on exit.
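The serialization check among the operations listed above (dumping the package to a temporary JSON file) can be approximated with the standard library alone. This is only a sketch: the `dump_package` name and the `package.cwl` default file name are assumptions, and neither step resolution nor the CWL factory is involved.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


def dump_package(package: Dict[str, Any],
                 package_name: str = "package.cwl",  # assumed default name
                 tmp_dir: Optional[str] = None) -> str:
    """Write the package as JSON into a temporary directory and return the file path."""
    tmp_dir = tmp_dir or tempfile.mkdtemp()
    path = os.path.join(tmp_dir, package_name)
    with open(path, "w", encoding="utf-8") as package_file:
        # json.dump raises TypeError when the content cannot be serialized.
        json.dump(package, package_file)
    return path
```

Cleanup of the temporary directory is left to the caller in this sketch.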
-
weaver.processes.wps_package.
_merge_package_inputs_outputs
(wps_inputs_defs: Ellipsis, cwl_inputs_list: List[weaver.processes.convert.WPS_Input_Type], wps_outputs_defs: Union[List[weaver.processes.convert.ANY_IO_Type], Dict[str, weaver.processes.convert.ANY_IO_Type]], cwl_outputs_list: List[weaver.processes.convert.WPS_Output_Type]) → Tuple[List[weaver.processes.convert.JSON_IO_Type], List[weaver.processes.convert.JSON_IO_Type]][source]¶ Merges corresponding metadata of I/O definitions from CWL and WPS sources.
Merges I/O definitions to use for process creation and returned by GetCapabilities, DescribeProcess using the WPS specifications (from request POST) and CWL specifications (extracted from file).
Note
Parameters cwl_inputs_list and cwl_outputs_list are expected to be in WPS-like format (i.e.: CWL I/O converted to corresponding WPS I/O objects).
See also
Conversion of CWL to WPS-equivalent objects is handled by _get_package_inputs_outputs() and its underlying functions.
- Parameters
wps_inputs_defs -- list or mapping of provided WPS input definitions.
cwl_inputs_list -- processed list of CWL inputs from the Application Package.
wps_outputs_defs -- list or mapping of provided WPS output definitions.
cwl_outputs_list -- processed list of CWL outputs from the Application Package.
- Returns
Tuple of (inputs, outputs) consisting of lists of I/O with merged contents between CWL and WPS.
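The idea of merging by I/O identifier can be sketched with plain dictionaries. This is a deliberately simplified illustration: the real function operates on WPS I/O objects, and both the `merge_io` name and the precedence rule used here (WPS-provided fields override CWL-derived ones) are assumptions.

```python
from typing import Any, Dict, List, Union

IODef = Dict[str, Any]


def merge_io(wps_defs: Union[List[IODef], Dict[str, IODef]], cwl_list: List[IODef]) -> List[IODef]:
    """Hypothetical helper: merge WPS metadata into the CWL-derived I/O, matched by "id"."""
    if isinstance(wps_defs, dict):
        # Mapping form {id: definition} is converted to the list form first.
        wps_defs = [{"id": io_id, **io_def} for io_id, io_def in wps_defs.items()]
    wps_by_id = {io_def["id"]: io_def for io_def in wps_defs}
    merged = []
    for cwl_io in cwl_list:
        # Assumed precedence: fields supplied on the WPS side win over CWL ones.
        merged.append({**cwl_io, **wps_by_id.get(cwl_io["id"], {})})
    return merged
```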
-
weaver.processes.wps_package.
_get_package_io
(package_factory: cwltool.factory.Callable, io_select: str, as_json: bool) → List[weaver.processes.convert.PKG_IO_Type] Retrieves I/O definitions from a validated CWLFactoryCallable.
See also
Factory can be obtained with validation using _load_package_content().
- Parameters
package_factory -- CWL factory that contains I/O references to the package definition.
io_select -- either WPS_INPUT or WPS_OUTPUT according to what needs to be processed.
as_json -- toggle to specify the desired output type.
- Returns
I/O format depends on value as_json. If True, converts the I/O definitions into JSON representation. If False, converts the I/O definitions into WPS objects.
-
weaver.processes.wps_package.
_get_package_inputs_outputs
(package_factory: Ellipsis, as_json: bool = False) → Tuple[List[weaver.processes.convert.PKG_IO_Type], List[weaver.processes.convert.PKG_IO_Type]] Generates WPS-like (inputs, outputs) tuple using parsed CWL package definitions.
-
weaver.processes.wps_package.
_update_package_metadata
(wps_package_metadata: weaver.typedefs.JSON, cwl_package_package: weaver.typedefs.CWL) → None[source]¶ Updates the package WPS metadata dictionary from extractable CWL package definition.
-
weaver.processes.wps_package.
_generate_process_with_cwl_from_reference
(reference: str) → Tuple[weaver.typedefs.CWL, weaver.typedefs.JSON] Resolves the reference type (CWL, WPS-1, WPS-2, WPS-3) and generates a CWL package from it.
Additionally provides minimal process details retrieved from the reference. The amount of detail obtained from the process depends on the parameters available in its description, as well as on how much metadata can be mapped between it and the generated CWL package.
-
weaver.processes.wps_package.
get_application_requirement
(package: weaver.typedefs.CWL) → Dict[str, Any][source]¶ Retrieve the principal requirement that allows mapping to the appropriate process implementation.
Obtains the first item in CWL package requirements or hints that corresponds to a Weaver-specific application type as defined in CWL_REQUIREMENT_APP_TYPES.
- Returns
dictionary that minimally has class field, and optionally other parameters from that requirement.
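The lookup can be pictured with the sketch below. The `APP_TYPES` set is a placeholder for `CWL_REQUIREMENT_APP_TYPES` (its actual members are not listed on this page), the `find_application_requirement` name is hypothetical, and both the search order (requirements before hints) and the empty-dictionary fallback are assumptions.

```python
from typing import Any, Dict

# Placeholder for CWL_REQUIREMENT_APP_TYPES; the real members are defined by Weaver.
APP_TYPES = frozenset({"DockerRequirement", "WPS1Requirement"})


def find_application_requirement(package: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: first requirement or hint whose class is an application type."""
    for section in ("requirements", "hints"):  # assumed search order
        items = package.get(section) or []
        if isinstance(items, dict):
            # Mapping form {"<class>": {<params>}} is converted to the list form.
            items = [{"class": name, **(params or {})} for name, params in items.items()]
        for requirement in items:
            if requirement.get("class") in APP_TYPES:
                return requirement
    return {}  # assumed fallback when nothing matches
```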
-
weaver.processes.wps_package.
check_package_instance_compatible
(package: weaver.typedefs.CWL) → Optional[str][source]¶ Verifies if an Application Package definition is valid for the employed Weaver instance configuration.
If the CWL is invalid for the active application, explains the reason why that package always requires remote execution.
When a package can sometimes be executed locally (ADES) or remotely (EMS) depending on the instance configuration, such as in the case of a CWL_REQUIREMENT_APP_DOCKER, return None. This function instead detects cases where a remote server is mandatory without ambiguity related to the current Weaver instance, regardless of whether the remote should be an ADES or a remote Provider (WPS or ESGF-CWT).
- Parameters
package -- CWL definition for the process.
- Returns
reason message if must be executed remotely or None if it could be executed locally.
-
weaver.processes.wps_package.
get_auth_requirements
(requirement: weaver.typedefs.JSON, headers: Optional[weaver.typedefs.AnyHeadersContainer]) → Optional[weaver.datatype.Authentication][source]¶ Extract any authentication related definitions provided in request headers corresponding to the application type.
- Parameters
requirement -- Application Package requirement as defined by CWL requirements.
headers -- Request headers received during deployment.
- Returns
Matched authentication details when applicable, otherwise None.
- Raises
TypeError -- When the authentication object cannot be created due to invalid or missing inputs.
ValueError -- When the authentication object cannot be created due to incorrectly formed inputs.
-
weaver.processes.wps_package.
get_process_definition
(process_offering: weaver.typedefs.JSON, reference: Optional[str] = None, package: Optional[weaver.typedefs.CWL] = None, data_source: Optional[str] = None, headers: Optional[weaver.typedefs.AnyHeadersContainer] = None) → weaver.typedefs.JSON[source]¶ Resolve the process definition considering corresponding metadata from the offering, package and references.
Returns an updated process definition dictionary ready for storage using provided WPS process_offering and a package definition passed by reference or package CWL content. The returned process information can be used later on to load an instance of weaver.wps_package.WpsPackage.
- Parameters
process_offering -- WPS REST-API (WPS-3) process offering as JSON.
reference -- URL to CWL package definition, WPS-1 DescribeProcess endpoint or WPS-3 Process endpoint.
package -- literal CWL package definition (YAML or JSON format).
data_source -- where to resolve process IDs (default: localhost if None).
headers -- Request headers provided during deployment to retrieve details such as authentication tokens.
- Returns
Updated process definition with resolved/merged information from package/reference.
-
class
weaver.processes.wps_package.
WpsPackage
(**kw)
- Parameters
handler -- A callable that gets invoked for each incoming request. It should accept a single pywps.app.WPSRequest argument and return a pywps.app.WPSResponse object.
identifier (string) -- Name of this process.
title (string) -- Human readable title of process.
abstract (string) -- Brief narrative description of the process.
keywords (list) -- Keywords that characterize a process.
inputs -- List of inputs accepted by this process. They should be LiteralInput and ComplexInput and BoundingBoxInput objects.
outputs -- List of outputs returned by this process. They should be LiteralOutput and ComplexOutput and BoundingBoxOutput objects.
metadata -- List of metadata advertised by this process. They should be pywps.app.Common.Metadata objects.
translations (dict[str,dict[str,str]]) -- The first key is the RFC 4646 language code, and the nested mapping contains translated strings accessible by a string property. e.g. {"fr-CA": {"title": "Mon titre", "abstract": "Une description"}}
Creates a WPS-3 Process instance to execute a CWL application package definition.
Process parameters should be loaded from an existing weaver.datatype.Process instance generated using weaver.wps_package.get_process_definition().
Provided kw should correspond to weaver.datatype.Process.params_wps()
-
setup_loggers
(self, log_stdout_stderr: bool = True) → None Configures useful loggers to catch most of the common output and/or error messages during package execution.
-
insert_package_log
(self, result: Union[weaver.typedefs.CWL_Results, cwltool.factory.WorkflowStatus]) → List[str] Retrieves additional CWL sub-process log captures to obtain internal application output and/or errors.
After execution of this method, the WPS output log (which can be obtained by retrieve_package_job_log()) will have additional stderr/stdout entries extracted from the underlying application package tool execution.
The outputs and errors are inserted as best as possible in the logical order so that the merged logs read in a natural, chronological order. In the event that both output and errors are available, they are appended one after another, as merging them in an orderly fashion cannot be guaranteed from outside the CWL runner.
Note
In case of any exception, log reporting is aborted and ignored.
Todo
Improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)
- Parameters
result -- output results returned by successful CWL package instance execution or raised CWL exception.
- Returns
captured execution log lines retrieved from files
-
setup_docker_image
(self) → Optional[bool][source]¶ Pre-pull the Docker image locally for running the process if authentication is required to get it.
- Returns
success status if operation was successful, or None when it does not apply.
-
setup_runtime
(self) → Dict[str, weaver.typedefs.AnyValueType][source]¶ Prepares the runtime parameters for the CWL package execution.
Parameter weaver.wps_workdir is the base-dir where sub-dir per application packages will be generated. Parameter workdir is the actual location PyWPS reserved for this process (already with sub-dir). If no weaver.wps_workdir was provided, reuse PyWPS parent workdir since we got access to it. Other steps handling outputs need to consider that CWL<->WPS out dirs could match because of this.
- Returns
resolved runtime parameters
-
update_requirements
(self) In-place modification of package to adjust invalid items that would break behaviour we must enforce.
-
update_effective_user
(self)[source]¶ Update effective user/group for the Application Package to be executed.
FIXME: (experimental) update user/group permissions
Reducing permissions is safer inside docker application since weaver/cwltool could be running as root but this requires that mounted volumes have the required permissions so euid:egid can use them.
Overrides cwltool's function to retrieve user/group id for ones we enforce.
-
update_status
(self, message: str, progress: weaver.typedefs.Number, status: weaver.status.AnyStatusType) → None Updates the PyWPS real job status from the specified parameters.
-
step_update_status
(self, message: str, progress: weaver.typedefs.Number, start_step_progress: weaver.typedefs.Number, end_step_progress: weaver.typedefs.Number, step_name: str, target_host: weaver.typedefs.AnyValueType, status: str) → None
-
log_message
(self, status: weaver.status.AnyStatusType, message: str, progress: Optional[weaver.typedefs.Number] = None, level: int = logging.INFO) → None
-
exception_message
(self, exception_type: Type[Exception], exception: Optional[Exception] = None, message: str = 'no message', status: weaver.status.AnyStatusType = STATUS_EXCEPTION, level: int = logging.ERROR) → Exception Logs to the job the specified error message with the provided exception type.
- Returns
formatted exception with message to be raised by calling function.
-
property
job
(self) → weaver.datatype.Job[source]¶ Obtain the job associated to this package execution as specified by the provided UUID.
Process must be in "execute" state under pywps for this job to be available.
-
classmethod
map_step_progress
(cls, step_index: int, steps_total: int) → weaver.typedefs.Number Calculates the percentage progression of a single step of the full process.
Note
The step progression is adjusted according to the delimited start/end of the underlying CWL execution to provide a continuous progress percentage over the complete execution. Otherwise, we would have values that jump around according to whichever progress the underlying remote WPS or monitored CWL employs, if any is provided.
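One way to obtain such a continuous percentage is a linear interpolation between the delimited start and end values, as in the sketch below. The bounds `STEP_PROGRESS_START` and `STEP_PROGRESS_END` are hypothetical values chosen for illustration, and the formula is an assumption about the general approach rather than the method's actual code.

```python
# Hypothetical bounds delimiting the CWL execution within the overall job progress.
STEP_PROGRESS_START = 10
STEP_PROGRESS_END = 95


def map_step_progress(step_index: int, steps_total: int) -> float:
    """Linearly place the start of step `step_index` between the two bounds."""
    span = STEP_PROGRESS_END - STEP_PROGRESS_START
    # max() guards against a division by zero for a package without steps.
    return STEP_PROGRESS_START + span * step_index / max(steps_total, 1)
```

With these bounds, a four-step workflow reports its steps as starting at 10, 31.25, 52.5 and 73.75 percent, and the last one ends at 95.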
-
_handler
(self, request: pywps.app.WPSRequest, response: pywps.response.execute.ExecuteResponse) → pywps.response.execute.ExecuteResponse Method called when process receives the WPS execution request.
-
must_fetch
(self, input_ref: str) → bool Figures out if file reference should be fetched immediately for local execution.
For anything other than a local script/docker, the remote ADES/WPS process will fetch it. S3 references are handled here to avoid errors on remote WPS not supporting them.
-
make_inputs
(self, wps_inputs: Dict[str, Deque[weaver.processes.convert.WPS_Input_Type]], cwl_inputs_info: Dict[str, weaver.processes.convert.CWL_Input_Type]) → Dict[str, weaver.typedefs.ValueType] Converts WPS input values to corresponding CWL input values for processing by CWL package instance.
The WPS inputs must correspond to pywps definitions. Multiple values are adapted to arrays as needed. WPS Complex types (files) are converted to appropriate locations based on data or reference specification.
- Parameters
wps_inputs -- actual WPS inputs parsed from execution request
cwl_inputs_info -- expected CWL input definitions for mapping
- Returns
CWL input values
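The adaptation of multiple values to arrays can be illustrated with plain values instead of pywps input objects. The `to_cwl_inputs` and `is_array_type` names are hypothetical, and the sketch assumes each CWL input definition exposes its type under a `type` key, either as an `{"type": "array", ...}` mapping or with the `[]` shorthand.

```python
from collections import deque
from typing import Any, Deque, Dict


def is_array_type(cwl_type: Any) -> bool:
    """Detect the two CWL array notations handled by this sketch."""
    if isinstance(cwl_type, dict):
        return cwl_type.get("type") == "array"
    return isinstance(cwl_type, str) and cwl_type.endswith("[]")


def to_cwl_inputs(wps_inputs: Dict[str, Deque[Any]],
                  cwl_inputs_info: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: keep all values for array inputs, the first one otherwise."""
    cwl_inputs = {}
    for input_id, values in wps_inputs.items():
        if is_array_type(cwl_inputs_info[input_id]["type"]):
            cwl_inputs[input_id] = list(values)
        else:
            cwl_inputs[input_id] = values[0]
    return cwl_inputs
```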
-
make_location_input
(self, input_type: str, input_definition: pywps.inout.ComplexInput) → Optional[weaver.typedefs.JSON] Generates the JSON content required to specify a CWL File input definition from a location.
If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint, implicitly convert the reference to the local WPS output directory to avoid useless download of available file. Since that endpoint could be protected though, perform a minimal HEAD request to validate its accessibility. Otherwise, this operation could incorrectly grant unauthorized access to protected files by forging the URL.
If the process requires OpenSearch references that should be preserved as is, scheme defined by weaver.processes.constants.OPENSEARCH_LOCAL_FILE_SCHEME prefix instead of http(s):// is expected.
Any other variant of file reference will be fetched as applicable by the relevant schemes.
See also
Documentation details of resolution based on schemes defined in File Reference Types section.
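The conversion of a local WPS output URL into a local path, and the resulting CWL File definition, can be sketched as below. Both helper names are hypothetical, the output URL and directory are passed explicitly rather than read from settings, and the HEAD accessibility check described above is deliberately left out, so this sketch alone would not be safe for protected endpoints.

```python
import posixpath
from typing import Any, Dict, Optional


def map_output_url_to_path(href: str, output_url: str, output_dir: str) -> Optional[str]:
    """Return the local path matching a WPS output URL, or None for any other reference."""
    base = output_url.rstrip("/") + "/"
    if not href.startswith(base):
        return None
    # NOTE: no HEAD request and no path traversal check are performed in this sketch.
    return posixpath.join(output_dir, href[len(base):])


def make_cwl_file(location: str) -> Dict[str, Any]:
    """Build the minimal JSON content of a CWL File input."""
    return {"class": "File", "location": location}
```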
-
make_outputs
(self, cwl_result: weaver.typedefs.CWL_Results) → None Maps CWL result outputs to corresponding WPS outputs.
-
make_location_output
(self, cwl_result: weaver.typedefs.CWL_Results, output_id: str) → None Rewrite the WPS output with required location using result path from CWL execution.
Configures the parameters such that PyWPS will either auto-resolve the local paths to match with URL defined by weaver.wps_output_url or upload it to S3 bucket from weaver.wps_output_s3_bucket and provide reference directly.
See also
weaver.wps.load_pywps_config()
-
make_tool
(self, toolpath_object: weaver.typedefs.ToolPathObjectType, loading_context: cwltool.context.LoadingContext) → cwltool.process.Process
-
get_job_process_definition
(self, jobname: str, joborder: weaver.typedefs.JSON, tool: weaver.typedefs.CWL) → WpsPackage Obtain the execution job definition for the given process.
This function is called before running an ADES job (either from a workflow step or a simple EMS dispatch). It must return a weaver.processes.wps_process.WpsProcess instance configured with the proper CWL package definition, ADES target and cookies to access it (if protected).
- Parameters
jobname -- The workflow step or the package id that must be launched on an ADES (string).
joborder -- The params for the job, as dict {input_name: input_value}. input_value is one of input_object or array [input_object]. input_object is one of string or dict {class: File, location: string}. In our case, inputs are expected to be File objects.
tool -- Whole CWL config including hints requirement (see: weaver.processes.constants.CWL_REQUIREMENT_APP_TYPES)
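The joborder structure described above can be walked with a few lines of code. The sketch below, with the hypothetical helper `list_file_locations`, collects the location of every File object, whether given alone or inside an array, and ignores plain string values.

```python
from typing import Any, Dict, List


def list_file_locations(joborder: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: gather File locations from {input_name: input_value}."""
    locations = []
    for input_value in joborder.values():
        # input_value is either one input_object or an array of them.
        input_objects = input_value if isinstance(input_value, list) else [input_value]
        for input_object in input_objects:
            if isinstance(input_object, dict) and input_object.get("class") == "File":
                locations.append(input_object["location"])
    return locations
```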