Coverage for src/tracekit/analyzers/protocols/flexray.py: 93%
131 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""FlexRay protocol decoder.
3This module implements a FlexRay automotive protocol decoder with support
4for static and dynamic segments, 10 Mbps signaling, and CRC validation.
7Example:
8 >>> from tracekit.analyzers.protocols.flexray import FlexRayDecoder
9 >>> decoder = FlexRayDecoder()
10 >>> for packet in decoder.decode(bp=bp, bm=bm):
11 ... print(f"Slot: {packet.annotations['slot_id']}")
13References:
14 FlexRay Communications System Protocol Specification Version 3.0.1
15"""
17from __future__ import annotations
19from dataclasses import dataclass
20from enum import Enum
21from typing import TYPE_CHECKING
23from tracekit.analyzers.protocols.base import (
24 AnnotationLevel,
25 AsyncDecoder,
26 ChannelDef,
27 OptionDef,
28)
29from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace
31if TYPE_CHECKING:
32 from collections.abc import Iterator
34 import numpy as np
35 from numpy.typing import NDArray
class FlexRaySegment(Enum):
    """Segment of the FlexRay communication cycle a frame is attributed to.

    Each member's value is the lowercase string form of its name.
    """

    # Static segment.
    STATIC = "static"
    # Dynamic segment.
    DYNAMIC = "dynamic"
    # Symbol window.
    SYMBOL = "symbol"
@dataclass
class FlexRayFrame:
    """One decoded FlexRay frame.

    A plain record of the fields extracted from a frame; no validation is
    performed on construction.

    Attributes:
        slot_id: Slot identifier (1-2047).
        cycle_count: Cycle counter (0-63).
        payload_length: Payload length in 16-bit words (0-127).
        header_crc: Header CRC value.
        payload: Payload data bytes.
        frame_crc: Frame CRC value (24-bit).
        segment: Segment type (static or dynamic).
        timestamp: Frame start time in seconds.
        errors: Messages describing problems detected while decoding.
    """

    # Header fields.
    slot_id: int
    cycle_count: int
    payload_length: int
    header_crc: int

    # Payload and trailer.
    payload: bytes
    frame_crc: int

    # Decoder-supplied metadata.
    segment: FlexRaySegment
    timestamp: float
    errors: list[str]
class FlexRayDecoder(AsyncDecoder):
    """FlexRay protocol decoder.

    Scans a digitised FlexRay bus capture for frames and extracts, for each
    one, the slot ID, cycle count, payload length, header CRC, payload bytes
    and frame CRC. The CRC fields are extracted but not verified, and every
    frame is reported as belonging to the static segment.

    Attributes:
        id: "flexray"
        name: "FlexRay"
        channels: [bp, bm] (differential pair)

    Example:
        >>> decoder = FlexRayDecoder(bitrate=10000000)
        >>> for packet in decoder.decode(bp=bp, bm=bm, sample_rate=100e6):
        ...     print(f"Slot {packet.annotations['slot_id']}, Cycle {packet.annotations['cycle_count']}")
    """

    id = "flexray"
    name = "FlexRay"
    longname = "FlexRay Automotive Network"
    desc = "FlexRay protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("bp", "BP", "FlexRay Bus Plus", required=True),
        ChannelDef("bm", "BM", "FlexRay Bus Minus", required=True),
    ]

    optional_channels = []  # noqa: RUF012

    options = [  # noqa: RUF012
        OptionDef(
            "bitrate",
            "Bitrate",
            "Bits per second",
            default=10000000,
            values=[2500000, 5000000, 10000000],
        ),
    ]

    annotations = [  # noqa: RUF012
        ("tss", "Transmission Start Sequence"),
        ("fss", "Frame Start Sequence"),
        ("header", "Frame header"),
        ("payload", "Payload"),
        ("crc", "Frame CRC"),
        ("error", "Error"),
    ]

    # FlexRay constants (lengths in bit times)
    TSS_LENGTH = 3  # Transmission Start Sequence (Low + Low + High)
    FSS_LENGTH = 1  # Frame Start Sequence (Low)
    BSS_LENGTH = 1  # Byte Start Sequence

    def __init__(
        self,
        bitrate: int = 10000000,
    ) -> None:
        """Initialize FlexRay decoder.

        Args:
            bitrate: FlexRay bitrate in bps (2.5, 5, or 10 Mbps).
        """
        super().__init__(baudrate=bitrate, bitrate=bitrate)
        self._bitrate = bitrate

    def decode(  # type: ignore[override]
        self,
        trace: DigitalTrace | WaveformTrace | None = None,
        *,
        bp: NDArray[np.bool_] | None = None,
        bm: NDArray[np.bool_] | None = None,
        sample_rate: float = 1.0,
    ) -> Iterator[ProtocolPacket]:
        """Decode FlexRay frames.

        Only the BP line is used to recover bits; BM, when supplied, merely
        limits how many BP samples are examined.

        Args:
            trace: Optional input trace. When given, its data is used as the
                BP line and its metadata sample rate replaces ``sample_rate``.
                A ``WaveformTrace`` is first converted with
                ``to_digital(trace, threshold="auto")``.
            bp: Bus Plus signal.
            bm: Bus Minus signal.
            sample_rate: Sample rate in Hz.

        Yields:
            Decoded FlexRay frames as ProtocolPacket objects. Nothing is
            yielded when no BP data is available, or when BP/BM are passed
            as arrays and BM is missing.

        Example:
            >>> decoder = FlexRayDecoder(bitrate=10000000)
            >>> for pkt in decoder.decode(bp=bp, bm=bm, sample_rate=100e6):
            ...     print(f"Slot: {pkt.annotations['slot_id']}")
        """
        if trace is not None:
            if isinstance(trace, WaveformTrace):
                from tracekit.analyzers.digital.extraction import to_digital

                digital_trace = to_digital(trace, threshold="auto")
            else:
                digital_trace = trace
            bp = digital_trace.data
            sample_rate = digital_trace.metadata.sample_rate

        if bp is None:
            return

        if bm is None:
            # A trace carries a single line, so there is no BM to pair with.
            # Previously this fell into the "missing channel" exit and a
            # trace could never produce a frame. Decode from BP alone.
            if trace is None:
                return
        else:
            n_samples = min(len(bp), len(bm))
            bp = bp[:n_samples]

        # Decode differential signal
        #   IdleLow: BP=0, BM=1 -> 0
        #   Data0:   BP=1, BM=0 -> 1
        #   Data1:   BP=0, BM=1 -> 0
        # Simplified: use BP as primary signal
        diff_signal = bp

        bit_period = sample_rate / self._bitrate  # samples per bit

        # With fewer than one sample per bit int(bit_period) is 0; always
        # move forward by at least one sample so the scan terminates.
        min_advance = max(1, int(bit_period))

        frame_num = 0
        idx = 0

        while idx < len(diff_signal):
            # Look for TSS (Transmission Start Sequence)
            tss_idx = self._find_tss(diff_signal, idx, bit_period)
            if tss_idx is None:
                break

            frame, end_idx = self._decode_frame(diff_signal, tss_idx, sample_rate, bit_period)

            if frame is not None:
                # The annotation spans a fixed 1 ms from the frame start.
                self.put_annotation(
                    frame.timestamp,
                    frame.timestamp + 0.001,
                    AnnotationLevel.PACKETS,
                    f"Slot {frame.slot_id}, Cycle {frame.cycle_count}",
                )

                annotations = {
                    "frame_num": frame_num,
                    "slot_id": frame.slot_id,
                    "cycle_count": frame.cycle_count,
                    "payload_length": frame.payload_length,
                    "header_crc": frame.header_crc,
                    "frame_crc": frame.frame_crc,
                    "segment": frame.segment.value,
                }

                packet = ProtocolPacket(
                    timestamp=frame.timestamp,
                    protocol="flexray",
                    data=frame.payload,
                    annotations=annotations,
                    errors=frame.errors,
                )

                yield packet
                frame_num += 1

            idx = end_idx if end_idx > idx else idx + min_advance

    def _find_tss(
        self,
        data: NDArray[np.bool_],
        start_idx: int,
        bit_period: float,
    ) -> int | None:
        """Find Transmission Start Sequence.

        Slides a three-bit window over ``data`` in quarter-bit steps and
        samples the centre of each bit, looking for low, low, high.

        Args:
            data: Digital data array.
            start_idx: Start search index.
            bit_period: Bit period in samples.

        Returns:
            Index of TSS start, or None if not found.
        """
        n_samples = len(data)
        window = int(self.TSS_LENGTH * bit_period)

        # int(bit_period / 4) is 0 when there are fewer than four samples per
        # bit, which used to leave the loop spinning on the same index.
        step = max(1, int(bit_period / 4))

        idx = start_idx
        while idx < n_samples - window:
            # Sample at bit centers
            first = int(idx + bit_period / 2)
            second = int(idx + 1.5 * bit_period)
            third = int(idx + 2.5 * bit_period)

            in_range = first < n_samples and second < n_samples and third < n_samples
            # TSS pattern: Low (idle), Low (data0), High (data1)
            if in_range and not data[first] and not data[second] and data[third]:
                return idx

            idx += step

        return None

    @staticmethod
    def _bits_to_int(bits: list[int]) -> int:
        """Fold a most-significant-bit-first list of 0/1 values into an int."""
        value = 0
        for bit in bits:
            value = (value << 1) | bit
        return value

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        tss_idx: int,
        sample_rate: float,
        bit_period: float,
    ) -> tuple[FlexRayFrame | None, int]:
        """Decode FlexRay frame starting from TSS.

        Reads, in order: 1 FSS bit, 40 header bits, ``payload_length * 2``
        payload bytes and up to 24 frame-CRC bits, each sampled at the bit
        centre. No byte start sequences are skipped and no CRC is checked.

        Args:
            data: Digital data array.
            tss_idx: TSS index.
            sample_rate: Sample rate in Hz.
            bit_period: Bit period in samples.

        Returns:
            (frame, end_index) tuple. ``frame`` is None when the data ends
            before the 40 header bits have been read. ``end_index`` is the
            sample position reached by the bit reader.
        """
        errors: list[str] = []

        # Fractional read position; starts just after the TSS.
        bit_idx: float = tss_idx + int(self.TSS_LENGTH * bit_period)

        def sample_bits(count: int) -> list[int]:
            """Read up to ``count`` bits, stopping early when data runs out."""
            nonlocal bit_idx
            bits: list[int] = []
            for _ in range(count):
                sample_idx = int(bit_idx + bit_period / 2)
                if sample_idx >= len(data):
                    break
                bits.append(1 if data[sample_idx] else 0)
                bit_idx += bit_period
            return bits

        # FSS (Frame Start Sequence) - 1 bit, expected low
        fss_bits = sample_bits(1)
        if not fss_bits or fss_bits[0] != 0:
            errors.append("Invalid FSS")

        # Header (5 bytes = 40 bits)
        header_bits = sample_bits(40)
        if len(header_bits) < 40:
            return None, int(bit_idx)

        # Extract header fields (simplified), each MSB first:
        #   bits 4-14   slot ID (11 bits)
        #   bits 15-25  header CRC (11 bits)
        #   bits 26-31  cycle count (6 bits)
        #   bits 33-39  payload length (7 bits)
        # NOTE(review): the FlexRay specification appears to order the header
        # as 5 indicator bits, frame ID (11), payload length (7), header CRC
        # (11), cycle count (6), which would put the frame ID at bits 5-15.
        # Confirm against the spec and the signal generators that feed this
        # decoder before changing the slices.
        slot_id = self._bits_to_int(header_bits[4:15])
        header_crc = self._bits_to_int(header_bits[15:26])
        cycle_count = self._bits_to_int(header_bits[26:32])
        payload_length = self._bits_to_int(header_bits[33:40])

        # Payload (payload_length * 2 bytes, as length is in 16-bit words)
        payload_bytes: list[int] = []
        for _ in range(payload_length * 2):
            byte_bits = sample_bits(8)
            if len(byte_bits) != 8:
                errors.append("Incomplete payload")
                break
            payload_bytes.append(self._bits_to_int(byte_bits))

        # Frame CRC (24 bits). If the capture ends early the value is built
        # from whatever bits were available and no error is recorded.
        frame_crc = self._bits_to_int(sample_bits(24))

        frame = FlexRayFrame(
            slot_id=slot_id,
            cycle_count=cycle_count,
            payload_length=payload_length,
            header_crc=header_crc,
            payload=bytes(payload_bytes),
            frame_crc=frame_crc,
            segment=FlexRaySegment.STATIC,  # Simplified: assume static
            timestamp=tss_idx / sample_rate,
            errors=errors,
        )

        return frame, int(bit_idx)
def decode_flexray(
    bp: NDArray[np.bool_],
    bm: NDArray[np.bool_],
    sample_rate: float = 1.0,
    bitrate: int = 10000000,
) -> list[ProtocolPacket]:
    """Decode a BP/BM capture in one call and return every packet found.

    Builds a ``FlexRayDecoder`` for ``bitrate`` and collects everything its
    ``decode`` generator yields.

    Args:
        bp: Bus Plus signal.
        bm: Bus Minus signal.
        sample_rate: Sample rate in Hz.
        bitrate: FlexRay bitrate in bps.

    Returns:
        List of decoded FlexRay frames.

    Example:
        >>> packets = decode_flexray(bp, bm, sample_rate=100e6, bitrate=10000000)
        >>> for pkt in packets:
        ...     print(f"Slot: {pkt.annotations['slot_id']}")
    """
    packet_stream = FlexRayDecoder(bitrate=bitrate).decode(
        bp=bp,
        bm=bm,
        sample_rate=sample_rate,
    )
    return [*packet_stream]
# Public API of this module.
__all__ = [
    "FlexRayDecoder",
    "FlexRayFrame",
    "FlexRaySegment",
    "decode_flexray",
]