Coverage for src / tracekit / inference / stream.py: 94%
304 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Stream reassembly and message framing for network protocols.

- RE-STR-001: UDP Stream Reconstruction
- RE-STR-002: TCP Stream Reassembly
- RE-STR-003: Message Framing and Segmentation

This module provides tools for reconstructing application-layer data from
transport-layer segments, handling out-of-order delivery, gaps, and
message boundaries.
"""
12from __future__ import annotations
14from collections import defaultdict
15from collections.abc import Callable, Sequence
16from dataclasses import dataclass, field
17from typing import Any, Literal
@dataclass
class StreamSegment:
    """A single transport-layer segment (or datagram) of stream data.

    Implements RE-STR-001, RE-STR-002: Stream segment.

    Attributes:
        sequence_number: TCP sequence number, or packet number for UDP.
        data: Payload bytes carried by the segment.
        timestamp: Capture timestamp; defaults to 0.0.
        src: Source address; defaults to the empty string.
        dst: Destination address; defaults to the empty string.
        flags: Protocol flag bits; defaults to 0.
        is_retransmit: Set when the segment is identified as a retransmission.
    """

    # Required fields come first: positional construction order is public API.
    sequence_number: int
    data: bytes
    # Optional capture metadata.
    timestamp: float = field(default=0.0)
    src: str = field(default="")
    dst: str = field(default="")
    flags: int = field(default=0)
    is_retransmit: bool = field(default=False)
@dataclass
class ReassembledStream:
    """Outcome of reassembling one flow into a contiguous byte stream.

    Implements RE-STR-001, RE-STR-002: Reassembled stream.

    Attributes:
        data: Reassembled payload bytes.
        src: Source address of the flow.
        dst: Destination address of the flow.
        start_time: Stream start time.
        end_time: Stream end time.
        segments: Number of segments that were reassembled.
        gaps: ``(start, end)`` ranges detected as missing.
        retransmits: Number of retransmissions detected.
        out_of_order: Number of segments that arrived out of order.
    """

    # Required fields (positional order is public API).
    data: bytes
    src: str
    dst: str
    start_time: float
    end_time: float
    segments: int
    # Diagnostics; each instance gets its own gap list.
    gaps: list[tuple[int, int]] = field(default_factory=list)
    retransmits: int = field(default=0)
    out_of_order: int = field(default=0)
@dataclass
class MessageFrame:
    """One message carved out of a byte stream by a framer.

    Implements RE-STR-003: Message frame.

    Attributes:
        data: Message bytes.
        offset: Position of the message within the stream.
        length: Message length in bytes.
        frame_type: Label describing how the message was framed.
        is_complete: Whether the message is complete.
        sequence: Message sequence number, if any.
    """

    data: bytes
    offset: int
    length: int
    frame_type: str = field(default="unknown")
    is_complete: bool = field(default=True)
    sequence: int | None = field(default=None)
@dataclass
class FramingResult:
    """Everything a framing pass produced.

    Implements RE-STR-003: Framing result.

    Attributes:
        messages: Messages extracted, in stream order.
        framing_type: Name of the framing that was applied.
        delimiter: Delimiter used, for delimiter framing.
        length_field_offset: Length-field offset, for length-prefixed framing.
        length_field_size: Length-field size, for length-prefixed framing.
        remaining: Trailing bytes that were not framed.
    """

    messages: list[MessageFrame]
    framing_type: str
    # Framing parameters; None when they do not apply.
    delimiter: bytes | None = field(default=None)
    length_field_offset: int | None = field(default=None)
    length_field_size: int | None = field(default=None)
    remaining: bytes = field(default=b"")
class UDPStreamReassembler:
    """Reassemble UDP datagram streams.

    Implements RE-STR-001: UDP Stream Reconstruction.

    UDP doesn't guarantee order, so datagrams are grouped per flow and ordered
    by sequence number. For dict packets the sequence number comes from
    ``sequence_key``; raw ``bytes`` packets are numbered by their arrival
    index within the flow. Packets without a usable sequence number get 0
    and, because the sort is stable, keep their arrival order.

    Example:
        >>> reassembler = UDPStreamReassembler()
        >>> for packet in packets:
        ...     reassembler.add_segment(packet)
        >>> stream = reassembler.get_stream()
    """

    def __init__(
        self,
        sequence_key: Callable[[Any], int] | None = None,
        max_gap: int = 1000,
    ) -> None:
        """Initialize UDP reassembler.

        Args:
            sequence_key: Function to extract a sequence number from a dict
                packet. ``KeyError``/``TypeError`` raised by it are ignored and
                the packet gets sequence number 0.
            max_gap: Maximum sequence gap before treating as new stream.
                NOTE: stored for API compatibility; reassembly does not use it.
        """
        self.sequence_key = sequence_key
        self.max_gap = max_gap
        # flow key -> segments in arrival order
        self._segments: dict[str, list[StreamSegment]] = defaultdict(list)

    @staticmethod
    def _empty_stream() -> ReassembledStream:
        """Return the result reported for a flow with no segments."""
        return ReassembledStream(
            data=b"",
            src="",
            dst="",
            start_time=0.0,
            end_time=0.0,
            segments=0,
        )

    def add_segment(
        self,
        packet: dict[str, Any] | bytes,
        flow_key: str | None = None,
    ) -> None:
        """Add a UDP datagram to the reassembler.

        Args:
            packet: Raw datagram payload (bytes-like), or a dict with metadata.
                Dict keys read: ``data`` (falls back to ``payload``),
                ``timestamp``, ``src`` (falls back to ``src_ip``) and ``dst``
                (falls back to ``dst_ip``).
            flow_key: Optional flow identifier. Defaults to ``"{src}-{dst}"``,
                which is ``"-"`` for raw datagrams.
        """
        if isinstance(packet, (bytes, bytearray, memoryview)):
            # Raw datagrams carry no addresses, so they are stored under the
            # same key the "{src}-{dst}" default yields ("-"). Counting the
            # segments under that storage key numbers them 0, 1, 2, ... in
            # arrival order.
            key = flow_key or "-"
            segment = StreamSegment(
                sequence_number=len(self._segments.get(key, ())),
                data=bytes(packet),
            )
        else:
            seq = 0
            if self.sequence_key is not None:
                try:
                    seq = self.sequence_key(packet)
                except (KeyError, TypeError):
                    # Best effort: unextractable sequence -> 0 (arrival order).
                    seq = 0

            segment = StreamSegment(
                sequence_number=seq,
                data=packet.get("data", packet.get("payload", b"")),
                timestamp=packet.get("timestamp", 0.0),
                src=packet.get("src", packet.get("src_ip", "")),
                dst=packet.get("dst", packet.get("dst_ip", "")),
            )
            key = flow_key or f"{segment.src}-{segment.dst}"

        self._segments[key].append(segment)

    def get_stream(self, flow_key: str | None = None) -> ReassembledStream:
        """Get reassembled stream for a flow.

        Implements RE-STR-001: UDP stream reconstruction.

        Args:
            flow_key: Flow identifier. When None, the first flow added is used.

        Returns:
            ReassembledStream with data concatenated in sequence order. An
            empty stream is returned when the flow has no segments.
        """
        if flow_key is None:
            if not self._segments:
                return self._empty_stream()
            flow_key = next(iter(self._segments))

        segments = self._segments.get(flow_key, [])
        if not segments:
            return self._empty_stream()

        # Stable sort: equal sequence numbers keep their arrival order.
        sorted_segments = sorted(segments, key=lambda s: s.sequence_number)
        data = b"".join(s.data for s in sorted_segments)

        # A segment is out of order if it arrived after one with a higher
        # sequence number.
        out_of_order = 0
        max_seq_seen = -1
        for segment in segments:
            if segment.sequence_number < max_seq_seen:
                out_of_order += 1
            max_seq_seen = max(max_seq_seen, segment.sequence_number)

        # NOTE(review): gaps assume byte-offset sequence numbers (the next
        # segment is expected at seq + len(data)). With packet-counter
        # sequence keys, missing packets may not be reported as gaps.
        gaps = []
        for prev, cur in zip(sorted_segments, sorted_segments[1:]):
            expected = prev.sequence_number + len(prev.data)
            if cur.sequence_number > expected:
                gaps.append((expected, cur.sequence_number))

        timestamps = [s.timestamp for s in sorted_segments if s.timestamp > 0]

        return ReassembledStream(
            data=data,
            src=sorted_segments[0].src,
            dst=sorted_segments[0].dst,
            start_time=min(timestamps) if timestamps else 0.0,
            end_time=max(timestamps) if timestamps else 0.0,
            segments=len(sorted_segments),
            gaps=gaps,
            retransmits=0,
            out_of_order=out_of_order,
        )

    def get_all_streams(self) -> dict[str, ReassembledStream]:
        """Get all reassembled streams.

        Returns:
            Dictionary mapping flow keys to streams.
        """
        return {key: self.get_stream(key) for key in self._segments}

    def clear(self) -> None:
        """Clear all segments."""
        self._segments.clear()
class TCPStreamReassembler:
    """Reassemble TCP byte streams.

    Implements RE-STR-002: TCP Stream Reassembly.

    Segments are placed by their offset from the flow's initial sequence
    number (ISN), computed with 32-bit serial-number arithmetic so that
    sequence-number wraparound is handled. Overlapping bytes (including
    retransmissions) are taken from the segment placed first; placement is in
    offset order, with ties broken by arrival order. Missing ranges are
    zero-filled and reported as gaps.

    Example:
        >>> reassembler = TCPStreamReassembler()
        >>> for segment in tcp_segments:
        ...     reassembler.add_segment(segment)
        >>> stream = reassembler.get_stream()
    """

    _SEQ_MODULUS = 2**32  # TCP sequence numbers are 32-bit and wrap around
    _SEQ_HALF = 2**31
    _SYN_FLAG = 0x02

    def __init__(
        self,
        initial_sequence: int | None = None,
        max_buffer_size: int = 10 * 1024 * 1024,
    ) -> None:
        """Initialize TCP reassembler.

        Args:
            initial_sequence: Initial sequence number (auto-detect if None).
                Like an ISN inferred from the first data segment, it is moved
                back to the earliest buffered segment when that segment
                precedes it, or when it is 0. An ISN learned from a SYN is
                never moved.
            max_buffer_size: Maximum number of bytes returned per reassembled
                stream. Data (including zero-filled gaps) past this offset is
                dropped.
        """
        self.initial_sequence = initial_sequence
        self.max_buffer_size = max_buffer_size

        self._segments: dict[str, list[StreamSegment]] = defaultdict(list)
        # Sequence number of the first data byte, per flow.
        self._isn: dict[str, int | None] = {}
        # Flows whose ISN was learned from a SYN; those ISNs are authoritative.
        self._isn_from_syn: set[str] = set()
        # Sequence numbers already seen carrying data (retransmit detection).
        self._seen_seqs: dict[str, set[int]] = defaultdict(set)

    @classmethod
    def _signed_offset(cls, seq: int, base: int) -> int:
        """Return ``seq - base`` in 32-bit serial arithmetic, in [-2**31, 2**31)."""
        return (seq - base + cls._SEQ_HALF) % cls._SEQ_MODULUS - cls._SEQ_HALF

    @staticmethod
    def _empty_stream() -> ReassembledStream:
        """Return the result reported for a flow with no segments."""
        return ReassembledStream(
            data=b"",
            src="",
            dst="",
            start_time=0.0,
            end_time=0.0,
            segments=0,
        )

    def add_segment(
        self,
        segment: dict[str, Any] | StreamSegment,
        flow_key: str | None = None,
    ) -> None:
        """Add a TCP segment to the reassembler.

        A SYN sets the flow's ISN to SYN sequence + 1 and is not stored. This
        applies even when data segments arrived first. Only the first SYN of
        a flow does this; later SYNs are stored like any other segment.
        A data segment whose sequence number was already seen with data is
        flagged ``is_retransmit`` (StreamSegment inputs are stored and
        flagged in place).

        Args:
            segment: TCP segment data or StreamSegment. Dict keys read:
                ``seq`` (or ``sequence_number``), ``data`` (or ``payload``,
                any bytes-like value), ``timestamp``, ``src``, ``dst``,
                ``flags``.
            flow_key: Optional flow identifier; defaults to ``"{src}-{dst}"``.
        """
        if isinstance(segment, dict):
            seq_num = segment.get("seq") or segment.get("sequence_number") or 0
            payload = segment.get("data") or segment.get("payload") or b""
            seg = StreamSegment(
                sequence_number=seq_num,
                # Accept any bytes-like payload; anything else counts as empty.
                data=(
                    bytes(payload)
                    if isinstance(payload, (bytes, bytearray, memoryview))
                    else b""
                ),
                timestamp=segment.get("timestamp", 0.0),
                src=segment.get("src", ""),
                dst=segment.get("dst", ""),
                flags=segment.get("flags", 0),
            )
        else:
            seg = segment

        key = flow_key or f"{seg.src}-{seg.dst}"

        if seg.flags & self._SYN_FLAG and key not in self._isn_from_syn:
            # SYN consumes one sequence number, so ISN+1 is the first data
            # byte. A SYN overrides an ISN guessed from earlier data segments.
            self._isn[key] = (seg.sequence_number + 1) % self._SEQ_MODULUS
            self._isn_from_syn.add(key)
            return  # Don't store the SYN itself

        if self._isn.get(key) is None:
            self._isn[key] = (
                self.initial_sequence
                if self.initial_sequence is not None
                else seg.sequence_number  # first data segment's sequence
            )

        # Only segments carrying data count for retransmit detection, so
        # ACK-only segments never mark later data segments as retransmits.
        if seg.data:
            seen = self._seen_seqs[key]
            if seg.sequence_number in seen:
                seg.is_retransmit = True
            seen.add(seg.sequence_number)

        self._segments[key].append(seg)

    def _resolve_isn(self, flow_key: str, segments: list[StreamSegment]) -> int:
        """Pick the ISN used to place this flow's segments."""
        isn = self._isn.get(flow_key) or 0
        if flow_key in self._isn_from_syn:
            return isn
        # Not from a SYN: fall back to the earliest segment (wrap-aware) when
        # it precedes the guess, or when the guess is 0.
        earliest = min(
            segments, key=lambda s: self._signed_offset(s.sequence_number, isn)
        )
        if isn == 0 or self._signed_offset(earliest.sequence_number, isn) < 0:
            return earliest.sequence_number
        return isn

    @staticmethod
    def _count_out_of_order(offsets: list[int]) -> int:
        """Count entries smaller than some earlier entry (arrival vs. order)."""
        count = 0
        highest: int | None = None
        for off in offsets:
            if highest is not None and off < highest:
                count += 1
            if highest is None or off > highest:
                highest = off
        return count

    def _assemble(
        self, ordered: list[tuple[int, StreamSegment]]
    ) -> tuple[bytes, list[tuple[int, int]]]:
        """Lay out offset-sorted segments into bytes, zero-filling gaps."""
        limit = max(self.max_buffer_size, 0)
        buffer = bytearray()
        gaps: list[tuple[int, int]] = []
        for rel, seg in ordered:
            if rel >= limit:
                break  # sorted by offset: everything after is past the cap too
            if rel > len(buffer):
                gaps.append((len(buffer), rel))
                buffer.extend(bytes(rel - len(buffer)))
            # Bytes before len(buffer) are already placed (overlap,
            # retransmission, or data preceding the ISN); append the rest.
            already = len(buffer) - rel
            if already < len(seg.data):
                buffer.extend(seg.data[already:])
        del buffer[limit:]
        return bytes(buffer), gaps

    def get_stream(self, flow_key: str | None = None) -> ReassembledStream:
        """Get reassembled TCP stream.

        Implements RE-STR-002: TCP stream reassembly.

        Args:
            flow_key: Flow identifier. When None, the first flow added is used.

        Returns:
            ReassembledStream with complete data. An empty stream is returned
            when the flow has no stored segments.
        """
        if flow_key is None:
            if not self._segments:
                return self._empty_stream()
            flow_key = next(iter(self._segments))

        segments = self._segments.get(flow_key, [])
        if not segments:
            return self._empty_stream()

        retransmits = sum(1 for seg in segments if seg.is_retransmit)

        isn = self._resolve_isn(flow_key, segments)
        offsets = [self._signed_offset(s.sequence_number, isn) for s in segments]
        out_of_order = self._count_out_of_order(offsets)

        # Stable sort: equal offsets keep arrival order, so the first copy of
        # retransmitted bytes wins.
        ordered = sorted(zip(offsets, segments), key=lambda pair: pair[0])
        data, gaps = self._assemble(ordered)

        timestamps = [s.timestamp for _, s in ordered if s.timestamp > 0]
        first = ordered[0][1]

        return ReassembledStream(
            data=data,
            src=first.src,
            dst=first.dst,
            start_time=min(timestamps) if timestamps else 0.0,
            end_time=max(timestamps) if timestamps else 0.0,
            segments=len(ordered),
            gaps=gaps,
            retransmits=retransmits,
            out_of_order=out_of_order,
        )

    def get_all_streams(self) -> dict[str, ReassembledStream]:
        """Get all reassembled TCP streams."""
        return {key: self.get_stream(key) for key in self._segments}

    def clear(self) -> None:
        """Clear all data."""
        self._segments.clear()
        self._isn.clear()
        self._isn_from_syn.clear()
        self._seen_seqs.clear()
class MessageFramer:
    """Extract framed messages from stream data.

    Implements RE-STR-003: Message Framing and Segmentation.

    Supports multiple framing methods: delimiter-based, length-prefixed,
    and fixed-size.

    Example:
        >>> framer = MessageFramer(framing_type='delimiter', delimiter=b'\\r\\n')
        >>> result = framer.frame(stream_data)
        >>> for msg in result.messages:
        ...     print(msg.data)
    """

    # Delimiters tried by auto-detection, in priority order.
    _COMMON_DELIMITERS = (b"\r\n", b"\n", b"\x00", b"\r")

    def __init__(
        self,
        framing_type: Literal["delimiter", "length_prefix", "fixed", "auto"] = "auto",
        delimiter: bytes | None = None,
        length_field_offset: int = 0,
        length_field_size: int = 2,
        length_field_endian: Literal["big", "little"] = "big",
        length_includes_header: bool = False,
        fixed_size: int = 0,
    ) -> None:
        """Initialize message framer.

        Args:
            framing_type: Type of framing to use.
            delimiter: Delimiter bytes for delimiter-based framing.
            length_field_offset: Offset of length field.
            length_field_size: Size of length field in bytes.
            length_field_endian: Endianness of length field.
            length_includes_header: Whether length includes header.
            fixed_size: Fixed message size.
        """
        self.framing_type = framing_type
        self.delimiter = delimiter
        self.length_field_offset = length_field_offset
        self.length_field_size = length_field_size
        self.length_field_endian = length_field_endian
        self.length_includes_header = length_includes_header
        self.fixed_size = fixed_size

    def frame(self, data: bytes) -> FramingResult:
        """Extract framed messages from data.

        Implements RE-STR-003: Message framing workflow.

        Args:
            data: Stream data to frame.

        Returns:
            FramingResult with extracted messages. Any framing_type other
            than "auto", "delimiter" or "length_prefix" is framed as "fixed".

        Example:
            >>> result = framer.frame(stream_data)
            >>> print(f"Found {len(result.messages)} messages")
        """
        if self.framing_type == "auto":
            return self._auto_frame(data)
        elif self.framing_type == "delimiter":
            return self._frame_by_delimiter(data)
        elif self.framing_type == "length_prefix":
            return self._frame_by_length(data)
        else:  # framing_type == "fixed"
            return self._frame_fixed(data)

    def detect_framing(self, data: bytes) -> str:
        """Detect framing type from data.

        Implements RE-STR-003: Framing detection.

        Checks are tried in order:

        1. A common delimiter (``\\r\\n``, ``\\n``, ``\\x00``, ``\\r``) occurs
           at least 3 times, and the non-empty parts have at most 3 distinct
           lengths.
        2. Within the first 8 bytes, a 2-byte big-endian length field L with
           4 < L < len(data) points, at ``offset + L``, to another plausible
           length field.
        3. For data of at least 32 bytes, a record size in 4..127 splits the
           data into at least 3 equal chunks, and the number of later chunks
           sharing the first chunk's leading 4 bytes is at least half the
           chunk count.

        Args:
            data: Sample data.

        Returns:
            "delimiter", "length_prefix", "fixed" or "unknown".
        """
        if self._detect_delimiter(data) is not None:
            return "delimiter"
        if self._detect_length_prefix(data) is not None:
            return "length_prefix"
        if self._detect_fixed_size(data) is not None:
            return "fixed"
        return "unknown"

    def _detect_delimiter(self, data: bytes) -> bytes | None:
        """Return the first common delimiter with regular spacing, or None."""
        for delim in self._COMMON_DELIMITERS:
            if data.count(delim) >= 3:
                parts = data.split(delim)
                if len({len(p) for p in parts if p}) <= 3:
                    return delim
        return None

    @staticmethod
    def _detect_length_prefix(data: bytes) -> int | None:
        """Return the offset of a plausible 2-byte big-endian length field, or None.

        The next length field is looked for at ``offset + length``. In other
        words, the length counts the whole message from its first byte,
        header included.
        """
        if len(data) < 4:
            return None
        for offset in range(min(8, len(data) - 2)):
            # A 2-byte field is always < 65536, so only the lower bound and
            # the data size constrain it.
            length = int.from_bytes(data[offset : offset + 2], "big")
            if not 4 < length < len(data):
                continue
            next_offset = offset + length
            if next_offset + 2 < len(data):
                next_length = int.from_bytes(data[next_offset : next_offset + 2], "big")
                if 4 < next_length < len(data):
                    return offset
        return None

    @staticmethod
    def _detect_fixed_size(data: bytes) -> int | None:
        """Return the smallest record size whose chunks look alike, or None."""
        if len(data) < 32:
            return None
        head = data[:4]
        for size in range(4, 128):
            if len(data) % size:
                continue
            count = len(data) // size
            if count < 3:
                continue
            # Compare the leading 4 bytes of every later chunk with the first.
            matches = sum(
                1
                for start in range(size, len(data), size)
                if data[start : start + 4] == head
            )
            if matches >= count * 0.5:
                return size
        return None

    def _auto_frame(self, data: bytes) -> FramingResult:
        """Automatically detect and apply framing.

        The detected parameters are stored on the instance (``delimiter``,
        ``fixed_size``, or the length-field settings). They are exactly the
        parameters that passed detection.

        Args:
            data: Stream data.

        Returns:
            FramingResult with detected framing. When nothing is detected,
            the whole input is a single "unknown" message.
        """
        delimiter = self._detect_delimiter(data)
        if delimiter is not None:
            self.delimiter = delimiter
            return self._frame_by_delimiter(data)

        length_offset = self._detect_length_prefix(data)
        if length_offset is not None:
            # Mirror the detector: 2-byte big-endian field at length_offset,
            # with the length covering the whole message.
            self.length_field_offset = length_offset
            self.length_field_size = 2
            self.length_field_endian = "big"
            self.length_includes_header = True
            return self._frame_by_length(data)

        fixed_size = self._detect_fixed_size(data)
        if fixed_size is not None:
            self.fixed_size = fixed_size
            return self._frame_fixed(data)

        # Return as single message
        return FramingResult(
            messages=[
                MessageFrame(
                    data=data,
                    offset=0,
                    length=len(data),
                    frame_type="unknown",
                )
            ],
            framing_type="unknown",
        )

    def _frame_by_delimiter(self, data: bytes) -> FramingResult:
        """Frame by delimiter.

        Empty parts (consecutive delimiters) are skipped, but still advance
        the offset, and each message's ``sequence`` is its part index. Bytes
        after the last delimiter form an unterminated message. It is
        returned with ``is_complete=False`` and also reported as
        ``remaining``.

        Args:
            data: Stream data.

        Returns:
            FramingResult. No messages are returned when no (or an empty)
            delimiter is configured.
        """
        delim = self.delimiter
        if not delim:
            # bytes.split(b"") would raise; treat it like a missing delimiter.
            return FramingResult(messages=[], framing_type="delimiter")

        parts = data.split(delim)
        last_index = len(parts) - 1

        messages = []
        offset = 0
        for index, part in enumerate(parts):
            if part:
                messages.append(
                    MessageFrame(
                        data=part,
                        offset=offset,
                        length=len(part),
                        frame_type="delimited",
                        is_complete=index != last_index,
                        sequence=index,
                    )
                )
            offset += len(part) + len(delim)

        return FramingResult(
            messages=messages,
            framing_type="delimiter",
            delimiter=delim,
            # Empty when data ends with the delimiter (split yields b"" last).
            remaining=parts[-1],
        )

    def _frame_by_length(self, data: bytes) -> FramingResult:
        """Frame by length prefix.

        Framing stops at the first message that is incomplete, or whose
        total size would not cover its own header (including size 0). In
        both cases the remaining bytes are left unframed.

        Args:
            data: Stream data.

        Returns:
            FramingResult.
        """
        header_size = self.length_field_offset + self.length_field_size
        # Any value other than "big" is read as little-endian.
        byteorder = "big" if self.length_field_endian == "big" else "little"

        messages = []
        offset = 0
        sequence = 0

        while offset + header_size <= len(data):
            length_start = offset + self.length_field_offset
            length = int.from_bytes(
                data[length_start : length_start + self.length_field_size], byteorder
            )
            msg_size = length if self.length_includes_header else header_size + length

            if msg_size < max(header_size, 1):
                # Malformed length; continuing would never advance ``offset``.
                break
            if offset + msg_size > len(data):
                break  # incomplete message

            messages.append(
                MessageFrame(
                    data=data[offset : offset + msg_size],
                    offset=offset,
                    length=msg_size,
                    frame_type="length_prefixed",
                    sequence=sequence,
                )
            )
            offset += msg_size
            sequence += 1

        return FramingResult(
            messages=messages,
            framing_type="length_prefix",
            length_field_offset=self.length_field_offset,
            length_field_size=self.length_field_size,
            remaining=data[offset:],
        )

    def _frame_fixed(self, data: bytes) -> FramingResult:
        """Frame by fixed size.

        Args:
            data: Stream data.

        Returns:
            FramingResult. There are no messages when ``fixed_size`` <= 0.
            A short trailing chunk is reported as ``remaining``.
        """
        size = self.fixed_size
        if size <= 0:
            return FramingResult(messages=[], framing_type="fixed")

        whole = len(data) - len(data) % size
        messages = [
            MessageFrame(
                data=data[start : start + size],
                offset=start,
                length=size,
                frame_type="fixed",
                sequence=index,
            )
            for index, start in enumerate(range(0, whole, size))
        ]

        return FramingResult(
            messages=messages,
            framing_type="fixed",
            remaining=data[whole:],
        )
# =============================================================================
# Convenience functions
# =============================================================================
def reassemble_udp_stream(
    packets: Sequence[dict[str, Any] | bytes],
    sequence_key: Callable[[Any], int] | None = None,
) -> ReassembledStream:
    """Reassemble a sequence of UDP datagrams into one stream.

    Implements RE-STR-001: UDP Stream Reconstruction.

    Every packet goes into a fresh UDPStreamReassembler under its default
    flow key. The stream for the first flow recorded is returned.

    Args:
        packets: UDP packets, either raw bytes or dicts with metadata.
        sequence_key: Optional function extracting a sequence number from a
            dict packet.

    Returns:
        ReassembledStream with ordered data.

    Example:
        >>> stream = reassemble_udp_stream(udp_packets)
        >>> print(f"Reassembled {len(stream.data)} bytes")
    """
    udp = UDPStreamReassembler(sequence_key=sequence_key)
    add = udp.add_segment
    for datagram in packets:
        add(datagram)
    return udp.get_stream(flow_key=None)
def reassemble_tcp_stream(
    segments: Sequence[dict[str, Any] | StreamSegment],
    flow_key: str | None = None,
) -> ReassembledStream:
    """Reassemble TCP byte stream.

    Implements RE-STR-002: TCP Stream Reassembly.

    Args:
        segments: TCP segments, as dicts or StreamSegment objects (both are
            accepted by ``TCPStreamReassembler.add_segment``).
        flow_key: Optional flow identifier. When given, all segments are
            stored under it; when None, the first flow recorded is returned.

    Returns:
        ReassembledStream with complete data.

    Example:
        >>> stream = reassemble_tcp_stream(tcp_segments)
        >>> print(f"Reassembled {len(stream.data)} bytes with {len(stream.gaps)} gaps")
    """
    reassembler = TCPStreamReassembler()
    for segment in segments:
        reassembler.add_segment(segment, flow_key)
    return reassembler.get_stream(flow_key)
def extract_messages(
    data: bytes,
    framing_type: Literal["auto", "delimiter", "length_prefix", "fixed"] = "auto",
    delimiter: bytes | None = None,
    length_field_offset: int = 0,
    length_field_size: int = 2,
    fixed_size: int = 0,
    *,
    length_field_endian: Literal["big", "little"] = "big",
    length_includes_header: bool = False,
) -> FramingResult:
    """Extract framed messages from stream data.

    Implements RE-STR-003: Message Framing and Segmentation.

    Args:
        data: Stream data.
        framing_type: Type of framing.
        delimiter: Delimiter for delimiter-based framing.
        length_field_offset: Length field offset.
        length_field_size: Length field size.
        fixed_size: Fixed message size.
        length_field_endian: Byte order of the length field (keyword-only).
        length_includes_header: Whether the length value counts the header
            bytes too (keyword-only).

    Returns:
        FramingResult with extracted messages.

    Example:
        >>> result = extract_messages(data, framing_type='delimiter', delimiter=b'\\r\\n')
        >>> for msg in result.messages:
        ...     print(msg.data)
    """
    framer = MessageFramer(
        framing_type=framing_type,
        delimiter=delimiter,
        length_field_offset=length_field_offset,
        length_field_size=length_field_size,
        length_field_endian=length_field_endian,
        length_includes_header=length_includes_header,
        fixed_size=fixed_size,
    )
    return framer.frame(data)
def detect_message_framing(data: bytes) -> dict[str, Any]:
    """Detect message framing type in data.

    Implements RE-STR-003: Framing detection.

    Detection runs a MessageFramer in auto mode, so the reported parameters
    are exactly the ones the framer would use for this data. The logic is not
    re-derived separately.

    Args:
        data: Stream data sample.

    Returns:
        Dictionary with key ``"type"`` ("delimiter", "length_prefix",
        "fixed" or "unknown") plus, depending on the type:

        - delimiter: ``delimiter`` and ``message_count`` (occurrences of the
          delimiter).
        - length_prefix: ``length_field_offset``, ``length_field_size`` and
          ``message_count`` (complete messages framed).
        - fixed: ``fixed_size`` and ``message_count``.

    Example:
        >>> framing = detect_message_framing(stream_data)
        >>> print(f"Detected: {framing['type']}")
    """
    framer = MessageFramer(framing_type="auto")
    framed = framer.frame(data)

    result: dict[str, Any] = {"type": framed.framing_type}

    if framed.framing_type == "delimiter" and framer.delimiter is not None:
        result["delimiter"] = framer.delimiter
        result["message_count"] = data.count(framer.delimiter)

    elif framed.framing_type == "length_prefix":
        result["length_field_offset"] = framed.length_field_offset
        result["length_field_size"] = framed.length_field_size
        result["message_count"] = len(framed.messages)

    elif framed.framing_type == "fixed" and framer.fixed_size > 0:
        result["fixed_size"] = framer.fixed_size
        result["message_count"] = len(data) // framer.fixed_size

    return result
# Public API of this module (kept alphabetically sorted).
__all__ = [
    # Classes and dataclasses
    "FramingResult",
    "MessageFrame",
    "MessageFramer",
    "ReassembledStream",
    "StreamSegment",
    "TCPStreamReassembler",
    "UDPStreamReassembler",
    # Convenience functions
    "detect_message_framing",
    "extract_messages",
    "reassemble_tcp_stream",
    "reassemble_udp_stream",
]