Coverage for src/tracekit/analyzers/packet/daq.py: 94% (345 statements). Generated by coverage.py v7.13.1, 2026-01-11 23:04 +0000.

1"""DAQ error-tolerant analysis module. 

2 

3This module provides error-tolerant DAQ analysis features including fuzzy 

4pattern matching, error recovery, bit error characterization, and gap detection. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.packet.daq import fuzzy_pattern_search, detect_gaps 

9 >>> matches = fuzzy_pattern_search(data, pattern=0xAA55, max_errors=2) 

10 >>> for match in matches: 

11 ... print(f"Found at {match.offset}, errors: {match.bit_errors}") 

12""" 

13 

14from __future__ import annotations 

15 

16from dataclasses import dataclass, field 

17from enum import Enum 

18from typing import TYPE_CHECKING, Any 

19 

20import numpy as np 

21 

22if TYPE_CHECKING: 

23 from numpy.typing import NDArray 

24 

25 from tracekit.core.types import WaveformTrace 

26 



class ErrorPattern(Enum):
    """Bit error pattern types."""

    RANDOM = "random"  # Uniformly distributed errors (noise)
    BURST = "burst"  # Clustered errors (interference)
    SYSTEMATIC = "systematic"  # Regular pattern (clock issues)
    SINGLE_BIT = "single_bit"  # Isolated single-bit errors


@dataclass
class FuzzyMatch:
    """Result of fuzzy pattern search.

    Attributes:
        offset: Bit offset where pattern was found
        matched_bits: Actual bits at this location
        bit_errors: Number of bit errors (Hamming distance)
        error_positions: Bit positions with errors
        confidence: Match confidence (0-1)
    """

    offset: int
    matched_bits: int
    bit_errors: int
    error_positions: list[int] = field(default_factory=list)
    confidence: float = 1.0

    @property
    def is_exact(self) -> bool:
57 """Return True if exact match (no errors).""" 

        return self.bit_errors == 0


@dataclass
class PacketRecoveryResult:
    """Result of error-tolerant packet parsing.

    Attributes:
        packets: Successfully parsed packets
        recovered_packets: Packets recovered despite errors
        failed_regions: Regions that could not be parsed
        total_errors: Total bit errors encountered
        sync_resync_count: Number of resynchronizations
    """

    packets: list[dict[str, Any]] = field(default_factory=list)
    recovered_packets: list[dict[str, Any]] = field(default_factory=list)
    failed_regions: list[tuple[int, int]] = field(default_factory=list)
    total_errors: int = 0
    sync_resync_count: int = 0


@dataclass
class JitterCompensationResult:
    """Result of timestamp jitter compensation.

    Attributes:
        original_timestamps: Original timestamps
        corrected_timestamps: Jitter-compensated timestamps
        jitter_removed_ns: RMS jitter removed in nanoseconds
        clock_drift_ppm: Estimated clock drift in ppm
        correction_method: Method used for correction
    """

    original_timestamps: NDArray[np.float64]
    corrected_timestamps: NDArray[np.float64]
    jitter_removed_ns: float
    clock_drift_ppm: float
    correction_method: str


@dataclass
class BitErrorAnalysis:
    """Bit error pattern analysis result.

    Attributes:
        error_rate: Overall bit error rate
        error_pattern: Classified error pattern type
        burst_length_mean: Mean burst length (for burst errors)
        burst_length_max: Maximum burst length
        error_distribution: Error count by bit position (LSB to MSB)
        probable_cause: Inferred probable cause
        recommendations: Suggested fixes
    """

    error_rate: float
    error_pattern: ErrorPattern
    burst_length_mean: float = 0.0
    burst_length_max: int = 0
    error_distribution: list[int] = field(default_factory=list)
    probable_cause: str = ""
    recommendations: list[str] = field(default_factory=list)


# =============================================================================
# DAQ Gap Detection (PKT-008)
# =============================================================================


@dataclass
class DAQGap:
    """Represents a detected gap in DAQ data.

    Attributes:
        start_index: Sample index where gap starts
        end_index: Sample index where gap ends
        start_time: Time when gap starts (seconds)
        end_time: Time when gap ends (seconds)
        duration: Gap duration in seconds
        expected_samples: Number of samples that should be present
        missing_samples: Estimated number of missing samples
        gap_type: Type of gap ('timestamp', 'sample_count', 'discontinuity')

    References:
        PKT-008: DAQ Gap Detection
    """

    start_index: int
    end_index: int
    start_time: float
    end_time: float
    duration: float
    expected_samples: int
    missing_samples: int
    gap_type: str = "timestamp"


@dataclass
class DAQGapAnalysis:
    """Complete gap analysis result.

    Attributes:
        gaps: List of detected gaps
        total_gaps: Total number of gaps found
        total_missing_samples: Total estimated missing samples
        total_gap_duration: Total gap duration in seconds
        acquisition_efficiency: Ratio of captured samples to expected
        sample_rate: Detected or specified sample rate
        discontinuities: List of data discontinuity indices
        metadata: Additional analysis metadata

    References:
        PKT-008: DAQ Gap Detection
    """

    gaps: list[DAQGap]
    total_gaps: int
    total_missing_samples: int
    total_gap_duration: float
    acquisition_efficiency: float
    sample_rate: float
    discontinuities: list[int]
    metadata: dict[str, Any] = field(default_factory=dict)


def detect_gaps(
    trace: WaveformTrace,
    *,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
) -> DAQGapAnalysis:
    """Detect gaps in DAQ data stream.

    Identifies missing samples based on expected sample interval
    and timestamp analysis.

    Args:
        trace: Waveform trace to analyze
        expected_interval: Expected time between samples (None = auto-detect)
        tolerance: Tolerance for interval deviation (0.1 = 10%)
        min_gap_samples: Minimum number of missing samples to report

    Returns:
        DAQGapAnalysis with detected gaps

    Example:
        >>> trace = tk.load('acquisition.wfm')
        >>> result = detect_gaps(trace)
        >>> for gap in result.gaps:
        ...     print(f"Gap at {gap.start_time:.6f}s: {gap.missing_samples} samples")

    References:
        PKT-008: DAQ Gap Detection
    """
    data = trace.data
    sample_rate = trace.metadata.sample_rate

    # Calculate expected interval
    if expected_interval is None:
        expected_interval = 1.0 / sample_rate

    return detect_gaps_by_samples(
        data,
        sample_rate=sample_rate,
        expected_interval=expected_interval,
        tolerance=tolerance,
        min_gap_samples=min_gap_samples,
    )


def detect_gaps_by_timestamps(
    timestamps: NDArray[np.float64],
    *,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
) -> DAQGapAnalysis:
    """Detect gaps using explicit timestamps.

    Args:
        timestamps: Array of sample timestamps in seconds
        expected_interval: Expected interval between samples
        tolerance: Tolerance for interval deviation
        min_gap_samples: Minimum missing samples to report

    Returns:
        DAQGapAnalysis with detected gaps

    Example:
        >>> timestamps = np.array([0.0, 1e-6, 2e-6, 5e-6, 6e-6])  # Gap at 2-5us
        >>> result = detect_gaps_by_timestamps(timestamps)

    References:
        PKT-008: DAQ Gap Detection
    """
    if len(timestamps) < 2:
        return DAQGapAnalysis(
            gaps=[],
            total_gaps=0,
            total_missing_samples=0,
            total_gap_duration=0.0,
            acquisition_efficiency=1.0,
            sample_rate=0.0,
            discontinuities=[],
        )

    # Calculate intervals
    intervals = np.diff(timestamps)

    # Auto-detect expected interval if not provided
    if expected_interval is None:
        expected_interval = float(np.median(intervals))

    sample_rate = 1.0 / expected_interval

    # Calculate allowed deviation
    max_interval = expected_interval * (1 + tolerance)

    # Find gaps
    gaps: list[DAQGap] = []
    discontinuities: list[int] = []
    total_missing = 0
    total_gap_duration = 0.0

    for i, interval in enumerate(intervals):
        if interval > max_interval:
            # Calculate missing samples
            missing = round(interval / expected_interval) - 1

            if missing >= min_gap_samples:
                gap = DAQGap(
                    start_index=i,
                    end_index=i + 1,
                    start_time=float(timestamps[i]),
                    end_time=float(timestamps[i + 1]),
                    duration=float(interval - expected_interval),
                    expected_samples=missing + 1,
                    missing_samples=missing,
                    gap_type="timestamp",
                )
                gaps.append(gap)
                total_missing += missing
                total_gap_duration += gap.duration
                discontinuities.append(i)

    # Calculate efficiency
    total_expected = len(timestamps) + total_missing
    efficiency = len(timestamps) / total_expected if total_expected > 0 else 1.0

    return DAQGapAnalysis(
        gaps=gaps,
        total_gaps=len(gaps),
        total_missing_samples=total_missing,
        total_gap_duration=total_gap_duration,
        acquisition_efficiency=efficiency,
        sample_rate=sample_rate,
        discontinuities=discontinuities,
        metadata={
            "method": "timestamp",
            "expected_interval": expected_interval,
            "tolerance": tolerance,
        },
    )


def detect_gaps_by_samples(
    data: NDArray[np.float64],
    *,
    sample_rate: float,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
    check_discontinuities: bool = True,
) -> DAQGapAnalysis:
    """Detect gaps using sample count analysis.

    Analyzes data for discontinuities that may indicate gaps.
    Uses derivative analysis to find sudden jumps.

    Args:
        data: Sample data array
        sample_rate: Sample rate in Hz
        expected_interval: Expected interval (None = 1/sample_rate)
        tolerance: Tolerance for detection
        min_gap_samples: Minimum gap size to report
        check_discontinuities: Check for value discontinuities

    Returns:
        DAQGapAnalysis with detected gaps
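
    Example:
        A minimal sketch with synthetic data (the ramp and jump values are
        illustrative only):

        >>> data = np.concatenate([np.arange(100.0), np.arange(150.0, 250.0)])
        >>> result = detect_gaps_by_samples(data, sample_rate=1e6)
        >>> result.total_gaps
        1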

    References:
        PKT-008: DAQ Gap Detection
    """
    if len(data) < 2:
        return DAQGapAnalysis(
            gaps=[],
            total_gaps=0,
            total_missing_samples=0,
            total_gap_duration=0.0,
            acquisition_efficiency=1.0,
            sample_rate=sample_rate,
            discontinuities=[],
        )

    if expected_interval is None:
        expected_interval = 1.0 / sample_rate

    gaps: list[DAQGap] = []
    discontinuities: list[int] = []

    if check_discontinuities:
        # Analyze for sudden value jumps (potential gaps)
        diff = np.abs(np.diff(data))
        median_diff = float(np.median(diff))
        std_diff = float(np.std(diff))

        # Threshold for discontinuity
        threshold = median_diff + 5 * std_diff
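        # Five standard deviations above the median step size: a heuristic
        # cut so that only gross jumps are flagged as discontinuities.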

        # Find discontinuity points
        disc_mask = diff > threshold
        disc_indices = np.where(disc_mask)[0]

        for idx in disc_indices:
            # Estimate gap size based on value jump
            jump_size = diff[idx]

            # Assume linear trend, estimate missing samples
            if median_diff > 0:
                estimated_missing = max(1, int(jump_size / median_diff) - 1)
            else:
                estimated_missing = min_gap_samples

            if estimated_missing >= min_gap_samples:
                start_time = idx / sample_rate
                end_time = (idx + 1) / sample_rate
                gap_duration = estimated_missing * expected_interval

                gap = DAQGap(
                    start_index=int(idx),
                    end_index=int(idx) + 1,
                    start_time=start_time,
                    end_time=end_time,
                    duration=gap_duration,
                    expected_samples=estimated_missing + 1,
                    missing_samples=estimated_missing,
                    gap_type="discontinuity",
                )
                gaps.append(gap)
                discontinuities.append(int(idx))

    # Calculate totals
    total_missing = sum(g.missing_samples for g in gaps)
    total_gap_duration = sum(g.duration for g in gaps)
    total_expected = len(data) + total_missing
    efficiency = len(data) / total_expected if total_expected > 0 else 1.0

    return DAQGapAnalysis(
        gaps=gaps,
        total_gaps=len(gaps),
        total_missing_samples=total_missing,
        total_gap_duration=total_gap_duration,
        acquisition_efficiency=efficiency,
        sample_rate=sample_rate,
        discontinuities=discontinuities,
        metadata={
            "method": "sample_count",
            "expected_interval": expected_interval,
            "tolerance": tolerance,
            "check_discontinuities": check_discontinuities,
        },
    )


# =============================================================================
# Fuzzy Bit Pattern Search
# =============================================================================


def fuzzy_pattern_search(
    data: bytes | NDArray[np.uint8],
    pattern: int | bytes,
    *,
    pattern_bits: int = 32,
    max_errors: int = 2,
    step: int = 1,
) -> list[FuzzyMatch]:
    """Search for bit patterns with Hamming distance tolerance.

    Fuzzy Bit Pattern Search.

    Finds sync words and patterns even with bit errors (flipped bits).
    Essential for recovering corrupted logic analyzer captures.

    Args:
        data: Binary data to search (bytes or numpy array).
        pattern: Pattern to search for (int or bytes).
        pattern_bits: Number of bits in pattern.
        max_errors: Maximum allowed bit errors (Hamming distance).
        step: Search step in bits.

    Returns:
        List of FuzzyMatch objects for all matches within tolerance.

    Example:
        >>> # Find 0xAA55 sync word with up to 2 bit errors
        >>> data = bytes([0xAA, 0x55, 0x12, 0x34, 0xAB, 0x55])
        >>> matches = fuzzy_pattern_search(data, 0xAA55, pattern_bits=16, max_errors=2)
        >>> print(f"Found {len(matches)} matches")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    if isinstance(pattern, bytes):
        pattern = int.from_bytes(pattern, byteorder="big")

    # Ensure pattern fits in specified bits
    pattern_mask = (1 << pattern_bits) - 1
    pattern = pattern & pattern_mask

    matches: list[FuzzyMatch] = []

    # Convert data to bit array for searching
    total_bits = len(data) * 8

    for bit_offset in range(0, total_bits - pattern_bits + 1, step):
        # Extract bits at this offset
        extracted = _extract_bits(data, bit_offset, pattern_bits)

        # Calculate Hamming distance
        xor = extracted ^ pattern
        bit_errors = xor.bit_count()
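        # Popcount of the XOR word is the Hamming distance between the
        # extracted bits and the pattern (int.bit_count needs Python >= 3.10).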

        if bit_errors <= max_errors:
            # Find error positions
            error_positions = []
            for i in range(pattern_bits):
                if (xor >> i) & 1:
                    error_positions.append(i)

            confidence = 1.0 - (bit_errors / pattern_bits)

            matches.append(
                FuzzyMatch(
                    offset=bit_offset,
                    matched_bits=extracted,
                    bit_errors=bit_errors,
                    error_positions=error_positions,
                    confidence=confidence,
                )
            )

    return matches


def _extract_bits(data: NDArray[np.uint8], bit_offset: int, num_bits: int) -> int:
513 """Extract bits from data array.""" 

    result = 0
    for i in range(num_bits):
        bit_pos = bit_offset + i
        byte_idx = bit_pos // 8
        bit_in_byte = 7 - (bit_pos % 8)  # MSB first

        if byte_idx < len(data) and (data[byte_idx] >> bit_in_byte) & 1:
            result |= 1 << (num_bits - 1 - i)

    return result


# =============================================================================
# Robust Variable-Length Packet Parsing
# =============================================================================


def robust_packet_parse(
    data: bytes | NDArray[np.uint8],
    *,
    sync_pattern: int = 0xAA55,
    sync_bits: int = 16,
    length_offset: int = 2,  # Bytes after sync
    max_packet_length: int = 256,
    error_tolerance: int = 2,
) -> PacketRecoveryResult:
    """Parse variable-length packets with error recovery.

    Robust Variable-Length Packet Parsing.

    Parses packets even when length fields are corrupted by falling
    back to sync word search.

    Args:
        data: Binary data containing packets.
        sync_pattern: Sync word pattern.
        sync_bits: Bits in sync pattern.
        length_offset: Byte offset to length field after sync.
        max_packet_length: Maximum valid packet length.
        error_tolerance: Max bit errors for sync detection.

    Returns:
        PacketRecoveryResult with parsed and recovered packets.

    Example:
        >>> result = robust_packet_parse(data, sync_pattern=0xAA55)
        >>> print(f"Parsed: {len(result.packets)}, Recovered: {len(result.recovered_packets)}")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    result = PacketRecoveryResult()

    # Find all sync patterns (fuzzy)
    sync_matches = fuzzy_pattern_search(
        data, sync_pattern, pattern_bits=sync_bits, max_errors=error_tolerance
    )

    # Sort by offset
    sync_matches.sort(key=lambda m: m.offset)

    i = 0
    while i < len(sync_matches):
        match = sync_matches[i]
        byte_offset = match.offset // 8

        if byte_offset + length_offset >= len(data):
            break

        # Read length field
        length = data[byte_offset + length_offset]

        # Validate length
        if length > max_packet_length or length == 0:
            # Try to find next sync as packet boundary
            if i + 1 < len(sync_matches):
                next_sync_byte = sync_matches[i + 1].offset // 8
                inferred_length = next_sync_byte - byte_offset

                if 0 < inferred_length <= max_packet_length:
                    # Recovered packet with inferred length
                    packet_data = bytes(data[byte_offset : byte_offset + inferred_length])
                    result.recovered_packets.append(
                        {
                            "offset": byte_offset,
                            "length": inferred_length,
                            "data": packet_data,
                            "sync_errors": match.bit_errors,
                            "length_corrupted": True,
                        }
                    )
                    result.total_errors += match.bit_errors
                    result.sync_resync_count += 1
                    i += 1
                    continue
                else:
                    result.failed_regions.append((byte_offset, byte_offset + 10))
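                    # The 10-byte failed-region window above is an arbitrary
                    # marker size, not a protocol constant.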

                    i += 1
                    continue
            else:
                break

        # Valid length - extract packet
        packet_end = byte_offset + length_offset + 1 + length
        if packet_end <= len(data):
            packet_data = bytes(data[byte_offset:packet_end])
            result.packets.append(
                {
                    "offset": byte_offset,
                    "length": length,
                    "data": packet_data,
                    "sync_errors": match.bit_errors,
                }
            )
            result.total_errors += match.bit_errors

        i += 1

    return result


# =============================================================================
# Timestamp Jitter Compensation
# =============================================================================


def compensate_timestamp_jitter(
    timestamps: NDArray[np.float64],
    *,
    expected_rate: float | None = None,
    method: str = "lowpass",
    cutoff_ratio: float = 0.1,
) -> JitterCompensationResult:
    """Compensate timestamp jitter and clock drift.

    Timestamp Jitter Compensation.

    Corrects sample timestamps affected by clock jitter using low-pass
    filtering or a PLL model.

    Args:
        timestamps: Array of timestamps in seconds.
        expected_rate: Expected sample rate (auto-detected if None).
        method: Compensation method ('lowpass', 'pll', 'linear').
        cutoff_ratio: Low-pass filter cutoff as ratio of sample rate.

    Returns:
        JitterCompensationResult with corrected timestamps.

    Raises:
        ValueError: If unknown compensation method specified.

    Example:
        >>> result = compensate_timestamp_jitter(timestamps, expected_rate=1e6)
        >>> print(f"Jitter removed: {result.jitter_removed_ns:.1f} ns")
    """
    from scipy import signal

    n = len(timestamps)
    if n < 2:
        return JitterCompensationResult(
            original_timestamps=timestamps,
            corrected_timestamps=timestamps,
            jitter_removed_ns=0.0,
            clock_drift_ppm=0.0,
            correction_method=method,
        )

    # Calculate inter-sample intervals
    intervals = np.diff(timestamps)

    # Auto-detect expected rate from median interval
    if expected_rate is None:
        expected_interval = np.median(intervals)
        expected_rate = 1.0 / expected_interval
    else:
        expected_interval = 1.0 / expected_rate

    if method == "lowpass":
        # Low-pass filter the intervals to remove high-frequency jitter
        order = 2
        cutoff = cutoff_ratio
        b, a = signal.butter(order, cutoff, btype="low")
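        # Without an fs argument, butter() interprets the cutoff as a fraction
        # of the Nyquist frequency of the (index-domain) interval sequence.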

        # Apply filter
        filtered_intervals = signal.filtfilt(b, a, intervals)

        # Reconstruct timestamps
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]
        corrected[1:] = timestamps[0] + np.cumsum(filtered_intervals)

    elif method == "linear":
        # Simple linear fit (clock drift only)
        indices = np.arange(n)
        coeffs = np.polyfit(indices, timestamps, 1)
        corrected = np.polyval(coeffs, indices)

    elif method == "pll":
        # PLL-based correction (simplified)
        # Track expected vs actual and apply proportional correction
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]

        phase_error = 0.0
        gain = 0.1  # PLL gain
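        # First-order (proportional-only) loop: each corrected timestamp is
        # the expected time plus gain times the measured phase error.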

        for i in range(1, n):
            expected_time = corrected[i - 1] + expected_interval
            actual_time = timestamps[i]

            phase_error = actual_time - expected_time
            correction = gain * phase_error

            corrected[i] = expected_time + correction

    else:
        raise ValueError(f"Unknown method: {method}")

    # Calculate metrics
    original_jitter = np.std(intervals - expected_interval)
    corrected_intervals = np.diff(corrected)
    corrected_jitter = np.std(corrected_intervals - expected_interval)
    jitter_removed = original_jitter - corrected_jitter

    # Estimate clock drift
    total_time = timestamps[-1] - timestamps[0]
    expected_total = (n - 1) * expected_interval
    drift_ratio = (total_time - expected_total) / expected_total
    clock_drift_ppm = drift_ratio * 1e6

    return JitterCompensationResult(
        original_timestamps=timestamps,
        corrected_timestamps=corrected,
        jitter_removed_ns=jitter_removed * 1e9,
        clock_drift_ppm=clock_drift_ppm,
        correction_method=method,
    )


# =============================================================================
# Error-Tolerant Protocol Decoding
# =============================================================================


def error_tolerant_decode(
    data: bytes | NDArray[np.uint8],
    protocol: str,
    *,
    max_errors_per_frame: int = 2,
    resync_on_error: bool = True,
) -> dict[str, Any]:
    """Decode protocol with error tolerance and resynchronization.

    Error-Tolerant Protocol Decoding.

    Continues decoding after framing/parity errors instead of aborting.

    Args:
        data: Binary data to decode.
        protocol: Protocol name ('uart', 'spi', 'i2c').
        max_errors_per_frame: Max errors before skipping frame.
        resync_on_error: Attempt resynchronization on errors.

    Returns:
        Dictionary with decoded frames, errors, and sync info.

    Raises:
        ValueError: If unsupported protocol specified.

    Example:
        >>> result = error_tolerant_decode(data, 'uart', max_errors_per_frame=2)
        >>> print(f"Decoded: {result['frame_count']}, Errors: {result['error_count']}")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    # Protocol-specific decoding with error recovery
    if protocol.lower() == "uart":
        result = _decode_uart_tolerant(data, max_errors_per_frame, resync_on_error)
    elif protocol.lower() == "spi":
        result = _decode_spi_tolerant(data, max_errors_per_frame)
    elif protocol.lower() == "i2c":
        result = _decode_i2c_tolerant(data, max_errors_per_frame, resync_on_error)
    else:
        raise ValueError(f"Unsupported protocol: {protocol}")

    return result


def _decode_uart_tolerant(
    data: NDArray[np.uint8],
    max_errors: int,
    resync: bool,
) -> dict[str, Any]:
    """UART decode with error tolerance."""
    # Simplified UART decoding with error recovery
    frames = []
    errors = []

    # In reality, would properly decode the UART bit stream;
    # here we treat each byte as a frame for demonstration.
    for i, byte in enumerate(data):
        # Check for framing errors (simplified: parity over the whole byte)
        parity_error = (int(byte).bit_count() % 2) != 0  # assumes even parity

        if parity_error:
            errors.append({"offset": i, "type": "parity", "byte": byte})
            if len(errors) > max_errors and resync:
                # Skip to next potential start
                continue
        else:
            frames.append({"offset": i, "data": byte, "valid": True})

    return {
        "protocol": "uart",
        "frames": frames,
        "frame_count": len(frames),
        "error_count": len(errors),
        "resync_count": 0,
        "error_frames": errors,
    }


def _decode_spi_tolerant(
    data: NDArray[np.uint8],
    max_errors: int,
) -> dict[str, Any]:
    """SPI decode with error tolerance."""
    frames = []
    for i, byte in enumerate(data):
        frames.append({"offset": i, "mosi": byte, "miso": 0, "valid": True})

    return {
        "protocol": "spi",
        "frames": frames,
        "frame_count": len(frames),
        "error_count": 0,
        "resync_count": 0,
        "error_frames": [],
    }


def _decode_i2c_tolerant(
    data: NDArray[np.uint8],
    max_errors: int,
    resync: bool,
) -> dict[str, Any]:
    """I2C decode with error tolerance."""
    frames = []
    errors = []

    i = 0
    while i < len(data):
        # Look for start condition marker (simplified)
        if data[i] == 0x00:  # Start marker
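            # Assumed toy framing: a 0x00 marker byte, then an address byte
            # (7-bit address plus R/W bit), then a single data byte.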

            if i + 2 < len(data):
                addr = data[i + 1]
                data_byte = data[i + 2]
                frames.append(
                    {
                        "offset": i,
                        "address": addr >> 1,
                        "read": bool(addr & 1),
                        "data": data_byte,
                        "ack": True,
                    }
                )
                i += 3
            else:
                break
        else:
            errors.append({"offset": i, "type": "no_start"})
            if resync:
                i += 1
            else:
                break

    return {
        "protocol": "i2c",
        "frames": frames,
        "frame_count": len(frames),
        "error_count": len(errors),
        "resync_count": len(errors) if resync else 0,
        "error_frames": errors,
    }


# =============================================================================
# Bit Error Pattern Analysis
# =============================================================================


def analyze_bit_errors(
    expected: bytes | NDArray[np.uint8],
    actual: bytes | NDArray[np.uint8],
) -> BitErrorAnalysis:
    """Analyze bit error patterns for diagnostics.

    Bit Error Pattern Analysis.

    Characterizes bit error patterns to diagnose capture quality issues
    (EMI, USB problems, clock jitter).

    Args:
        expected: Expected data.
        actual: Actual received data.

    Returns:
        BitErrorAnalysis with error characterization.

    Example:
        >>> result = analyze_bit_errors(expected_data, actual_data)
        >>> print(f"Error rate: {result.error_rate:.2e}")
        >>> print(f"Pattern: {result.error_pattern.value}")
        >>> print(f"Cause: {result.probable_cause}")
    """
    if isinstance(expected, bytes):
        expected = np.frombuffer(expected, dtype=np.uint8)
    if isinstance(actual, bytes):
        actual = np.frombuffer(actual, dtype=np.uint8)

    # Truncate both arrays to the common length
    min_len = min(len(expected), len(actual))
    expected = expected[:min_len]
    actual = actual[:min_len]

    # XOR to find differences
    xor = expected ^ actual

    # Count errors per bit position (0-7)
    bit_errors_by_position = [0] * 8
    total_bit_errors = 0
    error_locations = []

    for i, byte in enumerate(xor):
        if byte != 0:
            for bit in range(8):
                if (byte >> bit) & 1:
                    bit_errors_by_position[bit] += 1
                    total_bit_errors += 1
                    error_locations.append(i * 8 + bit)

    total_bits = min_len * 8
    error_rate = total_bit_errors / total_bits if total_bits > 0 else 0

    # Analyze error pattern
    if len(error_locations) < 2:
        pattern = ErrorPattern.SINGLE_BIT
        burst_mean = 0.0
        burst_max = 0
    else:
        # Check for burst pattern
        gaps = np.diff(error_locations)
        mean_gap = np.mean(gaps)
        std_gap = np.std(gaps)

        # Calculate burst lengths
        bursts = []
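        # A burst here is a maximal run of errors whose bit positions are at
        # most 2 apart (adjacent or near-adjacent).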

        current_burst = 1
        for gap in gaps:
            if gap <= 2:  # Adjacent or near-adjacent errors
                current_burst += 1
            else:
                bursts.append(current_burst)
                current_burst = 1
        bursts.append(current_burst)

        burst_mean = float(np.mean(bursts))
        burst_max = int(max(bursts))

        if burst_max > 5:
            pattern = ErrorPattern.BURST
        elif std_gap < mean_gap * 0.3:
            pattern = ErrorPattern.SYSTEMATIC
        else:
            pattern = ErrorPattern.RANDOM

    # Determine probable cause and recommendations
    probable_cause, recommendations = _diagnose_errors(pattern, error_rate, bit_errors_by_position)

    return BitErrorAnalysis(
        error_rate=error_rate,
        error_pattern=pattern,
        burst_length_mean=burst_mean,
        burst_length_max=burst_max,
        error_distribution=bit_errors_by_position,
        probable_cause=probable_cause,
        recommendations=recommendations,
    )


def _diagnose_errors(
    pattern: ErrorPattern,
    error_rate: float,
    bit_distribution: list[int],
) -> tuple[str, list[str]]:
    """Diagnose probable cause of errors."""
    if pattern == ErrorPattern.BURST:
        cause = "Electromagnetic interference (EMI) or USB transmission errors"
        recommendations = [
            "Use shorter cables",
            "Add ferrite beads",
            "Check for nearby interference sources",
            "Try a different USB port or hub",
        ]
    elif pattern == ErrorPattern.SYSTEMATIC:
        cause = "Clock synchronization or sampling issues"
        recommendations = [
            "Verify sample rate is adequate (10x signal rate)",
            "Check for clock jitter on logic analyzer",
            "Ensure proper signal termination",
        ]
    elif pattern == ErrorPattern.RANDOM:
        if error_rate > 0.01:
            cause = "Poor signal quality or threshold issues"
            recommendations = [
                "Adjust voltage threshold",
                "Reduce cable length",
                "Check signal integrity",
            ]
        else:
            cause = "Normal noise level"
            recommendations = ["Error rate is acceptable"]
    else:  # SINGLE_BIT
        # Check bit distribution for systematic bias
        max_bit = max(bit_distribution)
        min_bit = min(bit_distribution)
        if max_bit > 0 and max_bit > 2 * (min_bit + 1):
            biased_bit = bit_distribution.index(max_bit)
            cause = f"Bit {biased_bit} shows higher error rate - possible stuck bit"
            recommendations = [
                f"Check hardware for bit {biased_bit} issues",
                "May indicate logic analyzer channel problem",
            ]
        else:
            cause = "Isolated single-bit error"
            recommendations = ["Likely transient noise, no action needed"]

    return cause, recommendations


__all__ = [
    "BitErrorAnalysis",
    "DAQGap",
    "DAQGapAnalysis",
    "ErrorPattern",
    "FuzzyMatch",
    "JitterCompensationResult",
    "PacketRecoveryResult",
    "analyze_bit_errors",
    "compensate_timestamp_jitter",
    "detect_gaps",
    "detect_gaps_by_samples",
    "detect_gaps_by_timestamps",
    "error_tolerant_decode",
    "fuzzy_pattern_search",
    "robust_packet_parse",
]