Metadata-Version: 2.4
Name: agentcop
Version: 0.4.2
Summary: Universal forensic auditor for agent systems — OTel-aligned event schema, pluggable violation detectors
Author: Shay Mizuno, Hind Tagmouti
License: MIT
Project-URL: Homepage, https://github.com/trusthandoff/agentcop
Project-URL: Repository, https://github.com/trusthandoff/agentcop
Keywords: agent,audit,forensics,security,observability,llm,opentelemetry,violation,detector
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: Security
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Monitoring
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Typing :: Typed
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pydantic<3,>=2.7
Provides-Extra: otel
Requires-Dist: opentelemetry-sdk>=1.20; extra == "otel"
Provides-Extra: langgraph
Requires-Dist: langgraph>=0.2; extra == "langgraph"
Provides-Extra: crewai
Requires-Dist: crewai>=0.80; extra == "crewai"
Provides-Extra: autogen
Requires-Dist: pyautogen>=0.2; extra == "autogen"
Provides-Extra: llamaindex
Requires-Dist: llama-index-core>=0.10; extra == "llamaindex"
Provides-Extra: haystack
Requires-Dist: haystack-ai>=2.0; extra == "haystack"
Provides-Extra: semantic-kernel
Requires-Dist: semantic-kernel>=1.0; extra == "semantic-kernel"
Provides-Extra: langfuse
Requires-Dist: langfuse>=4.0; extra == "langfuse"
Provides-Extra: langsmith
Requires-Dist: langsmith>=0.1; extra == "langsmith"
Provides-Extra: ddtrace
Requires-Dist: ddtrace>=1.0; extra == "ddtrace"
Provides-Extra: watchdog
Requires-Dist: watchdog>=3.0; extra == "watchdog"
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"
Dynamic: license-file

# agentcop — The Agent Cop

[![CI](https://github.com/trusthandoff/agentcop/actions/workflows/test.yml/badge.svg)](https://github.com/trusthandoff/agentcop/actions/workflows/test.yml)
[![PyPI](https://img.shields.io/pypi/v/agentcop)](https://pypi.org/project/agentcop/)
[![Python](https://img.shields.io/pypi/pyversions/agentcop)](https://pypi.org/project/agentcop/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)

**The cop for agent fleets.**

Every agent fleet needs a cop. Agents delegate, hand off, and execute, and without forensic oversight, violations stay invisible until they become incidents. `agentcop` is a universal auditor: ingest events from any agent system, run violation detectors, and get structured findings.

OTel-aligned schema. Pluggable detectors. Adapter bridge to your stack. Zero required infrastructure.

```
pip install agentcop
```

---

## Adapters

Nine adapters are available — install only what you need:

| Adapter | Framework | Install |
|---|---|---|
| [LangGraph](docs/adapters/langgraph.md) | LangGraph graph nodes & edges | `pip install agentcop[langgraph]` |
| [LangSmith](docs/adapters/langsmith.md) | LangSmith run tracing | `pip install agentcop[langsmith]` |
| [Langfuse](docs/adapters/langfuse.md) | Langfuse 4.x observations | `pip install agentcop[langfuse]` |
| [Datadog](docs/adapters/datadog.md) | ddtrace APM spans | `pip install agentcop[ddtrace]` |
| [Haystack](docs/adapters/haystack.md) | Haystack pipeline components | `pip install agentcop[haystack]` |
| [Semantic Kernel](docs/adapters/semantic_kernel.md) | Semantic Kernel filters | `pip install agentcop[semantic-kernel]` |
| [LlamaIndex](docs/adapters/llamaindex.md) | LlamaIndex pipeline events | `pip install agentcop[llamaindex]` |
| [CrewAI](docs/adapters/crewai.md) | CrewAI agent & task events | `pip install agentcop[crewai]` |
| [AutoGen](docs/adapters/autogen.md) | AutoGen agent messages | `pip install agentcop[autogen]` |

---

## How it works

```
your agent system
      │
      ▼
 SentinelAdapter          ← translate domain events to universal schema
      │
      ▼
  Sentinel.ingest()       ← load SentinelEvents into the auditor
      │
      ▼
  detect_violations()     ← run detectors, get ViolationRecords
      │
      ▼
  report() / your sink    ← stdout, OTel, alerting, whatever
```
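
The flow above can be sketched without installing anything. The block below does not import `agentcop`: `Event`, `Violation`, `MiniSentinel`, `adapt`, and `detect_rejected` are hypothetical stand-ins that only mimic the shape of the pipeline, not the library's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Optional


@dataclass
class Event:  # stand-in for SentinelEvent (illustration only)
    event_id: str
    event_type: str
    severity: str = "INFO"
    attributes: dict[str, Any] = field(default_factory=dict)


@dataclass
class Violation:  # stand-in for ViolationRecord (illustration only)
    violation_type: str
    severity: str
    source_event_id: str


Detector = Callable[[Event], Optional[Violation]]


class MiniSentinel:  # stand-in for Sentinel (illustration only)
    def __init__(self, detectors: Iterable[Detector]) -> None:
        self._detectors = list(detectors)
        self._events: list[Event] = []

    def ingest(self, events: Iterable[Event]) -> None:
        self._events.extend(events)

    def detect_violations(self) -> list[Violation]:
        # Run every detector over every ingested event; keep non-None results.
        found = []
        for event in self._events:
            for detector in self._detectors:
                violation = detector(event)
                if violation is not None:
                    found.append(violation)
        return found


def adapt(raw: dict[str, Any]) -> Event:
    # Adapter step: translate a domain-specific dict into the shared schema.
    return Event(event_id=raw["id"], event_type=raw["kind"], attributes=raw)


def detect_rejected(event: Event) -> Optional[Violation]:
    if event.event_type != "packet_rejected":
        return None
    return Violation("rejected_packet", "ERROR", event.event_id)


sentinel = MiniSentinel([detect_rejected])
sentinel.ingest(adapt(r) for r in [
    {"id": "evt-1", "kind": "packet_rejected"},
    {"id": "evt-2", "kind": "packet_accepted"},
])
violations = sentinel.detect_violations()
for v in violations:  # sink step: plain stdout here
    print(f"[{v.severity}] {v.violation_type} (event {v.source_event_id})")
```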

---

## Quickstart

```python
from agentcop import Sentinel, SentinelEvent

sentinel = Sentinel()

# Feed it events (any source, any schema — adapt first)
sentinel.ingest([
    SentinelEvent(
        event_id="evt-001",
        event_type="packet_rejected",
        timestamp="2026-03-31T12:00:00Z",
        severity="ERROR",
        body="packet rejected — TTL expired",
        source_system="my-agent",
        attributes={"packet_id": "pkt-abc", "reason": "ttl_expired"},
    )
])

violations = sentinel.detect_violations()
# [ViolationRecord(violation_type='rejected_packet', severity='ERROR', ...)]

sentinel.report()
# [ERROR] rejected_packet — packet rejected — TTL expired
#   packet_id: pkt-abc
#   reason: ttl_expired
```

Built-in detectors fire on four event types out of the box:

| `event_type`            | Detector                      | Severity |
|-------------------------|-------------------------------|----------|
| `packet_rejected`       | `detect_rejected_packet`      | ERROR    |
| `capability_stale`      | `detect_stale_capability`     | ERROR    |
| `token_overlap_used`    | `detect_overlap_window`       | WARN     |
| `ai_generated_payload`  | `detect_ai_generated_payload` | WARN     |
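
As a quick way to reason about the table, the sketch below tallies how many findings of each severity a batch of event types would be expected to produce. It assumes each matching event yields exactly one violation; `BUILTIN_SEVERITY` and `expected_findings` are hypothetical helpers written for this example, not part of `agentcop`.

```python
from collections import Counter

# Severities copied from the table above, keyed by event_type.
BUILTIN_SEVERITY = {
    "packet_rejected": "ERROR",
    "capability_stale": "ERROR",
    "token_overlap_used": "WARN",
    "ai_generated_payload": "WARN",
}


def expected_findings(event_types):
    """Count, per severity, the events a built-in detector would flag."""
    return Counter(
        BUILTIN_SEVERITY[t] for t in event_types if t in BUILTIN_SEVERITY
    )


summary = expected_findings(
    ["packet_rejected", "tool_call", "token_overlap_used", "capability_stale"]
)
print(dict(summary))
```

Event types that are not in the table, such as `tool_call` here, are ignored by the built-ins and need a custom detector.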

---

## Custom detectors

Detectors are plain functions. Register as many as you need.

```python
from agentcop import Sentinel, SentinelEvent, ViolationRecord
from typing import Optional

def detect_unauthorized_tool(event: SentinelEvent) -> Optional[ViolationRecord]:
    # Only tool calls are relevant to this detector.
    if event.event_type != "tool_call":
        return None
    tool = event.attributes.get("tool")
    if tool in {"shell", "fs_write"}:
        return ViolationRecord(
            violation_type="unauthorized_tool",
            severity="CRITICAL",
            source_event_id=event.event_id,
            trace_id=event.trace_id,
            detail={"tool": tool},
        )
    return None  # the tool is not on the deny list

sentinel = Sentinel()
sentinel.register_detector(detect_unauthorized_tool)
```
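
Because detectors are plain functions, they can also be produced by a factory, so the same logic is reusable with different settings. The following is a minimal sketch under stated assumptions: `Event` and `Violation` are stand-in dataclasses that mirror only the fields used above, and `make_tool_denylist_detector` is a hypothetical helper, not an `agentcop` API.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class Event:  # stand-in for SentinelEvent (illustration only)
    event_id: str
    event_type: str
    trace_id: Optional[str] = None
    attributes: dict[str, Any] = field(default_factory=dict)


@dataclass
class Violation:  # stand-in for ViolationRecord (illustration only)
    violation_type: str
    severity: str
    source_event_id: str
    trace_id: Optional[str]
    detail: dict[str, Any]


def make_tool_denylist_detector(
    denied: set[str], severity: str = "CRITICAL"
) -> Callable[[Event], Optional[Violation]]:
    """Build a detector that flags tool calls whose tool is in `denied`."""

    def detect(event: Event) -> Optional[Violation]:
        if event.event_type != "tool_call":
            return None
        tool = event.attributes.get("tool")
        if tool not in denied:
            return None
        return Violation(
            violation_type="unauthorized_tool",
            severity=severity,
            source_event_id=event.event_id,
            trace_id=event.trace_id,
            detail={"tool": tool},
        )

    return detect


detect_shell = make_tool_denylist_detector({"shell"})
hit = detect_shell(Event("evt-1", "tool_call", attributes={"tool": "shell"}))
miss = detect_shell(Event("evt-2", "tool_call", attributes={"tool": "search"}))
```

With the real classes swapped in for the stand-ins, the function returned by the factory could be registered the same way as `detect_unauthorized_tool`.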

---

## TrustHandoff adapter

[TrustHandoff](https://github.com/trusthandoff/trusthandoff) ships a first-class adapter. If you're using `trusthandoff` for cryptographic delegation, plug it in directly:

```python
from trusthandoff.sentinel_adapter import TrustHandoffSentinelAdapter
from agentcop import Sentinel

adapter = TrustHandoffSentinelAdapter()
sentinel = Sentinel()

# raw_events: list of dicts from trusthandoff's forensic log
sentinel.ingest(adapter.to_sentinel_event(e) for e in raw_events)

violations = sentinel.detect_violations()
sentinel.report()
```

The adapter maps trusthandoff's event fields — `packet_id`, `correlation_id`, `reason`, `event_type` — to the universal `SentinelEvent` schema. Severity is inferred from event type. Everything else lands in `attributes`.

---

## Write your own adapter

Implement the `SentinelAdapter` protocol to bridge any system:

```python
from agentcop import SentinelAdapter, SentinelEvent
from typing import Dict, Any

class MySystemAdapter:
    source_system = "my-system"

    def to_sentinel_event(self, raw: Dict[str, Any]) -> SentinelEvent:
        return SentinelEvent(
            event_id=raw["id"],
            event_type=raw["type"],
            timestamp=raw["ts"],
            severity=raw.get("level", "INFO"),
            body=raw.get("message", ""),
            source_system=self.source_system,
            trace_id=raw.get("trace_id"),
            attributes=raw.get("metadata", {}),
        )
```
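
If `SentinelAdapter` is a structural protocol, as the wording above suggests, an adapter needs no base class, only the right attributes. The sketch below illustrates that idea with Python's `typing.Protocol`. `Adapter` and `Event` are stand-ins defined here for illustration (the real `SentinelAdapter` definition may differ), and `QueueAdapter` with its `job_id`/`status` fields is a made-up source system.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@dataclass
class Event:  # stand-in for SentinelEvent (illustration only)
    event_id: str
    event_type: str
    source_system: str
    attributes: dict[str, Any] = field(default_factory=dict)


@runtime_checkable
class Adapter(Protocol):  # stand-in for SentinelAdapter (assumed shape)
    source_system: str

    def to_sentinel_event(self, raw: dict[str, Any]) -> Event: ...


class QueueAdapter:  # note: does not inherit from Adapter
    source_system = "job-queue"

    def to_sentinel_event(self, raw: dict[str, Any]) -> Event:
        # Promote the known fields; everything else lands in attributes.
        known = {"job_id", "status"}
        return Event(
            event_id=str(raw["job_id"]),
            event_type=raw["status"],
            source_system=self.source_system,
            attributes={k: v for k, v in raw.items() if k not in known},
        )


adapter = QueueAdapter()
is_adapter = isinstance(adapter, Adapter)  # structural check, no inheritance
event = adapter.to_sentinel_event(
    {"job_id": 7, "status": "job_failed", "worker": "w-3"}
)
```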

---

## LangGraph integration

Plug into any LangGraph graph with zero changes to your graph code. The adapter reads the debug event stream — node starts, node results, checkpoint saves — and translates each into a `SentinelEvent` for violation detection.

```
pip install agentcop[langgraph]
```

Stream a graph in `debug` mode and pipe every event through the adapter:

```python
from agentcop import Sentinel
from agentcop.adapters.langgraph import LangGraphSentinelAdapter

adapter = LangGraphSentinelAdapter(thread_id="run-abc")
sentinel = Sentinel()

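# `graph` is your compiled LangGraph graph; `config` is its run config,
# e.g. {"configurable": {"thread_id": "run-abc"}}.
sentinel.ingest(
    adapter.iter_events(
        graph.stream({"input": "..."}, config, stream_mode="debug")
    )
)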

violations = sentinel.detect_violations()
sentinel.report()
```

Three LangGraph debug event types are translated into four `SentinelEvent` types:

| LangGraph event  | SentinelEvent type        | Severity |
|------------------|---------------------------|----------|
| `task`           | `node_start`              | INFO     |
| `task_result`    | `node_end`                | INFO     |
| `task_result`    | `node_error` (if errored) | ERROR    |
| `checkpoint`     | `checkpoint_saved`        | INFO     |
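
The mapping in the table can be read as a small classification function. This is a sketch of the idea only, not the adapter's actual code: it assumes each debug event is a dict with a `type` key and a `payload` dict, and that a failed `task_result` carries a non-empty `payload["error"]`. `classify` is a hypothetical helper.

```python
from typing import Any, Optional

# LangGraph debug event type -> default SentinelEvent type (from the table).
_BASE_TYPES = {
    "task": "node_start",
    "task_result": "node_end",
    "checkpoint": "checkpoint_saved",
}


def classify(debug_event: dict[str, Any]) -> Optional[tuple[str, str]]:
    """Return (sentinel_event_type, severity), or None for unknown events."""
    kind = debug_event.get("type")
    if kind not in _BASE_TYPES:
        return None
    payload = debug_event.get("payload") or {}
    # Assumption: an errored task_result has a truthy "error" in its payload.
    if kind == "task_result" and payload.get("error"):
        return ("node_error", "ERROR")
    return (_BASE_TYPES[kind], "INFO")


started = classify({"type": "task", "payload": {"name": "plan"}})
failed = classify({"type": "task_result", "payload": {"error": "boom"}})
saved = classify({"type": "checkpoint", "payload": {"next": ["act"]}})
```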

Each event carries structured `attributes` — `node`, `task_id`, `step`, `triggers`, `checkpoint_id`, `next` — so you can write targeted violation detectors:

```python
from agentcop import Sentinel, ViolationRecord

def detect_node_failure(event):
    if event.event_type == "node_error":
        return ViolationRecord(
            violation_type="node_execution_failed",
            severity="ERROR",
            source_event_id=event.event_id,
            trace_id=event.trace_id,
            detail={
                "node": event.attributes["node"],
                "error": event.attributes["error"],
            },
        )

sentinel = Sentinel(detectors=[detect_node_failure])
```

The `thread_id` passed to `LangGraphSentinelAdapter` is used as `trace_id` on every event, correlating all events from a single graph run.
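
Since every event from a run shares one `trace_id`, findings can be bucketed per run after detection. A minimal sketch, assuming violation records expose a `trace_id` attribute as in the detector above; `Violation` is a stand-in dataclass and `group_by_trace` is a hypothetical helper, not an `agentcop` function.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class Violation:  # stand-in for ViolationRecord (illustration only)
    violation_type: str
    severity: str
    trace_id: Optional[str] = None


def group_by_trace(violations):
    """Bucket violations by trace_id; untraced ones share one bucket."""
    grouped = defaultdict(list)
    for violation in violations:
        grouped[violation.trace_id or "untraced"].append(violation)
    return dict(grouped)


by_run = group_by_trace([
    Violation("node_execution_failed", "ERROR", trace_id="run-abc"),
    Violation("node_execution_failed", "ERROR", trace_id="run-xyz"),
    Violation("unauthorized_tool", "CRITICAL", trace_id="run-abc"),
    Violation("rejected_packet", "ERROR"),
])
for trace_id, items in by_run.items():
    print(trace_id, [v.violation_type for v in items])
```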

---

## OpenTelemetry export *(optional)*

`agentcop` events use an OTel-aligned schema out of the box (`trace_id`, `span_id`, severity levels). To export events as OTel log records:

```
pip install agentcop[otel]
```

```python
from agentcop.otel import OtelSentinelExporter
from opentelemetry.sdk._logs import LoggerProvider

exporter = OtelSentinelExporter(logger_provider=LoggerProvider())
# `events` holds the SentinelEvent objects you want to export.
exporter.export(events)
```

Attributes are emitted under the `sentinel.*` namespace. `trace_id` and `span_id` are mapped to OTel trace context.
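
To make the namespacing concrete, here is one way event attributes could be turned into `sentinel.*` keys. The exporter's exact key layout is not documented here, so treat this as an illustration: `namespaced_attributes` is a hypothetical helper, and flattening nested dicts with dots is an assumption.

```python
from typing import Any


def namespaced_attributes(
    attributes: dict[str, Any], prefix: str = "sentinel"
) -> dict[str, Any]:
    """Prefix every key with `prefix.`, flattening nested dicts with dots."""
    flat: dict[str, Any] = {}
    for key, value in attributes.items():
        name = f"{prefix}.{key}"
        if isinstance(value, dict):
            # Assumed behavior: nested dicts become dotted keys.
            flat.update(namespaced_attributes(value, prefix=name))
        else:
            flat[name] = value
    return flat


otel_attrs = namespaced_attributes(
    {"packet_id": "pkt-abc", "reason": "ttl_expired", "peer": {"id": "a1"}}
)
print(otel_attrs)
```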

---

## Requirements

- Python 3.11+
- `pydantic>=2.7,<3`

---

## License

MIT
