import logging
from time import sleep
from typing import TYPE_CHECKING
from owslib.wps import ComplexDataInput
from requests.exceptions import HTTPError
from weaver import xml_util
from weaver.execute import ExecuteMode
from weaver.formats import get_format
from weaver.owsexceptions import OWSNoApplicableCode
from weaver.processes.constants import WPS_COMPLEX_DATA
from weaver.processes.convert import DEFAULT_FORMAT, ows2json_output_data
from weaver.processes.utils import map_progress
from weaver.processes.wps_process_base import RemoteJobProgress, WpsProcessInterface
from weaver.status import Status, map_status
from weaver.utils import (
bytes2str,
get_any_id,
get_any_value,
get_job_log_msg,
get_log_monitor_msg,
raise_on_xml_exception,
retry_on_condition,
wait_secs
)
from weaver.wps.utils import check_wps_status, get_exception_from_xml_status, get_wps_client
if TYPE_CHECKING:
from typing import Optional
from owslib.wps import WebProcessingService
from weaver.typedefs import (
CWL_RuntimeInputList,
JobExecution,
JobInputs,
JobOutputs,
JobResults,
OWS_InputDataValues,
ProcessOWS,
UpdateStatusPartialFunction
)
from weaver.wps.service import WorkerRequest

LOGGER = logging.getLogger(__name__)


class Wps1RemoteJobProgress(RemoteJobProgress):
    pass


class Wps1Process(WpsProcessInterface):
def __init__(self,
provider, # type: str
process, # type: str
request, # type: WorkerRequest
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
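        """
        Instantiate the remote WPS-1 process execution interface for the given provider and process.

        The ``update_status`` callback is wrapped such that every status update reported by this
        process automatically includes the provider reference.
        """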
self.provider = provider
self.process = process
# following are defined after 'prepare' step
self.wps_provider = None # type: Optional[WebProcessingService]
self.wps_process = None # type: Optional[ProcessOWS]
super(Wps1Process, self).__init__(
request,
lambda _message, _progress, _status, *args, **kwargs: update_status(
_message, _progress, _status, self.provider, *args, **kwargs
)
)

    def prepare(self):
# type: () -> None
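        """
        Prepare the remote execution by retrieving the WPS-1 provider capabilities and the process description.

        Any failure to reach the remote service or to parse its response is raised as an
        :class:`OWSNoApplicableCode` error.
        """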
LOGGER.debug("Execute WPS-1 provider: [%s]", self.provider)
LOGGER.debug("Execute WPS-1 process: [%s]", self.process)
try:
headers = {}
headers.update(self.get_auth_cookies())
headers.update(self.get_auth_headers())
self.wps_provider = get_wps_client(self.provider, headers=headers)
raise_on_xml_exception(self.wps_provider._capabilities) # noqa: W0212
except Exception as ex:
raise OWSNoApplicableCode(f"Failed to retrieve WPS capabilities. Error: [{ex!s}].")
try:
self.wps_process = self.wps_provider.describeprocess(self.process)
except Exception as ex:
raise OWSNoApplicableCode(f"Failed to retrieve WPS process description. Error: [{ex!s}].")

    def dispatch(self, process_inputs, process_outputs):
# type: (JobInputs, JobOutputs) -> JobExecution
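        """
        Submit the execution request to the remote WPS-1 provider in asynchronous mode.

        Outputs are requested as references or literal values according to each output ``as_ref`` definition.
        The request is retried a few times when the remote server responds with an internal server error.
        """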
wps_outputs = [(output["id"], output["as_ref"]) for output in process_outputs]
        # Some WPS servers sometimes have trouble executing the process (unhandled internal server errors due to DB).
        # When possible, retry the request to silently recover from those transient errors and return a successful run.
execution = retry_on_condition(
self.wps_provider.execute,
# wps params
self.process,
inputs=process_inputs,
output=wps_outputs,
mode=ExecuteMode.ASYNC,
lineage=True,
# retry params
condition=lambda exc: isinstance(exc, HTTPError) and exc.response.status_code == 500,
retries=5,
)
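        # if the submission itself failed (no process reference in the response), surface the first reported
        # error so that the job fails explicitly instead of attempting to monitor an invalid execution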
if not execution.process and execution.errors:
raise execution.errors[0]
return {"execution": execution} # return a dict to allow update by reference

    def monitor(self, monitor_reference):
# type: (JobExecution) -> bool
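        """
        Poll the remote WPS-1 status location until the execution completes, reporting progress along the way.

        :returns: ``True`` if the remote execution succeeded, ``False`` otherwise.
        """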
execution = monitor_reference["execution"]
max_retries = 20 # using 'wait_secs' incremental delays, this is ~3min of retry attempts
num_retries = 0
run_step = 0
job_id = "<undefined>"
log_progress = Wps1RemoteJobProgress.MONITORING
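        # poll the remote status document until completion
        # ('run_step == 0' forces at least one status check even if the execution is already complete)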
while execution.isNotComplete() or run_step == 0:
if num_retries >= max_retries:
raise Exception(f"Could not read status document after {max_retries} retries. Giving up.")
try:
execution = check_wps_status(location=execution.statusLocation,
sleep_secs=wait_secs(run_step), settings=self.settings)
monitor_reference["execution"] = execution # update reference for later stages
job_id = execution.statusLocation.split("/")[-1].replace(".xml", "")
exec_status = map_status(execution.getStatus())
LOGGER.debug(get_log_monitor_msg(job_id,
exec_status,
execution.percentCompleted,
execution.statusMessage,
execution.statusLocation))
log_msg = get_job_log_msg(status=exec_status,
message=execution.statusMessage,
progress=execution.percentCompleted,
duration=None) # get if available
log_progress = map_progress(execution.percentCompleted,
Wps1RemoteJobProgress.MONITORING,
Wps1RemoteJobProgress.RESULTS)
self.update_status(log_msg, log_progress, Status.RUNNING)
except Exception as exc:
num_retries += 1
LOGGER.debug("Exception raised: %r", exc)
sleep(1)
else:
num_retries = 0
run_step += 1
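        # execution completed, validate the final status and report remote error details if it failed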
if not execution.isSucceded():
exec_msg = execution.statusMessage or "Job failed."
exec_status = map_status(execution.getStatus())
exec_status_url = execution.statusLocation
LOGGER.debug(get_log_monitor_msg(job_id,
exec_status,
execution.percentCompleted,
exec_msg,
exec_status_url))
# provide more details in logs of parent job process about the cause of the failing remote execution
xml_err = bytes2str(xml_util.tostring(execution.response))
xml_exc = get_exception_from_xml_status(execution.response)
self.update_status(
f"Retrieved error status response from WPS remote provider on [{exec_status_url}]:\n{xml_err}\n",
log_progress, Status.FAILED, error=xml_exc
)
return False
return True

    def get_results(self, monitor_reference):
# type: (JobExecution) -> JobResults
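        """
        Convert the OWS outputs of the completed execution into corresponding JSON job results.
        """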
self.update_status("Retrieving job output definitions from remote WPS-1 provider.",
Wps1RemoteJobProgress.RESULTS, Status.RUNNING)
execution = monitor_reference["execution"]
ows_results = [
ows2json_output_data(output, self.wps_process, self.settings)
for output in execution.processOutputs
]
return ows_results