Stores to read/write data to from/to `MongoDB` using pymongo.

import logging
from typing import TYPE_CHECKING

import pymongo
from pymongo import ASCENDING, DESCENDING
from pyramid.request import Request
from pywps import Process as ProcessWPS

from weaver.datatype import Bill, Job, Process, Quote, Service
from weaver.exceptions import (
from weaver.execute import EXECUTE_MODE_ASYNC, EXECUTE_MODE_SYNC
from weaver.sort import (
from weaver.status import JOB_STATUS_CATEGORIES, STATUS_ACCEPTED, map_status
from import StoreBills, StoreJobs, StoreProcesses, StoreQuotes, StoreServices
from weaver.utils import get_base_url, get_sane_name, get_weaver_url, islambda, now
from weaver.wps.utils import get_wps_url

    from typing import Any, Callable, Dict, List, Optional, Tuple, Union
    from pymongo.collection import Collection

    from import JobCategoriesAndCount, JobListAndCount
    from weaver.typedefs import AnyProcess, AnyProcessType

[docs]LOGGER = logging.getLogger(__name__)
[docs]class MongodbStore(object): """ Base class extended by all concrete store implementations. """ def __init__(self, collection, sane_name_config=None): # type: (Collection, Optional[Dict[str, Any]]) -> None if not isinstance(collection, pymongo.collection.Collection): raise TypeError("Collection not of expected type.") self.collection = collection # type: Collection self.sane_name_config = sane_name_config or {} @classmethod
[docs] def get_args_kwargs(cls, *args, **kwargs): # type: (*Any, **Any) -> Tuple[Tuple, Dict] """ Filters :class:`MongodbStore`-specific arguments to safely pass them down its ``__init__``. """ collection = None if len(args): collection = args[0] elif "collection" in kwargs: # pylint: disable=R1715 collection = kwargs["collection"] sane_name_config = kwargs.get("sane_name_config", None) return tuple([collection]), {"sane_name_config": sane_name_config}
[docs]class MongodbServiceStore(StoreServices, MongodbStore): """ Registry for OWS services. Uses `MongoDB` to store service url and attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreServices.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_service(self, service, overwrite=True): # type: (Service, bool) -> Service """ Stores an OWS service in mongodb. """ service_url = get_base_url(service.url) # check if service is already registered if self.collection.count_documents({"url": service_url}) > 0: if overwrite: self.collection.delete_one({"url": service_url}) else: raise ServiceRegistrationError("service url already registered.") service_name = get_sane_name(, **self.sane_name_config) if self.collection.count_documents({"name": service_name}) > 0: if overwrite: self.collection.delete_one({"name": service_name}) else: raise ServiceRegistrationError("service name already registered.") self.collection.insert_one(Service( url=service_url, name=service_name, type=service.type, public=service.public, auth=service.auth).params()) return self.fetch_by_url(url=service_url)
[docs] def delete_service(self, name): # type: (str) -> bool """ Removes service from `MongoDB` storage. """ self.collection.delete_one({"name": name}) return True
[docs] def list_services(self): # type: () -> List[Service] """ Lists all services in `MongoDB` storage. """ my_services = [] for service in self.collection.find().sort("name", pymongo.ASCENDING): my_services.append(Service(service)) return my_services
[docs] def fetch_by_name(self, name, visibility=None): # type: (str, Optional[str]) -> Service """ Gets service for given ``name`` from `MongoDB` storage. """ service = self.collection.find_one({"name": name}) if not service: raise ServiceNotFound("Service '{}' could not be found.".format(name)) service = Service(service) same_visibility = (service.public and visibility == VISIBILITY_PUBLIC) or \ (not service.public and visibility == VISIBILITY_PRIVATE) if visibility is not None and not same_visibility: raise ServiceNotAccessible("Service '{}' cannot be accessed.".format(name)) return service
[docs] def fetch_by_url(self, url): # type: (str) -> Service """ Gets service for given ``url`` from `MongoDB` storage. """ service = self.collection.find_one({"url": get_base_url(url)}) if not service: raise ServiceNotFound return Service(service)
[docs] def clear_services(self): # type: () -> bool """ Removes all OWS services from `MongoDB` storage. """ self.collection.drop() return True
[docs]class MongodbProcessStore(StoreProcesses, MongodbStore): """ Registry for processes. Uses `MongoDB` to store processes and attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreProcesses.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs) registry = kwargs.get("registry") default_processes = kwargs.get("default_processes") self.settings = kwargs.get("settings", {}) if not registry else registry.settings self.default_host = get_weaver_url(self.settings) self.default_wps_endpoint = get_wps_url(self.settings) # enforce default process re-registration to receive any applicable update if default_processes: registered_processes = [process.identifier for process in self.list_processes()] for process in default_processes: process_name = self._get_process_id(process) if process_name in registered_processes: self.delete_process(process_name) self._add_process(process)
[docs] def _add_process(self, process): # type: (AnyProcess) -> None new_process = Process.convert(process, processEndpointWPS1=self.default_wps_endpoint) if not isinstance(new_process, Process): raise ProcessInstanceError("Unsupported process type '{}'".format(type(process))) # apply defaults if not specified new_process["type"] = self._get_process_type(process) new_process["identifier"] = self._get_process_id(process) new_process["processEndpointWPS1"] = self._get_process_endpoint_wps1(process) new_process["visibility"] = new_process.visibility self.collection.insert_one(new_process.params())
[docs] def _get_process_field(process, function_dict): # type: (AnyProcess, Union[Dict[AnyProcessType, Callable[[], Any]], Callable[[], Any]]) -> Any """ Takes a lambda expression or a dict of process-specific lambda expressions to retrieve a field. Validates that the passed process object is one of the supported types. :param process: process to retrieve the field from. :param function_dict: lambda or dict of lambda of process type :return: retrieved field if the type was supported :raises ProcessInstanceError: invalid process type """ if isinstance(process, Process): if islambda(function_dict): return function_dict() return function_dict[Process]() elif isinstance(process, ProcessWPS): if islambda(function_dict): return function_dict() return function_dict[ProcessWPS]() else: raise ProcessInstanceError("Unsupported process type '{}'".format(type(process)))
[docs] def _get_process_id(self, process): # type: (AnyProcess) -> str return self._get_process_field(process, lambda: process.identifier)
[docs] def _get_process_type(self, process): # type: (AnyProcess) -> str return self._get_process_field(process, { Process: lambda: process.type, ProcessWPS: lambda: getattr(process, "type", PROCESS_WPS_LOCAL)
[docs] def _get_process_endpoint_wps1(self, process): # type: (AnyProcess) -> str url = self._get_process_field(process, {Process: lambda: process.processEndpointWPS1, ProcessWPS: lambda: None}) if not url: url = self.default_wps_endpoint return url
[docs] def save_process(self, process, overwrite=True): # type: (Union[Process, ProcessWPS], bool) -> Process """ Stores a process in storage. :param process: An instance of :class:`weaver.datatype.Process`. :param overwrite: Overwrite the matching process instance by name if conflicting. """ process_id = self._get_process_id(process) sane_name = get_sane_name(process_id, **self.sane_name_config) if self.collection.count_documents({"identifier": sane_name}) > 0: if overwrite: self.collection.delete_one({"identifier": sane_name}) else: raise ProcessRegistrationError("Process '{}' already registered.".format(sane_name)) process.identifier = sane_name # must use property getter/setter to match both 'Process' types self._add_process(process) return self.fetch_by_id(sane_name)
[docs] def delete_process(self, process_id, visibility=None): # type: (str, Optional[str]) -> bool """ Removes process from database, optionally filtered by visibility. If ``visibility=None``, the process is deleted (if existing) regardless of its visibility value. """ sane_name = get_sane_name(process_id, **self.sane_name_config) process = self.fetch_by_id(sane_name, visibility=visibility) if not process: raise ProcessNotFound("Process '{}' could not be found.".format(sane_name)) return bool(self.collection.delete_one({"identifier": sane_name}).deleted_count)
[docs] def list_processes(self, visibility=None): # type: (Optional[str]) -> List[Process] """ Lists all processes in database, optionally filtered by `visibility`. :param visibility: One value amongst `weaver.visibility`. """ db_processes = [] search_filters = {} if visibility is None: visibility = VISIBILITY_VALUES if isinstance(visibility, str): visibility = [visibility] for v in visibility: if v not in VISIBILITY_VALUES: raise ValueError("Invalid visibility value '{0!s}' is not one of {1!s}" .format(v, list(VISIBILITY_VALUES))) search_filters["visibility"] = {"$in": list(visibility)} for process in self.collection.find(search_filters).sort("identifier", pymongo.ASCENDING): db_processes.append(Process(process)) return db_processes
[docs] def fetch_by_id(self, process_id, visibility=None): # type: (str, Optional[str]) -> Process """ Get process for given `process_id` from storage, optionally filtered by `visibility`. If ``visibility=None``, the process is retrieved (if existing) regardless of its visibility value. :param process_id: process identifier :param visibility: one value amongst `weaver.visibility`. :return: An instance of :class:`weaver.datatype.Process`. """ sane_name = get_sane_name(process_id, **self.sane_name_config) process = self.collection.find_one({"identifier": sane_name}) if not process: raise ProcessNotFound("Process '{}' could not be found.".format(sane_name)) process = Process(process) if visibility is not None and process.visibility != visibility: raise ProcessNotAccessible("Process '{}' cannot be accessed.".format(sane_name)) return process
[docs] def get_visibility(self, process_id): # type: (str) -> str """ Get `visibility` of a process. :return: One value amongst `weaver.visibility`. """ process = self.fetch_by_id(process_id) return process.visibility
[docs] def set_visibility(self, process_id, visibility): # type: (str, str) -> None """ Set `visibility` of a process. :param visibility: One value amongst `weaver.visibility`. :param process_id: :raises TypeError: when :paramref:`visibility` is not :class:`str`. :raises ValueError: when :paramref:`visibility` is not one of :py:data:`weaver.visibility.VISIBILITY_VALUES`. """ process = self.fetch_by_id(process_id) process.visibility = visibility self.save_process(process, overwrite=True)
[docs] def clear_processes(self): # type: () -> bool """ Clears all processes from the store. """ self.collection.drop() return True
[docs]class MongodbJobStore(StoreJobs, MongodbStore): """ Registry for process jobs tracking. Uses `MongoDB` to store job attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreJobs.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_job(self, task_id, # type: str process, # type: str service=None, # type: Optional[str] inputs=None, # type: Optional[List[Any]] is_workflow=False, # type: bool is_local=False, # type: bool user_id=None, # type: Optional[int] execute_async=True, # type: bool custom_tags=None, # type: Optional[List[str]] access=None, # type: Optional[str] notification_email=None, # type: Optional[str] accept_language=None, # type: Optional[str] ): # type: (...) -> Job """ Stores a job in mongodb. """ try: tags = ["dev"] tags.extend(list(filter(lambda t: bool(t), custom_tags or []))) # remove empty tags if is_workflow: tags.append(PROCESS_WORKFLOW) else: tags.append(PROCESS_APPLICATION) if execute_async: tags.append(EXECUTE_MODE_ASYNC) else: tags.append(EXECUTE_MODE_SYNC) if not access: access = VISIBILITY_PRIVATE new_job = Job({ "task_id": task_id, "user_id": user_id, "service": service, # provider identifier (WPS service) "process": process, # process identifier (WPS request) "inputs": inputs, "status": map_status(STATUS_ACCEPTED), "execute_async": execute_async, "is_workflow": is_workflow, "is_local": is_local, "created": now(), "tags": list(set(tags)), # remove duplicates "access": access, "notification_email": notification_email, "accept_language": accept_language, }) self.collection.insert_one(new_job.params()) job = self.fetch_by_id( except Exception as ex: raise JobRegistrationError("Error occurred during job registration: [{}]".format(repr(ex))) if job is None: raise JobRegistrationError("Failed to retrieve registered job.") return job
[docs] def update_job(self, job): # type: (Job) -> Job """ Updates a job parameters in `MongoDB` storage. :param job: instance of ``weaver.datatype.Job``. """ try: result = self.collection.update_one({"id":}, {"$set": job.params()}) if result.acknowledged and result.matched_count == 1: return self.fetch_by_id( except Exception as ex: raise JobUpdateError("Error occurred during job update: [{}]".format(repr(ex))) raise JobUpdateError("Failed to update specified job: '{}'".format(str(job)))
[docs] def delete_job(self, job_id): # type: (str) -> bool """ Removes job from `MongoDB` storage. """ self.collection.delete_one({"id": job_id}) return True
[docs] def fetch_by_id(self, job_id): # type: (str) -> Job """ Gets job for given ``job_id`` from `MongoDB` storage. """ job = self.collection.find_one({"id": job_id}) if not job: raise JobNotFound("Could not find job matching: '{}'".format(job_id)) return Job(job)
[docs] def list_jobs(self): # type: () -> List[Job] """ Lists all jobs in `MongoDB` storage. For user-specific access to available jobs, use :meth:`MongodbJobStore.find_jobs` instead. """ jobs = [] for job in self.collection.find().sort("id", ASCENDING): jobs.append(Job(job)) return jobs
[docs] def find_jobs(self, process=None, # type: Optional[str] service=None, # type: Optional[str] tags=None, # type: Optional[List[str]] access=None, # type: Optional[str] notification_email=None, # type: Optional[str] status=None, # type: Optional[str] sort=None, # type: Optional[str] page=0, # type: int limit=10, # type: int group_by=None, # type: Optional[Union[str, List[str]]] request=None, # type: Optional[Request] ): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount] """ Finds all jobs in `MongoDB` storage matching search filters to obtain results with requested paging or grouping. :param request: request that lead to this call to obtain permissions and user id. :param process: process name to filter matching jobs. :param service: service name to filter matching jobs. :param tags: list of tags to filter matching jobs. :param access: access visibility to filter matching jobs (default: :py:data:`VISIBILITY_PUBLIC`). :param notification_email: notification email to filter matching jobs. :param status: status to filter matching jobs. :param sort: field which is used for sorting results (default: creation date, descending). :param page: page number to return when using result paging (only when not using ``group_by``). :param limit: number of jobs per page when using result paging (only when not using ``group_by``). :param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled). :returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job Example: Using paging (default), result will be in the form:: ( [Job(1), Job(2), Job(3), ...], <total> ) Where ``<total>`` will indicate the complete count of matched jobs with filters, but the list of jobs will be limited only to ``page`` index and ``limit`` specified. Using grouping with a list of field specified with ``group_by``, results will be in the form:: ( [{category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>}, {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>}, ... ], <total> ) Where ``<total>`` will again indicate all matched jobs by every category combined, and ``<count>`` will indicate the amount of jobs matched for each individual category. Also, ``category`` will indicate values of specified fields (from ``group_by``) that compose corresponding jobs with matching values. """ if any(v in tags for v in VISIBILITY_VALUES): raise ValueError("Visibility values not acceptable in 'tags', use 'access' instead.") search_filters = {} if not request: search_filters.setdefault("access", VISIBILITY_PUBLIC) else: if request.has_permission("admin") and access in VISIBILITY_VALUES: search_filters["access"] = access else: user_id = request.authenticated_userid if user_id is not None: search_filters["user_id"] = user_id if access in VISIBILITY_VALUES: search_filters["access"] = access else: search_filters["access"] = VISIBILITY_PUBLIC if tags: search_filters["tags"] = {"$all": tags} if status in JOB_STATUS_CATEGORIES: search_filters["status"] = {"$in": JOB_STATUS_CATEGORIES[status]} elif status: search_filters["status"] = status if notification_email is not None: search_filters["notification_email"] = notification_email if process is not None: search_filters["process"] = process if service is not None: search_filters["service"] = service if sort is None: sort = SORT_CREATED elif sort == SORT_USER: sort = "user_id" if sort not in JOB_SORT_VALUES: raise JobNotFound("Invalid sorting method: '{}'".format(repr(sort))) sort_order = DESCENDING if sort in (SORT_FINISHED, SORT_CREATED) else ASCENDING sort_criteria = {sort: sort_order} # minimal operation, only search for matches and sort them pipeline = [{"$match": search_filters}, {"$sort": sort_criteria}] # results by group categories if group_by: group_by = [group_by] if isinstance(group_by, str) else group_by # type: List[str] group_categories = {field: "$" + field for field in group_by} # fields that can generate groups pipeline.extend([{ "$group": { "_id": group_categories, # grouping categories to aggregate corresponding jobs "jobs": {"$push": "$$ROOT"}, # matched jobs for corresponding grouping categories "count": {"$sum": 1}}, # count of matches for corresponding grouping categories }, { # noqa: E123 # ignore indentation checks "$project": { "_id": False, # removes "_id" field from results "category": "$_id", # renames "_id" grouping categories key "jobs": "$jobs", # preserve field "count": "$count", # preserve field } }]) found = self.collection.aggregate(pipeline) items = [{k: (v if k != "jobs" else [Job(j) for j in v]) # convert to Job object where applicable for k, v in i.items()} for i in found] # results with paging else: pipeline.extend([{"$skip": page * limit}, {"$limit": limit}]) found = self.collection.aggregate(pipeline) items = [Job(item) for item in list(found)] total = self.collection.count_documents(search_filters) return items, total
[docs] def clear_jobs(self): # type: () -> bool """ Removes all jobs from `MongoDB` storage. """ self.collection.drop() return True
[docs]class MongodbQuoteStore(StoreQuotes, MongodbStore): """ Registry for quotes. Uses `MongoDB` to store quote attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreQuotes.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_quote(self, quote): # type: (Quote) -> Quote """ Stores a quote in mongodb. """ if not isinstance(quote, Quote): raise QuoteInstanceError("Invalid quote object: '{}'".format(repr(quote))) try: self.collection.insert_one(quote.params()) quote = self.fetch_by_id( except Exception as ex: raise QuoteRegistrationError("Error occurred during quote registration: [{}]".format(repr(ex))) if quote is None: raise QuoteRegistrationError("Failed to retrieve registered quote.") return quote
[docs] def fetch_by_id(self, quote_id): # type: (str) -> Quote """ Gets quote for given ``quote_id`` from `MongoDB` storage. """ quote = self.collection.find_one({"id": quote_id}) if not quote: raise QuoteNotFound("Could not find quote matching: '{}'".format(quote_id)) return Quote(quote)
[docs] def list_quotes(self): # type: (...) -> List[Quote] """ Lists all quotes in `MongoDB` storage. """ quotes = [] for quote in self.collection.find().sort("id", ASCENDING): quotes.append(Quote(quote)) return quotes
[docs] def find_quotes(self, process_id=None, page=0, limit=10, sort=None): # type: (Optional[str], int, int, Optional[str]) -> Tuple[List[Quote], int] """ Finds all quotes in `MongoDB` storage matching search filters. Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited to a maximum per page, but ``count`` always indicate the `total` number of matches. """ search_filters = {} if isinstance(process_id, str): search_filters["process"] = process_id if sort is None: sort = SORT_ID if sort not in QUOTE_SORT_VALUES: raise QuoteNotFound("Invalid sorting method: '{!s}'".format(sort)) sort_order = ASCENDING sort_criteria = [(sort, sort_order)] found = self.collection.find(search_filters) count = found.count() items = [Quote(item) for item in list(found.skip(page * limit).limit(limit).sort(sort_criteria))] return items, count
[docs]class MongodbBillStore(StoreBills, MongodbStore): """ Registry for bills. Uses `MongoDB` to store bill attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreBills.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_bill(self, bill): # type: (Bill) -> Bill """ Stores a bill in mongodb. """ if not isinstance(bill, Bill): raise BillInstanceError("Invalid bill object: '{}'".format(repr(bill))) try: self.collection.insert_one(bill.params()) bill = self.fetch_by_id( except Exception as ex: raise BillRegistrationError("Error occurred during bill registration: [{}]".format(repr(ex))) if bill is None: raise BillRegistrationError("Failed to retrieve registered bill.") return Bill(bill)
[docs] def fetch_by_id(self, bill_id): # type: (str) -> Bill """ Gets bill for given ``bill_id`` from `MongoDB` storage. """ bill = self.collection.find_one({"id": bill_id}) if not bill: raise BillNotFound("Could not find bill matching: '{}'".format(bill_id)) return Bill(bill)
[docs] def list_bills(self): # type: (...) -> List[Bill] """ Lists all bills in `MongoDB` storage. """ bills = [] for bill in self.collection.find().sort("id", ASCENDING): bills.append(Bill(bill)) return bills
[docs] def find_bills(self, quote_id=None, page=0, limit=10, sort=None): # type: (Optional[str], int, int, Optional[str]) -> Tuple[List[Bill], int] """ Finds all bills in `MongoDB` storage matching search filters. Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited to a maximum per page, but ``count`` always indicate the `total` number of matches. """ search_filters = {} if isinstance(quote_id, str): search_filters["quote"] = quote_id if sort is None: sort = SORT_ID if sort not in BILL_SORT_VALUES: raise BillNotFound("Invalid sorting method: '{}'".format(repr(sort))) sort_order = ASCENDING sort_criteria = [(sort, sort_order)] found = self.collection.find(search_filters) count = found.count() items = [Bill(item) for item in list(found.skip(page * limit).limit(limit).sort(sort_criteria))] return items, count