Coverage for src/tracekit/analyzers/patterns/discovery.py: 97%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Automatic signature and delimiter discovery. 

2 

3This module implements algorithms for automatically discovering candidate 

4signatures, headers, and delimiters in binary data through statistical analysis. 

5 

6 

7Author: TraceKit Development Team 

8""" 

9 

from __future__ import annotations

import math
from collections import Counter, defaultdict
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray

21 

22 

@dataclass
class CandidateSignature:
    """A candidate signature/header pattern discovered in binary data.

    Attributes:
        pattern: The signature byte pattern
        length: Length of signature in bytes
        occurrences: Number of occurrences in data
        positions: Start positions of each occurrence
        interval_mean: Mean interval between occurrences (samples)
        interval_std: Standard deviation of intervals (consistency measure)
        entropy: Pattern entropy (low = more structured)
        score: Overall distinctiveness score (0-1, higher = better)
    """

    pattern: bytes
    length: int
    occurrences: int
    positions: list[int]
    interval_mean: float
    interval_std: float
    entropy: float
    score: float

    def __post_init__(self) -> None:
        """Reject inconsistent or out-of-range field values.

        Raises:
            ValueError: If any field fails validation.
        """
        # Validation order matches the construction invariants: basic field
        # ranges first, then the cross-field pattern/length consistency check.
        if self.length <= 0:
            raise ValueError("length must be positive")
        if self.occurrences < 0:
            raise ValueError("occurrences must be non-negative")
        if len(self.pattern) != self.length:
            raise ValueError("pattern length must match length field")
        if self.score < 0 or self.score > 1:
            raise ValueError("score must be in range [0, 1]")

57 

58 

class SignatureDiscovery:
    """Automatic signature and header discovery.

    Analyzes binary data to automatically identify candidate signatures,
    headers, and delimiters based on statistical patterns: occurrence
    frequency, interval regularity, and byte-level entropy.
    """

    def __init__(self, min_length: int = 4, max_length: int = 16, min_occurrences: int = 2):
        """Initialize signature discovery.

        Args:
            min_length: Minimum signature length in bytes
            max_length: Maximum signature length in bytes
            min_occurrences: Minimum number of times a pattern must occur to be considered

        Raises:
            ValueError: If min_length, max_length, or min_occurrences are invalid
        """
        if min_length < 1:
            raise ValueError("min_length must be at least 1")
        if max_length < min_length:
            raise ValueError("max_length must be >= min_length")
        if min_occurrences < 1:
            raise ValueError("min_occurrences must be at least 1")

        self.min_length = min_length
        self.max_length = max_length
        self.min_occurrences = min_occurrences

    def discover_signatures(
        self, data: bytes | NDArray[np.uint8] | list[bytes]
    ) -> list[CandidateSignature]:
        """Discover candidate signatures in data.

        Finds byte patterns that appear regularly throughout the data,
        suggesting they may be headers, sync markers, or delimiters.

        Args:
            data: Input binary data or list of messages

        Returns:
            List of CandidateSignature sorted by score (best first)

        Examples:
            >>> data = b"\\xAA\\x55DATA" * 100
            >>> discovery = SignatureDiscovery(min_length=2, max_length=8)
            >>> sigs = discovery.discover_signatures(data)
            >>> assert any(s.pattern == b"\\xAA\\x55" for s in sigs)
        """
        if isinstance(data, list):
            # A list of messages is analyzed as one concatenated stream.
            data_bytes = b"".join(_to_bytes(msg) for msg in data)
        else:
            data_bytes = _to_bytes(data)
        n = len(data_bytes)

        if n < self.min_length:
            return []

        # Collect every substring of each candidate length together with its
        # start offsets.  NOTE(review): memory grows roughly with
        # n * (max_length - min_length + 1); acceptable for typical traces.
        pattern_positions: defaultdict[bytes, list[int]] = defaultdict(list)
        for length in range(self.min_length, min(self.max_length + 1, n + 1)):
            for start in range(n - length + 1):
                pattern_positions[data_bytes[start : start + length]].append(start)

        candidates: list[CandidateSignature] = []
        for pattern, positions in pattern_positions.items():
            if len(positions) < self.min_occurrences:
                continue

            interval_mean, interval_std = self._interval_stats(positions)
            entropy = _calculate_entropy(pattern)
            score = self._calculate_score(
                pattern=pattern,
                occurrences=len(positions),
                interval_mean=interval_mean,
                interval_std=interval_std,
                entropy=entropy,
                data_length=n,
            )

            candidates.append(
                CandidateSignature(
                    pattern=pattern,
                    length=len(pattern),
                    occurrences=len(positions),
                    positions=sorted(positions),
                    interval_mean=interval_mean,
                    interval_std=interval_std,
                    entropy=entropy,
                    score=score,
                )
            )

        # Best (highest score) first.
        candidates.sort(key=lambda c: c.score, reverse=True)
        return candidates

    def find_header_candidates(
        self, data: bytes | NDArray[np.uint8], max_candidates: int = 20
    ) -> list[CandidateSignature]:
        """Find patterns likely to be message headers.

        Headers typically:
        - Have low entropy (structured, not random)
        - Appear at regular intervals
        - Are relatively short (2-16 bytes)
        - May contain magic bytes or sync markers

        Args:
            data: Input binary data
            max_candidates: Maximum number of candidates to return

        Returns:
            List of CandidateSignature sorted by likelihood (best first)
        """
        header_candidates: list[CandidateSignature] = []
        for sig in self.discover_signatures(data):
            # Headers should have low entropy (structured).
            if sig.entropy > 6.0:
                continue
            # Headers should be reasonably frequent.
            if sig.occurrences < 3:
                continue

            # Prefer regular intervals: low interval std relative to the mean.
            # max(..., 1.0) guards against division by zero for mean == 0.
            regularity = 1.0 / (1.0 + sig.interval_std / max(sig.interval_mean, 1.0))

            # Blend the generic score with header-specific features.
            header_score = sig.score * 0.6 + (1.0 - sig.entropy / 8.0) * 0.2 + regularity * 0.2

            # dataclasses.replace re-runs validation and avoids re-listing
            # every unchanged field.
            header_candidates.append(replace(sig, score=header_score))

        header_candidates.sort(key=lambda c: c.score, reverse=True)
        return header_candidates[:max_candidates]

    def find_delimiter_candidates(
        self, data: bytes | NDArray[np.uint8]
    ) -> list[CandidateSignature]:
        """Find patterns likely to be message delimiters.

        Delimiters typically:
        - Are short (1-4 bytes)
        - Have very low entropy (often single byte like \\n, \\0, etc.)
        - Appear frequently
        - May have variable intervals

        Args:
            data: Input binary data

        Returns:
            List of CandidateSignature sorted by likelihood (best first)
        """
        data_bytes = _to_bytes(data)
        n = len(data_bytes)

        if n < 2:
            return []

        # Focus on short patterns (typical delimiters).
        delimiter_candidates: list[CandidateSignature] = []
        max_delim_length = min(4, self.max_length)

        for length in range(1, max_delim_length + 1):
            pattern_positions: defaultdict[bytes, list[int]] = defaultdict(list)
            for start in range(n - length + 1):
                pattern_positions[data_bytes[start : start + length]].append(start)

            for pattern, positions in pattern_positions.items():
                if len(positions) < 5:  # delimiters should be frequent
                    continue

                # Cheap entropy filter first; interval statistics are only
                # computed for patterns that survive it.
                entropy = _calculate_entropy(pattern)
                if entropy > 3.0:  # delimiters should have very low entropy
                    continue

                interval_mean, interval_std = self._interval_stats(positions)

                # High frequency + low entropy + short length = good delimiter.
                frequency_score = min(len(positions) / (n / 100.0), 1.0)
                entropy_score = 1.0 - entropy / 8.0
                length_score = 1.0 - (length - 1) / max_delim_length
                delimiter_score = frequency_score * 0.5 + entropy_score * 0.3 + length_score * 0.2

                delimiter_candidates.append(
                    CandidateSignature(
                        pattern=pattern,
                        length=length,
                        occurrences=len(positions),
                        positions=sorted(positions),
                        interval_mean=interval_mean,
                        interval_std=interval_std,
                        entropy=entropy,
                        score=delimiter_score,
                    )
                )

        delimiter_candidates.sort(key=lambda c: c.score, reverse=True)
        return delimiter_candidates[:20]  # Top 20 delimiter candidates

    def rank_signatures(self, candidates: list[CandidateSignature]) -> list[CandidateSignature]:
        """Rank signatures by distinctiveness.

        Re-ranks candidates considering:
        - Frequency vs. expected random occurrence
        - Regularity of appearance
        - Entropy characteristics
        - Pattern uniqueness

        Args:
            candidates: List of candidate signatures

        Returns:
            Re-ranked list of CandidateSignature
        """
        ranked: list[CandidateSignature] = []
        for sig in candidates:
            # Regularity measure (0 when intervals are unknown).
            if sig.interval_mean > 0:
                regularity = 1.0 / (1.0 + sig.interval_std / sig.interval_mean)
            else:
                regularity = 0.0

            # Entropy score (prefer low entropy for signatures).
            entropy_score = max(0.0, 1.0 - sig.entropy / 8.0)

            # Frequency score (normalized, saturates at 100 occurrences).
            frequency_score = min(sig.occurrences / 100.0, 1.0)

            new_score = regularity * 0.4 + entropy_score * 0.3 + frequency_score * 0.3
            ranked.append(replace(sig, score=new_score))

        ranked.sort(key=lambda c: c.score, reverse=True)
        return ranked

    @staticmethod
    def _interval_stats(positions: list[int]) -> tuple[float, float]:
        """Return (mean, std) of gaps between consecutive positions.

        Args:
            positions: Ascending start offsets of a pattern's occurrences

        Returns:
            (interval_mean, interval_std); (0.0, 0.0) for fewer than two positions
        """
        intervals = np.diff(positions)
        if len(intervals) == 0:
            return 0.0, 0.0
        return float(np.mean(intervals)), float(np.std(intervals))

    def _calculate_score(
        self,
        pattern: bytes,
        occurrences: int,
        interval_mean: float,
        interval_std: float,
        entropy: float,
        data_length: int,
    ) -> float:
        """Calculate distinctiveness score for a pattern.

        Args:
            pattern: The byte pattern
            occurrences: Number of occurrences
            interval_mean: Mean interval between occurrences
            interval_std: Standard deviation of intervals
            entropy: Pattern entropy
            data_length: Total data length

        Returns:
            Score in range [0, 1], higher is more distinctive
        """
        # Frequency score (normalized, saturates at 50 occurrences).
        frequency_score = min(occurrences / 50.0, 1.0)

        # Regularity score (prefer consistent intervals).
        if interval_mean > 0:
            regularity_score = 1.0 / (1.0 + interval_std / interval_mean)
        else:
            regularity_score = 0.0

        # Entropy score (prefer structured patterns, not random).
        entropy_score = max(0.0, 1.0 - entropy / 8.0)

        # Length score (prefer medium-length patterns, ~4 bytes).
        optimal_length = 4.0
        length_score = max(0.0, 1.0 - abs(len(pattern) - optimal_length) / 8.0)

        score = (
            frequency_score * 0.3
            + regularity_score * 0.4
            + entropy_score * 0.2
            + length_score * 0.1
        )

        # Clamp defensively; CandidateSignature rejects scores outside [0, 1].
        return min(1.0, max(0.0, score))

412 

413 

414# Convenience functions 

415 

416 

def discover_signatures(
    data: bytes | NDArray[np.uint8] | list[bytes],
    min_length: int = 4,
    max_length: int = 16,
    min_occurrences: int = 2,
) -> list[CandidateSignature]:
    """Convenience function for signature discovery.

    Args:
        data: Input binary data or list of messages
        min_length: Minimum signature length
        max_length: Maximum signature length
        min_occurrences: Minimum number of times a pattern must occur

    Returns:
        List of CandidateSignature sorted by score

    Examples:
        >>> data = b"\\xFF\\xFF" + b"DATA" * 50
        >>> signatures = discover_signatures(data, min_length=2)
    """
    # Thin wrapper: delegate to a throwaway SignatureDiscovery instance.
    return SignatureDiscovery(min_length, max_length, min_occurrences).discover_signatures(data)

442 

443 

def find_header_candidates(data: bytes | NDArray[np.uint8]) -> list[CandidateSignature]:
    """Find header candidates.

    Args:
        data: Input binary data

    Returns:
        List of header candidates

    Examples:
        >>> data = b"HDR" + b"payload" * 20
        >>> headers = find_header_candidates(data)
    """
    # Headers are short structured markers, so scan the 2-16 byte range.
    finder = SignatureDiscovery(min_length=2, max_length=16)
    return finder.find_header_candidates(data)

461 

462 

def find_delimiter_candidates(data: bytes | NDArray[np.uint8]) -> list[CandidateSignature]:
    """Find delimiter candidates.

    Args:
        data: Input binary data

    Returns:
        List of delimiter candidates

    Examples:
        >>> data = b"field1,field2,field3"
        >>> delimiters = find_delimiter_candidates(data)
        >>> assert any(d.pattern == b"," for d in delimiters)
    """
    # Delimiters are typically 1-4 bytes, so restrict the scan range.
    finder = SignatureDiscovery(min_length=1, max_length=4)
    return finder.find_delimiter_candidates(data)

481 

482 

483# Helper functions 

484 

485 

486def _to_bytes(data: bytes | NDArray[np.uint8] | memoryview | bytearray) -> bytes: 

487 """Convert input data to bytes. 

488 

489 Args: 

490 data: Input data (bytes, bytearray, memoryview, or numpy array) 

491 

492 Returns: 

493 Bytes representation 

494 

495 Raises: 

496 TypeError: If data type is not supported 

497 """ 

498 if isinstance(data, bytes): 

499 return data 

500 elif isinstance(data, bytearray | memoryview): 

501 return bytes(data) 

502 elif isinstance(data, np.ndarray): 

503 return data.astype(np.uint8).tobytes() # type: ignore[no-any-return] 

504 else: 

505 raise TypeError(f"Unsupported data type: {type(data)}") 

506 

507 

508def _calculate_entropy(data: bytes) -> float: 

509 """Calculate Shannon entropy of byte sequence. 

510 

511 : Entropy calculation 

512 

513 Args: 

514 data: Byte sequence 

515 

516 Returns: 

517 Entropy in bits (0-8 for byte data) 

518 

519 Examples: 

520 >>> _calculate_entropy(b"\\x00" * 100) # All same byte 

521 0.0 

522 >>> entropy = _calculate_entropy(bytes(range(256))) # Uniform distribution 

523 >>> assert entropy > 7.9 # Close to 8.0 

524 """ 

525 if len(data) == 0: 

526 return 0.0 

527 

528 # Count byte frequencies 

529 byte_counts = Counter(data) 

530 n = len(data) 

531 

532 # Calculate entropy 

533 entropy = 0.0 

534 for count in byte_counts.values(): 

535 if count > 0: 535 ↛ 534line 535 didn't jump to line 534 because the condition on line 535 was always true

536 prob = count / n 

537 entropy -= prob * math.log2(prob) 

538 

539 return entropy