Coverage for frappe_manager / migration_manager / migration_base.py: 31%
192 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1from abc import ABC
2from logging import Logger
3from pathlib import Path
5from frappe_manager import CLI_DIR
6from frappe_manager.logger import log
7from frappe_manager.migration_manager.backup_manager import BackupManager
8from frappe_manager.migration_manager.bench_migration_state import get_bench_migration_version
9from frappe_manager.migration_manager.migration_constants import DOCKER_COMPOSE_DOWN_TIMEOUT_SECONDS
10from frappe_manager.migration_manager.migration_exections import (
11 MigrationExceptionInBench,
12)
13from frappe_manager.migration_manager.migration_helpers import (
14 MigrationBench,
15 MigrationBenches,
16 MigrationServicesManager,
17)
18from frappe_manager.migration_manager.version import Version
19from frappe_manager.output_manager import OutputHandler
20from frappe_manager.output_manager.rich_output import RichOutputHandler
21from frappe_manager.services_manager.database_service_manager import DatabaseServerServiceInfo, MariaDBManager
22from frappe_manager.utils.helpers import capture_and_format_exception
class MigrationBase(ABC):
    """Base class for one versioned migration step.

    Subclasses set ``version`` and override the hook methods
    (``migrate_services``, ``undo_services_migrate``, ``migrate_bench``,
    ``undo_bench_migrate``), all of which are no-ops here. ``up()`` applies
    the migration and ``down()`` rolls it back using the backups recorded by
    the ``BackupManager`` created in ``init()``.
    """

    # Version this migration targets; subclasses are expected to override it.
    version: Version = Version("0.0.0")
    # Root directory handed to BackupManager and MigrationBenches in init().
    benches_dir: Path = CLI_DIR / "sites"
    # When True, up() returns immediately without doing any work.
    skip: bool = False
    # Executor driving this migration; injected via set_migration_executor().
    migration_executor = None
    logger: Logger = log.get_logger()

    def __init__(self, output_handler: OutputHandler | None = None):
        """Store the output handler and detect the running environment.

        Args:
            output_handler: Handler used for user-facing output. A
                ``RichOutputHandler`` is created when none is given.
        """
        self.output = output_handler or RichOutputHandler()

        # Function-scope import, kept as in the original code.
        from frappe_manager.utils.helpers import get_current_fm_version, get_docker_image_tag

        self._current_fm_version = get_current_fm_version()
        self.is_dev_environment = self._detect_dev_environment()
        self.effective_image_tag = get_docker_image_tag()

    def _detect_dev_environment(self) -> bool:
        """Return True when the current fm version is a dev or pre-release."""
        from packaging.version import Version as PV

        parsed = PV(self._current_fm_version)
        return parsed.is_devrelease or parsed.is_prerelease

    def _get_image_tag_for_migration(self) -> str:
        """Return the docker image tag to use for this migration.

        Dev environments use ``effective_image_tag``; otherwise the tag is
        this migration's own ``version.version_string()``.
        """
        if self.is_dev_environment:
            tag = self.effective_image_tag
            self.logger.info(f"Dev environment detected: using image tag {tag}")
        else:
            tag = self.version.version_string()
            self.logger.info(f"Stable environment: using image tag {tag}")
        return tag

    def init(self):
        """Create the backup, benches and services managers used by up()/down()."""
        self.backup_manager = BackupManager(name=str(self.version), benches_dir=self.benches_dir)
        self.benches_manager = MigrationBenches(self.benches_dir)
        self.services_manager: MigrationServicesManager = MigrationServicesManager(services_path=CLI_DIR / "services")

    def set_migration_executor(self, migration_executor):
        """Attach the executor and adopt its output handler when it has one."""
        self.migration_executor = migration_executor
        if hasattr(migration_executor, "output"):
            self.output = migration_executor.output

    def get_rollback_version(self):
        """Return the version reported for a rollback of this migration."""
        return self.version

    def up(self):
        """Apply this migration.

        Services are backed up and migrated only when an executor is attached
        and it reports ``fm_infrastructure_needs_migration``; benches are
        migrated afterwards.

        Returns:
            ``True`` when the migration is skipped; otherwise ``None``.

        Raises:
            MigrationExceptionInBench: Propagated from ``migrate_benches()``
                or ``services_basic_backup()``.
        """
        if self.skip:
            return True

        self.output.print(f"[bold][blue]Migration for v{self.version!s}[/blue][/bold]", emoji_code=":package:")
        self.logger.info(f"v{self.version!s}: Started")
        self.logger.info("-" * 40)

        self.init()

        if self.migration_executor and self.migration_executor.fm_infrastructure_needs_migration:
            self.services_basic_backup()
            self.migrate_services()

        self.migrate_benches()

        self.logger.info("-" * 40)

    def down(self):
        """Roll this migration back.

        Undoes every bench recorded on the executor without an exception,
        restores all recorded backups, removes files created by the
        migration, then undoes the services migration.
        """
        self.output.change_head(f"Working on v{self.version!s} rollback")
        self.logger.info("-" * 40)

        # Undo each bench that was recorded without an exception.
        for bench_data in self.migration_executor.migrate_benches.values():
            if not bench_data["exception"]:
                self.undo_bench_migrate(bench_data["object"])

        for backup in self.backup_manager.backups:
            self.backup_manager.restore(backup, force=True)

        # Clean up newly created files that didn't exist before migration.
        self.backup_manager.cleanup_new_files()

        self.undo_services_migrate()

        self.output.print(f"[bold]v{self.version!s}[/bold] rollback successful")
        self.logger.info("-" * 40)

    def services_basic_backup(self):
        """Back up the services compose file.

        Raises:
            MigrationExceptionInBench: If the services compose file is missing.
        """
        compose_file_manager = self.services_manager.compose_file_manager
        if not compose_file_manager.exists():
            # Report the file path rather than the manager object itself.
            raise MigrationExceptionInBench(
                f"Services compose at {compose_file_manager.compose_path} not found.",
            )
        self.backup_manager.backup(compose_file_manager.compose_path)

    def migrate_services(self):
        """Hook: migrate global services. No-op in the base class."""
        pass

    def undo_services_migrate(self):
        """Hook: undo ``migrate_services()``. No-op in the base class."""
        pass

    def migrate_benches(self):
        """Back up and migrate every targeted bench.

        A bench is processed only when it is in the executor's
        ``target_benches``, not in ``exclude_benches``, below this migration's
        version (unless ``rerun`` is set) and not already recorded as failed.
        When a bench fails, its backups are restored and
        ``undo_bench_migrate()`` is called, and the loop moves on to the next
        bench.

        Raises:
            MigrationExceptionInBench: After the loop, if any bench failed or
                was skipped because of an earlier failure.
        """
        executor = self.migration_executor
        all_benches = self.benches_manager.get_all_benches()

        # target_benches of None means an infrastructure-only migration:
        # no bench is touched.
        if executor.target_benches is None:
            return

        main_error = False

        for bench_name, bench_path in all_benches.items():
            if bench_name not in executor.target_benches:
                continue

            if bench_name in executor.exclude_benches:
                self.output.print(f"Skipping {bench_name} (--exclude-bench)", emoji_code="")
                continue

            bench = MigrationBench(name=bench_name, path=bench_path.parent, output=self.output)

            bench_version = get_bench_migration_version(bench.path)

            # Skip benches already at or above this migration version.
            # --rerun overrides this so the migration runs regardless.
            if not executor.rerun and bench_version >= self.version:
                self.output.print(
                    f"Bench {bench_name} already at v{bench_version}, skipping migration to v{self.version}",
                )
                continue

            # Re-read migrate_benches each iteration: set_bench_data() below
            # may update it.
            if bench.name in executor.migrate_benches:
                bench_info = executor.migrate_benches[bench.name]
                if bench_info["exception"]:
                    self.output.print(f"Skipping migration for failed bench [blue]{bench.name}[/blue]")
                    main_error = True
                    continue

            executor.set_bench_data(bench, migration_version=self.version)
            try:
                self.bench_basic_backup(bench)
                self.migrate_bench(bench)
            except Exception as e:
                traceback_str = capture_and_format_exception()
                self.logger.error(f"{bench.name} [ EXCEPTION TRACEBACK ]:\n {traceback_str}")
                self.output.update_live()
                main_error = True
                executor.set_bench_data(bench, e, self.version)

                # Restore every backup that belongs to this bench.
                for backup in self.backup_manager.backups:
                    if backup.bench == bench.name:
                        self.backup_manager.restore(backup, force=True)

                self.undo_bench_migrate(bench)
                self.logger.info(f"Undo successfull for bench: {bench.name}")

            # Best-effort shutdown of the bench containers, run whether the
            # migration succeeded or failed. A failure must not abort the
            # remaining benches, but it is logged instead of being dropped.
            # NOTE(review): the value returned with stream=True is discarded;
            # if down() returns a lazy iterator the command may never run
            # unless it is consumed - confirm against the compose wrapper.
            try:
                bench.docker.compose.down(
                    remove_orphans=True,
                    volumes=False,
                    timeout=DOCKER_COMPOSE_DOWN_TIMEOUT_SECONDS,
                    stream=True,
                )
            except Exception as down_error:
                self.logger.warning(f"{bench.name}: docker compose down failed: {down_error}")

        if main_error:
            # Empty message: per-bench details were already logged or printed above.
            raise MigrationExceptionInBench("")

    def bench_basic_backup(self, bench: MigrationBench):
        """Back up a bench's config files and database before migrating it.

        Nothing is backed up when the executor has ``skip_backup`` set or the
        bench is listed in ``skip_backup_for``.

        Args:
            bench: The bench about to be migrated.
        """
        self.output.print(f"Migrating bench [bold][blue]{bench.name}[/blue][/bold]")

        executor = self.migration_executor
        if executor.skip_backup or bench.name in executor.skip_backup_for:
            self.output.warning(f"Skipping backup for {bench.name}")
            return

        # bench_config.toml is backed up only when it exists.
        bench_config_path = bench.path / "bench_config.toml"
        if bench_config_path.exists():
            self.backup_manager.backup(bench_config_path, bench_name=bench.name)

        self.backup_manager.backup(bench.path / "docker-compose.yml", bench_name=bench.name)

        sites_dir = bench.path / "workspace" / "frappe-bench" / "sites"
        self.backup_manager.backup(sites_dir / "common_site_config.json", bench_name=bench.name)
        self.backup_manager.backup(sites_dir / bench.name / "site_config.json", bench_name=bench.name)

        bench_db_info = DatabaseServerServiceInfo.import_from_bench(
            bench_path=bench.path,
            bench_name=bench.name,
            raise_exception=False,
        )

        self.bench_db_backup(
            bench=bench,
            db_info=bench_db_info,
            bench_docker=bench.docker,
            bench_compose_file=bench.compose_file_manager,
            backup_manager=self.backup_manager,
        )

    def migrate_bench(self, bench: MigrationBench):
        """Hook: migrate a single bench. No-op in the base class."""
        pass

    def undo_bench_migrate(self, bench: MigrationBench):
        """Hook: undo ``migrate_bench()`` for a bench. No-op in the base class."""
        pass

    def _resolve_database_name(self, bench: MigrationBench, db_info: DatabaseServerServiceInfo) -> str | None:
        """Return the bench's database name, or None when it cannot be found.

        ``db_info.name`` is preferred; otherwise the ``db_name`` key of the
        bench's ``bench_config.toml`` is used. A failure while reading the
        TOML file is reported as a warning and treated as "not found".
        """
        if db_info.name:
            return db_info.name

        bench_config_path = bench.path / "bench_config.toml"
        if bench_config_path.exists():
            try:
                import tomlkit

                # TOML documents are UTF-8 by specification, so do not rely
                # on the platform's default encoding.
                bench_config = tomlkit.parse(bench_config_path.read_text(encoding="utf-8"))
                db_name = bench_config.get("db_name")
                if db_name:
                    return db_name
            except Exception as e:
                self.output.warning(f"Failed to read db_name from bench_config.toml: {e}")

        return None

    def bench_db_backup(
        self,
        bench: MigrationBench,
        db_info: DatabaseServerServiceInfo,
        bench_docker,
        bench_compose_file,
        backup_manager: BackupManager,
    ):
        """Export the bench database and store it gzip-compressed.

        The dump is written through ``MariaDBManager.db_export`` to
        ``/workspace/.cache`` (container path), read back from
        ``<bench>/workspace/.cache`` on the host, compressed into the bench's
        backup directory for this version, and the uncompressed dump is
        removed.

        Args:
            bench: Bench whose database is backed up.
            db_info: Database connection info for the bench.
            bench_docker: Docker client passed through to ``MariaDBManager``.
            bench_compose_file: Compose file manager passed through to
                ``MariaDBManager``.
            backup_manager: Supplies ``bench_backup_dir`` for the destination.

        Raises:
            MigrationExceptionInBench: If the database name cannot be
                determined and the user declines to continue without a backup.
        """
        import gzip
        import shutil
        from datetime import datetime

        self.output.change_head(f"Taking {bench.name} db backup")

        db_name = self._resolve_database_name(bench, db_info)

        if not db_name:
            self.output.warning(
                f"Could not determine database name for {bench.name}.\n"
                f"Checked: site_config.json, db_info, bench_config.toml.",
            )

            skip_backup_prompt = [
                "Database backup will be skipped for this bench.",
                "Do you want to continue migration without database backup?",
            ]

            user_choice = self.output.prompt_ask(
                prompt="\n".join(skip_backup_prompt),
                choices=["yes", "no"],
                required_flag="--skip-all-backup or --skip-backup-for <bench>",
            )

            if user_choice == "no":
                self.output.display_error(f"User chose to abort migration for {bench.name}")
                raise MigrationExceptionInBench(
                    f"Migration aborted for {bench.name}: Unable to determine database name. "
                    f"User declined to skip database backup.",
                )

            self.output.warning(f"Skipping database backup for {bench.name}")
            return

        mariadb_manager = MariaDBManager(db_info, bench_compose_file, bench_docker, run_on_compose_service="frappe")

        formatted_date = datetime.now().strftime("%d-%m-%Y--%H-%M-%S")
        db_sql_file_name = f"db-{bench.name}-{formatted_date}.sql"

        # Same file seen from the host and from inside the container.
        host_db_sql_file_path: Path = bench.path / "workspace" / ".cache" / db_sql_file_name
        container_db_sql_file_path: Path = Path("/workspace") / ".cache" / db_sql_file_name

        backup_gz_file_backup_data_path: Path = (
            bench.path / backup_manager.bench_backup_dir / self.version.version / f"{db_sql_file_name}.gz"
        )

        try:
            mariadb_manager.db_export(db_name, container_db_sql_file_path)

            # Make sure the destination directory exists before writing into it.
            backup_gz_file_backup_data_path.parent.mkdir(parents=True, exist_ok=True)

            # Compress the dump using gzip.
            with open(host_db_sql_file_path, "rb") as f_in, gzip.open(backup_gz_file_backup_data_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        finally:
            # Always remove the uncompressed dump, even when export or
            # compression fails, so no raw SQL file is left in the cache.
            host_db_sql_file_path.unlink(missing_ok=True)

        self.output.print(f"[blue]{bench.name}[/blue] db backup completed successfully.")