Coverage for frappe_manager / migration_manager / migration_base.py: 31%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

import gzip
import shutil
from abc import ABC
from datetime import datetime
from logging import Logger
from pathlib import Path

from frappe_manager import CLI_DIR
from frappe_manager.logger import log
from frappe_manager.migration_manager.backup_manager import BackupManager
from frappe_manager.migration_manager.bench_migration_state import get_bench_migration_version
from frappe_manager.migration_manager.migration_constants import DOCKER_COMPOSE_DOWN_TIMEOUT_SECONDS
from frappe_manager.migration_manager.migration_exections import (
    MigrationExceptionInBench,
)
from frappe_manager.migration_manager.migration_helpers import (
    MigrationBench,
    MigrationBenches,
    MigrationServicesManager,
)
from frappe_manager.migration_manager.version import Version
from frappe_manager.output_manager import OutputHandler
from frappe_manager.output_manager.rich_output import RichOutputHandler
from frappe_manager.services_manager.database_service_manager import DatabaseServerServiceInfo, MariaDBManager
from frappe_manager.utils.helpers import capture_and_format_exception

23 

24 

class MigrationBase(ABC):
    """Base class for a single versioned migration step.

    ``up()`` applies the step: when the executor reports that the shared
    infrastructure needs migrating it backs up and migrates the services,
    then it migrates every targeted bench. ``down()`` rolls the step back.

    ``migrate_services``, ``undo_services_migrate``, ``migrate_bench`` and
    ``undo_bench_migrate`` are no-ops here; presumably concrete migrations
    override them (the class declares no abstract methods).
    """

    # Placeholder version; presumably overridden by each concrete migration.
    version: Version = Version("0.0.0")
    # Directory scanned for benches.
    benches_dir: Path = CLI_DIR / "sites"
    # When True, up() returns immediately without doing any work.
    skip: bool = False
    # Executor driving the run; injected through set_migration_executor().
    migration_executor = None
    logger: Logger = log.get_logger()

    def __init__(self, output_handler: OutputHandler | None = None):
        """Store the output handler and record version/image-tag information.

        Args:
            output_handler: Handler used for user-facing output. A
                ``RichOutputHandler`` is created when none (or a falsy value)
                is given.
        """
        self.output = output_handler or RichOutputHandler()

        from frappe_manager.utils.helpers import get_current_fm_version, get_docker_image_tag

        # _detect_dev_environment() reads _current_fm_version, so it must be set first.
        self._current_fm_version = get_current_fm_version()
        self.is_dev_environment = self._detect_dev_environment()
        self.effective_image_tag = get_docker_image_tag()

    def _detect_dev_environment(self) -> bool:
        """Return True when the current FM version parses as a dev or pre-release."""
        from packaging.version import Version as PV

        parsed = PV(self._current_fm_version)
        # NOTE(review): packaging documents is_prerelease as also covering dev
        # releases, so the first check looks redundant - confirm before simplifying.
        return parsed.is_devrelease or parsed.is_prerelease

    def _get_image_tag_for_migration(self) -> str:
        """Return the image tag to use for this migration and log the choice.

        In a dev environment the tag computed in ``__init__`` is used;
        otherwise the tag is this migration's own version string.
        """
        if self.is_dev_environment:
            tag = self.effective_image_tag
            environment_note = "Dev environment detected"
        else:
            tag = self.version.version_string()
            environment_note = "Stable environment"

        self.logger.info(f"{environment_note}: using image tag {tag}")
        return tag

    def init(self):
        """Create the backup, bench and services managers used by the migration."""
        self.backup_manager = BackupManager(name=str(self.version), benches_dir=self.benches_dir)
        self.benches_manager = MigrationBenches(self.benches_dir)
        self.services_manager: MigrationServicesManager = MigrationServicesManager(services_path=CLI_DIR / "services")

    def set_migration_executor(self, migration_executor):
        """Attach the executor and adopt its output handler when it exposes one."""
        self.migration_executor = migration_executor
        if hasattr(migration_executor, "output"):
            self.output = migration_executor.output

    def get_rollback_version(self):
        """Return the version associated with rolling this migration back."""
        return self.version

    def up(self):
        """Apply this migration.

        Returns:
            ``True`` when the migration is flagged ``skip``; otherwise nothing
            is returned explicitly (``None``).

        Raises:
            MigrationExceptionInBench: Propagated from the services backup or
                from ``migrate_benches`` when a bench failed.
        """
        if self.skip:
            return True

        self.output.print(f"[bold][blue]Migration for v{self.version!s}[/blue][/bold]", emoji_code=":package:")
        self.logger.info(f"v{self.version!s}: Started")
        self.logger.info("-" * 40)

        self.init()

        # Shared services are only touched when the executor says they need it.
        if self.migration_executor and self.migration_executor.fm_infrastructure_needs_migration:
            self.services_basic_backup()
            self.migrate_services()

        self.migrate_benches()

        self.logger.info("-" * 40)

    def down(self):
        """Roll this migration back.

        Undoes every bench recorded by the executor without an exception,
        restores all backups, removes files created during the migration and
        finally undoes the services migration. Relies on ``backup_manager``,
        which is created by ``init()``.
        """
        self.output.change_head(f"Working on v{self.version!s} rollback")
        self.logger.info("-" * 40)

        # Benches that failed were already reverted when their failure was handled.
        for bench_data in self.migration_executor.migrate_benches.values():
            if not bench_data["exception"]:
                self.undo_bench_migrate(bench_data["object"])

        for backup in self.backup_manager.backups:
            self.backup_manager.restore(backup, force=True)

        # Clean up newly created files that didn't exist before migration
        self.backup_manager.cleanup_new_files()

        self.undo_services_migrate()

        self.output.print(f"[bold]v{self.version!s}[/bold] rollback successful")
        self.logger.info("-" * 40)

    def services_basic_backup(self):
        """Back up the services compose file.

        Raises:
            MigrationExceptionInBench: If the services compose file does not exist.
        """
        compose_file_manager = self.services_manager.compose_file_manager
        if not compose_file_manager.exists():
            # Report the path itself rather than the manager object.
            raise MigrationExceptionInBench(
                f"Services compose at {compose_file_manager.compose_path} not found.",
            )
        self.backup_manager.backup(compose_file_manager.compose_path)

    def migrate_services(self):
        """Hook for migrating shared services; does nothing in the base class."""
        pass

    def undo_services_migrate(self):
        """Hook for undoing the services migration; does nothing in the base class."""
        pass

    def migrate_benches(self):
        """Back up and migrate every targeted bench.

        A bench is skipped when it is not listed in the executor's
        ``target_benches``, is listed in ``exclude_benches``, is already at or
        above this migration's version (unless ``rerun`` is set), or already
        has an exception recorded by the executor. When a bench fails, the
        failure is logged and recorded on the executor, the bench is reverted,
        and the loop continues with the next bench.

        Raises:
            MigrationExceptionInBench: After all benches were processed, if any
                bench failed here or had already failed earlier.
        """
        executor = self.migration_executor
        all_benches = self.benches_manager.get_all_benches()

        # A target list of None marks an infrastructure-only run: no bench is
        # migrated. The emptiness check comes first so the executor is not
        # touched when there are no benches at all.
        if not all_benches or executor.target_benches is None:
            return

        main_error = False

        for bench_name, bench_path in all_benches.items():
            if bench_name not in executor.target_benches:
                continue

            if bench_name in executor.exclude_benches:
                self.output.print(f"Skipping {bench_name} (--exclude-bench)", emoji_code="")
                continue

            bench = MigrationBench(name=bench_name, path=bench_path.parent, output=self.output)
            bench_version = get_bench_migration_version(bench.path)

            # Check if bench is already at or above this migration version.
            # --rerun overrides this so the migration runs regardless.
            if not executor.rerun and bench_version >= self.version:
                self.output.print(
                    f"Bench {bench_name} already at v{bench_version}, skipping migration to v{self.version}",
                )
                continue

            # A bench with a recorded exception is not retried, but it still
            # marks the overall run as failed.
            if bench.name in executor.migrate_benches:
                bench_info = executor.migrate_benches[bench.name]
                if bench_info["exception"]:
                    self.output.print(f"Skipping migration for failed bench [blue]{bench.name}[/blue]")
                    main_error = True
                    continue

            executor.set_bench_data(bench, migration_version=self.version)
            try:
                self.bench_basic_backup(bench)
                self.migrate_bench(bench)
            except Exception as e:
                traceback_str = capture_and_format_exception()
                self.logger.error(f"{bench.name} [ EXCEPTION TRACEBACK ]:\n {traceback_str}")
                self.output.update_live()
                main_error = True
                executor.set_bench_data(bench, e, self.version)
                self._revert_failed_bench(bench)

        if main_error:
            raise MigrationExceptionInBench("")

    def _revert_failed_bench(self, bench: MigrationBench):
        """Restore a failed bench's backups, undo its migration and bring it down."""
        # restore all backup files
        for backup in self.backup_manager.backups:
            if backup.bench == bench.name:
                self.backup_manager.restore(backup, force=True)

        self.undo_bench_migrate(bench)
        self.logger.info(f"Undo successfull for bench: {bench.name}")

        # Best effort: a failure here must not abort the other benches, but
        # it is logged instead of being silently discarded.
        # NOTE(review): the return value is discarded; if stream=True yields a
        # lazy output stream it may need to be consumed for the command to
        # actually run - confirm against the compose wrapper.
        try:
            bench.docker.compose.down(
                remove_orphans=True,
                volumes=False,
                timeout=DOCKER_COMPOSE_DOWN_TIMEOUT_SECONDS,
                stream=True,
            )
        except Exception as err:
            self.logger.warning(f"{bench.name}: docker compose down failed during rollback: {err}")

    def bench_basic_backup(self, bench: MigrationBench):
        """Back up a bench's configuration files and database before migrating it.

        Nothing is backed up when the executor sets ``skip_backup`` or lists
        the bench in ``skip_backup_for``.

        Args:
            bench: The bench about to be migrated.
        """
        self.output.print(f"Migrating bench [bold][blue]{bench.name}[/blue][/bold]")

        executor = self.migration_executor
        if executor.skip_backup or bench.name in executor.skip_backup_for:
            self.output.warning(f"Skipping backup for {bench.name}")
            return

        # bench_config.toml is the only file backed up conditionally.
        bench_config_path = bench.path / "bench_config.toml"
        if bench_config_path.exists():
            self.backup_manager.backup(bench_config_path, bench_name=bench.name)

        self.backup_manager.backup(bench.path / "docker-compose.yml", bench_name=bench.name)

        sites_dir = bench.path / "workspace" / "frappe-bench" / "sites"
        self.backup_manager.backup(sites_dir / "common_site_config.json", bench_name=bench.name)
        self.backup_manager.backup(sites_dir / bench.name / "site_config.json", bench_name=bench.name)

        bench_db_info = DatabaseServerServiceInfo.import_from_bench(
            bench_path=bench.path,
            bench_name=bench.name,
            raise_exception=False,
        )

        self.bench_db_backup(
            bench=bench,
            db_info=bench_db_info,
            bench_docker=bench.docker,
            bench_compose_file=bench.compose_file_manager,
            backup_manager=self.backup_manager,
        )

    def migrate_bench(self, bench: MigrationBench):
        """Hook for migrating one bench; does nothing in the base class."""
        pass

    def undo_bench_migrate(self, bench: MigrationBench):
        """Hook for undoing one bench's migration; does nothing in the base class."""
        pass

    def _resolve_database_name(self, bench: MigrationBench, db_info: DatabaseServerServiceInfo) -> str | None:
        """Work out the bench's database name.

        ``db_info.name`` wins when set; otherwise the ``db_name`` key of the
        bench's ``bench_config.toml`` is used. Failures while reading that
        file are reported as a warning, not raised.

        Returns:
            The database name, or ``None`` when it cannot be determined.
        """
        if db_info.name:
            return db_info.name

        bench_config_path = bench.path / "bench_config.toml"
        if bench_config_path.exists():
            try:
                import tomlkit

                # TOML documents are UTF-8; do not depend on the platform's
                # default text encoding.
                bench_config = tomlkit.parse(bench_config_path.read_text(encoding="utf-8"))
                db_name = bench_config.get("db_name")
                if db_name:
                    return db_name
            except Exception as e:
                self.output.warning(f"Failed to read db_name from bench_config.toml: {e}")

        return None

    def bench_db_backup(
        self,
        bench: MigrationBench,
        db_info: DatabaseServerServiceInfo,
        bench_docker,
        bench_compose_file,
        backup_manager: BackupManager,
    ):
        """Export the bench database and store it gzip-compressed in the backup dir.

        The database is exported to ``/workspace/.cache`` through the
        ``frappe`` compose service, then compressed from the host-side
        ``workspace/.cache`` directory into the backup directory of this
        migration's version. When the database name cannot be determined the
        user is asked whether to continue without a database backup.

        Args:
            bench: Bench whose database is backed up.
            db_info: Database connection details for the bench.
            bench_docker: Docker handle passed on to ``MariaDBManager``.
            bench_compose_file: Compose file manager passed on to ``MariaDBManager``.
            backup_manager: Supplies ``bench_backup_dir`` for the archive location.

        Raises:
            MigrationExceptionInBench: If the database name is unknown and the
                user declines to continue without a database backup.
        """
        self.output.change_head(f"Taking {bench.name} db backup")

        db_name = self._resolve_database_name(bench, db_info)

        if not db_name:
            self.output.warning(
                f"Could not determine database name for {bench.name}.\n"
                f"Checked: site_config.json, db_info, bench_config.toml.",
            )

            skip_backup_prompt = [
                "Database backup will be skipped for this bench.",
                "Do you want to continue migration without database backup?",
            ]

            user_choice = self.output.prompt_ask(
                prompt="\n".join(skip_backup_prompt),
                choices=["yes", "no"],
                required_flag="--skip-all-backup or --skip-backup-for <bench>",
            )

            if user_choice == "no":
                self.output.display_error(f"User chose to abort migration for {bench.name}")
                raise MigrationExceptionInBench(
                    f"Migration aborted for {bench.name}: Unable to determine database name. "
                    f"User declined to skip database backup.",
                )

            self.output.warning(f"Skipping database backup for {bench.name}")
            return

        mariadb_manager = MariaDBManager(db_info, bench_compose_file, bench_docker, run_on_compose_service="frappe")

        formatted_date = datetime.now().strftime("%d-%m-%Y--%H-%M-%S")
        db_sql_file_name = f"db-{bench.name}-{formatted_date}.sql"

        # The same dump is addressed by two paths: one as seen on the host and
        # one as seen inside the container performing the export.
        host_db_sql_file_path: Path = bench.path / "workspace" / ".cache" / db_sql_file_name
        container_db_sql_file_path: Path = Path("/workspace") / ".cache" / db_sql_file_name

        backup_gz_file_backup_data_path: Path = (
            bench.path / backup_manager.bench_backup_dir / self.version.version / f"{db_sql_file_name}.gz"
        )

        mariadb_manager.db_export(db_name, container_db_sql_file_path)

        # Make sure the archive's directory exists before writing into it.
        backup_gz_file_backup_data_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            # Compress the file using gzip
            with open(host_db_sql_file_path, "rb") as f_in, gzip.open(backup_gz_file_backup_data_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        finally:
            # The uncompressed dump is only a temporary artefact; remove it
            # even when compression fails so it is not left behind.
            host_db_sql_file_path.unlink(missing_ok=True)

        self.output.print(f"[blue]{bench.name}[/blue] db backup completed successfully.")