Coverage for frappe_manager / migration_manager / migration_executor.py: 67%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1from pathlib import Path 

2 

3from frappe_manager import CLI_BENCHES_DIRECTORY 

4from frappe_manager.logger import log 

5from frappe_manager.metadata_manager import FMConfigManager 

6from frappe_manager.migration_manager.bench_migration_state import get_bench_migration_version 

7from frappe_manager.migration_manager.migration_constants import MINIMUM_SUPPORTED_VERSION 

8from frappe_manager.migration_manager.migration_discovery import MigrationDiscovery 

9from frappe_manager.migration_manager.migration_error_handler import MigrationErrorHandler 

10from frappe_manager.migration_manager.migration_exections import ( 

11 MigrationExceptionInBench, 

12) 

13from frappe_manager.migration_manager.migration_helpers import MigrationBench, MigrationBenches 

14from frappe_manager.migration_manager.migration_orchestrator import MigrationOrchestrator 

15from frappe_manager.migration_manager.migration_validator import BenchFilter, MigrationValidator 

16from frappe_manager.migration_manager.version import Version 

17from frappe_manager.output_manager import OutputHandler 

18from frappe_manager.output_manager.rich_output import RichOutputHandler 

19from frappe_manager.utils.helpers import get_current_fm_version 

20 

21 

def needs_migration(fm_config_manager: FMConfigManager) -> bool:
    """Tell whether the recorded FM version is behind the installed one.

    Args:
        fm_config_manager: Config manager whose ``version`` attribute is the
            previously recorded version.

    Returns:
        ``True`` when ``fm_config_manager.version`` compares lower than the
        version reported by ``get_current_fm_version()``.
    """
    return fm_config_manager.version < Version(get_current_fm_version())

26 

27 

def needs_fm_infrastructure_migration(fm_config_manager: FMConfigManager) -> bool:
    """Tell whether the FM infrastructure is behind the installed version.

    Args:
        fm_config_manager: Config manager queried through
            ``get_system_migration_version()``.

    Returns:
        ``True`` when the system migration version compares lower than the
        version reported by ``get_current_fm_version()``.
    """
    installed_version = Version(get_current_fm_version())
    return fm_config_manager.get_system_migration_version() < installed_version

32 

33 

def get_benches_needing_migration(benches_directory: Path, current_version: Version) -> list[str]:
    """List the names of benches under *benches_directory* that need migrating.

    A directory entry counts as a bench only when it is a directory that
    contains the bench config file (``CLI_BENCH_CONFIG_FILE_NAME``).

    Args:
        benches_directory: Directory whose immediate children are inspected.
        current_version: Version passed to ``bench_needs_migration`` for each
            bench.

    Returns:
        Bench directory names, in ``iterdir()`` order, for which
        ``bench_needs_migration`` is truthy. Empty when the directory does not
        exist.
    """
    from frappe_manager import CLI_BENCH_CONFIG_FILE_NAME
    from frappe_manager.migration_manager.bench_migration_state import bench_needs_migration

    if not benches_directory.exists():
        return []

    return [
        entry.name
        for entry in benches_directory.iterdir()
        if entry.is_dir()
        and (entry / CLI_BENCH_CONFIG_FILE_NAME).exists()
        and bench_needs_migration(entry, current_version)
    ]

51 

52 

53class MigrationExecutor: 

54 """ 

55 Migration executor class. 

56 

57 This class is responsible for executing migrations. 

58 """ 

59 

60 def __init__( 

61 self, 

62 fm_config_manager: FMConfigManager, 

63 skip_backup: bool = False, 

64 skip_backup_for: list[str] | None = None, 

65 exclude_benches: list[str] | None = None, 

66 auto_proceed: bool = False, 

67 rerun: bool = False, 

68 on_failure: str = "prompt", 

69 target_benches: list[str] | None = None, 

70 migrate_fm_infrastructure: bool = False, 

71 output_handler: OutputHandler | None = None, 

72 ): 

73 self.fm_config_manager: FMConfigManager = fm_config_manager 

74 self.rerun = rerun 

75 self.prev_version = self.fm_config_manager.version 

76 self.rollback_version = self.fm_config_manager.version 

77 self.current_version = Version(get_current_fm_version()) 

78 self.migrations_path = Path(__file__).parent / "migrations" 

79 self.logger = log.get_logger() 

80 self.migrations = [] 

81 self.undo_stack = [] 

82 self.migrate_benches = {} 

83 self.skip_backup = skip_backup 

84 self.skip_backup_for = skip_backup_for or [] 

85 self.exclude_benches = exclude_benches or [] 

86 self.auto_proceed = auto_proceed 

87 self.on_failure = on_failure 

88 self.target_benches = target_benches 

89 self.migrate_fm_infrastructure = migrate_fm_infrastructure 

90 self.fm_infrastructure_needs_migration = False 

91 self.output = output_handler or RichOutputHandler() 

92 

93 # Initialize helper classes (composition) 

94 bench_filter = BenchFilter(target_benches=target_benches, exclude_benches=self.exclude_benches) 

95 self.validator = MigrationValidator( 

96 prev_version=self.prev_version, 

97 current_version=self.current_version, 

98 bench_filter=bench_filter, 

99 output_handler=self.output, 

100 ) 

101 self.discovery = MigrationDiscovery(self.migrations_path, output_handler=self.output) 

102 self.orchestrator = MigrationOrchestrator(self) 

103 self.error_handler = MigrationErrorHandler(self) 

104 

105 def _get_minimum_bench_version(self) -> Version: 

106 """Get the minimum migration version across all target benches. 

107 

108 Returns the lowest version that needs migration. This is used to determine 

109 which migration classes need to be loaded. 

110 

111 DEPRECATED: Use validator.get_minimum_bench_version() instead. 

112 """ 

113 return self.validator.get_minimum_bench_version() 

114 

115 def _check_benches_need_migration(self) -> bool: 

116 """Check if any benches need migration to current version. 

117 

118 DEPRECATED: Use validator.check_benches_need_migration() instead. 

119 """ 

120 return self.validator.check_benches_need_migration() 

121 

122 def execute(self): 

123 """ 

124 Execute the migration. 

125 This method will execute the migration and return the number of 

126 executed statements. 

127 """ 

128 

129 fm_infrastructure_version_outdated = self.rerun or (self.prev_version < self.current_version) 

130 fm_infrastructure_needs_migration = self.migrate_fm_infrastructure and fm_infrastructure_version_outdated 

131 self.fm_infrastructure_needs_migration = fm_infrastructure_needs_migration 

132 benches_need_migration = self.rerun or self._check_benches_need_migration() 

133 

134 if not fm_infrastructure_needs_migration and not benches_need_migration: 

135 return True 

136 

137 effective_prev_version = self.prev_version 

138 if benches_need_migration: 

139 min_bench_version = self._get_minimum_bench_version() 

140 effective_prev_version = min(self.prev_version, min_bench_version) 

141 

142 # When --rerun is active, ensure migrations are discovered even when 

143 # prev_version == current_version. The strict ``<`` in discovery 

144 # (``from_version < migration.version``) would otherwise exclude the 

145 # current version's migration class. 

146 # 

147 # We narrow the range to the current base version's minor floor 

148 # (e.g. 0.18.9999 for 0.19.x) rather than ``0.0.0``, so that only 

149 # migrations belonging to the current release are included and old, 

150 # potentially non-idempotent migrations are not re-applied. 

151 if self.rerun and effective_prev_version >= self.current_version: 

152 from packaging.version import Version as PV 

153 

154 parsed = PV(self.current_version.version) 

155 base = parsed.base_version # e.g. "0.19.0" 

156 parts = base.split(".") 

157 floor = Version(f"{parts[0]}.{int(parts[1]) - 1}.9999") 

158 effective_prev_version = min(effective_prev_version, floor) 

159 

160 if effective_prev_version != Version("0.0.0") and effective_prev_version < MINIMUM_SUPPORTED_VERSION: 

161 self.validator.validate_version_support(effective_prev_version) 

162 return False 

163 

164 # Discovery: Load migration classes dynamically 

165 self.migrations = self.discovery.discover_migrations(effective_prev_version, self.current_version, self) 

166 

167 if self.migrations: 

168 if fm_infrastructure_needs_migration: 

169 self.output.print( 

170 f"FM Infrastructure: [yellow]v{self.prev_version}[/yellow] → [green]v{self.current_version}[/green]", 

171 emoji_code="", 

172 ) 

173 self.output.print(" • CLI configuration", emoji_code="") 

174 self.output.print(" • Global services (MariaDB, Nginx-Proxy)", emoji_code="") 

175 

176 if benches_need_migration and self.target_benches: 

177 self.output.print("", emoji_code="") 

178 self.output.print("Benches:", emoji_code="") 

179 benches_manager = MigrationBenches(CLI_BENCHES_DIRECTORY) 

180 all_benches = benches_manager.get_all_benches() 

181 

182 for bench_name in self.target_benches: 

183 if bench_name in self.exclude_benches: 

184 continue 

185 

186 if bench_name in all_benches: 

187 bench_path = all_benches[bench_name].parent 

188 bench_version = get_bench_migration_version(bench_path) 

189 

190 if bench_version < self.current_version: 

191 self.output.print( 

192 f"{bench_name}: [yellow]v{bench_version}[/yellow] → [green]v{self.current_version}[/green]", 

193 emoji_code="", 

194 ) 

195 

196 self.output.print("", emoji_code="") 

197 

198 self.output.print("Migration versions:", emoji_code="") 

199 for migration in self.migrations: 

200 self.output.print(f" • v{migration.version}", emoji_code="") 

201 

202 self.output.print("", emoji_code="") 

203 self.output.print("This process may take a while.", emoji_code="") 

204 self.output.print( 

205 "Manual guide: https://github.com/rtCamp/Frappe-Manager/wiki/Migrations#manual-migration-procedure", 

206 emoji_code="", 

207 ) 

208 

209 self.output.print("", emoji_code="") 

210 

211 if self.target_benches: 

212 benches_manager = MigrationBenches(CLI_BENCHES_DIRECTORY) 

213 all_benches = benches_manager.get_all_benches() 

214 running = [] 

215 for bench_name in self.target_benches: 

216 if bench_name in all_benches: 

217 bench_path = all_benches[bench_name].parent 

218 bench = MigrationBench(bench_name, bench_path, output=self.output) 

219 if bench.running or bench.workers_running: 

220 running.append(bench_name) 

221 if running: 

222 self.output.warning( 

223 f"The following target benches are currently running and will be restarted (containers recreated) during migration: {', '.join(running)}", 

224 ) 

225 self.output.print( 

226 "If you'd prefer no disruption, stop these benches (fm stop <bench>) and re-run migration.", 

227 ) 

228 self.output.print("", emoji_code="") 

229 

230 if not self.auto_proceed: 

231 continue_migration = self.output.prompt_ask( 

232 prompt="Do you want to proceed?", 

233 choices=[ 

234 {"name": "yes - Start migration", "value": "yes"}, 

235 {"name": "no - Abort and revert to previous fm version", "value": "no"}, 

236 ], 

237 required_flag="--auto-proceed", 

238 ) 

239 else: 

240 continue_migration = "yes" 

241 self.output.print("Proceeding with migration (--auto-proceed)", emoji_code="") 

242 

243 if continue_migration == "no": 

244 self.output.print("", emoji_code="") 

245 self.output.print( 

246 f"Migration aborted. To revert to v{self.prev_version.version!s}, run:", 

247 emoji_code="", 

248 ) 

249 self.output.print(f" uv tool install frappe-manager=={self.prev_version.version!s}", emoji_code="") 

250 self.output.print("", emoji_code="") 

251 return False 

252 

253 # Orchestration: Execute migrations with error handling 

254 try: 

255 self.orchestrator.execute_migrations() 

256 self.undo_stack = self.orchestrator.undo_stack 

257 self.error_handler.finalize_success() 

258 return True 

259 

260 except MigrationExceptionInBench as e: 

261 return self.error_handler.handle_bench_migration_failure(e) 

262 

263 except Exception as e: 

264 return self.error_handler.handle_system_migration_failure(e) 

265 

266 def set_bench_data( 

267 self, 

268 bench: MigrationBench, 

269 exception=None, 

270 migration_version: Version | None = None, 

271 traceback_str: str | None = None, 

272 ): 

273 self.migrate_benches[bench.name] = { 

274 "object": bench, 

275 "exception": exception, 

276 "last_migration_version": migration_version, 

277 "traceback": traceback_str, 

278 } 

279 

280 def get_site_data(self, bench_name): 

281 """Get migration data for a specific bench.""" 

282 try: 

283 data = self.migrate_benches[bench_name] 

284 except KeyError as e: 

285 return None 

286 return data 

287 

288 def rollback(self): 

289 """ 

290 Rollback the migration. 

291 

292 DEPRECATED: Use orchestrator.rollback_migrations() instead. 

293 This method is kept for backward compatibility. 

294 """ 

295 self.orchestrator.undo_stack = self.undo_stack 

296 self.orchestrator.rollback_migrations()