1"""Stream processing utilities for packet analysis.
3This module provides generator-based stream processing for
4O(1) memory usage when analyzing large packet captures.
7Example:
8 >>> from tracekit.analyzers.packet.stream import stream_packets
9 >>> for packet in stream_packets(file_path, format="pcap"):
10 ... process(packet) # Process one at a time
12References:
13 Python generators and itertools patterns
14"""
from __future__ import annotations

import io
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TypeVar

from tracekit.analyzers.packet.parser import BinaryParser

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator

T = TypeVar("T")


@dataclass
class StreamPacket:
    """Packet from a stream.

    Attributes:
        timestamp: Packet timestamp in seconds.
        data: Raw packet data.
        metadata: Additional packet metadata.
    """

    timestamp: float
    data: bytes
    metadata: dict[str, Any]


def stream_file(
    file_path: str | Path,
    chunk_size: int = 65536,
) -> Iterator[bytes]:
    """Stream file in chunks.

    Args:
        file_path: Path to file.
        chunk_size: Bytes per chunk (default 64KB).

    Yields:
        Byte chunks from file.

    Example:
        >>> for chunk in stream_file("large_capture.bin"):
        ...     process_chunk(chunk)
    """
    path = Path(file_path)

    with open(path, "rb") as f:
        while True:
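            # read() returns b"" at EOF, which ends the stream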
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk


def stream_records(
    file_or_buffer: str | Path | BinaryIO | bytes,
    record_size: int,
) -> Iterator[bytes]:
    """Stream fixed-size records.

    Args:
        file_or_buffer: Source file path, file object, or bytes.
        record_size: Size of each record in bytes.

    Yields:
        Records as bytes objects.

    Example:
        >>> for record in stream_records("data.bin", record_size=128):
        ...     parse_record(record)
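
        A self-contained example with an in-memory buffer:

        >>> list(stream_records(b"abcdef", record_size=2))
        [b'ab', b'cd', b'ef']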
89 """
    if isinstance(file_or_buffer, bytes):
        buffer: BinaryIO = io.BytesIO(file_or_buffer)
        should_close = True
    elif isinstance(file_or_buffer, str | Path):
        buffer = open(file_or_buffer, "rb")  # noqa: SIM115
        should_close = True
    else:
        buffer = file_or_buffer
        should_close = False

    try:
        while True:
            record = buffer.read(record_size)
            if len(record) < record_size:
                # EOF or trailing partial record: stop streaming
                break
            yield record
    finally:
        if should_close:
            buffer.close()


def stream_packets(
    file_or_buffer: str | Path | BinaryIO | bytes,
    *,
    header_parser: BinaryParser | None = None,
    length_field: int = 1,
    header_included: bool = False,
) -> Iterator[StreamPacket]:
    """Stream variable-length packets.

    Parses packets with a length-prefixed format.

    Args:
        file_or_buffer: Source file path, file object, or bytes.
        header_parser: Parser for the packet header.
        length_field: Index of the length field in the header (default 1).
        header_included: True if the length includes the header.

    Yields:
        StreamPacket objects.

    Example:
        >>> header = BinaryParser(">HH")  # sync, length
        >>> for pkt in stream_packets("capture.bin", header_parser=header):
        ...     print(f"Packet: {len(pkt.data)} bytes")
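
        A self-contained sketch, assuming the default ">H" header (the
        2-byte big-endian length prefix set below when no parser is given):

        >>> data = b"\\x00\\x03abc\\x00\\x01x"  # two length-prefixed packets
        >>> [pkt.data for pkt in stream_packets(data)]
        [b'\\x00\\x03abc', b'\\x00\\x01x']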
135 """
136 if header_parser is None:
137 # Default: 2-byte big-endian length prefix
138 header_parser = BinaryParser(">H")
139 length_field = 0
141 header_size = header_parser.size
143 if isinstance(file_or_buffer, bytes):
144 buffer: BinaryIO = io.BytesIO(file_or_buffer)
145 should_close = True
146 elif isinstance(file_or_buffer, str | Path):
147 buffer = open(file_or_buffer, "rb") # noqa: SIM115
148 should_close = True
149 else:
150 buffer = file_or_buffer
151 should_close = False
153 try:
154 packet_num = 0
156 while True:
157 # Read header
158 header_bytes = buffer.read(header_size)
159 if len(header_bytes) < header_size:
160 break
162 header = header_parser.unpack(header_bytes)
163 length = header[length_field]
165 # Compute payload size
166 payload_size = length - header_size if header_included else length
168 if payload_size < 0: 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 break
171 # Read payload
172 payload = buffer.read(payload_size)
173 if len(payload) < payload_size:
174 break
176 packet_num += 1
178 yield StreamPacket(
179 timestamp=packet_num, # Placeholder
180 data=header_bytes + payload,
181 metadata={"header": header, "packet_num": packet_num},
182 )
184 finally:
185 if should_close:
186 buffer.close()
189def stream_delimited(
190 file_or_buffer: str | Path | BinaryIO | bytes,
191 delimiter: bytes = b"\n",
192 *,
193 max_record_size: int = 1048576,
194) -> Iterator[bytes]:
195 """Stream delimiter-separated records.
197 Args:
198 file_or_buffer: Source.
199 delimiter: Record delimiter (default newline).
200 max_record_size: Maximum record size (default 1MB).
202 Yields:
203 Records as bytes (without delimiter).
205 Example:
206 >>> for line in stream_delimited("log.txt", b"\\n"):
207 ... process_line(line)
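
        A self-contained example with an in-memory buffer:

        >>> list(stream_delimited(b"a,b,c", b","))
        [b'a', b'b', b'c']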
208 """
209 if isinstance(file_or_buffer, bytes):
210 buffer: BinaryIO = io.BytesIO(file_or_buffer)
211 should_close = True
212 elif isinstance(file_or_buffer, str | Path):
213 buffer = open(file_or_buffer, "rb") # noqa: SIM115
214 should_close = True
215 else:
216 buffer = file_or_buffer
217 should_close = False
219 try:
220 partial = b""
222 while True:
223 chunk = buffer.read(65536)
224 if not chunk:
225 if partial:
226 yield partial
227 break
229 data = partial + chunk
230 parts = data.split(delimiter)
232 # Yield complete records
233 for part in parts[:-1]:
234 if len(part) <= max_record_size:
235 yield part
237 # Keep partial record for next iteration
238 partial = parts[-1]
240 # Guard against memory issues
241 if len(partial) > max_record_size:
242 yield partial[:max_record_size]
243 partial = b""
245 finally:
246 if should_close:
247 buffer.close()
250def pipeline(
251 source: Iterator[T],
252 *transforms: Callable[[Iterator[T]], Iterator[T]],
253) -> Iterator[T]:
254 """Chain processing transforms.
256 Args:
257 source: Source iterator.
258 *transforms: Transform functions.
260 Yields:
261 Transformed items.
263 Example:
264 >>> def filter_large(packets):
265 ... for pkt in packets:
266 ... if len(pkt.data) > 100:
267 ... yield pkt
268 ...
269 >>> def decode(packets):
270 ... for pkt in packets:
271 ... pkt.metadata["decoded"] = decode_packet(pkt.data)
272 ... yield pkt
273 ...
274 >>> for pkt in pipeline(stream_packets(f), filter_large, decode):
275 ... print(pkt)
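
        A minimal runnable example:

        >>> list(pipeline(iter([1, 2, 3]), lambda it: (x * 2 for x in it)))
        [2, 4, 6]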
276 """
277 result: Iterator = source # type: ignore[type-arg]
279 for transform in transforms:
280 result = transform(result)
282 yield from result
285def batch[T](
286 source: Iterator[T],
287 size: int,
288) -> Iterator[list[T]]:
289 """Batch items from iterator.
291 Args:
292 source: Source iterator.
293 size: Batch size.
295 Yields:
296 Lists of items.
298 Example:
299 >>> for batch_items in batch(stream_packets(f), size=100):
300 ... process_batch(batch_items)
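
        A minimal runnable example (the final short batch is still yielded):

        >>> list(batch(iter(range(5)), size=2))
        [[0, 1], [2, 3], [4]]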
301 """
302 current_batch: list[T] = []
304 for item in source:
305 current_batch.append(item)
306 if len(current_batch) >= size:
307 yield current_batch
308 current_batch = []
310 if current_batch:
311 yield current_batch
314def take[T](source: Iterator[T], n: int) -> Iterator[T]:
315 """Take first n items.
317 Args:
318 source: Source iterator.
319 n: Number of items to take.
321 Yields:
322 First n items.
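
    Example:
        >>> list(take(iter(range(10)), 3))
        [0, 1, 2]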
323 """
324 count = 0
325 for item in source:
326 if count >= n:
327 break
328 yield item
329 count += 1 # noqa: SIM113
332def skip[T](source: Iterator[T], n: int) -> Iterator[T]:
333 """Skip first n items.
335 Args:
336 source: Source iterator.
337 n: Number of items to skip.
339 Yields:
340 Items after first n.
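
    Example:
        >>> list(skip(iter(range(5)), 2))
        [2, 3, 4]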
341 """
342 count = 0
343 for item in source:
344 if count >= n:
345 yield item
346 count += 1 # noqa: SIM113
349__all__ = [
350 "StreamPacket",
351 "batch",
352 "pipeline",
353 "skip",
354 "stream_delimited",
355 "stream_file",
356 "stream_packets",
357 "stream_records",
358 "take",
359]