Metadata-Version: 2.4
Name: axonpush
Version: 0.0.8
Summary: Python SDK for AxonPush — real-time event infrastructure for AI agent systems
Project-URL: Homepage, https://github.com/axonpush/python-sdk
Project-URL: Repository, https://github.com/axonpush/python-sdk
Project-URL: Issues, https://github.com/axonpush/python-sdk/issues
Project-URL: Changelog, https://github.com/axonpush/python-sdk/releases
Author: AxonPush
License-Expression: MIT
License-File: LICENSE
Keywords: ai-agents,anthropic,axonpush,crewai,deepagents,distributed-tracing,events,langchain,langgraph,multi-agent,observability,openai,real-time,sdk,sse,tracing,webhooks,websocket
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: Pydantic :: 2
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Monitoring
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: httpx-sse<1.0,>=0.4.0
Requires-Dist: httpx<1.0,>=0.25.0
Requires-Dist: pydantic<3.0,>=2.0
Provides-Extra: all
Requires-Dist: anthropic>=0.30.0; extra == 'all'
Requires-Dist: crewai>=0.50.0; (python_version >= '3.11') and extra == 'all'
Requires-Dist: deepagents>=0.1.0; (python_version >= '3.11') and extra == 'all'
Requires-Dist: langchain-core>=0.1.0; extra == 'all'
Requires-Dist: loguru>=0.7.0; extra == 'all'
Requires-Dist: openai-agents>=0.1.0; extra == 'all'
Requires-Dist: opentelemetry-api>=1.20.0; extra == 'all'
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == 'all'
Requires-Dist: python-socketio[asyncio-client]<6.0,>=5.10; extra == 'all'
Requires-Dist: rq>=2.0; extra == 'all'
Requires-Dist: structlog>=24.0.0; extra == 'all'
Provides-Extra: anthropic
Requires-Dist: anthropic>=0.30.0; extra == 'anthropic'
Provides-Extra: crewai
Requires-Dist: crewai>=0.50.0; (python_version >= '3.11') and extra == 'crewai'
Provides-Extra: deepagents
Requires-Dist: deepagents>=0.1.0; (python_version >= '3.11') and extra == 'deepagents'
Provides-Extra: dev
Requires-Dist: mypy>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: respx>=0.21; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Provides-Extra: langchain
Requires-Dist: langchain-core>=0.1.0; extra == 'langchain'
Provides-Extra: loguru
Requires-Dist: loguru>=0.7.0; extra == 'loguru'
Provides-Extra: openai-agents
Requires-Dist: openai-agents>=0.1.0; extra == 'openai-agents'
Provides-Extra: otel
Requires-Dist: opentelemetry-api>=1.20.0; extra == 'otel'
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == 'otel'
Provides-Extra: rq
Requires-Dist: rq>=2.0; extra == 'rq'
Provides-Extra: structlog
Requires-Dist: structlog>=24.0.0; extra == 'structlog'
Provides-Extra: websocket
Requires-Dist: python-socketio[asyncio-client]<6.0,>=5.10; extra == 'websocket'
Description-Content-Type: text/markdown

# axonpush

Python SDK for [AxonPush](https://axonpush.xyz) — real-time event infrastructure for AI agent systems.

Publish, subscribe, trace, and deliver agent events with sub-100ms latency. Drop-in integrations for LangChain, OpenAI Agents SDK, Claude/Anthropic, CrewAI, and the Python observability stack (stdlib `logging`, Loguru, structlog, OpenTelemetry).

## Install

```bash
pip install axonpush
```

With framework integrations:

```bash
pip install axonpush[langchain]       # LangChain/LangGraph
pip install axonpush[openai-agents]   # OpenAI Agents SDK
pip install axonpush[anthropic]       # Claude/Anthropic
pip install axonpush[crewai]          # CrewAI
pip install axonpush[deepagents]      # LangChain Deep Agents
pip install axonpush[rq]              # Redis Queue backend (python-rq)
```

With observability integrations:

```bash
pip install axonpush                  # stdlib logging — no extra deps
pip install axonpush[loguru]          # Loguru sink
pip install axonpush[structlog]       # structlog processor
pip install axonpush[otel]            # OpenTelemetry SpanExporter
pip install axonpush[all]             # Everything
```

## Quick Start

```python
from axonpush import AxonPush, EventType

with AxonPush(api_key="ak_...", tenant_id="1", environment="production") as client:
    # Publish an event
    event = client.events.publish(
        "web_search",
        {"query": "AI agent frameworks"},
        channel_id=1,
        agent_id="researcher",
        trace_id="tr_run_42",
        event_type=EventType.AGENT_TOOL_CALL_START,
    )
    # event.queued == True, event.id is None — publishes are async-ingested
    # by default. See "Response shape" below.

    # List events
    events = client.events.list(channel_id=1)

    # Get a trace summary
    summary = client.traces.get_summary("tr_run_42")
```

### Async

```python
from axonpush import AsyncAxonPush

async with AsyncAxonPush(api_key="ak_...", tenant_id="1", environment="production") as client:
    event = await client.events.publish(
        "web_search",
        {"query": "AI agents"},
        channel_id=1,
        agent_id="researcher",
        event_type="agent.tool_call.start",
    )
```

### Response shape

By default, `events.publish()` returns as soon as the server has queued the event — typically under 1&nbsp;ms. The returned `Event` carries `identifier`, `queued=True`, `created_at`, and the resolved `environment_id`, but **not** a DB-assigned `id` (`event.id` is `None`). Treat `event.identifier` and `event.trace_id` as the durable correlation keys. List endpoints and subscriptions return the fully-persisted shape (with `id`) once the event is written.

## Framework Integrations

### LangChain / LangGraph

```python
# Sync (default — background thread)
from axonpush import AxonPush
from axonpush.integrations.langchain import AxonPushCallbackHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushCallbackHandler(client, channel_id=1, agent_id="my-agent")
chain.invoke({"input": "..."}, config={"callbacks": [handler]})

# Async (default — fire-and-forget tasks, zero event-loop blocking)
from axonpush import AsyncAxonPush
from axonpush.integrations.langchain import get_langchain_handler

client = AsyncAxonPush(api_key="ak_...", tenant_id="1")
handler = get_langchain_handler(client, channel_id=1, agent_id="my-agent")
await chain.ainvoke({"input": "..."}, config={"callbacks": [handler]})
```

### OpenAI Agents SDK

Events are published asynchronously via fire-and-forget tasks by default — no event-loop blocking.

```python
from agents import Runner  # openai-agents package

from axonpush import AsyncAxonPush
from axonpush.integrations.openai_agents import AxonPushRunHooks

client = AsyncAxonPush(api_key="ak_...", tenant_id="1")
hooks = AxonPushRunHooks(client, channel_id=1)

result = await Runner.run(agent, input="...", hooks=hooks)
await hooks.flush()  # optional — drain pending publishes before exit
```

### Claude / Anthropic

```python
from axonpush import AxonPush
from axonpush.integrations.anthropic import AxonPushAnthropicTracer

client = AxonPush(api_key="ak_...", tenant_id="1")
tracer = AxonPushAnthropicTracer(client, channel_id=1)

# Wraps messages.create() — auto-emits events for tool_use, text, turns
response = tracer.create_message(
    anthropic_client,
    model="claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Hello"}],
)
```

### CrewAI

```python
from axonpush import AxonPush
from axonpush.integrations.crewai import AxonPushCrewCallbacks

client = AxonPush(api_key="ak_...", tenant_id="1")
callbacks = AxonPushCrewCallbacks(client, channel_id=1)

callbacks.on_crew_start()
result = Crew(
    agents=[...],
    tasks=[...],
    step_callback=callbacks.on_step,
    task_callback=callbacks.on_task_complete,
).kickoff()
callbacks.on_crew_end(result)
```

## Publishing Modes

All integrations accept a `mode` parameter to control how events reach AxonPush:

| Mode | Backend | Best for |
|------|---------|----------|
| `"background"` (default) | In-process queue (sync) or `asyncio.create_task` (async) | Most apps — zero config |
| `"rq"` | Redis Queue ([python-rq](https://python-rq.org/)) | Durable delivery, serverless, high volume |
| `"sync"` | Direct HTTP call | Debugging, tests |

### Redis Queue mode

Offload event publishing to a separate worker process backed by Redis. Events survive app restarts and are retried on transient failures.

```bash
pip install axonpush[rq]
```

```python
from redis import Redis
from axonpush import AxonPush
from axonpush.integrations.langchain import AxonPushCallbackHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushCallbackHandler(
    client, channel_id=1,
    mode="rq",
    rq_options={"redis_conn": Redis(), "queue_name": "axonpush"},
)
chain.invoke({"input": "..."}, config={"callbacks": [handler]})
```

Start an rq worker to process the queue:

```bash
rq worker axonpush
```

## Environments

Tag every event with the environment it came from (`"production"`, `"staging"`, `"eval"`, or any string your team uses). AxonPush uses the tag server-side for isolation, filtering, and per-env quotas. The SDK forwards it as an `X-Axonpush-Environment` header on every request and threads it into the logging handler's OTel resource attributes.

### Constructor

```python
from axonpush import AxonPush

client = AxonPush(api_key="ak_...", tenant_id="1", environment="production")
```

If you omit `environment=`, the SDK auto-detects it from the first of these that's set: **`AXONPUSH_ENVIRONMENT`** → `SENTRY_ENVIRONMENT` → `APP_ENV` → `ENV`. That ordering means existing Sentry/12-factor setups work out of the box, and you can override with `AXONPUSH_ENVIRONMENT` when you need to.

### Per-call override

```python
client.events.publish(
    "rerun_eval",
    {"dataset": "v2"},
    channel_id=1,
    environment="eval",   # this event only — doesn't change the client default
)
```

### Temporary override with a context manager

Useful for isolating eval runs, backfills, or shadow traffic from your production event stream without constructing a second client:

```python
with client.environment("eval"):
    for row in dataset:
        client.events.publish("row_processed", {"id": row.id}, channel_id=1)
# outside the block: environment reverts to whatever the client was constructed with
```

## Logging & Observability

Ship logs and traces from your existing Python observability stack to AxonPush. All four integrations emit OpenTelemetry-shaped payloads, so the events line up with anything else you're already sending to an OTel-compatible backend.

> **Non-blocking by default (v0.0.7+).**
>
> - Sync integrations use a bounded in-memory queue + daemon thread. Async integrations use `asyncio.create_task()` fire-and-forget with backpressure (max 1000 pending tasks).
> - For durable delivery, use `mode="rq"` to offload publishing to a Redis-backed worker process.
> - Call `handler.flush(timeout=)` or use `@flush_after_invocation(handler)` at known checkpoints (end of a Lambda invocation, end of a test) to guarantee delivery.
> - Pass `mode="sync"` on any integration if you need blocking publishes (one-shot scripts, deterministic tests).
> - Fork-safe via `os.register_at_fork` — Gunicorn `--preload` / Celery `--pool=prefork` workers get a fresh queue + thread after fork.
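The sync path is the classic bounded-queue-plus-daemon-thread pattern. A stripped-down sketch (illustrative — these class and method names are not the SDK's):

```python
import queue
import threading

class BackgroundPublisher:
    """Minimal model of mode="background" for sync clients."""

    def __init__(self, send, maxsize=1000):
        self._q = queue.Queue(maxsize=maxsize)
        self._send = send  # the blocking HTTP call a mode="sync" publish would make
        threading.Thread(target=self._worker, daemon=True).start()

    def publish(self, event):
        try:
            self._q.put_nowait(event)  # O(microseconds); never blocks the caller
        except queue.Full:
            pass  # backpressure: drop rather than block the hot path

    def _worker(self):
        while True:
            event = self._q.get()
            try:
                self._send(event)
            finally:
                self._q.task_done()

    def flush(self):
        self._q.join()  # block until every queued event has been sent
```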

> **Self-recursion filter.** The stdlib `AxonPushLoggingHandler` installs a filter by default that drops records from `httpx`, `httpcore`, and the SDK's own `axonpush` logger. Without it, each publish would trigger an `httpx` INFO log ("HTTP Request: POST /event 201 Created") that would get re-shipped, creating an infinite loop. The filter is always-on and cannot be disabled; you can add more excluded prefixes via `exclude_loggers=[...]`.
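In stdlib terms, the filter is just a logger-name prefix check — roughly this (an illustrative re-implementation, not the SDK's class):

```python
import logging

class ExcludePrefixFilter(logging.Filter):
    """Drop records whose logger name starts with any excluded prefix."""

    def __init__(self, prefixes):
        super().__init__()
        self._prefixes = tuple(prefixes)

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False drops the record before the handler ships it.
        return not record.name.startswith(self._prefixes)
```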

### Stdlib `logging` (FastAPI, Flask, Django, …)

```python
import logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import AxonPushLoggingHandler

client = AxonPush(api_key="ak_...", tenant_id="1")
handler = AxonPushLoggingHandler(client=client, channel_id=1, service_name="my-api")

logging.getLogger().addHandler(handler)
logging.info("order created", extra={"order_id": 1234})
```

**Django** configures logging through the `LOGGING` dictConfig, which has no way to pass a pre-built client object — so the handler also accepts credential kwargs (or reads `AXONPUSH_API_KEY` / `AXONPUSH_TENANT_ID` from the environment):

```python
# settings.py
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "axonpush": {
            "class": "axonpush.integrations.logging_handler.AxonPushLoggingHandler",
            "channel_id": 1,
            "service_name": "my-django-app",
            "exclude_loggers": ["django.db.backends"],  # optional
        },
    },
    "root": {"handlers": ["axonpush"], "level": "INFO"},
}
```

**FastAPI / Flask** — construct the handler with a pre-built `client=` in your app startup and attach it to `logging.getLogger()` (or `app.logger` for Flask).

> **Uvicorn propagation trap (FastAPI/Starlette):** uvicorn's default `LOGGING_CONFIG` sets `uvicorn.propagate=False`, so records emitted on `logging.getLogger("uvicorn.error")` **never reach the root logger**. If you only attach the handler to root, your app's startup/request logs will be invisible to AxonPush. Also attach the handler to `uvicorn.error` directly:
>
> ```python
> logging.getLogger().addHandler(axonpush_handler)
> logging.getLogger("uvicorn.error").addHandler(axonpush_handler)
> # Optional: one event per HTTP request
> # logging.getLogger("uvicorn.access").addHandler(axonpush_handler)
> ```

### AWS Lambda / Google Cloud Functions / Azure Functions

Serverless containers are **frozen between invocations**, so the background worker thread can't drain the queue during the freeze. To guarantee delivery, call `handler.flush()` at the end of each invocation. The `@flush_after_invocation` decorator wraps your handler function and flushes in a `finally:` block:

```python
import os, logging
from axonpush import AxonPush
from axonpush.integrations.logging_handler import (
    AxonPushLoggingHandler,
    flush_after_invocation,
)

client = AxonPush(
    api_key=os.environ["AXONPUSH_API_KEY"],
    tenant_id=os.environ["AXONPUSH_TENANT_ID"],
)
handler = AxonPushLoggingHandler(client=client, channel_id=1, service_name="my-lambda")
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

@flush_after_invocation(handler)
def lambda_handler(event, context):
    logging.info("processing event", extra={"event_id": event["id"]})
    return {"statusCode": 200}
```

Performance stays good: `emit()` is still O(microseconds) (just a queue enqueue), and `flush()` runs **once per invocation** at the end — not once per log call. The handler auto-detects Lambda / GCF / Azure Functions at construction time and logs a one-time reminder to use `flush_after_invocation`.

Pass `*handlers` to the decorator to flush multiple handlers in one wrap:

```python
@flush_after_invocation(logging_handler, otel_exporter, structlog_processor)
def lambda_handler(event, context):
    ...
```
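Under the hood this kind of decorator is a plain try/finally wrapper — something like the following sketch (named differently here to make clear it is not the SDK source):

```python
import functools

def flush_all_after(*handlers):
    """Illustrative equivalent of @flush_after_invocation: flush every
    handler when the wrapped function returns — or raises."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                for h in handlers:
                    h.flush()
        return wrapper
    return decorator
```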

### Loguru

```python
from loguru import logger
from axonpush import AxonPush
from axonpush.integrations.loguru import create_axonpush_loguru_sink

client = AxonPush(api_key="ak_...", tenant_id="1")
sink = create_axonpush_loguru_sink(client=client, channel_id=1, service_name="my-api")
logger.add(sink, serialize=True)  # serialize=True is required

logger.error("connection refused", user_id=42)
```

### structlog

```python
import structlog
from axonpush import AxonPush
from axonpush.integrations.structlog import axonpush_structlog_processor

client = AxonPush(api_key="ak_...", tenant_id="1")
forwarder = axonpush_structlog_processor(client=client, channel_id=1, service_name="my-api")

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        forwarder,  # non-destructive — composes with other processors
        structlog.processors.JSONRenderer(),
    ],
)
structlog.get_logger().error("downstream timeout", endpoint="/search")
```

### Print capture (stdout/stderr → AxonPush)

For AI agents that emit free-form output via `print()`. Patches `sys.stdout` / `sys.stderr` with a tee stream that still writes to the original console.

```python
from axonpush import AxonPush
from axonpush.integrations.print_capture import setup_print_capture

client = AxonPush(api_key="ak_...", tenant_id="1")
handle = setup_print_capture(client, channel_id=1, agent_id="demo-agent")

print("agent starting")  # forwarded to AxonPush as an agent.log event
handle.unpatch()
```

### OpenTelemetry

If your service is already instrumented with the OTel SDK, add `AxonPushSpanExporter` to your tracer provider and every span ships to AxonPush alongside whatever other backends you export to.

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from axonpush import AxonPush
from axonpush.integrations.otel import AxonPushSpanExporter

client = AxonPush(api_key="ak_...", tenant_id="1")
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(
        AxonPushSpanExporter(client=client, channel_id=1, service_name="my-api")
    )
)
trace.set_tracer_provider(provider)
```

### Sentry

If your app is already using `sentry-sdk`, point it at AxonPush with a one-liner. `install_sentry()` builds a Sentry DSN from your AxonPush credentials and calls `sentry_sdk.init(**kwargs)` for you — errors captured anywhere in your app (including Sentry's Flask/FastAPI/Django/Celery instrumentations) flow into your AxonPush channel instead of Sentry's cloud.

```bash
pip install sentry-sdk   # axonpush does not bundle sentry-sdk
```

```python
from axonpush import install_sentry

install_sentry(
    api_key="ak_...",
    channel_id=42,
    environment="production",
    release="my-app@1.2.3",
    # Any extra kwargs are forwarded to sentry_sdk.init() unchanged:
    traces_sample_rate=0.1,
    send_default_pii=False,
)

# That's it — sentry_sdk.capture_exception / capture_message now ship to AxonPush.
```

`api_key`, `channel_id`, and `host` fall back to `AXONPUSH_API_KEY`, `AXONPUSH_CHANNEL_ID`, and `AXONPUSH_HOST` (default `api.axonpush.xyz`) if omitted. `environment` uses the same auto-detect precedence as the client (`AXONPUSH_ENVIRONMENT` → `SENTRY_ENVIRONMENT` → `APP_ENV` → `ENV`). If you need a fully-formed DSN instead, pass `dsn="..."` and the other args are ignored.
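For reference, a Sentry DSN has the shape `scheme://publicKey@host/projectId`. A hypothetical construction from AxonPush credentials might look like this — the actual key-to-DSN mapping the SDK uses is an assumption here:

```python
def build_axonpush_dsn(api_key: str, channel_id: int, host: str = "api.axonpush.xyz") -> str:
    # Standard Sentry DSN layout: scheme://publicKey@host/projectId.
    # Treating api_key as publicKey and channel_id as projectId is illustrative only.
    return f"https://{api_key}@{host}/{channel_id}"
```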

## Real-Time Subscriptions

axonpush supports two real-time subscription mechanisms: **SSE** (Server-Sent Events) and **WebSocket** (Socket.IO).

### SSE (Server-Sent Events)

SSE is the simplest way to consume events in real time — no extra dependencies required.

#### Subscribe to all events on a channel

```python
from axonpush import AxonPush

with AxonPush(api_key="ak_...", tenant_id="1") as client:
    with client.channels.subscribe_sse(channel_id=1) as sub:
        for event in sub:
            print(event.agent_id, event.identifier, event.payload)
```

#### Subscribe to a specific event identifier

```python
with client.channels.subscribe_event_sse(channel_id=1, event_identifier="web_search") as sub:
    for event in sub:
        print(event.payload)
```

#### Filter by agent, event type, or trace

All SSE methods accept optional filters to narrow the event stream:

```python
from axonpush import EventType

with client.channels.subscribe_sse(
    channel_id=1,
    agent_id="researcher",
    event_type=EventType.AGENT_ERROR,
    trace_id="tr_run_42",
) as sub:
    for event in sub:
        print(f"[{event.agent_id}] {event.identifier}: {event.payload}")
```

### WebSocket (Socket.IO)

WebSocket subscriptions are callback-based and support bidirectional communication (subscribe, publish, unsubscribe).

```bash
pip install axonpush[websocket]
```

#### Sync

```python
ws = client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
ws.wait()  # blocks until disconnected
```

#### Async

```python
ws = await async_client.connect_websocket()
ws.on_event(lambda e: print(e.agent_id, e.payload))
await ws.subscribe(channel_id=1, event_type="agent.tool_call.start")
await ws.wait()
```

#### Publish and unsubscribe via WebSocket

```python
ws.publish(channel_id=1, identifier="status", payload={"step": "done"}, agent_id="worker")
ws.unsubscribe(channel_id=1)
ws.disconnect()
```

## Use Case Guides

Step-by-step guides for common scenarios:

- [See what your agent is doing — in real time](docs/use-cases/01-realtime-agent-events.md)
- [Add observability in 3 lines](docs/use-cases/02-framework-integrations.md)
- [Build a live dashboard with SSE](docs/use-cases/03-live-dashboard-sse.md)
- [Trace a multi-step agent run](docs/use-cases/04-distributed-tracing.md)
- [Get notified when your agent fails](docs/use-cases/05-error-webhooks.md)
- [Agent-to-agent communication](docs/use-cases/06-agent-to-agent-websockets.md)
- [Production error handling](docs/use-cases/07-production-error-handling.md)

## Resources

The client exposes Stripe-style resource objects:

| Resource | Methods |
|---|---|
| `client.events` | `publish()`, `list()` |
| `client.channels` | `create()`, `get()`, `update()`, `delete()`, `subscribe_sse()` |
| `client.apps` | `create()`, `get()`, `list()`, `update()`, `delete()` |
| `client.webhooks` | `create_endpoint()`, `list_endpoints()`, `delete_endpoint()`, `get_deliveries()` |
| `client.traces` | `list()`, `get_events()`, `get_summary()` |

## License

MIT
