Skip to content

KV

CacheToolsKVDB

Bases: BaseKV, BaseDB, KVDB

CacheTools Key-Value DB.

Source code in src/msgflux/data/dbs/providers/cachetools.py
@register_db
class CacheToolsKVDB(BaseKV, BaseDB, KVDB):
    """Key-value database backed by an in-memory ``cachetools.TTLCache``."""

    provider = "cachetools"

    def __init__(
        self,
        *,
        ttl: Optional[int] = 3600,
        maxsize: Optional[int] = 10000,
        hash_key: Optional[bool] = True,
    ):
        """Args:
        ttl:
            The time-to-live (TTL) for each cache entry in seconds.
        maxsize:
            The maximum number of items the cache can store.
        hash_key:
            Whether to hash the keys before storing them in the cache.
        """
        # The cachetools dependency is optional; fail fast with install hint.
        if TTLCache is None:
            raise ImportError(
                "`cachetools` client is not available. Install with "
                "`pip install cachetools`"
            )
        # Record configuration, then build the underlying cache client.
        self.hash_key = hash_key
        self.maxsize = maxsize
        self.ttl = ttl
        self._initialize()

    def _initialize(self):
        # Fresh TTL- and size-bounded in-memory store.
        self.client = TTLCache(maxsize=self.maxsize, ttl=self.ttl)

    def add(self, documents: Union[List[Dict[str, Any]], Dict[str, Any]]):
        """Store each ``{key: value}`` pair from one or more mappings."""
        # Accept either a single mapping or a list of mappings.
        batch = documents if isinstance(documents, list) else [documents]
        for doc in batch:
            for raw_key, raw_value in doc.items():
                stored_key = convert_str_to_hash(raw_key) if self.hash_key else raw_key
                # Values are msgpack-encoded before insertion.
                self.client[stored_key] = msgspec.msgpack.encode(raw_value)

hash_key instance-attribute

hash_key = hash_key

maxsize instance-attribute

maxsize = maxsize

provider class-attribute instance-attribute

provider = 'cachetools'

ttl instance-attribute

ttl = ttl

__init__

__init__(*, ttl=3600, maxsize=10000, hash_key=True)

Parameters: `ttl` — the time-to-live (TTL) for each cache entry, in seconds; `maxsize` — the maximum number of items the cache can store; `hash_key` — whether to hash the keys before storing them in the cache.

Source code in src/msgflux/data/dbs/providers/cachetools.py
def __init__(
    self,
    *,
    ttl: Optional[int] = 3600,
    maxsize: Optional[int] = 10000,
    hash_key: Optional[bool] = True,
):
    """Args:
    ttl:
        The time-to-live (TTL) for each cache entry in seconds.
    maxsize:
        The maximum number of items the cache can store.
    hash_key:
        Whether to hash the keys before storing them in the cache.
    """
    # Guard: cachetools is an optional dependency; bail out early if missing.
    if TTLCache is None:
        raise ImportError(
            "`cachetools` client is not available. Install with "
            "`pip install cachetools`"
        )
    # Record configuration, then build the underlying TTLCache client.
    self.ttl = ttl
    self.maxsize = maxsize
    self.hash_key = hash_key
    self._initialize()

add

add(documents)
Source code in src/msgflux/data/dbs/providers/cachetools.py
def add(self, documents: Union[List[Dict[str, Any]], Dict[str, Any]]):
    """Store each ``{key: value}`` pair from one or more mappings."""
    # Normalize the input to a list of mappings.
    batch = documents if isinstance(documents, list) else [documents]
    for doc in batch:
        for raw_key, raw_value in doc.items():
            stored_key = convert_str_to_hash(raw_key) if self.hash_key else raw_key
            # Values are msgpack-encoded before insertion.
            self.client[stored_key] = msgspec.msgpack.encode(raw_value)

DiskCacheKVDB

Bases: BaseKV, BaseDB, KVDB

DiskCache Key-Value DB.

Source code in src/msgflux/data/dbs/providers/diskcache.py
@register_db
class DiskCacheKVDB(BaseKV, BaseDB, KVDB):
    """Key-value database persisted on disk via ``diskcache.Cache``."""

    provider = "diskcache"

    def __init__(self, *, ttl: Optional[int] = 3600, hash_key: Optional[bool] = True):
        """Args:
        ttl:
            The time-to-live (TTL) for each cache entry in seconds.
        hash_key:
            Whether to hash the keys before storing them in the cache.
        """
        # The diskcache dependency is optional; fail fast with install hint.
        if Cache is None:
            raise ImportError(
                "`diskcache` client is not available. Install with "
                "`pip install diskcache`"
            )
        # Record configuration, then build the underlying cache client.
        self.ttl = ttl
        self.hash_key = hash_key
        self._initialize()

    def _initialize(self):
        # NOTE(review): no directory is passed, so diskcache picks a temporary
        # cache location; timeout=1 is the SQLite busy timeout — confirm intent.
        self.client = Cache(timeout=1)

    def add(self, documents: Union[List[Dict[str, Any]], Dict[str, Any]]):
        """Store each ``{key: value}`` pair; entries expire after ``self.ttl``."""
        # Accept either a single mapping or a list of mappings.
        batch = documents if isinstance(documents, list) else [documents]
        for doc in batch:
            for raw_key, raw_value in doc.items():
                stored_key = convert_str_to_hash(raw_key) if self.hash_key else raw_key
                # Values are msgpack-encoded; TTL is applied per entry here.
                self.client.set(stored_key, msgspec.msgpack.encode(raw_value), expire=self.ttl)

hash_key instance-attribute

hash_key = hash_key

provider class-attribute instance-attribute

provider = 'diskcache'

ttl instance-attribute

ttl = ttl

__init__

__init__(*, ttl=3600, hash_key=True)

Parameters: `ttl` — the time-to-live (TTL) for each cache entry, in seconds; `hash_key` — whether to hash the keys before storing them in the cache.

Source code in src/msgflux/data/dbs/providers/diskcache.py
def __init__(self, *, ttl: Optional[int] = 3600, hash_key: Optional[bool] = True):
    """Args:
    ttl:
        The time-to-live (TTL) for each cache entry in seconds.
    hash_key:
        Whether to hash the keys before storing them in the cache.
    """
    # Guard: diskcache is an optional dependency; bail out early if missing.
    if Cache is None:
        raise ImportError(
            "`diskcache` client is not available. Install with "
            "`pip install diskcache`"
        )
    # Record configuration, then build the underlying Cache client.
    self.ttl = ttl
    self.hash_key = hash_key
    self._initialize()

add

add(documents)
Source code in src/msgflux/data/dbs/providers/diskcache.py
def add(self, documents: Union[List[Dict[str, Any]], Dict[str, Any]]):
    """Store each ``{key: value}`` pair; entries expire after ``self.ttl``."""
    # Normalize the input to a list of mappings.
    batch = documents if isinstance(documents, list) else [documents]
    for doc in batch:
        for raw_key, raw_value in doc.items():
            stored_key = convert_str_to_hash(raw_key) if self.hash_key else raw_key
            # Values are msgpack-encoded; TTL is applied per entry here.
            self.client.set(stored_key, msgspec.msgpack.encode(raw_value), expire=self.ttl)