import logging
import warnings
from time import sleep
from typing import TYPE_CHECKING
from pyramid.httpexceptions import HTTPConflict, HTTPForbidden, HTTPNotFound, HTTPOk, HTTPUnauthorized
from pyramid.settings import asbool
from weaver.exceptions import PackageExecutionError
from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType, repr_json
from weaver.processes import opensearch
from weaver.processes.sources import get_data_source_from_url, retrieve_data_source_url
from weaver.processes.utils import map_progress
from weaver.processes.wps_process_base import WpsProcessInterface, WpsRemoteJobProgress
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.utils import (
get_any_id,
get_any_message,
get_any_value,
get_job_log_msg,
get_log_monitor_msg,
pass_http_error,
request_extra
)
from weaver.visibility import Visibility
from weaver.warning import MissingParameterWarning
from weaver.wps_restapi import swagger_definitions as sd
if TYPE_CHECKING:
from typing import Any, Union
from weaver.typedefs import (
AnyHeadersContainer,
JSON,
JobInputs,
JobMonitorReference,
JobOutputs,
JobResults,
UpdateStatusPartialFunction
)
from weaver.wps.service import WorkerRequest
[docs]LOGGER = logging.getLogger(__name__)
[docs]class Wps3RemoteJobProgress(WpsRemoteJobProgress):
[docs]class Wps3Process(WpsProcessInterface):
def __init__(self,
step_payload, # type: JSON
joborder, # type: JSON
process, # type: str
request, # type: WorkerRequest
update_status, # type: UpdateStatusPartialFunction
):
super(Wps3Process, self).__init__(
request,
lambda _message, _progress, _status: update_status(_message, _progress, _status, self.provider or "local")
)
self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, joborder)
self.process = process
[docs] def resolve_data_source(self, step_payload, joborder):
try:
# Presume that all EOImage given as input can be resolved to the same ADES
# So if we got multiple inputs or multiple values for an input, we take the first one as reference
eodata_inputs = opensearch.get_eo_images_ids_from_payload(step_payload)
data_url = "" # data_source will be set to the default ADES if no EOImages (anything but `None`)
if eodata_inputs:
step_payload = opensearch.alter_payload_after_query(step_payload)
value = joborder[eodata_inputs[0]]
if isinstance(value, list):
value = value[0] # Use the first value to determine the data source
data_url = value["location"]
reason = f"(ADES based on {data_url})"
else:
reason = "(No EOImage -> Default ADES)"
data_source = get_data_source_from_url(data_url)
deploy_body = step_payload
url = retrieve_data_source_url(data_source)
except (IndexError, KeyError) as exc:
LOGGER.error("Error during WPS-3 process data source resolution: [%s]", exc, exc_info=exc)
raise PackageExecutionError(f"Failed resolution of WPS-3 process data source: [{exc!r}]")
self.provider = data_source # fix immediately for below `update_status` call
self.update_status(f"Provider {data_source} is selected {reason}.",
Wps3RemoteJobProgress.PROVIDER, Status.RUNNING)
return data_source, url, deploy_body
[docs] def is_deployed(self):
return self.describe_process() is not None
[docs] def is_visible(self):
# type: (...) -> Union[bool, None]
"""
Gets the process visibility.
:returns:
True/False correspondingly for public/private if visibility is retrievable,
False if authorized access but process cannot be found,
None if forbidden access.
"""
LOGGER.debug("Get process WPS visibility request for [%s]", self.process)
response = self.make_request(method="GET",
url=self.url + sd.process_visibility_service.path.format(process_id=self.process),
retry=False)
if response.status_code in (HTTPUnauthorized.code, HTTPForbidden.code):
return None
if response.status_code == HTTPNotFound.code:
return False
if response.status_code == HTTPOk.code:
json_body = response.json()
return json_body.get("value") == Visibility.PUBLIC
response.raise_for_status()
[docs] def set_visibility(self, visibility):
self.update_status("Updating process visibility on remote ADES.",
Wps3RemoteJobProgress.VISIBLE, Status.RUNNING)
path = self.url + sd.process_visibility_service.path.format(process_id=self.process)
LOGGER.debug("Update process WPS visibility request for [%s] at [%s]", self.process, path)
response = self.make_request(method="PUT",
url=path,
json={"value": visibility},
retry=False,
swap_error_status_code=HTTPOk.code)
response.raise_for_status()
[docs] def describe_process(self):
path = self.url + sd.process_service.path.format(process_id=self.process)
LOGGER.debug("Describe process WPS request for [%s] at [%s]", self.process, path)
response = self.make_request(method="GET",
url=path,
retry=False,
swap_error_status_code=HTTPOk.code)
if response.status_code == HTTPOk.code:
return response.json()
elif response.status_code == HTTPNotFound.code:
return None
response.raise_for_status()
[docs] def deploy(self):
self.update_status("Deploying process on remote ADES.",
Wps3RemoteJobProgress.DEPLOY, Status.RUNNING)
path = self.url + sd.processes_service.path
LOGGER.debug("Deploy process WPS request for [%s] at [%s]", self.process, path)
response = self.make_request(method="POST", url=path, json=self.deploy_body, retry=True)
response.raise_for_status()
[docs] def prepare(self):
visible = self.is_visible()
if not visible: # includes private visibility and non-existing cases
if visible is None:
LOGGER.info("Process [%s] access is unauthorized on [%s] - deploying as admin.", self.process, self.url)
elif visible is False:
LOGGER.info("Process [%s] is not deployed on [%s] - deploying.", self.process, self.url)
# TODO: Maybe always redeploy? What about cases of outdated deployed process?
try:
self.deploy()
except Exception as exc:
pass_http_error(exc, [HTTPConflict])
if visible:
LOGGER.info("Process [%s] already deployed and visible on [%s] - executing.", self.process, self.url)
else:
LOGGER.info("Process [%s] enforcing to public visibility.", self.process)
try:
self.set_visibility(visibility=Visibility.PUBLIC)
except Exception as exc:
pass_http_error(exc, HTTPNotFound)
LOGGER.warning("Process [%s] failed setting public visibility. "
"Assuming feature is not supported by ADES and process is already public.", self.process)
[docs] def dispatch(self, process_inputs, process_outputs):
# type: (JobInputs, JobOutputs) -> Any
LOGGER.debug("Execute process WPS request for [%s]", self.process)
execute_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"inputs": process_inputs,
"outputs": process_outputs
}
LOGGER.debug("Execute process WPS body for [%s]:\n%s", self.process, repr_json(execute_body))
request_url = self.url + sd.process_jobs_service.path.format(process_id=self.process)
response = self.make_request(method="POST", url=request_url, json=execute_body, retry=True)
if response.status_code != 201:
LOGGER.error("Request [POST %s] failed with: [%s]", request_url, response.status_code)
raise Exception(f"Was expecting a 201 status code from the execute request : {request_url}")
job_status_uri = response.headers["Location"]
return job_status_uri
[docs] def monitor(self, monitor_reference):
# type: (str) -> bool
job_status_uri = monitor_reference
job_status_data = self.get_job_status(job_status_uri)
job_status_value = map_status(job_status_data["status"])
job_id = job_status_data["jobID"]
self.update_status(f"Monitoring job on remote ADES : {job_status_uri}",
Wps3RemoteJobProgress.MONITORING, Status.RUNNING)
while job_status_value not in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
sleep(5)
job_status_data = self.get_job_status(job_status_uri)
job_status_value = map_status(job_status_data["status"])
LOGGER.debug(get_log_monitor_msg(job_id, job_status_value,
job_status_data.get("percentCompleted", 0),
get_any_message(job_status_data), job_status_data.get("statusLocation")))
self.update_status(get_job_log_msg(status=job_status_value,
message=get_any_message(job_status_data),
progress=job_status_data.get("percentCompleted", 0),
duration=job_status_data.get("duration", None)), # get if available
map_progress(job_status_data.get("percentCompleted", 0),
Wps3RemoteJobProgress.MONITORING, Wps3RemoteJobProgress.FETCH_OUT),
Status.RUNNING)
if job_status_value != Status.SUCCEEDED:
LOGGER.debug(get_log_monitor_msg(job_id, job_status_value,
job_status_data.get("percentCompleted", 0),
get_any_message(job_status_data), job_status_data.get("statusLocation")))
raise PackageExecutionError(job_status_data)
return True
[docs] def get_job_status(self, job_status_uri, retry=True):
# type: (JobMonitorReference, Union[bool, int]) -> JSON
"""
Obtains the contents from the :term:`Job` status response.
"""
response = self.make_request(method="GET", url=job_status_uri, retry=retry) # retry in case not yet ready
response.raise_for_status()
job_status = response.json()
job_id = job_status_uri.split("/")[-1]
if "jobID" not in job_status:
job_status["jobID"] = job_id # provide if not implemented by ADES
job_status["status"] = map_status(job_status["status"])
return job_status
[docs] def get_results(self, monitor_reference):
# type: (str) -> JobResults
"""
Obtains produced output results from successful job status ID.
"""
# use '/results' endpoint instead of '/outputs' to ensure support with other
result_url = monitor_reference + "/results"
response = self.make_request(method="GET", url=result_url, retry=True)
response.raise_for_status()
contents = response.json()
# backward compatibility for ADES that returns output IDs nested under 'outputs'
if "outputs" in contents:
# ensure that we don't incorrectly pick a specific output ID named 'outputs'
maybe_outputs = contents["outputs"]
if isinstance(maybe_outputs, dict) and get_any_id(maybe_outputs) is None:
contents = maybe_outputs
# backward compatibility for ADES that returns list of outputs nested under 'outputs'
# (i.e.: as Weaver-specific '/outputs' endpoint)
elif isinstance(maybe_outputs, list) and all(get_any_id(out) is not None for out in maybe_outputs):
contents = maybe_outputs
# rebuild the expected (old) list format for calling method
if isinstance(contents, dict) and all(get_any_value(out) is not None for out in contents.values()):
outputs = []
for out_id, out_val in contents.items():
out_val.update({"id": out_id})
outputs.append(out_val)
contents = outputs
return contents