Metadata-Version: 2.4
Name: agentkeeper
Version: 0.1.1
Summary: Coordination infrastructure for AI agents — registry, leases, leader election, and entitlements over gRPC
Project-URL: Repository, https://github.com/agentkeeper/agentkeeper
License: MIT
Keywords: agents,ai,coordination,grpc,llm,multi-agent,orchestration
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.10
Requires-Dist: grpcio>=1.63.0
Requires-Dist: protobuf>=6.31.1
Description-Content-Type: text/markdown

# AgentKeeper Python SDK

Coordination infrastructure for AI agent systems. AgentKeeper gives your agents a shared nervous system: they can register themselves, discover each other, claim exclusive tasks, elect a leader, and enforce who is allowed to do what — all over a single gRPC connection.

## What it does

| Feature | What it solves |
|---|---|
| **Agent Registry** | Agents announce themselves with a description and tool list. Dead agents are detected automatically via heartbeat timeouts. |
| **Semantic Search** | Find the right agent for a task using a natural language query. |
| **Lease Manager** | Guarantees exactly one agent works on a given task at a time. Leases expire automatically if an agent crashes. |
| **Leader Election** | Multiple orchestrator instances stand by; only one leads. Failover is automatic when the leader dies. |
| **Entitlements** | ABAC policy engine. Controls which agents can call which tools. DENY rules always override ALLOW. |

## Installation

```bash
pip install agentkeeper
```

The SDK connects to a running AgentKeeper daemon. Download the binary for your platform from the [releases page](https://github.com/agentkeeper/agentkeeper/releases) and run it:

```bash
./agentkeeper   # listens on :7070 by default
```

## Quick start

The typical flow has two sides: an **orchestrator** that creates tasks and assigns them, and **agents** that register and do the work.

```python
import uuid
from agentkeeper import AgentKeeperClient, AgentInfo, WatchEvent

client = AgentKeeperClient(host="localhost", port=7070)

# ── Orchestrator side ──────────────────────────────────────────────────────

# React to events in real time — no polling needed
def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        agent: AgentInfo = event.payload
        print(f"Agent {agent.card.name} died — reassigning its tasks")
        reassign_tasks_for(agent.agent_id)

sub = client.watch(["agent."], on_event)

# Create a task — task_id is whatever ID your system uses (database row,
# queue message ID, UUID). AgentKeeper doesn't create tasks; it coordinates
# who works on them.
task_id = f"task:{uuid.uuid4()}"

# Find the best available agent for the work
agents = client.find(query="fetch and parse web pages", tools=["browser"])
for agent in agents:
    lease = client.acquire_lease(task_id, agent.agent_id, ttl=120)
    if lease:
        print(f"Assigned {task_id} to {agent.card.name}")
        send_task_to_agent(agent.agent_id, task_id)
        break  # one winner, everyone else is locked out

# ── Agent side ─────────────────────────────────────────────────────────────

with client.register(
    name="scraper-1",
    description="Fetches and parses web pages",
    version="1.0.0",
    tools=["browser", "http_client"],
) as agent:
    # Heartbeat runs automatically in the background.
    # The agent stays ALIVE as long as this block is running.

    task_id = receive_task_assignment()  # from orchestrator

    with client.acquire_lease(task_id, agent.agent_id, ttl=120) as lease:
        do_work(task_id)
    # lease released automatically on exit

# Agent deregistered automatically on exit — triggers agent.died on the bus
```

## Connecting

**Sidecar mode** — connect to a daemon that is already running:

```python
# Default: localhost:7070
client = AgentKeeperClient()

# Custom host/port
client = AgentKeeperClient(host="agentkeeper.internal", port=7070)

# Use as a context manager to close the channel on exit
with AgentKeeperClient() as client:
    ...
```

**Embedded mode** — the SDK finds and starts the daemon for you. Useful for local development and tests. The daemon process is shut down automatically when the client closes:

```python
# SDK locates the binary and starts it on a free port
with AgentKeeperClient.embedded() as client:
    with client.register("my-agent", ...) as agent:
        ...
# daemon is stopped here

# Explicit binary path or port
client = AgentKeeperClient.embedded(
    binary="/usr/local/bin/agentkeeper",
    port=9090,
)

# With persistence — policies and agent history survive restarts
client = AgentKeeperClient.embedded(
    data_dir="/var/lib/myapp/agentkeeper",
)
```

Binary resolution order for embedded mode:
1. `binary=` argument
2. `AGENTKEEPER_BIN` environment variable
3. `agentkeeper` on `PATH`
4. `./daemon/agentkeeper` relative to the project root (dev layout)
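
For example, in CI or a packaged app you can point embedded mode at a pre-downloaded binary through the environment variable instead of threading `binary=` through your code. A minimal sketch (the cache path is just an illustration):

```python
import os
from agentkeeper import AgentKeeperClient

# AGENTKEEPER_BIN is checked after binary= and before PATH
# (see the resolution order above).
os.environ["AGENTKEEPER_BIN"] = "/opt/cache/agentkeeper"

with AgentKeeperClient.embedded() as client:
    ...
```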

## Dashboard and metrics

The daemon exposes an HTTP server on port `9090` (configurable with `--metrics-addr`):

| Endpoint | What it returns |
|---|---|
| `http://localhost:9090/` | Live dashboard — agents alive, leases active. Auto-refreshes every 5 seconds. |
| `http://localhost:9090/healthz` | `ok` with HTTP 200 — for K8s liveness probes and load balancers. |
| `http://localhost:9090/metrics` | Prometheus text format. |

Open **http://localhost:9090** in your browser while the daemon is running to see the dashboard.

Prometheus metrics exposed:

| Metric | Type | Description |
|---|---|---|
| `agentkeeper_agents_alive` | Gauge | Agents currently ALIVE or UNREACHABLE |
| `agentkeeper_leases_active` | Gauge | Leases currently held |
| `agentkeeper_rpc_total{method="..."}` | Counter | gRPC calls served per method |
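
The health endpoint also works as a readiness gate in scripts and tests. A minimal sketch using only the standard library, assuming the default metrics address:

```python
import time
import urllib.request

def wait_for_daemon(url: str = "http://localhost:9090/healthz", timeout: float = 10.0) -> None:
    """Poll the health endpoint until it answers 200 or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=1) as resp:
                if resp.status == 200:
                    return
        except OSError:
            pass  # daemon not up yet, keep polling
        time.sleep(0.5)
    raise RuntimeError(f"AgentKeeper daemon not healthy at {url}")

wait_for_daemon()
```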

## Agent Registry

### Register an agent

```python
with client.register(
    name="analyst-1",          # unique name within your deployment
    description="Reads data and produces summaries using GPT-4",
    version="2.1.0",
    tools=["sql_runner", "pdf_reader"],
    metadata={"team": "data", "region": "us-east"},
    ttl=30,                    # heartbeat deadline in seconds; miss it and the agent is marked UNREACHABLE
) as agent:
    print(agent.agent_id)
    # background thread sends Heartbeat every ttl/2 seconds automatically
```

The `ttl` controls how quickly a crashed agent is detected. An agent that stops heartbeating transitions `ALIVE → UNREACHABLE → DEAD`. Once `DEAD`, all its leases are released and it is removed from elections.
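
A shorter `ttl` detects crashes faster at the cost of more heartbeat traffic. To react to these transitions as they happen instead of discovering them on the next `find()`, you can subscribe on the event bus (documented below). A small sketch; `notify()` is a placeholder for your own alerting hook:

```python
from agentkeeper import AgentKeeperClient, WatchEvent

client = AgentKeeperClient()

def on_liveness_event(event: WatchEvent):
    agent = event.payload
    if event.topic == "agent.unreachable":
        # first missed heartbeat deadline; the agent may still recover
        notify(f"{agent.card.name} missed a heartbeat")
    elif event.topic == "agent.died":
        # its leases are already released, so its work is safe to reassign
        notify(f"{agent.card.name} is dead")

sub = client.watch(["agent.unreachable", "agent.died"], on_liveness_event)
```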

### Find agents

```python
# Semantic search — returns agents ranked by relevance to the query
agents = client.find(query="fetch and parse web pages", limit=5)

# Hard filter by tools — only returns agents that have ALL listed tools
agents = client.find(tools=["browser", "http_client"])

# Combine: semantic ranking within the tool-filtered set
agents = client.find(
    query="scrape structured data from HTML",
    tools=["browser"],
    limit=3,
)

for agent in agents:
    print(agent.agent_id, agent.card.name, agent.status)
```

### Inspect an agent

```python
from agentkeeper import AgentStatus

info = client.get_agent(agent_id)
print(info.card.name)          # "scraper-1"
print(info.status)             # AgentStatus.ALIVE
print(info.registered_at)      # datetime
print(info.last_heartbeat_at)  # datetime
```

## Lease Manager

Leases solve the double-execution problem: when two agents race to pick up the same task, only one wins. The winning agent holds an exclusive lock until the work is done, the TTL expires, or the agent dies.

### Acquire a lease

```python
lease = client.acquire_lease(
    task_id="task:process-invoice-42",  # any string key
    agent_id=agent.agent_id,
    ttl=120,                            # seconds
)

if lease is None:
    print("Another agent already owns this task")
else:
    print(f"I own it. Token: {lease.lease.token}")
    lease.release()
```

### Use as a context manager (recommended)

```python
lease = client.acquire_lease("task:process-invoice-42", agent.agent_id)
if lease:
    with lease:
        do_work()
    # lease.release() called automatically on exit, even on exception
```

### Renew a lease for long-running tasks

```python
with client.acquire_lease("task:long-job", agent.agent_id, ttl=60) as lease:
    for chunk in process_large_file():
        lease.renew(ttl=60)  # reset the TTL after each chunk
        process(chunk)
```

### Typical orchestrator pattern

```python
agents = client.find(query="process invoices", tools=["pdf_reader"])

for agent in agents:
    lease = client.acquire_lease("task:invoice-42", agent.agent_id)
    if lease:
        # This agent won the race — assign the work
        assign_task(agent.agent_id, "task:invoice-42")
        break
```

## Leader Election

Use leader election when you run multiple orchestrator instances for high availability. Only one is active at a time; the others are hot standbys.

```python
with client.register("orchestrator", "Pipeline orchestrator", "1.0.0") as orch:

    state = client.campaign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)

    if state.leader_agent_id == orch.agent_id:
        print("I am the leader — running the pipeline")
        run_pipeline()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        wait_for_leadership_change()
```

When the leader's agent dies (crash, `deregister()`, or heartbeat timeout), the daemon immediately re-elects from the remaining candidates and increments `term`. A new candidate joining never displaces the current leader — only death or `resign()` does.

```python
# Graceful handoff: resign so a standby takes over immediately
client.resign(group="orchestrator:pipeline-A", agent_id=orch.agent_id)
```

```python
# Checking the current leader: calling campaign() again never displaces the sitting leader
state = client.campaign("orchestrator:pipeline-A", orch.agent_id)
print(state.leader_agent_id)
print(state.term)        # logical clock — increments on every election event
print(state.elected_at)  # datetime
```
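
Because `term` only ever increases, it can double as a fencing token: the leader stamps its side effects with the term it was elected under, and a store that remembers the highest term seen can reject writes from a deposed leader that is still running. A rough sketch of the idea; `FencedStore` is a hypothetical stand-in for whatever state your pipeline mutates:

```python
class FencedStore:
    """Hypothetical store that rejects writes stamped with a stale term."""

    def __init__(self):
        self.highest_term = 0
        self.data: dict[str, str] = {}

    def put(self, key: str, value: str, term: int) -> None:
        if term < self.highest_term:
            raise PermissionError(f"stale leader: term {term} < {self.highest_term}")
        self.highest_term = term
        self.data[key] = value

store = FencedStore()

# The leader records the term it was elected under and passes it with every write.
state = client.campaign("orchestrator:pipeline-A", orch.agent_id)
if state.leader_agent_id == orch.agent_id:
    store.put("pipeline:checkpoint", "stage-3", term=state.term)
```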

## Entitlements

The entitlements engine controls which agents can perform which actions on which resources. All three fields (`subject`, `action`, `resource`) support `*` as a wildcard.

### Set policies

```python
from agentkeeper import PolicyEffect

# Allow the orchestrator to call any tool
client.set_policy(
    id="allow-orch-all",
    subject="agent:orchestrator",
    action="call",
    resource="tool:*",
    effect=PolicyEffect.ALLOW,
)

# Deny the scraper from writing files (even if a broader allow exists)
client.set_policy(
    id="deny-scraper-write",
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
    effect=PolicyEffect.DENY,
)
```

DENY always wins. If both an ALLOW and a DENY match the same request, the request is rejected.
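
For example, a broad allow and a narrow deny can both match the same request; the deny takes precedence. A small illustration reusing the `deny-scraper-write` policy from above (the allow's id is just an example):

```python
# Broad allow: the scraper may call any tool.
client.set_policy(
    id="allow-scraper-all",
    subject="agent:scraper",
    action="call",
    resource="tool:*",
    effect=PolicyEffect.ALLOW,
)

# Both policies match this request, so the DENY wins and it is rejected.
result = client.check_policy(
    subject="agent:scraper", action="call", resource="tool:write_file"
)
assert not result.allowed
```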

### Check a policy

```python
result = client.check_policy(
    subject="agent:scraper",
    action="call",
    resource="tool:write_file",
)
print(result.allowed)  # False
print(result.reason)   # 'denied by policy "deny-scraper-write"'
```

Use `check_policy` in your tool handlers before executing any action:

```python
def call_tool(agent_name: str, tool: str):
    result = client.check_policy(
        subject=f"agent:{agent_name}",
        action="call",
        resource=f"tool:{tool}",
    )
    if not result.allowed:
        raise PermissionError(result.reason)
    run_tool(tool)
```

### List and delete policies

```python
# All policies
policies = client.list_policies()

# Filtered by subject prefix
scraper_policies = client.list_policies(subject_prefix="agent:scraper")

# Delete by id
client.delete_policy("deny-scraper-write")
```

## Event Bus

The event bus is the reactive layer. Instead of polling `get_agent()` in a loop to detect deaths or checking election state on a timer, you subscribe once and receive events as they happen.

### watch()

```python
from agentkeeper import WatchEvent, AgentInfo, Lease

def on_event(event: WatchEvent):
    print(event.topic)      # e.g. "agent.died"
    print(event.sequence)   # globally monotonic — detect gaps for replay
    print(event.timestamp)  # datetime (UTC)
    print(event.payload)    # AgentInfo | Lease | ElectionState | None

# Subscribe to a topic prefix — "agent." matches agent.registered,
# agent.died, agent.unreachable
sub = client.watch(["agent."], on_event)

# Subscribe to multiple prefixes
sub = client.watch(["agent.", "lease."], on_event)

# Empty list = all topics
sub = client.watch([], on_event)

# Stop receiving events
sub.cancel()
```

Use as a context manager to automatically cancel on exit:

```python
with client.watch(["agent.died"], on_event):
    run_pipeline()
# subscription cancelled here
```
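
Because `sequence` is globally monotonic, a handler can notice dropped events by tracking the last value it saw. A minimal sketch; it assumes a subscription to all topics (a prefix-filtered stream would skip global sequence numbers on its own), and `resync()` / `handle()` are placeholders for your recovery and dispatch logic:

```python
last_seq = 0

def on_event(event: WatchEvent):
    global last_seq
    if last_seq and event.sequence != last_seq + 1:
        # events were missed (e.g. after a reconnect); reconcile from the registry
        resync()
    last_seq = event.sequence
    handle(event)

sub = client.watch([], on_event)  # empty list = all topics
```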

**Built-in topics and their payload types:**

| Topic | Payload type | Fired when |
|---|---|---|
| `agent.registered` | `AgentInfo` | An agent calls `register()` |
| `agent.unreachable` | `AgentInfo` | First missed heartbeat deadline |
| `agent.died` | `AgentInfo` | Agent dead — deregistered or heartbeat timeout |
| `lease.acquired` | `Lease` | An agent wins a lease |
| `lease.renewed` | `Lease` | An agent renews its lease |
| `lease.expired` | `Lease` | A lease TTL elapsed with no renewal |
| `lease.released` | `Lease` | An agent explicitly released its lease |
| `leader.changed` | `ElectionState` | Leadership changed in any election group |

### Practical orchestrator pattern

```python
import uuid
from agentkeeper import WatchEvent, AgentInfo, Lease

# task registry — in production this would be a database
active_tasks: dict[str, str] = {}  # task_id → agent_id

def on_agent_died(event: WatchEvent):
    dead_agent: AgentInfo = event.payload
    # find all tasks held by this agent and reassign them
    for task_id, holder in list(active_tasks.items()):
        if holder == dead_agent.agent_id:
            reassign(task_id)

def on_lease_expired(event: WatchEvent):
    lease: Lease = event.payload
    # a task went unfinished — put it back in the queue
    requeue(lease.task_id)

def on_event(event: WatchEvent):
    if event.topic == "agent.died":
        on_agent_died(event)
    elif event.topic == "lease.expired":
        on_lease_expired(event)

with client.watch(["agent.died", "lease.expired"], on_event):
    run_forever()
```

### watch_election()

Standby orchestrators use `watch_election()` to know the moment they become leader — no polling, no delay:

```python
my_agent_id = agent.agent_id

def on_leader_change(state):
    if state.leader_agent_id == my_agent_id:
        print(f"I am now the leader (term {state.term})")
        become_active()
    else:
        print(f"Standby — leader is {state.leader_agent_id}")
        become_standby()

with client.watch_election("orchestrator:pipeline-A", on_leader_change):
    # blocks or does standby work — on_leader_change fires whenever
    # leadership changes for this group
    wait_indefinitely()
```

## Data types

All methods return plain Python dataclasses, not proto objects.

```python
from agentkeeper import (
    AgentCard,      # name, description, version, tools, metadata
    AgentInfo,      # agent_id, card, status, registered_at, last_heartbeat_at
    AgentStatus,    # ALIVE | UNREACHABLE | DEAD
    Lease,          # task_id, holder_agent_id, token, acquired_at, expires_at
    ElectionState,  # group, leader_agent_id, term, elected_at
    Policy,         # id, subject, action, resource, effect
    PolicyEffect,   # ALLOW | DENY
    CheckResult,    # allowed, reason
)
```

## Error handling

The SDK surfaces gRPC errors as `grpc.RpcError`. The most common cases:

```python
import grpc

try:
    lease.renew()
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.PERMISSION_DENIED:
        print("Token mismatch — lease was taken by another agent")
    elif e.code() == grpc.StatusCode.NOT_FOUND:
        print("Agent or lease no longer exists")
    else:
        raise
```

## Running the tests

The test suite requires a running daemon:

```bash
# Terminal 1
./agentkeeper

# Terminal 2
cd sdk/python
uv run pytest tests/ -v
```
