Metadata-Version: 2.4
Name: gen-worker
Version: 0.5.6
Summary: A library used to build custom functions in Cozy Creator's serverless function platform.
Project-URL: Homepage, https://github.com/cozy-creator/python-gen-worker
Project-URL: Repository, https://github.com/cozy-creator/python-gen-worker
Project-URL: Issues, https://github.com/cozy-creator/python-gen-worker/issues
Author-email: Paul Fidika <paul@fidika.com>
License-Expression: MIT
License-File: LICENSE
Keywords: ai,cozy,inference,ml,serverless
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.11
Requires-Dist: aiohttp>=3.11.14
Requires-Dist: backoff>=2.2.1
Requires-Dist: blake3>=1.0.0
Requires-Dist: grpcio>=1.71.0
Requires-Dist: huggingface-hub>=0.26.0
Requires-Dist: msgspec>=0.18.6
Requires-Dist: numpy>=1.24.0
Requires-Dist: pillow>=9.0.0
Requires-Dist: protobuf>=6.30.0
Requires-Dist: psutil>=7.0.0
Requires-Dist: pyjwt[crypto]>=2.8.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: requests>=2.32.0
Requires-Dist: tomli-w>=1.2.0
Requires-Dist: tqdm>=4.66.0
Provides-Extra: dev
Requires-Dist: grpcio-tools>=1.71.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pytest>=9.0.0; extra == 'dev'
Requires-Dist: types-pyyaml>=6.0.12.20250915; extra == 'dev'
Requires-Dist: types-requests>=2.32.4.20250913; extra == 'dev'
Provides-Extra: torch
Requires-Dist: flashpack>=0.2.1; extra == 'torch'
Requires-Dist: safetensors>=0.7.0; extra == 'torch'
Requires-Dist: torch>=2.11.0; extra == 'torch'
Requires-Dist: torchaudio>=2.11.0; extra == 'torch'
Requires-Dist: torchvision>=0.25.0; extra == 'torch'
Provides-Extra: trainer
Requires-Dist: pyarrow>=17.0.0; extra == 'trainer'
Description-Content-Type: text/markdown

# gen-worker

A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.

## Tenant Worker Build Contract (Dockerfile-First)

When publishing a tenant worker, Cozy expects a **Dockerfile-first** project layout.

Build inputs MUST include:

- `endpoint.toml` (Cozy manifest; used at build/publish time)
- `Dockerfile` (builds the worker image)
- tenant code (`pyproject.toml`, `uv.lock`, `src/`, etc.)

The built image MUST:

1. Install `gen-worker` (so discovery + runtime can run).
2. Bake function discovery output (manifest) at build time:

```dockerfile
RUN mkdir -p /app/.tensorhub && python -m gen_worker.discover > /app/.tensorhub/endpoint.lock
```

3. Use the Cozy worker runtime as the ENTRYPOINT:

```dockerfile
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```

Notes:

- `endpoint.toml` is **not required** to be present in the final image; it is a build-time input.
- The platform reads `/app/.tensorhub/endpoint.lock` from the built image and stores it in Cozy Hub DB for routing/invocation.
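Putting the contract together, a minimal Dockerfile might look like the sketch below. The base image, `uv` usage, and project layout are illustrative assumptions; adapt them to your own build.

```dockerfile
# Illustrative sketch only: base image and layout are assumptions.
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

WORKDIR /app

# Install tenant code plus gen-worker from the lockfile
COPY pyproject.toml uv.lock ./
COPY src/ ./src/
RUN uv sync --frozen

# Bake the function-discovery manifest at build time
RUN mkdir -p /app/.tensorhub && \
    uv run python -m gen_worker.discover > /app/.tensorhub/endpoint.lock

# Hand control to the Cozy worker runtime
ENTRYPOINT ["uv", "run", "python", "-m", "gen_worker.entrypoint"]
```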

## Installation

Start a Python project, then run:

```bash
uv add gen-worker
```

With PyTorch support:

```bash
uv add "gen-worker[torch]"
```

## Quick Start

```python
import msgspec
from gen_worker import RequestContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")
```

## Features

- **Function discovery** - Automatic detection of `@worker_function` decorated functions
- **Schema generation** - Input/output schemas extracted from msgspec types
- **Model injection** - Dependency injection for ML models with caching
- **Streaming output** - Support for incremental/streaming responses
- **Progress reporting** - Built-in progress events via `RequestContext`
- **Perf metrics** - Best-effort per-run metrics emitted to gen-orchestrator (`metrics.*` worker events)
- **Trainer runtime mode** - SDK-native trainer loop via `WORKER_MODE=trainer`
- **File handling** - Upload/download assets via Cozy hub file API
- **Model caching** - LRU cache with VRAM/disk management and cache-aware routing

## Authoring Endpoints

Three endpoint types are supported — **inference**, **conversion**, and
**training**. See `docs/endpoint-authoring.md` for the full manual covering
`RequestContext`, model injection (fixed and payload-selected), streaming
output, file persistence, conversion reserved-name payloads
(`source`/`destination`/`outputs`), and the trainer class contract
(`setup`/`configure`/`prepare_batch`/`train_step`/`state_dict`/`load_state_dict`).

Training runs use trainer mode:

```bash
WORKER_MODE=trainer \
TRAINER_JOB_SPEC_PATH=/app/.cozy/trainer_job.json \
python -m gen_worker.entrypoint
```

## Dev HTTP Runner (Local Inference Without gen-orchestrator)

For local testing of a built worker image (without standing up gen-orchestrator),
run the dev HTTP runner and write outputs to a mounted local directory.

Container example:

```bash
docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e TENSORHUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs
```

Prefetch a public model (example: SD1.5):

```bash
curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"runwayml/stable-diffusion-v1-5","dtypes":["bf16","fp16"]}]}'
```

Invoke a function:

```bash
curl -sS -X POST 'http://localhost:8081/v1/request/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'
```

Outputs are written under `/outputs/jobs/<request_id>/outputs/...` (matching Cozy ref semantics).

## Configuration

### endpoint.toml

```toml
schema_version = 1
name = "my-worker"
main = "my_pkg.main"

[functions.generate]
batch_dimension = "items"  # optional

[models]
sdxl = { ref = "stabilityai/stable-diffusion-xl-base-1.0", attributes = { dtype = ["fp16", "bf16"] } }

[models.generate]
dreamshaper = { ref = "lykon/dreamshaper-xl-v2-turbo", attributes = { dtype = ["fp16", "bf16"] } }

[resources]
max_inflight_requests = 1
```

### Environment Variables

Orchestrator-injected (production contract):

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_MODE` | `inference` | Runtime mode selector (`inference` or `trainer`) |
| `PUBLIC_ORCHESTRATOR_GRPC_ADDR` | - | Scheduler address workers should dial |
| `SCHEDULER_ADDRS` | - | Optional comma-separated LB seed addresses |
| `WORKER_JWT` | - | Worker-connect JWT (required; claims are authoritative) |

Local dev / advanced (not injected by orchestrator):

| Variable | Default | Description |
|----------|---------|-------------|
| `SCHEDULER_JWKS_URL` | - | Optional: verify WORKER_JWT locally against scheduler JWKS |
| `SCHEDULER_JWT_ISSUER` | - | Optional: expected `iss` when verifying WORKER_JWT locally |
| `SCHEDULER_JWT_AUDIENCE` | - | Optional: expected `aud` when verifying WORKER_JWT locally |
| `USE_TLS` | `false` | Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
| `LB_ONLY_RETRIES` | `true` | Retry via configured LB endpoint(s) only; ignore direct owner redirect hints |
| `RECONNECT_DELAY` | `0.1` | Base reconnect backoff in seconds (exponential) |
| `RECONNECT_MAX_DELAY` | `1.0` | Reconnect backoff cap in seconds |
| `RECONNECT_JITTER_SECONDS` | `0.1` | Added jitter upper bound in seconds, capped by `RECONNECT_MAX_DELAY` |
| `MAX_RECONNECT_ATTEMPTS` | `0` | Max reconnect attempts (`0` = infinite retries) |
| `WORKER_MAX_CONCURRENCY` | - | Max concurrent request executions |
| `WORKER_MAX_INPUT_BYTES` | - | Max input payload size |
| `WORKER_MAX_OUTPUT_BYTES` | - | Max output payload size |
| `WORKER_MAX_UPLOAD_BYTES` | - | Max file upload size |
| `WORKER_MAX_VRAM_GB` | Auto | Maximum VRAM for models |
| `WORKER_VRAM_SAFETY_MARGIN_GB` | `3.5` | Reserved VRAM for working memory |
| `TENSORHUB_CACHE_DIR` | `~/.cache/tensorhub` | TensorHub cache root; worker CAS defaults derive from this (`${TENSORHUB_CACHE_DIR}/cas/...`) |
| `WORKER_LOCAL_MODEL_CACHE_DIR` | `/tmp/tensorhub/local-model-cache` | Optional local (non-NFS) cache for snapshot localization |
| `WORKER_REGISTER_TIMEOUT_S` | `90` | Startup watchdog: fail fast if worker never registers with scheduler |
| `WORKER_WARN_MODEL_RESOLVE_S` | `30` | Emit `request.model_resolve.stuck` warning after this duration |
| `WORKER_WARN_MODEL_LOAD_S` | `60` | Emit `request.model_load.stuck` warning after this duration |
| `WORKER_WARN_INFERENCE_S` | `60` | Emit `request.inference.stuck` warning after this duration |
| `WORKER_MAX_CONCURRENT_DOWNLOADS` | `2` | Max parallel model downloads |
| `TENSORHUB_URL` | - | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
| `WORKER_ALLOW_TENSORHUB_API_RESOLVE` | `false` | Local dev only: allow the worker to call Cozy Hub resolve APIs |
| `TENSORHUB_TOKEN` | - | Cozy Hub bearer token (optional; enables ingest-if-missing for public models, if Cozy Hub requires auth) |
| `TRAINER_JOB_SPEC_PATH` | `/app/.cozy/trainer_job.json` | Trainer-mode JSON job manifest path |
| `TRAINER_PLUGIN` | - | Trainer plugin import (`module:symbol`); optional if provided in job JSON |
| `TRAINER_CHECKPOINTS_DIR` | `/tmp/training/checkpoints` | Local checkpoint output directory in trainer mode |
| `TRAINER_SAMPLES_DIR` | `/tmp/training/samples` | Local sample output directory in trainer mode |
| `TRAINER_EVENTS_PATH` | - | Optional line-delimited JSON lifecycle event log for trainer mode |
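The `RECONNECT_*` variables above describe a capped exponential backoff with jitter. The worker's exact formula is not specified here, but a schedule in that family behaves roughly like this sketch (using the documented defaults):

```python
import random


def reconnect_delay(attempt: int, base: float = 0.1, cap: float = 1.0,
                    jitter: float = 0.1) -> float:
    """Capped exponential backoff: base * 2^attempt plus jitter, capped overall."""
    delay = min(base * (2 ** attempt), cap)
    delay += random.uniform(0.0, jitter)
    return min(delay, cap)  # jitter is also capped by RECONNECT_MAX_DELAY
```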

## Metrics

The worker can emit best-effort performance/debug metrics to gen-orchestrator via `WorkerEvent` messages.

See the **Observability** section in `docs/endpoint-authoring.md` for the event catalog (request lifecycle, startup phases, per-run `metrics.*`, and cache inventory).

### Model Download Behavior

Model refs are plain lowercase strings:
- `owner/repo`
- `owner/repo:tag`
- `owner/repo@blake3:<digest>`

Tags are mutable pointers that resolve to published versions.
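For illustration, a ref in the grammar above can be split into its parts like this (a hypothetical helper, not part of the SDK; note the `@` check comes first because the digest itself contains a `:`):

```python
from typing import NamedTuple, Optional


class ModelRef(NamedTuple):
    repo: str                # "owner/repo"
    tag: Optional[str]       # mutable pointer, e.g. a published tag
    digest: Optional[str]    # pinned "blake3:<digest>"


def parse_ref(ref: str) -> ModelRef:
    """Split a lowercase model ref into repo, tag, and digest parts."""
    if "@" in ref:
        repo, digest = ref.split("@", 1)
        return ModelRef(repo, None, digest)
    if ":" in ref:
        repo, tag = ref.split(":", 1)
        return ModelRef(repo, tag, None)
    return ModelRef(ref, None, None)
```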

Cozy snapshot/object file downloads are written to `*.part` and then atomically renamed on success. If a `*.part` file exists from a previous interrupted download, the worker attempts to resume it using HTTP `Range` requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
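The write-then-rename pattern described above can be sketched generically (this is an illustration of atomic file placement, not the worker's actual downloader):

```python
import os


def atomic_write(dest: str, data: bytes) -> None:
    """Append to dest.part, then atomically rename onto dest on success."""
    part = dest + ".part"
    with open(part, "ab") as f:  # append mode lets a partial file be resumed
        f.write(data)
    os.replace(part, dest)  # atomic rename; dest only ever sees complete data
```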

## Docker Deployment

### Project Structure

```
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py
```

### Local Dev Build (Using Root `Dockerfile`)

For production, use the `cozyctl` CLI to build and deploy worker images to our network. For local testing, you can build images with the provided `Dockerfile`:

```bash
# Build an example using the same root Dockerfile
docker build -t medasr-worker -f Dockerfile examples/medasr-transcribe

# Run
docker run \
  -e PUBLIC_ORCHESTRATOR_GRPC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  medasr-worker
```

Canonical local dev build args (GPU, CUDA 12.6, torch 2.11.x, Python 3.12):

```bash
cd ~/cozy/python-gen-worker

docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.11.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/medasr-transcribe
```

Optional build args:

```bash
docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/medasr-transcribe
```

### Build Base

Worker images build directly from a Python+uv base image:

- `ghcr.io/astral-sh/uv:python3.12-bookworm-slim`

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.

## Publish/Promote Lifecycle

Control-plane behavior (tensorhub + orchestrator):

- Every publish creates a new immutable internal `release_id`.
- End users invoke functions by `owner/endpoint/function` (default `prod`) or `owner/endpoint/function:tag`.
- `endpoint` is derived from `endpoint.toml` `name` and normalized to a URL-safe slug.
- `function` names are derived from Python `@worker_function` names and normalized to URL-safe slugs (for example, `medasr_transcribe` -> `medasr-transcribe`).
- Publishing does not move traffic by default.
- Promoting a function tag moves traffic to that release.
- Rollback is just retargeting the tag to an older release.
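The underscore-to-hyphen normalization mentioned above can be approximated as follows (the platform's exact slug rules are not specified here; this is only a rough illustration):

```python
import re


def slugify(name: str) -> str:
    """Approximate URL-safe slug: lowercase, runs of non-alphanumerics to '-'."""
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
    return slug.strip("-")
```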

## Model Cache

Workers report model availability for intelligent job routing:

| State | Location | Latency |
|-------|----------|---------|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
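As a toy illustration of how a router could use the table above, a candidate worker's availability for a requested model reduces to a three-way classification (a hypothetical helper, not the orchestrator's actual policy):

```python
def cache_state(model: str, vram: set[str], disk: set[str]) -> str:
    """Classify model availability on one worker: hot > warm > cold."""
    if model in vram:
        return "hot"   # already in VRAM, near-instant
    if model in disk:
        return "warm"  # on disk, seconds to load
    return "cold"      # download required first
```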

## Dev Testing (Mock Orchestrator)

For local end-to-end tests without standing up `gen-orchestrator`, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one `JobExecutionRequest`, prints the result, and exits.

Start your worker container first:

```bash
docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e PUBLIC_ORCHESTRATOR_GRPC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>
```

In another terminal, send one request:

```bash
python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'
```

Run the command again with a different payload whenever you want to send another request.

Separately, the in-process `ModelCache` used for cache-aware routing can be exercised directly:

```python
from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)  # pipeline: any loaded model object
cache.is_in_vram("model-a")  # True
cache.get_vram_models()      # ["model-a"]
```

## Error Handling

```python
from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: RequestContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry

    try:
        result = call_external_api()
    except TimeoutError as e:
        raise RetryableError("API timeout") from e  # Will be retried

    return Output(result=result)
```

## Development

```bash
# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build
```

### Regenerating Protobuf Stubs

Requires `gen-orchestrator` as a sibling repo:

```bash
uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto
```

### Worker Wire Protocol

The worker advertises a protocol `MAJOR.MINOR` in `WorkerRegistration` (`protocol_major`, `protocol_minor`).

- Current runtime constants live in `src/gen_worker/wire_protocol.py`.
- Orchestrator compatibility policy/ranges are documented in `../gen-orchestrator/docs/worker_wire_protocol.md`.
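A typical MAJOR.MINOR compatibility rule is sketched below. The authoritative policy lives in gen-orchestrator's `worker_wire_protocol.md`; this is only the conventional semver-style interpretation:

```python
def compatible(worker: tuple[int, int], server_major: int,
               server_min_minor: int = 0) -> bool:
    """Conventional wire check: majors match exactly, and the worker's
    minor is at least the server's supported minimum."""
    major, minor = worker
    return major == server_major and minor >= server_min_minor
```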

## License

MIT
