import logging
import os
import sys
from importlib import import_module
from string import Template
from typing import TYPE_CHECKING
import yaml
from cwltool.command_line_tool import CommandLineTool
from cwltool.docker import DockerCommandLineJob
from cwltool.job import CommandLineJob, JobBase
from cwltool.singularity import SingularityCommandLineJob
from weaver import WEAVER_ROOT_DIR
from weaver.compat import cache
from weaver.database import get_db
from weaver.datatype import Process
from weaver.exceptions import PackageExecutionError, PackageNotFound, ProcessNotAccessible, ProcessNotFound
from weaver.execute import ExecuteControlOption
from weaver.processes.constants import CWL_REQUIREMENT_APP_BUILTIN
from weaver.processes.types import ProcessType
from weaver.processes.wps_package import get_process_definition
from weaver.store.base import StoreProcesses
from weaver.utils import clean_json_text_body, get_registry, ows_context_href
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_url
from weaver.wps_restapi.utils import get_wps_restapi_base_url
if TYPE_CHECKING:
from typing import Any, Callable, Dict, Optional, Type, Union
from cwltool.builder import Builder
from cwltool.context import RuntimeContext
from cwltool.pathmapper import PathMapper
from weaver.typedefs import AnySettingsContainer, CWL, CWL_RequirementsList, JSON, TypedDict
BuiltinResourceMap = TypedDict("BuiltinResourceMap", {
"package": os.PathLike[str],
"payload": JSON
}, total=True)
LOGGER = logging.getLogger(__name__)
[docs]
WEAVER_BUILTIN_DIR = os.path.abspath(os.path.dirname(__file__))
__all__ = [
"WEAVER_BUILTIN_DIR",
"BuiltinProcess",
"get_builtin_reference_mapping",
"register_builtin_processes"
]
@cache
[docs]
def get_builtin_reference_mapping(root=WEAVER_BUILTIN_DIR):
# type: (os.PathLike[str]) -> Dict[str, BuiltinResourceMap]
"""
Generates a mapping of `reference` to actual ``builtin`` package file path.
"""
builtin_names = [_pkg for _pkg in os.listdir(root) if os.path.splitext(_pkg)[-1] == ".cwl"]
refs = {
os.path.splitext(_pkg)[0]: {"package": os.path.join(root, _pkg), "payload": {}}
for _pkg in builtin_names
}
for builtin_ref in refs.values():
for ext in [".yml", ".yaml", ".json"]:
payload_ref = os.path.splitext(builtin_ref["package"])[0] + ext
if os.path.isfile(payload_ref):
with open(payload_ref, mode="r", encoding="utf-8") as f_payload:
builtin_ref["payload"] = yaml.safe_load(f_payload) or {}
break
return refs
def _get_builtin_metadata(process_id, process_path, meta_field, extra_info=None, extra_param=None, *, clean=False):
# type: (str, os.PathLike[str], str, Optional[Dict[str, Any]], Optional[str], Any, bool) -> Union[str, None]
"""
Retrieves the ``builtin`` process ``meta_field`` from its definition if it exists.
"""
py_file = f"{os.path.splitext(process_path)[0]}.py"
meta = None
if os.path.isfile(py_file):
try:
mod = import_module(f"{__name__}.{process_id}")
meta = getattr(mod, meta_field, None)
if meta and isinstance(meta, str):
return clean_json_text_body(meta) if clean else meta
except ImportError: # pragma: no cover
pass
if isinstance(extra_info, dict) and isinstance(extra_param, str):
meta = extra_info.get(extra_param)
if meta and isinstance(meta, str):
return clean_json_text_body(meta) if clean else meta
return None
def _replace_template(pkg, var, val):
# type: (Union[CWL, str], str, str) -> CWL
if isinstance(pkg, str):
return Template(pkg).safe_substitute({var: val}) # type: ignore
for key in pkg: # type: str
field = pkg[key] # type: ignore
if isinstance(field, list):
for i, _ in enumerate(field):
field[i] = _replace_template(field[i], var, val)
elif isinstance(field, (dict, str)):
pkg[key] = _replace_template(field, var, val) # type: ignore
return pkg
def _get_builtin_package(process_id, package):
# type: (str, CWL) -> CWL
"""
Updates the `CWL` with requirements to allow running a :data:`ProcessType.BUILTIN` process.
Following modifications are applied:
- Add `hints` section with :data:`CWL_REQUIREMENT_APP_BUILTIN`.
- Replace references to environment variable :data:`WEAVER_ROOT_DIR` as needed.
The `CWL` ``hints`` are employed to avoid error from the runner that doesn't know this requirement definition.
The ``hints`` can be directly in the package definition without triggering validation errors.
"""
if "hints" not in package:
package["hints"] = {}
package["hints"].update({CWL_REQUIREMENT_APP_BUILTIN: {"process": process_id}})
# FIXME:
# fix base directory of command until bug fixed:
# https://github.com/common-workflow-language/cwltool/issues/668
return _replace_template(package, "WEAVER_ROOT_DIR", WEAVER_ROOT_DIR)
[docs]
def register_builtin_processes(container):
# type: (AnySettingsContainer) -> None
"""
Registers every ``builtin`` CWL package to the processes database.
CWL definitions must be located within the :mod:`weaver.processes.builtin` module.
.. note::
Although any settings can be provided as input, it is better to specify a
:class:`pyramid.registry.Registry` or :class:`pyramid.request.Request` instance
in order to retrieve any pre-established database connection stored as reference.
Specifying configuration settings will force recreation of a new database connection.
"""
restapi_url = get_wps_restapi_base_url(container)
builtin_apps_mapping = get_builtin_reference_mapping(WEAVER_BUILTIN_DIR)
builtin_processes = []
for process_id, process_data in builtin_apps_mapping.items():
process_path = process_data["package"]
process_desc = process_data["payload"]
process_info = get_process_definition(process_desc, package=None, reference=process_path, builtin=True)
process_abstract = _get_builtin_metadata(
process_id, process_path, "__doc__", process_info, "description", clean=True
)
process_version = _get_builtin_metadata(process_id, process_path, "__version__", process_info, "version")
process_title = _get_builtin_metadata(process_id, process_path, "__title__", process_info, "title")
process_id_resolved = process_info["identifier"]
process_url = "/".join([restapi_url, "processes", process_id_resolved])
process_package = _get_builtin_package(process_id_resolved, process_info["package"])
process_payload = {
"processDescription": {
"process": {
"id": process_id_resolved,
"type": ProcessType.BUILTIN,
"title": process_title,
"version": process_version,
"abstract": process_abstract,
}
},
"deploymentProfileName": "http://www.opengis.net/profiles/eoc/builtinApplication",
"executionUnit": [{"unit": process_package}],
}
process_payload["processDescription"]["process"].update(ows_context_href(process_url))
builtin_processes.append(Process(
id=process_id_resolved,
type=ProcessType.BUILTIN,
title=process_title,
version=process_version,
abstract=process_abstract,
payload=process_payload,
package=process_package,
inputs=process_info["inputs"],
outputs=process_info["outputs"],
processDescriptionURL=process_url,
processEndpointWPS1=get_wps_url(container),
executeEndpoint="/".join([process_url, "jobs"]),
jobControlOptions=ExecuteControlOption.values(),
visibility=Visibility.PUBLIC,
))
# registration of missing/updated apps automatically applied with 'default_processes'
get_db(container).get_store(StoreProcesses, default_processes=builtin_processes)
class BuiltinProcessJobBase(CommandLineJob):
def __init__(self, builder, joborder, make_path_mapper, requirements, hints, name):
# type: (Builder, JSON, Callable[..., PathMapper], CWL_RequirementsList, CWL_RequirementsList, str) -> None
process_hints = [h for h in hints if "process" in h]
if not process_hints or len(process_hints) != 1:
raise PackageNotFound("Could not extract referenced process in job.")
self.process = process_hints[0]["process"]
super(BuiltinProcessJobBase, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
def _validate_process(self):
# type: () -> None
try:
registry = get_registry()
store = get_db(registry).get_store(StoreProcesses)
process = store.fetch_by_id(self.process) # raise if not found
except (ProcessNotAccessible, ProcessNotFound):
raise PackageNotFound(f"Cannot find '{ProcessType.BUILTIN}' package for process '{self.process}'")
if process.type != ProcessType.BUILTIN:
raise PackageExecutionError(f"Invalid package is not of type '{ProcessType.BUILTIN}'")
def _update_command(self):
# type: () -> None
if len(self.command_line) and self.command_line[0] == "python":
LOGGER.debug("Mapping generic builtin Python command to environment: [python] => [%s]", sys.executable)
self.command_line[0] = sys.executable
# pylint: disable=W0221,W0237 # naming using python like arguments
def run(self, runtime_context, **kwargs):
# type: (RuntimeContext, **Any) -> None
try:
self._validate_process()
self._update_command()
super(BuiltinProcessJobBase, self).run(runtime_context, **kwargs)
except Exception as err:
LOGGER.warning("Failed to run process:\n%s", err, exc_info=runtime_context.debug)
self.output_callback({}, "permanentFail")
class BuiltinProcessJobDocker(BuiltinProcessJobBase, DockerCommandLineJob):
pass
class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob):
pass
# pylint: disable=W0221,W0237 # naming using python like arguments
[docs]
class BuiltinProcess(CommandLineTool):
[docs]
def make_job_runner(self, runtime_context):
# type: (RuntimeContext) -> Type[JobBase]
job = super(BuiltinProcess, self).make_job_runner(runtime_context)
if issubclass(job, DockerCommandLineJob):
return BuiltinProcessJobDocker
if issubclass(job, SingularityCommandLineJob):
return BuiltinProcessJobSingularity
return BuiltinProcessJobBase