Coverage for src/usaspending/utils/rate_limit.py: 100%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 17:15 -0700
1"""Rate limiting implementation for USASpending API client."""
3from __future__ import annotations
5import threading
6import time
7from collections import deque
8from typing import Deque, Optional
10from ..logging_config import USASpendingLogger
12logger = USASpendingLogger.get_logger(__name__)
class RateLimiter:
    """
    Rate limiter using a sliding window algorithm.

    The limiter keeps the timestamps (from ``time.time()``) of recently
    recorded calls in a deque and ensures that no more than ``max_calls``
    are recorded within any ``period``-second window.

    All access to the timestamp deque is guarded by a ``threading.Lock``.
    """

    def __init__(self, max_calls: int, period: float):
        """
        Initialize the rate limiter.

        Args:
            max_calls: Maximum number of calls allowed in the period
            period: Time period in seconds

        Raises:
            ValueError: If ``max_calls`` or ``period`` is not positive.
        """
        if max_calls <= 0:
            raise ValueError("max_calls must be positive")
        if period <= 0:
            raise ValueError("period must be positive")

        self.max_calls = max_calls
        self.period = period
        # Timestamps of recorded calls, oldest first.
        self._call_times: Deque[float] = deque()
        self._lock = threading.Lock()

        logger.debug(f"Initialized RateLimiter: {max_calls} calls per {period}s")

    def _prune_expired(self, now: float) -> None:
        """
        Drop timestamps that have left the sliding window ending at ``now``.

        A timestamp exactly ``period`` seconds old counts as expired.
        The caller must hold ``self._lock``.
        """
        cutoff_time = now - self.period
        while self._call_times and self._call_times[0] <= cutoff_time:
            self._call_times.popleft()

    def wait_if_needed(self) -> None:
        """
        Wait if necessary to avoid exceeding the rate limit, then record a call.

        This method blocks until the window has a free slot, records the
        current time as a call, and returns.
        """
        while True:
            with self._lock:
                now = time.time()
                self._prune_expired(now)

                if len(self._call_times) < self.max_calls:
                    # A slot is free: claim it while still holding the lock
                    # so no other thread can take the same slot.
                    self._call_times.append(now)

                    logger.debug(
                        f"Recorded API call at {now:.3f}. "
                        f"Current window has {len(self._call_times)} calls"
                    )
                    return

                # Window is full: wait until the oldest call exits it.
                wait_time = (self._call_times[0] + self.period) - now

            # The lock is released here so other threads are not blocked
            # while this one sleeps.
            if wait_time > 0:
                logger.info(
                    f"Rate limit reached. Waiting {wait_time:.2f}s before next request"
                )
                time.sleep(wait_time)

            # Loop and re-check instead of recording unconditionally: several
            # threads may have been waiting on the same oldest call, and only
            # as many as there are free slots may proceed.

    def reset(self) -> None:
        """Reset the rate limiter, clearing all recorded calls."""
        with self._lock:
            self._call_times.clear()
            logger.debug("Rate limiter reset")

    @property
    def available_calls(self) -> int:
        """
        Get the number of calls that can be made immediately.

        Expired timestamps are pruned as a side effect.

        Returns:
            Number of available calls without waiting
        """
        with self._lock:
            self._prune_expired(time.time())
            return max(0, self.max_calls - len(self._call_times))

    @property
    def next_available_time(self) -> Optional[float]:
        """
        Get the timestamp when the next call will be available.

        Expired timestamps are pruned as a side effect.

        Returns:
            Unix timestamp when next call can be made, or None if calls are available now
        """
        with self._lock:
            self._prune_expired(time.time())

            if len(self._call_times) < self.max_calls:
                return None  # Calls available now

            # The next slot opens when the oldest call exits the window.
            return self._call_times[0] + self.period