Coverage for src / tracekit / config / migration.py: 94%
120 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Schema migration system for TraceKit configuration files.
3This module provides schema migration functionality to automatically upgrade
4configuration files between schema versions while preserving user data.
7Example:
8 >>> from tracekit.config.migration import migrate_config, register_migration
9 >>> # Register a migration function
10 >>> def migrate_1_0_to_1_1(config: dict) -> dict:
11 ... config['new_field'] = 'default_value'
12 ... return config
13 >>> register_migration("1.0.0", "1.1.0", migrate_1_0_to_1_1)
14 >>> # Migrate config to latest version
15 >>> old_config = {"version": "1.0.0", "name": "test"}
16 >>> new_config = migrate_config(old_config)
17 >>> print(new_config["version"])
18 1.1.0
19"""
21from __future__ import annotations
23import copy
24from collections.abc import Callable
25from dataclasses import dataclass
26from typing import Any
28from tracekit.core.exceptions import ConfigurationError
# Type alias for migration functions: a callable that receives the config
# dict and returns the migrated config dict. The returned dict replaces the
# working config for the next migration step.
MigrationFunction = Callable[[dict[str, Any]], dict[str, Any]]
@dataclass
class Migration:
    """One registered step between two schema versions.

    Attributes:
        from_version: Source schema version (semver).
        to_version: Target schema version (semver).
        migrate_fn: Function to perform migration.
        description: Human-readable description of changes.
    """

    from_version: str
    to_version: str
    migrate_fn: MigrationFunction
    description: str = ""

    def __post_init__(self) -> None:
        """Reject empty versions and non-callable migration functions.

        Raises:
            ValueError: If ``from_version`` or ``to_version`` is empty, or
                if ``migrate_fn`` is not callable. Checks run in that order
                and the first failure wins.
        """
        checks = (
            (not self.from_version, "from_version cannot be empty"),
            (not self.to_version, "to_version cannot be empty"),
            (not callable(self.migrate_fn), "migrate_fn must be callable"),
        )
        for failed, message in checks:
            if failed:
                raise ValueError(message)
class SchemaMigration:
    """Registry and executor for configuration schema migrations.

    Registered migrations form a directed graph whose nodes are version
    strings and whose edges are migration functions. Migrating a config
    finds the shortest chain of registered steps from the config's version
    to the target version and applies the steps in order.

    Migrations are only ever applied in the direction they were registered.
    The manager itself never drops keys: each migration function receives
    the whole (deep-copied) config, and the manager only overwrites
    ``"version"`` after a step has run.

    Example:
        >>> migration = SchemaMigration()
        >>> migration.register_migration("1.0.0", "1.1.0", lambda cfg: cfg)
        >>> config = {"version": "1.0.0", "data": "value"}
        >>> migrated = migration.migrate_config(config, "1.1.0")
        >>> print(migrated["version"])
        1.1.0
    """

    def __init__(self) -> None:
        """Initialize empty migration registry."""
        # Map of (from_version, to_version) -> Migration
        self._migrations: dict[tuple[str, str], Migration] = {}
        # Map of from_version -> list of to_versions for path finding
        self._version_graph: dict[str, list[str]] = {}

    def register_migration(
        self,
        from_version: str,
        to_version: str,
        migrate_fn: MigrationFunction,
        *,
        description: str = "",
    ) -> None:
        """Register a migration function.

        Args:
            from_version: Source schema version (semver).
            to_version: Target schema version (semver).
            migrate_fn: Function that takes config dict and returns migrated dict.
            description: Human-readable description of migration.

        Raises:
            ValueError: If a migration is already registered for this
                version pair, or if ``Migration`` rejects the arguments
                (empty version, non-callable ``migrate_fn``).

        Example:
            >>> migration = SchemaMigration()
            >>> def upgrade(cfg): return {**cfg, "new_field": "default"}
            >>> migration.register_migration("1.0.0", "1.1.0", upgrade)
        """
        key = (from_version, to_version)

        if key in self._migrations:
            raise ValueError(f"Migration from {from_version} to {to_version} already registered")

        # Building the Migration validates the arguments before any state
        # is touched, so a rejected registration leaves the registry as-is.
        self._migrations[key] = Migration(
            from_version=from_version,
            to_version=to_version,
            migrate_fn=migrate_fn,
            description=description,
        )

        # Update graph for path finding
        self._version_graph.setdefault(from_version, []).append(to_version)

    def migrate_config(
        self,
        config: dict[str, Any],
        target_version: str | None = None,
    ) -> dict[str, Any]:
        """Migrate config to target version.

        Finds the shortest chain of registered migrations from the config's
        current version to ``target_version`` and applies them in sequence,
        setting ``"version"`` after each step. The input dict is never
        mutated; a deep copy is migrated and returned.

        Args:
            config: Configuration dictionary to migrate. A config without a
                ``"version"`` key is treated as version ``"1.0.0"``.
            target_version: Target schema version. If None, migrate to the
                latest version reachable from the current one; when nothing
                is reachable the (copied) config is returned unchanged.

        Returns:
            Migrated configuration dictionary.

        Raises:
            ConfigurationError: If no migration path exists, or if a
                migration step raises.

        Example:
            >>> migration = SchemaMigration()
            >>> migration.register_migration("1.0.0", "2.0.0", lambda cfg: cfg)
            >>> config = {"version": "1.0.0", "name": "test"}
            >>> migrated = migration.migrate_config(config, "2.0.0")
        """
        # Make a deep copy to avoid mutating input
        result = copy.deepcopy(config)

        current_version = self.get_config_version(result)

        # If no version field, add default
        if current_version is None:
            result["version"] = "1.0.0"
            current_version = "1.0.0"

        # If no target specified, use latest available
        if target_version is None:
            target_version = self._get_latest_version(current_version)
            if target_version is None:
                # No migrations available, return as-is
                return result

        # Already at target version
        if current_version == target_version:
            return result

        path = self._find_migration_path(current_version, target_version)

        if path is None:
            available = self.list_migrations()
            raise ConfigurationError(
                f"No migration path from {current_version} to {target_version}",
                details=f"Available migrations: {available}",
                fix_hint=f"Register migrations to connect {current_version} to {target_version}",
            )

        # Apply migrations in sequence
        for from_ver, to_ver in path:
            migration = self._migrations[(from_ver, to_ver)]

            try:
                result = migration.migrate_fn(result)
                # Stamp the new version. This stays inside the try so that a
                # migration function returning a non-dict (e.g. None) is
                # reported as a failed migration too.
                result["version"] = to_ver
            except Exception as e:
                raise ConfigurationError(
                    f"Migration from {from_ver} to {to_ver} failed",
                    details=str(e),
                    fix_hint="Check migration function implementation",
                ) from e

        return result

    def get_config_version(self, config: dict[str, Any]) -> str | None:
        """Extract version from config.

        Args:
            config: Configuration dictionary.

        Returns:
            Value stored under ``"version"``, or None if the key is absent.

        Example:
            >>> migration = SchemaMigration()
            >>> config = {"version": "1.2.3", "data": "value"}
            >>> migration.get_config_version(config)
            '1.2.3'
        """
        return config.get("version")

    def list_migrations(self) -> list[tuple[str, str]]:
        """List available migrations.

        Returns:
            Sorted list of (from_version, to_version) tuples. The sort is a
            plain tuple-of-strings comparison, not a semver ordering.

        Example:
            >>> migration = SchemaMigration()
            >>> migration.register_migration("1.0.0", "1.1.0", lambda c: c)
            >>> migration.list_migrations()
            [('1.0.0', '1.1.0')]
        """
        return sorted(self._migrations)

    def has_migration(self, from_version: str, to_version: str) -> bool:
        """Check if a direct migration exists.

        Args:
            from_version: Source version.
            to_version: Target version.

        Returns:
            True if a migration is registered for exactly this version pair
            (multi-step paths are not considered).
        """
        return (from_version, to_version) in self._migrations

    def _find_migration_path(
        self,
        from_version: str,
        to_version: str,
    ) -> list[tuple[str, str]] | None:
        """Find shortest migration path using BFS.

        Args:
            from_version: Source version.
            to_version: Target version.

        Returns:
            List of (from, to) version pairs representing migration path,
            an empty list if the versions are equal, or None if no path
            exists.
        """
        if from_version == to_version:
            return []

        # Level-order BFS. Instead of carrying a copy of the path with every
        # queued node, remember which node discovered each version and
        # rebuild the path once the target is found.
        discovered_from: dict[str, str] = {}
        visited = {from_version}
        frontier = [from_version]

        while frontier:
            next_frontier: list[str] = []

            for current in frontier:
                for next_version in self._version_graph.get(current, ()):
                    if next_version in visited:
                        continue

                    visited.add(next_version)
                    discovered_from[next_version] = current

                    if next_version == to_version:
                        # Walk the predecessor chain back to the start.
                        path: list[tuple[str, str]] = []
                        node = to_version
                        while node != from_version:
                            previous = discovered_from[node]
                            path.append((previous, node))
                            node = previous
                        path.reverse()
                        return path

                    next_frontier.append(next_version)

            frontier = next_frontier

        return None

    def _get_latest_version(self, from_version: str) -> str | None:
        """Get latest version reachable from given version.

        Versions are ranked with ``_parse_version``. Versions with equal
        rank (for example several unparseable ones) are ordered by their
        string form, so the result does not depend on set iteration order.

        Args:
            from_version: Starting version.

        Returns:
            Latest version string, or None if no other version is reachable.
        """
        if from_version not in self._version_graph:
            return None

        # Collect everything reachable. Traversal order is irrelevant here,
        # so a stack (O(1) pop) is enough.
        seen = {from_version}
        pending = [from_version]

        while pending:
            current = pending.pop()
            for next_version in self._version_graph.get(current, ()):
                if next_version not in seen:
                    seen.add(next_version)
                    pending.append(next_version)

        reachable = seen - {from_version}
        if not reachable:
            return None

        # Rank numerically via _parse_version; the string is a deterministic
        # tie-break only.
        return max(reachable, key=lambda version: (_parse_version(version), str(version)))
# Process-wide registry, created lazily by get_migration_registry().
_global_migration: SchemaMigration | None = None


def get_migration_registry() -> SchemaMigration:
    """Return the shared migration registry, creating it on first use.

    The first call builds a ``SchemaMigration`` and passes it to
    ``_register_builtin_migrations``; later calls return that same instance.

    Returns:
        Global SchemaMigration instance.
    """
    global _global_migration

    if _global_migration is not None:
        return _global_migration

    # Publish the instance before populating it, exactly as callers have
    # always observed it.
    _global_migration = SchemaMigration()
    _register_builtin_migrations(_global_migration)
    return _global_migration
def register_migration(
    from_version: str,
    to_version: str,
    migrate_fn: MigrationFunction,
    *,
    description: str = "",
) -> None:
    """Add a migration step to the global registry.

    Args:
        from_version: Source schema version.
        to_version: Target schema version.
        migrate_fn: Migration function.
        description: Human-readable description.

    Raises:
        ValueError: If the global registry already holds a migration for
            this version pair.
    """
    registry = get_migration_registry()
    registry.register_migration(
        from_version,
        to_version,
        migrate_fn,
        description=description,
    )
def migrate_config(
    config: dict[str, Any],
    target_version: str | None = None,
) -> dict[str, Any]:
    """Migrate a configuration with the global registry.

    Args:
        config: Configuration to migrate.
        target_version: Target version or None for latest.

    Returns:
        Migrated configuration.

    Raises:
        ConfigurationError: If the registry finds no migration path or a
            migration step fails.
    """
    registry = get_migration_registry()
    migrated = registry.migrate_config(config, target_version)
    return migrated
def get_config_version(config: dict[str, Any]) -> str | None:
    """Read the schema version of a configuration via the global registry.

    Args:
        config: Configuration dictionary.

    Returns:
        Version string or None.
    """
    registry = get_migration_registry()
    return registry.get_config_version(config)
def list_migrations() -> list[tuple[str, str]]:
    """List the migrations held by the global registry.

    Returns:
        List of (from_version, to_version) tuples.
    """
    registry = get_migration_registry()
    return registry.list_migrations()
def _parse_version(version: str) -> tuple[int, ...]:
    """Parse semver version string into tuple for comparison.

    Only the numeric core is used. A build-metadata suffix (``+...``) and a
    pre-release suffix (``-...``) are stripped first, so ``"2.0.0-rc.1"``
    parses as ``(2, 0, 0)`` instead of being treated as invalid. Pre-release
    precedence is not modelled: a pre-release compares equal to its release.

    Args:
        version: Version string (e.g., "1.2.3" or "1.2.3-beta+build.7").

    Returns:
        Tuple of integers (e.g., (1, 2, 3)); ``(0, 0, 0)`` when the numeric
        core cannot be parsed or ``version`` is not a string.
    """
    try:
        # Build metadata comes last in semver, so cut it before the
        # pre-release tag.
        core = version.split("+", 1)[0].split("-", 1)[0]
        return tuple(int(part) for part in core.split("."))
    except (ValueError, AttributeError, TypeError):
        # Return (0, 0, 0) for invalid versions
        return (0, 0, 0)
def _register_builtin_migrations(migration: SchemaMigration) -> None:
    """Populate a registry with the built-in migrations for core schemas.

    No built-in migration is registered yet: the only candidate below is a
    placeholder and its registration is still commented out, so calling
    this function leaves ``migration`` untouched.

    Args:
        migration: Migration registry to populate.
    """

    # Placeholder step for the protocol schema (1.0.0 -> 1.1.0); real
    # migrations would be added as needed.
    def _migrate_protocol_1_0_to_1_1(config: dict[str, Any]) -> dict[str, Any]:
        """Return the protocol config as-is (placeholder for 1.0.0 -> 1.1.0).

        Args:
            config: Configuration dictionary to migrate.

        Returns:
            The same dictionary, with every existing key kept.
        """
        # Identity for now; new optional fields with defaults would be
        # added here.
        return config

    # Enable once the schema actually changes:
    # migration.register_migration(
    #     "1.0.0",
    #     "1.1.0",
    #     _migrate_protocol_1_0_to_1_1,
    #     description="Protocol schema update for new features",
    # )
# Public API: the names exported by a star-import of this module. The
# underscore-prefixed helpers are intentionally left out.
__all__ = [
    "Migration",
    "MigrationFunction",
    "SchemaMigration",
    "get_config_version",
    "get_migration_registry",
    "list_migrations",
    "migrate_config",
    "register_migration",
]