Coverage for src / tracekit / analyzers / packet / daq.py: 94%
345 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""DAQ error-tolerant analysis module.
3This module provides error-tolerant DAQ analysis features including fuzzy
4pattern matching, error recovery, bit error characterization, and gap detection.
7Example:
8 >>> from tracekit.analyzers.packet.daq import fuzzy_pattern_search, detect_gaps
9 >>> matches = fuzzy_pattern_search(data, pattern=0xAA55, pattern_bits=16, max_errors=2)
10 >>> for match in matches:
11 ... print(f"Found at {match.offset}, errors: {match.bit_errors}")
12"""
14from __future__ import annotations
16from dataclasses import dataclass, field
17from enum import Enum
18from typing import TYPE_CHECKING, Any
20import numpy as np
22if TYPE_CHECKING:
23 from numpy.typing import NDArray
25 from tracekit.core.types import WaveformTrace
class ErrorPattern(Enum):
    """Categories used to classify how bit errors are distributed."""

    # Errors spread uniformly through the data (noise).
    RANDOM = "random"
    # Errors clustered close together (interference).
    BURST = "burst"
    # Errors recurring in a regular pattern (clock issues).
    SYSTEMATIC = "systematic"
    # Isolated single-bit errors.
    SINGLE_BIT = "single_bit"
@dataclass
class FuzzyMatch:
    """One hit produced by a fuzzy (Hamming-tolerant) pattern search.

    Attributes:
        offset: Bit offset at which the pattern was located.
        matched_bits: The bits actually present at that offset.
        bit_errors: Hamming distance between the pattern and the matched bits.
        error_positions: Bit positions that differ from the pattern.
        confidence: Confidence of the match, in the range 0-1.
    """

    offset: int
    matched_bits: int
    bit_errors: int
    error_positions: list[int] = field(default_factory=list)
    confidence: float = 1.0

    @property
    def is_exact(self) -> bool:
        """Whether the match contains no bit errors at all."""
        return self.bit_errors == 0
@dataclass
class PacketRecoveryResult:
    """Outcome of error-tolerant packet parsing.

    Attributes:
        packets: Packets that were parsed successfully.
        recovered_packets: Packets that were recovered despite errors.
        failed_regions: (start, end) regions that could not be parsed.
        total_errors: Sum of the bit errors encountered.
        sync_resync_count: How many resynchronizations took place.
    """

    packets: list[dict[str, Any]] = field(default_factory=list)
    recovered_packets: list[dict[str, Any]] = field(default_factory=list)
    failed_regions: list[tuple[int, int]] = field(default_factory=list)
    total_errors: int = 0
    sync_resync_count: int = 0
@dataclass
class JitterCompensationResult:
    """Outcome of timestamp jitter compensation.

    Attributes:
        original_timestamps: The timestamps as they were supplied.
        corrected_timestamps: The timestamps after jitter compensation.
        jitter_removed_ns: RMS jitter that was removed, in nanoseconds.
        clock_drift_ppm: Estimated clock drift, in parts per million.
        correction_method: Name of the correction method that was applied.
    """

    original_timestamps: NDArray[np.float64]
    corrected_timestamps: NDArray[np.float64]
    jitter_removed_ns: float
    clock_drift_ppm: float
    correction_method: str
@dataclass
class BitErrorAnalysis:
    """Outcome of a bit error pattern analysis.

    Attributes:
        error_rate: Overall bit error rate.
        error_pattern: The error pattern type the errors were classified as.
        burst_length_mean: Mean burst length (relevant for burst errors).
        burst_length_max: Longest burst observed.
        error_distribution: Error count per bit position, LSB to MSB.
        probable_cause: The inferred probable cause.
        recommendations: Suggested fixes.
    """

    error_rate: float
    error_pattern: ErrorPattern
    burst_length_mean: float = 0.0
    burst_length_max: int = 0
    error_distribution: list[int] = field(default_factory=list)
    probable_cause: str = ""
    recommendations: list[str] = field(default_factory=list)
122# =============================================================================
123# =============================================================================
@dataclass
class DAQGap:
    """A single gap detected in DAQ data.

    Attributes:
        start_index: Sample index at which the gap begins.
        end_index: Sample index at which the gap ends.
        start_time: Time at which the gap begins, in seconds.
        end_time: Time at which the gap ends, in seconds.
        duration: Length of the gap, in seconds.
        expected_samples: How many samples should have been present.
        missing_samples: Estimated count of missing samples.
        gap_type: Kind of gap ('timestamp', 'sample_count', 'discontinuity').

    References:
        PKT-008: DAQ Gap Detection
    """

    start_index: int
    end_index: int
    start_time: float
    end_time: float
    duration: float
    expected_samples: int
    missing_samples: int
    gap_type: str = "timestamp"
@dataclass
class DAQGapAnalysis:
    """Full result of a gap analysis run.

    Attributes:
        gaps: The gaps that were detected.
        total_gaps: Count of gaps found.
        total_missing_samples: Estimated missing samples summed over all gaps.
        total_gap_duration: Gap duration summed over all gaps, in seconds.
        acquisition_efficiency: Captured samples divided by expected samples.
        sample_rate: The detected or caller-specified sample rate.
        discontinuities: Indices at which data discontinuities were seen.
        metadata: Extra information about the analysis.

    References:
        PKT-008: DAQ Gap Detection
    """

    gaps: list[DAQGap]
    total_gaps: int
    total_missing_samples: int
    total_gap_duration: float
    acquisition_efficiency: float
    sample_rate: float
    discontinuities: list[int]
    metadata: dict[str, Any] = field(default_factory=dict)
def detect_gaps(
    trace: WaveformTrace,
    *,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
) -> DAQGapAnalysis:
    """Detect gaps in the DAQ data stream held by a waveform trace.

    Reads the sample data and the sample rate from the trace and hands them
    to ``detect_gaps_by_samples``, which performs the actual analysis.

    Args:
        trace: Waveform trace to analyze; ``trace.data`` and
            ``trace.metadata.sample_rate`` are read.
        expected_interval: Expected time between samples. When None it is
            derived as ``1 / sample_rate``.
        tolerance: Tolerance for interval deviation (0.1 = 10%).
        min_gap_samples: Minimum number of missing samples to report.

    Returns:
        DAQGapAnalysis with detected gaps.

    Example:
        >>> trace = tk.load('acquisition.wfm')
        >>> result = detect_gaps(trace)
        >>> for gap in result.gaps:
        ...     print(f"Gap at {gap.start_time:.6f}s: {gap.missing_samples} samples")

    References:
        PKT-008: DAQ Gap Detection
    """
    samples = trace.data
    rate = trace.metadata.sample_rate

    # Fall back to the nominal sample period when no interval was supplied.
    interval = 1.0 / rate if expected_interval is None else expected_interval

    return detect_gaps_by_samples(
        samples,
        sample_rate=rate,
        expected_interval=interval,
        tolerance=tolerance,
        min_gap_samples=min_gap_samples,
    )
def detect_gaps_by_timestamps(
    timestamps: NDArray[np.float64],
    *,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
) -> DAQGapAnalysis:
    """Detect gaps using explicit timestamps.

    An interval between consecutive timestamps counts as a gap when it
    exceeds ``expected_interval * (1 + tolerance)`` and the rounded number of
    missing samples is at least ``min_gap_samples``.

    Args:
        timestamps: Array of sample timestamps in seconds.
        expected_interval: Expected interval between samples. When None the
            median of the observed intervals is used.
        tolerance: Tolerance for interval deviation (0.1 = 10%).
        min_gap_samples: Minimum missing samples to report.

    Returns:
        DAQGapAnalysis with detected gaps. With fewer than two timestamps, or
        when the expected interval is not a positive number (for example
        because most timestamps are duplicates, so the median interval is
        zero), an analysis without gaps and with ``sample_rate=0.0`` is
        returned.

    Example:
        >>> timestamps = np.array([0.0, 1e-6, 2e-6, 5e-6, 6e-6])  # Gap at 2-5us
        >>> result = detect_gaps_by_timestamps(timestamps)

    References:
        PKT-008: DAQ Gap Detection
    """
    if len(timestamps) < 2:
        return DAQGapAnalysis(
            gaps=[],
            total_gaps=0,
            total_missing_samples=0,
            total_gap_duration=0.0,
            acquisition_efficiency=1.0,
            sample_rate=0.0,
            discontinuities=[],
        )

    intervals = np.diff(timestamps)

    # Auto-detect the expected interval if it was not provided.
    if expected_interval is None:
        expected_interval = float(np.median(intervals))

    metadata = {
        "method": "timestamp",
        "expected_interval": expected_interval,
        "tolerance": tolerance,
    }

    # A zero, negative or NaN interval has no meaningful sample rate and
    # would otherwise divide by zero below; report "nothing detectable".
    if not expected_interval > 0:
        return DAQGapAnalysis(
            gaps=[],
            total_gaps=0,
            total_missing_samples=0,
            total_gap_duration=0.0,
            acquisition_efficiency=1.0,
            sample_rate=0.0,
            discontinuities=[],
            metadata=metadata,
        )

    sample_rate = 1.0 / expected_interval
    max_interval = expected_interval * (1 + tolerance)

    gaps: list[DAQGap] = []
    discontinuities: list[int] = []
    total_missing = 0
    total_gap_duration = 0.0

    # Only intervals above the allowed maximum can be gaps, so visit just
    # those instead of walking every interval in Python.
    for raw_index in np.flatnonzero(intervals > max_interval):
        index = int(raw_index)
        interval = float(intervals[index])

        # An interval of k expected periods means k - 1 samples are missing.
        missing = round(interval / expected_interval) - 1
        if missing < min_gap_samples:
            continue

        gap = DAQGap(
            start_index=index,
            end_index=index + 1,
            start_time=float(timestamps[index]),
            end_time=float(timestamps[index + 1]),
            # Time beyond the one sample period that was expected anyway.
            duration=interval - expected_interval,
            expected_samples=missing + 1,
            missing_samples=missing,
            gap_type="timestamp",
        )
        gaps.append(gap)
        discontinuities.append(index)
        total_missing += missing
        total_gap_duration += gap.duration

    # Efficiency: captured samples relative to captured plus missing samples.
    total_expected = len(timestamps) + total_missing
    efficiency = len(timestamps) / total_expected if total_expected > 0 else 1.0

    return DAQGapAnalysis(
        gaps=gaps,
        total_gaps=len(gaps),
        total_missing_samples=total_missing,
        total_gap_duration=total_gap_duration,
        acquisition_efficiency=efficiency,
        sample_rate=sample_rate,
        discontinuities=discontinuities,
        metadata=metadata,
    )
def detect_gaps_by_samples(
    data: NDArray[np.float64],
    *,
    sample_rate: float,
    expected_interval: float | None = None,
    tolerance: float = 0.1,
    min_gap_samples: int = 1,
    check_discontinuities: bool = True,
) -> DAQGapAnalysis:
    """Detect gaps by looking for sudden value jumps in the sample data.

    The absolute sample-to-sample differences are computed and any
    difference above ``median + 5 * std`` of those differences is treated as
    a discontinuity that may hide missing samples.

    Args:
        data: Sample data array.
        sample_rate: Sample rate in Hz.
        expected_interval: Expected interval (None = 1/sample_rate).
        tolerance: Recorded in the result metadata; the discontinuity
            heuristic itself does not use it.
        min_gap_samples: Minimum gap size to report.
        check_discontinuities: Check for value discontinuities. When False
            no gaps are reported.

    Returns:
        DAQGapAnalysis with detected gaps.

    References:
        PKT-008: DAQ Gap Detection
    """
    if len(data) < 2:
        return DAQGapAnalysis(
            gaps=[],
            total_gaps=0,
            total_missing_samples=0,
            total_gap_duration=0.0,
            acquisition_efficiency=1.0,
            sample_rate=sample_rate,
            discontinuities=[],
        )

    interval = expected_interval if expected_interval is not None else 1.0 / sample_rate

    found: list[DAQGap] = []
    jump_indices: list[int] = []

    if check_discontinuities:
        steps = np.abs(np.diff(data))
        typical_step = float(np.median(steps))
        spread = float(np.std(steps))

        # Anything beyond this is considered a discontinuity.
        limit = typical_step + 5 * spread

        for raw_idx in np.where(steps > limit)[0]:
            # Assuming a linear trend, the jump height relative to the
            # typical step estimates how many samples were skipped.
            if typical_step > 0:
                missing = max(1, int(steps[raw_idx] / typical_step) - 1)
            else:
                missing = min_gap_samples

            if missing < min_gap_samples:
                continue

            position = int(raw_idx)
            found.append(
                DAQGap(
                    start_index=position,
                    end_index=position + 1,
                    start_time=raw_idx / sample_rate,
                    end_time=(raw_idx + 1) / sample_rate,
                    duration=missing * interval,
                    expected_samples=missing + 1,
                    missing_samples=missing,
                    gap_type="discontinuity",
                )
            )
            jump_indices.append(position)

    # Aggregate totals over every reported gap.
    missing_total = sum(gap.missing_samples for gap in found)
    duration_total = sum(gap.duration for gap in found)
    expected_total = len(data) + missing_total
    if expected_total > 0:
        efficiency = len(data) / expected_total
    else:
        efficiency = 1.0

    return DAQGapAnalysis(
        gaps=found,
        total_gaps=len(found),
        total_missing_samples=missing_total,
        total_gap_duration=duration_total,
        acquisition_efficiency=efficiency,
        sample_rate=sample_rate,
        discontinuities=jump_indices,
        metadata={
            "method": "sample_count",
            "expected_interval": interval,
            "tolerance": tolerance,
            "check_discontinuities": check_discontinuities,
        },
    )
432# =============================================================================
433# =============================================================================
def fuzzy_pattern_search(
    data: bytes | NDArray[np.uint8],
    pattern: int | bytes,
    *,
    pattern_bits: int = 32,
    max_errors: int = 2,
    step: int = 1,
) -> list[FuzzyMatch]:
    """Search for a bit pattern while tolerating a bounded Hamming distance.

    Slides a ``pattern_bits``-wide window over the data (MSB first within
    each byte) and reports every position whose bits differ from the pattern
    in at most ``max_errors`` places. This finds sync words even when some
    bits were flipped.

    Args:
        data: Binary data to search (bytes or numpy array).
        pattern: Pattern to search for (int or bytes). Bytes are interpreted
            big-endian; the value is masked to ``pattern_bits`` bits.
        pattern_bits: Number of bits in pattern.
        max_errors: Maximum allowed bit errors (Hamming distance).
        step: Search step in bits.

    Returns:
        List of FuzzyMatch objects for all matches within tolerance, in
        ascending offset order. ``error_positions`` are counted from the
        least significant bit of the pattern.

    Example:
        >>> # Find 0xAA55 sync word with up to 2 bit errors
        >>> data = bytes([0xAA, 0x55, 0x12, 0x34, 0xAB, 0x55])
        >>> matches = fuzzy_pattern_search(data, 0xAA55, pattern_bits=16, max_errors=2)
        >>> print(f"Found {len(matches)} matches")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    if isinstance(pattern, bytes):
        pattern = int.from_bytes(pattern, byteorder="big")

    # Keep only the low pattern_bits bits of the pattern.
    target = pattern & ((1 << pattern_bits) - 1)

    hits: list[FuzzyMatch] = []
    last_start = len(data) * 8 - pattern_bits

    for start in range(0, last_start + 1, step):
        window = _extract_bits(data, start, pattern_bits)

        # Set bits in the XOR mark the positions that disagree.
        diff = window ^ target
        distance = diff.bit_count()
        if distance > max_errors:
            continue

        flipped = [bit for bit in range(pattern_bits) if (diff >> bit) & 1]
        hits.append(
            FuzzyMatch(
                offset=start,
                matched_bits=window,
                bit_errors=distance,
                error_positions=flipped,
                confidence=1.0 - (distance / pattern_bits),
            )
        )

    return hits
512def _extract_bits(data: NDArray[np.uint8], bit_offset: int, num_bits: int) -> int:
513 """Extract bits from data array."""
514 result = 0
515 for i in range(num_bits):
516 bit_pos = bit_offset + i
517 byte_idx = bit_pos // 8
518 bit_in_byte = 7 - (bit_pos % 8) # MSB first
520 if byte_idx < len(data) and (data[byte_idx] >> bit_in_byte) & 1:
521 result |= 1 << (num_bits - 1 - i)
523 return result
526# =============================================================================
527# =============================================================================
def robust_packet_parse(
    data: bytes | NDArray[np.uint8],
    *,
    sync_pattern: int = 0xAA55,
    sync_bits: int = 16,
    length_offset: int = 2,  # Bytes after sync
    max_packet_length: int = 256,
    error_tolerance: int = 2,
) -> PacketRecoveryResult:
    """Parse variable-length packets with error recovery.

    Sync words are located with a fuzzy search. For each sync position the
    length byte is read; when it is zero or larger than
    ``max_packet_length`` the packet boundary is inferred from the position
    of the next sync word instead.

    Args:
        data: Binary data containing packets.
        sync_pattern: Sync word pattern.
        sync_bits: Bits in sync pattern.
        length_offset: Byte offset of the length field, counted from the
            start of the sync word.
        max_packet_length: Maximum valid packet length.
        error_tolerance: Max bit errors for sync detection.

    Returns:
        PacketRecoveryResult with parsed and recovered packets. Packets whose
        declared end lies beyond the data are not reported.

    Example:
        >>> result = robust_packet_parse(data, sync_pattern=0xAA55)
        >>> print(f"Parsed: {len(result.packets)}, Recovered: {len(result.recovered_packets)}")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    result = PacketRecoveryResult()

    # Everything below works on byte offsets, so only byte-aligned sync
    # candidates are meaningful. Searching at bit granularity would let a
    # shifted near-match collapse onto the same byte offset as the real sync
    # word and report the same packet twice.
    sync_matches = fuzzy_pattern_search(
        data,
        sync_pattern,
        pattern_bits=sync_bits,
        max_errors=error_tolerance,
        step=8,
    )
    sync_matches.sort(key=lambda m: m.offset)

    data_len = len(data)

    for index, match in enumerate(sync_matches):
        byte_offset = match.offset // 8

        if byte_offset + length_offset >= data_len:
            break

        # Convert to a Python int: arithmetic on a NumPy uint8 scalar can
        # wrap around (or raise) once offsets exceed 255.
        length = int(data[byte_offset + length_offset])

        if 0 < length <= max_packet_length:
            # Plausible length field: the packet spans sync, header up to and
            # including the length byte, and ``length`` further bytes.
            packet_end = byte_offset + length_offset + 1 + length
            if packet_end <= data_len:
                result.packets.append(
                    {
                        "offset": byte_offset,
                        "length": length,
                        "data": bytes(data[byte_offset:packet_end]),
                        "sync_errors": match.bit_errors,
                    }
                )
                result.total_errors += match.bit_errors
            continue

        # Implausible length field: use the next sync word as the boundary.
        if index + 1 >= len(sync_matches):
            break

        next_sync_byte = sync_matches[index + 1].offset // 8
        inferred_length = next_sync_byte - byte_offset

        if 0 < inferred_length <= max_packet_length:
            result.recovered_packets.append(
                {
                    "offset": byte_offset,
                    "length": inferred_length,
                    "data": bytes(data[byte_offset : byte_offset + inferred_length]),
                    "sync_errors": match.bit_errors,
                    "length_corrupted": True,
                }
            )
            result.total_errors += match.bit_errors
            result.sync_resync_count += 1
        else:
            # The region is recorded with a fixed nominal extent of 10 bytes.
            result.failed_regions.append((byte_offset, byte_offset + 10))

    return result
634# =============================================================================
635# =============================================================================
def compensate_timestamp_jitter(
    timestamps: NDArray[np.float64],
    *,
    expected_rate: float | None = None,
    method: str = "lowpass",
    cutoff_ratio: float = 0.1,
) -> JitterCompensationResult:
    """Compensate timestamp jitter and clock drift.

    Corrects sample timestamps affected by clock jitter using one of three
    methods: low-pass filtering of the sample intervals ('lowpass'), a
    straight-line fit over the sample index ('linear'), or a simplified
    proportional tracking loop ('pll').

    Args:
        timestamps: Array of timestamps in seconds.
        expected_rate: Expected sample rate. When None the expected interval
            is taken as the median of the observed intervals.
        method: Compensation method ('lowpass', 'pll', 'linear').
        cutoff_ratio: Critical frequency handed to ``scipy.signal.butter``
            for the 'lowpass' method (normalized so that 1 is the Nyquist
            frequency).

    Returns:
        JitterCompensationResult with corrected timestamps. With fewer than
        two timestamps the input is returned unchanged.

    Raises:
        ValueError: If unknown compensation method specified (only checked
            when at least two timestamps are supplied).

    Example:
        >>> result = compensate_timestamp_jitter(timestamps, expected_rate=1e6)
        >>> print(f"Jitter removed: {result.jitter_removed_ns:.1f} ns")
    """
    n = len(timestamps)
    if n < 2:
        return JitterCompensationResult(
            original_timestamps=timestamps,
            corrected_timestamps=timestamps,
            jitter_removed_ns=0.0,
            clock_drift_ppm=0.0,
            correction_method=method,
        )

    intervals = np.diff(timestamps)

    if expected_rate is None:
        expected_interval = np.median(intervals)
    else:
        expected_interval = 1.0 / expected_rate

    if method == "lowpass":
        # Imported lazily so the other methods do not require scipy.
        from scipy import signal

        b, a = signal.butter(2, cutoff_ratio, btype="low")

        # filtfilt pads each end with 3 * max(len(a), len(b)) samples by
        # default and rejects inputs that are not longer than that. Shrink
        # the padding for short captures instead of failing; long captures
        # keep the default and are filtered exactly as before.
        padlen = min(3 * max(len(a), len(b)), len(intervals) - 1)
        filtered_intervals = signal.filtfilt(b, a, intervals, padlen=padlen)

        # Rebuild the timestamps by accumulating the smoothed intervals.
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]
        corrected[1:] = timestamps[0] + np.cumsum(filtered_intervals)

    elif method == "linear":
        # Straight-line fit over the sample index (models clock drift only).
        indices = np.arange(n)
        coeffs = np.polyfit(indices, timestamps, 1)
        corrected = np.polyval(coeffs, indices)

    elif method == "pll":
        # Simplified loop: predict the next timestamp from the previous
        # corrected one, then move a fraction of the way to the measurement.
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]
        gain = 0.1  # PLL gain

        for i in range(1, n):
            expected_time = corrected[i - 1] + expected_interval
            phase_error = timestamps[i] - expected_time
            corrected[i] = expected_time + gain * phase_error

    else:
        raise ValueError(f"Unknown method: {method}")

    # Jitter is measured as the spread of the intervals around the expected
    # interval, before and after correction.
    original_jitter = np.std(intervals - expected_interval)
    corrected_jitter = np.std(np.diff(corrected) - expected_interval)
    jitter_removed = original_jitter - corrected_jitter

    # Drift: relative difference between observed and expected total span.
    total_time = timestamps[-1] - timestamps[0]
    expected_total = (n - 1) * expected_interval
    drift_ratio = (total_time - expected_total) / expected_total

    return JitterCompensationResult(
        original_timestamps=timestamps,
        corrected_timestamps=corrected,
        jitter_removed_ns=float(jitter_removed * 1e9),
        clock_drift_ppm=float(drift_ratio * 1e6),
        correction_method=method,
    )
752# =============================================================================
753# =============================================================================
def error_tolerant_decode(
    data: bytes | NDArray[np.uint8],
    protocol: str,
    *,
    max_errors_per_frame: int = 2,
    resync_on_error: bool = True,
) -> dict[str, Any]:
    """Decode protocol with error tolerance and resynchronization.

    Dispatches to the protocol-specific tolerant decoder. The protocol name
    is matched case-insensitively.

    Args:
        data: Binary data to decode.
        protocol: Protocol name ('uart', 'spi', 'i2c').
        max_errors_per_frame: Max errors before skipping frame; forwarded to
            the protocol decoder.
        resync_on_error: Attempt resynchronization on errors; forwarded to
            the UART and I2C decoders.

    Returns:
        Dictionary with the keys 'protocol', 'frames', 'frame_count',
        'error_count', 'resync_count' and 'error_frames'.

    Raises:
        ValueError: If unsupported protocol specified.

    Example:
        >>> result = error_tolerant_decode(data, 'uart', max_errors_per_frame=2)
        >>> print(f"Decoded: {result['frame_count']}, Errors: {result['error_count']}")
    """
    if isinstance(data, bytes):
        data = np.frombuffer(data, dtype=np.uint8)

    name = protocol.lower()
    if name == "uart":
        return _decode_uart_tolerant(data, max_errors_per_frame, resync_on_error)
    if name == "spi":
        return _decode_spi_tolerant(data, max_errors_per_frame)
    if name == "i2c":
        return _decode_i2c_tolerant(data, max_errors_per_frame, resync_on_error)

    raise ValueError(f"Unsupported protocol: {protocol}")
810def _decode_uart_tolerant(
811 data: NDArray[np.uint8],
812 max_errors: int,
813 resync: bool,
814) -> dict[str, Any]:
815 """UART decode with error tolerance."""
816 # Simplified UART decoding with error recovery
817 frames = []
818 errors = []
820 # In reality, would properly decode UART bit stream
821 # Here we treat each byte as a frame for demonstration
822 for i, byte in enumerate(data):
823 # Check for framing errors (simplified: check start/stop bits if present)
824 parity_error = ((byte).bit_count() % 2) != 0 # Assuming odd parity
826 if parity_error:
827 errors.append({"offset": i, "type": "parity", "byte": byte})
828 if len(errors) > max_errors and resync:
829 # Skip to next potential start
830 continue
831 else:
832 frames.append({"offset": i, "data": byte, "valid": True})
834 return {
835 "protocol": "uart",
836 "frames": frames,
837 "frame_count": len(frames),
838 "error_count": len(errors),
839 "resync_count": 0,
840 "error_frames": errors,
841 }
844def _decode_spi_tolerant(
845 data: NDArray[np.uint8],
846 max_errors: int,
847) -> dict[str, Any]:
848 """SPI decode with error tolerance."""
849 frames = []
850 for i, byte in enumerate(data):
851 frames.append({"offset": i, "mosi": byte, "miso": 0, "valid": True})
853 return {
854 "protocol": "spi",
855 "frames": frames,
856 "frame_count": len(frames),
857 "error_count": 0,
858 "resync_count": 0,
859 "error_frames": [],
860 }
863def _decode_i2c_tolerant(
864 data: NDArray[np.uint8],
865 max_errors: int,
866 resync: bool,
867) -> dict[str, Any]:
868 """I2C decode with error tolerance."""
869 frames = []
870 errors = []
872 i = 0
873 while i < len(data):
874 # Look for start condition marker (simplified)
875 if data[i] == 0x00: # Start marker
876 if i + 2 < len(data): 876 ↛ 890line 876 didn't jump to line 890 because the condition on line 876 was always true
877 addr = data[i + 1]
878 data_byte = data[i + 2]
879 frames.append(
880 {
881 "offset": i,
882 "address": addr >> 1,
883 "read": bool(addr & 1),
884 "data": data_byte,
885 "ack": True,
886 }
887 )
888 i += 3
889 else:
890 break
891 else:
892 errors.append({"offset": i, "type": "no_start"})
893 if resync:
894 i += 1
895 else:
896 break
898 return {
899 "protocol": "i2c",
900 "frames": frames,
901 "frame_count": len(frames),
902 "error_count": len(errors),
903 "resync_count": len(errors) if resync else 0,
904 "error_frames": errors,
905 }
908# =============================================================================
909# =============================================================================
def analyze_bit_errors(
    expected: bytes | NDArray[np.uint8],
    actual: bytes | NDArray[np.uint8],
) -> BitErrorAnalysis:
    """Analyze bit error patterns for diagnostics.

    Compares the two inputs bit by bit, classifies how the differing bits
    are distributed and derives a probable cause with recommendations.

    Args:
        expected: Expected data.
        actual: Actual received data. When the lengths differ, only the
            common prefix is compared; the surplus bytes are ignored.

    Returns:
        BitErrorAnalysis with error characterization. With fewer than two
        bit errors the pattern is reported as ``ErrorPattern.SINGLE_BIT``;
        when there are no errors at all the probable cause says so and no
        recommendations are given.

    Example:
        >>> result = analyze_bit_errors(expected_data, actual_data)
        >>> print(f"Error rate: {result.error_rate:.2e}")
        >>> print(f"Pattern: {result.error_pattern.value}")
        >>> print(f"Cause: {result.probable_cause}")
    """
    if isinstance(expected, bytes):
        expected = np.frombuffer(expected, dtype=np.uint8)
    if isinstance(actual, bytes):
        actual = np.frombuffer(actual, dtype=np.uint8)

    # Truncate both inputs to the shorter length.
    min_len = min(len(expected), len(actual))
    xor = np.bitwise_xor(expected[:min_len], actual[:min_len]).astype(np.uint8, copy=False)

    # One flag per bit, least significant bit of each byte first, so that
    # flat index == byte_index * 8 + bit_position.
    bit_flags = np.unpackbits(xor, bitorder="little")
    error_locations = np.flatnonzero(bit_flags)
    total_bit_errors = int(error_locations.size)

    # Error count per bit position 0-7 (LSB to MSB).
    bit_errors_by_position = bit_flags.reshape(min_len, 8).sum(axis=0).tolist()

    total_bits = min_len * 8
    error_rate = total_bit_errors / total_bits if total_bits > 0 else 0.0

    if total_bit_errors < 2:
        pattern = ErrorPattern.SINGLE_BIT
        burst_mean = 0.0
        burst_max = 0
    else:
        gaps = np.diff(error_locations)
        mean_gap = np.mean(gaps)
        std_gap = np.std(gaps)

        # Errors at most two bit positions apart belong to the same burst;
        # a larger gap starts a new one. Burst lengths are the distances
        # between consecutive burst start indices.
        burst_starts = np.flatnonzero(gaps > 2) + 1
        boundaries = np.concatenate(([0], burst_starts, [total_bit_errors]))
        bursts = np.diff(boundaries)

        burst_mean = float(np.mean(bursts))
        burst_max = int(bursts.max())

        if burst_max > 5:
            pattern = ErrorPattern.BURST
        elif std_gap < mean_gap * 0.3:
            # Nearly constant spacing between errors.
            pattern = ErrorPattern.SYSTEMATIC
        else:
            pattern = ErrorPattern.RANDOM

    if total_bit_errors == 0:
        # Matching data must not be diagnosed as a single-bit error.
        probable_cause = "No bit errors detected"
        recommendations: list[str] = []
    else:
        probable_cause, recommendations = _diagnose_errors(
            pattern, error_rate, bit_errors_by_position
        )

    return BitErrorAnalysis(
        error_rate=error_rate,
        error_pattern=pattern,
        burst_length_mean=burst_mean,
        burst_length_max=burst_max,
        error_distribution=bit_errors_by_position,
        probable_cause=probable_cause,
        recommendations=recommendations,
    )
def _diagnose_errors(
    pattern: ErrorPattern,
    error_rate: float,
    bit_distribution: list[int],
) -> tuple[str, list[str]]:
    """Map a classified error pattern to a probable cause and advice.

    Args:
        pattern: The classified error pattern.
        error_rate: Overall bit error rate; only consulted for RANDOM.
        bit_distribution: Error count per bit position; only consulted for
            patterns other than BURST, SYSTEMATIC and RANDOM.

    Returns:
        Tuple of (probable cause, list of recommendations).
    """
    if pattern == ErrorPattern.BURST:
        return (
            "Electromagnetic interference (EMI) or USB transmission errors",
            [
                "Use shorter cables",
                "Add ferrite beads",
                "Check for nearby interference sources",
                "Try a different USB port or hub",
            ],
        )

    if pattern == ErrorPattern.SYSTEMATIC:
        return (
            "Clock synchronization or sampling issues",
            [
                "Verify sample rate is adequate (10x signal rate)",
                "Check for clock jitter on logic analyzer",
                "Ensure proper signal termination",
            ],
        )

    if pattern == ErrorPattern.RANDOM:
        # Above 1% the errors are attributed to signal quality, not noise.
        if error_rate > 0.01:
            return (
                "Poor signal quality or threshold issues",
                [
                    "Adjust voltage threshold",
                    "Reduce cable length",
                    "Check signal integrity",
                ],
            )
        return "Normal noise level", ["Error rate is acceptable"]

    # SINGLE_BIT: look for one bit position that errs far more than the rest.
    highest = max(bit_distribution)
    lowest = min(bit_distribution)
    if highest > 0 and highest > 2 * (lowest + 1):
        suspect = bit_distribution.index(highest)
        return (
            f"Bit {suspect} shows higher error rate - possible stuck bit",
            [
                f"Check hardware for bit {suspect} issues",
                "May indicate logic analyzer channel problem",
            ],
        )

    return "Isolated single-bit error", ["Likely transient noise, no action needed"]
# Names exported by ``from <module> import *``: the result/data classes
# followed by the public analysis functions. Helpers whose names start with
# an underscore are intentionally not listed.
__all__ = [
    "BitErrorAnalysis",
    "DAQGap",
    "DAQGapAnalysis",
    "ErrorPattern",
    "FuzzyMatch",
    "JitterCompensationResult",
    "PacketRecoveryResult",
    "analyze_bit_errors",
    "compensate_timestamp_jitter",
    "detect_gaps",
    "detect_gaps_by_samples",
    "detect_gaps_by_timestamps",
    "error_tolerant_decode",
    "fuzzy_pattern_search",
    "robust_packet_parse",
]