Coverage for src / tracekit / loaders / configurable.py: 80%
406 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Configurable binary packet loader with schema-driven parsing.
3This module provides a flexible, configuration-driven system for loading
4binary packet/frame data from custom DAQ systems, logic analyzers, and
5packet captures without code changes.
7Features:
8 - Schema-driven packet format definition
9 - Device/source configuration mapping
10 - Multi-source binary data loader
12Example:
13 >>> from tracekit.loaders.configurable import load_binary_packets
14 >>> packets = load_binary_packets(
15 ... "capture.bin",
16 ... format_config="packet_format.yaml",
17 ... device_config="device_mapping.yaml"
18 ... )
19 >>> traces = extract_channels(packets, {"ch0": {"bits": [0, 7]}})
20 >>> print(f"Loaded {len(traces['ch0'].data)} samples")
21"""
from __future__ import annotations

import json
import logging
import struct
import sys
from collections.abc import Iterator
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import yaml

from tracekit.core.exceptions import ConfigurationError, FormatError, LoaderError
from tracekit.core.types import DigitalTrace, TraceMetadata

if TYPE_CHECKING:
    from os import PathLike
# Logger for debug output
logger = logging.getLogger(__name__)

# Type size mapping in bytes for the field/sample type names accepted by the
# packet-format configuration.
# NOTE(review): not referenced by any code visible in this module; presumably
# used by callers (or config validators) elsewhere — confirm before removing.
TYPE_SIZES = {
    "uint8": 1,
    "uint16": 2,
    "uint32": 4,
    "uint40": 5,
    "uint48": 6,
    "uint64": 8,
    "int8": 1,
    "int16": 2,
    "int32": 4,
    "int64": 8,
    "float32": 4,
    "float64": 8,
}

# Type alias for parsed packet data
ParsedPacket = dict[str, Any]
"""Type alias for a parsed packet dictionary with header and samples."""
@dataclass
class BitfieldDef:
    """Bitfield definition within a header field.

    Exactly one of ``bit`` (single-bit field) or ``bits`` (inclusive
    multi-bit range) must be provided; ``__post_init__`` enforces this.

    Attributes:
        name: Bitfield name.
        bit: Single bit position (if single-bit field).
        bits: Bit range [start, end] inclusive (if multi-bit field).
        description: Human-readable description (optional).
    """

    name: str
    bit: int | None = None
    bits: tuple[int, int] | None = None
    description: str = ""

    def __post_init__(self) -> None:
        """Validate that exactly one of 'bit' / 'bits' is specified.

        Raises:
            ConfigurationError: If neither or both of 'bit' and 'bits' are set.
        """
        if self.bit is None and self.bits is None:
            raise ConfigurationError(
                "BitfieldDef must have either 'bit' or 'bits' specified",
                config_key=self.name,  # name is already a str; f-string wrapper was redundant
            )
        if self.bit is not None and self.bits is not None:
            raise ConfigurationError(
                "BitfieldDef cannot have both 'bit' and 'bits' specified",
                config_key=self.name,
            )
@dataclass
class DeviceInfo:
    """Device information from configuration.

    Attributes:
        name: Full device name.
        short_name: Short device name (optional).
        description: Device description (optional).
        category: Device category (optional).
        sample_rate: Sample rate in Hz (optional).
        channels: Number of channels (optional).
        properties: Additional device properties (optional).
    """

    name: str
    short_name: str = ""
    description: str = ""
    category: str = ""
    sample_rate: float | None = None
    channels: int | None = None
    properties: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> DeviceInfo:
        """Build a DeviceInfo from a device configuration dictionary.

        Missing keys fall back to the dataclass defaults; a missing name
        becomes "Unknown Device".

        Args:
            data: Device configuration dictionary.

        Returns:
            DeviceInfo instance.
        """
        lookup = data.get
        return cls(
            name=lookup("name", "Unknown Device"),
            short_name=lookup("short_name", ""),
            description=lookup("description", ""),
            category=lookup("category", ""),
            sample_rate=lookup("sample_rate"),
            channels=lookup("channels"),
            properties=lookup("properties", {}),
        )
@dataclass
class HeaderFieldDef:
    """Header field definition.

    Describes one field of a packet header: where it lives (offset),
    how wide it is (size), how to decode it (type, endian), and an
    optional expected constant value.

    Attributes:
        name: Field name.
        offset: Byte offset from start of packet.
        size: Field size in bytes.
        type: Data type (uint8, uint16, uint32, uint40, uint48, uint64, bitfield, bytes).
        endian: Byte order ("big", "little", or "native").
        value: Expected constant value for validation (optional).
        fields: Bitfield definitions if type is "bitfield" (optional).
        description: Human-readable description (optional).
    """

    name: str
    offset: int
    size: int
    type: str
    endian: str = "big"
    value: int | bytes | None = None
    fields: dict[str, Any] | None = None
    description: str = ""

    def __post_init__(self) -> None:
        """Reject negative offsets, non-positive sizes, and unknown endianness.

        Raises:
            ConfigurationError: If any attribute is out of range.
        """
        if not self.offset >= 0:
            raise ConfigurationError(
                "Field offset must be non-negative",
                config_key=f"{self.name}.offset",
                actual_value=self.offset,
            )
        if self.size < 1:
            raise ConfigurationError(
                "Field size must be positive",
                config_key=f"{self.name}.size",
                actual_value=self.size,
            )
        if self.endian not in {"big", "little", "native"}:
            raise ConfigurationError(
                "Invalid endianness",
                config_key=f"{self.name}.endian",
                expected_type="'big', 'little', or 'native'",
                actual_value=self.endian,
            )
@dataclass
class SampleFormatDef:
    """Sample data format definition.

    Describes how individual sample words are laid out inside a packet.

    Attributes:
        size: Bytes per sample.
        type: Data type (uint8, uint16, uint32, uint64).
        endian: Byte order ("big", "little", or "native").
        description: Human-readable description (optional).
    """

    size: int
    type: str
    endian: str = "little"
    description: str = ""

    def __post_init__(self) -> None:
        """Reject non-positive sample sizes.

        Raises:
            ConfigurationError: If size is not at least one byte.
        """
        if self.size < 1:
            raise ConfigurationError(
                "Sample size must be positive",
                config_key="samples.format.size",
                actual_value=self.size,
            )
@dataclass
class PacketFormatConfig:
    """Packet format configuration.

    Complete packet format specification loaded from YAML/JSON.

    Attributes:
        name: Format name.
        version: Format version.
        packet_size: Total packet size in bytes (or "variable").
        byte_order: Default byte order ("big", "little", "native").
        length_field: Header field name containing packet length (for variable-length packets).
        length_includes_header: Whether length field includes header size (default True).
        header_size: Header size in bytes.
        header_fields: List of header field definitions.
        sample_offset: Offset where samples begin.
        sample_count: Number of samples per packet.
        sample_format: Sample format definition.
        channel_extraction: Channel extraction configuration (optional).
        validation: Validation rules (optional).
        description: Human-readable description (optional).
    """

    name: str
    version: str
    packet_size: int | str
    byte_order: str
    length_field: str | None = None
    length_includes_header: bool = True
    header_size: int = 0
    header_fields: list[HeaderFieldDef] = field(default_factory=list)
    sample_offset: int = 0
    sample_count: int = 0
    sample_format: SampleFormatDef | None = None
    channel_extraction: dict[str, Any] | None = None
    validation: dict[str, Any] | None = None
    description: str = ""

    @classmethod
    def from_file(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from YAML or JSON file.

        Automatically detects file format based on extension; unknown
        extensions fall back to YAML parsing with a warning.

        Args:
            path: Path to configuration file (.yaml, .yml, or .json).

        Returns:
            PacketFormatConfig instance.

        Example:
            >>> config = PacketFormatConfig.from_file("packet_format.yaml")
            >>> print(f"Loaded format: {config.name} v{config.version}")
        """
        path = Path(path)
        ext = path.suffix.lower()

        if ext in (".yaml", ".yml"):
            return cls.from_yaml(path)
        elif ext == ".json":
            return cls.from_json(path)
        else:
            # Try YAML by default
            logger.warning("Unknown file extension '%s', attempting YAML parsing", ext)
            return cls.from_yaml(path)

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> PacketFormatConfig:
        """Load packet format from dictionary.

        Args:
            config: Configuration dictionary with "name", "version",
                "packet", "header", and "samples" sections.

        Returns:
            PacketFormatConfig instance.

        Raises:
            ConfigurationError: If configuration is invalid.

        Example:
            >>> config_dict = {
            ...     "name": "my_format",
            ...     "version": "1.0",
            ...     "packet": {"size": 1024, "byte_order": "big"},
            ...     "header": {"size": 16, "fields": []},
            ...     "samples": {"offset": 16, "count": 126, "format": {"size": 8, "type": "uint64"}}
            ... }
            >>> config = PacketFormatConfig.from_dict(config_dict)
        """
        # Fail early if any top-level section is absent.
        required = ["name", "version", "packet", "header", "samples"]
        missing = [key for key in required if key not in config]
        if missing:
            raise ConfigurationError(
                f"Missing required configuration keys: {', '.join(missing)}",
                fix_hint="Ensure configuration has all required sections.",
            )

        # Parse packet configuration
        packet_cfg = config["packet"]
        packet_size = packet_cfg.get("size", "variable")
        byte_order = packet_cfg.get("byte_order", "big")
        length_field = packet_cfg.get("length_field")
        length_includes_header = packet_cfg.get("length_includes_header", True)

        # Parse header configuration
        header_cfg = config["header"]
        header_size = header_cfg["size"]
        header_fields = [
            HeaderFieldDef(
                name=field_cfg["name"],
                offset=field_cfg["offset"],
                size=field_cfg["size"],
                type=field_cfg["type"],
                # Fields inherit the packet-level byte order unless overridden.
                endian=field_cfg.get("endian", byte_order),
                value=field_cfg.get("value"),
                fields=field_cfg.get("fields"),
                description=field_cfg.get("description", ""),
            )
            for field_cfg in header_cfg.get("fields", [])
        ]

        # Parse samples configuration
        samples_cfg = config["samples"]
        sample_offset = samples_cfg["offset"]
        sample_count = samples_cfg["count"]
        sample_format = SampleFormatDef(
            size=samples_cfg["format"]["size"],
            type=samples_cfg["format"]["type"],
            endian=samples_cfg["format"].get("endian", "little"),
            description=samples_cfg["format"].get("description", ""),
        )

        # Optional configurations
        channel_extraction = samples_cfg.get("channel_extraction")
        validation = config.get("validation")

        return cls(
            name=config["name"],
            version=config["version"],
            packet_size=packet_size,
            byte_order=byte_order,
            length_field=length_field,
            length_includes_header=length_includes_header,
            header_size=header_size,
            header_fields=header_fields,
            sample_offset=sample_offset,
            sample_count=sample_count,
            sample_format=sample_format,
            channel_extraction=channel_extraction,
            validation=validation,
            description=config.get("description", ""),
        )

    @classmethod
    def _load_config_file(
        cls,
        path: str | PathLike[str],
        parser: Any,
        error_message: str,
    ) -> PacketFormatConfig:
        """Shared file loader for from_yaml/from_json (deduplicated).

        Args:
            path: Path to configuration file.
            parser: Callable taking an open text file and returning the
                parsed configuration dict (e.g. yaml.safe_load, json.load).
            error_message: LoaderError message used when parsing fails.

        Returns:
            PacketFormatConfig instance.

        Raises:
            LoaderError: If the file is missing or cannot be parsed.
        """
        path = Path(path)
        if not path.exists():
            raise LoaderError(
                "Configuration file not found",
                file_path=str(path),
            )

        try:
            with open(path, encoding="utf-8") as f:
                config = parser(f)
        except Exception as e:
            raise LoaderError(
                error_message,
                file_path=str(path),
                details=str(e),
            ) from e

        return cls.from_dict(config)

    @classmethod
    def from_yaml(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from YAML file.

        Args:
            path: Path to YAML configuration file.

        Returns:
            PacketFormatConfig instance.

        Raises:
            LoaderError: If file cannot be read or configuration is invalid.
        """
        return cls._load_config_file(path, yaml.safe_load, "Failed to load configuration file")

    @classmethod
    def from_json(cls, path: str | PathLike[str]) -> PacketFormatConfig:
        """Load packet format from JSON file.

        Args:
            path: Path to JSON configuration file.

        Returns:
            PacketFormatConfig instance.

        Raises:
            LoaderError: If file cannot be read or configuration is invalid.
        """
        return cls._load_config_file(path, json.load, "Failed to load JSON configuration file")
@dataclass
class DeviceConfig:
    """Device configuration mapping.

    Maps device IDs to names and parameters.

    Attributes:
        devices: Dictionary mapping device ID to device info.
        categories: Category definitions (optional).
        channels: Channel configuration (optional).
        unknown_policy: How to handle unknown devices ("error", "warn", "ignore").
    """

    devices: dict[int, dict[str, Any]]
    categories: dict[str, Any] = field(default_factory=dict)
    channels: dict[int, Any] = field(default_factory=dict)
    unknown_policy: str = "warn"

    @classmethod
    def from_yaml(cls, path: str | PathLike[str]) -> DeviceConfig:
        """Load device configuration from YAML file.

        Args:
            path: Path to YAML configuration file.

        Returns:
            DeviceConfig instance.

        Raises:
            LoaderError: If file cannot be read or configuration is invalid.
        """
        cfg_path = Path(path)
        if not cfg_path.exists():
            raise LoaderError(
                "Device configuration file not found",
                file_path=str(cfg_path),
            )

        try:
            with open(cfg_path, encoding="utf-8") as handle:
                raw = yaml.safe_load(handle)
        except Exception as exc:
            raise LoaderError(
                "Failed to load device configuration",
                file_path=str(cfg_path),
                details=str(exc),
            ) from exc

        def _to_id(key: Any) -> int:
            # Device IDs may appear as hex strings ("0x2B"), decimal strings,
            # or plain integers in the YAML mapping.
            if isinstance(key, str):
                return int(key, 16 if key.startswith("0x") else 10)
            return int(key)

        return cls(
            devices={_to_id(k): v for k, v in raw.get("devices", {}).items()},
            categories=raw.get("categories", {}),
            channels=raw.get("channels", {}),
            unknown_policy=raw.get("unknown_device", {}).get("policy", "warn"),
        )
class BitfieldExtractor:
    """Extract single bits or contiguous bit ranges from integer values.

    Example:
        >>> extractor = BitfieldExtractor()
        >>> value = 0b1010_1100
        >>> extractor.extract_bit(value, 7)  # Most significant bit
        1
        >>> extractor.extract_bits(value, 4, 7)  # Upper nibble
        10
    """

    @staticmethod
    def extract_bit(value: int, bit: int) -> int:
        """Return bit *bit* of *value* (0 = LSB) as 0 or 1."""
        return 1 if value & (1 << bit) else 0

    @staticmethod
    def extract_bits(value: int, start_bit: int, end_bit: int) -> int:
        """Return the bits of *value* from start_bit through end_bit, inclusive.

        Args:
            value: Integer value.
            start_bit: Starting bit position (inclusive).
            end_bit: Ending bit position (inclusive).

        Returns:
            Extracted value, right-aligned.
        """
        width = end_bit - start_bit + 1
        return (value >> start_bit) & ((1 << width) - 1)
@dataclass
class PacketLoadResult:
    """Result of a packet loading operation.

    Attributes:
        packets: List of loaded packets.
        packet_count: Number of packets loaded (derived).
    """

    packets: list[dict[str, Any]]

    @property
    def packet_count(self) -> int:
        """Total number of packets that were loaded."""
        return len(self.packets)
class ConfigurablePacketLoader:
    """Load binary packets using configuration-driven parsing.

    Parses binary files according to a packet format configuration,
    extracting header fields and sample data. Supports fixed-size packets
    and variable-length packets whose size is carried in a header field.

    Attributes:
        format_config: Packet format configuration.
        device_config: Device mapping configuration (optional).
    """

    # struct format prefix per configured byte order. "=" is host order with
    # standard sizes. FIX: float fields previously decoded "native" as
    # big-endian while integer fields decoded it as little-endian; both now
    # consistently follow the host byte order.
    _STRUCT_ENDIAN = {"little": "<", "big": ">", "native": "="}

    def __init__(
        self,
        format_config: PacketFormatConfig,
        device_config: DeviceConfig | None = None,
    ) -> None:
        """Initialize configurable packet loader.

        Args:
            format_config: Packet format configuration.
            device_config: Device mapping configuration (optional).
        """
        self.format_config = format_config
        self.device_config = device_config
        self.bitfield_extractor = BitfieldExtractor()

    def load_packets(self, path: str | PathLike[str]) -> list[dict[str, Any]]:
        """Load and parse all packets from a binary file.

        Args:
            path: Path to binary file.

        Returns:
            List of parsed packet dictionaries.

        Raises:
            LoaderError: If file cannot be read.
        """
        path = Path(path)
        if not path.exists():
            raise LoaderError(
                "Binary file not found",
                file_path=str(path),
            )

        packets = list(self.load_packets_streaming(path))
        logger.info("Loaded %d packets from %s", len(packets), path)
        return packets

    def load_packets_streaming(
        self, path: str | PathLike[str], chunk_size: int = 1000
    ) -> Iterator[dict[str, Any]]:
        """Stream packets from a binary file one at a time.

        Args:
            path: Path to binary file.
            chunk_size: Unused; retained for backward compatibility.
                Chunked delivery is provided by stream().

        Yields:
            Parsed packet dictionaries.

        Raises:
            ConfigurationError: If packet configuration is invalid.
            LoaderError: If file cannot be read.
            FormatError: If packet parsing fails.
        """
        path = Path(path)

        # packet_size is either an int, a numeric string, or "variable".
        is_variable_length = self.format_config.packet_size == "variable"

        if is_variable_length and not self.format_config.length_field:
            raise ConfigurationError(
                "Variable-length packets require 'length_field' in packet configuration",
                config_key="packet.length_field",
                fix_hint="Specify which header field contains the packet length",
            )

        # For fixed-size packets, coerce a numeric string (e.g. "1024") to int.
        fixed_packet_size: int | None = None
        if not is_variable_length:
            fixed_packet_size = int(self.format_config.packet_size)

        try:
            with open(path, "rb") as f:
                packet_index = 0
                while True:
                    if is_variable_length:
                        packet_data = self._read_variable_packet(f, packet_index)
                    else:
                        assert fixed_packet_size is not None
                        packet_data = self._read_fixed_packet(f, fixed_packet_size, packet_index)
                    if packet_data is None:
                        # EOF or truncated trailing packet (already logged).
                        break

                    try:
                        yield self._parse_packet(packet_data, packet_index)
                        packet_index += 1
                    except FormatError:
                        logger.exception("Failed to parse packet %d", packet_index)
                        raise
        except OSError as e:
            raise LoaderError(
                "Failed to read binary file",
                file_path=str(path),
                details=str(e),
            ) from e

    def _read_fixed_packet(self, f: Any, packet_size: int, packet_index: int) -> bytes | None:
        """Read one fixed-size packet; return None at EOF or on a short read."""
        packet_data = f.read(packet_size)
        if not packet_data:
            return None
        if len(packet_data) < packet_size:
            logger.warning(
                "Incomplete packet at end of file (packet %d): got %d bytes, expected %d",
                packet_index,
                len(packet_data),
                packet_size,
            )
            return None
        return packet_data

    def _read_variable_packet(self, f: Any, packet_index: int) -> bytes | None:
        """Read one variable-length packet (header + payload); None at EOF or short read."""
        header_size = self.format_config.header_size
        header_data = f.read(header_size)
        if not header_data:
            return None
        if len(header_data) < header_size:
            logger.warning(
                "Incomplete header at end of file (packet %d): got %d bytes, expected %d",
                packet_index,
                len(header_data),
                header_size,
            )
            return None

        # Parse the header just far enough to find the packet-length field.
        header_dict = {
            field_def.name: self._extract_field(header_data, field_def)
            for field_def in self.format_config.header_fields
        }

        length_field = self.format_config.length_field
        if length_field not in header_dict:
            raise FormatError(
                f"Length field '{length_field}' not found in header (packet {packet_index})"
            )
        packet_length = header_dict[length_field]

        # The configured length may or may not include the header itself.
        if self.format_config.length_includes_header:
            payload_size = packet_length - header_size
        else:
            payload_size = packet_length

        payload_data = f.read(payload_size)
        if len(payload_data) < payload_size:
            logger.warning(
                "Incomplete payload at end of file (packet %d): got %d bytes, expected %d",
                packet_index,
                len(payload_data),
                payload_size,
            )
            return None
        return header_data + payload_data

    def _parse_packet(self, packet_data: bytes, packet_index: int) -> dict[str, Any]:
        """Parse a single packet into header fields and sample values.

        Args:
            packet_data: Raw packet bytes.
            packet_index: Packet index in file.

        Returns:
            Parsed packet dictionary with "index", "header", and "samples".
        """
        packet: dict[str, Any] = {
            "index": packet_index,
            "header": {},
            "samples": [],
        }

        # Parse header fields
        for field_def in self.format_config.header_fields:
            packet["header"][field_def.name] = self._extract_field(packet_data, field_def)

        # Parse samples
        assert self.format_config.sample_format is not None
        sample_offset = self.format_config.sample_offset
        sample_count = self.format_config.sample_count
        sample_size = self.format_config.sample_format.size

        for i in range(sample_count):
            offset = sample_offset + (i * sample_size)
            if offset + sample_size > len(packet_data):
                # Truncated packet: keep the samples parsed so far.
                logger.warning("Sample %d exceeds packet bounds (packet %d)", i, packet_index)
                break

            sample_bytes = packet_data[offset : offset + sample_size]
            packet["samples"].append(self._parse_sample(sample_bytes))

        return packet

    def _extract_field(self, packet_data: bytes, field_def: HeaderFieldDef) -> Any:
        """Extract a header field value.

        Args:
            packet_data: Raw packet bytes.
            field_def: Field definition.

        Returns:
            Extracted value: bytes, int, float, or a dict of bitfield values.

        Raises:
            ConfigurationError: If field type is unsupported.
            FormatError: If field cannot be extracted.
        """
        offset = field_def.offset
        size = field_def.size

        if offset + size > len(packet_data):
            raise FormatError(
                f"Field '{field_def.name}' exceeds packet bounds",
                expected=f"{offset + size} bytes",
                got=f"{len(packet_data)} bytes",
            )

        field_bytes = packet_data[offset : offset + size]

        if field_def.type == "bytes":
            return field_bytes
        if field_def.type == "bitfield":
            # Parse as an unsigned integer first, then slice out bitfields.
            value = self._bytes_to_int(field_bytes, field_def.endian, signed=False)
            if not field_def.fields:
                return value
            bitfields = {}
            for bf_name, bf_def in field_def.fields.items():
                if "bit" in bf_def:
                    bitfields[bf_name] = self.bitfield_extractor.extract_bit(value, bf_def["bit"])
                elif "bits" in bf_def:
                    bit_range = bf_def["bits"]
                    bitfields[bf_name] = self.bitfield_extractor.extract_bits(
                        value, bit_range[0], bit_range[1]
                    )
            return bitfields
        if field_def.type.startswith("uint"):
            return self._bytes_to_int(field_bytes, field_def.endian, signed=False)
        if field_def.type.startswith("int"):
            return self._bytes_to_int(field_bytes, field_def.endian, signed=True)
        if field_def.type in ("float32", "float64"):
            # HeaderFieldDef validates endian in __post_init__, so this lookup
            # cannot fail. FIX: "native" now maps to host order ("=") instead
            # of unconditionally big-endian.
            prefix = self._STRUCT_ENDIAN[field_def.endian]
            fmt = "f" if field_def.type == "float32" else "d"
            return struct.unpack(f"{prefix}{fmt}", field_bytes)[0]
        raise ConfigurationError(
            f"Unsupported field type: {field_def.type}",
            config_key=f"{field_def.name}.type",
        )

    def _bytes_to_int(self, data: bytes, endian: str, signed: bool) -> int:
        """Convert bytes to an integer with the specified endianness.

        Args:
            data: Byte data.
            endian: Byte order ("big", "little", or "native").
            signed: Whether to interpret as a signed integer.

        Returns:
            Integer value.
        """
        # FIX: "native" resolves to the actual host byte order; it was
        # previously hard-coded to little-endian (wrong on big-endian hosts).
        # Endianness is validated upstream in HeaderFieldDef/SampleFormatDef.
        byte_order = sys.byteorder if endian == "native" else endian
        return int.from_bytes(data, byteorder=byte_order, signed=signed)  # type: ignore[arg-type]

    def _parse_sample(self, sample_bytes: bytes) -> int:
        """Parse one sample word as an unsigned integer.

        Args:
            sample_bytes: Raw sample bytes.

        Returns:
            Sample value as integer.
        """
        assert self.format_config.sample_format is not None
        return self._bytes_to_int(
            sample_bytes, self.format_config.sample_format.endian, signed=False
        )

    def load(self, path: str | PathLike[str]) -> PacketLoadResult:
        """Load packets and return result object (test-compatible API).

        Args:
            path: Path to binary file.

        Returns:
            PacketLoadResult with loaded packets.
        """
        return PacketLoadResult(packets=self.load_packets(path))

    def stream(
        self, path: str | PathLike[str], chunk_size: int = 1000
    ) -> Iterator[PacketLoadResult]:
        """Stream packets in chunks (test-compatible API).

        Args:
            path: Path to binary file.
            chunk_size: Number of packets per chunk.

        Yields:
            PacketLoadResult objects with packet chunks.
        """
        chunk: list[dict[str, Any]] = []
        for packet in self.load_packets_streaming(path, chunk_size):
            chunk.append(packet)
            if len(chunk) >= chunk_size:
                yield PacketLoadResult(packets=chunk)
                chunk = []

        # Yield any trailing partial chunk
        if chunk:
            yield PacketLoadResult(packets=chunk)
class DeviceMapper:
    """Map device IDs to names and metadata.

    Provides human-readable names and configuration for devices
    identified in packet headers.

    Attributes:
        config: Device configuration.
    """

    def __init__(self, config: DeviceConfig) -> None:
        """Initialize device mapper.

        Args:
            config: Device configuration.
        """
        self.config = config

    @classmethod
    def from_file(cls, path: str | PathLike[str]) -> DeviceMapper:
        """Create DeviceMapper from configuration file.

        Args:
            path: Path to device configuration file.

        Returns:
            DeviceMapper instance.

        Example:
            >>> mapper = DeviceMapper.from_file("device_mapping.yaml")
            >>> device_name = mapper.get_device_name(0x2B)
        """
        return cls(DeviceConfig.from_yaml(path))

    def get_device(self, device_id: int) -> DeviceInfo | None:
        """Get device information object.

        Args:
            device_id: Device ID from packet header.

        Returns:
            DeviceInfo object or None if device not found.

        Raises:
            ConfigurationError: If device ID is unknown and unknown_policy is 'error'.

        Example:
            >>> device = mapper.get_device(0x2B)
            >>> if device:
            ...     print(f"{device.name}: {device.sample_rate} Hz")
        """
        try:
            raw = self.config.devices[device_id]
        except KeyError:
            pass
        else:
            return DeviceInfo.from_dict(raw)

        # Unknown device: behavior depends on the configured policy.
        policy = self.config.unknown_policy
        if policy == "error":
            raise ConfigurationError(
                f"Unknown device ID: 0x{device_id:02X}",
                fix_hint="Add device to device_mapping configuration or set unknown_policy to 'warn' or 'ignore'.",
            )
        if policy == "warn":
            logger.warning("Unknown device ID: 0x%02X", device_id)

        return None

    def resolve_name(self, device_id: int) -> str:
        """Resolve device ID to human-readable name.

        Args:
            device_id: Device ID from packet header.

        Returns:
            Device name or "Unknown Device 0xXX".

        Example:
            >>> name = mapper.resolve_name(0x2B)
            >>> print(f"Device: {name}")
        """
        device = self.get_device(device_id)
        return device.name if device else f"Unknown Device 0x{device_id:02X}"

    def get_device_name(self, device_id: int) -> str:
        """Get device name from ID (alias for resolve_name).

        Args:
            device_id: Device ID from packet header.

        Returns:
            Device name or "Unknown Device 0xXX".
        """
        return self.resolve_name(device_id)

    def get_device_info(self, device_id: int) -> dict[str, Any]:
        """Get full device information as dictionary.

        Args:
            device_id: Device ID from packet header.

        Returns:
            Device information dictionary; unknown devices get a stub with
            category "unknown".
        """
        device = self.get_device(device_id)
        if device is None:
            return {
                "name": f"Unknown Device 0x{device_id:02X}",
                "category": "unknown",
            }
        return {
            "name": device.name,
            "short_name": device.short_name,
            "description": device.description,
            "category": device.category,
            "sample_rate": device.sample_rate,
            "channels": device.channels,
            "properties": device.properties,
        }
def load_binary_packets(
    path: str | PathLike[str],
    format_config: str | PathLike[str] | PacketFormatConfig,
    device_config: str | PathLike[str] | DeviceConfig | None = None,
) -> list[dict[str, Any]]:
    """Load binary packets from file using configuration.

    Main entry point for loading binary packet data.

    Args:
        path: Path to binary file.
        format_config: Packet format configuration (path or object).
        device_config: Device mapping configuration (path or object, optional).

    Returns:
        List of parsed packet dictionaries.

    Example:
        >>> packets = load_binary_packets(
        ...     "capture.bin",
        ...     "packet_format.yaml",
        ...     "device_mapping.yaml"
        ... )
        >>> print(f"Loaded {len(packets)} packets")
        >>> print(f"First packet device: {packets[0]['header']['device_id']}")
    """
    # Accept either a pre-built config object or a path. FIX: use from_file()
    # so .json configs are parsed as JSON; the old code forced YAML parsing
    # for every path.
    fmt_cfg: PacketFormatConfig
    if isinstance(format_config, PacketFormatConfig):
        fmt_cfg = format_config
    else:
        fmt_cfg = PacketFormatConfig.from_file(format_config)

    # FIX: treat every non-DeviceConfig value as a file path. The old
    # isinstance(x, str | Path) check silently dropped other os.PathLike
    # objects, leaving device mapping disabled.
    dev_cfg: DeviceConfig | None
    if device_config is None or isinstance(device_config, DeviceConfig):
        dev_cfg = device_config
    else:
        dev_cfg = DeviceConfig.from_yaml(device_config)

    # Create loader and load packets
    loader = ConfigurablePacketLoader(fmt_cfg, dev_cfg)
    return loader.load_packets(path)
def load_packets_streaming(
    path: str | PathLike[str],
    format_config: str | PathLike[str] | PacketFormatConfig,
    device_config: str | PathLike[str] | DeviceConfig | None = None,
    chunk_size: int = 1000,
) -> Iterator[dict[str, Any]]:
    """Stream binary packets from file using configuration.

    Memory-efficient streaming loader for large files.

    Args:
        path: Path to binary file.
        format_config: Packet format configuration (path or object).
        device_config: Device mapping configuration (path or object, optional).
        chunk_size: Number of packets to buffer.

    Yields:
        Parsed packet dictionaries.

    Example:
        >>> for packet in load_packets_streaming("large_capture.bin", "format.yaml"):
        ...     process_packet(packet)
    """
    # Accept either a pre-built config object or a path. FIX: use from_file()
    # so .json configs are parsed as JSON (mirrors load_binary_packets).
    fmt_cfg: PacketFormatConfig
    if isinstance(format_config, PacketFormatConfig):
        fmt_cfg = format_config
    else:
        fmt_cfg = PacketFormatConfig.from_file(format_config)

    # FIX: treat every non-DeviceConfig value as a file path, so arbitrary
    # os.PathLike objects are honored instead of being silently ignored.
    dev_cfg: DeviceConfig | None
    if device_config is None or isinstance(device_config, DeviceConfig):
        dev_cfg = device_config
    else:
        dev_cfg = DeviceConfig.from_yaml(device_config)

    # Create loader and stream packets
    loader = ConfigurablePacketLoader(fmt_cfg, dev_cfg)
    yield from loader.load_packets_streaming(path, chunk_size=chunk_size)
1122def detect_source_type(path: str | PathLike[str]) -> str:
1123 """Detect binary data source type from file extension or content.
1125 Args:
1126 path: Path to file.
1128 Returns:
1129 Source type ("raw", "pcap", "sigrok", "vcd", "unknown").
1131 Example:
1132 >>> source_type = detect_source_type("capture.bin")
1133 >>> print(f"Detected: {source_type}")
1134 Detected: raw
1135 """
1136 path = Path(path)
1137 ext = path.suffix.lower()
1139 # Extension-based detection
1140 if ext in (".bin", ".dat", ".raw"):
1141 return "raw"
1142 elif ext in (".pcap", ".pcapng"):
1143 return "pcap"
1144 elif ext == ".sr": 1144 ↛ 1145line 1144 didn't jump to line 1145 because the condition on line 1144 was never true
1145 return "sigrok"
1146 elif ext == ".vcd": 1146 ↛ 1150line 1146 didn't jump to line 1150 because the condition on line 1146 was always true
1147 return "vcd"
1149 # Content-based detection for unknown extensions
1150 try:
1151 with open(path, "rb") as f:
1152 magic = f.read(8)
1154 # PCAP magic bytes
1155 if magic[:4] in (b"\xa1\xb2\xc3\xd4", b"\xd4\xc3\xb2\xa1"):
1156 return "pcap"
1158 # VCD starts with "$" commands
1159 if magic.startswith(b"$"):
1160 return "vcd"
1162 except Exception:
1163 pass
1165 return "unknown"
def extract_channels(
    packets: list[dict[str, Any]],
    channel_map: dict[str, dict[str, Any]],
    sample_rate: float | None = None,
) -> dict[str, DigitalTrace]:
    """Extract individual channels from packet samples.

    Extracts bit ranges from multi-bit samples to create individual
    channel traces.

    Args:
        packets: List of parsed packets.
        channel_map: Channel definitions with bit ranges. Each definition
            must contain either "bits" ([low, high] range) or "bit"
            (single bit index).
        sample_rate: Sample rate in Hz. If None, defaults to 100 MHz
            (typical for high-speed digital). For accurate analysis,
            provide the actual sample rate from your acquisition system.

    Returns:
        Dictionary mapping channel names to DigitalTrace objects.

    Raises:
        ConfigurationError: If channel map is invalid.

    Example:
        >>> channel_map = {
        ...     "ch0": {"bits": [0, 7]},
        ...     "ch1": {"bits": [8, 15]},
        ... }
        >>> traces = extract_channels(packets, channel_map, sample_rate=1e9)
        >>> print(f"Channel 0: {len(traces['ch0'].data)} samples")
    """
    if not packets:
        raise ConfigurationError(
            "No packets to extract channels from",
            fix_hint="Ensure packets were loaded successfully.",
        )

    # Validate the map up front so a bad definition fails loudly instead
    # of silently producing an empty channel (the docstring promises a
    # ConfigurationError for an invalid channel map).
    for ch_name, ch_def in channel_map.items():
        if "bits" not in ch_def and "bit" not in ch_def:
            raise ConfigurationError(
                f"Channel '{ch_name}' definition must contain 'bits' or 'bit'",
                fix_hint="Add a 'bits': [low, high] range or a 'bit': index.",
            )

    extractor = BitfieldExtractor()
    channels: dict[str, list[int]] = {name: [] for name in channel_map}

    # Extract samples for each channel
    for packet in packets:
        for sample in packet["samples"]:
            for ch_name, ch_def in channel_map.items():
                if "bits" in ch_def:
                    bit_range = ch_def["bits"]
                    value = extractor.extract_bits(sample, bit_range[0], bit_range[1])
                else:
                    value = extractor.extract_bit(sample, ch_def["bit"])
                channels[ch_name].append(value)

    # Use provided sample rate or default to 100 MHz (typical for high-speed digital)
    effective_sample_rate = sample_rate if sample_rate is not None else 100e6

    # Convert to DigitalTrace objects
    traces = {}
    for ch_name, samples in channels.items():
        # Convert to boolean array (0/1 -> False/True)
        data = np.array(samples, dtype=np.uint8).astype(np.bool_)

        # Create metadata with configurable sample rate
        metadata = TraceMetadata(
            sample_rate=effective_sample_rate,
            channel_name=ch_name,
        )

        traces[ch_name] = DigitalTrace(data=data, metadata=metadata)

    return traces
# Public API of this module, kept in alphabetical order.
__all__ = [
    "BitfieldDef",
    "BitfieldExtractor",
    "ConfigurablePacketLoader",
    "DeviceConfig",
    "DeviceInfo",
    "DeviceMapper",
    "HeaderFieldDef",
    "PacketFormatConfig",
    "ParsedPacket",
    "SampleFormatDef",
    "detect_source_type",
    "extract_channels",
    "load_binary_packets",
    "load_packets_streaming",
]