Coverage for src / tracekit / visualization / rendering.py: 95%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Rendering optimization for large datasets and streaming updates. 

2 

3This module provides level-of-detail rendering, progressive rendering, 

4and memory-efficient plot updates for high-performance visualization. 

5 

6 

7Example: 

8 >>> from tracekit.visualization.rendering import render_with_lod 

9 >>> time_lod, data_lod = render_with_lod(time, data, screen_width=1920) 

10 

11References: 

12 - Level-of-detail (LOD) rendering techniques 

13 - Min-max envelope for waveform rendering 

14 - Progressive rendering algorithms 

15 - Streaming data visualization 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING, Literal 

21 

22import numpy as np 

23 

24if TYPE_CHECKING: 

25 from numpy.typing import NDArray 

26 

27 

def render_with_lod(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    screen_width: int = 1920,
    samples_per_pixel: float = 2.0,
    max_points: int = 100_000,
    method: Literal["minmax", "lttb", "uniform"] = "minmax",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Render signal with level-of-detail (LOD) decimation.

    Reduces the number of points to a budget derived from the screen size.
    The budget is ``min(int(screen_width * samples_per_pixel), max_points)``.
    Input that already fits the budget is returned unchanged (same objects).

    Args:
        time: Time array.
        data: Signal data array (same length as ``time``).
        screen_width: Screen width in pixels.
        samples_per_pixel: Target samples per pixel (2.0 recommended).
        max_points: Maximum points to render (default: 100k).
        method: Decimation method ("minmax", "lttb", "uniform").

    Returns:
        Tuple of (decimated_time, decimated_data).

    Raises:
        ValueError: If either array is empty, the lengths differ, ``method``
            is not one of the supported names, or the computed point budget
            is smaller than 1.

    Example:
        >>> # 1M sample signal decimated for 1920px display
        >>> time_lod, data_lod = render_with_lod(time, data, screen_width=1920)
        >>> print(len(data_lod))  # ~3840 samples (2 per pixel)

    References:
        VIS-017: Performance - LOD Rendering
        VIS-019: Memory-Efficient Plot Rendering
    """
    n_samples = len(data)
    if len(time) == 0 or n_samples == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != n_samples:
        raise ValueError(f"Time and data length mismatch: {len(time)} vs {len(data)}")

    # Validate the method before the early return below, so a typo is
    # reported even when the input is small enough to skip decimation.
    if method not in ("minmax", "lttb", "uniform"):
        raise ValueError(f"Unknown decimation method: {method}")

    # Calculate target point count
    target_points = min(int(screen_width * samples_per_pixel), max_points)

    # A zero or negative budget would otherwise surface as a
    # ZeroDivisionError inside the decimation helpers.
    if target_points < 1:
        raise ValueError(
            f"Point budget must be at least 1, got {target_points} "
            f"(screen_width={screen_width}, samples_per_pixel={samples_per_pixel}, "
            f"max_points={max_points})"
        )

    # Skip decimation if already below target
    if n_samples <= target_points:
        return (time, data)

    # Apply decimation
    if method == "uniform":
        return _decimate_uniform(time, data, target_points)
    if method == "minmax":
        return _decimate_minmax_envelope(time, data, target_points)
    return _decimate_lttb(time, data, target_points)

90 

91 

92def _decimate_uniform( 

93 time: NDArray[np.float64], 

94 data: NDArray[np.float64], 

95 target_points: int, 

96) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

97 """Uniform stride decimation (simple but loses peaks). 

98 

99 Args: 

100 time: Time array. 

101 data: Signal data array. 

102 target_points: Target number of points after decimation. 

103 

104 Returns: 

105 Tuple of (decimated_time, decimated_data). 

106 """ 

107 stride = len(data) // target_points 

108 stride = max(stride, 1) 

109 

110 indices = np.arange(0, len(data), stride)[:target_points] 

111 return (time[indices], data[indices]) 

112 

113 

114def _decimate_minmax_envelope( 

115 time: NDArray[np.float64], 

116 data: NDArray[np.float64], 

117 target_points: int, 

118) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

119 """Min-max envelope decimation - preserves peaks and valleys. 

120 

121 This method ensures all signal extrema are preserved in the decimated view. 

122 

123 Args: 

124 time: Time array. 

125 data: Signal data array. 

126 target_points: Target number of points after decimation. 

127 

128 Returns: 

129 Tuple of (decimated_time, decimated_data). 

130 """ 

131 # Calculate bucket size (each bucket contributes 2 points: min and max) 

132 bucket_size = len(data) // (target_points // 2) 

133 

134 if bucket_size < 1: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 return (time, data) 

136 

137 decimated_time = [] 

138 decimated_data = [] 

139 

140 for i in range(0, len(data), bucket_size): 

141 bucket_data = data[i : i + bucket_size] 

142 bucket_time = time[i : i + bucket_size] 

143 

144 if len(bucket_data) == 0: 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true

145 continue 

146 

147 # Find min and max in bucket 

148 min_idx = np.argmin(bucket_data) 

149 max_idx = np.argmax(bucket_data) 

150 

151 # Add in chronological order 

152 if min_idx < max_idx: 

153 decimated_time.extend([bucket_time[min_idx], bucket_time[max_idx]]) 

154 decimated_data.extend([bucket_data[min_idx], bucket_data[max_idx]]) 

155 else: 

156 decimated_time.extend([bucket_time[max_idx], bucket_time[min_idx]]) 

157 decimated_data.extend([bucket_data[max_idx], bucket_data[min_idx]]) 

158 

159 return (np.array(decimated_time), np.array(decimated_data)) 

160 

161 

162def _decimate_lttb( 

163 time: NDArray[np.float64], 

164 data: NDArray[np.float64], 

165 target_points: int, 

166) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

167 """Largest Triangle Three Buckets decimation. 

168 

169 Preserves visual shape by maximizing triangle areas. 

170 

171 Args: 

172 time: Time array. 

173 data: Signal data array. 

174 target_points: Target number of points after decimation. 

175 

176 Returns: 

177 Tuple of (decimated_time, decimated_data). 

178 """ 

179 if len(data) <= target_points: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 return (time, data) 

181 

182 # Always include first and last points 

183 sampled_time = [time[0]] 

184 sampled_data = [data[0]] 

185 

186 bucket_size = (len(data) - 2) / (target_points - 2) 

187 

188 prev_idx = 0 

189 

190 for i in range(target_points - 2): 

191 # Average point of next bucket 

192 avg_range_start = int((i + 1) * bucket_size) + 1 

193 avg_range_end = int((i + 2) * bucket_size) + 1 

194 avg_range_end = min(avg_range_end, len(data)) 

195 

196 if avg_range_start < avg_range_end: 196 ↛ 200line 196 didn't jump to line 200 because the condition on line 196 was always true

197 avg_time = np.mean(time[avg_range_start:avg_range_end]) 

198 avg_data = np.mean(data[avg_range_start:avg_range_end]) 

199 else: 

200 avg_time = time[-1] 

201 avg_data = data[-1] 

202 

203 # Current bucket range 

204 range_start = int(i * bucket_size) + 1 

205 range_end = int((i + 1) * bucket_size) + 1 

206 range_end = min(range_end, len(data) - 1) 

207 

208 # Find point in bucket that forms largest triangle 

209 max_area = -1.0 

210 max_idx = range_start 

211 

212 for idx in range(range_start, range_end): 

213 # Calculate triangle area 

214 area = abs( 

215 (time[prev_idx] - avg_time) * (data[idx] - data[prev_idx]) 

216 - (time[prev_idx] - time[idx]) * (avg_data - data[prev_idx]) 

217 ) 

218 

219 if area > max_area: 

220 max_area = area 

221 max_idx = idx 

222 

223 sampled_time.append(time[max_idx]) 

224 sampled_data.append(data[max_idx]) 

225 prev_idx = max_idx 

226 

227 # Always include last point 

228 sampled_time.append(time[-1]) 

229 sampled_data.append(data[-1]) 

230 

231 return (np.array(sampled_time), np.array(sampled_data)) 

232 

233 

def progressive_render(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    viewport: tuple[float, float] | None = None,
    priority: Literal["viewport", "full"] = "viewport",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Select the samples to draw first for a progressive render pass.

    Args:
        time: Time array.
        data: Signal data array.
        viewport: Visible time window as (t_min, t_max), bounds inclusive.
            None means the full range.
        priority: "viewport" restricts the result to the visible window,
            "full" returns all data.

    Returns:
        Tuple of (time, data). The inputs are returned unchanged when no
        viewport is given, when priority is "full", or when no sample falls
        inside the viewport.

    Example:
        >>> # Render only visible portion for fast initial display
        >>> time_vis, data_vis = progressive_render(
        ...     time, data, viewport=(0, 0.001), priority="viewport"
        ... )

    References:
        VIS-019: Memory-Efficient Plot Rendering (progressive rendering)
    """
    render_everything = priority == "full"
    if render_everything or viewport is None:
        return (time, data)

    lower, upper = viewport

    # Positions of the samples lying inside the inclusive window
    inside = (time >= lower) & (time <= upper)
    visible = np.nonzero(inside)[0]

    if visible.size == 0:
        # Nothing visible: fall back to the complete signal
        return (time, data)

    return (time[visible], data[visible])

279 

280 

def estimate_memory_usage(
    n_samples: int,
    n_channels: int = 1,
    dtype: type = np.float64,
) -> float:
    """Estimate memory usage for plot rendering.

    The estimate counts one time array plus one data array per channel, all
    stored with the same element size.

    Args:
        n_samples: Number of samples per channel.
        n_channels: Number of channels.
        dtype: Data type for arrays. Anything ``np.dtype`` accepts works.
            Types without a fixed element size, or values NumPy cannot
            interpret, are counted as 8 bytes per sample.

    Returns:
        Estimated memory usage in MB (1 MB = 1024 * 1024 bytes).

    Example:
        >>> mem_mb = estimate_memory_usage(1_000_000, n_channels=4)
        >>> print(f"Memory: {mem_mb:.1f} MB")

    References:
        VIS-019: Memory-Efficient Plot Rendering
    """
    # Ask NumPy for the element size instead of keeping a hand-written table,
    # so every fixed-size dtype (int8, float16, complex128, ...) is exact.
    try:
        bytes_per_sample = np.dtype(dtype).itemsize
    except TypeError:
        bytes_per_sample = 0

    if bytes_per_sample <= 0:
        bytes_per_sample = 8  # Default

    # Total memory: time + data arrays per channel
    #   Time array:  n_samples * bytes_per_sample
    #   Data arrays: n_channels * n_samples * bytes_per_sample
    total_bytes = (1 + n_channels) * n_samples * bytes_per_sample

    # Convert to MB
    return total_bytes / (1024 * 1024)

320 

321 

def downsample_for_memory(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    target_memory_mb: float = 50.0,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Downsample signal to meet memory target.

    The current footprint is estimated with ``estimate_memory_usage`` for a
    single channel using that function's default dtype. If it exceeds the
    target, the signal is reduced with min-max envelope decimation by the
    ratio between the estimate and the target.

    Args:
        time: Time array.
        data: Signal data array.
        target_memory_mb: Target memory usage in MB. Must be positive.

    Returns:
        Tuple of (decimated_time, decimated_data). The inputs are returned
        unchanged when the estimate is already within the target.

    Raises:
        ValueError: If ``target_memory_mb`` is not a positive number.

    Example:
        >>> # Reduce 100MB dataset to 50MB
        >>> time_ds, data_ds = downsample_for_memory(time, data, target_memory_mb=50.0)

    References:
        VIS-019: Memory-Efficient Plot Rendering (memory target <50MB per subplot)
    """
    # "not > 0" also rejects NaN. A zero target would divide by zero below
    # and a negative one would yield a negative sample budget.
    if not target_memory_mb > 0:
        raise ValueError(f"target_memory_mb must be positive, got {target_memory_mb}")

    current_memory = estimate_memory_usage(len(data), n_channels=1)

    if current_memory <= target_memory_mb:
        # Already within target
        return (time, data)

    # Calculate required decimation factor
    decimation_factor = current_memory / target_memory_mb

    # Keep room for at least one min/max pair so that a very small target
    # cannot produce a zero-bucket division in the envelope helper.
    target_samples = max(int(len(data) / decimation_factor), 2)

    # Use min-max to preserve features
    return _decimate_minmax_envelope(time, data, target_samples)

357 

358 

class StreamingRenderer:
    """Streaming plot renderer for real-time data updates.

    Keeps a rolling buffer of (time, data) samples. New samples are appended
    and, once the buffer grows past ``max_samples``, the whole buffer is
    decimated with ``render_with_lod``.

    Example:
        >>> renderer = StreamingRenderer(max_samples=10000)
        >>> renderer.append(new_time, new_data)
        >>> time, data = renderer.get_render_data()

    References:
        VIS-018: Streaming Plot Updates
    """

    def __init__(
        self,
        *,
        max_samples: int = 10_000,
        decimation_method: Literal["minmax", "lttb", "uniform"] = "minmax",
    ) -> None:
        """Initialize streaming renderer.

        Args:
            max_samples: Buffer size above which decimation is triggered.
            decimation_method: Decimation method for buffer management.

        Raises:
            ValueError: If ``max_samples`` is smaller than 1.
        """
        if max_samples < 1:
            raise ValueError(f"max_samples must be at least 1, got {max_samples}")

        self.max_samples = max_samples
        self.decimation_method = decimation_method

        # Parallel buffers: _time[i] is the timestamp of _data[i].
        self._time: list[float] = []
        self._data: list[float] = []

    def append(
        self,
        time: NDArray[np.float64],
        data: NDArray[np.float64],
    ) -> None:
        """Append new data to streaming buffer.

        Args:
            time: New time samples.
            data: New data samples (same number of samples as ``time``).

        Raises:
            ValueError: If ``time`` and ``data`` hold a different number of
                samples. The buffer is left untouched in that case.
        """
        new_time = np.asarray(time).ravel().tolist()
        new_data = np.asarray(data).ravel().tolist()

        # Validate before mutating so the two buffers can never drift apart.
        if len(new_time) != len(new_data):
            raise ValueError(
                f"Time and data length mismatch: {len(new_time)} vs {len(new_data)}"
            )

        self._time.extend(new_time)
        self._data.extend(new_data)

        # Decimate if buffer exceeds limit
        if len(self._data) > self.max_samples:
            self._decimate_buffer()

    def _decimate_buffer(self) -> None:
        """Decimate the internal buffer in place.

        ``render_with_lod`` is called with ``max_points=self.max_samples`` and
        its default screen settings. The point budget is therefore the smaller
        of that default screen budget and ``max_samples``.
        """
        time_arr = np.array(self._time)
        data_arr = np.array(self._data)

        time_dec, data_dec = render_with_lod(
            time_arr,
            data_arr,
            max_points=self.max_samples,
            method=self.decimation_method,
        )

        self._time = time_dec.tolist()
        self._data = data_dec.tolist()

    def get_render_data(self) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """Get current data for rendering.

        Returns:
            Tuple of (time, data) arrays, freshly built from the buffer.
        """
        return (np.array(self._time), np.array(self._data))

    def clear(self) -> None:
        """Clear streaming buffer."""
        self._time.clear()
        self._data.clear()

436 

437 

# Names exported by ``from ... import *``; the underscore-prefixed
# ``_decimate_*`` helpers defined above are not listed.
__all__ = [
    "StreamingRenderer",
    "downsample_for_memory",
    "estimate_memory_usage",
    "progressive_render",
    "render_with_lod",
]