Coverage for src / mysingle / database / duckdb_manager.py: 0%

112 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1""" 

2DuckDB Database Manager - Common Base Class 

3모든 서비스에서 공통으로 사용하는 DuckDB 관리 클래스 

4""" 

5 

6import json 

7import uuid 

8from datetime import UTC, datetime 

9from pathlib import Path 

10from typing import Any 

11 

12import duckdb 

13 

14from mysingle.core.logging import get_structured_logger 

15 

16logger = get_structured_logger(__name__) 

17 

18 

19class BaseDuckDBManager: 

20 """DuckDB 데이터베이스 관리 기본 클래스""" 

21 

22 def __init__(self, db_path: str): 

23 """ 

24 Args: 

25 db_path: DuckDB 파일 경로 

26 """ 

27 self.db_path = db_path 

28 self.connection: duckdb.DuckDBPyConnection | None = None 

29 

30 # 데이터베이스 디렉토리 생성 

31 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) 

32 

33 @property 

34 def duckdb_conn(self) -> duckdb.DuckDBPyConnection: 

35 """DuckDB 연결 객체 반환""" 

36 if self.connection is None: 

37 self.connect() 

38 if self.connection is None: 

39 raise RuntimeError("DuckDB connection not established") 

40 return self.connection 

41 

42 def __enter__(self): 

43 """컨텍스트 매니저 진입""" 

44 self.connect() 

45 return self 

46 

47 def __exit__(self, exc_type, exc_val, exc_tb): 

48 """컨텍스트 매니저 종료""" 

49 self.close() 

50 

51 def connect(self) -> None: 

52 """데이터베이스 연결""" 

53 if self.connection is None: 

54 logger.info(f"Connecting to DuckDB at: {self.db_path}") 

55 try: 

56 # 기존 연결이 있다면 종료 

57 self.close() 

58 # 새 연결 생성 

59 self.connection = duckdb.connect(self.db_path) 

60 self._create_tables() 

61 logger.info(f"✅ DuckDB connected successfully at: {self.db_path}") 

62 except Exception as e: 

63 logger.error(f"❌ Failed to connect to DuckDB: {e}") 

64 # 파일이 잠겨있다면 메모리 DB로 폴백 

65 if "lock" in str(e).lower(): 

66 logger.warning("🔄 Falling back to in-memory database") 

67 self.connection = duckdb.connect(":memory:") 

68 self._create_tables() 

69 logger.info("✅ DuckDB connected to in-memory database") 

70 else: 

71 raise 

72 

73 def close(self) -> None: 

74 """데이터베이스 연결 종료""" 

75 if self.connection: 

76 try: 

77 self.connection.close() 

78 logger.info("🔒 DuckDB connection closed") 

79 except Exception as e: 

80 logger.warning(f"⚠️ Error closing DuckDB connection: {e}") 

81 finally: 

82 self.connection = None 

83 

84 def _create_tables(self) -> None: 

85 """테이블 생성 - 서브클래스에서 오버라이드""" 

86 raise NotImplementedError("Subclass must implement _create_tables()") 

87 

88 def _ensure_connected(self) -> None: 

89 """연결이 없으면 자동으로 연결""" 

90 if self.connection is None: 

91 self.connect() 

92 

93 def _make_json_serializable(self, obj) -> Any: 

94 """객체를 JSON 직렬화 가능하도록 변환""" 

95 import json 

96 from datetime import datetime 

97 from decimal import Decimal 

98 

99 if isinstance(obj, dict): 

100 return {k: self._make_json_serializable(v) for k, v in obj.items()} 

101 elif isinstance(obj, list): 

102 return [self._make_json_serializable(item) for item in obj] 

103 elif isinstance(obj, datetime): 

104 return obj.isoformat() 

105 elif isinstance(obj, Decimal): 

106 return float(obj) 

107 elif hasattr(obj, "model_dump"): # Pydantic v2 

108 return self._make_json_serializable(obj.model_dump()) 

109 elif hasattr(obj, "dict"): # Pydantic v1 

110 return self._make_json_serializable(obj.dict()) 

111 else: 

112 # 기본 JSON 직렬화 시도 

113 try: 

114 json.dumps(obj) 

115 return obj 

116 except (TypeError, ValueError): 

117 return str(obj) 

118 

119 # ===== 공통 캐시 메서드들 ===== 

120 

121 def store_cache_data( 

122 self, cache_key: str, data: list[dict], table_name: str = "cache_data" 

123 ) -> bool: 

124 """DuckDB 캐시에 데이터 저장""" 

125 self._ensure_connected() 

126 if not self.connection: 

127 return False 

128 

129 try: 

130 # 테이블이 없으면 생성 

131 self._create_cache_table(table_name) 

132 

133 # 기존 데이터 삭제 

134 self.connection.execute( 

135 f"DELETE FROM {table_name} WHERE cache_key = ?", [cache_key] 

136 ) 

137 

138 # 새 데이터 삽입 

139 now = datetime.now(UTC) 

140 record_id = str(uuid.uuid4()) 

141 

142 self.connection.execute( 

143 f""" 

144 INSERT INTO {table_name} (id, cache_key, data_json, created_at, updated_at) 

145 VALUES (?, ?, ?, ?, ?) 

146 """, 

147 [record_id, cache_key, json.dumps(data), now, now], 

148 ) 

149 

150 logger.info(f"캐시 데이터 저장 완료: {cache_key}") 

151 return True 

152 

153 except Exception as e: 

154 logger.error(f"캐시 데이터 저장 실패: {e}") 

155 return False 

156 

157 def get_cache_data( 

158 self, cache_key: str, table_name: str = "cache_data", ttl_hours: int = 24 

159 ) -> list[dict] | None: 

160 """DuckDB 캐시에서 데이터 조회""" 

161 self._ensure_connected() 

162 if not self.connection: 

163 return None 

164 

165 try: 

166 # TTL 체크를 위한 시간 계산 

167 cutoff_time = datetime.now(UTC).timestamp() - (ttl_hours * 3600) 

168 

169 result = self.connection.execute( 

170 f""" 

171 SELECT data_json, updated_at 

172 FROM {table_name} 

173 WHERE cache_key = ? 

174 AND EXTRACT(EPOCH FROM updated_at) > ? 

175 """, 

176 [cache_key, cutoff_time], 

177 ).fetchone() 

178 

179 if result: 

180 data_json, _ = result 

181 parsed_data: list[dict[Any, Any]] = json.loads(data_json) # type: ignore[assignment] 

182 return parsed_data 

183 else: 

184 return None 

185 

186 except Exception as e: 

187 logger.error(f"캐시 데이터 조회 실패: {e}") 

188 return None 

189 

190 def _create_cache_table(self, table_name: str) -> None: 

191 """캐시 테이블 생성""" 

192 self._ensure_connected() 

193 if not self.connection: 

194 return 

195 

196 self.connection.execute( 

197 f""" 

198 CREATE TABLE IF NOT EXISTS {table_name} ( 

199 id VARCHAR PRIMARY KEY, 

200 cache_key VARCHAR NOT NULL, 

201 data_json TEXT NOT NULL, 

202 created_at TIMESTAMP NOT NULL, 

203 updated_at TIMESTAMP NOT NULL 

204 ) 

205 """ 

206 ) 

207 

208 # 인덱스 생성 

209 self.connection.execute( 

210 f""" 

211 CREATE INDEX IF NOT EXISTS idx_{table_name}_cache_key 

212 ON {table_name}(cache_key) 

213 """ 

214 ) 

215 

216 self.connection.execute( 

217 f""" 

218 CREATE INDEX IF NOT EXISTS idx_{table_name}_updated_at 

219 ON {table_name}(updated_at) 

220 """ 

221 )