"""
Definitions of types used by tokens (``weaver.datatype``).
"""
import abc
import base64
import copy
import enum
import inspect
import json
import re
import traceback
import uuid
import warnings
from datetime import datetime, timedelta
from io import BytesIO
from logging import ERROR, INFO, Logger, getLevelName, getLogger
from secrets import compare_digest, token_hex
# NOTE(review): typing names are imported at runtime (not only under TYPE_CHECKING) because the
#   module-level aliases 'AnyParams'/'AnyAuthentication' below evaluate them at import time.
from typing import TYPE_CHECKING, Any, Callable, Dict, IO, List, Optional, Union
from urllib.parse import urljoin, urlparse

import colander
import pyramid.httpexceptions
import requests.exceptions
from cryptography.fernet import Fernet
from dateutil.parser import parse as dt_parse
from docker.auth import decode_auth
from owslib.util import ServiceException as OWSServiceException
from owslib.wps import Process as ProcessOWS, WPSException
from pywps import Process as ProcessWPS

from weaver import xml_util
from weaver.exceptions import ProcessInstanceError, ServiceParsingError
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import AcceptLanguage, ContentType, repr_json
from weaver.processes.constants import ProcessSchema
from weaver.processes.convert import get_field, json2oas_io, normalize_ordered_io, null, ows2json, wps2json_io
from weaver.processes.types import ProcessType
from weaver.quotation.status import QuoteStatus
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.utils import localize_datetime  # for backward compatibility of previously saved jobs not time-locale-aware
from weaver.utils import (  # NOTE(review): name list reconstructed from usage in this module — confirm against upstream
    fully_qualified_name,
    get_job_log_msg,
    get_log_date_fmt,
    get_log_fmt,
    get_settings,
    now,
    request_extra,
)
from weaver.visibility import Visibility
from weaver.warning import NonBreakingExceptionWarning
from weaver.wps.utils import get_wps_client, get_wps_url
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
    from owslib.wps import WebProcessingService

    from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode
    from weaver.processes.constants import ProcessSchemaType
    from weaver.processes.types import AnyProcessType
    from weaver.quotation.status import AnyQuoteStatus
    from weaver.status import AnyStatusType, StatusType
    from weaver.typedefs import (  # NOTE(review): name list reconstructed from usage — confirm against upstream
        AnyLogLevel,
        AnySettingsContainer,
        AnyUUID,
        ExecutionInputs,
        ExecutionOutputs,
        JSON,
        Number,
    )
    from weaver.visibility import AnyVisibility

# Type aliases for convenience (HTML-export residue removed, reformatted).
AnyParams = Dict[str, Any]
AnyAuthentication = Union["Authentication", "DockerAuthentication"]

LOGGER = getLogger(__name__)
class DictBase(dict):
    """
    Dictionary with extended attributes auto-``getter``/``setter`` for convenience.

    Explicitly overridden ``getter``/``setter`` attributes are called instead of ``dict``-key
    ``get``/``set``-item to ensure corresponding checks and/or value adjustments are executed
    before applying it to the sub-``dict``.
    """

    def __setattr__(self, item, value):
        """
        Uses an existing property setter if defined in the subclass or employs the default
        dictionary setter otherwise.
        """
        # NOTE(review): 'getattr' without default raises AttributeError for names that are not
        #   class-level attributes/properties — subclasses rely on declared properties. Preserved.
        prop = getattr(type(self), item)
        if isinstance(prop, property) and prop.fset is not None:
            prop.fset(self, value)  # noqa
        else:
            super(DictBase, self).__setitem__(item, value)

    def __getitem__(self, item):
        """
        Uses an existing property getter if defined in the subclass or employs the default
        dictionary getter otherwise.
        """
        prop = getattr(type(self), item)
        if isinstance(prop, property) and prop.fget is not None:
            return prop.fget(self)  # noqa
        elif item in self:
            return getattr(self, item, None)
        else:
            raise AttributeError(f"Can't get attribute '{item}' in '{fully_qualified_name(self)}'.")

    def __str__(self):
        # type: () -> str
        return type(self).__name__

    def __repr__(self):
        # type: () -> str
        _type = fully_qualified_name(self)
        _repr = dict.__repr__(self)
        return f"{_type} ({_repr})"

    def dict(self):
        # type: () -> AnyParams
        """
        Generate a dictionary representation of the object, but with inplace resolution of attributes as applicable.
        """
        # update any entries by key with their attribute
        # (unused 'val' loop target removed: iterate keys directly)
        _dict = {key: getattr(self, key, dict.__getitem__(self, key)) for key in self}
        # then, ensure any missing key gets added if a getter property exists for it
        props = {prop[0] for prop in inspect.getmembers(self) if not prop[0].startswith("_") and prop[0] not in _dict}
        for key in props:
            prop = getattr(type(self), key)
            if isinstance(prop, property) and prop.fget is not None:
                _dict[key] = prop.fget(self)  # noqa
        return _dict
[docs]class AutoBase(DictBase): """ Base that automatically converts literal class members to properties also accessible by dictionary keys. .. code-block:: python class Data(AutoBase): field = 1 other = None d = Data() d.other # returns None d.other = 2 # other is modified d.other # returns 2 dict(d) # returns {'field': 1, 'other': 2} d.field # returns 1 d["field"] # also 1 ! """ def __new__(cls, *args, **kwargs): extra_props = set(dir(cls)) - set(dir(DictBase)) auto_cls = DictBase.__new__(cls, *args, **kwargs) for prop in extra_props: prop_func = property( lambda self, key: dict.__getitem__(self, key), lambda self, key, value: dict.__setattr__(self, key, value) ) default = getattr(auto_cls, prop, None) setattr(auto_cls, prop, prop_func) AutoBase.__setattr__(auto_cls, prop, default) return auto_cls def __getitem__(self, item): return dict.__getitem__(self, item) def __setattr__(self, key, value): # set both as object and dict reference DictBase.__setattr__(self, key, value) dict.__setattr__(self, key, value)
[docs]class Base(DictBase): """ Base interface for all data-types. """ def __str__(self): # type: () -> str return f"{type(self).__name__} <{}>" @property def __name__(self): # type: () -> str return fully_qualified_name(self) @property
[docs] def id(self): raise NotImplementedError()
[docs] def uuid(self): # type: () -> uuid.UUID return
[docs] def json(self): # type: () -> JSON """ Obtain the JSON data representation for response body. .. note:: This method implementation should validate the JSON schema against the API definition whenever applicable to ensure integrity between the represented data type and the expected API response. """ raise NotImplementedError("Method 'json' must be defined for JSON request item representation.")
[docs] def params(self): # type: () -> AnyParams """ Obtain the internal data representation for storage. .. note:: This method implementation should provide a JSON-serializable definition of all fields representing the object to store. """ raise NotImplementedError("Method 'params' must be defined for storage item representation.")
[docs]class LocalizedDateTimeProperty(property): """ Property that ensures date-time localization is applied on the stored/retrieved value as required. """ def __init__(self, fget=None, # type: Callable[[Any], Optional[datetime]] fset=None, # type: Callable[[Any, Union[datetime, str]], None] fdel=None, # type: Callable[[Any], None] doc=None, # type: str default_now=False, # type: bool ): # type: (...) -> None self.default_now = default_now fget = fget or self.__get__ fset = fset or self.__set__ super(LocalizedDateTimeProperty, self).__init__(fget=fget, fset=fset, fdel=fdel, doc=doc) def __set_name__(self, owner, name): # type: (Any, str) -> None = name # pylint: disable=W0201 def __get__(self, instance, *_): # type: (Any, Optional[Any]) -> Optional[datetime] if instance is None: # allow access to the descriptor as class attribute 'getattr(type(instance), property-name)' return self # noqa dt = instance.get(, None) if not dt: if self.default_now: cur_dt = now() self.__set__(instance, cur_dt) return cur_dt return None return localize_datetime(dt) def __set__(self, instance, value): # type: (Any, Union[datetime, str]) -> None if isinstance(str, datetime): value = dt_parse(value) if not isinstance(value, datetime): name = fully_qualified_name(instance) raise TypeError(f"Type 'datetime' is required for '{name}.{}'") instance[] = localize_datetime(value)
[docs]class Service(Base): """ Dictionary that contains OWS services. It always has ``url`` key. """ def __init__(self, *args, **kwargs): # type: (*Any, **Any) -> None super(Service, self).__init__(*args, **kwargs) if "name" not in self: raise TypeError("Service 'name' is required") if "url" not in self: raise TypeError("Service 'url' is required") self["_wps"] = None @property
[docs] def id(self): return
[docs] def url(self): """ Service URL. """ return dict.__getitem__(self, "url")
[docs] def name(self): """ Service name. """ return dict.__getitem__(self, "name")
[docs] def type(self): """ Service type. """ return self.get("type", ProcessType.WPS_REMOTE)
[docs] def public(self): """ Flag if service has public access. """ # TODO: public access can be set via auth parameter. return self.get("public", False)
[docs] def auth(self): """ Authentication method: public, token, cert. """ return self.get("auth", "token")
[docs] def json(self): # type: () -> JSON # TODO: apply swagger type deserialize schema check if returned in a response return self.params()
[docs] def params(self): # type: () -> AnyParams return { "url": self.url, "name":, "type": self.type, "public": self.public, "auth": self.auth
[docs] def wps(self, container=None, **kwargs): # type: (AnySettingsContainer, **Any) -> WebProcessingService """ Obtain the remote WPS service definition and metadata. Stores the reference locally to avoid re-fetching it needlessly for future reference. """ try: _wps = self.get("_wps") if _wps is None: # client retrieval could also be cached if recently fetched an not yet invalidated self["_wps"] = _wps = get_wps_client(self.url, container=container, **kwargs) return _wps except (OWSServiceException, xml_util.ParseError) as exc: msg = f"Invalid XML returned by WPS [{}] at [{self.url}] cannot be parsed." raise ServiceParsingError(json={"description": msg, "cause": str(exc), "error": exc.__class__.__name__})
[docs] def metadata(self, container): # type: (AnySettingsContainer) -> List[Metadata] """ Obtains the metadata relevant to the service provider. """ wps = self.wps(container=container) wps_lang = wps.language # FIXME: add more metadata retrieved from 'wps.identification' and '' (?) # if so, should be included only in "long description", while "summary" only returns below info meta = [ { "type": "provider-name", "title": "Provider Name", "role": "", "value":, "lang": wps_lang }, { "type": "provider-site", "title": "Provider Name", "role": "", "value": wps.provider.url, "lang": wps_lang }, { "type": "contact-name", "title": "Contact Name", "role": "", "value":, "lang": wps_lang } ] return meta
[docs] def keywords(self, container=None): # type: (AnySettingsContainer) -> List[str] """ Obtains the keywords relevant to the service provider. """ wps = self.wps(container=container) return wps.identification.keywords
[docs] def summary(self, container, fetch=True, ignore=False): # type: (AnySettingsContainer, bool, bool) -> Optional[JSON] """ Obtain the summary information from the provider service. When metadata fetching is disabled, the generated summary will contain only information available locally. :param container: employed to retrieve application settings. :param fetch: indicates whether metadata should be fetched from remote. :param ignore: indicates if failing metadata retrieval/parsing should be silently discarded or raised. :return: generated summary information. """ try: # FIXME: not implemented ( if not ProcessType.is_wps(self.type): return None # basic information always available (local) data = { "id":, "url": self.url, # remote URL (bw-compat, also in links) "type": ProcessType.WPS_REMOTE, "public": self.public, "links": self.links(container, fetch=fetch), } # retrieve more metadata from remote if possible and requested if fetch: wps = self.wps(container) data.update({ "title": getattr(wps.identification, "title", None), "description": getattr(wps.identification, "abstract", None), "keywords": self.keywords(container), "metadata": self.metadata(container), }) return sd.ProviderSummarySchema().deserialize(data) except colander.Invalid as exc: LOGGER.error("Failed schema validation on otherwise valid parsing of provider definition.", exc_info=exc) raise # invalid schema on our side, don't ignore it except Exception as exc: msg = f"Exception occurred while fetching or parsing WPS [{}] at [{self.url}]" err_msg = f"{msg}: {exc!r}" LOGGER.debug(err_msg, exc_info=exc) if ignore: warnings.warn(err_msg, NonBreakingExceptionWarning) return None if isinstance(exc, ServiceParsingError): raise raise ServiceParsingError(json={"description": msg, "cause": str(exc), "error": fully_qualified_name(exc)})
[docs] def processes(self, container, ignore=False): # type: (AnySettingsContainer, bool) -> Optional[List[Process]] """ Obtains a list of remote service processes in a compatible :class:`weaver.datatype.Process` format. .. note:: Remote processes won't be stored to the local process storage. :param container: Employed to retrieve application settings. :param ignore: Indicates if failing service retrieval/parsing should be silently discarded or raised. :raises ServiceParsingError: If parsing failed and was NOT requested to be ignored. :return: If parsing was successful, list of converted remote service processes. If parsing failed and was requested to be ignored, returns ``None`` to distinguish from empty process list. """ # FIXME: support other providers ( if not ProcessType.is_wps(self.type): return [] try: wps = self.wps(container) except ServiceParsingError as exc: err_msg = repr(exc) LOGGER.debug(err_msg, exc_info=exc) if ignore: warnings.warn(err_msg, NonBreakingExceptionWarning) return None raise settings = get_settings(container) return [Process.convert(process, self, settings) for process in wps.processes]
[docs] def check_accessible(self, settings, ignore=True): # type: (AnySettingsContainer, bool) -> bool """ Verify if the service URL is accessible. """ try: # some WPS don't like HEAD request, so revert to normal GetCapabilities # otherwise use HEAD because it is faster to only 'ping' the service if ProcessType.is_wps(self.type): meth = "GET" url = f"{self.url}?service=WPS&request=GetCapabilities" else: meth = "HEAD" url = self.url # - allow 500 for services that incorrectly handle invalid request params, but at least respond # (should be acceptable in this case because the 'ping' request is not necessarily well formed) # - allow 400/405 for bad request/method directly reported by the service for the same reasons # - enforce quick timeout (but don't allow 408 code) to avoid long pending connexions that never resolve allowed_codes = [200, 400, 405, 500] resp = request_extra(meth, url, timeout=2, settings=settings, allowed_codes=allowed_codes) return resp.status_code in allowed_codes except (requests.exceptions.RequestException, pyramid.httpexceptions.HTTPException) as exc: msg = f"Exception occurred while checking service [{}] accessibility at [{self.url}]" warnings.warn(f"{msg}: {exc!r}", NonBreakingExceptionWarning) if not ignore: raise ServiceParsingError(json={ "description": msg, "cause": "Cannot validate or parse service metadata since it is not accessible.", "error": exc.__class__.__name__ }) return False
[docs]class Job(Base): """ Dictionary that contains OWS service jobs. It always has ``id`` and ``task_id`` keys. """ def __init__(self, *args, **kwargs): # type: (*Any, **Any) -> None super(Job, self).__init__(*args, **kwargs) if "task_id" not in self: raise TypeError(f"Parameter 'task_id' is required for '{self.__name__}' creation.") if not isinstance(, (str, uuid.UUID)): raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.id'")
[docs] def _get_log_msg(self, msg=None, status=None, progress=None): # type: (Optional[str], Optional[AnyStatusType], Optional[Number]) -> str if not msg: msg = self.status_message status = map_status(status or self.status) progress = max(0, min(100, progress or self.progress)) return get_job_log_msg(duration=self.duration_str, progress=progress, status=status, message=msg)
[docs] def _get_err_msg(error): # type: (WPSException) -> str return f"{error.text} - code={error.code} - locator={error.locator}"
[docs] def save_log(self, errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]] logger=None, # type: Optional[Logger] message=None, # type: Optional[str] level=INFO, # type: AnyLogLevel status=None, # type: Optional[AnyStatusType] progress=None, # type: Optional[Number] ): # type: (...) -> None """ Logs the specified error and/or message, and adds the log entry to the complete job log. For each new log entry, additional :class:`Job` properties are added according to :meth:`Job._get_log_msg` and the format defined by :func:`get_job_log_msg`. :param errors: An error message or a list of WPS exceptions from which to log and save generated message stack. :param logger: An additional :class:`Logger` for which to propagate logged messages on top saving them to the job. :param message: Explicit string to be logged, otherwise use the current :py:attr:`Job.status_message` is used. :param level: Logging level to apply to the logged ``message``. This parameter is ignored if ``errors`` are logged. :param status: Override status applied in the logged message entry, but does not set it to the job object. Uses the current :attr:`Job.status` value if not specified. Must be one of :mod:`Weaver.status` values. :param progress: Override progress applied in the logged message entry, but does not set it to the job object. Uses the current :attr:`Job.progress` value if not specified. .. note:: The job object is updated with the log but still requires to be pushed to database to actually persist it. 
""" if isinstance(errors, WPSException): errors = [errors] elif isinstance(errors, Exception): errors = str(errors) if isinstance(errors, str): log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress))] self.exceptions.append(errors) elif isinstance(errors, list): log_msg = [ (ERROR, self._get_log_msg(self._get_err_msg(error), status=status, progress=progress)) for error in errors ] self.exceptions.extend([{ "Code": error.code, "Locator": error.locator, "Text": error.text } for error in errors]) else: log_msg = [(level, self._get_log_msg(message, status=status, progress=progress))] for lvl, msg in log_msg: fmt_msg = get_log_fmt() % dict(asctime=now().strftime(get_log_date_fmt()), levelname=getLevelName(lvl), name=self.__name__, message=msg) if len(self.logs) == 0 or self.logs[-1] != fmt_msg: self.logs.append(fmt_msg) if logger: logger.log(lvl, msg)
[docs] def id(self): # type: () -> uuid.UUID """ Job UUID to retrieve the details from storage. """ job_id = self.get("id") if not job_id: job_id = uuid.uuid4() self["id"] = job_id if isinstance(job_id, str): return uuid.UUID(job_id) return job_id
[docs] def task_id(self): # type: () -> Optional[AnyUUID] """ Reference Task UUID attributed by the ``Celery`` worker that monitors and executes this job. """ task_id = self.get("task_id", None) try: # task ID can be a temporary non-UUID value if isinstance(task_id, str): return uuid.UUID(task_id) except ValueError: pass return task_id
@task_id.setter def task_id(self, task_id): # type: (AnyUUID) -> None if not isinstance(task_id, (str, uuid.UUID)): raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.task_id'") self["task_id"] = task_id @property
[docs] def wps_id(self): # type: () -> Optional[uuid.UUID] """ Reference WPS Request/Response UUID attributed by the executed ``PyWPS`` process. This UUID matches the status-location, log and output directory of the WPS process. This parameter is only available when the process is executed on this local instance. .. seealso:: - :attr:`Job.request` - :attr:`Job.response` """ wps_id = self.get("wps_id", None) if isinstance(wps_id, str): return uuid.UUID(wps_id) return wps_id
@wps_id.setter def wps_id(self, wps_id): # type: (AnyUUID) -> None if not isinstance(wps_id, (str, uuid.UUID)): raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.wps_id'") self["wps_id"] = wps_id @property
[docs] def service(self): # type: () -> Optional[str] """ Service identifier of the corresponding remote process. .. seealso:: - :attr:`` """ return self.get("service", None)
@service.setter def service(self, service): # type: (Optional[str]) -> None if not isinstance(service, str) or service is None: raise TypeError(f"Type 'str' is required for '{self.__name__}.service'") self["service"] = service @property
[docs] def process(self): # type: () -> Optional[str] """ Process identifier of the corresponding remote process. .. seealso:: - :attr:`` """ return self.get("process", None)
@process.setter def process(self, process): # type: (Optional[str]) -> None if not isinstance(process, str) or process is None: raise TypeError(f"Type 'str' is required for '{self.__name__}.process'") self["process"] = process @property
[docs] def type(self): # type: () -> str """ Obtain the type of the element associated to the creation of this job. .. seealso:: - Defined in - Queried with (Parameter Type section). """ if self.service is None: return "process" return "provider"
[docs] def _get_inputs(self): # type: () -> Optional[ExecutionInputs] if self.get("inputs") is None: return {} return dict.__getitem__(self, "inputs")
[docs] def _set_inputs(self, inputs): # type: (Optional[ExecutionInputs]) -> None self["inputs"] = inputs
# allows to correctly update list by ref using 'job.inputs.extend()'
[docs] inputs = property(_get_inputs, _set_inputs, doc="Input values and reference submitted for execution.")
[docs] def _get_outputs(self): # type: () -> Optional[ExecutionOutputs] if self.get("outputs") is None: return {} return dict.__getitem__(self, "outputs")
[docs] def _set_outputs(self, outputs): # type: (Optional[ExecutionOutputs]) -> None self["outputs"] = outputs
[docs] outputs = property(_get_outputs, _set_outputs, doc="Output transmission modes submitted for execution.")
[docs] def user_id(self): # type: () -> Optional[str] return self.get("user_id", None)
@user_id.setter def user_id(self, user_id): # type: (Optional[str]) -> None if not isinstance(user_id, int) or user_id is None: raise TypeError(f"Type 'int' is required for '{self.__name__}.user_id'") self["user_id"] = user_id @property
[docs] def status(self): # type: () -> Status return Status.get(self.get("status"), Status.UNKNOWN)
@status.setter def status(self, status): # type: (StatusType) -> None value = Status.get(status) if value == Status.ACCEPTED and self.status == Status.RUNNING: LOGGER.debug(traceback.extract_stack()) if value not in Status: statuses = list(Status.values()) name = self.__name__ raise ValueError(f"Status '{status}' is not valid for '{name}.status', must be one of {statuses!s}'") self["status"] = value @property
[docs] def status_message(self): # type: () -> str return self.get("status_message", "no message")
@status_message.setter def status_message(self, message): # type: (Optional[str]) -> None if message is None: return if not isinstance(message, str): raise TypeError(f"Type 'str' is required for '{self.__name__}.status_message'") self["status_message"] = message @property
[docs] def status_location(self): # type: () -> Optional[str] return self.get("status_location", None)
@status_location.setter def status_location(self, location_url): # type: (Optional[str]) -> None if not isinstance(location_url, str) or location_url is None: raise TypeError(f"Type 'str' is required for '{self.__name__}.status_location'") self["status_location"] = location_url
[docs] def status_url(self, container=None): # type: (Optional[AnySettingsContainer]) -> str """ Obtain the resolved endpoint where the :term:`Job` status information can be obtained. """ settings = get_settings(container) location_base = f"/providers/{self.service}" if self.service else "" api_base_url = get_wps_restapi_base_url(settings) location_url = f"{api_base_url}{location_base}/processes/{self.process}/jobs/{}" return location_url
[docs] def notification_email(self): # type: () -> Optional[str] return self.get("notification_email")
@notification_email.setter def notification_email(self, email): # type: (Optional[Union[str]]) -> None if not isinstance(email, str): raise TypeError(f"Type 'str' is required for '{self.__name__}.notification_email'") self["notification_email"] = email @property
[docs] def accept_language(self): # type: () -> Optional[str] return self.get("accept_language")
@accept_language.setter def accept_language(self, language): # type: (Optional[Union[str]]) -> None if not isinstance(language, str): raise TypeError(f"Type 'str' is required for '{self.__name__}.accept_language'") self["accept_language"] = language @property
[docs] def execute_async(self): # type: () -> bool return self.execution_mode == ExecuteMode.ASYNC
[docs] def execute_sync(self): # type: () -> bool return self.execution_mode == ExecuteMode.SYNC
[docs] def execution_mode(self): # type: () -> AnyExecuteMode return ExecuteMode.get(self.get("execution_mode"), ExecuteMode.ASYNC)
@execution_mode.setter def execution_mode(self, mode): # type: (Union[AnyExecuteMode, str]) -> None exec_mode = ExecuteMode.get(mode) if exec_mode not in ExecuteMode: modes = list(ExecuteMode.values()) raise ValueError(f"Invalid value for '{self.__name__}.execution_mode'. Must be one of {modes}") self["execution_mode"] = mode @property
[docs] def execution_response(self): # type: () -> AnyExecuteResponse out = self.setdefault("execution_response", ExecuteResponse.DOCUMENT) if out not in ExecuteResponse.values(): out = ExecuteResponse.DOCUMENT self["execution_response"] = out return out
@execution_response.setter def execution_response(self, response): # type: (Optional[Union[AnyExecuteResponse, str]]) -> None if response is None: exec_resp = ExecuteResponse.DOCUMENT else: exec_resp = ExecuteResponse.get(response) if exec_resp not in ExecuteResponse: resp = list(ExecuteResponse.values()) raise ValueError(f"Invalid value for '{self.__name__}.execution_response'. Must be one of {resp}") self["execution_response"] = exec_resp @property
[docs] def is_local(self): # type: () -> bool return self.get("is_local", not self.service)
@is_local.setter def is_local(self, is_local): # type: (bool) -> None if not isinstance(is_local, bool): raise TypeError(f"Type 'bool' is required for '{self.__name__}.is_local'") self["is_local"] = is_local @property
[docs] def is_workflow(self): # type: () -> bool return self.get("is_workflow", False)
@is_workflow.setter def is_workflow(self, is_workflow): # type: (bool) -> None if not isinstance(is_workflow, bool): raise TypeError(f"Type 'bool' is required for '{self.__name__}.is_workflow'") self["is_workflow"] = is_workflow @property
[docs] def is_finished(self): # type: () -> bool return self.finished is not None
[docs] def mark_finished(self): # type: () -> None self["finished"] = now()
[docs] def _get_updated(self): # type: () -> datetime updated = self.get("updated") # backward compatibility when not already set if not updated: if self.status == map_status(Status.ACCEPTED): updated = self.created elif self.is_finished: updated = self.finished else: updated = self.started updated = localize_datetime(updated or now()) self.updated = updated # apply to remain static until saved return localize_datetime(updated)
[docs] created = LocalizedDateTimeProperty(default_now=True)
[docs] started = LocalizedDateTimeProperty()
[docs] finished = LocalizedDateTimeProperty()
[docs] updated = LocalizedDateTimeProperty(fget=_get_updated)
[docs] def duration(self): # type: () -> Optional[timedelta] if not self.started: return None final_time = self.finished or now() return localize_datetime(final_time) - localize_datetime(self.started)
[docs] def duration_str(self): # type: () -> str duration = self.duration if duration is None: return "00:00:00" return str(duration).split(".", 1)[0].zfill(8) # "HH:MM:SS"
[docs] def progress(self): # type: () -> Number return self.get("progress", 0)
@progress.setter def progress(self, progress): # type: (Number) -> None if not isinstance(progress, (int, float)): raise TypeError(f"Number is required for '{self.__name__}.progress'") if progress < 0 or progress > 100: raise ValueError(f"Value must be in range [0,100] for '{self.__name__}.progress'") self["progress"] = progress
[docs] def _get_results(self): # type: () -> List[Optional[Dict[str, JSON]]] if self.get("results") is None: self["results"] = [] return dict.__getitem__(self, "results")
[docs] def _set_results(self, results): # type: (List[Optional[Dict[str, JSON]]]) -> None if not isinstance(results, list): raise TypeError(f"Type 'list' is required for '{self.__name__}.results'") self["results"] = results
# allows to correctly update list by ref using 'job.results.extend()'
[docs] results = property(_get_results, _set_results, doc="Output values and references that resulted from execution.")
[docs] def _get_exceptions(self): # type: () -> List[Union[str, Dict[str, str]]] if self.get("exceptions") is None: self["exceptions"] = [] return dict.__getitem__(self, "exceptions")
[docs] def _set_exceptions(self, exceptions): # type: (List[Union[str, Dict[str, str]]]) -> None if not isinstance(exceptions, list): raise TypeError(f"Type 'list' is required for '{self.__name__}.exceptions'") self["exceptions"] = exceptions
# allows to correctly update list by ref using 'job.exceptions.extend()'
[docs] exceptions = property(_get_exceptions, _set_exceptions)
[docs] def _get_logs(self): # type: () -> List[Dict[str, str]] if self.get("logs") is None: self["logs"] = [] return dict.__getitem__(self, "logs")
[docs] def _set_logs(self, logs): # type: (List[Dict[str, str]]) -> None if not isinstance(logs, list): raise TypeError(f"Type 'list' is required for '{self.__name__}.logs'") self["logs"] = logs
# allows to correctly update list by ref using 'job.logs.extend()'
[docs] logs = property(_get_logs, _set_logs)
[docs] def _get_tags(self): # type: () -> List[Optional[str]] if self.get("tags") is None: self["tags"] = [] return dict.__getitem__(self, "tags")
[docs] def _set_tags(self, tags): # type: (List[Optional[str]]) -> None if not isinstance(tags, list): raise TypeError(f"Type 'list' is required for '{self.__name__}.tags'") self["tags"] = tags
# allows to correctly update list by ref using 'job.tags.extend()'
[docs] tags = property(_get_tags, _set_tags)
[docs] def access(self): # type: () -> Visibility """ Job visibility access from execution. """ return Visibility.get(self.get("access"), Visibility.PRIVATE)
@access.setter def access(self, visibility): # type: (str) -> None """ Job visibility access from execution. """ vis = Visibility.get(visibility) if visibility not in Visibility.values(): raise ValueError(f"Invalid 'visibility' value '{visibility!s}' specified for '{self.__name__}.access'") self["access"] = vis @property
[docs] def context(self): # type: () -> Optional[str] """ Job outputs context. """ return self.get("context") or None
@context.setter def context(self, context): # type: (Optional[str]) -> None """ Job outputs context. """ if not (isinstance(context, str) or context is None): raise TypeError(f"Type 'str' or 'None' is required for '{self.__name__}.context'") self["context"] = context @property
[docs] def request(self): # type: () -> Optional[str] """ XML request for WPS execution submission as string (binary). """ return self.get("request", None)
@request.setter def request(self, request): # type: (Optional[str]) -> None """ XML request for WPS execution submission as string (binary). """ if isinstance(request, xml_util.XML): request = xml_util.tostring(request) self["request"] = request @property
@property
def response(self):
    # type: () -> Optional[str]
    """
    XML status response from WPS execution submission as string (binary).
    """
    return self.get("response", None)

@response.setter
def response(self, response):
    # type: (Optional[str]) -> None
    """
    XML status response from WPS execution submission as string (binary).
    """
    # serialize XML nodes so the value can be stored as plain text
    if isinstance(response, xml_util.XML):
        response = xml_util.tostring(response)
    self["response"] = response
[docs] def _job_url(self, base_url=None): # type: (Optional[str]) -> str if self.service is not None: base_url += sd.provider_service.path.format(provider_id=self.service) job_path = sd.process_job_service.path.format(process_id=self.process, return base_url + job_path
[docs] def json(self, container=None, self_link=None): # pylint: disable=W0221,arguments-differ # type: (Optional[AnySettingsContainer], Optional[str]) -> JSON """ Obtains the JSON data representation for response body. .. note:: Settings are required to update API shortcut URLs to job additional information. Without them, paths will not include the API host, which will not resolve to full URI. """ settings = get_settings(container) if container else {} job_json = { "jobID":, "processID": self.process, "providerID": self.service, "type": self.type, "status": map_status(self.status), "message": self.status_message, "created": self.created, "started": self.started, "finished": self.finished, "updated": self.updated, "duration": self.duration_str, "runningDuration": self.duration, "runningSeconds": self.duration.total_seconds() if self.duration is not None else None, # TODO: available fields not yet employed ( "nextPoll": None, "expirationDate": None, "estimatedCompletion": None, "percentCompleted": self.progress, # new name as per OGC-API, enforced integer # "progress": int(self.progress), "links": self.links(settings, self_link=self_link) } return sd.JobStatusInfo().deserialize(job_json)
[docs] def params(self): # type: () -> AnyParams return { "id":, "task_id": self.task_id, "wps_id": self.wps_id, "service": self.service, "process": self.process, "inputs": self.inputs, "outputs": self.outputs, "user_id": self.user_id, "status": self.status, "status_message": self.status_message, "status_location": self.status_location, "execution_response": self.execution_response, "execution_mode": self.execution_mode, "is_workflow": self.is_workflow, "created": self.created, "started": self.started, "finished": self.finished, "updated": self.updated, "progress": self.progress, "results": self.results, "exceptions": self.exceptions, "logs": self.logs, "tags": self.tags, "access": self.access, "context": self.context, "request": self.request, "response": self.response, "notification_email": self.notification_email, "accept_language": self.accept_language,
class AuthenticationTypes(enum.Enum):
    """Supported authentication methods for protected process resources."""
    DOCKER = "docker"
    VAULT = "vault"
class Authentication(Base):
    """
    Authentication details to store details required for process operations.
    """
    def __init__(self, auth_scheme, auth_token, auth_link, **kwargs):
        # type: (str, str, Optional[str], **Any) -> None
        super(Authentication, self).__init__(**kwargs)
        # ensure values are provided and of valid format
        self.scheme = auth_scheme
        if auth_link:
            self.link = auth_link
        self.token = auth_token
        self.setdefault("id", uuid.uuid4())

    @property
    @abc.abstractmethod
    def type(self):
        # type: () -> AuthenticationTypes
        """Authentication type, enforced by each specialized class."""
        raise NotImplementedError

    @property
    def id(self):
        # type: () -> uuid.UUID
        """Unique identifier, parsed back to :class:`uuid.UUID` when reloaded as string."""
        _id = dict.__getitem__(self, "id")
        if isinstance(_id, str):
            return uuid.UUID(_id)
        return _id

    @property
    def link(self):
        # type: () -> Optional[str]
        # NOTE(review): this getter was lost in the extracted source
        # (only '@link.setter' remains); reconstructed — confirm upstream.
        return self.get("link", None)

    @link.setter
    def link(self, link):
        # type: (str) -> None
        if not isinstance(link, str):
            raise TypeError(f"Type 'str' is required for '{self.__name__}.url', not '{type(link)}'.")
        self["link"] = link

    @property
    def token(self):
        # type: () -> str
        return dict.__getitem__(self, "token")

    @token.setter
    def token(self, token):
        # type: (str) -> None
        if not isinstance(token, str):
            raise TypeError(f"Type 'str' is required for '{self.__name__}.token', not '{type(token)}'.")
        self["token"] = token

    @property
    def scheme(self):
        # type: () -> str
        return dict.__getitem__(self, "scheme")

    @scheme.setter
    def scheme(self, scheme):
        # type: (str) -> None
        if not isinstance(scheme, str):
            raise TypeError(f"Type 'str' is required for '{self.__name__}.scheme', not '{type(scheme)}'.")
        self["scheme"] = scheme

    def json(self):
        # type: () -> None
        return None  # in case it bubbles up by error, never provide it as JSON

    def params(self):
        # type: () -> AnyParams
        return {
            "id": self.id,
            "type": self.type.value,
            "link": self.link,
            "token": self.token,
            "scheme": self.scheme
        }

    @classmethod
    def from_params(cls, **params):
        # type: (**Any) -> AnyAuthentication
        """
        Obtains the specialized :class:`Authentication` using loaded parameters from :meth:`params`.
        """
        # duplicate keys under the 'auth_'-prefixed constructor parameter names
        for param in list(params):
            if not param.startswith("auth_"):
                params[f"auth_{param}"] = params[param]
        auth_type = params.pop("auth_type", None)
        params.pop("type", None)  # remove type that must be enforced by specialized class property
        auth_cls = list(filter(lambda auth: auth_type == auth.type.value, [DockerAuthentication, VaultFile]))
        if not auth_cls:
            raise TypeError(f"Unknown authentication type: {auth_type!s}")
        auth_obj = auth_cls[0](**params)
        # drop any leftover keys not part of the specialized params definition
        keys = list(auth_obj.params())
        for key in list(auth_obj):
            if key not in keys:
                del auth_obj[key]
        return auth_obj
class DockerAuthentication(Authentication):
    """
    Authentication associated to a :term:`Docker` image to retrieve from a private registry given by the reference link.

    .. seealso::
        :ref:`app_pkg_docker`
    """
    # note:
    #   Below regex does not match *every* possible name, but rather ones that need authentication.
    #   Public DockerHub images for example do not require authentication, and are therefore not matched.
    # NOTE(review): the class constants 'DOCKER_LINK_REGEX', 'DOCKER_REGISTRY_DEFAULT_DOMAIN' and
    #   'DOCKER_REGISTRY_DEFAULT_URI' referenced by the methods of this class were lost at this
    #   location in the extracted source — restore them from the upstream definition.

    type = AuthenticationTypes.DOCKER
# NOTE:
#   Specific parameter names are important for reload from database using 'Authentication.from_params'
def __init__(self, auth_scheme, auth_token, auth_link, **kwargs):
    # type: (str, str, str, **Any) -> None
    """
    Initialize the authentication reference for pulling a Docker image from a protected registry.

    :param auth_scheme: Authentication scheme (Basic, Bearer, etc.)
    :param auth_token: Applied token or credentials according to specified scheme.
    :param auth_link: Fully qualified Docker registry image link (``{registry-url}/{image}:{label}``).
    :param kwargs: Additional parameters for loading contents already parsed from database.
    """
    matches = re.match(self.DOCKER_LINK_REGEX, auth_link)
    if not matches:
        raise ValueError(f"Invalid Docker image link does not conform to expected format: [{auth_link}]")
    groups = matches.groupdict()
    LOGGER.debug("Parsed Docker image/registry link:\n%s", json.dumps(groups, indent=2))
    if not groups["image"]:
        raise ValueError(f"Invalid Docker image reference does not conform to image format: {auth_link}")
    # special case for DockerHub, since it is default, it is often omitted, but could be partially provided
    # swap the domain by the full URI in that case because that's what is expected when doing plain 'docker login'
    registry = groups["reg_domain"]
    image = groups["image"]
    if registry in [self.DOCKER_REGISTRY_DEFAULT_DOMAIN, "", None]:
        if not registry:
            LOGGER.debug("No registry specified for Docker image, using default DockerHub registry.")
        # when "URI" fragment was detected but is not a real URI (since 'reg_domain' empty), link is invalid
        # (i.e.: there is no URI repository, so nowhere to send Auth token since not default DockerHub)
        if groups["uri"] not in [self.DOCKER_REGISTRY_DEFAULT_URI, "", None]:
            registry = groups["uri"]
            raise ValueError(f"Invalid registry specifier detected but not a valid URI: [{registry}]")
        registry = self.DOCKER_REGISTRY_DEFAULT_URI
    # otherwise, resolve the possible confusion between nested URI/paths vs nested repository/project
    elif groups["reg_path"]:
        image = groups["reg_path"] + "/" + groups["image"]
    LOGGER.debug("Resolved Docker image/registry from link: [%s, %s]", registry, image)
    self["image"] = image
    self["registry"] = registry
    super(DockerAuthentication, self).__init__(auth_scheme, auth_token, auth_link=auth_link, **kwargs)
[docs] def credentials(self): # type: () -> JSON """ Generates the credentials to submit the login operation based on the authentication token and scheme. """ if self.scheme == "Basic": try: usr, pwd = decode_auth(self.token) # when token is invalid such as wrong encoding or missing ':', error is raised except ValueError: return {} return {"registry": self.registry, "username": usr, "password": pwd} # nosec return {}
@property
def image(self):
    # type: () -> str
    """
    Obtains the image portion of the reference without repository prefix.
    """
    # NOTE(review): '@property' decorators on this and the following accessors were lost in the
    # extracted source; restored, as evidenced by un-called attribute access (e.g. 'self.docker.rsplit').
    return dict.__getitem__(self, "image")

@property
def registry(self):
    # type: () -> str
    """
    Obtains the registry entry that must used for ``docker login {registry}``.
    """
    return dict.__getitem__(self, "registry")

@property
def docker(self):
    # type: () -> str
    """
    Obtains the full reference required when doing :term:`Docker` operations such as ``docker pull <reference>``.
    """
    return self.image if self.registry == self.DOCKER_REGISTRY_DEFAULT_URI else f"{self.registry}/{self.image}"

@property
def repository(self):
    # type: () -> str
    """
    Obtains the full :term:`Docker` repository reference without any tag.
    """
    return self.docker.rsplit(":", 1)[0]

@property
def tag(self):
    # type: () -> Optional[str]
    """
    Obtain the requested tag from the :term:`Docker` reference.
    """
    repo_tag = self.docker.rsplit(":", 1)
    if len(repo_tag) < 2:
        return None
    return repo_tag[-1]
[docs] def params(self): # type: () -> AnyParams params = super(DockerAuthentication, self).params() params.update({"image": self.image, "registry": self.registry}) return params
class VaultFile(Authentication):
    """
    Dictionary that contains :term:`Vault` file and its authentication information.
    """
    type = AuthenticationTypes.VAULT
    # number of random bytes used by 'token_hex' to generate the access token
    # (name shadows the builtin 'bytes'; kept as-is since it is referenced as 'VaultFile.bytes')
    bytes = 32
def __init__(self, file_name="", file_format=None, file_secret=None, auth_token=None, **kwargs): # type: (str, Optional[str], Optional[str], Optional[str], **Any) -> None for key in ["type", "scheme", "link", "token"]: kwargs.pop(f"auth_{key}", None) kwargs.pop(key, None) if not file_name: kwargs.setdefault("name", "") if file_format: kwargs["format"] = file_format elif not kwargs.get("format"): kwargs.pop("format", None) # avoid error setting None from reload if file_secret: kwargs["secret"] = file_secret super(VaultFile, self).__init__( auth_scheme="token", auth_link=None, # don't care auth_token=auth_token or token_hex(VaultFile.bytes), **kwargs ) @classmethod
[docs] def authorized(cls, file, token): # type: (Optional[VaultFile], Optional[str]) -> bool """ Determine whether the file access is authorized. This method should be employed to validate access and reduce impact of timing attack analysis. """ default = VaultFile("") access = file.token if file else default.token return compare_digest(str(access), str(token))
[docs] def encrypt(self, file): # type: (IO[bytes|str]) -> BytesIO """ Encrypt file data using a secret to avoid plain text contents during temporary :term:`Vault` storage. .. note:: This is not intended to be a *strong* security countermeasure as contents can still be decrypted at any time if provided with the right secret. This is only to slightly obfuscate the contents while it transits between storage phases until destruction by the consuming process. """ data = value = data.encode("utf-8") if isinstance(data, str) else data digest = Fernet(self.secret).encrypt(value) return BytesIO(digest)
[docs] def decrypt(self, file): # type: (IO[bytes|str]) -> BytesIO """ Decrypt file contents using secret. """ data = data = data.encode("utf-8") if isinstance(data, str) else data value = Fernet(self.secret).decrypt(data) return BytesIO(value)
@property
def secret(self):
    # type: () -> bytes
    """
    Secret associated to this :term:`Vault` file to hash contents back and forth.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored
    # ('@secret.setter' below and un-called 'self.secret' usage in encrypt/decrypt require it).
    secret = self.get("secret")
    if not secret:
        # lazily generate and persist the key on first use
        secret = self["secret"] = Fernet.generate_key()
    return secret

@secret.setter
def secret(self, secret):
    # type: (Union[bytes, str]) -> None
    if not secret or not isinstance(secret, (bytes, str)):
        raise ValueError(f"Invalid '{self.__name__}.secret' must be bytes or string.")
    if isinstance(secret, str):
        secret = base64.urlsafe_b64encode(secret.encode())
    self["secret"] = secret
[docs] def id(self): # type: () -> uuid.UUID """ Vault file UUID to retrieve the details from storage. """ file_id = self.get("id") if not file_id: file_id = uuid.uuid4() self["id"] = file_id if isinstance(file_id, str): return uuid.UUID(file_id) return file_id
@property
def name(self):
    # type: () -> str
    """
    Name to retrieve the file.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@name.setter' requires it).
    return dict.__getitem__(self, "name")

@name.setter
def name(self, name):
    # type: (str) -> None
    if not isinstance(name, str):
        raise TypeError(f"Type 'str' is required for '{self.__name__}.name'")
    self["name"] = name
@property
def format(self):
    # type: () -> Optional[str]
    """
    Format Media-Type of the file.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@format.setter' requires it).
    return dict.get(self, "format", None)

@format.setter
def format(self, media_type):
    # type: (str) -> None
    if not isinstance(media_type, str):
        raise TypeError(f"Type 'str' is required for '{self.__name__}.format'")
    self["format"] = media_type
[docs] def href(self): # type: () -> str """ Obtain the vault input reference corresponding to the file. This corresponds to the ``href`` value to be provided when submitting an input that should be updated using the vault file of specified UUID and using the respective authorization token in ``X-Auth-Vault`` header. """ return f"vault://{!s}"
[docs] def json(self): # type: () -> JSON body = { "file_id":, "file_href": self.href, "access_token": self.token, } return sd.VaultFileUploadedBodySchema().deserialize(body)
[docs] def params(self): # type: () -> AnyParams return { "id":, "name":, "format": self.format, "type": self.type.value, "token": self.token, "secret": self.secret, "scheme": self.scheme,
[docs]class Process(Base): # pylint: disable=C0103,invalid-name """ Dictionary that contains a process definition for db storage. It always has ``identifier`` (or ``id`` alias) and a ``package`` definition. Parameters can be accessed by key or attribute, and appropriate validators or default values will be applied. """ def __init__(self, *args, **kwargs): # type: (*Any, **Any) -> None super(Process, self).__init__(*args, **kwargs) # use both 'id' and 'identifier' to support any call (WPS and recurrent 'id') if "id" not in self and "identifier" not in self: raise TypeError("'id' OR 'identifier' is required") if "id" not in self: self["id"] = self.pop("identifier") if "package" not in self: raise TypeError("'package' is required") @property
@property
def id(self):
    # type: () -> str
    """Process identifier (alias of :meth:`identifier`)."""
    return dict.__getitem__(self, "id")

@property
def identifier(self):
    # type: () -> str
    """Process identifier (alias of :meth:`id`)."""
    # NOTE(review): return expression truncated in extracted source; reconstructed as 'self.id' — confirm upstream
    return self.id

@identifier.setter
def identifier(self, value):
    # type: (str) -> None
    self["id"] = value
[docs] def title(self): # type: () -> str return self.get("title",
@property
def abstract(self):
    # type: () -> str
    """Process abstract (short description)."""
    # NOTE(review): '@property' decorators on this and the following accessors were lost in the
    # extracted source; restored per un-called attribute usage elsewhere (e.g. 'self.abstract').
    return self.get("abstract", "")

@property
def description(self):
    # type: () -> str
    # OGC-API-Processes v1 field representation
    # bw-compat with existing processes that defined it as abstract
    return self.abstract or self.get("description", "")

@property
def keywords(self):
    # type: () -> List[str]
    """Process keywords, always including the process type."""
    keywords = self.setdefault("keywords", [])
    if self.type not in keywords:
        keywords.append(self.type)
        self["keywords"] = keywords
    return dict.__getitem__(self, "keywords")

@property
def metadata(self):
    # type: () -> List[Metadata]
    return self.get("metadata", [])

@property
def version(self):
    # type: () -> Optional[str]
    return self.get("version")
[docs] def inputs(self): # type: () -> Optional[List[Dict[str, JSON]]] """ Inputs of the process following backward-compatible conversion of stored parameters. According to `OGC-API`, ``maxOccurs`` and ``minOccurs`` representations should be: - ``maxOccurs``: ``int`` or ``"unbounded"`` - ``minOccurs``: ``int`` And, ``mediaType`` should be in description as: - ``mediaType``: ``string`` .. note:: Because of pre-registered/deployed/retrieved remote processes, inputs are formatted in-line to respect valid OGC-API schema representation and apply any required correction transparently. """ inputs = self.get("inputs") if inputs is not None: for _input in inputs: input_formats = get_field(_input, "formats", search_variations=False, default=[]) for fmt in input_formats: mime_type = get_field(fmt, "mime_type", search_variations=True, pop_found=True) if mime_type is not null: fmt["mediaType"] = mime_type input_min = get_field(_input, "min_occurs", search_variations=True, pop_found=True, default=1) input_max = get_field(_input, "max_occurs", search_variations=True, pop_found=True, default=1) _input["minOccurs"] = int(input_min) _input["maxOccurs"] = int(input_max) if input_max != "unbounded" else input_max input_desc = get_field(_input, "abstract", search_variations=True, pop_found=True) if input_desc: _input["description"] = input_desc input_schema = get_field(_input, "schema", search_variations=False) if input_schema: _input["schema"] = self._decode(input_schema) return inputs
[docs] def outputs(self): # type: () -> Optional[List[Dict[str, JSON]]] """ Outputs of the process following backward-compatible conversion of stored parameters. According to `OGC-API`, ``mediaType`` should be in description as: - ``mediaType``: ``string`` .. note:: Because of pre-registered/deployed/retrieved remote processes, inputs are formatted in-line to respect valid OGC-API schema representation and apply any required correction transparently. """ outputs = self.get("outputs", []) for _output in outputs: output_formats = get_field(_output, "formats", search_variations=False, default=[]) for fmt in output_formats: mime_type = get_field(fmt, "mime_type", pop_found=True, search_variations=True) if mime_type is not null: fmt["mediaType"] = mime_type output_desc = get_field(_output, "abstract", search_variations=True, pop_found=True) if output_desc: _output["description"] = output_desc output_schema = get_field(_output, "schema", search_variations=False) if output_schema: _output["schema"] = self._decode(output_schema) return outputs
[docs] def jobControlOptions(self): # noqa: N802 # type: () -> List[AnyExecuteControlOption] """ Control options that indicate which :term:`Job` execution modes are supported by the :term:`Process`. .. note:: There are no official mentions about the ordering of ``jobControlOptions``. Nevertheless, it is often expected that the first item can be considered the default mode when none is requested explicitly (at execution time). With the definition of execution mode through the ``Prefer`` header, `Weaver` has the option to decide if it wants to honor this header, according to available resources and :term:`Job` duration. For this reason, ``async`` is placed first by default when nothing was defined during deployment, since it is the preferred mode in `Weaver`. If deployment included items though, they are preserved as is. This allows to re-deploy a :term:`Process` to a remote non-`Weaver` :term:`ADES` preserving the original :term:`Process` definition. .. seealso:: Discussion about expected ordering of ``jobControlOptions``: """ # Weaver's default async only, must override explicitly during deploy if sync is needed jco_default = [ExecuteControlOption.ASYNC] jco = self.setdefault("jobControlOptions", jco_default) if not isinstance(jco, list): # eg: None, bw-compat jco = jco_default jco = [ExecuteControlOption.get(opt) for opt in jco] jco = [opt for opt in jco if opt is not None] if len(jco) == 0: jco = jco_default self["jobControlOptions"] = jco # no alpha order important! return dict.__getitem__(self, "jobControlOptions")
[docs] def outputTransmission(self): # noqa: N802 # type: () -> List[AnyExecuteTransmissionMode] out = self.setdefault("outputTransmission", ExecuteTransmissionMode.values()) if not isinstance(out, list): # eg: None, bw-compat out = [ExecuteTransmissionMode.VALUE] out = [ExecuteTransmissionMode.get(mode) for mode in out] out = [mode for mode in out if mode is not None] if len(out) == 0: out.extend(ExecuteTransmissionMode.values()) self["outputTransmission"] = list(sorted(out)) return dict.__getitem__(self, "outputTransmission")
@property
def processDescriptionURL(self):  # noqa: N802
    # type: () -> Optional[str]
    """Stored process description URL, if any."""
    # NOTE(review): '@property' decorators on these four accessors were lost in the extracted source; restored.
    return self.get("processDescriptionURL")

@property
def processEndpointWPS1(self):  # noqa: N802
    # type: () -> Optional[str]
    """Stored WPS-1 endpoint of the process, if any."""
    return self.get("processEndpointWPS1")

@property
def executeEndpoint(self):  # noqa: N802
    # type: () -> Optional[str]
    """Stored execution endpoint of the process, if any."""
    return self.get("executeEndpoint")

@property
def owsContext(self):  # noqa: N802
    # type: () -> Optional[JSON]
    """Stored OWS context definition, if any."""
    return self.get("owsContext")
# wps, workflow, etc.
@property
def type(self):
    # type: () -> AnyProcessType
    """
    Type of process amongst :mod:`weaver.processes.types` definitions.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored
    # (un-called 'self.type' usage in 'keywords'/'mutable' requires it).
    return self.get("type", ProcessType.APPLICATION)
[docs] def mutable(self): # type: () -> bool """ Indicates if a process can be modified. """ return self.type != ProcessType.BUILTIN
[docs] def deployment_profile(self): # type: () -> str base = "" _typ = self.type if _typ == ProcessType.APPLICATION: profile = base + "dockerizedApplication" elif "wps" in _typ: profile = base + "wpsApplication" else: profile = base + _typ return profile
@property
def package(self):
    # type: () -> Optional[CWL]
    """
    Package CWL definition as JSON.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@package.setter' requires it).
    pkg = self.get("package")
    return self._decode(pkg) if isinstance(pkg, dict) else pkg

@package.setter
def package(self, pkg):
    # type: (Optional[CWL]) -> None
    self["package"] = self._decode(pkg) if isinstance(pkg, dict) else pkg
@property
def payload(self):
    # type: () -> JSON
    """
    Deployment specification as JSON body.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@payload.setter' requires it).
    body = self.get("payload", {})
    return self._decode(body) if isinstance(body, dict) else body

@payload.setter
def payload(self, body):
    # type: (JSON) -> None
    self["payload"] = self._decode(body) if isinstance(body, dict) else {}
[docs] _character_codes = [("$", "\uFF04"), (".", "\uFF0E")]
@staticmethod
def _recursive_replace(pkg, index_from, index_to):
    # type: (JSON, int, int) -> JSON
    """Recursively swap forbidden/encoded characters in every mapping key of the structure."""
    # NOTE(review): '@staticmethod' decorators on these three helpers were lost in the extracted
    # source; restored, as evidenced by the no-'self' signatures and 'Process._encode(...)' call sites.
    new = {}
    for k in pkg:
        # find modified key with replace matches
        c_k = k
        for c in Process._character_codes:
            c_f = c[index_from]
            c_t = c[index_to]
            if c_f in k:
                c_k = k.replace(c_f, c_t)
        # process recursive sub-items
        if isinstance(pkg[k], dict):
            pkg[k] = Process._recursive_replace(pkg[k], index_from, index_to)
        if isinstance(pkg[k], list):
            for i, pkg_i in enumerate(pkg[k]):
                if isinstance(pkg_i, dict):
                    pkg[k][i] = Process._recursive_replace(pkg[k][i], index_from, index_to)
        # apply new key to obtained sub-items with replaced keys as needed
        new[c_k] = pkg[k]  # note: cannot use pop when using pkg keys iterator (python 3)
    return new

@staticmethod
def _encode(obj):
    # type: (Optional[JSON]) -> Optional[JSON]
    """Encode forbidden key characters before database storage."""
    if obj is None:
        return None
    return Process._recursive_replace(obj, 0, 1)

@staticmethod
def _decode(obj):
    # type: (Optional[JSON]) -> Optional[JSON]
    """Decode stored keys back to their original characters."""
    if obj is None:
        return None
    return Process._recursive_replace(obj, 1, 0)
@property
def visibility(self):
    # type: () -> Visibility
    """Process visibility, defaulting to private."""
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@visibility.setter' requires it).
    return Visibility.get(self.get("visibility"), Visibility.PRIVATE)

@visibility.setter
def visibility(self, visibility):
    # type: (AnyVisibility) -> None
    vis = Visibility.get(visibility)
    if vis not in Visibility:
        values = list(Visibility.values())
        # NOTE(review): closing quote placement puts '.visibility, must be one of ...' inside the
        # quoted name — possibly a message typo, preserved as-is for byte-compatibility.
        raise ValueError(
            f"Status '{visibility}' is not valid for '{self.__name__}.visibility, must be one of {values!s}'"
        )
    self["visibility"] = vis
@property
def auth(self):
    # type: () -> Optional[AnyAuthentication]
    """
    Authentication token required for operations with the process.
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@auth.setter' requires it).
    auth = self.get("auth", None)
    if isinstance(auth, Authentication):
        return auth
    if isinstance(auth, dict):
        auth = Authentication.from_params(**auth)
        self["auth"] = auth  # store for later reference without reprocess
        return auth
    return None

@auth.setter
def auth(self, auth):
    # type: (Optional[AnyAuthentication]) -> None
    if auth is None:
        return
    if isinstance(auth, dict):
        auth = Authentication(**auth)
    if not isinstance(auth, Authentication):
        name = fully_qualified_name(auth)
        raise TypeError(f"Type 'Authentication' is required for '{self.__name__}.auth', not '{name}'.")
    self["auth"] = auth
def params(self):
    # type: () -> AnyParams
    """Parameters used to store the process in the database."""
    return {
        "identifier": self.identifier,
        "title": self.title,
        "abstract": self.abstract,
        "keywords": self.keywords,
        "metadata": self.metadata,
        "version": self.version,
        # escape potential OpenAPI JSON $ref in 'schema' also used by Mongo BSON
        "inputs": [self._encode(_input) for _input in self.inputs or []],
        "outputs": [self._encode(_output) for _output in self.outputs or []],
        "jobControlOptions": self.jobControlOptions,
        "outputTransmission": self.outputTransmission,
        "processEndpointWPS1": self.processEndpointWPS1,
        "processDescriptionURL": self.processDescriptionURL,
        "executeEndpoint": self.executeEndpoint,
        "owsContext": self.owsContext,
        "type": self.type,
        "package": self._encode(self.package),
        "payload": self._encode(self.payload),
        "visibility": self.visibility,
        "auth": self.auth.params() if self.auth else None
    }
[docs] def params_wps(self): # type: () -> AnyParams """ Values applicable to create an instance of :class:``. """ return { "identifier": self.identifier, "title": self.title, "abstract": self.abstract, "keywords": self.keywords, "metadata": self.metadata, "version": self.version, "inputs": self.inputs, "outputs": self.outputs, "package": self.package, "payload": self.payload,
[docs] def dict(self): # type: () -> AnyParams data = super(Process, self).dict() data.pop("auth", None) # remote preemptively just in case any deserialize fails to drop it return data
[docs] def json(self): # type: () -> JSON """ Obtains the JSON serializable complete representation of the process. """ return sd.Process().deserialize(self.dict())
[docs] def href(self, container=None): # type: (Optional[AnySettingsContainer]) -> str """ Obtain the reference URL for this :term:`Process`. """ settings = get_settings(container) base_url = get_wps_restapi_base_url(settings) if self.service: base_url += sd.provider_service.path.format(provider_id=self.service) proc_desc = base_url + sd.process_service.path.format( return proc_desc
def offering(self, schema=ProcessSchema.OGC):
    # type: (ProcessSchemaType) -> JSON
    """
    Obtains the JSON serializable offering/description representation of the process.

    :param schema:
        One of values defined by :class:`sd.ProcessDescriptionSchemaQuery` to select which
        process description representation to generate (see each schema for details).

    .. note::
        Property name ``offering`` is employed to differentiate from the string process ``description`` field.
        The result of this JSON representation is still the ``ProcessDescription`` schema.
    """
    process = self.dict()
    links = self.links()
    process.update({
        "deploymentProfile": self.deployment_profile,
        "links": links,
    })
    # adjust I/O definitions with missing information for both representations
    io_hints = {}
    for io_type in ["inputs", "outputs"]:
        io_hints[io_type] = process[io_type]
        process[io_type] = {
            get_field(io_def, "identifier", search_variations=True): io_def
            for io_def in process[io_type]
        }
        # When OpenAPI schema is not predefined explicitly in deployed I/O definitions, generate them dynamically.
        # This call allow to fill missing details for preexisting (already deployed) processes in database.
        # Another possible case is a deployment providing only CWL definitions, and WPS are inferred from them.
        # In this situation, the lack of WPS I/O altogether requires to generate OAS from I/O merge/conversion.
        # Deployment with OAS should have generated this field already to save time or for more precise definitions.
        for io_def in process[io_type].values():
            io_schema = get_field(io_def, "schema", search_variations=False)
            if not isinstance(io_schema, dict):
                io_def["schema"] = json2oas_io(io_def)
    # force selection of schema to avoid ambiguity
    if str(schema or ProcessSchema.OGC).upper() == ProcessSchema.OLD:
        # fields nested under 'process' + I/O as lists
        for io_type in ["inputs", "outputs"]:
            process[io_type] = normalize_ordered_io(process[io_type], io_hints[io_type])
        process.update({"process": dict(process)})
        return sd.ProcessDescriptionOLD().deserialize(process)
    # process fields directly at root + I/O as mappings
    return sd.ProcessDescriptionOGC().deserialize(process)
[docs] def summary(self): # type: () -> JSON """ Obtains the JSON serializable summary representation of the process. """ return sd.ProcessSummary().deserialize(self.dict())
[docs] def from_wps(wps_process, **extra_params): # type: (ProcessWPS, **Any) -> Process """ Converts a :mod:`pywps` Process into a :class:`weaver.datatype.Process` using provided parameters. """ assert isinstance(wps_process, ProcessWPS) process = wps_process.json process_type = getattr(wps_process, "type", wps_process.identifier) process.update({"type": process_type, "package": None, "reference": None, "inputs": [wps2json_io(i) for i in wps_process.inputs], "outputs": [wps2json_io(o) for o in wps_process.outputs]}) process.update(**extra_params) return Process(process)
[docs] def from_ows(process, service, container, **kwargs): # type: (ProcessOWS, Service, AnySettingsContainer, **Any) -> Process """ Converts a :mod:`owslib.wps` Process to local storage :class:`weaver.datatype.Process`. """ assert isinstance(process, ProcessOWS) wps_xml_url = get_wps_url(container) wps_api_url = get_wps_restapi_base_url(container) svc_name = None if not service or wps_api_url == service.url: # local weaver process, using WPS-XML endpoint remote_service_url = wps_xml_url local_provider_url = wps_api_url svc_provider_name = "Weaver" else: svc_name = service.get("name") # can be a custom ID or identical to provider name remote_service_url = service.url local_provider_url = f"{wps_api_url}/providers/{svc_name}" svc_provider_name = service.wps() describe_process_url = f"{local_provider_url}/processes/{process.identifier}" execute_process_url = f"{describe_process_url}/jobs" package, info = ows2json(process, svc_name, remote_service_url, svc_provider_name) wps_query = f"service=WPS&request=DescribeProcess&version=1.0.0&identifier={process.identifier}" wps_description_url = f"{remote_service_url}?{wps_query}" kwargs.update({ # parameters that must be enforced to find service "url": describe_process_url, "executeEndpoint": execute_process_url, "processEndpointWPS1": wps_description_url, "processDescriptionURL": describe_process_url, "type": ProcessType.WPS_REMOTE, "package": package, "service": svc_name }) return Process(**info, **kwargs)
@property
def service(self):
    # type: () -> Optional[str]
    """
    Name of the parent service provider under which this process resides.

    .. seealso::
        - :meth:`Service.processes`
        - :meth:`Process.convert`
    """
    # NOTE(review): '@property' decorator lost in extracted source; restored ('@service.setter' requires it).
    return self.get("service", None)

@service.setter
def service(self, service):
    # type: (Optional[str]) -> None
    if not (isinstance(service, str) or service is None):
        raise TypeError(f"Type 'str' is required for '{self.__name__}.service'")
    self["service"] = service
[docs] def convert(process, service=None, container=None, **kwargs): # type: (AnyProcess, Optional[Service], Optional[AnySettingsContainer], **Any) -> Process """ Converts known process equivalents definitions into the formal datatype employed by Weaver. """ if isinstance(process, ProcessOWS): return Process.from_ows(process, service, container, **kwargs) if isinstance(process, ProcessWPS): return Process.from_wps(process, **kwargs) if isinstance(process, dict): return Process(process, **kwargs) if isinstance(process, Process): return process raise TypeError(f"Unknown process type to convert: [{fully_qualified_name(process)}]")
[docs] def wps(self): # type: () -> ProcessWPS """ Converts this :class:`Process` to a corresponding format understood by :mod:`pywps`. """ # import here to avoid circular import errors from weaver.processes.wps_default import HelloWPS from weaver.processes.wps_package import WpsPackage from weaver.processes.wps_testing import WpsTestProcess process_map = { HelloWPS.identifier: HelloWPS, ProcessType.TEST: WpsTestProcess, ProcessType.APPLICATION: WpsPackage, # single CWL package ProcessType.BUILTIN: WpsPackage, # local scripts ProcessType.WPS_REMOTE: WpsPackage, # remote WPS ProcessType.WORKFLOW: WpsPackage, # chaining of CWL packages } process_key = self.type if self.type == ProcessType.WPS_LOCAL: process_key = self.identifier if process_key not in process_map: ProcessInstanceError(f"Unknown process '{process_key}' in mapping.") return process_map[process_key](**self.params_wps)
class Quote(Base):
    """
    Dictionary that contains quote information.

    It always has ``id`` and ``process`` keys.
    """
    # pylint: disable=C0103,invalid-name

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Initialize the quote.

        .. note::
            Although many parameters are required to render the final quote, they are not enforced
            at creation since the partial quote definition is needed before it can be processed.
        """
        super(Quote, self).__init__(*args, **kwargs)
        # set defaults
        if "status" not in self:
            self["status"] = QuoteStatus.SUBMITTED
        if "created" not in self:
            self["created"] = now()
        if "expire" not in self:
            self["expire"] = now() + timedelta(days=1)
        if "id" not in self:
            self["id"] = uuid.uuid4()

    @property
    def id(self):
        # type: () -> uuid.UUID
        """
        Quote ID.
        """
        return dict.__getitem__(self, "id")

    @property  # fix: decorator lost during extraction; required by '@detail.setter' below
    def detail(self):
        # type: () -> Optional[str]
        """
        Description of the quote.
        """
        return self.get("detail")

    @detail.setter
    def detail(self, detail):
        # type: (str) -> None
        # ignore None assignment once a detail is already defined
        if detail is None and self.detail is not None:
            return
        if not isinstance(detail, str):
            raise TypeError(f"String required for '{self.__name__}.detail'.")
        self["detail"] = detail

    @property
    def status(self):
        # type: () -> QuoteStatus
        """
        Quote status, defaulting to submitted when undefined.
        """
        return QuoteStatus.get(self.get("status"), QuoteStatus.SUBMITTED)

    @status.setter
    def status(self, status):
        # type: (AnyQuoteStatus) -> None
        value = QuoteStatus.get(status)
        if value not in QuoteStatus:
            statuses = list(QuoteStatus.values())
            name = self.__name__
            # fix: removed stray trailing quote previously embedded in the message
            raise ValueError(f"Status '{status}' is not valid for '{name}.status', must be one of {statuses!s}")
        prev = self.status
        # disallow reverting to an earlier status once processing has advanced
        if (
            (value == QuoteStatus.SUBMITTED and prev != QuoteStatus.SUBMITTED) or
            (value == QuoteStatus.PROCESSING and prev == QuoteStatus.COMPLETED)
        ):
            LOGGER.error("Cannot revert back to previous quote status (%s => %s)", value, self.status)
            LOGGER.debug(traceback.extract_stack())
            return
        self["status"] = value

    @property
    def user(self):
        # type: () -> Optional[Union[str, int]]
        """
        User ID requesting the quote.
        """
        return dict.__getitem__(self, "user")

    @user.setter
    def user(self, user):
        # type: (Optional[Union[str, int]]) -> None
        if not isinstance(user, (str, int, type(None))):
            raise ValueError(f"Field '{self.__name__}.user' must be a string, integer or None.")
        self["user"] = user

    @property
    def process(self):
        # type: () -> str
        """
        Process ID.
        """
        return dict.__getitem__(self, "process")

    @process.setter
    def process(self, process):
        # type: (str) -> None
        if not isinstance(process, str) or not len(process):
            raise ValueError(f"Field '{self.__name__}.process' must be a string.")
        self["process"] = process

    @property
    def seconds(self):
        # type: () -> int
        """
        Estimated time of the process execution in seconds.
        """
        return self.get("seconds") or 0

    @seconds.setter
    def seconds(self, seconds):
        # type: (int) -> None
        if not isinstance(seconds, int):
            raise TypeError(f"Invalid estimated duration type for '{self.__name__}.seconds'.")
        if seconds < 0:
            raise ValueError(f"Invalid estimated duration value for '{self.__name__}.seconds'.")
        self["seconds"] = seconds

    @property
    def duration(self):
        # type: () -> timedelta
        """
        Duration as delta time that can be converted to ISO-8601 format (``P[n]Y[n]M[n]DT[n]H[n]M[n]S``).
        """
        return timedelta(seconds=self.seconds)

    @property
    def duration_str(self):
        # type: () -> str
        """
        Human-readable duration in formatted as ``hh:mm:ss``.
        """
        duration = self.duration
        if duration is None:
            return "00:00:00"
        # drop microseconds and left-pad to at least 'hh:mm:ss'
        return str(duration).split(".", 1)[0].zfill(8)

    @property  # fix: decorator lost during extraction; required by '@parameters.setter' below
    def parameters(self):
        # type: () -> QuoteProcessParameters
        """
        Process execution parameters for quote.

        This should include minimally the inputs and expected outputs,
        but could be extended as needed with relevant details for quoting algorithm.
        """
        params = dict.pop(self, "processParameters", None)  # backward compatibility
        if params and "parameters" not in self:
            self.parameters = params  # goes through the setter to validate the legacy content
        params = self.get("parameters", {})
        return params

    @parameters.setter
    def parameters(self, data):
        # type: (QuoteProcessParameters) -> None
        try:
            sd.QuoteProcessParameters().deserialize(data)
        except colander.Invalid:
            LOGGER.error("Invalid process parameters for quote submission.\n%s", repr_json(data, indent=2))
            raise TypeError("Invalid process parameters for quote submission.")
        self["parameters"] = data

    processParameters = parameters  # noqa  # backward compatible alias

    @property
    def price(self):
        # type: () -> float
        # FIXME: decimal?
        """
        Price of the current quote.
        """
        return self.get("price", 0.0)

    @price.setter
    def price(self, price):
        # type: (float) -> None
        if not isinstance(price, float):
            raise ValueError(f"Field '{self.__name__}.price' must be a floating point number.")
        self["price"] = price

    @property
    def currency(self):
        # type: () -> Optional[str]
        """
        Currency of the quote price.
        """
        currency = self.get("currency")
        if not self.price:  # zero/undefined price valid to have no currency
            return currency
        # NOTE(review): "CAN" is not an ISO-4217 code (Canadian dollar is "CAD") — confirm intended default
        return currency or "CAN"  # some default if not specified but price is defined

    @currency.setter
    def currency(self, currency):
        # type: (str) -> None
        if not isinstance(currency, str) or not re.match(r"^[A-Z]{3}$", currency):
            raise ValueError(f"Field '{self.__name__}.currency' must be an ISO-4217 currency string code.")
        self["currency"] = currency

    expire = LocalizedDateTimeProperty(doc="Quote expiration datetime.")
    created = LocalizedDateTimeProperty(doc="Quote creation datetime.", default_now=True)

    @property
    def steps(self):
        # type: () -> List[uuid.UUID]
        """
        Sub-quote IDs if applicable.
        """
        return self.get("steps", [])

    @property
    def params(self):
        # type: () -> AnyParams
        # fix: restored 'self.id' value and dict terminator dropped during extraction
        return {
            "id": self.id,
            "detail": self.detail,
            "status": self.status,
            "price": self.price,
            "currency": self.currency,
            "user": self.user,
            "process": self.process,
            "steps": self.steps,
            "created": self.created,
            "expire": self.expire,
            "seconds": self.seconds,
            "parameters": self.parameters,
        }

    def partial(self):
        # type: () -> JSON
        """
        Submitted :term:`Quote` representation with minimal details until evaluation is completed.
        """
        data = {
            "id": self.id,  # fix: restored value dropped during extraction
            "status": self.status,
            "processID": self.process
        }
        return sd.PartialQuoteSchema().deserialize(data)

    def json(self):
        # type: () -> JSON
        """
        Step :term:`Quote` with :term:`JSON` representation.

        .. note::
            Does not include derived :term:`Quote` details if the associated :term:`Process` is a :term:`Workflow`.
        """
        data = self.dict()
        data.update(self.partial())
        data.update({
            "userID": self.user,
            "estimatedTime": self.duration_str,
            "estimatedSeconds": self.seconds,
            "estimatedDuration": self.duration,
            "processParameters": self.parameters,
        })
        return sd.Quotation().deserialize(data)

    def href(self, container=None):
        # type: (Optional[AnySettingsContainer]) -> str
        """
        Obtain the reference URL for this :term:`Quote`.
        """
        settings = get_settings(container)
        base_url = get_wps_restapi_base_url(settings)
        # NOTE(review): 'format' arguments were lost during extraction;
        #   'quote_id' matches the quote service path parameter — confirm against upstream
        quote_url = base_url + sd.quote_service.path.format(quote_id=self.id)
        return quote_url
class Bill(Base):
    """
    Dictionary that contains bill information.

    It always has ``id``, ``user``, ``quote`` and ``job`` keys.
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Initialize and validate the bill; all billing fields are mandatory at creation.

        :raises TypeError: when a required field is missing.
        :raises ValueError: when a provided field has an invalid type or format.
        """
        super(Bill, self).__init__(*args, **kwargs)
        if "quote" not in self:
            raise TypeError("Field 'Bill.quote' is required")
        if not isinstance(self.get("quote"), str):
            raise ValueError("Field 'Bill.quote' must be a string.")
        if "job" not in self:
            raise TypeError("Field 'Bill.job' is required")
        if not isinstance(self.get("job"), str):
            raise ValueError("Field 'Bill.job' must be a string.")
        if "user" not in self:
            raise TypeError("Field 'Bill.user' is required")
        if not isinstance(self.get("user"), str):
            raise ValueError("Field 'Bill.user' must be a string.")
        if "price" not in self:
            raise TypeError("Field 'Bill.price' is required")
        if not isinstance(self.get("price"), float):
            raise ValueError("Field 'Bill.price' must be a float number.")
        if "currency" not in self:
            raise TypeError("Field 'Bill.currency' is required")
        if not isinstance(self.get("currency"), str) or len(self.get("currency")) != 3:
            raise ValueError("Field 'Bill.currency' must be an ISO-4217 currency string code.")
        if "created" not in self:
            self["created"] = now()
        try:
            # normalize any accepted datetime input into ISO-8601 string form
            self["created"] = dt_parse(str(self.get("created"))).isoformat()
        except ValueError as exc:
            # fix: chain the original parsing error for easier debugging
            raise ValueError("Field 'Bill.created' must be an ISO-8601 datetime string.") from exc
        if "id" not in self:
            self["id"] = uuid.uuid4()

    @property
    def id(self):
        """
        Bill ID.
        """
        return dict.__getitem__(self, "id")

    @property  # fix: decorator lost during extraction; attribute is accessed without call in 'params'
    def user(self):
        """
        User ID.
        """
        return dict.__getitem__(self, "user")

    @property
    def quote(self):
        """
        Quote ID.
        """
        return dict.__getitem__(self, "quote")

    @property
    def job(self):
        """
        Job ID.
        """
        return dict.__getitem__(self, "job")

    @property
    def price(self):
        """
        Price of the current quote.
        """
        return self.get("price", 0.0)

    @property
    def currency(self):
        """
        Currency of the quote price.
        """
        return self.get("currency")

    @property
    def created(self):
        """
        Quote creation datetime.
        """
        return self.get("created")

    @property
    def title(self):
        """
        Quote title.
        """
        return self.get("title")

    @property
    def description(self):
        """
        Quote description.
        """
        return self.get("description")

    @property
    def params(self):
        # type: () -> AnyParams
        # fix: restored 'self.id' value and dict terminator dropped during extraction
        return {
            "id": self.id,
            "user": self.user,
            "quote": self.quote,
            "job": self.job,
            "price": self.price,
            "currency": self.currency,
            "created": self.created,
            "title": self.title,
            "description": self.description,
        }

    def json(self):
        # type: () -> JSON
        return sd.BillSchema().deserialize(self)