import logging
import warnings
from typing import TYPE_CHECKING
from pyramid.httpexceptions import HTTPConflict, HTTPForbidden, HTTPNotFound, HTTPOk, HTTPUnauthorized
from pyramid.settings import asbool
from weaver.exceptions import PackageExecutionError
from weaver.formats import ContentType
from weaver.processes import opensearch
from weaver.processes.sources import get_data_source_from_url, retrieve_data_source_url
from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase, RemoteJobProgress
from weaver.status import Status
from weaver.utils import pass_http_error, request_extra
from weaver.visibility import Visibility
from weaver.warning import MissingParameterWarning
from weaver.wps_restapi import swagger_definitions as sd
if TYPE_CHECKING:
from typing import Optional, Tuple, Union
from weaver.typedefs import (
AnyHeadersContainer,
CWL,
CWL_ExpectedOutputs,
CWL_RuntimeInputsMap,
HeadersType,
JSON,
UpdateStatusPartialFunction
)
from weaver.wps.service import WorkerRequest
[docs]
# Module-level logger, named after this module so its records can be filtered per module.
LOGGER = logging.getLogger(name=__name__)
[docs]
class Wps3RemoteJobProgress(RemoteJobProgress):
[docs]
class Wps3Process(OGCAPIRemoteProcessBase):
    """
    Remote or local :term:`Process` with :term:`ADES` capabilities, based on :term:`OGC API - Processes` requests.

    If a referenced remote service supports :term:`Process` deployment using an :term:`Application Package`, and
    its inputs point to a resolvable :term:`Data Source`, the execution will be dispatched to that remote location.
    Otherwise, the :term:`Process` is executed locally.

    Most of the core operations are handled by :class:`OGCAPIRemoteProcessBase` since requests are sent to another
    :term:`ADES` instance, or `Weaver` itself for :term:`Workflow` steps, both of which are :term:`OGC API - Processes`.
    Additional operations revolve around the resolution of which remote :term:`ADES` to dispatch based on any detected
    :term:`Data Source` location.

    .. seealso::
        - :class:`weaver.processes.wps_process_base.OGCAPIRemoteProcessBase`
    """
    process_type = "WPS-3"  # ADES, EMS or HYBRID (local Application or Workflow)

    def __init__(
        self,
        step_payload,   # type: JSON
        job_order,      # type: CWL_RuntimeInputsMap
        process,        # type: str
        request,        # type: Optional[WorkerRequest]
        update_status,  # type: UpdateStatusPartialFunction
    ):  # type: (...) -> None
        """
        Initialize the step and resolve the provider (:term:`Data Source`) it will be dispatched to.

        :param step_payload: payload of the step, also used as the deployment body on the resolved provider.
        :param job_order: runtime inputs of the step, used to locate the reference :term:`Data Source`.
        :param process: identifier of the :term:`Process` to deploy and execute.
        :param request: worker request that triggered the execution, if any.
        :param update_status:
            Status callback. It is invoked with the message, progress, status and then the resolved
            provider name (or ``"local"`` when no provider is set yet), followed by any extra arguments.
        """
        super().__init__(
            step_payload,
            process,
            request,
            # Inject the provider name into every status update. 'self.provider' is read when the
            # update is emitted (not when the lambda is created), so updates made after resolution
            # report the selected provider.
            lambda _message, _progress, _status, *args, **kwargs: update_status(
                _message, _progress, _status, self.provider or "local", *args, **kwargs
            ),
        )
        self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, job_order)

    def resolve_data_source(self, step_payload, job_order):
        # type: (CWL, CWL_RuntimeInputsMap) -> Tuple[str, str, JSON]
        """
        Select the provider (:term:`Data Source`) to which this step is dispatched.

        All EOImage inputs are presumed to be resolvable to the same :term:`ADES`. Therefore, only the first
        value of the first EOImage input is used as reference. Without any EOImage input, the default data
        source is selected.

        :param step_payload: payload of the step, possibly containing EOImage inputs.
        :param job_order: runtime inputs of the step.
        :returns: tuple of the selected data source, its URL, and the payload to deploy on it.
        :raises PackageExecutionError:
            If any index or key lookup fails during resolution (e.g. missing input value or ``location``).
        """
        try:
            eodata_inputs = opensearch.get_eo_images_ids_from_payload(step_payload)
            # An empty URL (anything but 'None') resolves to the default ADES data source.
            data_url = ""
            if eodata_inputs:
                step_payload = opensearch.alter_payload_after_query(step_payload)
                value = job_order[eodata_inputs[0]]
                if isinstance(value, list):
                    value = value[0]  # use the first value to determine the data source
                data_url = value["location"]
                reason = f"(ADES based on {data_url})"
            else:
                reason = "(No EOImage -> Default ADES)"
            data_source = get_data_source_from_url(data_url)
            deploy_body = step_payload
            url = retrieve_data_source_url(data_source)
        except (IndexError, KeyError) as exc:
            LOGGER.error("Error during %s process data source resolution: [%s]",
                         self.process_type, exc, exc_info=exc)
            # chain explicitly so the original lookup failure is reported as the direct cause
            raise PackageExecutionError(
                f"Failed resolution of {self.process_type} process data source: [{exc!r}]"
            ) from exc
        # Set the provider right away so that the status update below already reports it.
        self.provider = data_source
        self.update_status(f"Provider [{data_source}] is selected {reason}.",
                           Wps3RemoteJobProgress.SETUP, Status.RUNNING)
        return data_source, url, deploy_body

    def is_deployed(self):
        # type: () -> bool
        """
        Check whether the :term:`Process` can be described on the resolved provider.
        """
        return self.describe_process() is not None

    def is_visible(self):
        # type: (...) -> Union[bool, None]
        """
        Gets the process visibility.

        :returns:
            True/False correspondingly for public/private if visibility is retrievable,
            False if authorized access but process cannot be found,
            None if forbidden access, or if the response is neither an error nor one of the above statuses
            (visibility cannot be determined from it).
        :raises: whatever ``raise_for_status`` raises for any other error status of the response.
        """
        path = self.url + sd.process_visibility_service.path.format(process_id=self.process)
        LOGGER.debug("Get process %s visibility request for [%s]", self.process_type, self.process)
        response = self.make_request(method="GET", url=path, retry=False)
        status_code = response.status_code
        if status_code in (HTTPUnauthorized.code, HTTPForbidden.code):
            return None
        if status_code == HTTPNotFound.code:
            return False
        if status_code == HTTPOk.code:
            return response.json().get("value") == Visibility.PUBLIC
        response.raise_for_status()
        # Reached only for statuses that 'raise_for_status' does not treat as errors (e.g. other 2xx/3xx).
        # Visibility is unknown in that case: keep the historical 'None' result, but make it explicit.
        return None

    def set_visibility(self, visibility):
        """
        Request the resolved provider to update the visibility of the :term:`Process`.

        :param visibility: visibility value to apply (e.g. :attr:`Visibility.PUBLIC`).
        :raises: whatever ``raise_for_status`` raises if the response indicates an error.
        """
        self.update_status("Updating process visibility on remote ADES.",
                           Wps3RemoteJobProgress.VISIBLE, Status.RUNNING)
        path = self.url + sd.process_visibility_service.path.format(process_id=self.process)
        LOGGER.debug("Update process %s visibility request for [%s] at [%s]", self.process_type, self.process, path)
        # NOTE(review): semantics of 'swap_error_status_code' are defined by 'make_request' (not visible here).
        response = self.make_request(method="PUT",
                                     url=path,
                                     json={"value": visibility},
                                     retry=False,
                                     swap_error_status_code=HTTPOk.code)
        response.raise_for_status()

    def describe_process(self):
        # type: () -> Optional[JSON]
        """
        Retrieve the description of the :term:`Process` from the resolved provider.

        :returns:
            The JSON description on success, or ``None`` if the process is not found, or if the response
            is neither an error nor one of those statuses.
        :raises: whatever ``raise_for_status`` raises for any other error status of the response.
        """
        path = self.url + sd.process_service.path.format(process_id=self.process)
        LOGGER.debug("Describe process %s request for [%s] at [%s]", self.process_type, self.process, path)
        # NOTE(review): semantics of 'swap_error_status_code' are defined by 'make_request' (not visible here).
        response = self.make_request(method="GET",
                                     url=path,
                                     retry=False,
                                     swap_error_status_code=HTTPOk.code)
        if response.status_code == HTTPOk.code:
            return response.json()
        if response.status_code == HTTPNotFound.code:
            return None
        response.raise_for_status()
        # Non-error status other than 200/404: no description can be extracted.
        return None

    def deploy(self):
        """
        Deploy the :term:`Process` on the resolved provider using the resolved deployment body.

        :raises: whatever ``raise_for_status`` raises if the response indicates an error.
        """
        self.update_status("Deploying process on remote ADES.",
                           Wps3RemoteJobProgress.DEPLOY, Status.RUNNING)
        path = self.url + sd.processes_service.path
        LOGGER.debug("Deploy process %s request for [%s] at [%s]", self.process_type, self.process, path)
        response = self.make_request(method="POST", url=path, json=self.deploy_body, retry=True)
        response.raise_for_status()

    def prepare(self, workflow_inputs, expected_outputs):
        # type: (CWL_RuntimeInputsMap, CWL_ExpectedOutputs) -> None
        """
        Ensure the :term:`Process` is deployed and publicly visible on the resolved provider.

        The process is deployed when it is not visible (private, missing, or access unauthorized).
        Public visibility is then enforced whenever it was not already visible.

        :param workflow_inputs: not used by this implementation (presumably required by the base interface).
        :param expected_outputs: not used by this implementation (presumably required by the base interface).
        """
        visible = self.is_visible()
        if not visible:  # includes private visibility and non-existing cases
            if visible is None:
                LOGGER.info("Process [%s] access is unauthorized on [%s] - deploying as admin.", self.process, self.url)
            elif visible is False:
                LOGGER.info("Process [%s] is not deployed on [%s] - deploying.", self.process, self.url)
            # TODO: Maybe always redeploy? What about cases of outdated deployed process?
            try:
                self.deploy()
            except Exception as exc:
                # a conflict (process already deployed) is tolerated; other errors are presumably re-raised
                pass_http_error(exc, [HTTPConflict])

        if visible:
            LOGGER.info("Process [%s] already deployed and visible on [%s] - executing.", self.process, self.url)
        else:
            LOGGER.info("Process [%s] enforcing to public visibility.", self.process)
            try:
                self.set_visibility(visibility=Visibility.PUBLIC)
            except Exception as exc:
                # a missing visibility endpoint is tolerated; other errors are presumably re-raised
                pass_http_error(exc, [HTTPNotFound])
                LOGGER.warning("Process [%s] failed setting public visibility. "
                               "Assuming feature is not supported by ADES and process is already public.",
                               self.process)