Metadata-Version: 2.4
Name: digixt-realtime-sdk
Version: 1.0.0
Summary: Python SDK for DigiXT Real-Time Data Delivery Infrastructure
Home-page: https://github.com/digixt/digixt-centrifugo
Author: DigiXT
Author-email: support@digixt.com
Project-URL: Bug Tracker, https://github.com/digixt/digixt-centrifugo/issues
Project-URL: Documentation, https://github.com/digixt/digixt-centrifugo/tree/main/clients/python
Project-URL: Source Code, https://github.com/digixt/digixt-centrifugo
Keywords: digixt,real-time,websocket,centrifugo,kafka,sdk,async
Platform: any
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: websockets<16.0,>=12.0
Requires-Dist: aiohttp>=3.9.0
Requires-Dist: centrifuge-python>=0.4.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: flake8>=6.0.0; extra == "dev"
Requires-Dist: twine>=4.0.0; extra == "dev"
Requires-Dist: build>=0.10.0; extra == "dev"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: platform
Dynamic: project-url
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# DigiXT Real-Time SDK for Python

A Python SDK for connecting to DigiXT real-time data delivery infrastructure powered by Centrifugo and Kafka.

## Features

- 🔌 **WebSocket Connection**: Async WebSocket connection to Centrifugo
- 🔐 **Authentication**: JWT token-based authentication via auth service
- 📡 **Channel Subscriptions**: Subscribe to real-time data channels
- 🔄 **Auto Reconnection**: Automatic reconnection with configurable retry logic
- ✅ **API Key Validation**: Periodic API key validation to ensure connection validity
- 📨 **Message Publishing**: Publish messages to channels via HTTP API
- 🎯 **Event Handling**: Comprehensive event system for connection and message events

## Installation

### Install from PyPI (Recommended)

```bash
pip install digixt-realtime-sdk
```

### Install from Source

If you need to install from the source repository:

```bash
# Clone the repository
git clone https://github.com/digixt/digixt-centrifugo.git
cd digixt-centrifugo/clients/python

# Install in development mode
pip install -e .

# Or install in production mode
pip install .
```

### Development Installation

For contributing to the SDK:

```bash
cd clients/python
pip install -e ".[dev]"
```

This installs the package in editable mode with development dependencies including pytest, black, and flake8.

## Quick Start

### Basic Usage

```python
import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    # Initialize SDK
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key',
        user='myuser',  # User ID for presence tracking
        debug=True
    )
    
    # Set up event handlers
    sdk.on('connected', lambda ctx: print('✅ Connected!', ctx))
    sdk.on('disconnected', lambda ctx: print('❌ Disconnected', ctx))
    sdk.on('error', lambda err: print('❌ Error:', err))
    sdk.on('api_key_expired', lambda err: print('⚠️ API key expired:', err))
    
    # Connect to server
    await sdk.connect()
    
    # Subscribe to channel with message handler
    async def on_message(ctx):
        print(f"📨 Message on {ctx['channel']}: {ctx['data']}")
    
    await sdk.subscribe('updates', on_publication=on_message)
    
    # Keep connection alive
    print("Listening for messages... Press Ctrl+C to stop")
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    except KeyboardInterrupt:
        print("\nShutting down...")
    
    # Disconnect
    await sdk.disconnect()

if __name__ == '__main__':
    asyncio.run(main())
```

### Advanced Usage

```python
import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key',
        user='myuser',
        debug=True,
        reconnect_timeout=5,
        max_reconnect_attempts=10,
        api_key_validation_interval=30
    )
    
    # Connection events
    def on_connected(ctx):
        print(f"Connected! Client ID: {ctx.get('client')}")
        print(f"Connection state: {sdk.get_connection_state()}")
    
    def on_disconnected(ctx):
        print(f"Disconnected: {ctx.get('reason')}")
    
    def on_error(err):
        print(f"Error: {err.get('error')}")
    
    def on_api_key_expired(err):
        print(f"API key expired: {err.get('error')}")
        # Handle API key expiration (e.g., refresh token)
    
    sdk.on('connected', on_connected)
    sdk.on('disconnected', on_disconnected)
    sdk.on('error', on_error)
    sdk.on('api_key_expired', on_api_key_expired)
    
    # Connect
    await sdk.connect()
    
    # Subscribe to multiple channels
    async def on_updates(ctx):
        print(f"Updates channel: {ctx['data']}")
    
    async def on_telemetry(ctx):
        print(f"Telemetry channel: {ctx['data']}")
    
    await sdk.subscribe('updates', on_publication=on_updates)
    await sdk.subscribe('telemetry', on_publication=on_telemetry)
    
    # Publish a message (via HTTP API)
    try:
        result = await sdk.publish('updates', {'message': 'Hello from Python!'})
        print(f"Published: {result}")
    except Exception as e:
        print(f"Publish failed: {e}")
    
    # Keep running
    await asyncio.sleep(60)
    
    # Unsubscribe from a channel
    await sdk.unsubscribe('telemetry')
    
    # Disconnect
    await sdk.disconnect()

if __name__ == '__main__':
    asyncio.run(main())
```

## API Reference

### DigiXTSDK

#### Constructor

```python
DigiXTSDK(
    url='ws://localhost:8000/connection/websocket',
    api_url='http://localhost:8000/api',
    auth_url='http://localhost:3000',
    api_key='',
    username='',
    user=None,
    token=None,
    debug=False,
    reconnect_timeout=5,
    max_reconnect_attempts=10,
    api_key_validation_interval=30
)
```

**Parameters:**
- `url` (str): WebSocket URL for Centrifugo
- `api_url` (str): HTTP API URL for Centrifugo
- `auth_url` (str): Auth service URL
- `api_key` (str): API key for authentication
- `username` (str): Username for authentication
- `user` (str, optional): User ID for token generation (defaults to username or generates unique ID)
- `token` (str, optional): Pre-generated JWT token
- `debug` (bool): Enable debug logging
- `reconnect_timeout` (int): Seconds to wait before reconnecting
- `max_reconnect_attempts` (int): Maximum reconnection attempts
- `api_key_validation_interval` (int): Seconds between API key validations

#### Methods

##### `async connect()`

Connect to Centrifugo WebSocket server.

**Raises:**
- `ConnectionError`: If connection fails
- `AuthenticationError`: If authentication fails

##### `async disconnect()`

Disconnect from server and clean up resources.

##### `async subscribe(channel, on_publication=None, on_join=None, on_leave=None, on_error=None)`

Subscribe to a channel.

**Parameters:**
- `channel` (str): Channel name to subscribe to
- `on_publication` (callable, optional): Callback for received messages
- `on_join` (callable, optional): Callback when user joins channel
- `on_leave` (callable, optional): Callback when user leaves channel
- `on_error` (callable, optional): Callback for subscription errors

**Raises:**
- `SubscriptionError`: If subscription fails

##### `async unsubscribe(channel)`

Unsubscribe from a channel.

**Parameters:**
- `channel` (str): Channel name to unsubscribe from

##### `async publish(channel, data)`

Publish a message to a channel via HTTP API.

**Parameters:**
- `channel` (str): Channel name
- `data` (any): Message data to publish

**Returns:**
- Response from Centrifugo API

**Raises:**
- `DigiXTError`: If publish fails

##### `on(event, handler)`

Register event handler.

**Parameters:**
- `event` (str): Event name ('connected', 'disconnected', 'error', etc.)
- `handler` (callable): Callback function

##### `off(event, handler)`

Unregister event handler.

**Parameters:**
- `event` (str): Event name
- `handler` (callable): Callback function to remove

##### `get_connection_state()`

Get current connection state.

**Returns:**
- Dictionary with connection state information

### Events

- `connected`: Emitted when connection is established
- `disconnected`: Emitted when connection is lost
- `error`: Emitted when an error occurs
- `api_key_expired`: Emitted when API key validation fails
- `api_key_validated`: Emitted when API key validation succeeds
- `subscribed`: Emitted when successfully subscribed to a channel
- `reconnect_failed`: Emitted when max reconnection attempts reached

### Exceptions

- `DigiXTError`: Base exception for all SDK errors
- `ConnectionError`: Raised when connection fails
- `AuthenticationError`: Raised when authentication fails
- `SubscriptionError`: Raised when subscription operations fail
- `TokenError`: Raised when token operations fail

## Usage Examples

### Command Line (CLI)

After installing the package, you can use the CLI tool `digixt-sdk`:

```bash
# Basic usage - subscribe to a channel
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates

# Subscribe to multiple channels
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channels updates telemetry

# Publish a message
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --publish '{"message": "Hello from CLI"}'

# Run with timeout (60 seconds)
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --timeout 60

# JSON output mode
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --json-output

# Debug mode
digixt-sdk \
  --url ws://localhost:8000/connection/websocket \
  --auth-url http://localhost:3000 \
  --username myuser \
  --api-key my-api-key \
  --channel updates \
  --debug
```

You can also use it as a Python module:

```bash
python -m digixt_realtime_sdk.cli --help
```

Or use it programmatically in Python scripts:

```python
import asyncio
from digixt_realtime_sdk import DigiXTSDK

async def main():
    sdk = DigiXTSDK(
        url='ws://localhost:8000/connection/websocket',
        auth_url='http://localhost:3000',
        username='myuser',
        api_key='my-api-key'
    )
    await sdk.connect()
    await sdk.subscribe('updates', on_publication=lambda ctx: print(ctx['data']))
    await asyncio.sleep(60)
    await sdk.disconnect()

asyncio.run(main())
```

### Jupyter Notebook

Jupyter notebooks support top-level `await`, making async SDK usage straightforward:

```python
# In a Jupyter notebook cell
import asyncio
from digixt_realtime_sdk import DigiXTSDK

# Initialize
sdk = DigiXTSDK(
    url='ws://localhost:8000/connection/websocket',
    auth_url='http://localhost:3000',
    username='myuser',
    api_key='my-api-key',
    debug=True
)

# Set up handlers
messages = []
def on_message(ctx):
    messages.append(ctx['data'])
    print(f"Received: {ctx['data']}")

sdk.on('connected', lambda ctx: print('Connected!'))
sdk.on('error', lambda err: print(f'Error: {err}'))

# Connect and subscribe (use await at top level)
await sdk.connect()
await sdk.subscribe('updates', on_publication=on_message)

# Analyze messages later
print(f"Total messages: {len(messages)}")
```

See `examples/jupyter_example.ipynb` for a complete Jupyter notebook example.

### Python Scripts

See the `examples/` directory for more usage examples:

- `basic_example.py`: Basic connection and subscription
- `production_example.py`: Production-ready example with error handling
- `jupyter_example.ipynb`: Jupyter notebook example

## Requirements

- Python 3.8+
- websockets>=12.0
- aiohttp>=3.9.0

## License

MIT

## Support

For issues and questions, please open an issue on GitHub.

