# Source code for weaver.datatype

"""
Definitions of types used by tokens.
"""
import copy
import enum
import inspect
import json
import re
import traceback
import uuid
import warnings
from datetime import datetime, timedelta
from logging import ERROR, INFO, Logger, getLevelName, getLogger
from typing import TYPE_CHECKING
from urllib.parse import urljoin, urlparse

import pyramid.httpexceptions
import requests.exceptions
from dateutil.parser import parse as dt_parse
from docker.auth import decode_auth
from owslib.wps import Process as ProcessOWS, WPSException
from pywps import Process as ProcessWPS

from weaver import xml_util
from weaver.exceptions import ProcessInstanceError, ServiceParsingError
from weaver.execute import (
    EXECUTE_CONTROL_OPTION_ASYNC,
    EXECUTE_CONTROL_OPTIONS,
    EXECUTE_MODE_ASYNC,
    EXECUTE_MODE_OPTIONS,
    EXECUTE_MODE_SYNC,
    EXECUTE_TRANSMISSION_MODE_OPTIONS,
    EXECUTE_TRANSMISSION_MODE_REFERENCE
)
from weaver.formats import ACCEPT_LANGUAGE_EN_CA, CONTENT_TYPE_APP_JSON, CONTENT_TYPE_APP_XML
from weaver.processes.convert import get_field, null, ows2json, wps2json_io
from weaver.processes.types import (
    PROCESS_APPLICATION,
    PROCESS_BUILTIN,
    PROCESS_TEST,
    PROCESS_WORKFLOW,
    PROCESS_WPS_LOCAL,
    PROCESS_WPS_REMOTE,
    PROCESS_WPS_TYPES
)
from weaver.status import (
    JOB_STATUS_CATEGORIES,
    JOB_STATUS_CATEGORY_FINISHED,
    JOB_STATUS_VALUES,
    STATUS_ACCEPTED,
    STATUS_RUNNING,
    STATUS_SUCCEEDED,
    STATUS_UNKNOWN,
    map_status
)
from weaver.utils import localize_datetime  # for backward compatibility of previously saved jobs not time-locale-aware
from weaver.utils import (
    fully_qualified_name,
    get_job_log_msg,
    get_log_date_fmt,
    get_log_fmt,
    get_settings,
    now,
    request_extra
)
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_VALUES
from weaver.warning import NonBreakingExceptionWarning
from weaver.wps.utils import get_wps_client, get_wps_url
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
    from typing import Any, Dict, List, Optional, Union

    from owslib.wps import WebProcessingService

    from weaver.typedefs import AnyProcess, AnySettingsContainer, Number, CWL, JSON

if TYPE_CHECKING:
    # Aliases used only within type comments.
    # They must remain guarded since 'Dict', 'Any' and 'Union' are only imported when type checking.
    AnyParams = Dict[str, Any]
    AnyAuthentication = Union["Authentication", "DockerAuthentication"]

# module-level logger shared by the data types defined in this file
LOGGER = getLogger(__name__)
class Base(dict):
    """
    Dictionary with extended attributes auto-``getter``/``setter`` for convenience.

    Explicitly overridden ``getter``/``setter`` attributes are called instead of ``dict``-key ``get``/``set``-item
    to ensure corresponding checks and/or value adjustments are executed before applying it to the sub-``dict``.
    """

    def __setattr__(self, item, value):
        """
        Uses an existing property setter if defined in the subclass or employs the default dictionary setter otherwise.
        """
        # use a default to reach the dictionary setter for names that are not defined by the class
        # (without it, the lookup itself raises 'AttributeError' before the fallback can be applied)
        prop = getattr(type(self), item, None)
        if isinstance(prop, property) and prop.fset is not None:
            prop.fset(self, value)  # noqa
        else:
            super(Base, self).__setitem__(item, value)

    def __getitem__(self, item):
        """
        Uses an existing property getter if defined in the subclass or employs the default dictionary getter otherwise.

        :raises AttributeError: if the item is neither a property with getter nor a key stored in the dictionary.
        """
        # same reasoning as in '__setattr__', plain dictionary keys must not fail on the class lookup
        prop = getattr(type(self), item, None)
        if isinstance(prop, property) and prop.fget is not None:
            return prop.fget(self)  # noqa
        if item in self:
            # class attributes keep precedence, the stored dictionary value is used when there is none
            return getattr(self, item, dict.__getitem__(self, item))
        raise AttributeError(f"Can't get attribute '{item}' in '{type(self)}'.")

    def __str__(self):
        # type: () -> str
        return "{0} <{1}>".format(type(self).__name__, self.id)

    def __repr__(self):
        # type: () -> str
        cls = type(self)
        repr_ = dict.__repr__(self)
        return "{0}.{1} ({2})".format(cls.__module__, cls.__name__, repr_)

    @property
    def id(self):
        """
        Identifier of the object, which must be provided by the derived class.
        """
        raise NotImplementedError()

    @property
    def uuid(self):
        """
        Alias of :attr:`id`.
        """
        return self.id

    def json(self):
        # type: () -> JSON
        """
        Obtain the JSON data representation for response body.

        .. note::
            This method implementation should validate the JSON schema against the API definition whenever
            applicable to ensure integrity between the represented data type and the expected API response.
        """
        raise NotImplementedError("Method 'json' must be defined for JSON request item representation.")

    def params(self):
        # type: () -> AnyParams
        """
        Obtain the internal data representation for storage.

        .. note::
            This method implementation should provide a JSON-serializable definition of all fields representing
            the object to store.
        """
        raise NotImplementedError("Method 'params' must be defined for storage item representation.")

    def dict(self):
        """
        Generate a dictionary representation of the object, but with inplace resolution of attributes as applicable.
        """
        # update any entries by key with their attribute
        _dict = {key: getattr(self, key, dict.__getitem__(self, key)) for key in self}
        # then, ensure any missing key gets added if a getter property exists for it
        props = {
            prop[0] for prop in inspect.getmembers(self)
            if not prop[0].startswith("_") and prop[0] not in _dict
        }
        for key in props:
            prop = getattr(type(self), key)
            if isinstance(prop, property) and prop.fget is not None:
                _dict[key] = prop.fget(self)  # noqa
        return _dict
class Service(Base):
    """
    Dictionary that contains OWS services.

    It always has ``url`` key.
    """

    def __init__(self, *args, **kwargs):
        super(Service, self).__init__(*args, **kwargs)
        if "name" not in self:
            raise TypeError("Service 'name' is required")
        if "url" not in self:
            raise TypeError("Service 'url' is required")
        # placeholder of the WPS client reference, resolved on first call of 'wps'
        self["_wps"] = None

    @property
    def id(self):
        """
        Service identifier, same as its name.
        """
        return self.name

    @property
    def url(self):
        """
        Service URL.
        """
        return dict.__getitem__(self, "url")

    @property
    def name(self):
        """
        Service name.
        """
        return dict.__getitem__(self, "name")

    @property
    def type(self):
        """
        Service type.
        """
        return self.get("type", PROCESS_WPS_REMOTE)

    @property
    def public(self):
        """
        Flag if service has public access.
        """
        # TODO: public access can be set via auth parameter.
        return self.get("public", False)

    @property
    def auth(self):
        """
        Authentication method: public, token, cert.
        """
        return self.get("auth", "token")

    def json(self):
        # type: () -> JSON
        """
        Obtain the JSON representation of the service, currently the same fields as :meth:`params`.
        """
        # TODO: apply swagger type deserialize schema check if returned in a response
        return self.params()

    def params(self):
        # type: () -> AnyParams
        """
        Obtain the fields that represent the service for storage.
        """
        return {
            "url": self.url,
            "name": self.name,
            "type": self.type,
            "public": self.public,
            "auth": self.auth
        }

    def wps(self, container=None, **kwargs):
        # type: (AnySettingsContainer, Any) -> WebProcessingService
        """
        Obtain the remote WPS service definition and metadata.

        Stores the reference locally to avoid re-fetching it needlessly for future reference.

        :raises ServiceParsingError: if the XML returned by the remote service cannot be parsed.
        """
        try:
            _wps = self.get("_wps")
            if _wps is None:
                # client retrieval could also be cached if recently fetched and not yet invalidated
                self["_wps"] = _wps = get_wps_client(self.url, container=container, **kwargs)
            return _wps
        except xml_util.ParseError as exc:
            msg = "Invalid XML returned by WPS [{}] at [{}] cannot be parsed.".format(self.name, self.url)
            # chain the cause to preserve the original traceback for debugging
            raise ServiceParsingError(
                json={"description": msg, "cause": str(exc), "error": exc.__class__.__name__}
            ) from exc

    def metadata(self, container):
        # type: (AnySettingsContainer) -> List[JSON]
        """
        Obtains the metadata relevant to the service provider.
        """
        wps = self.wps(container=container)
        wps_lang = wps.language
        # FIXME: add more metadata retrieved from 'wps.identification' and 'wps.provider.contact' (?)
        #        if so, should be included only in "long description", while "summary" only returns below info
        meta = [
            {
                "type": "provider-name",
                "title": "Provider Name",
                "role": "http://www.opengis.net/eoc/applicationContext/providerMetadata",
                "value": wps.provider.name,
                "lang": wps_lang
            },
            {
                "type": "provider-site",
                # title adjusted to match the 'provider-site' type (was a duplicate of 'Provider Name')
                "title": "Provider Site",
                "role": "http://www.opengis.net/eoc/applicationContext/providerMetadata",
                "value": wps.provider.url,
                "lang": wps_lang
            },
            {
                "type": "contact-name",
                "title": "Contact Name",
                "role": "http://www.opengis.net/eoc/applicationContext/providerMetadata",
                "value": wps.provider.contact.name,
                "lang": wps_lang
            }
        ]
        return meta

    def keywords(self, container=None):
        # type: (AnySettingsContainer) -> List[str]
        """
        Obtains the keywords relevant to the service provider.
        """
        wps = self.wps(container=container)
        return wps.identification.keywords

    def summary(self, container, fetch=True, ignore=False):
        # type: (AnySettingsContainer, bool, bool) -> Optional[JSON]
        """
        Obtain the summary information from the provider service.

        When metadata fetching is disabled, the generated summary will contain only information available locally.

        :param container: employed to retrieve application settings.
        :param fetch: indicates whether metadata should be fetched from remote.
        :param ignore: indicates if failing metadata retrieval/parsing should be silently discarded or raised.
        :return: generated summary information, or ``None`` for unsupported service types and ignored failures.
        :raises ServiceParsingError: if retrieval or parsing failed and ``ignore`` is not enabled.
        """
        try:
            # FIXME: not implemented (https://github.com/crim-ca/weaver/issues/130)
            if self.type.lower() not in PROCESS_WPS_TYPES:
                return None
            # basic information always available (local)
            data = {
                "id": self.name,
                "url": self.url,  # remote URL (bw-compat, also in links)
                "type": PROCESS_WPS_REMOTE,
                "public": self.public,
                # NOTE(review): 'links' is not defined in the visible part of this class, confirm it exists
                "links": self.links(container, fetch=fetch),
            }
            # retrieve more metadata from remote if possible and requested
            if fetch:
                wps = self.wps(container)
                data.update({
                    "title": getattr(wps.identification, "title", None),
                    "description": getattr(wps.identification, "abstract", None),
                    "keywords": self.keywords(container),
                    "metadata": self.metadata(container),
                })
            return sd.ProviderSummarySchema().deserialize(data)
        except ServiceParsingError as exc:
            err_msg = repr(exc)
            warnings.warn(err_msg, NonBreakingExceptionWarning)
            LOGGER.debug(err_msg, exc_info=exc)
            if ignore:
                return None
            raise
        except Exception as exc:
            msg = "Exception occurred while fetching WPS [{}] at [{}]".format(self.name, self.url)
            err_msg = "{}: {!r}".format(msg, exc)
            warnings.warn(err_msg, NonBreakingExceptionWarning)
            LOGGER.debug(err_msg, exc_info=exc)
            if ignore:
                return None
            # chain the cause to preserve the original traceback for debugging
            raise ServiceParsingError(
                json={"description": msg, "cause": str(exc), "error": exc.__class__.__name__}
            ) from exc

    def processes(self, container):
        # type: (AnySettingsContainer) -> List[Process]
        """
        Obtains a list of remote service processes in a compatible :class:`weaver.datatype.Process` format.

        Note: remote processes won't be stored to the local process storage.
        """
        # FIXME: support other providers (https://github.com/crim-ca/weaver/issues/130)
        if self.type.lower() not in PROCESS_WPS_TYPES:
            return []
        wps = self.wps(container)
        settings = get_settings(container)
        return [Process.convert(process, self, settings) for process in wps.processes]

    def check_accessible(self, settings, ignore=True):
        # type: (AnySettingsContainer, bool) -> bool
        """
        Verify if the service URL is accessible.

        :param settings: application settings passed down to the request.
        :param ignore: return ``False`` on request failure instead of raising.
        :return: whether the service responded with one of the accepted status codes.
        :raises ServiceParsingError: if the request failed and ``ignore`` is disabled.
        """
        try:
            # some WPS don't like HEAD request, so revert to normal GetCapabilities
            # otherwise use HEAD because it is faster to only 'ping' the service
            if self.type.lower() in PROCESS_WPS_TYPES:
                meth = "GET"
                url = "{}?service=WPS&request=GetCapabilities".format(self.url)
            else:
                meth = "HEAD"
                url = self.url
            # - allow 500 for services that incorrectly handle invalid request params, but at least respond
            #   (should be acceptable in this case because the 'ping' request is not necessarily well formed)
            # - allow 400/405 for bad request/method directly reported by the service for the same reasons
            # - enforce quick timeout (but don't allow 408 code) to avoid long pending connexions that never resolve
            allowed_codes = [200, 400, 405, 500]
            resp = request_extra(meth, url, timeout=2, settings=settings, allowed_codes=allowed_codes)
            return resp.status_code in allowed_codes
        except (requests.exceptions.RequestException, pyramid.httpexceptions.HTTPException) as exc:
            msg = "Exception occurred while checking service [{}] accessibility at [{}]".format(self.name, self.url)
            warnings.warn("{}: {!r}".format(msg, exc), NonBreakingExceptionWarning)
            if not ignore:
                # chain the cause to preserve the original traceback for debugging
                raise ServiceParsingError(json={
                    "description": msg,
                    "cause": "Cannot validate or parse service metadata since it is not accessible.",
                    "error": exc.__class__.__name__
                }) from exc
        return False
class Job(Base):
    """
    Dictionary that contains OWS service jobs.

    It always has ``id`` and ``task_id`` keys.
    """

    def __init__(self, *args, **kwargs):
        super(Job, self).__init__(*args, **kwargs)
        if "task_id" not in self:
            raise TypeError("Parameter 'task_id' is required for '{}' creation.".format(type(self)))
        if not isinstance(self.id, str):
            raise TypeError("Type 'str' is required for '{}.id'".format(type(self)))

    def _get_log_msg(self, msg=None, status=None, progress=None):
        # type: (Optional[str], Optional[str], Optional[Number]) -> str
        """
        Generate the log message from the provided overrides or the current job values when omitted.
        """
        if not msg:
            msg = self.status_message
        status = map_status(status or self.status)
        # only fall back to the job progress when omitted, an explicit zero is a valid override
        if progress is None:
            progress = self.progress
        progress = max(0, min(100, progress))
        return get_job_log_msg(duration=self.duration_str, progress=progress, status=status, message=msg)

    @staticmethod
    def _get_err_msg(error):
        # type: (WPSException) -> str
        """
        Format the text, code and locator of a WPS exception as a single message.
        """
        return "{0.text} - code={0.code} - locator={0.locator}".format(error)

    def save_log(self,
                 errors=None,       # type: Optional[Union[str, Exception, WPSException, List[WPSException]]]
                 logger=None,       # type: Optional[Logger]
                 message=None,      # type: Optional[str]
                 level=INFO,        # type: int
                 status=None,       # type: Optional[str]
                 progress=None,     # type: Optional[Number]
                 ):                 # type: (...) -> None
        """
        Logs the specified error and/or message, and adds the log entry to the complete job log.

        For each new log entry, additional :class:`Job` properties are added according to :meth:`Job._get_log_msg`
        and the format defined by :func:`get_job_log_msg`.

        :param errors:
            An error message or a list of WPS exceptions from which to log and save generated message stack.
        :param logger:
            An additional :class:`Logger` for which to propagate logged messages on top saving them to the job.
        :param message:
            Explicit string to be logged, otherwise use the current :py:attr:`Job.status_message` is used.
        :param level:
            Logging level to apply to the logged ``message``. This parameter is ignored if ``errors`` are logged.
        :param status:
            Override status applied in the logged message entry, but does not set it to the job object.
            Uses the current :attr:`Job.status` value if not specified. Must be one of :mod:`Weaver.status` values.
        :param progress:
            Override progress applied in the logged message entry, but does not set it to the job object.
            Uses the current :attr:`Job.progress` value if not specified.

        .. note::
            The job object is updated with the log but still requires to be pushed to database to actually persist it.
        """
        # normalize errors to either a plain string or a list of WPS exceptions
        if isinstance(errors, WPSException):
            errors = [errors]
        elif isinstance(errors, Exception):
            errors = str(errors)

        if isinstance(errors, str):
            log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress))]
            self.exceptions.append(errors)
        elif isinstance(errors, list):
            log_msg = [
                (ERROR, self._get_log_msg(self._get_err_msg(error), status=status, progress=progress))
                for error in errors
            ]
            self.exceptions.extend([{
                "Code": error.code,
                "Locator": error.locator,
                "Text": error.text
            } for error in errors])
        else:
            log_msg = [(level, self._get_log_msg(message, status=status, progress=progress))]

        for lvl, msg in log_msg:
            fmt_msg = get_log_fmt() % dict(asctime=now().strftime(get_log_date_fmt()),
                                           levelname=getLevelName(lvl),
                                           name=fully_qualified_name(self),
                                           message=msg)
            # skip the entry if it is identical to the last one saved
            if len(self.logs) == 0 or self.logs[-1] != fmt_msg:
                self.logs.append(fmt_msg)
                if logger:
                    logger.log(lvl, msg)

    @property
    def id(self):
        # type: () -> str
        """
        Job UUID to retrieve the details from storage.
        """
        job_id = self.get("id")
        if not job_id:
            # generate and store the identifier on first access
            job_id = str(uuid.uuid4())
            self["id"] = job_id
        return job_id

    @property
    def task_id(self):
        # type: () -> Optional[str]
        """
        Reference Task UUID attributed by the ``Celery`` worker that monitors and executes this job.
        """
        return self.get("task_id", None)

    @task_id.setter
    def task_id(self, task_id):
        # type: (str) -> None
        if not isinstance(task_id, str):
            raise TypeError("Type 'str' is required for '{}.task_id'".format(type(self)))
        self["task_id"] = task_id

    @property
    def wps_id(self):
        # type: () -> Optional[str]
        """
        Reference WPS Request/Response UUID attributed by the executed ``PyWPS`` process.

        This UUID matches the status-location, log and output directory of the WPS process.
        This parameter is only available when the process is executed on this local instance.

        .. seealso::
            - :attr:`Job.request`
            - :attr:`Job.response`
        """
        return self.get("wps_id", None)

    @wps_id.setter
    def wps_id(self, wps_id):
        # type: (str) -> None
        if not isinstance(wps_id, str):
            raise TypeError("Type 'str' is required for '{}.wps_id'".format(type(self)))
        self["wps_id"] = wps_id

    @property
    def service(self):
        # type: () -> Optional[str]
        """
        Service identifier of the corresponding remote process.

        .. seealso::
            - :attr:`Service.id`
        """
        return self.get("service", None)

    @service.setter
    def service(self, service):
        # type: (str) -> None
        # 'None' is rejected by the type check as well
        if not isinstance(service, str):
            raise TypeError("Type 'str' is required for '{}.service'".format(type(self)))
        self["service"] = service

    @property
    def process(self):
        # type: () -> Optional[str]
        """
        Process identifier of the corresponding remote process.

        .. seealso::
            - :attr:`Process.id`
        """
        return self.get("process", None)

    @process.setter
    def process(self, process):
        # type: (str) -> None
        # 'None' is rejected by the type check as well
        if not isinstance(process, str):
            raise TypeError("Type 'str' is required for '{}.process'".format(type(self)))
        self["process"] = process

    @property
    def type(self):
        # type: () -> str
        """
        Obtain the type of the element associated to the creation of this job.

        .. seealso::
            - Defined in https://docs.ogc.org/DRAFTS/18-062.html#_response_6 (within ``StatusInfo`` schema).
            - Queried with https://docs.ogc.org/DRAFTS/18-062.html#_parameter_type
        """
        if self.service is None:
            return "process"
        return "provider"

    def _get_inputs(self):
        # type: () -> List[Optional[Dict[str, JSON]]]
        if self.get("inputs") is None:
            self["inputs"] = list()
        return dict.__getitem__(self, "inputs")

    def _set_inputs(self, inputs):
        # type: (List[Optional[Dict[str, JSON]]]) -> None
        if not isinstance(inputs, list):
            raise TypeError("Type 'list' is required for '{}.inputs'".format(type(self)))
        self["inputs"] = inputs

    # allows to correctly update list by ref using 'job.inputs.extend()'
    inputs = property(_get_inputs, _set_inputs)

    @property
    def user_id(self):
        # type: () -> Optional[int]
        return self.get("user_id", None)

    @user_id.setter
    def user_id(self, user_id):
        # type: (int) -> None
        # 'None' is rejected by the type check as well
        if not isinstance(user_id, int):
            raise TypeError("Type 'int' is required for '{}.user_id'".format(type(self)))
        self["user_id"] = user_id

    @property
    def status(self):
        # type: () -> str
        return self.get("status", STATUS_UNKNOWN)

    @status.setter
    def status(self, status):
        # type: (str) -> None
        if status == STATUS_ACCEPTED and self.status == STATUS_RUNNING:
            # trace the origin of a running job being moved back to accepted
            LOGGER.debug(traceback.extract_stack())
        if not isinstance(status, str):
            raise TypeError("Type 'str' is required for '{}.status'".format(type(self)))
        if status not in JOB_STATUS_VALUES:
            raise ValueError("Status '{0}' is not valid for '{1}.status', must be one of {2!s}'"
                             .format(status, type(self), list(JOB_STATUS_VALUES)))
        self["status"] = status

    @property
    def status_message(self):
        # type: () -> str
        return self.get("status_message", "no message")

    @status_message.setter
    def status_message(self, message):
        # type: (Optional[str]) -> None
        if message is None:
            return  # keep the current message
        if not isinstance(message, str):
            raise TypeError("Type 'str' is required for '{}.status_message'".format(type(self)))
        self["status_message"] = message

    @property
    def status_location(self):
        # type: () -> Optional[str]
        return self.get("status_location", None)

    @status_location.setter
    def status_location(self, location_url):
        # type: (str) -> None
        # 'None' is rejected by the type check as well
        if not isinstance(location_url, str):
            raise TypeError("Type 'str' is required for '{}.status_location'".format(type(self)))
        self["status_location"] = location_url

    @property
    def notification_email(self):
        # type: () -> Optional[str]
        return self.get("notification_email")

    @notification_email.setter
    def notification_email(self, email):
        # type: (str) -> None
        if not isinstance(email, str):
            raise TypeError("Type 'str' is required for '{}.notification_email'".format(type(self)))
        self["notification_email"] = email

    @property
    def accept_language(self):
        # type: () -> Optional[str]
        return self.get("accept_language")

    @accept_language.setter
    def accept_language(self, language):
        # type: (str) -> None
        if not isinstance(language, str):
            raise TypeError("Type 'str' is required for '{}.accept_language'".format(type(self)))
        self["accept_language"] = language

    @property
    def execute_async(self):
        # type: () -> bool
        return self.execution_mode == EXECUTE_MODE_ASYNC

    @property
    def execute_sync(self):
        # type: () -> bool
        return self.execution_mode == EXECUTE_MODE_SYNC

    @property
    def execution_mode(self):
        # type: () -> str
        return self.get("execution_mode", EXECUTE_MODE_ASYNC)

    @execution_mode.setter
    def execution_mode(self, mode):
        # type: (str) -> None
        if not isinstance(mode, str):
            raise TypeError("Type 'str' is required for '{}.execution_mode'".format(type(self)))
        if mode not in EXECUTE_MODE_OPTIONS:
            raise ValueError("Invalid value for '{}.execution_mode'. Must be one of {}".format(
                type(self), list(EXECUTE_MODE_OPTIONS)
            ))
        self["execution_mode"] = mode

    @property
    def is_local(self):
        # type: () -> bool
        return self.get("is_local", not self.service)

    @is_local.setter
    def is_local(self, is_local):
        # type: (bool) -> None
        if not isinstance(is_local, bool):
            raise TypeError("Type 'bool' is required for '{}.is_local'".format(type(self)))
        self["is_local"] = is_local

    @property
    def is_workflow(self):
        # type: () -> bool
        return self.get("is_workflow", False)

    @is_workflow.setter
    def is_workflow(self, is_workflow):
        # type: (bool) -> None
        if not isinstance(is_workflow, bool):
            raise TypeError("Type 'bool' is required for '{}.is_workflow'".format(type(self)))
        self["is_workflow"] = is_workflow

    @property
    def created(self):
        # type: () -> datetime
        created = self.get("created", None)
        if not created:
            # set the creation time on first access
            self["created"] = now()
        return localize_datetime(self.get("created"))

    @property
    def started(self):
        # type: () -> Optional[datetime]
        started = self.get("started", None)
        if not started:
            return None
        return localize_datetime(started)

    @started.setter
    def started(self, started):
        # type: (datetime) -> None
        if not isinstance(started, datetime):
            raise TypeError("Type 'datetime' is required for '{}.started'".format(type(self)))
        self["started"] = started

    @property
    def finished(self):
        # type: () -> Optional[datetime]
        return self.get("finished", None)

    @property
    def is_finished(self):
        # type: () -> bool
        return self.finished is not None

    def mark_finished(self):
        # type: () -> None
        """
        Set the finished time of the job to the current time.
        """
        self["finished"] = now()

    @property
    def updated(self):
        # type: () -> datetime
        updated = self.get("updated")
        # backward compatibility when not already set
        if not updated:
            if self.status == map_status(STATUS_ACCEPTED):
                updated = self.created
            elif self.is_finished:
                updated = self.finished
            else:
                updated = self.started
            updated = localize_datetime(updated or now())
            self.updated = updated  # apply to remain static until saved
        return localize_datetime(updated)

    @updated.setter
    def updated(self, updated):
        # type: (datetime) -> None
        if not isinstance(updated, datetime):
            raise TypeError("Type 'datetime' is required for '{}.updated'".format(type(self)))
        self["updated"] = updated

    @property
    def duration(self):
        # type: () -> Optional[timedelta]
        if not self.started:
            return None
        # a job still in progress is measured up to the current time
        final_time = self.finished or now()
        return localize_datetime(final_time) - localize_datetime(self.started)

    @property
    def duration_str(self):
        # type: () -> str
        duration = self.duration
        if duration is None:
            return "00:00:00"
        return str(duration).split(".")[0].zfill(8)  # "HH:MM:SS"

    @property
    def progress(self):
        # type: () -> Number
        return self.get("progress", 0)

    @progress.setter
    def progress(self, progress):
        # type: (Number) -> None
        if not isinstance(progress, (int, float)):
            raise TypeError("Number is required for '{}.progress'".format(type(self)))
        if progress < 0 or progress > 100:
            raise ValueError("Value must be in range [0,100] for '{}.progress'".format(type(self)))
        self["progress"] = progress

    def _get_results(self):
        # type: () -> List[Optional[Dict[str, JSON]]]
        if self.get("results") is None:
            self["results"] = list()
        return dict.__getitem__(self, "results")

    def _set_results(self, results):
        # type: (List[Optional[Dict[str, JSON]]]) -> None
        if not isinstance(results, list):
            raise TypeError("Type 'list' is required for '{}.results'".format(type(self)))
        self["results"] = results

    # allows to correctly update list by ref using 'job.results.extend()'
    results = property(_get_results, _set_results)

    def _get_exceptions(self):
        # type: () -> List[Optional[Dict[str, str]]]
        if self.get("exceptions") is None:
            self["exceptions"] = list()
        return dict.__getitem__(self, "exceptions")

    def _set_exceptions(self, exceptions):
        # type: (List[Optional[Dict[str, str]]]) -> None
        if not isinstance(exceptions, list):
            raise TypeError("Type 'list' is required for '{}.exceptions'".format(type(self)))
        self["exceptions"] = exceptions

    # allows to correctly update list by ref using 'job.exceptions.extend()'
    exceptions = property(_get_exceptions, _set_exceptions)

    def _get_logs(self):
        # type: () -> List[Dict[str, str]]
        if self.get("logs") is None:
            self["logs"] = list()
        return dict.__getitem__(self, "logs")

    def _set_logs(self, logs):
        # type: (List[Dict[str, str]]) -> None
        if not isinstance(logs, list):
            raise TypeError("Type 'list' is required for '{}.logs'".format(type(self)))
        self["logs"] = logs

    # allows to correctly update list by ref using 'job.logs.extend()'
    logs = property(_get_logs, _set_logs)

    def _get_tags(self):
        # type: () -> List[Optional[str]]
        if self.get("tags") is None:
            self["tags"] = list()
        return dict.__getitem__(self, "tags")

    def _set_tags(self, tags):
        # type: (List[Optional[str]]) -> None
        if not isinstance(tags, list):
            raise TypeError("Type 'list' is required for '{}.tags'".format(type(self)))
        self["tags"] = tags

    # allows to correctly update list by ref using 'job.tags.extend()'
    tags = property(_get_tags, _set_tags)

    @property
    def access(self):
        # type: () -> str
        """
        Job visibility access from execution.
        """
        return self.get("access", VISIBILITY_PRIVATE)

    @access.setter
    def access(self, visibility):
        # type: (str) -> None
        """
        Job visibility access from execution.
        """
        if not isinstance(visibility, str):
            raise TypeError("Type 'str' is required for '{}.access'".format(type(self)))
        if visibility not in VISIBILITY_VALUES:
            raise ValueError("Invalid 'visibility' value specified for '{}.access'".format(type(self)))
        self["access"] = visibility

    @property
    def context(self):
        # type: () -> Optional[str]
        """
        Job outputs context.
        """
        return self.get("context") or None

    @context.setter
    def context(self, context):
        # type: (Optional[str]) -> None
        """
        Job outputs context.
        """
        if not (isinstance(context, str) or context is None):
            raise TypeError("Type 'str' or 'None' is required for '{}.context'".format(type(self)))
        self["context"] = context

    @property
    def request(self):
        # type: () -> Optional[str]
        """
        XML request for WPS execution submission as string (binary).
        """
        return self.get("request", None)

    @request.setter
    def request(self, request):
        # type: (Optional[str]) -> None
        """
        XML request for WPS execution submission as string (binary).
        """
        if isinstance(request, xml_util.XML):
            request = xml_util.tostring(request)
        self["request"] = request

    @property
    def response(self):
        # type: () -> Optional[str]
        """
        XML status response from WPS execution submission as string (binary).
        """
        return self.get("response", None)

    @response.setter
    def response(self, response):
        # type: (Optional[str]) -> None
        """
        XML status response from WPS execution submission as string (binary).
        """
        if isinstance(response, xml_util.XML):
            response = xml_util.tostring(response)
        self["response"] = response

    def _job_url(self, base_url=None):
        # type: (Optional[str]) -> str
        """
        Generate the job URL from the base URL, prefixed by the provider path when a service is referenced.

        :param base_url: URL to employ as prefix of the job path. Only the path is generated if omitted.
        """
        # avoid string concatenation with 'None' and the literal 'None' formatted into the resulting URL
        base_url = base_url or ""
        if self.service is not None:
            base_url += sd.provider_service.path.format(provider_id=self.service)
        job_path = sd.process_job_service.path.format(process_id=self.process, job_id=self.id)
        return "{base_job_url}{job_path}".format(base_job_url=base_url, job_path=job_path)

    def json(self, container=None, self_link=None):     # pylint: disable=W0221,arguments-differ
        # type: (Optional[AnySettingsContainer], Optional[str]) -> JSON
        """
        Obtains the JSON data representation for response body.

        .. note::
            Settings are required to update API shortcut URLs to job additional information.
            Without them, paths will not include the API host, which will not resolve to full URI.
        """
        settings = get_settings(container) if container else {}
        job_json = {
            "jobID": self.id,
            "processID": self.process,
            "providerID": self.service,
            "type": self.type,
            "status": map_status(self.status),
            "message": self.status_message,
            "created": self.created,
            "started": self.started,
            "finished": self.finished,
            "updated": self.updated,
            "duration": self.duration_str,
            "runningSeconds": self.duration.total_seconds() if self.duration is not None else None,
            # TODO: available fields not yet employed (https://github.com/crim-ca/weaver/issues/129)
            "nextPoll": None,
            "expirationDate": None,
            "estimatedCompletion": None,
            "percentCompleted": self.progress,
            # new name as per OGC-API, enforced integer
            # https://github.com/opengeospatial/ogcapi-processes/blob/master/core/openapi/schemas/statusInfo.yaml
            "progress": int(self.progress),
            # NOTE(review): 'links' is not defined in the visible part of this class, confirm it exists
            "links": self.links(settings, self_link=self_link)
        }
        return sd.JobStatusInfo().deserialize(job_json)

    def params(self):
        # type: () -> AnyParams
        """
        Obtain the fields that represent the job for storage.
        """
        return {
            "id": self.id,
            "task_id": self.task_id,
            "wps_id": self.wps_id,
            "service": self.service,
            "process": self.process,
            "inputs": self.inputs,
            "user_id": self.user_id,
            "status": self.status,
            "status_message": self.status_message,
            "status_location": self.status_location,
            "execution_mode": self.execution_mode,
            "is_workflow": self.is_workflow,
            "created": self.created,
            "started": self.started,
            "finished": self.finished,
            "updated": self.updated,
            "progress": self.progress,
            "results": self.results,
            "exceptions": self.exceptions,
            "logs": self.logs,
            "tags": self.tags,
            "access": self.access,
            "context": self.context,
            "request": self.request,
            "response": self.response,
            "notification_email": self.notification_email,
            "accept_language": self.accept_language,
        }
class AuthenticationTypes(enum.Enum):
    """
    Known types of authentication definitions.
    """
    DOCKER = "docker"
class Authentication(Base):
    """
    Authentication details to store details required for process operations.
    """

    def __init__(self, auth_type, auth_scheme, auth_token, auth_link, **kwargs):
        # type: (Union[AuthenticationTypes, str], str, str, str, Any) -> None
        super(Authentication, self).__init__(**kwargs)
        # ensure values are provided and of valid format
        self.scheme = auth_scheme
        self.type = auth_type
        self.link = auth_link
        self.token = auth_token
        # generate an identifier only if none was provided with the keyword arguments
        self.setdefault("id", uuid.uuid4())

    @property
    def id(self):
        return dict.__getitem__(self, "id")

    @property
    def type(self):
        # type: () -> AuthenticationTypes
        return dict.__getitem__(self, "type")

    @type.setter
    def type(self, auth_type):
        # type: (Union[AuthenticationTypes, str]) -> None
        if not isinstance(auth_type, (str, AuthenticationTypes)):
            raise TypeError(f"Type 'AuthenticationTypes' or 'str' is required for '{type(self).__name__}.type', "
                            f"not '{type(auth_type)}'.")
        auth_type = AuthenticationTypes(auth_type)  # invalid raises ValueError
        self["type"] = auth_type

    @property
    def link(self):
        # type: () -> str
        # getter is required for the setter definition below and by 'params'
        return dict.__getitem__(self, "link")

    @link.setter
    def link(self, link):
        # type: (str) -> None
        if not isinstance(link, str):
            raise TypeError(f"Type 'str' is required for '{type(self).__name__}.link', not '{type(link)}'.")
        self["link"] = link

    @property
    def token(self):
        # type: () -> str
        return dict.__getitem__(self, "token")

    @token.setter
    def token(self, token):
        # type: (str) -> None
        if not isinstance(token, str):
            raise TypeError(f"Type 'str' is required for '{type(self).__name__}.token', not '{type(token)}'.")
        self["token"] = token

    @property
    def scheme(self):
        # type: () -> str
        return dict.__getitem__(self, "scheme")

    @scheme.setter
    def scheme(self, scheme):
        # type: (str) -> None
        if not isinstance(scheme, str):
            raise TypeError(f"Type 'str' is required for '{type(self).__name__}.scheme', not '{type(scheme)}'.")
        self["scheme"] = scheme

    def json(self):
        return None  # in case it bubbles up by error, never provide it as JSON

    def params(self):
        # type: () -> AnyParams
        """
        Obtain the fields that represent the authentication for storage.
        """
        return {
            "id": self.id,
            "type": self.type.value,
            "link": self.link,
            "token": self.token,
            "scheme": self.scheme
        }

    @classmethod
    def from_params(cls, **params):
        # type: (Any) -> AnyAuthentication
        """
        Obtains the specialized :class:`Authentication` using loaded parameters from :meth:`params`.

        :raises TypeError: if the authentication type is missing or not handled.
        :raises ValueError: if the authentication type is not a known :class:`AuthenticationTypes` value.
        """
        # stored fields are not prefixed, whereas the constructor expects 'auth_' prefixed arguments
        for param in list(params):
            if not param.startswith("auth_"):
                params[f"auth_{param}"] = params[param]
        auth_type = params.get("auth_type")
        if auth_type and AuthenticationTypes(auth_type) == AuthenticationTypes.DOCKER:
            return DockerAuthentication.from_params(**params)
        raise TypeError(f"Unknown authentication type: {auth_type!s}")
class DockerAuthentication(Authentication):
    """
    Authentication associated to a :term:`Docker` image to retrieve from a private registry given by the reference link.

    .. seealso::
        :ref:`app_pkg_docker`
    """
    # note:
    #   Below regex does not match *every* possible name, but rather ones that need authentication.
    #   Public DockerHub images for example do not require authentication, and are therefore not matched.
    # NOTE(review): 'DOCKER_LINK_REGEX' is used by '__init__' but its definition is not visible here,
    #   confirm that it is defined on this class.
    DOCKER_REGISTRY_DEFAULT_DOMAIN = "index.docker.io"
    DOCKER_REGISTRY_DEFAULT_URI = f"https://{DOCKER_REGISTRY_DEFAULT_DOMAIN}/v1/"  # DockerHub

    def __init__(self, auth_scheme, auth_token, docker_image_link, **kwargs):
        # type: (str, str, str, Any) -> None
        """
        Parses the image link to resolve the registry and image, and stores them with the authentication.

        :param auth_scheme: Authentication scheme (string).
        :param auth_token: Authentication token (string).
        :param docker_image_link: Reference of the image, matched against ``DOCKER_LINK_REGEX``.
        :param kwargs: Additional items forwarded to :class:`Authentication`.
        :raises ValueError: If the link does not match, has no image, or has an invalid registry specifier.
        """
        matches = re.match(self.DOCKER_LINK_REGEX, docker_image_link)
        if not matches:
            raise ValueError(f"Invalid Docker image link does not conform to expected format: [{docker_image_link}]")
        groups = matches.groupdict()
        LOGGER.debug("Parsed Docker image/registry link:\n%s", json.dumps(groups, indent=2))
        if not groups["image"]:
            raise ValueError(f"Invalid Docker image reference does not conform to image format: {docker_image_link}")
        # special case for DockerHub, since it is default, it is often omitted, but could be partially provided
        # swap the domain by the full URI in that case because that's what is expected when doing plain 'docker login'
        registry = groups["reg_domain"]
        image = groups["image"]
        if registry in [self.DOCKER_REGISTRY_DEFAULT_DOMAIN, "", None]:
            if not registry:
                LOGGER.debug("No registry specified for Docker image, using default DockerHub registry.")
            # when "URI" fragment was detected but is not a real URI (since 'reg_domain' empty), link is invalid
            # (i.e.: there is no URI repository, so nowhere to send Auth token since not default DockerHub)
            if groups["uri"] not in [self.DOCKER_REGISTRY_DEFAULT_URI, "", None]:
                registry = groups["uri"]
                raise ValueError(f"Invalid registry specifier detected but not a valid URI: [{registry}]")
            registry = self.DOCKER_REGISTRY_DEFAULT_URI
        # otherwise, resolve the possible confusion between nested URI/paths vs nested repository/project
        elif groups["reg_path"]:
            image = groups["reg_path"] + "/" + groups["image"]
        LOGGER.debug("Resolved Docker image/registry from link: [%s, %s]", registry, image)
        self["image"] = image
        self["registry"] = registry
        super(DockerAuthentication, self).__init__(
            AuthenticationTypes.DOCKER, auth_scheme, auth_token, auth_link=docker_image_link, **kwargs
        )

    @property
    def credentials(self):
        # type: () -> JSON
        """
        Generates the credentials to submit the login operation based on the authentication token and scheme.

        An empty mapping is returned when the scheme is not ``Basic`` or when the token cannot be decoded.
        """
        if self.scheme == "Basic":
            try:
                usr, pwd = decode_auth(self.token)
            # when token is invalid such as wrong encoding or missing ':', error is raised
            except ValueError:
                return {}
            return {"registry": self.registry, "username": usr, "password": pwd}  # nosec
        return {}

    @property
    def image(self):
        # type: () -> str
        """
        Obtains the image portion of the reference without repository prefix.
        """
        return dict.__getitem__(self, "image")

    @property
    def registry(self):
        # type: () -> str
        """
        Obtains the registry entry that must used for ``docker login <registry>``.
        """
        return dict.__getitem__(self, "registry")

    @property
    def docker(self):
        # type: () -> str
        """
        Obtains the full reference required when doing :term:`Docker` operations such as ``docker pull <reference>``.
        """
        return self.image if self.registry == self.DOCKER_REGISTRY_DEFAULT_URI else f"{self.registry}/{self.image}"

    @staticmethod
    def _split_tag(reference):
        # type: (str) -> Tuple[str, Optional[str]]
        """
        Splits a reference into its repository and tag parts.

        The text following the last ``:`` is only considered a tag when it contains no ``/``.
        Otherwise, that ``:`` belongs to the registry (e.g.: ``host:5000/image``) and there is no tag.
        """
        repository, separator, tag = reference.rpartition(":")
        if not separator or "/" in tag:
            return reference, None
        return repository, tag

    @property
    def repository(self):
        # type: () -> str
        """
        Obtains the full :term:`Docker` repository reference without any tag.
        """
        return self._split_tag(self.docker)[0]

    @property
    def tag(self):
        # type: () -> Optional[str]
        """
        Obtain the requested tag from the :term:`Docker` reference, or ``None`` when it has no tag.
        """
        return self._split_tag(self.docker)[1]

    def params(self):
        # type: () -> AnyParams
        """
        Obtains the base authentication fields extended with the resolved image and registry.
        """
        params = super(DockerAuthentication, self).params()
        params.update({"image": self.image, "registry": self.registry})
        return params

    @classmethod
    def from_params(cls, **params):
        # type: (Any) -> DockerAuthentication
        """
        Generate class with parameters directly skipping validation/parsing from initialization.

        .. warning::
            This should be reserved for self-manipulation only when resolving :class:`Authentication` type.
        """
        auth = Authentication(**params)
        object.__setattr__(auth, "__class__", DockerAuthentication)  # avoid setattr from inherited dict == insert key
        # flush anything irrelevant or duplicate data from passing around fields
        keys = list(auth.params())
        for key in list(auth):
            if key not in keys:
                del auth[key]
        return auth
class Process(Base):
    # pylint: disable=C0103,invalid-name
    """
    Dictionary that contains a process definition for db storage.

    It always has ``identifier`` (or ``id`` alias) and a ``package`` definition.
    Parameters can be accessed by key or attribute, and appropriate validators or default values will be applied.
    """

    def __init__(self, *args, **kwargs):
        """
        Initializes the process and validates that the identifier and package fields were provided.

        :raises TypeError: If neither ``id`` nor ``identifier`` is provided, or if ``package`` is missing.
        """
        super(Process, self).__init__(*args, **kwargs)
        # use both 'id' and 'identifier' to support any call (WPS and recurrent 'id')
        if "id" not in self and "identifier" not in self:
            raise TypeError("'id' OR 'identifier' is required")
        if "id" not in self:
            self["id"] = self.pop("identifier")
        if "package" not in self:
            raise TypeError("'package' is required")

    @property
    def id(self):
        # type: () -> str
        return dict.__getitem__(self, "id")

    @property
    def identifier(self):
        # type: () -> str
        return self.id

    @identifier.setter
    def identifier(self, value):
        # type: (str) -> None
        self["id"] = value

    @property
    def title(self):
        # type: () -> str
        return self.get("title", self.id)

    @property
    def abstract(self):
        # type: () -> str
        return self.get("abstract", "")

    @property
    def description(self):
        # OGC-API-Processes v1 field representation
        # bw-compat with existing processes that defined it as abstract
        return self.abstract or self.get("description", "")

    @property
    def keywords(self):
        # type: () -> List[str]
        # the process type is always part of the keywords
        keywords = self.setdefault("keywords", [])
        if self.type not in keywords:
            keywords.append(self.type)
            self["keywords"] = keywords
        return dict.__getitem__(self, "keywords")

    @property
    def metadata(self):
        # type: () -> List[str]
        return self.get("metadata", [])

    @property
    def version(self):
        # type: () -> Optional[str]
        return self.get("version")

    @property
    def inputs(self):
        # type: () -> Optional[List[Dict[str, JSON]]]
        """
        Inputs of the process following backward-compatible conversion of stored parameters.

        According to `OGC-API`, ``maxOccurs`` and ``minOccurs`` representations should be:
            - ``maxOccurs``: ``int`` or ``"unbounded"``
            - ``minOccurs``: ``int``

        And, ``mediaType`` should be in description as:
            - ``mediaType``: ``string``

        .. note::
            Because of pre-registered/deployed/retrieved remote processes, inputs are formatted in-line
            to respect valid OGC-API schema representation and apply any required correction transparently.
        """
        inputs = self.get("inputs")
        if inputs is not None:
            for input_ in inputs:
                input_formats = get_field(input_, "formats", search_variations=False, default=[])
                for fmt in input_formats:
                    mime_type = get_field(fmt, "mime_type", search_variations=True, pop_found=True)
                    if mime_type is not null:
                        fmt["mediaType"] = mime_type
                input_min = get_field(input_, "min_occurs", search_variations=True, pop_found=True, default=1)
                input_max = get_field(input_, "max_occurs", search_variations=True, pop_found=True, default=1)
                input_["minOccurs"] = int(input_min)
                input_["maxOccurs"] = int(input_max) if input_max != "unbounded" else input_max
                input_desc = get_field(input_, "abstract", search_variations=True, pop_found=True)
                if input_desc:
                    input_["description"] = input_desc
        return inputs

    @property
    def outputs(self):
        # type: () -> Optional[List[Dict[str, JSON]]]
        """
        Outputs of the process following backward-compatible conversion of stored parameters.

        According to `OGC-API`, ``mediaType`` should be in description as:
            - ``mediaType``: ``string``

        .. note::
            Because of pre-registered/deployed/retrieved remote processes, outputs are formatted in-line
            to respect valid OGC-API schema representation and apply any required correction transparently.
        """
        outputs = self.get("outputs", [])
        for output_ in outputs:
            output_formats = get_field(output_, "formats", search_variations=False, default=[])
            for fmt in output_formats:
                mime_type = get_field(fmt, "mime_type", pop_found=True, search_variations=True)
                if mime_type is not null:
                    fmt["mediaType"] = mime_type

            output_desc = get_field(output_, "abstract", search_variations=True, pop_found=True)
            if output_desc:
                output_["description"] = output_desc
        return outputs

    @property
    def jobControlOptions(self):  # noqa: N802
        # type: () -> List[str]
        jco = self.setdefault("jobControlOptions", [EXECUTE_CONTROL_OPTION_ASYNC])
        if not isinstance(jco, list):  # eg: None, bw-compat
            jco = [EXECUTE_CONTROL_OPTION_ASYNC]
        # drop unknown options, and fall back to the async option when none remain
        jco = [mode for mode in jco if mode in EXECUTE_CONTROL_OPTIONS]
        if len(jco) == 0:
            jco.append(EXECUTE_CONTROL_OPTION_ASYNC)
        self["jobControlOptions"] = jco
        return dict.__getitem__(self, "jobControlOptions")

    @property
    def outputTransmission(self):  # noqa: N802
        # type: () -> List[str]
        out = self.setdefault("outputTransmission", [EXECUTE_TRANSMISSION_MODE_REFERENCE])
        if not isinstance(out, list):  # eg: None, bw-compat
            out = [EXECUTE_TRANSMISSION_MODE_REFERENCE]
        # drop unknown modes, and fall back to the reference mode when none remain
        out = [mode for mode in out if mode in EXECUTE_TRANSMISSION_MODE_OPTIONS]
        if len(out) == 0:
            out.append(EXECUTE_TRANSMISSION_MODE_REFERENCE)
        self["outputTransmission"] = out
        return dict.__getitem__(self, "outputTransmission")

    @property
    def processDescriptionURL(self):  # noqa: N802
        # type: () -> Optional[str]
        return self.get("processDescriptionURL")

    @property
    def processEndpointWPS1(self):  # noqa: N802
        # type: () -> Optional[str]
        return self.get("processEndpointWPS1")

    @property
    def executeEndpoint(self):  # noqa: N802
        # type: () -> Optional[str]
        return self.get("executeEndpoint")

    @property
    def owsContext(self):  # noqa: N802
        # type: () -> Optional[JSON]
        return self.get("owsContext")

    # wps, workflow, etc.
    @property
    def type(self):
        # type: () -> str
        """
        Type of process amongst :mod:`weaver.processes.types` definitions.
        """
        return self.get("type", PROCESS_APPLICATION)

    @property
    def package(self):
        # type: () -> Optional[CWL]
        """
        Package CWL definition as JSON.
        """
        pkg = self.get("package")
        return self._decode(pkg) if isinstance(pkg, dict) else pkg

    @package.setter
    def package(self, pkg):
        # type: (Optional[CWL]) -> None
        self["package"] = self._decode(pkg) if isinstance(pkg, dict) else pkg

    @property
    def payload(self):
        # type: () -> JSON
        """
        Deployment specification as JSON body.
        """
        body = self.get("payload", dict())
        return self._decode(body) if isinstance(body, dict) else body

    @payload.setter
    def payload(self, body):
        # type: (JSON) -> None
        self["payload"] = self._decode(body) if isinstance(body, dict) else dict()

    # encode(->)/decode(<-) characters that cannot be in a key during save to db
    _character_codes = [("$", "\uFF04"), (".", "\uFF0E")]

    @staticmethod
    def _recursive_replace(pkg, index_from, index_to):
        # type: (JSON, int, int) -> JSON
        """
        Replaces the reserved characters of every key of the mapping, including nested mappings and lists of mappings.

        :param pkg: Mapping for which keys must be converted.
        :param index_from: Index in each ``_character_codes`` pair of the character to look for.
        :param index_to: Index in each ``_character_codes`` pair of the character to replace with.
        :returns: New mapping with converted keys. Nested items of ``pkg`` are updated in place.
        """
        new = {}
        for k in pkg:
            # find modified key with replace matches
            # replacements accumulate on the new key, such that a key containing
            # many distinct reserved characters has all of them converted
            c_k = k
            for c in Process._character_codes:
                c_f = c[index_from]
                c_t = c[index_to]
                if c_f in c_k:
                    c_k = c_k.replace(c_f, c_t)
            # process recursive sub-items
            if isinstance(pkg[k], dict):
                pkg[k] = Process._recursive_replace(pkg[k], index_from, index_to)
            if isinstance(pkg[k], list):
                for i, pkg_i in enumerate(pkg[k]):
                    if isinstance(pkg_i, dict):
                        pkg[k][i] = Process._recursive_replace(pkg[k][i], index_from, index_to)
            # apply new key to obtained sub-items with replaced keys as needed
            new[c_k] = pkg[k]   # note: cannot use pop when using pkg keys iterator (python 3)
        return new

    @staticmethod
    def _encode(obj):
        # type: (Optional[JSON]) -> Optional[JSON]
        if obj is None:
            return None
        return Process._recursive_replace(obj, 0, 1)

    @staticmethod
    def _decode(obj):
        # type: (Optional[JSON]) -> Optional[JSON]
        if obj is None:
            return None
        return Process._recursive_replace(obj, 1, 0)

    @property
    def visibility(self):
        # type: () -> str
        return self.get("visibility", VISIBILITY_PRIVATE)

    @visibility.setter
    def visibility(self, visibility):
        # type: (str) -> None
        if not isinstance(visibility, str):
            raise TypeError("Type 'str' is required for '{}.visibility'".format(type(self)))
        if visibility not in VISIBILITY_VALUES:
            raise ValueError("Status '{0}' is not valid for '{1}.visibility, must be one of {2!s}'"
                             .format(visibility, type(self), list(VISIBILITY_VALUES)))
        self["visibility"] = visibility

    @property
    def auth(self):
        # type: () -> Optional[AnyAuthentication]
        """
        Authentication token required for operations with the process.
        """
        auth = self.get("auth", None)
        if isinstance(auth, Authentication):
            return auth
        if isinstance(auth, dict):
            auth = Authentication.from_params(**auth)
            self["auth"] = auth  # store for later reference without reprocess
            return auth
        return None

    @auth.setter
    def auth(self, auth):
        # type: (Optional[AnyAuthentication]) -> None
        if auth is None:
            return
        if isinstance(auth, dict):
            auth = Authentication(**auth)
        if not isinstance(auth, Authentication):
            raise TypeError(f"Type 'Authentication' is required for '{type(self)}.auth', not '{type(auth)}'.")
        self["auth"] = auth

    def params(self):
        # type: () -> AnyParams
        """
        Obtains the fields of the process as a dictionary, with encoded keys for the package and payload.
        """
        return {
            "identifier": self.identifier,
            "title": self.title,
            "abstract": self.abstract,
            "keywords": self.keywords,
            "metadata": self.metadata,
            "version": self.version,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "jobControlOptions": self.jobControlOptions,
            "outputTransmission": self.outputTransmission,
            "processEndpointWPS1": self.processEndpointWPS1,
            "processDescriptionURL": self.processDescriptionURL,
            "executeEndpoint": self.executeEndpoint,
            "owsContext": self.owsContext,
            "type": self.type,
            "package": self._encode(self.package),
            "payload": self._encode(self.payload),
            "visibility": self.visibility,
            "auth": self.auth.params() if self.auth else None
        }

    @property
    def params_wps(self):
        # type: () -> AnyParams
        """
        Values applicable to create an instance of :class:`pywps.app.Process`.
        """
        return {
            "identifier": self.identifier,
            "title": self.title,
            "abstract": self.abstract,
            "keywords": self.keywords,
            "metadata": self.metadata,
            "version": self.version,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "package": self.package,
            "payload": self.payload,
        }

    def dict(self):
        """
        Obtains the dictionary representation of the process without its authentication details.
        """
        data = super(Process, self).dict()
        data.pop("auth", None)  # remove preemptively just in case any deserialize fails to drop it
        return data

    def json(self):
        # type: () -> JSON
        """
        Obtains the JSON serializable complete representation of the process.
        """
        return sd.Process().deserialize(self.dict())

    def offering(self, schema="OGC"):
        # type: (str) -> JSON
        """
        Obtains the JSON serializable offering/description representation of the process.

        :param schema:
            One of values defined by :class:`sd.ProcessDescriptionSchemaQuery` to select which
            process description representation to generate (see each schema for details).

        .. note::
            Property name ``offering`` is employed to differentiate from the string process ``description`` field.
            The result of this JSON representation is still the ``ProcessDescription`` schema.
        """
        process = self.dict()
        links = self.links()
        # force selection of schema to avoid ambiguity
        if str(schema or "OGC").upper() == "OLD":
            # nested process fields + I/O as lists
            process.update({"process": dict(process)})
            process.update(links)
            return sd.ProcessDescriptionOLD().deserialize(process)
        # direct process + I/O as mappings
        for io_type in ["inputs", "outputs"]:
            process[io_type] = {
                get_field(io_def, "identifier", search_variations=True, pop_found=True): io_def
                for io_def in process[io_type]
            }
        process.update(links)
        return sd.ProcessDescriptionOGC().deserialize(process)

    def summary(self):
        # type: () -> JSON
        """
        Obtains the JSON serializable summary representation of the process.
        """
        return sd.ProcessSummary().deserialize(self.dict())

    @staticmethod
    def from_wps(wps_process, **extra_params):
        # type: (ProcessWPS, Any) -> Process
        """
        Converts a :mod:`pywps` Process into a :class:`weaver.datatype.Process` using provided parameters.
        """
        assert isinstance(wps_process, ProcessWPS)
        process = wps_process.json
        # the identifier is employed as type when the WPS process does not define one
        process_type = getattr(wps_process, "type", wps_process.identifier)
        process.update({"type": process_type, "package": None, "reference": None,
                        "inputs": [wps2json_io(i) for i in wps_process.inputs],
                        "outputs": [wps2json_io(o) for o in wps_process.outputs]})
        process.update(**extra_params)
        return Process(process)

    @staticmethod
    def from_ows(process, service, container, **kwargs):
        # type: (ProcessOWS, Service, AnySettingsContainer, Any) -> Process
        """
        Converts a :mod:`owslib.wps` Process to local storage :class:`weaver.datatype.Process`.
        """
        assert isinstance(process, ProcessOWS)
        wps_xml_url = get_wps_url(container)
        wps_api_url = get_wps_restapi_base_url(container)
        svc_name = None
        if not service or wps_api_url == service.url:
            # local weaver process, using WPS-XML endpoint
            remote_service_url = wps_xml_url
            local_provider_url = wps_api_url
            svc_provider_name = "Weaver"
        else:
            svc_name = service.get("name")  # can be a custom ID or identical to provider name
            remote_service_url = service.url
            local_provider_url = "{}/providers/{}".format(wps_api_url, svc_name)
            svc_provider_name = service.wps().provider.name
        describe_process_url = "{}/processes/{}".format(local_provider_url, process.identifier)
        execute_process_url = "{}/jobs".format(describe_process_url)
        package, info = ows2json(process, svc_name, remote_service_url, svc_provider_name)
        wps_description_url = "{}?service=WPS&request=DescribeProcess&version=1.0.0&identifier={}".format(
            remote_service_url, process.identifier
        )
        kwargs.update({  # parameters that must be enforced to find service
            "url": describe_process_url,
            "executeEndpoint": execute_process_url,
            "processEndpointWPS1": wps_description_url,
            "processDescriptionURL": describe_process_url,
            "type": PROCESS_WPS_REMOTE,
            "package": package,
            "service": svc_name
        })
        return Process(**info, **kwargs)

    @property
    def service(self):
        # type: () -> Optional[str]
        """
        Name of the parent service provider under which this process resides.

        .. seealso::
            - :meth:`Service.processes`
            - :meth:`Process.convert`
        """
        return self.get("service", None)

    @service.setter
    def service(self, service):
        # type: (Optional[str]) -> None
        if not (isinstance(service, str) or service is None):
            raise TypeError("Type 'str' is required for '{}.service'".format(type(self)))
        self["service"] = service

    @staticmethod
    def convert(process, service=None, container=None, **kwargs):
        # type: (AnyProcess, Optional[Service], Optional[AnySettingsContainer], Any) -> Process
        """
        Converts known process equivalents definitions into the formal datatype employed by Weaver.

        :raises TypeError: If the type of the provided process is not one of the supported ones.
        """
        if isinstance(process, ProcessOWS):
            return Process.from_ows(process, service, container, **kwargs)
        if isinstance(process, ProcessWPS):
            return Process.from_wps(process, **kwargs)
        if isinstance(process, dict):
            return Process(process, **kwargs)
        if isinstance(process, Process):
            return process
        raise TypeError("Unknown process type to convert: [{}]".format(type(process)))

    def wps(self):
        # type: () -> ProcessWPS
        """
        Converts this :class:`Process` to a corresponding format understood by :mod:`pywps`.

        :raises ProcessInstanceError: If no implementation is mapped to the type (or identifier) of the process.
        """
        # import here to avoid circular import errors
        from weaver.processes.wps_default import HelloWPS
        from weaver.processes.wps_package import WpsPackage
        from weaver.processes.wps_testing import WpsTestProcess

        process_map = {
            HelloWPS.identifier: HelloWPS,
            PROCESS_TEST: WpsTestProcess,
            PROCESS_APPLICATION: WpsPackage,  # single CWL package
            PROCESS_BUILTIN: WpsPackage,      # local scripts
            PROCESS_WPS_REMOTE: WpsPackage,   # remote WPS
            PROCESS_WORKFLOW: WpsPackage,     # chaining of CWL packages
        }

        # local WPS processes are mapped by their identifier rather than their type
        process_key = self.type
        if self.type == PROCESS_WPS_LOCAL:
            process_key = self.identifier
        if process_key not in process_map:
            # the error must be raised explicitly, otherwise the lookup below fails with an unrelated 'KeyError'
            raise ProcessInstanceError("Unknown process '{}' in mapping.".format(process_key))
        return process_map[process_key](**self.params_wps)
class Quote(Base):
    """
    Dictionary that contains quote information.

    It always has ``id`` and ``process`` keys.
    """
    # pylint: disable=C0103,invalid-name

    def __init__(self, *args, **kwargs):
        """
        Validates the required fields, then fills the default dates and identifier when not provided.

        :raises TypeError: If one of ``process``, ``user``, ``price`` or ``currency`` is missing.
        :raises ValueError: If a field has an invalid type or format, or if a date cannot be parsed.
        """
        super(Quote, self).__init__(*args, **kwargs)

        def _is_text(value):
            return isinstance(value, str)

        def _is_float(value):
            return isinstance(value, float)

        def _is_currency(value):
            return isinstance(value, str) and len(value) == 3

        # required fields, validated in order: (name, error if missing, validator, error if invalid)
        required_fields = [
            ("process", "Field 'Quote.process' is required",
             _is_text, "Field 'Quote.process' must be a string."),
            ("user", "Field 'Quote.user' is required",
             _is_text, "Field 'Quote.user' must be a string."),
            ("price", "Field 'Quote.price' is required",
             _is_float, "Field 'Quote.price' must be a float number."),
            ("currency", "Field 'Quote.currency' is required",
             _is_currency, "Field 'Quote.currency' must be an ISO-4217 currency string code."),
        ]
        for name, missing_error, is_valid, invalid_error in required_fields:
            if name not in self:
                raise TypeError(missing_error)
            if not is_valid(self.get(name)):
                raise ValueError(invalid_error)

        # date fields, stored in ISO format: (name, default value generator, error if invalid)
        date_fields = [
            ("created", now,
             "Field 'Quote.created' must be an ISO-8601 datetime string."),
            ("expire", lambda: now() + timedelta(days=1),
             "Field 'Quote.expire' must be an ISO-8601 datetime string."),
        ]
        for name, make_default, invalid_error in date_fields:
            if name not in self:
                self[name] = make_default()
            try:
                self[name] = dt_parse(str(self.get(name))).isoformat()
            except ValueError:
                raise ValueError(invalid_error)

        if "id" not in self:
            self["id"] = str(uuid.uuid4())

    @property
    def id(self):
        """
        Identifier of the quote.
        """
        return dict.__getitem__(self, "id")

    @property
    def title(self):
        """
        Title of the quote.
        """
        return self.get("title")

    @property
    def description(self):
        """
        Description of the quote.
        """
        return self.get("description")

    @property
    def details(self):
        """
        Details of the quote.
        """
        return self.get("details")

    @property
    def user(self):
        """
        Identifier of the user that requested the quote.
        """
        return dict.__getitem__(self, "user")

    @property
    def process(self):
        """
        Identifier of the WPS process.
        """
        return dict.__getitem__(self, "process")

    @property
    def estimatedTime(self):  # noqa: N802
        """
        Estimated time of the process.
        """
        return self.get("estimatedTime")

    @property
    def processParameters(self):  # noqa: N802
        """
        Execution parameters of the process for the quote.
        """
        return self.get("processParameters")

    @property
    def location(self):
        """
        URL of the WPS process (empty string by default).
        """
        return self.get("location", "")

    @property
    def price(self):
        """
        Price of the quote (``0.0`` by default).
        """
        return self.get("price", 0.0)

    @property
    def currency(self):
        """
        Currency of the price of the quote.
        """
        return self.get("currency")

    @property
    def expire(self):
        """
        Expiration datetime of the quote.
        """
        return self.get("expire")

    @property
    def created(self):
        """
        Creation datetime of the quote.
        """
        return self.get("created")

    @property
    def steps(self):
        """
        Identifiers of the sub-quotes, if applicable (empty list by default).
        """
        return self.get("steps", [])

    def params(self):
        # type: () -> AnyParams
        """
        Obtains the fields of the quote as a dictionary, each key being named as the property it is read from.
        """
        property_names = (
            "id",
            "price",
            "currency",
            "user",
            "process",
            "location",
            "steps",
            "title",
            "description",
            "details",
            "created",
            "expire",
            "estimatedTime",
            "processParameters",
        )
        return {name: getattr(self, name) for name in property_names}

    def json(self):
        # type: () -> JSON
        """
        Obtains the representation of the quote deserialized with ``sd.QuoteSchema``.
        """
        return sd.QuoteSchema().deserialize(self)
class Bill(Base):
    """
    Dictionary that contains bill information.

    It always has ``id``, ``user``, ``quote`` and ``job`` keys.
    """

    def __init__(self, *args, **kwargs):
        """
        Validates the required fields, then fills the creation date and identifier when not provided.

        :raises TypeError: If one of ``quote``, ``job``, ``user``, ``price`` or ``currency`` is missing.
        :raises ValueError: If a field has an invalid type or format, or if the creation date cannot be parsed.
        """
        super(Bill, self).__init__(*args, **kwargs)

        def _is_text(value):
            return isinstance(value, str)

        def _is_float(value):
            return isinstance(value, float)

        def _is_currency(value):
            return isinstance(value, str) and len(value) == 3

        # required fields, validated in order: (name, error if missing, validator, error if invalid)
        required_fields = [
            ("quote", "Field 'Bill.quote' is required",
             _is_text, "Field 'Bill.quote' must be a string."),
            ("job", "Field 'Bill.job' is required",
             _is_text, "Field 'Bill.job' must be a string."),
            ("user", "Field 'Bill.user' is required",
             _is_text, "Field 'Bill.user' must be a string."),
            ("price", "Field 'Bill.price' is required",
             _is_float, "Field 'Bill.price' must be a float number."),
            ("currency", "Field 'Bill.currency' is required",
             _is_currency, "Field 'Bill.currency' must be an ISO-4217 currency string code."),
        ]
        for name, missing_error, is_valid, invalid_error in required_fields:
            if name not in self:
                raise TypeError(missing_error)
            if not is_valid(self.get(name)):
                raise ValueError(invalid_error)

        # creation date is stored in ISO format, using the current time when not provided
        if "created" not in self:
            self["created"] = now()
        try:
            self["created"] = dt_parse(str(self.get("created"))).isoformat()
        except ValueError:
            raise ValueError("Field 'Bill.created' must be an ISO-8601 datetime string.")

        if "id" not in self:
            self["id"] = str(uuid.uuid4())

    @property
    def id(self):
        """
        Identifier of the bill.
        """
        return dict.__getitem__(self, "id")

    @property
    def user(self):
        """
        Identifier of the user.
        """
        return dict.__getitem__(self, "user")

    @property
    def quote(self):
        """
        Identifier of the quote.
        """
        return dict.__getitem__(self, "quote")

    @property
    def job(self):
        """
        Identifier of the job.
        """
        return dict.__getitem__(self, "job")

    @property
    def price(self):
        """
        Price stored in the bill (``0.0`` by default).
        """
        return self.get("price", 0.0)

    @property
    def currency(self):
        """
        Currency of the price stored in the bill.
        """
        return self.get("currency")

    @property
    def created(self):
        """
        Creation datetime stored in the bill.
        """
        return self.get("created")

    @property
    def title(self):
        """
        Title stored in the bill.
        """
        return self.get("title")

    @property
    def description(self):
        """
        Description stored in the bill.
        """
        return self.get("description")

    def params(self):
        # type: () -> AnyParams
        """
        Obtains the fields of the bill as a dictionary, each key being named as the property it is read from.
        """
        property_names = (
            "id",
            "user",
            "quote",
            "job",
            "price",
            "currency",
            "created",
            "title",
            "description",
        )
        return {name: getattr(self, name) for name in property_names}

    def json(self):
        # type: () -> JSON
        """
        Obtains the representation of the bill deserialized with ``sd.BillSchema``.
        """
        return sd.BillSchema().deserialize(self)