Coverage for src/dataknobs_data/migration_v2/progress.py: 80%
98 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:29 -0500
1"""
2Migration progress tracking, separate from migration logic.
3"""
5import time
6from dataclasses import dataclass, field
7from typing import Any, Dict, List, Optional
@dataclass
class MigrationProgress:
    """
    Track migration progress and statistics.

    Provides a clean separation between migration logic and progress tracking,
    allowing for flexible reporting without cluttering the migration code.

    All mutating methods return ``self`` so that calls can be chained.
    """

    # Counters. ``processed`` is incremented by every record_* call, so it is
    # the sum of succeeded, failed and skipped when only those methods are used.
    total: int = 0
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    # One dict per failure, as built by record_failure().
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    # Wall-clock timestamps from time.time(); None until start()/finish().
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def start(self) -> 'MigrationProgress':
        """
        Mark migration as started.

        Returns:
            Self for chaining
        """
        self.start_time = time.time()
        return self

    def finish(self) -> 'MigrationProgress':
        """
        Mark migration as finished.

        Returns:
            Self for chaining
        """
        self.end_time = time.time()
        return self

    @property
    def duration(self) -> float:
        """
        Get migration duration in seconds.

        While the migration is still running (no end time recorded) the
        duration is measured up to the current time.

        Returns:
            Duration in seconds, or 0 if not started
        """
        if self.start_time is None:
            return 0.0

        # Compare against None rather than truthiness so that an explicit
        # end_time of 0.0 is honored instead of being treated as "unset".
        end = self.end_time if self.end_time is not None else time.time()
        return end - self.start_time

    @property
    def percent(self) -> float:
        """
        Get completion percentage.

        The value is not clamped: it exceeds 100 if ``processed`` is larger
        than ``total``.

        Returns:
            Percentage complete, or 0 when ``total`` is 0
        """
        if self.total == 0:
            return 0.0
        return (self.processed / self.total) * 100

    @property
    def success_rate(self) -> float:
        """
        Get success rate as percentage.

        The rate is relative to all processed records, which includes
        skipped records as well as failures.

        Returns:
            Success rate, or 0 when nothing has been processed
        """
        if self.processed == 0:
            return 0.0
        return (self.succeeded / self.processed) * 100

    @property
    def is_complete(self) -> bool:
        """
        Check if migration is complete.

        Returns:
            True if ``total`` is positive and all records have been processed
        """
        return self.total > 0 and self.processed >= self.total

    @property
    def has_errors(self) -> bool:
        """
        Check if migration had any errors.

        Returns:
            True if there were any failures or recorded error entries
        """
        return self.failed > 0 or len(self.errors) > 0

    def record_success(self, record_id: Optional[str] = None) -> 'MigrationProgress':
        """
        Record a successful migration.

        Args:
            record_id: Optional ID of successfully migrated record
                (accepted for API symmetry; not stored)

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.succeeded += 1
        return self

    def record_failure(
        self,
        error: str,
        record_id: Optional[str] = None,
        exception: Optional[Exception] = None
    ) -> 'MigrationProgress':
        """
        Record a failed migration.

        Appends an entry to ``errors`` containing the message, record ID and
        a timestamp, plus the exception text and type name when given.

        Args:
            error: Error message
            record_id: Optional ID of failed record
            exception: Optional exception that caused failure

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.failed += 1

        error_info: Dict[str, Any] = {
            "error": error,
            "record_id": record_id,
            "timestamp": time.time()
        }

        # Identity check: an exception object that happens to be falsy
        # (e.g. defines __bool__/__len__) must still be recorded.
        if exception is not None:
            error_info["exception"] = str(exception)
            error_info["exception_type"] = type(exception).__name__

        self.errors.append(error_info)
        return self

    def record_skip(self, reason: str, record_id: Optional[str] = None) -> 'MigrationProgress':
        """
        Record a skipped record.

        The skip counts as processed and is logged as a warning.

        Args:
            reason: Reason for skipping
            record_id: Optional ID of skipped record

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.skipped += 1
        self.warnings.append(f"Skipped record {record_id}: {reason}")
        return self

    def add_warning(self, warning: str) -> 'MigrationProgress':
        """
        Add a warning message.

        Args:
            warning: Warning message

        Returns:
            Self for chaining
        """
        self.warnings.append(warning)
        return self

    def set_metadata(self, key: str, value: Any) -> 'MigrationProgress':
        """
        Store metadata about the migration.

        Args:
            key: Metadata key
            value: Metadata value

        Returns:
            Self for chaining
        """
        self.metadata[key] = value
        return self

    def merge(self, other: 'MigrationProgress') -> 'MigrationProgress':
        """
        Merge another progress object into this one.

        Useful for combining progress from parallel migrations. Counters are
        summed and errors/warnings are concatenated. Metadata keys present on
        both sides are combined into a list; keys only present on ``other``
        are copied over. Start and end times are left unchanged.

        Args:
            other: Another MigrationProgress to merge

        Returns:
            Self for chaining
        """
        self.total += other.total
        self.processed += other.processed
        self.succeeded += other.succeeded
        self.failed += other.failed
        self.skipped += other.skipped
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)

        # Merge metadata
        for key, value in other.metadata.items():
            if key not in self.metadata:
                # Copy list values so that a later merge extending this entry
                # does not mutate ``other``'s metadata through a shared list.
                self.metadata[key] = list(value) if isinstance(value, list) else value
                continue

            existing = self.metadata[key]
            if isinstance(existing, list):
                # Combine values if both exist
                if isinstance(value, list):
                    existing.extend(value)
                else:
                    existing.append(value)
            else:
                # Convert to list if merging different values
                self.metadata[key] = [existing, value]

        return self

    def get_summary(self) -> str:
        """
        Get a human-readable summary of the migration progress.

        Returns:
            Summary string, one statistic group per line
        """
        lines = [
            f"Migration Progress: {self.percent:.1f}% complete",
            f"Total: {self.total} | Processed: {self.processed}",
            f"Succeeded: {self.succeeded} | Failed: {self.failed} | Skipped: {self.skipped}",
        ]

        # Read the property once: while the migration is running every access
        # calls time.time(), so repeated reads would yield a rate and a
        # duration that do not agree with each other.
        elapsed = self.duration
        if elapsed > 0:
            rate = self.processed / elapsed
            lines.append(f"Duration: {elapsed:.2f}s | Rate: {rate:.1f} records/s")

        if self.has_errors:
            lines.append(f"Errors: {len(self.errors)}")

        if self.warnings:
            lines.append(f"Warnings: {len(self.warnings)}")

        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert progress to dictionary for serialization.

        The ``errors``, ``warnings`` and ``metadata`` containers are shallow
        copies, so adding or removing entries on the result does not alter
        this object (the contained items themselves are still shared).

        Returns:
            Dictionary representation
        """
        return {
            "total": self.total,
            "processed": self.processed,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "skipped": self.skipped,
            "percent": self.percent,
            "success_rate": self.success_rate,
            "duration": self.duration,
            "errors": list(self.errors),
            "warnings": list(self.warnings),
            "metadata": dict(self.metadata),
            "is_complete": self.is_complete,
            "has_errors": self.has_errors,
        }

    def __str__(self) -> str:
        """Human-readable string representation."""
        return self.get_summary()

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"MigrationProgress(total={self.total}, processed={self.processed}, "
            f"succeeded={self.succeeded}, failed={self.failed})"
        )