Coverage for frappe_manager / migration_manager / backup_manager.py: 81%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1import shutil 

2import time 

3from dataclasses import dataclass 

4from datetime import datetime 

5from pathlib import Path 

6from typing import ClassVar 

7 

8from frappe_manager import CLI_BENCHES_DIRECTORY, CLI_DIR 

9from frappe_manager.logger import log 

10from frappe_manager.migration_manager.migration_constants import TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS 

11 

12 

13@dataclass 

14class BackupData: 

15 src: Path 

16 dest: Path 

17 bench: str | None = None 

18 bench_path: Path | None = None 

19 prefix_timestamp: bool = False 

20 allow_restore: bool = True 

21 _is_restored: bool = False 

22 

23 _used_timestamps: ClassVar[set[str]] = set() 

24 

25 @property 

26 def is_restored(self) -> bool: 

27 return self._is_restored 

28 

29 @is_restored.setter 

30 def is_restored(self, v: bool) -> None: 

31 self._is_restored = v 

32 

33 def __post_init__(self): 

34 file_name = self.dest.name 

35 

36 if self.prefix_timestamp: 

37 file_name = self._generate_unique_timestamp_filename() 

38 

39 self.real_dest = self.dest.parent 

40 self.real_dest: Path = self.real_dest / file_name 

41 

42 def _generate_unique_timestamp_filename(self) -> str: 

43 base_name = self.dest.name 

44 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S") 

45 

46 while timestamp in BackupData._used_timestamps: 

47 time.sleep(TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS) 

48 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S-%f")[:23] 

49 

50 BackupData._used_timestamps.add(timestamp) 

51 return f"{base_name}-{timestamp}" 

52 

53 def exists(self): 

54 return self.dest.exists() 

55 

56 

57CLI_MIGARATIONS_DIR = CLI_DIR / "backups" 

58 

59 

60class BackupManager: 

61 _active_sessions: ClassVar[set[str]] = set() 

62 

63 def __init__( 

64 self, 

65 name: str, 

66 backup_group_name: str = "migrations", 

67 benches_dir: Path = CLI_BENCHES_DIRECTORY, 

68 backup_dir: Path = CLI_MIGARATIONS_DIR, 

69 ): 

70 self.name = name 

71 self.backup_group_name = backup_group_name 

72 self.migration_timestamp = self._generate_unique_session_timestamp() 

73 self.root_backup_dir: Path = backup_dir / backup_group_name / self.migration_timestamp 

74 self.benches_dir: Path = benches_dir 

75 self.backup_dir: Path = self.root_backup_dir / self.name 

76 self.bench_backup_dir: Path = Path("backups") / backup_group_name / self.migration_timestamp 

77 self.backups = [] 

78 self.new_files = [] # Track newly created files for cleanup on rollback 

79 self.logger = log.get_logger() 

80 

81 self.backup_dir.mkdir(parents=True, exist_ok=True) 

82 

83 def _generate_unique_session_timestamp(self) -> str: 

84 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S") 

85 

86 while timestamp in BackupManager._active_sessions: 

87 time.sleep(TIMESTAMP_COLLISION_RETRY_DELAY_SECONDS) 

88 timestamp = datetime.now().strftime("%d-%b-%y--%H-%M-%S-%f")[:23] 

89 

90 BackupManager._active_sessions.add(timestamp) 

91 return timestamp 

92 

93 def backup( 

94 self, 

95 src: Path, 

96 dest: Path | None = None, 

97 bench_name: str | None = None, 

98 allow_restore: bool = True, 

99 ): 

100 if not src.exists(): 

101 return None 

102 

103 backup_dest = dest 

104 if not backup_dest: 

105 backup_dest = self.backup_dir / src.name 

106 

107 if bench_name: 

108 backup_dest = self.benches_dir / bench_name / self.bench_backup_dir / self.name / src.name 

109 

110 if self.name == self.backup_group_name: 

111 backup_dest = self.benches_dir / bench_name / self.bench_backup_dir / src.name 

112 

113 backup_data = BackupData(src, backup_dest, bench=bench_name, allow_restore=allow_restore) 

114 

115 if not backup_data.real_dest.parent.exists(): 

116 backup_data.real_dest.parent.mkdir(parents=True, exist_ok=True) 

117 

118 self.logger.debug(f"Backup: {backup_data.src} => {backup_data.real_dest} ") 

119 

120 if src.is_dir(): 

121 shutil.copytree(backup_data.src, backup_data.real_dest) 

122 else: 

123 shutil.copy2(backup_data.src, backup_data.real_dest) 

124 

125 self.backups.append(backup_data) 

126 

127 return backup_data 

128 

129 def restore(self, backup_data, force=False): 

130 """ 

131 Restore a file from a backup. 

132 """ 

133 if not backup_data.allow_restore: 

134 return None 

135 

136 if not backup_data.real_dest.exists(): 

137 # print(f"No backup found at {backup_data.real_dest}") 

138 return None 

139 

140 if force: 

141 self.logger.debug(f"Restore: {backup_data.real_dest} => {backup_data.src} ") 

142 if backup_data.src.exists(): 

143 if backup_data.src.is_dir(): 

144 shutil.rmtree(backup_data.src) 

145 else: 

146 backup_data.src.unlink() 

147 

148 dest = shutil.copy(backup_data.real_dest, backup_data.src) 

149 

150 backup_data.is_restored = True 

151 

152 return dest 

153 # print(f"Restored {backup_data.src} from backup") 

154 

155 def delete(self, backup_data): 

156 """ 

157 Delete a specific backup. 

158 """ 

159 if not backup_data.real_dest.exists(): 

160 # print(f"No backup found at {backup_data.real_dest}") 

161 return 

162 

163 shutil.rmtree(backup_data.real_dest) 

164 

165 self.backups.remove(backup_data) 

166 # print(f"Deleted backup at {backup_data.real_dest}") 

167 

168 def delete_all(self): 

169 """ 

170 Delete all backups. 

171 """ 

172 for backup_data in self.backups: 

173 if backup_data.real_dest.exists(): 

174 shutil.rmtree(backup_data.real_dest) 

175 # print(f"Deleted backup at {backup_data.real_dest}") 

176 

177 self.backups.clear() 

178 # print("Deleted all backups") 

179 

180 def track_new_file(self, filepath: Path): 

181 """ 

182 Track a newly created file so it can be cleaned up during rollback. 

183 

184 Args: 

185 filepath: Path to the newly created file. 

186 """ 

187 if filepath.exists(): 

188 self.new_files.append(filepath) 

189 self.logger.debug(f"Tracked new file for rollback cleanup: {filepath}") 

190 

191 def cleanup_new_files(self): 

192 """ 

193 Delete all tracked newly created files (used during rollback). 

194 """ 

195 for filepath in self.new_files: 

196 if filepath.exists(): 

197 if filepath.is_dir(): 

198 shutil.rmtree(filepath) 

199 else: 

200 filepath.unlink() 

201 self.logger.debug(f"Cleaned up new file: {filepath}") 

202 

203 self.new_files.clear()