Metadata-Version: 2.4
Name: camera-ui-rpc
Version: 1.0.1
Summary: RPC Client for camera.ui
Author-email: seydx <dev@seydx.com>
Maintainer-email: seydx <dev@seydx.com>
Project-URL: Homepage, https://github.com/seydx/camera.ui
Project-URL: Documentation, https://github.com/seydx/camera.ui
Project-URL: Repository, https://github.com/seydx/camera.ui
Project-URL: Bug Tracker, https://github.com/seydx/camera.ui/issues
Project-URL: Changelog, https://github.com/seydx/camera.ui/blob/master/CHANGELOG.md
Keywords: camera.ui,python,nats,rpc
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE.md
Requires-Dist: typing_extensions>=4.15.0
Requires-Dist: nats-py>=2.12.0
Requires-Dist: ormsgpack>=1.12.0
Dynamic: license-file

# RPC Library for Python

A high-performance, type-safe RPC library for Python applications, built on the NATS messaging system.

## Features

- 🚀 **High Performance**: Achieves 300-1500+ MB/s throughput with sub-millisecond latency
- 🔒 **Type Safety**: Full type annotations and runtime type checking
- 🌊 **Streaming**: Async generators with push/pull patterns
- 🔄 **Auto-reconnection**: Resilient connection management
- 📦 **Auto-chunking**: Transparent handling of large payloads
- 🎯 **Service Discovery**: Automatic service registration and discovery
- ⚖️ **Load Balancing**: Built-in queue-based load distribution
- 🔀 **Channels**: Bidirectional real-time communication
- 🎨 **Decorators**: Clean API with Python decorators
- 🤝 **Cross-language**: Full compatibility with the TypeScript implementation

## Installation

```bash
pip install camera-ui-rpc
```

## Quick Start

### Basic RPC Handlers

For simple RPC endpoints without service discovery:

```python
from camera_ui_rpc import create_rpc_client, RPCClass
import asyncio

# Define your handlers
@RPCClass
class MathHandlers:
    async def add(self, a: float, b: float) -> float:
        return a + b
    
    async def multiply(self, a: float, b: float) -> float:
        return a * b
    
    # Streaming method
    async def fibonacci(self, n: int):
        a, b = 0, 1
        for _ in range(n):
            yield a
            a, b = b, a + b

async def main():
    # Server: Register RPC handlers
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-server'
    })
    
    await server.connect()
    
    # Register handlers under a namespace
    await server.register_handler('math', MathHandlers())
    
    print('Math RPC handlers registered')
    await asyncio.Event().wait()

asyncio.run(main())
```

```python
# Client: Use RPC proxy
from camera_ui_rpc import create_rpc_client
import asyncio

async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-client'
    })
    
    await client.connect()
    
    # Create a typed proxy (no service discovery)
    math = client.create_proxy('math')
    
    # Call methods
    result = await math.add(5, 3)
    print(f'5 + 3 = {result}')  # 8
    
    # Use streaming
    async for num in math.fibonacci(10):
        print(f'Fibonacci: {num}')

asyncio.run(main())
```

### NATS Micro Services

For production services with discovery, monitoring, and load balancing:

```python
from camera_ui_rpc import create_rpc_client, ServiceConfig
import asyncio

# MathHandlers is the class defined under "Basic RPC Handlers"

# Server: Register as NATS micro service
async def main():
    server = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'math-service'
    })
    
    await server.connect()
    
    # Register as a proper NATS micro service
    await server.service.register_handler(
        ServiceConfig(
            name='math',
            version='1.0.0',
            description='Math operations service',
            queue_group='math-workers'  # For load balancing
        ),
        MathHandlers()
    )
    
    print('Math micro service is running')
    await asyncio.Event().wait()

asyncio.run(main())
```

```python
# Client: Discover and use service
from camera_ui_rpc import create_rpc_client
import asyncio

async def main():
    client = create_rpc_client({
        'servers': ['nats://localhost:4222'],
        'name': 'client'
    })
    
    await client.connect()
    
    # Discover service by name (with load balancing)
    math = await client.create_service_proxy('math')
    
    # Use the service
    result = await math.add(10, 20)
    print(f'10 + 20 = {result}')

asyncio.run(main())
```

## Core Concepts

### RPC Client

The foundation of all operations:

```python
from camera_ui_rpc import create_rpc_client

client = create_rpc_client({
    'servers': ['nats://localhost:4222'],
    'name': 'my-app',
    'auth': {'user': 'app', 'pass': 'secret'},
    'timeout': 5000,  # 5 second default timeout
    'reconnect': True,
    'max_reconnect_attempts': -1  # infinite
})

await client.connect()
```

### RPC Handlers vs NATS Services

**RPC Handlers** (`register_handler`):
- Simple namespace-based RPC endpoints
- Direct addressing via namespace
- No automatic discovery or load balancing
- Lightweight for internal communication

**NATS Micro Services** (`service.register_handler`):
- Full NATS micro service features
- Service discovery via name
- Automatic load balancing with queue groups
- Monitoring and stats
- Ideal for production microservices
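
The load balancing mentioned above relies on NATS queue groups: a plain subscriber receives every message on a subject, while only one member of a queue group receives each message. The toy in-memory model below illustrates that delivery rule and nothing more. It is not the library's or NATS's implementation, `ToyBroker` and the `math.add` subject are hypothetical, and the round-robin choice is an assumption made to keep the sketch deterministic (NATS picks the member itself).

```python
from collections import defaultdict
from itertools import count


class ToyBroker:
    """Hypothetical in-memory broker; models queue-group delivery only."""

    def __init__(self):
        self._plain = defaultdict(list)      # subject -> [callback]
        self._groups = defaultdict(list)     # (subject, group) -> [callback]
        self._counters = defaultdict(count)  # (subject, group) -> counter

    def subscribe(self, subject, callback, queue_group=None):
        if queue_group is None:
            self._plain[subject].append(callback)
        else:
            self._groups[(subject, queue_group)].append(callback)

    def publish(self, subject, message):
        # Plain subscribers: every one of them sees the message.
        for callback in self._plain[subject]:
            callback(message)
        # Queue groups: exactly one member per group sees the message.
        for (subj, group), members in self._groups.items():
            if subj != subject:
                continue
            index = next(self._counters[(subj, group)]) % len(members)
            members[index](message)


received = {"worker-1": [], "worker-2": []}
audit = []

broker = ToyBroker()
broker.subscribe("math.add", received["worker-1"].append, queue_group="math-workers")
broker.subscribe("math.add", received["worker-2"].append, queue_group="math-workers")
broker.subscribe("math.add", audit.append)  # no queue group: sees everything

for request_id in range(4):
    broker.publish("math.add", request_id)

print(received)  # requests are split between the two workers
print(audit)     # the plain subscriber saw all four requests
```

Starting a second process with the same `queue_group` is therefore the usual way to scale a service horizontally.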

### Handler Organization

```python
from camera_ui_rpc import RPCClass, RPCNested, ServiceConfig
from typing import Dict, List, Optional

# User, UserData and generate_id are application-defined placeholders

@RPCClass
class UserHandlers:
    def __init__(self):
        self.users: Dict[str, User] = {}
    
    async def get_user(self, user_id: str) -> Optional[User]:
        return self.users.get(user_id)
    
    async def create_user(self, data: UserData) -> User:
        user = User(id=generate_id(), **data)
        self.users[user.id] = user
        return user
    
    # Nested object for organization
    @RPCNested
    class admin:
        def __init__(self, parent):
            self.parent = parent
        
        async def list_users(self) -> List[User]:
            return list(self.parent.users.values())
        
        async def delete_user(self, user_id: str) -> bool:
            return self.parent.users.pop(user_id, None) is not None

# Register as handlers or service
await client.register_handler('users', UserHandlers())  # Simple RPC
# OR
await client.service.register_handler(                  # NATS service
    ServiceConfig(
        name='users',
        version='1.0.0',
        queue_group='user-workers'
    ),
    UserHandlers()
)
```

### Channels

Real-time bidirectional communication between multiple clients:

```python
# Server/Client A - Join channel and listen
client_a = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-a'})
await client_a.connect()

chat_a = await client_a.channel('room:general')

def on_message_a(msg):
    print(f"[Client A received] {msg['user']}: {msg['text']}")

chat_a.on('message', on_message_a)

# Client B - Join same channel
client_b = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'client-b'})
await client_b.connect()

chat_b = await client_b.channel('room:general')

def on_message_b(msg):
    print(f"[Client B received] {msg['user']}: {msg['text']}")

chat_b.on('message', on_message_b)

# Send messages - all clients in channel receive them
await chat_a.send({'user': 'Alice', 'text': 'Hello everyone!'})
# Output on Client B: [Client B received] Alice: Hello everyone!

await chat_b.send({'user': 'Bob', 'text': 'Hi Alice!'})
# Output on Client A: [Client A received] Bob: Hi Alice!

# Request/Reply pattern in channels
# Client B handles requests
async def handle_info_request(msg):
    if msg.get('type') == 'get-info':
        return {'users': 2, 'topic': 'general chat'}

await chat_b.on_request(handle_info_request)

# Client A makes request
info = await chat_a.request({'type': 'get-info'})
print(f"Channel info: {info}")  # {'users': 2, 'topic': 'general chat'}
```

### Private Channels

Direct one-to-one communication between specific clients:

```python
# Client A (Alice) - Create private channel to Bob
alice_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'alice'})
await alice_client.connect()

# Both clients must use the same channel_id and specify the target client
alice_to_bob = await alice_client.private_channel('secret-chat', 'bob')

def on_alice_message(msg):
    print(f"[Alice received from Bob] {msg}")

alice_to_bob.on('message', on_alice_message)

# Client B (Bob) - Create private channel to Alice
bob_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'bob'})
await bob_client.connect()

# Bob must use the same channel_id ('secret-chat') to connect
bob_to_alice = await bob_client.private_channel('secret-chat', 'alice')

def on_bob_message(msg):
    print(f"[Bob received from Alice] {msg}")

bob_to_alice.on('message', on_bob_message)

# Exchange private messages
await alice_to_bob.send({'text': 'Hi Bob, this is private!', 'timestamp': '10:00'})
# Output: [Bob received from Alice] {'text': 'Hi Bob, this is private!', 'timestamp': '10:00'}

await bob_to_alice.send({'text': 'Hi Alice, got your message!', 'timestamp': '10:01'})
# Output: [Alice received from Bob] {'text': 'Hi Alice, got your message!', 'timestamp': '10:01'}

# Private channels are isolated - other clients cannot see these messages
charlie_client = create_rpc_client({'servers': ['nats://localhost:4222'], 'name': 'charlie'})
await charlie_client.connect()
# Charlie cannot see Alice-Bob messages even if trying to listen
```

## Advanced Features

### Streaming

Two streaming patterns for different use cases:

```python
# Push-based (server controls flow) - better performance
async def generate_data(self, count: int):
    # Method name includes "generate" for push-based iteration
    for i in range(count):
        yield {'index': i, 'data': b'x' * (1024 * 1024)}  # 1MB

# Pull-based (client controls flow) - better backpressure
async def pull_data(self, count: int):
    # Method name includes "pull" for pull-based iteration
    for i in range(count):
        yield {'index': i, 'data': await self.load_data(i)}

# Client usage
service = client.create_proxy('data')

async for item in service.generate_data(100):
    # Process as fast as server sends
    pass

async for item in service.pull_data(100):
    # Pull items at client's pace
    await process_item(item)
```

### Error Handling

```python
from camera_ui_rpc import RPCException, ErrorCode

service = client.create_proxy('myservice')

try:
    await service.some_method()
except RPCException as e:
    if e.code == ErrorCode.METHOD_NOT_FOUND:
        print('Method does not exist')
    elif e.code == ErrorCode.TIMEOUT:
        print('Request timed out')
    elif e.code == ErrorCode.CONNECTION_CLOSED:
        print('Connection lost')
```
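
A common follow-up is to retry only the failures that are likely to be transient, such as timeouts. The helper below is a generic sketch and not part of the library: `retry_async` and its parameters are hypothetical, and `TransientError` is a stand-in so the block runs on its own. With the real library, the predicate would presumably check `RPCException` and `ErrorCode.TIMEOUT` instead.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a retryable failure (hypothetical)."""


async def retry_async(call, is_retryable, attempts=3, base_delay=0.01):
    """Await call() up to `attempts` times with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except Exception as exc:
            # Give up on the last attempt or on non-retryable errors.
            if attempt == attempts or not is_retryable(exc):
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = []


async def flaky():
    """Fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise TransientError("timed out")
    return "ok"


result = asyncio.run(
    retry_async(flaky, lambda exc: isinstance(exc, TransientError))
)
print(result, len(calls))
```

Retrying is only safe for idempotent methods, since a request that timed out may still have been executed by the server.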

### Auto-chunking

Large payloads are automatically chunked:

```python
service = client.create_proxy('data')

# Payloads larger than the server's limit are chunked automatically
large_data = b'x' * (100 * 1024 * 1024)  # 100MB
await service.process_large_data(large_data)  # Works transparently!
```
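
Conceptually, chunking means splitting a payload into pieces that fit under the transport's maximum message size and reassembling them in order on the receiving side. The sketch below shows only that idea. The function names and the `(index, total, data)` framing are assumptions made for illustration, not the library's wire format.

```python
def split_chunks(payload: bytes, max_size: int):
    """Split payload into (index, total, data) tuples of at most max_size bytes."""
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    total = max(1, -(-len(payload) // max_size))  # ceiling division
    return [
        (i, total, payload[i * max_size:(i + 1) * max_size])
        for i in range(total)
    ]


def join_chunks(chunks):
    """Reassemble chunks that may have arrived in any order."""
    ordered = sorted(chunks, key=lambda chunk: chunk[0])
    if not ordered:
        raise ValueError("no chunks received")
    total = ordered[0][1]
    if [chunk[0] for chunk in ordered] != list(range(total)):
        raise ValueError("missing or duplicate chunk")
    return b"".join(chunk[2] for chunk in ordered)


payload = bytes(range(256)) * 10          # 2560 bytes
chunks = split_chunks(payload, max_size=1000)
restored = join_chunks(reversed(chunks))  # order does not matter
print(len(chunks), restored == payload)
```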

### Isolated Connections

Isolated connections give specific operations their own NATS connection, so that heavy work does not block the main connection. They're available for:

- **Proxies**: `create_proxy()` and `create_service_proxy()`
- **Handlers**: `register_handler()`
- **Channels**: `channel()` and `private_channel()`

```python
# 1. Isolated proxy (returns object with proxy and close method)
isolated = client.create_proxy('service', isolated_connection=True)
result = await isolated.proxy.heavy_computation(params)
await isolated.close()  # Close when done

# 2. Isolated service proxy
service_isolated = await client.create_service_proxy(
    'service',
    isolated_connection=True,
)
await service_isolated.proxy.process_large_dataset()
await service_isolated.close()

# 3. Isolated handler (for CPU-intensive services)
cleanup = await client.register_handler(
    'heavy-service',
    handlers,
    isolated_connection=True,
)
# Later: await cleanup() to unregister and close

# 4. Isolated channel
channel = await client.channel('data-stream', isolated_connection=True)
```

**Important**: 
- Isolated connections must be closed explicitly when no longer needed
- Alternatively, closing the main client (`client.disconnect()`) will automatically close all isolated connections and channels
- Use isolated connections for: CPU-intensive operations, high-throughput streaming, or operations that might block

### Property Access

Expose class properties for remote access using descriptors:

```python
from camera_ui_rpc import RPCClass, RPCProperty

@RPCClass
class ConfigService:
    # Use RPCProperty descriptor
    name = RPCProperty()
    version = RPCProperty()
    max_connections = RPCProperty()
    
    def __init__(self):
        self.name = 'Config Service'
        self.version = '1.0.0'
        self.max_connections = 100

# Client usage
config = client.create_proxy('config')

# Read properties
name = await config.name
version = await config.version

# Update properties via setter methods
await config.setName('Updated Service')
await config.setMaxConnections(200)
```
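
For readers unfamiliar with descriptors, the sketch below shows one way a marker such as `RPCProperty` could work in plain Python: `__set_name__` records the attribute name so that a framework can later enumerate what is exposed. `ExposedProperty` and `exposed_names` are hypothetical, and the library's real implementation may differ.

```python
class ExposedProperty:
    """Hypothetical descriptor that registers itself on the owning class."""

    def __set_name__(self, owner, name):
        self.name = name
        self.storage = "_" + name
        # Build a new tuple so subclasses do not mutate the parent's registry.
        owner._exposed = getattr(owner, "_exposed", ()) + (name,)

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class itself
        return getattr(instance, self.storage)

    def __set__(self, instance, value):
        setattr(instance, self.storage, value)


def exposed_names(obj):
    """List the attribute names marked with ExposedProperty."""
    return list(getattr(type(obj), "_exposed", ()))


class Config:
    name = ExposedProperty()
    version = ExposedProperty()

    def __init__(self):
        self.name = "Config Service"
        self.version = "1.0.0"


config_obj = Config()
print(exposed_names(config_obj), config_obj.name)
```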

## Configuration

### Client Options

```python
from camera_ui_rpc.types import RPCClientOptions

options: RPCClientOptions = {
    'servers': ['nats://localhost:4222'],  # NATS servers
    'name': 'my-client',                   # Client identifier
    'auth': {                              # Authentication
        'user': 'username',
        'pass': 'password',
        'token': 'auth_token',
        'jwt': 'jwt_token'
    },
    'tls': {                               # TLS configuration
        'cert': 'path/to/cert.pem',
        'key': 'path/to/key.pem',
        'ca': 'path/to/ca.pem'
    },
    'timeout': 5000,                       # Default timeout (ms)
    'reconnect': True,                     # Auto-reconnect
    'max_reconnect_attempts': -1,          # -1 for infinite
    'reconnect_time_wait': 2000,           # Reconnect delay (ms)
    'max_payload_size': 4194304            # Override server limit
}
```
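
In deployments these options often come from the environment rather than from literals. The helper below is a sketch that uses only keys listed above; the `options_from_env` function and the `RPC_*` variable names are hypothetical and not something the library reads on its own.

```python
import os


def options_from_env(env=None):
    """Build a client options dict from environment-style variables."""
    env = os.environ if env is None else env
    servers = env.get("RPC_SERVERS", "nats://localhost:4222")
    options = {
        "servers": [url.strip() for url in servers.split(",") if url.strip()],
        "name": env.get("RPC_NAME", "my-client"),
        "timeout": int(env.get("RPC_TIMEOUT_MS", "5000")),  # milliseconds
        "reconnect": env.get("RPC_RECONNECT", "true").lower() == "true",
    }
    # Only add credentials when both parts are present.
    if "RPC_USER" in env and "RPC_PASS" in env:
        options["auth"] = {"user": env["RPC_USER"], "pass": env["RPC_PASS"]}
    return options


example = options_from_env({
    "RPC_SERVERS": "nats://a:4222, nats://b:4222",
    "RPC_NAME": "worker",
    "RPC_USER": "app",
    "RPC_PASS": "secret",
})
print(example)
```

The resulting dict can then be passed to `create_rpc_client` in the same way as the literal shown above.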

### Service Options

```python
from camera_ui_rpc import ServiceConfig

service_config = ServiceConfig(
    name='my-service',                     # Service name
    version='1.0.0',                       # Semantic version
    description='My service',              # Service description
    queue_group='my-service-queue',        # Load balancing group
    metadata={                             # Custom metadata
        'region': 'us-east',
        'environment': 'production'
    }
)

# Register service
await server.service.register_handler(service_config, MyHandlers())
```

## Cross-Language Compatibility

This library is fully compatible with the TypeScript/Node.js implementation:

- Identical wire protocol and message format
- Same feature set and API patterns
- Seamless service interoperability
- Shared channel communication

### Example: Python calling TypeScript service

```python
# Assumes a TypeScript service named 'math-service' is already running
math = await client.create_service_proxy('math-service')
result = await math.calculate(10, 20)  # Works seamlessly!
```

## Examples

Check the `examples/` directory for complete examples:

- `service.py` - Basic service implementation
- `channel_communication.py` - Channel messaging
- `streaming.py` - Streaming patterns
- `multi_service.py` - Multiple services interaction
- `large_data_transfer.py` - Handling large payloads
- And 17 more examples...

## License

MIT

---

*Part of the camera.ui ecosystem - A comprehensive camera management solution.*
