Coverage for src / tracekit / loaders / configurable.py: 80%

406 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Configurable binary packet loader with schema-driven parsing. 

2 

This module provides a flexible, configuration-driven system for loading
binary packet/frame data from custom DAQ systems, logic analyzers, and
packet captures without code changes.

Features:
    - Schema-driven packet format definition
    - Device/source configuration mapping
    - Multi-source binary data loader

11 

12Example: 

13 >>> from tracekit.loaders.configurable import load_binary_packets 

14 >>> packets = load_binary_packets( 

15 ... "capture.bin", 

16 ... format_config="packet_format.yaml", 

17 ... device_config="device_mapping.yaml" 

18 ... ) 

19 >>> traces = extract_channels(packets, {"ch0": {"bits": [0, 7]}}) 

20 >>> print(f"Loaded {len(traces['ch0'].data)} samples") 

21""" 

22 

23from __future__ import annotations 

24 

25import json 

26import logging 

27import struct 

28from collections.abc import Iterator 

29from dataclasses import dataclass, field 

30from pathlib import Path 

31from typing import TYPE_CHECKING, Any 

32 

33import numpy as np 

34import yaml 

35 

36from tracekit.core.exceptions import ConfigurationError, FormatError, LoaderError 

37from tracekit.core.types import DigitalTrace, TraceMetadata 

38 

39if TYPE_CHECKING: 

40 from os import PathLike 

41 

42 

# Logger for debug output
logger = logging.getLogger(__name__)

# Type size mapping in bytes, keyed by the type names used in format configs.
TYPE_SIZES: dict[str, int] = {
    "uint8": 1,
    "uint16": 2,
    "uint32": 4,
    "uint40": 5,
    "uint48": 6,
    "uint64": 8,
    "int8": 1,
    "int16": 2,
    "int32": 4,
    "int64": 8,
    "float32": 4,
    "float64": 8,
}

# Type alias for parsed packet data
ParsedPacket = dict[str, Any]
"""Type alias for a parsed packet dictionary with header and samples."""

65 

66 

@dataclass
class BitfieldDef:
    """Bitfield definition within a header field.

    Exactly one of ``bit`` or ``bits`` must be supplied.

    Attributes:
        name: Bitfield name.
        bit: Single bit position (if single-bit field).
        bits: Bit range [start, end] inclusive (if multi-bit field).
        description: Human-readable description (optional).
    """

    name: str
    bit: int | None = None
    bits: tuple[int, int] | None = None
    description: str = ""

    def __post_init__(self) -> None:
        """Validate bitfield definition.

        Raises:
            ConfigurationError: If neither or both of ``bit``/``bits`` are
                given, if a bit position is negative, or if ``bits`` does not
                hold exactly two positions.
        """
        if self.bit is None and self.bits is None:
            raise ConfigurationError(
                "BitfieldDef must have either 'bit' or 'bits' specified",
                config_key=f"{self.name}",
            )
        if self.bit is not None and self.bits is not None:
            raise ConfigurationError(
                "BitfieldDef cannot have both 'bit' and 'bits' specified",
                config_key=f"{self.name}",
            )

        # Reject malformed positions here so the problem is reported against
        # the configuration rather than surfacing later during extraction.
        if self.bit is not None and self.bit < 0:
            raise ConfigurationError(
                "Bit position must be non-negative",
                config_key=f"{self.name}.bit",
                actual_value=self.bit,
            )
        if self.bits is not None and (
            len(self.bits) != 2 or any(position < 0 for position in self.bits)
        ):
            raise ConfigurationError(
                "Bit range must be two non-negative positions [start, end]",
                config_key=f"{self.name}.bits",
                actual_value=self.bits,
            )

95 

96 

@dataclass
class DeviceInfo:
    """Device information from configuration.

    Attributes:
        name: Full device name.
        short_name: Short device name (optional).
        description: Device description (optional).
        category: Device category (optional).
        sample_rate: Sample rate in Hz (optional).
        channels: Number of channels (optional).
        properties: Additional device properties (optional).
    """

    name: str
    short_name: str = ""
    description: str = ""
    category: str = ""
    sample_rate: float | None = None
    channels: int | None = None
    properties: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> DeviceInfo:
        """Create DeviceInfo from dictionary.

        Missing keys fall back to the dataclass defaults; a missing ``name``
        becomes ``"Unknown Device"``.

        Args:
            data: Device configuration dictionary.

        Returns:
            DeviceInfo instance.
        """
        return cls(
            name=data.get("name", "Unknown Device"),
            short_name=data.get("short_name", ""),
            description=data.get("description", ""),
            category=data.get("category", ""),
            sample_rate=data.get("sample_rate"),
            channels=data.get("channels"),
            # Copy so that mutating the returned object never alters the
            # source configuration; `or {}` also covers an explicit null.
            properties=dict(data.get("properties") or {}),
        )

138 

139 

@dataclass
class HeaderFieldDef:
    """Header field definition.

    Defines a single field within a packet header including offset,
    size, type, and endianness.

    Attributes:
        name: Field name.
        offset: Byte offset from start of packet.
        size: Field size in bytes.
        type: Data type (uint8, uint16, uint32, uint40, uint48, uint64, bitfield, bytes).
        endian: Byte order ("big", "little", or "native").
        value: Expected constant value for validation (optional).
        fields: Bitfield definitions if type is "bitfield" (optional).
        description: Human-readable description (optional).
    """

    name: str
    offset: int
    size: int
    type: str
    endian: str = "big"
    value: int | bytes | None = None
    fields: dict[str, Any] | None = None
    description: str = ""

    def __post_init__(self) -> None:
        """Validate field definition.

        Raises:
            ConfigurationError: If ``offset`` is negative, ``size`` is not
                positive, or ``endian`` is not a recognised byte order.
        """
        if self.offset < 0:
            raise ConfigurationError(
                "Field offset must be non-negative",
                config_key=f"{self.name}.offset",
                actual_value=self.offset,
            )
        if self.size <= 0:
            raise ConfigurationError(
                "Field size must be positive",
                config_key=f"{self.name}.size",
                actual_value=self.size,
            )
        if self.endian not in ("big", "little", "native"):
            raise ConfigurationError(
                "Invalid endianness",
                config_key=f"{self.name}.endian",
                expected_type="'big', 'little', or 'native'",
                actual_value=self.endian,
            )

188 

189 

@dataclass
class SampleFormatDef:
    """Sample data format definition.

    Defines how to extract sample data from packets.

    Attributes:
        size: Bytes per sample.
        type: Data type (uint8, uint16, uint32, uint64).
        endian: Byte order ("big", "little", or "native").
        description: Human-readable description (optional).
    """

    size: int
    type: str
    endian: str = "little"
    description: str = ""

    def __post_init__(self) -> None:
        """Validate sample format.

        Raises:
            ConfigurationError: If ``size`` is not positive or ``endian`` is
                not a recognised byte order.
        """
        if self.size <= 0:
            raise ConfigurationError(
                "Sample size must be positive",
                config_key="samples.format.size",
                actual_value=self.size,
            )
        # Same byte-order check as header fields, so a typo is reported
        # against the configuration instead of failing while decoding samples.
        if self.endian not in ("big", "little", "native"):
            raise ConfigurationError(
                "Invalid endianness",
                config_key="samples.format.endian",
                expected_type="'big', 'little', or 'native'",
                actual_value=self.endian,
            )

216 

217 

@dataclass
class PacketFormatConfig:
    """Packet format configuration.

    Complete packet format specification loaded from YAML/JSON.

    Attributes:
        name: Format name.
        version: Format version.
        packet_size: Total packet size in bytes (or "variable").
        byte_order: Default byte order ("big", "little", "native").
        length_field: Header field name containing packet length (for variable-length packets).
        length_includes_header: Whether length field includes header size (default True).
        header_size: Header size in bytes.
        header_fields: List of header field definitions.
        sample_offset: Offset where samples begin.
        sample_count: Number of samples per packet.
        sample_format: Sample format definition.
        channel_extraction: Channel extraction configuration (optional).
        validation: Validation rules (optional).
        description: Human-readable description (optional).
    """

    name: str
    version: str
    packet_size: int | str
    byte_order: str
    length_field: str | None = None
    length_includes_header: bool = True
    header_size: int = 0
    header_fields: list[HeaderFieldDef] = field(default_factory=list)
    sample_offset: int = 0
    sample_count: int = 0
    sample_format: SampleFormatDef | None = None
    channel_extraction: dict[str, Any] | None = None
    validation: dict[str, Any] | None = None
    description: str = ""

    @classmethod
    def from_file(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from YAML or JSON file.

        The parser is chosen from the file extension; any extension other
        than ``.json``, ``.yaml`` or ``.yml`` is parsed as YAML after logging
        a warning.

        Args:
            path: Path to configuration file (.yaml, .yml, or .json).

        Returns:
            PacketFormatConfig instance.

        Example:
            >>> config = PacketFormatConfig.from_file("packet_format.yaml")
            >>> print(f"Loaded format: {config.name} v{config.version}")
        """
        path = Path(path)
        ext = path.suffix.lower()

        if ext == ".json":
            return cls.from_json(path)
        if ext not in (".yaml", ".yml"):
            # Try YAML by default
            logger.warning("Unknown file extension '%s', attempting YAML parsing", ext)
        return cls.from_yaml(path)

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> PacketFormatConfig:
        """Load packet format from dictionary.

        Header fields without an explicit ``endian`` inherit the packet's
        ``byte_order``; the sample format defaults to ``"little"``.

        Args:
            config: Configuration dictionary.

        Returns:
            PacketFormatConfig instance.

        Raises:
            ConfigurationError: If configuration is invalid.

        Example:
            >>> config_dict = {
            ...     "name": "my_format",
            ...     "version": "1.0",
            ...     "packet": {"size": 1024, "byte_order": "big"},
            ...     "header": {"size": 16, "fields": []},
            ...     "samples": {"offset": 16, "count": 126, "format": {"size": 8, "type": "uint64"}}
            ... }
            >>> config = PacketFormatConfig.from_dict(config_dict)
        """
        # An empty YAML/JSON document parses to None (or a scalar/list);
        # report that as a configuration problem instead of a TypeError.
        if not isinstance(config, dict):
            raise ConfigurationError(
                "Packet format configuration must be a mapping",
                fix_hint="Ensure configuration has all required sections.",
            )

        # Validate required fields
        required = ["name", "version", "packet", "header", "samples"]
        missing = [key for key in required if key not in config]
        if missing:
            raise ConfigurationError(
                f"Missing required configuration keys: {', '.join(missing)}",
                fix_hint="Ensure configuration has all required sections.",
            )

        # Parse packet configuration
        packet_cfg = config["packet"]
        byte_order = packet_cfg.get("byte_order", "big")

        # Parse header configuration
        header_cfg = config["header"]
        header_fields = [
            HeaderFieldDef(
                name=field_cfg["name"],
                offset=field_cfg["offset"],
                size=field_cfg["size"],
                type=field_cfg["type"],
                endian=field_cfg.get("endian", byte_order),
                value=field_cfg.get("value"),
                fields=field_cfg.get("fields"),
                description=field_cfg.get("description", ""),
            )
            # `or []` tolerates an explicit null list of fields.
            for field_cfg in header_cfg.get("fields") or []
        ]

        # Parse samples configuration
        samples_cfg = config["samples"]
        format_cfg = samples_cfg["format"]
        sample_format = SampleFormatDef(
            size=format_cfg["size"],
            type=format_cfg["type"],
            endian=format_cfg.get("endian", "little"),
            description=format_cfg.get("description", ""),
        )

        return cls(
            name=config["name"],
            version=config["version"],
            packet_size=packet_cfg.get("size", "variable"),
            byte_order=byte_order,
            length_field=packet_cfg.get("length_field"),
            length_includes_header=packet_cfg.get("length_includes_header", True),
            header_size=header_cfg["size"],
            header_fields=header_fields,
            sample_offset=samples_cfg["offset"],
            sample_count=samples_cfg["count"],
            sample_format=sample_format,
            # Optional configurations
            channel_extraction=samples_cfg.get("channel_extraction"),
            validation=config.get("validation"),
            description=config.get("description", ""),
        )

    @staticmethod
    def _read_config(path: str | PathLike[str], parser: Any, failure_message: str) -> Any:
        """Open ``path`` as UTF-8 text and return ``parser(file)``.

        Args:
            path: Path to the configuration file.
            parser: Callable taking an open text file and returning the
                parsed document.
            failure_message: LoaderError message used when reading or parsing
                fails.

        Returns:
            Whatever ``parser`` returns.

        Raises:
            LoaderError: If the file does not exist or cannot be read/parsed.
        """
        path = Path(path)
        if not path.exists():
            raise LoaderError(
                "Configuration file not found",
                file_path=str(path),
            )

        try:
            with open(path, encoding="utf-8") as f:
                return parser(f)
        except Exception as e:
            # Any read or parse failure is reported uniformly, with the
            # original exception chained as the cause.
            raise LoaderError(
                failure_message,
                file_path=str(path),
                details=str(e),
            ) from e

    @classmethod
    def from_yaml(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from YAML file.

        Args:
            path: Path to YAML configuration file.

        Returns:
            PacketFormatConfig instance.

        Raises:
            LoaderError: If file is missing or cannot be read or parsed.
            ConfigurationError: If the parsed configuration is invalid.
        """
        config = cls._read_config(path, yaml.safe_load, "Failed to load configuration file")
        return cls.from_dict(config)

    @classmethod
    def from_json(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from JSON file.

        Args:
            path: Path to JSON configuration file.

        Returns:
            PacketFormatConfig instance.

        Raises:
            LoaderError: If file is missing or cannot be read or parsed.
            ConfigurationError: If the parsed configuration is invalid.
        """
        config = cls._read_config(path, json.load, "Failed to load JSON configuration file")
        return cls.from_dict(config)

436 

437 

@dataclass
class DeviceConfig:
    """Device configuration mapping.

    Maps device IDs to names and parameters.

    Attributes:
        devices: Dictionary mapping device ID to device info.
        categories: Category definitions (optional).
        channels: Channel configuration (optional).
        unknown_policy: How to handle unknown devices ("error", "warn", "ignore").
    """

    devices: dict[int, dict[str, Any]]
    categories: dict[str, Any] = field(default_factory=dict)
    channels: dict[int, Any] = field(default_factory=dict)
    unknown_policy: str = "warn"

    @staticmethod
    def _parse_device_id(raw_id: Any) -> int:
        """Convert a configuration key to an integer device ID.

        Strings starting with ``0x``/``0X`` are parsed as hexadecimal, other
        strings as decimal; non-string keys are passed to ``int()``.
        """
        if isinstance(raw_id, str):
            text = raw_id.strip()
            return int(text, 16 if text[:2].lower() == "0x" else 10)
        return int(raw_id)

    @classmethod
    def from_dict(cls, config: dict[str, Any] | None) -> DeviceConfig:
        """Build a device configuration from an already-parsed mapping.

        Absent or null sections fall back to empty mappings, and the unknown
        device policy defaults to ``"warn"``.

        Args:
            config: Parsed configuration mapping; ``None`` (an empty
                document) is treated as an empty configuration.

        Returns:
            DeviceConfig instance.

        Raises:
            ConfigurationError: If ``config`` is neither a mapping nor None.
        """
        if config is None:
            config = {}
        if not isinstance(config, dict):
            raise ConfigurationError(
                "Device configuration must be a mapping",
                fix_hint="Ensure the device configuration is a mapping with a 'devices' section.",
            )

        # Parse device mappings (keys may be hex strings, decimal strings or ints)
        devices = {
            cls._parse_device_id(raw_id): dev_info
            for raw_id, dev_info in (config.get("devices") or {}).items()
        }
        unknown_cfg = config.get("unknown_device") or {}

        return cls(
            devices=devices,
            categories=config.get("categories") or {},
            channels=config.get("channels") or {},
            unknown_policy=unknown_cfg.get("policy", "warn"),
        )

    @classmethod
    def from_yaml(cls, path: str | PathLike[str]) -> DeviceConfig:
        """Load device configuration from YAML file.

        Args:
            path: Path to YAML configuration file.

        Returns:
            DeviceConfig instance.

        Raises:
            LoaderError: If file cannot be read or its top level is not a mapping.
        """
        path = Path(path)
        if not path.exists():
            raise LoaderError(
                "Device configuration file not found",
                file_path=str(path),
            )

        try:
            with open(path, encoding="utf-8") as f:
                config = yaml.safe_load(f)
        except Exception as e:
            raise LoaderError(
                "Failed to load device configuration",
                file_path=str(path),
                details=str(e),
            ) from e

        # An empty file parses to None and is accepted as "no devices";
        # any other non-mapping document is a malformed configuration.
        if config is not None and not isinstance(config, dict):
            raise LoaderError(
                "Failed to load device configuration",
                file_path=str(path),
                details="Top-level YAML document must be a mapping",
            )

        return cls.from_dict(config)

506 

507 

class BitfieldExtractor:
    """Extract individual bits or bit ranges from integer values.

    Supports extracting single bits or bit ranges from multi-byte fields.

    Example:
        >>> extractor = BitfieldExtractor()
        >>> value = 0b1010_1100
        >>> extractor.extract_bit(value, 7)  # Most significant bit
        1
        >>> extractor.extract_bits(value, 4, 7)  # Upper nibble
        10
    """

    @staticmethod
    def extract_bit(value: int, bit: int) -> int:
        """Extract a single bit.

        Args:
            value: Integer value.
            bit: Bit position (0 = LSB).

        Returns:
            0 or 1.
        """
        return (value >> bit) & 1

    @staticmethod
    def extract_bits(value: int, start_bit: int, end_bit: int) -> int:
        """Extract a range of bits.

        The two positions may be given in either order, so ``(4, 7)`` and
        ``(7, 4)`` select the same bits.

        Args:
            value: Integer value.
            start_bit: One end of the bit range (inclusive).
            end_bit: Other end of the bit range (inclusive).

        Returns:
            Extracted value, shifted so the lowest selected bit is bit 0.
        """
        # Normalise the order: a reversed range would otherwise produce a
        # negative shift count (ValueError) or an empty mask.
        low, high = (start_bit, end_bit) if start_bit <= end_bit else (end_bit, start_bit)
        width = high - low + 1
        return (value >> low) & ((1 << width) - 1)

550 

551 

@dataclass
class PacketLoadResult:
    """Result of packet loading operation.

    Attributes:
        packets: List of loaded packets.
        packet_count: Number of packets loaded (derived from ``packets``).
    """

    packets: list[dict[str, Any]]

    @property
    def packet_count(self) -> int:
        """Number of packets loaded."""
        return len(self.packets)

567 

568 

class ConfigurablePacketLoader:
    """Load binary packets using configuration-driven parsing.

    Parses binary files according to packet format configuration,
    extracting headers and sample data.

    Attributes:
        format_config: Packet format configuration.
        device_config: Device mapping configuration (optional).
    """

    # struct byte-order prefix for each configured endianness.
    _STRUCT_BYTE_ORDER = {"little": "<", "big": ">", "native": "="}

    def __init__(
        self,
        format_config: PacketFormatConfig,
        device_config: DeviceConfig | None = None,
    ) -> None:
        """Initialize configurable packet loader.

        Args:
            format_config: Packet format configuration.
            device_config: Device mapping configuration (optional).
        """
        self.format_config = format_config
        self.device_config = device_config
        self.bitfield_extractor = BitfieldExtractor()

    def load_packets(self, path: str | PathLike[str]) -> list[dict[str, Any]]:
        """Load and parse all packets from binary file.

        Args:
            path: Path to binary file.

        Returns:
            List of parsed packet dictionaries.

        Raises:
            LoaderError: If file cannot be read.
        """
        path = Path(path)
        if not path.exists():
            raise LoaderError(
                "Binary file not found",
                file_path=str(path),
            )

        packets = list(self.load_packets_streaming(path))

        logger.info("Loaded %d packets from %s", len(packets), path)
        return packets

    def load_packets_streaming(
        self, path: str | PathLike[str], chunk_size: int = 1000
    ) -> Iterator[dict[str, Any]]:
        """Stream packets from binary file.

        Packets are read and parsed one at a time. A truncated trailing
        packet is logged as a warning and ends the stream.

        Args:
            path: Path to binary file.
            chunk_size: Accepted for API compatibility; not used by this
                method.

        Yields:
            Parsed packet dictionaries.

        Raises:
            ConfigurationError: If packet configuration is invalid.
            LoaderError: If file cannot be read.
            FormatError: If packet parsing fails.
        """
        path = Path(path)
        cfg = self.format_config

        # Check if packets are variable-length
        is_variable_length = isinstance(cfg.packet_size, str) and cfg.packet_size == "variable"

        fixed_packet_size = 0
        if is_variable_length:
            # Validate configuration for variable-length packets
            if not cfg.length_field:
                raise ConfigurationError(
                    "Variable-length packets require 'length_field' in packet configuration",
                    config_key="packet.length_field",
                    fix_hint="Specify which header field contains the packet length",
                )
        else:
            # A numeric string is accepted as a fixed size.
            if isinstance(cfg.packet_size, str):
                fixed_packet_size = int(cfg.packet_size)
            else:
                fixed_packet_size = cfg.packet_size
            # read(0) returns nothing and read(<0) reads the whole file, so a
            # non-positive size can never describe real packets.
            if fixed_packet_size <= 0:
                raise ConfigurationError(
                    "Fixed packet size must be positive",
                    config_key="packet.size",
                    actual_value=cfg.packet_size,
                )

        try:
            with open(path, "rb") as f:
                if is_variable_length:
                    raw_packets = self._iter_variable_packets(f)
                else:
                    raw_packets = self._iter_fixed_packets(f, fixed_packet_size)

                for packet_index, packet_data in enumerate(raw_packets):
                    try:
                        packet = self._parse_packet(packet_data, packet_index)
                    except FormatError:
                        logger.exception("Failed to parse packet %d", packet_index)
                        raise
                    yield packet

        except OSError as e:
            raise LoaderError(
                "Failed to read binary file",
                file_path=str(path),
                details=str(e),
            ) from e

    def _iter_fixed_packets(self, f: Any, packet_size: int) -> Iterator[bytes]:
        """Yield consecutive ``packet_size``-byte packets from binary file ``f``."""
        packet_index = 0
        while True:
            packet_data = f.read(packet_size)
            if not packet_data:
                return

            if len(packet_data) < packet_size:
                logger.warning(
                    "Incomplete packet at end of file (packet %d): got %d bytes, expected %d",
                    packet_index,
                    len(packet_data),
                    packet_size,
                )
                return

            yield packet_data
            packet_index += 1

    def _iter_variable_packets(self, f: Any) -> Iterator[bytes]:
        """Yield header+payload bytes of length-prefixed packets from binary file ``f``."""
        cfg = self.format_config
        header_size = cfg.header_size
        packet_index = 0
        while True:
            # Read header first to determine packet size
            header_data = f.read(header_size)
            if not header_data:
                return

            if len(header_data) < header_size:
                logger.warning(
                    "Incomplete header at end of file (packet %d): got %d bytes, expected %d",
                    packet_index,
                    len(header_data),
                    header_size,
                )
                return

            # Parse header to get length field
            header_dict = {
                field_def.name: self._extract_field(header_data, field_def)
                for field_def in cfg.header_fields
            }

            # Get packet length from header
            if cfg.length_field not in header_dict:
                raise FormatError(
                    f"Length field '{cfg.length_field}' not found in header (packet {packet_index})"
                )
            packet_length = header_dict[cfg.length_field]

            # Bitfield, bytes and float fields cannot serve as a length.
            if isinstance(packet_length, bool) or not isinstance(packet_length, int):
                raise FormatError(
                    f"Length field '{cfg.length_field}' is not an integer (packet {packet_index})",
                    expected="integer length",
                    got=f"{type(packet_length).__name__}",
                )

            # Calculate payload size
            if cfg.length_includes_header:
                payload_size = packet_length - header_size
            else:
                payload_size = packet_length

            # A negative size would make read() consume the rest of the file
            # as a single bogus packet, so treat it as corrupt data.
            if payload_size < 0:
                raise FormatError(
                    f"Invalid packet length in header (packet {packet_index})",
                    expected=f"at least {header_size} bytes",
                    got=f"{packet_length} bytes",
                )

            # Read remaining packet data
            payload_data = f.read(payload_size)
            if len(payload_data) < payload_size:
                logger.warning(
                    "Incomplete payload at end of file (packet %d): got %d bytes, expected %d",
                    packet_index,
                    len(payload_data),
                    payload_size,
                )
                return

            # Combine header and payload
            yield header_data + payload_data
            packet_index += 1

    def _parse_packet(self, packet_data: bytes, packet_index: int) -> dict[str, Any]:
        """Parse a single packet.

        Args:
            packet_data: Raw packet bytes.
            packet_index: Packet index in file.

        Returns:
            Parsed packet dictionary with ``index``, ``header`` and
            ``samples`` keys.

        Raises:
            ConfigurationError: If no sample format is configured.
            FormatError: If a header field lies outside the packet.
        """
        cfg = self.format_config

        # Parse header fields
        header = {
            field_def.name: self._extract_field(packet_data, field_def)
            for field_def in cfg.header_fields
        }

        # Parse samples
        if cfg.sample_format is None:
            raise ConfigurationError(
                "Sample format is not configured",
                config_key="samples.format",
            )
        sample_size = cfg.sample_format.size

        samples = []
        for i in range(cfg.sample_count):
            offset = cfg.sample_offset + (i * sample_size)
            if offset + sample_size > len(packet_data):
                # Keep the samples that fit; the packet is shorter than configured.
                logger.warning("Sample %d exceeds packet bounds (packet %d)", i, packet_index)
                break
            samples.append(self._parse_sample(packet_data[offset : offset + sample_size]))

        return {
            "index": packet_index,
            "header": header,
            "samples": samples,
        }

    def _extract_field(self, packet_data: bytes, field_def: HeaderFieldDef) -> Any:
        """Extract a header field value.

        Args:
            packet_data: Raw packet bytes.
            field_def: Field definition.

        Returns:
            Extracted field value: raw bytes for ``bytes`` fields, a dict of
            sub-field values for ``bitfield`` fields that define sub-fields,
            otherwise an int or float.

        Raises:
            ConfigurationError: If field type is unsupported.
            FormatError: If field cannot be extracted.
        """
        offset = field_def.offset
        end = offset + field_def.size

        if end > len(packet_data):
            raise FormatError(
                f"Field '{field_def.name}' exceeds packet bounds",
                expected=f"{end} bytes",
                got=f"{len(packet_data)} bytes",
            )

        field_bytes = packet_data[offset:end]
        field_type = field_def.type

        # Handle different field types
        if field_type == "bytes":
            return field_bytes

        if field_type == "bitfield":
            # Parse as integer first, then extract bitfields
            value = self._bytes_to_int(field_bytes, field_def.endian, signed=False)
            if not field_def.fields:
                return value
            bitfields = {}
            for bf_name, bf_def in field_def.fields.items():
                if "bit" in bf_def:
                    bitfields[bf_name] = self.bitfield_extractor.extract_bit(value, bf_def["bit"])
                elif "bits" in bf_def:
                    bit_range = bf_def["bits"]
                    bitfields[bf_name] = self.bitfield_extractor.extract_bits(
                        value, bit_range[0], bit_range[1]
                    )
            return bitfields

        if field_type.startswith("uint"):
            return self._bytes_to_int(field_bytes, field_def.endian, signed=False)
        if field_type.startswith("int"):
            return self._bytes_to_int(field_bytes, field_def.endian, signed=True)

        if field_type in ("float32", "float64"):
            # "=" selects the platform byte order, matching how "native"
            # integers are decoded; unknown values fall back to big-endian.
            order = self._STRUCT_BYTE_ORDER.get(field_def.endian, ">")
            code = "f" if field_type == "float32" else "d"
            return struct.unpack(f"{order}{code}", field_bytes)[0]

        raise ConfigurationError(
            f"Unsupported field type: {field_def.type}",
            config_key=f"{field_def.name}.type",
        )

    def _bytes_to_int(self, data: bytes, endian: str, signed: bool) -> int:
        """Convert bytes to integer with specified endianness.

        Args:
            data: Byte data.
            endian: Byte order ("big", "little", or "native").
            signed: Whether to interpret as signed integer.

        Returns:
            Integer value.
        """
        import sys

        # "native" means the byte order of the machine running the loader.
        byte_order = sys.byteorder if endian == "native" else endian
        return int.from_bytes(data, byteorder=byte_order, signed=signed)  # type: ignore[arg-type]

    def _parse_sample(self, sample_bytes: bytes) -> int:
        """Parse a sample value.

        Samples are decoded as unsigned integers using the sample format's
        byte order.

        Args:
            sample_bytes: Raw sample bytes.

        Returns:
            Sample value as integer.

        Raises:
            ConfigurationError: If no sample format is configured.
        """
        sample_format = self.format_config.sample_format
        if sample_format is None:
            raise ConfigurationError(
                "Sample format is not configured",
                config_key="samples.format",
            )
        return self._bytes_to_int(sample_bytes, sample_format.endian, signed=False)

    def load(self, path: str | PathLike[str]) -> PacketLoadResult:
        """Load packets and return result object (test-compatible API).

        Args:
            path: Path to binary file.

        Returns:
            PacketLoadResult with loaded packets.
        """
        return PacketLoadResult(packets=self.load_packets(path))

    def stream(
        self, path: str | PathLike[str], chunk_size: int = 1000
    ) -> Iterator[PacketLoadResult]:
        """Stream packets in chunks (test-compatible API).

        Args:
            path: Path to binary file.
            chunk_size: Number of packets per chunk.

        Yields:
            PacketLoadResult objects with packet chunks; the final chunk may
            hold fewer than ``chunk_size`` packets.
        """
        chunk: list[dict[str, Any]] = []
        for packet in self.load_packets_streaming(path, chunk_size):
            chunk.append(packet)
            if len(chunk) >= chunk_size:
                yield PacketLoadResult(packets=chunk)
                chunk = []

        # Yield remaining packets
        if chunk:
            yield PacketLoadResult(packets=chunk)

913 

914 

915class DeviceMapper: 

916 """Map device IDs to names and metadata. 

917 

918 Provides human-readable names and configuration for devices 

919 identified in packet headers. 

920 

921 Attributes: 

922 config: Device configuration. 

923 """ 

924 

925 def __init__(self, config: DeviceConfig) -> None: 

926 """Initialize device mapper. 

927 

928 Args: 

929 config: Device configuration. 

930 """ 

931 self.config = config 

932 

933 @classmethod 

934 def from_file(cls, path: str | PathLike[str]) -> DeviceMapper: 

935 """Create DeviceMapper from configuration file. 

936 

937 Args: 

938 path: Path to device configuration file. 

939 

940 Returns: 

941 DeviceMapper instance. 

942 

943 Example: 

944 >>> mapper = DeviceMapper.from_file("device_mapping.yaml") 

945 >>> device_name = mapper.get_device_name(0x2B) 

946 """ 

947 config = DeviceConfig.from_yaml(path) 

948 return cls(config) 

949 

950 def get_device(self, device_id: int) -> DeviceInfo | None: 

951 """Get device information object. 

952 

953 Args: 

954 device_id: Device ID from packet header. 

955 

956 Returns: 

957 DeviceInfo object or None if device not found. 

958 

959 Raises: 

960 ConfigurationError: If device ID is unknown and unknown_policy is 'error'. 

961 

962 Example: 

963 >>> device = mapper.get_device(0x2B) 

964 >>> if device: 

965 ... print(f"{device.name}: {device.sample_rate} Hz") 

966 """ 

967 if device_id in self.config.devices: 

968 return DeviceInfo.from_dict(self.config.devices[device_id]) 

969 

970 # Handle unknown device 

971 if self.config.unknown_policy == "error": 

972 raise ConfigurationError( 

973 f"Unknown device ID: 0x{device_id:02X}", 

974 fix_hint="Add device to device_mapping configuration or set unknown_policy to 'warn' or 'ignore'.", 

975 ) 

976 elif self.config.unknown_policy == "warn": 

977 logger.warning("Unknown device ID: 0x%02X", device_id) 

978 

979 return None 

980 

981 def resolve_name(self, device_id: int) -> str: 

982 """Resolve device ID to human-readable name. 

983 

984 Args: 

985 device_id: Device ID from packet header. 

986 

987 Returns: 

988 Device name or "Unknown Device 0xXX". 

989 

990 Example: 

991 >>> name = mapper.resolve_name(0x2B) 

992 >>> print(f"Device: {name}") 

993 """ 

994 device = self.get_device(device_id) 

995 if device: 

996 return device.name 

997 return f"Unknown Device 0x{device_id:02X}" 

998 

def get_device_name(self, device_id: int) -> str:
    """Return the display name for a device ID.

    Delegates to ``resolve_name``; both methods return the same value.

    Args:
        device_id: Device ID from packet header.

    Returns:
        Device name or "Unknown Device 0xXX".
    """
    name = self.resolve_name(device_id)
    return name

1009 

def get_device_info(self, device_id: int) -> dict[str, Any]:
    """Describe a device as a plain dictionary.

    Args:
        device_id: Device ID from packet header.

    Returns:
        For a device returned by ``get_device``: a dictionary with the keys
        name, short_name, description, category, sample_rate, channels and
        properties. Otherwise a two-key dictionary holding a placeholder
        name and the category "unknown".
    """
    device = self.get_device(device_id)

    if not device:
        # Placeholder entry for IDs that did not resolve to a device.
        return {
            "name": f"Unknown Device 0x{device_id:02X}",
            "category": "unknown",
        }

    exported = (
        "name",
        "short_name",
        "description",
        "category",
        "sample_rate",
        "channels",
        "properties",
    )
    return {attr: getattr(device, attr) for attr in exported}

1035 

1036 

def load_binary_packets(
    path: str | PathLike[str],
    format_config: str | PathLike[str] | PacketFormatConfig,
    device_config: str | PathLike[str] | DeviceConfig | None = None,
) -> list[dict[str, Any]]:
    """Load binary packets from file using configuration.

    Main entry point for loading binary packet data. Configuration arguments
    may be given either as ready-made objects or as paths; paths are loaded
    with the corresponding ``from_yaml`` constructor.

    Args:
        path: Path to binary file.
        format_config: Packet format configuration (path or object).
        device_config: Device mapping configuration (path or object, optional).
            Any value that is neither None nor a DeviceConfig is treated as a
            path and passed to ``DeviceConfig.from_yaml``.

    Returns:
        List of parsed packet dictionaries.

    Example:
        >>> packets = load_binary_packets(
        ...     "capture.bin",
        ...     "packet_format.yaml",
        ...     "device_mapping.yaml"
        ... )
        >>> print(f"Loaded {len(packets)} packets")
        >>> print(f"First packet device: {packets[0]['header']['device_id']}")
    """
    # Load the packet format from disk unless an object was supplied.
    fmt_cfg: PacketFormatConfig
    if isinstance(format_config, PacketFormatConfig):
        fmt_cfg = format_config
    else:
        fmt_cfg = PacketFormatConfig.from_yaml(format_config)

    # Same rule for the device mapping. Testing for DeviceConfig first (rather
    # than for str/Path) keeps other os.PathLike objects from being silently
    # dropped, which would disable device mapping without any error.
    dev_cfg: DeviceConfig | None
    if device_config is None or isinstance(device_config, DeviceConfig):
        dev_cfg = device_config
    else:
        dev_cfg = DeviceConfig.from_yaml(device_config)

    # Create loader and load packets
    loader = ConfigurablePacketLoader(fmt_cfg, dev_cfg)
    return loader.load_packets(path)

1079 

1080 

def load_packets_streaming(
    path: str | PathLike[str],
    format_config: str | PathLike[str] | PacketFormatConfig,
    device_config: str | PathLike[str] | DeviceConfig | None = None,
    chunk_size: int = 1000,
) -> Iterator[dict[str, Any]]:
    """Stream binary packets from file using configuration.

    Memory-efficient streaming loader for large files. This is a generator
    function, so the configurations are not loaded (and the file is not
    touched) until the first packet is requested.

    Args:
        path: Path to binary file.
        format_config: Packet format configuration (path or object).
        device_config: Device mapping configuration (path or object, optional).
            Any value that is neither None nor a DeviceConfig is treated as a
            path and passed to ``DeviceConfig.from_yaml``.
        chunk_size: Number of packets to buffer; forwarded unchanged to the
            loader's ``load_packets_streaming``.

    Yields:
        Parsed packet dictionaries.

    Example:
        >>> for packet in load_packets_streaming("large_capture.bin", "format.yaml"):
        ...     process_packet(packet)
    """
    # Load the packet format from disk unless an object was supplied.
    fmt_cfg: PacketFormatConfig
    if isinstance(format_config, PacketFormatConfig):
        fmt_cfg = format_config
    else:
        fmt_cfg = PacketFormatConfig.from_yaml(format_config)

    # Same rule for the device mapping. Testing for DeviceConfig first (rather
    # than for str/Path) keeps other os.PathLike objects from being silently
    # dropped, which would disable device mapping without any error.
    dev_cfg: DeviceConfig | None
    if device_config is None or isinstance(device_config, DeviceConfig):
        dev_cfg = device_config
    else:
        dev_cfg = DeviceConfig.from_yaml(device_config)

    # Create loader and stream packets
    loader = ConfigurablePacketLoader(fmt_cfg, dev_cfg)
    yield from loader.load_packets_streaming(path, chunk_size=chunk_size)

1120 

1121 

def detect_source_type(path: str | PathLike[str]) -> str:
    """Detect binary data source type from file extension or content.

    The extension is checked first (case-insensitively). Only when it is not
    one of the recognised extensions is the file opened and its first bytes
    compared against known signatures.

    Args:
        path: Path to file.

    Returns:
        Source type ("raw", "pcap", "sigrok", "vcd", "unknown"). "unknown"
        is also returned when the file cannot be read.

    Example:
        >>> source_type = detect_source_type("capture.bin")
        >>> print(f"Detected: {source_type}")
        Detected: raw
    """
    path = Path(path)
    ext = path.suffix.lower()

    # Extension-based detection
    if ext in (".bin", ".dat", ".raw"):
        return "raw"
    if ext in (".pcap", ".pcapng"):
        return "pcap"
    if ext == ".sr":
        return "sigrok"
    if ext == ".vcd":
        return "vcd"

    # Content-based detection for unknown extensions. Detection is
    # best-effort: an unreadable or missing file is reported as "unknown"
    # instead of raising. Only I/O failures (and the ValueError open() raises
    # for malformed paths) are suppressed, so programming errors still surface.
    try:
        with open(path, "rb") as f:
            magic = f.read(8)
    except (OSError, ValueError):
        return "unknown"

    capture_signatures = (
        b"\xa1\xb2\xc3\xd4",  # pcap, microsecond timestamps, big-endian
        b"\xd4\xc3\xb2\xa1",  # pcap, microsecond timestamps, little-endian
        b"\xa1\xb2\x3c\x4d",  # pcap, nanosecond timestamps, big-endian
        b"\x4d\x3c\xb2\xa1",  # pcap, nanosecond timestamps, little-endian
        b"\x0a\x0d\x0d\x0a",  # pcapng section header block (byte-order neutral)
    )
    if magic[:4] in capture_signatures:
        return "pcap"

    # VCD starts with "$" commands
    if magic.startswith(b"$"):
        return "vcd"

    return "unknown"

1166 

1167 

def extract_channels(
    packets: list[dict[str, Any]],
    channel_map: dict[str, dict[str, Any]],
    sample_rate: float | None = None,
) -> dict[str, DigitalTrace]:
    """Extract individual channels from packet samples.

    Extracts bit ranges from multi-bit samples to create individual
    channel traces. Every sample of every packet contributes one value per
    channel; each value is then reduced to a boolean (non-zero -> True).

    Args:
        packets: List of parsed packets. Each packet must provide a
            "samples" entry that can be iterated.
        channel_map: Channel definitions. Each definition must contain
            either "bits" (a two-element range passed to
            ``BitfieldExtractor.extract_bits``) or "bit" (a single index
            passed to ``BitfieldExtractor.extract_bit``). "bits" takes
            precedence when both are present.
        sample_rate: Sample rate in Hz. If None, defaults to 100 MHz
            (typical for high-speed digital). For accurate analysis,
            provide the actual sample rate from your acquisition system.

    Returns:
        Dictionary mapping channel names to DigitalTrace objects.

    Raises:
        ConfigurationError: If there are no packets, or if a channel
            definition contains neither "bits" nor "bit".

    Example:
        >>> channel_map = {
        ...     "ch0": {"bits": [0, 7]},
        ...     "ch1": {"bits": [8, 15]},
        ... }
        >>> traces = extract_channels(packets, channel_map, sample_rate=1e9)
        >>> print(f"Channel 0: {len(traces['ch0'].data)} samples")
    """
    if not packets:
        raise ConfigurationError(
            "No packets to extract channels from",
            fix_hint="Ensure packets were loaded successfully.",
        )

    # Reject unusable definitions up front; otherwise such a channel would
    # silently come back as an empty trace.
    invalid = [
        str(name)
        for name, ch_def in channel_map.items()
        if "bits" not in ch_def and "bit" not in ch_def
    ]
    if invalid:
        raise ConfigurationError(
            f"Channel definition missing 'bits' or 'bit': {', '.join(invalid)}",
            fix_hint="Define each channel with either 'bits': [start, end] or 'bit': index.",
        )

    extractor = BitfieldExtractor()
    channels: dict[str, list[int]] = {name: [] for name in channel_map}

    # Extract samples for each channel
    for packet in packets:
        for sample in packet["samples"]:
            for ch_name, ch_def in channel_map.items():
                if "bits" in ch_def:
                    bit_range = ch_def["bits"]
                    value = extractor.extract_bits(sample, bit_range[0], bit_range[1])
                else:
                    value = extractor.extract_bit(sample, ch_def["bit"])
                channels[ch_name].append(value)

    # Use provided sample rate or default to 100 MHz (typical for high-speed digital)
    effective_sample_rate = sample_rate if sample_rate is not None else 100e6

    # Convert to DigitalTrace objects
    traces = {}
    for ch_name, samples in channels.items():
        # Convert straight to booleans (non-zero -> True). Packing through
        # uint8 first would wrap or raise for field values above 255, i.e.
        # for any bit range wider than 8 bits.
        data = np.array(samples, dtype=np.bool_)

        # Create metadata with configurable sample rate
        metadata = TraceMetadata(
            sample_rate=effective_sample_rate,
            channel_name=ch_name,
        )

        traces[ch_name] = DigitalTrace(data=data, metadata=metadata)

    return traces

1238 

1239 

# Names exported by ``from tracekit.loaders.configurable import *``.
__all__ = [
    # PascalCase exports (types and the ParsedPacket alias)
    "BitfieldDef",
    "BitfieldExtractor",
    "ConfigurablePacketLoader",
    "DeviceConfig",
    "DeviceInfo",
    "DeviceMapper",
    "HeaderFieldDef",
    "PacketFormatConfig",
    "ParsedPacket",
    "SampleFormatDef",
    # Module-level functions
    "detect_source_type",
    "extract_channels",
    "load_binary_packets",
    "load_packets_streaming",
]