# Source code for weaver.store.base

import abc
from typing import TYPE_CHECKING

from weaver.utils import VersionFormat

if TYPE_CHECKING:
    import datetime
    from typing import Any, Dict, List, Optional, Tuple, Union

    from pyramid.request import Request
    from pywps import Process as ProcessWPS

    from weaver.datatype import Bill, Job, Process, Quote, Service, VaultFile
    from weaver.execute import AnyExecuteResponse
    from weaver.sort import AnySortType
    from weaver.status import AnyStatusSearch
    from weaver.typedefs import (
        AnyUUID,
        AnyVersion,
        ExecutionInputs,
        ExecutionOutputs,
        DatetimeIntervalType,
        SettingsType,
        TypedDict
    )
    from weaver.visibility import AnyVisibility

if TYPE_CHECKING:
    # These aliases rely on names that are imported only for type checkers
    # (Dict, List, Optional, Tuple, Union, TypedDict, Job), so they must stay
    # under the same guard to avoid a NameError when the module is imported.

    # One grouped entry: the grouping fields under "category", an integer "count",
    # and the list of jobs under "jobs".
    JobGroupCategory = TypedDict("JobGroupCategory", {
        "category": Dict[str, Optional[str]],
        "count": int,
        "jobs": List[Job],
    })

    # Declared return type of ``StoreJobs.find_jobs``: a pair made of either a list of
    # jobs or a grouped category, followed by an integer
    # (presumably the total number of matches -- confirm against the concrete store).
    JobSearchResult = Tuple[Union[List[Job], JobGroupCategory], int]
class StoreInterface(object, metaclass=abc.ABCMeta):
    """
    Abstract base shared by every store interface.

    Inheriting classes must override :attr:`type` with a truthy value,
    otherwise instantiation raises :class:`NotImplementedError`.
    """

    # Identifier of the kind of store; intentionally unset on the base class.
    type = None      # type: str
    # Settings passed to the constructor; ``None`` at class level.
    settings = None  # type: SettingsType

    def __init__(self, settings=None):
        # type: (Optional[SettingsType]) -> None
        """
        Retain the provided settings and ensure the store type was defined.

        :param settings: settings object to keep on the instance (may be ``None``).
        :raises NotImplementedError: if ``type`` was not overridden with a truthy value.
        """
        self.settings = settings
        if not self.type:
            raise NotImplementedError("Store 'type' must be overridden in inheriting class.")
class StoreServices(StoreInterface):
    """
    Interface declaring the operations of a store of ``Service`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "services"

    @abc.abstractmethod
    def save_service(self, service, overwrite=True):
        # type: (Service, bool) -> Service
        """
        Save a service (abstract).

        :param service: service item to save.
        :param overwrite: presumably allows replacing an existing entry; confirm in the implementation.
        :returns: a ``Service``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def delete_service(self, name):
        # type: (str) -> bool
        """
        Delete the service referred to by ``name`` (abstract).

        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def list_services(self):
        # type: () -> List[Service]
        """
        List services (abstract).

        :returns: a list of ``Service``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_name(self, name, visibility=None):
        # type: (str, Optional[AnyVisibility]) -> Service
        """
        Fetch a service by its name (abstract).

        :param name: name of the service to look up.
        :param visibility: optional visibility value; how it filters is up to the implementation.
        :returns: a ``Service``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_url(self, url):
        # type: (str) -> Service
        """
        Fetch a service by its URL (abstract).

        :returns: a ``Service``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def clear_services(self):
        # type: () -> bool
        """
        Clear the services of the store (abstract).

        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError
class StoreProcesses(StoreInterface):
    """
    Interface declaring the operations of a store of ``Process`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "processes"

    @abc.abstractmethod
    def save_process(self, process, overwrite=True):
        # type: (Union[Process, ProcessWPS], bool) -> Process
        """
        Save a process (abstract).

        :param process: process to save, either a ``Process`` or a :mod:`pywps` process.
        :param overwrite: presumably allows replacing an existing entry; confirm in the implementation.
        :returns: a ``Process``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def delete_process(self, process_id, visibility=None):
        # type: (str, Optional[AnyVisibility]) -> bool
        """
        Delete the process referred to by ``process_id`` (abstract).

        :param process_id: identifier of the process.
        :param visibility: optional visibility value; how it is applied is up to the implementation.
        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def list_processes(
        self,
        visibility=None,    # type: Optional[Union[AnyVisibility, List[AnyVisibility]]]
        page=None,          # type: Optional[int]
        limit=None,         # type: Optional[int]
        sort=None,          # type: Optional[AnySortType]
        total=False,        # type: bool
        revisions=False,    # type: bool
        process=None,       # type: Optional[str]
    ):
        # type: (...) -> Union[List[Process], Tuple[List[Process], int]]
        """
        List processes (abstract).

        :param visibility: one visibility value or a list of them.
        :param page: optional page index.
        :param limit: optional page size.
        :param sort: optional sorting method.
        :param total: presumably selects the ``(processes, count)`` tuple form of the result; confirm.
        :param revisions: flag related to process revisions; semantics defined by the implementation.
        :param process: optional process identifier; semantics defined by the implementation.
        :returns: a list of ``Process``, or a tuple of that list and an integer.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_id(self, process_id, visibility=None):
        # type: (str, Optional[AnyVisibility]) -> Process
        """
        Fetch a process by its identifier (abstract).

        :param process_id: identifier of the process.
        :param visibility: optional visibility value; how it filters is up to the implementation.
        :returns: a ``Process``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def find_versions(self, process_id, version_format=VersionFormat.OBJECT):
        # type: (str, VersionFormat) -> List[AnyVersion]
        """
        Find the versions of a process (abstract).

        :param process_id: identifier of the process.
        :param version_format: representation requested for the versions (``VersionFormat.OBJECT`` by default).
        :returns: a list of versions, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def update_version(self, process_id, version):
        # type: (str, AnyVersion) -> Process
        """
        Update the version of a process (abstract).

        :returns: a ``Process``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_visibility(self, process_id):
        # type: (str) -> AnyVisibility
        """
        Get the visibility of a process (abstract).
        """
        raise NotImplementedError

    @abc.abstractmethod
    def set_visibility(self, process_id, visibility):
        # type: (str, AnyVisibility) -> None
        """
        Set the visibility of a process (abstract).
        """
        raise NotImplementedError

    @abc.abstractmethod
    def clear_processes(self):
        # type: () -> bool
        """
        Clear the processes of the store (abstract).

        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError
class StoreJobs(StoreInterface):
    """
    Interface declaring the operations of a store of ``Job`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "jobs"

    @abc.abstractmethod
    def save_job(
        self,
        task_id,                    # type: str
        process,                    # type: str
        service=None,               # type: Optional[str]
        inputs=None,                # type: Optional[ExecutionInputs]
        outputs=None,               # type: Optional[ExecutionOutputs]
        is_workflow=False,          # type: bool
        is_local=False,             # type: bool
        execute_async=True,         # type: bool
        execute_response=None,      # type: Optional[AnyExecuteResponse]
        custom_tags=None,           # type: Optional[List[str]]
        user_id=None,               # type: Optional[int]
        access=None,                # type: Optional[AnyVisibility]
        context=None,               # type: Optional[str]
        notification_email=None,    # type: Optional[str]
        accept_language=None,       # type: Optional[str]
        created=None,               # type: Optional[datetime.datetime]
    ):
        # type: (...) -> Job
        """
        Save a job built from the provided properties (abstract).

        Only ``task_id`` and ``process`` are required; every other argument has a default.
        The meaning of each property is defined by the implementing store.

        :returns: a ``Job``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def batch_update_jobs(self, job_filter, job_update):
        # type: (Dict[str, Any], Dict[str, Any]) -> int
        """
        Update several jobs at once (abstract).

        :param job_filter: mapping presumably selecting the jobs to update; confirm in the implementation.
        :param job_update: mapping presumably holding the fields to apply; confirm in the implementation.
        :returns: an integer, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def update_job(self, job):
        # type: (Job) -> Job
        """
        Update a job (abstract).

        :returns: a ``Job``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def delete_job(self, job_id):
        # type: (AnyUUID) -> bool
        """
        Delete the job referred to by ``job_id`` (abstract).

        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_id(self, job_id):
        # type: (AnyUUID) -> Job
        """
        Fetch a job by its identifier (abstract).

        :returns: a ``Job``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def list_jobs(self):
        # type: () -> List[Job]
        """
        List jobs (abstract).

        :returns: a list of ``Job``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def find_jobs(
        self,
        process=None,               # type: Optional[str]
        service=None,               # type: Optional[str]
        job_type=None,              # type: Optional[str]
        tags=None,                  # type: Optional[List[str]]
        access=None,                # type: Optional[str]
        notification_email=None,    # type: Optional[str]
        status=None,                # type: Optional[Union[AnyStatusSearch, List[AnyStatusSearch]]]
        sort=None,                  # type: Optional[AnySortType]
        page=0,                     # type: Optional[int]
        limit=10,                   # type: Optional[int]
        min_duration=None,          # type: Optional[int]
        max_duration=None,          # type: Optional[int]
        datetime_interval=None,     # type: Optional[DatetimeIntervalType]
        group_by=None,              # type: Optional[Union[str, List[str]]]
        request=None,               # type: Optional[Request]
    ):
        # type: (...) -> JobSearchResult
        """
        Search jobs with optional criteria (abstract).

        All arguments are optional. ``page`` defaults to ``0`` and ``limit`` to ``10``.
        How each criterion is applied is defined by the implementing store.

        :param status: one status search value or a list of them.
        :param group_by: one field name or a list of field names.
        :returns: a ``JobSearchResult``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def clear_jobs(self):
        # type: () -> bool
        """
        Clear the jobs of the store (abstract).

        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError
class StoreQuotes(StoreInterface):
    """
    Interface declaring the operations of a store of ``Quote`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "quotes"

    @abc.abstractmethod
    def save_quote(self, quote):
        # type: (Quote) -> Quote
        """
        Save a quote (abstract).

        :returns: a ``Quote``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_id(self, quote_id):
        # type: (AnyUUID) -> Quote
        """
        Fetch a quote by its identifier (abstract).

        :returns: a ``Quote``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def list_quotes(self):
        # type: () -> List[Quote]
        """
        List quotes (abstract).

        :returns: a list of ``Quote``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def find_quotes(self, process_id=None, page=0, limit=10, sort=None):
        # type: (Optional[str], int, int, Optional[AnySortType]) -> Tuple[List[Quote], int]
        """
        Search quotes with optional criteria (abstract).

        :param process_id: optional process identifier used as search criterion.
        :param page: page index (``0`` by default).
        :param limit: page size (``10`` by default).
        :param sort: optional sorting method.
        :returns: a tuple of a list of ``Quote`` and an integer
            (presumably the total number of matches; confirm in the implementation).
        """
        raise NotImplementedError

    @abc.abstractmethod
    def update_quote(self, quote):
        # type: (Quote) -> Quote
        """
        Update a quote (abstract).

        :returns: a ``Quote``, per the declared signature.
        """
        raise NotImplementedError
class StoreBills(StoreInterface):
    """
    Interface declaring the operations of a store of ``Bill`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "bills"

    @abc.abstractmethod
    def save_bill(self, bill):
        # type: (Bill) -> Bill
        """
        Save a bill (abstract).

        :returns: a ``Bill``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetch_by_id(self, bill_id):
        # type: (AnyUUID) -> Bill
        """
        Fetch a bill by its identifier (abstract).

        :returns: a ``Bill``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def list_bills(self):
        # type: () -> List[Bill]
        """
        List bills (abstract).

        :returns: a list of ``Bill``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def find_bills(self, quote_id=None, page=0, limit=10, sort=None):
        # type: (Optional[str], int, int, Optional[AnySortType]) -> Tuple[List[Bill], int]
        """
        Search bills with optional criteria (abstract).

        :param quote_id: optional quote identifier used as search criterion.
        :param page: page index (``0`` by default).
        :param limit: page size (``10`` by default).
        :param sort: optional sorting method.
        :returns: a tuple of a list of ``Bill`` and an integer
            (presumably the total number of matches; confirm in the implementation).
        """
        raise NotImplementedError
class StoreVault(StoreInterface):
    """
    Interface declaring the operations of a store of ``VaultFile`` items.

    Every method is abstract; the base declarations only raise
    :class:`NotImplementedError`. Actual behaviour is defined by the concrete store.
    """

    type = "vault"

    @abc.abstractmethod
    def get_file(self, file_id, nothrow=False):
        # type: (AnyUUID, bool) -> VaultFile
        """
        Get a vault file by its identifier (abstract).

        :param file_id: identifier of the file.
        :param nothrow: presumably suppresses the error raised for a missing file; confirm in the implementation.
        :returns: a ``VaultFile``, per the declared signature.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def save_file(self, file):
        # type: (VaultFile) -> None
        """
        Save a vault file (abstract).
        """
        raise NotImplementedError

    @abc.abstractmethod
    def delete_file(self, file):
        # type: (Union[VaultFile, AnyUUID]) -> bool
        """
        Delete a vault file (abstract).

        :param file: either the ``VaultFile`` itself or its identifier.
        :returns: a boolean, per the declared signature.
        """
        raise NotImplementedError