from typing import TYPE_CHECKING
from celery.utils.log import get_task_logger
from colander import Invalid
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPNotFound,
HTTPOk,
HTTPPermanentRedirect,
HTTPUnauthorized,
HTTPUnprocessableEntity
)
from pyramid.request import Request
from pyramid.settings import asbool
from pyramid_celery import celery_app as app
from notify import encrypt_email
from weaver import status
from weaver.database import get_db
from weaver.datatype import Job
from weaver.exceptions import (
InvalidIdentifierValue,
JobNotFound,
ProcessNotAccessible,
ProcessNotFound,
ServiceNotAccessible,
ServiceNotFound,
log_unhandled_exceptions
)
from weaver.formats import CONTENT_TYPE_TEXT_PLAIN, OUTPUT_FORMAT_JSON, get_format
from weaver.owsexceptions import OWSNotFound
from weaver.processes.convert import any2wps_literal_datatype
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
from weaver.utils import get_any_id, get_any_value, get_settings
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.wps.utils import get_wps_output_url
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.providers.utils import forbid_local_only
from weaver.wps_restapi.swagger_definitions import datetime_interval_parser
if TYPE_CHECKING:
from typing import List, Optional, Tuple, Union
from pyramid.httpexceptions import HTTPException
from weaver.typedefs import AnySettingsContainer, JSON
[docs]LOGGER = get_task_logger(__name__)
[docs]def get_job(request):
# type: (Request) -> Job
"""
Obtain a job from request parameters.
:returns: Job information if found.
:raise HTTPNotFound: with JSON body details on missing/non-matching job, process, provider IDs.
"""
job_id = request.matchdict.get("job_id")
store = get_db(request).get_store(StoreJobs)
try:
job = store.fetch_by_id(job_id)
except JobNotFound:
raise OWSNotFound(code="NoSuchJob", locator="JobID", description="Could not find job with specified 'job_id'.")
provider_id = request.matchdict.get("provider_id", job.service)
process_id = request.matchdict.get("process_id", job.process)
if provider_id:
forbid_local_only(request)
if job.service != provider_id:
raise OWSNotFound(
code="NoSuchProvider",
locator="provider",
description="Could not find job corresponding to specified 'provider_id'."
)
if job.process != process_id:
raise OWSNotFound(
code="NoSuchProcess",
locator="process",
description="Could not find job corresponding to specified 'process_id'."
)
return job
[docs]def get_results(job, container, value_key=None, ogc_api=False):
# type: (Job, AnySettingsContainer, Optional[str], bool) -> Union[List[JSON], JSON]
"""
Obtains the job results with extended full WPS output URL as applicable and according to configuration settings.
:param job: job from which to retrieve results.
:param container: any container giving access to instance settings (to resolve reference output location).
:param value_key:
If not specified, the returned values will have the appropriate ``data``/``href`` key according to the content.
Otherwise, all values will have the specified key.
:param ogc_api:
If ``True``, formats the results using the ``OGC-API - Processes`` format.
:returns: list of all outputs each with minimally an ID and value under the requested key.
"""
wps_url = get_wps_output_url(container)
if not wps_url.endswith("/"):
wps_url = wps_url + "/"
outputs = {} if ogc_api else []
fmt_key = "mediaType" if ogc_api else "mimeType"
for result in job.results:
rtype = "data" if any(k in result for k in ["data", "value"]) else "href"
value = get_any_value(result)
out_id = get_any_id(result)
out_key = rtype
if rtype == "href":
# fix paths relative to instance endpoint, but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.)
if value.startswith("/"):
value = str(value).lstrip("/")
if "://" not in value:
value = wps_url + value
elif ogc_api:
out_key = "value"
elif value_key:
out_key = value_key
output = {out_key: value}
if rtype == "href": # required for the rest to be there, other fields optional
if "mimeType" not in result:
result["mimeType"] = get_format(value, default=CONTENT_TYPE_TEXT_PLAIN).mime_type
output["format"] = {fmt_key: result["mimeType"]}
for field in ["encoding", "schema"]:
if field in result:
output["format"][field] = result[field]
elif rtype != "href":
# literal data
# FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51)
dtype = result.get("dataType", any2wps_literal_datatype(value, is_value=True) or "string")
if ogc_api:
output["dataType"] = {"name": dtype}
else:
output["dataType"] = dtype
if ogc_api:
if out_id in outputs:
output_list = outputs[out_id]
if not isinstance(output_list, list):
output_list = [output_list]
output_list.append(output)
outputs[out_id] = output_list
else:
outputs[out_id] = output
else:
# if ordered insert supported by python version, insert ID first
output = dict([("id", out_id)] + list(output.items())) # noqa
outputs.append(output)
return outputs
[docs]def validate_service_process(request):
# type: (Request) -> Tuple[Optional[str], Optional[str]]
"""
Verifies that service or process specified by path or query will raise the appropriate error if applicable.
"""
service_name = request.matchdict.get("provider_id", None) or request.params.get("service", None)
process_name = request.matchdict.get("process_id", None) or request.params.get("process", None)
item_test = None
item_type = None
try:
service = None
if service_name:
forbid_local_only(request)
item_type = "Service"
item_test = service_name
store = get_db(request).get_store(StoreServices)
service = store.fetch_by_name(service_name, visibility=VISIBILITY_PUBLIC)
if process_name:
item_type = "Process"
item_test = process_name
# local process
if not service:
store = get_db(request).get_store(StoreProcesses)
store.fetch_by_id(process_name, visibility=VISIBILITY_PUBLIC)
# remote process
else:
processes = service.processes(request)
if process_name not in [p.id for p in processes]:
raise ProcessNotFound
except (ServiceNotFound, ProcessNotFound):
raise HTTPNotFound(json={
"code": "NoSuch{}".format(item_type),
"description": "{} of id '{}' cannot be found.".format(item_type, item_test)
})
except (ServiceNotAccessible, ProcessNotAccessible):
raise HTTPUnauthorized(json={
"code": "Unauthorized{}".format(item_type),
"description": "{} of id '{}' is not accessible.".format(item_type, item_test)
})
except InvalidIdentifierValue as ex:
raise HTTPBadRequest(json={
"code": InvalidIdentifierValue.__name__,
"description": str(ex)
})
return service_name, process_name
@sd.provider_jobs_service.get(tags=[sd.TAG_JOBS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.GetProviderJobsEndpoint(), response_schemas=sd.get_prov_all_jobs_responses)
@sd.process_jobs_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.GetProcessJobsEndpoint(), response_schemas=sd.get_all_jobs_responses)
@sd.jobs_service.get(tags=[sd.TAG_JOBS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.GetJobsEndpoint(), response_schemas=sd.get_all_jobs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_queried_jobs(request):
"""
Retrieve the list of jobs which can be filtered, sorted, paged and categorized using query parameters.
"""
settings = get_settings(request)
service, process = validate_service_process(request)
if service:
forbid_local_only(settings)
filters = {**request.params, "process": process, "provider": service}
filters["detail"] = asbool(request.params.get("detail"))
if request.params.get("datetime", False):
# replace white space with '+' since request.params replaces '+' with whitespaces when parsing
filters["datetime"] = request.params["datetime"].replace(" ", "+")
try:
filters = sd.GetJobsQueries().deserialize(filters)
except Invalid as ex:
raise HTTPUnprocessableEntity(json={
"code": Invalid.__name__,
"description": str(ex)
})
else:
detail = filters.pop("detail", False)
groups = filters.pop("groups", "").split(",") if filters.get("groups", False) else filters.pop("groups", None)
filters["tags"] = list(filter(lambda s: s, filters["tags"].split(",") if filters.get("tags", False) else ""))
filters["notification_email"] = (
encrypt_email(filters["notification_email"], settings)
if filters.get("notification_email", False) else None
)
filters["datetime"] = datetime_interval_parser(filters["datetime"]) if filters.get("datetime", False) else None
filters["service"] = filters.pop("provider", None)
if (
filters["datetime"]
and filters["datetime"].get("before", False)
and filters["datetime"].get("after", False)
and filters["datetime"]["after"] > filters["datetime"]["before"]
):
raise HTTPUnprocessableEntity(json={
"code": "InvalidDateFormat",
"description": "Datetime at the start of the interval must be less than the datetime at the end."
})
store = get_db(request).get_store(StoreJobs)
items, total = store.find_jobs(request=request, group_by=groups, **filters)
body = {"total": total}
def _job_list(jobs):
return [j.json(settings) if detail else j.id for j in jobs]
if groups:
for grouped_jobs in items:
grouped_jobs["jobs"] = _job_list(grouped_jobs["jobs"])
body.update({"groups": items})
else:
body.update({"jobs": _job_list(items), "page": filters["page"], "limit": filters["limit"]})
body = sd.GetQueriedJobsSchema().deserialize(body)
return HTTPOk(json=body)
@sd.provider_job_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderJobEndpoint(), response_schemas=sd.get_prov_single_job_status_responses)
@sd.job_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATUS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobEndpoint(), response_schemas=sd.get_single_job_status_responses)
@sd.process_job_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS, sd.TAG_STATUS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.GetProcessJobEndpoint(), response_schemas=sd.get_single_job_status_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_job_status(request):
"""
Retrieve the status of a job.
"""
job = get_job(request)
job_status = job.json(request, self_link="status")
return HTTPOk(json=job_status)
@sd.provider_job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderJobEndpoint(), response_schemas=sd.delete_prov_job_responses)
@sd.job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobEndpoint(), response_schemas=sd.delete_job_responses)
@sd.process_job_service.delete(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS, sd.TAG_DISMISS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.DeleteProcessJobEndpoint(), response_schemas=sd.delete_job_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def cancel_job(request):
"""
Dismiss a job.
Note: Will only stop tracking this particular process (WPS 1.0 doesn't allow to stop a process)
"""
job = get_job(request)
app.control.revoke(job.task_id, terminate=True)
store = get_db(request).get_store(StoreJobs)
job.status_message = "Job dismissed."
job.status = status.map_status(status.STATUS_DISMISSED)
store.update_job(job)
return HTTPOk(json={
"jobID": job.id,
"status": job.status,
"message": job.status_message,
"percentCompleted": job.progress,
})
@sd.provider_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderInputsEndpoint(), response_schemas=sd.get_prov_inputs_responses)
@sd.process_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProcessInputsEndpoint(), response_schemas=sd.get_job_inputs_responses)
@sd.job_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobInputsEndpoint(), response_schemas=sd.get_job_inputs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
@sd.provider_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderOutputsEndpoint(), response_schemas=sd.get_prov_outputs_responses)
@sd.process_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProcessOutputsEndpoint(), response_schemas=sd.get_job_outputs_responses)
@sd.job_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobOutputsEndpoint(), response_schemas=sd.get_job_outputs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_job_outputs(request):
# type: (Request) -> HTTPException
"""
Retrieve the outputs of a job.
"""
job = get_job(request)
outputs = {"outputs": get_results(job, request)}
outputs.update(job.links(request, self_link="outputs"))
outputs = sd.JobOutputsSchema().deserialize(outputs)
return HTTPOk(json=outputs)
@sd.provider_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderResultsEndpoint(), response_schemas=sd.get_prov_results_responses)
@sd.process_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProcessResultsEndpoint(), response_schemas=sd.get_job_results_responses)
@sd.job_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobResultsEndpoint(), response_schemas=sd.get_job_results_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_job_results(request):
# type: (Request) -> HTTPException
"""
Retrieve the results of a job.
"""
job = get_job(request)
job_status = status.map_status(job.status)
if job_status in status.JOB_STATUS_CATEGORIES[status.STATUS_CATEGORY_RUNNING]:
raise HTTPNotFound(json={
"code": "ResultsNotReady",
"description": "Job status is '{}'. Results are not yet available.".format(job_status)
})
results = get_results(job, request, value_key="value", ogc_api=True)
results = sd.Result().deserialize(results)
return HTTPOk(json=results)
@sd.provider_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROVIDERS],
renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderExceptionsEndpoint(),
response_schemas=sd.get_prov_exceptions_responses)
@sd.job_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses)
@sd.process_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProcessExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_job_exceptions(request):
"""
Retrieve the exceptions of a job.
"""
job = get_job(request)
exceptions = sd.JobExceptionsSchema().deserialize(job.exceptions)
return HTTPOk(json=exceptions)
@sd.provider_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProviderLogsEndpoint(), response_schemas=sd.get_prov_logs_responses)
@sd.job_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS], renderer=OUTPUT_FORMAT_JSON,
schema=sd.JobLogsEndpoint(), response_schemas=sd.get_logs_responses)
@sd.process_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ProcessLogsEndpoint(), response_schemas=sd.get_logs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def get_job_logs(request):
"""
Retrieve the logs of a job.
"""
job = get_job(request)
logs = sd.JobLogsSchema().deserialize(job.logs)
return HTTPOk(json=logs)
@sd.provider_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS, sd.TAG_DEPRECATED],
renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderResultEndpoint(),
response_schemas=sd.get_result_redirect_responses)
@sd.process_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES, sd.TAG_DEPRECATED],
renderer=OUTPUT_FORMAT_JSON, schema=sd.ProcessResultEndpoint(),
response_schemas=sd.get_result_redirect_responses)
@sd.job_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_DEPRECATED],
renderer=OUTPUT_FORMAT_JSON, schema=sd.JobResultEndpoint(),
response_schemas=sd.get_result_redirect_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
[docs]def redirect_job_result(request):
"""
Deprecated job result endpoint that is now returned by corresponding outputs path with added links.
"""
location = request.url.rsplit("/", 1)[0] + "/outputs"
LOGGER.warning("Deprecated route redirection [%s] -> [%s]", request.url, location)
return HTTPPermanentRedirect(comment="deprecated", location=location)