Coverage for src / tracekit / loaders / pcap.py: 88%
151 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""PCAP/PCAPNG packet capture file loader.
3This module provides loading of packet capture files using dpkt
4when available, with a basic fallback implementation.
7Example:
8 >>> from tracekit.loaders.pcap import load_pcap
9 >>> packets = load_pcap("capture.pcap")
10 >>> for packet in packets:
11 ... print(f"Time: {packet.timestamp}, Size: {len(packet.data)} bytes")
12"""
14from __future__ import annotations
16import struct
17from dataclasses import dataclass
18from pathlib import Path
19from typing import TYPE_CHECKING, Any
21from tracekit.core.exceptions import FormatError, LoaderError
22from tracekit.core.types import ProtocolPacket
24if TYPE_CHECKING:
25 from collections.abc import Iterator
26 from os import PathLike
28# Try to import dpkt for full PCAP support
29try:
30 import dpkt # type: ignore[import-not-found]
32 DPKT_AVAILABLE = True
33except ImportError:
34 DPKT_AVAILABLE = False
# PCAP file format constants
#
# Each value is compared against the first four bytes of a capture after those
# bytes have been unpacked as a little-endian uint32 (struct format "<I"),
# which is how the loaders in this module read the magic number.
PCAP_MAGIC_LE = 0xA1B2C3D4  # basic loader parses headers with "<"; timestamp fraction / 1e6
PCAP_MAGIC_BE = 0xD4C3B2A1  # basic loader parses headers with ">"; timestamp fraction / 1e6
PCAP_MAGIC_NS_LE = 0xA1B23C4D  # Nanosecond resolution ("<" headers; fraction / 1e9)
PCAP_MAGIC_NS_BE = 0x4D3CB2A1  # Nanosecond resolution (">" headers; fraction / 1e9)
PCAPNG_MAGIC = 0x0A0D0D0A  # PCAPNG container; only readable through dpkt in this module
@dataclass
class PcapPacketList:
    """Container for PCAP packets with metadata.

    Behaves like a read-only sequence of packets (iteration, ``len()`` and
    indexing are delegated to the underlying list) while preserving the
    capture metadata alongside them.

    Attributes:
        packets: List of ProtocolPacket objects.
        link_type: Link layer type (e.g., Ethernet = 1).
        snaplen: Maximum capture length per packet.
        source_file: Path to the source PCAP file.
    """

    packets: list[ProtocolPacket]
    link_type: int = 1  # Ethernet
    snaplen: int = 65535
    source_file: str = ""

    def __iter__(self) -> Iterator[ProtocolPacket]:
        """Iterate over packets in capture order."""
        return iter(self.packets)

    def __len__(self) -> int:
        """Return number of packets."""
        return len(self.packets)

    def __getitem__(self, index: int) -> ProtocolPacket:
        """Get packet by index (delegates to the underlying list)."""
        return self.packets[index]

    def filter(
        self,
        protocol: str | None = None,
        min_size: int | None = None,
        max_size: int | None = None,
    ) -> list[ProtocolPacket]:
        """Filter packets by criteria.

        All supplied criteria must hold for a packet to be kept; criteria
        left as ``None`` are not checked.

        Args:
            protocol: Keep packets whose "layer3_protocol" or
                "layer4_protocol" annotation equals this value.
            min_size: Minimum packet size in bytes (inclusive).
            max_size: Maximum packet size in bytes (inclusive).

        Returns:
            A new list of matching packets in capture order. The list is
            always a fresh object, so callers may mutate it without
            affecting this container.
        """

        def keep(pkt: ProtocolPacket) -> bool:
            if protocol is not None and not (
                pkt.annotations.get("layer3_protocol") == protocol
                or pkt.annotations.get("layer4_protocol") == protocol
            ):
                return False
            if min_size is not None and len(pkt.data) < min_size:
                return False
            if max_size is not None and len(pkt.data) > max_size:
                return False
            return True

        # Building the result with a comprehension guarantees we never hand
        # out ``self.packets`` itself, even when no criteria are given.
        return [pkt for pkt in self.packets if keep(pkt)]
def load_pcap(
    path: str | PathLike[str],
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Load a PCAP or PCAPNG packet capture file.

    Checks that the file exists, then hands off to the dpkt-backed loader
    when dpkt could be imported and to the built-in basic loader otherwise.

    Args:
        path: Path to the PCAP/PCAPNG file.
        protocol_filter: Optional protocol filter (e.g., "TCP", "UDP").
        max_packets: Maximum number of packets to load.

    Returns:
        PcapPacketList containing packets and capture metadata.

    Raises:
        LoaderError: If the file does not exist or cannot be loaded.

    Example:
        >>> packets = load_pcap("network.pcap")
        >>> print(f"Captured {len(packets)} packets")
        >>> for pkt in packets[:5]:
        ...     print(f"  {pkt.timestamp:.6f}s: {len(pkt.data)} bytes")

        >>> # Filter by protocol
        >>> tcp_packets = packets.filter(protocol="TCP")
    """
    capture_path = Path(path)

    # Guard clause: fail early with a clear message before touching a loader.
    if not capture_path.exists():
        raise LoaderError("File not found", file_path=str(capture_path))

    # Pick the backend once; both share the same keyword-only interface.
    loader = _load_with_dpkt if DPKT_AVAILABLE else _load_basic
    return loader(
        capture_path,
        protocol_filter=protocol_filter,
        max_packets=max_packets,
    )
def _load_with_dpkt(
    path: Path,
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Load PCAP using dpkt library.

    Picks the dpkt reader from the file's magic number, then dissects each
    frame on a best-effort basis (Ethernet -> IPv4/IPv6/ARP -> TCP/UDP/ICMP)
    and records what it finds in the packet's annotations. Frames that cannot
    be dissected are still returned, with whatever was parsed before the
    failure and the protocol determined up to that point ("RAW" if nothing
    was recognised).

    Args:
        path: Path to the PCAP file.
        protocol_filter: Optional protocol filter. A packet is kept when the
            filter equals its layer-3 annotation, its layer-4 annotation, or
            its top-level protocol label.
        max_packets: Maximum packets to load (counted after filtering).

    Returns:
        PcapPacketList with parsed packets, the capture's link type and the
        snap length reported by the reader.

    Raises:
        LoaderError: If file cannot be read or dpkt version is incompatible.
    """
    try:
        with open(path, "rb") as f:
            # Peek at the magic number to choose a reader, then rewind so the
            # reader sees the file from the start.
            magic = f.read(4)
            f.seek(0)

            magic_int = struct.unpack("<I", magic)[0]

            if magic_int == PCAPNG_MAGIC:
                # PCAPNG format
                try:
                    pcap_reader = dpkt.pcapng.Reader(f)
                except AttributeError:
                    raise LoaderError(  # noqa: B904
                        "PCAPNG support requires newer dpkt version",
                        file_path=str(path),
                        fix_hint="Install dpkt >= 1.9: pip install dpkt>=1.9",
                    )
            else:
                # Standard PCAP format
                pcap_reader = dpkt.pcap.Reader(f)

            packets: list[ProtocolPacket] = []
            link_type = getattr(pcap_reader, "datalink", lambda: 1)()
            # Carry the capture's snap length through, as the basic loader
            # does; fall back to the container default if the reader does
            # not expose it.
            snaplen = getattr(pcap_reader, "snaplen", 65535)

            for timestamp, raw_data in pcap_reader:
                if max_packets is not None and len(packets) >= max_packets:
                    break

                annotations: dict[str, Any] = {}
                protocol = "RAW"

                try:
                    if link_type == 1:  # Ethernet
                        eth = dpkt.ethernet.Ethernet(raw_data)
                        annotations["src_mac"] = _format_mac(eth.src)
                        annotations["dst_mac"] = _format_mac(eth.dst)

                        # Parse IP layer
                        if isinstance(eth.data, dpkt.ip.IP):
                            ip = eth.data
                            protocol = "IP"
                            annotations["src_ip"] = _format_ip(ip.src)
                            annotations["dst_ip"] = _format_ip(ip.dst)
                            annotations["layer3_protocol"] = "IP"

                            # Parse transport layer
                            if isinstance(ip.data, dpkt.tcp.TCP):
                                tcp = ip.data
                                protocol = "TCP"
                                annotations["src_port"] = tcp.sport
                                annotations["dst_port"] = tcp.dport
                                annotations["layer4_protocol"] = "TCP"
                                annotations["tcp_flags"] = tcp.flags

                            elif isinstance(ip.data, dpkt.udp.UDP):
                                udp = ip.data
                                protocol = "UDP"
                                annotations["src_port"] = udp.sport
                                annotations["dst_port"] = udp.dport
                                annotations["layer4_protocol"] = "UDP"

                            elif isinstance(ip.data, dpkt.icmp.ICMP):
                                protocol = "ICMP"
                                annotations["layer4_protocol"] = "ICMP"

                        elif isinstance(eth.data, dpkt.ip6.IP6):
                            protocol = "IPv6"
                            annotations["layer3_protocol"] = "IPv6"

                        elif isinstance(eth.data, dpkt.arp.ARP):
                            protocol = "ARP"
                            annotations["layer3_protocol"] = "ARP"

                except Exception:
                    # Deliberate best-effort: a frame dpkt cannot dissect is
                    # kept as raw data with whatever annotations were set
                    # before the failure, rather than aborting the load.
                    pass

                # Apply protocol filter: skip only when none of the three
                # labels matches.
                if protocol_filter is not None and (
                    annotations.get("layer3_protocol") != protocol_filter
                    and annotations.get("layer4_protocol") != protocol_filter
                    and protocol != protocol_filter
                ):
                    continue

                packets.append(
                    ProtocolPacket(
                        timestamp=float(timestamp),
                        protocol=protocol,
                        data=bytes(raw_data),
                        annotations=annotations,
                    )
                )

        return PcapPacketList(
            packets=packets,
            link_type=link_type,
            snaplen=snaplen,
            source_file=str(path),
        )

    except (LoaderError, FormatError):
        # Already carries a precise message; do not re-wrap.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to load PCAP file",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid PCAP/PCAPNG format.",
        ) from e
def _load_basic(
    path: Path,
    *,
    protocol_filter: str | None = None,
    max_packets: int | None = None,
) -> PcapPacketList:
    """Basic PCAP loader without dpkt.

    Parses the 24-byte global header, then reads 16-byte record headers each
    followed by its payload. No protocol dissection is performed: every packet
    is returned with protocol "RAW" and an "original_length" annotation.
    Reading stops quietly at the first truncated record, keeping the packets
    read so far.

    Args:
        path: Path to the PCAP file.
        protocol_filter: Optional protocol filter (not supported in basic
            mode; the value is ignored).
        max_packets: Maximum packets to load.

    Returns:
        PcapPacketList with raw packet data.

    Raises:
        FormatError: If file is not a valid PCAP.
        LoaderError: If file cannot be read, or is PCAPNG (needs dpkt).
    """
    try:
        with open(path, "rb") as f:
            # Remember where the file ends so a corrupt record header that
            # declares an enormous length is rejected before f.read() is
            # asked for that many bytes.
            end_offset: int | None = None
            if f.seekable():
                end_offset = f.seek(0, 2)  # 2 == SEEK_END
                f.seek(0)

            # Read global header (24 bytes)
            header = f.read(24)
            if len(header) < 24:
                raise FormatError(
                    "File too small to be a valid PCAP",
                    file_path=str(path),
                    expected="At least 24 bytes",
                    got=f"{len(header)} bytes",
                )

            # The magic is always read little-endian; the *_BE constants are
            # what a big-endian file looks like through that lens.
            magic = struct.unpack("<I", header[:4])[0]

            if magic in (PCAP_MAGIC_LE, PCAP_MAGIC_NS_LE):
                byte_order = "<"
                nanosecond = magic == PCAP_MAGIC_NS_LE
            elif magic in (PCAP_MAGIC_BE, PCAP_MAGIC_NS_BE):
                byte_order = ">"
                nanosecond = magic == PCAP_MAGIC_NS_BE
            elif magic == PCAPNG_MAGIC:
                raise LoaderError(
                    "PCAPNG format requires dpkt library",
                    file_path=str(path),
                    fix_hint="Install dpkt: pip install dpkt",
                )
            else:
                raise FormatError(
                    "Invalid PCAP magic number",
                    file_path=str(path),
                    expected="PCAP magic (0xa1b2c3d4)",
                    got=f"0x{magic:08x}",
                )

            # Parse rest of header (version_major, version_minor, thiszone,
            # sigfigs, snaplen, network)
            _, _, _, _, snaplen, link_type = struct.unpack(f"{byte_order}HHiIII", header[4:])

            # Sub-second field is nanoseconds or microseconds depending on magic.
            fraction_scale = 1e9 if nanosecond else 1e6
            record_format = f"{byte_order}IIII"

            packets: list[ProtocolPacket] = []

            while True:
                if max_packets is not None and len(packets) >= max_packets:
                    break

                # Read packet header (16 bytes)
                pkt_header = f.read(16)
                if len(pkt_header) < 16:
                    break

                ts_sec, ts_frac, incl_len, orig_len = struct.unpack(record_format, pkt_header)

                # A record claiming more bytes than remain is truncated or
                # corrupt; stop here, exactly as a short read would.
                if end_offset is not None and incl_len > end_offset - f.tell():
                    break

                pkt_data = f.read(incl_len)
                if len(pkt_data) < incl_len:
                    break

                packets.append(
                    ProtocolPacket(
                        timestamp=ts_sec + ts_frac / fraction_scale,
                        protocol="RAW",
                        data=bytes(pkt_data),
                        annotations={"original_length": orig_len},
                    )
                )

        return PcapPacketList(
            packets=packets,
            link_type=link_type,
            snaplen=snaplen,
            source_file=str(path),
        )

    except struct.error as e:
        raise FormatError(
            "Corrupted PCAP file",
            file_path=str(path),
        ) from e
    except (LoaderError, FormatError):
        # Already carries a precise message; do not re-wrap.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to load PCAP file",
            file_path=str(path),
            details=str(e),
            fix_hint="Install dpkt for full PCAP support: pip install dpkt",
        ) from e
408def _format_mac(mac_bytes: bytes) -> str:
409 """Format MAC address bytes to string.
411 Args:
412 mac_bytes: 6-byte MAC address.
414 Returns:
415 MAC address string (e.g., "00:11:22:33:44:55").
416 """
417 return ":".join(f"{b:02x}" for b in mac_bytes)
420def _format_ip(ip_bytes: bytes) -> str:
421 """Format IPv4 address bytes to string.
423 Args:
424 ip_bytes: 4-byte IPv4 address.
426 Returns:
427 IPv4 address string (e.g., "192.168.1.1").
428 """
429 return ".".join(str(b) for b in ip_bytes)
# Public API: the container type and the single entry point. The backend
# loaders and the address formatters are underscore-prefixed and not exported.
__all__ = ["PcapPacketList", "load_pcap"]