Coverage for frappe_manager / migration_manager / migration_executor.py: 67%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1from pathlib import Path
3from frappe_manager import CLI_BENCHES_DIRECTORY
4from frappe_manager.logger import log
5from frappe_manager.metadata_manager import FMConfigManager
6from frappe_manager.migration_manager.bench_migration_state import get_bench_migration_version
7from frappe_manager.migration_manager.migration_constants import MINIMUM_SUPPORTED_VERSION
8from frappe_manager.migration_manager.migration_discovery import MigrationDiscovery
9from frappe_manager.migration_manager.migration_error_handler import MigrationErrorHandler
10from frappe_manager.migration_manager.migration_exections import (
11 MigrationExceptionInBench,
12)
13from frappe_manager.migration_manager.migration_helpers import MigrationBench, MigrationBenches
14from frappe_manager.migration_manager.migration_orchestrator import MigrationOrchestrator
15from frappe_manager.migration_manager.migration_validator import BenchFilter, MigrationValidator
16from frappe_manager.migration_manager.version import Version
17from frappe_manager.output_manager import OutputHandler
18from frappe_manager.output_manager.rich_output import RichOutputHandler
19from frappe_manager.utils.helpers import get_current_fm_version
22def needs_migration(fm_config_manager: FMConfigManager) -> bool:
23 prev_version = fm_config_manager.version
24 current_version = Version(get_current_fm_version())
25 return prev_version < current_version
28def needs_fm_infrastructure_migration(fm_config_manager: FMConfigManager) -> bool:
29 current_version = Version(get_current_fm_version())
30 fm_infrastructure_version = fm_config_manager.get_system_migration_version()
31 return fm_infrastructure_version < current_version
34def get_benches_needing_migration(benches_directory: Path, current_version: Version) -> list[str]:
35 from frappe_manager import CLI_BENCH_CONFIG_FILE_NAME
36 from frappe_manager.migration_manager.bench_migration_state import bench_needs_migration
38 needs_migration_list = []
40 if not benches_directory.exists():
41 return needs_migration_list
43 for bench_path in benches_directory.iterdir():
44 if bench_path.is_dir():
45 bench_config = bench_path / CLI_BENCH_CONFIG_FILE_NAME
46 if bench_config.exists():
47 if bench_needs_migration(bench_path, current_version):
48 needs_migration_list.append(bench_path.name)
50 return needs_migration_list
53class MigrationExecutor:
54 """
55 Migration executor class.
57 This class is responsible for executing migrations.
58 """
60 def __init__(
61 self,
62 fm_config_manager: FMConfigManager,
63 skip_backup: bool = False,
64 skip_backup_for: list[str] | None = None,
65 exclude_benches: list[str] | None = None,
66 auto_proceed: bool = False,
67 rerun: bool = False,
68 on_failure: str = "prompt",
69 target_benches: list[str] | None = None,
70 migrate_fm_infrastructure: bool = False,
71 output_handler: OutputHandler | None = None,
72 ):
73 self.fm_config_manager: FMConfigManager = fm_config_manager
74 self.rerun = rerun
75 self.prev_version = self.fm_config_manager.version
76 self.rollback_version = self.fm_config_manager.version
77 self.current_version = Version(get_current_fm_version())
78 self.migrations_path = Path(__file__).parent / "migrations"
79 self.logger = log.get_logger()
80 self.migrations = []
81 self.undo_stack = []
82 self.migrate_benches = {}
83 self.skip_backup = skip_backup
84 self.skip_backup_for = skip_backup_for or []
85 self.exclude_benches = exclude_benches or []
86 self.auto_proceed = auto_proceed
87 self.on_failure = on_failure
88 self.target_benches = target_benches
89 self.migrate_fm_infrastructure = migrate_fm_infrastructure
90 self.fm_infrastructure_needs_migration = False
91 self.output = output_handler or RichOutputHandler()
93 # Initialize helper classes (composition)
94 bench_filter = BenchFilter(target_benches=target_benches, exclude_benches=self.exclude_benches)
95 self.validator = MigrationValidator(
96 prev_version=self.prev_version,
97 current_version=self.current_version,
98 bench_filter=bench_filter,
99 output_handler=self.output,
100 )
101 self.discovery = MigrationDiscovery(self.migrations_path, output_handler=self.output)
102 self.orchestrator = MigrationOrchestrator(self)
103 self.error_handler = MigrationErrorHandler(self)
105 def _get_minimum_bench_version(self) -> Version:
106 """Get the minimum migration version across all target benches.
108 Returns the lowest version that needs migration. This is used to determine
109 which migration classes need to be loaded.
111 DEPRECATED: Use validator.get_minimum_bench_version() instead.
112 """
113 return self.validator.get_minimum_bench_version()
115 def _check_benches_need_migration(self) -> bool:
116 """Check if any benches need migration to current version.
118 DEPRECATED: Use validator.check_benches_need_migration() instead.
119 """
120 return self.validator.check_benches_need_migration()
122 def execute(self):
123 """
124 Execute the migration.
125 This method will execute the migration and return the number of
126 executed statements.
127 """
129 fm_infrastructure_version_outdated = self.rerun or (self.prev_version < self.current_version)
130 fm_infrastructure_needs_migration = self.migrate_fm_infrastructure and fm_infrastructure_version_outdated
131 self.fm_infrastructure_needs_migration = fm_infrastructure_needs_migration
132 benches_need_migration = self.rerun or self._check_benches_need_migration()
134 if not fm_infrastructure_needs_migration and not benches_need_migration:
135 return True
137 effective_prev_version = self.prev_version
138 if benches_need_migration:
139 min_bench_version = self._get_minimum_bench_version()
140 effective_prev_version = min(self.prev_version, min_bench_version)
142 # When --rerun is active, ensure migrations are discovered even when
143 # prev_version == current_version. The strict ``<`` in discovery
144 # (``from_version < migration.version``) would otherwise exclude the
145 # current version's migration class.
146 #
147 # We narrow the range to the current base version's minor floor
148 # (e.g. 0.18.9999 for 0.19.x) rather than ``0.0.0``, so that only
149 # migrations belonging to the current release are included and old,
150 # potentially non-idempotent migrations are not re-applied.
151 if self.rerun and effective_prev_version >= self.current_version:
152 from packaging.version import Version as PV
154 parsed = PV(self.current_version.version)
155 base = parsed.base_version # e.g. "0.19.0"
156 parts = base.split(".")
157 floor = Version(f"{parts[0]}.{int(parts[1]) - 1}.9999")
158 effective_prev_version = min(effective_prev_version, floor)
160 if effective_prev_version != Version("0.0.0") and effective_prev_version < MINIMUM_SUPPORTED_VERSION:
161 self.validator.validate_version_support(effective_prev_version)
162 return False
164 # Discovery: Load migration classes dynamically
165 self.migrations = self.discovery.discover_migrations(effective_prev_version, self.current_version, self)
167 if self.migrations:
168 if fm_infrastructure_needs_migration:
169 self.output.print(
170 f"FM Infrastructure: [yellow]v{self.prev_version}[/yellow] → [green]v{self.current_version}[/green]",
171 emoji_code="",
172 )
173 self.output.print(" • CLI configuration", emoji_code="")
174 self.output.print(" • Global services (MariaDB, Nginx-Proxy)", emoji_code="")
176 if benches_need_migration and self.target_benches:
177 self.output.print("", emoji_code="")
178 self.output.print("Benches:", emoji_code="")
179 benches_manager = MigrationBenches(CLI_BENCHES_DIRECTORY)
180 all_benches = benches_manager.get_all_benches()
182 for bench_name in self.target_benches:
183 if bench_name in self.exclude_benches:
184 continue
186 if bench_name in all_benches:
187 bench_path = all_benches[bench_name].parent
188 bench_version = get_bench_migration_version(bench_path)
190 if bench_version < self.current_version:
191 self.output.print(
192 f" • {bench_name}: [yellow]v{bench_version}[/yellow] → [green]v{self.current_version}[/green]",
193 emoji_code="",
194 )
196 self.output.print("", emoji_code="")
198 self.output.print("Migration versions:", emoji_code="")
199 for migration in self.migrations:
200 self.output.print(f" • v{migration.version}", emoji_code="")
202 self.output.print("", emoji_code="")
203 self.output.print("This process may take a while.", emoji_code="")
204 self.output.print(
205 "Manual guide: https://github.com/rtCamp/Frappe-Manager/wiki/Migrations#manual-migration-procedure",
206 emoji_code="",
207 )
209 self.output.print("", emoji_code="")
211 if self.target_benches:
212 benches_manager = MigrationBenches(CLI_BENCHES_DIRECTORY)
213 all_benches = benches_manager.get_all_benches()
214 running = []
215 for bench_name in self.target_benches:
216 if bench_name in all_benches:
217 bench_path = all_benches[bench_name].parent
218 bench = MigrationBench(bench_name, bench_path, output=self.output)
219 if bench.running or bench.workers_running:
220 running.append(bench_name)
221 if running:
222 self.output.warning(
223 f"The following target benches are currently running and will be restarted (containers recreated) during migration: {', '.join(running)}",
224 )
225 self.output.print(
226 "If you'd prefer no disruption, stop these benches (fm stop <bench>) and re-run migration.",
227 )
228 self.output.print("", emoji_code="")
230 if not self.auto_proceed:
231 continue_migration = self.output.prompt_ask(
232 prompt="Do you want to proceed?",
233 choices=[
234 {"name": "yes - Start migration", "value": "yes"},
235 {"name": "no - Abort and revert to previous fm version", "value": "no"},
236 ],
237 required_flag="--auto-proceed",
238 )
239 else:
240 continue_migration = "yes"
241 self.output.print("Proceeding with migration (--auto-proceed)", emoji_code="")
243 if continue_migration == "no":
244 self.output.print("", emoji_code="")
245 self.output.print(
246 f"Migration aborted. To revert to v{self.prev_version.version!s}, run:",
247 emoji_code="",
248 )
249 self.output.print(f" uv tool install frappe-manager=={self.prev_version.version!s}", emoji_code="")
250 self.output.print("", emoji_code="")
251 return False
253 # Orchestration: Execute migrations with error handling
254 try:
255 self.orchestrator.execute_migrations()
256 self.undo_stack = self.orchestrator.undo_stack
257 self.error_handler.finalize_success()
258 return True
260 except MigrationExceptionInBench as e:
261 return self.error_handler.handle_bench_migration_failure(e)
263 except Exception as e:
264 return self.error_handler.handle_system_migration_failure(e)
266 def set_bench_data(
267 self,
268 bench: MigrationBench,
269 exception=None,
270 migration_version: Version | None = None,
271 traceback_str: str | None = None,
272 ):
273 self.migrate_benches[bench.name] = {
274 "object": bench,
275 "exception": exception,
276 "last_migration_version": migration_version,
277 "traceback": traceback_str,
278 }
280 def get_site_data(self, bench_name):
281 """Get migration data for a specific bench."""
282 try:
283 data = self.migrate_benches[bench_name]
284 except KeyError as e:
285 return None
286 return data
288 def rollback(self):
289 """
290 Rollback the migration.
292 DEPRECATED: Use orchestrator.rollback_migrations() instead.
293 This method is kept for backward compatibility.
294 """
295 self.orchestrator.undo_stack = self.undo_stack
296 self.orchestrator.rollback_migrations()