Coverage for src / tracekit / exploratory / sync.py: 97%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Fuzzy synchronization pattern search for corrupted data. 

2 

3 

4This module provides fuzzy pattern matching for finding sync words and markers 

5in noisy or corrupted logic analyzer captures, with configurable bit error 

6tolerance using Hamming distance. 

7""" 

8 

9from dataclasses import dataclass 

10from enum import Enum 

11from typing import Literal 

12 

13import numpy as np 

14from numpy.typing import NDArray 

15 

16 

class RecoveryStrategy(Enum):
    """How the packet parser resumes after it meets a corrupted length field.

    Members:
        NEXT_SYNC: Resume at the next occurrence of the sync word.
        SKIP_BYTES: Advance by a fixed number of bytes and try again.
        HEURISTIC: Advance by a length estimated from packets seen so far.
    """

    NEXT_SYNC = "next_sync"
    SKIP_BYTES = "skip_bytes"
    HEURISTIC = "heuristic"

29 

30 

@dataclass
class SyncMatch:
    """A single hit reported by the fuzzy sync-pattern search.

    Attributes:
        index: Offset into the searched data at which the match begins.
        matched_value: Integer actually read at that offset; it differs from
            the requested pattern whenever bit errors were tolerated.
        hamming_distance: Count of bits that differ from the pattern.
        confidence: ``1.0 - hamming_distance / pattern_length``.
        pattern_length: Width of the pattern, in bits.
    """

    index: int
    matched_value: int
    hamming_distance: int
    confidence: float
    pattern_length: int

48 

49 

50@dataclass 

51class PacketParseResult: 

52 """Result from robust packet parsing. 

53 

54 Attributes: 

55 packets: List of successfully parsed packet data 

56 valid: List of validity flags for each packet 

57 errors: List of error types ('length_corruption', 'sync_lost', None) 

58 error_positions: Byte positions where errors occurred 

59 recovery_count: Number of times recovery was triggered 

60 """ 

61 

62 packets: list[bytes] 

63 valid: list[bool] 

64 errors: list[str | None] 

65 error_positions: list[int] 

66 recovery_count: int 

67 

68 

69def hamming_distance(a: int, b: int, pattern_bits: int) -> int: 

70 """Calculate Hamming distance between two integers. 

71 

72 Args: 

73 a: First integer 

74 b: Second integer 

75 pattern_bits: Number of bits to compare (8, 16, 32, or 64) 

76 

77 Returns: 

78 Number of differing bits 

79 

80 Examples: 

81 >>> hamming_distance(0b10101010, 0b10101011, 8) 

82 1 

83 >>> hamming_distance(0xAA55, 0xAA54, 16) 

84 1 

85 """ 

86 # XOR gives 1s where bits differ 

87 diff = a ^ b 

88 # Mask to pattern length 

89 mask = (1 << pattern_bits) - 1 

90 diff &= mask 

91 # Count set bits (population count) 

92 return (diff).bit_count() 

93 

94 

def fuzzy_sync_search(
    data: NDArray[np.uint8],
    pattern: int,
    *,
    pattern_bits: Literal[8, 16, 32, 64] = 8,
    max_errors: int = 2,
    min_confidence: float = 0.85,
) -> list[SyncMatch]:
    """Locate a sync word in a byte stream while tolerating bit errors.

    Every byte offset is tried: the bytes at that offset are read as one
    big-endian integer of ``pattern_bits`` bits and compared with ``pattern``
    by Hamming distance. An offset is reported when the distance does not
    exceed ``max_errors`` and the resulting confidence reaches
    ``min_confidence``.

    Performance targets (DAQ-001):
        - >=10 MB/s for max_errors=2
        - >=5 MB/s for max_errors=4
        - >=1 MB/s for max_errors=8

    Confidence scoring (DAQ-001):
        - >=0.95 (0-1 bit errors): Highly reliable
        - 0.85-0.95 (2-4 bit errors): Reliable
        - <0.85 (>4 bit errors): Verify manually

    Args:
        data: Input byte array to search
        pattern: Sync pattern to find (e.g., 0xAA55F0F0 for 32-bit)
        pattern_bits: Pattern length in bits (8, 16, 32, or 64)
        max_errors: Maximum tolerable bit errors (0-8)
        min_confidence: Minimum confidence threshold (0.0-1.0)

    Returns:
        List of SyncMatch objects in ascending offset order, each carrying
        the offset, the value read there, its Hamming distance and its
        confidence (``1.0 - distance / pattern_bits``). Empty when ``data``
        is shorter than the pattern.

    Raises:
        ValueError: If pattern_bits not in [8, 16, 32, 64]
        ValueError: If max_errors < 0 or > 8
        ValueError: If min_confidence not in [0.0, 1.0]

    Examples:
        >>> data = np.array([0xAA, 0x55, 0xF0, 0xF0, 0xFF], dtype=np.uint8)
        >>> # Find exact match
        >>> matches = fuzzy_sync_search(data, 0xAA55, pattern_bits=16)
        >>> print(matches[0].confidence)
        1.0

        >>> # Find with 1 bit error (0xAA54 instead of 0xAA55)
        >>> data = np.array([0xAA, 0x54, 0x00], dtype=np.uint8)
        >>> matches = fuzzy_sync_search(data, 0xAA55, pattern_bits=16, max_errors=2)
        >>> print(matches[0].hamming_distance)
        1

    References:
        DAQ-001: Fuzzy Bit Pattern Search with Hamming Distance Tolerance
    """
    if pattern_bits not in (8, 16, 32, 64):
        raise ValueError("pattern_bits must be 8, 16, 32, or 64")
    if max_errors > 8 or max_errors < 0:
        raise ValueError("max_errors must be in range [0, 8]")
    if not 0.0 <= min_confidence <= 1.0:
        raise ValueError("min_confidence must be in range [0.0, 1.0]")

    width = pattern_bits // 8
    total = len(data)
    if total < width:
        return []

    found: list[SyncMatch] = []
    for start in range(total - width + 1):
        # Fold the window into one big-endian integer, most significant
        # byte first.
        word = 0
        for octet in data[start : start + width]:
            word = (word << 8) | int(octet)

        bit_errors = hamming_distance(word, pattern, pattern_bits)
        score = 1.0 - (bit_errors / pattern_bits)

        # Both limits must hold: the absolute error budget and the
        # relative confidence floor.
        if bit_errors <= max_errors and score >= min_confidence:
            found.append(
                SyncMatch(
                    index=start,
                    matched_value=word,
                    hamming_distance=bit_errors,
                    confidence=score,
                    pattern_length=pattern_bits,
                )
            )

    return found

206 

207 

def parse_variable_length_packets(
    data: NDArray[np.uint8],
    *,
    sync_pattern: int | None = None,
    sync_bits: Literal[8, 16, 32, 64] = 16,
    length_offset: int = 2,
    length_size: int = 2,
    min_packet_size: int = 4,
    max_packet_size: int = 1024,
    recovery_strategy: RecoveryStrategy = RecoveryStrategy.NEXT_SYNC,
    skip_bytes: int = 1,
) -> PacketParseResult:
    """Parse variable-length packets with error recovery.

    Walks ``data`` from offset 0. At each position the big-endian length
    field found ``length_offset`` bytes in is read and taken as the total
    packet size counted from the packet start. A plausible length yields a
    packet; an implausible one triggers the configured recovery strategy and
    parsing continues from the recovered position.

    Error detection (DAQ-002):
        - Invalid if: length=0, length>max_packet_size,
          length<min_packet_size, or (2-byte fields only)
          length&0xFF00=0xFF00
        - Once at least 10 packets have been extracted, also invalid if
          length is more than twice the 90th percentile of their lengths

    Recovery strategies (DAQ-002):
        - next_sync: Jump to the next exact sync word after the current
          position (requires sync_pattern); parsing stops if none is found
        - skip_bytes: Skip ``skip_bytes`` bytes and retry
        - heuristic: Skip the median length of the packets extracted so
          far, or ``min_packet_size`` when there is no history yet (always
          at least one byte)

    Args:
        data: Input byte array containing packets
        sync_pattern: Optional sync word to search for on errors
        sync_bits: Sync pattern length in bits
        length_offset: Byte offset to length field from start
        length_size: Length field size in bytes (1 or 2)
        min_packet_size: Minimum valid packet size in bytes
        max_packet_size: Maximum valid packet size in bytes
        recovery_strategy: Strategy to use when corruption detected; either
            a RecoveryStrategy member or its string value
        skip_bytes: Number of bytes to skip for SKIP_BYTES strategy (>= 1)

    Returns:
        PacketParseResult with parsed packets and error information.
        ``errors`` receives ``None`` for every extracted packet,
        ``'truncated'`` when the last packet runs past the end of the data,
        ``'sync_lost'`` after a NEXT_SYNC resynchronisation and
        ``'length_corruption'`` after a SKIP_BYTES or HEURISTIC recovery.

    Raises:
        ValueError: If length_size not 1 or 2
        ValueError: If recovery_strategy is not a valid RecoveryStrategy
        ValueError: If recovery_strategy is NEXT_SYNC without sync_pattern
        ValueError: If recovery_strategy is SKIP_BYTES and skip_bytes < 1

    Examples:
        >>> # Simple TLV parsing with sync word
        >>> data = np.array([0xAA, 0x55, 0x00, 0x04, 0x01, 0x02], dtype=np.uint8)
        >>> result = parse_variable_length_packets(
        ...     data, sync_pattern=0xAA55, length_offset=2
        ... )
        >>> len(result.packets)
        1

    References:
        DAQ-002: Robust Variable-Length Packet Parsing with Error Recovery
    """
    if length_size not in (1, 2):
        raise ValueError("length_size must be 1 or 2")

    # Normalise to an enum member (raises ValueError for anything else).
    # Without this, an unrecognised strategy would match none of the
    # recovery branches below and the loop would never advance.
    recovery_strategy = RecoveryStrategy(recovery_strategy)

    if recovery_strategy == RecoveryStrategy.NEXT_SYNC and sync_pattern is None:
        raise ValueError("NEXT_SYNC strategy requires sync_pattern")

    # A non-positive skip would never move past the corrupted position.
    if recovery_strategy == RecoveryStrategy.SKIP_BYTES and skip_bytes < 1:
        raise ValueError("skip_bytes must be >= 1 for SKIP_BYTES strategy")

    packets: list[bytes] = []
    valid: list[bool] = []
    errors: list[str | None] = []
    error_positions: list[int] = []
    recovery_count = 0

    # Lengths of the packets extracted so far; feeds the percentile check
    # and the heuristic recovery step.
    packet_lengths: list[int] = []

    total = len(data)
    pos = 0
    while pos < total:
        # Stop when the length field itself would run past the end.
        length_pos = pos + length_offset
        if length_pos + length_size > total:
            break

        # Length field is big-endian.
        if length_size == 1:
            pkt_length = int(data[length_pos])
        else:
            pkt_length = (int(data[length_pos]) << 8) | int(data[length_pos + 1])

        # Hard limits first.
        length_ok = not (
            pkt_length == 0
            or pkt_length > max_packet_size
            or (length_size == 2 and (pkt_length & 0xFF00) == 0xFF00)
            or pkt_length < min_packet_size
        )

        # Statistical check once enough history exists.
        if length_ok and len(packet_lengths) >= 10:
            p90 = np.percentile(packet_lengths, 90)
            if pkt_length > p90 * 2:
                length_ok = False

        if length_ok:
            packet_end = pos + pkt_length
            if packet_end > total:
                # Packet extends beyond the available data.
                errors.append("truncated")
                error_positions.append(pos)
                break

            packets.append(bytes(data[pos:packet_end]))
            valid.append(True)
            errors.append(None)
            packet_lengths.append(pkt_length)
            pos = packet_end
            continue

        # Length field rejected - apply recovery.
        recovery_count += 1
        error_positions.append(pos)

        if recovery_strategy == RecoveryStrategy.NEXT_SYNC:
            assert sync_pattern is not None  # Validated at function start
            # Start one byte further on so the current position cannot be
            # matched again.
            search_start = pos + 1
            # fuzzy_sync_search returns an empty list when the remaining
            # data is shorter than the sync word.
            matches = fuzzy_sync_search(
                data[search_start:],
                sync_pattern,
                pattern_bits=sync_bits,
                max_errors=0,  # type: ignore[arg-type]
            )
            if not matches:
                # No further sync word: nothing left to recover to.
                break
            pos = search_start + matches[0].index
            errors.append("sync_lost")

        elif recovery_strategy == RecoveryStrategy.SKIP_BYTES:
            pos += skip_bytes
            errors.append("length_corruption")

        else:  # RecoveryStrategy.HEURISTIC
            if packet_lengths:
                step = int(np.median(packet_lengths))
            else:
                # No history yet, fall back to the minimum packet size.
                step = min_packet_size
            # Always advance by at least one byte so the loop terminates
            # even when min_packet_size is zero or negative.
            pos += max(1, step)
            errors.append("length_corruption")

    return PacketParseResult(
        packets=packets,
        valid=valid,
        errors=errors,
        error_positions=error_positions,
        recovery_count=recovery_count,
    )