Metadata-Version: 2.4
Name: streamrelay
Version: 0.1.0
Summary: WebSocket relay for real-time token streaming from batch HPC executors
Project-URL: Homepage, https://github.com/uicacer/streamrelay
Project-URL: Repository, https://github.com/uicacer/streamrelay
Project-URL: Bug Tracker, https://github.com/uicacer/streamrelay/issues
Author-email: Anas Nassar <nassar@uic.edu>
License:                                  Apache License
                                   Version 2.0, January 2004
                                http://www.apache.org/licenses/
        
           TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
        
           1. Definitions.
        
              "License" shall mean the terms and conditions for use, reproduction,
              and distribution as defined by Sections 1 through 9 of this document.
        
              "Licensor" shall mean the copyright owner or entity authorized by
              the copyright owner that is granting the License.
        
              "Legal Entity" shall mean the union of the acting entity and all
              other entities that control, are controlled by, or are under common
              control with that entity. For the purposes of this definition,
              "control" means (i) the power, direct or indirect, to cause the
              direction or management of such entity, whether by contract or
              otherwise, or (ii) ownership of fifty percent (50%) or more of the
              outstanding shares, or (iii) beneficial ownership of such entity.
        
              "You" (or "Your") shall mean an individual or Legal Entity
              exercising permissions granted by this License.
        
              "Source" form shall mean the preferred form for making modifications,
              including but not limited to software source code, documentation
              source, and configuration files.
        
              "Object" form shall mean any form resulting from mechanical
              transformation or translation of a Source form, including but
              not limited to compiled object code, generated documentation,
              and conversions to other media types.
        
              "Work" shall mean the work of authorship made available under
              the License, as indicated by a copyright notice that is included in
              or attached to the work (an example is provided in the Appendix below).
        
              "Derivative Works" shall mean any work, whether in Source or Object
              form, that is based on (or derived from) the Work and for which the
              editorial revisions, annotations, elaborations, or other modifications
              represent, as a whole, an original work of authorship. For the purposes
              of this License, Derivative Works shall not include works that remain
              separable from, or merely link (or bind by name) to the interfaces of,
              the Work and Derivative Works thereof.
        
              "Contribution" shall mean, as defined by Section 1(a), any work of
              authorship submitted to the Licensor for inclusion in the Work by the
              copyright owner or by an individual or Legal Entity authorized to
              submit on behalf of the copyright owner.
        
              "Contributor" shall mean Licensor and any Legal Entity on behalf of
              whom a Contribution has been received by the Licensor and included
              within the Work.
        
           2. Grant of Copyright License. Subject to the terms and conditions of
              this License, each Contributor hereby grants to You a perpetual,
              worldwide, non-exclusive, no-charge, royalty-free, irrevocable
              copyright license to reproduce, prepare Derivative Works of,
              publicly display, publicly perform, sublicense, and distribute the
              Work and such Derivative Works in Source or Object form.
        
           3. Grant of Patent License. Subject to the terms and conditions of
              this License, each Contributor hereby grants to You a perpetual,
              worldwide, non-exclusive, no-charge, royalty-free, irrevocable
              (except as stated in this section) patent license to make, have made,
              use, offer to sell, sell, import, and otherwise transfer the Work,
              where such license applies only to those patent claims licensable
              by such Contributor that are necessarily infringed by their
              Contribution(s) alone or by the combinations of their Contribution(s)
              with the Work to which such Contribution(s) was submitted. If You
              institute patent proceedings against any entity (including a
              cross-claim or counterclaim in a lawsuit) alleging that the Work
              or a Contribution incorporated within the Work constitutes direct
              or contributory patent infringement, then any patent licenses
              granted to You under this License for that Work shall terminate
              as of the date such litigation is filed.
        
           4. Redistribution. You may reproduce and distribute copies of the
              Work or Derivative Works thereof in any medium, with or without
              modifications, and in Source or Object form, provided that You
              meet the following conditions:
        
              (a) You must give any other recipients of the Work or Derivative
                  Works a copy of this License; and
        
              (b) You must cause any modified files to carry prominent notices
                  stating that You changed the files; and
        
              (c) You must retain, in the Source form of any Derivative Works
                  that You distribute, all copyright, patent, trademark, and
                  attribution notices from the Source form of the Work,
                  excluding those notices that do not pertain to any part of
                  the Derivative Works; and
        
              (d) If the Work includes a "NOTICE" text file as part of its
                  distribution, You must include a readable copy of the
                  attribution notices contained within such NOTICE file, in
                  at least one of the following places: within a NOTICE text
                  file distributed as part of the Derivative Works; within
                  the Source form or documentation, if provided along with the
                  Derivative Works; or, within a display generated by the
                  Derivative Works, if and wherever such third-party notices
                  normally appear. The contents of the NOTICE file are for
                  informational purposes only and do not modify the License.
        
           5. Submission of Contributions. Unless You explicitly state otherwise,
              any Contribution intentionally submitted for inclusion in the Work
              by You to the Licensor shall be under the terms and conditions of
              this License, without any additional terms or conditions.
        
           6. Trademarks. This License does not grant permission to use the trade
              names, trademarks, service marks, or product names of the Licensor,
              except as required for reasonable and customary use in describing the
              origin of the Work and reproducing the content of the NOTICE file.
        
           7. Disclaimer of Warranty. Unless required by applicable law or agreed
              to in writing, Licensor provides the Work (and each Contributor
              provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES
              OR CONDITIONS OF ANY KIND, either express or implied, including,
              without limitation, any warranties or conditions of TITLE,
              NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE.
        
           8. Limitation of Liability. In no event and under no legal theory,
              whether in tort (including negligence), contract, or otherwise,
              unless required by applicable law (such as deliberate and grossly
              negligent acts) or agreed to in writing, shall any Contributor be
              liable to You for damages, including any direct, indirect, special,
              incidental, or exemplary damages of any kind arising as a result of
              this License or out of the use or inability to use the Work.
        
           9. Accepting Warranty or Liability. While redistributing the Work or
              Derivative Works thereof, You may choose to offer, and charge a fee
              for, acceptance of support, warranty, indemnity, or other liability
              obligations and/or rights consistent to the Law. However, in accepting
              such obligations, You may offer such obligations only on Your own behalf
              and on Your sole responsibility, not on behalf of any other Contributor.
        
           END OF TERMS AND CONDITIONS
        
           Copyright 2026 Anas Nassar, University of Illinois Chicago
        
           Licensed under the Apache License, Version 2.0 (the "License");
           you may not use this file except in compliance with the License.
           You may obtain a copy of the License at
        
               http://www.apache.org/licenses/LICENSE-2.0
        
           Unless required by applicable law or agreed to in writing, software
           distributed under the License is distributed on an "AS IS" BASIS,
           WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           See the License for the specific language governing permissions and
           limitations under the License.
License-File: LICENSE
Keywords: globus-compute,hpc,llm,slurm,streaming,websocket
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.11
Requires-Dist: cryptography>=42.0
Requires-Dist: websockets>=13.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff; extra == 'dev'
Provides-Extra: globus
Requires-Dist: globus-compute-sdk>=2.0; extra == 'globus'
Description-Content-Type: text/markdown

# streamrelay

**Real-time token streaming from batch HPC executors via WebSocket relay.**

[![PyPI](https://img.shields.io/pypi/v/streamrelay)](https://pypi.org/project/streamrelay/)
[![License](https://img.shields.io/badge/license-Apache%202.0-blue)](LICENSE)
[![Tests](https://github.com/uicacer/streamrelay/actions/workflows/tests.yml/badge.svg)](https://github.com/uicacer/streamrelay/actions)
[![JOSS](https://joss.theoj.org/papers/TODO/status.svg)](https://joss.theoj.org/papers/TODO)

**New here? Start with the [full tutorial](docs/tutorial.md)** — deploy the relay,
write a producer on your HPC node, consume tokens in your app, add encryption.
All in one place.

---

## The problem

HPC batch systems execute jobs to completion and return a single result. When that
job is an LLM inference request, the user stares at a blank screen for the full
generation time — often 15–20 seconds — before seeing any output.

**streamrelay solves this with a dual-channel architecture:**

- **Control plane** (unchanged): your existing execution framework — a SLURM or PBS
  job script, an SSH command, a Globus Compute function call — handles job
  submission and authentication exactly as before.
- **Data plane** (streamrelay): a lightweight WebSocket relay through which the
  compute node streams tokens back in real time as the GPU generates them.

Both the compute node (producer) and your application (consumer) connect
**outbound** to the relay. Neither side accepts an inbound connection — no firewall
exceptions, no VPN, no tunnels required.

```
Your application              Relay server              HPC compute node
────────────────              ────────────              ────────────────
1. Submit job via         2. Both connect               3. Job starts
   SLURM / PBS /             outbound here              4. Tokens stream
   Globus Compute /          (this is streamrelay)         to relay →
   SSH / anything    ◄─────────────────────────────────────────────────
5. Tokens arrive,
   first one in < 1s
```

Measured in the [STREAM system](https://github.com/uicacer/stream) at UIC:
**0.85 s** median time-to-first-token from HPC with streaming, vs. **15.68 s**
in batch mode.

---

## Installation

```bash
pip install streamrelay
```

Optional — Globus Compute integration (adds `StreamingExecutor`):

```bash
pip install "streamrelay[globus]"   # quoted so shells such as zsh do not expand the brackets
```

Requires Python ≥ 3.11.

---

## Quick start

### 1. Start the relay server

Run this on any machine with a public IP — a small cloud VM, a campus server, or
your laptop with a Cloudflare tunnel for development:

```bash
streamrelay --host 0.0.0.0 --port 8765 --secret my-shared-secret
```

Development shortcut (no public server needed):

```bash
streamrelay --port 8765 &
cloudflared tunnel --url http://localhost:8765   # gives you a public wss:// URL
```

### 2. On the HPC compute node — producer

Inside your job script or remote function, send tokens as your model generates them:

```python
from streamrelay import RelayProducer

# relay_url and channel_id are passed in as job arguments or env vars
with RelayProducer(relay_url, channel_id, relay_secret="my-shared-secret") as relay:
    for token in your_model.stream(prompt):
        relay.send_token(token)
# done signal is sent automatically on exit
```

This works in any execution context: a SLURM batch script, a PBS job, a plain SSH
command, a Globus Compute function, or any subprocess.

### 3. On your application — consumer

```python
import uuid
from streamrelay import RelayConsumer

channel_id = str(uuid.uuid4())   # generate before submitting the job

# submit the job here — SLURM, PBS, Globus Compute, SSH — your choice
# pass relay_url and channel_id to the job as arguments or env vars

for token in RelayConsumer(relay_url, channel_id, relay_secret="my-shared-secret").stream():
    print(token, end="", flush=True)
```

Async version for FastAPI or other asyncio applications:

```python
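async def sse_events(relay_url, channel_id):   # async generator, e.g. for an SSE response
    async for token in RelayConsumer(relay_url, channel_id, relay_secret="my-shared-secret"):
        yield f"data: {token}\n\n"   # forward as Server-Sent Events to a browser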
```

---

## Example: SLURM job

**Submitter (your laptop or login node):**

```python
# submit.py
import subprocess, uuid
from streamrelay import RelayConsumer

relay_url = "wss://your-relay.example.com"
channel_id = str(uuid.uuid4())

subprocess.run([
    "sbatch",
    f"--export=ALL,RELAY_URL={relay_url},CHANNEL_ID={channel_id}",
    "inference_job.sh",
], check=True)   # raise immediately if sbatch rejects the submission

for token in RelayConsumer(relay_url, channel_id).stream():
    print(token, end="", flush=True)
```

**Job script (SLURM compute node):**

```bash
#!/bin/bash
#SBATCH --partition=gpu --gres=gpu:1

python - <<'EOF'
import os
from streamrelay import RelayProducer

with RelayProducer(os.environ["RELAY_URL"], os.environ["CHANNEL_ID"]) as relay:
    # your_model and prompt are placeholders for your own inference code
    for token in your_model.stream(prompt):
        relay.send_token(token)
EOF
```

---

## Globus Compute integration

[Globus Compute](https://www.globus.org/compute) is a federated function execution
service that dispatches Python functions to remote HPC endpoints (which themselves
run on SLURM or PBS clusters). Because Globus Compute returns a single result when
the function completes, it has no native mechanism for streaming incremental output.
streamrelay adds that capability.

```bash
pip install "streamrelay[globus]"
```

`StreamingExecutor` wraps any Globus Compute function and streams its output:

```python
from streamrelay import StreamingExecutor

async with StreamingExecutor(
    endpoint_id="your-globus-endpoint-uuid",
    relay_url="wss://your-relay.example.com",
    relay_secret="my-shared-secret",
) as executor:
    async for token in executor.stream(my_inference_fn, prompt="Explain quantum entanglement"):
        print(token, end="", flush=True)
```

Your remote function receives `relay_url` and `channel_id` as keyword arguments
automatically:

```python
def my_inference_fn(prompt, relay_url, channel_id, relay_secret=""):
    # all imports must be inline — Globus Compute serializes this function
    from streamrelay import RelayProducer
    with RelayProducer(relay_url, channel_id, relay_secret=relay_secret) as relay:
        for token in call_vllm_streaming(prompt):
            relay.send_token(token)
```

---

## End-to-end encryption

By default, the relay server can see the token payloads it forwards. For sensitive
workloads (medical, financial, or personal data), enable AES-256-GCM end-to-end
encryption. The relay then forwards opaque ciphertext and cannot read the content.

Generate a key once and store it in your `.env`:

```bash
python -c "from streamrelay import generate_key; print(generate_key())"
```

Pass the same key to both producer and consumer:

```python
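import os

KEY = os.getenv("RELAY_ENCRYPTION_KEY")   # the key generated above, read from the environment

# Producer (HPC node)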
with RelayProducer(relay_url, channel_id, encryption_key=KEY) as relay:
    relay.send_token(token)

# Consumer (your application)
for token in RelayConsumer(relay_url, channel_id, encryption_key=KEY).stream():
    print(token, end="", flush=True)
```

Each message uses a fresh random 12-byte nonce. The GCM authentication tag detects
any tampering in transit.

---

## Security model

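`streamrelay` supports three independent security layers. TLS, the shared secret,
and payload encryption are each opt-in; UUID channel isolation is always on.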

### Layer 1 — Transport encryption (TLS)

Deploy the relay behind a TLS-terminating reverse proxy (Caddy, nginx) so all
connections use `wss://` (WebSocket over TLS). This encrypts traffic between each
client and the relay server. See [docs/deployment.md](docs/deployment.md) for a
Caddy setup with auto-provisioned Let's Encrypt certificates.

### Layer 2 — Access control (shared secret)

Start the relay with `--secret MY_SECRET`. Every producer and consumer must supply
the same value as a query parameter (`?secret=MY_SECRET`). Connections without the
correct secret are rejected at the WebSocket handshake before any channel state is
created.

```bash
# Server
streamrelay --port 8765 --secret MY_SECRET
```

```python
# Producer (HPC node): same secret
with RelayProducer(relay_url, channel_id, relay_secret="MY_SECRET") as relay: ...

# Consumer (your app): same secret
RelayConsumer(relay_url, channel_id, relay_secret="MY_SECRET").stream()
```
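How the clients build the connection URL is internal to streamrelay, so the following is only a sketch of the idea: attach the secret as a URL-encoded `secret` query parameter and compare it on the server side in constant time. The helper names `with_secret` and `secret_matches` are hypothetical, and the constant-time comparison is a suggested practice, not a documented streamrelay behavior.

```python
import hmac
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit


def with_secret(url: str, secret: str) -> str:
    """Return url with a secret=... query parameter appended (hypothetical helper)."""
    parts = urlsplit(url)
    query = urlencode({"secret": secret})  # escapes characters such as '&' or ' '
    if parts.query:
        query = f"{parts.query}&{query}"
    return urlunsplit(parts._replace(query=query))


def secret_matches(url: str, expected: str) -> bool:
    """Compare the supplied secret in constant time (hypothetical helper)."""
    supplied = parse_qs(urlsplit(url).query).get("secret", [""])[0]
    return hmac.compare_digest(supplied.encode(), expected.encode())


url = with_secret("wss://relay.example.com", "MY SECRET&more")
print(url)
print(secret_matches(url, "MY SECRET&more"))
```

URL-encoding matters when the secret contains characters that are meaningful in a query string; without it, the server would see a truncated or altered value.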

**How to share the secret with the HPC node:** pass it as a job argument, an
environment variable in your SLURM/PBS script, or as a keyword argument to your
Globus Compute function. It does not need to be embedded in code:

```bash
# SLURM: pass via --export
sbatch --export=ALL,RELAY_URL=wss://...,RELAY_SECRET=MY_SECRET job.sh
```

```python
# Globus Compute: inject as a kwarg
executor.submit(my_fn, relay_url=relay_url, relay_secret=MY_SECRET, ...)
```

In addition to the shared secret, each request uses a **unique UUID channel ID**
(122 bits of entropy). Even if an attacker knows the relay address, guessing a valid
channel ID is computationally infeasible. The relay holds no persistent state —
all channel state is discarded once both sides disconnect. No OAuth2 credentials or
user identity information traverse the relay at any point.
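The 122-bit figure follows from the UUID version 4 layout: of the 128 bits, 4 are fixed for the version and 2 for the variant. A quick check using only the standard library (the variable names are illustrative):

```python
import uuid

channel_id = uuid.uuid4()

# A version-4 UUID fixes 4 version bits and 2 variant bits; the rest are random.
random_bits = 128 - 4 - 2
assert channel_id.version == 4
assert channel_id.variant == uuid.RFC_4122

print(random_bits)                 # 122
print(f"{2 ** random_bits:.2e}")   # 5.32e+36 possible channel IDs
```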

### Layer 3 — End-to-end payload encryption (AES-256-GCM)

TLS protects the link to the relay, but the relay operator can still see plaintext
token payloads. For sensitive workloads (medical, financial, or personal data),
enable AES-256-GCM end-to-end encryption. The relay then forwards opaque ciphertext
and cannot read the content.

**Generate a key once** and store it securely (e.g., in your `.env`):

```bash
python -c "from streamrelay import generate_key; print(generate_key())"
# Outputs a base64-encoded 32-byte key, e.g.: xK3mP9vQ2rL...
```

**Pass the same key to both producer and consumer:**

```python
import os

KEY = os.getenv("RELAY_ENCRYPTION_KEY")

# Producer (HPC node)
with RelayProducer(relay_url, channel_id, encryption_key=KEY) as relay:
    relay.send_token(token)

# Consumer (your app)
for token in RelayConsumer(relay_url, channel_id, encryption_key=KEY).stream():
    print(token, end="", flush=True)
```

Each message is encrypted with a **fresh random 12-byte nonce** (per NIST SP
800-38D). The GCM authentication tag detects any tampering in transit — if the relay
or any intermediary modifies a message, decryption raises an `InvalidTag` exception
rather than silently returning corrupted data. Encryption is opt-in, and both
sides must agree on it: an unencrypted consumer connecting to an encrypted producer
receives `enc` envelopes it cannot read, but no silent data corruption occurs.

### Summary

| Layer | Mechanism | Protects against | How to enable |
|-------|-----------|-----------------|---------------|
| TLS (`wss://`) | Reverse proxy (Caddy) | Network eavesdropping | Deploy behind Caddy/nginx |
| Shared secret | WebSocket handshake | Unauthorized connections | `--secret` flag on server |
| AES-256-GCM | Per-message encryption | Relay operator reading payloads | `encryption_key=` on producer + consumer |
| UUID channel isolation | 122-bit random ID | Channel collision / guessing | Always on |

See [docs/deployment.md](docs/deployment.md) for a production deployment guide
(cloud VM + Caddy + systemd).

---

## Relay protocol

All messages are JSON strings. The relay forwards them without interpretation:

```
{"type": "token",  "content": "Hello"}        ← one text chunk
{"type": "done",   "usage": {...}}             ← generation complete
{"type": "error",  "message": "..."}           ← something went wrong
```
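To make the three message types concrete, here is a minimal sketch of how a consumer-side loop could interpret them. It is not streamrelay's implementation: `RelayError` and `iter_tokens` are hypothetical names, and raising on `error` messages is an assumption about sensible behavior.

```python
import json
from typing import Iterable, Iterator


class RelayError(RuntimeError):
    """Raised when the producer reports an error (hypothetical name)."""


def iter_tokens(raw_messages: Iterable[str]) -> Iterator[str]:
    """Yield token contents until a done message arrives."""
    for raw in raw_messages:
        msg = json.loads(raw)
        kind = msg.get("type")
        if kind == "token":
            yield msg["content"]
        elif kind == "done":
            return
        elif kind == "error":
            raise RelayError(msg.get("message", "unknown error"))
        # Unknown message types are ignored in this sketch.


frames = [
    '{"type": "token", "content": "Hello"}',
    '{"type": "token", "content": " world"}',
    '{"type": "done", "usage": {"total_tokens": 2}}',
]
print("".join(iter_tokens(frames)))  # Hello world
```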

When encryption is enabled, each message is wrapped before transmission:

```
{"type": "enc", "d": "<base64(nonce + ciphertext + GCM tag)>"}
```
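The envelope above can be reproduced with the `cryptography` package that streamrelay already depends on. The sketch below is an illustration, not streamrelay's code: `wrap` and `unwrap` are hypothetical names, standard (not URL-safe) base64 is an assumption, and the key is generated locally instead of with `generate_key()`.

```python
import base64
import json
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


def wrap(key: bytes, message: dict) -> str:
    """Encrypt one protocol message into an "enc" envelope (hypothetical helper)."""
    nonce = os.urandom(12)  # fresh 12-byte nonce per message
    # AESGCM.encrypt returns the ciphertext with the 16-byte GCM tag appended.
    sealed = AESGCM(key).encrypt(nonce, json.dumps(message).encode(), None)
    return json.dumps({"type": "enc", "d": base64.b64encode(nonce + sealed).decode()})


def unwrap(key: bytes, raw: str) -> dict:
    """Decrypt an "enc" envelope back into the original message (hypothetical helper)."""
    blob = base64.b64decode(json.loads(raw)["d"])
    nonce, sealed = blob[:12], blob[12:]
    # Raises InvalidTag if the key is wrong or the payload was modified.
    return json.loads(AESGCM(key).decrypt(nonce, sealed, None))


key = AESGCM.generate_key(bit_length=256)  # 32 random bytes
envelope = wrap(key, {"type": "token", "content": "Hello"})
print(unwrap(key, envelope))  # {'type': 'token', 'content': 'Hello'}

try:
    unwrap(AESGCM.generate_key(bit_length=256), envelope)
except InvalidTag:
    print("wrong key rejected")
```

Because the nonce travels inside the envelope, the receiver needs only the shared key to decrypt each message independently.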

---

## API reference

### `RelayProducer`

Runs on the HPC compute node. Connects outbound to the relay and sends tokens.

```python
from streamrelay import RelayProducer

# Synchronous — use inside SLURM jobs, PBS scripts, Globus Compute functions
with RelayProducer(
    relay_url,          # str: "wss://relay.example.com" or "ws://localhost:8765"
    channel_id,         # str: uuid.uuid4() generated before submitting the job
    relay_secret="",    # str: must match --secret on the relay server
    encryption_key="",  # str: base64 AES-256 key from generate_key(); "" = no encryption
) as relay:
    relay.send_token("Hello")            # send one text chunk
    relay.send_token(" world")
    # send_done() called automatically when the with block exits normally
    # send_error() called automatically if an exception is raised inside the block

# Asynchronous — use when your code already runs in an asyncio event loop
async with RelayProducer(relay_url, channel_id) as relay:
    await relay._async_send_raw({"type": "token", "content": "Hello"})
```

**Explicit methods (when not using the context manager):**

```python
p = RelayProducer(relay_url, channel_id)
p.connect()                              # open the synchronous WebSocket
p.send_token("chunk")                   # send a token
p.send_done(usage={"total_tokens": 50}) # signal completion with optional usage stats
p.send_error("something broke")         # report an error (also sends done)
p.close()                               # close the connection
```

---

### `RelayConsumer`

Runs on your application side. Connects outbound to the relay and yields tokens.

```python
from streamrelay import RelayConsumer

consumer = RelayConsumer(
    relay_url,          # str: same relay URL as the producer
    channel_id,         # str: same channel_id passed to RelayProducer
    relay_secret="",    # str: same secret as the producer
    encryption_key="",  # str: same encryption key as the producer
)

# --- Synchronous iteration (CLI scripts, Jupyter notebooks) ---
for token in consumer.stream():
    print(token, end="", flush=True)

# --- Asynchronous iteration (FastAPI, aiohttp, any asyncio application) ---
async for token in consumer:            # uses __aiter__ → astream()
    yield f"data: {token}\n\n"         # forward as Server-Sent Events

# --- Collect the full response as a single string ---
text = consumer.collect()               # blocking
text = await consumer.acollect()        # async
```

**Connect the consumer before (or at the same time as) submitting the HPC job.**
Any tokens that arrive before you connect are buffered by the relay (default 1,000
messages) and flushed when you connect — you will not miss the beginning of the response.
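The buffering behavior described above can be pictured with a small in-memory model. This is a sketch of the idea only, not the relay's actual code: `ChannelBuffer` and its methods are hypothetical, and what the real relay does when the buffer is full is not specified here, so this sketch simply raises an error.

```python
from collections import deque
from typing import Callable, Optional


class ChannelBuffer:
    """Toy model of one relay channel (hypothetical, for illustration only)."""

    def __init__(self, max_buffer: int = 1000) -> None:
        self.max_buffer = max_buffer
        self.pending: deque = deque()
        self.deliver: Optional[Callable[[str], None]] = None

    def on_producer_message(self, message: str) -> None:
        if self.deliver is not None:
            self.deliver(message)          # consumer connected: forward immediately
        elif len(self.pending) < self.max_buffer:
            self.pending.append(message)   # no consumer yet: hold in FIFO order
        else:
            # Assumption: overflow handling in the real relay may differ.
            raise OverflowError("channel buffer is full")

    def on_consumer_connect(self, deliver: Callable[[str], None]) -> None:
        while self.pending:                # flush the backlog, oldest first
            deliver(self.pending.popleft())
        self.deliver = deliver


received = []
channel = ChannelBuffer(max_buffer=3)
channel.on_producer_message("Hel")         # arrives before the consumer
channel.on_producer_message("lo")
channel.on_consumer_connect(received.append)
channel.on_producer_message("!")           # forwarded directly
print(received)  # ['Hel', 'lo', '!']
```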

---

### `start_relay` / `streamrelay` CLI

Start the relay server — run this once on any machine with a public IP.

```bash
# CLI
streamrelay --host 0.0.0.0 --port 8765 --secret MY_SECRET

# All options:
streamrelay --help
#   --host HOST            bind address (default: 0.0.0.0)
#   --port PORT            port to listen on (default: 8765)
#   --secret SECRET        shared auth secret; also reads RELAY_SECRET env var
#   --max-buffer N         max buffered messages per channel (default: 1000)
#   --channel-timeout N    seconds before abandoned channels are reaped (default: 300)
#   --log-level LEVEL      DEBUG / INFO / WARNING / ERROR (default: INFO)
```

```python
# Python API: run the relay from your own code. asyncio.run() starts a new event
# loop; inside an already-running loop, await start_relay(...) instead.
import asyncio
from streamrelay import start_relay

asyncio.run(start_relay(
    host="0.0.0.0",
    port=8765,
    secret="MY_SECRET",
    max_buffer=1000,
    channel_timeout=300,
))
```

**Health check** — the relay exposes `/health` (no auth required):

```python
import asyncio, websockets, json

async def check(relay_url):
    async with websockets.connect(f"{relay_url}/health") as ws:
        status = json.loads(await ws.recv())
        print(status)  # {"status": "healthy", "active_channels": 0, "timestamp": "..."}

asyncio.run(check("wss://relay.example.com"))
```

---

### `generate_key`

```python
from streamrelay import generate_key

key = generate_key()          # base64-encoded 32-byte AES-256 key
print(key)                    # e.g. "xK3mP9vQ2rL8nJ6w..."
# Store in .env as RELAY_ENCRYPTION_KEY=<key>
# Pass the same key to both RelayProducer and RelayConsumer
```

Or from the shell:
```bash
python -c "from streamrelay import generate_key; print(generate_key())"
```

---

### `StreamingExecutor` (Globus Compute)

High-level wrapper for Globus Compute users. Handles channel ID generation,
function submission with relay coordinates injected, and relay consumption.

```python
from streamrelay import StreamingExecutor

async with StreamingExecutor(
    endpoint_id="your-globus-endpoint-uuid",
    relay_url="wss://relay.example.com",
    relay_secret="MY_SECRET",
    encryption_key="",          # optional AES-256 key
    consumer_timeout=300.0,     # seconds to wait for first token
) as executor:
    async for token in executor.stream(my_inference_fn, prompt="Hello"):
        print(token, end="", flush=True)
```

Your remote function automatically receives `relay_url`, `channel_id`, and
optionally `relay_secret` / `encryption_key` as extra kwargs:

```python
def my_inference_fn(prompt, relay_url, channel_id, relay_secret="", encryption_key=""):
    # All imports must be inline — Globus Compute serializes only the function body
    from streamrelay import RelayProducer        # if streamrelay is installed on the endpoint
    with RelayProducer(relay_url, channel_id, relay_secret=relay_secret,
                       encryption_key=encryption_key) as relay:
        for token in call_vllm_streaming(prompt):
            relay.send_token(token)
    return "ok"
```

If `streamrelay` is not installed on the endpoint workers, use the **inline producer
pattern** from [docs/tutorial.md](docs/tutorial.md) (Pattern B / Pattern C) — it requires
only `websockets` and `cryptography`, which are available on most HPC environments.

---

## Troubleshooting

**Consumer hangs and never receives any tokens**

1. Check the relay is reachable from both sides: `ws://your-relay:8765/health`
2. Check the `channel_id` matches exactly between producer and consumer — a mismatch
   means they connect to different channels and never find each other
3. Check the `relay_secret` matches — a wrong secret is rejected at handshake with
   WebSocket close code 4003; catch with `"4003" in str(e)`
4. Check the producer actually ran — if the Globus job failed before connecting,
   the consumer waits until the channel timeout (default 5 minutes)
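The string check from step 3 can be wrapped in a small helper so that authentication failures are reported separately from other connection problems. `is_auth_rejection` is a hypothetical name, and the exception type raised by your client may vary, so this sketch inspects only the message text, as the step suggests.

```python
def is_auth_rejection(exc: BaseException) -> bool:
    """Return True if the exception text mentions close code 4003."""
    return "4003" in str(exc)


# Simulated failures; in practice these would come from RelayConsumer / RelayProducer calls.
for exc in (ConnectionError("received 4003 (private use)"), TimeoutError("timed out")):
    if is_auth_rejection(exc):
        print("relay_secret rejected: check that it matches --secret on the server")
    else:
        print(f"other connection problem: {exc}")
```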

**`ConnectionRefusedError` or `ConnectionClosedError`**

- Relay server is not running, or the URL/port is wrong
- For `wss://` connections: the TLS certificate must be valid (use Caddy or Let's Encrypt)
- For development: use `ws://` (unencrypted) with a local relay + Cloudflare tunnel for
  the public URL

**`InvalidTag` when decrypting**

- The `encryption_key` does not match between producer and consumer — generate once and
  store in both environments: `python -c "from streamrelay import generate_key; print(generate_key())"`

**Tokens arrive out of order**

- The relay forwards messages in arrival order — this should not happen
- If using the buffering path (producer connects first), messages are flushed in FIFO order

**`streamrelay` command not found after `pip install`**

- The `streamrelay` CLI is installed into your Python environment's bin directory
- Activate your virtual environment first, or use `python -m streamrelay.server`

**`ModuleNotFoundError: No module named 'streamrelay'` on the HPC node**

- The HPC endpoint workers may not have `streamrelay` installed
- Use the **inline producer pattern** (no install needed): see Pattern B in
  [docs/tutorial.md](docs/tutorial.md)

---

## Documentation

| Guide | What it covers |
|-------|---------------|
| [docs/tutorial.md](docs/tutorial.md) | **Start here.** Zero-to-streaming walkthrough: deploy relay, three producer patterns (pip install / inline / Globus Compute exec), consumer patterns, passing credentials to HPC jobs, E2E encryption, production checklist |
| [docs/deployment.md](docs/deployment.md) | Relay server deployment: Cloudflare tunnel, VM + Caddy + systemd, Docker Compose, health monitoring |
| [CONTRIBUTING.md](CONTRIBUTING.md) | Testing at three levels: unit tests, local end-to-end via Cloudflare, live relay test script |

---

## Citation

If you use streamrelay in your research, please cite the JOSS paper:

```bibtex
@article{nassar2026streamrelay,
  title   = {{streamrelay}: A {WebSocket} Relay for Real-Time Token Streaming
             from Batch {HPC} Executors},
  author  = {Nassar, Anas and Mohr, Steve and Apanasevich, Leonard and Sharma, Himanshu},
  journal = {Journal of Open Source Software},
  year    = {2026},
  doi     = {10.21105/joss.TODO},
}
```

streamrelay was developed as part of the STREAM system:

```bibtex
@inproceedings{nassar2026stream,
  title     = {{STREAM}: Smart Tiered Routing Engine for {AI} Models},
  author    = {Nassar, Anas and Mohr, Steve and Apanasevich, Leonard and Sharma, Himanshu},
  booktitle = {Proceedings of PEARC '26},
  year      = {2026},
}
```

---

## License

Apache 2.0 — see [LICENSE](LICENSE).
