Coverage for src / tracekit / loaders / vcd.py: 91%

181 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""IEEE 1364 VCD (Value Change Dump) file loader. 

2 

3This module provides loading of VCD files, which are commonly used 

4for digital waveform data from logic analyzers and simulators. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.vcd import load_vcd 

9 >>> trace = load_vcd("simulation.vcd") 

10 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

11""" 

12 

13from __future__ import annotations 

14 

15import re 

16from dataclasses import dataclass, field 

17from pathlib import Path 

18from typing import TYPE_CHECKING 

19 

20import numpy as np 

21from numpy.typing import NDArray 

22 

23from tracekit.core.exceptions import FormatError, LoaderError 

24from tracekit.core.types import DigitalTrace, TraceMetadata 

25 

26if TYPE_CHECKING: 

27 from os import PathLike 

28 

29 

@dataclass
class VCDVariable:
    """A single ``$var`` declaration read from a VCD header.

    Attributes:
        var_type: Variable type keyword from the declaration (wire, reg, etc.).
        size: Bit width of the variable.
        identifier: Identifier code that tags this variable's entries in the
            value-change section. The header parser in this module accepts
            any run of non-whitespace characters, not just a single character.
        name: Human-readable variable name.
        scope: Names of the enclosing ``$scope`` sections joined with ".";
            empty when the variable is declared outside any scope.
    """

    var_type: str
    size: int
    identifier: str
    name: str
    scope: str = ""

47 

48 

@dataclass
class VCDHeader:
    """Information collected from the declaration section of a VCD file.

    Attributes:
        timescale: Duration of one VCD time unit in seconds (e.g., 1e-9 for
            1ns). Stays at the 1ns default when no ``$timescale`` is parsed.
        variables: Dictionary mapping identifier code to VCDVariable.
        date: Text of the ``$date`` section, or "" when absent.
        version: Text of the ``$version`` section, or "" when absent.
        comment: Text of the ``$comment`` section, or "" when absent.
    """

    timescale: float = 1e-9  # Default 1ns
    variables: dict[str, VCDVariable] = field(default_factory=dict)
    date: str = ""
    version: str = ""
    comment: str = ""

66 

67 

def load_vcd(
    path: str | PathLike[str],
    *,
    signal: str | None = None,
    sample_rate: float | None = None,
) -> DigitalTrace:
    """Load an IEEE 1364 VCD (Value Change Dump) file.

    VCD files contain digital waveform data with value changes and
    timestamps. This loader converts the event-based format of one
    signal to a sampled digital trace.

    Args:
        path: Path to the VCD file.
        signal: Optional signal to load. Matched against each variable's
            name, its scope-qualified name (``scope.name``), or its
            identifier code; the first matching variable in declaration
            order wins. If None, loads the first signal found.
        sample_rate: Sample rate for conversion to sampled data.
            If None, automatically determined from the timescale and
            the spacing of the value changes.

    Returns:
        DigitalTrace containing the digital signal data and metadata.

    Raises:
        LoaderError: If the file does not exist, the requested signal is
            not declared, or any unexpected error occurs while loading.
        FormatError: If the file is not a valid VCD file (no
            ``$enddefinitions``, no variables, or no value changes for
            the selected signal).

    Example:
        >>> trace = load_vcd("simulation.vcd", signal="clk")
        >>> print(f"Duration: {trace.duration:.6f} seconds")
        >>> print(f"Edges: {len(trace.edges or [])}")

    References:
        IEEE 1364-2005: Verilog Hardware Description Language
    """
    path = Path(path)

    if not path.exists():
        raise LoaderError(
            "File not found",
            file_path=str(path),
        )

    try:
        # errors="replace" substitutes undecodable bytes instead of raising,
        # so reading never produces a UnicodeDecodeError that would need its
        # own handler.
        with open(path, encoding="utf-8", errors="replace") as f:
            content = f.read()

        # Parse header
        header = _parse_vcd_header(content, path)

        if not header.variables:
            raise FormatError(
                "No variables found in VCD file",
                file_path=str(path),
                expected="At least one $var definition",
            )

        # Select signal to load
        if signal is None:
            # Use first variable
            target_var = next(iter(header.variables.values()))
        else:
            target_var = None
            for var in header.variables.values():
                # Accept the hierarchical form too, so signals that share a
                # leaf name in different scopes can be told apart.
                qualified_name = f"{var.scope}.{var.name}" if var.scope else var.name
                if signal in (var.name, qualified_name, var.identifier):
                    target_var = var
                    break
            if target_var is None:
                available = [v.name for v in header.variables.values()]
                raise LoaderError(
                    f"Signal '{signal}' not found",
                    file_path=str(path),
                    details=f"Available signals: {available}",
                )

        # Parse value changes
        changes = _parse_value_changes(content, target_var.identifier)

        if not changes:
            raise FormatError(
                f"No value changes found for signal '{target_var.name}'",
                file_path=str(path),
            )

        # Determine sample rate and convert to sampled data
        if sample_rate is None:
            # Auto-determine from timescale and value changes
            sample_rate = _determine_sample_rate(changes, header.timescale)

        # Convert to sampled digital trace
        data, edges = _changes_to_samples(
            changes,
            header.timescale,
            sample_rate,
        )

        # Build metadata
        metadata = TraceMetadata(
            sample_rate=sample_rate,
            source_file=str(path),
            channel_name=target_var.name,
            trigger_info={
                "timescale": header.timescale,
                "var_type": target_var.var_type,
                "bit_width": target_var.size,
            },
        )

        return DigitalTrace(
            data=data.astype(np.bool_),  # type: ignore[arg-type]
            metadata=metadata,
            edges=edges,
        )

    except (LoaderError, FormatError):
        # Already carry loader-specific context; let them through untouched.
        raise
    except Exception as e:
        # Boundary handler: anything unexpected (I/O failure, malformed
        # numbers, ...) is reported as a loader failure with the cause chained.
        raise LoaderError(
            "Failed to load VCD file",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid IEEE 1364 VCD format.",
        ) from e

197 

198 

def _parse_vcd_header(content: str, path: Path) -> VCDHeader:
    """Parse VCD file header section.

    The header is everything up to and including ``$enddefinitions $end``.
    From it this function extracts the timescale, the free-text ``$date``,
    ``$version`` and ``$comment`` sections, and every ``$var`` declaration
    together with the ``$scope`` hierarchy it is nested in.

    Args:
        content: Full VCD file content.
        path: Path for error messages.

    Returns:
        Parsed VCDHeader object. Variables are keyed by identifier code, so
        a later declaration that reuses a code replaces the earlier one.

    Raises:
        FormatError: If the ``$enddefinitions`` marker is missing.
    """
    # Find header section (before $enddefinitions)
    end_def_match = re.search(r"\$enddefinitions\s+\$end", content)
    if not end_def_match:
        raise FormatError(
            "Invalid VCD file: missing $enddefinitions",
            file_path=str(path),
        )

    header = VCDHeader()
    header_content = content[: end_def_match.end()]

    # Parse timescale
    timescale_match = re.search(r"\$timescale\s+(\d+)\s*(s|ms|us|ns|ps|fs)\s+\$end", header_content)
    if timescale_match:
        unit_multipliers = {
            "s": 1.0,
            "ms": 1e-3,
            "us": 1e-6,
            "ns": 1e-9,
            "ps": 1e-12,
            "fs": 1e-15,
        }
        # The regex only admits units present in the table, so direct
        # indexing cannot fail.
        header.timescale = int(timescale_match.group(1)) * unit_multipliers[timescale_match.group(2)]

    # Parse the free-text sections; each maps onto the header field of the
    # same name.
    for keyword in ("date", "version", "comment"):
        section_match = re.search(rf"\${keyword}\s+(.*?)\s*\$end", header_content, re.DOTALL)
        if section_match:
            setattr(header, keyword, section_match.group(1).strip())

    # Parse scopes and variables in a single left-to-right pass. The
    # alternatives are tried in the order scope, upscope, var.
    # The scope name is matched with \S+ so names containing characters
    # outside \w (e.g. "gen[0]") are still pushed; otherwise the matching
    # $upscope would pop an unrelated level.
    declaration_pattern = re.compile(
        r"\$scope\s+(?P<scope_type>\w+)\s+(?P<scope_name>\S+)\s+\$end"
        r"|(?P<upscope>\$upscope\s+\$end)"
        r"|\$var\s+(?P<var_type>\w+)\s+(?P<size>\d+)\s+(?P<identifier>\S+)\s+(?P<name>\S+)"
        r"(?:\s+\[.*?\])?\s+\$end"
    )

    current_scope: list[str] = []
    for declaration in declaration_pattern.finditer(header_content):
        if declaration.group("scope_name") is not None:
            current_scope.append(declaration.group("scope_name"))
        elif declaration.group("upscope") is not None:
            # Tolerate an unbalanced $upscope instead of failing.
            if current_scope:
                current_scope.pop()
        else:
            var = VCDVariable(
                var_type=declaration.group("var_type"),
                size=int(declaration.group("size")),
                identifier=declaration.group("identifier"),
                name=declaration.group("name"),
                scope=".".join(current_scope),
            )
            header.variables[var.identifier] = var

    return header

294 

295 

296def _parse_value_changes( 

297 content: str, 

298 identifier: str, 

299) -> list[tuple[int, str]]: 

300 """Parse value changes for a specific signal. 

301 

302 Args: 

303 content: Full VCD file content. 

304 identifier: Signal identifier to track. 

305 

306 Returns: 

307 List of (timestamp, value) tuples. 

308 """ 

309 changes: list[tuple[int, str]] = [] 

310 current_time = 0 

311 

312 # Find data section (after $enddefinitions) 

313 end_def_match = re.search(r"\$enddefinitions\s+\$end", content) 

314 if not end_def_match: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 return changes 

316 

317 data_content = content[end_def_match.end() :] 

318 

319 # Parse line by line 

320 for line in data_content.split("\n"): 

321 line = line.strip() 

322 if not line: 

323 continue 

324 

325 # Timestamp 

326 if line.startswith("#"): 

327 try: 

328 current_time = int(line[1:]) 

329 except ValueError: 

330 continue 

331 

332 # Binary value change: 0x, 1x, xx, zx (single bit) 

333 elif line[0] in "01xXzZ" and len(line) >= 2: 

334 value = line[0] 

335 var_id = line[1:] 

336 if var_id == identifier: 336 ↛ 320line 336 didn't jump to line 320 because the condition on line 336 was always true

337 changes.append((current_time, value)) 

338 

339 # Multi-bit value: bVALUE IDENTIFIER or BVALUE IDENTIFIER 

340 elif line[0] in "bB" or line[0] in "rR": 

341 parts = line[1:].split() 

342 if len(parts) >= 2: 342 ↛ 320line 342 didn't jump to line 320 because the condition on line 342 was always true

343 value = parts[0] 

344 var_id = parts[1] 

345 if var_id == identifier: 345 ↛ 320line 345 didn't jump to line 320 because the condition on line 345 was always true

346 changes.append((current_time, value)) 

347 

348 return changes 

349 

350 

351def _determine_sample_rate( 

352 changes: list[tuple[int, str]], 

353 timescale: float, 

354) -> float: 

355 """Determine appropriate sample rate from value changes. 

356 

357 Args: 

358 changes: List of (timestamp, value) tuples. 

359 timescale: VCD timescale in seconds. 

360 

361 Returns: 

362 Sample rate in Hz. 

363 """ 

364 if len(changes) < 2: 364 ↛ 366line 364 didn't jump to line 366 because the condition on line 364 was never true

365 # Default to 1 MHz if not enough data 

366 return 1e6 

367 

368 # Calculate minimum time interval between changes 

369 timestamps = sorted({t for t, _ in changes}) 

370 if len(timestamps) < 2: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true

371 return 1e6 

372 

373 min_interval = min(timestamps[i + 1] - timestamps[i] for i in range(len(timestamps) - 1)) 

374 

375 if min_interval <= 0: 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true

376 return 1e6 

377 

378 # Convert to seconds and set sample rate for ~10 samples per interval 

379 interval_seconds = min_interval * timescale 

380 sample_rate = 10.0 / interval_seconds 

381 

382 # Clamp to reasonable range 

383 sample_rate = max(1e3, min(1e12, sample_rate)) 

384 

385 return sample_rate 

386 

387 

388def _changes_to_samples( 

389 changes: list[tuple[int, str]], 

390 timescale: float, 

391 sample_rate: float, 

392) -> tuple[NDArray[np.bool_], list[tuple[float, bool]]]: 

393 """Convert value changes to sampled data. 

394 

395 Args: 

396 changes: List of (timestamp, value) tuples. 

397 timescale: VCD timescale in seconds. 

398 sample_rate: Target sample rate in Hz. 

399 

400 Returns: 

401 Tuple of (data array, edges list). 

402 """ 

403 if not changes: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true

404 return np.array([], dtype=np.bool_), [] 

405 

406 # Sort changes by timestamp 

407 changes = sorted(changes, key=lambda x: x[0]) 

408 

409 # Get time range 

410 start_time = changes[0][0] 

411 end_time = changes[-1][0] 

412 

413 # Calculate number of samples 

414 duration_seconds = (end_time - start_time) * timescale 

415 n_samples = max(1, int(duration_seconds * sample_rate) + 1) 

416 

417 # Initialize data array 

418 data = np.zeros(n_samples, dtype=np.bool_) 

419 edges: list[tuple[float, bool]] = [] 

420 

421 # Convert values to boolean (for single-bit) or LSB (for multi-bit) 

422 def value_to_bool(val: str) -> bool: 

423 """Convert VCD value to boolean.""" 

424 val = val.lower() 

425 if val in ("1", "h"): 

426 return True 

427 if val in ("0", "l"): 

428 return False 

429 # For multi-bit, check LSB 

430 return bool(val and val[-1] in ("1", "h")) 

431 

432 # Fill samples based on value changes 

433 prev_value = False 

434 for i, (timestamp, value) in enumerate(changes): 

435 current_value = value_to_bool(value) 

436 

437 # Calculate sample index 

438 time_seconds = (timestamp - start_time) * timescale 

439 sample_idx = int(time_seconds * sample_rate) 

440 

441 # Calculate next change sample index 

442 if i + 1 < len(changes): 

443 next_time_seconds = (changes[i + 1][0] - start_time) * timescale 

444 next_sample_idx = int(next_time_seconds * sample_rate) 

445 else: 

446 next_sample_idx = n_samples 

447 

448 # Fill samples 

449 sample_idx = max(0, min(sample_idx, n_samples - 1)) 

450 next_sample_idx = max(0, min(next_sample_idx, n_samples)) 

451 data[sample_idx:next_sample_idx] = current_value 

452 

453 # Record edge 

454 if current_value != prev_value: 

455 edge_time = time_seconds 

456 is_rising = current_value 

457 edges.append((edge_time, is_rising)) 

458 

459 prev_value = current_value 

460 

461 return data, edges 

462 

463 

# Public API of this module: only the loader entry point is exported.
__all__ = ["load_vcd"]