Coverage for src/dataknobs_data/migration/progress.py: 37%
99 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""Migration progress tracking, separate from migration logic.
2"""
4from __future__ import annotations
6import time
7from dataclasses import dataclass, field
8from typing import Any
@dataclass
class MigrationProgress:
    """Track migration progress and statistics.

    Provides a clean separation between migration logic and progress tracking,
    allowing for flexible reporting without cluttering the migration code.

    All mutating methods return ``self`` so that calls can be chained, e.g.
    ``MigrationProgress(total=10).start().record_success()``.
    """

    # Expected number of records; 0 means "unknown / not set".
    total: int = 0
    # Records handled so far (succeeded + failed + skipped).
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    # One dict per failure, built by ``record_failure``.
    errors: list[dict[str, Any]] = field(default_factory=list)
    # Free-form warning messages, including skip notices.
    warnings: list[str] = field(default_factory=list)
    # ``time.time()`` stamps; ``None`` until ``start()`` / ``finish()`` run.
    start_time: float | None = None
    end_time: float | None = None
    # Arbitrary caller-supplied key/value data.
    metadata: dict[str, Any] = field(default_factory=dict)

    def start(self) -> MigrationProgress:
        """Mark migration as started.

        Returns:
            Self for chaining
        """
        self.start_time = time.time()
        return self

    def finish(self) -> MigrationProgress:
        """Mark migration as finished.

        Returns:
            Self for chaining
        """
        self.end_time = time.time()
        return self

    @property
    def duration(self) -> float:
        """Get migration duration in seconds.

        While the migration is still running (no end time recorded) the
        duration is measured up to the current time.

        Returns:
            Duration in seconds, or 0 if not started
        """
        if self.start_time is None:
            return 0.0

        # Compare against None explicitly: an end time of 0.0 is a valid
        # timestamp and must not be mistaken for "not finished yet".
        end = self.end_time if self.end_time is not None else time.time()
        return end - self.start_time

    @property
    def percent(self) -> float:
        """Get completion percentage.

        The value is not clamped, so it exceeds 100 if more records are
        processed than ``total``.

        Returns:
            Percentage complete, or 0 when ``total`` is 0
        """
        if self.total == 0:
            return 0.0
        return (self.processed / self.total) * 100

    @property
    def success_rate(self) -> float:
        """Get success rate as percentage of processed records.

        Returns:
            Success rate (0-100), or 0 when nothing has been processed
        """
        if self.processed == 0:
            return 0.0
        return (self.succeeded / self.processed) * 100

    @property
    def is_complete(self) -> bool:
        """Check if migration is complete.

        Returns:
            True if ``total`` is positive and all records have been processed
        """
        return self.total > 0 and self.processed >= self.total

    @property
    def has_errors(self) -> bool:
        """Check if migration had any errors.

        Returns:
            True if there were any failures or recorded error entries
        """
        return self.failed > 0 or len(self.errors) > 0

    def record_success(self, record_id: str | None = None) -> MigrationProgress:
        """Record a successful migration.

        Args:
            record_id: Optional ID of successfully migrated record
                (accepted for API symmetry; not stored)

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.succeeded += 1
        return self

    def record_failure(
        self,
        error: str,
        record_id: str | None = None,
        exception: Exception | None = None,
    ) -> MigrationProgress:
        """Record a failed migration.

        Appends an entry to ``errors`` with the keys ``error``, ``record_id``
        and ``timestamp``, plus ``exception`` / ``exception_type`` when an
        exception is supplied.

        Args:
            error: Error message
            record_id: Optional ID of failed record
            exception: Optional exception that caused failure

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.failed += 1

        error_info: dict[str, Any] = {
            "error": error,
            "record_id": record_id,
            "timestamp": time.time(),
        }

        # Identity check rather than truthiness: an exception type that
        # defines __len__/__bool__ may be falsy yet still carries details.
        if exception is not None:
            error_info["exception"] = str(exception)
            error_info["exception_type"] = type(exception).__name__

        self.errors.append(error_info)
        return self

    def record_skip(self, reason: str, record_id: str | None = None) -> MigrationProgress:
        """Record a skipped record.

        A skip counts as processed and is also logged as a warning.

        Args:
            reason: Reason for skipping
            record_id: Optional ID of skipped record

        Returns:
            Self for chaining
        """
        self.processed += 1
        self.skipped += 1
        self.warnings.append(f"Skipped record {record_id}: {reason}")
        return self

    def add_warning(self, warning: str) -> MigrationProgress:
        """Add a warning message.

        Args:
            warning: Warning message

        Returns:
            Self for chaining
        """
        self.warnings.append(warning)
        return self

    def set_metadata(self, key: str, value: Any) -> MigrationProgress:
        """Store metadata about the migration.

        Args:
            key: Metadata key
            value: Metadata value

        Returns:
            Self for chaining
        """
        self.metadata[key] = value
        return self

    def merge(self, other: MigrationProgress) -> MigrationProgress:
        """Merge another progress object into this one.

        Useful for combining progress from parallel migrations. Counters are
        summed and errors/warnings are concatenated. Timing fields are left
        untouched. Metadata is combined per key: values for a key present on
        both sides are collected into a list.

        Args:
            other: Another MigrationProgress to merge

        Returns:
            Self for chaining
        """
        self.total += other.total
        self.processed += other.processed
        self.succeeded += other.succeeded
        self.failed += other.failed
        self.skipped += other.skipped
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)

        for key, value in other.metadata.items():
            if key not in self.metadata:
                # Copy list values so that later merges, which extend this
                # list in place, never mutate ``other``'s metadata.
                self.metadata[key] = list(value) if isinstance(value, list) else value
                continue

            current = self.metadata[key]
            if isinstance(current, list):
                # Already accumulating: flatten incoming lists, append scalars.
                if isinstance(value, list):
                    current.extend(value)
                else:
                    current.append(value)
            else:
                # First collision on a non-list value: start a list of both.
                self.metadata[key] = [current, value]

        return self

    def get_summary(self) -> str:
        """Get a human-readable summary of the migration progress.

        Returns:
            Summary string, one statistic group per line
        """
        lines = [
            f"Migration Progress: {self.percent:.1f}% complete",
            f"Total: {self.total} | Processed: {self.processed}",
            f"Succeeded: {self.succeeded} | Failed: {self.failed} | Skipped: {self.skipped}",
        ]

        # Read the duration once: while running, every access re-reads the
        # clock, so the guard, the rate and the printed value could disagree.
        elapsed = self.duration
        if elapsed > 0:
            rate = self.processed / elapsed
            lines.append(f"Duration: {elapsed:.2f}s | Rate: {rate:.1f} records/s")

        if self.has_errors:
            lines.append(f"Errors: {len(self.errors)}")

        if self.warnings:
            lines.append(f"Warnings: {len(self.warnings)}")

        return "\n".join(lines)

    def to_dict(self) -> dict[str, Any]:
        """Convert progress to dictionary for serialization.

        Includes the raw counters, the derived properties, and the
        errors/warnings/metadata containers (by reference, not copied).

        Returns:
            Dictionary representation
        """
        return {
            "total": self.total,
            "processed": self.processed,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "skipped": self.skipped,
            "percent": self.percent,
            "success_rate": self.success_rate,
            "duration": self.duration,
            "errors": self.errors,
            "warnings": self.warnings,
            "metadata": self.metadata,
            "is_complete": self.is_complete,
            "has_errors": self.has_errors,
        }

    def __str__(self) -> str:
        """Human-readable string representation."""
        return self.get_summary()

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"MigrationProgress(total={self.total}, processed={self.processed}, "
            f"succeeded={self.succeeded}, failed={self.failed})"
        )