Metadata-Version: 2.4
Name: mcp-backpressure
Version: 0.1.1
Summary: Backpressure/concurrency control middleware for FastMCP MCP servers
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fastmcp>=2.9.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: ruff>=0.4; extra == "dev"
Requires-Dist: mypy>=1.10; extra == "dev"
Requires-Dist: pytest-cov>=5.0; extra == "dev"
Dynamic: license-file

# mcp-backpressure

Backpressure and concurrency control middleware for [FastMCP](https://github.com/jlowin/fastmcp) MCP servers.

**Problem:** LLMs can generate hundreds of parallel tool calls, causing resource exhaustion, server crashes, and no structured feedback for clients to retry.

**Solution:** Middleware that limits concurrent executions, queues excess requests with timeout, and returns structured JSON-RPC overload errors.

## Quickstart

```python
from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("MyServer")
mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,      # Max parallel executions
    queue_size=10,         # Bounded queue for waiting requests
    queue_timeout=30.0,    # Queue wait timeout (seconds)
))
```

## Installation

```bash
pip install mcp-backpressure
```

## Features

- **Concurrency limiting**: Semaphore-based control of parallel executions
- **Bounded queue**: Optional FIFO queue with configurable size
- **Queue timeout**: Automatic timeout for queued requests with cleanup
- **Structured errors**: JSON-RPC compliant overload errors with detailed metrics
- **Metrics**: Real-time counters for active, queued, and rejected requests
- **Callback hook**: Optional notification on each overload event
- **Zero dependencies**: Only requires FastMCP and Python 3.10+

## Usage

### Basic Configuration

```python
from mcp_backpressure import BackpressureMiddleware

mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,           # Required: max parallel tool executions
    queue_size=10,              # Optional: bounded queue (0 = no queue)
    queue_timeout=30.0,         # Optional: seconds to wait in queue
    overload_error_code=-32001, # Optional: JSON-RPC error code
    on_overload=callback,       # Optional: called on each overload
))
```

### Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `max_concurrent` | `int` | **required** | Maximum number of concurrent tool executions. Must be >= 1. |
| `queue_size` | `int` | `0` | Maximum queue size for waiting requests. Set to 0 to reject immediately when limit reached. |
| `queue_timeout` | `float` | `30.0` | Maximum time (seconds) a request can wait in queue before timing out. Must be > 0. |
| `overload_error_code` | `int` | `-32001` | JSON-RPC error code returned when server is overloaded. |
| `on_overload` | `Callable` | `None` | Optional callback `(error: OverloadError) -> None` invoked on each overload. |

### Error Handling

When the server is overloaded, requests are rejected with a structured JSON-RPC error:

```json
{
  "code": -32001,
  "message": "SERVER_OVERLOADED",
  "data": {
    "reason": "queue_full",
    "active": 5,
    "queued": 10,
    "max_concurrent": 5,
    "queue_size": 10,
    "queue_timeout_ms": 30000,
    "retry_after_ms": 1000
  }
}
```

#### Overload Reasons

| Reason | Description |
|--------|-------------|
| `concurrency_limit` | All execution slots full and no queue configured (queue_size=0) |
| `queue_full` | All execution slots and queue slots are full |
| `queue_timeout` | Request waited in queue longer than `queue_timeout` |

### Metrics

Get real-time metrics from the middleware:

```python
metrics = middleware.get_metrics()  # Synchronous

print(f"Active: {metrics.active}")
print(f"Queued: {metrics.queued}")
print(f"Total rejected: {metrics.total_rejected}")
print(f"Rejected (concurrency): {metrics.rejected_concurrency_limit}")
print(f"Rejected (queue full): {metrics.rejected_queue_full}")
print(f"Rejected (timeout): {metrics.rejected_queue_timeout}")
```

For async contexts, use `await middleware.get_metrics_async()`.

### Callback Hook

Register a callback to be notified of each overload event:

```python
def on_overload(error: OverloadError):
    print(f"OVERLOAD: {error.reason} (active={error.active})")
    # Log to monitoring system, update metrics, etc.

middleware = BackpressureMiddleware(
    max_concurrent=5,
    queue_size=10,
    on_overload=on_overload,
)
```

## Examples

### Simple Server

See [examples/simple_server.py](examples/simple_server.py) for a minimal FastMCP server with backpressure.

### Load Simulation

Run [examples/load_simulation.py](examples/load_simulation.py) to see backpressure behavior under heavy concurrent load:

```bash
python examples/load_simulation.py
```

This simulates 30 concurrent requests against a server limited to 5 concurrent executions with a queue of 10, demonstrating how the middleware handles overload.

## How It Works

The middleware provides two-level limiting:

1. **Semaphore** (max_concurrent): Controls active executions
2. **Bounded queue** (queue_size): Holds waiting requests with timeout

**Request flow:**
- If execution slot available → execute immediately
- If execution slots full and queue not full → wait in queue with timeout
- If queue full → reject with `queue_full`
- If timeout in queue → reject with `queue_timeout`

**Invariants** (guaranteed under all conditions):
- `active <= max_concurrent` ALWAYS
- `queued <= queue_size` ALWAYS
- Cancellation correctly frees slots and decrements counters
- Queue timeout removes item from queue

## Development

### Running Tests

```bash
python -m pytest tests/ -v
```

### Linting

```bash
ruff check src/ tests/
```

## Design Rationale

This library emerged from [python-sdk #1698](https://github.com/modelcontextprotocol/python-sdk/issues/1698) (closed as "not planned"). Key design decisions:

- **Global limits only** (v0.1): Per-client and per-tool limits deferred to v0.2+
- **Simple counters**: No Prometheus/OTEL dependencies by default
- **JSON-RPC errors**: Follows MCP protocol conventions
- **Monotonic time**: Queue timeouts use `time.monotonic()` for reliability

## License

MIT

## Contributing

Contributions welcome! Please open an issue before submitting PRs.

## Changelog

See [CHANGELOG.md](https://github.com/nulone/mcp-backpressure/blob/main/CHANGELOG.md)
