:mod:`weaver.processes.wps_package` =================================== .. py:module:: weaver.processes.wps_package .. autoapi-nested-parse:: Functions and classes that offer interoperability and conversion between corresponding elements defined as `CWL CommandLineTool/Workflow` and `WPS ProcessDescription` in order to generate `ADES/EMS Application Package`. .. seealso:: - `CWL specification `_ - `WPS-1/2 schemas `_ - `WPS-REST schemas `_ - :mod:`weaver.wps_restapi.api` conformance details Module Contents --------------- .. data:: PACKAGE_EXTENSIONS .. data:: DEFAULT_FORMAT .. function:: retrieve_package_job_log(execution, job) -> None Obtains the underlying WPS execution log from the status file to add them after existing job log entries. .. function:: get_process_location(process_id_or_url, data_source=None) -> AnyStr Obtains the URL of a WPS REST DescribeProcess given the specified information. :param process_id_or_url: process "identifier" or literal URL to DescribeProcess WPS-REST location. :param data_source: identifier of the data source to map to specific ADES, or map to localhost if ``None``. :return: URL of EMS or ADES WPS-REST DescribeProcess. .. function:: get_package_workflow_steps(package_dict_or_url) -> List[Dict[AnyStr, AnyStr]] :param package_dict_or_url: process package definition or literal URL to DescribeProcess WPS-REST location. :return: list of workflow steps as {"name": , "reference": } where `name` is the generic package step name, and `reference` is the id/url of a registered WPS package. .. function:: complex2json(data) -> Union[JSON, Any] Obtains the JSON representation of a :class:`ComplexData` or simply return the unmatched type. .. function:: metadata2json(meta, force=False) -> Union[JSON, Any] Obtains the JSON representation of a :class:`OwsMetadata` or :class:`pywps.app.Common.Metadata`. Otherwise, simply return the unmatched type. If requested, can enforce parsing a dictionary for the corresponding keys. .. function:: get_process_definition(process_offering, reference=None, package=None, data_source=None) -> JSON Returns an updated process definition dictionary ready for storage using provided `WPS` ``process_offering`` and a package definition passed by ``reference`` or ``package`` `CWL` content. The returned process information can be used later on to load an instance of :class:`weaver.wps_package.WpsPackage`. :param process_offering: `WPS REST-API` (`WPS-3`) process offering as `JSON`. :param reference: URL to `CWL` package definition, `WPS-1 DescribeProcess` endpoint or `WPS-3 Process` endpoint. :param package: literal `CWL` package definition (`YAML` or `JSON` format). :param data_source: where to resolve process IDs (default: localhost if ``None``). :return: updated process definition with resolved/merged information from ``package``/``reference``. .. py:class:: WpsPackage(**kw) Bases: :class:`pywps.Process` Creates a `WPS-3 Process` instance to execute a `CWL` package definition. Process parameters should be loaded from an existing :class:`weaver.datatype.Process` instance generated using :func:`weaver.wps_package.get_process_definition`. Provided ``kw`` should correspond to :meth:`weaver.datatype.Process.params_wps` .. attribute:: package :annotation: :Optional[CWL] .. attribute:: package_id :annotation: :Optional[AnyStr] .. attribute:: package_log_hook_stderr :annotation: :Optional[AnyStr] .. attribute:: package_log_hook_stdout :annotation: :Optional[AnyStr] .. attribute:: percent :annotation: :Optional[Number] .. attribute:: log_file :annotation: :Optional[AnyStr] .. attribute:: log_level :annotation: :Optional[int] .. attribute:: logger :annotation: :Optional[logging.Logger] .. attribute:: step_packages :annotation: :Optional[List[CWL]] .. attribute:: step_launched :annotation: :Optional[List[AnyStr]] .. attribute:: request :annotation: :Optional[WPSRequest] .. attribute:: response :annotation: :Optional[ExecuteResponse] .. method:: setup_logger(self) Configures useful loggers to catch most of the common output and/or error messages during package execution. .. seealso:: :meth:`insert_package_log` :func:`retrieve_package_job_log` .. method:: insert_package_log(self, result) Retrieves additional `CWL` sub-process logs captures to retrieve internal application output and/or errors. After execution of this method, the `WPS` output log (which can be obtained by :func:`retrieve_package_job_log`) will have additional ``stderr/stdout`` entries extracted from the underlying application package tool execution. The outputs and errors are inserted as best as possible in the logical order to make reading of the merged logs appear as a natural and chronological order. In the event that both output and errors are available, they are appended one after another as merging in an orderly fashion cannot be guaranteed by outside `CWL` runner. :param result: output results returned from the `CWL` package instance execution. .. todo:: improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131) .. seealso:: :meth:`setup_logger` :func:`retrieve_package_job_log` .. method:: update_requirements(self) Inplace modification of :attr:`package` to remove invalid items that would break behaviour we must enforce. .. method:: update_effective_user(self) Update effective user/group for the `Application Package` to be executed. FIXME: (experimental) update user/group permissions Reducing permissions is safer inside docker application since weaver/cwltool could be running as root but this requires that mounted volumes have the required permissions so euid:egid can use them. Overrides :mod:`cwltool`'s function to retrieve user/group id for ones we enforce. .. method:: update_status(self, message, progress, status) Updates the `PyWPS` real job status from a specified parameters. .. method:: step_update_status(self, message, progress, start_step_progress, end_step_progress, step_name, target_host, status) .. method:: log_message(self, status, message, progress=None, level=logging.INFO) .. method:: exception_message(self, exception_type, exception=None, message='no message', status=STATUS_EXCEPTION, level=logging.ERROR) .. classmethod:: map_step_progress(cls, step_index, steps_total) Calculates the percentage progression of a single step of the full process. .. note:: The step procession is adjusted according to delimited start/end of the underlying `CWL` execution to provide a continuous progress percentage over the complete execution. Otherwise, we would have values that jump around according to whichever progress the underlying remote `WPS` or monitored `CWL` employs, if any is provided. .. method:: _handler(self, request, response) .. staticmethod:: make_location_input(input_type, input_definition) Generates the JSON content required to specify a CWL File input definition from a location. .. method:: make_location_outputs(self, cwl_result) Maps `CWL` result outputs to corresponding `WPS` outputs under required location. .. method:: make_tool(self, toolpath_object, loading_context) .. method:: get_job_process_definition(self, jobname, joborder, tool) This function is called before running an ADES job (either from a workflow step or a simple EMS dispatch). It must return a WpsProcess instance configured with the proper package, ADES target and cookies. :param jobname: The workflow step or the package id that must be launch on an ADES :class:`string` :param joborder: The params for the job :class:`dict {input_name: input_value}` input_value is one of `input_object` or `array [input_object]` input_object is one of `string` or `dict {class: File, location: string}` in our case input are expected to be File object :param tool: Whole `CWL` config including hints requirement