Metadata-Version: 2.3
Name: telcoflow
Version: 1.0.0
Summary: Python SDK for Telcoflow
Keywords: websocket,sdk,voip,telephony
Author: Telcoflow
License: MIT License
         
         Copyright (c) 2024 Telcoflow
         
         Permission is hereby granted, free of charge, to any person obtaining a copy
         of this software and associated documentation files (the "Software"), to deal
         in the Software without restriction, including without limitation the rights
         to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
         copies of the Software, and to permit persons to whom the Software is
         furnished to do so, subject to the following conditions:
         
         The above copyright notice and this permission notice shall be included in all
         copies or substantial portions of the Software.
         
         THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
         IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
         FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
         AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
         LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
         OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
         SOFTWARE.
         
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: websockets>=15.0
Requires-Dist: typing-extensions>=4.8.0
Requires-Dist: dotenv>=0.9.9
Requires-Dist: httpx>=0.27
Requires-Dist: abxbus>=2.4
Requires-Python: >=3.11
Description-Content-Type: text/markdown

# Telcoflow Server Python SDK

A Python SDK for connecting to the Telcoflow Server, handling call events, managing media connections, and processing audio streams.

## Features

- **Dual Authentication**: Support for API Key and mTLS certificates
- **Automatic Reconnection**: Automatically reconnects when the connection is lost
- **Frame Demultiplexing**: Automatic routing of Text vs Binary WebSocket frames
- **Async/Await Support**: Built with `asyncio` for high-performance concurrent calls
- **Event Handling**: Decorator-based events for discrete events, async iterators for streams
- **Explicit Session Readiness**: Register handlers, then call `session.ready()` — no events are delivered until you're ready, so nothing is missed
- **Flow Control & Buffering**: Internal byte buffer for outgoing audio with pull-based flow control
- **Interruption Handling**: Ability to clear outgoing buffers instantly when an AI model is interrupted
- **Distributed Handling**: Serialize a `Call` with `to_json()` / `from_json()` to hand media processing to another node

## Logging

The SDK uses Python's standard `logging` module for all log messages. The SDK follows Python logging best practices for shared libraries:

- **Module-level loggers**: Each module uses `logging.getLogger(__name__)` for hierarchical logging
- **No output handlers in the library**: The SDK does not attach handlers that emit output, so the application controls the logging configuration
- **NullHandler by default**: A `NullHandler` discards records when the application has not configured logging, which prevents "no handler" warnings

### Configuring Logging

To enable logging in your application, configure the logging system before using the SDK:

```python
import logging

# Configure logging for the SDK
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure specific loggers
logging.getLogger('telcoflow').setLevel(logging.DEBUG)
logging.getLogger('telcoflow.session_manager_connection').setLevel(logging.INFO)
logging.getLogger('telcoflow.voice_session').setLevel(logging.INFO)
```

### Log Levels

- **DEBUG**: Detailed diagnostic information (e.g., sent commands, received message types)
- **INFO**: General informational messages (e.g., connection established, call events)
- **WARNING**: Warning messages (e.g., reconnection attempts, missing data)
- **ERROR**: Error messages (e.g., connection failures, parsing errors)
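
For larger applications, the same per-logger setup can be written declaratively. The following is a minimal sketch using the standard library's `logging.config.dictConfig`. The logger names are the ones shown above; the handler and formatter names (`console`, `plain`) are arbitrary labels chosen for illustration.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,  # keep loggers created before this call
    "formatters": {
        "plain": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        # Verbose SDK diagnostics, with the session manager connection kept quieter.
        "telcoflow": {"level": "DEBUG"},
        "telcoflow.session_manager_connection": {"level": "INFO"},
    },
    # Everything else (your application, other libraries) logs at WARNING and above.
    "root": {"level": "WARNING", "handlers": ["console"]},
}

logging.config.dictConfig(LOGGING)
```

Child loggers without an explicit level, such as `telcoflow.voice_session`, inherit `DEBUG` from the `telcoflow` logger, and all records propagate to the single root handler.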

### Security Note

The SDK does not log sensitive information such as:
- API keys
- Authentication tokens
- Certificate contents
- Private keys

Only connection URLs and call IDs are logged for debugging purposes.

## Installation

```bash
pip install telcoflow
```

## Quick Start

Build a config with `SessionManagerConfig.create()`, run inside `async with SessionManager(config) as sm`, and keep the process alive with `await sm.run_forever()`.

The session flow is **notify → open → ready**:

1. Register `@sm.on_session_notification` to receive a `SessionNotification` for each new inbound session.
2. Call `await sm.open_session(noti.session_id)` to claim it and get a `Session`.
3. Register the session's handlers (`@session.on_incoming_call`, …).
4. Call `await session.ready()` — only then does the server start delivering events. Registering before `ready()` guarantees no event is missed.

By default the SDK connects to the production endpoint. Pass `base_url="ws://localhost:8080"` to target a local server.

```python
import asyncio
import logging
import os
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        base_url="ws://localhost:8080",                 # omit for the production endpoint
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    try:
        async with SessionManager(config) as sm:
            logger.info("Connected. Waiting for sessions...")

            @sm.on_session_notification
            async def handle_session(noti: SessionNotification):
                session = await sm.open_session(noti.session_id)

                @session.on_incoming_call
                async def on_call(call: Call):
                    logger.info("Incoming call %s from %s", call.id, call.caller_number)

                    @call.on_terminated
                    def on_terminated():
                        logger.info("Call %s terminated", call.id)

                    result = await call.answer()
                    if not result:
                        logger.error("Answer failed: %s (%s)", result.error_message, result.error_code)
                        return

                    async for audio_chunk in call.audio_stream():
                        await call.send_audio(audio_chunk)  # echo back
                    await call.close()

                await session.ready()      # start event delivery (register handlers first!)

            await sm.run_forever()
    except asyncio.CancelledError:
        logger.info("Interrupted.")

if __name__ == "__main__":
    asyncio.run(main())
```

For concurrent receive/send (e.g. separate tasks for reading from `audio_stream()` and writing with `send_audio()`), use `asyncio.TaskGroup` and a queue.

### Authentication

`SessionManagerConfig.create()` accepts either API key credentials or mTLS certificates. The default endpoint is production; pass `base_url=` to override (e.g. for local development).

```python
# API key auth
config = SessionManagerConfig.create(
    api_key=os.getenv("WSS_API_KEY"),
    connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
    call_audio=CallAudioConfig(sample_rate=24000),
)

# mTLS auth
config = SessionManagerConfig.create(
    cert_path="/etc/certs/client.pem",
    key_path="/etc/certs/client.key",
    call_audio=CallAudioConfig(sample_rate=24000),
)
```

Audio settings (sample rate, audio mode, buffer size) live in an optional `CallAudioConfig` passed via `call_audio=`; omit it for the defaults (16000 Hz / MIXED / 1 MB).
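
To get a feel for how much playback time a given buffer size represents, the arithmetic can be sketched as follows. This assumes 16-bit mono PCM and that "1 MB" means 1,048,576 bytes; neither is stated in this README, so adjust the parameters to your actual audio format. The helper name `buffered_seconds` is illustrative.

```python
def buffered_seconds(buffer_bytes: int, sample_rate: int,
                     bytes_per_sample: int = 2, channels: int = 1) -> float:
    """Seconds of raw PCM audio that fit in buffer_bytes."""
    bytes_per_second = sample_rate * bytes_per_sample * channels
    return buffer_bytes / bytes_per_second


ONE_MB = 1024 * 1024  # assumption: "1 MB" means 1,048,576 bytes

print(buffered_seconds(ONE_MB, 16000))  # 32.768 seconds at the default sample rate
print(buffered_seconds(ONE_MB, 24000))  # roughly 21.8 seconds at 24 kHz
```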

## Architecture

The SDK follows a dual-connection architecture:

1. **Session Manager Connection**: Persistent WebSocket connection for session/call events
2. **Voice Session**: Ephemeral WebSocket connection per call for audio streaming

When a new inbound session arrives:
1. The server broadcasts a `session.notify`, delivered to your `@sm.on_session_notification` handler as a `SessionNotification` (`session_id`, `remote_address`, `local_address`, `channel`, `activity`, `created_at`).
2. You call `await sm.open_session(noti.session_id)` to claim it, register the session's handlers, then call `await session.ready()`.
3. For CALL sessions, a `Call` is delivered to your `@session.on_incoming_call` handler. Answering opens the voice session for audio I/O.

To process media on a different node, serialize the `Call` with `call.to_json()`, forward it over your own transport, and reconstruct it with `Call.from_json(data)` on the other side.

## Common Call Flows

Achieve different interaction patterns using the SDK's core commands. Command methods return a `CommandResult` (truthy on success) — check it rather than catching exceptions for operational failures.

### 1. Basic AI Agent (Answer)
The most common flow: answer, listen to the stream, and respond.

```python
@session.on_incoming_call
async def handle_call(call: Call):
    result = await call.answer()
    if not result:
        return  # answer failed; see result.error_code / result.error_message
    async for chunk in call.audio_stream():
        response = await ai_model.generate(chunk)
        await call.send_audio(response)
    await call.close()
```

### 2. AI Assistant (Connect & Whisper)
The agent connects the caller to another party (3-way conference) and can "whisper" private audio to the callee.

```python
@session.on_incoming_call
async def assistant_flow(call: Call):
    await call.answer()
    await call.connect(ring_time_seconds=30)
    await call.whisper()
    await call.send_audio(private_guidance_pcm)
    await call.close()
```

### 3. Distributed Call Handling
Forward the call to another server for media processing.

```python
@session.on_incoming_call
async def forward_call(call: Call):
    # Send handoff JSON to another server via your message queue / HTTP
    await message_queue.publish(call.to_json())

# On the other server:
call = Call.from_json(received_message)
await call.answer()
```

## Event Handling

### Decorator-based Events (Discrete Events)

Use decorators for discrete, one-time events.

```python
@session.on_incoming_call
async def handle_call(call: Call):
    await call.answer()

    @call.on_terminated
    def on_terminated():
        print("Call ended")

    # Server-side call errors (e.g. TTS failures)
    @call.on_call_event(CallEvent.CALL_ERROR)
    def on_error(data):
        print(f"Call error: {data['error_code']} — {data.get('error_message')}")
```

Available `CallErrorCode` values:

| Code | Meaning |
|---|---|
| `CallErrorCode.TTS_SESSION_ERROR` | TTS voice cloning session encountered an error |

### Async Iterators (Continuous Streams)

Use async iterators for continuous audio streams:

```python
async for audio_chunk in call.audio_stream():
    # Process each audio chunk
    processed = await process_audio(audio_chunk)
    await call.send_audio(processed)
```

## API Reference

### SessionManager

Main entry point for managing connections.

**Methods:**

| Method | Description |
|---|---|
| `on_session_notification` | Decorator — receives a `SessionNotification` for every new inbound session. |
| `await open_session(session_id)` | Claim a session by id. Returns a `Session`. Idempotent. |
| `await create_session(channel, remote_address)` | Start an SDK-initiated (outbound) session. Returns a `Session`. |
| `await list_sessions(channel=None, remote_address=None)` | List OPEN sessions for this connector. |
| `await setup_call_flow(config)` | Push a custom call/message flow (`CallFlowConfig`) to the server. Send-once; re-send after a reconnect if needed. |
| `await run_forever()` | Run until Ctrl+C or context-manager exit. Use inside `async with SessionManager(config) as sm:` after registering handlers. |

### SessionNotification

Delivered to `@sm.on_session_notification` handlers.

**Fields:**
- `session_id` — Session identifier to pass to `open_session()`
- `remote_address` — Remote party address (caller for inbound calls)
- `local_address` — Local party address (callee for inbound calls)
- `channel` — Channel type (e.g. `TELCO`, `WA`)
- `activity` — A `SessionActivityType` value (`NEW_INCOMING_CALL` / `NEW_INCOMING_MESSAGE` / `RESUME`)
- `created_at` — Notify emit time, unix epoch seconds
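
Because `created_at` is expressed in unix epoch seconds, it can be converted to a timezone-aware `datetime` to log or measure how old a notification is. The sketch below uses `FakeNotification`, a hypothetical stand-in carrying just two of the documented fields; the helper names `emitted_at` and `age_seconds` are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FakeNotification:
    """Hypothetical stand-in with two of the documented SessionNotification fields."""
    session_id: str
    created_at: float  # unix epoch seconds


def emitted_at(noti) -> datetime:
    """Notify emit time as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(noti.created_at, tz=timezone.utc)


def age_seconds(noti, now: Optional[datetime] = None) -> float:
    """Seconds elapsed between the notify emit time and `now` (default: current time)."""
    now = now or datetime.now(timezone.utc)
    return (now - emitted_at(noti)).total_seconds()
```

A handler could, for example, log `age_seconds(noti)` before calling `open_session()` to spot delivery delays.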

### Session

Represents a claimed session. Register handlers, then call `ready()`.

**Methods:**

| Method | Description |
|---|---|
| `on_incoming_call(func)` | Decorator — receives the `Call` for a CALL session. |
| `on_incoming_message(func)` | Decorator — receives an `IncomingMessage` (WA channel). |
| `on_closed(func)` | Decorator — invoked when the session closes. |
| `await ready()` | Signal readiness; the server then starts delivering events. Idempotent; no-op on a closed session. |
| `await send_message(...)` | Send an outbound message (WA channel). |
| `await wait_closed()` | Await until the session is closed. |
| `await close(...)` | Close the session and release resources. |

**Properties:**
- `is_closed` — Whether the session has been closed
- `channel` — Channel type

### Call

Represents a voice call with a media connection. Command methods (`answer`, `connect`, `whisper`, `barge`, `spy`, `disconnect`, `close`) return a `CommandResult` that is truthy on success and carries the error details when an operational failure occurs. A lost connection or a programmer error still raises an exception.

**Methods:**

| Method | Description |
|---|---|
| `await answer()` | Answer the call |
| `await connect(ring_time_seconds=60)` | Initiate a 3-way conference between the caller, callee, and the agent |
| `await whisper()` | Switch to whisper mode. Only available after `connect()`. Agent audio is heard by the number subscriber only (the callee for incoming calls, or the caller for outgoing calls). The other party cannot hear the agent. |
| `await barge()` | Switch to barge mode. Only available after `connect()`. Agent joins the conversation and can be heard by both the caller and the callee. |
| `await spy()` | Switch to spy mode. Only available after `connect()`. Agent listens to both parties but neither can hear the agent (silent monitoring). |
| `await disconnect()` | End the call for all parties (server ends the call) |
| `await close()` | Close the call and release resources. **After `connect()`:** the agent leaves the call; the caller and callee remain connected. **After `answer()` without `connect()`:** the call ends for both the agent and the caller. |
| `await send_audio(audio_data)` | Queue binary audio data for sending |
| `await clear_send_audio_buffer()` | Clear all queued audio from the outgoing buffer |
| `await get_send_audio_buffer_size()` | Get current size of the outgoing buffer |
| `audio_stream(channel_id=0)` | Async iterator for incoming audio chunks |
| `on_terminated(func)` | Decorator sugar for `CallEvent.CALL_TERMINATED` |
| `on_call_event(event)` | Decorator for call event handlers (e.g. `CallEvent.CALL_ERROR`) |
| `to_json()` / `from_json(data)` | Serialize / reconstruct a `Call` for cross-node handoff |

**Properties:**
- `id` — Unique call identifier
- `caller_number` — Caller phone number
- `callee_number` — Callee phone number
- `state` — Current `CallState`
- `audio_config` — The `CallAudioConfig` for this call

### CommandResult

Returned by `Call` command methods.

- `success` — `True` on success (the object is truthy, so `if await call.answer(): ...` works)
- `error_code` — Server error code on failure, or `"TIMEOUT"` if the command timed out
- `error_message` — Human-readable error detail
- `payload` — Server response payload, when present
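
The "truthy on success" behaviour can be pictured with a small stand-in class. This is an illustrative sketch of the pattern, not the SDK's actual implementation; `ResultSketch` and `describe` are hypothetical names, and only the four fields listed above are taken from this README.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical result object mirroring the documented CommandResult fields."""
    success: bool
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    payload: Optional[Any] = None

    def __bool__(self) -> bool:
        # Makes `if result:` equivalent to `if result.success:`.
        return self.success


def describe(result) -> str:
    """Turn a result into a one-line log message."""
    if result:
        return "ok"
    return f"failed: {result.error_message} ({result.error_code})"
```

The same `describe` helper works with any object exposing these fields, so it can be applied to the value returned by `await call.answer()`.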

## Error Handling

The SDK provides a hierarchy of custom exceptions. Catch `WSSError` to handle any SDK-raised error, or catch specific subclasses for more granular control. Note that **operational** command failures (server `success=false`, timeouts) are returned as a `CommandResult`, not raised.

- `WSSError` - Base exception for all SDK errors
- `WSSConnectionError` - Connection-related errors (session manager or media)
- `WSSAuthenticationError` - Authentication failures (API key, tokens, or mTLS)
- `WSSCallError` - Base class for call-related errors:
    - `WSSCallClosedError` - Attempted operation on a closed call
    - `WSSCallCommandError` - Command failure carrying `error_code` and `error_message`
    - `WSSCallCommandTimeoutError` - Client timed out waiting for a command response
- `WSSSessionError` - Voice session connection errors
- `BufferFullError` - Outgoing audio buffer is full
- `BufferClosedError` - Write attempted on a closed outgoing audio buffer (e.g. after call ended)
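
When catching several levels of this hierarchy, list the most specific class first, because Python uses the first matching `except` clause. The sketch below defines local stand-ins that mirror a few of the names above so that it runs on its own; in real code, import the classes from the SDK instead. The function `outcome` is a hypothetical helper.

```python
from typing import Callable


# Local stand-ins mirroring part of the documented hierarchy (illustration only).
class WSSError(Exception): ...
class WSSConnectionError(WSSError): ...
class WSSCallError(WSSError): ...
class WSSCallClosedError(WSSCallError): ...


def outcome(operation: Callable[[], None]) -> str:
    """Run an operation and report which except clause handled it."""
    try:
        operation()
        return "ok"
    except WSSCallClosedError:   # most specific first
        return "call already closed"
    except WSSCallError:         # any other call-related error
        return "call error"
    except WSSError:             # catch-all for the rest of the SDK errors
        return "sdk error"
```

If the `except WSSError` clause came first, it would also swallow the call-specific errors and the later clauses would never run.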

### Handling CALL_UNANSWERED for connect()

When using `call.connect()`, the callee may not answer within the ring time. The server returns error code `CALL_UNANSWERED`, surfaced on the returned `CommandResult`. The call remains active — you can retry, try another number, or end the call.

```python
@session.on_incoming_call
async def assistant_flow(call: Call):
    await call.answer()
    result = await call.connect(ring_time_seconds=30)
    if not result:
        if result.error_code == "CALL_UNANSWERED":
            await call.disconnect()
        else:
            logger.error("Connect failed: %s (%s)", result.error_message, result.error_code)
```
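
Since the call stays active after `CALL_UNANSWERED`, a retry policy can be layered on top. The following is a minimal sketch under stated assumptions: `FakeCall` and `FakeResult` are hypothetical stand-ins so the logic runs offline, and `connect_with_retry` with its `attempts` parameter is an illustrative helper, not an SDK function.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for CommandResult (truthy on success)."""
    success: bool
    error_code: Optional[str] = None

    def __bool__(self) -> bool:
        return self.success


class FakeCall:
    """Hypothetical stand-in whose connect() replays a scripted list of outcomes."""

    def __init__(self, outcomes) -> None:
        self._outcomes = list(outcomes)
        self.connect_attempts = 0
        self.disconnected = False

    async def connect(self, ring_time_seconds: int = 60) -> FakeResult:
        self.connect_attempts += 1
        return self._outcomes.pop(0)

    async def disconnect(self) -> FakeResult:
        self.disconnected = True
        return FakeResult(True)


async def connect_with_retry(call, attempts: int = 2, ring_time_seconds: int = 30):
    """Retry connect() while the callee does not answer, then end the call."""
    result = None
    for _ in range(attempts):
        result = await call.connect(ring_time_seconds=ring_time_seconds)
        if result or result.error_code != "CALL_UNANSWERED":
            return result  # success, or a failure that retrying will not fix
    await call.disconnect()  # still unanswered after all attempts
    return result
```

Other failures are returned to the caller unchanged, so they can still be logged with `result.error_code` as in the example above.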

## Audio Buffering & Interruption Handling

The SDK uses a pull-based flow control mechanism. When you call `send_audio()`, the data is placed in an internal `ConcurrentByteBuffer`. The SDK then sends this data to the server only when the server requests it.
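
The idea of pull-based flow control can be illustrated with a toy buffer. This is a conceptual sketch only and not the SDK's `ConcurrentByteBuffer`; the class `PullBufferSketch`, its method names, and the use of the built-in `BufferError` are assumptions made for the example.

```python
import asyncio


class PullBufferSketch:
    """Toy byte buffer: the producer writes, the consumer pulls what it asks for."""

    def __init__(self, max_bytes: int = 1024 * 1024) -> None:
        self._data = bytearray()
        self._max_bytes = max_bytes

    # No method awaits between reading and modifying the buffer, so each
    # operation is atomic with respect to other asyncio tasks.
    async def write(self, chunk: bytes) -> None:
        """Producer side: queue audio, refusing data beyond the size limit."""
        if len(self._data) + len(chunk) > self._max_bytes:
            raise BufferError("outgoing buffer is full")
        self._data += chunk

    async def pull(self, n: int) -> bytes:
        """Consumer side: hand over at most n bytes when they are requested."""
        out = bytes(self._data[:n])
        del self._data[:n]
        return out

    async def clear(self) -> None:
        """Drop everything still queued, for example after an interruption."""
        self._data.clear()
```

In this model the producer can write faster than real time, while the consumer drains the buffer only as fast as it requests data, and `clear()` discards whatever has not been pulled yet.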

### Why use a buffer?
- **Smooth Playback**: Prevents audio jitter by maintaining a steady supply of data for the server.
- **Flow Control**: Automatically handles the rate at which audio should be sent.
- **Interruption Handling**: If your AI model gets interrupted (e.g., via a Gemini Live interruption event), you can instantly clear the buffer to stop any pending audio from being played.

### Handling Interruption

```python
# When the AI model detects an interruption
await call.clear_send_audio_buffer()
# Now you can start sending new audio without old audio playing first
```

## Integration with AI Models

This example demonstrates bidirectional audio streaming and real-time interruption handling using the [Google GenAI SDK](https://github.com/googleapis/python-genai).

```python
import asyncio
import os
from google import genai
from google.genai import types
from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

gemini_client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
MODEL = "gemini-2.5-flash-native-audio-preview-12-2025"

async def start_gemini_session(call: Call):
    await call.answer()
    async with gemini_client.aio.live.connect(model=MODEL) as session:
        async def stream_to_gemini():
            async for chunk in call.audio_stream():
                await session.send_realtime_input(
                    audio=types.Blob(data=chunk, mime_type="audio/pcm;rate=24000")
                )
        async def receive_from_gemini():
            async for response in session.receive():
                if content := response.server_content:
                    if content.interrupted:
                        await call.clear_send_audio_buffer()
                    elif content.model_turn:
                        for part in content.model_turn.parts:
                            if part.inline_data:
                                await call.send_audio(part.inline_data.data)
        await asyncio.gather(stream_to_gemini(), receive_from_gemini())

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    async with SessionManager(config) as sm:
        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def on_call(call: Call):
                await start_gemini_session(call)

            await session.ready()

        await sm.run_forever()

if __name__ == "__main__":
    asyncio.run(main())
```

## Integration with Google ADK (Agent Development Kit)

For more complex AI agent behavior, including multi-agent orchestration, long-term memory, and structured tools, you can integrate with the [Google Agent Development Kit (ADK)](https://github.com/google/adk-python).

The following example shows how to bridge Telcoflow's bidirectional audio stream with ADK's `Runner` and `LiveRequestQueue`.

```python
import asyncio
import os
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.agents.live_request_queue import LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types

from telcoflow import SessionManager, SessionManagerConfig, SessionNotification, Call, CallAudioConfig

# Configure ADK Agent
agent = Agent(
    name="telcoflow_agent",
    model="gemini-2.5-flash-native-audio-preview-12-2025",
    instruction="You are a helpful AI assistant talking over a phone call.",
)
session_service = InMemorySessionService()
runner = Runner(app_name="telcoflow_app", agent=agent, session_service=session_service)

async def start_adk_session(call: Call):
    """Bridges audio with ADK Live Runner."""
    await call.answer()

    # ADK uses a queue to receive real-time inputs (audio chunks)
    live_request_queue = LiveRequestQueue()

    # Task 1: Stream audio to ADK
    async def stream_to_adk():
        async for audio_chunk in call.audio_stream():
            audio_blob = types.Blob(
                data=audio_chunk,
                mime_type="audio/pcm;rate=24000"
            )
            live_request_queue.send_realtime(audio_blob)

    # Task 2: Receive events and audio from ADK and send back to caller
    async def receive_from_adk():
        run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=["AUDIO"]
        )
        async for event in runner.run_live(
            user_id="default_user",
            session_id=call.id,
            live_request_queue=live_request_queue,
            run_config=run_config,
        ):
            # Instant interruption handling
            if event.interrupted:
                await call.clear_send_audio_buffer()

            # Forward model audio to the caller (parts may be missing or empty)
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.inline_data:
                        await call.send_audio(part.inline_data.data)

    # Run both directions concurrently
    await asyncio.gather(stream_to_adk(), receive_from_adk())

async def main():
    config = SessionManagerConfig.create(
        api_key=os.getenv("WSS_API_KEY"),
        connector_uuid=os.getenv("WSS_CONNECTOR_UUID"),
        call_audio=CallAudioConfig(sample_rate=24000),
    )
    async with SessionManager(config) as sm:
        @sm.on_session_notification
        async def handle_session(noti: SessionNotification):
            session = await sm.open_session(noti.session_id)

            @session.on_incoming_call
            async def handle_call(call: Call):
                await start_adk_session(call)

            await session.ready()

        await sm.run_forever()

if __name__ == "__main__":
    asyncio.run(main())
```

## Thread Safety & Concurrency

- Each `call.new` event is processed in its own dedicated background task, allowing multiple calls to be handled concurrently
- Each `Call` runs independently in its own asyncio task
- Control connection heartbeat runs in a separate task
- Reconnection logic runs in a separate task
- All operations are non-blocking
- The SDK automatically manages task lifecycle and cleanup

## Requirements

- Python 3.11+
- websockets>=15.0
- typing-extensions>=4.8.0
- dotenv>=0.9.9
- httpx>=0.27
- abxbus>=2.4
