Cache¶
This module provides caching backends and strategies for PIPolars.
Storage Module¶
Cache Backends¶
CacheBackendBase¶
- class pipolars.cache.storage.CacheBackendBase[source]¶
Bases: ABC
Abstract base class for cache backends. Cache backends are responsible for storing and retrieving Polars DataFrames with associated metadata.
All cache backends implement these methods:
get(key) - Retrieve cached data
set(key, data, ttl) - Store data in the cache
delete(key) - Delete cached data
exists(key) - Check if a key exists
clear() - Clear all cached data
get_stats() - Get cache statistics
- abstractmethod get(key)[source]¶
Retrieve data from the cache.
- Parameters:
key (str) – Cache key
- Returns:
Cached DataFrame or None if not found
- Return type:
DataFrame | None
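The methods above compose into the usual cache-aside pattern: try the cache first, and fetch from the source only on a miss. A minimal sketch that works against any backend implementing this interface (the get_or_fetch helper is illustrative, not part of pipolars):

```python
from datetime import timedelta

def get_or_fetch(cache, key, fetch, ttl=timedelta(hours=1)):
    """Cache-aside: return cached data for key, calling fetch() and storing on a miss."""
    df = cache.get(key)
    if df is None:
        df = fetch()
        cache.set(key, df, ttl=ttl)
    return df
```

Because every backend shares the same get/set signatures, this helper is agnostic to whether the backend is in-memory, SQLite, or Arrow-file based.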
MemoryCache¶
- class pipolars.cache.storage.MemoryCache[source]¶
Bases: CacheBackendBase
In-memory cache backend using an LRU (Least Recently Used) eviction policy. This cache is fast, but data is lost when the process ends.
Features:
Fast access
Thread-safe
LRU eviction
Automatic TTL expiration
Data lost on process exit
Usage:
from datetime import timedelta
from pipolars.cache.storage import MemoryCache
cache = MemoryCache(max_items=1000)
# Store data
cache.set("key", df, ttl=timedelta(hours=1))
# Retrieve data
df = cache.get("key")
# Check stats
print(cache.get_stats())
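The LRU eviction policy listed above can be illustrated with a small self-contained sketch (the eviction idea only, not the actual MemoryCache implementation):

```python
from collections import OrderedDict

class LRUSketch:
    """Illustrative LRU store: the least recently used entry is evicted first."""
    def __init__(self, max_items=3):
        self.max_items = max_items
        self._store = OrderedDict()

    def get(self, key):
        if key not in self._store:
            return None
        self._store.move_to_end(key)  # mark as most recently used
        return self._store[key]

    def set(self, key, value):
        self._store[key] = value
        self._store.move_to_end(key)
        if len(self._store) > self.max_items:
            self._store.popitem(last=False)  # evict the least recently used
```

Accessing an entry refreshes its position, so hot keys survive while cold ones are dropped once max_items is exceeded.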
SQLiteCache¶
- class pipolars.cache.storage.SQLiteCache[source]¶
Bases: CacheBackendBase
SQLite-based persistent cache backend. Provides persistent caching using a SQLite database, with DataFrames serialized via the Apache Arrow IPC format.
Features:
Persistent storage
Automatic size management
TTL-based expiration
Compressed storage (Arrow IPC format)
Thread-safe
Usage:
from datetime import timedelta
from pathlib import Path
from pipolars.cache.storage import SQLiteCache
cache = SQLiteCache(
path=Path("~/.pipolars/cache").expanduser(),
max_size_mb=1024
)
# Store data
cache.set("key", df, ttl=timedelta(hours=24))
# Retrieve data
df = cache.get("key")
ArrowCache¶
- class pipolars.cache.storage.ArrowCache[source]¶
Bases: CacheBackendBase
Arrow IPC file-based cache backend. Stores each cached DataFrame as a separate Arrow IPC file for optimal I/O performance with Polars.
Features:
Native Polars format
Zero-copy reads
Fast serialization
Optimal for large DataFrames
Usage:
from pipolars.cache.storage import ArrowCache
from pathlib import Path
cache = ArrowCache(
path=Path("/data/pipolars_cache"),
max_size_mb=4096
)
Factory Function¶
- pipolars.cache.storage.get_cache_backend(config)[source]¶
Factory function to create a cache backend from configuration.
- Parameters:
config (CacheConfig) – Cache configuration
- Returns:
Cache backend instance or None if caching is disabled
- Return type:
CacheBackendBase | None
Usage:
from pipolars.cache.storage import get_cache_backend
from pipolars.core.config import CacheConfig, CacheBackend
config = CacheConfig(
backend=CacheBackend.SQLITE,
max_size_mb=1024
)
cache = get_cache_backend(config)
Strategies Module¶
TTLStrategy¶
- class pipolars.cache.strategies.TTLStrategy[source]¶
Bases: CacheStrategy
Time-to-live caching strategy. Cached data expires after a specified duration; once expired, it is re-fetched from the source.
- __init__(backend, ttl=datetime.timedelta(days=1))[source]¶
Initialize the TTL strategy.
- Parameters:
backend (CacheBackendBase) – Cache backend
ttl (timedelta) – Time-to-live for cached data
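The expiry bookkeeping behind a TTL strategy can be sketched as follows. This is a simplified stand-in, not the pipolars implementation, and the now parameter exists only to make the example deterministic:

```python
from datetime import datetime, timedelta

class TTLSketch:
    """Each entry stores an expiry timestamp; reads past it count as misses."""
    def __init__(self, ttl=timedelta(days=1)):
        self.ttl = ttl
        self._store = {}

    def set(self, key, value, now=None):
        now = now or datetime.now()
        self._store[key] = (value, now + self.ttl)

    def get(self, key, now=None):
        now = now or datetime.now()
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires = entry
        if now >= expires:
            del self._store[key]  # expired: evict and treat as a miss
            return None
        return value
```

A miss on an expired entry is what triggers the re-fetch from the source described above.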
Cache Key Generation¶
Cache keys are generated from query parameters using SHA-256 hashing:
from pipolars.cache.storage import CacheBackendBase
key = CacheBackendBase.generate_key(
tag="SINUSOID",
start="*-1d",
end="*",
query_type="recorded",
interval="1h" # Optional additional parameters
)
print(key) # e.g., "a7b3c9d1e5f2a8b4..."
The key includes:
Tag name
Start time (string representation)
End time (string representation)
Query type
Additional parameters (JSON-serialized)
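The scheme above can be approximated with a few lines of standard-library code. This is a sketch of the approach, not the exact pipolars implementation, so the resulting digests need not match keys produced by generate_key:

```python
import hashlib
import json

def make_key(tag, start, end, query_type, **params):
    """Deterministic cache key: SHA-256 over the JSON-serialized query parameters."""
    payload = json.dumps(
        {"tag": tag, "start": start, "end": end, "type": query_type, "params": params},
        sort_keys=True,  # stable ordering so identical queries hash identically
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```

Sorting the keys before hashing is what makes the scheme deterministic: two queries with the same parameters always map to the same cache entry.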
Cache Statistics¶
All backends provide statistics via get_stats():
stats = cache.get_stats()
# Memory cache stats
{
"type": "memory",
"items": 42,
"max_items": 1000,
"hits": 156,
"misses": 42,
"hit_rate": 0.788
}
# SQLite cache stats
{
"type": "sqlite",
"items": 42,
"size_bytes": 131457024,
"size_mb": 125.5,
"max_size_mb": 1024,
"hits": 156,
"misses": 42,
"hit_rate": 0.788
}
# Arrow cache stats
{
"type": "arrow",
"items": 42,
"size_bytes": 131457024,
"size_mb": 125.5,
"max_size_mb": 4096,
"hits": 156,
"misses": 42,
"hit_rate": 0.788
}
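In each of the dictionaries above, hit_rate is simply hits divided by total lookups; for the sample numbers shown:

```python
def hit_rate(hits, misses):
    """Fraction of lookups served from the cache."""
    total = hits + misses
    return hits / total if total else 0.0

print(round(hit_rate(156, 42), 3))  # 156 / 198, matching the 0.788 in the stats above
```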
Integration with PIClient¶
Caching is configured via PIConfig:
from pipolars import PIClient, PIConfig
from pipolars.core.config import CacheConfig, CacheBackend, PIServerConfig
config = PIConfig(
server=PIServerConfig(host="my-pi-server"),
cache=CacheConfig(
backend=CacheBackend.SQLITE,
ttl_hours=24,
),
)
with PIClient(config=config) as client:
# First query - cache miss
df = client.recorded_values("SINUSOID", "*-1h", "*")
# Second query - cache hit
df = client.recorded_values("SINUSOID", "*-1h", "*")
# Check stats
print(client.cache_stats())
# Clear cache
client.clear_cache()
Custom Cache Backend¶
Implement custom backends by extending CacheBackendBase:
from pipolars.cache.storage import CacheBackendBase
import polars as pl
from datetime import timedelta
class RedisCache(CacheBackendBase):
    def __init__(self, redis_url: str):
        import redis

        self.redis = redis.from_url(redis_url)
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> pl.DataFrame | None:
        data = self.redis.get(key)
        if data:
            self._hits += 1
            return pl.read_ipc(data)
        self._misses += 1
        return None

    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None,
    ) -> None:
        # write_ipc(None) returns an in-memory buffer; extract the raw bytes for Redis
        buffer = data.write_ipc(None).getvalue()
        ex = int(ttl.total_seconds()) if ttl else None
        self.redis.set(key, buffer, ex=ex)

    def delete(self, key: str) -> bool:
        return self.redis.delete(key) > 0

    def exists(self, key: str) -> bool:
        return self.redis.exists(key) > 0

    def clear(self) -> None:
        self.redis.flushdb()

    def get_stats(self) -> dict:
        info = self.redis.info()
        total = self._hits + self._misses
        return {
            "type": "redis",
            "keys": info.get("db0", {}).get("keys", 0),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self._hits / total if total > 0 else 0,
        }
See Also¶
Caching - Caching guide
Configuration - CacheConfig options