Coverage for session_buddy / memory / migration.py: 73.33%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1""" 

2Schema migration and versioning for conversations/reflections v1 → v2. 

3 

4Implements: 

5- Version detection and history logging 

6- Idempotent creation of v2 schema 

7- Best-effort migration preserving ONNX vectors 

8- Backup/rollback helpers (filesystem copy) 

9""" 

10 

11from __future__ import annotations 

12 

13import json 

14import shutil 

15import typing as t 

16from contextlib import suppress 

17from dataclasses import dataclass 

18from datetime import datetime 

19from pathlib import Path 

20 

21import duckdb 

22from session_buddy.memory.schema_v2 import MIGRATION_SQL, SCHEMA_V2_SQL 

23from session_buddy.settings import get_database_path 

24 

# DDL for the bookkeeping tables used by this module. Every statement uses
# CREATE TABLE IF NOT EXISTS, so executing the script repeatedly is harmless.
#   schema_meta       - key/value store; this module reads and upserts the
#                       'schema_version' key.
#   schema_migrations - one row per real (non-dry-run) migration attempt,
#                       with status pending|success|failed and JSON stats.
SCHEMA_META_SQL = """
CREATE TABLE IF NOT EXISTS schema_meta (
    key TEXT PRIMARY KEY,
    value TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS schema_migrations (
    id TEXT PRIMARY KEY,
    from_version TEXT,
    to_version TEXT,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT, -- pending|success|failed
    stats TEXT, -- JSON-encoded stats for portability
    error TEXT
);
"""

43 

44 

@dataclass(slots=True)
class MigrationResult:
    """Outcome of a migration attempt, skip, or dry-run preview.

    Within this module, ``error`` is only filled in on failure paths. ``stats``
    carries a small path-specific dict, for example:

    - ``{"skipped": True, "reason": "already_v2"}`` when nothing was done;
    - ``{"preview": True, "would_migrate": N}`` for a dry run;
    - ``{"migrated": N, "source": M}`` on success;
    - ``{"v1": M, "v2": N}`` on a row-count shortfall.
    """

    # True when the attempt (or skip/preview) finished without error.
    success: bool
    # Human-readable failure description; None on success.
    error: str | None = None
    # Path-specific counters (see class docstring); may be None.
    stats: dict[str, t.Any] | None = None
    # Wall-clock duration computed by the helper that built the result.
    duration_seconds: float | None = None

51 

52 

def _connect(db_path: Path) -> duckdb.DuckDBPyConnection:
    """Open a DuckDB connection to *db_path* with unsigned extensions allowed.

    NOTE(review): presumably unsigned extensions are needed for a locally
    built/vector extension — confirm before tightening this.
    """
    connection_config = {"allow_unsigned_extensions": True}
    return duckdb.connect(str(db_path), config=connection_config)

55 

56 

def _ensure_meta(conn: duckdb.DuckDBPyConnection) -> None:
    """Create the ``schema_meta`` and ``schema_migrations`` tables when missing."""
    bootstrap_ddl = SCHEMA_META_SQL
    conn.execute(bootstrap_ddl)

59 

60 

def _table_is_queryable(conn: duckdb.DuckDBPyConnection, name: str) -> bool:
    """Return True if ``SELECT 1 FROM <name>`` succeeds on *conn*.

    *name* is interpolated directly into SQL, so it must be a trusted,
    hard-coded identifier (all callers in this module pass literals).
    """
    try:
        conn.execute(f"SELECT 1 FROM {name} LIMIT 1")
    except duckdb.Error:
        # Typically a catalog error because the table does not exist.
        return False
    return True


def _get_schema_version(conn: duckdb.DuckDBPyConnection) -> str:
    """Detect the schema version of the database behind *conn*.

    Resolution order:

    1. The ``schema_version`` value stored in ``schema_meta``, returned as-is.
    2. ``'v2'`` if at least two of the known v2 tables are queryable.
    3. ``'v1'`` if the legacy ``conversations`` table is queryable.
    4. ``'unknown'`` otherwise.

    Side effect: creates the ``schema_meta``/``schema_migrations`` tables if
    they do not exist yet.
    """
    _ensure_meta(conn)

    # 1. An explicit version recorded by a previous successful migration wins.
    with suppress(duckdb.Error):
        row = conn.execute(
            "SELECT value FROM schema_meta WHERE key='schema_version'"
        ).fetchone()
        if row and isinstance(row[0], str):
            return row[0]

    # 2. Fall back to inferring the version from which tables exist.
    #    NOTE(review): requiring two tables presumably guards against a single
    #    stray table with a v2 name; a partially created v2 schema with no meta
    #    row is also reported as 'v2' here — confirm that is intended.
    v2_tables = (
        "conversations_v2",
        "reflections_v2",
        "memory_entities",
        "memory_relationships",
        "memory_promotions",
        "memory_access_log",
    )
    v2_present = sum(1 for name in v2_tables if _table_is_queryable(conn, name))
    if v2_present >= 2:
        return "v2"

    # 3. Legacy layout.
    if _table_is_queryable(conn, "conversations"):
        return "v1"
    return "unknown"

99 

100 

def get_schema_version(db_path: Path | None = None) -> str:
    """Open the database and return its detected schema version.

    Falls back to the configured database path when *db_path* is falsy. The
    result is the stored ``schema_version`` value if any, otherwise ``'v2'``,
    ``'v1'`` or ``'unknown'`` based on which tables exist.
    """
    if db_path:
        target = Path(db_path)
    else:
        target = get_database_path()

    with _connect(target) as conn:
        detected = _get_schema_version(conn)
    return detected

105 

106 

def update_schema_version(conn: duckdb.DuckDBPyConnection, version: str) -> None:
    """Insert or overwrite the ``schema_version`` entry in ``schema_meta``.

    Creates the metadata tables first if needed. ``updated_at`` is set to
    ``CURRENT_TIMESTAMP`` on both the insert and the conflict-update path.
    Previously the update path used ``NOW()``, which is inconsistent with the
    insert path.
    """
    _ensure_meta(conn)
    conn.execute(
        """
        INSERT INTO schema_meta(key, value, updated_at)
        VALUES ('schema_version', ?, CURRENT_TIMESTAMP)
        ON CONFLICT (key) DO UPDATE
        SET value = excluded.value, updated_at = CURRENT_TIMESTAMP
        """,
        [version],
    )

117 

118 

def create_v2_schema(conn: duckdb.DuckDBPyConnection) -> None:
    """Run the v2 DDL script (``SCHEMA_V2_SQL``) on *conn*.

    The module docstring describes v2 creation as idempotent; that property
    depends on the contents of ``SCHEMA_V2_SQL``, which is defined elsewhere.
    """
    v2_ddl = SCHEMA_V2_SQL
    conn.execute(v2_ddl)

121 

122 

def _count_rows(conn: duckdb.DuckDBPyConnection, table: str) -> int:
    """Return ``COUNT(*)`` of *table*, or 0 if the query fails for any reason.

    Failures (e.g. the table does not exist at this schema version) are
    deliberately swallowed so the counters work on v1, v2 and empty databases.
    *table* is interpolated into SQL and must be a trusted, hard-coded name.
    """
    try:
        row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        return int(row[0]) if row else 0
    except Exception:
        return 0


def count_v1_conversations(conn: duckdb.DuckDBPyConnection) -> int:
    """Return the row count of the legacy ``conversations`` table (0 if absent)."""
    return _count_rows(conn, "conversations")


def count_v2_conversations(conn: duckdb.DuckDBPyConnection) -> int:
    """Return the row count of ``conversations_v2`` (0 if absent)."""
    return _count_rows(conn, "conversations_v2")

137 

138 

def _wal_path(db_file: Path) -> Path:
    """Return the DuckDB write-ahead-log sidecar path for *db_file* (``<name>.wal``)."""
    return db_file.with_name(db_file.name + ".wal")


def _copy_with_wal(src: Path, dst: Path) -> None:
    """Copy database file *src* to *dst* together with its WAL sidecar.

    If *src* has a WAL it is copied alongside. If it has none, any WAL already
    sitting next to *dst* is removed, so DuckDB does not replay a stale log
    (belonging to a different database state) against the copied file.

    NOTE(review): this is a plain file copy; it does not coordinate with other
    processes that may have the database open.
    """
    shutil.copy2(src, dst)
    src_wal, dst_wal = _wal_path(src), _wal_path(dst)
    if src_wal.exists():
        shutil.copy2(src_wal, dst_wal)
    else:
        dst_wal.unlink(missing_ok=True)


def create_backup(
    backup_dir: Path | None = None, *, db_path: Path | None = None
) -> Path:
    """Create a timestamped copy of the database and return the backup path.

    Args:
        backup_dir: Directory for the backup (created if missing). Defaults to
            the database file's own directory.
        db_path: Database to back up. Defaults to the configured database path.

    Returns:
        Path of the new ``backup_v1_<YYYYmmdd_HHMMSS>.duckdb`` file. A matching
        ``.wal`` sidecar is written next to it when the source has one.
    """
    source = Path(db_path) if db_path else get_database_path()
    backup_root = Path(backup_dir) if backup_dir else source.parent
    backup_root.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_root / f"backup_v1_{ts}.duckdb"
    _copy_with_wal(source, backup_path)
    return backup_path


def restore_backup(backup_path: Path, *, db_path: Path | None = None) -> None:
    """Overwrite the database (and its WAL sidecar) with the given backup.

    Args:
        backup_path: Backup file previously produced by ``create_backup``.
        db_path: Database to restore into. Defaults to the configured path.
    """
    target = Path(db_path) if db_path else get_database_path()
    _copy_with_wal(Path(backup_path), target)

154 

def needs_migration(db_path: Path | None = None) -> bool:
    """Return True only when the database is detected as the legacy ``'v1'`` schema.

    ``'v2'``, ``'unknown'`` and any other stored version all yield False.
    """
    return get_schema_version(db_path) == "v1"

158 

159 

def migrate_v1_to_v2(
    *, db_path: Path | None = None, dry_run: bool = False
) -> MigrationResult:
    """Migrate legacy v1 data to the v2 schema.

    - Returns a successful "skipped" result if the database already reports v2.
    - With ``dry_run=True``, only reports how many v1 conversations would be
      migrated. Apart from ensuring the metadata tables exist, nothing is written.
    - Otherwise it creates the v2 schema, runs ``MIGRATION_SQL`` (best-effort
      categorization; existing embeddings are preserved), and records the
      outcome in ``schema_migrations`` / ``schema_meta``.

    Exceptions raised by the real migration step are captured in the returned
    ``MigrationResult`` rather than propagated.
    """
    started = datetime.now()
    target = get_database_path() if not db_path else Path(db_path)

    with _connect(target) as conn:
        _ensure_meta(conn)
        version = _get_schema_version(conn)

        if version == "v2":
            return MigrationResult(
                success=True,
                stats={"skipped": True, "reason": "already_v2"},
                duration_seconds=0.0,
            )

        if dry_run:
            return _handle_dry_run(conn, started)

        # Unique-enough id derived from the start time (microsecond resolution).
        migration_id = "mig_" + started.strftime("%Y%m%d_%H%M%S_%f")
        try:
            return _perform_migration(conn, version, migration_id, started)
        except Exception as exc:
            return _handle_migration_exception(conn, migration_id, started, exc)

191 

192 

def _handle_dry_run(
    conn: duckdb.DuckDBPyConnection, start: datetime
) -> MigrationResult:
    """Return a preview result stating how many v1 conversations would migrate."""
    pending = count_v1_conversations(conn)
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(
        success=True,
        stats={"preview": True, "would_migrate": pending},
        duration_seconds=elapsed,
    )

204 

205 

def _perform_migration(
    conn: duckdb.DuckDBPyConnection, current: str, mig_id: str, start: datetime
) -> MigrationResult:
    """Create the v2 schema, copy v1 data into it, and verify row counts.

    The ``pending`` bookkeeping row is written first, in autocommit mode, so the
    attempt stays visible even if the migration fails. Schema creation, the
    data copy, verification and the success bookkeeping then run in a single
    transaction:

    - on success the transaction is committed;
    - on a row-count shortfall it is rolled back before the failure is recorded;
    - on any exception it is rolled back and the exception is re-raised for the
      caller to record.

    Rolling back matters because, when no ``schema_version`` meta row exists,
    ``_get_schema_version`` treats the mere presence of v2 tables as "v2". A
    half-finished migration left on disk would otherwise be reported as done
    and never retried.
    """
    conn.execute(
        """
        INSERT INTO schema_migrations(id, from_version, to_version, started_at, status)
        VALUES (?, ?, ?, CURRENT_TIMESTAMP, 'pending')
        """,
        [mig_id, current, "v2"],
    )

    conn.begin()
    try:
        create_v2_schema(conn)

        # MIGRATION_SQL is a multi-statement script; run each statement on its own.
        # NOTE(review): splitting on ';' breaks if the script ever contains a
        # semicolon inside a string literal or comment.
        for stmt in MIGRATION_SQL.split(";"):
            sql = stmt.strip()
            if sql:
                conn.execute(sql)

        v1_count = count_v1_conversations(conn)
        v2_count = count_v2_conversations(conn)

        if v2_count < v1_count:
            # Discard the partial v2 data, then record the failure (autocommit).
            conn.rollback()
            return _handle_migration_failure(conn, mig_id, start, v1_count, v2_count)

        result = _handle_migration_success(conn, mig_id, start, v1_count, v2_count)
        conn.commit()
        return result
    except Exception:
        # Undo partial DDL/DML; ignore errors if the transaction already ended.
        with suppress(Exception):
            conn.rollback()
        raise

235 

236 

def _handle_migration_success(
    conn: duckdb.DuckDBPyConnection,
    mig_id: str,
    start: datetime,
    v1_count: int,
    v2_count: int,
) -> MigrationResult:
    """Stamp the schema as v2 and close the migration row with status 'success'.

    The ``stats`` dict (``migrated`` = v2 rows, ``source`` = v1 rows) is both
    stored as JSON on the migration row and returned in the result.
    """
    update_schema_version(conn, "v2")
    summary = {"migrated": v2_count, "source": v1_count}
    params = [json.dumps(summary), mig_id]
    conn.execute(
        "UPDATE schema_migrations SET status='success', completed_at=CURRENT_TIMESTAMP, stats=? WHERE id=?",
        params,
    )
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(success=True, stats=summary, duration_seconds=elapsed)

256 

257 

def _handle_migration_failure(
    conn: duckdb.DuckDBPyConnection,
    mig_id: str,
    start: datetime,
    v1_count: int,
    v2_count: int,
) -> MigrationResult:
    """Record a row-count shortfall as a failed migration and return the result."""
    message = f"Missing data after migration: v1={v1_count}, v2={v2_count}"
    conn.execute(
        "UPDATE schema_migrations SET status='failed', completed_at=CURRENT_TIMESTAMP, error=? WHERE id=?",
        [message, mig_id],
    )
    elapsed = (datetime.now() - start).total_seconds()
    counts = {"v1": v1_count, "v2": v2_count}
    return MigrationResult(
        success=False, error=message, stats=counts, duration_seconds=elapsed
    )

277 

278 

def _handle_migration_exception(
    conn: duckdb.DuckDBPyConnection, mig_id: str, start: datetime, exception: Exception
) -> MigrationResult:
    """Mark the migration row failed (best-effort) and return a failure result.

    Any error while writing the bookkeeping row is ignored, so the caller
    always gets a ``MigrationResult`` carrying the original exception text.
    """
    message = str(exception)
    with suppress(Exception):
        conn.execute(
            "UPDATE schema_migrations SET status='failed', completed_at=CURRENT_TIMESTAMP, error=? WHERE id=?",
            [message, mig_id],
        )
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(success=False, error=message, duration_seconds=elapsed)

293 

294 

def get_migration_status(db_path: Path | None = None) -> dict[str, t.Any]:
    """Summarize the schema version, recent migrations and conversation counts.

    Returns a dict with:

    - ``current_version``: the detected schema version;
    - ``migration_history``: up to the 10 most recent ``schema_migrations``
      rows, newest first, each with ``id``, ``from``, ``to``, ``started_at``
      (stringified), ``completed_at`` (stringified, or None) and ``status``;
      empty if the history query fails;
    - ``counts``: row counts of the v1 and v2 conversation tables (0 if absent).
    """
    target = get_database_path() if not db_path else Path(db_path)
    with _connect(target) as conn:
        _ensure_meta(conn)
        try:
            recent = conn.execute(
                "SELECT id, from_version, to_version, started_at, completed_at, status "
                "FROM schema_migrations ORDER BY started_at DESC LIMIT 10"
            ).fetchall()
        except Exception:
            recent = []

        version = _get_schema_version(conn)
        history = [
            {
                "id": mig_id,
                "from": src,
                "to": dst,
                "started_at": str(started),
                "completed_at": str(completed) if completed else None,
                "status": status,
            }
            for mig_id, src, dst, started, completed, status in recent
        ]
        counts = {
            "v1_conversations": count_v1_conversations(conn),
            "v2_conversations": count_v2_conversations(conn),
        }
        return {
            "current_version": version,
            "migration_history": history,
            "counts": counts,
        }

325 

326 

# Public API (alphabetical): the result type, schema/migration entry points,
# and backup/restore helpers. Underscore-prefixed helpers are internal.
__all__ = [
    "MigrationResult",
    "create_backup",
    "create_v2_schema",
    "get_migration_status",
    "get_schema_version",
    "migrate_v1_to_v2",
    "needs_migration",
    "restore_backup",
]