Source code for weaver.processes.wps_process_base
from abc import abstractmethod
from time import sleep
from typing import TYPE_CHECKING
import requests
from pyramid.httpexceptions import HTTPBadGateway
from pyramid.settings import asbool
from pyramid_celery import celery_app as app
from weaver.formats import CONTENT_TYPE_APP_JSON
from weaver.utils import get_cookie_headers, get_settings
from weaver.wps import get_wps_output_dir, get_wps_output_url
if TYPE_CHECKING:
from weaver.typedefs import CWL # noqa: F401
from typing import AnyStr, Dict # noqa: F401
from pywps.app import WPSRequest # noqa: F401
[docs]class WpsProcessInterface(object):
"""
Common interface for WpsProcess to be used is cwl jobs
"""
@abstractmethod
[docs] def execute(self,
workflow_inputs, # type: CWL
out_dir, # type: AnyStr
expected_outputs, # type: Dict[AnyStr, AnyStr]
):
"""
Execute a remote process using the given inputs.
The function is expected to monitor the process and update the status.
Retrieve the expected outputs and store them in the ``out_dir``.
:param workflow_inputs: `CWL` job dict
:param out_dir: directory where the outputs must be written
:param expected_outputs: expected value outputs as `{'id': 'value'}`
"""
raise NotImplementedError
def __init__(self, request):
# type: (WPSRequest) -> None
self.request = request
self.cookies = get_cookie_headers(self.request.http_request.headers)
self.headers = {"Accept": CONTENT_TYPE_APP_JSON, "Content-Type": CONTENT_TYPE_APP_JSON}
self.settings = get_settings(app)
self.verify = asbool(self.settings.get("weaver.ows_proxy_ssl_verify", True))
[docs] def make_request(self, method, url, retry, status_code_mock=None, **kwargs):
response = requests.request(method,
url=url,
headers=self.headers,
cookies=self.cookies,
verify=self.verify,
**kwargs)
# TODO: Remove patch for Geomatys unreliable server
if response.status_code == HTTPBadGateway.code and retry:
sleep(10)
response = self.make_request(method, url, False, **kwargs)
if response.status_code == HTTPBadGateway.code and status_code_mock:
response.status_code = status_code_mock
return response
@staticmethod
[docs] def host_file(file_name):
settings = get_settings(app)
weaver_output_url = get_wps_output_url(settings)
weaver_output_dir = get_wps_output_dir(settings)
file_name = file_name.replace("file://", "")
if not file_name.startswith(weaver_output_dir):
raise Exception("Cannot host files outside of the output path : {0}".format(file_name))
return file_name.replace(weaver_output_dir, weaver_output_url)