Coverage for src / tracekit / exporters / csv.py: 41%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""CSV export functionality. 

2 

3This module provides trace and measurement export to CSV format. 

4 

5 

6Example: 

7 >>> from tracekit.exporters.csv import export_csv 

8 >>> export_csv(trace, "output.csv") 

9 >>> export_csv(measurements, "results.csv") 

10 

11References: 

12 RFC 4180 (CSV format) 

13""" 

14 

15from __future__ import annotations 

16 

17import csv 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Any 

20 

21import numpy as np 

22 

23from tracekit.core.types import DigitalTrace, WaveformTrace 

24 

25if TYPE_CHECKING: 

26 from numpy.typing import NDArray 

27 

28 

29def export_csv( 

30 data: WaveformTrace | DigitalTrace | dict[str, Any] | NDArray[Any], 

31 path: str | Path, 

32 *, 

33 include_time: bool = True, 

34 time_unit: str = "s", 

35 precision: int = 9, 

36 delimiter: str = ",", 

37 header: bool = True, 

38) -> None: 

39 """Export data to CSV format. 

40 

41 Args: 

42 data: Data to export. Can be: 

43 - WaveformTrace or DigitalTrace (with metadata as comments) 

44 - Dictionary of measurements 

45 - NumPy array 

46 path: Output file path. 

47 include_time: Include time column for traces. 

48 time_unit: Time unit ("s", "ms", "us", "ns"). 

49 precision: Decimal precision for floating point values. 

50 delimiter: Column delimiter. 

51 header: Include header row and metadata comments. 

52 

53 Raises: 

54 TypeError: If data type is not supported. 

55 

56 Example: 

57 >>> export_csv(trace, "waveform.csv") 

58 >>> export_csv(trace, "data.csv", precision=6, delimiter="\t") 

59 >>> export_csv(measurements, "results.csv") 

60 

61 Note: 

62 When exporting traces, metadata is included as comment lines 

63 starting with '#' when header=True. 

64 

65 References: 

66 EXP-001 

67 """ 

68 path = Path(path) 

69 

70 if isinstance(data, WaveformTrace | DigitalTrace): 

71 _export_trace(data, path, include_time, time_unit, precision, delimiter, header) 

72 elif isinstance(data, dict): 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 _export_dict(data, path, precision, delimiter, header) 

74 elif isinstance(data, np.ndarray): 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 _export_array(data, path, precision, delimiter, header) 

76 else: 

77 raise TypeError(f"Unsupported data type: {type(data)}") 

78 

79 

80def _export_trace( 

81 trace: WaveformTrace | DigitalTrace, 

82 path: Path, 

83 include_time: bool, 

84 time_unit: str, 

85 precision: int, 

86 delimiter: str, 

87 header: bool, 

88) -> None: 

89 """Export trace to CSV. 

90 

91 Args: 

92 trace: Trace to export. 

93 path: Output file path. 

94 include_time: Include time column. 

95 time_unit: Time unit for column. 

96 precision: Decimal precision. 

97 delimiter: Column delimiter. 

98 header: Include header row. 

99 """ 

100 time_multipliers = {"s": 1.0, "ms": 1e3, "us": 1e6, "ns": 1e9} 

101 multiplier = time_multipliers.get(time_unit, 1.0) 

102 

103 with open(path, "w", newline="") as f: 

104 writer = csv.writer(f, delimiter=delimiter) 

105 

106 # Write metadata as comments if header is enabled 

107 if header: 

108 # Metadata comments 

109 meta = trace.metadata 

110 f.write("# TraceKit CSV Export\n") 

111 f.write(f"# Sample Rate: {meta.sample_rate} Hz\n") 

112 f.write(f"# Time Base: {meta.time_base} s\n") 

113 f.write(f"# Samples: {len(trace.data)}\n") 

114 f.write(f"# Duration: {trace.duration} s\n") 

115 

116 if meta.vertical_scale is not None: 

117 f.write(f"# Vertical Scale: {meta.vertical_scale} V/div\n") 

118 if meta.vertical_offset is not None: 

119 f.write(f"# Vertical Offset: {meta.vertical_offset} V\n") 

120 if meta.acquisition_time is not None: 

121 f.write(f"# Acquisition Time: {meta.acquisition_time.isoformat()}\n") 

122 if meta.source_file is not None: 

123 f.write(f"# Source File: {meta.source_file}\n") 

124 if meta.channel_name is not None: 

125 f.write(f"# Channel: {meta.channel_name}\n") 

126 

127 f.write("#\n") 

128 

129 # Column headers 

130 if include_time: 130 ↛ 135line 130 didn't jump to line 135 because the condition on line 130 was always true

131 if isinstance(trace, WaveformTrace): 

132 writer.writerow([f"Time ({time_unit})", "Voltage"]) 

133 else: 

134 writer.writerow([f"Time ({time_unit})", "Digital"]) 

135 elif isinstance(trace, WaveformTrace): 

136 writer.writerow(["Voltage"]) 

137 else: 

138 writer.writerow(["Digital"]) 

139 

140 # Data 

141 n_samples = len(trace.data) 

142 time_base = trace.metadata.time_base 

143 

144 for i in range(n_samples): 

145 if include_time: 145 ↛ 151line 145 didn't jump to line 151 because the condition on line 145 was always true

146 time_val = i * time_base * multiplier 

147 if isinstance(trace, WaveformTrace): 

148 writer.writerow([f"{time_val:.{precision}g}", f"{trace.data[i]:.{precision}g}"]) 

149 else: 

150 writer.writerow([f"{time_val:.{precision}g}", int(trace.data[i])]) 

151 elif isinstance(trace, WaveformTrace): 

152 writer.writerow([f"{trace.data[i]:.{precision}g}"]) 

153 else: 

154 writer.writerow([int(trace.data[i])]) 

155 

156 

157def _export_dict( 

158 data: dict[str, Any], 

159 path: Path, 

160 precision: int, 

161 delimiter: str, 

162 header: bool, 

163) -> None: 

164 """Export dictionary to CSV. 

165 

166 Args: 

167 data: Dictionary to export. 

168 path: Output file path. 

169 precision: Decimal precision. 

170 delimiter: Column delimiter. 

171 header: Include header row. 

172 """ 

173 with open(path, "w", newline="") as f: 

174 writer = csv.writer(f, delimiter=delimiter) 

175 

176 if header: 

177 writer.writerow(["Parameter", "Value", "Unit"]) 

178 

179 for key, value in data.items(): 

180 if isinstance(value, dict): 

181 # Nested dict with value/unit 

182 val = value.get("value", value) 

183 unit = value.get("unit", "") 

184 if isinstance(val, float): 

185 writer.writerow([key, f"{val:.{precision}g}", unit]) 

186 else: 

187 writer.writerow([key, val, unit]) 

188 elif isinstance(value, float): 

189 writer.writerow([key, f"{value:.{precision}g}", ""]) 

190 else: 

191 writer.writerow([key, value, ""]) 

192 

193 

194def _export_array( 

195 data: NDArray[Any], 

196 path: Path, 

197 precision: int, 

198 delimiter: str, 

199 header: bool, 

200) -> None: 

201 """Export numpy array to CSV. 

202 

203 Args: 

204 data: NumPy array to export. 

205 path: Output file path. 

206 precision: Decimal precision. 

207 delimiter: Column delimiter. 

208 header: Include header row. 

209 """ 

210 # Handle different array dimensions 

211 if data.ndim == 1: 

212 data = data.reshape(-1, 1) 

213 

214 with open(path, "w", newline="") as f: 

215 writer = csv.writer(f, delimiter=delimiter) 

216 

217 if header: 

218 cols = [f"Column_{i}" for i in range(data.shape[1])] 

219 writer.writerow(cols) 

220 

221 for row in data: 

222 formatted = [] 

223 for val in row: 

224 if isinstance(val, float | np.floating): 

225 formatted.append(f"{val:.{precision}g}") 

226 else: 

227 formatted.append(str(val)) # type: ignore[unreachable] 

228 writer.writerow(formatted) 

229 

230 

231def export_multi_trace_csv( 

232 traces: list[WaveformTrace | DigitalTrace], 

233 path: str | Path, 

234 *, 

235 names: list[str] | None = None, 

236 include_time: bool = True, 

237 time_unit: str = "s", 

238 precision: int = 9, 

239) -> None: 

240 """Export multiple traces to single CSV file. 

241 

242 Args: 

243 traces: List of traces to export. 

244 path: Output file path. 

245 names: Column names for each trace. 

246 include_time: Include time column. 

247 time_unit: Time unit. 

248 precision: Decimal precision. 

249 

250 Example: 

251 >>> export_multi_trace_csv([ch1, ch2, ch3], "channels.csv", 

252 ... names=["CH1", "CH2", "CH3"]) 

253 """ 

254 if len(traces) == 0: 

255 return 

256 

257 path = Path(path) 

258 

259 if names is None: 

260 names = [f"Trace_{i}" for i in range(len(traces))] 

261 

262 # Use first trace for timing 

263 ref_trace = traces[0] 

264 n_samples = len(ref_trace.data) 

265 time_base = ref_trace.metadata.time_base 

266 

267 time_multipliers = {"s": 1.0, "ms": 1e3, "us": 1e6, "ns": 1e9} 

268 multiplier = time_multipliers.get(time_unit, 1.0) 

269 

270 with open(path, "w", newline="") as f: 

271 writer = csv.writer(f) 

272 

273 # Header 

274 header_row = [] 

275 if include_time: 

276 header_row.append(f"Time ({time_unit})") 

277 header_row.extend(names) 

278 writer.writerow(header_row) 

279 

280 # Data 

281 for i in range(n_samples): 

282 row = [] 

283 

284 if include_time: 

285 time_val = i * time_base * multiplier 

286 row.append(f"{time_val:.{precision}g}") 

287 

288 for trace in traces: 

289 if i < len(trace.data): 

290 if isinstance(trace, WaveformTrace): 

291 row.append(f"{trace.data[i]:.{precision}g}") 

292 else: 

293 row.append(str(int(trace.data[i]))) 

294 else: 

295 row.append("") 

296 

297 writer.writerow(row) 

298 

299 

300__all__ = [ 

301 "export_csv", 

302 "export_multi_trace_csv", 

303]