"""
Schema definitions for `OpenAPI` generation and validation of data from received requests and returned responses.
This module should contain any and every definition in use to build the Swagger UI and the OpenAPI JSON schema
so that one can update the specification without touching any other files after the initial integration.
Schemas defined in this module are employed (through ``deserialize`` method calls) to validate that data conforms to
reported definitions. This makes the documentation of the API better aligned with resulting code execution under it.
It also provides a reference point for external users to understand expected data structures with complete schema
definitions generated on the exposed endpoints (JSON and Swagger UI).
The definitions are also employed to generate the `OpenAPI` definitions reported in the documentation published
on `Weaver`'s `ReadTheDocs` page.
"""
# pylint: disable=C0103,invalid-name
# pylint: disable=E0241,duplicate-bases
import datetime
import inspect
import os
import re
from copy import copy
from decimal import Decimal
from functools import cache
from typing import TYPE_CHECKING
import colander
import duration
import jsonschema
import yaml
from babel.numbers import list_currencies
from colander import All, DateTime, Email as EmailRegex, Length, Money, OneOf, Range, Regex, drop, null, required
from dateutil import parser as date_parser
from pygeofilter.backends.cql2_json import to_cql2
from pygeofilter.parsers import cql2_json, cql2_text, cql_json, ecql, fes, jfe
from weaver import WEAVER_SCHEMA_DIR, __meta__
from weaver.config import WeaverFeature
from weaver.execute import (
ExecuteCollectionFormat,
ExecuteControlOption,
ExecuteMode,
ExecuteResponse,
ExecuteTransmissionMode
)
from weaver.formats import (
EDAM_NAMESPACE,
EDAM_NAMESPACE_URL,
IANA_NAMESPACE,
IANA_NAMESPACE_URL,
OGC_NAMESPACE,
OGC_NAMESPACE_URL,
OPENGIS_NAMESPACE,
OPENGIS_NAMESPACE_URL,
AcceptLanguage,
ContentType,
OutputFormat
)
from weaver.owsexceptions import OWSMissingParameterValue
from weaver.processes.constants import (
CWL_NAMESPACE_CWL_SPEC_ID,
CWL_NAMESPACE_CWL_SPEC_URL,
CWL_NAMESPACE_CWLTOOL_ID,
CWL_NAMESPACE_CWLTOOL_URL,
CWL_NAMESPACE_OGC_API_PROC_PART1_ID,
CWL_NAMESPACE_OGC_API_PROC_PART1_URL,
CWL_NAMESPACE_SCHEMA_ID,
CWL_NAMESPACE_SCHEMA_METADATA_AUTHOR,
CWL_NAMESPACE_SCHEMA_METADATA_CODE_REPOSITORY,
CWL_NAMESPACE_SCHEMA_METADATA_CONTRIBUTOR,
CWL_NAMESPACE_SCHEMA_METADATA_DATE_CREATED,
CWL_NAMESPACE_SCHEMA_METADATA_EMAIL,
CWL_NAMESPACE_SCHEMA_METADATA_IDENTIFIER,
CWL_NAMESPACE_SCHEMA_METADATA_KEYWORDS,
CWL_NAMESPACE_SCHEMA_METADATA_LICENSE,
CWL_NAMESPACE_SCHEMA_METADATA_NAME,
CWL_NAMESPACE_SCHEMA_METADATA_PERSON,
CWL_NAMESPACE_SCHEMA_METADATA_SOFTWARE_VERSION,
CWL_NAMESPACE_SCHEMA_METADATA_VERSION,
CWL_NAMESPACE_SCHEMA_URL,
CWL_NAMESPACE_WEAVER_ID,
CWL_NAMESPACE_WEAVER_URL,
CWL_REQUIREMENT_APP_BUILTIN,
CWL_REQUIREMENT_APP_DOCKER,
CWL_REQUIREMENT_APP_DOCKER_GPU,
CWL_REQUIREMENT_APP_ESGF_CWT,
CWL_REQUIREMENT_APP_OGC_API,
CWL_REQUIREMENT_APP_WPS1,
CWL_REQUIREMENT_CUDA,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_INPLACE_UPDATE,
CWL_REQUIREMENT_LOAD_LISTING,
CWL_REQUIREMENT_MULTIPLE_INPUT,
CWL_REQUIREMENT_NETWORK_ACCESS,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_SECRETS,
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
CWL_REQUIREMENT_SUBWORKFLOW,
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE,
CWL_REQUIREMENTS_SUPPORTED,
OAS_COMPLEX_TYPES,
OAS_DATA_TYPES,
PACKAGE_ARRAY_BASE,
PACKAGE_ARRAY_ITEMS,
PACKAGE_CUSTOM_TYPES,
PACKAGE_ENUM_BASE,
PACKAGE_TYPE_POSSIBLE_VALUES,
WPS_LITERAL_DATA_TYPES,
JobInputsOutputsSchema,
JobStatusSchema,
ProcessSchema
)
from weaver.provenance import ProvenanceFormat
from weaver.quotation.status import QuoteStatus
from weaver.sort import Sort, SortMethods
from weaver.status import JOB_STATUS_CODE_API, JOB_STATUS_SEARCH_API, Status
from weaver.utils import AWS_S3_BUCKET_REFERENCE_PATTERN, json_hashable, load_file, repr_json
from weaver.visibility import Visibility
from weaver.wps_restapi.colander_extras import (
NO_DOUBLE_SLASH_PATTERN,
AllOfKeywordSchema,
AnyOfKeywordSchema,
BoundedRange,
CommaSeparated,
EmptyMappingSchema,
ExpandStringList,
ExtendedBoolean as Boolean,
ExtendedFloat as Float,
ExtendedInteger as Integer,
ExtendedMappingSchema,
ExtendedSchemaNode,
ExtendedSequenceSchema,
ExtendedString as String,
NoneType,
NotKeywordSchema,
OAS3DefinitionHandler,
OneOfCaseInsensitive,
OneOfKeywordSchema,
PermissiveMappingSchema,
PermissiveSequenceSchema,
SchemeURL,
SemanticVersion,
StrictMappingSchema,
StringOneOf,
StringRange,
XMLObject
)
from weaver.wps_restapi.constants import ConformanceCategory
from weaver.wps_restapi.patches import WeaverService as Service # warning: don't use 'cornice.Service'
if TYPE_CHECKING:
    # Typing-only names: this branch is skipped at runtime, so none of these imports are executed.
    from typing import Any, Dict, List, Type, Union

    from typing_extensions import TypedDict

    from pygeofilter.ast import AstType as FilterAstType

    from weaver.typedefs import DatetimeIntervalType, JSON

    # Typed mapping with string 'name' and 'pattern' entries.
    ViewInfo = TypedDict("ViewInfo", {"name": str, "pattern": str})
# Bracketed, comma-separated listing of the values in 'WeaverFeature.REMOTE' (for use in descriptions).
WEAVER_CONFIG_REMOTE_LIST = f"[{', '.join(WeaverFeature.REMOTE)}]"
# Title of the REST API.
API_TITLE = "Weaver REST API"
# General API information (description and contact details) read from the package metadata.
API_INFO = {
    "description": __meta__.__description__,
    "contact": {"name": __meta__.__authors__, "email": __meta__.__emails__, "url": __meta__.__source_repository__}
}
# External documentation reference, also read from the package metadata.
API_DOCS = {
    "description": f"{__meta__.__title__} documentation",
    "url": __meta__.__documentation_url__
}
# Documentation URL extended with the '/en/latest' path.
DOC_URL = f"{__meta__.__documentation_url__}/en/latest"
# CWL version segment used in the schema repository name and in the documentation URLs below.
# NOTE(review): this definition was absent from the extracted source although it is referenced below;
#   the value is inferred from the 'v1.2.1' schema branch - confirm against the upstream module.
CWL_VERSION = "v1.2"
CWL_REPO_URL = "https://github.com/common-workflow-language"
CWL_SCHEMA_BRANCH = "v1.2.1"
CWL_SCHEMA_PATH = "json-schema/cwl.yaml"
CWL_SCHEMA_REPO = f"https://raw.githubusercontent.com/common-workflow-language/cwl-{CWL_VERSION}"
# Full URL of the CWL JSON schema file: {repository}/{branch}/{path}.
CWL_SCHEMA_URL = f"{CWL_SCHEMA_REPO}/{CWL_SCHEMA_BRANCH}/{CWL_SCHEMA_PATH}"
CWL_BASE_URL = "https://www.commonwl.org"
CWL_SPEC_URL = f"{CWL_BASE_URL}/#Specification"
CWL_USER_GUIDE_URL = f"{CWL_BASE_URL}/user_guide"
CWL_DOC_BASE_URL = f"{CWL_BASE_URL}/{CWL_VERSION}"
CWL_WORKFLOW_URL = f"{CWL_DOC_BASE_URL}/Workflow.html"
# Generic disclaimer text that can be embedded in CWL-related descriptions.
CWL_DOC_MESSAGE = (
    "Note that multiple formats are supported and not all specification variants or parameters "
    f"are presented here. Please refer to official CWL documentation for more details ({CWL_BASE_URL})."
)
# Description template for I/O identifiers; expects the 'first', 'second' and 'what' format fields.
IO_INFO_IDS = (
    "Identifier of the {first} {what}. To merge details between corresponding {first} and {second} "
    "{what} specifications, this is the value that will be used to associate them together."
)
# development references
OGC_API_REPO_URL = "https://github.com/opengeospatial/ogcapi-processes"
OGC_API_SCHEMA_URL = "https://raw.githubusercontent.com/opengeospatial/ogcapi-processes"
OGC_API_SCHEMA_VERSION = "master"
# every reference below is resolved against {raw repository URL}/{version}
OGC_API_SCHEMA_BASE = f"{OGC_API_SCHEMA_URL}/{OGC_API_SCHEMA_VERSION}"
OGC_API_SCHEMA_CORE = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-core"
OGC_API_EXAMPLES_CORE = f"{OGC_API_SCHEMA_BASE}/core/examples"
# FIXME: OGC OpenAPI schema restructure (https://github.com/opengeospatial/ogcapi-processes/issues/319)
# OGC_API_SCHEMA_EXT_DEPLOY = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-dru"
OGC_API_SCHEMA_EXT_DEPLOY = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-dru"
OGC_API_EXAMPLES_EXT_DEPLOY = f"{OGC_API_SCHEMA_BASE}/extensions/deploy_replace_undeploy/examples"
# not available yet:
OGC_API_SCHEMA_EXT_BILL = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-billing"
OGC_API_SCHEMA_EXT_QUOTE = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-quotation"
OGC_API_SCHEMA_EXT_WORKFLOW = f"{OGC_API_SCHEMA_BASE}/openapi/schemas/processes-workflows"
# official/published references
OGC_API_SCHEMAS_URL = "https://schemas.opengis.net"
# OGC API - Common (Part 1, version 1.0)
OGC_API_COMMON_PART1_BASE = f"{OGC_API_SCHEMAS_URL}/ogcapi/common/part1/1.0"
OGC_API_COMMON_PART1_SCHEMAS = f"{OGC_API_COMMON_PART1_BASE}/openapi/schemas"
# OGC API - Processes (Part 1, version 1.0)
OGC_API_PROC_PART1_BASE = f"{OGC_API_SCHEMAS_URL}/ogcapi/processes/part1/1.0"
OGC_API_PROC_PART1_SCHEMAS = f"{OGC_API_PROC_PART1_BASE}/openapi/schemas"
OGC_API_PROC_PART1_RESPONSES = f"{OGC_API_PROC_PART1_BASE}/openapi/responses"
OGC_API_PROC_PART1_PARAMETERS = f"{OGC_API_PROC_PART1_BASE}/openapi/parameters"
OGC_API_PROC_PART1_EXAMPLES = f"{OGC_API_PROC_PART1_BASE}/examples"
# WPS schema locations
OGC_WPS_1_SCHEMAS = f"{OGC_API_SCHEMAS_URL}/wps/1.0.0"
OGC_WPS_2_SCHEMAS = f"{OGC_API_SCHEMAS_URL}/wps/2.0"
# Because this type has special handling functionalities to distinguish it from any other usual 'complex' I/O
# or any generic JSON-object data, define common constants that can be reused across the code.
# If this changes later on, it will be easier to ensure backward compatibility with explicit references to it.
OGC_API_BBOX_SCHEMA = f"{OGC_API_PROC_PART1_SCHEMAS}/bbox.yaml"
OGC_API_BBOX_EPSG = "EPSG:4326"

# Schema reference of the OGC API - Processes job status document.
OGC_API_SCHEMA_JOB_STATUS_URL = f"{OGC_API_PROC_PART1_SCHEMAS}/statusInfo.yaml"
# openEO API definition and the JSON-pointer reference to its batch job schema.
OPENEO_API_SCHEMA_URL = "https://openeo.org/documentation/1.0/developers/api/openapi.yaml"
OPENEO_API_SCHEMA_JOB_STATUS_URL = f"{OPENEO_API_SCHEMA_URL}#/components/schemas/batch_job"

# Raw-content location of the Weaver schemas for the selected repository reference.
WEAVER_SCHEMA_VERSION = "master"
WEAVER_SCHEMA_URL = f"https://raw.githubusercontent.com/crim-ca/weaver/{WEAVER_SCHEMA_VERSION}/weaver/schemas"
# Separator symbols of date-time intervals: closed 'start/end', open start '../end', open end 'start/..'.
DATETIME_INTERVAL_CLOSED_SYMBOL = "/"
DATETIME_INTERVAL_OPEN_START_SYMBOL = "../"
DATETIME_INTERVAL_OPEN_END_SYMBOL = "/.."
# fields ordering for generation of ProcessDescription body (shared for OGC/OLD schema format)
PROCESS_DESCRIPTION_FIELD_FIRST = [
    "id",
    "title",
    "version",
    "mutable",
    "abstract",  # backward compat for deployment
    "description",
    "keywords",
    "metadata",
    "inputs",
    "outputs"
]
PROCESS_DESCRIPTION_FIELD_AFTER = [
    "processDescriptionURL",
    "processEndpointWPS1",
    "executeEndpoint",
    "deploymentProfile",
    "links"
]
# fields ordering for nested process definition of OLD schema format of ProcessDescription
PROCESS_DESCRIPTION_FIELD_FIRST_OLD_SCHEMA = ["process"]
PROCESS_DESCRIPTION_FIELD_AFTER_OLD_SCHEMA = ["links"]

# fields ordering for process input/output definitions
PROCESS_IO_FIELD_FIRST = ["id", "title", "description", "minOccurs", "maxOccurs"]
PROCESS_IO_FIELD_AFTER = ["literalDataDomains", "formats", "crs", "bbox"]
# fields ordering for the processes listing
PROCESSES_LISTING_FIELD_FIRST = ["description", "processes", "providers"]
PROCESSES_LISTING_FIELD_AFTER = ["page", "limit", "count", "total", "links"]

# fields ordering for the provider description
PROVIDER_DESCRIPTION_FIELD_FIRST = [
    "id",
    "title",
    "version",
    "mutable",
    "description",
    "url",
    "type",
    "public",
    "keywords",
    "metadata",
]
PROVIDER_DESCRIPTION_FIELD_AFTER = ["links"]

# fields ordering for the jobs listing
JOBS_LISTING_FIELD_FIRST = ["description", "jobs", "groups"]
JOBS_LISTING_FIELD_AFTER = ["page", "limit", "count", "total", "links"]

# fields ordering for the quotations listing
QUOTES_LISTING_FIELD_FIRST = ["description", "quotations"]
QUOTES_LISTING_FIELD_AFTER = ["page", "limit", "count", "total", "links"]
#########################################################
# Examples
#########################################################
# load examples by file names as keys
# Directory named 'examples' located next to this module.
SCHEMA_EXAMPLE_DIR = os.path.join(os.path.dirname(__file__), "examples")
# Example contents indexed by file name: parsed data for JSON/YAML files, raw text for anything else.
# NOTE(review): the container was not defined in the extracted source although the loop assigns into it.
EXAMPLES = {}
for _name in os.listdir(SCHEMA_EXAMPLE_DIR):
    _path = os.path.join(SCHEMA_EXAMPLE_DIR, _name)
    _ext = os.path.splitext(_name)[-1]
    with open(_path, "r", encoding="utf-8") as f:
        if _ext in [".json", ".yaml", ".yml"]:
            EXAMPLES[_name] = yaml.safe_load(f)  # both JSON/YAML
        else:
            EXAMPLES[_name] = f.read()
#########################################################
# API tags
#########################################################
# Tag labels (plain display strings) available to group API operations.
TAG_VISIBILITY = "Visibility"
TAG_BILL_QUOTE = "Billing & Quoting"
TAG_PROVIDERS = "Providers"
TAG_PROCESSES = "Processes"
TAG_GETCAPABILITIES = "GetCapabilities"
TAG_DESCRIBEPROCESS = "DescribeProcess"
TAG_EXECUTE = "Execute"
TAG_DISMISS = "Dismiss"
TAG_RESULTS = "Results"
TAG_EXCEPTIONS = "Exceptions"
TAG_STATISTICS = "Statistics"
TAG_PROVENANCE = "Provenance"
TAG_DEPRECATED = "Deprecated Endpoints"
###############################################################################
# API endpoints
# These "services" are wrappers that allow Cornice to generate the JSON API
###############################################################################
# Front page, API documentation UIs, version listing and OpenAPI JSON endpoints.
api_frontpage_service = Service(name="api_frontpage", path="/")
api_openapi_ui_service = Service(name="api_openapi_ui", path="/api")  # idem to swagger
api_swagger_ui_service = Service(name="api_swagger_ui", path="/swagger")
api_redoc_ui_service = Service(name="api_redoc_ui", path="/redoc")
api_versions_service = Service(name="api_versions", path="/versions")
openapi_json_service = Service(name="openapi_json", path="/json")
# Quotation and billing endpoints (collection path, then item path with its identifier placeholder).
quotes_service = Service(name="quotes", path="/quotations")
quote_service = Service(name="quote", path=f"{quotes_service.path}/{{quote_id}}")
bills_service = Service(name="bills", path="/bills")
bill_service = Service(name="bill", path=f"{bills_service.path}/{{bill_id}}")
# Job endpoints: every sub-resource path is derived from the '/jobs/{job_id}' path.
jobs_service = Service(name="jobs", path="/jobs")
job_service = Service(name="job", path=f"{jobs_service.path}/{{job_id}}")
job_results_service = Service(name="job_results", path=f"{job_service.path}/results")
job_outputs_service = Service(name="job_outputs", path=f"{job_service.path}/outputs")
job_exceptions_service = Service(name="job_exceptions", path=f"{job_service.path}/exceptions")
job_logs_service = Service(name="job_logs", path=f"{job_service.path}/logs")
job_stats_service = Service(name="job_stats", path=f"{job_service.path}/statistics")
# Provenance sub-resources, derived from the '/jobs/{job_id}/prov' path.
job_prov_service = Service(name="job_prov", path=f"{job_service.path}/prov")
job_prov_info_service = Service(name="job_prov_info", path=f"{job_prov_service.path}/info")
job_prov_who_service = Service(name="job_prov_who", path=f"{job_prov_service.path}/who")
job_prov_outputs_service = Service(name="job_prov_outputs", path=f"{job_prov_service.path}/outputs")
job_prov_outputs_run_service = Service(name="job_prov_outputs_run", path=f"{job_prov_service.path}/outputs/{{run_id}}")
job_prov_run_service = Service(name="job_prov_run", path=f"{job_prov_service.path}/run")
job_prov_run_id_service = Service(name="job_prov_run_id", path=f"{job_prov_service.path}/run/{{run_id}}")
job_prov_runs_service = Service(name="job_prov_runs", path=f"{job_prov_service.path}/runs")
# Process endpoints: '/processes/{process_id}' and its own sub-resources.
processes_service = Service(name="processes", path="/processes")
process_service = Service(name="process", path=f"{processes_service.path}/{{process_id}}")
process_quotes_service = Service(name="process_quotes", path=process_service.path + quotes_service.path)
process_quote_service = Service(name="process_quote", path=process_service.path + quote_service.path)
process_estimator_service = Service(name="process_estimator_service", path=f"{process_service.path}/estimator")
process_visibility_service = Service(name="process_visibility", path=f"{process_service.path}/visibility")
process_package_service = Service(name="process_package", path=f"{process_service.path}/package")
process_payload_service = Service(name="process_payload", path=f"{process_service.path}/payload")
process_execution_service = Service(name="process_execution", path=f"{process_service.path}/execution")
# Job endpoints nested under a process: the process path is prepended to the corresponding job path.
process_jobs_service = Service(name="process_jobs", path=process_service.path + jobs_service.path)
process_job_service = Service(name="process_job", path=process_service.path + job_service.path)
process_results_service = Service(name="process_results", path=process_service.path + job_results_service.path)
process_outputs_service = Service(name="process_outputs", path=process_service.path + job_outputs_service.path)
process_exceptions_service = Service(name="process_exceptions", path=process_service.path + job_exceptions_service.path)
process_logs_service = Service(name="process_logs", path=process_service.path + job_logs_service.path)
process_stats_service = Service(name="process_stats", path=process_service.path + job_stats_service.path)
process_prov_service = Service(name="process_prov", path=process_service.path + job_prov_service.path)
process_prov_info_service = Service(name="process_prov_info", path=process_service.path + job_prov_info_service.path)
process_prov_who_service = Service(name="process_prov_who", path=process_service.path + job_prov_who_service.path)
process_prov_outputs_service = Service(
    name="process_prov_outputs",
    path=process_service.path + job_prov_outputs_service.path,
)
process_prov_outputs_run_service = Service(
    name="process_prov_outputs_run",
    path=process_service.path + job_prov_outputs_run_service.path,
)
process_prov_run_service = Service(
    name="process_prov_run",
    path=process_service.path + job_prov_run_service.path,
)
process_prov_run_id_service = Service(
    name="process_prov_run_id",
    path=process_service.path + job_prov_run_id_service.path,
)
process_prov_runs_service = Service(
    name="process_prov_runs",
    path=process_service.path + job_prov_runs_service.path,
)
# Provider endpoints: '/providers/{provider_id}' prepended to the corresponding process (and job) paths.
providers_service = Service(name="providers", path="/providers")
provider_service = Service(name="provider", path=f"{providers_service.path}/{{provider_id}}")
provider_processes_service = Service(name="provider_processes", path=provider_service.path + processes_service.path)
provider_process_service = Service(name="provider_process", path=provider_service.path + process_service.path)
provider_process_package_service = Service(name="provider_process_pkg", path=f"{provider_process_service.path}/package")
provider_execution_service = Service(name="provider_execution", path=f"{provider_process_service.path}/execution")
provider_jobs_service = Service(name="provider_jobs", path=provider_service.path + process_jobs_service.path)
provider_job_service = Service(name="provider_job", path=provider_service.path + process_job_service.path)
provider_results_service = Service(name="provider_results", path=provider_service.path + process_results_service.path)
provider_outputs_service = Service(name="provider_outputs", path=provider_service.path + process_outputs_service.path)
provider_exceptions_service = Service(
    name="provider_exceptions",
    path=provider_service.path + process_exceptions_service.path,
)
provider_logs_service = Service(name="provider_logs", path=provider_service.path + process_logs_service.path)
provider_stats_service = Service(name="provider_stats", path=provider_service.path + process_stats_service.path)
provider_prov_service = Service(name="provider_prov", path=provider_service.path + process_prov_service.path)
provider_prov_info_service = Service(
    name="provider_prov_info",
    path=provider_service.path + process_prov_info_service.path,
)
provider_prov_who_service = Service(
    name="provider_prov_who",
    path=provider_service.path + process_prov_who_service.path,
)
provider_prov_outputs_service = Service(
    name="provider_prov_outputs",
    path=provider_service.path + process_prov_outputs_service.path,
)
provider_prov_outputs_run_service = Service(
    name="provider_prov_outputs_run",
    path=provider_service.path + process_prov_outputs_run_service.path,
)
provider_prov_run_service = Service(
    name="provider_prov_run",
    path=provider_service.path + process_prov_run_service.path,
)
provider_prov_run_id_service = Service(
    name="provider_prov_run_id",
    path=provider_service.path + process_prov_run_id_service.path,
)
provider_prov_runs_service = Service(
    name="provider_prov_runs",
    path=provider_service.path + process_prov_runs_service.path,
)
# backward compatibility deprecated routes
# (singular '/result' path, with process-nested and provider-nested variants)
job_result_service = Service(name="job_result", path=f"{job_service.path}/result")
process_result_service = Service(name="process_result", path=process_service.path + job_result_service.path)
provider_result_service = Service(name="provider_result", path=provider_service.path + process_result_service.path)
# Vault endpoints: collection path and item path with the '{file_id}' placeholder.
vault_service = Service(name="vault", path="/vault")
vault_file_service = Service(name="vault_file", path=f"{vault_service.path}/{{file_id}}")
#########################################################
# Generic schemas
#########################################################
class SLUG(ExtendedSchemaNode):
    """
    Slug name made of alphanumeric segments joined by single ``-`` or ``_`` separators.
    """
    description = "Slug name pattern."
    example = "some-object-slug-name"
    pattern = re.compile(r"^[A-Za-z0-9]+(?:[-_][A-Za-z0-9]+)*$")
class Tag(ExtendedSchemaNode):
    """
    Slug identifier optionally followed by ``:<version>`` matching the semantic version pattern.
    """
    description = "Identifier with optional tagged version forming a unique reference."
    # ranges used to remove starting/ending ^$ characters
    pattern = re.compile(
        rf"{SLUG.pattern.pattern[:-1]}"
        rf"(:{SemanticVersion(v_prefix=False, rc_suffix=False).pattern[1:-1]})?$"
    )
class URL(ExtendedSchemaNode):
    """
    String format that will be automatically mapped to a URL-pattern validator.

    .. seealso::
        - :data:`weaver.wps_restapi.colander_extras.URL`
        - :class:`weaver.wps_restapi.colander_extras.ExtendedSchemaBase`
    """
    description = "URL reference."
class URI(ExtendedSchemaNode):
    """
    String format that will be automatically mapped to a URI-pattern validator.

    .. seealso::
        - :data:`weaver.wps_restapi.colander_extras.URI`
        - :class:`weaver.wps_restapi.colander_extras.ExtendedSchemaBase`
    """
    description = "URI reference."
class Email(ExtendedSchemaNode):
    """
    Email address validated with the ``colander.Email`` validator.
    """
    description = "Email recipient."
    validator = EmailRegex()
class QueryBoolean(Boolean):
    """
    Boolean type accepting the textual truthy/falsy values listed in :meth:`__init__`.
    """
    description = "Boolean query parameter that allows handles common truthy/falsy values."

    def __init__(self, *_, **__):
        # type: (*Any, **Any) -> None
        # Any provided argument is ignored: the parent is always initialized with these fixed choices.
        super(QueryBoolean, self).__init__(
            allow_string=True,
            false_choices=("False", "false", "0", "off", "no", "null", "Null", "none", "None", ""),
            true_choices=("True", "true", "1", "on", "yes")
        )
class DateTimeInterval(ExtendedSchemaNode):
    """
    Single date-time, or date-time interval that is closed, open at its start or open at its end.
    """
    _schema = f"{OGC_API_PROC_PART1_PARAMETERS}/datetime.yaml"
    description = (
        "DateTime format against OGC API - Processes, "
        "to get values before a certain date-time use '../' before the date-time, "
        "to get values after a certain date-time use '/..' after the date-time like the example, "
        "to get values between two date-times use '/' between the date-times, "
        "to get values with a specific date-time just pass the datetime. "
    )
    example = "2022-03-02T03:32:38.487000+00:00/.."
    regex_datetime = re.compile(r"(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d+)?(([+-]\d\d:\d\d)|Z)?)")
    regex_interval_closed = re.compile(rf"{regex_datetime.pattern}\/{regex_datetime.pattern}")
    regex_interval_open_start = re.compile(rf"\.\.\/{regex_datetime.pattern}")
    regex_interval_open_end = re.compile(rf"{regex_datetime.pattern}\/\.\.")
    # The alternatives are wrapped in a non-capturing group so that '^' and '$' apply to all of them.
    # Without the group, '^' only anchors the first alternative and '$' only the last one, which lets
    # values with leading or trailing garbage match.
    pattern = re.compile(
        r"^(?:"
        rf"{regex_datetime.pattern}"
        rf"|{regex_interval_closed.pattern}"
        rf"|{regex_interval_open_start.pattern}"
        rf"|{regex_interval_open_end.pattern}"
        r")$"
    )
class S3BucketReference(ExtendedSchemaNode):
    """
    Reference validated against ``AWS_S3_BUCKET_REFERENCE_PATTERN``.
    """
    description = "S3 bucket shorthand URL representation: 's3://{bucket}/[{dirs}/][{file-key}]'"
    pattern = AWS_S3_BUCKET_REFERENCE_PATTERN
class FileLocal(ExtendedSchemaNode):
    """
    Local file path, optionally prefixed by ``file://``.
    """
    description = "Local file reference."
    pattern = re.compile(rf"^(file://)?{NO_DOUBLE_SLASH_PATTERN}(?:/|[/?]\S+)$")
class FileURL(ExtendedSchemaNode):
    """
    File reference validated as a URL limited to the ``http`` and ``https`` schemes.
    """
    description = "URL file reference."
    validator = SchemeURL(schemes=["http", "https"])
class VaultReference(ExtendedSchemaNode):
    """
    ``vault://`` prefix followed by a lowercase hexadecimal UUID (dash separators are optional).
    """
    description = "Vault file reference."
    example = "vault://399dc5ac-ff66-48d9-9c02-b144a975abe4"
    pattern = re.compile(r"^vault://[a-f0-9]{8}(?:-?[a-f0-9]{4}){3}-?[a-f0-9]{12}$")
class ProcessURL(ExtendedSchemaNode):
    """
    HTTP(S) URL validated with a ``/processes/...`` path pattern.
    """
    description = "Process URL reference."
    validator = SchemeURL(schemes=["http", "https"], path_pattern=r"(?:/processes/\S+/?)")
class ReferenceURL(AnyOfKeywordSchema):
    """
    Reference given as any of an HTTP(S) URL, a local file or an S3 bucket reference.
    """
    _any_of = [
        FileURL(),
        FileLocal(),
        S3BucketReference(),
    ]
class ExecuteReferenceURL(AnyOfKeywordSchema):
    """
    Reference given as any of an HTTP(S) URL, a local file, an S3 bucket or a vault reference.
    """
    _any_of = [
        FileURL(),
        FileLocal(),
        S3BucketReference(),
        VaultReference(),
    ]
class UUID(ExtendedSchemaNode):
    """
    Lowercase hexadecimal UUID, with or without dash separators.
    """
    description = "Unique identifier."
    example = "a9d14bf4-84e0-449a-bac8-16e598efe807"
    pattern = re.compile("^[a-f0-9]{8}(?:-?[a-f0-9]{4}){3}-?[a-f0-9]{12}$")
class AnyIdentifier(SLUG):
    """
    Identifier following the :class:`SLUG` definition without additional constraints.
    """
class CWLFileName(SLUG):
    """
    Slug name terminated by the ``.cwl`` extension.
    """
    description = "File with a CWL extension."
    pattern = re.compile(
        f"{SLUG.pattern.pattern[:-1]}"  # remove '$'
        r"\.cwl$"
    )
class ProcessIdentifier(AnyOfKeywordSchema):
    """
    Process reference given either as a UUID or as a slug name.
    """
    description = "Process identifier."
    _any_of = [
        # UUID first because more strict than SLUG, and SLUG can be similar to UUID, but in the end any is valid
        UUID(description="Unique identifier."),
        SLUG(description="Generic identifier. This is a user-friendly slug-name. "
                         "Note that this will represent the latest process matching this name. "
                         "For specific process version, use the UUID instead.", title="ID"),
    ]
class ProcessIdentifierTag(AnyOfKeywordSchema):
    """
    Process reference that extends :class:`ProcessIdentifier` with the tagged form of :class:`Tag`.
    """
    description = "Process identifier with optional revision tag."
    _schema = f"{OGC_API_PROC_PART1_PARAMETERS}/processIdPathParam.yaml"
    # the tagged form is listed before the alternatives reused from 'ProcessIdentifier'
    _any_of = [Tag] + ProcessIdentifier._any_of  # type: ignore  # noqa: W0212
class JobID(UUID):
    """
    Job identifier following the :class:`UUID` definition.
    """
    _schema = f"{OGC_API_PROC_PART1_PARAMETERS}/jobId.yaml"
    description = "ID of the job."
    example = "a9d14bf4-84e0-449a-bac8-16e598efe807"
class Version(ExtendedSchemaNode):
    """
    Version string validated by ``SemanticVersion``.
    """
    description = "Version string."
    validator = SemanticVersion()
# FIXME: oneOf validator for supported languages (?)
class NoContent(ExtendedMappingSchema):
    """
    Mapping without any declared field.
    """
    description = "Empty response body."
class FileUploadContent(ExtendedSchemaNode):
    """
    String contents of a file submitted through a multipart upload.
    """
    schema_type = String()
    description = (
        "Contents of the file being uploaded with multipart. When prefixed with 'Content-Type: {media-type}', the "
        "specified format will be applied to the input that will be attributed the 'vault://{UUID}' during execution. "
        "Contents can also have 'Content-Disposition' definition to provide the desired file name."
    )
class AccessToken(ExtendedSchemaNode):
    """
    Access token value.
    """
    # NOTE(review): the class body is empty in the extracted source (a syntax error as-is);
    #   presumably a 'schema_type' declaration such as the one in 'FileUploadContent' was lost - confirm.
class DescriptionSchema(ExtendedMappingSchema):
    """
    Mapping that carries a required string ``description`` field.
    """
    description = ExtendedSchemaNode(String(), description="Description of the obtained contents.")
class KeywordList(ExtendedSequenceSchema):
    """
    List of keywords, each one being a string of at least one character.
    """
    keyword = ExtendedSchemaNode(String(), validator=Length(min=1))
class Language(ExtendedSchemaNode):
    """
    Language value restricted to the values of ``AcceptLanguage``.
    """
    example = AcceptLanguage.EN_CA
    validator = OneOf(AcceptLanguage.values())
class ValueLanguage(ExtendedMappingSchema):
    """
    Mapping with an optional ``lang`` field (dropped when missing).
    """
    lang = Language(missing=drop, description="Language of the value content.")
class LinkLanguage(ExtendedMappingSchema):
    """
    Mapping with an optional ``hreflang`` field (dropped when missing).
    """
    hreflang = Language(missing=drop, description="Language of the content located at the link.")
class LinkRelationshipType(OneOfKeywordSchema):
    """
    Link relation given either as a slug name or as a fully qualified URL.
    """
    description = (
        "Link relation as registered or extension type "
        "(see https://www.rfc-editor.org/rfc/rfc8288.html#section-2.1)."
    )
    _one_of = [
        SLUG(description=(
            "Relationship of the link to the current content. "
            "This should be one item amongst registered relations https://www.iana.org/assignments/link-relations/."
        )),
        URL(description="Fully qualified extension link relation to the current content.")
    ]
class LinkRelationship(ExtendedMappingSchema):
    """
    Mapping that carries the ``rel`` field of a link.
    """
    rel = LinkRelationshipType()
# NOTE(review): 'MetadataBase' and 'MediaType' are neither defined nor imported in the visible source;
#   confirm that they are declared before this class.
class LinkBase(LinkLanguage, MetadataBase):
    """
    Link fields: required ``href`` and optional ``type``, on top of the inherited definitions.
    """
    href = URL(description="Hyperlink reference.")
    type = MediaType(description="IANA identifier of content-type located at the link.", missing=drop)
class Link(LinkRelationship, LinkBase):
    """
    Complete link definition combining the relationship and the base link fields.
    """
    _schema = f"{OGC_API_COMMON_PART1_SCHEMAS}/link.json"
    _schema_include_deserialize = False  # only in OpenAPI otherwise too verbose
# NOTE(review): 'MetadataLink' and 'MetadataValue' are neither defined nor imported in the visible source;
#   confirm that they are declared before this class.
class MetadataContent(OneOfKeywordSchema):
    """
    Metadata given as exactly one of the link or value representations.
    """
    _one_of = [
        MetadataLink(),
        MetadataValue(),
    ]
class LinkList(ExtendedSequenceSchema):
    """
    List of :class:`Link` items.
    """
    description = "List of links relative to the applicable object."
    # NOTE(review): the item definition was absent from the extracted source; a sequence schema needs
    #   exactly one child node, and 'Link' is the definition declared for it just above - confirm.
    link = Link()
class LandingPage(ExtendedMappingSchema):
    """
    Landing page contents, consisting of a list of links.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/landingPage.yaml"
    links = LinkList()
# sub-schema within:
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/format.yaml
# only extra portion from:
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1538-L1547
# although original schema defines 'default' in above 'FormatDescription', separate it in order to omit it
# from 'ResultFormat' employed for result reporting, which shouldn't have a default (applied vs supported format)
class AdditionalParameterUnique(OneOfKeywordSchema):
    """
    Single literal parameter value: string, boolean, integer or float.
    """
    _one_of = [
        ExtendedSchemaNode(String(), title="InputParameterLiteral.String"),
        ExtendedSchemaNode(Boolean(), title="InputParameterLiteral.Boolean"),
        ExtendedSchemaNode(Integer(), title="InputParameterLiteral.Integer"),
        ExtendedSchemaNode(Float(), title="InputParameterLiteral.Float"),
        # PermissiveMappingSchema(title="InputParameterLiteral.object"),
    ]
class AdditionalParameterListing(ExtendedSequenceSchema):
    """
    List of literal parameter values.
    """
    param = AdditionalParameterUnique()
class AdditionalParameterValues(OneOfKeywordSchema):
    """
    Parameter values given either as a single literal or as a list of literals.
    """
    _one_of = [
        AdditionalParameterUnique(),
        AdditionalParameterListing()
    ]
class AdditionalParameterDefinition(ExtendedMappingSchema):
    """
    Named parameter with its associated value(s).
    """
    name = SLUG(title="AdditionalParameterName", example="EOImage")
    values = AdditionalParameterValues(example=["true"])
class AdditionalParameterList(ExtendedSequenceSchema):
    """
    List of named parameter definitions.
    """
    param = AdditionalParameterDefinition()
class AdditionalParameters(ExtendedMappingSchema):
    """
    Mapping that holds the ``parameters`` list.
    """
    parameters = AdditionalParameterList()
# NOTE(review): 'AdditionalParametersMeta' is neither defined nor imported in the visible source;
#   confirm that it is declared before this class.
class AdditionalParametersItem(AnyOfKeywordSchema):
    """
    Item combining any of the metadata and parameters definitions.
    """
    _any_of = [
        AdditionalParametersMeta(),
        AdditionalParameters()
    ]
class AdditionalParametersList(ExtendedSequenceSchema):
    """
    List of :class:`AdditionalParametersItem` entries.
    """
    additionalParameter = AdditionalParametersItem()
class Content(ExtendedMappingSchema):
    """
    Mapping that carries the ``href`` reference to a CWL file.
    """
    href = ReferenceURL(description="URL to CWL file.", title="OWSContentURL",
                        default=drop,       # if invalid, drop it completely,
                        missing=required,   # but still mark as 'required' for parent objects
                        example="http://some.host/applications/cwl/multisensor_ndvi.cwl")
class Offering(ExtendedMappingSchema):
    """
    Offering made of an optional ``code`` and a ``content`` definition.
    """
    code = ExtendedSchemaNode(String(), missing=drop, description="Descriptor of represented information in 'content'.")
    content = Content()
class OWSContext(ExtendedMappingSchema):
    """
    Mapping that carries the ``offering`` definition.
    """
    description = "OGC Web Service definition from an URL reference."
    title = "owsContext"
    offering = Offering()
class DescriptionBase(ExtendedMappingSchema):
    """
    Optional ``title`` and ``description`` fields (dropped when missing).
    """
    title = ExtendedSchemaNode(String(), missing=drop, description="Short human-readable name of the object.")
    description = ExtendedSchemaNode(String(), missing=drop, description="Detailed explanation of the object.")
class DescriptionLinks(ExtendedMappingSchema):
    """
    Optional ``links`` field (dropped when missing).
    """
    links = LinkList(missing=drop, description="References to endpoints with information related to object.")
class ProcessContext(ExtendedMappingSchema):
    """
    Optional ``owsContext`` field (dropped when missing).
    """
    owsContext = OWSContext(missing=drop)
# NOTE(review): 'DescriptionExtra' is neither defined nor imported in the visible source;
#   confirm that it is declared before this class.
class DescriptionType(DescriptionBase, DescriptionLinks, DescriptionExtra):
    """
    Combination of the base, links and extra description fields, without additional ones.
    """
class DeploymentType(DescriptionType):
    """
    Description fields extended with the optional and deprecated ``abstract`` field.
    """
    abstract = ExtendedSchemaNode(
        String(), missing=drop, deprecated=True,
        description="Description of the object. Will be replaced by 'description' field if not already provided. "
                    "Preserved for backward compatibility of pre-existing process deployment. "
                    "Consider using 'description' directly instead."
    )
class ReferenceOAS(ExtendedMappingSchema):
    """
    Mapping holding a ``$ref`` reference.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/reference.yaml"
    # attribute is named '_ref' since '$ref' is not a valid identifier; the field name is set explicitly
    _ref = ReferenceURL(name="$ref", description="External OpenAPI schema reference.")
class TypeOAS(ExtendedSchemaNode):
    # Value of the OpenAPI 'type' keyword, restricted to the known data type names.
    # Nodes of this class are created without a 'typ' argument (e.g.: 'TypeOAS(name="type", ...)'),
    # therefore the schema type must be declared on the class itself.
    schema_type = String
    validator = OneOf(OAS_DATA_TYPES)
class EnumItemOAS(OneOfKeywordSchema):
    # 'oneOf' between a float, an integer or a string value.
    _one_of = [ExtendedSchemaNode(typ()) for typ in (Float, Integer, String)]
class EnumOAS(ExtendedSequenceSchema):
    # Values listed under the 'enum' keyword.
    # A sequence schema needs exactly one child node describing its items.
    enum = EnumItemOAS()
class RequiredOAS(ExtendedSequenceSchema):
    # Sequence of field names, each one given as a string.
    required_field = ExtendedSchemaNode(
        String(),
        description="Name of the field that is required under the object.",
    )
class MultipleOfOAS(OneOfKeywordSchema):
    # 'oneOf' between a float or an integer value.
    _one_of = [ExtendedSchemaNode(typ()) for typ in (Float, Integer)]
class PermissiveDefinitionOAS(NotKeywordSchema, PermissiveMappingSchema):
    # Permissive mapping with the reference schema listed under the 'not' keyword.
    # The schema class itself is listed, not an instance of it.
    _not = [ReferenceOAS]
# cannot make recursive declarative schemas
# simulate it and assume it is sufficient for validation purposes
class PseudoObjectOAS(OneOfKeywordSchema):
    # 'oneOf' between an explicit reference or a permissive definition.
    _one_of = [ReferenceOAS(), PermissiveDefinitionOAS()]
class KeywordObjectOAS(ExtendedSequenceSchema):
    # Sequence of pseudo-object definitions.
    item = PseudoObjectOAS()
class AdditionalPropertiesOAS(OneOfKeywordSchema):
    # 'oneOf' between a reference, a permissive definition or a boolean flag.
    _one_of = [ReferenceOAS(), PermissiveDefinitionOAS(), ExtendedSchemaNode(Boolean())]
class AnyValueOAS(AnyOfKeywordSchema):
    # 'anyOf' between the container schemas (mapping, sequence) and the literal types.
    # The order of the definitions is preserved: containers first, then float, integer, boolean and string.
    _any_of = [
        PermissiveMappingSchema(),
        PermissiveSequenceSchema(),
        *(ExtendedSchemaNode(typ()) for typ in (Float, Integer, Boolean, String)),
    ]
# reference:
# https://raw.githubusercontent.com/opengeospatial/ogcapi-processes/master/core/openapi/schemas/schema.yaml
# note:
# although reference definition provides multiple 'default: 0|false' entries, we omit them since the behaviour
# of colander with extended schema nodes is to set this value by default in deserialize result if they were missing,
# but reference 'default' correspond more to the default *interpretation* value if none was provided.
# It is preferable in our case to omit (i.e.: drop) these defaults to keep obtained/resolved definitions succinct,
# since those defaults can be defined (by default...) if needed. No reason to add them explicitly.
# WARNING:
# cannot use any KeywordMapper derived instance here, otherwise conflicts with same OpenAPI keywords as children nodes
class PropertyOAS(PermissiveMappingSchema):
    # Known OpenAPI keywords of a property definition.
    # Every field is optional and dropped from the result when not provided.
    # When 'name' is given, it is the actual keyword and the attribute is only a valid Python identifier for it.

    # generic metadata
    _type = TypeOAS(name="type", missing=drop)  # not present if top-most schema is {allOf,anyOf,oneOf,not}
    default = AnyValueOAS(unknown="preserve", missing=drop)
    example = AnyValueOAS(unknown="preserve", missing=drop)
    title = ExtendedSchemaNode(String(), missing=drop)
    description = ExtendedSchemaNode(String(), missing=drop)
    enum = EnumOAS(missing=drop)
    items = PseudoObjectOAS(name="items", missing=drop)
    required = RequiredOAS(missing=drop)
    nullable = ExtendedSchemaNode(Boolean(), missing=drop)
    deprecated = ExtendedSchemaNode(Boolean(), missing=drop)
    read_only = ExtendedSchemaNode(Boolean(), name="readOnly", missing=drop)
    write_only = ExtendedSchemaNode(Boolean(), name="writeOnly", missing=drop)

    # numeric constraints
    multiple_of = MultipleOfOAS(name="multipleOf", missing=drop, validator=BoundedRange(min=0, exclusive_min=True))
    minimum = ExtendedSchemaNode(Integer(), name="minimum", missing=drop, validator=Range(min=0))  # default=0
    maximum = ExtendedSchemaNode(Integer(), name="maximum", missing=drop, validator=Range(min=0))
    exclusive_min = ExtendedSchemaNode(Boolean(), name="exclusiveMinimum", missing=drop)  # default=False
    exclusive_max = ExtendedSchemaNode(Boolean(), name="exclusiveMaximum", missing=drop)  # default=False

    # string constraints
    min_length = ExtendedSchemaNode(Integer(), name="minLength", missing=drop, validator=Range(min=0))  # default=0
    max_length = ExtendedSchemaNode(Integer(), name="maxLength", missing=drop, validator=Range(min=0))
    # 'pattern' holds a regular expression, it must therefore be validated as a string and not as a number
    pattern = ExtendedSchemaNode(String(), missing=drop)

    # array constraints
    min_items = ExtendedSchemaNode(Integer(), name="minItems", missing=drop, validator=Range(min=0))  # default=0
    max_items = ExtendedSchemaNode(Integer(), name="maxItems", missing=drop, validator=Range(min=0))
    unique_items = ExtendedSchemaNode(Boolean(), name="uniqueItems", missing=drop)  # default=False

    # object constraints
    min_prop = ExtendedSchemaNode(Integer(), name="minProperties", missing=drop, validator=Range(min=0))  # default=0
    max_prop = ExtendedSchemaNode(Integer(), name="maxProperties", missing=drop, validator=Range(min=0))

    # content descriptors
    content_type = ExtendedSchemaNode(String(), name="contentMediaType", missing=drop)
    content_encode = ExtendedSchemaNode(String(), name="contentEncoding", missing=drop)
    content_schema = ExtendedSchemaNode(String(), name="contentSchema", missing=drop)

    # schema composition keywords
    _not_key = PseudoObjectOAS(name="not", title="not", missing=drop)
    _all_of = KeywordObjectOAS(name="allOf", missing=drop)
    _any_of = KeywordObjectOAS(name="anyOf", missing=drop)
    _one_of = KeywordObjectOAS(name="oneOf", missing=drop)
    x_props = AdditionalPropertiesOAS(name="additionalProperties", missing=drop)
    properties = PermissiveMappingSchema(missing=drop)  # cannot do real recursive definitions, simply check mapping
# this class is only to avoid conflicting names with keyword mappers
class AnyPropertyOAS(OneOfKeywordSchema):
    # 'oneOf' between an explicit reference or a property definition.
    _one_of = [ReferenceOAS(), PropertyOAS()]
class ObjectPropertiesOAS(ExtendedMappingSchema):
    # Mapping whose single field is declared with a variable name placeholder.
    property_name = AnyPropertyOAS(
        description="Named of the property being defined under the OpenAPI object.",
        variable="{property-name}",
    )
# would not need this if we could do explicit recursive definitions but at the very least, validate that when an
# object type is specified, its properties are as well and are slightly more specific than permissive mapping
class ObjectOAS(NotKeywordSchema, ExtendedMappingSchema):
    # Optional 'type' limited to the complex types, along with the properties definition.
    _type = TypeOAS(
        name="type",
        validator=OneOf(OAS_COMPLEX_TYPES),
        missing=drop,
    )
    properties = ObjectPropertiesOAS()  # required and more specific contrary to 'properties' in 'PropertyOAS'
# since we redefine 'properties', do not cause validation error for 'oneOf'
class DefinitionOAS(AnyOfKeywordSchema):
    # 'anyOf' between the object definition and the generic property definition.
    _any_of = [
        ObjectOAS(),
        # for top-level keyword schemas {allOf, anyOf, oneOf, not}
        PropertyOAS(),
    ]
class OAS(OneOfKeywordSchema):
    # 'oneOf' between an explicit reference or a schema definition.
    description = "OpenAPI schema definition."
    # _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/schema.yaml"  # definition used by OAP, but JSON-schema is more accurate
    _schema = "http://json-schema.org/draft-07/schema#"
    _one_of = [ReferenceOAS(), DefinitionOAS()]
class MinOccursDefinition(OneOfKeywordSchema):
    # 'oneOf' between an integer or its numerical string representation, both with a minimum of zero.
    description = "Minimum amount of values required for this input."
    _one_of = [
        ExtendedSchemaNode(
            Integer(),
            validator=Range(min=0),
            title="MinOccurs.integer",
            description="Positive integer.",
        ),
        ExtendedSchemaNode(
            String(),
            validator=StringRange(min=0),
            pattern="^[0-9]+$",
            title="MinOccurs.string",
            description="Numerical string representing a positive integer.",
        ),
    ]
class MaxOccursDefinition(OneOfKeywordSchema):
    # 'oneOf' between an integer, its numerical string representation, or the "unbounded" keyword.
    description = "Maximum amount of values allowed for this input."
    _one_of = [
        ExtendedSchemaNode(
            Integer(),
            title="MaxOccurs.integer",
            description="Positive integer.",
            validator=Range(min=0),
        ),
        ExtendedSchemaNode(
            String(),
            title="MaxOccurs.string",
            description="Numerical string representing a positive integer.",
            pattern="^[0-9]+$",
            validator=StringRange(min=0),
        ),
        ExtendedSchemaNode(
            String(),
            title="MaxOccurs.unbounded",
            description="Special value indicating no limit to occurrences.",
            validator=OneOf(["unbounded"]),
        ),
    ]
class DescribeMinMaxOccurs(ExtendedMappingSchema):
    # Occurrence bounds using the definitions as is, without overriding 'default' or 'missing'.
    minOccurs = MinOccursDefinition()
    maxOccurs = MaxOccursDefinition()
class DeployMinMaxOccurs(ExtendedMappingSchema):
    # entirely omitted definitions are permitted to allow inference from fields in package (CWL) or using defaults
    # if explicitly provided though, schema format and values should be validated
    #   - do not use 'missing=drop' to ensure we raise provided invalid value instead of ignoring it
    #   - do not use any specific value (e.g.: 1) for 'default' such that we do not inject an erroneous value when it
    #     was originally omitted, since it could be resolved differently depending on matching CWL inputs definitions
    minOccurs = MinOccursDefinition(missing=null, default=null)
    maxOccurs = MaxOccursDefinition(missing=null, default=null)
# does not inherit from 'DescriptionLinks' because other 'ProcessDescription<>' schema depend on this without 'links'
class ProcessDescriptionType(DescriptionBase, DescriptionExtra):
    # Identifier, version and mutability flag of a process, on top of the inherited description fields.
    id = ProcessIdentifierTag()
    version = Version(example="1.2.3", default=None, missing=None)
    mutable = ExtendedSchemaNode(
        Boolean(),
        default=True,
        description=(
            "Indicates if the process is mutable (dynamically deployed), or immutable (builtin with this instance)."
        ),
    )
class OutputIdentifierType(ExtendedMappingSchema):
    # Identifier of an output; its description is built from the shared I/O identifier template.
    id = AnyIdentifier(
        description=IO_INFO_IDS.format(first="WPS", second="CWL", what="output"),
    )
class AnyCRS(AnyOfKeywordSchema):
    # note:
    #   other CRS exist (EGM, NAVD, NAD, etc.)
    #   however, only support EPSG (short form, normative form, or URI) that are supported by 'owslib.crs'
    #   handle equivalent representations of EPSG:4326 that are also supported by 'owslib.crs'
    _any_of = [
        # URN form
        ExtendedSchemaNode(
            String(),
            pattern=re.compile(r"^urn:ogc:def:crs:EPSG::?[0-9]{4,5}$"),
        ),
        # short form, optionally wrapped by square brackets
        ExtendedSchemaNode(
            String(),
            pattern=re.compile(r"^\[?EPSG::?[0-9]{4,5}\]?$"),
        ),
        # URI form
        ExtendedSchemaNode(
            String(),
            pattern=re.compile(r"^https?://www\.opengis\.net/def/crs/EPSG/0/[0-9]{4,5}$"),
        ),
        # equivalent forms of EPSG:4326, 2D or 3D
        ExtendedSchemaNode(
            String(),
            validator=OneOf([
                "https://www.opengis.net/def/crs/OGC/1.3/CRS84",
                "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                "https://www.opengis.net/def/crs/OGC/0/CRS84h",
                "http://www.opengis.net/def/crs/OGC/0/CRS84h",
                "https://www.opengis.net/def/crs/OGC/0/CRS84",
                "http://www.opengis.net/def/crs/OGC/0/CRS84",
                "urn:ogc:def:crs:OGC:2:84",
                "WGS84",
            ]),
        ),
    ]
    default = OGC_API_BBOX_EPSG
class AnyFilterExpression(AnyOfKeywordSchema):
    # 'anyOf' between a mapping, a sequence or a string; the last two need at least one item/character.
    _any_of = [
        PermissiveMappingSchema(),
        PermissiveSequenceSchema(
            validator=Length(min=1),
        ),
        ExtendedSchemaNode(
            String(),
            validator=Length(min=1),
        ),
    ]
class AnyFilterLanguage(ExtendedSchemaNode):
    # Name of the language employed to interpret a filter expression.
    # Nodes of this class are created without a 'typ' argument (e.g.: 'AnyFilterLanguage()'),
    # therefore the schema type must be declared on the class itself.
    schema_type = String
    validator = OneOfCaseInsensitive([
        "cql2-json",
        "cql2-text",
        "cql",
        "cql-text",
        "cql-json",
        "ecql",
        "simple-cql",
        "fes",
        "jfe"
    ])
    summary = "Filter expression language to use for parsing."
    description = (
        "Filter expression language to use for parsing. "
        "Supports multiple variants of OGC Common Query Language (CQL), "
        "Filter Expression Standard (FES), or JSON Filter Expression (JFE). "
        "Values are case-insensitive."
    )

    def deserialize(self, cstruct):
        # type: (Any) -> Union[str, colander.null]
        """
        Convert string values to lowercase before applying the inherited deserialization.

        Values that are not strings are passed to the parent implementation untouched.
        """
        if isinstance(cstruct, str):
            cstruct = cstruct.lower()
        return super().deserialize(cstruct)
class FilterSchema(ExtendedMappingSchema):
    # Filter expression with its optional language and CRS parameters.
    # The expression is validated by parsing it and converting it to CQL2-JSON during deserialization.
    # note:
    #   defer format and parsing to 'pygeofilter'
    #   to accommodate all filter expression representations, string, array and object must be allowed
    filter = AnyFilterExpression(
        description="Filter expression according to the specified parsing language.",
        missing=drop,  # optional since combined within other JSON schema definitions
    )
    filter_crs = AnyCRS(
        name="filter-crs",
        missing=drop,
        default=drop,  # override to avoid injecting it by default, remote server could use a different default
        description="Coordinate Reference System for provided spatial properties.",
    )
    filter_lang = AnyFilterLanguage(
        name="filter-lang",
        missing=drop,
        default=drop,  # override to avoid injecting it by default, remote server could use a different default
        description=AnyFilterLanguage.description + (
            " If unspecified, the filter language will default to CQL2-Text if a string was provided,"
            " or CQL2-JSON if a JSON object or array structure is detected as the filter."
            " For any other language, or to resolve ambiguous cases such as a CQL2-JSON encoded as literal string,"
            " the filter language must be specified explicitly."
        )
    )

    @staticmethod
    @json_hashable
    @cache
    def parse(filter_expr, filter_lang):
        # type: (Union[JSON, str], str) -> FilterAstType
        """
        Parse the filter expression with the parser that corresponds to the specified language.

        :param filter_expr: Filter expression to parse.
        :param filter_lang: Name of the filter language, expected in lowercase.
        :returns: Result of the selected parser.
        :raises colander.Invalid: If no parser produced a result for the specified language.
        """
        parsed_expr = None
        if filter_lang == "cql2-json":
            parsed_expr = cql2_json.parse(filter_expr)
        elif filter_lang == "cql2-text":
            parsed_expr = cql2_text.parse(filter_expr)
        elif filter_lang == "cql-json":
            parsed_expr = cql_json.parse(filter_expr)
        elif filter_lang in ["cql", "cql-text", "ecql", "simple-cql"]:
            parsed_expr = ecql.parse(filter_expr)
        elif filter_lang == "fes":
            parsed_expr = fes.parse(filter_expr)
        elif filter_lang == "jfe":
            parsed_expr = jfe.parse(filter_expr)
        if not parsed_expr:
            raise colander.Invalid(
                node=AnyFilterLanguage(),
                msg="Unresolved filter expression language.",
                value={"filter-lang": filter_lang},
            )
        return parsed_expr

    def validate(self, filter_expr, filter_lang):
        # type: (Union[JSON, str], str) -> FilterAstType
        """
        Parse the filter expression, reporting parsing failures as schema validation errors.

        :param filter_expr: Filter expression to parse.
        :param filter_lang: Name of the filter language.
        :returns: Parsed filter expression.
        :raises colander.Invalid: If the parser raised a type or value error, or the language is unresolved.
        """
        try:
            return self.parse(filter_expr, filter_lang)
        except (TypeError, ValueError) as exc:
            raise colander.Invalid(
                node=self,
                msg="Invalid filter expression could not be parsed against specified language.",
                value={"filter": filter_expr, "filter-lang": filter_lang},
            ) from exc

    def convert(self, filter_expr, filter_lang):
        # type: (Union[JSON, str], str) -> JSON
        """
        Parse the filter expression and convert it to its CQL2 representation.

        :param filter_expr: Filter expression to convert.
        :param filter_lang: Name of the filter language.
        :returns: Result of the CQL2 conversion of the parsed expression.
        :raises colander.Invalid: If the expression cannot be parsed or converted.
        """
        try:
            parsed_expr = self.validate(filter_expr, filter_lang)
            return to_cql2(parsed_expr)
        except (TypeError, ValueError, NotImplementedError) as exc:
            raise colander.Invalid(
                node=self,
                msg="Invalid filter expression could not be interpreted.",
                value={"filter": filter_expr, "filter-lang": filter_lang},
            ) from exc

    def deserialize(self, cstruct):
        # type: (JSON) -> Union[JSON, colander.null]
        """
        Deserialize the filter parameters and validate that the expression can be interpreted.

        The deserialized content is returned as is. The conversion is only performed to validate the expression.

        :raises colander.Invalid:
            If the filter was provided but dropped, if a CRS or language is provided without any filter,
            or if the filter cannot be parsed and converted.
        """
        result = super().deserialize(cstruct)
        if not result:
            return result
        filter_expr = result.get("filter")
        filter_lang = result.get("filter-lang")
        if filter_expr in [null, drop, None]:  # explicit "", {}, [] should be raised as invalid since dropped
            if "filter" in cstruct:
                raise colander.Invalid(
                    node=self,
                    msg="Invalid filter expression could not be interpreted.",
                    value={"filter": repr_json(cstruct["filter"]), "filter-lang": filter_lang},
                )
            filter_crs = cstruct.get("filter-crs")
            filter_lang = cstruct.get("filter-lang")
            if filter_crs or filter_lang:
                raise colander.Invalid(
                    node=self,
                    msg="Missing filter expression provided with CRS and/or language parameters.",
                    value={"filter-crs": filter_crs, "filter-lang": filter_lang},
                )
            return result
        if not filter_lang:
            # check the submitted expression, the bare name 'filter' would refer to the builtin function here
            filter_lang = "cql2-text" if isinstance(filter_expr, str) else "cql2-json"
        # perform conversion to validate
        # but don't return the converted CQL2-JSON to preserve the original definition where called (storage/dispatch)
        # conversion can be done as needed to obtain a uniform representation locally
        self.convert(filter_expr, filter_lang)
        return result
class SortByExpression(ExpandStringList, ExtendedSchemaNode):
    # Sorting fields given as a single comma-separated string.
    # Nodes of this class are created without a 'typ' argument (e.g.: 'SortByExpression(name="sortby", ...)'),
    # therefore the schema type must be declared on the class itself.
    schema_type = String
    example = "arg1,prop2,+asc,-desc"
    description = (
        "Comma-separated list of sorting fields. "
        "Each field can be prefixed by +/- for ascending or descending sort order."
    )
class SortBySchema(ExtendedMappingSchema):
    # Both spellings of the parameter are accepted, each one being optional.
    sort_by_lower = SortByExpression(name="sortby", missing=drop)
    sort_by_upper = SortByExpression(name="sortBy", missing=drop)

    def deserialize(self, cstruct):
        # type: (JSON) -> Union[JSON, colander.null]
        """
        Merge the lowercase variant of the parameter into the camel-case one.

        This schema can be combined with others, so any other field of the result is left as is.
        """
        result = super().deserialize(cstruct)
        if not result:
            return result
        lower_value = result.get("sortby")
        if not lower_value:
            return result
        # keep only 'official' "sortBy" from OGC API Processes
        # others OGC APIs use "sortby", but their query parameters are usually case-insensitive
        if not result.get("sortBy"):
            result["sortBy"] = lower_value
        del result["sortby"]
        return result
class SupportedCRS(ExtendedMappingSchema):
    # CRS reference with an optional boolean 'default' flag.
    crs = AnyCRS(
        title="CRS",
        description="Coordinate Reference System",
    )
    default = ExtendedSchemaNode(
        Boolean(),
        missing=drop,
    )
class SupportedCRSList(ExtendedSequenceSchema):
    # Sequence whose items are each a 'SupportedCRS' definition.
    crs = SupportedCRS(title="SupportedCRS")
class AnyLiteralType(OneOfKeywordSchema):
    """
    Submitted values that correspond to literal data.

    .. seealso::
        - :class:`AnyLiteralDataType`
        - :class:`AnyLiteralValueType`
        - :class:`AnyLiteralDefaultType`
    """
    # the definitions are listed in this order: float, integer, boolean, string and null
    _one_of = [
        ExtendedSchemaNode(
            Float(),
            description="Literal data type representing a floating point number.",
            title="LiteralDataFloat",
        ),
        ExtendedSchemaNode(
            Integer(),
            description="Literal data type representing an integer number.",
            title="LiteralDataInteger",
        ),
        ExtendedSchemaNode(
            Boolean(),
            description="Literal data type representing a boolean flag.",
            title="LiteralDataBoolean",
        ),
        ExtendedSchemaNode(
            # pylint: disable=C0301,line-too-long
            # FIXME: support byte/binary type (string + format:byte) ?
            #   https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/binaryInputValue.yaml
            #   see if we can use 'encoding' parameter available for below 'String' schema-type to handle this?
            String(allow_empty=True),  # valid to submit a process with empty string
            description="Literal data type representing a generic string.",
            title="LiteralDataString",
        ),
        ExtendedSchemaNode(
            NoneType(),
            description="Literal data type representing a null value.",
            title="LiteralDataNoneType",
        ),
    ]
class Number(OneOfKeywordSchema):
    """
    Represents a literal number, integer or float.
    """
    _one_of = [ExtendedSchemaNode(typ()) for typ in (Float, Integer)]
class NumericType(OneOfKeywordSchema):
    """
    Represents a numeric-like value.
    """
    # the string variant only accepts digits
    _one_of = [
        ExtendedSchemaNode(Float()),
        ExtendedSchemaNode(Integer()),
        ExtendedSchemaNode(
            String(),
            pattern="^[0-9]+$",
        ),
    ]
class DecimalType(ExtendedSchemaNode):
    # Node bound to the decimal type, so that instances do not need a 'typ' argument.
    schema_type = colander.Decimal
class PositiveNumber(AnyOfKeywordSchema):
    """
    Represents a literal number, integer or float, of positive value.
    """
    # every variant applies a minimum of zero
    _any_of = [
        DecimalType(
            validator=Range(min=0.0),
        ),
        ExtendedSchemaNode(
            Float(),
            validator=Range(min=0.0),
        ),
        ExtendedSchemaNode(
            Integer(),
            validator=Range(min=0),
        ),
    ]
class LiteralReference(ExtendedMappingSchema):
    # Mapping that holds an execution reference URL under the 'reference' field.
    reference = ExecuteReferenceURL()
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1707-L1716
class NameReferenceType(ExtendedMappingSchema):
    # Named entity with an optional URL toward its schema definition.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/nameReferenceType.yaml"
    _schema_include_deserialize = False
    name = ExtendedSchemaNode(
        String(),
        description="Name of the entity definition.",
    )
    reference = ReferenceURL(
        description="Reference URL to schema definition of the named entity.",
        missing=drop,
    )
class DataTypeSchema(NameReferenceType):
    # Overrides 'name' of the parent schema to restrict it to the known literal data types.
    description = "Type of the literal data representation."
    # any named type that can be converted by: 'weaver.processes.convert.any2wps_literal_datatype'
    name = ExtendedSchemaNode(
        String(),
        validator=OneOf(list(WPS_LITERAL_DATA_TYPES)),
    )
class UomSchema(NameReferenceType):
    # Unit of measure where 'uom' must be provided, but can be an empty string.
    title = "UnitOfMeasure"
    name = ExtendedSchemaNode(
        String(),
        missing=drop,  # override to make optional in contrast to 'NameReferenceType'
        description="Name of the entity definition.",
    )
    uom = ExtendedSchemaNode(
        String(allow_empty=True),  # unit/dimension-less value
        description="Unit applicable for the corresponding measurement representation.",
    )
class SupportedUoM(ExtendedSequenceSchema):
    # Non-empty list of unit of measure definitions.
    description = "List of supported units for the represented measurement."
    validator = Length(min=1)
    # A sequence schema needs exactly one child node describing its items.
    uom = UomSchema()
class MeasurementDataDomain(ExtendedMappingSchema):
    # Supported units of measure along with the optional default one.
    supported = SupportedUoM()
    default = UomSchema(missing=drop)
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1423
# NOTE: Original is only 'string', but we allow any literal type
class AllowedValuesList(ExtendedSequenceSchema):
    # Sequence whose items are each a literal value.
    value = AnyLiteralType()
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1772-L1787
# NOTE:
# Contrary to original schema where all fields are 'string', we allow any literal type as well since those make more
# sense when parsing corresponding data values (eg: float, integer, bool).
class AllowedRange(ExtendedMappingSchema):
    # Range definition where every field is optional.
    minimumValue = NumericType(missing=drop)
    maximumValue = NumericType(missing=drop)
    spacing = NumericType(missing=drop)
    rangeClosure = ExtendedSchemaNode(
        String(),
        validator=OneOf(["closed", "open", "open-closed", "closed-open"]),
        missing=drop,
    )
class AllowedRangesList(ExtendedSequenceSchema):
    # List of allowed value ranges.
    # A sequence schema needs exactly one child node describing its items.
    allowed_range = AllowedRange()
class AllowedValues(OneOfKeywordSchema):
    # 'oneOf' between a list of ranges, a list of values, or a single string value.
    _one_of = [
        # array of {range}
        AllowedRangesList(description="List of value ranges and constraints."),
        # array of "value"
        AllowedValuesList(description="List of enumerated allowed values."),
        # single "value"
        ExtendedSchemaNode(String(), description="Single allowed value."),
    ]
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1425-L1430
class AnyValue(ExtendedMappingSchema):
    # Optional boolean flag that defaults to true.
    anyValue = ExtendedSchemaNode(
        Boolean(),
        default=True,
        missing=drop,
        description=(
            "Explicitly indicate if any value is allowed. "
            "This is the default behaviour if no other constrains are specified."
        ),
    )
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1801-L1803
class ValuesReference(ExecuteReferenceURL):
    # Same definition as the parent reference URL, only the description is replaced.
    description = "URL where to retrieve applicable values."
class ArrayLiteralType(ExtendedSequenceSchema):
    # Sequence whose items are each a literal value.
    value_item = AnyLiteralType()
class ArrayLiteralDataType(ExtendedMappingSchema):
    # Array of literal values provided under the 'data' field.
    data = ArrayLiteralType()
class ArrayLiteralValueType(ExtendedMappingSchema):
    # Array of literal values provided under the 'value' field.
    value = ArrayLiteralType()
class AnyLiteralDataType(ExtendedMappingSchema):
    # Single literal value provided under the 'data' field.
    data = AnyLiteralType()
class AnyLiteralValueType(ExtendedMappingSchema):
    # Single literal value provided under the 'value' field.
    value = AnyLiteralType()
class AnyLiteralDefaultType(ExtendedMappingSchema):
    # Single literal value provided under the 'default' field.
    default = AnyLiteralType()
class LiteralDataValueDefinition(OneOfKeywordSchema):
    # 'oneOf' between explicit constraints, a reference to them, or any value; the order matters (see below).
    _one_of = [
        AllowedValues(
            description="Constraints of allowed values.",
        ),
        ValuesReference(
            description="Reference URL where to retrieve allowed values.",
        ),
        # 'AnyValue' must be last because it's the most permissive (always valid, default)
        AnyValue(
            description="Permissive definition for any allowed value.",
        ),
    ]
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1675-L1688
# literalDataDomain:
# valueDefinition: oneOf(<allowedValues, anyValue, valuesReference>)
# defaultValue: <string>
# dataType: <nameReferenceType>
# uom: <nameReferenceType>
class LiteralDataDomain(ExtendedMappingSchema):
    # Only 'valueDefinition' has neither a default nor a drop when missing; 'UOMs' is the actual name of 'uoms'.
    default = ExtendedSchemaNode(
        Boolean(),
        description="Indicates if this literal data domain definition is the default one.",
        default=True,
    )
    defaultValue = AnyLiteralType(
        description="Default value to employ if none was provided.",
        missing=drop,
    )
    dataType = DataTypeSchema(
        description="Type name and reference of the literal data representation.",
        missing=drop,
    )
    valueDefinition = LiteralDataValueDefinition(
        description="Literal data domain constraints.",
    )
    uoms = MeasurementDataDomain(
        name="UOMs",
        description="Unit of measure applicable for the data.",
        missing=drop,
    )
class LiteralDataDomainList(ExtendedSequenceSchema):
    """
    Constraints that apply to the literal data values.
    """
    # each item of the sequence is one literal data domain definition
    literalDataDomain = LiteralDataDomain()
# https://github.com/opengeospatial/ogcapi-processes/blob/e6893b/extensions/workflows/openapi/workflows.yaml#L1689-L1697
class LiteralDataType(NotKeywordSchema, ExtendedMappingSchema):
    # NOTE:
    #   Apply 'missing=drop' although original schema of 'literalDataDomains' (see link above) requires it because
    #   we support omitting it for minimalistic literal input definition.
    #   This is because our schema validation allows us to do detection of 'basic' types using the literal parsing.
    #   Because there is not explicit requirement though (ie: missing would fail schema validation), we must check
    #   that 'format' is not present to avoid conflict with minimalistic literal data definition in case of ambiguity.
    literalDataDomains = LiteralDataDomainList(missing=drop)
    # the schema class itself is listed under 'not', not an instance of it
    _not = [DescribeWithFormats]
# Different definition than 'Describe' such that nested 'complex' type 'formats' can be validated and backward
# compatible with pre-existing/deployed/remote processes, with either ``mediaType`` and ``mimeType`` formats.
# for [{id: "", ...}] representation within ProcessDescription (OLD schema)
# for {"<id>": {...}} representation within ProcessDescription (OGC schema)
# for [{id: "", ...}] representation within ProcessDeployment (OLD schema)
# for {"<id>": {...}} representation within ProcessDeployment (OGC schema)
class LiteralOutputType(LiteralDataType):
    # Same definition as the literal data type; nothing is added here.
    pass
class BoundingBoxOutputType(ExtendedMappingSchema):
    # Mapping that holds the list of supported CRS under the 'supportedCRS' field.
    supportedCRS = SupportedCRSList()
class DescribeComplexOutputType(DescribeWithFormats):
    # Same definition as 'DescribeWithFormats'; nothing is added here.
    pass
class DeployComplexOutputType(DeployWithFormats):
    # Same definition as 'DeployWithFormats'; nothing is added here.
    pass
class DescribeOutputTypeDefinition(OneOfKeywordSchema):
    # The schema classes (not instances) are listed, ordered from the least to the most permissive.
    _one_of = [
        BoundingBoxOutputType,
        # should be 2nd to last because very permissive, but requires formats at least
        DescribeComplexOutputType,
        # must be last because it's the most permissive (all can default if omitted)
        LiteralOutputType,
    ]
class DeployOutputTypeDefinition(OneOfKeywordSchema):
    # The schema classes (not instances) are listed, ordered from the least to the most permissive.
    _one_of = [
        BoundingBoxOutputType,
        # should be 2nd to last because very permissive, but requires formats at least
        DeployComplexOutputType,
        # must be last because it's the most permissive (all can default if omitted)
        LiteralOutputType,
    ]
class DescribeOutputType(AllOfKeywordSchema):
    # 'allOf' between the description fields, the I/O metadata, the I/O schema and the output type definition.
    _sort_first = PROCESS_IO_FIELD_FIRST
    _sort_after = PROCESS_IO_FIELD_AFTER
    _all_of = [
        DescriptionType(),
        InputOutputDescriptionMeta(),
        InputOutputDescriptionSchema(),
        DescribeOutputTypeDefinition(),
    ]
class DescribeOutputTypeWithID(OutputIdentifierType, DescribeOutputType):
    # Only combines the output identifier with the output description; nothing is added here.
    pass
class DescribeOutputTypeList(ExtendedSequenceSchema):
    """
    Listing of process outputs descriptions.
    """
    # each item of the sequence is an output description that includes its identifier
    output = DescribeOutputTypeWithID()
# for {"<id>": {...}} representation within ProcessDescription (OGC schema)
class DescribeOutputTypeMap(PermissiveMappingSchema):
    """
    Definition of all process outputs under mapping.
    """
    # the single field is declared with a variable name placeholder
    output_id = DescribeOutputType(
        title="ProcessOutputDefinition",
        description="Output definition under mapping of process description.",
        variable="{output-id}",
    )
# Different definition than 'Describe' such that nested 'complex' type 'formats' can be validated and backward
# compatible with pre-existing/deployed/remote processes, with either ``mediaType`` and ``mimeType`` formats.
class DeployOutputType(AllOfKeywordSchema):
    # 'allOf' between the deployment fields, the I/O metadata, the I/O schema and the output type definition.
    _sort_first = PROCESS_IO_FIELD_FIRST
    _sort_after = PROCESS_IO_FIELD_AFTER
    _all_of = [
        DeploymentType(),
        InputOutputDescriptionMeta(),
        InputOutputDescriptionSchema(),
        DeployOutputTypeDefinition(),
    ]
class DeployOutputTypeWithID(OutputIdentifierType, DeployOutputType):
    # Only combines the output identifier with the output deployment definition; nothing is added here.
    pass
# for [{id: "", ...}] representation within ProcessDeployment (OLD schema)
class DeployOutputTypeList(ExtendedSequenceSchema):
    """
    Listing of process output definitions to deploy.
    """
    # A sequence schema needs exactly one child node describing its items.
    # Each item is an output definition that includes its identifier.
    output = DeployOutputTypeWithID()
# for {"<id>": {...}} representation within ProcessDeployment (OGC schema)
class DeployOutputTypeMap(PermissiveMappingSchema):
    """
    Definition of all process outputs under mapping.
    """
    # NOTE(review): no field is declared here, so only the permissive mapping of the parent applies - confirm intended
class DeployOutputTypeAny(OneOfKeywordSchema):
    # 'oneOf' between the list and the mapping representations; the schema classes (not instances) are listed.
    _one_of = [DeployOutputTypeList, DeployOutputTypeMap]
class JobExecuteModeEnum(ExtendedSchemaNode):
    # _schema: none available by itself, legacy parameter that was directly embedded in 'execute.yaml'
    # (https://github.com/opengeospatial/ogcapi-processes/blob/1.0-draft.5/core/openapi/schemas/execute.yaml)
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "JobExecuteMode"
    # no default to enforce required input as per OGC-API schemas
    # default = EXECUTE_MODE_AUTO
    example = ExecuteMode.ASYNC
    description = (
        "Desired execution mode specified directly. This is intended for backward compatibility support. "
        "To obtain more control over execution mode selection, employ the official Prefer header instead "
        f"(see for more details: {DOC_URL}/processes.html#execution-mode)."
    )
    validator = OneOf(ExecuteMode.values())
class JobControlOptionsEnum(ExtendedSchemaNode):
    # Job control option, limited to the values of 'ExecuteControlOption'.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/jobControlOptions.yaml"
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "JobControlOptions"
    default = ExecuteControlOption.ASYNC
    example = ExecuteControlOption.ASYNC
    validator = OneOf(ExecuteControlOption.values())
class JobResponseOptionsEnum(ExtendedSchemaNode):
    # _schema: none available by itself, legacy parameter that was directly embedded in 'execute.yaml'
    # (https://github.com/opengeospatial/ogcapi-processes/blob/1.0-draft.6/core/openapi/schemas/execute.yaml)
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "JobResponseOptions"
    # no default to enforce required input as per OGC-API schemas
    # default = ExecuteResponse.DOCUMENT
    example = ExecuteResponse.DOCUMENT
    description = (
        "Indicates the desired representation format of the response. "
        f"(see for more details: {DOC_URL}/processes.html#execution-body)."
    )
    validator = OneOf(ExecuteResponse.values())
class TransmissionModeEnum(ExtendedSchemaNode):
    # Transmission mode, limited to the values of 'ExecuteTransmissionMode'.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/transmissionMode.yaml"
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "TransmissionMode"
    # no default to allow auto-resolution as data/link if omitted
    # default = ExecuteTransmissionMode.VALUE
    example = ExecuteTransmissionMode.VALUE
    validator = OneOf(ExecuteTransmissionMode.values())
class JobStatusEnum(ExtendedSchemaNode):
    # Job status, limited to the values of 'JOB_STATUS_CODE_API'.
    _schema = f"{OGC_API_PROC_PART1_PARAMETERS}/status.yaml"  # subset of this implementation
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    default = Status.ACCEPTED
    example = Status.ACCEPTED
    validator = OneOf(JOB_STATUS_CODE_API)
class JobStatusCreate(ExtendedSchemaNode):
    # Status that only accepts the "create" value.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    validator = OneOf(["create"])
class JobStatusSearchEnum(ExtendedSchemaNode):
    # Comma-delimited job statuses to search for, compared without case-sensitivity.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "JobStatusSearch"
    default = Status.ACCEPTED
    example = Status.ACCEPTED
    validator = StringOneOf(JOB_STATUS_SEARCH_API, delimiter=",", case_sensitive=False)
class JobTypeEnum(ExtendedSchemaNode):
    # Job type, limited to "process", "provider" or "service".
    _schema = f"{OGC_API_PROC_PART1_PARAMETERS}/type.yaml"  # subset of this implementation
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    validator = OneOf(["process", "provider", "service"])
class JobTitle(ExtendedSchemaNode):
    # Job title that must contain at least one character.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    description = "Title assigned to the job for user-readable identification."
    validator = Length(min=1)
class JobSortEnum(ExtendedSchemaNode):
    # Sorting method, limited to the values of 'SortMethods.JOB'.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "JobSortingMethod"
    validator = OneOf(SortMethods.JOB)
class ProcessSortEnum(ExtendedSchemaNode):
    # Sorting method, limited to the values of 'SortMethods.PROCESS'.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "ProcessSortMethod"
    validator = OneOf(SortMethods.PROCESS)
class QuoteSortEnum(ExtendedSchemaNode):
    # Sorting method, limited to the values of 'SortMethods.QUOTE'.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    title = "QuoteSortingMethod"
    validator = OneOf(SortMethods.QUOTE)
class JobGroupsCommaSeparated(ExpandStringList, ExtendedSchemaNode):
    # Grouping fields given as a single comma-separated string, compared with case-sensitivity.
    # the schema type is declared on the class so that nodes can be created without a 'typ' argument
    schema_type = String
    example = "process,service"
    description = "Comma-separated list of grouping fields with which to list jobs."
    validator = StringOneOf(["process", "provider", "service", "status"], delimiter=",", case_sensitive=True)
[docs]
class JobExecuteSubscribers(ExtendedMappingSchema):
    # Callback definitions (URI and email) for job notifications; every field is optional.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/subscriber.yaml"
    description = "Optional URIs for callbacks for this job."

    # --- basic OGC subscribers ---
    # 'successUri' can be omitted, against the official schema, to support partial use/update
    # (see https://github.com/opengeospatial/ogcapi-processes/issues/460)
    success_uri = URL(
        name="successUri",
        description="Location where to POST the job results on successful completion.",
        missing=drop,
    )
    failed_uri = URL(
        name="failedUri",
        description="Location where to POST the job status if it fails execution.",
        missing=drop,
    )
    in_progress_uri = URL(
        name="inProgressUri",
        description="Location where to POST the job status once it starts execution.",
        missing=drop,
    )

    # --- additional subscribers ---
    success_email = Email(
        name="successEmail",
        description="Email recipient to send a notification on successful job completion.",
        missing=drop,
    )
    failed_email = Email(
        name="failedEmail",
        description="Email recipient to send a notification on failed job completion.",
        missing=drop,
    )
    in_progress_email = Email(
        name="inProgressEmail",
        description="Email recipient to send a notification of job status once it starts execution.",
        missing=drop,
    )
[docs]
class LaunchJobQuerystring(ExtendedMappingSchema):
[docs]
class VisibilityValue(ExtendedSchemaNode):
    # Visibility value, restricted to the known 'Visibility' values.
    validator = OneOf(Visibility.values())
    example = Visibility.PUBLIC
[docs]
class JobAccess(VisibilityValue):
    # Identical to 'VisibilityValue'; only the class name differs.
    pass
[docs]
class VisibilitySchema(ExtendedMappingSchema):
    # Mapping holding a single 'value' field with the visibility.
    value = VisibilityValue()
[docs]
class QuoteEstimatorConfigurationSchema(ExtendedMappingSchema):
    # Configuration contents validated against the remote schema referenced by '_schema'.
    _schema = f"{WEAVER_SCHEMA_URL}/quotation/quote-estimator.yaml#/definitions/Configuration"
    description = "Quote Estimator Configuration"

    def deserialize(self, cstruct):
        # Validate through a plain mapping node carrying the same '_schema' reference,
        # rather than through 'self', in order to avoid recursion.
        plain_node = ExtendedMappingSchema(_schema=self._schema)
        return validate_node_schema(plain_node, cstruct)
[docs]
class QuoteEstimatorWeightedParameterSchema(ExtendedMappingSchema):
    # NOTE:
    #   Only the weight is defined here. The value/size parameters are omitted since they are
    #   provided at runtime by the quote estimation job, obtained from the submitted body
    #   in 'QuoteProcessParametersSchema'.
    weight = ExtendedSchemaNode(
        Float(),
        default=1.0,
        missing=drop,
        description="Weight attributed to this parameter when submitted for quote estimation.",
    )
[docs]
class QuoteEstimatorSchema(ExtendedMappingSchema):
    # Quote estimator definition: required 'config' and optional 'outputs' parameters.
    _schema = f"{WEAVER_SCHEMA_URL}/quotation/quote-estimator.yaml"
    description = "Configuration of the quote estimation algorithm for a given process."
    config = QuoteEstimatorConfigurationSchema()
    outputs = QuoteEstimatorOutputParametersSchema(
        missing=drop,
        default={},
    )
#########################################################
# Path parameter definitions
#########################################################
[docs]
class LocalProcessQuery(ExtendedMappingSchema):
    # Optional 'version' parameter to select a specific process revision.
    version = Version(
        example="1.2.3",
        missing=drop,
        description=(
            "Specific process version to locate. "
            "If process ID was requested with tagged 'id:version' revision format, this parameter is ignored."
        ),
    )
[docs]
class LocalProcessPath(ExtendedMappingSchema):
    # Path parameter identifying a local process, optionally tagged with a version.
    process_id = ProcessIdentifierTag(
        example="jsonarray2netcdf[:1.0.0]",
        summary="Process identifier with optional tag version.",
        description=(
            "Process identifier with optional tag version. "
            "If tag is omitted, the latest version of that process is assumed. "
            "Otherwise, the specific process revision as 'id:version' must be matched. "
            "Alternatively, the plain process ID can be specified in combination to 'version' query parameter."
        ),
    )
[docs]
class ProviderPath(ExtendedMappingSchema):
    # Path parameter identifying a remote provider.
    provider_id = AnyIdentifier(
        description="Remote provider identifier.",
        example="hummingbird",
    )
[docs]
class ProviderProcessPath(ProviderPath):
    # Path parameters of a process offered by a remote provider.
    # note: Tag representation not allowed in this case (plain 'ProcessIdentifier' is used)
    process_id = ProcessIdentifier(
        example="provider-process",
        description=(
            "Identifier of a process that is offered by the remote provider."
        ),
    )
[docs]
class JobPath(ExtendedMappingSchema):
    # Path parameter holding the job UUID.
    job_id = UUID(
        description="Job ID",
        example="14c68477-c3ed-4784-9c0f-a4c9e1344db5",
    )
[docs]
class BillPath(ExtendedMappingSchema):
    # Path parameter holding the bill UUID.
    bill_id = UUID(description="Bill ID")
[docs]
class QuotePath(ExtendedMappingSchema):
    # Path parameter holding the quote UUID.
    quote_id = UUID(description="Quote ID")
[docs]
class ResultPath(ExtendedMappingSchema):
    # Path parameter holding the result UUID.
    result_id = UUID(description="Result ID")
#########################################################
# These classes define each of the endpoints parameters
#########################################################
[docs]
class FrontpageEndpoint(ExtendedMappingSchema):
[docs]
class VersionsEndpoint(ExtendedMappingSchema):
# FIXME: support YAML (https://github.com/crim-ca/weaver/issues/456)
[docs]
class OpenAPIEndpoint(ExtendedMappingSchema):
[docs]
class SwaggerUIEndpoint(ExtendedMappingSchema):
    # No parameters defined for this endpoint.
    pass
[docs]
class RedocUIEndpoint(ExtendedMappingSchema):
    # No parameters defined for this endpoint.
    pass
[docs]
class OWSNamespace(XMLObject):
    # XML object bound to the OWS 1.1 namespace URI.
    namespace = "http://www.opengis.net/ows/1.1"
[docs]
class WPSNamespace(XMLObject):
    # XML object bound to the WPS 1.0.0 namespace URI.
    namespace = "http://www.opengis.net/wps/1.0.0"
[docs]
class XMLNamespace(XMLObject):
[docs]
class XMLReferenceAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class MimeTypeAttribute(ExtendedSchemaNode, XMLObject):
    # Media-type XML node; the example uses the JSON content type.
    example = ContentType.APP_JSON
[docs]
class EncodingAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class OWSVersion(ExtendedSchemaNode, OWSNamespace):
[docs]
class OWSAcceptVersions(ExtendedSequenceSchema, OWSNamespace):
    # Sequence node named 'AcceptVersions' in the OWS namespace.
    description = "Accepted versions to produce the response."
    name = "AcceptVersions"
[docs]
class OWSLanguage(ExtendedSchemaNode, OWSNamespace):
    # Language node in the OWS namespace, defaulting to 'AcceptLanguage.EN_US'.
    description = "Desired language to produce the response."
    default = AcceptLanguage.EN_US
    example = AcceptLanguage.EN_CA
[docs]
class OWSLanguageAttribute(OWSLanguage):
    # Same as 'OWSLanguage' with an overridden description.
    description = "RFC-4646 language code of the human-readable text."
[docs]
class OWSService(ExtendedSchemaNode, OWSNamespace):
    # Service selector of an OWS request.
    # The 'default' and 'example' previously held language codes (copied from 'OWSLanguage'),
    # which contradicted the description below. They now advertise the expected 'WPS' service.
    description = "Desired service to produce the response (SHOULD be 'WPS')."
    default = "WPS"
    example = "WPS"
[docs]
class WPSServiceAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class WPSVersionAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class WPSLanguageAttribute(ExtendedSchemaNode, XMLNamespace):
    # Language node defaulting to 'AcceptLanguage.EN_US'.
    default = AcceptLanguage.EN_US
    example = AcceptLanguage.EN_CA
[docs]
class WPSParameters(ExtendedMappingSchema):
    # Parameters of a WPS request: service, operation, version and process identifier(s).
    service = ExtendedSchemaNode(
        String(),
        example="WPS",
        description="Service selection.",
        validator=OneOfCaseInsensitive(["WPS"]),
    )
    request = ExtendedSchemaNode(
        String(),
        example="GetCapabilities",
        description="WPS operation to accomplish",
        validator=OneOfCaseInsensitive(["GetCapabilities", "DescribeProcess", "Execute"]),
    )
    # The keyword was previously misspelled 'exaple', leaving this node without an 'example'.
    version = Version(
        example="1.0.0",
        default="1.0.0",
        validator=OneOf(["1.0.0", "2.0.0", "2.0"]),
    )
    # A stray misspelled 'exaple="hello"' keyword was removed; the actual 'example' is kept as is.
    identifier = ExtendedSchemaNode(
        String(),
        missing=drop,
        example="example-process,another-process",
        description="Single or comma-separated list of process identifiers to describe, "
                    "and single one for execution.",
    )
[docs]
class WPSOperationGetNoContent(ExtendedMappingSchema):
    # Mapping without any field, describing the absent body of GET requests.
    description = "No content body provided (GET requests)."
[docs]
class WPSOperationPost(ExtendedMappingSchema):
    # Fields shared by the WPS POST request bodies that derive from this class.
    _schema = f"{OGC_WPS_1_SCHEMAS}/common/RequestBaseType.xsd"
    accepted_versions = OWSAcceptVersions(
        missing=drop,
        default="1.0.0",
    )
    language = OWSLanguageAttribute(missing=drop)
    service = OWSService()
[docs]
class WPSGetCapabilitiesPost(WPSOperationPost, WPSNamespace):
    # POST body of the 'GetCapabilities' request.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsGetCapabilities_request.xsd"
    name = "GetCapabilities"
    title = "GetCapabilities"
[docs]
class OWSIdentifier(ExtendedSchemaNode, OWSNamespace):
[docs]
class OWSProcessIdentifier(ProcessIdentifier, OWSNamespace):
    # 'ProcessIdentifier' combined with the OWS namespace; nothing else is added.
    pass
[docs]
class OWSProcessIdentifierList(ExtendedSequenceSchema, OWSNamespace):
    # Sequence of 'OWSProcessIdentifier' items.
    item = OWSProcessIdentifier()
[docs]
class OWSTitle(ExtendedSchemaNode, OWSNamespace):
[docs]
class OWSAbstract(ExtendedSchemaNode, OWSNamespace):
[docs]
class WPSDescribeProcessPost(WPSOperationPost, WPSNamespace):
    # POST body of the 'DescribeProcess' request.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsDescribeProcess_request.xsd"
    name = "DescribeProcess"
    title = "DescribeProcess"
    identifier = OWSProcessIdentifierList(
        description="Single or comma-separated list of process identifier to describe.",
        example="example",
    )
# FIXME: missing details about 'DataInputs'
[docs]
class WPSExecutePost(WPSOperationPost, WPSNamespace):
    # POST body of the 'Execute' request: process identifier and its data inputs.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsExecute_request.xsd"
    identifier = OWSProcessIdentifier(
        description="Identifier of the process to execute with data inputs.",
    )
    dataInputs = WPSExecuteDataInputs(
        description="Data inputs to be provided for process execution.",
    )
[docs]
class WPSRequestBody(OneOfKeywordSchema):
    # Request body matching one of the WPS POST operations.
    _one_of = [
        WPSExecutePost(),
        WPSDescribeProcessPost(),
        WPSGetCapabilitiesPost(),
    ]
    examples = {
        "Execute": {
            "summary": "Execute request example.",
            "value": EXAMPLES["wps_execute_request.xml"],
        },
    }
[docs]
class WPSEndpointGet(ExtendedMappingSchema):
    # WPS GET request: query parameters and an optional (empty) body.
    querystring = WPSParameters()
    body = WPSOperationGetNoContent(missing=drop)
[docs]
class WPSEndpointPost(ExtendedMappingSchema):
    # WPS POST request with one of the supported operation bodies.
    body = WPSRequestBody()
[docs]
class XMLBooleanAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class XMLString(ExtendedSchemaNode, XMLObject):
[docs]
class OWSString(ExtendedSchemaNode, OWSNamespace):
[docs]
class OWSKeywordList(ExtendedSequenceSchema, OWSNamespace):
    # Sequence of 'Keyword' string items.
    keyword = OWSString(
        name="Keyword",
        title="OWSKeyword",
        example="Weaver",
    )
[docs]
class OWSType(ExtendedMappingSchema, OWSNamespace):
    # Declares 'codeSpace' as a string represented as an XML attribute.
    additionalProperties = {
        "codeSpace": {
            "type": "string",
            "example": "ISOTC211/19115",
            "xml": {"attribute": True},
        },
    }
[docs]
class OWSPhone(ExtendedMappingSchema, OWSNamespace):
    # Optional voice and facsimile numbers.
    voice = OWSString(
        name="Voice",
        title="OWSVoice",
        example="1-234-567-8910",
        missing=drop,
    )
    facsimile = OWSString(
        name="Facsimile",
        title="OWSFacsimile",
        missing=drop,
    )
[docs]
class OWSAddress(ExtendedMappingSchema, OWSNamespace):
    # Address details; every field is optional.
    delivery_point = OWSString(
        name="DeliveryPoint",
        title="OWSDeliveryPoint",
        example="123 Place Street",
        missing=drop,
    )
    city = OWSString(
        name="City",
        title="OWSCity",
        example="Nowhere",
        missing=drop,
    )
    country = OWSString(
        name="Country",
        title="OWSCountry",
        missing=drop,
    )
    admin_area = OWSString(
        name="AdministrativeArea",
        title="AdministrativeArea",
        missing=drop,
    )
    postal_code = OWSString(
        name="PostalCode",
        title="OWSPostalCode",
        example="A1B 2C3",
        missing=drop,
    )
    # the email value is checked by the 'EmailRegex' validator
    email = OWSString(
        name="ElectronicMailAddress",
        title="OWSElectronicMailAddress",
        example="mail@me.com",
        validator=EmailRegex,
        missing=drop,
    )
[docs]
class OWSServiceProvider(ExtendedMappingSchema, OWSNamespace):
    # Provider details: name and site of the institution.
    description = "Details about the institution providing the service."
    name = "ServiceProvider"
    title = "ServiceProvider"
    provider_name = OWSString(
        name="ProviderName",
        title="OWSProviderName",
        example="EXAMPLE",
    )
    # The site previously reused the 'ProviderName' name/title, making it collide with the field above.
    provider_site = OWSString(
        name="ProviderSite",
        title="OWSProviderSite",
        example="http://schema-example.com",
    )
[docs]
class WPSDescriptionType(ExtendedMappingSchema, OWSNamespace):
    # Title (required) and abstract (optional) description fields.
    _schema = f"{OGC_WPS_1_SCHEMAS}/common/DescriptionType.xsd"
    name = "DescriptionType"
    _title = OWSTitle(
        description="Title of the service.",
        example="Weaver",
    )
    abstract = OWSAbstract(
        description="Detail about the service.",
        example="Weaver WPS example schema.",
        missing=drop,
    )
[docs]
class OWSServiceIdentification(WPSDescriptionType, OWSNamespace):
    # Identification details of the service: keywords, type, versions, fees, access and provider.
    name = "ServiceIdentification"
    title = "ServiceIdentification"
    keywords = OWSKeywordList(name="Keywords")
    svc_type = OWSString(
        name="ServiceType",
        title="ServiceType",
        example="WPS",
    )
    # both version nodes share the same 'ServiceTypeVersion' name, only their example differs
    svc_type_ver1 = OWSString(
        name="ServiceTypeVersion",
        title="ServiceTypeVersion",
        example="1.0.0",
    )
    svc_type_ver2 = OWSString(
        name="ServiceTypeVersion",
        title="ServiceTypeVersion",
        example="2.0.0",
    )
    fees = OWSString(
        name="Fees",
        title="Fees",
        example="NONE",
        missing=drop,
        default="NONE",
    )
    access = OWSString(
        name="AccessConstraints",
        title="AccessConstraints",
        example="NONE",
        missing=drop,
        default="NONE",
    )
    provider = OWSServiceProvider()
[docs]
class OWSOperationName(ExtendedSchemaNode, OWSNamespace):
    # Operation name restricted to the three listed WPS operations (case-sensitive).
    example = "GetCapabilities"
    validator = OneOf(["GetCapabilities", "DescribeProcess", "Execute"])
[docs]
class OperationLink(ExtendedSchemaNode, XMLObject):
    # XML node whose example is a WPS endpoint URL.
    example = "http://schema-example.com/wps"
[docs]
class OperationRequest(ExtendedMappingSchema, OWSNamespace):
[docs]
class OWS_HTTP(ExtendedMappingSchema, OWSNamespace):  # noqa: N802
    # 'Get' and 'Post' operation requests.
    get = OperationRequest(name="Get", title="OWSGet")
    post = OperationRequest(name="Post", title="OWSPost")
[docs]
class OWS_DCP(ExtendedMappingSchema, OWSNamespace):  # noqa: N802
    # Optional 'HTTP' and 'HTTPS' definitions.
    http = OWS_HTTP(name="HTTP", missing=drop)
    https = OWS_HTTP(name="HTTPS", missing=drop)
[docs]
class Operation(ExtendedMappingSchema, OWSNamespace):
    # Operation entry holding its name.
    name = OWSOperationName()
[docs]
class ProcessVersion(ExtendedSchemaNode, WPSNamespace):
[docs]
class OWSProcessSummary(ExtendedMappingSchema, WPSNamespace):
    # Summary of a process: version, identifier, title and abstract.
    version = ProcessVersion(
        name="processVersion",
        default="None",
        example="1.2",
        description="Version of the corresponding process summary.",
    )
    identifier = OWSProcessIdentifier(
        example="example",
        description="Identifier to refer to the process.",
    )
    _title = OWSTitle(
        example="Example Process",
        description="Title of the process.",
    )
    abstract = OWSAbstract(
        example="Process for example schema.",
        description="Detail about the process.",
    )
[docs]
class WPSProcessOfferings(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of 'Process' summaries.
    name = "ProcessOfferings"
    title = "ProcessOfferings"
    process = OWSProcessSummary(name="Process")
[docs]
class WPSLanguagesType(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of 'Language' items.
    title = "LanguagesType"
    lang = OWSLanguage(name="Language")
[docs]
class WPSLanguageSpecification(ExtendedMappingSchema, WPSNamespace):
    # 'Default' language and list of 'Supported' languages.
    default = OWSLanguage(name="Default")
    supported = WPSLanguagesType(name="Supported")
[docs]
class WPSResponseBaseType(PermissiveMappingSchema, WPSNamespace):
    # Fields shared by the WPS responses deriving from this class: service, version and language.
    _schema = f"{OGC_WPS_1_SCHEMAS}/common/ResponseBaseType.xsd"
    service = WPSServiceAttribute()
    version = WPSVersionAttribute()
    lang = WPSLanguageAttribute()
[docs]
class WPSProcessVersion(ExtendedSchemaNode, WPSNamespace):
    # Node named 'processVersion' in the WPS namespace.
    _schema = f"{OGC_WPS_1_SCHEMAS}/common/ProcessVersion.xsd"
    description = "Release version of this Process."
    name = "processVersion"
[docs]
class WPSLiteralData(WPSLiteralInputType):
[docs]
class XMLStringCRS(AnyCRS, XMLObject):
    # 'AnyCRS' combined with 'XMLObject'; nothing else is added.
    pass
[docs]
class WPSSupportedCRS(ExtendedSequenceSchema):
    # Sequence of 'CRS' items.
    crs = WPSCRSsType(name="CRS")
[docs]
class WPSSupportedCRSType(ExtendedMappingSchema, WPSNamespace):
    # 'Default' CRS and list of 'Supported' CRS.
    name = "SupportedCRSsType"
    default = WPSCRSsType(name="Default")
    supported = WPSSupportedCRS(name="Supported")
[docs]
class WPSBoundingBoxData(ExtendedMappingSchema, XMLObject):
    # Wrapper with a single 'BoundingBoxData' field.
    data = WPSSupportedCRSType(name="BoundingBoxData")
[docs]
class WPSComplexData(ExtendedMappingSchema, XMLObject):
    # Wrapper with a single 'ComplexData' field.
    data = WPSComplexInputType(name="ComplexData")
[docs]
class WPSMinOccursAttribute(MinOccursDefinition, XMLObject):
[docs]
class WPSMaxOccursAttribute(MinOccursDefinition, XMLObject):
[docs]
class WPSOutputDescriptionType(WPSDescriptionType):
    # Description of a process output.
    name = "OutputDescriptionType"
    title = "OutputDescriptionType"
    identifier = OWSIdentifier(description="Unique identifier of the output.")
    # the following fields override the inherited ones to have different examples/descriptions
    _title = OWSTitle(description="Human-readable representation of the process output.")
    abstract = OWSAbstract(missing=drop)
[docs]
class ProcessOutputs(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of output descriptions.
    name = "ProcessOutputs"
    title = "ProcessOutputs"
    output = WPSOutputDescriptionType()
[docs]
class WPSGetCapabilities(WPSResponseBaseType):
    # Response contents of the capabilities operation.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsGetCapabilities_response.xsd"
    # title purposely differs from 'GetCapabilities', which is used for the request
    title = "Capabilities"
    svc = OWSServiceIdentification()
    ops = OperationsMetadata()
    offering = WPSProcessOfferings()
    languages = WPSLanguageSpecification()
[docs]
class WPSProcessDescriptionType(WPSResponseBaseType, WPSProcessVersion):
    # Description of one process: supported store/status flags and its outputs.
    name = "ProcessDescriptionType"
    description = "Description of the requested process by identifier."
    store = XMLBooleanAttribute(
        name="storeSupported",
        example=True,
        default=True,
    )
    status = XMLBooleanAttribute(
        name="statusSupported",
        example=True,
        default=True,
    )
    outputs = ProcessOutputs()
[docs]
class WPSProcessDescriptionList(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of process descriptions.
    name = "ProcessDescriptions"
    title = "ProcessDescriptions"
    description = "Listing of process description for every requested identifier."
    process = WPSProcessDescriptionType()
[docs]
class WPSDescribeProcess(WPSResponseBaseType):
    # Response contents of the 'DescribeProcess' operation.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsDescribeProcess_response.xsd"
    name = "DescribeProcess"
    title = "DescribeProcess"
    process = WPSProcessDescriptionList()
[docs]
class WPSStatusLocationAttribute(ExtendedSchemaNode, XMLObject):
    # XML node named 'statusLocation'.
    name = "statusLocation"
[docs]
class WPSServiceInstanceAttribute(ExtendedSchemaNode, XMLObject):
    # XML node named 'serviceInstance'.
    name = "serviceInstance"
[docs]
class CreationTimeAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class WPSStatusSuccess(ExtendedSchemaNode, WPSNamespace):
    # Node named 'ProcessSucceeded' in the WPS namespace.
    name = "ProcessSucceeded"
    title = "ProcessSucceeded"
[docs]
class WPSStatusFailed(ExtendedSchemaNode, WPSNamespace):
    # Node titled 'ProcessFailed' in the WPS namespace.
    title = "ProcessFailed"
[docs]
class WPSStatus(ExtendedMappingSchema, WPSNamespace):
    # Status with its creation time and optional success/failure entries.
    creationTime = CreationTimeAttribute()
    status_success = WPSStatusSuccess(missing=drop)
    status_failed = WPSStatusFailed(missing=drop)
[docs]
class WPSProcessSummary(ExtendedMappingSchema, WPSNamespace):
    # Process identifier with an optional abstract.
    identifier = OWSProcessIdentifier()
    abstract = OWSAbstract(missing=drop)
[docs]
class WPSOutputBase(ExtendedMappingSchema):
    # Output identifier with an optional abstract.
    identifier = OWSIdentifier()
    abstract = OWSAbstract(missing=drop)
[docs]
class WPSOutputDefinitionItem(WPSOutputBase, WPSNamespace):
    # A distinct title is used to avoid an OpenAPI schema definition clash
    # with the 'Output' of 'WPSProcessOutputs'.
    title = "OutputDefinition"
[docs]
class WPSOutputDefinitions(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of output definition items.
    name = "OutputDefinitions"
    title = "OutputDefinitions"
    out_def = WPSOutputDefinitionItem()
[docs]
class WPSOutputLiteral(ExtendedMappingSchema):
[docs]
class WPSReference(ExtendedMappingSchema, WPSNamespace):
    # Reference composed of a link, a media-type and an encoding.
    href = XMLReferenceAttribute()
    mimeType = MimeTypeAttribute()
    encoding = EncodingAttribute()
[docs]
class WPSOutputReference(ExtendedMappingSchema):
    # Output provided through a 'Reference' field.
    title = "OutputReference"
    reference = WPSReference(name="Reference")
[docs]
class WPSOutputData(OneOfKeywordSchema):
    # Output contents as either a literal or a reference.
    _one_of = [
        WPSOutputLiteral(),
        WPSOutputReference(),
    ]
[docs]
class WPSDataOutputItem(AllOfKeywordSchema, WPSNamespace):
    # Combination of the base output fields with the output data.
    # use different title to avoid OpenAPI schema definition clash with 'Output' of 'WPSOutputDefinitions'
    _all_of = [
        WPSOutputBase(),
        WPSOutputData(),
    ]
[docs]
class WPSProcessOutputs(ExtendedSequenceSchema, WPSNamespace):
    # Sequence of data output items.
    name = "ProcessOutputs"
    title = "ProcessOutputs"
    output = WPSDataOutputItem()
[docs]
class WPSExecuteResponse(WPSResponseBaseType, WPSProcessVersion):
    # Response contents of the execute operation.
    _schema = f"{OGC_WPS_1_SCHEMAS}/wpsExecute_response.xsd"
    name = "ExecuteResponse"
    # title purposely differs from 'Execute', which is used for the request
    title = "ExecuteResponse"
    location = WPSStatusLocationAttribute()
    svc_loc = WPSServiceInstanceAttribute()
    process = WPSProcessSummary()
    # output definitions are provided when lineage is requested only
    out_def = WPSOutputDefinitions(missing=drop)
    outputs = WPSProcessOutputs()
[docs]
class WPSXMLSuccessBodySchema(OneOfKeywordSchema):
    # Successful response body matching one of the WPS operation responses.
    _one_of = [
        WPSGetCapabilities(),
        WPSDescribeProcess(),
        WPSExecuteResponse(),
    ]
[docs]
class OWSExceptionCodeAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class OWSExceptionLocatorAttribute(ExtendedSchemaNode, XMLObject):
[docs]
class OWSExceptionText(ExtendedSchemaNode, OWSNamespace):
    # String node named 'ExceptionText' in the OWS namespace.
    schema_type = String
    name = "ExceptionText"
[docs]
class OWSException(ExtendedMappingSchema, OWSNamespace):
    # Exception entry: code, locator and text.
    code = OWSExceptionCodeAttribute(example="MissingParameterValue")
    locator = OWSExceptionLocatorAttribute(
        default="None",
        example="service",
    )
    text = OWSExceptionText(example="Missing service")
[docs]
class OWSExceptionReport(ExtendedMappingSchema, OWSNamespace):
    # Report wrapping an exception entry.
    name = "ExceptionReport"
    title = "ExceptionReport"
    exception = OWSException()
[docs]
class WPSException(ExtendedMappingSchema):
    # Mapping holding the exception report.
    report = OWSExceptionReport()
[docs]
class OkWPSResponse(ExtendedMappingSchema):
    # Successful WPS response with its XML body.
    description = "WPS operation successful"
    body = WPSXMLSuccessBodySchema()
[docs]
class ErrorWPSResponse(ExtendedMappingSchema):
    # Error WPS response with its exception body.
    description = "Unhandled error occurred on WPS endpoint."
    body = WPSException()
[docs]
class ProviderEndpoint(ProviderPath):
[docs]
class ProcessDescriptionQuery(ExtendedMappingSchema):
    # Query parameter selecting the representation of a process description.
    # see: 'ProcessDescription' schema and 'Process.offering' method
    schema = ExtendedSchemaNode(
        String(),
        example=ProcessSchema.OGC,
        default=ProcessSchema.OGC,
        validator=OneOfCaseInsensitive(ProcessSchema.values()),
        summary="Selects the desired schema representation of the process description.",
        description=(
            "Selects the desired schema representation of the process description. "
            f"When '{ProcessSchema.OGC}' is used, inputs and outputs will be represented as mapping of objects. "
            "Process metadata are also directly provided at the root of the content. "
            f"When '{ProcessSchema.OLD}' is used, inputs and outputs will be represented as list of objects with ID. "
            "Process metadata are also reported nested under a 'process' field. "
            "See '#/definitions/ProcessDescription' schema for more details about each case. "
            "These schemas are all represented with JSON content. "
            f"For the XML definition, employ '{ProcessSchema.WPS}' or any format selector (f, format, Accept) with XML."
        ),
    )
[docs]
class ProviderProcessEndpoint(ProviderProcessPath):
    # Provider process path parameters combined with the description query.
    querystring = ProcessDescriptionQuery()
[docs]
class LocalProcessDescriptionQuery(ProcessDescriptionQuery, LocalProcessQuery, FormatQuery):
    # Union of the three parent query definitions; nothing else is added.
    pass
[docs]
class ProcessEndpoint(LocalProcessPath):
    # Local process path parameter combined with the local description query.
    querystring = LocalProcessDescriptionQuery()
[docs]
class ProcessPackageEndpoint(LocalProcessPath):
    # Local process path parameter combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProviderProcessPackageEndpoint(ProviderProcessPath, ProcessPackageEndpoint):
    # Combination of both parent definitions; nothing else is added.
    pass
[docs]
class ProcessPayloadEndpoint(LocalProcessPath):
    # Local process path parameter combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProcessQuoteEstimatorGetEndpoint(LocalProcessPath):
    # Local process path parameter combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProcessQuoteEstimatorPutEndpoint(LocalProcessPath):
    # Process version query with the quote estimator definition as body.
    querystring = LocalProcessQuery()
    body = QuoteEstimatorSchema()
[docs]
class ProcessQuoteEstimatorDeleteEndpoint(LocalProcessPath):
    # Local process path parameter combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProcessVisibilityGetEndpoint(LocalProcessPath):
    # Local process path parameter combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProcessVisibilityPutEndpoint(LocalProcessPath):
    # Process version query with the visibility definition as body.
    querystring = LocalProcessQuery()
    body = VisibilitySchema()
[docs]
class JobStatusQueryProfileSchema(ExtendedSchemaNode):
    # Case-insensitive selection among the 'JobStatusSchema' values.
    title = "JobStatusQuerySchema"
    summary = "Job status schema representation."
    description = "Selects the schema employed for representation of returned job status response."
    example = JobStatusSchema.OGC
    default = JobStatusSchema.OGC
    validator = OneOfCaseInsensitive(JobStatusSchema.values())
[docs]
class GetJobQuery(ExtendedMappingSchema):
    # Optional 'schema' and 'profile' parameters, both using the same node definition.
    schema = JobStatusQueryProfileSchema(missing=drop)
    profile = JobStatusQueryProfileSchema(missing=drop)
[docs]
class GetProviderJobEndpoint(ProviderProcessPath, JobPath):
    # Provider process and job path parameters combined with the job query.
    querystring = GetJobQuery()
[docs]
class GetJobEndpoint(JobPath):
    # Job path parameter combined with the job query.
    querystring = GetJobQuery()
[docs]
class JobResultsQuery(FormatQuery):
    # Adds the 'schema' parameter selecting the representation of job outputs.
    schema = ExtendedSchemaNode(
        String(),
        title="JobOutputResultsSchema",
        example=JobInputsOutputsSchema.OGC,
        default=JobInputsOutputsSchema.OGC,
        validator=OneOfCaseInsensitive(JobInputsOutputsSchema.values()),
        summary="Selects the schema employed for representation of job outputs.",
        description=(
            "Selects the schema employed for representation of job results (produced outputs) "
            "for providing file Content-Type details. "
            f"When '{JobInputsOutputsSchema.OLD}' is employed, "
            "'format.mimeType' is used and 'type' is reported as well. "
            f"When '{JobInputsOutputsSchema.OGC}' is employed, "
            "'format.mediaType' is used and 'type' is reported as well. "
            "When the '+strict' value is added, only the 'format' or 'type' will be represented according to the "
            f"reference standard ({JobInputsOutputsSchema.OGC}, {JobInputsOutputsSchema.OLD}) representation."
        ),
    )
[docs]
class LocalProcessJobResultsQuery(LocalProcessQuery, JobResultsQuery):
    # Union of both parent query definitions; nothing else is added.
    pass
[docs]
class JobOutputsEndpoint(JobPath):
    # Job path parameter combined with the local job results query.
    querystring = LocalProcessJobResultsQuery()
[docs]
class ProcessOutputsEndpoint(LocalProcessPath, JobPath):
    # Local process and job path parameters combined with the local job results query.
    querystring = LocalProcessJobResultsQuery()
[docs]
class ProviderOutputsEndpoint(ProviderProcessPath, JobPath):
    # Provider process and job path parameters combined with the job results query.
    querystring = JobResultsQuery()
[docs]
class ProcessResultEndpoint(ProcessOutputsEndpoint):
[docs]
class ProviderResultEndpoint(ProviderOutputsEndpoint):
[docs]
class JobResultEndpoint(JobPath):
[docs]
class ProcessResultsEndpoint(LocalProcessPath, JobPath):
[docs]
class ProviderResultsEndpoint(ProviderProcessPath, JobPath):
[docs]
class JobResultsEndpoint(JobPath):
[docs]
class JobResultsTriggerExecutionEndpoint(JobResultsEndpoint):
    # Same as 'JobResultsEndpoint' with a 'NoContent' body.
    body = NoContent()
[docs]
class ProcessJobResultsTriggerExecutionEndpoint(JobResultsTriggerExecutionEndpoint, LocalProcessPath):
    # Combination of both parent definitions; nothing else is added.
    pass
[docs]
class ProviderExceptionsEndpoint(ProviderProcessPath, JobPath):
[docs]
class JobExceptionsEndpoint(JobPath):
[docs]
class ProcessExceptionsEndpoint(LocalProcessPath, JobPath):
    # Local process and job path parameters combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProviderLogsEndpoint(ProviderProcessPath, JobPath):
[docs]
class JobLogsEndpoint(JobPath):
[docs]
class ProcessLogsEndpoint(LocalProcessPath, JobPath):
    # Local process and job path parameters combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class JobStatisticsEndpoint(JobPath):
[docs]
class ProcessJobStatisticsEndpoint(LocalProcessPath, JobPath):
    # Local process and job path parameters combined with the process version query.
    querystring = LocalProcessQuery()
[docs]
class ProviderJobStatisticsEndpoint(ProviderProcessPath, JobPath):
##################################################################
# These classes define schemas for requests that feature a body
##################################################################
[docs]
class ProviderPublic(ExtendedMappingSchema):
    # Boolean flag (default: False) indicating the public listing of a provider.
    public = ExtendedSchemaNode(
        Boolean(),
        default=False,
        description=(
            "Whether the service is defined as publicly visible. "
            "This will not control allowance/denial of requests to the registered endpoint of the service. "
            "It only indicates if it should appear during listing of providers."
        ),
    )
[docs]
class CreateProviderRequestBody(ProviderPublic):
    # Body to create a provider: identifier and endpoint URL, plus the inherited 'public' flag.
    id = AnyIdentifier()
    url = URL(description="Endpoint where to query the provider.")
[docs]
class ExecuteOutputDataType(OutputIdentifierType):
    # Identical to 'OutputIdentifierType'; only the class name differs.
    pass
[docs]
class ExecuteOutputDefinition(ExtendedMappingSchema):
    # Optional transmission mode requested for an output.
    transmissionMode = TransmissionModeEnum(missing=drop)
[docs]
class ExecuteOutputItem(ExecuteOutputDataType, ExecuteOutputDefinition):
    # Combination of both parent definitions; nothing else is added.
    pass
[docs]
class ExecuteOutputSpecList(ExtendedSequenceSchema):
    """
    Filter list of outputs to be obtained from execution and their reporting method.
    """
    # each item of the sequence is an 'ExecuteOutputItem'
    output = ExecuteOutputItem()
[docs]
class ExecuteOutputMapAdditionalProperties(ExtendedMappingSchema):
    # Mapping of variable '{output-id}' keys to their output definition.
    output_id = ExecuteOutputDefinition(
        variable="{output-id}",
        title="ExecuteOutputSpecMap",
        description="Desired output reporting method.",
    )
[docs]
class ExecuteOutputSpecMap(AnyOfKeywordSchema):
    # Output mapping, which can also be an explicitly empty object.
    _any_of = [
        # normal {"<output-id>": {...}}
        ExecuteOutputMapAdditionalProperties(),
        # allows explicitly provided {}
        EmptyMappingSchema(),
    ]
[docs]
class ExecuteOutputSpec(OneOfKeywordSchema):
    """
    Filter list of outputs to be obtained from execution and define their reporting method.
    """
    _one_of = [
        # OLD format: {"outputs": [{"id": "<id>", "transmissionMode": "value|reference"}, ...]}
        ExecuteOutputSpecList(),
        # OGC-API: {"outputs": {"<id>": {"transmissionMode": "value|reference"}, ...}}
        ExecuteOutputSpecMap(),
    ]
[docs]
class ProviderNameSchema(AnyIdentifier):
    # 'AnyIdentifier' with a provider-specific description.
    description = "Identifier of the remote provider."
[docs]
class ProviderSummarySchema(DescriptionType, ProviderPublic, DescriptionMeta, DescriptionLinks):
    """
    Service provider summary definition.
    """
    id = ProviderNameSchema()
    url = URL(description="Endpoint of the service provider.")
    type = ExtendedSchemaNode(String())

    # field ordering hints taken from the provider description constants
    _sort_first = PROVIDER_DESCRIPTION_FIELD_FIRST
    _sort_after = PROVIDER_DESCRIPTION_FIELD_AFTER
[docs]
class ProviderCapabilitiesSchema(ProviderSummarySchema):
    """
    Service provider detailed capabilities.
    """
    # no additional field is defined in the visible lines of this class
[docs]
class TransmissionModeList(ExtendedSequenceSchema):
    # Sequence of transmission mode values.
    transmissionMode = TransmissionModeEnum()
[docs]
class JobControlOptionsList(ExtendedSequenceSchema):
    # Sequence of job control option values.
    jobControlOption = JobControlOptionsEnum()
[docs]
class ExceptionReportType(ExtendedMappingSchema):
    # Exception with a required 'code' and an optional 'description'.
    code = ExtendedSchemaNode(String())
    description = ExtendedSchemaNode(
        String(),
        missing=drop,
    )
[docs]
class ProcessControl(ExtendedMappingSchema):
    # Control options of a process; when omitted, each list falls back to all the known values.
    jobControlOptions = JobControlOptionsList(
        missing=ExecuteControlOption.values(),
        default=ExecuteControlOption.values(),
    )
    outputTransmission = TransmissionModeList(
        missing=ExecuteTransmissionMode.values(),
        default=ExecuteTransmissionMode.values(),
    )
[docs]
class ProcessLocations(ExtendedMappingSchema):
    """
    Additional endpoint locations specific to the process.
    """
    # every location is optional
    processDescriptionURL = URL(
        description="Process description endpoint using OGC-API interface.",
        missing=drop,
        title="processDescriptionURL",
    )
    processEndpointWPS1 = URL(
        description="Process description endpoint using WPS-1 interface.",
        missing=drop,
        title="processEndpointWPS1",
    )
    executeEndpoint = URL(
        description="Endpoint where the process can be executed from.",
        missing=drop,
        title="executeEndpoint",
    )
# 'links' already included via 'ProcessDescriptionType->DescriptionType'
[docs]
class ProcessSummary(
    ProcessDescriptionType,
    DescriptionMeta,
    ProcessControl,
    ProcessLocations,
    DescriptionLinks,
):
    """
    Summary process definition.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/processSummary.yaml"

    # field ordering hints taken from the process description constants
    _sort_first = PROCESS_DESCRIPTION_FIELD_FIRST
    _sort_after = PROCESS_DESCRIPTION_FIELD_AFTER
[docs]
class ProcessSummaryList(ExtendedSequenceSchema):
    # Sequence of process summaries.
    summary = ProcessSummary()
[docs]
class ProcessNamesList(ExtendedSequenceSchema):
    # Sequence of process identifiers, possibly tagged with a revision.
    process_name = ProcessIdentifierTag(
        description="Process identifier or tagged representation if revision was requested.",
    )
[docs]
class ProcessListing(OneOfKeywordSchema):
    # Listing of processes, either as detailed summaries or as plain names.
    _one_of = [
        ProcessSummaryList(
            description="Listing of process summary details from existing definitions.",
        ),
        # in case of empty list, both schema are valid, drop this one to resolve
        ProcessNamesList(
            description="Listing of process names when not requesting details.",
            missing=drop,
        ),
    ]
[docs]
class ProcessCollection(ExtendedMappingSchema):
    # Mapping holding the listing under the 'processes' field.
    processes = ProcessListing()
[docs]
class ProcessPagingQuery(ExtendedMappingSchema):
    # Query parameters controlling the sorting and paging of a process listing.
    sort = ProcessSortEnum(missing=drop)

    # if page is omitted but limit provided, use reasonable zero by default
    page = ExtendedSchemaNode(
        Integer(allow_string=True),
        validator=Range(min=0),
        missing=0,
        default=0,
    )
    limit = ExtendedSchemaNode(
        Integer(allow_string=True),
        validator=Range(min=1),
        missing=None,
        default=None,
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/limit.yaml",
    )
[docs]
class ProcessVisibility(ExtendedMappingSchema):
    # Optional visibility field, dropped when not provided.
    visibility = VisibilityValue(missing=drop)
[docs]
class ProcessDeploymentProfile(ExtendedMappingSchema):
    # Optional URL of the deployment profile, dropped when not provided.
    deploymentProfile = URL(missing=drop)
[docs]
class Process(
    # following are like 'ProcessSummary',
    # except without 'ProcessControl' and 'DescriptionLinks' that are outside the nested 'process'
    ProcessDescriptionType,
    DescriptionMeta,
    # following are additional fields only in description, just like for OGC-API ProcessDescription
    ProcessContext,
    ProcessVisibility,
    ProcessLocations,
):
    """
    Old nested process schema for process description.
    """
    # note: deprecated in favor of OGC-API schema
    outputs = DescribeOutputTypeList(
        description="Outputs definition of the process.",
    )

    # Field ordering hints shared with the other process description schemas.
    _sort_first = PROCESS_DESCRIPTION_FIELD_FIRST
    _sort_after = PROCESS_DESCRIPTION_FIELD_AFTER
[docs]
class ProcessDescriptionOLD(
    ProcessControl,
    ProcessDeploymentProfile,
    DescriptionLinks,
):
    """
    Old schema for process description.
    """
    # Field ordering hints dedicated to the old schema layout.
    _sort_first = PROCESS_DESCRIPTION_FIELD_FIRST_OLD_SCHEMA
    _sort_after = PROCESS_DESCRIPTION_FIELD_AFTER_OLD_SCHEMA
[docs]
class ProcessDescriptionOGC(
    ProcessSummary,
    ProcessContext,
    ProcessVisibility,
    ProcessLocations,
    ProcessDeploymentProfile,
    DescriptionLinks,
):
    """
    OGC-API schema for process description.
    """
    # technically, empty inputs are allowed for processes that should generate constant/randomized outputs
    # example:
    #   https://pavics.ouranos.ca/twitcher/ows/proxy/catalog
    #   ?service=WPS&request=DescribeProcess&version=1.0.0&identifier=pavicstestdocs
    outputs = DescribeOutputTypeMap(
        description="Outputs definition of the process.",
    )

    # Reference to the OGC API - Processes (Part 1) schema describing this structure.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/process.yaml"

    # Field ordering hints shared with the other process description schemas.
    _sort_first = PROCESS_DESCRIPTION_FIELD_FIRST
    _sort_after = PROCESS_DESCRIPTION_FIELD_AFTER
[docs]
class ProcessDescription(OneOfKeywordSchema):
    """
    Supported schema representations of a process description (based on specified query parameters).
    """
    # Candidate schemas, listed with the OGC representation first followed by the old one.
    _one_of = [ProcessDescriptionOGC, ProcessDescriptionOLD]
[docs]
class ProcessDeployment(ProcessSummary, ProcessContext, ProcessDeployMeta):
    # Process definition fields accepted when deploying a new process.

    # override ID to forbid deploy to contain a tagged version part
    # if version should be applied, it must be provided with its 'Version' field
    id = ProcessIdentifier()

    # explicit "abstract" handling for bw-compat, new versions should use "description"
    # only allowed in deploy to support older servers that report abstract (or parsed from WPS-1/2)
    # recent OGC-API v1+ will usually provide directly "description" as per the specification
    abstract = ExtendedSchemaNode(
        String(),
        description="Detailed explanation of the process being deployed. "
                    "[Deprecated] Consider using 'description' instead.",
        deprecated=True,
        missing=drop,
    )

    # allowed undefined I/O during deploy because of reference from owsContext or executionUnit
    outputs = DeployOutputTypeAny(
        title="DeploymentOutputs",
        description="Additional definitions for process outputs to extend generated details by the referred package. "
                    "These are optional as they can mostly be inferred from the 'executionUnit', but allow specific "
                    f"overrides (see '{DOC_URL}/package.html#correspondence-between-cwl-and-wps-fields')",
        missing=drop,
    )
    visibility = VisibilityValue(missing=drop)

    # Reference to the deployment extension schema describing this structure.
    _schema = f"{OGC_API_SCHEMA_EXT_DEPLOY}/processSummary.yaml"

    # Field ordering hints shared with the other process description schemas.
    _sort_first = PROCESS_DESCRIPTION_FIELD_FIRST
    _sort_after = PROCESS_DESCRIPTION_FIELD_AFTER
[docs]
class Duration(ExtendedSchemaNode):
    # Schema node for a duration reported in human-readable form.
    # note: using String instead of Time because timedelta object cannot be directly handled (missing parts at parsing)
    description = "Human-readable representation of the duration."
# FIXME: use ISO-8601 duration (?) - P[n]Y[n]M[n]DT[n]H[n]M[n]S
# https://pypi.org/project/isodate/
# https://en.wikipedia.org/wiki/ISO_8601#Durations
# See:
# 'duration.to_iso8601' already employed for quotes, should apply for jobs as well
[docs]
class DurationISO(ExtendedSchemaNode):
    """
    Duration represented using ISO-8601 format.
    .. seealso::
    - https://json-schema.org/draft/2019-09/json-schema-validation.html#rfc.section.7.3.1
    - :rfc:`3339#appendix-A`
    """
    description = "ISO-8601 representation of the duration."
    example = "P[n]Y[n]M[n]DT[n]H[n]M[n]S"

    def deserialize(self, cstruct):
        # type: (Union[datetime.timedelta, str]) -> str
        # A time delta is always converted to its ISO-8601 representation.
        if isinstance(cstruct, datetime.timedelta):
            return duration.to_iso8601(cstruct)
        # A string is converted only when it does not already start with the ISO-8601 'P' designator.
        if isinstance(cstruct, str) and not cstruct.startswith("P"):
            return duration.to_iso8601(cstruct)
        # Anything else is returned untouched.
        return cstruct
[docs]
class JobStatusInfo(ExtendedMappingSchema):
    # Status details reported for a job.
    _schema = OGC_API_SCHEMA_JOB_STATUS_URL

    # Identifiers of what the job relates to ('None' when not provided).
    processID = ProcessIdentifierTag(
        description="Process identifier corresponding to the job execution.",
        missing=None,
        default=None,
    )
    providerID = ProcessIdentifier(
        description="Provider identifier corresponding to the job execution.",
        missing=None,
        default=None,
    )
    type = JobTypeEnum(
        description="Type of the element associated to the creation of this job.",
    )
    title = JobTitle(missing=drop)
    status = JobStatusEnum(description="Last updated status.")
    message = ExtendedSchemaNode(
        String(),
        description="Information about the last status update.",
        missing=drop,
    )

    # Timestamps of the job, all optional.
    created = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp when the process execution job was created.",
        missing=drop,
        default=None,
    )
    started = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp when the process started execution if applicable.",
        missing=drop,
        default=None,
    )
    finished = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp when the process completed execution if applicable.",
        missing=drop,
        default=None,
    )
    updated = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp of the last update of the job status. This can correspond to "
                    "any of the other timestamps according to current execution status or "
                    "even slightly after job finished execution according to the duration "
                    "needed to deallocate job resources and store results.",
        missing=drop,
        default=None,
    )

    # Elapsed time since start, reported in three alternative representations.
    duration = Duration(
        description="Duration since the start of the process execution.",
        missing=drop,
    )
    runningDuration = DurationISO(
        description="Duration in ISO-8601 format since the start of the process execution.",
        missing=drop,
    )
    runningSeconds = Number(
        description="Duration in seconds since the start of the process execution.",
        missing=drop,
    )

    expirationDate = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp when the job will be canceled if not yet completed.",
        missing=drop,
    )
    estimatedCompletion = ExtendedSchemaNode(DateTime(), missing=drop)
    nextPoll = ExtendedSchemaNode(
        DateTime(),
        description="Timestamp when the job will be prompted for updated status details.",
        missing=drop,
    )

    # Completion level, both fields bounded to the [0, 100] range.
    percentCompleted = Number(
        description="Completion percentage of the job as indicated by the process.",
        validator=Range(min=0, max=100),
        example=0,
    )
    progress = ExtendedSchemaNode(
        Integer(),
        description="Completion progress of the job (alias to 'percentCompleted').",
        validator=Range(min=0, max=100),
        example=100,
    )
    links = LinkList(missing=drop)
[docs]
class JobEntrySchema(OneOfKeywordSchema):
    # A job entry is either the detailed status mapping or only the job identifier.
    # note:
    #   Since JobID is a simple string (not a dict), no additional mapping field can be added here.
    #   They will be discarded by `OneOfKeywordSchema.deserialize()`.
    _one_of = [
        JobStatusInfo,
        UUID(description="Job ID."),
    ]
[docs]
class JobCollection(ExtendedSequenceSchema):
    # Sequence where each item is a 'JobEntrySchema'.
    item = JobEntrySchema()
[docs]
class CreatedJobStatusSchema(DescriptionSchema):
    # Details returned about a job that was created for execution.
    jobID = JobID(
        description="Unique identifier of the created job for execution.",
    )
    processID = ProcessIdentifierTag(
        description="Identifier of the process that will be executed.",
    )
    providerID = AnyIdentifier(
        description="Remote provider identifier if applicable.",
        missing=drop,
    )
    status = ExtendedSchemaNode(String(), example=Status.ACCEPTED)
    location = ExtendedSchemaNode(
        String(),
        example="http://{host}/weaver/processes/{my-process-id}/jobs/{my-job-id}",
    )
[docs]
class PagingBodySchema(ExtendedMappingSchema):
    # Paging details reported in the body of a listing response.
    # don't use defaults if missing, otherwise we might report incorrect values compared to actual contents
    count = ExtendedSchemaNode(
        Integer(),
        description="Number of items returned within this paged result.",
        validator=Range(min=0),
        missing=drop,
    )
    limit = ExtendedSchemaNode(
        Integer(),
        description="Maximum number of items returned per page.",
        validator=Range(min=1, max=1000),
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/limit.yaml",
        missing=drop,
    )
    page = ExtendedSchemaNode(
        Integer(),
        description="Paging index.",
        validator=Range(min=0),
        missing=drop,
    )
    total = ExtendedSchemaNode(
        Integer(),
        description="Total number of items regardless of paging.",
        validator=Range(min=0),
        missing=drop,
    )
[docs]
class GetPagingJobsSchema(PagingBodySchema):
    # Paged job listing that refers to the OGC 'jobList' schema.
    # technically, no 'links' yet, but added after by oneOf
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/jobList.yaml"
[docs]
class JobCategoryFilters(PermissiveMappingSchema):
    # Permissive mapping of category names (variable key) to their grouping value.
    category = ExtendedSchemaNode(
        String(),
        title="CategoryFilter",
        description="Value of the corresponding parameter forming that category group.",
        variable="{category}",
        default=None,
        missing=None,
    )
[docs]
class GroupedJobsCategorySchema(ExtendedMappingSchema):
    # One group of jobs along with the category values that define the group.
    category = JobCategoryFilters(
        description="Grouping values that compose the corresponding job list category.",
    )
    jobs = JobCollection(
        description="List of jobs that matched the corresponding grouping values.",
    )
    count = ExtendedSchemaNode(
        Integer(),
        description="Number of matching jobs for the corresponding group category.",
    )
[docs]
class GroupedCategoryJobsSchema(ExtendedSequenceSchema):
    # Sequence where each item is a 'GroupedJobsCategorySchema'.
    job_group_category_item = GroupedJobsCategorySchema()
[docs]
class GetGroupedJobsSchema(ExtendedMappingSchema):
    # Mapping holding the grouped jobs under the 'groups' field.
    groups = GroupedCategoryJobsSchema()
[docs]
class GetQueriedJobsSchema(OneOfKeywordSchema):
    # Job search result, reported either as a paged listing or as grouped categories.
    _one_of = [
        GetPagingJobsSchema(
            description="Matched jobs according to filter queries.",
        ),
        GetGroupedJobsSchema(
            description="Matched jobs grouped by specified categories.",
        ),
    ]

    # Fields defined in addition to the selected representation.
    total = ExtendedSchemaNode(
        Integer(),
        description="Total number of matched jobs regardless of grouping or paging result.",
    )
    links = LinkList()  # required by OGC schema

    # Field ordering hints for the jobs listing.
    _sort_first = JOBS_LISTING_FIELD_FIRST
    _sort_after = JOBS_LISTING_FIELD_AFTER
[docs]
class DismissedJobSchema(ExtendedMappingSchema):
    # Details reported for a dismissed job.
    status = JobStatusEnum()
    message = ExtendedSchemaNode(
        String(),
        example="Job dismissed.",
    )
    percentCompleted = ExtendedSchemaNode(
        Integer(),
        example=0,
    )
# same as base Format, but for process/job responses instead of process submission
# (ie: 'Format' is for allowed/supported formats, this is the result format)
[docs]
class DataEncodingAttributes(FormatSelection):
    # Identical to 'FormatSelection'; only provides a distinct schema name, no additional field.
    pass
[docs]
class ReferenceBase(ExtendedMappingSchema):
    # Optional fields shared by the reference schemas.
    body = ExtendedSchemaNode(
        String(),
        missing=drop,
    )
    bodyReference = ReferenceURL(missing=drop)
[docs]
class Reference(ReferenceBase):
    # Reference identified by its mandatory 'href' endpoint.
    href = ReferenceURL(
        description="Endpoint of the reference.",
    )
[docs]
class ExecuteReference(ReferenceBase):
    # Same fields as a reference, but with 'href' validated by 'ExecuteReferenceURL'.
    title = "ExecuteReference"
    href = ExecuteReferenceURL(
        description="Endpoint of the reference.",
    )
[docs]
class ArrayReference(ExtendedSequenceSchema):
    # Sequence where each item is an 'ExecuteReference'.
    item = ExecuteReference()
[docs]
class ArrayReferenceValueType(ExtendedMappingSchema):
    # Mapping holding an array of references under the 'value' field.
    value = ArrayReference()
[docs]
class ExecuteNestedProcessReference(ExtendedMappingSchema):
    # Mapping identifying a nested process by its URL.
    title = "ExecuteNestedProcessReference"

    # 'process' is required for a nested definition, otherwise it will not even be detected as one!
    process = ProcessURL(
        description="Process reference to be executed.",
    )
[docs]
class ExecuteNestedProcessParameters(ExtendedMappingSchema):
    """
    Dynamically defines the nested process parameters with recursive schema handling.
    This class must create the nested properties dynamically because the required classes are not yet defined, and
    those required definitions also depend on this class to define the nested process as a possible input value.
    .. seealso::
    - https://docs.pylonsproject.org/projects/colander/en/latest/binding.html
    """
    title = "ExecuteNestedProcessParameters"
    _sort_first = ["process", "inputs", "outputs", "properties", "mode", "response"]

    @colander.deferred
    def _children(self, __bindings):
        # type: (Dict[str, Any]) -> List[colander.SchemaNode]
        # Clone each child node of the process parameters schema at call time,
        # since that schema is only defined later in the module.
        cloned_nodes = []
        for source_node in ExecuteProcessParameters().children:
            nested_node = source_node.clone()
            # avoid inserting nested default properties that were omitted (ie: mode/response)
            # they should be included explicitly only on the top-most process by 'Execute(ExecuteParameters)' schema
            nested_node.default = null
            cloned_nodes.append(nested_node)
        self.children = cloned_nodes
        return cloned_nodes

    # calling 'bind' method will initialize this
    # schema node attribute from the deferred method
    # NOTE(review): the comment above presumably refers to an assignment of the deferred '_children';
    #   no such assignment is visible in this block - confirm.
    children_bound = False  # only for optimization

    def deserialize(self, cstruct):
        """
        Defer deserialization validation to the class that contains the set of expected properties.
        Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition,
        although correspondence is not explicitly ensured.
        """
        if self.children_bound:
            # bindings were already applied on this node, reuse it directly
            bound_node = self
        else:
            # ensure bindings are applied to generate children recursive references
            bound_node = self.bind()
            # avoid doing the binding to resolve children on each recursive resolution
            bound_node.children_bound = True
        return ExtendedMappingSchema.deserialize(bound_node, cstruct)
# Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,
# both for literal values and link references, for inputs submitted as list-items.
# Also allows the explicit 'href' (+ optional format) reference for a link.
#
# Because this data-input structure applies only to list-items (see 'ExecuteInputItem' below), mapping is always needed.
# (i.e.: values cannot be submitted inline in the list, because field 'id' of each input must also be provided)
# For this reason, one of 'value', 'data', 'href' or 'reference' is mandatory.
# backward compatible definition:
#
# inputs: [
# {"id": "<id>", "value": <data>},
# {"id": "<id>", "href": <link>}
# ... (other variants) ...
# ]
#
# same as 'ExecuteInputReference', but using 'OGC' schema with 'type' field
# Defined as:
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/link.yaml
# But explicitly in the context of an execution input, rather than any other link (eg: metadata)
# same as 'ExecuteInputLink', but using 'OLD' schema with 'format' field
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/inputValueNoObject.yaml
# Any literal value directly provided inline in input mapping.
#
# {"inputs": {"<id>": <literal-data>}}
#
# Excludes objects to avoid conflict with later object mapping and {"value": <data>} definitions.
# Excludes array literals that will be defined separately with allowed array of any item within this schema.
# Note: Also supports 'binaryInputValue', since 'type: string' regardless of 'format' is valid.
[docs]
class BoundingBox2D(ExtendedSequenceSchema):
    # Sequence constrained to exactly 4 items.
    description = "Bounding Box using 2D points."
    validator = Length(min=4, max=4)
[docs]
class BoundingBox3D(ExtendedSequenceSchema):
    # Sequence constrained to exactly 6 items.
    description = "Bounding Box using 3D points."
    validator = Length(min=6, max=6)
[docs]
class BoundingBoxValue(OneOfKeywordSchema):
    # Bounding box points provided using either the 2D or the 3D representation.
    _one_of = [BoundingBox2D, BoundingBox3D]
[docs]
class BoundingBoxObject(StrictMappingSchema):
    # Strict mapping of a bounding box with its points and reference system.
    _schema = OGC_API_BBOX_SCHEMA
    description = "Execute bounding box value provided inline."

    bbox = BoundingBoxValue(description="Point values of the bounding box.")
    crs = AnyCRS(
        description="Coordinate Reference System of the Bounding Box points.",
        default="http://www.opengis.net/def/crs/OGC/1.3/CRS84",
    )
# extended 'object' part from:
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/inputValue.yaml
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/inlineOrRefData.yaml
#
# oneOf:
# - $ref: "inputValueNoObject.yaml" # in OGC-API spec, includes a generic array
# - $ref: "qualifiedInputValue.yaml"
# - $ref: "link.yaml"
#
# combine 'inlineOrRefData' and its 'array[inlineOrRefData]' variants to simplify 'ExecuteInputAny' definition
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/execute.yaml
#
# inputs:
# additionalProperties: # this is the below 'variable=<input-id>'
# oneOf:
# - $ref: "inlineOrRefData.yaml"
# - type: array
# items:
# $ref: "inlineOrRefData.yaml"
#
# depend on 'StrictMappingSchema' such that any unmapped "additionalProperties"
# caused by the nested schema to fail validation is refused in the final mapping
[docs]
class ExecuteParameters(ExecuteInputOutputs):
    """
    Basic execution parameters that can be submitted to run a process.
    These parameters can be either for a top-level process job, or any nested process call.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"

    # Deprecated execution mode, optional.
    mode = JobExecuteModeEnum(
        deprecated=True,
        default=ExecuteMode.AUTO,
        missing=drop,
    )
    response = JobResponseOptionsEnum(
        # no default to ensure 'Prefer' header vs 'response' body resolution order can be performed
        missing=drop,
    )

    # Deprecated notification field, optional.
    notification_email = Email(
        description=(
            "Optionally send a notification email when the job is completed. "
            "This is equivalent to using subscribers for both failed and successful job status emails simultaneously."
        ),
        deprecated=True,
        missing=drop,
    )
    subscribers = JobExecuteSubscribers(missing=drop)
[docs]
class ExecuteProcessParameters(ExecuteParameters):
    # Execution parameters extended with an optional title and process reference.
    title = "ExecuteProcessParameters"
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"
    _sort_first = [
        "title", "process",
        "inputs", "outputs", "properties",
        "mode", "response", "subscribers",
    ]

    # Declared as '_title' with an explicit node name, since 'title' is already the schema title above.
    _title = JobTitle(name="title", missing=drop)
    process = ProcessURL(
        description=(
            "Process reference to be executed. "
            "This parameter is required if the process cannot be inferred from the request endpoint."
        ),
        example="https://example.com/processes/example",
        missing=drop,
    )
[docs]
class ExecuteJobParameters(ExtendedMappingSchema):
    # Job-specific parameters of an execution request, all optional.
    _title = JobTitle(name="title", missing=drop)
    status = JobStatusCreate(
        missing=drop,
        description=(
            "Status to request creation of the job without submitting it to processing queue "
            "and leave it pending until triggered by another results request to start it "
            "(see *OGC API - Processes* - Part 4: Job Management)."
        ),
    )
[docs]
class Execute(AllOfKeywordSchema):
    """
    Main execution parameters that can be submitted to run a process.
    Additional parameters are only applicable to the top-most process in a nested definition.
    """
    # OGC 'execute.yaml' does not enforce any required item
    description = "Process execution parameters."
    examples = {
        "ExecuteJSON": {
            "summary": "Execute a process job using REST JSON payload with OGC API schema.",
            "value": EXAMPLES["job_execute.json"],
        },
    }
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"
    _sort_first = [
        "title", "status", "process",
        "inputs", "outputs", "properties",
        "mode", "response", "subscribers",
    ]

    # Job parameters combined with the process parameters.
    _all_of = [ExecuteJobParameters(), ExecuteProcessParameters()]
[docs]
class QuoteStatusSchema(ExtendedSchemaNode):
    # Node restricted to the values reported by 'QuoteStatus.values()'.
    validator = OneOf(QuoteStatus.values())
[docs]
class PartialQuoteSchema(ExtendedMappingSchema):
    # Minimal fields that identify a quote and report its status.
    status = QuoteStatusSchema()
    quoteID = UUID(
        description="Quote ID.",
    )
    processID = ProcessIdentifierTag(
        description="Process identifier corresponding to the quote definition.",
    )
[docs]
class DecimalRegex(Regex):
    # because the received value can be a Decimal,
    # the pattern match object cannot perform regex check directly on it
    def __call__(self, node, value):
        # Validate against the string form of the value.
        text_value = str(value)
        return super().__call__(node, text_value)
[docs]
class PriceAmount(ExtendedSchemaNode):
    # Monetary amount that must be non-negative and formatted as digits, a decimal point, then digits.
    description = "Monetary value of the price."
    validator = All(
        Range(min=0),
        # The decimal separator is escaped so that only a literal '.' is accepted.
        # An unescaped '.' matches any character, which accepted values such as '12a34' or '100'.
        # NOTE(review): assumes validated values always carry a fractional part (e.g. '100.00') - confirm.
        DecimalRegex(re.compile(r"^[0-9]+\.[0-9]+$"), msg="Number must be formatted as currency decimal."),
    )
[docs]
class PriceCurrency(ExtendedSchemaNode):
    # Currency code that must be 3 characters long and one of the codes returned by 'list_currencies()'.
    description = "Currency code in ISO-4217 format."
    default = "USD"  # most common online
    validator = All(
        Length(min=3, max=3),
        OneOf(sorted(list_currencies())),
    )
[docs]
class PriceSchema(ExtendedMappingSchema):
    # Price mapping; only the currency field is declared in this block.
    currency = PriceCurrency(
        description=(
            "Until processed by the quotation estimator for the process, corresponds to the user-requested currency. "
            "Once processed, the requested currency will be applied for the amount if exchange rates could be resolved. "
            "Otherwise, the estimator-specific or API-wide default currency value will be used for the estimated amount."
        ),
    )

    def __json__(self, value):
        """
        Handler for :mod:`pyramid` and :mod:`webob` if the object reaches the JSON serializer.
        Combined with :mod:`simplejson` to automatically handle :class:`Decimal` conversion.
        """
        # Delegate to the parent deserialization to produce the result.
        result = super().deserialize(value)
        return result
[docs]
class QuoteProcessParameters(PermissiveMappingSchema, ExecuteInputOutputs):
    # Permissive variant of the execution inputs/outputs used for quote requests.
    description = (
        "Parameters passed for traditional process execution (inputs, outputs) "
        "with added metadata for quote evaluation."
    )
[docs]
class QuoteEstimateValue(PermissiveMappingSchema):
    # Permissive mapping of one estimated value with its rate and cost.
    # Description typo fixed ("it attributed" -> "its attributed"), since the text is published in the API schema.
    description = "Details of an estimated value, with its attributed rate and resulting cost."

    # 'estimate' and 'rate' resolve to 'None' when omitted, while 'cost' resolves to zero.
    estimate = PositiveNumber(default=Decimal("0.0"), missing=None)
    rate = PositiveNumber(default=Decimal("1.0"), missing=None)
    cost = PositiveNumber(default=Decimal("0.0"), missing=Decimal("0.0"))
[docs]
class QuoteStepOutputParameters(ExtendedMappingSchema):
    # Mapping of output identifiers (variable key) to their chained input definition.
    description = "Outputs from a quote estimation to be chained as inputs for a following Workflow step."
    output_id = QuoteStepChainedInput(
        description="Mapping of output to chain as input for quote estimation.",
        variable="{output-id}",
    )
[docs]
class QuoteProcessResults(PermissiveMappingSchema):
    # Permissive mapping of the estimation results, one optional entry per category.
    _schema = f"{WEAVER_SCHEMA_URL}/quotation/quote-estimation-result.yaml"
    description = (
        "Results of the quote estimation. "
        "Will be empty until completed. "
        "Contents may vary according to the estimation methodology. "
        "Each category provides details about its contribution toward the total."
    )

    # Optional estimate categories.
    flat = QuoteEstimateValue(missing=drop)
    memory = QuoteEstimateValue(missing=drop)
    storage = QuoteEstimateValue(missing=drop)
    duration = QuoteEstimateValue(missing=drop)
    cpu = QuoteEstimateValue(missing=drop)
    gpu = QuoteEstimateValue(missing=drop)

    total = PositiveNumber(default=Decimal("0.0"))
    currency = PriceCurrency(
        description=(
            "Optional currency employed by the estimator to produce the quote. "
            "API-wide default currency employed if not specified."
        ),
        missing=drop,
    )
[docs]
class UserIdSchema(OneOfKeywordSchema):
    # User identifier accepted either as a string or as an integer.
    _one_of = [
        ExtendedSchemaNode(
            String(),
            missing=drop,
        ),
        ExtendedSchemaNode(
            Integer(),
            default=None,
        ),
    ]
[docs]
class StepQuotation(PartialQuoteSchema):
    # Detailed quote definition, extending the partial quote with price, dates and estimates.
    detail = ExtendedSchemaNode(
        String(),
        description="Detail about quote processing.",
        missing=None,
    )
    price = PriceSchema(
        description="Estimated price for process execution.",
    )
    expire = ExtendedSchemaNode(
        DateTime(),
        description="Expiration date and time of the quote in ISO-8601 format.",
    )
    created = ExtendedSchemaNode(
        DateTime(),
        description="Creation date and time of the quote in ISO-8601 format.",
    )
    userID = UserIdSchema(
        description="User ID that requested the quote.",
        missing=required,
        default=None,
    )

    # Estimated duration, reported in three alternative representations.
    estimatedTime = Duration(
        description="Estimated duration of process execution in human-readable format.",
        missing=drop,
    )
    estimatedSeconds = ExtendedSchemaNode(
        Integer(),
        description="Estimated duration of process execution in seconds.",
        missing=drop,
    )
    estimatedDuration = DurationISO(
        description="Estimated duration of process execution in ISO-8601 format.",
        missing=drop,
    )

    processParameters = QuoteProcessParameters(title="QuoteProcessParameters")
    results = QuoteProcessResults(title="QuoteProcessResults", default={})
    outputs = QuoteStepOutputParameters(missing=drop)
[docs]
class StepQuotationList(ExtendedSequenceSchema):
    # Sequence where each item is a 'StepQuotation'.
    description = "Detailed child processes and prices part of the complete quote."
    step = StepQuotation(
        description="Quote of a workflow step process.",
    )
[docs]
class Quotation(StepQuotation):
    # Complete quote: a step quotation with its payment state and optional child steps.
    # Description typo fixed ("as been paid" -> "has been paid"), since the text is published in the API schema.
    paid = ExtendedSchemaNode(Boolean(), default=False, description=(
        "Indicates if the quote has been paid by the user. "
        "This is mandatory in order to execute the job corresponding to the produced quote."
    ))
    steps = StepQuotationList(missing=drop)
[docs]
class QuoteStepReferenceList(ExtendedSequenceSchema):
    # Sequence schema; only the description is declared in this block.
    description = "Summary of child process quote references part of the complete quote."
[docs]
class QuoteBase(ExtendedMappingSchema):
    # Base mapping providing the total price of a quote.
    price = PriceSchema(
        description=(
            "Total price of the quote including all estimated costs and step processes if applicable."
        ),
    )
[docs]
class QuoteSummary(PartialQuoteSchema, QuoteBase):
    # Partial quote with its total price and references to its steps.
    steps = QuoteStepReferenceList()
[docs]
class QuoteSchema(Quotation, QuoteBase):
    # Combination of 'Quotation' and 'QuoteBase' without any additional field.
    pass
[docs]
class QuotationList(ExtendedSequenceSchema):
    # Sequence where each item is a quote identifier.
    quote = UUID(
        description="Quote ID.",
    )
[docs]
class QuotationListSchema(PagingBodySchema):
    # Paged listing of quote identifiers.
    # Field ordering hints for the quotes listing.
    _sort_first = QUOTES_LISTING_FIELD_FIRST
    _sort_after = QUOTES_LISTING_FIELD_AFTER

    quotations = QuotationList()
[docs]
class CreatedQuotedJobStatusSchema(PartialQuoteSchema, CreatedJobStatusSchema):
    # Created job status combined with the partial quote fields and the resulting bill identifier.
    billID = UUID(
        description="ID of the created bill.",
    )
[docs]
class BillSchema(ExtendedMappingSchema):
    # Details of a bill.
    billID = UUID(
        description="Bill ID.",
    )
    quoteID = UUID(
        description="Original quote ID that produced this bill.",
        missing=drop,
    )
    processID = ProcessIdentifierTag()
    title = ExtendedSchemaNode(
        String(),
        description="Name of the bill.",
    )
    description = ExtendedSchemaNode(
        String(),
        missing=drop,
    )
    price = PriceSchema(
        description="Price associated to the bill.",
    )
    created = ExtendedSchemaNode(
        DateTime(),
        description="Creation date and time of the bill in ISO-8601 format.",
    )
    userID = ExtendedSchemaNode(
        Integer(),
        description="User id that requested the quote.",
    )
[docs]
class BillList(ExtendedSequenceSchema):
    # Sequence where each item is a bill identifier.
    bill = UUID(
        description="Bill ID.",
    )
[docs]
class BillListSchema(ExtendedMappingSchema):
    # NOTE(review): no field is declared for this schema in the visible source, which leaves the class
    #   statement without a body. An explicit 'pass' keeps the definition syntactically valid.
    #   Presumably a listing field based on 'BillList' is expected here - confirm against the intended schema.
    pass
[docs]
class SupportedValues(ExtendedMappingSchema):
    # Mapping schema declared without any explicit field.
    pass
[docs]
class DefaultValues(ExtendedMappingSchema):
    # Mapping schema declared without any explicit field.
    pass
[docs]
class CWLClass(ExtendedSchemaNode):
    # Node restricted to the supported CWL class names.
    # in this case it is ok to use 'name' because target fields receiving it will
    # never be able to be named 'class' because of Python reserved keyword
    example = "CommandLineTool"
    validator = OneOf(["CommandLineTool", "ExpressionTool", "Workflow"])
    # A trailing space is added to the first fragment: the implicit concatenation
    # otherwise rendered "(AP)definitions" in the published description.
    description = (
        "CWL class specification. This is used to differentiate between single Application Package (AP) "
        "definitions and Workflow that chains multiple packages."
    )
[docs]
class CWLExpression(ExtendedSchemaNode):
    # Node representing a CWL expression; only the title and description are declared in this block.
    title = "CWLExpression"
    description = (
        f"When combined with '{CWL_REQUIREMENT_INLINE_JAVASCRIPT}', "
        "this field allows runtime parameter references "
        f"(see also: {CWL_CMD_TOOL_URL}#Expression)."
    )
[docs]
class RequirementClass(ExtendedSchemaNode):
    # Node holding the class name of a CWL requirement.
    # in this case it is ok to use 'name' because target fields receiving it will
    # never be able to be named 'class' because of Python reserved keyword
    title = "RequirementClass"
    description = "CWL requirement class specification."
[docs]
class CUDAComputeCapability(ExtendedSchemaNode):
    # Single compute capability, validated against a '<digits>.<digits>' pattern.
    title = "CUDA compute capability"
    description = "The compute capability supported by the GPU hardware."
    validator = SemanticVersion(regex=r"^\d+\.\d+$")
[docs]
class CUDAComputeCapabilityArray(ExtendedSequenceSchema):
    # Sequence of compute capabilities that must contain at least one item.
    item = CUDAComputeCapability()
    validator = Length(min=1)
[docs]
class CUDAComputeCapabilitySchema(OneOfKeywordSchema):
    # Compute capability provided either as a single value or as an array of values.
    # https://github.com/common-workflow-language/cwltool/blob/67a180/cwltool/extensions.yml#L178
    title = CUDAComputeCapability.title
    description = inspect.cleandoc(
        """
        The compute capability supported by the GPU hardware.
        * If this is a single value, it defines only the minimum
        compute capability. GPUs with higher capability are also
        accepted.
        * If it is an array value, then only select GPUs with compute
        capabilities that explicitly appear in the array.
        See https://docs.nvidia.com/deploy/cuda-compatibility/#faq and
        https://docs.nvidia.com/cuda/cuda-c-best-practices-guide/index.html#cuda-compute-capability for details.
        """
    )
    _one_of = [CUDAComputeCapability, CUDAComputeCapabilityArray]
[docs]
class CUDARequirementSpecification(PermissiveMappingSchema):
    # Permissive mapping of the CUDA requirement parameters.
    # https://github.com/common-workflow-language/cwltool/blob/67a180/cwltool/extensions.yml#L178
    cudaVersionMin = ExtendedSchemaNode(
        String(),
        title="CUDA version minimum",
        description=inspect.cleandoc(
            """
            The minimum CUDA version required to run the software. This corresponds to a CUDA SDK release.
            When run in a container, the container image should provide the CUDA runtime, and the host
            driver is injected into the container. In this case, because CUDA drivers are backwards compatible,
            it is possible to use an older SDK with a newer driver across major versions.
            See https://docs.nvidia.com/deploy/cuda-compatibility/ for details.
            """
        ),
        example="11.4",
        validator=SemanticVersion(regex=r"^\d+\.\d+$"),
    )
    cudaComputeCapability = CUDAComputeCapabilitySchema()

    # Device count bounds, each at least 1 and defaulting to 1.
    cudaDeviceCountMin = ExtendedSchemaNode(
        Integer(),
        title="CUDA device count minimum",
        description="The minimum amount of devices required.",
        example=1,
        default=1,
        validator=Range(min=1),
    )
    cudaDeviceCountMax = ExtendedSchemaNode(
        Integer(),
        title="CUDA device count maximum",
        description="The maximum amount of devices required.",
        example=8,
        default=1,
        validator=Range(min=1),
    )
[docs]
class CUDARequirementMap(ExtendedMappingSchema):
    # Mapping form of the requirement, keyed by the CUDA requirement name.
    CUDARequirement = CUDARequirementSpecification(
        title=CWL_REQUIREMENT_CUDA,
        name=CWL_REQUIREMENT_CUDA,
    )
[docs]
class CUDARequirementClass(CUDARequirementSpecification):
    # Class form of the requirement; the class value must be the CUDA requirement name.
    _class = RequirementClass(
        example=CWL_REQUIREMENT_CUDA,
        validator=OneOf([CWL_REQUIREMENT_CUDA]),
    )
[docs]
class NetworkAccessRequirementSpecification(PermissiveMappingSchema):
    # Permissive mapping of the network access requirement parameters.
    networkAccess = ExtendedSchemaNode(
        Boolean(),
        title="Network Access",
        description="Indicate whether a process requires outgoing IPv4/IPv6 network access.",
        example=True,
    )
[docs]
class NetworkAccessRequirementMap(ExtendedMappingSchema):
    # Mapping form of the requirement, keyed by the network access requirement name.
    NetworkAccessRequirement = NetworkAccessRequirementSpecification(
        title=CWL_REQUIREMENT_NETWORK_ACCESS,
        name=CWL_REQUIREMENT_NETWORK_ACCESS,
    )
[docs]
class NetworkAccessRequirementClass(NetworkAccessRequirementSpecification):
    # Class form of the requirement; the class value must be the network access requirement name.
    _class = RequirementClass(
        example=CWL_REQUIREMENT_NETWORK_ACCESS,
        validator=OneOf([CWL_REQUIREMENT_NETWORK_ACCESS]),
    )
[docs]
class ResourceRequirementValue(OneOfKeywordSchema):
    # Resource amount provided as a float above zero, an integer of at least 1, or a CWL expression.
    _one_of = [
        ExtendedSchemaNode(
            Float(),
            validator=BoundedRange(min=0.0, exclusive_min=True),
        ),
        ExtendedSchemaNode(
            Integer(),
            validator=Range(min=1),
        ),
        CWLExpression,
    ]
[docs]
class ResourceRequirementSpecification(PermissiveMappingSchema):
    """
    Fields of the CWL resource requirement describing reserved CPU cores, RAM and storage.

    Every field is optional (``missing=drop``). Only the minimum bounds advertise a ``default``.
    The maximum bounds declare none, such that a documented maximum can never be lower than
    the documented minimum of the same resource.
    """

    description = inspect.cleandoc(f"""
        Specify basic hardware resource requirements
        (see also: {CWL_CMD_TOOL_URL}#{CWL_REQUIREMENT_RESOURCE}).
    """)
    coresMin = ResourceRequirementValue(
        missing=drop,
        default=1,
        title="ResourceCoresMinimum",
        summary="Minimum reserved number of CPU cores.",
        description=inspect.cleandoc("""
            Minimum reserved number of CPU cores.
            May be a fractional value to indicate to a scheduling algorithm that one core can be allocated
            to multiple jobs. For example, a value of 0.25 indicates that up to 4 jobs may run in parallel
            on 1 core. A value of 1.25 means that up to 3 jobs can run on a 4 core system (4/1.25 ≈ 3).
            Processes can only share a core allocation if the sum of each of their 'ramMax', 'tmpdirMax',
            and 'outdirMax' requests also do not exceed the capacity of the node.
            Processes sharing a core must have the same level of isolation (typically a container or VM)
            that they would normally have.
            The reported number of CPU cores reserved for the process, which is available to expressions on the
            'CommandLineTool' as 'runtime.cores', must be a non-zero integer, and may be calculated by rounding up
            the cores request to the next whole number.
            Scheduling systems may allocate fractional CPU resources by setting quotas or scheduling weights.
            Scheduling systems that do not support fractional CPUs may round up the request to the next whole number.
        """),
    )
    coresMax = ResourceRequirementValue(
        missing=drop,
        title="ResourceCoresMaximum",
        summary="Maximum reserved number of CPU cores.",
        description=inspect.cleandoc("""
            Maximum reserved number of CPU cores.
            See 'coresMin' for discussion about fractional CPU requests.
        """),
    )
    ramMin = ResourceRequirementValue(
        missing=drop,
        default=256,
        title="ResourceRAMMinimum",
        summary="Minimum reserved RAM in mebibytes.",
        description=inspect.cleandoc("""
            Minimum reserved RAM in mebibytes (2**20).
            May be a fractional value. If so, the actual RAM request must be rounded up to the next whole number.
            The reported amount of RAM reserved for the process, which is available to expressions on the
            'CommandLineTool' as 'runtime.ram', must be a non-zero integer.
        """),
    )
    ramMax = ResourceRequirementValue(
        missing=drop,
        title="ResourceRAMMaximum",
        summary="Maximum reserved RAM in mebibytes.",
        description=inspect.cleandoc("""
            Maximum reserved RAM in mebibytes (2**20).
            See 'ramMin' for discussion about fractional RAM requests.
        """),
    )
    tmpdirMin = ResourceRequirementValue(
        missing=drop,
        default=1024,
        title="ResourceTmpDirMinimum",
        summary="Minimum reserved filesystem based storage for the designated temporary directory in mebibytes.",
        description=inspect.cleandoc("""
            Minimum reserved filesystem based storage for the designated temporary directory in mebibytes (2**20).
            May be a fractional value. If so, the actual storage request must be rounded up to the next whole number.
            The reported amount of storage reserved for the process, which is available to expressions on the
            'CommandLineTool' as 'runtime.tmpdirSize', must be a non-zero integer.
        """),
    )
    tmpdirMax = ResourceRequirementValue(
        missing=drop,
        title="ResourceTmpDirMaximum",
        summary="Maximum reserved filesystem based storage for the designated temporary directory in mebibytes.",
        description=inspect.cleandoc("""
            Maximum reserved filesystem based storage for the designated temporary directory in mebibytes (2**20).
            See 'tmpdirMin' for discussion about fractional storage requests.
        """),
    )
    outdirMin = ResourceRequirementValue(
        missing=drop,
        default=1024,
        title="ResourceOutDirMinimum",
        summary="Minimum reserved filesystem based storage for the designated output directory in mebibytes.",
        description=inspect.cleandoc("""
            Minimum reserved filesystem based storage for the designated output directory in mebibytes (2**20).
            May be a fractional value. If so, the actual storage request must be rounded up to the next whole number.
            The reported amount of storage reserved for the process, which is available to expressions on the
            'CommandLineTool' as runtime.outdirSize, must be a non-zero integer.
        """),
    )
    # No 'default' here, consistently with 'coresMax', 'ramMax' and 'tmpdirMax':
    # a default maximum of 1 MiB would contradict the 1024 MiB default of 'outdirMin'.
    outdirMax = ResourceRequirementValue(
        missing=drop,
        title="ResourceOutDirMaximum",
        summary="Maximum reserved filesystem based storage for the designated output directory in mebibytes.",
        description=inspect.cleandoc("""
            Maximum reserved filesystem based storage for the designated output directory in mebibytes (2**20).
            See 'outdirMin' for discussion about fractional storage requests.
        """),
    )
[docs]
class ResourceRequirementMap(ExtendedMappingSchema):
    """Mapping form of the resource requirement, keyed by the requirement name."""

    ResourceRequirement = ResourceRequirementSpecification(
        title=CWL_REQUIREMENT_RESOURCE,
        name=CWL_REQUIREMENT_RESOURCE,
    )
[docs]
class ResourceRequirementClass(ResourceRequirementSpecification):
    """Item form of the resource requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_RESOURCE]),
        example=CWL_REQUIREMENT_RESOURCE,
    )
[docs]
class DockerRequirementSpecification(PermissiveMappingSchema):
    """Fields of the Docker requirement, namely the image reference to pull."""

    dockerPull = ExtendedSchemaNode(
        String(),
        title="Docker pull reference",
        description="Reference package that will be retrieved and executed by CWL.",
        example="docker-registry.host.com/namespace/image:1.2.3",
    )
[docs]
class DockerRequirementMap(ExtendedMappingSchema):
    """Mapping form of the Docker requirement, keyed by the requirement name."""

    DockerRequirement = DockerRequirementSpecification(
        title=CWL_REQUIREMENT_APP_DOCKER,
        name=CWL_REQUIREMENT_APP_DOCKER,
    )
[docs]
class DockerRequirementClass(DockerRequirementSpecification):
    """Item form of the Docker requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_APP_DOCKER]),
        example=CWL_REQUIREMENT_APP_DOCKER,
    )
[docs]
class DockerGpuRequirementSpecification(DockerRequirementSpecification):
    """Docker requirement variant for GPU-enabled execution; same fields, dedicated description."""

    description = inspect.cleandoc(f"""
        Docker requirement with GPU-enabled support (https://github.com/NVIDIA/nvidia-docker).
        The instance must have the NVIDIA toolkit installed to use this feature.
        WARNING:
        This requirement is specific to Weaver and is preserved only for backward compatibility.
        Prefer the combined use of official '{CWL_REQUIREMENT_APP_DOCKER}' and '{CWL_REQUIREMENT_CUDA}'
        for better support of GPU capabilities and portability to other CWL-supported platforms.
    """)
[docs]
class DockerGpuRequirementMap(ExtendedMappingSchema):
    """Mapping form of the GPU-enabled Docker requirement, keyed by the requirement name."""

    req = DockerGpuRequirementSpecification(
        name=CWL_REQUIREMENT_APP_DOCKER_GPU,
    )
[docs]
class DockerGpuRequirementClass(DockerGpuRequirementSpecification):
    """Item form of the GPU-enabled Docker requirement, with a class field restricted to its name."""

    title = CWL_REQUIREMENT_APP_DOCKER_GPU
    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_APP_DOCKER_GPU]),
        example=CWL_REQUIREMENT_APP_DOCKER_GPU,
    )
[docs]
class DirectoryListingItem(PermissiveMappingSchema):
    """Single listing entry; all of 'entry', 'entryname' and 'writable' are optional."""

    entry = ExtendedSchemaNode(String(), missing=drop)
    entryname = ExtendedSchemaNode(String(), missing=drop)
    writable = ExtendedSchemaNode(Boolean(), missing=drop)
[docs]
class InitialWorkDirListing(ExtendedSequenceSchema):
    """Sequence of directory listing items."""

    item = DirectoryListingItem()
[docs]
class InitialWorkDirRequirementSpecification(PermissiveMappingSchema):
    """Fields of the initial working directory requirement, holding its 'listing'."""

    listing = InitialWorkDirListing()
[docs]
class InitialWorkDirRequirementMap(ExtendedMappingSchema):
    """Mapping form of the initial working directory requirement, keyed by the requirement name."""

    req = InitialWorkDirRequirementSpecification(
        name=CWL_REQUIREMENT_INIT_WORKDIR,
    )
[docs]
class InitialWorkDirRequirementClass(InitialWorkDirRequirementSpecification):
    """Item form of the initial working directory requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_INIT_WORKDIR]),
        example=CWL_REQUIREMENT_INIT_WORKDIR,
    )
[docs]
class InlineJavascriptLibraries(ExtendedSequenceSchema):
    """Sequence of code fragment strings made available to CWL expressions."""

    description = inspect.cleandoc("""
        Additional code fragments that will also be inserted before executing the expression code.
        Allows for function definitions that may be called from CWL expressions.
    """)
    exp_lib = ExtendedSchemaNode(String(), missing=drop)
[docs]
class InlineJavascriptRequirementSpecification(PermissiveMappingSchema):
    """Fields of the inline Javascript requirement, with an optional expression library."""

    description = inspect.cleandoc(f"""
        Indicates that the workflow platform must support inline Javascript expressions.
        If this requirement is not present, the workflow platform must not perform expression interpolation
        (see also: {CWL_CMD_TOOL_URL}#{CWL_REQUIREMENT_INLINE_JAVASCRIPT}).
    """)
    expressionLib = InlineJavascriptLibraries(missing=drop)
[docs]
class InlineJavascriptRequirementMap(ExtendedMappingSchema):
    """Mapping form of the inline Javascript requirement, keyed by the requirement name."""

    req = InlineJavascriptRequirementSpecification(
        name=CWL_REQUIREMENT_INLINE_JAVASCRIPT,
    )
[docs]
class InlineJavascriptRequirementClass(InlineJavascriptRequirementSpecification):
    """Item form of the inline Javascript requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_INLINE_JAVASCRIPT]),
        example=CWL_REQUIREMENT_INLINE_JAVASCRIPT,
    )
[docs]
class InplaceUpdateRequirementSpecification(PermissiveMappingSchema):
    """Fields of the in-place update requirement, holding the required 'inplaceUpdate' boolean."""

    description = inspect.cleandoc(f"""
        If 'inplaceUpdate' is true, then an implementation supporting this feature may permit tools to directly
        update files with 'writable: true' in '{CWL_REQUIREMENT_INIT_WORKDIR}'. That is, as an optimization,
        files may be destructively modified in place as opposed to copied and updated
        (see also: {CWL_CMD_TOOL_URL}#{CWL_REQUIREMENT_INPLACE_UPDATE}).
    """)
    inplaceUpdate = ExtendedSchemaNode(Boolean())
[docs]
class InplaceUpdateRequirementMap(ExtendedMappingSchema):
    """Mapping form of the in-place update requirement, keyed by the requirement name."""

    req = InplaceUpdateRequirementSpecification(
        name=CWL_REQUIREMENT_INPLACE_UPDATE,
    )
[docs]
class InplaceUpdateRequirementClass(InplaceUpdateRequirementSpecification):
    """Item form of the in-place update requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_INPLACE_UPDATE]),
        example=CWL_REQUIREMENT_INPLACE_UPDATE,
    )
[docs]
class LoadContents(ExtendedSchemaNode):
    """Boolean node for the 'loadContents' flag of a CWL 'File' reference."""

    schema_type = Boolean
    title = "LoadContents"
    description = "Indicates if a 'File' type reference should be loaded under the 'contents' property."
[docs]
class LoadListingEnum(ExtendedSchemaNode):
    """Node restricted to the allowed directory listing modes."""

    title = "LoadListingEnum"
    validator = OneOf([
        "no_listing",
        "shallow_listing",
        "deep_listing",
    ])
[docs]
class LoadListingRequirementSpecification(PermissiveMappingSchema):
    """Fields of the load listing requirement, holding the required 'loadListing' mode."""

    description = (
        "Specify the desired behavior for loading the listing field of a 'Directory' object for use by expressions "
        f"(see also: {CWL_CMD_TOOL_URL}#{CWL_REQUIREMENT_LOAD_LISTING})."
    )
    loadListing = LoadListingEnum()
[docs]
class LoadListingRequirementMap(ExtendedMappingSchema):
    """Mapping form of the load listing requirement, keyed by the requirement name."""

    req = LoadListingRequirementSpecification(
        name=CWL_REQUIREMENT_LOAD_LISTING,
    )
[docs]
class LoadListingRequirementClass(LoadListingRequirementSpecification):
    """Item form of the load listing requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_LOAD_LISTING]),
        example=CWL_REQUIREMENT_LOAD_LISTING,
    )
[docs]
class IdentifierArray(ExtendedSequenceSchema):
[docs]
class ScatterFeatureReference(PermissiveMappingSchema):
    """Optional 'scatter' and 'scatterMethod' fields describing how inputs are scattered into jobs."""

    scatter = ScatterIdentifiersSchema(missing=drop)
    scatterMethod = ExtendedSchemaNode(
        String(),
        missing=drop,
        validator=OneOf(["dotproduct", "nested_crossproduct", "flat_crossproduct"]),
        description=inspect.cleandoc("""
            If 'scatter' declares more than one input parameter, 'scatterMethod' describes how to decompose the
            input into a discrete set of jobs.
            - dotproduct: specifies that each of the input arrays are aligned and one element taken from each array
            to construct each job. It is an error if all input arrays are not the same length.
            - nested_crossproduct: specifies the Cartesian product of the inputs, producing a job for every
            combination of the scattered inputs. The output must be nested arrays for each level of scattering,
            in the order that the input arrays are listed in the 'scatter' field.
            - flat_crossproduct: specifies the Cartesian product of the inputs, producing a job for every combination
            of the scattered inputs. The output arrays must be flattened to a single level, but otherwise listed in
            the order that the input arrays are listed in the 'scatter' field.
        """),
    )
[docs]
class ScatterFeatureRequirementSpecification(StrictMappingSchema):
    """Strict mapping for the scatter feature requirement; it declares a description and no field."""

    description = inspect.cleandoc(f"""
        A 'scatter' operation specifies that the associated Workflow step should execute separately over a list of
        input elements. Each job making up a scatter operation is independent and may be executed concurrently
        (see also: {CWL_WORKFLOW_URL}#WorkflowStep).
    """)
[docs]
class ScatterFeatureRequirementMap(ExtendedMappingSchema):
    """Mapping form of the scatter feature requirement, keyed by the requirement name."""

    req = ScatterFeatureRequirementSpecification(
        name=CWL_REQUIREMENT_SCATTER,
    )
[docs]
class ScatterFeatureRequirementClass(ScatterFeatureRequirementSpecification):
    """Item form of the scatter feature requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_SCATTER]),
        example=CWL_REQUIREMENT_SCATTER,
    )
[docs]
class SecretsRequirementSpecification(StrictMappingSchema):
    """Strict mapping for the secrets requirement, holding a non-empty list of input identifiers."""

    description = "Lists input parameters containing sensitive information to be masked."
    secrets = IdentifierArray(
        validator=Length(min=1),
        title="Secrets",
        description="Input parameter identifiers to consider as secrets.",
    )
[docs]
class SecretsRequirementMap(ExtendedMappingSchema):
    """Mapping form of the secrets requirement, keyed by the requirement name."""

    req = SecretsRequirementSpecification(
        name=CWL_REQUIREMENT_SECRETS,
    )
[docs]
class SecretsRequirementClass(SecretsRequirementSpecification):
    """Item form of the secrets requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_SECRETS]),
        example=CWL_REQUIREMENT_SECRETS,
    )
[docs]
class SubworkflowRequirementSpecification(StrictMappingSchema):
    """Strict mapping for the sub-workflow requirement; it declares a description and no field."""

    description = "Indicates that a Workflow employs another Workflow as one of its steps."
[docs]
class SubworkflowRequirementMap(ExtendedMappingSchema):
    """Mapping form of the sub-workflow requirement, keyed by the requirement name."""

    req = SubworkflowRequirementSpecification(
        name=CWL_REQUIREMENT_SUBWORKFLOW,
    )
[docs]
class SubworkflowRequirementClass(SubworkflowRequirementSpecification):
    """Item form of the sub-workflow requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_SUBWORKFLOW]),
        example=CWL_REQUIREMENT_SUBWORKFLOW,
    )
[docs]
class TimeLimitValue(OneOfKeywordSchema):
    """Time limit given as a non-negative float, a non-negative integer, or a CWL expression."""

    _one_of = [
        ExtendedSchemaNode(
            Float(),
            validator=Range(min=0.0),
        ),
        ExtendedSchemaNode(
            Integer(),
            validator=Range(min=0),
        ),
        CWLExpression,
    ]
[docs]
class EnableReuseValue(OneOfKeywordSchema):
    """Reuse flag given as a boolean (string form disallowed) or a CWL expression."""

    _one_of = [
        ExtendedSchemaNode(
            Boolean(allow_string=False),
        ),
        CWLExpression,
    ]
[docs]
class WorkReuseRequirementSpecification(PermissiveMappingSchema):
    """Fields of the work reuse requirement, holding the required 'enableReuse' value."""

    description = inspect.cleandoc(f"""
        For implementations that support reusing output from past work
        (on the assumption that same code and same input produce same results),
        control whether to enable or disable the reuse behavior for a particular tool or step
        (to accommodate situations where that assumption is incorrect).
        A reused step is not executed but instead returns the same output as the original execution.
        If '{CWL_REQUIREMENT_WORK_REUSE}' is not specified, correct tools should assume it is enabled by default.
    """)
    enableReuse = EnableReuseValue(
        description=inspect.cleandoc(f"""
            Indicates if reuse is enabled for this tool.
            Can be an expression when combined with '{CWL_REQUIREMENT_INLINE_JAVASCRIPT}'
            (see also: {CWL_CMD_TOOL_URL}#Expression).
        """),
    )
[docs]
class WorkReuseRequirementMap(ExtendedMappingSchema):
    """Mapping form of the work reuse requirement, keyed by the requirement name."""

    req = WorkReuseRequirementSpecification(
        name=CWL_REQUIREMENT_WORK_REUSE,
    )
[docs]
class WorkReuseRequirementClass(WorkReuseRequirementSpecification):
    """Item form of the work reuse requirement, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_WORK_REUSE]),
        example=CWL_REQUIREMENT_WORK_REUSE,
    )
[docs]
class BuiltinRequirementSpecification(PermissiveMappingSchema):
    """Fields of the hint marking a package as a builtin process, referenced by its identifier."""

    title = CWL_REQUIREMENT_APP_BUILTIN
    description = (
        "Hint indicating that the Application Package corresponds to a builtin process of "
        "this instance. (note: can only be an 'hint' as it is unofficial CWL specification)."
    )
    process = AnyIdentifier(
        description="Builtin process identifier.",
    )
[docs]
class BuiltinRequirementMap(ExtendedMappingSchema):
    """Mapping form of the builtin hint, keyed by the requirement name."""

    req = BuiltinRequirementSpecification(
        name=CWL_REQUIREMENT_APP_BUILTIN,
    )
[docs]
class BuiltinRequirementClass(BuiltinRequirementSpecification):
    """Item form of the builtin hint, with a class field restricted to its name."""

    _class = RequirementClass(
        validator=OneOf([CWL_REQUIREMENT_APP_BUILTIN]),
        example=CWL_REQUIREMENT_APP_BUILTIN,
    )
[docs]
class ESGF_CWT_RequirementSpecification(PermissiveMappingSchema):  # noqa: N802
    """Fields of the hint referring to a remote ESGF-CWT process by identifier and provider endpoint."""

    title = CWL_REQUIREMENT_APP_ESGF_CWT
    # Each fragment ends with a space: adjacent literals are joined as-is,
    # which previously produced "processthat" in the published description.
    description = (
        "Hint indicating that the Application Package corresponds to an ESGF-CWT provider process "
        "that should be remotely executed and monitored by this instance. "
        "(note: can only be an 'hint' as it is unofficial CWL specification)."
    )
    process = AnyIdentifier(description="Process identifier of the remote ESGF-CWT provider.")
    provider = URL(description="ESGF-CWT provider endpoint.")
[docs]
class ESGF_CWT_RequirementMap(ExtendedMappingSchema):  # noqa: N802
    """Mapping form of the ESGF-CWT hint, keyed by the plain requirement name."""

    req = ESGF_CWT_RequirementSpecification(
        name=CWL_REQUIREMENT_APP_ESGF_CWT,
    )
[docs]
class WeaverESGF_CWT_RequirementMap(ExtendedMappingSchema):
    """Mapping form of the ESGF-CWT requirement, keyed by the Weaver-namespaced requirement name."""

    req = ESGF_CWT_RequirementSpecification(
        name=f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_ESGF_CWT}",
    )
[docs]
class ESGF_CWT_RequirementClass(ESGF_CWT_RequirementSpecification):  # noqa: N802
    """Item form of the ESGF-CWT requirement; the class field accepts the plain or Weaver-namespaced name."""

    _class = RequirementClass(
        validator=OneOf([
            CWL_REQUIREMENT_APP_ESGF_CWT,
            f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_ESGF_CWT}",
        ]),
        example=CWL_REQUIREMENT_APP_ESGF_CWT,
    )
[docs]
class OGCAPIRequirementSpecification(PermissiveMappingSchema):
    """Fields of the hint referring to a remote OGC API - Processes process by its URL."""

    title = CWL_REQUIREMENT_APP_OGC_API
    # Each fragment ends with a space: adjacent literals are joined as-is,
    # which previously produced "providerthat" in the published description.
    description = (
        "Hint indicating that the Application Package corresponds to an OGC API - Processes provider "
        "that should be remotely executed and monitored by this instance. "
        "(note: can only be an 'hint' as it is unofficial CWL specification)."
    )
    process = ReferenceURL(description="Process URL of the remote OGC API Process.")
[docs]
class OGCAPIRequirementMap(ExtendedMappingSchema):
    """Mapping form of the OGC API hint, keyed by the plain requirement name."""

    req = OGCAPIRequirementSpecification(
        name=CWL_REQUIREMENT_APP_OGC_API,
    )
[docs]
class WeaverOGCAPIRequirementMap(ExtendedMappingSchema):
    """Mapping form of the OGC API requirement, keyed by the Weaver-namespaced requirement name."""

    req = OGCAPIRequirementSpecification(
        name=f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_OGC_API}",
    )
[docs]
class OGCAPIRequirementClass(OGCAPIRequirementSpecification):
    """Item form of the OGC API requirement; the class field accepts the plain or Weaver-namespaced name."""

    _class = RequirementClass(
        validator=OneOf([
            CWL_REQUIREMENT_APP_OGC_API,
            f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_OGC_API}",
        ]),
        example=CWL_REQUIREMENT_APP_OGC_API,
    )
[docs]
class WPS1RequirementSpecification(PermissiveMappingSchema):
    """Fields of the hint referring to a remote WPS-1 process by identifier and provider endpoint."""

    title = CWL_REQUIREMENT_APP_WPS1
    # Each fragment ends with a space: adjacent literals are joined as-is,
    # which previously produced "processthat" in the published description.
    description = (
        "Hint indicating that the Application Package corresponds to a WPS-1 provider process "
        "that should be remotely executed and monitored by this instance. "
        "(note: can only be an 'hint' as it is unofficial CWL specification)."
    )
    process = AnyIdentifier(description="Process identifier of the remote WPS provider.")
    provider = URL(description="WPS provider endpoint.")
[docs]
class WPS1RequirementMap(ExtendedMappingSchema):
    """Mapping form of the WPS-1 hint, keyed by the plain requirement name."""

    req = WPS1RequirementSpecification(
        name=CWL_REQUIREMENT_APP_WPS1,
    )
[docs]
class WeaverWPS1RequirementMap(ExtendedMappingSchema):
    """Mapping form of the WPS-1 requirement, keyed by the Weaver-namespaced requirement name."""

    req = WPS1RequirementSpecification(
        name=f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_WPS1}",
    )
[docs]
class WPS1RequirementClass(WPS1RequirementSpecification):
    """Item form of the WPS-1 requirement; the class field accepts the plain or Weaver-namespaced name."""

    _class = RequirementClass(
        validator=OneOf([
            CWL_REQUIREMENT_APP_WPS1,
            f"{CWL_NAMESPACE_WEAVER_ID}:{CWL_REQUIREMENT_APP_WPS1}",
        ]),
        example=CWL_REQUIREMENT_APP_WPS1,
    )
[docs]
class UnknownRequirementMap(PermissiveMappingSchema):
    """Permissive mapping used as catch-all for requirements/hints without an explicit schema."""

    description = "Generic schema to allow alternative CWL requirements/hints not explicitly defined in schemas."
[docs]
class UnknownRequirementClass(PermissiveMappingSchema):
    """Permissive item form with a class field that has an example and no validator of its own here."""

    _class = RequirementClass(
        example="UnknownRequirement",
    )
[docs]
class CWLRequirementsMapDefinitions(AnyOfKeywordSchema):
    """Any-of combination of the mapping-form schemas allowed under CWL 'requirements'."""

    _any_of = [
        DockerRequirementMap(missing=drop),
        DockerGpuRequirementMap(missing=drop),
        InitialWorkDirRequirementMap(missing=drop),
        InlineJavascriptRequirementMap(missing=drop),
        InplaceUpdateRequirementMap(missing=drop),
        LoadListingRequirementMap(missing=drop),
        MultipleInputRequirementMap(missing=drop),
        NetworkAccessRequirementMap(missing=drop),
        ResourceRequirementMap(missing=drop),
        ScatterFeatureRequirementMap(missing=drop),
        StepInputExpressionRequirementMap(missing=drop),
        SubworkflowRequirementMap(missing=drop),
        ToolTimeLimitRequirementMap(missing=drop),
        WorkReuseRequirementMap(missing=drop),
        # Weaver-namespaced definitions.
        # The 'builtin' definition is deliberately left out: it is only an internal 'hint',
        # is not 'required' for CWL execution, and must not be used explicitly from deployment.
        WeaverESGF_CWT_RequirementMap(missing=drop),
        WeaverOGCAPIRequirementMap(missing=drop),
        WeaverWPS1RequirementMap(missing=drop),
    ]
[docs]
class CWLRequirementsMapSupported(StrictMappingSchema):
    """Strict mapping whose only permitted keys are the supported CWL requirement names."""

    description = "Schema that ensures only supported CWL requirements are permitted."

    def __init__(self, *_, **__):
        # type: (*Any, **Any) -> None
        """
        Initialize the mapping to allow only supported CWL requirements.

        Because :class:`StrictMappingSchema` is used, the initialized sub-nodes are equivalent to
        the following :term:`JSON` schema definition, with a key for every known requirement name.

        .. code-block:: json

            {
              "<supported-requirement>": {},
              "additionalProperties": false
            }

        The actual validation of nested fields under each requirement will be
        handled by their respective schema in :class:`CWLRequirementsMapDefinitions`.
        """
        super().__init__(*_, **__)
        # one permissive and optional child per supported requirement name
        supported_nodes = [
            PermissiveMappingSchema(name=requirement_name, missing=drop)
            for requirement_name in CWL_REQUIREMENTS_SUPPORTED
        ]
        self.children = supported_nodes
[docs]
class CWLRequirementsMap(AllOfKeywordSchema):
    """Mapping-form requirements that must satisfy both the definitions and the supported-names schemas."""

    _all_of = [
        CWLRequirementsMapDefinitions(),
        CWLRequirementsMapSupported(),
    ]
[docs]
class CWLRequirementsItem(OneOfKeywordSchema):
    """One-of combination of the item-form schemas allowed in a CWL 'requirements' list."""

    # If definitions conflict, the 'class' field discriminates which one is expected.
    # This **requires** every mapping listed in '_one_of' to provide a 'class' definition
    # that can perform the discrimination using a validator.
    discriminator = "class"
    _one_of = [
        DockerRequirementClass(missing=drop),
        DockerGpuRequirementClass(missing=drop),
        InitialWorkDirRequirementClass(missing=drop),
        InlineJavascriptRequirementClass(missing=drop),
        InplaceUpdateRequirementClass(missing=drop),
        LoadListingRequirementClass(missing=drop),
        MultipleInputRequirementClass(missing=drop),
        NetworkAccessRequirementClass(missing=drop),
        ResourceRequirementClass(missing=drop),
        ScatterFeatureRequirementClass(missing=drop),
        StepInputExpressionRequirementClass(missing=drop),
        SubworkflowRequirementClass(missing=drop),
        ToolTimeLimitRequirementClass(missing=drop),
        WorkReuseRequirementClass(missing=drop),
        # Weaver-namespaced definitions.
        # The 'builtin' definition is deliberately left out: it is only an internal 'hint',
        # is not 'required' for CWL execution, and must not be used explicitly from deployment.
        # NOTE(review): the three entries below are '...Map' schemas, whose only declared child is named
        #   after the requirement rather than 'class', unlike every other entry of this list.
        #   Confirm whether 'class'-based variants were intended for the discriminator to match them.
        WeaverESGF_CWT_RequirementMap(missing=drop),
        WeaverOGCAPIRequirementMap(missing=drop),
        WeaverWPS1RequirementMap(missing=drop),
    ]
[docs]
class CWLRequirementsList(ExtendedSequenceSchema):
    """List form of CWL requirements, as a sequence of item-form requirements."""

    requirement = CWLRequirementsItem()
[docs]
class CWLRequirements(OneOfKeywordSchema):
    """CWL requirements given either as a mapping or as a list."""

    _one_of = [
        CWLRequirementsMap(),
        CWLRequirementsList(),
    ]
[docs]
class CWLHintsMap(AnyOfKeywordSchema, PermissiveMappingSchema):
    """Any-of combination of the mapping-form schemas allowed under CWL 'hints'."""

    _any_of = [
        BuiltinRequirementMap(missing=drop),
        CUDARequirementMap(missing=drop),
        DockerRequirementMap(missing=drop),
        DockerGpuRequirementMap(missing=drop),
        InitialWorkDirRequirementMap(missing=drop),
        InlineJavascriptRequirementMap(missing=drop),
        InplaceUpdateRequirementMap(missing=drop),
        LoadListingRequirementMap(missing=drop),
        NetworkAccessRequirementMap(missing=drop),
        ResourceRequirementMap(missing=drop),
        ScatterFeatureRequirementMap(missing=drop),
        SecretsRequirementMap(missing=drop),
        StepInputExpressionRequirementMap(missing=drop),
        ToolTimeLimitRequirementMap(missing=drop),
        WorkReuseRequirementMap(missing=drop),
        ESGF_CWT_RequirementMap(missing=drop),
        OGCAPIRequirementMap(missing=drop),
        WPS1RequirementMap(missing=drop),
        # catch-all that allows anything, therefore it must remain the last entry
        UnknownRequirementMap(missing=drop),
    ]
[docs]
class CWLHintsItem(AnyOfKeywordSchema, PermissiveMappingSchema):
    """Any-of combination of the item-form schemas allowed in a CWL 'hints' list."""

    _any_of = [
        BuiltinRequirementClass(missing=drop),
        CUDARequirementClass(missing=drop),
        DockerRequirementClass(missing=drop),
        DockerGpuRequirementClass(missing=drop),
        InitialWorkDirRequirementClass(missing=drop),
        InlineJavascriptRequirementClass(missing=drop),
        InplaceUpdateRequirementClass(missing=drop),
        LoadListingRequirementClass(missing=drop),
        NetworkAccessRequirementClass(missing=drop),
        ResourceRequirementClass(missing=drop),
        ScatterFeatureRequirementClass(missing=drop),
        SecretsRequirementClass(missing=drop),
        StepInputExpressionRequirementClass(missing=drop),
        ToolTimeLimitRequirementClass(missing=drop),
        WorkReuseRequirementClass(missing=drop),
        ESGF_CWT_RequirementClass(missing=drop),
        OGCAPIRequirementClass(missing=drop),
        WPS1RequirementClass(missing=drop),
        # catch-all that allows anything, therefore it must remain the last entry
        UnknownRequirementClass(missing=drop),
    ]
[docs]
class CWLHintsList(ExtendedSequenceSchema):
[docs]
class CWLHints(OneOfKeywordSchema):
    """CWL hints given either as a mapping or as a list."""

    _one_of = [
        CWLHintsMap(),
        CWLHintsList(),
    ]
[docs]
class CWLArguments(ExtendedSequenceSchema):
    """Sequence of string arguments."""

    argument = ExtendedSchemaNode(String())
[docs]
class CWLTypeString(ExtendedSchemaNode):
    """CWL type given by name, restricted to the possible package type values."""

    description = "Field type definition."
    validator = OneOf(
        PACKAGE_TYPE_POSSIBLE_VALUES,
    )
# NOTE: CWL Enum does not support non-string values
# - https://github.com/common-workflow-language/cwl-v1.2/issues/267
# - https://github.com/common-workflow-language/common-workflow-language/issues/764
# - https://github.com/common-workflow-language/common-workflow-language/issues/907
# class CWLTypeSymbolValues(OneOfKeywordSchema):
# _one_of = [
# ExtendedSchemaNode(Float()),
# ExtendedSchemaNode(Integer()),
# ExtendedSchemaNode(String()),
# ]
[docs]
class CWLTypeSymbolValues(ExtendedSchemaNode):
[docs]
class CWLTypeSymbols(ExtendedSequenceSchema):
    """Sequence of symbol values."""

    symbol = CWLTypeSymbolValues()
[docs]
class CWLTypeEnum(ExtendedMappingSchema):
    """CWL enum type definition, made of its 'type' marker and the allowed 'symbols'."""

    type = ExtendedSchemaNode(
        String(),
        validator=OneOf(PACKAGE_CUSTOM_TYPES),
        example=PACKAGE_ENUM_BASE,
    )
    symbols = CWLTypeSymbols(
        summary="Allowed values composing the enum.",
    )
[docs]
class CWLTypeArrayItemObject(ExtendedMappingSchema):
    """Array item type as a nested object, limited to array item types that are not custom types."""

    type = CWLTypeString(
        validator=OneOf(PACKAGE_ARRAY_ITEMS - PACKAGE_CUSTOM_TYPES),
    )
[docs]
class CWLTypeArrayItems(OneOfKeywordSchema):
    """Array 'items' definition, as a type name, an enum definition, or a nested type object."""

    _one_of = [
        CWLTypeString(
            title="CWLTypeArrayItemsString",
            validator=OneOf(PACKAGE_ARRAY_ITEMS),
        ),
        CWLTypeEnum(
            summary="CWL type as enum of values.",
        ),
        CWLTypeArrayItemObject(
            summary="CWL type in nested object definition.",
        ),
    ]
[docs]
class CWLTypeArray(ExtendedMappingSchema):
    """CWL array type definition, made of its 'type' marker and the 'items' type."""

    type = ExtendedSchemaNode(
        String(),
        validator=OneOf([PACKAGE_ARRAY_BASE]),
        example=PACKAGE_ARRAY_BASE,
    )
    items = CWLTypeArrayItems()
[docs]
class CWLTypeBase(OneOfKeywordSchema):
    """Single CWL type, as a literal name, an array definition, or an enum definition."""

    _one_of = [
        CWLTypeString(
            summary="CWL type as literal value.",
        ),
        CWLTypeArray(
            summary="CWL type as list of items.",
        ),
        CWLTypeEnum(
            summary="CWL type as enum of values.",
        ),
    ]
[docs]
class CWLTypeList(ExtendedSequenceSchema):
[docs]
class CWLType(OneOfKeywordSchema):
    """CWL type given either as a single definition or as a list of allowed types."""

    _one_of = [
        CWLTypeBase(
            summary="CWL type definition.",
        ),
        CWLTypeList(
            summary="Combination of allowed CWL types.",
        ),
    ]
[docs]
class AnyLiteralList(ExtendedSequenceSchema):
    """Sequence of literal values."""

    default = AnyLiteralType()
[docs]
class CWLDefault(OneOfKeywordSchema):
    """Default value given either as a single literal or as a list of literals."""

    _one_of = [
        AnyLiteralType(),
        AnyLiteralList(),
    ]
[docs]
class CWLTypeStringList(ExtendedSequenceSchema):
    """Sequence schema described as a list of CWL types specified directly as strings."""

    description = "List of allowed direct CWL type specifications as strings."
[docs]
class OutputBinding(PermissiveMappingSchema):
    """Output binding definition, with an optional 'glob' pattern."""

    description = "Defines how to retrieve the output result from the command."
    glob = ExtendedSchemaNode(
        String(),
        description="Glob pattern to find the output on disk or mounted docker volume.",
        missing=drop,
    )
[docs]
class CWLOutputObject(PermissiveMappingSchema):
    """Object form of a CWL output, with optional 'outputBinding' and 'loadContents'."""

    # 'outputBinding' is expected most of the time (if not always) in order to retrieve a file.
    # It can nonetheless be omitted in very specific use-cases, such as when
    # the output is a literal or when the output consists of the std logs.
    outputBinding = OutputBinding(missing=drop)
    loadContents = LoadContents(missing=drop)
[docs]
class CWLOutputType(OneOfKeywordSchema):
    """CWL output given as a type string, a list of type strings, or an object with parameters."""

    _one_of = [
        CWLTypeString(
            summary="Direct CWL type string specification.",
        ),
        CWLTypeStringList(
            summary="List of allowed CWL type strings.",
        ),
        CWLOutputObject(
            summary="CWL type definition with parameters.",
        ),
    ]
[docs]
class CWLOutputMap(ExtendedMappingSchema):
    """Mapping form of CWL outputs, where each key is a variable output identifier."""

    output_id = CWLOutputType(
        variable="{output-id}",
        title="CWLOutputDefinition",
        description=(
            IO_INFO_IDS.format(first="CWL", second="WPS", what="output") +
            " (Note: '{output-id}' is a variable corresponding for each identifier)"
        ),
    )
[docs]
class CWLOutputItem(CWLOutputObject):
    """Item form of a CWL output: the output object extended with its 'id'."""

    id = AnyIdentifier(
        description=IO_INFO_IDS.format(first="CWL", second="WPS", what="output"),
    )
[docs]
class CWLOutputList(ExtendedSequenceSchema):
[docs]
class CWLOutputsDefinition(OneOfKeywordSchema):
    """Package outputs given either as a list of items or as a mapping."""

    _one_of = [
        CWLOutputList(
            description="Package outputs defined as items.",
        ),
        CWLOutputMap(
            description="Package outputs defined as mapping.",
        ),
    ]
[docs]
class CWLCommandParts(ExtendedSequenceSchema):
    """Command given as a sequence of string parts."""

    cmd = ExtendedSchemaNode(String())
[docs]
class CWLCommand(OneOfKeywordSchema):
    """Command given either as a single string or as a sequence of parts."""

    _one_of = [
        ExtendedSchemaNode(
            String(),
            title="String command.",
        ),
        CWLCommandParts(
            title="CommandParts",
            summary="Command Parts",
        ),
    ]
[docs]
class CWLVersion(Version):
    """Version node for the CWL version, validated with 'v_prefix' enabled and 'rc_suffix' disabled."""

    description = "CWL version of the described application package."
    validator = SemanticVersion(
        v_prefix=True,
        rc_suffix=False,
    )
[docs]
class CWLIdentifier(ProcessIdentifier):
    """Process identifier node with a description specific to its use within CWL."""

    description = (
        "Reference to the process identifier. If CWL is provided within a process deployment payload, this can be "
        "omitted. If used in a deployment with only CWL details, this information is required."
    )
[docs]
class CWLIntentURL(URL):
    """URL node identifying the concept of the operation accomplished by a process."""

    description = (
        "Identifier URL to a concept for the type of computational operation accomplished by this Process "
        "(see example operations: http://edamontology.org/operation_0004)."
    )
[docs]
class CWLIntent(ExtendedSequenceSchema):
[docs]
class CWLBase(ExtendedMappingSchema):
    """Base CWL mapping holding the required 'cwlVersion' field."""

    cwlVersion = CWLVersion()
[docs]
class CWLNamespaces(StrictMappingSchema):
    """
    Mapping of :term:`CWL` namespace definitions for shorthand notation.

    .. note::
        Use a combination of `strict` mapping and ``variable`` (see ``var`` field) such that any additional namespace
        other than the ones explicitly listed are allowed, but if provided, they must succeed URI validation minimally.
        If no additional namespace is provided, including none at all, the mapping definition remains valid because
        of ``missing=drop`` under ``var``. If a URI is invalid for additional namespaces, the failing validation causes
        the property to be unmapped to the variable, which leads to an ``"unknown"`` property raised by the `strict`
        mapping. For explicit URI definitions, the specific URI combinations provided must be matched exactly to
        succeed. This ensures that no invalid mapping gets applied for commonly-known URI namespaces.
    """

    title = "CWL Namespaces Mapping"
    description = "Mapping of CWL namespace definitions for shorthand notation."

    # any additional namespace, mapped by variable name
    var = URI(variable="{namespace}", missing=drop)

    # explicit namespaces: when provided, each accepts its reference URL with or without trailing '#'
    cwl = URI(
        name=CWL_NAMESPACE_CWL_SPEC_ID,
        validator=OneOf([CWL_NAMESPACE_CWL_SPEC_URL, CWL_NAMESPACE_CWL_SPEC_URL.rstrip("#")]),
        missing=drop,
    )
    edam = URI(
        name=EDAM_NAMESPACE,
        validator=OneOf([EDAM_NAMESPACE_URL, EDAM_NAMESPACE_URL.rstrip("#")]),
        missing=drop,
    )
    iana = URI(
        name=IANA_NAMESPACE,
        validator=OneOf([IANA_NAMESPACE_URL, IANA_NAMESPACE_URL.rstrip("#")]),
        missing=drop,
    )
    ogc = URI(
        name=OGC_NAMESPACE,
        validator=OneOf([OGC_NAMESPACE_URL, OGC_NAMESPACE_URL.rstrip("#")]),
        missing=drop,
    )
    # this one accepts only the exact URL
    ogc_api_proc_part1 = URI(
        name=CWL_NAMESPACE_OGC_API_PROC_PART1_ID,
        validator=OneOf([CWL_NAMESPACE_OGC_API_PROC_PART1_URL]),
        missing=drop,
    )
    opengis = URI(
        name=OPENGIS_NAMESPACE,
        validator=OneOf([OPENGIS_NAMESPACE_URL, OPENGIS_NAMESPACE_URL.rstrip("#")]),
        missing=drop,
    )
    s = URI(
        name=CWL_NAMESPACE_SCHEMA_ID,
        validator=OneOf([CWL_NAMESPACE_SCHEMA_URL, CWL_NAMESPACE_SCHEMA_URL.rstrip("#")]),
        missing=drop,
    )
    weaver = URI(
        name=CWL_NAMESPACE_WEAVER_ID,
        validator=OneOf([CWL_NAMESPACE_WEAVER_URL, CWL_NAMESPACE_WEAVER_URL.rstrip("#")]),
        missing=drop,
    )
[docs]
class CWLSchemas(ExtendedSequenceSchema):
    """
    Sequence of schema reference URLs attached to a CWL definition.
    """
    url = URL(
        title="CWLSchemaURL",
        description="Schema reference for the CWL definition.",
    )
[docs]
class CWLPerson(PermissiveMappingSchema):
    """
    Author entry of an Application Package, identified by its metadata ``class``.

    Identifier, name and email are all optional and are each mapped to their
    own namespaced metadata property.
    """
    _sort_first = [
        "class",
        CWL_NAMESPACE_SCHEMA_METADATA_IDENTIFIER,
        CWL_NAMESPACE_SCHEMA_METADATA_EMAIL,
        CWL_NAMESPACE_SCHEMA_METADATA_NAME,
    ]
    _class = ExtendedSchemaNode(
        String(),
        name="class",
        validator=OneOf([CWL_NAMESPACE_SCHEMA_METADATA_PERSON]),
        description="Author of the Application Package.",
    )
    _id = URI(
        name=CWL_NAMESPACE_SCHEMA_METADATA_IDENTIFIER,
        description="Reference identifier of the person. Typically, an ORCID URI.",
        missing=drop,
    )
    name = ExtendedSchemaNode(
        String(),
        name=CWL_NAMESPACE_SCHEMA_METADATA_NAME,
        description="Name of the person.",
        missing=drop,
    )
    # must map to the email property, otherwise it collides with the 'name' node above
    email = ExtendedSchemaNode(
        String(),
        name=CWL_NAMESPACE_SCHEMA_METADATA_EMAIL,
        description="Email of the person.",
        missing=drop,
    )
[docs]
class CWLAuthors(ExtendedSequenceSchema):
    """
    Non-empty list of authors of the Application Package.
    """
    # a sequence schema needs exactly one child node to describe its items
    item = CWLPerson()
    validator = Length(min=1)
[docs]
class CWLDateCreated(OneOfKeywordSchema):
    """
    Creation date metadata of the CWL definition, expressed as a date-time.
    """
    _one_of = [
        ExtendedSchemaNode(
            DateTime(),
            name=CWL_NAMESPACE_SCHEMA_METADATA_DATE_CREATED,
            format="date-time",
            description="Date-time of creation in ISO-8601 format.",
        ),
    ]
[docs]
class CWLScatterMulti(ExtendedSequenceSchema):
    """
    List of input identifiers to scatter over.
    """
    # a sequence schema needs exactly one child node to describe its items
    id = CWLIdentifier()
[docs]
class CWLScatter(OneOfKeywordSchema):
    """
    Scatter specification given either as a single identifier or a list of them.
    """
    _one_of = [
        CWLIdentifier(),
        CWLScatterMulti(),
    ]
[docs]
class CWLScatterMethod(ExtendedSchemaNode):
    """
    Enumeration of the methods that can be used to scatter inputs into jobs.
    """
    # the node is instantiated without an explicit type, so it must declare one itself
    schema_type = String
    description = (
        "Describes how to decompose the scattered input into a discrete set of jobs. "
        "When 'dotproduct', specifies that each of the input arrays are aligned and one element taken from each array "
        "to construct each job. It is an error if all input arrays are not of the same length. "
        "When 'nested_crossproduct', specifies the Cartesian product of the inputs, producing a job for every "
        "combination of the scattered inputs. The output must be nested arrays for each level of scattering, "
        "in the order that the input arrays are listed in the scatter field. "
        "When 'flat_crossproduct', specifies the Cartesian product of the inputs, producing a job for every "
        "combination of the scattered inputs. The output arrays must be flattened to a single level, but otherwise "
        "listed in the order that the input arrays are listed in the scatter field."
    )
    validator = OneOf(["dotproduct", "nested_crossproduct", "flat_crossproduct"])
[docs]
class CWLScatterDefinition(PermissiveMappingSchema):
    """
    Optional scatter properties (``scatter`` and ``scatterMethod``) of a CWL definition.
    """
    scatter = CWLScatter(missing=drop, description=(
        "One or more input identifier of an application step within a Workflow where an array-based input to that "
        "Workflow should be scattered across multiple instances of the step application."
    ))
    scatterMethod = CWLScatterMethod(missing=drop)
[docs]
class CWLWorkflowStepRunDefinition(AnyOfKeywordSchema):
    """
    Accepted representations of the ``run`` reference of a workflow step.
    """
    _any_of = [
        AnyIdentifier(
            title="CWLWorkflowSteprun_id",
            description="Reference to local process ID, with or without '.cwl' extension.",
        ),
        CWLFileName(),
        ProcessURL(),
        # do not perform deeper validation other than checking there is a CWL 'class'
        # cwltool should perform any relevant cross-reference check of other files when resolving the workflow
        # Weaver runtime requirements (application package type) should be checked when reaching that step
        CWLTool(),
    ]
[docs]
class LinkMergeMethod(ExtendedSchemaNode):
    """
    Enumeration of the supported link merge methods.
    """
    # allowed values are strings, and the node declares no other type
    schema_type = String
    title = "LinkMergeMethod"
    validator = OneOf(["merge_nested", "merge_flattened"])
[docs]
class PickValueMethod(ExtendedSchemaNode):
    """
    Enumeration of the supported pick value methods.
    """
    # allowed values are strings, and the node declares no other type
    schema_type = String
    title = "PickValueMethod"
    validator = OneOf(["first_non_null", "the_only_non_null", "all_non_null"])
[docs]
class CWLWorkflowStepOutputID(ExtendedSchemaNode):
    """
    Identifier of a tool output exposed by a workflow step.
    """
    # the node is instantiated without an explicit type, so it must declare one itself
    schema_type = String
    description = "Identifier of the tool's output to use as the step output."
[docs]
class CWLWorkflowStepOutputObject(PermissiveMappingSchema):
    """
    Workflow step output given as an object holding the output ``id``.
    """
    id = CWLWorkflowStepOutputID()
[docs]
class CWLWorkflowStepOutputItem(OneOfKeywordSchema):
    """
    Workflow step output given either as a plain identifier or as an object.
    """
    _one_of = [
        CWLWorkflowStepOutputID(),
        CWLWorkflowStepOutputObject(),
    ]
[docs]
class CWLWorkflowStepOutputList(ExtendedSequenceSchema):
    """
    Non-empty list of workflow step outputs.
    """
    out = CWLWorkflowStepOutputItem()
    validator = Length(min=1)
[docs]
class CWLWorkflowStepOutputDefinition(OneOfKeywordSchema):
    """
    Output definition of a workflow step, accepted only in its list form.
    """
    description = "Defines the parameters representing the output of the process."
    _one_of = [
        CWLWorkflowStepOutputList(),  # only list allowed
    ]
[docs]
class CWLWorkflowStepObject(CWLScatterDefinition, PermissiveMappingSchema):
    """
    Workflow step definition whose identifier is optional.

    Inputs (``in``), outputs (``out``) and the ``run`` reference are required,
    while ``requirements`` and ``hints`` can be omitted.
    """
    id = AnyIdentifier(
        description="Identifier of the step.",
        missing=drop,
    )
    _in = CWLWorkflowStepInputDefinition(name="in")
    out = CWLWorkflowStepOutputDefinition()
    run = CWLWorkflowStepRunDefinition()
    requirements = CWLRequirements(missing=drop)
    hints = CWLHints(missing=drop)
[docs]
class CWLWorkflowStepItem(CWLWorkflowStepObject):
    """
    Workflow step definition for list representation, where the identifier is required.
    """
    id = AnyIdentifier(
        description="Identifier of the step.",
    )
[docs]
class CWLWorkflowStepsMap(ExtendedMappingSchema):
    """
    Workflow steps represented as a mapping of step identifier to step definition.
    """
    step_id = CWLWorkflowStepObject(
        variable="{step-id}",
        title="CWLWorkflowStep",
    )
[docs]
class CWLWorkflowStepsList(ExtendedSequenceSchema):
    """
    Workflow steps represented as a list of step definitions.
    """
    step = CWLWorkflowStepItem()
[docs]
class CWLWorkflowStepsDefinition(OneOfKeywordSchema):
    """
    Workflow steps given either as a mapping or as a list.
    """
    _one_of = [
        CWLWorkflowStepsMap(),
        CWLWorkflowStepsList(),
    ]
[docs]
class CWLApp(CWLTool, CWLScatterDefinition, PermissiveMappingSchema):
    """
    Application definition of a CWL package, where only ``outputs`` is required among the fields defined here.
    """
    id = CWLIdentifier(missing=drop)  # can be omitted only if within a process deployment that also includes it
    intent = CWLIntent(missing=drop)
    requirements = CWLRequirements(
        description="Explicit requirement to execute the application package.",
        missing=drop,
    )
    hints = CWLHints(
        description="Non-failing additional hints that can help resolve extra requirements.",
        missing=drop,
    )
    baseCommand = CWLCommand(
        description="Command called in the docker image or on shell according to requirements "
                    "and hints specifications. Can be omitted if already defined in the "
                    "docker image.",
        missing=drop,
    )
    arguments = CWLArguments(
        description="Base arguments passed to the command.",
        missing=drop,
    )
    outputs = CWLOutputsDefinition(
        description="All outputs produced by the Application Package.",
    )
    steps = CWLWorkflowStepsDefinition(missing=drop)
[docs]
class CWL(CWLBase, CWLMetadata, CWLApp):
    """
    Complete CWL definition combining the base, metadata and application schemas.
    """
    _sort_first = ["$schema"] + CWLMetadata._sort_first
    _sort_after = [
        "requirements",
        "hints",
        "inputs",
        "outputs",
        "steps",
        "$graph",
        "$namespaces",
        "$schemas",
    ]
    _schema = CWL_SCHEMA_URL
[docs]
class ExecutionUnitCWL(CWL):
    """
    CWL definition employed directly as an execution unit.
    """
    description = f"Execution unit definition as CWL package specification. {CWL_DOC_MESSAGE}"
# use 'strict' base schema such that it can contain only 'unit', nothing else permitted to reduce oneOf conflicts
[docs]
class ExecutionUnitNested(StrictMappingSchema):
    """
    Execution unit nested under a ``unit`` property, with an optional content ``type``.
    """
    unit = ExecutionUnitCWL()
    type = MediaType(
        description="IANA identifier of content-type represented by the unit definition.",
        missing=drop,
        validator=OneOf(ContentType.ANY_CWL),
    )
[docs]
class ProviderSummaryList(ExtendedSequenceSchema):
    """
    List of provider summaries.
    """
    provider_service = ProviderSummarySchema()
[docs]
class ProviderNamesList(ExtendedSequenceSchema):
    """
    List of provider names.
    """
    provider_name = ProviderNameSchema()
[docs]
class ProviderListing(OneOfKeywordSchema):
    """
    Providers listed either by summary details or only by name.
    """
    _one_of = [
        ProviderSummaryList(
            description="Listing of provider summary details retrieved from remote service.",
        ),
        ProviderNamesList(
            description="Listing of provider names, possibly unvalidated from remote service.",
            missing=drop,  # in case of empty list, both schema are valid, drop this one to resolve
        ),
    ]
[docs]
class ProvidersBodySchema(ExtendedMappingSchema):
    """
    Body listing the providers along with the ``checked`` validation indicator.
    """
    checked = ExtendedSchemaNode(
        Boolean(),
        description="Indicates if the listed providers have been validated and are accessible from registered URL. "
                    "In such case, provider metadata was partially retrieved from remote services and is accessible. "
                    "Otherwise, only local metadata is provided and service availability is not guaranteed."
    )
    providers = ProviderListing(
        description="Providers listing according to specified query parameters.",
    )
[docs]
class ProviderProcessesSchema(ExtendedSequenceSchema):
    """
    List of process summaries offered by a provider.
    """
    provider_process = ProcessSummary()
[docs]
class JobOutputReference(ExtendedMappingSchema):
    """
    Job output represented by a file reference with an optional media-type.
    """
    href = ReferenceURL(description="Output file reference.")
    # either with 'type', 'format.mediaType' or 'format.mimeType' according requested 'schema=OGC/OLD'
    # if 'schema=strict' as well, either 'type' or 'format' could be dropped altogether
    type = MediaType(
        missing=drop,
        description="IANA Content-Type of the file reference.",
    )
[docs]
class JobOutputArrayReference(ExtendedSequenceSchema):
    """
    List of job output file references.
    """
    item = JobOutputReference()
[docs]
class JobOutputQualifiedValueLiteral(Format):
    """
    Literal job output embedded under ``value`` along with its format fields.
    """
    value = AnyLiteralType()
[docs]
class JobOutputQualifiedDataLiteral(Format):
    """
    Literal job output embedded under ``data`` along with its format fields.
    """
    data = AnyLiteralType()
[docs]
class JobOutputLiteral(OneOfKeywordSchema):
    """
    Literal job output embedded either under ``data`` or ``value``.
    """
    _one_of = [
        # AnyLiteralType(),  # NOTE: purposely omit value inline, always embed in 'value' or 'data' for job outputs
        JobOutputQualifiedDataLiteral(),
        JobOutputQualifiedValueLiteral(),
    ]
[docs]
class JobOutputArrayLiteral(ExtendedSequenceSchema):
    """
    List of literal job outputs.
    """
    item = JobOutputLiteral()
[docs]
class JobOutputValueObject(OneOfKeywordSchema):
    """
    Job output value as a reference, a literal, or an array of either.
    """
    _one_of = [
        JobOutputReference(),
        JobOutputLiteral(),
        # array possible since nested object under 'id'
        JobOutputArrayReference(),
        JobOutputArrayLiteral(),
    ]
[docs]
class JobOutputMap(ExtendedMappingSchema):
    """
    Job outputs represented as a mapping of output identifier to output value.
    """
    output_id = JobOutputValueObject(
        variable="{output-id}",
        title="JobOutputData",
        description=(
            "Output data as literal value or file reference. "
            "(Note: '{output-id}' is a variable corresponding for each identifier)"
        ),
    )
[docs]
class JobOutputFields(OneOfKeywordSchema):
    """
    Value fields of a job output item: a reference or a qualified literal.
    """
    _one_of = [
        JobOutputReference(),
        JobOutputQualifiedDataLiteral(),
        JobOutputQualifiedValueLiteral(),
    ]
[docs]
class JobOutputItem(AllOfKeywordSchema):
    """
    Job output item combining the output identifier with its value fields.
    """
    _all_of = [
        OutputIdentifierType(),
        JobOutputFields(),  # cannot be an array directly, since 'id' field needed in this representation
    ]
[docs]
class JobOutputList(ExtendedSequenceSchema):
    """
    Job outputs represented as a list of output items.
    """
    title = "JobOutputList"
    output = JobOutputItem(
        description="Job output result with specific keyword according to represented format.",
    )
[docs]
class JobOutputs(OneOfKeywordSchema):
    """
    Job outputs given either as a mapping or as a list.
    """
    description = "Job outputs with many alternate representations according to the specified 'schema' query parameter."
    _one_of = [
        JobOutputMap(),
        JobOutputList(),
    ]
# implement only literal parts from following schemas:
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/inputValueNoObject.yaml
# https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/inlineOrRefData.yaml
# Other parts are implemented separately with:
# - 'ValueFormatted' (qualifiedInputValue)
# - 'ResultBoundingBox' (bbox)
# - 'ResultReference' (link)
[docs]
class ResultLiteral(AnyLiteralType):
    """
    Result given as a literal value, without any additional definition.
    """
[docs]
class ResultLiteralList(ExtendedSequenceSchema):
    """
    List of literal result values.
    """
    result = ResultLiteral()
[docs]
class ResultBoundingBox(BoundingBoxObject):
    """
    Result given as a bounding box, with ``_schema_include_deserialize`` disabled.
    """
    _schema_include_deserialize = False
[docs]
class ResultReference(ExtendedMappingSchema):
    """
    Result given as a file reference with its media-type.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/link.yaml"
    _schema_include_deserialize = False
    href = ReferenceURL(
        description="Result file reference.",
    )
    type = MediaType(
        description="IANA Content-Type of the file reference.",
    )
[docs]
class ResultReferenceList(ExtendedSequenceSchema):
    """
    List of result file references.
    """
    result = ResultReference()
[docs]
class ResultData(OneOfKeywordSchema):
    """
    Result value in any of its supported representations.

    The order of the alternatives matters, as explained by the note in the list.
    """
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/inlineOrRefData.yaml"
    _one_of = [
        # must place formatted value first since both value/format fields are simultaneously required
        # other classes require only one of the two, and therefore are more permissive during schema validation
        ValueFormatted(description="Result formatted content value."),
        ValueFormattedList(description="Result formatted content of multiple values."),
        ResultBoundingBox(description="Result bounding box value."),
        ResultReference(description="Result reference location."),
        ResultReferenceList(description="Result locations for multiple references."),
        ResultLiteral(description="Result literal value."),
        ResultLiteralList(description="Result list of literal values."),
    ]
[docs]
class ResultsDocument(ExtendedMappingSchema):
    """
    Results represented as a mapping of output identifier to result data.
    """
    description = "Results representation as JSON document."
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/results.yaml"
    output_id = ResultData(
        variable="{output-id}",
        title="ResultData",
        description=(
            "Resulting value of the output that conforms to 'OGC API - Processes' standard. "
            "(Note: '{output-id}' is a variable corresponding for each output identifier of the process)"
        ),
    )
[docs]
class ResultsContent(ExtendedSchemaNode):
    """
    Results represented directly as string contents.
    """
    description = "Results representation as literal contents."
    schema_type = String()
[docs]
class ResultsBody(OneOfKeywordSchema):
    """
    Results body given either as a JSON document or as literal contents.
    """
    description = "Results obtained from a successful process job execution."
    _one_of = [
        ResultsDocument(),
        ResultsContent(),
    ]
[docs]
class JobInputsBody(ExecuteInputOutputs):
    """
    Body reporting the inputs and execution parameters of a job.
    """
    # note:
    #   following definitions do not employ 'missing=drop' to explicitly indicate the fields
    #   this makes it easier to consider everything that could be implied when executing the job
    mode = JobExecuteModeEnum(default=ExecuteMode.AUTO)
    response = JobResponseOptionsEnum(default=None)
    headers = JobExecuteHeaders(missing={}, default={})
    links = LinkList(missing=drop)
[docs]
class JobOutputsBody(ExtendedMappingSchema):
    """
    Body reporting the outputs of a job, with optional links.
    """
    outputs = JobOutputs()
    links = LinkList(missing=drop)
[docs]
class JobExceptionPlain(ExtendedSchemaNode):
    """
    Job exception reported as a plain error message.
    """
    # the node is instantiated without an explicit type, so it must declare one itself
    schema_type = String
    description = "Generic exception description corresponding to any error message."
[docs]
class JobExceptionDetailed(ExtendedMappingSchema):
    """
    Job exception reported with ``Code``, ``Locator`` and ``Text`` fields.
    """
    description = "Fields correspond exactly to 'owslib.wps.WPSException' represented as dictionary."
    Code = ExtendedSchemaNode(String())
    Locator = ExtendedSchemaNode(
        String(),
        default=None,
    )
    Text = ExtendedSchemaNode(String())
[docs]
class JobException(OneOfKeywordSchema):
    """
    Job exception in either its detailed or plain representation.
    """
    _one_of = [
        JobExceptionDetailed(),
        JobExceptionPlain(),
    ]
[docs]
class JobExceptionsSchema(ExtendedSequenceSchema):
    """
    List of job exceptions.
    """
    exceptions = JobException()
[docs]
class JobLogsSchema(ExtendedSequenceSchema):
    """
    List of job log entries, each represented as a string.
    """
    log = ExtendedSchemaNode(String())
[docs]
class ApplicationStatisticsSchema(ExtendedMappingSchema):
    """
    Memory statistics of the application, as display string and as byte count.
    """
    mem = ExtendedSchemaNode(
        String(),
        name="usedMemory",
        example="10 MiB",
    )
    mem_bytes = ExtendedSchemaNode(
        Integer(),
        name="usedMemoryBytes",
        example=10485760,
    )
[docs]
class ProcessStatisticsSchema(ExtendedMappingSchema):
    """
    Resource usage statistics of the process execution.

    Size-like values are reported twice: as display string and as byte count.
    """
    uss = ExtendedSchemaNode(
        String(),
        name="uss",
        example="80 MiB",
    )
    uss_bytes = ExtendedSchemaNode(
        Integer(),
        name="ussBytes",
        example=83886080,
    )
    vms = ExtendedSchemaNode(
        String(),
        name="vms",
        example="1.4 GiB",
    )
    vms_bytes = ExtendedSchemaNode(
        Integer(),
        name="vmsBytes",
        example=1503238554,
    )
    used_threads = ExtendedSchemaNode(
        Integer(),
        name="usedThreads",
        example=10,
    )
    used_cpu = ExtendedSchemaNode(
        Integer(),
        name="usedCPU",
        example=2,
    )
    used_handles = ExtendedSchemaNode(
        Integer(),
        name="usedHandles",
        example=0,
    )
    mem = ExtendedSchemaNode(
        String(),
        name="usedMemory",
        example="10 MiB",
        description="RSS memory employed by the job execution omitting worker memory.",
    )
    mem_bytes = ExtendedSchemaNode(
        Integer(),
        name="usedMemoryBytes",
        example=10485760,
        description="RSS memory employed by the job execution omitting worker memory.",
    )
    total_size = ExtendedSchemaNode(
        String(),
        name="totalSize",
        example="10 MiB",
        description="Total size to store job output files.",
    )
    total_size_bytes = ExtendedSchemaNode(
        Integer(),
        name="totalSizeBytes",
        example=10485760,
        description="Total size to store job output files.",
    )
[docs]
class OutputStatisticsSchema(ExtendedMappingSchema):
    """
    Size statistics of one output, as display string and as byte count.
    """
    size = ExtendedSchemaNode(
        String(),
        name="size",
        example="5 MiB",
    )
    size_bytes = ExtendedSchemaNode(
        Integer(),
        name="sizeBytes",
        example=5242880,
    )
[docs]
class OutputStatisticsMap(ExtendedMappingSchema):
    """
    Mapping of output identifier to the size statistics of that output.
    """
    output = OutputStatisticsSchema(
        variable="{output-id}",
        description="Space used by this output file.",
    )
[docs]
class JobStatisticsSchema(ExtendedMappingSchema):
    """
    Job statistics grouped by application, process and outputs, each section being optional.
    """
    application = ApplicationStatisticsSchema(missing=drop)
    process = ProcessStatisticsSchema(missing=drop)
    outputs = OutputStatisticsMap(missing=drop)
[docs]
class FrontpageParameterSchema(ExtendedMappingSchema):
    """
    Description of one functionality reported by the frontpage.

    The ``name`` and ``enabled`` fields are required, while the ``url``,
    ``doc`` and ``api`` references are optional.
    """
    name = ExtendedSchemaNode(String(), example="api")
    enabled = ExtendedSchemaNode(Boolean(), example=True)
    url = URL(
        description="Referenced parameter endpoint. Root URL when the functionality implies multiple endpoints.",
        example="https://weaver-host",
        missing=drop,
    )
    doc = ExtendedSchemaNode(
        String(),
        description="Endpoint where additional documentation can be found about the functionality.",
        example="https://weaver-host/api",
        missing=drop,
    )
    # 'URL' is used without a positional type everywhere else, do not pass one here either
    api = URL(
        description="OpenAPI documentation endpoint about the functionality.",
        example="https://weaver-host/api",
        missing=drop,
    )
[docs]
class FrontpageParameters(ExtendedSequenceSchema):
    """
    List of frontpage parameter descriptions.
    """
    parameter = FrontpageParameterSchema()
[docs]
class FrontpageSchema(LandingPage, DescriptionSchema):
    """
    Frontpage contents extending the landing page with instance details and parameters.
    """
    _schema = f"{OGC_API_COMMON_PART1_SCHEMAS}/landingPage.json"
    _sort_first = [
        "title",
        "configuration",
        "message",
        "description",
        "attribution",
    ]
    _sort_after = ["parameters", "links"]
    title = ExtendedSchemaNode(
        String(),
        default="Weaver",
        example="Weaver",
    )
    message = ExtendedSchemaNode(
        String(),
        default="Weaver Information",
        example="Weaver Information",
    )
    configuration = ExtendedSchemaNode(
        String(),
        default="default",
        example="default",
    )
    attribution = ExtendedSchemaNode(
        String(),
        description="Short representation of the API maintainers.",
    )
    parameters = FrontpageParameters()
[docs]
class OpenAPISpecSchema(ExtendedMappingSchema):
    """
    Mapping tagged with the OpenAPI 3.0 schema reference.
    """
    # "http://json-schema.org/draft-04/schema#"
    _schema = "https://spec.openapis.org/oas/3.0/schema/2021-09-28"
[docs]
class SwaggerUISpecSchema(ExtendedMappingSchema):
    """
    Mapping without any field defined of its own.
    """
[docs]
class VersionsSpecSchema(ExtendedMappingSchema):
    """
    Name, type and version of a single reported item.
    """
    name = ExtendedSchemaNode(
        String(),
        description="Identification name of the current item.",
        example="weaver",
    )
    type = ExtendedSchemaNode(
        String(),
        description="Identification type of the current item.",
        example="api",
    )
    version = Version(
        description="Version of the current item.",
        example="0.1.0",
    )
[docs]
class VersionsList(ExtendedSequenceSchema):
    """
    List of version details.
    """
    version = VersionsSpecSchema()
[docs]
class VersionsSchema(ExtendedMappingSchema):
    """
    Body holding the list of versions.
    """
    versions = VersionsList()
#################################################################
# Local Processes schemas
#################################################################
[docs]
class PackageBody(ExtendedMappingSchema):
    """
    Mapping without any field defined of its own.
    """
[docs]
class ExecutionUnit(OneOfKeywordSchema):
    """
    Execution unit given as a reference, a direct definition, or nested under ``unit``.
    """
    title = "ExecutionUnit"
    description = "Definition of the Application Package to execute."
    _one_of = [
        Reference(
            name="Reference",
            title="Reference",
            description="Execution Unit reference.",
        ),
        ExecutionUnitCWL(
            name="Unit",
            title="Unit",
            description="Execution Unit definition directly provided.",
        ),
        ExecutionUnitNested(
            name="UnitNested",
            title="UnitNested",
            description="Execution Unit definition nested under a 'unit' property.",
        ),
    ]
[docs]
class ExecutionUnitList(ExtendedSequenceSchema):
    """
    List representation of the execution unit, limited to exactly one item.
    """
    item = ExecutionUnit(name="ExecutionUnit")
    validator = Length(min=1, max=1)
[docs]
class ProcessDeploymentWithContext(ProcessDeployment):
    """
    Process deployment where the ``owsContext`` field is required.
    """
    description = "Process deployment with OWS Context reference."
    owsContext = OWSContext(missing=required)
[docs]
class ProcessVersionField(ExtendedMappingSchema):
    """
    Mixin providing the deprecated and optional ``processVersion`` field.
    """
    processVersion = Version(
        title="processVersion",
        missing=drop,
        deprecated=True,
        description="Old method of specifying the process version, prefer 'version' under 'process'.",
    )
[docs]
class DeployProcessOfferingContext(ProcessControl, ProcessVersionField):
    """
    Process description with OWS Context, nested under the ``process`` field.
    """
    # alternative definition to make 'executionUnit' optional if instead provided through 'owsContext'
    # case where definition is nested under 'processDescription.process'
    process = ProcessDeploymentWithContext(
        description="Process definition nested under process field for backward compatibility.",
    )
[docs]
class DeployProcessDescriptionContext(NotKeywordSchema, ProcessDeploymentWithContext, ProcessControl):
    """
    Process description with OWS Context, provided directly and not as a reference.
    """
    # alternative definition to make 'executionUnit' optional if instead provided through 'owsContext'
    # case where definition is directly under 'processDescription'
    _not = [
        Reference(),  # avoid conflict with deploy by href
    ]
[docs]
class DeployProcessContextChoiceType(OneOfKeywordSchema):
    """
    Process description with OWS Context, either nested or directly provided.
    """
    _one_of = [
        DeployProcessOfferingContext(),
        DeployProcessDescriptionContext(),
    ]
[docs]
class DeployProcessOffering(ProcessControl, ProcessVersionField):
    """
    Process description nested under the ``process`` field.
    """
    process = ProcessDeployment(
        description="Process definition nested under process field for backward compatibility.",
    )
[docs]
class DeployProcessDescription(NotKeywordSchema, ProcessDeployment, ProcessControl):
    """
    Process description provided directly and not as a reference.
    """
    _not = [
        Reference(),  # avoid conflict with deploy by href
    ]
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/process.yaml"
    description = "Process description fields directly provided."
[docs]
class DeployReference(Reference):
    """
    Deployment reference with an optional process identifier.
    """
    id = ProcessIdentifier(
        missing=drop,
        description=(
            "Optional identifier of the specific process to obtain the description from in case the reference URL "
            "corresponds to an endpoint that can refer to multiple process definitions (e.g.: GetCapabilities)."
        ),
    )
[docs]
class ProcessDescriptionChoiceType(OneOfKeywordSchema):
    """
    Process description given as a reference, nested under ``process``, or directly.
    """
    _one_of = [
        DeployReference(),
        DeployProcessOffering(),
        DeployProcessDescription(),
    ]
# combination of following 2 references:
# https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/openapi/schemas/processes-dru/ogcapppkg.yaml
# https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/openapi/schemas/processes-dru/ogcapppkg-array.yaml
# but omitting the unsupported generic properties (which can be mapped to equivalent CWL requirements):
# https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/openapi/schemas/processes-dru/executionUnit.yaml
[docs]
class ExecutionUnitVariations(OneOfKeywordSchema):
    """
    Execution unit given either individually or within a list.
    """
    _one_of = [
        # each 'ExecutionUnit', either individually or within an array,
        # already combines the JSON unit (nested under 'unit' or directly) vs href link representations
        ExecutionUnit(),
        ExecutionUnitList(),
    ]
[docs]
class ExecutionUnitDefinition(ExtendedMappingSchema):
    """
    Mapping holding the required ``executionUnit`` field.
    """
    executionUnit = ExecutionUnitVariations()
[docs]
class DeployParameters(ExtendedMappingSchema):
    """
    Mapping holding the optional ``deploymentProfileName`` URL.
    """
    deploymentProfileName = URL(missing=drop)
[docs]
class DeployOGCAppPackage(NotKeywordSchema, ExecutionUnitDefinition, DeployParameters):
    """
    Deployment body combining a process description with its execution unit.
    """
    description = "Deployment using standard OGC Application Package definition."
    _schema = f"{OGC_API_SCHEMA_EXT_DEPLOY}/ogcapppkg.yaml"
    # a 'not' keyword schema needs its '_not' list; rejecting CWL contents keeps
    # this body distinct from the CWL deployment alternatives under 'Deploy'
    _not = [
        CWLBase(),
    ]
    processDescription = ProcessDescriptionChoiceType()
[docs]
class DeployContextDefinition(NotKeywordSchema, DeployParameters):
    """
    Deployment body with OWS Context, rejecting CWL and execution unit contents.
    """
    # alternative definition to make 'executionUnit' optional if instead provided in 'owsContext' (somewhere nested)
    _not = [
        CWLBase(),
        ExecutionUnitDefinition(),
    ]
    processDescription = DeployProcessContextChoiceType()
[docs]
class CWLGraphItem(CWLApp):  # no 'cwlVersion', only one at the top
    """
    CWL application listed within a graph, where the identifier is required.
    """
    id = CWLIdentifier()  # required in this case
[docs]
class CWLGraphList(ExtendedSequenceSchema):
    """
    List of CWL applications defined under a graph.
    """
    # FIXME: support nested and $graph multi-deployment (https://github.com/crim-ca/weaver/issues/56)
    # a sequence schema needs exactly one child node to describe its items
    cwl = CWLGraphItem()
[docs]
class CWLGraphBase(ExtendedMappingSchema):
    """
    Mapping holding the ``$graph`` list, limited to exactly one item.
    """
    graph = CWLGraphList(
        name="$graph",
        description=(
            "Graph definition that defines *exactly one* CWL Application Package represented as list. "
            "Multiple definitions simultaneously deployed is NOT supported currently."
            # "Graph definition that combines one or many CWL Application Packages within a single payload. "
            # "If a single application is given (list of one item), it will be deployed as normal CWL by itself. "
            # "If multiple applications are defined, the first MUST be the top-most Workflow process. "
            # "Deployment of other items will be performed, and the full deployment will be persisted only if all are "
            # "valid. The resulting Workflow will be registered as a package by itself (i.e: not as a graph)."
        ),
        validator=Length(min=1, max=1),
    )
[docs]
class UpdateVersion(ExtendedMappingSchema):
    """
    Mapping holding the optional ``version`` to apply on deployment or update.
    """
    version = Version(missing=drop, example="1.2.3", description=(
        "Explicit version to employ for initial or updated process definition. "
        "Must not already exist and must be greater than the latest available semantic version for the "
        "corresponding version level according to the applied update operation. "
        "For example, if only versions '1.2.3' and '1.3.1' exist, the submitted version can be anything before "
        "version '1.2.0' excluding it (i.e.: '1.1.X', '0.1.2', etc.), between '1.2.4' and '1.3.0' exclusively, or "
        "'1.3.2' and anything above. If no version is provided, the next *patch* level after the current process "
        "version is applied. If the current process did not define any version, it is assumed '0.0.0' and this patch "
        "will use '0.0.1'. The applicable update level (MAJOR, MINOR, PATCH) depends on the operation being applied. "
        "As a rule of thumb, if changes affect only metadata, PATCH is required. If changes affect parameters or "
        "execution method of the process, but not directly its entire definition, MINOR is required. If the process "
        "must be completely redeployed due to application redefinition, MAJOR is required."
    ))
[docs]
class DeployCWLGraph(CWLBase, CWLGraphBase, UpdateVersion):
    """
    Deployment body using a CWL graph, with an optional version.
    """
[docs]
class DeployCWL(NotKeywordSchema, CWL, UpdateVersion):
    """
    Deployment body using a CWL definition directly, rejecting the graph representation.
    """
    _not = [
        CWLGraphBase(),
    ]
    id = CWLIdentifier()  # required in this case, cannot have a version directly as tag, use 'version' field instead
[docs]
class DeployOGCRemoteProcess(ExtendedMappingSchema):
    """
    Deployment body referring to a remote process by URL, with an optional identifier.
    """
    id = ProcessIdentifier(missing=drop, description=(
        "Optional identifier for the new process to deploy. "
        "If not provided, the ID inferred from the specified OGC API - Processes endpoint is reused."
    ))
    # required endpoint reference, without it this schema would accept any mapping
    process = ProcessURL()
[docs]
class Deploy(OneOfKeywordSchema):
    """
    Deployment body in any of the supported representations.
    """
    _one_of = [
        DeployOGCRemoteProcess(),
        DeployOGCAppPackage(),
        DeployContextDefinition(),
        DeployCWL(),
        DeployCWLGraph(),
    ]
[docs]
class DeployContentType(ContentTypeHeader):
    """
    Content-Type header restricted to the media-types accepted for deployment.
    """
    example = ContentType.APP_JSON
    default = ContentType.APP_JSON
    validator = OneOf(
        [
            ContentType.APP_JSON,
            ContentType.APP_CWL,
            ContentType.APP_CWL_JSON,
            ContentType.APP_CWL_YAML,
            ContentType.APP_CWL_X,
            ContentType.APP_OGC_PKG_JSON,
            ContentType.APP_OGC_PKG_YAML,
            ContentType.APP_YAML,
        ]
    )
[docs]
class PostProcessesEndpoint(ExtendedMappingSchema):
    """
    Request definition made of the format query and the deployment body with its examples.
    """
    querystring = FormatQuery()
    body = Deploy(
        title="Deploy",
        examples={
            "DeployCWL": {
                "summary": "Deploy a process from a CWL definition.",
                "value": EXAMPLES["deploy_process_cwl.json"],
            },
            "DeployOGC": {
                "summary": "Deploy a process from an OGC Application Package definition.",
                "value": EXAMPLES["deploy_process_ogcapppkg.json"],
            },
            "DeployWPS": {
                "summary": "Deploy a process from a remote WPS-1 reference URL.",
                "value": EXAMPLES["deploy_process_wps1.json"],
            },
        },
    )
[docs]
class PatchProcessBodySchema(UpdateVersion):
    """
    Body of a partial process update, where every field is optional.

    Each field description states the minimum version level its change requires.
    """
    title = ExtendedSchemaNode(
        String(),
        missing=drop,
        description=(
            "New title to override current one. "
            "Minimum required change version level: PATCH."
        ),
    )
    description = ExtendedSchemaNode(
        String(),
        missing=drop,
        description=(
            "New description to override current one. "
            "Minimum required change version level: PATCH."
        ),
    )
    keywords = KeywordList(
        missing=drop,
        description=(
            "Keywords to add (append) to existing definitions. "
            "To remove all keywords, submit an empty list. "
            "To replace keywords, perform two requests, one with empty list and the following one with new definitions. "
            "Minimum required change version level: PATCH."
        ),
    )
    metadata = MetadataList(
        missing=drop,
        description=(
            "Metadata to add (append) to existing definitions. "
            "To remove all metadata, submit an empty list. "
            "To replace metadata, perform two requests, one with empty list and the following one with new definitions. "
            "Relations must be unique across existing and new submitted metadata. "
            "Minimum required change version level: PATCH."
        ),
    )
    links = LinkList(
        missing=drop,
        description=(
            "Links to add (append) to existing definitions. Relations must be unique. "
            "To remove all (additional) links, submit an empty list. "
            "To replace links, perform two requests, one with empty list and the following one with new definitions. "
            "Note that modifications to links only considers custom links. Other automatically generated links such as "
            "API endpoint and navigation references cannot be removed or modified. "
            "Relations must be unique across existing and new submitted links. "
            "Minimum required change version level: PATCH."
        ),
    )
    inputs = UpdateInputOutputDefinition(
        missing=drop,
        description=(
            "Update details of individual input elements. "
            "Minimum required change version levels are the same as process-level fields of corresponding names."
        ),
    )
    outputs = UpdateInputOutputDefinition(
        missing=drop,
        description=(
            "Update details of individual output elements. "
            "Minimum required change version levels are the same as process-level fields of corresponding names."
        ),
    )
    jobControlOptions = JobControlOptionsList(
        missing=drop,
        description=(
            "New job control options supported by this process for its execution. "
            "All desired job control options must be provided (full override, not appending). "
            "Order is important to define the default behaviour (first item) to use when unspecified during job execution. "
            "Minimum required change version level: MINOR."
        ),
    )
    outputTransmission = TransmissionModeList(
        missing=drop,
        description=(
            "New output transmission methods supported following this process execution. "
            "All desired output transmission modes must be provided (full override, not appending). "
            "Minimum required change version level: MINOR."
        ),
    )
    visibility = VisibilityValue(
        missing=drop,
        description=(
            "New process visibility. "
            "Minimum required change version level: MINOR."
        ),
    )
[docs]
class PutProcessBodySchema(Deploy):
    """
    Body of a process replacement, using the same alternatives as deployment.
    """
    description = "Process re-deployment using an updated version and definition."
[docs]
class PatchProcessEndpoint(LocalProcessPath):
    """
    Request definition of a partial process update (query and body).
    """
    querystring = LocalProcessQuery()
    body = PatchProcessBodySchema()
[docs]
class PutProcessEndpoint(LocalProcessPath):
    """
    Request definition of a process replacement (query and body).
    """
    querystring = LocalProcessQuery()
    body = PutProcessBodySchema()
[docs]
class PostJobsEndpointJSON(ExtendedMappingSchema):
    """
    Request definition of a JSON job submission, defining only the query here.
    """
    querystring = LocalProcessQuery()
[docs]
class PostProcessJobsEndpointJSON(PostJobsEndpointJSON, LocalProcessPath):
    """
    JSON job submission request combined with the local process path.
    """
[docs]
class PostJobsEndpointXML(ExtendedMappingSchema):
    """
    Request definition of an XML job submission (query and body with its example).
    """
    querystring = LocalProcessQuery()
    body = WPSExecutePost(
        # very important to override 'name' in this case
        # original schema uses it to specify the XML class name
        # in this context, it is used to define the 'in' location of this schema to form 'requestBody' in OpenAPI
        name="body",
        examples={
            "ExecuteXML": {
                "summary": "Execute a process job using WPS-like XML payload.",
                "value": EXAMPLES["wps_execute_request.xml"],
            },
        },
    )
[docs]
class PostProcessJobsEndpointXML(PostJobsEndpointXML, LocalProcessPath):
    """
    XML job submission request combined with the local process path.
    """
[docs]
class JobTitleNullable(OneOfKeywordSchema):
    """
    Job title accepting either a title value or an explicit null.
    """
    description = "Job title to update, or unset if 'null'."
    _one_of = [
        JobTitle(),
        # allow explicit 'title: null' to unset a predefined title
        ExtendedSchemaNode(NoneType(), name="null"),
    ]
[docs]
class PatchJobBodySchema(ExecuteProcessParameters):
    description = "Execution request contents to be updated."

    # Fields using 'missing=null' (instead of 'missing=drop') pass down "empty" definitions as is
    # (JSON null, no-field dict, etc.) rather than dropping them, which 'missing=drop' would do due to
    # DropableSchemaNode. This allows "unsetting" values defined during job creation or previous updates.
    title = JobTitleNullable(missing=null)
    mode = JobExecuteModeEnum(missing=drop, deprecated=True)  # override without default 'auto'
    subscribers = JobExecuteSubscribers(missing=null)

    # All parameters that are not 'missing=drop' in the original 'Execute' definition
    # must be added here to allow partial updates.
    inputs = ExecuteInputValues(
        missing=drop,
        description="Input values or references to be updated.",
    )
    outputs = ExecuteOutputSpec(
        missing=drop,
        description="Output format and transmission mode to be updated.",
    )
[docs]
class PatchJobEndpoint(JobPath):
    # Extends 'JobPath' with documentation texts, a query string and the job PATCH body schema.
    summary = "Execution request parameters to be updated."
    # Only the fragment referencing 'DOC_URL' is an f-string; '{jobId}' in the last fragment is literal text.
    description = (
        "Execution request parameters to be updated. "
        "If parameters are omitted, they will be left unmodified. "
        "If provided, parameters will override existing definitions integrally. "
        "Therefore, if only a partial update of certain nested elements in a mapping or list is desired, "
        "all elements under the corresponding parameters must be resubmitted entirely with the applied changes. "
        "In the case of certain parameters, equivalent definitions can cause conflicting definitions between "
        "headers and contents "
        f"(see for more details: {DOC_URL}/processes.html#execution-body and {DOC_URL}/processes.html#execution-mode). "
        "To verify the resulting parameterization of any pending job, consider using the `GET /jobs/{jobId}/inputs`."
    )
    querystring = LocalProcessQuery()
    body = PatchJobBodySchema()
[docs]
class PatchProcessJobEndpoint(JobPath, ProcessEndpoint):
    # Combines 'JobPath' and 'ProcessEndpoint' with the job PATCH body schema.
    body = PatchJobBodySchema()
[docs]
class PatchProviderJobEndpoint(PatchProcessJobEndpoint):
[docs]
class PagingQueries(ExtendedMappingSchema):
    # Zero-based page index; integer strings are accepted, defaulting to 0.
    page = ExtendedSchemaNode(
        Integer(allow_string=True),
        missing=0,
        default=0,
        validator=Range(min=0),
    )
    # Page size within [1, 1000], defaulting to 10.
    limit = ExtendedSchemaNode(
        Integer(allow_string=True),
        missing=10,
        default=10,
        validator=Range(min=1, max=1000),
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/limit.yaml",
    )
[docs]
class GetJobsQueries(PagingQueries):
    # note:
    #   This schema is also used to generate any missing defaults during filter parameter handling.
    #   Items with default value are added if omitted, except 'default=null' which are removed after handling by alias.
    detail = ExtendedSchemaNode(
        QueryBoolean(), default=False, example=True, missing=drop,
        description="Provide job details instead of IDs.",
    )
    groups = JobGroupsCommaSeparated()
    min_duration = ExtendedSchemaNode(
        Integer(allow_string=True), name="minDuration", missing=drop, default=null, validator=Range(min=0),
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/minDuration.yaml",
        description="Minimal duration (seconds) between started time and current/finished time of jobs to find.",
    )
    max_duration = ExtendedSchemaNode(
        Integer(allow_string=True), name="maxDuration", missing=drop, default=null, validator=Range(min=0),
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/maxDuration.yaml",
        description="Maximum duration (seconds) between started time and current/finished time of jobs to find.",
    )
    # The field name is imposed by the query parameter; it shadows the 'datetime' module only within this class body.
    datetime = DateTimeInterval(missing=drop, default=None)
    # Description fixed: previously read "One of more".
    status = JobStatusSearchEnum(
        description="One or more comma-separated statuses to filter jobs.",
        missing=drop, default=None,
    )
    processID = ProcessIdentifierTag(
        missing=drop, default=null,
        schema=f"{OGC_API_PROC_PART1_PARAMETERS}/processIdQueryParam.yaml",
        description="Alias to 'process' for OGC-API compliance.",
    )
    process = ProcessIdentifierTag(
        missing=drop, default=None,
        description="Identifier and optional version tag of the process to filter search.",
    )
    version = Version(
        missing=drop, default=None, example="0.1.0", description=(
            "Version of the 'process' or 'processID' query parameters. "
            "If version is provided, those query parameters should specify the ID without tag."
        )
    )
    service = AnyIdentifier(
        missing=drop, default=null,
        description="Alias to 'provider' for backward compatibility.",
    )
    provider = AnyIdentifier(
        missing=drop, default=None,
        description="Identifier of service provider to filter search.",
    )
    # The field name is imposed by the query parameter; it shadows the builtin 'type' only within this class body.
    type = JobTypeEnum(
        missing=drop, default=null,
        description="Filter jobs only to matching type (note: 'service' and 'provider' are aliases).",
    )
    sort = JobSortEnum(missing=drop)
    access = JobAccess(missing=drop, default=None)
[docs]
class GetProcessJobsQuery(LocalProcessQuery, GetJobsQueries):
    # Pure combination of both query schemas; no additional fields.
    pass
[docs]
class GetProviderJobsQueries(GetJobsQueries):
    # ':version' not allowed for process ID in this case
    pass
[docs]
class GetJobsEndpoint(ExtendedMappingSchema):
    # allowed version in this case since can be either local or remote processes
    querystring = GetProcessJobsQuery()
[docs]
class GetProcessJobsEndpoint(LocalProcessPath):
    # Extends 'LocalProcessPath' with the job listing query string.
    querystring = GetProcessJobsQuery()
[docs]
class GetProviderJobsEndpoint(ProviderProcessPath):
    # Extends 'ProviderProcessPath' with the provider job listing query string.
    querystring = GetProviderJobsQueries()
[docs]
class JobIdentifierList(ExtendedSequenceSchema):
    # Item schema of the sequence: one UUID per job.
    job_id = UUID(
        description="ID of a job to dismiss. Identifiers not matching any known job are ignored.",
    )
[docs]
class DeleteJobsBodySchema(ExtendedMappingSchema):
    # Mapping with a single 'jobs' field holding the list of job identifiers.
    jobs = JobIdentifierList()
[docs]
class DeleteJobsEndpoint(ExtendedMappingSchema):
    # Only the body (list of jobs) is declared by this mapping.
    body = DeleteJobsBodySchema()
[docs]
class DeleteProcessJobsEndpoint(DeleteJobsEndpoint, LocalProcessPath):
    # Adds the local process query string to the combined bases.
    querystring = LocalProcessQuery()
[docs]
class DeleteProviderJobsEndpoint(DeleteJobsEndpoint, ProviderProcessPath):
    # Pure combination of both bases; no additional fields.
    pass
[docs]
class GetProcessJobQuery(LocalProcessQuery, GetJobQuery):
    # Pure combination of both query schemas; no additional fields.
    pass
[docs]
class GetProcessJobEndpoint(LocalProcessPath):
    # Extends 'LocalProcessPath' with the single-job query string.
    querystring = GetProcessJobQuery()
[docs]
class DeleteJobEndpoint(JobPath):
    # Extends 'JobPath' with the local process query string.
    querystring = LocalProcessQuery()
[docs]
class DeleteProcessJobEndpoint(LocalProcessPath):
[docs]
class DeleteProviderJobEndpoint(DeleteProcessJobEndpoint, ProviderProcessPath):
    # Pure combination of both bases; no additional fields.
    pass
[docs]
class BillsEndpoint(ExtendedMappingSchema):
[docs]
class BillEndpoint(BillPath):
[docs]
class ProcessQuotesEndpoint(LocalProcessPath):
    # Extends 'LocalProcessPath' with the local process query string.
    querystring = LocalProcessQuery()
[docs]
class ProcessQuoteEndpoint(LocalProcessPath, QuotePath):
    # Combines process and quote paths with the local process query string.
    querystring = LocalProcessQuery()
[docs]
class GetQuotesQueries(PagingQueries):
    # Paging parameters plus an optional process filter and sort key.
    # NOTE(review): 'process' uses 'missing=None' whereas 'sort' uses 'missing=drop' - confirm this is intended.
    process = AnyIdentifier(missing=None)
    sort = QuoteSortEnum(missing=drop)
[docs]
class QuotesEndpoint(ExtendedMappingSchema):
    # Only the quote listing query string is declared by this mapping.
    querystring = GetQuotesQueries()
[docs]
class QuoteEndpoint(QuotePath):
[docs]
class PostProcessQuote(LocalProcessPath, QuotePath):
    # Combines process and quote paths; the body is declared as 'NoContent'.
    querystring = LocalProcessQuery()
    body = NoContent()
[docs]
class PostQuote(QuotePath):
    # Extends 'QuotePath'; the body is declared as 'NoContent'.
    body = NoContent()
[docs]
class PostProcessQuoteRequestEndpoint(LocalProcessPath, QuotePath):
    # Combines process and quote paths with the quote parameters body schema.
    querystring = LocalProcessQuery()
    body = QuoteProcessParametersSchema()
# ################################################################
# Provider Processes schemas
# ################################################################
[docs]
class ProvidersQuerySchema(ExtendedMappingSchema):
    # Three optional boolean query flags, each defaulting to 'True' and dropped when missing.
    detail = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=drop,
        description="Return summary details about each provider, or simply their IDs.",
    )
    check = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=drop,
        description="List only reachable providers, dropping unresponsive ones that cannot be checked for listing. "
                    "Otherwise, all registered providers are listed regardless of their availability. When requesting "
                    "details, less metadata will be provided since it will not be fetched from remote services.",
    )
    ignore = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=drop,
        description="When listing providers with check of reachable remote service definitions, unresponsive response "
                    "or unprocessable contents will be silently ignored and dropped from full listing in the response. "
                    "Disabling this option will raise an error immediately instead of ignoring invalid services.",
    )
[docs]
class GetProviders(ExtendedMappingSchema):
    # Only the providers query string is declared by this mapping.
    querystring = ProvidersQuerySchema()
[docs]
class PostProvider(ExtendedMappingSchema):
    # Only the provider creation body is declared by this mapping.
    body = CreateProviderRequestBody()
[docs]
class ProcessDetailQuery(ExtendedMappingSchema):
    # Optional boolean flag defaulting to 'True'; dropped when missing.
    detail = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=drop,
        description="Return summary details about each process, or simply their IDs.",
    )
[docs]
class ProcessLinksQuery(ExtendedMappingSchema):
    # Unlike the sibling query flags using 'missing=drop', this one resolves to 'True' when missing.
    links = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=True,
        description="Return summary details with included links for each process.",
    )
[docs]
class ProcessRevisionsQuery(ExtendedMappingSchema):
    # Optional process ID filter, combined with the 'revisions' flag below.
    # Description fixed: previously read "all reversions of a given process".
    process = ProcessIdentifier(missing=drop, description=(
        "Process ID (excluding version) for which to filter results. "
        "When combined with 'revisions=true', allows listing of all revisions of a given process. "
        "If omitted when 'revisions=true', all revisions of every process ID will be returned. "
        "If used without 'revisions' query, list should include a single process as if summary was requested directly."
    ))
    # Optional boolean flag defaulting to 'False'; dropped when missing.
    revisions = ExtendedSchemaNode(
        QueryBoolean(), example=True, default=False, missing=drop, description=(
            "Return all revisions of processes, or simply their latest version. When returning all revisions, "
            "IDs will be replaced by '{processID}:{version}' tag representation to avoid duplicates."
        )
    )
[docs]
class ProviderProcessesQuery(ProcessPagingQuery, ProcessDetailQuery, ProcessLinksQuery):
    # Pure combination of the paging, detail and links query schemas.
    pass
[docs]
class ProviderProcessesEndpoint(ProviderPath):
    # Extends 'ProviderPath' with the provider processes query string.
    querystring = ProviderProcessesQuery()
[docs]
class GetProviderProcess(ExtendedMappingSchema):
[docs]
class PostProviderProcessJobRequest(ExtendedMappingSchema):
    """
    Launching a new process request definition.
    """
    # Only the job launch query string is declared by this mapping.
    querystring = LaunchJobQuerystring()
# ################################################################
# Responses schemas
# ################################################################
[docs]
class GenericHTMLResponse(ExtendedMappingSchema):
    """
    Generic HTML response.
    """
    body = ExtendedMappingSchema()

    def __new__(cls, *, name, description, **kwargs):  # pylint: disable=W0221
        # type: (Type[GenericHTMLResponse], *Any, str, str, **Any) -> GenericHTMLResponse
        """
        Generates a derived HTML response schema with direct forwarding of custom parameters to the body's schema.

        This strategy allows the quicker definition of schema variants without duplicating class definitions only
        providing alternate documentation parameters.

        .. note::
            Method ``__new__`` is used instead of ``__init__`` because some :mod:`cornice_swagger` operations will
            look explicitly for ``schema_node.__class__.__name__``. If using ``__init__``, the first instance would
            set the name value for all following instances instead of the intended reusable meta-schema class.

        :param name: Schema name to assign; must be a string starting with a letter.
        :param description: Description assigned to the response schema.
        :param kwargs: Parameters forwarded to the replacement ``body`` mapping schema.
        :raises ValueError: If the name is not a string matching the allowed pattern.
        """
        # Only letters, digits, underscores and dashes are accepted (case-insensitive), starting with a letter.
        name_is_valid = isinstance(name, str) and re.match(r"^[A-Z][A-Z0-9_-]*$", name, re.I)
        if not name_is_valid:
            raise ValueError(
                "New schema name must be provided to avoid invalid mixed use of $ref pointers. "
                f"Name '{name}' is invalid."
            )
        obj = super().__new__(cls)  # type: ExtendedSchemaNode
        obj.__init__(name=name, description=description)
        obj.__class__.__name__ = name

        # Swap the generic 'body' child for one built from the forwarded parameters; keep all others as is.
        replaced_children = []
        for child in obj.children:
            if child.name == "body":
                child = ExtendedMappingSchema(name="body", **kwargs)
            replaced_children.append(child)
        obj.children = replaced_children
        return obj

    def __deepcopy__(self, *args, **kwargs):
        # type: (*Any, **Any) -> GenericHTMLResponse
        # Rebuild through '__new__' rather than copying attributes; copy arguments are ignored.
        return GenericHTMLResponse(name=self.name, description=self.description, children=self.children)
[docs]
class OWSErrorCode(ExtendedSchemaNode):
    # Documentation defaults; the example can be overridden at instantiation.
    example = "InvalidParameterValue"
    description = "OWS error code or URI reference that identifies the problem type."
[docs]
class OWSExceptionResponse(ExtendedMappingSchema):
    """
    Error content in XML format.
    """
    description = "OWS formatted exception."
    code = OWSErrorCode(example="NoSuchProcess")
    locator = ExtendedSchemaNode(
        String(),
        example="identifier",
        description="Indication of the element that caused the error.",
    )
    message = ExtendedSchemaNode(
        String(),
        example="Invalid process ID.",
        description="Specific description of the error.",
    )
[docs]
class ErrorDetail(ExtendedMappingSchema):
    # Numeric HTTP code paired with its textual status.
    code = ExtendedSchemaNode(
        Integer(),
        description="HTTP status code.",
        example=400,
    )
    status = ExtendedSchemaNode(
        String(),
        description="HTTP status detail.",
        example="400 Bad Request",
    )
[docs]
class ErrorSource(OneOfKeywordSchema):
    # Either a plain string or the detailed code/status mapping.
    _one_of = [
        ExtendedSchemaNode(String(), description="Error name or description."),
        ErrorDetail(description="Detailed error representation."),
    ]
[docs]
class ErrorCause(OneOfKeywordSchema):
    # Either a plain string or a permissive mapping.
    _one_of = [
        ExtendedSchemaNode(String(), description="Error message from exception or cause of failure."),
        PermissiveMappingSchema(description="Relevant error fields with details about the cause."),
    ]
[docs]
class ErrorJsonResponseBodySchema(ExtendedMappingSchema):
    _schema = f"{OGC_API_COMMON_PART1_SCHEMAS}/exception.json"
    description = "JSON schema for exceptions based on RFC 7807"

    # The field name is imposed by the schema; it shadows the builtin 'type' only within this class body.
    type = OWSErrorCode()  # only this is required

    # Every remaining field is optional and dropped when missing.
    title = ExtendedSchemaNode(
        String(),
        description="Short description of the error.",
        missing=drop,
    )
    detail = ExtendedSchemaNode(
        String(),
        description="Detail about the error cause.",
        missing=drop,
    )
    status = ExtendedSchemaNode(
        Integer(),
        description="Error status code.",
        example=500,
        missing=drop,
    )
    cause = ErrorCause(missing=drop)
    value = ErrorCause(missing=drop)
    error = ErrorSource(missing=drop)
    instance = URI(missing=drop)
    exception = OWSExceptionResponse(missing=drop)
[docs]
class ServerErrorBaseResponseSchema(ExtendedMappingSchema):
    # Base for error responses; subclasses may override the schema reference.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/ServerError.yaml"
[docs]
class BadRequestResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Incorrectly formed request contents."
    body = ErrorJsonResponseBodySchema()
[docs]
class ConflictRequestResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Conflict between the affected entity and another existing definition."
    body = ErrorJsonResponseBodySchema()
[docs]
class UnprocessableEntityResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Wrong format or schema of given parameters."
    body = ErrorJsonResponseBodySchema()
[docs]
class MethodNotAllowedErrorResponseSchema(ServerErrorBaseResponseSchema):
    # Overrides the base schema reference with the 'NotAllowed' response definition.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/NotAllowed.yaml"
    description = "HTTP method not allowed for requested path."
    body = ErrorJsonResponseBodySchema()
[docs]
class NotAcceptableErrorResponseSchema(ServerErrorBaseResponseSchema):
    # Overrides the base schema reference with the 'NotSupported' response definition.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/NotSupported.yaml"
    description = "Requested media-types are not supported at the path."
    body = ErrorJsonResponseBodySchema()
[docs]
class NotFoundResponseSchema(ServerErrorBaseResponseSchema):
    # Overrides the base schema reference with the 'NotFound' response definition.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/NotFound.yaml"
    description = "Requested resource could not be found."
    body = ErrorJsonResponseBodySchema()
[docs]
class ForbiddenProcessAccessResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Referenced process is not accessible."
    body = ErrorJsonResponseBodySchema()
[docs]
class ForbiddenProviderAccessResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Referenced provider is not accessible."
    body = ErrorJsonResponseBodySchema()
[docs]
class ForbiddenProviderLocalResponseSchema(ServerErrorBaseResponseSchema):
    # The description embeds 'WEAVER_CONFIG_REMOTE_LIST' as evaluated at class definition time.
    description = (
        "Provider operation is not allowed on local-only Weaver instance. "
        f"Applies only when application configuration is not within: {WEAVER_CONFIG_REMOTE_LIST}"
    )
    body = ErrorJsonResponseBodySchema()
[docs]
class InternalServerErrorResponseSchema(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Unhandled internal server error."
    body = ErrorJsonResponseBodySchema()
[docs]
class OkGetSwaggerJSONResponse(ExtendedMappingSchema):
    body = OpenAPISpecSchema(description="OpenAPI JSON schema of Weaver API.")
    # The example value is a '$ref' pointing at the schema reference of 'OpenAPISpecSchema'.
    examples = {
        "OpenAPI Schema": {
            "summary": "OpenAPI specification of this API.",
            "value": {"$ref": OpenAPISpecSchema._schema},
        },
    }
[docs]
class OkGetSwaggerUIResponse(ExtendedMappingSchema):
    # Response whose body uses 'SwaggerUISpecSchema'.
    body = SwaggerUISpecSchema(description="Swagger UI of Weaver API.")
[docs]
class OkGetRedocUIResponse(ExtendedMappingSchema):
    # Reuses 'SwaggerUISpecSchema' for the body with a Redoc-specific description.
    body = SwaggerUISpecSchema(description="Redoc UI of Weaver API.")
[docs]
class OkGetVersionsResponse(ExtendedMappingSchema):
    # Response whose body uses 'VersionsSchema'.
    body = VersionsSchema()
[docs]
class OkGetProvidersListResponse(ExtendedMappingSchema):
    # Response whose body uses 'ProvidersBodySchema'.
    body = ProvidersBodySchema()
[docs]
class OkGetProviderCapabilitiesSchema(ExtendedMappingSchema):
    # Response whose body uses 'ProviderCapabilitiesSchema'.
    body = ProviderCapabilitiesSchema()
[docs]
class NoContentDeleteProviderSchema(ExtendedMappingSchema):
    # Response whose body is declared as 'NoContent'.
    body = NoContent()
[docs]
class NotImplementedDeleteProviderResponse(ExtendedMappingSchema):
    # Only a description is declared; no body is visible for this response.
    description = "Provider removal not supported using referenced storage."
[docs]
class OkGetProviderProcessesSchema(ExtendedMappingSchema):
    # Response whose body uses 'ProviderProcessesSchema'.
    body = ProviderProcessesSchema()
[docs]
class GetProcessesQuery(ProcessPagingQuery, ProcessDetailQuery, ProcessLinksQuery, ProcessRevisionsQuery):
    # Adds provider-related flags on top of the combined paging, detail, links and revisions queries.
    providers = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=False,
        missing=drop,
        description="List local processes as well as all sub-processes of all registered providers. "
                    "Paging and sorting query parameters are unavailable when providers are requested since lists are "
                    "populated dynamically and cannot ensure consistent process lists per page across providers. "
                    f"Applicable only for Weaver configurations {WEAVER_CONFIG_REMOTE_LIST}, ignored otherwise.",
    )
    ignore = ExtendedSchemaNode(
        QueryBoolean(),
        example=True,
        default=True,
        missing=drop,
        description="Only when listing provider processes, any unreachable remote service definitions "
                    "or unprocessable contents will be silently ignored and dropped from full listing in the response. "
                    "Disabling this option will raise an error immediately instead of ignoring invalid providers.",
    )
[docs]
class GetProcessesEndpoint(ExtendedMappingSchema):
    # Only the processes listing query string is declared by this mapping.
    querystring = GetProcessesQuery()
[docs]
class ProviderProcessesListing(ProcessCollection):
    # Extends 'ProcessCollection' with a provider 'id'; '_sort_first' lists 'id' before 'processes'.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/processList.yaml"
    _sort_first = ["id", "processes"]
    id = ProviderNameSchema()
[docs]
class ProviderProcessesList(ExtendedSequenceSchema):
    # Item schema of the sequence: one processes listing per provider.
    item = ProviderProcessesListing(
        description="Processes offered by the identified remote provider.",
    )
[docs]
class ProvidersProcessesCollection(ExtendedMappingSchema):
    # Optional 'providers' list; dropped when missing.
    providers = ProviderProcessesList(missing=drop)
[docs]
class ProcessListingLinks(ExtendedMappingSchema):
    # Optional 'links' list; dropped when missing.
    links = LinkList(missing=drop)
[docs]
class ProcessesListing(ProcessCollection, ProcessListingLinks):
    # Field ordering is taken from the module-level 'PROCESSES_LISTING_FIELD_*' constants.
    _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/processList.yaml"
    _sort_first = PROCESSES_LISTING_FIELD_FIRST
    _sort_after = PROCESSES_LISTING_FIELD_AFTER
[docs]
class MultiProcessesListing(DescriptionSchema, ProcessesListing, ProvidersProcessesCollection, ProcessListingMetadata):
[docs]
class OkGetProcessesListResponse(ExtendedMappingSchema):
    # Response declaring a format query string and the multi-process listing body.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/ProcessList.yaml"
    description = "Listing of available processes successful."
    querystring = FormatQuery()
    body = MultiProcessesListing()
[docs]
class OkPostProcessDeployBodySchema(ExtendedMappingSchema):
    # Here 'description' is a field of the body (schema node), not the documentation text of the schema.
    description = ExtendedSchemaNode(
        String(),
        description="Detail about the operation.",
    )
    deploymentDone = ExtendedSchemaNode(
        Boolean(),
        default=False,
        example=True,
        description="Indicates if the process was successfully deployed.",
    )
    processSummary = ProcessSummary(
        missing=drop,
        description="Deployed process summary if successful.",
    )
    failureReason = ExtendedSchemaNode(
        String(),
        missing=drop,
        description="Description of deploy failure if applicable.",
    )
[docs]
class OkPostProcessesResponse(ExtendedMappingSchema):
    # Response whose body uses the deploy result schema.
    description = "Process successfully deployed."
    body = OkPostProcessDeployBodySchema()
[docs]
class OkPatchProcessUpdatedBodySchema(ExtendedMappingSchema):
    # Here 'description' is a field of the body (schema node), not the documentation text of the schema.
    description = ExtendedSchemaNode(
        String(),
        description="Detail about the operation.",
    )
    processSummary = ProcessSummary(
        missing=drop,
        description="Deployed process summary if successful.",
    )
[docs]
class OkPatchProcessResponse(ExtendedMappingSchema):
    # Response whose body uses the update result schema.
    description = "Process successfully updated."
    body = OkPatchProcessUpdatedBodySchema()
[docs]
class BadRequestGetProcessInfoResponse(ExtendedMappingSchema):
    # Response whose body is declared as 'NoContent'.
    description = "Missing process identifier."
    body = NoContent()
[docs]
class NotFoundProcessResponse(NotFoundResponseSchema):
    # Specializes the not-found response with a process-specific description and example.
    description = "Process with specified reference identifier does not exist."
    examples = {
        "ProcessNotFound": {
            "summary": "Example response when specified process reference cannot be found.",
            "value": EXAMPLES["local_process_not_found.json"],
        },
    }
    body = ErrorJsonResponseBodySchema()
[docs]
class OkGetProcessInfoResponse(ExtendedMappingSchema):
    # Response declaring a format query string and the process description body.
    querystring = FormatQuery()
    body = ProcessDescription()
[docs]
class OkGetProcessPackageSchema(ExtendedMappingSchema):
    # Only the format query string is declared by this mapping.
    querystring = FormatQuery()
[docs]
class OkGetProcessPayloadSchema(ExtendedMappingSchema):
    # Response whose body is declared as 'NoContent'.
    body = NoContent()
[docs]
class ProcessVisibilityResponseBodySchema(ExtendedMappingSchema):
    # Mapping with a single required 'value' field.
    value = VisibilityValue()
[docs]
class OkGetProcessVisibilitySchema(ExtendedMappingSchema):
    # Response whose body uses the visibility body schema.
    body = ProcessVisibilityResponseBodySchema()
[docs]
class OkPutProcessVisibilitySchema(ExtendedMappingSchema):
    # Response whose body uses the visibility body schema.
    body = ProcessVisibilityResponseBodySchema()
[docs]
class ForbiddenVisibilityUpdateResponseSchema(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Visibility value modification not allowed."
    body = ErrorJsonResponseBodySchema()
[docs]
class OkDeleteProcessUndeployBodySchema(ExtendedMappingSchema):
    # Undeploy outcome: completion flag, process identifier and optional failure reason.
    deploymentDone = ExtendedSchemaNode(
        Boolean(),
        default=False,
        example=True,
        description="Indicates if the process was successfully undeployed.",
    )
    identifier = ExtendedSchemaNode(String(), example="workflow")
    failureReason = ExtendedSchemaNode(
        String(),
        missing=drop,
        description="Description of undeploy failure if applicable.",
    )
[docs]
class OkDeleteProcessResponse(ExtendedMappingSchema):
    # Response whose body uses the undeploy result schema.
    description = "Process successfully undeployed."
    body = OkDeleteProcessUndeployBodySchema()
[docs]
class OkGetProviderProcessDescriptionResponse(ExtendedMappingSchema):
    # Response whose body uses 'ProcessDescription'.
    body = ProcessDescription()
[docs]
class CreatedPostProvider(ExtendedMappingSchema):
    # Response whose body uses 'ProviderSummarySchema'.
    body = ProviderSummarySchema()
[docs]
class NotImplementedPostProviderResponse(ExtendedMappingSchema):
    # Only a description is declared; no body is visible for this response.
    description = "Provider registration not supported using specified definition."
[docs]
class CreatedLaunchJobResponse(ExtendedMappingSchema):
    description = (
        "Job successfully submitted. "
        "Execution should begin when resources are available or when triggered, according to requested execution mode."
    )
    # Two documented examples: a job accepted for execution, and a job created for a later trigger.
    examples = {
        "JobAccepted": {
            "summary": "Job accepted for execution asynchronously.",
            "value": EXAMPLES["job_status_accepted.json"],
        },
        "JobCreated": {
            "summary": "Job created for later execution by trigger.",
            "value": EXAMPLES["job_status_created.json"],
        },
    }
    body = CreatedJobStatusSchema()
[docs]
class CompletedJobStatusSchema(DescriptionSchema, JobStatusInfo):
    # Pure combination of both bases; no additional fields.
    pass
[docs]
class CompletedJobResponse(ExtendedMappingSchema):
    # Response whose body uses the completed job status schema.
    description = "Job submitted and completed execution synchronously."
    body = CompletedJobStatusSchema()
[docs]
class FailedSyncJobResponse(CompletedJobResponse):
    # Same body as 'CompletedJobResponse'; only the description is overridden.
    description = "Job submitted and failed synchronous execution. See server logs for more details."
[docs]
class InvalidJobParametersResponse(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Job parameters failed validation."
    body = ErrorJsonResponseBodySchema()
[docs]
class OkDeleteProcessJobResponse(ExtendedMappingSchema):
    # Response whose body uses 'DismissedJobSchema'.
    body = DismissedJobSchema()
[docs]
class OkGetQueriedJobsResponse(ExtendedMappingSchema):
    # Response declaring a format query string and the queried jobs body.
    querystring = FormatQuery()
    body = GetQueriedJobsSchema()
[docs]
class BatchDismissJobsBodySchema(DescriptionSchema):
    # Extends 'DescriptionSchema' with the list of job identifiers.
    jobs = JobIdentifierList(description="Confirmation of jobs that have been dismissed.")
[docs]
class OkBatchDismissJobsResponseSchema(ExtendedMappingSchema):
    # Response whose body uses the batch dismiss body schema.
    body = BatchDismissJobsBodySchema()
[docs]
class OkDismissJobResponse(ExtendedMappingSchema):
    # Response whose body uses 'DismissedJobSchema'.
    body = DismissedJobSchema()
[docs]
class NoContentJobUpdatedResponse(ExtendedMappingSchema):
    # Response whose body is declared as 'NoContent'.
    description = "Job detail updated with provided parameters."
    body = NoContent()
[docs]
class OkGetJobStatusResponse(ExtendedMappingSchema):
    # Response referencing the 'Status' response definition, with a job status body.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/Status.yaml"
    body = JobStatusInfo()
[docs]
class InvalidJobResponseSchema(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Job reference is not a valid UUID."
    body = ErrorJsonResponseBodySchema()
[docs]
class NotFoundJobResponseSchema(NotFoundResponseSchema):
    # Specializes the not-found response with a job-specific description and example.
    description = "Job reference UUID cannot be found."
    examples = {
        "JobNotFound": {
            "summary": "Example response when specified job reference cannot be found.",
            "value": EXAMPLES["job_not_found.json"],
        },
    }
    body = ErrorJsonResponseBodySchema()
[docs]
class GoneJobResponseSchema(ExtendedMappingSchema):
    # Error response with a documented example of an already dismissed job.
    description = "Job reference UUID cannot be dismissed again or its result artifacts were removed."
    examples = {
        "JobDismissed": {
            "summary": "Example response when specified job reference was already dismissed.",
            "value": EXAMPLES["job_dismissed_error.json"],
        },
    }
    body = ErrorJsonResponseBodySchema()
[docs]
class JobProvEndpoint(JobPath):
    # Extends 'JobPath' with the format query string.
    querystring = FormatQuery()
[docs]
class ProcessJobProvEndpoint(JobProvEndpoint, LocalProcessPath):
    # Pure combination of both bases; no additional fields.
    pass
[docs]
class ProviderJobProvEndpoint(JobProvEndpoint, ProviderProcessPath):
    # Pure combination of both bases; no additional fields.
    pass
[docs]
class OkGetJobProvResponse(ExtendedMappingSchema):
    # Only a description is declared; no body is visible for this response.
    description = "Job provenance details in the requested format."
[docs]
class JobProvMetadataResponseBody(ExtendedSchemaNode):
    # Description fixed: the previous text ("Multipart file contents for upload to the vault.") was
    # identical to the one of 'VaultUploadBody' and unrelated to job provenance.
    # NOTE(review): confirm the preferred wording for the published OpenAPI documentation.
    description = "Job provenance metadata contents in the requested format."
[docs]
class NotFoundJobProvResponseSchema(NotFoundResponseSchema):
    # Specializes the not-found response with a provenance-specific description.
    description = (
        "Job reference UUID cannot be found, or a specified provenance "
        "run UUID cannot be retrieved from the Workflow execution steps."
    )
    body = ErrorJsonResponseBodySchema()
[docs]
class GoneJobProvResponseSchema(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = (
        "Job reference contents have been removed or does not contain PROV metadata. "
        "This could be because the job was created before provenance support was enabled, "
        "or because a job retention period deleted the contents."
    )
    body = ErrorJsonResponseBodySchema()
[docs]
class OkGetJobOutputsResponse(ExtendedMappingSchema):
    # Response whose body uses 'JobOutputsBody'.
    body = JobOutputsBody()
[docs]
class RedirectResultResponse(ExtendedMappingSchema):
[docs]
class OkGetJobResultsResponse(ExtendedMappingSchema):
    # Response referencing the 'Results' response definition, with a results body.
    _schema = f"{OGC_API_PROC_PART1_RESPONSES}/Results.yaml"
    body = ResultsBody()
[docs]
class NoContentJobResultsResponse(ExtendedMappingSchema):
    # The body is 'NoContent' with an empty string as default.
    description = "Job completed execution synchronously with results returned in Link headers."
    body = NoContent(default="")
[docs]
class CreatedQuoteExecuteResponse(ExtendedMappingSchema):
    # Response whose body uses 'CreatedQuotedJobStatusSchema'.
    body = CreatedQuotedJobStatusSchema()
[docs]
class CreatedQuoteResponse(ExtendedMappingSchema):
    # Response whose body uses the complete 'QuoteSchema'.
    description = "Quote successfully obtained for process execution definition."
    body = QuoteSchema()
[docs]
class AcceptedQuoteResponse(ExtendedMappingSchema):
    # Response whose body uses 'PartialQuoteSchema' instead of the complete quote.
    summary = "Quote successfully submitted."
    description = (
        "Quote successfully submitted for evaluating process execution definition. "
        "Complete details will be available once evaluation has completed."
    )
    body = PartialQuoteSchema()
[docs]
class QuotePaymentRequiredResponse(ServerErrorBaseResponseSchema):
    # Error response using the JSON error body.
    description = "Quoted process execution refused due to missing payment."
    body = ErrorJsonResponseBodySchema()
[docs]
class OkGetQuoteInfoResponse(ExtendedMappingSchema):
    # Response whose body uses 'QuoteSchema'.
    body = QuoteSchema()
[docs]
class NotFoundQuoteResponse(NotFoundResponseSchema):
    # Specializes the not-found response with a quote-specific description.
    description = "Quote with specified reference identifier does not exist."
    body = ErrorJsonResponseBodySchema()
[docs]
class OkGetQuoteListResponse(ExtendedMappingSchema):
    # Response whose body uses 'QuotationListSchema'.
    body = QuotationListSchema()
[docs]
class OkGetEstimatorResponse(ExtendedMappingSchema):
    # Response whose body uses 'QuoteEstimatorSchema'.
    body = QuoteEstimatorSchema()
[docs]
class OkPutEstimatorResponse(ExtendedMappingSchema):
    # Response whose body uses 'DescriptionSchema'.
    body = DescriptionSchema()
[docs]
class OkDeleteEstimatorResponse(ExtendedMappingSchema):
    # Response whose body uses 'DescriptionSchema'.
    body = DescriptionSchema()
[docs]
class OkGetBillDetailResponse(ExtendedMappingSchema):
    # Response whose body uses 'BillSchema'.
    body = BillSchema()
[docs]
class OkGetBillListResponse(ExtendedMappingSchema):
    # Response whose body uses 'BillListSchema'.
    body = BillListSchema()
[docs]
class OkGetJobExceptionsResponse(ExtendedMappingSchema):
    # Response whose body uses 'JobExceptionsSchema'.
    body = JobExceptionsSchema()
[docs]
class OkGetJobLogsResponse(ExtendedMappingSchema):
    # Response whose body uses 'JobLogsSchema'.
    body = JobLogsSchema()
[docs]
class OkGetJobStatsResponse(ExtendedMappingSchema):
    # Response whose body uses 'JobStatisticsSchema'.
    body = JobStatisticsSchema()
[docs]
class VaultFileID(UUID):
    # UUID specialization with vault-specific documentation.
    description = "Vault file identifier."
    example = "78977deb-28af-46f3-876b-cdd272742678"
[docs]
class VaultAccessToken(UUID):
    # NOTE(review): the example below is a plain hexadecimal string, not a dash-separated UUID,
    # although the schema derives from 'UUID' - confirm the intended format.
    description = "Vault file access token."
    example = "30d889cfb7ae3a63229a8de5f91abc1ef5966bb664972f234a4db9d28f8148e0e"  # nosec
[docs]
class VaultEndpoint(ExtendedMappingSchema):
[docs]
class VaultUploadBody(ExtendedSchemaNode):
    # String-typed node; the example is keyed by the multipart form content type.
    schema_type = String
    description = "Multipart file contents for upload to the vault."
    examples = {
        ContentType.MULTIPART_FORM: {
            "summary": "Upload JSON file to vault as multipart content.",
            "value": EXAMPLES["vault_file_upload.txt"],
        },
    }
[docs]
class VaultUploadEndpoint(ExtendedMappingSchema):
    # Only the upload body is declared by this mapping.
    body = VaultUploadBody()
[docs]
class VaultFileUploadedBodySchema(ExtendedMappingSchema):
    # Three required fields: access token, file identifier and file reference.
    access_token = AccessToken()
    file_id = VaultFileID()
    file_href = VaultReference()
[docs]
class OkVaultFileUploadedResponse(ExtendedMappingSchema):
    # Response whose body uses the uploaded file body schema.
    description = "File successfully uploaded to vault."
    body = VaultFileUploadedBodySchema()
[docs]
class BadRequestVaultFileUploadResponse(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Missing or incorrectly formed file contents."
    body = ErrorJsonResponseBodySchema()
[docs]
class UnprocessableEntityVaultFileUploadResponse(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = (
        "Invalid filename refused for upload. "
        "Filename should include only alphanumeric, underscore, dash, and dot characters. "
        "Filename should include both the base name and the desired file extension."
    )
    body = ErrorJsonResponseBodySchema()
[docs]
class VaultFileEndpoint(VaultEndpoint):
    # Extends 'VaultEndpoint' with the vault file identifier.
    file_id = VaultFileID()
[docs]
class OkVaultFileDetailResponse(ExtendedMappingSchema):
    # The body is 'NoContent' with an empty string as default.
    body = NoContent(default="")
[docs]
class OkVaultFileDownloadResponse(OkVaultFileDetailResponse):
    # Same definition as 'OkVaultFileDetailResponse' under a distinct schema name.
    pass
[docs]
class BadRequestVaultFileAccessResponse(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Invalid file vault reference."
    body = ErrorJsonResponseBodySchema()
[docs]
class ForbiddenVaultFileDownloadResponse(ExtendedMappingSchema):
    # Error response using the JSON error body.
    description = "Forbidden access to vault file. Invalid authorization from provided token."
    body = ErrorJsonResponseBodySchema()
[docs]
class GoneVaultFileDownloadResponse(ExtendedMappingSchema):
    # Error response using the common JSON error body.
    description = "Vault File resource corresponding to specified ID is not available anymore."
    body = ErrorJsonResponseBodySchema()
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the GET operation of the API frontpage (name-based; the route binding is not shown here).
get_api_frontpage_responses = {
    "200": OkGetFrontpageResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of the OpenAPI JSON document (name-based; the route binding is not shown here).
get_openapi_json_responses = {
    "200": OkGetSwaggerJSONResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of the Swagger UI page (name-based; the route binding is not shown here).
get_api_swagger_ui_responses = {
    "200": OkGetSwaggerUIResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of the Redoc UI page (name-based; the route binding is not shown here).
get_api_redoc_ui_responses = {
    "200": OkGetRedocUIResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of the API versions (name-based; the route binding is not shown here).
get_api_versions_responses = {
    "200": OkGetVersionsResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for the processes listing (name-based assumption).
# The "200" entry carries four named examples; the listing/details pairs point at the same example file.
get_processes_responses = {
    "200": OkGetProcessesListResponse(examples={
        "ProcessesListing": {
            "summary": "Listing of identifiers of local processes registered in Weaver.",
            "value": EXAMPLES["local_process_listing.json"],
        },
        "ProcessesDetails": {
            "summary": "Detailed definitions of local processes registered in Weaver.",
            "value": EXAMPLES["local_process_listing.json"],
        },
        "ProvidersProcessesListing": {
            "summary": "List of identifiers combining all local and remote processes known by Weaver.",
            "value": EXAMPLES["providers_processes_listing.json"],
        },
        "ProvidersProcessesDetails": {
            "summary": "Detailed definitions Combining all local and remote processes known by Weaver.",
            "value": EXAMPLES["providers_processes_listing.json"],
        }
    }),
    "400": BadRequestResponseSchema(description="Error in case of invalid listing query parameters."),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for process deployment (per the descriptions below).
# Also serves as the base mapping copied by 'put_process_responses'.
post_processes_responses = {
    "201": OkPostProcessesResponse(examples={
        "ProcessDeployed": {
            "summary": "Process successfully deployed.",
            "value": EXAMPLES["local_process_deploy_success.json"],
        }
    }),
    "400": BadRequestResponseSchema(description="Unable to parse process definition"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "409": ConflictRequestResponseSchema(description="Process with same ID already exists."),
    "415": UnsupportedMediaTypeResponseSchema(description="Unsupported Media-Type for process deployment."),
    "422": UnprocessableEntityResponseSchema(description="Invalid schema for process definition."),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the process deployment responses, with '404' added and '405', '406', '409' replaced
# by fresh instances ('409' uses an update-specific description).
put_process_responses = {
    **post_processes_responses,
    "404": NotFoundProcessResponse(description="Process to update could not be found."),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "409": ConflictRequestResponseSchema(description="Process with same ID or version already exists."),
}
[docs]
# HTTP status code (string key) -> response schema instance for a process update (per the descriptions below).
patch_process_responses = {
    "200": OkPatchProcessResponse(),
    "400": BadRequestGetProcessInfoResponse(description="Unable to parse process definition"),
    "404": NotFoundProcessResponse(description="Process to update could not be found."),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "409": ConflictRequestResponseSchema(description="Process with same ID or version already exists."),
    "422": UnprocessableEntityResponseSchema(description="Invalid schema for process definition."),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a process description (per the examples below).
# The "200" entry carries one example per description schema variant (OGC, Old, WPS).
# Every example uses the lowercase 'summary' and 'value' keys.
get_process_responses = {
    "200": OkGetProcessInfoResponse(description="success", examples={
        "ProcessDescriptionSchemaOGC": {
            "summary": "Description of a local process registered in Weaver (OGC Schema) "
                       "with fields on top-level and using inputs/outputs as mapping with keys as IDs.",
            "value": EXAMPLES["local_process_description_ogc_api.json"],
        },
        "ProcessDescriptionSchemaOld": {
            "summary": "Description of a local process registered in Weaver (Old Schema) "
                       "with fields nested under a process section and using inputs/outputs listed with IDs.",
            "value": EXAMPLES["local_process_description.json"],
        },
        "ProcessDescriptionSchemaWPS": {
            "summary": "Description of a local process registered in Weaver (WPS Schema) when requesting XML format.",
            "value": EXAMPLES["wps_describeprocess_response.xml"],
        }
    }),
    "400": BadRequestGetProcessInfoResponse(),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a process package (per the example below).
# Also serves as the base mapping copied by 'get_provider_process_package_responses'.
get_process_package_responses = {
    "200": OkGetProcessPackageSchema(description="success", examples={
        "PackageCWL": {
            "summary": "CWL Application Package definition of the local process.",
            "value": EXAMPLES["local_process_package.json"],
        }
    }),
    "403": ForbiddenProcessAccessResponseSchema(),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a process deployment payload (per the example below).
get_process_payload_responses = {
    "200": OkGetProcessPayloadSchema(description="success", examples={
        "Payload": {
            "summary": "Payload employed during process deployment and registration.",
            "value": EXAMPLES["local_process_payload.json"],
        }
    }),
    "403": ForbiddenProcessAccessResponseSchema(),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of a process visibility (name-based; the route binding is not shown here).
get_process_visibility_responses = {
    "200": OkGetProcessVisibilitySchema(description="success"),
    "403": ForbiddenProcessAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the update of a process visibility (name-based; the route binding is not shown here).
put_process_visibility_responses = {
    "200": OkPutProcessVisibilitySchema(description="success"),
    "403": ForbiddenVisibilityUpdateResponseSchema(),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for process undeployment (per the example below).
delete_process_responses = {
    "200": OkDeleteProcessResponse(examples={
        "ProcessUndeployed": {
            "summary": "Process successfully undeployed.",
            "value": EXAMPLES["local_process_undeploy_success.json"],
        }
    }),
    "403": ForbiddenProcessAccessResponseSchema(),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for the providers listing (per the examples below).
get_providers_list_responses = {
    "200": OkGetProvidersListResponse(description="success", examples={
        "ProviderList": {
            "summary": "Listing of registered remote providers.",
            "value": EXAMPLES["provider_listing.json"],
        },
        "ProviderNames": {
            "summary": "Listing of registered providers names without validation.",
            "value": EXAMPLES["provider_names.json"],
        }
    }),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a provider description (per the example below).
get_provider_responses = {
    "200": OkGetProviderCapabilitiesSchema(description="success", examples={
        "ProviderDescription": {
            "summary": "Description of a registered remote WPS provider.",
            "value": EXAMPLES["provider_description.json"],
        }
    }),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents provider removal (name-based); success is reported with '204' and a '501' entry is included.
delete_provider_responses = {
    "204": NoContentDeleteProviderSchema(description="success"),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
    "501": NotImplementedDeleteProviderResponse(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the listing of a provider's processes (name-based; the route binding is not shown here).
get_provider_processes_responses = {
    "200": OkGetProviderProcessesSchema(description="success"),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a provider process description (per the example below).
get_provider_process_responses = {
    "200": OkGetProviderProcessDescriptionResponse(description="success", examples={
        "ProviderProcessWPS": {
            "summary": "Description of a remote WPS provider process converted to OGC-API Processes format.",
            "value": EXAMPLES["provider_process_description.json"]
        }
    }),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the process package responses, with '403' replaced by the provider access variant.
get_provider_process_package_responses = {
    **get_process_package_responses,
    "403": ForbiddenProviderAccessResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents provider registration (name-based); the '400' entry reuses the description
# of the missing-parameter OWS exception, and a '501' entry is included.
post_provider_responses = {
    "201": CreatedPostProvider(description="success"),
    "400": ExtendedMappingSchema(description=OWSMissingParameterValue.description),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
    "501": NotImplementedPostProviderResponse(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents job submission on a provider process (name-based); three success codes are listed.
post_provider_process_job_responses = {
    "200": CompletedJobResponse(),
    "201": CreatedLaunchJobResponse(),
    "204": NoContentJobResultsResponse(),
    "400": InvalidJobParametersResponse(),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "415": UnsupportedMediaTypeResponseSchema(),
    "422": UnprocessableEntityResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents job submission on a process (name-based); three success codes are listed.
# Also serves as the base mapping copied by 'post_jobs_responses' and 'post_job_results_responses'.
post_process_jobs_responses = {
    "200": CompletedJobResponse(),
    "201": CreatedLaunchJobResponse(),
    "204": NoContentJobResultsResponse(),
    "400": InvalidJobParametersResponse(),
    "403": ForbiddenProviderAccessResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "415": UnsupportedMediaTypeResponseSchema(),
    "422": UnprocessableEntityResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Independent shallow copy of the process job submission responses (schema instances are shared).
post_jobs_responses = dict(post_process_jobs_responses)
[docs]
# Derived from the process job submission responses, leaving out '201':
# job already created, therefore invalid
post_job_results_responses = {
    code: schema
    for code, schema in post_process_jobs_responses.items()
    if code != "201"
}
# alternate to '201' for async case since job already exists
post_job_results_responses["202"] = CreatedLaunchJobResponse()
[docs]
# HTTP status code (string key) -> response schema instance for the jobs listing (per the example below).
# Also serves as the base mapping copied by 'get_provider_all_jobs_responses'.
get_all_jobs_responses = {
    "200": OkGetQueriedJobsResponse(description="success", examples={
        "JobListing": {
            "summary": "Job ID listing with default queries.",
            "value": EXAMPLES["jobs_listing.json"]
        }
    }),
    "400": BadRequestResponseSchema(description="Error in case of invalid search query parameters."),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "422": UnprocessableEntityResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents batch dismissal of jobs (based on the '200' schema name); no '500' entry is declared here.
delete_jobs_responses = {
    "200": OkBatchDismissJobsResponseSchema(description="success"),
    "400": BadRequestResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "422": UnprocessableEntityResponseSchema(),
}
[docs]
# Same entries as the jobs listing responses, plus a '403' entry for the provider variant.
get_provider_all_jobs_responses = {
    **get_all_jobs_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a job status (per the examples below).
# Also serves as the base mapping copied by 'get_provider_single_job_status_responses'.
get_single_job_status_responses = {
    "200": OkGetJobStatusResponse(description="success", examples={
        "JobStatusSuccess": {
            "summary": "Successful job status response.",
            "value": EXAMPLES["job_status_success.json"],
        },
        "JobStatusFailure": {
            "summary": "Failed job status response.",
            "value": EXAMPLES["job_status_failed.json"],
        }
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the single job status responses, plus a '403' entry for the provider variant.
get_provider_single_job_status_responses = {
    **get_single_job_status_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents a job update (based on the '204' schema name).
# Also serves as the base mapping copied by 'patch_process_job_responses' and 'patch_provider_job_responses'.
patch_job_responses = {
    "204": NoContentJobUpdatedResponse(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "415": UnsupportedMediaTypeResponseSchema(),
    "422": UnprocessableEntityResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Independent shallow copy of the job update responses (schema instances are shared).
patch_process_job_responses = dict(patch_job_responses)
[docs]
# Independent shallow copy of the job update responses (schema instances are shared).
patch_provider_job_responses = dict(patch_job_responses)
[docs]
# HTTP status code (string key) -> response schema instance for a job dismissal (per the example below).
# Also serves as the base mapping copied by 'delete_provider_job_responses'.
delete_job_responses = {
    "200": OkDismissJobResponse(description="success", examples={
        "JobDismissedSuccess": {
            "summary": "Successful job dismissed response.",
            "value": EXAMPLES["job_dismissed_success.json"]
        },
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job dismissal responses, plus a '403' entry for the provider variant.
delete_provider_job_responses = {
    **delete_job_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
# Adds a '403' entry to the provider job inputs responses.
# NOTE(review): 'get_provider_inputs_responses' is not assigned anywhere in this section; confirm it is defined
# earlier in the file, otherwise this statement raises NameError when the module is imported.
get_provider_inputs_responses.update({
    "403": ForbiddenProviderLocalResponseSchema(),
})
[docs]
# HTTP status code (string key) -> response schema instance for job outputs (per the example below).
# Also serves as the base mapping copied by 'get_provider_outputs_responses'.
get_job_outputs_responses = {
    "200": OkGetJobOutputsResponse(description="success", examples={
        "JobOutputs": {
            "summary": "Obtained job outputs values following process execution.",
            "value": EXAMPLES["job_outputs.json"],
        }
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job outputs responses, plus a '403' entry for the provider variant.
get_provider_outputs_responses = {
    **get_job_outputs_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# Single-entry mapping: only a '308' redirect response is declared, from '/result' to '/results'.
get_result_redirect_responses = {
    "308": RedirectResultResponse(description="Redirects '/result' (without 's') to corresponding '/results' path."),
}
[docs]
# HTTP status code (string key) -> response schema instance for job results (per the example below).
# Both '200' and '204' are declared as success responses.
# Also serves as the base mapping copied by 'get_provider_results_responses'.
get_job_results_responses = {
    "200": OkGetJobResultsResponse(description="success", examples={
        "JobResults": {
            "summary": "Obtained job results.",
            "value": EXAMPLES["job_results.json"],
        }
    }),
    "204": NoContentJobResultsResponse(description="success"),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job results responses, plus a '403' entry for the provider variant.
get_provider_results_responses = {
    **get_job_results_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for job exceptions (per the example below).
# Also serves as the base mapping copied by 'get_provider_exceptions_responses'.
get_job_exceptions_responses = {
    "200": OkGetJobExceptionsResponse(description="success", examples={
        "JobExceptions": {
            "summary": "Job exceptions that occurred during failing process execution.",
            "value": EXAMPLES["job_exceptions.json"],
        }
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job exceptions responses, plus a '403' entry for the provider variant.
get_provider_exceptions_responses = {
    **get_job_exceptions_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for job logs (per the example below).
# Also serves as the base mapping copied by 'get_provider_logs_responses'.
get_job_logs_responses = {
    "200": OkGetJobLogsResponse(description="success", examples={
        "JobLogs": {
            "summary": "Job logs registered and captured throughout process execution.",
            "value": EXAMPLES["job_logs.json"],
        }
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job logs responses, plus a '403' entry for the provider variant.
get_provider_logs_responses = {
    **get_job_logs_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for job statistics (per the example below).
# Also serves as the base mapping copied by 'get_provider_stats_responses'.
get_job_stats_responses = {
    "200": OkGetJobStatsResponse(description="success", examples={
        "JobStatistics": {
            "summary": "Job statistics collected following process execution.",
            "value": EXAMPLES["job_statistics.json"],
        }
    }),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobResponseSchema(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# Same entries as the job statistics responses, plus a '403' entry for the provider variant.
get_provider_stats_responses = {
    **get_job_stats_responses,
    "403": ForbiddenProviderLocalResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for job provenance (per the examples below).
# The '200' entry carries one example per PROV representation (JSON, N, XML); no '405' entry is declared here.
get_job_prov_responses = {
    "200": OkGetJobProvResponse(
        description="Successful job PROV details.",
        examples={
            "PROV-JSON": {
                "summary": "Provenance details returned in PROV-JSON format.",
                "value": EXAMPLES["job_prov.json"],
            },
            "PROV-N": {
                "summary": "Provenance details returned in PROV-N format.",
                "value": EXAMPLES["job_prov.txt"],
            },
            "PROV-XML": {
                "summary": "Provenance details returned in PROV-XML format.",
                "value": EXAMPLES["job_prov.xml"],
            }
        }
    ),
    "400": InvalidJobResponseSchema(),
    "404": NotFoundJobProvResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneJobProvResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the quotes listing (name-based; the route binding is not shown here).
get_quote_list_responses = {
    "200": OkGetQuoteListResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of a single quote (name-based; the route binding is not shown here).
get_quote_responses = {
    "200": OkGetQuoteInfoResponse(description="success"),
    "404": NotFoundQuoteResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents a quote request submission (name-based); both '201' and '202' are declared.
post_quotes_responses = {
    "201": CreatedQuoteResponse(),
    "202": AcceptedQuoteResponse(),
    "400": InvalidJobParametersResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the execution of a quote (based on the '201' schema name); includes a '402' payment entry.
post_quote_responses = {
    "201": CreatedQuoteExecuteResponse(description="success"),
    "400": InvalidJobParametersResponse(),
    "402": QuotePaymentRequiredResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of a process quote estimator (name-based; the route binding is not shown here).
get_process_quote_estimator_responses = {
    "200": OkGetEstimatorResponse(description="success"),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the update of a process quote estimator (name-based; the route binding is not shown here).
put_process_quote_estimator_responses = {
    "200": OkPutEstimatorResponse(description="success"),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the removal of a process quote estimator (name-based); success is reported with '204'.
delete_process_quote_estimator_responses = {
    "204": OkDeleteEstimatorResponse(description="success"),
    "404": NotFoundProcessResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the bills listing (name-based; the route binding is not shown here).
get_bill_list_responses = {
    "200": OkGetBillListResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents retrieval of a single bill (name-based); no '404' entry is declared here.
get_bill_responses = {
    "200": OkGetBillDetailResponse(description="success"),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for a vault file upload (per the example below).
# The success entry is registered under '200'.
post_vault_responses = {
    "200": OkVaultFileUploadedResponse(description="success", examples={
        "VaultFileUploaded": {
            "summary": "File successfully uploaded to vault.",
            "value": EXAMPLES["vault_file_uploaded.json"],
        }
    }),
    "400": BadRequestVaultFileUploadResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "422": UnprocessableEntityVaultFileUploadResponse(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for vault file metadata (per the example below).
# Presumably bound to the HEAD method of the vault file route (name-based; binding not shown here).
head_vault_file_responses = {
    "200": OkVaultFileDetailResponse(description="success", examples={
        "VaultFileDetails": {
            "summary": "Obtain vault file metadata.",
            "value": EXAMPLES["vault_file_head.json"],
        }
    }),
    "400": BadRequestVaultFileAccessResponse(),
    "403": ForbiddenVaultFileDownloadResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneVaultFileDownloadResponse(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance.
# Presumably documents the download of a vault file (based on the schema names); same error codes as the
# vault file metadata responses.
get_vault_file_responses = {
    "200": OkVaultFileDownloadResponse(description="success"),
    "400": BadRequestVaultFileAccessResponse(),
    "403": ForbiddenVaultFileDownloadResponse(),
    "405": MethodNotAllowedErrorResponseSchema(),
    "406": NotAcceptableErrorResponseSchema(),
    "410": GoneVaultFileDownloadResponse(),
    "500": InternalServerErrorResponseSchema(),
}
[docs]
# HTTP status code (string key) -> response schema instance for the WPS endpoint (per the examples below).
# All error codes share the same WPS error schema; only '200' and '400' carry examples.
wps_responses = {
    "200": OkWPSResponse(examples={
        "GetCapabilities": {
            "summary": "GetCapabilities example response.",
            "value": EXAMPLES["wps_getcapabilities_response.xml"]
        },
        "DescribeProcess": {
            "summary": "DescribeProcess example response.",
            "value": EXAMPLES["wps_describeprocess_response.xml"]
        },
        "ExecuteSuccess": {
            "summary": "Successful process execute example response.",
            "value": EXAMPLES["wps_execute_response.xml"]
        },
        "ExecuteFailed": {
            "summary": "Failed process execute example response.",
            "value": EXAMPLES["wps_execute_failed_response.xml"]
        }
    }),
    "400": ErrorWPSResponse(examples={
        "MissingParameterError": {
            "summary": "Error report in case of missing request parameter.",
            "value": EXAMPLES["wps_missing_parameter_response.xml"],
        },
        "AccessForbiddenError": {
            "summary": "Error report in case of forbidden access to the service.",
            "value": EXAMPLES["wps_access_forbidden_response.xml"],
        }
    }),
    "405": ErrorWPSResponse(),
    "406": ErrorWPSResponse(),
    "500": ErrorWPSResponse(),
}
#################################################################
# Utility methods
#################################################################
[docs]
def derive_responses(responses, response_schema, status_code=200):
    # type: (Dict[str, ExtendedSchemaNode], ExtendedSchemaNode, int) -> Dict[str, ExtendedSchemaNode]
    """
    Build a variant of a responses mapping where one status code points at the given schema.

    The provided mapping is shallow-copied beforehand, so it is left untouched.

    :param responses: Mapping of status codes to response schemas.
    :param response_schema: Response schema to register in the derived mapping.
    :param status_code: Status code (converted to string) under which the schema is registered.
    :return: Derived responses mapping.
    """
    derived = copy(responses)
    derived.update({str(status_code): response_schema})
    return derived
[docs]
def datetime_interval_parser(datetime_interval):
    # type: (str) -> DatetimeIntervalType
    """
    Convert a datetime or datetime interval string into a mapping of parsed bounds.

    The forms are checked in this order, and the first match wins:

    - value starting with the open-start symbol: only ``before`` is provided
    - value ending with the open-end symbol: only ``after`` is provided
    - value containing the closed-interval symbol: ``after`` and ``before`` are provided,
      taken from the first and last parts of the split value
    - anything else: ``match`` is provided with the single parsed datetime

    :param datetime_interval: Datetime or interval string to parse.
    :return: Mapping with the ``before``, ``after`` and/or ``match`` parsed datetimes.
    """
    if datetime_interval.startswith(DATETIME_INTERVAL_OPEN_START_SYMBOL):
        upper_bound = datetime_interval.replace(DATETIME_INTERVAL_OPEN_START_SYMBOL, "")
        return {"before": date_parser.parse(upper_bound)}

    if datetime_interval.endswith(DATETIME_INTERVAL_OPEN_END_SYMBOL):
        lower_bound = datetime_interval.replace(DATETIME_INTERVAL_OPEN_END_SYMBOL, "")
        return {"after": date_parser.parse(lower_bound)}

    if DATETIME_INTERVAL_CLOSED_SYMBOL in datetime_interval:
        bounds = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL)
        return {
            "after": date_parser.parse(bounds[0]),
            "before": date_parser.parse(bounds[-1]),
        }

    return {"match": date_parser.parse(datetime_interval)}
[docs]
def validate_node_schema(schema_node, cstruct):
    # type: (ExtendedMappingSchema, JSON) -> JSON
    """
    Validate a schema node defined against a reference schema within :data:`WEAVER_SCHEMA_DIR`.

    If the reference contains an anchor (e.g.: ``#/definitions/Def``), the sub-schema of that
    reference will be used for validation against the data structure.

    The data is first deserialized by the schema node itself, then validated against the JSON schema
    loaded from the local file matching the ``_schema`` reference of the node.
    Any error raised by either step is propagated to the caller.

    :param schema_node: Schema node whose ``_schema`` attribute provides the reference schema location.
    :param cstruct: Data structure to validate.
    :return: The provided data structure, unmodified, when it is valid.
    """
    schema_node.deserialize(cstruct)
    schema_file = schema_node._schema.replace(WEAVER_SCHEMA_URL, WEAVER_SCHEMA_DIR)
    # split the optional anchor from the file location (only the first '#' separates them)
    schema_file, _, schema_ref = schema_file.partition("#")
    schema_base = schema = load_file(schema_file)
    for part in schema_ref.split("/"):
        if part:  # skip empty segments produced by leading or repeated slashes
            schema = schema[part]

    # ensure local schema can find relative $ref, since the provided reference can be a sub-schema (with "#/...")
    scheme_uri = f"file://{schema_file}" if schema_file.startswith("/") else schema_file
    validator_class = jsonschema.validators.validator_for(schema_base)
    resolver = jsonschema.RefResolver(base_uri=scheme_uri, referrer=schema_base)
    # Provide the resolver to the validator instance rather than assigning it on the validator class:
    # the class is shared by every user of that JSON schema draft, and an instance sets its own resolver
    # at creation, so a class-level assignment both leaks across calls and is not applied to the validation.
    validator_class(schema_base, resolver=resolver).validate(cstruct, schema)  # raises if invalid
    return cstruct