Coverage for src / tracekit / analyzers / protocols / i2s.py: 91%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""I2S protocol decoder. 

2 

3This module provides Inter-IC Sound (I2S) audio protocol decoding 

4with support for standard, left-justified, and right-justified modes. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.i2s import I2SDecoder 

9 >>> decoder = I2SDecoder(bit_depth=16) 

10 >>> for packet in decoder.decode(bck=bck, ws=ws, sd=sd): 

11 ... print(f"Left: {packet.annotations['left_sample']}") 

12 

13References: 

14 I2S Bus Specification (Philips Semiconductors) 

15""" 

16 

17from __future__ import annotations 

18 

19from enum import Enum 

20from typing import TYPE_CHECKING, Literal 

21 

22import numpy as np 

23 

24from tracekit.analyzers.protocols.base import ( 

25 AnnotationLevel, 

26 ChannelDef, 

27 OptionDef, 

28 SyncDecoder, 

29) 

30from tracekit.core.types import DigitalTrace, ProtocolPacket 

31 

32if TYPE_CHECKING: 

33 from collections.abc import Iterator 

34 

35 from numpy.typing import NDArray 

36 

37 

38class I2SMode(Enum): 

39 """I2S alignment modes.""" 

40 

41 STANDARD = "standard" # MSB 1 clock after WS change 

42 LEFT_JUSTIFIED = "left_justified" # MSB at WS change 

43 RIGHT_JUSTIFIED = "right_justified" # MSB before WS change 

44 

45 

46class I2SDecoder(SyncDecoder): 

47 """I2S protocol decoder. 

48 

49 Decodes I2S audio bus transactions with configurable bit depth 

50 and alignment modes (standard, left-justified, right-justified). 

51 

52 Attributes: 

53 id: "i2s" 

54 name: "I2S" 

55 channels: [bck, ws, sd] (required) 

56 

57 Example: 

58 >>> decoder = I2SDecoder(bit_depth=24, mode="standard") 

59 >>> for packet in decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=1e6): 

60 ... print(f"Stereo: L={packet.annotations['left']} R={packet.annotations['right']}") 

61 """ 

62 

63 id = "i2s" 

64 name = "I2S" 

65 longname = "Inter-IC Sound" 

66 desc = "I2S audio bus protocol decoder" 

67 

68 channels = [ # noqa: RUF012 

69 ChannelDef("bck", "BCK", "Bit Clock (SCLK)", required=True), 

70 ChannelDef("ws", "WS", "Word Select (LRCLK)", required=True), 

71 ChannelDef("sd", "SD", "Serial Data", required=True), 

72 ] 

73 

74 optional_channels = [] # noqa: RUF012 

75 

76 options = [ # noqa: RUF012 

77 OptionDef( 

78 "bit_depth", 

79 "Bit depth", 

80 "Bits per sample", 

81 default=16, 

82 values=[8, 16, 24, 32], 

83 ), 

84 OptionDef( 

85 "mode", 

86 "Mode", 

87 "Alignment mode", 

88 default="standard", 

89 values=["standard", "left_justified", "right_justified"], 

90 ), 

91 ] 

92 

93 annotations = [ # noqa: RUF012 

94 ("left", "Left channel sample"), 

95 ("right", "Right channel sample"), 

96 ("word", "Word boundary"), 

97 ] 

98 

99 def __init__( 

100 self, 

101 bit_depth: int = 16, 

102 mode: Literal["standard", "left_justified", "right_justified"] = "standard", 

103 ) -> None: 

104 """Initialize I2S decoder. 

105 

106 Args: 

107 bit_depth: Bits per sample (8, 16, 24, 32). 

108 mode: Alignment mode. 

109 """ 

110 super().__init__(bit_depth=bit_depth, mode=mode) 

111 self._bit_depth = bit_depth 

112 self._mode = I2SMode(mode) 

113 

114 def decode( # type: ignore[override] 

115 self, 

116 trace: DigitalTrace | None = None, 

117 *, 

118 bck: NDArray[np.bool_] | None = None, 

119 ws: NDArray[np.bool_] | None = None, 

120 sd: NDArray[np.bool_] | None = None, 

121 sample_rate: float = 1.0, 

122 ) -> Iterator[ProtocolPacket]: 

123 """Decode I2S audio data. 

124 

125 Args: 

126 trace: Optional primary trace. 

127 bck: Bit Clock signal. 

128 ws: Word Select signal (0=left, 1=right). 

129 sd: Serial Data signal. 

130 sample_rate: Sample rate in Hz. 

131 

132 Yields: 

133 Decoded I2S samples as ProtocolPacket objects. 

134 

135 Example: 

136 >>> decoder = I2SDecoder(bit_depth=16) 

137 >>> for pkt in decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=1e6): 

138 ... print(f"Left: {pkt.annotations['left_sample']}") 

139 """ 

140 if bck is None or ws is None or sd is None: 

141 return 

142 

143 n_samples = min(len(bck), len(ws), len(sd)) 

144 bck = bck[:n_samples] 

145 ws = ws[:n_samples] 

146 sd = sd[:n_samples] 

147 

148 # Find rising edges of BCK (data sampled on rising edge in I2S) 

149 rising_edges = np.where(~bck[:-1] & bck[1:])[0] + 1 

150 

151 # Find WS transitions to identify word boundaries 

152 ws_transitions = np.where(ws[:-1] != ws[1:])[0] + 1 

153 

154 if len(rising_edges) == 0 or len(ws_transitions) == 0: 

155 return 

156 

157 trans_num = 0 

158 ws_idx = 0 

159 

160 while ws_idx < len(ws_transitions) - 1: 

161 # Get word boundaries 

162 word_start_idx = ws_transitions[ws_idx] 

163 word_end_idx = ws_transitions[ws_idx + 1] 

164 

165 # Determine channel (WS=0 is left, WS=1 is right in standard I2S) 

166 is_left = not ws[word_start_idx] 

167 

168 # Find BCK edges in this word period 

169 word_edges = rising_edges[ 

170 (rising_edges >= word_start_idx) & (rising_edges < word_end_idx) 

171 ] 

172 

173 if len(word_edges) == 0: 

174 ws_idx += 1 

175 continue 

176 

177 # In standard I2S mode, data starts 1 clock after WS change 

178 # In left-justified mode, data starts at WS change 

179 # In right-justified mode, data is aligned to end of word period 

180 if self._mode == I2SMode.STANDARD: 180 ↛ 183line 180 didn't jump to line 183 because the condition on line 180 was always true

181 # Skip first edge (data starts on second edge) 

182 data_edges = word_edges[1:] if len(word_edges) > 1 else [] 

183 elif self._mode == I2SMode.LEFT_JUSTIFIED: 

184 data_edges = word_edges 

185 else: # RIGHT_JUSTIFIED 

186 # Take last bit_depth edges 

187 data_edges = ( 

188 word_edges[-self._bit_depth :] 

189 if len(word_edges) >= self._bit_depth 

190 else word_edges 

191 ) 

192 

193 # Extract sample data (MSB first) 

194 sample_bits = [] 

195 for edge_idx in data_edges[: self._bit_depth]: 

196 if edge_idx < len(sd): 196 ↛ 195line 196 didn't jump to line 195 because the condition on line 196 was always true

197 sample_bits.append(1 if sd[edge_idx] else 0) 

198 

199 if len(sample_bits) < self._bit_depth: 199 ↛ 204line 199 didn't jump to line 204 because the condition on line 199 was always true

200 # Incomplete sample, pad with zeros 

201 sample_bits.extend([0] * (self._bit_depth - len(sample_bits))) 

202 

203 # Convert to signed integer value (MSB first, two's complement) 

204 sample_value = 0 

205 for bit in sample_bits: 

206 sample_value = (sample_value << 1) | bit 

207 

208 # Convert from unsigned to signed (two's complement) 

209 if sample_bits[0] == 1: # Negative number 

210 sample_value = sample_value - (1 << self._bit_depth) 

211 

212 # Calculate timing 

213 start_time = word_start_idx / sample_rate 

214 end_time = word_end_idx / sample_rate 

215 

216 # Store left and right channels 

217 if ws_idx % 2 == 0: 

218 # First word of stereo pair 

219 left_sample = sample_value if is_left else 0 

220 right_sample = 0 if is_left else sample_value 

221 first_start_time = start_time 

222 else: 

223 # Second word of stereo pair - emit packet 

224 if is_left: 224 ↛ 227line 224 didn't jump to line 227 because the condition on line 224 was always true

225 left_sample = sample_value 

226 else: 

227 right_sample = sample_value 

228 

229 # Add annotation 

230 self.put_annotation( 

231 first_start_time, 

232 end_time, 

233 AnnotationLevel.PACKETS, 

234 f"L: {left_sample} / R: {right_sample}", 

235 ) 

236 

237 # Create packet 

238 annotations = { 

239 "sample_num": trans_num, 

240 "left_sample": left_sample, 

241 "right_sample": right_sample, 

242 "bit_depth": self._bit_depth, 

243 "mode": self._mode.value, 

244 } 

245 

246 # Encode as bytes (little-endian, signed) 

247 byte_count = (self._bit_depth + 7) // 8 

248 left_bytes = left_sample.to_bytes(byte_count, "little", signed=True) 

249 right_bytes = right_sample.to_bytes(byte_count, "little", signed=True) 

250 data_bytes = left_bytes + right_bytes 

251 

252 packet = ProtocolPacket( 

253 timestamp=first_start_time, 

254 protocol="i2s", 

255 data=data_bytes, 

256 annotations=annotations, 

257 errors=[], 

258 ) 

259 

260 yield packet 

261 trans_num += 1 

262 

263 ws_idx += 1 

264 

265 

266def decode_i2s( 

267 bck: NDArray[np.bool_], 

268 ws: NDArray[np.bool_], 

269 sd: NDArray[np.bool_], 

270 sample_rate: float = 1.0, 

271 bit_depth: int = 16, 

272 mode: Literal["standard", "left_justified", "right_justified"] = "standard", 

273) -> list[ProtocolPacket]: 

274 """Convenience function to decode I2S audio data. 

275 

276 Args: 

277 bck: Bit Clock signal. 

278 ws: Word Select signal. 

279 sd: Serial Data signal. 

280 sample_rate: Sample rate in Hz. 

281 bit_depth: Bits per sample (8, 16, 24, 32). 

282 mode: Alignment mode. 

283 

284 Returns: 

285 List of decoded I2S stereo samples. 

286 

287 Example: 

288 >>> packets = decode_i2s(bck, ws, sd, sample_rate=1e6, bit_depth=16) 

289 >>> for pkt in packets: 

290 ... print(f"L={pkt.annotations['left_sample']}, R={pkt.annotations['right_sample']}") 

291 """ 

292 decoder = I2SDecoder(bit_depth=bit_depth, mode=mode) 

293 return list(decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=sample_rate)) 

294 

295 

296__all__ = ["I2SDecoder", "I2SMode", "decode_i2s"]