Metadata-Version: 2.4
Name: cowo
Version: 0.1.0
Summary: codex workflows — multi-agent orchestration over the codex app-server: fan out, pipeline, and build DAGs of agents
Keywords: llm,codex,agents,orchestration,workflows,cowo
Author: yaitso
License-Expression: MIT
License-File: LICENSE
Classifier: Programming Language :: Python :: 3
Classifier: Development Status :: 3 - Alpha
Classifier: Typing :: Typed
Requires-Dist: cyclopts>=4.18.0
Requires-Dist: openai-codex>=0.1.0b2
Requires-Dist: pydantic>=2.13.4
Requires-Python: >=3.12
Project-URL: Homepage, https://github.com/yaitso/cowo
Project-URL: Repository, https://github.com/yaitso/cowo
Description-Content-Type: text/markdown

# cowo

multi-agent orchestration for python, built on the official [codex app-server](https://developers.openai.com/codex/app-server). fan out, pipeline, and compose **DAGs of agents** — each agent is a full codex thread (tools, sandbox, structured output), not a bare completion — all multiplexed over one long-lived process.

## why

the codex app-server already gives you threads, forking, mailboxes, structured output, and a whole agent-graph runtime — but its python SDK is a *single-client driver*. it has no conductor: no fan-out, no DAG, no tunable concurrency, no token budgeting across agents. `cowo` is that conductor.

- **`agent()`** — one codex thread, one turn, structured result. the leaf primitive.
- **`parallel` / `pipeline`** — imperative fan-out / staged flows.
- **`dag` + `task` + `>>`** — declarative agent graphs (airflow-style operators) scheduled over a **tunable concurrency** limiter — drain thousands of queued agents through N live-adjustable slots.
- **`session`** — a persistent multi-turn agent (a codex thread that remembers).
- **pydantic-native** — pass a `BaseModel` as the schema, get a validated instance back.
- **one process** — N agents = N threads multiplexed over a single app-server, not N subprocesses.

## install

```sh
uv add cowo          # distribution name; you still `import cowo`
```

the [`openai-codex`](https://pypi.org/project/openai-codex/) SDK (a dependency) bundles the codex binary — no separate install. you do need codex auth once:

```sh
codex login        # or set OPENAI_API_KEY
cowo doctor
```

## quickstart

```python
import asyncio
from pydantic import BaseModel
from cowo import agent, parallel, spent

class Fact(BaseModel):
    name: str
    year: int

async def main():
    answer = await agent("Reply with one word: pong")
    fact = await agent("Founding year of Tokyo.", schema=Fact)   # -> Fact(name='Tokyo', year=1457)
    cities = await parallel([
        (lambda c=c: agent(f"Capital of {c}? one word")) for c in ("France", "Japan")
    ])
    print(answer, fact, cities, "tokens:", spent.spent())

asyncio.run(main())
```

structured output is enforced by the app-server (`outputSchema`) — a schema'd agent can't return non-conforming JSON.

## DAGs of agents

declare the graph with airflow-style `>>`; the scheduler runs it over a concurrency limiter:

```python
from cowo import dag, task

a = task("Analyze module A.")
b = task("Analyze module B.")
synth = task(lambda ups: f"Synthesize these analyses:\n{ups}")

[a, b] >> synth                          # a, b run concurrently; synth gets their results
result = dag(synth).run(concurrency=8)   # drain the graph through 8 slots
```

the DAG is explicit data before it runs — inspectable, schedulable, journalable. concurrency is live-adjustable (`d.concurrency = 32`) for draining large queues.

## recursion

recursion isn't a primitive — it's a recursive python function whose leaves are `agent()` calls:

```python
async def summarize(chunks):
    if len(chunks) == 1:
        return await agent(f"Summarize:\n{chunks[0]}")
    mid = len(chunks) // 2
    parts = await parallel([(lambda h=h: summarize(h)) for h in (chunks[:mid], chunks[mid:])])
    return await agent("Merge these summaries:\n" + "\n".join(parts))
```

## api

- `agent(prompt, *, schema=None, model=None, effort=None, sandbox=None, cwd=None, label=None)` — one-shot agent (ephemeral thread). `schema` accepts a dict JSON Schema or a pydantic `BaseModel`. `sandbox=None` is yolo (full access, no approvals); pass `"read-only"`/`"workspace-write"` to restrict.
- `session(*, model=None, sandbox=None, cwd=None)` — `.send(prompt, schema=, effort=)`, a persistent thread across turns.
- `parallel(thunks)` / `pipeline(items, *stages)` — imperative fan-out / staged flow.
- `dag(*terminals).run(concurrency=N)` + `task(...)` + `>>` — declarative agent DAG.
- `spent` — `budget` with `.spent()` / `.remaining()` / `.total`, aggregated across all agents.
- `run(coro)` — `asyncio.run` that also closes the shared app-server.
- `COWO_CONCURRENCY` env — default global concurrency cap (`min(16, cpu-2)`).

## roadmap

codex ships a full actor runtime (`multi_agents_v2`: spawn/send/wait/close over a tree of threads with mailboxes and supervision). v0.2 will expose the **actor substrate** — message-passing "soup of agents", cyclic feedback (critic ⇄ generator), and `let-it-crash` supervision — with `dag`/`parallel`/`pipeline` re-expressed as topologies over it. see [ACTORS.md](ACTORS.md).

## license

MIT
