Source code for weaver.wps.service

import logging
import os
from configparser import ConfigParser
from typing import TYPE_CHECKING

from owslib.wps import WPSExecution
from pyramid.httpexceptions import HTTPBadRequest, HTTPSeeOther
from pyramid_celery import celery_app as app
from import Process as ProcessWPS, WPSRequest
from import Service as ServiceWPS
from import StorageAbstract
from pywps.response import WPSResponse
from pywps.response.execute import ExecuteResponse

from weaver.database import get_db
from weaver.datatype import Process
from weaver.exceptions import handle_known_exceptions
from weaver.formats import CONTENT_TYPE_APP_JSON
from weaver.owsexceptions import OWSNoApplicableCode
from weaver.processes.convert import wps2json_job_payload
from weaver.processes.execution import submit_job_handler
from weaver.processes.types import PROCESS_WORKFLOW
from weaver.processes.utils import get_job_submission_response, get_process
from import StoreProcesses
from weaver.utils import get_header, get_settings, get_weaver_url
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.wps.utils import check_wps_status, get_wps_local_status_location, load_pywps_config
from weaver.wps_restapi import swagger_definitions as sd

# logger shared by every definition of this module
LOGGER = logging.getLogger(__name__)

if TYPE_CHECKING:
    # names below are only referenced by '# type:' comments, they are never needed at runtime
    from typing import (
        Any,
        Dict,
        List,
        Optional,
        Union,
    )

    from weaver.processes.convert import (
        WPS_Input_Type,
        WPS_Output_Type,
    )
    from weaver.typedefs import (
        HTTPValid,
        JSON,
        SettingsType,
    )
class ReferenceStatusLocationStorage(StorageAbstract):
    """
    Minimal storage that only points at a status location that was defined beforehand.

    Nothing is ever stored or written through this class. It only reports back the URL it received
    and the matching location resolved by :func:`get_wps_local_status_location`.
    """
    # pylint: disable=W0222  # ignore mismatch signature of method params not employed

    def __init__(self, url_location, settings):
        # type: (str, SettingsType) -> None
        self._url = url_location
        # resolved without requiring existence, since the location might not exist yet
        # based on worker execution timing
        self._file = get_wps_local_status_location(url_location, settings, must_exist=False)

    def url(self, *_, **__):
        """
        Provides the status location URL received at creation, ignoring any argument.
        """
        return self._url

    def location(self, *_, **__):
        """
        Provides the location resolved from the status URL at creation, ignoring any argument.
        """
        return self._file

    def store(self, *_, **__):
        """
        Does nothing, whichever arguments are provided.
        """
        return None

    def write(self, *_, **__):
        """
        Does nothing, whichever arguments are provided.
        """
        return None
class WorkerExecuteResponse(ExecuteResponse):
    """
    Generates the XML response using a predefined job status URL and the definition of the executed process.
    """
    # pylint: disable=W0231,W0233  # FIXME: tmp until patched

    def __init__(self, wps_request, uuid, process, job_url, settings, *_, **__):
        # type: (WPSRequest, str, ProcessWPS, str, SettingsType, Any, Any) -> None

        # FIXME:
        #   temporary patch, reproduce what 'ExecuteResponse.__init__' does while bypassing the problematic super() call
        WPSResponse.__init__(self, wps_request, uuid)  # pylint: disable=W0231,W0233  # tmp until patched
        self.process = process
        outputs_by_id = {}
        for output in self.process.outputs:
            outputs_by_id[output.identifier] = output
        self.outputs = outputs_by_id
        #   the following call should be used instead, but it causes infinite recursion until above fix is applied
        #   super(WorkerExecuteResponse, self).__init__(wps_request, job_id, process=wps_process)
        # --- end of patch ---

        # extra setup
        status_store = ReferenceStatusLocationStorage(job_url, settings)
        self.process._status_store = status_store
        # enforce storage to provide the status location URL
        self.store_status_file = True
        # make sure doc gets generated by disabling alternate raw data mode
        self.wps_request.raw = None
        # generate 'doc' property with XML content for response
        self._update_status_doc()
class WorkerService(ServiceWPS):
    """
    Dispatches PyWPS requests from WPS-1/2 XML endpoint to WPS-REST as appropriate.

    .. note::
        For every WPS-Request type, the parsing of XML content is already handled by the PyWPS service for GET/POST.
        All data must be retrieved from parsed :class:`WPSRequest` to avoid managing argument location and WPS versions.

    When ``GetCapabilities`` or ``DescribeProcess`` requests are received, directly return the result as XML based
    on content (no need to subprocess as Celery task that gets resolved quickly with only the process(es) details).
    When JSON content is requested, instead return the redirect link to corresponding WPS-REST API endpoint.

    When receiving ``Execute`` request, convert the XML payload to corresponding JSON and
    dispatch it to the Celery Worker to actually process it after job setup for monitoring.
    """

    def __init__(self, *_, is_worker=False, settings=None, **__):
        """
        Initializes the parent service with every other argument, and keeps worker flag and settings.

        :param is_worker: flag stored as is on the instance.
        :param settings: application settings, retrieved from the Celery application when omitted or empty.
        """
        super(WorkerService, self).__init__(*_, **__)
        self.is_worker = is_worker
        self.settings = settings or get_settings(app)
        # remote provider processes temporarily registered for the duration of their execution
        self.dispatched_processes = {}  # type: Dict[str, Process]

    @handle_known_exceptions
    def _get_capabilities_redirect(self, wps_request, *_, **__):
        # type: (WPSRequest, Any, Any) -> Optional[Union[WPSResponse, HTTPValid]]
        """
        Redirects to WPS-REST processes endpoint if the requested ``Accept`` header is JSON.

        :returns: the redirect response when JSON is requested, ``None`` otherwise.
        """
        req = wps_request.http_request
        accept_type = get_header("Accept", req.headers)
        if accept_type == CONTENT_TYPE_APP_JSON:
            url = get_weaver_url(self.settings)
            resp = HTTPSeeOther(location="{}{}".format(url, sd.processes_service.path))  # redirect
            setattr(resp, "_update_status", lambda *_, **__: None)  # patch to avoid pywps server raising
            return resp
        return None

    def get_capabilities(self, wps_request, *_, **__):
        # type: (WPSRequest, Any, Any) -> Union[WPSResponse, HTTPValid]
        """
        Redirect to WPS-REST endpoint if the requested ``Accept`` header is JSON
        or handle ``GetCapabilities`` normally.
        """
        resp = self._get_capabilities_redirect(wps_request, *_, **__)
        return resp or super(WorkerService, self).get_capabilities(wps_request, *_, **__)

    def _describe_process_redirect(self, wps_request, *_, **__):
        # type: (WPSRequest, Any, Any) -> Optional[Union[WPSResponse, HTTPValid]]
        """
        Redirects to WPS-REST process endpoint if the requested ``Accept`` header is JSON.

        :returns: the redirect response when JSON is requested, ``None`` otherwise.
        :raises HTTPBadRequest: if JSON is requested with no process identifier, or with more than one.
        """
        req = wps_request.http_request
        accept_type = get_header("Accept", req.headers)
        if accept_type == CONTENT_TYPE_APP_JSON:
            url = get_weaver_url(self.settings)
            proc = wps_request.identifiers
            if not proc:
                raise HTTPBadRequest(sd.BadRequestGetProcessInfoResponse.description)
            if len(proc) > 1:
                raise HTTPBadRequest("Unsupported multi-process ID for description. Only provide one.")
            path = sd.process_service.path.format(process_id=proc[0])
            resp = HTTPSeeOther(location="{}{}".format(url, path))  # redirect
            setattr(resp, "_update_status", lambda *_, **__: None)  # patch to avoid pywps server raising
            return resp
        return None

    def describe(self, wps_request, *_, **__):
        # type: (WPSRequest, Any, Any) -> Union[WPSResponse, HTTPValid]
        """
        Redirect to WPS-REST endpoint if the requested ``Accept`` header is JSON
        or handle ``DescribeProcess`` normally.
        """
        resp = self._describe_process_redirect(wps_request, *_, **__)
        return resp or super(WorkerService, self).describe(wps_request, *_, **__)

    def _submit_job(self, wps_request):
        # type: (WPSRequest) -> Union[WPSResponse, HTTPValid, JSON]
        """
        Dispatch operation to WPS-REST endpoint, which in turn should call back the real Celery Worker for execution.

        :returns:
            the job submission response when the ``Accept`` header is JSON,
            otherwise the body returned by the job submission handler.
        """
        req = wps_request.http_request
        pid = wps_request.identifier
        proc = get_process(process_id=pid, settings=self.settings)  # raises if invalid or missing
        wps_process = self.processes.get(pid)

        # create the JSON payload from the XML content and submit job
        is_workflow = proc.type == PROCESS_WORKFLOW
        # drop empty entries, since splitting a missing or empty 'tags' parameter yields [""]
        tags = [tag for tag in req.args.get("tags", "").split(",") if tag]
        tags += ["xml", "wps-{}".format(wps_request.version)]
        data = wps2json_job_payload(wps_request, wps_process)
        body = submit_job_handler(data, self.settings, proc.processEndpointWPS1,
                                  process_id=pid, is_local=True, is_workflow=is_workflow, visibility=VISIBILITY_PUBLIC,
                                  language=wps_request.language, tags=tags, auth=dict(req.headers))

        # if Accept was JSON, provide response content as is
        accept_type = get_header("Accept", req.headers)
        if accept_type == CONTENT_TYPE_APP_JSON:
            resp = get_job_submission_response(body)
            setattr(resp, "_update_status", lambda *_, **__: None)  # patch to avoid pywps server raising
            return resp
        return body

    def prepare_process_for_execution(self, identifier):
        # type: (str) -> ProcessWPS
        """
        Handles dispatched remote provider process preparation during execution request.

        When the identifier matches a dispatched process, it is registered in the service processes only for
        the duration of the parent preparation, and both temporary references are removed afterwards,
        whether the preparation succeeded or not. Any error is logged and raised again unchanged.
        """
        # remote provider processes to instantiate
        dispatch_process = self.dispatched_processes.get(identifier)
        if dispatch_process:
            LOGGER.debug("Preparing dispatched remote provider process definition for execution: [%s]", identifier)
            try:
                self.processes[identifier] = dispatch_process.wps()  # prepare operation looks within this mapping
                process_wps = super(WorkerService, self).prepare_process_for_execution(identifier)
            except Exception as exc:
                LOGGER.error("Error occurred during remote provider process creation for execution.", exc_info=exc)
                raise
            finally:
                # cleanup temporary references
                self.dispatched_processes.pop(identifier, None)
                self.processes.pop(identifier, None)
            return process_wps

        # local processes already loaded by the service
        return super(WorkerService, self).prepare_process_for_execution(identifier)

    def execute(self, identifier, wps_request, uuid):
        # type: (str, WPSRequest, str) -> Union[WPSResponse, HTTPValid]
        """
        Submit WPS request to corresponding WPS-REST endpoint and convert back for requested ``Accept`` content-type.

        Overrides the original execute operation, that instead will get handled by :meth:`execute_job` following
        callback from Celery Worker that handles process job creation and monitoring.

        If ``Accept`` is JSON, the result is directly returned from :meth:`_submit_job`.
        If ``Accept`` is XML or undefined, :class:`WorkerExecuteResponse` converts the received JSON with XML template.

        :raises OWSNoApplicableCode: if the XML response cannot be built from the job submission result.
        """
        result = self._submit_job(wps_request)
        if not isinstance(result, dict):  # JSON
            return result  # direct WPS response

        # otherwise, recreate the equivalent content with expected XML template format
        job_id = result["jobID"]
        job_url = result["location"]
        wps_process = self.processes.get(wps_request.identifier)

        # when called by the WSGI app, 'WorkerExecuteResponse.__call__' will generate the XML from 'doc' property,
        # which itself is generated by template substitution of data from above 'json' property
        try:
            return WorkerExecuteResponse(wps_request, job_id, wps_process, job_url, settings=self.settings)
        except Exception as ex:  # noqa
            LOGGER.exception("Error building XML response by PyWPS Service during WPS Execute result from worker.")
            message = "Failed building XML response from WPS Execute result. Error [{!r}]".format(ex)
            # chain explicitly so that the original error is reported as the direct cause
            raise OWSNoApplicableCode(message, locator=job_id) from ex

    def execute_job(self, process_id, wps_inputs, wps_outputs, mode, job_uuid, remote_process):
        # type: (str, List[WPS_Input_Type], List[WPS_Output_Type], str, str, Optional[Process]) -> WPSExecution
        """
        Real execution of the process by active Celery Worker.

        :param process_id: identifier of the process to execute.
        :param wps_inputs: inputs used to build the XML execute request.
        :param wps_outputs: outputs used to build the XML execute request.
        :param mode: execution mode forwarded to the XML execute request builder.
        :param job_uuid: job identifier, forwarded to the parent execute operation.
        :param remote_process: process of a remote provider to register temporarily, if any.
        :returns: execution status obtained from the status location, with the XML request attached to it.
        """
        execution = WPSExecution(version="2.0", url="localhost")
        xml_request = execution.buildRequest(process_id, wps_inputs, wps_outputs, mode=mode, lineage=True)
        wps_request = WPSRequest()
        wps_request.identifier = process_id
        wps_request.set_version("2.0.0")
        request_parser = wps_request._post_request_parser(wps_request.WPS.Execute().tag)  # noqa: W0212
        request_parser(xml_request)
        # NOTE:
        #  Setting 'status = false' will disable async execution of the PyWPS process,
        #  but this is needed since this job is running within Celery worker already async
        #  (daemon process can't have children processes)
        #  Because of how the code in PyWPS is made, we have to re-enable creation of status file
        wps_request.status = "false"

        # When 'execute' is called, pywps will in turn call 'prepare_process_for_execution',
        # which then setups and retrieves currently loaded 'local' processes.
        # Since only local processes were defined by 'get_pywps_service',
        # a temporary process must be added for remote providers execution.
        if not remote_process:
            worker_process_id = process_id
        else:
            worker_process_id = "wps_package-{}-{}".format(process_id, job_uuid)
            self.dispatched_processes[worker_process_id] = remote_process

        try:
            wps_response = super(WorkerService, self).execute(worker_process_id, wps_request, job_uuid)
        finally:
            # 'prepare_process_for_execution' removes this reference when it runs,
            # make sure nothing is left behind if the execution failed before reaching it
            self.dispatched_processes.pop(worker_process_id, None)
        wps_response.store_status_file = True
        # update execution status with actual status file and apply required references
        execution = check_wps_status(location=wps_response.process.status_location, settings=self.settings)
        execution.request = xml_request
        return execution
def get_pywps_service(environ=None, is_worker=False):
    """
    Generates the PyWPS Service that provides WPS-1/2 XML endpoint.

    The ``PYWPS_CFG`` configuration is looked up in ``environ``, then in the application settings,
    and finally in the environment variables of the process. The first non-empty value is used.

    :param environ: optional mapping that can provide ``PYWPS_CFG``.
    :param is_worker: flag forwarded as is to :class:`WorkerService`.
    :returns: service created with the processes listed by the store with public visibility.
    :raises OWSNoApplicableCode: if any step of the service or processes setup fails.
    """
    environ = environ or {}
    try:
        # get config file
        settings = get_settings(app)
        pywps_cfg = environ.get("PYWPS_CFG") or settings.get("PYWPS_CFG") or os.getenv("PYWPS_CFG")
        if not isinstance(pywps_cfg, ConfigParser) or not settings.get("weaver.wps_configured"):
            load_pywps_config(app, config=pywps_cfg)

        # call pywps application with processes filtered according to the adapter's definition
        process_store = get_db(app).get_store(StoreProcesses)  # type: StoreProcesses
        processes_wps = [process.wps() for process in process_store.list_processes(visibility=VISIBILITY_PUBLIC)]
        service = WorkerService(processes_wps, is_worker=is_worker, settings=settings)
    except Exception as ex:
        LOGGER.exception("Error occurred during PyWPS Service and/or Processes setup.")
        # chain explicitly so that the original error is reported as the direct cause
        message = "Failed setup of PyWPS Service and/or Processes. Error [{!r}]".format(ex)
        raise OWSNoApplicableCode(message) from ex
    return service