import logging
import os
import re
import socket
from typing import TYPE_CHECKING
from urllib.error import HTTPError
from urllib.request import urlopen
from pyramid.httpexceptions import HTTPNotFound, HTTPOk
from pyramid_storage.extensions import resolve_extensions
from pywps.inout.formats import FORMATS, Format
from requests.exceptions import ConnectionError
if TYPE_CHECKING:
from typing import Dict, List, Optional, Tuple, Union
from weaver.typedefs import JSON
# Languages
# Individual language tags, followed by the collection grouping all of them.
[docs]ACCEPT_LANGUAGE_EN_CA = "en-CA"
[docs]ACCEPT_LANGUAGE_FR_CA = "fr-CA"
[docs]ACCEPT_LANGUAGE_EN_US = "en-US"
# NOTE(review): a frozenset has no defined iteration order, so listing 'en-US' first below does not by itself
# make it the first item seen by consumers iterating the collection -- confirm how callers select the default.
[docs]ACCEPT_LANGUAGES = frozenset([
ACCEPT_LANGUAGE_EN_US, # place first to match default of PyWPS and most existing remote servers
ACCEPT_LANGUAGE_EN_CA,
ACCEPT_LANGUAGE_FR_CA,
])
# Content-Types
# MIME-type nomenclature:
# <type> "/" [x- | <tree> "."] <subtype> ["+" suffix] *[";" parameter=value]
# Every string constant named 'CONTENT_TYPE_*' is also discovered by name further below (through 'locals()')
# in order to generate a default extension for it when none is otherwise known.
[docs]CONTENT_TYPE_APP_CWL = "application/x-cwl"
[docs]CONTENT_TYPE_APP_FORM = "application/x-www-form-urlencoded"
[docs]CONTENT_TYPE_APP_NETCDF = "application/x-netcdf"
[docs]CONTENT_TYPE_APP_GZIP = "application/gzip"
[docs]CONTENT_TYPE_APP_HDF5 = "application/x-hdf5"
[docs]CONTENT_TYPE_APP_OCTET_STREAM = "application/octet-stream"
[docs]CONTENT_TYPE_APP_TAR = "application/x-tar" # map to existing gzip for CWL
[docs]CONTENT_TYPE_APP_TAR_GZ = "application/tar+gzip" # map to existing gzip for CWL
[docs]CONTENT_TYPE_APP_YAML = "application/x-yaml"
[docs]CONTENT_TYPE_APP_ZIP = "application/zip"
[docs]CONTENT_TYPE_TEXT_HTML = "text/html"
[docs]CONTENT_TYPE_TEXT_PLAIN = "text/plain"
[docs]CONTENT_TYPE_APP_PDF = "application/pdf"
[docs]CONTENT_TYPE_APP_JSON = "application/json"
[docs]CONTENT_TYPE_APP_GEOJSON = "application/geo+json"
# NOTE(review): 'VDN' in the name below looks like a transposition of the 'vnd' tree prefix used in its value;
# the identifier is left unchanged since it may be referenced elsewhere.
[docs]CONTENT_TYPE_APP_VDN_GEOJSON = "application/vnd.geo+json"
[docs]CONTENT_TYPE_APP_XML = "application/xml"
# media-type carrying a 'subtype' parameter, distinct from the plain 'image/tiff' defined below
[docs]CONTENT_TYPE_IMAGE_GEOTIFF = "image/tiff; subtype=geotiff"
[docs]CONTENT_TYPE_IMAGE_JPEG = "image/jpeg"
[docs]CONTENT_TYPE_IMAGE_PNG = "image/png"
[docs]CONTENT_TYPE_IMAGE_TIFF = "image/tiff"
[docs]CONTENT_TYPE_MULTI_PART_FORM = "multipart/form-data"
[docs]CONTENT_TYPE_TEXT_XML = "text/xml"
# set grouping both XML media-type spellings (a set, not a string like the other constants)
[docs]CONTENT_TYPE_ANY_XML = {CONTENT_TYPE_APP_XML, CONTENT_TYPE_TEXT_XML}
[docs]CONTENT_TYPE_ANY = "*/*"
# Explicit mime-type to extension, for types whose extension is not literally the item written after '/'
# (excluding the 'x-' prefix). These entries seed the Content-Type to extension mapping built below.
[docs]_CONTENT_TYPE_EXTENSION_OVERRIDES = {
CONTENT_TYPE_APP_VDN_GEOJSON: ".geojson", # pywps 4.4 default extension without vdn prefix
CONTENT_TYPE_APP_NETCDF: ".nc",
CONTENT_TYPE_APP_GZIP: ".gz",
CONTENT_TYPE_APP_TAR_GZ: ".tar.gz",
CONTENT_TYPE_APP_YAML: ".yml",
CONTENT_TYPE_IMAGE_TIFF: ".tif", # common alternate to .tiff
CONTENT_TYPE_ANY: ".*", # any for glob
# types below are explicitly associated with an empty extension
CONTENT_TYPE_APP_OCTET_STREAM: "",
CONTENT_TYPE_APP_FORM: "",
CONTENT_TYPE_MULTI_PART_FORM: "",
}
# Content-Types filtered out of the Content-Type to format mapping when it is built below.
[docs]_CONTENT_TYPE_EXCLUDE = [
CONTENT_TYPE_APP_OCTET_STREAM,
CONTENT_TYPE_APP_FORM,
CONTENT_TYPE_MULTI_PART_FORM,
]
# Explicit extension to Content-Type associations, applied last over the generated reverse mapping.
[docs]_EXTENSION_CONTENT_TYPES_OVERRIDES = {
".tiff": CONTENT_TYPE_IMAGE_TIFF, # avoid defaulting to subtype geotiff
".yaml": CONTENT_TYPE_APP_YAML, # common alternative to .yml
}
# Content-Type to extension mapping, built incrementally by the statements below.
# The statement order matters: each step only fills what the previous ones did not define.
[docs]_CONTENT_TYPE_EXTENSION_MAPPING = {} # type: Dict[str, str]
# step 1: start from the explicit overrides, which therefore take precedence over everything added afterwards
_CONTENT_TYPE_EXTENSION_MAPPING.update(_CONTENT_TYPE_EXTENSION_OVERRIDES)
# extend with all known pywps formats
[docs]_CONTENT_TYPE_FORMAT_MAPPING = {
# content-types here are fully defined with extra parameters (e.g.: geotiff as subtype of tiff)
fmt.mime_type: fmt
for _, fmt in FORMATS._asdict().items() # noqa: W0212
if fmt.mime_type not in _CONTENT_TYPE_EXCLUDE
} # type: Dict[str, Format]
# back-propagate changes from new formats
# (step 2: only Content-Types not already present are added, so the overrides above are preserved)
_CONTENT_TYPE_EXTENSION_MAPPING.update({
ctype: fmt.extension
for ctype, fmt in _CONTENT_TYPE_FORMAT_MAPPING.items() # noqa: W0212
if ctype not in _CONTENT_TYPE_EXTENSION_MAPPING
})
# apply any remaining local types not explicitly or indirectly added by FORMATS
# The pattern captures, as 'ext', the first run of lowercase letters following '/' and an optional 'x-' prefix.
# NOTE(review): since only '[a-z]' is captured, a subtype containing digits or symbols is cut at the first such
# character (e.g.: 'x-hdf5' would yield 'hdf') -- confirm this is intended for every type that reaches this step.
[docs]_CONTENT_TYPE_EXT_PATTERN = re.compile(r"^[a-z]+/(x-)?(?P<ext>([a-z]+)).*$")
# step 3a: collect (Content-Type, regex match) pairs for every string variable named 'CONTENT_TYPE_*'
# that is neither excluded nor already known by one of the two mappings
# (the 'isinstance' check skips non-string values such as sets)
_CONTENT_TYPE_LOCALS_MISSING = [
(ctype, _CONTENT_TYPE_EXT_PATTERN.match(ctype))
for name, ctype in locals().items()
if name.startswith("CONTENT_TYPE_")
and isinstance(ctype, str)
and ctype not in _CONTENT_TYPE_EXCLUDE
and ctype not in _CONTENT_TYPE_FORMAT_MAPPING
and ctype not in _CONTENT_TYPE_EXTENSION_MAPPING
]
# step 3b: rebind the same name to (Content-Type, '.<ext>') pairs sorted by Content-Type,
# dropping entries for which the pattern did not match
[docs]_CONTENT_TYPE_LOCALS_MISSING = sorted(
[
(ctype, "." + re_ext["ext"])
for ctype, re_ext in _CONTENT_TYPE_LOCALS_MISSING if re_ext
],
key=lambda typ: typ[0]
)
# update and back-propagate generated local types
_CONTENT_TYPE_EXTENSION_MAPPING.update(_CONTENT_TYPE_LOCALS_MISSING)
# extend additional types
# FIXME: disabled for security reasons
# _CONTENT_TYPE_EXTENSION_MAPPING.update({
# ctype: ext
# for ext, ctype in mimetypes.types_map.items()
# if ctype not in _CONTENT_TYPE_EXCLUDE
# and ctype not in _CONTENT_TYPE_EXTENSION_MAPPING
# })
# step 4: create a format definition for each generated local type
# (the exclusion check repeats the filter already applied when the pairs were collected)
_CONTENT_TYPE_FORMAT_MAPPING.update({
ctype: Format(ctype, extension=ext)
for ctype, ext in _CONTENT_TYPE_LOCALS_MISSING
if ctype not in _CONTENT_TYPE_EXCLUDE
})
# step 5: create a format definition for any other mapped Content-Type still without one
# (excluded types are skipped, so they remain known only by the extension mapping)
_CONTENT_TYPE_FORMAT_MAPPING.update({
ctype: Format(ctype, extension=ext)
for ctype, ext in _CONTENT_TYPE_EXTENSION_MAPPING.items()
if ctype not in _CONTENT_TYPE_EXCLUDE
and ctype not in _CONTENT_TYPE_FORMAT_MAPPING
})
# Reverse mapping, from extension to Content-Type.
[docs]_EXTENSION_CONTENT_TYPES_MAPPING = {
# because the same extension can represent multiple distinct Content-Types,
# derive the simplest (shortest) one by default for guessing generic Content-Type
# (items are visited from the longest to the shortest Content-Type and a later item replaces
# an earlier one with the same extension, hence the shortest Content-Type is the one retained)
ext: ctype for ctype, ext in reversed(sorted(
_CONTENT_TYPE_EXTENSION_MAPPING.items(),
key=lambda typ_ext: len(typ_ext[0])
))
}
# explicit overrides are applied last, so they replace any generated entry for the same extension
_EXTENSION_CONTENT_TYPES_MAPPING.update(_EXTENSION_CONTENT_TYPES_OVERRIDES)
# top-level media types whose contents can be textual characters
# (used to decide whether a 'charset' parameter can be appended to a Content-Type)
[docs]_CONTENT_TYPE_CHAR_TYPES = [
"application",
"multipart",
"text",
]
# Redirect type resolution to semantically equivalent CWL validators.
# Should only be used to map the CWL 'format' field when the type is not already resolved through
# an existing IANA/EDAM reference.
[docs]_CONTENT_TYPE_SYNONYM_MAPPING = {
CONTENT_TYPE_APP_TAR: CONTENT_TYPE_APP_GZIP,
CONTENT_TYPE_APP_TAR_GZ: CONTENT_TYPE_APP_GZIP,
}
# Mappings for "CWL->File->Format"
# IANA contains most standard MIME-types, but might not include special ones
# (application/x-hdf5, application/x-netcdf, etc.).
# EDAM contains many field-specific schemas, but doesn't have an implicit URL definition
# (it uses 'format_<id>' instead).
# search:
# - IANA: https://www.iana.org/assignments/media-types/media-types.xhtml
# - EDAM-classes: http://bioportal.bioontology.org/ontologies/EDAM/?p=classes (section 'Format')
# - EDAM-browser: https://ifb-elixirfr.github.io/edam-browser/
# NOTE(review): 'IANA_NAMESPACE' and 'EDAM_NAMESPACE' are not defined in the visible portion of this file;
# presumably they are the namespace prefix strings -- confirm they are defined before these lines.
[docs]IANA_NAMESPACE_DEFINITION = {IANA_NAMESPACE: "https://www.iana.org/assignments/media-types/"}
[docs]EDAM_NAMESPACE_DEFINITION = {EDAM_NAMESPACE: "http://edamontology.org/"}
[docs]EDAM_SCHEMA = "http://edamontology.org/EDAM_1.24.owl"
# Content-Type to EDAM format identifier ('format_<id>')
[docs]EDAM_MAPPING = {
CONTENT_TYPE_APP_CWL: "format_3857",
CONTENT_TYPE_APP_HDF5: "format_3590",
CONTENT_TYPE_APP_JSON: "format_3464",
CONTENT_TYPE_APP_NETCDF: "format_3650",
CONTENT_TYPE_APP_YAML: "format_3750",
CONTENT_TYPE_TEXT_PLAIN: "format_1964",
}
# renderers output formats for OpenAPI generation
[docs]WPS_VERSION_100 = "1.0.0"
[docs]WPS_VERSION_200 = "2.0.0"
}
[docs]LOGGER = logging.getLogger(__name__)
def get_allowed_extensions():
    # type: () -> List[str]
    """
    Obtain the complete list of extensions that are permitted for processing by the application.

    The result combines the extensions resolved from the permitted extension groups with the extensions
    known by the local extension to Content-Type mapping (without their leading dot, and ignoring
    empty or glob-like entries).

    .. note::
        This is employed for security reasons. Files can still be specified with another allowed extension, but
        it will not automatically inherit properties applicable to scripts and executables.
        If a specific file type is refused due to its extension, a PR can be submitted to add it explicitly.

    :return: Permitted extensions, in no particular order.
    """
    # the "executables" and "scripts" groups are deliberately left out
    permitted_groups = (
        "archives",
        "audio",
        "data",
        "documents",
        # "executables",
        "images",
        # "scripts",
        "text",
        "video",
    )
    group_extensions = set(resolve_extensions("+".join(permitted_groups)))
    mapped_extensions = set()
    for known_ext in _EXTENSION_CONTENT_TYPES_MAPPING:
        # skip the empty extension and glob patterns such as '.*'
        if not known_ext or "*" in known_ext:
            continue
        mapped_extensions.add(known_ext[1:])  # drop the leading dot
    return list(group_extensions.union(mapped_extensions))
def get_extension(mime_type):
    # type: (str) -> str
    """
    Retrieves the extension corresponding to :paramref:`mime_type` if explicitly defined, or by parsing it.

    Resolution is attempted in the following order:

    1. extension of the format definition registered for the exact :paramref:`mime_type`
    2. non-empty extension mapped to the exact :paramref:`mime_type`
    3. extension mapped to the cleaned Content-Type (requested with parameters stripped)
    4. extension generated from the subtype of the cleaned Content-Type, without its leading ``x-`` prefix

    :param mime_type: Content-Type for which to find the extension.
    :return: Matched or generated extension, or an empty string if the cleaned Content-Type is empty.
    """
    fmt = _CONTENT_TYPE_FORMAT_MAPPING.get(mime_type)
    if fmt:
        return fmt.extension
    ext = _CONTENT_TYPE_EXTENSION_MAPPING.get(mime_type)
    if ext:
        return ext
    ctype = clean_mime_type_format(mime_type, strip_parameters=True)
    if not ctype:
        return ""
    subtype = ctype.split("/")[-1]
    # Only the leading 'x-' marker is removed. Replacing every occurrence would corrupt subtypes that
    # merely contain that sequence (e.g.: 'linux-foo' must not become 'linufoo').
    if subtype.startswith("x-"):
        subtype = subtype[len("x-"):]
    return _CONTENT_TYPE_EXTENSION_MAPPING.get(ctype, ".{}".format(subtype))
def get_content_type(extension, charset=None, default=None):
    # type: (str, Optional[str], Optional[str]) -> Optional[str]
    """
    Finds the Content-Type associated with a file extension, when one is known.

    :param extension: Extension to look up, with or without its leading dot.
    :param charset: Charset to set on the resolved Content-Type, only when the extension is matched.
    :param default: Value returned when the extension is empty or not matched.
    :return: Matched Content-Type (with the charset where applicable), or the default.
    """
    if not extension:
        return default
    dotted_ext = extension if extension.startswith(".") else f".{extension}"
    media_type = _EXTENSION_CONTENT_TYPES_MAPPING.get(dotted_ext)
    if not media_type:
        return default
    if not charset:
        return media_type
    # Media-Types normally have no parameters, but an explicit Content-Type could already define a charset.
    # In that case, substitute its value rather than appending a second one.
    if "charset=" in media_type:
        return re.sub(r"charset\=[A-Za-z0-9\_\-]+", f"charset={charset}", media_type)
    # never add a charset by mistake to a type that cannot represent characters
    char_prefixes = tuple(_type + "/" for _type in _CONTENT_TYPE_CHAR_TYPES)
    if media_type.startswith(char_prefixes):
        return f"{media_type}; charset={charset}"
    return media_type