Coverage for src/tracekit/analyzers/protocols/manchester.py: 94%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Manchester encoding decoder. 

2 

3This module provides Manchester and Differential Manchester encoding 

4decoders with clock recovery and violation detection. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.manchester import ManchesterDecoder 

9 >>> decoder = ManchesterDecoder(mode="ieee") 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"Data: {packet.data.hex()}") 

12 

13References: 

14 IEEE 802.3 (Ethernet) 

15 Thomas & Biba Differential Manchester 

16""" 

17 

18from __future__ import annotations 

19 

20from enum import Enum 

21from typing import TYPE_CHECKING, Literal 

22 

23import numpy as np 

24 

25from tracekit.analyzers.protocols.base import ( 

26 AnnotationLevel, 

27 AsyncDecoder, 

28 ChannelDef, 

29 OptionDef, 

30) 

31from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace 

32 

33if TYPE_CHECKING: 

34 from collections.abc import Iterator 

35 

36 from numpy.typing import NDArray 

37 

38 

class ManchesterMode(Enum):
    """Line-code variants understood by ``ManchesterDecoder``.

    The member values are the strings accepted by the decoder's ``mode``
    option. For the two non-differential variants the bit value is taken
    from the direction of the mid-bit transition.

    NOTE(review): the commonly cited IEEE 802.3 convention is the reverse of
    what is labelled "IEEE" here (802.3 encodes a 1 as low-to-high). The
    mapping below matches what ``ManchesterDecoder._decode_standard``
    implements; confirm which polarity is intended before relying on the
    names.
    """

    # Rising mid-bit edge (low-high) decodes as 0, falling (high-low) as 1.
    IEEE = "ieee"
    # Opposite polarity: falling mid-bit edge (high-low) decodes as 0,
    # rising (low-high) as 1.
    THOMAS = "thomas"
    # Differential Manchester: the bit value depends on whether a mid-bit
    # transition is present, not on the edge direction.
    DIFFERENTIAL = "differential"

45 

46 

class ManchesterDecoder(AsyncDecoder):
    """Manchester encoding decoder.

    Decodes Manchester (IEEE / Thomas polarity) and Differential Manchester
    encoded data from the edge timing of a single data channel. Edge spacing
    is classified against the nominal bit rate supplied by the caller; no
    adaptive clock recovery is performed. No encoding violations are
    currently reported: the ``errors`` list of every packet is empty.

    Attributes:
        id: "manchester"
        name: "Manchester"
        channels: [data] (required)

    Example:
        >>> decoder = ManchesterDecoder(mode="ieee", bit_rate=10000000)
        >>> for packet in decoder.decode(trace):
        ...     print(f"Bits: {packet.annotations['bit_count']}")
    """

    id = "manchester"
    name = "Manchester"
    longname = "Manchester Encoding"
    desc = "Manchester and Differential Manchester decoder"

    channels = [  # noqa: RUF012
        ChannelDef("data", "DATA", "Manchester encoded data", required=True),
    ]

    optional_channels = []  # noqa: RUF012

    options = [  # noqa: RUF012
        OptionDef("bit_rate", "Bit rate", "Bits per second", default=10000000, values=None),
        OptionDef(
            "mode",
            "Encoding mode",
            "Manchester variant",
            default="ieee",
            values=["ieee", "thomas", "differential"],
        ),
    ]

    annotations = [  # noqa: RUF012
        ("bit", "Decoded bit"),
        ("clock", "Recovered clock"),
        ("violation", "Encoding violation"),
    ]

    def __init__(
        self,
        bit_rate: int = 10000000,
        mode: Literal["ieee", "thomas", "differential"] = "ieee",
    ) -> None:
        """Initialize Manchester decoder.

        Args:
            bit_rate: Bit rate in bps (before encoding).
            mode: Encoding mode ("ieee", "thomas", or "differential").

        Raises:
            ValueError: If ``mode`` is not a valid ``ManchesterMode`` value.
        """
        # Manchester encoding doubles the transition rate, so the base class
        # is told a symbol rate of twice the data bit rate.
        super().__init__(baudrate=bit_rate * 2, mode=mode, bit_rate=bit_rate)
        self._bit_rate = bit_rate
        self._mode = ManchesterMode(mode)

    def decode(
        self,
        trace: DigitalTrace | WaveformTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode Manchester encoded data.

        The whole trace is decoded into at most one packet. Nothing is
        yielded when the trace has fewer than two transitions or when no bit
        could be decoded.

        Args:
            trace: Input trace. A ``WaveformTrace`` is first converted with
                ``to_digital(trace, threshold="auto")``.
            **channels: Additional channel data (not used by this decoder).

        Yields:
            A single ProtocolPacket whose ``data`` holds the decoded bits
            packed LSB-first into bytes. Trailing bits that do not fill a
            whole byte are left out of ``data`` but are still counted in
            ``annotations["bit_count"]``.

        Example:
            >>> decoder = ManchesterDecoder(mode="ieee", bit_rate=10e6)
            >>> for packet in decoder.decode(trace):
            ...     print(f"Data: {packet.data.hex()}")
        """
        # Convert to digital if needed
        if isinstance(trace, WaveformTrace):
            from tracekit.analyzers.digital.extraction import to_digital

            digital_trace = to_digital(trace, threshold="auto")
        else:
            digital_trace = trace

        data = digital_trace.data
        sample_rate = digital_trace.metadata.sample_rate

        # Bit period in samples (actual data bit, not symbol period)
        bit_period = sample_rate / self._bit_rate
        half_bit = bit_period / 2

        # Index of the first sample after every level change
        transitions = np.where(data[:-1] != data[1:])[0] + 1

        if len(transitions) < 2:
            return

        # Decode bits based on transition timing
        if self._mode == ManchesterMode.DIFFERENTIAL:
            decoded_bits, errors = self._decode_differential(data, transitions, half_bit)
        else:
            decoded_bits, errors = self._decode_standard(data, transitions, half_bit)

        if not decoded_bits:
            return

        # Pack complete groups of eight bits into bytes, first bit = LSB
        whole_byte_bits = len(decoded_bits) - len(decoded_bits) % 8
        byte_list = [
            sum(bit << shift for shift, bit in enumerate(decoded_bits[start : start + 8]))
            for start in range(0, whole_byte_bits, 8)
        ]

        # The packet spans from the first to the last transition; at least
        # two transitions are guaranteed by the guard above.
        start_time = transitions[0] / sample_rate
        end_time = transitions[-1] / sample_rate

        # Add annotation
        self.put_annotation(
            start_time,
            end_time,
            AnnotationLevel.BYTES,
            f"{len(decoded_bits)} bits decoded",
        )

        # Create packet
        annotations = {
            "bit_count": len(decoded_bits),
            "byte_count": len(byte_list),
            "mode": self._mode.value,
            "bit_rate": self._bit_rate,
        }

        packet = ProtocolPacket(
            timestamp=start_time,
            protocol="manchester",
            data=bytes(byte_list),
            annotations=annotations,
            errors=errors,
        )

        yield packet

    def _decode_standard(
        self,
        data: NDArray[np.bool_],
        transitions: NDArray[np.int64],
        half_bit: float,
    ) -> tuple[list[int], list[str]]:
        """Decode standard Manchester (IEEE or Thomas).

        The first transition is assumed to be a mid-bit transition. After
        each mid-bit transition, a following transition that arrives about
        half a bit later is a bit-boundary transition and carries no data,
        so it is skipped. Every remaining transition, including the last
        one, is decoded from its direction.

        Args:
            data: Digital data array.
            transitions: Transition indices.
            half_bit: Half-bit period in samples.

        Returns:
            (decoded_bits, errors) tuple. ``errors`` is always empty; no
            violation detection is performed here.
        """
        decoded_bits: list[int] = []
        errors: list[str] = []

        # Bit value carried by a rising mid-bit edge; a falling edge carries
        # the complement. IEEE mode maps rising -> 0, any other mode -> 1.
        rising_bit = 0 if self._mode == ManchesterMode.IEEE else 1
        falling_bit = 1 - rising_bit

        # Consecutive edges are nominally one half-bit (boundary edge) or two
        # half-bits (next mid-bit edge) apart; split the two cases halfway.
        boundary_limit = half_bit * 1.5

        n_transitions = len(transitions)
        n_samples = len(data)

        i = 0
        while i < n_transitions:
            trans_idx = transitions[i]

            # Sample before and after the transition to get its direction
            if 0 < trans_idx < n_samples:
                before = data[trans_idx - 1]
                after = data[trans_idx]
                is_rising = (not before) and bool(after)
                decoded_bits.append(rising_bit if is_rising else falling_bit)

            i += 1

            # Skip a bit-boundary transition, if one follows this mid-bit edge
            if i < n_transitions and transitions[i] - trans_idx < boundary_limit:
                i += 1

        return decoded_bits, errors

    def _decode_differential(
        self,
        data: NDArray[np.bool_],
        transitions: NDArray[np.int64],
        half_bit: float,
    ) -> tuple[list[int], list[str]]:
        """Decode Differential Manchester.

        Each iteration starts on a transition treated as a bit boundary and
        looks at the distance to the next transition: a short gap means a
        mid-bit transition is present (bit 0), a long gap means it is absent
        (bit 1). The final transition is only ever used as the look-ahead
        edge and never starts a bit of its own.

        Args:
            data: Digital data array (not read; only edge timing is used).
            transitions: Transition indices.
            half_bit: Half-bit period in samples.

        Returns:
            (decoded_bits, errors) tuple. ``errors`` is always empty.
        """
        decoded_bits: list[int] = []
        errors: list[str] = []

        # Differential Manchester:
        # Always transition at bit boundary
        # 0: additional transition at mid-bit
        # 1: no additional transition at mid-bit
        mid_bit_limit = half_bit * 1.5
        last_index = len(transitions) - 1

        i = 0
        while i < last_index:
            next_trans_spacing = transitions[i + 1] - transitions[i]

            if next_trans_spacing < mid_bit_limit:
                # Two transitions in this bit period -> bit = 0
                decoded_bits.append(0)
                i += 2
            else:
                # One transition in this bit period -> bit = 1
                decoded_bits.append(1)
                i += 1

        return decoded_bits, errors

295 

296 

def decode_manchester(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    bit_rate: int = 10000000,
    mode: Literal["ieee", "thomas", "differential"] = "ieee",
) -> list[ProtocolPacket]:
    """Decode a Manchester encoded signal in one call.

    Builds a ``ManchesterDecoder`` and collects everything its ``decode``
    generator yields into a list.

    Args:
        data: Manchester encoded signal. Either an existing
            ``WaveformTrace`` / ``DigitalTrace`` or a raw digital array.
        sample_rate: Sample rate in Hz. Only used when ``data`` is a raw
            array; ignored when a trace object is passed.
        bit_rate: Bit rate in bps (before encoding).
        mode: Encoding mode ("ieee", "thomas", or "differential").

    Returns:
        List of decoded packets.

    Example:
        >>> packets = decode_manchester(signal, sample_rate=100e6, bit_rate=10e6)
        >>> for pkt in packets:
        ...     print(f"Data: {pkt.data.hex()}")
    """
    decoder = ManchesterDecoder(bit_rate=bit_rate, mode=mode)

    if isinstance(data, (WaveformTrace, DigitalTrace)):
        trace = data
    else:
        # NOTE(review): ManchesterDecoder.decode reads
        # ``trace.metadata.sample_rate``, while this call passes
        # ``sample_rate`` directly and needs a call-arg type-ignore. The
        # branch is reported as not exercised by the test suite; confirm that
        # DigitalTrace accepts these arguments.
        trace = DigitalTrace(  # type: ignore[call-arg]
            data=data,
            sample_rate=sample_rate,
        )

    return list(decoder.decode(trace))

328 

329 

# Names exported by ``from tracekit.analyzers.protocols.manchester import *``.
__all__ = ["ManchesterDecoder", "ManchesterMode", "decode_manchester"]