Cache

This module provides caching backends and strategies for PIPolars.

Storage Module

Cache Backends

CacheBackendBase

class pipolars.cache.storage.CacheBackendBase[source]

Bases: ABC

Abstract base class for cache backends.

Cache backends are responsible for storing and retrieving Polars DataFrames with associated metadata.

All cache backends implement these methods (a short interchangeability sketch follows this list):

  • get(key) - Retrieve cached data

  • set(key, data, ttl) - Store data in cache

  • delete(key) - Delete cached data

  • exists(key) - Check if key exists

  • clear() - Clear all cached data

  • get_stats() - Get cache statistics
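
Because every backend shares this interface, code can be written against CacheBackendBase and any concrete backend can be swapped in. A minimal sketch (using MemoryCache, documented below, purely as a stand-in):

from datetime import timedelta

import polars as pl

from pipolars.cache.storage import CacheBackendBase, MemoryCache

def cache_round_trip(backend: CacheBackendBase, df: pl.DataFrame) -> pl.DataFrame | None:
    # Any backend works here; only the shared interface is used
    backend.set("example-key", df, ttl=timedelta(minutes=5))
    return backend.get("example-key")

result = cache_round_trip(MemoryCache(), pl.DataFrame({"value": [1.0]}))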

abstractmethod get(key)[source]

Retrieve data from the cache.

Parameters:

key (str) – Cache key

Returns:

Cached DataFrame or None if not found

Return type:

DataFrame | None

abstractmethod set(key, data, ttl=None)[source]

Store data in the cache.

Parameters:
  • key (str) – Cache key

  • data (DataFrame) – DataFrame to cache

  • ttl (timedelta | None) – Optional time-to-live

abstractmethod delete(key)[source]

Delete data from the cache.

Parameters:

key (str) – Cache key

Returns:

True if deleted, False if not found

Return type:

bool

abstractmethod exists(key)[source]

Check if a key exists in the cache.

Parameters:

key (str) – Cache key

Returns:

True if exists

Return type:

bool

abstractmethod clear()[source]

Clear all cached data.

abstractmethod get_stats()[source]

Get cache statistics.

Returns:

Dictionary with cache stats

Return type:

dict[str, Any]

static generate_key(tag, start, end, query_type='recorded', **kwargs)[source]

Generate a cache key from query parameters.

Parameters:
  • tag (str) – Tag name

  • start (datetime | str) – Start time

  • end (datetime | str) – End time

  • query_type (str) – Type of query

  • **kwargs (Any) – Additional parameters

Returns:

Cache key string

Return type:

str

MemoryCache

class pipolars.cache.storage.MemoryCache[source]

Bases: CacheBackendBase

In-memory cache backend using an LRU cache.

This cache is fast, but data is lost when the process ends. It uses an LRU (Least Recently Used) eviction policy.

Features:

  • Fast access

  • Thread-safe

  • LRU eviction

  • Automatic TTL expiration

  • Data lost on process exit

__init__(max_items=1000)[source]

Initialize the memory cache.

Parameters:

max_items (int) – Maximum number of items to cache

get(key)[source]

Retrieve data from the cache.

set(key, data, ttl=None)[source]

Store data in the cache.

delete(key)[source]

Delete data from the cache.

exists(key)[source]

Check if a key exists in the cache.

clear()[source]

Clear all cached data.

get_stats()[source]

Get cache statistics.

Usage:

from datetime import timedelta

import polars as pl

from pipolars.cache.storage import MemoryCache

cache = MemoryCache(max_items=1000)

# Example DataFrame to cache
df = pl.DataFrame({"tag": ["SINUSOID"], "value": [42.0]})

# Store data
cache.set("key", df, ttl=timedelta(hours=1))

# Retrieve data
df = cache.get("key")

# Check stats
print(cache.get_stats())
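
The TTL expiration and LRU eviction described above can be observed directly. A small sketch, assuming expired entries are treated as misses and that the oldest entry is evicted once max_items is exceeded:

import time
from datetime import timedelta

import polars as pl

from pipolars.cache.storage import MemoryCache

cache = MemoryCache(max_items=2)
df = pl.DataFrame({"value": [1.0]})

# TTL expiration: a very short TTL makes the entry expire almost immediately
cache.set("short", df, ttl=timedelta(seconds=1))
time.sleep(1.1)
assert cache.get("short") is None

# LRU eviction: with max_items=2, adding a third entry evicts the oldest one
cache.set("a", df)
cache.set("b", df)
cache.set("c", df)
assert not cache.exists("a") and cache.exists("b") and cache.exists("c")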

SQLiteCache

class pipolars.cache.storage.SQLiteCache[source]

Bases: CacheBackendBase

SQLite-based cache backend.

Provides persistent caching using a SQLite database, with DataFrames serialized in the Apache Arrow IPC format.

Features:

  • Persistent storage

  • Automatic size management

  • TTL-based expiration

  • Compressed storage (Arrow IPC format)

  • Thread-safe

__init__(path, max_size_mb=1024)[source]

Initialize the SQLite cache.

Parameters:
  • path (Path | str) – Path to the cache directory

  • max_size_mb (int) – Maximum cache size in MB

get(key)[source]

Retrieve data from the cache.

set(key, data, ttl=None)[source]

Store data in the cache.

delete(key)[source]

Delete data from the cache.

exists(key)[source]

Check if a key exists in the cache.

clear()[source]

Clear all cached data.

get_stats()[source]

Get cache statistics.

Usage:

from datetime import timedelta
from pathlib import Path

from pipolars.cache.storage import SQLiteCache

cache = SQLiteCache(
    path=Path("~/.pipolars/cache").expanduser(),
    max_size_mb=1024
)

# Store data (df is a Polars DataFrame)
cache.set("key", df, ttl=timedelta(hours=24))

# Retrieve data
df = cache.get("key")
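
Because storage is persistent, a new SQLiteCache pointed at the same directory sees entries written earlier, in the same or a later process (assuming the TTL has not expired):

# Later, possibly from another process
cache2 = SQLiteCache(
    path=Path("~/.pipolars/cache").expanduser(),
    max_size_mb=1024
)
df = cache2.get("key")  # the DataFrame cached above, or None if expired/evicted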

ArrowCache

class pipolars.cache.storage.ArrowCache[source]

Bases: CacheBackendBase

Arrow IPC file-based cache backend.

Stores each cached DataFrame as a separate Arrow IPC file for optimal I/O performance with Polars.

Features:

  • Native Polars format

  • Zero-copy reads

  • Fast serialization

  • Optimal for large DataFrames

__init__(path, max_size_mb=1024)[source]

Initialize the Arrow cache.

Parameters:
  • path (Path | str) – Path to the cache directory

  • max_size_mb (int) – Maximum cache size in MB

get(key)[source]

Retrieve data from the cache.

set(key, data, ttl=None)[source]

Store data in the cache.

delete(key)[source]

Delete data from the cache.

exists(key)[source]

Check if a key exists in the cache.

clear()[source]

Clear all cached data.

get_stats()[source]

Get cache statistics.

Usage:

from pipolars.cache.storage import ArrowCache
from pathlib import Path

cache = ArrowCache(
    path=Path("/data/pipolars_cache"),
    max_size_mb=4096
)
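
Storing and retrieving data works the same as with the other backends; continuing the example above:

from datetime import timedelta

import polars as pl

df = pl.DataFrame({"value": [1.0, 2.0, 3.0]})

# Store data
cache.set("key", df, ttl=timedelta(hours=24))

# Retrieve data
df = cache.get("key")

# Check stats
print(cache.get_stats())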

Factory Function

pipolars.cache.storage.get_cache_backend(config)[source]

Factory function to create a cache backend from configuration.

Parameters:

config (CacheConfig) – Cache configuration

Returns:

Cache backend instance or None if caching is disabled

Return type:

CacheBackendBase | None

Usage:

from pipolars.cache.storage import get_cache_backend
from pipolars.core.config import CacheConfig, CacheBackend

config = CacheConfig(
    backend=CacheBackend.SQLITE,
    max_size_mb=1024
)

cache = get_cache_backend(config)
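
Since the factory returns None when caching is disabled, callers should handle that case:

if cache is None:
    print("Caching is disabled")
else:
    print(cache.get_stats())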

Strategies Module

TTLStrategy

class pipolars.cache.strategies.TTLStrategy[source]

Bases: CacheStrategy

Time-to-live caching strategy.

Cached data expires after a specified duration. Once expired, data is re-fetched from the source.

__init__(backend, ttl=datetime.timedelta(days=1))[source]

Initialize the TTL strategy.

Parameters:
  • backend (CacheBackendBase) – Cache backend used for storage

  • ttl (timedelta) – Default time-to-live (defaults to 1 day)

property ttl: timedelta

Get the TTL duration.

get_or_fetch(key, fetch_func)[source]

Get data from cache or fetch with TTL.

set_with_ttl(key, data, ttl=None)[source]

Set data with custom TTL.

Parameters:
  • key (str) – Cache key

  • data (DataFrame) – Data to cache

  • ttl (timedelta | None) – Custom TTL (uses default if None)
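
A usage sketch for the strategy, assuming fetch_func is a zero-argument callable returning a DataFrame and is invoked only on a cache miss or after the TTL has expired:

from datetime import timedelta

import polars as pl

from pipolars.cache.storage import MemoryCache
from pipolars.cache.strategies import TTLStrategy

strategy = TTLStrategy(backend=MemoryCache(), ttl=timedelta(hours=1))

def fetch_from_source() -> pl.DataFrame:
    # Stand-in for an expensive query against the data source
    return pl.DataFrame({"value": [42.0]})

# First call fetches and caches; later calls within the TTL return cached data
df = strategy.get_or_fetch("my-key", fetch_from_source)

# Store an entry with a TTL different from the default
strategy.set_with_ttl("short-lived", df, ttl=timedelta(minutes=5))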

Cache Key Generation

Cache keys are generated from query parameters using SHA-256 hashing:

from pipolars.cache.storage import CacheBackendBase

key = CacheBackendBase.generate_key(
    tag="SINUSOID",
    start="*-1d",
    end="*",
    query_type="recorded",
    interval="1h"  # Optional additional parameters
)

print(key)  # e.g., "a7b3c9d1e5f2a8b4..."

The key includes:

  • Tag name

  • Start time (string representation)

  • End time (string representation)

  • Query type

  • Additional parameters (JSON-serialized)
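
Because the key is a SHA-256 hash of exactly these components, it is deterministic and changes whenever any of them changes:

key_a = CacheBackendBase.generate_key(tag="SINUSOID", start="*-1d", end="*")
key_b = CacheBackendBase.generate_key(tag="SINUSOID", start="*-2d", end="*")

assert key_a == CacheBackendBase.generate_key(tag="SINUSOID", start="*-1d", end="*")
assert key_a != key_b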

Cache Statistics

All backends provide statistics via get_stats():

stats = cache.get_stats()

# Memory cache stats
{
    "type": "memory",
    "items": 42,
    "max_items": 1000,
    "hits": 156,
    "misses": 42,
    "hit_rate": 0.788
}

# SQLite cache stats
{
    "type": "sqlite",
    "items": 42,
    "size_bytes": 131457024,
    "size_mb": 125.5,
    "max_size_mb": 1024,
    "hits": 156,
    "misses": 42,
    "hit_rate": 0.788
}

# Arrow cache stats
{
    "type": "arrow",
    "items": 42,
    "size_bytes": 131457024,
    "size_mb": 125.5,
    "max_size_mb": 4096,
    "hits": 156,
    "misses": 42,
    "hit_rate": 0.788
}
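
Because every backend reports type, items, hits, misses and hit_rate, simple monitoring can stay backend-agnostic:

def report_cache_health(cache) -> None:
    stats = cache.get_stats()
    print(f"{stats['type']}: {stats['items']} items, hit rate {stats['hit_rate']:.1%}")

report_cache_health(cache)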

Integration with PIClient

Caching is configured via PIConfig:

from pipolars import PIClient, PIConfig
from pipolars.core.config import CacheConfig, CacheBackend, PIServerConfig

config = PIConfig(
    server=PIServerConfig(host="my-pi-server"),
    cache=CacheConfig(
        backend=CacheBackend.SQLITE,
        ttl_hours=24,
    ),
)

with PIClient(config=config) as client:
    # First query - cache miss
    df = client.recorded_values("SINUSOID", "*-1h", "*")

    # Second query - cache hit
    df = client.recorded_values("SINUSOID", "*-1h", "*")

    # Check stats
    print(client.cache_stats())

    # Clear cache
    client.clear_cache()

Custom Cache Backend

Implement custom backends by extending CacheBackendBase:

import io
from datetime import timedelta

import polars as pl

from pipolars.cache.storage import CacheBackendBase

class RedisCache(CacheBackendBase):
    def __init__(self, redis_url: str):
        import redis
        self.redis = redis.from_url(redis_url)
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> pl.DataFrame | None:
        data = self.redis.get(key)
        if data:
            self._hits += 1
            return pl.read_ipc(io.BytesIO(data))  # wrap raw bytes for Polars
        self._misses += 1
        return None

    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None
    ) -> None:
        buffer = io.BytesIO()
        data.write_ipc(buffer)  # serialize the DataFrame as Arrow IPC bytes
        ex = int(ttl.total_seconds()) if ttl else None
        self.redis.set(key, buffer.getvalue(), ex=ex)

    def delete(self, key: str) -> bool:
        return self.redis.delete(key) > 0

    def exists(self, key: str) -> bool:
        return self.redis.exists(key) > 0

    def clear(self) -> None:
        self.redis.flushdb()

    def get_stats(self) -> dict:
        info = self.redis.info()
        total = self._hits + self._misses
        return {
            "type": "redis",
            "keys": info.get("db0", {}).get("keys", 0),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self._hits / total if total > 0 else 0,
        }
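
A hypothetical use of this custom backend; the Redis URL below is only an example and requires the redis-py package and a reachable server:

cache = RedisCache("redis://localhost:6379/0")
df = pl.DataFrame({"value": [1.0, 2.0]})

cache.set("key", df, ttl=timedelta(hours=1))
assert cache.exists("key")
print(cache.get_stats())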

See Also