Source code for weaver.execute
import logging
from typing import TYPE_CHECKING
from pyramid.httpexceptions import HTTPBadRequest
from weaver.base import Constants
from weaver.exceptions import ProcessInvalidParameter
from weaver.utils import get_header, parse_kvp
if TYPE_CHECKING:
from typing import List, Optional, Tuple, TypeAlias, Union
from weaver.datatype import Job
from weaver.typedefs import AnyHeadersContainer, HeadersType, Literal, PreservedHeadersType
ExecuteModeAsyncType = Literal["async"]
ExecuteModeSyncType = Literal["sync"]
ExecuteModeConstantsType: TypeAlias = "ExecuteMode"
AnyExecuteMode = Union[
ExecuteModeAutoType,
ExecuteModeAsyncType,
ExecuteModeSyncType,
ExecuteModeConstantsType,
]
ExecuteControlOptionAsyncType = Literal["async-execute"]
ExecuteControlOptionSyncType = Literal["sync-execute"]
ExecuteControlOptionConstantsType: TypeAlias = "ExecuteControlOption"
AnyExecuteControlOption = Union[
ExecuteControlOptionAsyncType,
ExecuteControlOptionSyncType,
ExecuteControlOptionConstantsType,
]
ExecuteReturnPreferenceMinimalType = Literal["minimal"]
ExecuteReturnPreferenceRepresentationType = Literal["representation"]
ExecuteReturnPreferenceConstantsType: TypeAlias = "ExecuteReturnPreference"
AnyExecuteReturnPreference = Union[
ExecuteReturnPreferenceMinimalType,
ExecuteReturnPreferenceRepresentationType,
ExecuteReturnPreferenceConstantsType,
]
ExecuteResponseDocumentType = Literal["document"]
ExecuteResponseRawType = Literal["raw"]
ExecuteResponseConstantsType: TypeAlias = "ExecuteResponse"
AnyExecuteResponse = Union[
ExecuteResponseDocumentType,
ExecuteResponseRawType,
ExecuteResponseConstantsType,
]
ExecuteTransmissionModeReferenceType = Literal["reference"]
ExecuteTransmissionModeValueType = Literal["value"]
ExecuteTransmissionModeConstantsType: TypeAlias = "ExecuteTransmissionMode"
AnyExecuteTransmissionMode = Union[
ExecuteTransmissionModeReferenceType,
ExecuteTransmissionModeValueType,
ExecuteTransmissionModeConstantsType,
]
# pylint: disable=C0103,invalid-name
ExecuteCollectionFormatType_STAC = Literal["stac-collection"]
ExecuteCollectionFormatType_STAC_ITEMS = Literal["stac-items"]
ExecuteCollectionFormatType_OGC_COVERAGE = Literal["ogc-coverage-collection"]
ExecuteCollectionFormatType_OGC_FEATURES = Literal["ogc-features-collection"]
ExecuteCollectionFormatType_OGC_MAP = Literal["ogc-map-collection"]
ExecuteCollectionFormatType_GEOJSON = Literal["geojson-feature-collection"]
ExecuteCollectionFormatConstantsType: TypeAlias = "ExecuteCollectionFormat"
AnyExecuteCollectionFormat = Union[
ExecuteCollectionFormatType_STAC,
ExecuteCollectionFormatType_STAC_ITEMS,
ExecuteCollectionFormatType_OGC_COVERAGE,
ExecuteCollectionFormatType_OGC_FEATURES,
ExecuteCollectionFormatType_OGC_MAP,
ExecuteCollectionFormatType_GEOJSON,
ExecuteCollectionFormatConstantsType,
]
[docs]
class ExecuteControlOption(Constants):
@classmethod
[docs]
def values(cls):
# type: () -> List[AnyExecuteControlOption]
"""
Return default control options in specific order according to preferred modes for execution by `Weaver`.
"""
return [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]
@classmethod
[docs]
def from_mode(cls, mode):
# type: (Optional[AnyExecuteMode]) -> Optional[ExecuteControlOption]
mode = ExecuteMode.get(mode)
ctrl = cls.get(f"{mode}-execute")
return ctrl
[docs]
def parse_prefer_header_return(headers):
# type: (AnyHeadersContainer) -> Optional[AnyExecuteReturnPreference]
"""
Get the return preference if specified.
"""
prefer_header = get_header("prefer", headers)
prefer_params = parse_kvp(prefer_header)
prefer_return = prefer_params.get("return")
if prefer_return:
return ExecuteReturnPreference.get(prefer_return[0])
[docs]
def parse_prefer_header_execute_mode(
header_container, # type: AnyHeadersContainer
supported_modes=None, # type: Optional[List[AnyExecuteControlOption]]
wait_max=10, # type: int
return_auto=False, # type: bool
): # type: (...) -> Tuple[AnyExecuteMode, Optional[int], HeadersType]
"""
Obtain execution preference if provided in request headers.
.. seealso::
- :term:`OGC API - Processes`: Core, Execution mode <
https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execution_mode>`_.
This defines all conditions how to handle ``Prefer`` against applicable :term:`Process` description.
- :rfc:`7240#section-4.1` HTTP Prefer header ``respond-async``
.. seealso::
If ``Prefer`` format is valid, but server decides it cannot be respected, it can be transparently ignored
(:rfc:`7240#section-2`). The server must respond with ``Preference-Applied`` indicating preserved preferences
it decided to respect.
:param header_container: Request headers to retrieve preference, if any available.
:param supported_modes:
Execute modes that are permitted for the operation that received the ``Prefer`` header.
Resolved mode will respect this constraint following specification requirements of :term:`OGC API - Processes`.
:param wait_max:
Maximum wait time enforced by the server. If requested wait time is greater, ``wait`` preference will not be
applied and will fall back to asynchronous response.
:param return_auto:
If the resolution ends up being an "auto" selection, the auto-resolved mode, wait-time, etc. are returned
by default. Using this option, the "auto" mode will be explicitly returned instead, allowing a mixture of
execution mode to be "auto" handled at another time. This is mostly for reporting purposes.
:return:
Tuple of resolved execution mode, wait time if specified, and header of applied preferences if possible.
Maximum wait time indicates duration until synchronous response should fall back to asynchronous response.
:raises HTTPBadRequest: If contents of ``Prefer`` are not valid.
"""
prefer = get_header("prefer", header_container)
relevant_modes = [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC] # order important, async default
supported_modes = relevant_modes if supported_modes is None else supported_modes
supported_modes = [mode for mode in supported_modes if mode in relevant_modes]
if not prefer:
# /req/core/process-execute-default-execution-mode (A & B)
if not supported_modes:
return ExecuteMode.ASYNC, None, {} # Weaver's default
if len(supported_modes) == 1:
mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC
wait = None if mode == ExecuteMode.ASYNC else wait_max
return mode, wait, {}
# /req/core/process-execute-default-execution-mode (C)
mode = ExecuteMode.AUTO if return_auto else ExecuteMode.SYNC
return mode, wait_max, {}
# allow both listing of multiple 'Prefer' headers and single 'Prefer' header with multi-param ';' or ',' separated
# see https://www.rfc-editor.org/rfc/rfc7240#section-2 that allows all three variants
# it also mentions that duplicates should be ignored in preference of the first occurrence
# therefore, merge into a single header where only the first value of corresponding parameter name is considered
params = parse_kvp(prefer.replace(";", ","), pair_sep=",", multi_value_sep=None)
wait = wait_max
if "wait" in params:
try:
if any(param.isnumeric() for param in params):
# 'wait=x,y,z' parsed as 'wait=x' and 'y' / 'z' parameters on their own
# since 'wait' is the only referenced that users integers, it is guaranteed to be a misuse
raise ValueError("Invalid 'wait' with comma-separated values.")
params["wait"] = list(set(params["wait"])) # allow duplicates silently because of extend/merge strategy
if not len(params["wait"]) == 1:
raise ValueError("Too many 'wait' values.")
wait = params["wait"][0]
if not str.isnumeric(wait) or "." in wait or wait.startswith("-"):
raise ValueError("Invalid integer for 'wait' in seconds.")
wait = int(wait)
except (TypeError, ValueError) as exc:
raise HTTPBadRequest(json={
"code": "InvalidParameterValue",
"description": "HTTP Prefer header contains invalid 'wait' definition.",
"error": type(exc).__name__,
"cause": str(exc),
"value": str(params["wait"]),
})
if wait > wait_max:
LOGGER.info("Requested Prefer wait header too large (%ss > %ss), revert to async execution.", wait, wait_max)
return ExecuteMode.ASYNC, None, {}
auto = ExecuteMode.ASYNC if "respond-async" in params else ExecuteMode.AUTO
applied_preferences = []
# /req/core/process-execute-auto-execution-mode (A & B)
if len(supported_modes) == 1:
# supported mode is enforced, only indicate if it matches preferences to honour them
# otherwise, server is allowed to discard preference since it cannot be honoured
mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC
wait = None if mode == ExecuteMode.ASYNC else wait
if auto in [mode, ExecuteMode.AUTO]:
if auto == ExecuteMode.ASYNC:
applied_preferences.append("respond-async")
if wait and "wait" in params:
applied_preferences.append(f"wait={wait}")
# /rec/core/process-execute-honor-prefer (A: async & B: wait)
# https://datatracker.ietf.org/doc/html/rfc7240#section-3
applied = {}
if applied_preferences:
applied = {"Preference-Applied": ", ".join(applied_preferences)}
return mode, wait, applied
# Weaver's default, at server's discretion when both mode are supported
# /req/core/process-execute-auto-execution-mode (C)
if len(supported_modes) == 2:
if auto == ExecuteMode.ASYNC:
return ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"}
if wait and "wait" in params:
return ExecuteMode.SYNC, wait, {"Preference-Applied": f"wait={wait}"}
if auto == ExecuteMode.AUTO and return_auto:
return ExecuteMode.AUTO, None, {}
if wait: # default used, not a supplied preference
return ExecuteMode.SYNC, wait, {}
return ExecuteMode.ASYNC, None, {}
[docs]
def rebuild_prefer_header(job):
# type: (Job) -> Optional[str]
"""
Rebuilds the expected ``Prefer`` header value from :term:`Job` parameters.
"""
def append_header(header_value, new_value):
# type: (str, str) -> str
if header_value and new_value:
header_value += "; "
header_value += new_value
return header_value
header = ""
if job.execution_return:
header = append_header(header, f"return={job.execution_return}")
if job.execution_wait:
header = append_header(header, f"wait={job.execution_wait}")
if job.execute_async:
header = append_header(header, "respond-async")
return header or None
[docs]
def update_preference_applied_return_header(
job, # type: Job
request_headers, # type: Optional[AnyHeadersContainer]
response_headers, # type: Optional[PreservedHeadersType]
): # type: (...) -> PreservedHeadersType
"""
Updates the ``Preference-Applied`` header according to available information.
:param job: Job where the desired return preference has be resolved.
:param request_headers: Original request headers, to look for any ``Prefer: return``.
:param response_headers: Already generated response headers, to extend ``Preference-Applied`` header as needed.
:return: Updated response headers with any resolved return preference.
"""
response_headers = response_headers or {}
if not request_headers:
return response_headers
request_prefer_return = parse_prefer_header_return(request_headers)
if not request_prefer_return:
return response_headers
if job.execution_return != request_prefer_return:
return response_headers
applied_prefer_header = get_header("Preference-Applied", response_headers)
if applied_prefer_header:
applied_prefer_header = f"return={request_prefer_return}; {applied_prefer_header}"
else:
applied_prefer_header = f"return={request_prefer_return}"
response_headers.update({"Preference-Applied": applied_prefer_header})
return response_headers
[docs]
def resolve_execution_parameters(
job_control_options, # type: List[ExecuteControlOption]
execute_headers, # type: AnyHeadersContainer
execute_mode=None, # type: Optional[AnyExecuteMode]
execute_return=None, # type: Optional[AnyExecuteReturnPreference]
execute_max_wait=None, # type: Optional[int]
): # type: (...) -> Tuple[AnyExecuteMode, AnyExecuteResponse, AnyHeadersContainer]
"""
Resolve execution parameters from provided :term:`Job` control options and :term:`Process` execution headers.
The execution mode override, if provided, takes precedence over any header preference.
If the :term:`Process` only supports a single execution mode, it is enforced regardless of any preference.
If that option mismatches the override, an exception is raised.
If no override is provided, or no preference is specified in headers, the asynchronous mode will be employed
in case of multiple supported modes.
.. seealso::
:ref:`proc_exec_body`, :ref:`proc_exec_mode` and :ref:`proc_exec_results` provide combination matrices
and details on how execution parameters interact and affect the execution and response strategies.
.. note::
Returned parameters include multiple "equivalent" or "redundant" variants to handle
both :term:`OGC API - Processes` ``v1`` and ``v2``, and various combinations the servers could implement.
:param job_control_options: The allowed execution methods that the :term:`Process` supports.
:param execute_headers: Any preestablished headers that could hint a preferred execution mode.
:param execute_mode: Explicit execution mode to enforce, if any.
:param execute_return: Explicit return preference to enforce, if any.
:param execute_max_wait: Maximum wait time for synchronous execution, as applicable.
:return: Resolved execution mode and corresponding headers to apply.
:raises ProcessInvalidParameter: If the requested execution mode does not respect the supported ones.
"""
from weaver.wps_restapi.swagger_definitions import OGC_API_PROC_PROFILE_RESULTS_URI
exec_headers = execute_headers.copy()
exec_headers.setdefault("Prefer", "respond-async")
exec_mode = None
execute_max_wait = execute_max_wait or 10
if execute_mode:
exec_mode = ExecuteMode.get(execute_mode)
if exec_mode == ExecuteMode.SYNC:
exec_headers["Prefer"] = f"wait={execute_max_wait}"
mode, wait, applied = parse_prefer_header_execute_mode(exec_headers, job_control_options, execute_max_wait)
if exec_mode and mode != exec_mode:
raise ProcessInvalidParameter(
f"Requested execution mode '{exec_mode}' does not match supported modes: {job_control_options}."
)
if applied:
applied["Prefer"] = applied.pop("Preference-Applied")
if execute_return:
exec_return = ExecuteReturnPreference.get(execute_return)
else:
exec_return = parse_prefer_header_return(exec_headers)
if exec_return:
applied["Prefer"] += f"; return={exec_return}" if applied["Prefer"] else f"return={exec_return}"
profile = None
if mode == ExecuteMode.ASYNC:
profile = OGC_API_PROC_PROFILE_RESULTS_URI
exec_resp = ExecuteResponse.DOCUMENT
elif mode == ExecuteMode.SYNC and exec_return in [ExecuteReturnPreference.MINIMAL, None]:
profile = OGC_API_PROC_PROFILE_RESULTS_URI
exec_resp = ExecuteResponse.DOCUMENT
else:
exec_resp = ExecuteResponse.RAW
if profile:
exec_headers["Accept-Profile"] = profile
applied["Prefer"] += f"; profile={profile}" if applied["Prefer"] else f"profile={profile}"
return mode, exec_resp, applied