Metadata-Version: 2.4
Name: agent-memory-layer
Version: 0.1.0
Summary: A reusable memory layer for SAP agentic workflows
Project-URL: Changelog, https://github.com/anthropics/agent-memory-layer/blob/main/CHANGELOG.md
Author-email: Kunal Kumar <kunal.kumar03@sap.com>
License: Apache-2.0
License-File: LICENSE
Keywords: agents,ai-core,consolidation,langgraph,memory,sap
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.11
Requires-Dist: aiosqlite>=0.20
Requires-Dist: apscheduler>=3.10
Requires-Dist: asyncpg>=0.29
Requires-Dist: pydantic-settings>=2.0
Requires-Dist: pydantic>=2.0
Requires-Dist: sap-ai-sdk-gen[all]
Provides-Extra: dev
Requires-Dist: hatch; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Provides-Extra: langgraph
Requires-Dist: langgraph>=0.1; extra == 'langgraph'
Provides-Extra: redis
Requires-Dist: redis>=5.0; extra == 'redis'
Description-Content-Type: text/markdown

# agent-memory-layer

A reusable memory layer for agentic workflows built on LangGraph and SAP AI Core.

It gives your agents persistent memory across sessions, so facts learned in one conversation can be injected into the next.

## How It Works

```text
Session starts
  → inject_node fetches relevant facts from PostgreSQL
  → Facts added to agent state as working_memory

Session runs
  → Agent responds using injected context

Session ends
  → record_node saves the conversation as an EpisodeRecord

Background (periodic)
  → Consolidation pipeline processes episodes not yet consolidated
  → LLM extracts reusable facts via SAP AI Core
  → MemoryItems written to PostgreSQL with importance scores
  → Importance decay pipeline runs once daily
  → Memories below threshold are automatically deleted
```

## Installation

Install the core package:

```bash
pip install agent-memory-layer
```

If you are integrating with LangGraph, install the optional extra:

```bash
pip install "agent-memory-layer[langgraph]"
```

## Requirements

- Python 3.11+
- PostgreSQL (local Docker or managed instance)
- SAP AI Core account with a deployed LLM model

## Quick Start

### 1. Set Up PostgreSQL

The repo includes a `docker-compose.yml` for local development:

```bash
docker compose up -d
```

This starts PostgreSQL on `localhost:5432` with:
- User: `memory_user`
- Password: `memory_pass`
- Database: `agent_memory`
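
These defaults match the connection URL used in the examples below:

```python
# DSN assembled from the docker-compose defaults above
db_url = "postgresql://memory_user:memory_pass@localhost:5432/agent_memory"
```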

### 2. Configure Credentials

Copy the example env file and fill in your SAP AI Core credentials:

```bash
cp .env.example .env
```

Edit `.env`:

```env
AICORE_CLIENT_ID=your-client-id
AICORE_CLIENT_SECRET=your-client-secret
AICORE_AUTH_URL=https://your-tenant.authentication.eu10.hana.ondemand.com
AICORE_BASE_URL=https://api.ai.prod.eu-central-1.aws.ml.hana.ondemand.com/v2
```

### 3. Use MemoryManager (Recommended)

`MemoryManager` is the simplest way to use the library. It handles database setup, node creation, and the background scheduler in one object.

```python
import asyncio
from agent_memory_layer import MemoryManager

async def main():
    # Create and start — connects to DB, runs migrations, starts scheduler
    manager = await MemoryManager.create(
        db_url="postgresql://memory_user:memory_pass@localhost:5432/agent_memory",
        aicore_client_id="...",
        aicore_client_secret="...",
        aicore_auth_url="...",
        aicore_base_url="...",
    )

    # Use manager.inject_node and manager.record_node in your LangGraph
    # (see "Wire Into LangGraph" below)

    # When your app shuts down
    await manager.close()

asyncio.run(main())
```

You can also create and start in two steps if you need to configure things between construction and startup:

```python
manager = MemoryManager(
    db_url="postgresql://memory_user:memory_pass@localhost:5432/agent_memory",
    aicore_client_id="...",
    aicore_client_secret="...",
    aicore_auth_url="...",
    aicore_base_url="...",
)
await manager.start()
```
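
A common use for the two-step form is starting and stopping the manager from an application lifespan hook. Here is a minimal sketch with FastAPI (FastAPI is not a dependency of this library; the pattern applies to any framework with startup/shutdown hooks):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # `manager` is constructed at module scope as shown above
    await manager.start()
    yield
    await manager.close()

app = FastAPI(lifespan=lifespan)
```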

### 4. Wire Into LangGraph

```python
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage
from typing import Annotated, Optional
from typing_extensions import TypedDict

# Define your state — must include these keys for memory to work
class AgentState(TypedDict, total=False):
    messages: Annotated[list[BaseMessage], add_messages]
    user_id: str
    agent_id: str
    workflow_id: Optional[str]
    working_memory: dict  # populated by inject_node

# Build the graph
graph = StateGraph(AgentState)

graph.add_node("inject_memory", manager.inject_node)
graph.add_node("agent", your_agent_node)
graph.add_node("record_memory", manager.record_node)

graph.set_entry_point("inject_memory")
graph.add_edge("inject_memory", "agent")
graph.add_edge("agent", "record_memory")
graph.add_edge("record_memory", END)

app = graph.compile()

# Invoke the graph
result = await app.ainvoke({
    "user_id": "user-123",
    "agent_id": "support-bot",
    "workflow_id": None,
    "messages": [HumanMessage(content="Our S4 system keeps timing out on month-end close")],
})
```
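
The returned state contains the full message history plus whatever `inject_node` added. A quick way to inspect it (assuming the state shape defined above):

```python
# The last message is the agent's reply
print(result["messages"][-1].content)

# Facts that inject_node pulled from PostgreSQL for this scope
print(result.get("working_memory", {}).get("injected_facts", []))
```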

### 5. Access Injected Memories in Your Agent Node

After `inject_node` runs, your agent node can read the injected facts from `state["working_memory"]`:

```python
from langchain_core.messages import AIMessage

async def your_agent_node(state: AgentState) -> dict:
    working_memory = state.get("working_memory", {})
    injected_facts = working_memory.get("injected_facts", [])

    # Build a system prompt that includes the memories
    system = "You are a helpful assistant."
    if injected_facts:
        system += "\n\nYou know these facts about the user:\n"
        system += "\n".join(f"- {fact}" for fact in injected_facts)

    # Call your LLM with `system` and state["messages"]
    response = ...  # your LLM call here

    return {"messages": [AIMessage(content=response)]}
```

### 6. Important: Message Format for record_node

`record_node` expects `messages` to be a `list[dict]` (e.g., `[{"role": "user", "content": "..."}]`). If your LangGraph uses the `add_messages` reducer (which stores `BaseMessage` objects), you need to convert them before they reach `record_node`. Wrap it like this:

```python
from langchain_core.messages import BaseMessage

def build_record_wrapper(record_node):
    async def wrapper(state):
        messages = state.get("messages", [])
        serialized = []
        for m in messages:
            if isinstance(m, BaseMessage):
                serialized.append({"role": m.type, "content": m.content})
            elif isinstance(m, dict):
                serialized.append(m)
        return await record_node({**state, "messages": serialized})
    return wrapper

# Use in your graph
graph.add_node("record_memory", build_record_wrapper(manager.record_node))
```
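
The core package itself does not depend on `langchain-core` (LangGraph support is an optional extra), which is likely why this conversion lives in application code rather than inside `record_node`.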

## Advanced Usage: Manual Setup (Without MemoryManager)

If you need more control over individual components:

```python
import asyncio

from agent_memory_layer import (
    MemoryConfig,
    apply_migrations,
    build_inject_node,
    build_record_node,
    build_scheduler,
    get_pool,
)

async def main():
    config = MemoryConfig()  # reads from env vars / .env

    async with get_pool(config) as pool:
        await apply_migrations(pool, config)

        inject = build_inject_node(pool, config)
        record = build_record_node(pool, config)

        # Build your graph with inject and record nodes...

        # Start background consolidation and decay
        scheduler = build_scheduler(
            pool,
            config,
            scopes=[{"user_id": "u1", "agent_id": "support_agent", "workflow_id": None}],
        )
        scheduler.start()

        # ... run your app ...

        scheduler.shutdown()

asyncio.run(main())
```

## Triggering Consolidation Manually

Instead of waiting for the scheduler, you can trigger consolidation directly:

```python
from agent_memory_layer import (
    run_consolidation,
    EpisodeRepository,
    MemoryItemRepository,
    ConsolidationJobRepository,
)

# `pool` and `config` come from the manual setup shown above
episode_repo = EpisodeRepository(pool, schema="sap_agent_memory")
memory_repo = MemoryItemRepository(pool, schema="sap_agent_memory")
job_repo = ConsolidationJobRepository(pool, schema="sap_agent_memory")

job = await run_consolidation(
    episode_repo=episode_repo,
    memory_repo=memory_repo,
    job_repo=job_repo,
    config=config,
    user_id="user-123",
    agent_id="support-bot",
    workflow_id=None,
)

if job:
    print(f"Status: {job.status}, Facts created: {job.facts_created}")
```
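
Decay can likewise be driven by hand through the repository layer. The sketch below uses `decay_importance()` from the API reference; the keyword arguments are an assumption and may differ from the real signature:

```python
# Hypothetical invocation: the method exists on MemoryItemRepository,
# but these parameters are illustrative, not confirmed.
await memory_repo.decay_importance(factor=0.95, threshold=0.05)
```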

## Running the E2E Test

The repo includes a self-contained end-to-end test script that builds a mini LangGraph, runs two conversation turns, consolidates memories in between, and verifies that facts carry across turns.

```bash
# 1. Start PostgreSQL
docker compose up -d

# 2. Set credentials (or put them in .env)
export AICORE_CLIENT_ID="..."
export AICORE_CLIENT_SECRET="..."
export AICORE_AUTH_URL="..."
export AICORE_BASE_URL="..."

# 3. Install the library with LangGraph support
pip install -e ".[langgraph]"

# 4. Run
python scripts/e2e_test.py
```

Expected output:

```
=== agent-memory-layer E2E Test ===

[1/8] Starting MemoryManager...                    ✓ OK
[2/8] Building mini LangGraph...                   ✓ OK
[3/8] Turn 1: "I'm a Python developer..."          ✓ OK
[4/8] Consolidating episodes...                    ✓ OK  — 5 facts created
[5/8] Turn 2: "What language for my project?"      ✓ OK
[6/8] Verifying memory injection...                ✓ OK  — memories carried across turns
[7/8] Testing decay...                             ✓ OK
[8/8] Closing MemoryManager...                     ✓ OK
```

## LangGraph State Keys

Your state dictionary must include these keys for memory to work:

| Key | Type | Description |
| --- | --- | --- |
| `user_id` | `str \| None` | Identifies the user |
| `agent_id` | `str \| None` | Identifies the agent |
| `workflow_id` | `str \| None` | Identifies the workflow |
| `messages` | `list[dict]` | Conversation turns (role + content); `BaseMessage` objects need conversion (see step 6 above) |

After `inject_node` runs, the state will also contain:

| Key | Type | Description |
| --- | --- | --- |
| `working_memory` | `dict` | Contains `injected_facts` (list of strings), `session_id`, `scratch`, etc. |

## Scopes

A scope is a combination of `user_id`, `agent_id`, and `workflow_id`. Memory is isolated per scope — different users and agents do not share facts.

All three fields are optional. Use only what makes sense for your use case:

| Use Case | Scope |
| --- | --- |
| Memory per user | `user_id` only |
| Memory per agent | `agent_id` only |
| Memory per user and agent | `user_id + agent_id` |
| Fully isolated per run | All three fields set |
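
For example, to share memory across every agent a given user talks to, set only `user_id` when invoking the graph:

```python
# Scope: user_id only, so facts are shared across this user's agents
result = await app.ainvoke({
    "user_id": "user-123",
    "agent_id": None,
    "workflow_id": None,
    "messages": [HumanMessage(content="Hi again")],
})
```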

## Configuration Reference

All settings can be provided via environment variables, a `.env` file, or passed directly to `MemoryManager`/`MemoryConfig`.
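
For example, the same setting can be supplied through the process environment (or `.env`) and picked up by `MemoryConfig()`:

```python
import os

from agent_memory_layer import MemoryConfig

# Same effect as putting SAP_MEMORY_INJECT_LIMIT=10 in .env
os.environ["SAP_MEMORY_INJECT_LIMIT"] = "10"

config = MemoryConfig()  # reads env vars and .env
```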

### AI Core (Required)

| Variable | Description |
| --- | --- |
| `AICORE_CLIENT_ID` | OAuth client ID |
| `AICORE_CLIENT_SECRET` | OAuth client secret |
| `AICORE_AUTH_URL` | OAuth token endpoint |
| `AICORE_BASE_URL` | AI Core API base URL |
| `AICORE_RESOURCE_GROUP` | Resource group (default: `default`) |
| `AICORE_LLM_MODEL` | LLM model name (default: `gpt-4o`) |

### Database

| Variable | Default | Description |
| --- | --- | --- |
| `SAP_MEMORY_DB_URL` | — | Full connection string (overrides individual fields below) |
| `SAP_MEMORY_DB_HOST` | `localhost` | PostgreSQL host |
| `SAP_MEMORY_DB_PORT` | `5432` | PostgreSQL port |
| `SAP_MEMORY_DB_NAME` | `sap_memory` | Database name |
| `SAP_MEMORY_DB_USER` | `postgres` | Database user |
| `SAP_MEMORY_DB_PASSWORD` | `postgres` | Database password |
| `SAP_MEMORY_DB_SCHEMA` | `sap_agent_memory` | PostgreSQL schema |

### Memory Tuning

| Variable | Default | Description |
| --- | --- | --- |
| `SAP_MEMORY_INJECT_MIN_IMPORTANCE` | `0.1` | Minimum importance score to inject a fact |
| `SAP_MEMORY_INJECT_LIMIT` | `20` | Max facts injected per session |
| `SAP_MEMORY_CONSOLIDATION_INTERVAL_HOURS` | `6` | How often consolidation runs |
| `SAP_MEMORY_CONSOLIDATION_CLEANUP_INTERVAL_HOURS` | `1` | Cleanup job interval |
| `SAP_MEMORY_CONSOLIDATION_DECAY_DAILY_HOUR` | `2` | Daily decay hour (UTC) |
| `SAP_MEMORY_CONSOLIDATION_DECAY_FACTOR` | `0.95` | Importance multiplier per decay cycle |
| `SAP_MEMORY_CONSOLIDATION_DECAY_THRESHOLD` | `0.05` | Delete facts below this score |
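
To get a sense of how the decay defaults interact: a fact's importance is multiplied by `0.95` once per day, so a fact starting at importance `1.0` that is never otherwise updated falls below the `0.05` deletion threshold after roughly two months:

```python
import math

decay_factor = 0.95   # SAP_MEMORY_CONSOLIDATION_DECAY_FACTOR
threshold = 0.05      # SAP_MEMORY_CONSOLIDATION_DECAY_THRESHOLD

# Daily decay cycles until importance 1.0 drops below the threshold
days = math.ceil(math.log(threshold) / math.log(decay_factor))
print(days)  # 59
```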

## Architecture

```text
┌──────────────────────────────────────────────────┐
│                  LangGraph Flow                  │
│                                                  │
│  inject_node ──→ your_agent_node ──→ record_node │
│       │                                   │      │
│       ▼                                   ▼      │
│  Reads facts                      Saves episode  │
│  from DB                          to DB          │
└──────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────┐
│               Background Scheduler               │
│                                                  │
│  Consolidation (every N hours):                  │
│    Load episodes → Call LLM → Extract facts      │
│    → Write MemoryItems → Mark episodes done      │
│                                                  │
│  Decay (daily):                                  │
│    Multiply importance by decay_factor           │
│    → Delete items below threshold                │
└──────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────┐
│                PostgreSQL Tables                 │
│                                                  │
│  episodes           Raw conversation records     │
│  memory_items       Consolidated facts + scores  │
│  consolidation_jobs Job tracking / idempotency   │
└──────────────────────────────────────────────────┘
```

## API Reference

### MemoryManager

The recommended entry point. Handles DB setup, node creation, and the scheduler.

| Method | Description |
| --- | --- |
| `MemoryManager(*, db_url, aicore_*, ...)` | Constructor — saves config, nothing is running yet |
| `await MemoryManager.create(*, ...)` | Shortcut: creates instance + calls `start()` |
| `await manager.start()` | Connects to DB, runs migrations, builds nodes, starts scheduler |
| `manager.inject_node` | LangGraph node function — fetches memories into state |
| `manager.record_node` | LangGraph node function — saves conversation as episode |
| `await manager.close()` | Stops scheduler, closes DB pool |

### Models

| Class | Description |
| --- | --- |
| `WorkingMemoryFrame` | Ephemeral per-session frame with `injected_facts`, `scratch`, `session_id` |
| `EpisodeRecord` | Raw conversation saved by `record_node` (id, messages, scope, timestamps) |
| `MemoryItem` | Consolidated fact with `content`, `importance`, and scope |
| `ConsolidationJob` | Tracks a consolidation run (status, facts_created, checksum) |

### Storage Repositories

| Class | Key Methods |
| --- | --- |
| `EpisodeRepository` | `save()`, `get_unconsolidated()`, `mark_consolidated()` |
| `MemoryItemRepository` | `upsert()`, `fetch_by_scope()`, `delete()`, `decay_importance()` |
| `ConsolidationJobRepository` | `create()`, `update_status()`, `get_last_for_scope()` |

### Functions

| Function | Description |
| --- | --- |
| `build_inject_node(pool, config)` | Returns an async LangGraph node that injects memories |
| `build_record_node(pool, config)` | Returns an async LangGraph node that saves episodes |
| `build_scheduler(pool, config, scopes=)` | Returns a configured APScheduler with consolidation + decay jobs |
| `run_consolidation(...)` | Manually triggers one consolidation pass for a scope |
| `get_pool(config)` | Async context manager for DB connection pool |
| `apply_migrations(pool, config)` | Runs SQL migrations to set up the schema |

## Cloud Foundry / SAP BTP

If deploying to Cloud Foundry, use `MemoryConfig.from_vcap_services()` to automatically read credentials from service bindings:

```python
import json, os
from agent_memory_layer import MemoryConfig

vcap = json.loads(os.environ["VCAP_SERVICES"])
config = MemoryConfig.from_vcap_services(vcap)
```

AI Core and PostgreSQL credentials are extracted from the `aicore` and `postgresql` bindings respectively.
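
The resulting `config` can then be used with the manual-setup helpers shown earlier, for example (`bootstrap` is just an illustrative name):

```python
from agent_memory_layer import apply_migrations, get_pool

async def bootstrap():
    async with get_pool(config) as pool:
        await apply_migrations(pool, config)
        # ...build nodes and scheduler as in "Advanced Usage"...
```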

## Inspecting Stored Data

Connect to the database to see what's stored:

```bash
# If using Docker
docker exec -it agent-memory-layer-db psql -U memory_user -d agent_memory
```

Query examples:

```sql
SELECT * FROM sap_agent_memory.memory_items ORDER BY importance DESC;
SELECT * FROM sap_agent_memory.episodes ORDER BY created_at DESC;
SELECT * FROM sap_agent_memory.consolidation_jobs ORDER BY started_at DESC;
```

## License

Apache 2.0 — see [LICENSE](LICENSE) for details.
