Metadata-Version: 2.4
Name: cowo
Version: 0.1.1
Summary: codex workflows — multi-agent orchestration over the codex app-server: fan out, pipeline, and build DAGs of agents
Keywords: llm,codex,agents,orchestration,workflows,cowo
Author: yaitso
License-Expression: MIT
License-File: LICENSE
Classifier: Programming Language :: Python :: 3
Classifier: Development Status :: 3 - Alpha
Classifier: Typing :: Typed
Requires-Dist: cyclopts>=4.18.0
Requires-Dist: pydantic>=2.13.4
Requires-Python: >=3.12
Project-URL: Homepage, https://github.com/yaitso/cowo
Project-URL: Repository, https://github.com/yaitso/cowo
Description-Content-Type: text/markdown

# cowo

**codex workflows.** orchestrate many codex agents from plain python. each agent is a real codex thread — tools, sandbox, structured output — and N agents run as N threads multiplexed over one `codex app-server` process. fan out, pipeline, build DAGs.

```python
from cowo import agent, parallel
caps = await parallel([(lambda c=c: agent(f"capital of {c}?")) for c in countries])
```

## install

```sh
uv add cowo
```

cowo drives *your* `codex` CLI over its app-server JSON-RPC — it bundles nothing, so it runs anywhere codex runs (no platform-locked wheels). put `codex` on PATH, auth once:

```sh
bun install -g @openai/codex   # or any codex install
codex login                    # or set OPENAI_API_KEY
cowo doctor
```

## quickstart

```python
import asyncio
from pydantic import BaseModel
from cowo import agent, parallel, spent

class Fact(BaseModel):
    name: str
    year: int

async def main():
    pong = await agent("Reply with one word: pong")
    fact = await agent("Founding year of Tokyo.", schema=Fact)   # Fact(name='Tokyo', year=1457)
    caps = await parallel([(lambda c=c: agent(f"Capital of {c}? one word")) for c in ("France", "Japan")])
    print(pong, fact, caps, "·", spent.spent(), "tokens")

asyncio.run(main())
```

pass a pydantic model as `schema` and codex enforces it (`outputSchema`) — a schema'd agent *cannot* return malformed JSON; you get the parsed instance back.

## DAGs of agents

declare the graph with airflow-style `>>`; it runs over a tunable concurrency limiter:

```python
from cowo import dag, task

a = task("Analyze module A.")
b = task("Analyze module B.")
synth = task(lambda ups: f"Synthesize:\n{ups}")

[a, b] >> synth                          # a, b run concurrently; synth receives both results
result = dag(synth).run(concurrency=8)   # drain the graph through 8 live-adjustable slots
```

the graph is data before it runs — inspectable, scheduled, journalable. `d.concurrency = 32` mid-run to drain a big queue faster.

## recursion

not a primitive — a recursive function whose leaves are `agent()` calls:

```python
async def summarize(chunks):
    if len(chunks) == 1:
        return await agent(f"Summarize:\n{chunks[0]}")
    mid = len(chunks) // 2
    parts = await parallel([(lambda h=h: summarize(h)) for h in (chunks[:mid], chunks[mid:])])
    return await agent("Merge:\n" + "\n".join(parts))
```

## api

| | |
|---|---|
| `agent(prompt, *, schema, model, effort, sandbox, cwd, label)` | one ephemeral thread + one turn. `schema` = dict or pydantic `BaseModel`. `sandbox=None` is yolo; pass `"read-only"`/`"workspace-write"` to restrict |
| `session(*, model, sandbox, cwd)` | `.send(prompt, schema=, effort=)` — a persistent thread that remembers |
| `parallel(thunks)` / `pipeline(items, *stages)` | imperative fan-out / staged flow |
| `dag(*terminals).run(concurrency=N)` + `task` + `>>` | declarative agent DAG |
| `spent` | `budget`: `.spent()` / `.remaining()` / `.total`, summed across all agents |
| `run(coro)` | `asyncio.run` that also closes the app-server |
| `COWO_CONCURRENCY` | global concurrency cap (default `min(16, cpu-2)`) |

## how it works

one `codex app-server` subprocess; `agent()` = `thread/start` + `turn/start`, events demuxed by `threadId`; budget summed from token-usage notifications. ~120 lines over the protocol — see [`client.py`](src/cowo/client.py).

## roadmap

codex ships a native actor runtime (`multi_agents_v2`: spawn/send/wait/close, mailboxes, supervision). v0.2 exposes it as a message-passing actor substrate — see [ACTORS.md](ACTORS.md).

MIT
