Metadata-Version: 2.4
Name: rivven
Version: 0.0.15
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Rust
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Requires-Dist: pytest>=7.0 ; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21 ; extra == 'dev'
Requires-Dist: mypy>=1.0 ; extra == 'dev'
Requires-Dist: ruff>=0.1 ; extra == 'dev'
Provides-Extra: dev
Summary: High-performance Python client for Rivven distributed streaming platform
Keywords: streaming,kafka,messaging,distributed,async
License: Apache-2.0
Requires-Python: >=3.8
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Documentation, https://github.com/hupe1980/rivven/tree/main/crates/rivven-python
Project-URL: Homepage, https://github.com/hupe1980/rivven
Project-URL: Repository, https://github.com/hupe1980/rivven

# rivven-python

> High-performance Python bindings for the Rivven event streaming platform.

## Features

- **Native Performance**: Zero-copy message handling through Rust bindings (PyO3)
- **Async-First**: Full async/await support with Python's asyncio
- **Type-Safe**: Complete type annotations for IDE support
- **Transaction Support**: Exactly-once semantics with transactional producers
- **Authentication**: Multiple auth methods (simple, SCRAM-SHA-256)
- **Admin Operations**: Full topic and partition management

## Installation

```bash
pip install rivven
```

## Usage

### Basic Connection

```python
import asyncio
import rivven

async def main():
    # Connect to Rivven cluster
    client = await rivven.connect("localhost:9092")
    
    # Create a topic
    await client.create_topic("my-topic", partitions=3)
    
    # List topics
    topics = await client.list_topics()
    print(f"Topics: {topics}")

asyncio.run(main())
```

### Producer

```python
import asyncio
import rivven

async def produce():
    client = await rivven.connect("localhost:9092")
    producer = client.producer("my-topic")
    
    # Send a message
    offset = await producer.send(b'{"event": "login"}', key=b"user-123")
    print(f"Sent at offset: {offset}")
    
    # Send to specific partition
    await producer.send_to_partition(b"value", partition=0, key=b"key")
    
    # Batch send for better throughput
    offsets = await producer.send_batch([b"msg1", b"msg2", b"msg3"])

asyncio.run(produce())
```
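
For large workloads, one way to use `send_batch` is to split the payloads into fixed-size chunks first. The helper below is a minimal sketch: `chunked` and the chunk size are illustrative names and values, not part of the rivven API, and this README does not document a batch size limit.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(payloads: Iterable[bytes], size: int) -> Iterator[List[bytes]]:
    """Yield successive lists of at most `size` payloads (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(payloads)
    while True:
        # Pull the next `size` items; an empty list means the input is exhausted
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk
```

Inside a coroutine, this could be combined with the producer above as `for chunk in chunked(payloads, 500): await producer.send_batch(chunk)`, where `500` is an arbitrary example value.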

### Consumer

```python
import asyncio
import rivven

async def consume():
    client = await rivven.connect("localhost:9092")
    consumer = client.consumer("my-topic", group_id="my-group")
    
    # Fetch messages
    messages = await consumer.fetch(max_messages=100)
    for msg in messages:
        print(f"Offset {msg.offset}: {msg.value_str()}")
    
    # Commit offsets
    await consumer.commit()
    
    # Or use async iterator
    async for msg in consumer:
        print(f"Received: {msg.value_str()}")
        await consumer.commit()

asyncio.run(consume())
```
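
The producer example sends JSON payloads, so a consumer will often want to decode them. The sketch below assumes that `msg.value_str()` returns a `str` and that `msg.offset` is an integer, as the examples above suggest; `decode_json_messages` is a hypothetical helper, not part of the rivven API.

```python
import json
from typing import Any, Iterable, List, Tuple


def decode_json_messages(messages: Iterable[Any]) -> Tuple[List[Any], List[int]]:
    """Decode JSON payloads; return (decoded events, offsets of malformed messages)."""
    events: List[Any] = []
    bad_offsets: List[int] = []
    for msg in messages:
        try:
            events.append(json.loads(msg.value_str()))
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError
            bad_offsets.append(msg.offset)
    return events, bad_offsets
```

A caller might pass the result of `consumer.fetch(...)` to this helper and log or dead-letter the malformed offsets before committing.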

### Admin Operations

```python
import asyncio
import rivven

async def admin():
    client = await rivven.connect("localhost:9092")
    
    # Create topic
    await client.create_topic("new-topic", partitions=3, replication_factor=1)
    
    # List topics
    topics = await client.list_topics()
    print(f"Topics: {topics}")
    
    # Get topic configuration
    configs = await client.describe_topic_configs("new-topic")
    print(f"Configs: {configs}")
    
    # Modify topic configuration
    await client.alter_topic_config("new-topic", "retention.ms", "86400000")
    
    # Add partitions
    await client.create_partitions("new-topic", new_total=6)
    
    # Get offset for timestamp
    offset = await client.get_offset_for_timestamp("new-topic", 0, 1699900000000)
    
    # Delete records before offset
    await client.delete_records("new-topic", 0, before_offset=100)
    
    # Delete topic
    await client.delete_topic("old-topic")

asyncio.run(admin())
```

### Authentication

```python
import asyncio
import rivven

async def authenticated():
    client = await rivven.connect("localhost:9092")
    
    # Simple authentication
    await client.authenticate("username", "password")
    
    # Or SCRAM-SHA-256 authentication (use instead of the call above, not in addition)
    # await client.authenticate_scram("username", "password")
    
    # Use client as normal
    topics = await client.list_topics()

asyncio.run(authenticated())
```

### TLS Connection

```python
import asyncio
import rivven

async def secure():
    # Connect with TLS
    client = await rivven.connect_tls(
        "localhost:9093",
        ca_cert="/path/to/ca.crt",
        client_cert="/path/to/client.crt",  # Optional: mTLS
        client_key="/path/to/client.key",   # Optional: mTLS
    )
    
    topics = await client.list_topics()

asyncio.run(secure())
```

### Transactions (Exactly-Once Semantics)

```python
import asyncio
import rivven

async def transactional():
    client = await rivven.connect("localhost:9092")
    
    # Initialize transactional producer - returns a ProducerState object
    producer_state = await client.init_producer_id("my-txn-id")
    print(f"Producer ID: {producer_state.producer_id}, Epoch: {producer_state.producer_epoch}")
    
    try:
        # Begin transaction
        await client.begin_transaction("my-txn-id", producer_state)
        
        # Add partitions to transaction
        await client.add_partitions_to_txn("my-txn-id", producer_state, [
            ("my-topic", 0),
            ("my-topic", 1),
        ])
        
        # Publish with idempotent semantics - sequence is auto-incremented
        await client.publish_idempotent(
            topic="my-topic",
            value=b"message-1",
            producer_state=producer_state,
            key=b"key-1"
        )
        
        await client.publish_idempotent(
            topic="my-topic",
            value=b"message-2",
            producer_state=producer_state,
            key=b"key-2"
        )
        
        # Commit transaction
        await client.commit_transaction("my-txn-id", producer_state)
        
    except Exception:
        # Abort on error, then re-raise so the caller sees the failure
        await client.abort_transaction("my-txn-id", producer_state)
        raise

asyncio.run(transactional())
```
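
The begin/commit/abort pattern above can be wrapped in an async context manager so that it is not repeated at every call site. This is a sketch only: `transaction` is a hypothetical helper, not part of the rivven API, and it assumes the `begin_transaction`, `commit_transaction`, and `abort_transaction` signatures shown in the example above.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def transaction(client: Any, txn_id: str, producer_state: Any) -> AsyncIterator[None]:
    """Begin a transaction, commit on success, abort and re-raise on error."""
    try:
        await client.begin_transaction(txn_id, producer_state)
        yield  # The body of the `async with` block runs here
        await client.commit_transaction(txn_id, producer_state)
    except Exception:
        # Mirrors the example above: any failure aborts the transaction
        await client.abort_transaction(txn_id, producer_state)
        raise
```

With this helper, the publishing calls would sit inside `async with transaction(client, "my-txn-id", producer_state):`. Like the original example, it catches `Exception` only, so task cancellation does not trigger an abort.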

## Exception Handling

Rivven provides a hierarchy of exception types for granular error handling:

```python
import asyncio

import rivven
from rivven import (
    RivvenException,        # Base exception for all Rivven errors
    ConnectionException,    # Connection-related errors
    ServerException,        # Server-side errors
    TimeoutException,       # Request timeouts
    SerializationException, # Serialization/deserialization errors
    ConfigException,        # Configuration errors
)

async def main():
    try:
        client = await rivven.connect("localhost:9092")
    except ConnectionException as e:
        print(f"Failed to connect: {e}")
    except TimeoutException as e:
        print(f"Connection timed out: {e}")
    except RivvenException as e:
        # Catch-all for any other Rivven error; keep this clause last
        print(f"General Rivven error: {e}")

asyncio.run(main())
```
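
Because the exception types are distinct, transient failures can be retried while other errors propagate immediately. The following is a generic sketch rather than a rivven feature: `retry_async` and its parameters are hypothetical, and which rivven exceptions are actually safe to retry is an assumption to verify for your deployment.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[Exception], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Run `operation`, retrying on the given exception types with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

For example, `client = await retry_async(lambda: rivven.connect("localhost:9092"), retry_on=(ConnectionException, TimeoutException))` would retry connection attempts and let any other `RivvenException` pass through unchanged.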

## Testing

### Running Tests

```bash
# Install test dependencies
pip install -r requirements-test.txt

# Run API tests (no broker required)
pytest tests/test_api.py -v

# Run integration tests (requires running broker)
export RIVVEN_BROKER="localhost:9092"
pytest tests/test_integration.py -v -m integration
```

### Type Checking

The package includes type stubs (`rivven.pyi`) for IDE support and static type checking:

```bash
pip install mypy
mypy your_code.py
```

## Building from Source

```bash
# Install maturin
pip install maturin

# Build wheel
cd crates/rivven-python
maturin build --release

# Install locally
pip install target/wheels/rivven-*.whl

# Development install (editable)
maturin develop --release
```

## Documentation

- [Getting Started](https://rivven.hupe1980.github.io/rivven/docs/getting-started)
- [Python Examples](https://rivven.hupe1980.github.io/rivven/docs/getting-started#python-client)

## License

Apache-2.0. See [LICENSE](../../LICENSE).

