Metadata-Version: 2.4
Name: witness-sdk
Version: 0.1.1
Summary: The Python SDK for the Witness Application.
Project-URL: Homepage, https://witness.sh
Project-URL: Source, https://github.com/witness-sdk/python-sdk
Project-URL: Issues, https://github.com/witness-sdk/python-sdk/issues
Author-email: Christopher Law <christopher-law@live.com>
License-Expression: MIT
License-File: LICENSE
Keywords: inference,llm,observability,telemetry,tracing
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.10
Requires-Dist: httpx>=0.24
Provides-Extra: dev
Requires-Dist: jsonschema>=4; extra == 'dev'
Requires-Dist: pytest>=7; extra == 'dev'
Description-Content-Type: text/markdown

# Witness Python SDK

The Python SDK for the Witness Application.

Repository: [github.com/witness-sdk/python-sdk](https://github.com/witness-sdk/python-sdk)

Witness captures inference telemetry on a side channel: events are enqueued
in-process and delivered by a background thread to
[witness.sh](https://witness.sh). Your inference hot path is never blocked,
and delivery failures never raise into your application — if ingestion is
down, your models still answer.
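
To make the side-channel idea concrete, here is a minimal sketch of a bounded queue drained by a daemon thread. It is not the SDK's actual transport: the `SideChannel` class, its `send` callable, and the drop-when-full policy are assumptions chosen for illustration.

```python
import queue
import threading


class SideChannel:
    """Hypothetical bounded queue drained by a background daemon thread."""

    def __init__(self, send, maxsize=10000):
        self._send = send  # callable that delivers one event (assumed)
        self._queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def enqueue(self, event):
        """Never block the caller: if the queue is full, drop the event."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def _run(self):
        while True:
            event = self._queue.get()
            try:
                self._send(event)
            except Exception:
                pass  # delivery failures are swallowed, never raised to the app
            finally:
                self._queue.task_done()

    def join(self):
        """Block until every enqueued event has been processed."""
        self._queue.join()
```

The caller only ever pays for `put_nowait`, so a slow or failing `send` cannot stall the code that produced the event.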

## Install

```bash
pip install witness-sdk
```

The import name is `witness`. Requires Python 3.10+. The only runtime
dependency is `httpx`.

## Quickstart

```python
import witness
from openai import OpenAI

witness.init(api_key="w_live_...", project="prod-chat")
witness.watch(OpenAI)

client = OpenAI(
    api_key="gsk_...",
    base_url="https://api.groq.com/openai/v1",
)

response = client.chat.completions.create(
    model="llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello"}],
)

# inference → your provider, direct
# telemetry → witness.sh, async
```

`witness.watch()` patches an OpenAI-compatible client class once at startup.
Every `chat.completions.create`, `embeddings.create`, `messages.create`
(Anthropic-style), and streaming call on instances of that class is traced
automatically. The provider is inferred from the client's `base_url` (Groq,
OpenAI, Together, etc.). Async clients are not supported yet:
`watch(AsyncOpenAI)` raises rather than recording wrong data.
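
The sketch below shows the general technique of class-level patching and host-based provider inference. It is an illustration under stated assumptions, not the SDK's code: `watch_method`, `infer_provider`, the host table, and `FakeClient` are all hypothetical, and real clients expose nested resources (such as `client.chat.completions`) that need more plumbing than this.

```python
import functools
import time
from urllib.parse import urlparse

# Assumed host-to-provider table, for illustration only.
_PROVIDER_HOSTS = {
    "api.groq.com": "groq",
    "api.openai.com": "openai",
    "api.together.xyz": "together",
}


def infer_provider(base_url):
    """Map a client's base_url to a provider name by hostname."""
    host = urlparse(str(base_url)).hostname or ""
    return _PROVIDER_HOSTS.get(host, "unknown")


def watch_method(cls, method_name, record):
    """Replace cls.method_name with a timing wrapper (hypothetical helper)."""
    original = getattr(cls, method_name)
    if getattr(original, "_traced", False):
        return  # already patched, so patching twice is a no-op

    @functools.wraps(original)
    def traced(self, *args, **kwargs):
        start = time.perf_counter()
        status = "ok"
        try:
            return original(self, *args, **kwargs)
        except Exception:
            status = "error"
            raise  # the caller still sees the original exception
        finally:
            record({
                "method": method_name,
                "status": status,
                "latency_ms": (time.perf_counter() - start) * 1000.0,
            })

    traced._traced = True
    setattr(cls, method_name, traced)


class FakeClient:
    """Stand-in for a provider client class."""

    def create(self, model):
        return {"model": model}
```

Because the wrapper is installed on the class, instances created before or after patching are both traced.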

Prefer explicit control? Use `@witness.log()` instead:

```python
@witness.log(provider="groq")
def chat(messages):
    return client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=messages,
    )
```

`witness.log()` wraps any callable that performs an inference call. It times
the call, extracts `model` and token usage from the response by duck-typing
(any OpenAI-compatible response shape works), and enqueues an event. The
wrapped function's return value and exceptions pass through untouched —
failed calls are recorded with `status: "error"` and re-raised.

Provider and model are read from the response when possible; decorator
keyword arguments (`provider=`, `model=`) fill in whatever the response
cannot answer.
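
As a rough sketch of the behavior described above (timing, duck-typed extraction, keyword fallbacks, and exception pass-through), a decorator along these lines would do. The `record` callback and the event field names are assumptions for illustration and may not match the SDK's internals or schema.

```python
import functools
import time


def log(record, provider=None, model=None):
    """Hypothetical decorator factory; `record` receives one event dict."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            event = {"provider": provider, "model": model, "status": "ok"}
            try:
                response = fn(*args, **kwargs)
            except Exception:
                event["status"] = "error"
                raise  # re-raise untouched after marking the event
            else:
                # Duck-typing: read attributes if present, else keep fallbacks.
                event["model"] = getattr(response, "model", None) or model
                usage = getattr(response, "usage", None)
                event["prompt_tokens"] = getattr(usage, "prompt_tokens", None)
                event["completion_tokens"] = getattr(usage, "completion_tokens", None)
                return response
            finally:
                # Runs on both success and failure, so errors are recorded too.
                event["latency_ms"] = (time.perf_counter() - start) * 1000.0
                record(event)

        return wrapper

    return decorator
```

Only metadata is read from the response; the message content is never touched.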

## Opting out of telemetry

Skip reporting for specific calls:

```python
# Block scope — works with watch() and @witness.log()
with witness.ignore():
    client.chat.completions.create(model="...", messages=[...])

# Single call: works with both; `witness_ignore` is stripped before your code or the provider sees it
client.chat.completions.create(model="...", messages=[...], witness_ignore=True)
```
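
One plausible way to implement both opt-outs is a context variable for the block scope plus a keyword argument that is popped before the call is forwarded. This is a sketch under that assumption; `should_record` and the `_ignored` variable are hypothetical names, not SDK API.

```python
import contextlib
import contextvars

# Per-context flag, so nested or concurrent scopes do not leak into each other.
_ignored = contextvars.ContextVar("witness_ignored", default=False)


@contextlib.contextmanager
def ignore():
    """Suppress recording for every call made inside the `with` block."""
    token = _ignored.set(True)
    try:
        yield
    finally:
        _ignored.reset(token)  # restore the previous value, even on error


def should_record(kwargs):
    """Pop the per-call flag so it never reaches the wrapped function."""
    per_call = kwargs.pop("witness_ignore", False)
    return not (per_call or _ignored.get())
```

A wrapper would call `should_record(kwargs)` first and then forward the cleaned `kwargs` to the provider client.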

To keep only a fraction of events on high-volume paths, set a sample rate:

```python
witness.init(api_key="...", project="...", sample_rate=0.1)  # keep ~10%
```
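
A per-call sampling decision can be as simple as comparing a uniform random draw with the rate. The helper below is a hypothetical sketch of that idea, not the SDK's implementation; the `rng` parameter exists only to make the function easy to test.

```python
import random


def keep_event(sample_rate, rng=random.random):
    """Return True for roughly `sample_rate` of calls.

    `rng()` yields a float in [0.0, 1.0), so a rate of 1.0 keeps every
    call and a rate of 0.0 keeps none.
    """
    return rng() < sample_rate
```

Each call is decided independently, so the kept fraction only approaches the rate over many calls.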

## Configuration

`witness.init()` accepts:

| Parameter | Default | Notes |
|-----------|---------|-------|
| `api_key` | `WITNESS_API_KEY` env var | Required (argument or env) |
| `project` | `WITNESS_PROJECT` env var | Required (argument or env) |
| `base_url` | `https://witness.sh/api/v1` | Override for staging / self-hosted |
| `queue_size` | `10000` | Bounded in-memory event queue |
| `sample_rate` | `1.0` | Fraction of calls to record (per-call decision) |

If no API key or project is configured, `init()` logs a warning and the SDK
stays **disabled**: instrumented code runs normally and nothing is sent.
Your observability layer never crashes your app over a missing env var.
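
The argument-then-environment fallback and the disabled state could look roughly like this. `resolve_config`, its return shape, and the logger name are assumptions for illustration; only the two environment variable names come from the table above.

```python
import logging
import os

logger = logging.getLogger("witness-sketch")  # hypothetical logger name


def resolve_config(api_key=None, project=None, environ=None):
    """Prefer explicit arguments, fall back to env vars, never raise."""
    environ = os.environ if environ is None else environ
    api_key = api_key or environ.get("WITNESS_API_KEY")
    project = project or environ.get("WITNESS_PROJECT")
    enabled = bool(api_key and project)
    if not enabled:
        # Warn and stay disabled instead of crashing the host application.
        logger.warning("Witness disabled: missing API key or project")
    return {"api_key": api_key, "project": project, "enabled": enabled}
```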

`witness.flush(timeout=5.0)` blocks until queued events are delivered, for up
to `timeout` seconds, and keeps the worker running. This is useful in
notebooks and short scripts. `witness.shutdown()` flushes and stops the
worker; an `atexit` handler is registered automatically by `init()`.
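
A flush with a timeout can be built on a condition variable that tracks undelivered events. The sketch below is one way to do it and makes no claim about the SDK's internals: `PendingCounter` and its boolean return value are assumptions.

```python
import threading


class PendingCounter:
    """Hypothetical tracker for events that are queued but not yet delivered."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0

    def added(self):
        with self._cond:
            self._pending += 1

    def delivered(self):
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()  # wake any thread waiting in flush()

    def flush(self, timeout=5.0):
        """Wait until nothing is pending; return False if the timeout hits first."""
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)
```

Nothing here stops the worker, which matches the distinction between flushing and shutting down.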

## Privacy

Witness sends **metadata only**: provider, model, latency, token counts, and
status. Prompt and completion text are never transmitted.

## Prefork servers (gunicorn, uwsgi)

Background threads do not survive `fork()`. If you run a prefork server,
call `witness.init()` in a post-fork hook so each worker gets its own
transport:

```python
# gunicorn.conf.py
def post_fork(server, worker):
    import witness
    witness.init()  # reads WITNESS_API_KEY / WITNESS_PROJECT
```
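
To see why each worker needs its own transport, consider a worker thread that is tied to the process that started it. The sketch below detects a fork by comparing process IDs and restarts the thread. It illustrates the problem only; `ForkAwareWorker` is hypothetical and the SDK is not claimed to work this way.

```python
import os
import threading


class ForkAwareWorker:
    """Hypothetical wrapper that restarts its thread in a forked child."""

    def __init__(self, target):
        self._target = target
        self._pid = None
        self._thread = None

    def ensure_started(self):
        """Start the thread, or restart it if the process ID changed.

        Returns True when a new thread was started, False otherwise.
        """
        same_process = self._pid == os.getpid()
        if same_process and self._thread is not None and self._thread.is_alive():
            return False
        # Either first use, or we are in a child where the thread is gone.
        self._pid = os.getpid()
        self._thread = threading.Thread(target=self._target, daemon=True)
        self._thread.start()
        return True
```

On Unix, the standard library's `os.register_at_fork(after_in_child=...)` offers another place to run such re-initialization outside a server-specific hook.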

## Wire contract

The SDK posts `{"events": [...]}` batch envelopes to
`POST {base_url}/events` with a `Bearer` API key. The full contract lives in
[`src/witness/schema/v1/event.json`](src/witness/schema/v1/event.json) and is
versioned via the `schema_version` field on every event.
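
For orientation, the request described above can be assembled as follows. This sketch only builds the URL, headers, and body without sending anything. The event field names and the `schema_version` value are assumptions for illustration; the JSON schema in the repository is the authority.

```python
import json


def build_request(base_url, api_key, events):
    """Return (url, headers, body) for a batch POST to {base_url}/events."""
    url = base_url.rstrip("/") + "/events"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = json.dumps({"events": events}).encode("utf-8")
    return url, headers, body


# Illustrative event: every key and value below is assumed, not taken
# from the schema.
example_event = {
    "schema_version": "v1",
    "provider": "groq",
    "model": "llama-3.3-70b-versatile",
    "latency_ms": 412.5,
    "prompt_tokens": 12,
    "completion_tokens": 48,
    "status": "ok",
}
```

The resulting tuple can be passed to any HTTP client, for example `httpx.post(url, headers=headers, content=body)`.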

## Roadmap

- Async `@witness.log` for coroutines
- Worker-side batching (the wire format already supports it)
- Time-to-last-token on streamed responses

## Development

```bash
python -m venv .venv && .venv/bin/pip install -e ".[dev]"
.venv/bin/python -m pytest tests/
```

## License

MIT
