weaver.processes.utils

Module Contents

weaver.processes.utils.LOGGER[source]
weaver.processes.utils.UpdateFieldListMethod[source]
weaver.processes.utils.get_process(process_id: str | None = None, request: weaver.typedefs.PyramidRequest | None = None, settings: weaver.typedefs.SettingsType | None = None, store: weaver.store.base.StoreProcesses | None = None, revision: bool = True) weaver.datatype.Process[source]

Obtain the specified process and validate information, returning appropriate HTTP error if invalid.

Process identifier must be provided from either the request path definition or literal ID. Database must be retrievable from either the request, underlying settings, or direct store reference.

Changed in version 4.20: Process identifier can also be an ‘id:version’ tag. Also, the request query parameter ‘version’ can be used. If using the process_id explicitly instead of the request, a versioned Process reference MUST employ the tagged representation to resolve the appropriate Process revision.

Different parameter combinations are intended to be used as needed or more appropriate, such that redundant operations can be reduced where some objects are already fetched from previous operations.

Parameters:
  • process_id – Explicit Process identifier to employ for lookup.

  • request – When no explicit ID specified, try to find information from the request.

  • settings – Application settings for database connection. Can be guessed from local thread or request object if not given.

  • store – Database process store reference.

  • revision – When parsing the Process ID (either explicit or from request), indicate if any tagged revision specifier should be used or dropped.
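The tagged 'id:version' form and the effect of the revision flag can be illustrated with a small sketch. The helper name split_process_tag is hypothetical and is not part of Weaver; it only shows one plausible way to separate the identifier from an optional version tag.

```python
from typing import Optional, Tuple


def split_process_tag(process_ref: str, revision: bool = True) -> Tuple[str, Optional[str]]:
    """Split an 'id:version' reference into (id, version).

    Hypothetical helper: when ``revision`` is False, any version tag is dropped.
    """
    process_id, _, version = process_ref.partition(":")
    if not revision or not version:
        return process_id, None
    return process_id, version


print(split_process_tag("echo:1.2.0"))
print(split_process_tag("echo:1.2.0", revision=False))
print(split_process_tag("echo"))
```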

weaver.processes.utils.map_progress(progress: weaver.typedefs.Number, range_min: weaver.typedefs.Number, range_max: weaver.typedefs.Number) weaver.typedefs.Number[source]

Calculates the relative progression of a process percentage within the given min/max range.
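As a rough illustration, such a mapping can be written as a linear rescaling of a 0-100 percentage into the target range. The function name below is hypothetical, and clamping the result to the range is an assumption rather than documented behavior.

```python
def map_progress_sketch(progress: float, range_min: float, range_max: float) -> float:
    """Rescale a 0-100 percentage into [range_min, range_max]."""
    scaled = range_min + (progress * (range_max - range_min)) / 100
    # Assumption: out-of-range inputs are clamped to the target range.
    return max(range_min, min(range_max, scaled))


# A step that is 50% done, within a job phase spanning 10% to 20% overall.
print(map_progress_sketch(50, 10, 20))
```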

weaver.processes.utils.get_process_information(process_description: weaver.typedefs.JSON) weaver.typedefs.JSON[source]

Obtain the details for the process within its description considering various supported formats.

weaver.processes.utils._check_deploy(payload: weaver.typedefs.JSON) weaver.typedefs.ProcessDeployment | weaver.typedefs.CWL[source]

Validate minimum deploy payload field requirements with exception handling.

weaver.processes.utils._validate_deploy_process_info(process_info: weaver.typedefs.JSON, reference: str | None, package: weaver.typedefs.CWL | None, settings: weaver.typedefs.SettingsType, headers: weaver.typedefs.AnyHeadersContainer | None) weaver.typedefs.JSON[source]

Obtain the process definition from deploy payload with exception handling.

weaver.processes.utils.resolve_cwl_graph(package: weaver.typedefs.CWL) weaver.typedefs.CWL | Tuple[List[weaver.typedefs.CWL], weaver.typedefs.CWL][source]

Resolve CWL $graph into deployable packages.

Returns:

  • Single CWL dict if no $graph or $graph with 1 item (backward compatible)

  • tuple of (list of CWL dict items, original package with $graph) if multiple items
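The two return shapes can be sketched as follows. This is not the Weaver implementation: the function name is hypothetical, and both unwrapping a single-item $graph and propagating the top-level cwlVersion to each item are assumptions made for illustration.

```python
from typing import Any, Dict, List, Tuple, Union

CWL = Dict[str, Any]


def resolve_graph_sketch(package: CWL) -> Union[CWL, Tuple[List[CWL], CWL]]:
    """Return a single CWL dict, or (items, original package) for a multi-item $graph."""
    graph = package.get("$graph")
    if not graph:
        return package  # no $graph: the package is already deployable as-is

    def with_version(item: CWL) -> CWL:
        # Assumption: items inherit the top-level cwlVersion when they lack one.
        resolved = dict(item)
        if "cwlVersion" in package:
            resolved.setdefault("cwlVersion", package["cwlVersion"])
        return resolved

    items = [with_version(item) for item in graph]
    if len(items) == 1:
        return items[0]
    return items, package
```

Callers would distinguish the two cases with an isinstance check on the result (dict versus tuple).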

weaver.processes.utils.resolve_deployment_order(cwl_packages: List[weaver.typedefs.CWL]) Tuple[List[weaver.typedefs.CWL], weaver.typedefs.CWL | None][source]

Determine deployment order for multiple CWL packages.

Parameters:

cwl_packages – list of CWL package definitions to order.

Returns:

tuple of (dependencies, main_workflow):

  • dependencies – list of CommandLineTool/ExpressionTool to deploy first

  • main_workflow – The main Workflow (if any) to deploy last, or None

Raises:
  • HTTPNotImplemented – If multiple Workflow definitions are provided.

  • HTTPBadRequest – If multiple tools without a Workflow and no #main entry point.
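A minimal sketch of this ordering logic is shown below. It substitutes built-in exceptions for the HTTP errors, uses a hypothetical function name, and assumes that a tool identified as #main (when no Workflow is present) is simply placed last among the dependencies; the actual handling of that case is not described here.

```python
from typing import Any, Dict, List, Optional, Tuple

CWL = Dict[str, Any]


def order_packages_sketch(cwl_packages: List[CWL]) -> Tuple[List[CWL], Optional[CWL]]:
    """Split packages into (dependencies, main workflow)."""
    workflows = [pkg for pkg in cwl_packages if pkg.get("class") == "Workflow"]
    tools = [pkg for pkg in cwl_packages if pkg.get("class") != "Workflow"]
    if len(workflows) > 1:
        # Stand-in for HTTPNotImplemented.
        raise NotImplementedError("multiple Workflow definitions are not supported")
    if workflows:
        return tools, workflows[0]
    if len(tools) > 1:
        main = [pkg for pkg in tools if str(pkg.get("id", "")).lstrip("#") == "main"]
        if not main:
            # Stand-in for HTTPBadRequest.
            raise ValueError("multiple tools without a Workflow or #main entry point")
        # Assumption: the #main tool is deployed after the other tools.
        others = [pkg for pkg in tools if pkg is not main[0]]
        return others + [main[0]], None
    return tools, None
```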

weaver.processes.utils.resolve_multi_execution_units(execution_units: List[weaver.typedefs.JSON]) List[weaver.typedefs.CWL][source]

Extract all CWL packages from multiple execution units.

Each execution unit can contain either an inline package (unit) or a reference (href) to fetch the package from a remote location.

Parameters:

execution_units – list of execution unit definitions. Each unit must have exactly one of unit or href (enforced by schema validation).

Returns:

list of resolved CWL package definitions.

Raises:

HTTPBadRequest – If unable to fetch a remote execution unit reference.
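The inline-versus-reference handling can be sketched with an injected fetch callable, so that the example stays independent of any HTTP client. The function name and the fetch parameter are hypothetical, and ValueError stands in for HTTPBadRequest.

```python
from typing import Any, Callable, Dict, List

CWL = Dict[str, Any]


def resolve_units_sketch(execution_units: List[Dict[str, Any]], fetch: Callable[[str], CWL]) -> List[CWL]:
    """Collect CWL packages from inline 'unit' entries or remote 'href' references."""
    packages: List[CWL] = []
    for entry in execution_units:
        if "unit" in entry:
            packages.append(entry["unit"])
            continue
        href = entry["href"]
        try:
            packages.append(fetch(href))
        except Exception as exc:
            # Stand-in for HTTPBadRequest when the remote reference is unreachable.
            raise ValueError(f"unable to fetch execution unit reference: {href}") from exc
    return packages
```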

weaver.processes.utils._get_multipart_content(content: str | bytes, request: weaver.typedefs.AnyRequestType | None) bytes[source]

Get raw multipart content as bytes.

weaver.processes.utils.create_multipart_deploy(cwl_files: List[str | weaver.typedefs.CWL], url: weaver.typedefs.URL, process_description: weaver.typedefs.JSON | None = None, boundary: str | None = None) Tuple[bytes, str][source]

Create multipart/related deployment content from a list of CWL files.

Parameters:
  • cwl_files

    list of CWL files. Each item can be:

    • A file path (str) to a CWL file (will be loaded)

    • A CWL dict (already parsed)

  • url – Domain or hostname for Content-ID header generation

  • process_description – Optional Process description metadata to include

  • boundary – Optional custom boundary str (auto-generated if not provided)

Returns:

tuple of (multipart content bytes, full Content-Type header with boundary)
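To make the returned pair concrete, the sketch below assembles a multipart/related body by hand from already-parsed CWL dicts. It is an illustration only: the function name, the Content-ID naming scheme (part<N>@<host>) and the application/cwl+json part type are assumptions, and the real function may add further parameters to the Content-Type header.

```python
import json
import uuid
from typing import Any, Dict, List, Optional, Tuple


def build_multipart_related_sketch(
    cwl_docs: List[Dict[str, Any]],
    host: str,
    boundary: Optional[str] = None,
) -> Tuple[bytes, str]:
    """Return (body bytes, Content-Type header value) for a multipart/related payload."""
    boundary = boundary or uuid.uuid4().hex  # auto-generate when not provided
    lines: List[str] = []
    for index, doc in enumerate(cwl_docs):
        lines += [
            f"--{boundary}",
            "Content-Type: application/cwl+json",  # assumed media type for CWL parts
            f"Content-ID: <part{index}@{host}>",   # hypothetical Content-ID scheme
            "",
            json.dumps(doc),
        ]
    lines.append(f"--{boundary}--")  # closing delimiter
    body = ("\r\n".join(lines) + "\r\n").encode("utf-8")
    return body, f'multipart/related; boundary="{boundary}"'
```

The header value is what a client would send as Content-Type alongside the body when posting the deployment request.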

weaver.processes.utils._classify_multipart_part(part_data: weaver.typedefs.JSON, cwl_packages: List[weaver.typedefs.CWL], parts_by_cid: Dict[str, weaver.typedefs.CWL], content_id: str, process_description: weaver.typedefs.JSON | None) weaver.typedefs.JSON | None[source]

Classify parsed multipart part as CWL package or process description.

Returns updated process_description if applicable.

weaver.processes.utils._validate_and_reorder_multipart_workflow(cwl_packages: List[weaver.typedefs.CWL], root_workflow_cid: str | None, parts_by_cid: Dict[str, weaver.typedefs.CWL]) List[weaver.typedefs.CWL][source]

Validate and reorder CWL packages based on root workflow reference.

Validates that the root document (specified by start parameter or first element) is a Workflow as per RFC 5621 requirements for multipart/related.

Parameters:
  • cwl_packages – list of CWL packages extracted from multipart content

  • root_workflow_cid – Content-ID of the root workflow from start parameter (if provided)

  • parts_by_cid – dict mapping Content-IDs to CWL packages

Returns:

Reordered list of CWL packages with root workflow last

Raises:

HTTPBadRequest – If start parameter references a non-Workflow CWL
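The reordering can be sketched as below, with ValueError standing in for HTTPBadRequest. The function name is hypothetical, and two behaviors are assumptions: falling back to the first element when the Content-ID is unknown, and leaving the order unchanged when that first element is not a Workflow.

```python
from typing import Any, Dict, List, Optional

CWL = Dict[str, Any]


def reorder_root_last_sketch(
    cwl_packages: List[CWL],
    root_workflow_cid: Optional[str],
    parts_by_cid: Dict[str, CWL],
) -> List[CWL]:
    """Move the root Workflow to the end so that its dependencies are deployed first."""
    if not cwl_packages:
        return []
    root = parts_by_cid.get(root_workflow_cid) if root_workflow_cid else None
    if root is not None and root.get("class") != "Workflow":
        raise ValueError("start parameter must reference a Workflow")
    if root is None:
        root = cwl_packages[0]  # no usable start parameter: the first part is the root
        if root.get("class") != "Workflow":
            return list(cwl_packages)  # assumption: nothing to reorder
    others = [pkg for pkg in cwl_packages if pkg is not root]
    return others + [root]
```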

weaver.processes.utils._fetch_multipart_content_location(content_location: str, part_content: str, request: weaver.typedefs.AnyRequestType | None = None) str[source]

Fetch CWL content from Content-Location header if part body is empty.

Checks if part body is empty and Content-Location is a URL, then fetches the content from that location. Content-Location can be a static CWL file URL or an API endpoint (Weaver, WPS, OGC API). Relative URLs are resolved against the base API URL.

Parameters:
  • content_location – Content-Location header value (absolute or relative URL)

  • part_content – Current part content (may be empty)

  • request – Optional request object for resolving relative URLs

Returns:

Updated part content (either original or fetched from Content-Location)

Raises:

HTTPBadRequest – If fetching from Content-Location fails

weaver.processes.utils._parse_multipart_message(content: str | bytes, content_type: str, request: weaver.typedefs.AnyRequestType | None = None) Any[source]

Parse raw multipart content into an email.message.Message object.

Parameters:
  • content – Raw multipart content (str or bytes)

  • content_type – Content-Type header value (must include boundary parameter)

  • request – Optional request object for extracting body

Returns:

Parsed multipart message object

Raises:

HTTPBadRequest – If multipart content is malformed
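One common way to obtain an email.message.Message from a bare HTTP body is to prepend the Content-Type header before handing the bytes to the standard library parser. The sketch below follows that approach under a hypothetical function name; it ignores the request argument and raises ValueError in place of HTTPBadRequest.

```python
import email
from email.message import Message
from typing import Union


def parse_multipart_sketch(content: Union[str, bytes], content_type: str) -> Message:
    """Parse a raw multipart body using the standard library email parser."""
    if isinstance(content, str):
        content = content.encode("utf-8")
    # The parser needs the header to find the boundary, so it is re-attached here.
    header = f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode("utf-8")
    message = email.message_from_bytes(header + content)
    if not message.is_multipart():
        raise ValueError("content is not a valid multipart message")
    return message


raw = (
    b"--XYZ\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-ID: <tool@example.com>\r\n"
    b"\r\n"
    b'{"class": "CommandLineTool"}\r\n'
    b"--XYZ--\r\n"
)
parsed = parse_multipart_sketch(raw, 'multipart/related; boundary="XYZ"')
for part in parsed.get_payload():
    print(part.get_content_type(), part["Content-ID"])
```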

weaver.processes.utils._extract_multipart_start_parameter(content_type: str) str | None[source]

Extract the start parameter from a multipart/related Content-Type header.

Parameters:

content_type – Full Content-Type header value

Returns:

Content-ID from start parameter, or None if not present
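Header parameters such as start can be read with the standard library rather than hand-written string splitting. In this sketch the function name is hypothetical, and stripping the surrounding angle brackets from the Content-ID is an assumption about the desired output form.

```python
from email.message import Message
from typing import Optional


def extract_start_sketch(content_type: str) -> Optional[str]:
    """Return the 'start' parameter of a Content-Type header value, if any."""
    holder = Message()
    holder["Content-Type"] = content_type
    start = holder.get_param("start")
    if isinstance(start, tuple):
        # RFC 2231 encoded parameters are returned as (charset, language, value).
        start = start[2]
    if not start:
        return None
    # Assumption: the Content-ID is returned without its angle brackets.
    return str(start).strip("<>")


print(extract_start_sketch('multipart/related; boundary="XYZ"; start="<root@example.com>"'))
print(extract_start_sketch('multipart/related; boundary="XYZ"'))
```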

weaver.processes.utils._interpret_multipart_part(part: Any, request: weaver.typedefs.AnyRequestType | None = None) Tuple[str, str, str, weaver.typedefs.JSON] | None[source]

Interpret a single multipart part: decode, fetch content if needed, and parse.

Parameters:
  • part – Single part from multipart message

  • request – Optional request object for resolving relative Content-Location URLs

Returns:

tuple of (content_type, content_id, content_location, parsed_data) or None if part cannot be parsed

weaver.processes.utils._organize_deploy_parts(interpreted_parts: List[Tuple[str, str, str, weaver.typedefs.JSON]], root_workflow_cid: str | None) Tuple[List[weaver.typedefs.CWL], weaver.typedefs.JSON | None][source]

Organize interpreted multipart parts into CWL packages and process description.

Parameters:
  • interpreted_parts – list of interpreted parts (content_type, content_id, content_location, data)

  • root_workflow_cid – Content-ID of root workflow from start parameter (if any)

Returns:

tuple of (list of CWL packages, optional process description)

Raises:

HTTPBadRequest – If no CWL packages found or root workflow validation fails

weaver.processes.utils.parse_multipart_deploy(content: str | bytes, content_type: str, request: weaver.typedefs.AnyRequestType | None = None) Tuple[List[weaver.typedefs.CWL], weaver.typedefs.JSON | None][source]

Parse multipart/mixed or multipart/related deployment content.

Extracts CWL packages and optional Process description from multipart request.

Parameters:
  • content – Raw multipart content (str or bytes)

  • content_type – Content-Type header value (must include boundary parameter)

  • request – Optional request object for extracting body

Returns:

tuple of (list of CWL packages, optional Process description metadata)

Raises:

HTTPBadRequest – If multipart content is malformed or invalid

weaver.processes.utils.parse_process_deploy_content(request: weaver.typedefs.AnyRequestType | None = None, content: weaver.typedefs.JSON | str | None = None, content_schema: colander.SchemaNode | None = None, content_type: weaver.formats.ContentType | None = sd.RequestContentTypeHeader.default, content_type_schema: colander.SchemaNode | None = sd.RequestContentTypeHeader) weaver.typedefs.JSON | weaver.typedefs.CWL[source]

Load the request content with validation of expected content type and their schema.

weaver.processes.utils.deploy_process_from_payload(payload: weaver.typedefs.JSON | str, container: weaver.typedefs.AnySettingsContainer | weaver.typedefs.AnyRequestType, overwrite: bool | weaver.datatype.Process = False) pyramid.httpexceptions.HTTPException[source]

Deploy the process after resolution of all references and validation of the parameters from payload definition.

Adds a weaver.datatype.Process instance to storage using the provided JSON payload matching weaver.wps_restapi.swagger_definitions.ProcessDescription.

Parameters:
  • payload – JSON payload that was specified during the process deployment request.

  • container – Container to retrieve application settings. If it is a request-like object, additional parameters may be used to identify the payload schema.

  • overwrite – In case of a pure deployment (from scratch), indicates (using bool) whether to allow override of an existing process definition if conflict occurs. No versioning is applied in this case (full replacement). In case of an update deployment (from previous), indicates which process to be replaced with updated version. The new version should not conflict with another existing process version. If payload doesn’t provide a new version, the following MAJOR version from the specified overwrite process is used to define the new revision.

Returns:

HTTPOk if the process registration was successful.

Raises:

HTTPException – for any invalid process deployment step.

weaver.processes.utils._save_deploy_process(process: weaver.datatype.Process, override: bool, container: weaver.typedefs.AnySettingsContainer) weaver.typedefs.JSON[source]

Store the Process to database with error handling and appropriate message reporting the problem.

weaver.processes.utils._deploy_process_multi_cwl(cwl_packages: List[weaver.typedefs.CWL], container: weaver.typedefs.AnySettingsContainer | weaver.typedefs.AnyRequestType, overwrite: bool | weaver.datatype.Process) pyramid.httpexceptions.HTTPException[source]

Deploy multiple CWL packages from a $graph definition using recursive deployment calls.

Rather than duplicating deployment logic, this function orchestrates the deployment order and recursively calls deploy_process_from_payload() for each package to reuse all validation, URL setup, and storage logic.

Warning

This deployment is NOT atomic. If any CWL package fails during deployment:

  • Previously deployed tools remain in the database (no rollback)

  • Retry attempts will skip already-deployed tools (HTTPConflict is caught and ignored)

  • Child tools are always deployed with overwrite=False, even if the main process uses overwrite=True

This means partial deployments can leave orphaned processes that must be manually cleaned up.

Parameters:
  • cwl_packages – list of resolved CWL package definitions.

  • container – Application container.

  • overwrite – Whether to overwrite existing processes. Note: only applies to main process, not child tools.

Returns:

HTTP response with deployment result from the main process.

weaver.processes.utils._update_deploy_process_version(process: weaver.datatype.Process, process_overwrite: weaver.datatype.Process, update_level: weaver.utils.VersionLevel, container: weaver.typedefs.AnySettingsContainer | None = None) weaver.typedefs.JSON[source]

Handle all necessary update operations of a Process definition.

Validate that any specified version for Process deployment is valid against any other existing versions. Perform any necessary database adjustments to replace the old Process references for the creation of the updated Process to ensure all versions and links remain valid against their original references.

Parameters:
  • process – Desired new process definition.

  • process_overwrite – Old process from which update of the definition in database could be required.

  • update_level – Minimum semantic version level required for this update operation. If the new Process definition did not provide a version explicitly, this level will be used to automatically infer the following revision number based on the old Process reference.

  • container – Any container to retrieve a database connection.

Returns:

Process summary with definition retrieved from storage (saved) after all operations were applied.

Raises:

HTTPException – Relevant error is raised in the event of any erroneous process definition (old or new).

weaver.processes.utils._bump_process_version(version: weaver.typedefs.AnyVersion, update_level: weaver.utils.VersionLevel) weaver.typedefs.AnyVersion[source]

Obtain the relevant version with specified level incremented by one.
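For plain 'MAJOR.MINOR.PATCH' strings, the increment can be sketched as follows. The function name and the string-based level argument are hypothetical (the real function takes a weaver.utils.VersionLevel), and resetting the lower levels to zero follows the usual semantic versioning convention rather than anything stated here.

```python
def bump_version_sketch(version: str, level: str) -> str:
    """Increment the given level ('major', 'minor' or 'patch') of a dotted version."""
    parts = [int(piece) for piece in version.split(".")]
    parts += [0] * (3 - len(parts))  # pad short versions such as '1.2'
    major, minor, patch = parts[:3]
    if level == "major":
        return f"{major + 1}.0.0"
    if level == "minor":
        return f"{major}.{minor + 1}.0"
    if level == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown version level: {level}")


print(bump_version_sketch("1.2.3", "minor"))
```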

weaver.processes.utils._apply_process_metadata(process: weaver.datatype.Process, update_data: weaver.typedefs.JSON) weaver.utils.VersionLevel[source]

Apply requested changes for update of the Process.

Assumes that update data was pre-validated with appropriate schema validation to guarantee relevant typings and formats are applied for expected fields. Validation of fields metadata with their specific conditions is accomplished when attempting to apply changes.

See also

Schema sd.PatchProcessBodySchema describes specific field handling based on unspecified value, null or empty-list. Corresponding update levels required for fields are also provided in this schema definition.

Parameters:
  • process – Process to modify. Can be the latest or a previously tagged version.

  • update_data – Fields with updated data to apply to the process.

Returns:

Applicable update level based on updates to be applied.

weaver.processes.utils.update_process_metadata(request: weaver.typedefs.AnyRequestType) pyramid.httpexceptions.HTTPException[source]

Update only MINOR or PATCH level Process metadata.

The desired new version can either be specified explicitly in the request payload, or it will be inferred from the detected changes to be applied.

weaver.processes.utils.parse_wps_process_config(config_entry: weaver.typedefs.JSON | str) Tuple[str, str, List[str], bool][source]

Parses the available WPS provider or process entry to retrieve its relevant information.

Returns:

WPS provider name, WPS service URL, list of process identifier(s), and a boolean flag (the service visibility).

Raises:

ValueError – if the entry cannot be parsed correctly.

weaver.processes.utils.register_wps_processes_static(service_url: str, service_name: str, service_visibility: bool, service_processes: List[str], container: weaver.typedefs.AnyRegistryContainer) None[source]

Register WPS-1 Process under a service Provider as static references.

For a given WPS provider endpoint, either iterates over all available processes under it to register them one by one, or limits itself to the reduced set specified by service_processes.

The registered WPS-1 processes generate a static reference, meaning that metadata of each process as well as any other modifications to the real remote reference will not be tracked, including validation of even their actual existence, or modifications to inputs/outputs. The Application Package will only point to it assuming it remains valid.

Each of the deployed processes using static reference will be accessible directly under Weaver endpoints:

/processes/<service-name>_<process-id>

The service is NOT deployed as Provider since the processes are registered directly.

Parameters:
  • service_url – WPS-1 service location (where GetCapabilities and DescribeProcess requests can be made).

  • service_name – Identifier to employ for generating the full process identifier.

  • service_visibility – Visibility flag of the provider.

  • service_processes – process IDs under the service to be registered, or all if empty.

  • container – Container from which to retrieve the required configuration settings.

weaver.processes.utils.register_wps_processes_dynamic(service_name: str, service_url: str, service_visibility: bool, container: weaver.typedefs.AnyRegistryContainer) None[source]

Register a WPS service provider such that processes under it are dynamically accessible on demand.

The registered WPS-1 provider generates a dynamic reference to processes under it. Only the Provider reference itself is actually registered. No Process is directly registered following this operation.

When information about the offered processes, descriptions of those processes or their execution are requested, Weaver will query the referenced Provider for details and convert the corresponding Process dynamically. This means that latest metadata of the Process, and any modification to it on the remote service will be immediately reflected on Weaver without any need to re-deploy processes.

Each of the deployed processes using dynamic reference will be accessible under Weaver endpoints:

/providers/<service-name>/processes/<process-id>

The processes are NOT deployed locally since the processes are retrieved from the Provider itself.

Parameters:
  • service_url – WPS-1 service location (where GetCapabilities and DescribeProcess requests can be made).

  • service_name – Identifier to employ for registering the provider identifier.

  • service_visibility – Visibility flag of the provider.

  • container – Container from which to retrieve the required configuration settings.

weaver.processes.utils.register_wps_processes_from_config(container: weaver.typedefs.AnySettingsContainer, wps_processes_file_path: weaver.typedefs.FileSystemPathType | None = None) None[source]

Registers remote WPS providers and/or processes as specified from the configuration file.

Loads a wps_processes.yml file and registers processes under WPS-1/2 providers to the current Weaver instance as equivalent OGC API - Processes instances.

References listed under processes are registered statically (by themselves, as an unchanging snapshot). For references listed under providers, the WPS services themselves are registered, making each Process listed in their GetCapabilities available. In this case, registered processes are defined dynamically, meaning they will be fetched from the provider each time a request refers to them, keeping their definition up-to-date with the remote server.

Added in version 1.14: When references are specified using providers section instead of processes, the registration only saves the remote WPS provider endpoint to dynamically populate WPS processes on demand. Previous behavior was to register each WPS process individually with ID [service]_[process].

Changed in version 4.19: Parameter positions are inverted. If wps_processes_file_path is explicitly provided, it is used directly without considering settings. Otherwise, the definition in setting weaver.wps_processes_file is employed automatically.

See also

  • weaver.wps_processes.yml.example for additional file format details.

Note

Settings with an explicit empty weaver.wps_processes_file entry will be considered as nothing to load. If the entry is omitted, default location WEAVER_DEFAULT_WPS_PROCESSES_CONFIG is attempted instead.

Parameters:
  • container – Registry container to obtain database reference as well as application settings.

  • wps_processes_file_path – Override file path to employ instead of default settings definition.

weaver.processes.utils._check_package_file(cwl_file_path_or_url: str) str[source]

Validates that the specified CWL file path or URL points to an existing and allowed file format.

Parameters:

cwl_file_path_or_url – path on disk to a file of one of the allowed types, or a URL pointing to one served elsewhere.

Returns:

validated absolute path or URL of the file reference.

Raises:

PackageRegistrationError – in case of missing file, invalid format or invalid HTTP status code.

weaver.processes.utils.is_cwl_package(package: Any) bool[source]

Perform minimal validation of a CWL package definition.
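What "minimal validation" covers is not detailed here. As an assumption, a check of this kind might only confirm that the object is a mapping carrying the cwlVersion key plus either a class or a $graph entry, as in this hypothetical sketch.

```python
from typing import Any


def looks_like_cwl_sketch(package: Any) -> bool:
    """Cheap structural check; it does not validate against the CWL schema."""
    return (
        isinstance(package, dict)
        and "cwlVersion" in package
        and ("class" in package or "$graph" in package)
    )


print(looks_like_cwl_sketch({"cwlVersion": "v1.2", "class": "CommandLineTool"}))
print(looks_like_cwl_sketch({"processDescription": {"id": "echo"}}))
```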

weaver.processes.utils.load_package_file(file_path: str) weaver.typedefs.CWL[source]

Loads the package in YAML/JSON format specified by the file path.

weaver.processes.utils.register_cwl_processes_from_config(container: weaver.typedefs.AnySettingsContainer) int[source]

Load multiple CWL definitions from a directory to register corresponding Process.

Added in version 4.19.

Each individual CWL definition must fully describe a Process by itself. Therefore, an id must be available in the file to indicate the target deployment reference. In case of conflict, the existing database Process will be overridden to ensure file updates are applied.

Files are loaded in alphabetical order. If a Workflow needs to refer to other processes, they should be named in a way that dependencies will be resolvable prior to the registration of the Workflow Process. The resolved directory to search for CWL will be traversed recursively. This, along with the name of CWL files themselves, can be used to resolve order-dependent loading cases. Only .cwl extensions are considered to avoid invalid parsing of other files that could be defined in the shared configuration directory.

Note

Settings with an explicit empty weaver.cwl_processes_dir entry will be considered as nothing to load. If the entry is omitted, default location WEAVER_CONFIG_DIR is used to search for CWL files.

Parameters:

container – Registry container to obtain database reference as well as application settings.

Returns:

Number of successfully registered processes from found CWL files.
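The file discovery described above (recursive traversal, .cwl extension only, alphabetical order) can be approximated with pathlib. The function name is hypothetical, and sorting by full path is an assumption about how "alphabetical" is applied across subdirectories.

```python
import tempfile
from pathlib import Path
from typing import List


def find_cwl_files_sketch(directory: str) -> List[Path]:
    """List every '.cwl' file under the directory, recursively, in sorted order."""
    return sorted(path for path in Path(directory).rglob("*.cwl") if path.is_file())


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "workflows").mkdir()
    (root / "workflows" / "chain.cwl").write_text("{}")
    (root / "a_tool.cwl").write_text("{}")
    (root / "notes.txt").write_text("ignored")  # other extensions are skipped
    found = [path.relative_to(root).as_posix() for path in find_cwl_files_sketch(tmp)]

print(found)
```

With this ordering, naming tool files so that they sort before the Workflow that references them lets the dependencies register first.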

weaver.processes.utils.pull_docker(docker_auth: weaver.datatype.DockerAuthentication, logger: weaver.utils.LoggerHandler = LOGGER) docker.client.DockerClient | None[source]

Pulls the referenced Docker image to local cache from an optionally secured registry.

If the Docker image is already available locally, simply validates it. Authentication is applied as necessary using the provided parameters.

Warning

Logging calls must employ the

Parameters:
  • docker_auth – Docker reference with optional authentication parameters.

  • logger – Alternative logger reference to log status messages about the operation.

Returns:

Docker client to perform further operations with the retrieved or validated image. None if failed.