import copy
import json
import logging
import os
import pathlib
import warnings
from copy import deepcopy
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import TYPE_CHECKING
from urllib.parse import parse_qs, urljoin, urlparse
import colander
import docker
import yaml
from docker.errors import ImageNotFound # pylint: disable=E0611
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPConflict,
HTTPCreated,
HTTPException,
HTTPForbidden,
HTTPNotFound,
HTTPNotImplemented,
HTTPOk,
HTTPUnprocessableEntity,
HTTPUnsupportedMediaType
)
from pyramid.settings import asbool
from weaver.compat import Version
from weaver.config import (
WEAVER_CONFIG_DIR,
WEAVER_DEFAULT_WPS_PROCESSES_CONFIG,
WeaverFeature,
get_weaver_config_file,
get_weaver_configuration
)
from weaver.database import get_db
from weaver.datatype import DockerAuthentication, Process, Service
from weaver.exceptions import (
InvalidIdentifierValue,
MissingIdentifierValue,
PackageNotFound,
PackageRegistrationError,
PackageTypeError,
ProcessNotAccessible,
ProcessNotFound,
ProcessRegistrationError,
ServiceNotFound,
log_unhandled_exceptions
)
from weaver.formats import ContentType, repr_json
from weaver.processes.constants import PACKAGE_EXTENSIONS
from weaver.processes.convert import get_field, normalize_ordered_io, set_field
from weaver.processes.types import ProcessType
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
from weaver.utils import (
VersionFormat,
VersionLevel,
as_version_major_minor_patch,
fully_qualified_name,
generate_diff,
get_any_id,
get_header,
get_sane_name,
get_settings,
get_url_without_query,
is_remote_file,
is_update_version,
load_file,
parse_content_id,
request_extra,
str2bytes
)
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_client
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.processes.utils import resolve_process_tag
from weaver.wps_restapi.utils import get_wps_restapi_base_url
[docs]
LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
from typing import Any, Dict, List, Optional, Tuple, Union
from docker.client import DockerClient
from weaver.typedefs import (
URL,
AnyHeadersContainer,
AnyRegistryContainer,
AnyRequestType,
AnySettingsContainer,
AnyVersion,
CWL,
FileSystemPathType,
JSON,
Literal,
NotRequired,
Number,
ProcessDeployment,
PyramidRequest,
SettingsType,
TypedDict
)
from weaver.utils import LoggerHandler
[docs]
UpdateFieldListMethod = Literal["append", "override"]
UpdateFieldListSpec = TypedDict("UpdateFieldListSpec", {
"source": str,
"target": NotRequired[str],
"unique": NotRequired[bool],
"method": UpdateFieldListMethod,
}, total=True)
UpdateFields = List[Union[str, UpdateFieldListMethod]]
[docs]
def get_process(process_id=None, request=None, settings=None, store=None, revision=True):
# type: (Optional[str], Optional[PyramidRequest], Optional[SettingsType], Optional[StoreProcesses], bool) -> Process
"""
Obtain the specified process and validate information, returning appropriate HTTP error if invalid.
Process identifier must be provided from either the request path definition or literal ID.
Database must be retrievable from either the request, underlying settings, or direct store reference.
.. versionchanged:: 4.20
Process identifier can also be an 'id:version' tag. Also, the request query parameter 'version' can be used.
If using the :paramref:`process_id` explicitly instead of the request, a versioned :term:`Process` reference
MUST employ the tagged representation to resolve the appropriate :term:`Process` revision.
Different parameter combinations are intended to be used as needed or more appropriate, such that redundant
operations can be reduced where some objects are already fetched from previous operations.
:param process_id: Explicit :term:`Process` identifier to employ for lookup.
:param request: When no explicit ID specified, try to find information from the request.
:param settings:
Application settings for database connection. Can be guessed from local thread or request object if not given.
:param store: Database process store reference.
:param revision:
When parsing the :term:`Process` ID (either explicit or from request), indicate if any tagged revision
specifier should be used or dropped.
"""
store = store or get_db(settings or request).get_store(StoreProcesses)
try:
if process_id is None and request is not None:
process_id = resolve_process_tag(request)
if not revision:
process_id = Process.split_version(process_id)[0]
process = store.fetch_by_id(process_id, visibility=Visibility.PUBLIC)
return process
except (InvalidIdentifierValue, MissingIdentifierValue, colander.Invalid) as exc:
msg = getattr(exc, "msg", str(exc))
raise HTTPBadRequest(json={
"type": "InvalidIdentifierValue",
"title": "Process ID is invalid.",
"description": "Failed schema validation to retrieve process.",
"cause": f"Invalid schema: [{msg}]",
"error": exc.__class__.__name__,
"value": str(process_id)
})
except ProcessNotAccessible:
raise HTTPForbidden(f"Process with ID '{process_id!s}' is not accessible.")
except ProcessNotFound:
raise ProcessNotFound(json={
"title": "NoSuchProcess",
"type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-process",
"detail": sd.NotFoundProcessResponse.description,
"status": ProcessNotFound.code,
"cause": str(process_id)
})
[docs]
def map_progress(progress, range_min, range_max):
# type: (Number, Number, Number) -> Number
"""
Calculates the relative progression of the percentage process within min/max values.
"""
return max(range_min, min(range_max, range_min + (progress * (range_max - range_min)) / 100))
@log_unhandled_exceptions(logger=LOGGER, message="Unhandled error occurred during parsing of deploy payload.",
is_request=False)
[docs]
def _check_deploy(payload):
# type: (JSON) -> Union[ProcessDeployment, CWL]
"""
Validate minimum deploy payload field requirements with exception handling.
"""
message = "Process deployment definition is invalid."
try:
results = sd.Deploy(schema_meta_include=False, schema_include=False).deserialize(payload)
# Because many fields are optional during deployment to allow flexibility between compatible WPS/CWL
# definitions, any invalid field at lower-level could make a full higher-level definition to be dropped.
# Verify the result to ensure this was not the case for known cases to attempt early detection.
p_process = payload.get("processDescription", {})
r_process = results.get("processDescription", {})
if "process" in p_process:
# if process is nested, both provided/result description must align
# don't use "get_process_information" to make sure everything is retrieved under same location
p_process = p_process.get("process", {})
r_process = r_process.get("process", {})
for io_type, io_schema in [("inputs", sd.DeployInputTypeAny), ("outputs", sd.DeployOutputTypeAny)]:
p_io = p_process.get(io_type)
r_io = r_process.get(io_type)
if p_io and p_io != r_io:
message = f"Process deployment {io_type} definition is invalid."
# try raising sub-schema to have specific reason
d_io = io_schema(name=io_type).deserialize(p_io)
# Raise directly if we were unable to detect the cause, but there is something incorrectly dropped.
# Only raise if indirect vs direct deserialize differ such that auto-resolved defaults omitted from
# submitted process I/O or unknowns fields that were correctly ignored don't cause false-positive diffs.
if r_io != d_io:
message = (
f"Process deployment {p_io} definition resolved as valid schema "
f"but differ from submitted values. "
f"Validate provided {p_io} against resolved {p_io} with schemas "
f"to avoid mismatching definitions."
)
raise HTTPBadRequest(json={
"title": message,
"cause": "unknown",
"error": "Invalid",
"value": d_io
})
LOGGER.warning(
"Detected difference between original/parsed deploy %s, but no invalid schema:\n%s",
io_type, generate_diff(p_io, r_io, val_name="original payload", ref_name="parsed result")
)
# Execution Unit is optional since process reference (e.g.: WPS-1 href) can be provided in processDescription
# Cannot validate as CWL yet, since execution unit can also be an href that is not yet fetched (it will later)
p_exec_unit = payload.get("executionUnit", [{}])
r_exec_unit = results.get("executionUnit", [{}])
if p_exec_unit and p_exec_unit != r_exec_unit:
message = "Process deployment execution unit is invalid."
d_exec_unit = sd.ExecutionUnitVariations(
schema_meta_include=False,
schema_include=False,
).deserialize(p_exec_unit) # raises directly if caused by invalid schema
if r_exec_unit != d_exec_unit: # otherwise raise a generic error, don't allow differing definitions
message = (
"Process deployment execution unit resolved as valid definition but differs from submitted "
"package. Aborting deployment to avoid mismatching package definitions."
)
raise HTTPBadRequest(json={
"title": message,
"cause": "unknown",
"error": PackageRegistrationError.__name__,
"value": d_exec_unit
})
LOGGER.warning(
"Detected difference between original/parsed deploy execution unit, but no invalid schema:\n%s",
generate_diff(p_exec_unit, r_exec_unit, val_name="original payload", ref_name="parsed result")
)
return results
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as exc:
LOGGER.debug("Failed deploy body schema validation:\n%s", exc)
raise HTTPBadRequest(json={
"title": message,
"detail": f"Invalid schema: [{exc.msg!s}]",
"error": exc.__class__.__name__,
"cause": exc.asdict(),
"value": repr_json(exc.value),
})
@log_unhandled_exceptions(logger=LOGGER, message="Unhandled error occurred during parsing of process definition.",
is_request=False)
[docs]
def _validate_deploy_process_info(process_info, reference, package, settings, headers):
# type: (JSON, Optional[str], Optional[CWL], SettingsType, Optional[AnyHeadersContainer]) -> JSON
"""
Obtain the process definition from deploy payload with exception handling.
.. seealso::
- :func:`weaver.processes.wps_package.get_process_definition`
"""
from weaver.processes.wps_package import check_package_instance_compatible, get_process_definition
try:
# data_source `None` forces workflow process to search locally for deployed step applications
info = get_process_definition(
process_info,
reference,
package,
data_source=None,
headers=headers,
container=settings,
)
# validate process type and package against weaver configuration
cfg = get_weaver_configuration(settings)
if cfg not in WeaverFeature.REMOTE:
problem = check_package_instance_compatible(info["package"])
if problem:
proc_type = info["type"]
raise HTTPForbidden(json={
"description": (
f"Invalid process deployment of type [{proc_type}] on [{cfg}] instance. "
"Remote execution is required but not supported."
),
"cause": problem
})
return info
except PackageNotFound as ex:
# raised when a workflow sub-process is not found (not deployed locally)
raise HTTPNotFound(detail=str(ex))
except InvalidIdentifierValue as ex:
raise HTTPBadRequest(str(ex))
except (PackageRegistrationError, PackageTypeError) as ex:
msg = f"Invalid package/reference definition. Loading generated error: [{ex!s}]"
LOGGER.exception(msg)
raise HTTPUnprocessableEntity(detail=msg)
[docs]
def resolve_cwl_graph(package):
# type: (CWL) -> Union[CWL, Tuple[List[CWL], CWL]]
"""
Resolve :term:`CWL` ``$graph`` into deployable packages.
:returns:
- Single :term:`CWL` ``dict`` if no ``$graph`` or ``$graph`` with 1 item (backward compatible)
- ``tuple`` of (``list`` of :term:`CWL` ``dict`` items, original package with ``$graph``) if multiple items
.. seealso::
- `crim-ca/weaver#56 <https://github.com/crim-ca/weaver/issues/56>`_
- `CWL Packed Documents <https://www.commonwl.org/v1.2/CommandLineTool.html#Packed_documents>`_
"""
if "$graph" not in package:
return package
graph_items = package.get("$graph", [])
if not isinstance(graph_items, list):
return package
if len(graph_items) == 1:
# Single item: unpack as before (backward compatible)
cwl_base = {k: v for k, v in package.items() if k != "$graph"}
cwl_base.update(graph_items[0])
return cwl_base
# Multiple items: return list with original package for workflow reference resolution
# Each item inherits top-level fields (e.g., cwlVersion)
cwl_base = {k: v for k, v in package.items() if k != "$graph"}
resolved_items = []
for item in graph_items:
cwl_item = deepcopy(cwl_base)
cwl_item.update(item)
resolved_items.append(cwl_item)
return resolved_items, package
[docs]
def resolve_deployment_order(cwl_packages):
# type: (List[CWL]) -> Tuple[List[CWL], Optional[CWL]]
"""
Determine deployment order for multiple :term:`CWL` packages.
:param cwl_packages: ``list`` of :term:`CWL` package definitions to order.
:returns:
``tuple`` of (dependencies, main_workflow)
- dependencies: ``list`` of ``CommandLineTool``/``ExpressionTool`` to deploy first
- main_workflow: The main ``Workflow`` (if any) to deploy last, or ``None``
:raises HTTPNotImplemented: If multiple ``Workflow`` definitions are provided.
:raises HTTPBadRequest: If multiple tools without a ``Workflow`` and no ``#main`` entry point.
.. seealso::
- `CWL Packed Documents <https://www.commonwl.org/v1.2/CommandLineTool.html#Packed_documents>`_
"""
workflows = []
tools = []
for pkg in cwl_packages:
cwl_class = pkg.get("class", "")
if cwl_class == "Workflow":
workflows.append(pkg)
elif cwl_class in ["CommandLineTool", "ExpressionTool"]:
tools.append(pkg)
# FIXME: Temporarily require at least one Workflow in multi-CWL deployments.
# See: https://github.com/crim-ca/weaver/issues/171
# If multiple sub-Workflow do not work directly, keep this limit.
# Otherwise, allow tool-only deployments and demonstrate multi-workflow deployment.
if len(workflows) > 1:
raise HTTPNotImplemented(json={
"title": "Multiple Workflow definitions in $graph.",
"description": "Only one top-level Workflow is supported per deployment.",
"cause": {"workflow_count": len(workflows)},
"value": [wf.get("id") for wf in workflows]
})
main_tool = None
if len(cwl_packages) > 1:
# Check for duplicate #main if explicitly used
main_items = [pkg for pkg in cwl_packages if pkg.get("id") == "#main"]
if len(main_items) > 1:
raise HTTPBadRequest(json={
"title": "Duplicate #main entry point in $graph.",
"description": (
"Only one item in $graph can have id '#main' as the entry point."
),
"cause": {"main_count": len(main_items)},
"value": [item.get("class") for item in main_items]
})
# If Workflow exists, it's the main entry point (regardless of #main designation)
if len(workflows) > 0:
main_tool = None
elif len(tools) > 1:
# Multiple tools without Workflow: NOT ALLOWED (even with #main)
main_tool_found = next((t for t in tools if t.get("id") == "#main"), None)
raise HTTPBadRequest(json={
"title": "No entry point in $graph.",
"description": (
"Multi-CWL deployment with multiple tools requires a Workflow as the entry point, "
"according to CWL packed document specification. Multiple CommandLineTools or "
"ExpressionTools without a Workflow are not supported, even with #main designation."
),
"cause": {"workflow_count": 0, "tool_count": len(tools), "main_found": bool(main_tool_found)},
"value": [tool.get("id") for tool in tools]
})
elif len(tools) == 1:
# Single tool: it's implicitly the main entry point
main_tool = tools[0]
main_workflow = workflows[0] if workflows else main_tool
return tools, main_workflow
[docs]
def resolve_multi_execution_units(execution_units):
# type: (List[JSON]) -> List[CWL]
"""
Extract all :term:`CWL` packages from multiple execution units.
Each execution unit can contain either an inline package (``unit``) or a reference (``href``)
to fetch the package from a remote location.
:param execution_units:
``list`` of execution unit definitions.
Each unit must have exactly one of ``unit`` or ``href`` (enforced by schema validation).
:returns: ``list`` of resolved :term:`CWL` package definitions.
:raises HTTPBadRequest: If unable to fetch a remote execution unit reference.
"""
packages = []
for idx, execution_unit in enumerate(execution_units):
unit_package = execution_unit.get("unit")
unit_reference = execution_unit.get("href")
if unit_package:
packages.append(unit_package)
elif unit_reference:
try:
# To avoid circular dependencies
from weaver.processes.wps_package import _generate_process_with_cwl_from_reference
cwl_pkg, _ = _generate_process_with_cwl_from_reference(unit_reference)
packages.append(cwl_pkg)
except Exception as exc:
raise HTTPBadRequest(json={
"title": "Failed to fetch execution unit reference",
"description": f"Could not retrieve CWL from href at index {idx}: {unit_reference}",
"cause": {"error": str(exc), "href": unit_reference, "index": idx}
})
return packages
[docs]
def _get_multipart_content(content, request):
# type: (Union[str, bytes], Optional[AnyRequestType]) -> bytes
"""
Get raw multipart content as ``bytes``. """
if request is not None and hasattr(request, 'body'):
return request.body
try:
return str2bytes(content)
except TypeError as exc:
raise HTTPBadRequest(f"Invalid multipart content format: {exc}")
[docs]
def create_multipart_deploy(cwl_files, url, process_description=None, boundary=None):
# type: (List[Union[str, CWL]], URL, Optional[JSON], Optional[str]) -> Tuple[bytes, str]
"""
Create ``multipart/related`` deployment content from a ``list`` of :term:`CWL` files.
:param cwl_files:
``list`` of :term:`CWL` files. Each item can be:
- A file path (``str``) to a :term:`CWL` file (will be loaded)
- A :term:`CWL` ``dict`` (already parsed)
:param url: Domain or hostname for ``Content-ID`` header generation
:param process_description: Optional :term:`Process` description metadata to include
:param boundary: Optional custom ``boundary`` ``str`` (auto-generated if not provided)
:returns:
``tuple`` of (multipart content ``bytes``, full ``Content-Type`` header with ``boundary``)
"""
if not cwl_files:
raise ValueError("At least one CWL file must be provided")
msg = MIMEMultipart("related", boundary=boundary)
main_workflow_cid = None
workflow_count = 0
# Add CWL parts
for idx, cwl_item in enumerate(cwl_files):
if isinstance(cwl_item, str):
cwl_data = load_file(cwl_item)
else:
cwl_data = cwl_item
cwl_class = cwl_data.get("class", "")
is_workflow = cwl_class == "Workflow"
cwl_id = cwl_data.get("id")
if not cwl_id:
raise ValueError(
f"CWL package at index {idx} missing required 'id' field for multipart deployment. "
f"Each CWL package must have an 'id' to be used as Content-ID."
)
if is_workflow:
workflow_count += 1
if workflow_count == 1:
# First workflow becomes the main one
main_workflow_cid = cwl_id
# Create the CWL part
cwl_json = json.dumps(cwl_data, indent=2)
part = MIMEText(cwl_json, _subtype="json", _charset="utf-8")
# Replace the default Content-Type (text/json) with the CWL-specific one
part.replace_header("Content-Type", ContentType.APP_CWL_JSON)
part.add_header("Content-ID", f"<{cwl_id}@{url}>")
part.add_header("Content-Location", cwl_id)
msg.attach(part)
# Add process description if provided
if process_description:
desc_json = json.dumps(process_description, indent=2)
desc_part = MIMEText(desc_json, _subtype="json", _charset="utf-8")
# Replace the default Content-Type (text/json) with application/json
desc_part.replace_header("Content-Type", ContentType.APP_JSON)
desc_part.add_header("Content-ID", "<process-description>")
msg.attach(desc_part)
multipart_content = msg.as_bytes()
# Build the Content-Type header with start parameter if we have a main workflow
# Extract the boundary from the message (auto-generated if not explicitly provided)
boundary = msg.get_boundary()
content_type = f"multipart/related; boundary={boundary}"
if main_workflow_cid:
content_type += f"; start={main_workflow_cid}"
return multipart_content, content_type
[docs]
def _classify_multipart_part(part_data, cwl_packages, parts_by_cid, content_id, process_description):
# type: (JSON, List[CWL], Dict[str, CWL], str, Optional[JSON]) -> Optional[JSON]
"""
Classify parsed multipart part as CWL package or process description.
Returns updated process_description if applicable.
"""
if not isinstance(part_data, dict):
return process_description
if "class" in part_data and part_data["class"] in ["CommandLineTool", "Workflow", "ExpressionTool"]:
cwl_packages.append(part_data)
if content_id:
parts_by_cid[content_id] = part_data
return process_description
if "cwlVersion" in part_data and "$graph" in part_data:
cwl_packages.append(part_data)
if content_id:
parts_by_cid[content_id] = part_data
return process_description
if "processDescription" in part_data or "process" in part_data:
if process_description is not None:
LOGGER.warning("Multiple process descriptions found in multipart, using first one")
return process_description
return part_data
if any(k in part_data for k in ["inputs", "outputs", "baseCommand", "steps"]):
cwl_packages.append(part_data)
if content_id:
parts_by_cid[content_id] = part_data
return process_description
# Unknown part type - use as process description if we don't have one yet
return process_description or part_data
[docs]
def _validate_and_reorder_multipart_workflow(cwl_packages, root_workflow_cid, parts_by_cid):
# type: (List[CWL], Optional[str], Dict[str, CWL]) -> List[CWL]
"""
Validate and reorder :term:`CWL` packages based on root workflow reference.
Validates that the root document (specified by ``start`` parameter or first element) is a ``Workflow``
as per RFC 5621 requirements for ``multipart/related``.
:param cwl_packages: ``list`` of :term:`CWL` packages extracted from multipart content
:param root_workflow_cid: Content-ID of the root workflow from ``start`` parameter (if provided)
:param parts_by_cid: ``dict`` mapping Content-IDs to :term:`CWL` packages
:returns: Reordered ``list`` of :term:`CWL` packages with root workflow last
:raises HTTPBadRequest: If ``start`` parameter references a non-Workflow :term:`CWL`
"""
if root_workflow_cid and root_workflow_cid in parts_by_cid:
root_pkg = parts_by_cid[root_workflow_cid]
# Validate that the root is actually a Workflow (per RFC 5621 and multipart/related requirements)
root_class = root_pkg.get("class", "")
if root_class != "Workflow":
raise HTTPBadRequest(json={
"title": "Invalid root workflow reference",
"description": (
f"The 'start' parameter references a CWL with class '{root_class}', "
"but only 'Workflow' is permitted as root document in multipart/related."
),
"cause": {"Content-ID": root_workflow_cid, "class": root_class}
})
cwl_packages = [pkg for pkg in cwl_packages if pkg is not root_pkg]
cwl_packages.append(root_pkg)
elif not root_workflow_cid and cwl_packages:
# No explicit start parameter: validate first element is a Workflow (RFC 5621 §7 default)
first_pkg = cwl_packages[0]
first_class = first_pkg.get("class", "")
if first_class and first_class != "Workflow":
LOGGER.warning(
"No 'start' parameter provided in multipart/related. First element has class '%s' "
"but 'Workflow' is recommended for root document. Proceeding with deployment.",
first_class
)
return cwl_packages
[docs]
def _fetch_multipart_content_location(content_location, part_content, request=None):
# type: (str, str, Optional[AnyRequestType]) -> str
"""
Fetch :term:`CWL` content from ``Content-Location`` header if part body is empty.
Checks if part body is empty and ``Content-Location`` is a URL, then fetches the content from that location.
``Content-Location`` can be a static :term:`CWL` file URL or an API endpoint (Weaver, WPS, OGC API).
Relative URLs are resolved against the base API URL.
:param content_location: ``Content-Location`` header value (absolute or relative URL)
:param part_content: Current part content (may be empty)
:param request: Optional request object for resolving relative URLs
:returns: Updated part content (either original or fetched from ``Content-Location``)
:raises HTTPBadRequest: If fetching from ``Content-Location`` fails
"""
# Only fetch if part body is empty AND Content-Location is provided
if not content_location or (part_content and part_content.strip()):
return part_content
# Resolve relative URLs using base API URL
absolute_url = content_location
if not any(content_location.startswith(scheme) for scheme in ["http://", "https://", "file://", "s3://"]):
# Relative URL - resolve against base API URL
if request:
base_url = get_wps_restapi_base_url(request)
absolute_url = urljoin(f"{base_url}/", content_location)
LOGGER.debug(
"Resolving relative Content-Location [%s] to absolute URL [%s]",
content_location, absolute_url
)
else:
# No request context to resolve relative URL
LOGGER.error(
"Content-Location [%s] is relative but no request context available to resolve it.",
content_location
)
raise HTTPBadRequest(json={
"title": "Invalid Content-Location",
"description": (
f"Content-Location [{content_location}] is a relative URL but cannot be resolved "
f"without request context."
),
"cause": {"location": content_location}
})
LOGGER.debug("Fetching CWL from Content-Location: %s", absolute_url)
try:
# Content-Location can be:
# 1. A static CWL file URL (http/https/s3/file)
# 2. A Weaver process package endpoint (/processes/{pid}/package)
# 3. A WPS process endpoint (/wps?request=DescribeProcess&identifier=...)
# 4. An OGC API Processes endpoint
_, ext = os.path.splitext(absolute_url.split('?')[0]) # strip query params
if ext.replace('.', '') in PACKAGE_EXTENSIONS:
return load_file(absolute_url, text=True)
# Could be an API endpoint (Weaver, WPS, OGC API)
# Use the process definition resolver that handles all reference types
from weaver.processes.wps_package import _generate_process_with_cwl_from_reference
cwl_pkg, _ = _generate_process_with_cwl_from_reference(absolute_url)
return json.dumps(cwl_pkg) if isinstance(cwl_pkg, dict) else str(cwl_pkg)
except Exception as exc:
LOGGER.error("Failed to fetch content from Content-Location %s: %s", absolute_url, exc)
raise HTTPBadRequest(json={
"title": "Failed to fetch Content-Location",
"description": f"Could not retrieve CWL from Content-Location: {content_location}",
"cause": {"error": str(exc), "Content-Location": content_location, "location": absolute_url}
})
[docs]
def _parse_multipart_message(content, content_type, request=None):
# type: (Union[str, bytes], str, Optional[AnyRequestType]) -> Any
"""
Parse raw multipart content into an email.message.Message object.
:param content: Raw multipart content (``str`` or ``bytes``)
:param content_type: ``Content-Type`` header value (must include ``boundary`` parameter)
:param request: Optional request object for extracting body
:returns: Parsed multipart message object
:raises HTTPBadRequest: If multipart content is malformed
"""
raw_content = _get_multipart_content(content, request)
msg_bytes = b"Content-Type: " + content_type.encode('utf-8') + b"\r\n\r\n" + raw_content
try:
msg = message_from_bytes(msg_bytes)
except Exception as exc:
raise HTTPBadRequest(json={
"title": "Failed to parse multipart content",
"description": str(exc),
"cause": {"error": exc.__class__.__name__}
})
if not msg.is_multipart():
raise HTTPBadRequest("Content is not multipart format")
return msg
[docs]
def _interpret_multipart_part(part, request=None):
# type: (Any, Optional[AnyRequestType]) -> Optional[Tuple[str, str, str, JSON]]
"""
Interpret a single multipart part: decode, fetch content if needed, and parse.
:param part: Single part from multipart message
:param request: Optional request object for resolving relative ``Content-Location`` URLs
:returns:
``tuple`` of (content_type, content_id, content_location, parsed_data) or ``None`` if part cannot be parsed
"""
part_content_type = part.get_content_type()
content_id = part.get('Content-ID', '').strip('<>')
content_location = part.get('Content-Location', '').strip()
# Decode part content
part_content = part.get_payload(decode=True)
if isinstance(part_content, bytes):
charset = part.get_content_charset() or 'utf-8'
try:
part_content = part_content.decode(charset)
except (UnicodeDecodeError, LookupError):
part_content = part_content.decode('utf-8', errors='replace')
# Fetch from Content-Location if part body is empty
part_content = _fetch_multipart_content_location(content_location, part_content, request)
# Parse JSON/YAML if applicable
if (part_content_type in ContentType.ANY_CWL or
part_content_type in [ContentType.APP_JSON, ContentType.APP_YAML]):
try:
part_data = yaml.safe_load(part_content)
return part_content_type, content_id, content_location, part_data
except Exception as exc:
LOGGER.warning("Failed to parse part with Content-Type %s: %s", part_content_type, exc)
return None
return None
[docs]
def _organize_deploy_parts(interpreted_parts, root_workflow_cid):
# type: (List[Tuple[str, str, str, JSON]], Optional[str]) -> Tuple[List[CWL], Optional[JSON]]
"""
Organize interpreted multipart parts into CWL packages and process description.
:param interpreted_parts: ``list`` of interpreted parts (content_type, content_id, content_location, data)
:param root_workflow_cid: Content-ID of root workflow from ``start`` parameter (if any)
:returns: ``tuple`` of (``list`` of :term:`CWL` packages, optional process description)
:raises HTTPBadRequest: If no CWL packages found or root workflow validation fails
"""
cwl_packages = []
process_description = None
parts_by_cid = {}
for _, content_id, _, part_data in interpreted_parts:
process_description = _classify_multipart_part(
part_data, cwl_packages, parts_by_cid, content_id, process_description
)
if not cwl_packages:
raise HTTPBadRequest(json={
"title": "No CWL packages found in multipart content",
"description": "Multipart request must contain at least one CWL package part.",
"cause": {"parts_found": len(interpreted_parts)}
})
cwl_packages = _validate_and_reorder_multipart_workflow(cwl_packages, root_workflow_cid, parts_by_cid)
return cwl_packages, process_description
[docs]
def parse_multipart_deploy(content, content_type, request=None):
# type: (Union[str, bytes], str, Optional[AnyRequestType]) -> Tuple[List[CWL], Optional[JSON]]
"""
Parse ``multipart/mixed`` or ``multipart/related`` deployment content.
Extracts :term:`CWL` packages and optional :term:`Process` description from multipart request.
:param content: Raw multipart content (``str`` or ``bytes``)
:param content_type: ``Content-Type`` header value (must include ``boundary`` parameter)
:param request: Optional request object for extracting body
:returns: ``tuple`` of (``list`` of :term:`CWL` packages, optional :term:`Process` description metadata)
:raises HTTPBadRequest: If multipart content is malformed or invalid
"""
msg = _parse_multipart_message(content, content_type, request)
root_workflow_cid = _extract_multipart_start_parameter(content_type)
interpreted_parts = []
for part in msg.get_payload():
interpreted = _interpret_multipart_part(part, request)
if interpreted:
interpreted_parts.append(interpreted)
return _organize_deploy_parts(interpreted_parts, root_workflow_cid)
[docs]
def parse_process_deploy_content(
request=None, # type: Optional[AnyRequestType]
content=None, # type: Optional[Union[JSON, str]]
content_schema=None, # type: Optional[colander.SchemaNode]
content_type=sd.RequestContentTypeHeader.default, # type: Optional[ContentType]
content_type_schema=sd.RequestContentTypeHeader, # type: Optional[colander.SchemaNode]
): # type: (...) -> Union[JSON, CWL]
"""
Load the request content with validation of expected content type and their schema.
"""
# Get full Content-Type header (including parameters like boundary) from request if available
if request is not None:
# Use get_header to get the full header value with parameters, not request.content_type which strips them
request_headers = getattr(request, 'headers', {})
full_content_type = get_header("Content-Type", request_headers)
if full_content_type:
content_type = full_content_type
# If content is already a parsed dict (from recursive calls), skip parsing
if isinstance(content, dict):
pass # Content already parsed, proceed to validation
elif content_type and any(mt in content_type.lower() for mt in ContentType.ANY_MULTIPART):
LOGGER.info("Detected multipart deployment request")
cwl_packages, _ = parse_multipart_deploy(
content=content if content is not None else request.body,
content_type=content_type,
request=request
)
if len(cwl_packages) == 1:
# Single CWL package - use as-is
# This avoids double-wrapping if the package already contains a $graph
content = cwl_packages[0]
else:
# Multiple packages - return as list to be wrapped in $graph
content = cwl_packages
# Content is now parsed, skip to validation
else:
# Non-multipart content: parse from request or string
try:
if request is not None:
content = request.text
content_type = request.content_type
if content_type is not None and content_type_schema is not None:
content_type = content_type_schema().deserialize(content_type)
if isinstance(content, str):
content = yaml.safe_load(content)
if not isinstance(content, dict):
raise TypeError("Not a valid JSON body for process deployment.")
except colander.Invalid as exc:
raise HTTPUnsupportedMediaType(json={
"title": "Unsupported Media Type",
"type": "http://www.opengis.net/def/exceptions/ogcapi-processes-2/1.0/unsupported-media-type",
"detail": str(exc),
"status": HTTPUnsupportedMediaType.code,
"cause": {"Content-Type": None if content_type is None else str(content_type)},
})
except Exception as exc:
raise HTTPBadRequest(json={
"title": "Bad Request",
"type": "BadRequest",
"detail": "Unable to parse contents.",
"status": HTTPBadRequest.code,
"cause": str(exc),
})
# Validate content schema (applies to both multipart and non-multipart)
try:
if content_schema is not None:
content = content_schema().deserialize(content)
except colander.Invalid as exc:
raise HTTPUnprocessableEntity(json={
"type": "InvalidParameterValue",
"title": "Failed schema validation.",
"status": HTTPUnprocessableEntity.code,
"error": colander.Invalid.__name__,
"cause": exc.msg,
"value": repr_json(exc.value, force_string=False),
})
return content
[docs]
def deploy_process_from_payload(payload, container, overwrite=False): # pylint: disable=R1260,too-complex
# type: (Union[JSON, str], Union[AnySettingsContainer, AnyRequestType], Union[bool, Process]) -> HTTPException
"""
Deploy the process after resolution of all references and validation of the parameters from payload definition.
Adds a :class:`weaver.datatype.Process` instance to storage using the provided JSON ``payload``
matching :class:`weaver.wps_restapi.swagger_definitions.ProcessDescription`.
:param payload: JSON payload that was specified during the process deployment request.
:param container:
Container to retrieve application settings.
If it is a ``request``-like object, additional parameters may be used to identify the payload schema.
:param overwrite:
In case of a pure deployment (from scratch), indicates (using :class:`bool`) whether to allow override of
an existing process definition if conflict occurs. No versioning is applied in this case (full replacement).
In case of an update deployment (from previous), indicates which process to be replaced with updated version.
The new version should not conflict with another existing process version. If payload doesn't provide a new
version, the following `MAJOR` version from the specified overwrite process is used to define the new revision.
:returns: HTTPOk if the process registration was successful.
:raises HTTPException: for any invalid process deployment step.
"""
headers = getattr(container, "headers", {}) # container is any request (as when called from API Deploy request)
c_type_full = get_header("Content-Type", headers) or ContentType.APP_OGC_PKG_JSON
# use deepcopy of to remove any circular dependencies before writing to mongodb or any updates to the payload
# For multipart requests, we need to pass the request object to access the raw body
payload = parse_process_deploy_content(
request=container if hasattr(container, 'body') else None,
content=payload,
content_type=c_type_full, # Pass full Content-Type with parameters (e.g., boundary)
content_type_schema=sd.DeployContentType,
)
# Extract process ID from Content-ID header if provided (RFC 2392)
content_id_header = get_header("Content-ID", headers)
resource_id = None
if content_id_header:
try:
resource_id, _ = parse_content_id(content_id_header)
except ValueError as exc:
raise HTTPBadRequest(json={
"title": "Invalid Content-ID header format",
"description": (
f"Content-ID header does not conform to RFC 2392 format: {exc}. "
"Expected format: '<resource-id@context>'"
),
"cause": {"Content-ID": content_id_header, "error": str(exc)},
})
if resource_id:
# Extract process ID from payload - check multiple possible locations
payload_id = payload.get("id") # Direct CWL or top-level
if not payload_id and "processDescription" in payload:
process_desc = payload.get("processDescription", {})
# OLD schema: processDescription.process.id
if "process" in process_desc and isinstance(process_desc["process"], dict):
payload_id = process_desc["process"].get("id")
# OGC schema: processDescription.id
elif "id" in process_desc:
payload_id = process_desc.get("id")
# Validate Content-ID matches payload ID
if payload_id and payload_id != resource_id:
raise HTTPUnprocessableEntity(json={
"title": "Content-ID header mismatch",
"description": (
f"Content-ID header resource [{resource_id}] does not match payload id [{payload_id}]. "
"Please ensure they match or omit the Content-ID header."
),
"cause": {"Content-ID": content_id_header, "payload_id": payload_id},
})
# For multipart/list payload, skip validation and go directly to multi-CWL deployment
# Each individual CWL package will be validated during recursive deployment
# WARNING: Multi-CWL deployment is NOT atomic. If deployment fails midway, previously deployed
# tools remain in the database without rollback. Retry attempts will skip already-deployed tools.
if isinstance(payload, list):
LOGGER.info("Detected multi-CWL deployment (multipart) with %d packages", len(payload))
return _deploy_process_multi_cwl(payload, container, overwrite)
payload_copy = deepcopy(payload)
payload = _check_deploy(payload)
# Remove schema fields regardless of deployment type
payload.pop("$schema", None)
payload.pop("$id", None)
# validate identifier naming for unsupported characters
process_desc = payload.get("processDescription", {}) # empty possible if CWL directly passed
process_info = process_desc.get("process", process_desc)
process_href = process_desc.pop("href", None) or payload.get("process", None)
process_href = process_href if isinstance(process_href, str) else None
if "process" in process_desc:
process_param = "processDescription.process"
elif process_href and "process" in payload:
process_param = "process"
payload.pop("process")
process_info = payload
else:
process_param = "processDescription"
# retrieve CWL package definition, either via "href" (WPS-1/2), "owsContext" or "executionUnit" (package/reference)
deployment_profile_name = payload.get("deploymentProfileName", "")
ows_context = process_info.pop("owsContext", None)
reference = None
package = None
found = False
# Detect direct CWL format (vs OGC deployment wrapper with processDescription/executionUnit).
# Recursion is safe: resolve_cwl_graph() unpacks $graph items before recursive calls.
if "cwlVersion" in payload:
# Extract and remove Weaver-specific 'version' field before CWL processing.
# The 'version' field is a Weaver extension for process versioning and is not part of the CWL standard.
# Leaving it in would cause CWL schema validation errors downstream.
process_info = {"version": payload.pop("version", None)}
package = resolve_cwl_graph(payload)
found = True
elif process_href:
reference = process_href # reference type handled downstream
found = isinstance(reference, str)
elif isinstance(ows_context, dict):
offering = ows_context.get("offering")
if not isinstance(offering, dict):
raise HTTPUnprocessableEntity(f"Invalid parameter '{process_param}.owsContext.offering'.")
content = offering.get("content")
if not isinstance(content, dict):
raise HTTPUnprocessableEntity(f"Invalid parameter '{process_param}.owsContext.offering.content'.")
package = None
reference = content.get("href")
found = isinstance(reference, str)
else: # ogc-apppkg type, but no explicit check since used by default (backward compat)
if deployment_profile_name: # optional hint
allowed_profile_suffix = [ProcessType.APPLICATION, ProcessType.WORKFLOW]
if not any(deployment_profile_name.lower().endswith(typ) for typ in allowed_profile_suffix):
raise HTTPBadRequest("Invalid value for parameter 'deploymentProfileName'.")
execution_units = payload.get("executionUnit")
if isinstance(execution_units, dict):
if "unit" not in execution_units and "href" not in execution_units:
execution_units = {"unit": execution_units}
execution_units = [execution_units]
if (not isinstance(execution_units, list) or len(execution_units) < 1 or
not isinstance(execution_units[0], dict)):
raise HTTPUnprocessableEntity("Invalid parameter 'executionUnit'.")
if len(execution_units) == 1:
execution_unit = execution_units[0]
package = execution_unit.get("unit")
reference = execution_unit.get("href")
found = package or reference
else:
# Multiple execution units: extract all packages (inline or fetch from references)
package = resolve_multi_execution_units(execution_units)
reference = None
found = True
if not found:
params = [
"process (href)",
"processDescription.process.href",
"processDescription.process.owsContext.content.href",
"processDescription.href",
"processDescription.owsContext.content.href",
"executionUnit[*].(unit|href)",
"{ <CWL> }",
]
raise HTTPBadRequest(
f"Missing one of required parameters {params} to obtain package/process definition or reference."
)
if process_info.get("type", "") == ProcessType.BUILTIN:
raise HTTPBadRequest(
f"Invalid process type resolved from package: [{ProcessType.BUILTIN}]. "
f"Deployment of {ProcessType.BUILTIN} process is not allowed."
)
settings = get_settings(container)
# Handle multi-CWL deployment from $graph
if isinstance(package, tuple):
# package is (list_of_cwls, original_payload_with_graph)
package, _ = package
if isinstance(package, list):
return _deploy_process_multi_cwl(package, container, overwrite)
# update and validate process information using WPS process offering, CWL/WPS reference or CWL package definition
process_info = _validate_deploy_process_info(process_info, reference, package, settings, headers)
restapi_url = get_wps_restapi_base_url(settings)
description_url = "/".join([restapi_url, "processes", process_info["identifier"]])
execute_endpoint = "/".join([description_url, "jobs"])
# ensure that required "processEndpointWPS1" in db is added,
# will be auto-fixed to localhost if not specified in body
process_info["processEndpointWPS1"] = process_desc.get("processEndpointWPS1")
process_info["executeEndpoint"] = execute_endpoint
process_info["payload"] = payload_copy
process_info["jobControlOptions"] = process_desc.get("jobControlOptions", [])
process_info["outputTransmission"] = process_desc.get("outputTransmission", [])
process_info["processDescriptionURL"] = description_url
# insert the "resolved" context using details retrieved from "executionUnit"/"href" or directly with "owsContext"
if "owsContext" not in process_info and reference:
process_info["owsContext"] = {"offering": {"content": {"href": str(reference)}}}
elif isinstance(ows_context, dict):
process_info["owsContext"] = ows_context
# if user provided additional links that have valid schema,
# process them separately since links are generated dynamically from API settings per process
# don't leave them there as they would be seen as if the 'Process' class generated the field
if "links" in process_info:
process_info["additional_links"] = process_info.pop("links")
# remove schema to avoid later deserialization error if different, but remaining content is valid
# also, avoid storing this field in the process object, regenerate it as needed during responses
process_info.pop("$schema", None)
process_info.pop("$id", None)
try:
process = Process(process_info) # if 'version' was provided in deploy info, it will be added as hint here
if isinstance(overwrite, Process):
process_summary = _update_deploy_process_version(process, overwrite, VersionLevel.MAJOR, container)
else:
process_summary = _save_deploy_process(process, overwrite, container)
except ValueError as exc:
LOGGER.error("Failed schema validation of deployed process summary:\n%s", exc)
raise HTTPBadRequest(detail=str(exc))
except HTTPException:
raise
links = process.links(container)
loc_url = next(link["href"] for link in links if link["rel"] == "self")
process_summary["links"] = links
data = {
"description": sd.OkPostProcessesResponse.description,
"processSummary": process_summary,
"deploymentDone": True,
}
if deployment_profile_name:
data["deploymentProfileName"] = deployment_profile_name
headers = {
"Content-Type": ContentType.APP_JSON,
"Content-Location": loc_url,
"Location": loc_url,
}
if overwrite and (
isinstance(overwrite, bool) or (
isinstance(overwrite, Process) and
overwrite.version == process_summary.get("version")
)
):
http_cls = HTTPOk
else:
http_cls = HTTPCreated
return http_cls(json=data, headers=headers)
[docs]
def _save_deploy_process(process, override, container):
# type: (Process, bool, AnySettingsContainer) -> JSON
"""
Store the :class:`Process` to database with error handling and appropriate message reporting the problem.
"""
try:
sd.ProcessSummary().deserialize(process) # make it fail before save if invalid, then apply for real
db = get_db(container)
store = db.get_store(StoreProcesses)
new_process = store.save_process(process, overwrite=override)
process_summary = new_process.summary(container=container)
except ProcessRegistrationError as exc:
raise HTTPConflict(json={
"type": "http://www.opengis.net/def/exceptions/ogcapi-processes-2/1.0/duplicated-process",
"title": "Process definition conflict.",
"detail": str(exc),
"status": HTTPConflict.code,
"cause": {"process_id": process.id},
})
except colander.Invalid as exc:
LOGGER.error("Failed schema validation of updated process summary:\n%s", exc)
raise HTTPBadRequest(json={
"description": "Failed schema validation of process summary.",
"cause": f"Invalid schema: [{exc.msg or exc!s}]",
"error": exc.__class__.__name__,
"value": exc.value
})
return process_summary
[docs]
def _deploy_process_multi_cwl(
cwl_packages, # type: List[CWL]
container, # type: Union[AnySettingsContainer, AnyRequestType]
overwrite, # type: Union[bool, Process]
): # type: (...) -> HTTPException
"""
Deploy multiple :term:`CWL` packages from a ``$graph`` definition using recursive deployment calls.
Rather than duplicating deployment logic, this function orchestrates the deployment order and recursively
calls :func:`deploy_process_from_payload` for each package to reuse all validation, URL setup, and storage logic.
.. warning::
This deployment is **NOT atomic**. If any CWL package fails during deployment:
- Previously deployed tools remain in the database (no rollback)
- Retry attempts will skip already-deployed tools (``HTTPConflict`` is caught and ignored)
- Child tools are always deployed with ``overwrite=False``, even if the main process uses ``overwrite=True``
This means partial deployments can leave orphaned processes that must be manually cleaned up.
:param cwl_packages: ``list`` of resolved :term:`CWL` package definitions.
:param container: Application container.
:param overwrite: Whether to overwrite existing processes. Note: only applies to main process, not child tools.
:returns: HTTP response with deployment result from the main process.
"""
LOGGER.info("Deploying multi-CWL package with %d definitions", len(cwl_packages))
# Resolve deployment order (tools first, main process last)
# main_process is either the Workflow or the tool with id "#main"/"main"
tools, main_process_pkg = resolve_deployment_order(cwl_packages)
main_process_id = main_process_pkg.get("id")
deployed_processes = []
# Deploy CommandLineTools first (but skip the one that will be the main process)
for tool_pkg in tools:
tool_id = tool_pkg.get("id")
# Skip if this tool will be deployed as the main process
if tool_id == main_process_id:
LOGGER.info("Skipping child deployment of %s (will be deployed as main process)", tool_id)
continue
LOGGER.info("Deploying CWL tool: %s", tool_id)
# Strip # prefix from tool ID if present (valid in $graph but not as process ID)
tool_pkg_deploy = deepcopy(tool_pkg)
if tool_id and tool_id.startswith("#"):
tool_pkg_deploy["id"] = tool_id[1:]
# Pass CWL package directly - it will be detected as CWL content by cwlVersion field
# NOTE: Child tools are always deployed with overwrite=False to preserve existing dependencies.
# This means retrying a failed multi-CWL deployment will skip already-deployed tools.
try:
response = deploy_process_from_payload(tool_pkg_deploy, container, overwrite=False)
deployed_processes.append(response.json["processSummary"])
LOGGER.info("Successfully deployed CWL tool: %s", tool_id)
except HTTPConflict:
# Tool already exists from a previous (possibly failed) deployment attempt.
# Continue deployment to allow retry scenarios without requiring manual cleanup.
LOGGER.info("CWL tool already exists: %s, skipping deployment", tool_id)
except Exception as exc:
# Any other error (validation, permission, etc.) stops the entire deployment.
# WARNING: Previously deployed tools from this attempt are NOT rolled back.
LOGGER.error("Failed to deploy CWL tool %s: %s", tool_id, exc)
raise
# Deploy main process (workflow or main tool)
main_pkg_deploy = deepcopy(main_process_pkg)
# Strip # prefix from main process ID if present
if main_process_id and main_process_id.startswith("#"):
main_pkg_deploy["id"] = main_process_id[1:]
# For workflows, update step references (remove # prefixes to reference deployed processes)
if main_pkg_deploy.get("class") == "Workflow" and "steps" in main_pkg_deploy:
for step_data in main_pkg_deploy.get("steps", {}).values():
if "run" in step_data and isinstance(step_data["run"], str):
if step_data["run"].startswith("#"):
step_data["run"] = step_data["run"][1:]
# Pass CWL package directly
LOGGER.info("Deploying main CWL process: %s", main_process_id)
response = deploy_process_from_payload(main_pkg_deploy, container, overwrite)
LOGGER.info("Successfully deployed multi-CWL package: main process %s with %d child processes",
main_process_id, len(deployed_processes))
return response
[docs]
def _update_deploy_process_version(process, process_overwrite, update_level, container=None):
# type: (Process, Process, VersionLevel, Optional[AnySettingsContainer]) -> JSON
"""
Handle all necessary update operations of a :term:`Process` definition.
Validate that any specified version for :term:`Process` deployment is valid against any other existing versions.
Perform any necessary database adjustments to replace the old :term:`Process` references for the creation of the
updated :term:`Process` to ensure all versions and links remain valid against their original references.
:param process: Desired new process definition.
:param process_overwrite: Old process from which update of the definition in database could be required.
:param update_level:
Minimum semantic version level required for this update operation.
If the new :term:`Process` definition did not provide a version explicitly, this level will be used to
automatically infer the following revision number based on the old :term:`Process` reference.
:param container: Any container to retrieve a database connection.
:returns: Process summary with definition retrieved from storage (saved) after all operations were applied.
:raises HTTPException: Relevant error is raised in the even of any erroneous process definition (old and new).
"""
if not process.mutable:
raise HTTPForbidden(json={
"type": "http://www.opengis.net/def/exceptions/ogcapi-processes-2/1.0/immutable-process",
"title": "Process immutable.",
"detail": "Cannot update an immutable process.",
"status": HTTPForbidden.code,
"cause": {"mutable": False}
})
if process.name != process_overwrite.name:
raise HTTPBadRequest(json={
"type": "InvalidParameterValue",
"title": "Invalid process identifier.",
"detail": "Specified process identifier in payload definition does not match expected ID in request path.",
"status": HTTPBadRequest.code,
"cause": {"pathProcessID": process_overwrite.name, "bodyProcessID": process.name}
})
db = get_db(container)
store = db.get_store(StoreProcesses)
# if no new version was specified, simply take the next one relevant for the update level
# then check that new version is within available range against target process for required update level
new_version = process.version if process.version else _bump_process_version(process_overwrite.version, update_level)
taken_versions = store.find_versions(process.id, VersionFormat.STRING) # string format for output if error
if not is_update_version(new_version, taken_versions, update_level):
new_version = as_version_major_minor_patch(new_version, VersionFormat.STRING)
ref_version = as_version_major_minor_patch(process_overwrite.version, VersionFormat.STRING)
if new_version in taken_versions:
http_error = HTTPConflict
message = "Process version conflicts with already taken revisions."
else:
http_error = HTTPUnprocessableEntity
message = "Semantic version is not of appropriate update level for requested changes."
raise http_error(json={
"type": "InvalidParameterValue",
"title": "Invalid version value.",
"detail": message,
"status": http_error.code,
"cause": {
"revisions": taken_versions,
"reference": ref_version,
"version": new_version,
"change": update_level,
}
})
old_version = None
op_override = process_overwrite
try:
# if source process for update is not the latest, no need to rewrite 'id:version' since it is not only 'id'
# otherwise, replace latest process 'id' by explicit 'id:version'
if process_overwrite.latest:
pid_only = process_overwrite.name
old_version = process_overwrite.version
old_process = store.update_version(pid_only, old_version)
process_tag = old_process.tag
# since 'id' reference changes from old to new process,
# reflect the change in any job that could refer to it
job_store = db.get_store(StoreJobs)
n_updated = job_store.batch_update_jobs({"process": pid_only}, {"process": process_tag})
LOGGER.debug("Updated %s jobs from process [%s] to old revision [%s]", n_updated, pid_only, process_tag)
op_override = False # make sure no conflict when saving process afterward
process.version = new_version
process_summary = _save_deploy_process(process, op_override, container)
# add more version information to already handled error to better report the real conflict of revisions if any
except (ProcessRegistrationError, HTTPConflict):
if old_version is not None:
old_version = as_version_major_minor_patch(old_version, VersionFormat.STRING)
if new_version is not None:
new_version = as_version_major_minor_patch(new_version, VersionFormat.STRING)
raise HTTPConflict(json={
"type": "http://www.opengis.net/def/exceptions/ogcapi-processes-2/1.0/duplicated-process",
"title": "Process definition conflict.",
"detail": "Failed update of process conflicting with another definition or revision.",
"status": HTTPConflict.code,
"cause": {"process_id": process.id, "old_version": old_version, "new_version": new_version},
})
return process_summary
[docs]
def _bump_process_version(version, update_level):
# type: (AnyVersion, VersionLevel) -> AnyVersion
"""
Obtain the relevant version with specified level incremented by one.
"""
new_version = list(as_version_major_minor_patch(version, VersionFormat.PARTS))
if update_level == VersionLevel.PATCH:
new_version[2] += 1
elif update_level == VersionLevel.MINOR:
new_version[1] += 1
new_version[2] = 0
elif update_level == VersionLevel.MAJOR:
new_version[0] += 1
new_version[1] = 0
new_version[2] = 0
return new_version
[docs]
def parse_wps_process_config(config_entry):
# type: (Union[JSON, str]) -> Tuple[str, str, List[str], bool]
"""
Parses the available WPS provider or process entry to retrieve its relevant information.
:return: WPS provider name, WPS service URL, and list of process identifier(s).
:raise ValueError: if the entry cannot be parsed correctly.
"""
if isinstance(config_entry, dict):
svc_url = config_entry["url"]
svc_name = config_entry.get("name")
svc_proc = config_entry.get("id", [])
svc_vis = asbool(config_entry.get("visible", False))
elif isinstance(config_entry, str):
svc_url = config_entry
svc_name = None
svc_proc = []
svc_vis = False
else:
raise ValueError(f"Invalid service value: [{config_entry!s}].")
url_p = urlparse(svc_url)
qs_p = parse_qs(url_p.query)
svc_url = get_url_without_query(url_p)
# if explicit name was provided, validate it (assert fail if not),
# otherwise replace silently bad character since it is requested to be inferred
svc_name = get_sane_name(svc_name or url_p.hostname, assert_invalid=bool(svc_name), min_len=1)
svc_proc = svc_proc or qs_p.get("identifier", []) # noqa # 'identifier=a,b,c' techically allowed
svc_proc = [proc.strip() for proc in svc_proc if proc.strip()] # remote empty
if not isinstance(svc_name, str):
raise ValueError(f"Invalid service value: [{svc_name!s}].")
if not isinstance(svc_proc, list):
raise ValueError(f"Invalid process value: [{svc_proc!s}].")
return svc_name, svc_url, svc_proc, svc_vis
[docs]
def register_wps_processes_static(service_url, service_name, service_visibility, service_processes, container):
# type: (str, str, bool, List[str], AnyRegistryContainer) -> None
"""
Register WPS-1 :term:`Process` under a service :term:`Provider` as static references.
For a given WPS provider endpoint, either iterates over all available processes under it to register them one
by one, or limit itself only to those of the reduced set specified by :paramref:`service_processes`.
The registered `WPS-1` processes generate a **static** reference, meaning that metadata of each process as well
as any other modifications to the real remote reference will not be tracked, including validation of even their
actual existence, or modifications to inputs/outputs. The :term:`Application Package` will only point to it
assuming it remains valid.
Each of the deployed processes using *static* reference will be accessible directly under `Weaver` endpoints::
/processes/<service-name>_<process-id>
The service is **NOT** deployed as :term:`Provider` since the processes are registered directly.
.. seealso::
- :func:`register_wps_processes_dynamic`
:param service_url: WPS-1 service location (where ``GetCapabilities`` and ``DescribeProcess`` requests can be made).
:param service_name: Identifier to employ for generating the full process identifier.
:param service_visibility: Visibility flag of the provider.
:param service_processes: process IDs under the service to be registered, or all if empty.
:param container: settings to retrieve required configuration settings.
"""
db = get_db(container)
process_store = db.get_store(StoreProcesses) # type: StoreProcesses
LOGGER.info("Fetching WPS-1: [%s]", service_url)
wps = get_wps_client(service_url, container)
if Version(wps.version) >= Version("2.0"):
LOGGER.warning("Invalid WPS-1 provider, version was [%s]", wps.version)
return
wps_processes = [wps.describeprocess(p) for p in service_processes] or wps.processes
for wps_process in wps_processes:
proc_id = f"{service_name}_{get_sane_name(wps_process.identifier, min_len=1)}"
wps_pid = wps_process.identifier
proc_url = f"{service_url}?service=WPS&request=DescribeProcess&identifier={wps_pid}&version={wps.version}"
svc_vis = Visibility.PUBLIC if service_visibility else Visibility.PRIVATE
try:
old_process = process_store.fetch_by_id(proc_id)
except ProcessNotFound:
pass
else:
if (
old_process.id == proc_id
and old_process.processDescriptionURL == proc_url
and old_process.visibility == svc_vis
):
LOGGER.warning("Process already registered: [%s]. Skipping...", proc_id)
continue
LOGGER.warning("Process matches registered one: [%s]. Updating details...", proc_id)
payload = {
"processDescription": {"process": {"id": proc_id, "visibility": svc_vis}},
"executionUnit": [{"href": proc_url}],
"deploymentProfileName": "http://www.opengis.net/profiles/eoc/wpsApplication",
}
try:
resp = deploy_process_from_payload(payload, container, overwrite=True)
if resp.status_code == HTTPOk.code:
LOGGER.info("Process registered: [%s]", proc_id)
else:
raise RuntimeError(f"Process registration failed: [{proc_id}]")
except Exception as ex:
LOGGER.exception("Exception during process registration: [%r]. Skipping...", ex)
continue
[docs]
def register_wps_processes_dynamic(service_name, service_url, service_visibility, container):
# type: (str, str, bool, AnyRegistryContainer) -> None
"""
Register a WPS service ``provider`` such that ``processes`` under it are dynamically accessible on demand.
The registered `WPS-1` provider generates a **dynamic** reference to processes under it. Only the :term:`Provider`
reference itself is actually registered. No :term:`Process` are directly registered following this operation.
When information about the offered processes, descriptions of those processes or their execution are requested,
`Weaver` will query the referenced :term:`Provider` for details and convert the corresponding :term:`Process`
dynamically. This means that latest metadata of the :term:`Process`, and any modification to it on the remote
service will be immediately reflected on `Weaver` without any need to re-deploy processes.
Each of the deployed processes using *dynamic* reference will be accessible under `Weaver` endpoints::
/providers/<service-name>/processes/<process-id>
The processes are **NOT** deployed locally since the processes are retrieved from the :term:`Provider` itself.
.. seealso::
- :func:`register_wps_processes_static`
:param service_url: WPS-1 service location (where ``GetCapabilities`` and ``DescribeProcess`` requests can be made).
:param service_name: Identifier to employ for registering the provider identifier.
:param service_visibility: Visibility flag of the provider.
:param container: settings to retrieve required configuration settings.
"""
db = get_db(container)
service_store = db.get_store(StoreServices) # type: StoreServices
LOGGER.info("Register WPS-1/2 provider: [%s]", service_url)
try:
get_wps_client(service_url, container) # only attempt fetch to validate it exists
except Exception as ex:
LOGGER.exception("Exception during provider validation: [%s] [%r]. Skipping...", service_name, ex)
return
new_service = Service(name=service_name, url=service_url, public=service_visibility)
try:
old_service = service_store.fetch_by_name(service_name)
except ServiceNotFound:
LOGGER.info("Registering new provider: [%s]...", service_name)
else:
if new_service == old_service:
LOGGER.warning("Provider already registered: [%s]. Skipping...", service_name)
return
LOGGER.warning("Provider matches registered service: [%s]. Updating details...", service_name)
try:
service_store.save_service(new_service, overwrite=True)
except Exception as ex:
LOGGER.exception("Exception during provider registration: [%s] [%r]. Skipping...", service_name, ex)
[docs]
def register_wps_processes_from_config(container, wps_processes_file_path=None):
# type: (AnySettingsContainer, Optional[FileSystemPathType]) -> None
"""
Registers remote :term:`WPS` providers and/or processes as specified from the configuration file.
Loads a ``wps_processes.yml`` file and registers processes under :ref:`proc_wps_12` providers to the
current `Weaver` instance as equivalent :term:`OGC API - Processes` instances.
References listed under ``processes`` are registered statically (by themselves, unchanging snapshot).
References listed under ``providers``, the :term:`WPS` themselves are registered, making each :term:`Process`
listed in their ``GetCapabilities`` available. In this case, registered processes are defined dynamically,
meaning they will be fetched on the provider each time a request refers to them, keeping their definition
up-to-date with the remote server.
.. versionadded:: 1.14
When references are specified using ``providers`` section instead of ``processes``, the registration
only saves the remote WPS provider endpoint to dynamically populate :term:`WPS` processes on demand.
Previous behavior was to register each :term:`WPS` process individually with ID ``[service]_[process]``.
.. versionchanged:: 4.19
Parameter position are inverted.
If :paramref:`wps_processes_file_path` is explicitly provided, it is used directly without considering settings.
Otherwise, automatically employ the definition in setting: ``weaver.wps_processes_file``.
.. seealso::
- `weaver.wps_processes.yml.example` for additional file format details.
.. note::
Settings with an explicit empty ``weaver.wps_processes_file`` entry will be considered as *nothing to load*.
If the entry is omitted, default location :data:`WEAVER_DEFAULT_WPS_PROCESSES_CONFIG` is attempted instead.
:param container: Registry container to obtain database reference as well as application settings.
:param wps_processes_file_path: Override file path to employ instead of default settings definition.
"""
if wps_processes_file_path is not None:
LOGGER.info("Using WPS-1 explicit override parameter to obtain file reference.")
else:
LOGGER.info("Using WPS-1 file reference from configuration settings.")
settings = get_settings(container)
wps_processes_file_path = settings.get("weaver.wps_processes_file")
if wps_processes_file_path is None:
warnings.warn("No file specified for WPS-1 providers registration.", RuntimeWarning)
wps_processes_file_path = get_weaver_config_file("", WEAVER_DEFAULT_WPS_PROCESSES_CONFIG,
generate_default_from_example=False)
elif wps_processes_file_path == "":
warnings.warn("Configuration file for WPS-1 providers registration explicitly defined as empty in settings. "
"Not loading anything.", RuntimeWarning)
return
# reprocess the path in case it is relative to default config directory
wps_processes_file_path = get_weaver_config_file(wps_processes_file_path, WEAVER_DEFAULT_WPS_PROCESSES_CONFIG,
generate_default_from_example=False)
if wps_processes_file_path == "":
warnings.warn("No file specified for WPS-1 providers registration.", RuntimeWarning)
return
LOGGER.info("Using WPS-1 provider processes file: [%s]", wps_processes_file_path)
try:
with open(wps_processes_file_path, mode="r", encoding="utf-8") as f:
# if file is empty (not even processes/providers section), None is return instead of dict
processes_config = yaml.safe_load(f) or {}
if processes_config:
processes = processes_config.get("processes") or []
providers = processes_config.get("providers") or []
else:
processes = providers = None
if not processes and not providers:
LOGGER.warning("Nothing to process from file: [%s]", wps_processes_file_path)
return
# either 'service' references to register every underlying 'process' individually
# or explicit 'process' references to register by themselves
for cfg_service in processes:
svc_name, svc_url, svc_proc, svc_vis = parse_wps_process_config(cfg_service)
register_wps_processes_static(svc_url, svc_name, svc_vis, svc_proc, container)
# direct WPS providers to register
for cfg_service in providers:
svc_name, svc_url, _, svc_vis = parse_wps_process_config(cfg_service)
register_wps_processes_dynamic(svc_name, svc_url, svc_vis, container)
LOGGER.info("Finished processing configuration file [%s].", wps_processes_file_path)
except Exception as exc: # pragma: no cover
msg = f"Invalid WPS-1 providers configuration file caused: [{fully_qualified_name(exc)}]({exc!s})."
LOGGER.exception(msg)
raise RuntimeError(msg)
[docs]
def _check_package_file(cwl_file_path_or_url):
# type: (str) -> str
"""
Validates that the specified :term:`CWL` file path or URL points to an existing and allowed file format.
:param cwl_file_path_or_url: one of allowed file types path on disk, or an URL pointing to one served somewhere.
:returns: validated absolute path or URL of the file reference.
:raises PackageRegistrationError: in case of missing file, invalid format or invalid HTTP status code.
"""
if is_remote_file(cwl_file_path_or_url):
cwl_path = cwl_file_path_or_url
cwl_resp = request_extra("head", cwl_path, settings=get_settings())
if cwl_resp.status_code != HTTPOk.code:
raise PackageRegistrationError(f"Cannot find CWL file at: '{cwl_path}'.")
else:
cwl_path = cwl_file_path_or_url[7:] if cwl_file_path_or_url.startswith("file://") else cwl_file_path_or_url
cwl_path = os.path.abspath(cwl_path)
if not os.path.isfile(cwl_path):
raise PackageRegistrationError(f"Cannot find CWL file at: '{cwl_file_path_or_url}'.")
file_ext = os.path.splitext(cwl_path)[-1].replace(".", "")
if file_ext not in PACKAGE_EXTENSIONS:
raise PackageRegistrationError(f"Not a valid CWL file type: '{file_ext}'.")
return cwl_path
[docs]
def is_cwl_package(package):
# type: (Any) -> bool
"""
Perform minimal validation of a :term:`CWL` package definition.
"""
return isinstance(package, dict) and "cwlVersion" in package
[docs]
def load_package_file(file_path):
# type: (str) -> CWL
"""
Loads the package in :term:`YAML`/:term:`JSON` format specified by the file path.
"""
file_path = _check_package_file(file_path)
try:
file_data = load_file(file_path)
except ValueError as ex:
raise PackageRegistrationError(f"Package parsing generated an error: [{ex!s}]")
if is_cwl_package(file_data):
return file_data
raise PackageRegistrationError(f"Package is not a valid CWL document: [{file_path}]")
[docs]
def register_cwl_processes_from_config(container):
# type: (AnySettingsContainer) -> int
"""
Load multiple :term:`CWL` definitions from a directory to register corresponding :term:`Process`.
.. versionadded:: 4.19
Each individual :term:`CWL` definition must fully describe a :term:`Process` by itself. Therefore, an ``id`` must
be available in the file to indicate the target deployment reference. In case of conflict, the existing database
:term:`Process` will be overridden to ensure file updates are applied.
Files are loaded in alphabetical order. If a :term:`Workflow` needs to refer to other processes, they should be
named in way that dependencies will be resolvable prior to the registration of the :term:`Workflow` :term:`Process`.
The resolved directory to search for :term:`CWL` will be traversed recursively.
This, along with the name of :term:`CWL` files themselves, can be used to resolve order-dependent loading cases.
Only ``.cwl`` extensions are considered to avoid invalid parsing of other files that could be defined in the shared
configuration directory.
.. note::
Settings with an explicit empty ``weaver.cwl_processes_dir`` entry will be considered as *nothing to load*.
If the entry is omitted, default location :data:`WEAVER_CONFIG_DIR` is used to search for :term:`CWL` files.
:param container: Registry container to obtain database reference as well as application settings.
:returns: Number of successfully registered processes from found :term:`CWL` files.
"""
settings = get_settings(container)
cwl_processes_dir = settings.get("weaver.cwl_processes_dir")
if cwl_processes_dir is None:
warnings.warn("No configuration setting [weaver.cwl_processes_dir] specified for CWL processes registration. "
f"Will use default location: [{WEAVER_CONFIG_DIR}]", RuntimeWarning)
cwl_processes_dir = WEAVER_CONFIG_DIR
elif cwl_processes_dir == "":
warnings.warn("Configuration setting [weaver.cwl_processes_dir] for CWL processes registration "
"is explicitly defined as empty. Not loading anything.", RuntimeWarning)
return 0
if not os.path.isdir(cwl_processes_dir):
warnings.warn(
"Configuration setting [weaver.cwl_processes_dir] for CWL processes registration "
f"is not an existing directory: [{cwl_processes_dir}]. Not loading anything.", RuntimeWarning
)
return 0
cwl_processes_dir = os.path.abspath(cwl_processes_dir)
cwl_files = sorted(pathlib.Path(cwl_processes_dir).rglob("*.cwl"),
# consider directory structure to sort, then use usual alphabetical order for same level
key=lambda file: (len(str(file).split("/")), str(file)))
if not cwl_files:
warnings.warn(
f"Configuration directory [{cwl_processes_dir}] for CWL processes registration "
"does not contain any CWL file. Not loading anything.", RuntimeWarning
)
return 0
register_count = 0
register_total = len(cwl_files)
register_error = asbool(settings.get("weaver.cwl_processes_register_error", False))
for cwl_path in cwl_files:
try:
cwl = load_package_file(str(cwl_path))
deploy_process_from_payload(cwl, settings, overwrite=True)
register_count += 1
except (HTTPException, PackageRegistrationError) as exc:
msg = (
f"Failed registration of process from CWL file: [{cwl_path!s}] "
f"caused by [{fully_qualified_name(exc)}]({exc!s})."
)
if register_error:
LOGGER.info("Requested immediate CWL registration failure with 'weaver.cwl_processes_register_error'.")
LOGGER.error(msg)
raise
warnings.warn(f"{msg} Skipping definition.", RuntimeWarning)
continue
if register_count and register_count == register_total:
LOGGER.info("Successfully registered %s processes from CWL files.", register_total)
elif register_count != register_total:
LOGGER.warning("Partial registration of CWL processes, only %s/%s succeeded.", register_count, register_total)
return register_count
[docs]
def pull_docker(docker_auth, logger=LOGGER):
# type: (DockerAuthentication, LoggerHandler) -> Optional[DockerClient]
"""
Pulls the referenced Docker image to local cache from an optionally secured registry.
If the Docker image is already available locally, simply validates it.
Authentication are applied as necessary using the provided parameters.
.. warning::
Logging calls must employ the
:param docker_auth: Docker reference with optional authentication parameters.
:param logger: Alternative logger reference to log status messages about the operation.
:returns: Docker client to perform further operations with the retrieved or validated image. None if failed.
"""
client = None
image = None
ref = docker_auth.reference
try:
# load from env is the same as CLI call
client = docker.from_env() # pylint: disable=I1101
# following login does not update '~/.docker/config.json' by design, but can use it if available
# session remains active only within the client
# Note:
# Force re-auth to ensure credentials are validated against remote registry and API Status is returned.
# This way, even if the auth were pre-resolved, we make sure they are still valid.
# This is important mostly because Docker images could still be present in cache, so pull doesn't occur.
# Warning:
# Without re-auth, plain credentials resolved from auth config are returned in body instead!
# With re-auth, body *could* contain an identity token depending on auth method.
if docker_auth.credentials:
logger.log(logging.DEBUG, "Retrieving image [%s] from Docker registry or cache.", ref)
body = client.login(reauth=True, **docker_auth.credentials)
if body.get("Status") != "Login Succeeded":
logger.log(
logging.DEBUG,
"Failed authentication to Docker private registry [%s].",
docker_auth.registry,
)
return None
else:
logger.log(logging.WARNING, "Expecting public access for image [%s] in Docker registry.", ref)
logger.log(logging.DEBUG, "Retrieving image [%s] from Docker registry or cache.", ref)
# docker client pulls all available images when no tag, provide the default to limit
try:
tag = docker_auth.tag or "latest"
image = client.images.pull(docker_auth.repository, tag) # actual pull or raise ImageNotFound
except ImageNotFound:
image = client.images.get(ref) # resolved from cache or raise ImageNotFound
LOGGER.warning("Failed pull of image [%s] from Docker registry, but found it in cache.", ref)
except Exception as exc: # noqa: W0703 # nosec: B110 # do not let anything up to avoid leaking auths
logger.log(
logging.DEBUG,
"Unhandled exception [%s] during Docker registry authentication or image retrieval.",
exc.__class__.__name__, exc_info=False, # only class name to help debug, but no contents
)
if not image or docker_auth.docker not in image.tags:
logger.log(
logging.DEBUG,
"Failed authorization or could not retrieve Docker image [%s] from private registry.",
ref,
)
return None
logger.log(logging.DEBUG, "Docker image [%s] retrieved.", ref)
return client