Coverage for src / tracekit / inference / stream.py: 94%

304 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Stream reassembly and message framing for network protocols. 

2 

3 - RE-STR-001: UDP Stream Reconstruction 

4 - RE-STR-002: TCP Stream Reassembly 

5 - RE-STR-003: Message Framing and Segmentation 

6 

7This module provides tools for reconstructing application-layer data from 

8transport-layer segments, handling out-of-order delivery, gaps, and 

9message boundaries. 

10""" 

11 

12from __future__ import annotations 

13 

14from collections import defaultdict 

15from collections.abc import Callable, Sequence 

16from dataclasses import dataclass, field 

17from typing import Any, Literal 

18 

19 

@dataclass
class StreamSegment:
    """One transport-layer unit (TCP segment or UDP datagram) queued for reassembly.

    Implements RE-STR-001, RE-STR-002: Stream segment.

    Only ``sequence_number`` and ``data`` are required; every other field
    has a neutral default so a segment can be built from a bare payload.
    """

    # Sequence number (TCP) or packet number (UDP).
    sequence_number: int
    # Segment payload.
    data: bytes
    # Capture timestamp.
    timestamp: float = 0.0
    # Source address.
    src: str = ""
    # Destination address.
    dst: str = ""
    # Protocol flags.
    flags: int = 0
    # Whether this segment is a retransmission.
    is_retransmit: bool = False

43 

44 

@dataclass
class ReassembledStream:
    """Outcome of reassembling one flow: the ordered payload plus statistics.

    Implements RE-STR-001, RE-STR-002: Reassembled stream.

    The first six fields are required; the statistics fields default to
    "nothing detected".
    """

    # Complete reassembled data.
    data: bytes
    # Source address.
    src: str
    # Destination address.
    dst: str
    # Stream start time.
    start_time: float
    # Stream end time.
    end_time: float
    # Number of segments.
    segments: int
    # List of (start, end) gap ranges; each instance gets its own list.
    gaps: list[tuple[int, int]] = field(default_factory=list)
    # Number of retransmissions detected.
    retransmits: int = 0
    # Number of out-of-order segments.
    out_of_order: int = 0

72 

73 

@dataclass
class MessageFrame:
    """A single message carved out of stream data.

    Implements RE-STR-003: Message frame.

    ``data``, ``offset`` and ``length`` are required; the remaining fields
    describe how the frame was recognised and default to "unknown/complete".
    """

    # Message data.
    data: bytes
    # Offset in stream.
    offset: int
    # Message length.
    length: int
    # Detected frame type.
    frame_type: str = "unknown"
    # Whether message is complete.
    is_complete: bool = True
    # Message sequence number if detected.
    sequence: int | None = None

95 

96 

@dataclass
class FramingResult:
    """Everything a framing pass produced: the messages and how they were found.

    Implements RE-STR-003: Framing result.

    ``messages`` and ``framing_type`` are required; the parameter fields are
    ``None`` unless they apply to the framing that was used.
    """

    # List of extracted messages.
    messages: list[MessageFrame]
    # Detected framing type.
    framing_type: str
    # Detected delimiter if applicable.
    delimiter: bytes | None = None
    # Length field offset if applicable.
    length_field_offset: int | None = None
    # Length field size if applicable.
    length_field_size: int | None = None
    # Unframed bytes at end.
    remaining: bytes = b""

118 

119 

class UDPStreamReassembler:
    """Reassemble UDP datagram streams.

    Implements RE-STR-001: UDP Stream Reconstruction.

    UDP doesn't guarantee order, so this reassembler orders the datagrams of
    each flow by sequence number, concatenates their payloads, and reports
    gaps and out-of-order arrivals.

    Example:
        >>> reassembler = UDPStreamReassembler()
        >>> for packet in packets:
        ...     reassembler.add_segment(packet)
        >>> stream = reassembler.get_stream()
    """

    def __init__(
        self,
        sequence_key: Callable[[Any], int] | None = None,
        max_gap: int = 1000,
    ) -> None:
        """Initialize UDP reassembler.

        Args:
            sequence_key: Function to extract sequence number from packet.
            max_gap: Maximum sequence gap before treating as new stream.
                The value is stored on the instance; none of the methods of
                this class consult it.
        """
        self.sequence_key = sequence_key
        self.max_gap = max_gap
        # Segments per flow key, kept in arrival order.
        self._segments: dict[str, list[StreamSegment]] = defaultdict(list)

    def add_segment(
        self,
        packet: dict[str, Any] | bytes,
        flow_key: str | None = None,
    ) -> None:
        """Add a UDP datagram to the reassembler.

        A raw ``bytes`` payload is numbered with its arrival index within its
        flow. A dict packet is numbered by ``sequence_key`` (0 when no key
        function is configured or it raises ``KeyError``/``TypeError``) and
        supplies payload, timestamp and addresses through its
        ``data``/``payload``, ``timestamp``, ``src``/``src_ip`` and
        ``dst``/``dst_ip`` entries.

        Args:
            packet: Packet data or dict with metadata.
            flow_key: Optional flow identifier. When omitted (or empty) the
                flow is named ``"<src>-<dst>"``.
        """
        if isinstance(packet, bytes):
            # A raw payload has no addresses, so its derived flow name is "-".
            # The arrival index must be read from the same flow the segment is
            # stored in, otherwise every datagram would be numbered 0.
            key = flow_key or "-"
            segment = StreamSegment(
                sequence_number=len(self._segments.get(key, ())),
                data=packet,
            )
        else:
            seq = 0
            if self.sequence_key is not None:
                try:
                    seq = self.sequence_key(packet)
                except (KeyError, TypeError):
                    # Best effort: a packet without usable sequence
                    # information keeps the default number 0.
                    pass

            segment = StreamSegment(
                sequence_number=seq,
                data=packet.get("data", packet.get("payload", b"")),
                timestamp=packet.get("timestamp", 0.0),
                src=packet.get("src", packet.get("src_ip", "")),
                dst=packet.get("dst", packet.get("dst_ip", "")),
            )
            key = flow_key or f"{segment.src}-{segment.dst}"

        self._segments[key].append(segment)

    def get_stream(self, flow_key: str | None = None) -> ReassembledStream:
        """Get reassembled stream for a flow.

        Implements RE-STR-001: UDP stream reconstruction.

        Args:
            flow_key: Flow identifier. ``None`` selects the first flow that
                received a segment.

        Returns:
            ReassembledStream with ordered data. An unknown or empty flow
            yields an empty stream with zeroed statistics.
        """
        if flow_key is None:
            # Get first/only flow (None when nothing has been added yet).
            flow_key = next(iter(self._segments), None)

        # .get() avoids creating an empty entry in the defaultdict.
        segments = self._segments.get(flow_key, [])
        if not segments:
            return ReassembledStream(
                data=b"",
                src="",
                dst="",
                start_time=0.0,
                end_time=0.0,
                segments=0,
            )

        # Stable sort: segments sharing a sequence number keep arrival order.
        ordered = sorted(segments, key=lambda s: s.sequence_number)
        data = b"".join(s.data for s in ordered)

        # Out-of-order: a segment that arrived after a higher sequence number.
        out_of_order = 0
        max_seq_seen = -1
        for segment in segments:
            if segment.sequence_number < max_seq_seen:
                out_of_order += 1
            max_seq_seen = max(max_seq_seen, segment.sequence_number)

        # Gaps: the next sequence number lies beyond the previous one plus the
        # previous payload length.
        gaps = []
        for previous, current in zip(ordered, ordered[1:]):
            expected = previous.sequence_number + len(previous.data)
            if current.sequence_number > expected:
                gaps.append((expected, current.sequence_number))

        # Timestamps that are not positive are treated as "not recorded".
        timestamps = [s.timestamp for s in ordered if s.timestamp > 0]

        return ReassembledStream(
            data=data,
            src=ordered[0].src,
            dst=ordered[0].dst,
            start_time=min(timestamps) if timestamps else 0.0,
            end_time=max(timestamps) if timestamps else 0.0,
            segments=len(ordered),
            gaps=gaps,
            retransmits=0,
            out_of_order=out_of_order,
        )

    def get_all_streams(self) -> dict[str, ReassembledStream]:
        """Get all reassembled streams.

        Returns:
            Dictionary mapping flow keys to streams.
        """
        return {key: self.get_stream(key) for key in self._segments}

    def clear(self) -> None:
        """Clear all segments."""
        self._segments.clear()

267 

268 

class TCPStreamReassembler:
    """Reassemble TCP byte streams.

    Implements RE-STR-002: TCP Stream Reassembly.

    Handles TCP sequence numbers, retransmissions, and ordering
    to reconstruct the original byte stream.

    Example:
        >>> reassembler = TCPStreamReassembler()
        >>> for segment in tcp_segments:
        ...     reassembler.add_segment(segment)
        >>> stream = reassembler.get_stream()
    """

    # Size of the TCP sequence-number space; relative offsets are taken
    # modulo this value.
    _SEQ_SPACE = 2**32

    def __init__(
        self,
        initial_sequence: int | None = None,
        max_buffer_size: int = 10 * 1024 * 1024,
    ) -> None:
        """Initialize TCP reassembler.

        Args:
            initial_sequence: Initial sequence number (auto-detect if None).
            max_buffer_size: Maximum buffer size in bytes. The value is
                stored on the instance; none of the methods of this class
                enforce it.
        """
        self.initial_sequence = initial_sequence
        self.max_buffer_size = max_buffer_size

        # Segments per flow key, kept in arrival order.
        self._segments: dict[str, list[StreamSegment]] = defaultdict(list)
        # Initial sequence number per flow.
        self._isn: dict[str, int | None] = {}
        # Sequence numbers per flow that already carried payload.
        self._seen_seqs: dict[str, set[int]] = defaultdict(set)

    def add_segment(
        self,
        segment: dict[str, Any] | StreamSegment,
        flow_key: str | None = None,
    ) -> None:
        """Add a TCP segment to the reassembler.

        A dict is converted to a ``StreamSegment`` using its
        ``seq``/``sequence_number``, ``data``/``payload``, ``timestamp``,
        ``src``, ``dst`` and ``flags`` entries; a payload that is not
        ``bytes`` is replaced by ``b""``. A ``StreamSegment`` is stored as
        given, and its ``is_retransmit`` attribute is set to True in place
        when it repeats a sequence number that already carried data.

        A SYN segment seen before the flow has an initial sequence number
        only records that number and is not stored.

        Args:
            segment: TCP segment data or StreamSegment.
            flow_key: Optional flow identifier. When omitted (or empty) the
                flow is named ``"<src>-<dst>"``.
        """
        if isinstance(segment, dict):
            seq_num = segment.get("seq") or segment.get("sequence_number") or 0
            payload = segment.get("data") or segment.get("payload") or b""
            seg = StreamSegment(
                sequence_number=seq_num,
                data=payload if isinstance(payload, bytes) else b"",
                timestamp=segment.get("timestamp", 0.0),
                src=segment.get("src", ""),
                dst=segment.get("dst", ""),
                flags=segment.get("flags", 0),
            )
        else:
            seg = segment

        key = flow_key or f"{seg.src}-{seg.dst}"

        # Establish the initial sequence number the first time a flow is seen.
        if self._isn.get(key) is None:
            if seg.flags & 0x02:  # SYN flag
                # SYN consumes one sequence number, so ISN+1 is first data byte
                self._isn[key] = seg.sequence_number + 1
                return  # Don't store SYN itself

            if self.initial_sequence is not None:
                self._isn[key] = self.initial_sequence
            else:
                # Use first data segment's sequence as initial
                self._isn[key] = seg.sequence_number

        if seg.data:
            seen = self._seen_seqs[key]
            # A sequence number enters `seen` only together with a stored,
            # data-carrying segment, so membership alone identifies a
            # retransmission; no scan over stored segments is needed.
            # Empty (ACK-only) segments are never recorded, so they cannot
            # cause a later data segment to be flagged.
            if seg.sequence_number in seen:
                seg.is_retransmit = True
            seen.add(seg.sequence_number)

        self._segments[key].append(seg)

    def get_stream(self, flow_key: str | None = None) -> ReassembledStream:
        """Get reassembled TCP stream.

        Implements RE-STR-002: TCP stream reassembly.

        Segments are ordered by their offset relative to the flow's initial
        sequence number. Retransmissions are counted but contribute no data,
        gaps are filled with zero bytes and recorded, and overlapping
        segments contribute only their not-yet-covered tail.

        Args:
            flow_key: Flow identifier. ``None`` selects the first flow that
                stored a segment.

        Returns:
            ReassembledStream with complete data. An unknown or empty flow
            yields an empty stream with zeroed statistics.
        """
        if flow_key is None:
            flow_key = next(iter(self._segments), None)

        # .get() avoids creating an empty entry in the defaultdict.
        segments = self._segments.get(flow_key, [])
        if not segments:
            return ReassembledStream(
                data=b"",
                src="",
                dst="",
                start_time=0.0,
                end_time=0.0,
                segments=0,
            )

        # Count retransmits first (they are skipped when building data).
        retransmits = sum(1 for seg in segments if seg.is_retransmit)

        # If no usable ISN is known, or a segment starts before it, rebase
        # on the lowest sequence number seen.
        # NOTE(review): for a flow whose sequence numbers wrap past 2**32
        # this rebases on the wrapped segment — confirm whether wrap-around
        # has to be supported here.
        lowest_seq = min(s.sequence_number for s in segments)
        isn = self._isn.get(flow_key, 0) or 0
        if isn == 0 or isn > lowest_seq:
            isn = lowest_seq

        # Out-of-order: a segment that arrived after some segment with a
        # higher sequence number. Tracking the running maximum gives the
        # same count as comparing against every earlier segment, in one pass.
        out_of_order = 0
        highest_seen = None
        for seg in segments:
            if highest_seen is None or seg.sequence_number > highest_seen:
                highest_seen = seg.sequence_number
            elif seg.sequence_number < highest_seen:
                out_of_order += 1

        # Sort by relative sequence number
        ordered = sorted(
            segments,
            key=lambda s: (s.sequence_number - isn) % self._SEQ_SPACE,
        )

        # Build stream handling overlaps and gaps
        data_buffer = bytearray()
        current_offset = 0
        gaps = []

        for seg in ordered:
            if seg.is_retransmit:
                continue  # Skip retransmits when building data

            rel_seq = (seg.sequence_number - isn) % self._SEQ_SPACE

            if rel_seq > current_offset:
                # Gap detected: record it and pad with zeros so later data
                # stays at its stream offset.
                gaps.append((current_offset, rel_seq))
                data_buffer.extend(b"\x00" * (rel_seq - current_offset))
                current_offset = rel_seq

            if rel_seq < current_offset:
                # Overlap - use only non-overlapping part; a segment that is
                # already fully covered contributes nothing.
                overlap = current_offset - rel_seq
                if overlap < len(seg.data):
                    data_buffer.extend(seg.data[overlap:])
                    current_offset += len(seg.data) - overlap
            else:
                data_buffer.extend(seg.data)
                current_offset += len(seg.data)

        # Timestamps that are not positive are treated as "not recorded".
        timestamps = [s.timestamp for s in ordered if s.timestamp > 0]

        return ReassembledStream(
            data=bytes(data_buffer),
            src=ordered[0].src,
            dst=ordered[0].dst,
            start_time=min(timestamps) if timestamps else 0.0,
            end_time=max(timestamps) if timestamps else 0.0,
            segments=len(ordered),
            gaps=gaps,
            retransmits=retransmits,
            out_of_order=out_of_order,
        )

    def get_all_streams(self) -> dict[str, ReassembledStream]:
        """Get all reassembled TCP streams.

        Returns:
            Dictionary mapping flow keys to streams.
        """
        return {key: self.get_stream(key) for key in self._segments}

    def clear(self) -> None:
        """Clear all data."""
        self._segments.clear()
        self._isn.clear()
        self._seen_seqs.clear()

464 

465 

class MessageFramer:
    """Extract framed messages from stream data.

    Implements RE-STR-003: Message Framing and Segmentation.

    Supports multiple framing methods: delimiter-based, length-prefixed,
    and fixed-size, plus an "auto" mode that detects one of them.

    Example:
        >>> framer = MessageFramer(framing_type='delimiter', delimiter=b'\\r\\n')
        >>> result = framer.frame(stream_data)
        >>> for msg in result.messages:
        ...     print(msg.data)
    """

    # Delimiters probed by auto-detection, in priority order.
    _COMMON_DELIMITERS = (b"\r\n", b"\n", b"\x00", b"\r")

    def __init__(
        self,
        framing_type: Literal["delimiter", "length_prefix", "fixed", "auto"] = "auto",
        delimiter: bytes | None = None,
        length_field_offset: int = 0,
        length_field_size: int = 2,
        length_field_endian: Literal["big", "little"] = "big",
        length_includes_header: bool = False,
        fixed_size: int = 0,
    ) -> None:
        """Initialize message framer.

        Args:
            framing_type: Type of framing to use.
            delimiter: Delimiter bytes for delimiter-based framing.
            length_field_offset: Offset of length field.
            length_field_size: Size of length field in bytes.
            length_field_endian: Endianness of length field.
            length_includes_header: Whether length includes header.
            fixed_size: Fixed message size.
        """
        self.framing_type = framing_type
        self.delimiter = delimiter
        self.length_field_offset = length_field_offset
        self.length_field_size = length_field_size
        self.length_field_endian = length_field_endian
        self.length_includes_header = length_includes_header
        self.fixed_size = fixed_size

    def frame(self, data: bytes) -> FramingResult:
        """Extract framed messages from data.

        Implements RE-STR-003: Message framing workflow.

        Args:
            data: Stream data to frame.

        Returns:
            FramingResult with extracted messages.

        Example:
            >>> result = framer.frame(stream_data)
            >>> print(f"Found {len(result.messages)} messages")
        """
        if self.framing_type == "auto":
            return self._auto_frame(data)
        if self.framing_type == "delimiter":
            return self._frame_by_delimiter(data)
        if self.framing_type == "length_prefix":
            return self._frame_by_length(data)
        # Any other value is handled as "fixed".
        return self._frame_fixed(data)

    def detect_framing(self, data: bytes) -> str:
        """Detect framing type from data.

        Implements RE-STR-003: Framing detection.

        The checks run in order: common delimiters, a 2-byte big-endian
        length prefix, then fixed-size records.

        Args:
            data: Sample data.

        Returns:
            Detected framing type string: "delimiter", "length_prefix",
            "fixed" or "unknown".
        """
        if self._detect_delimiter(data) is not None:
            return "delimiter"
        if self._looks_length_prefixed(data):
            return "length_prefix"
        if self._detect_fixed_size(data) is not None:
            return "fixed"
        return "unknown"

    def _detect_delimiter(self, data: bytes) -> bytes | None:
        """Return the first common delimiter that splits data regularly, if any."""
        for delim in self._COMMON_DELIMITERS:
            if data.count(delim) < 3:
                continue
            # Regular spacing: at most three distinct non-empty part lengths.
            part_lengths = {len(part) for part in data.split(delim) if part}
            if len(part_lengths) <= 3:
                return delim
        return None

    def _looks_length_prefixed(self, data: bytes) -> bool:
        """Return True when two chained 2-byte big-endian lengths are plausible."""
        total = len(data)
        if total < 4:
            return False
        # Probe the first few offsets for a plausible length field.
        for offset in range(min(8, total - 2)):
            length = int.from_bytes(data[offset : offset + 2], "big")
            if not 4 < length < total:
                continue
            # The value found `length` bytes further on must be plausible too.
            next_offset = offset + length
            if next_offset + 2 >= total:
                continue
            next_length = int.from_bytes(data[next_offset : next_offset + 2], "big")
            if 4 < next_length < total:
                return True
        return False

    def _detect_fixed_size(self, data: bytes) -> int | None:
        """Return the smallest record size whose records share a 4-byte prefix."""
        total = len(data)
        if total < 32:
            return None
        prefix = data[:4]
        for size in range(4, 128):
            if total % size:
                continue
            count = total // size
            if count < 3:
                continue
            # Structural similarity: at least half as many later records as
            # there are records in total start like the first one.
            matches = sum(
                1 for start in range(size, total, size) if data[start : start + 4] == prefix
            )
            if matches >= count * 0.5:
                return size
        return None

    def _auto_frame(self, data: bytes) -> FramingResult:
        """Automatically detect and apply framing.

        The delimiter or fixed size that detection validated is stored on
        the framer (``self.delimiter`` / ``self.fixed_size``) and then used
        for framing, so the applied framing matches the detected one.
        Length-prefixed data is framed with the framer's configured length
        field settings.

        Args:
            data: Stream data.

        Returns:
            FramingResult with detected framing; undetectable data is
            returned as one message of type "unknown".
        """
        framing_type = self.detect_framing(data)

        if framing_type == "delimiter":
            detected_delimiter = self._detect_delimiter(data)
            if detected_delimiter is not None:
                self.delimiter = detected_delimiter
            return self._frame_by_delimiter(data)

        if framing_type == "length_prefix":
            return self._frame_by_length(data)

        if framing_type == "fixed":
            detected_size = self._detect_fixed_size(data)
            if detected_size is not None:
                self.fixed_size = detected_size
            return self._frame_fixed(data)

        # Return as single message
        return FramingResult(
            messages=[
                MessageFrame(
                    data=data,
                    offset=0,
                    length=len(data),
                    frame_type="unknown",
                )
            ],
            framing_type="unknown",
        )

    def _frame_by_delimiter(self, data: bytes) -> FramingResult:
        """Frame by delimiter.

        Empty parts are skipped. Each message's ``sequence`` is its index
        among all parts, empty ones included. A trailing part that is not
        followed by the delimiter is returned both as the last message and
        in ``remaining``.

        Args:
            data: Stream data.

        Returns:
            FramingResult; without a configured delimiter it has no messages.
        """
        if self.delimiter is None:
            return FramingResult(messages=[], framing_type="delimiter")

        messages = []
        offset = 0
        parts = data.split(self.delimiter)

        for index, part in enumerate(parts):
            if part:  # Skip empty parts
                messages.append(
                    MessageFrame(
                        data=part,
                        offset=offset,
                        length=len(part),
                        frame_type="delimited",
                        sequence=index,
                    )
                )
            offset += len(part) + len(self.delimiter)

        # Data that ends with the delimiter leaves nothing unframed;
        # otherwise the bytes after the last delimiter are the remainder.
        remaining = b"" if data.endswith(self.delimiter) else parts[-1]

        return FramingResult(
            messages=messages,
            framing_type="delimiter",
            delimiter=self.delimiter,
            remaining=remaining,
        )

    def _frame_by_length(self, data: bytes) -> FramingResult:
        """Frame by length prefix.

        Framing stops at the first message that is incomplete or whose
        declared size is impossible (zero, or smaller than its own header);
        everything from there on is returned in ``remaining``.

        Args:
            data: Stream data.

        Returns:
            FramingResult.
        """
        messages = []
        offset = 0
        sequence = 0

        header_size = self.length_field_offset + self.length_field_size
        # Any endian value other than "big" is read as little-endian.
        byteorder = "big" if self.length_field_endian == "big" else "little"

        while offset + header_size <= len(data):
            # Read length field
            length_start = offset + self.length_field_offset
            length_bytes = data[length_start : length_start + self.length_field_size]
            length = int.from_bytes(length_bytes, byteorder)

            # Calculate total message size
            msg_size = length if self.length_includes_header else header_size + length

            # A size that cannot even hold the header would make no progress
            # (size 0 would loop forever), so stop framing there.
            if msg_size <= 0 or msg_size < header_size:
                break

            # Check if complete message available
            if offset + msg_size > len(data):
                break

            messages.append(
                MessageFrame(
                    data=data[offset : offset + msg_size],
                    offset=offset,
                    length=msg_size,
                    frame_type="length_prefixed",
                    sequence=sequence,
                )
            )

            offset += msg_size
            sequence += 1

        return FramingResult(
            messages=messages,
            framing_type="length_prefix",
            length_field_offset=self.length_field_offset,
            length_field_size=self.length_field_size,
            remaining=data[offset:],
        )

    def _frame_fixed(self, data: bytes) -> FramingResult:
        """Frame by fixed size.

        Args:
            data: Stream data.

        Returns:
            FramingResult; a trailing partial record is returned in
            ``remaining``. A non-positive ``fixed_size`` yields no messages.
        """
        size = self.fixed_size
        if size <= 0:
            return FramingResult(messages=[], framing_type="fixed")

        # Number of bytes covered by whole records.
        usable = len(data) - len(data) % size
        messages = [
            MessageFrame(
                data=data[start : start + size],
                offset=start,
                length=size,
                frame_type="fixed",
                sequence=sequence,
            )
            for sequence, start in enumerate(range(0, usable, size))
        ]

        return FramingResult(
            messages=messages,
            framing_type="fixed",
            remaining=data[usable:],
        )

765 

766 

767# ============================================================================= 

768# Convenience functions 

769# ============================================================================= 

770 

771 

def reassemble_udp_stream(
    packets: Sequence[dict[str, Any] | bytes],
    sequence_key: Callable[[Any], int] | None = None,
) -> ReassembledStream:
    """Reassemble UDP datagram stream.

    Implements RE-STR-001: UDP Stream Reconstruction.

    Feeds every packet into a fresh ``UDPStreamReassembler`` and returns the
    stream of the first flow it collected.

    Args:
        packets: List of UDP packets.
        sequence_key: Function to extract sequence number.

    Returns:
        ReassembledStream with ordered data.

    Example:
        >>> stream = reassemble_udp_stream(udp_packets)
        >>> print(f"Reassembled {len(stream.data)} bytes")
    """
    collector = UDPStreamReassembler(sequence_key=sequence_key)
    for datagram in packets:
        collector.add_segment(datagram)
    stream = collector.get_stream()
    return stream

795 

796 

def reassemble_tcp_stream(
    segments: Sequence[dict[str, Any]],
    flow_key: str | None = None,
) -> ReassembledStream:
    """Reassemble TCP byte stream.

    Implements RE-STR-002: TCP Stream Reassembly.

    Feeds every segment into a fresh ``TCPStreamReassembler`` under
    ``flow_key`` and returns the stream for that same key.

    Args:
        segments: List of TCP segments.
        flow_key: Optional flow identifier.

    Returns:
        ReassembledStream with complete data.

    Example:
        >>> stream = reassemble_tcp_stream(tcp_segments)
        >>> print(f"Reassembled {len(stream.data)} bytes with {len(stream.gaps)} gaps")
    """
    collector = TCPStreamReassembler()
    for tcp_segment in segments:
        collector.add_segment(tcp_segment, flow_key)
    stream = collector.get_stream(flow_key)
    return stream

820 

821 

def extract_messages(
    data: bytes,
    framing_type: Literal["auto", "delimiter", "length_prefix", "fixed"] = "auto",
    delimiter: bytes | None = None,
    length_field_offset: int = 0,
    length_field_size: int = 2,
    fixed_size: int = 0,
    length_field_endian: Literal["big", "little"] = "big",
    length_includes_header: bool = False,
) -> FramingResult:
    """Extract framed messages from stream data.

    Implements RE-STR-003: Message Framing and Segmentation.

    Builds a ``MessageFramer`` from the arguments and frames ``data`` with it.

    Args:
        data: Stream data.
        framing_type: Type of framing.
        delimiter: Delimiter for delimiter-based framing.
        length_field_offset: Length field offset.
        length_field_size: Length field size.
        fixed_size: Fixed message size.
        length_field_endian: Endianness of the length field. Defaults to
            "big", the value the framer used before this parameter existed.
        length_includes_header: Whether the length value counts the header
            bytes as well. Defaults to False, as before.

    Returns:
        FramingResult with extracted messages.

    Example:
        >>> result = extract_messages(data, framing_type='delimiter', delimiter=b'\\r\\n')
        >>> for msg in result.messages:
        ...     print(msg.data)
    """
    framer = MessageFramer(
        framing_type=framing_type,
        delimiter=delimiter,
        length_field_offset=length_field_offset,
        length_field_size=length_field_size,
        length_field_endian=length_field_endian,
        length_includes_header=length_includes_header,
        fixed_size=fixed_size,
    )
    return framer.frame(data)

858 

859 

def detect_message_framing(data: bytes) -> dict[str, Any]:
    """Detect message framing type in data.

    Implements RE-STR-003: Framing detection.

    Args:
        data: Stream data sample.

    Returns:
        Dictionary with detected framing parameters. ``"type"`` is always
        present. Delimiter framing adds ``"delimiter"`` and
        ``"message_count"`` (the number of delimiter occurrences).
        Length-prefix framing adds ``"length_field_offset"`` and
        ``"length_field_size"``. Fixed framing adds ``"fixed_size"`` and
        ``"message_count"``.

    Example:
        >>> framing = detect_message_framing(stream_data)
        >>> print(f"Detected: {framing['type']}")
    """
    framer = MessageFramer()
    framing_type = framer.detect_framing(data)

    result: dict[str, Any] = {"type": framing_type}

    if framing_type == "delimiter":
        frequent = [
            delim for delim in (b"\r\n", b"\n", b"\x00", b"\r") if data.count(delim) >= 3
        ]
        # Report the delimiter that passes the regular-spacing check (at most
        # three distinct non-empty part lengths), which is the check framing
        # detection applies; fall back to the first frequent delimiter.
        chosen_delimiter = next(
            (
                delim
                for delim in frequent
                if len({len(part) for part in data.split(delim) if part}) <= 3
            ),
            frequent[0] if frequent else None,
        )
        if chosen_delimiter is not None:
            result["delimiter"] = chosen_delimiter
            result["message_count"] = data.count(chosen_delimiter)

    elif framing_type == "length_prefix":
        result["length_field_offset"] = 0
        result["length_field_size"] = 2

    elif framing_type == "fixed":
        total = len(data)
        prefix = data[:4]
        candidates = [
            size for size in range(4, 128) if total % size == 0 and total // size >= 3
        ]
        # Report the record size whose records share the first record's
        # 4-byte prefix often enough, which is the check framing detection
        # applies; fall back to the first size that divides the data.
        chosen_size = next(
            (
                size
                for size in candidates
                if sum(
                    1
                    for start in range(size, total, size)
                    if data[start : start + 4] == prefix
                )
                >= (total // size) * 0.5
            ),
            candidates[0] if candidates else None,
        )
        if chosen_size is not None:
            result["fixed_size"] = chosen_size
            result["message_count"] = total // chosen_size

    return result

901 

902 

# Public API of this module. Entries are sorted, which lists the classes
# (capitalised names) before the convenience functions.
__all__ = [
    "FramingResult",
    "MessageFrame",
    "MessageFramer",
    "ReassembledStream",
    "StreamSegment",
    "TCPStreamReassembler",
    "UDPStreamReassembler",
    "detect_message_framing",
    "extract_messages",
    "reassemble_tcp_stream",
    "reassemble_udp_stream",
]