Coverage for src / tracekit / analyzers / protocols / i2c.py: 86%
132 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""I2C protocol decoder.
3This module provides I2C (Inter-Integrated Circuit) protocol decoding
4with ACK/NAK detection, arbitration monitoring, and multi-speed support.
7Example:
8 >>> from tracekit.analyzers.protocols.i2c import I2CDecoder
9 >>> decoder = I2CDecoder()
10 >>> for packet in decoder.decode(sda=sda, scl=scl):
11 ... print(f"Address: 0x{packet.annotations['address']:02X}")
13References:
14 I2C Specification (NXP UM10204)
15"""
17from __future__ import annotations
19from dataclasses import dataclass
20from enum import Enum
21from typing import TYPE_CHECKING
23import numpy as np
25from tracekit.analyzers.protocols.base import (
26 AnnotationLevel,
27 ChannelDef,
28 OptionDef,
29 SyncDecoder,
30)
31from tracekit.core.types import DigitalTrace, ProtocolPacket
33if TYPE_CHECKING:
34 from collections.abc import Iterator
36 from numpy.typing import NDArray
39class I2CCondition(Enum):
40 """I2C bus conditions."""
42 START = "start"
43 STOP = "stop"
44 REPEATED_START = "repeated_start"
45 ACK = "ack"
46 NAK = "nak"
49@dataclass
50class I2CTransaction:
51 """I2C transaction record.
53 Attributes:
54 address: 7-bit or 10-bit device address.
55 read: True for read, False for write.
56 data: Data bytes transferred.
57 acks: List of ACK (True) / NAK (False) for each byte.
58 errors: List of detected errors.
59 """
61 address: int
62 read: bool
63 data: list[int]
64 acks: list[bool]
65 errors: list[str]
68class I2CDecoder(SyncDecoder):
69 """I2C protocol decoder.
71 Decodes I2C bus transactions with ACK/NAK detection,
72 arbitration monitoring, and support for standard, fast,
73 and high-speed modes.
75 Example:
76 >>> decoder = I2CDecoder()
77 >>> for packet in decoder.decode(sda=sda, scl=scl, sample_rate=10e6):
78 ... print(f"Addr: 0x{packet.annotations['address']:02X}")
79 ... print(f"Data: {packet.data.hex()}")
80 """
82 id = "i2c"
83 name = "I2C"
84 longname = "Inter-Integrated Circuit"
85 desc = "I2C bus protocol decoder"
87 channels = [ # noqa: RUF012
88 ChannelDef("scl", "SCL", "Clock line", required=True),
89 ChannelDef("sda", "SDA", "Data line", required=True),
90 ]
92 optional_channels = [] # noqa: RUF012
94 options = [ # noqa: RUF012
95 OptionDef(
96 "address_format",
97 "Address format",
98 "7-bit or 10-bit",
99 default="auto",
100 values=["auto", "7bit", "10bit"],
101 ),
102 ]
104 annotations = [ # noqa: RUF012
105 ("start", "Start condition"),
106 ("stop", "Stop condition"),
107 ("address", "Device address"),
108 ("data", "Data byte"),
109 ("ack", "ACK"),
110 ("nak", "NAK"),
111 ("error", "Error"),
112 ]
114 def __init__(
115 self,
116 address_format: str = "auto",
117 ) -> None:
118 """Initialize I2C decoder.
120 Args:
121 address_format: Address format ("auto", "7bit", "10bit").
122 """
123 super().__init__(address_format=address_format)
124 self._address_format = address_format
126 def decode( # type: ignore[override]
127 self,
128 trace: DigitalTrace | None = None,
129 *,
130 scl: NDArray[np.bool_] | None = None,
131 sda: NDArray[np.bool_] | None = None,
132 sample_rate: float = 1.0,
133 ) -> Iterator[ProtocolPacket]:
134 """Decode I2C transactions.
136 Args:
137 trace: Optional primary trace.
138 scl: Clock signal.
139 sda: Data signal.
140 sample_rate: Sample rate in Hz.
142 Yields:
143 Decoded I2C transactions as ProtocolPacket objects.
145 Example:
146 >>> decoder = I2CDecoder()
147 >>> for pkt in decoder.decode(scl=scl, sda=sda, sample_rate=10e6):
148 ... print(f"Address: 0x{pkt.annotations['address']:02X}")
149 """
150 if scl is None or sda is None:
151 return
153 n_samples = min(len(scl), len(sda))
154 scl = scl[:n_samples]
155 sda = sda[:n_samples]
157 # Find start and stop conditions
158 # START: SDA falls while SCL is high
159 # STOP: SDA rises while SCL is high
161 conditions = []
163 for i in range(1, n_samples):
164 if scl[i] and scl[i - 1]: # SCL is high
165 if sda[i - 1] and not sda[i]: # SDA falling
166 conditions.append((i, I2CCondition.START))
167 elif not sda[i - 1] and sda[i]: # SDA rising
168 conditions.append((i, I2CCondition.STOP))
170 if len(conditions) == 0:
171 return
173 # Process transactions between START and STOP
174 trans_idx = 0
175 i = 0
177 while i < len(conditions):
178 if conditions[i][1] != I2CCondition.START: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 i += 1
180 continue
182 start_idx = conditions[i][0]
183 start_time = start_idx / sample_rate
185 # Find corresponding STOP or next START
186 end_cond_idx = i + 1
187 while end_cond_idx < len(conditions): 187 ↛ 195line 187 didn't jump to line 195 because the condition on line 187 was always true
188 if conditions[end_cond_idx][1] == I2CCondition.STOP: 188 ↛ 190line 188 didn't jump to line 190 because the condition on line 188 was always true
189 break
190 if conditions[end_cond_idx][1] == I2CCondition.START:
191 # Repeated START
192 break
193 end_cond_idx += 1
195 if end_cond_idx >= len(conditions): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 break
198 end_idx = conditions[end_cond_idx][0]
199 is_repeated = conditions[end_cond_idx][1] == I2CCondition.START
201 # Extract bytes from this transaction
202 bytes_data, acks = self._extract_bytes(
203 scl[start_idx:end_idx],
204 sda[start_idx:end_idx],
205 )
207 if len(bytes_data) == 0: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 i = end_cond_idx
209 continue
211 # First byte is address + R/W
212 address_byte = bytes_data[0]
213 address = address_byte >> 1
214 is_read = (address_byte & 1) == 1
216 # Check for 10-bit address
217 is_10bit = False
218 actual_address = address
220 if self._address_format == "10bit" or (
221 self._address_format == "auto" and (address_byte >> 3) == 0b11110
222 ):
223 # 10-bit address format
224 if len(bytes_data) >= 2: 224 ↛ 232line 224 didn't jump to line 232 because the condition on line 224 was always true
225 is_10bit = True
226 high_bits = (address_byte >> 1) & 0b11
227 low_bits = bytes_data[1]
228 actual_address = (high_bits << 8) | low_bits
229 data_bytes = bytes_data[2:]
230 data_acks = acks[2:] if len(acks) > 2 else []
231 else:
232 data_bytes = []
233 data_acks = []
234 else:
235 actual_address = address
236 data_bytes = bytes_data[1:]
237 data_acks = acks[1:] if len(acks) > 1 else []
239 # Check for errors
240 errors = []
241 if len(acks) > 0 and not acks[0]:
242 errors.append("NAK on address")
244 for j, (_byte, ack) in enumerate(zip(data_bytes, data_acks, strict=False)):
245 if not ack and not is_read: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 errors.append(f"NAK on byte {j}")
248 end_time = end_idx / sample_rate
250 # Add annotations
251 self.put_annotation(
252 start_time,
253 start_time + 1e-6,
254 AnnotationLevel.BITS,
255 "START" if not is_repeated else "Sr",
256 )
258 addr_text = f"0x{actual_address:02X}" if not is_10bit else f"0x{actual_address:03X}"
259 self.put_annotation(
260 start_time,
261 end_time,
262 AnnotationLevel.FIELDS,
263 f"{addr_text} {'R' if is_read else 'W'}",
264 )
266 # Create packet
267 annotations = {
268 "address": actual_address,
269 "address_10bit": is_10bit,
270 "read": is_read,
271 "bytes": bytes_data,
272 "acks": acks,
273 "transaction_num": trans_idx,
274 }
276 packet = ProtocolPacket(
277 timestamp=start_time,
278 protocol="i2c",
279 data=bytes(data_bytes),
280 annotations=annotations,
281 errors=errors,
282 )
284 yield packet
286 trans_idx += 1
287 i = end_cond_idx
289 if is_repeated: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 continue
291 else:
292 i += 1
294 def _extract_bytes(
295 self,
296 scl: NDArray[np.bool_],
297 sda: NDArray[np.bool_],
298 ) -> tuple[list[int], list[bool]]:
299 """Extract bytes from I2C transaction.
301 Args:
302 scl: Clock signal segment.
303 sda: Data signal segment.
305 Returns:
306 (bytes, acks) - List of byte values and ACK flags.
307 """
308 # Find rising edges of SCL (data sampling points)
309 rising_edges = np.where(~scl[:-1] & scl[1:])[0] + 1
311 if len(rising_edges) < 9: # Need at least 8 data bits + ACK
312 return [], []
314 bytes_data = []
315 acks = []
317 i = 0
318 while i + 9 <= len(rising_edges):
319 # Extract 8 data bits (MSB first)
320 byte_val = 0
321 for bit_idx in range(8):
322 sample_idx = rising_edges[i + bit_idx]
323 if sample_idx < len(sda): 323 ↛ 321line 323 didn't jump to line 321 because the condition on line 323 was always true
324 bit = 1 if sda[sample_idx] else 0
325 byte_val = (byte_val << 1) | bit
327 # Extract ACK bit (9th bit, low = ACK, high = NAK)
328 ack_idx = rising_edges[i + 8]
329 if ack_idx < len(sda): 329 ↛ 332line 329 didn't jump to line 332 because the condition on line 329 was always true
330 ack = not sda[ack_idx] # Low = ACK
331 else:
332 ack = False
334 bytes_data.append(byte_val)
335 acks.append(ack)
337 i += 9
339 return bytes_data, acks
342def decode_i2c(
343 scl: NDArray[np.bool_],
344 sda: NDArray[np.bool_],
345 sample_rate: float = 1.0,
346 address_format: str = "auto",
347) -> list[ProtocolPacket]:
348 """Convenience function to decode I2C transactions.
350 Args:
351 scl: Clock signal.
352 sda: Data signal.
353 sample_rate: Sample rate in Hz.
354 address_format: Address format ("auto", "7bit", "10bit").
356 Returns:
357 List of decoded I2C transactions.
359 Example:
360 >>> packets = decode_i2c(scl, sda, sample_rate=10e6)
361 >>> for pkt in packets:
362 ... print(f"Address: 0x{pkt.annotations['address']:02X}")
363 """
364 decoder = I2CDecoder(address_format=address_format)
365 return list(decoder.decode(scl=scl, sda=sda, sample_rate=sample_rate))
368__all__ = ["I2CCondition", "I2CDecoder", "I2CTransaction", "decode_i2c"]