Coverage for src / tracekit / loaders / vcd.py: 91%
181 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""IEEE 1364 VCD (Value Change Dump) file loader.
3This module provides loading of VCD files, which are commonly used
4for digital waveform data from logic analyzers and simulators.
7Example:
8 >>> from tracekit.loaders.vcd import load_vcd
9 >>> trace = load_vcd("simulation.vcd")
10 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
11"""
13from __future__ import annotations
15import re
16from dataclasses import dataclass, field
17from pathlib import Path
18from typing import TYPE_CHECKING
20import numpy as np
21from numpy.typing import NDArray
23from tracekit.core.exceptions import FormatError, LoaderError
24from tracekit.core.types import DigitalTrace, TraceMetadata
26if TYPE_CHECKING:
27 from os import PathLike
@dataclass
class VCDVariable:
    """One ``$var`` declaration taken from a VCD header.

    Attributes:
        var_type: Declared variable kind (for example ``wire`` or ``reg``).
        size: Declared width of the variable in bits.
        identifier: Identifier code that tags this variable's entries in
            the value-change section. The header parser accepts any run
            of non-whitespace characters here, not just one character.
        name: Reference name of the variable as written in the header.
        scope: Names of the enclosing scopes joined with ``"."``; empty
            when no scope information is attached.
    """

    # Required fields, in the order they appear in a ``$var`` declaration.
    var_type: str
    size: int
    identifier: str
    name: str
    # Optional: dot-joined enclosing scope path.
    scope: str = ""
@dataclass
class VCDHeader:
    """Information collected from the declaration section of a VCD file.

    Attributes:
        timescale: Length of one VCD time unit expressed in seconds
            (``1e-9`` means 1 ns). Defaults to 1 ns.
        variables: Declared variables keyed by their identifier code.
        date: Text of the ``$date`` section, if any.
        version: Text of the ``$version`` section, if any.
        comment: Text of the ``$comment`` section, if any.
    """

    # One VCD time unit in seconds; 1 ns unless the header overrides it.
    timescale: float = 1e-9
    # A fresh dict per instance so headers never share variable tables.
    variables: dict[str, VCDVariable] = field(default_factory=dict)
    date: str = ""
    version: str = ""
    comment: str = ""
def load_vcd(
    path: str | PathLike[str],
    *,
    signal: str | None = None,
    sample_rate: float | None = None,
) -> DigitalTrace:
    """Load an IEEE 1364 VCD (Value Change Dump) file.

    VCD files contain digital waveform data with value changes and
    timestamps. This loader converts the event-based format of one
    signal to a uniformly sampled digital trace.

    Args:
        path: Path to the VCD file.
        signal: Optional signal to load. It is matched against each
            variable's name or identifier code first; if neither
            matches, it is matched against the scope-qualified name
            (``"<scope>.<name>"``, e.g. ``"top.dut.clk"``). If None,
            the first declared variable is loaded.
        sample_rate: Sample rate in Hz for conversion to sampled data.
            If None, it is derived from the timescale and the spacing
            of the value changes.

    Returns:
        DigitalTrace containing the sampled signal, its edges and metadata.

    Raises:
        LoaderError: If the file does not exist, the requested signal is
            not declared, or any unexpected error occurs while loading.
        FormatError: If the file is not a valid VCD file (missing
            ``$enddefinitions``, no ``$var`` declarations, or no value
            changes for the selected signal).

    Example:
        >>> trace = load_vcd("simulation.vcd", signal="clk")
        >>> print(f"Duration: {trace.duration:.6f} seconds")
        >>> print(f"Edges: {len(trace.edges or [])}")

    References:
        IEEE 1364-2005: Verilog Hardware Description Language
    """
    path = Path(path)

    if not path.exists():
        raise LoaderError(
            "File not found",
            file_path=str(path),
        )

    try:
        # errors="replace" substitutes undecodable bytes instead of raising,
        # so no UnicodeDecodeError can escape from this read.
        with open(path, encoding="utf-8", errors="replace") as f:
            content = f.read()

        # Parse header
        header = _parse_vcd_header(content, path)

        if not header.variables:
            raise FormatError(
                "No variables found in VCD file",
                file_path=str(path),
                expected="At least one $var definition",
            )

        # Select signal to load
        variables = list(header.variables.values())
        if signal is None:
            # Use first variable
            target_var = variables[0]
        else:
            # Exact name / identifier match takes priority so that existing
            # callers resolve to the same variable as before.
            target_var = next(
                (var for var in variables if signal in (var.name, var.identifier)),
                None,
            )
            if target_var is None:
                # Fall back to the hierarchical name, which disambiguates
                # variables that share a short name in different scopes.
                target_var = next(
                    (
                        var
                        for var in variables
                        if var.scope and signal == f"{var.scope}.{var.name}"
                    ),
                    None,
                )
            if target_var is None:
                available = [v.name for v in variables]
                raise LoaderError(
                    f"Signal '{signal}' not found",
                    file_path=str(path),
                    details=f"Available signals: {available}",
                )

        # Parse value changes
        changes = _parse_value_changes(content, target_var.identifier)

        if not changes:
            raise FormatError(
                f"No value changes found for signal '{target_var.name}'",
                file_path=str(path),
            )

        # Determine sample rate and convert to sampled data
        if sample_rate is None:
            # Auto-determine from timescale and value changes
            sample_rate = _determine_sample_rate(changes, header.timescale)

        # Convert to sampled digital trace
        data, edges = _changes_to_samples(
            changes,
            header.timescale,
            sample_rate,
        )

        # Build metadata
        metadata = TraceMetadata(
            sample_rate=sample_rate,
            source_file=str(path),
            channel_name=target_var.name,
            trigger_info={
                "timescale": header.timescale,
                "var_type": target_var.var_type,
                "bit_width": target_var.size,
            },
        )

        return DigitalTrace(
            data=data.astype(np.bool_),  # type: ignore[arg-type]
            metadata=metadata,
            edges=edges,
        )

    except (LoaderError, FormatError):
        # Already a domain error with full context; pass it through untouched.
        raise
    except Exception as e:
        # Anything else (I/O failure, parsing bug, ...) is reported uniformly.
        raise LoaderError(
            "Failed to load VCD file",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid IEEE 1364 VCD format.",
        ) from e
def _parse_vcd_header(content: str, path: Path) -> VCDHeader:
    """Parse the declaration section of a VCD file.

    Everything up to and including ``$enddefinitions $end`` is treated as
    the header. The timescale, the ``$date`` / ``$version`` / ``$comment``
    texts and all ``$var`` declarations (with their enclosing scope path)
    are extracted from it.

    Args:
        content: Full VCD file content.
        path: Path of the file, used only in error messages.

    Returns:
        Parsed VCDHeader object. Variables are keyed by identifier code,
        so a later declaration reusing an identifier replaces the earlier
        one.

    Raises:
        FormatError: If the ``$enddefinitions`` marker is missing.
    """
    # Find header section (before $enddefinitions)
    end_def_match = re.search(r"\$enddefinitions\s+\$end", content)
    if not end_def_match:
        raise FormatError(
            "Invalid VCD file: missing $enddefinitions",
            file_path=str(path),
        )

    header_content = content[: end_def_match.end()]
    header = VCDHeader()

    # Parse timescale: "<number> <unit>" converted to seconds per time unit.
    timescale_match = re.search(
        r"\$timescale\s+(\d+)\s*(s|ms|us|ns|ps|fs)\s+\$end", header_content
    )
    if timescale_match:
        unit_multipliers = {
            "s": 1.0,
            "ms": 1e-3,
            "us": 1e-6,
            "ns": 1e-9,
            "ps": 1e-12,
            "fs": 1e-15,
        }
        value = int(timescale_match.group(1))
        # The regex only admits units that are keys of the table.
        header.timescale = value * unit_multipliers[timescale_match.group(2)]

    # Parse the free-text sections; each maps to the header field of the
    # same name. DOTALL lets the text span several lines.
    for section in ("date", "version", "comment"):
        section_match = re.search(
            rf"\${section}\s+(.*?)\s*\$end", header_content, re.DOTALL
        )
        if section_match:
            setattr(header, section, section_match.group(1).strip())

    # Parse scopes and variables in a single left-to-right pass. The three
    # alternatives start with distinct keywords, so at most one can match
    # at any position and document order is preserved.
    #
    # The scope name is matched as any non-whitespace token: if a scope
    # whose name contains non-word characters were skipped, its matching
    # $upscope would still pop an entry and corrupt the scope path of
    # every following variable.
    declaration_pattern = re.compile(
        r"\$scope\s+\w+\s+(?P<scope>\S+)\s+\$end"
        r"|(?P<upscope>\$upscope\s+\$end)"
        r"|\$var\s+(?P<var_type>\w+)\s+(?P<size>\d+)\s+(?P<identifier>\S+)"
        r"\s+(?P<name>\S+)(?:\s+\[.*?\])?\s+\$end"
    )

    current_scope: list[str] = []
    for declaration in declaration_pattern.finditer(header_content):
        if declaration.group("scope") is not None:
            current_scope.append(declaration.group("scope"))
        elif declaration.group("upscope") is not None:
            # Tolerate an unbalanced $upscope instead of failing.
            if current_scope:
                current_scope.pop()
        else:
            var = VCDVariable(
                var_type=declaration.group("var_type"),
                size=int(declaration.group("size")),
                identifier=declaration.group("identifier"),
                name=declaration.group("name"),
                scope=".".join(current_scope),
            )
            header.variables[var.identifier] = var

    return header
def _parse_value_changes(
    content: str,
    identifier: str,
) -> list[tuple[int, str]]:
    """Collect the value changes of one signal from the VCD data section.

    The data section (everything after ``$enddefinitions $end``) is read
    as a stream of whitespace-separated tokens rather than line by line,
    so several entries on one line (``#10 0! 1"``) are all honoured.

    Recognised tokens:
        * ``#<int>``: sets the current timestamp; a malformed number is
          ignored and the previous timestamp stays in effect.
        * ``<0|1|x|X|z|Z><id>``: scalar change for ``<id>``.
        * ``<b|B|r|R><value>`` followed by ``<id>``: vector / real change.
        * ``$comment ... $end``: skipped entirely, so text inside a
          comment is never mistaken for a value change.
        * Any other ``$keyword`` (e.g. ``$dumpvars``, ``$end``) and any
          unrecognised token: ignored.

    Args:
        content: Full VCD file content.
        identifier: Identifier code of the signal to track.

    Returns:
        List of ``(timestamp, value)`` tuples in file order. ``value`` is
        the single scalar character, or the vector / real text without
        its leading type letter. Empty if ``$enddefinitions`` is missing.
    """
    changes: list[tuple[int, str]] = []

    # Find data section (after $enddefinitions)
    end_def_match = re.search(r"\$enddefinitions\s+\$end", content)
    if not end_def_match:
        return changes

    # A shared iterator lets the branches below consume follow-up tokens.
    tokens = iter(content[end_def_match.end() :].split())
    current_time = 0

    for token in tokens:
        lead = token[0]

        if lead == "#":
            # Timestamp
            try:
                current_time = int(token[1:])
            except ValueError:
                continue

        elif lead == "$":
            if token == "$comment":
                # Drop everything up to the terminating $end.
                for skipped in tokens:
                    if skipped == "$end":
                        break

        elif lead in "01xXzZ":
            # Scalar change: value character immediately followed by the id.
            if len(token) >= 2 and token[1:] == identifier:
                changes.append((current_time, lead))

        elif lead in "bBrR":
            # Vector / real change: the identifier is the next token. It is
            # consumed even when it is not ours so that an identifier that
            # happens to start with '#', '$', '0', ... is never re-parsed.
            var_id = next(tokens, None)
            if var_id is None:
                break
            if var_id == identifier:
                changes.append((current_time, token[1:]))

    return changes
def _determine_sample_rate(
    changes: list[tuple[int, str]],
    timescale: float,
) -> float:
    """Pick a sample rate that resolves the fastest activity of a signal.

    The rate is chosen so that the smallest gap between two distinct
    change timestamps is covered by about ten samples, then limited to
    the range 1 kHz .. 1 THz.

    Args:
        changes: List of (timestamp, value) tuples.
        timescale: VCD timescale in seconds.

    Returns:
        Sample rate in Hz; 1 MHz when there are fewer than two changes
        or fewer than two distinct timestamps.
    """
    fallback_rate = 1e6

    if len(changes) < 2:
        return fallback_rate

    # Distinct timestamps in ascending order.
    ticks = sorted({stamp for stamp, _ in changes})
    if len(ticks) < 2:
        return fallback_rate

    # Smallest distance between neighbouring timestamps, in VCD time units.
    smallest_gap = min(later - earlier for earlier, later in zip(ticks, ticks[1:]))
    if smallest_gap <= 0:
        return fallback_rate

    # Ten samples across the shortest gap.
    gap_seconds = smallest_gap * timescale
    rate = 10.0 / gap_seconds

    # Keep the result within 1 kHz .. 1 THz.
    rate = min(1e12, rate)
    return max(1e3, rate)
def _changes_to_samples(
    changes: list[tuple[int, str]],
    timescale: float,
    sample_rate: float,
) -> tuple[NDArray[np.bool_], list[tuple[float, bool]]]:
    """Turn a list of value changes into a uniformly sampled boolean array.

    Time is measured from the earliest change. Each change holds its
    level from its own sample index up to (not including) the index of
    the next change; the last change holds to the end of the array.

    Args:
        changes: List of (timestamp, value) tuples.
        timescale: VCD timescale in seconds.
        sample_rate: Target sample rate in Hz.

    Returns:
        Tuple of (data array, edges list). Each edge is
        ``(seconds since the first change, True for rising / False for
        falling)``. The level before the first change is taken to be
        low, so an initial high value is reported as a rising edge.
    """
    if not changes:
        return np.array([], dtype=np.bool_), []

    def as_level(raw: str) -> bool:
        """Map a VCD value to a level: high iff it ends in ``1`` or ``h``.

        For a single-bit value that is the value itself; for a multi-bit
        value it is the least significant (last) character.
        """
        return raw.lower().endswith(("1", "h"))

    # Chronological order; the sort is stable for equal timestamps.
    ordered = sorted(changes, key=lambda change: change[0])
    origin = ordered[0][0]

    # Seconds elapsed since the first change, one entry per change.
    offsets = [(tick - origin) * timescale for tick, _ in ordered]

    # The final offset is the total duration and fixes the array length.
    total = max(1, int(offsets[-1] * sample_rate) + 1)

    # Unclamped start index of every change; each span stops where the
    # next one starts, and the last span runs to the end of the array.
    span_starts = [int(offset * sample_rate) for offset in offsets]
    span_stops = span_starts[1:] + [total]

    samples = np.zeros(total, dtype=np.bool_)
    transitions: list[tuple[float, bool]] = []
    level = False

    for (_, raw), offset, begin, stop in zip(ordered, offsets, span_starts, span_stops):
        new_level = as_level(raw)

        # Clamp both ends into the array before writing the span.
        first = max(0, min(begin, total - 1))
        last = max(0, min(stop, total))
        samples[first:last] = new_level

        # Only an actual level change counts as an edge.
        if new_level != level:
            transitions.append((offset, new_level))
        level = new_level

    return samples, transitions
464__all__ = ["load_vcd"]