Coverage for src / tracekit / analyzers / digital / bus.py: 96%
222 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Configurable multi-bit parallel bus decoding.
3This module provides configurable bus decoding for parallel digital signals,
4supporting various bit orderings, active-low signaling, and clock-based
5or interval-based sampling strategies.
8Example:
9 >>> import numpy as np
10 >>> from tracekit.analyzers.digital.bus import BusConfig, BusDecoder
11 >>> # Define 8-bit bus configuration
12 >>> config = BusConfig(name="data_bus", width=8, bit_order='lsb_first')
13 >>> config.bits = [{'channel': i, 'bit': i, 'name': f'D{i}'} for i in range(8)]
14 >>> # Create decoder
15 >>> decoder = BusDecoder(config, sample_rate=100e6)
16 >>> # Decode bus values from bit traces
17 >>> bit_traces = {i: np.random.randint(0, 2, 1000, dtype=np.uint8) for i in range(8)}
18 >>> transactions = decoder.decode_bus(bit_traces)
19"""
21from __future__ import annotations
23from dataclasses import dataclass, field
24from pathlib import Path
25from typing import TYPE_CHECKING, Any, Literal
27import numpy as np
29if TYPE_CHECKING:
30 from numpy.typing import NDArray
@dataclass
class BusConfig:
    """Configuration for parallel bus decoding.

    Attributes:
        name: Descriptive name for the bus.
        width: Number of bits in the bus.
        bit_order: Bit ordering, 'lsb_first' or 'msb_first'.
        active_low: Whether signals are active-low (inverted).
        bits: List of bit definitions with channel mapping, e.g.
            ``[{'channel': 0, 'bit': 0, 'name': 'D0'}, ...]``.

    Example:
        >>> config = BusConfig(name="addr_bus", width=12, bit_order='lsb_first')
        >>> config.bits = [{'channel': i, 'bit': i} for i in range(12)]
    """

    name: str
    width: int  # Number of bits
    bit_order: Literal["lsb_first", "msb_first"] = "lsb_first"
    active_low: bool = False
    bits: list[dict[str, Any]] = field(
        default_factory=list
    )  # [{channel: 0, bit: 0, name: 'D0'}, ...]

    def __post_init__(self) -> None:
        """Validate bus configuration.

        Raises:
            ValueError: If ``width`` is not positive or ``bit_order`` is not
                one of the two supported orderings.
        """
        if self.width <= 0:
            raise ValueError(f"Bus width must be positive, got {self.width}")
        if self.bit_order not in ("lsb_first", "msb_first"):
            raise ValueError(f"Invalid bit_order: {self.bit_order}")

    @classmethod
    def from_yaml(cls, path: str | Path) -> BusConfig:
        """Load bus configuration from YAML file.

        Args:
            path: Path to YAML configuration file.

        Returns:
            BusConfig instance loaded from file.

        Raises:
            ImportError: If PyYAML is not installed.
            FileNotFoundError: If file does not exist.
            ValueError: If the file does not contain a top-level mapping
                (for example, an empty file or a bare list).
        """
        try:
            import yaml
        except ImportError as e:
            raise ImportError(
                "PyYAML is required for YAML loading. Install with: pip install pyyaml"
            ) from e

        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Configuration file not found: {path}")

        # Hand PyYAML a byte stream so it detects the encoding itself
        # instead of depending on the platform's locale default.
        with path.open("rb") as f:
            config_dict = yaml.safe_load(f)

        # safe_load returns None for an empty document and may return a list
        # or scalar; fail clearly rather than with an AttributeError later.
        if not isinstance(config_dict, dict):
            raise ValueError(
                f"Configuration file must contain a mapping at top level: {path}"
            )

        return cls.from_dict(config_dict)

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> BusConfig:
        """Create bus configuration from dictionary.

        Missing optional keys fall back to the dataclass defaults (``name``
        falls back to ``"bus"``). The ``bits`` list is shallow-copied so that
        later edits to the returned config do not modify the caller's
        dictionary; a missing or null ``bits`` entry yields an empty list.

        Args:
            config: Dictionary with bus configuration parameters.

        Returns:
            BusConfig instance created from dictionary.

        Raises:
            KeyError: If the required ``width`` key is missing.
            ValueError: If the resulting configuration fails validation.

        Example:
            >>> config_dict = {
            ...     'name': 'data_bus',
            ...     'width': 8,
            ...     'bit_order': 'lsb_first',
            ...     'bits': [{'channel': i, 'bit': i} for i in range(8)]
            ... }
            >>> config = BusConfig.from_dict(config_dict)
        """
        bit_defs = config.get("bits") or []
        return cls(
            name=config.get("name", "bus"),
            width=config["width"],
            bit_order=config.get("bit_order", "lsb_first"),
            active_low=config.get("active_low", False),
            bits=list(bit_defs),
        )
@dataclass
class ParallelBusConfig:
    """Simplified description of a parallel bus.

    A lighter-weight alternative to the full BusConfig, intended as a
    convenience for tests; it can be expanded into a BusConfig with
    :meth:`to_bus_config`.

    Attributes:
        data_width: Number of data bits in the bus.
        bit_order: Bit ordering, 'lsb_first' or 'msb_first'.
        has_clock: Whether the bus uses a clock signal.
        address_width: Optional number of address bits.
        active_low: Whether signals are active-low.

    Example:
        >>> config = ParallelBusConfig(data_width=8, bit_order='lsb_first')
    """

    data_width: int
    bit_order: Literal["lsb_first", "msb_first"] = "lsb_first"
    has_clock: bool = False
    address_width: int | None = None
    active_low: bool = False

    def __post_init__(self) -> None:
        """Reject non-positive widths.

        Raises:
            ValueError: If ``data_width`` is not positive, or if
                ``address_width`` is given and is not positive.
        """
        if self.data_width <= 0:
            raise ValueError(f"data_width must be positive, got {self.data_width}")

        # The address width is optional; only validate it when supplied.
        if self.address_width is None:
            return
        if self.address_width <= 0:
            raise ValueError(f"address_width must be positive, got {self.address_width}")

    def to_bus_config(self, name: str = "parallel_bus") -> BusConfig:
        """Expand into a full BusConfig.

        Channel ``i`` is mapped to bit ``i`` for every data bit.

        Args:
            name: Name for the bus configuration.

        Returns:
            BusConfig instance.
        """
        identity_map = []
        for position in range(self.data_width):
            identity_map.append({"channel": position, "bit": position})

        return BusConfig(
            name=name,
            width=self.data_width,
            bit_order=self.bit_order,
            active_low=self.active_low,
            bits=identity_map,
        )
@dataclass
class BusTransaction:
    """One decoded sample of the bus.

    Attributes:
        timestamp: Time of the sample in seconds.
        sample_index: Index of the sample in the original traces.
        value: Bus value decoded as an integer.
        raw_bits: Per-bit values (after active-low inversion if applicable).
        transaction_type: Optional label for the kind of transaction.
        address: Optional address field, when an address bus is present.
        data: Optional data field, when this carries a data value.
    """

    # Required fields
    timestamp: float  # seconds
    sample_index: int
    value: int  # integer bus value
    raw_bits: list[int]  # one entry per bit

    # Optional fields
    transaction_type: str = ""  # e.g. 'read', 'write'
    address: int | None = None  # set when an address bus is present
    data: int | None = None  # set when a data value is present
class BusDecoder:
    """Decode multi-bit parallel buses from individual bit traces.

    Supports configurable bit ordering, active-low signaling, and various
    sampling strategies (clock-based or interval-based).

    Two families of methods are offered:

    * ``decode_bus`` / ``sample_at_clock`` / ``sample_at_intervals`` take a
      ``{channel: trace}`` dictionary, follow ``config.bits`` for the channel
      mapping and return BusTransaction objects.
    * ``decode_parallel`` / ``decode_with_clock`` / ``decode_transactions``
      take plain lists of channel arrays (list position is the bit position)
      and return plain integers or dictionaries.

    In every method a sample that lies beyond the end of a trace, or that
    belongs to a channel that was not supplied, reads as 0 before any
    active-low inversion is applied.

    Attributes:
        config: Bus configuration specifying width, ordering, etc.
        sample_rate: Sample rate of input traces in Hz.

    Example:
        >>> config = BusConfig(name="data", width=8, bit_order='lsb_first')
        >>> config.bits = [{'channel': i, 'bit': i} for i in range(8)]
        >>> decoder = BusDecoder(config, sample_rate=100e6)
        >>> bit_traces = {i: trace_data for i in range(8)}
        >>> transactions = decoder.decode_bus(bit_traces)
    """

    def __init__(
        self,
        config: BusConfig | ParallelBusConfig,
        sample_rate: float = 1.0,
    ):
        """Initialize decoder with configuration.

        Args:
            config: Bus configuration (BusConfig or ParallelBusConfig).
            sample_rate: Sample rate of input traces in Hz.

        Raises:
            ValueError: If sample rate is invalid.
        """
        if sample_rate <= 0:
            raise ValueError(f"Sample rate must be positive, got {sample_rate}")

        # A ParallelBusConfig is kept for reference and expanded into the
        # full BusConfig that the decoding methods work from.
        self._parallel_config: ParallelBusConfig | None
        if isinstance(config, ParallelBusConfig):
            self._parallel_config = config
            self.config = config.to_bus_config()
        else:
            self._parallel_config = None
            self.config = config

        self.sample_rate = sample_rate
        self._time_base = 1.0 / sample_rate  # seconds per sample

    def decode_bus(
        self,
        bit_traces: dict[int, NDArray[np.uint8]],  # channel_index -> trace data
        clock_trace: NDArray[np.uint8] | None = None,
        clock_edge: Literal["rising", "falling"] = "rising",
    ) -> list[BusTransaction]:
        """Decode bus values from individual bit traces.

        With a clock trace the bus is sampled at the selected clock edges;
        without one, every sample is decoded.

        Args:
            bit_traces: Dictionary mapping channel index to trace data (boolean or 0/1).
            clock_trace: Optional clock signal for synchronous sampling.
            clock_edge: Which clock edge to sample on ('rising' or 'falling').

        Returns:
            List of BusTransaction objects with decoded values.

        Raises:
            ValueError: If ``bit_traces`` is empty.

        Example:
            >>> bit_traces = {0: np.array([0,1,1,0]), 1: np.array([1,1,0,0])}
            >>> transactions = decoder.decode_bus(bit_traces)
        """
        if not bit_traces:
            raise ValueError("bit_traces cannot be empty")

        if clock_trace is not None:
            return self.sample_at_clock(bit_traces, clock_trace, clock_edge)
        return self.sample_at_intervals(bit_traces, interval_samples=1)

    def decode_parallel(
        self,
        channels: list[NDArray[np.uint8]],
    ) -> list[int]:
        """Decode parallel bus values from channel list.

        Simplified interface for parallel bus decoding without clock. The
        number of decoded samples equals the length of the first channel;
        channels shorter than that read as 0 past their end.

        Args:
            channels: List of channel data arrays, indexed by bit position.

        Returns:
            List of decoded integer values (one per sample).

        Example:
            >>> channels = [ch0, ch1, ch2, ch3]  # 4-bit bus
            >>> values = decoder.decode_parallel(channels)
        """
        if not channels:
            return []

        trace_length = len(channels[0])
        return [
            self._decode_channels(channels, sample_idx)
            for sample_idx in range(trace_length)
        ]

    def decode_with_clock(
        self,
        channels: list[NDArray[np.uint8]],
        clock: NDArray[np.uint8],
        edge: Literal["rising", "falling"] = "rising",
    ) -> list[int]:
        """Decode parallel bus values at clock edges.

        Args:
            channels: List of channel data arrays, indexed by bit position.
            clock: Clock signal trace (boolean or 0/1).
            edge: Which edge to sample on ('rising' or 'falling').

        Returns:
            List of decoded integer values (one per clock edge).

        Example:
            >>> values = decoder.decode_with_clock(channels, clock, 'rising')
        """
        if not channels:
            return []

        return [
            self._decode_channels(channels, int(edge_idx))
            for edge_idx in self._find_edges(clock, edge)
        ]

    def decode_transactions(
        self,
        address_channels: list[NDArray[np.uint8]],
        data_channels: list[NDArray[np.uint8]],
        clock: NDArray[np.uint8],
        edge: Literal["rising", "falling"] = "rising",
    ) -> list[dict[str, int]]:
        """Decode bus transactions with address and data.

        Address and data are decoded independently at each selected clock
        edge, using the configured bit order and active-low setting (the
        same treatment ``decode_with_clock`` applies).

        Args:
            address_channels: List of address channel data arrays.
            data_channels: List of data channel data arrays.
            clock: Clock signal trace.
            edge: Which clock edge to sample on.

        Returns:
            List of transaction dictionaries with 'address', 'data' and
            'sample_index' keys.

        Example:
            >>> transactions = decoder.decode_transactions(
            ...     address_channels=addr_ch,
            ...     data_channels=data_ch,
            ...     clock=clk
            ... )
        """
        transactions = []
        for edge_idx in self._find_edges(clock, edge):
            index = int(edge_idx)
            transactions.append(
                {
                    "address": self._decode_channels(address_channels, index),
                    "data": self._decode_channels(data_channels, index),
                    "sample_index": index,
                }
            )

        return transactions

    def sample_at_clock(
        self,
        bit_traces: dict[int, NDArray[np.uint8]],
        clock_trace: NDArray[np.uint8],
        edge: Literal["rising", "falling"] = "rising",
    ) -> list[BusTransaction]:
        """Sample bus at clock edges.

        Args:
            bit_traces: Dictionary mapping channel index to trace data.
            clock_trace: Clock signal trace (boolean or 0/1).
            edge: Which edge to sample on ('rising' or 'falling').

        Returns:
            List of BusTransaction objects sampled at clock edges.

        Example:
            >>> clock = np.array([0,1,0,1,0,1], dtype=bool)
            >>> transactions = decoder.sample_at_clock(bit_traces, clock, 'rising')
        """
        return [
            self._sample_transaction(bit_traces, int(edge_idx))
            for edge_idx in self._find_edges(clock_trace, edge)
        ]

    def sample_at_intervals(
        self, bit_traces: dict[int, NDArray[np.uint8]], interval_samples: int
    ) -> list[BusTransaction]:
        """Sample bus at regular intervals.

        Sampling starts at index 0 and runs to the length of the first trace
        in ``bit_traces``.

        Args:
            bit_traces: Dictionary mapping channel index to trace data.
            interval_samples: Number of samples between each bus sample.

        Returns:
            List of BusTransaction objects sampled at intervals.

        Raises:
            ValueError: If interval_samples is not positive, or if
                ``bit_traces`` is empty.

        Example:
            >>> transactions = decoder.sample_at_intervals(bit_traces, interval_samples=10)
        """
        if interval_samples <= 0:
            raise ValueError(f"interval_samples must be positive, got {interval_samples}")
        # Without this guard next() below would leak a StopIteration.
        if not bit_traces:
            raise ValueError("bit_traces cannot be empty")

        # The first trace defines how many samples are available.
        trace_length = len(next(iter(bit_traces.values())))

        return [
            self._sample_transaction(bit_traces, sample_idx)
            for sample_idx in range(0, trace_length, interval_samples)
        ]

    @staticmethod
    def _find_edges(
        clock: NDArray[np.uint8], edge: Literal["rising", "falling"]
    ) -> NDArray[np.intp]:
        """Return the sample indices immediately after each selected clock edge.

        Args:
            clock: Clock signal trace; any non-zero sample counts as high.
            edge: 'rising' selects 0->1 transitions; any other value selects
                1->0 transitions.

        Returns:
            Array of indices of the first sample after each transition.
        """
        levels = np.asarray(clock, dtype=bool).astype(int)
        steps = np.diff(levels)
        selected = steps > 0 if edge == "rising" else steps < 0
        # diff[i] compares samples i and i + 1, so the new level is at i + 1.
        return np.where(selected)[0] + 1

    def _decode_channels(self, channels: list[NDArray[np.uint8]], index: int) -> int:
        """Decode one bus value from a list of channels at a sample index.

        Args:
            channels: Channel arrays; list position is the bit position.
            index: Sample index to read from every channel.

        Returns:
            Integer value after active-low inversion and bit ordering.
        """
        bit_values = [
            int(bool(trace[index])) if index < len(trace) else 0 for trace in channels
        ]
        return self._reconstruct_value(self._apply_active_low(bit_values))

    def _sample_transaction(
        self, bit_traces: dict[int, NDArray[np.uint8]], sample_idx: int
    ) -> BusTransaction:
        """Build the BusTransaction for one sample index.

        Args:
            bit_traces: Dictionary mapping channel index to trace data.
            sample_idx: Sample index to read from every configured bit.

        Returns:
            BusTransaction holding the decoded value and per-bit values.
        """
        bit_values = []
        for bit_def in self.config.bits:
            # A bit definition without 'channel' falls back to its 'bit' index.
            channel = bit_def.get("channel", bit_def.get("bit", 0))
            trace = bit_traces.get(channel)
            if trace is not None and sample_idx < len(trace):
                bit_values.append(int(bool(trace[sample_idx])))
            else:
                bit_values.append(0)

        bit_values = self._apply_active_low(bit_values)

        return BusTransaction(
            timestamp=sample_idx * self._time_base,
            sample_index=sample_idx,
            value=self._reconstruct_value(bit_values),
            raw_bits=bit_values,
        )

    def _reconstruct_value(self, bit_values: list[int]) -> int:
        """Reconstruct bus value from individual bits.

        Args:
            bit_values: List of bit values (0 or 1) in config order.

        Returns:
            Integer value reconstructed from bits; 0 for an empty list.
        """
        if not bit_values:
            return 0

        # Normalize to LSB-first so one packing loop serves both orderings.
        if self.config.bit_order == "lsb_first":
            lsb_to_msb = bit_values
        else:  # msb_first
            lsb_to_msb = bit_values[::-1]

        value = 0
        for position, bit_val in enumerate(lsb_to_msb):
            if bit_val:
                value |= 1 << position

        return value

    def _apply_active_low(self, bit_values: list[int]) -> list[int]:
        """Apply active-low inversion if configured.

        Args:
            bit_values: List of bit values (0 or 1).

        Returns:
            Inverted bit values if active_low is True, otherwise unchanged.
        """
        if self.config.active_low:
            return [1 - bit for bit in bit_values]
        return bit_values
622# Convenience functions
def decode_bus(
    bit_traces: dict[int, NDArray[np.uint8]],
    config: BusConfig | str | Path,
    sample_rate: float,
    clock_trace: NDArray[np.uint8] | None = None,
    clock_edge: Literal["rising", "falling"] = "rising",
) -> list[BusTransaction]:
    """Decode a bus in one call, without building a decoder by hand.

    A string or Path ``config`` is treated as the location of a YAML file
    and loaded with ``BusConfig.from_yaml``; anything else is passed to
    ``BusDecoder`` as-is.

    Args:
        bit_traces: Dictionary mapping channel index to trace data.
        config: BusConfig instance or path to YAML config file.
        sample_rate: Sample rate of traces in Hz.
        clock_trace: Optional clock signal for synchronous sampling.
        clock_edge: Which clock edge to sample on.

    Returns:
        List of BusTransaction objects.

    Example:
        >>> transactions = decode_bus(bit_traces, 'bus_config.yaml', 100e6)
    """
    bus_config = BusConfig.from_yaml(config) if isinstance(config, (str, Path)) else config
    return BusDecoder(bus_config, sample_rate).decode_bus(bit_traces, clock_trace, clock_edge)
def sample_at_clock(
    bit_traces: dict[int, NDArray[np.uint8]],
    clock_trace: NDArray[np.uint8],
    config: BusConfig,
    sample_rate: float,
    edge: Literal["rising", "falling"] = "rising",
) -> list[BusTransaction]:
    """Sample a bus at clock edges in one call.

    Builds a temporary ``BusDecoder`` from ``config`` and ``sample_rate`` and
    delegates to its ``sample_at_clock`` method.

    Args:
        bit_traces: Dictionary mapping channel index to trace data.
        clock_trace: Clock signal trace.
        config: Bus configuration.
        sample_rate: Sample rate of traces in Hz.
        edge: Which clock edge to sample on.

    Returns:
        List of BusTransaction objects.

    Example:
        >>> transactions = sample_at_clock(bit_traces, clock, config, 100e6, 'rising')
    """
    return BusDecoder(config, sample_rate).sample_at_clock(bit_traces, clock_trace, edge)
# Names exported by ``from <module> import *``: the configuration classes,
# the decoder, the transaction record, and the two convenience functions.
__all__ = [
    "BusConfig",
    "BusDecoder",
    "BusTransaction",
    "ParallelBusConfig",
    "decode_bus",
    "sample_at_clock",
]