Metadata-Version: 2.4
Name: monetise-circuit-breaker
Version: 0.1.1
Summary: Circuit breaker for AI agents — pick budget-guard or loop-killer mode and stop runaway token spend or stuck agents in one wrapper. Adapters for LangChain, OpenAI Agents SDK, the Claude Agent SDK, and the LangGraph Platform SDK.
Project-URL: Homepage, https://github.com/MonetiseBG/circuit-breaker-python
Project-URL: Repository, https://github.com/MonetiseBG/circuit-breaker-python
Project-URL: Issues, https://github.com/MonetiseBG/circuit-breaker-python/issues
Author: MonetiseBG
License-Expression: Apache-2.0
License-File: LICENSE
Keywords: agent,ai,anthropic,budget-guard,circuit-breaker,claude,claude-agent-sdk,guardrail,langchain,langgraph,langgraph-sdk,llm,loop-killer,openai,openai-agents,token-limit
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.10
Provides-Extra: claude-agent-sdk
Requires-Dist: claude-agent-sdk<1.0,>=0.1; extra == 'claude-agent-sdk'
Provides-Extra: dev
Requires-Dist: langchain-core<2.0,>=0.3; extra == 'dev'
Requires-Dist: langgraph-sdk<2.0,>=0.1; extra == 'dev'
Requires-Dist: mypy>=1.11; extra == 'dev'
Requires-Dist: openai-agents<1.0,>=0.1; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8; extra == 'dev'
Provides-Extra: langchain
Requires-Dist: langchain-core<2.0,>=0.3; extra == 'langchain'
Provides-Extra: langgraph-sdk
Requires-Dist: langgraph-sdk<2.0,>=0.1; extra == 'langgraph-sdk'
Provides-Extra: openai-agents
Requires-Dist: openai-agents<1.0,>=0.1; extra == 'openai-agents'
Description-Content-Type: text/markdown

# monetise-circuit-breaker

[![CI](https://github.com/MonetiseBG/circuit-breaker-python/actions/workflows/ci.yml/badge.svg)](https://github.com/MonetiseBG/circuit-breaker-python/actions/workflows/ci.yml)

> One wrapper between you and runaway execution.

Minimal **circuit breaker** for AI agents. Wrap any supported agent and pick a
mode — the breaker cuts the run short once provider-reported usage crosses a
limit, and (optionally) refuses an oversized prompt before it is even sent.

This is the Python port of
[`@monetisebg/circuit-breaker`](https://github.com/MonetiseBG/circuit-breaker)
(the TypeScript/npm package).

- Zero-config: defaults work out of the box.
- Two modes, pick one: **`budget-guard`** (token caps) and **`loop-killer`**
  (state-repeat detection).
- Post-hoc enforcement by default: token tripping happens **after** each call
  or turn boundary, so the call that crosses the limit still counts. Use the
  optional `estimate_input_tokens` preflight (see below) to reject oversized
  initial inputs before any provider work happens.
- Visible: emits `CircuitBreakerEvent`s as the run progresses.
- Typed: raises a `CircuitBreakerError`, or routes through your `on_trip` handler.
- Optional dependencies — only install the framework you actually use.
- No bundled tokenizer: bring your own (`tiktoken`, `transformers`, provider SDK).

Shipped adapters: **LangChain**, **OpenAI Agents SDK**, **Claude Agent SDK**,
**LangGraph Platform SDK**. The core is framework-agnostic; rolling your own
adapter is a few lines.

## Install

Requires **Python ≥ 3.10**.

```bash
pip install monetise-circuit-breaker
# plus the framework you use (only the one you need):
pip install "monetise-circuit-breaker[langchain]"
pip install "monetise-circuit-breaker[openai-agents]"
pip install "monetise-circuit-breaker[claude-agent-sdk]"
pip install "monetise-circuit-breaker[langgraph-sdk]"
```

## Quick start (`budget-guard`, the default)

```python
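from agents import Agent
from monetise_circuit_breaker.openai_agents import with_circuit_breaker

agent = Agent(name="Assistant", instructions="...")

safe_agent = with_circuit_breaker(agent)  # defaults: 10k input + 10k output

await safe_agent.run("Analyze this dataset")  # call from inside an async function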
```

`budget-guard` caps input and output tokens **independently**. Default limits:
`max_input_token = 10_000`, `max_output_token = 10_000`. Token usage is read
from each provider response, so the breaker trips on the **next** call/turn
after either bucket is exceeded — the call that pushed the bucket over the
limit still counts. To reject an oversized first prompt before it is sent, pass
an optional `estimate_input_tokens` preflight (next section).

```python
with_circuit_breaker(
    agent,
    mode="budget-guard",      # optional — this is the default
    max_input_token=50_000,
    max_output_token=20_000,
)
```
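The post-hoc behaviour described above can be sketched with a toy counter. This is not the library's implementation: `ToyBudget`, `before_call`, and `after_call` are hypothetical names, and the sketch assumes the limit is checked at the start of each call and that "exceeded" means strictly greater than the limit.

```python
class ToyBudget:
    """Toy stand-in for post-hoc token accounting (hypothetical, not the library)."""

    def __init__(self, max_input_token=10_000, max_output_token=10_000):
        self.max_input_token = max_input_token
        self.max_output_token = max_output_token
        self.input_used = 0
        self.output_used = 0

    def before_call(self):
        # Trips only if an earlier call already pushed a bucket past its limit.
        if self.input_used > self.max_input_token:
            raise RuntimeError("max_input_tokens")
        if self.output_used > self.max_output_token:
            raise RuntimeError("max_output_tokens")

    def after_call(self, input_tokens, output_tokens):
        # Usage is only known once the provider has responded.
        self.input_used += input_tokens
        self.output_used += output_tokens


budget = ToyBudget(max_input_token=100, max_output_token=100)
completed = 0
tripped = None
for usage in [(60, 10), (60, 10), (60, 10)]:
    try:
        budget.before_call()
    except RuntimeError as exc:
        tripped = str(exc)
        break
    budget.after_call(*usage)
    completed += 1

# Signed headroom, following the documented `limit - usage` convention.
saved = budget.max_input_token - budget.input_used
```

In this run the second call pushes the input bucket to 120 and still completes; the third call is the one that is refused, so `saved` ends up negative.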

### Preflight — `estimate_input_tokens`

```python
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

with_circuit_breaker(
    agent,
    max_input_token=50_000,
    # `input` is whatever you pass to the wrapped call (its shape depends on the adapter)
    estimate_input_tokens=lambda input: (
        len(enc.encode(input)) if isinstance(input, str) else None
    ),
)
```

If the estimate exceeds `max_input_token` the wrapper raises
`CircuitBreakerError` with `reason == "max_input_tokens"` **before** the
underlying runnable / runner / query is called. Return `None` to skip the
check for that invocation (e.g. when you can't tokenize the input shape). The
preflight is opt-in: without an estimator, enforcement is post-hoc only. No
tokenizer is bundled.
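If you don't want to pull in a tokenizer, a rough heuristic can serve as the estimator. The sketch below assumes roughly four characters per token, which is only a ballpark and varies by model and language; `rough_estimate` is a hypothetical helper, and the `{"input": ...}` branch mirrors the LangChain-style payload used elsewhere in this README.

```python
import math


def rough_estimate(input):
    """Return an approximate token count, or None to skip the preflight."""
    # Assumption: about 4 characters per token; a real tokenizer will differ.
    if isinstance(input, str):
        return math.ceil(len(input) / 4)
    if isinstance(input, dict) and isinstance(input.get("input"), str):
        return math.ceil(len(input["input"]) / 4)
    # Unknown shape: returning None tells the wrapper to skip the check.
    return None
```

It would be passed as `estimate_input_tokens=rough_estimate`. Because the heuristic can under-count, treat it as a coarse guard and leave headroom in `max_input_token`.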

## `loop-killer` mode

```python
with_circuit_breaker(
    agent,
    mode="loop-killer",
    max_retries=3,               # default
    detect_repeated_state=True,  # default — hashes each step's state
)
```

With `detect_repeated_state=True` (default), the breaker hashes each step's
state (the latest message / turn input) and trips when any single state recurs
more than `max_retries` times. Set `detect_repeated_state=False` to fall back
to a plain iteration cap.
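As an illustration of repeat detection, here is a minimal sketch that hashes each state and counts recurrences. It is not the library's code: `first_trip_index` is a hypothetical helper, SHA-256 is an arbitrary choice of hash, and it assumes a "recurrence" means an occurrence beyond the first.

```python
import hashlib
from collections import Counter


def first_trip_index(states, max_retries=3):
    """Return the index of the step that would trip, or None if none does."""
    seen = Counter()
    for index, state in enumerate(states):
        key = hashlib.sha256(state.encode("utf-8")).hexdigest()
        seen[key] += 1
        recurrences = seen[key] - 1  # the first occurrence is not a repeat
        if recurrences > max_retries:
            return index
    return None
```

Under these assumptions, the default `max_retries=3` lets the same state appear four times and stops the run on the fifth, while an agent alternating between two states a few times never trips.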

## Visibility — `on_event`

The breaker emits events you can log, surface in your UI, or pipe to your
observability stack.

```python
def handle(event):  # event: CircuitBreakerEvent
    print(event)

with_circuit_breaker(agent, mode="loop-killer", max_retries=2, on_event=handle)
```

`CircuitBreakerEvent` shapes (frozen dataclasses):

| Event                                                  | When                                              | Modes        |
| ------------------------------------------------------ | ------------------------------------------------- | ------------ |
| `RetryEvent(type="retry", retries: int)`               | A state recurred (`detect_repeated_state=True`) or each iteration past the first (`detect_repeated_state=False`) | loop-killer  |
| `StopEvent(type="stop", reason: StopReason, saved: int)` | The breaker tripped                             | both         |

`saved` is signed `limit - usage`: positive means headroom that won't be
spent, negative means the call that pushed us over the limit still counted.

`StopReason` is one of `"max_input_tokens"`, `"max_output_tokens"`,
`"max_retries"`, `"repeated_state"`.
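A handler will typically branch on `event.type`. The sketch below defines stand-in dataclasses that mirror the documented fields so it runs without the package installed; in real code the library's own event objects would arrive instead. `describe` is a hypothetical helper.

```python
from dataclasses import dataclass


# Stand-ins mirroring the documented event fields (not the library's classes).
@dataclass(frozen=True)
class RetryEvent:
    retries: int
    type: str = "retry"


@dataclass(frozen=True)
class StopEvent:
    reason: str
    saved: int
    type: str = "stop"


def describe(event):
    """Turn an event into a one-line, human-readable message."""
    if event.type == "retry":
        return f"retry #{event.retries}"
    if event.type == "stop":
        if event.saved >= 0:
            return f"stopped ({event.reason}), headroom left: {event.saved}"
        return f"stopped ({event.reason}), overshot by {-event.saved}"
    return f"unknown event: {event!r}"


# A 10_000-token limit with 10_120 tokens used gives saved = -120.
message = describe(StopEvent(reason="max_input_tokens", saved=10_000 - 10_120))
```

A function like this could be wrapped in an `on_event` callback that forwards the string to your logger or UI.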

## Graceful handling — `on_trip`

Provide `on_trip` to suppress the raise and return a fallback value:

```python
safe = with_circuit_breaker(
    agent,
    max_input_token=50_000,
    max_output_token=20_000,
    on_trip=lambda ctx: {
        "output": "Sorry, I had to stop early.",
        "reason": ctx.reason,
        "metrics": ctx.metrics,
    },
)
```

`on_trip` receives a `TripContext` (frozen dataclass) and may be sync or async:

```python
@dataclass(frozen=True)
class TripContext:
    reason: StopReason
    mode: Mode                  # "budget-guard" | "loop-killer"
    metrics: Metrics            # iterations, retries, tokens
    limits: ResolvedLimits      # the limits actually in force
    saved: int
    message: str
```
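Since `on_trip` may be sync or async, a caller has to handle both. The following is a sketch of one common way to do that, not the wrapper's actual dispatch code; `call_on_trip` is hypothetical and a `SimpleNamespace` stands in for `TripContext`.

```python
import asyncio
import inspect
from types import SimpleNamespace


async def call_on_trip(on_trip, ctx):
    """Invoke a sync or async handler and return its final value."""
    result = on_trip(ctx)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_fallback(ctx):
    return {"output": "Sorry, I had to stop early.", "reason": ctx.reason}


async def async_fallback(ctx):
    await asyncio.sleep(0)  # e.g. an awaited notification would go here
    return {"output": "Sorry, I had to stop early.", "reason": ctx.reason}


ctx = SimpleNamespace(reason="max_output_tokens", saved=-35)  # stand-in context
sync_result = asyncio.run(call_on_trip(sync_fallback, ctx))
async_result = asyncio.run(call_on_trip(async_fallback, ctx))
```

Both handlers yield the same fallback value, so either style can be passed as `on_trip`.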

## LangChain

```python
from monetise_circuit_breaker.langchain import with_circuit_breaker

safe_executor = with_circuit_breaker(
    executor,                   # any LangChain Runnable (e.g. AgentExecutor)
    max_input_token=50_000,
    max_output_token=20_000,
)

await safe_executor.ainvoke({"input": "..."})   # or safe_executor.invoke(...)
```

Iterations are counted on `on_llm_start` / `on_chat_model_start`. Token usage
is read from `on_llm_end` with provider-agnostic extraction (OpenAI
`token_usage`, Anthropic `usage`, the newer `usage_metadata`). The handler sets
`raise_error = True`, so a trip propagates out of LangChain's callback dispatch
instead of being swallowed.

You can also attach the handler directly:

```python
from monetise_circuit_breaker.langchain import CircuitBreakerCallback

breaker = CircuitBreakerCallback(max_input_token=50_000)
await runnable.ainvoke(payload, config={"callbacks": [breaker]})
```
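To make the provider-agnostic extraction mentioned above concrete, here is a sketch over plain dicts. It is an illustration only: `extract_usage` is a hypothetical helper, the lookup order is an assumption, and the key names are the ones these providers commonly report (`prompt_tokens` / `completion_tokens` for OpenAI, `input_tokens` / `output_tokens` for Anthropic and `usage_metadata`).

```python
def extract_usage(payload):
    """Return (input_tokens, output_tokens) from a usage-bearing dict, or None."""
    meta = payload.get("usage_metadata")
    if isinstance(meta, dict):
        return meta.get("input_tokens", 0), meta.get("output_tokens", 0)

    openai_style = payload.get("token_usage")
    if isinstance(openai_style, dict):
        return (
            openai_style.get("prompt_tokens", 0),
            openai_style.get("completion_tokens", 0),
        )

    anthropic_style = payload.get("usage")
    if isinstance(anthropic_style, dict):
        return (
            anthropic_style.get("input_tokens", 0),
            anthropic_style.get("output_tokens", 0),
        )

    return None  # no recognised usage shape
```

Returning `None` for unrecognised shapes lets a caller skip accounting for that response rather than record zeros.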

## OpenAI Agents SDK

```python
from agents import Agent
from monetise_circuit_breaker.openai_agents import with_circuit_breaker

agent = Agent(name="Assistant", instructions="...", tools=[...])

safe_agent = with_circuit_breaker(agent, mode="loop-killer", max_retries=3)

await safe_agent.run("Hello")
```

Iterations are counted on each LLM call (one per turn); the most recent input
item is hashed for loop detection. Tokens are read from the cumulative
`RunContext.usage` snapshot at each turn boundary. Because the SDK **awaits**
its lifecycle hooks, a trip raises `CircuitBreakerError` straight out of
`Runner.run`, with no extra cancellation plumbing (an `AbortSignal` equivalent) needed. Extra keyword arguments to
`run(...)` are forwarded to `Runner.run`; a caller-supplied `hooks` object is
composed with the breaker's, and `run_config` (passed at wrap time or per call)
is forwarded too.
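Because the usage snapshot is cumulative, each reading is a running total rather than a per-turn figure. The sketch below shows the arithmetic for recovering per-turn deltas from such totals; `deltas_from_snapshots` is a hypothetical helper, and the `(input, output)` tuples are made-up sample values.

```python
def deltas_from_snapshots(snapshots):
    """Yield per-turn (input, output) deltas from cumulative totals."""
    previous_in, previous_out = 0, 0
    for total_in, total_out in snapshots:
        yield total_in - previous_in, total_out - previous_out
        previous_in, previous_out = total_in, total_out


# Three turn boundaries, each reporting totals so far.
per_turn = list(deltas_from_snapshots([(100, 20), (250, 45), (250, 60)]))
```

Comparing the latest total against the limit directly avoids this bookkeeping, which is why a snapshot-style API is convenient when the framework already tracks totals.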

> Streaming is not yet supported. Use the core `CircuitBreaker` directly if you
> need it.

## Claude Agent SDK

```python
from claude_agent_sdk import query
from monetise_circuit_breaker.claude_agent_sdk import with_circuit_breaker

safe_query = with_circuit_breaker(
    query,
    max_input_token=50_000,
    max_output_token=20_000,
)

async for message in safe_query(prompt="Analyze this repo"):
    ...  # messages stream through untouched
```

The wrapper takes the SDK's `query` function and returns a drop-in replacement
with the same call signature. It's itself an async generator — messages stream
through unchanged while the breaker watches them.

Iterations are counted on each `AssistantMessage` (one per turn); its content
blocks are hashed for `loop-killer` detection. Tokens are read from each
assistant message's `usage` (input counts `input_tokens` plus cache
read/creation tokens). When a limit is hit, iteration stops and the underlying
generator is closed. `estimate_input_tokens` receives the `prompt`.

With `on_trip`, the callback's return value is yielded as the generator's final
item instead of raising.
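The pass-through behaviour can be pictured as an async generator wrapping another one. This is a simplified sketch, not the adapter's code: `fake_query` and `guard` are hypothetical, messages are modelled as plain dicts rather than SDK message objects, and the limit is checked when the next message arrives.

```python
import asyncio

closed = []  # records that the underlying generator was closed


async def fake_query(prompt):
    # Stand-in for the SDK's `query`; each message carries its own usage.
    try:
        for _ in range(5):
            yield {"usage": {"input_tokens": 30, "cache_read_input_tokens": 10,
                             "cache_creation_input_tokens": 0, "output_tokens": 5}}
    finally:
        closed.append(True)


def guard(query, max_input_token):
    async def guarded(**kwargs):
        used = 0
        stream = query(**kwargs)
        try:
            async for message in stream:
                if used > max_input_token:  # an earlier message crossed the limit
                    raise RuntimeError("max_input_tokens")
                usage = message["usage"]
                # Input counts plain input plus cache read/creation tokens.
                used += (usage["input_tokens"]
                         + usage["cache_read_input_tokens"]
                         + usage["cache_creation_input_tokens"])
                yield message
        finally:
            await stream.aclose()  # make sure the underlying generator is closed
    return guarded


async def collect():
    seen, error = 0, None
    try:
        async for _ in guard(fake_query, max_input_token=100)(prompt="hi"):
            seen += 1
    except RuntimeError as exc:
        error = str(exc)
    return seen, error


seen, error = asyncio.run(collect())
```

With 40 input tokens per message and a limit of 100, three messages stream through before the fourth trips the guard and the underlying generator is closed.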

## LangGraph Platform SDK

For graphs deployed to **LangGraph Platform** and driven through the remote
`langgraph-sdk` client. (For an in-process `langgraph` graph, use the
[LangChain adapter](#langchain) — a compiled graph is a `Runnable` and
propagates callbacks.)

```python
from langgraph_sdk import get_client
from monetise_circuit_breaker.langgraph_sdk import with_circuit_breaker

client = get_client(url="http://localhost:2024")
runs = with_circuit_breaker(
    client.runs,
    max_input_token=50_000,
    max_output_token=20_000,
)

async for chunk in runs.stream(
    thread_id,
    "agent",
    input={"messages": [{"role": "user", "content": "Analyze this repo"}]},
    stream_mode="updates",
):
    ...  # chunks stream through untouched
```

The wrapper takes `client.runs` and returns an object with the same
`stream(thread_id, assistant_id, **payload)` signature.

Because the graph executes server-side, the breaker is driven off the `events`
stream mode — the only mode that reports both per-LLM-call boundaries and token
usage. The wrapper **forces `events` into the run's `stream_mode`**; if you
didn't request it, those injected chunks are consumed internally and never
yielded, so your stream is unchanged. Iterations are counted on each
`on_chat_model_start`; tokens are read from each `on_chat_model_end`'s
`usage_metadata`. For `loop-killer`, the latest input message is hashed.

On a trip the wrapper stops the local stream **and** calls
`client.runs.cancel(...)` to stop the run server-side (the run id is taken from
the `metadata` event) — closing the connection alone would leave the graph
running. `estimate_input_tokens` receives
`{"thread_id", "assistant_id", "payload"}`.

With `on_trip`, the callback's return value is yielded as the generator's final
item instead of raising.
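The "force `events`, then hide what you didn't ask for" behaviour can be sketched in two small functions. Both are hypothetical helpers, and the sketch simplifies stream chunks to `(event_name, data)` pairs; it is meant to show the idea, not the adapter's code.

```python
def force_events(stream_mode):
    """Return (modes to request, whether the caller asked for events)."""
    modes = [stream_mode] if isinstance(stream_mode, str) else list(stream_mode)
    user_wants_events = "events" in modes
    if not user_wants_events:
        modes.append("events")
    return modes, user_wants_events


def visible_chunks(chunks, user_wants_events):
    """Yield only the chunks the caller requested."""
    for event_name, data in chunks:
        if event_name == "events" and not user_wants_events:
            continue  # injected chunk: inspected by the breaker, never yielded
        yield event_name, data


modes, wanted = force_events("updates")
shown = list(visible_chunks(
    [("metadata", {}), ("events", {}), ("updates", {})], wanted
))
```

A caller who passes `stream_mode="updates"` therefore sees the same chunks they would without the wrapper, while a caller who already requested `events` keeps receiving them.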

## Trip output

When a limit is reached the breaker logs (via the standard `logging` module
under the `monetise_circuit_breaker` logger) and raises:

```
[circuit-breaker] Agent stopped: input token budget exceeded (10120/10000; iterations: 8).
```

Pass `silent=True` to suppress the log, or `logger=lambda msg, ctx: ...` to
send it elsewhere.

## Options reference

| Field                   | Mode         | Type                              | Default          | Description                                                                  |
| ----------------------- | ------------ | -------------------------------- | ---------------- | ---------------------------------------------------------------------------- |
| `mode`                  | both         | `Mode`                           | `"budget-guard"` | `"budget-guard"` or `"loop-killer"`.                                         |
| `max_input_token`       | budget-guard | `int ≥ 1`                        | `10_000`         | Max aggregate input tokens before trip (post-hoc).                           |
| `max_output_token`      | budget-guard | `int ≥ 1`                        | `10_000`         | Max aggregate output tokens before trip (post-hoc).                          |
| `estimate_input_tokens` | budget-guard | `(input) -> int \| None`         | —                | Preflight estimator; trips before the call when the estimate exceeds `max_input_token`. |
| `max_retries`           | loop-killer  | `int ≥ 1`                        | `3`              | Max times the same state may recur (or, with detection off, raw iterations). |
| `detect_repeated_state` | loop-killer  | `bool`                           | `True`           | Hash each step's state for loop detection.                                   |
| `silent`                | both         | `bool`                           | `False`          | Suppress the default trip log.                                               |
| `logger`                | both         | `(message, context) -> None`     | default logger   | Custom trip logger. Ignored when `silent=True`.                              |
| `on_event`              | both         | `EventListener`                  | —                | Receives `CircuitBreakerEvent` updates.                                      |
| `on_trip`               | wrappers     | `OnTrip` (sync or async)         | —                | Suppress the raise and use the callback's return value instead.             |

All numeric options are validated at construction. A wrong **type** raises
`TypeError` (a non-integer such as `1.5`, `float("nan")`, or `float("inf")`); a
wrong **value** of the right type raises `ValueError` (`0` or a negative integer).
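The type-versus-value split above can be sketched as follows. `validate_limit` and `outcome` are hypothetical helpers, not the library's validators, and rejecting `bool` (a subclass of `int` in Python) is this sketch's own choice.

```python
def validate_limit(name, value):
    """Raise TypeError for non-integers and ValueError for integers below 1."""
    # bool is a subclass of int in Python; rejecting it is an assumption here.
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError(f"{name} must be an int, got {value!r}")
    if value < 1:
        raise ValueError(f"{name} must be >= 1, got {value}")
    return value


def outcome(value):
    """Report which exception, if any, a candidate value triggers."""
    try:
        validate_limit("max_retries", value)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return "ok"
```

Checking the type before the value means a float such as `0.5` is reported as a type problem rather than as being out of range.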

## Rolling your own adapter

The core is framework-agnostic — use `CircuitBreaker` directly for any
framework not shipped here:

```python
from monetise_circuit_breaker import CircuitBreaker, CircuitBreakerError

breaker = CircuitBreaker(max_input_token=50_000, max_output_token=20_000)

# on each new LLM call / agent turn:
breaker.record_iteration(state_key)        # state_key summarises the step (loop-killer)
# on per-call usage:
breaker.add_tokens(input_delta, output_delta)
# or, when the framework exposes running totals:
breaker.set_token_snapshot(total_input, total_output)
```
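Put together, an adapter is mostly a loop that calls those methods around each model call. The sketch below is duck-typed so it runs without the package: `RecordingBreaker` is a stand-in exposing the same method names, and `run_guarded` and `fake_model` are hypothetical. It assumes that a real `CircuitBreaker` raises `CircuitBreakerError` from these calls when a limit is hit.

```python
class RecordingBreaker:
    """Stand-in with the core breaker's method names; it only records calls."""

    def __init__(self):
        self.calls = []

    def record_iteration(self, state_key):
        self.calls.append(("iteration", state_key))

    def add_tokens(self, input_delta, output_delta):
        self.calls.append(("tokens", input_delta, output_delta))


def run_guarded(breaker, call_model, prompts):
    """Drive a model loop, reporting each step and its usage to the breaker."""
    outputs = []
    for prompt in prompts:
        breaker.record_iteration(prompt)        # one iteration per model call
        text, input_tokens, output_tokens = call_model(prompt)
        breaker.add_tokens(input_tokens, output_tokens)  # per-call deltas
        outputs.append(text)
    return outputs


def fake_model(prompt):
    # Pretend model: echoes in upper case and reports made-up usage.
    return prompt.upper(), len(prompt), 1


breaker = RecordingBreaker()
outputs = run_guarded(breaker, fake_model, ["hi", "yo"])
```

Swapping `RecordingBreaker()` for a real `CircuitBreaker(...)` should be the only change needed, under the assumption stated above.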

See [`AGENTS.md`](./AGENTS.md) for the full adapter recipe.

## Contributing

We built Circuit Breaker to solve the immediate, visceral pain of runaway agent
costs and infinite loops. The API is intentionally minimal — `budget-guard` and
`loop-killer` — and the roadmap is driven by how you use (or fight) the tool in
the wild.

We especially want to hear from you if it *almost* fits, if you're building
workarounds, or if your use case diverges from the defaults. Open an issue or
share a snippet — your edge cases are our roadmap.

See [`AGENTS.md`](./AGENTS.md) for the project layout, test commands, and the
recipe for adding a new framework adapter.

## License

Apache-2.0 — © 2026 MonetiseBG
