Coverage for src / tracekit / exporters / npz_export.py: 72%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""NumPy NPZ export functionality for TraceKit. 

2 

3This module provides export to NumPy's compressed archive format for 

4efficient storage and fast loading of trace data. 

5 

6 

7Example: 

8 >>> from tracekit.exporters.npz_export import export_npz 

9 >>> export_npz(trace, "waveform.npz") 

10 >>> # Load later with numpy 

11 >>> import numpy as np 

12 >>> data = np.load("waveform.npz") 

13 >>> signal = data['signal'] 

14 >>> sample_rate = float(data['sample_rate']) 

15""" 

16 

17from __future__ import annotations 

18 

19import contextlib 

20from pathlib import Path 

21from typing import TYPE_CHECKING, Any 

22 

23import numpy as np 

24 

25from tracekit.core.types import DigitalTrace, WaveformTrace 

26 

27if TYPE_CHECKING: 

28 from numpy.typing import NDArray 

29 

30 

31def export_npz( 

32 data: WaveformTrace | DigitalTrace | dict[str, Any] | NDArray[Any], 

33 path: str | Path, 

34 *, 

35 compressed: bool = True, 

36 include_metadata: bool = True, 

37 include_time: bool = False, 

38) -> None: 

39 """Export data to NumPy NPZ archive format. 

40 

41 Creates a NumPy .npz file containing the trace data and optional metadata. 

42 Files can be loaded with `numpy.load()` for fast array access. 

43 

44 Args: 

45 data: Data to export. Can be: 

46 - WaveformTrace or DigitalTrace 

47 - Dictionary of arrays 

48 - NumPy array 

49 path: Output file path (should end with .npz). 

50 compressed: Use compression (default True). Results in smaller files 

51 but slightly slower save/load. 

52 include_metadata: Include metadata in the archive. 

53 include_time: Include precomputed time array (increases file size). 

54 

55 Raises: 

56 TypeError: If data type is not supported. 

57 

58 Example: 

59 >>> export_npz(trace, "waveform.npz") 

60 >>> # Load later 

61 >>> data = np.load("waveform.npz") 

62 >>> signal = data['signal'] 

63 >>> sample_rate = float(data['sample_rate']) 

64 >>> time = np.arange(len(signal)) / sample_rate 

65 

66 References: 

67 EXP-004 

68 """ 

69 path = Path(path) 

70 

71 # Ensure .npz extension 

72 if path.suffix != ".npz": 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 path = path.with_suffix(".npz") 

74 

75 if isinstance(data, WaveformTrace | DigitalTrace): 

76 _export_trace(data, path, compressed, include_metadata, include_time) 

77 elif isinstance(data, dict): 

78 _export_dict(data, path, compressed) 

79 elif isinstance(data, np.ndarray): 79 ↛ 82line 79 didn't jump to line 82 because the condition on line 79 was always true

80 _export_array(data, path, compressed) 

81 else: 

82 raise TypeError(f"Unsupported data type: {type(data)}") 

83 

84 

85def _export_trace( 

86 trace: WaveformTrace | DigitalTrace, 

87 path: Path, 

88 compressed: bool, 

89 include_metadata: bool, 

90 include_time: bool, 

91) -> None: 

92 """Export trace to NPZ. 

93 

94 Args: 

95 trace: Trace to export. 

96 path: Output file path. 

97 compressed: Use compression. 

98 include_metadata: Include metadata arrays. 

99 include_time: Include time array. 

100 """ 

101 arrays: dict[str, Any] = {} 

102 

103 # Main signal data 

104 arrays["signal"] = trace.data 

105 

106 # Time array (optional - can be reconstructed from sample_rate) 

107 if include_time: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 arrays["time"] = trace.time_vector 

109 

110 # Metadata as scalars 

111 if include_metadata: 111 ↛ 135line 111 didn't jump to line 135 because the condition on line 111 was always true

112 meta = trace.metadata 

113 arrays["sample_rate"] = np.array(meta.sample_rate) 

114 arrays["samples"] = np.array(len(trace.data)) 

115 

116 if hasattr(meta, "channel"): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 arrays["channel"] = np.array(str(meta.channel or ""), dtype="U64") 

118 

119 if hasattr(meta, "source_file") and meta.source_file: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true

120 arrays["source_file"] = np.array(str(meta.source_file), dtype="U256") 

121 

122 if hasattr(meta, "capture_time") and meta.capture_time: 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true

123 arrays["capture_time"] = np.array(meta.capture_time.isoformat(), dtype="U64") 

124 

125 if hasattr(meta, "units") and meta.units: 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true

126 arrays["units"] = np.array(str(meta.units), dtype="U16") 

127 

128 # Add trace type marker 

129 if isinstance(trace, DigitalTrace): 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 arrays["trace_type"] = np.array("digital", dtype="U16") 

131 else: 

132 arrays["trace_type"] = np.array("waveform", dtype="U16") 

133 

134 # Save 

135 if compressed: 135 ↛ 138line 135 didn't jump to line 138 because the condition on line 135 was always true

136 np.savez_compressed(path, **arrays) 

137 else: 

138 np.savez(path, **arrays) 

139 

140 

141def _export_dict( 

142 data: dict[str, Any], 

143 path: Path, 

144 compressed: bool, 

145) -> None: 

146 """Export dictionary of arrays to NPZ. 

147 

148 Args: 

149 data: Dictionary to export. 

150 path: Output file path. 

151 compressed: Use compression. 

152 """ 

153 # Convert values to arrays 

154 arrays = {} 

155 for key, value in data.items(): 

156 if isinstance(value, np.ndarray): 

157 arrays[key] = value 

158 elif isinstance(value, list | tuple | int | float): 158 ↛ 160line 158 didn't jump to line 160 because the condition on line 158 was always true

159 arrays[key] = np.array(value) 

160 elif isinstance(value, str): 

161 arrays[key] = np.array(value, dtype="U256") 

162 else: 

163 # Try to convert, skip on failure 

164 with contextlib.suppress(TypeError, ValueError): 

165 arrays[key] = np.array(value) 

166 

167 if compressed: 167 ↛ 170line 167 didn't jump to line 170 because the condition on line 167 was always true

168 np.savez_compressed(path, **arrays) 

169 else: 

170 np.savez(path, **arrays) 

171 

172 

173def _export_array( 

174 data: NDArray[Any], 

175 path: Path, 

176 compressed: bool, 

177) -> None: 

178 """Export single array to NPZ. 

179 

180 Args: 

181 data: Array to export. 

182 path: Output file path. 

183 compressed: Use compression. 

184 """ 

185 if compressed: 

186 np.savez_compressed(path, data=data) 

187 else: 

188 np.savez(path, data=data) 

189 

190 

191def load_npz(path: str | Path) -> dict[str, NDArray[Any]]: 

192 """Load NPZ file and return dictionary of arrays. 

193 

194 Convenience wrapper around numpy.load() that returns a regular dict. 

195 

196 Args: 

197 path: Path to NPZ file. 

198 

199 Returns: 

200 Dictionary mapping array names to numpy arrays. 

201 

202 Example: 

203 >>> data = load_npz("waveform.npz") 

204 >>> signal = data['signal'] 

205 >>> sample_rate = float(data['sample_rate']) 

206 

207 References: 

208 EXP-004 

209 """ 

210 path = Path(path) 

211 

212 with np.load(path) as npz: 

213 return {key: npz[key] for key in npz.files} 

214 

215 

216__all__ = [ 

217 "export_npz", 

218 "load_npz", 

219]