weaver.processes.execution

Module Contents

weaver.processes.execution.LOGGER
class weaver.processes.execution.JobProgress

Job process execution progress.

SETUP = 1
DESCRIBE = 2
GET_INPUTS = 3
GET_OUTPUTS = 4
EXECUTE_REQUEST = 5
EXECUTE_STATUS_LOCATION = 6
EXECUTE_MONITOR_START = 7
EXECUTE_MONITOR_LOOP = 8
EXECUTE_MONITOR_DONE = 96
EXECUTE_MONITOR_END = 98
NOTIFY = 99
DONE = 100
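These values read as percentages of completion, with a wide gap between EXECUTE_MONITOR_LOOP (8) and EXECUTE_MONITOR_DONE (96). A minimal sketch, assuming that gap is where the 0-100 progress reported by the monitored execution gets linearly rescaled, could look like the following. The JobProgressSketch enum and the map_monitor_progress helper are hypothetical and not part of Weaver.

```python
from enum import IntEnum


class JobProgressSketch(IntEnum):
    """Hypothetical mirror of the JobProgress constants (percent complete)."""
    SETUP = 1
    DESCRIBE = 2
    GET_INPUTS = 3
    GET_OUTPUTS = 4
    EXECUTE_REQUEST = 5
    EXECUTE_STATUS_LOCATION = 6
    EXECUTE_MONITOR_START = 7
    EXECUTE_MONITOR_LOOP = 8
    EXECUTE_MONITOR_DONE = 96
    EXECUTE_MONITOR_END = 98
    NOTIFY = 99
    DONE = 100


def map_monitor_progress(remote_percent: float) -> int:
    """Rescale a 0-100 monitored progress into [MONITOR_LOOP, MONITOR_DONE].

    Assumption: the linear rescaling is illustrative only.
    """
    low = JobProgressSketch.EXECUTE_MONITOR_LOOP
    high = JobProgressSketch.EXECUTE_MONITOR_DONE
    # guard against out-of-range progress reports
    clamped = min(max(float(remote_percent), 0.0), 100.0)
    return int(low + (high - low) * clamped / 100.0)
```

Reserving most of the scale for the monitoring loop leaves the fixed steps before and after it (setup, notification, completion) as small, predictable increments.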
weaver.processes.execution.execute_process(task: celery.app.task.Task, job_id: uuid.UUID, wps_url: str, headers: weaver.typedefs.HeaderCookiesType | None = None) → weaver.status.StatusType

Celery task that executes the WPS process job and monitors its status updates (for both local and remote processes).
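The monitoring part follows a polling pattern: query the status repeatedly until a terminal state is reached. The sketch below illustrates only that pattern. The monitor_until_done helper, the injected get_status callable and the status names (borrowed from OGC API - Processes: accepted, running, successful, failed, dismissed) are assumptions; the real task additionally deals with Celery, WPS requests and job persistence.

```python
import time
from typing import Callable, List, Tuple

# assumed terminal status names (OGC API - Processes style)
TERMINAL_STATUSES = {"successful", "failed", "dismissed"}


def monitor_until_done(
    get_status: Callable[[], Tuple[str, int]],
    interval: float = 0.0,
    max_checks: int = 100,
) -> Tuple[str, List[Tuple[str, int]]]:
    """Poll ``get_status`` until a terminal status is returned (hypothetical helper).

    Returns the final status and the history of (status, progress) observations.
    """
    history: List[Tuple[str, int]] = []
    for _ in range(max_checks):
        status, progress = get_status()
        history.append((status, progress))
        if status in TERMINAL_STATUSES:
            return status, history
        time.sleep(interval)  # wait before querying the status location again
    raise TimeoutError(f"job did not complete after {max_checks} status checks")


# usage with a fake status source standing in for a local or remote WPS
updates = iter([("accepted", 0), ("running", 40), ("successful", 100)])
final_status, observed = monitor_until_done(lambda: next(updates))
```

Bounding the loop with max_checks keeps a stalled remote service from holding a worker indefinitely.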

weaver.processes.execution.collect_statistics(process: psutil.Process | None, settings: weaver.typedefs.SettingsType | None = None, job: weaver.datatype.Job | None = None, rss_start: int | None = None) → weaver.typedefs.Statistics | None

Collect any available execution statistics and store them in the Job if provided.

weaver.processes.execution.fetch_wps_process(job: weaver.datatype.Job, wps_url: str, headers: weaver.typedefs.HeadersType, settings: weaver.typedefs.SettingsType) → weaver.processes.convert.ProcessOWS

Retrieves the WPS process description from the local or remote WPS reference URL.
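For a remote WPS 1.0.0 reference, a process description is typically obtained with a DescribeProcess key-value-pair (KVP) GET request. The sketch below only builds such a URL with the standard library; build_describe_process_url is a hypothetical helper, and whether Weaver issues exactly this request (rather than delegating to OWSLib) is an assumption.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def build_describe_process_url(wps_url: str, process_id: str, version: str = "1.0.0") -> str:
    """Build a WPS DescribeProcess KVP request URL (hypothetical helper)."""
    parts = urlsplit(wps_url)
    # keep any pre-existing query parameters (e.g. access tokens) of the endpoint
    query = dict(parse_qsl(parts.query))
    query.update({
        "service": "WPS",
        "request": "DescribeProcess",
        "version": version,
        "identifier": process_id,
    })
    return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment))
```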

weaver.processes.execution.parse_wps_input_format(input_info: weaver.typedefs.JSON, type_field: str = 'mime_type', search_variations: bool = True) → Tuple[str | None, str | None]
weaver.processes.execution.parse_wps_input_complex(input_value: str | weaver.typedefs.JSON, input_info: weaver.typedefs.JSON) → owslib.wps.ComplexDataInput

Parse the input data details into a complex input.
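A complex input is provided either by reference or as inline data. A minimal sketch of that distinction, assuming an OGC API - Processes style value where references use an href field, inline content uses a value field and the media type is given by type (or by mimeType/mediaType in the process description); split_complex_input and its returned tuple are hypothetical, whereas the documented function returns an owslib.wps.ComplexDataInput:

```python
from typing import Any, Dict, Optional, Tuple, Union

JSON = Dict[str, Any]


def split_complex_input(
    input_value: Union[str, JSON],
    input_info: JSON,
) -> Tuple[str, Any, Optional[str]]:
    """Return ("reference" | "inline", data, media_type) for a complex input (hypothetical)."""
    # fall back to the media type declared in the process description, if any
    media_type = input_info.get("mimeType") or input_info.get("mediaType")
    if isinstance(input_value, dict):
        media_type = input_value.get("type") or media_type
        if "href" in input_value:
            return "reference", input_value["href"], media_type
        return "inline", input_value.get("value"), media_type
    # bare strings are treated as file references here (assumption)
    return "reference", input_value, media_type
```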

weaver.processes.execution.parse_wps_input_bbox(input_value: str | weaver.typedefs.JobValueBbox, input_info: weaver.typedefs.JSON) → owslib.wps.BoundingBoxDataInput

Parse the input data details into a bounding box input.
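In OGC API - Processes, a bounding box value is written as {"bbox": [minx, miny, maxx, maxy], "crs": "<uri>"} (6 coordinates in 3D), while a WPS bounding box is described by lower and upper corners plus a CRS. The sketch below performs that split. The split_bbox helper, the handling of a plain comma-separated string and the CRS84 default are assumptions for illustration.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

# assumed default CRS when none is given in the JSON bbox value
DEFAULT_CRS = "http://www.opengis.net/def/crs/OGC/1.3/CRS84"


def split_bbox(
    input_value: Union[str, Dict[str, Any]],
) -> Tuple[List[float], List[float], Optional[str]]:
    """Split a bounding box into (lower_corner, upper_corner, crs) (hypothetical helper)."""
    if isinstance(input_value, str):
        # comma-separated form "minx,miny,maxx,maxy" (assumed, no CRS given)
        coords = [float(c) for c in input_value.split(",")]
        crs = None
    else:
        coords = [float(c) for c in input_value["bbox"]]
        crs = input_value.get("crs", DEFAULT_CRS)
    if len(coords) not in (4, 6):
        raise ValueError(f"bbox requires 4 or 6 coordinates, got {len(coords)}")
    half = len(coords) // 2  # first half = lower corner, second half = upper corner
    return coords[:half], coords[half:], crs
```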

weaver.processes.execution.parse_wps_input_literal(input_value: weaver.typedefs.AnyValueType | weaver.typedefs.JSON) → str | None

Parse the input data details into a literal input.
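Since the documented return type is str | None, literal values must end up in a string form. A minimal sketch, assuming qualified values are wrapped as {"value": ..., "uom": ...} and that booleans should follow the lowercase XML Schema spelling; to_literal_string is a hypothetical helper:

```python
from typing import Any, Optional


def to_literal_string(input_value: Any) -> Optional[str]:
    """Convert a job input value to the string form of a WPS literal (hypothetical helper)."""
    # unwrap qualified values such as {"value": 3.5, "uom": "m"} (assumed structure)
    if isinstance(input_value, dict) and "value" in input_value:
        input_value = input_value["value"]
    if input_value is None:
        return None
    if isinstance(input_value, bool):
        # XML Schema booleans are lowercase, unlike Python's str(True) == "True"
        return "true" if input_value else "false"
    return str(input_value)
```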

weaver.processes.execution.parse_wps_inputs(wps_process: weaver.processes.convert.ProcessOWS, job: weaver.datatype.Job) → List[Tuple[str, weaver.processes.convert.OWS_Input_Type]]

Parses expected WPS process inputs against submitted job input values considering supported definitions.

According to the structure of the job inputs, and notably their key arguments, perform the relevant parsing and data retrieval to prepare inputs in a native format that can be understood and employed by a WPS worker (i.e.: weaver.wps.service.WorkerService and its underlying pywps implementation).
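The return type List[Tuple[str, ...]] matches the WPS convention (also used by OWSLib's execute inputs) where each input is an (identifier, value) pair and an input with multiple occurrences is repeated once per value. A minimal sketch of that flattening step alone, assuming job inputs given as a {"<id>": value | [values]} mapping; flatten_job_inputs is hypothetical and skips the per-type parsing performed by the real function:

```python
from typing import Any, Dict, List, Tuple


def flatten_job_inputs(job_inputs: Dict[str, Any]) -> List[Tuple[str, Any]]:
    """Flatten {"<id>": value | [values]} into WPS-style (id, value) pairs (hypothetical)."""
    wps_inputs: List[Tuple[str, Any]] = []
    for input_id, value in job_inputs.items():
        # an array of values becomes one pair per occurrence of the same identifier
        values = value if isinstance(value, list) else [value]
        for item in values:
            wps_inputs.append((input_id, item))
    return wps_inputs
```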

weaver.processes.execution.make_results_relative(results: List[weaver.typedefs.JSON], settings: weaver.typedefs.SettingsType) → List[weaver.typedefs.JSON]

Converts file references to a pseudo-relative location to allow the application to dynamically generate paths.

Redefines job results to be saved in the database as pseudo-relative paths to the configured WPS output directory. This allows the application to easily adjust the exposed result HTTP path according to the service configuration (i.e.: relative to weaver.wps_output_dir and/or weaver.wps_output_url), and it also avoids rewriting the job results entries in the database if those settings are changed later on, following a reboot of the web application.

Only references prefixed with weaver.wps_output_dir, weaver.wps_output_url or a corresponding resolution from weaver.wps_output_path with weaver.url will be modified to pseudo-relative paths. Other references (file/URL endpoints that do not correspond to Weaver) will be left untouched for literal remote reference. Results that do not correspond to a reference are also unmodified.

Note

The references are not real relative paths (i.e.: starting with ./), since those could also be specified as inputs, and there would be no way to reliably differentiate them from paths already handled and stored in the database. Instead, pseudo-relative paths employ an explicit absolute-like path (i.e.: starting with /) and are always assumed to require a prefix from the configured WPS locations (i.e.: weaver.wps_output_dir or weaver.wps_output_url, depending on whether the context is a local reference or an HTTP response).

With this approach, data persisted in volumes mapped into the dockerized Weaver service can be placed anywhere at convenience. This is important because sibling Docker executions require exact mappings, such that a volume mount like /data/path:/data/path resolves correctly on both sides (host and image paths must be identical). If volumes get remapped differently, ensuring that the weaver.wps_output_dir setting follows the same remapping will automatically resolve to the proper location for both local references and exposed URL endpoints.

Parameters:
  • results – JSON mapping of data results as {"<id>": <definition>} entries where a reference can be found.

  • settings – container to retrieve current application settings.
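The prefix-stripping rule described above can be sketched as follows, assuming results are dictionaries with an optional href field and settings is a plain dictionary holding weaver.wps_output_dir and weaver.wps_output_url. The resolution from weaver.wps_output_path combined with weaver.url is omitted, and make_results_relative_sketch is a hypothetical name.

```python
import copy
from typing import Any, Dict, List


def make_results_relative_sketch(
    results: List[Dict[str, Any]],
    settings: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Strip known WPS output prefixes so references become pseudo-relative paths."""
    prefixes = [
        settings.get("weaver.wps_output_dir", ""),
        settings.get("weaver.wps_output_url", ""),
    ]
    # drop trailing slashes so the remaining path keeps its leading "/"
    prefixes = [p.rstrip("/") for p in prefixes if p]
    relative = copy.deepcopy(results)  # do not mutate the caller's results
    for result in relative:
        href = result.get("href")
        if not isinstance(href, str):
            continue  # literal values or missing references are left unmodified
        for prefix in prefixes:
            if href.startswith(prefix + "/"):
                result["href"] = href[len(prefix):]  # e.g. "/<job-uuid>/output.txt"
                break
    return relative
```

References to other hosts never match a configured prefix, so they stay absolute, which preserves the literal remote references mentioned above.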

weaver.processes.execution.map_locations(job: weaver.datatype.Job, settings: weaver.typedefs.SettingsType) → None

Maps directory locations between pywps process execution and produced jobs storage.

Generates symlink references from the Job UUID to the PyWPS UUID results (outputs directory, status and log locations). Updates the Job’s WPS ID if applicable (job executed locally). Assumes that all results are located under the same reference UUID.
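A minimal sketch of the symlink mapping, assuming PyWPS places a <uuid>/ outputs directory, a <uuid>.xml status document and a <uuid>.log file under a single output root. This layout and the link_job_to_pywps helper are hypothetical; the sketch only shows how Job UUID names can be made to resolve to PyWPS UUID entries.

```python
import os
import tempfile
from typing import List


def link_job_to_pywps(output_dir: str, job_id: str, pywps_id: str) -> List[str]:
    """Create <job_id><ext> symlinks pointing to <pywps_id><ext> entries (hypothetical)."""
    created: List[str] = []
    # "" = outputs directory, ".xml" = status document, ".log" = execution log (assumed layout)
    for ext in ("", ".xml", ".log"):
        source = os.path.join(output_dir, pywps_id + ext)
        link = os.path.join(output_dir, job_id + ext)
        # only link existing PyWPS entries, never overwrite an existing path
        if os.path.exists(source) and not os.path.lexists(link):
            os.symlink(source, link)
            created.append(link)
    return created


# usage: a PyWPS output directory and status file exist, but no log file
with tempfile.TemporaryDirectory() as root:
    os.mkdir(os.path.join(root, "pywps-uuid"))
    open(os.path.join(root, "pywps-uuid.xml"), "w").close()
    links = link_job_to_pywps(root, "job-uuid", "pywps-uuid")
    linked_names = sorted(os.path.basename(p) for p in links)
    all_symlinks = all(os.path.islink(p) for p in links)
```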

weaver.processes.execution.submit_job(request: pyramid.request.Request, reference: weaver.datatype.Service | weaver.datatype.Process, tags: List[str] | None = None) → weaver.typedefs.AnyResponseType

Generates the job submission from details retrieved in the request.

See also

submit_job_handler() to provide elements pre-extracted from requests or from other parsing.

weaver.processes.execution.submit_job_handler(payload: weaver.typedefs.ProcessExecution, settings: weaver.typedefs.SettingsType, service_url: str, provider: weaver.typedefs.AnyServiceRef | None = None, process: weaver.typedefs.AnyProcessRef = None, is_workflow: bool = False, is_local: bool = True, visibility: weaver.visibility.AnyVisibility | None = None, language: str | None = None, headers: weaver.typedefs.HeaderCookiesType | None = None, tags: List[str] | None = None, user: int | None = None, context: str | None = None) → weaver.typedefs.AnyResponseType

Submits the job to the Celery worker with provided parameters.

Assumes that parameters have been pre-fetched and validated, except for the input payload.

weaver.processes.execution.validate_job_accept_header(headers: weaver.typedefs.AnyHeadersContainer, execution_mode: weaver.execute.AnyExecuteMode) → str | None

Validate that the submitted Accept header is permitted.

weaver.processes.execution.validate_process_io(process: weaver.datatype.Process, payload: weaver.typedefs.ProcessExecution) → None

Preemptively verify submitted parameters for execution against expected process definition.

Verifies missing inputs or obvious type mismatches, but nothing overly complicated. The idea behind this function is to avoid unnecessarily allocating Celery worker and Docker resources to executions that are guaranteed to fail as soon as the process starts.

This function is NOT intended to catch all erroneous inputs, nor to validate their values. For example, out-of-range values or unreachable file reference URLs are not guaranteed to be detected. However, basic checks such as unacceptable types or cardinality can be performed. Assumes that schema pre-validation was accomplished to minimally guarantee that the structure is valid.

Parameters:
  • process – Process description that provides expected inputs and outputs.

  • payload – Submitted job execution body.

Raises:

HTTPException – Corresponding error for detected invalid combination of inputs or outputs.
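The missing-input and cardinality checks described above can be sketched with plain dictionaries. This assumes process inputs described as {"<id>": {"minOccurs": int, "maxOccurs": int | "unbounded"}} and a payload shaped as {"inputs": {"<id>": value | [values]}}. It raises ValueError where the documented function raises an HTTPException, and check_cardinality is a hypothetical name.

```python
from typing import Any, Dict, List, Union

Occurs = Union[int, str]


def check_cardinality(process_inputs: Dict[str, Dict[str, Occurs]], payload: Dict[str, Any]) -> None:
    """Raise ValueError when inputs are missing or submitted too many times (hypothetical)."""
    submitted = payload.get("inputs", {})
    errors: List[str] = []
    for input_id, definition in process_inputs.items():
        min_occurs = int(definition.get("minOccurs", 1))
        max_occurs = definition.get("maxOccurs", 1)
        value = submitted.get(input_id)
        # count how many occurrences of the input were submitted
        if value is None:
            count = 0
        elif isinstance(value, list):
            count = len(value)
        else:
            count = 1
        if count < min_occurs:
            errors.append(f"input '{input_id}' requires at least {min_occurs} value(s), got {count}")
        if max_occurs != "unbounded" and count > int(max_occurs):
            errors.append(f"input '{input_id}' accepts at most {max_occurs} value(s), got {count}")
    if errors:
        raise ValueError("; ".join(errors))


# usage: a required input is missing from the submitted payload
try:
    check_cardinality({"x": {"minOccurs": 1, "maxOccurs": 1}}, {"inputs": {}})
except ValueError as exc:
    missing_error = str(exc)
```

Collecting all errors before raising lets a client fix every cardinality problem in a single resubmission rather than one at a time.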