Stores to read/write data to from/to MongoDB using pymongo.

Module Contents

class weaver.store.mongodb.MongodbStore(collection: pymongo.collection.Collection, sane_name_config: Dict[str, Any] | None = None)[source]

Base class extended by all concrete store implementations.

classmethod get_args_kwargs(*args: Any, **kwargs: Any) Tuple[Tuple, Dict][source]

Filters MongodbStore-specific arguments to safely pass them down its __init__.

class weaver.store.mongodb.MongodbServiceStore(*args: Any, **kwargs: Any)[source]

Registry for OWS services.

Uses MongoDB to store service url and attributes.

save_service(service: weaver.datatype.Service, overwrite: bool = True) weaver.datatype.Service[source]

Stores an OWS service in mongodb.

delete_service(name: str) bool[source]

Removes service from MongoDB storage.

list_services() List[weaver.datatype.Service][source]

Lists all services in MongoDB storage.

fetch_by_name(name: str, visibility: weaver.visibility.AnyVisibility | None = None) weaver.datatype.Service[source]

Gets service for given name from MongoDB storage.

fetch_by_url(url: str) weaver.datatype.Service[source]

Gets service for given url from MongoDB storage.

clear_services() bool[source]

Removes all OWS services from MongoDB storage.

class weaver.store.mongodb.ListingMixin[source]
static _apply_paging_pipeline(page: int | None, limit: int | None) List[MongodbAggregateStep][source]
static _apply_sort_method(sort_field: weaver.sort.AnySortType | None, sort_default: weaver.sort.AnySortType, sort_allowed: List[weaver.sort.AnySortType]) MongodbAggregateSortOrder[source]
static _apply_total_result(search_pipeline: MongodbAggregatePipeline, extra_pipeline: MongodbAggregatePipeline) MongodbAggregatePipeline[source]

Extends the pipeline operations in order to obtain the grand total of matches in parallel to other filtering.

A dual-branch search pipeline is created to apply distinct operations on each facet. The initial search are executed only once for both facets. The first obtains results with other processing steps specified, and the second calculates the total results.

The result of the aggregation pipeline following this operation will be returned in the following format:

    "items": [ MatchedDocument, MatchedDocument, ... ],
    "total": int
  • search_pipeline – pipeline employed to obtain initial matches against search filters.

  • extra_pipeline – additional steps to generate specific results.


combination of the grand total of all items and their following processing representation.

class weaver.store.mongodb.MongodbProcessStore(*args: Any, **kwargs: Any)[source]

Registry for processes.

Uses MongoDB to store processes and attributes.

_register_defaults(processes: List[weaver.datatype.Process]) None[source]

Default process registration to apply definition updates with duplicate entry handling.

_add_process(process: weaver.typedefs.AnyProcess, upsert: bool = False) None[source]

Stores the specified process to the database.

The operation assumes that any conflicting or duplicate process definition was pre-validated. Parameter upsert=True can be employed to allow exact replacement and ignoring duplicate errors. When using upsert=True, it is assumed that whichever the result (insert, update, duplicate error) arises, the final result of the stored process should be identical in each case.


Parameter upsert=True is useful for initialization-time of the storage with default processes that can sporadically generate clashing-inserts between multi-threaded/workers applications that all try adding builtin processes around the same moment.

static _get_process_field(process: weaver.typedefs.AnyProcess, function_dict: Dict[weaver.typedefs.AnyProcessClass, Callable[[], Any]] | Callable[[], Any]) Any[source]

Obtain a field from a process instance after validation and using mapping of process implementation functions.

Takes a lambda expression or a dict of process-specific lambda expressions to retrieve a field. Validates that the passed process object is one of the supported types.

  • process – process to retrieve the field from.

  • function_dict – lambda or dict of lambda of process type.


retrieved field if the type was supported.


ProcessInstanceError – invalid process type.

_get_process_id(process: weaver.typedefs.AnyProcessRef | weaver.typedefs.AnyProcess) str[source]
_get_process_type(process: weaver.typedefs.AnyProcess) weaver.processes.types.AnyProcessType[source]
_get_process_endpoint_wps1(process: weaver.typedefs.AnyProcess) str[source]
save_process(process: weaver.typedefs.AnyProcessRef | weaver.datatype.Process | pywps.Process, overwrite: bool = True) weaver.datatype.Process[source]

Stores a process in storage.

  • process – An instance of weaver.datatype.Process.

  • overwrite – Overwrite the matching process instance by name if conflicting.

delete_process(process_id: str, visibility: weaver.visibility.Visibility | None = None) bool[source]

Removes process from database, optionally filtered by visibility.

If visibility=None, the process is deleted (if existing) regardless of its visibility value.

list_processes(visibility: weaver.visibility.AnyVisibility | List[weaver.visibility.AnyVisibility] | None = None, page: int | None = None, limit: int | None = None, sort: weaver.sort.AnySortType | None = None, total: bool = False, revisions: bool = False, process: str | None = None) List[weaver.datatype.Process] | Tuple[List[weaver.datatype.Process], int][source]

Lists all processes in database, optionally filtered by visibility.

  • visibility – One or many value amongst Visibility.

  • page – Page number to return when using result paging.

  • limit – Number of processes per page when using result paging.

  • sort – Field which is used for sorting results (default: process ID, descending).

  • total – Request the total number of processes to be calculated (ignoring paging).

  • revisions – Include all process revisions instead of only latest ones.

  • process – Limit results only to specified process ID (makes sense mostly when combined with revisions).


List of sorted, and possibly page-filtered, processes matching queries. If total was requested, return a tuple of this list and the number of processes.

Obtain the search criteria and version of the specified Process ID if it specified a revision tag.


Database search operation and the matched version as string.

fetch_by_id(process_id: weaver.typedefs.AnyProcessRef, visibility: weaver.visibility.AnyVisibility | None = None) weaver.datatype.Process[source]

Get process for given process_id from storage, optionally filtered by visibility.

If visibility=None, the process is retrieved (if existing) regardless of its visibility value.

  • process_id – Process identifier (optionally with version tag).

  • visibility – One value amongst weaver.visibility.


An instance of weaver.datatype.Process.

find_versions(process_id: weaver.typedefs.AnyProcessRef, version_format: weaver.utils.VersionFormat = VersionFormat.OBJECT) List[weaver.typedefs.AnyVersion][source]

Retrieves all existing versions of a given process.

update_version(process_id: weaver.typedefs.AnyProcessRef, version: weaver.typedefs.AnyVersion) weaver.datatype.Process[source]

Updates the specified (latest) process ID to become an older revision.

See also

Use revert_latest() for the inverse operation.


Updated process definition with older revision.

revert_latest(process_id: weaver.typedefs.AnyProcessRef) weaver.datatype.Process[source]

Makes the specified (older) revision process the new latest revision.

Assumes there are no active latest in storage. If one is still defined, it will generate a conflict. The process ID must also contain a tagged revision. Failing to provide a version will fail the operation.

See also

Use update_version() for the inverse operation.


Updated process definition with older revision.

get_estimator(process_id: weaver.typedefs.AnyProcessRef) weaver.typedefs.JSON[source]

Get estimator of a process.

set_estimator(process_id: weaver.typedefs.AnyProcessRef, estimator: weaver.typedefs.JSON) None[source]

Set estimator of a process.

get_visibility(process_id: weaver.typedefs.AnyProcessRef) weaver.visibility.Visibility[source]

Get visibility of a process.


One value amongst weaver.visibility.

set_visibility(process_id: weaver.typedefs.AnyProcessRef, visibility: weaver.visibility.AnyVisibility) None[source]

Set visibility of a process.

  • visibility – One value amongst weaver.visibility.

  • process_id

clear_processes() bool[source]

Clears all processes from the store.

class weaver.store.mongodb.MongodbJobStore(*args: Any, **kwargs: Any)[source]

Registry for process jobs tracking.

Uses MongoDB to store job attributes.

save_job(task_id: weaver.typedefs.AnyUUID, process: weaver.typedefs.AnyProcessRef, service: weaver.typedefs.AnyServiceRef | None = None, inputs: weaver.typedefs.ExecutionInputs | None = None, outputs: weaver.typedefs.ExecutionOutputs | None = None, is_workflow: bool = False, is_local: bool = False, execute_async: bool = True, execute_response: weaver.execute.AnyExecuteResponse | None = None, custom_tags: List[str] | None = None, user_id: int | None = None, access: weaver.visibility.AnyVisibility | None = None, context: str | None = None, subscribers: weaver.typedefs.ExecutionSubscribers | None = None, accept_language: str | None = None, created: datetime.datetime | None = None) weaver.datatype.Job[source]

Creates a new Job and stores it in mongodb.

batch_update_jobs(job_filter: Dict[str, Any], job_update: Dict[str, Any]) int[source]

Update specified fields of matched jobs against filters.

  • job_update – Fields and values to update on matched jobs.

  • job_filter – Fields to filter jobs to be updated.


Number of affected jobs.

update_job(job: weaver.datatype.Job) weaver.datatype.Job[source]

Updates a job parameters in MongoDB storage.


job – instance of weaver.datatype.Job.

delete_job(job_id: weaver.typedefs.AnyUUID) bool[source]

Removes job from MongoDB storage.

fetch_by_id(job_id: weaver.typedefs.AnyUUID) weaver.datatype.Job[source]

Gets job for given job_id from MongoDB storage.

list_jobs() List[weaver.datatype.Job][source]

Lists all jobs in MongoDB storage.

For user-specific access to available jobs, use MongodbJobStore.find_jobs() instead.

find_jobs(process: str | None = None, service: str | None = None, job_type: str | None = None, tags: List[str] | None = None, access: str | None = None, status: weaver.status.AnyStatusSearch | List[weaver.status.AnyStatusSearch] | None = None, sort: weaver.sort.AnySortType | None = None, page: int | None = 0, limit: int | None = 10, min_duration: int | None = None, max_duration: int | None = None, datetime_interval: weaver.store.base.DatetimeIntervalType | None = None, group_by: str | List[str] | None = None, request: pyramid.request.Request | None = None) weaver.store.base.JobSearchResult[source]

Finds all jobs in MongoDB storage matching search filters to obtain results with requested paging or grouping.

Using paging (default), result will be in the form.

    [Job(1), Job(2), Job(3), ...],

Where <total> will indicate the complete count of matched jobs with filters, but the list of jobs will be limited only to page index and limit specified.

Limit and paging can be disabled by setting them to None. Paging must always be combined with limit, but limit can be employed by itself.

Using grouping with a list of field specified with group_by, results will be in the form.

    [{category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>},
     {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>},

Where <total> will again indicate all matched jobs by every category combined, and <count> will indicate the amount of jobs matched for each individual category. Also, category will indicate values of specified fields (from group_by) that compose corresponding jobs with matching values.

  • request – request that lead to this call to obtain permissions and user id.

  • process – process name to filter matching jobs.

  • service – service name to filter matching jobs.

  • job_type – filter matching jobs for given type.

  • tags – list of tags to filter matching jobs.

  • access – access visibility to filter matching jobs (default: Visibility.PUBLIC).

  • status – status to filter matching jobs.

  • sort – field which is used for sorting results (default: creation date, descending).

  • page – page number to return when using result paging (only when not using group_by).

  • limit – number of jobs (per page or total) when using result paging (only when not using group_by).

  • min_duration – minimal duration (seconds) between started time and current/finished time of jobs to find.

  • max_duration – maximum duration (seconds) between started time and current/finished time of jobs to find.

  • datetime_interval – field used for filtering data by creation date with a given date or interval of date.

  • group_by – one or many fields specifying categories to form matching groups of jobs (paging disabled).


(list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job.

_find_jobs_grouped(pipeline: MongodbAggregatePipeline, group_categories: List[str]) Tuple[weaver.store.base.JobGroupCategory, int][source]

Retrieves jobs regrouped by specified field categories and predefined search pipeline filters.

_find_jobs_paging(search_pipeline: MongodbAggregatePipeline, page: int | None, limit: int | None) Tuple[List[weaver.datatype.Job], int][source]

Retrieves jobs limited by specified paging parameters and predefined search pipeline filters.

static _apply_tags_filter(tags: str | List[str] | None) MongodbAggregateExpression[source]
static _apply_access_filter(access: weaver.visibility.AnyVisibility, request: pyramid.request.Request) MongodbAggregateExpression[source]
static _apply_ref_or_type_filter(job_type: str | None, process: str | None, service: str | None) MongodbAggregateExpression[source]
static _apply_status_filter(status: weaver.status.AnyStatusSearch | List[weaver.status.AnyStatusSearch] | None) MongodbAggregateExpression[source]
static _apply_datetime_filter(datetime_interval: weaver.store.base.DatetimeIntervalType | None) MongodbAggregateExpression[source]
static _apply_duration_filter(pipeline: MongodbAggregatePipeline, min_duration: int | None, max_duration: int | None) MongodbAggregatePipeline[source]

Generate the filter required for comparing against Job.duration().

Assumes that the first item of the pipeline is $match since steps must be applied before and after. Pipeline is modified inplace and returned as well.

clear_jobs() bool[source]

Removes all jobs from MongoDB storage.

class weaver.store.mongodb.MongodbQuoteStore(*args: Any, **kwargs: Any)[source]

Registry for quotes.

Uses MongoDB to store quote attributes.

_apply_quote(quote: weaver.datatype.Quote, override: bool = False) weaver.datatype.Quote[source]
save_quote(quote: weaver.datatype.Quote) weaver.datatype.Quote[source]

Stores a quote in MongoDB storage.

update_quote(quote: weaver.datatype.Quote) weaver.datatype.Quote[source]

Update quote parameters in MongoDB storage.

fetch_by_id(quote_id: weaver.typedefs.AnyUUID) weaver.datatype.Quote[source]

Gets quote for given quote_id from MongoDB storage.

list_quotes() List[weaver.datatype.Quote][source]

Lists all quotes in MongoDB storage.

find_quotes(process_id: str | None = None, page: int = 0, limit: int = 10, sort: weaver.sort.AnySortType | None = None) Tuple[List[weaver.datatype.Quote], int][source]

Finds all quotes in MongoDB storage matching search filters.

Returns a tuple of filtered items and their total, where items can have paging and be limited to a maximum per page, but total always indicate the total number of matches excluding paging.

class weaver.store.mongodb.MongodbBillStore(*args, **kwargs)[source]

Registry for bills.

Uses MongoDB to store bill attributes.

save_bill(bill: weaver.datatype.Bill) weaver.datatype.Bill[source]

Stores a bill in mongodb.

fetch_by_id(bill_id: str) weaver.datatype.Bill[source]

Gets bill for given bill_id from MongoDB storage.

list_bills() List[weaver.datatype.Bill][source]

Lists all bills in MongoDB storage.

find_bills(quote_id: str | None = None, page: int = 0, limit: int = 10, sort: weaver.sort.AnySortType | None = None) Tuple[List[weaver.datatype.Bill], int][source]

Finds all bills in MongoDB storage matching search filters.

Returns a tuple of filtered items and their total, where items can have paging and be limited to a maximum per page, but total always indicate the total number of matches excluding paging.

class weaver.store.mongodb.MongodbVaultStore(*args: Any, **kwargs: Any)[source]

Registry for vault files.

Uses MongoDB to store vault files attributes.

get_file(file_id: weaver.typedefs.AnyUUID, nothrow: bool = False) weaver.datatype.VaultFile | None[source]

Gets vault file for given file_id from MongoDB storage.


VaultFileNotFound – If the file does not exist and nothrow was not requested.


Found file if it exists or None if it doesn’t exist and nothrow was requested.

save_file(file: weaver.datatype.VaultFile) None[source]

Stores a vault file in MongoDB storage.

delete_file(file: weaver.datatype.VaultFile | weaver.typedefs.AnyUUID) bool[source]

Removes vault file from MongoDB storage.