import math
from copy import deepcopy
from typing import TYPE_CHECKING
from celery.utils.log import get_task_logger
from colander import Invalid
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPNotFound,
HTTPOk,
HTTPPermanentRedirect,
HTTPUnauthorized,
HTTPUnprocessableEntity
)
from pyramid.request import Request
from pyramid.settings import asbool
from pyramid_celery import celery_app as app
from notify import encrypt_email
from weaver import status
from weaver.database import get_db
from weaver.datatype import Job
from weaver.exceptions import (
InvalidIdentifierValue,
JobNotFound,
ProcessNotAccessible,
ProcessNotFound,
ServiceNotAccessible,
ServiceNotFound,
log_unhandled_exceptions
)
from weaver.formats import CONTENT_TYPE_APP_JSON, CONTENT_TYPE_TEXT_PLAIN, OUTPUT_FORMAT_JSON, get_format
from weaver.owsexceptions import OWSNotFound
from weaver.processes.convert import any2wps_literal_datatype
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
from weaver.utils import get_any_id, get_any_value, get_path_kvp, get_settings, get_weaver_url
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.wps.utils import get_wps_output_url
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.providers.utils import forbid_local_only
from weaver.wps_restapi.swagger_definitions import datetime_interval_parser
if TYPE_CHECKING:
from typing import Dict, List, Optional, Tuple, Union
from pyramid.httpexceptions import HTTPException
from weaver.typedefs import AnySettingsContainer, AnyValue, JSON
# Module-level logger shared by every view and helper below (created through celery's task logger factory).
LOGGER = get_task_logger(__name__)
def get_job(request):
    # type: (Request) -> Job
    """
    Resolve the job referenced by the request path and validate it against the provider/process path segments.

    The job is looked up using the ``job_id`` path parameter. The ``provider_id`` and ``process_id`` path parameters
    are then compared against the service and process recorded on the job. When a path parameter is not part of the
    matched route, the job's own value is used as default, and therefore always matches.

    :param request: request whose ``matchdict`` provides ``job_id`` and optionally ``provider_id``/``process_id``.
    :returns: Job information if found.
    :raise OWSNotFound: when the job does not exist or does not correspond to the requested provider or process.
    """
    path_params = request.matchdict
    job_id = path_params.get("job_id")
    job_store = get_db(request).get_store(StoreJobs)
    try:
        job = job_store.fetch_by_id(job_id)
    except JobNotFound:
        raise OWSNotFound(code="NoSuchJob", locator="JobID", description="Could not find job with specified 'job_id'.")

    expected_provider = path_params.get("provider_id", job.service)
    expected_process = path_params.get("process_id", job.process)

    # any provider reference (from the path or from the job itself) goes through the local-only restriction check
    if expected_provider:
        forbid_local_only(request)

    if expected_provider != job.service:
        raise OWSNotFound(
            code="NoSuchProvider",
            locator="provider",
            description="Could not find job corresponding to specified 'provider_id'."
        )
    if expected_process != job.process:
        raise OWSNotFound(
            code="NoSuchProcess",
            locator="process",
            description="Could not find job corresponding to specified 'process_id'."
        )
    return job
def get_job_list_links(job_total, filters, request):
    # type: (int, Dict[str, AnyValue], Request) -> List[JSON]
    """
    Obtains a list of all relevant links for the corresponding job listing defined by query parameter filters.

    :param job_total: total number of jobs matching the filters, used to compute the index of the last page.
    :param filters: parsed listing filters. The keys ``process``, ``service``, ``page`` and ``limit`` are read.
    :param request: request of the listing, providing the requested path and the query parameters to reapply.
    :returns:
        Link definitions (``href``, ``rel``, ``type``, ``title``) for the listing itself, its paging
        (``current``, ``first``, ``last`` and when applicable ``prev``/``next``), and when they can be resolved,
        the ``alternate`` and ``up`` locations.
    """
    base_url = get_weaver_url(request)

    # reapply queries that must be given to obtain the same result in case of subsequent requests (sort, limits, etc.)
    kvp_params = {param: value for param, value in request.params.items() if param != "page"}
    # patch datetime that have some extra character manipulation (reapply '+' auto-converted to ' ' by params parser)
    if "datetime" in kvp_params:
        kvp_params["datetime"] = kvp_params["datetime"].replace(" ", "+")
    alt_kvp = deepcopy(kvp_params)

    # request job uses general endpoint, obtain the full path if any service/process was given as alternate location
    if request.path.startswith(sd.jobs_service.path):
        job_path = base_url + sd.jobs_service.path
        alt_path = None
        parent_url = None
        # cannot generate full path apply for 'service' by itself
        if filters["process"] and filters["service"]:
            alt_path = base_url + sd.provider_jobs_service.path.format(
                provider_id=filters["service"], process_id=filters["process"]
            )
            parent_url = alt_path.rsplit("/", 1)[0]
        elif filters["process"]:
            alt_path = base_url + sd.process_jobs_service.path.format(process_id=filters["process"])
            parent_url = alt_path.rsplit("/", 1)[0]
        # the alternate path already encodes these filters, do not repeat them as queries
        for param in ["service", "provider", "process"]:
            alt_kvp.pop(param, None)
    # path is whichever specific service/process endpoint, jobs are pre-filtered by them
    # transform sub-endpoints into matching query parameters and use generic path as alternate location
    else:
        job_path = base_url + request.path
        alt_path = base_url + sd.jobs_service.path
        alt_kvp["process"] = filters["process"]
        if filters["service"]:
            alt_kvp["provider"] = filters["service"]
        parent_url = job_path.rsplit("/", 1)[0]

    cur_page = filters["page"]
    per_page = filters["limit"]
    # pages are 0-based; clamp so that an empty listing (total of 0) reports page 0 as 'last' instead of page -1
    max_page = max(math.ceil(job_total / per_page) - 1, 0)

    alt_links = []
    if alt_path:
        alt_links = [{
            "href": get_path_kvp(alt_path, page=cur_page, **alt_kvp), "rel": "alternate",
            "type": CONTENT_TYPE_APP_JSON, "title": "Alternate endpoint with equivalent set of filtered jobs."
        }]
    links = alt_links + [
        {"href": job_path, "rel": "collection",
         "type": CONTENT_TYPE_APP_JSON, "title": "Complete job listing (no filtering queries applied)."},
        {"href": base_url + sd.jobs_service.path, "rel": "search",
         "type": CONTENT_TYPE_APP_JSON, "title": "Generic query endpoint to search for jobs."},
        {"href": job_path + "?detail=false", "rel": "preview",
         "type": CONTENT_TYPE_APP_JSON, "title": "Job listing summary (UUID and count only)."},
        {"href": job_path, "rel": "http://www.opengis.net/def/rel/ogc/1.0/job-list",
         "type": CONTENT_TYPE_APP_JSON, "title": "List of registered jobs."},
        {"href": get_path_kvp(job_path, page=cur_page, **kvp_params), "rel": "current",
         "type": CONTENT_TYPE_APP_JSON, "title": "Current page of job query listing."},
        {"href": get_path_kvp(job_path, page=0, **kvp_params), "rel": "first",
         "type": CONTENT_TYPE_APP_JSON, "title": "First page of job query listing."},
        {"href": get_path_kvp(job_path, page=max_page, **kvp_params), "rel": "last",
         "type": CONTENT_TYPE_APP_JSON, "title": "Last page of job query listing."},
    ]
    if cur_page > 0:
        links.append({
            "href": get_path_kvp(job_path, page=cur_page - 1, **kvp_params), "rel": "prev",
            "type": CONTENT_TYPE_APP_JSON, "title": "Previous page of job query listing."
        })
    if cur_page < max_page:
        links.append({
            "href": get_path_kvp(job_path, page=cur_page + 1, **kvp_params), "rel": "next",
            "type": CONTENT_TYPE_APP_JSON, "title": "Next page of job query listing."
        })
    if parent_url:
        links.append({
            "href": parent_url, "rel": "up",
            "type": CONTENT_TYPE_APP_JSON, "title": "Parent collection for which listed jobs apply."
        })
    return links
def get_results(job, container, value_key=None, ogc_api=False):
    # type: (Job, AnySettingsContainer, Optional[str], bool) -> Union[List[JSON], JSON]
    """
    Obtains the job results with extended full WPS output URL as applicable and according to configuration settings.

    The stored results of the job are only read. A media-type inferred for a reference that does not define one is
    reported in the returned output without being written back into the job result.

    :param job: job from which to retrieve results.
    :param container: any container giving access to instance settings (to resolve reference output location).
    :param value_key:
        Key under which literal data values are reported when ``ogc_api`` is ``False``.
        If not specified, literal data is reported under ``data``.
        References are always reported under ``href``, and ``ogc_api=True`` always reports literal data
        under ``value``.
    :param ogc_api:
        If ``True``, formats the results using the ``OGC-API - Processes`` format.
    :returns:
        When ``ogc_api`` is ``False``, list of all outputs each with minimally an ID and value under the resolved key.
        When ``ogc_api`` is ``True``, mapping of output ID to its output definition, or to a list of definitions
        when the same ID occurs more than once in the job results.
    """
    wps_url = get_wps_output_url(container)
    if not wps_url.endswith("/"):
        wps_url = wps_url + "/"
    outputs = {} if ogc_api else []
    fmt_key = "mediaType" if ogc_api else "mimeType"

    for result in job.results:
        rtype = "data" if any(k in result for k in ["data", "value"]) else "href"
        value = get_any_value(result)
        out_id = get_any_id(result)
        out_key = rtype
        if rtype == "href":
            # fix paths relative to instance endpoint, but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.)
            if value.startswith("/"):
                value = str(value).lstrip("/")
            if "://" not in value:
                value = wps_url + value
        elif ogc_api:
            out_key = "value"
        elif value_key:
            out_key = value_key

        output = {out_key: value}
        if rtype == "href":  # required for the rest to be there, other fields optional
            # resolve the media-type locally to avoid mutating the result stored on the job
            if "mimeType" in result:
                mime_type = result["mimeType"]
            else:
                mime_type = get_format(value, default=CONTENT_TYPE_TEXT_PLAIN).mime_type
            output["format"] = {fmt_key: mime_type}
            for field in ["encoding", "schema"]:
                if field in result:
                    output["format"][field] = result[field]
        else:
            # literal data
            # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51)
            dtype = result.get("dataType", any2wps_literal_datatype(value, is_value=True) or "string")
            output["dataType"] = {"name": dtype} if ogc_api else dtype

        if ogc_api:
            if out_id in outputs:
                # repeated output ID: accumulate all its definitions in a list
                output_list = outputs[out_id]
                if not isinstance(output_list, list):
                    output_list = [output_list]
                output_list.append(output)
                outputs[out_id] = output_list
            else:
                outputs[out_id] = output
        else:
            # if ordered insert supported by python version, insert ID first
            outputs.append({"id": out_id, **output})
    return outputs
def validate_service_process(request):
    # type: (Request) -> Tuple[Optional[str], Optional[str]]
    """
    Verifies that service or process specified by path or query will raise the appropriate error if applicable.

    The service is resolved from the ``provider_id`` path parameter, or otherwise from the ``provider`` or
    ``service`` query parameter. The process is resolved from the ``process_id`` path parameter, or otherwise from
    the ``process`` query parameter.

    :param request: request providing the path (``matchdict``) and query (``params``) parameters.
    :returns: tuple of the service and process names that were specified (``None`` for each one omitted).
    :raise HTTPNotFound: if the specified service or process cannot be found.
    :raise HTTPUnauthorized: if the specified service or process is not accessible.
    :raise HTTPBadRequest: if a specified identifier is invalid.
    """
    service_name = (
        request.matchdict.get("provider_id", None)
        # 'provider' is the query name employed by the job listing filters and by its generated alternate links
        or request.params.get("provider", None)
        or request.params.get("service", None)  # backward compatibility
    )
    process_name = request.matchdict.get("process_id", None) or request.params.get("process", None)
    # track the item being validated such that the error can report which one failed
    item_test = None
    item_type = None

    try:
        service = None
        if service_name:
            forbid_local_only(request)
            item_type = "Service"
            item_test = service_name
            store = get_db(request).get_store(StoreServices)
            service = store.fetch_by_name(service_name, visibility=VISIBILITY_PUBLIC)
        if process_name:
            item_type = "Process"
            item_test = process_name
            # local process
            if not service:
                store = get_db(request).get_store(StoreProcesses)
                store.fetch_by_id(process_name, visibility=VISIBILITY_PUBLIC)
            # remote process
            else:
                processes = service.processes(request)
                if process_name not in [p.id for p in processes]:
                    raise ProcessNotFound
    except (ServiceNotFound, ProcessNotFound):
        raise HTTPNotFound(json={
            "code": "NoSuch{}".format(item_type),
            "description": "{} of id '{}' cannot be found.".format(item_type, item_test)
        })
    except (ServiceNotAccessible, ProcessNotAccessible):
        raise HTTPUnauthorized(json={
            "code": "Unauthorized{}".format(item_type),
            "description": "{} of id '{}' is not accessible.".format(item_type, item_test)
        })
    except InvalidIdentifierValue as ex:
        raise HTTPBadRequest(json={
            "code": InvalidIdentifierValue.__name__,
            "description": str(ex)
        })
    return service_name, process_name
@sd.provider_jobs_service.get(tags=[sd.TAG_JOBS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                              schema=sd.GetProviderJobsEndpoint(), response_schemas=sd.get_prov_all_jobs_responses)
@sd.process_jobs_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS], renderer=OUTPUT_FORMAT_JSON,
                             schema=sd.GetProcessJobsEndpoint(), response_schemas=sd.get_all_jobs_responses)
@sd.jobs_service.get(tags=[sd.TAG_JOBS], renderer=OUTPUT_FORMAT_JSON,
                     schema=sd.GetJobsEndpoint(), response_schemas=sd.get_all_jobs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_queried_jobs(request):
    """
    Retrieve the list of jobs which can be filtered, sorted, paged and categorized using query parameters.

    The response body always provides ``total`` and ``links``. It provides ``groups`` when grouping was requested,
    and otherwise ``jobs`` along with the applied ``page`` and ``limit``.

    :raise HTTPUnprocessableEntity: if the queries are rejected by the schema or if the datetime interval is reversed.
    """
    settings = get_settings(request)
    service, process = validate_service_process(request)
    if service:
        forbid_local_only(settings)

    params = request.params
    filters = {**params, "process": process, "provider": service}
    filters["detail"] = asbool(params.get("detail"))
    if params.get("datetime", False):
        # replace white space with '+' since request.params replaces '+' with whitespaces when parsing
        filters["datetime"] = params["datetime"].replace(" ", "+")

    try:
        filters = sd.GetJobsQueries().deserialize(filters)
    except Invalid as ex:
        raise HTTPUnprocessableEntity(json={
            "code": Invalid.__name__,
            "description": str(ex)
        })

    # extract the options that only control the response and normalize the remaining ones employed for the search
    detail = filters.pop("detail", False)
    raw_groups = filters.pop("groups", None)
    groups = raw_groups.split(",") if raw_groups else raw_groups
    raw_tags = filters.get("tags", False)
    filters["tags"] = [tag for tag in raw_tags.split(",") if tag] if raw_tags else []
    raw_email = filters.get("notification_email", False)
    filters["notification_email"] = encrypt_email(raw_email, settings) if raw_email else None
    raw_datetime = filters.get("datetime", False)
    filters["datetime"] = datetime_interval_parser(raw_datetime) if raw_datetime else None
    filters["service"] = filters.pop("provider", None)

    interval = filters["datetime"]
    if (
        interval
        and interval.get("before", False)
        and interval.get("after", False)
        and interval["after"] > interval["before"]
    ):
        raise HTTPUnprocessableEntity(json={
            "code": "InvalidDateFormat",
            "description": "Datetime at the start of the interval must be less than the datetime at the end."
        })

    job_store = get_db(request).get_store(StoreJobs)
    items, total = job_store.find_jobs(request=request, group_by=groups, **filters)

    def _render_jobs(jobs):
        # full JSON representation when details are requested, otherwise only the job identifiers
        if detail:
            return [job.json(settings) for job in jobs]
        return [job.id for job in jobs]

    body = {"total": total}
    if groups:
        for group in items:
            group["jobs"] = _render_jobs(group["jobs"])
        body["groups"] = items
    else:
        body["jobs"] = _render_jobs(items)
        body["page"] = filters["page"]
        body["limit"] = filters["limit"]
    body["links"] = get_job_list_links(total, filters, request)
    body = sd.GetQueriedJobsSchema().deserialize(body)
    return HTTPOk(json=body)
@sd.provider_job_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                             schema=sd.ProviderJobEndpoint(), response_schemas=sd.get_prov_single_job_status_responses)
@sd.process_job_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS, sd.TAG_STATUS], renderer=OUTPUT_FORMAT_JSON,
                            schema=sd.GetProcessJobEndpoint(), response_schemas=sd.get_single_job_status_responses)
@sd.job_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATUS], renderer=OUTPUT_FORMAT_JSON,
                    schema=sd.JobEndpoint(), response_schemas=sd.get_single_job_status_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_status(request):
    """
    Retrieve the status of a job.

    The body is the JSON representation of the job referenced by the request, generated with ``status`` as self link.
    """
    return HTTPOk(json=get_job(request).json(request, self_link="status"))
def cancel_job_task(job, container):
    # type: (Job, AnySettingsContainer) -> Job
    """
    Revoke the celery task of the job and save the job with a dismissed status.

    :param job: job to dismiss.
    :param container: any container giving access to the database where the job is stored.
    :returns: the job as returned by the store following its update.
    """
    # signal to stop celery task. Up to it to terminate remote if any.
    app.control.revoke(job.task_id, terminate=True)
    job_store = get_db(container).get_store(StoreJobs)
    job.status_message = "Job dismissed."
    job.status = status.map_status(status.STATUS_DISMISSED)
    return job_store.update_job(job)
@sd.provider_job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProviderJobEndpoint(), response_schemas=sd.delete_prov_job_responses)
@sd.process_job_service.delete(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS, sd.TAG_DISMISS], renderer=OUTPUT_FORMAT_JSON,
                               schema=sd.DeleteProcessJobEndpoint(), response_schemas=sd.delete_job_responses)
@sd.job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS], renderer=OUTPUT_FORMAT_JSON,
                       schema=sd.JobEndpoint(), response_schemas=sd.delete_job_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def cancel_job(request):
    """
    Dismiss a job.

    Responds with the identifier, status, status message and progress of the job once dismissed.

    Note:
        Will only stop tracking this particular process execution when not supported by underlying provider
        services such as WPS 1.0. Services supporting cancel operation could attempt to terminate remote jobs.
    """
    dismissed = cancel_job_task(get_job(request), request)
    summary = {
        "jobID": dismissed.id,
        "status": dismissed.status,
        "message": dismissed.status_message,
        "percentCompleted": dismissed.progress,
    }
    return HTTPOk(json=summary)
@sd.jobs_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS], renderer=OUTPUT_FORMAT_JSON,
                        schema=sd.DeleteJobsEndpoint(), response_schemas=sd.delete_jobs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def cancel_job_batch(request):
    """
    Dismiss multiple jobs.

    Job identifiers from the request body that do not match any stored job are ignored.
    The response lists only the identifiers of the jobs that were found and dismissed.

    Note:
        Will only stop tracking jobs when underlying remote provider services do not support cancel operation.

    :raise HTTPUnprocessableEntity: if the request body is rejected by the schema.
    :raise HTTPBadRequest: if the request body cannot be processed for any other reason.
    """
    try:
        requested_ids = sd.DeleteJobsBodySchema().deserialize(request.json)["jobs"]
    except Invalid as exc:
        raise HTTPUnprocessableEntity(json={"code": Invalid.__name__, "description": str(exc)})
    except Exception as exc:
        raise HTTPBadRequest(json={"code": "Could not parse request body.", "description": str(exc)})

    job_store = get_db(request).get_store(StoreJobs)
    dismissed_ids = []
    for requested_id in requested_ids:
        try:
            job = job_store.fetch_by_id(requested_id)
        except JobNotFound:
            # unknown jobs are skipped rather than failing the whole batch
            continue
        dismissed_ids.append(job.id)
        cancel_job_task(job, request)
    response_body = sd.BatchDismissJobsBodySchema().deserialize({"jobs": dismissed_ids})
    return HTTPOk(json=response_body)
@sd.provider_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProviderInputsEndpoint(), response_schemas=sd.get_prov_inputs_responses)
@sd.process_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                               schema=sd.ProcessInputsEndpoint(), response_schemas=sd.get_job_inputs_responses)
@sd.job_inputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS], renderer=OUTPUT_FORMAT_JSON,
                           schema=sd.JobInputsEndpoint(), response_schemas=sd.get_job_inputs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_inputs(request):
    # type: (Request) -> HTTPException
    """
    Retrieve the inputs of a job.

    Dedicated view for the inputs routes, which would otherwise be registered against the outputs view.
    """
    job = get_job(request)
    # NOTE(review): assumes 'job.inputs' lists items readable by 'get_any_id'/'get_any_value', that 'job.links'
    #   accepts "inputs" as 'self_link', and that 'sd.JobInputsSchema' exists (by analogy with
    #   'sd.JobOutputsSchema' and 'sd.JobInputsEndpoint') - confirm against 'Job' and 'swagger_definitions'.
    inputs = {"inputs": [{"id": get_any_id(item), "value": get_any_value(item)} for item in job.inputs]}
    inputs.update(job.links(request, self_link="inputs"))
    inputs = sd.JobInputsSchema().deserialize(inputs)
    return HTTPOk(json=inputs)


@sd.provider_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                                 schema=sd.ProviderOutputsEndpoint(), response_schemas=sd.get_prov_outputs_responses)
@sd.process_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProcessOutputsEndpoint(), response_schemas=sd.get_job_outputs_responses)
@sd.job_outputs_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                            schema=sd.JobOutputsEndpoint(), response_schemas=sd.get_job_outputs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_outputs(request):
    # type: (Request) -> HTTPException
    """
    Retrieve the outputs of a job.

    The body provides the job results under ``outputs`` (listing format), combined with the links of the job.
    """
    job = get_job(request)
    outputs = {"outputs": get_results(job, request)}
    outputs.update(job.links(request, self_link="outputs"))
    outputs = sd.JobOutputsSchema().deserialize(outputs)
    return HTTPOk(json=outputs)
@sd.provider_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                                 schema=sd.ProviderResultsEndpoint(), response_schemas=sd.get_prov_results_responses)
@sd.process_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                                schema=sd.ProcessResultsEndpoint(), response_schemas=sd.get_job_results_responses)
@sd.job_results_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS], renderer=OUTPUT_FORMAT_JSON,
                            schema=sd.JobResultsEndpoint(), response_schemas=sd.get_job_results_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_results(request):
    # type: (Request) -> HTTPException
    """
    Retrieve the results of a job.

    Results are formatted with the ``OGC-API - Processes`` representation of :func:`get_results`.

    :raise HTTPNotFound: if the mapped status of the job is part of the running category (results not ready).
    """
    job = get_job(request)
    job_status = status.map_status(job.status)
    running_statuses = status.JOB_STATUS_CATEGORIES[status.STATUS_CATEGORY_RUNNING]
    if job_status in running_statuses:
        not_ready = {
            "code": "ResultsNotReady",
            "description": "Job status is '{}'. Results are not yet available.".format(job_status)
        }
        raise HTTPNotFound(json=not_ready)
    # note: cannot add links in this case because variable OutputID keys are directly at the root
    raw_results = get_results(job, request, value_key="value", ogc_api=True)
    return HTTPOk(json=sd.Result().deserialize(raw_results))
@sd.provider_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROVIDERS],
                                    renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderExceptionsEndpoint(),
                                    response_schemas=sd.get_prov_exceptions_responses)
@sd.job_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS], renderer=OUTPUT_FORMAT_JSON,
                               schema=sd.JobExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses)
@sd.process_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                                   schema=sd.ProcessExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_exceptions(request):
    """
    Retrieve the exceptions of a job.

    The exceptions recorded on the job are passed through the exceptions schema before being returned.
    """
    recorded_exceptions = get_job(request).exceptions
    return HTTPOk(json=sd.JobExceptionsSchema().deserialize(recorded_exceptions))
@sd.provider_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON,
                              schema=sd.ProviderLogsEndpoint(), response_schemas=sd.get_prov_logs_responses)
@sd.job_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS], renderer=OUTPUT_FORMAT_JSON,
                         schema=sd.JobLogsEndpoint(), response_schemas=sd.get_logs_responses)
@sd.process_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROCESSES], renderer=OUTPUT_FORMAT_JSON,
                             schema=sd.ProcessLogsEndpoint(), response_schemas=sd.get_logs_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def get_job_logs(request):
    """
    Retrieve the logs of a job.

    The logs recorded on the job are passed through the logs schema before being returned.
    """
    recorded_logs = get_job(request).logs
    return HTTPOk(json=sd.JobLogsSchema().deserialize(recorded_logs))
@sd.provider_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS, sd.TAG_DEPRECATED],
                                renderer=OUTPUT_FORMAT_JSON, schema=sd.ProviderResultEndpoint(),
                                response_schemas=sd.get_result_redirect_responses)
@sd.process_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES, sd.TAG_DEPRECATED],
                               renderer=OUTPUT_FORMAT_JSON, schema=sd.ProcessResultEndpoint(),
                               response_schemas=sd.get_result_redirect_responses)
@sd.job_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_DEPRECATED],
                           renderer=OUTPUT_FORMAT_JSON, schema=sd.JobResultEndpoint(),
                           response_schemas=sd.get_result_redirect_responses)
@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
def redirect_job_result(request):
    """
    Deprecated job result endpoint that is now returned by corresponding outputs path with added links.

    The redirect location is the requested URL with everything following its last ``/`` replaced by ``outputs``.
    """
    deprecated_url = request.url
    parent_url = deprecated_url.rsplit("/", 1)[0]
    location = parent_url + "/outputs"
    LOGGER.warning("Deprecated route redirection [%s] -> [%s]", deprecated_url, location)
    return HTTPPermanentRedirect(comment="deprecated", location=location)