Coverage for agentos/tools/retry_queue.py: 0%
120 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:47 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:47 +0800
"""
RetryQueue — synchronous retry with configurable backoff, max attempts, and a dead letter queue.

Supports:
 - Backoff strategies: exponential, constant, or linear, capped at ``max_delay``
 - Optional jitter (each delay randomly scaled to 50%-100% of its value)
 - Max attempts per job
 - Dead letter queue for permanently failed items (which can be retried later)
 - Callback hooks (on_retry, on_failure, on_success)

Jobs run in the calling thread: ``RetryQueue.submit`` blocks, sleeping between
attempts, and re-raises the last error once all attempts are exhausted.
"""
13from __future__ import annotations
15import random
16import threading
17import time
18from dataclasses import dataclass, field
19from enum import Enum
20from typing import Any, Callable, Dict, List, Optional
23# ============================================================================
24# Backoff Strategy
25# ============================================================================
class BackoffStrategy(Enum):
    """How the delay between retry attempts grows (see ``RetryQueue._compute_delay``).

    Each member's value is its own name in lowercase.
    """

    # delay = base_delay * 2 ** (attempt - 1)
    EXPONENTIAL = "exponential"
    # delay = base_delay on every retry
    CONSTANT = "constant"
    # delay = base_delay * attempt
    LINEAR = "linear"
33# ============================================================================
34# Job
35# ============================================================================
@dataclass
class RetryJob:
    """One unit of work tracked by ``RetryQueue``.

    Bundles a callable with the arguments it is invoked with, plus the
    bookkeeping fields the queue updates after each failed attempt.
    """

    id: str                                               # job identifier (submit() uses 8 chars of a UUID4)
    func: Callable[..., Any]                              # callable to invoke
    args: tuple = ()                                      # positional arguments for func
    kwargs: Dict[str, Any] = field(default_factory=dict)  # keyword arguments for func
    attempts: int = 0                                     # failed attempts recorded so far
    last_error: Optional[Exception] = None                # exception from the latest failure
    created_at: float = field(default_factory=time.time)  # time.time() when the job was created

    def execute(self) -> Any:
        """Invoke ``func`` once with the stored arguments and return its result.

        Any exception raised by ``func`` propagates unchanged; retrying is the
        caller's responsibility.
        """
        target = self.func
        positional = self.args
        named = self.kwargs
        return target(*positional, **named)
51# ============================================================================
52# RetryQueue
53# ============================================================================
class RetryQueue:
    """Synchronous retry executor with configurable backoff and a dead letter queue.

    ``submit`` runs the job immediately in the calling thread and blocks
    (via ``time.sleep``) between attempts. A job that still fails after
    ``max_attempts`` attempts is appended to ``dead_letters`` and its last
    exception is re-raised to the caller.

    Counters and the dead letter list are guarded by an internal ``RLock``;
    job execution, sleeping and hook callbacks run outside that lock.

    Usage:
        rq = RetryQueue(
            max_attempts=3,
            base_delay=1.0,
            max_delay=30.0,
        )

        def risky_call(a, b):
            # might fail...
            return a / b

        # Submit a job — if it fails, it is retried; once all attempts are
        # used up the last error is re-raised.
        try:
            result = rq.submit(risky_call, 10, 0)
        except ZeroDivisionError:
            pass

        # Check dead letters
        for job, error in rq.dead_letters:
            print(f"Job {job.id} permanently failed: {error}")
    """

    # Upper bound on the exponent used by exponential backoff. ``2.0 ** 1000``
    # is still a finite float; the cap only changes results when
    # base_delay * 2**1000 would still be below max_delay.
    _MAX_BACKOFF_EXPONENT = 1000

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
        jitter: bool = True,
    ):
        """Configure the retry policy.

        Args:
            max_attempts: Total attempts per job, the first call included.
            base_delay: Seconds used as the unit of the backoff formula.
            max_delay: Upper bound in seconds on a single delay (applied before jitter).
            backoff: How the delay grows from one retry to the next.
            jitter: If true, scale each delay by a random factor in [0.5, 1.0).

        Raises:
            ValueError: If ``max_attempts`` < 1 or either delay is negative.
        """
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        # Negative delays would otherwise only surface later, as a ValueError
        # from time.sleep() after the first failure and retry hook.
        if base_delay < 0:
            raise ValueError("base_delay must be non-negative")
        if max_delay < 0:
            raise ValueError("max_delay must be non-negative")
        self._max_attempts = max_attempts
        self._base_delay = base_delay
        self._max_delay = max_delay
        self._backoff = backoff
        self._jitter = jitter
        self._dead_letters: List[tuple] = []  # (RetryJob, Exception) pairs
        self._lock = threading.RLock()
        self._total_submitted: int = 0
        self._total_succeeded: int = 0
        self._total_failed: int = 0
        # Hooks
        self._on_retry: List[Callable[[RetryJob, Exception, int], None]] = []
        self._on_failure: List[Callable[[RetryJob, Exception], None]] = []
        self._on_success: List[Callable[[RetryJob, Any], None]] = []

    # ---------- submit ----------

    def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Run ``func(*args, **kwargs)`` with retries and return its result.

        Blocks the calling thread until the call succeeds or all attempts are
        used up.

        Raises:
            Exception: Whatever ``func`` raised on its final attempt; the job
                is also appended to ``dead_letters``.
        """
        import uuid

        job = RetryJob(
            id=str(uuid.uuid4())[:8],
            func=func,
            args=args,
            kwargs=kwargs,
        )
        with self._lock:
            self._total_submitted += 1
        return self._execute(job)

    def _execute(self, job: RetryJob) -> Any:
        """Call ``job`` until it succeeds or ``max_attempts`` is reached.

        Counters and the dead letter list are updated *before* the matching
        hook fires, so callbacks that read ``stats`` or ``dead_letters`` see
        the outcome they are being notified about.
        """
        attempt = 0
        while True:
            try:
                result = job.execute()
            except Exception as e:
                job.last_error = e
                job.attempts += 1
                attempt += 1

                if attempt >= self._max_attempts:
                    with self._lock:
                        self._total_failed += 1
                        self._dead_letters.append((job, e))
                    self._notify_failure(job, e)
                    raise

                self._notify_retry(job, e, attempt)
                time.sleep(self._compute_delay(attempt))
            else:
                with self._lock:
                    self._total_succeeded += 1
                self._notify_success(job, result)
                return result

    def _compute_delay(self, attempt: int) -> float:
        """Return the sleep time in seconds before retry number ``attempt`` (1-based)."""
        if self._backoff == BackoffStrategy.CONSTANT:
            delay = self._base_delay
        elif self._backoff == BackoffStrategy.LINEAR:
            delay = self._base_delay * attempt
        else:  # EXPONENTIAL
            # With an int power, float * 2**n raises OverflowError once n
            # reaches ~1024, before the max_delay cap below could apply.
            # A capped float power overflows to inf at worst, which min() handles.
            exponent = min(attempt - 1, self._MAX_BACKOFF_EXPONENT)
            delay = self._base_delay * (2.0 ** exponent)

        delay = min(delay, self._max_delay)

        if self._jitter:
            delay = delay * (0.5 + random.random() * 0.5)  # 50%-100% of delay

        return delay

    # ---------- hooks ----------

    def on_retry(self, callback: Callable[[RetryJob, Exception, int], None]) -> None:
        """Register ``callback(job, error, attempt)``, called before each retry sleep."""
        self._on_retry.append(callback)

    def on_failure(self, callback: Callable[[RetryJob, Exception], None]) -> None:
        """Register ``callback(job, error)``, called once a job exhausts its attempts."""
        self._on_failure.append(callback)

    def on_success(self, callback: Callable[[RetryJob, Any], None]) -> None:
        """Register ``callback(job, result)``, called when a job succeeds."""
        self._on_success.append(callback)

    @staticmethod
    def _notify(callbacks: List[Callable[..., None]], *args: Any) -> None:
        """Invoke every callback with ``args``; a failing hook never aborts the job.

        Hook errors are logged with a traceback instead of being silently
        discarded. A snapshot of the list is iterated so that a callback
        registering another hook cannot extend the current loop.
        """
        for cb in list(callbacks):
            try:
                cb(*args)
            except Exception:
                import logging

                logging.getLogger(__name__).exception(
                    "RetryQueue hook %r raised; ignoring", cb
                )

    def _notify_retry(self, job, error, attempt):
        self._notify(self._on_retry, job, error, attempt)

    def _notify_failure(self, job, error):
        self._notify(self._on_failure, job, error)

    def _notify_success(self, job, result):
        self._notify(self._on_success, job, result)

    # ---------- dead letters ----------

    @property
    def dead_letters(self) -> List[tuple]:
        """Snapshot copy of the ``(job, error)`` pairs for permanently failed jobs."""
        with self._lock:
            return list(self._dead_letters)

    def clear_dead_letters(self) -> None:
        """Discard every recorded dead letter."""
        with self._lock:
            self._dead_letters.clear()

    def retry_dead_letter(self, index: int) -> Any:
        """Re-submit a dead letter job: remove it and run it again from scratch.

        The job's attempt counter and last error are reset. The re-run is
        counted in ``total_submitted``: every execution ends in exactly one
        success or failure, so this keeps ``total_succeeded + total_failed``
        equal to ``total_submitted`` once no execution is in flight. If the
        job fails again, it is appended to the end of ``dead_letters`` and
        the error is re-raised.

        Raises:
            IndexError: If ``index`` is negative or out of range.
        """
        with self._lock:
            if index < 0 or index >= len(self._dead_letters):
                raise IndexError("dead letter index out of range")
            job, _ = self._dead_letters.pop(index)
            self._total_submitted += 1
            job.attempts = 0
            job.last_error = None
        return self._execute(job)

    # ---------- stats ----------

    @property
    def stats(self) -> Dict[str, Any]:
        """Return a point-in-time snapshot of counters and configuration."""
        with self._lock:
            return {
                "total_submitted": self._total_submitted,
                "total_succeeded": self._total_succeeded,
                "total_failed": self._total_failed,
                "dead_letter_count": len(self._dead_letters),
                "max_attempts": self._max_attempts,
                "backoff": self._backoff.value,
            }