Source code for weaver.database.base

import abc
from typing import TYPE_CHECKING

from import StoreInterface

    from weaver.typedefs import AnySettingsContainer, JSON, Type, Union

# Accepted ways of selecting a store: a ``StoreInterface`` subclass, a ``StoreInterface`` instance,
# or a plain string (which ``DatabaseInterface._get_store_type`` returns unchanged as the store type).
[docs] StoreSelector = Union[Type[StoreInterface], StoreInterface, str]
class DatabaseInterface(metaclass=abc.ABCMeta):
    """
    Base interface from which concrete database implementations derive.

    Inheriting classes must define a non-empty ``type`` attribute and are expected to override every
    method that raises :class:`NotImplementedError` in this base class.
    """
    __slots__ = ["type"]

    def __init__(self, _):
        # type: (AnySettingsContainer) -> None
        """
        Validate that the inheriting class provides a database ``type``.

        :param _: settings container, unused by the base interface.
        :raises NotImplementedError: if ``type`` is missing or empty on the instance.
        """
        # ``type`` is declared in ``__slots__``: reading an unset slot raises ``AttributeError``,
        # so use ``getattr`` with a default to report the intended error when a subclass omits it.
        if not getattr(self, "type", None):
            raise NotImplementedError("Database 'type' must be overridden in inheriting class.")

    @staticmethod
    def _get_store_type(store_type):
        # type: (StoreSelector) -> str
        """
        Resolve a store selector to its store type string.

        :param store_type: a ``StoreInterface`` instance, a ``StoreInterface`` subclass, or a string.
        :returns: the ``type`` attribute of the instance or class, or the string itself.
        :raises TypeError: if the selector is none of the supported kinds.
        """
        if isinstance(store_type, StoreInterface):
            return store_type.type
        if isinstance(store_type, type) and issubclass(store_type, StoreInterface):
            return store_type.type
        if isinstance(store_type, str):
            return store_type
        raise TypeError("Unsupported store type selector: [{}] ({})".format(store_type, type(store_type)))

    def get_store(self, store_type, *store_args, **store_kwargs):
        """
        Not implemented in the base interface; inheriting classes must override it.

        NOTE(review): presumably returns the store selected by ``store_type``, built with the extra
        positional and keyword arguments -- confirm against concrete implementations.
        """
        raise NotImplementedError

    def reset_store(self, store_type):
        # type: (StoreSelector) -> None
        """
        Not implemented in the base interface; inheriting classes must override it.
        """
        raise NotImplementedError

    def get_session(self):
        """
        Not implemented in the base interface; inheriting classes must override it.
        """
        raise NotImplementedError

    def get_information(self):
        # type: (...) -> JSON
        """
        Not implemented in the base interface; inheriting classes must override it.

        :returns: {'version': version, 'type': db_type}
        """
        raise NotImplementedError

    def is_ready(self):
        # type: (...) -> bool
        """
        Not implemented in the base interface; inheriting classes must override it.
        """
        raise NotImplementedError

    def run_migration(self):
        # type: (...) -> None
        """
        Not implemented in the base interface; inheriting classes must override it.
        """
        raise NotImplementedError