Coverage for src / tracekit / analyzers / packet / payload_extraction.py: 0%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Payload extraction framework for network packets. 

2 

3RE-PAY-001: Payload Extraction Framework 

4 

5This module provides payload extraction from PCAP packets with metadata 

6preservation, filtering, and multiple output formats. 

7""" 

8 

9from __future__ import annotations 

10 

11from collections.abc import Iterator, Sequence 

12from dataclasses import dataclass 

13from typing import Any, Literal 

14 

15import numpy as np 

16 

17 

@dataclass
class PayloadInfo:
    """One extracted payload together with the metadata of its source packet.

    Implements RE-PAY-001: Payload with preserved metadata.

    Only ``data`` and ``packet_index`` are required. Every other field has a
    default, so a payload taken from a raw-bytes packet (which carries no
    metadata) can be described with just those two values.

    Attributes:
        data: Payload bytes.
        packet_index: Index of the source packet in the input sequence.
        timestamp: Packet timestamp, or None when unknown.
        src_ip: Source IP address, or None when unknown.
        dst_ip: Destination IP address, or None when unknown.
        src_port: Source port, or None when unknown.
        dst_port: Destination port, or None when unknown.
        protocol: Protocol name, or None when unknown.
        is_fragment: Whether the packet is a fragment.
        fragment_offset: Fragment offset if fragmented.
    """

    # Required fields.
    data: bytes
    packet_index: int

    # Optional addressing / timing metadata; None means "not provided".
    timestamp: float | None = None
    src_ip: str | None = None
    dst_ip: str | None = None
    src_port: int | None = None
    dst_port: int | None = None
    protocol: str | None = None

    # Fragmentation metadata; the defaults describe an unfragmented packet.
    is_fragment: bool = False
    fragment_offset: int = 0

47 

48 

class PayloadExtractor:
    """Extract payloads from network packets.

    Implements RE-PAY-001: Payload Extraction Framework.

    A packet is either a dict (payload stored under ``"data"`` or
    ``"payload"``, with optional metadata keys such as ``"protocol"``,
    ``"src_port"`` or ``"timestamp"``) or a raw bytes-like object. No
    protocol parsing is performed; the payload is whatever bytes the packet
    carries.

    Example:
        >>> extractor = PayloadExtractor()
        >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
        >>> for p in payloads:
        ...     print(f"{p.src_ip}:{p.src_port} -> {len(p.data)} bytes")
    """

    def __init__(
        self,
        include_headers: bool = False,
        zero_copy: bool = True,
        return_type: Literal["bytes", "memoryview", "numpy"] = "bytes",
    ) -> None:
        """Initialize payload extractor.

        Args:
            include_headers: Include protocol headers in payload. Stored on
                the instance; no method of this class reads it yet.
            zero_copy: Use zero-copy memoryview where possible. Stored on
                the instance; no method of this class reads it yet.
            return_type: Type of the value returned by ``extract_payload``.
                Any value other than ``"bytes"`` or ``"memoryview"`` is
                treated as ``"numpy"``.
        """
        self.include_headers = include_headers
        self.zero_copy = zero_copy
        self.return_type = return_type

    def extract_payload(
        self,
        packet: dict[str, Any] | bytes,
        layer: Literal["ethernet", "ip", "transport", "application"] = "application",
    ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
        """Extract payload from a single packet.

        Implements RE-PAY-001: Single packet payload extraction.

        Args:
            packet: Packet data (dict with a 'data' or 'payload' key, or raw
                bytes). A list/tuple of ints stored in a dict is converted
                to bytes.
            layer: OSI layer to extract from. Layer-specific extraction
                would require protocol parsing, which is not implemented, so
                every layer currently returns the full packet data.

        Returns:
            Payload data in the format selected by ``return_type``. A packet
            with missing or empty data yields an empty payload.

        Example:
            >>> payload = extractor.extract_payload(packet)
            >>> print(f"Payload: {len(payload)} bytes")
        """
        if isinstance(packet, dict):
            # "data" takes precedence; "payload" is only a fallback key.
            raw_data = packet.get("data", packet.get("payload", b""))
            if isinstance(raw_data, (list, tuple)):
                raw_data = bytes(raw_data)
        else:
            raw_data = packet

        if not raw_data:
            return self._format_output(b"")

        # All layers share one code path until protocol parsing exists.
        return self._format_output(raw_data)

    def extract_all_payloads(
        self,
        packets: Sequence[dict[str, Any] | bytes],
        protocol: str | None = None,
        port_filter: tuple[int | None, int | None] | None = None,
    ) -> list[PayloadInfo]:
        """Extract payloads from all packets with metadata.

        Implements RE-PAY-001: Batch payload extraction with metadata.

        Args:
            packets: Sequence of packets.
            protocol: Filter by protocol (e.g., "UDP", "TCP"). The
                comparison is case-insensitive. Dict packets whose
                ``"protocol"`` value is missing or not a string never match.
            port_filter: (src_port, dst_port) filter tuple. A ``None`` entry
                leaves that side unfiltered.

        Returns:
            List of PayloadInfo with extracted data and metadata. Filters
            apply to dict packets only; raw-bytes packets carry no metadata
            and are always included.

        Example:
            >>> payloads = extractor.extract_all_payloads(packets, protocol="UDP")
            >>> print(f"Extracted {len(payloads)} payloads")
        """
        results: list[PayloadInfo] = []
        # Loop-invariant: normalise the requested protocol once.
        wanted_protocol = protocol.upper() if protocol else None

        for i, packet in enumerate(packets):
            if not isinstance(packet, dict):
                # Raw bytes: nothing to filter on, no metadata to copy.
                results.append(PayloadInfo(data=bytes(packet), packet_index=i))
                continue

            if wanted_protocol is not None:
                pkt_protocol = packet.get("protocol", "")
                # A None / non-string protocol cannot match a filter; guard
                # it instead of calling .upper() on it and crashing.
                if not isinstance(pkt_protocol, str):
                    continue
                if pkt_protocol.upper() != wanted_protocol:
                    continue

            if port_filter:
                if port_filter[0] is not None and packet.get("src_port") != port_filter[0]:
                    continue
                if port_filter[1] is not None and packet.get("dst_port") != port_filter[1]:
                    continue

            results.append(
                self._build_info(
                    i,
                    packet,
                    self._payload_as_bytes(packet),
                    default_protocol="",
                )
            )

        return results

    def iter_payloads(
        self,
        packets: Sequence[dict[str, Any] | bytes],
    ) -> Iterator[PayloadInfo]:
        """Iterate over payloads for memory-efficient processing.

        Implements RE-PAY-001: Streaming payload iteration.

        Args:
            packets: Sequence of packets.

        Yields:
            PayloadInfo for each packet, in input order. Dict packets
            contribute their metadata, including ``is_fragment`` and
            ``fragment_offset``; raw-bytes packets yield data and index only.
        """
        for i, packet in enumerate(packets):
            payload = self._payload_as_bytes(packet)
            if isinstance(packet, dict):
                yield self._build_info(i, packet, payload, default_protocol=None)
            else:
                yield PayloadInfo(data=payload, packet_index=i)

    def _payload_as_bytes(self, packet: dict[str, Any] | bytes) -> bytes:
        """Extract a packet's payload and collapse view types to bytes."""
        payload = self.extract_payload(packet)
        # PayloadInfo.data is bytes, whatever return_type is configured.
        if isinstance(payload, (memoryview, np.ndarray)):
            payload = bytes(payload)
        return payload

    @staticmethod
    def _build_info(
        index: int,
        packet: dict[str, Any],
        payload: bytes,
        default_protocol: str | None,
    ) -> PayloadInfo:
        """Build a PayloadInfo from a dict packet's metadata keys.

        ``default_protocol`` is the value used when the packet has no
        ``"protocol"`` key.
        """
        return PayloadInfo(
            data=payload,
            packet_index=index,
            timestamp=packet.get("timestamp"),
            src_ip=packet.get("src_ip"),
            dst_ip=packet.get("dst_ip"),
            src_port=packet.get("src_port"),
            dst_port=packet.get("dst_port"),
            protocol=packet.get("protocol", default_protocol),
            is_fragment=packet.get("is_fragment", False),
            fragment_offset=packet.get("fragment_offset", 0),
        )

    def _format_output(
        self, data: bytes
    ) -> bytes | memoryview | np.ndarray[tuple[int], np.dtype[np.uint8]]:
        """Format output according to return_type setting."""
        if self.return_type == "bytes":
            # Honour the declared return type when the caller handed in a
            # bytearray or memoryview rather than real bytes.
            if isinstance(data, (bytearray, memoryview)):
                return bytes(data)
            return data
        if self.return_type == "memoryview":
            return memoryview(data)
        # self.return_type == "numpy"
        return np.frombuffer(data, dtype=np.uint8)

231 

232 

# Public API of this module.
__all__ = ["PayloadExtractor", "PayloadInfo"]

236]