Coverage for src / tracekit / analyzers / packet / parser.py: 100%
74 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Binary parsing utilities for packet analysis.
3This module provides fast binary parsing using struct.Struct
4pre-compilation and TLV record support.
7Example:
8 >>> from tracekit.analyzers.packet.parser import BinaryParser, parse_tlv
9 >>> parser = BinaryParser(">HBB") # big-endian: ushort, 2 ubytes
10 >>> values = parser.unpack(data)
12References:
13 Python struct module documentation
14"""
16from __future__ import annotations
18import struct
19from dataclasses import dataclass
20from typing import TYPE_CHECKING, Any
22if TYPE_CHECKING:
23 from collections.abc import Iterator
@dataclass
class TLVRecord:
    """Type-Length-Value record.

    Attributes:
        type_id: Record type identifier.
        length: Data length in bytes.
        value: Record data.
        offset: Byte offset in source data.
    """

    type_id: int  # numeric type identifier read from the TLV header
    length: int  # payload length in bytes (header fields excluded)
    value: bytes  # raw payload bytes
    offset: int = 0  # byte offset of this record's header within the source buffer
class BinaryParser:
    """Fast binary parser built on a pre-compiled ``struct.Struct``.

    Compiling the format string once avoids re-parsing it on every call,
    keeping per-packet overhead low for high parsing throughput
    (>10K packets/second).

    Args:
        format_string: struct format string (e.g., ">HBB").

    Attributes:
        format: The format string.
        size: Size of the packed structure in bytes.

    Example:
        >>> parser = BinaryParser(">HHBBI")  # Header format
        >>> header = parser.unpack(data)
        >>> print(f"Fields: {header}")

        >>> # Unpack from offset
        >>> payload = parser.unpack_from(data, offset=8)
    """

    def __init__(self, format_string: str) -> None:
        """Compile the format string.

        Args:
            format_string: struct format string. Use standard prefixes:
                - ">": Big-endian
                - "<": Little-endian
                - "=": Native byte order
                - "!": Network byte order (big-endian)
        """
        self._format = format_string
        self._struct = struct.Struct(format_string)

    @property
    def format(self) -> str:
        """The format string this parser was built from."""
        return self._format

    @property
    def size(self) -> int:
        """Number of bytes one pack/unpack consumes or produces."""
        return self._struct.size

    def unpack(self, buffer: bytes) -> tuple[Any, ...]:
        """Unpack one structure from the start of *buffer*.

        Args:
            buffer: Bytes to unpack. Must be at least self.size bytes.

        Returns:
            Tuple of unpacked values.
        """
        # Slice to the exact structure size so trailing bytes are ignored.
        head = buffer[: self.size]
        return self._struct.unpack(head)

    def unpack_from(self, buffer: bytes, offset: int = 0) -> tuple[Any, ...]:
        """Unpack one structure starting at *offset* in *buffer*.

        Cheaper than slicing when pulling several fields out of a single
        large buffer, since no intermediate copy is made.

        Args:
            buffer: Source buffer.
            offset: Byte offset to start unpacking.

        Returns:
            Tuple of unpacked values.
        """
        return self._struct.unpack_from(buffer, offset)

    def pack(self, *values: Any) -> bytes:
        """Pack *values* to bytes.

        Args:
            *values: Values matching format string types.

        Returns:
            Packed bytes.
        """
        return self._struct.pack(*values)

    def iter_unpack(self, buffer: bytes) -> Iterator[tuple[Any, ...]]:
        """Yield successive structures from *buffer*.

        Args:
            buffer: Source buffer.

        Returns:
            Iterator yielding tuples of unpacked values.

        Example:
            >>> parser = BinaryParser(">HH")
            >>> for a, b in parser.iter_unpack(data):
            ...     print(f"Pair: {a}, {b}")
        """
        return self._struct.iter_unpack(buffer)
class PacketParser:
    """Multi-field packet parser.

    Parses fixed-layout packets whose fields are described as a list of
    named field definitions.

    Example:
        >>> fields = [
        ...     ("sync", "H"),
        ...     ("length", "H"),
        ...     ("type", "B"),
        ...     ("flags", "B"),
        ... ]
        >>> parser = PacketParser(fields, byte_order=">")
        >>> packet = parser.parse(data)
        >>> print(f"Type: {packet['type']}")
    """

    def __init__(
        self,
        fields: list[tuple[str, str]],
        byte_order: str = ">",
    ) -> None:
        """Build the parser from field definitions.

        Args:
            fields: List of (name, format_char) tuples.
            byte_order: Byte order prefix (">", "<", "=", "!").
        """
        names: list[str] = []
        chars: list[str] = []
        for field_name, fmt_char in fields:
            names.append(field_name)
            chars.append(fmt_char)
        self._field_names = names
        # One pre-compiled struct covering every field, in declaration order.
        self._parser = BinaryParser(byte_order + "".join(chars))

    @property
    def size(self) -> int:
        """Packet header size in bytes."""
        return self._parser.size

    def parse(self, buffer: bytes, offset: int = 0) -> dict[str, Any]:
        """Parse packet fields into a name -> value mapping.

        Args:
            buffer: Source buffer.
            offset: Byte offset.

        Returns:
            Dictionary mapping field names to values.
        """
        unpacked = self._parser.unpack_from(buffer, offset)
        return dict(zip(self._field_names, unpacked, strict=False))

    def pack(self, **fields: Any) -> bytes:
        """Pack named field values to bytes.

        Args:
            **fields: Field values by name.

        Returns:
            Packed bytes.
        """
        ordered = [fields[field_name] for field_name in self._field_names]
        return self._parser.pack(*ordered)
def parse_tlv(
    buffer: bytes,
    *,
    type_size: int = 1,
    length_size: int = 1,
    big_endian: bool = True,
    include_length_in_length: bool = False,
    type_map: dict[int, str] | None = None,
) -> list[TLVRecord]:
    """Parse Type-Length-Value records.

    Parsing stops (without raising) at the first truncated or
    inconsistent record, returning the records decoded so far.

    Args:
        buffer: Source buffer containing TLV records.
        type_size: Size of type field in bytes (1, 2, or 4).
        length_size: Size of length field in bytes (1, 2, or 4).
        big_endian: True for big-endian byte order.
        include_length_in_length: True if the encoded length includes the
            type and length fields themselves.
        type_map: Optional mapping of type IDs to names.
            NOTE(review): accepted but currently unused -- TLVRecord has no
            name field to populate; kept for interface compatibility.

    Returns:
        List of TLVRecord objects.

    Raises:
        ValueError: If type_size or length_size is not 1, 2, or 4.

    Example:
        >>> records = parse_tlv(data, type_size=2, length_size=2)
        >>> for rec in records:
        ...     print(f"Type {rec.type_id}: {rec.length} bytes")
    """
    size_to_fmt = {1: "B", 2: "H", 4: "I"}
    # Validate sizes eagerly with a clear message instead of letting the
    # dict lookup raise a cryptic KeyError.
    if type_size not in size_to_fmt:
        raise ValueError(f"type_size must be 1, 2, or 4, got {type_size}")
    if length_size not in size_to_fmt:
        raise ValueError(f"length_size must be 1, 2, or 4, got {length_size}")

    byte_order = ">" if big_endian else "<"
    header_parser = BinaryParser(
        byte_order + size_to_fmt[type_size] + size_to_fmt[length_size]
    )
    header_size = type_size + length_size

    records: list[TLVRecord] = []
    offset = 0
    while offset + header_size <= len(buffer):
        type_id, length = header_parser.unpack_from(buffer, offset)

        # If the encoded length covers the header too, strip the header
        # size so data_length is the payload length alone.
        data_length = length - header_size if include_length_in_length else length
        if data_length < 0:
            break  # inconsistent record: length smaller than its own header

        value_start = offset + header_size
        value_end = value_start + data_length
        if value_end > len(buffer):
            break  # truncated record: payload runs past end of buffer

        records.append(
            TLVRecord(
                type_id=type_id,
                length=data_length,
                value=buffer[value_start:value_end],
                offset=offset,
            )
        )
        # value_end > offset always holds (header_size >= 2), so the loop
        # advances even for zero-length payloads.
        offset = value_end

    return records
def parse_tlv_nested(
    buffer: bytes,
    *,
    type_size: int = 1,
    length_size: int = 1,
    big_endian: bool = True,
    container_types: set[int] | None = None,
) -> dict[int, Any]:
    """Parse a (possibly nested) TLV structure into a dictionary.

    Args:
        buffer: Source buffer.
        type_size: Size of type field.
        length_size: Size of length field.
        big_endian: Byte order.
        container_types: Set of type IDs that contain nested TLV.

    Returns:
        Dictionary with type_id keys and either bytes or nested dict values.
    """
    containers = container_types if container_types else set()
    parsed: dict[int, Any] = {}

    for record in parse_tlv(
        buffer,
        type_size=type_size,
        length_size=length_size,
        big_endian=big_endian,
    ):
        if record.type_id in containers:
            # Container values hold TLV data themselves: recurse with the
            # same wire-format settings.
            parsed[record.type_id] = parse_tlv_nested(
                record.value,
                type_size=type_size,
                length_size=length_size,
                big_endian=big_endian,
                container_types=containers,
            )
        else:
            parsed[record.type_id] = record.value

    return parsed
# Public API of this module: classes first, then functions.
__all__ = [
    "BinaryParser",
    "PacketParser",
    "TLVRecord",
    "parse_tlv",
    "parse_tlv_nested",
]