Coverage for src / tracekit / config / migration.py: 94%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Schema migration system for TraceKit configuration files. 

2 

3This module provides schema migration functionality to automatically upgrade 

4configuration files between schema versions while preserving user data. 

5 

6 

7Example: 

8 >>> from tracekit.config.migration import migrate_config, register_migration 

9 >>> # Register a migration function 

10 >>> def migrate_1_0_to_1_1(config: dict) -> dict: 

11 ... config['new_field'] = 'default_value' 

12 ... return config 

13 >>> register_migration("1.0.0", "1.1.0", migrate_1_0_to_1_1) 

14 >>> # Migrate config to latest version 

15 >>> old_config = {"version": "1.0.0", "name": "test"} 

16 >>> new_config = migrate_config(old_config) 

17 >>> print(new_config["version"]) 

18 1.1.0 

19""" 

20 

from __future__ import annotations

import copy
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

from tracekit.core.exceptions import ConfigurationError

29 

# Signature shared by all migration callables: they receive a config
# dictionary and hand back a config dictionary.
MigrationFunction = Callable[[dict[str, Any]], dict[str, Any]]

32 

33 

@dataclass
class Migration:
    """One registered upgrade step between two schema versions.

    Attributes:
        from_version: Version the step starts from; must be non-empty.
        to_version: Version the step produces; must be non-empty.
        migrate_fn: Callable that performs the migration.
        description: Optional human-readable summary of the change.
    """

    from_version: str
    to_version: str
    migrate_fn: MigrationFunction
    description: str = ""

    def __post_init__(self) -> None:
        """Check the freshly built instance and raise ValueError if invalid."""
        # Checked in declaration order so the first offending field is the
        # one reported.
        required = (
            (self.from_version, "from_version cannot be empty"),
            (self.to_version, "to_version cannot be empty"),
        )
        for value, message in required:
            if not value:
                raise ValueError(message)

        if not callable(self.migrate_fn):
            raise ValueError("migrate_fn must be callable")

58 

59 

class SchemaMigration:
    """Registry of schema migrations plus the logic to apply them.

    Migrations are stored as directed edges ``from_version -> to_version``.
    :meth:`migrate_config` finds the shortest chain of registered edges
    between a config's current version and the requested target, then runs
    the corresponding functions in order. Only registered edges are followed;
    no reverse step is derived automatically.

    The manager itself only touches the ``"version"`` key; every other key is
    changed solely by the registered migration functions.

    Example:
        >>> def upgrade(cfg):
        ...     return {**cfg, "new_field": "default"}
        >>> migration = SchemaMigration()
        >>> migration.register_migration("1.0.0", "1.1.0", upgrade)
        >>> migrated = migration.migrate_config({"version": "1.0.0"}, "1.1.0")
        >>> migrated["version"]
        '1.1.0'
    """

    def __init__(self) -> None:
        """Initialize an empty migration registry."""
        # (from_version, to_version) -> Migration
        self._migrations: dict[tuple[str, str], Migration] = {}
        # from_version -> to_versions, in registration order (used for BFS)
        self._version_graph: dict[str, list[str]] = {}

    def register_migration(
        self,
        from_version: str,
        to_version: str,
        migrate_fn: MigrationFunction,
        *,
        description: str = "",
    ) -> None:
        """Register a migration function for one version step.

        Args:
            from_version: Source schema version.
            to_version: Target schema version.
            migrate_fn: Function that takes a config dict and returns the
                migrated dict.
            description: Human-readable description of the migration.

        Raises:
            ValueError: If a migration is already registered for this version
                pair, or if ``Migration`` rejects the arguments.

        Example:
            >>> migration = SchemaMigration()
            >>> def upgrade(cfg):
            ...     return {**cfg, "new_field": "default"}
            >>> migration.register_migration("1.0.0", "1.1.0", upgrade)
        """
        key = (from_version, to_version)

        if key in self._migrations:
            raise ValueError(f"Migration from {from_version} to {to_version} already registered")

        # Build (and thereby validate) the Migration before touching any
        # registry state, so a rejected registration leaves no trace.
        self._migrations[key] = Migration(
            from_version=from_version,
            to_version=to_version,
            migrate_fn=migrate_fn,
            description=description,
        )
        self._version_graph.setdefault(from_version, []).append(to_version)

    def migrate_config(
        self,
        config: dict[str, Any],
        target_version: str | None = None,
    ) -> dict[str, Any]:
        """Migrate a config to the target version.

        The input is deep-copied first and never mutated. A config without a
        version (missing or ``None``) is treated as version ``"1.0.0"`` and
        that value is written into the returned copy.

        Args:
            config: Configuration dictionary to migrate.
            target_version: Target schema version. If None, the highest
                version reachable from the current one is used; when nothing
                is reachable the copy is returned without running anything.

        Returns:
            Migrated configuration dictionary.

        Raises:
            ConfigurationError: If no migration path connects the current
                version to ``target_version``, or if a migration function
                raises or returns ``None``.

        Example:
            >>> migration = SchemaMigration()
            >>> migration.register_migration("1.0.0", "2.0.0", lambda c: dict(c))
            >>> migration.migrate_config({"version": "1.0.0", "name": "test"}, "2.0.0")
            {'version': '2.0.0', 'name': 'test'}
        """
        # Work on a deep copy so the caller's dict is never mutated.
        result = copy.deepcopy(config)

        current_version = self.get_config_version(result)
        if current_version is None:
            # Unversioned configs are assumed to be at the initial schema.
            result["version"] = "1.0.0"
            current_version = "1.0.0"

        if target_version is None:
            target_version = self._get_latest_version(current_version)
            if target_version is None:
                # Nothing reachable from here: return the copy as-is.
                return result

        if current_version == target_version:
            return result

        path = self._find_migration_path(current_version, target_version)
        if path is None:
            available = self.list_migrations()
            raise ConfigurationError(
                f"No migration path from {current_version} to {target_version}",
                details=f"Available migrations: {available}",
                fix_hint=f"Register migrations to connect {current_version} to {target_version}",
            )

        for from_ver, to_ver in path:
            step = self._migrations[(from_ver, to_ver)]

            try:
                migrated = step.migrate_fn(result)
                if migrated is None:
                    # A function that mutates in place and forgets to return
                    # would otherwise fail with an opaque "'NoneType' object
                    # does not support item assignment".
                    raise TypeError(
                        "migration function returned None; "
                        "it must return the migrated config dict"
                    )
                # The manager, not the migration function, owns the version field.
                migrated["version"] = to_ver
            except Exception as e:
                raise ConfigurationError(
                    f"Migration from {from_ver} to {to_ver} failed",
                    details=str(e),
                    fix_hint="Check migration function implementation",
                ) from e

            result = migrated

        return result

    def get_config_version(self, config: dict[str, Any]) -> str | None:
        """Extract the version from a config.

        Args:
            config: Configuration dictionary.

        Returns:
            Value stored under ``"version"``, or None if the key is absent.

        Example:
            >>> migration = SchemaMigration()
            >>> migration.get_config_version({"version": "1.2.3", "data": "value"})
            '1.2.3'
        """
        return config.get("version")

    def list_migrations(self) -> list[tuple[str, str]]:
        """List available migrations.

        Returns:
            Sorted list of (from_version, to_version) tuples.

        Example:
            >>> migration = SchemaMigration()
            >>> migration.register_migration("1.0.0", "1.1.0", lambda c: c)
            >>> migration.list_migrations()
            [('1.0.0', '1.1.0')]
        """
        return sorted(self._migrations.keys())

    def has_migration(self, from_version: str, to_version: str) -> bool:
        """Check whether a direct migration exists.

        Args:
            from_version: Source version.
            to_version: Target version.

        Returns:
            True if a migration is registered for exactly this version pair.
        """
        return (from_version, to_version) in self._migrations

    def _find_migration_path(
        self,
        from_version: str,
        to_version: str,
    ) -> list[tuple[str, str]] | None:
        """Find the shortest migration path using breadth-first search.

        When several shortest paths exist, the one whose edges were
        registered first wins, because neighbours are visited in
        registration order.

        Args:
            from_version: Source version.
            to_version: Target version.

        Returns:
            List of (from, to) version pairs representing the migration path
            (empty when both versions are equal), or None if no path exists.
        """
        if from_version == to_version:
            return []

        # deque gives O(1) pops from the left; list.pop(0) is O(n).
        queue: deque[tuple[str, list[tuple[str, str]]]] = deque([(from_version, [])])
        visited = {from_version}

        while queue:
            current, path = queue.popleft()

            for next_version in self._version_graph.get(current, ()):
                if next_version in visited:
                    continue

                new_path = [*path, (current, next_version)]
                if next_version == to_version:
                    return new_path

                visited.add(next_version)
                queue.append((next_version, new_path))

        return None

    def _get_latest_version(self, from_version: str) -> str | None:
        """Get the highest version reachable from the given version.

        Args:
            from_version: Starting version.

        Returns:
            Reachable version with the greatest ``_parse_version`` key, or
            None if no other version can be reached from ``from_version``.
        """
        if from_version not in self._version_graph:
            return None

        # Breadth-first walk collecting everything reachable from the start.
        visited = {from_version}
        queue = deque([from_version])

        while queue:
            current = queue.popleft()
            for next_version in self._version_graph.get(current, ()):
                if next_version not in visited:
                    visited.add(next_version)
                    queue.append(next_version)

        # The start itself never counts, even if a cycle leads back to it.
        reachable = visited - {from_version}
        if not reachable:
            return None

        # Versions are ordered by their parsed numeric tuple. The string form
        # is a tie-break so that versions with equal parsed keys (for example
        # several unparseable ones) give the same answer on every run instead
        # of depending on set iteration order.
        return max(reachable, key=lambda v: (_parse_version(v), str(v)))

320 

321 

# Module-wide registry instance; stays None until first requested.
_global_migration: SchemaMigration | None = None


def get_migration_registry() -> SchemaMigration:
    """Return the shared module-level migration registry.

    The registry is created on the first call and handed to
    ``_register_builtin_migrations`` before being returned; later calls
    return the same instance.

    Returns:
        Global SchemaMigration instance.
    """
    global _global_migration

    if _global_migration is not None:
        return _global_migration

    # Publish the new instance before populating it, then fill it in.
    fresh_registry = SchemaMigration()
    _global_migration = fresh_registry
    _register_builtin_migrations(fresh_registry)

    return _global_migration

341 

342 

def register_migration(
    from_version: str,
    to_version: str,
    migrate_fn: MigrationFunction,
    *,
    description: str = "",
) -> None:
    """Add a migration to the global registry.

    Thin convenience wrapper that forwards to the ``register_migration``
    method of the instance returned by ``get_migration_registry()``.

    Args:
        from_version: Source schema version.
        to_version: Target schema version.
        migrate_fn: Migration function.
        description: Human-readable description.
    """
    registry = get_migration_registry()
    registry.register_migration(
        from_version,
        to_version,
        migrate_fn,
        description=description,
    )

361 

362 

def migrate_config(
    config: dict[str, Any],
    target_version: str | None = None,
) -> dict[str, Any]:
    """Run a configuration through the global registry's migrations.

    Args:
        config: Configuration to migrate.
        target_version: Version to migrate to; None is passed through
            unchanged to the registry's ``migrate_config``.

    Returns:
        Whatever the registry's ``migrate_config`` returns for these
        arguments.
    """
    registry = get_migration_registry()
    migrated = registry.migrate_config(config, target_version)
    return migrated

377 

378 

def get_config_version(config: dict[str, Any]) -> str | None:
    """Look up a configuration's version via the global registry.

    Args:
        config: Configuration dictionary.

    Returns:
        Result of the registry's ``get_config_version`` for ``config``
        (a version string or None).
    """
    registry = get_migration_registry()
    return registry.get_config_version(config)

389 

390 

def list_migrations() -> list[tuple[str, str]]:
    """Report the migrations known to the global registry.

    Returns:
        List of (from_version, to_version) tuples, as produced by the
        registry's ``list_migrations``.
    """
    registry = get_migration_registry()
    return registry.list_migrations()

398 

399 

400def _parse_version(version: str) -> tuple[int, ...]: 

401 """Parse semver version string into tuple for comparison. 

402 

403 Args: 

404 version: Version string (e.g., "1.2.3"). 

405 

406 Returns: 

407 Tuple of integers (e.g., (1, 2, 3)). 

408 """ 

409 try: 

410 return tuple(int(part) for part in version.split(".")) 

411 except (ValueError, AttributeError): 

412 # Return (0, 0, 0) for invalid versions 

413 return (0, 0, 0) 

414 

415 

416def _register_builtin_migrations(migration: SchemaMigration) -> None: 

417 """Register built-in migrations for core schemas. 

418 

419 Args: 

420 migration: Migration registry to populate. 

421 """ 

422 

423 # Example migration for protocol schema (1.0.0 -> 1.1.0) 

424 # This is a placeholder - real migrations would be added as needed 

425 def _migrate_protocol_1_0_to_1_1(config: dict[str, Any]) -> dict[str, Any]: 

426 """Migrate protocol config from 1.0.0 to 1.1.0. 

427 

428 Example migration that preserves all existing fields. 

429 

430 Args: 

431 config: Configuration dictionary to migrate. 

432 

433 Returns: 

434 Migrated configuration dictionary. 

435 """ 

436 # Keep all existing fields (preserves unknown keys) 

437 # Add new optional fields with defaults if needed 

438 return config 

439 

440 # Register when actual schema changes are needed 

441 # migration.register_migration( 

442 # "1.0.0", 

443 # "1.1.0", 

444 # _migrate_protocol_1_0_to_1_1, 

445 # description="Protocol schema update for new features", 

446 # ) 

447 

448 

# Names exported by ``from tracekit.config.migration import *``.
__all__ = [
    "Migration",
    "MigrationFunction",
    "SchemaMigration",
    "get_config_version",
    "get_migration_registry",
    "list_migrations",
    "migrate_config",
    "register_migration",
]

458]