Coverage for src / tracekit / utils / lazy.py: 82%
94 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Lazy evaluation utilities for deferred computation.
3This module provides lazy evaluation proxies that defer computation until
4results are actually needed, enabling memory-efficient operation chaining.
7Example:
8 >>> from tracekit.utils.lazy import LazyArray, lazy_operation
9 >>> # Operations are deferred until .compute() is called
10 >>> lazy_result = lazy_operation(lambda x: x ** 2, large_data)
11 >>> result = lazy_result.compute() # Only now is computation performed
13References:
14 Dask documentation on lazy evaluation
15 NumPy lazy evaluation patterns
16"""
18from __future__ import annotations
20from abc import ABC, abstractmethod
21from typing import TYPE_CHECKING, Any, TypeVar
23import numpy as np
24from numpy.typing import NDArray
26if TYPE_CHECKING:
27 from collections.abc import Callable
29T = TypeVar("T")
32class LazyProxy[T](ABC):
33 """Abstract base class for lazy evaluation proxies.
35 Defers computation until explicitly requested via .compute().
36 """
38 def __init__(self) -> None:
39 self._computed: bool = False
40 self._result: T | None = None
42 @abstractmethod
43 def _evaluate(self) -> T:
44 """Perform the actual computation.
46 Returns:
47 Computed result.
48 """
49 pass
51 def compute(self) -> T:
52 """Evaluate and return the result.
54 Returns:
55 Computed result (cached after first evaluation).
57 Example:
58 >>> lazy_obj = LazyArray(lambda: np.arange(1000))
59 >>> result = lazy_obj.compute()
60 """
61 if not self._computed:
62 self._result = self._evaluate()
63 self._computed = True
64 return self._result # type: ignore[return-value]
66 def is_computed(self) -> bool:
67 """Check if result has been computed.
69 Returns:
70 True if compute() has been called.
71 """
72 return self._computed
74 def reset(self) -> None:
75 """Clear cached result, forcing re-evaluation on next compute()."""
76 self._computed = False
77 self._result = None
class LazyArray(LazyProxy[NDArray[np.floating[Any]]]):
    """Lazy proxy around a callable that produces a numpy array.

    The callable and its arguments are stored at construction time and only
    invoked when the array is actually needed (via ``compute()`` or one of
    the accessors below).

    Args:
        func: Callable that returns a numpy array.
        args: Positional arguments forwarded to func.
        kwargs: Keyword arguments forwarded to func.

    Example:
        >>> def expensive_computation():
        ...     return np.random.randn(1000000)
        >>> lazy = LazyArray(expensive_computation)
        >>> # No computation yet
        >>> result = lazy.compute()  # Now it runs
    """

    def __init__(
        self,
        func: Callable[..., NDArray[np.floating[Any]]],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        # Remember the call so it can be replayed later by _evaluate().
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def _evaluate(self) -> NDArray[np.floating[Any]]:
        """Invoke the stored callable with the stored arguments."""
        producer = self._func
        return producer(*self._args, **self._kwargs)

    def __len__(self) -> int:
        """Return the array length (triggers computation)."""
        materialized = self.compute()
        return len(materialized)

    def __getitem__(self, key: Any) -> Any:
        """Index into the array (triggers computation)."""
        materialized = self.compute()
        return materialized[key]

    def shape(self) -> tuple[int, ...]:
        """Return the array shape (triggers computation).

        Note that this is a regular method, so it is called as ``lazy.shape()``.
        """
        materialized = self.compute()
        return materialized.shape  # type: ignore[no-any-return]

    def dtype(self) -> np.dtype[Any]:
        """Return the array dtype (triggers computation).

        Note that this is a regular method, so it is called as ``lazy.dtype()``.
        """
        materialized = self.compute()
        return materialized.dtype
class LazyOperation(LazyProxy[Any]):
    """Deferred application of an operation to its operands.

    Operands may be plain values or other lazy proxies. Proxies are computed
    when this operation is evaluated, which lets operations be chained and
    executed only when the final result is requested.

    Args:
        operation: Callable that performs the operation.
        *operands: Input data or other lazy proxies.
        **kwargs: Keyword arguments for the operation.

    Example:
        >>> data = np.arange(1000)
        >>> # Build a chain; nothing runs until compute() is called
        >>> op1 = LazyOperation(lambda x: x ** 2, data)
        >>> op2 = LazyOperation(lambda x: x + 1, op1)
        >>> result = op2.compute()
    """

    def __init__(
        self,
        operation: Callable[..., Any],
        *operands: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self._operation = operation
        self._operands = operands
        self._kwargs = kwargs

    def _evaluate(self) -> Any:
        """Resolve lazy operands in order, then apply the operation."""
        resolved = [
            item.compute() if isinstance(item, LazyProxy) else item
            for item in self._operands
        ]
        return self._operation(*resolved, **self._kwargs)
173def lazy_operation[T](
174 func: Callable[..., T],
175 *args: Any,
176 **kwargs: Any,
177) -> LazyOperation:
178 """Create a lazy operation from a function.
180 Args:
181 func: Function to defer.
182 *args: Arguments to pass to func.
183 **kwargs: Keyword arguments to pass to func.
185 Returns:
186 LazyOperation that will execute func when computed.
188 Example:
189 >>> import numpy as np
190 >>> data = np.arange(1000)
191 >>> lazy_result = lazy_operation(np.fft.fft, data)
192 >>> # Computation happens here
193 >>> result = lazy_result.compute()
194 """
195 return LazyOperation(func, *args, **kwargs)
def auto_preview(
    data: NDArray[np.floating[Any]],
    *,
    downsample_factor: int = 10,
    preview_only: bool = False,
    size_threshold: int = 1_000_000,
) -> NDArray[np.float64]:
    """Generate preview of large dataset with automatic downsampling.

    Two-stage analysis: quick preview before full processing.

    Args:
        data: Input data array.
        downsample_factor: Keep every ``downsample_factor``-th sample along
            the first axis when a preview is produced (default 10). Must be
            >= 1 whenever a preview is produced.
        preview_only: If True, always return the downsampled preview. If
            False, the full data is returned only when ``len(data)`` does not
            exceed ``size_threshold``; larger inputs are still downsampled.
        size_threshold: Length above which the data is downsampled even when
            ``preview_only`` is False (default 1_000_000).

    Returns:
        A new float64 array: the downsampled preview, or a float64 copy of
        the full data when it is small enough and ``preview_only`` is False.

    Raises:
        ValueError: If a preview is produced and ``downsample_factor`` < 1.

    Example:
        >>> import numpy as np
        >>> large_data = np.random.randn(10_000_000)
        >>> # Quick preview
        >>> preview = auto_preview(large_data, preview_only=True)
        >>> print(f"Preview shape: {preview.shape}")
        >>> # Small inputs are returned in full
        >>> full = auto_preview(large_data[:1000], preview_only=False)

    References:
        MEM-026: Two-Stage Analysis (Preview + Full)
    """
    if preview_only or len(data) > size_threshold:
        # A zero step fails with an opaque slice error and a negative step
        # would silently reverse the data, so reject both explicitly.
        if downsample_factor < 1:
            raise ValueError(
                f"downsample_factor must be >= 1, got {downsample_factor}"
            )
        # astype() allocates a new array, so no separate copy() is needed.
        return data[::downsample_factor].astype(np.float64)

    # Small enough: return the full data (as a float64 copy).
    return data.astype(np.float64)
def _time_to_sample_index(time_s: float, sample_rate: float) -> int:
    """Convert a time in seconds to a sample index, tolerating float error."""
    position = time_s * sample_rate
    nearest = int(round(position))
    # Products such as 0.29 * 100 evaluate to 28.999999999999996; plain
    # truncation would land one sample early. Snap to the nearest integer
    # when the product is within floating-point tolerance of it.
    if abs(position - nearest) <= 1e-9 * max(1.0, abs(position)):
        return nearest
    # Genuinely fractional positions keep truncation toward zero.
    return int(position)


def select_roi(
    data: NDArray[np.floating[Any]],
    start: int | None = None,
    end: int | None = None,
    *,
    start_time: float | None = None,
    end_time: float | None = None,
    sample_rate: float | None = None,
) -> NDArray[np.float64]:
    """Select region of interest from data.

    Allows selection by sample indices or time values. When a time value is
    given it takes precedence over the corresponding sample index.

    Args:
        data: Input data array.
        start: Start sample index (inclusive). Defaults to 0.
        end: End sample index (exclusive). Defaults to ``len(data)``.
        start_time: Start time in seconds (alternative to start).
        end_time: End time in seconds (alternative to end).
        sample_rate: Sample rate in Hz (required if using time-based selection).

    Returns:
        Selected region of interest as a new float64 array.

    Raises:
        ValueError: If time-based selection used without sample_rate, or if
            the region is empty after clipping to the data bounds
            (start >= end).

    Example:
        >>> import numpy as np
        >>> data = np.random.randn(10_000_000)
        >>> # Select by sample indices
        >>> roi = select_roi(data, start=1000, end=2000)
        >>> # Select by time
        >>> roi_time = select_roi(
        ...     data, start_time=0.001, end_time=0.002, sample_rate=1e6
        ... )

    References:
        MEM-027: Region-of-Interest Selection from Preview
    """
    # Convert time-based bounds to sample-based bounds.
    if start_time is not None or end_time is not None:
        if sample_rate is None:
            raise ValueError("sample_rate required for time-based selection")

        if start_time is not None:
            start = _time_to_sample_index(start_time, sample_rate)
        if end_time is not None:
            end = _time_to_sample_index(end_time, sample_rate)

    # Apply defaults: an omitted bound means "from the beginning" / "to the end".
    if start is None:
        start = 0
    if end is None:
        end = len(data)

    # Clip to the valid index range.
    start = max(0, start)
    end = min(len(data), end)

    if start >= end:
        raise ValueError(f"Invalid ROI: start ({start}) >= end ({end})")

    # astype() returns a new array, so the result does not alias the input.
    return data[start:end].astype(np.float64)
class ProgressiveResolution:
    """Progressive resolution analyzer for large datasets.

    Implements coarse-to-fine analysis: preview then zoom into ROI.

    Args:
        data: Input data array or lazy proxy.
        sample_rate: Sample rate in Hz.

    Example:
        >>> import numpy as np
        >>> data = np.random.randn(100_000_000)
        >>> analyzer = ProgressiveResolution(data, sample_rate=1e6)
        >>> # Stage 1: Preview
        >>> preview = analyzer.get_preview(downsample_factor=100)
        >>> # Stage 2: User selects ROI
        >>> roi_data = analyzer.get_roi(start_time=0.5, end_time=0.6)

    References:
        MEM-013: Progressive Resolution (Coarse-to-Fine)
    """

    def __init__(
        self,
        data: NDArray[np.floating[Any]] | LazyProxy[NDArray[np.floating[Any]]],
        sample_rate: float,
    ) -> None:
        self._data = data
        self._sample_rate = sample_rate
        # Most recent preview and the factor it was built with.
        self._preview: NDArray[np.float64] | None = None
        self._preview_factor: int | None = None

    def _materialize(self) -> NDArray[np.floating[Any]]:
        """Return the full-resolution data, computing a lazy proxy if needed."""
        source = self._data
        return source.compute() if isinstance(source, LazyProxy) else source

    def get_preview(
        self,
        downsample_factor: int = 10,
        force_recompute: bool = False,
    ) -> NDArray[np.float64]:
        """Generate low-resolution preview.

        The preview is cached; it is rebuilt when ``downsample_factor``
        differs from the cached one or ``force_recompute`` is True.

        Args:
            downsample_factor: Keep every ``downsample_factor``-th sample.
                Must be >= 1.
            force_recompute: If True, recompute even if cached.

        Returns:
            Downsampled preview of data as a float64 array.

        Raises:
            ValueError: If ``downsample_factor`` < 1.
        """
        # A zero step fails with an opaque slice error and a negative step
        # would silently reverse the data, so reject both explicitly.
        if downsample_factor < 1:
            raise ValueError(
                f"downsample_factor must be >= 1, got {downsample_factor}"
            )

        if (
            not force_recompute
            and self._preview is not None
            and self._preview_factor == downsample_factor
        ):
            return self._preview

        data = self._materialize()

        # astype() allocates a new array, so no separate copy() is needed.
        self._preview = data[::downsample_factor].astype(np.float64)
        self._preview_factor = downsample_factor

        return self._preview

    def get_roi(
        self,
        start: int | None = None,
        end: int | None = None,
        *,
        start_time: float | None = None,
        end_time: float | None = None,
    ) -> NDArray[np.float64]:
        """Get high-resolution region of interest.

        Delegates to ``select_roi`` using this analyzer's sample rate.

        Args:
            start: Start sample index.
            end: End sample index.
            start_time: Start time in seconds (alternative).
            end_time: End time in seconds (alternative).

        Returns:
            Full-resolution ROI data.
        """
        return select_roi(
            self._materialize(),
            start=start,
            end=end,
            start_time=start_time,
            end_time=end_time,
            sample_rate=self._sample_rate,
        )

    @property
    def sample_rate(self) -> float:
        """Sample rate in Hz."""
        return self._sample_rate
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "LazyArray",
    "LazyOperation",
    "LazyProxy",
    "ProgressiveResolution",
    "auto_preview",
    "lazy_operation",
    "select_roi",
]