Coverage for src/dataknobs_data/migration/progress.py: 37%

99 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""Migration progress tracking, separate from migration logic. 

2""" 

3 

4from __future__ import annotations 

5 

6import time 

7from dataclasses import dataclass, field 

8from typing import Any 

9 

10 

@dataclass
class MigrationProgress:
    """Track migration progress and statistics.

    Provides a clean separation between migration logic and progress tracking,
    allowing for flexible reporting without cluttering the migration code.

    Every mutating method returns ``self`` so calls can be chained.

    Attributes:
        total: Number of records expected to be migrated.
        processed: Number of records handled so far (succeeded + failed + skipped
            when only the ``record_*`` methods are used to update counters).
        succeeded: Number of records recorded via ``record_success``.
        failed: Number of records recorded via ``record_failure``.
        skipped: Number of records recorded via ``record_skip``.
        errors: One dict per recorded failure (``error``, ``record_id``,
            ``timestamp`` and, when an exception was supplied, ``exception``
            and ``exception_type``).
        warnings: Free-form warning messages, including skip notices.
        start_time: ``time.time()`` value set by ``start``, or None.
        end_time: ``time.time()`` value set by ``finish``, or None.
        metadata: Arbitrary key/value data attached to the migration.
    """

    total: int = 0
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    errors: list[dict[str, Any]] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    start_time: float | None = None
    end_time: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def start(self) -> MigrationProgress:
        """Mark migration as started by stamping ``start_time`` with the current time.

        Returns:
            Self for chaining
        """
        self.start_time = time.time()
        return self

    def finish(self) -> MigrationProgress:
        """Mark migration as finished by stamping ``end_time`` with the current time.

        Returns:
            Self for chaining
        """
        self.end_time = time.time()
        return self

    @property
    def duration(self) -> float:
        """Get migration duration in seconds.

        While the migration is still running (``end_time`` is None) the
        current time is used as the end point, so repeated reads grow.

        Returns:
            Duration in seconds, or 0 if not started
        """
        if self.start_time is None:
            return 0.0

        # Compare against None explicitly: an end_time of 0.0 is a valid
        # timestamp and must not be mistaken for "not finished yet".
        end = self.end_time if self.end_time is not None else time.time()
        return end - self.start_time

    @property
    def percent(self) -> float:
        """Get completion percentage.

        Returns:
            ``processed / total`` as a percentage, or 0 when ``total`` is 0
        """
        if self.total == 0:
            return 0.0
        return (self.processed / self.total) * 100

    @property
    def success_rate(self) -> float:
        """Get success rate as percentage.

        Returns:
            ``succeeded / processed`` as a percentage, or 0 when nothing
            has been processed
        """
        if self.processed == 0:
            return 0.0
        return (self.succeeded / self.processed) * 100

    @property
    def is_complete(self) -> bool:
        """Check if migration is complete.

        Returns:
            True if ``total`` is positive and all records have been processed
        """
        return self.total > 0 and self.processed >= self.total

    @property
    def has_errors(self) -> bool:
        """Check if migration had any errors.

        Returns:
            True if the failure counter is positive or any error was recorded
        """
        return self.failed > 0 or bool(self.errors)

    def record_success(self, record_id: str | None = None) -> MigrationProgress:
        """Record a successful migration.

        Args:
            record_id: Optional ID of successfully migrated record. It is
                accepted for symmetry with the other ``record_*`` methods
                and is not stored.

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.succeeded += 1
        return self

    def record_failure(
        self,
        error: str,
        record_id: str | None = None,
        exception: Exception | None = None,
    ) -> MigrationProgress:
        """Record a failed migration.

        Args:
            error: Error message
            record_id: Optional ID of failed record
            exception: Optional exception that caused failure; its string
                form and class name are stored alongside the message

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.failed += 1

        error_info: dict[str, Any] = {
            "error": error,
            "record_id": record_id,
            "timestamp": time.time(),
        }

        # Identity check rather than truthiness: an exception class may
        # define __bool__/__len__ and evaluate as falsy.
        if exception is not None:
            error_info["exception"] = str(exception)
            error_info["exception_type"] = type(exception).__name__

        self.errors.append(error_info)
        return self

    def record_skip(self, reason: str, record_id: str | None = None) -> MigrationProgress:
        """Record a skipped record.

        A warning of the form ``"Skipped record <record_id>: <reason>"`` is
        appended; when ``record_id`` is None it is rendered as ``None``.

        Args:
            reason: Reason for skipping
            record_id: Optional ID of skipped record

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.skipped += 1
        self.warnings.append(f"Skipped record {record_id}: {reason}")
        return self

    def add_warning(self, warning: str) -> MigrationProgress:
        """Add a warning message.

        Args:
            warning: Warning message

        Returns:
            Self for chaining
        """
        self.warnings.append(warning)
        return self

    def set_metadata(self, key: str, value: Any) -> MigrationProgress:
        """Store metadata about the migration, replacing any existing value.

        Args:
            key: Metadata key
            value: Metadata value

        Returns:
            Self for chaining
        """
        self.metadata[key] = value
        return self

    def merge(self, other: MigrationProgress) -> MigrationProgress:
        """Merge another progress object into this one.

        Useful for combining progress from parallel migrations. Counters are
        summed and errors/warnings are concatenated. ``start_time`` and
        ``end_time`` are left untouched.

        Metadata is combined per key: a key only present in ``other`` is
        adopted; if this object already holds a list, the other value is
        appended (or extended when it is itself a list); otherwise the two
        values are placed in a new two-element list.

        Args:
            other: Another MigrationProgress to merge; it is not modified

        Returns:
            Self for chaining
        """
        self.total += other.total
        self.processed += other.processed
        self.succeeded += other.succeeded
        self.failed += other.failed
        self.skipped += other.skipped
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)

        for key, value in other.metadata.items():
            if key not in self.metadata:
                # Adopt a shallow copy of lists: a later merge extends this
                # value in place and must not reach back into `other`.
                self.metadata[key] = list(value) if isinstance(value, list) else value
                continue

            current = self.metadata[key]
            if isinstance(current, list):
                if isinstance(value, list):
                    current.extend(value)
                else:
                    current.append(value)
            else:
                # Convert to list if merging different values
                self.metadata[key] = [current, value]

        return self

    def get_summary(self) -> str:
        """Get a human-readable summary of the migration progress.

        Returns:
            Multi-line summary string; the duration/rate, error and warning
            lines only appear when they have something to report
        """
        lines = [
            f"Migration Progress: {self.percent:.1f}% complete",
            f"Total: {self.total} | Processed: {self.processed}",
            f"Succeeded: {self.succeeded} | Failed: {self.failed} | Skipped: {self.skipped}",
        ]

        # Sample the clock once so the guard, the rate and the printed
        # duration all describe the same instant while still running.
        elapsed = self.duration
        if elapsed > 0:
            rate = self.processed / elapsed
            lines.append(f"Duration: {elapsed:.2f}s | Rate: {rate:.1f} records/s")

        if self.has_errors:
            lines.append(f"Errors: {len(self.errors)}")

        if self.warnings:
            lines.append(f"Warnings: {len(self.warnings)}")

        return "\n".join(lines)

    def to_dict(self) -> dict[str, Any]:
        """Convert progress to dictionary for serialization.

        The ``errors``, ``warnings`` and ``metadata`` entries are the live
        containers held by this object, not copies.

        Returns:
            Dictionary representation including the derived values
            (percent, success_rate, duration, is_complete, has_errors)
        """
        return {
            "total": self.total,
            "processed": self.processed,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "skipped": self.skipped,
            "percent": self.percent,
            "success_rate": self.success_rate,
            "duration": self.duration,
            "errors": self.errors,
            "warnings": self.warnings,
            "metadata": self.metadata,
            "is_complete": self.is_complete,
            "has_errors": self.has_errors,
        }

    def __str__(self) -> str:
        """Human-readable string representation."""
        return self.get_summary()

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"MigrationProgress(total={self.total}, processed={self.processed}, "
            f"succeeded={self.succeeded}, failed={self.failed})"
        )

275 f"MigrationProgress(total={self.total}, processed={self.processed}, " 

276 f"succeeded={self.succeeded}, failed={self.failed})" 

277 )