Coverage for src / tracekit / analyzers / protocols / i2s.py: 91%
83 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""I2S protocol decoder.
3This module provides Inter-IC Sound (I2S) audio protocol decoding
4with support for standard, left-justified, and right-justified modes.
7Example:
8 >>> from tracekit.analyzers.protocols.i2s import I2SDecoder
9 >>> decoder = I2SDecoder(bit_depth=16)
10 >>> for packet in decoder.decode(bck=bck, ws=ws, sd=sd):
11 ... print(f"Left: {packet.annotations['left_sample']}")
13References:
14 I2S Bus Specification (Philips Semiconductors)
15"""
17from __future__ import annotations
19from enum import Enum
20from typing import TYPE_CHECKING, Literal
22import numpy as np
24from tracekit.analyzers.protocols.base import (
25 AnnotationLevel,
26 ChannelDef,
27 OptionDef,
28 SyncDecoder,
29)
30from tracekit.core.types import DigitalTrace, ProtocolPacket
32if TYPE_CHECKING:
33 from collections.abc import Iterator
35 from numpy.typing import NDArray
38class I2SMode(Enum):
39 """I2S alignment modes."""
41 STANDARD = "standard" # MSB 1 clock after WS change
42 LEFT_JUSTIFIED = "left_justified" # MSB at WS change
43 RIGHT_JUSTIFIED = "right_justified" # MSB before WS change
46class I2SDecoder(SyncDecoder):
47 """I2S protocol decoder.
49 Decodes I2S audio bus transactions with configurable bit depth
50 and alignment modes (standard, left-justified, right-justified).
52 Attributes:
53 id: "i2s"
54 name: "I2S"
55 channels: [bck, ws, sd] (required)
57 Example:
58 >>> decoder = I2SDecoder(bit_depth=24, mode="standard")
59 >>> for packet in decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=1e6):
60 ... print(f"Stereo: L={packet.annotations['left']} R={packet.annotations['right']}")
61 """
63 id = "i2s"
64 name = "I2S"
65 longname = "Inter-IC Sound"
66 desc = "I2S audio bus protocol decoder"
68 channels = [ # noqa: RUF012
69 ChannelDef("bck", "BCK", "Bit Clock (SCLK)", required=True),
70 ChannelDef("ws", "WS", "Word Select (LRCLK)", required=True),
71 ChannelDef("sd", "SD", "Serial Data", required=True),
72 ]
74 optional_channels = [] # noqa: RUF012
76 options = [ # noqa: RUF012
77 OptionDef(
78 "bit_depth",
79 "Bit depth",
80 "Bits per sample",
81 default=16,
82 values=[8, 16, 24, 32],
83 ),
84 OptionDef(
85 "mode",
86 "Mode",
87 "Alignment mode",
88 default="standard",
89 values=["standard", "left_justified", "right_justified"],
90 ),
91 ]
93 annotations = [ # noqa: RUF012
94 ("left", "Left channel sample"),
95 ("right", "Right channel sample"),
96 ("word", "Word boundary"),
97 ]
99 def __init__(
100 self,
101 bit_depth: int = 16,
102 mode: Literal["standard", "left_justified", "right_justified"] = "standard",
103 ) -> None:
104 """Initialize I2S decoder.
106 Args:
107 bit_depth: Bits per sample (8, 16, 24, 32).
108 mode: Alignment mode.
109 """
110 super().__init__(bit_depth=bit_depth, mode=mode)
111 self._bit_depth = bit_depth
112 self._mode = I2SMode(mode)
114 def decode( # type: ignore[override]
115 self,
116 trace: DigitalTrace | None = None,
117 *,
118 bck: NDArray[np.bool_] | None = None,
119 ws: NDArray[np.bool_] | None = None,
120 sd: NDArray[np.bool_] | None = None,
121 sample_rate: float = 1.0,
122 ) -> Iterator[ProtocolPacket]:
123 """Decode I2S audio data.
125 Args:
126 trace: Optional primary trace.
127 bck: Bit Clock signal.
128 ws: Word Select signal (0=left, 1=right).
129 sd: Serial Data signal.
130 sample_rate: Sample rate in Hz.
132 Yields:
133 Decoded I2S samples as ProtocolPacket objects.
135 Example:
136 >>> decoder = I2SDecoder(bit_depth=16)
137 >>> for pkt in decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=1e6):
138 ... print(f"Left: {pkt.annotations['left_sample']}")
139 """
140 if bck is None or ws is None or sd is None:
141 return
143 n_samples = min(len(bck), len(ws), len(sd))
144 bck = bck[:n_samples]
145 ws = ws[:n_samples]
146 sd = sd[:n_samples]
148 # Find rising edges of BCK (data sampled on rising edge in I2S)
149 rising_edges = np.where(~bck[:-1] & bck[1:])[0] + 1
151 # Find WS transitions to identify word boundaries
152 ws_transitions = np.where(ws[:-1] != ws[1:])[0] + 1
154 if len(rising_edges) == 0 or len(ws_transitions) == 0:
155 return
157 trans_num = 0
158 ws_idx = 0
160 while ws_idx < len(ws_transitions) - 1:
161 # Get word boundaries
162 word_start_idx = ws_transitions[ws_idx]
163 word_end_idx = ws_transitions[ws_idx + 1]
165 # Determine channel (WS=0 is left, WS=1 is right in standard I2S)
166 is_left = not ws[word_start_idx]
168 # Find BCK edges in this word period
169 word_edges = rising_edges[
170 (rising_edges >= word_start_idx) & (rising_edges < word_end_idx)
171 ]
173 if len(word_edges) == 0:
174 ws_idx += 1
175 continue
177 # In standard I2S mode, data starts 1 clock after WS change
178 # In left-justified mode, data starts at WS change
179 # In right-justified mode, data is aligned to end of word period
180 if self._mode == I2SMode.STANDARD: 180 ↛ 183line 180 didn't jump to line 183 because the condition on line 180 was always true
181 # Skip first edge (data starts on second edge)
182 data_edges = word_edges[1:] if len(word_edges) > 1 else []
183 elif self._mode == I2SMode.LEFT_JUSTIFIED:
184 data_edges = word_edges
185 else: # RIGHT_JUSTIFIED
186 # Take last bit_depth edges
187 data_edges = (
188 word_edges[-self._bit_depth :]
189 if len(word_edges) >= self._bit_depth
190 else word_edges
191 )
193 # Extract sample data (MSB first)
194 sample_bits = []
195 for edge_idx in data_edges[: self._bit_depth]:
196 if edge_idx < len(sd): 196 ↛ 195line 196 didn't jump to line 195 because the condition on line 196 was always true
197 sample_bits.append(1 if sd[edge_idx] else 0)
199 if len(sample_bits) < self._bit_depth: 199 ↛ 204line 199 didn't jump to line 204 because the condition on line 199 was always true
200 # Incomplete sample, pad with zeros
201 sample_bits.extend([0] * (self._bit_depth - len(sample_bits)))
203 # Convert to signed integer value (MSB first, two's complement)
204 sample_value = 0
205 for bit in sample_bits:
206 sample_value = (sample_value << 1) | bit
208 # Convert from unsigned to signed (two's complement)
209 if sample_bits[0] == 1: # Negative number
210 sample_value = sample_value - (1 << self._bit_depth)
212 # Calculate timing
213 start_time = word_start_idx / sample_rate
214 end_time = word_end_idx / sample_rate
216 # Store left and right channels
217 if ws_idx % 2 == 0:
218 # First word of stereo pair
219 left_sample = sample_value if is_left else 0
220 right_sample = 0 if is_left else sample_value
221 first_start_time = start_time
222 else:
223 # Second word of stereo pair - emit packet
224 if is_left: 224 ↛ 227line 224 didn't jump to line 227 because the condition on line 224 was always true
225 left_sample = sample_value
226 else:
227 right_sample = sample_value
229 # Add annotation
230 self.put_annotation(
231 first_start_time,
232 end_time,
233 AnnotationLevel.PACKETS,
234 f"L: {left_sample} / R: {right_sample}",
235 )
237 # Create packet
238 annotations = {
239 "sample_num": trans_num,
240 "left_sample": left_sample,
241 "right_sample": right_sample,
242 "bit_depth": self._bit_depth,
243 "mode": self._mode.value,
244 }
246 # Encode as bytes (little-endian, signed)
247 byte_count = (self._bit_depth + 7) // 8
248 left_bytes = left_sample.to_bytes(byte_count, "little", signed=True)
249 right_bytes = right_sample.to_bytes(byte_count, "little", signed=True)
250 data_bytes = left_bytes + right_bytes
252 packet = ProtocolPacket(
253 timestamp=first_start_time,
254 protocol="i2s",
255 data=data_bytes,
256 annotations=annotations,
257 errors=[],
258 )
260 yield packet
261 trans_num += 1
263 ws_idx += 1
266def decode_i2s(
267 bck: NDArray[np.bool_],
268 ws: NDArray[np.bool_],
269 sd: NDArray[np.bool_],
270 sample_rate: float = 1.0,
271 bit_depth: int = 16,
272 mode: Literal["standard", "left_justified", "right_justified"] = "standard",
273) -> list[ProtocolPacket]:
274 """Convenience function to decode I2S audio data.
276 Args:
277 bck: Bit Clock signal.
278 ws: Word Select signal.
279 sd: Serial Data signal.
280 sample_rate: Sample rate in Hz.
281 bit_depth: Bits per sample (8, 16, 24, 32).
282 mode: Alignment mode.
284 Returns:
285 List of decoded I2S stereo samples.
287 Example:
288 >>> packets = decode_i2s(bck, ws, sd, sample_rate=1e6, bit_depth=16)
289 >>> for pkt in packets:
290 ... print(f"L={pkt.annotations['left_sample']}, R={pkt.annotations['right_sample']}")
291 """
292 decoder = I2SDecoder(bit_depth=bit_depth, mode=mode)
293 return list(decoder.decode(bck=bck, ws=ws, sd=sd, sample_rate=sample_rate))
296__all__ = ["I2SDecoder", "I2SMode", "decode_i2s"]