Coverage for src / tracekit / loaders / sigrok.py: 94%
126 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Sigrok session file (.sr) loader.
3This module provides loading of sigrok session files containing
4logic analyzer captures. Sigrok sessions are ZIP archives containing
5metadata and binary signal data.
8Example:
9 >>> from tracekit.loaders.sigrok import load_sigrok
10 >>> trace = load_sigrok("capture.sr")
11 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
12 >>> print(f"Channels: {len(trace.data)}")
13"""
15from __future__ import annotations
17import zipfile
18from pathlib import Path
19from typing import TYPE_CHECKING, Any
21import numpy as np
22from numpy.typing import NDArray
24from tracekit.core.exceptions import FormatError, LoaderError
25from tracekit.core.types import DigitalTrace, TraceMetadata
27if TYPE_CHECKING:
28 from os import PathLike
31def load_sigrok(
32 path: str | PathLike[str],
33 *,
34 channel: str | int | None = None,
35) -> DigitalTrace:
36 """Load a sigrok session file (.sr).
38 Sigrok session files are ZIP archives containing:
39 - metadata: JSON file with capture settings
40 - logic-1-*: Binary files with sample data
42 Args:
43 path: Path to the sigrok .sr session file.
44 channel: Optional channel name or index to load. If None,
45 loads the first channel or merges all channels.
47 Returns:
48 DigitalTrace containing the digital signal data and metadata.
50 Raises:
51 LoaderError: If the file cannot be loaded.
52 FormatError: If the file is not a valid sigrok session.
54 Example:
55 >>> trace = load_sigrok("capture.sr")
56 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
57 >>> print(f"Duration: {trace.duration:.6f} seconds")
59 References:
60 sigrok session file format specification
61 """
62 path = Path(path)
64 if not path.exists():
65 raise LoaderError(
66 "File not found",
67 file_path=str(path),
68 )
70 if not zipfile.is_zipfile(path):
71 raise FormatError(
72 "File is not a valid sigrok session (not a ZIP archive)",
73 file_path=str(path),
74 expected="ZIP archive",
75 )
77 try:
78 with zipfile.ZipFile(path, "r") as zf:
79 # Parse metadata
80 metadata_dict = _parse_metadata(zf, path)
82 # Get sample rate from metadata
83 sample_rate = metadata_dict.get("samplerate", 1_000_000)
85 # Get channel information
86 channels = metadata_dict.get("channels", [])
87 total_channels = metadata_dict.get("total probes", len(channels))
89 # Find and read logic data files
90 logic_files = [name for name in zf.namelist() if name.startswith("logic-1")]
92 if not logic_files:
93 raise FormatError(
94 "No logic data found in sigrok session",
95 file_path=str(path),
96 expected="logic-1-* data files",
97 )
99 # Read and combine logic data
100 data = _read_logic_data(zf, logic_files, total_channels)
102 # Select specific channel if requested
103 if channel is not None:
104 if isinstance(channel, int):
105 if channel < 0 or channel >= data.shape[0]:
106 raise LoaderError(
107 f"Channel index {channel} out of range",
108 file_path=str(path),
109 details=f"Available channels: 0-{data.shape[0] - 1}",
110 )
111 channel_data = data[channel]
112 channel_name = channels[channel] if channel < len(channels) else f"D{channel}"
113 elif isinstance(channel, str): 113 ↛ 125line 113 didn't jump to line 125 because the condition on line 113 was always true
114 if channel in channels:
115 idx = channels.index(channel)
116 channel_data = data[idx]
117 channel_name = channel
118 else:
119 raise LoaderError(
120 f"Channel '{channel}' not found",
121 file_path=str(path),
122 details=f"Available channels: {channels}",
123 )
124 else:
125 channel_data = data[0] # type: ignore[unreachable]
126 channel_name = channels[0] if channels else "D0"
127 else:
128 # Default to first channel
129 channel_data = data[0] if data.ndim > 1 else data
130 channel_name = channels[0] if channels else "D0"
132 # Compute edges
133 edges = _compute_edges(channel_data, sample_rate)
135 # Build metadata
136 trace_metadata = TraceMetadata(
137 sample_rate=float(sample_rate),
138 source_file=str(path),
139 channel_name=channel_name,
140 trigger_info=metadata_dict.get("trigger", None),
141 )
143 return DigitalTrace(
144 data=channel_data,
145 metadata=trace_metadata,
146 edges=edges,
147 )
149 except zipfile.BadZipFile as e:
150 raise FormatError(
151 "Corrupted sigrok session file",
152 file_path=str(path),
153 expected="Valid ZIP archive",
154 ) from e
155 except Exception as e:
156 if isinstance(e, LoaderError | FormatError): 156 ↛ 158line 156 didn't jump to line 158 because the condition on line 156 was always true
157 raise
158 raise LoaderError(
159 "Failed to load sigrok session",
160 file_path=str(path),
161 details=str(e),
162 fix_hint="Ensure the file is a valid sigrok session (.sr) file.",
163 ) from e
166def _parse_metadata(zf: zipfile.ZipFile, path: Path) -> dict[str, Any]:
167 """Parse sigrok session metadata.
169 Args:
170 zf: Open ZipFile object.
171 path: Path to the session file (for error messages).
173 Returns:
174 Dictionary of metadata values.
175 """
176 metadata: dict[str, Any] = {}
178 # Try to read metadata file (JSON format in newer versions)
179 if "metadata" in zf.namelist(): 179 ↛ 202line 179 didn't jump to line 202 because the condition on line 179 was always true
180 try:
181 with zf.open("metadata") as f:
182 content = f.read().decode("utf-8")
183 # Parse key=value format (sigrok classic format)
184 for line in content.strip().split("\n"):
185 line = line.strip()
186 if "=" in line:
187 key, value = line.split("=", 1)
188 key = key.strip()
189 value = value.strip()
190 # Try to convert numeric values
191 try:
192 if "." in value:
193 metadata[key] = float(value)
194 else:
195 metadata[key] = int(value)
196 except ValueError:
197 metadata[key] = value
198 except Exception:
199 pass # Use defaults if metadata parsing fails
201 # Extract channel names from probe entries
202 channels: list[str] = []
203 for key, value in metadata.items():
204 if key.startswith("probe"):
205 try:
206 idx = int(key.replace("probe", ""))
207 while len(channels) <= idx:
208 channels.append(f"D{len(channels)}")
209 channels[idx] = value
210 except ValueError:
211 pass
213 if channels:
214 metadata["channels"] = channels
216 return metadata
219def _read_logic_data(
220 zf: zipfile.ZipFile,
221 logic_files: list[str],
222 total_channels: int,
223) -> NDArray[np.bool_]:
224 """Read and decode logic data from sigrok session.
226 Args:
227 zf: Open ZipFile object.
228 logic_files: List of logic data file names.
229 total_channels: Total number of digital channels.
231 Returns:
232 Boolean array of shape (channels, samples).
233 """
234 # Sort logic files to ensure correct order
235 logic_files = sorted(logic_files)
237 # Determine bytes per sample based on channel count
238 bytes_per_sample = (total_channels + 7) // 8
240 # Read all logic data
241 all_data = []
242 for logic_file in logic_files:
243 with zf.open(logic_file) as f:
244 raw_data = f.read()
245 all_data.append(raw_data)
247 # Combine data
248 combined = b"".join(all_data)
250 # Convert to numpy array
251 if bytes_per_sample == 1:
252 raw = np.frombuffer(combined, dtype=np.uint8)
253 elif bytes_per_sample == 2:
254 raw = np.frombuffer(combined, dtype=np.uint16)
255 elif bytes_per_sample <= 4:
256 # Pad to 4 bytes and read as uint32
257 padded = combined + b"\x00" * (len(combined) % 4)
258 raw = np.frombuffer(padded, dtype=np.uint32)
259 else:
260 # Handle larger sample widths
261 raw = np.frombuffer(combined, dtype=np.uint8)
263 # Extract individual channel bits
264 n_samples = len(raw)
265 channels_data = np.zeros((total_channels, n_samples), dtype=np.bool_)
267 for ch in range(total_channels):
268 if bytes_per_sample <= 4:
269 channels_data[ch] = (raw >> ch) & 1
270 else:
271 # For larger widths, calculate byte and bit position
272 byte_idx = ch // 8
273 bit_idx = ch % 8
274 byte_data = raw[byte_idx::bytes_per_sample]
275 channels_data[ch, : len(byte_data)] = (byte_data >> bit_idx) & 1
277 return channels_data
280def _compute_edges(
281 data: NDArray[np.bool_],
282 sample_rate: float,
283) -> list[tuple[float, bool]]:
284 """Compute edge timestamps from digital data.
286 Args:
287 data: Boolean array of digital samples.
288 sample_rate: Sample rate in Hz.
290 Returns:
291 List of (timestamp, is_rising) tuples.
292 """
293 edges: list[tuple[float, bool]] = []
295 if len(data) < 2:
296 return edges
298 # Find transitions
299 diff = np.diff(data.astype(np.int8))
300 rising_indices = np.where(diff == 1)[0]
301 falling_indices = np.where(diff == -1)[0]
303 time_per_sample = 1.0 / sample_rate
305 # Add rising edges
306 for idx in rising_indices:
307 timestamp = (idx + 1) * time_per_sample
308 edges.append((timestamp, True))
310 # Add falling edges
311 for idx in falling_indices:
312 timestamp = (idx + 1) * time_per_sample
313 edges.append((timestamp, False))
315 # Sort by timestamp
316 edges.sort(key=lambda x: x[0])
318 return edges
321__all__ = ["load_sigrok"]