weaver.processes.execution
==========================

.. py:module:: weaver.processes.execution


Module Contents
---------------

.. py:data:: LOGGER

.. py:class:: JobProgress

   Job process execution progress.


   .. py:attribute:: SETUP
      :value: 1


   .. py:attribute:: DESCRIBE
      :value: 2


   .. py:attribute:: GET_INPUTS
      :value: 3


   .. py:attribute:: GET_OUTPUTS
      :value: 10


   .. py:attribute:: EXECUTE_REQUEST
      :value: 11


   .. py:attribute:: EXECUTE_STATUS_LOCATION
      :value: 12


   .. py:attribute:: EXECUTE_MONITOR_START
      :value: 13


   .. py:attribute:: EXECUTE_MONITOR_LOOP
      :value: 14


   .. py:attribute:: EXECUTE_MONITOR_DONE
      :value: 96


   .. py:attribute:: EXECUTE_MONITOR_END
      :value: 98


   .. py:attribute:: NOTIFY
      :value: 99


   .. py:attribute:: DONE
      :value: 100


.. py:function:: execute_process(task: celery.app.task.Task, job_id: uuid.UUID, wps_url: str, headers: Optional[weaver.typedefs.HeaderCookiesType] = None) -> weaver.status.StatusType

   Celery task that executes the WPS process job monitoring as status updates (local and remote).


.. py:function:: collect_statistics(process: Optional[psutil.Process], settings: Optional[weaver.typedefs.SettingsType] = None, job: Optional[weaver.datatype.Job] = None, rss_start: Optional[int] = None) -> Optional[weaver.typedefs.Statistics]

   Collect any available execution statistics and store them in the :term:`Job` if provided.


.. py:function:: fetch_wps_process(job: weaver.datatype.Job, wps_url: str, headers: weaver.typedefs.HeadersType, settings: weaver.typedefs.SettingsType) -> weaver.processes.convert.ProcessOWS

   Retrieves the WPS process description from the local or remote WPS reference URL.


.. py:function:: parse_wps_input_format(input_info: weaver.typedefs.JSON, type_field: str = 'mime_type', search_variations: bool = True) -> Tuple[Optional[str], Optional[str]]

.. py:function:: parse_wps_input_complex(input_value: Union[str, weaver.typedefs.JSON], input_info: weaver.typedefs.JSON) -> owslib.wps.ComplexDataInput

   Parse the input data details into a complex input.


.. py:function:: parse_wps_input_bbox(input_value: Union[str, weaver.typedefs.JobValueBbox], input_info: weaver.typedefs.JSON) -> owslib.wps.BoundingBoxDataInput

   Parse the input data details into a bounding box input.


.. py:function:: parse_wps_input_literal(input_value: Union[weaver.typedefs.AnyValueType, weaver.typedefs.JSON]) -> Optional[str]

   Parse the input data details into a literal input.


.. py:function:: log_and_save_update_status_handler(job: weaver.datatype.Job, container: weaver.typedefs.AnyDatabaseContainer, update_status: Callable[[weaver.status.AnyStatusType], weaver.status.StatusType] = None, update_progress: Callable[[weaver.typedefs.Number], weaver.typedefs.Number] = None) -> weaver.typedefs.UpdateStatusPartialFunction

   Creates a :term:`Job` status update function that will immediately reflect the log message in the database.

   When log messages are generated and saved in the :term:`Job`, those details are not persisted to the database
   until the updated :term:`Job` is entirely pushed to the database store. This causes clients querying the
   :term:`Job` endpoints to not receive any latest update from performed operations until the execution returns
   to the main worker monitoring loop, which will typically perform a :term:`Job` update "at some point".

   Using this handler, each time a message is pushed to the :term:`Job`, that update is also persisted by
   maintaining a local database connection handle.

   However, because updating the entire :term:`Job` each time can become costly and inefficient for multiple
   subsequent logs, this operation should be applied only on "important milestones" of the execution steps.
   Any intermediate/subsequent logs should use the usual :meth:`Job.save_log` to "accumulate" the log messages
   for a following "batch update" of the :term:`Job`.

   :param job: Reference :term:`Job` for which the status will be updated and saved with uncommitted log entries.
   :param container: Container to retrieve the database connection.
   :param update_status: Function to apply override status update operations. Skipped if omitted.
   :param update_progress: Function to apply override progress update operations. Skipped if omitted.


.. py:function:: parse_wps_inputs(wps_process: weaver.processes.convert.ProcessOWS, job: weaver.datatype.Job, container: Optional[weaver.typedefs.AnyDatabaseContainer] = None) -> List[Tuple[str, weaver.processes.convert.OWS_Input_Type]]

   Parses expected :term:`WPS` process inputs against submitted job input values considering supported definitions.

   According to the structure of the job inputs, and notably their key arguments, perform the relevant parsing and
   data retrieval to prepare inputs in a native format that can be understood and employed by a :term:`WPS` worker
   (i.e.: :class:`weaver.wps.service.WorkerService` and its underlying :mod:`pywps` implementation).


.. py:function:: make_results_relative(results: List[weaver.typedefs.JSON], settings: weaver.typedefs.SettingsType) -> List[weaver.typedefs.JSON]

   Converts file references to a pseudo-relative location to allow the application to dynamically generate paths.

   Redefines job results to be saved in the database as pseudo-relative paths to the configured WPS output directory.
   This allows the application to easily adjust the exposed result HTTP path according to the service configuration
   (i.e.: relative to ``weaver.wps_output_dir`` and/or ``weaver.wps_output_url``) and it also avoids rewriting
   the database job results entry if those settings are changed later on following a reboot of the web application.

   Only references prefixed with ``weaver.wps_output_dir``, ``weaver.wps_output_url`` or a corresponding resolution
   from ``weaver.wps_output_path`` with ``weaver.url`` will be modified to pseudo-relative paths.
   Other references (file/URL endpoints that do not correspond to `Weaver`) will be left untouched for
   literal remote reference. Results that do not correspond to a reference are also unmodified.

   .. note::

      The references are not *real* relative paths (i.e.: starting with ``./``), as those could also be specified
      as input, and there would be no way to guarantee proper differentiation from paths already handled and stored
      in the database. Instead, *pseudo-relative* paths employ an explicit *absolute*-like path
      (i.e.: starting with ``/``) and are assumed to always require a prefix of the configured WPS locations
      (i.e.: ``weaver.wps_output_dir`` or ``weaver.wps_output_url`` based on local or HTTP response context).

   With this approach, data persistence with mapped volumes into the dockerized `Weaver` service can be placed
   anywhere at convenience. This is important because sibling Docker executions require exact mappings such that
   the volume mount ``/data/path:/data/path`` resolves correctly on both sides (host and image path must be identical).
   If volumes get remapped differently, ensuring that the ``weaver.wps_output_dir`` setting follows the same remapping
   update will automatically resolve to the proper location for both local references and exposed URL endpoints.

   :param results: JSON mapping of data results as ``{"": }`` entries where a reference can be found.
   :param settings: container to retrieve current application settings.


.. py:function:: map_locations(job: weaver.datatype.Job, settings: weaver.typedefs.SettingsType) -> None

   Maps directory locations between :mod:`pywps` process execution and produced jobs storage.

   Generates symlink references from the Job UUID to PyWPS UUID results (outputs directory, status and log locations).
   Updates the Job's WPS ID if applicable (job executed locally).
   Assumes that all results are located under the same reference UUID.


.. py:function:: submit_job_dispatch_wps(request: pyramid.request.Request, process: weaver.datatype.Process) -> weaver.typedefs.AnyViewResponse

   Dispatch a :term:`XML` request to the relevant :term:`Process` handler using the :term:`WPS` endpoint.
   Sends the :term:`XML` request to the :term:`WPS` endpoint which knows how to parse it properly.
   Execution will end up in the same :func:`submit_job_handler` function as for
   :term:`OGC API - Processes` :term:`JSON` execution.

   .. warning::

      The function assumes that :term:`XML` was pre-validated as present in the :paramref:`request`.


.. py:function:: submit_job(request: pyramid.request.Request, reference: Union[weaver.datatype.Service, weaver.datatype.Process], tags: Optional[List[str]] = None, process_id: Optional[str] = None) -> weaver.typedefs.AnyResponseType

   Generates the job submission from details retrieved in the request.

   .. seealso::

      :func:`submit_job_handler` to provide elements pre-extracted from requests or from other parsing.


.. py:function:: submit_job_handler(payload: weaver.typedefs.ProcessExecution, settings: weaver.typedefs.SettingsType, wps_url: str, provider: Optional[weaver.typedefs.AnyServiceRef] = None, process: weaver.typedefs.AnyProcessRef = None, is_workflow: bool = False, is_local: bool = True, visibility: Optional[weaver.visibility.AnyVisibility] = None, language: Optional[str] = None, headers: Optional[weaver.typedefs.HeaderCookiesType] = None, tags: Optional[List[str]] = None, user: Optional[int] = None, context: Optional[str] = None) -> weaver.typedefs.AnyResponseType

   Parses parameters that define the submitted :term:`Job`, and responds accordingly with the selected execution mode.

   Assumes that parameters have been pre-fetched and validated, except for the :paramref:`payload` containing the
   desired inputs and outputs from the :term:`Job`. The selected execution mode looks up the various combinations
   of headers and body parameters available across :term:`API` implementations and revisions.


.. py:function:: submit_job_dispatch_task(job: weaver.datatype.Job, *, container: weaver.typedefs.AnySettingsContainer, headers: weaver.typedefs.AnyHeadersContainer = None, force_submit: bool = False) -> weaver.typedefs.AnyResponseType

   Submits the :term:`Job` to the :mod:`celery` worker with provided parameters.

   Assumes that parameters have been pre-fetched, validated, and can be resolved from the :term:`Job`.


.. py:function:: update_job_parameters(job: weaver.datatype.Job, request: pyramid.request.Request) -> None

   Updates an existing :term:`Job` with new request parameters.


.. py:function:: validate_job_json(request: pyramid.request.Request) -> weaver.typedefs.JSON

   Validates that the request contains valid :term:`JSON` contents, but not necessarily valid against the expected schema.

   .. seealso::

      :func:`validate_job_schema`


.. py:function:: validate_job_schema(payload: Any, body_schema: Union[Type[weaver.wps_restapi.swagger_definitions.Execute], Type[weaver.wps_restapi.swagger_definitions.PatchJobBodySchema]] = sd.Execute) -> weaver.typedefs.ProcessExecution

   Validates that the input :term:`Job` payload is valid :term:`JSON` for an execution request.


.. py:function:: validate_job_accept_header(headers: weaver.typedefs.AnyHeadersContainer, execution_mode: weaver.execute.AnyExecuteMode) -> Optional[str]

   Validate that the submitted ``Accept`` header is permitted.


.. py:function:: validate_process_exec_mode(job_control_options: List[weaver.execute.AnyExecuteControlOption], execution_mode: Optional[weaver.execute.AnyExecuteMode]) -> None

   Verify that a certain :term:`Job` execution mode fulfills the :term:`Process` ``jobControlOptions`` prerequisite.

   Assumes that any applicable resolution of the :term:`Job` execution mode (header, query, body, etc.)
   and the relevant control options was already performed by any applicable upstream operations.

   .. seealso::

      - :ref:`proc_exec_mode`
      - :func:`parse_prefer_header_execute_mode`

   :raises HTTPUnprocessableEntity: If the execution mode is not permitted by the :term:`Process`.


.. py:function:: validate_process_id(job_process: weaver.datatype.Process, payload: weaver.typedefs.ProcessExecution) -> None

   Validates that the specified ``process`` in the payload corresponds to the referenced :term:`Job` :term:`Process`.

   If no ``process`` is specified, no check is performed. The :term:`Job` is assumed to have pre-validated that
   the :term:`Process` is appropriate from another reference, such as using the ID from the path or a query parameter.

   :raises HTTPException: Corresponding error for detected invalid combination of process references.


.. py:function:: validate_process_io(process: weaver.datatype.Process, payload: weaver.typedefs.ProcessExecution) -> None

   Preemptively verify submitted parameters for execution against the expected process definition.

   Verify missing inputs or obvious type mismatches, but nothing overly complicated. The idea behind this
   function is to avoid unnecessary assignment of :mod:`celery` worker and :term:`Docker` resources to an
   execution that would be guaranteed to fail as soon as the process execution started.

   This function is **NOT** intended to catch all erroneous inputs, nor validate their values.
   For example, out-of-range values or unreachable file reference URLs are not guaranteed to be detected.
   However, basic checks such as unacceptable types or cardinality can be performed.

   Assumes that schema pre-validation was accomplished to minimally guarantee that the structure is valid.

   :param process: Process description that provides expected inputs and outputs.
   :param payload: Submitted job execution body.
   :raises HTTPException: Corresponding error for detected invalid combination of inputs or outputs.
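
The kind of preemptive check described for :func:`validate_process_io` (missing inputs, cardinality, obvious
type mismatches) can be illustrated with a minimal standalone sketch. This is not the `Weaver` implementation:
the ``check_inputs`` helper, the dictionary layout of the expected inputs and the use of plain Python types are
all assumptions made purely for illustration.

```python
from typing import Any, Dict, List


def check_inputs(expected: List[Dict[str, Any]], submitted: Dict[str, Any]) -> List[str]:
    """Return the problems found by basic checks (hypothetical helper, not Weaver code).

    Each expected input is assumed to look like:
    {"id": str, "type": <python type>, "minOccurs": int, "maxOccurs": int or None}
    where a "maxOccurs" of None means "unbounded".
    """
    problems = []
    for definition in expected:
        name = definition["id"]
        min_occurs = definition.get("minOccurs", 1)
        max_occurs = definition.get("maxOccurs", 1)

        # Normalize the submitted value(s) to a list to count occurrences.
        if name not in submitted:
            values = []
        else:
            value = submitted[name]
            values = value if isinstance(value, list) else [value]

        # Cardinality check: a missing required input shows up as zero occurrences.
        if len(values) < min_occurs:
            problems.append(f"{name}: expected at least {min_occurs} value(s), got {len(values)}")
        elif max_occurs is not None and len(values) > max_occurs:
            problems.append(f"{name}: expected at most {max_occurs} value(s), got {len(values)}")

        # Obvious type mismatch only; value ranges and URL reachability are not checked.
        expected_type = definition.get("type")
        if expected_type is not None:
            for item in values:
                if not isinstance(item, expected_type):
                    problems.append(f"{name}: expected {expected_type.__name__}, got {type(item).__name__}")
    return problems


if __name__ == "__main__":
    expected_inputs = [
        {"id": "threshold", "type": float, "minOccurs": 1, "maxOccurs": 1},
        {"id": "files", "type": str, "minOccurs": 1, "maxOccurs": None},
    ]
    for problem in check_inputs(expected_inputs, {"threshold": "high"}):
        print(problem)
```

Collecting every problem before reporting, rather than stopping at the first one, is a design choice of this
sketch. It lets a single error response list all detected issues before any worker resources are assigned.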