Metadata-Version: 2.4
Name: janus-api
Version: 0.1.5
Summary: Python bindings for Janus webrtc Media Server
Author-email: Leydotpy <leydotpy.dev@gmail.com>
Requires-Python: >=3.14
Description-Content-Type: text/markdown
Requires-Dist: aiohttp>=3.13.2
Requires-Dist: asgiref>=3.10.0
Requires-Dist: httpx>=0.28.1
Requires-Dist: pydantic>=2.12.4
Requires-Dist: pyee>=13.0.0
Requires-Dist: python-decouple>=3.8
Requires-Dist: reactivex>=4.1.0
Requires-Dist: redis>=7.0.1
Requires-Dist: websockets>=15.0.1

# Janus API - Python Client

A modern, async Python client library for the [Janus WebRTC Gateway](https://janus.conf.meetecho.com/).

## Features

- ✨ **Async/Await Support** - Built on `asyncio` for modern Python applications
- 🔌 **WebSocket Transport** - Real-time communication with Janus server via WebSockets
- 📹 **VideoRoom Plugin** - Full support for multi-party video conferencing with Publisher/Subscriber patterns
- 🎙️ **AudioBridge Plugin** - Audio conferencing capabilities
- 💬 **TextRoom Plugin** - Text chat rooms
- 📞 **P2P Plugin** - Peer-to-peer video calls
- 📡 **Streaming Plugin** - Media streaming support
- ☎️ **SIP Plugin** - SIP integration
- 🔒 **Type-Safe** - Fully typed with Pydantic models
- ⚡ **Event-Driven** - ReactiveX (RxPY) support for reactive programming
- 🔄 **Auto-Reconnection** - Built-in reconnection logic with exponential backoff
- 💓 **Keep-Alive Management** - Automatic session keep-alive handling
- 🌐 **ASGI Integration** - Built-in support for FastAPI/Starlette applications

## Installation

```bash
# Using uv (recommended)
uv add janus-api

# Using pip
pip install janus-api
```

## Quick Start

### Basic Session Setup
```python
import asyncio
from janus_api import Janus


async def main():
    # Get the singleton session instance
    session = Janus.get_session()
    await session.create()

    # Your code here...

    # Clean up
    await session.destroy()


asyncio.run(main())
```

### Simplified Setup/Teardown
```python
from janus_api import Janus

# Create session in background thread
Janus.setup()

# Your application code...

# Clean up when done
Janus.teardown()
```

## Configuration

Configure the Janus server URL via environment variable:

```bash
# .env file
JANUS_SESSION_URL=ws://localhost:8188/janus
```
If the variable is not set, the default is used: `ws://localhost:8188/janus`
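
As a rough illustration of the lookup described above, the sketch below resolves the URL from the environment and falls back to the documented default. It uses only `os.environ` and is not the library's own code; `resolve_janus_url` and `DEFAULT_JANUS_URL` are hypothetical names chosen for this example.

```python
import os

# Default URL as documented above.
DEFAULT_JANUS_URL = "ws://localhost:8188/janus"


def resolve_janus_url(environ=None):
    """Return JANUS_SESSION_URL from the environment, or the default."""
    env = os.environ if environ is None else environ
    # Assumption of this sketch: an empty value is treated like an unset one.
    return env.get("JANUS_SESSION_URL") or DEFAULT_JANUS_URL


print(resolve_janus_url())
```

Passing a plain dict instead of `os.environ` makes the helper easy to exercise in tests.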

## Plugin Usage

### Attaching Plugins

The library uses a unified `Plugin.attach()` interface with automatic plugin discovery:
```python
from janus_api import Plugin

# Attach to VideoRoom as publisher
publisher = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    username="alice"
)

# Attach to VideoRoom as subscriber
subscriber = await Plugin.attach(
    type="videoroom",
    mode="subscriber",
    room=1234
)

# Other plugins
audiobridge = await Plugin.attach(type="audiobridge", room=5678)
textroom = await Plugin.attach(type="textroom", room=9012)
p2p = await Plugin.attach(type="p2p")
streaming = await Plugin.attach(type="streaming")
sip = await Plugin.attach(type="sip")
```

### Available Plugins

| Plugin          | Type          | Description                    |
|-----------------|---------------|--------------------------------|
| **VideoRoom**   | `videoroom`   | Multi-party video conferencing |
| **AudioBridge** | `audiobridge` | Audio conferencing rooms       |
| **TextRoom**    | `textroom`    | Text chat rooms                |
| **P2P**         | `p2p`         | Peer-to-peer video calls       |
| **Streaming**   | `streaming`   | Media streaming                |
| **SIP**         | `sip`         | SIP gateway integration        |

## VideoRoom Plugin

### Publisher Workflow

```python
from janus_api import Plugin
from janus_api.models.request import TrickleCandidate

# Attach as publisher
publisher = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    username="alice"
)

# Join room and configure media in one step
response = await publisher.join_and_configure(
    sdp="<your-sdp-offer>",
    sdp_type="offer",
    audio=True,
    video=True,
    data=True
)

# Handle SDP answer
print(f"SDP Answer: {response.jsep.sdp}")

# Send ICE candidates
await publisher.trickle([
    TrickleCandidate(
        sdpMLineIndex=0,
        candidate="<ice-candidate-string>"
    )
])

# Signal ICE complete
await publisher.complete_trickle()

# Leave and detach when done
await publisher.leave()
await publisher.detach()
```
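
For orientation, the sketch below shows roughly what trickle requests look like on the wire according to the Janus JSON protocol: a `trickle` message carries either a single `candidate`, a `candidates` list, or `{"completed": true}` to mark the end of gathering. Whether `trickle()` and `complete_trickle()` build their messages exactly this way is an assumption; `trickle_message` and `trickle_complete_message` are hypothetical helpers, not part of this library.

```python
import uuid


def trickle_message(session_id, handle_id, candidates):
    """Build a Janus `trickle` request for one or more ICE candidates."""
    message = {
        "janus": "trickle",
        "session_id": session_id,
        "handle_id": handle_id,
        "transaction": uuid.uuid4().hex,  # random ID used to match the ack
    }
    if len(candidates) == 1:
        message["candidate"] = candidates[0]
    else:
        message["candidates"] = list(candidates)
    return message


def trickle_complete_message(session_id, handle_id):
    """Build the request that signals the end of ICE gathering."""
    return trickle_message(session_id, handle_id, [{"completed": True}])


print(trickle_complete_message(session_id=1, handle_id=2))
```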

### Subscriber Workflow
```python
from janus_api import Plugin
from janus_api.models.videoroom.request import SubscriberStreams

# Attach as subscriber
subscriber = await Plugin.attach(
    type="videoroom",
    mode="subscriber",
    room=1234
)

# Subscribe to publisher's streams
streams = [
    SubscriberStreams(
        feed='<publisher-id>',
        mid='0',  # optional
        sub_mid='1'  # optional
    )
]

response = await subscriber.join(streams=streams)

# Send SDP answer to start receiving media
await subscriber.watch(
    sdp="<your-sdp-answer>",
    sdp_type="answer"
)

# Update subscriptions dynamically
await subscriber.update(
    add=[SubscriberStreams(feed='<new-publisher-id>')],
    drop=[{'feed': '<old-publisher-id>'}]
)

# Pause/Resume media
await subscriber.pause()
await subscriber.resume()

# Leave and detach
await subscriber.leave()
await subscriber.detach()
```

### Room Management

```python
# Create a new room
await publisher.create(
    room=1234,
    description="My Video Room",
    publishers=10,
    audiocodec="opus",
    videocodec="vp8",
    record=False,
    permanent=False
)

# Check if room exists
exists = await publisher.exists()

# List participants
participants = await publisher.participants()
for p in participants:
    print(f"{p.display} - Publisher ID: {p.id}")

# Kick a user
await publisher.kick(
    password="<room-password>",
    user_id="<user-id-to-kick>"
)

# Moderate room
await publisher.moderate(
    room=1234,
    secret="<admin-secret>",
    moderator_id="<moderator-user-id>"
)

# Destroy room
await publisher.destroy(
    secret="<admin-secret>",
    permanent=True
)

# Manage allowed tokens
await publisher.allowed(
    passcode="<room-passcode>",
    action="add",
    tokens=["token1", "token2"]
)
```

## Event Handling

### Callback-Based Events
```python
from janus_api import Plugin
from janus_api.models.response import JanusResponse


def on_plugin_event(event: JanusResponse):
    """Handle plugin events"""
    if event.janus == "event":
        print(f"Event: {event.plugindata.data}")
    elif event.janus == "webrtcup":
        print("WebRTC connection is up!")
    elif event.janus == "hangup":
        print("Call ended")


plugin = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    on_event=on_plugin_event
)
```
### ReactiveX (RxPY) Streams

The library uses ReactiveX for reactive event handling:
```python
from janus_api import Plugin


def on_rx_event(event):
    """Handle reactive events"""
    event_type = event.get('event')
    payload = event.get('payload')
    sender = event.get('from')
    
    if event_type == 'sdp':
        print(f"Received SDP from {sender}: {payload}")
    elif event_type == 'webrtc':
        print(f"WebRTC event from {sender}: {payload}")


plugin = await Plugin.attach(
    type="videoroom",
    mode="publisher",
    room=1234,
    on_rx_event=on_rx_event
)

# Reactive stream is automatically started
# Stop when needed (usually not required as it's handled internally)
plugin.stop()
```
## ASGI Integration

### FastAPI/Starlette Integration
```python
from fastapi import FastAPI
from janus_api.servers.asgi import JanusASGILifespanWrapper
from janus_api.servers.rpc import get_session

api = FastAPI()


@api.get("/")
async def root():
    session = get_session()
    return {"session_id": session.id}


# Wrap the app with Janus lifespan management once the routes are registered.
# The Janus session is then created on startup and destroyed on shutdown.
# Serve `app` (the wrapper) with your ASGI server, not `api`.
app = JanusASGILifespanWrapper(api)
```
The `JanusASGILifespanWrapper` handles:
- ✅ Automatic session creation on application startup
- ✅ Session keep-alive management
- ✅ Graceful session destruction on shutdown
- ✅ WebSocket connection management
- ✅ ReactiveX stream lifecycle

### Django Integration

For Django projects, use the global accessor:
```python
from janus_api import Janus

# In your Django view/middleware
def my_view(request):
    session = Janus.get_session()
    if session:
        # Use the session
        pass
```
Set up in your Django app configuration:
```python
from django.apps import AppConfig
from janus_api import Janus
from janus_api.session import WebsocketSession

class MyAppConfig(AppConfig):
    def ready(self):
        # Initialize Janus manager
        session = WebsocketSession()
        Janus.set_manager(session)
        
        # Start session
        Janus.setup()
```
## Session Management

### Manual Session Creation
```python
from janus_api.session import WebsocketSession

session = WebsocketSession(session_id="custom-id")  # Optional custom ID
await session.create()

# Session automatically manages:
# - WebSocket connection
# - Keep-alive messages (every 15 seconds)
# - Plugin registry
# - Event dispatching

await session.destroy()
```
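
To make the keep-alive behaviour concrete, here is a minimal sketch of a periodic task built on `asyncio`. It is not the library's implementation: `keepalive_loop` and `fake_send` are hypothetical names, and the real session would send a Janus `keepalive` request over its WebSocket at the 15-second interval mentioned above.

```python
import asyncio


async def keepalive_loop(send_keepalive, interval=15.0):
    """Call `send_keepalive` every `interval` seconds until cancelled."""
    while True:
        await asyncio.sleep(interval)
        await send_keepalive()


async def demo():
    sent = []

    async def fake_send():
        # Stand-in for sending {"janus": "keepalive", ...} to the server.
        sent.append("keepalive")

    # A short interval keeps the demo fast; the session above uses 15 seconds.
    task = asyncio.create_task(keepalive_loop(fake_send, interval=0.01))
    await asyncio.sleep(0.1)

    # Cancelling the task is how the loop stops when the session is destroyed.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent
```

Running `asyncio.run(demo())` returns the list of keep-alives recorded before the task was cancelled.
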
### Session as Context Manager
```python
from contextlib import asynccontextmanager
from janus_api.servers.rpc import get_session
from janus_api.plugins import Plugin

@asynccontextmanager
async def janus_context():
    session = get_session()
    try:
        await session.create()
        yield session
    finally:
        await session.destroy()

async def manage_session():
    async with janus_context() as session:
        plugin = await Plugin.attach(type="videoroom", mode="publisher", room=1234)
        # Use plugin...
```
### Singleton Pattern

The session uses a singleton pattern via metaclass, ensuring only one session instance exists:
```python
from janus_api.servers.rpc import get_session

session1 = get_session()
session2 = get_session()

assert session1 is session2  # True - same instance
```
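
For readers unfamiliar with the technique, the following is a generic sketch of a metaclass-based singleton. `SingletonMeta` and `DemoSession` are hypothetical names for illustration; the library's actual metaclass may differ in details such as thread safety or reset hooks.

```python
class SingletonMeta(type):
    """Metaclass that creates each class's instance once and then reuses it."""

    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Only the first call constructs an object; later calls return it as-is.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class DemoSession(metaclass=SingletonMeta):
    def __init__(self, url="ws://localhost:8188/janus"):
        self.url = url


first = DemoSession()
second = DemoSession(url="ws://ignored.example/janus")

assert first is second
# Arguments passed after the first construction are ignored.
assert second.url == "ws://localhost:8188/janus"
```

One consequence of this design is that constructor arguments only take effect on the first instantiation.
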
## Advanced Features

### Plugin Registry

Plugins are automatically registered and discoverable:
```python
from janus_api.plugins.base import PluginRegistry

# List all registered plugins
plugins = PluginRegistry.list_plugins()
print(plugins)  # ['videoroom', 'audiobridge', 'textroom', 'p2p', 'streaming', 'sip']

# Get plugin class by name
VideoRoomPlugin = PluginRegistry.get_plugin('videoroom')
```
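
A registry of this kind is commonly built from a class-level dictionary filled by a decorator. The sketch below illustrates the idea under that assumption; `DemoRegistry` and `EchoPlugin` are hypothetical and only mirror the method names shown above, not the internals of `PluginRegistry`.

```python
class DemoRegistry:
    """Toy name-to-class registry populated by a decorator."""

    _plugins = {}

    @classmethod
    def register(cls, name):
        def decorator(plugin_cls):
            if name in cls._plugins:
                raise ValueError(f"plugin {name!r} is already registered")
            cls._plugins[name] = plugin_cls
            return plugin_cls  # return the class unchanged so it stays usable

        return decorator

    @classmethod
    def list_plugins(cls):
        return list(cls._plugins)

    @classmethod
    def get_plugin(cls, name):
        return cls._plugins[name]


@DemoRegistry.register(name="echo")
class EchoPlugin:
    name = "janus.plugin.echotest"


print(DemoRegistry.list_plugins())
```
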

### Custom Plugin Development

Create custom plugins by extending `PluginBase`:

```python
from janus_api.plugins.base import PluginBase, PluginRegistry
from janus_api.models.request import JanusRequest
from janus_api.plugins import Plugin

@PluginRegistry.register(name="mycustom")
class MyCustomPlugin(PluginBase):
    name = "janus.plugin.mycustom"
    
    async def custom_method(self):
        request = JanusRequest(janus="create", transaction="...")
        response = await self.send(
            request
        )
        return response

# Use it
async def custom_attach():
    plugin = await Plugin.attach(type="mycustom")
    await plugin.custom_method()
```
### Transport Layer

The WebSocket transport includes:
- **Automatic reconnection** with exponential backoff
- **Transaction management** for request/response pairing
- **JSEP extraction** for SDP handling
- **ReactiveX streams** for event dispatching
- **Ping/Pong keepalive** (20 second intervals)

```python
# Access transport directly (advanced usage)
from janus_api.servers.rpc import get_session

session = get_session()
transport = session.transport

# Transport metrics
print(transport._metrics)
# {
#     "received": 42,
#     "futures_resolved": 38,
#     "errors": 0,
#     "webrtc_events": 12
# }
```
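
The reconnection strategy listed above can be pictured with a small delay generator. This is a sketch of exponential backoff in general, not the transport's actual schedule: the base delay, growth factor, cap, and attempt count are all assumed values, and `backoff_delays` is a hypothetical helper.

```python
import random


def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=6,
                   jitter=0.0, rng=random.random):
    """Yield reconnect delays that grow geometrically up to `cap` seconds."""
    for attempt in range(attempts):
        delay = min(cap, base * factor ** attempt)
        # Optional jitter spreads out reconnects from many clients.
        yield delay + delay * jitter * rng()


print(list(backoff_delays()))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

A reconnect loop would sleep for each yielded delay between attempts and reset the generator after a successful connection.
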
## Error Handling
```python
from janus_api.core.exceptions import JanusException
from janus_api.plugins import Plugin

async def attach():
    try:
        plugin = await Plugin.attach(type="videoroom", mode="publisher", room=1234)
        await plugin.join()
    except JanusException as e:
        print(f"Janus error: {e}")
    except ConnectionError as e:
        print(f"Connection error: {e}")
    except TimeoutError as e:
        print(f"Request timeout: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
```
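
One way a `TimeoutError` can arise is a request whose reply never arrives. The sketch below shows the general idea of pairing requests and replies by transaction ID with `asyncio` futures, as named in the transport-layer feature list. `PendingTransactions` is a hypothetical class for illustration and is not the transport's real implementation.

```python
import asyncio
import uuid


class PendingTransactions:
    """Pair outgoing requests with replies via their `transaction` field."""

    def __init__(self):
        self._pending = {}

    def track(self, request):
        # Attach a fresh transaction ID and return a future for the reply.
        txn = uuid.uuid4().hex
        request["transaction"] = txn
        future = asyncio.get_running_loop().create_future()
        self._pending[txn] = future
        return future

    def resolve(self, message):
        # Called for each incoming message; returns True if it matched.
        future = self._pending.pop(message.get("transaction"), None)
        if future is None or future.done():
            return False
        future.set_result(message)
        return True


async def demo():
    pending = PendingTransactions()
    request = {"janus": "info"}
    future = pending.track(request)
    # Simulate the server echoing the transaction ID back.
    pending.resolve({"janus": "server_info", "transaction": request["transaction"]})
    reply = await asyncio.wait_for(future, timeout=1.0)

    # A request that is never answered ends in a timeout.
    orphan = pending.track({"janus": "info"})
    try:
        await asyncio.wait_for(orphan, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply["janus"], timed_out
```

A production version would also remove timed-out entries from the pending map so it does not grow without bound.
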
## API Reference

### Plugin Base Methods

All plugins inherit these methods:

| Method                | Description                         |
|-----------------------|-------------------------------------|
| `attach()`            | Attach plugin to session            |
| `detach()`            | Detach plugin from session          |
| `send(body, jsep)`    | Send message to plugin              |
| `trickle(candidates)` | Send ICE candidates                 |
| `complete_trickle()`  | Signal ICE gathering complete       |
| `start()`             | Start reactive stream (auto-called) |
| `stop()`              | Stop reactive stream                |

### VideoRoom Publisher Methods

| Method                                        | Parameters             | Description                    |
|-----------------------------------------------|------------------------|--------------------------------|
| `join(**kwargs)`                              | Various room options   | Join room as publisher         |
| `join_and_configure(sdp, sdp_type, **kwargs)` | SDP + media options    | Join and configure in one step |
| `publish(sdp, sdp_type, **kwargs)`            | SDP + media options    | Publish media                  |
| `configure(sdp, sdp_type, **kwargs)`          | SDP + media options    | Reconfigure publisher          |
| `unpublish()`                                 | None                   | Stop publishing                |
| `leave()`                                     | None                   | Leave the room                 |
| `create(**kwargs)`                            | Room configuration     | Create a new room              |
| `destroy(secret, permanent)`                  | Admin credentials      | Destroy a room                 |
| `exists()`                                    | None                   | Check if room exists           |
| `participants()`                              | None                   | List room participants         |
| `kick(password, user_id)`                     | Room password, user ID | Kick user(s) from room         |
| `moderate(**kwargs)`                          | Moderation options     | Moderate room settings         |
| `allowed(passcode, action, tokens)`           | Token management       | Manage allowed tokens          |

### VideoRoom Subscriber Methods

| Method                 | Parameters            | Description                  |
|------------------------|-----------------------|------------------------------|
| `join(streams)`        | List of streams       | Join as subscriber           |
| `subscribe(streams)`   | List of streams       | Subscribe to streams         |
| `watch(sdp, sdp_type)` | SDP answer            | Start watching streams       |
| `update(add, drop)`    | Stream modifications  | Update subscriptions         |
| `unsubscribe(streams)` | List of streams       | Unsubscribe from streams     |
| `configure(streams)`   | Stream configurations | Configure subscriber streams |
| `pause()`              | None                  | Pause receiving media        |
| `resume()`             | None                  | Resume receiving media       |
| `leave()`              | None                  | Leave the room               |

### Session Methods

| Method              | Description                    |
|---------------------|--------------------------------|
| `create()`          | Create session and connect     |
| `destroy()`         | Destroy session and disconnect |
| `attach(plugin)`    | Attach plugin by name          |
| `detach(handle_id)` | Detach plugin by handle ID     |
| `send(data)`        | Send request to Janus          |

## Requirements

- Python 3.14+
- asyncio
- websockets
- aiohttp
- httpx
- pydantic
- pyee (event emitters)
- reactivex (reactive streams)
- python-decouple (configuration)
- redis
- asgiref (ASGI support)

## Project Structure


```text
janus-api/
├── src/
│   └── janus_api/
│       ├── core/              # Core functionality
│       │   ├── manager.py     # Plugin manager
│       │   ├── utils.py       # Utilities
│       │   └── exceptions.py  # Custom exceptions
│       ├── models/            # Pydantic models
│       │   ├── base.py        # Base models
│       │   ├── request.py     # Request models
│       │   ├── response.py    # Response models
│       │   └── videoroom/     # VideoRoom-specific models
│       ├── plugins/           # Plugin implementations
│       │   ├── base.py        # Base plugin class
│       │   ├── videoroom.py   # VideoRoom plugin
│       │   ├── audiobridge.py # AudioBridge plugin
│       │   ├── textroom.py    # TextRoom plugin
│       │   ├── p2p.py         # P2P plugin
│       │   ├── streaming.py   # Streaming plugin
│       │   └── sip.py         # SIP plugin
│       ├── session/           # Session management
│       │   ├── base.py        # Abstract base session
│       │   └── websocket.py   # WebSocket session
│       ├── transport/         # Transport layer
│       │   └── websocket.py   # WebSocket client
│       ├── servers/           # Server integrations
│       │   └── asgi/          # ASGI support
│       │       ├── asgi.py    # Lifespan wrapper
│       │       └── rpc.py     # RPC helpers
│       └── types/             # Type definitions
├── pyproject.toml
└── README.md
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

MIT

## Credits

Built for use with the [Janus WebRTC Gateway](https://janus.conf.meetecho.com/).

## Support

For issues and questions:
- GitHub Issues: <https://github.com/Leydotpy/Janus-API/issues>
- Documentation: <https://pypi.org/project/janus-api/>

## Roadmap

- [ ] HTTP/REST transport support
- [ ] Additional plugin method implementations
- [ ] Comprehensive test coverage
- [ ] Enhanced documentation with more examples
- [ ] Type stubs for better IDE support
- [ ] Recording and playback support
- [ ] E2E encryption helpers
