Coverage for src / tracekit / batch / metrics.py: 97%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Batch job performance metrics collection and export. 

2 

3This module provides comprehensive metrics collection for batch processing jobs 

4including throughput, timing, error statistics, and export capabilities. 

5 

6 

7Example: 

8 >>> from tracekit.batch.metrics import BatchMetrics 

9 >>> metrics = BatchMetrics(batch_id="job-001") 

10 >>> metrics.start() 

11 >>> metrics.record_file("file1.wfm", duration=0.5, samples=100000) 

12 >>> metrics.record_file("file2.wfm", duration=0.3, samples=50000) 

13 >>> summary = metrics.summary() 

14 >>> metrics.export_json("metrics.json") 

15 

References:
    LOG-012: Batch Job Performance Metrics
"""

18 

19from __future__ import annotations 

20 

21import csv 

22import json 

23import statistics 

24import threading 

25import time 

26from dataclasses import dataclass, field 

27from datetime import UTC, datetime 

28from pathlib import Path 

29from typing import Any 

30 

31from tracekit.core.logging import format_timestamp, get_logger 

32 

33logger = get_logger(__name__) 

34 

35 

36@dataclass 

37class FileMetrics: 

38 """Metrics for a single file in a batch job. 

39 

40 Attributes: 

41 filename: Path to the file. 

42 start_time: Processing start time (epoch seconds). 

43 end_time: Processing end time (epoch seconds). 

44 duration: Processing duration in seconds. 

45 samples: Number of samples processed. 

46 measurements: Number of measurements computed. 

47 status: Processing status (success, error, skipped). 

48 error_type: Error type if status is 'error'. 

49 error_message: Error message if status is 'error'. 

50 memory_peak: Peak memory usage in bytes (if tracked). 

51 

52 References: 

53 LOG-012: Batch Job Performance Metrics 

54 """ 

55 

56 filename: str 

57 start_time: float = 0.0 

58 end_time: float = 0.0 

59 duration: float = 0.0 

60 samples: int = 0 

61 measurements: int = 0 

62 status: str = "pending" 

63 error_type: str | None = None 

64 error_message: str | None = None 

65 memory_peak: int | None = None 

66 

67 def to_dict(self) -> dict[str, Any]: 

68 """Convert to dictionary representation.""" 

69 return { 

70 "filename": self.filename, 

71 "start_time": format_timestamp(datetime.fromtimestamp(self.start_time, tz=UTC)) 

72 if self.start_time 

73 else None, 

74 "end_time": format_timestamp(datetime.fromtimestamp(self.end_time, tz=UTC)) 

75 if self.end_time 

76 else None, 

77 "duration_seconds": self.duration, 

78 "samples": self.samples, 

79 "measurements": self.measurements, 

80 "status": self.status, 

81 "error_type": self.error_type, 

82 "error_message": self.error_message, 

83 "memory_peak_bytes": self.memory_peak, 

84 "samples_per_second": self.samples / self.duration if self.duration > 0 else 0, 

85 } 

86 

87 

@dataclass
class ErrorBreakdown:
    """Breakdown of errors by type.

    Attributes:
        by_type: Count of errors grouped by error type.
        total: Total number of errors.
        rate: Error rate as percentage.

    References:
        LOG-012: Error breakdown requirement
    """

    by_type: dict[str, int] = field(default_factory=dict)
    total: int = 0
    rate: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        # Rate is stored as a fraction; expose it as a percentage.
        rate_percent = round(self.rate * 100, 2)
        return {
            "by_type": self.by_type,
            "total": self.total,
            "rate_percent": rate_percent,
        }

112 

113 

@dataclass
class TimingStats:
    """Timing statistics for batch processing.

    Attributes:
        total_duration: Total wall-clock duration in seconds.
        average_per_file: Average processing time per file.
        min_per_file: Minimum processing time per file.
        max_per_file: Maximum processing time per file.
        median_per_file: Median processing time per file.
        stddev_per_file: Standard deviation of processing times.

    References:
        LOG-012: Timing metrics requirement
    """

    total_duration: float = 0.0
    average_per_file: float = 0.0
    min_per_file: float = 0.0
    max_per_file: float = 0.0
    median_per_file: float = 0.0
    stddev_per_file: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        # Every field is exported with a "_seconds" suffix, rounded to ms.
        values = {
            "total_duration": self.total_duration,
            "average_per_file": self.average_per_file,
            "min_per_file": self.min_per_file,
            "max_per_file": self.max_per_file,
            "median_per_file": self.median_per_file,
            "stddev_per_file": self.stddev_per_file,
        }
        return {f"{name}_seconds": round(value, 3) for name, value in values.items()}

147 

148 

@dataclass
class ThroughputStats:
    """Throughput statistics for batch processing.

    Attributes:
        files_per_second: Processing rate in files per second.
        samples_per_second: Processing rate in samples per second.
        measurements_per_second: Rate of measurements computed per second.
        bytes_per_second: Data processing rate (if tracked).

    References:
        LOG-012: Throughput metrics requirement
    """

    files_per_second: float = 0.0
    samples_per_second: float = 0.0
    measurements_per_second: float = 0.0
    bytes_per_second: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        # Per-field rounding precision: file rate keeps millirates, the
        # high-volume rates are rounded to whole units (still floats).
        precision = {
            "files_per_second": 3,
            "samples_per_second": 0,
            "measurements_per_second": 0,
            "bytes_per_second": 0,
        }
        return {name: round(getattr(self, name), digits) for name, digits in precision.items()}

176 

177 

@dataclass
class BatchMetricsSummary:
    """Complete summary of batch processing metrics.

    Attributes:
        batch_id: Unique batch job identifier.
        total_files: Total number of files in the batch.
        processed_count: Number of successfully processed files.
        error_count: Number of files with errors.
        skip_count: Number of skipped files.
        timing: Timing statistics.
        throughput: Throughput statistics.
        errors: Error breakdown.
        start_time: Batch start time (ISO 8601).
        end_time: Batch end time (ISO 8601).

    References:
        LOG-012: Batch Job Performance Metrics
    """

    batch_id: str
    total_files: int
    processed_count: int
    error_count: int
    skip_count: int
    timing: TimingStats
    throughput: ThroughputStats
    errors: ErrorBreakdown
    start_time: str
    end_time: str

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation."""
        counts = {
            "batch_id": self.batch_id,
            "total_files": self.total_files,
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "skip_count": self.skip_count,
        }
        # Guard against an empty batch when computing the success rate.
        pct = (self.processed_count / self.total_files * 100) if self.total_files > 0 else 0
        nested = {
            "timing": self.timing.to_dict(),
            "throughput": self.throughput.to_dict(),
            "errors": self.errors.to_dict(),
        }
        return {
            **counts,
            "success_rate_percent": round(pct, 2),
            **nested,
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

226 

227 

class BatchMetrics:
    """Batch job performance metrics collector.

    Collects and aggregates performance metrics for batch processing jobs
    including throughput, timing, and error statistics. Recording and
    summarizing snapshot shared state under an internal lock, so a single
    instance may be shared across worker threads.

    Example:
        >>> metrics = BatchMetrics(batch_id="analysis-001")
        >>> metrics.start()
        >>> for file in files:
        ...     metrics.record_file(file, duration=0.5, samples=100000)
        >>> metrics.finish()
        >>> summary = metrics.summary()
        >>> metrics.export_json("metrics.json")

    References:
        LOG-012: Batch Job Performance Metrics
    """

    def __init__(self, batch_id: str | None = None) -> None:
        """Initialize batch metrics collector.

        Args:
            batch_id: Unique batch job identifier. Auto-generated if None.
        """
        import uuid

        self.batch_id = batch_id or str(uuid.uuid4())
        self._files: list[FileMetrics] = []
        self._lock = threading.Lock()  # guards _files and _error_types
        self._start_time: float | None = None
        self._end_time: float | None = None
        self._error_types: dict[str, int] = {}

    def start(self) -> None:
        """Mark batch job as started.

        Records the start time for duration calculation.
        """
        self._start_time = time.time()
        logger.info(
            "Batch metrics collection started",
            extra={"batch_id": self.batch_id},
        )

    def finish(self) -> None:
        """Mark batch job as finished.

        Records the end time and logs the completion summary.
        """
        self._end_time = time.time()
        summary = self.summary()
        logger.info(
            "Batch metrics collection finished",
            extra={
                "batch_id": self.batch_id,
                "total_files": summary.total_files,
                "processed": summary.processed_count,
                "errors": summary.error_count,
                "duration": summary.timing.total_duration,
            },
        )

    def record_file(
        self,
        filename: str,
        *,
        duration: float,
        samples: int = 0,
        measurements: int = 0,
        status: str = "success",
        error_type: str | None = None,
        error_message: str | None = None,
        memory_peak: int | None = None,
    ) -> None:
        """Record metrics for a processed file.

        Args:
            filename: Path to the processed file.
            duration: Processing duration in seconds.
            samples: Number of samples processed.
            measurements: Number of measurements computed.
            status: Processing status (success, error, skipped).
            error_type: Error type if status is 'error'.
            error_message: Error message if status is 'error'.
            memory_peak: Peak memory usage in bytes.

        References:
            LOG-012: Per-file metrics tracking
        """
        now = time.time()
        # Start time is inferred from the caller-measured duration.
        file_metrics = FileMetrics(
            filename=filename,
            start_time=now - duration,
            end_time=now,
            duration=duration,
            samples=samples,
            measurements=measurements,
            status=status,
            error_type=error_type,
            error_message=error_message,
            memory_peak=memory_peak,
        )

        with self._lock:
            self._files.append(file_metrics)
            # Only typed errors participate in the per-type breakdown.
            if status == "error" and error_type:
                self._error_types[error_type] = self._error_types.get(error_type, 0) + 1

    def record_error(
        self,
        filename: str,
        error_type: str,
        error_message: str,
        duration: float = 0.0,
    ) -> None:
        """Record a file processing error.

        Args:
            filename: Path to the file that failed.
            error_type: Type of error (e.g., "FileNotFoundError").
            error_message: Detailed error message.
            duration: Processing duration before error.

        References:
            LOG-012: Error tracking
        """
        self.record_file(
            filename,
            duration=duration,
            status="error",
            error_type=error_type,
            error_message=error_message,
        )

    def record_skip(self, filename: str, reason: str = "") -> None:
        """Record a skipped file.

        Args:
            filename: Path to the skipped file.
            reason: Reason for skipping.

        References:
            LOG-012: Skip count tracking
        """
        self.record_file(
            filename,
            duration=0.0,
            status="skipped",
            error_message=reason,
        )

    def summary(self) -> BatchMetricsSummary:
        """Generate batch processing metrics summary.

        Returns:
            BatchMetricsSummary with aggregated statistics.

        References:
            LOG-012: Batch Job Performance Metrics
        """
        with self._lock:
            # Snapshot ALL shared state under the lock (including the batch
            # start/end timestamps) so concurrent record_file()/finish()
            # calls cannot produce a torn view of the metrics.
            files = list(self._files)
            error_types = dict(self._error_types)
            start_ts = self._start_time
            end_ts = self._end_time

        # Count by status
        processed_count = sum(1 for f in files if f.status == "success")
        error_count = sum(1 for f in files if f.status == "error")
        skip_count = sum(1 for f in files if f.status == "skipped")
        total_files = len(files)

        # Collect durations for successful files
        durations = [f.duration for f in files if f.status == "success" and f.duration > 0]
        timing = self._compute_timing(durations, start_ts, end_ts)

        # Calculate throughput stats over successful files only
        total_samples = sum(f.samples for f in files if f.status == "success")
        total_measurements = sum(f.measurements for f in files if f.status == "success")

        if timing.total_duration > 0:
            throughput = ThroughputStats(
                files_per_second=processed_count / timing.total_duration,
                samples_per_second=total_samples / timing.total_duration,
                measurements_per_second=total_measurements / timing.total_duration,
            )
        else:
            throughput = ThroughputStats()

        # Error breakdown
        errors = ErrorBreakdown(
            by_type=error_types,
            total=error_count,
            rate=error_count / total_files if total_files > 0 else 0.0,
        )

        # Human-readable batch timestamps ("" when never started/finished)
        start_time = (
            format_timestamp(datetime.fromtimestamp(start_ts, tz=UTC)) if start_ts else ""
        )
        end_time = format_timestamp(datetime.fromtimestamp(end_ts, tz=UTC)) if end_ts else ""

        return BatchMetricsSummary(
            batch_id=self.batch_id,
            total_files=total_files,
            processed_count=processed_count,
            error_count=error_count,
            skip_count=skip_count,
            timing=timing,
            throughput=throughput,
            errors=errors,
            start_time=start_time,
            end_time=end_time,
        )

    @staticmethod
    def _compute_timing(
        durations: list[float],
        start_ts: float | None,
        end_ts: float | None,
    ) -> TimingStats:
        """Build TimingStats from per-file durations and batch wall-clock bounds."""
        # Prefer the wall-clock span when both endpoints were recorded;
        # otherwise fall back to the sum of per-file durations (a serial
        # approximation), which is 0.0 when there are no durations.
        total = end_ts - start_ts if start_ts and end_ts else sum(durations)
        if not durations:
            return TimingStats(total_duration=total)
        return TimingStats(
            total_duration=total,
            average_per_file=statistics.mean(durations),
            min_per_file=min(durations),
            max_per_file=max(durations),
            median_per_file=statistics.median(durations),
            stddev_per_file=statistics.stdev(durations) if len(durations) > 1 else 0.0,
        )

    def get_file_metrics(self) -> list[dict[str, Any]]:
        """Get metrics for all files.

        Returns:
            List of file metric dictionaries.
        """
        with self._lock:
            return [f.to_dict() for f in self._files]

    def export_json(self, path: str | Path) -> None:
        """Export metrics to JSON file.

        Args:
            path: Output file path.

        References:
            LOG-012: Export batch metrics as JSON
        """
        path = Path(path)
        summary = self.summary()

        output = {
            "summary": summary.to_dict(),
            "files": self.get_file_metrics(),
        }

        # JSON interchange must be UTF-8; never rely on the platform default.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, default=str)

        logger.info("Batch metrics exported to %s", path)

    def export_csv(self, path: str | Path) -> None:
        """Export per-file metrics to CSV file.

        Args:
            path: Output file path.

        References:
            LOG-012: Export batch metrics as CSV
        """
        path = Path(path)
        files = self.get_file_metrics()

        if not files:
            logger.warning("No file metrics to export")
            return

        # All rows share the same schema; take the header from the first row.
        fieldnames = list(files[0].keys())

        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(files)

        logger.info("Batch metrics CSV exported to %s", path)

522 

523 

def get_batch_stats(batch_id: str, metrics: BatchMetrics) -> dict[str, Any]:
    """Get statistics for a batch job.

    CLI command implementation: tracekit batch stats <batch_id>

    Args:
        batch_id: Batch job identifier.
        metrics: BatchMetrics instance.

    Returns:
        Dictionary with batch statistics.

    Raises:
        ValueError: If batch ID does not match the metrics instance.

    References:
        LOG-012: CLI command requirement
    """
    # Guard clause: the supplied metrics must belong to the requested batch.
    if batch_id != metrics.batch_id:
        raise ValueError(f"Batch ID mismatch: expected {batch_id}, got {metrics.batch_id}")

    summary = metrics.summary()
    return summary.to_dict()

546 

547 

# Public API of tracekit.batch.metrics (LOG-012), sorted alphabetically.
__all__ = [
    "BatchMetrics",
    "BatchMetricsSummary",
    "ErrorBreakdown",
    "FileMetrics",
    "ThroughputStats",
    "TimingStats",
    "get_batch_stats",
]