Metadata-Version: 2.4
Name: aether-observer
Version: 0.1.0
Summary: Observability, drift detection, and guardrails for agentic workflows — backed by a temporal knowledge graph.
Author: Aether Team
License: MIT
Classifier: Development Status :: 3 - Alpha
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: System :: Monitoring
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: graphiti-core[kuzu]>=0.28.2
Requires-Dist: httpx>=0.28.1
Requires-Dist: numpy>=2.0.0
Requires-Dist: pydantic-settings>=2.6.1
Requires-Dist: pydantic>=2.11.5
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: sentence-transformers>=3.2.1
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest>=8.3.3; extra == 'dev'
Requires-Dist: ruff>=0.7.1; extra == 'dev'
Provides-Extra: neo4j
Requires-Dist: graphiti-core[neo4j]>=0.28.2; extra == 'neo4j'
Provides-Extra: server
Requires-Dist: fastapi>=0.115.0; extra == 'server'
Requires-Dist: uvicorn>=0.32.0; extra == 'server'
Description-Content-Type: text/markdown

# Aether Observer

Observability, drift detection, and guardrails for agentic workflows — backed by a temporal knowledge graph.

## Installation

```bash
pip install aether-observer
```

With the API server (FastAPI + Uvicorn):

```bash
pip install "aether-observer[server]"
```

## Quickstart

```python
import aether

aether.init(
    workflow_id="claims-triage",
    tenant_id="acme",
    observations_before_guardrails=3,
    max_observations=50,
    sliding_window_days=20,
)
aether.log_step(name="plan", kind="llm", model="gpt-4")
aether.log_step(name="policy_lookup", kind="tool", tool_name="policy_lookup")
result = aether.guardrail(open_browser=False)

print(result.risk_score, result.severity.value, len(result.signals))
```

No server is required: `guardrail()` and `finish()` start the observability
service in-process via Graphiti/Kuzu. See `examples/quickstart.py` for a fuller
example with realistic agent steps.

### Context manager

```python
with aether.run(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")
    aether.log_step(name="execute", kind="tool", tool_name="claims_db")
```

### Async

```python
async with aether.arun(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")
```

### Dashboard

If the server extra is installed:

```bash
aether-observer serve --reload
# open http://127.0.0.1:8081/
```

## Publishing To PyPI

The repo includes a GitHub Actions release workflow at
`.github/workflows/publish-pypi.yml`.

It uses PyPI Trusted Publishing through GitHub OIDC, which is the recommended
path for publishing from Actions because it avoids long-lived PyPI API tokens.

What it does:

1. builds the wheel and sdist
2. validates them with `twine check`
3. passes the built artifacts from the build job to the publish job
4. publishes to PyPI from a protected `pypi` environment

Before it can publish successfully, configure the project on PyPI to trust this
workflow:

1. create the `aether-observer` project on PyPI if it does not exist yet
2. in PyPI, add a Trusted Publisher for this GitHub repository
3. set the workflow filename to `.github/workflows/publish-pypi.yml`
4. set the GitHub environment to `pypi`
5. in GitHub, create the `pypi` environment and add any required approvals

The workflow runs on:

- GitHub release publish
- manual `workflow_dispatch`

---

Aether Observer is a Graphiti-backed observability and governance layer for external agentic workflows. It answers four enterprise questions:

1. What did the agent do?
2. How did its behavior drift over time?
3. Why did that drift happen?
4. Did the run violate client-specific guardrails?

The system is local-first today:

- `Graphiti` for temporal graph memory and provenance
- `Kuzu` as the embedded graph backend for development
- `MLX` through an OpenAI-compatible server for local model testing
- `sentence-transformers` for local embeddings

The architecture is modular so the same product can later run with:

- managed LLMs
- Neo4j or another production graph backend
- custom workflow adapters for LangGraph, internal orchestrators, OTEL, queue workers, or agent platforms

## What The Product Does

Aether Observer sits beside an external workflow and normalizes its execution into a stable schema. It then:

1. stores the run as a Graphiti episode
2. builds graph facts for runs, steps, decisions, actions, tools, prompts, models, functions, and datasets
3. compares the current run against recent workflow history
4. detects drift
5. evaluates client-specific dynamic guardrails
6. stores drift alerts and guardrail incidents back into the graph
7. exposes everything through an API, CLI, and dashboard

This is not just telemetry storage. It is an observability agent that turns workflow traces into:

- temporal memory
- drift signals
- graph-backed explanations
- policy enforcement inputs

## Core Capabilities

### 1. Agentic Drift Detection

The system detects behavioral drift in terms that matter for agent systems:

- `decision_outcome_drift`
- `action_route_drift`
- `function_call_drift`
- `function_arguments_drift`
- `tool_sequence_drift`
- `prompt_drift`
- `model_drift`

This means it can tell when an agent starts:

- choosing different routes
- taking different actions
- calling different functions
- changing function argument shapes
- swapping prompts or models
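
As a rough illustration of what a signal such as `tool_sequence_drift` could measure, the sketch below scores how far the current tool sequence departs from a baseline sequence. It is a minimal sketch using the standard library's `difflib`, not the detector shipped in this package; the `sequence_drift` helper and the sample sequences are hypothetical.

```python
from difflib import SequenceMatcher


def sequence_drift(current: list[str], baseline: list[str]) -> float:
    """Return a drift score in [0, 1]; 0.0 means the sequences are identical."""
    # autojunk=False keeps the comparison exact for short sequences.
    similarity = SequenceMatcher(None, baseline, current, autojunk=False).ratio()
    return 1.0 - similarity


# Hypothetical sequences: the current run swapped one tool for another.
baseline_tools = ["plan", "policy_lookup", "claims_db"]
current_tools = ["plan", "web_search", "claims_db"]

drift_score = sequence_drift(current_tools, baseline_tools)
print(f"tool_sequence_drift score: {drift_score:.2f}")
```

The same comparison would apply to decision or action sequences; a real detector would likely also weigh how stable the baseline itself has been.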

### 2. Data Drift Detection

The system also tracks:

- `input_schema_drift`
- `output_schema_drift`
- `dataset_version_drift`
- `payload_volume_drift`
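
One plausible way to detect a signal such as `input_schema_drift` is to compare the shape of the current payload (keys and value types) with a baseline payload. The sketch below assumes that approach; the helper names and the sample payloads are hypothetical, and the package's own detector may work differently.

```python
def schema_shape(payload: dict) -> dict[str, str]:
    """Map each top-level key to the name of its value's type."""
    return {key: type(value).__name__ for key, value in payload.items()}


def schema_drift(current: dict, baseline: dict) -> dict[str, list[str]]:
    """List keys that were added, removed, or changed type versus the baseline."""
    cur, base = schema_shape(current), schema_shape(baseline)
    return {
        "added": sorted(cur.keys() - base.keys()),
        "removed": sorted(base.keys() - cur.keys()),
        "retyped": sorted(k for k in cur.keys() & base.keys() if cur[k] != base[k]),
    }


# Hypothetical payloads: `priority` changed type and `channel` is new.
baseline_input = {"claim_type": "auto", "region": "ca", "priority": "normal"}
current_input = {"claim_type": "auto", "region": "ca", "priority": 2, "channel": "web"}

report = schema_drift(current_input, baseline_input)
print(report)
```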

### 3. Operational Regression Detection

The system detects:

- `step_count_regression`
- `latency_regression`
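
A simple way to think about both regressions is as a comparison of the current value against the median of recent history. The sketch below assumes that rule with a hypothetical `tolerance` multiplier; the actual thresholds and statistics used by the package are not documented here.

```python
from statistics import median


def is_regression(current: float, history: list[float], tolerance: float = 1.5) -> bool:
    """Flag a value that exceeds `tolerance` times the historical median."""
    if not history:
        return False  # no baseline yet, so there is nothing to compare against
    return current > tolerance * median(history)


# Hypothetical history for one workflow.
latency_history_ms = [1900.0, 2000.0, 2100.0, 2050.0]
step_count_history = [2, 2, 3, 2]

latency_flag = is_regression(3400.0, latency_history_ms)
steps_flag = is_regression(3, step_count_history)
print(latency_flag, steps_flag)
```

Using the median rather than the mean keeps a single slow historical run from masking a real regression.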

### 4. Graph Explainability

For every important signal, Aether Observer can return:

- current behavior state
- baseline behavior state
- supporting Graphiti facts

Examples:

- current decision sequence vs baseline decision sequence
- current function route vs approved function route
- graph facts showing the exact step that made a different decision

### 5. Dynamic Client Guardrails

The system supports workflow-specific guardrail policies that can be:

- stored per tenant and workflow
- provided inline on a single run
- mixed with dynamic baseline expectations from recent stable history

Guardrails support three modes:

- `observe`
- `warn`
- `block`

Supported policy areas:

- allowed decisions
- allowed actions
- blocked actions
- approval-required actions
- allowed functions
- blocked functions
- function argument contracts
- allowed models
- allowed prompts
- allowed tools
- dataset version policies
- max step count
- max latency

Dynamic baseline options let a client say:

- "use my pinned policy"
- "also treat recent stable behavior as policy"

That makes the product useful for real client environments where some workflows are tightly pinned and others are baseline-driven.
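
To make the two options concrete, the sketch below shows one way a pinned allow-list could be combined with values observed in recent stable runs. It is illustrative only: `effective_allowed`, the `min_share` threshold, and the sample history are assumptions, not the package's guardrail engine.

```python
from collections import Counter


def effective_allowed(pinned: list[str], history: list[list[str]],
                      from_history: bool, min_share: float = 0.6) -> set[str]:
    """Union the pinned allow-list with values seen in most recent stable runs."""
    allowed = set(pinned)
    if from_history and history:
        # Count each value once per run so a repeated step does not inflate its share.
        seen = Counter(value for run in history for value in set(run))
        allowed |= {v for v, n in seen.items() if n / len(history) >= min_share}
    return allowed


pinned_actions = ["policy_lookup", "claims_db"]
recent_runs = [  # hypothetical actions taken in five stable runs
    ["policy_lookup", "fetch_history"],
    ["policy_lookup", "fetch_history", "claims_db"],
    ["policy_lookup", "fetch_history"],
    ["policy_lookup", "web_search"],
    ["policy_lookup", "fetch_history"],
]

pinned_only = effective_allowed(pinned_actions, recent_runs, from_history=False)
with_baseline = effective_allowed(pinned_actions, recent_runs, from_history=True)
print(sorted(pinned_only), sorted(with_baseline))
```

In this sketch a one-off action such as `web_search` stays outside the allow-list, while an action that appears in most stable runs is accepted even though it was never pinned.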

## How Graphiti Is Used

Graphiti is not used here as a generic chat memory layer. It is used as the temporal provenance graph under the observability system.

The system stores:

- workflow runs
- workflow steps
- decisions
- selected actions
- candidate actions
- function calls
- function argument shapes
- prompts
- models
- tools
- datasets
- drift alerts
- guardrail policies
- guardrail incidents

This gives you:

- historical comparison
- tenant/workflow partitioning
- graph-backed explainability
- auditable policy history

## Architecture

### Runtime Flow

1. External workflow emits a run payload.
2. An adapter normalizes it into `WorkflowRun`.
3. The run is persisted into Graphiti and Kuzu.
4. Historical runs for the same workflow are loaded.
5. Drift features are extracted.
6. Drift detectors score the run against history.
7. Guardrail policies are loaded and evaluated.
8. Explanations are assembled from structured state plus graph facts.
9. Drift alerts and guardrail incidents are stored.
10. API and dashboard expose the result.

### Important Design Choice

For local development with smaller MLX models, the system uses deterministic Graphiti node and edge writes for workflow telemetry. That is intentional.

Reason:

- small local models are unreliable for Graphiti's more complex extraction schemas
- workflow traces are already structured
- deterministic graph writes are more appropriate for enterprise observability data

So Graphiti is still under the hood, but persistence is explicit and stable.

## Normalized Workflow Schema

The product expects a provider-agnostic workflow run schema. A simplified run looks like this:

```json
{
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "run_id": "run-001",
  "agent_name": "triage-orchestrator",
  "environment": "dev",
  "status": "success",
  "started_at": "2026-04-12T07:58:00Z",
  "finished_at": "2026-04-12T07:58:02Z",
  "input_payload": {
    "claim_type": "auto",
    "region": "ca",
    "priority": "normal"
  },
  "output_payload": {
    "decision": "review",
    "confidence": 0.74
  },
  "datasets": [
    {
      "name": "claims_policy",
      "version": "2026-01",
      "schema_hash": "claims-policy-2026-01"
    }
  ],
  "steps": [
    {
      "step_id": "s1",
      "name": "plan",
      "kind": "llm",
      "status": "success",
      "model": "mlx-community/Qwen2.5-1.5B-Instruct-4bit",
      "prompt_version": "planner-v1",
      "decision_name": "routing_policy",
      "decision_value": "policy_lookup",
      "selected_action": "policy_lookup",
      "candidate_actions": ["policy_lookup", "web_search", "manual_review_router"],
      "function_calls": [
        {
          "function_name": "route_claim",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "priority": "normal",
            "policy_version": "2026-01"
          }
        }
      ]
    },
    {
      "step_id": "s2",
      "name": "policy_lookup",
      "kind": "tool",
      "status": "success",
      "tool_name": "policy_lookup",
      "selected_action": "policy_lookup",
      "function_calls": [
        {
          "function_name": "lookup_policy_rules",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "policy_version": "2026-01"
          }
        }
      ]
    }
  ]
}
```
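
Given a run in this shape, the behavioral features that drift detection compares can be read directly from the `steps` array. The following is a minimal sketch of such an extraction over a trimmed version of the run above; `extract_features` and its output keys are hypothetical and do not reflect the package's internal feature names.

```python
def extract_features(run: dict) -> dict:
    """Collect per-run sequences that drift detectors could compare across runs."""
    steps = run.get("steps", [])
    calls = [call for step in steps for call in step.get("function_calls", [])]
    return {
        "decisions": [s["decision_value"] for s in steps if "decision_value" in s],
        "actions": [s["selected_action"] for s in steps if "selected_action" in s],
        "functions": [c["function_name"] for c in calls],
        # Argument shape is the sorted key set, independent of the values passed.
        "argument_shapes": {c["function_name"]: sorted(c.get("arguments", {})) for c in calls},
        "models": [s["model"] for s in steps if "model" in s],
        "step_count": len(steps),
    }


run = {  # trimmed version of the example run
    "steps": [
        {"name": "plan", "kind": "llm",
         "model": "mlx-community/Qwen2.5-1.5B-Instruct-4bit",
         "decision_value": "policy_lookup", "selected_action": "policy_lookup",
         "function_calls": [{"function_name": "route_claim",
                             "arguments": {"claim_type": "auto", "region": "ca"}}]},
        {"name": "policy_lookup", "kind": "tool", "selected_action": "policy_lookup",
         "function_calls": [{"function_name": "lookup_policy_rules",
                             "arguments": {"claim_type": "auto"}}]},
    ]
}

features = extract_features(run)
print(features["functions"], features["argument_shapes"])
```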

## Dynamic Guardrail Policy Schema

Example client policy:

```json
{
  "policy_id": "claims-triage-dynamic-guardrails",
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "mode": "warn",
  "allowed_models": ["mlx-community/Qwen2.5-1.5B-Instruct-4bit"],
  "allowed_prompts": ["planner-v1"],
  "allowed_tools": ["policy_lookup", "claims_db"],
  "allowed_actions": ["policy_lookup", "claims_db"],
  "require_approval_actions": ["manual_review_router"],
  "allowed_functions": [
    "route_claim",
    "lookup_policy_rules",
    "fetch_claim_history"
  ],
  "function_contracts": {
    "route_claim": {
      "required_keys": ["claim_type", "region", "priority", "policy_version"],
      "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
      "allow_additional_keys": false
    }
  },
  "dataset_version_policies": {
    "claims_policy": ["2026-01"]
  },
  "max_step_count": 4,
  "max_latency_ms": 1800,
  "dynamic_baseline": {
    "decisions_from_history": true,
    "actions_from_history": true,
    "functions_from_history": true
  }
}
```

This policy means:

- models are pinned
- prompts are pinned
- tools and actions are pinned
- `manual_review_router` is not outright blocked, but it requires approval
- `route_claim` must follow a strict argument contract
- the dataset version is pinned
- dynamic baseline also contributes expected decisions/actions/functions
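
The strict argument contract for `route_claim` can be read as two checks: every required key is present, and no key outside `allowed_keys` appears when `allow_additional_keys` is false. The sketch below implements that reading; `check_contract` is a hypothetical helper, and the real engine may report violations differently.

```python
def check_contract(arguments: dict, contract: dict) -> list[str]:
    """Return human-readable problems; an empty list means the call conforms."""
    problems = []
    keys = set(arguments)
    missing = set(contract.get("required_keys", [])) - keys
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    if not contract.get("allow_additional_keys", True):
        extra = keys - set(contract.get("allowed_keys", []))
        if extra:
            problems.append(f"unexpected keys: {sorted(extra)}")
    return problems


# Contract copied from the example policy above.
route_claim_contract = {
    "required_keys": ["claim_type", "region", "priority", "policy_version"],
    "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
    "allow_additional_keys": False,
}
ok_call = {"claim_type": "auto", "region": "ca",
           "priority": "normal", "policy_version": "2026-01"}
bad_call = {"claim_type": "auto", "region": "ca", "override": True}  # hypothetical

ok_problems = check_contract(ok_call, route_claim_contract)
bad_problems = check_contract(bad_call, route_claim_contract)
print(ok_problems, bad_problems)
```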

## Example Output

A drifted and policy-violating run returns:

- drift `signals`
- `graph_explanations`
- `guardrail_result`
- guardrail `violations`
- guardrail graph explanations

Typical guardrail verdict output:

```json
{
  "policy_id": "claims-triage-dynamic-guardrails",
  "mode": "warn",
  "verdict": "warn",
  "violations": [
    { "kind": "decision_guardrail_violation" },
    { "kind": "action_guardrail_violation" },
    { "kind": "approval_required_action" },
    { "kind": "function_guardrail_violation" },
    { "kind": "function_contract_violation" }
  ]
}
```

## API

### Health

- `GET /health`

### Observe Runs

- `POST /observe`

Query params:

- `adapter`
- `dry_run`

### Workflow Views

- `GET /workflows/{tenant_id}/{workflow_id}/history`
- `GET /workflows/{tenant_id}/{workflow_id}/alerts`
- `GET /facts`

### Guardrails

- `PUT /workflows/{tenant_id}/{workflow_id}/guardrails/policy`
- `GET /workflows/{tenant_id}/{workflow_id}/guardrails/policy`
- `GET /workflows/{tenant_id}/{workflow_id}/guardrails/incidents`

### Demo

- `POST /demo/seed`

The demo seeds:

- stable baseline runs
- a guardrail policy
- a drifted run that violates both behavior baselines and client policy

## CLI

The package exposes:

```bash
aether-observer serve --reload
aether-observer observe path/to/run.json
aether-observer history acme claims-triage --limit 20
```

## Dashboard

The FastAPI app serves a frontend at:

```text
http://127.0.0.1:8081/
```

The dashboard shows:

- health
- workflow history
- drift alert feed
- latest observation result
- graph explanations
- fact search
- inline payload editor
- inline guardrail policy example

## Local Development Quickstart

### 1. Start MLX

Example:

```bash
uv run python -m mlx_lm.server \
  --model mlx-community/Qwen2.5-1.5B-Instruct-4bit \
  --host 127.0.0.1 \
  --port 8000
```

### 2. Install Dependencies

```bash
uv sync
cp .env.example .env
```

### 3. Start The API

```bash
uv run uvicorn aether_observer.api:app --host 127.0.0.1 --port 8081
```

Or via CLI:

```bash
uv run aether-observer serve --reload
```

### 4. Open The Dashboard

```text
http://127.0.0.1:8081/
```

### 5. Seed A Demo Workflow

Use the dashboard or:

```bash
curl -X POST http://127.0.0.1:8081/demo/seed \
  -H 'Content-Type: application/json' \
  -d '{
    "tenant_id": "acme",
    "workflow_id": "claims-triage",
    "environment": "dev"
  }'
```

## Project Layout

- `src/aether_observer/api.py`: FastAPI API and dashboard routes
- `src/aether_observer/service.py`: orchestration layer for observation, drift, and guardrails
- `src/aether_observer/drift/`: feature extraction and drift detectors
- `src/aether_observer/guardrails/`: dynamic guardrail engine
- `src/aether_observer/graph_store.py`: Graphiti persistence for runs, alerts, policies, and incidents
- `src/aether_observer/runtime.py`: Graphiti runtime wiring
- `src/aether_observer/adapters/`: external workflow adapter boundary
- `src/aether_observer/static/`: dashboard frontend
- `src/aether_observer/demo.py`: demo data builders and scenario seeding
- `examples/simulate_runs.py`: simple local smoke example
- `tests/`: drift and guardrail tests

## Current Local Validation

The current local build has been exercised with:

- compile checks
- unit tests
- live API checks
- live dashboard checks
- live guardrail policy storage and retrieval
- live guardrail incident storage
- live drift and guardrail explainability output

## Enterprise Deployment Direction

This repo is intentionally shaped for enterprise rollout. The immediate next steps for production are:

1. replace local `Kuzu` with a production graph backend
2. add authenticated workflow adapters for real agent platforms
3. wire `block` mode into real execution control instead of post-run verdict only
4. add approval and waiver records for human review flows
5. add auth, rate limiting, webhooks, and tenant-facing policy management

## Why This Matters

Most agent observability tools stop at traces and token counts. This project is aimed at a more serious target:

- enterprise agent governance
- explainable behavioral drift
- policy-aware execution monitoring
- reusable client-specific guardrails

That is the actual product direction of this repository.
