Metadata-Version: 2.4
Name: flowgraph-ai
Version: 0.2.1
Summary: YAML-driven AI workflow engine powered by LangGraph — build conversational multi-step AI apps without boilerplate
Author-email: Simpletools India <support@simpletools.in>
License-Expression: MIT
Project-URL: Homepage, https://github.com/simpletoolsindia/flowgraphai
Project-URL: Repository, https://github.com/simpletoolsindia/flowgraphai
Project-URL: Documentation, https://github.com/simpletoolsindia/flowgraphai#readme
Project-URL: Changelog, https://github.com/simpletoolsindia/flowgraphai/blob/main/CHANGELOG.md
Project-URL: Bug Tracker, https://github.com/simpletoolsindia/flowgraphai/issues
Keywords: ai,llm,workflow,langgraph,langchain,chatbot,conversational-ai,yaml,fastapi,openai,anthropic,ollama,agent,automation
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Framework :: FastAPI
Classifier: Typing :: Typed
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: langchain-anthropic>=0.1.0
Requires-Dist: langchain-ollama>=0.2.0
Requires-Dist: langchain-openai>=0.1.0
Requires-Dist: langgraph>=0.2.0
Requires-Dist: langgraph-checkpoint-sqlite>=2.0.0
Requires-Dist: langgraph-checkpoint-postgres>=2.0.0
Requires-Dist: fastapi>=0.115.0
Requires-Dist: uvicorn>=0.32.0
Requires-Dist: sse-starlette>=1.8.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: langfuse>=2.0.0
Requires-Dist: opentelemetry-sdk>=1.28.0
Requires-Dist: opentelemetry-exporter-otlp-proto-grpc>=1.28.0
Requires-Dist: opentelemetry-exporter-otlp-proto-http>=1.28.0
Provides-Extra: postgres
Requires-Dist: psycopg[binary]>=3.2.0; extra == "postgres"
Requires-Dist: psycopg-pool>=3.2.0; extra == "postgres"
Requires-Dist: langgraph-checkpoint-postgres>=2.0.0; extra == "postgres"
Provides-Extra: ui
Requires-Dist: streamlit>=1.55.0; extra == "ui"
Requires-Dist: streamlit-mermaid>=0.3.0; extra == "ui"
Provides-Extra: all
Requires-Dist: flowgraph-ai[ui]; extra == "all"
Requires-Dist: flowgraph-ai[postgres]; extra == "all"
Dynamic: license-file

# FlowGraph-AI

[![PyPI version](https://img.shields.io/pypi/v/flowgraph-ai.svg)](https://pypi.org/project/flowgraph-ai/)
[![Python](https://img.shields.io/pypi/pyversions/flowgraph-ai.svg)](https://pypi.org/project/flowgraph-ai/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![LangGraph](https://img.shields.io/badge/powered%20by-LangGraph-blue)](https://github.com/langchain-ai/langgraph)
[![CI](https://github.com/simpletoolsindia/flowgraphai/actions/workflows/ci.yml/badge.svg)](https://github.com/simpletoolsindia/flowgraphai/actions)

**FlowGraph-AI** is a YAML-driven AI workflow engine for building conversational, multi-step AI applications. Define your entire workflow in a simple YAML file — the engine handles LLM calls, user input collection, validation, routing, and state persistence automatically.

Built on top of **LangGraph** with native interrupt/resume support, SQLite/PostgreSQL persistence, and a built-in FastAPI server.

---

## Table of Contents

1. [Why FlowGraph-AI?](#why-flowgraph-ai)
2. [Features](#features)
3. [Quick Start](#quick-start)
4. [Installation](#installation)
5. [Core Concepts — 3 Node Types](#core-concepts--3-node-types)
6. [Smart Features](#smart-features)
   - [Pre-Extraction](#pre-extraction-no-repeat-questions)
   - [Tools in DataCollector](#tools-in-datacollector)
   - [Follow-Up After Workflow Ends](#follow-up-after-workflow-ends)
   - [Guardrails](#guardrails)
   - [Intent Escape](#intent-escape)
   - [Language Detection](#language-detection)
   - [Hallucination Detection](#hallucination-detection)
   - [LLM Parameter Customization](#llm-parameter-customization)
7. [Kanban Board](#kanban-board)
8. [Task Tracking & Correlation IDs](#task-tracking--correlation-ids)
9. [YAML Workflow Reference](#yaml-workflow-reference)
10. [Routing Reference](#routing-reference)
11. [State Reference](#state-reference)
12. [Custom Actions](#custom-actions)
13. [Custom Validators](#custom-validators)
14. [CLI Reference](#cli-reference)
15. [REST API Reference](#rest-api-reference)
16. [Python Builder API](#python-builder-api)
17. [Configuration Reference](#configuration-reference)
18. [LLM Provider Setup](#llm-provider-setup)
19. [Checkpointing & Persistence](#checkpointing--persistence)
20. [Observability](#observability)
21. [Visual Workflow Builder](#visual-workflow-builder)
22. [Project Structure](#project-structure)
23. [Developer & Agent Reference](#developer--agent-reference)
24. [Testing](#testing)
25. [Architecture](#architecture)
26. [Changelog](#changelog)
27. [Roadmap](#roadmap)
28. [License](#license)

---

## Why FlowGraph-AI?

Most AI frameworks make you write hundreds of lines of code to build a simple conversational flow. FlowGraph-AI flips this — you write a YAML file, the engine does the rest.

| Normal chatbot | FlowGraph-AI |
|---|---|
| Free-form conversation, no structure | YAML-defined flow with clear start → end |
| Developer writes all conversation logic | Define fields + routing in YAML |
| Hard to maintain, one giant prompt | Modular nodes, each does one thing |
| Ask questions one at a time | Extract 3 fields from one natural message |
| Safety logic must be custom-built | Built-in guardrails on every output |
| No trace / observability | Langfuse + Jaeger built in |
| User must re-answer if they said it already | Pre-extraction: bot never re-asks what it knows |

```yaml
workflow: support_ticket
description: "Create a support ticket"

state:
  name: str
  email: str
  issue: str

start_node: collect_info

nodes:
  collect_info:
    type: data_collector
    initial_message: "I'll get that ticket created. What's your name, email, and issue?"
    fields:
      - field: name
        description: "Customer full name"
      - field: email
        description: "Email address"
        validation:
          method: regex
          pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
      - field: issue
        description: "Brief description of the issue"
    max_retry: 3
    next: submit
    on_max_retry: error_response

  submit:
    type: custom_action
    method: "actions.tickets.create"
    routes:
      - value: "created"
        next: success
      - default: error_response

  success:
    type: response
    template: "Ticket created! We'll email {state.email} shortly."

  error_response:
    type: response
    error_field: error
```

That is the entire application. Run with `flowgraph run support_ticket` or `flowgraph serve`.

---

## Features

- **3-node architecture** — `data_collector`, `custom_action`, `response` — simple enough to reason about, powerful enough for production
- **Pre-extraction** — if the user's first message already contains a field value, the bot extracts it immediately and never re-asks
- **Tools in DataCollector** — LLM can call Python tools (DB lookups, APIs, system info) during data collection
- **Follow-up handling** — after a workflow ends, users can keep asking questions; the bot answers in context
- **Hallucination detection** — two-stage grounding check with auto-correction, configurable per node via `hallucination_check:`
- **LLM parameter customization** — per-node `llm_params:` overrides `temperature`, `max_tokens`, `top_p`, `streaming`, `timeout`, and `model`
- **`args_from_state:` for custom actions** — clean kwarg-based function calling; maps state fields directly to function parameters
- **Real-time Kanban board** — React app (`frontend/`) showing all workflow executions across 8 status columns with live SSE updates
- **Task tracking** — SQLite-backed task store with pub/sub and correlation IDs linking API, Kanban, Jaeger, and Langfuse
- **YAML-first** — define complete multi-step AI workflows without boilerplate
- **Python builder API** — fluent code alternative to YAML
- **LangGraph-powered** — native interrupt/resume, stateful execution, checkpointer persistence
- **SQLite + PostgreSQL** — conversations persist across restarts
- **Multi-language** — auto-detects user language, responds in kind
- **Intent classification** — automatically routes users to the right workflow
- **Built-in validation** — regex, length, numeric, one_of, not_empty + custom Python validators
- **Guardrails** — LLM safety post-processing on every response, configurable at 3 levels
- **5 LLM providers** — Claude, OpenAI, OpenRouter, Ollama, LM Studio; per-node overrides
- **FastAPI server** — production-ready REST API out of the box
- **Developer CLI** — `flowgraph init --provider <name>` to scaffold, plus validate, run, serve — all from `flowgraph`
- **Observability** — Langfuse (LLM tracing) + Jaeger (distributed tracing) with one-command setup
- **Visual builder** — drag-and-drop workflow editor (React app in `workflow-builder/`)

---

## Quick Start

### Option 1 — Scaffold a new project (recommended)

```bash
pip install flowgraph-ai

# Scaffold a ready-to-run project
flowgraph init my-chatbot --provider ollama
cd my-chatbot

# Start the Ollama server and pull a model
ollama serve &   # skip if Ollama is already running
ollama pull llama3.1:8b

# Chat with the example order tracking workflow
flowgraph run order_tracking
```

Try saying: **"track my order ORD-001 from yesterday"** — both fields extracted at once, no follow-up questions.

### Option 2 — From source

```bash
git clone https://github.com/simpletoolsindia/flowgraphai
cd flowgraphai
uv sync

# Set your LLM
flowgraph llm set claude       # or ollama, openai, etc.

# Run a workflow
flowgraph run order_tracking
```

### Option 3 — API server

```bash
flowgraph serve
# API at http://localhost:8000 — see /docs for Swagger UI

curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "track my order ORD-001 from yesterday"}'
```

---

## Installation

### With pip

```bash
pip install flowgraph-ai
```

### With uv (recommended)

```bash
uv add flowgraph-ai
```

### With UI extras (Streamlit)

```bash
pip install "flowgraph-ai[ui]"
```

### From source

```bash
git clone https://github.com/simpletoolsindia/flowgraphai
cd flowgraphai
uv sync
```

---

## Core Concepts — 3 Node Types

FlowGraph-AI has exactly **3 node types**. That is intentional.

### `data_collector` — Collect information from the user

Asks questions, waits for replies, validates, extracts, and routes.

**Single-field:**
```yaml
collect_sr:
  type: data_collector
  role: "You are a helpful support agent."
  initial_message: "Please provide your SR number (format: SR-XXXXX)."
  field: sr_number
  description: "SR number in format SR-XXXXX"
  max_retry: 3
  next: process
  on_max_retry: error_response
  allow_intent_escape: true
  validation:
    method: "validations.sr.check_sr_format"
```

**Multi-field** (collects multiple fields in a single conversation turn):
```yaml
collect_order:
  type: data_collector
  role: "You are a helpful order support agent."
  initial_message: "Please share your order ID and order date."
  max_retry: 3
  next: lookup_order
  on_max_retry: error_response
  allow_intent_escape: true
  fields:
    - field: order_id
      description: "Order ID (e.g. ORD-12345)"
    - field: order_date
      description: "Order date or approximate date (e.g. yesterday, 10th Dec)"
```

**With tools** (LLM calls Python functions for external data):
```yaml
collect_system_info:
  type: data_collector
  field: cpu_temp
  description: "Current CPU temperature in Celsius"
  tools:
    - method: "actions.system_tools.get_cpu_temp"
      name: "get_cpu_temp"
      description: "Get current CPU temperature. Call when user asks about CPU or system stats."
  next: respond
```

**All data_collector options:**

| Option | Default | Description |
|---|---|---|
| `field` | — | Single field to collect |
| `fields` | — | List of fields (multi-field mode) |
| `description` | field name | Tells LLM what to extract |
| `initial_message` | LLM-generated | First question shown to user |
| `role` | "You are a helpful assistant." | System role for LLM extraction |
| `max_retry` | `3` | Max attempts before routing to `on_max_retry` |
| `next` | `"END"` | Next node on success |
| `on_max_retry` | `"END"` | Node if max retries exceeded |
| `allow_intent_escape` | `false` | Answer off-topic questions inline before re-asking |
| `humanize` | `false` | Run bot messages through humanizer LLM |
| `validation` | — | Validator config (see Custom Validators) |
| `tools` | `[]` | LangChain tools LLM can call during extraction |
| `guardrails` | global | Per-node guardrails override |
| `llm_provider` | global | Per-node LLM provider override |

---

### `custom_action` — Run your Python code

Executes any Python function. The function receives the full state dict, may mutate it, and returns a string that is matched against `routes:` (or, when no routes are defined, used directly as the next node name; see Routing Reference).

```yaml
check_account:
  type: custom_action
  method: "actions.billing.check_status"
  result_field: billing_result
  routes:
    - value: "active"
      next: success_response
    - value: "suspended"
      next: suspended_response
    - default: error_response
```

```python
# actions/billing.py
def check_status(state: dict) -> str:
    account_id = state["account_id"]
    result = billing_api.lookup(account_id)
    state["billing_result"] = result.status
    state["output"] = f"Account {account_id}: {result.status}"
    return result.status   # matched against routes[]
```

#### `args_from_state` — Function keyword arguments (New in v0.2.0)

Clean, kwarg-based function calling. Maps state fields directly to function parameters. The function return value is stored in `result_field`.

```yaml
lookup_order:
  type: custom_action
  method: "actions.order_tools.get_order_status"
  args_from_state:
    order_id: order_id          # function_kwarg: state_field
  result_field: order_status    # stores return value
  next: success_response        # fixed routing
  on_error: error_response      # routing on exception
```

```python
# actions/order_tools.py
def get_order_status(order_id: str) -> str:
    """Explicit args, no state dict needed."""
    status = db.query(order_id)
    return status  # Stored in state["order_status"]
```

---

### `response` — Send the final reply

The terminal node. Applies guardrails automatically. Supports templates and field references.

```yaml
success_response:
  type: response
  template: "Done! Your request {state.request_id} has been submitted."

# Or use fields set by custom_action
final:
  type: response
  output_field: output     # state["output"] → shown to user
  error_field: error       # state["error"]  → shown if set
```

---

## Smart Features

### Pre-Extraction — No Repeat Questions

Before asking the user anything, the data_collector node checks if the **user's opening message** already contains the needed field values. If found, extraction happens immediately — no `interrupt()` is triggered.

**Example:**
- User sends: `"I want to track my order ORD-12345 placed yesterday"`
- Node needs: `order_id` + `order_date`
- Result: **Both fields extracted from the first message** — bot never asks either question
- If only `order_id` is found: bot asks only for `order_date`

This works for both single-field and multi-field nodes. Validation runs on pre-extracted values; if validation fails, the node falls back to asking normally.
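
Over the API, a fully specified opening message can complete the workflow in a single round trip. The snippet below is illustrative; the exact completed-response payload depends on the workflow's `response` node:

```bash
curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "track my order ORD-12345 placed yesterday", "workflow_name": "order_tracking"}'
# Both fields pre-extracted → no "question" in the response;
# "status" comes back as "completed" instead of "waiting_input".
```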

---

### Tools in DataCollector

When a field value requires external data (live system info, DB lookups, API calls), define `tools:` on the node. The LLM decides when to call them based on the user's message.

```yaml
collect_info:
  type: data_collector
  field: server_status
  description: "Current server status (online/offline/degraded)"
  tools:
    - method: "actions.ops_tools.check_server_status"
      name: "check_server_status"
      description: "Check if a server is online. Provide server_name as argument."
  initial_message: "Which server would you like to check?"
  next: respond
```

```python
# actions/ops_tools.py
def check_server_status(server_name: str) -> str:
    """Check server health via API."""
    result = ops_api.ping(server_name)
    return result.status  # e.g. "online", "offline", "degraded"
```

Tool call flow:
1. User says something that needs external data
2. LLM calls the tool (up to 4 rounds of tool calls supported)
3. Tool result is injected into the extraction context
4. LLM uses the tool result to populate the field

Tools also work in **off-topic answers** — if `allow_intent_escape: true` and the user's off-topic question needs live data, the same tools are available.

---

### Follow-Up After Workflow Ends

After a workflow completes, users can continue asking related questions. The bot answers in the context of the completed conversation.

**API behavior:** `POST /message` on a completed `session_id` returns `status: "follow_up"` instead of rejecting the message as already completed.

**CLI behavior:** The user can keep typing and the bot answers in context; a new workflow starts only if the topic is clearly different.

**Example:**
```
Bot: Your order ORD-12345 is processing. Delivery expected Dec 15.
     [workflow completes]

User: Why is it taking so long?
Bot:  I understand your concern. Your order was placed yesterday and is currently
      in the processing stage — this typically takes 1-2 business days. Would you
      like me to raise a priority query?

User: Yes please
Bot:  I've flagged your order for review. You'll receive an update within 24 hours.
```
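
The same exchange over the API is just another `POST /message` against the completed session; the status field distinguishes it from a normal resume (session ID illustrative):

```bash
curl -X POST http://localhost:8000/message \
  -H "Content-Type: application/json" \
  -d '{"session_id": "order_tracking:abc123", "message": "Why is it taking so long?"}'
# Returns "status": "follow_up" plus a contextual answer,
# rather than an "already completed" rejection.
```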

---

### Guardrails

Every `response` node output passes through an LLM safety review before being shown to the user. Configurable at 3 levels (highest priority wins):

```
node-level guardrails > workflow-level guardrails > global config.yaml
```

```yaml
# Global (config.yaml)
guardrails:
  enabled: true
  tone: "warm and professional"

# Workflow-level (top of workflow YAML)
guardrails:
  tone: "casual and friendly"
  custom_rules:
    - "Never mention competitor products"
    - "Always offer to escalate to human support"

# Node-level (inside any response or data_collector node)
my_response:
  type: response
  guardrails:
    enabled: false          # Disable for this node only
    # OR:
    custom_prompt: |        # Fully replaces all guardrail logic
      You are a strict reviewer. Return only the corrected response.
```

---

### Intent Escape

When `allow_intent_escape: true`, users can ask off-topic questions during data collection. The bot answers inline, then re-asks the original question.

```yaml
collect_order:
  type: data_collector
  allow_intent_escape: true
  field: order_id
  # ...
```

Example:
```
Bot:  Please provide your order ID.
User: Do you ship to Canada?
Bot:  Yes, we ship to Canada! Standard delivery is 5-7 business days.
      Now, could you please provide your order ID?
```

---

### Language Detection

FlowGraph-AI auto-detects the user's language from their first message and injects language instructions into all subsequent LLM prompts. The bot responds in the user's language without any configuration.

Supported: any language the underlying LLM supports (Claude, GPT-4o, Llama 3.1, etc.)
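
For example (illustrative transcript):

```
User: ¿Dónde está mi pedido ORD-12345?
Bot:  ¡Con gusto! Tu pedido ORD-12345 está en proceso. Entrega estimada: 15 de diciembre.
```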

---

### Hallucination Detection

Every data collector node can check extracted values for hallucinations before accepting them.

```yaml
collect_order_info:
  type: data_collector
  hallucination_check: true   # default: true
  fields:
    - field: order_id
      description: "Order ID in format ORD-XXXXX"
```

The check runs in two stages:
1. **Deterministic** — substring match verifies the extracted value actually appeared in the user's message.
2. **LLM fact-check** — if no substring match, an LLM grounding call confirms the value is supported by the conversation context.
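
Conceptually, the deterministic stage reduces to a containment check. The sketch below is a simplified illustration, not the actual implementation in `engine/hallucination.py`:

```python
def is_grounded_deterministic(extracted: str, user_message: str) -> bool:
    """Stage 1: accept immediately if the extracted value literally
    appears in the user's message (case-insensitive)."""
    return extracted.strip().lower() in user_message.lower()

# Stage 2 (the LLM fact-check) only runs when stage 1 fails:
if not is_grounded_deterministic("ORD-99999", "My order is ORD-12345"):
    ...  # ask the LLM whether "ORD-99999" is supported by the conversation
```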

**Example:**
- **User says**: "My order is ORD-12345"
- **LLM extracts**: `order_id: "ORD-99999"` (hallucination)
- **Engine**: 
    1. Sees "ORD-99999" is NOT in "My order is ORD-12345".
    2. Runs `check_grounding` LLM call.
    3. LLM confirms "ORD-99999" is not supported.
    4. **Result**: Re-extracts or re-asks the user.

When hallucination is detected:
1. Auto-correction using LLM-suggested value (if available)
2. Strict re-extraction with deterministic prompt
3. Retry question to user if both stages fail

Disable per node with `hallucination_check: false`.

---

### LLM Parameter Customization

Customize LLM behaviour globally in `config.yaml` or per-node in workflow YAML.

**Global (config.yaml):**
```yaml
llm:
  provider: "claude"
  temperature: 0.7
  max_tokens: 2048
  top_p: 1.0
  streaming: false
  timeout: 30
```

**Per-node (workflow YAML):**
```yaml
collect_order_info:
  type: data_collector
  llm_params:
    temperature: 0.1      # precise extraction
    max_tokens: 512
    model: "gpt-4o"       # override model for this node only
```

Any `llm_params:` key overrides the global config for that node only. All other nodes continue to use global settings.

---

## Kanban Board

Track agent execution in real time with the built-in React Kanban board.

### Setup

```bash
cd frontend
npm install
npm run dev   # Vite dev server (default: http://localhost:5173)
```

**Note**: Ensure the backend API is running (`flowgraph serve`) on port 8000.

### Usage

The board connects to the backend API (`http://localhost:8000`) and shows all workflow executions across 8 status columns: Backlog, Planned, In Progress, Waiting For Input, Tool Running, Retry/Recovery, Completed, Failed.

**Features:**
- Live SSE updates (no polling)
- Click any card to see the full activity timeline
- Correlation ID + Trace ID linking to Jaeger/Langfuse
- Filter by task title, workflow name, or task ID

Every workflow start automatically creates a task card. The card advances through columns as the workflow progresses. The SSE endpoint is `GET /tasks/stream/events`.

---

## Task Tracking & Correlation IDs

Every workflow execution automatically creates a task card backed by SQLite. Tasks are linked across all systems via a `correlation_id`.

```
API Response
  └── correlation_id ─────┬──→ Kanban board card
                          ├──→ Jaeger trace root span
                          └──→ Langfuse trace session
```

### API usage

Every `POST /workflow/start` and `POST /message` response includes:
```json
{
  "session_id": "order_tracking:abc123",
  "status": "waiting_input",
  "task_id": "A1B2C3D4",
  "correlation_id": "3f8a9c12e4b07d56a1f2e3d4c5b6a7f8"
}
```

Use `task_id` to track execution:
```bash
# Get task + full event timeline
curl http://localhost:8000/tasks/A1B2C3D4

# List all tasks (filter by status)
curl "http://localhost:8000/tasks?status=in_progress"

# Live SSE stream (used by Kanban board)
curl -N http://localhost:8000/tasks/stream/events
```
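
For programmatic consumers, the stream is plain Server-Sent Events; each `data:` line carries a JSON-encoded task or task-event object (see the REST API Reference). A minimal sketch using `requests` (payload field names illustrative):

```python
import json
import requests

# Consume the live task stream and print status transitions as they arrive.
with requests.get("http://localhost:8000/tasks/stream/events", stream=True) as resp:
    for raw in resp.iter_lines(decode_unicode=True):
        if raw and raw.startswith("data:"):
            event = json.loads(raw[len("data:"):].strip())
            print(event.get("task_id"), event.get("status"))
```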

### Task lifecycle

Tasks move through these statuses automatically:
`planned` → `in_progress` → `waiting_input` ↔ `in_progress` → `completed` or `failed`

When hallucination is detected: `retry_recovery` → back to `in_progress`
When a tool runs: `tool_running` → back to `in_progress`

### Configure persistence

```yaml
# config.yaml
tracking:
  db_path: "./tasks.db"   # SQLite file — omit for in-memory only
```

---

## YAML Workflow Reference

```yaml
# ── Required ──────────────────────────────────────────────────────────────────
workflow: my_workflow          # Must match the .yaml filename
description: "..."             # Used by intent classifier — describe what this handles

# ── Optional ──────────────────────────────────────────────────────────────────
initial_input_field: user_message   # Which state field gets the user's opening message

# Workflow-level guardrails (override global config.yaml)
guardrails:
  enabled: true
  tone: "casual and friendly"

# Entry point
start_node: first_node

# ── Custom state fields (auto-injected fields do not need to be listed) ───────
state:
  order_id: str
  order_date: str
  order_status: str
  count: int
  items: list
  metadata: dict

# ── Nodes ─────────────────────────────────────────────────────────────────────
nodes:

  # data_collector — single field
  collect_order_id:
    type: data_collector
    role: "You are a helpful support agent."
    initial_message: "Please provide your order ID."
    field: order_id
    description: "Order ID (e.g. ORD-12345)"
    max_retry: 3
    next: process
    on_max_retry: error_response
    allow_intent_escape: true
    humanize: false
    validation:
      method: regex
      pattern: "^ORD-\\d{5}$"
    tools:
      - method: "actions.my_module.my_tool"
        name: "my_tool"
        description: "When and why to call this tool"
    guardrails:
      tone: "very formal"

  # data_collector — multi-field
  collect_contact:
    type: data_collector
    role: "You are an HR assistant."
    initial_message: "Please provide your name and email."
    max_retry: 3
    next: submit
    on_max_retry: error_response
    fields:
      - field: customer_name
        description: "Full name"
      - field: customer_email
        description: "Email address"
        validation:
          method: regex
          pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
      - field: preferred_date
        description: "Preferred contact date"
        default: "any day"    # Optional: used when not provided

  # custom_action
  process:
    type: custom_action
    method: "actions.my_actions.process_request"
    result_field: action_result
    next: success                  # Unconditional (if no routes)
    routes:
      - value: "success"
        next: success_response
      - value: "error"
        next: error_response
      - default: error_response    # Fallback for any other return value
    conditional_edges:             # Alternative to routes
      field: action_result
      paths:
        "ok": success_response
        "fail": error_response

  # response
  success_response:
    type: response
    template: "Done! Your request {state.order_id} has been submitted."
    output_field: output           # Read from state["output"] if set
    error_field: error             # Read from state["error"] if set (shown instead)
    guardrails:
      enabled: false

  error_response:
    type: response
    error_field: error
    output_field: output
```

---

## Routing Reference

| Pattern | Where | YAML |
|---|---|---|
| Unconditional | `data_collector`, `custom_action` | `next: node_name` |
| On failure | `data_collector` | `on_max_retry: node_name` |
| Routes list | `custom_action` | `routes: [{value: "x", next: "y"}, {default: "z"}]` |
| Conditional | `custom_action` | `conditional_edges: {field: "f", paths: {v1: n1, v2: n2}}` |
| Direct return | `custom_action` | Return value IS the next node name |
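
The direct-return pattern in the last row means the action function itself can decide where to go next; a minimal illustration (node names hypothetical):

```python
# actions/triage.py
def triage(state: dict) -> str:
    """Direct-return routing: the returned string is used as the next node name."""
    if state.get("error"):
        return "error_response"
    return "success_response"
```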

---

## State Reference

### Auto-injected fields (always available, no need to declare)

| Field | Type | Description |
|---|---|---|
| `user_message` | str | User's opening message |
| `user_language` | str | Auto-detected language |
| `output` | str | Final output shown to user |
| `error` | str | Error message (shown instead of output if set) |
| `messages` | list | Full conversation history |
| `_dc_next_node` | str | Internal routing signal |
| `_dc_failed` | bool | Internal: max retry exceeded |
| `_dc_node` | str | Internal: which node hit max retry |

### Template syntax

```yaml
template: "Your order {state.order_id} placed on {state.order_date} is ready."
```

All variants work:
- `{state.field}` — primary syntax
- `{{state.field}}` — also supported
- `{field}` — backward compat
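
The substitution semantics are straightforward: each `{state.<field>}` placeholder resolves to `state["<field>"]`. The snippet below only illustrates that behaviour; it is not the engine's renderer (`engine/template.py`):

```python
state = {"order_id": "ORD-12345", "order_date": "Dec 10"}
template = "Your order {state.order_id} placed on {state.order_date} is ready."

# Naive illustration of placeholder substitution:
rendered = template
for key, value in state.items():
    rendered = rendered.replace("{state.%s}" % key, str(value))

print(rendered)  # Your order ORD-12345 placed on Dec 10 is ready.
```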

---

## Custom Actions

```python
# actions/my_actions.py

def process_request(state: dict) -> str:
    """
    Receives full workflow state.
    Mutate state to set output/error values.
    Return a string matched against routes[] in YAML.
    """
    sr = state["sr_number"]
    email = state.get("email", "")

    try:
        result = my_api.submit(sr, email)
        state["output"] = f"Request {sr} submitted. Ref: {result.ref_id}"
        return "success"
    except MyAPIError as e:
        state["error"] = f"Failed: {e.message}"
        return "error"
```

```yaml
submit:
  type: custom_action
  method: "actions.my_actions.process_request"
  result_field: action_result
  routes:
    - value: "success"
      next: success_response
    - value: "error"
      next: error_response
```

---

## Custom Validators

```python
# validations/my_validators.py

def validate_account_id(value: str, config: dict) -> tuple[bool, str]:
    """
    Returns (is_valid, error_message).
    error_message is shown to the user on failure.
    """
    if not value.startswith("ACC-"):
        return False, "Account ID must start with ACC- (e.g. ACC-12345)"
    return True, ""
```

```yaml
collect_account:
  type: data_collector
  field: account_id
  validation:
    method: "validations.my_validators.validate_account_id"
```

### Built-in validators

| Validator | Config | Example |
|---|---|---|
| `regex` | `pattern: "..."` | `pattern: "^\\d{5}$"` |
| `length` | `min: N, max: N` | `min: 5, max: 200` |
| `numeric` | `min: N, max: N` | `min: 1, max: 100` |
| `one_of` | `options: [...]` | `options: [Annual, Sick, Unpaid]` |
| `not_empty` | — | — |
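
Built-in validators use the same `validation:` block as custom ones; for example (field names hypothetical):

```yaml
collect_leave:
  type: data_collector
  fields:
    - field: leave_type
      description: "Type of leave"
      validation:
        method: one_of
        options: [Annual, Sick, Unpaid]
    - field: days
      description: "Number of days requested"
      validation:
        method: numeric
        min: 1
        max: 30
```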

---

## CLI Reference

```bash
# ── Project scaffold ──────────────────────────────────────────────────────────
flowgraph init [dir] [--provider ollama|claude|openai|...]
# Creates workflows/, actions/, validations/, config.yaml, .env.example
# Includes 2 ready-to-run examples (order tracking + FAQ)

# ── Health & diagnostics ──────────────────────────────────────────────────────
flowgraph health               # Full check: DB, LLM, workflows, observability

# ── Database ──────────────────────────────────────────────────────────────────
flowgraph db set <url>         # postgresql://... | ./file.db | (empty = memory)
flowgraph db test              # Test database connection

# ── LLM provider ──────────────────────────────────────────────────────────────
flowgraph llm set <provider>   # claude | openai | openrouter | lmstudio | ollama
flowgraph llm set claude --model claude-opus-4-6
flowgraph llm test             # Send a test prompt

# ── Workflows ─────────────────────────────────────────────────────────────────
flowgraph list                 # List all workflows with descriptions
flowgraph show <name>          # Show node graph + state schema
flowgraph validate [name]      # Validate YAML syntax and routing
flowgraph new <name>           # Scaffold a new workflow interactively
flowgraph run <name>           # Chat with a workflow in the terminal
flowgraph run <name> --thread <id>   # Resume an existing conversation

# ── API server ────────────────────────────────────────────────────────────────
flowgraph serve                # Start FastAPI server (default: 0.0.0.0:8000)
flowgraph serve --port 9000 --reload

# ── Observability stack (Docker) ──────────────────────────────────────────────
flowgraph stack up             # Start PostgreSQL + Langfuse + Jaeger
flowgraph stack down           # Stop the stack
flowgraph stack down --volumes # Stop + delete all data
flowgraph stack status         # Container health + service URLs
flowgraph stack logs           # Tail all service logs
flowgraph stack logs langfuse  # Tail specific service
flowgraph stack configure      # Save Langfuse API keys + enable observability

# ── Dev mode ──────────────────────────────────────────────────────────────────
flowgraph dev                  # stack up + API server + show all URLs

# ── Testing ───────────────────────────────────────────────────────────────────
flowgraph test                 # Run unit tests
flowgraph test --unit          # Unit tests only
flowgraph test --integration   # Integration tests (needs DB + LLM)
flowgraph test --coverage      # With coverage report
flowgraph test -p tests/unit/test_intent.py   # Specific file
flowgraph test -v              # Verbose output
```

---

## REST API Reference

Start with `flowgraph serve`, then use the Swagger UI at `http://localhost:8000/docs`.

### POST `/workflow/start`

Start a new conversation. Intent auto-classified if `workflow_name` is omitted.

```bash
curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "I need to check my SR status"}'

# With explicit workflow
curl -X POST http://localhost:8000/workflow/start \
  -H "Content-Type: application/json" \
  -d '{"message": "track ORD-001 from yesterday", "workflow_name": "order_tracking"}'
```

Response:
```json
{
  "session_id": "order_tracking:abc123",
  "workflow_name": "order_tracking",
  "status": "waiting_input",
  "question": {"field": "order_id", "question": "Please provide your order ID."}
}
```

Status values: `waiting_input` | `completed` | `error`

### POST `/message`

Resume a session or send a follow-up after completion.

```bash
curl -X POST http://localhost:8000/message \
  -H "Content-Type: application/json" \
  -d '{"session_id": "order_tracking:abc123", "message": "ORD-12345"}'
```

When session is already completed, returns `status: "follow_up"` with a contextual answer.

### GET `/session/{session_id}`

Inspect current state of a session.

```bash
curl http://localhost:8000/session/order_tracking:abc123
```

### GET `/workflows`

List all workflows with descriptions.

### GET `/health`

System health + configuration summary.

### GET `/workflow/{name}/mermaid`

Mermaid diagram for a workflow (for visualization).

### GET `/tasks`

List all tasks (workflow execution cards). Supports optional `status` query param filter.

### GET `/tasks/{task_id}`

Get a single task by ID including its full event timeline.

### GET `/tasks/session/{session_id}`

Get all tasks associated with a session ID.

### GET `/tasks/stream/events`

Server-Sent Events stream. The Kanban board connects here for live updates. Each event is a JSON-encoded task or task-event object.

All `WorkflowResponse` objects (from `/workflow/start` and `/message`) include `task_id` and `correlation_id` fields.
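
A minimal Python client for the start-then-resume loop, using only the endpoints and response fields documented above (error handling omitted):

```python
import requests

BASE = "http://localhost:8000"

# Start a conversation; intent is auto-classified when workflow_name is omitted.
data = requests.post(
    f"{BASE}/workflow/start",
    json={"message": "track my order ORD-12345 from yesterday"},
).json()
session_id = data["session_id"]

# Answer questions until the workflow leaves the waiting_input state.
while data["status"] == "waiting_input":
    answer = input(f"{data['question']['question']} ")
    data = requests.post(
        f"{BASE}/message",
        json={"session_id": session_id, "message": answer},
    ).json()

print("Final status:", data["status"])
```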

---

## Python Builder API

Programmatic workflow construction without YAML:

```python
from flowgraph_ai.engine.workflow_builder import workflow
from flowgraph_ai.llm.provider import get_llm
from flowgraph_ai.config import load_config
from flowgraph_ai.storage.checkpointer import get_checkpointer

cfg = load_config()
llm = get_llm(cfg)
checkpointer = get_checkpointer(cfg)

graph = (
    workflow()
    .state(account_id="str", result="str")
    .collect(
        name="collect_id",
        field="account_id",
        ask="Please provide your account ID.",
        next_node="process",
        on_max_retry="error",
    )
    .action(
        name="process",
        method="actions.billing.check_status",
        result_field="result",
        routes=[{"value": "ok", "next": "done"}, {"default": "error"}],
    )
    .respond(name="done",  template="All good! Account: {state.account_id}")
    .respond(name="error", error_field="error")
    .start("collect_id")
    .compile(llm=llm, app_config=cfg, checkpointer=checkpointer)
)
```
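
The compiled object behaves like any LangGraph app; assuming standard LangGraph invocation semantics, you run it with a `thread_id` so the checkpointer can persist the conversation:

```python
# Sketch: standard LangGraph invocation; thread_id ties the run to a checkpoint.
result = graph.invoke(
    {"user_message": "ACC-12345"},
    config={"configurable": {"thread_id": "demo-thread-1"}},
)
print(result.get("output"))
```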

---

## Configuration Reference

`config.yaml` — controls all runtime behaviour:

```yaml
app:
  name: FlowGraph-AI
  version: 0.1.0

# ── LLM Provider ──────────────────────────────────────────────────────────────
llm:
  provider: ollama            # claude | openai | openrouter | lmstudio | ollama
  temperature: 0.7
  max_tokens: 2048            # Global default (per-node llm_params: overrides)
  top_p: 1.0
  streaming: false
  timeout: 30                 # Seconds

  ollama_model: llama3.1:8b
  ollama_base_url: http://localhost:11434

  claude_model: claude-sonnet-4-6
  # ANTHROPIC_API_KEY env var

  openai_model: gpt-4o-mini
  # OPENAI_API_KEY env var

  openrouter_model: openai/gpt-4o-mini
  openrouter_base_url: https://openrouter.ai/api/v1
  # OPENROUTER_API_KEY env var

  lmstudio_model: local-model
  lmstudio_base_url: http://localhost:1234/v1

# ── Engine ────────────────────────────────────────────────────────────────────
engine:
  max_steps: 50               # Max LangGraph execution steps per invocation

# ── Intent Classifier ─────────────────────────────────────────────────────────
intent:
  fallback_workflow: greeting   # Workflow used when no intent matches
  # custom_prompt: |            # Optional: replace the classification prompt

# ── Guardrails ────────────────────────────────────────────────────────────────
guardrails:
  enabled: true
  tone: "warm and professional"
  # custom_rules:
  #   - "Never mention competitor products"
  # custom_prompt: |           # Fully replaces tone + custom_rules
  #   You are a strict reviewer...

# ── Language Detection ────────────────────────────────────────────────────────
language_detection:
  enabled: true               # Detect language + respond in kind

# ── Logging ───────────────────────────────────────────────────────────────────
logging:
  level: WARNING              # DEBUG | INFO | WARNING | ERROR
  format: text                # text | json
  # file: logs/app.log

# ── Checkpointing ────────────────────────────────────────────────────────────
checkpoint:
  connection_string: ""       # Empty = in-memory (dev)
  # connection_string: "./flowgraph.db"          # SQLite
  # connection_string: "postgresql://user:pass@localhost:5432/mydb"  # PostgreSQL

# ── Task Tracking ─────────────────────────────────────────────────────────────
tracking:
  db_path: "./tasks.db"       # SQLite file for task history; omit = in-memory

# ── Observability ─────────────────────────────────────────────────────────────
observability:
  langfuse:
    enabled: false
    host: http://localhost:3000
    public_key: ""            # or env: LANGFUSE_PUBLIC_KEY
    secret_key: ""            # or env: LANGFUSE_SECRET_KEY
  tracing:
    enabled: false
    endpoint: http://localhost:4317   # Jaeger OTLP gRPC
    service_name: flowgraph-ai
```

### Environment variables

| Variable | Purpose |
|---|---|
| `ANTHROPIC_API_KEY` | Claude API key |
| `OPENAI_API_KEY` | OpenAI API key |
| `OPENROUTER_API_KEY` | OpenRouter API key |
| `LANGFUSE_PUBLIC_KEY` | Langfuse public key |
| `LANGFUSE_SECRET_KEY` | Langfuse secret key |
| `FLOWGRAPH_LOG_LEVEL` | Override log level |

Set these in a `.env` file in the project root — auto-loaded at startup.

### Per-node LLM override

```yaml
my_node:
  type: data_collector
  llm_provider: claude        # Override global provider for this node only
  field: name
```

---

## LLM Provider Setup

### Ollama (local, free — default)

```bash
brew install ollama            # macOS
curl -fsSL https://ollama.ai/install.sh | sh   # Linux

ollama pull llama3.1:8b
ollama serve
```

```yaml
llm:
  provider: ollama
  ollama_model: llama3.1:8b
```

Recommended models:
- `llama3.1:8b` — fast, works for most flows
- `llama3.1:70b` — much better JSON extraction (recommended for multi-field workflows, which strain smaller models)
- `mistral:7b` — good structured output

### Claude (best quality)

```bash
echo "ANTHROPIC_API_KEY=sk-ant-..." >> .env
```

```yaml
llm:
  provider: claude
  claude_model: claude-sonnet-4-6
```

### OpenAI

```bash
echo "OPENAI_API_KEY=sk-..." >> .env
```

```yaml
llm:
  provider: openai
  openai_model: gpt-4o-mini
```

### OpenRouter (access any model via one API)

```bash
echo "OPENROUTER_API_KEY=sk-or-..." >> .env
```

```yaml
llm:
  provider: openrouter
  openrouter_model: anthropic/claude-3.5-sonnet
  openrouter_base_url: https://openrouter.ai/api/v1
```

### LM Studio (local)

Start LM Studio, load a model, then:

```yaml
llm:
  provider: lmstudio
  lmstudio_model: local-model
  lmstudio_base_url: http://localhost:1234/v1
```

---

## Checkpointing & Persistence

Conversations persist across server restarts via LangGraph checkpointers.

```bash
# SQLite (dev — zero setup, auto-created)
flowgraph db set ./flowgraph.db

# PostgreSQL (production)
flowgraph db set postgresql://user:pass@localhost:5432/mydb

# In-memory (testing only — lost on restart)
flowgraph db set ""

# Test connection
flowgraph db test
```

---

## Observability

### Quick setup

```bash
flowgraph stack up              # Start PostgreSQL + Langfuse + Jaeger via Docker
# Open http://localhost:3000 → sign up → create project → copy API keys
flowgraph stack configure       # Enter keys → saved to config.yaml
flowgraph dev                   # Full dev mode: stack + API server
```

### Services

| Service | URL | Purpose |
|---|---|---|
| Langfuse UI | http://localhost:3000 | LLM prompts, completions, tokens, cost, latency |
| Jaeger UI | http://localhost:16686 | Distributed traces: API → workflow → node spans |
| OTLP gRPC | localhost:4317 | App sends OTel spans here |
| PostgreSQL | localhost:5433 | Langfuse storage (separate from app DB) |

### What gets tracked

**Langfuse** — every LLM call:
- Full prompt + completion
- Token usage + cost estimate
- Latency per call
- Session grouping by `thread_id`
- Trace name: `workflow:<name>`

**Jaeger** — every workflow execution:
- Root span per API request
- Node-level child spans
- Workflow status + session ID attributes

---

## Visual Workflow Builder

A drag-and-drop React app for building YAML workflows visually — no code required.

```bash
cd workflow-builder
npm install
npm run dev
# Open http://localhost:5174
```

Features:
- Drag-and-drop canvas (n8n-style)
- Three node types: Data Collector (blue), Custom Action (orange), Response (green)
- Click any node to configure in a side panel
- Export to YAML → drop in `workflows/` directory
- Dark/light mode
- Pre-built templates for common patterns

### Building a workflow in the UI

1. Drag node types from the sidebar onto the canvas
2. Connect nodes by dragging from bottom handle to top handle
3. Click a node to configure it (name, fields, routes, validation, etc.)
4. Click **Export YAML** → save to `workflows/my_workflow.yaml`
5. Test with `flowgraph run my_workflow`

---

## Project Structure

```
flowgraph-ai/
├── flowgraph_ai/
│   ├── engine/
│   │   ├── graph_builder.py     # Builds LangGraph StateGraph from YAML
│   │   ├── workflow_loader.py   # YAML loader + normaliser + validator
│   │   ├── workflow_builder.py  # Programmatic workflow builder API
│   │   ├── guardrails.py        # LLM safety post-processing (3-level)
│   │   ├── hallucination.py     # Two-stage hallucination grounding check
│   │   ├── template.py          # {state.field} template rendering
│   │   └── json_repair.py       # Auto-repair malformed LLM JSON
│   ├── nodes/
│   │   ├── data_collector.py    # Pre-extraction, tools, interrupt/resume
│   │   ├── custom_action.py     # Business logic execution + routing
│   │   └── response_node.py     # End node with guardrails
│   ├── tracking/
│   │   ├── models.py            # Task + event data models
│   │   ├── tracker.py           # SQLite task store + pub/sub
│   │   └── middleware.py        # FastAPI middleware for auto task creation
│   ├── validators/
│   │   ├── builtin.py           # regex, length, numeric, one_of, not_empty
│   │   └── registry.py          # run_validation() dispatch
│   ├── actions/                 # Example custom actions (incl. store.py)
│   ├── workflows/               # YAML workflow definitions (auto-discovered)
│   ├── storage/
│   │   └── checkpointer.py      # SqliteSaver / PostgresSaver / MemorySaver
│   ├── llm/
│   │   └── provider.py          # get_llm() — 5 providers + llm_params support
│   ├── api/
│   │   └── server.py            # FastAPI: workflow + task endpoints (10 total)
│   ├── observability/
│   │   ├── langfuse_handler.py  # Langfuse callback factory
│   │   └── tracing.py           # OTel / Jaeger spans (incl. hallucination + follow_up)
│   ├── docker/                  # docker-compose.yml + init-db.sql
│   ├── core/
│   │   ├── language.py          # Language detection + caching
│   │   └── history.py           # Conversation history helpers
│   ├── cli/
│   │   └── app.py               # All CLI commands (Typer)
│   ├── validations/             # Your custom validators go here
│   ├── config.py                # load_config() — YAML + .env
│   ├── config.yaml              # Global runtime config
│   ├── intent.py                # classify_intent() → workflow name
│   └── main.py                  # Interactive CLI entry point
├── frontend/                    # Real-time Kanban board (React)
├── workflows/                   # YAML files (add yours here)
├── actions/                     # Your Python business logic goes here
├── tests/
│   ├── unit/                    # Unit tests (no LLM/DB required)
│   └── integration/             # Integration tests
├── workflow-builder/            # Visual drag-and-drop builder (React)
├── .github/workflows/ci.yml     # GitHub Actions CI/CD
└── README.md                    # ← This file (single source of truth)
```

**As a developer, you only work in:**
- `workflows/` — define conversations in YAML
- `actions/` — write Python business logic
- `validations/` — write custom validators
- `config.yaml` — configure LLM, guardrails, logging

---

## Developer & Agent Reference

> For AI agents: this section is the authoritative reference. Read before starting any work.

### Workspace

- **Root**: the repository checkout root (`flowgraph-ai/`)
- **Package manager**: `uv` (`pyproject.toml`)
- **Python**: 3.12+
- **Venv**: `.venv/`

### Run commands

```bash
cd flowgraph-ai

# Interactive CLI
uv run python flowgraph_ai/main.py

# API server
uv run uvicorn flowgraph_ai.api.server:app --reload

# Run unit tests
uv run pytest tests/unit/ -v
```

### Key files for agents

| File | Purpose |
|---|---|
| `flowgraph_ai/nodes/data_collector.py` | Pre-extraction, tools, single/multi-field logic |
| `flowgraph_ai/engine/graph_builder.py` | Builds LangGraph StateGraph from workflow config |
| `flowgraph_ai/engine/workflow_loader.py` | Loads and normalises YAML workflows |
| `flowgraph_ai/engine/guardrails.py` | 3-level guardrails merge + apply |
| `flowgraph_ai/engine/hallucination.py` | Two-stage hallucination check + auto-correction |
| `flowgraph_ai/tracking/tracker.py` | SQLite task store + pub/sub |
| `flowgraph_ai/tracking/middleware.py` | FastAPI middleware for auto task creation |
| `flowgraph_ai/api/server.py` | FastAPI endpoints + follow-up + task endpoints |
| `flowgraph_ai/main.py` | CLI entry point + follow-up in CLI |
| `flowgraph_ai/cli/app.py` | All CLI commands including `flowgraph init` |
| `flowgraph_ai/intent.py` | Intent classification |
| `flowgraph_ai/llm/provider.py` | LLM provider factory + llm_params support |
| `flowgraph_ai/storage/checkpointer.py` | Checkpointer singleton |

### Adding a new workflow

1. Create `workflows/my_workflow.yaml`
2. Add custom actions in `actions/my_actions.py` (if needed)
3. Add custom validators in `validations/my_validators.py` (if needed)
4. Test: `flowgraph validate my_workflow` → `flowgraph run my_workflow`
5. Update this README if architecture changes

For detailed session-by-session development history, architectural decisions, and file change logs, see [agent_work_logs.md](agent_work_logs.md).

---

## Testing

```bash
# Unit tests (no LLM or DB required — all LLM calls are mocked)
uv run pytest tests/unit/ -v
uv run pytest tests/unit/ --cov=flowgraph_ai --cov-report=term-missing

# Integration tests (requires running LLM + DB)
uv run pytest tests/integration/ -v

# Via CLI
flowgraph test
flowgraph test --coverage
flowgraph test --integration
```

Tests live in:
- `tests/unit/` — fast, isolated, no API keys needed (754 tests, 86% coverage)
- `tests/integration/` — test full graph execution with real LLM + DB

---

## Architecture

```
User / Client
    ↓
FastAPI  (api/server.py)
    ↓
Intent Classifier  (intent.py)  — routes message to correct workflow YAML
    ↓
Graph Builder  (engine/graph_builder.py)  — builds LangGraph StateGraph from YAML
    ↓
LangGraph StateGraph
    ├── data_collector   — interrupt() / resume, pre-extraction, tools, validation
    ├── custom_action    — executes Python, routes on return value
    └── response         — renders output, applies LLM guardrails
    ↓
Checkpointer  (storage/checkpointer.py)  — SQLite / PostgreSQL / memory
    ↓
LLM Provider  (llm/provider.py)  — Claude / OpenAI / Ollama / OpenRouter / LM Studio
    ↓
Observability
    ├── Langfuse  — LLM call tracing (callbacks injected at invocation)
    └── Jaeger    — Distributed request tracing (OpenTelemetry spans)
```

---

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for the full history.

### [0.2.0] — 2026-03-12

- **Real-time Kanban board** (`frontend/`) — 8-column board with SSE live updates and activity timeline
- **Task tracking** (`flowgraph_ai/tracking/`) — SQLite pub/sub task store; auto-creates a card on every workflow start
- **Hallucination detection** (`engine/hallucination.py`) — two-stage grounding check with auto-correction
- **LLM parameter customization** — per-node `llm_params:` for `temperature`, `max_tokens`, `top_p`, `model`, etc.
- **`args_from_state` for custom_action** — clean kwarg-based function calling; maps state fields to function params
- **Correlation IDs** — links API response, Kanban card, Jaeger trace, and Langfuse trace
- **4 new task API endpoints** — `GET /tasks`, `GET /tasks/{id}`, `GET /tasks/session/{id}`, `GET /tasks/stream/events`
- **754 unit tests** passing; 86% code coverage
- Backward compatible — no breaking changes from v0.1.0

### [0.1.0] — 2026-03-12

- **Pre-extraction** — extracts field values from the user's first message before asking
- **Tools in DataCollector** — LLM calls Python tools during data collection (multi-round loop)
- **Follow-up handling** — `POST /message` on a completed session returns `status: "follow_up"`
- **`flowgraph init` command** — scaffolds a complete project with two ready-to-run examples
- **GitHub Actions CI/CD** — unit tests, lint, package build on push/PR
- **155 unit tests** passing

### [0.0.1] — 2026-03-11

Initial public release — 3 node types, LangGraph engine, FastAPI, 5 LLM providers, observability.

---

## Roadmap

| Item | Priority |
|---|---|
| Push to remote git repo | High |
| Multi-field extraction improvement for small models (8B) | Medium |
| Redis checkpointer (horizontal scaling) | Medium |
| Workflow versioning | Low |
| Analytics dashboard (Langfuse-backed) | Low |

---

## License

MIT — see [LICENSE](LICENSE)

---

## Contributing

Contributions are welcome. Open an issue before submitting a PR.

Bug reports: include `flowgraph health` output + full error message.

- Email: support@simpletools.in
- Issues: https://github.com/simpletoolsindia/flowgraphai/issues
