Coverage for frappe_manager / migration_manager / migration_orchestrator.py: 61%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1""" 

2Migration orchestration and execution flow. 

3 

4Handles the main migration loop, progress tracking, error handling coordination, 

5and service management during migrations. 

6""" 

7 

8from typing import TYPE_CHECKING 

9 

10from frappe_manager.logger import log 

11from frappe_manager.migration_manager.migration_exections import MigrationExceptionInBench 

12from frappe_manager.utils.helpers import capture_and_format_exception 

13 

14if TYPE_CHECKING: 

15 from frappe_manager.migration_manager.migration_base import MigrationBase 

16 from frappe_manager.migration_manager.migration_executor import MigrationExecutor 

17 

18 

19class MigrationOrchestrator: 

20 """ 

21 Orchestrates migration execution flow. 

22 

23 Responsibilities: 

24 - Run migration loop through all migrations 

25 - Track undo stack for rollbacks 

26 - Handle MigrationExceptionInBench (partial failures) 

27 - Coordinate with services manager 

28 - Update rollback version tracking 

29 """ 

30 

31 def __init__(self, executor: "MigrationExecutor"): 

32 self.executor = executor 

33 self.logger = log.get_logger() 

34 self.undo_stack: list[MigrationBase] = [] 

35 self.exception_in_bench_occurred = False 

36 

37 def execute_migrations(self) -> bool: 

38 """ 

39 Execute all migrations in sequence. 

40 

41 Returns: 

42 bool: True if all migrations succeeded, False if any failed 

43 

44 Raises: 

45 MigrationExceptionInBench: If any bench migration failed 

46 Exception: If any system migration failed 

47 """ 

48 self._ensure_global_services_running() 

49 

50 prev_migration: MigrationBase | None = None 

51 

52 for migration in self.executor.migrations: 

53 self.executor.output.update_head(f"Running migration introduced in v{migration.version}") 

54 self.logger.info(f"[{migration.version}] : Migration starting") 

55 

56 try: 

57 self.undo_stack.append(migration) 

58 migration.up() 

59 

60 prev_migration = migration 

61 if not self.exception_in_bench_occurred: 

62 self.executor.rollback_version = prev_migration.get_rollback_version() 

63 

64 except MigrationExceptionInBench as e: 

65 captured_output = capture_and_format_exception(traceback_max_frames=0) 

66 self.logger.error(f"[{migration.version}] : Migration Failed\n{captured_output}") 

67 

68 # If this isn't the last migration, mark exception but continue 

69 if migration.version < self.executor.migrations[-1].version: 

70 self.exception_in_bench_occurred = True 

71 continue 

72 raise e 

73 

74 except Exception as e: 

75 captured_output = capture_and_format_exception() 

76 self.logger.error(f"[{migration.version}] : Migration Failed\n{captured_output}") 

77 raise e 

78 

79 # If any bench exceptions occurred during migration loop, raise now 

80 if self.exception_in_bench_occurred: 

81 raise MigrationExceptionInBench("") 

82 

83 return True 

84 

85 def rollback_migrations(self): 

86 """ 

87 Rollback all migrations in undo stack that are newer than rollback_version. 

88 

89 Iterates through undo stack in reverse order and calls down() on each migration. 

90 """ 

91 for migration in reversed(self.undo_stack): 

92 if migration.version > self.executor.rollback_version: 

93 self.executor.output.change_head(f"Rolling back migration introduced in v{migration.version}") 

94 self.logger.info(f"[{migration.version}] : Rollback starting") 

95 try: 

96 migration.down() 

97 except Exception as e: 

98 self.logger.error(f"[{migration.version}] : Rollback Failed\n{e}") 

99 raise e 

100 

101 def _ensure_global_services_running(self): 

102 """ 

103 Ensure global services (MariaDB/PostgreSQL) are running before migration. 

104 

105 Creates services if not initialized, starts them if stopped. 

106 Logs warnings if services check fails but continues migration. 

107 """ 

108 from frappe_manager.output_manager.context_managers import temporary_stop 

109 from frappe_manager.output_manager.silent_output import SilentOutputHandler 

110 from frappe_manager.services_manager.services import ServicesManager 

111 

112 try: 

113 services_manager = ServicesManager(output_handler=SilentOutputHandler()) 

114 

115 if not services_manager.path.exists(): 

116 with temporary_stop(self.executor.output): 

117 self.executor.output.print( 

118 "Global services not initialized. Creating...", 

119 emoji_code=":construction:", 

120 ) 

121 services_manager.init() 

122 services_manager.entrypoint_checks(start=True) 

123 self.executor.output.print("Global services started successfully", emoji_code=":white_check_mark:") 

124 return 

125 

126 services_manager.init() 

127 

128 services_list = services_manager.compose_file_manager.get_services_list() 

129 all_running = all(services_manager.is_service_running(svc) for svc in services_list) 

130 

131 if not all_running: 

132 with temporary_stop(self.executor.output): 

133 self.executor.output.print( 

134 "Global services not running. Starting them now...", 

135 emoji_code=":construction:", 

136 ) 

137 services_manager.start_service() 

138 self.executor.output.print("Global services started successfully", emoji_code=":white_check_mark:") 

139 except Exception as e: 

140 self.logger.error(f"Failed to ensure global services are running: {e}") 

141 with temporary_stop(self.executor.output): 

142 self.executor.output.print( 

143 "Warning: Could not verify/start global services. " 

144 "Migration may fail if services are not running. " 

145 "Try manually: fm services start", 

146 emoji_code=":warning:", 

147 )