Coverage for src/dataknobs_data/migration_v2/progress.py: 80%

98 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-15 12:29 -0500

1""" 

2Migration progress tracking, separate from migration logic. 

3""" 

4 

5import time 

6from dataclasses import dataclass, field 

7from typing import Any, Dict, List, Optional 

8 

9 

@dataclass
class MigrationProgress:
    """
    Track counters, errors, warnings and timing for a migration run.

    Keeps progress bookkeeping separate from the migration logic itself so
    that reporting can evolve without touching the migration code.  All
    mutating methods return ``self`` so calls can be chained.

    Attributes:
        total: Number of records expected to be processed.
        processed: Number of records handled so far (success + failure + skip).
        succeeded: Number of records recorded as successful.
        failed: Number of records recorded as failed.
        skipped: Number of records recorded as skipped.
        errors: One dict per recorded failure (see ``record_failure``).
        warnings: Free-form warning messages, including skip notices.
        start_time: ``time.time()`` value captured by ``start``, or None.
        end_time: ``time.time()`` value captured by ``finish``, or None.
        metadata: Arbitrary key/value data attached to the run.
    """

    total: int = 0
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def start(self) -> 'MigrationProgress':
        """
        Mark the migration as started by recording the current time.

        Returns:
            Self for chaining
        """
        self.start_time = time.time()
        return self

    def finish(self) -> 'MigrationProgress':
        """
        Mark the migration as finished by recording the current time.

        Returns:
            Self for chaining
        """
        self.end_time = time.time()
        return self

    @property
    def duration(self) -> float:
        """
        Get migration duration in seconds.

        While the migration is still running (no ``end_time`` recorded) the
        elapsed time up to "now" is returned.

        Returns:
            Duration in seconds, or 0.0 if not started
        """
        if self.start_time is None:
            return 0.0

        # Compare against None rather than truthiness so that an explicit
        # end_time of 0.0 is honoured instead of being treated as "unset".
        end = self.end_time if self.end_time is not None else time.time()
        return end - self.start_time

    @property
    def percent(self) -> float:
        """
        Get completion percentage.

        Returns:
            ``processed / total * 100``, or 0.0 when ``total`` is 0.  The
            value is not clamped, so it exceeds 100 if ``processed`` is
            larger than ``total``.
        """
        if self.total == 0:
            return 0.0
        return (self.processed / self.total) * 100

    @property
    def success_rate(self) -> float:
        """
        Get success rate as a percentage of processed records.

        Returns:
            ``succeeded / processed * 100``, or 0.0 when nothing has been
            processed yet
        """
        if self.processed == 0:
            return 0.0
        return (self.succeeded / self.processed) * 100

    @property
    def is_complete(self) -> bool:
        """
        Check if migration is complete.

        Returns:
            True if ``total`` is positive and at least ``total`` records
            have been processed
        """
        return self.total > 0 and self.processed >= self.total

    @property
    def has_errors(self) -> bool:
        """
        Check if migration had any errors.

        Returns:
            True if the failure counter is positive or any error entry exists
        """
        return self.failed > 0 or bool(self.errors)

    def record_success(self, record_id: Optional[str] = None) -> 'MigrationProgress':
        """
        Record a successful migration of one record.

        Args:
            record_id: Optional ID of the migrated record (accepted for
                symmetry with the other ``record_*`` methods; not stored)

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.succeeded += 1
        return self

    def record_failure(
        self,
        error: str,
        record_id: Optional[str] = None,
        exception: Optional[Exception] = None
    ) -> 'MigrationProgress':
        """
        Record a failed migration of one record.

        Appends a dict to ``errors`` with the keys ``error``, ``record_id``
        and ``timestamp``; when an exception is supplied, ``exception`` (its
        string form) and ``exception_type`` (its class name) are added too.

        Args:
            error: Error message
            record_id: Optional ID of failed record
            exception: Optional exception that caused failure

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.failed += 1

        error_info: Dict[str, Any] = {
            "error": error,
            "record_id": record_id,
            "timestamp": time.time(),
        }

        # Identity check: an exception object may define __bool__/__len__ and
        # evaluate as falsy, which must not cause it to be dropped.
        if exception is not None:
            error_info["exception"] = str(exception)
            error_info["exception_type"] = type(exception).__name__

        self.errors.append(error_info)
        return self

    def record_skip(self, reason: str, record_id: Optional[str] = None) -> 'MigrationProgress':
        """
        Record a skipped record and log the reason as a warning.

        Args:
            reason: Reason for skipping
            record_id: Optional ID of skipped record

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.skipped += 1
        self.warnings.append(f"Skipped record {record_id}: {reason}")
        return self

    def add_warning(self, warning: str) -> 'MigrationProgress':
        """
        Add a warning message.

        Args:
            warning: Warning message

        Returns:
            Self for chaining
        """
        self.warnings.append(warning)
        return self

    def set_metadata(self, key: str, value: Any) -> 'MigrationProgress':
        """
        Store metadata about the migration, replacing any existing value.

        Args:
            key: Metadata key
            value: Metadata value

        Returns:
            Self for chaining
        """
        self.metadata[key] = value
        return self

    def merge(self, other: 'MigrationProgress') -> 'MigrationProgress':
        """
        Merge another progress object into this one.

        Useful for combining progress from parallel migrations.  Counters are
        summed and errors/warnings are concatenated.  Timing fields are left
        untouched.  Metadata is combined per key:

        * key only in ``other``: the value is adopted (lists are copied);
        * key in both and this side holds a list: ``other``'s value is
          appended, or extended if it is itself a list;
        * key in both otherwise: the two values are wrapped in a new list.

        Args:
            other: Another MigrationProgress to merge

        Returns:
            Self for chaining
        """
        self.total += other.total
        self.processed += other.processed
        self.succeeded += other.succeeded
        self.failed += other.failed
        self.skipped += other.skipped
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)

        for key, value in other.metadata.items():
            if key not in self.metadata:
                # Copy adopted lists: a later merge extends this key in place
                # and must not reach back into ``other.metadata``.
                self.metadata[key] = list(value) if isinstance(value, list) else value
                continue

            current = self.metadata[key]
            if isinstance(current, list):
                if isinstance(value, list):
                    current.extend(value)
                else:
                    current.append(value)
            else:
                # Two scalar values for the same key: keep both in a list.
                self.metadata[key] = [current, value]

        return self

    def get_summary(self) -> str:
        """
        Get a human-readable summary of the migration progress.

        Returns:
            Multi-line summary string; the duration/rate, error and warning
            lines are only included when they have something to report
        """
        lines = [
            f"Migration Progress: {self.percent:.1f}% complete",
            f"Total: {self.total} | Processed: {self.processed}",
            f"Succeeded: {self.succeeded} | Failed: {self.failed} | Skipped: {self.skipped}",
        ]

        # Read the clock-backed property once so the printed duration and the
        # rate derived from it agree while the migration is still running.
        elapsed = self.duration
        if elapsed > 0:
            rate = self.processed / elapsed
            lines.append(f"Duration: {elapsed:.2f}s | Rate: {rate:.1f} records/s")

        if self.has_errors:
            lines.append(f"Errors: {len(self.errors)}")

        if self.warnings:
            lines.append(f"Warnings: {len(self.warnings)}")

        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert progress to dictionary for serialization.

        Returns:
            Dictionary with the raw counters, the derived properties
            (``percent``, ``success_rate``, ``duration``, ``is_complete``,
            ``has_errors``) and the errors, warnings and metadata containers
        """
        return {
            "total": self.total,
            "processed": self.processed,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "skipped": self.skipped,
            "percent": self.percent,
            "success_rate": self.success_rate,
            "duration": self.duration,
            "errors": self.errors,
            "warnings": self.warnings,
            "metadata": self.metadata,
            "is_complete": self.is_complete,
            "has_errors": self.has_errors,
        }

    def __str__(self) -> str:
        """Human-readable string representation."""
        return self.get_summary()

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"MigrationProgress(total={self.total}, processed={self.processed}, "
            f"succeeded={self.succeeded}, failed={self.failed})"
        )

290 f"MigrationProgress(total={self.total}, processed={self.processed}, " 

291 f"succeeded={self.succeeded}, failed={self.failed})" 

292 )