Source code for weaver.wps_restapi.processes.utils

import logging
import math
from typing import TYPE_CHECKING

import colander
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.settings import asbool

from weaver.compat import InvalidVersion
from weaver.config import WeaverFeature, get_weaver_configuration
from weaver.database import get_db
from weaver.formats import ContentType
from weaver.store.base import StoreProcesses
from weaver.utils import get_path_kvp, get_settings, get_weaver_url
from weaver.visibility import Visibility
from weaver.wps_restapi import swagger_definitions as sd

if TYPE_CHECKING:
    from typing import Dict, List, Optional, Tuple

    from weaver.datatype import Process, Service
    from weaver.typedefs import JSON, PyramidRequest

# Module-level logger named after this module (``__name__``); used below to report invalid processes.
[docs] LOGGER = logging.getLogger(__name__)
def resolve_process_tag(request, process_query=False):
    # type: (PyramidRequest, bool) -> str
    """
    Obtain the tagged :term:`Process` reference from request path and/or query according to available information.

    Whether the :term:`Process` is specified by path or query, another ``version`` query can be provided to specify
    the desired revision by itself. This ``version`` query is considered only if another version indication is not
    already specified in the :term:`Process` reference using the tagged semantic.

    When ``process_query = False``, possible combinations are as follows:

    - ``/processes/{processID}:{version}``
    - ``/processes/{processID}?version={version}``

    When ``process_query = True``, possible combinations are as follows:

    - ``/...?process={processID}:{version}``
    - ``/...?process={processID}&version={version}``

    :param request: Request from which to retrieve the process reference.
    :param process_query:
        When enabled, the process reference is read from the ``process`` query parameter.
        Otherwise, it is read from the ``process_id`` path variable of the matched route.
    :returns: Process reference as deserialized by the ``ProcessIdentifierTag`` schema.
    """
    if process_query:
        process_id = request.params.get("process")
    else:
        process_id = request.matchdict.get("process_id", "")
    params = sd.LocalProcessQuery().deserialize(request.params)
    version = params.get("version")
    # A missing 'process' query leaves the reference as 'None'. Checking for the ':' separator on it would
    # fail with 'TypeError', so the reference is passed unchanged to the schema validation below in that case.
    # When the reference is already tagged (contains ':'), it has priority over the 'version' query.
    if version and process_id is not None and ":" not in process_id:
        process_id = f"{process_id}:{version}"
    return sd.ProcessIdentifierTag(name="ProcessID").deserialize(process_id)
def get_processes_filtered_by_valid_schemas(request, detail=True):
    # type: (PyramidRequest, bool) -> Tuple[List[JSON], List[str], Dict[str, Optional[int]], bool, int]
    """
    List the public processes and split them according to whether their summary could be generated.

    Paging/sorting and revision queries are deserialized from the request and forwarded to the process store.
    Each returned process is then summarized. Any process whose summary fails is reported by ID instead.

    :param request: Request providing the settings, the query parameters and the database access.
    :param detail: Links are included in summaries only if this is enabled and the ``links`` query is true.
    :returns:
        Tuple of the valid process summaries, the invalid process IDs for manual cleanup, the deserialized
        paging parameters, whether the providers listing was requested, and the total reported by the store.
    :raises HTTPBadRequest: If the providers listing is requested with any non-default paging/sorting parameter.
    """
    settings = get_settings(request)

    # the 'providers' query is only considered by configurations that are part of the remote features
    if get_weaver_configuration(settings) in WeaverFeature.REMOTE:
        with_providers = asbool(request.params.get("providers", False))
    else:
        with_providers = False

    revisions_param = sd.ProcessRevisionsQuery(unknown="ignore").deserialize(request.params)
    with_revisions = revisions_param.get("revisions")
    with_links = detail if asbool(request.params.get("links", False)) else False

    paging_schema = sd.ProcessPagingQuery()
    paging_defaults = dict((child.name, child.default) for child in paging_schema.children)
    paging_param = paging_schema.deserialize(request.params)

    if with_providers:
        # the full providers listing cannot be paged, so any explicit (non-default) paging value is refused
        has_custom_paging = any(paging_defaults[name] != value for name, value in paging_param.items())
        if has_custom_paging:
            paging_names = set(paging_defaults)
            raise HTTPBadRequest(json={
                "description": "Cannot combine paging/sorting parameters with providers full listing query.",
                "error": "ListingInvalidParameter",
                "value": list(paging_names.intersection(request.params))
            })

    process_store = get_db(request).get_store(StoreProcesses)
    found_processes, total_local_processes = process_store.list_processes(
        visibility=Visibility.PUBLIC,
        total=True,
        **revisions_param,
        **paging_param
    )

    valid_summaries = []
    invalid_ids = []
    for process in found_processes:  # type: Process
        try:
            try:
                summary = process.summary(revision=with_revisions, links=with_links, container=request)
            except (InvalidVersion, ValueError) as exc:
                # normalize version/value errors into a schema error handled by the outer clause
                raise colander.Invalid(sd.ProcessSummary, value=None, msg=str(exc))
        except colander.Invalid as invalid:
            process_ref = process.tag if with_revisions else process.identifier
            LOGGER.debug("Invalid process [%s] because:\n%s", process_ref, invalid)
            invalid_ids.append(process.identifier)
        else:
            valid_summaries.append(summary)

    return valid_summaries, invalid_ids, paging_param, with_providers, total_local_processes