Coverage for src / tracekit / analyzers / protocols / manchester.py: 94%
95 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Manchester encoding decoder.
3This module provides Manchester and Differential Manchester encoding
4decoders with clock recovery and violation detection.
7Example:
8 >>> from tracekit.analyzers.protocols.manchester import ManchesterDecoder
9 >>> decoder = ManchesterDecoder(mode="ieee")
10 >>> for packet in decoder.decode(trace):
11 ... print(f"Data: {packet.data.hex()}")
13References:
14 IEEE 802.3 (Ethernet)
15 Thomas & Biba Differential Manchester
16"""
18from __future__ import annotations
20from enum import Enum
21from typing import TYPE_CHECKING, Literal
23import numpy as np
25from tracekit.analyzers.protocols.base import (
26 AnnotationLevel,
27 AsyncDecoder,
28 ChannelDef,
29 OptionDef,
30)
31from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace
33if TYPE_CHECKING:
34 from collections.abc import Iterator
36 from numpy.typing import NDArray
39class ManchesterMode(Enum):
40 """Manchester encoding modes."""
42 IEEE = "ieee" # IEEE 802.3: 0=low-high, 1=high-low
43 THOMAS = "thomas" # G.E. Thomas: 0=high-low, 1=low-high
44 DIFFERENTIAL = "differential" # Differential Manchester
47class ManchesterDecoder(AsyncDecoder):
48 """Manchester encoding decoder.
50 Decodes Manchester and Differential Manchester encoded data
51 with automatic clock recovery and encoding violation detection.
53 Attributes:
54 id: "manchester"
55 name: "Manchester"
56 channels: [data] (required)
58 Example:
59 >>> decoder = ManchesterDecoder(mode="ieee", bit_rate=10000000)
60 >>> for packet in decoder.decode(trace):
61 ... print(f"Bits: {packet.annotations['bit_count']}")
62 """
64 id = "manchester"
65 name = "Manchester"
66 longname = "Manchester Encoding"
67 desc = "Manchester and Differential Manchester decoder"
69 channels = [ # noqa: RUF012
70 ChannelDef("data", "DATA", "Manchester encoded data", required=True),
71 ]
73 optional_channels = [] # noqa: RUF012
75 options = [ # noqa: RUF012
76 OptionDef("bit_rate", "Bit rate", "Bits per second", default=10000000, values=None),
77 OptionDef(
78 "mode",
79 "Encoding mode",
80 "Manchester variant",
81 default="ieee",
82 values=["ieee", "thomas", "differential"],
83 ),
84 ]
86 annotations = [ # noqa: RUF012
87 ("bit", "Decoded bit"),
88 ("clock", "Recovered clock"),
89 ("violation", "Encoding violation"),
90 ]
92 def __init__(
93 self,
94 bit_rate: int = 10000000,
95 mode: Literal["ieee", "thomas", "differential"] = "ieee",
96 ) -> None:
97 """Initialize Manchester decoder.
99 Args:
100 bit_rate: Bit rate in bps (before encoding).
101 mode: Encoding mode ("ieee", "thomas", or "differential").
102 """
103 # Manchester encoding doubles the transition rate
104 super().__init__(baudrate=bit_rate * 2, mode=mode, bit_rate=bit_rate)
105 self._bit_rate = bit_rate
106 self._mode = ManchesterMode(mode)
108 def decode(
109 self,
110 trace: DigitalTrace | WaveformTrace,
111 **channels: NDArray[np.bool_],
112 ) -> Iterator[ProtocolPacket]:
113 """Decode Manchester encoded data.
115 Args:
116 trace: Input digital trace.
117 **channels: Additional channel data.
119 Yields:
120 Decoded data as ProtocolPacket objects.
122 Example:
123 >>> decoder = ManchesterDecoder(mode="ieee", bit_rate=10e6)
124 >>> for packet in decoder.decode(trace):
125 ... print(f"Data: {packet.data.hex()}")
126 """
127 # Convert to digital if needed
128 if isinstance(trace, WaveformTrace):
129 from tracekit.analyzers.digital.extraction import to_digital
131 digital_trace = to_digital(trace, threshold="auto")
132 else:
133 digital_trace = trace
135 data = digital_trace.data
136 sample_rate = digital_trace.metadata.sample_rate
138 # Bit period (actual data bit, not symbol period)
139 bit_period = sample_rate / self._bit_rate
140 half_bit = bit_period / 2
142 # Find transitions for clock recovery
143 transitions = np.where(data[:-1] != data[1:])[0] + 1
145 if len(transitions) < 2:
146 return
148 # Decode bits based on transition timing
149 decoded_bits = [] # type: ignore[var-annotated]
150 errors = [] # type: ignore[var-annotated]
151 start_time = 0
153 if self._mode == ManchesterMode.DIFFERENTIAL:
154 decoded_bits, errors = self._decode_differential(data, transitions, half_bit)
155 else:
156 decoded_bits, errors = self._decode_standard(data, transitions, half_bit)
158 if len(decoded_bits) == 0: 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true
159 return
161 # Convert bits to bytes
162 byte_list = []
163 for i in range(0, len(decoded_bits), 8):
164 if i + 8 <= len(decoded_bits):
165 byte_val = 0
166 for j in range(8):
167 byte_val |= decoded_bits[i + j] << j
168 byte_list.append(byte_val)
170 # Calculate timing
171 start_time = transitions[0] / sample_rate if len(transitions) > 0 else 0
172 end_time = transitions[-1] / sample_rate if len(transitions) > 0 else 0
174 # Add annotation
175 self.put_annotation(
176 start_time,
177 end_time,
178 AnnotationLevel.BYTES,
179 f"{len(decoded_bits)} bits decoded",
180 )
182 # Create packet
183 annotations = {
184 "bit_count": len(decoded_bits),
185 "byte_count": len(byte_list),
186 "mode": self._mode.value,
187 "bit_rate": self._bit_rate,
188 }
190 packet = ProtocolPacket(
191 timestamp=start_time,
192 protocol="manchester",
193 data=bytes(byte_list),
194 annotations=annotations,
195 errors=errors,
196 )
198 yield packet
200 def _decode_standard(
201 self,
202 data: NDArray[np.bool_],
203 transitions: NDArray[np.int64],
204 half_bit: float,
205 ) -> tuple[list[int], list[str]]:
206 """Decode standard Manchester (IEEE or Thomas).
208 Args:
209 data: Digital data array.
210 transitions: Transition indices.
211 half_bit: Half-bit period in samples.
213 Returns:
214 (decoded_bits, errors) tuple.
215 """
216 decoded_bits = []
217 errors = [] # type: ignore[var-annotated]
219 # In Manchester, there's always a transition in the middle of each bit
220 # IEEE: 0=low-to-high, 1=high-to-low (mid-bit transition)
221 # Thomas: opposite
223 i = 0
224 while i < len(transitions) - 1:
225 trans_idx = transitions[i]
227 # Sample before and after transition
228 if trans_idx > 0 and trans_idx < len(data): 228 ↛ 241line 228 didn't jump to line 241 because the condition on line 228 was always true
229 before = data[trans_idx - 1]
230 after = data[trans_idx]
232 # Determine bit value based on transition direction
233 if not before and after: # Rising edge
234 bit = 0 if self._mode == ManchesterMode.IEEE else 1
235 else: # Falling edge
236 bit = 1 if self._mode == ManchesterMode.IEEE else 0
238 decoded_bits.append(bit)
240 # Look for next mid-bit transition (should be ~1 bit period away)
241 i += 1
243 # Check if there's a boundary transition (should be ~0.5 bit period)
244 if i < len(transitions): 244 ↛ 224line 244 didn't jump to line 224 because the condition on line 244 was always true
245 trans_spacing = transitions[i] - trans_idx
246 if trans_spacing < half_bit * 0.7:
247 # This is a boundary transition, skip it
248 i += 1
250 return decoded_bits, errors
252 def _decode_differential(
253 self,
254 data: NDArray[np.bool_],
255 transitions: NDArray[np.int64],
256 half_bit: float,
257 ) -> tuple[list[int], list[str]]:
258 """Decode Differential Manchester.
260 Args:
261 data: Digital data array.
262 transitions: Transition indices.
263 half_bit: Half-bit period in samples.
265 Returns:
266 (decoded_bits, errors) tuple.
267 """
268 decoded_bits = []
269 errors = [] # type: ignore[var-annotated]
271 # Differential Manchester:
272 # Always transition at bit boundary
273 # 0: additional transition at mid-bit
274 # 1: no additional transition at mid-bit
276 i = 0
277 while i < len(transitions) - 1:
278 trans_idx = transitions[i]
280 # Check if there's a transition within the bit period
281 next_trans_spacing = (
282 transitions[i + 1] - trans_idx if i + 1 < len(transitions) else float("inf")
283 )
285 if next_trans_spacing < half_bit * 1.5:
286 # Two transitions in this bit period -> bit = 0
287 decoded_bits.append(0)
288 i += 2
289 else:
290 # One transition in this bit period -> bit = 1
291 decoded_bits.append(1)
292 i += 1
294 return decoded_bits, errors
297def decode_manchester(
298 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
299 sample_rate: float = 1.0,
300 bit_rate: int = 10000000,
301 mode: Literal["ieee", "thomas", "differential"] = "ieee",
302) -> list[ProtocolPacket]:
303 """Convenience function to decode Manchester encoded data.
305 Args:
306 data: Manchester encoded signal (digital array or trace).
307 sample_rate: Sample rate in Hz.
308 bit_rate: Bit rate in bps (before encoding).
309 mode: Encoding mode ("ieee", "thomas", or "differential").
311 Returns:
312 List of decoded packets.
314 Example:
315 >>> packets = decode_manchester(signal, sample_rate=100e6, bit_rate=10e6)
316 >>> for pkt in packets:
317 ... print(f"Data: {pkt.data.hex()}")
318 """
319 decoder = ManchesterDecoder(bit_rate=bit_rate, mode=mode)
320 if isinstance(data, WaveformTrace | DigitalTrace): 320 ↛ 323line 320 didn't jump to line 323 because the condition on line 320 was always true
321 return list(decoder.decode(data))
322 else:
323 trace = DigitalTrace( # type: ignore[call-arg]
324 data=data,
325 sample_rate=sample_rate,
326 )
327 return list(decoder.decode(trace))
330__all__ = ["ManchesterDecoder", "ManchesterMode", "decode_manchester"]