Source code for weaver.processes.sources

import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import yaml
from pyramid.settings import asbool
from pyramid_celery import celery_app as app

from weaver import WEAVER_ROOT_DIR
from weaver.config import WEAVER_DEFAULT_DATA_SOURCES_CONFIG, get_weaver_config_file
from weaver.processes.constants import OPENSEARCH_LOCAL_FILE_SCHEME
from weaver.utils import get_settings
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
    from typing import Optional, Text

# Module-level cache of the data sources mapping; it is filled by ``fetch_data_sources``
# the first time a configuration is loaded, and reused by every later call.
DATA_SOURCES = {}
"""
Data sources configuration.

Unless explicitly overridden, the configuration will be loaded from file as specified by ``weaver.data_sources`` setting.
Following JSON schema format is expected (corresponding YAML also supported):

.. code-block:: json

    {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "Data Sources",
        "type": "object",
        "patternProperties": {
            ".*": {
                "type": "object",
                "required": [ "netloc", "ades" ],
                "additionalProperties": false,
                "properties": {
                    "netloc": {
                        "type": "string",
                        "description": "Net location of a data source url use to match this data source."
                    },
                    "ades": {
                        "type": "string",
                        "description": "ADES endpoint where the processing of this data source can occur."
                    },
                    "default": {
                        "type": "string",
                        "description": "True indicate that if no data source match this one should be used (Use the first default)."
                    }
                }
            }
        }
    }
"""
def fetch_data_sources():
    """
    Load the data sources configuration and cache it in the module-level ``DATA_SOURCES``.

    When ``DATA_SOURCES`` is already populated it is returned as is. Otherwise, the file referenced by the
    ``weaver.data_sources`` setting is resolved with ``get_weaver_config_file`` (relative results are joined
    to ``WEAVER_ROOT_DIR``) and parsed with ``yaml.safe_load``, which accepts both YAML and JSON content.

    :returns: the mapping of data source names to their definitions.
    :raises ValueError:
        if the file cannot be opened or parsed, if its content is not a mapping,
        or if no data source is available once loading is done.
    """
    global DATA_SOURCES  # pylint: disable=W0603,global-statement

    if DATA_SOURCES:
        return DATA_SOURCES

    data_source_config = get_settings(app).get("weaver.data_sources", "")
    if data_source_config:
        data_source_config = get_weaver_config_file(str(data_source_config), WEAVER_DEFAULT_DATA_SOURCES_CONFIG)
        if not os.path.isabs(data_source_config):
            data_source_config = os.path.normpath(os.path.join(WEAVER_ROOT_DIR, data_source_config))
        try:
            # explicit encoding so that parsing does not depend on the locale of the running process
            with open(data_source_config, encoding="utf-8") as f:
                loaded = yaml.safe_load(f)  # both JSON/YAML
        except Exception as exc:
            # keep the original error attached as the explicit cause of the reported one
            raise ValueError("Data sources file [{0}] cannot be loaded due to error: [{1!r}]."
                             .format(data_source_config, exc)) from exc
        if loaded and not isinstance(loaded, dict):
            # every consumer iterates the configuration with '.items()', so fail here with a clear message
            raise ValueError("Data sources file [{0}] must define a mapping of data sources, got [{1}]."
                             .format(data_source_config, type(loaded).__name__))
        if loaded:
            # only replace the cache with usable content (an empty file parses to 'None')
            DATA_SOURCES = loaded
    if not DATA_SOURCES:
        raise ValueError("No data sources found in setting 'weaver.data_sources'.")
    return DATA_SOURCES
def get_default_data_source(data_sources):
    """
    Select the name of the data source to fall back on.

    The first entry whose ``default`` property evaluates to true (as interpreted by ``asbool``) wins.
    When no entry is flagged, the first key of the mapping is returned instead.

    :param data_sources: mapping of data source names to their definitions.
    :returns: name (key) of the selected data source.
    """
    not_found = object()  # sentinel, so that any key value (even a falsy one) can be a valid result
    flagged_names = (
        name for name, definition in data_sources.items()
        if asbool(definition.get("default", False))
    )
    chosen = next(flagged_names, not_found)
    if chosen is not not_found:
        return chosen

    # nothing explicitly marked as default, use the first defined entry
    return next(iter(data_sources))
def retrieve_data_source_url(data_source):
    # type: (Optional[Text]) -> Text
    """
    Resolves the ADES URL that corresponds to a data source identifier.

    :param data_source: name of the data source to look for, or `None` to refer to the local instance.
    :returns:
        the ``ades`` value of the matched data source, the one of the 'default' data source when the
        identifier is unknown, or the current weaver WPS Rest API base URL if `None` was provided.
    """
    if data_source is None:
        # no identifier means the local instance is the target
        settings = get_settings(app)
        return get_wps_restapi_base_url(settings)

    known_sources = fetch_data_sources()
    if data_source in known_sources:
        selected = data_source
    else:
        selected = get_default_data_source(known_sources)
    return known_sources[selected]["ades"]
def get_data_source_from_url(data_url):
    """
    Finds the name of the data source that corresponds to the provided URL.

    URLs that carry a network location are matched against the ``netloc`` property of each data source.
    URLs without one, but using the ``OPENSEARCH_LOCAL_FILE_SCHEME`` scheme, are matched by checking whether
    their path starts with the ``rootdir`` property of a data source.

    Matching is best-effort: a URL that cannot be parsed, or entries that do not define the compared
    property, never raise. Such entries are skipped individually so that they cannot hide a valid match
    located further in the configuration.

    :param data_url: URL (or local file reference) of the data to look up.
    :returns: name of the first matching data source, or of the default one when nothing matches.
    """
    data_sources = fetch_data_sources()
    try:
        parsed = urlparse(data_url)
    except (AttributeError, TypeError, ValueError):
        # unsupported input type or malformed URL: nothing to match against, use the default
        parsed = None

    if parsed is not None:
        netloc, path, scheme = parsed.netloc, parsed.path, parsed.scheme
        if netloc:
            for src, val in data_sources.items():
                # entries without 'netloc' are ignored instead of aborting the whole search
                if isinstance(val, dict) and val.get("netloc") == netloc:
                    return src
        elif scheme == OPENSEARCH_LOCAL_FILE_SCHEME:
            # for file links, try to find if any rootdir matches in the file path
            for src, val in data_sources.items():
                rootdir = val.get("rootdir") if isinstance(val, dict) else None
                # 'rootdir' is optional, only compare against entries that define it as a string
                if isinstance(rootdir, str) and path.startswith(rootdir):
                    return src
    return get_default_data_source(data_sources)