Coverage for src/tracekit/loaders/tektronix.py: 88%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Tektronix WFM file loader. 

2 

3This module provides loading of Tektronix oscilloscope .wfm files 

4using the tm_data_types library when available, with fallback to 

5basic binary parsing. 

6 

7Supports both analog and digital waveforms from Tektronix oscilloscopes 

8including mixed-signal instruments. 

9 

10 

11Example: 

12 >>> from tracekit.loaders.tektronix import load_tektronix_wfm 

13 >>> trace = load_tektronix_wfm("TEK00001.wfm") 

14 >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz") 

15 

16 >>> # Load digital waveform 

17 >>> digital_trace = load_tektronix_wfm("digital_capture.wfm") 

18 >>> print(f"Digital trace: {len(digital_trace.data)} samples") 

19""" 

20 

21from __future__ import annotations 

22 

23import contextlib 

24import logging 

25from pathlib import Path 

26from typing import TYPE_CHECKING, Any, Union 

27 

28import numpy as np 

29from numpy.typing import NDArray 

30 

31from tracekit.core.exceptions import FormatError, LoaderError 

32from tracekit.core.types import DigitalTrace, IQTrace, TraceMetadata, WaveformTrace 

33 

34if TYPE_CHECKING: 

35 from os import PathLike 

36 

# Module-level logger used for debug tracing of the individual load paths.
logger = logging.getLogger(__name__)

# tm_data_types is optional: remember whether the import succeeded so the
# public loader can pick between it and the built-in binary parser.
try:
    import tm_data_types  # type: ignore[import-untyped, import-not-found]
except ImportError:
    TM_DATA_TYPES_AVAILABLE = False
else:
    TM_DATA_TYPES_AVAILABLE = True

# Union of every trace type a Tektronix load can return.
TektronixTrace = Union[WaveformTrace, DigitalTrace, IQTrace]

# Files shorter than this many bytes are rejected before any parsing.
MIN_WFM_FILE_SIZE = 512

53 

54 

def load_tektronix_wfm(
    path: str | PathLike[str],
    *,
    channel: int = 0,
) -> TektronixTrace:
    """Load a Tektronix oscilloscope WFM file.

    Extracts waveform data and metadata from Tektronix .wfm files.
    Uses the tm_data_types library when available for full support,
    otherwise falls back to basic binary parsing.

    Supports both analog and digital waveforms from mixed-signal
    oscilloscopes (channels 5-8 are typically digital on MSO scopes).

    Args:
        path: Path to the Tektronix .wfm file.
        channel: Channel index for multi-channel files (default: 0).

    Returns:
        WaveformTrace for analog waveforms, DigitalTrace for digital
        waveforms, or IQTrace for I/Q waveforms (the latter two only when
        tm_data_types is available).

    Raises:
        LoaderError: If the file does not exist or cannot be loaded.
        FormatError: If the file is not a valid Tektronix WFM file.

    Example:
        >>> trace = load_tektronix_wfm("TEK00001.wfm")
        >>> print(f"Sample rate: {trace.metadata.sample_rate} Hz")
        >>> print(f"Channel: {trace.metadata.channel_name}")

        >>> # Check trace type
        >>> if isinstance(trace, DigitalTrace):
        ...     print("Digital waveform loaded")
    """
    path = Path(path)

    # Stat once rather than exists() followed by stat(): a file that vanishes
    # (or cannot be inspected) between the two calls would otherwise escape
    # as a raw OSError instead of the documented LoaderError.
    try:
        file_size = path.stat().st_size
    except (FileNotFoundError, NotADirectoryError) as e:
        raise LoaderError(
            "File not found",
            file_path=str(path),
        ) from e
    except OSError as e:
        raise LoaderError(
            "Failed to read Tektronix WFM file",
            file_path=str(path),
            details=str(e),
        ) from e

    # Reject files that are too short to hold a header before parsing them.
    if file_size < MIN_WFM_FILE_SIZE:
        raise FormatError(
            f"File too small ({file_size} bytes), may be empty or corrupted",
            file_path=str(path),
            expected=f"At least {MIN_WFM_FILE_SIZE} bytes",
            got=f"{file_size} bytes",
        )

    logger.debug("Loading Tektronix WFM file: %s (%d bytes)", path, file_size)

    # Prefer the vendor library; fall back to the built-in parser without it.
    if TM_DATA_TYPES_AVAILABLE:
        return _load_with_tm_data_types(path, channel=channel)
    return _load_basic(path, channel=channel)

113 

114 

def _load_with_tm_data_types(
    path: Path,
    *,
    channel: int = 0,
) -> TektronixTrace:
    """Read a WFM file through tm_data_types and convert it to a trace.

    The object returned by ``tm_data_types.read_file`` is probed for the
    following shapes, in this order; the first match wins:

    1. Container exposing ``analog_waveforms`` with an entry at ``channel``
    2. ``AnalogWaveform`` exposing ``y_axis_values``
    3. ``DigitalWaveform`` or any object exposing ``y_axis_byte_values``
    4. Legacy object exposing ``y_data``
    5. Container exposing ``digital_waveforms`` with an entry at ``channel``
    6. ``IQWaveform`` or any object exposing both ``i_axis_values`` and
       ``q_axis_values``

    Args:
        path: Path to the WFM file.
        channel: Channel index.

    Returns:
        WaveformTrace for analog data, DigitalTrace for digital data, or
        IQTrace for I/Q data.

    Raises:
        FormatError: If none of the shapes above matches.
        LoaderError: If reading or converting the file fails for any other
            reason.
    """
    try:
        wfm = tm_data_types.read_file(str(path))

        # Record what came back; the attribute list also feeds the error
        # message when no shape matches.
        wfm_type = type(wfm).__name__
        available_attrs = [name for name in dir(wfm) if not name.startswith("_")]
        logger.debug("WFM object type: %s", wfm_type)
        logger.debug("WFM attributes: %s", available_attrs[:20])  # First 20 attrs

        if hasattr(wfm, "digital_waveforms"):
            logger.debug("Digital waveforms found: %d", len(wfm.digital_waveforms))

        # Shape 1: multi-channel container of analog waveforms.
        if hasattr(wfm, "analog_waveforms") and len(wfm.analog_waveforms) > channel:
            logger.debug("Loading from analog_waveforms[%d]", channel)
            entry = wfm.analog_waveforms[channel]
            samples = np.array(entry.y_data, dtype=np.float64)
            rate = 1.0 / entry.x_increment if entry.x_increment > 0 else 1e6
            return _build_waveform_trace(
                data=samples,
                sample_rate=rate,
                vertical_scale=getattr(entry, "y_scale", None),
                vertical_offset=getattr(entry, "y_offset", None),
                channel_name=getattr(entry, "name", f"CH{channel + 1}"),
                path=path,
                wfm=wfm,
            )

        # Shape 2: direct AnalogWaveform (tm_data_types 0.3.0+).
        if hasattr(wfm, "y_axis_values") and wfm_type == "AnalogWaveform":
            logger.debug("Loading direct AnalogWaveform with y_axis_values")
            raw_values = np.array(wfm.y_axis_values, dtype=np.float64)
            # Rebuild the scaled values: raw * spacing + offset, with
            # neutral defaults when either field is missing or zero.
            spacing_y = float(wfm.y_axis_spacing or 1.0)
            offset_y = float(wfm.y_axis_offset or 0.0)
            samples = raw_values * spacing_y + offset_y

            spacing_x = float(wfm.x_axis_spacing or 1e-6)
            rate = 1.0 / spacing_x if spacing_x > 0 else 1e6
            label = getattr(wfm, "source_name", None) or f"CH{channel + 1}"

            return _build_waveform_trace(
                data=samples,
                sample_rate=rate,
                vertical_scale=None,
                vertical_offset=offset_y,
                channel_name=label,
                path=path,
                wfm=wfm,
            )

        # Shape 3: direct DigitalWaveform.
        if wfm_type == "DigitalWaveform" or hasattr(wfm, "y_axis_byte_values"):
            logger.debug("Loading DigitalWaveform with y_axis_byte_values")
            return _load_digital_waveform(wfm, path, channel)

        # Shape 4: legacy single-channel object exposing y_data.
        if hasattr(wfm, "y_data"):
            logger.debug("Loading legacy format with y_data")
            samples = np.array(wfm.y_data, dtype=np.float64)
            step = getattr(wfm, "x_increment", 1e-6)
            rate = 1.0 / step if step > 0 else 1e6
            return _build_waveform_trace(
                data=samples,
                sample_rate=rate,
                vertical_scale=getattr(wfm, "y_scale", None),
                vertical_offset=getattr(wfm, "y_offset", None),
                channel_name=getattr(wfm, "name", "CH1"),
                path=path,
                wfm=wfm,
            )

        # Shape 5: multi-channel container of digital waveforms.
        if hasattr(wfm, "digital_waveforms") and len(wfm.digital_waveforms) > channel:
            logger.debug("Loading from digital_waveforms[%d]", channel)
            return _load_digital_waveform(wfm.digital_waveforms[channel], path, channel)

        # Shape 6: I/Q waveform.
        if wfm_type == "IQWaveform" or (
            hasattr(wfm, "i_axis_values") and hasattr(wfm, "q_axis_values")
        ):
            logger.debug("Loading IQWaveform with i_axis_values and q_axis_values")
            return _load_iq_waveform(wfm, path)

        # Nothing matched: report the type and attributes that were seen.
        raise FormatError(
            f"No waveform data found. Object type: {wfm_type}. "
            f"Available attributes: {', '.join(available_attrs[:15])}",
            file_path=str(path),
            expected="Tektronix analog or digital waveform data",
            fix_hint=(
                "This file may use an unsupported Tektronix format variant. "
                "Check that tm_data_types is up to date: pip install -U tm_data_types"
            ),
        )

    except (LoaderError, FormatError):
        # Already in the loader's own exception vocabulary; pass through.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to load Tektronix WFM file",
            file_path=str(path),
            details=str(e),
            fix_hint="Ensure the file is a valid Tektronix WFM format.",
        ) from e

262 

263 

def _build_waveform_trace(
    data: NDArray[np.float64],
    sample_rate: float,
    vertical_scale: float | None,
    vertical_offset: float | None,
    channel_name: str,
    path: Path,
    wfm: Any,
) -> WaveformTrace:
    """Build a WaveformTrace from extracted data.

    Args:
        data: Waveform sample data.
        sample_rate: Sample rate in Hz.
        vertical_scale: Vertical scale in volts/div.
        vertical_offset: Vertical offset in volts.
        channel_name: Channel name.
        path: Source file path.
        wfm: Original waveform object; read for ``date_time`` and passed to
            ``_extract_trigger_info``.

    Returns:
        Constructed WaveformTrace.
    """
    # Acquisition time is best-effort. Read the attribute directly inside the
    # suppress block: a preceding hasattr() would evaluate it outside the
    # block, and hasattr() only swallows AttributeError, so a ValueError
    # raised by a property getter would escape.
    acquisition_time = None
    with contextlib.suppress(ValueError, AttributeError):
        acquisition_time = wfm.date_time

    metadata = TraceMetadata(
        sample_rate=sample_rate,
        vertical_scale=vertical_scale,
        vertical_offset=vertical_offset,
        acquisition_time=acquisition_time,
        source_file=str(path),
        channel_name=channel_name,
        trigger_info=_extract_trigger_info(wfm),
    )

    return WaveformTrace(data=data, metadata=metadata)

304 

305 

def _load_digital_waveform(
    wfm: Any,
    path: Path,
    channel: int = 0,
) -> DigitalTrace:
    """Convert a tm_data_types digital waveform object into a DigitalTrace.

    Sample data is taken from the first attribute present among
    ``y_axis_byte_values``, ``samples``, ``data``, ``digital_data`` and
    ``logic_data``; every sample is reduced to a boolean.

    Args:
        wfm: DigitalWaveform object from tm_data_types.
        path: Source file path.
        channel: Channel index, used only for the default channel name.

    Returns:
        DigitalTrace with boolean sample data.

    Raises:
        FormatError: If DigitalWaveform has no recognized data attribute.
    """
    logger.debug("Extracting digital waveform data")

    if hasattr(wfm, "y_axis_byte_values"):
        # One sample per byte: zero maps to False, anything else to True.
        packed = bytes(wfm.y_axis_byte_values)
        data = np.frombuffer(packed, dtype=np.uint8).astype(np.bool_)
        logger.debug("Loaded %d digital samples from y_axis_byte_values", len(data))
    elif hasattr(wfm, "samples"):
        data = np.array(wfm.samples, dtype=np.bool_)
        logger.debug("Loaded %d digital samples from samples", len(data))
    else:
        # Last resort: take the first generic data attribute that exists.
        fallback = next(
            (name for name in ("data", "digital_data", "logic_data") if hasattr(wfm, name)),
            None,
        )
        if fallback is None:
            raise FormatError(
                "DigitalWaveform has no recognized data attribute",
                file_path=str(path),
                expected="y_axis_byte_values, samples, or data attribute",
            )
        data = np.array(getattr(wfm, fallback), dtype=np.bool_)
        logger.debug("Loaded %d digital samples from %s", len(data), fallback)

    # Sample spacing: first truthy of the two known attributes, otherwise
    # 1 microsecond per sample.
    spacing_source = getattr(wfm, "x_axis_spacing", None) or getattr(
        wfm, "horizontal_spacing", None
    )
    x_spacing = float(spacing_source) if spacing_source else 1e-6
    sample_rate = 1.0 / x_spacing if x_spacing > 0 else 1e6

    # Channel label: source_name, then name, then a D<n> default.
    channel_name = (
        getattr(wfm, "source_name", None)
        or getattr(wfm, "name", None)
        or f"D{channel + 1}"
    )

    metadata = TraceMetadata(
        sample_rate=sample_rate,
        source_file=str(path),
        channel_name=channel_name,
    )

    # Edge list is optional; malformed entries leave it as None.
    edges = None
    if hasattr(wfm, "edges"):
        with contextlib.suppress(TypeError, ValueError):
            edges = [(float(ts), bool(is_rising)) for ts, is_rising in wfm.edges]

    return DigitalTrace(data=data, metadata=metadata, edges=edges)

389 

390 

def _load_iq_waveform(
    wfm: Any,
    path: Path,
) -> IQTrace:
    """Convert a tm_data_types I/Q waveform object into an IQTrace.

    Reads ``i_axis_values`` and ``q_axis_values`` as float64 arrays and
    applies ``iq_axis_spacing`` (multiplied) and ``iq_axis_offset`` (added)
    to both components when those attributes are present and non-zero.

    Args:
        wfm: IQWaveform object from tm_data_types.
        path: Source file path.

    Returns:
        IQTrace with I and Q component data.
    """
    logger.debug("Extracting I/Q waveform data")

    i_data = np.array(wfm.i_axis_values, dtype=np.float64)
    q_data = np.array(wfm.q_axis_values, dtype=np.float64)

    logger.debug("Loaded %d I/Q samples", len(i_data))

    # The same gain, then the same offset, is applied to both components.
    spacing_attr = getattr(wfm, "iq_axis_spacing", None)
    if spacing_attr:
        gain = float(spacing_attr)
        i_data = i_data * gain
        q_data = q_data * gain
    offset_attr = getattr(wfm, "iq_axis_offset", None)
    if offset_attr:
        shift = float(offset_attr)
        i_data = i_data + shift
        q_data = q_data + shift

    # Sample spacing falls back to 1 microsecond per sample.
    x_spacing = float(getattr(wfm, "x_axis_spacing", None) or 1e-6)
    sample_rate = 1.0 / x_spacing if x_spacing > 0 else 1e6

    channel_name = getattr(wfm, "source_name", None) or "IQ1"

    metadata = TraceMetadata(
        sample_rate=sample_rate,
        source_file=str(path),
        channel_name=channel_name,
    )

    return IQTrace(i_data=i_data, q_data=q_data, metadata=metadata)

445 

446 

def _load_basic(
    path: Path,
    *,
    channel: int = 0,
) -> WaveformTrace:
    """Load a Tektronix WFM file with the built-in parser.

    Reads the whole file, then hands it to ``_parse_wfm003`` when bytes
    2..10 equal ``:WFM#003`` and to ``_parse_wfm_legacy`` otherwise.
    For full feature support, install tm_data_types.

    Args:
        path: Path to the WFM file.
        channel: Channel index, forwarded to the selected parser.

    Returns:
        WaveformTrace with basic metadata.

    Raises:
        FormatError: If the file format is invalid or cannot be parsed.
        LoaderError: If the file cannot be read.
    """
    try:
        with open(path, "rb") as handle:
            file_data = handle.read()

        if len(file_data) < MIN_WFM_FILE_SIZE:
            raise FormatError(
                "File too small to be a valid Tektronix WFM",
                file_path=str(path),
                expected=f"At least {MIN_WFM_FILE_SIZE} bytes",
                got=f"{len(file_data)} bytes",
            )

        # The version tag picks the parser; anything else is treated as legacy.
        parser = _parse_wfm003 if file_data[2:10] == b":WFM#003" else _parse_wfm_legacy
        return parser(file_data, path, channel)

    except OSError as e:
        raise LoaderError(
            "Failed to read Tektronix WFM file",
            file_path=str(path),
            details=str(e),
        ) from e
    except (LoaderError, FormatError):
        # Already in the loader's own exception vocabulary; pass through.
        raise
    except Exception as e:
        raise LoaderError(
            "Failed to parse Tektronix WFM file",
            file_path=str(path),
            details=str(e),
            fix_hint="Install tm_data_types for full Tektronix support: pip install tm_data_types",
        ) from e

504 

505 

def _parse_wfm003(
    file_data: bytes,
    path: Path,
    channel: int = 0,
) -> WaveformTrace:
    """Parse Tektronix WFM#003 format files.

    The parser assumes this layout:
    - Header occupying the first 838 bytes
    - Waveform data as little-endian int16 samples
    - Optional metadata footer introduced by ``tekmeta!``

    Sample rate, vertical scale and vertical offset are recovered
    heuristically by scanning header doubles for plausible values, not by
    reading fixed field offsets, so they should be treated as estimates.

    Args:
        file_data: Raw file bytes.
        path: Path to file (for error messages and metadata).
        channel: Channel index, used only for the channel name.

    Returns:
        WaveformTrace with extracted data and metadata.

    Raises:
        FormatError: If the file signature is invalid or no waveform data found.
    """
    import struct

    signature = file_data[2:10]
    if signature != b":WFM#003":
        raise FormatError(
            "Invalid WFM#003 signature",
            file_path=str(path),
            expected=":WFM#003",
            got=signature.decode("latin-1", errors="replace"),
        )

    # Fixed header size assumed for WFM#003 files.
    header_size = 838

    # Locate the optional footer with one scan that starts after the header,
    # so a coincidental match inside the header cannot empty the data region.
    footer_start = file_data.find(b"tekmeta!", header_size)
    data_end = len(file_data) if footer_start == -1 else footer_start
    waveform_bytes = file_data[header_size:data_end]

    if len(waveform_bytes) < 2:
        raise FormatError(
            "No waveform data found in WFM#003 file",
            file_path=str(path),
        )

    # Samples are 2 bytes wide; drop a dangling odd byte.
    if len(waveform_bytes) % 2 != 0:
        waveform_bytes = waveform_bytes[:-1]

    # Explicit little-endian dtype: a bare np.int16 is native-endian and
    # would misread the samples on a big-endian host.
    data = np.frombuffer(waveform_bytes, dtype="<i2").astype(np.float64)

    sample_rate = 1e6  # Default 1 MSa/s
    vertical_scale = None
    vertical_offset = None
    channel_name = f"CH{channel + 1}"

    # Heuristic: the first header double in (1e-12, 1e-3), i.e. 1 ps to 1 ms,
    # is taken as the sample interval.
    try:
        for offset in range(16, min(header_size - 8, 200), 8):
            val = struct.unpack_from("<d", file_data, offset)[0]
            if 1e-12 < val < 1e-3:
                sample_rate = 1.0 / val
                break
    except struct.error:
        pass  # Header shorter than expected; keep the default rate.

    # Heuristic: the first header double with magnitude in (1e-9, 1e3) is
    # taken as the vertical scale, and the double right after it as the
    # offset when its magnitude is below 1e6.
    try:
        for offset in range(16, min(header_size - 8, 400), 8):
            val = struct.unpack_from("<d", file_data, offset)[0]
            if 1e-9 < abs(val) < 1e3:
                vertical_scale = abs(val)
                next_val = struct.unpack_from("<d", file_data, offset + 8)[0]
                if abs(next_val) < 1e6:
                    vertical_offset = next_val
                break
    except struct.error:
        pass  # Header shorter than expected; keep whatever was found.

    metadata = TraceMetadata(
        sample_rate=sample_rate,
        vertical_scale=vertical_scale,
        vertical_offset=vertical_offset,
        source_file=str(path),
        channel_name=channel_name,
    )

    return WaveformTrace(data=data, metadata=metadata)

619 

620 

def _parse_wfm_legacy(
    file_data: bytes,
    path: Path,
    channel: int = 0,
) -> WaveformTrace:
    """Parse legacy Tektronix WFM formats (pre-WFM#003).

    Assumes a 512-byte header followed by sample data. The sample interval
    is read as a little-endian double at byte offset 40 and used only when
    it lies strictly between 0 and 1; otherwise the rate defaults to 1 MSa/s.

    The payload is read as little-endian float32 when its length is a
    multiple of 4, otherwise as little-endian int16 scaled by 1/32768.
    A dangling odd byte is dropped first, as ``_parse_wfm003`` does.

    Args:
        file_data: Raw file bytes.
        path: Path to file (for error messages and metadata).
        channel: Channel index, used only for the channel name.

    Returns:
        WaveformTrace with extracted data and metadata.

    Raises:
        FormatError: If no waveform data is found in the file.
    """
    import struct

    sample_rate = 1e6  # Default 1 MSa/s

    # unpack_from raises struct.error when the buffer is too short, which
    # covers truncated headers without a separate length check.
    try:
        sample_interval = struct.unpack_from("<d", file_data, 40)[0]
    except struct.error:
        pass
    else:
        if 0 < sample_interval < 1:  # Sanity check; also rejects NaN
            sample_rate = 1.0 / sample_interval

    header_size = 512
    raw_data = file_data[header_size:]

    # Both candidate sample widths are even, so a dangling odd byte can
    # never belong to a sample. Previously it made np.frombuffer raise
    # ValueError from the int16 fallback.
    if len(raw_data) % 2 != 0:
        raw_data = raw_data[:-1]

    if not raw_data:
        raise FormatError(
            "No waveform data in file",
            file_path=str(path),
        )

    # Explicit little-endian dtypes keep the result independent of the
    # host byte order.
    if len(raw_data) % 4 == 0:
        data = np.frombuffer(raw_data, dtype="<f4").astype(np.float64)
    else:
        data = np.frombuffer(raw_data, dtype="<i2").astype(np.float64)
        data = data / 32768.0  # Normalize to -1 to 1

    # Vertical scale and offset are not recovered for legacy files.
    metadata = TraceMetadata(
        sample_rate=sample_rate,
        vertical_scale=None,
        vertical_offset=None,
        source_file=str(path),
        channel_name=f"CH{channel + 1}",
    )

    return WaveformTrace(data=data, metadata=metadata)

688 

689 

690def _extract_trigger_info(wfm: Any) -> dict[str, Any] | None: 

691 """Extract trigger information from Tektronix waveform object. 

692 

693 Args: 

694 wfm: Tektronix waveform object from tm_data_types. 

695 

696 Returns: 

697 Dictionary of trigger settings or None. 

698 """ 

699 trigger_info: dict[str, Any] = {} 

700 

701 if hasattr(wfm, "trigger_level"): 

702 trigger_info["level"] = wfm.trigger_level 

703 if hasattr(wfm, "trigger_slope"): 

704 trigger_info["slope"] = wfm.trigger_slope 

705 if hasattr(wfm, "trigger_position"): 

706 trigger_info["position"] = wfm.trigger_position 

707 

708 return trigger_info if trigger_info else None 

709 

710 

# Public API of this module: the loader entry point and its return-type alias.
# The underscore-prefixed helpers are left out.
__all__ = ["TektronixTrace", "load_tektronix_wfm"]