Coverage for src / tracekit / analyzers / digital / bus.py: 96%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Configurable multi-bit parallel bus decoding. 

2 

3This module provides configurable bus decoding for parallel digital signals, 

4supporting various bit orderings, active-low signaling, and clock-based 

5or interval-based sampling strategies. 

6 

7 

8Example: 

9 >>> import numpy as np 

10 >>> from tracekit.analyzers.digital.bus import BusConfig, BusDecoder 

11 >>> # Define 8-bit bus configuration 

12 >>> config = BusConfig(name="data_bus", width=8, bit_order='lsb_first') 

13 >>> config.bits = [{'channel': i, 'bit': i, 'name': f'D{i}'} for i in range(8)] 

14 >>> # Create decoder 

15 >>> decoder = BusDecoder(config, sample_rate=100e6) 

16 >>> # Decode bus values from bit traces 

17 >>> bit_traces = {i: np.random.randint(0, 2, 1000, dtype=np.uint8) for i in range(8)} 

18 >>> transactions = decoder.decode_bus(bit_traces) 

19""" 

20 

21from __future__ import annotations 

22 

23from dataclasses import dataclass, field 

24from pathlib import Path 

25from typing import TYPE_CHECKING, Any, Literal 

26 

27import numpy as np 

28 

29if TYPE_CHECKING: 

30 from numpy.typing import NDArray 

31 

32 

33@dataclass 

34class BusConfig: 

35 """Configuration for parallel bus decoding. 

36 

37 Attributes: 

38 name: Descriptive name for the bus. 

39 width: Number of bits in the bus. 

40 bit_order: Bit ordering, 'lsb_first' or 'msb_first'. 

41 active_low: Whether signals are active-low (inverted). 

42 bits: List of bit definitions with channel mapping. 

43 

44 Example: 

45 >>> config = BusConfig(name="addr_bus", width=12, bit_order='lsb_first') 

46 >>> config.bits = [{'channel': i, 'bit': i} for i in range(12)] 

47 """ 

48 

49 name: str 

50 width: int # Number of bits 

51 bit_order: Literal["lsb_first", "msb_first"] = "lsb_first" 

52 active_low: bool = False 

53 bits: list[dict[str, Any]] = field( 

54 default_factory=list 

55 ) # [{channel: 0, bit: 0, name: 'D0'}, ...] 

56 

57 def __post_init__(self) -> None: 

58 """Validate bus configuration.""" 

59 if self.width <= 0: 

60 raise ValueError(f"Bus width must be positive, got {self.width}") 

61 if self.bit_order not in ["lsb_first", "msb_first"]: 

62 raise ValueError(f"Invalid bit_order: {self.bit_order}") 

63 

64 @classmethod 

65 def from_yaml(cls, path: str | Path) -> BusConfig: 

66 """Load bus configuration from YAML file. 

67 

68 Args: 

69 path: Path to YAML configuration file. 

70 

71 Returns: 

72 BusConfig instance loaded from file. 

73 

74 Raises: 

75 ImportError: If PyYAML is not installed. 

76 FileNotFoundError: If file does not exist. 

77 """ 

78 try: 

79 import yaml 

80 except ImportError as e: 

81 raise ImportError( 

82 "PyYAML is required for YAML loading. Install with: pip install pyyaml" 

83 ) from e 

84 

85 path = Path(path) 

86 if not path.exists(): 

87 raise FileNotFoundError(f"Configuration file not found: {path}") 

88 

89 with open(path) as f: 

90 config_dict = yaml.safe_load(f) 

91 

92 return cls.from_dict(config_dict) 

93 

94 @classmethod 

95 def from_dict(cls, config: dict[str, Any]) -> BusConfig: 

96 """Create bus configuration from dictionary. 

97 

98 Args: 

99 config: Dictionary with bus configuration parameters. 

100 

101 Returns: 

102 BusConfig instance created from dictionary. 

103 

104 Example: 

105 >>> config_dict = { 

106 ... 'name': 'data_bus', 

107 ... 'width': 8, 

108 ... 'bit_order': 'lsb_first', 

109 ... 'bits': [{'channel': i, 'bit': i} for i in range(8)] 

110 ... } 

111 >>> config = BusConfig.from_dict(config_dict) 

112 """ 

113 return cls( 

114 name=config.get("name", "bus"), 

115 width=config["width"], 

116 bit_order=config.get("bit_order", "lsb_first"), 

117 active_low=config.get("active_low", False), 

118 bits=config.get("bits", []), 

119 ) 

120 

121 

122@dataclass 

123class ParallelBusConfig: 

124 """Configuration for parallel bus decoding with simplified interface. 

125 

126 This is a convenience class for tests that provides a simpler interface 

127 than the full BusConfig. 

128 

129 Attributes: 

130 data_width: Number of data bits in the bus. 

131 bit_order: Bit ordering, 'lsb_first' or 'msb_first'. 

132 has_clock: Whether the bus uses a clock signal. 

133 address_width: Optional number of address bits. 

134 active_low: Whether signals are active-low. 

135 

136 Example: 

137 >>> config = ParallelBusConfig(data_width=8, bit_order='lsb_first') 

138 """ 

139 

140 data_width: int 

141 bit_order: Literal["lsb_first", "msb_first"] = "lsb_first" 

142 has_clock: bool = False 

143 address_width: int | None = None 

144 active_low: bool = False 

145 

146 def __post_init__(self) -> None: 

147 """Validate configuration.""" 

148 if self.data_width <= 0: 

149 raise ValueError(f"data_width must be positive, got {self.data_width}") 

150 if self.address_width is not None and self.address_width <= 0: 

151 raise ValueError(f"address_width must be positive, got {self.address_width}") 

152 

153 def to_bus_config(self, name: str = "parallel_bus") -> BusConfig: 

154 """Convert to BusConfig. 

155 

156 Args: 

157 name: Name for the bus configuration. 

158 

159 Returns: 

160 BusConfig instance. 

161 """ 

162 return BusConfig( 

163 name=name, 

164 width=self.data_width, 

165 bit_order=self.bit_order, 

166 active_low=self.active_low, 

167 bits=[{"channel": i, "bit": i} for i in range(self.data_width)], 

168 ) 

169 

170 

171@dataclass 

172class BusTransaction: 

173 """A decoded bus transaction. 

174 

175 Attributes: 

176 timestamp: Time in seconds when transaction occurred. 

177 sample_index: Sample index in the original traces. 

178 value: Decoded bus value as integer. 

179 raw_bits: Individual bit values (after active-low inversion if applicable). 

180 transaction_type: Optional transaction type label. 

181 address: Optional address field if this is an address bus. 

182 data: Optional data field if this is a data bus. 

183 """ 

184 

185 timestamp: float # Time in seconds 

186 sample_index: int 

187 value: int # Decoded bus value 

188 raw_bits: list[int] # Individual bit values 

189 transaction_type: str = "" # 'read', 'write', etc. 

190 address: int | None = None # If address bus present 

191 data: int | None = None # If data value 

192 

193 

194class BusDecoder: 

195 """Decode multi-bit parallel buses from individual bit traces. 

196 

197 Supports configurable bit ordering, active-low signaling, and various 

198 sampling strategies (clock-based or interval-based). 

199 

200 Attributes: 

201 config: Bus configuration specifying width, ordering, etc. 

202 sample_rate: Sample rate of input traces in Hz. 

203 

204 Example: 

205 >>> config = BusConfig(name="data", width=8, bit_order='lsb_first') 

206 >>> decoder = BusDecoder(config, sample_rate=100e6) 

207 >>> bit_traces = {i: trace_data for i in range(8)} 

208 >>> transactions = decoder.decode_bus(bit_traces) 

209 """ 

210 

211 def __init__( 

212 self, 

213 config: BusConfig | ParallelBusConfig, 

214 sample_rate: float = 1.0, 

215 ): 

216 """Initialize decoder with configuration. 

217 

218 Args: 

219 config: Bus configuration (BusConfig or ParallelBusConfig). 

220 sample_rate: Sample rate of input traces in Hz. 

221 

222 Raises: 

223 ValueError: If sample rate is invalid. 

224 """ 

225 if sample_rate <= 0: 

226 raise ValueError(f"Sample rate must be positive, got {sample_rate}") 

227 

228 # Handle ParallelBusConfig 

229 self._parallel_config: ParallelBusConfig | None 

230 if isinstance(config, ParallelBusConfig): 

231 self._parallel_config = config 

232 self.config = config.to_bus_config() 

233 else: 

234 self._parallel_config = None 

235 self.config = config 

236 

237 self.sample_rate = sample_rate 

238 self._time_base = 1.0 / sample_rate 

239 

240 def decode_bus( 

241 self, 

242 bit_traces: dict[int, NDArray[np.uint8]], # channel_index -> trace data 

243 clock_trace: NDArray[np.uint8] | None = None, 

244 clock_edge: Literal["rising", "falling"] = "rising", 

245 ) -> list[BusTransaction]: 

246 """Decode bus values from individual bit traces. 

247 

248 Args: 

249 bit_traces: Dictionary mapping channel index to trace data (boolean or 0/1). 

250 clock_trace: Optional clock signal for synchronous sampling. 

251 clock_edge: Which clock edge to sample on ('rising' or 'falling'). 

252 

253 Returns: 

254 List of BusTransaction objects with decoded values. 

255 

256 Raises: 

257 ValueError: If bit traces don't match configuration. 

258 

259 Example: 

260 >>> bit_traces = {0: np.array([0,1,1,0]), 1: np.array([1,1,0,0])} 

261 >>> transactions = decoder.decode_bus(bit_traces) 

262 """ 

263 if not bit_traces: 

264 raise ValueError("bit_traces cannot be empty") 

265 

266 # Use clock-based or interval-based sampling 

267 if clock_trace is not None: 

268 return self.sample_at_clock(bit_traces, clock_trace, clock_edge) 

269 else: 

270 # Sample every point (could be optimized with interval sampling) 

271 _trace_length = len(next(iter(bit_traces.values()))) 

272 return self.sample_at_intervals(bit_traces, interval_samples=1) 

273 

274 def decode_parallel( 

275 self, 

276 channels: list[NDArray[np.uint8]], 

277 ) -> list[int]: 

278 """Decode parallel bus values from channel list. 

279 

280 Simplified interface for parallel bus decoding without clock. 

281 

282 Args: 

283 channels: List of channel data arrays, indexed by bit position. 

284 

285 Returns: 

286 List of decoded integer values (one per sample). 

287 

288 Example: 

289 >>> channels = [ch0, ch1, ch2, ch3] # 4-bit bus 

290 >>> values = decoder.decode_parallel(channels) 

291 """ 

292 if not channels: 

293 return [] 

294 

295 trace_length = len(channels[0]) 

296 width = len(channels) 

297 bit_order = self.config.bit_order 

298 

299 values = [] 

300 for sample_idx in range(trace_length): 

301 value = 0 

302 for bit_idx in range(width): 

303 bit_val = int(bool(channels[bit_idx][sample_idx])) 

304 if self.config.active_low: 

305 bit_val = 1 - bit_val 

306 

307 if bit_order == "lsb_first": 

308 if bit_val: 

309 value |= 1 << bit_idx 

310 else: # msb_first 

311 if bit_val: 

312 value |= 1 << (width - 1 - bit_idx) 

313 

314 values.append(value) 

315 

316 return values 

317 

318 def decode_with_clock( 

319 self, 

320 channels: list[NDArray[np.uint8]], 

321 clock: NDArray[np.uint8], 

322 edge: Literal["rising", "falling"] = "rising", 

323 ) -> list[int]: 

324 """Decode parallel bus values at clock edges. 

325 

326 Args: 

327 channels: List of channel data arrays, indexed by bit position. 

328 clock: Clock signal trace (boolean or 0/1). 

329 edge: Which edge to sample on ('rising' or 'falling'). 

330 

331 Returns: 

332 List of decoded integer values (one per clock edge). 

333 

334 Example: 

335 >>> values = decoder.decode_with_clock(channels, clock, 'rising') 

336 """ 

337 if not channels: 

338 return [] 

339 

340 # Convert clock to boolean 

341 clock_bool = np.asarray(clock, dtype=bool) 

342 

343 # Find edges 

344 if edge == "rising": 

345 edges = np.where(np.diff(clock_bool.astype(int)) > 0)[0] + 1 

346 else: 

347 edges = np.where(np.diff(clock_bool.astype(int)) < 0)[0] + 1 

348 

349 width = len(channels) 

350 bit_order = self.config.bit_order 

351 

352 values = [] 

353 for edge_idx in edges: 

354 value = 0 

355 for bit_idx in range(width): 

356 if edge_idx < len(channels[bit_idx]): 356 ↛ 359line 356 didn't jump to line 359 because the condition on line 356 was always true

357 bit_val = int(bool(channels[bit_idx][edge_idx])) 

358 else: 

359 bit_val = 0 

360 

361 if self.config.active_low: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true

362 bit_val = 1 - bit_val 

363 

364 if bit_order == "lsb_first": 

365 if bit_val: 

366 value |= 1 << bit_idx 

367 else: # msb_first 

368 if bit_val: 

369 value |= 1 << (width - 1 - bit_idx) 

370 

371 values.append(value) 

372 

373 return values 

374 

375 def decode_transactions( 

376 self, 

377 address_channels: list[NDArray[np.uint8]], 

378 data_channels: list[NDArray[np.uint8]], 

379 clock: NDArray[np.uint8], 

380 edge: Literal["rising", "falling"] = "rising", 

381 ) -> list[dict[str, int]]: 

382 """Decode bus transactions with address and data. 

383 

384 Args: 

385 address_channels: List of address channel data arrays. 

386 data_channels: List of data channel data arrays. 

387 clock: Clock signal trace. 

388 edge: Which clock edge to sample on. 

389 

390 Returns: 

391 List of transaction dictionaries with 'address' and 'data' keys. 

392 

393 Example: 

394 >>> transactions = decoder.decode_transactions( 

395 ... address_channels=addr_ch, 

396 ... data_channels=data_ch, 

397 ... clock=clk 

398 ... ) 

399 """ 

400 # Convert clock to boolean 

401 clock_bool = np.asarray(clock, dtype=bool) 

402 

403 # Find edges 

404 if edge == "rising": 

405 edges = np.where(np.diff(clock_bool.astype(int)) > 0)[0] + 1 

406 else: 

407 edges = np.where(np.diff(clock_bool.astype(int)) < 0)[0] + 1 

408 

409 addr_width = len(address_channels) 

410 data_width = len(data_channels) 

411 bit_order = self.config.bit_order 

412 

413 transactions = [] 

414 for edge_idx in edges: 

415 # Decode address 

416 address = 0 

417 for bit_idx in range(addr_width): 

418 if edge_idx < len(address_channels[bit_idx]): 418 ↛ 421line 418 didn't jump to line 421 because the condition on line 418 was always true

419 bit_val = int(bool(address_channels[bit_idx][edge_idx])) 

420 else: 

421 bit_val = 0 

422 

423 if bit_order == "lsb_first": 

424 if bit_val: 

425 address |= 1 << bit_idx 

426 else: 

427 if bit_val: 

428 address |= 1 << (addr_width - 1 - bit_idx) 

429 

430 # Decode data 

431 data = 0 

432 for bit_idx in range(data_width): 

433 if edge_idx < len(data_channels[bit_idx]): 433 ↛ 436line 433 didn't jump to line 436 because the condition on line 433 was always true

434 bit_val = int(bool(data_channels[bit_idx][edge_idx])) 

435 else: 

436 bit_val = 0 

437 

438 if bit_order == "lsb_first": 

439 if bit_val: 

440 data |= 1 << bit_idx 

441 else: 

442 if bit_val: 

443 data |= 1 << (data_width - 1 - bit_idx) 

444 

445 transactions.append( 

446 { 

447 "address": address, 

448 "data": data, 

449 "sample_index": int(edge_idx), 

450 } 

451 ) 

452 

453 return transactions 

454 

455 def sample_at_clock( 

456 self, 

457 bit_traces: dict[int, NDArray[np.uint8]], 

458 clock_trace: NDArray[np.uint8], 

459 edge: Literal["rising", "falling"] = "rising", 

460 ) -> list[BusTransaction]: 

461 """Sample bus at clock edges. 

462 

463 Args: 

464 bit_traces: Dictionary mapping channel index to trace data. 

465 clock_trace: Clock signal trace (boolean or 0/1). 

466 edge: Which edge to sample on ('rising' or 'falling'). 

467 

468 Returns: 

469 List of BusTransaction objects sampled at clock edges. 

470 

471 Example: 

472 >>> clock = np.array([0,1,0,1,0,1], dtype=bool) 

473 >>> transactions = decoder.sample_at_clock(bit_traces, clock, 'rising') 

474 """ 

475 # Convert clock to boolean 

476 clock_bool = np.asarray(clock_trace, dtype=bool) 

477 

478 # Find edges 

479 if edge == "rising": 

480 # Rising edge: 0->1 transition 

481 edges = np.where(np.diff(clock_bool.astype(int)) > 0)[0] + 1 

482 else: 

483 # Falling edge: 1->0 transition 

484 edges = np.where(np.diff(clock_bool.astype(int)) < 0)[0] + 1 

485 

486 transactions = [] 

487 

488 for edge_idx in edges: 

489 # Sample all bits at this edge 

490 bit_values = [] 

491 for bit_def in self.config.bits: 

492 channel = bit_def.get("channel", bit_def.get("bit", 0)) 

493 if channel in bit_traces: 

494 trace = bit_traces[channel] 

495 if edge_idx < len(trace): 495 ↛ 499line 495 didn't jump to line 499 because the condition on line 495 was always true

496 bit_val = int(bool(trace[edge_idx])) 

497 bit_values.append(bit_val) 

498 else: 

499 bit_values.append(0) 

500 else: 

501 bit_values.append(0) 

502 

503 # Apply active-low inversion if needed 

504 if self.config.active_low: 504 ↛ 505line 504 didn't jump to line 505 because the condition on line 504 was never true

505 bit_values = self._apply_active_low(bit_values) 

506 

507 # Reconstruct bus value 

508 value = self._reconstruct_value(bit_values) 

509 

510 # Create transaction 

511 transaction = BusTransaction( 

512 timestamp=edge_idx * self._time_base, 

513 sample_index=int(edge_idx), 

514 value=value, 

515 raw_bits=bit_values, 

516 ) 

517 transactions.append(transaction) 

518 

519 return transactions 

520 

521 def sample_at_intervals( 

522 self, bit_traces: dict[int, NDArray[np.uint8]], interval_samples: int 

523 ) -> list[BusTransaction]: 

524 """Sample bus at regular intervals. 

525 

526 Args: 

527 bit_traces: Dictionary mapping channel index to trace data. 

528 interval_samples: Number of samples between each bus sample. 

529 

530 Returns: 

531 List of BusTransaction objects sampled at intervals. 

532 

533 Raises: 

534 ValueError: If interval_samples is not positive. 

535 

536 Example: 

537 >>> transactions = decoder.sample_at_intervals(bit_traces, interval_samples=10) 

538 """ 

539 if interval_samples <= 0: 

540 raise ValueError(f"interval_samples must be positive, got {interval_samples}") 

541 

542 # Determine trace length 

543 trace_length = len(next(iter(bit_traces.values()))) 

544 

545 transactions = [] 

546 

547 for sample_idx in range(0, trace_length, interval_samples): 

548 # Sample all bits at this index 

549 bit_values = [] 

550 for bit_def in self.config.bits: 

551 channel = bit_def.get("channel", bit_def.get("bit", 0)) 

552 if channel in bit_traces: 552 ↛ 560line 552 didn't jump to line 560 because the condition on line 552 was always true

553 trace = bit_traces[channel] 

554 if sample_idx < len(trace): 

555 bit_val = int(bool(trace[sample_idx])) 

556 bit_values.append(bit_val) 

557 else: 

558 bit_values.append(0) 

559 else: 

560 bit_values.append(0) 

561 

562 # Apply active-low inversion if needed 

563 if self.config.active_low: 

564 bit_values = self._apply_active_low(bit_values) 

565 

566 # Reconstruct bus value 

567 value = self._reconstruct_value(bit_values) 

568 

569 # Create transaction 

570 transaction = BusTransaction( 

571 timestamp=sample_idx * self._time_base, 

572 sample_index=sample_idx, 

573 value=value, 

574 raw_bits=bit_values, 

575 ) 

576 transactions.append(transaction) 

577 

578 return transactions 

579 

580 def _reconstruct_value(self, bit_values: list[int]) -> int: 

581 """Reconstruct bus value from individual bits. 

582 

583 Args: 

584 bit_values: List of bit values (0 or 1) in config order. 

585 

586 Returns: 

587 Integer value reconstructed from bits. 

588 """ 

589 if not bit_values: 

590 return 0 

591 

592 value = 0 

593 

594 if self.config.bit_order == "lsb_first": 

595 # LSB is first in list, MSB is last 

596 for i, bit_val in enumerate(bit_values): 

597 if bit_val: 

598 value |= 1 << i 

599 else: # msb_first 

600 # MSB is first in list, LSB is last 

601 n_bits = len(bit_values) 

602 for i, bit_val in enumerate(bit_values): 

603 if bit_val: 

604 value |= 1 << (n_bits - 1 - i) 

605 

606 return value 

607 

608 def _apply_active_low(self, bit_values: list[int]) -> list[int]: 

609 """Apply active-low inversion if configured. 

610 

611 Args: 

612 bit_values: List of bit values (0 or 1). 

613 

614 Returns: 

615 Inverted bit values if active_low is True, otherwise unchanged. 

616 """ 

617 if self.config.active_low: 

618 return [1 - bit for bit in bit_values] 

619 return bit_values 

620 

621 

622# Convenience functions 

623 

624 

625def decode_bus( 

626 bit_traces: dict[int, NDArray[np.uint8]], 

627 config: BusConfig | str | Path, 

628 sample_rate: float, 

629 clock_trace: NDArray[np.uint8] | None = None, 

630 clock_edge: Literal["rising", "falling"] = "rising", 

631) -> list[BusTransaction]: 

632 """Decode bus from bit traces. 

633 

634 Convenience function for quick bus decoding without creating a decoder instance. 

635 

636 Args: 

637 bit_traces: Dictionary mapping channel index to trace data. 

638 config: BusConfig instance or path to YAML config file. 

639 sample_rate: Sample rate of traces in Hz. 

640 clock_trace: Optional clock signal for synchronous sampling. 

641 clock_edge: Which clock edge to sample on. 

642 

643 Returns: 

644 List of BusTransaction objects. 

645 

646 Example: 

647 >>> transactions = decode_bus(bit_traces, 'bus_config.yaml', 100e6) 

648 """ 

649 if isinstance(config, str | Path): 

650 config = BusConfig.from_yaml(config) 

651 

652 decoder = BusDecoder(config, sample_rate) 

653 return decoder.decode_bus(bit_traces, clock_trace, clock_edge) 

654 

655 

656def sample_at_clock( 

657 bit_traces: dict[int, NDArray[np.uint8]], 

658 clock_trace: NDArray[np.uint8], 

659 config: BusConfig, 

660 sample_rate: float, 

661 edge: Literal["rising", "falling"] = "rising", 

662) -> list[BusTransaction]: 

663 """Sample bus at clock edges. 

664 

665 Convenience function for clock-based bus sampling. 

666 

667 Args: 

668 bit_traces: Dictionary mapping channel index to trace data. 

669 clock_trace: Clock signal trace. 

670 config: Bus configuration. 

671 sample_rate: Sample rate of traces in Hz. 

672 edge: Which clock edge to sample on. 

673 

674 Returns: 

675 List of BusTransaction objects. 

676 

677 Example: 

678 >>> transactions = sample_at_clock(bit_traces, clock, config, 100e6, 'rising') 

679 """ 

680 decoder = BusDecoder(config, sample_rate) 

681 return decoder.sample_at_clock(bit_traces, clock_trace, edge) 

682 

683 

684__all__ = [ 

685 "BusConfig", 

686 "BusDecoder", 

687 "BusTransaction", 

688 "ParallelBusConfig", 

689 "decode_bus", 

690 "sample_at_clock", 

691]