Coverage for agentos/tools/retry_queue.py: 0%

120 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:47 +0800

1""" 

RetryQueue — synchronous retry with configurable backoff, max attempts, and a dead letter queue.

Supports:
 - Exponential backoff with optional jitter (50%-100% of the computed delay)
 - Max retry attempts
 - Dead letter queue for permanently failed items
 - Callback hooks (on_retry, on_failure, on_success)
 - Blocking execution: jobs run (and are retried) inline in the caller's thread
 - Configurable backoff strategy (exponential, constant, linear)

11""" 

12 

13from __future__ import annotations 

14 

15import random 

16import threading 

17import time 

18from dataclasses import dataclass, field 

19from enum import Enum 

20from typing import Any, Callable, Dict, List, Optional 

21 

22 

23# ============================================================================ 

24# Backoff Strategy 

25# ============================================================================ 

26 

class BackoffStrategy(Enum):
    """Rule for how the wait between retry attempts grows with the attempt number.

    ``RetryQueue`` caps the resulting delay at its ``max_delay`` and may
    apply jitter on top of it.
    """

    # Delay doubles each retry: base_delay * 2 ** (attempt - 1).
    EXPONENTIAL = "exponential"
    # Same delay every retry: base_delay.
    CONSTANT = "constant"
    # Delay grows by base_delay each retry: base_delay * attempt.
    LINEAR = "linear"

31 

32 

33# ============================================================================ 

34# Job 

35# ============================================================================ 

36 

@dataclass
class RetryJob:
    """One unit of work: a callable, the arguments to call it with, and retry state.

    The retry state (``attempts`` and ``last_error``) is updated by whoever
    drives the retries; this class itself only stores it.
    """

    # Identifier for the job (RetryQueue.submit uses an 8-character uuid4 prefix).
    id: str
    # The callable that ``execute`` invokes.
    func: Callable[..., Any]
    # Positional arguments forwarded to ``func``.
    args: tuple = field(default=())
    # Keyword arguments forwarded to ``func``; a fresh dict per instance.
    kwargs: Dict[str, Any] = field(default_factory=dict)
    # Count of failed attempts recorded so far.
    attempts: int = 0
    # Exception raised by the most recent failed attempt, if any.
    last_error: Optional[Exception] = None
    # time.time() captured when the job object was created.
    created_at: float = field(default_factory=time.time)

    def execute(self) -> Any:
        """Call ``func`` once with the stored arguments and return whatever it returns.

        Exceptions raised by ``func`` propagate unchanged.
        """
        target, positional, named = self.func, self.args, self.kwargs
        return target(*positional, **named)

49 

50 

51# ============================================================================ 

52# RetryQueue 

53# ============================================================================ 

54 

class RetryQueue:
    """Synchronous retry queue with configurable backoff and a dead letter queue.

    Jobs run immediately in the calling thread. Between failed attempts the
    caller blocks in ``time.sleep`` for the computed backoff delay. A job
    that fails ``max_attempts`` times is recorded as a ``(job, error)`` tuple
    in :attr:`dead_letters`, and its last error is re-raised to the caller.

    Usage:
        rq = RetryQueue(
            max_attempts=3,
            base_delay=1.0,
            max_delay=30.0,
        )

        def risky_call(a, b):
            # might fail...
            return a / b

        # Submit a job — failures are retried; once all attempts are used
        # up, the last error is re-raised.
        try:
            result = rq.submit(risky_call, 10, 0)
        except ZeroDivisionError:
            pass

        # Check dead letters
        for job, error in rq.dead_letters:
            print(f"Job {job.id} permanently failed: {error}")
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
        jitter: bool = True,
    ):
        """Configure the retry policy.

        Args:
            max_attempts: Total number of executions (including the first)
                before a job is dead-lettered. Must be >= 1.
            base_delay: Seconds used as the unit of the backoff formula.
                Must be >= 0.
            max_delay: Upper bound in seconds on the delay, applied before
                jitter. Must be >= 0.
            backoff: Formula used to grow the delay between attempts.
            jitter: If true, each delay is scaled by a random factor in
                [0.5, 1.0).

        Raises:
            ValueError: If any of the numeric limits above is out of range.
        """
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        # Reject negative delays up front. Otherwise time.sleep() would raise
        # ValueError in the middle of a retry, hiding the job's real error and
        # skipping dead-lettering.
        if base_delay < 0:
            raise ValueError("base_delay must be non-negative")
        if max_delay < 0:
            raise ValueError("max_delay must be non-negative")
        self._max_attempts = max_attempts
        self._base_delay = base_delay
        self._max_delay = max_delay
        self._backoff = backoff
        self._jitter = jitter
        # (job, error) pairs for jobs that exhausted their attempts.
        self._dead_letters: List[tuple] = []
        # Guards the counters and the dead letter list.
        self._lock = threading.RLock()
        self._total_submitted: int = 0
        self._total_succeeded: int = 0
        self._total_failed: int = 0
        # Hooks
        self._on_retry: List[Callable[[RetryJob, Exception, int], None]] = []
        self._on_failure: List[Callable[[RetryJob, Exception], None]] = []
        self._on_success: List[Callable[[RetryJob, Any], None]] = []

    # ---------- submit ----------

    def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Run ``func(*args, **kwargs)`` with retry and return its result.

        Blocks the caller for the whole retry sequence.

        Raises:
            Exception: The error from the final attempt if all attempts fail.
                The job is added to the dead letters before re-raising.
        """
        import uuid

        job = RetryJob(
            id=str(uuid.uuid4())[:8],
            func=func,
            args=args,
            kwargs=kwargs,
        )
        with self._lock:
            self._total_submitted += 1
        return self._execute(job)

    def _execute(self, job: RetryJob) -> Any:
        """Run ``job`` until it succeeds or ``max_attempts`` executions have failed."""
        attempt = 0
        while True:
            # Only the job call itself is guarded. Success-path bookkeeping
            # lives in ``else`` so it can never cause the job to run again.
            try:
                result = job.execute()
            except Exception as e:
                job.last_error = e
                job.attempts += 1
                attempt += 1

                if attempt >= self._max_attempts:
                    self._notify_failure(job, e)
                    with self._lock:
                        self._total_failed += 1
                        self._dead_letters.append((job, e))
                    raise

                self._notify_retry(job, e, attempt)
                time.sleep(self._compute_delay(attempt))
            else:
                self._notify_success(job, result)
                with self._lock:
                    self._total_succeeded += 1
                return result

    def _compute_delay(self, attempt: int) -> float:
        """Return the sleep time in seconds after failed attempt number ``attempt`` (1-based)."""
        if self._backoff == BackoffStrategy.CONSTANT:
            delay = self._base_delay
        elif self._backoff == BackoffStrategy.LINEAR:
            delay = self._base_delay * attempt
        else:  # EXPONENTIAL
            try:
                delay = self._base_delay * (2 ** (attempt - 1))
            except OverflowError:
                # For very large attempt counts, 2 ** n no longer converts to a
                # float. Any positive base would be clamped to max_delay anyway.
                delay = self._max_delay if self._base_delay > 0 else 0.0

        delay = min(delay, self._max_delay)

        if self._jitter:
            delay = delay * (0.5 + random.random() * 0.5)  # 50%-100% of delay

        return delay

    # ---------- hooks ----------

    def on_retry(self, callback: Callable[[RetryJob, Exception, int], None]) -> None:
        """Register ``callback(job, error, attempt)``, called before each retry sleep."""
        self._on_retry.append(callback)

    def on_failure(self, callback: Callable[[RetryJob, Exception], None]) -> None:
        """Register ``callback(job, error)``, called when a job is dead-lettered."""
        self._on_failure.append(callback)

    def on_success(self, callback: Callable[[RetryJob, Any], None]) -> None:
        """Register ``callback(job, result)``, called when a job succeeds."""
        self._on_success.append(callback)

    @staticmethod
    def _run_hooks(hooks: List[Callable[..., None]], *args: Any) -> None:
        """Call each hook with ``args``, logging and suppressing any exception it raises.

        Hooks are best-effort. A failing callback must not change the job's
        outcome or stop later hooks from running, but the failure is logged
        rather than silently dropped.
        """
        # Iterate a snapshot so a hook that registers hooks cannot extend this pass.
        for callback in list(hooks):
            try:
                callback(*args)
            except Exception:
                import logging

                logging.getLogger(__name__).warning(
                    "RetryQueue hook %r raised an exception", callback, exc_info=True
                )

    def _notify_retry(self, job, error, attempt):
        self._run_hooks(self._on_retry, job, error, attempt)

    def _notify_failure(self, job, error):
        self._run_hooks(self._on_failure, job, error)

    def _notify_success(self, job, result):
        self._run_hooks(self._on_success, job, result)

    # ---------- dead letters ----------

    @property
    def dead_letters(self) -> List[tuple]:
        """Snapshot copy of the ``(job, error)`` pairs for permanently failed jobs."""
        with self._lock:
            return list(self._dead_letters)

    def clear_dead_letters(self) -> None:
        """Discard all recorded dead letters."""
        with self._lock:
            self._dead_letters.clear()

    def retry_dead_letter(self, index: int) -> Any:
        """Re-submit a dead letter job.

        The entry at ``index`` is removed from the dead letters and its
        attempt state is reset before it is run again. If it fails again, it
        is re-added to the dead letters. The retried run counts toward
        ``total_succeeded``/``total_failed`` but not ``total_submitted``.

        Raises:
            IndexError: If ``index`` is negative or out of range.
        """
        with self._lock:
            if index < 0 or index >= len(self._dead_letters):
                raise IndexError("dead letter index out of range")
            job, _ = self._dead_letters.pop(index)
        job.attempts = 0
        job.last_error = None
        return self._execute(job)

    # ---------- stats ----------

    @property
    def stats(self) -> Dict[str, Any]:
        """Counters and configuration as a plain dict, read under the lock."""
        with self._lock:
            return {
                "total_submitted": self._total_submitted,
                "total_succeeded": self._total_succeeded,
                "total_failed": self._total_failed,
                "dead_letter_count": len(self._dead_letters),
                "max_attempts": self._max_attempts,
                "backoff": self._backoff.value,
            }