Coverage for src / tracekit / analyzers / protocols / uart.py: 94%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""UART protocol decoder. 

2 

3This module provides UART/RS-232 protocol decoding with auto-baud 

4detection and configurable parameters. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.uart import UARTDecoder 

9 >>> decoder = UARTDecoder(baudrate=115200) 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"Data: {packet.data.hex()}") 

12 

13References: 

14 EIA/TIA-232-F Standard 

15""" 

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Literal 

20 

21import numpy as np 

22 

23from tracekit.analyzers.protocols.base import ( 

24 AnnotationLevel, 

25 AsyncDecoder, 

26 ChannelDef, 

27 OptionDef, 

28) 

29from tracekit.core.types import ( 

30 DigitalTrace, 

31 ProtocolPacket, 

32 TraceMetadata, 

33 WaveformTrace, 

34) 

35 

36if TYPE_CHECKING: 

37 from collections.abc import Iterator 

38 

39 from numpy.typing import NDArray 

40 

41 

class UARTDecoder(AsyncDecoder):
    """UART protocol decoder.

    Decodes UART data with configurable parameters including
    auto-baud detection, data bits, parity, and stop bits.

    Attributes:
        id: "uart"
        name: "UART"
        channels: [rx] (required), [tx] (optional)

    Example:
        >>> decoder = UARTDecoder(baudrate=115200, data_bits=8, parity="none")
        >>> for packet in decoder.decode(trace):
        ...     print(f"Byte: 0x{packet.data[0]:02X}")
    """

    id = "uart"
    name = "UART"
    longname = "Universal Asynchronous Receiver/Transmitter"
    desc = "UART/RS-232 serial protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("rx", "RX", "Receive data line", required=True),
    ]

    optional_channels = [  # noqa: RUF012
        ChannelDef("tx", "TX", "Transmit data line", required=False),
    ]

    options = [  # noqa: RUF012
        OptionDef("baudrate", "Baud rate", "Bits per second", default=0, values=None),
        OptionDef(
            "data_bits",
            "Data bits",
            "Number of data bits",
            default=8,
            values=[5, 6, 7, 8, 9],
        ),
        OptionDef(
            "parity",
            "Parity",
            "Parity mode",
            default="none",
            values=["none", "odd", "even", "mark", "space"],
        ),
        OptionDef(
            "stop_bits",
            "Stop bits",
            "Number of stop bits",
            default=1,
            values=[1, 1.5, 2],
        ),
        OptionDef(
            "bit_order",
            "Bit order",
            "Data bit order",
            default="lsb",
            values=["lsb", "msb"],
        ),
        OptionDef("idle_level", "Idle level", "Idle line level", default=1, values=[0, 1]),
    ]

    annotations = [  # noqa: RUF012
        ("bit", "Bit value"),
        ("start", "Start bit"),
        ("data", "Data bits"),
        ("parity", "Parity bit"),
        ("stop", "Stop bit"),
        ("byte", "Decoded byte"),
        ("error", "Error"),
    ]

    def __init__(
        self,
        baudrate: int = 0,
        data_bits: int = 8,
        parity: Literal["none", "odd", "even", "mark", "space"] = "none",
        stop_bits: float = 1,
        bit_order: Literal["lsb", "msb"] = "lsb",
        idle_level: int = 1,
    ) -> None:
        """Initialize UART decoder.

        Args:
            baudrate: Baud rate in bps. 0 for auto-detect.
            data_bits: Number of data bits (5-9).
            parity: Parity mode.
            stop_bits: Number of stop bits (1, 1.5, 2).
            bit_order: Bit order ("lsb" or "msb").
            idle_level: Idle line level (0 or 1).
        """
        # NOTE(review): self._baudrate is read in decode() but never assigned
        # here; presumably AsyncDecoder.__init__ stores it from the ``baudrate``
        # keyword -- confirm against the base class.
        super().__init__(
            baudrate=baudrate,
            data_bits=data_bits,
            parity=parity,
            stop_bits=stop_bits,
            bit_order=bit_order,
            idle_level=idle_level,
        )
        self._data_bits = data_bits
        self._parity = parity
        self._stop_bits = stop_bits
        self._bit_order = bit_order
        self._idle_level = idle_level

    def decode(
        self,
        trace: DigitalTrace | WaveformTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode UART data from trace.

        Each frame is located by its start-bit edge and sampled at the centre
        of every bit period. Parity and stop bits are checked and any problems
        are reported in the packet's ``errors`` list ("Parity error",
        "Framing error") rather than raised.

        When the decoder was created with ``baudrate=0`` the rate is detected
        from the trace (falling back to 9600) and stored on the instance, so
        later calls reuse the detected value.

        Args:
            trace: Input trace. A ``WaveformTrace`` is converted to digital
                with an automatic threshold first.
            **channels: Additional channel data (not used by this decoder).

        Yields:
            Decoded UART words as ProtocolPacket objects. ``packet.data`` is
            one byte for 5-8 data bits; for 9 data bits it is two bytes in
            little-endian order, so ``data[0]`` is always the low eight bits.

        Example:
            >>> decoder = UARTDecoder(baudrate=9600)
            >>> for packet in decoder.decode(trace):
            ...     print(f"Byte: {packet.data.hex()}")
        """
        # Convert to digital if needed
        if isinstance(trace, WaveformTrace):
            from tracekit.analyzers.digital.extraction import to_digital

            digital_trace = to_digital(trace, threshold="auto")
        else:
            digital_trace = trace

        data = digital_trace.data
        sample_rate = digital_trace.metadata.sample_rate

        # Auto-detect baud rate if not specified
        if self._baudrate == 0:
            from tracekit.utils.autodetect import detect_baud_rate

            self._baudrate = detect_baud_rate(digital_trace)  # type: ignore[assignment]
            if self._baudrate == 0:
                self._baudrate = 9600  # Fallback

        # Bit timing expressed in samples (may be fractional).
        bit_period = sample_rate / self._baudrate
        half_bit = bit_period / 2
        bit_time = bit_period / sample_rate  # duration of one bit in seconds

        # Frame structure: start + data [+ parity] + stop. Float because
        # stop_bits may be 1.5.
        frame_bits: float = 1 + self._data_bits
        if self._parity != "none":
            frame_bits += 1
        frame_bits += self._stop_bits

        # bytes([value]) cannot hold a 9-bit word (> 255), so size the payload
        # from the configured word width instead. Still one byte for <= 8 bits.
        payload_len = max(1, (self._data_bits + 7) // 8)

        idx = 0
        frame_num = 0

        # Stop once a whole frame can no longer fit in the remaining samples.
        while idx < len(data) - int(frame_bits * bit_period):
            # Look for start bit (transition from idle)
            start_idx = self._find_start_bit(data, idx)
            if start_idx is None:
                break

            # Sample at center of each bit; a fractional stop bit (1.5) is
            # truncated, so only whole bit slots are sampled.
            sample_points = []
            for bit_num in range(int(frame_bits)):
                sample_idx = int(start_idx + half_bit + bit_num * bit_period)
                if sample_idx < len(data):
                    sample_points.append(sample_idx)

            if len(sample_points) < 1 + self._data_bits:
                break

            # Verify start bit (should be opposite of idle)
            start_bit = data[sample_points[0]]
            if (self._idle_level == 1 and start_bit) or (self._idle_level == 0 and not start_bit):
                # Not a valid start bit (glitch); resume just past this edge
                idx = start_idx + 1
                continue

            # Extract data bits in arrival order and assemble the word
            data_value = 0
            data_bits = []

            for i in range(self._data_bits):
                bit_idx = sample_points[1 + i]
                bit_val = 1 if data[bit_idx] else 0
                data_bits.append(bit_val)

                if self._bit_order == "lsb":
                    data_value |= bit_val << i
                else:
                    data_value |= bit_val << (self._data_bits - 1 - i)

            # Check parity if enabled
            errors = []
            parity_idx = 1 + self._data_bits

            if self._parity != "none" and parity_idx < len(sample_points):
                parity_bit = 1 if data[sample_points[parity_idx]] else 0
                ones_count = sum(data_bits)

                if self._parity == "odd":
                    expected = (ones_count + 1) % 2
                elif self._parity == "even":
                    expected = ones_count % 2
                elif self._parity == "mark":
                    expected = 1
                else:  # space
                    expected = 0

                if parity_bit != expected:
                    errors.append("Parity error")

            # Verify stop bit(s): every sampled stop slot must sit at the idle
            # level. One "Framing error" is reported however many slots fail.
            stop_idx = parity_idx + (1 if self._parity != "none" else 0)
            expected_stop = self._idle_level == 1
            if any(bool(data[point]) != expected_stop for point in sample_points[stop_idx:]):
                errors.append("Framing error")

            # Calculate timestamps
            start_time = start_idx / sample_rate
            end_time = (start_idx + frame_bits * bit_period) / sample_rate

            payload = data_value.to_bytes(payload_len, "little")

            # Add annotations
            self.put_annotation(
                start_time,
                start_time + bit_time,
                AnnotationLevel.BITS,
                "START",
            )

            for i, bit_val in enumerate(data_bits):
                bit_start = start_time + (1 + i) * bit_period / sample_rate
                bit_end = bit_start + bit_time
                self.put_annotation(
                    bit_start,
                    bit_end,
                    AnnotationLevel.BITS,
                    str(bit_val),
                )

            self.put_annotation(
                start_time,
                end_time,
                AnnotationLevel.BYTES,
                f"0x{data_value:02X}",
                data=payload,
            )

            # Create packet
            packet = ProtocolPacket(
                timestamp=start_time,
                protocol="uart",
                data=payload,
                annotations={
                    "frame_num": frame_num,
                    "data_bits": data_bits,
                    "baudrate": self._baudrate,
                },
                errors=errors,
            )

            self.put_packet(start_time, payload, packet.annotations, errors)

            yield packet

            frame_num += 1
            # Advance to the end of the frame
            # Use the last sample point + 1 to avoid re-detecting the same frame
            last_sample = sample_points[-1] if sample_points else start_idx
            idx = last_sample + 1

    def _find_start_bit(
        self,
        data: NDArray[np.bool_],
        start_idx: int,
    ) -> int | None:
        """Find start bit transition.

        Searches ``data[start_idx:]`` for the first idle-to-active edge:
        a falling edge when the idle level is 1, a rising edge otherwise.

        Args:
            data: Digital data array.
            start_idx: Index to start searching.

        Returns:
            Index of the first sample of the start bit, or None if no edge
            is found.
        """
        search = data[start_idx:]

        if self._idle_level == 1:
            # Look for falling edge (high to low)
            transitions = np.where(search[:-1] & ~search[1:])[0]
        else:
            # Look for rising edge (low to high)
            transitions = np.where(~search[:-1] & search[1:])[0]

        if len(transitions) == 0:
            return None

        # Return index of first sample after the transition (start of start bit)
        # transitions[0] is the last idle-level sample before the edge
        return int(start_idx + transitions[0] + 1)

346 

347 

def decode_uart(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    baudrate: int | None = None,
    data_bits: Literal[5, 6, 7, 8, 9] = 8,
    parity: Literal["none", "odd", "even", "mark", "space"] = "none",
    stop_bits: Literal[1, 1.5, 2] = 1,  # type: ignore[valid-type]
    idle_level: Literal[0, 1] = 1,
    bit_order: Literal["lsb", "msb"] = "lsb",
) -> list[ProtocolPacket]:
    """Convenience function to decode UART data.

    Builds a ``UARTDecoder`` from the given settings, runs it over ``data``
    and collects every decoded packet into a list.

    Args:
        data: UART signal (digital array or trace).
        sample_rate: Sample rate in Hz. Only used when ``data`` is a raw
            array; a trace supplies its own metadata.
        baudrate: Baud rate (None for auto-detection).
        data_bits: Number of data bits per frame.
        parity: Parity mode.
        stop_bits: Number of stop bits.
        idle_level: Idle line level.
        bit_order: Data bit order ("lsb" or "msb"). Defaults to "lsb".

    Returns:
        List of decoded UART bytes.

    Example:
        >>> packets = decode_uart(signal, sample_rate=10e6, baudrate=115200)
        >>> for pkt in packets:
        ...     print(f"Byte: 0x{pkt.data[0]:02X}")
    """
    decoder = UARTDecoder(
        baudrate=baudrate if baudrate is not None else 0,  # 0 for auto-detect
        data_bits=data_bits,
        parity=parity,
        stop_bits=stop_bits,
        bit_order=bit_order,
        idle_level=idle_level,
    )

    if isinstance(data, WaveformTrace | DigitalTrace):
        trace = data
    else:
        # Wrap a bare sample array so the decoder can read its sample rate.
        trace = DigitalTrace(
            data=data,
            metadata=TraceMetadata(sample_rate=sample_rate),
        )

    return list(decoder.decode(trace))

391 

392 

393__all__ = ["UARTDecoder", "decode_uart"]