Coverage for src / tracekit / analyzers / protocols / spi.py: 99%
96 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""SPI protocol decoder.
3This module provides SPI (Serial Peripheral Interface) protocol
4decoding with configurable CPOL/CPHA modes and word sizes.
7Example:
8 >>> from tracekit.analyzers.protocols.spi import SPIDecoder
9 >>> decoder = SPIDecoder(cpol=0, cpha=0, word_size=8)
10 >>> for packet in decoder.decode(clk=clock, mosi=mosi, miso=miso, cs=cs):
11 ... print(f"TX: {packet.annotations['mosi'].hex()}")
13References:
14 SPI Specification (Motorola)
15"""
17from __future__ import annotations
19from typing import TYPE_CHECKING, Literal
21import numpy as np
23from tracekit.analyzers.protocols.base import (
24 AnnotationLevel,
25 ChannelDef,
26 OptionDef,
27 SyncDecoder,
28)
29from tracekit.core.types import DigitalTrace, ProtocolPacket
31if TYPE_CHECKING:
32 from collections.abc import Iterator
34 from numpy.typing import NDArray
37class SPIDecoder(SyncDecoder):
38 """SPI protocol decoder.
40 Decodes SPI bus transactions with configurable clock polarity,
41 clock phase, and word size.
43 Mode mapping:
44 - Mode 0: CPOL=0, CPHA=0 (sample on rising, shift on falling)
45 - Mode 1: CPOL=0, CPHA=1 (sample on falling, shift on rising)
46 - Mode 2: CPOL=1, CPHA=0 (sample on falling, shift on rising)
47 - Mode 3: CPOL=1, CPHA=1 (sample on rising, shift on falling)
49 Example:
50 >>> decoder = SPIDecoder(cpol=0, cpha=0, word_size=8)
51 >>> for packet in decoder.decode(trace, clk=clk, mosi=mosi, miso=miso):
52 ... print(f"MOSI: {packet.annotations['mosi'].hex()}")
53 """
55 id = "spi"
56 name = "SPI"
57 longname = "Serial Peripheral Interface"
58 desc = "SPI bus protocol decoder"
60 channels = [ # noqa: RUF012
61 ChannelDef("clk", "CLK", "Clock signal", required=True),
62 ChannelDef("mosi", "MOSI", "Master Out Slave In", required=True),
63 ]
65 optional_channels = [ # noqa: RUF012
66 ChannelDef("miso", "MISO", "Master In Slave Out", required=False),
67 ChannelDef("cs", "CS#", "Chip Select (active low)", required=False),
68 ]
70 options = [ # noqa: RUF012
71 OptionDef("cpol", "Clock Polarity", "Clock idle state", default=0, values=[0, 1]),
72 OptionDef("cpha", "Clock Phase", "Sample edge", default=0, values=[0, 1]),
73 OptionDef(
74 "word_size",
75 "Word size",
76 "Bits per word",
77 default=8,
78 values=[4, 8, 16, 24, 32],
79 ),
80 OptionDef("bit_order", "Bit order", "Bit order", default="msb", values=["msb", "lsb"]),
81 OptionDef(
82 "cs_polarity",
83 "CS polarity",
84 "Chip select polarity",
85 default=0,
86 values=[0, 1],
87 ),
88 ]
90 annotations = [ # noqa: RUF012
91 ("bit", "Bit value"),
92 ("byte", "Decoded byte"),
93 ("word", "Decoded word"),
94 ("transfer", "Complete transfer"),
95 ]
97 def __init__(
98 self,
99 cpol: Literal[0, 1] = 0,
100 cpha: Literal[0, 1] = 0,
101 word_size: int = 8,
102 bit_order: Literal["msb", "lsb"] = "msb",
103 cs_polarity: Literal[0, 1] = 0,
104 ) -> None:
105 """Initialize SPI decoder.
107 Args:
108 cpol: Clock polarity (0=idle low, 1=idle high).
109 cpha: Clock phase (0=sample on first edge, 1=sample on second edge).
110 word_size: Bits per word.
111 bit_order: Bit order ("msb" or "lsb").
112 cs_polarity: CS active level (0=active low, 1=active high).
113 """
114 super().__init__(
115 cpol=cpol,
116 cpha=cpha,
117 word_size=word_size,
118 bit_order=bit_order,
119 cs_polarity=cs_polarity,
120 )
121 self._cpol = cpol
122 self._cpha = cpha
123 self._word_size = word_size
124 self._bit_order = bit_order
125 self._cs_polarity = cs_polarity
127 def decode( # type: ignore[override]
128 self,
129 trace: DigitalTrace | None = None,
130 *,
131 clk: NDArray[np.bool_] | None = None,
132 mosi: NDArray[np.bool_] | None = None,
133 miso: NDArray[np.bool_] | None = None,
134 cs: NDArray[np.bool_] | None = None,
135 sample_rate: float = 1.0,
136 ) -> Iterator[ProtocolPacket]:
137 """Decode SPI transactions.
139 Args:
140 trace: Optional primary trace (uses clk if provided).
141 clk: Clock signal.
142 mosi: Master Out Slave In data.
143 miso: Master In Slave Out data (optional).
144 cs: Chip Select signal (optional).
145 sample_rate: Sample rate in Hz.
147 Yields:
148 Decoded SPI words as ProtocolPacket objects.
150 Example:
151 >>> decoder = SPIDecoder(cpol=0, cpha=0)
152 >>> for pkt in decoder.decode(clk=clk, mosi=mosi, miso=miso, sample_rate=1e9):
153 ... print(f"Word: 0x{pkt.annotations['mosi_value']:04X}")
154 """
155 if trace is not None:
156 clk = trace.data
157 sample_rate = trace.metadata.sample_rate
159 if clk is None or mosi is None:
160 return
162 n_samples = min(len(clk), len(mosi))
163 if miso is not None:
164 n_samples = min(n_samples, len(miso))
165 if cs is not None:
166 n_samples = min(n_samples, len(cs))
168 clk = clk[:n_samples]
169 mosi = mosi[:n_samples]
170 if miso is not None:
171 miso = miso[:n_samples]
172 if cs is not None:
173 cs = cs[:n_samples]
175 # Determine sampling edge based on CPOL and CPHA
176 # CPOL=0: idle low, first edge is rising
177 # CPOL=1: idle high, first edge is falling
178 # CPHA=0: sample on first edge
179 # CPHA=1: sample on second edge
180 if self._cpol == 0:
181 sample_edge = "rising" if self._cpha == 0 else "falling"
182 elif self._cpha == 0:
183 sample_edge = "falling"
184 else:
185 sample_edge = "rising"
187 # Find clock edges
188 if sample_edge == "rising":
189 edges = np.where(~clk[:-1] & clk[1:])[0] + 1
190 else:
191 edges = np.where(clk[:-1] & ~clk[1:])[0] + 1
193 if len(edges) == 0:
194 return
196 # Collect bits into words
197 mosi_bits: list[int] = []
198 miso_bits: list[int] = []
199 word_start_idx = edges[0]
200 word_num = 0
202 for edge_idx in edges:
203 # Check if CS is active (if provided)
204 if cs is not None:
205 cs_active = cs[edge_idx] == (self._cs_polarity == 1)
206 if not cs_active:
207 # CS not active, reset and skip
208 if mosi_bits: 208 ↛ 210line 208 didn't jump to line 210 because the condition on line 208 was never true
209 # Emit partial word if any
210 pass
211 mosi_bits = []
212 miso_bits = []
213 continue
215 # Sample MOSI
216 mosi_bit = 1 if mosi[edge_idx] else 0
217 mosi_bits.append(mosi_bit)
219 # Sample MISO if available
220 if miso is not None:
221 miso_bit = 1 if miso[edge_idx] else 0
222 miso_bits.append(miso_bit)
224 # Check if we have a complete word
225 if len(mosi_bits) >= self._word_size:
226 # Convert bits to value
227 mosi_value = self._bits_to_value(mosi_bits[: self._word_size])
228 miso_value = (
229 self._bits_to_value(miso_bits[: self._word_size]) if miso_bits else None
230 )
232 # Calculate timing
233 start_time = word_start_idx / sample_rate
234 end_time = edge_idx / sample_rate
236 # Encode as bytes
237 byte_count = (self._word_size + 7) // 8
238 mosi_bytes = mosi_value.to_bytes(byte_count, "big")
240 # Add annotations
241 self.put_annotation(
242 start_time,
243 end_time,
244 AnnotationLevel.WORDS,
245 f"MOSI: 0x{mosi_value:0{byte_count * 2}X}",
246 data=mosi_bytes,
247 )
249 annotations = {
250 "word_num": word_num,
251 "mosi_bits": mosi_bits[: self._word_size],
252 "mosi_value": mosi_value,
253 "word_size": self._word_size,
254 "mode": self._cpol * 2 + self._cpha,
255 }
257 if miso_value is not None:
258 annotations["miso_bits"] = miso_bits[: self._word_size]
259 annotations["miso_value"] = miso_value
261 packet = ProtocolPacket(
262 timestamp=start_time,
263 protocol="spi",
264 data=mosi_bytes,
265 annotations=annotations,
266 errors=[],
267 )
269 yield packet
271 # Reset for next word
272 mosi_bits = mosi_bits[self._word_size :]
273 miso_bits = miso_bits[self._word_size :] if miso_bits else []
274 word_start_idx = edge_idx
275 word_num += 1
277 def _bits_to_value(self, bits: list[int]) -> int:
278 """Convert bit list to integer value.
280 Args:
281 bits: List of bit values (0 or 1).
283 Returns:
284 Integer value.
285 """
286 value = 0
288 if self._bit_order == "msb":
289 for bit in bits:
290 value = (value << 1) | bit
291 else:
292 for i, bit in enumerate(bits):
293 value |= bit << i
295 return value
298def decode_spi(
299 clk: NDArray[np.bool_],
300 mosi: NDArray[np.bool_] | None = None,
301 miso: NDArray[np.bool_] | None = None,
302 cs: NDArray[np.bool_] | None = None,
303 sample_rate: float = 1.0,
304 cpol: Literal[0, 1] = 0,
305 cpha: Literal[0, 1] = 0,
306 word_size: int = 8,
307 bit_order: Literal["msb", "lsb"] = "msb",
308) -> list[ProtocolPacket]:
309 """Convenience function to decode SPI transactions.
311 Args:
312 clk: Clock signal.
313 mosi: Master Out Slave In signal (optional).
314 miso: Master In Slave Out signal (optional).
315 cs: Chip select signal (optional, active low).
316 sample_rate: Sample rate in Hz.
317 cpol: Clock polarity (0 or 1).
318 cpha: Clock phase (0 or 1).
319 word_size: Bits per word (default 8).
320 bit_order: Bit order ("msb" or "lsb").
322 Returns:
323 List of decoded SPI transactions.
325 Example:
326 >>> packets = decode_spi(clk, mosi=mosi, miso=miso, sample_rate=10e6)
327 >>> for pkt in packets:
328 ... print(f"MOSI: {pkt.annotations['mosi'].hex()}")
329 """
330 decoder = SPIDecoder(cpol=cpol, cpha=cpha, word_size=word_size, bit_order=bit_order)
331 return list(decoder.decode(clk=clk, mosi=mosi, miso=miso, cs=cs, sample_rate=sample_rate))
334__all__ = ["SPIDecoder", "decode_spi"]