Coverage for src / tracekit / utils / lazy.py: 82%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Lazy evaluation utilities for deferred computation. 

2 

3This module provides lazy evaluation proxies that defer computation until 

4results are actually needed, enabling memory-efficient operation chaining. 

5 

6 

7Example: 

8 >>> from tracekit.utils.lazy import LazyArray, lazy_operation 

9 >>> # Operations are deferred until .compute() is called 

10 >>> lazy_result = lazy_operation(large_data, lambda x: x ** 2) 

11 >>> result = lazy_result.compute() # Only now is computation performed 

12 

13References: 

14 Dask documentation on lazy evaluation 

15 NumPy lazy evaluation patterns 

16""" 

17 

18from __future__ import annotations 

19 

20from abc import ABC, abstractmethod 

21from typing import TYPE_CHECKING, Any, TypeVar 

22 

23import numpy as np 

24from numpy.typing import NDArray 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Callable 

28 

29T = TypeVar("T") 

30 

31 

32class LazyProxy[T](ABC): 

33 """Abstract base class for lazy evaluation proxies. 

34 

35 Defers computation until explicitly requested via .compute(). 

36 """ 

37 

38 def __init__(self) -> None: 

39 self._computed: bool = False 

40 self._result: T | None = None 

41 

42 @abstractmethod 

43 def _evaluate(self) -> T: 

44 """Perform the actual computation. 

45 

46 Returns: 

47 Computed result. 

48 """ 

49 pass 

50 

51 def compute(self) -> T: 

52 """Evaluate and return the result. 

53 

54 Returns: 

55 Computed result (cached after first evaluation). 

56 

57 Example: 

58 >>> lazy_obj = LazyArray(lambda: np.arange(1000)) 

59 >>> result = lazy_obj.compute() 

60 """ 

61 if not self._computed: 

62 self._result = self._evaluate() 

63 self._computed = True 

64 return self._result # type: ignore[return-value] 

65 

66 def is_computed(self) -> bool: 

67 """Check if result has been computed. 

68 

69 Returns: 

70 True if compute() has been called. 

71 """ 

72 return self._computed 

73 

74 def reset(self) -> None: 

75 """Clear cached result, forcing re-evaluation on next compute().""" 

76 self._computed = False 

77 self._result = None 

78 

79 

class LazyArray(LazyProxy[NDArray[np.floating[Any]]]):
    """Deferred numpy array.

    Stores a callable together with its arguments and only invokes it when
    the array is actually requested (via ``compute()`` or one of the
    convenience accessors below).

    Args:
        func: Callable that returns a numpy array.
        args: Positional arguments forwarded to ``func``.
        kwargs: Keyword arguments forwarded to ``func``.

    Example:
        >>> def expensive_computation():
        ...     return np.random.randn(1000000)
        >>> lazy = LazyArray(expensive_computation)
        >>> # Nothing has run yet
        >>> result = lazy.compute()  # The callable runs here
    """

    def __init__(
        self,
        func: Callable[..., NDArray[np.floating[Any]]],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def _evaluate(self) -> NDArray[np.floating[Any]]:
        """Invoke the stored callable with the stored arguments."""
        producer = self._func
        return producer(*self._args, **self._kwargs)

    def __len__(self) -> int:
        """Return the array length (triggers computation)."""
        materialized = self.compute()
        return len(materialized)

    def __getitem__(self, key: Any) -> Any:
        """Index into the array (triggers computation)."""
        materialized = self.compute()
        return materialized[key]

    def shape(self) -> tuple[int, ...]:
        """Return the array shape (triggers computation).

        This is a regular method, not a property: call it as ``lazy.shape()``.
        """
        materialized = self.compute()
        return materialized.shape  # type: ignore[no-any-return]

    def dtype(self) -> np.dtype[Any]:
        """Return the array dtype (triggers computation).

        This is a regular method, not a property: call it as ``lazy.dtype()``.
        """
        materialized = self.compute()
        return materialized.dtype

129 

130 

class LazyOperation(LazyProxy[Any]):
    """Deferred application of an operation to its operands.

    Nothing is executed at construction time. When the result is requested,
    every operand that is itself a ``LazyProxy`` is resolved through its own
    ``compute()`` and the operation is then called with the resolved values.
    Keyword arguments are forwarded to the operation unchanged (they are not
    inspected for lazy proxies).

    Args:
        operation: Callable that performs the operation.
        *operands: Input data or other lazy proxies.
        **kwargs: Keyword arguments for the operation.

    Example:
        >>> data = np.arange(1000)
        >>> # Build a chain; nothing is evaluated yet
        >>> op1 = LazyOperation(lambda x: x ** 2, data)
        >>> op2 = LazyOperation(lambda x: x + 1, op1)
        >>> result = op2.compute()
    """

    def __init__(
        self,
        operation: Callable[..., Any],
        *operands: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self._operation = operation
        self._operands = operands
        self._kwargs = kwargs

    def _evaluate(self) -> Any:
        """Resolve lazy operands, then apply the operation."""
        # Lazy operands are materialized in positional order; plain values
        # pass through untouched.
        resolved = [
            item.compute() if isinstance(item, LazyProxy) else item
            for item in self._operands
        ]
        return self._operation(*resolved, **self._kwargs)

171 

172 

173def lazy_operation[T]( 

174 func: Callable[..., T], 

175 *args: Any, 

176 **kwargs: Any, 

177) -> LazyOperation: 

178 """Create a lazy operation from a function. 

179 

180 Args: 

181 func: Function to defer. 

182 *args: Arguments to pass to func. 

183 **kwargs: Keyword arguments to pass to func. 

184 

185 Returns: 

186 LazyOperation that will execute func when computed. 

187 

188 Example: 

189 >>> import numpy as np 

190 >>> data = np.arange(1000) 

191 >>> lazy_result = lazy_operation(np.fft.fft, data) 

192 >>> # Computation happens here 

193 >>> result = lazy_result.compute() 

194 """ 

195 return LazyOperation(func, *args, **kwargs) 

196 

197 

def auto_preview(
    data: NDArray[np.floating[Any]],
    *,
    downsample_factor: int = 10,
    preview_only: bool = False,
    auto_threshold: int = 1_000_000,
) -> NDArray[np.float64]:
    """Generate a preview of a dataset with automatic downsampling.

    Two-stage analysis: quick preview before full processing.

    The input is downsampled (every ``downsample_factor``-th element along
    the first axis) when ``preview_only`` is True **or** when ``len(data)``
    exceeds ``auto_threshold``. Otherwise the full data is returned. In both
    cases the result is a new float64 array; the input is never modified.

    Args:
        data: Input data array.
        downsample_factor: Step used for downsampling (default 10). Must be
            at least 1 whenever downsampling takes place.
        preview_only: If True, always return the downsampled preview. If
            False, the preview is still returned for inputs longer than
            ``auto_threshold``.
        auto_threshold: Length above which the input is downsampled even
            when ``preview_only`` is False (default 1_000_000).

    Returns:
        Downsampled preview or full data, as float64.

    Raises:
        ValueError: If downsampling is performed and ``downsample_factor``
            is smaller than 1.

    Example:
        >>> import numpy as np
        >>> large_data = np.random.randn(10_000_000)
        >>> # Quick preview
        >>> preview = auto_preview(large_data, preview_only=True)
        >>> print(f"Preview shape: {preview.shape}")
        >>> # Inputs at or below the threshold come back at full resolution
        >>> full = auto_preview(large_data[:1000])

    References:
        MEM-026: Two-Stage Analysis (Preview + Full)
    """
    if preview_only or len(data) > auto_threshold:
        # A zero step is rejected by numpy anyway and a negative step would
        # silently return reversed data, so refuse both explicitly.
        if downsample_factor < 1:
            raise ValueError(
                f"downsample_factor must be >= 1, got {downsample_factor}"
            )
        # astype() allocates a fresh array, so the strided view needs no
        # additional copy() beforehand.
        return data[::downsample_factor].astype(np.float64)

    # Small enough: return the full data (still as an independent copy).
    return data.astype(np.float64)

235 

236 

def select_roi(
    data: NDArray[np.floating[Any]],
    start: int | None = None,
    end: int | None = None,
    *,
    start_time: float | None = None,
    end_time: float | None = None,
    sample_rate: float | None = None,
) -> NDArray[np.float64]:
    """Select a region of interest from data.

    The region can be given by sample indices or by time values. A time
    value, when supplied, overrides the corresponding sample index. Times
    are converted to the nearest sample index (``time * sample_rate``,
    rounded), which avoids off-by-one results caused by floating-point
    error (e.g. ``0.29 * 100`` evaluates to ``28.999999999999996``).

    Bounds are clipped to ``[0, len(data)]``; negative indices are not
    interpreted relative to the end of the array.

    Args:
        data: Input data array.
        start: Start sample index (inclusive). Defaults to 0.
        end: End sample index (exclusive). Defaults to ``len(data)``.
        start_time: Start time in seconds (alternative to start).
        end_time: End time in seconds (alternative to end).
        sample_rate: Sample rate in Hz (required if using time-based
            selection; must be positive in that case).

    Returns:
        Selected region of interest as a new float64 array.

    Raises:
        ValueError: If time-based selection is used without a sample_rate
            or with a non-positive one, or if the clipped region is empty
            (start >= end).

    Example:
        >>> import numpy as np
        >>> data = np.random.randn(10_000_000)
        >>> # Select by sample indices
        >>> roi = select_roi(data, start=1000, end=2000)
        >>> # Select by time
        >>> roi_time = select_roi(
        ...     data, start_time=0.001, end_time=0.002, sample_rate=1e6
        ... )

    References:
        MEM-027: Region-of-Interest Selection from Preview
    """
    # Convert time-based bounds to sample indices.
    if start_time is not None or end_time is not None:
        if sample_rate is None:
            raise ValueError("sample_rate required for time-based selection")
        # A zero or negative rate would map times to meaningless indices
        # (and, after clipping, could silently select the whole array).
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")

        if start_time is not None:
            start = int(round(start_time * sample_rate))
        if end_time is not None:
            end = int(round(end_time * sample_rate))

    # Apply defaults, then clip to the valid index range.
    n_samples = len(data)
    start = 0 if start is None else max(0, start)
    end = n_samples if end is None else min(n_samples, end)

    if start >= end:
        raise ValueError(f"Invalid ROI: start ({start}) >= end ({end})")

    # astype() returns a new array, so the caller never aliases ``data``.
    return data[start:end].astype(np.float64)

302 

303 

class ProgressiveResolution:
    """Progressive resolution analyzer for large datasets.

    Implements coarse-to-fine analysis: preview then zoom into ROI.

    Args:
        data: Input data array or lazy proxy. A lazy proxy is only computed
            when a preview or ROI is first requested.
        sample_rate: Sample rate in Hz.

    Example:
        >>> import numpy as np
        >>> data = np.random.randn(100_000_000)
        >>> analyzer = ProgressiveResolution(data, sample_rate=1e6)
        >>> # Stage 1: Preview
        >>> preview = analyzer.get_preview(downsample_factor=100)
        >>> # Stage 2: User selects ROI
        >>> roi_data = analyzer.get_roi(start_time=0.5, end_time=0.6)

    References:
        MEM-013: Progressive Resolution (Coarse-to-Fine)
    """

    def __init__(
        self,
        data: NDArray[np.floating[Any]] | LazyProxy[NDArray[np.floating[Any]]],
        sample_rate: float,
    ) -> None:
        self._data = data
        self._sample_rate = sample_rate
        # Last generated preview and the factor it was generated with.
        self._preview: NDArray[np.float64] | None = None
        self._preview_factor: int | None = None

    def _full_data(self) -> NDArray[np.floating[Any]]:
        """Return the underlying array, computing it first if it is lazy."""
        if isinstance(self._data, LazyProxy):
            return self._data.compute()
        return self._data

    def get_preview(
        self,
        downsample_factor: int = 10,
        force_recompute: bool = False,
    ) -> NDArray[np.float64]:
        """Generate low-resolution preview.

        The most recent preview is cached and returned again (the same
        array object) when the same factor is requested, unless
        ``force_recompute`` is set.

        Args:
            downsample_factor: Factor to downsample by; must be at least 1.
            force_recompute: If True, recompute even if cached.

        Returns:
            Downsampled preview of data as float64.

        Raises:
            ValueError: If ``downsample_factor`` is smaller than 1.
        """
        # A negative step would silently produce a reversed preview.
        if downsample_factor < 1:
            raise ValueError(
                f"downsample_factor must be >= 1, got {downsample_factor}"
            )

        if (
            not force_recompute
            and self._preview is not None
            and self._preview_factor == downsample_factor
        ):
            return self._preview

        # astype() allocates a new array, so the strided view does not need
        # a separate copy() first.
        self._preview = self._full_data()[::downsample_factor].astype(np.float64)
        self._preview_factor = downsample_factor

        return self._preview

    def get_roi(
        self,
        start: int | None = None,
        end: int | None = None,
        *,
        start_time: float | None = None,
        end_time: float | None = None,
    ) -> NDArray[np.float64]:
        """Get high-resolution region of interest.

        Delegates to ``select_roi`` using this analyzer's sample rate for
        time-based bounds.

        Args:
            start: Start sample index.
            end: End sample index.
            start_time: Start time in seconds (alternative).
            end_time: End time in seconds (alternative).

        Returns:
            Full-resolution ROI data.
        """
        return select_roi(
            self._full_data(),
            start=start,
            end=end,
            start_time=start_time,
            end_time=end_time,
            sample_rate=self._sample_rate,
        )

    @property
    def sample_rate(self) -> float:
        """Sample rate in Hz."""
        return self._sample_rate

398 

399 

# Names exported by ``from tracekit.utils.lazy import *``.
__all__ = [
    "LazyArray",
    "LazyOperation",
    "LazyProxy",
    "ProgressiveResolution",
    "auto_preview",
    "lazy_operation",
    "select_roi",
]