Coverage for src \ truenex_memory \ core \ migration.py: 93%
82 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Local schema migration support.
3Provides safe, idempotent migration primitives with automatic pre-migration
4database backup, backup listing, and local restore with a safety backup before
5overwrite.
6"""
8from __future__ import annotations
10from datetime import datetime, timezone
11from pathlib import Path
12import shutil
13import sqlite3
15from truenex_memory.release.version import DB_SCHEMA_VERSION
16from truenex_memory.store.sqlite import connect, initialize_schema, apply_column_upgrades
19MigrationStatus = dict[str, object]
20MigrationResult = dict[str, object]
21BackupEntry = dict[str, object]
24def get_current_schema_version(conn: sqlite3.Connection) -> str:
25 """Return the highest schema version applied in the database.
27 Returns ``"0"`` when no migrations have been applied (missing table or row).
28 """
29 try:
30 row = conn.execute(
31 "SELECT version FROM schema_migrations ORDER BY CAST(version AS INTEGER) DESC LIMIT 1"
32 ).fetchone()
33 except sqlite3.OperationalError:
34 return "0"
35 return row["version"] if row else "0"
38def get_latest_schema_version() -> str:
39 """Return the schema version defined in the current code."""
40 return DB_SCHEMA_VERSION
43def migration_status(db_path: Path) -> MigrationStatus:
44 """Return a dict with *current_version*, *latest_version*, and *pending*.
46 Does **not** create the database file when it does not exist.
47 """
48 if not db_path.exists():
49 return {
50 "current_version": "0",
51 "latest_version": get_latest_schema_version(),
52 "pending": True,
53 }
55 with connect(db_path) as conn:
56 current = get_current_schema_version(conn)
58 return {
59 "current_version": current,
60 "latest_version": get_latest_schema_version(),
61 "pending": current != get_latest_schema_version(),
62 }
65def backup_database(db_path: Path, backups_dir: Path) -> Path | None:
66 """Copy the database file into *backups_dir* with a UTC timestamp suffix.
68 Returns the backup path, or ``None`` when *db_path* does not exist.
69 """
70 if not db_path.exists():
71 return None
73 backups_dir.mkdir(parents=True, exist_ok=True)
74 timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
75 backup_name = f"{db_path.stem}_{timestamp}{db_path.suffix}"
76 backup_path = backups_dir / backup_name
77 shutil.copy2(db_path, backup_path)
78 return backup_path
81def migrate_apply(db_path: Path, backups_dir: Path) -> MigrationResult:
82 """Apply pending schema migrations.
84 1. Checks whether migrations are pending.
85 2. Creates a timestamped backup of the database (when it exists) **before**
86 applying any changes.
87 3. Runs ``initialize_schema`` idempotently (``CREATE TABLE IF NOT EXISTS``,
88 ``INSERT OR IGNORE`` for ``schema_migrations``).
90 Returns a JSON-safe dict with previous/current/latest versions,
91 *backup_path* (``None`` when no backup was taken), and *applied*.
92 """
93 before = migration_status(db_path)
94 if not before["pending"]:
95 return {
96 **before,
97 "previous_version": before["current_version"],
98 "backup_path": None,
99 "applied": False,
100 }
102 backup_path = backup_database(db_path, backups_dir) if db_path.exists() else None
104 with connect(db_path) as conn:
105 initialize_schema(conn)
106 apply_column_upgrades(conn)
108 after = migration_status(db_path)
109 return {
110 **after,
111 "previous_version": before["current_version"],
112 "backup_path": str(backup_path) if backup_path else None,
113 "applied": True,
114 }
117def list_backups(backups_dir: Path) -> list[BackupEntry]:
118 """Return a list of migration backups sorted newest-first.
120 Each entry is a JSON-safe dict with *filename*, *path*, *size_bytes*,
121 and *created* (ISO-8601 UTC timestamp from the filesystem).
122 """
123 if not backups_dir.exists():
124 return []
126 entries: list[BackupEntry] = []
127 for f in sorted(backups_dir.iterdir(), key=lambda p: p.name, reverse=True):
128 if not f.is_file() or f.suffix != ".db":
129 continue
130 stat = f.stat()
131 entries.append(
132 {
133 "filename": f.name,
134 "path": str(f.resolve()),
135 "size_bytes": stat.st_size,
136 "created": datetime.fromtimestamp(
137 stat.st_ctime, tz=timezone.utc
138 ).isoformat(),
139 }
140 )
141 return entries
144def restore_backup(
145 db_path: Path, backups_dir: Path, backup_filename: str
146) -> MigrationResult:
147 """Restore *backup_filename* from *backups_dir* onto *db_path*.
149 1. Resolves the backup path and validates it stays inside *backups_dir*
150 (path-traversal rejection).
151 2. Confirms the backup file exists.
152 3. Creates a pre-restore safety backup of the current database (if it
153 exists) before overwriting.
154 4. Copies the backup file over the active database.
155 5. Verifies the restored database is readable.
157 Returns a JSON-safe dict with *restored*, *backup_filename*, *db_path*,
158 *safety_backup_path* (``None`` when no safety backup was taken), and
159 the *current_version* read from the restored database.
160 """
161 backup_name = Path(backup_filename)
162 if backup_name.name != backup_filename:
163 raise ValueError(f"Backup name must be a filename: {backup_filename!r}")
164 if backup_name.suffix != ".db":
165 raise ValueError(f"Backup must be a .db file: {backup_filename!r}")
167 backups_dir = backups_dir.resolve()
168 backup_path = (backups_dir / backup_filename).resolve()
169 try:
170 backup_path.relative_to(backups_dir)
171 except ValueError as exc:
172 raise ValueError(f"Backup filename escapes backups_dir: {backup_filename!r}") from exc
174 if not backup_path.exists():
175 raise FileNotFoundError(f"Backup not found: {backup_path}")
176 if not backup_path.is_file():
177 raise ValueError(f"Backup is not a file: {backup_path}")
179 # Pre-restore safety backup
180 safety_backup_path: str | None = None
181 if db_path.exists():
182 safety = backup_database(db_path, backups_dir)
183 if safety is not None:
184 safety_backup_path = str(safety.resolve())
186 db_path.parent.mkdir(parents=True, exist_ok=True)
187 shutil.copy2(backup_path, db_path)
189 # Verify the restored database is readable
190 try:
191 with connect(db_path) as conn:
192 current = get_current_schema_version(conn)
193 except Exception as exc:
194 raise RuntimeError(f"Restored database is not readable: {exc}") from exc
196 return {
197 "restored": True,
198 "backup_filename": backup_filename,
199 "db_path": str(db_path.resolve()),
200 "safety_backup_path": safety_backup_path,
201 "current_version": current,
202 }