Coverage for src \ truenex_memory \ core \ migration.py: 93%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

"""Local schema migration support.

Provides safe, idempotent migration primitives with automatic pre-migration
database backup, backup listing, and local restore with a safety backup before
overwrite.
"""

7 

8from __future__ import annotations 

9 

10from datetime import datetime, timezone 

11from pathlib import Path 

12import shutil 

13import sqlite3 

14 

15from truenex_memory.release.version import DB_SCHEMA_VERSION 

16from truenex_memory.store.sqlite import connect, initialize_schema, apply_column_upgrades 

17 

18 

# Result shapes returned by the helpers in this module. All three are plain
# ``dict[str, object]`` aliases; the distinct names only document intent.
MigrationStatus = dict[str, object]  # current_version / latest_version / pending
MigrationResult = dict[str, object]  # returned by migrate_apply and restore_backup
BackupEntry = dict[str, object]  # one element of the list built by list_backups

22 

23 

def get_current_schema_version(conn: sqlite3.Connection) -> str:
    """Return the highest schema version applied in the database.

    Versions are ranked numerically (``CAST(version AS INTEGER)``), so ``"10"``
    sorts above ``"9"``.

    Args:
        conn: Open SQLite connection. The selected column is read by position,
            so both the default tuple rows and ``sqlite3.Row`` are supported.

    Returns:
        The highest recorded version as a string, or ``"0"`` when no migration
        has been applied (missing table, empty table, or a NULL version).
    """
    try:
        row = conn.execute(
            "SELECT version FROM schema_migrations "
            "ORDER BY CAST(version AS INTEGER) DESC LIMIT 1"
        ).fetchone()
    except sqlite3.OperationalError:
        # e.g. "no such table" on a database that has never been migrated.
        return "0"

    if row is None:
        return "0"

    # Index by position instead of by column name: name lookup only works when
    # the connection's row_factory is sqlite3.Row and raises TypeError on the
    # default tuple rows.
    version = row[0]
    # Coerce to the declared return type so comparisons against the
    # code-defined version string stay type-consistent.
    return "0" if version is None else str(version)

36 

37 

def get_latest_schema_version() -> str:
    """Return ``DB_SCHEMA_VERSION``, the schema version declared by the code."""
    latest = DB_SCHEMA_VERSION
    return latest

41 

42 

def migration_status(db_path: Path) -> MigrationStatus:
    """Compare the version stored in *db_path* with the code-defined version.

    The database file is never created by this call: when *db_path* is missing,
    no connection is opened, the current version is reported as ``"0"`` and
    ``pending`` is ``True``.

    Returns:
        A dict with ``current_version``, ``latest_version`` and ``pending``
        (``True`` whenever the two versions differ).
    """
    latest = get_latest_schema_version()

    if db_path.exists():
        with connect(db_path) as conn:
            current = get_current_schema_version(conn)
        is_pending = current != latest
    else:
        # A missing file always counts as pending, whatever the latest version.
        current = "0"
        is_pending = True

    return {
        "current_version": current,
        "latest_version": latest,
        "pending": is_pending,
    }

63 

64 

def backup_database(db_path: Path, backups_dir: Path) -> Path | None:
    """Copy *db_path* into *backups_dir* under a UTC-timestamped name.

    The copy is named ``<stem>_<YYYYmmddTHHMMSSffffffZ><suffix>`` and
    *backups_dir* is created (including parents) when needed.

    Returns:
        The path of the new backup, or ``None`` when *db_path* does not exist
        (in which case *backups_dir* is not created either).
    """
    if db_path.exists():
        backups_dir.mkdir(parents=True, exist_ok=True)
        # Microsecond resolution keeps names distinct for back-to-back backups.
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
        destination = backups_dir / f"{db_path.stem}_{stamp}{db_path.suffix}"
        shutil.copy2(db_path, destination)
        return destination
    return None

79 

80 

def migrate_apply(db_path: Path, backups_dir: Path) -> MigrationResult:
    """Bring the database at *db_path* up to the code-defined schema version.

    Steps:

    1. Read the migration status; when nothing is pending, return immediately
       without touching the database or taking a backup.
    2. Take a timestamped backup of the database file (only when it exists)
       **before** any change is applied.
    3. Run ``initialize_schema`` and then ``apply_column_upgrades`` on a
       connection to *db_path* (both are expected to be idempotent).
    4. Re-read the migration status to report the resulting version.

    Returns:
        A JSON-safe dict with the status keys (``current_version``,
        ``latest_version``, ``pending``) plus ``previous_version``,
        ``backup_path`` (``None`` when no backup was taken) and ``applied``.
    """
    status_before = migration_status(db_path)
    previous_version = status_before["current_version"]

    if not status_before["pending"]:
        return {
            **status_before,
            "previous_version": previous_version,
            "backup_path": None,
            "applied": False,
        }

    if db_path.exists():
        backup_path = backup_database(db_path, backups_dir)
    else:
        backup_path = None

    with connect(db_path) as conn:
        initialize_schema(conn)
        apply_column_upgrades(conn)

    outcome = dict(migration_status(db_path))
    outcome["previous_version"] = previous_version
    outcome["backup_path"] = str(backup_path) if backup_path else None
    outcome["applied"] = True
    return outcome

115 

116 

def list_backups(backups_dir: Path) -> list[BackupEntry]:
    """List the ``.db`` backup files found directly inside *backups_dir*.

    Entries are ordered by filename, descending. For the timestamped names
    written by ``backup_database`` this puts the newest backup first.
    Directories and files with any other suffix are skipped.

    Returns:
        A list of JSON-safe dicts with ``filename``, ``path`` (resolved),
        ``size_bytes`` and ``created`` (ISO-8601 UTC string derived from the
        file's ``st_ctime``). Empty when *backups_dir* does not exist.
    """
    if not backups_dir.exists():
        return []

    candidates = list(backups_dir.iterdir())
    candidates.sort(key=lambda item: item.name, reverse=True)

    listing: list[BackupEntry] = []
    for candidate in candidates:
        if candidate.is_file() and candidate.suffix == ".db":
            info = candidate.stat()
            created_at = datetime.fromtimestamp(info.st_ctime, tz=timezone.utc)
            listing.append(
                {
                    "filename": candidate.name,
                    "path": str(candidate.resolve()),
                    "size_bytes": info.st_size,
                    "created": created_at.isoformat(),
                }
            )
    return listing

142 

143 

def restore_backup(
    db_path: Path, backups_dir: Path, backup_filename: str
) -> MigrationResult:
    """Overwrite *db_path* with the backup *backup_filename* from *backups_dir*.

    Steps:

    1. Validate that *backup_filename* is a bare ``.db`` filename and that the
       resolved backup path stays inside the resolved *backups_dir*
       (path-traversal rejection).
    2. Confirm the backup exists and is a regular file.
    3. When *db_path* exists, take a pre-restore safety backup of it into
       *backups_dir*.
    4. Copy the backup over *db_path*, creating its parent directory if needed.
    5. Open the restored database and read its schema version as a
       readability check.

    Returns:
        A JSON-safe dict with ``restored``, ``backup_filename``, ``db_path``
        (resolved), ``safety_backup_path`` (``None`` when no safety backup was
        taken) and ``current_version`` read from the restored database.

    Raises:
        ValueError: If the name is not a bare filename, is not a ``.db`` file,
            escapes *backups_dir*, or refers to something that is not a file.
        FileNotFoundError: If the backup does not exist.
        RuntimeError: If the restored database cannot be read. The database
            file has already been overwritten at that point; the safety
            backup, if one was taken, is left in *backups_dir*.
    """
    requested = Path(backup_filename)
    if requested.name != backup_filename:
        raise ValueError(f"Backup name must be a filename: {backup_filename!r}")
    if requested.suffix != ".db":
        raise ValueError(f"Backup must be a .db file: {backup_filename!r}")

    # Resolve both sides so the containment check compares resolved paths.
    backups_dir = backups_dir.resolve()
    source = (backups_dir / backup_filename).resolve()
    try:
        source.relative_to(backups_dir)
    except ValueError as exc:
        raise ValueError(
            f"Backup filename escapes backups_dir: {backup_filename!r}"
        ) from exc

    if not source.exists():
        raise FileNotFoundError(f"Backup not found: {source}")
    if not source.is_file():
        raise ValueError(f"Backup is not a file: {source}")

    # Keep a copy of the current database before it is overwritten.
    safety = backup_database(db_path, backups_dir) if db_path.exists() else None
    safety_backup_path = None if safety is None else str(safety.resolve())

    db_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, db_path)

    # Reading the schema version doubles as the readability check.
    try:
        with connect(db_path) as conn:
            restored_version = get_current_schema_version(conn)
    except Exception as exc:
        raise RuntimeError(f"Restored database is not readable: {exc}") from exc

    return {
        "restored": True,
        "backup_filename": backup_filename,
        "db_path": str(db_path.resolve()),
        "safety_backup_path": safety_backup_path,
        "current_version": restored_version,
    }