Metadata-Version: 2.4
Name: aiomoqt
Version: 0.9.9
Summary: Python asyncio implementation of the MoQT protocol
Author-email: Giovanni Marzot <gmarzot@marzresearch.net>
License-Expression: MIT
Project-URL: Homepage, https://github.com/gmarzot/aiomoqt
Project-URL: Repository, https://github.com/gmarzot/aiomoqt.git
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: certifi
Requires-Dist: aiopquic>=0.3.8
Requires-Dist: sortedcontainers>=2.4.0
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-asyncio; extra == "test"
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-asyncio; extra == "dev"
Requires-Dist: ruff; extra == "dev"
Requires-Dist: ty; extra == "dev"
Provides-Extra: examples
Requires-Dist: argparse; extra == "examples"
Dynamic: license-file

# aiomoqt - Media over QUIC Transport (MoQT)

`aiomoqt` is an implementation of [MoQT](https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html) for `asyncio`, layered on [aiopquic](https://pypi.org/project/aiopquic/).

## Overview

`aiomoqt` implements the MoQT protocol with **dual draft-14 / draft-16 support**, both publish and subscribe roles, H3/WebTransport and raw QUIC transports, and ALPN-based draft negotiation. The architecture extends `aiopquic.asyncio.QuicConnectionProtocol`. The package ships publisher / subscriber example clients, a benchmark suite, a relay version probe, and a [moq-interop-runner](https://github.com/englishm/moq-interop-runner)-compatible test client.

### Features

- H3/WebTransport and raw QUIC transports
- Draft-14 / draft-16 ALPN negotiation (`moq-00` / `moqt-16`)
- Draft-16 wire format: delta-encoded param keys, track extensions, unified request/response
- SubgroupHeader / ObjectDatagram flag encoding, delta-encoded object IDs
- Version-independent control: `MOQTRequestError` exception across drafts
- Async context manager for session lifecycle
- Sync / async response handling on every control message via `wait_response`
- High-level publisher: [`PublishedTrack`](aiomoqt/track.py) — stream setup, subgroup writing, pacing
- High-level subscriber: [`SubscribedTrack`](aiomoqt/track.py) — object reassembly, FETCH / JOIN handling
- Pluggable message handlers via `register_handler()`
- Data publishing via SubgroupHeader streams or ObjectDatagrams
- Data reception via `on_object_received` callback
- Low-level message serialization / deserialization for custom protocol work

## Installation

Pure Python, requires Python 3.12+ (tested on 3.12, 3.13, 3.14):

```bash
uv pip install aiomoqt    # or: pip install aiomoqt
```

`aiopquic` (the QUIC transport) installs as a binary wheel automatically. Only Linux (glibc 2.34+, RHEL 9 / Ubuntu 22.04+) and macOS arm64 have prebuilt wheels; other systems pull `aiopquic` via sdist and need a C build toolchain — see [aiopquic install notes](https://github.com/gmarzot/aiopquic#installation).

A `./bootstrap_python.sh` script is provided for a uv-managed `.venv` if you want a clean dev environment.

### Reporting issues

Include the full version report in any issue. It captures aiomoqt, aiopquic, and the picoquic + picotls submodule SHAs aiopquic was built from — useful for diagnosing version-pair mismatches across the stack:

```bash
python -m aiomoqt.versions   # or the console script: aiomoqt-versions
```

Sample output:

```
aiomoqt:   0.9.7.dev7+g0fa185338 (~/src/aiomoqt/aiomoqt) [2026-06-11 11:57]
aiopquic:  0.3.7.dev12+g6eef9caf6 (~/src/aiopquic/src/aiopquic) [2026-06-11 11:58]
  - picoquic:  1.1.49.2 (d6c5653d) [2026-06-05]
  - picotls:   master (bfa67875) [2026-04-20]
```

## Quick Start

### Verify install + relay liveness

Confirm the stack is wired up before writing any code:

```bash
# Versions of aiomoqt + aiopquic + picoquic + picotls
python -m aiomoqt.versions   # or: aiomoqt-versions

# Liveness + supported drafts against a single relay (one line per probed transport).
# Exit 0 if the relay answered SERVER_SETUP for at least one draft.
python -m aiomoqt.examples.relay_probe --url moqt://moqx-main.ci.openmoq.org:4433
# → moqx-main.ci.openmoq.org:4433  quic   draft-14,draft-16  ✓ (405ms)

python -m aiomoqt.examples.relay_probe --url https://moqx-main.ci.openmoq.org:4433/moq-relay
# → moqx-main.ci.openmoq.org:4433  h3/wt  draft-14,draft-16  ✓ (315ms)
```

### Subscriber

```python
import asyncio
from aiomoqt.client import MOQTClient

def on_object(msg, size, recv_time_ms, group_id=None, subgroup_id=None):
    print(f"g={group_id} obj={msg.object_id} {size}B payload={msg.payload}")

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                         use_quic=True, draft_version=16)
    async with client.connect() as session:
        await session.client_session_init()
        session.on_object_received = on_object
        await session.subscribe('ns', 'track', wait_response=True)
        await session.async_closed()

asyncio.run(main())
```

### Publisher

```python
import asyncio
from aiomoqt.client import MOQTClient
from aiomoqt.types import MOQTMessageType
from aiomoqt.messages import SubgroupHeader

async def main():
    client = MOQTClient('relay.example.com', 443, path='moq',
                         use_quic=True, draft_version=16)
    client.register_handler(MOQTMessageType.SUBSCRIBE, on_subscribe)
    async with client.connect() as session:
        await session.client_session_init()
        await session.publish_namespace('ns', wait_response=True)
        await session.async_closed()  # serve until closed

async def on_subscribe(session, msg):
    """Called when a subscriber requests a track."""
    ok = session.subscribe_ok(request_msg=msg)
    stream_id = session.open_uni_stream()
    hdr = SubgroupHeader(track_alias=ok.track_alias, group_id=0, subgroup_id=0, publisher_priority=0)
    session.stream_write(stream_id, hdr.serialize().data)
    session.stream_write(stream_id, hdr.next_object(payload=b"hello").data)
    session.transmit()

asyncio.run(main())
```

The `on_subscribe` handler above — stream setup, header serialization, object writing — is wrapped by the higher-level `PublishedTrack` / `SubscribedTrack` classes in `aiomoqt.track`. See `*_bench.py` examples for typical usage.

### Control Message API

Control messages support both sync and async patterns via `wait_response`:

```python
# Blocking — awaits and returns the response message
resp = await session.subscribe('ns', 'track', wait_response=True)

# Non-blocking — returns request, response arrives via handler
req = await session.subscribe('ns', 'track')
```

## Examples

### Publisher / Subscriber

```bash
# Publish (SubgroupHeader streams)
python -m aiomoqt.examples.pub_example -h relay.ex.com -q

# Publish (ObjectDatagrams)
python -m aiomoqt.examples.pub_example -h relay.ex.com -q --datagram

# Subscribe
python -m aiomoqt.examples.sub_example -h relay.ex.com -q

# Subscribe + FETCH (join mid-stream)
python -m aiomoqt.examples.join_example -h relay.ex.com -q
```

Common options: `--namespace`, `--trackname`, `--path`, `--debug`, `--keylogfile`. Every tool prints its full option set with `-?` / `--help`.

### Benchmarks

Bench tools take a positional relay URL — `moqt://host[:port]` for raw QUIC, `https://host[:port]/[endpoint]` for H3/WebTransport — except `loopback_bench`, which needs no relay at all:

```bash
# Local loopback (canonical stack benchmark, no relay)
python -m aiomoqt.examples.loopback_bench -s 4096 -P 4 -t 20

# Publisher / subscriber through a relay
python -m aiomoqt.examples.pub_bench moqt://relay.ex.com -s 4096 -P 4 -r 120 -t 60
python -m aiomoqt.examples.sub_bench moqt://relay.ex.com
```

The full tool matrix (two-process, fanout, adaptive ramp), all options, latency methodology (paced vs unpaced, TX budgets), and observed numbers live in [PERFORMANCE.md](PERFORMANCE.md).

### Interop Testing

```bash
# All tests (draft-14, auto-detected)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433"

# All tests (draft-16)
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" --draft 16

# Single test case
python -m aiomoqt.examples.moq_interop_client -r "moqt://relay.ex.com:4433" -t subscribe-error

# List test cases
python -m aiomoqt.examples.moq_interop_client -l
```

### Relay Probe

Batch liveness + draft-version check. Reads a relay list, does a
real CLIENT_SETUP / SERVER_SETUP handshake per (endpoint × draft) —
no bare-ALPN tricks — and writes a JSON status report.

Accepts CLI flags, environment variables, or both (CLI overrides env):

```bash
# CLI form (typical interactive use)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json --once

# Env form (typical container/daemon deployment)
RELAYS_FILE=relays.json OUTPUT_FILE=status.json PROBE_ONCE=1 \
  python -m aiomoqt.examples.relay_probe

# Long-running monitor (re-probe every --interval seconds)
python -m aiomoqt.examples.relay_probe -f relays.json -o status.json
```

| CLI flag | Env var | Default | Meaning |
|----------|---------|---------|---------|
| `-f / --relays-file` | `RELAYS_FILE` | `/app/relays.json` | input relay list |
| `-o / --output-file` | `OUTPUT_FILE` | `/output/relay-status.json` | status report destination |
| `--timeout` | `PROBE_TIMEOUT` | `8` | per-probe handshake timeout (s) |
| `--interval` | `PROBE_INTERVAL` | `300` | re-probe cadence in monitor mode (s) |
| `--once` | `PROBE_ONCE=1` | unset | probe once and exit |

### WebTransport Server

```bash
python -m aiomoqt.examples.server_example \
    --certificate cert.pem --private-key key.pem --port 443
```

### Example Reference

| Example | Description |
|---------|-------------|
| `pub_example.py` | Publisher — SubgroupHeader streams or ObjectDatagrams |
| `sub_example.py` | Subscriber — receives data from a relay |
| `join_example.py` | SUBSCRIBE + FETCH (join mid-stream) |
| `pub_bench.py` | Publisher benchmark, configurable parameters |
| `sub_bench.py` | Subscriber with latency/jitter/loss stats |
| `relay_bench.py` | Combined pub/sub in one process |
| `multi_sub_bench.py` | 1 publisher, N subscribers in one process |
| `loopback_bench.py` | Local loopback (no relay) |
| `adaptive_bench.py` | Ramps rate until buffer growth; loopback (`--mp-loopback` for proc-isolated pub/sub) or relay |
| `server_example.py` | WebTransport server (origin) |
| `relay_probe.py` | Relay version probe (draft-14/16) |
| `moq_interop_client.py` | Interop test client (TAP v14 out; 6 standard + `fetch`/`join`) |

## Interop

Validated against live public relays — OpenMoQ moqx, Meta moxygen,
Cloudflare moq-rs, Quicr libquicr, Meetecho imquic, Red5 Pro, OzU
moqtail — across draft-14/draft-16 and both transports, using the
[moq-interop-runner](https://github.com/englishm/moq-interop-runner)
cases plus a multi-subscriber pub-sub bench. The full point-in-time
matrix lives in [PERFORMANCE.md](PERFORMANCE.md#interop-matrix-point-in-time);
the relay catalog with per-endpoint notes is
[`tests/relays.json`](tests/relays.json).

## Performance

`aiomoqt` sits on [`aiopquic`](https://github.com/gmarzot/aiopquic),
which sits on picoquic + the kernel UDP path; throughput at this layer
is bounded by the layers below. Observed on commodity hardware over
loopback: multi-Gbps sustained throughput at ~4 KiB objects with
bounded memory under stream churn, and sub-millisecond to
low-millisecond latency when paced below saturation. Numbers vary
substantially by platform — methodology, observed figures, the
paced-vs-unpaced distinction, and TX budget tuning are in
[PERFORMANCE.md](PERFORMANCE.md).

## Development

```bash
git clone https://github.com/gmarzot/aiomoqt.git
cd aiomoqt
python3 -m venv .venv && source .venv/bin/activate
uv pip install -e ".[test]"    # or: pip install -e ".[test]"

# Self-signed cert for the loopback server (loopback_bench, pub_server,
# and the test_loopback_* suites; skipped if certs/ is absent). One time:
mkdir -p certs && openssl req -x509 -newkey rsa:2048 -nodes -days 3650 \
  -keyout certs/key.pem -out certs/cert.pem -subj "/CN=localhost" \
  -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"

pytest aiomoqt/tests/
```

## Known Limitations

- **WebTransport fetch / join routing** -- four `[wt]` test variants of FETCH and JOINING_SUBSCRIBE return empty results when the underlying transport is WebTransport. Raw-QUIC variants of the same tests pass and cover the MoQT-level invariant. Tracked separately; affects WT-only consumers of fetch/join.

## TODO

* Fix WebTransport fetch / join routing (the 4 `[wt]` tests currently skipped)
* Track data modules:
  - File transfer (or [MOQT File Format](https://datatracker.ietf.org/doc/html/draft-jennings-moq-file-00)?)
  - Interactive chat
  - MSF/LOC media packaging
  - CMSF media packaging
* Simple relay implementation

## Contributing

Contributions are welcome! Please fork the repository, create a branch,
and submit a pull request. For major changes, open an issue first.

## Resources

- [MoQT Specification](https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html)
- [Media Over QUIC Working Group](https://datatracker.ietf.org/wg/moq/about/)
- [MoQ Interop Runner](https://github.com/englishm/moq-interop-runner)
- [`aiomoqt` GitHub Repository](https://github.com/gmarzot/aiomoqt)

---

## Author

Giovanni Marzot — [gmarzot@marzresearch.net](mailto:gmarzot@marzresearch.net) | [moqarean.marzresearch.net](https://moqarean.marzresearch.net)

## Acknowledgements

This project takes inspiration from, and has benefited from the great work
done by the [OpenMoQ/moxygen](https://github.com/openmoq/moxygen) team,
and the continued efforts of the MOQ IETF WG.
