Coverage for session_buddy / memory / migration.py: 73.33%
134 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""
2Schema migration and versioning for conversations/reflections v1 → v2.
4Implements:
5- Version detection and history logging
6- Idempotent creation of v2 schema
7- Best-effort migration preserving ONNX vectors
8- Backup/rollback helpers (filesystem copy)
9"""
11from __future__ import annotations
13import json
14import shutil
15import typing as t
16from contextlib import suppress
17from dataclasses import dataclass
18from datetime import datetime
19from pathlib import Path
21import duckdb
22from session_buddy.memory.schema_v2 import MIGRATION_SQL, SCHEMA_V2_SQL
23from session_buddy.settings import get_database_path
# DDL for this module's bookkeeping tables:
# - schema_meta: key/value store; this module reads and upserts the
#   'schema_version' key.
# - schema_migrations: one row per migration attempt (status, stats, error).
# Both statements use IF NOT EXISTS, so executing this string repeatedly is safe.
SCHEMA_META_SQL = """
CREATE TABLE IF NOT EXISTS schema_meta (
    key TEXT PRIMARY KEY,
    value TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS schema_migrations (
    id TEXT PRIMARY KEY,
    from_version TEXT,
    to_version TEXT,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT, -- pending|success|failed
    stats TEXT, -- JSON-encoded stats for portability
    error TEXT
);
"""
45@dataclass(slots=True)
46class MigrationResult:
47 success: bool
48 error: str | None = None
49 stats: dict[str, t.Any] | None = None
50 duration_seconds: float | None = None
def _connect(db_path: Path) -> duckdb.DuckDBPyConnection:
    """Open a DuckDB connection to ``db_path``.

    The connection is configured with ``allow_unsigned_extensions`` enabled.
    """
    connect_options = {"allow_unsigned_extensions": True}
    return duckdb.connect(str(db_path), config=connect_options)
def _ensure_meta(conn: duckdb.DuckDBPyConnection) -> None:
    """Create the ``schema_meta`` and ``schema_migrations`` tables if missing."""
    ddl = SCHEMA_META_SQL
    conn.execute(ddl)
def _table_exists(conn: duckdb.DuckDBPyConnection, name: str) -> bool:
    """Return True if ``SELECT ... FROM name`` succeeds on ``conn``.

    ``name`` is interpolated into the SQL text, so only pass trusted,
    hard-coded identifiers. Any query error is treated as "does not exist".
    """
    try:
        conn.execute(f"SELECT 1 FROM {name} LIMIT 1")
    except Exception:
        return False
    return True


def _get_schema_version(conn: duckdb.DuckDBPyConnection) -> str:
    """Detect the schema version of the database behind ``conn``.

    Resolution order:

    1. A string ``schema_version`` value stored in ``schema_meta`` is
       returned as-is.
    2. ``'v2'`` if at least two of the known v2 tables can be queried.
    3. ``'v1'`` if the legacy ``conversations`` table can be queried.
    4. ``'unknown'`` otherwise.

    Side effect: creates the ``schema_meta`` / ``schema_migrations`` tables
    when they are missing (via ``_ensure_meta``).
    """
    _ensure_meta(conn)

    # A recorded version wins. Reading it is best-effort: any error falls
    # through to the table-existence heuristics below. Uses the module-level
    # ``suppress`` import (no local re-import needed).
    with suppress(Exception):
        row = conn.execute(
            "SELECT value FROM schema_meta WHERE key='schema_version'"
        ).fetchone()
        if row and isinstance(row[0], str):
            return row[0]

    v2_tables = (
        "conversations_v2",
        "reflections_v2",
        "memory_entities",
        "memory_relationships",
        "memory_promotions",
        "memory_access_log",
    )
    # Two or more reachable v2 tables are taken as evidence of a v2 schema.
    if sum(_table_exists(conn, name) for name in v2_tables) >= 2:
        return "v2"

    return "v1" if _table_exists(conn, "conversations") else "unknown"
def get_schema_version(db_path: Path | None = None) -> str:
    """Return the detected schema version of the database at ``db_path``.

    When ``db_path`` is falsy, the path from ``get_database_path()`` is used.
    The connection is closed before returning.
    """
    target = get_database_path() if not db_path else Path(db_path)
    with _connect(target) as connection:
        version = _get_schema_version(connection)
    return version
def update_schema_version(conn: duckdb.DuckDBPyConnection, version: str) -> None:
    """Upsert ``version`` as the ``schema_version`` entry in ``schema_meta``.

    Ensures the meta tables exist first, then inserts the row or, if the key
    already exists, overwrites its value and refreshes ``updated_at``.
    """
    _ensure_meta(conn)
    upsert_sql = """
        INSERT INTO schema_meta(key, value, updated_at)
        VALUES ('schema_version', ?, CURRENT_TIMESTAMP)
        ON CONFLICT (key) DO UPDATE SET value=excluded.value, updated_at=NOW()
        """
    conn.execute(upsert_sql, [version])
def create_v2_schema(conn: duckdb.DuckDBPyConnection) -> None:
    """Execute ``SCHEMA_V2_SQL`` on ``conn`` to create the v2 tables.

    NOTE(review): the module docstring describes this step as idempotent;
    that depends on the DDL in ``schema_v2.SCHEMA_V2_SQL`` — confirm there.
    """
    v2_ddl = SCHEMA_V2_SQL
    conn.execute(v2_ddl)
def _count_rows(conn: duckdb.DuckDBPyConnection, table: str) -> int:
    """Return ``COUNT(*)`` of ``table``, or 0 if the query fails or yields no row.

    ``table`` is interpolated into the SQL text, so only pass trusted,
    hard-coded table names.
    """
    try:
        row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        return int(row[0]) if row else 0
    except Exception:
        # Best-effort: a missing table (or any other query error) counts as empty.
        return 0


def count_v1_conversations(conn: duckdb.DuckDBPyConnection) -> int:
    """Return the row count of the legacy ``conversations`` table (0 if unavailable)."""
    return _count_rows(conn, "conversations")


def count_v2_conversations(conn: duckdb.DuckDBPyConnection) -> int:
    """Return the row count of ``conversations_v2`` (0 if unavailable)."""
    return _count_rows(conn, "conversations_v2")
def create_backup(backup_dir: Path | str | None = None) -> Path:
    """Create a timestamped copy of the database and return the backup path.

    The backup goes into ``backup_dir`` (``str`` or ``Path``), defaulting to
    the directory that holds the database. If DuckDB's write-ahead log
    (``<db>.wal``) exists next to the database, it is copied as
    ``<backup>.wal`` so changes that have not been checkpointed are kept.
    An existing backup is never overwritten: when two backups land in the
    same second, a numeric suffix is appended to the file name.

    NOTE(review): copying while another connection is actively writing may
    capture an inconsistent state — confirm callers close connections first.

    Raises:
        OSError: if the directory cannot be created or a copy fails
            (e.g. ``FileNotFoundError`` when the database file is missing).
    """

    def _wal_of(path: Path) -> Path:
        # DuckDB keeps its write-ahead log beside the database as "<file>.wal".
        return path.with_name(path.name + ".wal")

    db_path = Path(get_database_path())
    backup_root = Path(backup_dir) if backup_dir else db_path.parent
    backup_root.mkdir(parents=True, exist_ok=True)

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_root / f"backup_v1_{ts}.duckdb"
    # One-second timestamp resolution: never clobber an earlier backup (or its
    # WAL sidecar) taken within the same second.
    collision = 1
    while backup_path.exists() or _wal_of(backup_path).exists():
        backup_path = backup_root / f"backup_v1_{ts}_{collision}.duckdb"
        collision += 1

    shutil.copy2(db_path, backup_path)

    live_wal = _wal_of(db_path)
    if live_wal.exists():
        shutil.copy2(live_wal, _wal_of(backup_path))
    return backup_path
def restore_backup(backup_path: Path | str) -> None:
    """Overwrite the live database file with the backup at ``backup_path``.

    DuckDB keeps un-checkpointed changes in a ``<db>.wal`` file beside the
    database. A WAL left over from the database being replaced could
    otherwise be replayed on top of the restored file. So after copying the
    main file, the backup's own ``<backup>.wal`` is restored when it exists;
    otherwise any stale live WAL is removed.

    NOTE(review): assumes no connection to the database is open while
    restoring — confirm with callers.

    Raises:
        OSError: if a copy or the WAL removal fails (e.g. ``FileNotFoundError``
            when the backup does not exist). If the main-file copy fails, the
            WAL is left untouched.
    """
    source = Path(backup_path)
    db_path = Path(get_database_path())
    shutil.copy2(source, db_path)

    live_wal = db_path.with_name(db_path.name + ".wal")
    backup_wal = source.with_name(source.name + ".wal")
    if backup_wal.exists():
        shutil.copy2(backup_wal, live_wal)
    else:
        live_wal.unlink(missing_ok=True)
def needs_migration(db_path: Path | None = None) -> bool:
    """Return True only when the database is detected as the legacy ``'v1'`` schema."""
    return get_schema_version(db_path) == "v1"
def migrate_v1_to_v2(
    *, db_path: Path | None = None, dry_run: bool = False
) -> MigrationResult:
    """Migrate legacy v1 data to the v2 schema.

    - Returns a "skipped" success result when the database is already v2.
    - With ``dry_run=True``, only reports how many v1 conversations would
      be migrated.
    - Otherwise creates the v2 schema if missing, runs ``MIGRATION_SQL``
      (best-effort categorization), and records the outcome in
      ``schema_migrations`` and ``schema_meta``.

    Exceptions raised by the migration itself are caught and returned as a
    failed ``MigrationResult``. Errors while opening the database or creating
    the meta tables still propagate.
    """
    started = datetime.now()
    target = get_database_path() if not db_path else Path(db_path)

    with _connect(target) as conn:
        _ensure_meta(conn)
        version = _get_schema_version(conn)

        if version == "v2":
            return MigrationResult(
                success=True,
                stats={"skipped": True, "reason": "already_v2"},
                duration_seconds=0.0,
            )

        if dry_run:
            return _handle_dry_run(conn, started)

        # Microsecond resolution keeps ids unique across quick successive runs.
        migration_id = f"mig_{started.strftime('%Y%m%d_%H%M%S_%f')}"
        try:
            return _perform_migration(conn, version, migration_id, started)
        except Exception as exc:
            return _handle_migration_exception(conn, migration_id, started, exc)
def _handle_dry_run(
    conn: duckdb.DuckDBPyConnection, start: datetime
) -> MigrationResult:
    """Build a preview result for a dry run.

    Only counts the legacy conversations (a read-only query); nothing is
    written to the database.
    """
    pending = count_v1_conversations(conn)
    preview = {"preview": True, "would_migrate": pending}
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(success=True, stats=preview, duration_seconds=elapsed)
def _perform_migration(
    conn: duckdb.DuckDBPyConnection, current: str, mig_id: str, start: datetime
) -> MigrationResult:
    """Run the v1 -> v2 migration and record its outcome.

    Steps:
    1. Log a 'pending' row in ``schema_migrations``.
    2. Create the v2 schema.
    3. Execute each statement of ``MIGRATION_SQL``.
    4. Compare row counts. The run succeeds when ``conversations_v2`` has at
       least as many rows as the legacy ``conversations`` table.

    Exceptions from any step propagate to the caller.
    """
    conn.execute(
        """
        INSERT INTO schema_migrations(id, from_version, to_version, started_at, status)
        VALUES (?, ?, ?, CURRENT_TIMESTAMP, 'pending')
        """,
        [mig_id, current, "v2"],
    )

    create_v2_schema(conn)

    # NOTE(review): statements are split on every ';', which breaks any
    # statement containing a literal semicolon. No explicit transaction is
    # opened here, so a mid-way failure can leave partial v2 data behind.
    statements = (chunk.strip() for chunk in MIGRATION_SQL.split(";"))
    for sql in filter(None, statements):
        conn.execute(sql)

    source_rows = count_v1_conversations(conn)
    migrated_rows = count_v2_conversations(conn)

    if migrated_rows < source_rows:
        return _handle_migration_failure(
            conn, mig_id, start, source_rows, migrated_rows
        )
    return _handle_migration_success(conn, mig_id, start, source_rows, migrated_rows)
def _handle_migration_success(
    conn: duckdb.DuckDBPyConnection,
    mig_id: str,
    start: datetime,
    v1_count: int,
    v2_count: int,
) -> MigrationResult:
    """Mark migration ``mig_id`` as successful and return a success result.

    Records schema version 'v2' in ``schema_meta``, then stores JSON-encoded
    stats and a completion timestamp on the ``schema_migrations`` row.
    """
    update_schema_version(conn, "v2")
    summary = {"migrated": v2_count, "source": v1_count}
    conn.execute(
        "UPDATE schema_migrations SET status='success', "
        "completed_at=CURRENT_TIMESTAMP, stats=? WHERE id=?",
        [json.dumps(summary), mig_id],
    )
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(success=True, stats=summary, duration_seconds=elapsed)
def _handle_migration_failure(
    conn: duckdb.DuckDBPyConnection,
    mig_id: str,
    start: datetime,
    v1_count: int,
    v2_count: int,
) -> MigrationResult:
    """Mark migration ``mig_id`` as failed because v2 has fewer rows than v1.

    Stores the error message on the ``schema_migrations`` row and returns a
    failed result carrying both counts.
    """
    message = f"Missing data after migration: v1={v1_count}, v2={v2_count}"
    conn.execute(
        "UPDATE schema_migrations SET status='failed', "
        "completed_at=CURRENT_TIMESTAMP, error=? WHERE id=?",
        [message, mig_id],
    )
    counts = {"v1": v1_count, "v2": v2_count}
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(
        success=False, error=message, stats=counts, duration_seconds=elapsed
    )
def _handle_migration_exception(
    conn: duckdb.DuckDBPyConnection, mig_id: str, start: datetime, exception: Exception
) -> MigrationResult:
    """Convert an exception raised during migration into a failed result.

    Recording the failure on the ``schema_migrations`` row is best-effort: if
    that UPDATE itself raises, the error is suppressed and only the returned
    result carries the message.
    """
    message = str(exception)
    with suppress(Exception):
        conn.execute(
            "UPDATE schema_migrations SET status='failed', "
            "completed_at=CURRENT_TIMESTAMP, error=? WHERE id=?",
            [message, mig_id],
        )
    elapsed = (datetime.now() - start).total_seconds()
    return MigrationResult(success=False, error=message, duration_seconds=elapsed)
def get_migration_status(db_path: Path | None = None) -> dict[str, t.Any]:
    """Summarize the migration state of the database at ``db_path``.

    Returns a dict with:

    - ``current_version``: the result of ``_get_schema_version``.
    - ``migration_history``: up to 10 ``schema_migrations`` rows, newest
      ``started_at`` first. Timestamps are stringified; a missing
      ``completed_at`` becomes ``None``.
    - ``counts``: row counts of the v1 and v2 conversation tables.

    When ``db_path`` is falsy, the path from ``get_database_path()`` is used.
    """
    target = get_database_path() if not db_path else Path(db_path)
    with _connect(target) as conn:
        _ensure_meta(conn)
        try:
            history_rows = conn.execute(
                "SELECT id, from_version, to_version, started_at, completed_at, status "
                "FROM schema_migrations ORDER BY started_at DESC LIMIT 10"
            ).fetchall()
        except Exception:
            history_rows = []

        version = _get_schema_version(conn)
        history = [
            {
                "id": row_id,
                "from": src,
                "to": dst,
                "started_at": str(began),
                "completed_at": str(ended) if ended else None,
                "status": state,
            }
            for row_id, src, dst, began, ended, state in history_rows
        ]
        counts = {
            "v1_conversations": count_v1_conversations(conn),
            "v2_conversations": count_v2_conversations(conn),
        }
        return {
            "current_version": version,
            "migration_history": history,
            "counts": counts,
        }
# Names exported by ``from ... import *``. Note that update_schema_version,
# count_v1_conversations and count_v2_conversations have no leading
# underscore but are not listed here.
__all__ = [
    "MigrationResult",
    "create_backup",
    "create_v2_schema",
    "get_migration_status",
    "get_schema_version",
    "migrate_v1_to_v2",
    "needs_migration",
    "restore_backup",
]