Coverage for src / tracekit / loaders / mmap_loader.py: 79%
139 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Memory-mapped file loader for huge waveform files.
3This module provides efficient memory-mapped loading for GB+ files that cannot
4fit in RAM. Unlike eager loading, memory-mapped arrays don't load the entire
5file into memory but access it in chunks on-demand via the OS page cache.
7Key features:
8- Zero-copy data access via numpy.memmap
9- Chunked iteration for processing huge files
10- Integration with existing TraceKit loader infrastructure
11- Support for common binary formats (raw, NPY, structured)
12- Automatic fallback to regular loading for small files
14Example:
15 >>> from tracekit.loaders.mmap_loader import load_mmap
16 >>> # Load 10 GB file without loading all data to RAM
17 >>> trace = load_mmap("huge_trace.npy", sample_rate=1e9)
18 >>> print(f"Length: {len(trace)} samples")
19 >>>
20 >>> # Process in chunks to avoid OOM
21 >>> for chunk in trace.iter_chunks(chunk_size=1_000_000):
22 ... result = analyze_chunk(chunk)
24References:
25 Performance optimization for huge files (>1 GB)
26 API-017: Lazy Loading for Huge Files
27"""
29from __future__ import annotations
31from collections.abc import Iterator
32from pathlib import Path
33from typing import TYPE_CHECKING, Any
35import numpy as np
37from tracekit.core.exceptions import LoaderError
38from tracekit.core.types import TraceMetadata
40if TYPE_CHECKING:
41 from os import PathLike
43 from numpy.typing import DTypeLike, NDArray
46# File size threshold for automatic mmap suggestion (1 GB)
47MMAP_THRESHOLD = 1024 * 1024 * 1024
class MmapWaveformTrace:
    """Memory-mapped waveform trace for huge files.

    Provides lazy access to waveform data via memory mapping. Data is not
    loaded into RAM but accessed directly from disk through the OS page cache.
    This allows working with files larger than available RAM without OOM errors.

    Attributes:
        file_path: Path to the memory-mapped file.
        sample_rate: Sample rate in Hz.
        length: Number of samples in the trace.
        dtype: NumPy dtype of the samples.
        metadata: Additional trace metadata.

    Example:
        >>> trace = MmapWaveformTrace(
        ...     file_path="huge_trace.bin",
        ...     sample_rate=1e9,
        ...     length=10_000_000_000,
        ...     dtype=np.float32
        ... )
        >>> # Access subset without loading entire file
        >>> subset = trace[1000:2000]
        >>> # Process in chunks
        >>> for chunk in trace.iter_chunks(chunk_size=1_000_000):
        ...     process(chunk)
    """

    def __init__(
        self,
        file_path: str | Path,
        sample_rate: float,
        length: int,
        *,
        dtype: DTypeLike = np.float64,
        offset: int = 0,
        metadata: dict[str, Any] | None = None,
        mode: str = "r",
    ) -> None:
        """Initialize memory-mapped trace.

        Args:
            file_path: Path to binary data file.
            sample_rate: Sample rate in Hz.
            length: Number of samples.
            dtype: Data type of samples.
            offset: Byte offset to start of data in file.
            metadata: Additional metadata dictionary.
            mode: File access mode ('r' for read-only, 'r+' for read-write).

        Raises:
            LoaderError: If file not found or invalid parameters.

        Example:
            >>> trace = MmapWaveformTrace(
            ...     file_path="trace.f32",
            ...     sample_rate=1e9,
            ...     length=1_000_000_000,
            ...     dtype=np.float32
            ... )
        """
        # Memory-mapped array - created lazily on first `.data` access.
        # Assigned FIRST so that close()/__del__ are safe even when one of
        # the validations below raises and leaves the instance half-built.
        self._memmap: np.memmap[Any, np.dtype[Any]] | None = None

        self._file_path = Path(file_path)
        self._sample_rate = float(sample_rate)
        self._length = int(length)
        self._dtype = np.dtype(dtype)
        self._offset = int(offset)
        self._metadata = metadata or {}
        self._mode = mode

        # Validate inputs
        if self._sample_rate <= 0:
            raise LoaderError(f"sample_rate must be positive, got {self._sample_rate}")
        if self._length < 0:
            raise LoaderError(f"length must be non-negative, got {self._length}")
        if self._offset < 0:
            raise LoaderError(f"offset must be non-negative, got {self._offset}")

        # Verify file exists
        if not self._file_path.exists():
            raise LoaderError(f"File not found: {self._file_path}")

        # Verify the file is large enough to hold offset + length samples,
        # so that np.memmap cannot fail later with an obscure error.
        expected_size = self._offset + self._length * self._dtype.itemsize
        actual_size = self._file_path.stat().st_size
        if actual_size < expected_size:
            raise LoaderError(
                f"File too small for requested data. "
                f"Expected at least {expected_size} bytes, got {actual_size} bytes",
                file_path=str(self._file_path),
            )

    @property
    def sample_rate(self) -> float:
        """Sample rate in Hz."""
        return self._sample_rate

    @property
    def length(self) -> int:
        """Number of samples."""
        return self._length

    @property
    def duration(self) -> float:
        """Duration in seconds."""
        return self._length / self._sample_rate

    @property
    def metadata(self) -> dict[str, Any]:
        """Metadata dictionary."""
        return self._metadata

    @property
    def dtype(self) -> np.dtype[Any]:
        """Data type of samples."""
        return self._dtype

    @property
    def file_path(self) -> Path:
        """Path to memory-mapped file."""
        return self._file_path

    @property
    def data(self) -> np.memmap[Any, np.dtype[Any]]:
        """Memory-mapped data array.

        Returns a numpy.memmap object that behaves like a numpy array
        but doesn't load data into memory until accessed.

        Returns:
            Memory-mapped numpy array.

        Example:
            >>> trace = load_mmap("huge.npy", sample_rate=1e9)
            >>> data = trace.data  # No data loaded yet
            >>> subset = data[1000:2000]  # Only this range loaded
        """
        if self._memmap is None:
            self._memmap = np.memmap(  # type: ignore[call-overload]
                str(self._file_path),
                dtype=self._dtype,
                mode=self._mode,
                offset=self._offset,
                shape=(self._length,),
            )
        return self._memmap

    @property
    def time_vector(self) -> NDArray[np.float64]:
        """Time vector in seconds.

        Note: For huge traces, this materializes one float64 per sample and
        can consume significant memory. Consider computing time values
        on-demand instead.

        Returns:
            Array of time values corresponding to samples.

        Example:
            >>> # For huge traces, avoid materializing full time vector
            >>> # Instead compute on-demand:
            >>> t_start = 0
            >>> t_end = trace.length / trace.sample_rate
        """
        return np.arange(self._length, dtype=np.float64) / self._sample_rate

    def __getitem__(self, key: int | slice) -> float | NDArray[np.float64]:
        """Slice the memory-mapped trace.

        Supports both integer indexing and slicing. Only the requested
        portion is loaded from disk.

        Args:
            key: Index or slice.

        Returns:
            Single sample (float) or array slice.

        Raises:
            TypeError: If key is not int or slice.
            IndexError: If index out of range.

        Example:
            >>> sample = trace[1000]       # Load single sample
            >>> chunk = trace[1000:2000]   # Load 1000 samples
            >>> every_10th = trace[::10]   # Load decimated data
        """
        if isinstance(key, (int, slice)):
            return self.data[key]
        else:
            raise TypeError(f"Indices must be int or slice, not {type(key).__name__}")

    def __len__(self) -> int:
        """Number of samples."""
        return self._length

    def iter_chunks(
        self, chunk_size: int = 1_000_000, overlap: int = 0
    ) -> Iterator[NDArray[np.float64]]:
        """Iterate over trace in chunks.

        Yields consecutive chunks of data, optionally with overlap between
        chunks. This is efficient for processing huge files that don't fit
        in memory. Iteration stops once the end of the trace is reached;
        no duplicate tail chunks are produced.

        Args:
            chunk_size: Number of samples per chunk.
            overlap: Number of samples to overlap between chunks.

        Yields:
            Numpy arrays of chunk_size (or smaller for last chunk).

        Raises:
            ValueError: If chunk_size or overlap invalid.

        Example:
            >>> # Process 10 GB file in 1M sample chunks
            >>> for chunk in trace.iter_chunks(chunk_size=1_000_000):
            ...     result = compute_fft(chunk)
            ...
            >>> # With 50% overlap for windowed processing
            >>> for chunk in trace.iter_chunks(chunk_size=2048, overlap=1024):
            ...     spectrum = analyze_spectrum(chunk)
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if overlap < 0:
            raise ValueError(f"overlap must be non-negative, got {overlap}")
        if overlap >= chunk_size:
            raise ValueError(f"overlap ({overlap}) must be less than chunk_size ({chunk_size})")

        data = self.data
        step = chunk_size - overlap

        for start in range(0, self._length, step):
            end = min(start + chunk_size, self._length)
            # Convert memmap slice to regular array to avoid keeping file handle open
            yield np.asarray(data[start:end], dtype=np.float64)
            if end == self._length:
                # Stop at the end of the trace: with overlap > 0, any further
                # start positions would yield chunks that are pure suffixes of
                # this one, i.e. duplicate data.
                break

    def to_eager(self) -> Any:
        """Convert to eager WaveformTrace by loading all data.

        WARNING: This loads the entire file into memory. Only use this
        if you're sure the data fits in RAM.

        Returns:
            WaveformTrace with data loaded in memory.

        Raises:
            MemoryError: If data doesn't fit in RAM.

        Example:
            >>> # Only convert to eager if file is small enough
            >>> if trace.length < 10_000_000:
            ...     eager_trace = trace.to_eager()
        """
        from tracekit.core.types import WaveformTrace

        # Load all data into memory
        data = np.asarray(self.data, dtype=np.float64)

        metadata = TraceMetadata(
            sample_rate=self._sample_rate,
            source_file=str(self._file_path),
            **self._metadata,  # type: ignore[arg-type]
        )

        return WaveformTrace(data=data, metadata=metadata)

    def close(self) -> None:
        """Close memory-mapped file handle.

        Should be called when done with the trace to free resources.
        The trace cannot be used after closing.

        Example:
            >>> trace = load_mmap("huge.npy", sample_rate=1e9)
            >>> # ... use trace ...
            >>> trace.close()
        """
        # Dropping the last reference lets numpy/the OS release the mapping.
        self._memmap = None

    def __del__(self) -> None:
        """Cleanup memory map on deletion."""
        # getattr guard: __del__ also runs on objects whose __init__ raised
        # before any attribute was assigned.
        if getattr(self, "_memmap", None) is not None:
            self.close()

    def __repr__(self) -> str:
        """String representation."""
        size_mb = (self._length * self._dtype.itemsize) / (1024 * 1024)
        return (
            f"MmapWaveformTrace("
            f"file={self._file_path.name}, "
            f"sample_rate={self._sample_rate:.2e} Hz, "
            f"length={self._length:,} samples, "
            f"size={size_mb:.1f} MB, "
            f"dtype={self._dtype})"
        )

    def __enter__(self) -> MmapWaveformTrace:
        """Context manager entry."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit - close the file."""
        self.close()
def load_mmap(
    file_path: str | PathLike[str],
    sample_rate: float | None = None,
    *,
    dtype: DTypeLike | None = None,
    offset: int = 0,
    length: int | None = None,
    mode: str = "r",
    **metadata: Any,
) -> MmapWaveformTrace:
    """Load waveform file with memory mapping.

    Creates a memory-mapped trace that doesn't load data into RAM.
    Supports .npy files (auto-detects format) and raw binary files.

    Args:
        file_path: Path to waveform file (.npy or raw binary).
        sample_rate: Sample rate in Hz (required for raw files, optional for .npy).
        dtype: Data type (required for raw files, auto-detected for .npy).
        offset: Byte offset to data start (auto-computed for .npy).
        length: Number of samples (auto-computed if possible).
        mode: File access mode ('r' for read-only, 'r+' for read-write).
        **metadata: Additional metadata to store.

    Returns:
        MmapWaveformTrace for memory-mapped access.

    Raises:
        LoaderError: If file not found or parameters invalid.

    Example:
        >>> # Load NumPy file (auto-detects format)
        >>> trace = load_mmap("huge_trace.npy", sample_rate=1e9)
        >>>
        >>> # Load raw binary file
        >>> trace = load_mmap(
        ...     "data.f32",
        ...     sample_rate=1e9,
        ...     dtype=np.float32,
        ...     length=1_000_000_000
        ... )
        >>>
        >>> # Use context manager
        >>> with load_mmap("huge.npy", sample_rate=1e9) as trace:
        ...     for chunk in trace.iter_chunks(chunk_size=1_000_000):
        ...         process(chunk)

    References:
        API-017: Lazy Loading for Huge Files
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise LoaderError(f"File not found: {file_path}")

    ext = file_path.suffix.lower()

    # .npy carries its own dtype/shape/offset in the header — delegate.
    if ext == ".npy":
        return _load_npy_mmap(file_path, sample_rate, mode, metadata)

    # .npz is a zip archive, so it cannot be mapped directly.
    if ext == ".npz":
        raise LoaderError(
            "NPZ files cannot be directly memory-mapped. "
            "Extract the array first using np.load() and save as .npy",
            file_path=str(file_path),
            fix_hint="Use: np.save('array.npy', np.load('file.npz')['array'])",
        )

    # Anything else is treated as raw binary: the caller must tell us how
    # to interpret the bytes.
    if dtype is None:
        raise LoaderError(
            "dtype is required for raw binary files",
            file_path=str(file_path),
            fix_hint="Specify dtype, e.g., dtype=np.float32",
        )
    if sample_rate is None:
        raise LoaderError(
            "sample_rate is required for raw binary files",
            file_path=str(file_path),
        )

    resolved_dtype = np.dtype(dtype)
    if length is None:
        # Infer sample count from the bytes remaining after the offset.
        length = (file_path.stat().st_size - offset) // resolved_dtype.itemsize

    return MmapWaveformTrace(
        file_path=file_path,
        sample_rate=sample_rate,
        length=length,
        dtype=resolved_dtype,
        offset=offset,
        metadata=metadata,
        mode=mode,
    )
def _load_npy_mmap(
    file_path: Path,
    sample_rate: float | None,
    mode: str,
    metadata: dict[str, Any],
) -> MmapWaveformTrace:
    """Load NumPy .npy file with memory mapping.

    Reads the .npy header to extract dtype, shape, and data offset,
    then creates a memory-mapped array.

    Args:
        file_path: Path to .npy file.
        sample_rate: Sample rate in Hz (required).
        mode: File access mode.
        metadata: Additional metadata.

    Returns:
        MmapWaveformTrace for the .npy file.

    Raises:
        LoaderError: If sample_rate not provided or file invalid.
    """
    if sample_rate is None:
        raise LoaderError(
            "sample_rate is required for .npy files",
            file_path=str(file_path),
            fix_hint="Specify sample_rate, e.g., sample_rate=1e9",
        )

    try:
        import numpy.lib.format as npf

        # Parse only the header; the payload stays on disk.
        with open(file_path, "rb") as fh:
            magic = npf.read_magic(fh)  # type: ignore[no-untyped-call]

            if magic == (1, 0):
                header = npf.read_array_header_1_0(fh)  # type: ignore[no-untyped-call]
            elif magic == (2, 0):
                header = npf.read_array_header_2_0(fh)  # type: ignore[no-untyped-call]
            else:
                raise LoaderError(
                    f"Unsupported NPY version: {magic}",
                    file_path=str(file_path),
                )

            shape, fortran_order, dtype = header
            # The data block begins right after the header.
            data_offset = fh.tell()

        if not isinstance(shape, tuple):
            raise LoaderError(
                f"Invalid .npy shape: {shape}",
                file_path=str(file_path),
            )

        if len(shape) != 1:
            raise LoaderError(
                f"Expected 1D array, got shape {shape}",
                file_path=str(file_path),
                fix_hint="Reshape to 1D or extract specific column",
            )

        if fortran_order:
            raise LoaderError(
                "Fortran-ordered arrays not supported for memory mapping",
                file_path=str(file_path),
                fix_hint="Resave array in C order: np.save('file.npy', arr, allow_pickle=False)",
            )

        return MmapWaveformTrace(
            file_path=file_path,
            sample_rate=sample_rate,
            length=shape[0],
            dtype=dtype,
            offset=data_offset,
            metadata=metadata,
            mode=mode,
        )

    except LoaderError:
        # Already a descriptive loader error — propagate unchanged.
        raise
    except Exception as exc:
        raise LoaderError(
            f"Failed to load .npy file: {exc}",
            file_path=str(file_path),
        ) from exc
def should_use_mmap(file_path: str | PathLike[str], threshold: int = MMAP_THRESHOLD) -> bool:
    """Check if file should use memory mapping.

    Recommends memory mapping for files larger than threshold (default 1 GB).

    Args:
        file_path: Path to file.
        threshold: Size threshold in bytes (default: 1 GB).

    Returns:
        True if file size >= threshold, False otherwise. Missing files
        always return False.

    Example:
        >>> if should_use_mmap("huge_trace.npy"):
        ...     trace = load_mmap("huge_trace.npy", sample_rate=1e9)
        ... else:
        ...     trace = load("huge_trace.npy", sample_rate=1e9)
    """
    candidate = Path(file_path)
    # Short-circuits to False for non-existent paths before calling stat().
    return candidate.exists() and candidate.stat().st_size >= threshold
582__all__ = [
583 "MMAP_THRESHOLD",
584 "MmapWaveformTrace",
585 "load_mmap",
586 "should_use_mmap",
587]