Metadata-Version: 2.4
Name: autoplay-sdk
Version: 0.1.0
Summary: Real-time event streaming client for Autoplay connectors
Project-URL: Homepage, https://github.com/Autoplay-AI/real-time-poc
Project-URL: Documentation, https://github.com/Autoplay-AI/real-time-poc/tree/main/later-rho-event-connector/src/customer_sdk#readme
Project-URL: Bug Tracker, https://github.com/Autoplay-AI/real-time-poc/issues
License: MIT
Keywords: autoplay,chatbot,events,rag,real-time,sse
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: httpx-sse>=0.4.0
Requires-Dist: httpx>=0.27.0
Description-Content-Type: text/markdown

# autoplay-sdk

Real-time event streaming client for [Autoplay](https://autoplay.so) connectors.

Receive live UI actions and session summaries from the Autoplay event connector
via SSE (Server-Sent Events) — ideal as a real-time data source for RAG
pipelines, analytics systems, or custom automation.

---

## Installation

```bash
pip install autoplay-sdk
```

**Requirements:** Python 3.10+

---

## Quickstart

```python
from autoplay_sdk import ConnectorClient, ActionsPayload

STREAM_URL = "https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID"
API_TOKEN  = "uk_live_..."

def on_actions(payload: ActionsPayload):
    print(payload.to_text())

ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()
```

`run()` blocks and reconnects automatically on any network failure.
Press **Ctrl-C** to stop.

---

## Typed payloads

The `on_actions` and `on_summary` callbacks receive typed dataclass instances,
not raw dicts. Your IDE will autocomplete fields, and every payload has a
`.to_text()` method that returns an embedding-ready string.

```python
from autoplay_sdk import ActionsPayload, SummaryPayload

def on_actions(payload: ActionsPayload):
    print(payload.session_id)       # str | None
    print(payload.email)            # str | None
    print(payload.count)            # int
    for action in payload.actions:
        print(action.title)         # str
        print(action.description)   # str
        print(action.canonical_url) # str

    # embedding-ready text — one call, no formatting logic needed
    text = payload.to_text()
    # "Session ps_abc123 — 3 actions\n1. Viewed Dashboard — https://...\n..."

def on_summary(payload: SummaryPayload):
    print(payload.session_id)
    print(payload.replaces)         # number of actions this summary replaces
    text = payload.to_text()        # the prose summary string directly
```
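
The `replaces` field suggests a simple bookkeeping pattern: keep per-action text for each session, and when a summary arrives, drop the actions it covers and keep the summary in their place. The sketch below is a minimal in-memory illustration of that idea. `SessionContext` and its methods are hypothetical names, not part of the SDK, and it assumes `replaces` counts the oldest stored actions for the session.

```python
from collections import defaultdict


class SessionContext:
    """Hypothetical in-memory store: latest summary plus newer action texts."""

    def __init__(self) -> None:
        self._summary: dict[str, str] = {}
        self._actions: dict[str, list[str]] = defaultdict(list)

    def add_actions(self, session_id: str, action_texts: list[str]) -> None:
        # One entry per action, e.g. [a.to_text() for a in payload.actions].
        self._actions[session_id].extend(action_texts)

    def apply_summary(self, session_id: str, summary_text: str, replaces: int) -> None:
        # Assumption: `replaces` refers to the oldest stored actions.
        self._actions[session_id] = self._actions[session_id][replaces:]
        self._summary[session_id] = summary_text

    def context(self, session_id: str) -> str:
        # Summary first (if any), then the actions it does not cover.
        parts = []
        if session_id in self._summary:
            parts.append(self._summary[session_id])
        parts.extend(self._actions[session_id])
        return "\n".join(parts)


store = SessionContext()
store.add_actions(
    "ps_abc123",
    ["Viewed Dashboard", "Clicked Export CSV", "Opened Settings"],
)
store.apply_summary(
    "ps_abc123", "User exported a CSV from the Dashboard.", replaces=2
)
print(store.context("ps_abc123"))
```

In a real pipeline the same two calls would sit inside `on_actions` and `on_summary`, with the stored text going to your vector store instead of a dict.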

---

## Background usage (non-blocking)

```python
import time

client = ConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)
client.run_in_background()  # returns immediately

# your application continues here
time.sleep(60)

client.stop()
```

---

## Context manager

```python
with ConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
    client.on_actions(on_actions).run_in_background()
    do_other_work()
# stop() is called automatically on exit
```

---

## Async RAG pipeline (recommended)

Use `AsyncConnectorClient` when your RAG pipeline is built on `asyncio` —
LangChain, LlamaIndex, FastAPI, or any framework where embedding/vector calls
are already async.

```python
import asyncio
import openai
from autoplay_sdk import AsyncConnectorClient, ActionsPayload, SummaryPayload

openai_client = openai.AsyncOpenAI()

async def on_actions(payload: ActionsPayload):
    """Embed the batch and upsert into your vector store."""
    text = payload.to_text()  # embedding-ready string, no formatting needed
    response = await openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    )
    embedding = response.data[0].embedding

    # upsert into Pinecone, Weaviate, Chroma, pgvector, etc.
    await your_vector_store.upsert(
        id=payload.session_id or "unknown",
        vector=embedding,
        metadata={
            "session_id": payload.session_id,
            "email":      payload.email,
            "count":      payload.count,
        },
    )

async def on_summary(payload: SummaryPayload):
    """Replace the raw action history with a compact prose summary."""
    text = payload.to_text()  # the prose summary string directly
    print(f"[summary] session={payload.session_id}: {text}")
    # store in your RAG context window / vector store as well

async def main():
    async with AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN) as client:
        client.on_actions(on_actions).on_summary(on_summary)
        await client.run()

asyncio.run(main())
```

### Non-blocking inside an existing event loop

```python
client = AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)
task = client.run_in_background()  # asyncio.Task — returns immediately

await do_other_async_work()

client.stop()
await task
```

---

## Sync RAG pipeline

If your pipeline is synchronous (no `async/await`), use `ConnectorClient`.
Slow callbacks are safe — the SSE reader and the callback executor run on
separate threads.

```python
import openai
from autoplay_sdk import ConnectorClient, ActionsPayload

openai_client = openai.OpenAI()

def on_actions(payload: ActionsPayload):
    text = payload.to_text()
    embedding = openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    ).data[0].embedding
    your_vector_store.upsert(id=payload.session_id or "unknown", vector=embedding)

ConnectorClient(url=STREAM_URL, token=API_TOKEN) \
    .on_actions(on_actions) \
    .run()
```

---

## Drop handling (high-volume)

If your callback is slower than the incoming event rate, events are queued
internally.  When the queue is full, events are dropped.  Register a handler
to be notified:

```python
def on_drop(payload, total_dropped):
    print(f"WARNING: dropped event (type={payload['type']}, total={total_dropped})")
    # alert your on-call rotation, increment a metric, etc.

# Keep a reference to the client so its counters can be read later.
client = ConnectorClient(url=STREAM_URL, token=API_TOKEN, max_queue_size=1000)
client.on_actions(on_actions).on_drop(on_drop)
client.run_in_background()  # run() would block and the lines below would never execute

# Check the running drop count at any time:
print(client.dropped_count)   # events dropped so far
print(client.queue_size)      # events waiting in the queue right now
```
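
To make the queue-and-drop behaviour concrete, here is a minimal sketch of a bounded buffer built on the standard library's `queue.Queue`. It is not the SDK's implementation: `BoundedBuffer` and `offer` are hypothetical names, and the choice to drop the incoming event (rather than the oldest queued one) is an assumption.

```python
import queue


class BoundedBuffer:
    """Hypothetical stand-in for a client-side event queue; not SDK code."""

    def __init__(self, max_queue_size: int, on_drop=None) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self._on_drop = on_drop
        self.dropped_count = 0

    def offer(self, event: dict) -> bool:
        """Enqueue without blocking; count and report the event if full."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            # Assumption: the incoming event is the one that gets dropped.
            self.dropped_count += 1
            if self._on_drop is not None:
                self._on_drop(event, self.dropped_count)
            return False

    @property
    def queue_size(self) -> int:
        return self._queue.qsize()


dropped = []
buffer = BoundedBuffer(
    max_queue_size=2,
    on_drop=lambda event, total: dropped.append((event["type"], total)),
)
for seq in range(3):
    buffer.offer({"type": "actions", "seq": seq})

print(buffer.queue_size, buffer.dropped_count, dropped)
```

Because the reader never blocks on a full queue, a slow callback costs dropped events rather than a stalled stream, which is why monitoring `dropped_count` matters.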

---

## Payload schemas

### `actions` event

```json
{
  "type":       "actions",
  "session_id": "ps_abc123",
  "ts":         "2024-01-15T10:30:00Z",
  "actions": [
    {
      "event_type":     "pageview",
      "title":          "Dashboard",
      "canonical_url":  "https://app.example.com/dashboard",
      "ts":             "2024-01-15T10:30:00Z",
      "properties": {
        "referrer": "https://app.example.com/login"
      }
    },
    {
      "event_type":     "click",
      "title":          "Export CSV button",
      "canonical_url":  "https://app.example.com/dashboard",
      "ts":             "2024-01-15T10:30:05Z",
      "properties": {}
    }
  ]
}
```

| Field                     | Type   | Description                                  |
|---------------------------|--------|----------------------------------------------|
| `type`                    | string | Always `"actions"`                           |
| `session_id`              | string | Unique user session identifier               |
| `ts`                      | string | ISO-8601 timestamp of the batch              |
| `actions`                 | array  | Ordered list of user actions in this batch   |
| `actions[].event_type`    | string | `pageview`, `click`, `input`, …              |
| `actions[].title`         | string | Human-readable page or element title         |
| `actions[].canonical_url` | string | Page URL where the action occurred           |
| `actions[].ts`            | string | ISO-8601 timestamp of the individual action  |
| `actions[].properties`    | object | Additional event properties (may be empty)   |
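
If you consume the raw JSON yourself (for example when replaying stored events), the schema above maps naturally onto dataclasses. The following is a sketch using only the standard library. `RawAction`, `RawActionsEvent`, and `parse_actions_event` are hypothetical names for illustration and are not the SDK's own models.

```python
import json
from dataclasses import dataclass, field


@dataclass
class RawAction:
    """Hypothetical mirror of one entry in `actions`."""
    event_type: str
    title: str
    canonical_url: str
    ts: str
    properties: dict = field(default_factory=dict)


@dataclass
class RawActionsEvent:
    """Hypothetical mirror of a whole `actions` event."""
    session_id: str
    ts: str
    actions: list[RawAction]


def parse_actions_event(raw: str) -> RawActionsEvent:
    """Parse one `actions` JSON document, rejecting other event types."""
    data = json.loads(raw)
    if data.get("type") != "actions":
        raise ValueError(f"expected an 'actions' event, got {data.get('type')!r}")
    actions = [
        RawAction(
            event_type=item["event_type"],
            title=item["title"],
            canonical_url=item["canonical_url"],
            ts=item["ts"],
            properties=item.get("properties", {}),
        )
        for item in data["actions"]
    ]
    return RawActionsEvent(session_id=data["session_id"], ts=data["ts"], actions=actions)


sample = """
{
  "type": "actions",
  "session_id": "ps_abc123",
  "ts": "2024-01-15T10:30:00Z",
  "actions": [
    {"event_type": "pageview", "title": "Dashboard",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:00Z",
     "properties": {"referrer": "https://app.example.com/login"}},
    {"event_type": "click", "title": "Export CSV button",
     "canonical_url": "https://app.example.com/dashboard",
     "ts": "2024-01-15T10:30:05Z", "properties": {}}
  ]
}
"""
event = parse_actions_event(sample)
print(event.session_id, [action.event_type for action in event.actions])
```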

### `summary` event

```json
{
  "type":       "summary",
  "session_id": "ps_abc123",
  "ts":         "2024-01-15T10:35:00Z",
  "content":    "The user navigated to the Dashboard, exported a CSV report, then opened account settings to update their billing plan."
}
```

| Field        | Type   | Description                                   |
|--------------|--------|-----------------------------------------------|
| `type`       | string | Always `"summary"`                            |
| `session_id` | string | Unique user session identifier                |
| `ts`         | string | ISO-8601 timestamp when the summary was made  |
| `content`    | string | Prose summary of the session up to this point |

---

## API reference

### Typed models

| Class | Description |
|-------|-------------|
| `SlimAction` | One UI action: `title`, `description`, `canonical_url` + `.to_text()` |
| `ActionsPayload` | A batch of actions for a session + `.to_text()` for embedding |
| `SummaryPayload` | LLM prose summary for a session + `.to_text()` for embedding |

### `ConnectorClient(url, token="", max_queue_size=500)`

| Parameter       | Type | Default | Description                                            |
|-----------------|------|---------|--------------------------------------------------------|
| `url`           | str  | —       | Full URL to `GET /stream/{product_id}` on the connector |
| `token`         | str  | `""`    | Unkey API key (`uk_live_...`)                          |
| `max_queue_size`| int  | `500`   | Max events buffered before drops start occurring       |

#### Methods

| Method                      | Returns             | Description                                                   |
|-----------------------------|---------------------|---------------------------------------------------------------|
| `.on_actions(fn)`           | `ConnectorClient`   | Register callback; `fn` receives `ActionsPayload`             |
| `.on_summary(fn)`           | `ConnectorClient`   | Register callback; `fn` receives `SummaryPayload`             |
| `.on_drop(fn)`              | `ConnectorClient`   | Register callback when events are dropped (queue full)        |
| `.run()`                    | `None` (blocks)     | Connect and process events; reconnects automatically          |
| `.run_in_background()`      | `threading.Thread`  | Start the client on a daemon thread; returns immediately      |
| `.stop()`                   | `None`              | Signal the client to stop cleanly                             |

#### Properties

| Property        | Type | Description                                          |
|-----------------|------|------------------------------------------------------|
| `dropped_count` | int  | Running total of events dropped due to a full queue  |
| `queue_size`    | int  | Number of events currently waiting in the queue      |

---

### `AsyncConnectorClient(url, token="")`

Async-native client for `asyncio` environments.  Callbacks are `async def` coroutines.

| Parameter | Type | Default | Description                                             |
|-----------|------|---------|---------------------------------------------------------|
| `url`     | str  | —       | Full URL to `GET /stream/{product_id}` on the connector |
| `token`   | str  | `""`    | Unkey API key (`uk_live_...`)                           |

#### Methods

| Method                 | Returns                | Description                                                |
|------------------------|------------------------|------------------------------------------------------------|
| `.on_actions(fn)`      | `AsyncConnectorClient` | Register async callback; `fn` receives `ActionsPayload`    |
| `.on_summary(fn)`      | `AsyncConnectorClient` | Register async callback; `fn` receives `SummaryPayload`    |
| `.run()`               | `Coroutine`            | Connect and process events (await this)                    |
| `.run_in_background()` | `asyncio.Task`         | Schedule `run()` as a background task; returns immediately |
| `.stop()`              | `None`                 | Signal the client to stop cleanly                          |

Supports `async with` for automatic `stop()` on exit.

---

## Reconnection behaviour

The client reconnects automatically on any network failure or non-fatal HTTP
error.  It uses exponential backoff starting at 1 s and capping at 30 s.

Fatal HTTP errors (401 Unauthorized, 403 Forbidden, 404 Not Found) are
**not** retried and will raise immediately — check your `url` and `token`.
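
The retry schedule described above can be sketched as a small generator. This is an illustration, not the SDK's code: the doubling factor and the absence of jitter are assumptions, and `backoff_delays` and `should_retry` are hypothetical helper names.

```python
import itertools
from typing import Iterator

# Status codes listed above as fatal (never retried).
FATAL_STATUSES = frozenset({401, 403, 404})


def backoff_delays(
    initial: float = 1.0, cap: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds, growing by `factor` up to `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


def should_retry(status_code: int) -> bool:
    """Return False for the fatal statuses, True for everything else."""
    return status_code not in FATAL_STATUSES


first_seven = list(itertools.islice(backoff_delays(), 7))
print(first_seven)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With these assumptions the client would reach the 30 s ceiling on the sixth consecutive failure and stay there until a connection succeeds.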

---

## Logging

The SDK logs to the standard Python `logging` hierarchy under the logger name
`autoplay_sdk`.  Enable it in your application:

```python
import logging
logging.basicConfig(level=logging.INFO)
```

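To see debug output from the SDK while other loggers stay at the default `WARNING` level:

```python
logging.basicConfig()  # installs a handler; the root level stays at WARNING
logging.getLogger("autoplay_sdk").setLevel(logging.DEBUG)
```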

---

## License

MIT
