Metadata-Version: 2.4
Name: gen-worker
Version: 0.4.4
Summary: A library used to build custom functions in Cozy Creator's serverless function platform.
Project-URL: Homepage, https://github.com/cozy-creator/python-gen-worker
Project-URL: Repository, https://github.com/cozy-creator/python-gen-worker
Project-URL: Issues, https://github.com/cozy-creator/python-gen-worker/issues
Author-email: Paul Fidika <paul@fidika.com>
License-Expression: MIT
License-File: LICENSE
Keywords: ai,cozy,inference,ml,serverless
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.11
Requires-Dist: aiohttp>=3.11.14
Requires-Dist: backoff>=2.2.1
Requires-Dist: blake3>=1.0.0
Requires-Dist: grpcio>=1.71.0
Requires-Dist: huggingface-hub>=0.26.0
Requires-Dist: msgspec>=0.18.6
Requires-Dist: numpy>=1.24.0
Requires-Dist: pillow>=9.0.0
Requires-Dist: protobuf>=6.30.0
Requires-Dist: psutil>=7.0.0
Requires-Dist: pyjwt[crypto]>=2.8.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: requests>=2.32.0
Requires-Dist: tomli-w>=1.2.0
Requires-Dist: tqdm>=4.66.0
Provides-Extra: dev
Requires-Dist: grpcio-tools>=1.71.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pytest>=9.0.0; extra == 'dev'
Requires-Dist: types-pyyaml>=6.0.12.20250915; extra == 'dev'
Requires-Dist: types-requests>=2.32.4.20250913; extra == 'dev'
Provides-Extra: torch
Requires-Dist: flashpack>=0.2.1; extra == 'torch'
Requires-Dist: safetensors>=0.7.0; extra == 'torch'
Requires-Dist: torch>=2.10.0; extra == 'torch'
Requires-Dist: torchaudio>=2.10.0; extra == 'torch'
Requires-Dist: torchvision>=0.25.0; extra == 'torch'
Provides-Extra: trainer
Requires-Dist: pyarrow>=17.0.0; extra == 'trainer'
Description-Content-Type: text/markdown

# gen-worker

A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.

## Tenant Worker Build Contract (Dockerfile-First)

When publishing a tenant worker, Cozy expects a **Dockerfile-first** project layout.

Build inputs MUST include:

- `endpoint.toml` (Cozy manifest; used at build/publish time)
- `Dockerfile` (builds the worker image)
- tenant code (`pyproject.toml`, `uv.lock`, `src/`, etc.)

The built image MUST:

1. Install `gen-worker` (so discovery + runtime can run).
2. Bake function discovery output (manifest) at build time:

```dockerfile
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock
```

3. Use the Cozy worker runtime as the ENTRYPOINT:

```dockerfile
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```

Notes:

- `endpoint.toml` is **not required** to be present in the final image; it is a build-time input.
- The platform reads `/app/.tensorhub/endpoint.lock` from the built image and stores it in Cozy Hub DB for routing/invocation.
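
Putting the contract together, a minimal conforming `Dockerfile` might look like this. This is a sketch only: it assumes a uv-managed project whose dependencies include `gen-worker`, uses the base image from the Build Base section below, and the copy paths are illustrative.

```dockerfile
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

WORKDIR /app

# Install tenant code (whose dependencies must include gen-worker).
COPY pyproject.toml uv.lock ./
COPY src/ ./src/
RUN uv sync --frozen
ENV PATH="/app/.venv/bin:$PATH"

# Bake the function-discovery manifest at build time (required).
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock

# Hand control to the Cozy worker runtime (required).
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```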

## Installation

Start a Python project, then run:

```bash
uv add gen-worker
```

With PyTorch support:

```bash
uv add "gen-worker[torch]"
```
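
If you are not using uv, the equivalent pip installs are:

```bash
pip install gen-worker
pip install "gen-worker[torch]"
```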

## Quick Start

```python
import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")
```

## Features

- **Function discovery** - Automatic detection of `@worker_function` decorated functions
- **Schema generation** - Input/output schemas extracted from msgspec types
- **Model injection** - Dependency injection for ML models with caching
- **Streaming output** - Support for incremental/streaming responses
- **Progress reporting** - Built-in progress events via `RequestContext`
- **Perf metrics** - Best-effort per-run metrics emitted to gen-orchestrator (`metrics.*` worker events)
- **Trainer runtime mode** - SDK-native trainer loop via `WORKER_MODE=trainer`
- **File handling** - Upload/download assets via Cozy hub file API
- **Model caching** - LRU cache with VRAM/disk management and cache-aware routing

## Usage

### Basic Function

```python
import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: RequestContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")
```

### Streaming Output

```python
from typing import Iterator

import msgspec
from gen_worker import RequestContext, worker_function

class Delta(msgspec.Struct):
    chunk: str

# `Input` is the struct defined in the basic example above.
@worker_function()
def stream(ctx: RequestContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        # Bail out early if the client has canceled the request.
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)
```

### Model Injection

Declare fixed model keys in code, with refs/dtypes in `endpoint.toml [models]`:

```toml
[models]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }
```

```python
from typing import Annotated

from diffusers import DiffusionPipeline
from gen_worker import RequestContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

# `Input`/`Output` are the structs from the basic example above.
@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],
    payload: Input,
) -> Output:
    # Use the injected pipeline (loaded/cached by the worker's model manager).
    return Output(result="done")
```

### Payload-Selected Model (Short Key)

If you want the client payload to choose which model to run, declare selector
keyspaces in `endpoint.toml` and use `ModelRef(PAYLOAD, ...)`:

```toml
[models.generate]
sd15 = { ref = "stable-diffusion-v1-5/stable-diffusion-v1-5", dtypes = ["fp16", "bf16"] }
flux = { ref = "black-forest-labs/flux.2-klein-4b", dtypes = ["bf16"] }
```

```python
from typing import Annotated
import msgspec
from diffusers import DiffusionPipeline
from gen_worker import RequestContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

class Input(msgspec.Struct):
    prompt: str
    model: str  # must be one of: "sd15" | "flux"

@worker_function()
def generate(
    ctx: RequestContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
    payload: Input,
):
    ...
```

Note: by default, payload model selection must use a known short key from the
function's keyspace in `endpoint.toml`; the worker will not accept arbitrary
repo refs in the payload.
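
With the keyspace above, the client selects a model by short key in the payload; for example, via the dev HTTP runner described below:

```bash
curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a lighthouse at dusk", "model": "sd15"}}'
```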

### Saving Files

```python
@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    image_bytes = render_png(payload)  # placeholder for your own image generation

    # Save bytes and get an asset reference back
    asset = ctx.save_bytes(f"jobs/{ctx.request_id}/outputs/output.png", image_bytes)
    return Output(result=asset.ref)
```

For conversion/training weight artifacts, use `Tensors`:

```python
from gen_worker import Tensors

# Assumes an Output struct with a `weights: Tensors` field.
@worker_function()
def convert(ctx: RequestContext, payload: Input) -> Output:
    local_weights = "/tmp/converted.safetensors"  # produced by your conversion step
    tensors = ctx.save_checkpoint(
        f"jobs/{ctx.request_id}/outputs/weights.safetensors",
        local_weights,
    )
    return Output(weights=tensors)
```

For large artifacts, stream bytes incrementally and finalize once:

```python
with ctx.open_checkpoint_stream(
    f"jobs/{ctx.request_id}/outputs/weights.safetensors",
    format="safetensors",
) as out:
    for chunk in generate_chunks():
        out.write(chunk)
    tensors = out.finalize()
```

### Trainer Mode (Class-Only)

```python
from gen_worker import StepContext, StepResult

class MyTrainer:
    def setup(self, ctx: StepContext) -> None:
        pass

    def configure(self, ctx: StepContext) -> dict[str, object]:
        return {"step": 0}

    def prepare_batch(self, raw_batch: object, state: dict[str, object], ctx: StepContext) -> object:
        return raw_batch

    def train_step(self, batch: object, state: dict[str, object], ctx: StepContext) -> StepResult:
        return StepResult(metrics={"train/loss": 0.123})

    def state_dict(self, state: dict[str, object]) -> dict[str, object]:
        return dict(state)

    def load_state_dict(self, state: dict[str, object], payload: dict[str, object], ctx: StepContext) -> None:
        state.update(payload)
```

Run trainer mode:

```bash
WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint
```
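
If the job JSON does not name the plugin, `TRAINER_PLUGIN` can point at the trainer class directly (`my_pkg.trainer:MyTrainer` below is an illustrative import path, not a real module):

```bash
WORKER_MODE=trainer \
TRAINER_PLUGIN='my_pkg.trainer:MyTrainer' \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint
```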

Full authoring guide: `docs/custom-trainer-authoring.md`.
Orchestrated runtime contract: `docs/issue-081-orchestrated-trainer-runtime.md`.

## Dev HTTP Runner (Local Inference Without gen-orchestrator)

For local testing of a built worker image (without standing up gen-orchestrator),
run the dev HTTP runner and write outputs to a mounted local directory.

Container example:

```bash
docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e TENSORHUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs
```

Prefetch a public model (example: SD1.5):

```bash
curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"runwayml/stable-diffusion-v1-5","dtypes":["bf16","fp16"]}]}'
```

Invoke a function:

```bash
curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'
```

Outputs are written under `/outputs/jobs/<request_id>/outputs/...` (matching Cozy ref semantics).
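
Equivalently from Python, using `requests` (already a dependency of gen-worker); the response body is whatever your function's output struct serializes to:

```python
import requests

resp = requests.post(
    "http://localhost:8081/v1/request/generate",
    json={"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}},
    timeout=300,  # generous: cold models may need to download first
)
resp.raise_for_status()
print(resp.json())
```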

## Configuration

### endpoint.toml

```toml
schema_version = 1
name = "my-worker"
main = "my_pkg.main"

[functions.generate]
batch_dimension = "items"  # optional

[models]
sdxl = { ref = "stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16", "bf16"] }

[models.generate]
dreamshaper = { ref = "lykon/dreamshaper-xl-v2-turbo", dtypes = ["fp16", "bf16"] }

[resources]
max_inflight_requests = 1
```

### Environment Variables

Orchestrator-injected (production contract):

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_MODE` | `inference` | Runtime mode selector (`inference` or `trainer`) |
| `SCHEDULER_PUBLIC_ADDR` | - | Scheduler address workers should dial |
| `SCHEDULER_ADDRS` | - | Optional comma-separated LB seed addresses |
| `WORKER_JWT` | - | Worker-connect JWT (required; claims are authoritative) |

Local dev / advanced (not injected by orchestrator):

| Variable | Default | Description |
|----------|---------|-------------|
| `SCHEDULER_JWKS_URL` | - | Optional: verify WORKER_JWT locally against scheduler JWKS |
| `SCHEDULER_JWT_ISSUER` | - | Optional: expected `iss` when verifying WORKER_JWT locally |
| `SCHEDULER_JWT_AUDIENCE` | - | Optional: expected `aud` when verifying WORKER_JWT locally |
| `USE_TLS` | `false` | Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
| `LB_ONLY_RETRIES` | `true` | Retry via configured LB endpoint(s) only; ignore direct owner redirect hints |
| `RECONNECT_DELAY` | `0.1` | Base reconnect backoff in seconds (exponential) |
| `RECONNECT_MAX_DELAY` | `1.0` | Reconnect backoff cap in seconds |
| `RECONNECT_JITTER_SECONDS` | `0.1` | Added jitter upper bound in seconds, capped by `RECONNECT_MAX_DELAY` |
| `MAX_RECONNECT_ATTEMPTS` | `0` | Max reconnect attempts (`0` = infinite retries) |
| `WORKER_MAX_CONCURRENCY` | - | Max concurrent request executions |
| `WORKER_MAX_INPUT_BYTES` | - | Max input payload size |
| `WORKER_MAX_OUTPUT_BYTES` | - | Max output payload size |
| `WORKER_MAX_UPLOAD_BYTES` | - | Max file upload size |
| `WORKER_MAX_VRAM_GB` | Auto | Maximum VRAM for models |
| `WORKER_VRAM_SAFETY_MARGIN_GB` | `3.5` | Reserved VRAM for working memory |
| `TENSORHUB_CACHE_DIR` | `~/.cache/tensorhub` | TensorHub cache root; worker CAS defaults derive from this (`${TENSORHUB_CACHE_DIR}/cas/...`) |
| `WORKER_LOCAL_MODEL_CACHE_DIR` | `/tmp/tensorhub/local-model-cache` | Optional local (non-NFS) cache for snapshot localization |
| `WORKER_REGISTER_TIMEOUT_S` | `90` | Startup watchdog: fail fast if worker never registers with scheduler |
| `WORKER_WARN_MODEL_RESOLVE_S` | `30` | Emit `request.model_resolve.stuck` warning after this duration |
| `WORKER_WARN_MODEL_LOAD_S` | `60` | Emit `request.model_load.stuck` warning after this duration |
| `WORKER_WARN_INFERENCE_S` | `60` | Emit `request.inference.stuck` warning after this duration |
| `WORKER_MAX_CONCURRENT_DOWNLOADS` | `2` | Max parallel model downloads |
| `TENSORHUB_URL` | - | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
| `WORKER_ALLOW_TENSORHUB_API_RESOLVE` | `false` | Local dev only: allow the worker to call Cozy Hub resolve APIs |
| `TENSORHUB_TOKEN` | - | Cozy Hub bearer token (optional; enables ingest-if-missing for public models, if Cozy Hub requires auth) |
| `TRAINER_JOB_SPEC_PATH` | `/app/.cozy/trainer_job.json` | Trainer-mode JSON job manifest path |
| `TRAINER_PLUGIN` | - | Trainer plugin import (`module:symbol`); optional if provided in job JSON |
| `TRAINER_CHECKPOINTS_DIR` | `/tmp/training/checkpoints` | Local checkpoint output directory in trainer mode |
| `TRAINER_SAMPLES_DIR` | `/tmp/training/samples` | Local sample output directory in trainer mode |
| `TRAINER_EVENTS_PATH` | - | Optional line-delimited JSON lifecycle event log for trainer mode |

## Metrics

The worker can emit best-effort performance/debug metrics to gen-orchestrator via `WorkerEvent` messages.

See `docs/metrics.md`.
See `docs/worker-stuck-visibility.md` for startup/request watchdog events used to diagnose stuck workers.

### Model Download Behavior

Model refs are plain lower-case strings:
- `owner/repo`
- `owner/repo:tag`
- `owner/repo@blake3:<digest>`

Tags are mutable pointers that resolve to published versions.
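
As a reading aid for the three forms (an illustrative sketch, not gen-worker's actual resolver):

```python
def parse_model_ref(ref: str) -> tuple[str, str | None, str | None]:
    """Split a ref into (repo, tag, digest); illustrative only."""
    repo, tag, digest = ref, None, None
    if "@blake3:" in repo:
        repo, digest = repo.split("@blake3:", 1)
    elif ":" in repo:
        repo, tag = repo.split(":", 1)
    return repo, tag, digest

assert parse_model_ref("owner/repo") == ("owner/repo", None, None)
assert parse_model_ref("owner/repo:tag") == ("owner/repo", "tag", None)
assert parse_model_ref("owner/repo@blake3:abc") == ("owner/repo", None, "abc")
```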

Cozy snapshot/object file downloads are written to `*.part` and then atomically renamed on success. If a `*.part` file exists from a previous interrupted download, the worker attempts to resume it using HTTP `Range` requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.

## Docker Deployment

### Project Structure

```
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py
```

### Local Dev Build (Using Root `Dockerfile`)

For production, use the `cozyctl` CLI to build and deploy worker images to our network. For local testing, you can build images using the provided root `Dockerfile`:

```bash
# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15

# Run
docker run \
  -e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  sd15-worker
```

Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):

```bash
cd ~/cozy/python-gen-worker

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.10.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/sd15
```

Optional build args:

```bash
docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/sd15
```

### Build Base

Worker images build directly from a Python+uv base image:

- `ghcr.io/astral-sh/uv:python3.12-bookworm-slim`

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.

## Publish/Promote Lifecycle

Control-plane behavior (tensorhub + orchestrator):

- Every publish creates a new immutable internal `release_id`.
- End users invoke functions by `owner/endpoint/function` (which targets the default `prod` tag) or `owner/endpoint/function:tag`.
- `endpoint` is derived from `endpoint.toml` `name` and normalized to a URL-safe slug.
- `function` names are derived from Python `@worker_function` names and normalized to URL-safe slugs (for example, `medasr_transcribe` -> `medasr-transcribe`); a sketch of this normalization follows the list.
- Publishing does not move traffic by default.
- Promoting a function tag moves traffic to that release.
- Rollback is just retargeting the tag to an older release.
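
A rough sketch of that slug normalization (hypothetical; the platform's exact rules may differ):

```python
import re

def slugify(name: str) -> str:
    """Hypothetical URL-safe normalization; the platform's exact rules may differ."""
    slug = name.strip().lower().replace("_", "-")
    slug = re.sub(r"[^a-z0-9-]+", "-", slug)
    return slug.strip("-")

assert slugify("medasr_transcribe") == "medasr-transcribe"
assert slugify("My Worker") == "my-worker"
```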

## Model Cache

Workers report model availability for intelligent job routing:

| State | Location | Latency |
|-------|----------|---------|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |

## Dev Testing (Mock Orchestrator)

For local end-to-end tests without standing up `gen-orchestrator`, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one `JobExecutionRequest`, prints the result, and exits.

Start your worker container first:

```bash
docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>
```

In another terminal, send one request:

```bash
python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'
```

Run the command again with a different payload whenever you want to send another request.

You can also exercise the worker's `ModelCache` (which backs the Hot/Warm/Cold states in the Model Cache section above) directly in unit tests:

```python
from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
pipeline = object()  # stand-in for a loaded model/pipeline
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a")  # True
cache.get_vram_models()      # ["model-a"]
```

## Error Handling

```python
from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry

    try:
        result = call_external_api()  # placeholder for your own upstream call
    except TimeoutError:
        raise RetryableError("API timeout")  # will be retried
    except Exception as exc:
        raise FatalError(f"unrecoverable: {exc}")  # fatal, not retried

    return Output(result=result)
```

## Development

```bash
# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build
```

### Regenerating Protobuf Stubs

Requires `gen-orchestrator` as a sibling repo:

```bash
uv sync --extra dev
python -m grpc_tools.protoc \
  -I../gen-orchestrator/proto \
  --python_out=src/gen_worker/pb \
  --grpc_python_out=src/gen_worker/pb \
  ../gen-orchestrator/proto/*.proto
```

### Worker Wire Protocol

The worker advertises a protocol `MAJOR.MINOR` in `WorkerRegistration` (`protocol_major`, `protocol_minor`).

- Current runtime constants live in `src/gen_worker/wire_protocol.py`.
- Orchestrator compatibility policy/ranges are documented in `../gen-orchestrator/docs/worker_wire_protocol.md`.

## License

MIT
