Coverage for src / tracekit / batch / metrics.py: 97%
140 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Batch job performance metrics collection and export.
3This module provides comprehensive metrics collection for batch processing jobs
4including throughput, timing, error statistics, and export capabilities.
7Example:
8 >>> from tracekit.batch.metrics import BatchMetrics
9 >>> metrics = BatchMetrics(batch_id="job-001")
10 >>> metrics.start()
11 >>> metrics.record_file("file1.wfm", duration=0.5, samples=100000)
12 >>> metrics.record_file("file2.wfm", duration=0.3, samples=50000)
13 >>> summary = metrics.summary()
14 >>> metrics.export_json("metrics.json")
16References:
17"""
19from __future__ import annotations
21import csv
22import json
23import statistics
24import threading
25import time
26from dataclasses import dataclass, field
27from datetime import UTC, datetime
28from pathlib import Path
29from typing import Any
31from tracekit.core.logging import format_timestamp, get_logger
# Module-level structured logger shared by all metrics classes and helpers below.
logger = get_logger(__name__)
36@dataclass
37class FileMetrics:
38 """Metrics for a single file in a batch job.
40 Attributes:
41 filename: Path to the file.
42 start_time: Processing start time (epoch seconds).
43 end_time: Processing end time (epoch seconds).
44 duration: Processing duration in seconds.
45 samples: Number of samples processed.
46 measurements: Number of measurements computed.
47 status: Processing status (success, error, skipped).
48 error_type: Error type if status is 'error'.
49 error_message: Error message if status is 'error'.
50 memory_peak: Peak memory usage in bytes (if tracked).
52 References:
53 LOG-012: Batch Job Performance Metrics
54 """
56 filename: str
57 start_time: float = 0.0
58 end_time: float = 0.0
59 duration: float = 0.0
60 samples: int = 0
61 measurements: int = 0
62 status: str = "pending"
63 error_type: str | None = None
64 error_message: str | None = None
65 memory_peak: int | None = None
67 def to_dict(self) -> dict[str, Any]:
68 """Convert to dictionary representation."""
69 return {
70 "filename": self.filename,
71 "start_time": format_timestamp(datetime.fromtimestamp(self.start_time, tz=UTC))
72 if self.start_time
73 else None,
74 "end_time": format_timestamp(datetime.fromtimestamp(self.end_time, tz=UTC))
75 if self.end_time
76 else None,
77 "duration_seconds": self.duration,
78 "samples": self.samples,
79 "measurements": self.measurements,
80 "status": self.status,
81 "error_type": self.error_type,
82 "error_message": self.error_message,
83 "memory_peak_bytes": self.memory_peak,
84 "samples_per_second": self.samples / self.duration if self.duration > 0 else 0,
85 }
@dataclass
class ErrorBreakdown:
    """Breakdown of errors by type.

    Attributes:
        by_type: Count of errors grouped by error type.
        total: Total number of errors.
        rate: Error rate as percentage.

    References:
        LOG-012: Error breakdown requirement
    """

    by_type: dict[str, int] = field(default_factory=dict)
    total: int = 0
    rate: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable view of the error breakdown."""
        # rate is stored as a fraction; expose it as a percentage.
        percent = round(self.rate * 100, 2)
        return {"by_type": self.by_type, "total": self.total, "rate_percent": percent}
@dataclass
class TimingStats:
    """Timing statistics for batch processing.

    Attributes:
        total_duration: Total wall-clock duration in seconds.
        average_per_file: Average processing time per file.
        min_per_file: Minimum processing time per file.
        max_per_file: Maximum processing time per file.
        median_per_file: Median processing time per file.
        stddev_per_file: Standard deviation of processing times.

    References:
        LOG-012: Timing metrics requirement
    """

    total_duration: float = 0.0
    average_per_file: float = 0.0
    min_per_file: float = 0.0
    max_per_file: float = 0.0
    median_per_file: float = 0.0
    stddev_per_file: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable view, every value rounded to 3 places."""
        raw = {
            "total_duration": self.total_duration,
            "average_per_file": self.average_per_file,
            "min_per_file": self.min_per_file,
            "max_per_file": self.max_per_file,
            "median_per_file": self.median_per_file,
            "stddev_per_file": self.stddev_per_file,
        }
        # Suffix each key with its unit; insertion order is preserved.
        return {f"{key}_seconds": round(value, 3) for key, value in raw.items()}
@dataclass
class ThroughputStats:
    """Throughput statistics for batch processing.

    Attributes:
        files_per_second: Processing rate in files per second.
        samples_per_second: Processing rate in samples per second.
        measurements_per_second: Rate of measurements computed per second.
        bytes_per_second: Data processing rate (if tracked).

    References:
        LOG-012: Throughput metrics requirement
    """

    files_per_second: float = 0.0
    samples_per_second: float = 0.0
    measurements_per_second: float = 0.0
    bytes_per_second: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable view of the throughput rates."""
        # File rate keeps millisecond-level precision; the bulk rates are
        # rounded to whole units (round(..., 0) keeps them as floats).
        result: dict[str, Any] = {"files_per_second": round(self.files_per_second, 3)}
        for key, value in (
            ("samples_per_second", self.samples_per_second),
            ("measurements_per_second", self.measurements_per_second),
            ("bytes_per_second", self.bytes_per_second),
        ):
            result[key] = round(value, 0)
        return result
@dataclass
class BatchMetricsSummary:
    """Complete summary of batch processing metrics.

    Attributes:
        batch_id: Unique batch job identifier.
        total_files: Total number of files in the batch.
        processed_count: Number of successfully processed files.
        error_count: Number of files with errors.
        skip_count: Number of skipped files.
        timing: Timing statistics.
        throughput: Throughput statistics.
        errors: Error breakdown.
        start_time: Batch start time (ISO 8601).
        end_time: Batch end time (ISO 8601).

    References:
        LOG-012: Batch Job Performance Metrics
    """

    batch_id: str
    total_files: int
    processed_count: int
    error_count: int
    skip_count: int
    timing: TimingStats
    throughput: ThroughputStats
    errors: ErrorBreakdown
    start_time: str
    end_time: str

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable representation of the summary."""
        # Empty batches report a success rate of 0 rather than dividing by zero.
        ratio = self.processed_count / self.total_files * 100 if self.total_files > 0 else 0
        payload: dict[str, Any] = {
            "batch_id": self.batch_id,
            "total_files": self.total_files,
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "skip_count": self.skip_count,
            "success_rate_percent": round(ratio, 2),
        }
        # Nested stats objects serialize themselves.
        for key, part in (
            ("timing", self.timing),
            ("throughput", self.throughput),
            ("errors", self.errors),
        ):
            payload[key] = part.to_dict()
        payload["start_time"] = self.start_time
        payload["end_time"] = self.end_time
        return payload
class BatchMetrics:
    """Batch job performance metrics collector.

    Collects and aggregates performance metrics for batch processing jobs
    including throughput, timing, and error statistics. Per-file records and
    the error-type counter are guarded by an internal lock, so the record_*
    methods can be called from multiple worker threads.

    Example:
        >>> metrics = BatchMetrics(batch_id="analysis-001")
        >>> metrics.start()
        >>> for file in files:
        ...     metrics.record_file(file, duration=0.5, samples=100000)
        >>> metrics.finish()
        >>> summary = metrics.summary()
        >>> metrics.export_json("metrics.json")

    References:
        LOG-012: Batch Job Performance Metrics
    """

    def __init__(self, batch_id: str | None = None) -> None:
        """Initialize batch metrics collector.

        Args:
            batch_id: Unique batch job identifier. Auto-generated if None.
        """
        # Imported lazily so module import stays cheap for callers that
        # never construct a collector.
        import uuid

        self.batch_id = batch_id or str(uuid.uuid4())
        self._files: list[FileMetrics] = []
        self._lock = threading.Lock()  # guards _files and _error_types
        self._start_time: float | None = None
        self._end_time: float | None = None
        self._error_types: dict[str, int] = {}

    def start(self) -> None:
        """Mark batch job as started.

        Records the start time for duration calculation.
        """
        self._start_time = time.time()
        logger.info(
            "Batch metrics collection started",
            extra={"batch_id": self.batch_id},
        )

    def finish(self) -> None:
        """Mark batch job as finished.

        Records the end time and logs the completion summary.
        """
        self._end_time = time.time()
        summary = self.summary()
        logger.info(
            "Batch metrics collection finished",
            extra={
                "batch_id": self.batch_id,
                "total_files": summary.total_files,
                "processed": summary.processed_count,
                "errors": summary.error_count,
                "duration": summary.timing.total_duration,
            },
        )

    def record_file(
        self,
        filename: str,
        *,
        duration: float,
        samples: int = 0,
        measurements: int = 0,
        status: str = "success",
        error_type: str | None = None,
        error_message: str | None = None,
        memory_peak: int | None = None,
    ) -> None:
        """Record metrics for a processed file.

        Args:
            filename: Path to the processed file.
            duration: Processing duration in seconds.
            samples: Number of samples processed.
            measurements: Number of measurements computed.
            status: Processing status (success, error, skipped).
            error_type: Error type if status is 'error'.
            error_message: Error message if status is 'error'.
            memory_peak: Peak memory usage in bytes.

        References:
            LOG-012: Per-file metrics tracking
        """
        now = time.time()
        # start_time is reconstructed from the reported duration because
        # callers time the work themselves and only pass elapsed seconds.
        file_metrics = FileMetrics(
            filename=filename,
            start_time=now - duration,
            end_time=now,
            duration=duration,
            samples=samples,
            measurements=measurements,
            status=status,
            error_type=error_type,
            error_message=error_message,
            memory_peak=memory_peak,
        )

        with self._lock:
            self._files.append(file_metrics)
            if status == "error" and error_type:
                self._error_types[error_type] = self._error_types.get(error_type, 0) + 1

    def record_error(
        self,
        filename: str,
        error_type: str,
        error_message: str,
        duration: float = 0.0,
    ) -> None:
        """Record a file processing error.

        Args:
            filename: Path to the file that failed.
            error_type: Type of error (e.g., "FileNotFoundError").
            error_message: Detailed error message.
            duration: Processing duration before error.

        References:
            LOG-012: Error tracking
        """
        self.record_file(
            filename,
            duration=duration,
            status="error",
            error_type=error_type,
            error_message=error_message,
        )

    def record_skip(self, filename: str, reason: str = "") -> None:
        """Record a skipped file.

        Args:
            filename: Path to the skipped file.
            reason: Reason for skipping.

        References:
            LOG-012: Skip count tracking
        """
        self.record_file(
            filename,
            duration=0.0,
            status="skipped",
            error_message=reason,
        )

    def summary(self) -> BatchMetricsSummary:
        """Generate batch processing metrics summary.

        Returns:
            BatchMetricsSummary with aggregated statistics.

        References:
            LOG-012: Batch Job Performance Metrics
        """
        # Snapshot shared state under the lock; aggregate outside it.
        with self._lock:
            files = list(self._files)
            error_types = dict(self._error_types)

        # Count by status
        processed_count = sum(1 for f in files if f.status == "success")
        error_count = sum(1 for f in files if f.status == "error")
        skip_count = sum(1 for f in files if f.status == "skipped")
        total_files = len(files)

        # Wall-clock duration when start()/finish() were both called;
        # otherwise None so we can fall back to per-file durations.
        wall_clock = (
            self._end_time - self._start_time
            if self._start_time and self._end_time
            else None
        )

        # Durations of successful files only; errors/skips would skew stats.
        durations = [f.duration for f in files if f.status == "success" and f.duration > 0]

        # Calculate timing stats
        if durations:
            timing = TimingStats(
                total_duration=wall_clock if wall_clock is not None else sum(durations),
                average_per_file=statistics.mean(durations),
                min_per_file=min(durations),
                max_per_file=max(durations),
                median_per_file=statistics.median(durations),
                stddev_per_file=statistics.stdev(durations) if len(durations) > 1 else 0.0,
            )
        else:
            timing = TimingStats(
                total_duration=wall_clock if wall_clock is not None else 0.0
            )

        # Calculate throughput stats
        total_samples = sum(f.samples for f in files if f.status == "success")
        total_measurements = sum(f.measurements for f in files if f.status == "success")

        if timing.total_duration > 0:
            throughput = ThroughputStats(
                files_per_second=processed_count / timing.total_duration,
                samples_per_second=total_samples / timing.total_duration,
                measurements_per_second=total_measurements / timing.total_duration,
            )
        else:
            throughput = ThroughputStats()

        # Error breakdown
        errors = ErrorBreakdown(
            by_type=error_types,
            total=error_count,
            rate=error_count / total_files if total_files > 0 else 0.0,
        )

        # Timestamps (empty string when start/finish were never called)
        start_time = (
            format_timestamp(datetime.fromtimestamp(self._start_time, tz=UTC))
            if self._start_time
            else ""
        )
        end_time = (
            format_timestamp(datetime.fromtimestamp(self._end_time, tz=UTC))
            if self._end_time
            else ""
        )

        return BatchMetricsSummary(
            batch_id=self.batch_id,
            total_files=total_files,
            processed_count=processed_count,
            error_count=error_count,
            skip_count=skip_count,
            timing=timing,
            throughput=throughput,
            errors=errors,
            start_time=start_time,
            end_time=end_time,
        )

    def get_file_metrics(self) -> list[dict[str, Any]]:
        """Get metrics for all files.

        Returns:
            List of file metric dictionaries.
        """
        with self._lock:
            return [f.to_dict() for f in self._files]

    def export_json(self, path: str | Path) -> None:
        """Export metrics to JSON file.

        Args:
            path: Output file path.

        References:
            LOG-012: Export batch metrics as JSON
        """
        path = Path(path)
        summary = self.summary()

        output = {
            "summary": summary.to_dict(),
            "files": self.get_file_metrics(),
        }

        # Explicit UTF-8 so the output does not depend on the platform locale
        # (previously this could raise UnicodeEncodeError for non-ASCII
        # filenames/messages on cp1252 systems). default=str stringifies any
        # value json cannot serialize natively.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, default=str)

        logger.info("Batch metrics exported to %s", path)

    def export_csv(self, path: str | Path) -> None:
        """Export per-file metrics to CSV file.

        Args:
            path: Output file path.

        References:
            LOG-012: Export batch metrics as CSV
        """
        path = Path(path)
        files = self.get_file_metrics()

        if not files:
            logger.warning("No file metrics to export")
            return

        # Every row comes from FileMetrics.to_dict(), so all rows share the
        # same keys; the first row supplies the header.
        fieldnames = list(files[0].keys())

        # newline="" as required by the csv module; explicit UTF-8 for
        # locale-independent output.
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(files)

        logger.info("Batch metrics CSV exported to %s", path)
def get_batch_stats(batch_id: str, metrics: BatchMetrics) -> dict[str, Any]:
    """Return aggregated statistics for a batch job.

    CLI command implementation: tracekit batch stats <batch_id>

    Args:
        batch_id: Batch job identifier.
        metrics: BatchMetrics instance.

    Returns:
        Dictionary with batch statistics.

    Raises:
        ValueError: If batch ID does not match the metrics instance.

    References:
        LOG-012: CLI command requirement
    """
    # Guard clause: the requested id must belong to this collector.
    if batch_id != metrics.batch_id:
        raise ValueError(f"Batch ID mismatch: expected {batch_id}, got {metrics.batch_id}")

    summary = metrics.summary()
    return summary.to_dict()
# Explicit public API of this module (names exported via ``from ... import *``).
__all__ = [
    "BatchMetrics",
    "BatchMetricsSummary",
    "ErrorBreakdown",
    "FileMetrics",
    "ThroughputStats",
    "TimingStats",
    "get_batch_stats",
]