Coverage for src / tracekit / analyzers / packet / parser.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Binary parsing utilities for packet analysis. 

2 

3This module provides fast binary parsing using struct.Struct 

4pre-compilation and TLV record support. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.packet.parser import BinaryParser, parse_tlv 

9 >>> parser = BinaryParser(">HBB") # big-endian: ushort, 2 ubytes 

10 >>> values = parser.unpack(data) 

11 

12References: 

13 Python struct module documentation 

14""" 

15 

16from __future__ import annotations 

17 

18import struct 

19from dataclasses import dataclass 

20from typing import TYPE_CHECKING, Any 

21 

22if TYPE_CHECKING: 

23 from collections.abc import Iterator 

24 

25 

@dataclass
class TLVRecord:
    """A single Type-Length-Value record extracted from a binary buffer.

    Attributes:
        type_id: Numeric identifier of the record type.
        length: Number of value bytes (excluding the TLV header).
        value: Raw value bytes.
        offset: Byte position of the record within the source buffer.
    """

    type_id: int
    length: int
    value: bytes
    offset: int = 0

41 

42 

class BinaryParser:
    """Struct-based binary parser built around a pre-compiled format.

    The format string is compiled once into a ``struct.Struct`` so that
    repeated unpacking never re-parses the format, which keeps parsing
    throughput high (>10K packets/second).

    Args:
        format_string: struct format string (e.g., ">HBB").

    Attributes:
        format: The format string the parser was built with.
        size: Packed size of the structure in bytes.

    Example:
        >>> parser = BinaryParser(">HHBBI")  # Header format
        >>> header = parser.unpack(data)
        >>> print(f"Fields: {header}")

        >>> # Unpack from offset
        >>> payload = parser.unpack_from(data, offset=8)
    """

    def __init__(self, format_string: str) -> None:
        """Compile the format string for repeated use.

        Args:
            format_string: struct format string. Use standard prefixes:
                - ">": Big-endian
                - "<": Little-endian
                - "=": Native byte order
                - "!": Network byte order (big-endian)
        """
        self._compiled = struct.Struct(format_string)
        self._format_string = format_string

    @property
    def format(self) -> str:
        """The format string this parser was constructed with."""
        return self._format_string

    @property
    def size(self) -> int:
        """Packed structure size in bytes."""
        return self._compiled.size

    def unpack(self, buffer: bytes) -> tuple[Any, ...]:
        """Unpack one structure from the start of *buffer*.

        Args:
            buffer: Bytes to unpack; must hold at least ``self.size``
                bytes. Trailing extra bytes are ignored.

        Returns:
            Tuple of unpacked values.
        """
        # Slice so callers may pass a buffer longer than the structure.
        return self._compiled.unpack(buffer[: self.size])

    def unpack_from(self, buffer: bytes, offset: int = 0) -> tuple[Any, ...]:
        """Unpack one structure starting at *offset*.

        More efficient than slicing when pulling several fields out of
        one large buffer, since no intermediate copy is made.

        Args:
            buffer: Source buffer.
            offset: Byte offset at which the structure begins.

        Returns:
            Tuple of unpacked values.
        """
        return self._compiled.unpack_from(buffer, offset)

    def pack(self, *values: Any) -> bytes:
        """Pack *values* into bytes according to the format.

        Args:
            *values: One value per format code, matching its type.

        Returns:
            Packed bytes.
        """
        return self._compiled.pack(*values)

    def iter_unpack(self, buffer: bytes) -> Iterator[tuple[Any, ...]]:
        """Iterate over consecutive structures packed in *buffer*.

        Args:
            buffer: Source buffer; per ``struct.iter_unpack``, its length
                must be a multiple of ``self.size``.

        Returns:
            Iterator yielding one tuple of values per structure.

        Example:
            >>> parser = BinaryParser(">HH")
            >>> for a, b in parser.iter_unpack(data):
            ...     print(f"Pair: {a}, {b}")
        """
        return self._compiled.iter_unpack(buffer)

140 

141 

142class PacketParser: 

143 """Multi-field packet parser. 

144 

145 Parses packets with multiple fields using named field definitions. 

146 

147 Example: 

148 >>> fields = [ 

149 ... ("sync", "H"), 

150 ... ("length", "H"), 

151 ... ("type", "B"), 

152 ... ("flags", "B"), 

153 ... ] 

154 >>> parser = PacketParser(fields, byte_order=">") 

155 >>> packet = parser.parse(data) 

156 >>> print(f"Type: {packet['type']}") 

157 """ 

158 

159 def __init__( 

160 self, 

161 fields: list[tuple[str, str]], 

162 byte_order: str = ">", 

163 ) -> None: 

164 """Initialize packet parser. 

165 

166 Args: 

167 fields: List of (name, format_char) tuples. 

168 byte_order: Byte order prefix (">", "<", "=", "!"). 

169 """ 

170 self._field_names = [f[0] for f in fields] 

171 format_chars = "".join(f[1] for f in fields) 

172 self._parser = BinaryParser(byte_order + format_chars) 

173 

174 @property 

175 def size(self) -> int: 

176 """Get packet header size in bytes.""" 

177 return self._parser.size 

178 

179 def parse(self, buffer: bytes, offset: int = 0) -> dict[str, Any]: 

180 """Parse packet fields. 

181 

182 Args: 

183 buffer: Source buffer. 

184 offset: Byte offset. 

185 

186 Returns: 

187 Dictionary mapping field names to values. 

188 """ 

189 values = self._parser.unpack_from(buffer, offset) 

190 return dict(zip(self._field_names, values, strict=False)) 

191 

192 def pack(self, **fields: Any) -> bytes: 

193 """Pack fields to bytes. 

194 

195 Args: 

196 **fields: Field values by name. 

197 

198 Returns: 

199 Packed bytes. 

200 """ 

201 values = [fields[name] for name in self._field_names] 

202 return self._parser.pack(*values) 

203 

204 

def parse_tlv(
    buffer: bytes,
    *,
    type_size: int = 1,
    length_size: int = 1,
    big_endian: bool = True,
    include_length_in_length: bool = False,
    type_map: dict[int, str] | None = None,
) -> list[TLVRecord]:
    """Parse Type-Length-Value records.

    Parsing stops cleanly at the first malformed record: a length that
    would underflow the header (when ``include_length_in_length``) or
    overrun the buffer ends the scan, returning the records parsed so
    far.

    Args:
        buffer: Source buffer containing back-to-back TLV records.
        type_size: Size of type field in bytes (1, 2, or 4).
        length_size: Size of length field in bytes (1, 2, or 4).
        big_endian: True for big-endian byte order.
        include_length_in_length: True if the length field counts the
            type+length header in addition to the value.
        type_map: Accepted for backward compatibility but currently
            unused; returned records carry the numeric ``type_id`` only.

    Returns:
        List of TLVRecord objects.

    Raises:
        ValueError: If type_size or length_size is not 1, 2, or 4.

    Example:
        >>> records = parse_tlv(data, type_size=2, length_size=2)
        >>> for rec in records:
        ...     print(f"Type {rec.type_id}: {rec.length} bytes")
    """
    size_to_fmt = {1: "B", 2: "H", 4: "I"}
    if type_size not in size_to_fmt or length_size not in size_to_fmt:
        raise ValueError(
            f"type_size and length_size must be 1, 2, or 4 "
            f"(got {type_size}, {length_size})"
        )

    byte_order = ">" if big_endian else "<"
    header_parser = BinaryParser(
        byte_order + size_to_fmt[type_size] + size_to_fmt[length_size]
    )
    header_size = type_size + length_size

    records: list[TLVRecord] = []
    offset = 0
    while offset + header_size <= len(buffer):
        type_id, length = header_parser.unpack_from(buffer, offset)

        # Normalize to the number of value bytes only.
        data_length = length - header_size if include_length_in_length else length
        if data_length < 0:
            break  # malformed: length smaller than its own header

        value_start = offset + header_size
        value_end = value_start + data_length
        if value_end > len(buffer):
            break  # truncated record

        records.append(
            TLVRecord(
                type_id=type_id,
                length=data_length,
                value=buffer[value_start:value_end],
                offset=offset,
            )
        )
        offset = value_end

    return records

272 

273 

def parse_tlv_nested(
    buffer: bytes,
    *,
    type_size: int = 1,
    length_size: int = 1,
    big_endian: bool = True,
    container_types: set[int] | None = None,
) -> dict[int, Any]:
    """Parse a TLV buffer, recursing into container records.

    Args:
        buffer: Source buffer of TLV records.
        type_size: Size of the type field in bytes.
        length_size: Size of the length field in bytes.
        big_endian: True for big-endian byte order.
        container_types: Type IDs whose values are themselves TLV
            sequences and should be parsed recursively.

    Returns:
        Dictionary keyed by type_id. Values are raw bytes for leaf
        records and nested dictionaries for container records; if a
        type_id occurs more than once, the last record wins.
    """
    containers = container_types or set()

    parsed: dict[int, Any] = {}
    for record in parse_tlv(
        buffer,
        type_size=type_size,
        length_size=length_size,
        big_endian=big_endian,
    ):
        if record.type_id in containers:
            # Container record: its payload is another TLV sequence.
            parsed[record.type_id] = parse_tlv_nested(
                record.value,
                type_size=type_size,
                length_size=length_size,
                big_endian=big_endian,
                container_types=containers,
            )
        else:
            parsed[record.type_id] = record.value

    return parsed

319 

320 

# Public API exported by this module.
__all__ = [
    "BinaryParser",
    "PacketParser",
    "TLVRecord",
    "parse_tlv",
    "parse_tlv_nested",
]