"""
Execution mode, response type and ``Prefer`` header resolution utilities (:mod:`weaver.execute`).
"""

import logging
from typing import TYPE_CHECKING

from pyramid.httpexceptions import HTTPBadRequest

from weaver.base import Constants
from weaver.exceptions import ProcessInvalidParameter
from weaver.utils import get_header, parse_kvp

if TYPE_CHECKING:
    from typing import List, Optional, Tuple, TypeAlias, Union

    from weaver.datatype import Job
    from weaver.typedefs import AnyHeadersContainer, HeadersType, Literal, PreservedHeadersType

if TYPE_CHECKING:
    # 'Literal', 'TypeAlias' and 'Union' are only imported under 'TYPE_CHECKING',
    # so these aliases must be guarded as well (they are only referenced from type comments).
    ExecuteModeAutoType = Literal["auto"]
    ExecuteModeAsyncType = Literal["async"]
    ExecuteModeSyncType = Literal["sync"]
    ExecuteModeConstantsType: TypeAlias = "ExecuteMode"
    AnyExecuteMode = Union[
        ExecuteModeAutoType,
        ExecuteModeAsyncType,
        ExecuteModeSyncType,
        ExecuteModeConstantsType,
    ]

    ExecuteControlOptionAsyncType = Literal["async-execute"]
    ExecuteControlOptionSyncType = Literal["sync-execute"]
    ExecuteControlOptionConstantsType: TypeAlias = "ExecuteControlOption"
    AnyExecuteControlOption = Union[
        ExecuteControlOptionAsyncType,
        ExecuteControlOptionSyncType,
        ExecuteControlOptionConstantsType,
    ]

    ExecuteReturnPreferenceMinimalType = Literal["minimal"]
    ExecuteReturnPreferenceRepresentationType = Literal["representation"]
    ExecuteReturnPreferenceConstantsType: TypeAlias = "ExecuteReturnPreference"
    AnyExecuteReturnPreference = Union[
        ExecuteReturnPreferenceMinimalType,
        ExecuteReturnPreferenceRepresentationType,
        ExecuteReturnPreferenceConstantsType,
    ]

    ExecuteResponseDocumentType = Literal["document"]
    ExecuteResponseRawType = Literal["raw"]
    ExecuteResponseConstantsType: TypeAlias = "ExecuteResponse"
    AnyExecuteResponse = Union[
        ExecuteResponseDocumentType,
        ExecuteResponseRawType,
        ExecuteResponseConstantsType,
    ]

    ExecuteTransmissionModeReferenceType = Literal["reference"]
    ExecuteTransmissionModeValueType = Literal["value"]
    ExecuteTransmissionModeConstantsType: TypeAlias = "ExecuteTransmissionMode"
    AnyExecuteTransmissionMode = Union[
        ExecuteTransmissionModeReferenceType,
        ExecuteTransmissionModeValueType,
        ExecuteTransmissionModeConstantsType,
    ]

    # pylint: disable=C0103,invalid-name
    ExecuteCollectionFormatType_STAC = Literal["stac-collection"]
    ExecuteCollectionFormatType_STAC_ITEMS = Literal["stac-items"]
    ExecuteCollectionFormatType_OGC_COVERAGE = Literal["ogc-coverage-collection"]
    ExecuteCollectionFormatType_OGC_FEATURES = Literal["ogc-features-collection"]
    ExecuteCollectionFormatType_OGC_MAP = Literal["ogc-map-collection"]
    ExecuteCollectionFormatType_GEOJSON = Literal["geojson-feature-collection"]
    ExecuteCollectionFormatConstantsType: TypeAlias = "ExecuteCollectionFormat"
    AnyExecuteCollectionFormat = Union[
        ExecuteCollectionFormatType_STAC,
        ExecuteCollectionFormatType_STAC_ITEMS,
        ExecuteCollectionFormatType_OGC_COVERAGE,
        ExecuteCollectionFormatType_OGC_FEATURES,
        ExecuteCollectionFormatType_OGC_MAP,
        ExecuteCollectionFormatType_GEOJSON,
        ExecuteCollectionFormatConstantsType,
    ]
# module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
class ExecuteMode(Constants):
    """
    Execution modes that can be resolved for a :term:`Job` (``auto``, ``async`` or ``sync``).
    """
    AUTO = "auto"  # type: ExecuteModeAutoType
    ASYNC = "async"  # type: ExecuteModeAsyncType
    SYNC = "sync"  # type: ExecuteModeSyncType
class ExecuteControlOption(Constants):
    """
    Job control options declaring which execution modes a :term:`Process` supports.
    """
    ASYNC = "async-execute"  # type: ExecuteControlOptionAsyncType
    SYNC = "sync-execute"  # type: ExecuteControlOptionSyncType

    @classmethod
    def values(cls):
        # type: () -> List[AnyExecuteControlOption]
        """
        Return default control options in specific order according to preferred modes for execution by `Weaver`.

        Asynchronous execution is listed first since it is the preferred mode.
        """
        return [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]

    @classmethod
    def from_mode(cls, mode):
        # type: (Optional[AnyExecuteMode]) -> Optional[AnyExecuteControlOption]
        """
        Convert an :class:`ExecuteMode` into the corresponding control option.

        The control option is looked up as ``"<mode>-execute"``. A mode without a matching option
        (e.g. ``auto``) or an unknown/missing mode is presumably resolved to ``None`` by the
        ``Constants.get`` lookup (as the return annotation suggests).

        :param mode: Execution mode to convert.
        :returns: Matching control option, if any.
        """
        mode = ExecuteMode.get(mode)
        ctrl = cls.get(f"{mode}-execute")
        return ctrl
class ExecuteReturnPreference(Constants):
    """
    Values of the ``return`` preference that can be provided in the ``Prefer`` header.
    """
    MINIMAL = "minimal"  # type: ExecuteReturnPreferenceMinimalType
    REPRESENTATION = "representation"  # type: ExecuteReturnPreferenceRepresentationType
class ExecuteResponse(Constants):
    """
    Response types of an execution: a ``raw`` result or a results ``document``.
    """
    RAW = "raw"  # type: ExecuteResponseRawType
    DOCUMENT = "document"  # type: ExecuteResponseDocumentType
class ExecuteTransmissionMode(Constants):
    """
    Transmission modes, either by ``value`` or by ``reference``.
    """
    VALUE = "value"  # type: ExecuteTransmissionModeValueType
    REFERENCE = "reference"  # type: ExecuteTransmissionModeReferenceType
class ExecuteCollectionFormat(Constants):
    """
    Identifiers of the collection formats known by the execution utilities.
    """
    STAC = "stac-collection"  # type: ExecuteCollectionFormatType_STAC
    STAC_ITEMS = "stac-items"  # type: ExecuteCollectionFormatType_STAC_ITEMS
    OGC_COVERAGE = "ogc-coverage-collection"  # type: ExecuteCollectionFormatType_OGC_COVERAGE
    OGC_FEATURES = "ogc-features-collection"  # type: ExecuteCollectionFormatType_OGC_FEATURES
    OGC_MAP = "ogc-map-collection"  # type: ExecuteCollectionFormatType_OGC_MAP
    GEOJSON = "geojson-feature-collection"  # type: ExecuteCollectionFormatType_GEOJSON
def parse_prefer_header_return(headers):
    # type: (AnyHeadersContainer) -> Optional[AnyExecuteReturnPreference]
    """
    Get the return preference if specified.

    Preferences separated by either ``;`` or ``,`` are accepted, in the same manner as
    :func:`parse_prefer_header_execute_mode`, since :rfc:`7240#section-2` lists multiple
    preferences with ``,`` (e.g. ``Prefer: respond-async, return=minimal``).
    Only the first ``return`` value is considered.

    :param headers: Request headers where the ``Prefer`` header should be looked for.
    :returns:
        Resolved return preference, or ``None`` if not specified.
        Unknown values are resolved by ``ExecuteReturnPreference.get`` (presumably to ``None``).
    """
    prefer_header = get_header("prefer", headers)
    if not prefer_header:
        return None
    # merge ';' and ',' separated preferences so that any variant allowed by RFC 7240 is supported
    prefer_params = parse_kvp(prefer_header.replace(";", ","), pair_sep=",", multi_value_sep=None)
    prefer_return = prefer_params.get("return")
    if not prefer_return:
        return None
    return ExecuteReturnPreference.get(prefer_return[0])
def _parse_prefer_wait(params, wait_max):
    # type: (...) -> int
    """
    Extract and validate the ``wait`` preference from parsed ``Prefer`` parameters.

    :param params: Parsed ``Prefer`` parameters (lists of values per parameter name). Modified in place.
    :param wait_max: Value returned when no ``wait`` preference is provided.
    :returns: Requested wait time in seconds, or ``wait_max`` if not specified.
    :raises HTTPBadRequest: If the ``wait`` preference is not a single non-negative integer.
    """
    if "wait" not in params:
        return wait_max
    try:
        if any(param.isnumeric() for param in params):
            # 'wait=x,y,z' parsed as 'wait=x' and 'y' / 'z' parameters on their own
            # since 'wait' is the only parameter that uses integers, it is guaranteed to be a misuse
            raise ValueError("Invalid 'wait' with comma-separated values.")
        params["wait"] = list(set(params["wait"]))  # allow duplicates silently because of extend/merge strategy
        if len(params["wait"]) != 1:
            raise ValueError("Too many 'wait' values.")
        wait = params["wait"][0]
        # 'str.isnumeric(...)' (rather than 'wait.isnumeric()') raises TypeError for non-string values
        if not str.isnumeric(wait) or "." in wait or wait.startswith("-"):
            raise ValueError("Invalid integer for 'wait' in seconds.")
        return int(wait)
    except (TypeError, ValueError) as exc:
        raise HTTPBadRequest(json={
            "code": "InvalidParameterValue",
            "description": "HTTP Prefer header contains invalid 'wait' definition.",
            "error": type(exc).__name__,
            "cause": str(exc),
            "value": str(params["wait"]),
        }) from exc


def parse_prefer_header_execute_mode(
    header_container,       # type: AnyHeadersContainer
    supported_modes=None,   # type: Optional[List[AnyExecuteControlOption]]
    wait_max=10,            # type: int
    return_auto=False,      # type: bool
):                          # type: (...) -> Tuple[AnyExecuteMode, Optional[int], HeadersType]
    """
    Obtain execution preference if provided in request headers.

    .. seealso::
        - :term:`OGC API - Processes`: `Core, Execution mode
          <https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execution_mode>`_.
          This defines all conditions how to handle ``Prefer`` against applicable :term:`Process` description.
        - :rfc:`7240#section-4.1` HTTP Prefer header ``respond-async``

    .. seealso::
        If ``Prefer`` format is valid, but server decides it cannot be respected, it can be transparently ignored
        (:rfc:`7240#section-2`). The server must respond with ``Preference-Applied`` indicating preserved preferences
        it decided to respect.

    :param header_container: Request headers to retrieve preference, if any available.
    :param supported_modes:
        Execute modes that are permitted for the operation that received the ``Prefer`` header.
        Resolved mode will respect this constraint following specification requirements of :term:`OGC API - Processes`.
        Unknown and duplicate entries are ignored.
    :param wait_max:
        Maximum wait time enforced by the server. If requested wait time is greater, ``wait`` preference will not be
        applied and will fall back to asynchronous response.
    :param return_auto:
        If the resolution ends up being an "auto" selection, the auto-resolved mode, wait-time, etc. are returned by
        default. Using this option, the "auto" mode will be explicitly returned instead, allowing a mixture of
        execution mode to be "auto" handled at another time. This is mostly for reporting purposes.
    :return:
        Tuple of resolved execution mode, wait time if specified, and header of applied preferences if possible.
        Maximum wait time indicates duration until synchronous response should fall back to asynchronous response.
    :raises HTTPBadRequest: If contents of ``Prefer`` are not valid.
    """
    prefer = get_header("prefer", header_container)
    relevant_modes = [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]  # order important, async default
    supported_modes = list(relevant_modes if supported_modes is None else supported_modes)
    # keep only relevant modes, without duplicates, so that the count reflects the distinct supported modes
    supported_modes = [mode for mode in relevant_modes if mode in supported_modes]

    if not prefer:
        # /req/core/process-execute-default-execution-mode (A & B)
        if not supported_modes:
            return ExecuteMode.ASYNC, None, {}  # Weaver's default
        if len(supported_modes) == 1:
            mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC
            wait = None if mode == ExecuteMode.ASYNC else wait_max
            return mode, wait, {}
        # /req/core/process-execute-default-execution-mode (C)
        mode = ExecuteMode.AUTO if return_auto else ExecuteMode.SYNC
        return mode, wait_max, {}

    # allow both listing of multiple 'Prefer' headers and single 'Prefer' header with multi-param ';' or ',' separated
    # see https://www.rfc-editor.org/rfc/rfc7240#section-2 that allows all three variants
    # it also mentions that duplicates should be ignored in preference of the first occurrence
    # therefore, merge into a single header where only the first value of corresponding parameter name is considered
    params = parse_kvp(prefer.replace(";", ","), pair_sep=",", multi_value_sep=None)
    wait = _parse_prefer_wait(params, wait_max)
    if wait > wait_max:
        LOGGER.info("Requested Prefer wait header too large (%ss > %ss), revert to async execution.", wait, wait_max)
        return ExecuteMode.ASYNC, None, {}

    auto = ExecuteMode.ASYNC if "respond-async" in params else ExecuteMode.AUTO

    # /req/core/process-execute-auto-execution-mode (A & B)
    if len(supported_modes) == 1:
        # supported mode is enforced, only indicate if it matches preferences to honour them
        # otherwise, server is allowed to discard preference since it cannot be honoured
        mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC
        wait = None if mode == ExecuteMode.ASYNC else wait
        applied_preferences = []
        if auto in [mode, ExecuteMode.AUTO]:
            if auto == ExecuteMode.ASYNC:
                applied_preferences.append("respond-async")
            if wait and "wait" in params:
                applied_preferences.append(f"wait={wait}")
        # /rec/core/process-execute-honor-prefer (A: async & B: wait)
        # https://datatracker.ietf.org/doc/html/rfc7240#section-3
        applied = {"Preference-Applied": ", ".join(applied_preferences)} if applied_preferences else {}
        return mode, wait, applied

    # remaining cases are either both modes supported or none declared (no constraint to enforce)
    # an explicit 'respond-async' is honoured in both cases, since async is also Weaver's default
    # /req/core/process-execute-auto-execution-mode (C)
    if auto == ExecuteMode.ASYNC:
        return ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"}
    # Weaver's default, at server's discretion when both mode are supported
    if len(supported_modes) == 2 and wait and "wait" in params:
        return ExecuteMode.SYNC, wait, {"Preference-Applied": f"wait={wait}"}

    if auto == ExecuteMode.AUTO and return_auto:
        return ExecuteMode.AUTO, None, {}
    if wait:  # default used, not a supplied preference
        return ExecuteMode.SYNC, wait, {}
    return ExecuteMode.ASYNC, None, {}
def rebuild_prefer_header(job):
    # type: (Job) -> Optional[str]
    """
    Rebuilds the expected ``Prefer`` header value from :term:`Job` parameters.

    :param job: Job providing the ``execution_return``, ``execution_wait`` and ``execute_async`` attributes.
    :returns: Applicable preferences joined by ``"; "``, or ``None`` if none applies.
    """
    preferences = []
    if job.execution_return:
        preferences.append(f"return={job.execution_return}")
    if job.execution_wait:
        preferences.append(f"wait={job.execution_wait}")
    if job.execute_async:
        preferences.append("respond-async")
    return "; ".join(preferences) or None
def update_preference_applied_return_header(
    job,                # type: Job
    request_headers,    # type: Optional[AnyHeadersContainer]
    response_headers,   # type: Optional[PreservedHeadersType]
):                      # type: (...) -> PreservedHeadersType
    """
    Updates the ``Preference-Applied`` header according to available information.

    The ``return`` preference is only reported if the one requested in ``Prefer`` matches the one
    resolved for the :term:`Job`. It is prepended to any existing ``Preference-Applied`` value.

    :param job: Job where the desired return preference has be resolved.
    :param request_headers: Original request headers, to look for any ``Prefer: return``.
    :param response_headers:
        Already generated response headers, to extend ``Preference-Applied`` header as needed.
        When provided, this container is updated in place.
    :return: Updated response headers with any resolved return preference.
    """
    response_headers = response_headers or {}
    if not request_headers:
        return response_headers

    request_prefer_return = parse_prefer_header_return(request_headers)
    # only report a requested preference that was effectively resolved for the job
    if not request_prefer_return or job.execution_return != request_prefer_return:
        return response_headers

    applied_prefer_header = f"return={request_prefer_return}"
    existing_applied = get_header("Preference-Applied", response_headers)
    if existing_applied:
        applied_prefer_header = f"{applied_prefer_header}; {existing_applied}"
    response_headers.update({"Preference-Applied": applied_prefer_header})
    return response_headers
def resolve_execution_parameters(
    job_control_options,    # type: List[ExecuteControlOption]
    execute_headers,        # type: AnyHeadersContainer
    execute_mode=None,      # type: Optional[AnyExecuteMode]
    execute_return=None,    # type: Optional[AnyExecuteReturnPreference]
    execute_max_wait=None,  # type: Optional[int]
):                          # type: (...) -> Tuple[AnyExecuteMode, AnyExecuteResponse, AnyHeadersContainer]
    """
    Resolve execution parameters from provided :term:`Job` control options and :term:`Process` execution headers.

    The execution mode override, if provided, takes precedence over any header preference.
    If the :term:`Process` only supports a single execution mode, it is enforced regardless of any preference.
    If that option mismatches the override, an exception is raised. An ``auto`` override is considered as
    no override at all, since it leaves the decision to the server.

    If no override is provided, or no preference is specified in headers, the asynchronous mode will be employed
    in case of multiple supported modes.

    .. seealso::
        :ref:`proc_exec_body`, :ref:`proc_exec_mode` and :ref:`proc_exec_results` provide combination matrices and
        details on how execution parameters interact and affect the execution and response strategies.

    .. note::
        Returned parameters include multiple "equivalent" or "redundant" variants to handle both
        :term:`OGC API - Processes` ``v1`` and ``v2``, and various combinations the servers could implement.

    :param job_control_options: The allowed execution methods that the :term:`Process` supports.
    :param execute_headers: Any pre-established headers that could hint a preferred execution mode.
    :param execute_mode: Explicit execution mode to enforce, if any.
    :param execute_return: Explicit return preference to enforce, if any.
    :param execute_max_wait: Maximum wait time for synchronous execution, as applicable (defaults to 10).
    :return:
        Resolved execution mode, execution response type, and headers with the applied preferences
        (listed under ``Prefer``, including the ``return`` and ``profile`` preferences when applicable).
    :raises ProcessInvalidParameter: If the requested execution mode does not respect the supported ones.
    """
    # NOTE(review): local import, presumably to avoid a circular import with the REST API definitions
    from weaver.wps_restapi.swagger_definitions import OGC_API_PROC_PROFILE_RESULTS_URI

    def append_prefer(headers, preference):
        # type: (HeadersType, str) -> None
        # extend (or create) the 'Prefer' entry, since no preference might have been applied yet
        current = headers.get("Prefer")
        headers["Prefer"] = f"{current}; {preference}" if current else preference

    exec_headers = execute_headers.copy()
    exec_headers.setdefault("Prefer", "respond-async")
    execute_max_wait = execute_max_wait or 10

    # resolve the return preference before the mode override below rewrites the 'Prefer' header,
    # otherwise any 'return' requested in the original headers would be silently discarded
    if execute_return:
        exec_return = ExecuteReturnPreference.get(execute_return)
    else:
        exec_return = parse_prefer_header_return(exec_headers)

    exec_mode = ExecuteMode.get(execute_mode) if execute_mode else None
    if exec_mode == ExecuteMode.AUTO:
        exec_mode = None  # 'auto' defers the decision to the server, it cannot conflict with supported modes
    if exec_mode == ExecuteMode.SYNC:
        exec_headers["Prefer"] = f"wait={execute_max_wait}"

    mode, _, applied = parse_prefer_header_execute_mode(exec_headers, job_control_options, execute_max_wait)
    if exec_mode and mode != exec_mode:
        raise ProcessInvalidParameter(
            f"Requested execution mode '{exec_mode}' does not match supported modes: {job_control_options}."
        )
    if "Preference-Applied" in applied:
        applied["Prefer"] = applied.pop("Preference-Applied")
    if exec_return:
        append_prefer(applied, f"return={exec_return}")

    # results document (with corresponding profile) unless a synchronous 'representation' return is requested
    if mode == ExecuteMode.ASYNC or (
        mode == ExecuteMode.SYNC and exec_return in [ExecuteReturnPreference.MINIMAL, None]
    ):
        exec_resp = ExecuteResponse.DOCUMENT
        append_prefer(applied, f"profile={OGC_API_PROC_PROFILE_RESULTS_URI}")
    else:
        exec_resp = ExecuteResponse.RAW
    return mode, exec_resp, applied