Source code for weaver.typedefs

from typing import TYPE_CHECKING  # pragma: no cover

# FIXME:
#  replace invalid 'Optional' (type or None) used instead of 'NotRequired' (optional key) when better supported
#  https://youtrack.jetbrains.com/issue/PY-53611/Support-PEP-655-typingRequiredtypingNotRequired-for-TypedDicts
if TYPE_CHECKING:
    import os
    import sys
    import typing
    import uuid
    from datetime import datetime
    from distutils.version import LooseVersion
    from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union

    import psutil
    from typing_extensions import Literal, NotRequired, Protocol, TypeAlias, TypedDict

    if hasattr(os, "PathLike"):
[docs] FileSystemPathType = Union[os.PathLike, str]
else: FileSystemPathType = str MemoryInfo = Any if sys.platform == "win32": try: MemoryInfo = psutil._psutil_windows._pfullmem # noqa: W0212 except (AttributeError, ImportError, NameError): pass if MemoryInfo is Any: try: MemoryInfo = psutil._pslinux.pfullmem # noqa: W0212 except (AttributeError, ImportError, NameError): pass if MemoryInfo is Any: if TypedDict is Dict: MemoryInfo = Dict else: MemoryInfo = TypedDict("MemoryInfo", { "rss": int, "uss": int, "vms": int, }, total=False) TimesCPU = psutil._common.pcputimes # noqa: W0212 from celery.app import Celery from celery.result import AsyncResult, EagerResult, GroupResult, ResultSet from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Process as ProcessOWS, WPSExecution from pyramid.httpexceptions import HTTPException, HTTPSuccessful, HTTPRedirection from pyramid.registry import Registry from pyramid.request import Request as PyramidRequest from pyramid.response import Response as PyramidResponse from pyramid.testing import DummyRequest from pyramid.config import Configurator from pywps import Process as ProcessWPS from pywps.app import WPSRequest from pywps.inout import BoundingBoxInput, ComplexInput, LiteralInput from requests import PreparedRequest, Request as RequestsRequest from requests.models import Response as RequestsResponse from requests.structures import CaseInsensitiveDict from webob.headers import ResponseHeaders, EnvironHeaders from webob.response import Response as WebobResponse from webtest.response import TestResponse from werkzeug.wrappers import Request as WerkzeugRequest from weaver.processes.constants import CWL_RequirementNames from weaver.processes.wps_process_base import WpsProcessInterface from weaver.datatype import Process from weaver.status import AnyStatusType ReturnValue = TypeVar("ReturnValue") # alias to identify the same return value as a decorated/wrapped function AnyCallable = TypeVar("AnyCallable", bound=Callable[..., Any]) # callable used for decorated/wrapped functions AnyCallableWrapped = Callable[[..., Any], ReturnValue] AnyCallableAnyArgs = Union[Callable[[], ReturnValue], Callable[[..., Any], ReturnValue]] # pylint: disable=C0103,invalid-name Number = Union[int, float] ValueType = Union[str, Number, bool] AnyValueType = Optional[ValueType] # avoid naming ambiguity with PyWPS AnyValue AnyKey = Union[str, int] AnyUUID = Union[str, uuid.UUID] AnyVersion = Union[LooseVersion, Number, str, Tuple[int, ...], List[int]] # add more levels of explicit definitions than necessary to simulate JSON recursive structure better than 'Any' # amount of repeated equivalent definition makes typing analysis 'work well enough' for most use cases _JSON: TypeAlias = "JSON" _JsonObjectItemAlias: TypeAlias = "_JsonObjectItem" _JsonListItemAlias: TypeAlias = "_JsonListItem" _JsonObjectItem = Dict[str, Union[_JSON, _JsonObjectItemAlias, _JsonListItemAlias]] _JsonListItem = List[Union[AnyValueType, _JsonObjectItem, _JsonListItemAlias]] _JsonItem = Union[AnyValueType, _JsonObjectItem, _JsonListItem, _JSON] JSON = Union[Dict[str, _JsonItem], List[_JsonItem], AnyValueType] Link = TypedDict("Link", { "rel": str, "title": str, "href": str, "hreflang": NotRequired[str], "type": NotRequired[str], # IANA Media-Type }, total=False) Metadata = TypedDict("Metadata", { "title": str, "role": str, # URL "value": str, "lang": NotRequired[str], "type": NotRequired[str], # FIXME: relevant? }, total=False) LogLevelStr = Literal[ "CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG", "critical", "fatal", "error", "warn", "warning", "info", "debug" ] AnyLogLevel = Union[LogLevelStr, int] # CWL definition GlobType = TypedDict("GlobType", {"glob": Union[str, List[str]]}, total=False) CWL_IO_FileValue = TypedDict("CWL_IO_FileValue", { "class": str, "path": str, "format": NotRequired[Optional[str]], }, total=True) CWL_IO_Value = Union[AnyValueType, List[AnyValueType], CWL_IO_FileValue, List[CWL_IO_FileValue]] CWL_IO_NullableType = Union[str, List[str]] # "<type>?" or ["<type>", "null"] CWL_IO_NestedType = TypedDict("CWL_IO_NestedType", {"type": CWL_IO_NullableType}, total=True) CWL_IO_EnumSymbols = Union[List[str], List[int], List[float]] CWL_IO_EnumType = TypedDict("CWL_IO_EnumType", { "type": str, "symbols": CWL_IO_EnumSymbols, }) CWL_IO_ArrayType = TypedDict("CWL_IO_ArrayType", { "type": str, "items": Union[str, CWL_IO_EnumType], # "items" => type of every item }) CWL_IO_TypeItem = Union[str, CWL_IO_NestedType, CWL_IO_ArrayType, CWL_IO_EnumType] CWL_IO_DataType = Union[CWL_IO_TypeItem, List[CWL_IO_TypeItem]] CWL_Input_Type = TypedDict("CWL_Input_Type", { "id": NotRequired[str], # representation used by plain CWL definition "name": NotRequired[str], # representation used by parsed tool instance "type": CWL_IO_DataType, "items": NotRequired[Union[str, CWL_IO_EnumType]], "symbols": NotRequired[CWL_IO_EnumSymbols], "format": NotRequired[Optional[Union[str, List[str]]]], "inputBinding": NotRequired[Any], "default": NotRequired[Optional[AnyValueType]], }, total=False) CWL_Output_Type = TypedDict("CWL_Output_Type", { "id": NotRequired[str], # representation used by plain CWL definition "name": NotRequired[str], # representation used by parsed tool instance "type": CWL_IO_DataType, "format": NotRequired[Optional[Union[str, List[str]]]], "outputBinding": NotRequired[GlobType] }, total=False) CWL_Inputs = Union[List[CWL_Input_Type], Dict[str, CWL_Input_Type]] CWL_Outputs = Union[List[CWL_Output_Type], Dict[str, CWL_Output_Type]] # 'requirements' includes 'hints' CWL_Requirement = TypedDict("CWL_Requirement", {"class": CWL_RequirementNames}, total=False) # type: ignore CWL_RequirementsDict = Dict[CWL_RequirementNames, Dict[str, str]] # {'<req>': {<param>: <val>}} CWL_RequirementsList = List[CWL_Requirement] # [{'class': <req>, <param>: <val>}] CWL_AnyRequirements = Union[CWL_RequirementsDict, CWL_RequirementsList] # results from CWL execution CWL_ResultFile = TypedDict("CWL_ResultFile", {"location": str}, total=False) CWL_ResultValue = Union[AnyValueType, List[AnyValueType]] CWL_ResultEntry = Union[Dict[str, CWL_ResultValue], CWL_ResultFile, List[CWL_ResultFile]] CWL_Results = Dict[str, CWL_ResultEntry] CWL_Class = Literal["CommandLineTool", "ExpressionTool", "Workflow"] CWL_WorkflowStep = TypedDict("CWL_WorkflowStep", { "run": str, "in": Dict[str, str], # mapping of <step input: workflow input | other-step output> "out": List[str], # output to retrieve from step, for mapping with other steps }) CWL_WorkflowStepID = str CWL_WorkflowStepReference = TypedDict("CWL_WorkflowStepReference", { "name": CWL_WorkflowStepID, "reference": str, # URL }) _CWL = "CWL" # type: TypeAlias CWL_Graph = List[_CWL] CWL = TypedDict("CWL", { "cwlVersion": str, "class": CWL_Class, "label": str, "doc": str, "id": NotRequired[str], "intent": NotRequired[str], "s:keywords": List[str], "baseCommand": NotRequired[Optional[Union[str, List[str]]]], "parameters": NotRequired[List[str]], "requirements": NotRequired[CWL_AnyRequirements], "hints": NotRequired[CWL_AnyRequirements], "inputs": CWL_Inputs, "outputs": CWL_Outputs, "steps": NotRequired[Dict[CWL_WorkflowStepID, CWL_WorkflowStep]], "stderr": NotRequired[str], "stdout": NotRequired[str], "$namespaces": NotRequired[Dict[str, str]], "$schemas": NotRequired[Dict[str, str]], "$graph": NotRequired[CWL_Graph], }, total=False) CWL_WorkflowStepPackage = TypedDict("CWL_WorkflowStepPackage", { "id": str, # reference ID of the package "package": CWL # definition of the package as sub-step of a Workflow }) CWL_WorkflowStepPackageMap = Dict[CWL_WorkflowStepID, CWL_WorkflowStepPackage] # CWL loading CWL_WorkflowInputs = Dict[str, AnyValueType] # mapping of ID:value (any type) CWL_ExpectedOutputs = Dict[str, AnyValueType] # mapping of ID:pattern (File only) CWL_ToolPathObjectType = Dict[str, Any] JobProcessDefinitionCallback = Callable[[str, Dict[str, str], Dict[str, Any]], WpsProcessInterface] # CWL runtime CWL_RuntimeLiteral = Union[str, float, int] CWL_RuntimeLiteralObject = TypedDict("CWL_RuntimeLiteralObject", { "id": str, "value": CWL_RuntimeLiteral, }, total=False) CWL_RuntimeInputFile = TypedDict("CWL_RuntimeInputFile", { "id": NotRequired[str], "class": str, "location": str, "format": NotRequired[Optional[str]], "basename": str, "nameroot": str, "nameext": str, }, total=False) CWL_RuntimeOutputFile = TypedDict("CWL_RuntimeOutputFile", { "class": str, "location": str, "format": NotRequired[Optional[str]], "basename": str, "nameroot": str, "nameext": str, "checksum": NotRequired[str], "size": NotRequired[str], }, total=False) CWL_RuntimeInput = Union[CWL_RuntimeLiteral, CWL_RuntimeInputFile] CWL_RuntimeInputsMap = Dict[str, CWL_RuntimeInput] CWL_RuntimeInputList = List[Union[CWL_RuntimeLiteralObject, CWL_RuntimeInputFile]] CWL_RuntimeOutput = Union[CWL_RuntimeLiteral, CWL_RuntimeOutputFile] # OWSLib Execution # inputs of OWSLib are either a string (any literal type, bbox or complex file) OWS_InputData = Union[str, BoundingBoxDataInput, ComplexDataInput] OWS_InputDataValues = List[Tuple[str, OWS_InputData]] AnyInputData = Union[OWS_InputData, BoundingBoxInput, ComplexInput, LiteralInput] # PyWPS Execution WPS_InputData = Tuple[str, AnyInputData] WPS_OutputAsRef = Tuple[str, Optional[bool]] # (output_id, as_ref) WPS_OutputAsRefMimeType = Tuple[str, Optional[bool], Optional[str]] # (output_id, as_ref, mime_type) WPS_OutputRequested = Union[WPS_OutputAsRef, WPS_OutputAsRefMimeType] KVP_Item = Union[ValueType, Sequence[ValueType]] KVP_Container = Union[Sequence[Tuple[str, KVP_Item]], Dict[str, KVP_Item]] KVP = Dict[str, List[KVP_Item]] AnyContainer = Union[Configurator, Registry, PyramidRequest, WerkzeugRequest, Celery] SettingValue = Optional[Union[JSON, AnyValueType]] SettingsType = Dict[str, SettingValue] AnySettingsContainer = Union[AnyContainer, SettingsType] AnyRegistryContainer = AnyContainer AnyDatabaseContainer = AnyContainer CookiesType = Dict[str, str] HeadersType = Dict[str, str] CookiesTupleType = List[Tuple[str, str]] HeadersTupleType = List[Tuple[str, str]] CookiesBaseType = Union[CookiesType, CookiesTupleType] HeadersBaseType = Union[HeadersType, HeadersTupleType] HeaderCookiesType = Union[HeadersBaseType, CookiesBaseType] HeaderCookiesTuple = Union[Tuple[None, None], Tuple[HeadersBaseType, CookiesBaseType]] AnyHeadersContainer = Union[HeadersBaseType, ResponseHeaders, EnvironHeaders, CaseInsensitiveDict] AnyCookiesContainer = Union[CookiesBaseType, WPSRequest, PyramidRequest, AnyHeadersContainer] AnyRequestType = Union[PyramidRequest, WerkzeugRequest, PreparedRequest, RequestsRequest, DummyRequest] AnyResponseType = Union[PyramidResponse, WebobResponse, RequestsResponse, TestResponse] AnyViewResponse = Union[PyramidResponse, WebobResponse, HTTPException] RequestMethod = Literal[ "HEAD", "GET", "POST", "PUT", "PATCH", "DELETE", "head", "get", "post", "put", "patch", "delete", ] AnyRequestMethod = Union[RequestMethod, str] HTTPValid = Union[HTTPSuccessful, HTTPRedirection] AnyProcess = Union[Process, ProcessOWS, ProcessWPS, JSON] AnyProcessClass = Union[Type[Process], Type[ProcessWPS]] # update_status(message, progress, status, *args, **kwargs) UpdateStatusPartialFunction = TypeVar( "UpdateStatusPartialFunction", bound=Callable[[str, Number, AnyStatusType, ..., Any], None] ) DatetimeIntervalType = TypedDict("DatetimeIntervalType", { "before": datetime, "after": datetime, "match": datetime }, total=False) # data source configuration DataSourceFileRef = TypedDict("DataSourceFileRef", { "ades": str, # target ADES to dispatch "netloc": str, # definition to match file references against "default": NotRequired[bool], # default ADES when no match was possible (single one allowed in config) }, total=True) DataSourceOpenSearch = TypedDict("DataSourceOpenSearch", { "ades": str, # target ADES to dispatch "netloc": str, # where to send OpenSearch request "collection_id": NotRequired[str], # OpenSearch collection ID to match against "default": NotRequired[bool], # default ADES when no match was possible (single one allowed) "accept_schemes": NotRequired[List[str]], # allowed URL schemes (http, https, etc.) "mime_types": NotRequired[List[str]], # allowed Media-Types (text/xml, application/json, etc.) "rootdir": str, # root position of the data to retrieve "osdd_url": str, # global OpenSearch description document to employ }, total=True) DataSource = Union[DataSourceFileRef, DataSourceOpenSearch] DataSourceConfig = Dict[str, DataSource] # JSON/YAML file contents JobValueFormat = TypedDict("JobValueFormat", { "mime_type": NotRequired[str], "media_type": NotRequired[str], "encoding": NotRequired[str], "schema": NotRequired[str], "extension": NotRequired[str], }, total=False) JobValueFile = TypedDict("JobValueFile", { "href": str, "format": NotRequired[JobValueFormat], }, total=False) JobValueData = TypedDict("JobValueData", { "data": AnyValueType, }, total=False) JobValueValue = TypedDict("JobValueValue", { "value": AnyValueType, }, total=False) JobValueObject = Union[JobValueData, JobValueValue, JobValueFile] JobValueFileItem = TypedDict("JobValueFileItem", { "id": str, "href": Optional[str], "format": Optional[JobValueFormat], }, total=False) JobValueDataItem = TypedDict("JobValueDataItem", { "id": str, "data": AnyValueType, }, total=False) JobValueValueItem = TypedDict("JobValueValueItem", { "id": str, "value": AnyValueType, }, total=False) JobValueItem = Union[JobValueDataItem, JobValueFileItem] JobExpectItem = TypedDict("JobExpectItem", {"id": str}, total=True) JobInputs = List[Union[JobValueItem, Dict[str, AnyValueType]]] JobOutputs = List[Union[JobExpectItem, Dict[str, AnyValueType]]] JobResults = List[JobValueItem] JobMonitorReference = Any # typically an URI of the remote job status or an execution object/handler ExecutionInputsMap = Dict[str, JobValueObject] # when schema='weaver.processes.constants.ProcessSchema.OGC' ExecutionInputsList = List[JobValueItem] # when schema='weaver.processes.constants.ProcessSchema.OLD' ExecutionInputs = Union[ExecutionInputsList, ExecutionInputsMap] ExecutionOutputObject = TypedDict("ExecutionOutputObject", { "transmissionMode": str }, total=False) ExecutionOutputItem = TypedDict("ExecutionOutputItem", { "id": str, "transmissionMode": str }, total=False) ExecutionOutputsList = List[ExecutionOutputItem] ExecutionOutputsMap = Dict[str, ExecutionOutputObject] ExecutionOutputs = Union[ExecutionOutputsList, ExecutionOutputsMap] ExecutionResultObjectRef = TypedDict("ExecutionResultObjectRef", { "href": Optional[str], "type": NotRequired[str], }, total=False) ExecutionResultObjectValue = TypedDict("ExecutionResultObjectValue", { "value": Optional[AnyValueType], "type": NotRequired[str], }, total=False) ExecutionResultObject = Union[ExecutionResultObjectRef, ExecutionResultObjectValue] ExecutionResultArray = List[ExecutionResultObject] ExecutionResultValue = Union[ExecutionResultObject, ExecutionResultArray] ExecutionResults = Dict[str, ExecutionResultValue] # reference employed as 'JobMonitorReference' by 'WPS1Process' JobExecution = TypedDict("JobExecution", {"execution": WPSExecution}) # quoting QuoteProcessParameters = TypedDict("QuoteProcessParameters", { "inputs": JobInputs, "outputs": JobOutputs, }) # job execution statistics ApplicationStatistics = TypedDict("ApplicationStatistics", { "usedMemory": str, "usedMemoryBytes": int, }, total=True) ProcessStatistics = TypedDict("ProcessStatistics", { "rss": str, "rssBytes": int, "uss": str, "ussBytes": int, "vms": str, "vmsBytes": int, "usedThreads": int, "usedCPU": int, "usedHandles": int, "usedMemory": str, "usedMemoryBytes": int, "totalSize": str, "totalSizeBytes": int, }, total=False) OutputStatistics = TypedDict("OutputStatistics", { "size": str, "sizeBytes": int, }, total=True) Statistics = TypedDict("Statistics", { "application": NotRequired[ApplicationStatistics], "process": NotRequired[ProcessStatistics], "outputs": Dict[str, OutputStatistics], }, total=False) CeleryResult = Union[AsyncResult, EagerResult, GroupResult, ResultSet] # simple/partial definitions of OpenAPI schema _OpenAPISchema: TypeAlias = "OpenAPISchema" _OpenAPISchemaProperty: TypeAlias = "OpenAPISchemaProperty" OpenAPISchemaTypes = Literal["object", "array", "boolean", "integer", "number", "string"] OpenAPISchemaReference = TypedDict("OpenAPISchemaReference", { "$ref": _OpenAPISchema }, total=True) OpenAPISchemaMetadata = TypedDict("OpenAPISchemaMetadata", { "$id": NotRequired[str], # reference to external '$ref' after local resolution for tracking "$schema": NotRequired[str], # how to parse schema (usually: 'https://json-schema.org/draft/2020-12/schema') "@context": NotRequired[str], # extra details or JSON-LD references }, total=False) OpenAPISchemaProperty = TypedDict("OpenAPISchemaProperty", { "type": OpenAPISchemaTypes, "format": NotRequired[str], "default": NotRequired[Any], "example": NotRequired[Any], "title": NotRequired[str], "description": NotRequired[str], "enum": NotRequired[List[Union[str, Number]]], "items": NotRequired[List[_OpenAPISchema, OpenAPISchemaReference]], "required": NotRequired[List[str]], "nullable": NotRequired[bool], "deprecated": NotRequired[bool], "readOnly": NotRequired[bool], "writeOnly": NotRequired[bool], "multipleOf": NotRequired[Number], "minimum": NotRequired[Number], "maximum": NotRequired[Number], "exclusiveMinimum": NotRequired[bool], "exclusiveMaximum": NotRequired[bool], "minLength": NotRequired[Number], "maxLength": NotRequired[Number], "pattern": NotRequired[str], "minItems": NotRequired[Number], "maxItems": NotRequired[Number], "uniqueItems": NotRequired[bool], "minProperties": NotRequired[Number], "maxProperties": NotRequired[Number], "contentMediaType": NotRequired[str], "contentEncoding": NotRequired[str], "contentSchema": NotRequired[str], "properties": NotRequired[Dict[str, _OpenAPISchemaProperty]], "additionalProperties": NotRequired[Union[bool, Dict[str, Union[_OpenAPISchema, OpenAPISchemaReference]]]], }, total=False) OpenAPISchemaObject = TypedDict("OpenAPISchemaObject", { "type": Literal["object"], "properties": Dict[str, OpenAPISchemaProperty], }, total=False) OpenAPISchemaArray = TypedDict("OpenAPISchemaArray", { "type": Literal["array"], "items": _OpenAPISchema, }, total=False) OpenAPISchemaAllOf = TypedDict("OpenAPISchemaAllOf", { "allOf": List[Union[_OpenAPISchema, OpenAPISchemaReference]], }, total=False) OpenAPISchemaAnyOf = TypedDict("OpenAPISchemaAnyOf", { "anyOf": List[Union[_OpenAPISchema, OpenAPISchemaReference]], }, total=False) OpenAPISchemaOneOf = TypedDict("OpenAPISchemaOneOf", { "oneOf": List[Union[_OpenAPISchema, OpenAPISchemaReference]], }, total=False) OpenAPISchemaNot = TypedDict("OpenAPISchemaNot", { "not": Union[_OpenAPISchema, OpenAPISchemaReference], }, total=False) OpenAPISchemaKeyword = Union[ OpenAPISchemaAllOf, OpenAPISchemaAnyOf, OpenAPISchemaOneOf, OpenAPISchemaNot, ] OpenAPISchema = Union[ OpenAPISchemaObject, OpenAPISchemaArray, OpenAPISchemaKeyword, OpenAPISchemaProperty, OpenAPISchemaReference, OpenAPISchemaMetadata, ]