Coverage for src / tracekit / analyzers / packet / payload.py: 74%
774 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Payload extraction and analysis framework for network packets.
3 - RE-PAY-001: Payload Extraction Framework
4 - RE-PAY-002: Payload Pattern Search
5 - RE-PAY-003: Payload Delimiter Detection
6 - RE-PAY-004: Payload Field Inference
7 - RE-PAY-005: Payload Comparison and Differential Analysis
9This module provides comprehensive payload extraction from PCAP packets,
10pattern search capabilities, delimiter detection, and comparison tools.
11"""
13from __future__ import annotations
15import logging
16import re
17import struct
18from collections import Counter
19from collections.abc import Iterator, Sequence
20from dataclasses import dataclass, field
21from typing import Any, Literal, cast
23import numpy as np
25logger = logging.getLogger(__name__)
@dataclass
class PayloadInfo:
    """A single extracted payload together with its source-packet metadata.

    Implements RE-PAY-001: Payload with preserved metadata.

    Attributes:
        data: Raw payload bytes.
        packet_index: Index of the packet this payload was taken from.
        timestamp: Capture timestamp of the packet, if known.
        src_ip: Source IP address, if known.
        dst_ip: Destination IP address, if known.
        src_port: Source port, if known.
        dst_port: Destination port, if known.
        protocol: Protocol name, if known.
        is_fragment: True when the source packet is a fragment.
        fragment_offset: Byte offset of the fragment when fragmented.
    """

    data: bytes
    packet_index: int
    timestamp: float | None = None
    src_ip: str | None = None
    dst_ip: str | None = None
    src_port: int | None = None
    dst_port: int | None = None
    protocol: str | None = None
    is_fragment: bool = False
    fragment_offset: int = 0
@dataclass
class PatternMatch:
    """One occurrence of a searched pattern inside a payload.

    Implements RE-PAY-002: Pattern match with location info.

    Attributes:
        pattern_name: Name of the pattern that matched.
        offset: Byte offset of the match within the payload.
        matched: The bytes that matched.
        packet_index: Index of the packet containing the match.
        context: Bytes surrounding the match, for display/debugging.
    """

    pattern_name: str
    offset: int
    matched: bytes
    packet_index: int
    context: bytes = b""
@dataclass
class DelimiterResult:
    """Result of delimiter detection over a payload stream.

    Implements RE-PAY-003: Delimiter detection result.

    Attributes:
        delimiter: The delimiter bytes that were detected.
        delimiter_type: Kind of delimiter ("fixed", "length_prefix", or "pattern").
        confidence: Detection confidence in [0, 1].
        occurrences: How many times the delimiter was found.
        positions: Offsets at which the delimiter was found.
    """

    delimiter: bytes
    delimiter_type: Literal["fixed", "length_prefix", "pattern"]
    confidence: float
    occurrences: int
    positions: list[int] = field(default_factory=list)
@dataclass
class LengthPrefixResult:
    """Outcome of length-prefix framing detection.

    Implements RE-PAY-003: Length prefix format detection.

    Attributes:
        detected: True when a length-prefix format was found.
        length_bytes: Width of the length field in bytes.
        endian: Byte order of the length field ("big" or "little").
        offset: Offset of the length field from the start of a message.
        includes_length: True when the encoded length counts the length field itself.
        confidence: Detection confidence in [0, 1].
    """

    detected: bool
    length_bytes: int = 0
    endian: Literal["big", "little"] = "big"
    offset: int = 0
    includes_length: bool = False
    confidence: float = 0.0
@dataclass
class MessageBoundary:
    """One message delimited within a payload stream.

    Implements RE-PAY-003: Message boundary detection.

    Attributes:
        start: Offset where the message begins.
        end: Offset where the message ends.
        length: Length of the message in bytes.
        data: The message bytes themselves.
        index: Ordinal position of the message in the stream.
    """

    start: int
    end: int
    length: int
    data: bytes
    index: int
@dataclass
class PayloadDiff:
    """Comparison result for a pair of payloads.

    Implements RE-PAY-005: Payload comparison result.

    Attributes:
        common_prefix_length: How many leading bytes the payloads share.
        common_suffix_length: How many trailing bytes the payloads share.
        differences: Tuples of (offset, byte_a, byte_b) for each differing byte.
        similarity: Similarity score in [0, 1].
        edit_distance: Levenshtein edit distance between the payloads.
    """

    common_prefix_length: int
    common_suffix_length: int
    differences: list[tuple[int, int, int]]
    similarity: float
    edit_distance: int
@dataclass
class VariablePositions:
    """Per-position variability analysis across a set of payloads.

    Implements RE-PAY-005: Variable position analysis.

    Attributes:
        constant_positions: Byte positions whose value never changes.
        variable_positions: Byte positions whose value varies.
        constant_values: Mapping of constant position -> its fixed byte value.
        variance_by_position: Variance observed at each byte position.
    """

    constant_positions: list[int]
    variable_positions: list[int]
    constant_values: dict[int, int]
    variance_by_position: np.ndarray[tuple[int], np.dtype[np.float64]]
@dataclass
class PayloadCluster:
    """Group of payloads judged similar to one another.

    Implements RE-PAY-005: Payload clustering result.

    Attributes:
        cluster_id: Identifier of this cluster.
        payloads: Payload bytes belonging to the cluster.
        indices: Original indices of the clustered payloads.
        representative: Representative (centroid) payload of the cluster.
        size: Number of payloads in the cluster.
    """

    cluster_id: int
    payloads: list[bytes]
    indices: list[int]
    representative: bytes
    size: int
206# =============================================================================
207# RE-PAY-004: Payload Field Inference
208# =============================================================================
@dataclass
class InferredField:
    """One field reconstructed from a set of binary messages.

    Implements RE-PAY-004: Inferred field structure.

    Attributes:
        name: Auto-generated field name.
        offset: Byte offset of the field within a message.
        size: Width of the field in bytes.
        inferred_type: Best-guess data type for the field.
        endianness: Detected byte order ("n/a" when not applicable).
        is_constant: True when the field holds the same value in every message.
        is_sequence: True when the field looks like a counter/sequence number.
        is_checksum: True when the field looks like a checksum.
        constant_value: The fixed value when is_constant is True.
        confidence: Inference confidence in [0, 1].
        sample_values: A few decoded example values, for debugging.
    """

    name: str
    offset: int
    size: int
    inferred_type: Literal[
        "uint8",
        "uint16",
        "uint32",
        "uint64",
        "int8",
        "int16",
        "int32",
        "int64",
        "float32",
        "float64",
        "bytes",
        "string",
        "unknown",
    ]
    endianness: Literal["big", "little", "n/a"] = "n/a"
    is_constant: bool = False
    is_sequence: bool = False
    is_checksum: bool = False
    constant_value: bytes | None = None
    confidence: float = 0.5
    sample_values: list[Any] = field(default_factory=list)
@dataclass
class MessageSchema:
    """Full schema reconstructed from a set of message samples.

    Implements RE-PAY-004: Complete message schema.

    Attributes:
        fields: Inferred fields in offset order.
        message_length: Length used for analysis (shortest sample).
        fixed_length: True when every sample has the same length.
        length_range: (min, max) of observed sample lengths.
        sample_count: How many samples were analyzed.
        confidence: Overall confidence in the schema.
    """

    fields: list[InferredField]
    message_length: int
    fixed_length: bool
    length_range: tuple[int, int]
    sample_count: int
    confidence: float
class FieldInferrer:
    """Infer field structure within binary payloads.

    Implements RE-PAY-004: Payload Field Inference.

    Uses statistical analysis, alignment detection, and type inference
    to reconstruct message formats from binary payload samples.

    Example:
        >>> inferrer = FieldInferrer()
        >>> messages = [pkt.data for pkt in udp_packets]
        >>> schema = inferrer.infer_fields(messages)
        >>> for field in schema.fields:
        ...     print(f"{field.name}: {field.inferred_type} at offset {field.offset}")
    """

    def __init__(
        self,
        min_samples: int = 10,
        entropy_threshold: float = 0.5,
        sequence_threshold: int = 3,
    ) -> None:
        """Initialize field inferrer.

        Args:
            min_samples: Minimum samples for reliable inference.
            entropy_threshold: Entropy change threshold for boundary detection.
            sequence_threshold: Minimum consecutive incrementing values for sequence.
        """
        self.min_samples = min_samples
        self.entropy_threshold = entropy_threshold
        self.sequence_threshold = sequence_threshold

    def infer_fields(
        self,
        messages: Sequence[bytes],
        min_samples: int | None = None,
    ) -> MessageSchema:
        """Infer field structure from message samples.

        Implements RE-PAY-004: Complete field inference.

        Args:
            messages: List of binary message samples.
            min_samples: Override minimum sample count.

        Returns:
            MessageSchema with inferred field structure.

        Example:
            >>> schema = inferrer.infer_fields(messages)
            >>> print(f"Detected {len(schema.fields)} fields")
        """
        if not messages:
            return MessageSchema(
                fields=[],
                message_length=0,
                fixed_length=True,
                length_range=(0, 0),
                sample_count=0,
                confidence=0.0,
            )

        # NOTE(review): the resolved min_samples value is not used by the
        # analysis below; kept for interface compatibility — confirm intent.
        min_samples = min_samples or self.min_samples
        lengths = [len(m) for m in messages]
        min_len = min(lengths)
        max_len = max(lengths)
        fixed_length = min_len == max_len

        # Use shortest message length for analysis
        analysis_length = min_len

        # Find field boundaries using entropy transitions
        boundaries = self._detect_field_boundaries(messages, analysis_length)

        # Infer field types for each segment. The local is named `inferred`
        # (not `field`) to avoid shadowing dataclasses.field imported above.
        fields = []
        for i, (start, end) in enumerate(boundaries):
            inferred = self._infer_field(messages, start, end, i)
            fields.append(inferred)

        # Overall confidence is the mean of the per-field confidences
        if fields:
            confidence = sum(f.confidence for f in fields) / len(fields)
        else:
            confidence = 0.0

        return MessageSchema(
            fields=fields,
            message_length=analysis_length,
            fixed_length=fixed_length,
            length_range=(min_len, max_len),
            sample_count=len(messages),
            confidence=confidence,
        )

    def detect_field_types(
        self,
        messages: Sequence[bytes],
        boundaries: list[tuple[int, int]],
    ) -> list[InferredField]:
        """Detect field types for given boundaries.

        Implements RE-PAY-004: Field type detection.

        Args:
            messages: Message samples.
            boundaries: List of (start, end) field boundaries.

        Returns:
            List of InferredField with type information.
        """
        # Named `inferred` (not `field`) to avoid shadowing dataclasses.field.
        fields = []
        for i, (start, end) in enumerate(boundaries):
            inferred = self._infer_field(messages, start, end, i)
            fields.append(inferred)
        return fields

    def find_sequence_fields(
        self,
        messages: Sequence[bytes],
    ) -> list[tuple[int, int]]:
        """Find fields that appear to be sequence/counter values.

        Implements RE-PAY-004: Sequence field detection.

        Args:
            messages: Message samples (should be in order).

        Returns:
            List of (offset, size) for sequence fields.

        Raises:
            ValueError: If messages are too short for field extraction.
        """
        if len(messages) < self.sequence_threshold:
            return []

        min_len = min(len(m) for m in messages)
        sequence_fields = []

        # Check each possible field size at each offset
        for size in [1, 2, 4]:
            for offset in range(min_len - size + 1):
                values = []
                try:
                    for msg in messages:
                        # Validate message length before slicing
                        if len(msg) < offset + size:
                            raise ValueError(
                                f"Message too short: expected at least {offset + size} bytes, "
                                f"got {len(msg)} bytes"
                            )
                        # NOTE(review): only the big-endian interpretation is
                        # tested here, despite field endianness being unknown.
                        val_be = int.from_bytes(msg[offset : offset + size], "big")
                        values.append(val_be)

                    if self._is_sequence(values):
                        sequence_fields.append((offset, size))
                except (ValueError, IndexError) as e:
                    # Skip this offset/size combination if extraction fails
                    logger.debug(f"Skipping field at offset={offset}, size={size}: {e}")
                    continue

        return sequence_fields

    def find_checksum_fields(
        self,
        messages: Sequence[bytes],
    ) -> list[tuple[int, int, str]]:
        """Find fields that appear to be checksums.

        Implements RE-PAY-004: Checksum field detection.

        Args:
            messages: Message samples.

        Returns:
            List of (offset, size, algorithm_hint) for checksum fields.

        Raises:
            ValueError: If checksum field validation fails.
        """
        if len(messages) < 5:
            return []

        min_len = min(len(m) for m in messages)
        checksum_fields = []

        # Common checksum sizes and positions
        for size in [1, 2, 4]:
            # Check last position (most common)
            for offset in [min_len - size, 0]:
                if offset < 0:
                    continue

                try:
                    # Validate offset and size before processing
                    if offset + size > min_len:
                        raise ValueError(
                            f"Invalid checksum field: offset={offset} + size={size} exceeds "
                            f"minimum message length={min_len}"
                        )

                    # Extract field values and message content
                    score = self._check_checksum_correlation(messages, offset, size)

                    if score > 0.8:
                        algorithm = self._guess_checksum_algorithm(messages, offset, size)
                        checksum_fields.append((offset, size, algorithm))
                except (ValueError, IndexError) as e:
                    # Skip this offset/size combination if validation fails
                    logger.debug(f"Skipping checksum field at offset={offset}, size={size}: {e}")
                    continue

        return checksum_fields

    def _detect_field_boundaries(
        self,
        messages: Sequence[bytes],
        max_length: int,
    ) -> list[tuple[int, int]]:
        """Detect field boundaries using entropy analysis.

        Args:
            messages: Message samples.
            max_length: Maximum length to analyze.

        Returns:
            List of (start, end) boundaries.
        """
        if max_length == 0:
            return []

        # Calculate per-byte Shannon entropy across samples
        byte_entropies = []
        for pos in range(max_length):
            values = [m[pos] for m in messages if len(m) > pos]
            if len(values) < 2:
                byte_entropies.append(0.0)
                continue

            counts = Counter(values)
            total = len(values)
            entropy = 0.0
            for count in counts.values():
                if count > 0:
                    p = count / total
                    entropy -= p * np.log2(p)
            byte_entropies.append(entropy)

        # Find boundaries at entropy transitions
        boundaries = []
        current_start = 0

        for i in range(1, len(byte_entropies)):
            delta = abs(byte_entropies[i] - byte_entropies[i - 1])

            # Also check for constant vs variable patterns
            if delta > self.entropy_threshold:
                if i > current_start:
                    boundaries.append((current_start, i))
                current_start = i

        # Add final segment
        if max_length > current_start:
            boundaries.append((current_start, max_length))

        # Merge very small segments (adjacent single/two-byte runs)
        merged: list[tuple[int, int]] = []
        for start, end in boundaries:
            if merged and start - merged[-1][1] == 0 and end - start < 2:
                # Merge with previous
                merged[-1] = (merged[-1][0], end)
            else:
                merged.append((start, end))

        return merged if merged else [(0, max_length)]

    def _infer_field(
        self,
        messages: Sequence[bytes],
        start: int,
        end: int,
        index: int,
    ) -> InferredField:
        """Infer type for a single field.

        Args:
            messages: Message samples.
            start: Field start offset.
            end: Field end offset.
            index: Field index for naming.

        Returns:
            InferredField with inferred type.
        """
        size = end - start
        name = f"field_{index}"

        # Extract field values from every message long enough to contain
        # the field. (A second duplicate list was removed here — one list
        # of raw bytes is sufficient.)
        raw_values = []
        for msg in messages:
            if len(msg) >= end:
                raw_values.append(msg[start:end])

        if not raw_values:
            return InferredField(
                name=name,
                offset=start,
                size=size,
                inferred_type="unknown",
                confidence=0.0,
            )

        # Check if constant
        unique_values = set(raw_values)
        is_constant = len(unique_values) == 1

        # Check if sequence
        is_sequence = False
        if not is_constant and size in [1, 2, 4, 8]:
            int_values = [int.from_bytes(v, "big") for v in raw_values]
            is_sequence = self._is_sequence(int_values)

        # Check for checksum patterns (only near the tail of the messages)
        is_checksum = False
        if start >= min(len(m) for m in messages) - 4:
            score = self._check_checksum_correlation(messages, start, size)
            is_checksum = score > 0.7

        # Infer type
        inferred_type, endianness, confidence = self._infer_type(raw_values, size)

        # Sample values for debugging
        sample_values: list[int | str] = []
        for v in raw_values[:5]:
            if inferred_type.startswith("uint") or inferred_type.startswith("int"):
                try:
                    # Cast endianness to Literal type for type checker
                    byte_order: Literal["big", "little"] = (
                        "big" if endianness == "n/a" else endianness  # type: ignore[assignment]
                    )
                    sample_values.append(int.from_bytes(v, byte_order))
                except Exception:
                    sample_values.append(v.hex())
            elif inferred_type == "string":
                try:
                    sample_values.append(v.decode("utf-8", errors="replace"))
                except Exception:
                    sample_values.append(v.hex())
            else:
                sample_values.append(v.hex())

        # Cast to Literal types for type checker
        inferred_type_literal: Literal[
            "uint8",
            "uint16",
            "uint32",
            "uint64",
            "int8",
            "int16",
            "int32",
            "int64",
            "float32",
            "float64",
            "bytes",
            "string",
            "unknown",
        ] = inferred_type  # type: ignore[assignment]
        endianness_literal: Literal["big", "little", "n/a"] = endianness  # type: ignore[assignment]

        return InferredField(
            name=name,
            offset=start,
            size=size,
            inferred_type=inferred_type_literal,
            endianness=endianness_literal,
            is_constant=is_constant,
            is_sequence=is_sequence,
            is_checksum=is_checksum,
            constant_value=raw_values[0] if is_constant else None,
            confidence=confidence,
            sample_values=sample_values,
        )

    def _infer_type(
        self,
        values: list[bytes],
        size: int,
    ) -> tuple[str, str, float]:
        """Infer data type from values.

        Args:
            values: Field values.
            size: Field size.

        Returns:
            Tuple of (type, endianness, confidence).
        """
        if not values:
            return "unknown", "n/a", 0.0

        # Check for string (high printable ratio)
        printable_ratio = sum(
            1 for v in values for b in v if 32 <= b <= 126 or b in (9, 10, 13)
        ) / (len(values) * size)

        if printable_ratio > 0.8:
            return "string", "n/a", printable_ratio

        # Check for standard integer sizes
        if size == 1:
            return "uint8", "n/a", 0.9

        elif size == 2:
            # Heuristic endianness detection: lower variance suggests the
            # high-order byte is in the expected position
            be_variance = np.var([int.from_bytes(v, "big") for v in values])
            le_variance = np.var([int.from_bytes(v, "little") for v in values])

            if be_variance < le_variance:
                endian = "big"
            else:
                endian = "little"

            return "uint16", endian, 0.8

        elif size == 4:
            # Check for float
            float_valid = 0
            for v in values:
                try:
                    f = struct.unpack(">f", v)[0]
                    if not (np.isnan(f) or np.isinf(f)) and -1e10 < f < 1e10:
                        float_valid += 1
                except Exception:
                    pass

            if float_valid / len(values) > 0.8:
                return "float32", "big", 0.7

            # Otherwise integer
            be_variance = np.var([int.from_bytes(v, "big") for v in values])
            le_variance = np.var([int.from_bytes(v, "little") for v in values])
            endian = "big" if be_variance < le_variance else "little"
            return "uint32", endian, 0.8

        elif size == 8:
            # Check for float64 or uint64
            be_variance = np.var([int.from_bytes(v, "big") for v in values])
            le_variance = np.var([int.from_bytes(v, "little") for v in values])
            endian = "big" if be_variance < le_variance else "little"
            return "uint64", endian, 0.7

        else:
            return "bytes", "n/a", 0.6

    def _is_sequence(self, values: list[int]) -> bool:
        """Check if values form a sequence.

        Args:
            values: Integer values.

        Returns:
            True if values are incrementing/decrementing.
        """
        if len(values) < self.sequence_threshold:
            return False

        # Check for incrementing sequence
        diffs = [values[i + 1] - values[i] for i in range(len(values) - 1)]

        # Most diffs should be 1 (or consistent)
        counter = Counter(diffs)
        if not counter:
            return False

        most_common_diff, count = counter.most_common(1)[0]
        ratio = count / len(diffs)

        return ratio > 0.8 and most_common_diff in [1, -1, 0]

    def _check_checksum_correlation(
        self,
        messages: Sequence[bytes],
        offset: int,
        size: int,
    ) -> float:
        """Check if field correlates with message content like a checksum.

        Args:
            messages: Message samples.
            offset: Field offset.
            size: Field size.

        Returns:
            Correlation score (0-1).
        """
        # Simple heuristic: checksum fields have high correlation with
        # changes in other parts of the message

        if len(messages) < 5:
            return 0.0

        # Extract checksum values and message content
        checksums = []
        contents = []

        for msg in messages:
            if len(msg) >= offset + size:
                checksums.append(int.from_bytes(msg[offset : offset + size], "big"))
                # Content before checksum
                content = msg[:offset] + msg[offset + size :]
                contents.append(sum(content) % 65536)

        if len(checksums) < 5:
            return 0.0

        # Check if checksum changes correlate with content changes
        unique_contents = len(set(contents))
        unique_checksums = len(set(checksums))

        if unique_contents == 1 and unique_checksums == 1:
            return 0.3  # Both constant - inconclusive

        # Simple correlation check
        if unique_contents > 1 and unique_checksums > 1:
            return 0.8

        return 0.3

    def _guess_checksum_algorithm(
        self,
        messages: Sequence[bytes],
        offset: int,
        size: int,
    ) -> str:
        """Guess the checksum algorithm.

        Args:
            messages: Message samples.
            offset: Checksum offset.
            size: Checksum size.

        Returns:
            Algorithm name hint.
        """
        if size == 1:
            return "xor8_or_sum8"
        elif size == 2:
            return "crc16_or_sum16"
        elif size == 4:
            return "crc32"
        return "unknown"
840# =============================================================================
841# RE-PAY-004: Convenience functions
842# =============================================================================
def infer_fields(messages: Sequence[bytes], min_samples: int = 10) -> MessageSchema:
    """Convenience wrapper: infer a message schema from payload samples.

    Implements RE-PAY-004: Payload Field Inference.

    Args:
        messages: Binary message samples to analyze.
        min_samples: Minimum number of samples for reliable inference.

    Returns:
        MessageSchema describing the inferred field layout.

    Example:
        >>> messages = [pkt.data for pkt in packets]
        >>> schema = infer_fields(messages)
        >>> for field in schema.fields:
        ...     print(f"{field.name}: {field.inferred_type}")
    """
    return FieldInferrer(min_samples=min_samples).infer_fields(messages)
def detect_field_types(
    messages: Sequence[bytes],
    boundaries: list[tuple[int, int]],
) -> list[InferredField]:
    """Convenience wrapper: infer field types for explicit boundaries.

    Implements RE-PAY-004: Field type detection.

    Args:
        messages: Binary message samples.
        boundaries: (start, end) offsets of each field to type.

    Returns:
        One InferredField per boundary, with type information.
    """
    return FieldInferrer().detect_field_types(messages, boundaries)
def find_sequence_fields(messages: Sequence[bytes]) -> list[tuple[int, int]]:
    """Convenience wrapper: locate counter/sequence-number fields.

    Implements RE-PAY-004: Sequence field detection.

    Args:
        messages: Binary message samples, in capture order.

    Returns:
        (offset, size) pairs for fields that look like sequences.
    """
    return FieldInferrer().find_sequence_fields(messages)
def find_checksum_fields(messages: Sequence[bytes]) -> list[tuple[int, int, str]]:
    """Convenience wrapper: locate checksum-like fields.

    Implements RE-PAY-004: Checksum field detection.

    Args:
        messages: Binary message samples.

    Returns:
        (offset, size, algorithm_hint) triples for checksum candidates.
    """
    return FieldInferrer().find_checksum_fields(messages)
916class PayloadExtractor:
917 """Extract payloads from network packets.
919 Implements RE-PAY-001: Payload Extraction Framework.
921 Provides zero-copy payload extraction from UDP/TCP packets
922 with metadata preservation and fragment handling.
924 Example:
925 >>> extractor = PayloadExtractor()
926 >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
927 >>> for p in payloads:
928 ... print(f"{p.src_ip}:{p.src_port} -> {len(p.data)} bytes")
929 """
931 def __init__(
932 self,
933 include_headers: bool = False,
934 zero_copy: bool = True,
935 return_type: Literal["bytes", "memoryview", "numpy"] = "bytes",
936 ) -> None:
937 """Initialize payload extractor.
939 Args:
940 include_headers: Include protocol headers in payload.
941 zero_copy: Use zero-copy memoryview where possible.
942 return_type: Type for returned payload data.
943 """
944 self.include_headers = include_headers
945 self.zero_copy = zero_copy
946 self.return_type = return_type
948 def extract_payload(
949 self,
950 packet: dict[str, Any] | bytes,
951 layer: Literal["ethernet", "ip", "transport", "application"] = "application",
952 ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
953 """Extract payload from a single packet.
955 Implements RE-PAY-001: Single packet payload extraction.
957 Args:
958 packet: Packet data (dict with 'data' key or raw bytes).
959 layer: OSI layer to extract from.
961 Returns:
962 Payload data in requested format.
964 Example:
965 >>> payload = extractor.extract_payload(packet)
966 >>> print(f"Payload: {len(payload)} bytes")
967 """
968 # Handle different packet formats
969 if isinstance(packet, dict):
970 raw_data = packet.get("data", packet.get("payload", b""))
971 if isinstance(raw_data, list | tuple):
972 raw_data = bytes(raw_data)
973 else:
974 raw_data = packet
976 if not raw_data:
977 return self._format_output(b"")
979 # For raw bytes, return as-is
980 if layer == "application": 980 ↛ 985line 980 didn't jump to line 985 because the condition on line 980 was always true
981 return self._format_output(raw_data)
983 # Layer-based extraction would require protocol parsing
984 # For now, return full data
985 return self._format_output(raw_data)
987 def extract_all_payloads(
988 self,
989 packets: Sequence[dict[str, Any] | bytes],
990 protocol: str | None = None,
991 port_filter: tuple[int | None, int | None] | None = None,
992 ) -> list[PayloadInfo]:
993 """Extract payloads from all packets with metadata.
995 Implements RE-PAY-001: Batch payload extraction with metadata.
997 Args:
998 packets: Sequence of packets.
999 protocol: Filter by protocol (e.g., "UDP", "TCP").
1000 port_filter: (src_port, dst_port) filter tuple.
1002 Returns:
1003 List of PayloadInfo with extracted data and metadata.
1005 Example:
1006 >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
1007 >>> print(f"Extracted {len(payloads)} payloads")
1008 """
1009 results = []
1011 for i, packet in enumerate(packets):
1012 if isinstance(packet, dict): 1012 ↛ 1047line 1012 didn't jump to line 1047 because the condition on line 1012 was always true
1013 # Extract metadata from dict
1014 pkt_protocol = packet.get("protocol", "")
1015 src_port = packet.get("src_port")
1016 dst_port = packet.get("dst_port")
1018 # Apply filters
1019 if protocol and pkt_protocol.upper() != protocol.upper():
1020 continue
1022 if port_filter:
1023 if port_filter[0] is not None and src_port != port_filter[0]:
1024 continue
1025 if port_filter[1] is not None and dst_port != port_filter[1]: 1025 ↛ 1026line 1025 didn't jump to line 1026 because the condition on line 1025 was never true
1026 continue
1028 payload = self.extract_payload(packet)
1029 if isinstance(payload, memoryview | np.ndarray): 1029 ↛ 1030line 1029 didn't jump to line 1030 because the condition on line 1029 was never true
1030 payload = bytes(payload)
1032 info = PayloadInfo(
1033 data=payload,
1034 packet_index=i,
1035 timestamp=packet.get("timestamp"),
1036 src_ip=packet.get("src_ip"),
1037 dst_ip=packet.get("dst_ip"),
1038 src_port=src_port,
1039 dst_port=dst_port,
1040 protocol=pkt_protocol,
1041 is_fragment=packet.get("is_fragment", False),
1042 fragment_offset=packet.get("fragment_offset", 0),
1043 )
1044 results.append(info)
1045 else:
1046 # Raw bytes
1047 payload = bytes(packet)
1048 info = PayloadInfo(data=payload, packet_index=i)
1049 results.append(info)
1051 return results
1053 def iter_payloads(
1054 self,
1055 packets: Sequence[dict[str, Any] | bytes],
1056 ) -> Iterator[PayloadInfo]:
1057 """Iterate over payloads for memory-efficient processing.
1059 Implements RE-PAY-001: Streaming payload iteration.
1061 Args:
1062 packets: Sequence of packets.
1064 Yields:
1065 PayloadInfo for each packet.
1066 """
1067 for i, packet in enumerate(packets):
1068 payload = self.extract_payload(packet)
1069 if isinstance(payload, memoryview | np.ndarray): 1069 ↛ 1070line 1069 didn't jump to line 1070 because the condition on line 1069 was never true
1070 payload = bytes(payload)
1072 if isinstance(packet, dict): 1072 ↛ 1073line 1072 didn't jump to line 1073 because the condition on line 1072 was never true
1073 info = PayloadInfo(
1074 data=payload,
1075 packet_index=i,
1076 timestamp=packet.get("timestamp"),
1077 src_ip=packet.get("src_ip"),
1078 dst_ip=packet.get("dst_ip"),
1079 src_port=packet.get("src_port"),
1080 dst_port=packet.get("dst_port"),
1081 protocol=packet.get("protocol"),
1082 )
1083 else:
1084 info = PayloadInfo(data=payload, packet_index=i)
1086 yield info
1088 def _format_output(
1089 self, data: bytes
1090 ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
1091 """Format output according to return_type setting."""
1092 if self.return_type == "bytes":
1093 return data
1094 elif self.return_type == "memoryview":
1095 return memoryview(data)
1096 # self.return_type == "numpy"
1097 return np.frombuffer(data, dtype=np.uint8)
def search_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
    context_bytes: int = 8,
) -> list[PatternMatch]:
    """Locate a single pattern across all packet payloads.

    Implements RE-PAY-002: Payload Pattern Search.

    Args:
        packets: Packets to scan.
        pattern: Pattern to look for.
        pattern_type: Matching mode to use.
        context_bytes: Bytes of surrounding context to keep per match.

    Returns:
        One PatternMatch per occurrence, across all packets.

    Example:
        >>> matches = search_pattern(packets, b'\\x00\\x01\\x00\\x00')
        >>> for m in matches:
        ...     print(f"Found at packet {m.packet_index}, offset {m.offset}")
    """
    extractor = PayloadExtractor()
    # The reported name is constant across the whole search
    label = pattern.hex() if isinstance(pattern, bytes) else str(pattern)
    hits: list[PatternMatch] = []

    for pkt_index, packet in enumerate(packets):
        data = extractor.extract_payload(packet)
        if isinstance(data, memoryview | np.ndarray):
            data = bytes(data)

        for offset, matched in _find_pattern_in_data(data, pattern, pattern_type):
            # Clamp the context window to the payload bounds
            ctx_start = max(0, offset - context_bytes)
            ctx_end = min(len(data), offset + len(matched) + context_bytes)

            hits.append(
                PatternMatch(
                    pattern_name=label,
                    offset=offset,
                    matched=matched,
                    packet_index=pkt_index,
                    context=data[ctx_start:ctx_end],
                )
            )

    return hits
def search_patterns(
    packets: Sequence[dict[str, Any] | bytes],
    patterns: dict[str, bytes | str],
    context_bytes: int = 8,
) -> dict[str, list[PatternMatch]]:
    """Search for multiple patterns simultaneously.

    Implements RE-PAY-002: Multi-pattern search.

    Args:
        packets: Sequence of packets to search.
        patterns: Dictionary mapping names to patterns.
        context_bytes: Number of context bytes around match.

    Returns:
        Dictionary mapping pattern names to match lists.

    Example:
        >>> signatures = {
        ...     "header_a": b'\\xAA\\x55',
        ...     "header_b": b'\\xDE\\xAD',
        ... }
        >>> results = search_patterns(packets, signatures)
        >>> for name, matches in results.items():
        ...     print(f"{name}: {len(matches)} matches")
    """
    hits: dict[str, list[PatternMatch]] = {name: [] for name in patterns}
    extractor = PayloadExtractor()

    for pkt_index, pkt in enumerate(packets):
        data = extractor.extract_payload(pkt)
        if isinstance(data, memoryview | np.ndarray):
            data = bytes(data)

        for name, pattern in patterns.items():
            # Infer the matching mode from the pattern itself: bytes
            # containing the "??" wildcard marker are wildcards, other
            # bytes are exact, and str patterns are treated as regexes.
            if isinstance(pattern, bytes):
                mode = "wildcard" if b"??" in pattern else "exact"
            else:
                mode = "regex"

            for offset, matched in _find_pattern_in_data(data, pattern, mode):
                lo = max(0, offset - context_bytes)
                hi = min(len(data), offset + len(matched) + context_bytes)
                hits[name].append(
                    PatternMatch(
                        pattern_name=name,
                        offset=offset,
                        matched=matched,
                        packet_index=pkt_index,
                        context=data[lo:hi],
                    )
                )

    return hits
def filter_by_pattern(
    packets: Sequence[dict[str, Any] | bytes],
    pattern: bytes | str,
    pattern_type: Literal["exact", "wildcard", "regex"] = "exact",
) -> list[dict[str, Any] | bytes]:
    """Filter packets that contain a pattern.

    Implements RE-PAY-002: Pattern-based filtering.

    Args:
        packets: Sequence of packets.
        pattern: Pattern to match.
        pattern_type: Type of pattern matching.

    Returns:
        List of packets containing the pattern.
    """
    extractor = PayloadExtractor()
    kept: list[dict[str, Any] | bytes] = []

    for pkt in packets:
        data = extractor.extract_payload(pkt)
        if isinstance(data, memoryview | np.ndarray):
            data = bytes(data)

        # Keep the original packet object whenever at least one match exists.
        if _find_pattern_in_data(data, pattern, pattern_type):
            kept.append(pkt)

    return kept
def detect_delimiter(
    payloads: Sequence[bytes] | bytes,
    candidates: list[bytes] | None = None,
) -> DelimiterResult:
    """Automatically detect message delimiter.

    Implements RE-PAY-003: Delimiter detection.

    Each candidate delimiter is scored by how often it occurs and how
    regular the gaps between occurrences are; the best-scoring candidate
    is returned.

    Args:
        payloads: Payload data or list of payloads.
        candidates: Optional list of candidate delimiters to test.

    Returns:
        DelimiterResult with detected delimiter info.

    Example:
        >>> data = b'msg1\\r\\nmsg2\\r\\nmsg3\\r\\n'
        >>> result = detect_delimiter(data)
        >>> print(f"Delimiter: {result.delimiter!r}")
    """
    # Combine payloads if list
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here
        data = cast("bytes", payloads)

    if not data:
        return DelimiterResult(
            delimiter=b"",
            delimiter_type="fixed",
            confidence=0.0,
            occurrences=0,
        )

    # Default candidates. Note: b"\x0d\x0a" is byte-identical to b"\r\n",
    # so CRLF is listed only once (the old explicit duplicate was redundant).
    if candidates is None:
        candidates = [
            b"\r\n",  # CRLF
            b"\n",  # LF
            b"\x00",  # Null
            b"\r",  # CR
        ]

    best_result: DelimiterResult | None = None
    best_score = 0.0

    for delim in candidates:
        if len(delim) == 0:
            continue

        count = data.count(delim)
        if count < 2:
            continue

        # Collect all non-overlapping occurrence positions.
        positions = []
        pos = data.find(delim)
        while pos != -1:
            positions.append(pos)
            pos = data.find(delim, pos + len(delim))

        if len(positions) < 2:
            continue

        # Interval regularity: coefficient of variation of the gaps between
        # consecutive occurrences, mapped into (0, 1] (1.0 == perfectly
        # regular spacing). Positions are strictly increasing, but guard
        # against a zero mean anyway.
        intervals = [positions[i + 1] - positions[i] for i in range(len(positions) - 1)]
        mean_interval = sum(intervals) / len(intervals)
        if mean_interval > 0:
            variance = sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)
            cv = (variance**0.5) / mean_interval
            regularity = 1.0 / (1.0 + cv)
        else:
            regularity = 0.0

        # Score combines frequency and regularity.
        score = count * (0.5 + 0.5 * regularity)

        if score > best_score:
            best_score = score
            best_result = DelimiterResult(
                delimiter=delim,
                delimiter_type="fixed",
                confidence=min(1.0, regularity * 0.8 + 0.2 * min(1.0, count / 10)),
                occurrences=count,
                positions=positions,
            )

    if best_result is None:
        return DelimiterResult(
            delimiter=b"",
            delimiter_type="fixed",
            confidence=0.0,
            occurrences=0,
        )

    return best_result
def detect_length_prefix(
    payloads: Sequence[bytes],
    max_length_bytes: int = 4,
) -> LengthPrefixResult:
    """Detect length-prefixed message format.

    Implements RE-PAY-003: Length prefix detection.

    Candidate field widths are tried from widest to narrowest; combined
    with the strict ``>`` score comparison below, this makes wider length
    fields win ties.

    Args:
        payloads: List of payload samples.
        max_length_bytes: Maximum length field size to test.

    Returns:
        LengthPrefixResult with detected format.

    Example:
        >>> result = detect_length_prefix(payloads)
        >>> if result.detected:
        ...     print(f"Length field: {result.length_bytes} bytes, {result.endian}")
    """
    if not payloads:
        return LengthPrefixResult(detected=False)

    # Analyze the concatenated sample stream.
    data = b"".join(payloads)

    best = LengthPrefixResult(detected=False)
    best_score = 0.0

    for width in (4, 2, 1):
        if width > max_length_bytes:
            continue

        for endian_name in ("big", "little"):
            endian: Literal["big", "little"] = endian_name  # type: ignore[assignment]
            for start in range(min(8, len(data) - width)):
                for counts_header in (False, True):
                    score, hits = _test_length_prefix(
                        data, width, endian, start, counts_header
                    )

                    # Require at least 3 framed records; strict ">" keeps
                    # the earlier (wider) candidate when scores are equal.
                    if score > best_score and hits >= 3:
                        best_score = score
                        best = LengthPrefixResult(
                            detected=True,
                            length_bytes=width,
                            endian=endian,
                            offset=start,
                            includes_length=counts_header,
                            confidence=score,
                        )

    return best
def find_message_boundaries(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | DelimiterResult | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[MessageBoundary]:
    """Find message boundaries in payload data.

    Implements RE-PAY-003: Message boundary detection.

    Tries length-prefix framing first (detecting a format when none is
    supplied), then falls back to delimiter-based splitting.

    Args:
        payloads: Payload data or list of payloads.
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (test if None).

    Returns:
        List of MessageBoundary objects.

    Example:
        >>> boundaries = find_message_boundaries(data)
        >>> for b in boundaries:
        ...     print(f"Message {b.index}: {b.length} bytes")
    """
    # Flatten a list/tuple of payloads into one contiguous buffer.
    if isinstance(payloads, list | tuple):
        data: bytes = b"".join(payloads)
    else:
        # Type narrowing: payloads is bytes here
        data = cast("bytes", payloads)

    if not data:
        return []

    # Preferred framing: length-prefixed records.
    if length_prefix is None:
        length_prefix = detect_length_prefix([data])

    if length_prefix.detected:
        framed = _extract_length_prefixed_messages(data, length_prefix)
        if framed:
            return framed

    # Fallback framing: delimiter-separated records.
    if delimiter is None:
        delimiter = detect_delimiter(data)

    delim = delimiter.delimiter if isinstance(delimiter, DelimiterResult) else delimiter

    if not delim:
        # No delimiter found: treat the whole buffer as a single message.
        return [MessageBoundary(start=0, end=len(data), length=len(data), data=data, index=0)]

    boundaries: list[MessageBoundary] = []
    offset = 0
    for piece in data.split(delim):
        if piece:  # skip empty segments between adjacent delimiters
            boundaries.append(
                MessageBoundary(
                    start=offset,
                    end=offset + len(piece),
                    length=len(piece),
                    data=piece,
                    index=len(boundaries),
                )
            )
        offset += len(piece) + len(delim)

    return boundaries
def segment_messages(
    payloads: Sequence[bytes] | bytes,
    delimiter: bytes | None = None,
    length_prefix: LengthPrefixResult | None = None,
) -> list[bytes]:
    """Segment stream into individual messages.

    Implements RE-PAY-003: Message segmentation.

    Args:
        payloads: Payload data or list of payloads.
        delimiter: Delimiter to use (auto-detect if None).
        length_prefix: Length prefix format (auto-detect if None).

    Returns:
        List of message bytes.
    """
    return [
        boundary.data
        for boundary in find_message_boundaries(payloads, delimiter, length_prefix)
    ]
def diff_payloads(payload_a: bytes, payload_b: bytes) -> PayloadDiff:
    """Compare two payloads and identify differences.

    Implements RE-PAY-005: Payload differential analysis.

    Args:
        payload_a: First payload.
        payload_b: Second payload.

    Returns:
        PayloadDiff with comparison results.

    Example:
        >>> diff = diff_payloads(pkt1.data, pkt2.data)
        >>> print(f"Common prefix: {diff.common_prefix_length} bytes")
        >>> print(f"Different bytes: {len(diff.differences)}")
    """
    overlap = min(len(payload_a), len(payload_b))

    # Length of the shared prefix within the overlapping region.
    prefix = 0
    while prefix < overlap and payload_a[prefix] == payload_b[prefix]:
        prefix += 1

    # Length of the shared suffix, never extending into the shared prefix.
    suffix = 0
    while suffix < overlap - prefix and payload_a[-(suffix + 1)] == payload_b[-(suffix + 1)]:
        suffix += 1

    # Byte-level mismatches in the overlapping region...
    differences = [
        (pos, byte_a, byte_b)
        for pos, (byte_a, byte_b) in enumerate(zip(payload_a, payload_b))
        if byte_a != byte_b
    ]
    overlap_mismatches = len(differences)

    # ...plus the tail of the longer payload (-1 marks the missing side).
    if len(payload_a) > len(payload_b):
        differences.extend((pos, payload_a[pos], -1) for pos in range(overlap, len(payload_a)))
    elif len(payload_b) > len(payload_a):
        differences.extend((pos, -1, payload_b[pos]) for pos in range(overlap, len(payload_b)))

    # Similarity: matching overlap bytes relative to the longer payload.
    longest = max(len(payload_a), len(payload_b))
    if longest == 0:
        similarity = 1.0
    else:
        similarity = (overlap - overlap_mismatches) / longest

    return PayloadDiff(
        common_prefix_length=prefix,
        common_suffix_length=suffix,
        differences=differences,
        similarity=similarity,
        edit_distance=_levenshtein_distance(payload_a, payload_b),
    )
def find_common_bytes(payloads: Sequence[bytes]) -> bytes:
    """Find common prefix across all payloads.

    Implements RE-PAY-005: Common byte analysis.

    Args:
        payloads: List of payloads to analyze.

    Returns:
        Common prefix bytes.
    """
    if not payloads:
        return b""

    if len(payloads) == 1:
        return payloads[0]

    # The common prefix can never exceed the shortest payload.
    shortest = min(len(p) for p in payloads)
    reference = payloads[0]

    prefix_len = shortest
    for pos in range(shortest):
        if any(p[pos] != reference[pos] for p in payloads):
            prefix_len = pos
            break

    return reference[:prefix_len]
def find_variable_positions(payloads: Sequence[bytes]) -> VariablePositions:
    """Identify which byte positions vary across payloads.

    Implements RE-PAY-005: Variable position detection.

    Only the first min-length bytes of each payload are compared.

    Args:
        payloads: List of payloads to analyze.

    Returns:
        VariablePositions with constant and variable position info.

    Example:
        >>> result = find_variable_positions(payloads)
        >>> print(f"Constant positions: {result.constant_positions}")
        >>> print(f"Variable positions: {result.variable_positions}")
    """
    if not payloads:
        return VariablePositions(
            constant_positions=[],
            variable_positions=[],
            constant_values={},
            variance_by_position=np.array([]),
        )

    # Compare only up to the shortest payload length.
    width = min(len(p) for p in payloads)

    constants: list[int] = []
    variables: list[int] = []
    const_values: dict[int, int] = {}
    variances: list[float] = []

    for pos in range(width):
        column = [p[pos] for p in payloads]
        if len(set(column)) == 1:
            constants.append(pos)
            const_values[pos] = column[0]
            variances.append(0.0)
        else:
            variables.append(pos)
            variances.append(float(np.var(column)))

    return VariablePositions(
        constant_positions=constants,
        variable_positions=variables,
        constant_values=const_values,
        variance_by_position=np.array(variances),
    )
1663def compute_similarity(
1664 payload_a: bytes,
1665 payload_b: bytes,
1666 metric: Literal["levenshtein", "hamming", "jaccard"] = "levenshtein",
1667) -> float:
1668 """Compute similarity between two payloads.
1670 Implements RE-PAY-005: Similarity computation.
1672 Args:
1673 payload_a: First payload.
1674 payload_b: Second payload.
1675 metric: Similarity metric to use.
1677 Returns:
1678 Similarity score (0-1).
1679 """
1680 if metric == "levenshtein": 1680 ↛ 1687line 1680 didn't jump to line 1687 because the condition on line 1680 was always true
1681 max_len = max(len(payload_a), len(payload_b))
1682 if max_len == 0: 1682 ↛ 1683line 1682 didn't jump to line 1683 because the condition on line 1682 was never true
1683 return 1.0
1684 distance = _levenshtein_distance(payload_a, payload_b)
1685 return 1.0 - (distance / max_len)
1687 elif metric == "hamming":
1688 if len(payload_a) != len(payload_b):
1689 # Pad shorter one
1690 max_len = max(len(payload_a), len(payload_b))
1691 payload_a = payload_a.ljust(max_len, b"\x00")
1692 payload_b = payload_b.ljust(max_len, b"\x00")
1694 matches = sum(a == b for a, b in zip(payload_a, payload_b, strict=True))
1695 return matches / len(payload_a) if payload_a else 1.0
1697 # metric == "jaccard"
1698 # Treat bytes as sets
1699 set_a = set(payload_a)
1700 set_b = set(payload_b)
1701 intersection = len(set_a & set_b)
1702 union = len(set_a | set_b)
1703 return intersection / union if union > 0 else 1.0
def cluster_payloads(
    payloads: Sequence[bytes],
    threshold: float = 0.8,
    algorithm: Literal["greedy", "dbscan"] = "greedy",
) -> list[PayloadCluster]:
    """Cluster similar payloads together.

    Implements RE-PAY-005: Payload clustering.

    Args:
        payloads: List of payloads to cluster.
        threshold: Similarity threshold for clustering.
        algorithm: Clustering algorithm.

    Returns:
        List of PayloadCluster objects.

    Example:
        >>> clusters = cluster_payloads(payloads, threshold=0.85)
        >>> for c in clusters:
        ...     print(f"Cluster {c.cluster_id}: {c.size} payloads")
    """
    if not payloads:
        return []

    if algorithm == "greedy":
        return _cluster_greedy_optimized(payloads, threshold)

    # algorithm == "dbscan" (simplified fallback, see _cluster_dbscan)
    return _cluster_dbscan(payloads, threshold)
def correlate_request_response(
    requests: Sequence[PayloadInfo],
    responses: Sequence[PayloadInfo],
    max_delay: float = 1.0,
) -> list[tuple[PayloadInfo, PayloadInfo, float]]:
    """Correlate request payloads with responses.

    Each request is paired with the response that follows it soonest
    within ``max_delay`` seconds; earlier responses win latency ties.

    Implements RE-PAY-005: Request-response correlation.

    Args:
        requests: List of request PayloadInfo.
        responses: List of response PayloadInfo.
        max_delay: Maximum time between request and response.

    Returns:
        List of (request, response, latency) tuples.
    """
    matched: list[tuple[PayloadInfo, PayloadInfo, float]] = []

    for req in requests:
        if req.timestamp is None:
            continue

        # Responses within the allowed window, keyed by latency.
        candidates = [
            (resp.timestamp - req.timestamp, resp)
            for resp in responses
            if resp.timestamp is not None
            and 0 <= resp.timestamp - req.timestamp <= max_delay
        ]

        if candidates:
            latency, resp = min(candidates, key=lambda item: item[0])
            matched.append((req, resp, latency))

    return matched
1778# =============================================================================
1779# Helper functions
1780# =============================================================================
1783def _find_pattern_in_data(
1784 data: bytes,
1785 pattern: bytes | str,
1786 pattern_type: str,
1787) -> list[tuple[int, bytes]]:
1788 """Find pattern occurrences in data."""
1789 matches = []
1791 if pattern_type == "exact":
1792 if isinstance(pattern, str): 1792 ↛ 1793line 1792 didn't jump to line 1793 because the condition on line 1792 was never true
1793 pattern = pattern.encode()
1794 pos = 0
1795 while True:
1796 pos = data.find(pattern, pos)
1797 if pos == -1:
1798 break
1799 matches.append((pos, pattern))
1800 pos += 1
1802 elif pattern_type == "wildcard": 1802 ↛ 1804line 1802 didn't jump to line 1804 because the condition on line 1802 was never true
1803 # Convert wildcard pattern to regex
1804 if isinstance(pattern, bytes):
1805 # Replace ?? with . for single byte match
1806 regex_pattern = pattern.replace(b"??", b".")
1807 try:
1808 for match in re.finditer(regex_pattern, data, re.DOTALL):
1809 matches.append((match.start(), match.group()))
1810 except re.error:
1811 pass
1813 elif pattern_type == "regex": 1813 ↛ 1822line 1813 didn't jump to line 1822 because the condition on line 1813 was always true
1814 if isinstance(pattern, str): 1814 ↛ 1815line 1814 didn't jump to line 1815 because the condition on line 1814 was never true
1815 pattern = pattern.encode()
1816 try:
1817 for match in re.finditer(pattern, data, re.DOTALL):
1818 matches.append((match.start(), match.group()))
1819 except re.error:
1820 pass
1822 return matches
1825def _test_length_prefix(
1826 data: bytes,
1827 length_bytes: int,
1828 endian: str,
1829 offset: int,
1830 includes_length: bool,
1831) -> tuple[float, int]:
1832 """Test if data follows a length-prefix pattern."""
1833 matches = 0
1834 pos = 0
1836 while pos + offset + length_bytes <= len(data):
1837 # Read length field
1838 length_data = data[pos + offset : pos + offset + length_bytes]
1839 if endian == "big":
1840 length = int.from_bytes(length_data, "big")
1841 else:
1842 length = int.from_bytes(length_data, "little")
1844 if includes_length:
1845 expected_end = pos + length
1846 else:
1847 expected_end = pos + offset + length_bytes + length
1849 # Check if this makes sense
1850 if 0 < length < 65536 and expected_end <= len(data):
1851 matches += 1
1852 pos = expected_end
1853 else:
1854 break
1856 # Score based on matches and coverage
1857 coverage = pos / len(data) if len(data) > 0 else 0
1858 score = min(1.0, matches / 5) * coverage
1860 return score, matches
def _extract_length_prefixed_messages(
    data: bytes,
    length_prefix: LengthPrefixResult,
) -> list[MessageBoundary]:
    """Slice *data* into MessageBoundary records per a length-prefix format."""
    header = length_prefix.offset + length_prefix.length_bytes
    byteorder = "big" if length_prefix.endian == "big" else "little"

    messages: list[MessageBoundary] = []
    start = 0

    while start + header <= len(data):
        field = data[start + length_prefix.offset : start + header]
        length = int.from_bytes(field, byteorder)

        if length_prefix.includes_length:
            end = start + length
        else:
            end = start + header + length

        # Stop at the first implausible record (empty or past the buffer).
        if end > len(data) or length <= 0:
            break

        messages.append(
            MessageBoundary(
                start=start,
                end=end,
                length=end - start,
                data=data[start:end],
                index=len(messages),
            )
        )
        start = end

    return messages
1907def _levenshtein_distance(a: bytes, b: bytes) -> int:
1908 """Calculate Levenshtein edit distance between two byte sequences."""
1909 if len(a) < len(b):
1910 return _levenshtein_distance(b, a)
1912 if len(b) == 0:
1913 return len(a)
1915 previous_row: list[int] = list(range(len(b) + 1))
1916 for i, c1 in enumerate(a):
1917 current_row = [i + 1]
1918 for j, c2 in enumerate(b):
1919 insertions = previous_row[j + 1] + 1
1920 deletions = current_row[j] + 1
1921 substitutions = previous_row[j] + (c1 != c2)
1922 current_row.append(min(insertions, deletions, substitutions))
1923 previous_row = current_row
1925 return previous_row[-1]
1928def _fast_similarity(payload_a: bytes, payload_b: bytes, threshold: float) -> float | None:
1929 """Fast similarity check with early termination.
1931 Uses length-based filtering and sampling to quickly reject dissimilar payloads.
1932 Returns None if payloads are likely similar (needs full check),
1933 or a similarity value if they can be quickly determined.
1935 Args:
1936 payload_a: First payload.
1937 payload_b: Second payload.
1938 threshold: Similarity threshold for clustering.
1940 Returns:
1941 Similarity value if quickly determined, None if full check needed.
1942 """
1943 len_a = len(payload_a)
1944 len_b = len(payload_b)
1946 # Empty payloads
1947 if len_a == 0 and len_b == 0: 1947 ↛ 1948line 1947 didn't jump to line 1948 because the condition on line 1947 was never true
1948 return 1.0
1949 if len_a == 0 or len_b == 0: 1949 ↛ 1950line 1949 didn't jump to line 1950 because the condition on line 1949 was never true
1950 return 0.0
1952 # Length difference filter: if lengths differ by more than (1-threshold)*max_len,
1953 # similarity can't exceed threshold
1954 max_len = max(len_a, len_b)
1955 min_len = min(len_a, len_b)
1956 _length_diff = max_len - min_len
1958 # Maximum possible similarity given length difference
1959 max_possible_similarity = min_len / max_len
1960 if max_possible_similarity < threshold: 1960 ↛ 1961line 1960 didn't jump to line 1961 because the condition on line 1960 was never true
1961 return max_possible_similarity
1963 # For same-length payloads, use fast hamming similarity
1964 if len_a == len_b: 1964 ↛ 2001line 1964 didn't jump to line 2001 because the condition on line 1964 was always true
1965 # Sample comparison for large payloads
1966 if len_a > 50: 1966 ↛ 1968line 1966 didn't jump to line 1968 because the condition on line 1966 was never true
1967 # Sample first 16, last 16, and some middle bytes
1968 sample_size = min(48, len_a)
1969 mismatches = 0
1971 # First 16 bytes
1972 for i in range(min(16, len_a)):
1973 if payload_a[i] != payload_b[i]:
1974 mismatches += 1
1976 # Last 16 bytes
1977 for i in range(1, min(17, len_a + 1)):
1978 if payload_a[-i] != payload_b[-i]:
1979 mismatches += 1
1981 # Middle samples
1982 if len_a > 32:
1983 step = (len_a - 32) // 16
1984 if step > 0:
1985 for i in range(16, len_a - 16, step):
1986 if payload_a[i] != payload_b[i]:
1987 mismatches += 1
1989 # Estimate similarity from sample
1990 estimated_similarity = 1.0 - (mismatches / sample_size)
1992 # If sample shows very low similarity, reject early
1993 if estimated_similarity < threshold * 0.8:
1994 return estimated_similarity
1996 # Full hamming comparison for same-length payloads (faster than Levenshtein)
1997 matches = sum(a == b for a, b in zip(payload_a, payload_b, strict=True))
1998 return matches / len_a
2000 # For different-length payloads, use common prefix/suffix heuristic
2001 common_prefix = 0
2002 for i in range(min_len):
2003 if payload_a[i] == payload_b[i]:
2004 common_prefix += 1
2005 else:
2006 break
2008 common_suffix = 0
2009 for i in range(1, min_len - common_prefix + 1):
2010 if payload_a[-i] == payload_b[-i]:
2011 common_suffix += 1
2012 else:
2013 break
2015 # Estimate similarity from prefix/suffix
2016 common_bytes = common_prefix + common_suffix
2017 estimated_similarity = common_bytes / max_len
2019 # If common bytes suggest low similarity, reject
2020 if estimated_similarity < threshold * 0.7:
2021 return estimated_similarity
2023 # Need full comparison
2024 return None
def _cluster_greedy_optimized(
    payloads: Sequence[bytes],
    threshold: float,
) -> list[PayloadCluster]:
    """Greedy single-pass clustering with cheap similarity pre-filters.

    Each unassigned payload seeds a cluster; later payloads join the first
    seed they match at or above *threshold*. Length ratios and
    `_fast_similarity` screen pairs before the expensive Levenshtein
    fallback in `compute_similarity`.

    Args:
        payloads: List of payloads to cluster.
        threshold: Similarity threshold for clustering.

    Returns:
        List of PayloadCluster objects.
    """
    clusters: list[PayloadCluster] = []
    taken = [False] * len(payloads)

    # Hoisted lengths for the cheap ratio filter below.
    sizes = [len(p) for p in payloads]

    for seed_idx, seed in enumerate(payloads):
        if taken[seed_idx]:
            continue

        # Seed a new cluster with this payload.
        members = [seed]
        member_indices = [seed_idx]
        taken[seed_idx] = True
        seed_len = sizes[seed_idx]

        for cand_idx in range(seed_idx + 1, len(payloads)):
            if taken[cand_idx]:
                continue

            cand_len = sizes[cand_idx]

            # Length ratio bounds similarity from above; skip hopeless pairs.
            if min(seed_len, cand_len) / max(seed_len, cand_len) < threshold:
                continue

            candidate = payloads[cand_idx]
            similarity = _fast_similarity(seed, candidate, threshold)
            if similarity is None:
                # Ambiguous after fast screening; run the full comparison.
                similarity = compute_similarity(seed, candidate)

            if similarity >= threshold:
                members.append(candidate)
                member_indices.append(cand_idx)
                taken[cand_idx] = True

        clusters.append(
            PayloadCluster(
                cluster_id=len(clusters),
                payloads=members,
                indices=member_indices,
                representative=seed,
                size=len(members),
            )
        )

    return clusters
def _cluster_greedy(
    payloads: Sequence[bytes],
    threshold: float,
) -> list[PayloadCluster]:
    """Legacy greedy clustering entry point.

    Retained for backward compatibility; delegates to the optimized
    greedy implementation.
    """
    return _cluster_greedy_optimized(payloads, threshold)
def _cluster_dbscan(
    payloads: Sequence[bytes],
    threshold: float,
) -> list[PayloadCluster]:
    """Simplified DBSCAN-style clustering.

    A full DBSCAN would require scipy or a custom neighborhood
    implementation, so this currently delegates to the greedy algorithm.
    """
    return _cluster_greedy_optimized(payloads, threshold)
# Public API of this module. Inline comments tag groups of names with the
# RE-PAY requirement they implement; names are kept in their existing order.
__all__ = [
    "DelimiterResult",
    "FieldInferrer",
    # RE-PAY-004: Field inference
    "InferredField",
    "LengthPrefixResult",
    "MessageBoundary",
    "MessageSchema",
    "PatternMatch",
    "PayloadCluster",
    "PayloadDiff",
    # Classes
    "PayloadExtractor",
    # Data classes
    "PayloadInfo",
    "VariablePositions",
    "cluster_payloads",
    "compute_similarity",
    "correlate_request_response",
    # RE-PAY-003: Delimiter detection
    "detect_delimiter",
    "detect_field_types",
    "detect_length_prefix",
    # RE-PAY-005: Comparison
    "diff_payloads",
    "filter_by_pattern",
    "find_checksum_fields",
    "find_common_bytes",
    "find_message_boundaries",
    "find_sequence_fields",
    "find_variable_positions",
    "infer_fields",
    # RE-PAY-001: Extraction
    # (via PayloadExtractor methods)
    # RE-PAY-002: Pattern search
    "search_pattern",
    "search_patterns",
    "segment_messages",
]