Coverage for src / tracekit / core / lazy.py: 88%
159 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Lazy evaluation module for deferred computation in TraceKit workflows.
3This module provides lazy evaluation primitives that defer computation until
4results are actually accessed. Designed for analysis workflows where not all
5intermediate results may be needed, reducing memory usage and computation time.
7Key features:
8- Thread-safe lazy evaluation with compute-once semantics
9- Chained operations without eager evaluation
10- Partial evaluation (compute subset of results)
11- Memory-efficient release of source data after computation
12- Integration with TraceKit's memory monitoring and progress tracking
15Example:
16 >>> from tracekit.core.lazy import LazyResult, lazy, LazyDict
17 >>>
18 >>> # Defer expensive FFT computation
19 >>> @lazy
20 ... def compute_fft(signal, nfft):
21 ... return np.fft.fft(signal, n=nfft)
22 >>>
23 >>> # Create lazy result - not computed yet
24 >>> lazy_fft = compute_fft(signal, 8192)
25 >>> print(lazy_fft.is_computed()) # False
26 >>>
27 >>> # Access triggers computation
28 >>> spectrum = lazy_fft.value # Computed now
29 >>> print(lazy_fft.is_computed()) # True
30 >>>
31 >>> # Multiple accesses use cached result
32 >>> spectrum2 = lazy_fft.value # Returns cached value
33 >>>
34 >>> # LazyDict for multiple lazy results
35 >>> results = LazyDict()
36 >>> results['fft'] = LazyResult(lambda: np.fft.fft(signal, 8192))
37 >>> results['power'] = LazyResult(lambda: np.abs(results['fft'])**2)
38 >>> # Access triggers computation chain
39 >>> power_spectrum = results['power'] # Computes fft, then power
41References:
42 Python lazy evaluation patterns
43 Threading locks for thread-safe computation
44 TraceKit memory monitoring (core.memory_monitor)
45"""
47from __future__ import annotations
49import functools
50import threading
51from dataclasses import dataclass
52from typing import TYPE_CHECKING, Any
54if TYPE_CHECKING:
55 from collections.abc import Callable
@dataclass
class LazyComputeStats:
    """Counters describing how lazy results have been created and used.

    Attributes:
        total_created: How many LazyResult instances have been constructed.
        total_computed: How many computations have actually been executed.
        total_invalidated: How many invalidations have been requested.
        compute_time_total: Accumulated computation time, in seconds.
        cache_hits: How many accesses were served from a cached result.

    Example:
        >>> stats = LazyComputeStats(total_computed=1, cache_hits=3)
        >>> stats.hit_rate
        0.75
    """

    total_created: int = 0
    total_computed: int = 0
    total_invalidated: int = 0
    compute_time_total: float = 0.0
    cache_hits: int = 0

    @property
    def hit_rate(self) -> float:
        """Fraction of accesses that were answered from cache.

        Returns:
            ``cache_hits / (total_computed + cache_hits)`` in the range
            0.0-1.0, or 0.0 when no access has been recorded.
        """
        accesses = self.cache_hits + self.total_computed
        return self.cache_hits / accesses if accesses else 0.0

    def __str__(self) -> str:
        """Render the counters as a multi-line, human-readable report."""
        report_lines = [
            "Lazy Computation Statistics:",
            f" Created: {self.total_created}",
            f" Computed: {self.total_computed}",
            f" Cache Hits: {self.cache_hits}",
            f" Hit Rate: {self.hit_rate * 100:.1f}%",
            f" Invalidations: {self.total_invalidated}",
            f" Total Compute Time: {self.compute_time_total:.3f}s",
        ]
        # Every line, including the last one, is newline-terminated.
        return "\n".join(report_lines) + "\n"
# Module-wide bookkeeping for lazy computations. The lock is created first so
# it exists before the counters it guards.
_stats_lock = threading.Lock()
_global_stats = LazyComputeStats()
def get_lazy_stats() -> LazyComputeStats:
    """Return a point-in-time copy of the global lazy computation counters.

    The counters are read while holding the statistics lock and copied into
    a new object, so mutating the returned value does not affect the global
    tracker.

    Returns:
        A new LazyComputeStats populated from the global counters.

    Example:
        >>> stats = get_lazy_stats()
        >>> print(stats)
        Lazy Computation Statistics:
         Created: 42
         Computed: 35
         ...

    References:
        MEM-031: Cache statistics tracking
    """
    with _stats_lock:
        live = _global_stats
        snapshot = LazyComputeStats(
            total_created=live.total_created,
            total_computed=live.total_computed,
            total_invalidated=live.total_invalidated,
            compute_time_total=live.compute_time_total,
            cache_hits=live.cache_hits,
        )
    return snapshot
def reset_lazy_stats() -> None:
    """Discard the global lazy computation counters and start from zero.

    The module-level tracker is rebound to a brand-new LazyComputeStats
    while the statistics lock is held.

    Example:
        >>> reset_lazy_stats()
        >>> stats = get_lazy_stats()
        >>> assert stats.total_created == 0
    """
    global _global_stats
    zeroed = LazyComputeStats()
    with _stats_lock:
        _global_stats = zeroed
152class LazyResult[T]:
153 """Deferred computation wrapper with thread-safe compute-once semantics.
155 Wraps a computation function that will be called only when the result is
156 first accessed. Subsequent accesses return the cached result. Thread-safe
157 for parallel access from multiple analyzers.
160 Attributes:
161 name: Optional name for debugging/logging.
163 Example:
164 >>> # Create lazy FFT computation
165 >>> lazy_fft = LazyResult(
166 ... lambda: np.fft.fft(signal, n=8192),
167 ... name="fft_8192"
168 ... )
169 >>>
170 >>> # Check if computed without triggering computation
171 >>> if not lazy_fft.is_computed():
172 ... print("Not computed yet")
173 >>>
174 >>> # Access triggers computation
175 >>> spectrum = lazy_fft.value
176 >>>
177 >>> # Subsequent accesses use cache
178 >>> spectrum2 = lazy_fft.value # No recomputation
179 >>>
180 >>> # Invalidate for delta analysis
181 >>> lazy_fft.invalidate()
182 >>> spectrum3 = lazy_fft.value # Recomputes
184 References:
185 Python threading for thread safety
186 PERF-002: Lazy evaluation requirements
187 """
189 def __init__(
190 self,
191 compute_fn: Callable[[], T],
192 name: str = "",
193 *,
194 weak_source: bool = False,
195 ):
196 """Initialize lazy result.
198 Args:
199 compute_fn: Function to call to compute the result.
200 name: Optional name for debugging/logging.
201 weak_source: If True, use weak reference to source data
202 (allows GC after computation).
204 Example:
205 >>> lazy_result = LazyResult(
206 ... lambda: expensive_computation(),
207 ... name="expensive_op"
208 ... )
209 """
210 self._compute_fn = compute_fn
211 self._name = name or f"LazyResult_{id(self)}"
212 self._result: T | None = None
213 self._computed = False
214 self._lock = threading.RLock()
215 self._weak_source = weak_source
216 self._source_released = False
218 # Track creation
219 with _stats_lock:
220 _global_stats.total_created += 1
222 @property
223 def value(self) -> T:
224 """Get the result, computing if necessary.
226 Thread-safe lazy computation with compute-once semantics.
227 Multiple concurrent accesses will only compute once.
229 Returns:
230 The computed result.
232 Raises:
233 Exception: Any exception raised by the compute function.
235 Example:
236 >>> lazy_fft = LazyResult(lambda: np.fft.fft(signal))
237 >>> spectrum = lazy_fft.value # Computes here
238 >>> spectrum2 = lazy_fft.value # Uses cache
240 References:
241 PERF-002: Lazy evaluation for analysis pipelines
242 """
243 with self._lock:
244 if self._computed:
245 # Cache hit
246 with _stats_lock:
247 _global_stats.cache_hits += 1
248 return self._result # type: ignore[return-value]
250 # Compute result
251 import time
253 start_time = time.time()
255 try:
256 self._result = self._compute_fn()
257 self._computed = True
259 # Track computation
260 compute_time = time.time() - start_time
261 with _stats_lock:
262 _global_stats.total_computed += 1
263 _global_stats.compute_time_total += compute_time
265 # Optionally release source data
266 if self._weak_source and not self._source_released: 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true
267 self._release_source()
269 return self._result
271 except Exception:
272 # Don't cache errors, allow retry
273 self._computed = False
274 raise
276 def is_computed(self) -> bool:
277 """Check if result has been computed without triggering computation.
279 Returns:
280 True if result is computed and cached.
282 Example:
283 >>> lazy_result = LazyResult(lambda: expensive_op())
284 >>> if lazy_result.is_computed():
285 ... result = lazy_result.value # No computation
286 ... else:
287 ... print("Will compute on next access")
289 References:
290 API-012: Lazy result access patterns
291 """
292 with self._lock:
293 return self._computed
295 def invalidate(self) -> None:
296 """Mark result as invalid, forcing recomputation on next access.
298 Useful for delta analysis where the underlying data changes and
299 results need to be recomputed.
301 Example:
302 >>> lazy_fft = LazyResult(lambda: np.fft.fft(signal))
303 >>> spectrum1 = lazy_fft.value
304 >>>
305 >>> # Signal data changed
306 >>> signal = new_signal
307 >>> lazy_fft.invalidate()
308 >>>
309 >>> # Next access recomputes with new signal
310 >>> spectrum2 = lazy_fft.value
312 References:
313 API-012: Delta analysis support
314 """
315 with self._lock:
316 self._computed = False
317 self._result = None
318 self._source_released = False
320 with _stats_lock:
321 _global_stats.total_invalidated += 1
323 def get_if_computed(self) -> T | None:
324 """Get result only if already computed, otherwise return None.
326 Returns:
327 Computed result or None if not yet computed.
329 Example:
330 >>> lazy_result = LazyResult(lambda: expensive_op())
331 >>> result = lazy_result.get_if_computed() # None
332 >>> _ = lazy_result.value # Compute
333 >>> result = lazy_result.get_if_computed() # Returns result
334 """
335 with self._lock:
336 if self._computed:
337 return self._result
338 return None
340 def peek(self) -> tuple[bool, T | None]:
341 """Get computation status and result if available.
343 Returns:
344 Tuple of (is_computed, result). Result is None if not computed.
346 Example:
347 >>> lazy_result = LazyResult(lambda: expensive_op())
348 >>> computed, result = lazy_result.peek()
349 >>> if computed:
350 ... print(f"Result: {result}")
351 ... else:
352 ... print("Not computed yet")
353 """
354 with self._lock:
355 return (self._computed, self._result)
357 def map(self, fn: Callable[[T], Any]) -> LazyResult[Any]:
358 """Create a new lazy result by applying a function to this result.
360 Enables chained lazy operations without eager evaluation.
362 Args:
363 fn: Function to apply to the result.
365 Returns:
366 New LazyResult that computes fn(self.value).
368 Example:
369 >>> lazy_fft = LazyResult(lambda: np.fft.fft(signal))
370 >>> lazy_power = lazy_fft.map(lambda x: np.abs(x)**2)
371 >>> lazy_peak = lazy_power.map(lambda x: x.max())
372 >>>
373 >>> # Nothing computed yet
374 >>> peak = lazy_peak.value # Computes entire chain
376 References:
377 PERF-002: Lazy evaluation for chained operations
378 """
379 return LazyResult(
380 lambda: fn(self.value),
381 name=f"{self._name}.map({fn.__name__})",
382 weak_source=self._weak_source,
383 )
385 def _release_source(self) -> None:
386 """Release source data to allow garbage collection.
388 After computation completes, we can release the source data
389 if weak_source=True was specified. This replaces the compute
390 function's closure to break references to large input data.
392 Note: We don't call gc.collect() here as it would be called
393 very frequently and is expensive. Python's automatic GC will
394 handle cleanup.
395 """
396 # Clear the compute function's closure to release references
397 if hasattr(self._compute_fn, "__closure__") and self._compute_fn.__closure__:
398 # Can't directly clear closure, but can replace function
399 # to break references
400 result = self._result
402 def return_result() -> T:
403 return result # type: ignore[return-value]
405 self._compute_fn = return_result
407 self._source_released = True
408 # Let Python's automatic GC handle cleanup
410 def __repr__(self) -> str:
411 """String representation for debugging."""
412 status = "computed" if self._computed else "deferred"
413 return f"LazyResult(name={self._name!r}, status={status})"
class LazyDict(dict[str, Any]):
    """Dictionary that evaluates LazyResult values when they are indexed.

    Indexing with ``d[key]`` returns ``.value`` for entries that are
    LazyResult instances and the stored object itself for everything else.
    Only ``__getitem__`` is overridden; the raw stored object remains
    available through :meth:`get_lazy`.

    Example:
        >>> results = LazyDict()
        >>> results['fft'] = LazyResult(lambda: np.fft.fft(signal))
        >>> results['power'] = LazyResult(lambda: np.abs(results['fft'])**2)
        >>> results['constant'] = 42  # Non-lazy value
        >>>
        >>> fft_spectrum = results['fft']      # Computes FFT
        >>> power_spectrum = results['power']  # Computes power
        >>> const = results['constant']        # Returns 42 directly
        >>>
        >>> # Inspect without triggering computation
        >>> if results.get_lazy('fft').is_computed():
        ...     print("FFT already computed")

    References:
        API-012: Lazy result access patterns
    """

    def __getitem__(self, key: str) -> Any:
        """Look up ``key``, evaluating the entry if it is a LazyResult.

        Args:
            key: Dictionary key.

        Returns:
            ``LazyResult.value`` for lazy entries, the stored value otherwise.

        Example:
            >>> lazy_dict = LazyDict()
            >>> lazy_dict['result'] = LazyResult(lambda: expensive_op())
            >>> value = lazy_dict['result']  # Auto-evaluates
        """
        stored = super().__getitem__(key)
        return stored.value if isinstance(stored, LazyResult) else stored

    def get_lazy(self, key: str) -> LazyResult[Any] | Any:
        """Look up ``key`` and return exactly what is stored.

        No evaluation takes place: a lazy entry comes back as the LazyResult
        object itself.

        Args:
            key: Dictionary key.

        Returns:
            The stored object (possibly a LazyResult instance).

        Example:
            >>> lazy_dict = LazyDict()
            >>> lazy_dict['result'] = LazyResult(lambda: expensive_op())
            >>> lazy_obj = lazy_dict.get_lazy('result')
            >>> if not lazy_obj.is_computed():
            ...     print("Will compute on access")
        """
        raw_entry = super().__getitem__(key)
        return raw_entry

    def is_computed(self, key: str) -> bool:
        """Tell whether the entry under ``key`` already holds a result.

        Args:
            key: Dictionary key.

        Returns:
            The LazyResult's computed state for lazy entries; always True
            for plain (non-lazy) values.

        Example:
            >>> if not lazy_dict.is_computed('fft'):
            ...     print("FFT not computed yet")
        """
        entry = super().__getitem__(key)
        if not isinstance(entry, LazyResult):
            # Plain values need no computation, so they count as computed.
            return True
        return entry.is_computed()

    def invalidate(self, key: str) -> None:
        """Invalidate the lazy entry under ``key``; plain values are ignored.

        Args:
            key: Dictionary key.

        Example:
            >>> lazy_dict.invalidate('fft')
            >>> fft = lazy_dict['fft']  # Recomputes
        """
        entry = super().__getitem__(key)
        if not isinstance(entry, LazyResult):
            return
        entry.invalidate()

    def invalidate_all(self) -> None:
        """Invalidate every LazyResult stored in the dictionary.

        Example:
            >>> lazy_dict.invalidate_all()
            >>> # All lazy values will recompute on next access
        """
        lazy_entries = (entry for entry in self.values() if isinstance(entry, LazyResult))
        for entry in lazy_entries:
            entry.invalidate()

    def computed_keys(self) -> list[str]:
        """List the keys whose values are available without computation.

        Returns:
            Keys of plain values and of lazy entries that are computed.

        Example:
            >>> computed = lazy_dict.computed_keys()
            >>> print(f"Computed: {computed}")
        """
        return [key for key in self if self.is_computed(key)]

    def deferred_keys(self) -> list[str]:
        """List the keys whose lazy values have not been computed yet.

        Returns:
            Keys of LazyResult entries that are still deferred.

        Example:
            >>> deferred = lazy_dict.deferred_keys()
            >>> print(f"Not computed: {deferred}")
        """
        return [key for key in self if not self.is_computed(key)]
560def lazy[T](fn: Callable[..., T]) -> Callable[..., LazyResult[T]]:
561 """Decorator to make a function return a LazyResult.
563 Wraps a function so it returns a LazyResult instead of computing
564 immediately. Useful for expensive analysis functions.
566 Args:
567 fn: Function to wrap.
569 Returns:
570 Wrapped function that returns LazyResult.
572 Example:
573 >>> @lazy
574 ... def compute_fft(signal, nfft):
575 ... print("Computing FFT...")
576 ... return np.fft.fft(signal, n=nfft)
577 >>>
578 >>> # Returns LazyResult, doesn't compute yet
579 >>> lazy_fft = compute_fft(signal, 8192)
580 >>> print("Created lazy result")
581 >>>
582 >>> # Access triggers computation
583 >>> spectrum = lazy_fft.value
584 >>> # Prints: "Computing FFT..."
585 >>>
586 >>> # Second access uses cache
587 >>> spectrum2 = lazy_fft.value
588 >>> # No print - uses cached result
590 References:
591 PERF-002: Lazy evaluation for analysis pipelines
592 """
594 @functools.wraps(fn)
595 def wrapper(*args: Any, **kwargs: Any) -> LazyResult[T]:
596 def compute() -> T:
597 return fn(*args, **kwargs)
599 return LazyResult(
600 compute,
601 name=fn.__name__,
602 )
604 return wrapper
class LazyAnalysisResult:
    """Per-domain lazy view over the output of an analysis engine.

    One LazyResult is registered for every requested domain (time,
    frequency, statistics, ...). A domain's analysis runs only when that
    domain is first requested; the others stay deferred.

    Attributes:
        domains: List of available analysis domains.

    Example:
        >>> analyzer = SignalAnalyzer()
        >>> lazy_results = LazyAnalysisResult(
        ...     analyzer,
        ...     signal_data,
        ...     domains=['time', 'frequency', 'statistics'],
        ... )
        >>>
        >>> freq_results = lazy_results.get_domain('frequency')
        >>> print(lazy_results.computed_domains())  # ['frequency']
        >>> print(lazy_results.deferred_domains())  # ['time', 'statistics']
        >>>
        >>> all_results = lazy_results.compute_all()

    References:
        PERF-002: Lazy evaluation requirements
        API-012: Multi-domain analysis patterns
    """

    def __init__(
        self,
        engine: Any,
        data: Any,
        domains: list[str],
        *,
        compute_fn_template: Callable[[Any, Any, str], Any] | None = None,
    ):
        """Register one deferred computation per domain.

        Args:
            engine: Analysis engine instance.
            data: Input data for analysis.
            domains: List of available analysis domains.
            compute_fn_template: Optional custom compute function.
                Signature: fn(engine, data, domain) -> result.
                When omitted, ``_default_compute`` is used.

        Example:
            >>> lazy_results = LazyAnalysisResult(
            ...     my_analyzer,
            ...     signal_data,
            ...     domains=['time', 'frequency', 'wavelet'],
            ... )
        """
        self._engine = engine
        self._data = data
        self.domains = domains
        self._compute_fn = compute_fn_template or self._default_compute

        def bind_domain(domain_name: str) -> Callable[[], Any]:
            # The domain name is fixed per closure; engine, data and compute
            # function are read from self when the computation runs.
            def compute_domain() -> Any:
                return self._compute_fn(self._engine, self._data, domain_name)

            return compute_domain

        self._domain_results = LazyDict()
        for domain_name in domains:
            label = f"{engine.__class__.__name__}.{domain_name}"
            self._domain_results[domain_name] = LazyResult(
                bind_domain(domain_name),
                name=label,
            )

    def _default_compute(self, engine: Any, data: Any, domain: str) -> dict[str, Any]:
        """Run the engine for one domain using its conventional entry points.

        ``engine.analyze(data, domain=domain)`` is preferred; if the engine
        has no ``analyze`` attribute, ``engine.analyze_<domain>(data)`` is
        tried next.

        Args:
            engine: Analysis engine.
            data: Input data.
            domain: Domain to analyze.

        Returns:
            Analysis result for domain.

        Raises:
            AttributeError: If engine has no analyze() or analyze_{domain}() method.
        """
        if hasattr(engine, "analyze"):
            generic_result: dict[str, Any] = engine.analyze(data, domain=domain)
            return generic_result

        specific_name = f"analyze_{domain}"
        if hasattr(engine, specific_name):
            specific_result: dict[str, Any] = getattr(engine, specific_name)(data)
            return specific_result

        raise AttributeError(
            f"Engine {engine.__class__.__name__} has no analyze() method "
            f"or analyze_{domain}() method"
        )

    def get_domain(self, domain: str) -> Any:
        """Return the results for one domain, computing only that domain.

        Args:
            domain: Domain name (e.g., 'time', 'frequency').

        Returns:
            Analysis results for the domain.

        Raises:
            KeyError: If domain not available.

        Example:
            >>> freq_results = lazy_results.get_domain('frequency')
            >>> # Only frequency domain computed

        References:
            PERF-002: Partial evaluation
        """
        if domain in self.domains:
            # Indexing the LazyDict evaluates the stored LazyResult.
            return self._domain_results[domain]
        raise KeyError(f"Domain '{domain}' not available. Available: {self.domains}")

    def computed_domains(self) -> list[str]:
        """List the domains whose results are already computed.

        Returns:
            List of computed domain names.

        Example:
            >>> computed = lazy_results.computed_domains()
            >>> print(f"Computed: {computed}")
        """
        finished = self._domain_results.computed_keys()
        return finished

    def deferred_domains(self) -> list[str]:
        """List the domains whose results are still deferred.

        Returns:
            List of deferred domain names.

        Example:
            >>> deferred = lazy_results.deferred_domains()
            >>> print(f"Not computed: {deferred}")
        """
        pending = self._domain_results.deferred_keys()
        return pending

    def compute_all(self) -> dict[str, Any]:
        """Compute every domain and collect the results.

        Returns:
            Dictionary mapping domain names to results.

        Example:
            >>> all_results = lazy_results.compute_all()
            >>> print(all_results.keys())  # All domains

        References:
            API-012: Bulk computation
        """
        collected: dict[str, Any] = {}
        for domain_name in self.domains:
            collected[domain_name] = self.get_domain(domain_name)
        return collected

    def invalidate_domain(self, domain: str) -> None:
        """Invalidate one domain so its next access recomputes.

        Args:
            domain: Domain to invalidate.

        Example:
            >>> lazy_results.invalidate_domain('frequency')
            >>> # Next access will recompute
        """
        self._domain_results.invalidate(domain)

    def invalidate_all(self) -> None:
        """Invalidate every domain so each recomputes on next access.

        Example:
            >>> lazy_results.invalidate_all()
            >>> # All domains will recompute on next access
        """
        self._domain_results.invalidate_all()

    def __getitem__(self, domain: str) -> Any:
        """Dictionary-style shorthand for :meth:`get_domain`.

        Args:
            domain: Domain name.

        Returns:
            Domain results.

        Example:
            >>> freq_results = lazy_results['frequency']
        """
        return self.get_domain(domain)

    def __repr__(self) -> str:
        """String representation for debugging."""
        return (
            "LazyAnalysisResult("
            f"domains={self.domains}, "
            f"computed={self.computed_domains()}, "
            f"deferred={self.deferred_domains()})"
        )
# Public names exported by `from <module> import *`.
__all__ = [
    # Classes
    "LazyAnalysisResult",
    "LazyComputeStats",
    "LazyDict",
    "LazyResult",
    # Functions
    "get_lazy_stats",
    "lazy",
    "reset_lazy_stats",
]