weaver.store.mongodb
====================

.. py:module:: weaver.store.mongodb

.. autoapi-nested-parse::

   Stores to read/write data from/to `MongoDB` using :mod:`pymongo`.


Module Contents
---------------

.. py:data:: MongodbValue

.. py:data:: LOGGER

.. py:class:: MongodbStore(collection: pymongo.collection.Collection, sane_name_config: Optional[Dict[str, Any]] = None)

   Base class extended by all concrete store implementations.

   .. py:attribute:: collection
      :type: pymongo.collection.Collection

   .. py:attribute:: sane_name_config

   .. py:method:: get_args_kwargs(*args: Any, **kwargs: Any) -> Tuple[Tuple, Dict]
      :classmethod:

      Filters :class:`MongodbStore`-specific arguments so that they can be safely passed down to its ``__init__``.

.. py:class:: MongodbServiceStore(*args: Any, **kwargs: Any)

   Registry for OWS services. Uses `MongoDB` to store service URL and attributes.

   .. py:method:: save_service(service: weaver.datatype.Service, overwrite: bool = True) -> weaver.datatype.Service

      Stores an OWS service in `MongoDB` storage.

   .. py:method:: delete_service(name: str) -> bool

      Removes service from `MongoDB` storage.

   .. py:method:: list_services() -> List[weaver.datatype.Service]

      Lists all services in `MongoDB` storage.

   .. py:method:: fetch_by_name(name: str, visibility: Optional[weaver.visibility.AnyVisibility] = None) -> weaver.datatype.Service

      Gets service for given ``name`` from `MongoDB` storage.

   .. py:method:: fetch_by_url(url: str) -> weaver.datatype.Service

      Gets service for given ``url`` from `MongoDB` storage.

   .. py:method:: clear_services() -> bool

      Removes all OWS services from `MongoDB` storage.

.. py:class:: ListingMixin

   .. py:method:: _apply_paging_pipeline(page: Optional[int], limit: Optional[int]) -> List[MongodbAggregateStep]
      :staticmethod:

   .. py:method:: _apply_sort_method(sort_field: Optional[weaver.sort.AnySortType], sort_default: weaver.sort.AnySortType, sort_allowed: List[weaver.sort.AnySortType]) -> MongodbAggregateSortOrder
      :staticmethod:

   .. py:method:: _apply_total_result(search_pipeline: MongodbAggregatePipeline, extra_pipeline: MongodbAggregatePipeline) -> MongodbAggregatePipeline
      :staticmethod:

      Extends the pipeline operations in order to obtain the grand total of matches in parallel to other filtering.

      A dual-branch search pipeline is created to apply distinct operations on each facet.
      The initial search is executed only once for both facets.
      The first facet obtains the results with the other specified processing steps,
      and the second calculates the total number of results.

      The result of the aggregation pipeline following this operation will be returned in the following format:

      .. code-block:: python

         [{
             "items": [ MatchedDocument, MatchedDocument, ... ],
             "total": int
         }]

      :param search_pipeline: pipeline employed to obtain initial matches against search filters.
      :param extra_pipeline: additional steps to generate specific results.
      :return: pipeline combining the grand total of all matched items with their processed representation.

.. py:class:: MongodbProcessStore(*args: Any, **kwargs: Any)

   Registry for processes. Uses `MongoDB` to store processes and attributes.

   .. py:attribute:: settings

   .. py:attribute:: default_host

   .. py:attribute:: default_wps_endpoint

   .. py:method:: _register_defaults(processes: List[weaver.datatype.Process]) -> None

      Default process registration to apply definition updates with duplicate entry handling.

   .. py:method:: _add_process(process: weaver.typedefs.AnyProcess, upsert: bool = False) -> None

      Stores the specified process in the database.

      The operation assumes that any conflicting or duplicate process definition was pre-validated.
      Parameter ``upsert=True`` can be employed to allow exact replacement while ignoring duplicate errors.
      When using ``upsert=True``, it is assumed that, whichever outcome arises (insert, update or duplicate error),
      the final stored process is identical in each case.

      .. note::
         Parameter ``upsert=True`` is useful when initializing the storage with default processes,
         since multi-threaded or multi-worker applications that all try adding builtin processes
         at around the same moment can sporadically generate clashing inserts.

   .. py:method:: _get_process_field(process: weaver.typedefs.AnyProcess, function_dict: Union[Dict[weaver.typedefs.AnyProcessClass, Callable[[], Any]], Callable[[], Any]]) -> Any
      :staticmethod:

      Obtains a field from a process instance after validation, using a mapping of process implementation functions.

      Takes a lambda expression or a dict of process-specific lambda expressions to retrieve a field.
      Validates that the passed process object is one of the supported types.

      :param process: process to retrieve the field from.
      :param function_dict: lambda or dict of lambda of process type.
      :return: retrieved field if the type was supported.
      :raises ProcessInstanceError: invalid process type.

   .. py:method:: _get_process_id(process: Union[weaver.typedefs.AnyProcessRef, weaver.typedefs.AnyProcess]) -> str

   .. py:method:: _get_process_type(process: weaver.typedefs.AnyProcess) -> weaver.processes.types.AnyProcessType

   .. py:method:: _get_process_endpoint_wps1(process: weaver.typedefs.AnyProcess) -> str

   .. py:method:: save_process(process: Union[weaver.typedefs.AnyProcessRef, weaver.datatype.Process, pywps.Process], overwrite: bool = True) -> weaver.datatype.Process

      Stores a process in storage.

      :param process: An instance of :class:`weaver.datatype.Process`.
      :param overwrite: Overwrite the matching process instance by name if conflicting.

   .. py:method:: delete_process(process_id: str, visibility: Optional[weaver.visibility.Visibility] = None) -> bool

      Removes process from database, optionally filtered by visibility.

      If ``visibility=None``, the process is deleted (if existing) regardless of its visibility value.

   .. py:method:: list_processes(visibility: Optional[Union[weaver.visibility.AnyVisibility, List[weaver.visibility.AnyVisibility]]] = None, page: Optional[int] = None, limit: Optional[int] = None, sort: Optional[weaver.sort.AnySortType] = None, total: bool = False, revisions: bool = False, process: Optional[str] = None) -> Union[List[weaver.datatype.Process], Tuple[List[weaver.datatype.Process], int]]

      Lists all processes in the database, optionally filtered by ``visibility``.

      :param visibility: One or many values amongst :class:`Visibility`.
      :param page: Page number to return when using result paging.
      :param limit: Number of processes per page when using result paging.
      :param sort: Field which is used for sorting results (default: process ID, descending).
      :param total: Request the total number of processes to be calculated (ignoring paging).
      :param revisions: Include all process revisions instead of only the latest ones.
      :param process: Limit results only to the specified process ID (mostly useful when combined with revisions).
      :returns:
          List of sorted, and possibly page-filtered, processes matching queries.
          If ``total`` was requested, return a tuple of this list and the number of processes.

   .. py:method:: _get_revision_search(process_id: str) -> Tuple[MongodbAggregateExpression, Optional[str]]

      Obtains the search criteria for the specified :term:`Process` ID, and its version if a revision tag was specified.

      :return: Database search operation and the matched version as a string, if any.

   .. py:method:: fetch_by_id(process_id: weaver.typedefs.AnyProcessRef, visibility: Optional[weaver.visibility.AnyVisibility] = None) -> weaver.datatype.Process

      Get process for given :paramref:`process_id` from storage, optionally filtered by :paramref:`visibility`.

      If ``visibility=None``, the process is retrieved (if existing) regardless of its visibility value.

      :param process_id: Process identifier (optionally with version tag).
      :param visibility: One value amongst :py:mod:`weaver.visibility`.
      :return: An instance of :class:`weaver.datatype.Process`.

   .. py:method:: find_versions(process_id: weaver.typedefs.AnyProcessRef, version_format: weaver.utils.VersionFormat = VersionFormat.OBJECT) -> List[weaver.typedefs.AnyVersion]

      Retrieves all existing versions of a given process.

   .. py:method:: update_version(process_id: weaver.typedefs.AnyProcessRef, version: weaver.typedefs.AnyVersion) -> weaver.datatype.Process

      Updates the specified (latest) process ID to become an older revision.

      .. seealso::
         Use :meth:`revert_latest` for the inverse operation.

      :returns: Updated process definition with older revision.

   .. py:method:: revert_latest(process_id: weaver.typedefs.AnyProcessRef) -> weaver.datatype.Process

      Makes the specified (older) revision process the new latest revision.

      Assumes there is no active *latest* revision in storage. If one is still defined, it will generate a conflict.
      The process ID must also contain a tagged revision. Failing to provide a version will fail the operation.

      .. seealso::
         Use :meth:`update_version` for the inverse operation.

      :returns: Updated process definition as the new latest revision.

   .. py:method:: get_estimator(process_id: weaver.typedefs.AnyProcessRef) -> weaver.typedefs.JSON

      Get `estimator` of a process.

   .. py:method:: set_estimator(process_id: weaver.typedefs.AnyProcessRef, estimator: weaver.typedefs.JSON) -> None

      Set `estimator` of a process.

   .. py:method:: get_visibility(process_id: weaver.typedefs.AnyProcessRef) -> weaver.visibility.Visibility

      Get `visibility` of a process.

      :returns: One value amongst :py:mod:`weaver.visibility`.

   .. py:method:: set_visibility(process_id: weaver.typedefs.AnyProcessRef, visibility: weaver.visibility.AnyVisibility) -> None

      Set `visibility` of a process.

      :param process_id: Process identifier.
      :param visibility: One value amongst :py:mod:`weaver.visibility`.
      :raises TypeError: when :paramref:`visibility` is not :class:`str`.
      :raises ValueError: when :paramref:`visibility` is not one of :class:`Visibility`.

   .. py:method:: clear_processes() -> bool

      Clears all processes from the store.

.. py:class:: MongodbJobStore(*args: Any, **kwargs: Any)

   Registry for process jobs tracking. Uses `MongoDB` to store job attributes.

   .. py:method:: save_job(task_id: weaver.typedefs.AnyUUID, process: weaver.typedefs.AnyProcessRef, service: Optional[weaver.typedefs.AnyServiceRef] = None, inputs: Optional[weaver.typedefs.ExecutionInputs] = None, outputs: Optional[weaver.typedefs.ExecutionOutputs] = None, is_workflow: bool = False, is_local: bool = False, execute_mode: Optional[weaver.execute.AnyExecuteMode] = None, execute_wait: Optional[int] = None, execute_response: Optional[weaver.execute.AnyExecuteResponse] = None, execute_return: Optional[weaver.execute.AnyExecuteReturnPreference] = None, custom_tags: Optional[List[str]] = None, user_id: Optional[int] = None, access: Optional[weaver.visibility.AnyVisibility] = None, context: Optional[str] = None, subscribers: Optional[weaver.typedefs.ExecutionSubscribers] = None, accept_type: Optional[str] = None, accept_language: Optional[str] = None, created: Optional[datetime.datetime] = None, status: Optional[weaver.status.AnyStatusType] = None) -> weaver.datatype.Job

      Creates a new :class:`Job` and stores it in `MongoDB` storage.

   .. py:method:: batch_update_jobs(job_filter: Dict[str, Any], job_update: Dict[str, Any]) -> int

      Updates the specified fields of jobs matched against filters.

      :param job_update: Fields and values to update on matched jobs.
      :param job_filter: Fields to filter jobs to be updated.
      :return: Number of affected jobs.

   .. py:method:: update_job(job: weaver.datatype.Job) -> weaver.datatype.Job

      Updates the parameters of a job in `MongoDB` storage.

      :param job: instance of :class:`weaver.datatype.Job`.

   .. py:method:: delete_job(job_id: weaver.typedefs.AnyUUID) -> bool

      Removes job from `MongoDB` storage.

   .. py:method:: fetch_by_id(job_id: weaver.typedefs.AnyUUID) -> weaver.datatype.Job

      Gets job for given ``job_id`` from `MongoDB` storage.

   .. py:method:: list_jobs() -> List[weaver.datatype.Job]

      Lists all jobs in `MongoDB` storage.

      For user-specific access to available jobs, use :meth:`MongodbJobStore.find_jobs` instead.

   .. py:method:: find_jobs(process: Optional[str] = None, service: Optional[str] = None, job_type: Optional[str] = None, tags: Optional[List[str]] = None, access: Optional[str] = None, status: Optional[Union[weaver.status.AnyStatusSearch, List[weaver.status.AnyStatusSearch]]] = None, sort: Optional[weaver.sort.AnySortType] = None, page: Optional[int] = 0, limit: Optional[int] = 10, min_duration: Optional[int] = None, max_duration: Optional[int] = None, datetime_interval: Optional[weaver.store.base.DatetimeIntervalType] = None, group_by: Optional[Union[str, List[str]]] = None, request: Optional[pyramid.request.Request] = None) -> weaver.store.base.JobSearchResult

      Finds all jobs in `MongoDB` storage matching search filters to obtain results with requested paging or grouping.

      Using paging (default), results will be in the following form:

      .. code-block:: python

         (
             [Job(1), Job(2), Job(3), ...],
             <total>
         )

      Where ``<total>`` will indicate the complete count of jobs matched with filters, but the list of jobs
      will be limited only to the ``page`` index and ``limit`` specified.

      Limit and paging can be disabled by setting them to ``None``.
      Paging must always be combined with limit, but limit can be employed by itself.

      Using grouping with a list of fields specified with ``group_by``, results will be in the following form:

      .. code-block:: python

         (
             [
                 {category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>},
                 {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>},
                 ...
             ],
             <total>
         )

      Where ``<total>`` will again indicate all matched jobs by every category combined, and ``<count>`` will
      indicate the amount of jobs matched for each individual category. Also, ``category`` will indicate the values
      of the specified fields (from ``group_by``) shared by the corresponding jobs of that group.
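
      For illustration only, both result forms can be approximated with plain aggregation pipelines built as
      Python lists of stages. The sketch below is not Weaver's implementation. The helpers ``paging_pipeline``
      and ``grouping_pipeline`` are hypothetical. Sorting on a ``created`` field is an assumption standing in
      for the default creation-date order. The paging variant reuses the ``items``/``total`` facets documented
      for :meth:`ListingMixin._apply_total_result`.

      .. code-block:: python

         from typing import Any, Dict, List, Optional, Union

         Stage = Dict[str, Any]


         def paging_pipeline(search: List[Stage], page: Optional[int], limit: Optional[int]) -> List[Stage]:
             """Hypothetical helper: one page of matches plus the grand total, in a single aggregation."""
             if page is not None and limit is None:
                 # as documented above, a page index is meaningless without a limit
                 raise ValueError("paging must be combined with a limit")
             # assumption: newest jobs first, sorting on a 'created' field
             items: List[Stage] = [{"$sort": {"created": -1}}]
             if limit is not None:
                 if page:  # page index starts at 0, so only later pages skip documents
                     items.append({"$skip": page * limit})
                 items.append({"$limit": limit})
             return search + [
                 # both branches consume the same matched documents, so the search runs only once
                 {"$facet": {"items": items, "total": [{"$count": "count"}]}},
                 # turn [{"count": N}] into N, or into 0 when nothing matched
                 {"$project": {
                     "items": "$items",
                     "total": {"$ifNull": [{"$arrayElemAt": ["$total.count", 0]}, 0]},
                 }},
             ]


         def grouping_pipeline(search: List[Stage], group_by: Union[str, List[str]]) -> List[Stage]:
             """Hypothetical helper: one entry per distinct combination of ``group_by`` values."""
             fields = [group_by] if isinstance(group_by, str) else list(group_by)
             return search + [
                 {"$group": {
                     "_id": {field: f"${field}" for field in fields},
                     "jobs": {"$push": "$$ROOT"},  # keep the full matched documents of each category
                     "count": {"$sum": 1},
                 }},
                 {"$project": {"_id": False, "category": "$_id", "jobs": True, "count": True}},
             ]

      With :mod:`pymongo`, either list could then be handed to ``collection.aggregate(pipeline)``. The paging
      variant would yield a single document holding the page of ``items`` and the grand ``total``.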

      :param request: request that led to this call, used to obtain permissions and user ID.
      :param process: process name to filter matching jobs.
      :param service: service name to filter matching jobs.
      :param job_type: filter matching jobs for given type.
      :param tags: list of tags to filter matching jobs.
      :param access: access visibility to filter matching jobs (default: :py:data:`Visibility.PUBLIC`).
      :param status: status to filter matching jobs.
      :param sort: field which is used for sorting results (default: creation date, descending).
      :param page: page number to return when using result paging (only when not using ``group_by``).
      :param limit: number of jobs (per page or total) when using result paging (only when not using ``group_by``).
      :param min_duration: minimum duration (seconds) between started time and current/finished time of jobs to find.
      :param max_duration: maximum duration (seconds) between started time and current/finished time of jobs to find.
      :param datetime_interval: field used for filtering data by creation date with a given date or interval of dates.
      :param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled).
      :returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched jobs.

   .. py:method:: _find_jobs_grouped(pipeline: MongodbAggregatePipeline, group_categories: List[str]) -> Tuple[weaver.store.base.JobGroupCategory, int]

      Retrieves jobs regrouped by specified field categories and predefined search pipeline filters.

   .. py:method:: _find_jobs_paging(search_pipeline: MongodbAggregatePipeline, page: Optional[int], limit: Optional[int]) -> Tuple[List[weaver.datatype.Job], int]

      Retrieves jobs limited by specified paging parameters and predefined search pipeline filters.

   .. py:method:: _apply_tags_filter(tags: Optional[Union[str, List[str]]]) -> MongodbAggregateExpression
      :staticmethod:

   .. py:method:: _apply_access_filter(access: weaver.visibility.AnyVisibility, request: pyramid.request.Request) -> MongodbAggregateExpression
      :staticmethod:

   .. py:method:: _apply_ref_or_type_filter(job_type: Optional[str], process: Optional[str], service: Optional[str]) -> MongodbAggregateExpression
      :staticmethod:

   .. py:method:: _apply_status_filter(status: Optional[Union[weaver.status.AnyStatusSearch, List[weaver.status.AnyStatusSearch]]]) -> MongodbAggregateExpression
      :staticmethod:

   .. py:method:: _apply_datetime_filter(datetime_interval: Optional[weaver.store.base.DatetimeIntervalType]) -> MongodbAggregateExpression
      :staticmethod:

   .. py:method:: _apply_duration_filter(pipeline: MongodbAggregatePipeline, min_duration: Optional[int], max_duration: Optional[int]) -> MongodbAggregatePipeline
      :staticmethod:

      Generates the filter required for comparing against :meth:`Job.duration`.

      Assumes that the first item of the pipeline is ``$match``, since steps must be applied before and after it.
      The pipeline is modified in place and returned as well.

   .. py:method:: clear_jobs() -> bool

      Removes all jobs from `MongoDB` storage.

.. py:class:: MongodbQuoteStore(*args: Any, **kwargs: Any)

   Registry for quotes. Uses `MongoDB` to store quote attributes.

   .. py:method:: _apply_quote(quote: weaver.datatype.Quote, override: bool = False) -> weaver.datatype.Quote

   .. py:method:: save_quote(quote: weaver.datatype.Quote) -> weaver.datatype.Quote

      Stores a quote in `MongoDB` storage.

   .. py:method:: update_quote(quote: weaver.datatype.Quote) -> weaver.datatype.Quote

      Updates quote parameters in `MongoDB` storage.

   .. py:method:: fetch_by_id(quote_id: weaver.typedefs.AnyUUID) -> weaver.datatype.Quote

      Gets quote for given ``quote_id`` from `MongoDB` storage.

   .. py:method:: list_quotes() -> List[weaver.datatype.Quote]

      Lists all quotes in `MongoDB` storage.

   .. py:method:: find_quotes(process_id: Optional[str] = None, page: int = 0, limit: int = 10, sort: Optional[weaver.sort.AnySortType] = None) -> Tuple[List[weaver.datatype.Quote], int]

      Finds all quotes in `MongoDB` storage matching search filters.

      Returns a tuple of filtered ``items`` and their ``total``, where ``items`` can have paging and be limited
      to a maximum per page, but ``total`` always indicates the total number of matches, regardless of paging.

.. py:class:: MongodbBillStore(*args, **kwargs)

   Registry for bills. Uses `MongoDB` to store bill attributes.

   .. py:method:: save_bill(bill: weaver.datatype.Bill) -> weaver.datatype.Bill

      Stores a bill in `MongoDB` storage.

   .. py:method:: fetch_by_id(bill_id: str) -> weaver.datatype.Bill

      Gets bill for given ``bill_id`` from `MongoDB` storage.

   .. py:method:: list_bills() -> List[weaver.datatype.Bill]

      Lists all bills in `MongoDB` storage.

   .. py:method:: find_bills(quote_id: Optional[str] = None, page: int = 0, limit: int = 10, sort: Optional[weaver.sort.AnySortType] = None) -> Tuple[List[weaver.datatype.Bill], int]

      Finds all bills in `MongoDB` storage matching search filters.

      Returns a tuple of filtered ``items`` and their ``total``, where ``items`` can have paging and be limited
      to a maximum per page, but ``total`` always indicates the total number of matches, regardless of paging.

.. py:class:: MongodbVaultStore(*args: Any, **kwargs: Any)

   Registry for vault files. Uses `MongoDB` to store vault file attributes.

   .. py:method:: get_file(file_id: weaver.typedefs.AnyUUID, nothrow: bool = False) -> Optional[weaver.datatype.VaultFile]

      Gets vault file for given ``file_id`` from `MongoDB` storage.

      :raises VaultFileNotFound: If the file does not exist and :paramref:`nothrow` was not requested.
      :returns: Found file if it exists, or ``None`` if it does not exist and :paramref:`nothrow` was requested.

   .. py:method:: save_file(file: weaver.datatype.VaultFile) -> None

      Stores a vault file in `MongoDB` storage.

   .. py:method:: delete_file(file: Union[weaver.datatype.VaultFile, weaver.typedefs.AnyUUID]) -> bool

      Removes vault file from `MongoDB` storage.
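
   The following sketch illustrates two documented behaviours. The first is the ``nothrow`` contract of
   :meth:`get_file`. The second is the file-or-identifier argument accepted by :meth:`delete_file`. It rests on
   several assumptions. The functions are hypothetical free-standing helpers rather than the store's methods.
   Documents are plain dicts instead of :class:`weaver.datatype.VaultFile` instances. Files are assumed to be
   indexed by their stringified UUID under an ``id`` field. ``collection`` is anything behaving like
   :class:`pymongo.collection.Collection`.

   .. code-block:: python

      import uuid
      from typing import Any, Dict, Optional, Union


      class VaultFileNotFound(Exception):
          """Local stand-in for the error documented by ``get_file``."""


      def get_file(collection: Any, file_id: Union[uuid.UUID, str], nothrow: bool = False) -> Optional[Dict[str, Any]]:
          # assumption: vault files are stored with their stringified UUID under "id"
          doc = collection.find_one({"id": str(file_id)})
          if doc is None and not nothrow:
              raise VaultFileNotFound(f"Vault file '{file_id}' does not exist.")
          return doc  # None only when the file is missing and 'nothrow' was requested


      def delete_file(collection: Any, file: Union[Dict[str, Any], uuid.UUID, str]) -> bool:
          # accept either the file document itself or only its identifier
          file_id = file["id"] if isinstance(file, dict) else file
          result = collection.delete_one({"id": str(file_id)})
          return result.deleted_count == 1  # True only if a document was actually removed

   Comparing ``deleted_count`` from the result of ``delete_one`` is one way to map the pymongo call onto the
   documented ``bool`` result.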