import argparse
import base64
import copy
import inspect
import logging
import os
import sys
import time
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import yaml
from yaml.scanner import ScannerError
from weaver import __meta__
from weaver.datatype import AutoBase
from weaver.exceptions import PackageRegistrationError
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType, get_content_type, get_format
from weaver.processes.constants import ProcessSchema
from weaver.processes.convert import (
convert_input_values_schema,
cwl2json_input_values,
get_field,
repr2json_input_values
)
from weaver.processes.wps_package import get_process_definition
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.utils import (
fetch_file,
fully_qualified_name,
get_any_id,
get_any_value,
get_file_headers,
load_file,
null,
repr_json,
request_extra,
setup_loggers
)
from weaver.visibility import Visibility
from weaver.wps_restapi import swagger_definitions as sd
if TYPE_CHECKING:
from typing import Any, Dict, Optional, Tuple, Union
from requests import Response
# avoid failing sphinx-argparse documentation
# https://github.com/ashb/sphinx-argparse/issues/7
try:
from weaver.typedefs import CWL, JSON, ExecutionInputsMap, HeadersType
except ImportError:
CWL = JSON = ExecutionInputsMap = HeadersType = Any # avoid linter issue
try:
from weaver.processes.constants import ProcessSchemaType
except ImportError:
[docs] ProcessSchemaType = str
[docs]LOGGER = logging.getLogger(__name__)
[docs]OPERATION_ARGS_TITLE = "Operation Arguments"
[docs]OPTIONAL_ARGS_TITLE = "Optional Arguments"
[docs]REQUIRED_ARGS_TITLE = "Required Arguments"
[docs]def _json2text(data):
# type: (Any) -> str
return repr_json(data, indent=2, ensure_ascii=False)
[docs]class OperationResult(AutoBase):
"""
Data container for any :class:`WeaverClient` operation results.
:param success: Success status of the operation.
:param message: Detail extracted from response content if available.
:param headers: Headers returned by the response for reference.
:param body: Content of :term:`JSON` response or fallback in plain text.
:param text: Pre-formatted text representation of :paramref:`body`.
"""
[docs] success = False # type: Optional[bool]
[docs] message = "" # type: Optional[str]
[docs] body = {} # type: Optional[Union[JSON, str]]
[docs] code = None # type: Optional[int]
def __init__(self,
success=None, # type: Optional[bool]
message=None, # type: Optional[str]
body=None, # type: Optional[Union[str, JSON]]
headers=None, # type: Optional[HeadersType]
text=None, # type: Optional[str]
code=None, # type: Optional[int]
**kwargs, # type: Any
): # type: (...) -> None
super(OperationResult, self).__init__(**kwargs)
self.success = success
self.message = message
self.headers = headers
self.body = body
self.text = text
self.code = code
def __repr__(self):
params = ["success", "code", "message"]
quotes = [False, False, True]
quoted = lambda q, v: f"\"{v}\"" if q and v is not None else v # noqa: E731
values = ", ".join([f"{param}={quoted(quote, getattr(self, param))}" for quote, param in zip(quotes, params)])
return f"{type(self).__name__}({values})\n{self.text}"
@property
[docs] def text(self):
# type: () -> str
text = dict.get(self, "text", None)
if not text and self.body:
text = _json2text(self.body)
self["text"] = text
return text
@text.setter
def text(self, text):
# type: (str) -> None
self["text"] = text
[docs]class WeaverClient(object):
"""
Client that handles common HTTP requests with a `Weaver` or similar :term:`OGC API - Processes` instance.
"""
# default configuration parameters, overridable by corresponding method parameters
[docs] monitor_timeout = 60 # maximum delay to wait for job completion
[docs] monitor_interval = 5 # interval between monitor pooling job status requests
def __init__(self, url=None):
# type: (Optional[str]) -> None
if url:
self._url = self._parse_url(url)
LOGGER.debug("Using URL: [%s]", self._url)
else:
self._url = None
LOGGER.warning("No URL provided. All operations must provide it directly or through another parameter!")
self._headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON}
self._settings = {
"weaver.request_options": {}
} # FIXME: load from INI, overrides as input (cumul arg '--setting weaver.x=value') ?
[docs] def _get_url(self, url):
# type: (Optional[str]) -> str
if not self._url and not url:
raise ValueError("No URL available. Client was not created with an URL and operation did not receive one.")
return self._url or self._parse_url(url)
@staticmethod
[docs] def _parse_url(url):
parsed = urlparse("http://" + url if not url.startswith("http") else url)
parsed_netloc_path = f"{parsed.netloc}{parsed.path}".replace("//", "/")
parsed_url = f"{parsed.scheme}://{parsed_netloc_path}"
return parsed_url.rsplit("/", 1)[0] if parsed_url.endswith("/") else parsed_url
@staticmethod
[docs] def _parse_result(response, message=None):
# type: (Response, Optional[str]) -> OperationResult
hdr = dict(response.headers)
success = False
try:
body = response.json()
msg = message or body.get("description", body.get("message", "undefined"))
if response.status_code >= 400:
if not msg:
msg = body.get("error", body.get("exception", "unknown"))
else:
success = True
text = _json2text(body)
except Exception: # noqa
text = body = response.text
msg = "Could not parse body."
return OperationResult(success, msg, body, hdr, text=text, code=response.status_code)
@staticmethod
[docs] def _parse_deploy_body(body, process_id):
# type: (Optional[Union[JSON, str]], Optional[str]) -> OperationResult
data = {} # type: JSON
try:
if body:
if isinstance(body, str) and (body.startswith("http") or os.path.isfile(body)):
data = load_file(body)
elif isinstance(body, str) and body.startswith("{") and body.endswith("}"):
data = yaml.safe_load(body)
elif isinstance(body, dict):
data = body
else:
msg = "Cannot load badly formed body. Deploy JSON object or file reference expected."
return OperationResult(False, msg, body, {})
elif not body:
data = {
"processDescription": {
"process": {"id": process_id}
}
}
if body and process_id:
LOGGER.debug("Override provided process ID [%s] into provided/loaded body.", process_id)
data.setdefault("processDescription", {})
data["processDescription"].setdefault("process", {})
data["processDescription"]["process"]["id"] = process_id # type: ignore
# for convenience, always set visibility by default
data.setdefault("processDescription", {})
data["processDescription"].setdefault("process", {})
data["processDescription"]["process"]["visibility"] = Visibility.PUBLIC # type: ignore
except (ValueError, TypeError, ScannerError) as exc:
return OperationResult(False, f"Failed resolution of body definition: [{exc!s}]", body)
return OperationResult(True, "", data)
@staticmethod
[docs] def _parse_deploy_package(body, cwl, wps, process_id, headers):
# type: (JSON, Optional[CWL], Optional[str], Optional[str], HeadersType) -> OperationResult
try:
p_id = body.get("processDescription", {}).get("process", {}).get("id", process_id)
info = {"id": p_id} # minimum requirement for process offering validation
if (isinstance(cwl, str) and not cwl.startswith("{")) or isinstance(wps, str):
LOGGER.debug("Override loaded CWL into provided/loaded body for process: [%s]", p_id)
proc = get_process_definition(info, reference=cwl or wps, headers=headers) # validate
body["executionUnit"] = [{"unit": proc["package"]}]
elif isinstance(cwl, str) and cwl.startswith("{") and cwl.endswith("}"):
LOGGER.debug("Override parsed CWL into provided/loaded body for process: [%s]", p_id)
pkg = yaml.safe_load(cwl)
if not isinstance(pkg, dict) or pkg.get("cwlVersion") is None:
raise PackageRegistrationError("Failed parsing or invalid CWL from expected literal JSON string.")
proc = get_process_definition(info, package=pkg, headers=headers) # validate
body["executionUnit"] = [{"unit": proc["package"]}]
elif isinstance(cwl, dict):
LOGGER.debug("Override provided CWL into provided/loaded body for process: [%s]", p_id)
get_process_definition(info, package=cwl, headers=headers) # validate
body["executionUnit"] = [{"unit": cwl}]
except (PackageRegistrationError, ScannerError) as exc:
message = f"Failed resolution of package definition: [{exc!s}]"
return OperationResult(False, message, cwl)
return OperationResult(True, p_id, body)
[docs] def _parse_job_ref(self, job_reference, url=None):
# type: (str, Optional[str]) -> Tuple[Optional[str], Optional[str]]
if job_reference.startswith("http"):
job_url = job_reference
job_parts = [part for part in job_url.split("/") if part.strip()]
job_id = job_parts[-1]
else:
url = self._get_url(url)
job_id = job_reference
job_url = f"{url}/jobs/{job_id}"
return job_id, job_url
@staticmethod
[docs] def _parse_auth_token(token, username, password):
# type: (Optional[str], Optional[str], Optional[str]) -> HeadersType
if token or (username and password):
if not token:
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
return {sd.XAuthDockerHeader.name: f"Basic {token}"}
return {}
[docs] def deploy(self,
process_id=None, # type: Optional[str]
body=None, # type: Optional[Union[JSON, str]]
cwl=None, # type: Optional[Union[CWL, str]]
wps=None, # type: Optional[str]
token=None, # type: Optional[str]
username=None, # type: Optional[str]
password=None, # type: Optional[str]
undeploy=False, # type: bool
url=None, # type: Optional[str]
): # type: (...) -> OperationResult
"""
Deploy a new :term:`Process` with specified metadata and reference to an :term:`Application Package`.
The referenced :term:`Application Package` must be one of:
- :term:`CWL` body, local file or URL in :term:`JSON` or :term:`YAML` format
- :term:`WPS` process URL with :term:`XML` response
- :term:`WPS-REST` process URL with :term:`JSON` response
- :term:`OGC API - Processes` process URL with :term:`JSON` response
If the reference is resolved to be a :term:`Workflow`, all its underlying :term:`Process` steps must be
available under the same URL that this client was initialized with.
.. seealso::
:ref:`proc_op_deploy`
:param process_id:
Desired process identifier.
Can be omitted if already provided in body contents or file.
:param body:
Literal :term:`JSON` contents, either using string representation of actual Python objects forming the
request body, or file path/URL to :term:`YAML` or :term:`JSON` contents of the request body.
Other parameters (:paramref:`process_id`, :paramref:`cwl`) can override corresponding fields within the
provided body.
:param cwl:
Literal :term:`JSON` or :term:`YAML` contents, either using string representation of actual Python objects,
or file path/URL with contents of the :term:`CWL` definition of the :term:`Application package` to be
inserted into the body.
:param wps:
URL to an existing :term:`WPS` process (WPS-1/2 or WPS-REST/OGC-API).
:param token:
Authentication token for accessing private Docker registry if :term:`CWL` refers to such image.
:param username:
Username to form the authentication token to a private Docker registry.
:param password:
Password to form the authentication token to a private Docker registry.
:param undeploy:
Perform undeploy step as applicable prior to deployment to avoid conflict with exiting :term:`Process`.
:param url:
Instance URL if not already provided during client creation.
:returns: Results of the operation.
"""
result = self._parse_deploy_body(body, process_id)
if not result.success:
return result
headers = copy.deepcopy(self._headers)
headers.update(self._parse_auth_token(token, username, password))
data = result.body
result = self._parse_deploy_package(data, cwl, wps, process_id, headers)
if not result.success:
return result
p_id = result.message
data = result.body
base = self._get_url(url)
if undeploy:
LOGGER.debug("Performing requested undeploy of process: [%s]", p_id)
result = self.undeploy(process_id=p_id, url=base)
if result.code not in [200, 404]:
return OperationResult(False, "Failed requested undeployment prior deployment.",
body=result.body, text=result.text, code=result.code, headers=result.headers)
path = f"{base}/processes"
resp = request_extra("POST", path, json=data, headers=headers, settings=self._settings)
return self._parse_result(resp)
[docs] def undeploy(self, process_id, url=None):
# type: (str, Optional[str]) -> OperationResult
"""
Undeploy an existing :term:`Process`.
:param process_id: Identifier of the process to undeploy.
:param url: Instance URL if not already provided during client creation.
"""
base = self._get_url(url)
path = f"{base}/processes/{process_id}"
resp = request_extra("DELETE", path, headers=self._headers, settings=self._settings)
return self._parse_result(resp)
[docs] def capabilities(self, url=None):
# type: (Optional[str]) -> OperationResult
"""
List all available :term:`Process` on the instance.
.. seealso::
:ref:`proc_op_getcap`
:param url: Instance URL if not already provided during client creation.
"""
base = self._get_url(url)
path = f"{base}/processes"
query = {"detail": False} # not supported by non-Weaver, but save the work if possible
resp = request_extra("GET", path, params=query, headers=self._headers, settings=self._settings)
result = self._parse_result(resp)
processes = result.body.get("processes")
if isinstance(processes, list) and all(isinstance(proc, dict) for proc in processes):
processes = [get_any_id(proc) for proc in processes]
result.body = processes
return result
[docs] processes = capabilities # alias
"""
Alias of :meth:`capabilities` for :term:`Process` listing.
"""
[docs] def describe(self, process_id, url=None, schema=ProcessSchema.OGC):
# type: (str, Optional[str], Optional[ProcessSchemaType]) -> OperationResult
"""
Describe the specified :term:`Process`.
.. seealso::
:ref:`proc_op_describe`
:param process_id: Identifier of the process to describe.
:param url: Instance URL if not already provided during client creation.
:param schema: Representation schema of the returned process description.
"""
base = self._get_url(url)
path = f"{base}/processes/{process_id}"
query = None
if isinstance(schema, str) and schema.upper() in ProcessSchema.values():
query = {"schema": schema.upper()}
resp = request_extra("GET", path, params=query, headers=self._headers, settings=self._settings)
# API response from this request can contain 'description' matching the process description
# rather than a generic response 'description'. Enforce the provided message to avoid confusion.
return self._parse_result(resp, message="Process description successfully retrieved.")
@staticmethod
[docs] def _update_files(self, inputs, url=None):
# type: (ExecutionInputsMap, Optional[str]) -> Tuple[ExecutionInputsMap, HeadersType]
"""
Replaces local file paths by references uploaded to the :term:`Vault`.
.. seealso::
- Headers dictionary limitation by :mod:`requests`:
https://docs.python-requests.org/en/latest/user/quickstart/#response-headers
- Headers formatting with multiple values must be provided by comma-separated values
(:rfc:`7230#section-3.2.2`).
- Multi Vault-Token parsing accomplished by :func:`weaver.vault.utils.parse_vault_token`.
- More details about formats and operations related to :term:`Vault` are provided
in :ref:`file_vault_token` and :ref:`vault` chapters.
:param inputs: Input values for submission of :term:`Process` execution.
:return: Updated inputs.
"""
auth_tokens = {} # type: Dict[str, str]
update_inputs = dict(inputs)
for input_id, input_data in dict(inputs).items():
if not isinstance(input_data, list): # support array of files
input_data = [input_data]
for data in input_data:
if not isinstance(data, dict):
continue
file = href = get_any_value(data, default=null, data=False)
if not isinstance(href, str):
continue
if href.startswith("file://"):
href = href[7:]
if "://" not in href and not os.path.isfile(href):
LOGGER.warning(
"Ignoring potential local file reference since it does not exist. "
"Cannot upload to vault: [%s]", file
)
continue
fmt = data.get("format", {})
ctype = get_field(fmt, "mime_type", search_variations=True)
if not ctype:
ext = os.path.splitext(href)[-1]
ctype = get_content_type(ext)
fmt = get_format(ctype, default=ContentType.TEXT_PLAIN)
res = self.upload(href, content_type=fmt.mime_type, url=url)
if res.code != 200:
return res
vault_href = res.body["file_href"]
vault_id = res.body["file_id"]
token = res.body["access_token"]
auth_tokens[vault_id] = token
LOGGER.info("Converted (input: %s) [%s] -> [%s]", input_id, file, vault_href)
update_inputs[input_id] = {"href": vault_href, "format": {"mediaType": ctype}}
auth_headers = {}
if auth_tokens:
multi_tokens = ",".join([
f"token {token}; id={input_id}"
for input_id, token in auth_tokens.items()
])
auth_headers = {sd.XAuthVaultFileHeader.name: multi_tokens}
return update_inputs, auth_headers
# FIXME: support sync (https://github.com/crim-ca/weaver/issues/247)
# :param execute_async:
# Execute the process asynchronously (user must call :meth:`monitor` themselves,
# or synchronously where monitoring is done automatically until completion before returning.
[docs] def execute(self, process_id, inputs=None, monitor=False, timeout=None, interval=None, url=None):
# type: (str, Optional[Union[str, JSON]], bool, Optional[int], Optional[int], Optional[str]) -> OperationResult
"""
Execute a :term:`Job` for the specified :term:`Process` with provided inputs.
When submitting inputs with :term:`OGC API - Processes` schema, top-level ``inputs`` key is expected.
Under it, either the mapping (key-value) or listing (id,value) representation are accepted.
If ``inputs`` is not found, the alternative :term:`CWL` will be assumed.
When submitting inputs with :term:`CWL` *job* schema, plain key-value(s) pairs are expected.
All values should be provided directly under the key (including arrays), except for ``File``
type that must include the ``class`` and ``path`` details.
.. seealso::
:ref:`proc_op_execute`
:param process_id: Identifier of the process to execute.
:param inputs:
Literal :term:`JSON` or :term:`YAML` contents of the inputs submitted and inserted into the execution body,
using either the :term:`OGC API - Processes` or :term:`CWL` format, or a file path/URL referring to them.
:param monitor:
Automatically perform :term:`Job` execution monitoring until completion or timeout to obtain final results.
If requested, this operation will become blocking until either the completed status or timeout is reached.
:param timeout:
Monitoring timeout (seconds) if requested.
:param interval:
Monitoring interval (seconds) between job status polling requests.
:param url: Instance URL if not already provided during client creation.
:returns: Results of the operation.
"""
if isinstance(inputs, list) and all(isinstance(item, list) for item in inputs):
inputs = [items for sub in inputs for items in sub] # flatten 2D->1D list
values = self._parse_inputs(inputs)
if isinstance(values, OperationResult):
return values
base = self._get_url(url)
result = self._update_files(values, url=base)
if isinstance(result, OperationResult):
return result
values, auth_headers = result
data = {
# NOTE: since sync is not yet properly implemented in Weaver, simulate with monitoring after if requested
# FIXME: support 'sync' (https://github.com/crim-ca/weaver/issues/247)
"mode": ExecuteMode.ASYNC,
"inputs": values,
# FIXME: support 'response: raw' (https://github.com/crim-ca/weaver/issues/376)
"response": ExecuteResponse.DOCUMENT,
# FIXME: allow omitting 'outputs' (https://github.com/crim-ca/weaver/issues/375)
# FIXME: allow 'transmissionMode: value/reference' selection (https://github.com/crim-ca/weaver/issues/377)
"outputs": {}
}
# FIXME: since (https://github.com/crim-ca/weaver/issues/375) not implemented, auto-populate all the outputs
result = self.describe(process_id, url=base)
if not result.success:
return OperationResult(False, "Could not obtain process description for execution.",
body=result.body, headers=result.headers, code=result.code, text=result.text)
outputs = result.body.get("outputs")
for output_id in outputs:
# use 'value' to have all outputs reported in body as 'value/href' rather than 'Link' headers
data["outputs"][output_id] = {"transmissionMode": ExecuteTransmissionMode.VALUE}
LOGGER.info("Executing [%s] with inputs:\n%s", process_id, _json2text(values))
path = f"{base}/processes/{process_id}/execution" # use OGC-API compliant endpoint (not '/jobs')
headers = {}
headers.update(self._headers)
headers.update(auth_headers)
resp = request_extra("POST", path, json=data, headers=headers, settings=self._settings)
result = self._parse_result(resp)
if not monitor or not result.success:
return result
# although Weaver returns "jobID" in the body for convenience,
# employ the "Location" header to be OGC-API compliant
job_url = resp.headers.get("Location", "")
return self.monitor(job_url, timeout=timeout, interval=interval)
[docs] def upload(self, file_path, content_type=None, url=None):
# type: (str, Optional[str], Optional[str]) -> OperationResult
"""
Upload a local file to the :term:`Vault`.
.. note::
Feature only available for `Weaver` instances. Not available for standard :term:`OGC API - Processes`.
.. seealso::
More details about formats and operations related to :term:`Vault` are provided
in :ref:`file_vault_token` and :ref:`vault` chapters.
:param file_path: Location of the file to be uploaded.
:param content_type:
Explicit Content-Type of the file.
This should be an IANA Media-Type, optionally with additional parameters such as charset.
If not provided, attempts to guess it based on the file extension.
:param url: Instance URL if not already provided during client creation.
:returns: Results of the operation.
"""
if not isinstance(file_path, str):
file_type = fully_qualified_name(file_path)
return OperationResult(False, "Local file reference is not a string.", {"file_path": file_type})
if file_path.startswith("file://"):
file_path = file_path[7:]
if "://" in file_path:
scheme = file_path.split("://", 1)[0]
return OperationResult(False, "Scheme not supported for local file reference.", {"file_scheme": scheme})
file_path = os.path.abspath(os.path.expanduser(file_path))
if not os.path.isfile(file_path):
return OperationResult(False, "Resolved local file reference does not exist.", {"file_path": file_path})
LOGGER.debug("Processing file for vault upload: [%s]", file_path)
file_headers = get_file_headers(file_path, content_headers=True, content_type=content_type)
base = self._get_url(url)
path = f"{base}/vault"
files = {
"file": (
os.path.basename(file_path),
open(file_path, "r", encoding="utf-8"),
file_headers["Content-Type"]
)
}
req_headers = {
"Accept": ContentType.APP_JSON, # no 'Content-Type' since auto generated with multipart boundary
"Cache-Control": "no-cache", # ensure the cache is not used to return a previously uploaded file
}
# allow retry to avoid some sporadic HTTP 403 errors
resp = request_extra("POST", path, headers=req_headers, settings=self._settings, files=files, retry=2)
return self._parse_result(resp)
[docs] def status(self, job_reference, url=None):
# type: (str, Optional[str]) -> OperationResult
"""
Obtain the status of a :term:`Job`.
.. seealso::
:ref:`proc_op_status`
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param url: Instance URL if not already provided during client creation.
:returns: Retrieved status of the job.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
LOGGER.info("Getting job status: [%s]", job_id)
resp = request_extra("GET", job_url, headers=self._headers, settings=self._settings)
return self._parse_result(resp)
[docs] def monitor(self, job_reference, timeout=None, interval=None, wait_for_status=Status.SUCCEEDED, url=None):
# type: (str, Optional[int], Optional[int], str, Optional[str]) -> OperationResult
"""
Monitor the execution of a :term:`Job` until completion.
.. seealso::
:ref:`proc_op_monitor`
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param timeout: timeout (seconds) of maximum wait time for monitoring if completion is not reached.
:param interval: wait interval (seconds) between polling monitor requests.
:param wait_for_status: monitor until the requested status is reached (default: job failed or succeeded).
:param url: Instance URL if not already provided during client creation.
:return: Result of the successful or failed job, or timeout of monitoring process.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
remain = timeout = timeout or self.monitor_timeout
delta = interval or self.monitor_interval
LOGGER.info("Monitoring job [%s] for %ss at intervals of %ss.", job_id, timeout, delta)
once = True
body = None
while remain >= 0 or once:
resp = request_extra("GET", job_url, headers=self._headers, settings=self._settings)
if resp.status_code != 200:
return OperationResult(False, "Could not find job with specified reference.", {"job": job_reference})
body = resp.json()
status = body.get("status")
if status == wait_for_status:
return OperationResult(True, f"Requested job status reached [{wait_for_status}].", body)
if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
return OperationResult(False, "Requested job status not reached, but job has finished.", body)
time.sleep(delta)
remain -= delta
once = False
return OperationResult(False, f"Monitoring timeout reached ({timeout}s). Job did not complete in time.", body)
[docs] def results(self, job_reference, out_dir=None, download=False, url=None):
# type: (str, Optional[str], bool, Optional[str]) -> OperationResult
"""
Obtain the results of a successful :term:`Job` execution.
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param out_dir: Output directory where to store downloaded files if requested (default: CURDIR/JobID/<outputs>).
:param download: Download any file reference found within results (CAUTION: could transfer lots of data!).
:param url: Instance URL if not already provided during client creation.
:returns: Result details and local paths if downloaded.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
status = self.status(job_url)
if not status.success:
return OperationResult(False, "Cannot process results from incomplete or failed job.", status.body)
# use results endpoint instead of outputs to be OGC-API compliant, should be able to target non-Weaver instance
# with this endpoint, outputs IDs are directly at the root of the body
result_url = f"{job_url}/results"
LOGGER.info("Retrieving results from [%s]", result_url)
resp = request_extra("GET", result_url, headers=self._headers, settings=self._settings)
res_out = self._parse_result(resp)
outputs = res_out.body
if not res_out.success or not isinstance(res_out.body, dict):
return OperationResult(False, "Could not retrieve any output results from job.", outputs)
if not download:
return OperationResult(True, "Listing job results.", outputs)
# download file results
if not any("href" in value for value in outputs.values()):
return OperationResult(False, "Outputs were found but none are downloadable (only raw values?).", outputs)
if not out_dir:
out_dir = os.path.join(os.path.realpath(os.path.curdir), job_id)
os.makedirs(out_dir, exist_ok=True)
LOGGER.info("Will store job [%s] output results in [%s]", job_id, out_dir)
for output, value in outputs.items():
is_list = True
if not isinstance(value, list):
value = [value]
is_list = False
for i, item in enumerate(value):
if "href" in item:
file_path = fetch_file(item["href"], out_dir, link=False)
if is_list:
outputs[output][i]["path"] = file_path
else:
outputs[output]["path"] = file_path
return OperationResult(True, "Retrieved job results.", outputs)
[docs] def dismiss(self, job_reference, url=None):
"""
Dismiss pending or running :term:`Job`, or clear result artifacts from a completed :term:`Job`.
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param url: Instance URL if not already provided during client creation.
:returns: Obtained result from the operation.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
LOGGER.debug("Dismissing job: [%s]", job_id)
resp = request_extra("DELETE", job_url, headers=self._headers, settings=self._settings)
return self._parse_result(resp)
[docs]def setup_logger_from_options(logger, args): # pragma: no cover
# type: (logging.Logger, argparse.Namespace) -> None
"""
Uses argument parser options to setup logging level from specified flags.
Setup both the specific CLI logger that is provided and the top-level package logger.
"""
if args.log_level:
logger.setLevel(logging.getLevelName(args.log_level.upper()))
elif args.quiet:
logger.setLevel(logging.ERROR)
elif args.verbose:
logger.setLevel(logging.INFO)
elif args.debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
setup_loggers({}, force_stdout=args.stdout)
if logger.name != __meta__.__name__:
setup_logger_from_options(logging.getLogger(__meta__.__name__), args)
[docs]def make_logging_options(parser):
# type: (argparse.ArgumentParser) -> None
"""
Defines argument parser options for logging operations.
"""
log_title = "Logging Options"
log_desc = "Options that configure output logging."
log_opts = parser.add_argument_group(title=log_title, description=log_desc)
log_opts.add_argument("--stdout", action="store_true", help="Enforce logging to stdout for display in console.")
log_opts.add_argument("--log", "--log-file", help="Output file to write generated logs.")
lvl_opts = log_opts.add_mutually_exclusive_group()
lvl_opts.title = log_title
lvl_opts.description = log_desc
lvl_opts.add_argument("--quiet", "-q", action="store_true", help="Do not output anything else than error.")
lvl_opts.add_argument("--debug", "-d", action="store_true", help="Enable extra debug logging.")
lvl_opts.add_argument("--verbose", "-v", action="store_true", help="Output informative logging details.")
lvl_names = ["debug", "info", "warn", "error"]
lvl_opts.add_argument("--log-level", "-l", dest="log_level",
choices=list(sorted(lvl_names + [lvl.upper() for lvl in lvl_names])),
help="Explicit log level to employ (default: %(default)s).")
[docs]def add_url_param(parser, required=True):
# type: (argparse.ArgumentParser, bool) -> None
parser.add_argument("-u", "--url", metavar="URL", help="URL of the instance to run operations.", required=required)
[docs]def add_process_param(parser, description=None, required=True):
# type: (argparse.ArgumentParser, Optional[str], bool) -> None
operation = parser.prog.split(" ")[-1]
parser.add_argument(
"-p", "--id", "--process", dest="process_id", required=required,
help=description if description else f"Identifier of the process to run {operation} operation."
)
[docs]def add_job_ref_param(parser):
# type: (argparse.ArgumentParser) -> None
operation = parser.prog.split(" ")[-1]
parser.add_argument(
"-j", "--job", dest="job_reference", required=True,
help=f"Job URL or UUID to run {operation} operation. "
"If full Job URL is provided, the instance ``--url`` parameter can be omitted."
)
[docs]def add_timeout_param(parser):
# type: (argparse.ArgumentParser) -> None
parser.add_argument(
"-T", "--timeout", dest="timeout", type=int, default=WeaverClient.monitor_timeout,
help="Wait timeout (seconds) of the maximum monitoring duration of the job execution (default: %(default)ss). "
"If this timeout is reached but job is still running, another call directly to the monitoring operation "
"can be done to resume monitoring. The job execution itself will not stop in case of timeout."
)
parser.add_argument(
"-W", "--wait", "--interval", dest="interval", type=int, default=WeaverClient.monitor_interval,
help="Wait interval (seconds) between each job status polling during monitoring (default: %(default)ss)."
)
[docs]def set_parser_sections(parser):
# type: (argparse.ArgumentParser) -> None
parser._optionals.title = OPTIONAL_ARGS_TITLE
parser._positionals.title = REQUIRED_ARGS_TITLE
[docs]class SubArgumentParserFixedMutexGroups(argparse.ArgumentParser):
"""
Patch incorrectly handled mutually exclusive groups sections in subparsers.
.. seealso::
- https://bugs.python.org/issue43259
- https://bugs.python.org/issue16807
"""
[docs] def _add_container_actions(self, container):
# pylint: disable=W0212
groups = container._mutually_exclusive_groups
tmp_mutex_groups = container._mutually_exclusive_groups
container._mutually_exclusive_groups = []
super(SubArgumentParserFixedMutexGroups, self)._add_container_actions(container)
for group in groups:
mutex_group = self.add_mutually_exclusive_group(required=group.required)
for action in group._group_actions:
mutex_group._group_actions.append(action)
container._mutually_exclusive_groups = tmp_mutex_groups
[docs]class ArgumentParserFixedRequiredArgs(argparse.ArgumentParser):
"""
Override action grouping under 'required' section to consider explicit flag even if action has option prefix.
Default behaviour places option prefixed (``-``, ``--``) arguments into optionals even if ``required`` is defined.
Help string correctly considers this flag and doesn't place those arguments in brackets (``[--<optional-arg>]``).
"""
[docs] def _add_action(self, action):
if action.option_strings and not action.required:
self._optionals._add_action(action)
else:
self._positionals._add_action(action)
return action
[docs]class WeaverArgumentParser(ArgumentParserFixedRequiredArgs, SubArgumentParserFixedMutexGroups):
"""
Parser that provides fixes for proper representation of `Weaver` :term:`CLI` arguments.
"""
[docs]def make_parser():
# type: () -> argparse.ArgumentParser
"""
Generate the :term:`CLI` parser.
.. note::
Instead of employing :class:`argparse.ArgumentParser` instances returned
by :meth:`argparse._SubParsersAction.add_parser`, distinct :class:`argparse.ArgumentParser` instances are
created for each operation and then merged back by ourselves as subparsers under the main parser.
This provides more flexibility in arguments passed down and resolves, amongst other things, incorrect
handling of exclusive argument groups and their grouping under corresponding section titles.
"""
# generic logging parser to pass down to each operation
# this allows providing logging options to any of them
log_parser = WeaverArgumentParser(add_help=False)
make_logging_options(log_parser)
desc = "Run {} operations.".format(__meta__.__title__)
parser = WeaverArgumentParser(prog=__meta__.__name__, description=desc, parents=[log_parser])
set_parser_sections(parser)
parser.add_argument(
"--version", "-V",
action="version",
version="%(prog)s {}".format(__meta__.__version__),
help="Display the version of the package."
)
ops_parsers = parser.add_subparsers(
title="Operations", dest="operation",
description="Name of the operation to run."
)
op_deploy = WeaverArgumentParser(
"deploy",
description="Deploy a process.",
)
set_parser_sections(op_deploy)
add_url_param(op_deploy)
add_process_param(op_deploy, required=False, description=(
"Process identifier for deployment. If no ``--body`` is provided, this is required. "
"Otherwise, provided value overrides the corresponding ID in the body."
))
op_deploy.add_argument(
"-b", "--body", dest="body",
help="Deployment body directly provided. Allows both JSON and YAML format when using file reference. "
"If provided in combination with process ID or CWL, they will override the corresponding content. "
"Can be provided either with a local file, an URL or literal string contents formatted as JSON."
)
op_deploy_app_pkg = op_deploy.add_mutually_exclusive_group()
op_deploy_app_pkg.add_argument(
"--cwl", dest="cwl",
help="Application Package of the process defined using Common Workflow Language (CWL) as JSON or YAML "
"format when provided by file reference. File reference can be a local file or URL location. "
"Can also be provided as literal string contents formatted as JSON. "
"Provided contents will be inserted into an automatically generated request deploy body if none was "
"specified with ``--body`` option (note: ``--process`` must be specified instead in that case). "
"Otherwise, it will override the appropriate execution unit section within the provided deploy body."
)
op_deploy_app_pkg.add_argument(
"--wps", dest="wps",
help="Reference URL to a specific process under a Web Processing Service (WPS) to package as OGC-API Process."
)
op_deploy_token = op_deploy.add_mutually_exclusive_group()
op_deploy_token.add_argument(
"-t", "--token", dest="token",
help="Authentication token to retrieve a Docker image reference from a private registry during execution."
)
op_deploy_creds = op_deploy_token.add_argument_group("Credentials")
op_deploy_creds.add_argument(
"-U", "--username", dest="username",
help="Username to compute the authentication token for Docker image retrieval from a private registry."
)
op_deploy_creds.add_argument(
"-P", "--password", dest="password",
help="Password to compute the authentication token for Docker image retrieval from a private registry."
)
op_deploy.add_argument(
"-D", "--delete", "--undeploy", dest="undeploy", action="store_true",
help="Perform undeploy step as applicable prior to deployment to avoid conflict with exiting process."
)
op_undeploy = WeaverArgumentParser(
"undeploy",
description="Undeploy an existing process.",
)
set_parser_sections(op_undeploy)
add_url_param(op_undeploy)
add_process_param(op_undeploy)
op_capabilities = WeaverArgumentParser(
"capabilities",
description="List available processes.",
)
set_parser_sections(op_capabilities)
add_url_param(op_capabilities)
op_describe = WeaverArgumentParser(
"describe",
description="Obtain an existing process description.",
)
set_parser_sections(op_describe)
add_url_param(op_describe)
add_process_param(op_describe)
op_describe.add_argument(
"-S", "--schema", dest="schema", choices=ProcessSchema.values(), default=ProcessSchema.OGC,
help="Representation schema of the returned process description."
)
op_execute = WeaverArgumentParser(
"execute",
description="Submit a job execution for an existing process.",
formatter_class=InputsFormatter,
)
set_parser_sections(op_execute)
add_url_param(op_execute)
add_process_param(op_execute)
op_execute.add_argument(
"-I", "--inputs", dest="inputs",
required=True, nargs=1, action="append", # collect max 1 item per '-I', but allow many '-I'
# note: below is formatted using 'InputsFormatter' with detected paragraphs
help=inspect.cleandoc("""
Literal input definitions, or a file path or URL reference to JSON or YAML
contents defining job inputs with OGC-API or CWL schema. This parameter is required.
To provide inputs using a file reference, refer to relevant CWL Job schema or API request schema
for selected format. Both mapping and listing formats are supported.
To execute a process without any inputs (e.g.: using its defaults),
supply an explicit empty input (i.e.: ``-I ""`` or loaded from file as ``{}``).
To provide inputs using literal command-line definitions, inputs should be specified using ``<id>=<value>``
convention, with distinct ``-I`` options for each applicable input value.
Values that require other type than string to be converted for job submission can include the type
following the ID using a colon separator (i.e.: ``<id>:<type>=<value>``). For example, an integer could be
specified as follows: ``number:int=1`` while a floating point number would be: ``number:float=1.23``.
File references (``href``) should be specified using ``File`` as the type (i.e.: ``input:File=http://...``).
Note that ``File`` in this case is expected to be an URL location where the file can be download from.
When a local file is supplied, Weaver will automatically convert it to a remote Vault File in order to
upload it and make it available for the remote process.
Array input (``maxOccurs > 1``) should be specified using semicolon (;) separated values.
The type of an item of this array can also be provided (i.e.: ``array:int=1;2;3``).
Example: ``-I message='Hello Weaver' -I value:int=1234``
""")
)
# FIXME: support sync (https://github.com/crim-ca/weaver/issues/247)
# op_execute.add_argument(
# "-A", "--async", dest="execute_async",
# help=""
# )
op_execute.add_argument(
"-M", "--monitor", dest="monitor", action="store_true",
help="Automatically perform the monitoring operation following job submission to retrieve final results. "
"If not requested, the created job status location is directly returned."
)
add_timeout_param(op_execute)
op_dismiss = WeaverArgumentParser(
"dismiss",
description="Dismiss a pending or running job, or wipe any finished job results.",
)
set_parser_sections(op_dismiss)
add_url_param(op_dismiss, required=False)
add_job_ref_param(op_dismiss)
op_monitor = WeaverArgumentParser(
"monitor",
description="Monitor a pending or running job execution until completion or up to a maximum wait time."
)
add_url_param(op_monitor, required=False)
add_job_ref_param(op_monitor)
add_timeout_param(op_monitor)
op_status = WeaverArgumentParser(
"status",
description=(
"Obtain the status of a job using a reference UUID or URL. "
"This is equivalent to doing a single-shot 'monitor' operation without any pooling or retries."
),
)
set_parser_sections(op_status)
add_url_param(op_status, required=False)
add_job_ref_param(op_status)
op_results = WeaverArgumentParser(
"results",
description=(
"Obtain the output results description of a job. "
"This operation can also download them from the remote server if requested."
),
)
set_parser_sections(op_results)
add_url_param(op_results, required=False)
add_job_ref_param(op_results)
op_results.add_argument(
"-D", "--download", dest="download", action="store_true",
help="Download all found job results file references to output location. "
"If not requested, the operation simply displays the job results (default: %(default)s)."
)
op_results.add_argument(
"-O", "--outdir", dest="out_dir",
help="Output directory where to store downloaded files from job results if requested "
"(default: ${CURDIR}/{JobID}/<outputs.files>)."
)
op_upload = WeaverArgumentParser(
"upload",
description=(
"Upload a local file to the remote server vault for reference in process execution inputs. "
"This operation is accomplished automatically for all execution inputs submitted using local files. "
"[note: feature only available for Weaver instances]"
),
)
set_parser_sections(op_upload)
add_url_param(op_upload, required=True)
op_upload.add_argument(
"-c", "--content-type", dest="content_type",
help="Content-Type of the file to apply. "
"This should be an IANA Media-Type, optionally with additional parameters such as charset. "
"If not provided, attempts to guess it based on the file extension."
)
op_upload.add_argument(
"-f", "--file", dest="file_path", metavar="FILE", required=True,
help="Local file path to upload to the vault."
)
operations = [
op_deploy,
op_undeploy,
op_capabilities,
op_describe,
op_execute,
op_monitor,
op_dismiss,
op_status,
op_results,
op_upload,
]
aliases = {
"processes": op_capabilities
}
for op_parser in operations:
op_aliases = [alias for alias, op_alias in aliases.items() if op_alias is op_parser]
# add help disabled otherwise conflicts with main parser help
sub_op_parser = ops_parsers.add_parser(
op_parser.prog, aliases=op_aliases, parents=[log_parser, op_parser],
add_help=False, help=op_parser.description,
formatter_class=op_parser.formatter_class,
description=op_parser.description, usage=op_parser.usage
)
set_parser_sections(sub_op_parser)
return parser
[docs]def main(*args):
# type: (Any) -> int
parser = make_parser()
ns = parser.parse_args(args=args or None)
setup_logger_from_options(LOGGER, ns)
kwargs = vars(ns)
# remove logging params not known by operations
for param in ["stdout", "log", "log_level", "quiet", "debug", "verbose"]:
kwargs.pop(param, None)
oper = kwargs.pop("operation", None)
LOGGER.debug("Requested operation: [%s]", oper)
if not oper or oper not in dir(WeaverClient):
parser.print_help()
return 0
url = kwargs.pop("url", None)
client = WeaverClient(url)
result = getattr(client, oper)(**kwargs)
if result.success:
LOGGER.info("%s successful. %s", oper.title(), result.message)
print(result.text) # use print in case logger disabled or level error/warn
return 0
LOGGER.error("%s failed. %s", oper.title(), result.message)
print(result.text)
return -1
if __name__ == "__main__":
sys.exit(main())