Source code for weaver.wps_restapi.providers.providers

import logging
import warnings
from typing import TYPE_CHECKING

from pyramid.httpexceptions import HTTPCreated, HTTPNoContent, HTTPNotFound, HTTPOk

from weaver.database import get_db
from weaver.datatype import Service
from weaver.exceptions import ServiceNotFound, log_unhandled_exceptions
from weaver.formats import OUTPUT_FORMAT_JSON
from weaver.owsexceptions import OWSMissingParameterValue, OWSNotImplemented
from weaver.processes.types import PROCESS_WPS_REMOTE
from weaver.store.base import StoreServices
from weaver.utils import get_any_id, get_settings, request_extra
from weaver.warning import NonBreakingExceptionWarning
from weaver.wps.utils import get_wps_client
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

[docs]LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING: from typing import List from pyramid.request import Request
[docs]def get_provider_services(request): # type: (Request) -> List[Service] """ Obtain the list of remote provider services. """ settings = get_settings(request) store = get_db(settings).get_store(StoreServices) providers = [] for service in store.list_services(): # pre-check service location # status can be 500 because of missing query params, but faster skip of invalid references # this avoids long pending connexions that never resolve because of down server try: resp = request_extra("head", service.url, timeout=1, settings=settings) if resp.status_code == 404: LOGGER.warning("Skipping unresponsive service (%s) [%s]", service.name, service.url) continue except Exception as exc: msg = "Exception occurred while fetching wps {0} : {1!r}".format(service.url, exc) warnings.warn(msg, NonBreakingExceptionWarning) else: providers.append(service) return providers
@sd.providers_service.get(tags=[sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON, schema=sd.GetProviders(), response_schemas=sd.get_providers_list_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_providers(request): """ Lists registered providers. """ reachable_services = get_provider_services(request) providers = [] for service in reachable_services: summary = service.summary(request) if summary: providers.append(summary) return HTTPOk(json={"providers": providers})
[docs]def get_capabilities(service, request): """ GetCapabilities of a wps provider. """ wps = get_wps_client(service.url, request) url = get_wps_restapi_base_url(request) return { "id": service.name, "title": wps.identification.title, "abstract": wps.identification.abstract, "url": "{base_url}/providers/{provider_id}".format(base_url=url, provider_id=service.name), "processes": "{base_url}/providers/{provider_id}/processes".format(base_url=url, provider_id=service.name), "type": PROCESS_WPS_REMOTE, "contact": wps.provider.contact.name,
}
[docs]def get_service(request): """ Get the request service using provider_id from the service store. """ store = get_db(request).get_store(StoreServices) provider_id = request.matchdict.get("provider_id") try: service = store.fetch_by_name(provider_id) except ServiceNotFound: raise HTTPNotFound("Provider {0} cannot be found.".format(provider_id)) return service, store
@sd.providers_service.post(tags=[sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON, schema=sd.PostProvider(), response_schemas=sd.post_provider_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def add_provider(request): """ Add a provider. """ store = get_db(request).get_store(StoreServices) try: new_service = Service(url=request.json["url"], name=get_any_id(request.json)) except KeyError as exc: raise OWSMissingParameterValue("Missing json parameter '{!s}'.".format(exc), value=exc) if "public" in request.json: new_service["public"] = request.json["public"] if "auth" in request.json: new_service["auth"] = request.json["auth"] try: store.save_service(new_service) except NotImplementedError: raise OWSNotImplemented(sd.NotImplementedPostProviderResponse.description, value=new_service) return HTTPCreated(json=get_capabilities(new_service, request))
@sd.provider_service.delete(tags=[sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderEndpoint(), response_schemas=sd.delete_provider_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def remove_provider(request): """ Remove a provider. """ service, store = get_service(request) try: store.delete_service(service.name) except NotImplementedError: raise OWSNotImplemented(sd.NotImplementedDeleteProviderResponse.description) return HTTPNoContent(json={})
@sd.provider_service.get(tags=[sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderEndpoint(), response_schemas=sd.get_provider_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_provider(request): """ Get a provider definition (GetCapabilities). """ service, _ = get_service(request) return HTTPOk(json=get_capabilities(service, request))