Metadata-Version: 2.3
Name: streaming-utils
Version: 1.0.0
Summary: Async orchestration utilities for Janus Streaming and VideoRoom workflows, including mountpoint and room provisioning, RTP forwarding, FFmpeg and GStreamer relay management, OBS-to-SRS ingest control, queue-backed workers, lifecycle cleanup, health reporting, and autoscaling.
Author: Lakan
Author-email: Lakan <leydotpy.dev@gmail.com>
Requires-Dist: janus-api>=2.0.0
Requires-Python: >=3.14
Description-Content-Type: text/markdown

# streaming_utils

`streaming_utils` is a repo-local orchestration package for turning media sources into Janus Streaming mountpoints and Janus VideoRoom RTP forwards.

In this repository it is designed to plug directly into `janus_api`. The package does not replace `janus_api`; it sits above it and coordinates:

- Janus Streaming mountpoint creation and destruction
- Janus VideoRoom room provisioning and RTP forwarding
- FFmpeg and GStreamer relay subprocesses
- OBS control over OBS WebSocket v5
- queue-backed worker execution
- lifecycle cleanup, health reporting, and autoscaling signals

It is "pluggable" in the sense that the orchestrator works against small backend protocols, but in practice this implementation is tightly coupled to this repo's `janus_api` package:

- `streaming_utils.models` imports `janus_api.conf.settings`
- the default Janus integration path is `streaming_utils.adapters`
- examples in this README use real `janus_api` plugin classes from this codebase

The package root, [`streaming_utils/__init__.py`](./__init__.py), is empty, so import from submodules directly.

## What the package is for

Use `streaming_utils` when you already have Janus plugin handles from `janus_api` and want one higher-level utility that can:

- create a Janus Streaming RTP mountpoint
- optionally ensure a VideoRoom exists
- start `rtp_forward` from a VideoRoom publisher into the mountpoint's RTP ports
- start an FFmpeg or GStreamer relay into Janus for RTMP/SRS or OBS-based flows
- manage cleanup if the Janus resources are owned by your application
- run the same work synchronously or through a queue worker

Typical use cases:

- bridge a Janus VideoRoom publisher into a Janus Streaming mountpoint
- relay an externally published RTMP feed into Janus Streaming
- control OBS to publish to SRS, then relay that ingest into Janus
- wrap the whole flow in a worker process with Redis Streams or Kafka

## Package map

- [`models.py`](./models.py): dataclass models for tracks, mountpoints, VideoRoom specs, source configs, and broadcast requests/results
- [`types.py`](./types.py): `TypedDict` equivalents for service-layer keyword inputs
- [`adapters.py`](./adapters.py): protocol definitions and Janus adapters that wrap `janus_api` plugin objects
- [`orchestrator.py`](./orchestrator.py): main runtime coordinator
- [`service.py`](./service.py): thin facade around the orchestrator
- [`process.py`](./process.py): managed subprocess lifecycle with restart/backoff and health checks
- [`lib/ffmpeg.py`](./lib/ffmpeg.py): FFmpeg relay wrapper
- [`lib/gstreamer.py`](./lib/gstreamer.py): GStreamer relay wrapper
- [`lib/obs.py`](./lib/obs.py): OBS WebSocket client and bridge helpers
- [`lib/srs.py`](./lib/srs.py): small SRS ingest spec model
- [`queue.py`](./queue.py): queue protocol plus Redis Streams and Kafka backends
- [`observer.py`](./observer.py): logging/event/error/metrics hooks
- [`lifecycle.py`](./lifecycle.py): reverse-order async cleanup manager used by the orchestrator
- [`cleanup.py`](./cleanup.py): a second cleanup stack helper that is currently not wired into the orchestrator
- [`autoscale.py`](./autoscale.py): backlog-driven autoscaling helpers
- [`exceptions.py`](./exceptions.py): package-specific exception types

## How it plugs into janus_api

### Required Janus-side objects

The orchestrator needs two backend objects:

- a Streaming backend
- a VideoRoom backend

When you use `janus_api`, you normally pass:

- a `StreamingPlugin` instance for Streaming mountpoint work
- a `Publisher` instance for VideoRoom work

Those are adapted by [`streaming_utils.adapters`](./adapters.py).

### The exact janus_api methods the adapters expect

`JanusStreamingBackend` wraps a `janus_api` Streaming plugin and calls:

- `plugin.create(**kwargs)` from [`janus_api/lib/plugins/streaming.py`](../janus_api/lib/plugins/streaming.py)
- `plugin.destroy(mountpoint_id=..., permanent=..., secret=...)`

`JanusVideoRoomBackend` wraps a `janus_api` VideoRoom publisher plugin and calls:

- `plugin.exists(room=...)`
- `plugin.create(**kwargs)`
- `plugin.rtp_forward(spec)`
- `plugin.destroy(room=..., permanent=..., secret=...)`

The VideoRoom backend therefore needs a `Publisher`-style plugin handle, not a `Subscriber`, because the adapter calls `rtp_forward()`. That method is implemented by `Publisher` in [`janus_api/lib/plugins/videoroom.py`](../janus_api/lib/plugins/videoroom.py).
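
The method lists above can be written down as structural types. The following is a minimal sketch, not the protocol definitions from `adapters.py`: the names `StreamingPluginLike`, `VideoRoomPublisherLike`, `FakePublisher`, `FakeSubscriber`, and `check_videoroom_handle` are hypothetical, and the sketch assumes the plugin methods are coroutines.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class StreamingPluginLike(Protocol):
    """Hypothetical structural type: the two calls made on a Streaming plugin."""

    async def create(self, **kwargs: Any) -> Any: ...

    async def destroy(
        self, *, mountpoint_id: int, permanent: bool = False, secret: Optional[str] = None
    ) -> Any: ...


@runtime_checkable
class VideoRoomPublisherLike(Protocol):
    """Hypothetical structural type: the four calls made on a VideoRoom plugin."""

    async def exists(self, *, room: int) -> Any: ...

    async def create(self, **kwargs: Any) -> Any: ...

    async def rtp_forward(self, spec: Any) -> Any: ...

    async def destroy(
        self, *, room: int, permanent: bool = False, secret: Optional[str] = None
    ) -> Any: ...


class FakePublisher:
    """Test double that exposes all four VideoRoom calls."""

    async def exists(self, *, room: int) -> bool:
        return True

    async def create(self, **kwargs: Any) -> dict:
        return {"created": kwargs}

    async def rtp_forward(self, spec: Any) -> dict:
        return {"forwarded": spec}

    async def destroy(self, *, room: int, permanent: bool = False, secret: Optional[str] = None) -> dict:
        return {"destroyed": room}


class FakeSubscriber:
    """Test double without rtp_forward(), like a Subscriber-style handle."""

    async def exists(self, *, room: int) -> bool:
        return True

    async def create(self, **kwargs: Any) -> dict:
        return {"created": kwargs}

    async def destroy(self, *, room: int, permanent: bool = False, secret: Optional[str] = None) -> dict:
        return {"destroyed": room}


def check_videoroom_handle(plugin: object) -> None:
    """Fail early when the handle cannot forward RTP."""
    # isinstance() on a runtime-checkable Protocol only checks that the
    # method names exist; it does not validate signatures.
    if not isinstance(plugin, VideoRoomPublisherLike):
        raise TypeError("VideoRoom backend needs a Publisher-style handle with rtp_forward()")
```

A check like this can run before the handle is passed to the orchestrator, so that a wrong handle type fails at wiring time rather than at the first `rtp_forward` call.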

### Real janus_api integration example

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import BroadcastRequest
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(
            session=session,
            admin_key="stream-admin-key",  # optional, but this is where Streaming admin_key belongs
        ).attach()

        videoroom = await Publisher(
            session=session,
            room=1234,
            username="stream-bridge",
        ).attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=videoroom,
            description="VideoRoom publisher relay",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=1234,
                publisher_id=5678,
                mountpoint_id=9001,
                create_mountpoint=True,
                owned_by_app=False,
                host="127.0.0.1",
                video_port=5004,
                audio_port=5006,
                video_pt=96,
                audio_pt=111,
                video_codec="H264",
                audio_codec="opus",
            )
        )

        print(result.broadcast_id)
        print(result.state)
        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())
```

Important points in that example:

- `owned_by_app=False` is safer when you are bridging into a room your application did not create itself
- `StreamingPlugin(admin_key=...)` is the practical place to provide the Streaming admin key when using the adapter path
- the orchestrator accepts raw `janus_api` plugin instances; you do not have to wrap them yourself

## Adapter field mapping to janus_api

`streaming_utils` models use friendly Python field names, but Janus Streaming create requests still expect Janus-specific names. [`adapters.py`](./adapters.py) translates the important differences for `StreamingPlugin.create()`.

Key mappings:

| streaming_utils model field | Janus / janus_api create key |
| --- | --- |
| `multicast` or `mcast` | `mcast` |
| `bind_interface` or `iface` | `iface` |
| `rtcp_port` | `rtcpport` |
| `h264_sps` | `h264sps` |
| `data_type` | `datatype` |
| `buffer_latest_message` | `databuffermsg` |

Most mountpoint-level fields already match the `janus_api.models.streaming.request.CreateRequest` model in [`janus_api/models/streaming/request.py`](../janus_api/models/streaming/request.py).

For VideoRoom `rtp_forward`, the `streaming_utils` request model already matches the `janus_api.models.videoroom.request.RTPForwardRequest` shape in [`janus_api/models/videoroom/request.py`](../janus_api/models/videoroom/request.py), so the adapter mainly passes the spec through.
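
The key translation described in this section can be pictured as a small renaming pass. This is a sketch under stated assumptions rather than the code in `adapters.py`: `JANUS_KEY_ALIASES` and `to_janus_keys` are hypothetical names, only an unambiguous subset of the table rows is included, and dropping `None` values is an assumption about how unset fields are treated.

```python
from typing import Any, Dict

# Hypothetical alias table: friendly field name -> Janus create key.
# Only a subset of the documented mappings is shown.
JANUS_KEY_ALIASES: Dict[str, str] = {
    "multicast": "mcast",
    "bind_interface": "iface",
    "h264_sps": "h264sps",
    "data_type": "datatype",
    "buffer_latest_message": "databuffermsg",
}


def to_janus_keys(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Rename friendly keys to Janus keys and skip unset (None) values."""
    payload: Dict[str, Any] = {}
    for key, value in fields.items():
        if value is None:
            # Assumption: unset fields are omitted from the request.
            continue
        payload[JANUS_KEY_ALIASES.get(key, key)] = value
    return payload


track = {"mid": "v", "port": 5004, "multicast": "239.0.0.1", "bind_interface": None}
print(to_janus_keys(track))
```

Keys that are not in the alias table, such as `mid` and `port`, pass through unchanged, which matches the note that most fields already use the Janus names.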

## Core models

### Track models

[`models.py`](./models.py) defines:

- `Track`
- `AudioTrack`
- `VideoTrack`
- `DataTrack`

These are used to populate `MountPoint.media`.

Example:

```python
from streaming_utils.models import AudioTrack, MountPoint, VideoTrack

mountpoint = MountPoint(
    id=9001,
    description="camera feed",
    enabled=True,
    media=[
        VideoTrack(
            mid="v",
            port=5004,
            pt=96,
            codec="H264",
            fmtp="packetization-mode=1;profile-level-id=42e01f",
        ),
        AudioTrack(
            mid="a",
            port=5006,
            pt=111,
            codec="opus",
            fmtp="minptime=10;useinbandfec=1",
        ),
    ],
)
```

### MountPoint

`MountPoint` is the Janus Streaming create-spec model. It supports:

- `type`: `"rtp"`, `"live"`, `"ondemand"`, or `"rtsp"`
- mountpoint identity and access fields such as `id`, `description`, `secret`, `pin`, `is_private`
- RTP/media settings through `media`
- RTSP-specific fields such as `url`, `rtsp_user`, `rtsp_pwd`, `rtsp_timeout`, and related flags
- `extra` for direct payload extension

### RoomSpec and RTPForwardRequestSpec

These model the Janus VideoRoom side:

- `RoomSpec`: room create/ensure payload
- `RTPForwardRequestSpec`: `rtp_forward` request
- `RTPForwardStreamSpec`: per-stream forward descriptor

`RoomSpec.admin_key` and `RTPForwardRequestSpec.admin_key` are part of the dataclasses and can be passed directly to the VideoRoom side.

### SRSIngestSpec and RTSPCameraConfig

- `SRSIngestSpec` builds `publish_url` as `"{rtmp_url}/{app}/{stream_key}"`
- `RTSPCameraConfig` stores `url`, credentials, transport, and timeout settings

### BroadcastRequest

`BroadcastRequest` is the top-level input model the orchestrator understands.

Important fields:

- `source_kind`
- `relay_kind`
- `room_id`
- `publisher_id`
- `mountpoint_id`
- `create_mountpoint`
- `owned_by_app`
- `host`
- `video_port`, `audio_port`, `data_port`
- `video_pt`, `audio_pt`
- `video_codec`, `audio_codec`
- `video_fmtp`, `audio_fmtp`
- `simulcast`, `video_port2`, `video_port3`
- `srs`
- `room_spec`
- `rtsp_camera`
- `mountpoint_spec`

`BroadcastRequest.to_payload()` and `.from_payload()` exist for queue serialization, not for Janus plugin calls directly.

### BroadcastResult

`BroadcastResult` is the orchestrator output. It holds:

- the original request
- `broadcast_id`
- `state`
- `room_response`
- `mountpoint_response`
- `forward_response`
- `relay_info`
- `srs_info`
- `queue_info`
- `health`
- `note`

Backend responses are intentionally not normalized. When using `janus_api`, fields such as `mountpoint_response`, `room_response`, and `forward_response` may contain raw `JanusResponse` objects. In the "room already exists" path, `room_response` may instead be the synthetic dict `{"exists": True, "room": <room_id>}`.

## Supported source flows

The orchestrator does not treat all `BroadcastRequest.source_kind` values the same way.

### 1. `source_kind="videoroom"`

Behavior:

- optionally create a Streaming mountpoint
- ensure a VideoRoom exists
- call `rtp_forward` from the specified publisher into the configured host/ports

Required in practice:

- `room_id`
- `publisher_id`
- at least one of `video_port`, `audio_port`, or `data_port`, unless you provided a full `mountpoint_spec`

Cleanup behavior:

- if `owned_by_app=True` and `room_id` is set, the orchestrator registers a room-destroy callback
- if `create_mountpoint=True` and `owned_by_app=True`, it also registers mountpoint cleanup

Use this flow when you want Janus-to-Janus bridging: VideoRoom publisher -> RTP forward -> Streaming mountpoint.

### 2. `source_kind="obs"`

Behavior:

- requires `srs`
- creates an OBS WebSocket client
- configures OBS to publish to a custom RTMP target
- starts the selected relay (`ffmpeg` or `gstreamer`)
- starts OBS streaming
- registers cleanup for OBS and the relay

Required in practice:

- `relay_kind` must be `"ffmpeg"` or `"gstreamer"`
- `srs` must be set
- Janus RTP ports or a prebuilt `mountpoint_spec` must be available if you are creating a mountpoint

### 3. `source_kind="rtmp_external"`

Behavior:

- assumes an external publisher is already sending media to `request.srs.publish_url`
- starts the selected relay (`ffmpeg` or `gstreamer`)
- does not control OBS

Required in practice:

- `relay_kind` must be `"ffmpeg"` or `"gstreamer"`
- `srs` must be set

### 4. `source_kind="rtsp_camera"`

Behavior:

- expects `rtsp_camera`
- expects `relay_kind="ffmpeg"`
- injects RTSP-specific FFmpeg flags

Current implementation note:

- the RTSP path delegates to `_create_ffmpeg_relay()`, and that helper currently uses `request.srs.publish_url` as the FFmpeg input URL
- in other words, the current RTSP branch does not build a self-contained relay directly from `rtsp_camera.url`

Treat this branch as implementation-incomplete unless you have verified the exact request shape in your deployment.

### 5. `source_kind="frontend_owned"`

Behavior:

- no server-side relay is started
- the result gets a note telling the caller to use VideoRoom or ingest-style browser paths elsewhere

This is effectively a placeholder mode.

### Declared but not implemented

`SourceKind` includes `"browser_webrtc"` in [`models.py`](./models.py) and [`types.py`](./types.py), but the orchestrator does not have a corresponding branch for it.

The orchestrator also still accepts the legacy string `"external"` at runtime, even though that value is not part of the declared `SourceKind` type.

## Relay selection

`RelayKind` is declared as:

- `"none"`
- `"ffmpeg"`
- `"gstreamer"`

Actual behavior:

- `videoroom` can reasonably use `"none"`
- `frontend_owned` does not use a relay
- `obs` requires `"ffmpeg"` or `"gstreamer"`
- `rtmp_external` requires `"ffmpeg"` or `"gstreamer"`
- `rtsp_camera` expects `"ffmpeg"`

[`BroadcastOrchestrator._get_request_relay()`](./orchestrator.py) only knows how to construct FFmpeg and GStreamer relays.
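
The combinations above can be checked before a request reaches the orchestrator. The sketch below is a hypothetical pre-flight helper (`REQUIRED_RELAYS` and `check_relay` are not part of the package). It only encodes the source kinds that need a specific relay and leaves `videoroom` and `frontend_owned` unconstrained.

```python
from typing import Dict, FrozenSet

# Source kinds that need a relay, and which relay kinds satisfy them.
# Hypothetical table derived from the list above.
REQUIRED_RELAYS: Dict[str, FrozenSet[str]] = {
    "obs": frozenset({"ffmpeg", "gstreamer"}),
    "rtmp_external": frozenset({"ffmpeg", "gstreamer"}),
    "rtsp_camera": frozenset({"ffmpeg"}),
}


def check_relay(source_kind: str, relay_kind: str) -> None:
    """Raise ValueError when a source kind is paired with an unusable relay."""
    allowed = REQUIRED_RELAYS.get(source_kind)
    if allowed is None:
        # videoroom, frontend_owned, and unknown kinds are not checked here.
        return
    if relay_kind not in allowed:
        options = ", ".join(sorted(allowed))
        raise ValueError(
            f"source_kind={source_kind!r} needs relay_kind in {{{options}}}, got {relay_kind!r}"
        )


check_relay("videoroom", "none")      # passes
check_relay("rtmp_external", "ffmpeg")  # passes
```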

## Orchestrator behavior

[`BroadcastOrchestrator`](./orchestrator.py) is the main runtime object.

Construction:

```python
from streaming_utils.orchestrator import BroadcastOrchestrator

orchestrator = BroadcastOrchestrator(
    streaming=streaming_plugin_or_backend,
    videoroom=videoroom_publisher_or_backend,
    observer=observer,   # optional
    queue=queue,         # optional
    description="broadcast feed",
)
```

What it does during `start()`:

1. generates a new `broadcast_id`
2. creates a `JanusLifecycleManager`
3. stores a `BroadcastResult(state="starting")`
4. emits `broadcast_starting`
5. optionally creates a Streaming mountpoint
6. branches by `source_kind`
7. stores running relay handles in `_relays`
8. registers cleanup callbacks in `_lifecycles`
9. marks the result `running`
10. attaches a health snapshot from `health()`
11. emits `broadcast_running`

State it keeps in memory:

- `_active`: `broadcast_id -> BroadcastResult`
- `_relays`: `broadcast_id -> relay/process handle`
- `_lifecycles`: `broadcast_id -> JanusLifecycleManager`

Stop behavior:

- `stop(broadcast_id)` stops one tracked broadcast
- `stop(None)` stops all tracked broadcasts
- cleanup runs in reverse registration order
- stopped results remain in `_active` with a terminal state

Context manager behavior:

```python
async with BroadcastOrchestrator(streaming=streaming, videoroom=videoroom) as orchestrator:
    ...
```

Exiting the context stops all tracked broadcasts.

## Service facade

[`service.Broadcast`](./service.py) is a thin wrapper around the orchestrator.

Methods:

- `start(**kwargs)`: builds `BroadcastRequest(**kwargs)` and calls `orchestrator.start()`
- `stop()`: stops all broadcasts through `orchestrator.stop()`
- `enqueue(**kwargs)`: serializes a request and queues it through `orchestrator.enqueue_start()`
- `health()`: delegates to `orchestrator.health()`
- `listen(...)`: delegates to `orchestrator.worker_loop()`
- `autoscale(...)`: delegates to `orchestrator.autoscale_tick()`

Use the service if you want a keyword-argument interface around the dataclass models.

Use the orchestrator directly if you need:

- full `BroadcastResult` objects
- per-broadcast stop control
- tighter control over request/response objects

## Queues, workers, and autoscaling

### Queue backends

[`queue.py`](./queue.py) provides:

- `OrchestrationQueue` protocol
- `RedisStreamsQueue`
- `KafkaTopicQueue`
- `JobEnvelope`

`RedisStreamsQueue`:

- stores jobs in a Redis stream
- uses consumer groups
- requeues failed jobs by appending a new entry with `attempts + 1`
- reports real lag through `XINFO GROUPS`

`KafkaTopicQueue`:

- publishes JSON jobs to a topic
- commits offsets only on `ack()` or `nack()`
- currently returns `0` from `lag()`

### Worker loop

The worker loop:

- blocks on `queue.claim()`
- dispatches `start_broadcast` or `stop_broadcast`
- calls `ack()` on success
- calls `nack(requeue=True)` on failure
- emits `observer.emit_error(..., phase="worker_job", ...)` on per-job failure
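
The claim/dispatch/ack/nack cycle can be sketched with an in-memory queue. Everything here is a hypothetical stand-in: `Job` plays the role of `JobEnvelope`, `InMemoryQueue` replaces the Redis or Kafka backend, and the method signatures are assumptions rather than the `OrchestrationQueue` protocol. Requeueing appends a copy with `attempts + 1`, mirroring the Redis Streams behavior described above.

```python
import asyncio
from dataclasses import dataclass, field, replace
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass(frozen=True)
class Job:
    """Hypothetical stand-in for JobEnvelope."""

    kind: str
    payload: Dict[str, Any] = field(default_factory=dict)
    attempts: int = 0


class InMemoryQueue:
    """Toy queue with claim/ack/nack, for illustration only."""

    def __init__(self) -> None:
        self._jobs = asyncio.Queue()
        self.acked: List[Job] = []

    async def put(self, job: Job) -> None:
        await self._jobs.put(job)

    async def claim(self, timeout: float) -> Optional[Job]:
        try:
            return await asyncio.wait_for(self._jobs.get(), timeout)
        except asyncio.TimeoutError:
            return None

    async def ack(self, job: Job) -> None:
        self.acked.append(job)

    async def nack(self, job: Job, requeue: bool = True) -> None:
        if requeue:
            # Requeue as a new entry with an incremented attempt counter.
            await self._jobs.put(replace(job, attempts=job.attempts + 1))


Handler = Callable[[Dict[str, Any]], Awaitable[None]]


async def worker_loop(
    queue: InMemoryQueue,
    handlers: Dict[str, Handler],
    stop_event: asyncio.Event,
    errors: List[Exception],
) -> None:
    while not stop_event.is_set():
        job = await queue.claim(timeout=0.05)
        if job is None:
            continue  # nothing claimed within the timeout; re-check stop_event
        try:
            await handlers[job.kind](job.payload)
        except Exception as exc:
            errors.append(exc)  # stands in for the observer error hook
            await queue.nack(job, requeue=True)
        else:
            await queue.ack(job)


async def demo():
    queue, errors, calls = InMemoryQueue(), [], {"n": 0}

    async def start_broadcast(payload: Dict[str, Any]) -> None:
        calls["n"] += 1
        if calls["n"] == 1:
            raise RuntimeError("transient failure")  # first attempt fails

    await queue.put(Job(kind="start_broadcast", payload={"room_id": 4321}))
    stop_event = asyncio.Event()
    worker = asyncio.create_task(
        worker_loop(queue, {"start_broadcast": start_broadcast}, stop_event, errors)
    )
    while not queue.acked:
        await asyncio.sleep(0.01)
    stop_event.set()
    await worker
    return queue.acked, errors
```

Because the failed job is always requeued, a job that can never succeed would loop indefinitely in this sketch. Any retry cap would have to be checked against `attempts` by the caller; the description above does not mention one.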

### Autoscaling

Autoscaling is provided by [`autoscale.py`](./autoscale.py):

- `AutoscalePolicy`
- `RelayAutoscaler`
- `WorkerPool`

The orchestrator's `autoscale_tick()`:

- reads queue lag
- estimates the current worker count as `len(_relays)`, falling back to `1` when no relays are tracked
- computes a desired count
- emits a `scale_request` observer event through `_scale_workers()`

It does not spawn or kill worker processes itself.
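
A backlog-driven target can be computed in a few lines. This sketch is not the formula used by `RelayAutoscaler`, which is not reproduced here. It assumes the desired count is the lag divided by `target_lag_per_worker`, rounded up and clamped to the policy bounds. `Policy` is a hypothetical stand-in that borrows its field names from the `AutoscalePolicy` arguments used elsewhere in this README.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Policy:
    """Hypothetical stand-in for AutoscalePolicy."""

    min_workers: int = 1
    max_workers: int = 5
    target_lag_per_worker: int = 2


def desired_workers(lag: int, policy: Policy) -> int:
    """Workers needed so that each one handles at most target_lag_per_worker jobs."""
    needed = math.ceil(lag / policy.target_lag_per_worker) if lag > 0 else 0
    # Clamp to the configured bounds.
    return max(policy.min_workers, min(policy.max_workers, needed))


policy = Policy()
for lag in (0, 7, 100):
    print(lag, desired_workers(lag, policy))
```

The result is only a signal. Acting on it, for example by starting more worker processes, remains the caller's job.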

## Observer, lifecycle, and process management

### OpsObserver

[`observer.py`](./observer.py) provides `OpsObserver`, which can be given:

- `logger`
- `metrics`
- `on_state_change`
- `on_event`
- `on_error`

The orchestrator, relay wrappers, OBS client, and cleanup helpers all use it.

### JanusLifecycleManager

[`lifecycle.py`](./lifecycle.py) is the cleanup stack used by the orchestrator.

It:

- stores async closers
- runs them in reverse order
- emits `janus_resource_closed` on success
- emits `on_error(..., phase="janus_cleanup", ...)` on failure
- raises `LifecycleError` if any cleanup step fails
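
The reverse-order cleanup pattern can be sketched as follows. `CleanupStack` and `CleanupError` are hypothetical names, not the classes in `lifecycle.py`. The sketch also assumes that a failing closer does not prevent the remaining closers from running, which the list above does not state explicitly.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Closer = Callable[[], Awaitable[None]]


class CleanupError(RuntimeError):
    """Hypothetical stand-in for LifecycleError; carries every failure."""

    def __init__(self, failures: List[Tuple[str, Exception]]) -> None:
        super().__init__(f"{len(failures)} cleanup step(s) failed")
        self.failures = failures


class CleanupStack:
    def __init__(self) -> None:
        self._closers: List[Tuple[str, Closer]] = []

    def register(self, name: str, closer: Closer) -> None:
        self._closers.append((name, closer))

    async def close(self) -> List[str]:
        closed: List[str] = []
        failures: List[Tuple[str, Exception]] = []
        while self._closers:
            name, closer = self._closers.pop()  # last registered runs first
            try:
                await closer()
            except Exception as exc:
                failures.append((name, exc))  # keep going, report at the end
            else:
                closed.append(name)
        if failures:
            raise CleanupError(failures)
        return closed


async def demo() -> List[str]:
    stack = CleanupStack()

    async def destroy_mountpoint() -> None:
        pass  # placeholder for a Streaming destroy call

    async def stop_forward() -> None:
        pass  # placeholder for stopping an RTP forward

    stack.register("mountpoint", destroy_mountpoint)
    stack.register("rtp_forward", stop_forward)
    return await stack.close()


print(asyncio.run(demo()))
```

Reverse order matters because later resources usually depend on earlier ones: the forward should stop before the mountpoint it feeds is destroyed.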

### AsyncManagedProcess

[`process.py`](./process.py) supervises long-lived subprocesses.

It provides:

- async start/stop/restart
- health check loop every 5 seconds
- process-group termination on POSIX
- restart with exponential backoff and jitter
- optional restart budget

This is the base class used by:

- `FFmpeg`
- `GStreamer`
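
Exponential backoff with jitter, as mentioned in the feature list above, can be sketched like this. The function name, the default values, and the choice of symmetric proportional jitter are all assumptions for illustration. The parameters actually used by `AsyncManagedProcess` are not documented here.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    jitter: float = 0.2,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before restart number `attempt` (0-based)."""
    source = rng if rng is not None else random
    # Exponential growth, capped before jitter is applied.
    delay = min(cap, base * factor ** attempt)
    # Spread restarts by +/- `jitter` (as a fraction of the delay).
    return delay * (1.0 + source.uniform(-jitter, jitter))


rng = random.Random(0)  # seeded so repeated runs give the same sequence
for attempt in range(6):
    print(attempt, round(backoff_delay(attempt, rng=rng), 2))
```

Jitter keeps several relays that crashed at the same moment from restarting in lockstep. A restart budget would simply stop calling this function once `attempt` reaches the allowed maximum.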

## FFmpeg, GStreamer, OBS, and SRS helpers

### FFmpeg relay

[`lib/ffmpeg.py`](./lib/ffmpeg.py) defines:

- `Endpoint`
- `FFmpeg`

`FFmpeg.build_command()`:

- uses `ffmpeg -re -i <input_url>`
- maps audio-like codecs to `0:a:0`
- maps all other destinations to `0:v:0`
- sends RTP to `rtp://host:port?pkt_size=1200`
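
The command shape described above can be sketched as an argv list. This is an illustration, not the output of `FFmpeg.build_command()`: `RtpTarget`, `AUDIO_CODECS`, and `build_command` below are hypothetical, the set of "audio-like" codecs is an assumption, and encoder options are deliberately left out because the README does not state them. `-f rtp` is FFmpeg's RTP muxer, which takes one stream per output, hence one output per target.

```python
from dataclasses import dataclass
from typing import List, Sequence

# Assumption: which codec names count as audio for stream mapping.
AUDIO_CODECS = {"opus", "pcmu", "pcma", "g722", "aac"}


@dataclass(frozen=True)
class RtpTarget:
    """Hypothetical stand-in for an RTP destination endpoint."""

    host: str
    port: int
    codec: str


def build_command(input_url: str, targets: Sequence[RtpTarget]) -> List[str]:
    """Build an ffmpeg argv with one RTP output per target."""
    cmd: List[str] = ["ffmpeg", "-re", "-i", input_url]
    for target in targets:
        # Audio-like codecs take the first audio stream, everything else video.
        stream = "0:a:0" if target.codec.lower() in AUDIO_CODECS else "0:v:0"
        cmd += [
            "-map", stream,
            "-f", "rtp",
            f"rtp://{target.host}:{target.port}?pkt_size=1200",
        ]
    return cmd


argv = build_command(
    "rtmp://127.0.0.1/live/cam-01",
    [RtpTarget("127.0.0.1", 5004, "H264"), RtpTarget("127.0.0.1", 5006, "opus")],
)
print(" ".join(argv))
```

Building a list rather than a shell string avoids quoting problems when the command is handed to a subprocess API.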

### GStreamer relay

[`lib/gstreamer.py`](./lib/gstreamer.py) defines `GStreamer`, a managed wrapper that runs a caller-supplied pipeline string through `gst-launch-1.0`.

### OBS helper

[`lib/obs.py`](./lib/obs.py) defines:

- `ObsConnectionConfig`
- `OBSWebSocketV5Adapter`
- `ObsBridge`

`OBSWebSocketV5Adapter` handles:

- OBS v5 Hello/Identify handshake
- optional password auth
- request/response correlation
- event dispatch
- reconnect helpers
- health checks through `GetStreamStatus`
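
For the password step of the Hello/Identify handshake, the obs-websocket v5 protocol derives an authentication string from the password plus the `salt` and `challenge` sent in the Hello message. The sketch below follows that published scheme. The helper names `obs_auth_string` and `identify_payload` are hypothetical and are not taken from `lib/obs.py`.

```python
import base64
import hashlib
from typing import Any, Dict


def obs_auth_string(password: str, salt: str, challenge: str) -> str:
    """Compute the obs-websocket v5 authentication string."""
    # Step 1: base64(sha256(password + salt)) gives the "secret".
    secret = base64.b64encode(
        hashlib.sha256((password + salt).encode("utf-8")).digest()
    ).decode("ascii")
    # Step 2: base64(sha256(secret + challenge)) is what Identify carries.
    return base64.b64encode(
        hashlib.sha256((secret + challenge).encode("utf-8")).digest()
    ).decode("ascii")


def identify_payload(hello: Dict[str, Any], password: str = "") -> Dict[str, Any]:
    """Build an Identify message (op 1) in response to a Hello message (op 0)."""
    data: Dict[str, Any] = {"rpcVersion": hello["d"]["rpcVersion"]}
    auth = hello["d"].get("authentication")
    if auth:
        # The server only includes this object when a password is required.
        data["authentication"] = obs_auth_string(
            password, auth["salt"], auth["challenge"]
        )
    return {"op": 1, "d": data}


# Made-up Hello message, for illustration only.
hello = {
    "op": 0,
    "d": {
        "rpcVersion": 1,
        "authentication": {"salt": "example-salt", "challenge": "example-challenge"},
    },
}
print(identify_payload(hello, password="secret"))
```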

`ObsBridge` adds simple flows:

- `configure_custom_rtmp()`
- `connect_and_start()`
- `stop()`

### SRS ingest helper

[`lib/srs.py`](./lib/srs.py) defines `SRSIngestSpec`.

Example:

```python
from streaming_utils.lib.srs import SRSIngestSpec

srs = SRSIngestSpec(
    rtmp_url="rtmp://127.0.0.1",
    app="live",
    stream_key="cam-01",
)

print(srs.publish_url)
# rtmp://127.0.0.1/live/cam-01
```

## Example: VideoRoom publisher -> Streaming mountpoint

This is the cleanest `janus_api` integration path in the current implementation.

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.models import BroadcastRequest
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        publisher = await Publisher(session=session, room=4321, username="forwarder").attach()

        orchestrator = BroadcastOrchestrator(
            streaming=streaming,
            videoroom=publisher,
            description="publisher 777 mirrored to Streaming",
        )

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="videoroom",
                relay_kind="none",
                room_id=4321,
                publisher_id=777,
                create_mountpoint=True,
                mountpoint_id=9901,
                owned_by_app=False,
                host="127.0.0.1",
                video_port=5004,
                audio_port=5006,
                video_pt=96,
                audio_pt=111,
                video_codec="H264",
                audio_codec="opus",
            )
        )

        print(result.mountpoint_response)
        print(result.forward_response)
    finally:
        await session.destroy()


asyncio.run(main())
```

What that does:

- creates a Janus Streaming RTP mountpoint on ports `5004` and `5006`
- ensures VideoRoom `4321` exists
- starts `rtp_forward` from publisher `777` to those ports

## Example: OBS -> SRS -> FFmpeg -> Janus Streaming

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.lib.srs import SRSIngestSpec
from streaming_utils.models import BroadcastRequest
from streaming_utils.orchestrator import BroadcastOrchestrator


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=1234, username="obs-bridge").attach()

        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom)

        result = await orchestrator.start(
            BroadcastRequest(
                source_kind="obs",
                relay_kind="ffmpeg",
                create_mountpoint=True,
                mountpoint_id=9100,
                owned_by_app=True,
                host="127.0.0.1",
                video_port=6004,
                audio_port=6006,
                srs=SRSIngestSpec(
                    rtmp_url="rtmp://127.0.0.1",
                    app="live",
                    stream_key="obs-9100",
                ),
                obs_host="127.0.0.1",
                obs_port=4455,
                obs_password="secret",
                obs_secure=False,
            )
        )

        print(result.srs_info)
        print(result.relay_info)
    finally:
        await session.destroy()


asyncio.run(main())
```

## Example: queue-backed service wrapper

```python
import asyncio

from janus_api.session.websocket import WebsocketSession
from janus_api.lib.plugins.streaming import StreamingPlugin
from janus_api.lib.plugins.videoroom import Publisher

from streaming_utils.autoscale import AutoscalePolicy
from streaming_utils.orchestrator import BroadcastOrchestrator
from streaming_utils.queue import RedisStreamsQueue
from streaming_utils.service import Broadcast


async def main() -> None:
    session = WebsocketSession()
    await session.create()

    try:
        streaming = await StreamingPlugin(session=session, admin_key="stream-admin").attach()
        videoroom = await Publisher(session=session, room=4321, username="worker").attach()

        queue = RedisStreamsQueue(redis_url="redis://127.0.0.1:6379/0")
        orchestrator = BroadcastOrchestrator(streaming=streaming, videoroom=videoroom, queue=queue)
        service = Broadcast(orchestrator=orchestrator, queue_kind="redis")

        await service.enqueue(
            source_kind="videoroom",
            relay_kind="none",
            room_id=4321,
            publisher_id=777,
            mountpoint_id=9901,
            create_mountpoint=True,
            owned_by_app=False,
            host="127.0.0.1",
            video_port=5004,
            audio_port=5006,
        )

        stop_event = asyncio.Event()
        worker = asyncio.create_task(
            service.listen(
                consumer="broadcast-worker-1",
                block_ms=1000,
                stop_event=stop_event,
            )
        )

        desired = await service.autoscale(
            policy=AutoscalePolicy(min_workers=1, max_workers=5, target_lag_per_worker=2)
        )
        print(desired)

        await asyncio.sleep(2)
        stop_event.set()
        await worker
    finally:
        await session.destroy()


asyncio.run(main())
```

## Operational caveats and current implementation limits

These points are important if you are using `streaming_utils` as production glue around `janus_api`.

- `streaming_utils.__init__` exports nothing. Import from submodules directly.
- `BroadcastRequest.source_kind` declares `"browser_webrtc"`, but the orchestrator does not implement that branch.
- The orchestrator still accepts a legacy `"external"` string even though it is not part of the declared `SourceKind`.
- `BroadcastRequest.to_payload()` and `.from_payload()` do not serialize `rtsp_camera`, so queued RTSP jobs are not faithfully round-tripped.
- The RTSP branch currently injects RTSP FFmpeg flags but still builds the FFmpeg input from `request.srs.publish_url`, not `rtsp_camera.url`.
- `BroadcastOrchestrator.start()` creates a mountpoint before entering its main `try/except`, so a mountpoint-create failure is not handled by the later `result.state="failed"` and lifecycle-close path.
- Automatic cleanup for mountpoints and rooms does not pass secrets back into `destroy_mountpoint()` or `destroy_room()`. If you create secret-protected resources, automatic teardown may fail unless your Janus setup allows destruction without those secrets.
- Automatic room cleanup is controlled only by `owned_by_app` and `room_id`. If you point the orchestrator at an existing room and leave `owned_by_app=True`, stop/cleanup will still try to destroy that room.
- `BroadcastResult.health` is a global orchestrator health snapshot, not a per-broadcast snapshot.
- `BroadcastResult` response fields are raw backend results, not a normalized schema.
- `service.Broadcast.start()` returns `result.request.__dict__`, which can contain nested dataclass objects and therefore is not guaranteed to be JSON-safe.
- `service.Broadcast.stop()` has no `broadcast_id` parameter and stops every tracked broadcast.
- `queue_kind` is stored in enqueued payloads but is not otherwise interpreted by the orchestrator.
- `KafkaTopicQueue.lag()` currently returns `0`, so autoscaling decisions are only meaningfully backlog-aware with `RedisStreamsQueue` unless you extend the Kafka backend.
- The orchestrator tracks relay health through `_relays`; it does not keep a separate OBS bridge object in that map.
- `BroadcastRequest.obs_secure` defaults to `janus_api.conf.settings.DEBUG`. Set it explicitly if you do not want environment-dependent OBS transport behavior.
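
The `__dict__` caveat above is easy to reproduce with plain dataclasses. In this sketch `Ingest` and `Request` are hypothetical stand-ins for `SRSIngestSpec` and `BroadcastRequest`. It shows why a shallow `__dict__` is not JSON-safe and how the standard library's `dataclasses.asdict()` can be used on the caller side to get a recursive, serializable copy.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Ingest:
    """Hypothetical stand-in for SRSIngestSpec."""

    rtmp_url: str
    app: str
    stream_key: str


@dataclass
class Request:
    """Hypothetical stand-in for BroadcastRequest."""

    source_kind: str
    srs: Optional[Ingest] = None


req = Request(
    source_kind="obs",
    srs=Ingest(rtmp_url="rtmp://127.0.0.1", app="live", stream_key="obs-9100"),
)

# __dict__ is shallow: the nested Ingest instance is still a dataclass object.
try:
    json.dumps(req.__dict__)
    shallow_ok = True
except TypeError:
    shallow_ok = False

# asdict() recurses into nested dataclasses and returns plain dicts.
safe = json.dumps(asdict(req))
print(shallow_ok)
print(safe)
```

If a handler needs to return the request over HTTP or RPC, converting with `asdict()` first is one option that avoids depending on the shape of `__dict__`.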

## When to use what

Use `BroadcastOrchestrator` if you want:

- full control over lifecycle
- per-broadcast stop
- direct access to dataclass request/result objects
- easiest integration with raw `janus_api` plugin instances

Use `Broadcast` if you want:

- a thinner interface for RPC or HTTP handlers
- keyword-argument inputs shaped like `BroadcastRequestType`
- queue worker entrypoints

Use the lower-level helpers directly if you only need one part:

- `FFmpeg` or `GStreamer` for process supervision
- `OBSWebSocketV5Adapter` or `ObsBridge` for OBS control
- `RedisStreamsQueue` or `KafkaTopicQueue` for background dispatch
- `JanusLifecycleManager` if you only want ordered async cleanup

## Repository references

Useful `janus_api` references when reading this package:

- [`janus_api/lib/plugins/streaming.py`](../janus_api/lib/plugins/streaming.py)
- [`janus_api/lib/plugins/videoroom.py`](../janus_api/lib/plugins/videoroom.py)
- [`janus_api/models/streaming/request.py`](../janus_api/models/streaming/request.py)
- [`janus_api/models/videoroom/request.py`](../janus_api/models/videoroom/request.py)
- [`janus_api/lib/source/_base.py`](../janus_api/lib/source/_base.py)
- [`janus_api/lib/source/_ffmpeg.py`](../janus_api/lib/source/_ffmpeg.py)
- [`janus_api/lib/source/_gstreamer.py`](../janus_api/lib/source/_gstreamer.py)
- [`janus_api/lib/source/_obs.py`](../janus_api/lib/source/_obs.py)

Those files are the closest code-level reference for how `streaming_utils` is meant to be used as a utility layer on top of `janus_api`.
