"""Resilience for model calls.
Two pieces:
* :class:`RetryPolicy` — a small dataclass describing the backoff
schedule (max attempts, initial delay, multiplier, max cap, jitter).
* :func:`classify_model_error` — inspects a raw exception from any
model SDK and maps it to our taxonomy
(:class:`~jeevesagent.TransientModelError` /
:class:`~jeevesagent.RateLimitError` / :class:`~jeevesagent.AuthenticationError`
/ etc.). Lazy imports so we never require an SDK that isn't
installed.
The actual retry loop lives in
:class:`~jeevesagent.model.retrying.RetryingModel`, which wraps any
:class:`~jeevesagent.Model` and runs every call through this
policy + classifier pair. Splitting policy/classification from the
retry mechanics keeps each piece testable in isolation and lets
callers reuse the classifier for non-Agent code (e.g. cron jobs
that hit the same SDK).
"""
from __future__ import annotations
import random
from dataclasses import dataclass
from ..core.errors import (
AuthenticationError,
ContentFilterError,
InvalidRequestError,
ModelError,
PermanentModelError,
RateLimitError,
TransientModelError,
)
# Names exported by ``from <this module> import *``; the ``_classify_*``
# and other underscore-prefixed helpers below are intentionally left out.
__all__ = [
    "RetryPolicy",
    "classify_model_error",
    "compute_backoff",
]
# ---------------------------------------------------------------------------
# RetryPolicy
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True, slots=True)
class RetryPolicy:
"""Exponential-backoff-with-jitter retry schedule.
The default is sensible for production: up to **3 attempts**
(one initial + two retries), starting at 1 s, doubling each
attempt, capped at 30 s, with ±10% jitter so synchronised
clients don't reform a thundering herd.
Examples::
# default — sensible for most apps
RetryPolicy()
# disable retries (fail fast)
RetryPolicy.disabled()
# aggressive — survives long provider blips
RetryPolicy.aggressive()
# tuned to a specific SLO
RetryPolicy(max_attempts=4, initial_delay_s=0.5, max_delay_s=15)
The schedule applies *between* attempts: the first call has no
delay, the second is delayed by ``initial_delay_s`` (± jitter),
the third by ``initial_delay_s * multiplier`` (± jitter), etc.,
each capped at ``max_delay_s``. Provider-supplied
``Retry-After`` hints (carried on
:class:`~jeevesagent.RateLimitError.retry_after`) override the
computed delay when they ask for *more* time — we never sleep
less than the provider asked for.
"""
max_attempts: int = 3
"""Maximum total attempts including the first call. ``1`` means
no retries; the call either succeeds or raises immediately. The
minimum-meaningful retry policy is therefore ``max_attempts=2``."""
initial_delay_s: float = 1.0
"""Backoff before the FIRST retry (i.e. between attempts 1 and 2).
Subsequent retries use ``initial_delay_s * multiplier**n``."""
max_delay_s: float = 30.0
"""Cap on any single backoff. Prevents runaway sleeps when
``multiplier`` is large or ``max_attempts`` is high."""
multiplier: float = 2.0
"""Geometric growth between successive retries. ``2.0`` doubles
each time; ``1.0`` makes the policy linear (fixed-interval)."""
jitter: float = 0.1
"""Fractional ±jitter applied to each computed delay. ``0.1`` =
±10%. Set to ``0`` for deterministic backoff (useful in tests)."""
[docs]
@classmethod
def disabled(cls) -> RetryPolicy:
"""Single attempt, no retries — fail fast on any error."""
return cls(max_attempts=1, initial_delay_s=0.0, jitter=0.0)
[docs]
@classmethod
def aggressive(cls) -> RetryPolicy:
"""Up to 6 attempts, faster initial backoff, longer cap.
Use when the underlying provider is known-flaky and the
caller prefers slow success over fast failure."""
return cls(
max_attempts=6,
initial_delay_s=0.5,
max_delay_s=60.0,
multiplier=2.0,
)
[docs]
def is_enabled(self) -> bool:
"""``True`` when the policy permits at least one retry."""
return self.max_attempts >= 2
def compute_backoff(
    policy: RetryPolicy,
    attempt: int,
    *,
    retry_after: float | None = None,
    rng: random.Random | None = None,
) -> float:
    """Return the delay in seconds to wait before retry number ``attempt``.

    ``attempt`` is 1-indexed: ``attempt=1`` is the delay before the
    first *retry* (between attempts 1 and 2 of ``max_attempts``).
    Values below 1 are treated like 1.

    The computed delay is ``initial_delay_s * multiplier**(attempt-1)``
    capped at ``policy.max_delay_s``; jitter is applied *after* the
    cap, so a capped delay can land up to ``jitter`` above or below
    ``max_delay_s``. The result is never negative.

    ``retry_after`` (provider hint, e.g. from a 429 ``Retry-After``
    header) acts as a *floor*: when it is larger than the computed
    delay it is returned as-is, even if that exceeds
    ``policy.max_delay_s`` — the provider is more authoritative than
    our heuristic. A smaller hint is ignored.

    Args:
        policy: Backoff schedule to apply.
        attempt: 1-indexed retry number.
        retry_after: Optional provider-requested minimum wait, in seconds.
        rng: Optional random source for jitter (pass a seeded
            ``random.Random`` for reproducible delays). Defaults to
            the module-level ``random`` functions.

    Returns:
        Seconds to wait; ``0.0`` when the policy allows no retries.
    """
    if policy.max_attempts <= 1:
        return 0.0

    exponent = max(0, attempt - 1)
    try:
        delay = policy.initial_delay_s * (policy.multiplier ** exponent)
    except OverflowError:
        # The growth factor no longer fits in a float for very large
        # attempt numbers; the schedule is far past the cap by then,
        # so saturate instead of crashing the retry loop.
        delay = policy.max_delay_s if policy.initial_delay_s > 0 else 0.0
    delay = min(delay, policy.max_delay_s)

    if policy.jitter > 0:
        source = rng if rng is not None else random
        # Symmetric ±jitter around the (already capped) delay.
        delay *= 1.0 + source.uniform(-policy.jitter, policy.jitter)
    delay = max(0.0, delay)

    if retry_after is not None and retry_after > delay:
        # Provider asked for more time than we would have waited.
        return retry_after
    return delay
# ---------------------------------------------------------------------------
# Classification — SDK exception → our taxonomy
# ---------------------------------------------------------------------------
def classify_model_error(exc: BaseException) -> ModelError | None:
    """Translate an SDK exception into the framework's error taxonomy.

    An exception that is already a :class:`ModelError` is returned
    unchanged, so classifying twice never re-wraps it. Otherwise the
    OpenAI, Anthropic and ``httpx`` classifiers are tried in that
    order and the first match wins, yielding one of
    :class:`~jeevesagent.TransientModelError` /
    :class:`~jeevesagent.RateLimitError` /
    :class:`~jeevesagent.AuthenticationError` /
    :class:`~jeevesagent.InvalidRequestError` /
    :class:`~jeevesagent.ContentFilterError` /
    :class:`~jeevesagent.PermanentModelError`.

    Returns ``None`` when no classifier recognises the exception as a
    model-call failure; the caller decides whether to wrap it in
    something else or let it propagate.

    Each classifier imports its SDK lazily, so e.g. OpenAI
    classification works without the ``anthropic`` package installed,
    and vice versa.
    """
    if isinstance(exc, ModelError):
        return exc

    for classify in (_classify_openai, _classify_anthropic, _classify_httpx):
        mapped = classify(exc)
        if mapped is not None:
            return mapped
    return None
def _classify_openai(exc: BaseException) -> ModelError | None:
    """Map an exception raised by the ``openai`` SDK to the framework taxonomy.

    SDK exception classes are looked up with ``getattr`` and an
    empty-tuple default; ``isinstance`` against an empty tuple is
    always false, so a class missing from the installed SDK version
    simply never matches.

    Status-bearing errors without a dedicated branch are transient
    for HTTP 5xx and for 408/409 (which the SDK documents as
    retryable), and permanent otherwise.

    Args:
        exc: The exception to inspect.

    Returns:
        A classified error carrying ``exc`` as its ``cause``, or
        ``None`` when ``openai`` is not importable or ``exc`` is not
        one of the SDK exception types checked here.
    """
    try:
        import openai
    except ImportError:
        return None

    # Rate limit is a subclass of APIStatusError; check it first.
    if isinstance(exc, getattr(openai, "RateLimitError", ())):
        return RateLimitError(
            f"OpenAI rate limit: {exc}",
            retry_after=_extract_retry_after(exc),
            cause=exc,
        )

    if isinstance(exc, getattr(openai, "AuthenticationError", ())):
        return AuthenticationError(f"OpenAI auth failed: {exc}", cause=exc)

    if isinstance(exc, getattr(openai, "PermissionDeniedError", ())):
        return AuthenticationError(
            f"OpenAI permission denied: {exc}", cause=exc
        )

    if isinstance(exc, getattr(openai, "BadRequestError", ())):
        # OpenAI's BadRequestError covers content-filter rejections
        # too; the message/body markers disambiguate.
        if _is_content_filter(exc):
            return ContentFilterError(
                f"OpenAI content filter: {exc}", cause=exc
            )
        return InvalidRequestError(f"OpenAI bad request: {exc}", cause=exc)

    transient_types = (
        getattr(openai, "APITimeoutError", ()),
        getattr(openai, "APIConnectionError", ()),
        getattr(openai, "InternalServerError", ()),
    )
    if isinstance(exc, transient_types):
        return TransientModelError(
            f"OpenAI transient: {exc}",
            retry_after=_extract_retry_after(exc),
            cause=exc,
        )

    if isinstance(exc, getattr(openai, "APIStatusError", ())):
        # Anything else with an HTTP status from the OpenAI SDK.
        status = getattr(exc, "status_code", None)
        if status is not None:
            code = int(status)
            # 408 (request timeout) and 409 (conflict) are worth
            # retrying just like server-side 5xx failures.
            if code in (408, 409) or 500 <= code < 600:
                return TransientModelError(
                    f"OpenAI {status}: {exc}",
                    retry_after=_extract_retry_after(exc),
                    cause=exc,
                )
        return PermanentModelError(f"OpenAI error: {exc}", cause=exc)

    return None
def _classify_anthropic(exc: BaseException) -> ModelError | None:
    """Map an exception raised by the ``anthropic`` SDK to the framework taxonomy.

    SDK exception classes are looked up with ``getattr`` and an
    empty-tuple default; ``isinstance`` against an empty tuple is
    always false, so a class missing from the installed SDK version
    simply never matches.

    Status-bearing errors without a dedicated branch are transient
    for HTTP 5xx and for 408/409 (which the SDK documents as
    retryable), and permanent otherwise.

    Args:
        exc: The exception to inspect.

    Returns:
        A classified error carrying ``exc`` as its ``cause``, or
        ``None`` when ``anthropic`` is not importable or ``exc`` is
        not one of the SDK exception types checked here.
    """
    try:
        import anthropic
    except ImportError:
        return None

    # The rate-limit check runs before the generic status fallback so
    # it gets its dedicated error type.
    if isinstance(exc, getattr(anthropic, "RateLimitError", ())):
        return RateLimitError(
            f"Anthropic rate limit: {exc}",
            retry_after=_extract_retry_after(exc),
            cause=exc,
        )

    if isinstance(exc, getattr(anthropic, "AuthenticationError", ())):
        return AuthenticationError(
            f"Anthropic auth failed: {exc}", cause=exc
        )

    if isinstance(exc, getattr(anthropic, "PermissionDeniedError", ())):
        return AuthenticationError(
            f"Anthropic permission denied: {exc}", cause=exc
        )

    if isinstance(exc, getattr(anthropic, "BadRequestError", ())):
        # A 400 may be a content-filter rejection rather than a
        # malformed request; the message/body markers disambiguate.
        if _is_content_filter(exc):
            return ContentFilterError(
                f"Anthropic content filter: {exc}", cause=exc
            )
        return InvalidRequestError(
            f"Anthropic bad request: {exc}", cause=exc
        )

    transient_types = (
        getattr(anthropic, "APITimeoutError", ()),
        getattr(anthropic, "APIConnectionError", ()),
        getattr(anthropic, "InternalServerError", ()),
    )
    if isinstance(exc, transient_types):
        return TransientModelError(
            f"Anthropic transient: {exc}",
            retry_after=_extract_retry_after(exc),
            cause=exc,
        )

    if isinstance(exc, getattr(anthropic, "APIStatusError", ())):
        # Anything else with an HTTP status from the Anthropic SDK.
        status = getattr(exc, "status_code", None)
        if status is not None:
            code = int(status)
            # 408 (request timeout) and 409 (conflict) are worth
            # retrying just like server-side 5xx failures.
            if code in (408, 409) or 500 <= code < 600:
                return TransientModelError(
                    f"Anthropic {status}: {exc}",
                    retry_after=_extract_retry_after(exc),
                    cause=exc,
                )
        return PermanentModelError(f"Anthropic error: {exc}", cause=exc)

    return None
def _classify_httpx(exc: BaseException) -> ModelError | None:
    """Classify bare ``httpx`` network failures as transient.

    Such exceptions surface from custom transports (non-OpenAI/Anthropic
    providers via LiteLLM, custom HTTP-based Models). Only
    network-layer failures are handled here: timeouts, connect errors,
    read/write errors and remote protocol errors. HTTP status
    responses are not, since ``httpx`` does not raise for them on its
    own.

    Returns ``None`` when ``httpx`` is not importable or ``exc`` is
    not one of those network-layer exception types.
    """
    try:
        import httpx
    except ImportError:
        return None

    # A class absent from the installed httpx resolves to ``()``,
    # which ``isinstance`` treats as "matches nothing".
    network_error_names = (
        "TimeoutException",
        "ConnectError",
        "ReadError",
        "WriteError",
        "RemoteProtocolError",
    )
    network_errors = tuple(
        getattr(httpx, name, ()) for name in network_error_names
    )
    if not isinstance(exc, network_errors):
        return None
    return TransientModelError(f"network: {exc}", cause=exc)
# ---------------------------------------------------------------------------
# Helpers — best-effort metadata extraction from heterogeneous SDK errors
# ---------------------------------------------------------------------------
def _extract_retry_after(exc: BaseException) -> float | None:
    """Read a ``Retry-After`` hint from the response attached to ``exc``.

    The header is looked up on ``exc.response.headers`` (under both
    ``retry-after`` and ``Retry-After``). Exceptions without a
    ``response``, or whose response has no ``headers``, yield ``None``.

    Both header forms allowed by HTTP are understood: a number of
    seconds, and an HTTP-date, which is converted to the number of
    seconds from now. Values that are unparseable, ``nan`` or infinite
    yield ``None`` so a bogus header can never request an endless
    sleep; negative values and dates in the past are clamped to
    ``0.0``.

    Returns:
        The requested wait in seconds (``>= 0``), or ``None`` when no
        usable hint is present.
    """
    # Function-scope imports keep this helper self-contained.
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    headers = getattr(getattr(exc, "response", None), "headers", None)
    if headers is None:
        return None

    raw = headers.get("retry-after") or headers.get("Retry-After")
    if not raw:
        return None

    try:
        seconds = float(raw)
    except (TypeError, ValueError):
        # Not delay-seconds; the only other valid form is an HTTP-date.
        if not isinstance(raw, str):
            return None
        try:
            when = parsedate_to_datetime(raw)
        except (TypeError, ValueError, IndexError, OverflowError):
            return None
        if when.tzinfo is None:
            # A date parsed without zone information is read as UTC.
            when = when.replace(tzinfo=timezone.utc)
        seconds = (when - datetime.now(timezone.utc)).total_seconds()

    if not math.isfinite(seconds):
        return None
    return max(0.0, seconds)
# Substrings (matched case-insensitively) that mark an error message or
# error code as a content-filter rejection.
_CONTENT_FILTER_HINTS = (
    "content_filter",
    "content_policy",
    "safety",
)


def _is_content_filter(exc: BaseException) -> bool:
    """Heuristically decide whether ``exc`` is a content-filter rejection.

    The check is substring-based and case-insensitive against
    ``_CONTENT_FILTER_HINTS``. It looks at, in order:

    1. ``str(exc)``;
    2. the ``code`` and ``type`` fields of ``exc.body`` when the body
       is a dict — both fields are examined, so a non-matching
       ``code`` does not hide a matching ``type``;
    3. the same two fields of a nested ``body["error"]`` dict, for
       bodies that wrap the error details in an ``error`` object.

    Non-string field values are ignored.

    Returns:
        ``True`` when any inspected text contains a hint.
    """

    def mentions_filter(text: object) -> bool:
        # Only strings can carry a marker; None/ints/etc. never match.
        return isinstance(text, str) and any(
            hint in text.lower() for hint in _CONTENT_FILTER_HINTS
        )

    if mentions_filter(str(exc)):
        return True

    body = getattr(exc, "body", None)
    if not isinstance(body, dict):
        return False

    payloads = [body]
    nested = body.get("error")
    if isinstance(nested, dict):
        payloads.append(nested)

    return any(
        mentions_filter(payload.get(field))
        for payload in payloads
        for field in ("code", "type")
    )