Source code for weaver.store.mongodb

"""
Stores to read/write data to from/to mongodb using pymongo.
"""

import logging
from typing import TYPE_CHECKING

import pymongo
import six
from pymongo import ASCENDING, DESCENDING
from pyramid.request import Request
from pywps import Process as ProcessWPS

from weaver.datatype import Bill, Job, Process, Quote, Service
from weaver.exceptions import (
    BillInstanceError,
    BillNotFound,
    BillRegistrationError,
    JobNotFound,
    JobRegistrationError,
    JobUpdateError,
    ProcessInstanceError,
    ProcessNotAccessible,
    ProcessNotFound,
    ProcessRegistrationError,
    QuoteInstanceError,
    QuoteNotFound,
    QuoteRegistrationError,
    ServiceNotAccessible,
    ServiceNotFound,
    ServiceRegistrationError
)
from weaver.execute import EXECUTE_MODE_ASYNC, EXECUTE_MODE_SYNC
from weaver.processes.types import PROCESS_APPLICATION, PROCESS_WORKFLOW, PROCESS_WPS
from weaver.sort import (
    BILL_SORT_VALUES,
    JOB_SORT_VALUES,
    QUOTE_SORT_VALUES,
    SORT_CREATED,
    SORT_FINISHED,
    SORT_ID,
    SORT_USER
)
from weaver.status import JOB_STATUS_CATEGORIES, STATUS_ACCEPTED, map_status
from weaver.store.base import StoreBills, StoreJobs, StoreProcesses, StoreQuotes, StoreServices
from weaver.utils import get_base_url, get_sane_name, get_weaver_url, islambda, now
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC, VISIBILITY_VALUES

if TYPE_CHECKING:
    from weaver.typedefs import AnyValue, AnyProcess, AnyProcessType                # noqa: F401
    from pymongo.collection import Collection                                       # noqa: F401
    from typing import Any, AnyStr, Callable, Dict, List, Optional, Tuple, Union    # noqa: F401
[docs] JobListAndCount = Tuple[List[Job], int] # noqa: F401
JobCategory = Dict[AnyStr, Union[AnyValue, Job]] # noqa: F401 JobCategoriesAndCount = Tuple[List[JobCategory], int] # noqa: F401
[docs]LOGGER = logging.getLogger(__name__)
[docs]class MongodbStore(object): """ Base class extended by all concrete store implementations. """ def __init__(self, collection, sane_name_config=None): # type: (Collection, Optional[Dict[AnyStr, Any]]) -> None if not isinstance(collection, pymongo.collection.Collection): raise TypeError("Collection not of expected type.") self.collection = collection # type: Collection self.sane_name_config = sane_name_config or {} @classmethod
[docs] def get_args_kwargs(cls, *args, **kwargs): # type: (*Any, **Any) -> Tuple[Tuple, Dict] """ Filters :class:`MongodbStore`-specific arguments to safely pass them down its ``__init__``. """ collection = None if len(args): collection = args[0] elif "collection" in kwargs: # pylint: disable=R1715 collection = kwargs["collection"] sane_name_config = kwargs.get("sane_name_config", None) return tuple([collection]), {"sane_name_config": sane_name_config}
[docs]class MongodbServiceStore(StoreServices, MongodbStore): """ Registry for OWS services. Uses mongodb to store service url and attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreServices.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_service(self, service, overwrite=True, request=None): # type: (Service, bool, Optional[Request]) -> Service """ Stores an OWS service in mongodb. """ service_url = get_base_url(service.url) # check if service is already registered if self.collection.count_documents({"url": service_url}) > 0: if overwrite: self.collection.delete_one({"url": service_url}) else: raise ServiceRegistrationError("service url already registered.") service_name = get_sane_name(service.name, **self.sane_name_config) if self.collection.count_documents({"name": service_name}) > 0: if overwrite: self.collection.delete_one({"name": service_name}) else: raise ServiceRegistrationError("service name already registered.") self.collection.insert_one(Service( url=service_url, name=service_name, type=service.type, public=service.public, auth=service.auth).params()) return self.fetch_by_url(url=service_url, request=request)
[docs] def delete_service(self, name, request=None): # noqa: E811 # type: (AnyStr, Optional[Request]) -> bool """ Removes service from mongodb storage. """ self.collection.delete_one({"name": name}) return True
[docs] def list_services(self, request=None): # noqa: E811 # noqa: E811 # type: (Optional[Request]) -> List[Service] """ Lists all services in mongodb storage. """ my_services = [] for service in self.collection.find().sort("name", pymongo.ASCENDING): my_services.append(Service(service)) return my_services
[docs] def fetch_by_name(self, name, visibility=None, request=None): # noqa: E811 # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> Service """ Gets service for given ``name`` from mongodb storage. """ service = self.collection.find_one({"name": name}) if not service: raise ServiceNotFound("Service '{}' could not be found.".format(name)) service = Service(service) same_visibility = (service.public and visibility == VISIBILITY_PUBLIC) or \ (not service.public and visibility == VISIBILITY_PRIVATE) if visibility is not None and not same_visibility: raise ServiceNotAccessible("Service '{}' cannot be accessed.".format(name)) return service
[docs] def fetch_by_url(self, url, request=None): # noqa: E811 # type: (AnyStr, Optional[Request]) -> Service """ Gets service for given ``url`` from mongodb storage. """ service = self.collection.find_one({"url": get_base_url(url)}) if not service: raise ServiceNotFound return Service(service)
[docs] def clear_services(self, request=None): # noqa: E811 # type: (Optional[Request]) -> bool """ Removes all OWS services from mongodb storage. """ self.collection.drop() return True
[docs]class MongodbProcessStore(StoreProcesses, MongodbStore): """ Registry for processes. Uses mongodb to store processes and attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreProcesses.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs) registry = kwargs.get("registry") settings = kwargs.get("settings", {}) if not registry else registry.settings default_processes = kwargs.get("default_processes") self.default_host = get_weaver_url(settings) self.default_wps_endpoint = "{host}{wps}".format(host=self.default_host, wps=settings.get("weaver.wps_path", "")) # enforce default process re-registration to receive any applicable update if default_processes: registered_processes = [process.identifier for process in self.list_processes()] for process in default_processes: process_name = self._get_process_id(process) if process_name in registered_processes: self.delete_process(process_name) self._add_process(process)
[docs] def _add_process(self, process): # type: (AnyProcess) -> None if isinstance(process, ProcessWPS): new_process = Process.from_wps(process, processEndpointWPS1=self.default_wps_endpoint) else: new_process = process if not isinstance(new_process, Process): raise ProcessInstanceError("Unsupported process type '{}'".format(type(process))) # apply defaults if not specified new_process["type"] = self._get_process_type(process) new_process["identifier"] = self._get_process_id(process) new_process["processEndpointWPS1"] = self._get_process_endpoint_wps1(process) new_process["visibility"] = new_process.visibility self.collection.insert_one(new_process.params())
@staticmethod
[docs] def _get_process_field(process, function_dict): # type: (AnyProcess, Union[Dict[AnyProcessType, Callable[[], Any]], Callable[[], Any]]) -> Any """ Takes a lambda expression or a dict of process-specific lambda expressions to retrieve a field. Validates that the passed process object is one of the supported types. :param process: process to retrieve the field from. :param function_dict: lambda or dict of lambda of process type :return: retrieved field if the type was supported :raises: ProcessInstanceError on invalid process type """ if isinstance(process, Process): if islambda(function_dict): return function_dict() return function_dict[Process]() elif isinstance(process, ProcessWPS): if islambda(function_dict): return function_dict() return function_dict[ProcessWPS]() else: raise ProcessInstanceError("Unsupported process type '{}'".format(type(process)))
[docs] def _get_process_id(self, process): # type: (AnyProcess) -> AnyStr return self._get_process_field(process, lambda: process.identifier)
[docs] def _get_process_type(self, process): # type: (AnyProcess) -> AnyStr return self._get_process_field(process, {Process: lambda: process.type, ProcessWPS: lambda: getattr(process, "type", PROCESS_WPS)}).lower()
[docs] def _get_process_endpoint_wps1(self, process): # type: (AnyProcess) -> AnyStr url = self._get_process_field(process, {Process: lambda: process.processEndpointWPS1, ProcessWPS: lambda: None}) if not url: url = self.default_wps_endpoint return url
[docs] def save_process(self, process, overwrite=True, request=None): # noqa: E811 # type: (Union[Process, ProcessWPS], bool, Optional[Request]) -> Process """ Stores a process in storage. :param process: An instance of :class:`weaver.datatype.Process`. :param overwrite: Overwrite the matching process instance by name if conflicting. :param request: <unused> """ process_id = self._get_process_id(process) sane_name = get_sane_name(process_id, **self.sane_name_config) if self.collection.count_documents({"identifier": sane_name}) > 0: if overwrite: self.collection.delete_one({"identifier": sane_name}) else: raise ProcessRegistrationError("Process '{}' already registered.".format(sane_name)) process.identifier = sane_name # must use property getter/setter to match both 'Process' types self._add_process(process) return self.fetch_by_id(sane_name)
[docs] def delete_process(self, process_id, visibility=None, request=None): # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> bool """ Removes process from database, optionally filtered by visibility. If ``visibility=None``, the process is deleted (if existing) regardless of its visibility value. """ sane_name = get_sane_name(process_id, **self.sane_name_config) process = self.fetch_by_id(sane_name, visibility=visibility, request=request) if not process: raise ProcessNotFound("Process '{}' could not be found.".format(sane_name)) return bool(self.collection.delete_one({"identifier": sane_name}).deleted_count)
[docs] def list_processes(self, visibility=None, request=None): # noqa: E811 # type: (Optional[AnyStr], Optional[Request]) -> List[Process] """ Lists all processes in database, optionally filtered by `visibility`. :param visibility: One value amongst `weaver.visibility`. :param request: <unused> """ db_processes = [] search_filters = {} if visibility is None: visibility = VISIBILITY_VALUES if isinstance(visibility, six.string_types): visibility = [visibility] for v in visibility: if v not in VISIBILITY_VALUES: raise ValueError("Invalid visibility value '{0!s}' is not one of {1!s}" .format(v, list(VISIBILITY_VALUES))) search_filters["visibility"] = {"$in": list(visibility)} for process in self.collection.find(search_filters).sort("identifier", pymongo.ASCENDING): db_processes.append(Process(process)) return db_processes
[docs] def fetch_by_id(self, process_id, visibility=None, request=None): # noqa: E811 # type: (AnyStr, Optional[AnyStr], Optional[Request]) -> Process """ Get process for given `process_id` from storage, optionally filtered by `visibility`. If ``visibility=None``, the process is retrieved (if existing) regardless of its visibility value. :param process_id: process identifier :param visibility: one value amongst `weaver.visibility`. :param request: <unused> :return: An instance of :class:`weaver.datatype.Process`. """ sane_name = get_sane_name(process_id, **self.sane_name_config) process = self.collection.find_one({"identifier": sane_name}) if not process: raise ProcessNotFound("Process '{}' could not be found.".format(sane_name)) process = Process(process) if visibility is not None and process.visibility != visibility: raise ProcessNotAccessible("Process '{}' cannot be accessed.".format(sane_name)) return process
[docs] def get_visibility(self, process_id, request=None): # noqa: E811 # type: (AnyStr, Optional[Request]) -> AnyStr """ Get `visibility` of a process. :return: One value amongst `weaver.visibility`. """ process = self.fetch_by_id(process_id) return process.visibility
[docs] def set_visibility(self, process_id, visibility, request=None): # noqa: E811 # type: (AnyStr, AnyStr, Optional[Request]) -> None """ Set `visibility` of a process. :param visibility: One value amongst `weaver.visibility`. :param process_id: :param request: <unused> :raises: ``TypeError`` or ``ValueError`` in case of invalid parameter. """ process = self.fetch_by_id(process_id) process.visibility = visibility self.save_process(process, overwrite=True)
[docs] def clear_processes(self, request=None): # noqa: E811 # type: (Optional[Request]) -> bool """ Clears all processes from the store. :param request: <unused> """ self.collection.drop() return True
[docs]class MongodbJobStore(StoreJobs, MongodbStore): """ Registry for process jobs tracking. Uses mongodb to store job attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreJobs.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_job(self, task_id, # type: AnyStr process, # type: AnyStr service=None, # type: Optional[AnyStr] inputs=None, # type: Optional[List[Any]] is_workflow=False, # type: bool user_id=None, # type: Optional[int] execute_async=True, # type: bool custom_tags=None, # type: Optional[List[AnyStr]] access=None, # type: Optional[AnyStr] notification_email=None, # type: Optional[AnyStr] accept_language=None, # type: Optional[AnyStr] ): # type: (...) -> Job """ Stores a job in mongodb. """ try: tags = ["dev"] tags.extend(list(filter(lambda t: t, custom_tags or []))) if is_workflow: tags.append(PROCESS_WORKFLOW) else: tags.append(PROCESS_APPLICATION) if execute_async: tags.append(EXECUTE_MODE_ASYNC) else: tags.append(EXECUTE_MODE_SYNC) if not access: access = VISIBILITY_PRIVATE new_job = Job({ "task_id": task_id, "user_id": user_id, "service": service, # provider identifier (WPS service) "process": process, # process identifier (WPS request) "inputs": inputs, "status": map_status(STATUS_ACCEPTED), "execute_async": execute_async, "is_workflow": is_workflow, "created": now(), "tags": list(set(tags)), "access": access, "notification_email": notification_email, "accept_language": accept_language, }) self.collection.insert_one(new_job.params()) job = self.fetch_by_id(job_id=new_job.id) except Exception as ex: raise JobRegistrationError("Error occurred during job registration: [{}]".format(repr(ex))) if job is None: raise JobRegistrationError("Failed to retrieve registered job.") return job
[docs] def update_job(self, job): # type: (Job) -> Job """ Updates a job parameters in mongodb storage. :param job: instance of ``weaver.datatype.Job``. """ try: result = self.collection.update_one({"id": job.id}, {"$set": job.params()}) if result.acknowledged and result.matched_count == 1: return self.fetch_by_id(job.id) except Exception as ex: raise JobUpdateError("Error occurred during job update: [{}]".format(repr(ex))) raise JobUpdateError("Failed to update specified job: '{}'".format(str(job)))
[docs] def delete_job(self, job_id, request=None): # noqa: E811 # type: (AnyStr, Optional[Request]) -> bool """ Removes job from mongodb storage. """ self.collection.delete_one({"id": job_id}) return True
[docs] def fetch_by_id(self, job_id, request=None): # noqa: E811 # type: (AnyStr, Optional[Request]) -> Job """ Gets job for given ``job_id`` from mongodb storage. """ job = self.collection.find_one({"id": job_id}) if not job: raise JobNotFound("Could not find job matching: '{}'".format(job_id)) return Job(job)
[docs] def list_jobs(self, request=None): # noqa: E811 # type: (Optional[Request]) -> List[Job] """ Lists all jobs in mongodb storage. For user-specific access to available jobs, use :meth:`MongodbJobStore.find_jobs` instead. """ jobs = [] for job in self.collection.find().sort("id", ASCENDING): jobs.append(Job(job)) return jobs
[docs] def find_jobs(self, request, # type: Request process=None, # type: Optional[AnyStr] service=None, # type: Optional[AnyStr] tags=None, # type: Optional[List[AnyStr]] access=None, # type: Optional[AnyStr] notification_email=None, # type: Optional[AnyStr] status=None, # type: Optional[AnyStr] sort=None, # type: Optional[AnyStr] page=0, # type: int limit=10, # type: int group_by=None, # type: Optional[Union[AnyStr, List[AnyStr]]] ): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount] """ Finds all jobs in mongodb storage matching search filters and obtain results with requested paging or grouping. :param request: request that lead to this call to obtain permissions and user id. :param process: process name to filter matching jobs. :param service: service name to filter matching jobs. :param tags: list of tags to filter matching jobs. :param access: access visibility to filter matching jobs (default: PUBLIC). :param notification_email: notification email to filter matching jobs. :param status: status to filter matching jobs. :param sort: field which is used for sorting results (default: creation date, descending). :param page: page number to return when using result paging (only when not using ``group_by``). :param limit: number of jobs per page when using result paging (only when not using ``group_by``). :param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled). :returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job Example: Using paging (default), result will be in the form:: ( [Job(1), Job(2), Job(3), ...], <total> ) Where ``<total>`` will indicate the complete count of matched jobs with filters, but the list of jobs will be limited only to ``page`` index and ``limit`` specified. Using grouping with a list of field specified with ``group_by``, results will be in the form:: ( [{category: {field1: valueA, field2: valueB, ...}, [Job(1), Job(2), ...], count: <count>}, {category: {field1: valueC, field2: valueD, ...}, [Job(x), Job(y), ...], count: <count>}, ... ], <total> ) Where ``<total>`` will again indicate all matched jobs by every category combined, and ``<count>`` will indicate the amount of jobs matched for each individual category. Also, ``category`` will indicate values of specified fields (from ``group_by``) that compose corresponding jobs with matching values. """ if any(v in tags for v in VISIBILITY_VALUES): raise ValueError("Visibility values not acceptable in 'tags', use 'access' instead.") search_filters = {} if request.has_permission("admin") and access in VISIBILITY_VALUES: search_filters["access"] = access else: user_id = request.authenticated_userid if user_id is not None: search_filters["user_id"] = user_id if access in VISIBILITY_VALUES: search_filters["access"] = access else: search_filters["access"] = VISIBILITY_PUBLIC if tags: search_filters["tags"] = {"$all": tags} if status in JOB_STATUS_CATEGORIES.keys(): search_filters["status"] = {"$in": JOB_STATUS_CATEGORIES[status]} elif status: search_filters["status"] = status if notification_email is not None: search_filters["notification_email"] = notification_email if process is not None: search_filters["process"] = process if service is not None: search_filters["service"] = service if sort is None: sort = SORT_CREATED elif sort == SORT_USER: sort = "user_id" if sort not in JOB_SORT_VALUES: raise JobNotFound("Invalid sorting method: '{}'".format(repr(sort))) sort_order = DESCENDING if sort in (SORT_FINISHED, SORT_CREATED) else ASCENDING sort_criteria = {sort: sort_order} # minimal operation, only search for matches and sort them pipeline = [{"$match": search_filters}, {"$sort": sort_criteria}] # results by group categories if group_by: group_by = [group_by] if isinstance(group_by, six.string_types) else group_by # type: List[AnyStr] group_categories = {field: "$" + field for field in group_by} # fields that can generate groups pipeline.extend([{ "$group": { "_id": group_categories, # grouping categories to aggregate corresponding jobs "jobs": {"$push": "$$ROOT"}, # matched jobs for corresponding grouping categories "count": {"$sum": 1}}, # count of matches for corresponding grouping categories }, { # noqa: E123 # ignore indentation checks "$project": { "_id": False, # removes "_id" field from results "category": "$_id", # renames "_id" grouping categories key "jobs": "$jobs", # preserve field "count": "$count", # preserve field } }]) found = self.collection.aggregate(pipeline) items = [{k: (v if k != "jobs" else [Job(j) for j in v]) # convert to Job object where applicable for k, v in i.items()} for i in found] # results with paging else: pipeline.extend([{"$skip": page * limit}, {"$limit": limit}]) found = self.collection.aggregate(pipeline) items = [Job(item) for item in list(found)] total = self.collection.count_documents(search_filters) return items, total
[docs] def clear_jobs(self, request=None): # noqa: E811 # type: (Optional[Request]) -> bool """ Removes all jobs from mongodb storage. """ self.collection.drop() return True
[docs]class MongodbQuoteStore(StoreQuotes, MongodbStore): """ Registry for quotes. Uses mongodb to store quote attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreQuotes.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_quote(self, quote): # type: (Quote) -> Quote """ Stores a quote in mongodb. """ if not isinstance(quote, Quote): raise QuoteInstanceError("Invalid quote object: '{}'".format(repr(quote))) try: self.collection.insert_one(quote.params()) quote = self.fetch_by_id(quote_id=quote.id) except Exception as ex: raise QuoteRegistrationError("Error occurred during quote registration: [{}]".format(repr(ex))) if quote is None: raise QuoteRegistrationError("Failed to retrieve registered quote.") return quote
[docs] def fetch_by_id(self, quote_id): # type: (AnyStr) -> Quote """ Gets quote for given ``quote_id`` from mongodb storage. """ quote = self.collection.find_one({"id": quote_id}) if not quote: raise QuoteNotFound("Could not find quote matching: '{}'".format(quote_id)) return Quote(quote)
[docs] def list_quotes(self): # type: (...) -> List[Quote] """ Lists all quotes in mongodb storage. """ quotes = [] for quote in self.collection.find().sort("id", ASCENDING): quotes.append(Quote(quote)) return quotes
[docs] def find_quotes(self, process_id=None, page=0, limit=10, sort=None): # type: (Optional[AnyStr], int, int, Optional[AnyStr]) -> Tuple[List[Quote], int] """ Finds all quotes in mongodb storage matching search filters. Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited to a maximum per page, but ``count`` always indicate the `total` number of matches. """ search_filters = {} if isinstance(process_id, six.string_types): search_filters["process"] = process_id if sort is None: sort = SORT_ID if sort not in QUOTE_SORT_VALUES: raise QuoteNotFound("Invalid sorting method: '{!s}'".format(sort)) sort_order = ASCENDING sort_criteria = [(sort, sort_order)] found = self.collection.find(search_filters) count = found.count() items = [Quote(item) for item in list(found.skip(page * limit).limit(limit).sort(sort_criteria))] return items, count
[docs]class MongodbBillStore(StoreBills, MongodbStore): """ Registry for bills. Uses mongodb to store bill attributes. """ def __init__(self, *args, **kwargs): db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs) StoreBills.__init__(self) MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_bill(self, bill): # type: (Bill) -> Bill """ Stores a bill in mongodb. """ if not isinstance(bill, Bill): raise BillInstanceError("Invalid bill object: '{}'".format(repr(bill))) try: self.collection.insert_one(bill.params()) bill = self.fetch_by_id(bill_id=bill.id) except Exception as ex: raise BillRegistrationError("Error occurred during bill registration: [{}]".format(repr(ex))) if bill is None: raise BillRegistrationError("Failed to retrieve registered bill.") return Bill(bill)
[docs] def fetch_by_id(self, bill_id): # type: (AnyStr) -> Bill """ Gets bill for given ``bill_id`` from mongodb storage. """ bill = self.collection.find_one({"id": bill_id}) if not bill: raise BillNotFound("Could not find bill matching: '{}'".format(bill_id)) return Bill(bill)
[docs] def list_bills(self): # type: (...) -> List[Bill] """ Lists all bills in mongodb storage. """ bills = [] for bill in self.collection.find().sort("id", ASCENDING): bills.append(Bill(bill)) return bills
[docs] def find_bills(self, quote_id=None, page=0, limit=10, sort=None): # type: (Optional[AnyStr], int, int, Optional[AnyStr]) -> Tuple[List[Bill], int] """ Finds all bills in mongodb storage matching search filters. Returns a tuple of filtered ``items`` and their ``count``, where ``items`` can have paging and be limited to a maximum per page, but ``count`` always indicate the `total` number of matches. """ search_filters = {} if isinstance(quote_id, six.string_types): search_filters["quote"] = quote_id if sort is None: sort = SORT_ID if sort not in BILL_SORT_VALUES: raise BillNotFound("Invalid sorting method: '{}'".format(repr(sort))) sort_order = ASCENDING sort_criteria = [(sort, sort_order)] found = self.collection.find(search_filters) count = found.count() items = [Bill(item) for item in list(found.skip(page * limit).limit(limit).sort(sort_criteria))] return items, count