Source code for weaver.processes.sources

import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import yaml
from pyramid.settings import asbool

from weaver import WEAVER_ROOT_DIR
from weaver.config import WEAVER_DEFAULT_DATA_SOURCES_CONFIG, get_weaver_config_file
from weaver.processes.constants import OpenSearchField
from weaver.utils import get_settings
from weaver.wps_restapi.utils import get_wps_restapi_base_url

    from typing import Optional

    from weaver.typedefs import AnySettingsContainer, DataSourceConfig

# Module-level holder of the data sources configuration. It starts as an empty mapping and is
# reassigned by ``fetch_data_sources`` once a configuration file has been loaded; while it is
# non-empty, ``fetch_data_sources`` returns it directly without reading the file again.
# NOTE(review): the schema documented below sets ``additionalProperties`` to false and lists only
# ``netloc``, ``ades`` and ``default``, yet ``get_data_source_from_url`` also reads a ``rootdir``
# property for file links -- confirm whether the schema should declare it.
# NOTE(review): ``default`` is declared as a string in the schema and is interpreted with ``asbool``
# by ``get_default_data_source``.
[docs] DATA_SOURCES = {} # type: DataSourceConfig
""" Data sources configuration. Unless explicitly overridden, the configuration will be loaded from file as specified by``weaver.data_sources`` setting. Following JSON schema format is expected (corresponding YAML also supported): .. code-block:: json { "$schema": "", "title": "Data Sources", "type": "object", "patternProperties": { ".*": { "type": "object", "required": [ "netloc", "ades" ], "additionalProperties": false, "properties": { "netloc": { "type": "string", "description": "Net location of a data source url use to match this data source." }, "ades": { "type": "string", "description": "ADES endpoint where the processing of this data source can occur." }, "default": { "type": "string", "description": "True indicate that if no data source match this one should be used (Use the first default)." } } } } } """
def fetch_data_sources(container=None):
    # type: (Optional[AnySettingsContainer]) -> DataSourceConfig
    """
    Obtains the data sources configuration, loading it from file when not already available.

    If the module-level ``DATA_SOURCES`` is already populated, it is returned as is. Otherwise, the file
    referenced by the ``weaver.data_sources`` setting is resolved with ``get_weaver_config_file``, made absolute
    relative to ``WEAVER_ROOT_DIR`` when needed, parsed as YAML (which also covers JSON), stored in
    ``DATA_SOURCES`` and returned.

    :param container: object passed to ``get_settings`` to retrieve the application settings.
    :returns: mapping of data source names to their configuration.
    :raises ValueError:
        if the file cannot be read or parsed, if its content is not a mapping,
        or if no data source could be found.
    """
    global DATA_SOURCES  # pylint: disable=W0603,global-statement

    if DATA_SOURCES:
        return DATA_SOURCES

    settings = get_settings(container) or {}
    data_source_config = settings.get("weaver.data_sources", "")
    # parse into a local first, so that the shared global is only replaced by a validated configuration
    loaded = None
    if data_source_config:
        data_source_config = get_weaver_config_file(str(data_source_config), WEAVER_DEFAULT_DATA_SOURCES_CONFIG)
        if not os.path.isabs(data_source_config):
            data_source_config = os.path.normpath(os.path.join(WEAVER_ROOT_DIR, data_source_config))
        try:
            with open(data_source_config, mode="r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)  # both JSON/YAML
        except Exception as exc:
            # keep the original error attached as the explicit cause of the reported one
            raise ValueError(
                f"Data sources file [{data_source_config}] cannot be loaded due to error: [{exc!r}]."
            ) from exc
        # other functions iterate the configuration with '.items()', so anything but a mapping is unusable
        if loaded and not isinstance(loaded, dict):
            raise ValueError(
                f"Data sources file [{data_source_config}] must define a mapping of data sources, "
                f"but contains [{type(loaded).__name__}]."
            )
    if not loaded:
        raise ValueError("No data sources found in setting 'weaver.data_sources'. Data source required for EMS.")
    DATA_SOURCES = loaded
    return DATA_SOURCES
def get_default_data_source(data_sources):
    # type: (DataSourceConfig) -> str
    """
    Selects the name of the data source to use when no specific one applies.

    The first data source whose ``default`` property evaluates to true (according to ``asbool``) is selected.
    When none is flagged as such, the first data source of the mapping is selected instead.

    :param data_sources: mapping of data source names to their configuration.
    :returns: name of the selected data source.
    """
    flagged = (name for name, config in data_sources.items() if asbool(config.get("default", False)))
    for name in flagged:
        # only the first flagged entry matters
        return name
    # nothing flagged as default, fall back to the first entry
    return next(iter(data_sources))
def retrieve_data_source_url(data_source, container=None):
    # type: (Optional[str], Optional[AnySettingsContainer]) -> str
    """
    Resolves the URL that corresponds to a data source identifier.

    :param data_source: identifier of the data source to look up, or ``None`` to refer to the local instance.
    :param container: object passed to ``get_settings`` to retrieve the application settings.
    :returns:
        the ``ades`` value of the matched data source, the one of the default data source when the identifier
        is unknown, or the current weaver WPS Rest API base URL when the identifier is ``None``.
    """
    settings = get_settings(container) or {}

    # without an identifier, the local instance is the data source
    if data_source is None:
        return get_wps_restapi_base_url(settings)

    known_sources = fetch_data_sources(settings)
    if data_source in known_sources:
        selected = data_source
    else:
        selected = get_default_data_source(known_sources)
    return known_sources[selected]["ades"]
def get_data_source_from_url(data_url, container=None):
    # type: (str, Optional[AnySettingsContainer]) -> str
    """
    Finds the name of the data source that matches the provided URL.

    A URL with a network location is compared against the ``netloc`` property of each data source.
    A URL without one, but using the ``OpenSearchField.LOCAL_FILE_SCHEME`` scheme, matches the first data source
    whose ``rootdir`` property is a prefix of the URL path. Data sources that do not define the relevant property
    are skipped, so that an incomplete entry does not prevent matching the ones that follow it.

    :param data_url: URL or file link of the data to locate.
    :param container: object passed to ``fetch_data_sources`` to obtain the configuration.
    :returns:
        name of the matched data source, or the one returned by ``get_default_data_source`` when nothing
        matches or when the URL cannot be processed.
    """
    data_sources = fetch_data_sources(container)
    try:
        parsed = urlparse(data_url)
        if parsed.netloc:
            for src, val in data_sources.items():
                # 'parsed.netloc' is non-empty here, so an entry without 'netloc' can never compare equal
                if val.get("netloc") == parsed.netloc:
                    return src
        elif parsed.scheme == OpenSearchField.LOCAL_FILE_SCHEME:
            # for file links, try to find if any rootdir matches in the file path
            for src, val in data_sources.items():
                root_dir = val.get("rootdir")
                # an undefined or empty 'rootdir' is skipped, since an empty prefix would match every path
                if root_dir and parsed.path.startswith(root_dir):
                    return src
    except Exception:  # noqa: W0703 # nosec: B110
        # deliberate best-effort: a malformed URL or entry falls back to the default data source
        pass
    return get_default_data_source(data_sources)