import abc
import argparse
import base64
import copy
import inspect
import logging
import os
import re
import sys
import time
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import yaml
from requests.auth import AuthBase, HTTPBasicAuth
from requests.structures import CaseInsensitiveDict
from webob.headers import ResponseHeaders
from yaml.scanner import ScannerError
from weaver import __meta__
from weaver.datatype import AutoBase
from weaver.exceptions import PackageRegistrationError
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType, OutputFormat, get_content_type, get_format
from weaver.processes.constants import ProcessSchema
from weaver.processes.convert import (
convert_input_values_schema,
cwl2json_input_values,
get_field,
repr2json_input_values
)
from weaver.processes.utils import get_process_information
from weaver.processes.wps_package import get_process_definition
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.utils import (
fetch_file,
fully_qualified_name,
get_any_id,
get_any_value,
get_file_headers,
get_header,
import_target,
load_file,
null,
parse_kvp,
request_extra,
setup_loggers
)
from weaver.visibility import Visibility
from weaver.wps_restapi import swagger_definitions as sd
if TYPE_CHECKING:
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union
from requests import Response
# avoid failing sphinx-argparse documentation
# https://github.com/ashb/sphinx-argparse/issues/7
try:
from weaver.typedefs import (
AnyHeadersContainer,
AnyRequestMethod,
AnyRequestType,
AnyResponseType,
CWL,
JSON,
ExecutionInputsMap,
ExecutionResults,
HeadersType
)
except ImportError:
# avoid linter issue
AnyHeadersContainer = AnyRequestType = AnyResponseType = Any
CWL = JSON = ExecutionInputsMap = ExecutionResults = HeadersType = Any
try:
from weaver.formats import AnyOutputFormat
from weaver.processes.constants import ProcessSchemaType
from weaver.status import StatusType
except ImportError:
AnyOutputFormat = str
ProcessSchemaType = str
StatusType = str
[docs]LOGGER = logging.getLogger(__name__)
[docs]OPERATION_ARGS_TITLE = "Operation Arguments"
[docs]OPTIONAL_ARGS_TITLE = "Optional Arguments"
[docs]REQUIRED_ARGS_TITLE = "Required Arguments"
[docs]class OperationResult(AutoBase):
"""
Data container for any :class:`WeaverClient` operation results.
:param success: Success status of the operation.
:param message: Detail extracted from response content if available.
:param headers: Headers returned by the response for reference.
:param body: Content of :term:`JSON` response or fallback in plain text.
:param text: Pre-formatted text representation of :paramref:`body`.
"""
[docs] success = False # type: Optional[bool]
[docs] message = "" # type: Optional[str]
[docs] body = {} # type: Optional[Union[JSON, str]]
[docs] code = None # type: Optional[int]
def __init__(self,
success=None, # type: Optional[bool]
message=None, # type: Optional[str]
body=None, # type: Optional[Union[str, JSON]]
headers=None, # type: Optional[AnyHeadersContainer]
text=None, # type: Optional[str]
code=None, # type: Optional[int]
**kwargs, # type: Any
): # type: (...) -> None
super(OperationResult, self).__init__(**kwargs)
self.success = success
self.message = message
self.headers = ResponseHeaders(headers) if headers is not None else None
self.body = body
self.text = text
self.code = code
def __repr__(self):
# type: () -> str
params = ["success", "code", "message"]
quotes = [False, False, True]
quoted = lambda q, v: f"\"{v}\"" if q and v is not None else v # noqa: E731
values = ", ".join([f"{param}={quoted(quote, getattr(self, param))}" for quote, param in zip(quotes, params)])
return f"{type(self).__name__}({values})\n{self.text}"
@property
[docs] def text(self):
# type: () -> str
text = dict.get(self, "text", None)
if not text and self.body:
text = OutputFormat.convert(self.body, OutputFormat.JSON_STR)
self["text"] = text
return text
@text.setter
def text(self, text):
# type: (str) -> None
self["text"] = text
[docs] def links(self, header_names=None):
# type: (Optional[List[str]]) -> ResponseHeaders
"""
Obtain HTTP headers sorted in the result that corresponds to any link reference.
:param header_names:
Limit link names to be considered.
By default, considered headers are ``Link``, ``Content-Location`` and ``Location``.
"""
if not self.headers:
return ResponseHeaders([])
if not isinstance(self.headers, ResponseHeaders):
self.headers = ResponseHeaders(self.headers)
if not header_names:
header_names = ["Link", "Content-Location", "Location"]
header_names = [hdr.lower() for hdr in header_names]
link_headers = ResponseHeaders()
for hdr_n, hdr_v in self.headers.items():
if hdr_n.lower() in header_names:
link_headers.add(hdr_n, hdr_v)
return link_headers
[docs]class AuthHandler(AuthBase):
[docs] url = None # type: Optional[str]
[docs] method = "GET" # type: AnyRequestMethod
[docs] headers = {} # type: Optional[AnyHeadersContainer]
[docs] identity = None # type: Optional[str]
[docs] password = None # type: Optional[str] # nosec
def __init__(self, identity=None, password=None, url=None, method="GET", headers=None):
# type: (Optional[str], Optional[str], Optional[str], AnyRequestMethod, Optional[AnyHeadersContainer]) -> None
if identity is not None:
self.identity = identity
if password is not None:
self.password = password
if url is not None:
self.url = url
if method is not None:
self.method = method
if headers:
self.headers = headers
@abc.abstractmethod
def __call__(self, request):
# type: (AnyRequestType) -> AnyRequestType
"""
Operation that performs inline authentication retrieval prior to sending the request.
"""
raise NotImplementedError
[docs]class BasicAuthHandler(AuthHandler, HTTPBasicAuth):
"""
Adds the ``Authorization`` header formed from basic authentication encoding of username and password to the request.
Authentication URL and method are not needed for this handler.
"""
def __init__(self, username, password, **kwargs):
# type: (str, str, Any) -> None
AuthHandler.__init__(self, identity=username, password=password, **kwargs)
HTTPBasicAuth.__init__(self, username=username, password=password)
@property
[docs] def username(self):
# type: () -> str
return self.identity
@username.setter
def username(self, username):
# type: (str) -> None
self.identity = username
def __call__(self, request):
# type: (AnyRequestType) -> AnyRequestType
return HTTPBasicAuth.__call__(self, request)
[docs]class RequestAuthHandler(AuthHandler, HTTPBasicAuth):
"""
Base class to send a request in order to retrieve an authorization token.
"""
@property
[docs] def auth_token_name(self):
# type: () -> str
"""
Override token name to retrieve in response authentication handler implementation.
Default looks amongst common names: [auth, access_token, token]
"""
return ""
@abc.abstractmethod
[docs] def auth_header(self, token):
# type: (str) -> AnyHeadersContainer
"""
Obtain the header definition with the provided authorization token.
"""
raise NotImplementedError
[docs] def request_auth(self):
# type: () -> Optional[str]
"""
Performs a request using authentication parameters to retrieve the authorization token.
"""
auth_headers = {"Accept": ContentType.APP_JSON}
auth_headers.update(self.headers)
resp = request_extra(self.method, self.url, headers=auth_headers)
if resp.status_code != 200:
return None
return self.response_parser(resp)
[docs] def response_parser(self, response):
# type: (Response) -> Optional[str]
"""
Parses a valid authentication response to extract the received authorization token.
"""
ctype = get_header("Content-Type", response.headers)
auth = None
if ContentType.APP_JSON in ctype:
body = response.json()
if self.auth_token_name:
auth = body.get(self.auth_token_name)
else:
auth = body.get("auth") or body.get("access_token") or body.get("token")
return auth
def __call__(self, request):
# type: (AnyRequestType) -> AnyRequestType
auth_token = self.request_auth()
if not auth_token:
LOGGER.warning("Expected authorization token could not be retrieved from: [%s] in [%s]",
self.url, fully_qualified_name(self))
else:
auth_header = self.auth_header(auth_token)
request.headers.update(auth_header)
return request
[docs]class BearerAuthHandler(RequestAuthHandler):
"""
Adds the ``Authorization`` header formed of the authentication bearer token from the underlying request.
"""
[docs] def auth_header(self, token):
# type: (str) -> AnyHeadersContainer
return {"Authorization": f"Bearer {token}"}
[docs]class CookieAuthHandler(RequestAuthHandler):
"""
Adds the ``Authorization`` header formed of the authentication bearer token from the underlying request.
"""
[docs] def auth_header(self, token):
# type: (str) -> AnyHeadersContainer
return {"Cookie": token}
[docs]class WeaverClient(object):
"""
Client that handles common HTTP requests with a `Weaver` or similar :term:`OGC API - Processes` instance.
"""
# default configuration parameters, overridable by corresponding method parameters
[docs] monitor_timeout = 60 # maximum delay to wait for job completion
[docs] monitor_interval = 5 # interval between monitor pooling job status requests
[docs] auth = None # type: AuthHandler
def __init__(self, url=None, auth=None):
# type: (Optional[str], Optional[AuthHandler]) -> None
"""
Initialize the client with predefined parameters.
:param url: Instance URL to employ for each method call. Must be provided each time if not defined here.
:param auth:
Instance authentication handler that will be applied for every request.
For specific authentication method on per-request basis, parameter should be provided to respective methods.
Should perform required adjustments to request to allow access control of protected contents.
"""
if url:
self._url = self._parse_url(url)
LOGGER.debug("Using URL: [%s]", self._url)
else:
self._url = None
LOGGER.warning("No URL provided. All operations must provide it directly or through another parameter!")
self.auth = auth
self._headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON}
self._settings = {
"weaver.request_options": {}
} # FIXME: load from INI, overrides as input (cumul arg '--setting weaver.x=value') ?
[docs] def _request(self,
method, # type: AnyRequestMethod
url, # type: str
*args, # type: Any
headers=None, # type: Optional[AnyHeadersContainer]
x_headers=None, # type: Optional[AnyHeadersContainer]
**kwargs # type: Any
): # type: (...) -> AnyResponseType
if self.auth is not None and kwargs.get("auth") is None:
kwargs["auth"] = self.auth
if not headers and x_headers:
headers = x_headers
elif headers:
headers = CaseInsensitiveDict(headers)
x_headers = CaseInsensitiveDict(x_headers)
headers.update(x_headers)
return request_extra(method, url, *args, headers=headers, **kwargs)
[docs] def _get_url(self, url):
# type: (Optional[str]) -> str
if not self._url and not url:
raise ValueError("No URL available. Client was not created with an URL and operation did not receive one.")
return self._url or self._parse_url(url)
@staticmethod
[docs] def _parse_url(url):
parsed = urlparse("http://" + url if not url.startswith("http") else url)
parsed_netloc_path = f"{parsed.netloc}{parsed.path}".replace("//", "/")
parsed_url = f"{parsed.scheme}://{parsed_netloc_path}"
return parsed_url.rsplit("/", 1)[0] if parsed_url.endswith("/") else parsed_url
@staticmethod
[docs] def _parse_result(response, # type: Union[Response, OperationResult]
body=None, # type: Optional[JSON] # override response body
message=None, # type: Optional[str] # override message/description in contents
success=None, # type: Optional[bool] # override resolved success
with_headers=False, # type: bool
with_links=True, # type: bool
nested_links=None, # type: Optional[str]
output_format=None, # type: Optional[AnyOutputFormat]
content_type=None, # type: Optional[ContentType]
): # type: (...) -> OperationResult
# multi-header of same name, for example to support many Link
headers = ResponseHeaders(response.headers)
code = getattr(response, "status_code", None) or getattr(response, "code", None)
_success = False
try:
msg = None
ctype = headers.get("Content-Type", content_type)
content = getattr(response, "content", None) or getattr(response, "body", None)
text = None
if not body and content and ctype and ContentType.APP_JSON in ctype and hasattr(response, "json"):
body = response.json()
elif isinstance(response, OperationResult):
body = response.body
# Don't set text if no-content, since used by jobs header-only response. Explicit null will replace it.
elif response.text and not body:
msg = "Could not parse body."
text = response.text
if isinstance(body, dict):
if not with_links:
if nested_links:
nested = body.get(nested_links, [])
if isinstance(nested, list):
for item in nested:
if isinstance(item, dict):
item.pop("links", None)
body.pop("links", None)
msg = body.get("description", body.get("message", "undefined"))
if code >= 400:
if not msg and isinstance(body, dict):
msg = body.get("error", body.get("exception", "unknown"))
else:
_success = True
msg = message or getattr(response, "message", None) or msg or "undefined"
text = text or OutputFormat.convert(body, output_format or OutputFormat.JSON_STR, item_root="result")
except Exception as exc: # noqa # pragma: no cover # ignore safeguard against error in implementation
msg = "Could not parse body."
text = body = response.text
LOGGER.warning(msg, exc_info=exc)
if with_headers:
# convert potential multi-equal-key headers into a JSON/YAML serializable format
hdr_l = [{hdr_name: hdr_val} for hdr_name, hdr_val in sorted(headers.items())]
hdr_s = OutputFormat.convert({"Headers": hdr_l}, OutputFormat.YAML)
text = f"{hdr_s}---\n{text}"
if success is not None:
_success = success
return OperationResult(_success, msg, body, headers, text=text, code=code)
@staticmethod
[docs] def _parse_deploy_body(body, process_id):
# type: (Optional[Union[JSON, str]], Optional[str]) -> OperationResult
data = {} # type: JSON
try:
if body:
if isinstance(body, str) and (body.startswith("http") or os.path.isfile(body)):
data = load_file(body)
elif isinstance(body, str) and body.startswith("{") and body.endswith("}"):
data = yaml.safe_load(body)
elif isinstance(body, dict):
data = body
else:
msg = "Cannot load badly formed body. Deploy JSON object or file reference expected."
return OperationResult(False, msg, body, {})
elif not body:
data = {
"processDescription": {
"process": {"id": process_id}
}
}
desc = data.get("processDescription", {})
if data and process_id:
LOGGER.debug("Override provided process ID [%s] into provided/loaded body.", process_id)
desc = data.get("processDescription", {}).get("process", {}) or data.get("processDescription", {})
desc["id"] = process_id
data.setdefault("processDescription", desc) # already applied if description was found/updated at any level
desc["visibility"] = Visibility.PUBLIC
except (ValueError, TypeError, ScannerError) as exc: # pragma: no cover
return OperationResult(False, f"Failed resolution of body definition: [{exc!s}]", body)
return OperationResult(True, "", data)
@staticmethod
[docs] def _parse_deploy_package(body, cwl, wps, process_id, headers):
# type: (JSON, Optional[CWL], Optional[str], Optional[str], HeadersType) -> OperationResult
try:
p_desc = get_process_information(body)
p_id = get_any_id(p_desc, default=process_id)
info = {"id": p_id} # minimum requirement for process offering validation
if (isinstance(cwl, str) and not cwl.startswith("{")) or isinstance(wps, str):
LOGGER.debug("Override loaded CWL into provided/loaded body for process: [%s]", p_id)
proc = get_process_definition(info, reference=cwl or wps, headers=headers) # validate
body["executionUnit"] = [{"unit": proc["package"]}]
elif isinstance(cwl, str) and cwl.startswith("{") and cwl.endswith("}"):
LOGGER.debug("Override parsed CWL into provided/loaded body for process: [%s]", p_id)
pkg = yaml.safe_load(cwl)
if not isinstance(pkg, dict) or pkg.get("cwlVersion") is None:
raise PackageRegistrationError("Failed parsing or invalid CWL from expected literal JSON string.")
proc = get_process_definition(info, package=pkg, headers=headers) # validate
body["executionUnit"] = [{"unit": proc["package"]}]
elif isinstance(cwl, dict):
LOGGER.debug("Override provided CWL into provided/loaded body for process: [%s]", p_id)
get_process_definition(info, package=cwl, headers=headers) # validate
body["executionUnit"] = [{"unit": cwl}]
except (PackageRegistrationError, ScannerError) as exc: # pragma: no cover
message = f"Failed resolution of package definition: [{exc!s}]"
return OperationResult(False, message, cwl)
return OperationResult(True, p_id, body)
[docs] def _parse_job_ref(self, job_reference, url=None):
# type: (str, Optional[str]) -> Tuple[Optional[str], Optional[str]]
if job_reference.startswith("http"):
job_url = job_reference
job_parts = [part for part in job_url.split("/") if part.strip()]
job_id = job_parts[-1]
else:
url = self._get_url(url)
job_id = job_reference
job_url = f"{url}/jobs/{job_id}"
return job_id, job_url
@staticmethod
[docs] def _parse_auth_token(token, username, password):
# type: (Optional[str], Optional[str], Optional[str]) -> HeadersType
if token or (username and password):
if not token:
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
return {sd.XAuthDockerHeader.name: f"Basic {token}"}
return {}
[docs] def deploy(self,
process_id=None, # type: Optional[str]
body=None, # type: Optional[Union[JSON, str]]
cwl=None, # type: Optional[Union[CWL, str]]
wps=None, # type: Optional[str]
token=None, # type: Optional[str]
username=None, # type: Optional[str]
password=None, # type: Optional[str]
undeploy=False, # type: bool
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Deploy a new :term:`Process` with specified metadata and reference to an :term:`Application Package`.
The referenced :term:`Application Package` must be one of:
- :term:`CWL` body, local file or URL in :term:`JSON` or :term:`YAML` format
- :term:`WPS` process URL with :term:`XML` response
- :term:`WPS-REST` process URL with :term:`JSON` response
- :term:`OGC API - Processes` process URL with :term:`JSON` response
If the reference is resolved to be a :term:`Workflow`, all its underlying :term:`Process` steps must be
available under the same URL that this client was initialized with.
.. seealso::
:ref:`proc_op_deploy`
:param process_id:
Desired process identifier.
Can be omitted if already provided in body contents or file.
:param body:
Literal :term:`JSON` contents, either using string representation of actual Python objects forming the
request body, or file path/URL to :term:`YAML` or :term:`JSON` contents of the request body.
Other parameters (:paramref:`process_id`, :paramref:`cwl`) can override corresponding fields within the
provided body.
:param cwl:
Literal :term:`JSON` or :term:`YAML` contents, either using string representation of actual Python objects,
or file path/URL with contents of the :term:`CWL` definition of the :term:`Application package` to be
inserted into the body.
:param wps: URL to an existing :term:`WPS` process (WPS-1/2 or WPS-REST/OGC-API).
:param token: Authentication token for accessing private Docker registry if :term:`CWL` refers to such image.
:param username: Username to form the authentication token to a private Docker registry.
:param password: Password to form the authentication token to a private Docker registry.
:param undeploy: Perform undeploy as necessary before deployment to avoid conflict with exiting :term:`Process`.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
result = self._parse_deploy_body(body, process_id)
if not result.success:
return result
req_headers = copy.deepcopy(self._headers)
req_headers.update(self._parse_auth_token(token, username, password))
data = result.body
result = self._parse_deploy_package(data, cwl, wps, process_id, req_headers)
if not result.success:
return result
p_id = result.message
data = result.body
base = self._get_url(url)
if undeploy:
LOGGER.debug("Performing requested undeploy of process: [%s]", p_id)
result = self.undeploy(process_id=p_id, url=base)
if result.code not in [200, 404]:
return OperationResult(False, "Failed requested undeployment prior deployment.",
body=result.body, text=result.text, code=result.code, headers=result.headers)
LOGGER.debug("Deployment Body:\n%s", OutputFormat.convert(data, OutputFormat.JSON_STR))
path = f"{base}/processes"
resp = self._request("POST", path, json=data, headers=req_headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs] def undeploy(self,
process_id, # type: str
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Undeploy an existing :term:`Process`.
:param process_id: Identifier of the process to undeploy.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
base = self._get_url(url)
path = f"{base}/processes/{process_id}"
resp = self._request("DELETE", path, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs] def capabilities(self,
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
List all available :term:`Process` on the instance.
.. seealso::
:ref:`proc_op_getcap`
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
base = self._get_url(url)
path = f"{base}/processes"
query = {"detail": False} # not supported by non-Weaver, but save the work if possible
resp = self._request("GET", path, params=query, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
result = self._parse_result(resp)
if not result.success:
return result
body = resp.json()
processes = body.get("processes")
if isinstance(processes, list) and all(isinstance(proc, dict) for proc in processes):
body = [get_any_id(proc) for proc in processes]
return self._parse_result(resp, body=body, output_format=output_format,
with_links=with_links, with_headers=with_headers)
[docs] processes = capabilities # alias
"""
Alias of :meth:`capabilities` for :term:`Process` listing.
"""
[docs] def describe(self,
process_id, # type: str
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
schema=ProcessSchema.OGC, # type: Optional[ProcessSchemaType]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Describe the specified :term:`Process`.
.. seealso::
:ref:`proc_op_describe`
:param process_id: Identifier of the process to describe.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param schema: Representation schema of the returned process description.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
base = self._get_url(url)
path = f"{base}/processes/{process_id}"
query = None
schema = ProcessSchema.get(schema)
if schema:
query = {"schema": schema}
resp = self._request("GET", path, params=query, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
# API response from this request can contain 'description' matching the process description
# rather than a generic response 'description'. Enforce the provided message to avoid confusion.
return self._parse_result(resp, message="Retrieving process description.", output_format=output_format,
with_links=with_links, with_headers=with_headers)
@staticmethod
[docs] def _update_files(self, inputs, url=None):
# type: (ExecutionInputsMap, Optional[str]) -> Union[Tuple[ExecutionInputsMap, HeadersType], OperationResult]
"""
Replaces local file paths by references uploaded to the :term:`Vault`.
.. seealso::
- Headers dictionary limitation by :mod:`requests`:
https://requests.readthedocs.io/en/master/user/quickstart/#response-content
- Headers formatting with multiple values must be provided by comma-separated values
(:rfc:`7230#section-3.2.2`).
- Multi Vault-Token parsing accomplished by :func:`weaver.vault.utils.parse_vault_token`.
- More details about formats and operations related to :term:`Vault` are provided
in :ref:`file_vault_token` and :ref:`vault` chapters.
:param inputs: Input values for submission of :term:`Process` execution.
:return: Updated inputs or the result of a failing intermediate request.
"""
auth_tokens = {} # type: Dict[str, str]
update_inputs = dict(inputs)
for input_id, input_data in dict(inputs).items():
if not isinstance(input_data, list): # support array of files
input_data = [input_data]
for data in input_data:
if not isinstance(data, dict):
continue
file = href = get_any_value(data, default=null, data=False)
if not isinstance(href, str):
continue
if href.startswith("file://"):
href = href[7:]
if "://" not in href and not os.path.isfile(href):
LOGGER.warning(
"Ignoring potential local file reference since it does not exist. "
"Cannot upload to vault: [%s]", file
)
continue
fmt = data.get("format", {})
ctype = get_field(fmt, "mime_type", search_variations=True)
if not ctype:
ext = os.path.splitext(href)[-1]
ctype = get_content_type(ext)
fmt = get_format(ctype, default=ContentType.TEXT_PLAIN)
res = self.upload(href, content_type=fmt.mime_type, url=url)
if res.code != 200:
return res
vault_href = res.body["file_href"]
vault_id = res.body["file_id"]
token = res.body["access_token"]
auth_tokens[vault_id] = token
LOGGER.info("Converted (input: %s) [%s] -> [%s]", input_id, file, vault_href)
update_inputs[input_id] = {"href": vault_href, "format": {"mediaType": ctype}}
auth_headers = {}
if auth_tokens:
multi_tokens = ",".join([
f"token {token}; id={input_id}"
for input_id, token in auth_tokens.items()
])
auth_headers = {sd.XAuthVaultFileHeader.name: multi_tokens}
return update_inputs, auth_headers
[docs] def execute(self,
process_id, # type: str
inputs=None, # type: Optional[Union[str, JSON]]
monitor=False, # type: bool
timeout=None, # type: Optional[int]
interval=None, # type: Optional[int]
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
output_refs=None, # type: Optional[Iterable[str]]
): # type: (...) -> OperationResult
"""
Execute a :term:`Job` for the specified :term:`Process` with provided inputs.
When submitting inputs with :term:`OGC API - Processes` schema, top-level ``inputs`` key is expected.
Under it, either the mapping (key-value) or listing (id,value) representation are accepted.
If ``inputs`` is not found, the alternative :term:`CWL` will be assumed.
When submitting inputs with :term:`CWL` *job* schema, plain key-value(s) pairs are expected.
All values should be provided directly under the key (including arrays), except for ``File``
type that must include the ``class`` and ``path`` details.
.. seealso::
:ref:`proc_op_execute`
.. note::
Execution requests are always accomplished asynchronously. To obtain the final :term:`Job` status as if
they were executed synchronously, provide the :paramref:`monitor` argument. This offers more flexibility
over servers that could decide to ignore sync/async preferences, and avoids closing/timeout connection
errors that could occur for long running processes, since status is pooled iteratively rather than waiting.
:param process_id: Identifier of the process to execute.
:param inputs:
Literal :term:`JSON` or :term:`YAML` contents of the inputs submitted and inserted into the execution body,
using either the :term:`OGC API - Processes` or :term:`CWL` format, or a file path/URL referring to them.
:param monitor:
Automatically perform :term:`Job` execution monitoring until completion or timeout to obtain final results.
If requested, this operation will become blocking until either the completed status or timeout is reached.
:param timeout:
Monitoring timeout (seconds) if requested.
:param interval:
Monitoring interval (seconds) between job status polling requests.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:param output_refs:
Indicates which outputs by ID to be returned as HTTP Link header reference instead of body content value.
With reference transmission mode, outputs that contain literal data will be linked by ``text/plain`` file
containing the data. outputs that refer to a file reference will simply contain that URL reference as link.
With value transmission mode (default behavior when outputs are not specified in this list), outputs are
returned as direct values (literal or href) within the response content body.
:returns: Results of the operation.
"""
base = self._get_url(url) # raise before inputs parsing if not available
if isinstance(inputs, list) and all(isinstance(item, list) for item in inputs):
inputs = [items for sub in inputs for items in sub] # flatten 2D->1D list
values = self._parse_inputs(inputs)
if isinstance(values, OperationResult):
return values
result = self._update_files(values, url=base)
if isinstance(result, OperationResult):
return result
values, auth_headers = result
data = {
# NOTE: Backward compatibility for servers that only know ``mode`` and don't handle ``Prefer`` header.
"mode": ExecuteMode.ASYNC,
"inputs": values,
"response": ExecuteResponse.DOCUMENT,
# FIXME: allow filtering 'outputs' (https://github.com/crim-ca/weaver/issues/380)
"outputs": {}
}
# omit x-headers on purpose for 'describe', assume they are intended for 'execute' operation only
result = self.describe(process_id, url=base, auth=auth)
if not result.success:
return OperationResult(False, "Could not obtain process description for execution.",
body=result.body, headers=result.headers, code=result.code, text=result.text)
outputs = result.body.get("outputs")
output_refs = set(output_refs or [])
for output_id in outputs:
if output_id in output_refs:
# If any 'reference' is requested explicitly, must switch to 'response=raw'
# since 'response=document' ignores 'transmissionMode' definitions.
data["response"] = ExecuteResponse.RAW
# Use 'value' to have all outputs reported in body as 'value/href' rather than 'Link' headers.
out_mode = ExecuteTransmissionMode.REFERENCE
else:
# make sure to set value to outputs not requested as reference in case another one needs reference
# mode doesn't matter if no output by reference requested since 'response=document' would be used
out_mode = ExecuteTransmissionMode.VALUE
data["outputs"][output_id] = {"transmissionMode": out_mode}
LOGGER.info("Executing [%s] with inputs:\n%s", process_id, OutputFormat.convert(values, OutputFormat.JSON_STR))
path = f"{base}/processes/{process_id}/execution" # use OGC-API compliant endpoint (not '/jobs')
exec_headers = {"Prefer": "respond-async"} # for more recent servers, OGC-API compliant async request
exec_headers.update(self._headers)
exec_headers.update(auth_headers)
resp = self._request("POST", path, json=data, headers=exec_headers, x_headers=headers,
settings=self._settings, auth=auth)
result = self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
if not monitor or not result.success:
return result
# although Weaver returns "jobID" in the body for convenience,
# employ the "Location" header to be OGC-API compliant
job_url = resp.headers.get("Location", "")
return self.monitor(job_url, timeout=timeout, interval=interval, auth=auth, # omit x-headers on purpose
with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs] def upload(self,
file_path, # type: str
content_type=None, # type: Optional[str]
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Upload a local file to the :term:`Vault`.
.. note::
Feature only available for `Weaver` instances. Not available for standard :term:`OGC API - Processes`.
.. seealso::
More details about formats and operations related to :term:`Vault` are provided
in :ref:`file_vault_token` and :ref:`vault` chapters.
:param file_path: Location of the file to be uploaded.
:param content_type:
Explicit Content-Type of the file.
This should be an IANA Media-Type, optionally with additional parameters such as charset.
If not provided, attempts to guess it based on the file extension.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Results of the operation.
"""
if not isinstance(file_path, str):
file_type = fully_qualified_name(file_path)
return OperationResult(False, "Local file reference is not a string.", {"file_path": file_type})
if file_path.startswith("file://"):
file_path = file_path[7:]
if "://" in file_path:
scheme = file_path.split("://", 1)[0]
return OperationResult(False, "Scheme not supported for local file reference.", {"file_scheme": scheme})
file_path = os.path.abspath(os.path.expanduser(file_path))
if not os.path.isfile(file_path):
return OperationResult(False, "Resolved local file reference does not exist.", {"file_path": file_path})
LOGGER.debug("Processing file for vault upload: [%s]", file_path)
file_headers = get_file_headers(file_path, content_headers=True, content_type=content_type)
base = self._get_url(url)
path = f"{base}/vault"
files = {
"file": (
os.path.basename(file_path),
open(file_path, "r", encoding="utf-8"), # pylint: disable=R1732
file_headers["Content-Type"]
)
}
req_headers = {
"Accept": ContentType.APP_JSON, # no 'Content-Type' since auto generated with multipart boundary
"Cache-Control": "no-cache", # ensure the cache is not used to return a previously uploaded file
}
# allow retry to avoid some sporadic HTTP 403 errors
resp = self._request("POST", path, files=files, retry=2, headers=req_headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs] def jobs(self,
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
page=None, # type: Optional[int]
limit=None, # type: Optional[int]
status=None, # type: Optional[StatusType]
detail=False, # type: bool
groups=False, # type: bool
): # type: (...) -> OperationResult
"""
Obtain a listing of :term:`Job`.
.. seealso::
:ref:`proc_op_status`
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:param page: Paging index to list jobs.
:param limit: Amount of jobs to list per page.
:param status: Filter job listing only to matching status.
:param detail: Obtain detailed job descriptions.
:param groups: Obtain grouped representation of jobs per provider and process categories.
:returns: Retrieved status of the job.
"""
base_url = self._get_url(url)
jobs_url = f"{base_url}/jobs" if not base_url.endswith("/jobs") else base_url
LOGGER.info("Getting job listing: [%s]", jobs_url)
query = {}
if isinstance(page, int) and page > 0:
query["page"] = page
if isinstance(limit, int) and limit > 0:
query["limit"] = limit
if isinstance(status, str) and status:
query["status"] = map_status(status)
if isinstance(detail, bool) and detail:
query["detail"] = detail
if isinstance(groups, bool) and groups:
query["groups"] = groups
resp = self._request("GET", jobs_url, params=query, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, output_format=output_format,
nested_links="jobs", with_links=with_links, with_headers=with_headers)
[docs] def status(self,
job_reference, # type: str
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Obtain the status of a :term:`Job`.
.. seealso::
:ref:`proc_op_status`
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Retrieved status of the job.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
LOGGER.info("Getting job status: [%s]", job_id)
resp = self._request("GET", job_url, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs] def monitor(self,
job_reference, # type: str
timeout=None, # type: Optional[int]
interval=None, # type: Optional[int]
wait_for_status=Status.SUCCEEDED, # type: str
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Monitor the execution of a :term:`Job` until completion.
.. seealso::
:ref:`proc_op_monitor`
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param timeout: timeout (seconds) of maximum wait time for monitoring if completion is not reached.
:param interval: wait interval (seconds) between polling monitor requests.
:param wait_for_status: monitor until the requested status is reached (default: job failed or succeeded).
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:return: Result of the successful or failed job, or timeout of monitoring process.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
remain = timeout = timeout or self.monitor_timeout
delta = interval or self.monitor_interval
LOGGER.info("Monitoring job [%s] for %ss at intervals of %ss.", job_id, timeout, delta)
LOGGER.debug("Job URL: [%s]", job_url)
once = True
while remain >= 0 or once:
resp = self._request("GET", job_url, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
if resp.status_code != 200:
return OperationResult(False, "Could not find job with specified reference.", {"job": job_reference})
body = resp.json()
status = body.get("status")
if status == wait_for_status:
msg = f"Requested job status reached [{wait_for_status}]."
return self._parse_result(resp, success=True, message=msg, with_links=with_links,
with_headers=with_headers, output_format=output_format)
if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
msg = "Requested job status not reached, but job has finished."
return self._parse_result(resp, success=False, message=msg, with_links=with_links,
with_headers=with_headers, output_format=output_format)
time.sleep(delta)
remain -= delta
once = False
return OperationResult(False, f"Monitoring timeout reached ({timeout}s). Job did not complete in time.")
[docs] def _download_references(self, outputs, out_links, out_dir, job_id, auth=None):
# type: (ExecutionResults, AnyHeadersContainer, str, str, Optional[AuthHandler]) -> ExecutionResults
"""
Download file references from results response contents and link headers.
Downloaded files extend the results contents with ``path`` and ``source`` fields to indicate where the
retrieved files have been saved and where they came from. When files are found by HTTP header links, they
are added to the output contents to generate a combined representation in the operation result.
"""
if not isinstance(outputs, dict):
# default if links-only needed later (insert as content for printed output)
outputs = {} # type: ExecutionResults
# download file results
if not (any("href" in value for value in outputs.values()) or len(out_links)):
return OperationResult(False, "Outputs were found but none are downloadable (only raw values?).", outputs)
if not out_dir:
out_dir = os.path.join(os.path.realpath(os.path.curdir), job_id)
os.makedirs(out_dir, exist_ok=True)
LOGGER.info("Will store job [%s] output results in [%s]", job_id, out_dir)
# download outputs from body content
LOGGER.debug("%s outputs in results content.", "Processing" if len(outputs) else "No")
for output, value in outputs.items():
is_list = True
if not isinstance(value, list):
value = [value]
is_list = False
for i, item in enumerate(value):
if "href" in item:
file_path = fetch_file(item["href"], out_dir, link=False, auth=auth)
if is_list:
outputs[output][i]["path"] = file_path
outputs[output][i]["source"] = "body"
else:
outputs[output]["path"] = file_path
outputs[output]["source"] = "body"
# download links from headers
LOGGER.debug("%s outputs in results link headers.", "Processing" if len(out_links) else "No")
for _, link_header in ResponseHeaders(out_links).items():
link, params = link_header.split(";", 1)
href = link.strip("<>")
params = parse_kvp(params, multi_value_sep=None, accumulate_keys=False)
ctype = (params.get("type") or [None])[0]
rel = params["rel"][0].split(".")
output = rel[0]
is_array = len(rel) > 1 and str.isnumeric(rel[1])
file_path = fetch_file(href, out_dir, link=False, auth=auth)
value = {"href": href, "type": ctype, "path": file_path, "source": "link"}
if output in outputs:
if isinstance(outputs[output], dict): # in case 'rel="<output>.<index"' was not employed
outputs[output] = [outputs[output], value]
else:
outputs[output].append(value)
else:
outputs[output] = [value] if is_array else value
return outputs
[docs] def results(self,
job_reference, # type: str
out_dir=None, # type: Optional[str]
download=False, # type: bool
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Obtain the results of a successful :term:`Job` execution.
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param out_dir: Output directory where to store downloaded files if requested (default: CURDIR/JobID/<outputs>).
:param download: Download any file reference found within results (CAUTION: could transfer lots of data!).
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Result details and local paths if downloaded.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
# omit x-headers on purpose for 'status', assume they are intended for 'results' operation only
status = self.status(job_url, auth=auth)
if not status.success:
return OperationResult(False, "Cannot process results from incomplete or failed job.", status.body)
# use results endpoint instead of outputs to be OGC-API compliant, should be able to target non-Weaver instance
# with this endpoint, outputs IDs are directly at the root of the body
result_url = f"{job_url}/results"
LOGGER.info("Retrieving results from [%s]", result_url)
resp = self._request("GET", result_url, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
res_out = self._parse_result(resp, output_format=output_format,
with_links=with_links, with_headers=with_headers)
outputs = res_out.body
headers = res_out.headers
out_links = res_out.links(["Link"])
if not res_out.success or not (isinstance(res_out.body, dict) or len(out_links)): # pragma: no cover
return OperationResult(False, "Could not retrieve any output results from job.", outputs, headers)
if not download:
res_out.message = "Listing job results."
return res_out
outputs = self._download_references(outputs, out_links, out_dir, job_id, auth=auth)
# rebuild result with modified outputs that contains downloaded paths
result = OperationResult(True, "Retrieved job results.", outputs, headers, code=200)
return self._parse_result(result, body=outputs, output_format=output_format,
with_links=with_links, with_headers=with_headers, content_type=ContentType.APP_JSON)
[docs] def dismiss(self,
job_reference, # type: str
url=None, # type: Optional[str]
auth=None, # type: Optional[AuthHandler]
headers=None, # type: Optional[AnyHeadersContainer]
with_links=True, # type: bool
with_headers=False, # type: bool
output_format=None, # type: Optional[AnyOutputFormat]
): # type: (...) -> OperationResult
"""
Dismiss pending or running :term:`Job`, or clear result artifacts from a completed :term:`Job`.
:param job_reference: Either the full :term:`Job` status URL or only its UUID.
:param url: Instance URL if not already provided during client creation.
:param auth:
Instance authentication handler if not already created during client creation.
Should perform required adjustments to request to allow access control of protected contents.
:param headers:
Additional headers to employ when sending request.
Note that this can break functionalities if expected headers are overridden. Use with care.
:param with_links: Indicate if ``links`` section should be preserved in returned result body.
:param with_headers: Indicate if response headers should be returned in result output.
:param output_format: Select an alternate output representation of the result body contents.
:returns: Obtained result from the operation.
"""
job_id, job_url = self._parse_job_ref(job_reference, url)
LOGGER.debug("Dismissing job: [%s]", job_id)
resp = self._request("DELETE", job_url, headers=self._headers, x_headers=headers,
settings=self._settings, auth=auth)
return self._parse_result(resp, with_links=with_links, with_headers=with_headers, output_format=output_format)
[docs]def setup_logger_from_options(logger, args): # pragma: no cover
# type: (logging.Logger, argparse.Namespace) -> None
"""
Uses argument parser options to setup logging level from specified flags.
Setup both the specific CLI logger that is provided and the top-level package logger.
"""
if args.log_level:
logger.setLevel(logging.getLevelName(args.log_level.upper()))
elif args.quiet:
logger.setLevel(logging.ERROR)
elif args.verbose:
logger.setLevel(logging.INFO)
elif args.debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
setup_loggers({}, force_stdout=args.stdout)
if logger.name != __meta__.__name__:
setup_logger_from_options(logging.getLogger(__meta__.__name__), args)
[docs]def make_logging_options(parser):
# type: (argparse.ArgumentParser) -> None
"""
Defines argument parser options for logging operations.
"""
log_title = "Logging Arguments"
log_desc = "Options that configure output logging."
log_opts = parser.add_argument_group(title=log_title, description=log_desc)
log_opts.add_argument("--stdout", action="store_true", help="Enforce logging to stdout for display in console.")
log_opts.add_argument("--log", "--log-file", help="Output file to write generated logs.")
lvl_opts = log_opts.add_mutually_exclusive_group()
lvl_opts.title = log_title
lvl_opts.description = log_desc
lvl_opts.add_argument("--quiet", "-q", action="store_true", help="Do not output anything else than error.")
lvl_opts.add_argument("--debug", "-d", action="store_true", help="Enable extra debug logging.")
lvl_opts.add_argument("--verbose", "-v", action="store_true", help="Output informative logging details.")
lvl_names = ["DEBUG", "INFO", "WARN", "ERROR"]
lvl_opts.add_argument("--log-level", "-l", dest="log_level",
choices=list(sorted(lvl_names)), type=str.upper,
help="Explicit log level to employ (default: %(default)s, case-insensitive).")
[docs]def add_url_param(parser, required=True):
# type: (argparse.ArgumentParser, bool) -> None
parser.add_argument("-u", "--url", metavar="URL", help="URL of the instance to run operations.", required=required)
[docs]def add_shared_options(parser):
# type: (argparse.ArgumentParser) -> None
links_grp = parser.add_mutually_exclusive_group()
links_grp.add_argument("-nL", "--no-links", dest="with_links", action="store_false",
help="Remove \"links\" section from returned result body.")
links_grp.add_argument("-wL", "--with-links", dest="with_links", action="store_true", default=True,
help="Preserve \"links\" section from returned result body (default).")
headers_grp = parser.add_mutually_exclusive_group()
headers_grp.add_argument("-nH", "--no-headers", dest="with_headers", action="store_false", default=False,
help="Omit response headers, only returning the result body (default).")
headers_grp.add_argument("-wH", "--with-headers", dest="with_headers", action="store_true",
help="Return response headers additionally to the result body.")
parser.add_argument(
"-H", "--header", action=ValidateHeaderAction, nargs=1, dest="headers", metavar="HEADER",
help=(
"Additional headers to apply for sending requests toward the service. "
"This option can be provided multiple times, each with a value formatted as:\n\n'Header-Name: value'\n\n"
"Header names are case-insensitive. "
"Quotes can be used in the <value> portion to delimit it. "
"Surrounding spaces are trimmed. "
"Note that overridden headers expected by requests and the service could break some functionalities."
)
)
fmt_docs = "\n\n".join([
re.sub(r"\:[a-z]+\:\`([A-Za-z0-9_\-]+)\`", r"\1", f"{getattr(OutputFormat, fmt).upper()}: {doc}") # remove RST
for fmt, doc in sorted(OutputFormat.docs().items()) if doc
])
fmt_choices = [fmt.upper() for fmt in sorted(OutputFormat.values())]
parser.add_argument(
"-F", "--format", choices=fmt_choices, type=str.upper, dest="output_format",
help=(
f"Select an alternative output representation (default: {OutputFormat.JSON_STR.upper()}, case-insensitive)."
f"\n\n{fmt_docs}"
)
)
auth_grp = parser.add_argument_group(
title="Service Authentication Arguments",
description="Parameters to obtain access to a protected service using a request authentication handler."
)
auth_handlers = "".join([
fully_qualified_name(handler) + "\n"
for handler in [BasicAuthHandler, BearerAuthHandler, CookieAuthHandler]
])
auth_grp.add_argument(
"-aC", "--auth-class", "--auth-handler", dest="auth_handler", metavar="AUTH_HANDLER_CLASS",
action=ValidateAuthHandlerAction,
help=(
"Script or module path reference to class implementation to handle inline request authentication. "
"Format [path/to/script.py:module.AuthHandlerClass] or [installed.module.AuthHandlerClass] is expected.\n\n"
f"Utility definitions are available as:\n\n{auth_handlers}\n\n"
"Custom implementations are allowed for more advanced use cases."
)
)
auth_grp.add_argument(
"-aI", "--auth-identity", "--auth-username", dest="auth_identity", metavar="IDENTITY",
help="Authentication identity (or username) to be passed down to the specified Authentication Handler."
)
auth_grp.add_argument(
"-aP", "--auth-password", dest="auth_password", metavar="PASSWORD",
help="Authentication password to be passed down to the specified Authentication Handler."
)
auth_grp.add_argument(
"-aU", "--auth-url",
help="Authentication URL to be passed down to the specified Authentication Handler."
)
auth_grp.add_argument(
"-aM", "--auth-method", dest="auth_method", metavar="HTTP_METHOD",
action=ValidateMethodAction, choices=ValidateMethodAction.methods, type=str.upper,
default=AuthHandler.method,
help=(
"Authentication HTTP request method to be passed down to the specified Authentication Handler "
"(default: %(default)s, case-insensitive)."
)
)
auth_grp.add_argument(
"-aH", "--auth-header", action=ValidateHeaderAction, nargs=1, dest="auth_headers", metavar="HEADER",
help=(
"Additional headers to apply for sending requests when using the authentication handler. "
"This option can be provided multiple times, each with a value formatted as:\n\n'Header-Name: value'\n\n"
"Header names are case-insensitive. "
"Quotes can be used in the <value> portion to delimit it. "
"Surrounding spaces are trimmed."
)
)
[docs]def parse_auth(kwargs):
# type: (Dict[str, Union[Type[AuthHandler], str, None]]) -> Optional[AuthHandler]
"""
Parses arguments that can define an authentication handler and remove them from dictionary for following calls.
"""
auth_handler = kwargs.pop("auth_handler", None)
auth_identity = kwargs.pop("auth_identity", None)
auth_password = kwargs.pop("auth_password", None)
auth_url = kwargs.pop("auth_url", None)
auth_method = kwargs.pop("auth_method", None)
auth_headers = kwargs.pop("auth_headers", {})
if not (auth_handler and issubclass(auth_handler, (AuthHandler, AuthBase))):
return None
auth_handler_name = fully_qualified_name(auth_handler)
auth_sign = inspect.signature(auth_handler)
auth_opts = [
("username", auth_identity),
("identity", auth_identity),
("password", auth_password),
("url", auth_url),
("method", auth_method),
("headers", CaseInsensitiveDict(auth_headers)),
]
if len(auth_sign.parameters) == 0:
auth_handler = auth_handler()
for auth_param, auth_option in auth_opts:
if auth_option and hasattr(auth_handler, auth_param):
setattr(auth_handler, auth_param, auth_option)
else:
auth_params = list(auth_sign.parameters)
auth_kwargs = {opt: val for opt, val in auth_opts if opt in auth_params}
# allow partial match of required parameters by name to better support custom implementations
# (e.g.: 'MagpieAuth' using 'magpie_url' instead of plain 'url')
for param_name, param in auth_sign.parameters.items():
if param.kind not in [param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD]:
continue
if param_name not in auth_kwargs:
for opt, val in auth_opts:
if param_name.endswith(opt):
LOGGER.debug("Using authentication partial match: [%s] -> [%s]", opt, param_name)
auth_kwargs[param_name] = val
break
LOGGER.debug("Using authentication parameters: %s", auth_kwargs)
auth_handler = auth_handler(**auth_kwargs)
LOGGER.info("Will use specified Authentication Handler [%s] with provided options.", auth_handler_name)
return auth_handler
[docs]def add_process_param(parser, description=None, required=True):
# type: (argparse.ArgumentParser, Optional[str], bool) -> None
operation = parser.prog.split(" ")[-1]
parser.add_argument(
"-p", "--id", "--process", dest="process_id", required=required,
help=description if description else f"Identifier of the process to run {operation} operation."
)
[docs]def add_job_ref_param(parser):
# type: (argparse.ArgumentParser) -> None
operation = parser.prog.split(" ")[-1]
parser.add_argument(
"-j", "--job", dest="job_reference", required=True,
help=f"Job URL or UUID to run {operation} operation. "
"If full Job URL is provided, the instance ``--url`` parameter can be omitted."
)
[docs]def add_timeout_param(parser):
# type: (argparse.ArgumentParser) -> None
parser.add_argument(
"-T", "--timeout", dest="timeout", type=int, default=WeaverClient.monitor_timeout,
help="Wait timeout (seconds) of the maximum monitoring duration of the job execution (default: %(default)ss). "
"If this timeout is reached but job is still running, another call directly to the monitoring operation "
"can be done to resume monitoring. The job execution itself will not stop in case of timeout."
)
parser.add_argument(
"-W", "--wait", "--interval", dest="interval", type=int, default=WeaverClient.monitor_interval,
help="Wait interval (seconds) between each job status polling during monitoring (default: %(default)ss)."
)
[docs]def set_parser_sections(parser):
# type: (argparse.ArgumentParser) -> None
parser._optionals.title = OPTIONAL_ARGS_TITLE
parser._positionals.title = REQUIRED_ARGS_TITLE
[docs]class ValidateAuthHandlerAction(argparse.Action):
def __call__(self, parser, namespace, auth_handler_ref, option_string=None):
# type: (argparse.ArgumentParser, argparse.Namespace, Optional[str], Optional[str]) -> None
if not (auth_handler_ref and isinstance(auth_handler_ref, str)):
return None
auth_handler = import_target(auth_handler_ref)
if not auth_handler:
error = f"Could not resolve class reference to specified Authentication Handler: [{auth_handler_ref}]."
raise argparse.ArgumentError(self, error)
auth_handler_name = fully_qualified_name(auth_handler)
if not issubclass(auth_handler, (AuthHandler, AuthBase)):
error = (
f"Resolved Authentication Handler [{auth_handler_name}] is "
"not of appropriate sub-type: oneOf[AuthHandler, AuthBase]."
)
raise argparse.ArgumentError(self, error)
setattr(namespace, self.dest, auth_handler)
[docs]class ValidateMethodAction(argparse.Action):
[docs] methods = ["GET", "HEAD", "POST", "PUT", "DELETE"]
def __call__(self, parser, namespace, values, option_string=None):
# type: (argparse.ArgumentParser, argparse.Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None
if values not in self.methods:
allow = ", ".join(self.methods)
error = f"Value '{values}' is not a valid HTTP method, must be one of [{allow}]."
raise argparse.ArgumentError(self, error)
setattr(namespace, self.dest, values)
[docs]class SubArgumentParserFixedMutexGroups(argparse.ArgumentParser):
"""
Patch incorrectly handled mutually exclusive groups sections in subparsers.
.. seealso::
- https://bugs.python.org/issue43259
- https://bugs.python.org/issue16807
"""
[docs] def _add_container_actions(self, container):
# pylint: disable=W0212
groups = container._mutually_exclusive_groups
tmp_mutex_groups = container._mutually_exclusive_groups
container._mutually_exclusive_groups = []
super(SubArgumentParserFixedMutexGroups, self)._add_container_actions(container)
for group in groups:
# following is like calling 'add_mutually_exclusive_group' but avoids enforced '_MutuallyExclusiveGroup'
# use provided instance directly to preserve class implementation (an any added special handling)
self._mutually_exclusive_groups.append(group)
container._mutually_exclusive_groups = tmp_mutex_groups
[docs]class ArgumentParserFixedRequiredArgs(argparse.ArgumentParser):
"""
Override action grouping under 'required' section to consider explicit flag even if action has option prefix.
Default behaviour places option prefixed (``-``, ``--``) arguments into optionals even if ``required`` is defined.
Help string correctly considers this flag and doesn't place those arguments in brackets (``[--<optional-arg>]``).
"""
[docs] def _add_action(self, action):
if action.option_strings and not action.required:
self._optionals._add_action(action)
else:
self._positionals._add_action(action)
return action
[docs]class WeaverSubParserAction(argparse._SubParsersAction): # noqa
[docs] def add_parser(self, *args, **kwargs): # type: ignore
sub_parser = super(WeaverSubParserAction, self).add_parser(*args, **kwargs)
parser = getattr(self, "parser", None) # type: WeaverArgumentParser
sub_parser._conditional_groups = parser._conditional_groups
sub_parser._help_mode = parser._help_mode
return sub_parser
[docs]class WeaverArgumentParser(ArgumentParserFixedRequiredArgs, SubArgumentParserFixedMutexGroups):
"""
Parser that provides fixes for proper representation of `Weaver` :term:`CLI` arguments.
"""
def __init__(self, *args, **kwargs):
# type: (Any, Any) -> None
super(WeaverArgumentParser, self).__init__(*args, **kwargs)
self._help_mode = False
self._conditional_groups = []
[docs] def add_subparsers(self, *args, **kwargs): # type: ignore
self.register("action", "parsers", WeaverSubParserAction)
group = super(WeaverArgumentParser, self).add_subparsers(*args, **kwargs)
setattr(group, "parser", self)
return group
@property
[docs] def help_mode(self):
"""
Option enabled only during help formatting to generate different conditional evaluations.
"""
return self._help_mode
@help_mode.setter
def help_mode(self, mode):
if self._help_mode != mode:
self._help_mode = mode
for group, help_required, use_required in self._conditional_groups:
group.required = help_required if self._help_mode else use_required
[docs] def add_help_conditional(self, container, help_required=True, use_required=False):
# type: (argparse._ActionsContainer, bool, bool) -> argparse._ActionsContainer # noqa
setattr(container, "required", use_required)
self._conditional_groups.append((container, help_required, use_required))
return container
[docs]def make_parser():
# type: () -> argparse.ArgumentParser
"""
Generate the :term:`CLI` parser.
.. note::
Instead of employing :class:`argparse.ArgumentParser` instances returned
by :meth:`argparse._SubParsersAction.add_parser`, distinct :class:`argparse.ArgumentParser` instances are
created for each operation and then merged back by ourselves as subparsers under the main parser.
This provides more flexibility in arguments passed down and resolves, amongst other things, incorrect
handling of exclusive argument groups and their grouping under corresponding section titles.
"""
# generic logging parser to pass down to each operation
# this allows providing logging options to any of them
log_parser = WeaverArgumentParser(add_help=False)
make_logging_options(log_parser)
desc = f"Run {__meta__.__title__} operations."
parser = WeaverArgumentParser(prog=__meta__.__name__, description=desc, parents=[log_parser])
set_parser_sections(parser)
parser.add_argument(
"--version", "-V",
action="version",
version=f"%(prog)s {__meta__.__version__}",
help="Display the version of the package."
)
ops_parsers = parser.add_subparsers(
title="Operations", dest="operation",
description="Name of the operation to run."
)
op_deploy = WeaverArgumentParser(
"deploy",
description="Deploy a process.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_deploy)
add_url_param(op_deploy)
add_shared_options(op_deploy)
add_process_param(op_deploy, required=False, description=(
"Process identifier for deployment. If no ``--body`` is provided, this is required. "
"Otherwise, provided value overrides the corresponding ID in the body."
))
op_deploy.add_argument(
"-b", "--body", dest="body",
help="Deployment body directly provided. Allows both JSON and YAML format when using file reference. "
"If provided in combination with process ID or CWL, they will override the corresponding content. "
"Can be provided either with a local file, an URL or literal string contents formatted as JSON."
)
op_deploy_app_pkg = op_deploy.add_mutually_exclusive_group()
op_deploy_app_pkg.add_argument(
"--cwl", dest="cwl",
help="Application Package of the process defined using Common Workflow Language (CWL) as JSON or YAML "
"format when provided by file reference. File reference can be a local file or URL location. "
"Can also be provided as literal string contents formatted as JSON. "
"Provided contents will be inserted into an automatically generated request deploy body if none was "
"specified with ``--body`` option (note: ``--process`` must be specified instead in that case). "
"Otherwise, it will override the appropriate execution unit section within the provided deploy body."
)
op_deploy_app_pkg.add_argument(
"--wps", dest="wps",
help="Reference URL to a specific process under a Web Processing Service (WPS) to package as OGC-API Process."
)
docker_auth_title = "Docker Authentication Arguments"
docker_auth_desc = "Parameters to obtain access to a protected Docker registry to retrieve the referenced image."
op_deploy_group = op_deploy.add_argument_group(
title=docker_auth_title,
description=docker_auth_desc,
)
op_deploy_token = op_deploy_group.add_mutually_exclusive_group()
op_deploy_token.add_argument(
"-T", "--token", dest="token",
help="Authentication token to retrieve a Docker image reference from a protected registry during execution."
)
op_deploy_creds = op_deploy_token.add_argument_group(docker_auth_title, docker_auth_desc)
op_deploy_usr = op_deploy_creds.add_argument(
"-U", "--username", dest="username",
help="Username to compute the authentication token for Docker image retrieval from a protected registry."
)
op_deploy_group._group_actions.append(op_deploy_usr)
op_deploy_pwd = op_deploy_creds.add_argument(
"-P", "--password", dest="password",
help="Password to compute the authentication token for Docker image retrieval from a protected registry."
)
# when actions are evaluated for actual executions, conditional 'required' will consider them as options
# when actions are printed in help, they will be considered required, causing ( ) to be added to form the
# rendered group of *mutually required* arguments
parser.add_help_conditional(op_deploy_creds)
# following adjust references in order to make arguments appear within sections/groups as intended
op_deploy._mutually_exclusive_groups.append(op_deploy_creds) # type: ignore
op_deploy_group._group_actions.append(op_deploy_pwd)
op_deploy_token._group_actions.append(op_deploy_usr)
op_deploy_token._group_actions.append(op_deploy_pwd)
op_deploy.add_argument(
"-D", "--delete", "--undeploy", dest="undeploy", action="store_true",
help="Perform undeploy step as applicable prior to deployment to avoid conflict with exiting process."
)
op_undeploy = WeaverArgumentParser(
"undeploy",
description="Undeploy an existing process.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_undeploy)
add_url_param(op_undeploy)
add_shared_options(op_undeploy)
add_process_param(op_undeploy)
op_capabilities = WeaverArgumentParser(
"capabilities",
description="List available processes.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_capabilities)
add_url_param(op_capabilities)
add_shared_options(op_capabilities)
op_describe = WeaverArgumentParser(
"describe",
description="Obtain an existing process description.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_describe)
add_url_param(op_describe)
add_shared_options(op_describe)
add_process_param(op_describe)
op_describe.add_argument(
"-S", "--schema", dest="schema", choices=ProcessSchema.values(), type=str.upper, default=ProcessSchema.OGC,
help="Representation schema of the returned process description (default: %(default)s, case-insensitive)."
)
op_execute = WeaverArgumentParser(
"execute",
description="Submit a job execution for an existing process.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_execute)
add_url_param(op_execute)
add_shared_options(op_execute)
add_process_param(op_execute)
op_execute.add_argument(
"-I", "--inputs", dest="inputs",
required=True, nargs=1, action="append", # collect max 1 item per '-I', but allow many '-I'
# note: below is formatted using 'ParagraphFormatter' with detected paragraphs
help=inspect.cleandoc("""
Literal input definitions, or a file path or URL reference to JSON or YAML
contents defining job inputs with OGC-API or CWL schema. This parameter is required.
To provide inputs using a file reference, refer to relevant CWL Job schema or API request schema
for selected format. Both mapping and listing formats are supported.
To execute a process without any inputs (e.g.: using its defaults),
supply an explicit empty input (i.e.: ``-I ""`` or loaded from JSON/YAML file as ``{}``).
To provide inputs using literal command-line definitions, inputs should be specified using ``<id>=<value>``
convention, with distinct ``-I`` options for each applicable input value.
Values that require other type than string to be converted for job submission can include the data type
following the ID using a colon separator (i.e.: ``<id>:<type>=<value>``). For example, an integer could be
specified as follows: ``number:int=1`` while a floating point number would be: ``number:float=1.23``.
File references (``href``) should be specified using ``File`` as the type (i.e.: ``input:File=http://...``).
Note that ``File`` in this case is expected to be an URL location where the file can be download from.
When a local file is supplied, Weaver will automatically convert it to a remote Vault File in order to
upload it at the specified URL location and make it available for the remote process.
Inputs with multiplicity (``maxOccurs > 1``) can be specified using semicolon (``;``) separated values
after a single input ID. Note that this is not the same as an single-value array-like input, which should
use comma (``,``) separated values instead.
The type of an element-wise item of this input can also be provided (i.e.: ``multiInput:int=1;2;3``).
Alternatively, the same input ID can be repeated over many ``-I`` options each providing an element of the
multi-value input to be formed (i.e.: ``-I multiInput=1 -I multiInput=2 -I multiInput=3``).
Additional parameters can be specified following any ``<value>`` using any amount of ``@<param>=<info>``
specifiers. Those will be added to the inputs body submitted for execution. This can be used, amongst other
things, to provide a file's ``mediaType`` or ``encoding`` details. When using multi-value inputs, each item
value can take ``@`` parameters independently with distinct properties.
Any value that contains special separator characters (``:;@``) to be used as literal entries
must be URL-encoded (``%%XX``) to avoid invalid parsing.
Example: ``-I message='Hello Weaver' -I value:int=1234 -I file:File=data.xml@mediaType=text/xml``
""")
)
# FIXME: allow filtering 'outputs' (https://github.com/crim-ca/weaver/issues/380)
# Only specified ones are returned, if none specified, return all.
# op_execute.add_argument(
# "-O", "--output",
op_execute.add_argument(
"-R", "--ref", "--reference", metavar="REFERENCE", dest="output_refs", action="append",
help=inspect.cleandoc("""
Indicates which outputs by ID to be returned as HTTP Link header reference instead of body content value.
This defines the output transmission mode when submitting the execution request.
With reference transmission mode,
outputs that contain literal data will be linked by ``text/plain`` file containing the data.
Outputs that refer to a file reference will simply contain that URL reference as link.
With value transmission mode (default behavior when outputs are not specified in this list), outputs are
returned as direct values (literal or href) within the response content body.
When requesting any output to be returned by reference, option ``-H/--headers`` should be considered as
well to return the provided ``Link`` headers for these outputs on the command line.
Example: ``-R output-one -R output-two``
""")
)
op_execute.add_argument(
"-M", "--monitor", dest="monitor", action="store_true",
help="Automatically perform the monitoring operation following job submission to retrieve final results. "
"If not requested, the created job status location is directly returned."
)
add_timeout_param(op_execute)
op_jobs = WeaverArgumentParser(
"jobs",
description="Obtain listing of registered jobs.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_jobs)
add_url_param(op_jobs, required=True)
add_shared_options(op_jobs)
op_jobs.add_argument(
"-P", "--page", dest="page", type=int,
help="Specify the paging index for listing jobs."
)
op_jobs.add_argument(
"-N", "--number", "--limit", dest="limit", type=int,
help="Specify the amount of jobs to list per page."
)
op_jobs.add_argument(
"-S", "--status", dest="status", choices=Status.values(), type=str.lower,
help="Filter job listing only to matching status."
)
op_jobs.add_argument(
"-D", "--detail", dest="detail", action="store_true",
help="Obtain detailed job descriptions."
)
op_jobs.add_argument(
"-G", "--groups", dest="groups", action="store_true",
help="Obtain grouped representation of jobs per provider and process categories."
)
op_dismiss = WeaverArgumentParser(
"dismiss",
description="Dismiss a pending or running job, or wipe any finished job results.",
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_dismiss)
add_url_param(op_dismiss, required=False)
add_job_ref_param(op_dismiss)
add_shared_options(op_dismiss)
op_monitor = WeaverArgumentParser(
"monitor",
description="Monitor a pending or running job execution until completion or up to a maximum wait time.",
formatter_class=ParagraphFormatter,
)
add_url_param(op_monitor, required=False)
add_job_ref_param(op_monitor)
add_timeout_param(op_monitor)
add_shared_options(op_monitor)
op_status = WeaverArgumentParser(
"status",
description=(
"Obtain the status of a job using a reference UUID or URL. "
"This is equivalent to doing a single-shot 'monitor' operation without any pooling or retries."
),
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_status)
add_url_param(op_status, required=False)
add_job_ref_param(op_status)
add_shared_options(op_status)
op_results = WeaverArgumentParser(
"results",
description=(
"Obtain the output results from a job successfully executed. "
"This operation can also download them from the remote server if requested."
),
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_results)
add_url_param(op_results, required=False)
add_job_ref_param(op_results)
add_shared_options(op_results)
op_results.add_argument(
"-D", "--download", dest="download", action="store_true",
help="Download all found job results file references to output location. "
"If not requested, the operation simply displays the job results (default: %(default)s)."
)
op_results.add_argument(
"-O", "--outdir", dest="out_dir",
help="Output directory where to store downloaded files from job results if requested "
"(default: ${CURDIR}/{JobID}/<outputs.files>)."
)
op_upload = WeaverArgumentParser(
"upload",
description=(
"Upload a local file to the remote server vault for reference in process execution inputs. "
"This operation is accomplished automatically for all execution inputs submitted using local files. "
"[note: feature only available for Weaver instances]"
),
formatter_class=ParagraphFormatter,
)
set_parser_sections(op_upload)
add_url_param(op_upload, required=True)
add_shared_options(op_upload)
op_upload.add_argument(
"-c", "--content-type", dest="content_type",
help="Content-Type of the file to apply. "
"This should be an IANA Media-Type, optionally with additional parameters such as charset. "
"If not provided, attempts to guess it based on the file extension."
)
op_upload.add_argument(
"-f", "--file", dest="file_path", metavar="FILE", required=True,
help="Local file path to upload to the vault."
)
operations = [
op_deploy,
op_undeploy,
op_capabilities,
op_describe,
op_execute,
op_jobs,
op_monitor,
op_dismiss,
op_status,
op_results,
op_upload,
]
aliases = {
"processes": op_capabilities
}
for op_parser in operations:
op_aliases = [alias for alias, op_alias in aliases.items() if op_alias is op_parser]
# add help disabled otherwise conflicts with main parser help
sub_op_parser = ops_parsers.add_parser(
op_parser.prog, aliases=op_aliases, parents=[log_parser, op_parser],
add_help=False, help=op_parser.description,
formatter_class=op_parser.formatter_class,
description=op_parser.description, usage=op_parser.usage
)
set_parser_sections(sub_op_parser)
return parser
[docs]def main(*args):
# type: (*str) -> int
parser = make_parser()
ns = parser.parse_args(args=args or None)
setup_logger_from_options(LOGGER, ns)
kwargs = vars(ns)
# remove logging params not known by operations
for param in ["stdout", "log", "log_level", "quiet", "debug", "verbose"]:
kwargs.pop(param, None)
oper = kwargs.pop("operation", None)
LOGGER.debug("Requested operation: [%s]", oper)
if not oper or oper not in dir(WeaverClient):
parser.print_help()
return 0
url = kwargs.pop("url", None)
auth = parse_auth(kwargs)
client = WeaverClient(url, auth=auth)
try:
result = getattr(client, oper)(**kwargs)
except Exception as exc:
msg = "Operation failed due to exception."
err = fully_qualified_name(exc)
result = OperationResult(False, message=msg, body={"message": msg, "cause": str(exc), "error": err})
if result.success:
LOGGER.info("%s successful. %s\n", oper.title(), result.message)
print(result.text) # use print in case logger disabled or level error/warn
return 0
LOGGER.error("%s failed. %s\n---\nStatus Code: %s\n---", oper.title(), result.message, result.code)
print(result.text)
return -1
if __name__ == "__main__":
sys.exit(main())