Metadata-Version: 2.3
Name: gnosari-realtime
Version: 0.1.0
Summary: Real-time WebSocket pub/sub server with optional AI processing
Author: Gnosari Team
Author-email: team@gnosari.com
Requires-Python: >=3.11,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: alembic (>=1.13.0,<2.0.0)
Requires-Dist: asyncpg (>=0.29.0,<0.30.0)
Requires-Dist: colorama (>=0.4.6,<0.5.0)
Requires-Dist: httpx (>=0.25.2,<0.26.0)
Requires-Dist: opensearch-py (>=2.4.2,<3.0.0)
Requires-Dist: pydantic-settings (>=2.0.0,<3.0.0)
Requires-Dist: pydantic[email] (>=2.5.0,<3.0.0)
Requires-Dist: pyjwt[crypto] (>=2.8.0,<3.0.0)
Requires-Dist: python-multipart (>=0.0.6,<0.0.7)
Requires-Dist: redis (>=5.0.0,<6.0.0)
Requires-Dist: sqlalchemy[asyncio] (>=2.0.23,<3.0.0)
Requires-Dist: starlette (>=0.37.2,<0.38.0)
Requires-Dist: structlog (>=23.2.0,<24.0.0)
Requires-Dist: uvicorn[standard] (>=0.24.0,<0.25.0)
Requires-Dist: uvloop (>=0.19.0,<0.20.0)
Requires-Dist: websockets (>=12.0,<13.0)
Description-Content-Type: text/markdown

# Gnosari Realtime

A high-performance, real-time WebSocket pub/sub server with optional AI processing capabilities. Built with Python, Starlette, and modern async technologies.

## Features

- **Real-time WebSocket Communication**: High-performance WebSocket server with channel-based pub/sub
- **Python SDK**: Easy-to-use client library for programmatic access
- **Authentication & Authorization**: JWT-based authentication with team and user-level permissions
- **Message Persistence**: PostgreSQL storage with full message history
- **Search & Analytics**: OpenSearch integration for message search and analytics
- **Redis Integration**: Pub/sub broadcasting and caching for horizontal scaling
- **Rate Limiting**: Built-in rate limiting and connection management
- **Extensible Architecture**: Clean SOLID architecture ready for AI integrations

## Quick Start

### Installation

```bash
# Clone the repository
git clone <repository-url>
cd realtime-server

# Install dependencies
pip install poetry
poetry install

# Set up environment
cp .env.example .env
# Edit .env with your configuration
```

### Start Services

#### Option 1: Docker Compose (Recommended)

```bash
# Start all services including the server
docker compose up -d

# Wait for services to be healthy, then run migrations
docker compose exec gnosari-server poetry run alembic upgrade head
```

#### Option 2: Local Development

```bash
# Start supporting services only
docker compose up -d postgres redis opensearch-node1

# Run database migrations
poetry run alembic upgrade head

# Start the server locally
poetry run gnosari-realtime-server
```

The server will start on `http://localhost:8000` by default.

### Docker Service Health

Check if all services are running:

```bash
# Check service status
docker compose ps

# Check service logs
docker compose logs gnosari-server
docker compose logs postgres
docker compose logs redis

# Check health status
curl http://localhost:8000/health
```

## Usage

### Python SDK

#### Basic Usage

```python
import asyncio
from gnosari_realtime import RealtimeClient

async def message_handler(data):
    print(f"Received: {data}")

async def main():
    client = RealtimeClient("ws://localhost:8000/ws/v1")
    
    await client.connect()
    await client.subscribe("public:notifications", message_handler)
    await client.publish("public:notifications", {"message": "Hello World!"})
    
    # Keep listening
    await asyncio.sleep(10)
    await client.disconnect()

asyncio.run(main())
```

#### Authenticated Usage

```python
# Create auth token first
import httpx

async with httpx.AsyncClient() as client:
    response = await client.post("http://localhost:8000/api/v1/auth/token", json={
        "user_id": "user123",
        "teams": ["content-team"],
        "permissions": ["publish_public"]
    })
    token = response.json()["token"]

# Use token with WebSocket client
client = RealtimeClient("ws://localhost:8000/ws/v1", auth_token=token)
await client.connect()
await client.subscribe("team:content-team", team_handler)
```

### HTTP API

#### Publish Messages

```bash
# Create authentication token
curl -X POST http://localhost:8000/api/v1/auth/token \
  -H "Content-Type: application/json" \
  -d '{"user_id": "user123", "teams": ["content-team"]}'

# Publish to channel
curl -X POST http://localhost:8000/api/v1/publish/public:alerts \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"data": {"message": "System alert", "level": "warning"}}'
```

#### Search Messages

```bash
curl -X GET "http://localhost:8000/api/v1/search?q=alert&channel=public:alerts" \
  -H "Authorization: Bearer <token>"
```

#### Get Analytics

```bash
curl -X GET "http://localhost:8000/api/v1/analytics?channel=public:alerts" \
  -H "Authorization: Bearer <token>"
```

## Channels

**Dynamic Creation**: Channels are created automatically when first used - no pre-configuration required.

**Access Control Patterns**:
- `public:*` - Public channels (require `publish_public` permission to publish)
- `team:<team_id>` - Team-specific channels (require team membership)
- `user:<user_id>` - User-private channels (only accessible by the specific user)
- `admin:*` - Admin-only channels (require admin privileges)

**Permissions**: Dynamic strings - create any permission name. Built-in ones: `publish_public`, `read_analytics`.

## WebSocket API

### Connection

Connect to `ws://localhost:8000/ws/v1` with optional authentication:

- Header: `Authorization: Bearer <token>`
- Query param: `?token=<token>`

### Message Format

All WebSocket messages follow this schema:

```json
{
  "id": "uuid",
  "type": "command|query|event|response|error",
  "action": "subscribe|unsubscribe|publish|ping|...",
  "version": "v1",
  "payload": {},
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "user_id": "user123",
    "session_id": "session456"
  }
}
```

### Subscribe to Channel

```json
{
  "type": "command",
  "action": "subscribe",
  "payload": {
    "channel": "public:notifications",
    "filters": {"user_id": "user123"}
  }
}
```

### Publish Message

```json
{
  "type": "command",
  "action": "publish",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}
```

### Receive Messages

```json
{
  "type": "event",
  "action": "message",
  "payload": {
    "channel": "public:notifications",
    "data": {"message": "Hello World!"}
  }
}
```

## Configuration

Environment variables (see `.env.example`):

```bash
# Database
DATABASE_URL=postgresql+asyncpg://gnosari:gnosari123@localhost:5432/gnosari_realtime

# Redis
REDIS_URL=redis://localhost:6379/0

# OpenSearch
OPENSEARCH_URL=http://localhost:9200

# JWT Authentication
JWT_SECRET_KEY=your-secret-key
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=1440

# Server
HOST=0.0.0.0
PORT=8000
DEBUG=true
LOG_LEVEL=INFO

# Rate Limiting
RATE_LIMIT_PER_MINUTE=100
MAX_CONNECTIONS_PER_CLIENT=5
```

## Development

### Running Tests

The project includes a comprehensive test suite with unit, integration, and end-to-end tests.

```bash
# Run all tests
./scripts/test.sh

# Quick test runner for development
./scripts/run_quick_tests.py

# Run specific test categories
poetry run pytest tests/unit/          # Unit tests only
poetry run pytest tests/integration/   # Integration tests only

# Run with coverage
poetry run pytest --cov=src/gnosari_realtime --cov-report=html

# Run specific test file
poetry run pytest tests/unit/test_schemas.py -v
```

### Test Categories

- **Unit Tests**: Core component testing with mocked dependencies
  - Message schema validation
  - Authentication and authorization logic
  - Channel management and subscription handling
  - Python SDK client functionality
  
- **Integration Tests**: Service integration testing
  - WebSocket communication protocols
  - HTTP API endpoint functionality  
  - Database and external service integration
  
- **End-to-End Tests**: Complete workflow testing
  - Multi-client pub/sub scenarios
  - Authentication flows
  - Error recovery and resilience

### Code Quality

```bash
# Format code
poetry run black .

# Lint code
poetry run ruff check .

# Type checking
poetry run mypy .

# Run all quality checks
./scripts/test.sh  # Includes quality checks
```

### Database Migrations

```bash
# Create new migration
poetry run alembic revision --autogenerate -m "description"

# Apply migrations
poetry run alembic upgrade head

# Rollback migration
poetry run alembic downgrade -1
```

## Architecture

```
┌─────────────────────┐
│   Transport Layer   │  (Starlette WebSocket)
├─────────────────────┤
│ Application Layer   │  (Connection Management, Message Routing)
├─────────────────────┤
│  Integration Layer  │  (Optional AI Processing Hooks)
├─────────────────────┤
│    Domain Layer     │  (Channel Management, Message Schemas)
├─────────────────────┤
│ Infrastructure Layer│  (Database, Redis, OpenSearch)
└─────────────────────┘
```

### Key Components

- **Connection Manager**: Handles WebSocket lifecycle and routing
- **Channel Manager**: Manages channel subscriptions and message routing
- **Message Schemas**: Pydantic models for type-safe message handling
- **Authentication**: JWT-based authentication with role-based access control
- **Persistence**: PostgreSQL for message history and connection tracking
- **Search**: OpenSearch for full-text search and analytics
- **Caching**: Redis for pub/sub and high-performance caching

## Scaling

### Horizontal Scaling

The server supports horizontal scaling through Redis pub/sub:

1. Run multiple server instances
2. All instances connect to the same Redis
3. Messages are automatically distributed across instances
4. WebSocket connections are load-balanced

### Performance Tuning

- Adjust PostgreSQL connection pool size
- Configure Redis memory limits
- Set appropriate OpenSearch sharding
- Tune rate limiting parameters

## Security

- **Authentication**: JWT tokens with configurable expiration
- **Authorization**: Channel-based access control with team/user permissions
- **Rate Limiting**: Per-client message and connection limits
- **Input Validation**: Comprehensive message validation and sanitization
- **CORS**: Configurable CORS policies for web applications

## Monitoring

### Health Check

```bash
curl http://localhost:8000/health
```

### Server Statistics

```bash
curl http://localhost:8000/api/v1/stats
```

### OpenSearch Dashboards

Access analytics at `http://localhost:5601` when using Docker Compose.

## Examples & Demos

### Quick Demo

To see the server in action with real-time messaging:

```bash
# Start the server first
docker compose up -d

# Run the simple demo (1 publisher, 1 subscriber)
./run_demo.sh simple

# Run the full live demo (3 publishers, 2 subscribers, 60 seconds)
./run_demo.sh live
```

### Demo Features

- **Simple Demo** (`examples/simple_demo.py`):
  - 1 publisher sending 5 different message types
  - 1 subscriber receiving and displaying messages
  - Perfect for quick testing and understanding basic functionality

- **Live Demo** (`examples/live_demo.py`):
  - 3 publishers with different sending intervals (fast, medium, slow)
  - 2 subscribers listening to multiple channels
  - Real-time statistics and colored output
  - Demonstrates concurrent messaging and channel routing
  - Generates realistic message types: alerts, metrics, notifications, status updates

### Other Examples

See the `examples/` directory for complete working examples:

- `basic_client.py` - Simple pub/sub client
- `authenticated_client.py` - Client with authentication and team channels  
- `http_publisher.py` - Publishing via HTTP API
- `simple_demo.py` - Quick demonstration script
- `live_demo.py` - Full-featured real-time demo
- `run_demo.sh` - Demo runner script with multiple options

### Demo Runner Usage

```bash
# Available demo options
./run_demo.sh help

# Quick functionality test
./run_demo.sh simple

# Full real-time demonstration  
./run_demo.sh live

# Check server status
./run_demo.sh check

# Run specific examples
./run_demo.sh basic    # Basic client
./run_demo.sh auth     # Authenticated client
./run_demo.sh http     # HTTP publisher
```

## Documentation

Comprehensive documentation is available in the `docs/` directory:

- **[LLM Quick Reference](docs/LLM.md)** - Concise guide for AI/LLM agents and developers
- **[API Reference](docs/API.md)** - Complete WebSocket and HTTP API documentation
- **[System Architecture](docs/ARCHITECTURE.md)** - Technical architecture and design patterns
- **[Kubernetes Deployment](docs/DEPLOYMENT.md)** - Production deployment guide with Helm charts

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make changes with tests
4. Run quality checks (`black`, `ruff`, `mypy`, `pytest`)
5. Submit a pull request

## License

[License information here]
