Coverage for src / tracekit / analyzers / protocols / spi.py: 99%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""SPI protocol decoder. 

2 

3This module provides SPI (Serial Peripheral Interface) protocol 

4decoding with configurable CPOL/CPHA modes and word sizes. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.spi import SPIDecoder 

9 >>> decoder = SPIDecoder(cpol=0, cpha=0, word_size=8) 

10 >>> for packet in decoder.decode(clk=clock, mosi=mosi, miso=miso, cs=cs): 

11 ... print(f"TX: 0x{packet.annotations['mosi_value']:02X}") 

12 

13References: 

14 SPI Specification (Motorola) 

15""" 

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Literal 

20 

21import numpy as np 

22 

23from tracekit.analyzers.protocols.base import ( 

24 AnnotationLevel, 

25 ChannelDef, 

26 OptionDef, 

27 SyncDecoder, 

28) 

29from tracekit.core.types import DigitalTrace, ProtocolPacket 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Iterator 

33 

34 from numpy.typing import NDArray 

35 

36 

class SPIDecoder(SyncDecoder):
    """SPI protocol decoder.

    Decodes SPI bus transactions with configurable clock polarity,
    clock phase, word size, bit order and chip-select polarity.

    Mode mapping:
        - Mode 0: CPOL=0, CPHA=0 (sample on rising, shift on falling)
        - Mode 1: CPOL=0, CPHA=1 (sample on falling, shift on rising)
        - Mode 2: CPOL=1, CPHA=0 (sample on falling, shift on rising)
        - Mode 3: CPOL=1, CPHA=1 (sample on rising, shift on falling)

    Example:
        >>> decoder = SPIDecoder(cpol=0, cpha=0, word_size=8)
        >>> for packet in decoder.decode(clk=clk, mosi=mosi, miso=miso):
        ...     print(f"MOSI: 0x{packet.annotations['mosi_value']:02X}")
    """

    id = "spi"
    name = "SPI"
    longname = "Serial Peripheral Interface"
    desc = "SPI bus protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("clk", "CLK", "Clock signal", required=True),
        ChannelDef("mosi", "MOSI", "Master Out Slave In", required=True),
    ]

    optional_channels = [  # noqa: RUF012
        ChannelDef("miso", "MISO", "Master In Slave Out", required=False),
        ChannelDef("cs", "CS#", "Chip Select (active low)", required=False),
    ]

    options = [  # noqa: RUF012
        OptionDef("cpol", "Clock Polarity", "Clock idle state", default=0, values=[0, 1]),
        OptionDef("cpha", "Clock Phase", "Sample edge", default=0, values=[0, 1]),
        OptionDef(
            "word_size",
            "Word size",
            "Bits per word",
            default=8,
            values=[4, 8, 16, 24, 32],
        ),
        OptionDef("bit_order", "Bit order", "Bit order", default="msb", values=["msb", "lsb"]),
        OptionDef(
            "cs_polarity",
            "CS polarity",
            "Chip select polarity",
            default=0,
            values=[0, 1],
        ),
    ]

    annotations = [  # noqa: RUF012
        ("bit", "Bit value"),
        ("byte", "Decoded byte"),
        ("word", "Decoded word"),
        ("transfer", "Complete transfer"),
    ]

    def __init__(
        self,
        cpol: Literal[0, 1] = 0,
        cpha: Literal[0, 1] = 0,
        word_size: int = 8,
        bit_order: Literal["msb", "lsb"] = "msb",
        cs_polarity: Literal[0, 1] = 0,
    ) -> None:
        """Initialize SPI decoder.

        Args:
            cpol: Clock polarity (0=idle low, 1=idle high).
            cpha: Clock phase (0=sample on first edge, 1=sample on second edge).
            word_size: Bits per word; must be at least 1.
            bit_order: Bit order ("msb" or "lsb").
            cs_polarity: CS active level (0=active low, 1=active high).

        Raises:
            ValueError: If ``word_size`` is smaller than 1.
        """
        # A non-positive word size can never describe a real word and would
        # make the decoder emit meaningless packets, so reject it up front.
        if word_size < 1:
            raise ValueError(f"word_size must be a positive integer, got {word_size!r}")

        super().__init__(
            cpol=cpol,
            cpha=cpha,
            word_size=word_size,
            bit_order=bit_order,
            cs_polarity=cs_polarity,
        )
        self._cpol = cpol
        self._cpha = cpha
        self._word_size = word_size
        self._bit_order = bit_order
        self._cs_polarity = cs_polarity

    def decode(  # type: ignore[override]
        self,
        trace: DigitalTrace | None = None,
        *,
        clk: NDArray[np.bool_] | None = None,
        mosi: NDArray[np.bool_] | None = None,
        miso: NDArray[np.bool_] | None = None,
        cs: NDArray[np.bool_] | None = None,
        sample_rate: float = 1.0,
    ) -> Iterator[ProtocolPacket]:
        """Decode SPI transactions.

        All signals are truncated to the length of the shortest one. MOSI
        (and MISO, when given) is sampled on every sampling clock edge for
        which chip select is active; once ``word_size`` bits are collected a
        packet is yielded. If chip select goes inactive in the middle of a
        word, the bits collected so far are discarded.

        Args:
            trace: Optional primary trace. When given, its ``data`` replaces
                ``clk`` and its ``metadata.sample_rate`` replaces
                ``sample_rate``.
            clk: Clock signal.
            mosi: Master Out Slave In data. Nothing is decoded without it.
            miso: Master In Slave Out data (optional).
            cs: Chip Select signal (optional). When omitted, every sampling
                edge is treated as part of a transfer.
            sample_rate: Sample rate in Hz.

        Yields:
            Decoded SPI words as ProtocolPacket objects. Each packet's
            timestamp is the time of the first sampled bit of the word, and
            ``data`` holds the MOSI word as big-endian bytes.

        Example:
            >>> decoder = SPIDecoder(cpol=0, cpha=0)
            >>> for pkt in decoder.decode(clk=clk, mosi=mosi, miso=miso, sample_rate=1e9):
            ...     print(f"Word: 0x{pkt.annotations['mosi_value']:04X}")
        """
        if trace is not None:
            clk = trace.data
            sample_rate = trace.metadata.sample_rate

        if clk is None or mosi is None:
            return

        # The edge detection below relies on boolean ``~``/``&`` semantics, so
        # coerce inputs; this also lets plain lists and 0/1 integer arrays work.
        clk = np.asarray(clk, dtype=bool)
        mosi = np.asarray(mosi, dtype=bool)
        if miso is not None:
            miso = np.asarray(miso, dtype=bool)
        if cs is not None:
            cs = np.asarray(cs, dtype=bool)

        # Trim every provided signal to the shortest common length.
        n_samples = min(len(sig) for sig in (clk, mosi, miso, cs) if sig is not None)
        clk = clk[:n_samples]
        mosi = mosi[:n_samples]
        if miso is not None:
            miso = miso[:n_samples]
        if cs is not None:
            cs = cs[:n_samples]

        edges = self._find_sample_edges(clk)
        if len(edges) == 0:
            return

        # Loop invariants.
        word_size = self._word_size
        byte_count = (word_size + 7) // 8
        mode = self._cpol * 2 + self._cpha
        cs_active_level = self._cs_polarity == 1

        mosi_bits: list[int] = []
        miso_bits: list[int] = []
        word_start_idx = edges[0]
        word_num = 0

        for edge_idx in edges:
            # Outside an active chip select: drop any partial word and skip.
            if cs is not None and bool(cs[edge_idx]) != cs_active_level:
                mosi_bits = []
                miso_bits = []
                continue

            # Latch the start of the word at its first sampled bit so that the
            # timestamp is correct after CS gaps and for every word, not only
            # the first one.
            if not mosi_bits:
                word_start_idx = edge_idx

            mosi_bits.append(1 if mosi[edge_idx] else 0)
            if miso is not None:
                miso_bits.append(1 if miso[edge_idx] else 0)

            if len(mosi_bits) < word_size:
                continue

            # A complete word has been collected.
            mosi_value = self._bits_to_value(mosi_bits)
            miso_value = self._bits_to_value(miso_bits) if miso_bits else None

            start_time = word_start_idx / sample_rate
            end_time = edge_idx / sample_rate

            mosi_bytes = mosi_value.to_bytes(byte_count, "big")

            self.put_annotation(
                start_time,
                end_time,
                AnnotationLevel.WORDS,
                f"MOSI: 0x{mosi_value:0{byte_count * 2}X}",
                data=mosi_bytes,
            )

            packet_annotations = {
                "word_num": word_num,
                "mosi_bits": mosi_bits,
                "mosi_value": mosi_value,
                "word_size": word_size,
                "mode": mode,
            }
            if miso_value is not None:
                packet_annotations["miso_bits"] = miso_bits
                packet_annotations["miso_value"] = miso_value

            yield ProtocolPacket(
                timestamp=start_time,
                protocol="spi",
                data=mosi_bytes,
                annotations=packet_annotations,
                errors=[],
            )

            # Start fresh lists (the emitted ones are now owned by the packet).
            mosi_bits = []
            miso_bits = []
            word_num += 1

    def _find_sample_edges(self, clk: NDArray[np.bool_]) -> NDArray[np.intp]:
        """Return the sample indices at which data must be sampled.

        The sampling edge is rising for modes 0 and 3 and falling for modes 1
        and 2. Each returned index is the first sample after the transition.

        Args:
            clk: Boolean clock signal.

        Returns:
            Ascending array of sample indices, possibly empty.
        """
        # CPOL=0: idle low, first edge is rising; CPOL=1: idle high, first
        # edge is falling. CPHA=0 samples on the first edge, CPHA=1 on the
        # second. Hence sampling is on the rising edge when both agree.
        sample_on_rising = (self._cpol == 0) == (self._cpha == 0)

        previous, current = clk[:-1], clk[1:]
        if sample_on_rising:
            transitions = ~previous & current
        else:
            transitions = previous & ~current

        return np.flatnonzero(transitions) + 1

    def _bits_to_value(self, bits: list[int]) -> int:
        """Convert bit list to integer value.

        Args:
            bits: List of bit values (0 or 1) in the order they were sampled.

        Returns:
            Integer value; the first bit is the most significant one for
            "msb" bit order and the least significant one otherwise.
        """
        value = 0

        if self._bit_order == "msb":
            for bit in bits:
                value = (value << 1) | bit
        else:
            for i, bit in enumerate(bits):
                value |= bit << i

        return value

296 

297 

def decode_spi(
    clk: NDArray[np.bool_],
    mosi: NDArray[np.bool_] | None = None,
    miso: NDArray[np.bool_] | None = None,
    cs: NDArray[np.bool_] | None = None,
    sample_rate: float = 1.0,
    cpol: Literal[0, 1] = 0,
    cpha: Literal[0, 1] = 0,
    word_size: int = 8,
    bit_order: Literal["msb", "lsb"] = "msb",
    cs_polarity: Literal[0, 1] = 0,
) -> list[ProtocolPacket]:
    """Convenience function to decode SPI transactions.

    Builds an :class:`SPIDecoder` with the given options, runs it over the
    supplied signals and collects every decoded word into a list.

    Args:
        clk: Clock signal.
        mosi: Master Out Slave In signal. The parameter is optional in the
            signature, but the decoder produces no packets when it is None.
        miso: Master In Slave Out signal (optional).
        cs: Chip select signal (optional).
        sample_rate: Sample rate in Hz.
        cpol: Clock polarity (0 or 1).
        cpha: Clock phase (0 or 1).
        word_size: Bits per word (default 8).
        bit_order: Bit order ("msb" or "lsb").
        cs_polarity: CS active level (0=active low, 1=active high).

    Returns:
        List of decoded SPI transactions.

    Example:
        >>> packets = decode_spi(clk, mosi=mosi, miso=miso, sample_rate=10e6)
        >>> for pkt in packets:
        ...     print(f"MOSI: 0x{pkt.annotations['mosi_value']:02X}")
    """
    decoder = SPIDecoder(
        cpol=cpol,
        cpha=cpha,
        word_size=word_size,
        bit_order=bit_order,
        cs_polarity=cs_polarity,
    )
    return list(decoder.decode(clk=clk, mosi=mosi, miso=miso, cs=cs, sample_rate=sample_rate))

332 

333 

334__all__ = ["SPIDecoder", "decode_spi"]