Metadata-Version: 2.4
Name: currai
Version: 0.0.3
Summary: Currai observability SDK for Python
Author: Currai
License: MIT
Requires-Python: >=3.10
Requires-Dist: httpx>=0.27
Requires-Dist: typing-extensions>=4.10
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8; extra == 'dev'
Requires-Dist: respx>=0.21; extra == 'dev'
Description-Content-Type: text/markdown

# currai

The official Python SDK for [Currai](https://www.currai.app) — observability and tracing for LLM applications.

Instrument your AI app with a few lines of code to capture traces, generations, spans, token usage, and cost. Events are buffered and flushed in the background, so tracing adds negligible latency to your request path.

- 🪶 **Lightweight** — one dependency (`httpx`), no heavyweight runtime.
- ⚡ **Non-blocking** — events are batched and flushed on a background thread.
- 🧵 **Thread-safe** — share a single client across your whole process.
- 🔗 **Wire-compatible** with the [TypeScript SDK](../ts) — both speak the same JSON ingestion format against the same backend.

## Installation

```bash
pip install currai
```

Requires Python 3.10+.

> Using `uv`? `uv add currai`. For local development against this repo, see [Development](#development).

## Quickstart

Grab your API keys from the Currai dashboard and set them as environment variables:

```bash
export CURRAI_PUBLIC_KEY="pk-lf-..."
export CURRAI_SECRET_KEY="sk-lf-..."
```

Then instrument an LLM call:

```python
import os
from currai import Currai

currai = Currai(
    public_key=os.environ["CURRAI_PUBLIC_KEY"],
    secret_key=os.environ["CURRAI_SECRET_KEY"],
)

messages = [{"role": "user", "content": "What's the capital of Morocco?"}]

# A trace represents one logical unit of work (e.g. a chat turn or request).
trace = currai.trace(
    name="chat-turn",
    session_id="sess-1",
    user_id="user-1",
    input={"messages": messages},
    tags=["chatbot"],
)

# A generation captures a single LLM call nested under the trace.
generation = trace.generation(
    name="openai.chat.completions",
    model="gpt-4o-mini",
    input=messages,
    model_parameters={"temperature": 0.7},
)

completion = openai_client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
)
answer = completion.choices[0].message.content

generation.end(
    output=answer,
    usage={
        "input": completion.usage.prompt_tokens,
        "output": completion.usage.completion_tokens,
        "total": completion.usage.total_tokens,
        "unit": "TOKENS",
    },
)

trace.update(output=answer)

# Flush before short-lived processes (scripts, serverless handlers) exit.
currai.flush()
```

## Core concepts

The SDK models the same hierarchy as the Currai UI:

| Object         | Created via                                      | Represents                                                          |
| -------------- | ------------------------------------------------ | ------------------------------------------------------------------- |
| **Trace**      | `currai.trace(...)`                              | One end-to-end request or workflow. The root of everything.         |
| **Generation** | `trace.generation(...)` / `span.generation(...)` | A single LLM call, with model, input/output, token usage, and cost. |
| **Span**       | `trace.span(...)` / `span.span(...)`             | Any other unit of work (retrieval, a tool call, a sub-step).        |
| **Event**      | `trace.event(...)` / `span.event(...)`           | A point-in-time marker with no duration.                            |

Generations, spans, and events can be nested by creating them from a span, which automatically sets `parent_observation_id`:

```python
trace = currai.trace(name="rag-query", input={"question": question})

retrieval = trace.span(name="retrieve-docs", input={"query": question})
# ... do retrieval ...
retrieval.end(output={"doc_ids": doc_ids})

# Nested under the retrieval span.
embed = retrieval.generation(name="embed", model="text-embedding-3-small", input=question)
embed.end(usage={"input": 12, "unit": "TOKENS"})

trace.update(output=answer)
currai.flush()
```

## API reference

### `Currai(...)`

Constructor options (all keyword-only):

| Option               | Type                              | Default              | Description                                          |
| -------------------- | --------------------------------- | -------------------- | ---------------------------------------------------- |
| `public_key`         | `str`                             | **required**         | Currai API public key (`pk-lf-…`).                   |
| `secret_key`         | `str`                             | **required**         | Currai API secret key (`sk-lf-…`).                   |
| `base_url`           | `str`                             | `https://www.currai.app` | Currai instance URL (set this for self-hosted).      |
| `enabled`            | `bool`                            | `True`               | When `False`, no events are buffered or sent.        |
| `flush_at`           | `int`                             | `15`                 | Auto-flush once the buffer reaches this many events. |
| `flush_interval_ms`  | `int`                             | `10000`              | Background flush interval (ms). `0` disables it.     |
| `request_timeout_ms` | `int`                             | `10000`              | Per-request HTTP timeout (ms).                       |
| `on_error`           | `Callable[[BaseException], None]` | logs a warning       | Sink for network / ingestion errors.                 |

Client methods:

| Method             | Description                                                                   |
| ------------------ | ----------------------------------------------------------------------------- |
| `trace(**kwargs)`  | Create a new `CurraiTrace`. Returns the trace object.                         |
| `flush()`          | Synchronously send all buffered events. Blocks until the buffer is empty.     |
| `flush_async()`    | `async` variant of `flush()` (runs the drain on a worker thread).             |
| `shutdown()`       | Stop the background timer, flush remaining events, and close the HTTP client. |
| `shutdown_async()` | `async` variant of `shutdown()`.                                              |

### `trace.update(...)` / `trace.generation(...)` / `trace.span(...)` / `trace.event(...)`

`trace.update(...)` patches trace fields after creation (commonly `output`). The child factories create nested observations.

Trace fields: `name`, `input`, `output`, `session_id`, `user_id`, `metadata`, `environment`, `release`, `version`, `public`, `tags`, `external_id`, `timestamp`.

### Generations

`generation.update(...)` patches an in-flight generation; `generation.end(...)` closes it and stamps `end_time` (defaults to now).

Fields: `name`, `model`, `model_parameters`, `input`, `output`, `metadata`, `usage`, `usage_details`, `cost_details`, `level`, `status_message`, `completion_start_time`, `prompt_name`, `prompt_version`, `version`.

### Spans

`span.update(...)` patches an in-flight span; `span.end(...)` closes it and stamps `end_time` (defaults to now).

Fields: `name`, `input`, `output`, `metadata`, `level`, `status_message`, `version`, `start_time`, `end_time`.

### Usage and cost

The `usage` dict accepts token/cost counts. `unit` is one of `TOKENS`, `CHARACTERS`, `MILLISECONDS`, `SECONDS`, `REQUESTS`, `IMAGES`:

```python
generation.end(
    usage={
        "input": 120,
        "output": 48,
        "total": 168,
        "unit": "TOKENS",
        "inputCost": 0.00012,
        "outputCost": 0.00010,
        "totalCost": 0.00022,
    },
)
```

### Observation levels

`level` can be `DEBUG`, `DEFAULT`, `WARNING`, or `ERROR` — pair it with `status_message` to flag failures:

```python
generation.end(level="ERROR", status_message="rate limited by provider")
```

## Flushing & lifecycle

Events are flushed in three ways:

1. **By size** — once the buffer hits `flush_at` events (default 15).
2. **By time** — every `flush_interval_ms` (default 10s) on a background thread.
3. **Manually** — `currai.flush()` / `await currai.flush_async()`.

An `atexit` handler drains the queue on normal interpreter shutdown. Still, **call `flush()` (or `shutdown()`) explicitly** in short-lived processes — scripts, serverless handlers, CLI tools — to guarantee delivery before the process exits.

In async apps, prefer the non-blocking variants so you don't tie up the event loop:

```python
await currai.flush_async()
# or, at process teardown:
await currai.shutdown_async()
```

## Error handling

The SDK never raises from the request path — buffering and sending happen out of band. Network failures and partial ingestion errors are routed to `on_error` (which logs a warning by default). Provide your own sink to integrate with your logging or alerting:

```python
import logging

logger = logging.getLogger("my-app")

currai = Currai(
    public_key=...,
    secret_key=...,
    on_error=lambda err: logger.error("currai ingestion failed: %s", err),
)
```

To disable tracing entirely (e.g. in tests or local dev) without touching your instrumentation code, pass `enabled=False`.

## Wire format

The SDK posts to `POST {base_url}/api/public/ingestion` with HTTP Basic auth
(`Basic base64(public_key:secret_key)`) and a JSON body of `{ "batch": IngestionEvent[] }`.

Event types: `trace-create`, `span-create`, `span-update`, `generation-create`, `generation-update`, `event-create`. The server responds with HTTP 207 and `{ "successes": [...], "errors": [...] }`.

This is byte-compatible with the TypeScript SDK — any server that accepts one accepts the other.

## Development

```bash
# From this directory
uv sync --extra dev      # install with dev dependencies
uv run pytest            # run the test suite
```

To consume the SDK from another package in this workspace, add a path source:

```toml
[tool.uv.sources]
currai = { path = "../../packages/sdk/python", editable = true }
```

## License

MIT
