Coverage for src/tracekit/analyzers/packet/stream.py: 99%

124 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

"""Stream processing utilities for packet analysis.

This module provides generator-based stream processing with O(1) memory
usage when analyzing large packet captures.

Example:
    >>> from tracekit.analyzers.packet.stream import stream_packets
    >>> for packet in stream_packets(file_path):
    ...     process(packet)  # Process one packet at a time

References:
    Python generators and itertools patterns
"""

from __future__ import annotations

import io
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TypeVar

from tracekit.analyzers.packet.parser import BinaryParser

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator

T = TypeVar("T")


@dataclass
class StreamPacket:
    """Packet from a stream.

    Attributes:
        timestamp: Packet timestamp in seconds.
        data: Raw packet data.
        metadata: Additional packet metadata.
    """

    timestamp: float
    data: bytes
    metadata: dict[str, Any]


def stream_file(
    file_path: str | Path,
    chunk_size: int = 65536,
) -> Iterator[bytes]:
    """Stream file in chunks.

    Args:
        file_path: Path to file.
        chunk_size: Bytes per chunk (default 64KB).

    Yields:
        Byte chunks from file.

    Example:
        >>> for chunk in stream_file("large_capture.bin"):
        ...     process_chunk(chunk)
    """
    path = Path(file_path)

    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

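
# Illustrative sketch (not part of the original module): a hypothetical helper
# showing stream_file() against a throwaway temp file. Only chunk_size bytes
# are held in memory per iteration; the final chunk carries the remainder.
def _example_stream_file() -> None:
    import tempfile

    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"x" * 200_000)
        path = tmp.name

    sizes = [len(chunk) for chunk in stream_file(path, chunk_size=65536)]
    assert sizes == [65536, 65536, 65536, 3392]  # 3 full chunks + remainder

    Path(path).unlink()  # clean up the temp file
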

def stream_records(
    file_or_buffer: str | Path | BinaryIO | bytes,
    record_size: int,
) -> Iterator[bytes]:
    """Stream fixed-size records.

    Args:
        file_or_buffer: Source file path, file object, or bytes.
        record_size: Size of each record in bytes.

    Yields:
        Records as bytes objects.

    Example:
        >>> for record in stream_records("data.bin", record_size=128):
        ...     parse_record(record)
    """
    if isinstance(file_or_buffer, bytes):
        buffer: BinaryIO = io.BytesIO(file_or_buffer)
        should_close = True
    elif isinstance(file_or_buffer, str | Path):
        buffer = open(file_or_buffer, "rb")  # noqa: SIM115
        should_close = True
    else:
        buffer = file_or_buffer
        should_close = False

    try:
        while True:
            record = buffer.read(record_size)
            if len(record) < record_size:
                break
            yield record
    finally:
        if should_close:
            buffer.close()

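
# Illustrative sketch (not part of the original module): a hypothetical helper
# showing stream_records() over an in-memory bytes source. A trailing fragment
# shorter than record_size is dropped, matching the short-read break above.
def _example_stream_records() -> None:
    records = list(stream_records(b"AAABBBCC", record_size=3))
    assert records == [b"AAA", b"BBB"]  # the 2-byte tail b"CC" is discarded
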

def stream_packets(
    file_or_buffer: str | Path | BinaryIO | bytes,
    *,
    header_parser: BinaryParser | None = None,
    length_field: int = 1,
    header_included: bool = False,
) -> Iterator[StreamPacket]:
    """Stream variable-length packets.

    Parses packets with a length-prefixed format.

    Args:
        file_or_buffer: Source file path, file object, or bytes.
        header_parser: Parser for packet header.
        length_field: Index of length field in header (default 1).
        header_included: True if length includes header.

    Yields:
        StreamPacket objects.

    Example:
        >>> header = BinaryParser(">HH")  # sync, length
        >>> for pkt in stream_packets("capture.bin", header_parser=header):
        ...     print(f"Packet: {len(pkt.data)} bytes")
    """
    if header_parser is None:
        # Default: 2-byte big-endian length prefix
        header_parser = BinaryParser(">H")
        length_field = 0

    header_size = header_parser.size

    if isinstance(file_or_buffer, bytes):
        buffer: BinaryIO = io.BytesIO(file_or_buffer)
        should_close = True
    elif isinstance(file_or_buffer, str | Path):
        buffer = open(file_or_buffer, "rb")  # noqa: SIM115
        should_close = True
    else:
        buffer = file_or_buffer
        should_close = False

    try:
        packet_num = 0

        while True:
            # Read header
            header_bytes = buffer.read(header_size)
            if len(header_bytes) < header_size:
                break

            header = header_parser.unpack(header_bytes)
            length = header[length_field]

            # Compute payload size
            payload_size = length - header_size if header_included else length

            if payload_size < 0:  # coverage: condition never true in the recorded run
                break

            # Read payload
            payload = buffer.read(payload_size)
            if len(payload) < payload_size:
                break

            packet_num += 1

            yield StreamPacket(
                timestamp=packet_num,  # Placeholder
                data=header_bytes + payload,
                metadata={"header": header, "packet_num": packet_num},
            )

    finally:
        if should_close:
            buffer.close()

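
# Illustrative sketch (not part of the original module): a hypothetical helper
# showing stream_packets() with the default header (a 2-byte big-endian length
# prefix). Each StreamPacket.data keeps the header bytes plus the payload.
def _example_stream_packets() -> None:
    raw = b"\x00\x03abc" + b"\x00\x01z"
    packets = list(stream_packets(raw))
    assert [pkt.data for pkt in packets] == [b"\x00\x03abc", b"\x00\x01z"]

    # With header_included=True the length field counts the header too, so a
    # prefix of 5 means a 3-byte payload after the 2-byte header.
    inclusive = list(stream_packets(b"\x00\x05abc", header_included=True))
    assert inclusive[0].data == b"\x00\x05abc"
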

def stream_delimited(
    file_or_buffer: str | Path | BinaryIO | bytes,
    delimiter: bytes = b"\n",
    *,
    max_record_size: int = 1048576,
) -> Iterator[bytes]:
    """Stream delimiter-separated records.

    Args:
        file_or_buffer: Source file path, file object, or bytes.
        delimiter: Record delimiter (default newline).
        max_record_size: Maximum record size (default 1MB).

    Yields:
        Records as bytes (without delimiter).

    Example:
        >>> for line in stream_delimited("log.txt", b"\\n"):
        ...     process_line(line)
    """
    if isinstance(file_or_buffer, bytes):
        buffer: BinaryIO = io.BytesIO(file_or_buffer)
        should_close = True
    elif isinstance(file_or_buffer, str | Path):
        buffer = open(file_or_buffer, "rb")  # noqa: SIM115
        should_close = True
    else:
        buffer = file_or_buffer
        should_close = False

    try:
        partial = b""

        while True:
            chunk = buffer.read(65536)
            if not chunk:
                if partial:
                    yield partial
                break

            data = partial + chunk
            parts = data.split(delimiter)

            # Yield complete records
            for part in parts[:-1]:
                if len(part) <= max_record_size:
                    yield part

            # Keep partial record for next iteration
            partial = parts[-1]

            # Guard against memory issues
            if len(partial) > max_record_size:
                yield partial[:max_record_size]
                partial = b""

    finally:
        if should_close:
            buffer.close()

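
# Illustrative sketch (not part of the original module): a hypothetical helper
# showing stream_delimited() over an in-memory buffer. The final record is
# still yielded even though it has no trailing delimiter.
def _example_stream_delimited() -> None:
    lines = list(stream_delimited(b"alpha\nbeta\ngamma", b"\n"))
    assert lines == [b"alpha", b"beta", b"gamma"]
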

def pipeline(
    source: Iterator[T],
    *transforms: Callable[[Iterator[T]], Iterator[T]],
) -> Iterator[T]:
    """Chain processing transforms.

    Args:
        source: Source iterator.
        *transforms: Transform functions.

    Yields:
        Transformed items.

    Example:
        >>> def filter_large(packets):
        ...     for pkt in packets:
        ...         if len(pkt.data) > 100:
        ...             yield pkt
        ...
        >>> def decode(packets):
        ...     for pkt in packets:
        ...         pkt.metadata["decoded"] = decode_packet(pkt.data)
        ...         yield pkt
        ...
        >>> for pkt in pipeline(stream_packets(f), filter_large, decode):
        ...     print(pkt)
    """
    result: Iterator = source  # type: ignore[type-arg]

    for transform in transforms:
        result = transform(result)

    yield from result


def batch[T](
    source: Iterator[T],
    size: int,
) -> Iterator[list[T]]:
    """Batch items from iterator.

    Args:
        source: Source iterator.
        size: Batch size.

    Yields:
        Lists of items.

    Example:
        >>> for batch_items in batch(stream_packets(f), size=100):
        ...     process_batch(batch_items)
    """
    current_batch: list[T] = []

    for item in source:
        current_batch.append(item)
        if len(current_batch) >= size:
            yield current_batch
            current_batch = []

    if current_batch:
        yield current_batch


def take[T](source: Iterator[T], n: int) -> Iterator[T]:
    """Take first n items.

    Args:
        source: Source iterator.
        n: Number of items to take.

    Yields:
        First n items.
    """
    count = 0
    for item in source:
        if count >= n:
            break
        yield item
        count += 1  # noqa: SIM113


def skip[T](source: Iterator[T], n: int) -> Iterator[T]:
    """Skip first n items.

    Args:
        source: Source iterator.
        n: Number of items to skip.

    Yields:
        Items after first n.
    """
    count = 0
    for item in source:
        if count >= n:
            yield item
        count += 1  # noqa: SIM113

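
# Illustrative sketch (not part of the original module): a hypothetical helper
# composing the generic iterator utilities. skip() and take() trim the stream,
# batch() groups what remains, and pipeline() chains the steps lazily without
# materializing intermediate lists.
def _example_iterator_utilities() -> None:
    numbers = iter(range(10))
    trimmed = pipeline(
        numbers,
        lambda it: skip(it, 2),  # drop 0 and 1
        lambda it: take(it, 6),  # keep 2..7
    )
    groups = list(batch(trimmed, size=4))
    assert groups == [[2, 3, 4, 5], [6, 7]]
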

__all__ = [
    "StreamPacket",
    "batch",
    "pipeline",
    "skip",
    "stream_delimited",
    "stream_file",
    "stream_packets",
    "stream_records",
    "take",
]