Coverage for src / tracekit / loaders / tdms.py: 92%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""NI TDMS (Technical Data Management Streaming) file loader. 

2 

3This module provides loading of NI LabVIEW TDMS files using the 

4npTDMS library when available. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.tdms import load_tdms 

9 >>> trace = load_tdms("measurement.tdms") 

10 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

11""" 

12 

13from __future__ import annotations 

14 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any 

17 

18import numpy as np 

19 

20from tracekit.core.exceptions import FormatError, LoaderError 

21from tracekit.core.types import TraceMetadata, WaveformTrace 

22 

23if TYPE_CHECKING: 

24 from os import PathLike 

25 

26# Try to import npTDMS for TDMS support 

27try: 

28 from nptdms import TdmsFile 

29 

30 NPTDMS_AVAILABLE = True 

31except ImportError: 

32 NPTDMS_AVAILABLE = False 

33 

34 

def load_tdms(
    path: str | PathLike[str],
    *,
    channel: str | int | None = None,
    group: str | None = None,
) -> WaveformTrace:
    """Read one channel of an NI TDMS file into a ``WaveformTrace``.

    A TDMS file is organised as groups that contain channels. This function
    validates the preconditions (file present, npTDMS importable) and then
    delegates the actual parsing to ``_load_with_nptdms``.

    Args:
        path: Location of the TDMS file.
        channel: Name or index of the channel to load. ``None`` selects the
            first channel of the chosen group.
        group: Name of the group to read from. ``None`` selects the first
            group in the file.

    Returns:
        The selected channel's samples together with its metadata.

    Raises:
        LoaderError: If the file does not exist, npTDMS is not installed, or
            loading fails for a reason that is not already a
            ``LoaderError``/``FormatError``.
        FormatError: Propagated unchanged from ``_load_with_nptdms``.

    Example:
        >>> trace = load_tdms("measurement.tdms", group="Voltage", channel="CH1")
        >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
        >>> print(f"Duration: {trace.duration:.6f} seconds")

    References:
        NI TDMS File Format: https://www.ni.com/en-us/support/documentation/
    """
    tdms_path = Path(path)
    location = str(tdms_path)

    if not tdms_path.exists():
        raise LoaderError("File not found", file_path=location)

    if not NPTDMS_AVAILABLE:
        raise LoaderError(
            "npTDMS library required for TDMS files",
            file_path=location,
            fix_hint="Install npTDMS: pip install npTDMS",
        )

    try:
        return _load_with_nptdms(tdms_path, channel=channel, group=group)
    except (LoaderError, FormatError):
        # Already carries file context; let it through untouched.
        raise
    except Exception as exc:
        # Anything else is wrapped so callers only have to handle LoaderError.
        raise LoaderError(
            "Failed to load TDMS file",
            file_path=location,
            details=str(exc),
            fix_hint="Ensure the file is a valid NI TDMS format.",
        ) from exc

93 

94 

def _load_with_nptdms(
    path: Path,
    *,
    channel: str | int | None = None,
    group: str | None = None,
) -> WaveformTrace:
    """Load one TDMS channel using the npTDMS library.

    Args:
        path: Path to the TDMS file.
        channel: Channel name or zero-based index. ``None`` selects the first
            channel of the chosen group.
        group: Group name to select. ``None`` selects the first group.

    Returns:
        WaveformTrace with the channel samples (as float64) and metadata.

    Raises:
        FormatError: If the file cannot be parsed, contains no groups, the
            selected group has no channels, or the channel has no data.
        LoaderError: If the requested group or channel does not exist, the
            channel index is out of range, or ``channel`` is neither a name
            nor an integer index.
    """
    try:
        tdms_file = TdmsFile.read(str(path))
    except Exception as e:
        raise FormatError(
            "Failed to parse TDMS file",
            file_path=str(path),
            expected="Valid NI TDMS format",
        ) from e

    groups = list(tdms_file.groups())
    if not groups:
        raise FormatError(
            "No groups found in TDMS file",
            file_path=str(path),
        )
    target_group = _select_tdms_group(groups, group, path)

    channels = list(target_group.channels())
    if not channels:
        raise FormatError(
            f"No channels found in group '{target_group.name}'",
            file_path=str(path),
        )
    target_channel = _select_tdms_channel(channels, channel, path)

    data = target_channel.data
    if data is None or len(data) == 0:
        raise FormatError(
            f"Channel '{target_channel.name}' has no data",
            file_path=str(path),
        )
    data = np.asarray(data, dtype=np.float64)

    sample_rate = _get_sample_rate(target_channel, target_group, tdms_file)

    # Linear scaling coefficients are optional channel properties.
    properties = target_channel.properties
    vertical_scale = properties.get("NI_Scale[0]_Linear_Slope")
    vertical_offset = properties.get("NI_Scale[0]_Linear_Y_Intercept")

    # The unit string is not looked up separately here: it is forwarded with
    # the other channel properties through _extract_tdms_properties below.
    metadata = TraceMetadata(
        sample_rate=sample_rate,
        vertical_scale=float(vertical_scale) if vertical_scale is not None else None,
        vertical_offset=float(vertical_offset) if vertical_offset is not None else None,
        source_file=str(path),
        channel_name=target_channel.name,
        trigger_info=_extract_tdms_properties(target_channel),
    )

    return WaveformTrace(data=data, metadata=metadata)


def _select_tdms_group(groups: list[Any], group: str | None, path: Path) -> Any:
    """Return the group named ``group``, or the first group when it is None.

    Raises:
        LoaderError: If no group has the requested name.
    """
    if group is None:
        return groups[0]

    match = next((g for g in groups if g.name == group), None)
    if match is None:
        available_groups = [g.name for g in groups]
        raise LoaderError(
            f"Group '{group}' not found",
            file_path=str(path),
            details=f"Available groups: {available_groups}",
        )
    return match


def _select_tdms_channel(
    channels: list[Any],
    channel: str | int | None,
    path: Path,
) -> Any:
    """Return the channel chosen by name or index, or the first when None.

    Raises:
        LoaderError: If the name is unknown, the index is negative or past
            the end, or the selector is neither a string nor an integer.
    """
    import operator

    if channel is None:
        return channels[0]

    if isinstance(channel, str):
        match = next((ch for ch in channels if ch.name == channel), None)
        if match is None:
            available_channels = [ch.name for ch in channels]
            raise LoaderError(
                f"Channel '{channel}' not found",
                file_path=str(path),
                details=f"Available channels: {available_channels}",
            )
        return match

    # Accept any integer-like selector (e.g. numpy integers), not only the
    # builtin int; reject everything else instead of silently returning the
    # first channel.
    try:
        index = operator.index(channel)
    except TypeError:
        raise LoaderError(
            f"Unsupported channel selector type '{type(channel).__name__}'",
            file_path=str(path),
            details="Expected a channel name (str) or a zero-based index (int)",
        ) from None

    if not 0 <= index < len(channels):
        raise LoaderError(
            f"Channel index {channel} out of range",
            file_path=str(path),
            details=f"Available channels: 0-{len(channels) - 1}",
        )
    return channels[index]

219 

220 

def _get_sample_rate(
    channel: Any,
    group: Any,
    tdms_file: Any,
) -> float:
    """Extract the sample rate from TDMS properties.

    The channel's properties are searched first, then the group's, then the
    file's. Within each of them the candidate property names are tried in a
    fixed priority order. A candidate is skipped when it is missing, cannot
    be converted to ``float``, or is not a positive finite number.

    Args:
        channel: TDMS channel object exposing a ``properties`` mapping.
        group: TDMS group object exposing a ``properties`` mapping.
        tdms_file: TDMS file object exposing a ``properties`` mapping.

    Returns:
        Sample rate in Hz, or ``1.0e6`` when no usable property is found.
    """
    # Properties that store the time between samples; the rate is 1 / value.
    interval_keys = ("wf_increment", "dt")

    # Candidate property names in priority order. "wf_samples" is
    # intentionally not listed: NI's waveform properties define it as a
    # sample count, so it must never be reported as a rate.
    candidate_keys = (
        "wf_increment",  # Waveform dt (inverse of sample rate)
        "NI_RF_IQ_Rate",  # RF signal analyzer
        "SamplingFrequency",  # SignalExpress
        "dt",  # Delta time
        "Fs",  # Sample rate
        "SampleRate",
        "sample_rate",
    )

    for source in (channel, group, tdms_file):
        properties = source.properties
        for key in candidate_keys:
            raw = properties.get(key)
            if raw is None:
                continue
            try:
                value = float(raw)
            except (TypeError, ValueError):
                # Non-numeric property value: try the next candidate.
                continue
            # Rejects zero, negatives, NaN and infinity in one comparison.
            if not 0.0 < value < float("inf"):
                continue
            return 1.0 / value if key in interval_keys else value

    # Default sample rate if not found
    return 1.0e6  # 1 MHz default

283 

284 

def _extract_tdms_properties(channel: Any) -> dict[str, Any] | None:
    """Collect a fixed set of channel properties into a plain dictionary.

    Only the property names listed below are considered, and a property is
    kept only when its value is not ``None``.

    Args:
        channel: TDMS channel object exposing a ``properties`` mapping.

    Returns:
        The selected properties in the order listed below, or ``None`` when
        none of them is present.
    """
    wanted = (
        "unit_string",
        "NI_ChannelName",
        "wf_start_time",
        "wf_start_offset",
        "description",
        "NI_Scale[0]_Linear_Slope",
        "NI_Scale[0]_Linear_Y_Intercept",
    )

    looked_up = ((name, channel.properties.get(name)) for name in wanted)
    collected: dict[str, Any] = {
        name: found for name, found in looked_up if found is not None
    }

    # An empty result is reported as None rather than as an empty dict.
    return collected or None

313 

314 

def list_tdms_channels(
    path: str | PathLike[str],
) -> dict[str, list[str]]:
    """Enumerate the groups of a TDMS file and the channels inside each.

    Args:
        path: Location of the TDMS file.

    Returns:
        Mapping of each group name to the names of its channels.

    Raises:
        LoaderError: If the file does not exist, npTDMS is not installed, or
            reading the file fails for any reason.

    Example:
        >>> channels = list_tdms_channels("measurement.tdms")
        >>> for group, chans in channels.items():
        ...     print(f"Group '{group}': {chans}")
    """
    tdms_path = Path(path)
    location = str(tdms_path)

    if not tdms_path.exists():
        raise LoaderError("File not found", file_path=location)

    if not NPTDMS_AVAILABLE:
        raise LoaderError(
            "npTDMS library required for TDMS files",
            file_path=location,
            fix_hint="Install npTDMS: pip install npTDMS",
        )

    try:
        parsed = TdmsFile.read(location)
        return {
            grp.name: [chan.name for chan in grp.channels()]
            for grp in parsed.groups()
        }
    except Exception as exc:
        # Any failure while reading or walking the file is reported uniformly.
        raise LoaderError(
            "Failed to read TDMS file",
            file_path=location,
            details=str(exc),
        ) from exc

365 

366 

367__all__ = ["list_tdms_channels", "load_tdms"]