Source code for weaver.wps_restapi.jobs.utils

import math
import os
import shutil
from copy import deepcopy
from typing import TYPE_CHECKING

from celery.utils.log import get_task_logger
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPCreated,
    HTTPNoContent,
    HTTPNotFound,
    HTTPNotImplemented,
    HTTPOk,
    HTTPUnauthorized
)
from pyramid.response import FileResponse
from pyramid_celery import celery_app

from weaver.database import get_db
from weaver.datatype import Job
from weaver.exceptions import (
    InvalidIdentifierValue,
    JobGone,
    JobInvalidParameter,
    JobNotFound,
    ProcessNotAccessible,
    ProcessNotFound,
    ServiceNotAccessible,
    ServiceNotFound
)
from weaver.execute import ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType, get_format
from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound
from weaver.processes.convert import any2wps_literal_datatype, convert_output_params_schema, get_field
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.store.base import StoreJobs, StoreProcesses, StoreServices
from weaver.utils import (
    get_any_id,
    get_any_value,
    get_file_headers,
    get_header,
    get_path_kvp,
    get_settings,
    get_weaver_url,
    is_uuid
)
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.constants import JobInputsOutputsSchema
from weaver.wps_restapi.providers.utils import forbid_local_only

if TYPE_CHECKING:
    from typing import Dict, List, Optional, Tuple, Union

    from weaver.typedefs import (
        AnyHeadersContainer,
        AnyRequestType,
        AnyResponseType,
        AnySettingsContainer,
        AnyUUID,
        AnyValueType,
        ExecutionResultArray,
        ExecutionResultObject,
        ExecutionResults,
        HeadersTupleType,
        JSON,
        PyramidRequest,
        SettingsType
    )
    from weaver.wps_restapi.constants import JobInputsOutputsSchemaType

# Module-level logger obtained from Celery's task logger factory.
LOGGER = get_task_logger(__name__)
def get_job(request):
    # type: (PyramidRequest) -> Job
    """
    Obtain a job from request parameters.

    The job is looked up using the ``job_id`` path parameter. When ``provider_id`` and/or ``process_id``
    path parameters are present, they must match the service and process recorded on the job.
    When they are absent, the values recorded on the job are used instead.

    :param request: Request whose ``matchdict`` provides ``job_id`` and optionally ``provider_id``/``process_id``.
    :returns: Job information if found.
    :raise JobInvalidParameter: with JSON body details if the job reference is not a valid UUID.
    :raise JobNotFound: with JSON body details if the job store does not find the job.
    :raise OWSNotFound: with JSON body details if the job does not match the provider or process reference.
    """
    job_id = request.matchdict.get("job_id")
    try:
        if not is_uuid(job_id):
            raise JobInvalidParameter
        store = get_db(request).get_store(StoreJobs)
        job = store.fetch_by_id(job_id)
    except (JobInvalidParameter, JobNotFound) as exc:
        # re-raise the same exception class, but with the detailed JSON body
        exception = type(exc)
        if exception is JobInvalidParameter:
            desc = "Invalid job reference is not a valid UUID."
        else:
            desc = "Could not find job with specified reference."
        title = "NoSuchJob"
        raise exception(
            # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job
            json={
                "title": title,
                "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job",
                "detail": desc,
                "status": exception.code,
                "cause": str(job_id)
            },
            code=title, locator="JobID", description=desc  # old format
        )

    provider_id = request.matchdict.get("provider_id", job.service)
    process_id = request.matchdict.get("process_id", job.process)
    if provider_id:
        # NOTE(review): presumably rejects provider requests on local-only instances - confirm in helper
        forbid_local_only(request)

    if job.service != provider_id:
        title = "NoSuchProvider"
        desc = "Could not find job reference corresponding to specified provider reference."
        raise OWSNotFound(
            # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job
            json={
                "title": title,
                "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job",
                "detail": desc,
                "status": OWSNotFound.code,
                # the mismatching value is the provider reference (not the process reference)
                "cause": str(provider_id)
            },
            code=title, locator="provider", description=desc  # old format
        )
    if job.process != process_id:
        title = "NoSuchProcess"
        desc = "Could not find job reference corresponding to specified process reference."
        raise OWSNotFound(
            # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job
            # note: although 'no-such-process' error, return 'no-such-job' because process could exist, only mismatches
            json={
                "title": title,
                "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job",
                "detail": desc,
                "status": OWSNotFound.code,
                "cause": str(process_id)
            },
            code=title, locator="process", description=desc  # old format
        )
    return job
def get_schema_query(schema, strict=True):
    # type: (Optional[JobInputsOutputsSchemaType], bool) -> Optional[JobInputsOutputsSchemaType]
    """
    Validate and normalize the ``schema`` query parameter value.

    :param schema: Raw value of the query parameter. Any falsy value is treated as not provided.
    :param strict: When disabled, anything from the first ``+`` onward is dropped from the returned value.
    :returns: ``None`` if no schema was provided, otherwise the normalized lowercase schema name.
    :raise HTTPBadRequest: if the normalized value is not known by :class:`JobInputsOutputsSchema`.
    """
    if not schema:
        return None

    # URL parsing turns '+' into a space (eg: "OGC+strict" is received as "OGC strict"), restore it
    normalized = str(schema).replace(" ", "+").lower()
    if JobInputsOutputsSchema.get(normalized) is None:
        raise HTTPBadRequest(json={
            "type": "InvalidParameterValue",
            "detail": "Query parameter 'schema' value is invalid.",
            "status": HTTPBadRequest.code,
            "locator": "query",
            "value": str(schema),
        })

    if strict:
        return normalized
    # keep only the part preceding the first '+'
    return normalized.partition("+")[0]
def get_results(job,                                # type: Job
                container,                          # type: AnySettingsContainer
                value_key=None,                     # type: Optional[str]
                schema=JobInputsOutputsSchema.OLD,  # type: JobInputsOutputsSchemaType
                link_references=False,              # type: bool
                ):                                  # type: (...) -> Tuple[ExecutionResults, HeadersTupleType]
    """
    Collect the results of a job, expanding relative references with the configured WPS output URL.

    :param job: Job from which to retrieve results.
    :param container: Any container giving access to instance settings (to resolve reference output location).
    :param value_key:
        Key to use for literal values when the non-OGC representation is requested.
        If not specified, literal values are stored under ``data``. References always use ``href``
        and the OGC representation always uses ``value`` for literals.
    :param schema: Selects which schema to employ for representing the output results (listing or mapping).
    :param link_references:
        If enabled, an output that was requested by reference instead of value is returned
        as ``Link`` header instead of being included in the results.
    :returns:
        Tuple with:
            - List or mapping of all outputs each with minimally an ID and value under the relevant key.
            - List of ``Link`` headers for reference outputs when requested. Empty otherwise.
    """
    settings = get_settings(container)
    base_url = get_wps_output_url(settings)
    if not base_url.endswith("/"):
        base_url = base_url + "/"

    schema = JobInputsOutputsSchema.get(str(schema).lower(), default=JobInputsOutputsSchema.OLD)
    strict = schema.endswith("+strict")
    schema = schema.split("+")[0]
    ogc_api = schema == JobInputsOutputsSchema.OGC

    # OGC representation is a mapping by output ID, the other one is a listing with explicit IDs
    outputs = {} if ogc_api else []
    fmt_key = "mediaType" if ogc_api else "mimeType"
    if link_references:
        requested_outputs = convert_output_params_schema(job.outputs, JobInputsOutputsSchema.OGC)
    else:
        requested_outputs = {}
    references = {}

    for result in job.results:
        is_literal = "data" in result or "value" in result
        value = get_any_value(result)
        out_id = get_any_id(result)
        out_mode = requested_outputs.get(out_id, {}).get("transmissionMode")
        as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE

        if not is_literal:
            # fix paths relative to instance endpoint, but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.)
            if value.startswith("/"):
                value = str(value).lstrip("/")
            if "://" not in value:
                value = base_url + value
            output = {"href": value}

            # media-type is required for the rest to be there, other fields optional
            if "mimeType" not in result:
                result["mimeType"] = get_format(value, default=ContentType.TEXT_PLAIN).mime_type
            media_type = result["mimeType"]
            if ogc_api or not strict:
                output["type"] = media_type
            if not ogc_api or not strict or as_ref:
                fmt = {fmt_key: media_type}
                output["format"] = fmt
                for field in ("encoding", "schema"):
                    if field in result:
                        fmt[field] = result[field]
        else:
            if ogc_api:
                literal_key = "value"
            elif value_key:
                literal_key = value_key
            else:
                literal_key = "data"
            output = {literal_key: value}

            # literal data
            # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51)
            dtype = result.get("dataType", any2wps_literal_datatype(value, is_value=True) or "string")
            output["dataType"] = {"name": dtype} if ogc_api else dtype

        if not (ogc_api or as_ref):
            # listing representation: insert ID first
            outputs.append({"id": out_id, **output})
            continue

        # mapping representation: repeated IDs are aggregated into a list under the same key
        target = references if as_ref else outputs
        if out_id not in target:
            target[out_id] = output
        else:
            grouped = target[out_id]
            if not isinstance(grouped, list):
                grouped = [grouped]
            grouped.append(output)
            target[out_id] = grouped

    # references are converted only after the loop, once outputs of the same ID were aggregated (arrays)
    headers = []
    for ref_id, ref_output in references.items():
        for link in make_result_link(ref_id, ref_output, job.id, settings):
            headers.append(("Link", link))
    return outputs, headers
def get_job_results_response(job, container, headers=None):
    # type: (Job, AnySettingsContainer, Optional[AnyHeadersContainer]) -> AnyResponseType
    """
    Generates the :term:`OGC` compliant :term:`Job` results response according to submitted execution parameters.

    Parameters that impact the format of the response are:
        - Amount of outputs to be returned.
        - Parameter ``response: raw|document``
        - Parameter ``transmissionMode: value|reference`` per output if ``response: raw``.

    .. seealso::
        More details available for each combination:
        - https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response
        - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7

    :param job: Job for which to generate the results response.
    :param container: Application settings.
    :param headers:
        Additional headers to provide in the response.
        A ``Location`` header pointing at the job status is added to it if none is already present.
    :returns:
        - ``HTTPOk`` with JSON body when ``response=document``.
        - ``HTTPNoContent`` with only headers when ``response=raw`` and no output is returned by value.
        - File or plain text response of the single output value otherwise, with ``Link`` headers for references.
    :raise HTTPNotImplemented: if more than one value must be returned with ``response=raw``.
    """
    # both calls raise the relevant error response if results cannot be provided for the job status
    raise_job_dismissed(job, container)
    raise_job_bad_status(job, container)

    # when 'response=document', ignore 'transmissionMode=value|reference', respect it when 'response=raw'
    # See:
    #   - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 (/req/core/job-results-async-document)
    #   - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document
    is_raw = job.execution_response == ExecuteResponse.RAW
    # 'results' are the outputs returned by value, 'refs' are the ('Link', value) header tuples
    results, refs = get_results(
        job, container, value_key="value",
        schema=JobInputsOutputsSchema.OGC,  # not strict to provide more format details
        link_references=is_raw,
    )
    headers = headers or {}
    # look up the header with the same helper call as 'get_job_submission_response' does,
    # so that an explicitly provided 'Location' is detected regardless of the key's letter case
    if not get_header("location", headers):
        headers["Location"] = job.status_url(container)

    if not is_raw:
        # note:
        #   Cannot add "links" field in response body because variable Output ID keys are directly at the root
        #   Possible conflict with an output that would be named "links".
        results = sd.Result().deserialize(results)
        return HTTPOk(json=results, headers=headers)

    if not results:  # avoid schema validation error if all by reference
        # Status code 204 for empty body
        # see:
        #   - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-ref
        refs.extend(headers.items())
        return HTTPNoContent(headers=refs)

    # raw response with at least one output by value (possibly mixed with link references)
    # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-one
    out_info = list(results.items())[0][-1]
    out_type = get_any_value(out_info, key=True)
    out_data = get_any_value(out_info)

    # FIXME: https://github.com/crim-ca/weaver/issues/376
    #  implement multipart, both for multi-output IDs and array-output under same ID
    if len(results) > 1 or (isinstance(out_data, list) and len(out_data) > 1):
        # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-multi
        raise HTTPNotImplemented(json={
            "code": "NotImplemented",
            "type": "NotImplemented",
            "detail": "Multipart results with 'transmissionMode=value' and 'response=raw' not implemented.",
        })

    # single value only
    out_data = out_data[0] if isinstance(out_data, list) else out_data
    if out_type == "href":
        out_path = map_wps_output_location(out_data, container, exists=True, url=False)
        out_type = out_info.get("type")  # noqa
        out_headers = get_file_headers(out_path, download_headers=True, content_headers=True, content_type=out_type)
        resp = FileResponse(out_path)
        resp.headers.update(out_headers)
        resp.headers.update(headers)
    else:
        resp = HTTPOk(body=out_data, charset="UTF-8", content_type=ContentType.TEXT_PLAIN, headers=headers)

    if refs:
        # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-ref
        # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-mixed-multi
        resp.headerlist.extend(refs)
    return resp
def get_job_submission_response(body, headers, error=False):
    # type: (JSON, AnyHeadersContainer, bool) -> Union[HTTPOk, HTTPCreated]
    """
    Builds the response matching the contents returned by the :term:`Job` submission process.

    When the mapped status of the :term:`Job` is part of the finished category, the response is returned
    immediately with the corresponding description (successful response, or bad request if ``error`` is set).
    Otherwise, the created response is returned with the location where the :term:`Job` can be monitored.

    .. seealso::
        :func:`weaver.processes.execution.submit_job`
        :func:`weaver.processes.execution.submit_job_handler`

    :param body: Job status contents. Its ``description`` field is set by this function.
    :param headers: Headers of the response, from which the ``Location`` value is retrieved.
    :param error: Whether a finished job must be reported as failed synchronous execution.
    :returns: Response of the class applicable for the job status.
    """
    status = map_status(body.get("status"))
    location = get_header("location", headers)

    if status not in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
        # still pending or running, to be monitored asynchronously
        body["description"] = sd.CreatedLaunchJobResponse.description
        body = sd.CreatedJobStatusSchema().deserialize(body)
        return HTTPCreated(location=location, json=body, headers=headers)

    if error:
        response_class = HTTPBadRequest
        description = sd.FailedSyncJobResponse.description
    else:
        response_class = HTTPOk
        description = sd.CompletedJobResponse.description
        # only the successful contents are passed through the completed status schema
        body = sd.CompletedJobStatusSchema().deserialize(body)
    body["description"] = description
    return response_class(location=location, json=body, headers=headers)
def validate_service_process(request):
    # type: (PyramidRequest) -> Tuple[Optional[str], Optional[str]]
    """
    Checks the service and/or process referenced by the request path or query, raising the relevant error if any.

    :param request: Request providing the service and process references by path or query parameters.
    :returns: Tuple of the service and process names that were resolved from the request (each can be empty).
    :raise HTTPNotFound: if the referenced service or process cannot be found.
    :raise HTTPUnauthorized: if the referenced service or process is not accessible.
    :raise HTTPBadRequest: if a reference is an invalid identifier.
    """
    service_name = (
        request.matchdict.get("provider_id", None)
        or request.params.get("provider", None)
        or request.params.get("service", None)      # backward compatibility
    )
    process_name = (
        request.matchdict.get("process_id", None)
        or request.params.get("process", None)
        or request.params.get("processID", None)    # OGC-API conformance
    )

    # track which item is being looked up in order to report it if the lookup fails
    item_type = None
    item_test = None
    try:
        service = None
        if service_name:
            forbid_local_only(request)
            item_type, item_test = "Service", service_name
            service_store = get_db(request).get_store(StoreServices)
            service = service_store.fetch_by_name(service_name, visibility=Visibility.PUBLIC)
        if process_name:
            item_type, item_test = "Process", process_name
            if service:
                # remote process
                remote_ids = [proc.id for proc in service.processes(request)]
                if process_name not in remote_ids:
                    raise ProcessNotFound
            else:
                # local process
                process_store = get_db(request).get_store(StoreProcesses)
                process_store.fetch_by_id(process_name, visibility=Visibility.PUBLIC)
    except (ServiceNotFound, ProcessNotFound):
        raise HTTPNotFound(json={
            "code": f"NoSuch{item_type}",
            "description": f"{item_type} reference '{item_test}' cannot be found."
        })
    except (ServiceNotAccessible, ProcessNotAccessible):
        raise HTTPUnauthorized(json={
            "code": f"Unauthorized{item_type}",
            "description": f"{item_type} reference '{item_test}' is not accessible."
        })
    except InvalidIdentifierValue as ex:
        raise HTTPBadRequest(json={
            "code": InvalidIdentifierValue.__name__,
            "description": str(ex)
        })

    return service_name, process_name
def raise_job_bad_status(job, container=None):
    # type: (Job, Optional[AnySettingsContainer]) -> None
    """
    Raise the relevant error when results of the :term:`Job` cannot be retrieved because of its status.

    Nothing happens if the job succeeded.

    :param job: Job for which the status is verified.
    :param container: Application settings, employed to generate the job links reported in the error.
    :raise HTTPBadRequest: if the job failed, with the error code and cause inferred from the job exceptions.
    :raise OWSNotFound: if the job has any other status than succeeded or failed (results not ready).
    """
    if job.status == Status.SUCCEEDED:
        return

    links = job.links(container=container)

    if job.status != Status.FAILED:
        # /req/core/job-results-exception/results-not-ready
        # must use OWS instead of HTTP class to preserve provided JSON body
        # otherwise, pyramid considers it as not found view/path and rewrites contents in append slash handler
        raise OWSNotFound(json={
            "title": "JobResultsNotReady",
            "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready",
            "detail": "Job is not ready to obtain results.",
            "status": HTTPNotFound.code,
            "cause": {"status": job.status},
            "links": links
        })

    # string errors are only parsed when they start with one of those module paths
    known_modules = (
        "pywps.exceptions",
        "owslib.wps",
        "weaver.exceptions",
        "weaver.owsexceptions",
    )
    err_code = None
    err_info = None
    # try to infer the cause from the first usable error, fallback to generic error otherwise
    for error in job.exceptions:
        try:
            if isinstance(error, dict):
                err_code = error.get("Code")
                err_info = error.get("Text")
            elif isinstance(error, str) and error.startswith(known_modules):
                # expected form '<module.path.ErrorClass>: <message>'
                err_code, err_info = error.split(":", 1)
                err_code = err_code.split(".")[-1].strip()
                err_info = err_info.strip()
        except Exception:
            err_code = None
        if err_code:
            break
    if not err_code:  # default
        err_code = OWSNoApplicableCode.code
        err_info = "unknown"

    # /req/core/job-results-failed
    raise HTTPBadRequest(json={
        "title": "JobResultsFailed",
        "type": err_code,
        "detail": "Job results not available because execution failed.",
        "status": HTTPBadRequest.code,
        "cause": err_info,
        "links": links
    })
def raise_job_dismissed(job, container=None):
    # type: (Job, Optional[AnySettingsContainer]) -> None
    """
    Raise the relevant error if the :term:`Job` has the dismissed status.

    Nothing happens for any other status.

    :param job: Job for which the status is verified.
    :param container: Application settings, employed to generate the job links reported in the error.
    :raise JobGone: if the job is dismissed, with JSON body that includes a subset of the job links.
    """
    if job.status != Status.DISMISSED:
        return

    # provide the job status links since it is still available for reference
    settings = get_settings(container)
    reported_rels = ["status", "alternate", "collection", "up"]
    job_links = [link for link in job.links(settings) if link["rel"] in reported_rels]
    raise JobGone(
        json={
            "title": "JobDismissed",
            "type": "JobDismissed",
            "status": JobGone.code,
            "detail": "Job was dismissed and artifacts have been removed.",
            "cause": {"status": job.status},
            "value": str(job.id),
            "links": job_links
        }
    )
def dismiss_job_task(job, container):
    # type: (Job, AnySettingsContainer) -> Job
    """
    Cancels the :mod:`Celery` task of a running job and removes the job artifacts.

    .. note::
        The :term:`Job` object itself is not deleted, only its artifacts.
        Therefore, its inputs, outputs, logs, exceptions, etc. are still available in the database,
        but corresponding files that would be exposed by ``weaver.wps_output`` configurations are removed.

    Removal of artifacts is best-effort. Failures to delete a file or directory are logged as warnings
    and do not interrupt the operation.

    :param job: Job to cancel or cleanup.
    :param container: Application settings.
    :return: Updated and dismissed job.
    :raise JobGone: if the job was already dismissed (raised by :func:`raise_job_dismissed`).
    """
    raise_job_dismissed(job, container)
    if job.status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]:
        # signal to stop celery task. Up to it to terminate remote if any.
        LOGGER.debug("Job [%s] dismiss operation: Canceling task [%s]", job.id, job.task_id)
        celery_app.control.revoke(job.task_id, terminate=True)

    wps_out_dir = get_wps_output_dir(container)
    job_out_dir = os.path.join(wps_out_dir, str(job.id))
    job_out_log = os.path.join(wps_out_dir, str(job.id) + ".log")
    job_out_xml = os.path.join(wps_out_dir, str(job.id) + ".xml")

    if os.path.isdir(job_out_dir):
        LOGGER.debug("Job [%s] dismiss operation: Removing output results.", job.id)

        def _warn_rmtree_failure(_func, path, exc_info):
            # 'path' is the specific entry that could not be removed, 'exc_info' is the 'sys.exc_info()' tuple
            LOGGER.warning(
                "Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, path, exc_info[1]
            )

        shutil.rmtree(job_out_dir, onerror=_warn_rmtree_failure)

    # individual files written next to the job output directory
    job_out_files = [
        (job_out_log, "Job [%s] dismiss operation: Removing output logs."),
        (job_out_xml, "Job [%s] dismiss operation: Removing output WPS status."),
    ]
    for out_file, debug_msg in job_out_files:
        if not os.path.isfile(out_file):
            continue
        LOGGER.debug(debug_msg, job.id)
        try:
            os.remove(out_file)
        except OSError as exc:
            LOGGER.warning("Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, out_file, exc)

    LOGGER.debug("Job [%s] dismiss operation: Updating job status.", job.id)
    store = get_db(container).get_store(StoreJobs)
    job.status_message = f"Job {Status.DISMISSED}."
    job.status = map_status(Status.DISMISSED)
    job = store.update_job(job)
    return job