Coverage for frappe_manager / migration_manager / migration_orchestrator.py: 61%
71 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1"""
2Migration orchestration and execution flow.
4Handles the main migration loop, progress tracking, error handling coordination,
5and service management during migrations.
6"""
8from typing import TYPE_CHECKING
10from frappe_manager.logger import log
11from frappe_manager.migration_manager.migration_exections import MigrationExceptionInBench
12from frappe_manager.utils.helpers import capture_and_format_exception
14if TYPE_CHECKING:
15 from frappe_manager.migration_manager.migration_base import MigrationBase
16 from frappe_manager.migration_manager.migration_executor import MigrationExecutor
19class MigrationOrchestrator:
20 """
21 Orchestrates migration execution flow.
23 Responsibilities:
24 - Run migration loop through all migrations
25 - Track undo stack for rollbacks
26 - Handle MigrationExceptionInBench (partial failures)
27 - Coordinate with services manager
28 - Update rollback version tracking
29 """
31 def __init__(self, executor: "MigrationExecutor"):
32 self.executor = executor
33 self.logger = log.get_logger()
34 self.undo_stack: list[MigrationBase] = []
35 self.exception_in_bench_occurred = False
37 def execute_migrations(self) -> bool:
38 """
39 Execute all migrations in sequence.
41 Returns:
42 bool: True if all migrations succeeded, False if any failed
44 Raises:
45 MigrationExceptionInBench: If any bench migration failed
46 Exception: If any system migration failed
47 """
48 self._ensure_global_services_running()
50 prev_migration: MigrationBase | None = None
52 for migration in self.executor.migrations:
53 self.executor.output.update_head(f"Running migration introduced in v{migration.version}")
54 self.logger.info(f"[{migration.version}] : Migration starting")
56 try:
57 self.undo_stack.append(migration)
58 migration.up()
60 prev_migration = migration
61 if not self.exception_in_bench_occurred:
62 self.executor.rollback_version = prev_migration.get_rollback_version()
64 except MigrationExceptionInBench as e:
65 captured_output = capture_and_format_exception(traceback_max_frames=0)
66 self.logger.error(f"[{migration.version}] : Migration Failed\n{captured_output}")
68 # If this isn't the last migration, mark exception but continue
69 if migration.version < self.executor.migrations[-1].version:
70 self.exception_in_bench_occurred = True
71 continue
72 raise e
74 except Exception as e:
75 captured_output = capture_and_format_exception()
76 self.logger.error(f"[{migration.version}] : Migration Failed\n{captured_output}")
77 raise e
79 # If any bench exceptions occurred during migration loop, raise now
80 if self.exception_in_bench_occurred:
81 raise MigrationExceptionInBench("")
83 return True
85 def rollback_migrations(self):
86 """
87 Rollback all migrations in undo stack that are newer than rollback_version.
89 Iterates through undo stack in reverse order and calls down() on each migration.
90 """
91 for migration in reversed(self.undo_stack):
92 if migration.version > self.executor.rollback_version:
93 self.executor.output.change_head(f"Rolling back migration introduced in v{migration.version}")
94 self.logger.info(f"[{migration.version}] : Rollback starting")
95 try:
96 migration.down()
97 except Exception as e:
98 self.logger.error(f"[{migration.version}] : Rollback Failed\n{e}")
99 raise e
101 def _ensure_global_services_running(self):
102 """
103 Ensure global services (MariaDB/PostgreSQL) are running before migration.
105 Creates services if not initialized, starts them if stopped.
106 Logs warnings if services check fails but continues migration.
107 """
108 from frappe_manager.output_manager.context_managers import temporary_stop
109 from frappe_manager.output_manager.silent_output import SilentOutputHandler
110 from frappe_manager.services_manager.services import ServicesManager
112 try:
113 services_manager = ServicesManager(output_handler=SilentOutputHandler())
115 if not services_manager.path.exists():
116 with temporary_stop(self.executor.output):
117 self.executor.output.print(
118 "Global services not initialized. Creating...",
119 emoji_code=":construction:",
120 )
121 services_manager.init()
122 services_manager.entrypoint_checks(start=True)
123 self.executor.output.print("Global services started successfully", emoji_code=":white_check_mark:")
124 return
126 services_manager.init()
128 services_list = services_manager.compose_file_manager.get_services_list()
129 all_running = all(services_manager.is_service_running(svc) for svc in services_list)
131 if not all_running:
132 with temporary_stop(self.executor.output):
133 self.executor.output.print(
134 "Global services not running. Starting them now...",
135 emoji_code=":construction:",
136 )
137 services_manager.start_service()
138 self.executor.output.print("Global services started successfully", emoji_code=":white_check_mark:")
139 except Exception as e:
140 self.logger.error(f"Failed to ensure global services are running: {e}")
141 with temporary_stop(self.executor.output):
142 self.executor.output.print(
143 "Warning: Could not verify/start global services. "
144 "Migration may fail if services are not running. "
145 "Try manually: fm services start",
146 emoji_code=":warning:",
147 )