Coverage for src / tracekit / analyzers / protocols / i2c.py: 86%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""I2C protocol decoder. 

2 

3This module provides I2C (Inter-Integrated Circuit) protocol decoding 

4with ACK/NAK detection, arbitration monitoring, and multi-speed support. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.i2c import I2CDecoder 

9 >>> decoder = I2CDecoder() 

10 >>> for packet in decoder.decode(sda=sda, scl=scl): 

11 ... print(f"Address: 0x{packet.annotations['address']:02X}") 

12 

13References: 

14 I2C Specification (NXP UM10204) 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass 

20from enum import Enum 

21from typing import TYPE_CHECKING 

22 

23import numpy as np 

24 

25from tracekit.analyzers.protocols.base import ( 

26 AnnotationLevel, 

27 ChannelDef, 

28 OptionDef, 

29 SyncDecoder, 

30) 

31from tracekit.core.types import DigitalTrace, ProtocolPacket 

32 

33if TYPE_CHECKING: 

34 from collections.abc import Iterator 

35 

36 from numpy.typing import NDArray 

37 

38 

39class I2CCondition(Enum): 

40 """I2C bus conditions.""" 

41 

42 START = "start" 

43 STOP = "stop" 

44 REPEATED_START = "repeated_start" 

45 ACK = "ack" 

46 NAK = "nak" 

47 

48 

49@dataclass 

50class I2CTransaction: 

51 """I2C transaction record. 

52 

53 Attributes: 

54 address: 7-bit or 10-bit device address. 

55 read: True for read, False for write. 

56 data: Data bytes transferred. 

57 acks: List of ACK (True) / NAK (False) for each byte. 

58 errors: List of detected errors. 

59 """ 

60 

61 address: int 

62 read: bool 

63 data: list[int] 

64 acks: list[bool] 

65 errors: list[str] 

66 

67 

68class I2CDecoder(SyncDecoder): 

69 """I2C protocol decoder. 

70 

71 Decodes I2C bus transactions with ACK/NAK detection, 

72 arbitration monitoring, and support for standard, fast, 

73 and high-speed modes. 

74 

75 Example: 

76 >>> decoder = I2CDecoder() 

77 >>> for packet in decoder.decode(sda=sda, scl=scl, sample_rate=10e6): 

78 ... print(f"Addr: 0x{packet.annotations['address']:02X}") 

79 ... print(f"Data: {packet.data.hex()}") 

80 """ 

81 

82 id = "i2c" 

83 name = "I2C" 

84 longname = "Inter-Integrated Circuit" 

85 desc = "I2C bus protocol decoder" 

86 

87 channels = [ # noqa: RUF012 

88 ChannelDef("scl", "SCL", "Clock line", required=True), 

89 ChannelDef("sda", "SDA", "Data line", required=True), 

90 ] 

91 

92 optional_channels = [] # noqa: RUF012 

93 

94 options = [ # noqa: RUF012 

95 OptionDef( 

96 "address_format", 

97 "Address format", 

98 "7-bit or 10-bit", 

99 default="auto", 

100 values=["auto", "7bit", "10bit"], 

101 ), 

102 ] 

103 

104 annotations = [ # noqa: RUF012 

105 ("start", "Start condition"), 

106 ("stop", "Stop condition"), 

107 ("address", "Device address"), 

108 ("data", "Data byte"), 

109 ("ack", "ACK"), 

110 ("nak", "NAK"), 

111 ("error", "Error"), 

112 ] 

113 

114 def __init__( 

115 self, 

116 address_format: str = "auto", 

117 ) -> None: 

118 """Initialize I2C decoder. 

119 

120 Args: 

121 address_format: Address format ("auto", "7bit", "10bit"). 

122 """ 

123 super().__init__(address_format=address_format) 

124 self._address_format = address_format 

125 

126 def decode( # type: ignore[override] 

127 self, 

128 trace: DigitalTrace | None = None, 

129 *, 

130 scl: NDArray[np.bool_] | None = None, 

131 sda: NDArray[np.bool_] | None = None, 

132 sample_rate: float = 1.0, 

133 ) -> Iterator[ProtocolPacket]: 

134 """Decode I2C transactions. 

135 

136 Args: 

137 trace: Optional primary trace. 

138 scl: Clock signal. 

139 sda: Data signal. 

140 sample_rate: Sample rate in Hz. 

141 

142 Yields: 

143 Decoded I2C transactions as ProtocolPacket objects. 

144 

145 Example: 

146 >>> decoder = I2CDecoder() 

147 >>> for pkt in decoder.decode(scl=scl, sda=sda, sample_rate=10e6): 

148 ... print(f"Address: 0x{pkt.annotations['address']:02X}") 

149 """ 

150 if scl is None or sda is None: 

151 return 

152 

153 n_samples = min(len(scl), len(sda)) 

154 scl = scl[:n_samples] 

155 sda = sda[:n_samples] 

156 

157 # Find start and stop conditions 

158 # START: SDA falls while SCL is high 

159 # STOP: SDA rises while SCL is high 

160 

161 conditions = [] 

162 

163 for i in range(1, n_samples): 

164 if scl[i] and scl[i - 1]: # SCL is high 

165 if sda[i - 1] and not sda[i]: # SDA falling 

166 conditions.append((i, I2CCondition.START)) 

167 elif not sda[i - 1] and sda[i]: # SDA rising 

168 conditions.append((i, I2CCondition.STOP)) 

169 

170 if len(conditions) == 0: 

171 return 

172 

173 # Process transactions between START and STOP 

174 trans_idx = 0 

175 i = 0 

176 

177 while i < len(conditions): 

178 if conditions[i][1] != I2CCondition.START: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 i += 1 

180 continue 

181 

182 start_idx = conditions[i][0] 

183 start_time = start_idx / sample_rate 

184 

185 # Find corresponding STOP or next START 

186 end_cond_idx = i + 1 

187 while end_cond_idx < len(conditions): 187 ↛ 195line 187 didn't jump to line 195 because the condition on line 187 was always true

188 if conditions[end_cond_idx][1] == I2CCondition.STOP: 188 ↛ 190line 188 didn't jump to line 190 because the condition on line 188 was always true

189 break 

190 if conditions[end_cond_idx][1] == I2CCondition.START: 

191 # Repeated START 

192 break 

193 end_cond_idx += 1 

194 

195 if end_cond_idx >= len(conditions): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 break 

197 

198 end_idx = conditions[end_cond_idx][0] 

199 is_repeated = conditions[end_cond_idx][1] == I2CCondition.START 

200 

201 # Extract bytes from this transaction 

202 bytes_data, acks = self._extract_bytes( 

203 scl[start_idx:end_idx], 

204 sda[start_idx:end_idx], 

205 ) 

206 

207 if len(bytes_data) == 0: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 i = end_cond_idx 

209 continue 

210 

211 # First byte is address + R/W 

212 address_byte = bytes_data[0] 

213 address = address_byte >> 1 

214 is_read = (address_byte & 1) == 1 

215 

216 # Check for 10-bit address 

217 is_10bit = False 

218 actual_address = address 

219 

220 if self._address_format == "10bit" or ( 

221 self._address_format == "auto" and (address_byte >> 3) == 0b11110 

222 ): 

223 # 10-bit address format 

224 if len(bytes_data) >= 2: 224 ↛ 232line 224 didn't jump to line 232 because the condition on line 224 was always true

225 is_10bit = True 

226 high_bits = (address_byte >> 1) & 0b11 

227 low_bits = bytes_data[1] 

228 actual_address = (high_bits << 8) | low_bits 

229 data_bytes = bytes_data[2:] 

230 data_acks = acks[2:] if len(acks) > 2 else [] 

231 else: 

232 data_bytes = [] 

233 data_acks = [] 

234 else: 

235 actual_address = address 

236 data_bytes = bytes_data[1:] 

237 data_acks = acks[1:] if len(acks) > 1 else [] 

238 

239 # Check for errors 

240 errors = [] 

241 if len(acks) > 0 and not acks[0]: 

242 errors.append("NAK on address") 

243 

244 for j, (_byte, ack) in enumerate(zip(data_bytes, data_acks, strict=False)): 

245 if not ack and not is_read: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true

246 errors.append(f"NAK on byte {j}") 

247 

248 end_time = end_idx / sample_rate 

249 

250 # Add annotations 

251 self.put_annotation( 

252 start_time, 

253 start_time + 1e-6, 

254 AnnotationLevel.BITS, 

255 "START" if not is_repeated else "Sr", 

256 ) 

257 

258 addr_text = f"0x{actual_address:02X}" if not is_10bit else f"0x{actual_address:03X}" 

259 self.put_annotation( 

260 start_time, 

261 end_time, 

262 AnnotationLevel.FIELDS, 

263 f"{addr_text} {'R' if is_read else 'W'}", 

264 ) 

265 

266 # Create packet 

267 annotations = { 

268 "address": actual_address, 

269 "address_10bit": is_10bit, 

270 "read": is_read, 

271 "bytes": bytes_data, 

272 "acks": acks, 

273 "transaction_num": trans_idx, 

274 } 

275 

276 packet = ProtocolPacket( 

277 timestamp=start_time, 

278 protocol="i2c", 

279 data=bytes(data_bytes), 

280 annotations=annotations, 

281 errors=errors, 

282 ) 

283 

284 yield packet 

285 

286 trans_idx += 1 

287 i = end_cond_idx 

288 

289 if is_repeated: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true

290 continue 

291 else: 

292 i += 1 

293 

294 def _extract_bytes( 

295 self, 

296 scl: NDArray[np.bool_], 

297 sda: NDArray[np.bool_], 

298 ) -> tuple[list[int], list[bool]]: 

299 """Extract bytes from I2C transaction. 

300 

301 Args: 

302 scl: Clock signal segment. 

303 sda: Data signal segment. 

304 

305 Returns: 

306 (bytes, acks) - List of byte values and ACK flags. 

307 """ 

308 # Find rising edges of SCL (data sampling points) 

309 rising_edges = np.where(~scl[:-1] & scl[1:])[0] + 1 

310 

311 if len(rising_edges) < 9: # Need at least 8 data bits + ACK 

312 return [], [] 

313 

314 bytes_data = [] 

315 acks = [] 

316 

317 i = 0 

318 while i + 9 <= len(rising_edges): 

319 # Extract 8 data bits (MSB first) 

320 byte_val = 0 

321 for bit_idx in range(8): 

322 sample_idx = rising_edges[i + bit_idx] 

323 if sample_idx < len(sda): 323 ↛ 321line 323 didn't jump to line 321 because the condition on line 323 was always true

324 bit = 1 if sda[sample_idx] else 0 

325 byte_val = (byte_val << 1) | bit 

326 

327 # Extract ACK bit (9th bit, low = ACK, high = NAK) 

328 ack_idx = rising_edges[i + 8] 

329 if ack_idx < len(sda): 329 ↛ 332line 329 didn't jump to line 332 because the condition on line 329 was always true

330 ack = not sda[ack_idx] # Low = ACK 

331 else: 

332 ack = False 

333 

334 bytes_data.append(byte_val) 

335 acks.append(ack) 

336 

337 i += 9 

338 

339 return bytes_data, acks 

340 

341 

342def decode_i2c( 

343 scl: NDArray[np.bool_], 

344 sda: NDArray[np.bool_], 

345 sample_rate: float = 1.0, 

346 address_format: str = "auto", 

347) -> list[ProtocolPacket]: 

348 """Convenience function to decode I2C transactions. 

349 

350 Args: 

351 scl: Clock signal. 

352 sda: Data signal. 

353 sample_rate: Sample rate in Hz. 

354 address_format: Address format ("auto", "7bit", "10bit"). 

355 

356 Returns: 

357 List of decoded I2C transactions. 

358 

359 Example: 

360 >>> packets = decode_i2c(scl, sda, sample_rate=10e6) 

361 >>> for pkt in packets: 

362 ... print(f"Address: 0x{pkt.annotations['address']:02X}") 

363 """ 

364 decoder = I2CDecoder(address_format=address_format) 

365 return list(decoder.decode(scl=scl, sda=sda, sample_rate=sample_rate)) 

366 

367 

368__all__ = ["I2CCondition", "I2CDecoder", "I2CTransaction", "decode_i2c"]