"""Cache storage backends for PI data.
This module provides different storage backends for caching
PI data locally, including in-memory, SQLite, and Arrow file-based caches.
"""
from __future__ import annotations

import hashlib
import json
import logging
import sqlite3
import threading
from abc import ABC, abstractmethod
from collections import OrderedDict
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import polars as pl
import pyarrow as pa
import pyarrow.ipc as ipc

from pipolars.core.config import CacheBackend, CacheConfig
from pipolars.core.exceptions import PICacheError
logger = logging.getLogger(__name__)
class CacheBackendBase(ABC):
    """Abstract base class for cache backends.

    A backend stores Polars DataFrames under opaque string keys and
    retrieves them later, optionally honoring a time-to-live.
    """

    @abstractmethod
    def get(self, key: str) -> pl.DataFrame | None:
        """Return the cached DataFrame for *key*, or None when absent."""

    @abstractmethod
    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None,
    ) -> None:
        """Store *data* under *key*, optionally expiring after *ttl*."""

    @abstractmethod
    def delete(self, key: str) -> bool:
        """Remove *key* from the cache; return True when it was present."""

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Return True when *key* is present in the cache."""

    @abstractmethod
    def clear(self) -> None:
        """Discard every cached entry."""

    @abstractmethod
    def get_stats(self) -> dict[str, Any]:
        """Return a dictionary of cache usage statistics."""

    @staticmethod
    def generate_key(
        tag: str,
        start: datetime | str,
        end: datetime | str,
        query_type: str = "recorded",
        **kwargs: Any,
    ) -> str:
        """Derive a deterministic cache key from query parameters.

        Args:
            tag: Tag name
            start: Start time
            end: End time
            query_type: Type of query
            **kwargs: Additional parameters folded into the key
                (serialized with sorted keys so ordering is irrelevant)

        Returns:
            A 32-character hex digest identifying the query.
        """
        parts = [tag, str(start), str(end), query_type]
        if kwargs:
            parts.append(json.dumps(kwargs, sort_keys=True))
        digest = hashlib.sha256("|".join(parts).encode()).hexdigest()
        return digest[:32]
class MemoryCache(CacheBackendBase):
    """In-memory cache backend using an LRU cache.

    This cache is fast but data is lost when the process ends.
    Uses an LRU (Least Recently Used) eviction policy.
    """

    def __init__(self, max_items: int = 1000) -> None:
        """Initialize the memory cache.

        Args:
            max_items: Maximum number of items to cache; a value <= 0
                disables storage (every ``set`` is a no-op).
        """
        # key -> (DataFrame, optional expiry); OrderedDict insertion order
        # doubles as recency order for LRU eviction.
        self._cache: OrderedDict[str, tuple[pl.DataFrame, datetime | None]] = (
            OrderedDict()
        )
        self._max_items = max_items
        self._lock = threading.Lock()
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> pl.DataFrame | None:
        """Retrieve data from the cache, honoring TTL.

        Expired entries are removed lazily on access.
        """
        with self._lock:
            if key not in self._cache:
                self._misses += 1
                return None
            data, expires_at = self._cache[key]
            # Check TTL
            if expires_at and datetime.now() > expires_at:
                del self._cache[key]
                self._misses += 1
                return None
            # Move to end (most recently used)
            self._cache.move_to_end(key)
            self._hits += 1
            return data

    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None,
    ) -> None:
        """Store data in the cache, evicting LRU entries when full.

        Args:
            key: Cache key
            data: DataFrame to cache
            ttl: Optional time-to-live
        """
        with self._lock:
            # A non-positive capacity means nothing can ever be stored.
            # (Previously this crashed with KeyError popping an empty dict.)
            if self._max_items <= 0:
                return
            expires_at = datetime.now() + ttl if ttl else None
            # Replacing an existing key does not grow the cache, so drop the
            # old entry first; otherwise the eviction loop below would
            # discard an unrelated LRU victim on a replace at capacity.
            self._cache.pop(key, None)
            # Evict least-recently-used items until there is room.
            while len(self._cache) >= self._max_items:
                self._cache.popitem(last=False)
            # Inserting places the key at the end (most recently used).
            self._cache[key] = (data, expires_at)

    def delete(self, key: str) -> bool:
        """Delete data from the cache.

        Returns:
            True if deleted, False if not found.
        """
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def exists(self, key: str) -> bool:
        """Check if a key exists and is not expired (expired keys are purged)."""
        with self._lock:
            if key not in self._cache:
                return False
            _, expires_at = self._cache[key]
            if expires_at and datetime.now() > expires_at:
                del self._cache[key]
                return False
            return True

    def clear(self) -> None:
        """Clear all cached data."""
        with self._lock:
            self._cache.clear()

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics (item counts and hit rate)."""
        with self._lock:
            total = self._hits + self._misses
            hit_rate = self._hits / total if total > 0 else 0.0
            return {
                "type": "memory",
                "items": len(self._cache),
                "max_items": self._max_items,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate": hit_rate,
            }
class SQLiteCache(CacheBackendBase):
    """SQLite-based cache backend.

    Provides persistent caching using a SQLite database with
    DataFrame serialization via the Apache Arrow IPC format.
    """

    def __init__(
        self,
        path: Path | str,
        max_size_mb: int = 1024,
    ) -> None:
        """Initialize the SQLite cache.

        Args:
            path: Path to the cache directory (created if missing)
            max_size_mb: Maximum cache size in MB
        """
        self._path = Path(path)
        self._path.mkdir(parents=True, exist_ok=True)
        self._db_path = self._path / "cache.db"
        self._max_size_mb = max_size_mb
        self._lock = threading.Lock()
        self._hits = 0
        self._misses = 0
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a fresh connection to the cache database.

        Callers must wrap the result in ``closing(...)``: sqlite3's
        connection context manager only commits/rolls back a transaction —
        it does NOT close the connection, so the previous implementation
        leaked a connection (and its file handle) on every cache call
        until garbage collection.
        """
        return sqlite3.connect(self._db_path)

    def _init_db(self) -> None:
        """Initialize the database schema (idempotent)."""
        with closing(self._connect()) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    data BLOB NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP,
                    size_bytes INTEGER
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_expires_at ON cache(expires_at)
            """)
            conn.commit()

    def _serialize_df(self, df: pl.DataFrame) -> bytes:
        """Serialize a DataFrame to Arrow IPC stream bytes."""
        arrow_table = df.to_arrow()
        sink = pa.BufferOutputStream()
        with ipc.new_stream(sink, arrow_table.schema) as writer:
            writer.write_table(arrow_table)
        result: bytes = sink.getvalue().to_pybytes()
        return result

    def _deserialize_df(self, data: bytes) -> pl.DataFrame:
        """Deserialize Arrow IPC stream bytes to a DataFrame."""
        reader = ipc.open_stream(data)
        arrow_table = reader.read_all()
        result = pl.from_arrow(arrow_table)
        assert isinstance(result, pl.DataFrame)
        return result

    def get(self, key: str) -> pl.DataFrame | None:
        """Retrieve data from the cache.

        Expired rows are deleted lazily on access.

        Returns:
            Cached DataFrame or None if not found or expired.
        """
        with self._lock, closing(self._connect()) as conn:
            cursor = conn.execute(
                "SELECT data, expires_at FROM cache WHERE key = ?",
                (key,),
            )
            row = cursor.fetchone()
            if row is None:
                self._misses += 1
                return None
            data, expires_at = row
            # Check TTL (stored as an ISO-8601 string).
            if expires_at:
                expires_dt = datetime.fromisoformat(expires_at)
                if datetime.now() > expires_dt:
                    conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                    conn.commit()
                    self._misses += 1
                    return None
            self._hits += 1
            return self._deserialize_df(data)

    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None,
    ) -> None:
        """Store data in the cache, evicting old rows to respect the size cap.

        Args:
            key: Cache key
            data: DataFrame to cache
            ttl: Optional time-to-live
        """
        with self._lock:
            serialized = self._serialize_df(data)
            size_bytes = len(serialized)
            # Make room before inserting so the cap is not exceeded.
            self._maybe_evict(size_bytes)
            expires_at = (datetime.now() + ttl).isoformat() if ttl else None
            with closing(self._connect()) as conn:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO cache (key, data, expires_at, size_bytes)
                    VALUES (?, ?, ?, ?)
                    """,
                    (key, serialized, expires_at, size_bytes),
                )
                conn.commit()

    def _maybe_evict(self, new_size: int) -> None:
        """Evict expired, then oldest, entries until *new_size* bytes fit."""
        max_bytes = self._max_size_mb * 1024 * 1024
        with closing(self._connect()) as conn:
            cursor = conn.execute("SELECT SUM(size_bytes) FROM cache")
            current_size = cursor.fetchone()[0] or 0
            if current_size + new_size > max_bytes:
                # Expired rows go first (ISO strings compare chronologically).
                conn.execute(
                    "DELETE FROM cache WHERE expires_at < ?",
                    (datetime.now().isoformat(),),
                )
                cursor = conn.execute("SELECT SUM(size_bytes) FROM cache")
                current_size = cursor.fetchone()[0] or 0
                # Still too large: drop oldest rows in batches of 10.
                while current_size + new_size > max_bytes:
                    conn.execute("""
                        DELETE FROM cache WHERE key IN (
                            SELECT key FROM cache ORDER BY created_at LIMIT 10
                        )
                    """)
                    cursor = conn.execute("SELECT SUM(size_bytes) FROM cache")
                    new_current = cursor.fetchone()[0] or 0
                    # Guard against an infinite loop when nothing shrinks
                    # (e.g. the cache is already empty).
                    if new_current >= current_size:
                        break
                    current_size = new_current
            conn.commit()

    def delete(self, key: str) -> bool:
        """Delete data from the cache.

        Returns:
            True if a row was deleted, False if the key was not found.
        """
        with self._lock, closing(self._connect()) as conn:
            cursor = conn.execute(
                "DELETE FROM cache WHERE key = ?", (key,)
            )
            conn.commit()
            return cursor.rowcount > 0

    def exists(self, key: str) -> bool:
        """Check if a key exists and is not expired."""
        with self._lock, closing(self._connect()) as conn:
            cursor = conn.execute(
                """
                SELECT 1 FROM cache
                WHERE key = ? AND (expires_at IS NULL OR expires_at > ?)
                """,
                (key, datetime.now().isoformat()),
            )
            return cursor.fetchone() is not None

    def clear(self) -> None:
        """Clear all cached data."""
        with self._lock, closing(self._connect()) as conn:
            conn.execute("DELETE FROM cache")
            conn.commit()

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics (row count, total size, hit rate)."""
        with self._lock, closing(self._connect()) as conn:
            cursor = conn.execute(
                "SELECT COUNT(*), SUM(size_bytes) FROM cache"
            )
            count, total_bytes = cursor.fetchone()
            total = self._hits + self._misses
            hit_rate = self._hits / total if total > 0 else 0.0
            return {
                "type": "sqlite",
                "items": count or 0,
                "size_bytes": total_bytes or 0,
                "size_mb": (total_bytes or 0) / (1024 * 1024),
                "max_size_mb": self._max_size_mb,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate": hit_rate,
            }
class ArrowCache(CacheBackendBase):
    """Arrow IPC file-based cache backend.

    Stores each cached DataFrame as a separate Arrow IPC file
    for optimal I/O performance with Polars. Entry bookkeeping
    (creation time, expiry, size) lives in a JSON metadata file.
    """

    def __init__(
        self,
        path: Path | str,
        max_size_mb: int = 1024,
    ) -> None:
        """Initialize the Arrow cache.

        Args:
            path: Path to the cache directory (created if missing)
            max_size_mb: Maximum cache size in MB
        """
        self._path = Path(path)
        self._path.mkdir(parents=True, exist_ok=True)
        self._data_path = self._path / "data"
        self._data_path.mkdir(exist_ok=True)
        self._meta_path = self._path / "metadata.json"
        self._max_size_mb = max_size_mb
        self._lock = threading.Lock()
        self._hits = 0
        self._misses = 0
        self._load_metadata()

    def _load_metadata(self) -> None:
        """Load cache metadata from disk, starting empty if missing or corrupt.

        A truncated/corrupt metadata file previously raised on startup;
        for a cache, resetting is preferable to failing.
        """
        self._metadata: dict[str, Any] = {"entries": {}}
        if self._meta_path.exists():
            try:
                with self._meta_path.open() as f:
                    self._metadata = json.load(f)
            except (OSError, ValueError):
                # json.JSONDecodeError subclasses ValueError.
                logger.warning(
                    "Corrupt cache metadata at %s; starting with an empty cache",
                    self._meta_path,
                )

    def _save_metadata(self) -> None:
        """Persist cache metadata to disk."""
        with self._meta_path.open("w") as f:
            json.dump(self._metadata, f)

    def _get_file_path(self, key: str) -> Path:
        """Return the Arrow file path for a cache key."""
        return self._data_path / f"{key}.arrow"

    def get(self, key: str) -> pl.DataFrame | None:
        """Retrieve data from the cache.

        Expired or orphaned entries (metadata without a file) are
        removed lazily on access.
        """
        with self._lock:
            if key not in self._metadata["entries"]:
                self._misses += 1
                return None
            entry = self._metadata["entries"][key]
            # Check TTL
            if entry.get("expires_at"):
                expires_dt = datetime.fromisoformat(entry["expires_at"])
                if datetime.now() > expires_dt:
                    self._delete_entry(key)
                    self._misses += 1
                    return None
            file_path = self._get_file_path(key)
            if not file_path.exists():
                # Metadata out of sync with disk; drop the stale entry.
                del self._metadata["entries"][key]
                self._save_metadata()
                self._misses += 1
                return None
            self._hits += 1
            return pl.read_ipc(file_path)

    def set(
        self,
        key: str,
        data: pl.DataFrame,
        ttl: timedelta | None = None,
    ) -> None:
        """Store data in the cache, then evict if over the size cap.

        Args:
            key: Cache key
            data: DataFrame to cache
            ttl: Optional time-to-live
        """
        with self._lock:
            file_path = self._get_file_path(key)
            data.write_ipc(file_path)
            self._metadata["entries"][key] = {
                "created_at": datetime.now().isoformat(),
                "expires_at": (datetime.now() + ttl).isoformat() if ttl else None,
                "size_bytes": file_path.stat().st_size,
            }
            # Evict first, then persist metadata exactly once.
            self._maybe_evict()
            self._save_metadata()

    def _remove_entry(self, key: str) -> None:
        """Remove one entry's file and metadata WITHOUT saving metadata.

        Callers batching many removals save once afterwards.
        """
        file_path = self._get_file_path(key)
        if file_path.exists():
            file_path.unlink()
        self._metadata["entries"].pop(key, None)

    def _delete_entry(self, key: str) -> None:
        """Remove one entry and persist the updated metadata."""
        self._remove_entry(key)
        self._save_metadata()

    def _maybe_evict(self) -> None:
        """Evict entries if the cache exceeds its size cap.

        Caller is responsible for saving metadata afterwards. Keeps a
        running size total instead of re-summing after every deletion
        (previously O(n^2)) and avoids one metadata write per deletion.
        """
        max_bytes = self._max_size_mb * 1024 * 1024
        entries = self._metadata["entries"]
        total_size = sum(e.get("size_bytes", 0) for e in entries.values())
        if total_size <= max_bytes:
            return
        # Expired entries go first.
        now = datetime.now()
        for key, entry in list(entries.items()):
            expires_at = entry.get("expires_at")
            if expires_at and datetime.fromisoformat(expires_at) < now:
                total_size -= entry.get("size_bytes", 0)
                self._remove_entry(key)
        # Still too large: evict oldest-first by creation time.
        ordered = sorted(
            entries.items(),
            key=lambda item: item[1].get("created_at", ""),
        )
        for key, entry in ordered:
            if total_size <= max_bytes:
                break
            total_size -= entry.get("size_bytes", 0)
            self._remove_entry(key)

    def delete(self, key: str) -> bool:
        """Delete data from the cache.

        Returns:
            True if deleted, False if not found.
        """
        with self._lock:
            if key in self._metadata["entries"]:
                self._delete_entry(key)
                return True
            return False

    def exists(self, key: str) -> bool:
        """Check if a key exists, is unexpired, and its file is on disk."""
        with self._lock:
            if key not in self._metadata["entries"]:
                return False
            entry = self._metadata["entries"][key]
            if entry.get("expires_at"):
                expires_dt = datetime.fromisoformat(entry["expires_at"])
                if datetime.now() > expires_dt:
                    return False
            return self._get_file_path(key).exists()

    def clear(self) -> None:
        """Clear all cached data (one metadata write, not one per entry)."""
        with self._lock:
            for key in list(self._metadata["entries"]):
                self._remove_entry(key)
            self._save_metadata()

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics (entry count, total size, hit rate)."""
        with self._lock:
            total_bytes = sum(
                e.get("size_bytes", 0)
                for e in self._metadata["entries"].values()
            )
            total = self._hits + self._misses
            hit_rate = self._hits / total if total > 0 else 0.0
            return {
                "type": "arrow",
                "items": len(self._metadata["entries"]),
                "size_bytes": total_bytes,
                "size_mb": total_bytes / (1024 * 1024),
                "max_size_mb": self._max_size_mb,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate": hit_rate,
            }
def get_cache_backend(config: CacheConfig) -> CacheBackendBase | None:
    """Build the cache backend selected by *config*.

    Args:
        config: Cache configuration

    Returns:
        A backend instance, or None when caching is disabled.

    Raises:
        PICacheError: If the configured backend is not recognized.
    """
    backend = config.backend
    if backend == CacheBackend.NONE:
        return None
    if backend == CacheBackend.MEMORY:
        return MemoryCache()
    if backend == CacheBackend.SQLITE:
        return SQLiteCache(config.path, config.max_size_mb)
    if backend == CacheBackend.ARROW:
        return ArrowCache(config.path, config.max_size_mb)
    raise PICacheError(f"Unknown cache backend: {config.backend}")