Metadata-Version: 2.4
Name: agentlead-flow-core
Version: 0.1.1
Summary: Define agents (tools + system prompt) in YAML and run them as an orchestrated pipeline.
Author-email: AI Agent Lead <255125103+ai-agent-lead@users.noreply.github.com>
License-Expression: MIT
Project-URL: Homepage, https://github.com/ai-agent-lead/agentic-flow
Project-URL: Repository, https://github.com/ai-agent-lead/agentic-flow
Project-URL: Documentation, https://github.com/ai-agent-lead/agentic-flow/blob/main/README.md
Project-URL: Changelog, https://github.com/ai-agent-lead/agentic-flow/blob/main/CHANGELOG.md
Project-URL: Issues, https://github.com/ai-agent-lead/agentic-flow/issues
Keywords: agents,llm,anthropic,claude,gemini,orchestration,pipeline
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: google-genai>=1.0
Requires-Dist: anthropic>=0.49
Requires-Dist: pyyaml>=6.0
Provides-Extra: dev
Requires-Dist: pytest>=8; extra == "dev"
Provides-Extra: mcp
Requires-Dist: mcp>=1.0; extra == "mcp"
Dynamic: license-file

# agentic-flow

A tiny framework for **declaring agents in YAML** — each with a system prompt and
a set of tools — and running them as a **graph** over a shared data state. Runs on
**Gemini (Google)** by default, with **Claude (Anthropic)** also built in and a
pluggable provider interface for the rest.

## The idea

You declare a **graph** of nodes (each wrapping an agent or a tool) and edges that
decide the next node — or a linear `pipeline:` that *lowers* to one. The framework
owns *how* it runs: a scheduler walks a *frontier* of ready nodes, writes each
node's result to a versioned channel, **checkpoints at every node boundary**, and
drives the tool-use loop inside each agent.

```
YAML  ─►  build_program  ─►  Graph  ─►  runtime scheduler
   entry ─► gather (agent) ─► analyze (agent) ─► write (tool) ─► __end__
                channels:  sources ─► claims ─► draft   (value + version + reducer)
```
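The diagram's channel triple (value + version + reducer) can be modeled in a few lines. This is a minimal sketch under stated assumptions, not `agentic_flow`'s `Store`: the `Channel` class and the `last_value`/`append_messages` reducers are hypothetical stand-ins, with `append_messages` only loosely mirroring the `add_messages` channel kind mentioned later.

```python
from dataclasses import dataclass
from typing import Any, Callable


def last_value(old: Any, new: Any) -> Any:
    """Overwrite reducer: the newest write wins (hypothetical)."""
    return new


def append_messages(old: Any, new: list) -> list:
    """Append reducer, a hypothetical stand-in for an add_messages channel."""
    return (old or []) + list(new)


@dataclass
class Channel:
    """One channel = value + version + reducer (illustrative only)."""
    reducer: Callable[[Any, Any], Any] = last_value
    value: Any = None
    version: int = 0

    def write(self, update: Any) -> None:
        # Fold the update into the current value, then bump the version so a
        # reader can detect a change by comparing version numbers.
        self.value = self.reducer(self.value, update)
        self.version += 1


draft = Channel()
draft.write("v1")
draft.write("v2")
print(draft.value, draft.version)  # v2 2

transcript = Channel(append_messages)
transcript.write([{"sender": "gather", "content": "3 sources"}])
transcript.write([{"sender": "analyze", "content": "2 claims"}])
print(len(transcript.value), transcript.version)  # 2 2
```

The reducer is what lets two kinds of channel share one write path: an overwrite channel keeps only the latest result, while an append channel accumulates turns.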

A `pipeline: [a, b, c]` is sugar the loader lowers to `entry: a; a→b→c→__end__`, so
every existing pipeline runs unchanged. The edges can also **branch** (`when:`),
**loop** (a back-edge), or — when the author opts in — let the model **route** to
the next node or **replan** the remaining graph.

Core modules:

| File | Responsibility |
| --- | --- |
| `tools/` | `@tool` decorator + `ToolRegistry` (named groups); `builtin/` holds the built-in tools (`state_*`, `send_message`, the `todo_*` board), one module per concern. |
| `agent.py` | `Agent` = system prompt + tools + the model's tool-use loop. |
| `graph.py` | The graph IR: `Node` + three `Edge` kinds + `Graph`, structural `validate()`, the dry-run renderer, and the replan edit ops. |
| `store.py` | `Store` = versioned **channels** + **reducers** + message log + todo board for a run. |
| `runtime.py` | The flat-frontier **scheduler**: run/resume, edge resolution (incl. route), the per-node checkpoint, replan, the budget. |
| `cursor_format.py` | The graph-cursor checkpoint format (`CheckpointRecord`). |
| `authority.py` | Replan (authority L2): the guard-gated planner hook + the atomic batch validator. |
| `loader.py` | `build_program` — YAML → a validated `Graph` + the agent map; **lowers `pipeline:` to a graph**. |
| `pipeline.py` | The pipeline grammar (the lowering front-end): `Step` / `Loop` types + the YAML → `Step` parser. |
| `program.py` | `Orchestrator` — the ergonomic `from_yaml`/`run`/`resume` facade over the loader + runtime. |
| `retry.py` / `limits.py` | `RetryPolicy` (backoff) and capacity/timeout/visit caps. |
| `providers/` | Provider abstraction + `gemini` and `anthropic` implementations. |
| `checkpoint.py` | `Checkpointer` storage seam (in-memory + JSON-file) for durable runs & resume. |

An agent's `tools:` list accepts individual tool names *or* a group name — e.g.
`tools: [todos]` grants the whole todo board, `tools: [state, send_message]`
mixes a group with a single tool.
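Resolving a `tools:` list against groups might look like the sketch below. It is assumption-laden: `resolve_tools`, the `groups` mapping, and the tool names `state_get`/`state_set` are hypothetical, and letting a group name win over a same-named tool is a guess rather than documented behavior.

```python
def resolve_tools(names: list[str], tools: set[str], groups: dict[str, list[str]]) -> list[str]:
    """Expand a `tools:` list into concrete tool names (hypothetical sketch).

    `tools` is the set of registered tool names; `groups` maps a group name
    to the tool names registered under it.
    """
    resolved: list[str] = []
    for name in names:
        if name in groups:          # assumption: a group name takes precedence
            members = groups[name]
        elif name in tools:
            members = [name]
        else:
            # Fail fast on typos rather than silently granting nothing.
            raise KeyError(f"unknown tool or group: {name!r}")
        for member in members:
            if member not in resolved:  # keep declaration order, drop duplicates
                resolved.append(member)
    return resolved


registered = {"send_message", "state_get", "state_set"}  # hypothetical tool names
groups = {"state": ["state_get", "state_set"]}           # hypothetical group
print(resolve_tools(["state", "send_message"], registered, groups))
# ['state_get', 'state_set', 'send_message']
```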

## Documentation

This README is the **usage guide**; deeper docs live under
[`docs/`](https://github.com/ai-agent-lead/agentic-flow/tree/main/docs):

| Doc | Read it for |
| --- | --- |
| [docs/CONTEXT.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/CONTEXT.md) | The glossary — the domain language (with `_Avoid_` synonym lists), the layering, and the key decisions. Start here for vocabulary. |
| [docs/ARCHITECTURE.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/ARCHITECTURE.md) | Module map, core abstractions, run lifecycle, design decisions. |
| [docs/FEATURES.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/FEATURES.md) | The full feature catalog — every feature, where it's configured, which example shows it. |
| [docs/INTERNALS.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/INTERNALS.md) | The logic under the hood — the tool-use loop, exactly-once delivery, schema derivation, retries, the provider seam. |
| [CHANGELOG.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/CHANGELOG.md) | Release notes, per version. |

## Visual studio (author pipelines without writing YAML by hand)

A no-build, static web app for **visualizing and authoring** pipeline YAML lives
in the sibling **studio** repo at [`../studio/builder/`](../studio/builder/). Open
it directly — no npm, no build, no server, and js-yaml is vendored so it works
offline:

```bash
open ../studio/builder/index.html          # macOS  (or: xdg-open ../studio/builder/index.html)
# or serve it:  python -m http.server -d ../studio/builder 8000
```

It renders a `pipeline:` as a top-to-bottom flowchart (loops and `when` branches
nest visually), lets you edit agents and steps in an inspector, validates with the
same parse-time rules as the pipeline grammar, shows each step's reads/writes and
the **required initial state**, and round-trips to/from YAML (Open · Export · Copy).
It's authoring-only — it does not run pipelines. The public website and external
docs live alongside it under `../studio/`. See [`../studio/builder/README.md`](../studio/builder/README.md).

## Install & run

`pip install` gives you the importable library (`import agentic_flow`) and the
`agentic-flow` CLI. The distribution is published on PyPI as **`agentlead-flow-core`**:

```bash
pip install agentlead-flow-core
```

The **examples and docs referenced below are not shipped in the wheel** — they live
in the GitHub repo. Clone it to run them (the examples run on **Gemini**, so set
`GEMINI_API_KEY`):

```bash
git clone https://github.com/ai-agent-lead/agentic-flow
cd agentic-flow
pip install -e .                 # editable install from the clone
export GEMINI_API_KEY=...
python -m examples.quickstart.run        # smallest: one agent, one tool, one step
python -m examples.shared_state.run      # multi-agent fan-out → fan-in over one Store

# To run an agent on Claude, set provider: anthropic on it and:
export ANTHROPIC_API_KEY=sk-ant-...
```

Each example is a self-contained, single-concept folder under [`examples/`](https://github.com/ai-agent-lead/agentic-flow/tree/main/examples),
meant to be **read in order** — see [`examples/README.md`](https://github.com/ai-agent-lead/agentic-flow/blob/main/examples/README.md) for the
full 15-step ladder (`quickstart`, `structured_output`, `tool_steps`, `shared_state`,
`refine_loop`, `approval`, `graph_branch`, `router`, `subgraph_intake`, `delegation`,
`shared_transcript`, `team_todos`, `secrets_demo`, `incident_replan`, `data_pipeline`).
Four run with no API key: `subgraph_intake`, `approval`, `secrets_demo`, `data_pipeline`.

### CLI

Installing the package adds an `agentic-flow` command. Point it at a
`pipeline.yaml` and its tools module:

```bash
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools             # run
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --dry-run   # print the plan, no model
agentic-flow my/pipeline.yaml --tools my.tools --set topic="Q3 sales"                            # seed state
agentic-flow examples/shared_state/pipeline.yaml --tools examples.shared_state.tools --log-level DEBUG  # trace via logging
```

`--tools MODULE` imports a module that registers `@tool` functions (repeatable);
`--set KEY=VALUE` seeds initial state; `--dry-run` builds and **renders the graph**
(nodes, edges grouped by source, `(back-edge)` flags, the authority footer) without
calling any model — a `pipeline:` is shown as the graph it lowers to; `--log-level
LEVEL` routes the trace through `logging`. For durable runs, `--checkpoint SPEC` /
`--run-id` / `--resume` / `--human-input*` persist and resume a run (with exit codes
`0`/`1`/`2`) — see *Durable runs & human-in-the-loop* below.

### Docker

```bash
docker build -t agentic-flow .
docker run --rm -e GEMINI_API_KEY agentic-flow      # runs the shared_state example

# Or with docker compose (reads keys from .env):
cp .env.example .env                                # then put GEMINI_API_KEY in .env
docker compose run --rm agentic-flow                # shared_state (default)
docker compose run --rm graph_branch                # native graph: example; live-mounts examples/graph_branch (rebuild the image to pick up framework changes)
```

Generated artifacts land on the host under `examples/<name>/out/` (mounted by
compose). The default image runs the Gemini `shared_state` example via the CLI.

### Testing (in Docker)

The test suites run **in the container, not your local Python env**. They're
offline (no API key), and the source is mounted, so they test your live code with
no rebuild:

```bash
docker compose run --rm tests        # runs every tests/test_*.py in Docker → exit 0 on success
```

The suites are script-style (each `tests/test_*.py` runs its own assertions), so
they're invoked with `python`, not `pytest`. Adding a suite? Add it to the `tests`
service command in `docker-compose.yml`.

## Defining tools

Decorate any type-hinted function with `@tool` and it becomes a tool whose JSON Schema
is derived for you: the docstring summary becomes the tool description, and each
`Args:` line becomes that parameter's description.

```python
from agentic_flow import tool

@tool
def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    ...
```
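The schema derivation described above can be approximated with the standard library alone. This sketch uses `inspect.signature`, `typing.get_type_hints`, and `inspect.getdoc`; the `derive_schema` name, the output shape, the annotation-to-type table, and the fallback to `"string"` are assumptions, not the framework's actual output. The demo `save_report` has a stub body.

```python
import inspect
import typing

# Assumed mapping from Python annotations to JSON Schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean",
               list: "array", dict: "object"}


def derive_schema(fn) -> dict:
    """Build a tool spec from type hints + a Google-style docstring (sketch)."""
    hints = typing.get_type_hints(fn)
    lines = (inspect.getdoc(fn) or "").splitlines()
    summary = lines[0] if lines else ""
    # Collect "name: description" lines from the Args: section.
    arg_docs: dict[str, str] = {}
    in_args = False
    for line in lines[1:]:
        stripped = line.strip()
        if stripped == "Args:":
            in_args = True
        elif in_args:
            if not stripped:
                break  # a blank line ends the Args: section
            name, _, desc = stripped.partition(":")
            arg_docs[name.strip()] = desc.strip()
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        prop = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if name in arg_docs:
            prop["description"] = arg_docs[name]
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value -> required argument
    return {"name": fn.__name__, "description": summary,
            "parameters": {"type": "object", "properties": properties,
                           "required": required}}


def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    return filename  # stub body for the demo


print(derive_schema(save_report)["parameters"]["required"])  # ['filename', 'content']
```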

Register related tools under a group with `@tool(group="…")` so an agent can grant
them all by listing the group name (see the `tools:` note above).

## Defining agents & the pipeline (YAML)

```yaml
provider: gemini                 # default provider for all agents
model: gemini-3.5-flash          # default model for all agents

agents:
  analyst:
    system: "You are a data analyst..."
    tools: [fetch_sales_data, calculator]
  writer:
    system: "You are an executive writer..."
    tools: [save_report]

pipeline:
  - name: analyze
    agent: analyst
    input: "Analyze this year's sales..."
    output: analysis               # writes result to state["analysis"]

  - name: write
    agent: writer
    input: |                       # {analysis} is pulled from state
      Write a summary of:
      {analysis}
    output: summary
```

A `pipeline:` is **sugar the loader lowers to a graph** (so it runs on the same
engine as a hand-authored `graph:`). Two leaf step kinds: an **agent step**
(`agent:` + `input:`, model-driven) and a **tool step** (`tool:` + `args:`, called
directly, no model). `input`/`args` are templated with the current state (`{key}`,
`{key[field]}`), so each step consumes what earlier steps produced. Per-agent
overrides: `model`, `max_tokens`, `thinking`, `output_schema`, `retry`,
`tool_retry`, `limits`.
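Python's own `str.format_map` already understands both placeholder shapes, so a rendering step can be sketched in one call. Whether the loader uses `str.format` semantics internally is an assumption; in this sketch a missing key raises `KeyError` and literal braces would need doubling as `{{`/`}}`.

```python
def render(template: str, state: dict) -> str:
    """Fill {key} and {key[field]} placeholders from the run state (sketch)."""
    # format_map resolves "{report[title]}" as state["report"]["title"].
    return template.format_map(state)


state = {"analysis": "Revenue up 12%", "report": {"title": "Q3 review"}}
print(render("Write a summary of:\n{analysis}", state))
print(render("Title: {report[title]}", state))  # Title: Q3 review
```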

**Control flow** — the flow isn't strictly linear. A **loop step** repeats a
`body:` until an `until:` condition goes truthy (or its `max_iterations:` cap is hit), and a
**conditional step** (`when:` + `then:` / `else:`) branches. Conditions are plain
state templates judged for truthiness (`""`/`"false"`/`"0"`/`"no"`/`"none"`/`"off"`
are falsy) — no expression language; the boolean comes from a tool or a
structured-output agent. The `refine_loop` example shows a reviewer agent's
`{approved}` verdict driving both the loop and the final branch.
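The truthiness rule and the loop's stop condition can be sketched together. The falsy set comes from the text above; folding case and whitespace before the lookup is an assumption, and `is_truthy`/`run_loop` are hypothetical helpers, not framework functions.

```python
FALSY = {"", "false", "0", "no", "none", "off"}


def is_truthy(rendered: str) -> bool:
    """Judge a rendered condition template (assumes case/whitespace folding)."""
    return rendered.strip().lower() not in FALSY


def run_loop(body, until, max_iterations: int) -> int:
    """Run `body` until `until()` renders truthy or the cap is hit; return passes run."""
    for passes in range(1, max_iterations + 1):
        body()
        if is_truthy(until()):  # checked after each pass
            return passes
    return max_iterations


state = {"review": {"approved": "false"}, "drafts": 0}


def body() -> None:
    # Hypothetical reviewer that approves on the third draft.
    state["drafts"] += 1
    state["review"]["approved"] = "true" if state["drafts"] >= 3 else "false"


passes = run_loop(body, lambda: "{review[approved]}".format_map(state), max_iterations=5)
print(passes)  # 3
```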

**Author the graph directly** when a readable sequence can't express the flow —
any-to-any handoff, cycles, fan-in, or letting the model choose. A `graph:` block
declares an `entry`, a `nodes:` mapping, and an `edges:` list (static `{from, to}`,
conditional `{from, to, when}`, or model-driven `{from, route: {by, to}}`). The
model never owns flow by default; you opt into the **authority spectrum** —
*route* (an edge whose successor an agent picks from your candidate set) or
*replan* (an `authority.replan:` policy that lets a planner edit the not-yet-run
subgraph from your declared vocabulary). See [docs/FEATURES.md §2–§2c](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/FEATURES.md).
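The three edge kinds can be resolved with a short loop. This sketch is not `runtime.py`: `next_node`, the `choose` callback standing in for the routing agent, first-match ordering, and falling through to `__end__` when nothing matches are all assumptions. The candidate-set check reflects the stated rule that a route may only pick from the author's list.

```python
FALSY = {"", "false", "0", "no", "none", "off"}
END = "__end__"


def is_truthy(rendered: str) -> bool:
    return rendered.strip().lower() not in FALSY


def next_node(current: str, edges: list[dict], state: dict, choose) -> str:
    """Pick the successor of `current` from static, conditional, or route edges."""
    for edge in edges:
        if edge["from"] != current:
            continue
        if "route" in edge:
            candidates = edge["route"]["to"]
            picked = choose(edge["route"]["by"], candidates)
            # The model may only pick from the author's candidate set.
            if picked not in candidates:
                raise ValueError(f"route picked {picked!r}, expected one of {candidates}")
            return picked
        if "when" in edge:
            if is_truthy(edge["when"].format_map(state)):
                return edge["to"]
            continue  # condition false: try the next edge
        return edge["to"]  # static edge
    return END  # assumption: no matching edge ends the run


edges = [
    {"from": "triage", "to": "escalate", "when": "{ticket[urgent]}"},
    {"from": "triage", "route": {"by": "dispatcher", "to": ["billing", "support"]}},
    {"from": "billing", "to": END},
]
pick_support = lambda by, candidates: "support"  # stands in for the routing agent
print(next_node("triage", edges, {"ticket": {"urgent": "no"}}, pick_support))   # support
print(next_node("triage", edges, {"ticket": {"urgent": "yes"}}, pick_support))  # escalate
```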

```yaml
pipeline:
  - name: refine
    loop:
      max_iterations: 5            # hard cap (the runaway guard)
      until: "{review[approved]}"  # checked after each pass; truthy → stop early
      body:
        - { name: draft,  agent: writer,   input: "Revise. Last review: {review}", output: draft }
        - { name: review, agent: reviewer, input: "Review: {draft}", output: review }   # → {approved}
  - name: ship
    when: "{review[approved]}"
    then: [ { name: publish, tool: save_report, args: { filename: "report.md", content: "{draft}" } } ]
    else: [ { name: escalate, agent: editor, input: "Not yet: {review}" } ]
```

## Durable runs & human-in-the-loop

A run can be **checkpointed** and **resumed** later — across a crash or a pause for
a person. The engine checkpoints at **every node boundary**, so a crash resumes at
the last completed node (inside loops and branches too). A **`human:`** node (a
`{prompt, output}` mapping) pauses the run: the runtime checkpoints a marker and
raises `NodePaused`; the host collects the answer and calls `resume(...)`, which
writes it to `store[output]` and continues. (A `human:` node needs a checkpointer —
a pause is durable by definition — and is valid anywhere, including inside loops and
branches.)

```yaml
pipeline:
  - name: draft
    tool: make_draft
    args: { topic: "{topic}" }
    output: draft
  - name: approve            # the human gate — pauses the run here
    human:
      prompt: "Approve this draft? Reply 'approve' to publish, or 'no' to shelve it:\n\n{draft}"
      output: decision       # resume() writes the human's answer here
  - name: ship
    when: "{decision}"       # truthiness branch: any non-falsy answer publishes
    then: [ { name: publish, tool: publish, args: { draft: "{draft}" } } ]
    else: [ { name: shelve,  tool: shelve,  args: { draft: "{draft}" } } ]
```

**From code** — pass a `run_id` + a `Checkpointer` to `run`, catch the pause, and
`resume` with the answer (here `InMemoryCheckpointer`, so it runs in one process):

```python
from agentic_flow import Orchestrator, InMemoryCheckpointer, NodePaused, registry

orch = Orchestrator.from_yaml("pipeline.yaml", registry)
cp = InMemoryCheckpointer()                                   # or JsonFileCheckpointer("./runs")
try:
    store = orch.run({"topic": "Q3"}, run_id="R1", checkpointer=cp)   # checkpoints at every node boundary
except NodePaused as paused:
    print(paused.prompt)                                              # show the human what to answer
    answer = "approve"                                                # …collected out of band…
    store = orch.resume("R1", cp, human_input=answer)                 # injects + continues
print(store["decision"])                                              # the human's answer, now in state
```

**From the CLI** — name a backend with `--checkpoint`; a paused run exits `2` and
prints a copy-pasteable resume command:

```bash
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1   # run → exit 2 (paused)
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1 \
    --resume --human-input "approve"                                               # resume → exit 0 (done)
```

Exit codes: `0` completed, `1` error, `2` paused. The answer can also come as
`--human-input-json` (parsed JSON) or `--human-input-file`; on a TTY, `--resume`
prompts for it interactively. Channel values are JSON-serializable and
**exactly-once delivery survives a resume**. A runnable, no-key
demo: `python -m examples.approval.run`. More in
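[docs/FEATURES.md §16–§17](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/FEATURES.md) and
[docs/INTERNALS.md §12](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/INTERNALS.md).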
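The pause/resume mechanics in this section can be modeled with a dict as the checkpoint store. This is a deliberately simplified sketch, not the runtime: `Paused` stands in for `NodePaused`, `run`/`resume` are hypothetical, nodes run as a flat list, and checkpoints are shallow in-memory copies rather than serialized records.

```python
class Paused(Exception):
    """Stand-in for NodePaused: carries the rendered prompt for the human."""

    def __init__(self, prompt: str):
        super().__init__(prompt)
        self.prompt = prompt


def run(nodes: list[dict], state: dict, checkpoints: dict, run_id: str, start: int = 0) -> dict:
    """Run nodes in order, saving a checkpoint at every node boundary."""
    for i in range(start, len(nodes)):
        node = nodes[i]
        if "human" in node:
            # Save a pause marker *before* raising, so a resume knows where to pick up.
            checkpoints[run_id] = {"next": i, "state": dict(state), "paused": True}
            raise Paused(node["human"]["prompt"].format_map(state))
        node["fn"](state)
        checkpoints[run_id] = {"next": i + 1, "state": dict(state), "paused": False}
    return state


def resume(nodes: list[dict], checkpoints: dict, run_id: str, human_input=None) -> dict:
    """Reload the last checkpoint; if it is a pause, inject the answer and move on."""
    saved = checkpoints[run_id]
    state, i = dict(saved["state"]), saved["next"]
    if saved["paused"]:
        state[nodes[i]["human"]["output"]] = human_input
        i += 1
    return run(nodes, state, checkpoints, run_id, start=i)


nodes = [
    {"fn": lambda s: s.update(draft=f"Draft on {s['topic']}")},
    {"human": {"prompt": "Approve? {draft}", "output": "decision"}},
    {"fn": lambda s: s.update(shipped=(s["decision"] == "approve"))},
]
checkpoints: dict = {}
try:
    run(nodes, {"topic": "Q3"}, checkpoints, "R1")
except Paused as paused:
    print(paused.prompt)  # Approve? Draft on Q3
final = resume(nodes, checkpoints, "R1", human_input="approve")
print(final["shipped"])  # True
```

Because a checkpoint is written after every completed node, the same `resume` path also covers a crash: it simply restarts at `next` with no answer to inject.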

## Providers (Gemini & Anthropic)

`gemini` (Google) is the default; `anthropic` (Claude) is also built in. Set a
pipeline default and override per agent:

```yaml
provider: gemini             # pipeline default
model: gemini-3.5-flash

agents:
  analyst:                   # uses the defaults above
    system: "..."
  auditor:
    provider: anthropic      # this agent runs on Claude
    model: claude-opus-4-8   # optional — omit to use the provider's default
    system: "..."
```

Agents on different providers share one `Store` and can delegate to each other, so
one graph can mix both. Credentials come from the environment: `GEMINI_API_KEY`
(or `GOOGLE_API_KEY`) / `ANTHROPIC_API_KEY`. Add a provider by implementing the
four-method `Provider` interface and calling `register_provider("name", Cls)`.

> Gemini caveat: `output_schema` (JSON mode) and tool use don't combine on one
> Gemini agent — split into two agents, or run that agent on Anthropic.

## Agent & pipeline features

Each is covered in depth in [docs/FEATURES.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/FEATURES.md); the essentials:

**Structured output** — give an agent an `output_schema` (JSON Schema) and its
answer comes back as a **`dict`** (the framework adds `additionalProperties: false`
for you). Later steps address fields directly: `{report[title]}`.
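Adding `additionalProperties: false` amounts to a recursive walk over the schema. In this sketch, `close_objects` is hypothetical; that nested objects are closed too, and that an explicit `additionalProperties` value is left alone, are assumptions about what the framework does.

```python
import copy


def close_objects(schema: dict) -> dict:
    """Return a copy of `schema` with additionalProperties: false on every object."""
    schema = copy.deepcopy(schema)  # never mutate the author's schema

    def walk(node):
        if isinstance(node, dict):
            if node.get("type") == "object":
                node.setdefault("additionalProperties", False)  # keep explicit values
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(schema)
    return schema


report = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "author": {"type": "object", "properties": {"name": {"type": "string"}}},
    },
    "required": ["title"],
}
closed = close_objects(report)
print(closed["properties"]["author"]["additionalProperties"])  # False
```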

**Agents as tools (delegation)** — list other agents under an agent's `agents:`
key and they become tools it can call; the sub-agent runs its own loop and shares
the same `Store`. Cycles are bounded by `Agent.MAX_STEPS`.

```yaml
reporter:
  tools: [save_report]
  agents:
    - analyst                                           # bare name → tool "analyst"
    - { name: analyst, as: crunch, description: "..." } # or rename
```
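The two `agents:` entry forms can be normalized into tool specs as sketched below. `delegate_specs`, the spec dict shape, falling back to each agent's own description, and rejecting duplicate tool names are all hypothetical choices for illustration.

```python
def delegate_specs(entries: list, agents: dict[str, str]) -> list[dict]:
    """Normalize an agent's `agents:` list into delegate tool specs (sketch).

    `agents` maps each declared agent name to a default description.
    """
    specs = []
    for entry in entries:
        if isinstance(entry, str):  # bare name -> tool of the same name
            entry = {"name": entry}
        name = entry["name"]
        if name not in agents:
            raise KeyError(f"unknown agent: {name!r}")
        specs.append({
            "tool": entry.get("as", name),  # the name the model calls
            "agent": name,                  # the agent that actually runs
            "description": entry.get("description", agents[name]),
        })
    tool_names = [spec["tool"] for spec in specs]
    if len(tool_names) != len(set(tool_names)):
        raise ValueError(f"duplicate delegate tool names: {tool_names}")
    return specs


declared = {"analyst": "Crunches sales data."}
specs = delegate_specs(["analyst", {"name": "analyst", "as": "crunch", "description": "Quick math"}], declared)
print([(s["tool"], s["agent"]) for s in specs])  # [('analyst', 'analyst'), ('crunch', 'analyst')]
```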

**Shared state & messaging** — one `Store` (versioned channels + message log + todo
board) is threaded through every agent, tool, and sub-agent. Three ways to move
data: node `output:` keys (read via `{key}`), the model-driven built-ins (`state_*`,
`send_message`/`read_messages`, the `todo_*` board), and a `state`/`ctx` parameter
injected into any `@tool` (hidden from the model's schema). Messages and assigned
todos use **exactly-once push delivery** — surfaced into the recipient's prompt
without polling. A `graph:` agent node can also add `emit: <transcript-channel>`
(an `add_messages` channel): its answer is appended as a turn `{id, sender, content}`
a later node reads via `{transcript}` templating — a shared, replayable blackboard
alongside the push log.
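Exactly-once push delivery can be modeled as an append-only log plus a per-recipient cursor. `MessageLog` is a hypothetical sketch, not the `Store` API; the point is that the cursors are plain data, so persisting them alongside the log is one plausible way delivery could survive a resume.

```python
from dataclasses import dataclass, field


@dataclass
class MessageLog:
    """Append-only log plus a per-recipient cursor (exactly-once push sketch)."""
    messages: list = field(default_factory=list)
    cursors: dict = field(default_factory=dict)

    def send(self, sender: str, to: str, content: str) -> None:
        self.messages.append({"sender": sender, "to": to, "content": content})

    def deliver(self, recipient: str) -> list:
        """Return this recipient's undelivered messages, then advance its cursor."""
        start = self.cursors.get(recipient, 0)
        new = [m for m in self.messages[start:] if m["to"] == recipient]
        self.cursors[recipient] = len(self.messages)
        return new


log = MessageLog()
log.send("analyst", "writer", "numbers ready")
print(len(log.deliver("writer")))  # 1
print(log.deliver("writer"))       # []
```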

**Secrets** — a credential is **named** in YAML, never embedded. A top-level
`secrets:` block maps a local name to a pluggable backend (`{provider: env, var: X}` /
`{provider: file, path: P}` / shorthand `name: VAR`); the value is fetched **late** at
point-of-use, wrapped in a masked `Secret` (prints `Secret(****)`), and **never** written
to the `Store` — so it can't reach a checkpoint. Reference it as `{secret: name}` in an
agent's `api_key:` or an MCP server's `env:`/`headers:` (which also accept `${VAR}` /
`${VAR:-default}`), or read it in a tool via `ctx.secrets.get(name).reveal()`. `env` and
`file` backends ship; a `vault`/cloud backend is a registry entry away. A typo'd reference
or unknown backend fails at **load**; a missing var/file is a clear error. See
[docs/FEATURES.md §3b](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/FEATURES.md) (offline demo: `python -m examples.secrets_demo.run`).
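A masked wrapper and `${VAR}` / `${VAR:-default}` expansion are each a few lines. This is a hypothetical sketch, not the framework's `Secret`: the class body and `expand_env` are illustrative, and applying the shell rule (an empty variable also falls back to the default) is an assumption.

```python
import os
import re
from collections.abc import Mapping


class Secret:
    """Masked credential: repr/str never show the value (sketch)."""
    __slots__ = ("_value",)

    def __init__(self, value: str):
        self._value = value

    def __repr__(self) -> str:
        return "Secret(****)"

    __str__ = __repr__

    def reveal(self) -> str:
        return self._value


_REF = re.compile(r"\$\{(\w+)(?::-([^}]*))?\}")


def expand_env(text: str, env: Mapping[str, str] = os.environ) -> str:
    """Expand ${VAR} and ${VAR:-default}; an unset VAR with no default is an error."""
    def substitute(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if default is not None and not value:
            return default  # shell rule: unset *or empty* falls back
        if value is None:
            raise KeyError(f"environment variable {name} is not set")
        return value
    return _REF.sub(substitute, text)


token = Secret("s3cr3t")
print(token, f"{token}")                               # Secret(****) Secret(****)
print(expand_env("${REGION:-us-east-1}", env={}))      # us-east-1
```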

**Retries & limits** — a `retry:` policy (attempts + exponential backoff) at two
scopes: **tool retry** (per-`@tool(retry=…)` or an agent's `tool_retry:`) and
**agent retry** (`retry:`, defaulting to transient API errors + `OutputParseError`;
refusals aren't retried). A `limits:` block caps steps, output tokens, and
wall-clock — per agent and for the whole run (plus a per-node `max_visits` cycle bound).

> ⚠️ Agent retry re-runs tools already called — for side-effecting tools, prefer
> tool retry.
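Attempts plus exponential backoff can be sketched independently of the framework. `Backoff` and `call_with_retry` are hypothetical (the real `RetryPolicy` fields may differ), and the default retryable errors here are just two built-in exceptions standing in for "transient API errors".

```python
import random
import time
from dataclasses import dataclass


@dataclass
class Backoff:
    """Attempts + exponential backoff (illustrative; not RetryPolicy's real fields)."""
    attempts: int = 3        # total tries, including the first
    base: float = 0.5        # seconds to wait before the 2nd try
    factor: float = 2.0      # each later wait is `factor` times longer
    max_delay: float = 8.0   # cap on any single wait
    jitter: bool = False     # if True, wait a random time in [0, delay]

    def delays(self) -> list[float]:
        """The waits between tries: attempts - 1 of them."""
        waits = []
        for i in range(self.attempts - 1):
            delay = min(self.base * self.factor ** i, self.max_delay)
            waits.append(random.uniform(0, delay) if self.jitter else delay)
        return waits


def call_with_retry(fn, policy: Backoff, retry_on=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Call fn(); on a retryable error wait and retry, re-raising after the last try."""
    if policy.attempts < 1:
        raise ValueError("attempts must be at least 1")
    waits = policy.delays()
    for attempt in range(policy.attempts):
        try:
            return fn()
        except retry_on:
            if attempt == policy.attempts - 1:
                raise
            sleep(waits[attempt])


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


slept: list[float] = []
print(call_with_retry(flaky, Backoff(attempts=3), sleep=slept.append))  # ok
print(slept)  # [0.5, 1.0]
```

Note how the warning above shows up even here: `call_with_retry` re-invokes the whole `fn`, so anything `fn` did before failing happens again.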

## Driving it from code

```python
from agentic_flow import Orchestrator, registry

orch = Orchestrator.from_yaml("examples/shared_state/pipeline.yaml", registry)
store = orch.run({"product": "Orbit"})    # seed initial state if you want
print(store["summary"])                    # Store is dict-like; store.log / store.todos too
```

Pass `on_event=...` to `from_yaml` to trace every node, tool call, and result.

## Observability

`on_event(event, data)` is the single seam — every `node_*`, `tool_*`, `retry`,
`checkpoint`, `human`, `route`, `replan` / `replan_rejected`, and
`messages_delivered` flows through it. Three sinks ship on top:

- **Console printer** — `console_tracer()` (`agentic_flow/console.py`), the one
  console sink the CLI and `examples/_common.py` share.
- **Standard `logging`** — `event_logger()` returns an `on_event` hook (INFO for
  node boundaries, WARNING for retries/tool errors, DEBUG for per-call detail);
  call `configure_logging()` to opt in. From the CLI: `--log-level DEBUG`.
- **`EventBus`** — a pub/sub fan-out that *is* an `on_event` hook, with filtered
  subscribers and a thread-safe real-time `stream()` for watching a run live:

```python
from agentic_flow import EventBus, Orchestrator, registry

bus = EventBus()
orch = Orchestrator.from_yaml("pipeline.yaml", registry, on_event=bus)
bus.run_in_background(lambda: orch.run({"topic": "Q3"}))   # seed state; closes the bus when the run ends
for event, data in bus.stream():                   # blocks, yields events live
    if event == "tool_result":
        print(data["agent"], data["tool"], "→", data["result"])
```

A runnable demo: `python -m examples.team_todos.run_stream`. The logging/bus internals
are in [docs/INTERNALS.md](https://github.com/ai-agent-lead/agentic-flow/blob/main/docs/INTERNALS.md).
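The pub/sub-plus-stream pattern behind an `on_event` bus can be sketched with `queue` and `threading`. `MiniBus` is hypothetical and is not `agentic_flow`'s `EventBus`; the `subscribe(..., events=...)` signature and the `node_start`/`node_end` event names are illustrative.

```python
import queue
import threading

_DONE = object()  # sentinel that ends the stream


class MiniBus:
    """A callable on_event hook that fans out to subscribers and a live stream."""

    def __init__(self):
        self._subs = []  # (set of wanted events or None, callback)
        self._queue = queue.Queue()
        self._lock = threading.Lock()

    def __call__(self, event: str, data: dict) -> None:
        with self._lock:
            subs = list(self._subs)
        for wanted, callback in subs:
            if wanted is None or event in wanted:
                callback(event, data)
        self._queue.put((event, data))  # Queue is thread-safe

    def subscribe(self, callback, events=None) -> None:
        with self._lock:
            self._subs.append((set(events) if events else None, callback))

    def close(self) -> None:
        self._queue.put(_DONE)

    def stream(self):
        """Yield (event, data) as they arrive; stop once close() is called."""
        while True:
            item = self._queue.get()
            if item is _DONE:
                return
            yield item

    def run_in_background(self, fn) -> threading.Thread:
        def target():
            try:
                fn()
            finally:
                self.close()  # always end the stream, even if fn raises
        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        return thread


bus = MiniBus()
seen = []
bus.subscribe(lambda e, d: seen.append(e), events=["tool_result"])


def fake_run():
    bus("node_start", {"node": "analyze"})
    bus("tool_result", {"agent": "analyst", "tool": "calculator", "result": 42})
    bus("node_end", {"node": "analyze"})


bus.run_in_background(fake_run)
streamed = [event for event, _ in bus.stream()]
print(streamed)  # ['node_start', 'tool_result', 'node_end']
```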

## Extending

- **New tools** — add `@tool` functions in any imported module (group with `group=`).
- **New agents / nodes / edges** — edit the YAML; no code change.
- **Delegation / structured output / retries / limits** — set `agents:` /
  `output_schema` / `retry:` / `limits:` on an agent.
- **Shared state** — the `state_*`/`send_message` tools, or a `state`/`ctx` param on your own tools.
- **New providers** — implement the `Provider` interface and `register_provider`.
- **Observability** — pass `on_event=...`; route to `logging` (`event_logger()`) or an `EventBus`.
- **Branching / loops** — a `pipeline:`'s `loop:` / `when:` (lowered to a back-edge + conditional edges), or author `graph:` conditional/back edges directly; conditions are state templates judged for truthiness.
- **Let the model drive flow** — add a `route:` on an edge (the model picks the next node) or an `authority.replan:` policy (the model rewrites the not-yet-run subgraph) — both compose only from your declared vocabulary.
- **Pause for a human / persist & resume** — add a `human:` (`{prompt, output}`) node and run with `run(run_id=, checkpointer=)`; catch `NodePaused` and `resume(..., human_input=...)`. Add a backend by subclassing `Checkpointer` + `register_checkpointer`.
