Coverage for src / tracekit / loaders / sigrok.py: 94%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Sigrok session file (.sr) loader. 

2 

3This module provides loading of sigrok session files containing 

4logic analyzer captures. Sigrok sessions are ZIP archives containing 

5metadata and binary signal data. 

6 

7 

8Example: 

9 >>> from tracekit.loaders.sigrok import load_sigrok 

10 >>> trace = load_sigrok("capture.sr") 

11 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

12 >>> print(f"Channels: {len(trace.data)}") 

13""" 

14 

15from __future__ import annotations 

16 

17import zipfile 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Any 

20 

21import numpy as np 

22from numpy.typing import NDArray 

23 

24from tracekit.core.exceptions import FormatError, LoaderError 

25from tracekit.core.types import DigitalTrace, TraceMetadata 

26 

27if TYPE_CHECKING: 

28 from os import PathLike 

29 

30 

def load_sigrok(
    path: str | PathLike[str],
    *,
    channel: str | int | None = None,
) -> DigitalTrace:
    """Load one digital channel from a sigrok session file (.sr).

    The session is opened as a ZIP archive. Capture settings are read from
    its ``metadata`` member (``key=value`` lines) and samples are read from
    every member whose name starts with ``logic-1``.

    Args:
        path: Path to the sigrok .sr session file.
        channel: Channel to return, either as a zero-based index into the
            decoded channels or as a channel name listed in the metadata.
            If None, the first channel is returned.

    Returns:
        DigitalTrace containing the selected channel's samples, its edge
        list and the trace metadata.

    Raises:
        LoaderError: If the file does not exist, the requested channel is
            not available, or loading fails for any other reason.
        FormatError: If the file is not a ZIP archive, is corrupted, or
            contains no logic data.

    Example:
        >>> trace = load_sigrok("capture.sr")
        >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
        >>> print(f"Duration: {trace.duration:.6f} seconds")

    References:
        sigrok session file format specification
    """
    path = Path(path)

    if not path.exists():
        raise LoaderError(
            "File not found",
            file_path=str(path),
        )

    if not zipfile.is_zipfile(path):
        raise FormatError(
            "File is not a valid sigrok session (not a ZIP archive)",
            file_path=str(path),
            expected="ZIP archive",
        )

    try:
        with zipfile.ZipFile(path, "r") as zf:
            metadata_dict = _parse_metadata(zf, path)

            # Fall back to 1 MHz when the metadata carries no sample rate.
            sample_rate = metadata_dict.get("samplerate", 1_000_000)

            # Without an explicit probe count, use the number of named probes.
            channels = metadata_dict.get("channels", [])
            total_channels = metadata_dict.get("total probes", len(channels))

            logic_files = [name for name in zf.namelist() if name.startswith("logic-1")]
            if not logic_files:
                raise FormatError(
                    "No logic data found in sigrok session",
                    file_path=str(path),
                    expected="logic-1-* data files",
                )

            data = _read_logic_data(zf, logic_files, total_channels)

        channel_data, channel_name = _select_channel(data, channels, channel, path)
        edges = _compute_edges(channel_data, sample_rate)

        trace_metadata = TraceMetadata(
            sample_rate=float(sample_rate),
            source_file=str(path),
            channel_name=channel_name,
            trigger_info=metadata_dict.get("trigger", None),
        )

        return DigitalTrace(
            data=channel_data,
            metadata=trace_metadata,
            edges=edges,
        )

    except zipfile.BadZipFile as e:
        raise FormatError(
            "Corrupted sigrok session file",
            file_path=str(path),
            expected="Valid ZIP archive",
        ) from e
    except (LoaderError, FormatError):
        # Already a loader-level error with full context; pass it through.
        raise
    except Exception as e:
        # Anything else is unexpected; wrap it so callers see one error type.
        raise LoaderError(
            "Failed to load sigrok session",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid sigrok session (.sr) file.",
        ) from e


def _select_channel(
    data: NDArray[np.bool_],
    channels: list[str],
    channel: str | int | None,
    path: Path,
) -> tuple[NDArray[np.bool_], str]:
    """Pick one channel's samples and display name from the decoded data.

    Args:
        data: Decoded samples, one row per channel.
        channels: Channel names taken from the metadata (may be empty).
        channel: Requested channel index or name, or None for the first one.
        path: Path to the session file (for error messages).

    Returns:
        Tuple of (samples of the selected channel, channel name).

    Raises:
        LoaderError: If the requested channel does not exist or the decoded
            data holds no channels at all.
    """
    if isinstance(channel, int):
        if channel < 0 or channel >= data.shape[0]:
            raise LoaderError(
                f"Channel index {channel} out of range",
                file_path=str(path),
                details=f"Available channels: 0-{data.shape[0] - 1}",
            )
        # Channels without a metadata name get a positional "D<n>" name.
        name = channels[channel] if channel < len(channels) else f"D{channel}"
        return data[channel], name

    if isinstance(channel, str):
        if channel not in channels:
            raise LoaderError(
                f"Channel '{channel}' not found",
                file_path=str(path),
                details=f"Available channels: {channels}",
            )
        idx = channels.index(channel)
        # The name list can be longer than the decoded data; report that
        # explicitly instead of failing with a bare IndexError.
        if idx >= data.shape[0]:
            raise LoaderError(
                f"Channel '{channel}' has no sample data",
                file_path=str(path),
                details=f"Channel index {idx} is beyond the {data.shape[0]} decoded channels",
            )
        return data[idx], channel

    # No (usable) selection: default to the first channel.
    if data.ndim > 1:
        if data.shape[0] == 0:
            raise LoaderError(
                "No channel data found in sigrok session",
                file_path=str(path),
                details="Decoded logic data contains zero channels",
            )
        first = data[0]
    else:
        first = data
    return first, (channels[0] if channels else "D0")

164 

165 

166def _parse_metadata(zf: zipfile.ZipFile, path: Path) -> dict[str, Any]: 

167 """Parse sigrok session metadata. 

168 

169 Args: 

170 zf: Open ZipFile object. 

171 path: Path to the session file (for error messages). 

172 

173 Returns: 

174 Dictionary of metadata values. 

175 """ 

176 metadata: dict[str, Any] = {} 

177 

178 # Try to read metadata file (JSON format in newer versions) 

179 if "metadata" in zf.namelist(): 179 ↛ 202line 179 didn't jump to line 202 because the condition on line 179 was always true

180 try: 

181 with zf.open("metadata") as f: 

182 content = f.read().decode("utf-8") 

183 # Parse key=value format (sigrok classic format) 

184 for line in content.strip().split("\n"): 

185 line = line.strip() 

186 if "=" in line: 

187 key, value = line.split("=", 1) 

188 key = key.strip() 

189 value = value.strip() 

190 # Try to convert numeric values 

191 try: 

192 if "." in value: 

193 metadata[key] = float(value) 

194 else: 

195 metadata[key] = int(value) 

196 except ValueError: 

197 metadata[key] = value 

198 except Exception: 

199 pass # Use defaults if metadata parsing fails 

200 

201 # Extract channel names from probe entries 

202 channels: list[str] = [] 

203 for key, value in metadata.items(): 

204 if key.startswith("probe"): 

205 try: 

206 idx = int(key.replace("probe", "")) 

207 while len(channels) <= idx: 

208 channels.append(f"D{len(channels)}") 

209 channels[idx] = value 

210 except ValueError: 

211 pass 

212 

213 if channels: 

214 metadata["channels"] = channels 

215 

216 return metadata 

217 

218 

219def _read_logic_data( 

220 zf: zipfile.ZipFile, 

221 logic_files: list[str], 

222 total_channels: int, 

223) -> NDArray[np.bool_]: 

224 """Read and decode logic data from sigrok session. 

225 

226 Args: 

227 zf: Open ZipFile object. 

228 logic_files: List of logic data file names. 

229 total_channels: Total number of digital channels. 

230 

231 Returns: 

232 Boolean array of shape (channels, samples). 

233 """ 

234 # Sort logic files to ensure correct order 

235 logic_files = sorted(logic_files) 

236 

237 # Determine bytes per sample based on channel count 

238 bytes_per_sample = (total_channels + 7) // 8 

239 

240 # Read all logic data 

241 all_data = [] 

242 for logic_file in logic_files: 

243 with zf.open(logic_file) as f: 

244 raw_data = f.read() 

245 all_data.append(raw_data) 

246 

247 # Combine data 

248 combined = b"".join(all_data) 

249 

250 # Convert to numpy array 

251 if bytes_per_sample == 1: 

252 raw = np.frombuffer(combined, dtype=np.uint8) 

253 elif bytes_per_sample == 2: 

254 raw = np.frombuffer(combined, dtype=np.uint16) 

255 elif bytes_per_sample <= 4: 

256 # Pad to 4 bytes and read as uint32 

257 padded = combined + b"\x00" * (len(combined) % 4) 

258 raw = np.frombuffer(padded, dtype=np.uint32) 

259 else: 

260 # Handle larger sample widths 

261 raw = np.frombuffer(combined, dtype=np.uint8) 

262 

263 # Extract individual channel bits 

264 n_samples = len(raw) 

265 channels_data = np.zeros((total_channels, n_samples), dtype=np.bool_) 

266 

267 for ch in range(total_channels): 

268 if bytes_per_sample <= 4: 

269 channels_data[ch] = (raw >> ch) & 1 

270 else: 

271 # For larger widths, calculate byte and bit position 

272 byte_idx = ch // 8 

273 bit_idx = ch % 8 

274 byte_data = raw[byte_idx::bytes_per_sample] 

275 channels_data[ch, : len(byte_data)] = (byte_data >> bit_idx) & 1 

276 

277 return channels_data 

278 

279 

280def _compute_edges( 

281 data: NDArray[np.bool_], 

282 sample_rate: float, 

283) -> list[tuple[float, bool]]: 

284 """Compute edge timestamps from digital data. 

285 

286 Args: 

287 data: Boolean array of digital samples. 

288 sample_rate: Sample rate in Hz. 

289 

290 Returns: 

291 List of (timestamp, is_rising) tuples. 

292 """ 

293 edges: list[tuple[float, bool]] = [] 

294 

295 if len(data) < 2: 

296 return edges 

297 

298 # Find transitions 

299 diff = np.diff(data.astype(np.int8)) 

300 rising_indices = np.where(diff == 1)[0] 

301 falling_indices = np.where(diff == -1)[0] 

302 

303 time_per_sample = 1.0 / sample_rate 

304 

305 # Add rising edges 

306 for idx in rising_indices: 

307 timestamp = (idx + 1) * time_per_sample 

308 edges.append((timestamp, True)) 

309 

310 # Add falling edges 

311 for idx in falling_indices: 

312 timestamp = (idx + 1) * time_per_sample 

313 edges.append((timestamp, False)) 

314 

315 # Sort by timestamp 

316 edges.sort(key=lambda x: x[0]) 

317 

318 return edges 

319 

320 

321__all__ = ["load_sigrok"]