Coverage for src / tracekit / analyzers / packet / payload_patterns.py: 0%
238 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Payload pattern search and delimiter detection.
3RE-PAY-002: Payload Pattern Search
4RE-PAY-003: Payload Delimiter Detection
6This module provides pattern matching, delimiter detection, and
7message boundary finding for binary payloads.
8"""
10from __future__ import annotations
12import re
13from collections.abc import Sequence
14from dataclasses import dataclass, field
15from typing import Any, Literal, cast
17import numpy as np
19from tracekit.analyzers.packet.payload_extraction import PayloadExtractor
@dataclass
class PatternMatch:
    """Pattern match result.

    Implements RE-PAY-002: Pattern match with location info.

    Attributes:
        pattern_name: Name of matched pattern.
        offset: Byte offset within payload.
        matched: Matched bytes.
        packet_index: Source packet index.
        context: Surrounding bytes for context.
    """

    pattern_name: str  # Name (or hex string) identifying the matched pattern.
    offset: int  # Byte offset of the match within the packet payload.
    matched: bytes  # The exact bytes that matched.
    packet_index: int  # Index of the packet the match was found in.
    context: bytes = b""  # Bytes around the match, for display/inspection.
@dataclass
class DelimiterResult:
    """Detected delimiter information.

    Implements RE-PAY-003: Delimiter detection result.

    Attributes:
        delimiter: Detected delimiter bytes.
        delimiter_type: Type of delimiter (fixed, length_prefix, pattern).
        confidence: Detection confidence (0-1).
        occurrences: Number of occurrences found.
        positions: List of positions where delimiter found.
    """

    delimiter: bytes  # The delimiter byte sequence (empty if none detected).
    delimiter_type: Literal["fixed", "length_prefix", "pattern"]  # Detection category.
    confidence: float  # 0-1 score combining occurrence count and spacing regularity.
    occurrences: int  # How many times the delimiter occurred in the data.
    positions: list[int] = field(default_factory=list)  # Byte offsets of each occurrence.
@dataclass
class LengthPrefixResult:
    """Length prefix detection result.

    Implements RE-PAY-003: Length prefix format detection.

    Attributes:
        detected: Whether length prefix was detected.
        length_bytes: Number of bytes for length field.
        endian: Endianness (big or little).
        offset: Offset of length field from message start.
        includes_length: Whether length includes the length field itself.
        confidence: Detection confidence (0-1).
    """

    detected: bool  # True when a plausible length-prefix format was found.
    length_bytes: int = 0  # Width of the length field in bytes (1, 2 or 4).
    endian: Literal["big", "little"] = "big"  # Byte order of the length field.
    offset: int = 0  # Offset of the length field from each message start.
    includes_length: bool = False  # Whether the declared length counts the prefix itself.
    confidence: float = 0.0  # 0-1 score from _test_length_prefix.
@dataclass
class MessageBoundary:
    """Message boundary information.

    Implements RE-PAY-003: Message boundary detection.

    Attributes:
        start: Start offset of message.
        end: End offset of message.
        length: Message length.
        data: Message data.
        index: Message index.
    """

    start: int  # Start offset of the message within the stream.
    end: int  # End offset (exclusive) of the message within the stream.
    length: int  # Message length in bytes (end - start for prefixed messages).
    data: bytes  # The message payload bytes.
    index: int  # Sequential message index (0-based).
108# =============================================================================
109# RE-PAY-002: Pattern Search Functions
110# =============================================================================
def search_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
    context_bytes: int = 8,
) -> list[PatternMatch]:
    """Search for pattern in packet payloads.

    Implements RE-PAY-002: Payload Pattern Search.

    Args:
        packets: Sequence of packets to search.
        pattern: Pattern to search for.
        pattern_type: Type of pattern matching.
        context_bytes: Number of context bytes around match.

    Returns:
        List of PatternMatch results.

    Example:
        >>> matches = search_pattern(packets, b'\\x00\\x01\\x00\\x00')
        >>> for m in matches:
        ...     print(f"Found at packet {m.packet_index}, offset {m.offset}")
    """
    extractor = PayloadExtractor()
    hits: list[PatternMatch] = []
    # Bytes patterns are labelled by their hex form; other patterns by str().
    label = pattern.hex() if isinstance(pattern, bytes) else str(pattern)

    for pkt_idx, pkt in enumerate(packets):
        data = extractor.extract_payload(pkt)
        if isinstance(data, memoryview | np.ndarray):
            data = bytes(data)

        for match_offset, matched_bytes in _find_pattern_in_data(data, pattern, pattern_type):
            # Capture up to context_bytes on each side, clipped to the payload.
            ctx_start = max(0, match_offset - context_bytes)
            ctx_end = min(len(data), match_offset + len(matched_bytes) + context_bytes)
            hits.append(
                PatternMatch(
                    pattern_name=label,
                    offset=match_offset,
                    matched=matched_bytes,
                    packet_index=pkt_idx,
                    context=data[ctx_start:ctx_end],
                )
            )

    return hits
def search_patterns(
    packets: Sequence[dict[str, Any] | bytes],
    patterns: dict[str, bytes | str],
    context_bytes: int = 8,
) -> dict[str, list[PatternMatch]]:
    """Search for multiple patterns simultaneously.

    Implements RE-PAY-002: Multi-pattern search.

    Args:
        packets: Sequence of packets to search.
        patterns: Dictionary mapping names to patterns.
        context_bytes: Number of context bytes around match.

    Returns:
        Dictionary mapping pattern names to match lists. Every name from
        ``patterns`` is present, with an empty list when nothing matched.

    Example:
        >>> signatures = {
        ...     "header_a": b'\\xAA\\x55',
        ...     "header_b": b'\\xDE\\xAD',
        ... }
        >>> results = search_patterns(packets, signatures)
        >>> for name, matches in results.items():
        ...     print(f"{name}: {len(matches)} matches")
    """
    results: dict[str, list[PatternMatch]] = {name: [] for name in patterns}
    extractor = PayloadExtractor()

    for i, packet in enumerate(packets):
        payload = extractor.extract_payload(packet)
        if isinstance(payload, memoryview | np.ndarray):
            payload = bytes(payload)

        for name, pattern in patterns.items():
            # Infer the match mode from the pattern value: bytes containing
            # the two-character wildcard marker "??" are wildcard patterns,
            # other bytes are exact, and str patterns are treated as regexes.
            # (The previous extra check for b"\\x??" was redundant: any bytes
            # containing b"\\x??" necessarily also contain b"??".)
            if isinstance(pattern, bytes):
                pattern_type = "wildcard" if b"??" in pattern else "exact"
            else:
                pattern_type = "regex"

            matches = _find_pattern_in_data(payload, pattern, pattern_type)

            for offset, matched in matches:
                # Capture up to context_bytes on each side, clipped to payload.
                start = max(0, offset - context_bytes)
                end = min(len(payload), offset + len(matched) + context_bytes)
                context = payload[start:end]

                results[name].append(
                    PatternMatch(
                        pattern_name=name,
                        offset=offset,
                        matched=matched,
                        packet_index=i,
                        context=context,
                    )
                )

    return results
def filter_by_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
) -> list[dict[str, Any] | bytes]:
    """Filter packets that contain a pattern.

    Implements RE-PAY-002: Pattern-based filtering.

    Args:
        packets: Sequence of packets.
        pattern: Pattern to match.
        pattern_type: Type of pattern matching.

    Returns:
        List of packets containing the pattern.
    """
    extractor = PayloadExtractor()
    kept: list[dict[str, Any] | bytes] = []

    for pkt in packets:
        data = extractor.extract_payload(pkt)
        if isinstance(data, memoryview | np.ndarray):
            data = bytes(data)

        # Keep the packet when at least one occurrence of the pattern exists.
        if _find_pattern_in_data(data, pattern, pattern_type):
            kept.append(pkt)

    return kept
262# =============================================================================
263# RE-PAY-003: Delimiter Detection Functions
264# =============================================================================
def detect_delimiter(
    payloads: Sequence[bytes] | bytes,
    candidates: list[bytes] | None = None,
) -> DelimiterResult:
    """Automatically detect message delimiter.

    Implements RE-PAY-003: Delimiter detection.

    Each candidate is scored by occurrence count weighted by spacing
    regularity (low coefficient of variation of inter-occurrence intervals);
    the highest-scoring candidate wins.

    Args:
        payloads: Payload data or list of payloads.
        candidates: Optional list of candidate delimiters to test.

    Returns:
        DelimiterResult with detected delimiter info. When no candidate
        occurs at least twice, an empty delimiter with confidence 0.0 is
        returned.

    Example:
        >>> data = b'msg1\\r\\nmsg2\\r\\nmsg3\\r\\n'
        >>> result = detect_delimiter(data)
        >>> print(f"Delimiter: {result.delimiter!r}")
    """
    # Combine payloads if a list/tuple was supplied.
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here
        data = cast("bytes", payloads)

    if not data:
        return DelimiterResult(
            delimiter=b"",
            delimiter_type="fixed",
            confidence=0.0,
            occurrences=0,
        )

    # Default candidates. (The previous list also contained b"\x0d\x0a",
    # which is byte-identical to b"\r\n" and was a redundant duplicate
    # wasting a scoring pass; ties keep the first candidate, so removing
    # it does not change the result.)
    if candidates is None:
        candidates = [
            b"\r\n",  # CRLF
            b"\n",  # LF
            b"\x00",  # Null
            b"\r",  # CR
        ]

    best_result = None
    best_score = 0.0

    for delim in candidates:
        if len(delim) == 0:
            continue

        count = data.count(delim)
        if count < 2:
            continue

        # Collect non-overlapping occurrence positions.
        positions = []
        pos = 0
        while True:
            pos = data.find(delim, pos)
            if pos == -1:
                break
            positions.append(pos)
            pos += len(delim)

        if len(positions) < 2:
            continue

        # Interval regularity: a low coefficient of variation (cv) of the
        # gaps between occurrences means evenly spaced delimiters.
        intervals = [positions[i + 1] - positions[i] for i in range(len(positions) - 1)]
        if len(intervals) > 0:
            mean_interval = sum(intervals) / len(intervals)
            if mean_interval > 0:
                variance = sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)
                cv = (variance**0.5) / mean_interval
                regularity = 1.0 / (1.0 + cv)
            else:
                regularity = 0.0
        else:
            regularity = 0.0

        # Score combines frequency and regularity.
        score = count * (0.5 + 0.5 * regularity)

        if score > best_score:
            best_score = score
            best_result = DelimiterResult(
                delimiter=delim,
                delimiter_type="fixed",
                confidence=min(1.0, regularity * 0.8 + 0.2 * min(1.0, count / 10)),
                occurrences=count,
                positions=positions,
            )

    if best_result is None:
        return DelimiterResult(
            delimiter=b"",
            delimiter_type="fixed",
            confidence=0.0,
            occurrences=0,
        )

    return best_result
def detect_length_prefix(
    payloads: Sequence[bytes],
    max_length_bytes: int = 4,
) -> LengthPrefixResult:
    """Detect length-prefixed message format.

    Implements RE-PAY-003: Length prefix detection.

    Args:
        payloads: List of payload samples.
        max_length_bytes: Maximum length field size to test.

    Returns:
        LengthPrefixResult with detected format. ``detected`` is False when
        no candidate format parses at least 3 consecutive messages.

    Example:
        >>> result = detect_length_prefix(payloads)
        >>> if result.detected:
        ...     print(f"Length field: {result.length_bytes} bytes, {result.endian}")
    """
    if not payloads:
        return LengthPrefixResult(detected=False)

    # Concatenate payloads for analysis
    data = b"".join(payloads)

    best_result = LengthPrefixResult(detected=False)
    best_score = 0.0

    # Try different length field sizes and offsets.
    # IMPORTANT: Prefer larger length_bytes values when scores are equal by
    # iterating in reverse order (4, 2, 1) and using a strict > comparison,
    # which keeps the first (largest) candidate on ties.
    for length_bytes in [4, 2, 1]:
        if length_bytes > max_length_bytes:
            continue

        for endian_str in ["big", "little"]:
            endian: Literal["big", "little"] = endian_str  # type: ignore[assignment]
            # Only small offsets of the length field within a message are tried.
            for offset in range(min(8, len(data) - length_bytes)):
                for includes_length in [False, True]:
                    score, matches = _test_length_prefix(
                        data, length_bytes, endian, offset, includes_length
                    )

                    # Use > to prefer larger length_bytes (tested first) when
                    # scores are equal; require >= 3 parsed messages to avoid
                    # spurious detections.
                    if score > best_score and matches >= 3:
                        best_score = score
                        best_result = LengthPrefixResult(
                            detected=True,
                            length_bytes=length_bytes,
                            endian=endian,
                            offset=offset,
                            includes_length=includes_length,
                            confidence=score,
                        )

    return best_result
def find_message_boundaries(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | DelimiterResult | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[MessageBoundary]:
    """Find message boundaries in payload data.

    Implements RE-PAY-003: Message boundary detection.

    Strategy: try a length-prefix format first (detected if not supplied);
    if that yields no messages, fall back to delimiter-based splitting
    (auto-detected if not supplied); if no delimiter is found either, the
    whole data is returned as a single message.

    Args:
        payloads: Payload data or list of payloads.
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (test if None).

    Returns:
        List of MessageBoundary objects.

    Example:
        >>> boundaries = find_message_boundaries(data)
        >>> for b in boundaries:
        ...     print(f"Message {b.index}: {b.length} bytes")
    """
    # Combine payloads if a list/tuple was supplied.
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here
        data = cast("bytes", payloads)

    if not data:
        return []

    boundaries = []

    # Try length prefix first. (data is always bytes at this point, so the
    # previous `[data] if isinstance(data, bytes) else list(payloads)`
    # conditional was dead code.)
    if length_prefix is None:
        length_prefix = detect_length_prefix([data])

    if length_prefix.detected:
        boundaries = _extract_length_prefixed_messages(data, length_prefix)
        if len(boundaries) > 0:
            return boundaries

    # Fall back to delimiter
    if delimiter is None:
        delimiter = detect_delimiter(data)

    delim = delimiter.delimiter if isinstance(delimiter, DelimiterResult) else delimiter

    if not delim:
        # No delimiter found, return whole data as one message
        return [MessageBoundary(start=0, end=len(data), length=len(data), data=data, index=0)]

    # Split by delimiter; track running offsets so each boundary maps back
    # into the original data.
    parts = data.split(delim)
    current_offset = 0

    for part in parts:
        if part:  # Skip empty parts
            boundaries.append(
                MessageBoundary(
                    start=current_offset,
                    end=current_offset + len(part),
                    length=len(part),
                    data=part,
                    index=len(boundaries),
                )
            )
        current_offset += len(part) + len(delim)

    return boundaries
def segment_messages(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[bytes]:
    """Segment stream into individual messages.

    Implements RE-PAY-003: Message segmentation.

    Args:
        payloads: Payload data or list of payloads.
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (auto-detect if None).

    Returns:
        List of message bytes.
    """
    # Delegate boundary detection, then keep only the raw message payloads.
    return [
        boundary.data
        for boundary in find_message_boundaries(payloads, delimiter, length_prefix)
    ]
529# =============================================================================
530# Helper Functions
531# =============================================================================
534def _find_pattern_in_data(
535 data: bytes,
536 pattern: bytes | str,
537 pattern_type: str,
538) -> list[tuple[int, bytes]]:
539 """Find pattern occurrences in data."""
540 matches = []
542 if pattern_type == "exact":
543 if isinstance(pattern, str):
544 pattern = pattern.encode()
545 pos = 0
546 while True:
547 pos = data.find(pattern, pos)
548 if pos == -1:
549 break
550 matches.append((pos, pattern))
551 pos += 1
553 elif pattern_type == "wildcard":
554 # Convert wildcard pattern to regex
555 if isinstance(pattern, bytes):
556 # Replace ?? with . for single byte match
557 regex_pattern = pattern.replace(b"??", b".")
558 try:
559 for match in re.finditer(regex_pattern, data, re.DOTALL):
560 matches.append((match.start(), match.group()))
561 except re.error:
562 pass
564 elif pattern_type == "regex":
565 if isinstance(pattern, str):
566 pattern = pattern.encode()
567 try:
568 for match in re.finditer(pattern, data, re.DOTALL):
569 matches.append((match.start(), match.group()))
570 except re.error:
571 pass
573 return matches
576def _test_length_prefix(
577 data: bytes,
578 length_bytes: int,
579 endian: str,
580 offset: int,
581 includes_length: bool,
582) -> tuple[float, int]:
583 """Test if data follows a length-prefix pattern."""
584 matches = 0
585 pos = 0
587 while pos + offset + length_bytes <= len(data):
588 # Read length field
589 length_data = data[pos + offset : pos + offset + length_bytes]
590 if endian == "big":
591 length = int.from_bytes(length_data, "big")
592 else:
593 length = int.from_bytes(length_data, "little")
595 if includes_length:
596 expected_end = pos + length
597 else:
598 expected_end = pos + offset + length_bytes + length
600 # Check if this makes sense
601 if 0 < length < 65536 and expected_end <= len(data):
602 matches += 1
603 pos = expected_end
604 else:
605 break
607 # Score based on matches and coverage
608 coverage = pos / len(data) if len(data) > 0 else 0
609 score = min(1.0, matches / 5) * coverage
611 return score, matches
def _extract_length_prefixed_messages(
    data: bytes,
    length_prefix: LengthPrefixResult,
) -> list[MessageBoundary]:
    """Split data into MessageBoundary records using a length-prefix format.

    Walks the buffer with the detected field offset/width/endianness and
    stops at the first non-positive or out-of-range declared length.
    """
    out: list[MessageBoundary] = []
    cursor = 0
    # Hoist the format parameters once; they are loop-invariant.
    field_off = length_prefix.offset
    field_len = length_prefix.length_bytes
    byteorder = "big" if length_prefix.endian == "big" else "little"

    while cursor + field_off + field_len <= len(data):
        # Decode the declared length for the message starting at `cursor`.
        raw = data[cursor + field_off : cursor + field_off + field_len]
        declared = int.from_bytes(raw, byteorder)

        if length_prefix.includes_length:
            end = cursor + declared
        else:
            end = cursor + field_off + field_len + declared

        # Stop at the first implausible length (overrun or non-positive).
        if end > len(data) or declared <= 0:
            break

        out.append(
            MessageBoundary(
                start=cursor,
                end=end,
                length=end - cursor,
                data=data[cursor:end],
                index=len(out),
            )
        )
        cursor = end

    return out
# Public API of this module (kept in alphabetical order, classes first).
__all__ = [
    "DelimiterResult",
    "LengthPrefixResult",
    "MessageBoundary",
    "PatternMatch",
    "detect_delimiter",
    "detect_length_prefix",
    "filter_by_pattern",
    "find_message_boundaries",
    "search_pattern",
    "search_patterns",
    "segment_messages",
]