Coverage for src/usaspending/utils/rate_limit.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 17:15 -0700

1"""Rate limiting implementation for USASpending API client.""" 

2 

3from __future__ import annotations 

4 

5import threading 

6import time 

7from collections import deque 

8from typing import Deque, Optional 

9 

10from ..logging_config import USASpendingLogger 

11 

12logger = USASpendingLogger.get_logger(__name__) 

13 

14 

15class RateLimiter: 

16 """ 

17 Rate limiter using a sliding window algorithm. 

18 

19 This implementation tracks the timestamps of API calls and ensures 

20 that no more than `max_calls` are made within any `period` second window. 

21 

22 Thread-safe implementation using locks. 

23 """ 

24 

25 def __init__(self, max_calls: int, period: float): 

26 """ 

27 Initialize the rate limiter. 

28 

29 Args: 

30 max_calls: Maximum number of calls allowed in the period 

31 period: Time period in seconds 

32 """ 

33 if max_calls <= 0: 

34 raise ValueError("max_calls must be positive") 

35 if period <= 0: 

36 raise ValueError("period must be positive") 

37 

38 self.max_calls = max_calls 

39 self.period = period 

40 self._call_times: Deque[float] = deque() 

41 self._lock = threading.Lock() 

42 

43 logger.debug(f"Initialized RateLimiter: {max_calls} calls per {period}s") 

44 

45 def wait_if_needed(self) -> None: 

46 """ 

47 Wait if necessary to avoid exceeding the rate limit. 

48 

49 This method will block until it's safe to make another API call 

50 without exceeding the configured rate limit. 

51 """ 

52 with self._lock: 

53 now = time.time() 

54 

55 # Remove timestamps outside the current window 

56 cutoff_time = now - self.period 

57 while self._call_times and self._call_times[0] <= cutoff_time: 

58 self._call_times.popleft() 

59 

60 # If we're at the limit, calculate how long to wait 

61 if len(self._call_times) >= self.max_calls: 

62 # Wait until the oldest call exits the window 

63 oldest_call = self._call_times[0] 

64 wait_time = (oldest_call + self.period) - now 

65 

66 if wait_time > 0: 

67 logger.info( 

68 f"Rate limit reached. Waiting {wait_time:.2f}s before next request" 

69 ) 

70 # Release lock while sleeping to allow other threads 

71 self._lock.release() 

72 try: 

73 time.sleep(wait_time) 

74 finally: 

75 self._lock.acquire() 

76 

77 # Re-check and clean up after sleeping 

78 now = time.time() 

79 cutoff_time = now - self.period 

80 while self._call_times and self._call_times[0] <= cutoff_time: 

81 self._call_times.popleft() 

82 

83 # Record this call 

84 self._call_times.append(now) 

85 

86 logger.debug( 

87 f"Recorded API call at {now:.3f}. " 

88 f"Current window has {len(self._call_times)} calls" 

89 ) 

90 

91 def reset(self) -> None: 

92 """Reset the rate limiter, clearing all recorded calls.""" 

93 with self._lock: 

94 self._call_times.clear() 

95 logger.debug("Rate limiter reset") 

96 

97 @property 

98 def available_calls(self) -> int: 

99 """ 

100 Get the number of calls that can be made immediately. 

101 

102 Returns: 

103 Number of available calls without waiting 

104 """ 

105 with self._lock: 

106 now = time.time() 

107 cutoff_time = now - self.period 

108 

109 # Remove outdated timestamps 

110 while self._call_times and self._call_times[0] <= cutoff_time: 

111 self._call_times.popleft() 

112 

113 return max(0, self.max_calls - len(self._call_times)) 

114 

115 @property 

116 def next_available_time(self) -> Optional[float]: 

117 """ 

118 Get the timestamp when the next call will be available. 

119 

120 Returns: 

121 Unix timestamp when next call can be made, or None if calls are available now 

122 """ 

123 with self._lock: 

124 now = time.time() 

125 cutoff_time = now - self.period 

126 

127 # Remove outdated timestamps 

128 while self._call_times and self._call_times[0] <= cutoff_time: 

129 self._call_times.popleft() 

130 

131 if len(self._call_times) < self.max_calls: 

132 return None # Calls available now 

133 

134 # Return when the oldest call will exit the window 

135 return self._call_times[0] + self.period