Metadata-Version: 2.3
Name: streaming-utils
Version: 1.1.0
Summary: Asynchronous orchestration toolkit for Janus Streaming and VideoRoom pipelines, covering mountpoint/room provisioning, RTP forwarding, FFmpeg/GStreamer relays, OBS ingest control, queue workers, lifecycle cleanup, health reporting, and autoscaling signals.
Keywords: janus,streaming,videoroom,webrtc,rtp,ffmpeg,gstreamer,obs,orchestration
Author: Lakan
Author-email: Lakan <leydotpy.dev@gmail.com>
License: Proprietary
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Multimedia :: Sound/Audio :: Capture/Recording
Classifier: Topic :: Multimedia :: Video
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Dist: janus-api>=2.0.0
Requires-Dist: websockets>=16.0
Requires-Dist: aiokafka>=0.13.0 ; extra == 'kafka'
Requires-Dist: redis>=5.0.0 ; extra == 'queues'
Requires-Dist: aiokafka>=0.13.0 ; extra == 'queues'
Requires-Dist: redis>=5.0.0 ; extra == 'redis'
Maintainer: Lakan
Maintainer-email: Lakan <leydotpy.dev@gmail.com>
Requires-Python: >=3.14
Provides-Extra: kafka
Provides-Extra: queues
Provides-Extra: redis
Description-Content-Type: text/markdown

# streaming_utils

`streaming_utils` is an asynchronous orchestration library for Janus-based media systems. It sits above `janus_api` and coordinates complete broadcast lifecycles: creating and tearing down Janus Streaming mountpoints, ensuring Janus VideoRoom rooms, wiring `rtp_forward` from publishers, starting relay pipelines (FFmpeg or GStreamer), controlling OBS over WebSocket v5, and executing the same flows in queue-backed workers.

The package is built for service backends that need predictable, observable media operations across start, run, and stop phases. It includes cleanup stacks for reverse-order teardown, relay and connection health reporting, observer hooks for operational events/errors, and autoscaling signals based on queue backlog.

Core capabilities include:

- Janus Streaming mountpoint provisioning and cleanup
- Janus VideoRoom room creation/ensure and RTP forwarding
- FFmpeg/GStreamer relay supervision with restart and health checks
- OBS WebSocket v5 ingest configuration and control
- Redis Streams and Kafka queue backends for worker-driven orchestration
- Structured lifecycle management, health snapshots, and autoscale hooks

The orchestration interfaces are protocol-based, but this implementation is intentionally integrated with this repository's `janus_api` package:

- `streaming_utils.models` imports `janus_api.conf.settings`
- default Janus integration is implemented in `streaming_utils.adapters`
- examples in this README use `janus_api` plugin handles directly

## What the package is for

Use `streaming_utils` when you already have Janus plugin handles from `janus_api` and want one higher-level utility that can:

- create a Janus Streaming RTP mountpoint
- optionally ensure a VideoRoom exists
- start `rtp_forward` from a VideoRoom publisher into the mountpoint's RTP ports
- start an FFmpeg or GStreamer relay into Janus for RTMP/SRS or OBS-based flows
- manage cleanup if the Janus resources are owned by your application
- run the same work synchronously or through a queue worker

Typical use cases:

- bridge a Janus VideoRoom publisher into a Janus Streaming mountpoint
- relay an externally published RTMP feed into Janus Streaming
- control OBS to publish to SRS, then relay that ingest into Janus
- wrap the whole flow in a worker process with Redis Streams or Kafka

## Package map

- [`models.py`](./models.py): dataclass models for tracks, mountpoints, VideoRoom specs, source configs, and broadcast requests/results
- [`types.py`](./types.py): `TypedDict` equivalents for service-layer keyword inputs
- [`adapters.py`](./adapters.py): protocol definitions and Janus adapters that wrap `janus_api` plugin objects
- [`orchestrator.py`](./orchestrator.py): main runtime coordinator
- [`service.py`](./service.py): thin facade around the orchestrator
- [`process.py`](./process.py): managed subprocess lifecycle with restart/backoff and health checks
- [`lib/ffmpeg.py`](./lib/ffmpeg.py): FFmpeg relay wrapper
- [`lib/gstreamer.py`](./lib/gstreamer.py): GStreamer relay wrapper
- [`lib/obs.py`](./lib/obs.py): OBS WebSocket client and bridge helpers
- [`lib/srs.py`](./lib/srs.py): small SRS ingest spec model
- [`queue.py`](./queue.py): queue protocol plus Redis Streams and Kafka backends
- [`observer.py`](./observer.py): logging/event/error/metrics hooks
- [`lifecycle.py`](./lifecycle.py): reverse-order async cleanup manager used by the orchestrator
- [`cleanup.py`](./cleanup.py): a second cleanup stack helper that is currently not wired into the orchestrator
- [`autoscale.py`](./autoscale.py): backlog-driven autoscaling helpers
- [`exceptions.py`](./exceptions.py): package-specific exception types

## How it plugs into janus_api

### Required Janus-side objects

The orchestrator needs two backend objects:

- a Streaming backend
- a VideoRoom backend

When you use `janus_api`, you normally pass:

- a `StreamingPlugin` instance for Streaming mountpoint work
- a `Publisher` instance for VideoRoom work

Those are adapted by [`streaming_utils.adapters`](./adapters.py).

### The exact janus_api methods the adapters expect

`JanusStreamingBackend` wraps a `janus_api` Streaming plugin and calls:

- `plugin.create(**kwargs)` from [`janus_api/lib/plugins/streaming.py`](../janus_api/lib/plugins/streaming.py)
- `plugin.destroy(mountpoint_id=..., permanent=..., secret=...)`

`JanusVideoRoomBackend` wraps a `janus_api` VideoRoom publisher plugin and calls:

- `plugin.exists(room=...)`
- `plugin.create(**kwargs)`
- `plugin.rtp_forward(spec)`
- `plugin.destroy(room=..., permanent=..., secret=...)`

The VideoRoom backend therefore needs a `Publisher`-style plugin handle, not a `Subscriber`, because the adapter calls `rtp_forward()`. That method is implemented on [`janus_api/lib/plugins/videoroom.py`](../janus_api/lib/plugins/videoroom.py) by `Publisher`.

### Real janus_api integration example

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(
            session=session,
            admin_key="stream-admin-key",  # optional, but this is where Streaming admin_key belongs
        ).attach()

        videoroom = await Publisher(
            session=session,
            room=1234,
            username="stream-bridge",
        ).attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=videoroom,
            description="VideoRoom publisher relay",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=1234,
                publisher_id=5678,
                create_mountpoint=True,
                owned_by_app=False,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9001,
                    media=[
                        VideoTrack(mid="v", port=5004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=5006, pt=111, codec="opus"),
                    ],
                ),
            )
        )

        print(result.broadcast_id)
        print(result.state)
        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())
```

Important points in that example:

- `owned_by_app=False` is safer when you are bridging into a room your application did not create itself
- `StreamingPlugin(admin_key=...)` is the practical place to provide the Streaming admin key when using the adapter path
- the orchestrator accepts raw `janus_api` plugin instances; you do not have to wrap them yourself

## Adapter field mapping to janus_api

`streaming_utils` models use friendly Python field names, but Janus Streaming create requests still expect Janus-specific names. [`adapters.py`](./adapters.py) translates the important differences for `StreamingPlugin.create()`.

Key mappings:

| streaming_utils model field | Janus / janus_api create key |
| --- | --- |
| `multicast` or `mcast` | `mcast` |
| `bind_interface` or `iface` | `iface` |
| `rtp_port` | `rtcpport` |
| `h264_sps` | `h264sps` |
| `data_type` | `datatype` |
| `buffer_latest_message` | `databuffermsg` |

Most mountpoint-level fields already match the `janus_api.models.streaming.request.CreateRequest` model in [`janus_api/models/streaming/request.py`](../janus_api/models/streaming/request.py).

For VideoRoom `rtp_forward`, the `streaming_utils` request model already matches the `janus_api.models.videoroom.request.RTPForwardRequest` shape in [`janus_api/models/videoroom/request.py`](../janus_api/models/videoroom/request.py), so the adapter mainly passes the spec through.
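The table above amounts to a small key-translation pass before calling `StreamingPlugin.create()`. A sketch of that idea (`to_janus_create_kwargs` is hypothetical, not the actual adapter code; the rename table mirrors the mappings listed above):

```python
# Friendly-name -> Janus create-key renames, mirroring the table above.
FIELD_RENAMES = {
    "multicast": "mcast",
    "bind_interface": "iface",
    "rtp_port": "rtcpport",
    "h264_sps": "h264sps",
    "data_type": "datatype",
    "buffer_latest_message": "databuffermsg",
}


def to_janus_create_kwargs(fields: dict) -> dict:
    """Rename streaming_utils-style keys to Janus Streaming create keys,
    dropping None values so optional fields stay absent from the payload."""
    out = {}
    for key, value in fields.items():
        if value is None:
            continue
        out[FIELD_RENAMES.get(key, key)] = value
    return out


payload = to_janus_create_kwargs(
    {"id": 9001, "description": "cam", "multicast": None, "data_type": "text"}
)
# payload == {"id": 9001, "description": "cam", "datatype": "text"}
```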

## Core models

### Track models

[`models.py`](./models.py) defines:

- `Track`
- `AudioTrack`
- `VideoTrack`
- `DataTrack`

These are used to populate `MountPoint.media`.

Example:

```python
from streaming_utils.models import AudioTrack, MountPoint, VideoTrack

mountpoint = MountPoint(
    id=9001,
    description="camera feed",
    enabled=True,
    media=[
        VideoTrack(
            mid="v",
            port=5004,
            pt=96,
            codec="H264",
            fmtp="packetization-mode=1;profile-level-id=42e01f",
        ),
        AudioTrack(
            mid="a",
            port=5006,
            pt=111,
            codec="opus",
            fmtp="minptime=10;useinbandfec=1",
        ),
    ],
)
```

### MountPoint

`MountPoint` is the Janus Streaming create-spec model. It supports:

- `type`: `"rtp"`, `"live"`, `"ondemand"`, or `"rtsp"`
- mountpoint identity and access fields such as `id`, `description`, `secret`, `pin`, `is_private`
- RTP/media settings through `media`
- RTSP-specific fields such as `url`, `rtsp_user`, `rtsp_pwd`, `rtsp_timeout`, and related flags
- `extra` for direct payload extension

### RoomSpec and RTPForwardRequestSpec

These model the Janus VideoRoom side:

- `RoomSpec`: room create/ensure payload
- `RTPForwardRequestSpec`: `rtp_forward` request
- `RTPForwardStreamSpec`: per-stream forward descriptor

`RoomSpec.admin_key` and `RTPForwardRequestSpec.admin_key` are part of the dataclasses and can be passed directly to the VideoRoom side.

### SRSIngestSpec and RTSPCameraConfig

- `SRSIngestSpec` builds `publish_url` as `"{rtmp_url}/{app}/{stream_key}"`
- `RTSPCameraConfig` stores `url`, credentials, transport, and timeout settings

### BroadcastRequest

`BroadcastRequest` is the top-level input model the orchestrator understands.

Important fields:

- `source_kind`
- `relay_kind`
- `room_id`
- `publisher_id`
- `create_mountpoint`
- `owned_by_app`
- `host`
- `mountpoint_spec`
- `input_url`
- `gstreamer_pipeline`
- `srs`
- `room_spec`
- `rtsp_camera`

`mountpoint_spec.media` is the canonical Janus stream definition. Each item in
that list carries the per-track port, payload type, codec, and optional
`mindex` used to map multiple audio or video source streams to the correct
Janus destination.

`BroadcastRequest.to_payload()` and `.from_payload()` exist for queue serialization, not for Janus plugin calls directly.

### BroadcastResult

`BroadcastResult` is the orchestrator output. It holds:

- the original request
- `broadcast_id`
- `state`
- `room_response`
- `mountpoint_response`
- `forward_response`
- `relay_info`
- `srs_info`
- `queue_info`
- `health`
- `note`

Backend responses are intentionally not normalized. When using `janus_api`, fields such as `mountpoint_response`, `room_response`, and `forward_response` may contain raw `JanusResponse` objects. In the "room already exists" path, `room_response` may instead be the synthetic dict `{"exists": True, "room": <room_id>}`.

## Supported source flows

The orchestrator does not treat all `BroadcastRequest.source_kind` values the same way.

### 1. `source_kind="videoroom"`

Behavior:

- optionally create a Streaming mountpoint
- ensure a VideoRoom exists
- call `rtp_forward` from the specified publisher into the ports declared in `mountpoint_spec.media`

Required in practice:

- `room_id`
- `publisher_id`
- `mountpoint_spec` with a non-empty `media` list
- if `create_mountpoint=False`, every stream in `mountpoint_spec.media` must already include its Janus RTP `port`

Cleanup behavior:

- if `owned_by_app=True` and `room_id` is set, the orchestrator registers a room-destroy callback
- if `create_mountpoint=True` and `owned_by_app=True`, it also registers mountpoint cleanup

Use this flow when you want Janus-to-Janus bridging: VideoRoom publisher -> RTP forward -> Streaming mountpoint.
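The port wiring this flow relies on, one forward stream per entry in `mountpoint_spec.media`, can be sketched as plain data (`forward_streams` and its field names are illustrative, not the orchestrator's code):

```python
def forward_streams(host: str, media: list[dict]) -> list[dict]:
    """Derive rtp_forward stream descriptors from mountpoint media entries.
    Each entry must already carry the Janus RTP port it forwards into,
    which is why create_mountpoint=False requires explicit ports."""
    streams = []
    for track in media:
        if track.get("port") is None:
            raise ValueError(f"media entry {track.get('mid')} is missing its port")
        streams.append({"mid": track["mid"], "host": host,
                        "port": track["port"], "pt": track.get("pt")})
    return streams


media = [
    {"mid": "v", "port": 5004, "pt": 96},
    {"mid": "a", "port": 5006, "pt": 111},
]
print(forward_streams("127.0.0.1", media))
```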

### 2. `source_kind="obs"`

Behavior:

- requires `srs`
- creates an OBS WebSocket client
- configures OBS to publish to a custom RTMP target
- starts the selected relay (`ffmpeg` or `gstreamer`)
- starts OBS streaming
- registers cleanup for OBS and the relay

Required in practice:

- `relay_kind` must be `"ffmpeg"` or `"gstreamer"`
- `srs` must be set
- `mountpoint_spec` must be set
- `gstreamer_pipeline` is optional: when omitted, the library auto-generates a Janus-targeted GStreamer pipeline from the resolved media list

### 3. `source_kind="rtmp_external"`

Behavior:

- assumes an external publisher is already sending media to `request.srs.publish_url`
- starts the selected relay (`ffmpeg` or `gstreamer`)
- does not control OBS

Required in practice:

- `relay_kind` must be `"ffmpeg"` or `"gstreamer"`
- `srs` must be set
- `mountpoint_spec` must be set

### 4. `source_kind="rtsp_camera"`

Behavior:

- expects `rtsp_camera`
- expects `relay_kind="ffmpeg"` or `relay_kind="gstreamer"`
- builds the relay input directly from `rtsp_camera.url`
- injects RTSP-specific FFmpeg transport and timeout flags when the FFmpeg relay is selected
- `mountpoint_spec` must be set

### 5. `source_kind="frontend_owned"`

Behavior:

- no server-side relay is started
- the result gets a note telling the caller to use VideoRoom or ingest-style browser paths elsewhere

This is effectively a placeholder mode.

### Declared but not implemented

`SourceKind` includes `"browser_webrtc"` in [`models.py`](./models.py) and [`types.py`](./types.py), but the orchestrator does not have a corresponding branch for it.

The orchestrator also still accepts the legacy string `"external"` at runtime, even though that value is not part of the declared `SourceKind` type.

## Relay selection

`RelayKind` is declared as:

- `"none"`
- `"ffmpeg"`
- `"gstreamer"`

Actual behavior:

- `videoroom` can reasonably use `"none"`
- `frontend_owned` does not use a relay
- `obs` requires `"ffmpeg"` or `"gstreamer"`
- `rtmp_external` requires `"ffmpeg"` or `"gstreamer"`
- `rtsp_camera` expects `"ffmpeg"` or `"gstreamer"` (RTSP-specific transport flags are injected only for FFmpeg)

[`BroadcastOrchestrator._get_request_relay()`](./orchestrator.py) only knows how to construct FFmpeg and GStreamer relays.

## Orchestrator behavior

[`BroadcastOrchestrator`](./orchestrator.py) is the main runtime object.

Construction:

```python
from streaming_utils.orchestrator import BroadcastOrchestrator

orchestrator = BroadcastOrchestrator(
    streaming=streaming_plugin_or_backend,
    videoroom=videoroom_publisher_or_backend,
    observer=observer,   # optional
    queue=queue,         # optional
    description="broadcast feed",
)
```

What it does during `start()`:

1. generates a new `broadcast_id`
2. creates a `JanusLifecycleManager`
3. stores a `BroadcastResult(state="starting")`
4. emits `broadcast_starting`
5. optionally creates a Streaming mountpoint
6. branches by `source_kind`
7. stores running relay handles in `_relays`
8. registers cleanup callbacks in `_lifecycles`
9. marks the result `running`
10. attaches a health snapshot from `health()`
11. emits `broadcast_running`

State it keeps in memory:

- `_active`: `broadcast_id -> BroadcastResult`
- `_relays`: `broadcast_id -> relay/process handle`
- `_lifecycles`: `broadcast_id -> JanusLifecycleManager`

Stop behavior:

- `stop(broadcast_id)` stops one tracked broadcast
- `stop(None)` stops all tracked broadcasts
- cleanup runs in reverse registration order
- stopped results remain in `_active` with terminal state

Context manager behavior:

```python
async with BroadcastOrchestrator(streaming=streaming, videoroom=videoroom) as orchestrator:
    ...
```

Exiting the context stops all tracked broadcasts.

## Service facade

[`service.Broadcast`](./service.py) is a thin wrapper around the orchestrator.

Methods:

- `start(**kwargs)`: builds `BroadcastRequest(**kwargs)` and calls `orchestrator.start()`
- `stop()`: stops all broadcasts through `orchestrator.stop()`
- `enqueue(**kwargs)`: serializes a request and queues it through `orchestrator.enqueue_start()`
- `health()`: delegates to `orchestrator.health()`
- `listen(...)`: delegates to `orchestrator.worker_loop()`
- `autoscale(...)`: delegates to `orchestrator.autoscale_tick()`

Use the service if you want a keyword-argument interface around the dataclass models.

Use the orchestrator directly if you need:

- full `BroadcastResult` objects
- per-broadcast stop control
- tighter control over request/response objects

## Queues, workers, and autoscaling

### Queue backends

[`queue.py`](./queue.py) provides:

- `OrchestrationQueue` protocol
- `RedisStreamsQueue`
- `KafkaTopicQueue`
- `JobEnvelope`

`RedisStreamsQueue`:

- stores jobs in a Redis stream
- uses consumer groups
- requeues failed jobs by appending a new entry with `attempts + 1`
- reports real lag through `XINFO GROUPS`

`KafkaTopicQueue`:

- publishes JSON jobs to a topic
- commits offsets only on `ack()` or `nack()`
- currently returns `0` from `lag()`

### Worker loop

The worker loop:

- blocks on `queue.claim()`
- dispatches `start_broadcast` or `stop_broadcast`
- `ack()`s on success
- `nack(requeue=True)` on failure
- emits `observer.emit_error(..., phase="worker_job", ...)` on per-job failure
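Those steps can be sketched with an in-memory queue; `MemoryQueue` and this `worker_loop` are illustrative stand-ins, not the real `queue.py` backends or the orchestrator's worker loop:

```python
import asyncio


class MemoryQueue:
    """Tiny in-memory stand-in for the OrchestrationQueue protocol."""

    def __init__(self):
        self._jobs = asyncio.Queue()

    async def publish(self, job):
        await self._jobs.put(job)

    async def claim(self):
        return await self._jobs.get()

    async def ack(self, job):
        pass  # a real backend would commit/ack here

    async def nack(self, job, requeue=True):
        if requeue:
            # Requeue with an incremented attempt counter, like the
            # Redis Streams backend described above.
            await self._jobs.put({**job, "attempts": job.get("attempts", 0) + 1})


async def worker_loop(queue, handle, stop_event):
    """Claim jobs until stopped; ack on success, nack(requeue) on failure."""
    while not stop_event.is_set():
        try:
            job = await asyncio.wait_for(queue.claim(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        try:
            await handle(job)
        except Exception:
            await queue.nack(job, requeue=True)
        else:
            await queue.ack(job)


async def main():
    queue = MemoryQueue()
    done = []
    stop = asyncio.Event()

    async def handle(job):
        done.append(job["op"])

    await queue.publish({"op": "start_broadcast"})
    task = asyncio.create_task(worker_loop(queue, handle, stop))
    await asyncio.sleep(0.3)
    stop.set()
    await task
    return done


print(asyncio.run(main()))  # ['start_broadcast']
```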

### Autoscaling

Autoscaling is provided by [`autoscale.py`](./autoscale.py):

- `AutoscalePolicy`
- `RelayAutoscaler`
- `WorkerPool`

The orchestrator's `autoscale_tick()`:

- reads queue lag
- estimates current worker count as `len(_relays)` or `1`
- computes a desired count
- emits a `scale_request` observer event through `_scale_workers()`

It does not spawn or kill worker processes itself.
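One plausible shape for that computation, assuming a simple lag-per-worker target (this is a sketch, not the actual `AutoscalePolicy` math):

```python
import math


def desired_workers(lag: int, *, min_workers: int = 1, max_workers: int = 10,
                    target_lag_per_worker: int = 5) -> int:
    """Turn queue backlog into a desired worker count: enough workers so
    each carries at most target_lag_per_worker jobs, clamped to the
    configured bounds. Illustrative only."""
    wanted = min_workers if lag <= 0 else math.ceil(lag / target_lag_per_worker)
    return max(min_workers, min(max_workers, wanted))


assert desired_workers(0) == 1
assert desired_workers(12) == 3     # ceil(12 / 5)
assert desired_workers(500) == 10   # capped at max_workers
```

Note that because `KafkaTopicQueue.lag()` returns `0`, any computation like this collapses to `min_workers` on the Kafka backend.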

## Observer, lifecycle, and process management

### OpsObserver

[`observer.py`](./observer.py) provides `OpsObserver`, which can be given:

- `logger`
- `metrics`
- `on_state_change`
- `on_event`
- `on_error`

The orchestrator, relay wrappers, OBS client, and cleanup helpers all use it.

### JanusLifecycleManager

[`lifecycle.py`](./lifecycle.py) is the cleanup stack used by the orchestrator.

It:

- stores async closers
- runs them in reverse order
- emits `janus_resource_closed` on success
- emits `on_error(..., phase="janus_cleanup", ...)` on failure
- raises `LifecycleError` if any cleanup step fails
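The reverse-order contract can be sketched in a few lines; `CleanupStack` is an illustrative miniature, not the real `JanusLifecycleManager`:

```python
import asyncio


class CleanupStack:
    """Minimal LIFO async cleanup stack in the spirit of
    JanusLifecycleManager (illustrative, not the real class)."""

    def __init__(self):
        self._closers = []

    def register(self, closer):
        self._closers.append(closer)

    async def close_all(self):
        errors = []
        # Run closers in reverse registration order so dependents are
        # torn down before the resources they depend on.
        for closer in reversed(self._closers):
            try:
                await closer()
            except Exception as exc:  # collect, keep closing the rest
                errors.append(exc)
        self._closers.clear()
        if errors:
            raise RuntimeError(f"{len(errors)} cleanup step(s) failed")


async def main():
    order = []
    stack = CleanupStack()

    async def closer(name):
        order.append(name)

    stack.register(lambda: closer("mountpoint"))
    stack.register(lambda: closer("rtp_forward"))
    await stack.close_all()
    return order


print(asyncio.run(main()))  # ['rtp_forward', 'mountpoint']
```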

### AsyncManagedProcess

[`process.py`](./process.py) supervises long-lived subprocesses.

It provides:

- async start/stop/restart
- health check loop every 5 seconds
- process-group termination on POSIX
- restart with exponential backoff and jitter
- optional restart budget

This is the base class used by:

- `FFmpeg`
- `GStreamer`
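The restart policy combines exponential growth with jitter. A sketch of that shape (the exact `AsyncManagedProcess` constants and formula may differ):

```python
import random


def backoff_delay(attempt: int, *, base: float = 0.5, cap: float = 30.0,
                  jitter: float = 0.2) -> float:
    """Exponential backoff with jitter (illustrative restart policy)."""
    delay = min(cap, base * (2 ** attempt))
    # Spread restarts out by +/- jitter so a batch of crashed relays
    # does not restart at the same instant.
    spread = delay * jitter
    return delay + random.uniform(-spread, spread)


for attempt in range(5):
    print(f"attempt {attempt}: ~{backoff_delay(attempt):.2f}s")
```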

## FFmpeg, GStreamer, OBS, and SRS helpers

### FFmpeg relay

[`lib/ffmpeg.py`](./lib/ffmpeg.py) defines:

- `Endpoint`
- `FFmpeg`

`FFmpeg.build_command()`:

- uses `ffmpeg -re -i <input_url>`
- maps audio-like codecs to `0:a:0`
- maps all other destinations to `0:v:0`
- sends RTP to `rtp://host:port?pkt_size=1200`
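The resulting command shape can be sketched as follows (`build_ffmpeg_args` and its `AUDIO_CODECS` set are illustrative, not the actual `FFmpeg.build_command()` implementation):

```python
AUDIO_CODECS = {"opus", "pcma", "pcmu", "g722"}  # treated as audio-like here


def build_ffmpeg_args(input_url: str, host: str, endpoints: list[dict]) -> list[str]:
    """Sketch of the command shape described above: one -map per endpoint,
    audio-like codecs to 0:a:0, everything else to 0:v:0, RTP out with
    pkt_size=1200."""
    args = ["ffmpeg", "-re", "-i", input_url]
    for ep in endpoints:  # ep: {"codec": ..., "port": ...}
        selector = "0:a:0" if ep["codec"].lower() in AUDIO_CODECS else "0:v:0"
        args += ["-map", selector,
                 "-f", "rtp", f"rtp://{host}:{ep['port']}?pkt_size=1200"]
    return args


cmd = build_ffmpeg_args(
    "rtmp://127.0.0.1/live/cam-01",
    "127.0.0.1",
    [{"codec": "H264", "port": 5004}, {"codec": "opus", "port": 5006}],
)
print(" ".join(cmd))
```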

### GStreamer relay

[`lib/gstreamer.py`](./lib/gstreamer.py) defines `GStreamer`, a `gst-launch-1.0` managed wrapper around a caller-supplied pipeline string.

### OBS helper

[`lib/obs.py`](./lib/obs.py) defines:

- `ObsConnectionConfig`
- `OBSWebSocketV5Adapter`
- `ObsBridge`

`OBSWebSocketV5Adapter` handles:

- OBS v5 Hello/Identify handshake
- optional password auth
- request/response correlation
- event dispatch
- reconnect helpers
- health checks through `GetStreamStatus`
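For the password step specifically, the obs-websocket v5 protocol derives the Identify `authentication` string from the salt and challenge delivered in the Hello message. A standalone sketch of that derivation:

```python
import base64
import hashlib


def obs_v5_auth(password: str, salt: str, challenge: str) -> str:
    """Compute the obs-websocket v5 Identify 'authentication' string:
    base64(sha256(base64(sha256(password + salt)) + challenge))."""
    secret = base64.b64encode(
        hashlib.sha256((password + salt).encode()).digest()
    ).decode()
    return base64.b64encode(
        hashlib.sha256((secret + challenge).encode()).digest()
    ).decode()


# salt and challenge come from the server's Hello message at runtime;
# these literals are placeholders.
print(obs_v5_auth("secret", "salt-from-hello", "challenge-from-hello"))
```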

`ObsBridge` adds simple flows:

- `configure_custom_rtmp()`
- `connect_and_start()`
- `stop()`

### SRS ingest helper

[`lib/srs.py`](./lib/srs.py) defines `SRSIngestSpec`.

Example:

```python
from streaming_utils.lib.srs import SRSIngestSpec

srs = SRSIngestSpec(
    rtmp_url="rtmp://127.0.0.1",
    app="live",
    stream_key="cam-01",
)

print(srs.publish_url)
# rtmp://127.0.0.1/live/cam-01
```

## Example: VideoRoom publisher -> Streaming mountpoint

This is the cleanest `janus_api` integration path in the current implementation.

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        publisher = await Publisher(session=session, room=4321, username="forwarder").attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=publisher,
            description="publisher 777 mirrored to Streaming",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=4321,
                publisher_id=777,
                create_mountpoint=True,
                owned_by_app=False,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9901,
                    media=[
                        VideoTrack(mid="v", port=5004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=5006, pt=111, codec="opus"),
                    ],
                ),
            )
        )

        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())
```

What that does:

- creates a Janus Streaming RTP mountpoint from the `mountpoint_spec.media` list
- ensures VideoRoom `4321` exists
- starts `rtp_forward` from publisher `777` to those media ports

## Example: OBS -> SRS -> FFmpeg -> Janus Streaming

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.lib.srs import SRSIngestSpec
from streaming_utils.models import AudioTrack, BroadcastRequest, MountPoint, VideoTrack
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=1234, username="obs-bridge").attach()

        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom)

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="obs",
                relay_kind="ffmpeg",
                create_mountpoint=True,
                owned_by_app=True,
                host="127.0.0.1",
                mountpoint_spec=MountPoint(
                    id=9100,
                    media=[
                        VideoTrack(mid="v", port=6004, pt=96, codec="H264"),
                        AudioTrack(mid="a", port=6006, pt=111, codec="opus"),
                    ],
                ),
                srs=SRSIngestSpec(
                    rtmp_url="rtmp://127.0.0.1",
                    app="live",
                    stream_key="obs-9100",
                ),
                obs_host="127.0.0.1",
                obs_port=4455,
                obs_password="secret",
                obs_secure=False,
            )
        )

        print(result.srs_info)
        print(result.relay_info)
    finally:
        await session.destroy()


asyncio.run(main())
```

## Example: queue-backed service wrapper

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.autoscale import AutoscalePolicy
from streaming_utils.orchestrator import BroadcastOrchestrator
from streaming_utils.queue import RedisStreamsQueue
from streaming_utils.service import Broadcast


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=4321, username="worker").attach()

        queue = RedisStreamsQueue(redis_url="redis://127.0.0.1:6379/0")
        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom, queue=queue)
        service = Broadcast(orchestrator=orchestrator, queue_kind="redis")

        await service.enqueue(
            source_kind="videoroom",
            relay_kind="none",
            room_id=4321,
            publisher_id=777,
            create_mountpoint=True,
            owned_by_app=False,
            host="127.0.0.1",
            mountpoint_spec={
                "id": 9901,
                "media": [
                    {"mid": "v", "type": "video", "port": 5004, "pt": 96, "codec": "H264"},
                    {"mid": "a", "type": "audio", "port": 5006, "pt": 111, "codec": "opus"},
                ],
            },
        )

        stop_event = asyncio.Event()
        worker = asyncio.create_task(
            service.listen(
                consumer="broadcast-worker-1",
                block_ms=1000,
                stop_event=stop_event,
            )
        )

        desired = await service.autoscale(
            policy=AutoscalePolicy(min_workers=1, max_workers=5, target_lag_per_worker=2)
        )
        print(desired)

        await asyncio.sleep(2)
        stop_event.set()
        await worker
    finally:
        await session.destroy()


asyncio.run(main())
```

## Operational caveats and current implementation limits

These points are important if you are using `streaming_utils` as production glue around `janus_api`.

- `streaming_utils.__init__` currently exports only `Broadcast`. Import from submodules directly if you need the lower-level models and orchestrator types.
- `BroadcastRequest.source_kind` declares `"browser_webrtc"`, but the orchestrator does not implement that branch.
- The orchestrator still accepts a legacy `"external"` string even though it is not part of the declared `SourceKind`.
- Automatic cleanup for mountpoints and rooms does not pass secrets back into `destroy_mountpoint()` or `destroy_room()`. If you create secret-protected resources, automatic teardown may fail unless your Janus setup allows destruction without those secrets.
- Automatic room cleanup is controlled only by `owned_by_app` and `room_id`. If you point the orchestrator at an existing room and leave `owned_by_app=True`, stop/cleanup will still try to destroy that room.
- `BroadcastResult.health` is a global orchestrator health snapshot, not a per-broadcast-only snapshot.
- `BroadcastResult` response fields are raw backend results, not a normalized schema.
- `service.Broadcast.stop()` has no `broadcast_id` parameter and stops every tracked broadcast.
- `queue_kind` is stored in enqueued payloads but is not otherwise interpreted by the orchestrator.
- `KafkaTopicQueue.lag()` currently returns `0`, so autoscaling decisions are only meaningfully backlog-aware with `RedisStreamsQueue` unless you extend the Kafka backend.
- The orchestrator tracks relay health through `_relays`; it does not keep a separate OBS bridge object in that map.
- `BroadcastRequest.obs_secure` defaults to `janus_api.conf.settings.DEBUG`. Set it explicitly if you do not want environment-dependent OBS transport behavior.

## When to use what

Use `BroadcastOrchestrator` if you want:

- full control over lifecycle
- per-broadcast stop
- direct access to dataclass request/result objects
- easiest integration with raw `janus_api` plugin instances

Use `Broadcast` if you want:

- a thinner interface for RPC or HTTP handlers
- keyword-argument inputs shaped like `BroadcastRequestType`
- queue worker entrypoints

Use the lower-level helpers directly if you only need one part:

- `FFmpeg` or `GStreamer` for process supervision
- `OBSWebSocketV5Adapter` or `ObsBridge` for OBS control
- `RedisStreamsQueue` or `KafkaTopicQueue` for background dispatch
- `JanusLifecycleManager` if you only want ordered async cleanup

## Repository references

Useful `janus_api` references when reading this package:

- [`janus_api/lib/plugins/streaming.py`](../janus_api/lib/plugins/streaming.py)
- [`janus_api/lib/plugins/videoroom.py`](../janus_api/lib/plugins/videoroom.py)
- [`janus_api/models/streaming/request.py`](../janus_api/models/streaming/request.py)
- [`janus_api/models/videoroom/request.py`](../janus_api/models/videoroom/request.py)
- [`janus_api/lib/source/_base.py`](../janus_api/lib/source/_base.py)
- [`janus_api/lib/source/_ffmpeg.py`](../janus_api/lib/source/_ffmpeg.py)
- [`janus_api/lib/source/_gstreamer.py`](../janus_api/lib/source/_gstreamer.py)
- [`janus_api/lib/source/_obs.py`](../janus_api/lib/source/_obs.py)

Those files are the closest code-level reference for how `streaming_utils` is meant to be used as a utility layer on top of `janus_api`.
