Coverage for frappe_manager / migration_manager / backup_manager.py: 81%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1import shutil
2import time
3from dataclasses import dataclass
4from datetime import datetime
5from pathlib import Path
6from typing import ClassVar
8from frappe_manager import CLI_BENCHES_DIRECTORY, CLI_DIR
9from frappe_manager.logger import log
10from frappe_manager.migration_manager.migration_constants import TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS
13@dataclass
14class BackupData:
15 src: Path
16 dest: Path
17 bench: str | None = None
18 bench_path: Path | None = None
19 prefix_timestamp: bool = False
20 allow_restore: bool = True
21 _is_restored: bool = False
23 _used_timestamps: ClassVar[set[str]] = set()
25 @property
26 def is_restored(self) -> bool:
27 return self._is_restored
29 @is_restored.setter
30 def is_restored(self, v: bool) -> None:
31 self._is_restored = v
33 def __post_init__(self):
34 file_name = self.dest.name
36 if self.prefix_timestamp:
37 file_name = self._generate_unique_timestamp_filename()
39 self.real_dest = self.dest.parent
40 self.real_dest: Path = self.real_dest / file_name
42 def _generate_unique_timestamp_filename(self) -> str:
43 base_name = self.dest.name
44 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S")
46 while timestamp in BackupData._used_timestamps:
47 time.sleep(TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS)
48 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S-%f")[:23]
50 BackupData._used_timestamps.add(timestamp)
51 return f"{base_name}-{timestamp}"
53 def exists(self):
54 return self.dest.exists()
57CLI_MIGARATIONS_DIR = CLI_DIR / "backups"
60class BackupManager:
61 _active_sessions: ClassVar[set[str]] = set()
63 def __init__(
64 self,
65 name: str,
66 backup_group_name: str = "migrations",
67 benches_dir: Path = CLI_BENCHES_DIRECTORY,
68 backup_dir: Path = CLI_MIGARATIONS_DIR,
69 ):
70 self.name = name
71 self.backup_group_name = backup_group_name
72 self.migration_timestamp = self._generate_unique_session_timestamp()
73 self.root_backup_dir: Path = backup_dir / backup_group_name / self.migration_timestamp
74 self.benches_dir: Path = benches_dir
75 self.backup_dir: Path = self.root_backup_dir / self.name
76 self.bench_backup_dir: Path = Path("backups") / backup_group_name / self.migration_timestamp
77 self.backups = []
78 self.new_files = [] # Track newly created files for cleanup on rollback
79 self.logger = log.get_logger()
81 self.backup_dir.mkdir(parents=True, exist_ok=True)
83 def _generate_unique_session_timestamp(self) -> str:
84 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S")
86 while timestamp in BackupManager._active_sessions:
87 time.sleep(TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS)
88 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S-%f")[:23]
90 BackupManager._active_sessions.add(timestamp)
91 return timestamp
93 def backup(
94 self,
95 src: Path,
96 dest: Path | None = None,
97 bench_name: str | None = None,
98 allow_restore: bool = True,
99 ):
100 if not src.exists():
101 return None
103 backup_dest = dest
104 if not backup_dest:
105 backup_dest = self.backup_dir / src.name
107 if bench_name:
108 backup_dest = self.benches_dir / bench_name / self.bench_backup_dir / self.name / src.name
110 if self.name == self.backup_group_name:
111 backup_dest = self.benches_dir / bench_name / self.bench_backup_dir / src.name
113 backup_data = BackupData(src, backup_dest, bench=bench_name, allow_restore=allow_restore)
115 if not backup_data.real_dest.parent.exists():
116 backup_data.real_dest.parent.mkdir(parents=True, exist_ok=True)
118 self.logger.debug(f"Backup: {backup_data.src} => {backup_data.real_dest} ")
120 if src.is_dir():
121 shutil.copytree(backup_data.src, backup_data.real_dest)
122 else:
123 shutil.copy2(backup_data.src, backup_data.real_dest)
125 self.backups.append(backup_data)
127 return backup_data
129 def restore(self, backup_data, force=False):
130 """
131 Restore a file from a backup.
132 """
133 if not backup_data.allow_restore:
134 return None
136 if not backup_data.real_dest.exists():
137 # print(f"No backup found at {backup_data.real_dest}")
138 return None
140 if force:
141 self.logger.debug(f"Restore: {backup_data.real_dest} => {backup_data.src} ")
142 if backup_data.src.exists():
143 if backup_data.src.is_dir():
144 shutil.rmtree(backup_data.src)
145 else:
146 backup_data.src.unlink()
148 dest = shutil.copy(backup_data.real_dest, backup_data.src)
150 backup_data.is_restored = True
152 return dest
153 # print(f"Restored {backup_data.src} from backup")
155 def delete(self, backup_data):
156 """
157 Delete a specific backup.
158 """
159 if not backup_data.real_dest.exists():
160 # print(f"No backup found at {backup_data.real_dest}")
161 return
163 shutil.rmtree(backup_data.real_dest)
165 self.backups.remove(backup_data)
166 # print(f"Deleted backup at {backup_data.real_dest}")
168 def delete_all(self):
169 """
170 Delete all backups.
171 """
172 for backup_data in self.backups:
173 if backup_data.real_dest.exists():
174 shutil.rmtree(backup_data.real_dest)
175 # print(f"Deleted backup at {backup_data.real_dest}")
177 self.backups.clear()
178 # print("Deleted all backups")
180 def track_new_file(self, filepath: Path):
181 """
182 Track a newly created file so it can be cleaned up during rollback.
184 Args:
185 filepath: Path to the newly created file.
186 """
187 if filepath.exists():
188 self.new_files.append(filepath)
189 self.logger.debug(f"Tracked new file for rollback cleanup: {filepath}")
191 def cleanup_new_files(self):
192 """
193 Delete all tracked newly created files (used during rollback).
194 """
195 for filepath in self.new_files:
196 if filepath.exists():
197 if filepath.is_dir():
198 shutil.rmtree(filepath)
199 else:
200 filepath.unlink()
201 self.logger.debug(f"Cleaned up new file: {filepath}")
203 self.new_files.clear()