Coverage for src/tracekit/exploratory/sync.py: 97% (120 statements)
1"""Fuzzy synchronization pattern search for corrupted data.
4This module provides fuzzy pattern matching for finding sync words and markers
5in noisy or corrupted logic analyzer captures, with configurable bit error
6tolerance using Hamming distance.
7"""
9from dataclasses import dataclass
10from enum import Enum
11from typing import Literal
13import numpy as np
14from numpy.typing import NDArray
class RecoveryStrategy(Enum):
    """Error recovery strategies for corrupted packets.

    Attributes:
        NEXT_SYNC: Skip to next sync word when corruption detected
        SKIP_BYTES: Skip fixed byte count and retry parsing
        HEURISTIC: Use statistical packet length model for recovery
    """

    NEXT_SYNC = "next_sync"
    SKIP_BYTES = "skip_bytes"
    HEURISTIC = "heuristic"


@dataclass
class SyncMatch:
    """Result from fuzzy sync pattern search.

    Attributes:
        index: Starting byte position of the match in the input data
        matched_value: The actual value that matched (may differ from pattern)
        hamming_distance: Number of bit errors in the match
        confidence: Match confidence (1.0 - bit_errors/pattern_length)
        pattern_length: Length of pattern in bits
    """

    index: int
    matched_value: int
    hamming_distance: int
    confidence: float
    pattern_length: int
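

# Worked example (illustrative comment, not module code) of the confidence
# formula documented above: a 32-bit pattern matched with 2 bit errors gives
#
# >>> 1.0 - 2 / 32
# 0.9375
#
# which lands in the 0.85-0.95 "reliable" band used by fuzzy_sync_search below.
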

@dataclass
class PacketParseResult:
    """Result from robust packet parsing.

    Attributes:
        packets: List of successfully parsed packet data
        valid: List of validity flags for each packet
        errors: List of error types ('length_corruption', 'sync_lost',
            'truncated', or None)
        error_positions: Byte positions where errors occurred
        recovery_count: Number of times recovery was triggered
    """

    packets: list[bytes]
    valid: list[bool]
    errors: list[str | None]
    error_positions: list[int]
    recovery_count: int


def hamming_distance(a: int, b: int, pattern_bits: int) -> int:
    """Calculate Hamming distance between two integers.

    Args:
        a: First integer
        b: Second integer
        pattern_bits: Number of bits to compare (8, 16, 32, or 64)

    Returns:
        Number of differing bits

    Examples:
        >>> hamming_distance(0b10101010, 0b10101011, 8)
        1
        >>> hamming_distance(0xAA55, 0xAA54, 16)
        1
    """
    # XOR gives 1s where bits differ
    diff = a ^ b
    # Mask to pattern length
    mask = (1 << pattern_bits) - 1
    diff &= mask
    # Count set bits (population count)
    return diff.bit_count()
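

# Illustrative sketch (not part of this module): the per-window comparison in
# fuzzy_sync_search below is a plain Python loop; to approach the DAQ-001
# throughput targets, the same Hamming comparison can be vectorized with NumPy.
# The names _POPCOUNT_U8 and _bulk_hamming are hypothetical, shown only for
# demonstration. Byte windows can be built with
# np.lib.stride_tricks.sliding_window_view(data, pattern_bytes).
_POPCOUNT_U8: NDArray[np.uint8] = np.array(
    [bin(i).count("1") for i in range(256)], dtype=np.uint8
)


def _bulk_hamming(
    windows: NDArray[np.uint8], pattern_bytes: NDArray[np.uint8]
) -> NDArray[np.intp]:
    """Hamming distance of each row of ``windows`` against ``pattern_bytes``.

    ``windows`` has shape (n, k) and ``pattern_bytes`` has shape (k,); both
    hold raw bytes, so per-byte XOR popcounts summed across each row give the
    bit-level distance for every candidate position at once.
    """
    diff = np.bitwise_xor(windows, pattern_bytes)
    return _POPCOUNT_U8[diff].sum(axis=1, dtype=np.intp)

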
def fuzzy_sync_search(
    data: NDArray[np.uint8],
    pattern: int,
    *,
    pattern_bits: Literal[8, 16, 32, 64] = 8,
    max_errors: int = 2,
    min_confidence: float = 0.85,
) -> list[SyncMatch]:
    """Find sync patterns with bit error tolerance using Hamming distance.

    Searches for sync words even in the presence of bit errors, which is
    essential for recovering corrupted logic analyzer captures.

    Performance targets (DAQ-001):
        - ≥10 MB/s for max_errors=2
        - ≥5 MB/s for max_errors=4
        - ≥1 MB/s for max_errors=8

    Confidence scoring (DAQ-001):
        - ≥0.95 (0-1 bit errors): Highly reliable
        - 0.85-0.95 (2-4 bit errors): Reliable
        - <0.85 (>4 bit errors): Verify manually

    Args:
        data: Input byte array to search
        pattern: Sync pattern to find (e.g., 0xAA55F0F0 for 32-bit)
        pattern_bits: Pattern length in bits (8, 16, 32, or 64)
        max_errors: Maximum tolerable bit errors (0-8)
        min_confidence: Minimum confidence threshold (0.0-1.0)

    Returns:
        List of SyncMatch objects with position, matched value, distance,
        and confidence score

    Raises:
        ValueError: If pattern_bits not in [8, 16, 32, 64]
        ValueError: If max_errors < 0 or > 8
        ValueError: If min_confidence not in [0.0, 1.0]

    Examples:
        >>> data = np.array([0xAA, 0x55, 0xF0, 0xF0, 0xFF], dtype=np.uint8)
        >>> # Find exact match
        >>> matches = fuzzy_sync_search(data, 0xAA55, pattern_bits=16)
        >>> print(matches[0].confidence)
        1.0

        >>> # Find with 1 bit error (0xAA54 instead of 0xAA55)
        >>> data = np.array([0xAA, 0x54, 0x00], dtype=np.uint8)
        >>> matches = fuzzy_sync_search(data, 0xAA55, pattern_bits=16, max_errors=2)
        >>> print(matches[0].hamming_distance)
        1

    References:
        DAQ-001: Fuzzy Bit Pattern Search with Hamming Distance Tolerance
    """
    if pattern_bits not in (8, 16, 32, 64):
        raise ValueError("pattern_bits must be 8, 16, 32, or 64")

    if max_errors < 0 or max_errors > 8:
        raise ValueError("max_errors must be in range [0, 8]")

    if not 0.0 <= min_confidence <= 1.0:
        raise ValueError("min_confidence must be in range [0.0, 1.0]")

    pattern_bytes = pattern_bits // 8
    if len(data) < pattern_bytes:
        return []

    matches: list[SyncMatch] = []

    # Sliding window search
    for i in range(len(data) - pattern_bytes + 1):
        # Extract window and convert to integer
        window = data[i : i + pattern_bytes]

        # Convert bytes to integer (big-endian)
        if pattern_bytes == 1:
            value = int(window[0])
        elif pattern_bytes == 2:
            value = (int(window[0]) << 8) | int(window[1])
        elif pattern_bytes == 4:
            value = (
                (int(window[0]) << 24)
                | (int(window[1]) << 16)
                | (int(window[2]) << 8)
                | int(window[3])
            )
        else:  # 8 bytes
            value = 0
            for j in range(8):
                value = (value << 8) | int(window[j])

        # Calculate Hamming distance
        dist = hamming_distance(value, pattern, pattern_bits)

        # Check if within error tolerance
        if dist <= max_errors:
            confidence = 1.0 - (dist / pattern_bits)

            if confidence >= min_confidence:
                matches.append(
                    SyncMatch(
                        index=i,
                        matched_value=value,
                        hamming_distance=dist,
                        confidence=confidence,
                        pattern_length=pattern_bits,
                    )
                )

    return matches
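

# Usage sketch (illustrative, not part of the module): scanning a small capture
# that contains one clean 0xAA55 sync word and one with a single flipped bit.
# The byte values are made up for demonstration.
#
# >>> capture = np.array([0x00, 0xAA, 0x55, 0x01, 0xAA, 0x54, 0x02], dtype=np.uint8)
# >>> hits = fuzzy_sync_search(capture, 0xAA55, pattern_bits=16, max_errors=2)
# >>> [(m.index, m.hamming_distance) for m in hits]
# [(1, 0), (4, 1)]
# >>> hits[1].confidence  # 1.0 - 1/16
# 0.9375

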
def parse_variable_length_packets(
    data: NDArray[np.uint8],
    *,
    sync_pattern: int | None = None,
    sync_bits: Literal[8, 16, 32, 64] = 16,
    length_offset: int = 2,
    length_size: int = 2,
    min_packet_size: int = 4,
    max_packet_size: int = 1024,
    recovery_strategy: RecoveryStrategy = RecoveryStrategy.NEXT_SYNC,
    skip_bytes: int = 1,
) -> PacketParseResult:
    """Parse variable-length packets with error recovery.

    Robust parsing that continues after corruption, falling back to sync word
    search when length fields are corrupted.

    Error detection (DAQ-002):
        - Invalid if: length == 0, length < min_packet_size,
          length > max_packet_size, or (length & 0xFF00) == 0xFF00
        - Suspicious if: length > 90th_percentile × 2 (after 10 packets observed)  # noqa: RUF002

    Recovery strategies (DAQ-002):
        - next_sync: Skip to next sync word (requires sync_pattern)
        - skip_bytes: Skip fixed byte count and retry
        - heuristic: Use statistical packet length model

    Args:
        data: Input byte array containing packets
        sync_pattern: Optional sync word to search for on errors
        sync_bits: Sync pattern length in bits
        length_offset: Byte offset to length field from start
        length_size: Length field size in bytes (1 or 2)
        min_packet_size: Minimum valid packet size in bytes
        max_packet_size: Maximum valid packet size in bytes
        recovery_strategy: Strategy to use when corruption detected
        skip_bytes: Number of bytes to skip for SKIP_BYTES strategy

    Returns:
        PacketParseResult with parsed packets and error information

    Raises:
        ValueError: If length_size not 1 or 2
        ValueError: If recovery_strategy is NEXT_SYNC without sync_pattern

    Examples:
        >>> # Simple TLV parsing with sync word
        >>> data = np.array([0xAA, 0x55, 0x00, 0x04, 0x01, 0x02], dtype=np.uint8)
        >>> result = parse_variable_length_packets(
        ...     data, sync_pattern=0xAA55, length_offset=2
        ... )
        >>> len(result.packets)
        1

    References:
        DAQ-002: Robust Variable-Length Packet Parsing with Error Recovery
    """
    if length_size not in (1, 2):
        raise ValueError("length_size must be 1 or 2")

    if recovery_strategy == RecoveryStrategy.NEXT_SYNC and sync_pattern is None:
        raise ValueError("NEXT_SYNC strategy requires sync_pattern")

    packets: list[bytes] = []
    valid: list[bool] = []
    errors: list[str | None] = []
    error_positions: list[int] = []
    recovery_count = 0

    # Track packet lengths for heuristic recovery
    packet_lengths: list[int] = []

    pos = 0
    while pos < len(data):
        # Try to parse packet at current position

        # Check if we have enough data for header
        if pos + length_offset + length_size > len(data):
            break

        # Extract length field
        length_pos = pos + length_offset
        if length_size == 1:
            pkt_length = int(data[length_pos])
        else:  # 2 bytes
            pkt_length = (int(data[length_pos]) << 8) | int(data[length_pos + 1])

        # Validate length field
        is_valid_length = True
        error_type = None

        # Check for obviously corrupted lengths
        if (
            pkt_length == 0
            or pkt_length > max_packet_size
            or (length_size == 2 and (pkt_length & 0xFF00) == 0xFF00)
            or pkt_length < min_packet_size
        ):
            is_valid_length = False
            error_type = "length_corruption"

        # Check suspiciously large length (heuristic)
        if is_valid_length and len(packet_lengths) >= 10:
            p90 = np.percentile(packet_lengths, 90)
            if pkt_length > p90 * 2:
                is_valid_length = False
                error_type = "length_corruption"

        if is_valid_length:
            # Length looks good, try to extract packet
            packet_end = pos + pkt_length

            if packet_end <= len(data):
                # Successfully extract packet
                packet_data = bytes(data[pos:packet_end])
                packets.append(packet_data)
                valid.append(True)
                errors.append(None)
                packet_lengths.append(pkt_length)
                pos = packet_end
            else:
                # Packet extends beyond data
                errors.append("truncated")
                error_positions.append(pos)
                break
        else:
            # Length field corrupted - apply recovery
            recovery_count += 1
            error_positions.append(pos)

            if recovery_strategy == RecoveryStrategy.NEXT_SYNC:
                # Search for next sync word
                assert sync_pattern is not None  # Validated at function start
                search_start = pos + 1
                search_data = data[search_start:]

                if len(search_data) >= sync_bits // 8:
                    matches = fuzzy_sync_search(
                        search_data,
                        sync_pattern,
                        pattern_bits=sync_bits,
                        max_errors=0,  # type: ignore[arg-type]
                    )

                    if matches:
                        # Found sync, jump to it
                        pos = search_start + matches[0].index
                        errors.append("sync_lost")
                    else:
                        # No more syncs found
                        break
                else:
                    break

            elif recovery_strategy == RecoveryStrategy.SKIP_BYTES:
                # Skip fixed bytes and retry
                pos += skip_bytes
                errors.append(error_type)

            elif recovery_strategy == RecoveryStrategy.HEURISTIC:
                # Use median packet length as guess
                if packet_lengths:
                    guess_length = int(np.median(packet_lengths))
                    pos += guess_length
                else:
                    # No history, skip minimal amount
                    pos += min_packet_size
                errors.append(error_type)

    return PacketParseResult(
        packets=packets,
        valid=valid,
        errors=errors,
        error_positions=error_positions,
        recovery_count=recovery_count,
    )
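

# Usage sketch (illustrative, not part of the module): a capture with two good
# packets separated by a header whose length field was corrupted to 0xFFFF.
# With the default NEXT_SYNC strategy the parser re-acquires the 0xAA55 sync
# word after the corrupted region. The byte values are made up for demonstration.
#
# >>> capture = np.array(
# ...     [0xAA, 0x55, 0x00, 0x06, 0x01, 0x02,  # packet 1 (total length 6)
# ...      0xAA, 0x55, 0xFF, 0xFF,              # corrupted length field
# ...      0xAA, 0x55, 0x00, 0x05, 0x99],       # packet 2 (total length 5)
# ...     dtype=np.uint8,
# ... )
# >>> result = parse_variable_length_packets(capture, sync_pattern=0xAA55)
# >>> len(result.packets), result.recovery_count, result.error_positions
# (2, 1, [6])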