Metadata-Version: 2.4
Name: aether-observer
Version: 0.1.1
Summary: Observability, drift detection, and guardrails for agentic workflows — backed by a temporal knowledge graph.
Author: Aether Team
License: MIT
Classifier: Development Status :: 3 - Alpha
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: System :: Monitoring
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: graphiti-core[kuzu]>=0.28.2
Requires-Dist: httpx>=0.28.1
Requires-Dist: numpy>=2.0.0
Requires-Dist: pydantic-settings>=2.6.1
Requires-Dist: pydantic>=2.11.5
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: sentence-transformers>=3.2.1
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest>=8.3.3; extra == 'dev'
Requires-Dist: ruff>=0.7.1; extra == 'dev'
Provides-Extra: langgraph
Requires-Dist: langchain-core>=0.3.0; extra == 'langgraph'
Requires-Dist: langgraph>=1.0.0; extra == 'langgraph'
Provides-Extra: neo4j
Requires-Dist: graphiti-core[neo4j]>=0.28.2; extra == 'neo4j'
Provides-Extra: openai-agents
Requires-Dist: openai-agents>=0.13.0; extra == 'openai-agents'
Provides-Extra: server
Requires-Dist: fastapi>=0.115.0; extra == 'server'
Requires-Dist: uvicorn>=0.32.0; extra == 'server'
Description-Content-Type: text/markdown

# Aether Observer

Observability, drift detection, and guardrails for agentic workflows — backed by a temporal knowledge graph.

## Installation

```bash
pip install aether-observer
```

With the API server (FastAPI + Uvicorn):

```bash
pip install "aether-observer[server]"
```

With LangGraph helpers:

```bash
pip install "aether-observer[langgraph]"
```

With OpenAI Agents helpers:

```bash
pip install "aether-observer[openai-agents]"
```

## Quickstart

```python
import aether

aether.init(
    workflow_id="claims-triage",
    tenant_id="acme",
    observations_before_guardrails=3,
    max_observations=50,
    sliding_window_days=20,
)
aether.log_step(name="plan", kind="llm", model="gpt-4")
aether.log_step(name="policy_lookup", kind="tool", tool_name="policy_lookup")
result = aether.guardrail(open_browser=False)

print(result.risk_score, result.severity.value, len(result.signals))
```

No server required — `guardrail()` / `finish()` spin up the observability service
in-process via Graphiti/Kuzu. See `examples/quickstart.py` for a fuller
example with realistic agent steps.

### Hosted Monitoring

The SDK can also mirror finished observations into a hosted monitoring stack.

Recommended shape:

- SDK writes finished observations to `https://your-vercel-app.vercel.app/api/ingest`
- Vercel writes those records into Supabase
- the Vercel frontend reads from Supabase and shows experiments/runs

The intended setup is env-based, so application code does not need to carry Vercel URLs or credentials:

```bash
export AETHER_MONITORING_ENABLED=true
export AETHER_MONITORING_DASHBOARD_URL="https://your-vercel-app.vercel.app"
export AETHER_MONITORING_USERNAME="demo"
export AETHER_MONITORING_PASSWORD="replace-me"
```

Then your SDK code stays simple:

```python
import aether

aether.init(
    workflow_id="claims-triage",
    tenant_id="acme"
)
```
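For reference, the environment contract above can be sketched with a stdlib-only loader. This is illustrative: the SDK presumably wires these variables through `pydantic-settings`, and its real field names may differ.

```python
import os

def load_monitoring_config() -> dict:
    """Illustrative stdlib-only reader for the AETHER_MONITORING_* variables
    shown above; the SDK's actual settings layer may differ."""
    return {
        "enabled": os.getenv("AETHER_MONITORING_ENABLED", "false").lower() == "true",
        "dashboard_url": os.getenv("AETHER_MONITORING_DASHBOARD_URL", ""),
        "username": os.getenv("AETHER_MONITORING_USERNAME", ""),
        "password": os.getenv("AETHER_MONITORING_PASSWORD", ""),
    }
```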

Scaffold files:

- [HOSTED_MONITORING.md](HOSTED_MONITORING.md)
- [Supabase schema](supabase/monitoring_schema.sql)
- [Vercel monitoring app](apps/vercel-monitoring/package.json)

### Context manager

```python
with aether.run(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")
    aether.log_step(name="execute", kind="tool", tool_name="claims_db")
```

### Async

```python
async with aether.arun(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")
```

### LangGraph

Users can keep normal LangGraph code and call Aether directly inside nodes:

```python
import aether
from typing import TypedDict
from langgraph.graph import END, START, StateGraph

class State(TypedDict, total=False):
    question: str
    plan: str
    answer: str

async def planner(state):
    aether.log(name="planner", kind="llm", input_payload=state)
    return {"plan": "route to support"}

async def responder(state):
    aether.log(name="responder", kind="llm", input_payload=state)
    return {"answer": "done"}

# Build and compile the graph so `graph` is defined before ainvoke.
builder = StateGraph(State)
builder.add_node("planner", planner)
builder.add_node("responder", responder)
builder.add_edge(START, "planner")
builder.add_edge("planner", "responder")
builder.add_edge("responder", END)
graph = builder.compile()

aether.init(workflow_id="support-agent", tenant_id="acme")
result = await graph.ainvoke({"question": "help"})
await aether.aguardrail(open_browser=False)
```

There is also an optional wrapper if you want one-call execution:

```python
from aether.langgraph import ainvoke

result = await ainvoke(
    graph,
    {"question": "help"},
    workflow_id="support-agent",
    tenant_id="acme",
)
```

See [examples/langgraph_quickstart.py](examples/langgraph_quickstart.py).

### OpenAI Agents SDK

The OpenAI Agents SDK already exposes run lifecycle hooks through `Runner.run(...)`, `Runner.run_sync(...)`, and `Runner.run_streamed(...)`. Aether now plugs into that surface directly.

You can keep native OpenAI Agents code and add Aether hooks:

```python
import aether
from agents import Agent, Runner
from aether.openai_agents import AetherOpenAIAgentsHooks

agent = Agent(name="Assistant", instructions="Be concise.")

aether.init(workflow_id="support-agent", tenant_id="acme")
result = await Runner.run(
    agent,
    "How do I reset my password?",
    hooks=AetherOpenAIAgentsHooks(),
)
await aether.aguardrail(open_browser=False)
```

Or use the one-call wrapper:

```python
from agents import Agent
from aether.openai_agents import run

agent = Agent(name="Assistant", instructions="Be concise.")
result = await run(
    agent,
    "How do I reset my password?",
    workflow_id="support-agent",
    tenant_id="acme",
)
```

See [examples/openai_agents_quickstart.py](examples/openai_agents_quickstart.py).

### Dashboard

If the server extra is installed:

```bash
aether-observer serve --reload
# open http://127.0.0.1:8081/
```

## Publishing To PyPI

The repo now includes two GitHub Actions workflows:

- [Cut Release](.github/workflows/cut-release.yml)
- [Publish To PyPI](.github/workflows/publish-pypi.yml)

The intended flow is:

1. run `Cut Release`
2. it reuses the current version if it has not been tagged yet, otherwise it bumps `patch`, `minor`, or `major`
3. it updates [pyproject.toml](pyproject.toml), creates a `vX.Y.Z` tag, and publishes a GitHub release
4. the GitHub release automatically triggers `Publish To PyPI`

This avoids re-uploading the same filename to PyPI, which is what caused the earlier `400 File already exists` failure.

`Publish To PyPI` uses PyPI Trusted Publishing through GitHub OIDC, which is the recommended
path for publishing from Actions because it avoids long-lived PyPI API tokens.

What it does:

1. builds the wheel and sdist
2. validates them with `twine check`
3. passes the built artifacts between jobs as workflow artifacts
4. verifies the release tag matches the package version
5. publishes to PyPI from a protected `pypi` environment

Before it can publish successfully, configure the project on PyPI to trust this
workflow:

1. create the `aether-observer` project on PyPI if it does not exist yet
2. in PyPI, add a Trusted Publisher for this GitHub repository
3. set the workflow filename to `.github/workflows/publish-pypi.yml`
4. set the GitHub environment to `pypi`
5. in GitHub, create the `pypi` environment and add any required approvals

After that, you should normally release from the `Cut Release` workflow rather than manually dispatching `Publish To PyPI`.

---

Aether Observer is a Graphiti-backed observability and governance layer for external agentic workflows. It answers four enterprise questions:

1. What did the agent do?
2. How did its behavior drift over time?
3. Why did that drift happen?
4. Did the run violate client-specific guardrails?

The system is local-first today:

- `Graphiti` for temporal graph memory and provenance
- `Kuzu` as the embedded graph backend for development
- `MLX` through an OpenAI-compatible server for local model testing
- `sentence-transformers` for local embeddings

The architecture is modular so the same product can later run with:

- managed LLMs
- Neo4j or another production graph backend
- custom workflow adapters for LangGraph, internal orchestrators, OTEL, queue workers, or agent platforms

## What The Product Does

Aether Observer sits beside an external workflow and normalizes its execution into a stable schema. It then:

1. stores the run as a Graphiti episode
2. builds graph facts for runs, steps, decisions, actions, tools, prompts, models, functions, and datasets
3. compares the current run against recent workflow history
4. detects drift
5. evaluates client-specific dynamic guardrails
6. stores drift alerts and guardrail incidents back into the graph
7. exposes everything through an API, CLI, and dashboard

This is not just telemetry storage. It is an observability agent that turns workflow traces into:

- temporal memory
- drift signals
- graph-backed explanations
- policy enforcement inputs

## Core Capabilities

### 1. Agentic Drift Detection

The system detects behavioral drift in terms that matter for agent systems:

- `decision_outcome_drift`
- `action_route_drift`
- `function_call_drift`
- `function_arguments_drift`
- `tool_sequence_drift`
- `prompt_drift`
- `model_drift`

This means it can tell when an agent starts:

- choosing different routes
- taking different actions
- calling different functions
- changing function argument shapes
- swapping prompts or models
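As an intuition for what a detector like `tool_sequence_drift` might compute, here is an illustrative sketch (not Aether's actual implementation) that scores the current run's tool sequence against the most common sequence in recent history:

```python
from collections import Counter

def tool_sequence_drift(current: list[str], history: list[list[str]]) -> float:
    """Illustrative drift score: 0.0 when the current sequence matches the
    modal historical sequence, rising toward 1.0 as the tool sets diverge.
    Not the package's real detector."""
    if not history:
        return 0.0
    baseline, _ = Counter(tuple(seq) for seq in history).most_common(1)[0]
    if tuple(current) == baseline:
        return 0.0
    overlap = len(set(current) & set(baseline))
    union = len(set(current) | set(baseline)) or 1
    return 1.0 - overlap / union  # Jaccard distance on the tool sets
```

A run that swaps `claims_db` for `web_search` scores nonzero, while a run matching the modal sequence scores 0.0.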

### 2. Data Drift Detection

The system also tracks:

- `input_schema_drift`
- `output_schema_drift`
- `dataset_version_drift`
- `payload_volume_drift`

### 3. Operational Regression Detection

The system detects:

- `step_count_regression`
- `latency_regression`
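Conceptually, a regression check of this kind compares the current run against recent history. A minimal sketch (the 1.5x threshold is made up here, not Aether's actual heuristic):

```python
from statistics import median

def latency_regression(current_ms: float, history_ms: list[float],
                       factor: float = 1.5) -> bool:
    """Illustrative check: flag a regression when the current run is much
    slower than the recent median latency."""
    if not history_ms:
        return False
    return current_ms > factor * median(history_ms)
```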

### 4. Graph Explainability

For every important signal, Aether Observer can return:

- current behavior state
- baseline behavior state
- supporting Graphiti facts

Examples:

- current decision sequence vs baseline decision sequence
- current function route vs approved function route
- graph facts showing the exact step that made a different decision

### 5. Dynamic Client Guardrails

The system supports workflow-specific guardrail policies that can be:

- stored per tenant and workflow
- provided inline on a single run
- mixed with dynamic baseline expectations from recent stable history

Guardrails support three modes:

- `observe`
- `warn`
- `block`

Supported policy areas:

- allowed decisions
- allowed actions
- blocked actions
- approval-required actions
- allowed functions
- blocked functions
- function argument contracts
- allowed models
- allowed prompts
- allowed tools
- dataset version policies
- max step count
- max latency

Dynamic baseline options let a client say:

- "use my pinned policy"
- "also treat recent stable behavior as policy"

That makes the product useful for real client environments where some workflows are tightly pinned and others are baseline-driven.
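Conceptually, mixing a pinned policy with history-derived expectations amounts to a union over allow-lists. This hypothetical helper is not the engine's API, just the idea:

```python
def effective_allowed(pinned: set[str], recent_stable: set[str],
                      use_history: bool) -> set[str]:
    """Union a pinned allow-list with values observed in recent stable runs
    when the policy opts into a dynamic baseline (illustrative only)."""
    return pinned | recent_stable if use_history else set(pinned)
```

With the dynamic baseline enabled, an action seen only in stable history is still treated as allowed; with it disabled, only the pinned list counts.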

## How Graphiti Is Used

Graphiti is not used here as a generic chat memory layer. It is used as the temporal provenance graph under the observability system.

The system stores:

- workflow runs
- workflow steps
- decisions
- selected actions
- candidate actions
- function calls
- function argument shapes
- prompts
- models
- tools
- datasets
- drift alerts
- guardrail policies
- guardrail incidents

This gives you:

- historical comparison
- tenant/workflow partitioning
- graph-backed explainability
- auditable policy history

## Architecture

### Runtime Flow

1. External workflow emits a run payload.
2. An adapter normalizes it into `WorkflowRun`.
3. The run is persisted into Graphiti and Kuzu.
4. Historical runs for the same workflow are loaded.
5. Drift features are extracted.
6. Drift detectors score the run against history.
7. Guardrail policies are loaded and evaluated.
8. Explanations are assembled from structured state plus graph facts.
9. Drift alerts and guardrail incidents are stored.
10. API and dashboard expose the result.

### Important Design Choice

For local development with smaller MLX models, the system uses deterministic Graphiti node and edge writes for workflow telemetry. That is intentional.

Reason:

- small local models are unreliable for Graphiti's more complex extraction schemas
- workflow traces are already structured
- deterministic graph writes are more appropriate for enterprise observability data

So Graphiti is still under the hood, but persistence is explicit and stable.

## Normalized Workflow Schema

The product expects a provider-agnostic workflow run schema. A simplified run looks like this:

```json
{
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "run_id": "run-001",
  "agent_name": "triage-orchestrator",
  "environment": "dev",
  "status": "success",
  "started_at": "2026-04-12T07:58:00Z",
  "finished_at": "2026-04-12T07:58:02Z",
  "input_payload": {
    "claim_type": "auto",
    "region": "ca",
    "priority": "normal"
  },
  "output_payload": {
    "decision": "review",
    "confidence": 0.74
  },
  "datasets": [
    {
      "name": "claims_policy",
      "version": "2026-01",
      "schema_hash": "claims-policy-2026-01"
    }
  ],
  "steps": [
    {
      "step_id": "s1",
      "name": "plan",
      "kind": "llm",
      "status": "success",
      "model": "mlx-community/Qwen2.5-1.5B-Instruct-4bit",
      "prompt_version": "planner-v1",
      "decision_name": "routing_policy",
      "decision_value": "policy_lookup",
      "selected_action": "policy_lookup",
      "candidate_actions": ["policy_lookup", "web_search", "manual_review_router"],
      "function_calls": [
        {
          "function_name": "route_claim",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "priority": "normal",
            "policy_version": "2026-01"
          }
        }
      ]
    },
    {
      "step_id": "s2",
      "name": "policy_lookup",
      "kind": "tool",
      "status": "success",
      "tool_name": "policy_lookup",
      "selected_action": "policy_lookup",
      "function_calls": [
        {
          "function_name": "lookup_policy_rules",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "policy_version": "2026-01"
          }
        }
      ]
    }
  ]
}
```
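A payload like this can be submitted to a running API instance (see the API section below). This assumes the local server from the quickstart later in this README and a `run.json` file containing the payload:

```shell
curl -X POST "http://127.0.0.1:8081/observe?dry_run=true" \
  -H 'Content-Type: application/json' \
  -d @run.json
```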

## Dynamic Guardrail Policy Schema

Example client policy:

```json
{
  "policy_id": "claims-triage-dynamic-guardrails",
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "mode": "warn",
  "allowed_models": ["mlx-community/Qwen2.5-1.5B-Instruct-4bit"],
  "allowed_prompts": ["planner-v1"],
  "allowed_tools": ["policy_lookup", "claims_db"],
  "allowed_actions": ["policy_lookup", "claims_db"],
  "require_approval_actions": ["manual_review_router"],
  "allowed_functions": [
    "route_claim",
    "lookup_policy_rules",
    "fetch_claim_history"
  ],
  "function_contracts": {
    "route_claim": {
      "required_keys": ["claim_type", "region", "priority", "policy_version"],
      "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
      "allow_additional_keys": false
    }
  },
  "dataset_version_policies": {
    "claims_policy": ["2026-01"]
  },
  "max_step_count": 4,
  "max_latency_ms": 1800,
  "dynamic_baseline": {
    "decisions_from_history": true,
    "actions_from_history": true,
    "functions_from_history": true
  }
}
```

This policy means:

- models are pinned
- prompts are pinned
- tools and actions are pinned
- `manual_review_router` is not outright blocked, but it requires approval
- `route_claim` must follow a strict argument contract
- the dataset version is pinned
- dynamic baseline also contributes expected decisions/actions/functions
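To make the `function_contracts` semantics concrete, here is an illustrative checker (not Aether's engine) for a contract like the `route_claim` entry above:

```python
def contract_violations(args: dict, contract: dict) -> list[str]:
    """Illustrative evaluation of a function argument contract: report
    missing required keys, and unexpected keys when additional keys are
    disallowed."""
    violations = []
    for key in contract.get("required_keys", []):
        if key not in args:
            violations.append(f"missing required key: {key}")
    if not contract.get("allow_additional_keys", True):
        allowed = set(contract.get("allowed_keys", []))
        for key in args:
            if key not in allowed:
                violations.append(f"unexpected key: {key}")
    return violations

contract = {
    "required_keys": ["claim_type", "region", "priority", "policy_version"],
    "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
    "allow_additional_keys": False,
}
# A call that drops `policy_version` and sneaks in `debug` violates twice.
print(contract_violations(
    {"claim_type": "auto", "region": "ca", "priority": "normal", "debug": True},
    contract,
))
```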

## Example Output

A drifted and policy-violating run returns:

- drift `signals`
- `graph_explanations`
- `guardrail_result`
- guardrail `violations`
- guardrail graph explanations

Typical guardrail verdict output:

```json
{
  "policy_id": "claims-triage-dynamic-guardrails",
  "mode": "warn",
  "verdict": "warn",
  "violations": [
    { "kind": "decision_guardrail_violation" },
    { "kind": "action_guardrail_violation" },
    { "kind": "approval_required_action" },
    { "kind": "function_guardrail_violation" },
    { "kind": "function_contract_violation" }
  ]
}
```
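On the client side, a verdict like this might be escalated as follows. This is a hypothetical sketch: `GuardrailBlocked` is invented here, not an SDK class.

```python
import logging

class GuardrailBlocked(RuntimeError):
    """Raised when a policy in `block` mode rejects the run (hypothetical)."""

def enforce(verdict: dict) -> None:
    """Escalate a guardrail verdict: raise on block, log on warn."""
    if verdict["verdict"] == "block":
        raise GuardrailBlocked(verdict["policy_id"])
    if verdict["verdict"] == "warn":
        kinds = [v["kind"] for v in verdict["violations"]]
        logging.warning("guardrail violations: %s", kinds)
```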

## API

### Health

- `GET /health`

### Observe Runs

- `POST /observe`

Query params:

- `adapter`
- `dry_run`

### Workflow Views

- `GET /workflows/{tenant_id}/{workflow_id}/history`
- `GET /workflows/{tenant_id}/{workflow_id}/alerts`
- `GET /facts`

### Guardrails

- `PUT /workflows/{tenant_id}/{workflow_id}/guardrails/policy`
- `GET /workflows/{tenant_id}/{workflow_id}/guardrails/policy`
- `GET /workflows/{tenant_id}/{workflow_id}/guardrails/incidents`
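For example, a policy document like the one in the guardrail schema section can be stored with (assuming the local server and a `policy.json` file holding the policy):

```shell
curl -X PUT http://127.0.0.1:8081/workflows/acme/claims-triage/guardrails/policy \
  -H 'Content-Type: application/json' \
  -d @policy.json
```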

### Demo

- `POST /demo/seed`

The demo seeds:

- stable baseline runs
- a guardrail policy
- a drifted run that violates both behavior baselines and client policy

## CLI

The package exposes:

```bash
aether-observer serve --reload
aether-observer observe path/to/run.json
aether-observer history acme claims-triage --limit 20
```

## Dashboard

The FastAPI app serves a frontend at:

```text
http://127.0.0.1:8081/
```

The dashboard shows:

- health
- workflow history
- drift alert feed
- latest observation result
- graph explanations
- fact search
- inline payload editor
- inline guardrail policy example

## Local Development Quickstart

### 1. Start MLX

Example:

```bash
uv run python -m mlx_lm.server \
  --model mlx-community/Qwen2.5-1.5B-Instruct-4bit \
  --host 127.0.0.1 \
  --port 8000
```

### 2. Install Dependencies

```bash
uv sync
cp .env.example .env
```

### 3. Start The API

```bash
uv run uvicorn aether_observer.api:app --host 127.0.0.1 --port 8081
```

Or via CLI:

```bash
uv run aether-observer serve --reload
```

### 4. Open The Dashboard

```text
http://127.0.0.1:8081/
```

### 5. Seed A Demo Workflow

Use the dashboard or:

```bash
curl -X POST http://127.0.0.1:8081/demo/seed \
  -H 'Content-Type: application/json' \
  -d '{
    "tenant_id": "acme",
    "workflow_id": "claims-triage",
    "environment": "dev"
  }'
```

## Project Layout

- `src/aether_observer/api.py`: FastAPI API and dashboard routes
- `src/aether_observer/service.py`: orchestration layer for observation, drift, and guardrails
- `src/aether_observer/drift/`: feature extraction and drift detectors
- `src/aether_observer/guardrails/`: dynamic guardrail engine
- `src/aether_observer/graph_store.py`: Graphiti persistence for runs, alerts, policies, and incidents
- `src/aether_observer/runtime.py`: Graphiti runtime wiring
- `src/aether_observer/adapters/`: external workflow adapter boundary
- `src/aether_observer/static/`: dashboard frontend
- `src/aether_observer/demo.py`: demo data builders and scenario seeding
- `examples/simulate_runs.py`: simple local smoke example
- `tests/`: drift and guardrail tests

## Current Local Validation

The current local build has been exercised with:

- compile checks
- unit tests
- live API checks
- live dashboard checks
- live guardrail policy storage and retrieval
- live guardrail incident storage
- live drift and guardrail explainability output

## Enterprise Deployment Direction

This repo is intentionally shaped for enterprise rollout. The immediate next steps for production are:

1. replace local `Kuzu` with a production graph backend
2. add authenticated workflow adapters for real agent platforms
3. wire `block` mode into real execution control instead of post-run verdict only
4. add approval and waiver records for human review flows
5. add auth, rate limiting, webhooks, and tenant-facing policy management

## Why This Matters

Most agent observability tools stop at traces and token counts. This project is aimed at a more serious target:

- enterprise agent governance
- explainable behavioral drift
- policy-aware execution monitoring
- reusable client-specific guardrails

That is the actual product direction of this repository.
