Metadata-Version: 2.4
Name: harnessloop
Version: 0.3.0
Summary: Lightweight zero-dependency Agent orchestration framework: Tool Registry, Pipeline/ReAct Agent, Multi-Agent Orchestrator, Pluggable Verification, and Loop Engineering
Author: Gao Yuwei (Willsgao)
License: MIT
Project-URL: Homepage, https://github.com/willsgao/harnessloop
Project-URL: Repository, https://github.com/willsgao/harnessloop
Project-URL: Issues, https://github.com/willsgao/harnessloop/issues
Keywords: agent,agent-framework,orchestrator,react,multi-agent,loop-engineering,pipeline,llm,ai-agent,tool-calling
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: license-file

# harnessloop

> 轻量级零依赖 Agent 编排框架  
> **Model + Harness = Agent**

[![Python](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Zero Dependencies](https://img.shields.io/badge/dependencies-0-brightgreen.svg)]()

---

## 概述

`harnessloop` 是一个**零外部依赖**的 Agent 编排框架，提供从 Tool 注册、Agent 执行循环、多 Agent 编排到验证规则引擎的完整控制流。

核心思想：**Agent 循环的本质是 决策→执行→观察→验证→收敛 这套骨架**，Pipeline、ReAct、多 Agent 协作都只是在骨架上的不同变体。

### 架构

```
harnessloop/
├── harnessloop/
│   ├── exceptions.py          # 结构化异常体系（类型优先 + 字符串降级）
│   ├── tool_registry.py       # Tool / ToolResult / ToolRegistry
│   ├── agent.py               # Agent（Pipeline）+ ReActAgent（LLM 动态推理）
│   ├── orchestrator.py        # Orchestrator（约束→执行→验证→纠错→收敛 + HITL + Streaming）
│   ├── verification.py        # RuleEngine（输入/输出分层护栏 + 5 条内置规则）
│   └── loop/                  # Loop Engineering 生产级加固
│       ├── termination.py     #   Terminator：6 种可组合终止条件
│       ├── smart_retry.py     #   SmartRetry：错误分类 + 指数退避 + 死信队列
│       ├── loop_state.py      #   LoopState：结构化循环状态追踪
│       ├── hitl.py            #   HITLManager：异步/同步人机协作
│       ├── checkpoint.py      #   Checkpointer：JSON 检查点，可恢复中断执行
│       ├── callbacks.py       #   CallbackManager：8 阶段步骤回调（监控/通知/日志）
│       └── plan_agent.py      #   PlanAgent：先规划再执行
└── setup.py
```

### 执行流程图

```mermaid
flowchart TD
    Start(["📋 接收任务"]) --> Guard["🛡️ 输入护栏<br/>MaxInputLength / SensitiveKeyword"]
    Guard -->|通过| Decide{"🤔 decide_next_action()<br/>Pipeline 按序 / ReAct LLM 推理"}
    Guard -->|阻断 tripwire| Fail(["❌ 终止"])

    Decide -->|无更多动作| Converge["🏁 收敛"]
    Decide -->|选择 Tool| CB_BEFORE["📡 BEFORE_EXECUTE"]
    CB_BEFORE --> Retry{"♻️ SmartRetry<br/>错误分类 + 指数退避"}

    Retry -->|瞬时故障| Backoff["⏳ 退避等待"]
    Backoff --> Retry
    Retry -->|永久故障| Dead["📬 死信队列"]
    Dead --> HITL{"🧑 HITL 人工决策？"}
    Retry -->|成功| Execute["✅ Tool.execute()"]

    HITL -->|异步挂起| Ckpt["💾 Checkpoint 保存"]
    Ckpt --> Pause["⏸️ 返回 AWAITING_HUMAN_DECISION"]
    HITL -->|同步重试| Decide
    HITL -->|同步跳过| Decide
    HITL -->|同步中止| Fail

    Execute --> Observe["👁️ Observation + memory"]
    Observe --> CB_AFTER["📡 AFTER_EXECUTE / ON_ERROR"]
    CB_AFTER --> Verify{"🔍 RuleEngine<br/>输出验证"}
    Verify -->|未通过| Decide
    Verify -->|通过| CB_VERIFY["📡 AFTER_VERIFY"]
    CB_VERIFY --> Term{"🛑 Terminator<br/>6 种终止条件"}

    Term -->|完成/步数上限/连续失败/循环检测/工具耗尽| Converge
    Term -->|继续| Decide

    Converge --> CB_CONV["📡 ON_CONVERGE"]
    CB_CONV --> Output["📊 OrchestrationResult<br/>或 StreamEvent 流式输出"]
```

### 三层架构

```mermaid
flowchart LR
    subgraph Orchestrator["🎯 Orchestrator 编排层"]
        direction TB
        O1["多 Agent 串行协作"]
        O2["验证 + 纠错 + 收敛"]
        O3["HITL 人工决策"]
        O4["Streaming 流式输出"]
        O1 --> O2 --> O3 --> O4
    end

    subgraph Agent["🤖 Agent 执行层"]
        direction TB
        A1["Pipeline 模式<br/>按工具注册顺序执行"]
        A2["ReAct 模式<br/>LLM think → act → observe"]
        A3["PlanAgent<br/>先规划再执行"]
    end

    subgraph Tools["🔧 Tool 注册层"]
        direction TB
        T1["Tool 抽象 + Registry"]
        T2["重试 / 超时 / Fallback"]
        T3["死信队列"]
    end

    subgraph LoopEng["🔄 Loop Engineering（灰度可拔插）"]
        direction LR
        L1["Terminator"]
        L2["SmartRetry"]
        L3["Callbacks"]
        L4["Checkpoint"]
    end

    Orchestrator --> Agent
    Agent --> Tools
    LoopEng -.->|LoopConfig 灰度开关注入| Orchestrator
    LoopEng -.->|LoopConfig 灰度开关注入| Agent
```

---

## 安装

```bash
pip install harnessloop
# 或
pip install harnessloop
```

---

## 快速开始

### 1. 定义 Tool

```python
from harnessloop import Tool, ToolResult

class GreetingTool(Tool):
    name = "greet"
    description = "Say hello to someone"
    input_schema = {"name": "str"}

    def execute(self, name: str = "World", **kwargs) -> ToolResult:
        return ToolResult(success=True, data={"greeting": f"Hello, {name}!"})

class WeatherTool(Tool):
    name = "weather"
    description = "Get weather for a city"
    input_schema = {"city": "str"}

    def execute(self, city: str = "Beijing", **kwargs) -> ToolResult:
        return ToolResult(success=True, data={"city": city, "temp": "22°C"})
```

### 2. Pipeline 模式（固定管线）

```python
from harnessloop import Agent, Orchestrator

agent = Agent(
    name="greeter",
    tools=[GreetingTool(), WeatherTool()],
    system_prompt="You are a helpful assistant.",
)

orchestrator = Orchestrator(agents=[agent], max_loops=10)
result = orchestrator.run(
    task="Greet Alice and check Beijing weather",
    context={"name": "Alice", "city": "Beijing"},
)
print(result.to_dict())
```

### 3. ReAct 模式（LLM 动态推理）

```python
from harnessloop import ReActAgent

def my_llm(prompt: str) -> str:
    """替换为你自己的 LLM 调用"""
    import openai
    client = openai.OpenAI(api_key="your-key")
    response = client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}],
        temperature=0.1, max_tokens=2000,
    )
    return response.choices[0].message.content

agent = ReActAgent(
    name="data_analyst",
    tools=[GreetingTool(), WeatherTool()],
    system_prompt="You are a helpful assistant. Use tools to answer questions.",
    llm_call=my_llm,
    max_steps=5,
)

orchestrator = Orchestrator(agents=[agent])
result = orchestrator.run(task="What's the weather in Tokyo?")
```

### 4. Loop Engineering 加固（可选启用）

```python
from harnessloop import LoopConfig, LoopState, Terminator

config = LoopConfig(
    max_steps=10,
    consecutive_failures_limit=3,
    enable_terminator=True,
    enable_smart_retry=True,
    enable_loop_state=True,
)

orchestrator = Orchestrator(
    agents=[agent],
    loop_config=config,
)
```

### 5. 多 Agent 协作

```python
agent_a = Agent(name="parser", tools=[...])
agent_b = Agent(name="auditor", tools=[...])

orchestrator = Orchestrator(agents=[agent_a, agent_b])
# agent_a 的 final_data 自动注入 agent_b 的 context["previous_agent_output"]
result = orchestrator.run(task="Parse and audit the document")
```

### 6. 验证规则

```python
from harnessloop import RuleEngine, NotNullRule, ColumnConsistencyRule

verifier = RuleEngine()
verifier.register(NotNullRule())
verifier.register(ColumnConsistencyRule())

orchestrator = Orchestrator(agents=[agent], verifier=verifier)
```

---

## 模块说明

| 模块 | 作用 | 关键类 |
|------|------|--------|
| **exceptions** | 结构化异常体系 | `HarnessLoopError`, `ToolTimeoutError`, `ToolNotFoundError`, `ToolExecutionError`, `GuardrailTripError` |
| **tool_registry** | Tool 抽象与注册中心 | `Tool`, `ToolResult`, `ToolRegistry` |
| **agent** | Agent 基类 + ReActAgent | `Agent`（Pipeline）, `ReActAgent`（LLM驱动推理） |
| **orchestrator** | 多 Agent 编排器 | `Orchestrator`, `OrchestrationResult`, `StreamEvent` |
| **verification** | 输入/输出分层护栏 | `Rule`, `RuleEngine`, `NotNullRule`, `ColumnConsistencyRule`, `TableCountRule`, `MaxInputLengthRule`, `SensitiveKeywordRule` |
| **loop/termination** | 多条件终止引擎 | `Terminator`, `TermReason`（6 种） |
| **loop/smart_retry** | 智能重试（类型优先） | `SmartRetry`, `ErrorCategory`, `DeadLetter`, `RetryResult` |
| **loop/loop_state** | 结构化状态追踪 | `LoopState`, `AttemptRecord`, `Strategy` |
| **loop/hitl** | 人机协作 | `HITLManager`, `HITLRequest`（同步+异步双模式） |
| **loop/checkpoint** | 可恢复执行检查点 | `Checkpoint`, `Checkpointer`（JSON 持久化） |
| **loop/callbacks** | 8 阶段步骤回调 | `CallbackManager`, `StepPhase` |
| **loop/plan_agent** | 先规划再执行 | `PlanAgent`, `PlanStep` |

---

## Loop Engineering

Agent 循环的生产级加固，全部通过 `LoopConfig` 灰度开关逐模块启用：

```python
config = LoopConfig(
    # Terminator
    max_steps=10,
    consecutive_failures_limit=3,
    loop_detection_threshold=5,

    # SmartRetry
    base_delay=1.0,
    max_delay=30.0,
    jitter_factor=0.1,

    # HITL
    hitl_timeout_seconds=300,

    # 灰度开关
    enable_terminator=True,
    enable_smart_retry=True,
    enable_loop_state=True,
    enable_hitl=True,
    enable_plan_agent=False,   # 先规划再执行（默认关闭）
    enable_checkpoint=False,   # Checkpoint 持久化（默认关闭）
)
```

### Terminator 终止条件优先级

```
P0: EXPLICIT_ABORT / TASK_COMPLETE
P1: MAX_STEPS / CONSECUTIVE_FAILURES
P2: LOOP_DETECTED / ALL_TOOLS_EXHAUSTED
```

### SmartRetry 错误分类

- **类型优先**：`ToolTimeoutError` → TRANSIENT, `ToolNotFoundError` → PERMANENT
- **字符串降级**：兼容旧版错误消息，中英文关键词匹配
- **TRANSIENT**（瞬时故障）→ 指数退避重试
- **PERMANENT**（永久故障）→ 立即返回失败，进死信队列
- **UNKNOWN**（未知）→ 保守重试

### Callbacks 8 阶段回调（v0.3.0 新增）

```python
from harnessloop import CallbackManager, StepPhase

cbm = CallbackManager()

@cbm.on(StepPhase.AFTER_EXECUTE)
def log_to_monitor(agent, tool, step, success, **kw):
    print(f"[{agent}] {tool} step={step} {'OK' if success else 'FAIL'}")

orchestrator = Orchestrator(agents=[agent], callback_manager=cbm)
```

支持的阶段：`BEFORE_DECIDE`, `AFTER_DECIDE`, `BEFORE_EXECUTE`, `AFTER_EXECUTE`, `ON_ERROR`, `ON_CONVERGE`, `BEFORE_VERIFY`, `AFTER_VERIFY`

### Checkpoint 可恢复中断（v0.3.0 新增）

```python
from harnessloop import Checkpointer

checkpointer = Checkpointer(store_dir="./.checkpoints")
cp = checkpointer.save(agent, task, context, hitl_request_id="...")
# 进程重启后恢复
cp = checkpointer.load(cp.checkpoint_id)
# cp.memory_snapshot, cp.loop_state_snapshot, cp.react_trace 均可用
```

纯 JSON 文件存储，零数据库依赖。与 HITL 异步模式联动：触发人工决策时自动保存检查点。

### HITL 双模式

- **异步模式**：挂起任务 → 保存 Checkpoint → 返回 `AWAITING_HUMAN_DECISION` → 等待外部 API 决策
- **同步模式**：调用回调函数阻塞等待（兼容旧接口）

---

## 设计原则

1. **零依赖**：只使用 Python 标准库（`dataclasses`, `enum`, `concurrent.futures`, `hashlib`, `json`, `re`, `time`）
2. **LLM 由外部注入**：Agent 框架本身不绑定任何 LLM SDK，通过 `llm_call` 回调注入
3. **分层明确**：Tool → Agent → Orchestrator 三层独立，每层可单独测试
4. **灰度可控**：Loop Engineering 所有模块通过 `LoopConfig` 按需启用

---

## 实战验证

已在 [BankDataViz](https://github.com/willsgao/BankDataViz) 项目中实战验证：
- 12 家银行 2020-2024 年报全流程处理
- Pipeline：OCR → LLM分析 → 表格重建 → 审计
- ReAct：自然语言驱动的多 Tool 协作数据分析
- Loop Engineering：连续失败保护、循环检测、死信队列

---

## License

MIT
