Metadata-Version: 2.4
Name: restless-stream
Version: 0.1.2
Summary: Python SDK for Restless Stream
Project-URL: Homepage, https://github.com/restless-stream/sdk/tree/main/packages/python/core#readme
Project-URL: Repository, https://github.com/restless-stream/sdk
Project-URL: Issues, https://github.com/restless-stream/sdk/issues
Author: Restless Stream
License: MIT
Keywords: restless-stream,sdk,sse,streaming,websocket
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Typing :: Typed
Requires-Python: >=3.14
Requires-Dist: httpx-sse>=0.4
Requires-Dist: httpx>=0.27
Requires-Dist: pydantic>=2
Requires-Dist: typing-extensions>=4.8
Requires-Dist: websockets>=16.0
Provides-Extra: dev
Requires-Dist: build>=1.2; extra == 'dev'
Requires-Dist: pytest-asyncio>=1.4.0; extra == 'dev'
Requires-Dist: pytest-cov>=7.1.0; extra == 'dev'
Requires-Dist: pytest>=9.0.3; extra == 'dev'
Requires-Dist: ruff>=0.15.16; extra == 'dev'
Description-Content-Type: text/markdown

# restless-stream

Official Python SDK for Restless Stream: <https://restlessapi.stream>

Restless Stream turns REST APIs into live Server-Sent Events and WebSocket streams. This package provides synchronous REST and SSE support, asynchronous REST, SSE, and WebSocket support, typed Pydantic models, runtime URL builders, and HMAC signature helpers.

## Requirements

- Python `>=3.14`.
- A Restless Stream account and API key.

## Installation

```bash
python -m pip install restless-stream
```

## Getting Started

### Async Client

Use `AsyncRestlessStreamClient` when you need async REST calls, SSE subscriptions, or WebSocket subscriptions.

```python
import asyncio
import os

from restless_stream import AsyncRestlessStreamClient


async def main() -> None:
  async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    stream = await client.streams.create(
      name="Orders",
      description="Live order feed",
      status="ACTIVE",
      method="GET",
      url="https://api.example.com/orders",
      payload_mode="FULL_DATA",
      polling_interval=30,
    )

    async for event in client.streams.subscribe_sse(stream.sse_url, reconnect=False):
      print(event.type, event.data)


asyncio.run(main())
```

### Sync Client

Use `RestlessStreamClient` for synchronous REST calls and SSE subscriptions.

```python
import os

from restless_stream import RestlessStreamClient


with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
  streams = client.streams.list(limit=20, offset=0)

  for stream in streams.streams:
    print(stream.id, stream.name)
```

## Client Configuration

```python
from restless_stream import AsyncRestlessStreamClient, RestlessStreamClient

client = RestlessStreamClient(
  api_key="rs_...",
  base_url="https://api.restlessapi.stream",
  stream_base_url="https://stream.restlessapi.stream",
  timeout=30.0,
)

async_client = AsyncRestlessStreamClient(
  api_key="rs_...",
  timeout=30.0,
)
```

| Option | Description |
| --- | --- |
| `api_key` | Sends `x-api-key` on REST requests and `Authorization: Bearer <key>` on stream runtime requests. |
| `base_url` | REST API base URL. Defaults to `https://api.restlessapi.stream`. |
| `stream_base_url` | Runtime stream base URL. Defaults to `https://stream.restlessapi.stream`. |
| `timeout` | HTTP timeout in seconds for the owned `httpx` client. Defaults to `30.0`. |
| `http_client` | Optional `httpx.Client` or `httpx.AsyncClient`. When provided, the SDK does not close it. |

Both clients support context managers. Use `close()` for the sync client and `await aclose()` for the async client when you do not use a context manager.

When creating or updating streams, the SDK adds `apiKey` to the request body when the client has an API key and the body does not already include `apiKey` or `api_key`.
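
The injection rule above can be sketched as a small pure function. This is only an illustration of the described behaviour: `with_api_key` is a hypothetical name, not an SDK export, and the SDK's internal implementation may differ.

```python
from typing import Any, Mapping, Optional


def with_api_key(body: Mapping[str, Any], client_api_key: Optional[str]) -> dict:
    """Return a copy of `body`, adding `apiKey` only when the rule allows it."""
    result = dict(body)
    # Skip injection when the caller already supplied a key in either spelling.
    already_present = "apiKey" in result or "api_key" in result
    if client_api_key and not already_present:
        result["apiKey"] = client_api_key
    return result


# The client key is added because the body has none.
print(with_api_key({"name": "Orders"}, "rs_client"))
# The caller's own key is left untouched.
print(with_api_key({"name": "Orders", "api_key": "rs_other"}, "rs_client"))
```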

## Stream Management

All stream management methods are available as top-level client methods and through `client.streams`.

```python
stream = await client.streams.create(
  name="Orders",
  description="Live order feed",
  status="ACTIVE",
  method="GET",
  url="https://api.example.com/orders",
  headers={"Accept": "application/json"},
  payload_mode="FULL_DATA",
  polling_interval=30,
)

await client.streams.update(stream.id, name="Orders v2", polling_interval=60)
await client.streams.stop(stream.id)
await client.streams.start(stream.id)

usage = await client.streams.credit_usage_stats(stream.id)
snippets = await client.streams.connection_snippets(stream_id=stream.id, language="python")
```

| Top-level method | Resource method | Description |
| --- | --- | --- |
| `list_streams(limit=20, offset=0)` | `streams.list(...)` | List streams. |
| `get_stream(stream_id)` | `streams.get(stream_id)` | Get one stream. |
| `create_stream(data=None, **kwargs)` | `streams.create(...)` | Create a persisted stream. |
| `update_stream(stream_id, data=None, **kwargs)` | `streams.update(...)` | Patch stream configuration. |
| `start_stream(stream_id)` | `streams.start(stream_id)` | Mark a stream active. |
| `stop_stream(stream_id)` | `streams.stop(stream_id)` | Mark a stream inactive. |
| `delete_stream(stream_id)` | `streams.delete(stream_id)` | Delete a stream. |
| `validate_stream_api_key(api_key=None)` | `streams.validate_api_key(...)` | Validate an API key. Uses the client key when omitted. |
| `credit_usage_stats(stream_id)` | `streams.credit_usage_stats(...)` | Fetch credit usage totals and daily usage. |
| `connection_snippets(data=None, **kwargs)` | `streams.connection_snippets(...)` | Generate runtime URLs and snippets. |
| `direct_setup(data=None, **kwargs)` | `streams.direct_setup(...)` | Generate direct-stream commands, URLs, and snippets. |
| `direct_session(data=None, **kwargs)` | `streams.direct_session(...)` | Create or reuse a direct stream session. |
| `subscribe_sse(url, **kwargs)` | `streams.subscribe_sse(...)` | Subscribe to an SSE runtime URL. |
| `subscribe_direct_sse(**kwargs)` | `streams.subscribe_direct_sse(...)` | Build and subscribe to a direct SSE URL. |
| `subscribe_websocket(url, **kwargs)` | `streams.subscribe_websocket(...)` | Async client only. Subscribe to a WebSocket runtime URL. |
| `subscribe_direct_websocket(**kwargs)` | `streams.subscribe_direct_websocket(...)` | Async client only. Build and subscribe to a direct WebSocket URL. |

Request bodies may be mappings, Pydantic models, or keyword arguments. Snake-case keys are converted to the camel-case field names expected by the API.

```python
await client.streams.create(
  name="Orders",
  description="",
  status="ACTIVE",
  method="POST",
  url="https://api.example.com/orders/search",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)
```
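
To make the snake-case to camel-case conversion concrete, here is a minimal sketch. The helper names `to_camel` and `camelize_keys` are hypothetical, and the choice to convert only top-level keys (leaving user payloads such as `body` and `headers` untouched) is an assumption rather than documented SDK behaviour.

```python
from typing import Any, Dict, Mapping


def to_camel(key: str) -> str:
    """Convert a snake_case key such as 'polling_interval' to 'pollingInterval'."""
    head, *rest = key.split("_")
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


def camelize_keys(fields: Mapping[str, Any]) -> Dict[str, Any]:
    """Rename top-level keys only; nested values are passed through unchanged."""
    return {to_camel(key): value for key, value in fields.items()}


request = camelize_keys(
    {
        "name": "Orders",
        "payload_mode": "JSON_PATCH",
        "polling_interval": 30,
        "body": {"order_status": "open"},  # user payload, left as-is
    }
)
print(request)
```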

## Direct Streams

Direct streams let you stream a REST endpoint without creating a persisted stream first.

```python
async for event in client.streams.subscribe_direct_sse(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  reconnect=False,
):
  print(event)
```

Create a reusable direct session when you need a stable URL.

```python
session = await client.streams.direct_session(
  dedupe_key="orders-feed-v1",
  method="GET",
  url="https://api.example.com/orders",
  polling_interval=30,
)

async for event in client.streams.subscribe_sse(session.sse_url):
  print(event.data)
```

Generate setup commands and code snippets without subscribing.

```python
setup = await client.streams.direct_setup(
  method="POST",
  url="https://api.example.com/search",
  body={"query": "restless"},
  language="python",
)

print(setup.commands.header.curl)
print(setup.runtime.direct_sse_url)
```

Direct stream inputs support `url`, `method`, `headers`, `body`, `jq_filter`, `payload_mode`, `polling_interval`, `polling_strategies`, and `api_key`.

## SSE Streaming

SSE subscriptions yield `StreamEvent` Pydantic models.

```python
for event in sync_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  cursor="1700000000000",
  reconnect=False,
):
  print(event.type, event.data)
```

```python
async for event in async_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  since="2026-01-01T00:00:00Z",
  max_reconnects=3,
  retry_seconds=2,
):
  print(event.type, event.meta.timestamp)
```

SSE options:

| Option | Description |
| --- | --- |
| `cursor` | Initial cursor or SSE event ID. Updated automatically from received events. |
| `since` | Initial timestamp or cursor used until an event cursor is observed. |
| `reconnect` | Reconnect after the stream ends or errors. Defaults to `True`. |
| `max_reconnects` | Maximum reconnect attempts. `None` means unbounded. |
| `retry_seconds` | Delay between reconnect attempts. Defaults to `1.0`. |

The SDK sends `Authorization: Bearer <api_key>` for runtime subscriptions when the client has an API key.
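
The interaction between `cursor`, `reconnect`, `max_reconnects`, and `retry_seconds` in the table above can be pictured with a small standalone loop. This is a sketch under assumptions, not the SDK's code: `subscribe` and `fake_connect` are hypothetical, events are reduced to `(event_id, data)` tuples, and the sketch stops quietly once reconnects are exhausted, whereas the real client may raise instead.

```python
import time
from typing import Callable, Iterable, Iterator, List, Optional, Tuple

Event = Tuple[str, str]  # (event_id, data), a stand-in for StreamEvent


def subscribe(
    connect: Callable[[Optional[str]], Iterable[Event]],
    cursor: Optional[str] = None,
    reconnect: bool = True,
    max_reconnects: Optional[int] = None,
    retry_seconds: float = 1.0,
) -> Iterator[Event]:
    attempts = 0
    while True:
        try:
            for event_id, data in connect(cursor):
                cursor = event_id  # remember where to resume
                yield event_id, data
        except ConnectionError:
            pass  # an error is handled like a normal end of stream here
        if not reconnect:
            return
        if max_reconnects is not None and attempts >= max_reconnects:
            return
        attempts += 1
        time.sleep(retry_seconds)


FEED: List[Event] = [("1", "a"), ("2", "b"), ("3", "c"), ("4", "d")]
seen_cursors: List[Optional[str]] = []


def fake_connect(cursor: Optional[str]) -> Iterator[Event]:
    """Serve two events after the cursor, then drop the connection."""
    seen_cursors.append(cursor)
    start = 0 if cursor is None else int(cursor)
    yield from FEED[start:start + 2]
    raise ConnectionError("connection dropped")


events = list(subscribe(fake_connect, max_reconnects=2, retry_seconds=0))
print(events)
print(seen_cursors)
```

Each reconnect passes the most recent event ID back to `connect`, so no event in the fake feed is delivered twice.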

## WebSocket Streaming

WebSocket subscriptions are async-only.

```python
async for event in async_client.streams.subscribe_websocket(
  "wss://stream.restlessapi.stream/ws?streamId=stream_123",
  cursor="42",
  reconnect=True,
  max_reconnects=3,
):
  print(event.type, event.data)
```

Direct WebSocket subscription:

```python
async for event in async_client.streams.subscribe_direct_websocket(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  connect_kwargs={"open_timeout": 10},
):
  print(event)
```

WebSocket options include all SSE runtime options plus `connect_kwargs`, which are passed to `websockets.connect`. Do not include `additional_headers` or `extra_headers` in `connect_kwargs` when the SDK client has an `api_key`; the SDK owns the runtime Authorization header in that case.
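
One way to picture the header restriction is a guard that merges `connect_kwargs` with the Authorization header. This is a hypothetical sketch: `build_connect_kwargs` is not an SDK function, and whether the SDK raises, warns, or silently overrides conflicting headers is not stated here, so the `ValueError` is an assumption.

```python
from typing import Any, Dict, Mapping, Optional

RESERVED = ("additional_headers", "extra_headers")


def build_connect_kwargs(
    api_key: Optional[str],
    connect_kwargs: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    """Combine user kwargs with the Authorization header owned by the client."""
    kwargs = dict(connect_kwargs or {})
    if not api_key:
        return kwargs  # no client key, so the caller controls headers
    conflicts = [name for name in RESERVED if name in kwargs]
    if conflicts:
        raise ValueError(f"remove {conflicts} from connect_kwargs when api_key is set")
    kwargs["additional_headers"] = {"Authorization": f"Bearer {api_key}"}
    return kwargs


print(build_connect_kwargs("rs_example", {"open_timeout": 10}))
```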

## Runtime URL Helpers

Build runtime URLs without creating a client.

```python
from restless_stream import (
  build_direct_sse_url,
  build_direct_websocket_url,
  build_sse_url,
  build_websocket_url,
  to_websocket_url,
)

sse_url = build_sse_url(stream_id="stream_123", since="2026-01-01T00:00:00Z")
ws_url = build_websocket_url(session_id="session_123")

direct_sse_url = build_direct_sse_url(
  url="https://api.example.com/orders",
  method="POST",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)

direct_ws_url = build_direct_websocket_url(url="https://api.example.com/orders")
converted_ws_url = to_websocket_url(sse_url)
```

`build_sse_url` requires exactly one of `stream_id` or `session_id`.
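
The "exactly one" rule can be illustrated with a standalone sketch built on `urllib.parse`. The function `sketch_sse_url` is hypothetical; the `streamId` parameter matches the example URLs in this README, while `sessionId` is an assumed name for the session variant.

```python
from typing import Optional
from urllib.parse import urlencode

DEFAULT_BASE = "https://stream.restlessapi.stream"


def sketch_sse_url(
    stream_id: Optional[str] = None,
    session_id: Optional[str] = None,
    since: Optional[str] = None,
    base_url: str = DEFAULT_BASE,
) -> str:
    # Reject both "neither given" and "both given".
    if (stream_id is None) == (session_id is None):
        raise ValueError("pass exactly one of stream_id or session_id")
    if stream_id is not None:
        params = {"streamId": stream_id}
    else:
        params = {"sessionId": session_id}  # assumed parameter name
    if since is not None:
        params["since"] = since
    return f"{base_url}?{urlencode(params)}"


print(sketch_sse_url(stream_id="stream_123"))
print(sketch_sse_url(session_id="session_123", since="2026-01-01T00:00:00Z"))
```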

## Models

Response models are Pydantic v2 models. Python attributes use snake case and accept the API's camel-case aliases.

Common models and enums:

| Export | Description |
| --- | --- |
| `RestlessModel` | Base Pydantic model with camel-case aliases and extra fields allowed. |
| `HttpMethod` | `GET`, `POST`, `PUT`, `PATCH`, `DELETE`, `OPTIONS`. |
| `StreamStatus` | `ACTIVE` or `INACTIVE`. |
| `PayloadMode` | `FULL_DATA` or `JSON_PATCH`. |
| `PollingStrategy` | Time-windowed polling strategy. |
| `Stream` | Persisted stream response. |
| `StreamsResponse` | Stream list response with pagination info. |
| `BaseActionResponse` | Generic action response. |
| `StreamCreditUsageStats`, `DailyCreditUsage`, `CreditUsageChargeTypeBreakdown` | Credit usage responses. |
| `ConnectionSnippetsResponse` | Managed stream runtime URLs and snippets. |
| `DirectSetupResponse` | Direct setup commands, runtime URLs, snippets, and stream config. |
| `DirectSessionResponse` | Direct session runtime URLs and expiry. |
| `StreamEvent`, `StreamEventMeta`, `StreamErrorDetail` | Runtime event models. |

Runtime update events contain `type`, `meta`, `data`, optional `signature`, and optional `event_id`. Error events contain `type`, `meta`, and `error`.

## Error Handling

REST API failures raise `RestlessStreamAPIError`.

```python
from restless_stream import RestlessStreamAPIError

try:
  stream = client.streams.get("missing")
except RestlessStreamAPIError as error:
  print(error.status_code, error.message)
```

Invalid runtime event payloads raise `RestlessStreamParseError`. Both exceptions inherit from `RestlessStreamError`.

## HMAC Helpers

Use HMAC helpers to compute or verify Restless Stream HMAC-SHA256 signatures over JSON-compatible payloads.

```python
from restless_stream import compute_hmac_signature, verify_hmac_signature

payload = {"id": "order_123", "total": 42}
signature = compute_hmac_signature("secret", payload)

if verify_hmac_signature("secret", payload, signature):
  print("valid")
```

When verifying a runtime event signature, pass the same JSON payload your integration signs and the event's `signature` value.
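
For readers unfamiliar with the technique, the following standard-library sketch shows HMAC-SHA256 over a JSON payload with constant-time comparison. The serialization used here (compact JSON with sorted keys) and the hex output encoding are assumptions; the SDK helpers may canonicalize and encode differently, so use `compute_hmac_signature` and `verify_hmac_signature` for real signatures.

```python
import hashlib
import hmac
import json
from typing import Any


def sketch_signature(secret: str, payload: Any) -> str:
    """HMAC-SHA256 over a deterministic JSON encoding (assumed canonical form)."""
    message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()


def sketch_verify(secret: str, payload: Any, signature: str) -> bool:
    """Compare in constant time to avoid leaking how many characters matched."""
    return hmac.compare_digest(sketch_signature(secret, payload), signature)


payload = {"id": "order_123", "total": 42}
signature = sketch_signature("secret", payload)

print(sketch_verify("secret", payload, signature))
print(sketch_verify("secret", {"id": "order_123", "total": 43}, signature))
```

Because the signature covers the serialized bytes, the signer and verifier must agree on the exact JSON encoding; a change in key order or whitespace produces a different digest.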

## Public Exports

| Export | Purpose |
| --- | --- |
| `RestlessStreamClient` | Synchronous REST and SSE client. |
| `AsyncRestlessStreamClient` | Asynchronous REST, SSE, and WebSocket client. |
| `RestlessStreamError` | Base SDK exception. |
| `RestlessStreamAPIError` | REST API error exception. |
| `RestlessStreamParseError` | Runtime event parsing exception. |
| `compute_hmac_signature` | Computes an HMAC-SHA256 signature for a JSON-compatible payload. |
| `verify_hmac_signature` | Constant-time HMAC signature verification. |
| `DEFAULT_STREAM_BASE_URL` | Default runtime stream base URL. |
| `build_sse_url`, `build_websocket_url` | Managed stream runtime URL builders. |
| `build_direct_sse_url`, `build_direct_websocket_url` | Direct stream runtime URL builders. |
| `to_websocket_url` | Converts an SSE URL to a WebSocket URL. |
| Models and enums | Pydantic response models and enum types listed above. |

## Appendix: Creating an API Key

API keys authenticate requests to the Restless Stream management API and runtime streams. Keys start with `rs_` followed by a hex string:

```
rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```

### Create a key in the dashboard

1. Sign in at [restlessapi.stream](https://restlessapi.stream) and go to **API Keys** in the dashboard.

![API Keys dashboard showing existing keys](docs/api-keys-dashboard.png)

2. Click **Create API Key**, enter a name and optional description, and set an optional expiry date.

![Create API Key modal form](docs/api-keys-create-modal.png)

3. Copy the key immediately — it is shown only once after creation. Store it in an environment variable or secrets manager.

```bash
export RESTLESS_API_KEY="rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
```

### Pass the key to the client

```python
import os

from restless_stream import RestlessStreamClient

client = RestlessStreamClient(api_key="rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
# or read from environment
client = RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"])
```
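
`os.environ["RESTLESS_API_KEY"]` raises a bare `KeyError` when the variable is unset. A small loader can fail with a clearer message and catch obviously malformed keys early. This is a hypothetical helper, not part of the SDK; the pattern only checks the `rs_` prefix and hex characters described above and deliberately does not assume a fixed length.

```python
import re
from typing import Mapping

KEY_PATTERN = re.compile(r"rs_[0-9a-fA-F]+")


def load_api_key(env: Mapping[str, str], name: str = "RESTLESS_API_KEY") -> str:
    """Read and sanity-check the API key from an environment-like mapping."""
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set")
    if not KEY_PATTERN.fullmatch(key):
        raise RuntimeError(f"{name} does not look like an rs_ key")
    return key


# A fake key is used here; pass os.environ in real code.
api_key = load_api_key({"RESTLESS_API_KEY": "rs_" + "ab12" * 16})
print(api_key[:3])
```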

## Sample Responses

### `streams.list()`

```python
import os

from restless_stream import RestlessStreamClient

with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    result = client.streams.list(limit=5, offset=0)
    print(result)
```

**Response:**

```json
{
  "streams": [],
  "pagination_info": {
    "has_next_page": false
  }
}
```
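
The `has_next_page` flag suggests a simple paging loop over `limit` and `offset`. The sketch below uses a fake `list_page` callable so it runs without network access; `iter_all_streams` is a hypothetical helper, and treating `offset` as an item count is an assumption. With a real client you would pass `client.streams.list` instead.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterator

ITEMS = [f"stream_{i}" for i in range(5)]


def fake_list(limit: int, offset: int) -> SimpleNamespace:
    """Stand-in for client.streams.list that mimics the response shape."""
    chunk = ITEMS[offset:offset + limit]
    info = SimpleNamespace(has_next_page=offset + limit < len(ITEMS))
    return SimpleNamespace(streams=chunk, pagination_info=info)


def iter_all_streams(list_page: Callable[..., Any], limit: int = 20) -> Iterator[Any]:
    offset = 0
    while True:
        page = list_page(limit=limit, offset=offset)
        yield from page.streams
        # Stop on the last page, or on an empty page to avoid looping forever.
        if not page.pagination_info.has_next_page or not page.streams:
            return
        offset += len(page.streams)


all_streams = list(iter_all_streams(fake_list, limit=2))
print(all_streams)
```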

### `streams.validate_api_key()`

```python
result = client.streams.validate_api_key()
```

**Response:**

```json
{
  "id": "rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
  "affected": true
}
```

### `streams.subscribe_direct_sse()` — live event

```python
import asyncio
import os

from restless_stream import AsyncRestlessStreamClient

async def main():
    async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
        async for event in client.streams.subscribe_direct_sse(
            url="https://httpbin.org/get",
            method="GET",
            polling_interval=15,
            reconnect=False,
        ):
            print(event)
            break  # receive one event then stop

asyncio.run(main())
```

**Event received:**

```json
{
  "type": "update",
  "meta": {
    "timestamp": "2026-06-07T00:49:36.562+00:00",
    "attempt_id": "a59d80d4-7d36-4019-b2c6-40cc8b8742ce",
    "status": 200,
    "latency_ms": 43,
    "payload_mode_fallback": null
  },
  "data": {
    "args": {},
    "headers": {
      "Accept": "*/*",
      "Host": "httpbin.org",
      "User-Agent": "Mozilla/5.0 (compatible; OurInternalService/1.0)",
      "X-Amzn-Trace-Id": "Root=1-6a24c020-316d1d396470c0900ce68495"
    },
    "origin": "140.82.14.127",
    "url": "https://httpbin.org/get"
  },
  "error": null,
  "signature": null,
  "event_id": "1780793376562"
}
```
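
As a quick illustration of reading such an event outside the SDK, the snippet below parses a trimmed copy of the sample with the standard library. In this particular sample the `event_id` equals `meta.timestamp` expressed in epoch milliseconds; whether that always holds is not stated here, so treat `event_id` as an opaque cursor unless the API documentation says otherwise.

```python
import json
from datetime import datetime, timedelta, timezone

raw = """
{
  "type": "update",
  "meta": {"timestamp": "2026-06-07T00:49:36.562+00:00", "status": 200, "latency_ms": 43},
  "data": {"url": "https://httpbin.org/get"},
  "error": null,
  "event_id": "1780793376562"
}
"""

event = json.loads(raw)
event_time = datetime.fromisoformat(event["meta"]["timestamp"])

# Integer arithmetic avoids float rounding when converting milliseconds.
millis = int(event["event_id"])
id_time = datetime.fromtimestamp(millis // 1000, tz=timezone.utc) + timedelta(
    milliseconds=millis % 1000
)

print(event["type"], event["meta"]["status"], event["data"]["url"])
print(event_time == id_time)
```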

## Development

```bash
python -m pip install -e "packages/python/core[dev]"
python -m ruff check packages/python/core packages/python/examples
cd packages/python/core
python -m pytest tests
```
