Coverage for src / tracekit / loaders / pcap.py: 88%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""PCAP/PCAPNG packet capture file loader. 

2 

3This module provides loading of packet capture files using dpkt 

4when available, with a basic fallback implementation. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.pcap import load_pcap 

9 >>> packets = load_pcap("capture.pcap") 

10 >>> for packet in packets: 

11 ... print(f"Time: {packet.timestamp}, Size: {len(packet.data)} bytes") 

12""" 

13 

14from __future__ import annotations 

15 

16import struct 

17from dataclasses import dataclass 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Any 

20 

21from tracekit.core.exceptions import FormatError, LoaderError 

22from tracekit.core.types import ProtocolPacket 

23 

24if TYPE_CHECKING: 

25 from collections.abc import Iterator 

26 from os import PathLike 

27 

28# Try to import dpkt for full PCAP support 

29try: 

30 import dpkt # type: ignore[import-not-found] 

31 

32 DPKT_AVAILABLE = True 

33except ImportError: 

34 DPKT_AVAILABLE = False 

35 

36 

# PCAP file format constants.
# Each value is the first four bytes of the file as the loaders in this module
# read them: unpacked as a little-endian unsigned 32-bit integer ("<I").
PCAP_MAGIC_LE = 0xA1B2C3D4  # Little-endian file, microsecond timestamps
PCAP_MAGIC_BE = 0xD4C3B2A1  # Big-endian file, microsecond timestamps
PCAP_MAGIC_NS_LE = 0xA1B23C4D  # Little-endian file, nanosecond resolution
PCAP_MAGIC_NS_BE = 0x4D3CB2A1  # Big-endian file, nanosecond resolution
PCAPNG_MAGIC = 0x0A0D0D0A  # PCAPNG container; the basic loader rejects it

43 

44 

@dataclass
class PcapPacketList:
    """Container for PCAP packets with capture metadata.

    Supports iteration, ``len()`` and indexing over the loaded packets while
    keeping the capture-level metadata next to them.

    Attributes:
        packets: List of ProtocolPacket objects.
        link_type: Link layer type (e.g., Ethernet = 1).
        snaplen: Maximum capture length per packet.
        source_file: Path to the source PCAP file.
    """

    packets: list[ProtocolPacket]
    link_type: int = 1  # Ethernet
    snaplen: int = 65535
    source_file: str = ""

    def __iter__(self) -> Iterator[ProtocolPacket]:
        """Iterate over packets in capture order."""
        return iter(self.packets)

    def __len__(self) -> int:
        """Return number of packets."""
        return len(self.packets)

    def __getitem__(self, index: int) -> ProtocolPacket:
        """Get packet by index (delegates to the underlying list)."""
        return self.packets[index]

    def filter(
        self,
        protocol: str | None = None,
        min_size: int | None = None,
        max_size: int | None = None,
    ) -> list[ProtocolPacket]:
        """Filter packets by criteria.

        All supplied criteria must match; criteria left as ``None`` are
        ignored. The result is always a new list, so callers may mutate it
        without affecting this container (even when no criteria are given).

        Args:
            protocol: Keep packets whose ``layer3_protocol`` or
                ``layer4_protocol`` annotation equals this value.
            min_size: Minimum packet size in bytes (inclusive).
            max_size: Maximum packet size in bytes (inclusive).

        Returns:
            Filtered list of packets, in their original order.
        """

        def matches(packet: ProtocolPacket) -> bool:
            if protocol is not None:
                notes = packet.annotations
                if (
                    notes.get("layer3_protocol") != protocol
                    and notes.get("layer4_protocol") != protocol
                ):
                    return False
            if min_size is not None and len(packet.data) < min_size:
                return False
            if max_size is not None and len(packet.data) > max_size:
                return False
            return True

        # Single pass; building a fresh list avoids handing out self.packets.
        return [packet for packet in self.packets if matches(packet)]

108 

109 

def load_pcap(
    path: str | PathLike[str],
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Load a PCAP or PCAPNG packet capture file.

    Dispatches to the dpkt-backed loader when dpkt was importable at module
    load time, and to the built-in basic loader otherwise. Both receive the
    same keyword options.

    Args:
        path: Path to the PCAP/PCAPNG file.
        protocol_filter: Optional protocol filter (e.g., "TCP", "UDP").
        max_packets: Maximum number of packets to load.

    Returns:
        PcapPacketList containing packets and capture metadata.

    Raises:
        LoaderError: If the file does not exist or cannot be loaded.

    Example:
        >>> packets = load_pcap("network.pcap")
        >>> print(f"Captured {len(packets)} packets")
        >>> for pkt in packets[:5]:
        ...     print(f"  {pkt.timestamp:.6f}s: {len(pkt.data)} bytes")

        >>> # Filter by protocol
        >>> tcp_packets = packets.filter(protocol="TCP")
    """
    capture_path = Path(path)

    # Fail early with a clear message before handing off to a backend.
    if not capture_path.exists():
        raise LoaderError("File not found", file_path=str(capture_path))

    backend = _load_with_dpkt if DPKT_AVAILABLE else _load_basic
    return backend(
        capture_path,
        protocol_filter=protocol_filter,
        max_packets=max_packets,
    )

161 

162 

def _dissect_ethernet_frame(raw_data: bytes, annotations: dict[str, Any]) -> str:
    """Dissect one Ethernet frame with dpkt, filling ``annotations`` in place.

    Dissection is best effort: if dpkt raises part-way through, whatever was
    recognised up to that point is kept and the frame is still returned to the
    caller as raw data.

    Args:
        raw_data: Raw frame bytes as yielded by the dpkt reader.
        annotations: Dict that receives MAC/IP/port/protocol annotations.

    Returns:
        The most specific protocol label recognised ("TCP", "UDP", "ICMP",
        "IP", "IPv6", "ARP"), or "RAW" if nothing was recognised.
    """
    protocol = "RAW"
    try:
        eth = dpkt.ethernet.Ethernet(raw_data)
        annotations["src_mac"] = _format_mac(eth.src)
        annotations["dst_mac"] = _format_mac(eth.dst)

        payload = eth.data
        if isinstance(payload, dpkt.ip.IP):
            protocol = "IP"
            annotations["src_ip"] = _format_ip(payload.src)
            annotations["dst_ip"] = _format_ip(payload.dst)
            annotations["layer3_protocol"] = "IP"

            # Transport layer carried inside the IPv4 packet.
            segment = payload.data
            if isinstance(segment, dpkt.tcp.TCP):
                protocol = "TCP"
                annotations["src_port"] = segment.sport
                annotations["dst_port"] = segment.dport
                annotations["layer4_protocol"] = "TCP"
                annotations["tcp_flags"] = segment.flags
            elif isinstance(segment, dpkt.udp.UDP):
                protocol = "UDP"
                annotations["src_port"] = segment.sport
                annotations["dst_port"] = segment.dport
                annotations["layer4_protocol"] = "UDP"
            elif isinstance(segment, dpkt.icmp.ICMP):
                protocol = "ICMP"
                annotations["layer4_protocol"] = "ICMP"

        elif isinstance(payload, dpkt.ip6.IP6):
            protocol = "IPv6"
            annotations["layer3_protocol"] = "IPv6"

        elif isinstance(payload, dpkt.arp.ARP):
            protocol = "ARP"
            annotations["layer3_protocol"] = "ARP"

    except Exception:
        # Deliberate best effort: a frame dpkt cannot parse is kept as raw
        # data with whatever annotations were gathered before the failure.
        pass
    return protocol


def _load_with_dpkt(
    path: Path,
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Load PCAP using dpkt library.

    The file format (PCAP vs. PCAPNG) is detected from the first four bytes.
    Ethernet captures (link type 1) are dissected into MAC/IP/port
    annotations; other link types are returned as "RAW" packets.

    Args:
        path: Path to the PCAP file.
        protocol_filter: Optional protocol filter. A packet is kept when the
            filter equals its protocol label or its layer 3 / layer 4
            protocol annotation.
        max_packets: Maximum packets to load.

    Returns:
        PcapPacketList with parsed packets, the reader's link type and, when
        the reader exposes a usable value, its snaplen.

    Raises:
        LoaderError: If file cannot be read or dpkt version is incompatible.
    """
    try:
        with open(path, "rb") as f:
            # Detect file format, then rewind so the reader sees the header.
            magic = f.read(4)
            f.seek(0)

            magic_int = struct.unpack("<I", magic)[0]

            if magic_int == PCAPNG_MAGIC:
                # PCAPNG format
                try:
                    pcap_reader = dpkt.pcapng.Reader(f)
                except AttributeError as exc:
                    raise LoaderError(
                        "PCAPNG support requires newer dpkt version",
                        file_path=str(path),
                        fix_hint="Install dpkt >= 1.9: pip install dpkt>=1.9",
                    ) from exc
            else:
                # Standard PCAP format
                pcap_reader = dpkt.pcap.Reader(f)

            link_type = getattr(pcap_reader, "datalink", lambda: 1)()

            # Keep capture metadata consistent with the basic loader, which
            # reports the snaplen from the file header. Fall back to the
            # container default when the reader has no usable value.
            snaplen = getattr(pcap_reader, "snaplen", 65535)
            if not isinstance(snaplen, int) or snaplen <= 0:
                snaplen = 65535

            packets: list[ProtocolPacket] = []

            for timestamp, raw_data in pcap_reader:
                if max_packets is not None and len(packets) >= max_packets:
                    break

                annotations: dict[str, Any] = {}
                if link_type == 1:  # Ethernet
                    protocol = _dissect_ethernet_frame(raw_data, annotations)
                else:
                    protocol = "RAW"

                # Apply protocol filter
                if protocol_filter is not None and (
                    annotations.get("layer3_protocol") != protocol_filter
                    and annotations.get("layer4_protocol") != protocol_filter
                    and protocol != protocol_filter
                ):
                    continue

                packets.append(
                    ProtocolPacket(
                        timestamp=float(timestamp),
                        protocol=protocol,
                        data=bytes(raw_data),
                        annotations=annotations,
                    )
                )

            return PcapPacketList(
                packets=packets,
                link_type=link_type,
                snaplen=snaplen,
                source_file=str(path),
            )

    except (LoaderError, FormatError):
        # Already a domain error with context attached; pass it through.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to load PCAP file",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid PCAP/PCAPNG format.",
        ) from e

292 

293 

# Upper bound for a single read() while fetching packet data in _load_basic.
_BASIC_READ_CHUNK = 1 << 20


def _read_up_to(stream: Any, size: int) -> bytes:
    """Read up to ``size`` bytes from a binary file object in bounded chunks.

    The per-packet length comes straight from the file, so a corrupt record
    can claim up to 4 GiB. Reading in chunks keeps memory use proportional to
    the data actually present instead of the claimed length.

    Args:
        stream: Binary file object positioned at the packet data.
        size: Number of bytes the packet record claims to contain.

    Returns:
        The bytes read; shorter than ``size`` only if end of file was reached.
    """
    chunks: list[bytes] = []
    remaining = size
    while remaining > 0:
        chunk = stream.read(min(remaining, _BASIC_READ_CHUNK))
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def _load_basic(
    path: Path,
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Basic PCAP loader without dpkt.

    Parses the classic PCAP global header and packet records directly. No
    protocol dissection is done: every packet is labelled "RAW" and annotated
    only with its original on-the-wire length. A truncated trailing record
    ends loading silently; packets read before it are returned.

    Args:
        path: Path to the PCAP file.
        protocol_filter: Optional protocol filter (not supported in basic
            mode; the value is accepted but ignored).
        max_packets: Maximum packets to load.

    Returns:
        PcapPacketList with raw packet data plus the link type and snaplen
        from the file header.

    Raises:
        FormatError: If file is not a valid PCAP.
        LoaderError: If file cannot be read, or is PCAPNG (needs dpkt).
    """
    try:
        with open(path, "rb") as f:
            # Read global header (24 bytes)
            header = f.read(24)
            if len(header) < 24:
                raise FormatError(
                    "File too small to be a valid PCAP",
                    file_path=str(path),
                    expected="At least 24 bytes",
                    got=f"{len(header)} bytes",
                )

            # The magic number tells us byte order and timestamp resolution.
            magic = struct.unpack("<I", header[:4])[0]

            if magic in (PCAP_MAGIC_LE, PCAP_MAGIC_NS_LE):
                byte_order = "<"
                nanosecond = magic == PCAP_MAGIC_NS_LE
            elif magic in (PCAP_MAGIC_BE, PCAP_MAGIC_NS_BE):
                byte_order = ">"
                nanosecond = magic == PCAP_MAGIC_NS_BE
            elif magic == PCAPNG_MAGIC:
                raise LoaderError(
                    "PCAPNG format requires dpkt library",
                    file_path=str(path),
                    fix_hint="Install dpkt: pip install dpkt",
                )
            else:
                raise FormatError(
                    "Invalid PCAP magic number",
                    file_path=str(path),
                    expected="PCAP magic (0xa1b2c3d4)",
                    got=f"0x{magic:08x}",
                )

            # Rest of header: version_major, version_minor, thiszone,
            # sigfigs, snaplen, network (link type).
            _, _, _, _, snaplen, link_type = struct.unpack(
                f"{byte_order}HHiIII", header[4:]
            )

            # Loop invariants: compiled record-header layout (16 bytes) and
            # the divisor for the fractional part of the timestamp.
            record_header = struct.Struct(f"{byte_order}IIII")
            ts_divisor = 1e9 if nanosecond else 1e6

            packets: list[ProtocolPacket] = []

            while max_packets is None or len(packets) < max_packets:
                pkt_header = f.read(record_header.size)
                if len(pkt_header) < record_header.size:
                    break  # End of file (or truncated record header)

                ts_sec, ts_frac, incl_len, orig_len = record_header.unpack(pkt_header)

                pkt_data = _read_up_to(f, incl_len)
                if len(pkt_data) < incl_len:
                    break  # Truncated packet data; drop the partial record

                packets.append(
                    ProtocolPacket(
                        timestamp=ts_sec + ts_frac / ts_divisor,
                        protocol="RAW",
                        data=pkt_data,
                        annotations={"original_length": orig_len},
                    )
                )

            return PcapPacketList(
                packets=packets,
                link_type=link_type,
                snaplen=snaplen,
                source_file=str(path),
            )

    except struct.error as e:
        raise FormatError(
            "Corrupted PCAP file",
            file_path=str(path),
        ) from e
    except (LoaderError, FormatError):
        # Already a domain error with context attached; pass it through.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to load PCAP file",
            file_path=str(path),
            details=str(e),
            fix_hint="Install dpkt for full PCAP support: pip install dpkt",
        ) from e

406 

407 

408def _format_mac(mac_bytes: bytes) -> str: 

409 """Format MAC address bytes to string. 

410 

411 Args: 

412 mac_bytes: 6-byte MAC address. 

413 

414 Returns: 

415 MAC address string (e.g., "00:11:22:33:44:55"). 

416 """ 

417 return ":".join(f"{b:02x}" for b in mac_bytes) 

418 

419 

420def _format_ip(ip_bytes: bytes) -> str: 

421 """Format IPv4 address bytes to string. 

422 

423 Args: 

424 ip_bytes: 4-byte IPv4 address. 

425 

426 Returns: 

427 IPv4 address string (e.g., "192.168.1.1"). 

428 """ 

429 return ".".join(str(b) for b in ip_bytes) 

430 

431 

# Public API of this module; the loader backends and formatting helpers
# (underscore-prefixed) are private.
__all__ = ["PcapPacketList", "load_pcap"]