"""
Stores to read data from and write data to mongodb using pymongo.
"""
import logging
from typing import TYPE_CHECKING
import pymongo
import six
from pymongo import ASCENDING, DESCENDING
from pyramid.request import Request
from pywps import Process as ProcessWPS
from weaver.datatype import Bill, Job, Process, Quote, Service
from weaver.exceptions import (
BillInstanceError,
BillNotFound,
BillRegistrationError,
JobNotFound,
JobRegistrationError,
JobUpdateError,
ProcessInstanceError,
ProcessNotAccessible,
ProcessNotFound,
ProcessRegistrationError,
QuoteInstanceError,
QuoteNotFound,
QuoteRegistrationError,
ServiceNotAccessible,
ServiceNotFound,
ServiceRegistrationError
)
from weaver.execute import EXECUTE_MODE_ASYNC, EXECUTE_MODE_SYNC
from weaver.processes.types import PROCESS_APPLICATION, PROCESS_WORKFLOW, PROCESS_WPS
from weaver.sort import (
BILL_SORT_VALUES,
JOB_SORT_VALUES,
QUOTE_SORT_VALUES,
SORT_CREATED,
SORT_FINISHED,
SORT_ID,
SORT_USER
)
from weaver.status import JOB_STATUS_CATEGORIES, STATUS_ACCEPTED, map_status
from weaver.store.base import StoreBills, StoreJobs, StoreProcesses, StoreQuotes, StoreServices
from weaver.utils import get_base_url, get_sane_name, get_weaver_url, islambda, now
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC, VISIBILITY_VALUES
if TYPE_CHECKING:
    # Everything in this branch exists only for the type comments of this module:
    # it is evaluated by static type checkers and skipped at runtime.
    from weaver.typedefs import AnyValue, AnyProcess, AnyProcessType    # noqa: F401
    from pymongo.collection import Collection                           # noqa: F401
    from typing import Any, AnyStr, Callable, Dict, List, Optional, Tuple, Union  # noqa: F401

    # return shapes of 'MongodbJobStore.find_jobs' (paged results / grouped results)
    JobListAndCount = Tuple[List[Job], int]                             # noqa: F401
    JobCategory = Dict[AnyStr, Union[AnyValue, Job]]                    # noqa: F401
    JobCategoriesAndCount = Tuple[List[JobCategory], int]               # noqa: F401

# module-level logger named after this module
LOGGER = logging.getLogger(__name__)
class MongodbStore(object):
    """
    Base class extended by all concrete store implementations.

    Holds the mongodb ``collection`` handle and the keyword options forwarded to ``get_sane_name`` by sub-classes.
    """

    def __init__(self, collection, sane_name_config=None):
        # type: (Collection, Optional[Dict[AnyStr, Any]]) -> None
        """
        :param collection: mongodb collection backing the store.
        :param sane_name_config: optional name sanitization options (an empty dict is used when omitted).
        :raises TypeError: if ``collection`` is not a :class:`pymongo.collection.Collection`.
        """
        if not isinstance(collection, pymongo.collection.Collection):
            raise TypeError("Collection not of expected type.")
        self.collection = collection  # type: Collection
        self.sane_name_config = sane_name_config or {}

    @classmethod
    def get_args_kwargs(cls, *args, **kwargs):
        # type: (*Any, **Any) -> Tuple[Tuple, Dict]
        """
        Filters :class:`MongodbStore`-specific arguments to safely pass them down its ``__init__``.

        The collection is taken from the first positional argument when there is one, otherwise from the
        ``collection`` keyword (``None`` if neither is given). Any other argument is discarded.

        :returns: ``(collection,)`` and ``{"sane_name_config": ...}`` ready to be unpacked into ``__init__``.
        """
        if args:
            store_collection = args[0]
        else:
            store_collection = kwargs.get("collection")
        init_kwargs = {"sane_name_config": kwargs.get("sane_name_config")}
        return (store_collection,), init_kwargs
class MongodbServiceStore(StoreServices, MongodbStore):
    """
    Registry for OWS services. Uses mongodb to store service url and attributes.
    """

    def __init__(self, *args, **kwargs):
        # only the collection and 'sane_name_config' are forwarded to the mongodb base class
        db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
        StoreServices.__init__(self)
        MongodbStore.__init__(self, *db_args, **db_kwargs)

    def save_service(self, service, overwrite=True, request=None):
        # type: (Service, bool, Optional[Request]) -> Service
        """
        Stores an OWS service in mongodb.

        :param service: service definition to store (its url is reduced to its base url, its name is sanitized).
        :param overwrite: replace an already stored service with the same url or name instead of failing.
        :param request: only passed on to :meth:`fetch_by_url`.
        :returns: the service as read back from storage.
        :raises ServiceRegistrationError: if the url or name is already registered and ``overwrite`` is ``False``.
        """
        service_url = get_base_url(service.url)
        # check if service is already registered
        if self.collection.count_documents({"url": service_url}) > 0:
            if overwrite:
                self.collection.delete_one({"url": service_url})
            else:
                raise ServiceRegistrationError("service url already registered.")

        service_name = get_sane_name(service.name, **self.sane_name_config)
        if self.collection.count_documents({"name": service_name}) > 0:
            if overwrite:
                self.collection.delete_one({"name": service_name})
            else:
                raise ServiceRegistrationError("service name already registered.")

        self.collection.insert_one(Service(
            url=service_url,
            name=service_name,
            type=service.type,
            public=service.public,
            auth=service.auth).params())
        return self.fetch_by_url(url=service_url, request=request)

    def delete_service(self, name, request=None):   # noqa: E811
        # type: (AnyStr, Optional[Request]) -> bool
        """
        Removes service from mongodb storage.

        :returns: always ``True``, whether or not a service named ``name`` was stored.
        """
        self.collection.delete_one({"name": name})
        return True

    def list_services(self, request=None):   # noqa: E811
        # type: (Optional[Request]) -> List[Service]
        """
        Lists all services in mongodb storage, sorted by ascending name.
        """
        return [Service(service) for service in self.collection.find().sort("name", pymongo.ASCENDING)]

    def fetch_by_name(self, name, visibility=None, request=None):   # noqa: E811
        # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> Service
        """
        Gets service for given ``name`` from mongodb storage.

        :param name: name of the service to look up.
        :param visibility: when provided, the service must be public for ``VISIBILITY_PUBLIC`` or
            non-public for ``VISIBILITY_PRIVATE``; any other non-``None`` value never matches.
        :param request: <unused>
        :raises ServiceNotFound: if no service is stored under ``name``.
        :raises ServiceNotAccessible: if ``visibility`` is provided and does not match the service.
        """
        service = self.collection.find_one({"name": name})
        if not service:
            raise ServiceNotFound("Service '{}' could not be found.".format(name))
        service = Service(service)
        same_visibility = (service.public and visibility == VISIBILITY_PUBLIC) or \
                          (not service.public and visibility == VISIBILITY_PRIVATE)
        if visibility is not None and not same_visibility:
            raise ServiceNotAccessible("Service '{}' cannot be accessed.".format(name))
        return service

    def fetch_by_url(self, url, request=None):   # noqa: E811
        # type: (AnyStr, Optional[Request]) -> Service
        """
        Gets service for given ``url`` from mongodb storage.

        The lookup is done on the base url obtained from ``url``.

        :raises ServiceNotFound: if no service is stored for that url.
        """
        service_url = get_base_url(url)
        service = self.collection.find_one({"url": service_url})
        if not service:
            # carry a message like 'fetch_by_name' does, so the failing url shows up in logs and responses
            raise ServiceNotFound("Service with url '{}' could not be found.".format(service_url))
        return Service(service)

    def clear_services(self, request=None):   # noqa: E811
        # type: (Optional[Request]) -> bool
        """
        Removes all OWS services from mongodb storage by dropping the collection.
        """
        self.collection.drop()
        return True
class MongodbProcessStore(StoreProcesses, MongodbStore):
    """
    Registry for processes. Uses mongodb to store processes and attributes.
    """

    def __init__(self, *args, **kwargs):
        """
        Recognized keyword arguments, besides those of :class:`MongodbStore`:

        - ``registry``: object whose ``settings`` are used when provided.
        - ``settings``: settings used when no ``registry`` is provided (default: empty dict).
        - ``default_processes``: processes that are (re-)registered on every instantiation.
        """
        db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
        StoreProcesses.__init__(self)
        MongodbStore.__init__(self, *db_args, **db_kwargs)

        registry = kwargs.get("registry")
        settings = registry.settings if registry else kwargs.get("settings", {})
        default_processes = kwargs.get("default_processes")
        self.default_host = get_weaver_url(settings)
        wps_path = settings.get("weaver.wps_path", "")
        self.default_wps_endpoint = "{host}{wps}".format(host=self.default_host, wps=wps_path)

        # enforce default process re-registration to receive any applicable update
        if default_processes:
            known_ids = [stored.identifier for stored in self.list_processes()]
            for default_process in default_processes:
                default_id = self._get_process_id(default_process)
                if default_id in known_ids:
                    self.delete_process(default_id)
                self._add_process(default_process)

    def _add_process(self, process):
        # type: (AnyProcess) -> None
        """
        Inserts ``process`` in the collection, converting a ``ProcessWPS`` to a :class:`Process` beforehand.

        :raises ProcessInstanceError: if the process to insert is not a :class:`Process` after conversion.
        """
        if isinstance(process, ProcessWPS):
            new_process = Process.from_wps(process, processEndpointWPS1=self.default_wps_endpoint)
        else:
            new_process = process
        if not isinstance(new_process, Process):
            raise ProcessInstanceError("Unsupported process type '{}'".format(type(process)))

        # apply defaults if not specified
        new_process["type"] = self._get_process_type(process)
        new_process["identifier"] = self._get_process_id(process)
        new_process["processEndpointWPS1"] = self._get_process_endpoint_wps1(process)
        new_process["visibility"] = new_process.visibility
        self.collection.insert_one(new_process.params())

    @staticmethod
    def _get_process_field(process, function_dict):
        # type: (AnyProcess, Union[Dict[AnyProcessType, Callable[[], Any]], Callable[[], Any]]) -> Any
        """
        Takes a lambda expression or a dict of process-specific lambda expressions to retrieve a field.
        Validates that the passed process object is one of the supported types.

        :param process: process to retrieve the field from.
        :param function_dict: lambda or dict of lambda of process type
        :return: retrieved field if the type was supported
        :raises: ProcessInstanceError on invalid process type
        """
        # 'Process' is tested first, then 'ProcessWPS'; the first matching type selects the getter
        for supported_type in (Process, ProcessWPS):
            if isinstance(process, supported_type):
                if islambda(function_dict):
                    return function_dict()
                return function_dict[supported_type]()
        raise ProcessInstanceError("Unsupported process type '{}'".format(type(process)))

    def _get_process_id(self, process):
        # type: (AnyProcess) -> AnyStr
        """
        Retrieves the identifier of a supported process object.
        """
        return self._get_process_field(process, lambda: process.identifier)

    def _get_process_type(self, process):
        # type: (AnyProcess) -> AnyStr
        """
        Retrieves the lower-cased type of the process; a ``ProcessWPS`` without ``type`` is ``PROCESS_WPS``.
        """
        type_getters = {
            Process: lambda: process.type,
            ProcessWPS: lambda: getattr(process, "type", PROCESS_WPS),
        }
        return self._get_process_field(process, type_getters).lower()

    def _get_process_endpoint_wps1(self, process):
        # type: (AnyProcess) -> AnyStr
        """
        Retrieves the WPS-1 endpoint of the process, or the store's default endpoint when it has none.
        """
        endpoint_getters = {
            Process: lambda: process.processEndpointWPS1,
            ProcessWPS: lambda: None,
        }
        return self._get_process_field(process, endpoint_getters) or self.default_wps_endpoint

    def save_process(self, process, overwrite=True, request=None):   # noqa: E811
        # type: (Union[Process, ProcessWPS], bool, Optional[Request]) -> Process
        """
        Stores a process in storage.

        :param process: An instance of :class:`weaver.datatype.Process`.
        :param overwrite: Overwrite the matching process instance by name if conflicting.
        :param request: <unused>
        :returns: the process as read back from storage.
        :raises ProcessRegistrationError: if the identifier is already registered and ``overwrite`` is ``False``.
        """
        sane_name = get_sane_name(self._get_process_id(process), **self.sane_name_config)
        id_filter = {"identifier": sane_name}
        if self.collection.count_documents(id_filter) > 0:
            if not overwrite:
                raise ProcessRegistrationError("Process '{}' already registered.".format(sane_name))
            self.collection.delete_one(id_filter)
        process.identifier = sane_name  # must use property getter/setter to match both 'Process' types
        self._add_process(process)
        return self.fetch_by_id(sane_name)

    def delete_process(self, process_id, visibility=None, request=None):
        # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> bool
        """
        Removes process from database, optionally filtered by visibility.
        If ``visibility=None``, the process is deleted (if existing) regardless of its visibility value.

        :returns: ``True`` if a document was deleted.
        """
        sane_name = get_sane_name(process_id, **self.sane_name_config)
        # the lookup itself raises when the process is missing or not accessible with this visibility
        found = self.fetch_by_id(sane_name, visibility=visibility, request=request)
        if not found:
            raise ProcessNotFound("Process '{}' could not be found.".format(sane_name))
        deletion = self.collection.delete_one({"identifier": sane_name})
        return bool(deletion.deleted_count)

    def list_processes(self, visibility=None, request=None):   # noqa: E811
        # type: (Optional[AnyStr], Optional[Request]) -> List[Process]
        """
        Lists all processes in database, optionally filtered by `visibility`.

        :param visibility: One value amongst `weaver.visibility`.
        :param request: <unused>
        :raises ValueError: if a requested visibility is not one of ``VISIBILITY_VALUES``.
        """
        if visibility is None:
            visibility = VISIBILITY_VALUES
        if isinstance(visibility, six.string_types):
            visibility = [visibility]
        for requested in visibility:
            if requested not in VISIBILITY_VALUES:
                raise ValueError("Invalid visibility value '{0!s}' is not one of {1!s}"
                                 .format(requested, list(VISIBILITY_VALUES)))
        search_filters = {"visibility": {"$in": list(visibility)}}
        matches = self.collection.find(search_filters).sort("identifier", pymongo.ASCENDING)
        return [Process(match) for match in matches]

    def fetch_by_id(self, process_id, visibility=None, request=None):   # noqa: E811
        # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> Process
        """
        Get process for given `process_id` from storage, optionally filtered by `visibility`.
        If ``visibility=None``, the process is retrieved (if existing) regardless of its visibility value.

        :param process_id: process identifier
        :param visibility: one value amongst `weaver.visibility`.
        :param request: <unused>
        :return: An instance of :class:`weaver.datatype.Process`.
        :raises ProcessNotFound: if no process is stored under the sanitized identifier.
        :raises ProcessNotAccessible: if ``visibility`` is provided and differs from the process visibility.
        """
        sane_name = get_sane_name(process_id, **self.sane_name_config)
        document = self.collection.find_one({"identifier": sane_name})
        if not document:
            raise ProcessNotFound("Process '{}' could not be found.".format(sane_name))
        process = Process(document)
        if visibility is not None and process.visibility != visibility:
            raise ProcessNotAccessible("Process '{}' cannot be accessed.".format(sane_name))
        return process

    def get_visibility(self, process_id, request=None):   # noqa: E811
        # type: (AnyStr, Optional[Request]) -> AnyStr
        """
        Get `visibility` of a process.

        :return: One value amongst `weaver.visibility`.
        """
        return self.fetch_by_id(process_id).visibility

    def set_visibility(self, process_id, visibility, request=None):   # noqa: E811
        # type: (AnyStr, AnyStr, Optional[Request]) -> None
        """
        Set `visibility` of a process.

        :param visibility: One value amongst `weaver.visibility`.
        :param process_id:
        :param request: <unused>
        :raises: ``TypeError`` or ``ValueError`` in case of invalid parameter.
        """
        process = self.fetch_by_id(process_id)
        process.visibility = visibility
        # the stored document is replaced by the updated process
        self.save_process(process, overwrite=True)

    def clear_processes(self, request=None):   # noqa: E811
        # type: (Optional[Request]) -> bool
        """
        Clears all processes from the store.

        :param request: <unused>
        """
        self.collection.drop()
        return True
class MongodbJobStore(StoreJobs, MongodbStore):
    """
    Registry for process jobs tracking. Uses mongodb to store job attributes.
    """

    def __init__(self, *args, **kwargs):
        # only the collection and 'sane_name_config' are forwarded to the mongodb base class
        db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
        StoreJobs.__init__(self)
        MongodbStore.__init__(self, *db_args, **db_kwargs)

    def save_job(self,
                 task_id,                   # type: AnyStr
                 process,                   # type: AnyStr
                 service=None,              # type: Optional[AnyStr]
                 inputs=None,               # type: Optional[List[Any]]
                 is_workflow=False,         # type: bool
                 user_id=None,              # type: Optional[int]
                 execute_async=True,        # type: bool
                 custom_tags=None,          # type: Optional[List[AnyStr]]
                 access=None,               # type: Optional[AnyStr]
                 notification_email=None,   # type: Optional[AnyStr]
                 accept_language=None,      # type: Optional[AnyStr]
                 ):                         # type: (...) -> Job
        """
        Stores a job in mongodb.

        The job is created with status ``STATUS_ACCEPTED`` (passed through ``map_status``) and tagged with
        ``"dev"``, the non-empty ``custom_tags``, the workflow/application type and the execution mode.
        When ``access`` is not provided, ``VISIBILITY_PRIVATE`` is used.

        :returns: the job as read back from storage.
        :raises JobRegistrationError: if anything fails while building, inserting or reading back the job.
        """
        try:
            tags = ["dev"]
            tags.extend(tag for tag in (custom_tags or []) if tag)  # drop empty tags
            tags.append(PROCESS_WORKFLOW if is_workflow else PROCESS_APPLICATION)
            tags.append(EXECUTE_MODE_ASYNC if execute_async else EXECUTE_MODE_SYNC)
            if not access:
                access = VISIBILITY_PRIVATE
            new_job = Job({
                "task_id": task_id,
                "user_id": user_id,
                "service": service,     # provider identifier (WPS service)
                "process": process,     # process identifier (WPS request)
                "inputs": inputs,
                "status": map_status(STATUS_ACCEPTED),
                "execute_async": execute_async,
                "is_workflow": is_workflow,
                "created": now(),
                "tags": list(set(tags)),    # remove duplicate tags
                "access": access,
                "notification_email": notification_email,
                "accept_language": accept_language,
            })
            self.collection.insert_one(new_job.params())
            job = self.fetch_by_id(job_id=new_job.id)
        except Exception as ex:
            raise JobRegistrationError("Error occurred during job registration: [{}]".format(repr(ex)))
        if job is None:
            raise JobRegistrationError("Failed to retrieve registered job.")
        return job

    def update_job(self, job):
        # type: (Job) -> Job
        """
        Updates a job parameters in mongodb storage.

        :param job: instance of ``weaver.datatype.Job``.
        :returns: the updated job as read back from storage.
        :raises JobUpdateError: if the update raised, was not acknowledged, or did not match exactly one job.
        """
        try:
            result = self.collection.update_one({"id": job.id}, {"$set": job.params()})
            if result.acknowledged and result.matched_count == 1:
                return self.fetch_by_id(job.id)
        except Exception as ex:
            raise JobUpdateError("Error occurred during job update: [{}]".format(repr(ex)))
        raise JobUpdateError("Failed to update specified job: '{}'".format(str(job)))

    def delete_job(self, job_id, request=None):   # noqa: E811
        # type: (AnyStr, Optional[Request]) -> bool
        """
        Removes job from mongodb storage.

        :returns: always ``True``, whether or not a job with ``job_id`` was stored.
        """
        self.collection.delete_one({"id": job_id})
        return True

    def fetch_by_id(self, job_id, request=None):   # noqa: E811
        # type: (AnyStr, Optional[Request]) -> Job
        """
        Gets job for given ``job_id`` from mongodb storage.

        :raises JobNotFound: if no job is stored with that id.
        """
        job = self.collection.find_one({"id": job_id})
        if not job:
            raise JobNotFound("Could not find job matching: '{}'".format(job_id))
        return Job(job)

    def list_jobs(self, request=None):   # noqa: E811
        # type: (Optional[Request]) -> List[Job]
        """
        Lists all jobs in mongodb storage, sorted by ascending id.
        For user-specific access to available jobs, use :meth:`MongodbJobStore.find_jobs` instead.
        """
        return [Job(job) for job in self.collection.find().sort("id", ASCENDING)]

    def find_jobs(self,
                  request,                  # type: Request
                  process=None,             # type: Optional[AnyStr]
                  service=None,             # type: Optional[AnyStr]
                  tags=None,                # type: Optional[List[AnyStr]]
                  access=None,              # type: Optional[AnyStr]
                  notification_email=None,  # type: Optional[AnyStr]
                  status=None,              # type: Optional[AnyStr]
                  sort=None,                # type: Optional[AnyStr]
                  page=0,                   # type: int
                  limit=10,                 # type: int
                  group_by=None,            # type: Optional[Union[AnyStr, List[AnyStr]]]
                  ):                        # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount]
        """
        Finds all jobs in mongodb storage matching search filters and obtain results with requested paging or grouping.

        :param request: request that lead to this call to obtain permissions and user id.
        :param process: process name to filter matching jobs.
        :param service: service name to filter matching jobs.
        :param tags: list of tags to filter matching jobs.
        :param access: access visibility to filter matching jobs (default: PUBLIC).
        :param notification_email: notification email to filter matching jobs.
        :param status: status to filter matching jobs.
        :param sort: field which is used for sorting results (default: creation date, descending).
        :param page: page number to return when using result paging (only when not using ``group_by``).
        :param limit: number of jobs per page when using result paging (only when not using ``group_by``).
        :param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled).
        :returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job
        :raises ValueError: if ``tags`` contains a visibility value.
        :raises JobNotFound: if ``sort`` is not an accepted sorting method.

        Example:

            Using paging (default), result will be in the form::

                (
                    [Job(1), Job(2), Job(3), ...],
                    <total>
                )

            Where ``<total>`` will indicate the complete count of matched jobs with filters, but the list of jobs
            will be limited only to ``page`` index and ``limit`` specified.

            Using grouping with a list of field specified with ``group_by``, results will be in the form::

                (
                    [{category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>},
                     {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>},
                     ...
                    ],
                    <total>
                )

            Where ``<total>`` will again indicate all matched jobs by every category combined, and ``<count>`` will
            indicate the amount of jobs matched for each individual category. Also, ``category`` will indicate values
            of specified fields (from ``group_by``) that compose corresponding jobs with matching values.
        """
        # 'tags' defaults to None: only inspect it when some tags were actually provided
        if tags and any(v in tags for v in VISIBILITY_VALUES):
            raise ValueError("Visibility values not acceptable in 'tags', use 'access' instead.")

        search_filters = {}

        if request.has_permission("admin") and access in VISIBILITY_VALUES:
            # administrators can filter by any access value, across all users
            search_filters["access"] = access
        else:
            user_id = request.authenticated_userid
            if user_id is not None:
                # authenticated users only see their own jobs, optionally narrowed by access
                search_filters["user_id"] = user_id
                if access in VISIBILITY_VALUES:
                    search_filters["access"] = access
            else:
                # anonymous requests only see public jobs
                search_filters["access"] = VISIBILITY_PUBLIC

        if tags:
            search_filters["tags"] = {"$all": tags}

        if status in JOB_STATUS_CATEGORIES.keys():
            # a status category matches any of the statuses it groups
            search_filters["status"] = {"$in": JOB_STATUS_CATEGORIES[status]}
        elif status:
            search_filters["status"] = status

        if notification_email is not None:
            search_filters["notification_email"] = notification_email

        if process is not None:
            search_filters["process"] = process

        if service is not None:
            search_filters["service"] = service

        if sort is None:
            sort = SORT_CREATED
        elif sort == SORT_USER:
            sort = "user_id"
        # NOTE(review): this check runs after 'SORT_USER' was mapped to "user_id";
        #   confirm that "user_id" is part of JOB_SORT_VALUES, otherwise sorting by user is always rejected.
        if sort not in JOB_SORT_VALUES:
            raise JobNotFound("Invalid sorting method: '{}'".format(repr(sort)))

        sort_order = DESCENDING if sort in (SORT_FINISHED, SORT_CREATED) else ASCENDING
        sort_criteria = {sort: sort_order}

        # minimal operation, only search for matches and sort them
        pipeline = [{"$match": search_filters}, {"$sort": sort_criteria}]

        # results by group categories
        if group_by:
            group_by = [group_by] if isinstance(group_by, six.string_types) else group_by  # type: List[AnyStr]
            group_categories = {field: "$" + field for field in group_by}  # fields that can generate groups
            pipeline.extend([{
                "$group": {
                    "_id": group_categories,        # grouping categories to aggregate corresponding jobs
                    "jobs": {"$push": "$$ROOT"},    # matched jobs for corresponding grouping categories
                    "count": {"$sum": 1}},          # count of matches for corresponding grouping categories
            }, {
                "$project": {
                    "_id": False,           # removes "_id" field from results
                    "category": "$_id",     # renames "_id" grouping categories key
                    "jobs": "$jobs",        # preserve field
                    "count": "$count",      # preserve field
                }
            }])
            found = self.collection.aggregate(pipeline)
            items = [{k: (v if k != "jobs" else [Job(j) for j in v])    # convert to Job object where applicable
                      for k, v in i.items()} for i in found]

        # results with paging
        else:
            pipeline.extend([{"$skip": page * limit}, {"$limit": limit}])
            found = self.collection.aggregate(pipeline)
            items = [Job(item) for item in found]

        # total always counts every match of the filters, independently of paging or grouping
        total = self.collection.count_documents(search_filters)
        return items, total

    def clear_jobs(self, request=None):   # noqa: E811
        # type: (Optional[Request]) -> bool
        """
        Removes all jobs from mongodb storage by dropping the collection.
        """
        self.collection.drop()
        return True
class MongodbQuoteStore(StoreQuotes, MongodbStore):
    """
    Registry for quotes. Uses mongodb to store quote attributes.
    """

    def __init__(self, *args, **kwargs):
        # only the collection and 'sane_name_config' are forwarded to the mongodb base class
        db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
        StoreQuotes.__init__(self)
        MongodbStore.__init__(self, *db_args, **db_kwargs)

    def save_quote(self, quote):
        # type: (Quote) -> Quote
        """
        Stores a quote in mongodb.

        :returns: the quote as read back from storage.
        :raises QuoteInstanceError: if ``quote`` is not a :class:`Quote`.
        :raises QuoteRegistrationError: if inserting or reading back the quote fails.
        """
        if not isinstance(quote, Quote):
            raise QuoteInstanceError("Invalid quote object: '{}'".format(repr(quote)))
        try:
            self.collection.insert_one(quote.params())
            quote = self.fetch_by_id(quote_id=quote.id)
        except Exception as ex:
            raise QuoteRegistrationError("Error occurred during quote registration: [{}]".format(repr(ex)))
        if quote is None:
            raise QuoteRegistrationError("Failed to retrieve registered quote.")
        return quote

    def fetch_by_id(self, quote_id):
        # type: (AnyStr) -> Quote
        """
        Gets quote for given ``quote_id`` from mongodb storage.

        :raises QuoteNotFound: if no quote is stored with that id.
        """
        quote = self.collection.find_one({"id": quote_id})
        if not quote:
            raise QuoteNotFound("Could not find quote matching: '{}'".format(quote_id))
        return Quote(quote)

    def list_quotes(self):
        # type: (...) -> List[Quote]
        """
        Lists all quotes in mongodb storage, sorted by ascending id.
        """
        return [Quote(quote) for quote in self.collection.find().sort("id", ASCENDING)]

    def find_quotes(self, process_id=None, page=0, limit=10, sort=None):
        # type: (Optional[AnyStr], int, int, Optional[AnyStr]) -> Tuple[List[Quote], int]
        """
        Finds all quotes in mongodb storage matching search filters.

        Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited
        to a maximum per page, but ``count`` always indicate the `total` number of matches.

        :param process_id: process identifier to filter matching quotes (ignored unless it is a string).
        :param page: index of the page of results to return.
        :param limit: maximum number of quotes per page.
        :param sort: field used for ascending sort of results (default: ``SORT_ID``).
        :raises QuoteNotFound: if ``sort`` is not one of ``QUOTE_SORT_VALUES``.
        """
        search_filters = {}

        if isinstance(process_id, six.string_types):
            search_filters["process"] = process_id

        if sort is None:
            sort = SORT_ID
        if sort not in QUOTE_SORT_VALUES:
            raise QuoteNotFound("Invalid sorting method: '{!s}'".format(sort))

        sort_order = ASCENDING
        sort_criteria = [(sort, sort_order)]
        # 'Cursor.count()' is deprecated (removed in PyMongo 4), count matches on the collection instead
        count = self.collection.count_documents(search_filters)
        found = self.collection.find(search_filters)
        items = [Quote(item) for item in found.skip(page * limit).limit(limit).sort(sort_criteria)]
        return items, count
class MongodbBillStore(StoreBills, MongodbStore):
    """
    Registry for bills. Uses mongodb to store bill attributes.
    """

    def __init__(self, *args, **kwargs):
        # only the collection and 'sane_name_config' are forwarded to the mongodb base class
        db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
        StoreBills.__init__(self)
        MongodbStore.__init__(self, *db_args, **db_kwargs)

    def save_bill(self, bill):
        # type: (Bill) -> Bill
        """
        Stores a bill in mongodb.

        :returns: the bill as read back from storage.
        :raises BillInstanceError: if ``bill`` is not a :class:`Bill`.
        :raises BillRegistrationError: if inserting or reading back the bill fails.
        """
        if not isinstance(bill, Bill):
            raise BillInstanceError("Invalid bill object: '{}'".format(repr(bill)))
        try:
            self.collection.insert_one(bill.params())
            bill = self.fetch_by_id(bill_id=bill.id)
        except Exception as ex:
            raise BillRegistrationError("Error occurred during bill registration: [{}]".format(repr(ex)))
        if bill is None:
            raise BillRegistrationError("Failed to retrieve registered bill.")
        # 'fetch_by_id' already returns a 'Bill', no need to wrap it again
        return bill

    def fetch_by_id(self, bill_id):
        # type: (AnyStr) -> Bill
        """
        Gets bill for given ``bill_id`` from mongodb storage.

        :raises BillNotFound: if no bill is stored with that id.
        """
        bill = self.collection.find_one({"id": bill_id})
        if not bill:
            raise BillNotFound("Could not find bill matching: '{}'".format(bill_id))
        return Bill(bill)

    def list_bills(self):
        # type: (...) -> List[Bill]
        """
        Lists all bills in mongodb storage, sorted by ascending id.
        """
        return [Bill(bill) for bill in self.collection.find().sort("id", ASCENDING)]

    def find_bills(self, quote_id=None, page=0, limit=10, sort=None):
        # type: (Optional[AnyStr], int, int, Optional[AnyStr]) -> Tuple[List[Bill], int]
        """
        Finds all bills in mongodb storage matching search filters.

        Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited
        to a maximum per page, but ``count`` always indicate the `total` number of matches.

        :param quote_id: quote identifier to filter matching bills (ignored unless it is a string).
        :param page: index of the page of results to return.
        :param limit: maximum number of bills per page.
        :param sort: field used for ascending sort of results (default: ``SORT_ID``).
        :raises BillNotFound: if ``sort`` is not one of ``BILL_SORT_VALUES``.
        """
        search_filters = {}

        if isinstance(quote_id, six.string_types):
            search_filters["quote"] = quote_id

        if sort is None:
            sort = SORT_ID
        if sort not in BILL_SORT_VALUES:
            raise BillNotFound("Invalid sorting method: '{}'".format(repr(sort)))

        sort_order = ASCENDING
        sort_criteria = [(sort, sort_order)]
        # 'Cursor.count()' is deprecated (removed in PyMongo 4), count matches on the collection instead
        count = self.collection.count_documents(search_filters)
        found = self.collection.find(search_filters)
        items = [Bill(item) for item in found.skip(page * limit).limit(limit).sort(sort_criteria)]
        return items, count