Metadata-Version: 2.4
Name: forge-ml-serve
Version: 0.2.1
Summary: A lightweight ML inference server with dynamic batching, hot-swapping, and Prometheus metrics
License: MIT
Keywords: batching,inference,machine-learning,pytorch,serving
Requires-Python: >=3.10
Requires-Dist: fastapi>=0.111.0
Requires-Dist: numpy>=1.24.0
Requires-Dist: prometheus-client>=0.20.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: torch>=2.0.0
Requires-Dist: uvicorn[standard]>=0.29.0
Provides-Extra: bench
Requires-Dist: httpx>=0.27.0; extra == 'bench'
Requires-Dist: matplotlib>=3.8.0; extra == 'bench'
Requires-Dist: tqdm>=4.66.0; extra == 'bench'
Provides-Extra: dev
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Description-Content-Type: text/markdown

# Forge

A lightweight ML inference server with dynamic batching, model hot-swapping, a multi-model registry and Prometheus metrics -- built from scratch in Python.

This is not `model.predict()` behind Flask. Forge implements the core mechanics of what production runtimes like TorchServe and Triton do, from first principles: request queuing, batch assembly, concurrent GPU scheduling and zero-downtime model replacement.

```bash
pip install forge-ml-serve
```


---

## Features

| Feature | Description |
|---|---|
| Dynamic batching | Accumulates concurrent requests over a configurable time window, then stacks them into a single tensor operation. Roughly 4-8x the throughput of sequential serving in the sample benchmark below. |
| Backpressure queue | asyncio.Queue with a hard depth cap. Returns HTTP 503 immediately when saturated rather than blocking the event loop. |
| Model hot-swapping | Load a new checkpoint with zero downtime. In-flight requests finish on the old model; new requests use the new one. |
| Multi-model registry | Serve multiple models simultaneously at independent endpoints, each with its own queue and batching config. |
| Prometheus metrics | P50/P95/P99 latency histograms, batch size distribution, queue depth, timeout counters and swap duration. |

---

## Architecture

```
  HTTP Request
       |
       v
  +-------------------------------------+
  |  FastAPI  POST /v1/{model}/predict  |
  +------------------+------------------+
                     | asyncio.Future
                     v
  +-------------------------------------+
  |  RequestQueue  (backpressure cap)   |
  +------------------+------------------+
                     | blocking get / nowait drain
                     v
  +-------------------------------------+
  |  BatchScheduler                     |
  |  +-------------------------------+  |
  |  | Collect for batch_window_ms   |  |
  |  |   OR until max_batch_size     |  |  <- whichever fires first
  |  +--------------+----------------+  |
  |                 | run_in_executor   |
  |  +--------------v----------------+  |
  |  |  torch.no_grad() forward pass |  |
  |  +--------------+----------------+  |
  |                 | scatter results   |
  +-----------------|-------------------+
                    |
                    v
         Future.set_result(tensor)
                    |
                    v
         HTTP Response to caller
```

When 50 clients send requests at the same time, Forge does not run 50 separate inferences. It groups them into batches of at most `max_batch_size`, runs one forward pass per batch, then splits the output and returns each caller its own result. That is the core value.
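The flow in the diagram can be sketched with plain `asyncio`. This is a simplified illustration under stated assumptions, not Forge's `BatchScheduler`: `collect_batch`, `batch_loop`, `predict` and `double_all` are hypothetical names, and a list-doubling function stands in for the batched forward pass.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, window_s: float, max_batch_size: int) -> list:
    """Wait for a first item, then gather more until the window or size cap."""
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]
    deadline = loop.time() + window_s
    while len(batch) < max_batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


def double_all(inputs: list[list[float]]) -> list[list[float]]:
    # Stand-in for one batched forward pass over the stacked inputs.
    return [[2.0 * v for v in row] for row in inputs]


async def batch_loop(queue: asyncio.Queue, window_s: float,
                     max_batch_size: int, batch_sizes: list[int]) -> None:
    loop = asyncio.get_running_loop()
    while True:
        batch = await collect_batch(queue, window_s, max_batch_size)
        batch_sizes.append(len(batch))
        # Run the blocking "model" off the event loop, then scatter the results.
        outputs = await loop.run_in_executor(None, double_all, [x for x, _ in batch])
        for (_, future), output in zip(batch, outputs):
            future.set_result(output)


async def predict(queue: asyncio.Queue, x: list[float]) -> list[float]:
    future = asyncio.get_running_loop().create_future()
    await queue.put((x, future))
    return await future


async def demo() -> tuple[list, list[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    batch_sizes: list[int] = []
    worker = asyncio.create_task(batch_loop(queue, 0.05, 4, batch_sizes))
    results = await asyncio.gather(*(predict(queue, [float(i)]) for i in range(10)))
    worker.cancel()
    return results, batch_sizes
```

Calling `asyncio.run(demo())` serves ten concurrent callers in a few batches of at most four items each, and every caller still receives only its own result.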

---

## Installation

### From PyPI (recommended)

```bash
pip install forge-ml-serve
```

### From source

```bash
git clone https://github.com/verz0/Forge.git
cd Forge
pip install -e ".[dev]"
```

---

## Quickstart

### 1. Serve a dummy model (no GPU needed)

```bash
forge serve examples/serve_dummy.py
```

```bash
curl -X POST http://localhost:8000/v1/dummy/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [1.0, 2.0, 3.0]}'
```

Response:
```json
{
  "model": "dummy",
  "output": [2.0, 4.0, 6.0],
  "request_id": "forge",
  "latency_ms": 1.2
}
```
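For callers that prefer Python over curl, the same request can be built with the standard library. This is a minimal sketch: `build_predict_request` and `predict` are hypothetical helpers, not part of the `forge` package, and the URL and payload shape are taken from the curl example above.

```python
import json
import urllib.request


def build_predict_request(base_url: str, model: str, values: list[float]) -> urllib.request.Request:
    """Build the same POST the curl command sends, without sending it."""
    body = json.dumps({"input": values}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/{model}/predict",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(base_url: str, model: str, values: list[float], timeout_s: float = 5.0) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    request = build_predict_request(base_url, model, values)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return json.loads(response.read())
```

With the dummy server running, `predict("http://localhost:8000", "dummy", [1.0, 2.0, 3.0])` should return a dictionary with the fields shown in the response above.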

### 2. Serve ResNet-50

```bash
pip install torchvision
forge serve examples/serve_resnet.py
```

### 3. Check metrics (Prometheus format)

```bash
curl http://localhost:8000/metrics
```

### 4. Interactive API docs

Open `http://localhost:8000/docs` in your browser. FastAPI auto-generates a full Swagger UI for every registered model endpoint.

---

## Tutorials

### Tutorial 1: Serve a Custom PyTorch Model

Any `nn.Module` can be served through Forge. Write a config file with an async `setup(registry)` function that registers your model.

**sentiment_server.py:**

```python
import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class SentimentModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 3),  # negative, neutral, positive
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


async def setup(registry: ModelRegistry) -> None:
    model = SentimentModel()
    model.load_state_dict(torch.load("sentiment_weights.pt"))
    model.eval()

    config = ModelConfig(
        batch_window_ms=25.0,   # collect requests for 25ms then batch
        max_batch_size=32,
        max_queue_depth=256,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    await registry.register("sentiment", model, config)
```

Start the server:

```bash
forge serve sentiment_server.py
```

Send a request:

```bash
curl -X POST http://localhost:8000/v1/sentiment/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.1, 0.2, 0.3, ...]}'
```

Response:

```json
{
  "model": "sentiment",
  "output": [0.12, 0.03, 0.85],
  "request_id": "forge",
  "latency_ms": 4.21
}
```

---

### Tutorial 2: Serve Multiple Models Simultaneously

Register any number of models in a single config file. Each model gets its own endpoint, queue and batching configuration.

**multi_server.py:**

```python
import torch
import torch.nn as nn
from forge import ModelConfig, ModelRegistry


class ImageClassifier(nn.Module):
    def forward(self, x):
        return torch.softmax(x.mean(dim=-1, keepdim=True).expand(-1, 10), dim=-1)


class TextEmbedder(nn.Module):
    def forward(self, x):
        return x / x.norm(dim=-1, keepdim=True)


async def setup(registry: ModelRegistry) -> None:
    # Image classifier on GPU with larger batch window
    await registry.register("image-classifier", ImageClassifier(), ModelConfig(
        batch_window_ms=50.0,
        max_batch_size=16,
        device="cuda",
    ))

    # Text embedder on CPU with fast turnaround
    await registry.register("text-embedder", TextEmbedder(), ModelConfig(
        batch_window_ms=10.0,
        max_batch_size=64,
        device="cpu",
    ))
```

```bash
forge serve multi_server.py
```

Two independent endpoints are now live:

```bash
# Classify an image
curl -X POST http://localhost:8000/v1/image-classifier/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.5, 0.3, ...]}'

# Generate an embedding
curl -X POST http://localhost:8000/v1/text-embedder/predict \
     -H "Content-Type: application/json" \
     -d '{"input": [0.1, 0.2, ...]}'

# See all registered models
curl http://localhost:8000/v1/models
```

---

### Tutorial 3: Hot-Swap a Model Without Downtime

You have retrained your model overnight. Instead of restarting the server (which drops all in-flight requests), swap it live.

**Step 1 -- Save the new model as TorchScript:**

```python
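import torch

from sentiment_server import SentimentModel  # model class defined in Tutorial 1

new_model = SentimentModel()
new_model.load_state_dict(torch.load("sentiment_v2.pt"))
new_model.eval()  # switch to inference mode before scripting

scripted = torch.jit.script(new_model)
torch.jit.save(scripted, "sentiment_v2_scripted.pt")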
```

**Step 2 -- Tell Forge to swap:**

```bash
curl -X POST http://localhost:8000/v1/sentiment/reload \
     -H "Content-Type: application/json" \
     -d '{"model_path": "/path/to/sentiment_v2_scripted.pt"}'
```

Response:

```json
{"status": "swapped", "model": "sentiment", "path": "/path/to/sentiment_v2_scripted.pt"}
```

Zero downtime. Requests that were already being processed finish on the old model. New requests immediately use the new one.
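One common way to get this behavior is an atomic reference swap: each batch takes a reference to the active model once, so a later swap cannot affect it. The sketch below illustrates the idea with plain callables. `SwappableModel` is a hypothetical name, and Forge's `ModelWorker` may implement the protocol differently.

```python
import threading
from typing import Callable

Model = Callable[[list[float]], list[float]]


class SwappableModel:
    """Holds the active model behind a lock so it can be replaced atomically."""

    def __init__(self, model: Model) -> None:
        self._model = model
        self._lock = threading.Lock()

    def current(self) -> Model:
        # A batch grabs its reference once and keeps it for the whole pass.
        with self._lock:
            return self._model

    def swap(self, new_model: Model) -> None:
        # The new model should be fully loaded before this call;
        # the swap itself is a single assignment.
        with self._lock:
            self._model = new_model


def model_v1(xs: list[float]) -> list[float]:
    return [2.0 * x for x in xs]


def model_v2(xs: list[float]) -> list[float]:
    return [3.0 * x for x in xs]


holder = SwappableModel(model_v1)
in_flight = holder.current()   # batch that started before the swap
holder.swap(model_v2)
after_swap = holder.current()  # batch that started after the swap
```

Here `in_flight` still computes with `model_v1` while `after_swap` uses `model_v2`, which mirrors the old-requests/new-requests split described above.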

---

### Tutorial 4: Monitor with Prometheus

Forge exposes production-grade metrics at the `/metrics` endpoint in Prometheus text format.

```bash
curl http://localhost:8000/metrics
```

Tracked metrics include:

- **Request latency** -- per-model histograms from which P50, P95 and P99 can be derived
- **Batch size distribution** -- how effectively requests are being grouped
- **Queue depth** -- current backlog per model
- **Timeout counters** -- requests that exceeded the configured timeout
- **Swap duration** -- time taken for each model hot-swap operation

Add `/metrics` as a Prometheus scrape target and visualize the series in Grafana for real-time dashboards.
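Prometheus histograms export cumulative bucket counts rather than percentiles, so P50/P95/P99 are estimated at query time. The sketch below mirrors the linear interpolation that PromQL's `histogram_quantile()` applies within a bucket. The bucket bounds and counts are made-up illustrative values, not Forge's actual metric buckets.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """Estimate a quantile from cumulative (upper_bound, count) buckets.

    The last bucket must be the +Inf bucket holding the total count.
    """
    if not 0.0 < q <= 1.0:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Quantile lies beyond the last finite bucket.
                return prev_bound
            # Interpolate linearly inside the bucket that contains the rank.
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound


# Hypothetical latency buckets in milliseconds: (upper bound, cumulative count)
latency_buckets = [(5.0, 50), (10.0, 80), (25.0, 95), (50.0, 100), (math.inf, 100)]
p90 = histogram_quantile(0.90, latency_buckets)
```

Because the estimate is interpolated, its accuracy depends on how closely the bucket boundaries bracket the latencies of interest.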

---

## Configuration Reference

```python
from forge import ModelConfig

config = ModelConfig(
    batch_window_ms=50.0,   # Collect requests for 50ms before dispatching
    max_batch_size=32,       # Early flush if 32 requests accumulate first
    max_queue_depth=256,     # Return 503 if more than 256 requests are pending
    request_timeout_s=30.0,  # Fail requests waiting longer than 30s
    device="cuda",           # "cpu", "cuda", "cuda:0" or "mps"
    num_threads=4,           # PyTorch intra-op threads (CPU only)
)
```

| Parameter | Default | Description |
|---|---|---|
| `batch_window_ms` | 50.0 | Time window in milliseconds to collect requests before dispatching a batch |
| `max_batch_size` | 32 | Maximum number of requests per batch. Flushes early if reached before the window expires |
| `max_queue_depth` | 256 | Maximum pending requests. Returns HTTP 503 when exceeded |
| `request_timeout_s` | 30.0 | Per-request timeout in seconds. Returns HTTP 504 on expiry |
| `device` | "cpu" | PyTorch device string for inference |
| `num_threads` | 4 | PyTorch intra-op thread count (relevant for CPU inference) |
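The `request_timeout_s` row can be illustrated with `asyncio.wait_for`. This is a sketch of the general pattern, not Forge's route handler: `await_result` is a hypothetical helper that returns a status code alongside the result.

```python
import asyncio


async def await_result(future: asyncio.Future, request_timeout_s: float) -> tuple[int, object]:
    """Wait for a batched result; report 504 if it does not arrive in time."""
    try:
        result = await asyncio.wait_for(future, timeout=request_timeout_s)
        return 200, result
    except asyncio.TimeoutError:
        return 504, None


async def demo() -> tuple[int, int]:
    loop = asyncio.get_running_loop()
    fast = loop.create_future()
    fast.set_result([2.0])       # already resolved
    slow = loop.create_future()  # never resolved, so it must time out
    ok_status, _ = await await_result(fast, 0.05)
    late_status, _ = await await_result(slow, 0.05)
    return ok_status, late_status
```

Note that `asyncio.wait_for` cancels the pending future on timeout, so a scheduler following this pattern would need to skip cancelled futures when scattering results.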

---

## API Reference

| Endpoint | Method | Description |
|---|---|---|
| `/v1/{model}/predict` | POST | Submit a tensor for batched inference |
| `/v1/{model}/reload` | POST | Hot-swap to a new TorchScript checkpoint |
| `/v1/models` | GET | List all registered models and their queue depths |
| `/metrics` | GET | Prometheus metrics in text format |
| `/health` | GET | Liveness probe with per-model readiness status |
| `/docs` | GET | Interactive Swagger API documentation |

---

## Benchmark

```bash
# Start the server
forge serve examples/serve_dummy.py

# Run the sweep in another terminal
python benchmarks/bench_throughput.py --concurrency 1,5,10,25,50,100

# With chart output
python benchmarks/bench_throughput.py --plot
```

**Sample results** (CPU, dummy model, 128-float input):

| Concurrency | RPS | P50 (ms) | P95 (ms) | P99 (ms) |
|---|---|---|---|---|
| 1 | 420 | 2.1 | 2.8 | 3.1 |
| 10 | 1,850 | 4.9 | 8.2 | 11.4 |
| 50 | 3,200 | 14.8 | 28.3 | 41.7 |
| 100 | 3,400 | 28.1 | 52.6 | 71.2 |

At concurrency 50, batching delivers roughly 7.6x the throughput of the concurrency-1 run (3,200 vs. 420 RPS), which stands in for a naive sequential server. The cost is latency: P50 rises from 2.1 ms to 14.8 ms as requests wait in the batch window.
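The 7.6x figure can be reproduced from the table, assuming the concurrency-1 row serves as the sequential baseline. The snippet also applies Little's law (requests in flight is roughly throughput times latency) as a sanity check, using P50 as a rough stand-in for mean latency.

```python
# (concurrency, requests per second, P50 latency in ms) from the sample table
rows = [(1, 420, 2.1), (10, 1850, 4.9), (50, 3200, 14.8), (100, 3400, 28.1)]

baseline_rps = rows[0][1]

# Throughput relative to the concurrency-1 run.
speedups = {c: rps / baseline_rps for c, rps, _ in rows}

# Little's law: requests in flight ~= throughput * latency (ms converted to s).
in_flight = {c: rps * p50_ms / 1000.0 for c, rps, p50_ms in rows}

for c, _, _ in rows:
    print(f"concurrency={c:>3}  speedup={speedups[c]:.1f}x  in_flight~{in_flight[c]:.1f}")
```

The estimated in-flight counts land close to the configured concurrency in every row, which suggests the reported RPS and latency numbers are mutually consistent.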

---

## Running Tests

```bash
pytest tests/ -v
```

---

## Project Structure

```
forge/
  forge/
    config.py      # ModelConfig and ServerConfig dataclasses
    queue.py       # RequestQueue with backpressure and InferenceRequest
    batcher.py     # BatchScheduler -- the core batching engine
    worker.py      # ModelWorker with hot-swap protocol
    registry.py    # Multi-model ModelRegistry
    metrics.py     # Prometheus metric definitions
    server.py      # FastAPI application and route handlers
    cli.py         # forge serve CLI entry point
  tests/
  benchmarks/
  examples/
```

---

## License

MIT
