Source code for jeevesagent.governance.budget
"""Token / call / cost budgets.
:class:`StandardBudget` enforces hard limits on tokens, cost, and
wall clock; emits a soft warning at a configurable threshold.
:class:`NoBudget` is the always-allow stub used when the user has
opted out of governance entirely.
**Multi-tenant accounting (M9).** ``StandardBudget`` tracks usage
per-``user_id`` so one user can't exhaust another's quota. Pass
``per_user_max_tokens`` / ``per_user_max_cost_usd`` /
``per_user_max_wall_clock`` in the :class:`BudgetConfig` to enforce
per-user caps in addition to (or instead of) the global ones. The
agent loop forwards ``user_id`` from the live :class:`RunContext`
into every ``allows_step`` / ``consume`` call automatically;
direct callers pass it explicitly via the keyword.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
import anyio
from ..core._eviction import BoundedDict
from ..core.types import BudgetStatus
_DEFAULT_MAX_USERS = 100_000
_DEFAULT_USER_TTL_SECONDS = 24 * 3600 # 24h idle
[docs]
class NoBudget:
"""Never blocks, never warns."""
[docs]
async def allows_step(
self, *, user_id: str | None = None
) -> BudgetStatus:
return BudgetStatus.ok_()
[docs]
async def consume(
self,
*,
tokens_in: int,
tokens_out: int,
cost_usd: float,
user_id: str | None = None,
) -> None:
return None
[docs]
@dataclass(slots=True)
class BudgetConfig:
"""Global + per-user budget caps.
Every ``max_*`` field has a global counterpart and a
``per_user_*`` counterpart. The global cap applies to the whole
Agent (all users combined); the per-user cap applies to each
user_id's bucket independently. A run is blocked when *either*
its user's cap or the global cap is exceeded — whichever fires
first.
Use one or both depending on what you want to enforce:
* ``max_tokens=200_000`` — Agent-wide total. Caps the whole tenant.
* ``per_user_max_tokens=10_000`` — Per user. Caps each user.
* Both — one user can't hog the global, and the global stops
runaway aggregate usage.
The warning threshold (``soft_warning_at``) is shared across
global and per-user caps.
"""
# Global caps (apply to all users combined).
max_tokens: int | None = None
max_input_tokens: int | None = None
max_output_tokens: int | None = None
max_cost_usd: float | None = None
max_wall_clock: timedelta | None = None
# Per-user caps (apply to each user_id's bucket independently).
per_user_max_tokens: int | None = None
per_user_max_input_tokens: int | None = None
per_user_max_output_tokens: int | None = None
per_user_max_cost_usd: float | None = None
per_user_max_wall_clock: timedelta | None = None
soft_warning_at: float = 0.8 # 80% triggers a warning
@dataclass(slots=True)
class _UserUsage:
"""Per-user-id bucket. Mirrors the global counters."""
tokens_in: int = 0
tokens_out: int = 0
cost: float = 0.0
started_at: datetime | None = None
def total_tokens(self) -> int:
return self.tokens_in + self.tokens_out
[docs]
class StandardBudget:
"""Hard-limited, thread-safe budget tracker with per-user
accounting.
Tracks usage globally AND per-user-id; either limit can fire.
Multi-tenant production agents should pass ``user_id`` to every
``allows_step`` / ``consume`` call (the agent loop does this
automatically from the live :class:`~jeevesagent.RunContext`).
Single-tenant code can omit it; the framework treats unspecified
user_id as the anonymous bucket.
"""
def __init__(
self,
cfg: BudgetConfig | None = None,
*,
max_users: int | None = _DEFAULT_MAX_USERS,
user_idle_ttl_seconds: float | None = _DEFAULT_USER_TTL_SECONDS,
) -> None:
self._cfg = cfg or BudgetConfig()
# Global counters.
self._tokens_in = 0
self._tokens_out = 0
self._cost = 0.0
self._started_at = datetime.now(UTC)
# Per-user counters. Bounded so a runaway tenant or
# adversarial caller can't grow this dict without limit
# (process OOM is the only ceiling otherwise). LRU evicts
# the least-recently-touched user when ``max_users`` is
# exceeded; idle TTL drops users who haven't consumed in
# ``user_idle_ttl_seconds`` (default 24h). Evicting a
# bucket *resets* that user's running totals — appropriate
# for in-process accounting where the alternative is
# unbounded growth. Pass ``max_users=None`` /
# ``user_idle_ttl_seconds=None`` to disable bounding for
# single-tenant or small fixed-tenant deployments.
self._by_user: BoundedDict[str | None, _UserUsage] = BoundedDict(
max_keys=max_users,
ttl_seconds=user_idle_ttl_seconds,
)
self._lock = anyio.Lock()
[docs]
async def allows_step(
self, *, user_id: str | None = None
) -> BudgetStatus:
async with self._lock:
blocked = self._first_block_reason(user_id)
if blocked is not None:
return BudgetStatus.blocked_(blocked)
warn = self._first_warning_reason(user_id)
if warn is not None:
return BudgetStatus.warn_(warn)
return BudgetStatus.ok_()
[docs]
async def consume(
self,
*,
tokens_in: int,
tokens_out: int,
cost_usd: float,
user_id: str | None = None,
) -> None:
async with self._lock:
self._tokens_in += tokens_in
self._tokens_out += tokens_out
self._cost += cost_usd
bucket = self._by_user.setdefault(user_id, _UserUsage())
if bucket.started_at is None:
bucket.started_at = datetime.now(UTC)
bucket.tokens_in += tokens_in
bucket.tokens_out += tokens_out
bucket.cost += cost_usd
# ---- introspection (test + ops helper) ------------------------------
[docs]
def usage_for(self, user_id: str | None) -> dict[str, float]:
"""Snapshot one user's running totals — for telemetry / ops
dashboards. Returns an empty bucket for a user who hasn't
consumed anything yet."""
bucket = self._by_user.get(user_id, _UserUsage())
return {
"tokens_in": bucket.tokens_in,
"tokens_out": bucket.tokens_out,
"tokens_total": bucket.total_tokens(),
"cost_usd": bucket.cost,
}
# ---- helpers ---------------------------------------------------------
def _total_tokens(self) -> int:
return self._tokens_in + self._tokens_out
def _elapsed(self) -> timedelta:
return datetime.now(UTC) - self._started_at
def _user_elapsed(self, user_id: str | None) -> timedelta | None:
bucket = self._by_user.get(user_id)
if bucket is None or bucket.started_at is None:
return None
return datetime.now(UTC) - bucket.started_at
def _first_block_reason(self, user_id: str | None) -> str | None:
c = self._cfg
# Global caps first.
if c.max_tokens is not None and self._total_tokens() >= c.max_tokens:
return "max_tokens"
if c.max_input_tokens is not None and self._tokens_in >= c.max_input_tokens:
return "max_input_tokens"
if c.max_output_tokens is not None and self._tokens_out >= c.max_output_tokens:
return "max_output_tokens"
if c.max_cost_usd is not None and self._cost >= c.max_cost_usd:
return "max_cost_usd"
if c.max_wall_clock is not None and self._elapsed() >= c.max_wall_clock:
return "max_wall_clock"
# Per-user caps.
u = self._by_user.get(user_id)
if u is None:
return None
if (
c.per_user_max_tokens is not None
and u.total_tokens() >= c.per_user_max_tokens
):
return "per_user_max_tokens"
if (
c.per_user_max_input_tokens is not None
and u.tokens_in >= c.per_user_max_input_tokens
):
return "per_user_max_input_tokens"
if (
c.per_user_max_output_tokens is not None
and u.tokens_out >= c.per_user_max_output_tokens
):
return "per_user_max_output_tokens"
if (
c.per_user_max_cost_usd is not None
and u.cost >= c.per_user_max_cost_usd
):
return "per_user_max_cost_usd"
if c.per_user_max_wall_clock is not None:
elapsed = self._user_elapsed(user_id)
if elapsed is not None and elapsed >= c.per_user_max_wall_clock:
return "per_user_max_wall_clock"
return None
def _first_warning_reason(self, user_id: str | None) -> str | None:
c = self._cfg
threshold = c.soft_warning_at
# Global warnings.
if c.max_tokens is not None and self._total_tokens() >= c.max_tokens * threshold:
return f"tokens at {self._total_tokens() / c.max_tokens:.0%}"
if c.max_cost_usd is not None and self._cost >= c.max_cost_usd * threshold:
return f"cost at {self._cost / c.max_cost_usd:.0%}"
# Per-user warnings.
u = self._by_user.get(user_id)
if u is None:
return None
if (
c.per_user_max_tokens is not None
and u.total_tokens() >= c.per_user_max_tokens * threshold
):
return (
f"per-user tokens at "
f"{u.total_tokens() / c.per_user_max_tokens:.0%}"
)
if (
c.per_user_max_cost_usd is not None
and u.cost >= c.per_user_max_cost_usd * threshold
):
return f"per-user cost at {u.cost / c.per_user_max_cost_usd:.0%}"
return None