weaver.processes.execution
Module Contents
- weaver.processes.execution.execute_process(task: celery.app.task.Task, job_id: uuid.UUID, wps_url: str, headers: weaver.typedefs.HeaderCookiesType | None = None) weaver.status.StatusType [source]
Celery task that executes the WPS process job monitoring as status updates (local and remote).
- weaver.processes.execution.collect_statistics(process: psutil.Process | None, settings: weaver.typedefs.SettingsType | None = None, job: weaver.datatype.Job | None = None, rss_start: int | None = None) weaver.typedefs.Statistics | None [source]
Collect any available execution statistics and store them in the Job if provided.
- weaver.processes.execution.fetch_wps_process(job: weaver.datatype.Job, wps_url: str, headers: weaver.typedefs.HeadersType, settings: weaver.typedefs.SettingsType) weaver.processes.convert.ProcessOWS [source]
Retrieves the WPS process description from the local or remote WPS reference URL.
- weaver.processes.execution.parse_wps_input_format(input_info: weaver.typedefs.JSON, type_field: str = 'mime_type', search_variations: bool = True) Tuple[str | None, str | None] [source]
- weaver.processes.execution.parse_wps_input_complex(input_value: str | weaver.typedefs.JSON, input_info: weaver.typedefs.JSON) owslib.wps.ComplexDataInput [source]
Parse the input data details into a complex input.
- weaver.processes.execution.parse_wps_input_bbox(input_value: str | weaver.typedefs.JobValueBbox, input_info: weaver.typedefs.JSON) owslib.wps.BoundingBoxDataInput [source]
Parse the input data details into a bounding box input.
- weaver.processes.execution.parse_wps_input_literal(input_value: weaver.typedefs.AnyValueType | weaver.typedefs.JSON) str | None [source]
Parse the input data details into a literal input.
- weaver.processes.execution.log_and_save_update_status_handler(job: weaver.datatype.Job, container: weaver.typedefs.AnyDatabaseContainer, update_status: Callable[[weaver.status.AnyStatusType], weaver.status.StatusType] = None, update_progress: Callable[[weaver.typedefs.Number], weaver.typedefs.Number] = None) weaver.typedefs.UpdateStatusPartialFunction [source]
Creates a Job status update function that will immediately reflect the log message in the database.
When log messages are generated and saved in the Job, those details are not persisted to the database until the updated Job is entirely pushed to the database store. As a result, clients querying the Job endpoints do not receive the latest updates from performed operations until the execution returns to the main worker monitoring loop, which will typically perform a Job update “at some point”.
Using this handler, each time a message is pushed to the Job, that update is also persisted by maintaining a local database connection handle. However, because updating the entire Job each time can become costly and inefficient for multiple subsequent logs, this operation should be applied only on “important milestones” of the execution steps. Any intermediate/subsequent logs should use the usual Job.save_log() to “accumulate” the log messages for a following “batch update” of the Job.
- Parameters:
job – Reference Job for which the status will be updated and saved with uncommitted log entries.
container – Container to retrieve the database connection.
update_status – Function to apply override status update operations. Skipped if omitted.
update_progress – Function to apply override progress update operations. Skipped if omitted.
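The difference between accumulating logs and persisting them at a milestone can be sketched as follows. This is only an illustration under stated assumptions: FakeStore, FakeJob and make_update_handler are hypothetical stand-ins written for this example, not Weaver classes or functions, and the real handler accepts additional override callbacks.

```python
from functools import partial


class FakeStore:
    """Hypothetical stand-in for a database job store; counts writes."""

    def __init__(self):
        self.saved_logs = []
        self.writes = 0

    def update_job(self, job):
        # Persist a snapshot of everything accumulated on the job so far.
        self.saved_logs = list(job.logs)
        self.writes += 1


class FakeJob:
    """Hypothetical stand-in for a job that accumulates logs in memory."""

    def __init__(self):
        self.logs = []
        self.status = "accepted"
        self.progress = 0

    def save_log(self, message):
        # Only accumulates locally; nothing reaches the store here.
        self.logs.append(message)


def _update(job, store, message, progress=None, status=None):
    if status is not None:
        job.status = status
    if progress is not None:
        job.progress = progress
    job.save_log(message)
    store.update_job(job)  # immediate persistence of all pending logs


def make_update_handler(job, store):
    """Bind the job and store so callers only pass the message and state."""
    return partial(_update, job, store)


store = FakeStore()
job = FakeJob()
update = make_update_handler(job, store)

job.save_log("fetching input 1")  # accumulated only
job.save_log("fetching input 2")  # accumulated only
update("inputs ready", progress=20, status="running")  # milestone: one write
```

In this sketch the two intermediate messages cost no database write; they are flushed together with the milestone message.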
- weaver.processes.execution.parse_wps_inputs(wps_process: weaver.processes.convert.ProcessOWS, job: weaver.datatype.Job, container: weaver.typedefs.AnyDatabaseContainer | None = None) List[Tuple[str, weaver.processes.convert.OWS_Input_Type]] [source]
Parses expected WPS process inputs against submitted job input values considering supported definitions.
According to the structure of the job inputs, and notably their key arguments, perform the relevant parsing and data retrieval to prepare inputs in a native format that can be understood and employed by a WPS worker (i.e.: weaver.wps.service.WorkerService and its underlying pywps implementation).
- weaver.processes.execution.make_results_relative(results: List[weaver.typedefs.JSON], settings: weaver.typedefs.SettingsType) List[weaver.typedefs.JSON] [source]
Converts file references to a pseudo-relative location to allow the application to dynamically generate paths.
Redefines job results to be saved in database as pseudo-relative paths to configured WPS output directory. This allows the application to easily adjust the exposed result HTTP path according to the service configuration (i.e.: relative to weaver.wps_output_dir and/or weaver.wps_output_url) and it also avoids rewriting the database job results entry if those settings are changed later on following reboot of the web application.
Only references prefixed with weaver.wps_output_dir, weaver.wps_output_url or a corresponding resolution from weaver.wps_output_path with weaver.url will be modified to pseudo-relative paths. Other references (file/URL endpoints that do not correspond to Weaver) will be left untouched for literal remote reference. Results that do not correspond to a reference are also unmodified.
Note
The references are not real relative paths (i.e.: starting with ./), as those could also be specified as input, and there would be no way to guarantee proper differentiation from paths already handled and stored in the database. Instead, pseudo-relative paths employ an explicit absolute-like path (i.e.: starting with /) and are assumed to always require a prefix from the configured WPS locations (i.e.: weaver.wps_output_dir or weaver.wps_output_url based on local or HTTP response context).
With this approach, data persistence with mapped volumes into the dockerized Weaver service can be placed anywhere at convenience. This is important because sibling Docker executions require exact mappings such that a volume mount /data/path:/data/path resolves correctly on both sides (host and image path must be identical). If volumes get remapped differently, ensuring that the weaver.wps_output_dir setting follows the same remapping update will automatically resolve to the proper location for both local references and exposed URL endpoints.
- Parameters:
results – JSON mapping of data results as {"<id>": <definition>} entries where a reference can be found.
settings – container to retrieve current application settings.
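The prefix handling described for make_results_relative can be illustrated with a minimal sketch. The helper names to_pseudo_relative and resolve, as well as the example directory and URL, are hypothetical and chosen only for this example; the sketch works on single reference strings and does not reproduce the actual Weaver implementation or its result structure.

```python
def to_pseudo_relative(reference, output_dir, output_url):
    """Strip a known output prefix, keeping the leading '/' of the remainder."""
    for prefix in (output_dir.rstrip("/"), output_url.rstrip("/")):
        if reference == prefix or reference.startswith(prefix + "/"):
            return reference[len(prefix):] or "/"
    # Foreign reference or literal value: left untouched.
    return reference


def resolve(pseudo_relative, base):
    """Re-attach a configured base location (directory or URL) at read time."""
    return base.rstrip("/") + pseudo_relative


# Assumed example configuration values (not taken from any real deployment).
OUTPUT_DIR = "/data/wps_outputs"
OUTPUT_URL = "https://example.com/wpsoutputs"

local = to_pseudo_relative("/data/wps_outputs/1234/result.tif", OUTPUT_DIR, OUTPUT_URL)
remote = to_pseudo_relative("https://other.example.org/file.tif", OUTPUT_DIR, OUTPUT_URL)
exposed = resolve(local, OUTPUT_URL)
```

Because only the stored suffix is kept, changing the configured directory or URL later changes what resolve returns without touching the stored value.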
- weaver.processes.execution.map_locations(job: weaver.datatype.Job, settings: weaver.typedefs.SettingsType) None [source]
Maps directory locations between pywps process execution and produced jobs storage.
Generates symlink references from the Job UUID to PyWPS UUID results (outputs directory, status and log locations). Updates the Job’s WPS ID if applicable (job executed locally). Assumes that all results are located under the same reference UUID.
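As a rough illustration of the symlink mapping described for map_locations, the sketch below links job-named entries to WPS-named entries inside a single output directory. The helper link_job_to_wps, the identifiers, and the ".xml" and ".log" suffixes are assumptions made for this example and are not taken from the Weaver source.

```python
import os
import tempfile


def link_job_to_wps(output_dir, job_id, wps_id):
    """Create job-named symlinks pointing at the WPS-named results."""
    links = {}
    # Assumed layout: outputs directory, status document and log file
    # all share the same WPS UUID as their base name.
    for suffix in ("", ".xml", ".log"):
        target = os.path.join(output_dir, wps_id + suffix)
        link = os.path.join(output_dir, job_id + suffix)
        if os.path.lexists(link):
            os.remove(link)  # replace a stale link from a previous run
        os.symlink(target, link)
        links[link] = target
    return links


with tempfile.TemporaryDirectory() as tmp:
    os.mkdir(os.path.join(tmp, "wps-uuid"))
    links = link_job_to_wps(tmp, "job-uuid", "wps-uuid")
    job_path = os.path.join(tmp, "job-uuid")
    is_link = os.path.islink(job_path)
    same_target = os.path.realpath(job_path) == os.path.realpath(
        os.path.join(tmp, "wps-uuid")
    )
```

The sketch assumes a platform where unprivileged symlink creation is permitted.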
- weaver.processes.execution.submit_job_dispatch_wps(request: pyramid.request.Request, process: weaver.datatype.Process) weaver.typedefs.AnyViewResponse [source]
Dispatch an XML request to the relevant Process handler using the WPS endpoint.
Sends the XML request to the WPS endpoint which knows how to parse it properly. Execution will end up in the same submit_job_handler() function as for OGC API - Processes JSON execution.
Warning
The function assumes that XML was pre-validated as present in the request.
- weaver.processes.execution.submit_job(request: pyramid.request.Request, reference: weaver.datatype.Service | weaver.datatype.Process, tags: List[str] | None = None, process_id: str | None = None) weaver.typedefs.AnyResponseType [source]
Generates the job submission from details retrieved in the request.
See also
submit_job_handler() to provide elements pre-extracted from requests or from other parsing.
- weaver.processes.execution.submit_job_handler(payload: weaver.typedefs.ProcessExecution, settings: weaver.typedefs.SettingsType, wps_url: str, provider: weaver.typedefs.AnyServiceRef | None = None, process: weaver.typedefs.AnyProcessRef = None, is_workflow: bool = False, is_local: bool = True, visibility: weaver.visibility.AnyVisibility | None = None, language: str | None = None, headers: weaver.typedefs.HeaderCookiesType | None = None, tags: List[str] | None = None, user: int | None = None, context: str | None = None) weaver.typedefs.AnyResponseType [source]
Parses parameters that define the submitted Job, and responds accordingly with the selected execution mode.
Assumes that parameters have been pre-fetched and validated, except for the payload containing the desired inputs and outputs from the Job. The selected execution mode looks up the various combinations of headers and body parameters available across API implementations and revisions.
- weaver.processes.execution.submit_job_dispatch_task(job: weaver.datatype.Job, *, container: weaver.typedefs.AnySettingsContainer, headers: weaver.typedefs.AnyHeadersContainer = None, force_submit: bool = False) weaver.typedefs.AnyResponseType [source]
Submits the Job to the celery worker with provided parameters.
Assumes that parameters have been pre-fetched, validated, and can be resolved from the Job.
- weaver.processes.execution.update_job_parameters(job: weaver.datatype.Job, request: pyramid.request.Request) None [source]
Updates an existing Job with new request parameters.
- weaver.processes.execution.validate_job_json(request: pyramid.request.Request) weaver.typedefs.JSON [source]
Validates that the request contains valid JSON contents, but not necessarily valid against the expected schema.
See also
- weaver.processes.execution.validate_job_schema(payload: Any, body_schema: Type[weaver.wps_restapi.swagger_definitions.Execute] | Type[weaver.wps_restapi.swagger_definitions.PatchJobBodySchema] = sd.Execute) weaver.typedefs.ProcessExecution [source]
Validates that the input Job payload is valid JSON for an execution request.
- weaver.processes.execution.validate_job_accept_header(headers: weaver.typedefs.AnyHeadersContainer, execution_mode: weaver.execute.AnyExecuteMode) str | None [source]
Validate that the submitted Accept header is permitted.
- weaver.processes.execution.validate_process_exec_mode(job_control_options: List[weaver.execute.AnyExecuteControlOption], execution_mode: weaver.execute.AnyExecuteMode | None) None [source]
Verify that a certain Job execution mode fulfills the Process jobControlOptions prerequisite.
Assumes that any applicable resolution of the Job execution mode (header, query, body, etc.) and the relevant control options was already performed by any applicable upstream operations.
See also
parse_prefer_header_execute_mode()
- Raises:
HTTPUnprocessableEntity – If the execution mode is not permitted by the Process.
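The check against jobControlOptions can be sketched as below. This is a simplified illustration: ExecutionModeError is a hypothetical stand-in for the HTTP 422 error, the "sync"/"async" to "sync-execute"/"async-execute" mapping follows the naming used by OGC API - Processes, and skipping the check when no mode is given is an assumption of this example rather than documented behaviour.

```python
class ExecutionModeError(ValueError):
    """Hypothetical stand-in for an HTTP 422 Unprocessable Entity error."""


# Assumed mapping between execution modes and process control options.
MODE_TO_OPTION = {"sync": "sync-execute", "async": "async-execute"}


def check_exec_mode(job_control_options, execution_mode):
    """Raise if the requested mode is not offered by the process."""
    if execution_mode is None:
        return  # assumption: nothing requested, nothing to verify
    required = MODE_TO_OPTION.get(execution_mode)
    if required not in job_control_options:
        raise ExecutionModeError(
            f"mode {execution_mode!r} not permitted, "
            f"process supports {sorted(job_control_options)}"
        )


# A process that only supports asynchronous execution.
options = ["async-execute"]
check_exec_mode(options, "async")  # passes silently

try:
    check_exec_mode(options, "sync")
    rejected = False
except ExecutionModeError:
    rejected = True
```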
- weaver.processes.execution.validate_process_id(job_process: weaver.datatype.Process, payload: weaver.typedefs.ProcessExecution) None [source]
Validates that the specified process in the payload corresponds to the referenced Job Process.
If no process is specified, no check is performed. The Job is assumed to have pre-validated that the Process is appropriate from another reference, such as using the ID from the path or a query parameter.
- Raises:
HTTPException – Corresponding error for detected invalid combination of process references.
- weaver.processes.execution.validate_process_io(process: weaver.datatype.Process, payload: weaver.typedefs.ProcessExecution) None [source]
Preemptively verify submitted parameters for execution against expected process definition.
Verify missing inputs or obvious type mismatches, but nothing too over-complicated. The idea behind this function is to avoid unnecessary allocation of celery worker and Docker resources that would be guaranteed to fail as soon as the process execution started.
This function is NOT intended to catch all erroneous inputs, nor validate their values. For example, out-of-range values or unreachable file reference URLs are not guaranteed to be detected. However, basic checks such as unacceptable types or cardinality can be performed. Assumes that schema pre-validation was accomplished to minimally guarantee that the structure is valid.
- Parameters:
process – Process description that provides expected inputs and outputs.
payload – Submitted job execution body.
- Raises:
HTTPException – Corresponding error for detected invalid combination of inputs or outputs.
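The kind of early cardinality and type checks described for validate_process_io can be sketched as follows. The check_inputs helper and the simplified input descriptions (plain dictionaries with "type", "minOccurs" and "maxOccurs") are hypothetical and only illustrate the idea; the actual function works on Weaver process definitions and raises an HTTPException instead of returning a list.

```python
def check_inputs(expected, submitted):
    """Return a list of problems; an empty list means basic checks passed."""
    problems = []
    for name, spec in expected.items():
        if name in submitted:
            raw = submitted[name]
            values = raw if isinstance(raw, list) else [raw]
        else:
            values = []
        min_occurs = spec.get("minOccurs", 1)
        max_occurs = spec.get("maxOccurs", 1)
        if len(values) < min_occurs:
            problems.append(
                f"{name}: expected at least {min_occurs} value(s), got {len(values)}"
            )
        if max_occurs != "unbounded" and len(values) > max_occurs:
            problems.append(
                f"{name}: expected at most {max_occurs} value(s), got {len(values)}"
            )
        expected_type = spec.get("type")
        if expected_type is not None and not all(
            isinstance(value, expected_type) for value in values
        ):
            problems.append(
                f"{name}: expected values of type {expected_type.__name__}"
            )
    return problems


# Simplified, assumed process input descriptions.
expected = {
    "threshold": {"type": float},
    "files": {"type": str, "maxOccurs": "unbounded"},
    "label": {"type": str, "minOccurs": 0},
}
problems = check_inputs(expected, {"threshold": "high", "files": ["a.tif", "b.tif"]})
missing = check_inputs(expected, {})
```

Consistent with the description above, the sketch does not inspect values themselves (ranges, reachable URLs); it only rejects submissions that could not succeed regardless of the values.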