Coverage for src / tracekit / analyzers / packet / payload_patterns.py: 0%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Payload pattern search and delimiter detection. 

2 

3RE-PAY-002: Payload Pattern Search 

4RE-PAY-003: Payload Delimiter Detection 

5 

6This module provides pattern matching, delimiter detection, and 

7message boundary finding for binary payloads. 

8""" 

9 

10from __future__ import annotations 

11 

12import re 

13from collections.abc import Sequence 

14from dataclasses import dataclass, field 

15from typing import Any, Literal, cast 

16 

17import numpy as np 

18 

19from tracekit.analyzers.packet.payload_extraction import PayloadExtractor 

20 

21 

@dataclass
class PatternMatch:
    """Pattern match result.

    Implements RE-PAY-002: Pattern match with location info.

    Attributes:
        pattern_name: Name of the matched pattern (the hex string of the
            pattern bytes when no explicit name was supplied).
        offset: Byte offset of the match within the payload.
        matched: The matched bytes themselves.
        packet_index: Index of the source packet in the searched sequence.
        context: Surrounding bytes around the match (empty by default).
    """

    pattern_name: str
    offset: int
    matched: bytes
    packet_index: int
    context: bytes = b""

41 

42 

@dataclass
class DelimiterResult:
    """Detected delimiter information.

    Implements RE-PAY-003: Delimiter detection result.

    Attributes:
        delimiter: Detected delimiter bytes (empty when nothing was found).
        delimiter_type: Type of delimiter (fixed, length_prefix, pattern);
            detect_delimiter in this module currently only produces "fixed".
        confidence: Detection confidence (0-1).
        occurrences: Number of occurrences found.
        positions: Byte positions where the delimiter was found.
    """

    delimiter: bytes
    delimiter_type: Literal["fixed", "length_prefix", "pattern"]
    confidence: float
    occurrences: int
    positions: list[int] = field(default_factory=list)

62 

63 

@dataclass
class LengthPrefixResult:
    """Length prefix detection result.

    Implements RE-PAY-003: Length prefix format detection.

    Attributes:
        detected: Whether a length-prefix format was detected.
        length_bytes: Width of the length field in bytes (0 if not detected).
        endian: Byte order of the length field (big or little).
        offset: Offset of the length field from the message start.
        includes_length: Whether the encoded length counts from the message
            start (i.e. includes the length field itself) rather than from
            the end of the length field.
        confidence: Detection confidence (0-1).
    """

    detected: bool
    length_bytes: int = 0
    endian: Literal["big", "little"] = "big"
    offset: int = 0
    includes_length: bool = False
    confidence: float = 0.0

85 

86 

@dataclass
class MessageBoundary:
    """Message boundary information.

    Implements RE-PAY-003: Message boundary detection.

    Attributes:
        start: Start offset of the message in the analyzed data.
        end: End offset (exclusive) of the message.
        length: Message length in bytes (end - start).
        data: The message bytes.
        index: Zero-based message index within the stream.
    """

    start: int
    end: int
    length: int
    data: bytes
    index: int

106 

107 

108# ============================================================================= 

109# RE-PAY-002: Pattern Search Functions 

110# ============================================================================= 

111 

112 

def search_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
    context_bytes: int = 8,
) -> list[PatternMatch]:
    """Search every packet payload for a single pattern.

    Implements RE-PAY-002: Payload Pattern Search.

    Args:
        packets: Sequence of packets to search.
        pattern: Pattern to search for (bytes or str).
        pattern_type: Matching mode: "exact", "wildcard", or "regex".
        context_bytes: Number of bytes of surrounding context to capture
            on each side of a match.

    Returns:
        List of PatternMatch results across all packets.

    Example:
        >>> matches = search_pattern(packets, b'\\x00\\x01\\x00\\x00')
        >>> for m in matches:
        ...     print(f"Found at packet {m.packet_index}, offset {m.offset}")
    """
    extractor = PayloadExtractor()
    found: list[PatternMatch] = []
    # Compute the display name once; it is the same for every match.
    label = pattern.hex() if isinstance(pattern, bytes) else str(pattern)

    for pkt_idx, pkt in enumerate(packets):
        raw = extractor.extract_payload(pkt)
        if isinstance(raw, memoryview | np.ndarray):
            raw = bytes(raw)

        for hit_offset, hit_bytes in _find_pattern_in_data(raw, pattern, pattern_type):
            # Clamp the context window to the payload bounds.
            lo = max(0, hit_offset - context_bytes)
            hi = min(len(raw), hit_offset + len(hit_bytes) + context_bytes)
            found.append(
                PatternMatch(
                    pattern_name=label,
                    offset=hit_offset,
                    matched=hit_bytes,
                    packet_index=pkt_idx,
                    context=raw[lo:hi],
                )
            )

    return found

164 

165 

def search_patterns(
    packets: Sequence[dict[str, Any] | bytes],
    patterns: dict[str, bytes | str],
    context_bytes: int = 8,
) -> dict[str, list[PatternMatch]]:
    """Search for multiple named patterns across all packet payloads.

    Implements RE-PAY-002: Multi-pattern search.

    Args:
        packets: Sequence of packets to search.
        patterns: Dictionary mapping pattern names to patterns. Bytes
            patterns containing the pair b"??" are treated as wildcard
            patterns; other bytes are exact; str patterns are regex.
        context_bytes: Number of context bytes captured around each match.

    Returns:
        Dictionary mapping pattern names to match lists (empty list for
        patterns with no matches).

    Example:
        >>> signatures = {
        ...     "header_a": b'\\xAA\\x55',
        ...     "header_b": b'\\xDE\\xAD',
        ... }
        >>> results = search_patterns(packets, signatures)
        >>> for name, matches in results.items():
        ...     print(f"{name}: {len(matches)} matches")
    """
    results: dict[str, list[PatternMatch]] = {name: [] for name in patterns}
    extractor = PayloadExtractor()

    # Classify each pattern once, outside the packet loop.
    # NOTE: the previous check `b"??" in pattern or b"\\x??" in pattern`
    # was redundant -- the literal b"\\x??" itself contains b"??", so the
    # first test already covers it.
    pattern_types: dict[str, Literal["exact", "wildcard", "regex"]] = {}
    for name, pattern in patterns.items():
        if isinstance(pattern, bytes):
            pattern_types[name] = "wildcard" if b"??" in pattern else "exact"
        else:
            pattern_types[name] = "regex"

    for i, packet in enumerate(packets):
        payload = extractor.extract_payload(packet)
        if isinstance(payload, memoryview | np.ndarray):
            payload = bytes(payload)

        for name, pattern in patterns.items():
            matches = _find_pattern_in_data(payload, pattern, pattern_types[name])

            for offset, matched in matches:
                # Clamp context window to payload bounds.
                start = max(0, offset - context_bytes)
                end = min(len(payload), offset + len(matched) + context_bytes)

                results[name].append(
                    PatternMatch(
                        pattern_name=name,
                        offset=offset,
                        matched=matched,
                        packet_index=i,
                        context=payload[start:end],
                    )
                )

    return results

228 

229 

def filter_by_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
) -> list[dict[str, Any] | bytes]:
    """Return only the packets whose payload contains a pattern.

    Implements RE-PAY-002: Pattern-based filtering.

    Args:
        packets: Sequence of packets.
        pattern: Pattern to match.
        pattern_type: Matching mode: "exact", "wildcard", or "regex".

    Returns:
        List of packets containing the pattern, in original order.
    """
    extractor = PayloadExtractor()
    kept: list[dict[str, Any] | bytes] = []

    for packet in packets:
        payload = extractor.extract_payload(packet)
        if isinstance(payload, memoryview | np.ndarray):
            payload = bytes(payload)

        # Keep the packet as soon as at least one match exists.
        if _find_pattern_in_data(payload, pattern, pattern_type):
            kept.append(packet)

    return kept

260 

261 

262# ============================================================================= 

263# RE-PAY-003: Delimiter Detection Functions 

264# ============================================================================= 

265 

266 

def detect_delimiter(
    payloads: Sequence[bytes] | bytes,
    candidates: list[bytes] | None = None,
) -> DelimiterResult:
    """Automatically detect the message delimiter in payload data.

    Implements RE-PAY-003: Delimiter detection.

    Each candidate delimiter is scored by occurrence count weighted by the
    regularity of the gaps between occurrences; the best-scoring candidate
    wins.

    Args:
        payloads: Payload data or list of payloads (lists are concatenated).
        candidates: Optional list of candidate delimiters to test. Defaults
            to common terminators (CRLF, LF, NUL, CR).

    Returns:
        DelimiterResult with detected delimiter info, or an empty-delimiter
        result with confidence 0.0 when nothing plausible is found.

    Example:
        >>> data = b'msg1\\r\\nmsg2\\r\\nmsg3\\r\\n'
        >>> result = detect_delimiter(data)
        >>> print(f"Delimiter: {result.delimiter!r}")
    """
    # Combine payloads if a list/tuple was given.
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here.
        data = cast("bytes", payloads)

    no_match = DelimiterResult(
        delimiter=b"",
        delimiter_type="fixed",
        confidence=0.0,
        occurrences=0,
    )

    if not data:
        return no_match

    # Default candidates.
    # NOTE: a previous version also listed b"\x0d\x0a", which is the exact
    # same byte sequence as b"\r\n" and was therefore a redundant duplicate.
    if candidates is None:
        candidates = [
            b"\r\n",  # CRLF
            b"\n",  # LF
            b"\x00",  # Null
            b"\r",  # CR
        ]

    best_result: DelimiterResult | None = None
    best_score = 0.0

    for delim in candidates:
        if len(delim) == 0:
            continue

        count = data.count(delim)
        if count < 2:
            continue

        # Collect non-overlapping occurrence positions.
        positions: list[int] = []
        pos = data.find(delim)
        while pos != -1:
            positions.append(pos)
            pos = data.find(delim, pos + len(delim))

        if len(positions) < 2:
            continue

        # Regularity: map the coefficient of variation of the gaps between
        # occurrences into (0, 1]; perfectly even gaps give 1.0.
        # positions has >= 2 entries here, so intervals is never empty.
        intervals = [b - a for a, b in zip(positions, positions[1:])]
        mean_interval = sum(intervals) / len(intervals)
        if mean_interval > 0:
            variance = sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)
            cv = (variance**0.5) / mean_interval
            regularity = 1.0 / (1.0 + cv)
        else:
            regularity = 0.0

        # Score combines frequency and regularity.
        score = count * (0.5 + 0.5 * regularity)

        if score > best_score:
            best_score = score
            best_result = DelimiterResult(
                delimiter=delim,
                delimiter_type="fixed",
                confidence=min(1.0, regularity * 0.8 + 0.2 * min(1.0, count / 10)),
                occurrences=count,
                positions=positions,
            )

    return best_result if best_result is not None else no_match

371 

372 

def detect_length_prefix(
    payloads: Sequence[bytes],
    max_length_bytes: int = 4,
) -> LengthPrefixResult:
    """Detect a length-prefixed message format.

    Implements RE-PAY-003: Length prefix detection.

    Tries combinations of length-field width, endianness, field offset,
    and whether the encoded length includes the length field, keeping the
    combination that best explains the data.

    Args:
        payloads: List of payload samples (concatenated for analysis).
        max_length_bytes: Maximum length field size to test.

    Returns:
        LengthPrefixResult describing the best format found, or one with
        ``detected=False`` when no combination parsed at least 3 messages.

    Example:
        >>> result = detect_length_prefix(payloads)
        >>> if result.detected:
        ...     print(f"Length field: {result.length_bytes} bytes, {result.endian}")
    """
    if not payloads:
        return LengthPrefixResult(detected=False)

    # Concatenate payloads for analysis.
    data = b"".join(payloads)

    best_result = LengthPrefixResult(detected=False)
    best_score = 0.0

    # Widths are tested largest-first, and ties are broken with a strict
    # ``>`` comparison below, so a wider length field wins on equal scores.
    # (An older comment here wrongly claimed ``>=`` was used.)
    for length_bytes in [4, 2, 1]:
        if length_bytes > max_length_bytes:
            continue

        for endian_str in ["big", "little"]:
            endian = cast('Literal["big", "little"]', endian_str)
            for offset in range(min(8, len(data) - length_bytes)):
                for includes_length in [False, True]:
                    score, matches = _test_length_prefix(
                        data, length_bytes, endian, offset, includes_length
                    )

                    # Require at least 3 consecutively parsed messages to
                    # reduce false positives on short or random data.
                    if score > best_score and matches >= 3:
                        best_score = score
                        best_result = LengthPrefixResult(
                            detected=True,
                            length_bytes=length_bytes,
                            endian=endian,
                            offset=offset,
                            includes_length=includes_length,
                            confidence=score,
                        )

    return best_result

430 

431 

def find_message_boundaries(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | DelimiterResult | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[MessageBoundary]:
    """Find message boundaries in payload data.

    Implements RE-PAY-003: Message boundary detection.

    Strategy: try a length-prefix format first (auto-detected when not
    supplied); if that yields no messages, fall back to delimiter
    splitting (also auto-detected when needed); if neither applies, the
    whole data is returned as a single message.

    Args:
        payloads: Payload data or list of payloads (lists are concatenated).
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (auto-detect if None).

    Returns:
        List of MessageBoundary objects.

    Example:
        >>> boundaries = find_message_boundaries(data)
        >>> for b in boundaries:
        ...     print(f"Message {b.index}: {b.length} bytes")
    """
    # Combine payloads if a list/tuple was given.
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here.
        data = cast("bytes", payloads)

    if not data:
        return []

    # Try length prefix first.  ``data`` is always bytes at this point, so
    # the previous `isinstance(data, bytes)` branch here was dead code.
    if length_prefix is None:
        length_prefix = detect_length_prefix([data])

    if length_prefix.detected:
        boundaries = _extract_length_prefixed_messages(data, length_prefix)
        if boundaries:
            return boundaries

    # Fall back to delimiter splitting.
    if delimiter is None:
        delimiter = detect_delimiter(data)

    delim = delimiter.delimiter if isinstance(delimiter, DelimiterResult) else delimiter

    if not delim:
        # No delimiter found: return the whole data as one message.
        return [MessageBoundary(start=0, end=len(data), length=len(data), data=data, index=0)]

    # Split by delimiter, tracking offsets in the original data.
    boundaries = []
    current_offset = 0
    for part in data.split(delim):
        if part:  # Skip empty segments (e.g. between consecutive delimiters).
            boundaries.append(
                MessageBoundary(
                    start=current_offset,
                    end=current_offset + len(part),
                    length=len(part),
                    data=part,
                    index=len(boundaries),
                )
            )
        # Advance past this segment and its trailing delimiter for every
        # part (even empty ones) so offsets stay aligned with the data.
        current_offset += len(part) + len(delim)

    return boundaries

506 

507 

def segment_messages(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[bytes]:
    """Split a byte stream into individual messages.

    Implements RE-PAY-003: Message segmentation.

    Thin wrapper around find_message_boundaries() that returns only the
    message payloads rather than full boundary records.

    Args:
        payloads: Payload data or list of payloads.
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (auto-detect if None).

    Returns:
        List of message bytes.
    """
    return [
        boundary.data
        for boundary in find_message_boundaries(payloads, delimiter, length_prefix)
    ]

527 

528 

529# ============================================================================= 

530# Helper Functions 

531# ============================================================================= 

532 

533 

534def _find_pattern_in_data( 

535 data: bytes, 

536 pattern: bytes | str, 

537 pattern_type: str, 

538) -> list[tuple[int, bytes]]: 

539 """Find pattern occurrences in data.""" 

540 matches = [] 

541 

542 if pattern_type == "exact": 

543 if isinstance(pattern, str): 

544 pattern = pattern.encode() 

545 pos = 0 

546 while True: 

547 pos = data.find(pattern, pos) 

548 if pos == -1: 

549 break 

550 matches.append((pos, pattern)) 

551 pos += 1 

552 

553 elif pattern_type == "wildcard": 

554 # Convert wildcard pattern to regex 

555 if isinstance(pattern, bytes): 

556 # Replace ?? with . for single byte match 

557 regex_pattern = pattern.replace(b"??", b".") 

558 try: 

559 for match in re.finditer(regex_pattern, data, re.DOTALL): 

560 matches.append((match.start(), match.group())) 

561 except re.error: 

562 pass 

563 

564 elif pattern_type == "regex": 

565 if isinstance(pattern, str): 

566 pattern = pattern.encode() 

567 try: 

568 for match in re.finditer(pattern, data, re.DOTALL): 

569 matches.append((match.start(), match.group())) 

570 except re.error: 

571 pass 

572 

573 return matches 

574 

575 

576def _test_length_prefix( 

577 data: bytes, 

578 length_bytes: int, 

579 endian: str, 

580 offset: int, 

581 includes_length: bool, 

582) -> tuple[float, int]: 

583 """Test if data follows a length-prefix pattern.""" 

584 matches = 0 

585 pos = 0 

586 

587 while pos + offset + length_bytes <= len(data): 

588 # Read length field 

589 length_data = data[pos + offset : pos + offset + length_bytes] 

590 if endian == "big": 

591 length = int.from_bytes(length_data, "big") 

592 else: 

593 length = int.from_bytes(length_data, "little") 

594 

595 if includes_length: 

596 expected_end = pos + length 

597 else: 

598 expected_end = pos + offset + length_bytes + length 

599 

600 # Check if this makes sense 

601 if 0 < length < 65536 and expected_end <= len(data): 

602 matches += 1 

603 pos = expected_end 

604 else: 

605 break 

606 

607 # Score based on matches and coverage 

608 coverage = pos / len(data) if len(data) > 0 else 0 

609 score = min(1.0, matches / 5) * coverage 

610 

611 return score, matches 

612 

613 

def _extract_length_prefixed_messages(
    data: bytes,
    length_prefix: LengthPrefixResult,
) -> list[MessageBoundary]:
    """Split ``data`` into messages using a detected length-prefix format.

    Stops at the first length that is non-positive or runs past the end of
    the buffer; any trailing bytes are simply not emitted.
    """
    out: list[MessageBoundary] = []
    header = length_prefix.offset + length_prefix.length_bytes
    byteorder = "big" if length_prefix.endian == "big" else "little"
    start = 0

    while start + header <= len(data):
        # Decode the length field of the current message.
        field = data[start + length_prefix.offset : start + header]
        msg_len = int.from_bytes(field, byteorder)

        if length_prefix.includes_length:
            end = start + msg_len
        else:
            end = start + header + msg_len

        if end > len(data) or msg_len <= 0:
            break

        out.append(
            MessageBoundary(
                start=start,
                end=end,
                length=end - start,
                data=data[start:end],
                index=len(out),
            )
        )
        start = end

    return out

656 

657 

# Explicit public API; helpers prefixed with "_" are internal.
__all__ = [
    "DelimiterResult",
    "LengthPrefixResult",
    "MessageBoundary",
    "PatternMatch",
    "detect_delimiter",
    "detect_length_prefix",
    "filter_by_pattern",
    "find_message_boundaries",
    "search_pattern",
    "search_patterns",
    "segment_messages",
]