Coverage for src / tracekit / utils / buffer.py: 100%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

"""Buffer utilities for streaming data.

This module provides a fixed-capacity circular (ring) buffer with O(1)
appends, and a sliding window built on top of it for count-based or
time-based analysis of streaming samples.

Example:
    >>> from tracekit.utils.buffer import CircularBuffer
    >>> buf = CircularBuffer(1000)
    >>> buf.append(1.5)
    >>> recent = buf.get_last(100)

References:
    Ring buffer (circular buffer) data structure.
"""

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Any, TypeVar, overload 

20 

21import numpy as np 

22 

23if TYPE_CHECKING: 

24 from numpy.typing import DTypeLike, NDArray 

25 

# NOTE(review): CircularBuffer is declared with PEP 695 syntax
# (``class CircularBuffer[T]``), which introduces its own class-scoped T,
# so no definition in this file uses this module-level TypeVar.
# Confirm nothing imports it before removing.
T = TypeVar("T")

27 

28 

29class CircularBuffer[T]: 

30 """Fixed-size circular buffer with O(1) operations. 

31 

32 Thread-safe for single producer, single consumer pattern. 

33 

34 Args: 

35 capacity: Maximum buffer size. 

36 dtype: NumPy dtype for numeric buffers. 

37 

38 Attributes: 

39 capacity: Buffer capacity. 

40 count: Current number of items. 

41 

42 Example: 

43 >>> buf = CircularBuffer(1000, dtype=np.float64) 

44 >>> for value in stream: 

45 ... buf.append(value) 

46 ... if buf.is_full(): 

47 ... process(buf.get_all()) 

48 """ 

49 

50 def __init__( 

51 self, 

52 capacity: int, 

53 dtype: DTypeLike | None = None, 

54 ) -> None: 

55 """Initialize circular buffer. 

56 

57 Args: 

58 capacity: Maximum buffer size. 

59 dtype: NumPy dtype. If None, uses object array. 

60 """ 

61 self._capacity = capacity 

62 self._dtype = dtype 

63 

64 if dtype is not None: 

65 self._data: NDArray[Any] = np.zeros(capacity, dtype=dtype) 

66 else: 

67 self._data = np.empty(capacity, dtype=object) 

68 

69 self._head = 0 # Next write position 

70 self._count = 0 # Number of valid items 

71 

72 @property 

73 def capacity(self) -> int: 

74 """Get buffer capacity.""" 

75 return self._capacity 

76 

77 @property 

78 def count(self) -> int: 

79 """Get current item count.""" 

80 return self._count 

81 

82 def is_empty(self) -> bool: 

83 """Check if buffer is empty.""" 

84 return self._count == 0 

85 

86 def is_full(self) -> bool: 

87 """Check if buffer is full.""" 

88 return self._count == self._capacity 

89 

90 def append(self, value: T) -> None: 

91 """Append value to buffer. 

92 

93 O(1) operation. Overwrites oldest value if full. 

94 

95 Args: 

96 value: Value to append. 

97 """ 

98 self._data[self._head] = value 

99 self._head = (self._head + 1) % self._capacity 

100 

101 if self._count < self._capacity: 

102 self._count += 1 

103 

104 def extend(self, values: list[T] | NDArray[Any]) -> None: 

105 """Extend buffer with multiple values. 

106 

107 Args: 

108 values: Values to append. 

109 """ 

110 for value in values: 

111 self.append(value) 

112 

113 def get_last(self, n: int = 1) -> NDArray[Any]: 

114 """Get last n items. 

115 

116 Args: 

117 n: Number of items (default 1). 

118 

119 Returns: 

120 Array of last n items (newest first). 

121 """ 

122 n = min(n, self._count) 

123 

124 if n == 0: 

125 if self._dtype is not None: 

126 return np.array([], dtype=self._dtype) 

127 return np.array([], dtype=object) 

128 

129 result = np.empty(n, dtype=self._data.dtype) 

130 

131 for i in range(n): 

132 idx = (self._head - 1 - i) % self._capacity 

133 result[i] = self._data[idx] 

134 

135 return result 

136 

137 def get_first(self, n: int = 1) -> NDArray[Any]: 

138 """Get first n items (oldest). 

139 

140 Args: 

141 n: Number of items. 

142 

143 Returns: 

144 Array of first n items (oldest first). 

145 """ 

146 n = min(n, self._count) 

147 

148 if n == 0: 

149 if self._dtype is not None: 

150 return np.array([], dtype=self._dtype) 

151 return np.array([], dtype=object) 

152 

153 # Calculate tail position (oldest item) 

154 tail = (self._head - self._count) % self._capacity 

155 

156 result = np.empty(n, dtype=self._data.dtype) 

157 

158 for i in range(n): 

159 idx = (tail + i) % self._capacity 

160 result[i] = self._data[idx] 

161 

162 return result 

163 

164 def get_all(self) -> NDArray[Any]: 

165 """Get all items in order. 

166 

167 Returns: 

168 Array of all items (oldest first). 

169 """ 

170 return self.get_first(self._count) 

171 

172 @overload 

173 def __getitem__(self, index: int) -> T: ... 

174 

175 @overload 

176 def __getitem__(self, index: slice) -> NDArray[Any]: ... 

177 

178 def __getitem__(self, index: int | slice) -> T | NDArray[Any]: 

179 """Get item(s) by index. 

180 

181 Positive indices count from oldest (0 = oldest). 

182 Negative indices count from newest (-1 = newest). 

183 

184 Args: 

185 index: Integer index or slice. 

186 

187 Returns: 

188 Item or array of items. 

189 

190 Raises: 

191 IndexError: If index out of range. 

192 """ 

193 if isinstance(index, slice): 

194 # Convert slice to indices 

195 start, stop, step = index.indices(self._count) 

196 result = [] 

197 for i in range(start, stop, step): 

198 result.append(self[i]) 

199 return np.array(result, dtype=self._data.dtype) 

200 

201 if index < 0: 

202 index = self._count + index 

203 

204 if index < 0 or index >= self._count: 

205 raise IndexError(f"Index {index} out of range [0, {self._count})") 

206 

207 # Calculate actual position 

208 tail = (self._head - self._count) % self._capacity 

209 actual_idx = (tail + index) % self._capacity 

210 

211 return self._data[actual_idx] # type: ignore[no-any-return] 

212 

213 def __len__(self) -> int: 

214 """Get current item count.""" 

215 return self._count 

216 

217 def clear(self) -> None: 

218 """Clear all items.""" 

219 self._head = 0 

220 self._count = 0 

221 

222 def mean(self) -> float: 

223 """Compute mean of numeric buffer. 

224 

225 Returns: 

226 Mean value, or NaN if empty. 

227 """ 

228 if self._count == 0: 

229 return float("nan") 

230 

231 return float(np.mean(self.get_all())) 

232 

233 def std(self) -> float: 

234 """Compute standard deviation. 

235 

236 Returns: 

237 Standard deviation, or NaN if empty. 

238 """ 

239 if self._count < 2: 

240 return float("nan") 

241 

242 return float(np.std(self.get_all())) 

243 

244 def min(self) -> T: 

245 """Get minimum value. 

246 

247 Returns: 

248 Minimum value. 

249 

250 Raises: 

251 ValueError: If buffer is empty. 

252 """ 

253 if self._count == 0: 

254 raise ValueError("Buffer is empty") 

255 

256 return np.min(self.get_all()) # type: ignore[no-any-return] 

257 

258 def max(self) -> T: 

259 """Get maximum value. 

260 

261 Returns: 

262 Maximum value. 

263 

264 Raises: 

265 ValueError: If buffer is empty. 

266 """ 

267 if self._count == 0: 

268 raise ValueError("Buffer is empty") 

269 

270 return np.max(self.get_all()) # type: ignore[no-any-return] 

271 

272 

class SlidingWindow:
    """Sliding window for time-series analysis.

    Maintains a window of samples based on time or count.

    Args:
        window_size: Window length: a sample count, or (when ``time_based``)
            a duration in the same units as the timestamps passed to ``add``.
        time_based: If True, window_size is a duration.
        dtype: Data type for samples.
        max_samples: Capacity of the internal buffers for time-based windows;
            beyond this many samples the oldest are overwritten.

    Example:
        >>> window = SlidingWindow(3)  # 3 samples
        >>> for sample in [1.0, 2.0, 3.0, 4.0]:
        ...     window.add(sample)
        >>> window.is_ready()
        True
        >>> window.get_data().tolist()
        [2.0, 3.0, 4.0]
    """

    def __init__(
        self,
        window_size: int | float,
        time_based: bool = False,
        dtype: DTypeLike = np.float64,
        max_samples: int = 100000,
    ) -> None:
        """Initialize sliding window.

        Args:
            window_size: Size in samples, or a duration for time-based windows.
            time_based: True for time-based window.
            dtype: Data type for samples.
            max_samples: Buffer capacity used when ``time_based`` is True.
        """
        self._window_size = window_size
        self._time_based = time_based

        # A count-based window holds exactly int(window_size) samples. A
        # time-based window cannot know its sample count up front, so it uses
        # a fixed upper bound instead.
        capacity = max_samples if time_based else int(window_size)

        self._data = CircularBuffer(capacity, dtype=dtype)  # type: ignore[var-annotated]
        self._times = CircularBuffer(capacity, dtype=np.float64)  # type: ignore[var-annotated]

    def add(self, value: float, timestamp: float | None = None) -> None:
        """Add sample to window.

        Args:
            value: Sample value.
            timestamp: Sample timestamp (required for time-based).

        Raises:
            ValueError: If timestamp is None for time-based window.
        """
        # Validate before mutating either buffer: a rejected call must not
        # leave the data and timestamp buffers with different lengths.
        if self._time_based and timestamp is None:
            raise ValueError("Timestamp required for time-based window")

        self._data.append(value)
        if self._time_based:
            self._times.append(timestamp)

    def is_ready(self) -> bool:
        """Check if window is full.

        Returns:
            For count-based windows, True once the buffer holds its full
            capacity. For time-based windows, True once the span between the
            oldest and newest stored timestamps reaches window_size.
        """
        if not self._time_based:
            # Compare against the real capacity: a fractional window_size is
            # truncated by int() in __init__, so the count could never reach it.
            return self._data.is_full()

        if self._times.count < 2:
            return False
        # Read only the oldest and newest timestamps instead of copying all.
        return bool(self._times[-1] - self._times[0] >= self._window_size)

    def _window_mask(self, times: NDArray[np.float64]) -> NDArray[np.bool_]:
        """Select timestamps within window_size of the newest (non-empty input)."""
        cutoff = times[-1] - self._window_size
        return times >= cutoff

    def get_data(self) -> NDArray[np.float64]:
        """Get window data.

        Returns:
            Array of samples in window (oldest first). For time-based windows,
            only samples whose timestamp is within window_size of the newest.
        """
        if not self._time_based:
            result_all: NDArray[np.float64] = self._data.get_all()
            return result_all

        times = self._times.get_all()
        data = self._data.get_all()
        if times.size == 0:
            # The buffers are kept the same length, so `data` is empty too.
            # Returning it keeps the window's configured dtype.
            return data

        result: NDArray[np.float64] = data[self._window_mask(times)]
        return result

    def get_times(self) -> NDArray[np.float64]:
        """Get timestamps for time-based window.

        Returns:
            Timestamps of the samples returned by ``get_data`` (same order and
            length).

        Raises:
            ValueError: If not a time-based window.
        """
        if not self._time_based:
            raise ValueError("Not a time-based window")

        times = self._times.get_all()
        if times.size == 0:
            return times
        return times[self._window_mask(times)]

    def clear(self) -> None:
        """Clear window."""
        self._data.clear()
        self._times.clear()

385 

# Public API of this module.
__all__ = ["CircularBuffer", "SlidingWindow"]