Coverage for src/tracekit/visualization/rendering.py: 95% (125 statements)

1"""Rendering optimization for large datasets and streaming updates.
3This module provides level-of-detail rendering, progressive rendering,
4and memory-efficient plot updates for high-performance visualization.
7Example:
8 >>> from tracekit.visualization.rendering import render_with_lod
9 >>> time_lod, data_lod = render_with_lod(time, data, screen_width=1920)
11References:
12 - Level-of-detail (LOD) rendering techniques
13 - Min-max envelope for waveform rendering
14 - Progressive rendering algorithms
15 - Streaming data visualization
16"""

from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray


def render_with_lod(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    screen_width: int = 1920,
    samples_per_pixel: float = 2.0,
    max_points: int = 100_000,
    method: Literal["minmax", "lttb", "uniform"] = "minmax",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
37 """Render signal with level-of-detail decimation./VIS-019.

    Reduces the number of points while preserving visual appearance using
    intelligent downsampling. Target: <100k points at any zoom level.

    Args:
        time: Time array.
        data: Signal data array.
        screen_width: Screen width in pixels.
        samples_per_pixel: Target samples per pixel (2.0 recommended).
        max_points: Maximum points to render (default: 100k).
        method: Decimation method ("minmax", "lttb", "uniform").

    Returns:
        Tuple of (decimated_time, decimated_data).

    Raises:
        ValueError: If arrays are invalid or method unknown.

    Example:
        >>> # 1M-sample signal decimated for a 1920 px display
        >>> time_lod, data_lod = render_with_lod(time, data, screen_width=1920)
        >>> print(len(data_lod))  # ~3840 samples (2 per pixel)

    References:
        VIS-017: Performance - LOD Rendering
        VIS-019: Memory-Efficient Plot Rendering
    """
    if len(time) == 0 or len(data) == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != len(data):
        raise ValueError(f"Time and data length mismatch: {len(time)} vs {len(data)}")

    # Calculate target point count
    target_points = min(
        int(screen_width * samples_per_pixel),
        max_points,
    )

    # Skip decimation if already below target
    if len(data) <= target_points:
        return (time, data)

    # Apply decimation
    if method == "uniform":
        return _decimate_uniform(time, data, target_points)
    elif method == "minmax":
        return _decimate_minmax_envelope(time, data, target_points)
    elif method == "lttb":
        return _decimate_lttb(time, data, target_points)
    else:
        raise ValueError(f"Unknown decimation method: {method}")
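

# A quick method-comparison sketch (comments only; synthetic data and the
# printed counts below are illustrative assumptions based on the defaults
# above, not guaranteed output):
#
#     t = np.linspace(0.0, 1.0, 1_000_000)
#     x = np.sin(2 * np.pi * 1_000.0 * t)
#     for m in ("minmax", "lttb", "uniform"):
#         t_lod, x_lod = render_with_lod(t, x, screen_width=1920, method=m)
#         print(m, len(x_lod))
#     # lttb returns exactly the target count; minmax ~2 points per bucket;
#     # uniform at most the target count.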


def _decimate_uniform(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    target_points: int,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
97 """Uniform stride decimation (simple but loses peaks).
99 Args:
100 time: Time array.
101 data: Signal data array.
102 target_points: Target number of points after decimation.
104 Returns:
105 Tuple of (decimated_time, decimated_data).
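
    Example:
        A minimal sketch with synthetic data (illustrative values only):

        >>> t = np.linspace(0.0, 1.0, 10_000)
        >>> d = np.sin(2 * np.pi * 5.0 * t)
        >>> t_dec, d_dec = _decimate_uniform(t, d, 100)
        >>> len(d_dec)
        100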
106 """
    stride = len(data) // target_points
    stride = max(stride, 1)

    indices = np.arange(0, len(data), stride)[:target_points]
    return (time[indices], data[indices])


def _decimate_minmax_envelope(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    target_points: int,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
119 """Min-max envelope decimation - preserves peaks and valleys.
121 This method ensures all signal extrema are preserved in the decimated view.
123 Args:
124 time: Time array.
125 data: Signal data array.
126 target_points: Target number of points after decimation.
128 Returns:
129 Tuple of (decimated_time, decimated_data).
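
    Example:
        A minimal sketch with synthetic data, showing that a one-sample
        spike survives decimation (illustrative values only):

        >>> t = np.arange(10_000, dtype=np.float64)
        >>> d = np.zeros(10_000)
        >>> d[5_000] = 1.0  # narrow spike that uniform striding could miss
        >>> t_dec, d_dec = _decimate_minmax_envelope(t, d, 100)
        >>> float(d_dec.max())
        1.0
        >>> len(d_dec)
        100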
130 """
    # Calculate bucket size (each bucket contributes 2 points: min and max);
    # the max() guard avoids a ZeroDivisionError when target_points < 2
    bucket_size = len(data) // max(target_points // 2, 1)

    if bucket_size < 1:
        return (time, data)

    decimated_time = []
    decimated_data = []

    for i in range(0, len(data), bucket_size):
        bucket_data = data[i : i + bucket_size]
        bucket_time = time[i : i + bucket_size]

        if len(bucket_data) == 0:
            continue

        # Find min and max in bucket
        min_idx = np.argmin(bucket_data)
        max_idx = np.argmax(bucket_data)

        # Add in chronological order
        if min_idx < max_idx:
            decimated_time.extend([bucket_time[min_idx], bucket_time[max_idx]])
            decimated_data.extend([bucket_data[min_idx], bucket_data[max_idx]])
        else:
            decimated_time.extend([bucket_time[max_idx], bucket_time[min_idx]])
            decimated_data.extend([bucket_data[max_idx], bucket_data[min_idx]])

    return (np.array(decimated_time), np.array(decimated_data))


def _decimate_lttb(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    target_points: int,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
167 """Largest Triangle Three Buckets decimation.
169 Preserves visual shape by maximizing triangle areas.
171 Args:
172 time: Time array.
173 data: Signal data array.
174 target_points: Target number of points after decimation.
176 Returns:
177 Tuple of (decimated_time, decimated_data).
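
    Example:
        A minimal sketch with synthetic data (illustrative values only);
        LTTB returns exactly target_points samples and always keeps the
        endpoints:

        >>> t = np.linspace(0.0, 1.0, 10_000)
        >>> d = np.cos(2 * np.pi * 3.0 * t)
        >>> t_dec, d_dec = _decimate_lttb(t, d, 100)
        >>> len(d_dec)
        100
        >>> bool(d_dec[0] == d[0] and d_dec[-1] == d[-1])
        True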
178 """
    if len(data) <= target_points:
        return (time, data)

    # Fewer than 3 target points cannot hold both endpoints plus an interior
    # bucket; return the endpoints to avoid division by zero below
    if target_points < 3:
        return (time[[0, -1]], data[[0, -1]])

    # Always include first and last points
    sampled_time = [time[0]]
    sampled_data = [data[0]]

    bucket_size = (len(data) - 2) / (target_points - 2)

    prev_idx = 0

    for i in range(target_points - 2):
        # Average point of next bucket
        avg_range_start = int((i + 1) * bucket_size) + 1
        avg_range_end = int((i + 2) * bucket_size) + 1
        avg_range_end = min(avg_range_end, len(data))

        if avg_range_start < avg_range_end:
            avg_time = np.mean(time[avg_range_start:avg_range_end])
            avg_data = np.mean(data[avg_range_start:avg_range_end])
        else:
            avg_time = time[-1]
            avg_data = data[-1]

        # Current bucket range
        range_start = int(i * bucket_size) + 1
        range_end = int((i + 1) * bucket_size) + 1
        range_end = min(range_end, len(data) - 1)

        # Find point in bucket that forms largest triangle
        max_area = -1.0
        max_idx = range_start

        for idx in range(range_start, range_end):
            # Twice the triangle area via the cross product; the constant
            # factor does not change which point wins the argmax
            area = abs(
                (time[prev_idx] - avg_time) * (data[idx] - data[prev_idx])
                - (time[prev_idx] - time[idx]) * (avg_data - data[prev_idx])
            )

            if area > max_area:
                max_area = area
                max_idx = idx

        sampled_time.append(time[max_idx])
        sampled_data.append(data[max_idx])
        prev_idx = max_idx

    # Always include last point
    sampled_time.append(time[-1])
    sampled_data.append(data[-1])

    return (np.array(sampled_time), np.array(sampled_data))


def progressive_render(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    viewport: tuple[float, float] | None = None,
    priority: Literal["viewport", "full"] = "viewport",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
241 """Progressive rendering - render visible viewport first.
243 Args:
244 time: Time array.
245 data: Signal data array.
246 viewport: Visible viewport (t_min, t_max). None = full range.
247 priority: Rendering priority ("viewport" = visible first, "full" = all data).
249 Returns:
250 Tuple of (time, data) for priority rendering.
252 Example:
253 >>> # Render only visible portion for fast initial display
254 >>> time_vis, data_vis = progressive_render(
255 ... time, data, viewport=(0, 0.001), priority="viewport"
256 ... )
258 References:
259 VIS-019: Memory-Efficient Plot Rendering (progressive rendering)
260 """
    if viewport is None or priority == "full":
        return (time, data)

    t_min, t_max = viewport

    # Find indices within viewport
    mask = (time >= t_min) & (time <= t_max)
    indices = np.where(mask)[0]

    if len(indices) == 0:
        # Viewport is outside data range; fall back to the full dataset
        return (time, data)

    # Return viewport data first
    viewport_time = time[indices]
    viewport_data = data[indices]

    return (viewport_time, viewport_data)


def estimate_memory_usage(
    n_samples: int,
    n_channels: int = 1,
    dtype: type = np.float64,
) -> float:
286 """Estimate memory usage for plot rendering.
288 Args:
289 n_samples: Number of samples per channel.
290 n_channels: Number of channels.
291 dtype: Data type for arrays.
293 Returns:
294 Estimated memory usage in MB.

    Example:
        >>> mem_mb = estimate_memory_usage(1_000_000, n_channels=4)
        >>> print(f"Memory: {mem_mb:.1f} MB")
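        Memory: 38.1 MB

        The arithmetic behind this figure: (1 time array + 4 channel arrays)
        * 1,000,000 samples * 8 bytes = 40,000,000 bytes, and
        40_000_000 / (1024 * 1024) is roughly 38.1 MB.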

    References:
        VIS-019: Memory-Efficient Plot Rendering
    """
    # Bytes per sample
    if dtype == np.float64:
        bytes_per_sample = 8
    elif dtype == np.float32 or dtype == np.int32:
        bytes_per_sample = 4
    elif dtype == np.int16:
        bytes_per_sample = 2
    else:
        bytes_per_sample = 8  # Default

    # Total memory: time + data arrays per channel
    # Time array: n_samples * bytes_per_sample
    # Data arrays: n_channels * n_samples * bytes_per_sample
    total_bytes = (1 + n_channels) * n_samples * bytes_per_sample

    # Convert to MB (binary convention: 1 MB = 1024 * 1024 bytes)
    return total_bytes / (1024 * 1024)


def downsample_for_memory(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    target_memory_mb: float = 50.0,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
328 """Downsample signal to meet memory target.
330 Args:
331 time: Time array.
332 data: Signal data array.
333 target_memory_mb: Target memory usage in MB.
335 Returns:
336 Tuple of (decimated_time, decimated_data).
338 Example:
339 >>> # Reduce 100MB dataset to 50MB
340 >>> time_ds, data_ds = downsample_for_memory(time, data, target_memory_mb=50.0)
342 References:
343 VIS-019: Memory-Efficient Plot Rendering (memory target <50MB per subplot)
344 """
    current_memory = estimate_memory_usage(len(data), n_channels=1)

    if current_memory <= target_memory_mb:
        # Already within target
        return (time, data)

    # Calculate required decimation factor
    decimation_factor = current_memory / target_memory_mb
    target_samples = int(len(data) / decimation_factor)

    # Use min-max to preserve features
    return _decimate_minmax_envelope(time, data, target_samples)


class StreamingRenderer:
    """Streaming plot renderer for real-time data updates.

    Handles incremental data updates without full redraws for performance.

    Example:
        >>> renderer = StreamingRenderer(max_samples=10000)
        >>> renderer.append(new_time, new_data)
        >>> time, data = renderer.get_render_data()

    References:
        VIS-018: Streaming Plot Updates
    """

    def __init__(
        self,
        *,
        max_samples: int = 10_000,
        decimation_method: Literal["minmax", "lttb", "uniform"] = "minmax",
    ):
        """Initialize streaming renderer.

        Args:
            max_samples: Maximum samples to keep in buffer.
            decimation_method: Decimation method for buffer management.
        """
        self.max_samples = max_samples
        self.decimation_method = decimation_method

        self._time: list[float] = []
        self._data: list[float] = []

    def append(
        self,
        time: NDArray[np.float64],
        data: NDArray[np.float64],
    ) -> None:
        """Append new data to streaming buffer.

        Args:
            time: New time samples.
            data: New data samples.
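
        Example:
            A minimal sketch (illustrative values only; too few samples
            here to trigger buffer decimation):

            >>> renderer = StreamingRenderer(max_samples=1_000)
            >>> renderer.append(np.arange(10, dtype=np.float64), np.ones(10))
            >>> t, d = renderer.get_render_data()
            >>> len(d)
            10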
401 """
402 self._time.extend(time.tolist())
403 self._data.extend(data.tolist())
405 # Decimate if buffer exceeds limit
406 if len(self._data) > self.max_samples:
407 self._decimate_buffer()

    def _decimate_buffer(self) -> None:
        """Decimate internal buffer to max_samples."""
        time_arr = np.array(self._time)
        data_arr = np.array(self._data)

        # Pass max_samples as the pixel budget too; otherwise the default
        # screen_width * samples_per_pixel target (3840) would silently
        # override the configured buffer size
        time_dec, data_dec = render_with_lod(
            time_arr,
            data_arr,
            screen_width=self.max_samples,
            samples_per_pixel=1.0,
            max_points=self.max_samples,
            method=self.decimation_method,
        )

        self._time = time_dec.tolist()
        self._data = data_dec.tolist()

    def get_render_data(self) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """Get current data for rendering.

        Returns:
            Tuple of (time, data) arrays.
        """
        return (np.array(self._time), np.array(self._data))

    def clear(self) -> None:
        """Clear streaming buffer."""
        self._time.clear()
        self._data.clear()


__all__ = [
    "StreamingRenderer",
    "downsample_for_memory",
    "estimate_memory_usage",
    "progressive_render",
    "render_with_lod",
]
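

# Illustrative end-to-end sketch (comments only; `acquire_chunks` is a
# hypothetical data source, not part of this module):
#
#     renderer = StreamingRenderer(max_samples=10_000, decimation_method="minmax")
#     for chunk_time, chunk_data in acquire_chunks():
#         renderer.append(chunk_time, chunk_data)
#         t, d = renderer.get_render_data()
#         t_lod, d_lod = render_with_lod(t, d, screen_width=1920)
#         # hand (t_lod, d_lod) to the plotting backend for an incremental redraw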