Coverage for src / tracekit / analyzers / packet / payload_extraction.py: 0%
72 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Payload extraction framework for network packets.

RE-PAY-001: Payload Extraction Framework

This module provides payload extraction from PCAP packets with metadata
preservation, filtering, and multiple output formats.
"""
9from __future__ import annotations
11from collections.abc import Iterator, Sequence
12from dataclasses import dataclass
13from typing import Any, Literal
15import numpy as np
18@dataclass
19class PayloadInfo:
20 """Extracted payload with metadata.
22 Implements RE-PAY-001: Payload with preserved metadata.
24 Attributes:
25 data: Payload bytes.
26 packet_index: Index of source packet.
27 timestamp: Packet timestamp (optional).
28 src_ip: Source IP address (optional).
29 dst_ip: Destination IP address (optional).
30 src_port: Source port (optional).
31 dst_port: Destination port (optional).
32 protocol: Protocol name (optional).
33 is_fragment: Whether packet is a fragment.
34 fragment_offset: Fragment offset if fragmented.
35 """
37 data: bytes
38 packet_index: int
39 timestamp: float | None = None
40 src_ip: str | None = None
41 dst_ip: str | None = None
42 src_port: int | None = None
43 dst_port: int | None = None
44 protocol: str | None = None
45 is_fragment: bool = False
46 fragment_offset: int = 0
49class PayloadExtractor:
50 """Extract payloads from network packets.
52 Implements RE-PAY-001: Payload Extraction Framework.
54 Provides zero-copy payload extraction from UDP/TCP packets
55 with metadata preservation and fragment handling.
57 Example:
58 >>> extractor = PayloadExtractor()
59 >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
60 >>> for p in payloads:
61 ... print(f"{p.src_ip}:{p.src_port} -> {len(p.data)} bytes")
62 """
64 def __init__(
65 self,
66 include_headers: bool = False,
67 zero_copy: bool = True,
68 return_type: Literal["bytes", "memoryview", "numpy"] = "bytes",
69 ) -> None:
70 """Initialize payload extractor.
72 Args:
73 include_headers: Include protocol headers in payload.
74 zero_copy: Use zero-copy memoryview where possible.
75 return_type: Type for returned payload data.
76 """
77 self.include_headers = include_headers
78 self.zero_copy = zero_copy
79 self.return_type = return_type
81 def extract_payload(
82 self,
83 packet: dict[str, Any] | bytes,
84 layer: Literal["ethernet", "ip", "transport", "application"] = "application",
85 ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
86 """Extract payload from a single packet.
88 Implements RE-PAY-001: Single packet payload extraction.
90 Args:
91 packet: Packet data (dict with 'data' key or raw bytes).
92 layer: OSI layer to extract from.
94 Returns:
95 Payload data in requested format.
97 Example:
98 >>> payload = extractor.extract_payload(packet)
99 >>> print(f"Payload: {len(payload)} bytes")
100 """
101 # Handle different packet formats
102 if isinstance(packet, dict):
103 raw_data = packet.get("data", packet.get("payload", b""))
104 if isinstance(raw_data, list | tuple):
105 raw_data = bytes(raw_data)
106 else:
107 raw_data = packet
109 if not raw_data:
110 return self._format_output(b"")
112 # For raw bytes, return as-is
113 if layer == "application":
114 return self._format_output(raw_data)
116 # Layer-based extraction would require protocol parsing
117 # For now, return full data
118 return self._format_output(raw_data)
120 def extract_all_payloads(
121 self,
122 packets: Sequence[dict[str, Any] | bytes],
123 protocol: str | None = None,
124 port_filter: tuple[int | None, int | None] | None = None,
125 ) -> list[PayloadInfo]:
126 """Extract payloads from all packets with metadata.
128 Implements RE-PAY-001: Batch payload extraction with metadata.
130 Args:
131 packets: Sequence of packets.
132 protocol: Filter by protocol (e.g., "UDP", "TCP").
133 port_filter: (src_port, dst_port) filter tuple.
135 Returns:
136 List of PayloadInfo with extracted data and metadata.
138 Example:
139 >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
140 >>> print(f"Extracted {len(payloads)} payloads")
141 """
142 results = []
144 for i, packet in enumerate(packets):
145 if isinstance(packet, dict):
146 # Extract metadata from dict
147 pkt_protocol = packet.get("protocol", "")
148 src_port = packet.get("src_port")
149 dst_port = packet.get("dst_port")
151 # Apply filters
152 if protocol and pkt_protocol.upper() != protocol.upper():
153 continue
155 if port_filter:
156 if port_filter[0] is not None and src_port != port_filter[0]:
157 continue
158 if port_filter[1] is not None and dst_port != port_filter[1]:
159 continue
161 payload = self.extract_payload(packet)
162 if isinstance(payload, memoryview | np.ndarray):
163 payload = bytes(payload)
165 info = PayloadInfo(
166 data=payload,
167 packet_index=i,
168 timestamp=packet.get("timestamp"),
169 src_ip=packet.get("src_ip"),
170 dst_ip=packet.get("dst_ip"),
171 src_port=src_port,
172 dst_port=dst_port,
173 protocol=pkt_protocol,
174 is_fragment=packet.get("is_fragment", False),
175 fragment_offset=packet.get("fragment_offset", 0),
176 )
177 results.append(info)
178 else:
179 # Raw bytes
180 payload = bytes(packet)
181 info = PayloadInfo(data=payload, packet_index=i)
182 results.append(info)
184 return results
186 def iter_payloads(
187 self,
188 packets: Sequence[dict[str, Any] | bytes],
189 ) -> Iterator[PayloadInfo]:
190 """Iterate over payloads for memory-efficient processing.
192 Implements RE-PAY-001: Streaming payload iteration.
194 Args:
195 packets: Sequence of packets.
197 Yields:
198 PayloadInfo for each packet.
199 """
200 for i, packet in enumerate(packets):
201 payload = self.extract_payload(packet)
202 if isinstance(payload, memoryview | np.ndarray):
203 payload = bytes(payload)
205 if isinstance(packet, dict):
206 info = PayloadInfo(
207 data=payload,
208 packet_index=i,
209 timestamp=packet.get("timestamp"),
210 src_ip=packet.get("src_ip"),
211 dst_ip=packet.get("dst_ip"),
212 src_port=packet.get("src_port"),
213 dst_port=packet.get("dst_port"),
214 protocol=packet.get("protocol"),
215 )
216 else:
217 info = PayloadInfo(data=payload, packet_index=i)
219 yield info
221 def _format_output(
222 self, data: bytes
223 ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
224 """Format output according to return_type setting."""
225 if self.return_type == "bytes":
226 return data
227 elif self.return_type == "memoryview":
228 return memoryview(data)
229 # self.return_type == "numpy"
230 return np.frombuffer(data, dtype=np.uint8)
# Names exported by ``from <module> import *``.
__all__ = ["PayloadExtractor", "PayloadInfo"]