Coverage for src/tracekit/loaders/mmap_loader.py: 79%

139 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Memory-mapped file loader for huge waveform files. 

2 

3This module provides efficient memory-mapped loading for GB+ files that cannot 

4fit in RAM. Unlike eager loading, memory-mapped arrays don't load the entire 

5file into memory but access it in chunks on-demand via the OS page cache. 

6 

7Key features: 

8- Zero-copy data access via numpy.memmap 

9- Chunked iteration for processing huge files 

10- Integration with existing TraceKit loader infrastructure 

11- Support for common binary formats (raw, NPY, structured) 

12- Automatic fallback to regular loading for small files 

13 

14Example: 

15 >>> from tracekit.loaders.mmap_loader import load_mmap 

16 >>> # Load 10 GB file without loading all data to RAM 

17 >>> trace = load_mmap("huge_trace.npy", sample_rate=1e9) 

18 >>> print(f"Length: {len(trace)} samples") 

19 >>> 

20 >>> # Process in chunks to avoid OOM 

21 >>> for chunk in trace.iter_chunks(chunk_size=1_000_000): 

22 ... result = analyze_chunk(chunk) 

23 

24References: 

25 Performance optimization for huge files (>1 GB) 

26 API-017: Lazy Loading for Huge Files 

27""" 

28 

29 from __future__ import annotations
30
31 from collections.abc import Iterator
32 from pathlib import Path
33 from typing import TYPE_CHECKING, Any
34
35 import numpy as np
36
37 from tracekit.core.exceptions import LoaderError
38 from tracekit.core.types import TraceMetadata
39
40 if TYPE_CHECKING:
41     from os import PathLike
42
43     from numpy.typing import DTypeLike, NDArray
44
45
46 # File size threshold above which memory mapping is recommended (1 GiB)
47 MMAP_THRESHOLD = 1024 * 1024 * 1024

48 
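For intuition about what this threshold means in samples rather than bytes, a quick standalone conversion (the dtypes below are only illustrative):

    import numpy as np

    GIB = 1024 * 1024 * 1024  # same value as MMAP_THRESHOLD
    for dt in (np.float64, np.float32, np.int16):
        # e.g. float64: 134,217,728 samples per GiB
        print(f"{np.dtype(dt).name}: {GIB // np.dtype(dt).itemsize:,} samples per GiB")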

49 

50 class MmapWaveformTrace:

51 """Memory-mapped waveform trace for huge files. 

52 

53     Provides lazy access to waveform data via memory mapping. Data is not
54     read eagerly into RAM; the OS pages it in from disk as it is accessed.

55 

56 This allows working with files larger than available RAM without OOM errors. 

57 

58 Attributes: 

59 file_path: Path to the memory-mapped file. 

60 sample_rate: Sample rate in Hz. 

61 length: Number of samples in the trace. 

62 dtype: NumPy dtype of the samples. 

63 metadata: Additional trace metadata. 

64 

65 Example: 

66 >>> trace = MmapWaveformTrace( 

67 ... file_path="huge_trace.bin", 

68 ... sample_rate=1e9, 

69 ... length=10_000_000_000, 

70 ... dtype=np.float32 

71 ... ) 

72 >>> # Access subset without loading entire file 

73 >>> subset = trace[1000:2000] 

74 >>> # Process in chunks 

75 >>> for chunk in trace.iter_chunks(chunk_size=1_000_000): 

76 ... process(chunk) 

77 """ 

78 

79 def __init__( 

80 self, 

81 file_path: str | Path, 

82 sample_rate: float, 

83 length: int, 

84 *, 

85 dtype: DTypeLike = np.float64, 

86 offset: int = 0, 

87 metadata: dict[str, Any] | None = None, 

88 mode: str = "r", 

89 ) -> None: 

90 """Initialize memory-mapped trace. 

91 

92 Args: 

93 file_path: Path to binary data file. 

94 sample_rate: Sample rate in Hz. 

95 length: Number of samples. 

96 dtype: Data type of samples. 

97 offset: Byte offset to start of data in file. 

98 metadata: Additional metadata dictionary. 

99 mode: File access mode ('r' for read-only, 'r+' for read-write). 

100 

101 Raises: 

102 LoaderError: If file not found or invalid parameters. 

103 

104 Example: 

105 >>> trace = MmapWaveformTrace( 

106 ... file_path="trace.f32", 

107 ... sample_rate=1e9, 

108 ... length=1_000_000_000, 

109 ... dtype=np.float32 

110 ... ) 

111 """ 

112 self._file_path = Path(file_path) 

113 self._sample_rate = float(sample_rate) 

114 self._length = int(length) 

115 self._dtype = np.dtype(dtype) 

116 self._offset = int(offset) 

117 self._metadata = metadata or {} 

118 self._mode = mode 

119 

120 # Memory-mapped array - created on first access 

121 self._memmap: np.memmap[Any, np.dtype[Any]] | None = None 

122 

123 # Validate inputs 

124 if self._sample_rate <= 0: 

125 raise LoaderError(f"sample_rate must be positive, got {self._sample_rate}") 

126 if self._length < 0:    [126 ↛ 127: condition on line 126 was never true]

127 raise LoaderError(f"length must be non-negative, got {self._length}") 

128 if self._offset < 0:    [128 ↛ 129: condition on line 128 was never true]

129 raise LoaderError(f"offset must be non-negative, got {self._offset}") 

130 

131 # Verify file exists 

132 if not self._file_path.exists():    [132 ↛ 133: condition on line 132 was never true]

133 raise LoaderError(f"File not found: {self._file_path}") 

134 

135 # Verify file size 

136 expected_size = self._offset + self._length * self._dtype.itemsize 

137 actual_size = self._file_path.stat().st_size 

138 if actual_size < expected_size: 

139 raise LoaderError( 

140 f"File too small for requested data. " 

141 f"Expected at least {expected_size} bytes, got {actual_size} bytes", 

142 file_path=str(self._file_path), 

143 ) 

144 

145 @property 

146 def sample_rate(self) -> float: 

147 """Sample rate in Hz.""" 

148 return self._sample_rate 

149 

150 @property 

151 def length(self) -> int: 

152 """Number of samples.""" 

153 return self._length 

154 

155 @property 

156 def duration(self) -> float: 

157 """Duration in seconds.""" 

158 return self._length / self._sample_rate 

159 

160 @property 

161 def metadata(self) -> dict[str, Any]: 

162 """Metadata dictionary.""" 

163 return self._metadata 

164 

165 @property 

166 def dtype(self) -> np.dtype[Any]: 

167 """Data type of samples.""" 

168 return self._dtype 

169 

170 @property 

171 def file_path(self) -> Path: 

172 """Path to memory-mapped file.""" 

173 return self._file_path 

174 

175 @property 

176 def data(self) -> np.memmap[Any, np.dtype[Any]]: 

177 """Memory-mapped data array. 

178 

179 Returns a numpy.memmap object that behaves like a numpy array 

180 but doesn't load data into memory until accessed. 

181 

182 Returns: 

183 Memory-mapped numpy array. 

184 

185 Example: 

186 >>> trace = load_mmap("huge.npy", sample_rate=1e9) 

187 >>> data = trace.data # No data loaded yet 

188 >>> subset = data[1000:2000] # Only this range loaded 

189 """ 

190 if self._memmap is None: 

191 self._memmap = np.memmap( # type: ignore[call-overload] 

192 str(self._file_path), 

193 dtype=self._dtype, 

194 mode=self._mode, 

195 offset=self._offset, 

196 shape=(self._length,), 

197 ) 

198 return self._memmap 

199 
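A standalone illustration of the lazy behavior this property relies on, using a scratch file and plain numpy (nothing TraceKit-specific; path and sizes are made up):

    import numpy as np, tempfile, os

    path = os.path.join(tempfile.mkdtemp(), "demo.f64")
    np.arange(1_000_000, dtype=np.float64).tofile(path)  # ~8 MB scratch file

    mm = np.memmap(path, dtype=np.float64, mode="r", shape=(1_000_000,))
    print(mm[123_456])            # only the page(s) holding this sample are read
    print(mm[1_000:1_010].sum())  # reads ~10 samples, not the whole file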

200 @property 

201 def time_vector(self) -> NDArray[np.float64]: 

202 """Time vector in seconds. 

203 

204 Note: For huge traces, this can consume significant memory. 

205 Consider using time values on-demand instead. 

206 

207 Returns: 

208 Array of time values corresponding to samples. 

209 

210 Example: 

211 >>> # For huge traces, avoid materializing full time vector 

212 >>> # Instead compute on-demand: 

213 >>> t_start = 0 

214 >>> t_end = trace.length / trace.sample_rate 

215 """ 

216 return np.arange(self._length, dtype=np.float64) / self._sample_rate 

217 

218 def __getitem__(self, key: int | slice) -> np.generic | NDArray[Any]:

219 """Slice the memory-mapped trace. 

220 

221 Supports both integer indexing and slicing. Only the requested 

222 portion is loaded from disk. 

223 

224 Args: 

225 key: Index or slice. 

226 

227 Returns: 

228 Single sample (NumPy scalar) or array slice (a view into the memmap).

229 

230 Raises: 

231 TypeError: If key is not int or slice. 

232 IndexError: If index out of range. 

233 

234 Example: 

235 >>> sample = trace[1000] # Load single sample 

236 >>> chunk = trace[1000:2000] # Load 1000 samples 

237 >>> every_10th = trace[::10] # Load decimated data 

238 """ 

239 if isinstance(key, (int, slice)):    [239 ↛ 242: condition on line 239 was always true]

240 return self.data[key] 

241 else: 

242 raise TypeError(f"Indices must be int or slice, not {type(key).__name__}") 

243 
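One subtlety worth noting: a slice of the memmap is still a view backed by the file. If the result must outlive the trace (or survive close()), copy it explicitly. A minimal sketch, assuming numpy imported as np and an open trace:

    view = trace[1_000:2_000]   # view into the mapping; no copy yet
    snapshot = np.array(view)   # independent in-memory copy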

244 def __len__(self) -> int: 

245 """Number of samples.""" 

246 return self._length 

247 

248 def iter_chunks( 

249 self, chunk_size: int = 1_000_000, overlap: int = 0 

250 ) -> Iterator[NDArray[np.float64]]: 

251 """Iterate over trace in chunks. 

252 

253 Yields consecutive chunks of data, optionally with overlap between 

254 chunks. This is efficient for processing huge files that don't fit 

255 in memory. 

256 

257 Args: 

258 chunk_size: Number of samples per chunk. 

259 overlap: Number of samples to overlap between chunks. 

260 

261 Yields: 

262 NumPy arrays of length chunk_size (the final chunk may be shorter).

263 

264 Raises: 

265 ValueError: If chunk_size or overlap invalid. 

266 

267 Example: 

268 >>> # Process 10 GB file in 1M sample chunks 

269 >>> for chunk in trace.iter_chunks(chunk_size=1_000_000): 

270 ... result = compute_fft(chunk) 

271 ... 

272 >>> # With 50% overlap for windowed processing 

273 >>> for chunk in trace.iter_chunks(chunk_size=2048, overlap=1024): 

274 ... spectrum = analyze_spectrum(chunk) 

275 """ 

276 if chunk_size <= 0:    [276 ↛ 277: condition on line 276 was never true]

277 raise ValueError(f"chunk_size must be positive, got {chunk_size}") 

278 if overlap < 0:    [278 ↛ 279: condition on line 278 was never true]

279 raise ValueError(f"overlap must be non-negative, got {overlap}") 

280 if overlap >= chunk_size:    [280 ↛ 281: condition on line 280 was never true]

281 raise ValueError(f"overlap ({overlap}) must be less than chunk_size ({chunk_size})") 

282 

283 data = self.data 

284 step = chunk_size - overlap 

285 

286 for start in range(0, self._length, step): 

287 end = min(start + chunk_size, self._length) 

288 # Copy into an in-memory array; np.asarray could return a memmap view, pinning the file
289 yield np.array(data[start:end], dtype=np.float64)

290 
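To make the stepping arithmetic concrete: a toy trace of 10 samples with chunk_size=4 and overlap=2 gives step = 2 and these chunk bounds (note the final window can be shorter than chunk_size):

    length, chunk_size, overlap = 10, 4, 2
    step = chunk_size - overlap
    print([(s, min(s + chunk_size, length)) for s in range(0, length, step)])
    # [(0, 4), (2, 6), (4, 8), (6, 10), (8, 10)]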

291 def to_eager(self) -> Any: 

292 """Convert to eager WaveformTrace by loading all data. 

293 

294 WARNING: This loads the entire file into memory. Only use this 

295 if you're sure the data fits in RAM. 

296 

297 Returns: 

298 WaveformTrace with data loaded in memory. 

299 

300 Raises: 

301 MemoryError: If data doesn't fit in RAM. 

302 

303 Example: 

304 >>> # Only convert to eager if file is small enough 

305 >>> if trace.length < 10_000_000: 

306 ... eager_trace = trace.to_eager() 

307 """ 

308 from tracekit.core.types import WaveformTrace 

309 

310 # Load all data into memory 

311 data = np.asarray(self.data, dtype=np.float64) 

312 

313 metadata = TraceMetadata( 

314 sample_rate=self._sample_rate, 

315 source_file=str(self._file_path), 

316 **self._metadata, # type: ignore[arg-type] 

317 ) 

318 

319 return WaveformTrace(data=data, metadata=metadata) 

320 
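A guard one might place in front of to_eager (a sketch; the 2 GiB budget is an arbitrary number chosen for illustration):

    needed = len(trace) * 8   # to_eager materializes float64: 8 bytes per sample
    if needed < 2 * 1024**3:  # stay under an assumed 2 GiB budget
        eager = trace.to_eager()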

321 def close(self) -> None: 

322 """Close memory-mapped file handle. 

323 

324 Should be called when done with the trace to free resources. 

325 The trace cannot be used after closing. 

326 

327 Example: 

328 >>> trace = load_mmap("huge.npy", sample_rate=1e9) 

329 >>> # ... use trace ... 

330 >>> trace.close() 

331 """ 

332 if getattr(self, "_memmap", None) is not None:
333 # Drop the reference so the mapping (and its file handle) can be
334 # garbage-collected; getattr guards against a partially built instance
335 self._memmap = None

336 

337 def __del__(self) -> None: 

338 """Cleanup memory map on deletion.""" 

339 self.close() 

340 

341 def __repr__(self) -> str: 

342 """String representation.""" 

343 size_mb = (self._length * self._dtype.itemsize) / (1024 * 1024) 

344 return ( 

345 f"MmapWaveformTrace(" 

346 f"file={self._file_path.name}, " 

347 f"sample_rate={self._sample_rate:.2e} Hz, " 

348 f"length={self._length:,} samples, " 

349 f"size={size_mb:.1f} MB, " 

350 f"dtype={self._dtype})" 

351 ) 

352 

353 def __enter__(self) -> MmapWaveformTrace: 

354 """Context manager entry.""" 

355 return self 

356 

357 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

358 """Context manager exit - close the file.""" 

359 self.close() 

360 

361 

362 def load_mmap(

363 file_path: str | PathLike[str], 

364 sample_rate: float | None = None, 

365 *, 

366 dtype: DTypeLike | None = None, 

367 offset: int = 0, 

368 length: int | None = None, 

369 mode: str = "r", 

370 **metadata: Any, 

371) -> MmapWaveformTrace: 

372 """Load waveform file with memory mapping. 

373 

374 Creates a memory-mapped trace that doesn't load data into RAM. 

375 Supports .npy files (auto-detects format) and raw binary files. 

376 

377 Args: 

378 file_path: Path to waveform file (.npy or raw binary). 

379 sample_rate: Sample rate in Hz (required for both raw and .npy files).

380 dtype: Data type (required for raw files, auto-detected for .npy). 

381 offset: Byte offset to data start (auto-computed for .npy). 

382 length: Number of samples (auto-computed if possible). 

383 mode: File access mode ('r' for read-only, 'r+' for read-write). 

384 **metadata: Additional metadata to store. 

385 

386 Returns: 

387 MmapWaveformTrace for memory-mapped access. 

388 

389 Raises: 

390 LoaderError: If file not found or parameters invalid. 

391 

392 Example: 

393 >>> # Load NumPy file (auto-detects format) 

394 >>> trace = load_mmap("huge_trace.npy", sample_rate=1e9) 

395 >>> 

396 >>> # Load raw binary file 

397 >>> trace = load_mmap( 

398 ... "data.f32", 

399 ... sample_rate=1e9, 

400 ... dtype=np.float32, 

401 ... length=1_000_000_000 

402 ... ) 

403 >>> 

404 >>> # Use context manager 

405 >>> with load_mmap("huge.npy", sample_rate=1e9) as trace: 

406 ... for chunk in trace.iter_chunks(chunk_size=1_000_000): 

407 ... process(chunk) 

408 

409 References: 

410 API-017: Lazy Loading for Huge Files 

411 """ 

412 file_path = Path(file_path) 

413 

414 if not file_path.exists(): 

415 raise LoaderError(f"File not found: {file_path}") 

416 

417 suffix = file_path.suffix.lower() 

418 

419 # Handle .npy files with automatic format detection 

420 if suffix == ".npy": 

421 return _load_npy_mmap(file_path, sample_rate, mode, metadata) 

422 

423 # Handle .npz files (not directly memory-mappable, but can extract) 

424 elif suffix == ".npz": 

425 raise LoaderError( 

426 "NPZ files cannot be directly memory-mapped. " 

427 "Extract the array first using np.load() and save as .npy", 

428 file_path=str(file_path), 

429 fix_hint="Use: np.save('array.npy', np.load('file.npz')['array'])", 

430 ) 

431 

432 # Handle raw binary files 

433 else: 

434 if dtype is None:    [434 ↛ 435: condition on line 434 was never true]

435 raise LoaderError( 

436 "dtype is required for raw binary files", 

437 file_path=str(file_path), 

438 fix_hint="Specify dtype, e.g., dtype=np.float32", 

439 ) 

440 if sample_rate is None:    [440 ↛ 441: condition on line 440 was never true]

441 raise LoaderError( 

442 "sample_rate is required for raw binary files", 

443 file_path=str(file_path), 

444 ) 

445 

446 # Compute length from file size if not provided 

447 dtype_np = np.dtype(dtype) 

448 if length is None:    [448 ↛ 449: condition on line 448 was never true]

449 file_size = file_path.stat().st_size - offset 

450 length = file_size // dtype_np.itemsize 

451 

452 return MmapWaveformTrace( 

453 file_path=file_path, 

454 sample_rate=sample_rate, 

455 length=length, 

456 dtype=dtype_np, 

457 offset=offset, 

458 metadata=metadata, 

459 mode=mode, 

460 ) 

461 
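An end-to-end example of the raw-binary branch above (standalone sketch; the scratch path and parameters are made up):

    import numpy as np

    np.linspace(0.0, 1.0, 1_000, dtype=np.float32).tofile("/tmp/ramp.f32")
    trace = load_mmap("/tmp/ramp.f32", sample_rate=1e6,
                      dtype=np.float32, length=1_000)
    print(trace.duration)       # 0.001 s
    print(trace[0], trace[-1])  # 0.0 1.0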

462 

463 def _load_npy_mmap(

464 file_path: Path, 

465 sample_rate: float | None, 

466 mode: str, 

467 metadata: dict[str, Any], 

468) -> MmapWaveformTrace: 

469 """Load NumPy .npy file with memory mapping. 

470 

471 Reads the .npy header to extract dtype, shape, and data offset, 

472 then creates a memory-mapped array. 

473 

474 Args: 

475 file_path: Path to .npy file. 

476 sample_rate: Sample rate in Hz (required). 

477 mode: File access mode. 

478 metadata: Additional metadata. 

479 

480 Returns: 

481 MmapWaveformTrace for the .npy file. 

482 

483 Raises: 

484 LoaderError: If sample_rate not provided or file invalid. 

485 """ 

486 if sample_rate is None:    [486 ↛ 487: condition on line 486 was never true]

487 raise LoaderError( 

488 "sample_rate is required for .npy files", 

489 file_path=str(file_path), 

490 fix_hint="Specify sample_rate, e.g., sample_rate=1e9", 

491 ) 

492 

493 try: 

494 # Read NumPy header without loading data 

495 with open(file_path, "rb") as f: 

496 import numpy.lib.format as npf 

497 

498 # Read header 

499 version = npf.read_magic(f) # type: ignore[no-untyped-call] 

500 

501 if version == (1, 0):    [501 ↛ 503: condition on line 501 was always true]

502 shape, fortran_order, dtype = npf.read_array_header_1_0(f) # type: ignore[no-untyped-call] 

503 elif version == (2, 0): 

504 shape, fortran_order, dtype = npf.read_array_header_2_0(f) # type: ignore[no-untyped-call] 

505 else: 

506 raise LoaderError( 

507 f"Unsupported NPY version: {version}", 

508 file_path=str(file_path), 

509 ) 

510 

511 # Get data offset 

512 offset = f.tell() 

513 

514 # Validate shape 

515 if not isinstance(shape, tuple):    [515 ↛ 516: condition on line 515 was never true]

516 raise LoaderError( 

517 f"Invalid .npy shape: {shape}", 

518 file_path=str(file_path), 

519 ) 

520 

521 if len(shape) != 1:    [521 ↛ 522: condition on line 521 was never true]

522 raise LoaderError( 

523 f"Expected 1D array, got shape {shape}", 

524 file_path=str(file_path), 

525 fix_hint="Reshape to 1D or extract specific column", 

526 ) 

527 

528 length = shape[0] 

529 

530 if fortran_order:    [530 ↛ 531: condition on line 530 was never true]

531 raise LoaderError( 

532 "Fortran-ordered arrays not supported for memory mapping", 

533 file_path=str(file_path), 

534 fix_hint="Resave in C order: np.save('file.npy', np.ascontiguousarray(arr))",

535 ) 

536 

537 return MmapWaveformTrace( 

538 file_path=file_path, 

539 sample_rate=sample_rate, 

540 length=length, 

541 dtype=dtype, 

542 offset=offset, 

543 metadata=metadata, 

544 mode=mode, 

545 ) 

546 

547 except Exception as e: 

548 if isinstance(e, LoaderError):    [548 ↛ 550: condition on line 548 was always true]

549 raise 

550 raise LoaderError( 

551 f"Failed to load .npy file: {e}", 

552 file_path=str(file_path), 

553 ) from e 

554 
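The header handshake above can be reproduced with plain numpy, which is a handy way to sanity-check a file (standalone sketch using a scratch .npy):

    import numpy as np
    import numpy.lib.format as npf

    np.save("/tmp/demo.npy", np.zeros(8, dtype=np.float32))
    with open("/tmp/demo.npy", "rb") as f:
        version = npf.read_magic(f)  # (1, 0) for small headers
        shape, fortran, dtype = npf.read_array_header_1_0(f)
        print(version, shape, fortran, dtype, f.tell())  # tell() == data offset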

555 

556 def should_use_mmap(file_path: str | PathLike[str], threshold: int = MMAP_THRESHOLD) -> bool:

557 """Check if file should use memory mapping. 

558 

559 Recommends memory mapping for files at least threshold bytes in size (default 1 GiB).

560 

561 Args: 

562 file_path: Path to file. 

563 threshold: Size threshold in bytes (default: 1 GiB).

564 

565 Returns: 

566 True if the file exists and its size >= threshold, False otherwise.

567 

568 Example: 

569 >>> if should_use_mmap("huge_trace.npy"): 

570 ... trace = load_mmap("huge_trace.npy", sample_rate=1e9) 

571 ... else: 

572 ... trace = load("huge_trace.npy", sample_rate=1e9) 

573 """ 

574 file_path = Path(file_path) 

575 if not file_path.exists(): 

576 return False 

577 

578 file_size = file_path.stat().st_size 

579 return file_size >= threshold 

580 
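A caller-side dispatch built on this helper (a sketch: `load` stands in for whatever eager loader the application already uses; it is not defined in this module):

    def load_auto(path, sample_rate):
        """Pick mmap vs. eager loading by file size (hypothetical helper)."""
        if should_use_mmap(path):
            return load_mmap(path, sample_rate=sample_rate)
        return load(path, sample_rate=sample_rate)  # assumed eager loader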

581 

582 __all__ = [

583 "MMAP_THRESHOLD", 

584 "MmapWaveformTrace", 

585 "load_mmap", 

586 "should_use_mmap", 

587]