import logging
from typing import TYPE_CHECKING
import colander
from celery.exceptions import TimeoutError as CeleryTaskTimeoutError
from pyramid.httpexceptions import HTTPAccepted, HTTPCreated, HTTPOk, HTTPPaymentRequired, HTTPUnprocessableEntity
from pyramid.settings import asbool
from weaver.config import WeaverFeature, get_weaver_configuration
from weaver.database import get_db
from weaver.datatype import Bill, Quote
from weaver.exceptions import log_unhandled_exceptions
from weaver.execute import ExecuteMode, parse_prefer_header_execute_mode
from weaver.formats import ContentType, OutputFormat
from weaver.owsexceptions import OWSInvalidParameterValue
from weaver.processes.execution import validate_process_io
from weaver.processes.types import ProcessType
from weaver.processes.utils import get_process
from weaver.quotation.estimation import (
execute_quote_estimator,
get_currency,
get_quote_estimator_config,
validate_quote_estimator_config
)
from weaver.sort import Sort
from weaver.store.base import StoreBills, StoreProcesses, StoreQuotes
from weaver.utils import as_int, get_header, get_settings
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.processes.processes import submit_local_job
from weaver.wps_restapi.quotation.utils import get_quote
if TYPE_CHECKING:
from typing import Type, Union
from pyramid.config import Configurator
from weaver.typedefs import AnySettingsContainer, AnyViewResponse, PyramidRequest
[docs]
LOGGER = logging.getLogger(__name__)
[docs]
def get_quote_response(quote, # type: Quote
http_class, # type: Union[Type[HTTPOk], Type[HTTPCreated]]
schema_class, # type: Type[colander.SchemaNode]
description, # type: str
settings, # type: AnySettingsContainer
): # type: (...) -> Union[HTTPOk, HTTPCreated]
"""
Validate the quote contents and generate the response with its relevant details.
"""
data = quote.json()
data.update({"description": description})
data.update({"links": quote.links(settings)})
data = schema_class().deserialize(data)
return http_class(json=data)
@sd.process_quotes_service.post(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
schema=sd.PostProcessQuoteRequestEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.post_quotes_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def request_quote(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Request a quotation for a process.
"""
settings = get_settings(request)
weaver_config = get_weaver_configuration(settings)
process = get_process(request=request, settings=settings)
if (
(process.type not in [ProcessType.APPLICATION, ProcessType.WORKFLOW]) or
(process.type == ProcessType.WORKFLOW and weaver_config not in WeaverFeature.REMOTE)
):
raise HTTPUnprocessableEntity(json={
"title": "UnsupportedOperation",
"detail": f"Unsupported quotation for process type '{process.type}' on '{weaver_config}' instance.",
"status": HTTPUnprocessableEntity.code,
"instance": process.href(settings)
})
if not get_quote_estimator_config(process, ignore_error=True):
raise HTTPUnprocessableEntity(json={
"title": "UnsupportedOperation",
"detail": (
f"Unsupported quotation for process '{process.id}'. "
"It does not provide an estimator configuration."
),
"status": HTTPUnprocessableEntity.code,
"instance": process.href(settings)
})
try:
process_params = sd.QuoteProcessParametersSchema().deserialize(request.json)
except colander.Invalid as exc:
raise OWSInvalidParameterValue(json={
"title": "InvalidParameterValue",
"cause": f"Invalid schema: [{exc.msg!s}]",
"error": exc.__class__.__name__,
"value": exc.value
})
validate_process_io(process, process_params)
quote_store = get_db(request).get_store(StoreQuotes)
quote_user = request.authenticated_userid # FIXME: consider other methods to provide the user
quote_currency = get_currency(request)
quote_info = {
"process": process.id,
"processParameters": process_params,
"user": quote_user,
"currency": quote_currency, # requested currency until evaluated, overridden if exchange rate not resolvable
}
quote = Quote(**quote_info)
quote = quote_store.save_quote(quote)
quote_max_wait = settings.get("weaver.quotation_sync_max_wait", settings.get("weaver.quote_sync_max_wait"))
quote_max_wait = as_int(quote_max_wait, default=20)
mode, wait, applied = parse_prefer_header_execute_mode(request.headers, process.jobControlOptions, quote_max_wait)
result = execute_quote_estimator.delay(quote.id)
LOGGER.debug("Celery pending task [%s] for quote [%s].", result.id, quote.id)
if mode == ExecuteMode.SYNC and wait:
LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", wait)
try:
result.wait(timeout=wait)
except CeleryTaskTimeoutError:
pass
if result.ready():
quote = quote_store.fetch_by_id(quote.id)
resp = get_quote_response(quote, HTTPCreated, sd.QuoteSchema, sd.CreatedQuoteResponse.description, settings)
return resp
else:
LOGGER.debug("Celery task requested as sync took too long to complete (wait=%ss). Continue in async.", wait)
# sync not respected, therefore must drop it
# since both could be provided as alternative preferences, drop only async with limited subset
prefer = get_header("Preference-Applied", applied, pop=True)
_, _, async_applied = parse_prefer_header_execute_mode({"Prefer": prefer}, [ExecuteMode.ASYNC])
applied = async_applied
data = quote.partial()
data.update({"description": sd.AcceptedQuoteResponse.description})
headers = {"Location": quote.href(settings)}
headers.update(applied)
return HTTPAccepted(headers=headers, json=data)
@sd.process_estimator_service.get(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
schema=sd.ProcessQuoteEstimatorGetEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.get_process_quote_estimator_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def get_process_quote_estimator(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Get the process quote estimator configuration.
"""
process = get_process(request=request)
estimator_config = get_quote_estimator_config(process)
return HTTPOk(json=estimator_config)
@sd.process_estimator_service.put(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
content_type=ContentType.APP_JSON,
schema=sd.ProcessQuoteEstimatorPutEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.put_process_quote_estimator_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def update_process_quote_estimator(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Replace the process quote estimator configuration.
"""
estimator_config = validate_quote_estimator_config(request.json)
store = get_db(request).get_store(StoreProcesses)
process = get_process(request=request, store=store)
store.set_estimator(process, estimator_config)
return HTTPOk(json={"description": "Process quote estimator updated."})
@sd.process_estimator_service.delete(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
schema=sd.ProcessQuoteEstimatorDeleteEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.delete_process_quote_estimator_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def delete_process_quote_estimator(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Reset the process quote estimator configuration to the default values.
"""
store = get_db(request).get_store(StoreProcesses)
process = get_process(request=request, store=store)
store.set_estimator(process, {})
return HTTPOk(json={"description": "Process quote estimator deleted. Defaults will be used."})
@sd.process_quotes_service.get(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
schema=sd.ProcessQuotesEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.get_quote_list_responses,
)
@sd.quotes_service.get(
tags=[sd.TAG_BILL_QUOTE],
schema=sd.QuotesEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.get_quote_list_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def get_quote_list(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Get list of quotes IDs.
"""
page = int(request.params.get("page", "0"))
limit = int(request.params.get("limit", "10"))
detail = asbool(request.params.get("detail", False))
filters = {
"process_id": request.params.get("process", None) or request.matchdict.get("process_id", None),
"page": page,
"limit": limit,
"sort": request.params.get("sort", Sort.CREATED),
}
store = get_db(request).get_store(StoreQuotes)
items, total = store.find_quotes(**filters)
quotes = [quote.partial() if detail else quote.id for quote in items]
data = {
"page": page,
"limit": limit,
"count": len(quotes),
"total": total,
"quotations": quotes
}
body = sd.QuotationListSchema().deserialize(data)
return HTTPOk(json=body)
@sd.process_quote_service.get(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_PROCESSES],
schema=sd.ProcessQuoteEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.get_quote_responses,
)
@sd.quote_service.get(
tags=[sd.TAG_BILL_QUOTE],
schema=sd.QuoteEndpoint(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.get_quote_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def get_quote_info(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Get quote information.
"""
quote = get_quote(request)
resp = get_quote_response(quote, HTTPOk, sd.QuoteSchema, sd.OkGetQuoteInfoResponse.description, request)
return resp
@sd.process_quote_service.post(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_EXECUTE, sd.TAG_PROCESSES],
schema=sd.PostProcessQuote(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.post_quote_responses,
)
@sd.quote_service.post(
tags=[sd.TAG_BILL_QUOTE, sd.TAG_EXECUTE],
content_type=ContentType.APP_JSON,
schema=sd.PostQuote(),
accept=ContentType.APP_JSON,
renderer=OutputFormat.JSON,
response_schemas=sd.post_quote_responses,
)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]
def execute_quote(request):
# type: (PyramidRequest) -> AnyViewResponse
"""
Execute a quoted process.
"""
quote = get_quote(request)
if not quote.paid:
raise HTTPPaymentRequired(json={
"title": "Payment Required",
"detail": sd.QuotePaymentRequiredResponse.description,
"status": HTTPPaymentRequired.code,
"value": str(quote.id),
})
quote_bill_info = {
"quote": quote.id,
"price": quote.price,
}
job_resp = submit_local_job(request)
job_json = job_resp.json
job_id = job_json.get("jobID")
user_id = str(request.authenticated_userid)
store = get_db(request).get_store(StoreBills)
bill = store.save_bill(Bill(user=user_id, job=job_id, **quote_bill_info))
job_json.update({
"billID": bill.id,
"quoteID": quote.id,
})
data = sd.CreatedQuoteExecuteResponse().deserialize(job_json)
return HTTPCreated(json=data)
[docs]
def includeme(config):
# type: (Configurator) -> None
LOGGER.info("Adding WPS REST API quote views...")
config.add_cornice_service(sd.process_estimator_service)
config.add_cornice_service(sd.process_quotes_service)
config.add_cornice_service(sd.process_quote_service)
config.add_cornice_service(sd.quotes_service)
config.add_cornice_service(sd.quote_service)