Source code for weaver.store.base

import abc
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import datetime
    from typing import Any, Dict, List, Optional, Tuple, Union
    from pyramid.request import Request
    from pywps import Process as ProcessWPS
    from weaver.datatype import Bill, Job, Process, Quote, Service
    from weaver.typedefs import AnyValue, DatetimeIntervalType

[docs] JobListAndCount = Tuple[List[Job], int]
JobCategory = Dict[str, Union[AnyValue, Job]] JobCategoriesAndCount = Tuple[List[JobCategory], int]
[docs]class StoreInterface(object, metaclass=abc.ABCMeta):
[docs] type = None
def __init__(self): if not self.type: raise NotImplementedError("Store 'type' must be overridden in inheriting class.")
[docs]class StoreServices(StoreInterface):
[docs] type = "services"
@abc.abstractmethod
[docs] def save_service(self, service, overwrite=True): # type: (Service, bool) -> Service raise NotImplementedError
@abc.abstractmethod
[docs] def delete_service(self, name): # type: (str) -> bool raise NotImplementedError
@abc.abstractmethod
[docs] def list_services(self): # type: () -> List[Service] raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_name(self, name, visibility=None): # type: (str, Optional[str]) -> Service raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_url(self, url): # type: (str) -> Service raise NotImplementedError
@abc.abstractmethod
[docs] def clear_services(self): # type: () -> bool raise NotImplementedError
[docs]class StoreProcesses(StoreInterface):
[docs] type = "processes"
@abc.abstractmethod
[docs] def save_process(self, process, overwrite=True): # type: (Union[Process, ProcessWPS], bool) -> Process raise NotImplementedError
@abc.abstractmethod
[docs] def delete_process(self, process_id, visibility=None): # type: (str, Optional[str]) -> bool raise NotImplementedError
@abc.abstractmethod
[docs] def list_processes(self, visibility=None): # type: (Optional[str]) -> List[Process] raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_id(self, process_id, visibility=None): # type: (str, Optional[str]) -> Process raise NotImplementedError
@abc.abstractmethod
[docs] def get_visibility(self, process_id): # type: (str) -> str raise NotImplementedError
@abc.abstractmethod
[docs] def set_visibility(self, process_id, visibility): # type: (str, str) -> None raise NotImplementedError
@abc.abstractmethod
[docs] def clear_processes(self): # type: () -> bool raise NotImplementedError
[docs]class StoreJobs(StoreInterface):
[docs] type = "jobs"
@abc.abstractmethod
[docs] def save_job(self, task_id, # type: str process, # type: str service=None, # type: Optional[str] inputs=None, # type: Optional[List[Any]] is_workflow=False, # type: bool is_local=False, # type: bool user_id=None, # type: Optional[int] execute_async=True, # type: bool custom_tags=None, # type: Optional[List[str]] access=None, # type: Optional[str] notification_email=None, # type: Optional[str] accept_language=None, # type: Optional[str] created=None, # type: Optional[datetime.datetime] ): # type: (...) -> Job raise NotImplementedError
@abc.abstractmethod
[docs] def update_job(self, job): # type: (Job) -> Job raise NotImplementedError
@abc.abstractmethod
[docs] def delete_job(self, job_id): # type: (str) -> bool raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_id(self, job_id): # type: (str) -> Job raise NotImplementedError
@abc.abstractmethod
[docs] def list_jobs(self): # type: () -> List[Job] raise NotImplementedError
@abc.abstractmethod
[docs] def find_jobs(self, process=None, # type: Optional[str] service=None, # type: Optional[str] tags=None, # type: Optional[List[str]] access=None, # type: Optional[str] notification_email=None, # type: Optional[str] status=None, # type: Optional[str] sort=None, # type: Optional[str] page=0, # type: int limit=10, # type: int datetime=None, # type: Optional[DatetimeIntervalType] group_by=None, # type: Optional[Union[str, List[str]]] request=None, # type: Optional[Request] ): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount] raise NotImplementedError
@abc.abstractmethod
[docs] def clear_jobs(self): # type: () -> bool raise NotImplementedError
[docs]class StoreQuotes(StoreInterface):
[docs] type = "quotes"
@abc.abstractmethod
[docs] def save_quote(self, quote): # type: (Quote) -> Quote raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_id(self, quote_id): # type: (str) -> Quote raise NotImplementedError
@abc.abstractmethod
[docs] def list_quotes(self): # type: (...) -> List[Quote] raise NotImplementedError
@abc.abstractmethod
[docs] def find_quotes(self, process_id=None, page=0, limit=10, sort=None): # type: (Optional[str], int, int, Optional[str]) -> Tuple[List[Quote], int] raise NotImplementedError
[docs]class StoreBills(StoreInterface):
[docs] type = "bills"
@abc.abstractmethod
[docs] def save_bill(self, bill): # type: (Bill) -> Bill raise NotImplementedError
@abc.abstractmethod
[docs] def fetch_by_id(self, bill_id): # type: (str) -> Bill raise NotImplementedError
@abc.abstractmethod
[docs] def list_bills(self): # type: (...) -> List[Bill] raise NotImplementedError
@abc.abstractmethod
[docs] def find_bills(self, quote_id=None, page=0, limit=10, sort=None): # type: (Optional[str], int, int, Optional[str]) -> Tuple[List[Bill], int] raise NotImplementedError