Metadata-Version: 2.4
Name: blazen
Version: 0.5.0
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)
Classifier: Operating System :: OS Independent
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Summary: Python bindings for the Blazen workflow engine
Author-email: Zach Handley <zachhandley@gmail.com>
License: MPL-2.0
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM

# Blazen

**Event-driven AI workflow engine, powered by Rust.**

[![PyPI](https://img.shields.io/pypi/v/blazen)](https://pypi.org/project/blazen/)
[![Python](https://img.shields.io/pypi/pyversions/blazen)](https://pypi.org/project/blazen/)
[![License: MPL-2.0](https://img.shields.io/badge/license-MPL--2.0-blue)](https://github.com/ZachHandley/Blazen/blob/main/LICENSE)

Blazen lets you build multi-step AI workflows as composable, event-driven graphs. Define steps with a decorator, wire them together with typed events, and run everything on a native Rust engine with async Python bindings.

## Installation

```bash
# Recommended
uv add blazen

# Or with pip
pip install blazen
```

Requires Python 3.10+.

## Quick Start

```python
import asyncio
from blazen import Workflow, step, Event, StartEvent, StopEvent, Context

class GreetEvent(Event):
    name: str

@step
async def parse(ctx: Context, ev: Event):
    return GreetEvent(name=ev.name)

@step
async def greet(ctx: Context, ev: GreetEvent):
    return StopEvent(result={"greeting": f"Hello, {ev.name}!"})

async def main():
    wf = Workflow("hello", [parse, greet])
    handler = await wf.run(name="Blazen")
    result = await handler.result()
    print(result.result)  # {"greeting": "Hello, Blazen!"}

asyncio.run(main())
```

### How it works

- **`class GreetEvent(Event)`** -- Subclassing `Event` auto-sets `event_type` to the class name (`"GreetEvent"`). Annotations like `name: str` are for documentation only; at runtime all keyword arguments are stored as JSON.
- **`@step` reads type annotations** -- `ev: GreetEvent` on a step function automatically sets `accepts=["GreetEvent"]`. The step will only receive events of that type.
- **`@step` with no type hint or `ev: Event`** -- defaults to accepting `StartEvent` (the event emitted by `wf.run()`).
- **`ev.name`** -- Direct attribute access on events. No need for `ev.to_dict()["name"]`.
- **`wf.run(name="Blazen")`** -- Keyword arguments become the `StartEvent` payload. Steps that accept `StartEvent` receive an event where `ev.name == "Blazen"`.
- **`result.result`** -- preserves `is`-identity for non-JSON Python objects. You can pass class instances, Pydantic models, and even live DB connections through `StopEvent.result` and get the *same* object back on the other side.
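
The annotation-driven routing described above can be illustrated without Blazen installed. The following is a minimal sketch of how a decorator *could* derive an accepts-list from the `ev` annotation; it is not Blazen's implementation. The `Event` and `GreetEvent` classes here are local stand-ins, and `infer_accepts` is a hypothetical helper name.

```python
import typing


class Event:
    """Stand-in base class for this sketch; NOT blazen.Event."""


class GreetEvent(Event):
    name: str


def infer_accepts(fn) -> list[str]:
    """Derive an accepts-list from the annotation on the `ev` parameter."""
    hints = typing.get_type_hints(fn)
    ev_type = hints.get("ev")
    # No annotation, or the bare Event base class: fall back to StartEvent.
    if ev_type is None or ev_type is Event:
        return ["StartEvent"]
    return [ev_type.__name__]


async def parse(ctx, ev: Event): ...
async def greet(ctx, ev: GreetEvent): ...
async def untyped(ctx, ev): ...


print(infer_accepts(parse))    # ['StartEvent']
print(infer_accepts(greet))    # ['GreetEvent']
print(infer_accepts(untyped))  # ['StartEvent']
```

The fallback branch mirrors the rule stated in the list: a missing hint and a plain `ev: Event` hint are treated the same way.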

## Multi-Step Workflows

Chain steps together using custom event subclasses. Each step declares which events it accepts via its type annotation.

```python
import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class FetchedEvent(Event):
    text: str
    source: str

class AnalyzedEvent(Event):
    summary: str

@step
async def fetch(ctx: Context, ev: Event):
    # ev is a StartEvent with url=...
    text = f"Content from {ev.url}"
    return FetchedEvent(text=text, source=ev.url)

@step
async def analyze(ctx: Context, ev: FetchedEvent):
    summary = f"Analysis of: {ev.text}"
    return AnalyzedEvent(summary=summary)

@step
async def report(ctx: Context, ev: AnalyzedEvent):
    return StopEvent(result={"summary": ev.summary})

async def main():
    wf = Workflow("pipeline", [fetch, analyze, report])
    handler = await wf.run(url="https://example.com")
    result = await handler.result()
    print(result.result)  # {"summary": "Analysis of: Content from https://example.com"}

asyncio.run(main())
```

## Event Streaming

Stream intermediate events from a running workflow in real time using `ctx.write_event_to_stream()`.

```python
import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class ProgressEvent(Event):
    step_num: int
    message: str

@step
async def work(ctx: Context, ev: Event):
    for i in range(3):
        ctx.write_event_to_stream(ProgressEvent(step_num=i, message=f"Processing {i}"))
    return StopEvent(result="done")

async def main():
    wf = Workflow("streamer", [work])
    handler = await wf.run()

    async for event in handler.stream_events():
        print(event.event_type, event.step_num, event.message)

    result = await handler.result()
    print(result.result)  # "done"

asyncio.run(main())
```

`write_event_to_stream()` publishes to an external broadcast stream. Consumers read it with `async for event in handler.stream_events()`. These events are **not** routed through the step graph -- they are for external observation only.
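
To make the "external observation only" idea concrete, here is a toy model built on `asyncio.Queue`. It is only a sketch of the pattern, not how Blazen implements its stream: the queue is single-consumer rather than a true broadcast, and `worker`, `stream_events`, and the `None` sentinel are assumptions made for illustration.

```python
import asyncio


async def worker(stream: asyncio.Queue) -> str:
    """Publish progress events out-of-band, then return the final result."""
    for i in range(3):
        stream.put_nowait({"step_num": i})  # non-blocking publish
    stream.put_nowait(None)  # sentinel: no more events
    return "done"


async def stream_events(stream: asyncio.Queue):
    """Yield published events until the sentinel arrives."""
    while (event := await stream.get()) is not None:
        yield event


async def main():
    stream: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(stream))
    seen = [event["step_num"] async for event in stream_events(stream)]
    return seen, await task


seen, result = asyncio.run(main())
print(seen, result)  # [0, 1, 2] done
```

The observer never feeds anything back into the worker; the final result travels on a separate path, which is the same separation the paragraph above describes.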

## LLM Integration

Blazen includes a built-in multi-provider LLM client. All providers share the same `CompletionModel` / `ChatMessage` interface. Responses are returned as typed `CompletionResponse` objects.

### ChatMessage, Role, and CompletionResponse

```python
import asyncio
import os
from blazen import CompletionModel, ChatMessage, Role, CompletionResponse, ProviderOptions

async def main():
    model = CompletionModel.openrouter(options=ProviderOptions(api_key=os.environ["OPENROUTER_API_KEY"], model="openai/gpt-4o"))
    response: CompletionResponse = await model.complete([
        ChatMessage.system("You are helpful."),
        ChatMessage.user("What is 2+2?"),
    ], temperature=0.7, max_tokens=256)

    # Typed attribute access
    print(response.content)        # e.g. "4"
    print(response.model)          # model name used
    print(response.finish_reason)  # "stop", "tool_calls", etc.
    print(response.tool_calls)     # list[ToolCall] or None
    print(response.usage)          # TokenUsage with .prompt_tokens, .completion_tokens, .total_tokens

    # Dict-style access also works for backwards compatibility
    print(response["content"])

asyncio.run(main())
```

### Role Enum

```python
from blazen import ChatMessage, Role

Role.SYSTEM     # "system"
Role.USER       # "user"
Role.ASSISTANT  # "assistant"
Role.TOOL       # "tool"

# Use with ChatMessage constructor
msg = ChatMessage(role=Role.USER, content="Hello")
```

### Multimodal Messages

Send images alongside text using multimodal factory methods:

```python
from blazen import ChatMessage, ContentPart

# Image from URL
msg = ChatMessage.user_image_url("https://example.com/photo.jpg", "What's in this image?")

# Image from base64
msg = ChatMessage.user_image_base64(base64_data, "image/png", "Describe this.")

# Multiple content parts
msg = ChatMessage.user_parts([
    ContentPart.text(text="Compare these two images:"),
    ContentPart.image_url(url="https://example.com/a.jpg", media_type="image/jpeg"),
    ContentPart.image_url(url="https://example.com/b.jpg", media_type="image/jpeg"),
])
```

### Media Sources

For APIs that take generic media inputs (vision, OCR, audio-paired pipelines), use `ImageSource` directly or its alias `MediaSource`. The alias is provided so callers can spell intent at the call site without changing the underlying type.

```python
from blazen import ImageSource, MediaSource  # MediaSource is ImageSource

src1 = ImageSource.url("https://example.com/photo.jpg")
src2 = MediaSource.path("/tmp/scan.png")        # same class, ergonomic alias
src3 = MediaSource.base64(b64_bytes, "image/png")
```

### Supported Providers

| Provider | Constructor | Default Model |
|---|---|---|
| OpenAI | `CompletionModel.openai(options=ProviderOptions(api_key=key, model="gpt-4o"))` | `gpt-4o` |
| Anthropic | `CompletionModel.anthropic(options=ProviderOptions(api_key=key, model="claude-sonnet-4-20250514"))` | `claude-sonnet-4-20250514` |
| Google Gemini | `CompletionModel.gemini(options=ProviderOptions(api_key=key, model="gemini-2.0-flash"))` | `gemini-2.0-flash` |
| Azure OpenAI | `CompletionModel.azure(options=AzureOptions(api_key=key, resource_name="...", deployment_name="..."))` | (deployment) |
| OpenRouter | `CompletionModel.openrouter(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Groq | `CompletionModel.groq(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Together AI | `CompletionModel.together(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Mistral | `CompletionModel.mistral(options=ProviderOptions(api_key=key, model="..."))` | -- |
| DeepSeek | `CompletionModel.deepseek(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Fireworks | `CompletionModel.fireworks(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Perplexity | `CompletionModel.perplexity(options=ProviderOptions(api_key=key, model="..."))` | -- |
| xAI (Grok) | `CompletionModel.xai(options=ProviderOptions(api_key=key, model="..."))` | -- |
| Cohere | `CompletionModel.cohere(options=ProviderOptions(api_key=key, model="..."))` | -- |
| AWS Bedrock | `CompletionModel.bedrock(options=BedrockOptions(api_key=key, region="...", model="..."))` | -- |
| fal.ai | `CompletionModel.fal(options=FalOptions(api_key=key, model="..."))` | -- |

### Using LLMs in Workflows

```python
import asyncio
import os
from blazen import Workflow, step, Event, StopEvent, Context, CompletionModel, ChatMessage, ProviderOptions

class AnswerEvent(Event):
    answer: str

@step
async def ask_llm(ctx: Context, ev: Event):
    model = CompletionModel.anthropic(options=ProviderOptions(api_key=os.environ["ANTHROPIC_API_KEY"], model="claude-sonnet-4-20250514"))
    response = await model.complete([
        ChatMessage.system("Answer concisely."),
        ChatMessage.user(ev.prompt),
    ], max_tokens=256)
    return AnswerEvent(answer=response.content)  # typed attribute access

@step
async def format_answer(ctx: Context, ev: AnswerEvent):
    return StopEvent(result={"answer": ev.answer})

async def main():
    wf = Workflow("llm-pipeline", [ask_llm, format_answer])
    handler = await wf.run(prompt="Explain gravity in one sentence.")
    result = await handler.result()
    print(result.result)

asyncio.run(main())
```

## Local Inference

Blazen ships first-class bindings for several local-inference backends. They share a common shape: build an engine, call `generate(...)` for one-shot output, or `stream(...)` for an async iterator of typed chunks. Each backend has its own message and result types so you can keep them straight in mixed pipelines.

### mistral.rs

```python
from blazen import ChatMessageInput, ChatRole, InferenceResult, InferenceChunkStream

messages = [
    ChatMessageInput(role=ChatRole.SYSTEM, content="You are concise."),
    ChatMessageInput(role=ChatRole.USER, content="Explain entropy."),
]

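# `engine` is assumed to be an already-constructed mistral.rs engine (setup not shown);
# the `await` calls below must run inside an async function.
result: InferenceResult = await engine.generate(messages, max_tokens=256)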
print(result.text)

stream: InferenceChunkStream = await engine.stream(messages)
async for chunk in stream:
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
    if chunk.reasoning_delta:
        ...  # optional reasoning trace
    if chunk.tool_calls:
        for call in chunk.tool_calls:  # list[InferenceToolCall]
            ...
    if chunk.finish_reason:
        break
```

Vision-capable models accept `InferenceImage` parts built from `InferenceImageSource` (URL, path, or base64). `InferenceUsage` is attached to both `InferenceResult` and the final chunk.

### llama.cpp

Symmetric API with its own type family so the two backends never alias:

```python
from blazen import (
    LlamaCppChatMessageInput,
    LlamaCppChatRole,
    LlamaCppInferenceResult,
    LlamaCppInferenceChunkStream,
)

messages = [
    LlamaCppChatMessageInput(role=LlamaCppChatRole.USER, content="Hi!"),
]

# `engine` is assumed to be an already-constructed llama.cpp engine (setup not shown).
result: LlamaCppInferenceResult = await engine.generate(messages)
print(result.text, result.usage)  # usage is LlamaCppInferenceUsage

stream: LlamaCppInferenceChunkStream = await engine.stream(messages)
async for chunk in stream:  # LlamaCppInferenceChunk
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
```

### candle

Candle returns a `CandleInferenceResult` from its non-streaming path:

```python
from blazen import CandleInferenceResult

# `engine` is assumed to be an already-constructed candle engine (setup not shown).
result: CandleInferenceResult = await engine.generate(prompt="Once upon a time", max_tokens=128)
print(result.text)
```

### Progress Reporting

Long downloads and model loads accept a progress callback. Subclass `ProgressCallback` and override `on_progress`. The default implementation is a no-op, so partial overrides are safe.

```python
from blazen import ProgressCallback

class PrintProgress(ProgressCallback):
    def on_progress(self, downloaded: int, total: int | None) -> None:
        # total is None when the upstream doesn't report Content-Length;
        # the truthiness check also avoids dividing by a reported total of 0
        if total:
            pct = 100.0 * downloaded / total
            print(f"\r{pct:5.1f}%  {downloaded}/{total} bytes", end="", flush=True)
        else:
            print(f"\r{downloaded} bytes", end="", flush=True)

# Pass instances to APIs that accept callbacks (model-cache download paths, etc.)
# `cache` stands for whichever object exposes such an API; it is not defined here.
await cache.download(model_id="...", progress=PrintProgress())
```

## Telemetry

Blazen exposes opt-in telemetry initializers. Each one is gated behind a Cargo feature on the underlying wheel (`langfuse`, `otlp`, `prometheus`); calling an initializer for a feature that wasn't compiled in raises an `UnsupportedError`.

### Langfuse

```python
from blazen import LangfuseConfig, init_langfuse

init_langfuse(LangfuseConfig(public_key="pk-...", secret_key="sk-...", host="https://cloud.langfuse.com"))
```

### OpenTelemetry (OTLP)

```python
from blazen import OtlpConfig, init_otlp

init_otlp(OtlpConfig(endpoint="http://localhost:4317", service_name="my-app"))
```

### Prometheus

```python
from blazen import init_prometheus

init_prometheus(9090)  # exposes /metrics on the given port
```

## Branching / Fan-Out

Return a list of events from a step to dispatch multiple events simultaneously. Each event is routed independently to steps that accept its type.

```python
from blazen import Workflow, step, Event, StopEvent, Context

class TaskEvent(Event):
    task_id: int
    payload: str

@step
async def fan_out(ctx: Context, ev: Event):
    return [
        TaskEvent(task_id=1, payload="first"),
        TaskEvent(task_id=2, payload="second"),
        TaskEvent(task_id=3, payload="third"),
    ]

@step
async def process_task(ctx: Context, ev: TaskEvent):
    # Called once per TaskEvent
    return StopEvent(result={"task_id": ev.task_id, "done": True})
```
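
The routing rule above (each returned event is delivered independently to whichever step accepts its type) can be sketched as a small queue-based dispatcher. This is a toy model under stated assumptions, not Blazen's engine: it runs handlers sequentially, whereas a real engine may run them concurrently, and every class and function below (`dispatch`, `DoneEvent`, the stand-in `StartEvent`) is hypothetical.

```python
from collections import deque


class StartEvent:
    """Stand-in for this sketch; NOT blazen.StartEvent."""


class TaskEvent:
    def __init__(self, task_id):
        self.task_id = task_id


class DoneEvent:
    def __init__(self, task_id):
        self.task_id = task_id


def fan_out(ev):
    return [TaskEvent(1), TaskEvent(2), TaskEvent(3)]


def process_task(ev):
    return DoneEvent(ev.task_id)


def dispatch(first_event, handlers):
    """Deliver each event to the handler registered for its type name."""
    queue = deque([first_event])
    unhandled = []
    while queue:
        ev = queue.popleft()
        handler = handlers.get(type(ev).__name__)
        if handler is None:
            unhandled.append(ev)  # no step accepts this type
            continue
        out = handler(ev)
        if out is None:
            continue
        # A list return is fanned out: every element is queued on its own.
        queue.extend(out if isinstance(out, list) else [out])
    return unhandled


delivered = dispatch(StartEvent(), {"StartEvent": fan_out, "TaskEvent": process_task})
print([ev.task_id for ev in delivered])  # [1, 2, 3]
```

`process_task` is invoked once per `TaskEvent`, matching the "called once per TaskEvent" comment in the example above.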

## Side-Effect Steps

A step can return `None` and use `ctx.send_event()` to route events through the internal step graph without returning them. This is useful for steps that perform side effects (logging, saving state) before forwarding.

```python
from blazen import Workflow, step, Event, StopEvent, Context

class ProcessedEvent(Event):
    data: str

@step
async def log_and_forward(ctx: Context, ev: Event):
    ctx.set("received_at", "2025-01-01T00:00:00Z")
    ctx.send_event(ProcessedEvent(data=ev.payload))
    return None  # no direct return -- event sent via ctx

@step
async def finish(ctx: Context, ev: ProcessedEvent):
    received = ctx.get("received_at")
    return StopEvent(result={"data": ev.data, "received_at": received})
```

`ctx.send_event()` routes the event through the internal step registry (to steps whose `accepts` matches the event type). This is different from `ctx.write_event_to_stream()` which publishes to the external broadcast stream.

## Pause and Resume

Snapshot a running workflow and resume it later -- useful for long-running processes, human-in-the-loop patterns, or persisting state across restarts.

```python
# Pause: signal pause, then capture workflow state as JSON
handler = await wf.run(prompt="Hello")
handler.pause()
snapshot_json = await handler.snapshot()
# Save snapshot_json to disk, database, etc.

# Resume: restore from snapshot with the same steps
handler = await Workflow.resume(snapshot_json, [step1, step2])
await handler.resume_in_place()
result = await handler.result()
```

> **Note on `ctx.session` and pause/resume.** Values in `ctx.session` are live references, so they cannot be carried into a snapshot as-is. If `ctx.session` holds entries when you call `handler.snapshot()`, the workflow's `session_pause_policy` decides what happens: the default (`pickle_or_error`) attempts to pickle each entry into the snapshot and raises a clear error if any entry can't be serialized. As a rule of thumb, use `ctx.state` for anything that must survive pause/resume, and keep `ctx.session` for ephemeral, run-scoped objects.
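
As a rough illustration of what a "pickle or error" policy means in practice, the sketch below tries to pickle every entry of a plain dict and fails with a message naming the offending key. It assumes nothing about Blazen's internals: `snapshot_session` is a hypothetical helper, and the choice of `ValueError` is arbitrary.

```python
import pickle
import threading


def snapshot_session(session: dict) -> dict[str, bytes]:
    """Pickle every session entry, or fail naming the offending key."""
    pickled = {}
    for key, value in session.items():
        try:
            pickled[key] = pickle.dumps(value)
        except Exception as exc:  # pickle raises several exception types
            raise ValueError(f"session entry {key!r} cannot be pickled: {exc}") from exc
    return pickled


# Plain data pickles fine.
ok = snapshot_session({"user": {"id": 7}})

# A lock is a typical live object that cannot be pickled.
try:
    snapshot_session({"lock": threading.Lock()})
except ValueError as err:
    message = str(err)
    print(message)
```

Running a check like this against your own session values before pausing is a cheap way to find out which entries belong in `ctx.state` instead.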

## Errors

All Blazen-raised exceptions inherit from `BlazenError`, so a single `except BlazenError` catches everything the SDK throws while leaving unrelated `Exception`s to propagate. Catch narrower bases when you want to react differently per category.

### Base classes

| Class | Raised when |
|---|---|
| `BlazenError` | Root of the hierarchy. |
| `AuthError` | Missing/invalid credentials. |
| `RateLimitError` | Provider rate-limited the request. Inspect `retry_after_ms`. |
| `TimeoutError` | Request or workflow exceeded its deadline. |
| `ValidationError` | Bad input shape (events, options, snapshots). |
| `ContentPolicyError` | Provider refused the prompt or output for policy reasons. |
| `ProviderError` | Generic upstream/provider failure. Base for backend-specific errors. |
| `UnsupportedError` | Feature not compiled into this wheel (e.g. telemetry without the feature). |
| `ComputeError` | Local-inference compute failure (CUDA OOM, kernel error, etc.). |
| `MediaError` | Decode/encode failure for image/audio inputs. |

### Per-backend `ProviderError` subclasses

These are feature-gated at runtime — they exist on `blazen` but are only raised when the corresponding backend is bundled in the wheel.

| Class | Backend |
|---|---|
| `LlamaCppError` | llama.cpp |
| `CandleLlmError` | candle (LLM) |
| `CandleEmbedError` | candle (embeddings) |
| `MistralRsError` | mistral.rs |
| `WhisperError` | whisper.cpp |
| `PiperError` | piper |
| `DiffusionError` | diffusion (image generation) |
| `FastEmbedError` | fastembed |
| `TractError` | tract |

### Structured attributes

`ProviderError` (and every per-backend subclass) carries structured fields you can read in `except` blocks:

- `provider` — short provider name (`"openai"`, `"llama_cpp"`, ...)
- `status` — HTTP status code if applicable, else `None`
- `endpoint` — request URL/route if applicable
- `request_id` — provider-supplied request id if available
- `detail` — human-readable detail string
- `raw_body` — raw response body as `bytes` if captured
- `retry_after_ms` — milliseconds the provider asked you to wait (mirrored on `RateLimitError`)

```python
from blazen import (
    BlazenError, AuthError, RateLimitError, ProviderError,
    LlamaCppError, MistralRsError, ContentPolicyError,
)

try:
    response = await model.complete(messages)
except AuthError as e:
    print("bad key:", e)
except RateLimitError as e:
    print(f"slow down; retry in {e.retry_after_ms} ms")
except ContentPolicyError:
    print("provider refused the prompt")
except (LlamaCppError, MistralRsError) as e:
    # Local-inference specific handling
    print(f"local backend {e.provider} failed: {e.detail}")
except ProviderError as e:
    print(f"upstream {e.provider} returned {e.status}: {e.detail}")
except BlazenError:
    raise
```
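
One common use of `retry_after_ms` is a retry loop that sleeps for the hinted delay. The sketch below shows the pattern with a local stand-in exception so it runs without a provider; in real code you would catch `blazen.RateLimitError` instead. The helper name `with_retry` and its `attempts` / `default_wait_ms` parameters are assumptions, not part of Blazen.

```python
import asyncio


class RateLimitError(Exception):
    """Stand-in for blazen.RateLimitError so the sketch is self-contained."""

    def __init__(self, retry_after_ms=None):
        super().__init__("rate limited")
        self.retry_after_ms = retry_after_ms


async def with_retry(call, attempts=3, default_wait_ms=500):
    """Await call(); on RateLimitError sleep for the hinted delay and retry."""
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except RateLimitError as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            hinted = exc.retry_after_ms
            wait_ms = hinted if hinted is not None else default_wait_ms
            await asyncio.sleep(wait_ms / 1000)


state = {"calls": 0}


async def flaky():
    """Fails twice with a 10 ms hint, then succeeds."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise RateLimitError(retry_after_ms=10)
    return "ok"


result = asyncio.run(with_retry(flaky))
print(result, state["calls"])  # ok 3
```

With a real model, the callable would be something like `lambda: model.complete(messages)`.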

## Context API

Steps share state through the `Context` object. Every method on `Context` is **synchronous** -- no `await` needed.

Values are stored using a 4-tier dispatch:

1. `bytes` / `bytearray` -- raw binary (survives snapshots)
2. JSON-serializable (`dict`, `list`, `str`, `int`, `float`, `bool`, `None`) -- JSON (survives snapshots)
3. Picklable objects (Pydantic models, dataclasses, etc.) -- pickled automatically (survives snapshots)
4. Unpicklable objects (DB connections, file handles, sockets) -- live in-process reference (same-process only, excluded from snapshots)

`ctx.get` returns the original Python type for all four tiers.
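
The dispatch order above can be approximated in plain Python. This is a sketch of the decision order only, assuming the tiers are tried top to bottom; `storage_tier` is a hypothetical helper and Blazen's actual checks may differ.

```python
import json
import pickle
import threading


def storage_tier(value) -> str:
    """Return which of the four tiers a value would fall into."""
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    try:
        json.dumps(value)
        return "json"
    except (TypeError, ValueError):
        pass  # not JSON-serializable: try pickling next
    try:
        pickle.dumps(value)
        return "pickle"
    except Exception:
        return "live"  # unpicklable: can only be held as a live reference


print(storage_tier(b"\x00\x01"))        # bytes
print(storage_tier({"a": [1, 2]}))      # json
print(storage_tier({1, 2}))             # pickle (sets are not JSON-serializable)
print(storage_tier(threading.Lock()))   # live
```

Only values that land in the last tier are tied to the current process; everything else can be written into a snapshot.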

| Method | Description |
|---|---|
| `ctx.set(key, value)` | Store a value. The storage tier is chosen by the 4-tier dispatch above. |
| `ctx.get(key)` | Retrieve a value (returns `None` if missing). |
| `ctx.set_bytes(key, data)` | Store raw binary data (bytes). No serialization requirement. |
| `ctx.get_bytes(key)` | Retrieve raw binary data (returns `None` if missing). |
| `ctx.send_event(event)` | Route an event through the internal step graph. |
| `ctx.write_event_to_stream(event)` | Publish an event to the external broadcast stream. |
| `ctx.run_id()` | Get the UUID string for the current workflow run. |

```python
from blazen import step, Context, Event

class SomeEvent(Event):
    x: int

@step
async def example(ctx: Context, ev: Event):
    ctx.set("counter", 42)              # synchronous
    val = ctx.get("counter")            # synchronous, returns 42
    run = ctx.run_id()                  # synchronous, returns UUID string
    ctx.send_event(SomeEvent(x=1))      # synchronous, routes internally
    ctx.write_event_to_stream(SomeEvent(x=1))  # synchronous, broadcasts externally
    return None
```

### State vs Session namespaces

Alongside the smart-routing `ctx.set` / `ctx.get` shortcuts, `Context` exposes two explicit namespaces so you can make intent clear at the call site:

- **`ctx.state`** -- persistable values that survive `pause()` / `resume()` and checkpoint stores. Routes through the same 4-tier dispatch as `ctx.set`.
- **`ctx.session`** -- live in-process references. Identity is preserved within a single workflow run -- `ctx.session["conn"]` returns the *same* Python object across steps. Deliberately excluded from snapshots.

```python
import sqlite3
from blazen import step, Context, StartEvent, StopEvent

@step
async def setup(ctx: Context, ev: StartEvent) -> StopEvent:
    # Persistable JSON state
    ctx.state["input_path"] = "data.csv"
    ctx.state["row_count"] = 0

    # Live in-process references -- identity preserved
    conn = sqlite3.connect(":memory:")
    ctx.session["db"] = conn

    # Same object on every access
    assert ctx.session["db"] is conn

    return StopEvent(result={"ok": True})
```

Both namespaces support the dict protocol (`__setitem__`, `__getitem__`, `__contains__`, `keys`).

### Binary Storage

`set_bytes` / `get_bytes` let you store raw binary data with no serialization requirement. Any type can be stored by converting to bytes yourself (e.g., pickle, msgpack, protobuf). Binary data persists through pause/resume/checkpoint.

```python
import pickle
from blazen import step, Context, Event, StopEvent

class NextEvent(Event):
    pass

@step
async def store_model(ctx: Context, ev: Event):
    # Store arbitrary data as bytes
    model_data = pickle.dumps({"weights": [1.0, 2.0, 3.0]})
    ctx.set_bytes("model", model_data)
    return NextEvent()

@step
async def load_model(ctx: Context, ev: NextEvent):
    raw = ctx.get_bytes("model")
    model = pickle.loads(raw)
    return StopEvent(result=model)
```

## API Reference

| Class / Function | Description |
|---|---|
| `Event(event_type, **kwargs)` | Base event class. Subclass it: `class MyEvent(Event)` auto-sets `event_type` to class name. Direct attribute access: `ev.name`. Also has `ev.to_dict()` and `ev.event_type`. |
| `StartEvent(**kwargs)` | Emitted by `wf.run(**kwargs)`. Steps with `ev: Event` or no annotation accept this. |
| `StopEvent(**kwargs)` | Terminates the workflow. Access the result via `result.result`. |
| `Context` | Shared typed storage, event emission, and stream publishing. Use `ctx.state` for persistable values, `ctx.session` for live in-process references. Smart-routing `ctx.set` / `ctx.get` shortcuts still work. Methods: `set`, `get`, `set_bytes`, `get_bytes`, `send_event`, `write_event_to_stream`, `run_id`. All synchronous. |
| `@step` | Decorator for workflow steps. Infers `accepts` from the `ev` parameter type annotation. Supports `async def` and plain `def`. May also be called as `@step(accepts=[...], emits=[...], max_concurrency=N)`. |
| `Workflow(name, steps, timeout=None)` | Validated workflow graph. `timeout` is in seconds (default: 300). |
| `await wf.run(**kwargs)` | Execute the workflow. Returns a `WorkflowHandler`. Kwargs become the `StartEvent` payload. |
| `WorkflowHandler` | Handle to a running workflow: `await handler.result()`, `async for ev in handler.stream_events()`, `handler.pause()`, `await handler.snapshot()`, `await handler.resume_in_place()`, `await handler.respond_to_input(request_id, response)`, `await handler.abort()`. |
| `await Workflow.resume(snapshot_json, steps, timeout=None)` | Resume a paused workflow from a JSON snapshot. Returns a `WorkflowHandler`. |
| `CompletionModel.<provider>(options=ProviderOptions(...))` | LLM provider. Pass a typed options struct (`ProviderOptions`, `AzureOptions`, `BedrockOptions`, or `FalOptions`) via `options=`. Providers: `openai`, `anthropic`, `gemini`, `azure`, `openrouter`, `groq`, `together`, `mistral`, `deepseek`, `fireworks`, `perplexity`, `xai`, `cohere`, `bedrock`, `fal`. |
| `await model.complete(messages, ...)` | Chat completion. Returns a typed `CompletionResponse`. |
| `ChatMessage(role=, content=, parts=)` | Chat message. Constructor with keyword args (role defaults to `"user"`). Static factories: `.system()`, `.user()`, `.assistant()`, `.tool()`, `.user_image_url()`, `.user_image_base64()`, `.user_parts()`. |
| `Role` | Role enum: `Role.SYSTEM`, `Role.USER`, `Role.ASSISTANT`, `Role.TOOL`. |
| `CompletionResponse` | Typed response: `.content`, `.model`, `.finish_reason`, `.tool_calls`, `.usage`. Also supports dict-style `response["content"]`. |
| `ToolCall` | Tool call object: `.id`, `.name`, `.arguments`. |
| `TokenUsage` | Token usage: `.prompt_tokens`, `.completion_tokens`, `.total_tokens`. |
| `ContentPart` | Multimodal content part: `.text(text=...)`, `.image_url(url=..., media_type=...)`, `.image_base64(data=..., media_type=...)`. |

## Documentation

Full docs: [blazen.dev](https://blazen.dev)

Source: [github.com/ZachHandley/Blazen](https://github.com/ZachHandley/Blazen)

## License

MPL-2.0 -- see [LICENSE](https://github.com/ZachHandley/Blazen/blob/main/LICENSE) for details.

Author: Zach Handley

