Metadata-Version: 2.4
Name: sunwaee
Version: 1.2.0
Summary: SUNWÆE gen — multi-provider LLM engine library.
Author: David NAISSE
Maintainer: David NAISSE
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: httpx>=0.27.0
Provides-Extra: files
Requires-Dist: pypdf>=4.0.0; extra == "files"
Requires-Dist: python-docx>=1.1.0; extra == "files"
Requires-Dist: openpyxl>=3.1.0; extra == "files"
Requires-Dist: python-pptx>=1.0.0; extra == "files"
Provides-Extra: dev
Requires-Dist: pytest>=8.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=1.0.0; extra == "dev"
Requires-Dist: pytest-cov>=5.0.0; extra == "dev"
Requires-Dist: setuptools_scm>=8; extra == "dev"
Requires-Dist: twine; extra == "dev"
Requires-Dist: build; extra == "dev"
Dynamic: license-file

# SUNWÆE

![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen) ![Python](https://img.shields.io/badge/python-3.11%2B-blue) ![PyPI](https://img.shields.io/pypi/v/sunwaee) ![License](https://img.shields.io/badge/license-MIT-blue)

All LLMs, one response format, one dependency (httpx). Supports switching models mid-conversation (e.g. draft with GPT, refine with Claude).

Handles streaming, tool calls, file attachments, prompt caching, extended thinking, and cost tracking across Anthropic, OpenAI, Google, DeepSeek, xAI, and Moonshot (and any other OpenAI-compatible provider).

---

## Install

```bash
pip install sunwaee
# with file/image attachment support (pdf, docx, xlsx, pptx extraction)
pip install "sunwaee[files]"
# or for development
pip install -e ".[dev,files]"
```

---

## Quick start

```python
import asyncio
from sunwaee.modules.gen.engine import get_engine
from sunwaee.modules.gen.engine.types import Message, Role

engine = get_engine("anthropic", "claude-sonnet-4-6")  # reads ANTHROPIC_API_KEY

async def main():
    messages = [Message(role=Role.USER, content="Hello")]

    # Non-streaming
    response = await engine.chat(messages)
    print(response.content)
    print(response.cost.total)  # in USD

    # Streaming
    async for chunk in engine.stream(messages):
        if chunk.content:
            print(chunk.content, end="", flush=True)

asyncio.run(main())
```

---

## Providers & API keys

| Provider  | `provider=`   | Env var             |
| --------- | ------------- | ------------------- |
| Anthropic | `"anthropic"` | `ANTHROPIC_API_KEY` |
| OpenAI    | `"openai"`    | `OPENAI_API_KEY`    |
| Google    | `"google"`    | `GOOGLE_API_KEY`    |
| DeepSeek  | `"deepseek"`  | `DEEPSEEK_API_KEY`  |
| xAI       | `"xai"`       | `XAI_API_KEY`       |
| Moonshot  | `"moonshot"`  | `MOONSHOT_API_KEY`  |

---

## Directory structure

```
sunwaee/
├── core/
│   ├── logger.py                 # get_logger(name) — scoped under "sunwaee.*"
│   └── tools.py                  # @tool decorator, ok(), err()
└── modules/gen/
    ├── __init__.py               # public re-exports (get_engine, run, stream_run, …)
    ├── agent.py                  # ReAct loop — run() + stream_run()
    ├── tools.py                  # TOOLS list (extend to add tools to the agent)
    └── engine/
        ├── __init__.py           # get_engine, Message, Response, Tool, …
        ├── base.py               # BaseEngine ABC
        ├── factory.py            # get_engine() — provider routing + connection pooling
        ├── model.py              # Model dataclass + compute_cost()
        ├── types.py              # Message, Response, ToolCall, Usage, Cost, Performance, …
        ├── models/               # model registry per provider
        │   ├── __init__.py       # get_model(), list_models()
        │   ├── anthropic.py
        │   ├── openai.py
        │   ├── google.py
        │   ├── deepseek.py
        │   ├── xai.py
        │   └── moonshot.py
        └── providers/            # engine implementations
            ├── anthropic.py      # AnthropicEngine
            ├── openai.py         # OpenAIEngine (also used by DeepSeek, xAI, Moonshot)
            └── google.py         # GoogleEngine

tests/gen/
├── test_agent.py
├── test_stream_agent.py
├── test_tools.py
└── engine/
    ├── test_types.py
    ├── test_factory.py
    ├── test_model.py
    ├── providers/
    │   ├── test_anthropic.py
    │   ├── test_openai.py
    │   └── test_google.py
    └── live/
        ├── test_providers.py     # real API calls, all providers × all scenarios
        └── run/                  # JSON snapshots written after each live run
```

---

## Core types (`engine/types.py`)

These types flow through every layer. Read them before touching any engine code.

```python
class Role(Enum):       SYSTEM, USER, ASSISTANT, TOOL
class StopReason(Enum): END_TURN, TOOL_USE, MAX_TOKENS

@dataclass
class FileAttachment:
    data: bytes                  # raw file bytes
    filename: str                # used for mime-type detection and the <file> wrapper
    media_type: str = ""         # auto-detected from filename if omitted

    # Supported types: text/*, image/jpeg|png|gif|webp,
    #   application/json|pdf + OOXML (docx, xlsx, pptx)
    # is_text  → serialized as <file name="…">…</file> text block
    # is_image → serialized as base64 inline image (provider-specific format)
    # as_text()    → decoded / extracted string (pdf/docx/xlsx/pptx use dedicated parsers)
    # as_base64()  → base64-encoded string for image parts

@dataclass
class Message:
    role: Role
    content: str | None = None
    reasoning_content: str | None = None    # thinking for models that support it
    reasoning_signature: str | None = None  # opaque blob — must be echoed back verbatim
    tool_call_id: str | None = None         # set on Role.TOOL messages
    tool_calls: list[ToolCall] | None = None
    attachments: list[FileAttachment] | None = None  # Role.USER only; ignored on other roles

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: dict
    thought_signature: str | None = None    # Google only — lives on functionCall parts
    error: str | None = None
    duration: float = 0.0
    results: list[dict] = field(default_factory=list)

@dataclass
class Tool:
    name: str
    description: str
    parameters: dict        # JSON Schema object
    fn: Callable | None = None

@dataclass
class Response:
    provider: str
    model: str
    streaming: bool = False
    synthetic: bool = False         # sentinel — not a real model chunk
    content: str | None = None
    reasoning_content: str | None = None
    reasoning_signature: str | None = None
    tool_calls: list[ToolCall] | None = None
    stop_reason: StopReason | None = None
    error: str | None = None
    usage: Usage | None = None
    cost: Cost | None = None
    performance: Performance | None = None

@dataclass
class Usage:
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

@dataclass
class Cost:
    input: float = 0.0
    output: float = 0.0
    cache_read: float = 0.0
    cache_write: float = 0.0
    total: float = 0.0

@dataclass
class Performance:
    latency: float = 0.0            # seconds to first chunk
    reasoning_duration: float = 0.0
    content_duration: float = 0.0
    total_duration: float = 0.0
    throughput: int = 0             # output tokens / second
```

---

## Usage

### Basic chat

```python
from sunwaee.modules.gen.engine import get_engine
from sunwaee.modules.gen.engine.types import Message, Role

engine = get_engine("anthropic", "claude-sonnet-4-6")
response = await engine.chat([Message(role=Role.USER, content="Hi")])
print(response.content, response.cost.total)
```

### Streaming

```python
async for chunk in engine.stream(messages):
    if chunk.content:
        print(chunk.content, end="", flush=True)
    if chunk.stop_reason is not None:
        print(chunk.usage, chunk.cost)  # only on the final chunk
```

### With tools

```python
from sunwaee.core.tools import tool, ok, err
from sunwaee.modules.gen.engine.types import Tool

@tool("Return the current UTC time.")
def get_time() -> str:
    from datetime import datetime, timezone
    return ok({"time": datetime.now(timezone.utc).isoformat()})

tools = [get_time._tool]
response = await engine.chat(messages, tools=tools)
```

### File and image attachments

```python
from sunwaee.modules.gen.engine.types import FileAttachment, Message, Role

# Text file — serialized as <file name="…">…</file> before the user text
with open("report.pdf", "rb") as f:
    pdf_att = FileAttachment(data=f.read(), filename="report.pdf")

response = await engine.chat([
    Message(role=Role.USER, content="Summarise this document.", attachments=[pdf_att])
])

# Image — serialized as base64 inline (provider-specific format)
# Raises ValueError if the model's supports_vision is False
with open("photo.png", "rb") as f:
    img_att = FileAttachment(data=f.read(), filename="photo.png")

response = await engine.chat([
    Message(role=Role.USER, content="What is in this image?", attachments=[img_att])
])
```

Supported attachment types:

| Category  | Mime types                                    | Extensions                          |
| --------- | --------------------------------------------- | ----------------------------------- |
| Text      | `text/*`                                      | `.txt`, `.csv`, `.md`, `.py`, …     |
| JSON      | `application/json`                            | `.json`                             |
| Image     | `image/jpeg`, `image/png`, `image/gif`, `image/webp` | `.jpg`, `.png`, `.gif`, `.webp` |
| PDF       | `application/pdf`                             | `.pdf`                              |
| Word      | `application/vnd…wordprocessingml.document`   | `.docx`                             |
| Excel     | `application/vnd…spreadsheetml.sheet`         | `.xlsx`                             |
| PowerPoint| `application/vnd…presentationml.presentation` | `.pptx`                             |

Text and document files are serialized as a `<file name="…">…</file>` text block (extracted via `pypdf`, `python-docx`, `openpyxl`, or `python-pptx`). Images are sent as base64 inline — no provider upload API is used.
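A minimal sketch of that text serialization, assuming the `<file>` wrapper format described above (illustrative, not the library's exact code):

```python
from sunwaee.modules.gen.engine.types import FileAttachment

def wrap_text_attachment(att: FileAttachment) -> str:
    # as_text() decodes text/JSON files or extracts pdf/docx/xlsx/pptx content
    return f'<file name="{att.filename}">{att.as_text()}</file>'
```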

### ReAct agent loop (automatic tool execution)

```python
from sunwaee.modules.gen.agent import stream_run

new_messages = []
async for chunk in stream_run(messages, tools, engine, new_messages=new_messages):
    if chunk.content:
        print(chunk.content, end="", flush=True)
# new_messages contains all assistant + tool turns appended during the run
```

`stream_run` runs up to 10 iterations by default. It calls `engine.stream()`, detects `TOOL_USE`, runs all tool calls concurrently with `asyncio.gather`, appends results, and loops. Sync tool functions are dispatched via `run_in_executor`.
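The loop has roughly this shape (a simplified sketch, not the actual `agent.py` code; `execute_tool` is a hypothetical helper that runs one `ToolCall` and returns its JSON result string):

```python
import asyncio
from sunwaee.modules.gen.engine.types import Message, Role, StopReason

async def react_sketch(messages, tools, engine, max_iterations=10):
    for _ in range(max_iterations):
        final = None
        async for chunk in engine.stream(messages, tools=tools):
            if chunk.stop_reason is not None:
                final = chunk  # final chunk carries stop_reason, usage, cost
        if final is None or final.stop_reason != StopReason.TOOL_USE:
            return  # END_TURN / MAX_TOKENS: nothing left to execute
        # Run all requested tool calls concurrently, then feed results back.
        results = await asyncio.gather(
            *(execute_tool(tc) for tc in final.tool_calls)  # hypothetical helper
        )
        messages.append(Message(role=Role.ASSISTANT,
                                content=final.content,
                                tool_calls=final.tool_calls))
        for tc, result in zip(final.tool_calls, results):
            messages.append(Message(role=Role.TOOL,
                                    tool_call_id=tc.id,
                                    content=result))
```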

### Listing available models

```python
from sunwaee.modules.gen.engine.models import list_models, get_model

all_models = list_models()              # list[Model]
model = get_model("claude-sonnet-4-6")  # Model | None
```

---

## Testing

```bash
# Unit tests — mocked HTTP, no API keys needed
pytest tests/gen/ -m "not live"

# Live tests — real API calls, keys required
pytest tests/gen/ -m live

# Coverage
pytest tests/gen/ -m "not live" --cov=sunwaee --cov-report=term-missing

# Single file / filter by name
pytest tests/gen/engine/providers/test_anthropic.py
pytest tests/gen/ -k "tool_call"
```

### Unit test conventions

- Mock `httpx.AsyncClient` — never make real HTTP calls in unit tests.
- Standard pattern (see `test_anthropic.py` / `test_openai.py`):

```python
import pytest

from sunwaee.modules.gen.engine.types import Message, Role
from sunwaee.modules.gen.engine.providers.newprovider import NewProviderEngine

@pytest.fixture
def mock_client():
    class FakeResponse:
        status_code = 200
        def raise_for_status(self): pass
        def json(self): return { ... }  # provider response shape

    class FakeClient:
        async def post(self, *a, **kw): return FakeResponse()

    return FakeClient()

@pytest.mark.asyncio
async def test_chat_basic(mock_client):
    engine = NewProviderEngine("model-name", "sk-test", client=mock_client)
    response = await engine.chat([Message(role=Role.USER, content="Hi")])
    assert response.content == "Hello"
    assert response.cost.total > 0
```

- For streaming, use an async generator as the mock transport (see the sketch after this list).
- Always assert that `response.cost`, `response.usage`, and `response.performance` are populated on the final chunk.
- Live tests (`-m live`) are excluded from coverage and CI — they require real API keys.
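For the streaming case, a mock along these lines is one option (a sketch assuming the engine iterates SSE lines via `httpx`'s `client.stream(...)` context manager; the exact transport interface in this repo may differ):

```python
class FakeStreamResponse:
    status_code = 200
    def raise_for_status(self): pass
    async def aiter_lines(self):
        # provider-specific SSE payloads go here
        yield 'data: {"…": "…"}'
        yield "data: [DONE]"
    async def __aenter__(self): return self
    async def __aexit__(self, *exc): return False

class FakeStreamClient:
    def stream(self, *a, **kw): return FakeStreamResponse()
```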

### Live test scenarios

All providers are tested against 6 scenarios × chat + stream (12 calls total per provider):

| Scenario           | What it tests                                                        |
| ------------------ | -------------------------------------------------------------------- |
| `ONLY_SYSTEM`      | System-only input edge case; lenient assertions                      |
| `ONLY_USER`        | Single user message; asserts content + usage + cost populated        |
| `SYSTEM_AND_USER`  | System prompt is respected in the response                           |
| `TOOL_CALL`        | Model must issue at least one tool call                              |
| `TOOL_CALL_RESULT` | Full multi-turn with real tool IDs/signatures captured live first    |
| `FILE_ATTACHMENT`  | Text file attached to a user message; asserts content populated      |

`TOOL_CALL_RESULT` runs `TOOL_CALL` first to capture real tool IDs and reasoning signatures, then replays with tool results. Required because Google has no tool call IDs and Anthropic/Google require reasoning signatures echoed verbatim.

Image attachments are tested separately via `test_chat_image_attachment` / `test_stream_image_attachment`, parametrized over vision-capable engines only (all except `deepseek-chat`).

---

## How to add a model to an existing provider

**File:** `sunwaee/modules/gen/engine/models/<provider>.py`

Add a `Model(...)` entry to the `MODELS` list. Pricing fields are in USD per million tokens (`_per_mtok`).

```python
Model(
    name="provider-model-name",         # exact API model identifier
    display_name="Human Readable Name",
    provider="anthropic",
    context_window=200_000,
    max_output_tokens=64_000,
    input_price_per_mtok=3.0,
    output_price_per_mtok=15.0,
    cache_read_price_per_mtok=0.3,      # omit if provider doesn't support caching
    cache_write_price_per_mtok=3.75,
    input_price_per_mtok_200k=6.0,      # omit if no >200k tier
    output_price_per_mtok_200k=22.5,
    supports_vision=True,
    supports_tools=True,
    supports_thinking=True,
    supports_reasoning_tokens=True,
    release_date="2025-01-01",
    deprecated_at=None,
    sunset_at=None,                     # psql/sync_models.py uses this
)
```

**Pricing tiers** (`engine/model.py`): base always required; `_128k` when `input_tokens > 128_000` (xAI only); `_200k` when `> 200_000`; `_272k` when `> 272_000` (OpenAI only).
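The tier selection amounts to something like this (an illustrative sketch; the real logic lives in `engine/model.py`, and the `_128k`/`_272k` field names are assumed to follow the pattern shown above):

```python
def input_rate(model, input_tokens: int) -> float:
    # Highest matching tier wins; a tier applies only if the model defines it.
    if input_tokens > 272_000 and getattr(model, "input_price_per_mtok_272k", None):
        return model.input_price_per_mtok_272k
    if input_tokens > 200_000 and getattr(model, "input_price_per_mtok_200k", None):
        return model.input_price_per_mtok_200k
    if input_tokens > 128_000 and getattr(model, "input_price_per_mtok_128k", None):
        return model.input_price_per_mtok_128k
    return model.input_price_per_mtok

def input_cost(model, input_tokens: int) -> float:
    # Prices are USD per million tokens.
    return input_tokens * input_rate(model, input_tokens) / 1_000_000
```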

The registry in `engine/models/__init__.py` picks up the new model automatically.

**Tests:** Add assertions in `engine/test_model.py` if the model has non-standard pricing tiers.

---

## How to add an OpenAI-compatible provider

Use this path when the provider speaks the OpenAI Chat Completions API.

**Step 1 — Models:** Create `engine/models/<provider>.py` with a `MODELS` list.

**Step 2 — Register models:** In `engine/models/__init__.py`, import and add to `_ALL`.

**Step 3 — Register base URL:** In `engine/factory.py`, add to `_OPENAI_COMPATIBLE`:

```python
_OPENAI_COMPATIBLE: dict[str, str] = {
    ...
    "newprovider": "https://api.newprovider.com/v1",
}
```

The env var is auto-derived as `NEWPROVIDER_API_KEY`. `get_engine("newprovider", "model-name")` now works.
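The derivation presumably reduces to a simple uppercase transform (illustrative):

```python
import os

provider = "newprovider"
env_var = f"{provider.upper()}_API_KEY"  # -> "NEWPROVIDER_API_KEY"
api_key = os.environ.get(env_var)
```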

**Step 4 — Live tests:** Add `("newprovider", "cheapest-model")` to `ENGINES` in `tests/gen/engine/live/test_providers.py`.

---

## How to add a provider with a custom API

Use this path when the provider does **not** speak the OpenAI schema.

**Step 1 — Models:** Same as OpenAI-compatible step 1.

**Step 2 — Register models:** Same as above.

**Step 3 — Create the engine:** `engine/providers/<provider>.py`

```python
class NewProviderEngine(BaseEngine):
    BASE_URL = "https://api.newprovider.com"

    def __init__(self, model, api_key, max_tokens=8192, client=None):
        self.model = model
        self.api_key = api_key
        self.max_tokens = max_tokens
        self._client = client or httpx.AsyncClient()

    async def chat(self, messages, tools=None) -> Response: ...
    async def stream(self, messages, tools=None) -> AsyncIterator[Response]: ...
```

Key rules for every provider implementation:
1. Accept `client: httpx.AsyncClient | None = None` — the factory injects a pooled client.
2. Call `resolve_tokens(usage)` before `compute_cost`.
3. Strip `reasoning_content`/`reasoning_signature` from all assistant messages except the last.
4. Handle system-only input: promote system message to `Role.USER` if no other messages.
5. On 4xx/5xx in streaming, read the full body before raising.
6. Buffer tool call JSON across SSE chunks; parse only on the final stop event (see the sketch below).
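For rule 6, the accumulation pattern looks roughly like this (a sketch; delta shapes and indices vary per provider):

```python
import json

buffers: dict[int, dict] = {}  # tool-call index -> partial state

def on_tool_delta(index: int, name: str | None, args_fragment: str) -> None:
    buf = buffers.setdefault(index, {"name": None, "args": ""})
    if name:
        buf["name"] = name
    buf["args"] += args_fragment  # never json.loads() a partial fragment

def on_stop() -> list[dict]:
    # Only now is each argument buffer guaranteed to be complete JSON.
    return [
        {"name": b["name"], "arguments": json.loads(b["args"] or "{}")}
        for b in buffers.values()
    ]
```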

**Step 4 — Wire into the factory:** Import and add a case in `get_engine()` in `engine/factory.py`.

**Step 5 — Tests:**
- `tests/gen/engine/providers/test_newprovider.py` — unit tests with mocked HTTP. Cover: payload building, response parsing, streaming events, tool call accumulation, error handling.
- `tests/gen/engine/live/test_providers.py` — add `("newprovider", "cheapest-model")` to `ENGINES`.

---

## How to add a tool to the agent

**Step 1 — Implement the tool:**

```python
from typing import Annotated, Literal
from sunwaee.core.tools import tool, ok, err

@tool("Search the web for current information.")
def web_search(
    query: Annotated[str, "The search query"],
    num_results: Annotated[int, "Number of results to return"] = 5,
) -> str:
    try:
        results = _do_search(query, num_results)
        return ok(results)
    except Exception as e:
        return err(str(e))
```

**Step 2 — Register:** In `sunwaee/modules/gen/tools.py`, add `web_search._tool` to `TOOLS`.

**Step 3 — Tests:** Add `tests/gen/test_<tool_name>.py`. Call the function directly, assert JSON output shape, test the error path. Never call real external APIs in unit tests — mock them.
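A sketch of that convention for the `web_search` example (the module path and the `_do_search` patch target are assumptions):

```python
import json

import sunwaee.modules.gen.tools as gen_tools  # assumed module path

def test_web_search_ok(monkeypatch):
    monkeypatch.setattr(gen_tools, "_do_search", lambda q, n: [{"title": "hit"}])
    payload = json.loads(gen_tools.web_search("query"))
    assert payload["ok"] is True

def test_web_search_error(monkeypatch):
    def boom(q, n):
        raise RuntimeError("network down")
    monkeypatch.setattr(gen_tools, "_do_search", boom)
    payload = json.loads(gen_tools.web_search("query"))
    assert payload["ok"] is False
    assert "network down" in payload["error"]
```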

---

## `@tool` decorator reference

The `@tool` decorator introspects the function signature to build a JSON Schema `parameters` object automatically.

- Supports: `str`, `int`, `float`, `bool`, `list[T]`, `Literal[...]`, `Optional[T]`, `Annotated[T, "description"]`
- Parameters with defaults are not marked `required`; `Optional` parameters are also not required
- Both sync and async functions supported — `agent._execute` awaits async, dispatches sync to thread pool
- Must return a JSON string — use `ok()` / `err()`, or `json.dumps()`

```python
ok({"id": "123"})   # '{"ok": true, "data": {"id": "123"}}'
err("Not found")    # '{"ok": false, "error": "Not found"}'
```
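For the `web_search` example above, the derived `parameters` schema would look roughly like this (illustrative; the exact output may differ in detail):

```python
{
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "The search query"},
        "num_results": {"type": "integer", "description": "Number of results to return"},
    },
    "required": ["query"],  # num_results has a default, so it is not required
}
```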

---

## Provider-specific quirks

**Must read before touching any provider code.**

### 1. Token normalisation — `resolve_tokens()`

xAI and Google (with thinking) exclude reasoning tokens from `output_tokens` but include them in `total_tokens`. Always call `resolve_tokens(usage)` before `compute_cost`. It treats `total_tokens` as truth and back-calculates `output_tokens`.
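In effect (a simplified sketch that ignores cache-token bookkeeping):

```python
from sunwaee.modules.gen.engine.types import Usage

def resolve_tokens_sketch(usage: Usage) -> Usage:
    if usage.total_tokens:
        # total_tokens is ground truth; back-calculate output_tokens so that
        # hidden reasoning tokens are billed as output.
        usage.output_tokens = usage.total_tokens - usage.input_tokens
    else:
        usage.total_tokens = usage.input_tokens + usage.output_tokens
    return usage
```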

### 2. Reasoning echoed only on the last assistant turn

Strip `reasoning_content` and `reasoning_signature` from every assistant message **except the very last one** before building the request. Stale signatures break the API. This also enables mid-session provider switches.
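One way to express the rule (a sketch using `dataclasses.replace` on the `Message` dataclass):

```python
from dataclasses import replace
from sunwaee.modules.gen.engine.types import Message, Role

def strip_stale_reasoning(messages: list[Message]) -> list[Message]:
    last = max((i for i, m in enumerate(messages) if m.role == Role.ASSISTANT),
               default=-1)
    return [
        replace(m, reasoning_content=None, reasoning_signature=None)
        if m.role == Role.ASSISTANT and i != last else m
        for i, m in enumerate(messages)
    ]
```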

### 3. OpenAI uses `max_completion_tokens`, not `max_tokens`

Translate at payload-build time. All other providers (including OpenAI-compatible ones) use `max_tokens`.

### 4. OpenAI reasoning models are silent during thinking

When `reasoning_effort` is set, the stream hangs until the answer starts. `OpenAIEngine.stream()` immediately yields a synthetic `Response(reasoning_content="Reasoning in progress…", synthetic=True)` so callers have something to display.

### 5. Google — `thoughtSignature` lives on `functionCall` parts

When thinking is enabled and a tool is called, the `thoughtSignature` is on the `functionCall` part, not the top-level thought block. Capture it into `ToolCall.thought_signature` and echo it back on every subsequent assistant turn.

### 6. Google — no tool call IDs

Use the function name as the ID for correlation throughout the Google provider.

### 7. Google streaming — `?alt=sse` required

The endpoint is `streamGenerateContent?alt=sse`. Without it, the response is a JSON array.

### 8. System-only input

Anthropic and Google reject requests with no non-system messages. Promote the system message to `Role.USER`.
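The promotion itself is small (sketch):

```python
from dataclasses import replace
from sunwaee.modules.gen.engine.types import Message, Role

def promote_system_only(messages: list[Message]) -> list[Message]:
    # Anthropic/Google reject system-only input; re-tag as USER.
    if messages and all(m.role == Role.SYSTEM for m in messages):
        return [replace(m, role=Role.USER) for m in messages]
    return messages
```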

### 9. Anthropic thinking budget

Requires `1024 ≤ thinking_budget < max_tokens`. Default: `max(1024, max_tokens - 1024)`.

### 10. Connection pooling

`factory.py` maintains one `httpx.AsyncClient` per `(event_loop_id, base_url)`. Always default to `client or httpx.AsyncClient()` so the engine is usable standalone in tests.
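A rough shape of that pool (illustrative; the actual keying in `factory.py` may differ):

```python
import asyncio

import httpx

_POOL: dict[tuple[int, str], httpx.AsyncClient] = {}

def pooled_client(base_url: str) -> httpx.AsyncClient:
    # One client per (event loop, base_url): clients are loop-bound and
    # connection pools are per-host.
    key = (id(asyncio.get_running_loop()), base_url)
    if key not in _POOL:
        _POOL[key] = httpx.AsyncClient(base_url=base_url)
    return _POOL[key]
```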
