Metadata-Version: 2.4
Name: donkit-llm-agent
Version: 0.3.0
Summary: Reusable LLM Agent with tool calling and MCP support
Author: Donkit AI
Author-email: opensource@donkit.ai
Requires-Python: >=3.12,<3.14
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: donkit-llm (>=0.1.15,<0.2.0)
Requires-Dist: loguru (>=0.7.3,<0.8.0)
Requires-Dist: pydantic (>=2.8.0,<3.0.0)
Description-Content-Type: text/markdown

# donkit-llm-agent

Reusable LLM agent with tool calling, MCP support, and automatic context compression.

## Overview

`donkit-llm-agent` implements an **agentic loop**: it calls an LLM, detects tool-call requests, executes the tools, feeds the results back, and repeats until the LLM produces a final answer. Both local Python functions and remote MCP servers can serve as tool sources, and streaming plus multi-stage context compression are built in.

## Installation

```bash
pip install donkit-llm-agent
```

**Requirements:** Python 3.12+

**Runtime dependencies:**
- `donkit-llm` — LLM provider abstractions
- `loguru` — logging
- `pydantic` — data validation

## Quick Start

```python
from donkit.llm import Message
from donkit.llm_agent import LLMAgent, AgentTool

# 1. Define a local tool
def search_handler(args: dict) -> str:
    return f"Search results for: {args['query']}"

search_tool = AgentTool(
    name="search",
    description="Search the web for information",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"},
        },
        "required": ["query"],
    },
    handler=search_handler,
)

# 2. Create agent
llm = ...  # Your LLMModelAbstract implementation
agent = LLMAgent(provider=llm, tools=[search_tool])

# 3. Run
messages = [
    Message(role="system", content="You are a helpful assistant."),
    Message(role="user", content="What is the capital of France?"),
]
result = await agent.arespond(messages)
print(result)
```

## Core Concepts

### Agentic Loop

The agent runs a loop:
1. Sends messages to the LLM
2. If the LLM requests tool calls → execute tools, append results, repeat
3. If the LLM responds with text → return the text

The loop runs up to `max_iterations` times (default: 500). If the limit is reached, the agent returns an empty string.
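
For intuition, here is a self-contained toy version of that loop. Everything in it (the `ToyResponse` type, the injected `generate` and `execute_tool` callables, the message shape) is illustrative and not the library's actual internals:

```python
from dataclasses import dataclass, field

@dataclass
class ToyResponse:
    """Stand-in for an LLM response: final text or tool-call requests."""
    content: str = ""
    tool_calls: list[tuple[str, dict]] = field(default_factory=list)

async def toy_agentic_loop(generate, execute_tool, messages, max_iterations=500):
    # generate: async (messages) -> ToyResponse
    # execute_tool: (name, args) -> str
    for _ in range(max_iterations):
        response = await generate(messages)
        if not response.tool_calls:
            return response.content                    # final text answer
        for name, args in response.tool_calls:
            result = execute_tool(name, args)          # run the requested tool
            messages.append({"role": "tool", "name": name, "content": result})
    return ""                                          # iteration limit reached
```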

### Tool Sources

Two types of tools can be used simultaneously:

- **Local tools** (`AgentTool`) — Python functions with a JSON Schema definition
- **MCP tools** (`MCPClientProtocol`) — tools exposed by remote MCP servers (stdio or HTTP transport)

### Context Compression

When the conversation history exceeds a token threshold, the `HistoryCompressor` automatically reduces context using one of three strategies, in order of preference (a toy sketch of the cascade follows the list):

1. **LLM-based summary** — summarizes old turns using the LLM itself
2. **Tool-call compression** — groups old tool call pairs into compact summaries
3. **Emergency truncation** — truncates oversized individual messages
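
A toy sketch of that cascade, with the three strategies injected as callables; every name here is hypothetical and stands in for the library's internal logic:

```python
def toy_compress(history, token_count, threshold,
                 llm_summarize, compress_tool_pairs, truncate):
    if token_count(history) <= threshold:
        return history                        # under budget: nothing to do
    try:
        return llm_summarize(history)         # 1. preferred: LLM-based summary
    except Exception:
        pass                                  # summary failed, fall through
    compacted = compress_tool_pairs(history)  # 2. compress old tool-call pairs
    if token_count(compacted) <= threshold:
        return compacted
    return truncate(compacted)                # 3. emergency truncation
```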

---

## API Reference

### `LLMAgent`

Main agent class.

```python
class LLMAgent:
    def __init__(
        self,
        provider: LLMModelAbstract,
        tools: list[AgentTool] | None = None,
        mcp_clients: list[MCPClientProtocol] | None = None,
        max_iterations: int = 500,
        history_compressor: HistoryCompressor | None = None,
    ) -> None: ...
```

**Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `provider` | `LLMModelAbstract` | required | LLM provider from `donkit-llm` |
| `tools` | `list[AgentTool]` | `None` | Local Python tools |
| `mcp_clients` | `list[MCPClientProtocol]` | `None` | MCP server clients |
| `max_iterations` | `int` | `500` | Max agentic loop iterations |
| `history_compressor` | `HistoryCompressor` | `None` | Context compressor (uses defaults if `None`) |

---

#### `ainit_mcp_tools()`

Must be called before using the agent if `mcp_clients` are provided. Connects to MCP servers and discovers available tools.

```python
await agent.ainit_mcp_tools()
```

---

#### `arespond()`

Core agentic loop. Accepts a full message list, mutates it in place by appending tool results, and returns the final LLM response.

```python
messages = [Message(role="user", content="Do something")]
result: str = await agent.arespond(messages)
```

After the call, `messages` contains the full conversation including tool call records.

---

#### `arespond_stream()`

Streaming variant of the agentic loop. Accepts a full message list, mutates it in place, and yields `StreamEvent` objects as the loop progresses (see *Streaming with event handling* under Usage Examples for a full handler).

```python
async for event in agent.arespond_stream(messages):
    ...
```

---

### `AgentTool`

Wraps a local Python function as an agent tool.

```python
class AgentTool:
    def __init__(
        self,
        name: str,
        description: str,
        parameters: dict,          # JSON Schema object
        handler: Callable,         # Function to call: (dict) -> str
        is_async: bool = False,    # True if handler is async
    ) -> None: ...
```

**Sync handler:**
```python
def my_handler(args: dict) -> str:
    return f"Result: {args['input']}"

tool = AgentTool(
    name="process",
    description="Process input",
    parameters={
        "type": "object",
        "properties": {"input": {"type": "string"}},
        "required": ["input"],
    },
    handler=my_handler,
)
```

**Async handler:**
```python
async def my_async_handler(args: dict) -> str:
    result = await some_async_operation(args["input"])
    return str(result)

tool = AgentTool(
    name="process",
    description="Process input",
    parameters={...},      # same JSON Schema shape as the sync example
    handler=my_async_handler,
    is_async=True,
)
```

---

### `MCPClientProtocol`

Abstract interface for MCP clients. Implement this to connect the agent to any MCP server.

```python
from donkit.llm_agent import MCPClientProtocol

class MyMCPClient(MCPClientProtocol):
    @property
    def identifier(self) -> str:
        return "http://my-mcp-server:8000"

    @property
    def timeout(self) -> float:
        return 30.0

    @property
    def progress_callback(self) -> ProgressCallback | None:
        return None

    def list_tools(self) -> list[dict]:
        # Sync: return list of tool definitions
        ...

    async def alist_tools(self) -> list[dict]:
        # Async: return list of tool definitions
        ...

    def call_tool(self, name: str, arguments: dict) -> str:
        # Sync: call tool, return string result
        ...

    async def acall_tool(self, name: str, arguments: dict) -> str:
        # Async: call tool, return string result
        ...
```

**Tool definition format** (returned by `list_tools` / `alist_tools`):
```python
[
    {
        "name": "tool_name",
        "description": "What the tool does",
        "parameters": {
            "type": "object",
            "properties": {
                "param1": {"type": "string"},
            },
            "required": ["param1"],
        },
    },
]
```

**Progress callback signature:**
```python
ProgressCallback = Callable[[float, float | None, str | None], None]
#                            progress  total        message
```
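
For example, a minimal callback that logs progress to stdout (the parameter names are only descriptive):

```python
def on_progress(progress: float, total: float | None, message: str | None) -> None:
    # Render "3.0/10.0" when a total is known, bare progress otherwise.
    label = f"{progress}/{total}" if total is not None else f"{progress}"
    print(f"[progress {label}] {message or ''}")
```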

---

### `StreamEvent`

Event yielded during streaming.

```python
@dataclass
class StreamEvent:
    type: EventType
    content: str | None = None      # For CONTENT events
    tool_name: str | None = None    # For TOOL_CALL_* events
    tool_args: dict | None = None   # For TOOL_CALL_START events
    error: str | None = None        # For TOOL_CALL_ERROR events
```

### `EventType`

```python
class EventType(StrEnum):
    CONTENT             # LLM text chunk
    TOOL_CALL_START     # Tool execution started
    TOOL_CALL_END       # Tool execution completed
    TOOL_CALL_ERROR     # Tool execution failed
    HISTORY_COMPRESSED  # Context was compressed
```

---

### `HistoryCompressor`

Controls context compression behavior.

```python
class HistoryCompressor:
    def __init__(
        self,
        token_threshold: int = 150_000,
        keep_recent_turns: int = 1,
        keep_recent_tool_pairs: int = 3,
        tool_result_summary_chars: int = 500,
        emergency_msg_max_chars: int = 4_000,
        summary_prompt: str = "Summarize this conversation concisely...",
        fallback_notice: str = "[CONVERSATION HISTORY TRUNCATED]...",
    ) -> None: ...
```

**Parameters:**
| Parameter | Default | Description |
|-----------|---------|-------------|
| `token_threshold` | `150_000` | Trigger compression when history exceeds this many tokens |
| `keep_recent_turns` | `1` | Number of recent user-assistant turns to keep verbatim (for LLM summary strategy) |
| `keep_recent_tool_pairs` | `3` | Number of recent tool call pairs to preserve verbatim (for tool-call compression) |
| `tool_result_summary_chars` | `500` | Max characters per tool result in compressed summaries |
| `emergency_msg_max_chars` | `4_000` | Max characters per message in emergency truncation |
| `summary_prompt` | (default) | System prompt used when asking LLM to summarize history |
| `fallback_notice` | (default) | Text inserted when LLM-based compression fails |

**Custom compressor:**
```python
from donkit.llm_agent import HistoryCompressor, LLMAgent

compressor = HistoryCompressor(
    token_threshold=100_000,
    keep_recent_turns=2,
    keep_recent_tool_pairs=5,
)

agent = LLMAgent(
    provider=llm,
    tools=tools,
    history_compressor=compressor,
)
```

**Direct usage:**
```python
compressed = await compressor.compress_if_needed(history, provider)
```

**Module-level helper** (uses a `HistoryCompressor` with default settings):
```python
from donkit.llm_agent import compress_history_if_needed

compressed = await compress_history_if_needed(history, provider)
```

---

## Usage Examples

### Agent with MCP server

```python
from donkit.llm import Message
from donkit.llm_agent import LLMAgent

# mcp_client implements MCPClientProtocol
agent = LLMAgent(
    provider=llm,
    mcp_clients=[mcp_client],
)

# Required step: discover tools from MCP server
await agent.ainit_mcp_tools()

messages = [Message(role="user", content="List all files in /tmp")]
result = await agent.arespond(messages)
```

### Mixed local + MCP tools

```python
agent = LLMAgent(
    provider=llm,
    tools=[local_tool_1, local_tool_2],
    mcp_clients=[stdio_mcp_client, http_mcp_client],
)

await agent.ainit_mcp_tools()  # Only needed when mcp_clients present

messages = [Message(role="user", content="...")]
result = await agent.arespond(messages)
```

### Streaming with event handling

```python
async for event in agent.arespond_stream(messages):
    match event.type:
        case EventType.CONTENT:
            print(event.content, end="", flush=True)

        case EventType.TOOL_CALL_START:
            print(f"\n[→ {event.tool_name}({event.tool_args})]")

        case EventType.TOOL_CALL_END:
            print(f"[← {event.tool_name} done]")

        case EventType.TOOL_CALL_ERROR:
            print(f"[✗ {event.tool_name}: {event.error}]")

        case EventType.HISTORY_COMPRESSED:
            print("\n[context compressed]")
```

### Working with message history directly

```python
from donkit.llm import Message

messages = [
    Message(role="system", content="You are a coding assistant."),
    Message(role="user", content="Help me refactor this function."),
]

# arespond mutates the messages list — tool calls are appended in place
result = await agent.arespond(messages)

# Inspect full conversation after the call
for msg in messages:
    print(f"{msg.role}: {msg.content[:100]}")

# Continue the conversation
messages.append(Message(role="user", content="Now add type hints."))
result2 = await agent.arespond(messages)
```

### Limiting iterations

```python
# The agent stops after 10 loop iterations and returns "" if no final answer was produced
agent = LLMAgent(
    provider=llm,
    tools=tools,
    max_iterations=10,
)
```
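
Since an empty string is the only signal that the limit was hit, callers may want to check for it explicitly (a sketch, reusing `agent` and `messages` from the surrounding examples):

```python
result = await agent.arespond(messages)
if not result:
    # "" means max_iterations was exhausted without a final answer
    raise RuntimeError("Agent hit max_iterations without producing an answer")
```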

---

## Testing

The library ships with mock classes for unit testing agents without a real LLM provider or MCP server.

### `BaseMockProvider`

```python
from donkit.llm_agent.testing import BaseMockProvider

provider = BaseMockProvider(
    supports_tools_val=True,
    supports_streaming_val=False,
    model_name_val="mock-model",
    responses=[
        # First call: request a tool
        {"tool_calls": [{"name": "search", "arguments": {"query": "Paris"}}]},
        # Second call: produce final answer
        {"content": "The capital of France is Paris."},
    ],
)

agent = LLMAgent(provider=provider, tools=[search_tool])
result = await agent.arespond(messages)

# Inspect calls made to the mock
assert provider.call_count == 2
assert provider.messages_history[0][0].role == "user"
```

### `BaseMockMCPClient`

```python
from donkit.llm_agent.testing import BaseMockMCPClient

mcp_client = BaseMockMCPClient(
    name="file-server",
    tools={
        "read_file": {
            "description": "Read a file from disk",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                },
                "required": ["path"],
            },
            "handler": lambda args: f"Contents of {args['path']}: hello world",
        },
        "list_files": {
            "description": "List files in a directory",
            "parameters": {
                "type": "object",
                "properties": {
                    "directory": {"type": "string"},
                },
            },
            # No handler = returns empty string by default
        },
    },
)

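# `provider` can be e.g. the BaseMockProvider from the previous example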
agent = LLMAgent(provider=provider, mcp_clients=[mcp_client])
await agent.ainit_mcp_tools()
result = await agent.arespond(messages)
```

### Full test example

```python
import pytest
from donkit.llm import Message
from donkit.llm_agent import LLMAgent, AgentTool, EventType
from donkit.llm_agent.testing import BaseMockProvider

@pytest.mark.asyncio
async def test_agent_calls_tool_and_returns_answer():
    call_log = []

    def my_tool(args: dict) -> str:
        call_log.append(args)
        return "tool result"

    tool = AgentTool(
        name="my_tool",
        description="Test tool",
        parameters={
            "type": "object",
            "properties": {"x": {"type": "string"}},
        },
        handler=my_tool,
    )

    provider = BaseMockProvider(
        supports_tools_val=True,
        responses=[
            {"tool_calls": [{"name": "my_tool", "arguments": {"x": "hello"}}]},
            {"content": "Done!"},
        ],
    )

    messages = [Message(role="user", content="Do the thing")]
    agent = LLMAgent(provider=provider, tools=[tool])
    result = await agent.arespond(messages)

    assert result == "Done!"
    assert call_log == [{"x": "hello"}]
    assert provider.call_count == 2
```

---

## Error Handling

| Situation | Behavior |
|-----------|----------|
| Tool not found | Returns `"Error: Tool 'name' not found."` as tool result |
| Tool raises exception | Returns `"Error: {exception_message}"` as tool result |
| `KeyboardInterrupt` / `asyncio.CancelledError` during tool | Returns `"Tool execution cancelled by user (Ctrl+C)"` |
| LLM compression fails | Falls back to mechanical truncation, inserts `fallback_notice` |
| Max iterations reached | Returns empty string `""` |

Tool errors do not crash the agent — the error message is fed back to the LLM as a tool result, allowing the LLM to handle or report it.
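
A sketch of this behavior with a deliberately failing tool (the tool name and error text are made up for illustration):

```python
from donkit.llm_agent import AgentTool

def flaky_handler(args: dict) -> str:
    raise ValueError("upstream service unavailable")

flaky_tool = AgentTool(
    name="flaky",
    description="A tool that always fails (for illustration)",
    parameters={"type": "object", "properties": {}},
    handler=flaky_handler,
)

# When the LLM calls `flaky`, the agent catches the ValueError and feeds
# "Error: upstream service unavailable" back to the LLM as the tool result.
```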

---

## Architecture

```
LLMAgent
├── provider: LLMModelAbstract            ← LLM provider (donkit-llm)
├── local_tools: list[AgentTool]          ← Local Python functions
├── mcp_clients: list[MCPClientProtocol]  ← Remote MCP servers
├── mcp_tools: dict[str, ...]             ← Discovered MCP tools (after ainit_mcp_tools)
└── history_compressor: HistoryCompressor

Agentic loop (arespond / arespond_stream):
  1. compress_if_needed(history)
  2. LLM.generate(messages, tools=all_tool_specs)
  3. if tool_calls:
       for each tool_call:
           execute_tool(name, args)
           append result to messages
       goto 1
  4. return response.content
```

---

## Package Info

| Field | Value |
|-------|-------|
| Package name | `donkit-llm-agent` |
| Version | `0.3.0` |
| Python | `>=3.12,<3.14` |
| License | See repository |
| Authors | Donkit AI `<opensource@donkit.ai>` |

