Coverage for src/mysingle/database/duckdb_manager.py: 0%
112 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""
2DuckDB Database Manager - Common Base Class
3모든 서비스에서 공통으로 사용하는 DuckDB 관리 클래스
4"""
6import json
7import uuid
8from datetime import UTC, datetime
9from pathlib import Path
10from typing import Any
12import duckdb
14from mysingle.core.logging import get_structured_logger
# Module-level structured logger used by BaseDuckDBManager for connection and cache logging.
logger = get_structured_logger(__name__)
19class BaseDuckDBManager:
20 """DuckDB 데이터베이스 관리 기본 클래스"""
22 def __init__(self, db_path: str):
23 """
24 Args:
25 db_path: DuckDB 파일 경로
26 """
27 self.db_path = db_path
28 self.connection: duckdb.DuckDBPyConnection | None = None
30 # 데이터베이스 디렉토리 생성
31 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
33 @property
34 def duckdb_conn(self) -> duckdb.DuckDBPyConnection:
35 """DuckDB 연결 객체 반환"""
36 if self.connection is None:
37 self.connect()
38 if self.connection is None:
39 raise RuntimeError("DuckDB connection not established")
40 return self.connection
42 def __enter__(self):
43 """컨텍스트 매니저 진입"""
44 self.connect()
45 return self
47 def __exit__(self, exc_type, exc_val, exc_tb):
48 """컨텍스트 매니저 종료"""
49 self.close()
51 def connect(self) -> None:
52 """데이터베이스 연결"""
53 if self.connection is None:
54 logger.info(f"Connecting to DuckDB at: {self.db_path}")
55 try:
56 # 기존 연결이 있다면 종료
57 self.close()
58 # 새 연결 생성
59 self.connection = duckdb.connect(self.db_path)
60 self._create_tables()
61 logger.info(f"✅ DuckDB connected successfully at: {self.db_path}")
62 except Exception as e:
63 logger.error(f"❌ Failed to connect to DuckDB: {e}")
64 # 파일이 잠겨있다면 메모리 DB로 폴백
65 if "lock" in str(e).lower():
66 logger.warning("🔄 Falling back to in-memory database")
67 self.connection = duckdb.connect(":memory:")
68 self._create_tables()
69 logger.info("✅ DuckDB connected to in-memory database")
70 else:
71 raise
73 def close(self) -> None:
74 """데이터베이스 연결 종료"""
75 if self.connection:
76 try:
77 self.connection.close()
78 logger.info("🔒 DuckDB connection closed")
79 except Exception as e:
80 logger.warning(f"⚠️ Error closing DuckDB connection: {e}")
81 finally:
82 self.connection = None
84 def _create_tables(self) -> None:
85 """테이블 생성 - 서브클래스에서 오버라이드"""
86 raise NotImplementedError("Subclass must implement _create_tables()")
88 def _ensure_connected(self) -> None:
89 """연결이 없으면 자동으로 연결"""
90 if self.connection is None:
91 self.connect()
93 def _make_json_serializable(self, obj) -> Any:
94 """객체를 JSON 직렬화 가능하도록 변환"""
95 import json
96 from datetime import datetime
97 from decimal import Decimal
99 if isinstance(obj, dict):
100 return {k: self._make_json_serializable(v) for k, v in obj.items()}
101 elif isinstance(obj, list):
102 return [self._make_json_serializable(item) for item in obj]
103 elif isinstance(obj, datetime):
104 return obj.isoformat()
105 elif isinstance(obj, Decimal):
106 return float(obj)
107 elif hasattr(obj, "model_dump"): # Pydantic v2
108 return self._make_json_serializable(obj.model_dump())
109 elif hasattr(obj, "dict"): # Pydantic v1
110 return self._make_json_serializable(obj.dict())
111 else:
112 # 기본 JSON 직렬화 시도
113 try:
114 json.dumps(obj)
115 return obj
116 except (TypeError, ValueError):
117 return str(obj)
119 # ===== 공통 캐시 메서드들 =====
121 def store_cache_data(
122 self, cache_key: str, data: list[dict], table_name: str = "cache_data"
123 ) -> bool:
124 """DuckDB 캐시에 데이터 저장"""
125 self._ensure_connected()
126 if not self.connection:
127 return False
129 try:
130 # 테이블이 없으면 생성
131 self._create_cache_table(table_name)
133 # 기존 데이터 삭제
134 self.connection.execute(
135 f"DELETE FROM {table_name} WHERE cache_key = ?", [cache_key]
136 )
138 # 새 데이터 삽입
139 now = datetime.now(UTC)
140 record_id = str(uuid.uuid4())
142 self.connection.execute(
143 f"""
144 INSERT INTO {table_name} (id, cache_key, data_json, created_at, updated_at)
145 VALUES (?, ?, ?, ?, ?)
146 """,
147 [record_id, cache_key, json.dumps(data), now, now],
148 )
150 logger.info(f"캐시 데이터 저장 완료: {cache_key}")
151 return True
153 except Exception as e:
154 logger.error(f"캐시 데이터 저장 실패: {e}")
155 return False
157 def get_cache_data(
158 self, cache_key: str, table_name: str = "cache_data", ttl_hours: int = 24
159 ) -> list[dict] | None:
160 """DuckDB 캐시에서 데이터 조회"""
161 self._ensure_connected()
162 if not self.connection:
163 return None
165 try:
166 # TTL 체크를 위한 시간 계산
167 cutoff_time = datetime.now(UTC).timestamp() - (ttl_hours * 3600)
169 result = self.connection.execute(
170 f"""
171 SELECT data_json, updated_at
172 FROM {table_name}
173 WHERE cache_key = ?
174 AND EXTRACT(EPOCH FROM updated_at) > ?
175 """,
176 [cache_key, cutoff_time],
177 ).fetchone()
179 if result:
180 data_json, _ = result
181 parsed_data: list[dict[Any, Any]] = json.loads(data_json) # type: ignore[assignment]
182 return parsed_data
183 else:
184 return None
186 except Exception as e:
187 logger.error(f"캐시 데이터 조회 실패: {e}")
188 return None
190 def _create_cache_table(self, table_name: str) -> None:
191 """캐시 테이블 생성"""
192 self._ensure_connected()
193 if not self.connection:
194 return
196 self.connection.execute(
197 f"""
198 CREATE TABLE IF NOT EXISTS {table_name} (
199 id VARCHAR PRIMARY KEY,
200 cache_key VARCHAR NOT NULL,
201 data_json TEXT NOT NULL,
202 created_at TIMESTAMP NOT NULL,
203 updated_at TIMESTAMP NOT NULL
204 )
205 """
206 )
208 # 인덱스 생성
209 self.connection.execute(
210 f"""
211 CREATE INDEX IF NOT EXISTS idx_{table_name}_cache_key
212 ON {table_name}(cache_key)
213 """
214 )
216 self.connection.execute(
217 f"""
218 CREATE INDEX IF NOT EXISTS idx_{table_name}_updated_at
219 ON {table_name}(updated_at)
220 """
221 )