Source code for runtimepy.net.backoff
"""
A module implementing a simple exponential-backoff interface.
"""
# built-in
import asyncio
[docs]
class ExponentialBackoff:
    """
    A class implementing a simple exponential-backoff handler.

    Each call to :meth:`sleep` waits for the currently scheduled delay and
    then schedules the next one as ``interval * 2 ** attempt``, capped at
    ``max_sleep``. The first sleep after construction or :meth:`reset` waits
    for zero seconds. :attr:`give_up` reports when ``max_tries`` sleeps have
    been performed; it is advisory only (``sleep`` never refuses to run).
    """

    # Largest exponent for which 2 ** n still converts to a finite float.
    # Clamping to it keeps 'sleep' from raising OverflowError (int too large
    # to convert to float) if a caller keeps retrying far past 'give_up'.
    _MAX_EXPONENT = 1023

    # Seconds that the next call to 'sleep' will wait for.
    wait: float

    # Number of 'sleep' calls made since the last 'reset'.
    attempt: int

    def __init__(
        self,
        interval: float = 0.1,
        max_sleep: float = 10.0,
        max_tries: int = 20,
    ) -> None:
        """
        Initialize this instance.

        :param interval: Base delay (seconds) that is scaled by a power of
                         two for each attempt.
        :param max_sleep: Upper bound (seconds) for any single delay.
        :param max_tries: Number of attempts after which 'give_up' is true.
        """

        self.interval = interval
        self.max_sleep = max_sleep
        self.max_tries = max_tries
        self.reset()

    def reset(self) -> None:
        """Reset this instance's state (no pending delay, zero attempts)."""

        self.wait = 0.0
        self.attempt = 0

    @property
    def give_up(self) -> bool:
        """
        Determine whether or not to give up based on the number of attempts.
        """
        return self.attempt >= self.max_tries

    async def sleep(self) -> None:
        """
        Sleep for the correct amount of time.

        Waits for the currently scheduled delay, then computes the delay for
        the following call and counts this attempt.
        """

        await asyncio.sleep(self.wait)

        # '**' is exponentiation; '^' would be bitwise XOR and produce a
        # non-monotonic sequence of delays (2, 3, 0, 1, 6, ...).
        exponent = min(self.attempt, self._MAX_EXPONENT)
        self.wait = min((2**exponent) * self.interval, self.max_sleep)
        self.attempt += 1