litefs.cache.redis 源代码

#!/usr/bin/env python
# coding: utf-8

"""
Redis 缓存后端

提供基于 Redis 的缓存实现
"""

import time
import json
from typing import Any, Optional


[文档] class RedisCache: """ Redis 缓存实现 使用 Redis 作为缓存后端,提供高性能的分布式缓存支持 """ def __init__( self, redis_client=None, host: str = "localhost", port: int = 6379, db: int = 0, password: Optional[str] = None, key_prefix: str = "litefs:", expiration_time: int = 3600, **kwargs ): """ 初始化 Redis 缓存 Args: redis_client: Redis 客户端实例(如果提供,则忽略其他连接参数) host: Redis 服务器地址 port: Redis 服务器端口 db: Redis 数据库编号 password: Redis 密码 key_prefix: 键前缀 expiration_time: 默认过期时间(秒) **kwargs: 其他 Redis 连接参数 """ self._key_prefix = key_prefix self._expiration_time = expiration_time if redis_client is not None: self._redis = redis_client else: try: import redis except ImportError: raise ImportError( "redis-py 包未安装,请使用 pip install redis 安装" ) connection_params = { "host": host, "port": port, "db": db, "decode_responses": True, **kwargs } if password: connection_params["password"] = password self._redis = redis.Redis(**connection_params) self._test_connection() def _test_connection(self): """测试 Redis 连接""" try: self._redis.ping() except Exception as e: raise ConnectionError(f"无法连接到 Redis 服务器: {e}") def _make_key(self, key: str) -> str: """生成带前缀的键""" return f"{self._key_prefix}{key}"
[文档] def put(self, key: str, val: Any, expiration: Optional[int] = None) -> None: """ 存储值到缓存 Args: key: 缓存键 val: 缓存值 expiration: 过期时间(秒),如果为 None 则使用默认过期时间 """ redis_key = self._make_key(key) expiration = expiration if expiration is not None else self._expiration_time # 序列化值 try: val_str = json.dumps(val) except Exception as e: raise ValueError(f"无法序列化值: {e}") if expiration > 0: self._redis.setex(redis_key, expiration, val_str) else: self._redis.set(redis_key, val_str)
[文档] def get(self, key: str) -> Optional[Any]: """ 从缓存获取值 Args: key: 缓存键 Returns: 缓存值,如果不存在则返回 None """ redis_key = self._make_key(key) val_str = self._redis.get(redis_key) if val_str is None: return None # 反序列化值 try: return json.loads(val_str) except Exception as e: # 如果反序列化失败,返回原始字符串 return val_str
[文档] def delete(self, key: str) -> None: """ 从缓存删除值 Args: key: 缓存键 """ redis_key = self._make_key(key) self._redis.delete(redis_key)
[文档] def delete_pattern(self, pattern: str) -> int: """ 删除匹配模式的键 Args: pattern: 键模式 Returns: 删除的键数量 """ redis_pattern = self._make_key(pattern) return self._redis.delete(*self._redis.keys(redis_pattern))
[文档] def exists(self, key: str) -> bool: """ 检查键是否存在 Args: key: 缓存键 Returns: 键是否存在 """ redis_key = self._make_key(key) return self._redis.exists(redis_key) > 0
[文档] def expire(self, key: str, expiration: int) -> bool: """ 设置键的过期时间 Args: key: 缓存键 expiration: 过期时间(秒) Returns: 是否设置成功 """ redis_key = self._make_key(key) return self._redis.expire(redis_key, expiration)
[文档] def ttl(self, key: str) -> int: """ 获取键的剩余过期时间 Args: key: 缓存键 Returns: 剩余过期时间(秒),如果键不存在则返回 -2 """ redis_key = self._make_key(key) return self._redis.ttl(redis_key)
[文档] def clear(self) -> None: """ 清空所有缓存 注意:这会删除所有带前缀的键 """ pattern = self._make_key("*") keys = self._redis.keys(pattern) if keys: self._redis.delete(*keys)
def __len__(self) -> int: """ 获取缓存中的键数量 Returns: 键数量 """ pattern = self._make_key("*") keys = self._redis.keys(pattern) return len(keys) if keys else 0
[文档] def get_many(self, keys: list) -> dict: """ 批量获取值 Args: keys: 缓存键列表 Returns: 键值字典 """ if not keys: return {} redis_keys = [self._make_key(key) for key in keys] values = self._redis.mget(redis_keys) result = {} for i, key in enumerate(keys): val_str = values[i] if val_str is not None: # 反序列化值 try: result[key] = json.loads(val_str) except Exception: # 如果反序列化失败,返回原始字符串 result[key] = val_str return result
[文档] def set_many(self, mapping: dict, expiration: Optional[int] = None) -> None: """ 批量存储值 Args: mapping: 键值字典 expiration: 过期时间(秒),如果为 None 则使用默认过期时间 """ if not mapping: return expiration = expiration if expiration is not None else self._expiration_time pipe = self._redis.pipeline() for key, value in mapping.items(): # 序列化值 try: val_str = json.dumps(value) except Exception as e: raise ValueError(f"无法序列化值: {e}") redis_key = self._make_key(key) if expiration > 0: pipe.setex(redis_key, expiration, val_str) else: pipe.set(redis_key, val_str) pipe.execute()
[文档] def delete_many(self, keys: list) -> None: """ 批量删除键 Args: keys: 缓存键列表 """ if not keys: return redis_keys = [self._make_key(key) for key in keys] if redis_keys: self._redis.delete(*redis_keys)
[文档] def close(self) -> None: """ 关闭 Redis 连接 """ try: self._redis.close() except Exception: pass
def __enter__(self): """支持上下文管理器""" return self def __exit__(self, exc_type, exc_val, exc_tb): """支持上下文管理器""" self.close()
__all__ = [ "RedisCache", ]