import logging
from time import sleep
from typing import TYPE_CHECKING
from owslib.wps import ComplexDataInput
from weaver import status
from weaver.execute import EXECUTE_MODE_ASYNC
from weaver.formats import CONTENT_TYPE_TEXT_PLAIN, get_extension, get_format
from weaver.owsexceptions import OWSNoApplicableCode
from weaver.processes.constants import WPS_COMPLEX_DATA
from weaver.processes.convert import get_field, ows2json_output_data
from weaver.processes.utils import map_progress
from weaver.processes.wps_process_base import WpsProcessInterface
from weaver.utils import (
get_any_id,
get_any_value,
get_job_log_msg,
get_log_monitor_msg,
raise_on_xml_exception,
request_extra,
wait_secs
)
from weaver.wps.utils import check_wps_status, get_wps_client
if TYPE_CHECKING:
from pywps.app import WPSRequest
from weaver.typedefs import UpdateStatusPartialFunction
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)

# Progress values (percent) reported through ``update_status`` at each stage of
# the remote WPS-1 execution. The remote job's own completion percentage is
# passed to ``map_progress`` together with the MONITORING and FETCH_OUT bounds
# while the job is being monitored.
REMOTE_JOB_PROGRESS_REQ_PREP = 2        # preparing the execute request
REMOTE_JOB_PROGRESS_EXECUTION = 5       # submitting the execute request
REMOTE_JOB_PROGRESS_MONITORING = 10     # start of status polling
REMOTE_JOB_PROGRESS_FETCH_OUT = 90      # retrieving the job outputs
REMOTE_JOB_PROGRESS_COMPLETED = 100     # execution completed
class Wps1Process(WpsProcessInterface):
    """
    Run a process on a remote WPS-1 provider and retrieve its outputs locally.

    The execution is submitted in asynchronous mode, the remote status document
    is polled until the job completes, and each expected output is then
    downloaded into the requested output directory.
    """

    def __init__(self,
                 provider,          # type: str
                 process,           # type: str
                 request,           # type: WPSRequest
                 update_status,     # type: UpdateStatusPartialFunction
                 ):
        """
        :param provider: reference of the remote WPS-1 provider (passed to ``get_wps_client``).
        :param process: identifier of the process to describe and execute on the provider.
        :param request: request forwarded to the parent :class:`WpsProcessInterface`.
        :param update_status: callable invoked as ``update_status(provider, message, progress, status)``.
        """
        super(Wps1Process, self).__init__(request)
        self.provider = provider
        self.process = process
        # Wrap the callback so that callers within this class only provide
        # (message, progress, status); the provider is always prepended.
        self.update_status = lambda _message, _progress, _status: update_status(
            self.provider, _message, _progress, _status)

    def execute(self, workflow_inputs, out_dir, expected_outputs):
        """
        Execute the process on the remote provider and write its outputs under ``out_dir``.

        :param workflow_inputs:
            Mapping of input identifier to value. A value equal to ``"null"`` is skipped so that the remote
            process employs its own default. A list value is submitted as repeated ``(id, value)`` pairs.
            A dictionary value must provide ``"location"`` and can provide ``"format"``.
        :param out_dir: directory in which the retrieved output files are written.
        :param expected_outputs: mapping of output identifier to the destination file name within ``out_dir``.
        :raises Exception:
            On any failure. The original error is logged and re-raised as a generic exception whose message is
            formatted as ``<module>.<class>: <message>``.
        """
        self.update_status("Preparing execute request for remote WPS1 provider.",
                           REMOTE_JOB_PROGRESS_REQ_PREP, status.STATUS_RUNNING)
        LOGGER.debug("Execute process WPS request for %s", self.process)
        try:
            try:
                wps = get_wps_client(self.provider, headers=self.cookies)
                raise_on_xml_exception(wps._capabilities)  # noqa: W0212
            except Exception as ex:
                raise OWSNoApplicableCode("Failed to retrieve WPS capabilities. Error: [{}].".format(str(ex)))
            try:
                process = wps.describeprocess(self.process)
            except Exception as ex:
                raise OWSNoApplicableCode("Failed to retrieve WPS process description. Error: [{}].".format(str(ex)))

            # prepare inputs
            # identifiers of the inputs that the remote process describes as complex data
            complex_inputs = {
                process_input.identifier for process_input in process.dataInputs
                if WPS_COMPLEX_DATA in process_input.dataType
            }

            # remove any 'null' input, should employ the 'default' of the remote WPS process
            inputs_provided_keys = [key for key in workflow_inputs if workflow_inputs[key] != "null"]

            wps_inputs = []
            for input_key in inputs_provided_keys:
                input_val = workflow_inputs[input_key]
                # in case of array inputs, must repeat (id,value)
                # in case of complex input (File), obtain location, otherwise get data value
                if not isinstance(input_val, list):
                    input_val = [input_val]

                input_values = []
                for val in input_val:
                    mime_type = None
                    encoding = None
                    if isinstance(val, dict):
                        fmt = val.get("format")  # format as namespace:link
                        val = val["location"]
                        if fmt:
                            # Resolve the format of the *current* item. Indexing 'workflow_inputs[input_key]'
                            # with "format" fails with TypeError when the input is an array of files.
                            fmt = get_format(fmt)  # format as content-type
                            mime_type = fmt.mime_type or None
                            encoding = fmt.encoding or None  # avoid empty string

                    # owslib only accepts strings, not numbers directly
                    if isinstance(val, (int, float)):
                        val = str(val)

                    if val.startswith("file://"):
                        # we need to host file starting with file:// scheme
                        val = self.host_file(val)

                    input_values.append((val, mime_type, encoding))

                # need to use ComplexDataInput structure for complex input
                # TODO: BoundingBox not supported
                for input_value, mime_type, encoding in input_values:
                    if input_key in complex_inputs:
                        input_value = ComplexDataInput(input_value, mimeType=mime_type, encoding=encoding)
                    wps_inputs.append((input_key, input_value))

            # prepare outputs
            # each item is (identifier, as-reference flag), only for the outputs that are expected
            outputs_as_ref = [
                (o.identifier, o.dataType == WPS_COMPLEX_DATA) for o in process.processOutputs
                if o.identifier in expected_outputs
            ]

            self.update_status("Executing job on remote WPS1 provider.",
                               REMOTE_JOB_PROGRESS_EXECUTION, status.STATUS_RUNNING)

            mode = EXECUTE_MODE_ASYNC
            execution = wps.execute(self.process, inputs=wps_inputs, output=outputs_as_ref, mode=mode, lineage=True)
            if not execution.process and execution.errors:
                raise execution.errors[0]

            self.update_status("Monitoring job on remote WPS1 provider : [{0}]".format(self.provider),
                               REMOTE_JOB_PROGRESS_MONITORING, status.STATUS_RUNNING)

            max_retries = 5
            num_retries = 0
            run_step = 0
            job_id = "<undefined>"
            # 'run_step == 0' forces at least one successful status check, even if
            # the execution is already reported as complete.
            while execution.isNotComplete() or run_step == 0:
                if num_retries >= max_retries:
                    raise Exception("Could not read status document after {} retries. Giving up.".format(max_retries))
                try:
                    execution = check_wps_status(location=execution.statusLocation,
                                                 sleep_secs=wait_secs(run_step), settings=self.settings)
                    job_id = execution.statusLocation.replace(".xml", "").split("/")[-1]
                    LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()),
                                                     execution.percentCompleted, execution.statusMessage,
                                                     execution.statusLocation))
                    self.update_status(get_job_log_msg(status=status.map_status(execution.getStatus()),
                                                       message=execution.statusMessage,
                                                       progress=execution.percentCompleted,
                                                       duration=None),  # get if available
                                       map_progress(execution.percentCompleted,
                                                    REMOTE_JOB_PROGRESS_MONITORING, REMOTE_JOB_PROGRESS_FETCH_OUT),
                                       status.STATUS_RUNNING)
                except Exception as exc:
                    # only consecutive failures count toward 'max_retries' (reset on success below)
                    num_retries += 1
                    LOGGER.debug("Exception raised: %r", exc)
                    sleep(1)
                else:
                    num_retries = 0
                    run_step += 1

            # NOTE: 'isSucceded' (sic) is the method name as called on the execution object
            if not execution.isSucceded():
                exec_msg = execution.statusMessage or "Job failed."
                LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()),
                                                 execution.percentCompleted, exec_msg, execution.statusLocation))
                raise Exception(exec_msg)

            self.update_status("Fetching job outputs from remote WPS1 provider.",
                               REMOTE_JOB_PROGRESS_FETCH_OUT, status.STATUS_RUNNING)

            results = [ows2json_output_data(output, process, self.settings) for output in execution.processOutputs]
            for result in results:
                result_id = get_any_id(result)
                result_val = get_any_value(result)
                # TODO Should we handle other type than File reference?
                if result_id in expected_outputs:
                    # This is where cwl expect the output file to be written
                    # TODO We will probably need to handle multiple output value...
                    dst_fn = "/".join([out_dir.rstrip("/"), expected_outputs[result_id]])

                    # in case of ".*" glob pattern, replace specified extension with real value
                    # Only the last path segment is inspected, otherwise a dot located elsewhere in the
                    # reference (e.g.: the host name of an URL) would be mistaken for a file extension.
                    result_name = result_val.rsplit("/", 1)[-1]
                    if "." in result_name:
                        result_ext = "." + result_name.rsplit(".", 1)[-1]
                    else:
                        result_fmt = get_field(result, "mime_type",
                                               search_variations=True,
                                               default=CONTENT_TYPE_TEXT_PLAIN)
                        result_ext = get_extension(result_fmt)
                    dst_fn = "{}{}".format(dst_fn.rsplit(".", 1)[0], result_ext)

                    resp = request_extra("get", result_val, allow_redirects=True, settings=self.settings)
                    LOGGER.debug("Fetching result output from [%s] to cwl output destination: [%s]", result_val, dst_fn)
                    with open(dst_fn, mode="wb") as dst_fh:
                        dst_fh.write(resp.content)

        except Exception as exc:
            # log the full traceback, then report the error with its fully qualified exception class
            exception_class = "{}.{}".format(type(exc).__module__, type(exc).__name__)
            errors = "{0}: {1!s}".format(exception_class, exc)
            LOGGER.exception(exc)
            raise Exception(errors)

        self.update_status("Execution on remote WPS1 provider completed.",
                           REMOTE_JOB_PROGRESS_COMPLETED, status.STATUS_SUCCEEDED)