weaver.store.mongodb
Stores to read/write data from/to MongoDB using pymongo.
Module Contents
-
class weaver.store.mongodb.MongodbStore(collection, sane_name_config: Optional[Dict[str, Any]] = None)
Base class extended by all concrete store implementations.
-
classmethod get_args_kwargs(*args: Any, **kwargs: Any) → Tuple[Tuple, Dict]
Filters MongodbStore-specific arguments so they can be safely passed down to its __init__.
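A minimal sketch of this filtering idea, assuming the store-specific arguments are collection (first positional or keyword argument) and sane_name_config (keyword), as in the MongodbStore signature above. The helper name split_store_args is hypothetical; this is not the weaver implementation.

```python
from typing import Any, Dict, Tuple


def split_store_args(*args: Any, **kwargs: Any) -> Tuple[Tuple, Dict]:
    """Hypothetical helper: keep only what MongodbStore.__init__ accepts."""
    collection = None
    if args:
        collection = args[0]  # collection passed positionally
    elif "collection" in kwargs:
        collection = kwargs["collection"]  # collection passed by keyword
    # any other keyword (e.g. settings meant for a subclass) is dropped here
    sane_name_config = kwargs.get("sane_name_config", None)
    return (collection,), {"sane_name_config": sane_name_config}


# A subclass receiving (*args, **kwargs) could forward only the relevant part:
store_args, store_kwargs = split_store_args("services", sane_name_config={"min_len": 3}, settings={})
print(store_args, store_kwargs)  # ('services',) {'sane_name_config': {'min_len': 3}}
```

Since the concrete stores below are documented as accepting (*args, **kwargs), a filter of this kind lets them forward only the base-class arguments.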
-
-
class weaver.store.mongodb.MongodbServiceStore(*args, **kwargs)
Registry for OWS services.
Uses MongoDB to store service URLs and attributes.
-
save_service(service: weaver.datatype.Service, overwrite: bool = True) → weaver.datatype.Service
Stores an OWS service in MongoDB.
-
list_services() → List[weaver.datatype.Service]
Lists all services in MongoDB storage.
-
fetch_by_name(name: str, visibility: Optional[str] = None) → weaver.datatype.Service
Gets the service for the given name from MongoDB storage.
-
fetch_by_url(url: str) → weaver.datatype.Service
Gets the service for the given url from MongoDB storage.
-
-
class weaver.store.mongodb.MongodbProcessStore(*args, **kwargs)
Registry for processes.
Uses MongoDB to store processes and attributes.
-
_register_defaults(processes: List[weaver.datatype.Process]) → None
Registers the default processes, applying definition updates and handling duplicate entries.
-
_add_process(process: weaver.typedefs.AnyProcess, upsert: bool = False) → None
Stores the specified process to the database.
The operation assumes that any conflicting or duplicate process definition was pre-validated. Parameter upsert=True can be employed to allow exact replacement and to ignore duplicate errors. When using upsert=True, it is assumed that, whichever result arises (insert, update or duplicate error), the final stored process is identical in each case.
Note
Parameter upsert=True is useful when initializing the storage with default processes, since multi-threaded or multi-worker applications that all try adding built-in processes around the same moment can sporadically generate clashing inserts.
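To illustrate the idempotence assumption behind upsert=True, here is a sketch that uses a hypothetical in-memory collection instead of MongoDB, so no server or pymongo is required. InMemoryCollection, DuplicateKeyError and add_process are illustrative stand-ins, not weaver or pymongo objects. With pymongo, the corresponding calls would be Collection.insert_one and Collection.replace_one(..., upsert=True), and the real exception is pymongo.errors.DuplicateKeyError.

```python
from typing import Any, Dict


class DuplicateKeyError(Exception):
    """Local stand-in for pymongo.errors.DuplicateKeyError."""


class InMemoryCollection:
    """Hypothetical collection with a unique index on the "id" field."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}

    def insert_one(self, doc: Dict[str, Any]) -> None:
        if doc["id"] in self.docs:
            raise DuplicateKeyError(doc["id"])
        self.docs[doc["id"]] = dict(doc)

    def replace_one(self, filter_: Dict[str, Any], doc: Dict[str, Any], upsert: bool = False) -> None:
        key = filter_["id"]
        if key in self.docs or upsert:  # replace existing, or insert when upserting
            self.docs[key] = dict(doc)


def add_process(collection: InMemoryCollection, process: Dict[str, Any], upsert: bool = False) -> None:
    """Hypothetical analogue of _add_process."""
    if not upsert:
        collection.insert_one(process)  # conflicts are expected to be pre-validated
        return
    try:
        collection.replace_one({"id": process["id"]}, process, upsert=True)
    except DuplicateKeyError:
        # With a real database, two workers upserting concurrently can still
        # collide; the stored result is assumed identical, so this is ignored.
        pass


collection = InMemoryCollection()
builtin = {"id": "example-builtin", "version": "1.0"}
for _ in range(3):  # several workers registering the same builtin process
    add_process(collection, builtin, upsert=True)
print(collection.docs)  # {'example-builtin': {'id': 'example-builtin', 'version': '1.0'}}
```

Repeating the upsert leaves a single, identical document. A plain insert of the same definition raises instead, which is why duplicates must be pre-validated when upsert=False.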
-
static _get_process_field(process: weaver.typedefs.AnyProcess, function_dict: Union[Dict[weaver.typedefs.AnyProcessType, Callable[[], Any]], Callable[[], Any]]) → Any
Obtains a field from a process instance after validating its type, using a mapping of process implementation functions.
Takes a lambda expression, or a dict of process-specific lambda expressions, to retrieve a field. Validates that the passed process object is one of the supported types.
- Parameters
process – process to retrieve the field from.
function_dict – lambda, or dict of lambdas keyed by process type.
- Returns
retrieved field if the type was supported.
- Raises
ProcessInstanceError – invalid process type.
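A minimal sketch of the dispatch described above. WeaverLikeProcess and PywpsLikeProcess are hypothetical stand-ins for weaver.datatype.Process and pywps.Process, and ProcessInstanceError is defined locally. A single callable applies to every supported type. A dict is looked up by process type. The zero-argument lambdas match the Callable[[], Any] annotation by closing over the process.

```python
from typing import Any, Callable, Dict, Union


class ProcessInstanceError(TypeError):
    """Local stand-in for weaver's ProcessInstanceError."""


class WeaverLikeProcess:  # hypothetical stand-in for weaver.datatype.Process
    def __init__(self, id_: str) -> None:
        self.id = id_


class PywpsLikeProcess:  # hypothetical stand-in for pywps.Process
    def __init__(self, identifier: str) -> None:
        self.identifier = identifier


SUPPORTED_TYPES = (WeaverLikeProcess, PywpsLikeProcess)
FieldGetter = Callable[[], Any]


def get_process_field(process: Any, function_dict: Union[Dict[type, FieldGetter], FieldGetter]) -> Any:
    if not isinstance(process, SUPPORTED_TYPES):
        raise ProcessInstanceError(f"Unsupported process type: {type(process)!r}")
    if callable(function_dict):
        return function_dict()  # same getter for every supported type
    for process_type, getter in function_dict.items():
        if isinstance(process, process_type):
            return getter()  # type-specific getter
    raise ProcessInstanceError(f"No getter for process type: {type(process)!r}")


def process_id(process: Any) -> str:
    # each lambda closes over `process`, so it takes no arguments
    return get_process_field(process, {
        WeaverLikeProcess: lambda: process.id,
        PywpsLikeProcess: lambda: process.identifier,
    })
```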
-
save_process(process: Union[weaver.datatype.Process, pywps.Process], overwrite: bool = True) → weaver.datatype.Process
Stores a process in storage.
- Parameters
process – An instance of weaver.datatype.Process.
overwrite – Overwrite the matching process instance by name if conflicting.
-
delete_process(process_id: str, visibility: Optional[str] = None) → bool
Removes a process from the database, optionally filtered by visibility.
If visibility=None, the process is deleted (if it exists) regardless of its visibility value.
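The optional visibility filter can be pictured as a MongoDB query document that only gains a visibility constraint when one is given. This is a sketch: the helper name and the identifier/visibility field names are assumptions, not taken from weaver. With pymongo, such a document would typically be passed to Collection.delete_one(), whose DeleteResult.deleted_count indicates whether a document was removed.

```python
from typing import Any, Dict, Optional


def process_search_filter(process_id: str, visibility: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical helper building a MongoDB query document for one process."""
    search: Dict[str, Any] = {"identifier": process_id}  # assumed field name
    if visibility is not None:
        search["visibility"] = visibility  # restrict to that exact visibility
    return search


print(process_search_filter("my-process"))             # {'identifier': 'my-process'}
print(process_search_filter("my-process", "private"))  # {'identifier': 'my-process', 'visibility': 'private'}
```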
-
list_processes(visibility: Optional[str] = None) → List[weaver.datatype.Process]
Lists all processes in the database, optionally filtered by visibility.
- Parameters
visibility – One value amongst weaver.visibility.
-
fetch_by_id(process_id: str, visibility: Optional[str] = None) → weaver.datatype.Process
Gets the process for the given process_id from storage, optionally filtered by visibility.
If visibility=None, the process is retrieved (if it exists) regardless of its visibility value.
- Parameters
process_id – process identifier.
visibility – one value amongst weaver.visibility.
- Returns
An instance of weaver.datatype.Process.
-
get_visibility(process_id: str) → str
Gets the visibility of a process.
- Returns
One value amongst weaver.visibility.
-
set_visibility(process_id: str, visibility: str) → None
Sets the visibility of a process.
- Parameters
process_id – process identifier.
visibility – One value amongst weaver.visibility.
- Raises
ValueError – when visibility is not one of weaver.visibility.VISIBILITY_VALUES.
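A sketch of the validation and update this method implies. The visibility strings below are assumed values (the authoritative set is weaver.visibility.VISIBILITY_VALUES), and the identifier field name and helper name are hypothetical. The returned pair has the (filter, update) shape accepted by pymongo's Collection.update_one, using MongoDB's $set operator.

```python
from typing import Any, Dict, Tuple

# Assumed values; the authoritative set is weaver.visibility.VISIBILITY_VALUES.
VISIBILITY_PUBLIC = "public"
VISIBILITY_PRIVATE = "private"
VISIBILITY_VALUES = frozenset({VISIBILITY_PUBLIC, VISIBILITY_PRIVATE})


def visibility_update(process_id: str, visibility: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper returning the (filter, update) pair for Collection.update_one."""
    if visibility not in VISIBILITY_VALUES:
        raise ValueError(f"Visibility '{visibility}' is not one of {sorted(VISIBILITY_VALUES)}")
    # only the visibility field is modified; the rest of the document is untouched
    return {"identifier": process_id}, {"$set": {"visibility": visibility}}
```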
-
-
class weaver.store.mongodb.MongodbJobStore(*args, **kwargs)
Registry for tracking process jobs.
Uses MongoDB to store job attributes.
-
save_job(task_id: str, process: str, service: Optional[str] = None, inputs: Optional[List[Any]] = None, is_workflow: bool = False, is_local: bool = False, execute_async: bool = True, custom_tags: Optional[List[str]] = None, user_id: Optional[int] = None, access: Optional[str] = None, context: Optional[str] = None, notification_email: Optional[str] = None, accept_language: Optional[str] = None, created: Optional[datetime.datetime] = None) → weaver.datatype.Job
Creates a new Job and stores it in MongoDB.
-
update_job(job: weaver.datatype.Job) → weaver.datatype.Job
Updates a job's parameters in MongoDB storage.
- Parameters
job – instance of weaver.datatype.Job.
-
fetch_by_id(job_id: str) → weaver.datatype.Job
Gets the job for the given job_id from MongoDB storage.
-
list_jobs() → List[weaver.datatype.Job]
Lists all jobs in MongoDB storage.
For user-specific access to available jobs, use MongodbJobStore.find_jobs() instead.
-
find_jobs(process: Optional[str] = None, service: Optional[str] = None, job_type: Optional[str] = None, tags: Optional[List[str]] = None, access: Optional[str] = None, notification_email: Optional[str] = None, status: Optional[str] = None, sort: Optional[str] = None, page: int = 0, limit: int = 10, min_duration: Optional[int] = None, max_duration: Optional[int] = None, datetime_interval: Optional[weaver.store.base.DatetimeIntervalType] = None, group_by: Optional[Union[str, List[str]]] = None, request: Optional[pyramid.request.Request] = None) → weaver.store.base.JobSearchResult
Finds all jobs in MongoDB storage matching search filters to obtain results with requested paging or grouping.
Using paging (default), the result will be in the form:
( [Job(1), Job(2), Job(3), ...], <total> )
Where <total> indicates the complete count of jobs matched by the filters, while the list of jobs is limited to the specified page index and limit.
Using grouping with a list of fields specified with group_by, results will be in the form:
( [{category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>}, {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>}, ... ], <total> )
Where <total> again indicates all jobs matched by every category combined, and <count> indicates the number of jobs matched for each individual category. Also, category indicates the values of the specified fields (from group_by) shared by the corresponding jobs.
- Parameters
request – request that led to this call, used to obtain permissions and the user ID.
process – process name to filter matching jobs.
service – service name to filter matching jobs.
job_type – filter matching jobs for given type.
tags – list of tags to filter matching jobs.
access – access visibility to filter matching jobs (default: VISIBILITY_PUBLIC).
notification_email – notification email to filter matching jobs.
status – status to filter matching jobs.
sort – field which is used for sorting results (default: creation date, descending).
page – page number to return when using result paging (only when not using group_by).
limit – number of jobs per page when using result paging (only when not using group_by).
min_duration – minimal duration (seconds) between started time and current/finished time of jobs to find.
max_duration – maximum duration (seconds) between started time and current/finished time of jobs to find.
datetime_interval – field used for filtering data by creation date with a given date or interval of dates.
group_by – one or many fields specifying categories to form matching groups of jobs (paging disabled).
- Returns
tuple of (the paged list of matched jobs, OR the list of {category, list of jobs, count} groups) AND the total number of matched jobs.
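To make the two return shapes concrete, here is an in-memory sketch operating on already-filtered jobs represented as plain dicts. The function name and the "jobs" key inside each group are assumptions for illustration. The documented method obtains these results through MongoDB search pipelines rather than Python loops.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

Job = Dict[str, Any]  # hypothetical: jobs as plain dicts


def find_jobs_in_memory(jobs: List[Job], page: int = 0, limit: int = 10,
                        group_by: Optional[Union[str, List[str]]] = None) -> Tuple[List[Any], int]:
    """Hypothetical sketch of the two result shapes; filtering is assumed done."""
    total = len(jobs)  # grand total of matched jobs, independent of paging
    if group_by is None:
        start = page * limit
        return jobs[start:start + limit], total  # paging mode
    fields = [group_by] if isinstance(group_by, str) else list(group_by)
    groups: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for job in jobs:
        key = tuple(job.get(field) for field in fields)
        group = groups.setdefault(key, {"category": dict(zip(fields, key)), "jobs": [], "count": 0})
        group["jobs"].append(job)
        group["count"] += 1
    return list(groups.values()), total  # grouping mode, paging disabled


jobs = [{"id": 1, "status": "succeeded"}, {"id": 2, "status": "failed"}, {"id": 3, "status": "succeeded"}]
page_items, page_total = find_jobs_in_memory(jobs, page=1, limit=2)
grouped, grouped_total = find_jobs_in_memory(jobs, group_by="status")
```

In both modes the second element is the same grand total. In grouping mode it equals the sum of the per-category counts.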
-
_find_jobs_grouped(pipeline: MongodbSearchPipeline, group_categories: List[str]) → Tuple[weaver.store.base.JobGroupCategory, int]
Retrieves jobs regrouped by the specified field categories and predefined search pipeline filters.
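One way to regroup matches by field categories in MongoDB is a $group stage keyed on the category fields, which collects each full document with $push: "$$ROOT" and counts it with $sum: 1. The sketch below only builds the stages. The helper names, the output keys, and the use of $project are assumptions, not the confirmed weaver pipeline.

```python
from typing import Any, Dict, List

Pipeline = List[Dict[str, Any]]


def group_stage(group_categories: List[str]) -> Dict[str, Any]:
    """Hypothetical $group stage: one group per distinct combination of field values."""
    return {
        "$group": {
            "_id": {field: f"${field}" for field in group_categories},
            "jobs": {"$push": "$$ROOT"},  # keep the full matched job documents
            "count": {"$sum": 1},         # number of jobs in this category
        }
    }


def grouped_pipeline(search_pipeline: Pipeline, group_categories: List[str]) -> Pipeline:
    """Append grouping to the search steps and rename _id to category."""
    return search_pipeline + [
        group_stage(group_categories),
        {"$project": {"_id": 0, "category": "$_id", "jobs": 1, "count": 1}},
    ]
```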
-
_find_jobs_paging(pipeline: MongodbSearchPipeline, page: int, limit: int) → Tuple[List[weaver.datatype.Job], int]
Retrieves jobs limited by the specified paging parameters and predefined search pipeline filters.
-
static _apply_total_result(search_pipeline: MongodbSearchPipeline, extra_pipeline: MongodbSearchPipeline) → MongodbSearchPipeline
Extends the pipeline operations in order to obtain the grand total of matches in parallel to other filtering.
A dual-branch search pipeline is created to apply distinct operations on each facet. The initial search is executed only once for both facets. The first facet obtains results with the other specified processing steps, and the second calculates the total number of results.
- Parameters
search_pipeline – pipeline employed to obtain initial matches against search filters.
extra_pipeline – additional steps to generate specific results.
- Returns
combination of the grand total of all items and their following processing representation.
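The dual-branch idea maps naturally onto MongoDB's $facet stage. The search steps run once, and then one facet applies the extra steps (here, paging) while the other counts all matches with $count. This is a sketch under assumptions. The helper names, the "items"/"total" keys and the "created" sort field are hypothetical, and the final $project merely flattens the count.

```python
from typing import Any, Dict, List

Pipeline = List[Dict[str, Any]]


def paging_steps(page: int, limit: int, sort_field: str = "created") -> Pipeline:
    """Hypothetical extra steps: newest first, then skip to the requested page."""
    return [{"$sort": {sort_field: -1}}, {"$skip": page * limit}, {"$limit": limit}]


def apply_total_result(search_pipeline: Pipeline, extra_pipeline: Pipeline) -> Pipeline:
    """Hypothetical analogue: run the search once, then branch into two facets."""
    return search_pipeline + [
        {"$facet": {
            "items": extra_pipeline,         # branch 1: processed (e.g. paged) results
            "total": [{"$count": "total"}],  # branch 2: count of all matches
        }},
        # flatten [{"total": N}] into N, or 0 when nothing matched
        {"$project": {
            "items": 1,
            "total": {"$ifNull": [{"$arrayElemAt": ["$total.total", 0]}, 0]},
        }},
    ]


pipeline = apply_total_result([{"$match": {"status": "failed"}}], paging_steps(page=2, limit=10))
```

With pymongo, next(collection.aggregate(pipeline)) would then yield a single document of the form {"items": [...], "total": N}.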
-
static _apply_access_filter(access: str, request: pyramid.request.Request) → MongodbSearchFilter
-
static _apply_ref_or_type_filter(job_type: Optional[str], process: Optional[str], service: Optional[str]) → MongodbSearchFilter
-
static _apply_datetime_filter(datetime_interval: Optional[weaver.store.base.DatetimeIntervalType]) → MongodbSearchFilter
-
static _apply_duration_filter(pipeline: MongodbSearchPipeline, min_duration: Optional[int], max_duration: Optional[int]) → MongodbSearchPipeline
Generates the filter required for comparing against Job.duration().
Assumes that the first item of the pipeline is $match, since steps must be applied before and after it. The pipeline is modified in place and returned as well.
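A sketch of how a duration comparison could be spliced in after the leading $match. A computed duration field (in seconds) is added and then matched against the requested bounds, modifying the list in place. The started/finished/duration field names and the helper name are assumptions. Subtracting two dates in MongoDB yields milliseconds, and the $$NOW variable (MongoDB 4.2+) stands in for the current time of unfinished jobs.

```python
from typing import Any, Dict, List, Optional

Pipeline = List[Dict[str, Any]]


def apply_duration_filter(pipeline: Pipeline, min_duration: Optional[int] = None,
                          max_duration: Optional[int] = None) -> Pipeline:
    """Hypothetical analogue: insert duration steps right after the leading $match."""
    if min_duration is None and max_duration is None:
        return pipeline  # nothing to filter
    if not pipeline or "$match" not in pipeline[0]:
        raise ValueError("first pipeline step must be a $match")
    add_duration = {"$addFields": {"duration": {  # seconds, assumed field names
        "$divide": [
            # date subtraction yields milliseconds; unfinished jobs use the current time
            {"$subtract": [{"$ifNull": ["$finished", "$$NOW"]}, "$started"]},
            1000,
        ],
    }}}
    duration_range: Dict[str, Any] = {}
    if min_duration is not None:
        duration_range["$gte"] = min_duration
    if max_duration is not None:
        duration_range["$lte"] = max_duration
    pipeline[1:1] = [add_duration, {"$match": {"duration": duration_range}}]  # in place
    return pipeline
```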
-
-
class weaver.store.mongodb.MongodbQuoteStore(*args, **kwargs)
Registry for quotes.
Uses MongoDB to store quote attributes.
-
save_quote(quote: weaver.datatype.Quote) → weaver.datatype.Quote
Stores a quote in MongoDB.
-
fetch_by_id(quote_id: str) → weaver.datatype.Quote
Gets the quote for the given quote_id from MongoDB storage.
-
list_quotes() → List[weaver.datatype.Quote]
Lists all quotes in MongoDB storage.
-
find_quotes(process_id: Optional[str] = None, page: int = 0, limit: int = 10, sort: Optional[str] = None) → Tuple[List[weaver.datatype.Quote], int]
Finds all quotes in MongoDB storage matching search filters.
Returns a tuple of filtered items and their count, where items can be paged and limited to a maximum per page, but count always indicates the total number of matches.
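The items/count split can be sketched with the pymongo Collection and Cursor methods count_documents, find, sort, skip and limit. The count ignores paging, while the cursor is sorted and sliced. The function body, the "process" field name and the ascending sort direction are assumptions. FakeCollection and FakeCursor are minimal test doubles so the example runs without a MongoDB server.

```python
from typing import Any, Dict, List, Optional, Tuple


def find_quotes(collection: Any, process_id: Optional[str] = None, page: int = 0,
                limit: int = 10, sort: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
    """Hypothetical analogue using the pymongo Collection/Cursor methods."""
    search = {} if process_id is None else {"process": process_id}  # assumed field name
    count = collection.count_documents(search)  # total matches, ignores paging
    cursor = collection.find(search)
    if sort:
        cursor = cursor.sort(sort, 1)  # 1 == pymongo.ASCENDING
    items = list(cursor.skip(page * limit).limit(limit))
    return items, count


class FakeCursor:
    """Minimal test double mimicking the chained pymongo Cursor calls used above."""

    def __init__(self, docs: List[Dict[str, Any]]) -> None:
        self.docs = docs

    def sort(self, key: str, direction: int) -> "FakeCursor":
        return FakeCursor(sorted(self.docs, key=lambda doc: doc[key], reverse=direction < 0))

    def skip(self, count: int) -> "FakeCursor":
        return FakeCursor(self.docs[count:])

    def limit(self, count: int) -> "FakeCursor":
        return FakeCursor(self.docs[:count])

    def __iter__(self):
        return iter(self.docs)


class FakeCollection:
    """Minimal test double supporting exact-match filters only."""

    def __init__(self, docs: List[Dict[str, Any]]) -> None:
        self.docs = docs

    def _match(self, search: Dict[str, Any]) -> List[Dict[str, Any]]:
        return [doc for doc in self.docs if all(doc.get(k) == v for k, v in search.items())]

    def find(self, search: Dict[str, Any]) -> FakeCursor:
        return FakeCursor(self._match(search))

    def count_documents(self, search: Dict[str, Any]) -> int:
        return len(self._match(search))
```

The same pattern applies to find_bills, with quote_id as the filter.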
-
-
class weaver.store.mongodb.MongodbBillStore(*args, **kwargs)
Registry for bills.
Uses MongoDB to store bill attributes.
-
save_bill(bill: weaver.datatype.Bill) → weaver.datatype.Bill
Stores a bill in MongoDB.
-
fetch_by_id(bill_id: str) → weaver.datatype.Bill
Gets the bill for the given bill_id from MongoDB storage.
-
list_bills() → List[weaver.datatype.Bill]
Lists all bills in MongoDB storage.
-
find_bills(quote_id: Optional[str] = None, page: int = 0, limit: int = 10, sort: Optional[str] = None) → Tuple[List[weaver.datatype.Bill], int]
Finds all bills in MongoDB storage matching search filters.
Returns a tuple of filtered items and their count, where items can be paged and limited to a maximum per page, but count always indicates the total number of matches.
-