"""
Conversion functions between corresponding data structures.
"""
import copy
import inspect
import json
import logging
import os
from collections import OrderedDict
from collections.abc import Hashable
from copy import deepcopy
from dataclasses import dataclass
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, overload
from urllib.parse import unquote, urlparse
import colander
import pint
from dateutil import parser as date_parser
from owslib.wps import ComplexData, Metadata as OWS_Metadata, is_reference
from pywps import OGCUNIT, Process as ProcessWPS
from pywps.app.Common import Metadata as WPS_Metadata
from pywps.inout import BoundingBoxInput, BoundingBoxOutput, ComplexInput, ComplexOutput, LiteralInput, LiteralOutput
from pywps.inout.basic import UOM, BasicBoundingBox, BasicComplex, BasicIO
from pywps.inout.formats import Format
from pywps.inout.literaltypes import ALLOWEDVALUETYPE, LITERAL_DATA_TYPES, RANGECLOSURETYPE, AllowedValue, AnyValue
from pywps.validator.mode import MODE
from weaver import xml_util
from weaver.exceptions import PackageTypeError
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import (
DEFAULT_FORMAT,
DEFAULT_FORMAT_MISSING,
ContentType,
SchemaRole,
get_content_type,
get_cwl_file_format,
get_extension,
get_format,
repr_json
)
from weaver.processes.constants import (
CWL_NAMESPACES_REVERSED,
CWL_REQUIREMENT_APP_OGC_API,
CWL_REQUIREMENT_APP_WPS1,
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
IO_INPUT,
IO_OUTPUT,
OAS_ARRAY_TYPES,
OAS_COMPLEX_TYPES,
OAS_KEYWORD_TYPES,
OAS_LITERAL_BINARY_FORMATS,
OAS_LITERAL_DATETIME_FORMATS,
OAS_LITERAL_FLOAT_FORMATS,
OAS_LITERAL_INTEGER_FORMATS,
OAS_LITERAL_NUMERIC,
OAS_LITERAL_NUMERIC_FORMATS,
OAS_LITERAL_STRING_FORMATS,
OAS_LITERAL_TYPES,
PACKAGE_ARRAY_BASE,
PACKAGE_ARRAY_ITEMS,
PACKAGE_ARRAY_MAX_SIZE,
PACKAGE_ARRAY_TYPES,
PACKAGE_BASIC_TYPES,
PACKAGE_COMPLEX_TYPES,
PACKAGE_CUSTOM_TYPES,
PACKAGE_DIRECTORY_TYPE,
PACKAGE_ENUM_BASE,
PACKAGE_FILE_TYPE,
PACKAGE_FLOATING_TYPES,
PACKAGE_INTEGER_TYPES,
PACKAGE_LITERAL_TYPES,
PACKAGE_NUMERIC_TYPES,
WPS_BOUNDINGBOX,
WPS_BOUNDINGBOX_DATA,
WPS_COMPLEX,
WPS_COMPLEX_DATA,
WPS_COMPLEX_TYPES,
WPS_DATA_TYPES,
WPS_LITERAL,
WPS_LITERAL_DATA_BOOLEAN,
WPS_LITERAL_DATA_DATETIME,
WPS_LITERAL_DATA_FLOAT,
WPS_LITERAL_DATA_INTEGER,
WPS_LITERAL_DATA_STRING,
WPS_LITERAL_DATA_TYPES,
WPS_REFERENCE,
IO_Select_Type,
JobInputsOutputsSchema,
ProcessSchema,
WPS_CategoryType
)
from weaver.utils import (
HashDict,
SchemaRefResolver,
bytes2str,
fetch_file,
fully_qualified_name,
get_any_id,
get_any_value,
get_sane_name,
get_url_without_query,
null,
str2bytes,
transform_json
)
from weaver.wps.utils import get_wps_client
from weaver.wps_restapi import swagger_definitions as sd
if TYPE_CHECKING:
    # Everything in this branch is evaluated only by static type checkers (it is skipped at runtime).
    # The names defined here are the ones referenced by the '# type:' comments found in this module.
    from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union
    from typing_extensions import Literal, NotRequired, Required, TypedDict
    from urllib.parse import ParseResult

    from owslib.ows import BoundingBox
    from owslib.wps import (
        BoundingBoxDataInput,
        ComplexDataInput,
        Input as OWS_Input_Base,
        Output as OWS_Output_Base,
        Process as ProcessOWS
    )
    from pint._typing import Scalar
    from pywps.app import WPSRequest
    from requests.models import Response

    from weaver.processes.constants import (
        JobInputsOutputsSchemaAnyOGCType,
        JobInputsOutputsSchemaAnyOLDType,
        JobInputsOutputsSchemaType,
        ProcessSchemaType,
        WPS_DataType,
        WPS_LiteralData_Type
    )
    from weaver.typedefs import (
        AnySettingsContainer,
        AnyValueType,
        CWL,
        CWL_IO_ComplexType,
        CWL_IO_DataType,
        CWL_IO_EnumSymbols,
        CWL_IO_FileValue,
        CWL_IO_LiteralType,
        CWL_IO_Type,
        CWL_IO_ValueMap,
        CWL_SchemaNames,
        ExecutionInputs,
        ExecutionInputsList,
        ExecutionInputsMap,
        ExecutionOutputs,
        ExecutionOutputsList,
        ExecutionOutputsMap,
        JobValueFile,
        JobValueItem,
        JSON,
        OpenAPISchema,
        OpenAPISchemaArray,
        OpenAPISchemaKeyword,
        OpenAPISchemaObject,
        OpenAPISchemaProperty,
        OpenAPISchemaReference
    )

    # typing shortcuts
    # pylint: disable=C0103,invalid-name

    # PyWPS I/O objects (input alias is required by 'WPS_IO_Type' right below)
    WPS_Input_Type = Union[LiteralInput, ComplexInput, BoundingBoxInput]
    WPS_Output_Type = Union[LiteralOutput, ComplexOutput, BoundingBoxOutput]
    WPS_IO_Type = Union[WPS_Input_Type, WPS_Output_Type]

    # OWSLib I/O objects
    OWS_Input_Type = Union[OWS_Input_Base, BoundingBox, BoundingBoxDataInput, ComplexData, ComplexDataInput]
    OWS_Output_Type = Union[OWS_Output_Base, BoundingBox, ComplexData]
    OWS_IO_Type = Union[OWS_Input_Type, OWS_Output_Type]

    # JSON representation of a format definition
    JSON_Format = TypedDict("JSON_Format", {
        "mime_type": Required[str],
        "encoding": NotRequired[Optional[str]],
        "schema": NotRequired[Optional[str]],
        "maximumMegabytes": NotRequired[Optional[int]],
        "default": NotRequired[bool],
    }, total=True)

    # JSON representation of a unit of measure
    JSON_UoM = TypedDict("JSON_UoM", {
        "uom": str,
        "reference": NotRequired[str],
    }, total=True)

    # JSON representation of an I/O definition
    JSON_IO_TypeBase = TypedDict("JSON_IO_TypeBase", {
        "id": Required[str],
        "type": Required[str],
        "identifier": NotRequired[str],
    }, total=False)
    JSON_IO_Type = Union[JSON, JSON_IO_TypeBase]
    JSON_IO_TypedInfo = TypedDict("JSON_IO_TypedInfo", {
        "type": WPS_DataType,
        "data_type": NotRequired[Optional[str]],
        "data_uom": NotRequired[Optional[bool]],
        "minOccurs": NotRequired[int],
        "maxOccurs": NotRequired[int],
        "supported_formats": NotRequired[List[JSON_Format]],
        "uom": NotRequired[JSON_UoM],
        "uoms": NotRequired[List[JSON_UoM]],
    }, total=False)
    JSON_IO_ListOrMap = Union[List[JSON], Dict[str, Union[JSON, str]]]

    # combinations of the above across libraries
    PKG_IO_Type = Union[JSON_IO_Type, WPS_IO_Type]
    ANY_IO_Type = Union[CWL_IO_Type, JSON_IO_Type, WPS_IO_Type, OWS_IO_Type]
    ANY_Format_Type = Union[Dict[str, Optional[str]], Format]
    ANY_Metadata_Type = Union[OWS_Metadata, WPS_Metadata, Dict[str, str]]

    DataInputType = TypedDict("DataInputType", {
        "data": Union[float, int, bool, str],
        "format": NotRequired[JSON_Format],
        # ** any other params
    }, total=False)
# WPS object attribute -> all possible *other* naming variations (no need to repeat key name)
[docs]
# Naming variations of I/O fields.
#   - key:   name under which 'ows2json_io' stores a value once it is found (it iterates over these keys)
#   - value: alternative spellings of that same field
# NOTE(review): the 'data_type' spellings are listed under both 'data_type' and 'type';
#               presumably the lookup order decides which one wins - confirm against 'get_field'.
WPS_FIELD_MAPPING = {
    # general description
    "identifier": ["id", "ID", "Id", "Identifier"],
    "title": ["Title", "Label", "label"],
    "abstract": ["description", "Description", "Abstract"],
    "version": ["processVersion", "Version"],
    "metadata": ["Metadata"],
    "keywords": ["Keywords"],
    # value constraints and defaults
    "allowed_values": ["AllowedValues", "allowedValues", "allowedvalues", "Allowed_Values", "Allowedvalues"],
    "allowed_collections": ["AllowedCollections", "allowedCollections", "allowedcollections", "Allowed_Collections",
                            "Allowedcollections"],
    "any_value": ["anyvalue", "anyValue", "AnyValue"],
    "literal_data_domains": ["literalDataDomains"],
    "default": ["default_value", "defaultValue", "DefaultValue", "Default", "_default", "data_format", "data"],
    "supported_values": ["SupportedValues", "supportedValues", "supportedvalues", "Supported_Values"],
    "supported_formats": ["SupportedFormats", "supportedFormats", "supportedformats", "Supported_Formats", "formats"],
    "supported_crs": ["SupportedCRS", "supportedCRS", "crss", "crs", "CRS"],
    "additional_parameters": ["AdditionalParameters", "additionalParameters", "additionalparameters",
                              "Additional_Parameters"],
    # typing and cardinality
    "data_type": ["data_type", "dataType", "DataType", "Data_Type"],
    "type": ["Type", "data_type", "dataType", "DataType", "Data_Type"],
    "min_occurs": ["minOccurs", "MinOccurs", "Min_Occurs", "minoccurs"],
    "max_occurs": ["maxOccurs", "MaxOccurs", "Max_Occurs", "maxoccurs"],
    "max_megabytes": ["maximumMegabytes", "max_size"],
    # format details
    "mime_type": ["mimeType", "MimeType", "mime-type", "Mime-Type", "mimetype",
                  "mediaType", "MediaType", "media-type", "Media-Type", "mediatype",
                  "content_type", "contentMediaType"],
    # allowed range details
    "range_minimum": ["minval", "minimum", "minimumValue"],
    "range_maximum": ["maxval", "maximum", "maximumValue"],
    "range_spacing": ["spacing"],
    "range_closure": ["closure", "rangeClosure"],
    # more format details
    "encoding": ["Encoding", "content_encoding", "contentEncoding"],
    "schema": ["Schema", "contentSchema"],
    "href": ["url", "link", "reference"],
    # units of measure
    "uom": ["UoM", "unit", "default_uom"],
    "uoms": ["UOMs", "units", "supported_uoms"],
    "measure": ["value", "measurement"],
}
# WPS fields that contain a structure corresponding to `Format` object
#   - keys must match `WPS_FIELD_MAPPING` keys
#   - fields are placed in order of relevance (prefer explicit format, then supported, and defaults as last resort)
# Iterated in that order by `ows2json_io` and `_convert_any2cwl_io_complex` to locate format definitions of an I/O.
WPS_FIELD_FORMAT = ["formats", "supported_formats", "supported_values", "default"]
# setup unit registry and aliases
[docs]
# Single unit registry shared by the unit conversion helpers of this module
# ('convert_unit' creates its units from it, 'convert_value_units' its quantities).
UNIT_REGISTRY = pint.UnitRegistry()
UNIT_REGISTRY.default_format = "~P"  # short-form symbols, pretty-format units
[docs]
# Reverse lookup of 'OGCUNIT': every URN is associated with all the shorthand names that refer to it.
UNIT_OGC_REVERSED = {}  # type: Dict[str, List[str]]  # {URN: [shorthand-names]}
for plain_unit, ogc_unit in OGCUNIT.items():
    # create the list on first encounter of the URN, then accumulate the names in their iteration order
    UNIT_OGC_REVERSED.setdefault(ogc_unit, []).append(plain_unit)
[docs]
# Logger of this module, named after its import path.
LOGGER = logging.getLogger(__name__)
[docs]
def convert_unit(unit):
    # type: (str) -> pint.Unit
    """
    Resolve a unit name or an :term:`OGC` :term:`URN` into a unit of :data:`UNIT_REGISTRY`.

    Units defined by :term:`OGC` are identified by :term:`URN`. Those cannot be registered as aliases
    in :data:`UNIT_REGISTRY` because alias names must be valid Python identifiers, which the ``:`` characters
    of a :term:`URN` prevent. They are therefore translated beforehand using :data:`UNIT_OGC_REVERSED`.

    :param unit: Shorthand unit name, or :term:`URN` known by :data:`UNIT_OGC_REVERSED`.
    :return: Matched unit, which is dimensionless for ``unity`` (by name or by its :term:`URN`).
    """
    dimensionless_names = ["unity", OGCUNIT["unity"]]
    if unit in dimensionless_names:
        return UNIT_REGISTRY.Unit("")  # dimensionless

    # a known URN resolves to its shorthand names, anything else is handed over to the registry as is
    resolved = UNIT_OGC_REVERSED.get(unit) or unit
    if isinstance(resolved, list):
        resolved = resolved[0]  # first registered shorthand name
    return UNIT_REGISTRY.Unit(resolved)
[docs]
def convert_value_units(value, uom, to):
    # type: (Scalar, str, str) -> Scalar
    """
    Converts the provided value from one :term:`UoM` to another.

    :param value: Magnitude expressed in the :paramref:`uom` unit.
    :param uom: Unit of the provided value (any form accepted by :func:`convert_unit`).
    :param to: Unit to convert the value to (any form accepted by :func:`convert_unit`).
    :return: Magnitude of the value once expressed in the :paramref:`to` unit.
    """
    source_unit = convert_unit(uom)
    target_unit = convert_unit(to)
    return UNIT_REGISTRY.Quantity(value, source_unit).to(target_unit).magnitude
[docs]
def complex2json(data):
    # type: (Union[ComplexData, Any]) -> Union[JSON, Any]
    """
    Obtains the :term:`JSON` representation of a :class:`ComplexData` or simply return the unmatched type.

    :param data: Object to convert. Anything that is not a :class:`ComplexData` is returned untouched.
    :return: Format definition as :term:`JSON`, with ``default`` always set to ``False``.
    """
    if isinstance(data, ComplexData):
        # backward compat based on OWSLib version, field did not always exist
        size_limit = getattr(data, "maximumMegabytes", None)
        if isinstance(size_limit, str) and size_limit.isnumeric():
            size_limit = int(size_limit)
        json_format = {
            "mimeType": data.mimeType,
            "encoding": data.encoding,
            "schema": data.schema,
            "maximumMegabytes": size_limit,
            "default": False,  # always assume it is a supported format/value, caller should override
        }
        return json_format
    return data
[docs]
def uom2json(uom, ref=None):
    # type: (Union[str, UOM, JSON_UoM], Optional[str]) -> JSON_UoM
    """
    Convert an :term:`UoM` definition into corresponding :term:`JSON` representation.

    The unit name is rewritten using the compact form produced by :func:`convert_unit`.
    When an :class:`UOM` object or a mapping is provided, it is updated inplace.

    :param uom: Unit as plain name, as :class:`UOM` object, or as mapping that contains the ``uom`` key.
    :param ref: Reference of the unit, only employed when the unit is provided as plain name.
    :return: :term:`JSON` representation of the unit.
    :raises TypeError: When the unit is provided with any other structure.
    """
    if isinstance(uom, str):
        compact_name = str(convert_unit(uom))  # compact form
        # FIXME: default string fix for https://github.com/geopython/pywps/issues/685
        reference = ref or ""
        return UOM(compact_name, reference).json

    if isinstance(uom, UOM):
        uom.uom = str(convert_unit(uom.uom))  # compact form as needed
        return uom.json

    if not (isinstance(uom, dict) and "uom" in uom):
        raise TypeError(f"Unable to convert JSON UoM definition from '{fully_qualified_name(uom)}'.")

    uom["uom"] = str(convert_unit(uom["uom"]))  # compact form as needed
    return uom
[docs]
def ows2json_io(ows_io):
    # type: (OWS_IO_Type) -> JSON_IO_Type
    """
    Converts :term:`I/O` definition from :mod:`owslib.wps` to :term:`JSON`.

    Each key of :data:`WPS_FIELD_MAPPING` is searched on the :term:`I/O` (with its naming variations) and copied
    under that key when it holds a value. The identifier is then moved under ``id``, and the definition is completed
    according to its type:

    - ``ComplexData``: the ``formats`` are resolved, the default format is moved to ``data_format``,
      and formats receive the ``default`` flag (evaluation stops at the first format flagged as default).
    - literal data types: ``literalDataDomains`` are added when available.

    :param ows_io: :term:`I/O` definition to convert.
    :return: Converted :term:`I/O` definition.
    """
    json_io = {}
    for field in WPS_FIELD_MAPPING:
        value = get_field(ows_io, field, search_variations=True)
        # preserve numeric values (ex: "minOccurs"=0) as actual parameters
        # ignore undefined values represented by `null`, empty list, or empty string
        if value or value in [0, 0.0]:
            if isinstance(value, list):
                # complex data is converted as is
                # metadata converted and preserved if it results into a minimally valid definition (otherwise dropped)
                # (metadata items without any URL are the ones filtered out by the condition of the comprehension)
                json_io[field] = [
                    complex2json(v) if isinstance(v, ComplexData) else
                    metadata2json(v) if isinstance(v, OWS_Metadata) else v
                    for v in value if not isinstance(v, OWS_Metadata) or v.url is not None
                ]
            elif isinstance(value, ComplexData):
                json_io[field] = complex2json(value)
            elif isinstance(value, OWS_Metadata):
                json_io[field] = metadata2json(value)
            else:
                json_io[field] = value
    # NOTE(review): 'pop_found=True' presumably removes the matched identifier key from 'json_io' - confirm
    json_io["id"] = get_field(json_io, "identifier", search_variations=True, pop_found=True)
    io_type = json_io.get("type")

    # add 'format' if missing, derived from other variants
    if io_type == WPS_COMPLEX_DATA:
        fmt_default_found = False
        if "default" in json_io and isinstance(json_io["default"], dict):
            # NOTE:
            #   When parsing between OWSLib and PyWPS for corresponding 'ComplexData' structures, there is an
            #   ambiguity in the field names. The OWSLib 'defaultValue' attribute is employed as the "default format"
            #   to resolve if the **format** was omitted (but only when the input reference/data is provided!),
            #   whereas PyWPS uses the attribute named 'data_format' for the applied (or default) format of the input
            #   reference/data. However, both of these libraries also reuse the 'defaultValue' and 'default' attributes
            #   respectively for the 'LiteralData' type, where the value is the **actual data** used by default.
            #   The conversion between those libraries must therefore be careful about these specific field
            #   combinations to avoid misbehavior when resolving some form of "default" by omission of
            #   data/reference/format.
            #   To be sure, swap the ambiguous 'default' for explicit 'data_format' (where 'data=None' if I/O omitted).
            fmt_default_value = json_io.pop("default")
            fmt_default_value["default"] = True  # provide for workflow extension (internal), schema drops it (API)
            json_io["data_format"] = fmt_default_value
            fmt_default_found = True

        # retrieve alternate format definitions
        if "formats" not in json_io:
            # correct complex data 'formats' from OWSLib from initial fields loop can get stored in 'supported_values'
            fmt_val = get_field(json_io, "supported_values", pop_found=True)
            if fmt_val:
                json_io["formats"] = fmt_val
            else:
                # search for format fields directly specified in I/O body
                for field in WPS_FIELD_FORMAT:
                    fmt = get_field(json_io, field, search_variations=True)
                    if not fmt:
                        continue
                    if isinstance(fmt, dict):
                        fmt = [fmt]
                    # only mappings can be compared item by item below, anything else is ignored
                    fmt = filter(lambda f: isinstance(f, dict), fmt)
                    if not isinstance(json_io.get("formats"), list):
                        json_io["formats"] = []
                    for var_fmt in fmt:
                        # add it only if not exclusively provided by a previous variant
                        # (skipped when all its items are already contained within one of the collected formats)
                        json_fmt_items = [j_fmt.items() for j_fmt in json_io["formats"]]
                        if any(all(var_item in items for var_item in var_fmt.items()) for items in json_fmt_items):
                            continue
                        json_io["formats"].append(var_fmt)
        json_io.setdefault("formats", [])

        # apply the default flag
        # formats are flagged in order until one matches the default format, following ones are left untouched
        for fmt in json_io["formats"]:
            fmt_default_value = get_field(json_io, "data_format", default={})
            fmt["default"] = fmt_default_found and is_equal_formats(fmt_default_value, fmt)
            if fmt["default"]:
                break

        # NOTE:
        #   Don't force 'minOccurs=0' as in below literal case because default 'format' does not imply that unspecified
        #   input is valid, but rather that given an input without explicit 'format' specified, that 'default' is used.
        #   If the I/O is optional, it should have indicated explicitly the 'minOccurs=0' attribute for 'ComplexData'.
        return json_io

    # add value constraints in specifications
    elif io_type in WPS_LITERAL_DATA_TYPES:
        domains = any2json_literal_data_domains(ows_io)
        if domains:
            json_io["literalDataDomains"] = domains

            # fix inconsistencies of some process descriptions
            # WPS are allowed to report 'minOccurs=1' although 'defaultValue' can also be provided
            # (see https://github.com/geopython/pywps/issues/625)
            if "defaultValue" in domains[0]:
                json_io["min_occurs"] = 0

    return json_io
[docs]
def ows2json_output_data(output, process_description, container=None):
    # type: (OWS_Output_Type, ProcessOWS, Optional[AnySettingsContainer]) -> JSON
    """
    Utility method to convert an :mod:`owslib.wps` process execution output data (result) to :term:`JSON`.

    When a ``reference`` output of :term:`JSON` content-type refers to a file that contains an array of URL
    references (simulating a multiple-output), the output is expanded to provide both the original URL under
    the ``reference`` field and the loaded list of URLs under the ``data`` field, for easier access from the
    response body.

    Referenced file(s) are fetched in order to store them locally if executed on a remote process, such that they
    can become accessible as local job result for following reporting or use by other processes in a workflow chain.

    If the ``dataType`` details are missing from the data output (depending on servers that might omit it), the
    :paramref:`process_description` is employed to retrieve the original description with expected result details.
    In that case, the resolved ``dataType`` is also applied on the provided :paramref:`output`.

    :param output: Output with data value or reference according to expected result for the corresponding process.
    :param process_description: Definition of the process producing the specified output following execution.
    :param container: Container to retrieve application settings (for request options during file retrieval).
    :return:
        Converted :term:`JSON` result data and additional metadata as applicable based on data-type and content-type.
    """
    def _unwrap_item(item):
        # normalize fields before lookup of contained data
        normalized = any2json_output_data(item, item.get("data") or item)  # literal or reference
        if normalized["dataType"] in WPS_LITERAL_DATA_TYPES:
            return normalized["data"]
        return normalized

    if not output.dataType:
        described_outputs = getattr(process_description, "processOutputs", [])
        for described in described_outputs:
            if getattr(described, "identifier", "") == output.identifier:
                output.dataType = described.dataType
                break

    json_output = {
        "identifier": output.identifier,
        "title": output.title,
        "dataType": output.dataType,
        "data": None,
    }

    # WPS standard v1.0.0 specify that either a reference or a data field has to be provided
    if output.reference:
        json_output["reference"] = output.reference
        # Handle special case where we have a reference to a json array containing dataset reference.
        # Avoid reference to reference by fetching directly the dataset references.
        json_array = _get_multi_json_references(output, container)
        if json_array and all(str(ref).startswith("http") for ref in json_array):
            json_output["data"] = json_array
        return any2json_output_data(output, json_output)  # normalize any missing definitions as necessary

    is_raw_json = (
        output.dataType == WPS_COMPLEX_DATA and
        output.mimeType == ContentType.APP_RAW_JSON and
        output.data and isinstance(output.data[0], str)
    )
    if not is_raw_json:
        # WPS standard v1.0.0 specify that Output data field has Zero or one value
        output_data = output.data[0] if output.data else None  # type: Union[BoundingBox, AnyValueType]
        return any2json_output_data(output, output_data)

    json_data = json.loads(output.data[0])
    if isinstance(json_data, list):
        json_output["data"] = [_unwrap_item(item) for item in json_data]
    else:
        json_output["data"] = json_data
    json_output["mimeType"] = ContentType.APP_JSON
    return json_output
[docs]
def any2json_output_data(output_info, output_data):
    # type: (Union[OWS_Output_Type, JSON], Union[JSON, BoundingBox, AnyValueType]) -> JSON
    """
    Converts :mod:`owslib` or :mod:`pywps` :term:`JSON` equivalent output data to normalized :term:`JSON` format.

    Only processes a single item. Must be called iteratively for each data entry in a list.

    :param output_info: Definition of the output, from which the identifier, title and types are retrieved.
    :param output_data: Value of the output (literal value, bounding box, or contents/reference of complex data).
    :return:
        Normalized output with ``identifier``, ``title``, ``dataType`` and ``data`` fields, as well as
        ``mimeType`` and ``reference`` when they can be resolved for complex data.
    """
    def _mentions_reference(data):
        # type: (Any) -> bool
        # Values that do not support membership tests (ex: 'None' of an output without data, numbers, objects)
        # cannot contain a reference. Report it as such instead of failing with 'TypeError'.
        try:
            return WPS_REFERENCE in data
        except TypeError:
            return False

    json_output = {
        "identifier": get_field(output_info, "identifier", search_variations=True),
        "title": get_field(output_info, "title", default=None),
        "dataType": get_field(output_info, "data_type", search_variations=True, default=None),
        "data": None,
    }
    io_type = get_field(output_info, "type", search_variations=True)
    data_type = json_output["dataType"]

    # convert data value as required since WPS XML uses raw strings or container objects
    if data_type in WPS_LITERAL_DATA_TYPES or data_type == WPS_LITERAL:
        json_output["data"] = any2json_literal_data(output_data, data_type)
    elif data_type == WPS_BOUNDINGBOX_DATA:
        json_output["data"] = ows2json_bbox_data(output_data)
    elif data_type == WPS_COMPLEX_DATA or io_type == WPS_REFERENCE or _mentions_reference(output_data):
        json_output["dataType"] = WPS_COMPLEX_DATA  # force in case not set
        # media-type of the data itself has priority over the one from the output definition
        media_type = (
            get_field(output_data, "mime_type", search_variations=True, default=None) or
            get_field(output_info, "mime_type", search_variations=True, default=None)
        )
        if isinstance(output_data, dict):
            complex_data = get_any_value(output_data, file=False, data=True, default=None)
            complex_href = get_any_value(output_data, file=True, data=False, default=None)
        else:
            complex_data = output_data
            complex_href = None
        if media_type:
            json_output.setdefault("mimeType", media_type)
        # only non-empty JSON structures are embedded, other complex data are left to their reference
        if media_type in ContentType.ANY_JSON and isinstance(complex_data, (list, dict)) and complex_data:
            json_output["data"] = complex_data
        if complex_href:
            json_output["reference"] = complex_href
    return json_output
[docs]
def any2json_literal_data(data, data_type):
    # type: (AnyValueType, WPS_LiteralData_Type) -> AnyValueType
    """
    Converts :mod:`owslib` :term:`WPS` literal data using strings into the specific :term:`JSON` compatible type.

    :param data: Literal value, typically the raw string extracted from the :term:`WPS` :term:`XML`.
    :param data_type: Literal data type to apply, resolved using :func:`any2wps_literal_datatype`.
    :return: Converted value, or ``None`` if there was no data. Unmatched types are converted to string.
    """
    if data is None:
        return None
    dtype = any2wps_literal_datatype(data_type)
    if dtype == "float":
        return float(data)
    if dtype == "boolean":
        # Truthiness of a string only indicates that it is not empty ('bool("false")' is 'True').
        # Textual booleans must therefore be compared against their accepted 'true' representations.
        if isinstance(data, str):
            return data.strip().lower() in ("true", "t", "yes", "y", "on", "1")
        return bool(data)
    if dtype == "integer":
        return int(data)
    if dtype in WPS_LITERAL_DATA_DATETIME:
        data = date_parser.parse(data) if isinstance(data, str) else data
        return data.isoformat()
    return str(data)
[docs]
def ows2json_bbox_data(bbox):
    # type: (BoundingBox) -> JSON
    """
    Converts :mod:`owslib` :term:`WPS` Bounding Box data into a :term:`JSON` representation.

    :param bbox: Bounding box to convert, along with its :term:`CRS` details.
    :return:
        Mapping of ``bbox`` coordinates (as floating point values) and ``crs``, completed by ``format``
        and ``schema`` when the :term:`CRS` corresponds to the one of ``sd.OGC_API_BBOX_EPSG``.
    """
    # FIXME: owslib does not actually handle 3D+ coordinates...
    bbox_crs = str(bbox.crs)
    # coordinates are listed following the axis order of the CRS
    if bbox.crs.axisorder == "yx":
        corners = (bbox.miny, bbox.minx, bbox.maxy, bbox.maxx)
    else:
        corners = (bbox.minx, bbox.miny, bbox.maxx, bbox.maxy)
    bbox_data = {"bbox": list(map(float, corners)), "crs": bbox_crs}
    if bbox.crs.id.upper() == sd.OGC_API_BBOX_EPSG:
        bbox_data["format"] = sd.OGC_API_BBOX_FORMAT
        bbox_data["schema"] = sd.OGC_API_BBOX_SCHEMA
    return bbox_data
# FIXME: support metalink unwrapping / multi-output array (weaver https://github.com/crim-ca/weaver/issues/25)
[docs]
def _get_multi_json_references(output, container):
    # type: (OWS_Output_Type, Optional[AnySettingsContainer]) -> Optional[List[JSON]]
    """
    Obtains the :term:`JSON` contents of a single output corresponding to multi-file references.

    Since WPS standard does not allow to return multiple values for a single output,
    a lot of processes actually return a :term:`JSON` array containing references to these outputs.

    Because the multi-output references are contained within this :term:`JSON` file, it is not very convenient to
    retrieve the list of URLs as one always needs to open and read the file to get them. This function goal is to
    detect this particular format and expand the references to make them quickly available in the job output response.

    :param output: Output to inspect, provided either by reference to a :term:`JSON` file or with literal data.
    :param container: Container to retrieve application settings (for request options during file retrieval).
    :return:
        Array of references if the specified output is a :term:`JSON` array of which every item is a reference,
        ``None`` otherwise (including when the :term:`JSON` contents could not be retrieved or parsed).
    """
    # Check for the json datatype and mime-type
    is_json_complex = output.dataType == WPS_COMPLEX_DATA and output.mimeType == ContentType.APP_JSON
    if not is_json_complex:
        return None

    try:
        # If the json data is referenced read its content
        if output.reference:
            with TemporaryDirectory() as tmp_dir:
                file_path = fetch_file(output.reference, tmp_dir, settings=container)
                # JSON text is UTF-8. The 'encoding' of a WPS output describes how its data is transferred
                # (ex: base64), which is not a text codec applicable to read the file contents.
                with open(file_path, mode="r", encoding="utf-8") as tmp_file:
                    json_data_str = tmp_file.read()
        # Else get the data directly
        else:
            # process output data are append into a list and
            # WPS standard v1.0.0 specify that Output data field has zero or one value
            if not output.data:
                return None
            json_data_str = output.data[0]
        # Load the actual json dict
        json_data = json.loads(json_data_str)
    except Exception as exc:  # pylint: disable=W0703
        # best-effort operation: the output is simply not unwrapped if its contents cannot be obtained
        LOGGER.debug("Failed retrieval of JSON output file for multi-reference unwrapping", exc_info=exc)
        return None

    if isinstance(json_data, list) and all(is_reference(data_value) for data_value in json_data):
        return json_data
    return None
[docs]
def get_io_type_category(io_info):
    # type: (ANY_IO_Type) -> WPS_CategoryType
    """
    Guesses the applicable :term:`I/O` type with provided information from any known :term:`I/O` structure.

    :param io_info: :term:`I/O` definition to evaluate.
    :return: Category of the :term:`I/O` according to its ``type`` or, as last resort, its supported formats.
    """
    io_type = get_field(io_info, "type", search_variations=True)

    if isinstance(io_type, str):
        if io_type not in WPS_DATA_TYPES:
            # not a WPS type, attempt matching any literal type known by CWL or WPS
            literal_type = any2cwl_literal_datatype(io_type) or any2wps_literal_datatype(io_type)
            return WPS_COMPLEX if literal_type is null else WPS_LITERAL
        if io_type in WPS_COMPLEX_TYPES:
            return WPS_COMPLEX
        return io_type

    if isinstance(io_type, dict):
        # nested type definition, resolve it as CWL I/O using a copy to leave the original definition untouched
        cwl_io_info = copy.deepcopy(io_info)
        cwl_io_info.setdefault("name", "dontcare")
        cwl_io_def = get_cwl_io_type(cwl_io_info, strict=False)
        if cwl_io_def.type in [null, PACKAGE_FILE_TYPE, PACKAGE_DIRECTORY_TYPE]:
            return WPS_COMPLEX
        return WPS_LITERAL

    # no usable type, consider the I/O as complex only if formats are specified
    supported_formats = get_field(io_info, "supported_formats", search_variations=True)
    if supported_formats is null:
        return WPS_LITERAL
    return WPS_COMPLEX
[docs]
def _get_cwl_fmt_details(wps_fmt):
    # type: (ANY_Format_Type) -> Union[Tuple[Tuple[str, str], str, str], Tuple[None, None, None]]
    """
    Obtains the :term:`CWL` format details that correspond to the media-type of a :term:`WPS` format.

    :param wps_fmt: Format definition from which the media-type is retrieved.
    :return:
        Tuple of the namespace reference, the format and the file extension resolved for the media-type,
        or a tuple of ``None`` values if the format does not provide any media-type.
    """
    media_type = get_field(wps_fmt, "mime_type", search_variations=True)
    if media_type:
        extension = get_extension(media_type)
        namespace_ref, cwl_format = get_cwl_file_format(media_type, must_exist=True, allow_synonym=False)
        return namespace_ref, cwl_format, extension
    return None, None, None
[docs]
def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select):
    # type: (CWL_IO_Type, Dict[str, str], Union[JSON_IO_Type, WPS_IO_Type, OWS_IO_Type], IO_Select_Type) -> None
    """
    Converts the :term:`WPS`-like :term:`I/O` definition and defines them inplace into the :term:`CWL` containers.

    .. seealso::
        See :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results` which closely interacts
        with the produced ``outputBinding.glob`` patterns generated here. Methodology should align between them.

    :param cwl_io: Basic :term:`CWL` :term:`I/O` container (only ID needed) where to write conversion results.
    :param cwl_ns: Namespaces to gradually update when encountering new format Media-Type definitions.
    :param wps_io: Original :term:`WPS`-like :term:`I/O` to be converted.
    :param io_select: Context of the :term:`I/O`.
    :return: Nothing. Changed inplace.
    """
    cwl_io_fmt = None
    cwl_io_ext = get_extension(ContentType.ANY)
    cwl_io["type"] = PACKAGE_FILE_TYPE
    cwl_id = cwl_io["id"]

    # inputs are allowed to define multiple 'supported' formats
    # outputs are allowed to define only one 'applied' format
    # (only the first field of 'WPS_FIELD_FORMAT' that provides any format definition is considered)
    for field in WPS_FIELD_FORMAT:
        fmt = get_field(wps_io, field, search_variations=True)
        if not fmt:
            continue
        if isinstance(fmt, (list, tuple)) and len(fmt) == 1:
            fmt = fmt[0]
        if not isinstance(fmt, (list, tuple)):  # could be 'dict', 'Format' or any other 'object' holder
            cwl_io_ref, cwl_io_fmt, cwl_io_ext = _get_cwl_fmt_details(fmt)
            if cwl_io_ref and cwl_io_fmt:
                cwl_ns.update(cwl_io_ref)
            break

        # multiple formats
        cwl_ns_multi = {}
        cwl_fmt_multi = {}  # use dict as ordered set
        cwl_ext_multi = {}  # use dict as ordered set
        for fmt_i in fmt:
            # FIXME: (?)
            #   when multiple formats are specified, but at least one schema/namespace reference can't be found,
            #   we must drop all since that unknown format is still allowed but cannot be validated
            #   avoid potential validation error if that format was the one provided during execute...
            #   (see: https://github.com/crim-ca/weaver/issues/50)
            cwl_io_ref_i, cwl_io_fmt_i, cwl_io_ext_i = _get_cwl_fmt_details(fmt_i)
            if cwl_io_ref_i and cwl_io_fmt_i:  # if any known format was resolved
                cwl_ns_multi.update(cwl_io_ref_i)
                cwl_fmt_multi.update({cwl_io_fmt_i: None})
                cwl_ext_multi.update({cwl_io_ext_i: None})
            else:
                # reset all since at least one format could not be mapped to an official schema
                cwl_ns_multi = {}
                cwl_fmt_multi = None
                break
        cwl_io_fmt = cwl_fmt_multi  # all formats or none of them
        cwl_io_ext = cwl_ext_multi
        cwl_ns.update(cwl_ns_multi)
        break

    # a single format without any media-type does not resolve any extension
    # fallback to the initial generic extension rather than failing on the list conversion below
    if cwl_io_ext is None:
        cwl_io_ext = get_extension(ContentType.ANY)
    cwl_io_ext = [cwl_io_ext] if isinstance(cwl_io_ext, str) else list(cwl_io_ext)

    if cwl_io_fmt:
        # don't use any format if more than one because we cannot enforce multiple formats
        # ('format' must be string: https://www.commonwl.org/v1.2/CommandLineTool.html#File)
        if not isinstance(cwl_io_fmt, str) and len(cwl_io_fmt) == 1:
            cwl_io["format"] = list(cwl_io_fmt)[0]
        if isinstance(cwl_io_fmt, str):
            cwl_io["format"] = cwl_io_fmt

    if io_select == IO_OUTPUT:
        # for backward compatibility with deployed processes, consider text/plain as 'any' for glob pattern
        cwl_io_txt = get_extension(ContentType.TEXT_PLAIN)
        if cwl_io_txt in cwl_io_ext:
            cwl_io_any = get_extension(ContentType.ANY)
            # arguments must follow the placeholders order:
            #   replaced type [ext], generic type [ext], resolved extensions, I/O context, I/O identifier
            LOGGER.warning("Replacing '%s' [%s] to generic '%s' [%s] glob pattern from resolved formats %s. "
                           "More explicit format media-type should be considered for %s '%s'.",
                           ContentType.TEXT_PLAIN, cwl_io_txt, ContentType.ANY, cwl_io_any, cwl_io_ext,
                           io_select, cwl_id)
            cwl_io_ext = [cwl_io_any]

        # Method 'weaver.processes.wps_process_base.WpsProcessInterface.stage_results' uses the produced glob
        # pattern(s) below of generated output definitions from WPS items that don't offer any hint about the
        # expected file naming format or specification (because we cannot guess what will be produced as output
        # from the remote process definitions alone). We can only provide expected extension based on the file
        # format/schema/media-type of the output definition.
        # To avoid potential naming clashes or conflicting matching from generic patterns when CWL tries to resolve
        # paths, that staging operation stage outputs and adjust each glob pattern under a directory named by the
        # respective output ID.
        # However, it is very important **NOT** to add the output ID directory nesting approach here, otherwise it
        # will confuse the staging process between Workflow steps, since it won't be able to distinguish whether the
        # nesting was already applied by Weaver (here), or provided by a user-provided CWL Application Package, since
        # WPS-based. OGC-based, CWL-based, (or any future implementation) can be combined within a same Workflow.
        cwl_glob = [
            f"*{ext}" if ext != "/" else "./"  # handle special case of "extension" for 'Directory' type
            for ext in cwl_io_ext
        ]
        cwl_io["outputBinding"] = {
            "glob": cwl_glob[0] if len(cwl_glob) == 1 else cwl_glob
        }
[docs]
def _get_cwl_js_value_from(cwl_io_symbols, allow_unique, allow_array):
# type: (List[AnyValueType], bool, bool) -> str
"""
Obtain the JavaScript ``valueFrom`` definition for a :term:`CWL` input of non-``string`` allowed values.
"""
cwl_js_value_from_const = f"const values = {json.dumps(cwl_io_symbols)};"
cwl_js_value_from_array = """
if (self.every(item => values.includes(item))) {
return self;
}
else {
throw "invalid value(s) in [" + self + "] are not all allowed values from [" + values + "]";
}
"""
cwl_js_value_from_unique = """
if (values.includes(self)) {
return self;
}
else {
throw "invalid value " + self + " is not an allowed value from [" + values + "]";
}
"""
if allow_unique and not allow_array:
cwl_js_value_from = inspect.cleandoc(f"""
${{
if (Array.isArray(self)) {{
throw "invalid value " + self + " does not match expected type";
}}
{cwl_js_value_from_const}
{cwl_js_value_from_unique}
}}
""")
elif not allow_unique and allow_array:
cwl_js_value_from = f"""
${{
if (!Array.isArray(self)) {{
throw "invalid value " + self + " does not match expected type";
}}
{cwl_js_value_from_const}
{cwl_js_value_from_array}
}}
"""
else:
cwl_js_value_from = f"""
${{
{cwl_js_value_from_const}
if (Array.isArray(self)) {{
{cwl_js_value_from_array}
}}
else {{
{cwl_js_value_from_unique}
}}
}}
"""
return cwl_js_value_from
[docs]
def _convert_cwl_io_enum(cwl_io_type, cwl_io_symbols, io_select, allow_unique, allow_array):
    # type: (Union[str, Type[null]], List[AnyValueType], IO_Select_Type, bool, bool) -> CWL_IO_Type
    """
    Converts the :term:`I/O` definition to a :term:`CWL` :term:`I/O` that allows ``Enum``-like functionality.

    In the event of an explicit ``string`` as base type, :term:`CWL` directly supports ``type: enum``. Other basic
    types are not directly supported, and must instead perform manual validation against the set of allowed values.

    .. seealso::
        - https://github.com/common-workflow-language/cwl-v1.2/issues/267
        - https://github.com/common-workflow-language/common-workflow-language/issues/764
        - https://github.com/common-workflow-language/common-workflow-language/issues/907

    .. warning::
        Because ``valueFrom`` can only be used with ``inputBinding``, any output providing a set of allowed values
        that are not ``string``-based will be ignored when converted to :term:`CWL` :term:`I/O`.

    :param cwl_io_type: Basic type for which allowed values should apply.
    :param cwl_io_symbols: Allowed values to restrict the :term:`I/O` definition.
    :param io_select: Indicates if the definition is an input or an output.
    :param allow_unique: Indicates if a single value is permitted (forwarded to the JavaScript validation).
    :param allow_array: Indicates if an array of values is permitted (forwarded to the JavaScript validation).
    :return: Converted definition as CWL Enum or with relevant value validation as applicable for the type.
    """
    def _all_symbols_are(py_type):
        # type: (type) -> bool
        return all(isinstance(value, py_type) for value in cwl_io_symbols)

    # unknown base types cannot be converted at all
    if cwl_io_type not in PACKAGE_BASIC_TYPES:
        return {}

    # native CWL enum is only available for strings
    if cwl_io_type == "string":
        cwl_enum = {"type": PACKAGE_ENUM_BASE, "symbols": cwl_io_symbols}
        return {"type": cwl_enum}

    if cwl_io_type not in PACKAGE_NUMERIC_TYPES:
        LOGGER.warning(
            "Could not resolve conversion of CWL I/O as Enum for type '%s'. "
            "Ignoring value validation against specified allowed values: %s.",
            cwl_io_type,
            cwl_io_symbols,
        )
        return {"type": cwl_io_type}

    symbols_match_type = (
        (_all_symbols_are(bool) and cwl_io_type == "boolean")
        or (_all_symbols_are(int) and cwl_io_type in PACKAGE_INTEGER_TYPES)
        or (_all_symbols_are(float) and cwl_io_type in PACKAGE_FLOATING_TYPES)
    )
    if not symbols_match_type:
        LOGGER.warning(
            "Incompatible CWL I/O type '%s' detected for specified allowed values: %s. "
            "Will use generic CWL 'Any' type instead.",
            cwl_io_type,
            cwl_io_symbols,
        )
        cwl_io_type = "Any"

    cwl_io_def = {"type": cwl_io_type}
    if io_select == IO_INPUT:
        # validation by JavaScript expression is only possible through an input binding
        js_value_from = _get_cwl_js_value_from(cwl_io_symbols, allow_unique, allow_array)
        cwl_io_def["inputBinding"] = {"valueFrom": js_value_from}
    return cwl_io_def
[docs]
def any2cwl_io(wps_io, io_select):
    # type: (Union[JSON_IO_Type, WPS_IO_Type, OWS_IO_Type], IO_Select_Type) -> Tuple[CWL_IO_Type, Dict[str, str]]
    """
    Converts a :term:`WPS`-like :term:`I/O` from various :term:`WPS` library representations to :term:`CWL` :term:`I/O`.

    Conversion can be accomplished for :mod:`pywps` and :mod:`owslib` objects, as well as their :term:`JSON` equivalent.
    Because :term:`CWL` :term:`I/O` of type ``File`` with ``format`` field are namespaced, this is also returned if
    needed.

    Occurrences provided as numeric strings (e.g.: ``"2"``) are interpreted the same way as their integer value.

    :param wps_io: :term:`I/O` definition to convert.
    :param io_select: Indicates if the definition is an input or an output.
    :returns: converted :term:`I/O` and namespace dictionary with corresponding format references as required.
    """
    def _normalize_occurs(occurs):
        # type: (Any) -> Any
        """
        Converts a numeric string to its integer value, and leaves any other value untouched.
        """
        if isinstance(occurs, str) and occurs.isdecimal():
            return int(occurs)
        return occurs

    wps_io_cat = get_io_type_category(wps_io)
    wps_io_id = get_field(wps_io, "identifier", search_variations=True)
    cwl_ns = {}
    cwl_io = {"id": wps_io_id}  # type: CWL_IO_Type  # noqa

    # convert OAS format to JSON first to simplify following comparisons
    wps_io_type = get_field(wps_io, "type", search_variations=True)
    wps_io_schema = get_field(wps_io, "schema", search_variations=False)
    if wps_io_type is null and isinstance(wps_io_schema, dict):
        wps_io = oas2json_io(wps_io_schema)
        wps_io_cat = get_field(wps_io, "type", search_variations=False)
        wps_io_type = get_field(wps_io, "data_type", search_variations=False)

    wps_default = get_field(wps_io, "default", search_variations=True)
    # occurrences can be provided as strings, normalize them to avoid 'str' against 'int' ordering comparisons
    wps_min_occ = _normalize_occurs(get_field(wps_io, "min_occurs", search_variations=True, default=1))
    wps_max_occ = _normalize_occurs(get_field(wps_io, "max_occurs", search_variations=True))
    is_min_null = wps_min_occ in [0, "0"]
    allow_unique = wps_min_occ in [0, "0", 1, "1"]
    allow_array = wps_max_occ != null and (wps_max_occ == "unbounded" or wps_max_occ > 1)

    if wps_io_cat not in list(WPS_COMPLEX_TYPES):
        cwl_io_type = any2cwl_literal_datatype(wps_io_type)
        if cwl_io_type is null:
            LOGGER.warning("Could not identify a CWL literal data type with [%s].", wps_io_type)
        wps_allow = get_field(wps_io, "allowed_values", search_variations=True)
        if isinstance(wps_allow, list) and len(wps_allow) > 0:
            cwl_io_enum = _convert_cwl_io_enum(cwl_io_type, wps_allow, io_select, allow_unique, allow_array)
            cwl_io.update(cwl_io_enum)
        else:
            cwl_io["type"] = cwl_io_type
    # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51)
    else:
        _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select)

    # FIXME: multi-outputs (https://github.com/crim-ca/weaver/issues/25)
    # min/max occurs can only be in inputs, outputs are enforced min/max=1 by WPS
    if io_select == IO_INPUT:
        # field 'default' must correspond to a fallback "value", not a default "format"
        # NOTE:
        #   Don't set any 'default' field here (neither 'null' string or 'None' type) if no value was provided
        #   since those are interpreted by CWL as literal string 'null' (for 'string' type) or null object.
        #   Instead, 'null' string entry is added to 'type' to indicate drop/ignore missing input.
        if wps_default != null and not isinstance(wps_default, dict):
            cwl_io["default"] = wps_default

        if allow_array:
            cwl_array = {
                "type": PACKAGE_ARRAY_BASE,
                "items": cwl_io["type"]
            }
            # if single value still allowed, or explicitly multi-value array if min greater than one
            if wps_min_occ > 1:
                cwl_io["type"] = cwl_array
            else:
                cwl_io["type"] = [cwl_io["type"], cwl_array]

        # apply default null after handling literal/array/enum type variants
        # (easier to apply against their many different structures)
        if is_min_null:
            if isinstance(cwl_io["type"], list):
                cwl_io["type"].insert(0, "null")  # if min=0,max>1 (null, <type>, <array-type>)
            else:
                cwl_io["type"] = ["null", cwl_io["type"]]  # if min=0,max=1 (null, <type>)

    return cwl_io, cwl_ns
[docs]
def _patch_cwl_enum_js_requirement(cwl_package):
# type: (CWL) -> None
"""
Applies the JavaScript requirement to validate a pseudo-``Enum`` applied to a :term:`CWL` input definition.
.. seealso::
- :func:`any2cwl_io`
- :func:`_convert_cwl_io_enum`
- :func:`_get_cwl_js_value_from`
"""
cwl_items = cwl_package.get("inputs", [])
if isinstance(cwl_items, dict):
cwl_items = list(cwl_items.values())
for cwl_input in cwl_items:
cwl_value_from = cwl_input.get("inputBinding", {}).get("valueFrom", {})
if isinstance(cwl_value_from, str):
cwl_value_from = cwl_value_from.strip()
if cwl_value_from.startswith("${") and cwl_value_from.endswith("}"):
cwl_package.setdefault("requirements", {})
cwl_package["requirements"].setdefault(CWL_REQUIREMENT_INLINE_JAVASCRIPT, {})
return # early exit, no need to check more
[docs]
def wps2cwl_requirement(wps_service_url, wps_process_id):
    # type: (Union[str, ParseResult], str) -> JSON
    """
    Obtains the `CWL` requirements definition needed for parsing by a remote `WPS` provider as an `Application Package`.

    :param wps_service_url: Location of the `WPS` provider. Any query parameters are removed.
    :param wps_process_id: Identifier of the process on the `WPS` provider.
    :returns: Ordered `CWL` definition with the `WPS-1` hint referring to the provider and process.
    """
    wps_hint = {
        "provider": get_url_without_query(wps_service_url),
        "process": wps_process_id,
    }
    cwl_definition = OrderedDict()
    cwl_definition["cwlVersion"] = "v1.0"
    cwl_definition["class"] = "CommandLineTool"
    cwl_definition["hints"] = {CWL_REQUIREMENT_APP_WPS1: wps_hint}
    return cwl_definition
[docs]
def ows2json(wps_process, wps_service_name, wps_service_url, wps_provider_name=None):
    # type: (ProcessOWS, str, Union[str, ParseResult], Optional[str]) -> Tuple[CWL, JSON]
    """
    Generates the `CWL` package and process definitions from a :class:`owslib.wps.Process` hosted under `WPS` location.

    :param wps_process: Process description parsed by :mod:`owslib`.
    :param wps_service_name: Name of the service, added to the process keywords when provided.
    :param wps_service_url: Location of the `WPS` service that hosts the process.
    :param wps_provider_name: Name of the provider, added to the process keywords when provided and distinct.
    :returns: Tuple of the generated `CWL` package and the `JSON` process description.
    """
    process_id = wps_process.identifier
    keywords = [wps_service_name] if wps_service_name else []
    if wps_provider_name and wps_provider_name not in keywords:
        keywords.append(wps_provider_name)

    process_info = OrderedDict([
        ("id", process_id),
        ("keywords", keywords),
    ])
    process_info["title"] = get_field(
        wps_process, "title", default=process_id.capitalize(), search_variations=True
    )
    process_info["description"] = get_field(wps_process, "abstract", default=None, search_variations=True)
    process_info["version"] = get_field(wps_process, "version", default=None, search_variations=True)
    # only retain metadata entries that could be converted
    process_info["metadata"] = [
        meta_json
        for meta_json in map(metadata2json, wps_process.metadata or [])
        if meta_json
    ]
    process_info["inputs"] = [ows2json_io(wps_in) for wps_in in wps_process.dataInputs]  # type: List[JSON]
    process_info["outputs"] = [ows2json_io(wps_out) for wps_out in wps_process.processOutputs]  # type: List[JSON]

    # generate CWL for WPS-1 using parsed WPS-3
    cwl_package = wps2cwl_requirement(wps_service_url, process_id)
    for io_select in (IO_INPUT, IO_OUTPUT):
        io_section = f"{io_select}s"
        cwl_io_list = cwl_package[io_section] = []
        for wps_io in process_info[io_section]:
            cwl_io, cwl_ns = any2cwl_io(wps_io, io_select)
            cwl_io_list.append(cwl_io)
            if cwl_ns:
                cwl_package.setdefault("$namespaces", {}).update(cwl_ns)
    _patch_cwl_enum_js_requirement(cwl_package)
    return cwl_package, process_info
[docs]
def xml_wps2cwl(wps_process_response, settings):
    # type: (Response, AnySettingsContainer) -> Tuple[CWL, JSON]
    """
    Obtains the :term:`CWL` definition that corresponds to an :term:`XML` :term:`WPS` process.

    Converts a `WPS-1 ProcessDescription XML` tree structure to an equivalent `WPS-3 Process JSON`, and builds the
    associated :term:`CWL` package in conformance to :data:`weaver.processes.wps_package.CWL_REQUIREMENT_APP_WPS1`.

    :param wps_process_response: Valid response (XML, 200) from a `WPS-1 ProcessDescription`.
    :param settings: Application settings to retrieve additional request options.
    :returns: Tuple of the generated :term:`CWL` package and the :term:`JSON` process description.
    :raises ValueError: if the process description or its identifier cannot be found in the response.
    """
    def _tag_name(_xml):
        # type: (Union[xml_util.XML, str]) -> str
        """
        Obtains ``tag`` from a ``{namespace}Tag`` `XML` element.
        """
        _tag = getattr(_xml, "tag", _xml)
        return _tag.rpartition("}")[2].lower()

    # look for `XML` structure starting at `ProcessDescription` (WPS-1)
    xml_resp = xml_util.fromstring(str2bytes(wps_process_response.content))
    xml_wps_process = xml_resp.xpath("//ProcessDescription")  # type: List[xml_util.XML]
    if len(xml_wps_process) != 1:
        raise ValueError("Could not retrieve a valid 'ProcessDescription' from WPS-1 response.")

    # first 'Identifier' child provides the process ID
    process_id = next(
        (sub_xml.text for sub_xml in xml_wps_process[0] if _tag_name(sub_xml) == "identifier"),
        None,
    )
    if not process_id:
        raise ValueError("Could not find a match for 'ProcessDescription.identifier' from WPS-1 response.")

    # transform WPS-1 -> WPS-3
    wps = get_wps_client(wps_process_response.url, settings)
    wps_service_url = urlparse(wps_process_response.url)
    wps_service_name = wps.provider.name if wps.provider else wps_service_url.hostname
    wps_process = wps.describeprocess(process_id, xml=wps_process_response.content)
    return ows2json(wps_process, wps_service_name, wps_service_url)
[docs]
def ogcapi2cwl_process(payload, reference):
    # type: (JSON, str) -> Tuple[CWL, JSON]
    """
    Generate a :term:`CWL` for a remote :term:`OGC API - Processes` description to dispatch :term:`Process` execution.

    The :term:`CWL` is resolved, in order of priority, from a reference found in the process description, from the
    execution units of the payload, or otherwise generated from the :term:`I/O` of the process description.

    .. seealso::
        - :class:`weaver.processes.wps3_process.Wps3Process`

    :param payload: :term:`JSON` :term:`Process` description in :term:`OGC API - Processes` format.
    :param reference: URL where the :term:`Process` is located.
    :returns: Updated :term:`CWL` package with the reference to the :term:`Process`, and the updated payload copy.
    """
    from weaver.processes.utils import is_cwl_package, load_package_file  # pylint: disable=C0415  # circular import

    def _resolve_unit_package(_exec_unit):
        # type: (Any) -> CWL
        """
        Finds the first execution unit that provides a :term:`CWL`, either by reference or embedded.
        """
        if sd.ExecutionUnitList(missing=colander.drop).deserialize(_exec_unit) is colander.drop:
            return {}
        for unit in _exec_unit:
            unit_ref = unit.get("href")
            unit_pkg = unit.get("unit")
            if unit_ref:
                return load_package_file(unit_ref)
            if is_cwl_package(unit_pkg):
                return unit_pkg
        return {}

    payload_copy = copy.deepcopy(payload)
    process_info = payload_copy.get("process", payload)  # type: JSON  # OLD/OGC schemas nested process or at root

    # the process information is sufficient to define the process by itself,
    # but attempt retrieval of further details to generate better CWL references if it can be located
    ows_ref = process_info.get("owsContext", {}).get("offering", {}).get("content", {}).get("href")
    proc_ref = process_info.get("href")
    cwl_ref = proc_ref or ows_ref  # type: Optional[str]
    cwl_pkg = {}  # type: CWL
    if cwl_ref:
        cwl_pkg = load_package_file(cwl_ref)
    else:
        try:
            cwl_pkg = _resolve_unit_package(payload_copy.get("executionUnit"))
        except colander.Invalid:
            pass  # best-effort lookup, fallback to I/O generation below

    if cwl_pkg:
        # CWL resolved with most amount of metadata available
        # remove fields that would cause conflicting specification of the local CWL for its remote counterpart
        for drop_field in ("baseCommand", "arguments", "hints", "requirements"):
            cwl_pkg.pop(drop_field, None)
    else:
        # if no CWL could be resolved, generate I/O from process
        io_ns = {}  # type: Dict[str, str]
        for io_select in (IO_INPUT, IO_OUTPUT):
            io_holder = f"{io_select}s"  # type: Literal["inputs", "outputs"]  # noqa
            io_struct = normalize_ordered_io(copy.deepcopy(process_info.get(io_holder, {})))
            cwl_io_map = cwl_pkg[io_holder] = {}  # type: Dict[str, CWL_IO_Type]  # noqa
            for io_def in io_struct:
                io_id = get_field(io_def, "identifier", search_variations=True)
                cwl_io, cwl_ns = any2cwl_io(io_def, io_select)
                cwl_io.pop("id", None)  # remove duplicate since provided as key
                cwl_io_map[io_id] = cwl_io  # type: ignore
                io_ns.update(cwl_ns)
        if io_ns:
            cwl_pkg["$namespaces"] = io_ns

    # even if the remote process is actually a Workflow on the target server,
    # dispatched execution from Weaver will consider it as a single application
    ogc_api_hint = {CWL_REQUIREMENT_APP_OGC_API: {"process": reference}}
    cwl_package = {
        "cwlVersion": "v1.0",
        "class": "CommandLineTool",
        "hints": ogc_api_hint,
    }
    cwl_package.update(cwl_pkg)  # type: ignore
    _patch_cwl_enum_js_requirement(cwl_package)
    payload_copy["executionUnit"] = [{"unit": cwl_package}]
    payload_copy["deploymentProfile"] = "http://www.opengis.net/profiles/eoc/ogcapiApplication"
    return cwl_package, payload_copy
[docs]
def is_cwl_complex_type(io_info, complex_types=PACKAGE_COMPLEX_TYPES):
    # type: (CWL_IO_Type, Iterable[CWL_IO_ComplexType]) -> bool
    """
    Identifies if the provided :term:`CWL` input/output corresponds to one, many or a potential `Complex` type(s).

    When multiple distinct *atomic* types are allowed for a given I/O (e.g.: ``type: [string, File]``) and that one
    of them is one of the considered `Complex` type, the result will be ``True`` even if other types are not `Complex`.
    Similarly, optional `Complex` types combined with ``"null"`` will also return ``True``.

    :param io_info: :term:`I/O` to verify for complex type.
    :param complex_types:
        Complex types to consider.
        By default, any type between :term:`CWL` ``File`` and ``Directory`` are valid.
        The operation can be limited to one or the other if needed to identify a specific one.
    :returns: ``True`` if any of the resolved types is one of the complex types, ``False`` otherwise.
    :raises ValueError: if the type is missing or is defined with an unsupported structure.
    """
    io_type = io_info.get("type")
    if not io_type:
        raise ValueError(f"Missing CWL 'type' definition: [{io_info!s}]")

    if isinstance(io_type, str):
        return io_type in complex_types

    if isinstance(io_type, dict):
        # arrays are identified by their item type, any other nested definition by its own type
        type_field = "items" if io_type["type"] == PACKAGE_ARRAY_BASE else "type"
        return io_type[type_field] in complex_types

    if isinstance(io_type, list):
        for typ in io_type:
            if isinstance(typ, str) and typ in complex_types:
                return True
            if is_cwl_complex_type({"type": typ}, complex_types):
                return True
        return False

    raise ValueError(f"Unknown parsing of CWL 'type' format ({type(io_type)!s}) [{io_type!s}] in [{io_info}]")
[docs]
def parse_cwl_array_type(io_info, strict=True):
    # type: (CWL_IO_Type, bool) -> CWLIODefinition
    """
    Parses the specified :term:`I/O` for one of the various potential CWL array definitions.

    Two array notations are handled: the nested mapping ``{"type": "array", "items": "<type>"}`` and the
    shorthand string ``"<type>[]"``. When the array items (or the :term:`I/O` itself) resolve to an ``enum``,
    the returned definition also carries the enum base type, allowed symbols and validation mode.

    :param io_info: :term:`CWL` :term:`I/O` definition to parse.
    :param strict: Indicates if only pure :term:`CWL` definition is allowed, or allow implicit data-type conversions.
    :returns: Updated :term:`CWL` :term:`I/O` definition with applicable properties.
    :raises PackageTypeError: if the array element doesn't have the required values and valid format.
    """
    # use mapping to allow sub-function updates
    io_return = CWLIODefinition(
        array=False,
        symbols=AnyValue,
        type=get_cwl_io_type_name(io_info["type"]),
        mode=MODE.NONE,
    )

    def _update_if_sub_enum(_io_item):
        # type: (CWL_IO_Type) -> bool
        """
        Updates the ``io_return`` parameters if ``io_item`` evaluates to a valid ``enum`` type.

        Parameter ``io_item`` should correspond to field ``items`` of an array :term:`I/O` definition.
        Simple pass-through if the array item is not an ``enum``.

        :returns: ``True`` if the item was parsed as an ``enum``, ``False`` otherwise.
        """
        # the item is wrapped as the 'type' of a temporary I/O to reuse the enum parsing
        _def = parse_cwl_enum_type({"type": _io_item})
        if _def.enum:
            LOGGER.debug("I/O [%s] parsed as 'array' with sub-item as 'enum'", io_info["name"])
            io_return.enum = True
            io_return.type = _def.type
            io_return.mode = _def.mode
            io_return.symbols = _def.symbols
        return _def.enum

    # optional I/O could be an array of '["null", "<type>"]' with "<type>" being any of the formats parsed after
    # it is the literal representation instead of the shorthand with '?'
    if isinstance(io_info["type"], list) and any(sub_type == "null" for sub_type in io_info["type"]):
        # we can ignore the optional indication in this case because it doesn't impact following parsing
        # only the first non-null type is retained
        io_return.type = list(filter(lambda sub_type: sub_type != "null", io_info["type"]))[0]

    # array type conversion when defined as '{"type": "array", "items": "<type>"}'
    # validate against 'Hashable' instead of 'dict' since 'OrderedDict'/'CommentedMap' can fail 'isinstance()'
    if (
        not isinstance(io_return.type, str)
        and not isinstance(io_return.type, Hashable)
        and "items" in io_return.type
        and "type" in io_return.type
    ):
        io_type = dict(io_return.type)  # convert to a plain 'dict' to allow comparison of its fields
        if io_type["type"] != PACKAGE_ARRAY_BASE:
            raise PackageTypeError(f"Unsupported I/O 'array' definition: '{io_info!r}'.")
        # parse enum in case we got an array of allowed symbols
        io_items = get_cwl_io_type_name(io_type["items"])
        is_enum = _update_if_sub_enum(io_items)
        if not is_enum:
            io_return.type = io_items
        io_type = get_cwl_io_type_name(io_return.type)
        if io_type not in PACKAGE_ARRAY_ITEMS:  # includes Complex, so implicit literal-only check possible
            # attempt conversion to an equivalent literal type, which is only accepted when not 'strict'
            io_type = any2cwl_literal_datatype(io_type)
            if strict or io_type not in PACKAGE_ARRAY_ITEMS:
                raise PackageTypeError(f"Unsupported I/O 'array' definition: '{io_info!r}'.")
            io_return.type = io_type
        LOGGER.debug("I/O [%s] parsed as 'array' with nested dict notation", io_info["name"])
        io_return.array = True
    # array type conversion when defined as string '<type>[]'
    elif isinstance(io_return.type, str) and get_cwl_io_type_name(io_return.type) in PACKAGE_ARRAY_TYPES:
        io_return.type = get_cwl_io_type_name(io_return.type[:-2])  # remove '[]'
        if io_return.type in PACKAGE_CUSTOM_TYPES:
            # parse 'enum[]' for array of allowed symbols, provide expected structure for sub-item parsing
            io_item = deepcopy(io_info)
            io_item["type"] = io_return.type  # override corrected type without '[]'
            _update_if_sub_enum(io_item)
        if io_return.type not in PACKAGE_ARRAY_ITEMS:
            raise PackageTypeError(f"Unsupported I/O 'array' definition: '{io_info!r}'.")
        LOGGER.debug("I/O [%s] parsed as 'array' with shorthand '[]' notation", io_info["name"])
        io_return.array = True

    # in case the I/O was not an array parsed with one of the above conditions,
    # still check for enum to be consistent in returned definition if one was provided
    # (invalid enum definitions are ignored here, the array parsing result is returned as is)
    try:
        _update_if_sub_enum(io_info)
    except PackageTypeError:
        pass
    return io_return
[docs]
def parse_cwl_enum_type(io_info):
    # type: (CWL_IO_Type) -> CWLIODefinition
    """
    Parses the specified I/O for potential CWL enum definition.

    Definitions that are not a nested ``enum`` structure are returned as non-enum with their resolved type.
    Otherwise, the allowed symbols must all share the same type, which determines the enum base type.

    :param io_info: :term:`CWL` I/O definition to parse.
    :returns: Updated :term:`CWL` I/O definition with applicable properties.
    :raises PackageTypeError: if the enum doesn't have the required parameters and valid format.
    """
    def _not_enum(_io_type):
        # type: (Any) -> CWLIODefinition
        return CWLIODefinition(
            type=_io_type,
            enum=False,
            mode=MODE.NONE,
        )

    io_type = get_cwl_io_type_name(io_info["type"])
    if not isinstance(io_type, dict) or "type" not in io_type:
        return _not_enum(io_type)

    # nested definition of another kind than a custom type (e.g.: array or multi-type)
    nested_type = io_type["type"]
    if isinstance(nested_type, list):
        return _not_enum(PACKAGE_ARRAY_BASE)
    if isinstance(nested_type, str) and nested_type not in PACKAGE_CUSTOM_TYPES:
        return _not_enum(nested_type)

    if "symbols" not in io_type:
        raise PackageTypeError(f"Unsupported I/O 'enum' definition missing 'symbols': '{io_info!r}'.")
    io_allow = io_type["symbols"]
    if not isinstance(io_allow, list) or len(io_allow) < 1:
        raise PackageTypeError(f"Invalid I/O 'enum.symbols' definition: '{io_info!r}'.")

    # validate matching types in allowed symbols and convert to supported CWL type
    first_allow = io_allow[0]
    expected_type = type(first_allow)
    if any(type(symbol) is not expected_type for symbol in io_allow):
        raise PackageTypeError(f"Ambiguous types in I/O 'enum.symbols' definition: '{io_info!r}'.")
    for py_type, cwl_type in ((str, "string"), (float, "float"), (int, "int")):
        if isinstance(first_allow, py_type):
            io_type = cwl_type
            break
    else:
        raise PackageTypeError(
            f"Unsupported I/O 'enum' base type: `{type(first_allow)!s}`, from definition: `{io_info!r}`."
        )

    return CWLIODefinition(
        type=io_type,  # type: ignore
        enum=True,
        mode=MODE.SIMPLE,  # allowed value validator mode must be set for input
        symbols=io_allow,
    )
[docs]
def get_cwl_io_type_name(io_type):
    # type: (Any) -> Any
    """
    Obtain the simple type-name representation of a :term:`CWL` I/O.

    Depending on :mod:`cwltool` version, types are represented with or without an extended prefix, and using an
    explicit quoted class representation rather than plain strings.

    :param io_type: Type to simplify. Anything other than a string is returned unmodified.
    :returns: Plain string without the extended prefix, or the original object if it was not a string.
    """
    if not isinstance(io_type, str):
        return io_type
    cwl_extended_prefix = "org.w3id.cwl.cwl."
    simple_name = io_type.replace(cwl_extended_prefix, "")
    return str(simple_name)
[docs]
def resolve_cwl_io_type_schema(io_info, cwl_schema_names=None):
    # type: (CWL_IO_Type, Optional[CWL_SchemaNames]) -> CWL_IO_Type
    """
    Reverse :term:`CWL` schema references by name back to their full :term:`CWL` I/O definition.

    Only the ``items`` of an array definition or a plain string ``type`` are resolved.
    The provided definition is never modified, a shallow copy is returned when a field is updated.

    .. seealso::
        - :meth:`weaver.processes.wps_package.WpsPackage.make_inputs`
        - :meth:`weaver.processes.wps_package.WpsPackage.update_cwl_schema_names`

    :param io_info: :term:`CWL` I/O definition, or directly a type name.
    :param cwl_schema_names: Mapping of schema names to their definition. Nothing is resolved if omitted.
    :returns: Definition with the applicable type reference replaced by its simple name or schema definition.
    """
    if not isinstance(io_info, dict) or not cwl_schema_names:
        return get_cwl_io_type_name(io_info)

    io_type = io_info.get("type")
    io_item = io_info.get("items")
    if io_type == PACKAGE_ARRAY_BASE and isinstance(io_item, str):
        io_field, io_ref = "items", io_item
    elif isinstance(io_type, str):
        io_field, io_ref = "type", io_type
    else:
        return io_info  # nothing that can be resolved by name

    io_info = io_info.copy()  # avoid undoing CWL tool parsing/resolution
    io_name = get_cwl_io_type_name(io_ref)  # avoid mapping back to File/Directory records in CWL schema names
    if io_name in cwl_schema_names:
        # lookup with the same simplified name that was checked, since the original reference could differ
        io_name = cwl_schema_names[io_name]._props
    io_info[io_field] = io_name
    return io_info
[docs]
def resolve_cwl_namespaced_name(name):
    # type: (str) -> str
    """
    Remove any :term:`URN` prefixes added by :term:`CWL` from a name.

    Includes removal of contextual reference of the source :term:`CWL` file that contained the name.
    Includes reversing :term:`CWL`-specific namespaces :term:`URN` extended to their full URL form.

    :param name: Name to resolve, possibly prefixed by a path or a ``<namespace>#`` reference.
    :returns: Name without its prefix, or as ``<urn>:<name>`` if the namespace is a known one.
    """
    if "#" not in name:
        # no namespace, only drop any leading path
        return name.rpartition("/")[2]
    ns_prefix = name.partition("#")[0]
    ns_name = name.rpartition("#")[2]
    ns_urn = CWL_NAMESPACES_REVERSED.get(f"{ns_prefix}#")
    return f"{ns_urn}:{ns_name}" if ns_urn else ns_name
@dataclass
class CWLIODefinition(object):
    """
    Utility :term:`CWL` I/O definition to contain metadata from parsing results.

    Fields can be provided by keyword, and the definition can be converted with ``tuple()``, ``list()``
    or ``dict()`` using the field declaration order.

    .. seealso::
        :func:`weaver.processes.convert.get_cwl_io_type`
    """

    # provide dataclass conversions for 'tuple()', 'list()', 'dict()'
    def keys(self):
        # type: () -> List[str]
        """
        Obtains the field names in their declaration order.
        """
        fields = getattr(self, "__dataclass_fields__")
        return list(fields)

    def __getitem__(self, key):
        # type: (str) -> Any
        return getattr(self, key)

    def __iter__(self):
        # type: () -> Iterator[Any]
        for key in self.keys():
            value = self[key]
            yield value

    # --- FIELDS ---

    name: str = ""
    """
    Name (or identifier) of the I/O.
    """

    type: "Union[CWL_IO_LiteralType, CWL_IO_ComplexType, CWL_IO_DataType]" = None
    """
    Type of the :term:`CWL` I/O.

    If :attr:`enum` is ``True``, represents the enum base type.
    If :attr:`array` is ``True``, represents the item type.

    .. note::
        Before resolution with :func:`parse_cwl_array_type`, this attribute can temporarily hold any :term:`CWL` type
        structure (list, dict, nested types, etc.). After parsing, it will be resolved to the basic string type.
    """

    null: bool = False
    """
    Indicates if the I/O is nullable.

    This is obtained from a type composed of ``"null"`` and something else,
    or using the shorthand ``{type}?`` notation.
    """

    min_occurs: int = 1
    """
    Minimum number of occurrences allowed.

    When :attr:`null` is ``True``, it is equal to ``0``.
    Otherwise, it is greater or equal to ``1``.
    If greater than ``1``, :attr:`array` should be ``True``.
    """

    max_occurs: int = 1
    """
    Maximum number of occurrences allowed.

    Applies only when :attr:`array` is ``True``. Otherwise, always equal to ``1``.
    Can take the value :data:`PACKAGE_ARRAY_MAX_SIZE` to represent ``"unbounded"`` occurrences.
    """

    array: bool = False
    """
    Specifies if the I/O is of array type.
    """

    enum: bool = False
    """
    Specifies if the I/O is of enum type.
    """

    symbols: "Union[CWL_IO_EnumSymbols, AnyValue, Type[AnyValue]]" = AnyValue
    """
    Specifies the allowed values when the definition is marked as :attr:`enum`.

    When not overridden by literal values, it uses the default :class:`AnyValue`.
    """

    mode: MODE = MODE.NONE
    """
    Validation mode to be applied if I/O requires it.

    Defaults to :attr:`MODE.NONE`. Indicates how strict the validation must be.
    Usually applies when an enum must only allow a specific set of symbols.
    Can also be used with Media-Types in more advanced validation use case with :mod:`pywps`.
    """
[docs]
def get_cwl_io_type(io_info, strict=True, cwl_schema_names=None):
    # type: (CWL_IO_Type, bool, Optional[CWL_SchemaNames]) -> CWLIODefinition
    """
    Obtains the basic type of the CWL input and identifies if it is optional.

    CWL allows multiple shorthand representation or combined types definition.
    The *base* type must be extracted in order to identify the expected data format and supported values.

    Obtains real type if ``"default"`` or shorthand ``"<type>?"`` was in CWL, which
    can also be defined as type ``["null", <type>]``.

    CWL allows multiple distinct types (e.g.: ``string`` and ``int`` simultaneously), but not WPS inputs.
    WPS allows only different amount of *same type* through ``minOccurs`` and ``maxOccurs``.
    Considering WPS conversion, we can also have the following definition ``["null", <type>, <array-type>]`` with all
    basic types matching exactly. Whether single or array-like type, the base type can be extracted.

    :param io_info: :term:`CWL` definition to parse.
    :param strict: Indicates if only pure :term:`CWL` definition is allowed, or allow implicit data-type conversions.
    :param cwl_schema_names: Mapping of CWL type schema references to resolve in long form if used in a definition.
    :return:
        Definition with the resolved base type, the nullable indication, the minimum and maximum occurrences,
        and the array and enum indications with their allowed symbols and validation mode.
    :raises PackageTypeError: if the combination of types is not supported.
    :raises TypeError: if the type could not be resolved to a string.
    """
    io_type = get_cwl_io_type_name(io_info["type"])
    is_null = False
    io_mode = MODE.NONE
    io_allow = AnyValue

    # parse multi-definition
    if isinstance(io_type, list):
        if not len(io_type) > 1:
            raise PackageTypeError(f"Unsupported I/O type as list cannot have only one base type: '{io_info}'")
        if "null" in io_type:
            # NOTE(review): this check looks unreachable since more than one item was validated just above
            if len(io_type) == 1:
                raise PackageTypeError(f"Unsupported I/O cannot be only 'null' type: '{io_info}'")
            LOGGER.debug("I/O parsed for 'default'")
            is_null = True  # I/O can be omitted since default value exists
            io_type = [typ for typ in io_type if typ != "null"]

        if len(io_type) == 1:  # valid if other was "null" now removed
            io_type = io_type[0]
        else:
            # check that many sub-type definitions all match same base type (no conflicting literals)
            io_type_many = set()
            io_base_type = None
            # index is offset by one when "null" was removed, to match the position in the original list
            for i, typ in enumerate(io_type, start=int(is_null)):
                typ = resolve_cwl_io_type_schema(typ, cwl_schema_names)
                io_name = io_info["name"]
                sub_type = {"type": typ, "name": f"{io_name}[{i}]"}  # type: CWL_IO_Type
                array_io_def = parse_cwl_array_type(sub_type, strict=strict)
                enum_io_def = parse_cwl_enum_type(sub_type)
                # array base type more important than enum because later array conversion also handles allowed values
                if array_io_def.array:
                    io_base_type = typ  # highest priority (can have sub-literal or sub-enum)
                    io_type_many.add(array_io_def.type)
                elif enum_io_def.enum:
                    io_base_type = io_base_type if io_base_type is not None else enum_io_def.type  # less priority
                    io_type_many.add(enum_io_def.type)
                else:
                    io_base_type = io_base_type if io_base_type is not None else typ  # less priority
                    io_type_many.add(typ)  # literal base type by itself (not array/enum)
            if len(io_type_many) != 1:
                raise PackageTypeError(f"Unsupported I/O with many distinct base types for info: '{io_info!s}'")
            io_type = io_base_type

        LOGGER.debug("I/O parsed for multiple base types")

    # parse single-definition
    io_info = io_info.copy()  # shallow copy to leave the provided definition untouched
    io_info["type"] = io_type  # override resolved multi-type base for more parsing
    io_name = io_info["name"]
    io_min_occurs = 0 if is_null else 1
    io_max_occurs = 1  # unless array after

    # convert array types
    array_io_def = parse_cwl_array_type(io_info, strict=strict)
    if array_io_def.array:
        LOGGER.debug("I/O parsed for 'array'")
        io_type = array_io_def.type
        io_max_occurs = PACKAGE_ARRAY_MAX_SIZE

    # convert enum types
    # an enum defined directly by the I/O takes precedence over one detected from the array parsing
    enum_io_def = parse_cwl_enum_type(io_info)
    is_enum = False
    if enum_io_def.enum:
        LOGGER.debug("I/O parsed for 'enum' from base")
        io_type = enum_io_def.type
        io_allow = enum_io_def.symbols
        io_mode = enum_io_def.mode
        is_enum = True
    elif array_io_def.enum:
        LOGGER.debug("I/O parsed for 'enum' from array")
        io_type = array_io_def.type
        io_allow = array_io_def.symbols
        io_mode = array_io_def.mode
        is_enum = True

    # debug info for unhandled types conversion
    if not isinstance(io_type, str):
        LOGGER.debug("is_array:      [%s]", repr(array_io_def.array))
        LOGGER.debug("array_elem:    [%s]", repr(array_io_def.type))
        LOGGER.debug("is_enum:       [%s]", repr(enum_io_def.enum))
        LOGGER.debug("enum_type:     [%s]", repr(enum_io_def.type))
        LOGGER.debug("enum_allow:    [%s]", repr(enum_io_def.symbols))
        LOGGER.debug("io_info:       [%s]", repr(io_info))
        LOGGER.debug("io_type:       [%s]", repr(io_type))
        LOGGER.debug("type(io_type): [%s]", type(io_type))
        raise TypeError(f"I/O type has not been properly decoded. Should be a string, got: '{io_type!r}'")

    io_type = get_cwl_io_type_name(io_type)

    # parse shorthand notation for nullable
    if io_type.endswith("?"):
        io_type = io_type[:-1]
        io_min_occurs = 0
        is_null = True

    # complex types are kept as is, anything else is converted to its CWL literal data type
    if io_type not in PACKAGE_COMPLEX_TYPES:
        io_type = any2cwl_literal_datatype(io_type)

    io_def = CWLIODefinition(
        name=io_name,
        type=io_type,
        null=is_null,
        min_occurs=io_min_occurs,
        max_occurs=io_max_occurs,
        array=array_io_def.array,
        enum=is_enum,
        symbols=io_allow,
        mode=io_mode,
    )
    return io_def
[docs]
def cwl2wps_io(io_info, io_select):
    # type:(CWL_IO_Type, IO_Select_Type) -> WPS_IO_Type
    """
    Creates the WPS I/O object that matches a parsed CWL I/O definition.

    An enum, or a type listed in :data:`PACKAGE_LITERAL_TYPES`, yields a literal I/O.
    Any other definition yields a complex I/O along with its supported formats.

    :param io_info: parsed IO of a CWL file
    :param io_select: :py:data:`IO_INPUT` or :py:data:`IO_OUTPUT` to specify desired WPS type conversion.
    :returns: corresponding IO in WPS format
    :raises PackageTypeError: when ``io_select`` is neither :py:data:`IO_INPUT` nor :py:data:`IO_OUTPUT`.
    """
    # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51)
    if io_select not in (IO_INPUT, IO_OUTPUT):
        raise PackageTypeError(f"Unsupported I/O info definition: '{io_info!r}' with '{io_select}'.")
    is_input = io_select == IO_INPUT
    is_output = not is_input
    io_literal = LiteralInput if is_input else LiteralOutput  # type: Union[Type[LiteralInput], Type[LiteralOutput]]
    io_complex = ComplexInput if is_input else ComplexOutput  # type: Union[Type[ComplexInput], Type[ComplexOutput]]

    def _is_static_format(fmt):
        # CWL expressions ('$(...)', '{...}', '(...)') cannot be interpreted as Media-Type or schema reference
        if not fmt or not isinstance(fmt, str):
            return False
        fmt_text = fmt.strip()
        return not fmt_text.startswith(("$", "{", "(")) and not fmt_text.endswith(("}", ")"))

    # obtain base type considering possible CWL type representations
    io_def = get_cwl_io_type(io_info)

    # literal types
    if io_def.enum or (isinstance(io_def.type, str) and io_def.type in PACKAGE_LITERAL_TYPES):
        # align the CWL type name with the WPS data type naming
        if io_def.type == "Any":
            io_def.type = "anyvalue"
        elif io_def.type == "null":
            io_def.type = "novalue"
        elif io_def.type in ("int", "integer", "long"):
            io_def.type = "integer"
        elif io_def.type in ("float", "double"):
            io_def.type = "float"
        # keywords commonly used by I/O
        literal_kw = {
            "identifier": io_def.name,
            "title": io_info.get("label", ""),
            "abstract": io_info.get("doc", ""),
            "data_type": io_def.type,
            "mode": io_def.mode,
        }
        if is_input:
            # avoid storing 'AnyValue' which become more problematic than
            # anything later on when CWL/WPS merging is attempted
            if io_def.symbols is not AnyValue:
                literal_kw["allowed_values"] = io_def.symbols
            literal_kw["default"] = io_info.get("default", None)
            literal_kw["min_occurs"] = io_def.min_occurs
            literal_kw["max_occurs"] = io_def.max_occurs
        return io_literal(**literal_kw)

    # complex types
    # keywords commonly used by I/O
    complex_kw = {
        "identifier": io_def.name,
        "title": io_info.get("label", io_def.name),
        "abstract": io_info.get("doc", ""),
    }
    # format can represent either a Media-Type, a schema reference, or a CWL expression
    #   - format as Media-Type is useful for WPS Complex
    #   - format as schema is useful for WPS BoundingBox JSON/YAML structure
    #   - format as CWL expression is ignored since it cannot be easily interpreted
    io_formats = []
    if "format" in io_info:
        io_fmt = io_info["format"]
        fmt_candidates = [io_fmt] if isinstance(io_fmt, str) else io_fmt
        fmt_resolved = [get_format(fmt) for fmt in fmt_candidates if _is_static_format(fmt)]
        # filter out duplicates
        fmt_unique = {}
        for wps_fmt in fmt_resolved:
            fmt_unique[HashDict(wps_fmt.json)] = wps_fmt
        io_formats = list(fmt_unique.values())
    if io_formats:
        complex_kw["supported_formats"] = io_formats
        complex_kw["mode"] = MODE.SIMPLE  # only validate the extension (not file contents)
    else:
        # we need to minimally add 1 format, otherwise empty list is evaluated as None by pywps
        # when "supported_formats" is None, the process's json property raises because of it cannot iterate formats
        if io_def.type == PACKAGE_FILE_TYPE:
            complex_kw["supported_formats"] = [DEFAULT_FORMAT]
        if io_def.type == PACKAGE_DIRECTORY_TYPE:
            complex_kw["supported_formats"] = [get_format(ContentType.APP_DIR)]
        complex_kw["mode"] = MODE.NONE  # don't validate anything as default is only raw text
    if is_output:
        if io_def.type == PACKAGE_DIRECTORY_TYPE:
            complex_kw["as_reference"] = True
        if io_def.type == PACKAGE_FILE_TYPE:
            # a file output is returned by reference unless its contents are explicitly provided
            complex_kw["as_reference"] = io_info.get("contents") is None
    else:
        # note:
        #   value of 'data_format' is identified as 'default' input format if specified with `Format`
        #   otherwise, `None` makes it automatically use the first one available in 'supported_formats'
        data_format = get_field(io_info, "data_format")
        complex_kw["data_format"] = json2wps_field(data_format, "supported_formats") if data_format else None
        complex_kw["min_occurs"] = io_def.min_occurs
        complex_kw["max_occurs"] = io_def.max_occurs
    return io_complex(**complex_kw)
@overload
def convert_input_values_schema(inputs, schema):
    # type: (ExecutionInputs, JobInputsOutputsSchema.OLD) -> ExecutionInputsList
    ...


def convert_input_values_schema(inputs, schema):
    # type: (ExecutionInputs, JobInputsOutputsSchemaType) -> ExecutionInputs
    """
    Convert execution input values between equivalent formats.

    The ``OGC`` schema is a mapping of input ID to value(s), while the ``OLD`` schema is a
    listing of items that each carry their own ID. Inputs already in the requested
    representation are returned as-is. The provided inputs are never modified.

    .. seealso::
        - :func:`convert_output_params_schema`
        - :func:`normalize_ordered_io` for I/O definitions.

    :param inputs: Inputs to convert.
    :param schema: Desired schema.
    :return: Converted inputs.
    :raises ValueError: if the inputs container type cannot be converted to the requested schema.
    :raises NotImplementedError: if the requested schema is not a known one.
    """
    if isinstance(schema, str):
        # ignore any '+<modifier>' suffix, only the base schema name selects the conversion
        schema = schema.lower().split("+", 1)[0]
    if (
        (schema == JobInputsOutputsSchema.OGC and isinstance(inputs, dict)) or
        (schema == JobInputsOutputsSchema.OLD and isinstance(inputs, list))
    ):
        return inputs
    if (
        (schema == JobInputsOutputsSchema.OGC and not isinstance(inputs, list)) or
        (schema == JobInputsOutputsSchema.OLD and not isinstance(inputs, dict))
    ):
        name = fully_qualified_name(inputs)
        raise ValueError(f"Unknown conversion method to schema [{schema}] for inputs of type [{name}]: {inputs}")
    if schema == JobInputsOutputsSchema.OGC:
        input_dict = {}
        for input_item in inputs:  # type: JobValueItem
            # work on a copy since the ID is popped from the item, which must not alter the caller's data
            # (same approach as 'convert_output_params_schema')
            input_item = dict(input_item)
            input_id = get_any_id(input_item, pop=True)
            input_val = get_any_value(input_item)
            input_key = get_any_value(input_item, key=True, data=True, file=False)
            # if the input type is data, values are grouped directly (inline values)
            # if the input type is file, {reference + format} must both be regrouped
            input_data = input_val if input_key else input_item
            if input_id not in input_dict:
                input_dict[input_id] = input_data
            else:
                # when repeated input ID are found, they must be regrouped as list under that ID
                input_prev = input_dict[input_id]
                if not isinstance(input_prev, list):
                    input_prev = [input_prev]
                input_prev.append(input_data)
                input_dict[input_id] = input_prev
        return input_dict
    if schema == JobInputsOutputsSchema.OLD:
        input_list = []
        for input_id, input_value in inputs.items():
            # list must be flattened with repeating ID
            if isinstance(input_value, list):
                for input_data in input_value:
                    if isinstance(input_data, dict):
                        # can be either nested {value: literal} or {file + format} items
                        # either way, those are already in the desired format
                        input_item = {"id": input_id}
                        input_item.update(input_data)
                    else:
                        # otherwise only literals are accepted inline
                        input_item = {"id": input_id, "value": input_data}
                    input_list.append(input_item)
            elif isinstance(input_value, dict):
                # NOTE(review): only the first key of the mapping is transferred, other fields are dropped,
                #   unlike the list case above that keeps every field - confirm this is intended
                input_key = list(input_value)[0]
                input_value = input_value[input_key]
                input_list.append({"id": input_id, input_key: input_value})
            else:
                input_list.append({"id": input_id, "value": input_value})
        return input_list
    raise NotImplementedError(f"Unknown conversion format of input values for schema: [{schema}]")
@overload
def convert_output_params_schema(outputs, schema):
    # type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaAnyOGCType) -> Optional[ExecutionOutputsMap]
    ...


@overload
def convert_output_params_schema(outputs, schema):
    # type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaAnyOLDType) -> Optional[ExecutionOutputsList]
    ...


def convert_output_params_schema(outputs, schema):
    # type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaType) -> Optional[ExecutionOutputs]
    """
    Convert execution output parameters between equivalent formats.

    The ``OGC`` schema is a mapping of output ID to its definition, while the ``OLD`` schema is a
    listing of definitions that each carry their own ID. Outputs already in the requested
    representation are returned as-is. The provided outputs are never modified.

    .. warning::
        These outputs are not *values* (i.e.: *results*), but *submitted* :term:`Job` outputs for return definitions.
        Contents are transferred as-is without any consideration of ``value`` or ``href`` fields.

    .. seealso::
        - :func:`convert_input_values_schema`
        - :func:`normalize_ordered_io` for I/O definitions.

    :param outputs: Outputs to convert.
    :param schema: Desired schema.
    :return: Converted outputs, or ``None`` if no outputs were provided.
    :raises ValueError: if the outputs container type cannot be converted to the requested schema.
    :raises NotImplementedError: if the requested schema is not a known one.
    """
    if outputs is None:
        return outputs
    if isinstance(schema, str):
        # ignore any '+<modifier>' suffix, only the base schema name selects the conversion
        schema = schema.lower().split("+")[0]
    if (
        (schema == JobInputsOutputsSchema.OGC and isinstance(outputs, dict)) or
        (schema == JobInputsOutputsSchema.OLD and isinstance(outputs, list))
    ):
        return outputs
    if (
        (schema == JobInputsOutputsSchema.OGC and not isinstance(outputs, list)) or
        (schema == JobInputsOutputsSchema.OLD and not isinstance(outputs, dict))
    ):
        name = fully_qualified_name(outputs)
        raise ValueError(f"Unknown conversion method to schema [{schema}] for outputs of type [{name}]: {outputs}")
    if schema == JobInputsOutputsSchema.OGC:
        out_dict = {}
        for out in outputs:
            out = dict(out)  # type: ignore  # avoid modifying reference
            out_id = get_any_id(out, pop=True)
            out_dict[out_id] = out
        return out_dict
    if schema == JobInputsOutputsSchema.OLD:
        # the ID is inserted first, then extended by the definition fields of that output
        return [{"id": out_id, **outputs[out_id]} for out_id in outputs]
    raise NotImplementedError(f"Unknown conversion format of outputs definitions for schema: [{schema}]")
[docs]
def any2cwl_literal_datatype(io_type):
    # type: (str) -> Union[str, Type[null]]
    """
    Maps a literal data-type name from `WPS` or `OpenAPI` onto the equivalent `CWL` base type.

    :param io_type: Name of the literal data type to resolve.
    :return: One of ``string``, ``int``, ``float`` or ``boolean``, or :data:`null` if the name is unknown.
    """
    def _named_by(*type_groups):
        # membership in any of the groups is the same as membership in their union
        return any(io_type in group for group in type_groups)

    if _named_by(WPS_LITERAL_DATA_STRING, WPS_LITERAL_DATA_DATETIME, OAS_LITERAL_STRING_FORMATS):
        cwl_type = "string"
    elif _named_by(WPS_LITERAL_DATA_INTEGER, OAS_LITERAL_INTEGER_FORMATS):
        cwl_type = "int"
    elif _named_by(WPS_LITERAL_DATA_FLOAT, OAS_LITERAL_FLOAT_FORMATS, OAS_LITERAL_NUMERIC):
        cwl_type = "float"
    elif _named_by(WPS_LITERAL_DATA_BOOLEAN):
        cwl_type = "boolean"
    else:
        cwl_type = null
    return cwl_type
[docs]
def any2wps_literal_datatype(io_type, is_value=False, pywps=False):
    # type: (Union[AnyValueType, WPS_LiteralData_Type], bool, bool) -> Union[str, Type[null]]
    """
    Resolves the `WPS` literal data type that corresponds to a type name or to a literal value.

    With ``is_value=False``, :paramref:`io_type` is matched by name against known type names.
    With ``is_value=True``, the python ``type`` of :paramref:`io_type` itself is inspected instead.

    :param io_type: Type to convert to :term:`WPS` supported literal data type.
    :param is_value: If enabled, consider :paramref:`io_type` literal data itself to attempt detection of the type.
    :param pywps: If enabled, restrict only to types supported by :mod:`pywps` (subset of full :term:`WPS`).
    :return: Resolved literal data type name, or :data:`null` when it cannot be resolved.
    """
    if not isinstance(io_type, str):
        # only an actual value can be resolved by python type
        if not is_value:
            return null
        # 'bool' must be checked before 'int' since it is a subclass of it
        if isinstance(io_type, bool):
            return "boolean"
        if isinstance(io_type, int):
            return "integer"
        if isinstance(io_type, float):
            return "float"
        return null

    if is_value:
        # any string data is a string literal
        return "string"

    def _named_by(*type_groups):
        # membership in any of the groups is the same as membership in their union
        return any(io_type in group for group in type_groups)

    if _named_by(WPS_LITERAL_DATA_STRING, WPS_LITERAL_DATA_DATETIME, OAS_LITERAL_STRING_FORMATS):
        if io_type in WPS_LITERAL_DATA_DATETIME:
            return io_type
        if io_type in OAS_LITERAL_DATETIME_FORMATS:
            return "dateTime"
        return "string"
    if _named_by(WPS_LITERAL_DATA_INTEGER, OAS_LITERAL_INTEGER_FORMATS):
        return "integer"
    if _named_by(WPS_LITERAL_DATA_FLOAT, OAS_LITERAL_FLOAT_FORMATS, OAS_LITERAL_NUMERIC):
        # the more specific float type name is preserved only when not limited to pywps types
        if pywps or io_type not in WPS_LITERAL_DATA_FLOAT:
            return "float"
        return io_type
    if _named_by(WPS_LITERAL_DATA_BOOLEAN):
        return "boolean"
    LOGGER.warning("Unknown named literal data type: '%s', using default 'string'. Should be one of: %s",
                   io_type, list(WPS_LITERAL_DATA_TYPES))
    return "string"
[docs]
def any2json_literal_allowed_value(io_allow):
    # type: (Union[AllowedValue, JSON, str, float, int, bool]) -> Union[JSON, str, str, float, int, bool, Type[null]]
    """
    Converts an ``AllowedValues`` definition from different packages into standardized JSON representation of `OGC-API`.

    A :class:`AllowedValue` object or a mapping is converted either to the single literal value it
    holds, or to a range definition using the ``minimumValue``, ``maximumValue``, ``spacing`` and
    ``rangeClosure`` fields. Any other value is returned unchanged. The provided definition is not modified.

    :param io_allow: Allowed value definition to convert.
    :return: Literal value, range definition, or :data:`null` if an empty container results.
    """
    if isinstance(io_allow, AllowedValue):
        io_allow = io_allow.json
    if isinstance(io_allow, dict):
        # fields are popped during extraction, operate on a copy to leave the caller's definition intact
        io_allow = dict(io_allow)
        wps_range = {}
        for field, dest in [
            ("range_minimum", "minimumValue"),
            ("range_maximum", "maximumValue"),
            ("range_spacing", "spacing"),
            ("range_closure", "rangeClosure")
        ]:
            wps_range_value = get_field(io_allow, field, search_variations=True, pop_found=True)
            if wps_range_value is not null:
                wps_range[dest] = wps_range_value
        # in case input was a PyWPS AllowedValue object converted to JSON,
        # extra metadata must be removed/transformed accordingly for literal value
        basic_type = io_allow.pop("type", None)
        allowed_type = io_allow.pop("allowed_type", None)
        allowed_type = allowed_type or basic_type
        allowed_value = io_allow.pop("value", None)
        if allowed_value is not None:
            # note: closure must be ignored for range compare because it defaults to 'close' even for a 'value' type
            # NOTE(review): range fields found above were already popped from 'io_allow' at this point,
            #   confirm whether this check was meant to look at the extracted 'wps_range' instead
            range_fields = ["minimumValue", "maximumValue", "spacing"]
            if allowed_type == "value" or not any(field in io_allow for field in range_fields):
                return allowed_value
        io_allow = wps_range
    if not io_allow:  # empty container
        return null
    return io_allow
[docs]
def any2json_literal_data_domains(io_info):
    # type: (ANY_IO_Type) -> Union[Type[null], List[JSON]]
    """
    Generates the literal data domains of an I/O definition from its allowed values, default and units.

    The result, when applicable, is a list holding a single item that follows the schema
    definition :class:`weaver.wps_restapi.swagger_definitions.LiteralDataDomainList` with following structure.

    .. code-block:: yaml

        default: bool
        defaultValue: float, int, bool, str
        dataType: {name: string, <reference: url: string>}
        UOMs:
          - default: {uom: string, reference: url-string}
          - supported: [{uom: string, reference: url-string}]
        valueDefinition:
          oneOf:
            - string
            - url-string
            - {anyValue: bool}
            - [float, int, bool, str]
            - [{minimum: number/none, maximum: number/none, spacing: number/none, closure: str open/close variations}]

    :param io_info: I/O definition to extract the literal data domains from.
    :return: List of the generated domain, or :data:`null` for bounding box and complex I/O.
    """
    if get_field(io_info, "type", search_variations=False) in [WPS_BOUNDINGBOX, WPS_COMPLEX]:
        return null

    literal_type = get_field(io_info, "type", search_variations=True, only_variations=True)
    domain = {
        "default": True,  # since it is generated from convert, only one is available anyway
        "dataType": {
            "name": any2wps_literal_datatype(literal_type, is_value=False),  # just to make sure, simplify type
            # reference:  # FIXME: unsupported named-reference data-type (need example to test it)
        }
    }
    allowed_values = get_field(io_info, "allowed_values", search_variations=True)
    default_value = get_field(io_info, "default", search_variations=True)
    # unless explicit values remain after conversion, the domain is described by 'anyValue'
    value_definition = {"anyValue": get_field(io_info, "any_value", search_variations=True, default=False)}
    if default_value not in [null, None]:
        domain["defaultValue"] = default_value
    if isinstance(allowed_values, list) and len(allowed_values) > 0:
        converted_values = []
        for raw_value in [any2json_literal_allowed_value(item) for item in allowed_values]:
            if raw_value is not null:
                converted_values.append(raw_value)
        if converted_values:
            value_definition = converted_values
    domain["valueDefinition"] = value_definition

    supported_uoms = get_field(io_info, "uoms", search_variations=True)
    default_uom = get_field(io_info, "uom", search_variations=True)
    if supported_uoms:
        uoms = {"supported": [uom2json(uom) for uom in supported_uoms]}
        domain["UOMs"] = uoms
        if default_uom:
            uoms["default"] = uom2json(default_uom)
    return [domain]
[docs]
def json2oas_io_complex(io_info, io_hint=null):
    # type: (JSON_IO_Type, Union[OpenAPISchema, Type[null]]) -> OpenAPISchema
    """
    Convert a single-dimension complex :term:`JSON` I/O definition into corresponding :term:`OpenAPI` schema.

    Each supported format yields a ``string`` schema with its media-type (and encoding when binary).
    JSON formats additionally yield object schemas, which are replaced by the ones from the hint when provided.

    :param io_info: Complex I/O definition to convert.
    :param io_hint: Reference :term:`OpenAPI` definition originally submitted for the I/O, if any.
    :return: Schema of the single format, or a ``oneOf`` combination when multiple variations apply.
    """
    # heuristic to guess more specific encoding
    fmt_type_as_text = ("multipart/", "application/")  # others always binary (eg: image)
    fmt_subtype_as_text = ("+xml", "/json", "yaml")

    item_types = []
    item_formats = get_field(io_info, "supported_formats", search_variations=True)
    if isinstance(item_formats, list) and item_formats:
        # list (rather than set) keeps the schema references in their definition order,
        # which makes the resulting 'oneOf' ordering reproducible across executions
        json_schema_refs = []
        json_schema_any = None
        for fmt in item_formats:
            fmt_media = get_field(fmt, "mime_type", search_variations=True)
            fmt_encode = get_field(fmt, "encoding", search_variations=True)
            fmt_schema = get_field(fmt, "schema", search_variations=False)
            if not fmt_encode:
                fmt_is_text = fmt_media.startswith("text/") or (
                    fmt_media.startswith(fmt_type_as_text) and
                    any(fmt_enc in fmt_media for fmt_enc in fmt_subtype_as_text)
                )
                fmt_encode = None if fmt_is_text else "base64"
            if fmt_encode:
                # format/contentEncoding somewhat redundant,
                # but providing both allows using "preferred" approach by either OpenAPI 3.0/3.1
                item_type = {
                    "type": "string",
                    "format": "binary",
                    "contentMediaType": fmt_media,
                    "contentEncoding": fmt_encode,
                }
            else:
                item_type = {
                    "type": "string",
                    "contentMediaType": fmt_media,
                }
            if fmt_schema:  # could be non-JSON, just a reference
                item_type["contentSchema"] = fmt_schema
            item_types.append(item_type)
            if ContentType.APP_JSON in fmt_media:
                # explicit JSON schema when provided, 'any JSON' otherwise
                json_schema_any = not fmt_schema
                if fmt_schema and fmt_schema not in json_schema_refs:
                    json_schema_refs.append(fmt_schema)

        json_objects = []
        if json_schema_any:
            # if no ref schema, best we can do is 'any JSON' since cannot guess applicable schema not provided by user
            json_objects = [{"type": "object", "additionalProperties": True}]
        elif json_schema_refs:
            json_objects = [{"$ref": ref} for ref in json_schema_refs]

        # if we have a hint of the raw-data schema originally submitted during deploy, use it instead
        if isinstance(io_hint, dict):
            # consider that submitted schema could already have contained 'content[...]' definitions
            # remove them in this case since they should have already been processed during first conversion/merge
            io_hint = oas_resolve_remote(io_hint)
            json_objects = []
            if "oneOf" in io_hint or "anyOf" in io_hint:
                for item in io_hint.get("oneOf", io_hint.get("anyOf", [])):  # type: OpenAPISchema
                    if item.get("type") == "object" or "allOf" in item:
                        json_objects.append(item)
            elif "allOf" in io_hint:
                json_objects = [io_hint]
            elif io_hint.get("type") == "object":
                json_objects = [io_hint]

        item_types.extend(json_objects)
    else:
        # complex by reference or encoded data
        item_types = [{"type": "string", "format": "binary"}]

    item_schema = {"oneOf": item_types} if len(item_types) > 1 else item_types[0]
    return item_schema
[docs]
def json2oas_io_bbox(io_info, io_hint=null):
    # type: (JSON_IO_Type, Union[OpenAPISchema, Type[null]]) -> OpenAPISchema
    """
    Convert a single-dimension bounding box :term:`JSON` I/O definition into corresponding :term:`OpenAPI` schema.

    The result offers two alternatives under ``oneOf``: the bounding box as an object with ``bbox``
    and ``crs`` properties, or as a string that references the bounding box content schema.

    .. seealso::
        :data:`sd.OGC_API_BBOX_SCHEMA`

    :param io_info: Bounding box I/O definition to convert.
    :param io_hint: Reference :term:`OpenAPI` definition from which a ``$ref`` is reported as ``$id``, if any.
    :return: Schema combining the object and string representations of the bounding box.
    """
    # don't add the 'enum' of CRS as defined in the reference schema since this is auto-generated
    # and could mismatch the intended CRS by the user, unless available explicitly
    crs_schema = {"type": "string", "format": "uri", "default": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"}
    supported_crs = get_field(io_info, "supported_crs", search_variations=True)
    if isinstance(supported_crs, list) and all(isinstance(crs, str) for crs in supported_crs):
        crs_schema["enum"] = supported_crs
    bbox_object_schema = {
        "type": "object",
        "format": sd.OGC_API_BBOX_FORMAT,
        "required": ["bbox"],
        "properties": {
            "crs": crs_schema,
            "bbox": {
                "type": PACKAGE_ARRAY_BASE,
                # 'items' must itself be a schema definition, a bare type name is not valid
                "items": {"type": "number"},
                # 2D (4 coordinates) or 3D (6 coordinates) bounding box
                "oneOf": [
                    {"minItems": 4, "maxItems": 4},
                    {"minItems": 6, "maxItems": 6},
                ]
            },
        }
    }  # type: OpenAPISchemaObject
    if isinstance(io_hint, dict):
        if "$ref" in io_hint:
            bbox_object_schema["$id"] = io_hint["$ref"]
        elif "allOf" in io_hint:
            # use the first reference found within the combined definitions
            for item in io_hint["allOf"]:
                if "$ref" in item:
                    bbox_object_schema["$id"] = item["$ref"]
                    break

    # add the alternate representation method
    bbox_string_schema = {
        "type": "string",
        "format": sd.OGC_API_BBOX_FORMAT,
        "contentSchema": sd.OGC_API_BBOX_SCHEMA,
    }
    bbox_schema = {
        "oneOf": [
            bbox_object_schema,
            bbox_string_schema,
        ]
    }
    return bbox_schema
[docs]
def json2oas_io_literal_data_type(io_type):
    # type: (str) -> JSON
    """
    Resolves the :term:`OpenAPI` ``type`` and ``format`` fields that represent a literal data type.

    Unknown types are represented as a plain ``string``.

    .. seealso::
        - https://spec.openapis.org/oas/v3.1.0#data-types
        - https://swagger.io/specification/#data-types

    :param io_type: Name of the literal data type.
    :return: Mapping with the ``type``, and the ``format`` when one applies.
    """
    def _named_by(*type_groups):
        # membership in any of the groups is the same as membership in their union
        return any(io_type in group for group in type_groups)

    oas_fields = {"type": "string"}
    # checks are intentionally not exclusive, a later match overrides the fields of a previous one
    if _named_by(OAS_LITERAL_FLOAT_FORMATS, WPS_LITERAL_DATA_FLOAT):
        oas_fields.update({"type": "number", "format": io_type})
    if _named_by(OAS_LITERAL_INTEGER_FORMATS, WPS_LITERAL_DATA_INTEGER):
        oas_fields["type"] = "integer"
        if io_type != "integer":
            oas_fields["format"] = io_type
    if _named_by(WPS_LITERAL_DATA_BOOLEAN):
        oas_fields["type"] = "boolean"
    if _named_by(OAS_LITERAL_STRING_FORMATS, WPS_LITERAL_DATA_STRING, WPS_LITERAL_DATA_DATETIME):
        oas_fields["type"] = "string"
        type_lower = io_type.lower()
        if "time" in type_lower:
            oas_fields["format"] = "date-time"
        elif "date" in type_lower:
            oas_fields["format"] = "date"
        elif io_type != "string":
            oas_fields["format"] = io_type
    if _named_by(OAS_LITERAL_BINARY_FORMATS):
        oas_fields.update({"type": "string", "format": "binary"})
    return oas_fields
[docs]
def json2oas_io_allowed_values(io_base, io_allowed):
    # type: (JSON, JSON) -> List[JSON]
    """
    Converts literal data allowed values :term:`JSON` definitions into :term:`OpenAPI` equivalent variations.

    :param io_base: Base value definitions that can be shared across variations (e.g.: default values).
    :param io_allowed: Allowed values definitions (enum, ranges) extracted from :term:`JSON` literal data domains.
    :return: List of converted :term:`OpenAPI` definitions applicable to represent the allowed values.
    """
    if isinstance(io_allowed, dict):
        # anyValue
        #   nothing to do since regardless of true/false, nothing can be applied as OpenAPI schema definition
        return [io_base]
    if not isinstance(io_allowed, list):
        return [io_base]

    item_variation = []
    if all(isinstance(val_def, (int, float, str)) for val_def in io_allowed):
        # allowed values
        #   need to split the different types if a mix is used (e.g.: 1, 2, "A", "B")
        data_val_types = {
            "string": [val for val in io_allowed if isinstance(val, str)],
            "number": [val for val in io_allowed if isinstance(val, (float, int))],
        }
        for _typ, vals in data_val_types.items():
            if not vals:
                continue
            data_enum = {"type": _typ, "enum": vals}
            data_enum.update(io_base)
            if _typ == "number":
                # refine the numeric type only when every value is of that type
                # (the type of each value is checked, not its truthiness, so that '0' or a mix are handled)
                if all(isinstance(val, int) for val in vals):
                    data_enum.update(json2oas_io_literal_data_type("integer"))
                elif all(isinstance(val, float) for val in vals):
                    data_enum.update(json2oas_io_literal_data_type("double"))
            item_variation.append(data_enum)
        return item_variation

    if all(isinstance(val_def, dict) for val_def in io_allowed):
        # allowed ranges
        for val in io_allowed:
            min_val = get_field(val, "range_minimum", search_variations=True, default=None)
            max_val = get_field(val, "range_maximum", search_variations=True, default=None)
            spacing = get_field(val, "range_spacing", search_variations=True, default=None)
            closure = get_field(val, "range_closure", search_variations=True, default=RANGECLOSURETYPE.CLOSED)
            data_range = {}
            data_range.update(io_base)
            if min_val is not None:
                data_range["minimum"] = min_val
            if max_val is not None:
                data_range["maximum"] = max_val
            if spacing is not None:
                data_range["multipleOf"] = spacing
            if closure == RANGECLOSURETYPE.OPEN:  # ]min, max[
                data_range.update({"exclusiveMinimum": True, "exclusiveMaximum": True})
            elif closure == RANGECLOSURETYPE.OPENCLOSED:  # ]min, max]
                data_range.update({"exclusiveMinimum": True})
            elif closure == RANGECLOSURETYPE.CLOSEDOPEN:  # [min, max[
                data_range.update({"exclusiveMaximum": True})
            item_variation.append(data_range)
        return item_variation

    # mix of values and ranges cannot be represented, only the base definition applies
    return [io_base]
[docs]
def json2oas_io_literal(io_info, io_hint=null):
    # type: (JSON_IO_Type, Union[OpenAPISchema, Type[null]]) -> OpenAPISchema
    """
    Convert a single-dimension literal value :term:`JSON` I/O definition into corresponding :term:`OpenAPI` schema.

    Each literal data domain of the I/O generates one or more schema variations.
    A plain ``string`` schema is returned when no variation can be resolved.

    :param io_info: Literal I/O definition to convert.
    :param io_hint: Reference :term:`OpenAPI` definition that can provide a more specific type or format.
    :return: Schema of the single variation, or a ``oneOf`` combination when multiple variations apply.
    """
    item_variation = []
    domains = get_field(io_info, "literal_data_domains", search_variations=True, default=[])
    for data_info in domains:
        data_fmt = {}
        data_type = get_field(data_info, "type", search_variations=True)
        if isinstance(data_type, dict) and "name" in data_type:
            # reference must be retrieved from the data type definition before it is replaced by its name
            data_href = get_field(data_type, "href", search_variations=True)
            data_type = data_type["name"]  # type: Optional[str]
        else:
            data_type = None
            data_href = None
        if io_hint:
            # if original data type is available in hint OAS I/O definition, use it since it should be more specific
            # because of conversions between type/format of different scopes, some types could be less precise
            # (eg: 'double' transformed to 'float')
            data_hint = oas2json_io(io_hint)
            data_hint = (data_hint or {}).get("data_type")
            # ignore 'string' type which is the fallback type to avoid undoing proper detection
            data_hint = null if data_hint == "string" and data_type is not null else data_hint
            data_type = data_hint or data_type
            data_fmt = get_field(io_hint, "format", search_variations=False)
            data_fmt = {"format": data_fmt} if data_fmt is not null else {}
            # if the provided literal data can still be represented by a 'type: object'
            # then the I/O can contain additional metadata along the value, such as for a measurement
            # make sure to preserve that data representation variant in the schema to describe it
            io_type = get_field(io_hint, "type")
            if io_type == "object":
                item_variation.append(io_hint)
        if not data_type:
            continue
        data_var = json2oas_io_literal_data_type(data_type)
        data_var.update(data_fmt)
        if data_href:
            data_var["contentSchema"] = data_href
        data_default = get_field(io_info, "default", search_variations=True)
        if data_default is not null:
            data_var["default"] = data_default
        data_def = get_field(data_info, "valueDefinition")
        # extend definition with relevant value definitions
        # basic definition if no special enum/range handling was applied
        data_var = json2oas_io_allowed_values(data_var, data_def)
        item_variation.extend(data_var)

    # no domains provided, or none of them could be resolved to a schema variation
    if not item_variation:
        return {"type": "string"}
    if len(item_variation) > 1:
        item_schema = {"oneOf": item_variation}
    else:
        item_schema = item_variation[0]
    if isinstance(io_hint, dict):
        if "$ref" in io_hint:
            item_schema["$id"] = io_hint["$ref"]
    return item_schema
[docs]
def json2oas_io(io_info, io_hint=null):
    # type: (JSON_IO_Type, Union[OpenAPISchema, Type[null]]) -> OpenAPISchema
    """
    Generates the :term:`OpenAPI` schema that represents a :term:`JSON` :term:`Process` I/O definition.

    The schema of a single item is resolved according to the I/O type (complex, bounding box or literal),
    and is then wrapped as needed to represent the cardinality given by the minimum and maximum occurrences.

    :param io_info: :term:`WPS` I/O definition to generate a corresponding :term:`OpenAPI` schema.
    :param io_hint: Reference :term:`OpenAPI` definition that can improve more explicit object definitions.
    :return: Schema of the I/O considering its cardinality.
    """
    io_type = get_field(io_info, "type")
    if io_type == WPS_COMPLEX:
        item_converter = json2oas_io_complex
    elif io_type == WPS_BOUNDINGBOX:
        item_converter = json2oas_io_bbox
    else:
        item_converter = json2oas_io_literal
    item_schema = item_converter(io_info, io_hint)

    occurs = []
    for occurs_field in ("min_occurs", "max_occurs"):
        occurs_value = get_field(io_info, occurs_field, search_variations=True)
        # backward support of values as strings
        if isinstance(occurs_value, str) and str.isnumeric(occurs_value):
            occurs_value = int(occurs_value)
        occurs.append(occurs_value)
    min_occurs, max_occurs = occurs
    has_min = isinstance(min_occurs, int)
    has_max = isinstance(max_occurs, int)

    # resolve a single/multi/both value cardinality
    # because specified single-value/objects *MUST* be provided, optional can be represented only by zero-length array
    if has_min and (min_occurs == 0 or min_occurs > 1):
        # array only
        array_only_schema = {
            "type": PACKAGE_ARRAY_BASE,
            "items": item_schema,
            "minItems": min_occurs,
        }
        if has_max:
            array_only_schema["maxItems"] = max_occurs
        return array_only_schema

    if max_occurs == 1 or max_occurs is null:  # assume unspecified is default=1
        # single value only
        return item_schema

    # either a single value or an array of them
    array_schema = {"type": PACKAGE_ARRAY_BASE, "items": item_schema}
    if has_min:
        array_schema["minItems"] = min_occurs
    if has_max:
        array_schema["maxItems"] = max_occurs
    if len(item_schema) == 1 and "oneOf" in item_schema:
        # if item schema was itself 'oneOf', combine them to make it easier to read
        combined_schema = deepcopy(item_schema)  # avoid recursion by dict references
        combined_schema["oneOf"].append(array_schema)  # noqa
        return combined_schema
    # otherwise simply stack (still valid, just slightly more confusing to read)
    return {
        "oneOf": [
            item_schema,
            array_schema,
        ]
    }
[docs]
def oas2json_io_literal(io_info):
    # type: (OpenAPISchemaProperty) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Generates the :term:`JSON` literal I/O definition equivalent to a literal value :term:`OpenAPI` schema.

    The data type is resolved from the ``type`` and ``format`` of the schema. When the schema defines
    an ``enum`` or a ``minimum``/``maximum`` range, the allowed values and the literal data domains are included.

    :param io_info: :term:`OpenAPI` schema of the I/O.
    :return: Converted :term:`JSON` I/O definition, or :data:`null` if definition could not be resolved.
    """
    io_type = get_field(io_info, "type", search_variations=False)
    io_fmt = get_field(io_info, "format", search_variations=False)
    if io_fmt is not null:
        # the format refines numeric types, or replaces the type for a known string format
        is_numeric = io_type in OAS_LITERAL_NUMERIC
        if is_numeric and io_fmt in OAS_LITERAL_FLOAT_FORMATS:
            io_type = "double"
        elif is_numeric and io_fmt in OAS_LITERAL_INTEGER_FORMATS:
            io_type = "integer"
        elif not is_numeric and io_fmt in OAS_LITERAL_STRING_FORMATS:
            io_type = io_fmt
    data_type = any2wps_literal_datatype(io_type, False)
    io_json = {"type": WPS_LITERAL, "data_type": data_type}

    io_enum = get_field(io_info, "enum", search_variations=False)
    min_val = get_field(io_info, "minimum", search_variations=False)
    max_val = get_field(io_info, "maximum", search_variations=False)
    min_exc = get_field(io_info, "exclusiveMinimum", search_variations=False, default=False)
    max_exc = get_field(io_info, "exclusiveMaximum", search_variations=False, default=False)
    mult_of = get_field(io_info, "multipleOf", search_variations=False, default=None)

    if io_enum is not null:
        io_allow = {"allowed_values": io_enum}
    elif min_val is not null or max_val is not null:
        # closure according to which range bounds are exclusive: (minimum, maximum)
        closure_by_exclusive = {
            (True, True): RANGECLOSURETYPE.OPEN,
            (True, False): RANGECLOSURETYPE.OPENCLOSED,
            (False, True): RANGECLOSURETYPE.CLOSEDOPEN,
            (False, False): RANGECLOSURETYPE.CLOSED,
        }
        allowed_range = {
            "minimum": min_val,
            "maximum": max_val,
            "spacing": mult_of,
            "closure": closure_by_exclusive[bool(min_exc), bool(max_exc)]
        }
        io_allow = {"allowed_values": [allowed_range]}
    else:
        # no value restriction to report
        return io_json

    io_allow.update(io_info)  # noqa
    io_allow["data_type"] = data_type
    domains = any2json_literal_data_domains(io_allow)
    io_json["allowed_values"] = io_allow["allowed_values"]  # propagate to help CWL resolution of enum later on
    io_json["literalDataDomains"] = domains
    return io_json
[docs]
def oas2json_io_array(io_info):
    # type: (OpenAPISchemaArray) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Converts an array I/O definition by :term:`OpenAPI` schema into the equivalent :term:`JSON` representation.

    The I/O is resolved from the ``items`` schema of the array, and the ``minItems`` and ``maxItems``
    of the array are reported as the ``minOccurs`` and ``maxOccurs`` of that I/O.

    :param io_info: :term:`OpenAPI` schema of the I/O.
    :return: Converted :term:`JSON` I/O definition, or :data:`null` if definition could not be resolved.
    """
    io_items = get_field(io_info, "items", search_variations=False)
    io_json = oas2json_io(io_items)
    if not isinstance(io_json, dict):
        # items could not be resolved, there is no definition to extend with the cardinality
        return io_json
    min_items = get_field(io_info, "minItems")
    max_items = get_field(io_info, "maxItems")
    if isinstance(min_items, int):
        io_json["minOccurs"] = min_items
    if isinstance(max_items, int):
        io_json["maxOccurs"] = max_items
    return io_json
[docs]
def oas2json_io_object(io_info, io_href=null):
    # type: (OpenAPISchemaObject, str) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Generates the :term:`JSON` I/O definition equivalent to an ``object`` type :term:`OpenAPI` schema.

    An explicit :term:`OpenAPI` schema with ``object`` type can represent any of the following I/O:

    - Bounding Box as GeoJSON feature
    - Complex JSON structure

    .. seealso::
        :func:`oas2json_io_file` is used for file reference to be parsed as other Complex I/O.

    :param io_info: :term:`OpenAPI` schema of the I/O.
    :param io_href: Alternate schema reference for the type.
    :return: Converted :term:`JSON` I/O definition, or :data:`null` if definition could not be resolved.
    """
    io_fmt = get_field(io_info, "format", search_variations=False)
    io_props = get_field(io_info, "properties", search_variations=False) or {}
    is_bbox = ("bbox" in io_props and "crs" in io_props) or io_fmt == sd.OGC_API_BBOX_FORMAT

    if not is_bbox:
        # note:
        #   In this case we are dealing only with literal OAS objects, therefore JSON content.
        #   Complex I/O provided by file reference are done by other methods.
        json_fmt = {"mime_type": ContentType.APP_JSON}
        if io_href is not null:
            json_fmt["schema"] = io_href
        return {"type": WPS_COMPLEX, "supported_formats": [json_fmt]}

    bbox_json = {"type": WPS_BOUNDINGBOX}
    crs_schema = get_field(io_props, "crs", search_variations=False)
    if isinstance(crs_schema, dict):
        # only an explicit listing of CRS names is reported as the supported ones
        crs_enum = get_field(crs_schema, "enum", search_variations=False)
        if isinstance(crs_enum, list) and all(isinstance(crs, str) for crs in crs_enum):
            bbox_json["supported_crs"] = crs_enum
    if io_href is not null:
        schema_meta = {"href": io_href, "role": SchemaRole.JSON_SCHEMA, "title": "Schema"}
        # content type is reported only if it can be resolved from the extension of the reference
        href_ext = os.path.splitext(io_href)[-1]
        href_ctype = href_ext and get_content_type(href_ext)
        if href_ctype:
            schema_meta["type"] = href_ctype
        bbox_json["metadata"] = [schema_meta]
    return bbox_json
[docs]
def oas2json_io_keyword(io_info):
    # type: (OpenAPISchemaKeyword) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Maps a keyword :term:`I/O` described by :term:`OpenAPI` schema to its :term:`JSON` representation.

    The schema must contain exactly one of the :data:`OAS_KEYWORD_TYPES`, whose nested schemas are each converted
    and then combined as follows:

    - all literals: a single literal, using the common data type or the closest generic one
    - any bounding box: the first bounding box definition found
    - all complex: a single complex definition with the distinct supported formats combined
    - anything else: a generic complex :term:`JSON` definition

    :param io_info: :term:`OpenAPI` schema of the I/O.
    :return: Converted :term:`JSON` I/O definition, or :data:`null` if definition could not be resolved.
    """
    found_keywords = [key for key in io_info if key in OAS_KEYWORD_TYPES]
    if len(found_keywords) != 1:
        return null
    keyword = found_keywords[0]
    sub_schemas = io_info[keyword]

    if keyword == "not":
        sub_objects = [oas2json_io(sub_schemas)]  # noqa
    elif keyword == "allOf":
        combined_schema = {}  # type: OpenAPISchema
        for sub_schema in sub_schemas:
            combined_schema.update(sub_schema)
        sub_objects = [oas2json_io(combined_schema)]
    else:
        sub_objects = [oas2json_io(sub_schema) for sub_schema in sub_schemas]

    found_types = [get_field(obj, "type", search_variations=False) for obj in sub_objects]
    io_types = {typ for typ in found_types if isinstance(typ, str)}
    found_dtypes = [get_field(obj, "data_type", search_variations=False) for obj in sub_objects]
    data_types = {dtype for dtype in found_dtypes if isinstance(dtype, str)}

    # if it cannot be resolved, must be too ambiguous, so assume complex data dump
    fallback_json = {"type": WPS_COMPLEX, "supported_formats": [{"mime_type": ContentType.APP_JSON}]}
    if not io_types:
        return fallback_json

    # literals are all or nothing, but can allow different 'data format'
    # any mixed type with literal must be elevated to complex since there is no way to handle both as literals
    if all(typ == WPS_LITERAL for typ in io_types):
        literal_json = {"type": WPS_LITERAL}
        if len(data_types) == 1:
            literal_json["data_type"] = next(iter(data_types))
        elif all(dtype in OAS_LITERAL_FLOAT_FORMATS for dtype in data_types):
            literal_json["data_type"] = "double"
        elif all(dtype in OAS_LITERAL_INTEGER_FORMATS for dtype in data_types):
            literal_json["data_type"] = "integer"
        # acceptable to use 'numeric' for either integers or floats
        elif all(dtype in OAS_LITERAL_NUMERIC | OAS_LITERAL_NUMERIC_FORMATS for dtype in data_types):
            literal_json["data_type"] = "numeric"
        else:
            literal_json["data_type"] = "string"
        return literal_json

    # since some variations can be an external reference or a partial definition,
    # anything matching a bbox marks the whole definition as one, falling back to complex otherwise
    if any(typ == WPS_BOUNDINGBOX for typ in io_types):
        for sub_object in sub_objects:
            if get_field(sub_object, "type") == WPS_BOUNDINGBOX:
                return sub_object
        return fallback_json

    if all(typ == WPS_COMPLEX for typ in io_types):
        # improve definition of complex type of multiple distinct supported formats
        merged_formats = []
        for sub_object in sub_objects:
            candidates = get_field(sub_object, "supported_formats", default=[])
            unseen = [fmt for fmt in candidates if fmt not in merged_formats]
            merged_formats.extend(unseen)
        return {"type": WPS_COMPLEX, "supported_formats": merged_formats}

    return fallback_json
[docs]
def oas2json_io_file(io_info, io_href=null):
    # type: (OpenAPISchemaObject, str) -> JSON_IO_TypedInfo
    """
    Maps a file reference :term:`I/O` described by :term:`OpenAPI` schema to its :term:`JSON` representation.

    The format is described from the ``contentMediaType``, ``contentEncoding`` and ``contentSchema`` fields.
    A supported format is reported only when the media-type is a string.

    :param io_info: :term:`OpenAPI` schema of the I/O.
    :param io_href: Alternate schema reference for the type, used when ``contentSchema`` is not provided.
    :return: Converted :term:`JSON` I/O definition of complex type.
    """
    media_type = get_field(io_info, "contentMediaType", search_variations=False)
    encoding = get_field(io_info, "contentEncoding", search_variations=False)
    schema = get_field(io_info, "contentSchema", search_variations=False, default=io_href)

    file_json = {"type": WPS_COMPLEX}
    if not isinstance(media_type, str):
        # other fields don't matter if required media-type is omitted
        return file_json

    format_fields = (("encoding", encoding), ("schema", schema), ("mime_type", media_type))
    file_format = {name: value for name, value in format_fields if isinstance(value, str)}
    file_json["supported_formats"] = [file_format]
    return file_json
[docs]
def oas2json_io_measure(io_info):
    # type: (OpenAPISchemaObject) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Convert a unit of measure (``UoM``) I/O definition by :term:`OpenAPI` schema into :term:`JSON` representation.

    This conversion projects an object (normally complex type) into a literal type, considering that other provided
    parameters are all metadata information.

    The schema is considered an UoM when it is an ``object`` whose ``properties`` define both the unit
    and the measure value as nested schemas. Supported units are obtained from the ``enum`` or ``const`` of the
    unit schema. Their references are obtained from the ``enum`` of the ``reference`` property (one per unit),
    or from its ``const`` (the same reference applied to every unit). Units without reference use an empty string.

    :param io_info: Potential :term:`OpenAPI` schema of an UoM I/O.
    :return: Converted I/O if it matched the UoM format, or null otherwise.
    :raises ValueError:
        If the schema matches an UoM, but the measure value is not listed as ``required``,
        if no unit is specified, or if the amount of references does not match the amount of units.
    """
    io_type = get_field(io_info, "type", search_variations=False)
    if io_type != "object":
        return null
    io_prop = get_field(io_info, "properties", search_variations=False)
    if not isinstance(io_prop, dict):
        return null

    io_uom = get_field(io_prop, "uom", search_variations=True)
    io_val = get_field(io_prop, "measure", search_variations=True)
    io_uom_ref = get_field(io_prop, "reference", search_variations=False, default={})
    if not (isinstance(io_uom, dict) and isinstance(io_val, dict)):
        return null
    if not isinstance(io_uom_ref, dict):
        # reference is only optional metadata, ignore it rather than failing if it is not a schema object
        io_uom_ref = {}

    io_key = get_field(io_prop, "measure", search_variations=True, key=True)
    io_req = get_field(io_info, "required", search_variations=False)
    if not isinstance(io_req, list) or io_key not in io_req:
        io_err = repr_json(io_info, force_string=True, indent=None)
        raise ValueError(
            f"Detected UoM I/O schema but missing 'required' field entry for the measure value: {io_err}"
        )

    # detect if any number, int/float explicit, or any min/max constraints
    io_json = oas2json_io_literal(io_val)

    uom_enum = io_uom.get("enum")
    uom_const = io_uom.get("const")
    if isinstance(uom_enum, list) or isinstance(uom_const, str):
        # although WPS can support many UoM, OGC-API schema representation can use oneOf to refer
        # to multiple variations of UoM with their respective combinations of unit/references
        # they can also use a single UoM definition with a list of units
        uom_enum = uom_enum or ([uom_const] if uom_const else [])
        if len(uom_enum) < 1:
            raise ValueError(
                "Detected UoM I/O schema with invalid units enum/const. "
                f"At least 1 unit must be specified. Got: {io_uom}."
            )
        ref_enum = io_uom_ref.get("enum")
        if not ref_enum:
            # without explicit list of references, a constant reference is valid for whichever unit is selected
            # when no reference is provided at all, each unit is associated to an empty reference
            ref_const = io_uom_ref.get("const")
            ref_enum = [ref_const if ref_const else ""] * len(uom_enum)
        if isinstance(ref_enum, list) and len(ref_enum) != len(uom_enum):
            raise ValueError(
                "Detected UoM I/O schema with invalid unit/reference enums/const."
                "When UoM reference are provided, they must be of equal quantity with units. "
                f"Got: (units: {io_uom}, references: {io_uom_ref})."
            )
        io_json["uoms"] = [uom2json(unit, ref) for unit, ref in zip(uom_enum, ref_enum)]

    io_uom_default = io_uom.get("default")
    io_uom_ref_default = io_uom_ref.get("default")
    if isinstance(io_uom_default, str) and io_uom_default:
        io_json["uom"] = uom2json(io_uom_default, io_uom_ref_default)
    return io_json
[docs]
def oas2json_io(io_info):
    # type: (OpenAPISchema) -> Union[JSON_IO_TypedInfo, Type[null]]
    """
    Maps an :term:`I/O` described by :term:`OpenAPI` schema to its :term:`JSON` representation.

    Any schema reference is resolved first, and the conversion is then delegated to the function
    that handles the detected kind of schema (file, bounding box, measure, literal, object, array or keyword).

    :param io_info: :term:`OpenAPI` schema of the :term:`I/O`.
    :return: Converted :term:`JSON` :term:`I/O` definition, or :data:`null` if definition could not be resolved.
    """
    io_href = get_field(io_info, "$ref")
    io_info = oas_resolve_remote(io_info)
    io_type = get_field(io_info, "type", search_variations=False)
    io_json = null

    # File I/O can be defined with raw-data string type, but must be associated with content information to
    # help distinguish them from plain string value. Try to detect this to avoid literal data interpretation.
    # NOTE:
    #   Don't include "{type: string, format: uri}" as complex type.
    #   Leave this as the method to indicate that a process uses a plain URL reference that must not be fetched.
    #   The appropriate way to define "fetchable" resources is with 'content' prefixed properties below.
    if io_type == "string":
        media_type = get_field(io_info, "contentMediaType", search_variations=False)
        encoding = get_field(io_info, "contentEncoding", search_variations=False)
        content_schema = get_field(io_info, "contentSchema", search_variations=False)
        string_format = get_field(io_info, "format", search_variations=False)
        # ignore schema since possible in literal
        if media_type is not null or encoding is not null:
            io_type = WPS_COMPLEX  # set value to avoid null return below, but no parsing after since not OAS type
            io_json = oas2json_io_file(io_info, io_href)
        if content_schema == sd.OGC_API_BBOX_SCHEMA or string_format == sd.OGC_API_BBOX_FORMAT:
            return oas2json_io_object(io_info, io_href)
    else:
        # known special case of extended OAS object representing a literal (unit of measure)
        io_json = oas2json_io_measure(io_info)
        if io_json:
            io_type = WPS_LITERAL  # set value to avoid null return below, but no parsing after since not OAS type

    if io_type is null:
        if any(key in OAS_KEYWORD_TYPES for key in io_info):
            io_json = oas2json_io_keyword(io_info)
            # in case this keyword was a large combination of multiple complex JSON variants with a reference schema
            # forward the reference in the supported type since this is a special case extended with contentSchema
            io_type = get_field(io_json, "type", default="keyword")  # ensure not null value to skip return null
            if io_type == WPS_COMPLEX:
                keyword_formats = get_field(io_json, "supported_formats", default=[])
                if "$id" in io_info and len(keyword_formats) == 1:
                    single_format = keyword_formats[0]
                    format_type = get_field(single_format, "mime_type", search_variations=True)
                    if format_type and ContentType.APP_JSON in format_type:
                        single_format["schema"] = io_info["$id"]
    elif io_type in OAS_LITERAL_TYPES:
        io_json = oas2json_io_literal(io_info)
    elif io_type in OAS_COMPLEX_TYPES:
        io_json = oas2json_io_object(io_info, io_href)
    elif io_type in OAS_ARRAY_TYPES:
        io_json = oas2json_io_array(io_info)

    if io_type is null or io_json is null:
        LOGGER.debug("Unknown OpenAPI to JSON I/O resolution for schema:\n%s", repr_json(io_info))
        return null

    # default literal value can help resolve as last resort if specific type cannot be inferred
    default_value = get_field(io_info, "default", search_variations=False)
    if default_value is not null:
        io_json["default"] = default_value
    return io_json
[docs]
def oas_resolve_remote(io_info):
    # type: (OpenAPISchema) -> OpenAPISchema
    """
    Perform remote :term:`OpenAPI` schema ``$ref`` resolution.

    Resolution is performed only sufficiently to provide enough context for following :term:`JSON` :term:`I/O`
    conversion. Remote references are not resolved further than required to speedup loading time and avoid recursive
    error on self-referring schema. Past sufficient levels of schema definitions, the specific contents are not
    important nor need to be resolved, since they cannot be mapped to anything else than :data:`WPS_COMPLEX`
    :term:`I/O` type.

    .. note::
        The provided schema is updated in place. Its ``$ref`` is removed, the resolved contents are merged into it,
        and the original reference is preserved under ``$id``.

    :param io_info: :term:`I/O` :term:`OpenAPI` schema to attempt resolution as applicable.
    :return:
        Resolved :term:`I/O` schema or directly the provided schema returned unmodified if no references need
        resolution.
    :raises ValueError:
        If the reference is not an absolute HTTP(S) or S3 location, if it does not refer to a JSON or YAML file,
        or if the referenced schema could not be loaded.
    """
    # retrieve external schema reference (possibly nested)
    io_href = get_field(io_info, "$ref", search_variations=False, pop_found=True)
    if not isinstance(io_href, str):
        return io_info

    # first encountered reference should be full-uri to allow us knowing where to look for
    if not io_href.startswith(("http://", "https://", "s3://")):
        raise ValueError(f"External OpenAPI schema reference [{io_href}] must be absolute.")
    if not io_href.endswith((".yaml", ".yml", ".json")):
        raise ValueError(f"External OpenAPI schema reference [{io_href}] must be formatted as JSON or YAML.")

    try:
        # use resolver which will handle all intricacies of loading remote schema into a local dict definition
        # this way, no need to handle other external, relative, absolute, etc. nested '$ref' locations
        # note: '$ref' are still loaded on the first level only to avoid recursive schemas breaking on load
        io_base = f"{io_href.rsplit('/', 1)[0]}/"
        resolver = SchemaRefResolver(base_uri=io_base, referrer=io_info)
        io_resolved = resolver.resolve_from_url(io_href)

        # In case the input schema was the result of a 'allOf' merge,
        # update with other fields that forms a whole object
        # This way, there is more chance we can detect a combined form that matches a known conversion type.
        io_info.update(io_resolved)
        io_info["$id"] = io_href

        # If the remote $ref schema itself was a combination of $ref schemas (e.g.: it contained 'oneOf').
        # Then update the first level of references that we can potentially work with to resolve conversion type.
        # No need to resolve more since this is guaranteed to be 'complex' type.
        # We must use the resolver right away in case the remote $ref are relative to the same root $ref.
        for keyword in OAS_KEYWORD_TYPES:  # type: Literal["oneOf", "anyOf", "allOf", "not"]
            if keyword not in io_info:
                continue
            if isinstance(io_info[keyword], list):  # all keywords except 'not'
                # iterate over a copy since items of the list are replaced while looping
                for i, schema in enumerate(list(io_info[keyword])):  # type: int, OpenAPISchema
                    if "$ref" in schema:
                        ref_id, schema = resolver.resolve(schema["$ref"])
                        schema["$id"] = ref_id
                        io_info[keyword][i] = schema  # noqa
            elif "$ref" in io_info[keyword]:  # only 'not' keyword
                io_keyword = io_info[keyword]  # type: OpenAPISchemaReference  # noqa
                ref_schema = io_keyword["$ref"]
                ref_id, schema = resolver.resolve(ref_schema)
                schema["$id"] = ref_id
                io_info[keyword] = schema  # type: ignore
    except Exception as exc:
        raise ValueError(f"External OpenAPI schema reference [{io_href}] could not be loaded.") from exc
    return io_info
[docs]
def json2wps_datatype(io_info):
    # type: (JSON_IO_Type) -> str
    """
    Resolves the literal data-type of a :term:`JSON` :term:`I/O` definition.

    Guesses the literal data-type from :term:`JSON` :term:`I/O` information in order to allow creation of the
    corresponding :term:`WPS` :term:`I/O`. Defaults to ``string`` if no suitable guess can be accomplished.

    Guesses are attempted in order from the ``type`` field (removed from the definition), its field variations,
    the default value, the allowed values and the supported values, until one provides a result.

    :param io_info: :term:`I/O` in :term:`JSON` dict format.
    :return: Resolved data-type name.
    """
    io_type = get_field(io_info, "type", search_variations=False, pop_found=True)
    if str(io_type).lower() == WPS_LITERAL:
        # generic 'literal' does not indicate the data-type, it must be guessed from other fields
        io_type = null

    # each candidate is paired with a flag telling if it is a value (True) or a type name (False)
    candidates = [
        (io_type, False),
        (get_field(io_info, "type", search_variations=True), False),
        (get_field(io_info, "default", search_variations=True), True),
        (get_field(io_info, "allowed_values", search_variations=True), True),
        (get_field(io_info, "supported_values", search_variations=True), True)
    ]
    for candidate, is_value in candidates:
        if io_type:
            break
        if isinstance(candidate, list) and len(candidate):
            candidate = candidate[0]
        io_type = any2wps_literal_datatype(candidate, is_value)

    if isinstance(io_type, str):
        return io_type
    LOGGER.warning("Failed literal data-type guess, using default 'string' for I/O [%s].",
                   get_field(io_info, "identifier", search_variations=True))
    return "string"
[docs]
def json2wps_field(field_info, field_category):
    # type: (JSON, str) -> Any
    """
    Converts an :term:`I/O` field from :term:`JSON` literal, list, or dictionary to corresponding :term:`WPS` types.

    :param field_info: literal data or information container describing the type to be generated.
    :param field_category: one of :data:`WPS_FIELD_MAPPING` keys to indicate how to parse ``field_info``.
    :return:
        Converted field, or ``None`` (with a logged warning) if the category or the type of ``field_info``
        is not handled.
    """
    if field_category == "allowed_values":
        return json2wps_allowed_values({"allowed_values": field_info})
    elif field_category == "supported_formats":
        fmt = None
        default = null
        if isinstance(field_info, dict):
            # work on a copy to leave the provided definition untouched
            field_info = field_info.copy()
            field_info.pop("$schema", None)
            # pywps doesn't allow 'default' field in init, remove if found, but preserve it indirectly
            default = get_field(field_info, "default", search_variations=False, pop_found=True)
            fmt = Format(**field_info)
        elif isinstance(field_info, str):
            # plain string is passed as the first positional argument, no other field to cleanup
            fmt = Format(field_info)
        if fmt:
            # consider any explicit 'default' format specification to allow resolution against many supported formats
            # set a temporary additional attribute in PyWPS Format object that can be found later
            if isinstance(default, bool):
                set_field(fmt, "default", default)
            return fmt
    elif field_category == "metadata":
        if isinstance(field_info, WPS_Metadata):
            return field_info
        if isinstance(field_info, dict):
            meta = metadata2json(field_info, force=True)
            # 'rel' and 'type' are not passed to the metadata constructor, set them afterwards
            rel = meta.pop("rel", None)
            ctype = meta.pop("type", None)
            meta = WPS_Metadata(**meta)
            if isinstance(rel, str):
                set_field(meta, "rel", rel)
            if isinstance(ctype, str):
                set_field(meta, "type", ctype)
            return meta
        if isinstance(field_info, str):
            return WPS_Metadata(field_info)
    elif field_category == "keywords" and isinstance(field_info, list):
        return field_info
    elif field_category == "uom" and isinstance(field_info, str):
        return field_info
    elif field_category in ["identifier", "title", "abstract"] and isinstance(field_info, str):
        return field_info
    LOGGER.warning("Field of type '%s' not handled as known WPS field.", field_category)
    return None
[docs]
def json2wps_supported_uoms(io_info):
    # type: (JSON_IO_Type) -> Union[Type[null], List[UOM]]
    """
    Builds the supported Unit of Measure (:term:`UoM`) instances of a :term:`JSON` :term:`I/O` definition.

    :param io_info: :term:`I/O` in :term:`JSON` dict format.
    :return:
        List of :term:`UoM` instances, or :data:`null` if none are defined or if any of them is not a valid definition.
    """
    uom_items = get_field(io_info, "uoms", search_variations=True)
    if not (uom_items and isinstance(uom_items, list)):
        return null

    converted = []
    for item in uom_items:
        if isinstance(item, UOM):
            # already converted, keep as is
            converted.append(item)
        elif isinstance(item, dict):
            unit = get_field(item, "uom", search_variations=True)
            href = get_any_value(item, file=True, data=False, default="")
            # don't check for 'if not unit', but for string type
            # an explicit empty string is valid, as "unit" of no-unit value
            if not isinstance(unit, str):
                return null
            converted.append(UOM(unit, href))
        else:
            return null
    return converted
[docs]
def json2wps_allowed_values(io_info):
    # type: (JSON_IO_Type) -> Union[Type[null], List[AllowedValue]]
    """
    Obtains the allowed values constrains for the literal data type from a :term:`JSON` :term:`I/O` definition.

    Converts the ``literalDataDomains`` definition into ``allowed_values`` understood by :mod:`pywps`.
    Handles explicit ``allowed_values`` if available and not previously defined by ``literalDataDomains``.

    Range limits, spacing and literal values are considered provided when they are neither ``None`` nor
    an empty string, such that zero or ``False`` remain valid constraints.

    .. seealso::
        Function :func:`any2json_literal_data_domains` defines generated ``literalDataDomains`` JSON definition.

    :param io_info: :term:`I/O` in :term:`JSON` dict format.
    :return: List of allowed values, or :data:`null` if none could be resolved.
    :raises ValueError: If explicit ``allowed_values`` are provided as a list of unsupported or mixed item types.
    """
    unset = (None, "")
    domains = get_field(io_info, "literal_data_domains", search_variations=True)
    allowed = get_field(io_info, "allowed_values", search_variations=True)
    if not domains and isinstance(allowed, list):
        if all(isinstance(value, AllowedValue) for value in allowed):
            return allowed
        if all(isinstance(value, (float, int, str)) for value in allowed):
            return [AllowedValue(value=value) for value in allowed]
        if all(isinstance(value, dict) for value in allowed):
            allowed_values = []
            for value in allowed:
                min_val = get_field(value, "range_minimum", search_variations=True, default=None)
                max_val = get_field(value, "range_maximum", search_variations=True, default=None)
                spacing = get_field(value, "range_spacing", search_variations=True, default=None)
                closure = get_field(value, "range_closure", search_variations=True, default=RANGECLOSURETYPE.CLOSED)
                literal = get_field(value, "value", search_variations=False, default=None)
                # explicit comparison instead of truthiness, otherwise limits or values of zero would be dropped
                if any(item not in unset for item in (min_val, max_val, spacing)):
                    allowed_values.append(AllowedValue(ALLOWEDVALUETYPE.RANGE,
                                                       minval=min_val, maxval=max_val,
                                                       spacing=spacing, range_closure=closure))
                elif literal not in unset:
                    allowed_values.append(AllowedValue(ALLOWEDVALUETYPE.VALUE, value=literal))
                # literalDataDomains could be 'anyValue', which is to be ignored here
            return allowed_values
        LOGGER.debug("Cannot parse literal I/O AllowedValues: %s", allowed)
        raise ValueError(f"Unknown parsing of 'AllowedValues' for value: {allowed!s}")
    if domains:
        for domain in domains:
            values = domain.get("valueDefinition")
            if values:
                allowed = json2wps_allowed_values({"allowed_values": values})
                # stop on first because undefined how to combine multiple
                # no multiple definitions by 'any2json_literal_data_domains' regardless,
                # and not directly handled by pywps
                if allowed:
                    return allowed
    return null
[docs]
def json2wps_io(io_info, io_select):  # pylint: disable=R1260
    # type: (JSON_IO_Type, IO_Select_Type) -> WPS_IO_Type
    """
    Builds the :mod:`pywps` :term:`I/O` object corresponding to a :term:`JSON` :term:`I/O` definition.

    .. note::
        The provided definition, including its nested format definitions, is modified in place
        while its fields are adjusted to the names and types employed to create the :mod:`pywps` object.

    :param io_info: :term:`I/O` in :term:`JSON` dict format.
    :param io_select: :data:`IO_INPUT` or :data:`IO_OUTPUT` to specify desired :term:`WPS` type conversion.
    :return: corresponding :term:`I/O` in :term:`WPS` format.
    :raises PackageTypeError:
        If multiple formats are indicated as default simultaneously,
        or if no conversion is known for the resolved type and selected mode.
    """
    io_info["identifier"] = get_field(io_info, "identifier", search_variations=True, pop_found=True)

    field_renames = {
        "formats": "supported_formats",
        "minOccurs": "min_occurs",
        "maxOccurs": "max_occurs",
        "dataType": "data_type",
        "defaultValue": "default",
        "crs": "default",
        "supportedValues": "supported_values",
    }
    field_removals = [
        "id",
        "workdir",
        "any_value",
        "data_format",
        "data",
        "file",
        "mimetype",
        "mediaType",
        "encoding",
        "schema",
        "asreference",
        "additionalParameters",
        "ll",
        "ur",
        "bbox",
    ]
    value_replacements = {"unbounded": PACKAGE_ARRAY_MAX_SIZE}
    transform_json(io_info, rename=field_renames, remove=field_removals, replace_values=value_replacements)

    # convert allowed value objects
    allowed = json2wps_allowed_values(io_info)
    if allowed is not null:
        has_allowed = isinstance(allowed, list) and len(allowed) > 0
        io_info["allowed_values"] = allowed if has_allowed else AnyValue  # noqa

    # convert supported format objects
    io_formats = get_field(io_info, "supported_formats", search_variations=True, pop_found=True)
    if io_formats is not null:
        for io_format in io_formats:
            io_format["mime_type"] = get_field(io_format, "mime_type", search_variations=True, pop_found=True)
            io_format.pop("maximumMegabytes", None)
            # define the 'default' with 'data_format' to be used if explicitly specified from the payload
            if io_format.get("default", None) is not True:
                continue
            if get_field(io_info, "data_format") != null:  # if set by previous format
                raise PackageTypeError("Cannot have multiple 'default' formats simultaneously.")
            # use 'data_format' instead of 'default' to avoid overwriting a potential 'default' value
            # field 'data_format' is mapped as 'default' format
            io_info["data_format"] = json2wps_field(io_format, "supported_formats")
        io_info["supported_formats"] = [json2wps_field(io_format, "supported_formats") for io_format in io_formats]

    # convert metadata objects
    io_metadata = get_field(io_info, "metadata", search_variations=True, pop_found=True)
    if io_metadata is not null:
        io_info["metadata"] = [json2wps_field(meta, "metadata") for meta in io_metadata]

    # convert literal fields specified as is
    for name in ["identifier", "title", "abstract", "keywords"]:
        found = get_field(io_info, name, search_variations=True, pop_found=True)
        if found is not null:
            io_info[name] = json2wps_field(found, name)

    # convert by type, add missing required arguments and
    # remove additional arguments according to each case
    io_type = io_info.pop("type", WPS_COMPLEX)  # only ComplexData doesn't have "type"

    # attempt to identify defined data-type directly in 'type' field instead of 'data_type'
    if io_type not in WPS_DATA_TYPES:
        guessed_type = any2wps_literal_datatype(io_type, is_value=False)
        if guessed_type is not null:
            io_type = WPS_LITERAL
            io_info["data_type"] = guessed_type

    if io_type == WPS_LITERAL:
        literal_type = json2wps_datatype(io_info)
        # pywps literals subset is more restrictive than all possible standard WPS
        # make use of some non-pywps compatible types since other valid WPS types are easier to match with OAS
        if literal_type in WPS_LITERAL_DATA_TYPES and literal_type not in LITERAL_DATA_TYPES:
            literal_type = any2wps_literal_datatype(literal_type, is_value=False, pywps=True)
        io_info["data_type"] = literal_type
        literal_uoms = json2wps_supported_uoms(io_info)
        if literal_uoms:
            io_info["uoms"] = literal_uoms
        else:
            io_info.pop("uoms", None)
        io_info.pop("uom", None)  # remove 'default', auto-selected from first 'uoms'

    if io_select == IO_INPUT:
        if io_info.get("max_occurs") == "unbounded":
            io_info["max_occurs"] = PACKAGE_ARRAY_MAX_SIZE
        if io_type in WPS_COMPLEX_TYPES:
            if "supported_formats" not in io_info:
                io_info["supported_formats"] = [DEFAULT_FORMAT]
            for name in ("data_type", "allowed_values", "supported_values"):
                io_info.pop(name, None)
            return ComplexInput(**io_info)
        if io_type == WPS_BOUNDINGBOX:
            io_info.pop("supported_formats", None)
            io_info["crss"] = get_field(io_info, "supported_crs", search_variations=True, pop_found=True, default=None)
            return BoundingBoxInput(**io_info)
        if io_type == WPS_LITERAL:
            io_info.pop("data_format", None)
            io_info.pop("supported_formats", None)
            literal_allowed = json2wps_allowed_values(io_info)
            if literal_allowed:
                io_info["allowed_values"] = literal_allowed
            else:
                io_info.pop("allowed_values", None)
            io_info.pop("literalDataDomains", None)
            return LiteralInput(**io_info)
    elif io_select == IO_OUTPUT:
        # following not allowed for PyWPS instance creation,
        # but they are useful for other steps, so forward them afterward
        out_min = io_info.pop("min_occurs", null)
        out_max = io_info.pop("max_occurs", null)
        out_allowed = io_info.pop("allowed_values", null)
        out_default = io_info.pop("default", null)
        io_wps = null
        if io_type in WPS_COMPLEX_TYPES:
            io_info.pop("supported_values", None)
            io_info.pop("data_type", None)  # ComplexData explicitly indicated (e.g.: embedded multi-value JSON array)
            io_wps = ComplexOutput(**io_info)
        elif io_type == WPS_BOUNDINGBOX:
            io_info.pop("supported_formats", None)
            io_info["crss"] = get_field(io_info, "supported_crs", search_variations=True, pop_found=True, default=None)
            io_wps = BoundingBoxOutput(**io_info)
        elif io_type == WPS_LITERAL:
            io_info.pop("supported_formats", None)
            io_info.pop("literalDataDomains", None)
            io_wps = LiteralOutput(**io_info)
            set_field(io_wps, "allowed_values", out_allowed)
        if io_wps:
            set_field(io_wps, "min_occurs", out_min)
            set_field(io_wps, "max_occurs", out_max)
            set_field(io_wps, "default", out_default)
            return io_wps

    raise PackageTypeError(f"Unknown conversion from dict to WPS type (type={io_type}, mode={io_select}).")
[docs]
def wps2json_io(io_wps, forced_fields=False):
    # type: (WPS_IO_Type, bool) -> JSON_IO_Type
    """
    Builds the :term:`JSON` dictionary of a :mod:`pywps` :term:`I/O` using corresponding standard keys names.

    Employs standard key names as defined by :term:`WPS` 2.0.

    :param io_wps: Any :mod:`pywps` :term:`I/O` definition to be converted to :term:`JSON` representation.
    :param forced_fields:
        Request transfer of additional fields normally undefined for outputs if they are available by being forcefully
        inserted in the objects after their creation (i.e.: using :func:`set_field`). These fields can be useful for
        obtaining mandatory details for further processing operations (e.g.: :term:`OpenAPI` schema conversion).
    :return: :term:`JSON` representation of the :term:`I/O`.
    :raises PackageTypeError: If the object is not a :mod:`pywps` :term:`I/O` or does not provide ``json`` property.
    """
    if not isinstance(io_wps, BasicIO):
        raise PackageTypeError(f"Invalid type, expected 'BasicIO', got: [{type(io_wps)!r}] '{io_wps!r}'")
    if not hasattr(io_wps, "json"):
        raise PackageTypeError("Invalid type definition expected to have a 'json' property.")

    io_json = io_wps.json  # type: JSON  # noqa

    # transfer additional fields normally undefined for outputs if available in original object (forcefully added)
    # when they are requested for further processing operations (eg: later OAS conversion)
    if forced_fields:
        for name in ["min_occurs", "max_occurs"]:
            if name in io_json:
                continue
            forced_value = get_field(io_wps, name, search_variations=True)
            if forced_value is not null:
                io_json[name] = forced_value

    field_renames = {
        "identifier": "id",
        "abstract": "description",
        "supported_formats": "formats",
        "mime_type": "mediaType",
        "min_occurs": "minOccurs",
        "max_occurs": "maxOccurs",
    }
    value_replacements = {
        PACKAGE_ARRAY_MAX_SIZE: "unbounded",
    }
    value_converters = {
        "maxOccurs": str,
        "minOccurs": str,
        "metadata": lambda metadata: [metadata2json(meta) for meta in metadata]
    }
    field_removals = [
        "data"  # string encoded value can cause confusion with default
    ]
    transform_json(io_json, rename=field_renames, remove=field_removals,
                   replace_values=value_replacements, replace_func=value_converters)

    # in some cases (Complex I/O), 'as_reference=True' causes "type" to be overwritten, revert it back
    if io_json.get("type") == WPS_REFERENCE:
        io_json["type"] = WPS_COMPLEX

    io_type = io_json["type"]
    if io_type == WPS_COMPLEX:
        # minimum requirement of 1 format object which defines mime-type
        # FIXME: should we store 'None' in db instead of empty string when missing "encoding", "schema", etc. ?
        if "formats" not in io_json or not len(io_json["formats"]):
            io_json["formats"] = [DEFAULT_FORMAT.json]
        json_formats = io_json["formats"]
        for json_format in json_formats:
            transform_json(json_format, rename=field_renames,
                           replace_values=value_replacements, replace_func=value_converters)

        # set 'default' format if it matches perfectly, or if only mime-type matches, and it is the only available one
        # (this avoids 'encoding' possibly not matching due to CWL not providing this information)
        default_format = get_field(io_json, "default", search_variations=True)
        for json_format in json_formats:
            json_format["default"] = (default_format != null and is_equal_formats(json_format, default_format))
        if len(json_formats) == 1 and not json_formats[0]["default"]:
            single_format = json_formats[0]
            single_type = get_field(single_format, "mime_type", search_variations=True)
            if default_format:
                default_type = get_field(default_format, "mime_type", search_variations=True)
                single_format["default"] = (default_type == single_type)  # pylint: disable=C0325
            elif DEFAULT_FORMAT.mime_type == single_type:
                wps_formats = get_field(io_wps, "supported_formats", default=[DEFAULT_FORMAT])
                single_format["default"] = get_field(wps_formats[0], DEFAULT_FORMAT_MISSING, default=False)
    elif io_type == WPS_BOUNDINGBOX:
        # nothing to be done for 'bbox' as 'crss', both are already plain lists of numbers/strings
        io_json["dimensions"] = None  # reset in case default (2) was applied, both 2/3 simultaneously for OGC API
    else:  # literal
        # retrieve the default definition with original value type (default 'data' is string encoded with it)
        literal_default = get_field(io_wps, "default", search_variations=True)
        if literal_default not in [null, None]:
            io_json["default"] = literal_default
        if "allowed_values" not in io_json:
            wps_allowed = get_field(io_wps, "allowed_values", search_variations=False)
            if wps_allowed is not null:
                io_json["allowed_values"] = [
                    item if isinstance(item, dict) else item.json
                    for item in wps_allowed
                ]
        literal_domains = any2json_literal_data_domains(io_json)
        if literal_domains:
            io_json["literalDataDomains"] = literal_domains

    return io_json
[docs]
def wps2json_job_payload(wps_request, wps_process):
    # type: (WPSRequest, ProcessWPS) -> JSON
    """
    Builds the WPS-REST job payload from the inputs and outputs of a :mod:`pywps` WPS ``Execute`` request.

    The inputs and outputs must be parsed from XML POST payload or KVP GET query parameters, and converted to data
    container defined by :mod:`pywps` based on the process definition.

    :param wps_request: Request providing the submitted input values and requested outputs.
    :param wps_process: Process definition providing the outputs to report.
    :return: Job payload with document response and asynchronous execution mode.
    """
    job_inputs = []
    for input_values in list(wps_request.inputs.values()):
        input_id = get_any_id(input_values[0])
        for input_value in input_values:
            value_data = input_value.get("data")
            value_href = input_value.get("href")
            if value_href:  # when href is provided, it must always be non-empty
                job_input = {"id": input_id, "href": value_href}
            else:  # no check if value to allow possible empty string, numeric zero or explicit null
                job_input = {"id": input_id, "data": value_data}
            job_inputs.append(job_input)

    job_outputs = []
    requested_ids = list(wps_request.outputs)
    for process_output in wps_process.outputs:
        output_id = process_output.identifier
        by_reference = isinstance(process_output, ComplexOutput)
        if output_id in requested_ids:
            job_output = wps_request.outputs[output_id]
        else:
            job_output = {"identifier": output_id, "asReference": str(by_reference).lower()}
        transmission = ExecuteTransmissionMode.REFERENCE if by_reference else ExecuteTransmissionMode.VALUE
        job_output["transmissionMode"] = transmission
        job_output["id"] = output_id
        job_outputs.append(job_output)

    payload = {
        "inputs": job_inputs,
        "outputs": job_outputs,
        "response": ExecuteResponse.DOCUMENT,
        "mode": ExecuteMode.ASYNC,
    }
    if not job_outputs:
        payload.pop("outputs")  # ensure not 'no output' filter
    return payload
[docs]
def get_field(io_object,
              field,
              search_variations=False,
              extra_variations=None,
              only_variations=False,
              pop_found=False,
              key=False,
              default=null,
              ):
    # type: (Union[JSON, object], str, bool, Optional[List[str]], bool, bool, bool, Any) -> Any
    """
    Gets a field by name from various :term:`I/O` object types.

    Default value is :py:data:`null` used for most situations to differentiate from literal ``None`` which is often
    used as default for parameters. The :class:`NullType` allows to explicitly tell that there was 'no field' and
    not 'no value' in existing field. If you provided another value, it will be returned if not found within the
    input object.

    When :paramref:`search_variations` is enabled and that :paramref:`field` could not be found within the object,
    field lookup will employ the values under the :paramref:`field` entry within :data:`WPS_FIELD_MAPPING` as
    additional field names to search for an existing property or key. Search continues until the first match is found,
    respecting order within the variations listing, and finally uses :paramref:`default` if no match was found.

    .. note::
        The entries of :data:`WPS_FIELD_MAPPING` are also searched (after :paramref:`extra_variations`) whenever
        :paramref:`extra_variations` is not empty, even if :paramref:`search_variations` is disabled.
        The list provided as :paramref:`extra_variations` is never modified by this function.

    :param io_object: Any :term:`I/O` representation, either as a class instance or :term:`JSON` container.
    :param field: Name of the field to look for, either as property or key name based on input object type.
    :param search_variations: If enabled, search for all variations to the field name to attempt search until matched.
    :param extra_variations: Additional field names to consider as search variations, with priority over field mapping.
    :param only_variations:
        If enabled (along with :paramref:`search_variations`), skip the first 'basic' field and start search
        directly with field variations.
    :param pop_found:
        If enabled, whenever a match is found by field or variations, remove that entry from the object.
        Removal is applied only for :class:`dict` objects.
    :param key: If enabled, whenever a match is found by field or variations, return matched key instead of the value.
    :param default: Alternative default value to return if no match could be found.
    :returns: Matched value (including search variations if enabled), or ``default``.
    """
    if not (search_variations and only_variations):
        if isinstance(io_object, dict):
            value = io_object.get(field, null)
            if value is not null:
                if pop_found:
                    io_object.pop(field)
                return field if key else value
        else:
            value = getattr(io_object, field, null)
            if value is not null:
                return field if key else value
    # work on a copy, otherwise the field mapping entries would be appended to the caller's own list on every call
    variations = list(extra_variations) if extra_variations else []
    if search_variations or variations:
        variations.extend(WPS_FIELD_MAPPING.get(field, []))
        for var in variations:
            # nested lookup is limited to the exact variation name to avoid recursive expansion of variations
            value = get_field(io_object, var, search_variations=False, only_variations=False, pop_found=pop_found)
            if value is not null:
                return var if key else value
    return default
[docs]
def set_field(io_object, field, value, force=False):
    # type: (Union[JSON, object], str, Any, bool) -> None
    """
    Assigns a named field on any supported :term:`I/O` object representation.

    A ``null`` value is considered `invalid` and is silently skipped, unless ``force=True`` is specified.
    The value is stored as an item when the object is a :class:`dict`, and as an attribute otherwise.

    :param io_object: Any :term:`I/O` representation, either as a class instance or :term:`JSON` container.
    :param field: Name of the key or attribute to assign.
    :param value: Value to assign to the field.
    :param force: Assign the value even when it is ``null``.
    """
    if value is null and not force:
        return
    if isinstance(io_object, dict):
        io_object[field] = value
    else:
        setattr(io_object, field, value)
[docs]
def _are_different_and_set(item1, item2):
    # type: (Any, Any) -> bool
    """
    Checks that two items are both set and hold different "representative" values.

    The result is ``False`` whenever any of the items is ``null``, when they compare equal, or when their comparison
    raises :class:`AttributeError`. When both items are string-like (:class:`str` or :class:`bytes`), their textual
    representations obtained with :func:`bytes2str` are compared instead of their literal values.
    Any other combination of unequal items is reported as different.

    :param item1: First value to compare.
    :param item2: Second value to compare.
    :returns: ``True`` if both items are set and considered different, ``False`` otherwise.
    """
    if any(item is null for item in (item1, item2)):
        return False
    try:
        # Note:
        #   The ``__eq__`` implementation of one item can access properties that are expected on the other item.
        #   With a ``None`` or differently typed second item, the missing property can raise an error
        #   (eg: ``Format`` checking for ``item.mime_type``, etc.), which is considered as "not comparable" here.
        same = item1 == item2
    except AttributeError:
        return False
    if same:
        return False
    # check for both (str, bytes) to consider only the textual representation of string-like values
    text_types = (str, bytes)
    if isinstance(item1, text_types) and isinstance(item2, text_types):
        return bytes2str(item1) != bytes2str(item2)
    return True
[docs]
def normalize_ordered_io(io_section, order_hints=None):
    # type: (JSON_IO_ListOrMap, Optional[JSON_IO_ListOrMap]) -> List[JSON]
    """
    Reorders and converts :term:`I/O` from any representation (:class:`dict` or :class:`list`) using ordering hints.

    First, converts :term:`I/O` definitions defined as dictionary to an equivalent :class:`list` representation,
    in order to work only with a single representation method. The :class:`list` is chosen over :class:`dict` because
    sequences can enforce a specific order, while mapping (when saved as :term:`JSON` or :term:`YAML`) have no specific
    order. The list representation ensures that :term:`I/O` order is preserved when written to file and reloaded
    afterwards regardless of server and/or library's implementation of the mapping container.

    If this function fails to correctly order any :term:`I/O` or cannot correctly guarantee such result because of
    the provided parameters (e.g.: no hints given when required), the result will not break nor change the final
    processing behaviour of parsers. This is merely *cosmetic* adjustments to ease readability of :term:`I/O` to avoid
    always shuffling their order across multiple :term:`Application Package` and :term:`Process` reporting formats.

    The important result of this function is to provide the :term:`I/O` as a consistent list of objects, so it is less
    cumbersome to compare/merge/iterate over the elements with all functions that will follow.

    .. note::
        When defined as a dictionary, an :class:`OrderedDict` is expected as input to ensure preserved field order.
        Prior to Python 3.7 or CPython 3.5, preserved order is not guaranteed for *builtin* :class:`dict`.
        In this case the :paramref:`order_hints` is required to ensure same order.

    .. note::
        The :paramref:`order_hints` are never modified. When an :term:`I/O` section mapping is converted without
        using the hints, the ``id`` field is added directly into its (non-string) :term:`I/O` definitions.
        A section already provided as :class:`list` is returned as is.

    This function is intended for parsing :term:`I/O` from :term:`Process` descriptions, :term:`Application Package`
    and other definitions that employ a ``"type"`` field. For submitted execution :term:`I/O` values, refer to other
    relevant functions.

    .. seealso::
        - :func:`convert_input_values_schema`
        - :func:`convert_output_params_schema`

    :param io_section: Definition contained under the ``inputs`` or ``outputs`` fields.
    :param order_hints: Optional/partial :term:`I/O` definitions hinting an order to sort unsorted-dict I/O.
    :returns: :term:`I/O` specified as list of dictionary definitions with preserved order (as good as possible).
    """
    if isinstance(io_section, list):
        return io_section
    io_list = []  # type: List[JSON]
    io_dict = OrderedDict()
    if isinstance(io_section, dict) and not isinstance(io_section, OrderedDict) and order_hints:
        # convert the hints themselves to list if they are provided as mapping
        # use copies to leave the caller's hints untouched, and support hints using the literal type string shorthand
        if isinstance(order_hints, dict):
            order_hints = [
                dict(hint, id=hint_key) if isinstance(hint, dict) else {"type": hint, "id": hint_key}
                for hint_key, hint in order_hints.items()
            ]
        # pre-order I/O that can be resolved with hint when the specified I/O section is not ordered
        io_section = deepcopy(io_section)
        for hint in order_hints:
            hint_id = get_field(hint, "identifier", search_variations=True)
            if hint_id and hint_id in io_section:  # ignore hint where ID could not be resolved
                io_dict[hint_id] = io_section.pop(hint_id)
        # any I/O not matched by a hint is appended after the hinted ones, in its current order
        io_dict.update(io_section)
    else:
        io_dict = io_section
    for io_id, io_value in io_dict.items():
        # I/O value can be a literal type string or dictionary with more details at this point
        # make it always detailed dictionary to avoid problems for later parsing
        # this is also required to make the list, since all list items must have a matching type
        io_item = {"type": io_value} if isinstance(io_value, str) else io_value
        io_item["id"] = io_id
        io_list.append(io_item)
    return io_list
[docs]
def merge_io_fields(wps_io, cwl_io):
    # type: (WPS_IO_Type, WPS_IO_Type) -> WPS_IO_Type
    """
    Merges the fields of matching :term:`WPS` and :term:`CWL` :term:`I/O` definitions.

    Each field is considered only if it is set with a different value in both definitions, or if it is missing
    from the :term:`WPS` :term:`I/O` while being available in the :term:`CWL` one.
    The :term:`WPS` :term:`I/O` is updated in-place and returned.

    .. seealso::
        :func:`cwl2wps_io` for conversion of :term:`CWL` to :term:`WPS` representation.

    :param wps_io: Original :term:`WPS` :term:`I/O` provided in the process definition during deployment.
    :param cwl_io: Converted :term:`CWL` :term:`I/O` into :term:`WPS` representation for matching similar details.
    :return: Merged :term:`I/O` definition.
    """
    # Complementing fields (metadata, keywords, etc.) are looked up in both the CWL and WPS I/O.
    # Some additional fields are enforced to keep the value specified by WPS if applicable.
    # These are only added here rather than in 'WPS_FIELD_MAPPING' to avoid erroneous detection by other functions.
    #   - Literal: 'default' value defined by 'data' (forced converted to string) or '_default' with original value
    #   - Complex: 'default' format defined by 'data_format'
    #     (see function 'json2wps_io' for detail)
    # 'data_format' must be last, since it depends on 'supported_formats' processed in a previous iteration.
    # 'metadata' must be first, since it interacts with 'supported_formats' during mixed typed exchanges.
    other_fields = list(set(WPS_FIELD_MAPPING) - {"metadata"})
    ordered_fields = ["metadata", *other_fields, "_default", "data_format"]
    format_fields = ("supported_formats", "data_format")

    for name in ordered_fields:
        from_cwl = get_field(cwl_io, name)
        from_wps = get_field(wps_io, name)

        # employ provided formats if different (keep WPS), or if CWL offers more that were missing in WPS
        # the WPS I/O already contains its own details, only differences found in CWL need to be pushed
        if not (_are_different_and_set(from_wps, from_cwl) or (from_wps is null and from_cwl is not null)):
            continue

        # because WPS Bbox is mapped against CWL Complex, adjust format to metadata in that case
        #   WPS expected to have no formats, since not a complex
        #   CWL expected to have only 1 because 'format' field is unique (see also 'cwl2wps_io' schema handling)
        if name in format_fields and isinstance(wps_io, BasicBoundingBox) and isinstance(cwl_io, BasicComplex):
            schema_format = from_cwl[0] if isinstance(from_cwl, (list, tuple)) else from_cwl
            metadata = get_field(wps_io, "metadata", default=[])
            already_listed = any(meta.href == schema_format.schema for meta in metadata)
            if not already_listed:
                schema_metadata = WPS_Metadata(
                    title="Schema",
                    href=schema_format.schema,
                    role=SchemaRole.JSON_SCHEMA,
                    type_=schema_format.mime_type
                )
                metadata.append(schema_metadata)
                set_field(wps_io, "metadata", metadata)
            continue

        if name == "supported_formats" and from_cwl is not null:
            from_wps = merge_io_formats(from_wps, from_cwl)
        elif name == "data_format":
            # default 'data_format' must be one of the 'supported_formats'
            # avoid setting something invalid in this case, or it will cause problem after
            # note: 'supported_formats' must have been processed before
            allowed_formats = get_field(wps_io, "supported_formats", search_variations=False, default=[])
            if from_wps not in allowed_formats:
                continue
        set_field(wps_io, name, from_wps)
    return wps_io
[docs]
def merge_package_io(wps_io_list, cwl_io_list, io_select):
    # type: (List[ANY_IO_Type], List[WPS_IO_Type], IO_Select_Type) -> List[JSON_IO_Type]
    """
    Merges corresponding parameters of different :term:`I/O` definition sources.

    Handled definition formats include :term:`I/O` representation for :term:`CWL`, :term:`OpenAPI`, :term:`WPS` and
    generic :term:`JSON` using properties resembling :mod:`pywps` or :mod:`owslib` objects.

    Update :term:`I/O` definitions to use for :term:`Process` creation and returned
    by ``GetCapabilities``/``DescribeProcess``.

    If :term:`WPS` :term:`I/O` definitions were provided during deployment, update `CWL-to-WPS`
    converted :term:`I/O` with the :term:`WPS` :term:`I/O` complementary details. If an :term:`OpenAPI` ``schema``
    definition was provided to define the :term:`I/O`, infer the corresponding :term:`WPS` :term:`I/O` details.
    Then, considering those resolved definitions and any missing information that could be inferred,
    extend field requirements that can be retrieved from :term:`CWL` definitions.

    Removes any deployment :term:`WPS` :term:`I/O` definitions that don't match any :term:`CWL` :term:`I/O` by ID,
    since they will be of no use for the underlying :term:`Application Package`.
    Adds missing deployment :term:`WPS` :term:`I/O` definitions using expected :term:`CWL` :term:`I/O` IDs.

    .. seealso::
        :func:`cwl2wps_io` for conversion of :term:`CWL` to :term:`WPS` representation.

    :param wps_io_list: List of :term:`WPS` :term:`I/O` (as json) passed during process deployment.
    :param cwl_io_list:
        List of :term:`CWL` :term:`I/O` converted to :term:`WPS`-like :term:`I/O` for counter-validation.
    :param io_select: :data:`IO_INPUT` or :data:`IO_OUTPUT` to specify desired WPS type conversion.
    :returns:
        List of updated :term:`JSON` :term:`I/O` combining :term:`CWL`, :term:`WPS` and :term:`OpenAPI`
        specifications.
    :raises PackageTypeError: If the :term:`CWL` :term:`I/O` are not provided as a list.
    """
    if not isinstance(cwl_io_list, list):
        raise PackageTypeError("CWL I/O definitions must be provided, empty list if none required.")
    wps_io_list = wps_io_list or []

    # index copies of each definition by their resolved identifier, preserving the provided ordering
    wps_io_dict = OrderedDict()
    for wps_item in wps_io_list:
        wps_key = get_field(wps_item, "identifier", search_variations=True)
        wps_io_dict[wps_key] = deepcopy(wps_item)
    cwl_io_dict = OrderedDict()
    for cwl_item in cwl_io_list:
        cwl_key = get_field(cwl_item, "identifier", search_variations=True)
        cwl_io_dict[cwl_key] = deepcopy(cwl_item)

    occurs_fields = ("min_occurs", "max_occurs")
    updated_io_list = []

    # WPS I/O by id not matching any converted CWL->WPS I/O are discarded
    # otherwise, evaluate provided WPS I/O definitions and find potential new information to be merged
    for cwl_id, cwl_io in cwl_io_dict.items():
        if cwl_id not in wps_io_dict:
            # missing WPS I/O can only be inferred using CWL->WPS definitions
            updated_io_list.append(wps2json_io(cwl_io))
            continue

        # enforce expected CWL->WPS I/O required parameters
        cwl_io_json = cwl_io.json
        wps_io_json = wps_io_dict[cwl_id]
        cwl_identifier = get_field(cwl_io_json, "identifier", search_variations=True)
        wps_title = get_field(wps_io_json, "title", search_variations=True)
        wps_io_json["identifier"] = cwl_identifier
        wps_io_json["title"] = cwl_identifier if wps_title is null else wps_title

        # attempt to infer additional typing or constraints from OpenAPI schema if available
        wps_io_schema = get_field(wps_io_json, "schema")
        if wps_io_schema:
            json_io_schema = oas2json_io(wps_io_schema)
            if json_io_schema and isinstance(json_io_schema, dict):
                wps_io_json.update(json_io_schema)

        # check if WPS I/O resolves to default literal string due to missing detection of details for explicit type
        # this is permitted if the corresponding CWL I/O can provide the remaining details of the partial WPS I/O
        if not ("type" in wps_io_json or "data_type" in wps_io_json):
            cwl_io_type = get_field(cwl_io_json, "type", search_variations=False)
            wps_io_json["type"] = cwl_io_type
            # preemptively transfer the specific data-type as well, otherwise different ones could need handling
            if cwl_io_type == WPS_LITERAL:
                wps_io_json["data_type"] = get_field(cwl_io_json, "data_type", search_variations=False)

        # fill missing WPS min/max occurs in 'provided' json to avoid overwriting resolved CWL values by WPS default '1'
        #   with 'default' field, this default '1' causes erroneous result when 'min_occurs' should be "0"
        #   with 'array' type, this default '1' causes erroneous result when 'max_occurs' should be "unbounded"
        cwl_occurs = [get_field(cwl_io_json, name, search_variations=True) for name in occurs_fields]
        wps_occurs = [get_field(wps_io_json, name, search_variations=True) for name in occurs_fields]
        for name, wps_value, cwl_value in zip(occurs_fields, wps_occurs, cwl_occurs):
            if wps_value == null and cwl_value != null:
                wps_io_json[name] = cwl_value

        wps_io = json2wps_io(wps_io_json, io_select)
        check_io_compatible(wps_io, cwl_io, cwl_id)
        wps_io = merge_io_fields(wps_io, cwl_io)

        # Given OpenAPI schema provided during WPS deployment, generate its extended I/O definition.
        # This extension allows the user to provide only 'raw' or '$ref' complex object schema, while Weaver can
        # receive it during execution either as raw data or file reference. This provides a more precise schema
        # to what would actually be accepted/produced as input/output for the process.
        # Otherwise, generate one that best represents available details using only available WPS/CWL fields.
        json_io = wps2json_io(wps_io, forced_fields=True)
        json_io["schema"] = json2oas_io(json_io, wps_io_schema)
        updated_io_list.append(json_io)
    return updated_io_list
[docs]
def check_io_compatible(wps_io, cwl_io, io_id):
    # type: (WPS_IO_Type, WPS_IO_Type, str) -> None
    """
    Validates that the :term:`I/O` types belong to matching categories before attempting to merge them.

    A `Literal` :term:`WPS` :term:`I/O` must be matched with a `Literal` :term:`CWL` :term:`I/O`, and a `Complex`
    :term:`WPS` :term:`I/O` must be matched with a `Complex` :term:`CWL` :term:`I/O`.
    The exact classes are compared (subclasses of the :mod:`pywps` :term:`I/O` types are not considered matching).

    .. note::
        The :term:`CWL` :term:`I/O` in this case is expected to be a :mod:`pywps` converted :term:`I/O`
        from the :term:`CWL` definition, and not a direct :term:`CWL` :term:`I/O` definition.

    .. warning::
        A `BoundingBox` :term:`WPS` :term:`I/O` must be mapped to a `Complex` :term:`I/O` on the :term:`CWL` side
        (since there is no equivalent).

    :param wps_io: :term:`I/O` resolved from the :term:`WPS` definition.
    :param cwl_io: :term:`I/O` resolved from the :term:`CWL` definition.
    :param io_id: Identifier of the :term:`I/O` reported in the error message.
    :raises PackageTypeError: If :term:`I/O` are not compatible.
    """
    literal_types = (LiteralInput, LiteralOutput)
    complex_types = (ComplexInput, ComplexOutput)
    bbox_types = (BoundingBoxInput, BoundingBoxOutput)

    cwl_io_type = type(cwl_io)
    wps_io_type = type(wps_io)

    if wps_io_type in literal_types and cwl_io_type in literal_types:
        return
    # both WPS bounding box and WPS complex are represented by a complex I/O on the CWL side
    if wps_io_type in bbox_types + complex_types and cwl_io_type in complex_types:
        return

    msg_err = f"Mismatching CWL/WPS types for merge of I/O ID: [{io_id}] "
    msg_typ = f" (CWL: {fully_qualified_name(cwl_io_type)}, WPS: {fully_qualified_name(wps_io_type)})."
    LOGGER.error("%s.\n CWL: %s\n WPS: %s", msg_err, cwl_io_type, wps_io_type)
    raise PackageTypeError(msg_err + msg_typ)