Coverage for src / mysingle / auth / cache.py: 0%

233 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2User Authentication Cache System 

3 

4Kong Gateway 인증 성능 최적화를 위한 User 객체 캐싱 시스템입니다. 

5Redis를 우선 사용하고, Redis가 없으면 In-Memory 캐시로 폴백합니다. 

6 

7Architecture: 

8- Primary: Redis (다중 인스턴스 간 캐시 공유) 

9- Fallback: In-Memory TTL Cache (단일 인스턴스) 

10 

11Cache Strategy: 

12- Key Pattern: user:{user_id} 

13- TTL: 5 minutes (300 seconds) 

14- Invalidation: 명시적 호출 또는 TTL 만료 

15 

16Usage: 

17 from mysingle.auth.cache import get_user_cache 

18 

19 cache = get_user_cache() 

20 

21 # 캐시에서 조회 

22 user = await cache.get_user(user_id) 

23 

24 # 캐시에 저장 

25 await cache.set_user(user) 

26 

27 # 캐시 무효화 

28 await cache.invalidate_user(user_id) 

29""" 

30 

31import json 

32from abc import ABC, abstractmethod 

33from datetime import datetime, timedelta 

34from typing import Any, Optional 

35 

36from ..core.logging import get_structured_logger 

37from .models import User 

38 

39logger = get_structured_logger(__name__) 

40 

41# Redis 타입 힌트 (optional) 

42try: 

43 from redis.asyncio import Redis 

44except ImportError: 

45 Redis = Any # type: ignore 

46 

47 

48# ============================================================================= 

49# Base Cache Interface 

50# ============================================================================= 

51 

52 

53class BaseUserCache(ABC): 

54 """User 캐시 추상 기본 클래스""" 

55 

56 default_ttl: int = 300 

57 key_prefix: str = "user" 

58 

59 @abstractmethod 

60 async def get_user(self, user_id: str) -> Optional[User]: 

61 """사용자 조회""" 

62 pass 

63 

64 @abstractmethod 

65 async def set_user(self, user: User, ttl: int | None = None) -> None: 

66 """사용자 캐시 저장""" 

67 pass 

68 

69 @abstractmethod 

70 async def invalidate_user(self, user_id: str) -> None: 

71 """사용자 캐시 무효화""" 

72 pass 

73 

74 @abstractmethod 

75 async def clear_all(self) -> None: 

76 """전체 캐시 삭제""" 

77 pass 

78 

79 @abstractmethod 

80 async def health_check(self) -> bool: 

81 """캐시 시스템 상태 확인""" 

82 pass 

83 

84 

85# ============================================================================= 

86# Redis Cache Implementation 

87# ============================================================================= 

88 

89 

90class RedisUserCache(BaseUserCache): 

91 """ 

92 Redis 기반 User 캐시 

93 

94 다중 서비스 인스턴스 간 캐시를 공유할 수 있습니다. 

95 """ 

96 

97 def __init__( 

98 self, 

99 redis_url: str = "redis://localhost:6379/0", 

100 *, 

101 key_prefix: str = "user", 

102 default_ttl: int = 300, 

103 ): 

104 """ 

105 Redis 캐시 초기화 

106 

107 Args: 

108 redis_url: Redis 연결 URL 

109 """ 

110 self.redis_url = redis_url 

111 self.redis_client: Optional[Redis] = None # type: ignore 

112 self._initialized = False 

113 self._init_attempted = False 

114 self.key_prefix = key_prefix 

115 self.default_ttl = default_ttl 

116 

117 async def _ensure_initialized(self): 

118 """Redis 클라이언트 초기화 (lazy initialization)""" 

119 if self._initialized: 

120 return True 

121 

122 if self._init_attempted: 

123 return False 

124 

125 self._init_attempted = True 

126 

127 try: 

128 import redis.asyncio as redis 

129 

130 self.redis_client = redis.from_url( 

131 self.redis_url, 

132 encoding="utf-8", 

133 decode_responses=True, 

134 socket_connect_timeout=2, 

135 socket_timeout=2, 

136 ) 

137 

138 # 연결 테스트 

139 if self.redis_client: 

140 await self.redis_client.ping() 

141 self._initialized = True 

142 logger.info("Redis User Cache initialized successfully") 

143 return True 

144 

145 except ImportError: 

146 logger.warning( 

147 "redis package not installed - falling back to in-memory cache" 

148 ) 

149 return False 

150 except Exception as e: 

151 logger.warning( 

152 f"Redis connection failed: {e} - falling back to in-memory cache" 

153 ) 

154 return False 

155 

156 def _user_cache_key(self, user_id: str) -> str: 

157 """User 캐시 키 생성""" 

158 return f"{self.key_prefix}:{user_id}" 

159 

160 def _serialize_user(self, user: User) -> str: 

161 """User 객체를 JSON 문자열로 직렬화""" 

162 user_dict = { 

163 "id": str(user.id), 

164 "email": user.email, 

165 "is_active": user.is_active, 

166 "is_verified": user.is_verified, 

167 "is_superuser": user.is_superuser, 

168 # 추가 필드는 필요에 따라 확장 

169 } 

170 

171 # 선택적 필드 추가 

172 optional_fields = ["full_name", "first_name", "last_name"] 

173 for field in optional_fields: 

174 if hasattr(user, field): 

175 user_dict[field] = getattr(user, field) 

176 

177 return json.dumps(user_dict) 

178 

179 def _deserialize_user(self, data: str) -> User: 

180 """JSON 문자열을 User 객체로 역직렬화""" 

181 user_dict = json.loads(data) 

182 

183 # User 모델 생성 (Beanie Document의 경우 직접 생성 불가능할 수 있음) 

184 # 여기서는 dict를 User처럼 사용할 수 있는 Proxy 객체 반환 

185 # 실제 프로덕션에서는 User.parse_obj(user_dict) 또는 from_dict() 메서드 사용 

186 return User(**user_dict) 

187 

188 async def get_user(self, user_id: str) -> Optional[User]: 

189 """Redis에서 사용자 조회""" 

190 if not await self._ensure_initialized(): 

191 return None 

192 

193 if self.redis_client is None: 

194 return None 

195 

196 try: 

197 cache_key = self._user_cache_key(user_id) 

198 data = await self.redis_client.get(cache_key) 

199 

200 if data: 

201 logger.debug(f"Redis cache HIT for user_id: {user_id}") 

202 return self._deserialize_user(data) 

203 else: 

204 logger.debug(f"Redis cache MISS for user_id: {user_id}") 

205 return None 

206 

207 except Exception as e: 

208 logger.error(f"Redis get_user error: {e}") 

209 return None 

210 

211 async def set_user(self, user: User, ttl: int | None = None) -> None: 

212 """Redis에 사용자 캐시 저장""" 

213 if not await self._ensure_initialized(): 

214 return 

215 

216 if self.redis_client is None: 

217 return 

218 

219 try: 

220 cache_key = self._user_cache_key(str(user.id)) 

221 data = self._serialize_user(user) 

222 ttl_to_use = ttl if ttl is not None else self.default_ttl 

223 await self.redis_client.setex(cache_key, ttl_to_use, data) 

224 logger.debug(f"Redis cache SET for user_id: {user.id}, TTL: {ttl_to_use}s") 

225 

226 except Exception as e: 

227 logger.error(f"Redis set_user error: {e}") 

228 

229 async def invalidate_user(self, user_id: str) -> None: 

230 """Redis에서 사용자 캐시 무효화""" 

231 if not await self._ensure_initialized(): 

232 return 

233 

234 if self.redis_client is None: 

235 return 

236 

237 try: 

238 cache_key = self._user_cache_key(user_id) 

239 await self.redis_client.delete(cache_key) 

240 logger.debug(f"Redis cache INVALIDATED for user_id: {user_id}") 

241 

242 except Exception as e: 

243 logger.error(f"Redis invalidate_user error: {e}") 

244 

245 async def clear_all(self) -> None: 

246 """모든 user 캐시 삭제 (개발/테스트용)""" 

247 if not await self._ensure_initialized(): 

248 return 

249 

250 if self.redis_client is None: 

251 return 

252 

253 try: 

254 # user:* 패턴의 모든 키 삭제 

255 cursor = 0 

256 while True: 

257 cursor, keys = await self.redis_client.scan( 

258 cursor, match="user:*", count=100 

259 ) 

260 if keys: 

261 await self.redis_client.delete(*keys) 

262 if cursor == 0: 

263 break 

264 

265 logger.info("Redis cache CLEARED (all user keys)") 

266 

267 except Exception as e: 

268 logger.error(f"Redis clear_all error: {e}") 

269 

270 async def health_check(self) -> bool: 

271 """Redis 연결 상태 확인""" 

272 if not await self._ensure_initialized(): 

273 return False 

274 

275 if self.redis_client is None: 

276 return False 

277 

278 try: 

279 await self.redis_client.ping() 

280 return True 

281 except Exception: 

282 return False 

283 

284 

285# ============================================================================= 

286# In-Memory Cache Implementation 

287# ============================================================================= 

288 

289 

290class InMemoryUserCache(BaseUserCache): 

291 """ 

292 In-Memory TTL 기반 User 캐시 

293 

294 Redis가 없을 때 폴백으로 사용됩니다. 

295 단일 프로세스 내에서만 유효합니다. 

296 """ 

297 

298 def __init__(self, *, key_prefix: str = "user", default_ttl: int = 300): 

299 """In-Memory 캐시 초기화""" 

300 self._cache: dict[str, tuple[User, datetime]] = {} 

301 self.key_prefix = key_prefix 

302 self.default_ttl = default_ttl 

303 logger.info("In-Memory User Cache initialized") 

304 

305 def _is_expired(self, expiry: datetime) -> bool: 

306 """캐시 만료 여부 확인""" 

307 return datetime.utcnow() > expiry 

308 

309 def _cleanup_expired(self): 

310 """만료된 캐시 항목 제거 (주기적 실행 필요)""" 

311 now = datetime.utcnow() 

312 expired_keys = [key for key, (_, expiry) in self._cache.items() if now > expiry] 

313 for key in expired_keys: 

314 del self._cache[key] 

315 

316 async def get_user(self, user_id: str) -> Optional[User]: 

317 """In-Memory 캐시에서 사용자 조회""" 

318 self._cleanup_expired() 

319 cache_key = f"{self.key_prefix}:{user_id}" 

320 if cache_key in self._cache: 

321 user, expiry = self._cache[cache_key] 

322 if not self._is_expired(expiry): 

323 logger.debug(f"In-Memory cache HIT for user_id: {user_id}") 

324 return user 

325 else: 

326 # 만료된 항목 삭제 

327 del self._cache[cache_key] 

328 logger.debug(f"In-Memory cache EXPIRED for user_id: {user_id}") 

329 

330 logger.debug(f"In-Memory cache MISS for user_id: {user_id}") 

331 return None 

332 

333 async def set_user(self, user: User, ttl: int | None = None) -> None: 

334 """In-Memory 캐시에 사용자 저장""" 

335 cache_key = f"{self.key_prefix}:{user.id}" 

336 ttl_to_use = ttl if ttl is not None else self.default_ttl 

337 expiry = datetime.utcnow() + timedelta(seconds=ttl_to_use) 

338 self._cache[cache_key] = (user, expiry) 

339 logger.debug(f"In-Memory cache SET for user_id: {user.id}, TTL: {ttl_to_use}s") 

340 

341 async def invalidate_user(self, user_id: str) -> None: 

342 """In-Memory 캐시에서 사용자 무효화""" 

343 cache_key = f"{self.key_prefix}:{user_id}" 

344 if cache_key in self._cache: 

345 del self._cache[cache_key] 

346 logger.debug(f"In-Memory cache INVALIDATED for user_id: {user_id}") 

347 

348 async def clear_all(self) -> None: 

349 """전체 캐시 삭제""" 

350 self._cache.clear() 

351 logger.info("In-Memory cache CLEARED") 

352 

353 async def health_check(self) -> bool: 

354 """In-Memory 캐시는 항상 사용 가능""" 

355 return True 

356 

357 

358# ============================================================================= 

359# Hybrid Cache (Redis with In-Memory Fallback) 

360# ============================================================================= 

361 

362 

363class HybridUserCache(BaseUserCache): 

364 """ 

365 Redis + In-Memory 하이브리드 캐시 

366 

367 Redis를 우선 사용하고, Redis가 없으면 In-Memory로 폴백합니다. 

368 """ 

369 

370 def __init__( 

371 self, 

372 redis_url: Optional[str] = None, 

373 *, 

374 key_prefix: str = "user", 

375 default_ttl: int = 300, 

376 ): 

377 """ 

378 하이브리드 캐시 초기화 

379 

380 Args: 

381 redis_url: Redis 연결 URL (None이면 환경변수 사용) 

382 """ 

383 # Redis 캐시 (Primary) 

384 self.redis_cache = RedisUserCache( 

385 redis_url or "redis://localhost:6379/0", 

386 key_prefix=key_prefix, 

387 default_ttl=default_ttl, 

388 ) 

389 

390 # In-Memory 캐시 (Fallback) 

391 self.memory_cache = InMemoryUserCache( 

392 key_prefix=key_prefix, default_ttl=default_ttl 

393 ) 

394 

395 self._use_redis = True # Redis 사용 가능 여부 

396 self.key_prefix = key_prefix 

397 self.default_ttl = default_ttl 

398 

399 async def _check_redis_available(self) -> bool: 

400 """Redis 사용 가능 여부 확인 (캐싱)""" 

401 if not self._use_redis: 

402 return False 

403 

404 is_available = await self.redis_cache.health_check() 

405 if not is_available and self._use_redis: 

406 logger.warning("Redis unavailable - falling back to in-memory cache") 

407 self._use_redis = False 

408 

409 return is_available 

410 

411 async def get_user(self, user_id: str) -> Optional[User]: 

412 """하이브리드 캐시에서 사용자 조회""" 

413 # Redis 우선 시도 

414 if await self._check_redis_available(): 

415 user = await self.redis_cache.get_user(user_id) 

416 if user: 

417 return user 

418 

419 # Redis 실패 시 In-Memory 폴백 

420 return await self.memory_cache.get_user(user_id) 

421 

422 async def set_user(self, user: User, ttl: int | None = None) -> None: 

423 """하이브리드 캐시에 사용자 저장""" 

424 # Redis에 저장 시도 

425 if await self._check_redis_available(): 

426 await self.redis_cache.set_user(user, ttl) 

427 

428 # In-Memory에도 저장 (이중 캐싱) 

429 await self.memory_cache.set_user(user, ttl) 

430 

431 async def invalidate_user(self, user_id: str) -> None: 

432 """하이브리드 캐시에서 사용자 무효화""" 

433 # 양쪽 모두 무효화 

434 if await self._check_redis_available(): 

435 await self.redis_cache.invalidate_user(user_id) 

436 

437 await self.memory_cache.invalidate_user(user_id) 

438 

439 async def clear_all(self) -> None: 

440 """전체 캐시 삭제""" 

441 if await self._check_redis_available(): 

442 await self.redis_cache.clear_all() 

443 

444 await self.memory_cache.clear_all() 

445 

446 async def health_check(self) -> bool: 

447 """캐시 시스템 상태 확인""" 

448 redis_ok = await self.redis_cache.health_check() 

449 memory_ok = await self.memory_cache.health_check() 

450 

451 return redis_ok or memory_ok # 하나라도 사용 가능하면 OK 

452 

453 

454# ============================================================================= 

455# Cache Factory & Singleton 

456# ============================================================================= 

457 

458_user_cache_instance: Optional[BaseUserCache] = None 

459 

460 

461def get_user_cache(redis_url: Optional[str] = None) -> BaseUserCache: 

462 """ 

463 User 캐시 싱글톤 인스턴스 반환 

464 

465 Redis URL이 제공되면 Redis 캐시를 사용하고, 

466 없으면 In-Memory 캐시로 폴백합니다. 

467 

468 Args: 

469 redis_url: Redis 연결 URL (선택) 

470 

471 Returns: 

472 BaseUserCache: 캐시 인스턴스 

473 

474 Example: 

475 cache = get_user_cache() 

476 user = await cache.get_user(user_id) 

477 """ 

478 global _user_cache_instance 

479 

480 if _user_cache_instance is None: 

481 # 환경설정에서 Redis URL 및 캐시 설정 가져오기 

482 key_prefix = "user" 

483 default_ttl = 300 

484 if redis_url is None: 

485 try: 

486 from ..core.config import CommonSettings 

487 

488 settings = CommonSettings() 

489 redis_url = getattr(settings, "REDIS_URL", None) 

490 key_prefix = getattr(settings, "USER_CACHE_KEY_PREFIX", "user") 

491 default_ttl = getattr(settings, "USER_CACHE_TTL_SECONDS", 300) 

492 except Exception: 

493 redis_url = None 

494 # 하이브리드 캐시 생성 (Redis + In-Memory) 

495 _user_cache_instance = HybridUserCache( 

496 redis_url, key_prefix=key_prefix, default_ttl=default_ttl 

497 ) 

498 logger.info( 

499 f"User cache singleton initialized (Hybrid: Redis + In-Memory, prefix='{key_prefix}', ttl={default_ttl}s)" 

500 ) 

501 

502 return _user_cache_instance 

503 

504 

505def reset_user_cache(): 

506 """ 

507 캐시 싱글톤 리셋 (테스트용) 

508 """ 

509 global _user_cache_instance 

510 _user_cache_instance = None 

511 logger.info("User cache singleton reset")