Coverage for src / tracekit / analyzers / protocols / uart.py: 94%
114 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""UART protocol decoder.
3This module provides UART/RS-232 protocol decoding with auto-baud
4detection and configurable parameters.
7Example:
8 >>> from tracekit.analyzers.protocols.uart import UARTDecoder
9 >>> decoder = UARTDecoder(baudrate=115200)
10 >>> for packet in decoder.decode(trace):
11 ... print(f"Data: {packet.data.hex()}")
13References:
14 EIA/TIA-232-F Standard
15"""
17from __future__ import annotations
19from typing import TYPE_CHECKING, Literal
21import numpy as np
23from tracekit.analyzers.protocols.base import (
24 AnnotationLevel,
25 AsyncDecoder,
26 ChannelDef,
27 OptionDef,
28)
29from tracekit.core.types import (
30 DigitalTrace,
31 ProtocolPacket,
32 TraceMetadata,
33 WaveformTrace,
34)
36if TYPE_CHECKING:
37 from collections.abc import Iterator
39 from numpy.typing import NDArray
42class UARTDecoder(AsyncDecoder):
43 """UART protocol decoder.
45 Decodes UART data with configurable parameters including
46 auto-baud detection, data bits, parity, and stop bits.
48 Attributes:
49 id: "uart"
50 name: "UART"
51 channels: [rx] (required), [tx] (optional)
53 Example:
54 >>> decoder = UARTDecoder(baudrate=115200, data_bits=8, parity="none")
55 >>> for packet in decoder.decode(trace):
56 ... print(f"Byte: 0x{packet.data[0]:02X}")
57 """
59 id = "uart"
60 name = "UART"
61 longname = "Universal Asynchronous Receiver/Transmitter"
62 desc = "UART/RS-232 serial protocol decoder"
64 channels = [ # noqa: RUF012
65 ChannelDef("rx", "RX", "Receive data line", required=True),
66 ]
68 optional_channels = [ # noqa: RUF012
69 ChannelDef("tx", "TX", "Transmit data line", required=False),
70 ]
72 options = [ # noqa: RUF012
73 OptionDef("baudrate", "Baud rate", "Bits per second", default=0, values=None),
74 OptionDef(
75 "data_bits",
76 "Data bits",
77 "Number of data bits",
78 default=8,
79 values=[5, 6, 7, 8, 9],
80 ),
81 OptionDef(
82 "parity",
83 "Parity",
84 "Parity mode",
85 default="none",
86 values=["none", "odd", "even", "mark", "space"],
87 ),
88 OptionDef(
89 "stop_bits",
90 "Stop bits",
91 "Number of stop bits",
92 default=1,
93 values=[1, 1.5, 2],
94 ),
95 OptionDef(
96 "bit_order",
97 "Bit order",
98 "Data bit order",
99 default="lsb",
100 values=["lsb", "msb"],
101 ),
102 OptionDef("idle_level", "Idle level", "Idle line level", default=1, values=[0, 1]),
103 ]
105 annotations = [ # noqa: RUF012
106 ("bit", "Bit value"),
107 ("start", "Start bit"),
108 ("data", "Data bits"),
109 ("parity", "Parity bit"),
110 ("stop", "Stop bit"),
111 ("byte", "Decoded byte"),
112 ("error", "Error"),
113 ]
115 def __init__(
116 self,
117 baudrate: int = 0,
118 data_bits: int = 8,
119 parity: Literal["none", "odd", "even", "mark", "space"] = "none",
120 stop_bits: float = 1,
121 bit_order: Literal["lsb", "msb"] = "lsb",
122 idle_level: int = 1,
123 ) -> None:
124 """Initialize UART decoder.
126 Args:
127 baudrate: Baud rate in bps. 0 for auto-detect.
128 data_bits: Number of data bits (5-9).
129 parity: Parity mode.
130 stop_bits: Number of stop bits (1, 1.5, 2).
131 bit_order: Bit order ("lsb" or "msb").
132 idle_level: Idle line level (0 or 1).
133 """
134 super().__init__(
135 baudrate=baudrate,
136 data_bits=data_bits,
137 parity=parity,
138 stop_bits=stop_bits,
139 bit_order=bit_order,
140 idle_level=idle_level,
141 )
142 self._data_bits = data_bits
143 self._parity = parity
144 self._stop_bits = stop_bits
145 self._bit_order = bit_order
146 self._idle_level = idle_level
148 def decode(
149 self,
150 trace: DigitalTrace | WaveformTrace,
151 **channels: NDArray[np.bool_],
152 ) -> Iterator[ProtocolPacket]:
153 """Decode UART data from trace.
155 Args:
156 trace: Input digital trace.
157 **channels: Additional channel data.
159 Yields:
160 Decoded UART bytes as ProtocolPacket objects.
162 Example:
163 >>> decoder = UARTDecoder(baudrate=9600)
164 >>> for packet in decoder.decode(trace):
165 ... print(f"Byte: {packet.data.hex()}")
166 """
167 # Convert to digital if needed
168 if isinstance(trace, WaveformTrace):
169 from tracekit.analyzers.digital.extraction import to_digital
171 digital_trace = to_digital(trace, threshold="auto")
172 else:
173 digital_trace = trace
175 data = digital_trace.data
176 sample_rate = digital_trace.metadata.sample_rate
178 # Auto-detect baud rate if not specified
179 if self._baudrate == 0:
180 from tracekit.utils.autodetect import detect_baud_rate
182 self._baudrate = detect_baud_rate(digital_trace) # type: ignore[assignment]
183 if self._baudrate == 0: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 self._baudrate = 9600 # Fallback
186 bit_period = sample_rate / self._baudrate
187 half_bit = bit_period / 2
189 # Frame structure
190 frame_bits = 1 + self._data_bits # Start + data
191 if self._parity != "none":
192 frame_bits += 1
193 frame_bits += self._stop_bits # type: ignore[assignment]
195 idx = 0
196 frame_num = 0
198 while idx < len(data) - int(frame_bits * bit_period):
199 # Look for start bit (transition from idle)
200 start_idx = self._find_start_bit(data, idx)
201 if start_idx is None: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 break
204 # Sample at center of each bit
205 sample_points = []
206 for bit_num in range(int(frame_bits)):
207 sample_idx = int(start_idx + half_bit + bit_num * bit_period)
208 if sample_idx < len(data): 208 ↛ 206line 208 didn't jump to line 206 because the condition on line 208 was always true
209 sample_points.append(sample_idx)
211 if len(sample_points) < 1 + self._data_bits: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 break
214 # Verify start bit (should be opposite of idle)
215 start_bit = data[sample_points[0]]
216 if (self._idle_level == 1 and start_bit) or (self._idle_level == 0 and not start_bit):
217 # Not a valid start bit
218 idx = start_idx + 1
219 continue
221 # Extract data bits
222 data_value = 0
223 data_bits = []
225 for i in range(self._data_bits):
226 bit_idx = sample_points[1 + i]
227 bit_val = 1 if data[bit_idx] else 0
228 data_bits.append(bit_val)
230 if self._bit_order == "lsb":
231 data_value |= bit_val << i
232 else:
233 data_value |= bit_val << (self._data_bits - 1 - i)
235 # Check parity if enabled
236 errors = []
237 parity_idx = 1 + self._data_bits
239 if self._parity != "none" and parity_idx < len(sample_points):
240 parity_bit = 1 if data[sample_points[parity_idx]] else 0
241 ones_count = sum(data_bits)
243 if self._parity == "odd":
244 expected = (ones_count + 1) % 2
245 elif self._parity == "even":
246 expected = ones_count % 2
247 elif self._parity == "mark":
248 expected = 1
249 else: # space
250 expected = 0
252 if parity_bit != expected:
253 errors.append("Parity error")
255 # Verify stop bit(s)
256 stop_idx = parity_idx + (1 if self._parity != "none" else 0)
257 if stop_idx < len(sample_points): 257 ↛ 265line 257 didn't jump to line 265 because the condition on line 257 was always true
258 stop_bit = data[sample_points[stop_idx]]
259 expected_stop = self._idle_level == 1
261 if stop_bit != expected_stop:
262 errors.append("Framing error")
264 # Calculate timestamps
265 start_time = start_idx / sample_rate
266 end_time = (start_idx + frame_bits * bit_period) / sample_rate
268 # Add annotations
269 self.put_annotation(
270 start_time,
271 start_time + bit_period / sample_rate,
272 AnnotationLevel.BITS,
273 "START",
274 )
276 for i, bit_val in enumerate(data_bits):
277 bit_start = start_time + (1 + i) * bit_period / sample_rate
278 bit_end = bit_start + bit_period / sample_rate
279 self.put_annotation(
280 bit_start,
281 bit_end,
282 AnnotationLevel.BITS,
283 str(bit_val),
284 )
286 self.put_annotation(
287 start_time,
288 end_time,
289 AnnotationLevel.BYTES,
290 f"0x{data_value:02X}",
291 data=bytes([data_value]),
292 )
294 # Create packet
295 packet = ProtocolPacket(
296 timestamp=start_time,
297 protocol="uart",
298 data=bytes([data_value]),
299 annotations={
300 "frame_num": frame_num,
301 "data_bits": data_bits,
302 "baudrate": self._baudrate,
303 },
304 errors=errors,
305 )
307 self.put_packet(start_time, bytes([data_value]), packet.annotations, errors)
309 yield packet
311 frame_num += 1
312 # Advance to the end of the frame
313 # Use the last sample point + 1 to avoid re-detecting the same frame
314 last_sample = sample_points[-1] if sample_points else start_idx
315 idx = last_sample + 1
317 def _find_start_bit(
318 self,
319 data: NDArray[np.bool_],
320 start_idx: int,
321 ) -> int | None:
322 """Find start bit transition.
324 Args:
325 data: Digital data array.
326 start_idx: Index to start searching.
328 Returns:
329 Index of start bit, or None if not found.
330 """
331 search = data[start_idx:]
333 if self._idle_level == 1:
334 # Look for falling edge (high to low)
335 transitions = np.where(search[:-1] & ~search[1:])[0]
336 else:
337 # Look for rising edge (low to high)
338 transitions = np.where(~search[:-1] & search[1:])[0]
340 if len(transitions) == 0: 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true
341 return None
343 # Return index of first sample after the transition (start of start bit)
344 # transitions[0] is the last idle-level sample before the edge
345 return int(start_idx + transitions[0] + 1)
348def decode_uart(
349 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
350 sample_rate: float = 1.0,
351 baudrate: int | None = None,
352 data_bits: Literal[5, 6, 7, 8, 9] = 8,
353 parity: Literal["none", "odd", "even", "mark", "space"] = "none",
354 stop_bits: Literal[1, 1.5, 2] = 1, # type: ignore[valid-type]
355 idle_level: Literal[0, 1] = 1,
356) -> list[ProtocolPacket]:
357 """Convenience function to decode UART data.
359 Args:
360 data: UART signal (digital array or trace).
361 sample_rate: Sample rate in Hz.
362 baudrate: Baud rate (None for auto-detection).
363 data_bits: Number of data bits per frame.
364 parity: Parity mode.
365 stop_bits: Number of stop bits.
366 idle_level: Idle line level.
368 Returns:
369 List of decoded UART bytes.
371 Example:
372 >>> packets = decode_uart(signal, sample_rate=10e6, baudrate=115200)
373 >>> for pkt in packets:
374 ... print(f"Byte: 0x{pkt.data[0]:02X}")
375 """
376 decoder = UARTDecoder(
377 baudrate=baudrate if baudrate is not None else 0, # 0 for auto-detect
378 data_bits=data_bits,
379 parity=parity,
380 stop_bits=stop_bits,
381 idle_level=idle_level,
382 )
383 if isinstance(data, WaveformTrace | DigitalTrace):
384 return list(decoder.decode(data))
385 else:
386 trace = DigitalTrace(
387 data=data,
388 metadata=TraceMetadata(sample_rate=sample_rate),
389 )
390 return list(decoder.decode(trace))
393__all__ = ["UARTDecoder", "decode_uart"]