Coverage for src / tracekit / loaders / lazy.py: 99%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Lazy loading for huge waveform files. 

2 

3This module provides memory-mapped file loading where metadata is loaded 

4immediately but data is deferred until first access. Useful for multi-GB files. 

5""" 

6 

7from __future__ import annotations 

8 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12import numpy as np 

13 

14from tracekit.core.exceptions import LoaderError 

15 

16if TYPE_CHECKING: 

17 from numpy.typing import DTypeLike, NDArray 

18 

19 from tracekit.core.types import WaveformTrace 

20 

21 

class LazyWaveformTrace:
    """Lazy-loading wrapper for WaveformTrace.

    Metadata (sample rate, length, user metadata) is stored at construction
    time; sample data is read from disk through ``numpy.memmap`` only when it
    is first requested.

    Example:
        >>> from tracekit.loaders.lazy import load_trace_lazy
        >>> trace = load_trace_lazy('huge_trace.npy', sample_rate=1e9, lazy=True)
        >>> # Metadata available immediately
        >>> print(f"Length: {trace.length}, Sample rate: {trace.sample_rate}")
        >>> # Data loaded on first access
        >>> data = trace.data  # Loads data now
        >>> subset = trace[1000:2000]  # Only loads requested slice

    References:
        API-017: Lazy Loading for Huge Files
    """

    def __init__(
        self,
        file_path: str | Path,
        sample_rate: float,
        length: int,
        *,
        dtype: DTypeLike = np.float64,
        offset: int = 0,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Initialize lazy trace.

        Args:
            file_path: Path to binary data file.
            sample_rate: Sample rate in Hz.
            length: Number of samples.
            dtype: Data type of samples as stored in the file.
            offset: Byte offset to start of data in file.
            metadata: Additional metadata.

        Raises:
            LoaderError: If file not found.

        Example:
            >>> trace = LazyWaveformTrace(
            ...     file_path='trace.npy',
            ...     sample_rate=1e9,
            ...     length=10_000_000
            ... )
        """
        # Deferred state is assigned first so that close()/__del__ always find
        # these attributes, even if a later statement in __init__ raises.
        self._data: NDArray[np.float64] | None = None
        self._memmap: np.memmap[Any, np.dtype[Any]] | None = None

        self._file_path = Path(file_path)
        self._sample_rate = sample_rate
        self._length = length
        self._dtype = np.dtype(dtype)
        self._offset = offset
        self._metadata = metadata or {}

        # Verify file exists
        if not self._file_path.exists():
            raise LoaderError(f"File not found: {self._file_path}")

    @property
    def sample_rate(self) -> float:
        """Sample rate in Hz."""
        return self._sample_rate

    @property
    def length(self) -> int:
        """Number of samples."""
        return self._length

    @property
    def duration(self) -> float:
        """Duration in seconds."""
        return self._length / self._sample_rate

    @property
    def metadata(self) -> dict[str, Any]:
        """Metadata dictionary."""
        return self._metadata

    @property
    def data(self) -> NDArray[np.float64]:
        """Waveform data array.

        Loads data on first access. Subsequent accesses return cached data.

        Returns:
            Numpy array of waveform samples (float64 copy held in memory).
        """
        if self._data is None:
            self._load_data()
        return self._data  # type: ignore[return-value]

    @property
    def time_vector(self) -> NDArray[np.float64]:
        """Time vector in seconds.

        Returns:
            Array of time values corresponding to samples.
        """
        return np.arange(self._length) / self._sample_rate

    def _ensure_memmap(self) -> np.memmap[Any, np.dtype[Any]]:
        """Return the read-only memory map, opening it on first use."""
        if self._memmap is None:
            self._memmap = np.memmap(
                self._file_path,
                dtype=self._dtype,
                mode="r",
                offset=self._offset,
                shape=(self._length,),
            )
        return self._memmap

    def _load_data(self) -> None:
        """Load data from file using memory mapping.

        Raises:
            LoaderError: If the file cannot be mapped or converted.
        """
        try:
            # Convert to regular array (copies data into memory as float64)
            self._data = np.array(self._ensure_memmap(), dtype=np.float64)
        except Exception as e:
            raise LoaderError(f"Failed to load data from {self._file_path}: {e}") from e

    def __getitem__(self, key: int | slice) -> LazyWaveformTrace | WaveformTrace | float:
        """Index or slice the trace.

        Unit-step slices remain lazy: a new LazyWaveformTrace is returned that
        points at the requested byte range of the same file. Slices with any
        other step load the full data and return an eager WaveformTrace.

        Args:
            key: Integer index (Python or NumPy integer) or slice.

        Returns:
            float for a single index, LazyWaveformTrace for a unit-step slice,
            WaveformTrace for a slice with step != 1.

        Raises:
            TypeError: If key is not int or slice.

        Example:
            >>> subset = trace[1000:2000]  # Lazy - doesn't load full data
            >>> sample = trace[500]  # Loads single sample
        """
        # NumPy integer scalars are not ``int`` instances but are valid indices.
        if isinstance(key, (int, np.integer)):
            return float(self._ensure_memmap()[key])

        if not isinstance(key, slice):
            raise TypeError(f"Indices must be int or slice, not {type(key)}")

        start, stop, step = key.indices(self._length)

        if step != 1:
            # Non-unit step requires loading data; return an eager trace.
            from tracekit.core.types import TraceMetadata, WaveformTrace

            sliced_data = self.data[key]
            metadata = TraceMetadata(
                sample_rate=self._sample_rate,
                **self._metadata,  # type: ignore[arg-type]
            )
            return WaveformTrace(data=sliced_data, metadata=metadata)

        # slice.indices() can yield start > stop for an empty slice (e.g.
        # trace[5:2]); clamp so the child trace never has a negative length.
        length = max(0, stop - start)
        offset = self._offset + start * self._dtype.itemsize

        return LazyWaveformTrace(
            file_path=self._file_path,
            sample_rate=self._sample_rate,
            length=length,
            dtype=self._dtype,
            offset=offset,
            metadata=self._metadata.copy(),
        )

    def to_eager(self) -> WaveformTrace:
        """Convert to regular WaveformTrace by loading all data.

        Returns:
            WaveformTrace with data loaded in memory.

        Example:
            >>> eager_trace = lazy_trace.to_eager()
        """
        from tracekit.core.types import TraceMetadata, WaveformTrace

        metadata = TraceMetadata(
            sample_rate=self._sample_rate,
            **self._metadata,  # type: ignore[arg-type]
        )
        return WaveformTrace(data=self.data, metadata=metadata)

    def close(self) -> None:
        """Release the reference to the memory-mapped file.

        Should be called when done with the trace to free resources. Safe to
        call repeatedly and on a partially constructed instance.

        Example:
            >>> trace = load_trace_lazy('huge_file.npy', sample_rate=1e9, lazy=True)
            >>> # ... use trace ...
            >>> trace.close()
        """
        # getattr guards against __init__ having failed before _memmap was set.
        if getattr(self, "_memmap", None) is not None:
            self._memmap = None

    def __del__(self) -> None:
        """Cleanup memory map on deletion."""
        self.close()

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"LazyWaveformTrace(file={self._file_path.name}, "
            f"sample_rate={self._sample_rate:.2e}, "
            f"length={self._length}, "
            f"loaded={self._data is not None})"
        )

    def __len__(self) -> int:
        """Number of samples."""
        return self._length

258 

259 

def load_trace_lazy(
    file_path: str | Path,
    sample_rate: float | None = None,
    *,
    lazy: bool = True,
    **kwargs: Any,
) -> LazyWaveformTrace | WaveformTrace:
    """Load trace with optional lazy loading.

    For ``.npy`` files the shape, dtype and data offset are read from the
    NumPy header without touching the sample data. Any other suffix is treated
    as raw binary, whose length is derived from the file size.

    Args:
        file_path: Path to trace file (must be .npy or raw binary).
        sample_rate: Sample rate in Hz (required for both formats).
        lazy: If True, defer data loading. If False, load immediately.
        **kwargs: Options for raw binary files: ``dtype`` (default float64)
            and ``offset`` in bytes (default 0). Ignored for ``.npy`` files.

    Returns:
        LazyWaveformTrace if lazy=True, otherwise WaveformTrace.

    Raises:
        LoaderError: If the file is not found, the .npy header is invalid or
            of an unsupported version, the array is not 1D, sample_rate is
            missing, or the raw-binary offset lies outside the file.

    Example:
        >>> # Lazy loading for huge files
        >>> trace = load_trace_lazy('10GB_trace.npy', sample_rate=1e9, lazy=True)
        >>> print(f"Duration: {trace.duration} seconds")  # No data loaded yet
        >>> data_subset = trace[1000:2000].data  # Only loads this slice
        >>>
        >>> # Eager loading
        >>> trace = load_trace_lazy('small_trace.npy', sample_rate=1e9, lazy=False)
        >>> data = trace.data  # Already loaded

    References:
        API-017: Lazy Loading for Huge Files
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise LoaderError(f"File not found: {file_path}")

    # Determine format
    suffix = file_path.suffix.lower()

    if suffix == ".npy":
        # NumPy format - can read shape and dtype without loading data
        import numpy.lib.format as npf

        # The header layout depends on the format version reported by the
        # magic string; parsing a 2.0 header with the 1.0 reader misreads it.
        header_readers = {
            (1, 0): npf.read_array_header_1_0,
            (2, 0): npf.read_array_header_2_0,
        }

        with open(file_path, "rb") as f:
            try:
                version = npf.read_magic(f)  # type: ignore[no-untyped-call]
            except ValueError as e:
                raise LoaderError(f"Invalid .npy file {file_path}: {e}") from e

            read_header = header_readers.get(tuple(version))
            if read_header is None:
                raise LoaderError(
                    f"Unsupported .npy format version {version} in {file_path}"
                )

            try:
                shape, _fortran_order, dtype = read_header(f)  # type: ignore[no-untyped-call]
            except ValueError as e:
                raise LoaderError(f"Invalid .npy header in {file_path}: {e}") from e

            # The array data starts immediately after the header.
            offset = f.tell()

        if not isinstance(shape, tuple) or len(shape) != 1:
            raise LoaderError(f"Expected 1D array, got shape {shape}")

        length = shape[0]

        # The .npy header carries no sample rate, so it must be supplied.
        if sample_rate is None:
            raise LoaderError("sample_rate is required for .npy files")

        if lazy:
            return LazyWaveformTrace(
                file_path=file_path,
                sample_rate=sample_rate,
                length=length,
                dtype=dtype,
                offset=offset,
            )

        # Load eagerly
        from tracekit.core.types import TraceMetadata, WaveformTrace

        data = np.load(file_path).astype(np.float64)
        metadata = TraceMetadata(sample_rate=sample_rate)
        return WaveformTrace(data=data, metadata=metadata)

    # Raw binary - need sample rate and dtype
    if sample_rate is None:
        raise LoaderError("sample_rate is required for raw binary files")

    dtype = kwargs.get("dtype", np.float64)
    offset = kwargs.get("offset", 0)

    # Compute length from file size; an offset outside the file would
    # otherwise produce a negative sample count.
    total_bytes = file_path.stat().st_size
    if offset < 0 or offset > total_bytes:
        raise LoaderError(
            f"offset {offset} is outside file of {total_bytes} bytes: {file_path}"
        )
    length = (total_bytes - offset) // np.dtype(dtype).itemsize

    if lazy:
        return LazyWaveformTrace(
            file_path=file_path,
            sample_rate=sample_rate,
            length=length,
            dtype=dtype,
            offset=offset,
        )

    # Load eagerly
    from tracekit.core.types import TraceMetadata, WaveformTrace

    data = np.fromfile(file_path, dtype=dtype, count=length, offset=offset)
    metadata = TraceMetadata(sample_rate=sample_rate)
    return WaveformTrace(data=data.astype(np.float64), metadata=metadata)

368 

369 

# Names exported by ``from tracekit.loaders.lazy import *``.
__all__ = [
    "LazyWaveformTrace",
    "load_trace_lazy",
]