# Bao — agent guide (llms.txt)

Bao is a Python full-stack framework. Server-side templating. Secure and bounded by default.
This file teaches you (an AI agent) the conventions so you can write correct code on the first try.

## One obvious way to do each thing

- A page = a controller method that returns `view(...)`.
- A controller = a class decorated `@controller("/base")` in `app/controllers/`.
- A route = a method decorated `@get("/path")` or `@post("/path")` (path is relative to the base).
- A view = a Jinja2 template in `app/views/`, rendered with `view("name.html", **context)`. Autoescaped.
- Dependencies = type-annotated handler parameters, injected from the service container.
  `def index(self, request, config: Config)` → `request` and `config` are provided.

## Example controller

```python
from bao import controller, get, view, Config

@controller("/")
class HomeController:
    @get("/")
    def index(self, request, config: Config):
        return view("home.html", title=config.app_name)
```

## CLI

- `bao run` — start the dev server (loads `main:application`, override with `--app module:attr`).
- `bao serve` — production server (uvicorn workers, no auto-reload). `--host --port --workers`.
- `bao config` — print the resolved configuration with secrets redacted.
- `bao make:dockerfile` — scaffold a Dockerfile + .dockerignore for deployment.
- `bao check` — smoke-load the app and report the registered routes. Use this to verify your work.
- `bao describe` — print a structured JSON description of the app (routes + conventions).
- `bao make:controller NAME` — scaffold a controller and a matching view.
- `bao make:crud NAME field1[:type] field2[:type] ...` — scaffold a full vertical slice (model + controller + 4 views) and append the imports. Supported field types: `str` (default), `int`, `float`, `bool`, `datetime`. Example: `bao make:crud Task name:str count:int done:bool due:datetime`.
- `bao make:auth` — scaffold an email + password auth flow: `User` model, `AuthController` (`/login`, `/register`, `/logout`), `auth/login.html`, `auth/register.html`, and a smoke test. Argon2id hashes, signed-session login, CSRF-protected forms. Run `bao make:migration "create users" && bao migrate` afterwards.
- `bao make:api NAME` — scaffold a JSON API Resource for an existing model. Generates `app/api/<plural>_api.py`, a smoke test, and registers the import in `main.py`. The model must already exist (e.g. via `bao make:crud`).
- `bao workflows:list` — list workflows registered with `@bao.workflows.workflow`.
- `bao workflows:run NAME [--arg key=value ...]` — invoke a workflow by name.
- `bao workflows:resume RUN_ID [--arg key=value ...]` — continue a failed run; completed steps are skipped.
- `bao workflows:signal RUN_ID NAME PAYLOAD_JSON` — deliver a signal to a workflow waiting on `signal_in(NAME)`. PAYLOAD_JSON defaults to `null`.
- `bao init` — scaffold a new project in the current directory. Also writes `conftest.py` and `tests/__init__.py`, so `bao test` works immediately.
- `bao test` — run the test suite (pytest, zero-config). Extra args pass through.
- `bao logs --tail N --action ACTION --actor ID` — print the most recent AuditLog rows. Lightweight viewer for operators.
- `bao workflows:status RUN_ID` — print a WorkflowRun + every step's status / attempts / error.
- `bao workflows:replay RUN_ID [--deterministic-check]` — re-invoke a run; cached steps short-circuit.
- `bao migrate` — apply pending database migrations.
- `bao migrate:rollback` — roll back the last migration.
- `bao make:migration "MESSAGE"` — autogenerate a migration from current models.
- `bao tasks:list` — enumerate `@task` + `@every` entries.
- `bao tasks:run NAME [args...]` — run a registered task synchronously (operator helper).
- `bao scheduler:run [--once]` — block on the `@every` scheduler (or fire one pass and exit).
- `bao dlq:list [--queue X] [--tail N]` — list dead-letter rows.
- `bao dlq:replay ID` — mark a DLQ row replayed and print its payload.
- `bao plugins:list` / `bao plugins:audit` — list / inspect registered plugins.
- `bao tokens:issue USER_ID [--scope X] [--ttl SECONDS]` — mint an API bearer token (printed once).
- `bao tokens:list USER_ID` — list a user's tokens.
- `bao tokens:revoke TOKEN_ID_OR_PLAINTEXT` — revoke a token.
- `bao ai:ask "prompt"` / `bao ai:stream "prompt"` / `bao ai:providers` — drive the AI router.
- `bao agents:list` — enumerate registered `@agent` classes + tool counts.
- `bao agents:run AGENT "message" [--run-id RUN_ID] [--stream]` — foreground agent run + tool trace; `--stream` prints each event as it lands.
- `bao agents:pending` / `bao agents:approve REQUEST_ID` / `bao agents:deny REQUEST_ID` — HITL.
- `bao memory:add NAME "text"` / `bao memory:search NAME "query" --k N` / `bao memory:prune NAME --max-age 30d --max-items 1000` — VectorMemory ops.
- `bao rag:ingest --title T (--text "..." | --from-file PATH)` — chunk + embed + persist.
- `bao rag:retrieve "q" --k N` / `bao rag:ask "q" --k N` — query the RAG store.
- `bao mcp:serve` — run the MCP stdio server (read tools always on; write tools gated by `BAO_MCP_ALLOW_WRITE=1`).

## Handler parameters (dependency injection by name and type)

Handler parameters are resolved in this order:
1. ``request`` (or annotated ``Request``) - the Starlette request.
2. ``form`` - parsed form data (only set for POST/PUT/PATCH with a form content-type).
3. A parameter named the same as a route placeholder (e.g. ``id`` for ``/{id:int}``).
4. Anything else with a type annotation bound in the service container.
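
The resolution order above can be sketched with `inspect.signature`. This is a minimal illustration, not Bao's resolver: `resolve_params`, the stand-in `Request` / `Config` classes, and the dict-based container are all hypothetical.

```python
import inspect


class Request:
    """Stand-in for the Starlette request (hypothetical, for illustration)."""


class Config:
    app_name = "demo"


def resolve_params(handler, request, form, path_params, container):
    """Build handler kwargs following the four-step order described above."""
    kwargs = {}
    for name, param in inspect.signature(handler).parameters.items():
        if name == "self":
            continue
        if name == "request" or param.annotation is Request:
            kwargs[name] = request                      # 1. the request
        elif name == "form":
            kwargs[name] = form                         # 2. parsed form data
        elif name in path_params:
            kwargs[name] = path_params[name]            # 3. route placeholder
        elif param.annotation in container:
            kwargs[name] = container[param.annotation]  # 4. container binding
        else:
            raise LookupError(f"cannot resolve parameter {name!r}")
    return kwargs


def show(request, id: int, config: Config):
    return f"{config.app_name}:{id}"


req = Request()
kwargs = resolve_params(show, req, None, {"id": 7}, {Config: Config()})
result = show(**kwargs)
```

Note that `id` is matched by name against the route placeholder before its `int` annotation is ever looked up in the container.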

## Data layer

Define models in `app/models/*.py`. They extend `bao.Model` (SQLAlchemy 2.0 under the hood)
and ship Eloquent-style classmethods. SQLite by default (zero config), swap to PostgreSQL
by setting `BAO_DATABASE_URL=postgresql+psycopg://user:pass@host/db`.

```python
from bao import Model
from sqlalchemy.orm import Mapped, mapped_column

class Post(Model):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    body: Mapped[str]

# Read
Post.find(1); Post.all(); Post.where(title="Hi"); Post.first(title="Hi"); Post.count()
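# Write
Post.create(title="Hi", body="...")
post = Post.find(1)    # save() / delete() are instance methods, so load a row first
post.title = "Hello"
post.save(); post.delete()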
# Transactions / multi-step work
from bao import db
with db.session() as s:
    ...
```

Workflow for schema changes: edit the model, run `bao make:migration "what changed"`, review
the generated file under `migrations/versions/`, then `bao migrate`.

## Relationships (belongs_to / has_many)

Bao ships two Eloquent-style helpers on top of SQLAlchemy 2.0 ``relationship()``. Use them
inside a ``Model`` body alongside the standard ``mapped_column`` declarations:

```python
from bao import Model, belongs_to, has_many
from sqlalchemy.orm import Mapped, mapped_column


class Post(Model):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    # One-to-many: pair the back_populates with the inverse on Comment.
    comments = has_many("Comment", back_populates="post")


class Comment(Model):
    __tablename__ = "comments"
    id: Mapped[int] = mapped_column(primary_key=True)
    body: Mapped[str]
    # belongs_to() returns (foreign_key_column, relationship). Tuple-unpack at
    # class scope: the LHS names drive the column and attribute names.
    post_id, post = belongs_to(Post, back_populates="comments")
```

Navigation works in both directions inside an open ``db.session()`` (the
relationships lazy-load against the open connection):

```python
from bao import db

with db.session() as s:
    post = s.get(Post, 1)
    for c in post.comments:    # one-to-many
        print(c.body, c.post.id)
```

``make:crud`` recognises ``name:ref:OtherModel`` as a third field shape (alongside
``name`` and ``name:TYPE``). It emits the FK column + ``belongs_to`` wiring on the
generated model and treats the column as an integer in the form-coerce dict:

```
bao make:crud Note title:str author_id:ref:User
```

Mass-assignment safety: the Pydantic schemas derived from a Model only include
column attributes; ``has_many`` relationships are not columns, so a JSON client
cannot submit a payload that smuggles in arbitrary child rows.

## Tenant scoping (opt-in)

Multi-tenant models opt in with a single class flag; off by default, default-deny when on.

```python
from bao import Model
from sqlalchemy.orm import Mapped, mapped_column

class Widget(Model):
    __tablename__ = "widgets"
    tenant_scoped = True              # opt in: framework adds a tenant_id column

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
```

Set the active tenant with `bao.with_tenant(tenant_id)` (preferred, context manager) or
`bao.set_tenant(tenant_id)` (for middleware that sets it once per request):

```python
from bao import with_tenant

with with_tenant("acme"):
    Widget.create(name="hello")       # tenant_id auto-stamped
    Widget.all()                      # only acme rows
```

Outside any tenant context, every read and write on a tenant-scoped model raises
`bao.TenantContextRequired` (default-deny). This means a tenant-scoped model can never
silently leak rows across tenants. Single-tenant models (the default, `tenant_scoped =
False`) keep working exactly as before.

## Tenant middleware (auto-bind per request)

Use `App(tenant_resolver=...)` to set `current_tenant` automatically per HTTP request.
A resolver is a callable taking the Starlette request and returning the tenant id (or
`None` to leave it unset). Two ready-made strategies live in `bao.tenants`:

```python
from bao import App, header_resolver, subdomain_resolver

# X-Tenant: acme  ->  current_tenant = "acme"
application = App(view_paths=["app/views"], tenant_resolver=header_resolver("X-Tenant"))

# acme.bao.app   ->  current_tenant = "acme"
application = App(view_paths=["app/views"], tenant_resolver=subdomain_resolver(".bao.app"))
```

Without the middleware, tenant-scoped models still default-deny outside a
`with_tenant(...)` block; the middleware just supplies the value automatically.
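
Because a resolver is just a callable from request to tenant id, a custom one is easy to sketch. The example below is illustrative only: `make_subdomain_resolver` is a hypothetical helper (not Bao's `subdomain_resolver`), and the request is faked with `SimpleNamespace` carrying a plain `headers` dict.

```python
from types import SimpleNamespace


def make_subdomain_resolver(suffix):
    """Return a resolver that maps 'acme<suffix>' hosts to 'acme'."""

    def resolve(request):
        # Drop any ':port' part and normalise case before matching.
        host = request.headers.get("host", "").split(":")[0].lower()
        if host.endswith(suffix) and len(host) > len(suffix):
            return host[: -len(suffix)]
        return None  # leave the tenant unset

    return resolve


resolver = make_subdomain_resolver(".bao.app")
acme = resolver(SimpleNamespace(headers={"host": "acme.bao.app:8000"}))
bare = resolver(SimpleNamespace(headers={"host": "localhost"}))
```

Returning `None` for unmatched hosts keeps tenant-scoped models on the default-deny path rather than guessing a tenant.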

## Auth (four kinds of principal)

Bao recognises four kinds of principal across HTTP, agents, and CLI tooling.
The right one to gate against depends on who is calling:

- **Human (browser).** Email + password via `bao make:auth`, argon2id hashes,
  signed-cookie sessions, CSRF on form posts. `current_user(request)` resolves
  the User from `request.session["user_id"]`. Decorators: `@requires_login`,
  `@guest_only`.
- **API client (machine, bearer tokens).** `issue_token(user_id, scopes,
  ttl_seconds)` mints a hashed-at-rest token; clients send
  `Authorization: Bearer <plaintext>` and `current_user` resolves the same
  User. Tokens carry `scope_list` so `Resource.policy(...)` can gate writes
  on scope.
- **Agent (in-process LLM).** Agents do not authenticate at the HTTP layer;
  they run inside a process that has already authenticated. Sensitive tools
  declare `@tool(requires_approval=True)`, which writes an
  `AgentApprovalRequest` row and requires an operator's
  `bao agents:approve REQUEST_ID`. The agent run id is the audit handle.
- **Plugin (in-process).** Plugins do not authenticate; they declare a
  `Capabilities` scope (network hosts, secrets, db, services) and read
  secrets through `plugin.secrets()` (which only returns env vars in
  `requires.secrets`). `bao plugins:audit` surfaces the declared scope.

## Auth decorators

`@requires_login` and `@guest_only` (from `bao`) wrap controller methods on top of
`current_user(request)`:

```python
from bao import controller, get, requires_login, guest_only, view

@controller("/")
class HomeController:
    @get("/dashboard")
    @requires_login
    def dashboard(self, request):
        return view("dashboard.html")

    @get("/login")
    @guest_only
    def login_form(self, request):
        return view("auth/login.html")
```

`requires_login` returns a 303 redirect to `/login` for anonymous requests;
`guest_only` returns a 303 redirect to `/` when the user is already signed in.

## API layer (opt-in JSON resources)

Declare a JSON CRUD endpoint set with one class. The default actions read and write via
the existing Model classmethods and serialize through `Model.to_dict()`:

```python
from bao import Resource
from bao.api import api
from app.models.post import Post

@api("/api/v1/posts")
class PostApi(Resource):
    model = Post
```

That registers five routes at the given base:

```
GET    /api/v1/posts          -> 200, [dict]
POST   /api/v1/posts          -> 201, dict + Location header
GET    /api/v1/posts/{id:int} -> 200 or 404
PUT    /api/v1/posts/{id:int} -> 200 or 404
DELETE /api/v1/posts/{id:int} -> 204 or 404
```

JSON bodies only (parsed via `await request.json()`). CSRF is *not* applied because
Bao's CSRF check only fires on form content-types; API clients carry their own
credentials. OpenAPI output is covered under "API: derived schemas + OpenAPI" below.

Mount the module in `main.py` so the decorator fires at import time:

```python
import app.api.posts_api  # noqa: E402,F401
```

### Auth on @api

Two opt-in hooks gate API resources without rewriting every action:

```python
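from bao import Resource
from bao.api import api
from app.models.note import Note  # assumed path, mirroring app/models/post.py

@api("/api/v1/notes", requires_login=True)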
class NotesApi(Resource):
    model = Note

    def policy(self, request, action, item=None):
        if action == "destroy" and item is not None:
            return item.author_id == request.session.get("user_id")
        return None  # allow everything else
```

- `requires_login=True` on `@api(...)` returns `401 {"error":"unauthenticated"}` for
  anonymous requests. The check uses `bao.current_user(request)`.
- `@requires_login` can also decorate an *individual* action method, so a single
  Resource can mix public reads with protected writes:

  ```python
  @api("/api/v1/mixed_posts")
  class MixedPostsApi(Resource):
      model = Post
      # index, show: public (no decorator)
      @requires_login
      async def create(self, request):
          return await Resource.create(self, request)
  ```

- `Resource.policy(request, action, item=None)` is called before every action.
  `action` is one of `"index" / "show" / "create" / "update" / "destroy"`; `item` is
  the loaded model row for show/update/destroy (or `None` if 404 is about to be
  returned), and always `None` for index/create. Return `False` to reject (403
  `{"error":"forbidden"}`); return `None` or `True` to allow.
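
The return-value contract of `policy` can be summarised in a few lines. This is a sketch of the mapping described above, not Bao's dispatcher: `policy_outcome`, the stand-in `Note` class, and passing `user_id` directly (instead of a request) are hypothetical simplifications. Treating only a literal `False` as a rejection is also an assumption.

```python
class Note:
    """Stand-in model row with just the column the policy inspects."""

    def __init__(self, author_id):
        self.author_id = author_id


def policy(user_id, action, item=None):
    # Only the author may destroy a note; everything else is allowed.
    if action == "destroy" and item is not None:
        return item.author_id == user_id
    return None


def policy_outcome(policy_result):
    """Return (status, body) for a rejection, or None to run the action."""
    if policy_result is False:
        return 403, {"error": "forbidden"}
    return None  # None or True: allowed


rejected = policy_outcome(policy(1, "destroy", Note(author_id=2)))
allowed = policy_outcome(policy(1, "destroy", Note(author_id=1)))
listing = policy_outcome(policy(1, "index"))
```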

### API tokens (bearer auth for non-browser clients)

For mobile / third-party / automation / AI agent clients that cannot ride the session
cookie, Bao mints scoped, expiring, hashed-at-rest tokens:

```python
from bao import issue_token, verify_token, revoke_token

plaintext, row = issue_token(user_id=42, scopes=["posts:read"], ttl_seconds=3600)
# Show plaintext exactly once. It is SHA-256 hashed before storage.

# Client sends:  Authorization: Bearer <plaintext>
# current_user(request) accepts the bearer header and resolves the User.
# request.state.token exposes the active Token to policies / actions:
def policy(self, request, action, item=None):
    token = getattr(request.state, "token", None)
    if action in {"create", "update", "destroy"} and token is not None:
        return "posts:write" in token.scope_list
    return None  # reads and session callers: allow
```

Session and bearer both work; if both are present, the session wins. Tokens carry a
``revoked_at`` and optional ``expires_at``; ``verify_token`` returns ``None`` for
revoked / expired / unknown / malformed plaintexts.
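
To make "hashed at rest" concrete, here is a stdlib-only sketch of the issue / verify cycle. It is not Bao's implementation: `issue`, `verify`, the in-memory `TOKENS` dict, and the explicit `now` parameter (added so the example is deterministic) are all hypothetical.

```python
import hashlib
import secrets
import time

TOKENS = {}  # sha256 hex digest -> token row; the plaintext is never stored


def issue(user_id, scopes, ttl_seconds=None, now=None):
    now = time.time() if now is None else now
    plaintext = secrets.token_urlsafe(32)
    digest = hashlib.sha256(plaintext.encode()).hexdigest()
    row = {
        "user_id": user_id,
        "scopes": list(scopes),
        "expires_at": None if ttl_seconds is None else now + ttl_seconds,
        "revoked_at": None,
    }
    TOKENS[digest] = row
    return plaintext, row  # show the plaintext once, then discard it


def verify(plaintext, now=None):
    now = time.time() if now is None else now
    if not isinstance(plaintext, str) or not plaintext:
        return None  # malformed
    row = TOKENS.get(hashlib.sha256(plaintext.encode()).hexdigest())
    if row is None or row["revoked_at"] is not None:
        return None  # unknown or revoked
    if row["expires_at"] is not None and now >= row["expires_at"]:
        return None  # expired
    return row


plaintext, row = issue(42, ["posts:read"], ttl_seconds=3600, now=1000.0)
valid = verify(plaintext, now=1001.0)         # inside the TTL
expired = verify(plaintext, now=5000.0)       # past expires_at
unknown = verify("not-a-token", now=1001.0)   # never issued
```

Because lookup is by digest, a leaked database row cannot be replayed as a bearer token.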

CLI:

- `bao tokens:issue USER_ID [--scope X --scope Y] [--ttl SECONDS]` -> prints the
  plaintext exactly once.
- `bao tokens:list USER_ID` -> id, scopes, expiry, status per token.
- `bao tokens:revoke TOKEN_ID_OR_PLAINTEXT` -> sets revoked_at.

Create the table with `bao make:migration "create tokens" && bao migrate`.

### API: derived schemas + OpenAPI

Every Bao model can hand back a Pydantic schema derived from its SQLAlchemy columns:

```python
Post.pydantic_schema("output")   # all columns, all optional - for serialisation
Post.pydantic_schema("create")   # no id/created_at/tenant_id; non-nullable required
Post.pydantic_schema("update")   # no id/created_at/tenant_id; all optional (PATCH-style)
```

The `@api` Resource uses these automatically:

- `create` / `update` validate the JSON body against the create / update schema.
  Failures return `422 {"error":"validation","details":[...]}`. Client-supplied
  values for server-managed columns (`id`, `created_at`, `tenant_id`) are dropped
  by construction, so mass-assignment is closed at the schema layer.
- `index` / `show` serialise rows through the output schema with Pydantic v2's
  `model_dump(mode="json")`, so datetimes/UUIDs/Decimals come out as proper JSON.

Auto-emitted endpoints (no extra config needed):

- `GET /openapi.json` returns an OpenAPI 3.1 document built from every registered
  `@api` resource (paths for the five actions, plus component schemas).
- `GET /docs` loads Swagger UI from a CDN (`cdn.jsdelivr.net`). The default CSP is
  relaxed *only on /docs* to permit the bundle; every other route keeps
  `default-src 'self'`.

Both endpoints are public by default. To hide them, front the app with an auth
proxy or override the routes after `App.build()`.

Security in the spec. The document advertises two ways to authenticate under
`components.securitySchemes`:

- `bearer` (`http`, `scheme=bearer`, `bearerFormat=bao-token`): the API token
  flow. Send the value minted by `bao tokens:issue USER_ID` as
  `Authorization: Bearer <token>`.
- `cookieSession` (`apiKey`, `in=cookie`, `name=bao_session`): Starlette's
  signed session cookie set after a browser login.

Each operation is tagged with its Resource class name (so generated SDKs cluster
the actions together), and gets a `security` array when it requires auth:

- `@api(base, requires_login=True)` on the class: every operation lists both
  schemes under `security`.
- `@requires_login` on a single action method: only that operation lists the
  schemes; the rest stay open.
- Unprotected actions get no `security` entry (the document-wide default is
  empty, i.e. open).

## Workflows (Phase 2 skeleton)

Durable execution backed by SQLite. A workflow is a Python function whose side
effects live inside `step("name")(callable)(*args)`. The first run executes each
step in order and persists the result; `resume(run_id)` re-invokes the function and
each completed step returns its cached value without re-executing.

```python
from bao.workflows import workflow, step, run, resume

def _fetch():
    return ["a", "b"]

def _summarize(items):
    return {"count": len(items)}

@workflow("daily_rollup")
def daily_rollup():
    items = step("fetch")(_fetch)()
    return step("summarize")(_summarize)(items)

# Drive it
result = run(daily_rollup)              # creates a WorkflowRun, executes both steps
# If a step raises, the run is marked failed; you can later:
# result = resume(run_id)               # step("fetch") returns cached, step("summarize") re-runs
```

CLI: `bao workflows:list`, `bao workflows:run NAME`, `bao workflows:resume RUN_ID`,
`bao workflows:status RUN_ID`, `bao workflows:signal RUN_ID NAME PAYLOAD_JSON`.
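
The resume behaviour boils down to memoising step results by name. The sketch below illustrates that idea only: `StepCache` is a hypothetical in-memory stand-in for the persisted step rows, whereas Bao stores them in the database.

```python
import json


class StepCache:
    """In-memory stand-in for the persisted step rows of one run."""

    def __init__(self):
        self.results = {}   # step name -> JSON text of the result
        self.executed = []  # names that actually ran (for illustration)

    def step(self, name):
        def bind(fn):
            def call(*args):
                if name in self.results:          # completed earlier: replay
                    return json.loads(self.results[name])
                value = fn(*args)                 # may raise; nothing cached
                self.results[name] = json.dumps(value)
                self.executed.append(name)
                return value
            return call
        return bind


attempts = {"summarize": 0}


def fetch():
    return ["a", "b"]


def summarize(items):
    attempts["summarize"] += 1
    if attempts["summarize"] == 1:
        raise RuntimeError("transient failure")
    return {"count": len(items)}


def daily_rollup(cache):
    items = cache.step("fetch")(fetch)()
    return cache.step("summarize")(summarize)(items)


cache = StepCache()
try:
    daily_rollup(cache)          # first run: summarize fails
except RuntimeError:
    pass
result = daily_rollup(cache)     # "resume": fetch is served from the cache
```

The function body runs twice, but `fetch` executes only once; that is why side effects belong inside steps.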

### Signals (wait inside a workflow, deliver from outside)

A workflow can suspend on a named external event:

```python
from bao.workflows import signal_in, step, workflow

@workflow("needs_approval")
def needs_approval():
    step("draft")(_draft)()
    decision = signal_in("approval", timeout=300.0)   # waits for an external signal
    return step("apply")(_apply)(decision)
```

External code (a controller, the CLI, another worker) delivers the signal:

```python
from bao.workflows import signal
signal(run_id, "approval", {"ok": True, "by": "alice"})
# or, from the shell:
#   bao workflows:signal RUN_ID approval '{"ok": true}'
```

Multiple signals with the same name are allowed (consumed oldest-first). The wait is
wrapped in an implicit `step("signal::<name>")`, so on resume the consumed payload is
replayed from the cache without re-blocking.

Honest caveat: `signal_in` polls the database on a backoff (capped at 1.0s). This is
a single-node skeleton; a production engine would use PG NOTIFY/LISTEN or a queue so
the wait wakes up within milliseconds. `WorkflowSignal` rows ship with `bao.workflows`;
create the table with `bao make:migration "create workflow signals" && bao migrate`.

## Plugins (capability-gated)

Plugins are the first-class way to extend Bao. A plugin declares a capability scope
(what hosts it talks to, which env-var secrets it reads, whether it touches the DB,
which Protocols it binds), then ships routes / DI bindings / lifecycle hooks behind
that declaration.

```python
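from datetime import datetime
from typing import Protocol
from bao import App, Plugin, Capabilities

class ClockService(Protocol):
    def now(self) -> datetime: ...

class _SystemClock:
    """Illustrative implementation; any object with now() satisfies the Protocol."""
    def now(self) -> datetime:
        return datetime.now()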

class ClockPlugin(Plugin):
    name = "clock"
    requires = Capabilities(services=[ClockService])  # advisory scope

    def register(self, app: App) -> None:
        app.bind(ClockService, _SystemClock())

application = App()
application.use(ClockPlugin())     # configure + register run immediately
```

A controller resolves the Protocol via DI as usual:

```python
from starlette.responses import PlainTextResponse
from bao import controller, get

@controller("/_clock")
class ClockController:
    @get("/")
    def now(self, request, clock: ClockService):
        return PlainTextResponse(clock.now().isoformat())
```

Lifecycle (per app boot):
1. `app.use(plugin)` -> `plugin.configure(app.config)` -> `plugin.register(app)`.
2. ASGI startup -> awaits each plugin's `startup()` in registration order.
3. ASGI shutdown -> awaits each plugin's `shutdown()` in reverse order.

`Plugin.secrets()` returns *only* the env vars listed in `requires.secrets`; plugins
should read every secret through this accessor rather than `os.environ` directly so
the audit output reflects exactly what they consume.

CLI:

- `bao plugins:list` -> names registered on the App.
- `bao plugins:audit` -> declared capability scope per plugin (network, secrets, db,
  routes, services).

Honest caveat: Phase 1a enforcement is *advisory*. The framework surfaces declared
capabilities so operators / AI agents can review them; real isolation (egress
firewall, secret scoping at the OS level) lands in later iterations.

## Events (in-process pubsub)

A tiny synchronous pubsub bus useful for cross-cutting hooks inside a single Bao
process (audit triggers, cache invalidation on writes, refresh notifications).

```python
from bao import get_logger, subscribe, publish

log = get_logger("app.events")  # logger name is illustrative

def on_user_created(payload):
    log.info("welcome", extra={"user_id": payload["id"]})

unsub = subscribe("user.created", on_user_created)
publish("user.created", {"id": 42})  # invokes on_user_created
unsub()                               # remove the listener
```

`publish` returns the number of subscribers it invoked. A subscriber that raises is
logged via `bao.get_logger("bao.events")`; the exception does not propagate and does
not block dispatch to the other subscribers.
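
The dispatch semantics can be sketched in a few lines of stdlib Python. This is not Bao's bus: the module-level `_subscribers` registry and the logger name are hypothetical, and counting a failing subscriber as "invoked" is an assumption.

```python
import logging
from collections import defaultdict

log = logging.getLogger("sketch.events")
log.addHandler(logging.NullHandler())  # keep the example quiet

_subscribers = defaultdict(list)


def subscribe(topic, fn):
    _subscribers[topic].append(fn)

    def unsubscribe():
        if fn in _subscribers[topic]:
            _subscribers[topic].remove(fn)

    return unsubscribe


def publish(topic, payload):
    invoked = 0
    for fn in list(_subscribers[topic]):  # copy: handlers may unsubscribe
        invoked += 1
        try:
            fn(payload)
        except Exception:
            # Log and continue so one bad subscriber cannot block the rest.
            log.exception("subscriber failed for %s", topic)
    return invoked


seen = []


def boom(payload):
    raise RuntimeError("bad subscriber")


subscribe("user.created", boom)
unsub = subscribe("user.created", seen.append)
count = publish("user.created", {"id": 42})
unsub()
count_after = publish("user.created", {"id": 43})
```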

Honest caveat: this is *in-process only*. Subscribers in another worker / process /
host see nothing. Manifesto Section 7 plans Redis Streams for cross-process delivery
(later iteration). For workflow-aware durable signalling, use `bao.workflows.signal`.

## Workflow step hardening (retries, timeout)

`step(...)` accepts a few kwargs for hardening:

```python
step("fetch", retries=3, backoff=0.5, jitter=0.1, timeout=10.0)(fn)(*args)
```

- `retries` (default 0): on any exception, sleep `backoff * 2**attempt +
  uniform(0, jitter)` seconds and try again. Total attempts capped at `retries + 1`.
  On final failure the step row is persisted with `status="failed"` and the
  exception's class+message in `error`, then the exception re-raises so the run is
  marked failed.
- `timeout` (default None): if set, the callable runs in a daemon thread and is
  joined for at most `timeout` seconds. Exceeding the budget raises
  `WorkflowStepTimeout` (a `TimeoutError` subclass). HONEST CAVEAT: Python cannot
  kill a running thread; the abandoned callable keeps running in the background
  until it returns, and any side effects it produces still land. Prefer cooperative
  cancellation inside the callable when correctness matters.

`WorkflowStep` rows carry `attempt_count`, `status` (`"completed"` or `"failed"`),
and `error` (the exception text on a final-failed attempt). `bao workflows:status
RUN_ID` prints the run + step rows in a compact operator-friendly format.
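
The retry and timeout rules for `step(...)` can be sketched as follows. This is an illustration under assumptions, not Bao's engine: `run_step`, `call_with_timeout`, and `StepTimeout` are hypothetical names, `attempt` is assumed to be zero-based, and the injectable `sleep` exists only to make the example deterministic.

```python
import random
import threading
import time


class StepTimeout(TimeoutError):
    """Stand-in for WorkflowStepTimeout."""


def backoff_delay(attempt, backoff, jitter):
    return backoff * 2 ** attempt + random.uniform(0, jitter)


def call_with_timeout(fn, timeout, *args):
    box = {}

    def target():
        try:
            box["value"] = fn(*args)
        except BaseException as exc:
            box["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout)
    if worker.is_alive():
        # The thread cannot be killed; it keeps running in the background.
        raise StepTimeout(f"step exceeded {timeout}s")
    if "error" in box:
        raise box["error"]
    return box["value"]


def run_step(fn, *args, retries=0, backoff=0.5, jitter=0.1,
             timeout=None, sleep=time.sleep):
    for attempt in range(retries + 1):  # total attempts = retries + 1
        try:
            if timeout is None:
                return fn(*args)
            return call_with_timeout(fn, timeout, *args)
        except Exception:
            if attempt == retries:
                raise  # final failure propagates to the run
            sleep(backoff_delay(attempt, backoff, jitter))


delays = []
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("not yet")
    return "ok"


value = run_step(flaky, retries=3, backoff=0.5, jitter=0.0, sleep=delays.append)
```

With `jitter=0.0` the recorded delays are exactly `backoff * 2**attempt` for the two failed attempts.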

Honest caveats (read these before relying on workflows):

- Step results must be JSON-serializable (str, int, float, bool, None, list, dict).
  Anything else surfaces as a TypeError from json.dumps.
- The workflow function body itself runs every time, including on resume. Code
  outside `step(...)` runs more than once on resume, so put every side effect
  inside a step.
- Retries are local to one step invocation. No automatic parallelism, no scheduling.
  This is the Phase 2 skeleton; the full engine ships later.
- WorkflowRun / WorkflowStep tables ship in `bao.workflows`; create them with
  `bao make:migration "create workflow tables" && bao migrate` (or via
  `metadata.create_all` in tests). When you bump the step schema, also run
  `bao make:migration "extend workflow steps" && bao migrate`.
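
The JSON-serializable rule is the one most likely to bite in practice. A small stdlib illustration (the step functions `raw_step` and `safe_step` are hypothetical): convert rich types to plain JSON values before returning them from a step.

```python
import json
from datetime import datetime, timezone


def raw_step():
    # A datetime is not JSON-serializable, so persisting this result fails.
    return {"ran_at": datetime(2024, 1, 1, tzinfo=timezone.utc)}


def safe_step():
    # Convert to an ISO 8601 string first; parse it back where needed.
    return {"ran_at": datetime(2024, 1, 1, tzinfo=timezone.utc).isoformat()}


try:
    json.dumps(raw_step())
    raw_ok = True
except TypeError:
    raw_ok = False

stored = json.dumps(safe_step())
restored = json.loads(stored)
parsed = datetime.fromisoformat(restored["ran_at"])
```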

## Auth scaffold (`bao make:auth`)

One command produces a working email + password auth flow:

```bash
bao make:auth
bao make:migration "create users" && bao migrate
bao test
```

It scaffolds `app/models/user.py` (User with email + password_hash), an `AuthController`
exposing `GET/POST /login`, `GET/POST /register`, and `POST /logout`, the two HTML forms
(with `_csrf` tokens), and a smoke test. Passwords are hashed with argon2id; login stores
`user_id` in the signed session cookie. Use these helpers from `bao`:

```python
from bao import hash_password, verify_password, login, logout, current_user

# In a controller:
def login_submit(self, request, form):
    user = User.first(email=form["email"])
    if user and verify_password(form["password"], user.password_hash):
        login(request, user)
        return redirect("/")
    # Otherwise re-render the login form with the generic "Invalid credentials" message.

def me(self, request):
    user = current_user(request)            # User instance, or None
    ...
```

`current_user(request)` looks up `request.session["user_id"]` and returns the User
instance (or `None`). Failed logins return a generic "Invalid credentials" message to
avoid email/password enumeration.

## Testing (built in, zero-config)

Put tests in `tests/`. Use the built-in client; it exercises routing, DI, views, and middleware
without a running server. Swap real services for fakes via `overrides=` (DI by type).

```python
from bao.testing import TestClient
from main import application

def test_home():
    TestClient(application).get("/").assert_ok().assert_see("Welcome")
```

Assertions chain: `assert_ok`, `assert_status(n)`, `assert_see(text)`, `assert_dont_see(text)`,
`assert_header(name, value=None)`, `assert_redirect(to=None)`.

## Frontend (Alpine + HTMX, used as-is)

Bao does NOT ship a frontend library. The scaffolded ``layout.html`` loads two
established libraries from unpkg, fully credited in a Jinja comment:

- Alpine.js (https://alpinejs.dev) for client reactivity: ``x-data``, ``x-show``,
  ``@click``, ``x-model``.
- HTMX (https://htmx.org) for server interactions and SSE: ``hx-post``, ``hx-target``,
  ``hx-swap``, ``hx-ext="sse"`` + ``sse-connect``.

Use the established directives directly; do not invent new ones.

CSP trade-off: the default CSP relaxes ``script-src`` to allow ``https://unpkg.com``.
Production projects that self-host these libraries should set ``Config.csp`` (env
var ``BAO_CSP``) to a stricter policy (e.g. ``default-src 'self'``) and remove the
unpkg script tags from their layout.

### Live channels (SSE)

A channel is a kind + id pair (e.g. ``("workflow", run_id)``) that resolves to a
URL like ``/_bao/channels/workflow/<id>``. The framework registers one dispatch
route and forwards to a kind-specific handler that yields server-sent events.

```python
from bao.channels import channel, register_channel

# In a controller, hand the URL back to the client:
url = channel("workflow", run_id)

# Or in a template (``channel`` is a Jinja global):
# <div hx-ext="sse" sse-connect="{{ channel('workflow', run_id) }}"
#      sse-swap="step" hx-swap="beforeend"></div>

# Register a custom channel:
async def agent_channel(request, channel_id):
    yield encode_event("ping", "...")
register_channel("agent", agent_channel)
```

Built-in: the ``workflow`` channel polls the ``WorkflowStep`` table (~500ms) and
emits one ``event: step`` per new row, then ``event: done`` when the run is no
longer ``running``. A tenant-scoped run gates on the caller's ``current_tenant``;
mismatches surface as 403. The example home page shows a working demo wired via
HTMX + SSE.

Honest caveat: the SSE handler polls (single-node skeleton). A production engine
would push from PG NOTIFY or a queue so events land within ms.
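
For orientation, this is roughly what a channel handler puts on the wire. The sketch uses the standard server-sent events framing (`event:` and `data:` fields, blank line terminator); `sse_frame` is a hypothetical helper and may differ from Bao's `encode_event`, and `ticker` omits the `request` argument a real handler receives.

```python
import asyncio


def sse_frame(event, data):
    """Encode one server-sent event; multi-line data gets one data: per line."""
    lines = [f"event: {event}"]
    lines.extend(f"data: {line}" for line in str(data).split("\n"))
    return "\n".join(lines) + "\n\n"


async def ticker(channel_id, count=2):
    # An async generator, like a channel handler: yield frames, then finish.
    for n in range(count):
        yield sse_frame("step", f"{channel_id}:{n}")
    yield sse_frame("done", "")


async def collect(agen):
    return [frame async for frame in agen]


frames = asyncio.run(collect(ticker("run-1")))
multi = sse_frame("step", "a\nb")
```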

### WebSocket channels (bidirectional)

Live channels also ship a WebSocket surface for flows that need server-to-client
push and client-to-server messages on the same connection. Register a handler
with ``channels.register_ws_channel(kind, handler)``; the handler signature is
``async def handler(websocket, *parts) -> None`` where ``parts`` are the
``channel_id:path`` segments split on ``/``. The dispatch route
(``/_bao/ws/{kind}/{channel_id:path}``) runs the same ``current_user`` auth
over the session cookie before accepting the socket; unauthenticated callers
see a ``4401`` close code, unknown kinds see ``4404``.

```python
from bao.channels import register_ws_channel

async def my_channel(websocket, channel_id):
    await websocket.send_text("hello")
    msg = await websocket.receive_text()
    ...

register_ws_channel("my_channel", my_channel)
```

Built-in: ``agent_stream``. The agent runtime publishes the following payload
types on the in-process ``bao.events`` topic ``bao.agents.<run_id>``:

- ``{"type": "delta", "text": "..."}`` - token-level chunks (FSM path only,
  when the active provider implements ``stream``).
- ``{"type": "call", "tool": "...", "args": [...], "kwargs": {...}}`` - just
  before a tool invocation.
- ``{"type": "tool_result", "tool": "...", "result": ...}`` (or ``error``)
  right after.
- ``{"type": "final", "text": "..."}`` - the model's final answer.
- ``{"type": "done", "status": "completed"}`` - run finished; the WS channel
  closes after this frame.

```
ws://host/_bao/ws/agent_stream/<run_id>
```

Native tool-use paths (Anthropic / OpenAI) emit ``call`` / ``tool_result`` /
``final`` / ``done`` boundary events but **not** ``delta`` events: the native
SDKs stream their own protocol that we do not yet re-frame as token chunks.
The FSM path streams both deltas and boundaries.
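
A client consuming `agent_stream` frames mostly needs to fold the payload types listed above into a result. The reducer below is a hypothetical sketch operating on already-decoded dicts; it does not open a WebSocket. On native tool-use paths `streamed` stays empty, so rely on `final` for the answer text.

```python
def fold_events(events):
    """Reduce a sequence of agent_stream payloads into one summary dict."""
    text_parts, tool_calls, final, status = [], [], None, None
    for event in events:
        kind = event.get("type")
        if kind == "delta":
            text_parts.append(event["text"])
        elif kind == "call":
            tool_calls.append(event["tool"])
        elif kind == "final":
            final = event["text"]
        elif kind == "done":
            status = event["status"]
            break  # the channel closes after this frame
    return {
        "streamed": "".join(text_parts),
        "tools": tool_calls,
        "final": final,
        "status": status,
    }


summary = fold_events([
    {"type": "delta", "text": "Hel"},
    {"type": "delta", "text": "lo"},
    {"type": "call", "tool": "search", "args": ["bao"], "kwargs": {}},
    {"type": "tool_result", "tool": "search", "result": ["doc"]},
    {"type": "final", "text": "Hello"},
    {"type": "done", "status": "completed"},
])
```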

The session cookie rides through to the WS upgrade automatically (Starlette's
``SessionMiddleware`` applies to both HTTP and WebSocket scopes), so auth
works the same way as HTTP routes.

CLI: ``bao agents:run AGENT "message" --stream`` subscribes to the same
in-process topic locally and prints each event as it lands.

## Admin dashboard (`/_bao/admin`)

Bao auto-mounts a server-rendered operator dashboard. Pages:

- ``GET /_bao/admin`` (overview) - counts of runs (1d / 7d / all), pending
  approvals, DLQ size, audit log size, registered routes / tasks / agents /
  plugins.
- ``GET /_bao/admin/workflows`` - paginated WorkflowRun list (50/page). Each
  row links to ``/_bao/workflows/<run_id>``.
- ``GET /_bao/admin/agents`` - pending ``AgentApprovalRequest`` rows with
  approve / deny POST forms.
- ``GET /_bao/admin/dlq`` - dead-letter queue rows.
- ``GET /_bao/admin/audit`` - the last 200 audit log rows.

Every page is gated by ``current_user``; anonymous traffic redirects to
``/login``. Templates live under ``src/bao/templates/admin/``, mounted via a
``ChoiceLoader`` so user projects can shadow any page by writing a file at the
same relative path (e.g. ``app/views/admin/index.html``).

Toggle with ``Config.admin_enabled`` / env ``BAO_ADMIN_ENABLED=0``.

## Workflow visualization + replay

- ``GET /_bao/workflows/<run_id>`` renders the step graph (ord, name, status,
  attempts, error, truncated JSON result preview) and subscribes to the
  ``workflow`` SSE channel for live updates.
- POST ``/_bao/workflows/<run_id>/replay`` (the dashboard's Replay button)
  invokes ``bao.workflows.replay(run_id)``; cached step results short-circuit
  completed steps, so only failed / missing ones re-execute.
- ``bao workflows:replay RUN_ID [--deterministic-check]`` is the CLI mirror.
- ``replay(run_id, deterministic_check=True)`` is opt-in: it re-runs the
  workflow in a throwaway sandbox and logs a WARNING when the cached and
  sandbox results diverge, which helps spot workflows that read
  ``time.time()`` / ``random`` outside any ``step()``.
- Cancel is reserved for Phase 3 (no-op stub button on the detail page).

## Production endpoints (health / readiness / metrics + graceful shutdown)

Three operator-facing routes ship mounted automatically by ``App.build()``:

- ``GET /healthz`` -> liveness probe. 200 ``{"ok": true}`` while the process
  is up; 503 ``{"ok": false, "reason": "shutting_down"}`` mid-shutdown.
- ``GET /readyz`` -> readiness probe. Returns 200 ``{"ok": true, "checks": {
  "database": true, "plugin:foo": true, ...}}`` when the DB ``SELECT 1`` works
  and no plugin recorded a startup error; otherwise 503, with each failing
  entry replaced by a ``"TypeName: message"`` string.
- ``GET /metrics`` -> Prometheus text-format scrape, gated by
  ``Config.metrics_enabled`` (env: ``BAO_METRICS_ENABLED=0`` to drop).
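
The ``/readyz`` payload shape above can be sketched in plain Python. This is an illustration under stated assumptions, not Bao's handler: ``readiness`` and the two check functions are hypothetical names, and a check is taken to pass when it returns without raising.

```python
from typing import Callable


def readiness(checks: dict[str, Callable[[], None]]) -> tuple[int, dict]:
    """Run each check and build a (status_code, body) pair."""
    results: dict[str, object] = {}
    for name, check in checks.items():
        try:
            check()
            results[name] = True
        except Exception as exc:
            # A failing entry is replaced by a "TypeName: message" string.
            results[name] = f"{type(exc).__name__}: {exc}"
    ok = all(value is True for value in results.values())
    return (200 if ok else 503), {"ok": ok, "checks": results}


def database_ok() -> None:
    pass  # stand-in for a real `SELECT 1`


def plugin_broken() -> None:
    raise RuntimeError("redis unreachable")


status, body = readiness({"database": database_ok, "plugin:foo": plugin_broken})
```

A probe consumer only needs the status code; the ``checks`` map is there so an operator can see which dependency tripped.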

Built-in metrics:

- ``bao_http_requests_total{method, route, status}`` counter.
- ``bao_http_request_duration_seconds{method, route}`` histogram.
- ``bao_workflow_runs_total{status}`` counter (terminal status only).
- ``bao_workflow_step_total{status}`` counter.
- ``bao_task_inflight`` gauge (live reader over the IO + CPU semaphores).
- ``bao_agent_runs_total{result}`` counter (``completed``, ``failed``,
  ``awaiting_approval``, ``max_steps_exceeded``).

Graceful shutdown (ASGI lifespan stop):

1. ``bao.health.mark_shutting_down()`` flips a process-wide flag.
2. ``ShutdownShedMiddleware`` 503s new HTTP requests; ``/healthz`` and
   ``/readyz`` are allowlisted so the operator can read the reason.
3. ``bao.tasks.drain_pools(timeout=Config.shutdown_drain_timeout)`` shuts
   the IO + CPU pools with ``wait=True`` up to the timeout.
4. ``bao.tasks.stop_scheduler_thread()`` signals the ``@every`` scheduler
   to exit.
5. Each plugin's ``async shutdown()`` runs in reverse registration order.

An exception raised by a plugin's ``startup`` is recorded on the plugin instance
(``_bao_startup_error``) so ``/readyz`` can report which dependency tripped.
Lifespan never re-raises a plugin startup failure: a single broken plugin
should not take the whole app down at boot.

## Security defaults (do not disable without reason)

- Security headers on every response: `Content-Security-Policy` (default-src 'self'),
  `X-Frame-Options: DENY`, `X-Content-Type-Options: nosniff`,
  `Referrer-Policy: strict-origin-when-cross-origin`.
- Templates autoescape (Jinja2 with `select_autoescape(default=True)`).
- Parameterized queries via SQLAlchemy 2.0; never build SQL by string concatenation.
- CSRF is enforced on unsafe-method (POST/PUT/PATCH/DELETE) form posts. Templates
  automatically receive ``csrf_token``; include
  ``<input type="hidden" name="_csrf" value="{{ csrf_token }}">`` as the first child
  of every ``<form method="post">``. Missing or wrong tokens return 403.
- Sessions are signed cookies (``bao_session``) keyed by ``BAO_SECRET_KEY``.
- Config secrets come from the environment (`BAO_*` vars), never hard-code them.

## Observability (Phase 2 lite)

Three small primitives, all mounted automatically by ``App``:

- **Request id.** ``RequestIdMiddleware`` reads ``X-Request-ID`` from the request or
  generates a UUID4, binds it to ``bao.current_request_id`` (a ContextVar), and
  echoes the same id in the response header. Use it to correlate logs across
  services.
- **Structured logging.** ``bao.get_logger("my.module")`` returns a stdlib logger
  whose root handler emits one JSON line per record (time / level / name / msg /
  request_id / any ``extra={...}`` fields). Level comes from ``Config.log_level``
  (env var ``BAO_LOG_LEVEL``); default INFO.
- **Audit log.** ``bao.AuditLog`` is a Bao model; ``bao.audit("user.login",
  actor_id=..., resource_type=..., resource_id=..., data={...})`` inserts a row
  tagged with the active request id automatically. Tolerates a missing DB context.

```python
from bao import audit, get_logger

log = get_logger(__name__)

def login_submit(self, request, form):
    log.info("auth.attempt", extra={"email": form["email"]})
    user = authenticate(form)  # hypothetical helper: your own credential check
    audit("user.login", actor_id=user.id, resource_type="user", resource_id=user.id)
```
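
The request-id and structured-logging primitives above can be approximated with the standard library alone. This is a sketch of the idea, not Bao's middleware: ``current_request_id``, ``bind_request_id``, and ``JsonFormatter`` here are local stand-ins, and headers are modelled as a plain dict (real HTTP headers are case-insensitive).

```python
import contextvars
import json
import logging
import uuid

# Local stand-in for bao.current_request_id.
current_request_id = contextvars.ContextVar("request_id", default=None)


def bind_request_id(headers: dict) -> str:
    """Reuse the inbound X-Request-ID, or mint a UUID4, and bind it to the context."""
    request_id = headers.get("X-Request-ID") or str(uuid.uuid4())
    current_request_id.set(request_id)
    return request_id


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log record, tagged with the active request id."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "name": record.name,
            "msg": record.getMessage(),
            "request_id": current_request_id.get(),
        })


bind_request_id({"X-Request-ID": "req-123"})
record = logging.LogRecord("my.module", logging.INFO, "example.py", 1, "auth.attempt", None, None)
line = JsonFormatter().format(record)
generated = bind_request_id({})  # no inbound header: a fresh UUID4 string
```

Because the id lives in a ContextVar, every log call made while handling the request picks it up without threading it through function arguments.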

CLI: ``bao logs --tail N --action ACTION --actor ID`` prints the most recent
``AuditLog`` rows. Create the table with ``bao make:migration "create audit log"
&& bao migrate``.

## Fault tolerance (@retry, CircuitBreaker, DLQ)

General-purpose primitives, independent of the workflow engine. Wrap any callable,
plugin handler, HTTP client, or background task.

```python
from bao import retry, CircuitBreaker, CircuitOpen, dead_letter

@retry(attempts=3, backoff=0.5, jitter=0.1, exceptions=(ConnectionError,))
def fetch_user(uid):
    return http.get(f"/users/{uid}")  # `http` stands for your own HTTP client

cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
def call_payments(payload):
    with cb:           # raises CircuitOpen when the breaker is open
        return ...

# On final failure, hand the message off to the dead-letter queue:
try:
    deliver_email(msg)
except Exception as exc:
    dead_letter("emails", {"to": msg.to, "body": msg.body}, exc, attempts=3)
```

- ``@retry`` retries on the listed exception classes and RAISES the original
  exception after the final attempt (no silent swallow). Each retry logs at
  WARNING with attempt / delay / exception.
- ``CircuitBreaker`` is the standard 3-state breaker (closed / open / half_open).
  Use as a context manager; thread-safe; ``CircuitOpen`` fires while open.
- ``DLQ`` is a Bao Model carrying ``(queue, payload_json, error, attempts,
  created_at, replayed_at)``. ``dead_letter(queue, payload, error)`` inserts a row;
  ``bao dlq:list [--queue X] [--tail N]`` enumerates; ``bao dlq:replay ID`` marks
  the row replayed and prints its payload (Phase 1a: no auto-resubmission, because
  the framework can't know which handler the payload belonged to).

Create the ``dlq`` table with ``bao make:migration "create dlq" && bao migrate``.
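
For readers new to the pattern, the three breaker states can be sketched as follows. This is not Bao's ``CircuitBreaker``: ``SketchBreaker`` and ``CircuitOpenError`` are hypothetical names, the sketch is single-threaded (no lock), and the injectable ``clock`` exists only to make the example deterministic.

```python
import time


class CircuitOpenError(Exception):
    """Raised while the breaker refuses calls (stand-in for CircuitOpen)."""


class SketchBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def __enter__(self):
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("breaker is open")
            self.state = "half_open"  # timeout elapsed: allow one trial call
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.failures = 0
            self.state = "closed"
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
        return False  # never swallow the caller's exception


def attempt(breaker, fail):
    """Return 'ok', 'failed', or 'rejected' for one guarded call."""
    try:
        with breaker:
            if fail:
                raise ConnectionError("down")
        return "ok"
    except CircuitOpenError:
        return "rejected"
    except ConnectionError:
        return "failed"


now = [0.0]  # fake clock so the example does not sleep
cb = SketchBreaker(failure_threshold=2, recovery_timeout=10.0, clock=lambda: now[0])
outcomes = [attempt(cb, True), attempt(cb, True), attempt(cb, False)]
now[0] = 11.0  # past the recovery timeout: the next call is the half-open trial
outcomes.append(attempt(cb, False))
```

The third call is rejected without running its body, which is the point of the breaker: a struggling dependency gets a quiet period instead of more traffic.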

## Background tasks (@task / background / @every)

Three primitives, all in-process, bounded by default.

- ``@task(kind="io" | "cpu", max_concurrency=None)`` marks a function as background
  work. Calling the decorated function submits to the right pool and returns a
  ``concurrent.futures.Future``:
  - ``kind="io"`` uses a ``ThreadPoolExecutor`` (size ``Config.io_pool``, default 32).
    Use for HTTP, DB, file I/O.
  - ``kind="cpu"`` uses a ``ProcessPoolExecutor`` (size ``Config.cpu_pool``, default
    ``os.cpu_count()``). The function MUST be picklable: define it at module top
    level (no closures, no lambdas, no class methods).
  - ``max_concurrency`` (IO only) wraps the worker in a ``threading.Semaphore`` so
    only N copies of the same task run at once. In-process; processes don't share.
- ``background(fn, *args, **kwargs)`` submits to the IO pool fire-and-forget; the
  call returns ``None``. Failures inside ``fn`` are caught and logged via
  ``bao.get_logger("bao.tasks")`` and never re-raised. Intended for "send the email
  after returning the response" use cases.
- ``@every("30s" | "5m" | "1h")`` registers a recurring schedule. ``App.build()``
  starts a daemon scheduler thread when ``Config.enable_scheduler=True`` (default
  off; tests stay deterministic). ``bao scheduler:run`` blocks in the foreground;
  ``bao scheduler:run --once`` fires due tasks in one pass and exits.

```python
from bao.tasks import task, background, every

@task(kind="io")
def send_welcome(user_id: int) -> None:
    ...

@task(kind="cpu")
def hash_password(password: str) -> str:  # picklable top-level
    ...

@every("5m")
def refresh_caches() -> None:
    ...

# In a controller:
def register_submit(self, request, form):
    user = User.create(email=form["email"])
    background(send_welcome, user.id)         # fire and forget
    return redirect("/")
```

Bounded by default. Each pool has a semaphore-bounded submit queue sized by
``Config.max_inflight_tasks`` (default 500). When full:

- ``@task``-decorated calls raise ``bao.tasks.QueueFull`` so the caller can shed
  load (sleep + retry, drop the request, etc.).
- ``background(...)`` logs the error and drops the work, because the caller has
  already returned its response.
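
The bounded-submit behaviour can be sketched with a non-blocking semaphore in front of a standard executor. This is one plausible shape, not Bao's pool code: ``BoundedPool`` and ``QueueFullError`` are hypothetical names.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class QueueFullError(Exception):
    """Stand-in for bao.tasks.QueueFull."""


class BoundedPool:
    def __init__(self, workers: int, max_inflight: int):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._slots = threading.BoundedSemaphore(max_inflight)

    def submit(self, fn, *args, **kwargs) -> Future:
        # Non-blocking acquire: shed load instead of queueing without limit.
        if not self._slots.acquire(blocking=False):
            raise QueueFullError("too many in-flight tasks")
        try:
            future = self._pool.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()
            raise
        # Free the slot when the task finishes, whether it succeeded or failed.
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


gate = threading.Event()
pool = BoundedPool(workers=1, max_inflight=2)
first = pool.submit(gate.wait)          # occupies the only worker
second = pool.submit(lambda: "done")    # waits in the queue
try:
    pool.submit(lambda: "overflow")     # third in-flight task: over the bound
    rejected = False
except QueueFullError:
    rejected = True
gate.set()
result = second.result(timeout=5)
pool.shutdown()
```

Raising at submit time keeps the decision with the caller, which matches the split above: a ``@task`` caller can retry or shed, while a fire-and-forget path can only log and drop.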

CLI:

- ``bao tasks:list`` enumerates registered ``@task`` and ``@every`` entries.
- ``bao tasks:run NAME [args...]`` invokes a task synchronously for operator
  testing (positional args pass through as strings; no coercion).
- ``bao scheduler:run [--once]`` runs the @every scheduler.

Honest caveats:

- The scheduler is single-process. Under ``bao serve --workers 4``, each of the
  four worker processes fires the schedule independently. For once-only
  execution use a real scheduler (cron, Kubernetes CronJob, etc.) or an
  external lock.
- ``@every`` accepts the simple ``Ns | Nm | Nh`` form *and* cron-ish forms:
  ``"every day at HH:MM"`` and ``"<weekday> at HH:MM"`` (e.g.
  ``"monday at 9am"``, ``"every day at 06:30"``). Unknown forms raise
  ``ValueError`` listing every supported shape. Richer cron strings (every Nth
  day-of-month, multiple times per day) are out of scope; pull in a real cron
  library if you need them.
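
The simple ``Ns | Nm | Nh`` form is easy to picture as a tiny parser. A minimal sketch, assuming a zero interval is rejected; ``parse_interval`` is a hypothetical helper and does not cover the cron-ish forms.

```python
import re

_UNITS = {"s": 1, "m": 60, "h": 3600}
_INTERVAL = re.compile(r"(\d+)([smh])")


def parse_interval(spec: str) -> int:
    """Convert '30s' / '5m' / '1h' into a number of seconds."""
    match = _INTERVAL.fullmatch(spec.strip().lower())
    # Assumption: a zero-length interval is treated as invalid.
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"unsupported schedule {spec!r}; expected Ns, Nm, or Nh")
    count, unit = match.groups()
    return int(count) * _UNITS[unit]


try:
    parse_interval("every fortnight")
    error = None
except ValueError as exc:
    error = str(exc)
```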

## AI router (bao.ai)

A single API across LLM providers with fallback + bounded budgets. ``App()``
installs an ``EchoProvider``-only router by default so every test runs without
network calls. Production projects swap in real providers via
``ai.configure(Router([...]))`` in ``main.py``.

```python
from bao import ai
from bao.ai import Router, AnthropicProvider, OpenAIProvider, OllamaProvider

# Once at boot:
ai.configure(Router(
    providers=[AnthropicProvider(), OpenAIProvider(), OllamaProvider()],
    default="anthropic",
))

# Anywhere:
text = ai.ask("write a haiku about retries")
result = ai.complete(messages, model="claude-haiku-4-5-20251001")
vectors = ai.embed(["hello", "world"])
```

The router tries providers in order; when one fails, the error is logged and
the next provider is attempted. ``RouterAllProvidersFailed`` (carrying every
error tried) is raised only when nothing succeeded.
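
The fallback order can be sketched without any provider SDK. This is an illustration of the pattern only: ``complete_with_fallback`` and ``AllProvidersFailed`` are hypothetical names, and providers are modelled as plain ``(name, callable)`` pairs.

```python
class AllProvidersFailed(Exception):
    """Stand-in for RouterAllProvidersFailed; keeps every error that was tried."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} provider(s) failed")
        self.errors = errors


def complete_with_fallback(providers, prompt):
    """Try each (name, callable) pair in order; return the first success."""
    errors = []
    for name, call in providers:
        try:
            return name, call(prompt)
        except Exception as exc:
            errors.append((name, exc))  # a real router would also log here
    raise AllProvidersFailed(errors)


def flaky(prompt):
    raise TimeoutError("upstream timed out")


def echo(prompt):
    return prompt[::-1]  # reversed text, like the Echo stand-in


winner, text = complete_with_fallback([("primary", flaky), ("echo", echo)], "abc")

try:
    complete_with_fallback([("primary", flaky)], "abc")
    failed_names = []
except AllProvidersFailed as exc:
    failed_names = [name for name, _ in exc.errors]
```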

Bound spend with ``with_budget``:

```python
from bao.ai import with_budget, BudgetExceeded

with with_budget(max_tokens=2000, max_cost_usd=0.10) as b:
    ai.ask("summarise the manifesto")    # raises BudgetExceeded on overflow
print(b.tokens_used, b.cost_used)
```

``EchoProvider`` is the deterministic test stand-in: ``complete`` returns the
last user message reversed, ``embed`` hashes each text into a 32-dim float
vector. Cost is always zero. Honest caveat: hash-based "embeddings" are
deterministic but not semantic; real semantic retrieval requires a real
embedding model.

Real provider wrappers (lazy imports, raise ``ProviderNotConfigured`` without
keys):

```bash
pip install anthropic     # AnthropicProvider; key in ANTHROPIC_API_KEY
pip install openai        # OpenAIProvider; key in OPENAI_API_KEY
# OllamaProvider only needs httpx (already a Bao dep)
```

CLI:

- ``bao ai:ask "prompt"`` -> run a completion through the active router.
- ``bao ai:stream "prompt"`` -> stream a completion through the active router.
- ``bao ai:providers`` -> list configured providers + the default.

### Streaming (``ai.stream`` / ``ai.stream_messages``)

``ai.stream(prompt)`` and ``ai.stream_messages(messages)`` return an iterator
of text chunks. The router walks providers in attempt order and picks the
first whose ``stream()`` does not raise ``NotImplementedError`` and does not
fail before yielding the first chunk; once a chunk is on the wire the router
will not fall back (replaying tokens would confuse clients). Budget
bookkeeping deducts on iterator close.

```python
from bao import ai
for chunk in ai.stream("explain backpressure"):
    print(chunk, end="", flush=True)
```

Echo yields one character per chunk; ``ScriptedProvider(script,
stream_chunks=[...])`` lets a test feed pre-canned chunk lists. Real-provider
wrappers (Anthropic ``messages.stream``, OpenAI chat completions stream,
Ollama ``/api/chat`` with ``stream: true``) implement ``stream`` against
their native streaming APIs.
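
The "fall back only before the first chunk" rule can be sketched as a generator. This is not the router's code: ``stream_with_fallback`` is a hypothetical helper, providers are plain callables that return an iterator of chunks, and treating an empty stream as a successful empty answer is an assumption.

```python
from typing import Callable, Iterable, Iterator


def stream_with_fallback(
    providers: Iterable[Callable[[str], Iterator[str]]], prompt: str
) -> Iterator[str]:
    """Yield chunks from the first provider that produces a first chunk."""
    errors = []
    for provider in providers:
        try:
            chunks = iter(provider(prompt))
            first = next(chunks)  # failures up to here still allow fallback
        except StopIteration:
            return  # assumption: an empty stream counts as an empty answer
        except Exception as exc:  # includes NotImplementedError
            errors.append(exc)
            continue
        yield first
        # Committed: errors from here on propagate instead of falling back.
        yield from chunks
        return
    raise RuntimeError(f"no provider could stream: {errors!r}")


def no_stream(prompt):
    raise NotImplementedError("provider cannot stream")


def broken_before_first_chunk(prompt):
    raise ConnectionError("refused")
    yield  # unreachable, but makes this a generator: the error fires on next()


def char_stream(prompt):
    yield from prompt  # one character per chunk


chunks = list(
    stream_with_fallback([no_stream, broken_before_first_chunk, char_stream], "hi")
)
```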

## AI agent runtime (@agent + @tool + HITL)

An ``@agent`` is a Python class. Methods decorated ``@tool`` become callable by
the LLM through a tiny FSM protocol:

    CALL: tool_name(arg1, arg2, key=value)
    FINAL: <answer>

Bao parses one directive per assistant turn, runs the tool, feeds the result
back, and loops up to ``max_steps`` iterations. The loop is provider-agnostic
(works against any ``bao.ai`` router) and bounded by both ``max_steps`` and an
optional ``Budget(max_tokens, max_cost_usd)``.
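
One way to parse such a directive safely is to lean on Python's own ``ast`` module. This is a sketch of the idea, not Bao's parser: ``parse_directive`` is a hypothetical helper, and it assumes arguments are Python literals.

```python
import ast


def parse_directive(line: str):
    """Return ('final', text) or ('call', name, args, kwargs) for one assistant turn."""
    line = line.strip()
    if line.startswith("FINAL:"):
        return ("final", line[len("FINAL:"):].strip())
    if not line.startswith("CALL:"):
        raise ValueError(f"expected CALL: or FINAL:, got {line!r}")
    node = ast.parse(line[len("CALL:"):].strip(), mode="eval").body
    if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)):
        raise ValueError("CALL: must look like tool_name(arg, key=value)")
    if any(kw.arg is None for kw in node.keywords):
        raise ValueError("**kwargs expansion is not allowed")
    # literal_eval accepts only literals, so arguments are never executed as code.
    args = [ast.literal_eval(arg) for arg in node.args]
    kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
    return ("call", node.func.id, args, kwargs)


call = parse_directive("CALL: adder(2, b=3)")
final = parse_directive("FINAL: 5")
```

The tool name comes back as a string, so the loop can look it up in the agent's registered tools rather than evaluating anything the model wrote.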

```python
from bao.agents import agent, tool

@agent("Mathy", max_steps=4, max_tokens=2000)
class Mathy:
    @tool()
    def adder(self, a: int, b: int) -> int:
        return a + b

    @tool(requires_approval=True, scope="write")
    def post(self, channel: str, text: str) -> str:
        ...

result = Mathy().handle("what is 2+3?")
```

Human-in-the-loop: ``@tool(requires_approval=True)`` writes an
``AgentApprovalRequest`` row and raises ``AwaitingApproval``. An operator runs
``bao agents:approve ID`` (or ``deny``); on the next ``handle()`` call with the
same logical ``run_id`` the loop sees the approval and proceeds.

Inside a ``@workflow``, wrap the ``handle()`` call in ``step(...)``; the
existing workflow retry / signal machinery handles the pause-and-resume.

CLI:

- ``bao agents:list`` -> registered agent names + tool counts.
- ``bao agents:run AGENT "message" [--run-id RUN_ID]`` -> run in the foreground;
  prints the final answer + tool-call trace.
- ``bao agents:pending`` -> list pending HITL approval requests.
- ``bao agents:approve REQUEST_ID`` / ``bao agents:deny REQUEST_ID``.

Migration: ``bao make:migration "create agent approvals" && bao migrate``
(model is ``AgentApprovalRequest`` in ``bao.agents``).

Honest caveat: the default loop is a provider-agnostic FSM. Providers that
expose a native tool-use protocol (Anthropic tool_use blocks, OpenAI function
calling) take the native path described under "Native tool-use" below; the
FSM is small enough to reason about and is the fallback for everything else.

### Native tool-use (transparent)

When the active provider declares ``supports_native_tools = True``
(Anthropic, OpenAI, ``MockNativeProvider``), the loop uses
``complete_with_tools(messages, tools)`` instead of parsing ``CALL:`` /
``FINAL:``. Tools are passed as ``[{"name", "description", "input_schema"},
...]`` and each provider translates that into its native shape
(Anthropic ``tools=[{...}]``; OpenAI ``tools=[{type:'function', function:{...}}]``).

The ``input_schema`` is inferred from each Python tool's signature
(``str / int / float / bool``; anything else falls back to ``string``); this
keeps the surface honest but small. HITL approvals fire the same way as the
FSM path (a ``requires_approval=True`` tool raises ``AwaitingApproval``).
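
Signature-based inference of this kind can be sketched with ``inspect``. A minimal sketch, not Bao's implementation: ``infer_input_schema`` is a hypothetical helper, and marking parameters without defaults as ``required`` is an assumption.

```python
import inspect

_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def infer_input_schema(fn) -> dict:
    """Build a JSON-Schema-style object from a function's annotated parameters."""
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        if name == "self":
            continue
        # Anything outside str / int / float / bool falls back to "string".
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # assumption: no default means required
    return {"type": "object", "properties": properties, "required": required}


def post(self, channel: str, retries: int = 3, payload: dict = None) -> str:
    ...


schema = infer_input_schema(post)
```

Note that string annotations (for example under ``from __future__ import annotations``) would all hit the ``string`` fallback in this sketch.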

Echo, Ollama, and ``ScriptedProvider`` keep the FSM path. Tests use
``MockNativeProvider`` (deterministic, scripted) to exercise the native loop
without real API keys; real-provider integration is exercised at run time
when ``ANTHROPIC_API_KEY`` / ``OPENAI_API_KEY`` are set.

## Memory (short-term + vector)

Two memory shapes, both opt-in:

```python
from bao import remember, recall, VectorMemory

remember("user asked about retries", role="user")    # short-term, ContextVar-backed
recent = recall(n=10)                                 # most-recent N, oldest first

vm = VectorMemory("kb", dim=32)
vm.add("Bao bounds every background task by default.")
hits = vm.search("how are background tasks bounded?", k=3)  # [(text, score), ...]
vm.prune(max_age_days=30, max_items=1000)
```

Short-term memory is a ``Memory`` ring buffer (default ``max_items=50``)
pinned to a ContextVar. ``install_memory()`` plants a fresh one explicitly;
``remember()`` lazily creates one on first call. Older entries evict FIFO.
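
The ring-buffer behaviour maps directly onto ``collections.deque``. A sketch under stated assumptions: ``remember_sketch`` and ``recall_sketch`` are hypothetical stand-ins, and ``max_items`` only takes effect when the buffer is first created.

```python
import contextvars
from collections import deque

_memory = contextvars.ContextVar("memory", default=None)


def remember_sketch(text: str, role: str = "user", max_items: int = 50) -> None:
    """Append an entry, creating the buffer lazily on first use."""
    buf = _memory.get()
    if buf is None:
        buf = deque(maxlen=max_items)  # a full deque evicts its oldest entry
        _memory.set(buf)
    buf.append((role, text))


def recall_sketch(n: int = 10) -> list:
    """Return the most recent n entries, oldest first."""
    buf = _memory.get()
    if not buf or n <= 0:
        return []
    return list(buf)[-n:]


for i in range(5):
    remember_sketch(str(i), max_items=3)
latest_two = recall_sketch(2)
everything = recall_sketch(10)
```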

Vector memory persists ``(name, text, embedding_json, created_at)`` rows in
the shared ``memory_items`` table. ``add()`` embeds via the active ``bao.ai``
router unless an explicit embedding is supplied. ``search(query, k)`` ranks
the collection by cosine similarity. Optional ``sqlite-vec`` (``pip install
sqlite-vec`` or ``pip install bao-framework[vec]``) accelerates large collections via a
``vec_<name>`` ``vec0`` virtual table; without it the fallback is a
pure-Python cosine walk over every row. Both paths return the same shape
``[(text, score), ...]``.
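
The pure-Python fallback amounts to scoring every row and sorting. A minimal sketch of that walk, assuming rows are ``(text, embedding)`` pairs; ``cosine`` and ``search`` are hypothetical helpers, not the ``VectorMemory`` methods.

```python
import math


def cosine(a, b) -> float:
    """Cosine similarity; returns 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(rows, query_vec, k=3):
    """rows: iterable of (text, embedding). Returns [(text, score), ...] best first."""
    scored = [(text, cosine(vec, query_vec)) for text, vec in rows]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


rows = [("tasks", [1.0, 0.0]), ("routes", [0.0, 1.0]), ("pools", [1.0, 1.0])]
hits = search(rows, [1.0, 0.0], k=2)
```

The cost is linear in the number of rows per query, which is why the text above points at ``sqlite-vec`` for large collections.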

Agents opt in via ``@agent(memory='short')`` or ``@agent(memory='vector',
memory_name='kb')``. The agent loop prepends the retrieved content to the
system prompt as a labelled CONTEXT block; the loop treats it as data, never
instructions (prompt-injection defence is the project's responsibility, but
the framing helps).

CLI:

- ``bao memory:add NAME "text"`` -> embed and store.
- ``bao memory:search NAME "query" --k 5`` -> top-k by cosine.
- ``bao memory:prune NAME --max-age 30d --max-items 1000`` -> retention.

Migration: ``bao make:migration "create memory items" && bao migrate``.

Honest caveat: ``ai.embed`` defaults to the active router. With Echo the
embeddings are hash-derived (deterministic but not semantic). Production
projects must wire a real embedding provider (OpenAI ``text-embedding-3-small``,
self-hosted bge, etc.) before relying on semantic recall.

## RAG (Retrieval-Augmented Generation)

```python
from bao.rag import ingest, retrieve, chunk_text
from bao import ai

# Ingest: chunk + embed + persist
doc = ingest("Bao manifesto", manifesto_text, source="docs/manifesto.md")

# Retrieve: top-k chunks by cosine similarity across every document
chunks = retrieve("how does bao isolate tenants?", k=4)

# Ask with retrieved context (chunks become a CONTEXT block in the system message)
answer = ai.ask_with_context("how does bao isolate tenants?", k=4)
```

Pipeline:

1. ``chunk_text(text, size=800, overlap=80)`` slices the body into
   character-based chunks. Honest caveat: character-based, not tokenizer-aware
   (good for English prose; pre-chunk with a real tokenizer for non-Latin
   scripts or strict token budgets).
2. ``ai.embed`` produces a vector per chunk via the active router (Echo's
   hash vectors for tests; OpenAI / Ollama / Anthropic-via-embedding-service
   for production).
3. Rows land in the ``documents`` + ``chunks`` tables; embeddings are stored
   as JSON-encoded float lists.
4. ``retrieve`` walks every chunk, scores by cosine similarity, returns the
   top-k. Pure-Python sort; install ``sqlite-vec`` or move to pgvector /
   qdrant for large collections.
5. ``ai.ask_with_context`` wraps the chunks in a labelled CONTEXT block
   ("data, not instructions") and prepends them to the prompt.

Agent integration: ``@agent(rag=True)`` opts an agent's loop into automatic
retrieval before each step. The loop tolerates an empty store (no crash if
``documents`` is still empty).
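
Step 1 of the pipeline above, character-based chunking with overlap, can be sketched as a sliding window. This is an illustration, not the library's ``chunk_text``: ``chunk_text_sketch`` is a hypothetical name and its handling of the final window is an assumption.

```python
def chunk_text_sketch(text: str, size: int = 800, overlap: int = 80) -> list[str]:
    """Split text into windows of `size` characters that share `overlap` characters."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


chunks = chunk_text_sketch("abcdefghij", size=4, overlap=1)
```

Each chunk repeats the last ``overlap`` characters of its predecessor, so a sentence cut at a boundary still appears whole in at least one chunk when it is shorter than the overlap.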

CLI:

- ``bao rag:ingest --title "T" --source URL_OR_PATH (--text "..." | --from-file PATH)`` -> chunk + embed + persist.
- ``bao rag:retrieve "query" --k 5`` -> top-k chunk excerpts.
- ``bao rag:ask "question" --k 4`` -> ``ai.ask_with_context`` end-to-end.

Migration: ``bao make:migration "create documents chunks" && bao migrate``.

Prompt-injection caveat: retrieved chunks may come from user-controlled
sources. The CONTEXT envelope makes the framing explicit, but a sufficiently
clever document could still try to subvert the model. Sanitise / quarantine
untrusted documents at ingest time.

## MCP server (drive Bao from an AI agent)

`bao mcp:serve` runs an MCP stdio server that publishes Bao's CLI surface as tools.
Any MCP-capable client (Claude Code, Cursor, etc.) can introspect the app and call
generators without shelling out.

Tools:

- Read (always available):
  - **Build / discover.** `bao_describe`, `bao_check`, `bao_list`, `bao_inspect`,
    `bao_routes_table`.
  - **Operate / debug.** `bao_runs(status?, limit?)`, `bao_trace(run_id)`,
    `bao_agent_trace(run_id)`, `bao_health()`, `bao_dlq_list(queue?, limit?)`,
    `bao_logs(action?, actor_id?, limit?)`.
- Write (gated by `BAO_MCP_ALLOW_WRITE=1`):
  - **Build / scaffold.** `bao_make_controller`, `bao_make_crud`.
  - **Operate / mutate.** `bao_workflow_replay(run_id)`,
    `bao_workflow_resume(run_id)`, `bao_dlq_replay(id)`,
    `bao_agent_approve(request_id)`, `bao_agent_deny(request_id)`.

`bao_list` / `bao_inspect` accept `kind` in:
`models / routes / controllers / plugins / workflows / tasks / events / api_resources`.

`bao_agent_trace` returns the persisted approval history for a run plus a note
pointing at the in-process `bao.agents.<run_id>` pubsub topic; live deltas /
tool_result / final events stream there while the agent is running.

Sample client config (Claude Code / Cursor / etc.):

```json
{
  "mcpServers": {
    "bao": {
      "command": "bao",
      "args": ["mcp:serve"]
    }
  }
}
```

Write tools default to OFF; set `BAO_MCP_ALLOW_WRITE=1` in the server's environment
to enable them (both scaffolding and the mutating operate tools). The forbidden
response is structured (``{ok: false, error: "forbidden", detail: ...}``) so an
agent gets a uniform error shape.

## Deferred (not in v0.1.0)

Items the manifesto names but that v0.1.0 deliberately ships without:

- **Distributed execution (Phase 4a).** Bounded pools, the scheduler, and
  `bao.events` are single-process. Cross-worker queue + leader-elected
  scheduler + Redis Streams pubsub are explicitly deferred (Section 9 of the
  manifesto). Production deployments running `bao serve --workers N` should
  expect each worker to fire `@every` independently and `bao.events`
  subscriptions to be per-worker; use a real scheduler (cron, Kubernetes
  CronJob) when uniqueness matters.
- **Native LLM tool-use is partial.** Anthropic and OpenAI providers
  implement `complete_with_tools`; Ollama does not (yet). The agent loop
  falls back to the `CALL:` / `FINAL:` FSM for non-native providers
  transparently. Streaming is partial too: Anthropic / OpenAI / Ollama
  wrap their native streaming APIs for `ai.stream`, but native tool-use agent
  runs do not yet emit `delta` events; only the FSM path streams deltas.
- **Workflow DSL.** Workflows are Python today; the YAML/JSON DSL the
  manifesto names is open work.
- **Plugin marketplace + one-command deploy + GPU scheduling.** Vision-tier;
  not in v0.1.0.
- **True plugin sandboxing.** `Capabilities` declarations are advisory.
  Process / container isolation is an opt-in future iteration.

## Verifying your work

After changes, run `bao check`. If it fails it prints the exact error to fix. That is your feedback loop.
