weaver.processes.wps_package
Functions and classes that offer interoperability and conversion between corresponding elements defined as CWL CommandLineTool/Workflow and WPS ProcessDescription, in order to generate an ADES/EMS Application Package.
See also
weaver.wps_restapi.api
conformance details
Module Contents
-
weaver.processes.wps_package.retrieve_package_job_log(execution: WPSExecution, job: Job) → None
Obtains the underlying WPS execution log from the status file and adds its entries after the existing job log entries.
-
weaver.processes.wps_package.get_process_location(process_id_or_url: Union[Dict[AnyStr, Any], AnyStr], data_source: Optional[AnyStr] = None) → AnyStr
Obtains the URL of a WPS REST DescribeProcess given the specified information.
- Parameters
process_id_or_url – process “identifier” or literal URL to DescribeProcess WPS-REST location.
data_source – identifier of the data source to map to a specific ADES, or to localhost if None.
- Returns
URL of EMS or ADES WPS-REST DescribeProcess.
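The resolution described above can be pictured with a minimal sketch. It is not the module's implementation: the DATA_SOURCES registry, the process_location name and the "/processes/<id>" path layout are assumptions made for illustration, and only string inputs are handled.

```python
from urllib.parse import urlparse

# Hypothetical registry mapping a data source identifier to a base URL.
DATA_SOURCES = {
    "localhost": "https://localhost/ems",
    "example-ades": "https://ades.example.com",
}


def process_location(process_id_or_url, data_source=None):
    """Return a DescribeProcess-like URL for a process ID or a literal URL."""
    # A value that already carries a URL scheme is returned unchanged.
    if urlparse(process_id_or_url).scheme:
        return process_id_or_url
    # Otherwise, resolve the base URL from the data source (localhost if None).
    base = DATA_SOURCES[data_source or "localhost"]
    # Assumed path layout: <base>/processes/<process-id>.
    return "{}/processes/{}".format(base.rstrip("/"), process_id_or_url)
```

With this sketch, a bare identifier resolves against the selected data source, while a full URL passes through untouched.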
-
weaver.processes.wps_package.get_package_workflow_steps(package_dict_or_url: Union[Dict[AnyStr, Any], AnyStr]) → List[Dict[AnyStr, AnyStr]]
- Parameters
package_dict_or_url – process package definition or literal URL to DescribeProcess WPS-REST location.
- Returns
list of workflow steps as {"name": <name>, "reference": <reference>}, where name is the generic package step name and reference is the ID or URL of a registered WPS package.
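As a rough illustration of the returned structure, the sketch below walks the steps of a CWL Workflow dictionary. The list_workflow_steps name is hypothetical, and treating each step's string "run" value as the reference is an assumption; the actual function may resolve or normalize references differently.

```python
def list_workflow_steps(package):
    """Collect {"name", "reference"} pairs from a CWL Workflow dictionary."""
    if package.get("class") != "Workflow":
        return []  # a CommandLineTool has no steps
    steps = package.get("steps", {})
    if isinstance(steps, list):
        # CWL also allows steps as a list of objects that carry an "id".
        pairs = [(step["id"], step.get("run")) for step in steps]
    else:
        pairs = [(name, step.get("run")) for name, step in steps.items()]
    # Assumption: only string references are kept, embedded tools are skipped.
    return [
        {"name": name, "reference": run}
        for name, run in pairs
        if isinstance(run, str)
    ]


workflow = {
    "cwlVersion": "v1.0",
    "class": "Workflow",
    "steps": {
        "crop": {"run": "crop-tool", "in": {}, "out": []},
        "stack": {"run": "https://example.com/processes/stack", "in": {}, "out": []},
    },
}
steps = list_workflow_steps(workflow)
```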
-
weaver.processes.wps_package.complex2json(data: Union[ComplexData, Any]) → Union[JSON, Any]
Obtains the JSON representation of a ComplexData, or simply returns the data unchanged if its type does not match.
-
weaver.processes.wps_package.metadata2json(meta: Union[ANY_Metadata_Type, Any], force: bool = False) → Union[JSON, Any]
Obtains the JSON representation of an OwsMetadata or pywps.app.Common.Metadata. Otherwise, simply returns the unmatched type. If requested with force, parsing of a dictionary for the corresponding keys is enforced.
-
weaver.processes.wps_package.get_process_definition(process_offering: JSON, reference: Optional[AnyStr] = None, package: Optional[CWL] = None, data_source: Optional[AnyStr] = None) → JSON
Returns an updated process definition dictionary ready for storage, using the provided WPS process_offering and a package definition passed by reference or as package CWL content. The returned process information can be used later on to load an instance of weaver.wps_package.WpsPackage.
- Parameters
process_offering – WPS REST-API (WPS-3) process offering as JSON.
reference – URL to CWL package definition, WPS-1 DescribeProcess endpoint or WPS-3 Process endpoint.
package – literal CWL package definition (YAML or JSON format).
data_source – where to resolve process IDs (default: localhost if None).
- Returns
updated process definition with resolved/merged information from package/reference.
-
class weaver.processes.wps_package.WpsPackage(**kw)
- Parameters
handler – A callable that gets invoked for each incoming request. It should accept a single pywps.app.WPSRequest argument and return a pywps.app.WPSResponse object.
identifier (string) – Name of this process.
title (string) – Human readable title of process.
abstract (string) – Brief narrative description of the process.
keywords (list) – Keywords that characterize a process.
inputs – List of inputs accepted by this process. They should be LiteralInput, ComplexInput and BoundingBoxInput objects.
outputs – List of outputs returned by this process. They should be LiteralOutput, ComplexOutput and BoundingBoxOutput objects.
metadata – List of metadata advertised by this process. They should be pywps.app.Common.Metadata objects.
translations (dict[str,dict[str,str]]) – The first key is the RFC 4646 language code, and the nested mapping contains translated strings accessible by a string property, e.g. {"fr-CA": {"title": "Mon titre", "abstract": "Une description"}}
Creates a WPS-3 Process instance to execute a CWL package definition.
Process parameters should be loaded from an existing weaver.datatype.Process instance generated using weaver.wps_package.get_process_definition().
Provided kw should correspond to weaver.datatype.Process.params_wps().
-
setup_logger(self, log_stdout_stderr: bool = True)
Configures useful loggers to catch most of the common output and/or error messages during package execution.
-
insert_package_log(self, result)
Retrieves additional captured CWL sub-process logs to obtain internal application output and/or errors.
After execution of this method, the WPS output log (which can be obtained by retrieve_package_job_log()) will have additional stderr/stdout entries extracted from the underlying application package tool execution.
The outputs and errors are inserted as best as possible in logical order, so that the merged logs read naturally and chronologically. In the event that both output and errors are available, they are appended one after another, since merging in an orderly fashion cannot be guaranteed by the outside CWL runner.
- Parameters
result – output results returned from the CWL package instance execution.
Todo
improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)
See also
-
update_requirements(self)
In-place modification of package to remove invalid items that would break behaviour we must enforce.
-
update_effective_user(self)
Updates the effective user/group for the Application Package to be executed.
FIXME: (experimental) update user/group permissions
Reducing permissions is safer inside a Docker application, since weaver/cwltool could be running as root, but this requires that mounted volumes have the required permissions so that euid:egid can use them.
Overrides cwltool's function that retrieves the user/group ID with the ones we enforce.
-
update_status(self, message: AnyStr, progress: Number, status: AnyStatusType)
Updates the PyWPS real job status from the specified parameters.
-
step_update_status(self, message: AnyStr, progress: Number, start_step_progress: Number, end_step_progress: Number, step_name: AnyStr, target_host: AnyValue, status: AnyStr)
-
log_message(self, status: AnyStatusType, message: AnyStr, progress: Optional[Number] = None, level: int = logging.INFO)
-
exception_message(self, exception_type: Type[Exception], exception: Optional[Exception] = None, message: AnyStr = 'no message', status: AnyStatusType = STATUS_EXCEPTION, level: int = logging.ERROR)
-
classmethod map_step_progress(cls, step_index: int, steps_total: int)
Calculates the percentage progression of a single step of the full process.
Note
The step progression is adjusted according to the delimited start/end of the underlying CWL execution to provide a continuous progress percentage over the complete execution. Otherwise, we would have values that jump around according to whichever progress the underlying remote WPS or monitored CWL employs, if any is provided.
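The rescaling described in the note can be sketched as a linear mapping of the step index into the portion of the overall progress reserved for the CWL run. The bounds CWL_RUN_START and CWL_RUN_END and the helper map_progress are assumptions chosen for illustration; the values used by the module are not stated here.

```python
# Hypothetical bounds (in percent) reserved for the CWL run within the whole job.
CWL_RUN_START = 10
CWL_RUN_END = 95


def map_progress(progress, range_min, range_max):
    """Rescale a 0-100 progress value into [range_min, range_max], clamped."""
    scaled = range_min + progress * (range_max - range_min) / 100
    return max(range_min, min(range_max, scaled))


def map_step_progress(step_index, steps_total):
    """Overall progress (percent) at which the step with this index begins."""
    return map_progress(100 * step_index / steps_total, CWL_RUN_START, CWL_RUN_END)


# Start of each step in a four-step workflow, plus the end of the last step.
milestones = [map_step_progress(i, 4) for i in range(5)]
```

Because every step maps into the same fixed window, the reported percentage can only move forward, whatever progress values the underlying tool emits.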
-
_handler(self, request: WPSRequest, response: ExecuteResponse)
Method called when the process receives the WPS execution request.
-
must_fetch(self, input_ref: AnyStr)
Figures out if a file reference should be fetched immediately for local execution. For anything other than a local script/docker, the remote ADES/WPS process will fetch it. S3 references are handled here to avoid errors on a remote WPS that does not support them.
See also
-
make_location_input(self, input_type: AnyStr, input_definition: ComplexInput)
Generates the JSON content required to specify a CWL File input definition from a location.
Note
If the process requires OpenSearch references that should be preserved as is, use the scheme defined by the weaver.processes.constants.OPENSEARCH_LOCAL_FILE_SCHEME prefix instead of http(s)://.
-
make_outputs(self, cwl_result: CWLResults)
Maps CWL result outputs to corresponding WPS outputs.
-
make_location_output(self, cwl_result: CWLResults, output_id: AnyStr)
Rewrites the WPS output with the required location using the result path from CWL execution.
Configures the parameters such that PyWPS will either auto-resolve the local paths to match the URL defined by weaver.wps_output_url, or upload the output to the S3 bucket from weaver.wps_output_s3_bucket and provide the reference directly.
See also
-
get_application_requirement(self)
Obtains the first item in CWL package requirements or hints that corresponds to a Weaver-specific application type as defined in CWL_REQUIREMENT_APP_TYPES.
- Returns
dictionary that minimally has the class field, and optionally other parameters from that requirement.
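A minimal sketch of such a lookup follows. The APP_TYPES set is a hypothetical stand-in for CWL_REQUIREMENT_APP_TYPES, the function names are illustrative, and returning None when nothing matches is an assumption, since the behaviour for that case is not described here. CWL allows requirements and hints to be written either as a list of objects with a class field or as a mapping keyed by class name, so both forms are handled.

```python
# Hypothetical stand-in for CWL_REQUIREMENT_APP_TYPES.
APP_TYPES = frozenset(["DockerRequirement", "ExampleAppRequirement"])


def iter_requirements(section):
    """Yield requirement dicts from either the list or the mapping form."""
    if isinstance(section, dict):
        for class_name, params in section.items():
            item = {"class": class_name}
            item.update(params or {})
            yield item
    else:
        for item in section or []:
            yield item


def first_application_requirement(package, app_types=APP_TYPES):
    """Return the first requirement or hint whose class is an application type."""
    for key in ("requirements", "hints"):
        for item in iter_requirements(package.get(key)):
            if item.get("class") in app_types:
                return item
    return None  # assumption: no match yields None in this sketch


package = {
    "class": "CommandLineTool",
    "requirements": {"InlineJavascriptRequirement": {}},
    "hints": [{"class": "DockerRequirement", "dockerPull": "alpine:latest"}],
}
requirement = first_application_requirement(package)
```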
-
get_job_process_definition(self, jobname: AnyStr, joborder: JSON, tool: CWL)
This function is called before running an ADES job (either from a workflow step or a simple EMS dispatch). It must return a WpsProcess instance configured with the proper package, ADES target and cookies.
- Parameters
jobname – The workflow step or the package ID that must be launched on an ADES (string).
joborder – The parameters for the job, as a dict {input_name: input_value}, where input_value is either an input_object or an array [input_object], and input_object is either a string or a dict {class: File, location: string}. In our case, inputs are expected to be File objects.
tool – Whole CWL config, including hints and requirements.
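To make the joborder shape concrete, the sketch below builds one example and checks it against the structure described in the parameter list. The helper names, input names and URLs are hypothetical and serve only as an illustration.

```python
def is_input_object(value):
    """True for a plain string or a CWL File reference dictionary."""
    if isinstance(value, str):
        return True
    return (
        isinstance(value, dict)
        and value.get("class") == "File"
        and isinstance(value.get("location"), str)
    )


def is_valid_joborder(joborder):
    """Check that every input value is an input_object or a list of them."""
    if not isinstance(joborder, dict):
        return False
    for value in joborder.values():
        items = value if isinstance(value, list) else [value]
        if not all(is_input_object(item) for item in items):
            return False
    return True


joborder = {
    # single File input
    "image": {"class": "File", "location": "https://example.com/data/image.tif"},
    # array of File inputs
    "masks": [
        {"class": "File", "location": "https://example.com/data/mask-1.tif"},
        {"class": "File", "location": "https://example.com/data/mask-2.tif"},
    ],
    # plain string input
    "label": "demo",
}
```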