Coverage for src / tracekit / utils / buffer.py: 100%
134 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Buffer utilities for streaming data.

This module provides a fixed-size circular buffer with O(1) appends and a
sliding window (count- or time-based) built on top of it.

Example:
    >>> from tracekit.utils.buffer import CircularBuffer
    >>> buf = CircularBuffer(3)
    >>> for value in (1, 2, 3, 4):
    ...     buf.append(value)
    >>> buf.get_last(2).tolist()
    [4, 3]

References:
    Ring buffer data structure
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, TypeVar, overload

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import DTypeLike, NDArray

# Generic item type. CircularBuffer declares its own type parameter with
# PEP 695 syntax (``class CircularBuffer[T]``), so the class does not use
# this module-level TypeVar; it is kept in case other modules import it.
T = TypeVar("T")
29class CircularBuffer[T]:
30 """Fixed-size circular buffer with O(1) operations.
32 Thread-safe for single producer, single consumer pattern.
34 Args:
35 capacity: Maximum buffer size.
36 dtype: NumPy dtype for numeric buffers.
38 Attributes:
39 capacity: Buffer capacity.
40 count: Current number of items.
42 Example:
43 >>> buf = CircularBuffer(1000, dtype=np.float64)
44 >>> for value in stream:
45 ... buf.append(value)
46 ... if buf.is_full():
47 ... process(buf.get_all())
48 """
50 def __init__(
51 self,
52 capacity: int,
53 dtype: DTypeLike | None = None,
54 ) -> None:
55 """Initialize circular buffer.
57 Args:
58 capacity: Maximum buffer size.
59 dtype: NumPy dtype. If None, uses object array.
60 """
61 self._capacity = capacity
62 self._dtype = dtype
64 if dtype is not None:
65 self._data: NDArray[Any] = np.zeros(capacity, dtype=dtype)
66 else:
67 self._data = np.empty(capacity, dtype=object)
69 self._head = 0 # Next write position
70 self._count = 0 # Number of valid items
72 @property
73 def capacity(self) -> int:
74 """Get buffer capacity."""
75 return self._capacity
77 @property
78 def count(self) -> int:
79 """Get current item count."""
80 return self._count
82 def is_empty(self) -> bool:
83 """Check if buffer is empty."""
84 return self._count == 0
86 def is_full(self) -> bool:
87 """Check if buffer is full."""
88 return self._count == self._capacity
90 def append(self, value: T) -> None:
91 """Append value to buffer.
93 O(1) operation. Overwrites oldest value if full.
95 Args:
96 value: Value to append.
97 """
98 self._data[self._head] = value
99 self._head = (self._head + 1) % self._capacity
101 if self._count < self._capacity:
102 self._count += 1
104 def extend(self, values: list[T] | NDArray[Any]) -> None:
105 """Extend buffer with multiple values.
107 Args:
108 values: Values to append.
109 """
110 for value in values:
111 self.append(value)
113 def get_last(self, n: int = 1) -> NDArray[Any]:
114 """Get last n items.
116 Args:
117 n: Number of items (default 1).
119 Returns:
120 Array of last n items (newest first).
121 """
122 n = min(n, self._count)
124 if n == 0:
125 if self._dtype is not None:
126 return np.array([], dtype=self._dtype)
127 return np.array([], dtype=object)
129 result = np.empty(n, dtype=self._data.dtype)
131 for i in range(n):
132 idx = (self._head - 1 - i) % self._capacity
133 result[i] = self._data[idx]
135 return result
137 def get_first(self, n: int = 1) -> NDArray[Any]:
138 """Get first n items (oldest).
140 Args:
141 n: Number of items.
143 Returns:
144 Array of first n items (oldest first).
145 """
146 n = min(n, self._count)
148 if n == 0:
149 if self._dtype is not None:
150 return np.array([], dtype=self._dtype)
151 return np.array([], dtype=object)
153 # Calculate tail position (oldest item)
154 tail = (self._head - self._count) % self._capacity
156 result = np.empty(n, dtype=self._data.dtype)
158 for i in range(n):
159 idx = (tail + i) % self._capacity
160 result[i] = self._data[idx]
162 return result
164 def get_all(self) -> NDArray[Any]:
165 """Get all items in order.
167 Returns:
168 Array of all items (oldest first).
169 """
170 return self.get_first(self._count)
172 @overload
173 def __getitem__(self, index: int) -> T: ...
175 @overload
176 def __getitem__(self, index: slice) -> NDArray[Any]: ...
178 def __getitem__(self, index: int | slice) -> T | NDArray[Any]:
179 """Get item(s) by index.
181 Positive indices count from oldest (0 = oldest).
182 Negative indices count from newest (-1 = newest).
184 Args:
185 index: Integer index or slice.
187 Returns:
188 Item or array of items.
190 Raises:
191 IndexError: If index out of range.
192 """
193 if isinstance(index, slice):
194 # Convert slice to indices
195 start, stop, step = index.indices(self._count)
196 result = []
197 for i in range(start, stop, step):
198 result.append(self[i])
199 return np.array(result, dtype=self._data.dtype)
201 if index < 0:
202 index = self._count + index
204 if index < 0 or index >= self._count:
205 raise IndexError(f"Index {index} out of range [0, {self._count})")
207 # Calculate actual position
208 tail = (self._head - self._count) % self._capacity
209 actual_idx = (tail + index) % self._capacity
211 return self._data[actual_idx] # type: ignore[no-any-return]
213 def __len__(self) -> int:
214 """Get current item count."""
215 return self._count
217 def clear(self) -> None:
218 """Clear all items."""
219 self._head = 0
220 self._count = 0
222 def mean(self) -> float:
223 """Compute mean of numeric buffer.
225 Returns:
226 Mean value, or NaN if empty.
227 """
228 if self._count == 0:
229 return float("nan")
231 return float(np.mean(self.get_all()))
233 def std(self) -> float:
234 """Compute standard deviation.
236 Returns:
237 Standard deviation, or NaN if empty.
238 """
239 if self._count < 2:
240 return float("nan")
242 return float(np.std(self.get_all()))
244 def min(self) -> T:
245 """Get minimum value.
247 Returns:
248 Minimum value.
250 Raises:
251 ValueError: If buffer is empty.
252 """
253 if self._count == 0:
254 raise ValueError("Buffer is empty")
256 return np.min(self.get_all()) # type: ignore[no-any-return]
258 def max(self) -> T:
259 """Get maximum value.
261 Returns:
262 Maximum value.
264 Raises:
265 ValueError: If buffer is empty.
266 """
267 if self._count == 0:
268 raise ValueError("Buffer is empty")
270 return np.max(self.get_all()) # type: ignore[no-any-return]
class SlidingWindow:
    """Sliding window for time-series analysis.

    Keeps the most recent samples, bounded either by sample count or by the
    span of their timestamps.

    Args:
        window_size: Window length in samples (count-based) or in timestamp
            units, e.g. seconds (time-based).
        time_based: If True, ``window_size`` is a timestamp span and every
            sample must be added with a timestamp.
        dtype: Data type for samples.
        capacity: Keyword-only. Maximum number of samples retained by a
            time-based window (default 100000). Older samples are overwritten
            once this many are held, even if still inside the time span.
            Ignored for count-based windows.

    Raises:
        ValueError: If the window could not hold at least one sample.

    Note:
        Time-based windows assume timestamps are added in non-decreasing
        order; this is not checked.

    Example:
        >>> window = SlidingWindow(3)
        >>> for sample in (1.0, 2.0, 3.0, 4.0):
        ...     window.add(sample)
        >>> window.is_ready()
        True
        >>> window.get_data().tolist()
        [2.0, 3.0, 4.0]
    """

    _DEFAULT_TIME_CAPACITY = 100_000  # Samples retained by time-based windows

    def __init__(
        self,
        window_size: int | float,
        time_based: bool = False,
        dtype: DTypeLike = np.float64,
        *,
        capacity: int | None = None,
    ) -> None:
        """Initialize sliding window.

        Args:
            window_size: Size in samples or timestamp units.
            time_based: True for time-based window.
            dtype: Data type for samples.
            capacity: Keyword-only sample limit for time-based windows;
                defaults to 100000. Ignored for count-based windows.

        Raises:
            ValueError: If the resulting buffer would hold fewer than one
                sample (e.g. a count-based ``window_size`` below 1).
        """
        self._window_size = window_size
        self._time_based = time_based

        if time_based:
            # The sample count of a time span is unknown up front, so a
            # time-based window retains up to a fixed number of samples.
            size = self._DEFAULT_TIME_CAPACITY if capacity is None else int(capacity)
        else:
            size = int(window_size)

        if size < 1:
            raise ValueError(
                f"SlidingWindow needs room for at least one sample, got capacity {size}"
            )

        self._data = CircularBuffer(size, dtype=dtype)  # type: ignore[var-annotated]
        # Timestamps are only recorded by time-based windows; count-based
        # windows get a one-slot buffer instead of a full-size unused one.
        self._times = CircularBuffer(  # type: ignore[var-annotated]
            size if time_based else 1, dtype=np.float64
        )

    def add(self, value: float, timestamp: float | None = None) -> None:
        """Add sample to window.

        Args:
            value: Sample value.
            timestamp: Sample timestamp (required for time-based windows,
                ignored for count-based ones).

        Raises:
            ValueError: If timestamp is None for a time-based window. Nothing
                is stored in that case.
        """
        if not self._time_based:
            self._data.append(value)
            return

        if timestamp is None:
            raise ValueError("Timestamp required for time-based window")
        # Validate/convert before touching either buffer so a failure cannot
        # leave samples and timestamps out of step (get_data relies on them
        # having equal length).
        when = float(timestamp)
        self._data.append(value)
        self._times.append(when)

    def is_ready(self) -> bool:
        """Check if window is full.

        Count-based: the sample buffer is full. Time-based: the newest and
        oldest retained timestamps are at least ``window_size`` apart.
        """
        if not self._time_based:
            return self._data.is_full()
        if self._times.count < 2:
            return False
        # Read only the two endpoints rather than copying every timestamp.
        return bool(self._times[-1] - self._times[0] >= self._window_size)

    def _window_mask(self, times: NDArray[np.float64]) -> NDArray[np.bool_]:
        """Flag timestamps no more than ``window_size`` older than the newest."""
        if len(times) == 0:
            return np.zeros(0, dtype=bool)
        return times >= times[-1] - self._window_size

    def get_data(self) -> NDArray[np.float64]:
        """Get window data.

        Returns:
            Array of samples in window, oldest first. For a time-based window
            these are the samples whose timestamps lie within ``window_size``
            of the newest timestamp (inclusive).
        """
        data: NDArray[np.float64] = self._data.get_all()
        if not self._time_based:
            return data
        return data[self._window_mask(self._times.get_all())]

    def get_times(self, *, in_window: bool = False) -> NDArray[np.float64]:
        """Get timestamps for time-based window.

        Args:
            in_window: If True, return only the timestamps inside the current
                window, aligned element-for-element with ``get_data()``. The
                default (False) returns every retained timestamp, which can be
                longer than ``get_data()``.

        Returns:
            Array of timestamps, oldest first.

        Raises:
            ValueError: If not a time-based window.
        """
        if not self._time_based:
            raise ValueError("Not a time-based window")

        times: NDArray[np.float64] = self._times.get_all()
        return times[self._window_mask(times)] if in_window else times

    def clear(self) -> None:
        """Clear window."""
        self._data.clear()
        self._times.clear()
# Public API of this module.
__all__ = ["CircularBuffer", "SlidingWindow"]