Metadata-Version: 2.4
Name: fastapi-openai-compat
Version: 0.2.0
Summary: FastAPI router factory for OpenAI-compatible Chat Completion endpoints
Project-URL: Source, https://github.com/deepset-ai/fastapi-openai-compat
Project-URL: Issues, https://github.com/deepset-ai/fastapi-openai-compat/issues
License-Expression: Apache-2.0
License-File: LICENSE
Keywords: chat-completion,fastapi,haystack,openai,streaming
Classifier: Development Status :: 3 - Alpha
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Requires-Python: >=3.10
Requires-Dist: fastapi
Requires-Dist: pydantic
Provides-Extra: haystack
Requires-Dist: haystack-ai; extra == 'haystack'
Description-Content-Type: text/markdown

# fastapi-openai-compat

[![PyPI - Version](https://img.shields.io/pypi/v/fastapi-openai-compat.svg)](https://pypi.org/project/fastapi-openai-compat)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/fastapi-openai-compat.svg)](https://pypi.org/project/fastapi-openai-compat)
[![Tests](https://github.com/deepset-ai/fastapi-openai-compat/actions/workflows/tests.yml/badge.svg)](https://github.com/deepset-ai/fastapi-openai-compat/actions/workflows/tests.yml)

FastAPI router factory for OpenAI-compatible [Chat Completions](https://platform.openai.com/docs/api-reference/chat) endpoints.

Provides a configurable `APIRouter` that exposes `/v1/chat/completions` and `/v1/models` endpoints,
following the [OpenAI API specification](https://platform.openai.com/docs/api-reference/chat),
with support for streaming (SSE), non-streaming responses, tool calling, configurable hooks, and custom chunk mapping.

## Installation

```bash
pip install fastapi-openai-compat
```

With Haystack `StreamingChunk` support:

```bash
pip install "fastapi-openai-compat[haystack]"
```

## Quick start

Create an OpenAI-compatible Chat Completions server in a few lines. Both sync and async
callables are supported -- sync callables are automatically executed in a thread pool
so they never block the async event loop.

```python
from fastapi import FastAPI
from fastapi_openai_compat import create_openai_router, CompletionResult

def list_models() -> list[str]:
    return ["my-pipeline"]

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    # Your (potentially blocking) pipeline execution logic here
    return "Hello from Haystack!"

app = FastAPI()
router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
)
app.include_router(router)
```

Async callables work the same way:

```python
async def list_models() -> list[str]:
    return ["my-pipeline"]

async def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return "Hello from Haystack!"
```
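To accept both kinds of callables, a router has to decide on each call whether to await the callable or offload it to a worker thread. Below is a minimal sketch of that general technique using only the standard library. `call_maybe_async` is a hypothetical helper, not part of this library's API, and the library may use a different thread pool internally (for example Starlette's).

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_maybe_async(fn: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical helper: await async callables, offload sync ones to a thread."""
    if inspect.iscoroutinefunction(fn):
        # Coroutine functions already cooperate with the event loop.
        return await fn(*args)
    # asyncio.to_thread runs the blocking callable in the loop's default
    # thread pool executor, so the event loop stays free for other requests.
    return await asyncio.to_thread(fn, *args)


def list_models() -> list[str]:
    return ["my-pipeline"]


async def list_models_async() -> list[str]:
    return ["my-pipeline"]


async def main() -> tuple[list[str], list[str]]:
    # Both variants are called the same way and produce the same result.
    return (
        await call_maybe_async(list_models),
        await call_maybe_async(list_models_async),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```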

## The `run_completion` callable

The `run_completion` callable receives three arguments:

| Argument   | Type         | Description |
|------------|--------------|-------------|
| `model`    | `str`        | The model name from the request (e.g. `"my-pipeline"`). |
| `messages` | `list[dict]` | The conversation history in OpenAI format. |
| `body`     | `dict`       | The full request body, including all extra parameters (e.g. `temperature`, `max_tokens`, `stream`, `metadata`, `tools`). |

The request model accepts any additional fields beyond `model`, `messages`, and `stream`.
These extra parameters are forwarded as-is in the `body` dict, so you can use them
however you need without any library changes.

For example, you can access `metadata` and any other extra field from `body`:

```python
import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    metadata = body.get("metadata") or {}  # also covers an explicit "metadata": null
    temperature = body.get("temperature", 1.0)
    request_id = metadata.get("request_id", "unknown")

    return ChatCompletion(
        id=f"resp-{request_id}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(role="assistant", content="Hello!"),
                finish_reason="stop",
            )
        ],
        metadata={"request_id": request_id, "temperature_used": temperature},
    )
```

A client can then send:

```bash
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "my-pipeline",
    "messages": [{"role": "user", "content": "Hello!"}],
    "temperature": 0.7,
    "metadata": {"request_id": "abc-123", "user_tier": "premium"}
  }'
```
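The same request can also be sent from Python. This sketch uses only the standard library (`urllib.request`) and assumes the server is running locally on port 8000, as in the `curl` example; `build_chat_request` and `send` are hypothetical helper names.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local server, as in the curl example


def build_chat_request(payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to /v1/chat/completions."""
    return urllib.request.Request(
        f"{BASE_URL}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)


payload = {
    "model": "my-pipeline",
    "messages": [{"role": "user", "content": "Hello!"}],
    # Extra fields are forwarded untouched to run_completion in the `body` dict.
    "temperature": 0.7,
    "metadata": {"request_id": "abc-123", "user_tier": "premium"},
}
```

Once the server is up, `send(build_chat_request(payload))` returns the decoded response; with the `run_completion` above, its `metadata` field echoes `"request_id": "abc-123"`.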

The `metadata` field in the response works because `ChatCompletion` also allows extra fields,
so you can attach any additional data to the response object.

The return type determines how the response is formatted:

| Return type        | Behavior |
|--------------------|----------|
| `str`              | Wrapped automatically into a `ChatCompletion` response. |
| `Generator`        | Each yielded chunk is converted to a `chat.completion.chunk` SSE message. |
| `AsyncGenerator`   | Same as `Generator`, but async. |
| `ChatCompletion`   | Returned as-is for full control over the response. |
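A rough sketch of how dispatching on the return type can work, using `inspect` from the standard library. `classify_result` and `wrap_text` are hypothetical names, not the library's internals. The dict built by `wrap_text` follows the OpenAI `chat.completion` object shape, and its `id` format is an illustrative assumption.

```python
import inspect
import time
import uuid
from typing import Any


def wrap_text(model: str, text: str) -> dict:
    """Wrap a plain string into an OpenAI-style chat.completion object."""
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",  # assumption: illustrative id format
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": text},
                "finish_reason": "stop",
            }
        ],
    }


def classify_result(result: Any) -> str:
    """Decide how a run_completion result would be sent to the client."""
    if isinstance(result, str):
        return "wrap"          # non-streaming, wrapped in a chat.completion
    if inspect.isasyncgen(result):
        return "async-stream"  # consumed with `async for`, sent as SSE
    if inspect.isgenerator(result):
        return "stream"        # consumed with `for`, sent as SSE
    return "as-is"             # e.g. a ready-made ChatCompletion


# Example generators used to exercise classify_result.
def gen():
    yield "Hi"


async def agen():
    yield "Hi"
```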

## Response types

### Returning a string

The simplest option -- return a plain string and the library wraps it as a
complete `ChatCompletion` response automatically:

```python
def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    last_msg = messages[-1]["content"]
    return f"You said: {last_msg}"
```

### Streaming with a generator

Return a generator to stream responses token by token via SSE.
Each yielded string is automatically wrapped into a `chat.completion.chunk` message:
you only need to yield the text content, and the library handles the SSE wire format.
A `finish_reason="stop"` sentinel is appended automatically at the end of the stream,
unless an earlier chunk already carried a finish reason.

Your `run_completion` should check `body.get("stream", False)` to decide whether
to return a generator or a plain string:

```python
from collections.abc import Generator

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    last_msg = messages[-1]["content"]

    if body.get("stream", False):
        def stream() -> Generator[str, None, None]:
            for word in last_msg.split():
                yield word + " "
        return stream()

    return f"You said: {last_msg}"
```

Async generators work the same way:

```python
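from collections.abc import AsyncGenerator
from fastapi_openai_compat import CompletionResult

async def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    # Only stream when the client asked for it, as in the sync example
    if not body.get("stream", False):
        return "Hello from Haystack!"

    async def stream() -> AsyncGenerator[str, None]:
        for word in ["Hello", " from", " Haystack", "!"]:
            yield word
    return stream()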
```

### Returning a ChatCompletion

For full control over the response (e.g. custom `usage`, `finish_reason`, or `system_fingerprint`),
return a `ChatCompletion` object directly:

```python
import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return ChatCompletion(
        id="resp-1",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(role="assistant", content="Hello!"),
                finish_reason="stop",
            )
        ],
        usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
    )
```

## Tool calling

### Returning ChatCompletion directly

For tool calls and other advanced responses, return a `ChatCompletion` directly
from `run_completion` for full control over the response structure:

```python
import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return ChatCompletion(
        id="resp-1",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=None,
                    tool_calls=[{
                        "id": "call_1",
                        "type": "function",
                        "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'},
                    }],
                ),
                finish_reason="tool_calls",
            )
        ],
    )
```
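On the client side, `function.arguments` arrives as a JSON-encoded string rather than an object, so it must be decoded before the tool runs. Below is a minimal sketch of dispatching the tool call above to a local Python function. `get_weather`, `TOOLS` and `run_tool_calls` are hypothetical stand-ins for your own tools.

```python
import json
from typing import Any, Callable


def get_weather(city: str) -> str:
    # Hypothetical tool implementation.
    return f"Sunny in {city}"


TOOLS: dict[str, Callable[..., Any]] = {"get_weather": get_weather}


def run_tool_calls(message: dict) -> list[dict]:
    """Execute each tool call and build the `tool` messages to send back."""
    results = []
    for call in message.get("tool_calls") or []:
        fn = TOOLS[call["function"]["name"]]
        # `arguments` is a JSON string, not a dict.
        kwargs = json.loads(call["function"]["arguments"] or "{}")
        results.append({
            "role": "tool",
            "tool_call_id": call["id"],
            "content": str(fn(**kwargs)),
        })
    return results


assistant_message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [{
        "id": "call_1",
        "type": "function",
        "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'},
    }],
}
```

In the OpenAI protocol, the client appends these `tool` messages to the conversation and calls the endpoint again, so your `run_completion` receives them in `messages`.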

Streaming tool calls work the same way -- yield `ChatCompletion` chunk objects
from your generator and the library serializes them directly as SSE:

```python
import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream():
        created = int(time.time())  # one timestamp shared by all chunks
        # 1) Announce the tool call: id, type and function name, empty arguments
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=created, model=model,
            choices=[Choice(index=0, delta=Message(
                role="assistant",
                tool_calls=[{"index": 0, "id": "call_1", "type": "function",
                             "function": {"name": "get_weather", "arguments": ""}}],
            ))],
        )
        # 2) Stream the JSON arguments for the call with the same index
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=created, model=model,
            choices=[Choice(index=0, delta=Message(
                role="assistant",
                tool_calls=[{"index": 0, "function": {"arguments": '{"city": "Paris"}'}}],
            ))],
        )
        # 3) Close the stream with finish_reason="tool_calls"
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=created, model=model,
            choices=[Choice(index=0, delta=Message(role="assistant"), finish_reason="tool_calls")],
        )
    return stream()
```
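Clients reassemble streamed tool calls by `index`: the first delta carries `id` and `function.name`, and later deltas append fragments to `function.arguments`. Below is a minimal standard-library sketch of that accumulation, operating on the `choices[0].delta` dicts of parsed chunks. `accumulate_tool_calls` is a hypothetical name.

```python
import json


def accumulate_tool_calls(deltas: list[dict]) -> list[dict]:
    """Merge streamed tool_call deltas (keyed by index) into complete calls."""
    calls: dict[int, dict] = {}
    for delta in deltas:
        for tc in delta.get("tool_calls") or []:
            call = calls.setdefault(
                tc["index"],
                {"id": None, "type": "function", "function": {"name": "", "arguments": ""}},
            )
            if tc.get("id"):
                call["id"] = tc["id"]
            fn = tc.get("function") or {}
            if fn.get("name"):
                call["function"]["name"] = fn["name"]
            # Argument fragments are plain strings that must be concatenated.
            call["function"]["arguments"] += fn.get("arguments") or ""
    return [calls[i] for i in sorted(calls)]


# The deltas of the three chunks yielded above, as a client would parse them.
stream_deltas = [
    {"role": "assistant", "tool_calls": [{"index": 0, "id": "call_1", "type": "function",
                                          "function": {"name": "get_weather", "arguments": ""}}]},
    {"role": "assistant", "tool_calls": [{"index": 0, "function": {"arguments": '{"city": "Paris"}'}}]},
    {"role": "assistant"},
]

calls = accumulate_tool_calls(stream_deltas)
arguments = json.loads(calls[0]["function"]["arguments"])  # -> {'city': 'Paris'}
```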

### Automatic StreamingChunk support

When using Haystack's `StreamingChunk` (requires `pip install fastapi-openai-compat[haystack]`),
tool call deltas and finish reasons are handled automatically via duck typing:

```python
from haystack.dataclasses import StreamingChunk
from haystack.dataclasses.streaming_chunk import ToolCallDelta
from fastapi_openai_compat import CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream():
        yield StreamingChunk(
            content="",
            tool_calls=[ToolCallDelta(
                index=0, id="call_1",
                tool_name="get_weather", arguments='{"city": "Paris"}',
            )],
            index=0,
        )
        yield StreamingChunk(content="", finish_reason="tool_calls")
    return stream()
```

The library automatically:

- Converts `ToolCallDelta` objects to OpenAI wire format (`tool_calls[].function.name/arguments`)
- Propagates `finish_reason` from chunks (e.g. `"stop"`, `"tool_calls"`, `"length"`)
- Only auto-appends `finish_reason="stop"` if no chunk already carried a finish reason
- Works via duck typing -- any object with `tool_calls` and `finish_reason` attributes is supported

## Custom SSE events

You can yield custom SSE events alongside regular chat completion chunks. This is useful
for sending side-channel data to clients like [Open WebUI](https://openwebui.com) --
status updates, notifications, source citations, etc.

Any object with a `.to_event_dict()` method is recognized as a custom event and serialized
as `data: {"event": {...}}` in the SSE stream. Custom events don't interfere with
chat completion chunks or the `finish_reason` tracking.

```python
from collections.abc import Generator
from fastapi_openai_compat import CompletionResult

class StatusEvent:
    def __init__(self, description: str, done: bool = False):
        self.description = description
        self.done = done

    def to_event_dict(self) -> dict:
        return {"type": "status", "data": {"description": self.description, "done": self.done}}

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream() -> Generator[str | StatusEvent, None, None]:
        yield StatusEvent("Processing your request...")
        for word in ["Hello", " from", " Haystack", "!"]:
            yield word
        yield StatusEvent("Done", done=True)
    return stream()
```

This works via duck typing -- any object implementing `to_event_dict() -> dict` is supported.
The protocol is compatible with [Hayhooks' Open WebUI events](https://deepset-ai.github.io/hayhooks/).

## Hooks

You can inject pre/post hooks to modify requests and results (transformer hooks)
or to observe them without modification (observer hooks). Both sync and async
hooks are supported.

### Transformer hooks

Return a modified value to transform the request or result:

```python
from fastapi_openai_compat import ChatRequest, CompletionResult, create_openai_router

async def pre_hook(request: ChatRequest) -> ChatRequest:
    # e.g. inject system prompts, validate, rate-limit
    return request

async def post_hook(result: CompletionResult) -> CompletionResult:
    # e.g. transform, filter
    return result

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    pre_hook=pre_hook,
    post_hook=post_hook,
)
```

### Observer hooks

Return `None` to observe without modifying (useful for logging, metrics, etc.):

```python
from fastapi_openai_compat import ChatRequest, CompletionResult, create_openai_router

def log_request(request: ChatRequest) -> None:
    print(f"Request for model: {request.model}")

def log_result(result: CompletionResult) -> None:
    print(f"Got result type: {type(result).__name__}")

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    pre_hook=log_request,
    post_hook=log_result,
)
```
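The difference between the two hook styles comes down to the return value. The sketch below implements the rule as described above and handles sync and async hooks alike. `apply_hook` is a hypothetical helper, not the library's API. Plain dicts stand in for `ChatRequest` so the block runs without the package. Sync hooks are called inline here, which is an assumption; the library may run them differently.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


async def apply_hook(hook: Optional[Callable[[Any], Any]], value: Any) -> Any:
    """Run an optional hook; a None return means 'observer', so keep the input."""
    if hook is None:
        return value
    result = hook(value)
    if inspect.isawaitable(result):
        result = await result  # async hooks return a coroutine
    return value if result is None else result


seen: list[str] = []


def log_request(request: dict) -> None:  # observer: returns None
    seen.append(request["model"])


async def add_system_prompt(request: dict) -> dict:  # transformer: returns a new value
    system = {"role": "system", "content": "Be concise."}
    return {**request, "messages": [system, *request["messages"]]}


request = {"model": "my-pipeline", "messages": [{"role": "user", "content": "Hi"}]}
```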

## Custom chunk mapping

By default the router handles plain `str` chunks and objects with a `.content`
attribute (e.g. Haystack `StreamingChunk`). If your pipeline streams a different
type, provide a `chunk_mapper` to extract text content:

```python
from dataclasses import dataclass
from fastapi_openai_compat import create_openai_router

@dataclass
class MyChunk:
    text: str
    score: float

def my_mapper(chunk: MyChunk) -> str:
    return chunk.text

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    chunk_mapper=my_mapper,
)
```

This works with any object -- dataclasses, dicts, Pydantic models, etc.:

```python
def dict_mapper(chunk: dict) -> str:
    return chunk["payload"]
```
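Below is one way to picture the default behaviour described above (plain `str` passes through, otherwise `.content` is read), together with a single mapper that handles several chunk shapes. Both functions are hypothetical illustrations. The library's actual `default_chunk_mapper` may treat unsupported chunks differently; this sketch raises `TypeError`.

```python
from dataclasses import dataclass
from typing import Any


def str_or_content_mapper(chunk: Any) -> str:
    """Approximates the documented default: str as-is, else the .content attribute."""
    if isinstance(chunk, str):
        return chunk
    if hasattr(chunk, "content"):
        return chunk.content or ""
    raise TypeError(f"Cannot map chunk of type {type(chunk).__name__}")


@dataclass
class MyChunk:
    text: str
    score: float


def multi_mapper(chunk: Any) -> str:
    """Hypothetical mapper combining the MyChunk and dict examples above."""
    if isinstance(chunk, MyChunk):
        return chunk.text
    if isinstance(chunk, dict):
        return chunk["payload"]
    # Fall back to the str / .content handling for everything else.
    return str_or_content_mapper(chunk)
```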

## Examples

The [`examples/`](examples/) folder contains ready-to-run servers:

- **[`basic.py`](examples/basic.py)** -- Minimal echo server, no external API keys required.
- **[`haystack_chat.py`](examples/haystack_chat.py)** -- Haystack `OpenAIChatGenerator` with streaming support.

See the [examples README](examples/README.md) for setup and usage instructions.

## API reference

This library implements endpoints compatible with the [OpenAI Chat Completions API](https://platform.openai.com/docs/api-reference/chat).

### `create_openai_router`

```python
create_openai_router(
    *,
    list_models,
    run_completion,
    pre_hook=None,
    post_hook=None,
    chunk_mapper=default_chunk_mapper,
    owned_by="custom",
    tags=None,
) -> APIRouter
```

| Parameter        | Type                      | Description |
|------------------|---------------------------|-------------|
| `list_models`    | `Callable -> list[str]`   | Returns available model/pipeline names. |
| `run_completion` | `Callable -> CompletionResult` | Runs a chat completion given `(model, messages, body)`. |
| `pre_hook`       | `Callable` or `None`      | Called before `run_completion`. Receives `ChatRequest`, returns modified request (transformer) or `None` (observer). |
| `post_hook`      | `Callable` or `None`      | Called after `run_completion`. Receives `CompletionResult`, returns modified result (transformer) or `None` (observer). |
| `chunk_mapper`   | `Callable[[Any], str]`    | Converts streamed chunks to strings. Default handles `str` and `.content` attribute. |
| `owned_by`       | `str`                     | Value for the `owned_by` field in model objects. Defaults to `"custom"`. |
| `tags`           | `list[str]` or `None`     | OpenAPI tags for the generated endpoints. Defaults to `["openai"]`. |

### Endpoints

The router exposes the following endpoints (with and without the `/v1` prefix):

| Method | Path                        | Description |
|--------|-----------------------------|-------------|
| `GET`  | `/v1/models`                | List available models. |
| `POST` | `/v1/chat/completions`      | Create a chat completion (streaming or non-streaming). |
| `GET`  | `/models`                   | Alias for `/v1/models`. |
| `POST` | `/chat/completions`         | Alias for `/v1/chat/completions`. |
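In the OpenAI API specification that the router follows, a list-models response is an object with `"object": "list"` and a `data` array of model objects carrying `id`, `"object": "model"`, `created` and `owned_by`. The sketch below maps the output of `list_models` and the `owned_by` parameter onto that shape. `build_models_response` is hypothetical, and the `created` value is an assumption because this README does not say what the library puts there.

```python
import time


def build_models_response(names: list[str], owned_by: str = "custom") -> dict:
    """Map model/pipeline names to an OpenAI-style list-models payload."""
    created = int(time.time())  # assumption: the library may use a different timestamp
    return {
        "object": "list",
        "data": [
            {"id": name, "object": "model", "created": created, "owned_by": owned_by}
            for name in names
        ],
    }
```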
