Coverage for src / tracekit / reporting / batch.py: 91%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Batch report generation for TraceKit. 

2 

3This module provides utilities for generating reports across multiple DUTs 

4or files with summary reports and yield analysis. 

5 

6 

7Example: 

8 >>> from tracekit.reporting.batch import batch_report, generate_batch_report 

9 >>> # Process multiple files 

10 >>> results = batch_report(['dut1.wfm', 'dut2.wfm'], template='production', output_dir='reports/') 

11 >>> # Or generate from pre-computed results 

12 >>> report = generate_batch_report(batch_results, "batch_summary.pdf") 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Any 

20 

21import numpy as np 

22 

23from tracekit.reporting.core import Report, ReportConfig, Section 

24from tracekit.reporting.tables import format_batch_summary_table 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Callable 

28 

29 from numpy.typing import NDArray 

30 

31logger = logging.getLogger(__name__) 

32 

33__all__ = [ 

34 "BatchReportResult", 

35 "aggregate_batch_measurements", 

36 "batch_report", 

37 "generate_batch_report", 

38] 

39 

40 

class BatchReportResult:
    """Outcome of a batch report run.

    Attributes:
        summary_report_path: Location of the summary report, or ``None``
            when no summary has been recorded.
        individual_report_paths: Locations of the per-DUT reports.
        total_duts: Count of DUTs processed.
        passed_duts: Count of DUTs that passed.
        failed_duts: Count of DUTs that failed.
        errors: Processing errors as ``(identifier, message)`` pairs.

    References:
        RPT-003: Batch Report Generation
    """

    def __init__(self) -> None:
        """Start with no paths, zeroed counters and an empty error list."""
        # Report locations.
        self.summary_report_path: Path | None = None
        self.individual_report_paths: list[Path] = []
        # DUT counters.
        self.total_duts: int = 0
        self.passed_duts: int = 0
        self.failed_duts: int = 0
        # Collected failures.
        self.errors: list[tuple[str, str]] = []

    @property
    def dut_yield(self) -> float:
        """Percentage of processed DUTs that passed; 0.0 when none were processed."""
        if self.total_duts != 0:
            return (self.passed_duts / self.total_duts) * 100
        return 0.0

71 

72 

def batch_report(
    files: list[str | Path],
    template: str = "production",
    output_dir: str | Path = "reports",
    *,
    analyzer: Callable[[Any], dict[str, Any]] | None = None,
    generate_individual: bool = True,
    generate_summary: bool = True,
    output_format: str = "pdf",
    file_pattern: str = "{dut_id}_report.{ext}",
    summary_filename: str = "batch_summary.{ext}",
    dut_id_extractor: Callable[[Path], str] | None = None,
) -> BatchReportResult:
    """Load, analyze and report on a list of DUT files.

    This is the primary entry point for batch report generation. Each file
    is loaded, analyzed, and optionally rendered to its own report; a summary
    report across all successfully analyzed DUTs can be written afterwards.

    A file whose loading or analysis raises is logged, recorded in
    ``errors`` and left out of the DUT counters. A DUT counts as passed when
    its ``pass_count`` equals its ``total_count`` (both default to 0).

    Args:
        files: Input file paths.
        template: Report template name (default: 'production'). If loading
            it raises ``ValueError`` the 'default' template is loaded instead.
        output_dir: Directory for generated reports; created if missing.
        analyzer: Optional analysis function (trace -> results dict). The
            built-in default analysis is used when omitted.
        generate_individual: Write one report per DUT.
        generate_summary: Write a summary report across all DUTs.
        output_format: Output format ('pdf', 'html').
        file_pattern: Filename pattern for individual reports; formatted
            with ``dut_id`` and ``ext``.
        summary_filename: Filename pattern for the summary report; formatted
            with ``ext``.
        dut_id_extractor: Function mapping a file path to a DUT ID. Defaults
            to the file name without its suffix.

    Returns:
        BatchReportResult with report paths, DUT counters and errors.

    Example:
        >>> result = batch_report(
        ...     files=['dut1.wfm', 'dut2.wfm', 'dut3.wfm'],
        ...     template='production',
        ...     output_dir='./reports'
        ... )
        >>> print(f"Yield: {result.dut_yield:.1f}%")
        >>> print(f"Summary: {result.summary_report_path}")

    References:
        RPT-003: Batch Report Generation
    """
    import tracekit as tk
    from tracekit.reporting.template_system import load_template

    reports_dir = Path(output_dir)
    reports_dir.mkdir(parents=True, exist_ok=True)

    outcome = BatchReportResult()
    collected: list[dict[str, Any]] = []

    # Resolve the template, falling back to 'default' for unknown names.
    try:
        report_template = load_template(template)
    except ValueError:
        logger.warning(f"Template '{template}' not found, using 'default'")
        report_template = load_template("default")

    def _stem_of(p: Path) -> str:
        return Path(p).stem

    extract_id = _stem_of if dut_id_extractor is None else dut_id_extractor
    analyze = _default_analysis if analyzer is None else analyzer

    for entry in files:
        source = Path(entry)
        dut_id = extract_id(source)

        try:
            dut_result = analyze(tk.load(str(source)))

            # Tag the result with where it came from.
            dut_result["dut_id"] = dut_id
            dut_result["source_file"] = str(source)
            collected.append(dut_result)

            all_passed = dut_result.get("pass_count", 0) == dut_result.get("total_count", 0)
            if all_passed:
                outcome.passed_duts += 1
            else:
                outcome.failed_duts += 1
            outcome.total_duts += 1

            if not generate_individual:
                continue

            individual_path = reports_dir / file_pattern.format(
                dut_id=dut_id, ext=output_format.lower()
            )

            # A failed individual report must not discard the DUT's results.
            try:
                _generate_individual_report(
                    dut_result, individual_path, report_template, output_format
                )
            except Exception as e:
                logger.error(f"Failed to generate report for {dut_id}: {e}")
                outcome.errors.append((dut_id, str(e)))
            else:
                outcome.individual_report_paths.append(individual_path)

        except Exception as e:
            logger.error(f"Failed to process {source}: {e}")
            outcome.errors.append((str(source), str(e)))

    # The summary is only written when at least one DUT was analyzed.
    if generate_summary and collected:
        summary_path = reports_dir / summary_filename.format(ext=output_format.lower())

        try:
            _save_report(generate_batch_report(collected), summary_path, output_format)
            outcome.summary_report_path = summary_path
        except Exception as e:
            logger.error(f"Failed to generate summary report: {e}")
            outcome.errors.append(("summary", str(e)))

    return outcome

202 

203 

def _default_analysis(trace: Any) -> dict[str, Any]:
    """Compute the built-in measurement set for a trace.

    Args:
        trace: Loaded trace object. Only ``WaveformTrace`` instances are
            measured.

    Returns:
        Dict with ``measurements``, ``pass_count`` and ``total_count``. For a
        waveform trace the measurements are peak-to-peak, RMS, mean and
        standard deviation of ``trace.data``, each reported in "V" and marked
        as passed. For any other trace type the measurements are empty and
        both counts are 0.
    """
    import numpy as np

    from tracekit.core.types import WaveformTrace

    if isinstance(trace, WaveformTrace):
        samples = trace.data

        # NOTE(review): samples**2 is evaluated in the dtype of trace.data; if
        # that can be a narrow integer type the square may wrap — confirm the
        # loader always yields floating-point data.
        computed = {
            "peak_to_peak": float(np.ptp(samples)),
            "rms": float(np.sqrt(np.mean(samples**2))),
            "mean": float(np.mean(samples)),
            "std_dev": float(np.std(samples)),
        }

        # No limits are applied here, so every measurement is marked passed.
        measurements = {
            name: {"value": value, "unit": "V", "passed": True}
            for name, value in computed.items()
        }
        return {
            "measurements": measurements,
            "pass_count": len(measurements),
            "total_count": len(measurements),
        }

    return {
        "measurements": {},
        "pass_count": 0,
        "total_count": 0,
    }

248 

249 

def _generate_individual_report(
    dut_result: dict[str, Any],
    output_path: Path,
    template: Any,
    output_format: str,
) -> None:
    """Build and save the report for a single DUT.

    The report has a "Summary" section (DUT ID, source file, pass rate) and,
    when the result has a ``measurements`` entry, a "Measurements" table.

    Args:
        dut_result: Result dictionary for the DUT.
        output_path: Destination file for the report.
        template: Loaded report template. NOTE(review): accepted but not
            used anywhere in this function — confirm whether it should
            influence the report layout.
        output_format: Output format, passed on to the save step.
    """
    from tracekit.reporting.core import Report, ReportConfig

    dut_id = dut_result.get("dut_id", "Unknown")
    report = Report(config=ReportConfig(title=f"Test Report: {dut_id}"))

    pass_count = dut_result.get("pass_count", 0)
    total_count = dut_result.get("total_count", 0)
    if total_count > 0:
        pass_rate = pass_count / total_count * 100
    else:
        # No tests recorded: report 0% rather than dividing by zero.
        pass_rate = 0

    summary_lines = [
        f"DUT: {dut_id}",
        f"Source: {dut_result.get('source_file', 'N/A')}",
        f"Pass Rate: {pass_count}/{total_count} ({pass_rate:.1f}%)",
    ]
    report.add_section("Summary", "\n".join(summary_lines), level=1)

    if "measurements" in dut_result:
        from tracekit.reporting.tables import create_measurement_table

        measurement_table = create_measurement_table(dut_result["measurements"], format="dict")
        report.add_section("Measurements", [measurement_table], level=1)

    _save_report(report, output_path, output_format)

282 

283 

def _save_report(report: Report, output_path: Path, output_format: str) -> None:
    """Write a report to disk using the writer for the requested format.

    Args:
        report: Report to save.
        output_path: Destination file; passed to the writer as a string.
        output_format: 'pdf' or 'html', matched case-insensitively.

    Raises:
        ValueError: If the format is neither 'pdf' nor 'html'.
    """
    fmt = output_format.lower()

    if fmt == "pdf":
        from tracekit.reporting.pdf import save_pdf_report

        save_pdf_report(report, str(output_path))
        return

    if fmt == "html":
        from tracekit.reporting.html import save_html_report

        save_html_report(report, str(output_path))
        return

    # The message echoes the caller's original spelling, not the lowered one.
    raise ValueError(f"Unsupported output format: {output_format}")

296 

297 

def generate_batch_report(
    batch_results: list[dict[str, Any]],
    *,
    title: str = "Batch Test Summary Report",
    include_individual: bool = True,
    include_yield_analysis: bool = True,
    include_outliers: bool = True,
    **kwargs: Any,
) -> Report:
    """Assemble the summary report for a batch of DUT results.

    Sections are added in this order: "Batch Summary", "DUT Summary Table",
    yield analysis (optional), batch statistics (always), outlier detection
    (optional), then one section per DUT (optional).

    Args:
        batch_results: List of result dictionaries, one per DUT.
        title: Report title.
        include_individual: Include individual DUT sections.
        include_yield_analysis: Include yield analysis section.
        include_outliers: Include outlier detection section.
        **kwargs: Additional options forwarded to ``ReportConfig``.

    Returns:
        Batch Report object.

    References:
        REPORT-009, REPORT-018
    """
    report = Report(config=ReportConfig(title=title, **kwargs))

    # Headline text followed by the per-DUT overview table.
    report.add_section("Batch Summary", _generate_batch_summary(batch_results), level=1)
    overview_table = format_batch_summary_table(batch_results, format="dict")
    report.add_section("DUT Summary Table", [overview_table], level=1)

    # Pre-built sections, collected in their final order.
    extra_sections: list[Section] = []
    if include_yield_analysis:
        extra_sections.append(_create_yield_analysis_section(batch_results))
    extra_sections.append(_create_batch_statistics_section(batch_results))
    if include_outliers:
        extra_sections.append(_create_outlier_detection_section(batch_results))
    if include_individual:
        for position, dut_result in enumerate(batch_results):
            extra_sections.append(_create_dut_section(dut_result, position))

    for section in extra_sections:
        report.sections.append(section)

    return report

355 

356 

def _generate_batch_summary(batch_results: list[dict[str, Any]]) -> str:
    """Build the plain-text headline summary for a batch.

    The text states the number of DUTs, then (when any tests were recorded)
    the overall test pass rate, then (when there is at least one DUT) the
    DUT-level yield, and finally the IDs of the failed DUTs, if any.

    A DUT is treated as failed whenever its ``pass_count`` differs from its
    ``total_count`` (both default to 0). The yield figure and the failed-DUT
    list are derived from that single rule so they cannot disagree.

    Args:
        batch_results: List of result dictionaries, one per DUT.

    Returns:
        Summary paragraphs joined by newlines; each paragraph after the first
        starts with a newline, so paragraphs are separated by a blank line.
    """
    total_duts = len(batch_results)
    total_tests = sum(r.get("total_count", 0) for r in batch_results)
    total_passed = sum(r.get("pass_count", 0) for r in batch_results)

    # IDs are stringified so numeric DUT identifiers can be joined below.
    failed_duts = [
        str(r.get("dut_id", f"DUT-{i + 1}"))
        for i, r in enumerate(batch_results)
        if r.get("pass_count", 0) != r.get("total_count", 0)
    ]
    passing_duts = total_duts - len(failed_duts)

    summary_parts = [f"Tested {total_duts} DUT(s)."]

    if total_tests > 0:
        pass_rate = total_passed / total_tests * 100
        summary_parts.append(
            f"\nOverall: {total_passed}/{total_tests} tests passed ({pass_rate:.1f}% pass rate)."
        )

    if total_duts > 0:
        dut_yield = passing_duts / total_duts * 100
        summary_parts.append(
            f"\nDUT Yield: {passing_duts}/{total_duts} DUTs passed all tests "
            f"({dut_yield:.1f}% yield)."
        )

    if failed_duts:
        summary_parts.append(f"\nFailed DUTs: {', '.join(failed_duts)}")

    return "\n".join(summary_parts)

397 

398 

def _create_yield_analysis_section(batch_results: list[dict[str, Any]]) -> Section:
    """Build the "Yield Analysis" section.

    The section text gives the overall DUT yield (a DUT passes when its
    ``pass_count`` equals its ``total_count``), followed by the yield of each
    individual test across the DUTs that report it. Tests are listed worst
    yield first; tests with equal yield stay in alphabetical order. A
    measurement without a ``passed`` flag is counted as passed.

    Args:
        batch_results: List of result dictionaries, one per DUT.

    Returns:
        A level-1 Section whose content is the yield text.
    """
    total_duts = len(batch_results)
    passing_duts = sum(
        1 for r in batch_results if r.get("pass_count", 0) == r.get("total_count", 0)
    )
    overall_yield = (passing_duts / total_duts * 100) if total_duts > 0 else 0

    lines = [
        f"**Overall Yield:** {overall_yield:.2f}%",
        f"**Passing DUTs:** {passing_duts}/{total_duts}",
        "\n**Per-Test Yield:**",
    ]

    # test name -> [passed, seen], tallied in one pass over the batch.
    tallies: dict[str, list[int]] = {}
    for result in batch_results:
        if "measurements" not in result:
            continue
        for test_name, meas in result["measurements"].items():
            tally = tallies.setdefault(test_name, [0, 0])
            tally[1] += 1
            if meas.get("passed", True):
                tally[0] += 1

    # Alphabetical first, then a stable sort by yield so ties keep that order.
    test_yields: list[tuple[str, float, int, int]] = []
    for test_name in sorted(tallies):
        passed, seen = tallies[test_name]
        test_yields.append((test_name, passed / seen * 100, passed, seen))
    test_yields.sort(key=lambda row: row[1])

    for test_name, yield_pct, passed, seen in test_yields:
        lines.append(f"- {test_name}: {yield_pct:.1f}% ({passed}/{seen})")

    return Section(
        title="Yield Analysis",
        content="\n".join(lines),
        level=1,
        visible=True,
    )

452 

453 

def _create_batch_statistics_section(batch_results: list[dict[str, Any]]) -> Section:
    """Build the "Batch Statistics" section.

    For every parameter that has at least one non-``None`` value in the
    batch, one table row gives its mean, standard deviation, minimum, maximum
    and range across DUTs. Values are formatted with the unit of the first
    DUT that reports the parameter (empty string if it has none).

    Args:
        batch_results: List of result dictionaries, one per DUT.

    Returns:
        A level-1 Section whose content is a single table dictionary.
    """
    from tracekit.reporting.formatting import NumberFormatter

    formatter = NumberFormatter()

    # One pass collects both the values and the first unit seen per parameter.
    param_values: dict[str, list[float]] = {}
    first_unit: dict[str, str] = {}
    for result in batch_results:
        if "measurements" not in result:
            continue
        for param, meas in result["measurements"].items():
            if param not in first_unit:
                first_unit[param] = meas.get("unit", "")
            value = meas.get("value")
            if value is not None:
                param_values.setdefault(param, []).append(value)

    rows = []
    for param in sorted(param_values):
        values = np.array(param_values[param])
        unit = first_unit[param]
        rows.append(
            [
                param,
                formatter.format(float(np.mean(values)), unit),
                formatter.format(float(np.std(values)), unit),
                formatter.format(float(np.min(values)), unit),
                formatter.format(float(np.max(values)), unit),
                formatter.format(float(np.max(values) - np.min(values)), unit),
            ]
        )

    table = {
        "type": "table",
        "headers": ["Parameter", "Mean", "Std Dev", "Min", "Max", "Range"],
        "data": rows,
    }

    return Section(
        title="Batch Statistics",
        content=[table],
        level=1,
        visible=True,
    )

505 

506 

def _create_outlier_detection_section(batch_results: list[dict[str, Any]]) -> Section:
    """Build the "Outlier Detection" section.

    For each parameter, values lying more than three standard deviations
    from the batch mean are listed with the DUT ID and z-score. Parameters
    whose values do not vary are skipped. If nothing is flagged, the section
    says so instead.

    NOTE(review): the deviation comes from ``np.std`` with its default
    settings (population standard deviation). With that definition no value
    can exceed a z-score of 3 until a parameter has at least 11 values, so
    small batches will never report outliers — confirm this is intended.

    Args:
        batch_results: List of result dictionaries, one per DUT.

    Returns:
        A level-1 Section whose content is the outlier text.
    """
    # parameter -> [(position of the DUT in the batch, value), ...]
    samples: dict[str, list[tuple[int, float]]] = {}
    for position, result in enumerate(batch_results):
        if "measurements" not in result:
            continue
        for param, meas in result["measurements"].items():
            value = meas.get("value")
            if value is None:
                continue
            samples.setdefault(param, []).append((position, value))

    lines: list[str] = []
    for param in sorted(samples):
        entries = samples[param]
        values = np.array([v for _, v in entries])
        mean = float(np.mean(values))
        std = float(np.std(values))

        # Zero spread means no z-score can be computed.
        if not std > 0:
            continue

        flagged: list[tuple[int, float, float]] = []
        for position, value in entries:
            z_score = abs(value - mean) / std
            if z_score > 3:  # 3-sigma rule
                flagged.append((position, value, z_score))

        if not flagged:
            continue

        lines.append(f"**{param}:**")
        for position, value, z_score in flagged:
            dut_id = batch_results[position].get("dut_id", f"DUT-{position + 1}")
            lines.append(f"- {dut_id}: {value:.3g} (z-score: {z_score:.2f})")

    if not lines:
        lines.append("No statistical outliers detected (3-sigma threshold).")

    return Section(
        title="Outlier Detection",
        content="\n".join(lines),
        level=1,
        visible=True,
    )

558 

559 

def _create_dut_section(result: dict[str, Any], index: int) -> Section:
    """Build the collapsible section for one DUT.

    Args:
        result: Result dictionary for the DUT.
        index: Zero-based position of the DUT in the batch; used to make a
            fallback ID ("DUT-<index + 1>") when the result has no ``dut_id``.

    Returns:
        A level-2, collapsible Section titled with the DUT ID. Its content
        holds a pass-rate line (only when ``total_count`` is positive) and a
        measurement table (only when the result has ``measurements``).
    """
    body: list[Any] = []

    passed = result.get("pass_count", 0)
    total = result.get("total_count", 0)
    if total > 0:
        body.append(f"Pass rate: {passed}/{total} ({passed / total * 100:.1f}%)")

    if "measurements" in result:
        from tracekit.reporting.tables import create_measurement_table

        body.append(create_measurement_table(result["measurements"], format="dict"))

    section_title = f"DUT: {result.get('dut_id', f'DUT-{index + 1}')}"

    return Section(
        title=section_title,
        content=body,
        level=2,
        visible=True,
        collapsible=True,
    )

588 

589 

def aggregate_batch_measurements(
    batch_results: list[dict[str, Any]],
) -> dict[str, NDArray[np.float64]]:
    """Aggregate measurements across batch for statistical analysis.

    Values are gathered per parameter in batch order. Results without a
    ``measurements`` entry and measurements whose ``value`` is ``None`` are
    skipped, so a parameter only appears if it has at least one value.

    Args:
        batch_results: List of DUT results.

    Returns:
        Dictionary mapping parameter name to a float64 array of values.
        Integer-valued measurements are converted to float64 so the arrays
        always match the declared return type.

    References:
        REPORT-009
    """
    param_values: dict[str, list[float]] = {}

    for result in batch_results:
        if "measurements" not in result:
            continue
        for param, meas in result["measurements"].items():
            value = meas.get("value")
            if value is not None:
                param_values.setdefault(param, []).append(value)

    # An explicit dtype stops all-integer inputs from yielding integer arrays.
    return {
        param: np.asarray(values, dtype=np.float64)
        for param, values in param_values.items()
    }