Coverage for src / tracekit / loaders / rigol.py: 99%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Rigol WFM file loader. 

2 

3This module provides loading of Rigol oscilloscope .wfm files 

4using the RigolWFM library when available. 

5 

6 

7Example: 

8 >>> from tracekit.loaders.rigol import load_rigol_wfm 

9 >>> trace = load_rigol_wfm("DS1054Z_001.wfm") 

10 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

11""" 

12 

13from __future__ import annotations 

14 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any 

17 

18import numpy as np 

19 

20from tracekit.core.exceptions import FormatError, LoaderError 

21from tracekit.core.types import TraceMetadata, WaveformTrace 

22 

23if TYPE_CHECKING: 

24 from os import PathLike 

25 

26# Try to import RigolWFM for full Rigol support 

27try: 

28 import RigolWFM.wfm as rigol_wfm # type: ignore[import-not-found] 

29 

30 RIGOL_WFM_AVAILABLE = True 

31except ImportError: 

32 RIGOL_WFM_AVAILABLE = False 

33 

34 

35def load_rigol_wfm( 

36 path: str | PathLike[str], 

37 *, 

38 channel: int = 0, 

39) -> WaveformTrace: 

40 """Load a Rigol oscilloscope WFM file. 

41 

42 Extracts waveform data and metadata from Rigol .wfm files. 

43 Uses the RigolWFM library when available for full support. 

44 

45 Args: 

46 path: Path to the Rigol .wfm file. 

47 channel: Channel index for multi-channel files (default: 0). 

48 

49 Returns: 

50 WaveformTrace containing the waveform data and metadata. 

51 

52 Raises: 

53 LoaderError: If the file cannot be loaded or does not exist. 

54 

55 Example: 

56 >>> trace = load_rigol_wfm("DS1054Z_001.wfm") 

57 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

58 >>> print(f"Vertical scale: {trace.metadata.vertical_scale} V/div") 

59 """ 

60 path = Path(path) 

61 

62 if not path.exists(): 

63 raise LoaderError( 

64 "File not found", 

65 file_path=str(path), 

66 ) 

67 

68 if RIGOL_WFM_AVAILABLE: 

69 return _load_with_rigolwfm(path, channel=channel) 

70 else: 

71 return _load_basic(path, channel=channel) 

72 

73 

74def _load_with_rigolwfm( 

75 path: Path, 

76 *, 

77 channel: int = 0, 

78) -> WaveformTrace: 

79 """Load Rigol WFM using RigolWFM library. 

80 

81 Args: 

82 path: Path to the WFM file. 

83 channel: Channel index. 

84 

85 Returns: 

86 WaveformTrace with full metadata. 

87 

88 Raises: 

89 FormatError: If no waveform data is found in the file. 

90 LoaderError: If the file cannot be loaded. 

91 """ 

92 try: 

93 # Use RigolWFM to read the file 

94 wfm = rigol_wfm.Wfm.from_file(str(path)) 

95 

96 # Get channel data 

97 if hasattr(wfm, "channels") and len(wfm.channels) > channel: 

98 ch = wfm.channels[channel] 

99 data = np.array(ch.volts, dtype=np.float64) 

100 sample_rate = wfm.sample_rate if hasattr(wfm, "sample_rate") else 1e6 

101 vertical_scale = ch.volts_per_div if hasattr(ch, "volts_per_div") else None 

102 vertical_offset = ch.volt_offset if hasattr(ch, "volt_offset") else None 

103 channel_name = f"CH{channel + 1}" 

104 elif hasattr(wfm, "volts"): 

105 # Single channel format 

106 data = np.array(wfm.volts, dtype=np.float64) 

107 sample_rate = wfm.sample_rate if hasattr(wfm, "sample_rate") else 1e6 

108 vertical_scale = wfm.volts_per_div if hasattr(wfm, "volts_per_div") else None 

109 vertical_offset = wfm.volt_offset if hasattr(wfm, "volt_offset") else None 

110 channel_name = "CH1" 

111 else: 

112 raise FormatError( 

113 "No waveform data found in Rigol file", 

114 file_path=str(path), 

115 expected="Rigol channel data", 

116 ) 

117 

118 # Build metadata 

119 metadata = TraceMetadata( 

120 sample_rate=sample_rate, 

121 vertical_scale=vertical_scale, 

122 vertical_offset=vertical_offset, 

123 source_file=str(path), 

124 channel_name=channel_name, 

125 trigger_info=_extract_trigger_info(wfm), 

126 ) 

127 

128 return WaveformTrace(data=data, metadata=metadata) 

129 

130 except Exception as e: 

131 if isinstance(e, LoaderError | FormatError): 

132 raise 

133 raise LoaderError( 

134 "Failed to load Rigol WFM file", 

135 file_path=str(path), 

136 details=str(e), 

137 fix_hint="Ensure the file is a valid Rigol WFM format.", 

138 ) from e 

139 

140 

141def _load_basic( 

142 path: Path, 

143 *, 

144 channel: int = 0, 

145) -> WaveformTrace: 

146 """Basic Rigol WFM loader without RigolWFM library. 

147 

148 This is a simplified loader that reads basic waveform data 

149 from Rigol WFM files. For full feature support, install RigolWFM. 

150 

151 Args: 

152 path: Path to the WFM file. 

153 channel: Channel index (ignored in basic mode). 

154 

155 Returns: 

156 WaveformTrace with basic metadata. 

157 

158 Raises: 

159 FormatError: If the file is too small or has no waveform data. 

160 LoaderError: If the file cannot be read or parsed. 

161 """ 

162 try: 

163 with open(path, "rb") as f: 

164 # Read header 

165 header = f.read(256) 

166 

167 # Basic validation 

168 if len(header) < 256: 

169 raise FormatError( 

170 "File too small to be a valid Rigol WFM", 

171 file_path=str(path), 

172 expected="At least 256 bytes header", 

173 got=f"{len(header)} bytes", 

174 ) 

175 

176 # Default values 

177 sample_rate = 1e6 # Default 1 MSa/s 

178 vertical_scale = None 

179 vertical_offset = None 

180 

181 # Read waveform data 

182 f.seek(0, 2) 

183 file_size = f.tell() 

184 data_size = file_size - 256 

185 

186 if data_size <= 0: 

187 raise FormatError( 

188 "No waveform data in file", 

189 file_path=str(path), 

190 ) 

191 

192 f.seek(256) 

193 raw_data = f.read(data_size) 

194 

195 # Rigol typically uses int16 or int8 for samples 

196 try: 

197 # Try int16 first (common in Rigol files) 

198 data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) 

199 data = data / 32768.0 # Normalize to -1 to 1 

200 except ValueError: 

201 # Fall back to int8 

202 data = np.frombuffer(raw_data, dtype=np.int8).astype(np.float64) 

203 data = data / 128.0 # Normalize to -1 to 1 

204 

205 # Build metadata 

206 metadata = TraceMetadata( 

207 sample_rate=sample_rate, 

208 vertical_scale=vertical_scale, 

209 vertical_offset=vertical_offset, 

210 source_file=str(path), 

211 channel_name=f"CH{channel + 1}", 

212 ) 

213 

214 return WaveformTrace(data=data, metadata=metadata) 

215 

216 except OSError as e: 

217 raise LoaderError( 

218 "Failed to read Rigol WFM file", 

219 file_path=str(path), 

220 details=str(e), 

221 ) from e 

222 except Exception as e: 

223 if isinstance(e, LoaderError | FormatError): 

224 raise 

225 raise LoaderError( 

226 "Failed to parse Rigol WFM file", 

227 file_path=str(path), 

228 details=str(e), 

229 fix_hint="Install RigolWFM for full Rigol support: pip install RigolWFM", 

230 ) from e 

231 

232 

233def _extract_trigger_info(wfm: Any) -> dict[str, Any] | None: 

234 """Extract trigger information from Rigol waveform object. 

235 

236 Args: 

237 wfm: Rigol waveform object from RigolWFM. 

238 

239 Returns: 

240 Dictionary of trigger settings or None. 

241 """ 

242 trigger_info: dict[str, Any] = {} 

243 

244 if hasattr(wfm, "trigger_level"): 

245 trigger_info["level"] = wfm.trigger_level 

246 if hasattr(wfm, "trigger_mode"): 

247 trigger_info["mode"] = wfm.trigger_mode 

248 if hasattr(wfm, "trigger_source"): 

249 trigger_info["source"] = wfm.trigger_source 

250 

251 return trigger_info if trigger_info else None 

252 

253 

254__all__ = ["load_rigol_wfm"]