import logging
import re
from collections import deque
from copy import deepcopy
from typing import TYPE_CHECKING
from urllib.parse import parse_qsl, urlparse
import shapely.wkt
from pyramid.httpexceptions import HTTPOk
from pyramid.settings import asbool
from weaver import xml_util
from weaver.formats import ContentType
from weaver.processes.constants import WPS_LITERAL, OpenSearchField
from weaver.processes.convert import normalize_ordered_io
from weaver.processes.sources import fetch_data_sources
from weaver.processes.utils import get_process_information
from weaver.utils import get_any_id, request_extra
if TYPE_CHECKING:
from typing import Deque, Dict, Iterable, Iterator, List, Optional, Tuple
from weaver.processes.convert import JSON_IO_Type, WPS_Input_Type
from weaver.typedefs import AnySettingsContainer, DataSourceOpenSearch, JSON
[docs]
LOGGER = logging.getLogger("PACKAGE")
def alter_payload_after_query(payload):
    # type: (JSON) -> JSON
    """
    When redeploying the package on :term:`ADES`, strip out any :term:`EOImage` parameter.

    :param payload: original deploy payload (not modified).
    :returns: deep copy of the payload with ``additionalParameters`` removed from EOImage inputs.
    """
    altered = deepcopy(payload)
    process_info = get_process_information(altered)
    for process_input in process_info["inputs"]:
        if EOImageDescribeProcessHandler.is_eoimage_input(process_input):
            del process_input["additionalParameters"]
    return altered
# Matches a plain "x1,y1,x2,y2" string of (possibly negative) decimal numbers.
WKT_BBOX_BOUNDS = re.compile(
    r"(?P<x1>(\-?\d+(\.\d+)?)),"
    r"(?P<y1>(\-?\d+(\.\d+)?)),"
    r"(?P<x2>(\-?\d+(\.\d+)?)),"
    r"(?P<y2>(\-?\d+(\.\d+)?))"
)


def load_wkt_bbox_bounds(wkt):
    # type: (str) -> str
    """
    Obtain the bounding-box bounds of a WKT definition as a comma-separated string of floats.

    Accepts either a raw ``x1,y1,x2,y2`` string or any WKT geometry (parsed with :mod:`shapely`).
    """
    if re.match(WKT_BBOX_BOUNDS, wkt):
        # already a plain bbox string: normalize each coordinate through float()
        return ",".join(str(float(part)) for part in wkt.split(","))
    # otherwise parse the full WKT geometry and use its computed bounds
    return ",".join(str(coord) for coord in shapely.wkt.loads(wkt).bounds)
def validate_bbox(bbox):
    # type: (str) -> None
    """
    Validate bounding box formatted as ``x1, y1, x2, y2`` string composed of floating point numbers.

    :raises ValueError: if the value cannot be parsed as exactly 4 comma-separated floats.
    """
    try:
        coords = [float(part) for part in bbox.split(",")]
    except (AttributeError, TypeError, ValueError):
        # non-string input or non-numeric parts
        raise ValueError(f"Could not parse bbox as a list of 4 floats: {bbox}")
    if len(coords) != 4:
        raise ValueError(f"Could not parse bbox as a list of 4 floats: {bbox}")
def replace_with_opensearch_scheme(link):
    # type: (str) -> str
    """
    Replaces ``file://`` scheme by ``opensearch://`` scheme.

    Links with any other scheme (or no scheme at all) are returned unchanged.
    """
    if urlparse(link).scheme != "file":
        return link
    # keep everything from the first ':' onward and swap only the scheme prefix
    return OpenSearchField.LOCAL_FILE_SCHEME + link[link.find(":"):]
class OpenSearchQuery(object):
    """
    Container to handle `OpenSearch` queries for a given collection.

    Resolves the search URL template from the OSDD document, then iterates paginated
    GeoJSON feature results and yields matching dataset links.
    """

    DEFAULT_MAX_QUERY_RESULTS = 5  # usually the default at the OpenSearch server too

    def __init__(
        self,
        collection_identifier,                      # type: str
        osdd_url,                                   # type: str
        catalog_search_field="parentIdentifier",    # type: str
        settings=None,                              # type: Optional[AnySettingsContainer]
    ):                                              # type: (...) -> None
        """
        Container to handle `OpenSearch` queries.

        :param collection_identifier: Collection ID to query
        :param osdd_url: Global OSDD url for `OpenSearch` queries.
        :param catalog_search_field: Name of the field for the collection identifier.
        :param settings: application settings to retrieve request options as necessary.
        :raises ValueError: if the collection identifier contains URL-breaking characters.
        """
        self.settings = settings
        self.collection_identifier = collection_identifier
        self.osdd_url = osdd_url
        # base parameters submitted with both the OSDD request and the search requests
        self.params = {
            catalog_search_field: collection_identifier,
            "httpAccept": "application/geo+json",
        }
        # validate inputs
        if any(c in "/?" for c in collection_identifier):
            raise ValueError(f"Invalid collection identifier: {collection_identifier}")

    def get_template_url(self):
        # type: () -> str
        """
        Fetch the OSDD document and return the parametrized search URL template it advertises.

        :raises HTTPError: if the OSDD request fails (propagated by ``raise_for_status``).
        """
        resp = request_extra("get", self.osdd_url, params=self.params, settings=self.settings)
        resp.raise_for_status()
        data = xml_util.fromstring(resp.content)
        # the 'Url' element with rel='results' holds the search template in its attributes
        xpath = "//*[local-name() = 'Url'][@rel='results']"
        url = data.xpath(xpath)[0]  # type: xml_util.XML
        return url.attrib["template"]

    def _prepare_query_url(self, template_url, params):
        # type: (str, Dict) -> Tuple[str, JSON]
        """
        Prepare the URL for the `OpenSearch` query.

        Template placeholders (values containing ``{...}``) are dropped unless supplied
        through ``params``; literal template values are kept as defaults.

        :param template_url: url containing query parameters.
        :param params: parameters to insert in formatted URL.
        :raises ValueError: if a parameter name is not present in the URL template.
        """
        base_url, query = template_url.split("?", 1)
        query_params = {}
        template_parameters = parse_qsl(query)
        # only parameter names advertised by the template are accepted
        allowed_names = {p[0] for p in template_parameters}
        for key, value in template_parameters:
            if "{" in value and "}" in value:
                # unfilled template placeholder, e.g. '{startDate}': omit unless caller provides it
                pass
            else:  # default value
                query_params[key] = value
        for key, value in params.items():
            if key not in allowed_names:
                raise ValueError(f"Key '{key}' is not an allowed query parameter.")
            query_params[key] = value
        if "maximumRecords" not in query_params:
            query_params["maximumRecords"] = self.DEFAULT_MAX_QUERY_RESULTS
        return base_url, query_params

    def _fetch_datatsets_from_alternates_links(self, alternate_links):
        # NOTE: method name typo ('datatsets') kept as-is to avoid breaking callers
        """
        Retrieve dataset link attributes from the Atom alternate representation of a feature.

        :param alternate_links: 'alternates' link objects from a feature's properties.
        :returns: attribute mappings of every ``<entry>/<link>`` element, or empty list if
            no ``application/atom+xml`` alternate link is available.
        """
        # Try loading from atom alternate link
        for link in alternate_links:
            if link["type"] == "application/atom+xml":
                resp = request_extra("get", link["href"], settings=self.settings)
                resp.raise_for_status()
                data = xml_util.fromstring(resp.content)
                xpath = "//*[local-name() = 'entry']/*[local-name() = 'link']"
                links = data.xpath(xpath)  # type: List[xml_util.XML]
                return [link.attrib for link in links]
        return []

    def _query_features_paginated(self, params):
        # type: (JSON) -> Iterator[Tuple[JSON, str]]
        """
        Iterates over paginated results until all features are retrieved.

        Yields ``(feature, request_url)`` pairs. Stops when a page request fails, when a
        page comes back empty, when 'totalResults' is reached, or when the requested
        'maximumRecords' count has been produced.

        :param params: query parameters
        """
        start_index = 1
        maximum_records = params.get("maximumRecords")
        template_url = self.get_template_url()
        base_url, query_params = self._prepare_query_url(template_url, params)
        while True:
            query_params["startRecord"] = start_index
            response = request_extra("get", base_url, params=query_params,
                                     intervals=list(range(1, 5)), allowed_codes=[HTTPOk.code],
                                     settings=self.settings)
            if response.status_code != 200:
                # request_extra exhausted retries without a 200: stop silently
                break
            json_body = response.json()
            features = json_body.get("features", [])
            for feature in features:
                yield feature, response.url
            n_received_features = len(features)
            n_received_so_far = start_index + n_received_features - 1  # index starts at 1
            total_results = json_body["totalResults"]
            if not n_received_features:
                break
            if n_received_so_far >= total_results:
                break
            if maximum_records and n_received_so_far >= maximum_records:
                break
            start_index += n_received_features

    def query_datasets(self, params, accept_schemes, accept_mime_types):
        # type: (JSON, List[str], List[str]) -> Iterator[str]
        """
        Query the specified datasets.

        Loop on every `OpenSearch` result feature and yield URL matching required mime-type and scheme.
        Log a warning if a feature cannot yield a valid URL (either no compatible mime-type or scheme)

        :param params: query parameters
        :param accept_schemes: only return links of this scheme
        :param accept_mime_types: list of accepted mime types, ordered by preference
        :raise KeyError: If the feature doesn't contain a json data section or an atom alternative link
        """
        if params is None:
            params = {}
        for feature, url in self._query_features_paginated(params):
            try:
                try:
                    data_links = feature["properties"]["links"]["data"]
                except KeyError:
                    # Try loading from atom alternate link
                    data_links = self._fetch_datatsets_from_alternates_links(
                        feature["properties"]["links"]["alternates"])
                data_links_mime_types = [d["type"] for d in data_links]
            except KeyError:
                LOGGER.exception("Badly formatted json at: [%s]", url)
                raise
            # mime-types are tried in caller's preference order; first match wins
            for mime_type in accept_mime_types:
                good_links = [data["href"]
                              for data in data_links
                              if data["type"] == mime_type and
                              urlparse(data["href"]).scheme in accept_schemes]
                if good_links:
                    yield good_links[0]
                    break
            else:
                # Do not raise an error right now, just loop until we reach the number of inputs we want
                # Raise only if that number isn't reach
                LOGGER.warning("Could not match any accepted mimetype [%s] to received mimetype [%s] using params %s",
                               ", ".join(accept_mime_types), ", ".join(data_links_mime_types), params)
def get_additional_parameters(input_data):
    # type: (JSON) -> List[Tuple[str, str]]
    """
    Retrieve the values from the ``additionalParameters`` of the input.

    :param input_data: Dict containing or not the "additionalParameters" key
    :returns: list of ``(name, values)`` pairs for every named parameter found.
    """
    found = []
    for extra_params in input_data.get("additionalParameters", []):
        # each group is a dict whose 'parameters' entry (if any) lists the actual parameters
        for param in extra_params.get("parameters", []):
            param_name = param.get("name", "")
            if param_name:
                found.append((param_name, param.get("values", [])))
    return found
[docs]
class EOImageDescribeProcessHandler(object):
def __init__(self, inputs):
# type: (List[JSON_IO_Type]) -> None
@staticmethod
    @staticmethod
    def get_allowed_collections(input_data):
        # type: (JSON) -> List[str]
        """
        Retrieve the collections allowed for this input, as declared through its
        'AllowedCollections' additional parameter (case-insensitive name match).

        :returns: list of collection identifiers, or empty list when not declared.
        """
        for name, value in get_additional_parameters(input_data):
            if name.upper() == "ALLOWEDCOLLECTIONS":
                # NOTE(review): assumes the matched value is a comma-separated string,
                #   while get_additional_parameters may also yield lists — confirm with payloads
                return value.split(",")
        return []
    @staticmethod
    def make_aoi(id_):
        # type: (str) -> JSON
        """
        Generate the WPS input definition of an Area-Of-Interest (bounding box in OGC-WKT).

        :param id_: ID to assign to the generated input.
        """
        data = {
            "id": id_,
            "title": "Area of Interest",
            "abstract": "Area of Interest (Bounding Box)",
            "formats": [{"mimeType": "OGC-WKT", "default": True}],
            "minOccurs": "1",
            "maxOccurs": "1",
            # tells the catalog query which search field this input maps onto
            "additionalParameters": [
                {
                    "role": "http://www.opengis.net/eoc/applicationContext/inputMetadata",
                    "parameters": [
                        {"name": "CatalogSearchField", "values": ["bbox"]}
                    ],
                }
            ],
        }
        return data
    @staticmethod
    def make_collection(identifier, allowed_values):  # pylint: disable=W0613
        # type: (str, List[str]) -> JSON
        """
        Generate the WPS input definition of a collection identifier.

        :param identifier: ID to assign to the generated input.
        :param allowed_values: currently unused (see pylint disable above).
        """
        description = "Collection of the data."
        data = {
            "id": str(identifier),
            "title": description,
            "abstract": description,
            "formats": [{"mimeType": ContentType.TEXT_PLAIN, "default": True}],
            "minOccurs": "1",
            "maxOccurs": "unbounded",
            "literalDataDomains": [{"dataType": {"name": "String"}}],
            # tells the catalog query which search field this input maps onto
            "additionalParameters": [
                {
                    "role": "http://www.opengis.net/eoc/applicationContext/inputMetadata",
                    "parameters": [
                        {
                            "name": "CatalogSearchField",
                            "values": ["parentIdentifier"],
                        }
                    ],
                }
            ],
        }
        return data
    @staticmethod
    def make_toi(id_, start_date=True):
        # type: (str, bool) -> JSON
        """
        Generate the Time-Of-Interest definition.

        :param id_: ID of the input.
        :param start_date: (Default value = True)
        """
        date = OpenSearchField.START_DATE if start_date else OpenSearchField.END_DATE
        # lower-case only the first character of the field name (e.g. 'StartDate' -> 'startDate')
        search_field = f"{date[0].lower()}{date[1:]}"
        data = {
            "id": id_,
            "title": "Time of Interest",
            "abstract": "Time of Interest (defined as Start date - End date)",
            "formats": [{"mimeType": ContentType.TEXT_PLAIN, "default": True}],
            "minOccurs": "1",
            "maxOccurs": "1",
            "literalDataDomains": [{"dataType": {"name": "String"}}],
            # tells the catalog query which search field this input maps onto
            "additionalParameters": [
                {
                    "role": "http://www.opengis.net/eoc/applicationContext/inputMetadata",
                    "parameters": [
                        {"name": "CatalogSearchField", "values": [search_field]}
                    ],
                }
            ],
        }
        return data
    def to_opensearch(self, unique_aoi, unique_toi):
        # type: (bool, bool) -> List[JSON]
        """
        Convert the inputs with `OpenSearch` request parameters considering Area-Of-Interest and Time-Of-Interest.

        :param unique_aoi: indicate if a single/global AOI must be applied or individual ones for each input.
        :param unique_toi: indicate if a single/global TOI must be applied or individual ones for each input.
        :returns: replacement TOI/AOI/collection literal inputs followed by the untouched non-EOImage inputs.
        """
        # NOTE(review): 'self.eoimage_inputs' and 'self.other_inputs' are expected to be
        #   populated by __init__ (not visible here) — confirm against the class constructor.
        if not self.eoimage_inputs:
            return self.other_inputs
        eoimage_names = [get_any_id(i) for i in self.eoimage_inputs]
        allowed_collections = [
            self.get_allowed_collections(i) for i in self.eoimage_inputs
        ]
        toi = []
        aoi = []
        collections = []
        if unique_toi:
            toi.append(self.make_toi(OpenSearchField.START_DATE, start_date=True))
            toi.append(self.make_toi(OpenSearchField.END_DATE, start_date=False))
        else:
            # one start/end date pair per EOImage input
            for name in eoimage_names:
                toi.append(
                    self.make_toi(
                        make_param_id(OpenSearchField.START_DATE, name), start_date=True
                    )
                )
                toi.append(
                    self.make_toi(
                        make_param_id(OpenSearchField.END_DATE, name), start_date=False
                    )
                )
        if unique_aoi:
            aoi.append(self.make_aoi(OpenSearchField.AOI))
        else:
            for name in eoimage_names:
                aoi.append(self.make_aoi(make_param_id(OpenSearchField.AOI, name)))
        # EOImage ids are replaced by their 'collection' counterparts
        eoimage_names = modified_collection_identifiers(eoimage_names)
        for name, allowed_col in zip(eoimage_names, allowed_collections):
            collections.append(self.make_collection(name, allowed_col))
        new_inputs = toi + aoi + collections
        # inputs must have the WPS input type
        for i in new_inputs:
            i["type"] = WPS_LITERAL
            i["data_type"] = "string"
        return new_inputs + self.other_inputs
def get_original_collection_id(payload, wps_inputs):
    # type: (JSON, Dict[str, deque[WPS_Input_Type]]) -> Dict[str, deque[WPS_Input_Type]]
    """
    Obtains modified WPS inputs considering mapping to known `OpenSearch` collection IDs.

    When we deploy a `Process` that contains `OpenSearch` parameters, the collection identifier is modified::

        Ex: files -> collection
        Ex: s2 -> collection_s2, probav -> collection_probav

    This function changes the ID in the execute request to the one from the deployed `Process` description.

    :param payload: deploy payload describing the process.
    :param wps_inputs: execute inputs keyed by their (renamed) collection ID.
    :returns: copy of ``wps_inputs`` with the original EOImage IDs restored.
    :raises ValueError: if a renamed collection input is missing from the execute inputs.
    """
    restored = deepcopy(wps_inputs)
    deploy_ids = [get_any_id(eo) for eo in get_eo_images_inputs_from_payload(payload)]
    # map the renamed (execute-side) ids back to the original (deploy-side) ids
    rename_map = dict(zip(modified_collection_identifiers(deploy_ids), deploy_ids))
    for execute_id, deploy_id in rename_map.items():
        if execute_id not in restored:
            raise ValueError(f"Missing required input parameter: {execute_id}")
        restored[deploy_id] = restored.pop(execute_id)
    return restored
def get_eo_images_data_sources(payload, wps_inputs):
    # type: (Dict, Dict[str, deque[WPS_Input_Type]]) -> Dict[str, DataSourceOpenSearch]
    """
    Resolve the data source of an ``EOImage`` input reference.

    :param payload: Deploy payload
    :param wps_inputs: Execute inputs
    :returns: Data source of the ``EOImage``.
    """
    eoimage_ids = [get_any_id(eo) for eo in get_eo_images_inputs_from_payload(payload)]
    # the first WPS input value holds the collection identifier used for the lookup
    return {eo_id: get_data_source(wps_inputs[eo_id][0].data) for eo_id in eoimage_ids}
def get_eo_images_mime_types(payload):
    # type: (JSON) -> Dict[str, List[str]]
    """
    Get the accepted media-types from the deployment payload.

    :param payload: Deploy payload.
    :returns: mapping of input ID to accepted media-types, default format listed first.
    """
    media_types = {}
    for eo_input in get_eo_images_inputs_from_payload(payload):
        # place the default format first; the sort is stable so other formats keep their order
        ordered_formats = sorted(
            eo_input["formats"],
            key=lambda fmt: fmt.get("default", False),
            reverse=True,
        )
        media_types[get_any_id(eo_input)] = [fmt["mimeType"] for fmt in ordered_formats]
    return media_types
def insert_max_occurs(payload, wps_inputs):
    # type: (JSON, Dict[str, Deque[WPS_Input_Type]]) -> None
    """
    Insert maxOccurs value in wps inputs using the deploy payload.

    :param payload: Deploy payload.
    :param wps_inputs: WPS inputs.
    """
    for eo_input in get_eo_images_inputs_from_payload(payload):
        try:
            wps_inputs[get_any_id(eo_input)][0].max_occurs = int(eo_input["maxOccurs"])
        except ValueError:
            # e.g. "unbounded" is not convertible to int; keep the existing value
            continue
def modified_collection_identifiers(eo_image_identifiers):
    # type: (List[str]) -> List[str]
    """
    Rename EOImage input IDs to their collection-parameter equivalents.

    A single EOImage input becomes the plain collection field; multiple inputs get
    individual ``<id>_<collection>`` names.
    """
    single_image = len(eo_image_identifiers) == 1
    return [
        OpenSearchField.COLLECTION if single_image else make_param_id(ident, OpenSearchField.COLLECTION)
        for ident in eo_image_identifiers
    ]
def get_data_source(collection_id):
    # type: (str) -> DataSourceOpenSearch
    """
    Obtain the applicable :term:`Data Source` based on the provided collection identifier.

    .. seealso::
        - :ref:`conf_data_sources`
        - :ref:`opensearch_data_source`

    :raises ValueError: if no matching nor default data source can be found.
    """
    data_sources = fetch_data_sources()
    # sentinel keeps sources without a 'collection_id' key from ever matching
    missing = object()
    for source in data_sources.values():
        if source.get("collection_id", missing) == collection_id:
            return source
    # specific collection id not found, try to return the default one
    try:
        return data_sources["opensearchdefault"]
    except KeyError:
        raise ValueError(f"No OSDD URL found in data sources for collection ID '{collection_id}'")
def get_eo_images_ids_from_payload(payload):
    # type: (JSON) -> List[str]
    """
    Extract the identifier of every EOImage input defined in the deploy payload.
    """
    eoimage_inputs = get_eo_images_inputs_from_payload(payload)
    return [get_any_id(eoimage) for eoimage in eoimage_inputs]
def make_param_id(param_name, identifier):
    # type: (str, str) -> str
    """
    Build a combined parameter ID by joining the name and identifier with an underscore.
    """
    return f"{param_name}_{identifier}"