Coverage for src / tracekit / analyzers / protocols / flexray.py: 93%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

"""FlexRay protocol decoder.

This module implements a FlexRay automotive protocol decoder with support
for static and dynamic segments, 10 Mbps signaling, and CRC validation.

Example:
    >>> from tracekit.analyzers.protocols.flexray import FlexRayDecoder
    >>> decoder = FlexRayDecoder()
    >>> for packet in decoder.decode(bp=bp, bm=bm):
    ...     print(f"Slot: {packet.annotations['slot_id']}")

References:
    FlexRay Communications System Protocol Specification Version 3.0.1
"""

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass 

20from enum import Enum 

21from typing import TYPE_CHECKING 

22 

23from tracekit.analyzers.protocols.base import ( 

24 AnnotationLevel, 

25 AsyncDecoder, 

26 ChannelDef, 

27 OptionDef, 

28) 

29from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace 

30 

31if TYPE_CHECKING: 

32 from collections.abc import Iterator 

33 

34 import numpy as np 

35 from numpy.typing import NDArray 

36 

37 

class FlexRaySegment(Enum):
    """FlexRay communication segment types.

    Each member's value is the lowercase string that the decoder stores in
    a packet's ``"segment"`` annotation.
    """

    STATIC = "static"
    DYNAMIC = "dynamic"
    SYMBOL = "symbol"

44 

45 

@dataclass
class FlexRayFrame:
    """Decoded FlexRay frame.

    Attributes:
        slot_id: Slot identifier (1-2047).
        cycle_count: Cycle counter (0-63).
        payload_length: Payload length in 16-bit words (0-127).
        header_crc: Header CRC value.
        payload: Payload data bytes.
        frame_crc: Frame CRC value (24-bit).
        segment: Segment type (static or dynamic).
        timestamp: Frame start time in seconds.
        errors: List of detected errors.
    """

    slot_id: int
    cycle_count: int
    payload_length: int
    header_crc: int
    payload: bytes
    frame_crc: int
    segment: FlexRaySegment
    timestamp: float
    errors: list[str]

71 

72 

class FlexRayDecoder(AsyncDecoder):
    """FlexRay protocol decoder.

    Scans a sampled bus signal for a low-low-high start pattern, then reads
    a fixed 40-bit header, the payload and a 24-bit trailer, and reports the
    slot ID, cycle count, payload length, header CRC and frame CRC of each
    frame. The CRC fields are extracted but not verified, and every frame is
    tagged as belonging to the static segment.

    Attributes:
        id: "flexray"
        name: "FlexRay"
        channels: [bp, bm] (differential pair)

    Example:
        >>> decoder = FlexRayDecoder(bitrate=10000000)
        >>> for packet in decoder.decode(bp=bp, bm=bm, sample_rate=100e6):
        ...     print(f"Slot {packet.annotations['slot_id']}, Cycle {packet.annotations['cycle_count']}")
    """

    id = "flexray"
    name = "FlexRay"
    longname = "FlexRay Automotive Network"
    desc = "FlexRay protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("bp", "BP", "FlexRay Bus Plus", required=True),
        ChannelDef("bm", "BM", "FlexRay Bus Minus", required=True),
    ]

    optional_channels = []  # noqa: RUF012

    options = [  # noqa: RUF012
        OptionDef(
            "bitrate",
            "Bitrate",
            "Bits per second",
            default=10000000,
            values=[2500000, 5000000, 10000000],
        ),
    ]

    annotations = [  # noqa: RUF012
        ("tss", "Transmission Start Sequence"),
        ("fss", "Frame Start Sequence"),
        ("header", "Frame header"),
        ("payload", "Payload"),
        ("crc", "Frame CRC"),
        ("error", "Error"),
    ]

    # FlexRay constants
    TSS_LENGTH = 3  # Transmission Start Sequence (Low + Low + High)
    FSS_LENGTH = 1  # Frame Start Sequence (Low)
    BSS_LENGTH = 1  # Byte Start Sequence

    def __init__(
        self,
        bitrate: int = 10000000,
    ) -> None:
        """Initialize FlexRay decoder.

        Args:
            bitrate: FlexRay bitrate in bps (2.5, 5, or 10 Mbps).
        """
        super().__init__(baudrate=bitrate, bitrate=bitrate)
        self._bitrate = bitrate

    def decode(  # type: ignore[override]
        self,
        trace: DigitalTrace | WaveformTrace | None = None,
        *,
        bp: NDArray[np.bool_] | None = None,
        bm: NDArray[np.bool_] | None = None,
        sample_rate: float = 1.0,
    ) -> Iterator[ProtocolPacket]:
        """Decode FlexRay frames.

        When ``trace`` is given, its samples replace ``bp`` and its metadata
        supplies the sample rate; a ``WaveformTrace`` is digitized first with
        an automatic threshold. A lone trace is decoded as the BP line even
        without ``bm``. Without a trace, both ``bp`` and ``bm`` are required
        and nothing is yielded if either is missing.

        Args:
            trace: Optional input trace.
            bp: Bus Plus signal.
            bm: Bus Minus signal.
            sample_rate: Sample rate in Hz.

        Yields:
            Decoded FlexRay frames as ProtocolPacket objects.

        Example:
            >>> decoder = FlexRayDecoder(bitrate=10000000)
            >>> for pkt in decoder.decode(bp=bp, bm=bm, sample_rate=100e6):
            ...     print(f"Slot: {pkt.annotations['slot_id']}")
        """
        if trace is not None:
            if isinstance(trace, WaveformTrace):
                from tracekit.analyzers.digital.extraction import to_digital

                digital_trace = to_digital(trace, threshold="auto")
            else:
                digital_trace = trace
            bp = digital_trace.data
            sample_rate = digital_trace.metadata.sample_rate

        if bp is None:
            return

        if bm is None:
            # Only BP is read below, so a single trace can be decoded alone.
            # Without a trace, a missing BM still means "nothing to decode".
            if trace is None:
                return
        else:
            # BM only bounds how many BP samples are examined.
            n_samples = min(len(bp), len(bm))
            bp = bp[:n_samples]

        # Simplified differential decode: BP alone is the logic-level signal.
        diff_signal = bp

        # Bit period expressed in samples.
        bit_period = sample_rate / self._bitrate

        # Always advance by at least one sample: int(bit_period) is 0 when
        # the sample rate is below the bitrate (e.g. the default 1.0 Hz),
        # which would otherwise stall the scan loop forever.
        min_advance = max(1, int(bit_period))

        frame_num = 0
        idx = 0

        while idx < len(diff_signal):
            # Look for TSS (Transmission Start Sequence)
            tss_idx = self._find_tss(diff_signal, idx, bit_period)
            if tss_idx is None:
                break

            # Decode frame
            frame, end_idx = self._decode_frame(diff_signal, tss_idx, sample_rate, bit_period)

            if frame is not None:
                # The annotation span is a fixed 1 ms placeholder, not the
                # measured frame duration.
                self.put_annotation(
                    frame.timestamp,
                    frame.timestamp + 0.001,
                    AnnotationLevel.PACKETS,
                    f"Slot {frame.slot_id}, Cycle {frame.cycle_count}",
                )

                # Create packet
                annotations = {
                    "frame_num": frame_num,
                    "slot_id": frame.slot_id,
                    "cycle_count": frame.cycle_count,
                    "payload_length": frame.payload_length,
                    "header_crc": frame.header_crc,
                    "frame_crc": frame.frame_crc,
                    "segment": frame.segment.value,
                }

                packet = ProtocolPacket(
                    timestamp=frame.timestamp,
                    protocol="flexray",
                    data=frame.payload,
                    annotations=annotations,
                    errors=frame.errors,
                )

                yield packet
                frame_num += 1

            # Resume after the decoded region, or step forward if the frame
            # decoder made no progress past the current position.
            idx = end_idx if end_idx > idx else idx + min_advance

    def _find_tss(
        self,
        data: NDArray[np.bool_],
        start_idx: int,
        bit_period: float,
    ) -> int | None:
        """Find Transmission Start Sequence.

        Slides a three-bit window over ``data`` in quarter-bit steps (at
        least one sample) and samples each bit at its centre, looking for a
        low, low, high pattern.

        Args:
            data: Digital data array.
            start_idx: Start search index.
            bit_period: Bit period in samples.

        Returns:
            Index of TSS start, or None if not found.
        """
        # A non-positive bit period cannot describe a real signal, and the
        # window arithmetic below would never terminate with it.
        if bit_period <= 0:
            return None

        n_samples = len(data)
        search_end = n_samples - int(3 * bit_period)

        # int(bit_period / 4) is 0 for bit periods under four samples; force
        # forward progress so the scan always terminates.
        step = max(1, int(bit_period / 4))

        idx = start_idx
        while idx < search_end:
            # Sample at bit centers
            sample1_idx = int(idx + bit_period / 2)
            sample2_idx = int(idx + 1.5 * bit_period)
            sample3_idx = int(idx + 2.5 * bit_period)

            in_range = sample1_idx < n_samples and sample2_idx < n_samples and sample3_idx < n_samples
            # Check for low, low, high pattern
            if in_range and not data[sample1_idx] and not data[sample2_idx] and data[sample3_idx]:
                return idx

            idx += step

        return None

    @staticmethod
    def _bits_to_int(bits: list[int]) -> int:
        """Fold a most-significant-bit-first list of 0/1 values into an int."""
        value = 0
        for bit in bits:
            value = (value << 1) | bit
        return value

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        tss_idx: int,
        sample_rate: float,
        bit_period: float,
    ) -> tuple[FlexRayFrame | None, int]:
        """Decode FlexRay frame starting from TSS.

        Reads, in order: one FSS bit, a 40-bit header, ``payload_length * 2``
        payload bytes and a 24-bit frame CRC, sampling each bit at its
        centre. Running out of samples mid-header aborts the frame; running
        out mid-payload records an "Incomplete payload" error and keeps the
        bytes read so far.

        Args:
            data: Digital data array.
            tss_idx: TSS index.
            sample_rate: Sample rate in Hz.
            bit_period: Bit period in samples.

        Returns:
            (frame, end_index) tuple. ``frame`` is None when fewer than 40
            header bits are available; ``end_index`` is the sample position
            reached by the reader.
        """
        errors: list[str] = []

        # Fractional read position in samples, starting just past the
        # three-bit TSS.
        position: float = tss_idx + int(3 * bit_period)

        def sample_bits(count: int) -> list[int]:
            """Read up to ``count`` bits at bit centres; stop early at end of data."""
            nonlocal position
            bits: list[int] = []
            for _ in range(count):
                sample_idx = int(position + bit_period / 2)
                if sample_idx >= len(data):
                    break
                bits.append(1 if data[sample_idx] else 0)
                position += bit_period
            return bits

        # FSS (Frame Start Sequence) - 1 bit, expected low
        fss_bits = sample_bits(1)
        if not fss_bits or fss_bits[0] != 0:
            errors.append("Invalid FSS")

        # Header (5 bytes = 40 bits)
        # Byte 1: Reserved (1) + Payload preamble (1) + NULL frame (1) + Sync (1) + Startup (1) + Slot ID[10:8] (3)
        # Byte 2: Slot ID[7:0] (8)
        # Byte 3: Header CRC[10:3] (8)
        # Byte 4: Header CRC[2:0] (3) + Cycle count[5:0] (6) - split to bits 7:5 and 4:0
        # Byte 5: Cycle count continued + Payload length[6:0] (7)
        #
        # NOTE(review): the slices below are a simplified layout. They start
        # the slot ID at bit 4 although the byte-1 description above lists
        # five indicator bits, and the FlexRay spec appears to order the
        # fields as frame ID, payload length, header CRC, cycle count.
        # Confirm against the spec before decoding real bus captures.
        header_bits = sample_bits(40)
        if len(header_bits) < 40:
            return None, int(position)

        # Extract header fields (simplified)
        slot_id = self._bits_to_int(header_bits[4:15])  # 11 bits: 4-14
        header_crc = self._bits_to_int(header_bits[15:26])  # 11 bits: 15-25
        cycle_count = self._bits_to_int(header_bits[26:32])  # 6 bits: 26-31
        payload_length = self._bits_to_int(header_bits[33:40])  # 7 bits: 33-39

        # Payload (payload_length * 2 bytes, as length is in 16-bit words)
        payload_bytes: list[int] = []
        for _ in range(payload_length * 2):
            byte_bits = sample_bits(8)
            if len(byte_bits) != 8:
                errors.append("Incomplete payload")
                break
            payload_bytes.append(self._bits_to_int(byte_bits))

        # Frame CRC (24 bits); a truncated read yields a shorter value
        frame_crc = self._bits_to_int(sample_bits(24))

        # Create frame
        frame = FlexRayFrame(
            slot_id=slot_id,
            cycle_count=cycle_count,
            payload_length=payload_length,
            header_crc=header_crc,
            payload=bytes(payload_bytes),
            frame_crc=frame_crc,
            segment=FlexRaySegment.STATIC,  # Simplified: assume static
            timestamp=tss_idx / sample_rate,
            errors=errors,
        )

        return frame, int(position)

377 

378 

def decode_flexray(
    bp: NDArray[np.bool_],
    bm: NDArray[np.bool_],
    sample_rate: float = 1.0,
    bitrate: int = 10000000,
) -> list[ProtocolPacket]:
    """Convenience function to decode FlexRay frames.

    Builds a ``FlexRayDecoder`` for ``bitrate`` and collects everything its
    ``decode`` generator yields into a list.

    Args:
        bp: Bus Plus signal.
        bm: Bus Minus signal.
        sample_rate: Sample rate in Hz.
        bitrate: FlexRay bitrate in bps.

    Returns:
        List of decoded FlexRay frames.

    Example:
        >>> packets = decode_flexray(bp, bm, sample_rate=100e6, bitrate=10000000)
        >>> for pkt in packets:
        ...     print(f"Slot: {pkt.annotations['slot_id']}")
    """
    decoder = FlexRayDecoder(bitrate=bitrate)
    return list(decoder.decode(bp=bp, bm=bm, sample_rate=sample_rate))

403 

404 

# Public API of this module.
__all__ = ["FlexRayDecoder", "FlexRayFrame", "FlexRaySegment", "decode_flexray"]