Coverage for src / tracekit / exporters / csv.py: 41%
117 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""CSV export functionality.
3This module provides trace and measurement export to CSV format.
6Example:
7 >>> from tracekit.exporters.csv import export_csv
8 >>> export_csv(trace, "output.csv")
9 >>> export_csv(measurements, "results.csv")
11References:
12 RFC 4180 (CSV format)
13"""
15from __future__ import annotations
17import csv
18from pathlib import Path
19from typing import TYPE_CHECKING, Any
21import numpy as np
23from tracekit.core.types import DigitalTrace, WaveformTrace
25if TYPE_CHECKING:
26 from numpy.typing import NDArray
def export_csv(
    data: WaveformTrace | DigitalTrace | dict[str, Any] | NDArray[Any],
    path: str | Path,
    *,
    include_time: bool = True,
    time_unit: str = "s",
    precision: int = 9,
    delimiter: str = ",",
    header: bool = True,
) -> None:
    """Write a trace, a measurement dictionary, or an array to a CSV file.

    The concrete writer is selected from the runtime type of ``data``.

    Args:
        data: Object to export. Supported inputs are:
            - WaveformTrace or DigitalTrace (with metadata as comments)
            - Dictionary of measurements
            - NumPy array
        path: Output file path.
        include_time: Include a time column; only forwarded for traces.
        time_unit: Time unit ("s", "ms", "us", "ns"); only forwarded for
            traces.
        precision: Precision used when formatting floating point values.
        delimiter: Column delimiter.
        header: Include header row and metadata comments.

    Raises:
        TypeError: If ``data`` is not one of the supported types.

    Example:
        >>> export_csv(trace, "waveform.csv")
        >>> export_csv(trace, "data.csv", precision=6, delimiter="\t")
        >>> export_csv(measurements, "results.csv")

    Note:
        When exporting traces, metadata is included as comment lines
        starting with '#' when header=True.

    References:
        EXP-001
    """
    target = Path(path)

    # Guard-clause dispatch: the first matching type wins.
    if isinstance(data, (WaveformTrace, DigitalTrace)):
        _export_trace(data, target, include_time, time_unit, precision, delimiter, header)
        return

    if isinstance(data, dict):
        _export_dict(data, target, precision, delimiter, header)
        return

    if isinstance(data, np.ndarray):
        _export_array(data, target, precision, delimiter, header)
        return

    raise TypeError(f"Unsupported data type: {type(data)}")
def _export_trace(
    trace: WaveformTrace | DigitalTrace,
    path: Path,
    include_time: bool,
    time_unit: str,
    precision: int,
    delimiter: str,
    header: bool,
) -> None:
    """Export a single trace to CSV.

    When ``header`` is true the file starts with ``#``-prefixed metadata
    comment lines followed by a column-header row. One row per sample
    follows. Waveform samples are formatted with ``precision`` significant
    digits (``g`` format); digital samples are written as integers.

    Args:
        trace: Trace to export.
        path: Output file path.
        include_time: Include a time column computed as
            ``index * time_base * multiplier``.
        time_unit: Time unit for the time column. "s", "ms", "us" and "ns"
            scale the values; any other string is still used in the column
            title but leaves the values unscaled (multiplier 1.0).
        precision: Number of significant digits for floating point values.
        delimiter: Column delimiter.
        header: Include metadata comments and the column-header row.

    Note:
        Metadata comment lines are written directly and end with "\\n",
        while data rows go through ``csv.writer`` and use its line
        terminator.
    """
    time_multipliers = {"s": 1.0, "ms": 1e3, "us": 1e6, "ns": 1e9}
    # Units missing from the table fall back to a multiplier of 1.0.
    multiplier = time_multipliers.get(time_unit, 1.0)

    # Resolve the trace kind once instead of once per sample.
    is_waveform = isinstance(trace, WaveformTrace)
    value_label = "Voltage" if is_waveform else "Digital"

    def format_sample(sample: Any) -> str | int:
        """Format one sample according to the trace kind."""
        if is_waveform:
            return f"{sample:.{precision}g}"
        return int(sample)

    # Explicit UTF-8 keeps the output independent of the platform locale;
    # metadata such as channel names or source paths may be non-ASCII.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=delimiter)

        if header:
            meta = trace.metadata
            comments = [
                "# TraceKit CSV Export\n",
                f"# Sample Rate: {meta.sample_rate} Hz\n",
                f"# Time Base: {meta.time_base} s\n",
                f"# Samples: {len(trace.data)}\n",
                f"# Duration: {trace.duration} s\n",
            ]

            # Optional metadata is emitted only when it is set.
            if meta.vertical_scale is not None:
                comments.append(f"# Vertical Scale: {meta.vertical_scale} V/div\n")
            if meta.vertical_offset is not None:
                comments.append(f"# Vertical Offset: {meta.vertical_offset} V\n")
            if meta.acquisition_time is not None:
                comments.append(f"# Acquisition Time: {meta.acquisition_time.isoformat()}\n")
            if meta.source_file is not None:
                comments.append(f"# Source File: {meta.source_file}\n")
            if meta.channel_name is not None:
                comments.append(f"# Channel: {meta.channel_name}\n")

            comments.append("#\n")
            f.writelines(comments)

            # Column headers
            if include_time:
                writer.writerow([f"Time ({time_unit})", value_label])
            else:
                writer.writerow([value_label])

        # Data rows are streamed to the writer; nothing is materialized.
        time_base = trace.metadata.time_base
        if include_time:
            writer.writerows(
                (f"{i * time_base * multiplier:.{precision}g}", format_sample(sample))
                for i, sample in enumerate(trace.data)
            )
        else:
            writer.writerows((format_sample(sample),) for sample in trace.data)
def _export_dict(
    data: dict[str, Any],
    path: Path,
    precision: int,
    delimiter: str,
    header: bool,
) -> None:
    """Export a dictionary of measurements to CSV.

    Each entry becomes one ``Parameter, Value, Unit`` row. An entry whose
    value is itself a dict is read as ``{"value": ..., "unit": ...}``: a
    missing "unit" becomes an empty string, and a missing "value" causes
    the nested dict itself to be written in the value column.

    Python floats and NumPy floating scalars (including ``np.float32``) are
    formatted with ``precision`` significant digits; every other value is
    passed to the CSV writer unchanged.

    Args:
        data: Dictionary to export.
        path: Output file path.
        precision: Number of significant digits for floating point values.
        delimiter: Column delimiter.
        header: Include the ``Parameter, Value, Unit`` header row.
    """

    def format_value(val: Any) -> Any:
        """Apply ``precision`` to floats; leave everything else untouched."""
        # np.float64 subclasses float, but np.float32 / np.float16 do not,
        # so np.floating has to be checked explicitly.
        if isinstance(val, (float, np.floating)):
            return f"{val:.{precision}g}"
        return val

    # Explicit UTF-8: units such as "Ω" or "µs" must not depend on the
    # platform's locale encoding.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=delimiter)

        if header:
            writer.writerow(["Parameter", "Value", "Unit"])

        for key, value in data.items():
            if isinstance(value, dict):
                # Nested dict with value/unit
                val = value.get("value", value)
                unit = value.get("unit", "")
            else:
                val, unit = value, ""
            writer.writerow([key, format_value(val), unit])
def _export_array(
    data: NDArray[Any],
    path: Path,
    precision: int,
    delimiter: str,
    header: bool,
) -> None:
    """Export a NumPy array to CSV.

    0-d and 1-d arrays are written as a single column; a 2-d array is
    written row by row. Arrays with more than two dimensions are not
    flattened: each cell then holds the string form of a sub-array.

    Python floats and NumPy floating scalars are formatted with
    ``precision`` significant digits; other values are written via ``str``.

    Args:
        data: NumPy array to export.
        path: Output file path.
        precision: Number of significant digits for floating point values.
        delimiter: Column delimiter.
        header: Include a ``Column_<i>`` header row.
    """
    # Promote scalars (0-d) and vectors (1-d) to a single-column table so
    # the row/column logic below always sees at least two dimensions.
    if data.ndim < 2:
        data = data.reshape(-1, 1)

    def format_cell(val: Any) -> str:
        """Render one array element as CSV cell text."""
        if isinstance(val, (float, np.floating)):
            return f"{val:.{precision}g}"
        return str(val)

    # Explicit UTF-8 keeps output independent of the platform locale
    # (string/object arrays may hold non-ASCII text).
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=delimiter)

        if header:
            writer.writerow([f"Column_{i}" for i in range(data.shape[1])])

        writer.writerows([format_cell(val) for val in row] for row in data)
def export_multi_trace_csv(
    traces: list[WaveformTrace | DigitalTrace],
    path: str | Path,
    *,
    names: list[str] | None = None,
    include_time: bool = True,
    time_unit: str = "s",
    precision: int = 9,
    delimiter: str = ",",
) -> None:
    """Export multiple traces to a single CSV file, one column per trace.

    Args:
        traces: List of traces to export. If empty, nothing is written and
            no file is created.
        path: Output file path.
        names: Column names for each trace. Defaults to ``Trace_0``,
            ``Trace_1``, ... The list is used as given; its length is not
            checked against ``traces``.
        include_time: Include a time column derived from the first trace's
            time base.
        time_unit: Time unit ("s", "ms", "us", "ns"). Any other string is
            still used in the column title but leaves the time values
            unscaled (multiplier 1.0).
        precision: Number of significant digits for floating point values.
        delimiter: Column delimiter.

    Note:
        The first trace defines the number of rows. Shorter traces are
        padded with empty cells; samples of longer traces beyond that row
        count are not written.

    Example:
        >>> export_multi_trace_csv([ch1, ch2, ch3], "channels.csv",
        ...                        names=["CH1", "CH2", "CH3"])
    """
    if not traces:
        return

    path = Path(path)

    if names is None:
        names = [f"Trace_{i}" for i in range(len(traces))]

    # Use first trace for timing
    ref_trace = traces[0]
    n_samples = len(ref_trace.data)
    time_base = ref_trace.metadata.time_base

    time_multipliers = {"s": 1.0, "ms": 1e3, "us": 1e6, "ns": 1e9}
    multiplier = time_multipliers.get(time_unit, 1.0)

    # Resolve per-trace invariants once, not once per cell.
    columns = [
        (trace.data, len(trace.data), isinstance(trace, WaveformTrace))
        for trace in traces
    ]

    # Explicit UTF-8 keeps output independent of the platform locale
    # (column names are caller-supplied and may be non-ASCII).
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=delimiter)

        # Header
        header_row = [f"Time ({time_unit})"] if include_time else []
        header_row.extend(names)
        writer.writerow(header_row)

        # Data
        for i in range(n_samples):
            row = []

            if include_time:
                row.append(f"{i * time_base * multiplier:.{precision}g}")

            for samples, length, is_waveform in columns:
                if i >= length:
                    # Trace is shorter than the reference: pad the cell.
                    row.append("")
                elif is_waveform:
                    row.append(f"{samples[i]:.{precision}g}")
                else:
                    row.append(str(int(samples[i])))

            writer.writerow(row)
# Public API of this module.
__all__ = ["export_csv", "export_multi_trace_csv"]