Metadata-Version: 2.4
Name: langgraph-postgres-memory
Version: 0.2.0
Summary: Production-ready PostgreSQL memory for LangGraph agents. Pool setup, lifecycle management, retry logic, and common ops — no boilerplate.
Author: LangModule
License: MIT
License-File: LICENSE
Keywords: agent,checkpointer,langgraph,memory,postgres,store
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.11
Requires-Dist: langchain-core>=1.3.2
Requires-Dist: langgraph-checkpoint-postgres>=3.0.5
Requires-Dist: langgraph>=1.1.10
Requires-Dist: psycopg-pool>=3.3.0
Requires-Dist: psycopg[binary]>=3.3.3
Requires-Dist: tenacity>=9.1.4
Description-Content-Type: text/markdown

# LangGraph Postgres Memory

Production-ready PostgreSQL memory for LangGraph agents. Pool setup, lifecycle management, retry logic, and common operations — no boilerplate.

> **Note:** This library wraps [langgraph-checkpoint-postgres](https://github.com/langchain-ai/langgraph/tree/main/libs/checkpoint-postgres) and [langgraph](https://github.com/langchain-ai/langgraph) — it does not reimplement checkpointing or the store. It handles the boilerplate you'd otherwise copy-paste into every agent.

## Short-Term Memory (PostgresShortTerm)

Short-term memory persists conversation state within a single thread. LangGraph's checkpointer saves a snapshot at every graph step automatically — `PostgresShortTerm` handles the pool, lifecycle, and cleanup around it.

### Features

- **One-line setup** — connection pool, checkpointer, and lifecycle managed via async context manager
- **Production pool defaults** — TCP keepalives, configurable idle/lifetime/timeout, schema isolation
- **Retry with backoff** — transient Postgres errors retried automatically via tenacity
- **Thread cleanup** — a single CTE deletes a thread from all three checkpoint tables in one round-trip
- **Bulk cleanup** — delete threads older than N days using the timestamps embedded in UUID v6 checkpoint IDs
- **Health primitives** — `ping()` and `pool_stats()` for application health endpoints
- **Pydantic config** — validated settings; construct the config from wherever your values come from (YAML, env vars, hardcoded)

### Usage

```python
from langchain_core.messages import HumanMessage
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph_postgres_memory import PostgresMemoryConfig, PostgresShortTerm

# Define your graph
builder = StateGraph(MessagesState)
builder.add_node("echo", lambda state: {"messages": state["messages"]})
builder.add_edge(START, "echo")
builder.add_edge("echo", END)

# Configure memory
config = PostgresMemoryConfig(
    user="myuser",
    password="mypass",
    host="localhost",
    database="mydb",
    schema_name="agent_schema",  # default: "public"
)

async with PostgresShortTerm(config) as memory:
    # Compile your graph with the checkpointer
    graph = builder.compile(checkpointer=memory.checkpointer)

    # Invoke as usual
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content="hello")]},
        {"configurable": {"thread_id": "thread-123"}},
    )

    # Read messages back
    messages = await memory.get_messages("thread-123")

    # Delete a thread
    await memory.delete_thread("thread-123")

    # Bulk cleanup
    await memory.delete_threads_older_than(days=30)

    # Health check
    alive = await memory.ping()
    stats = memory.pool_stats()
```
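
Because every graph step is checkpointed, a second `ainvoke` with the same `thread_id` resumes from the saved state. A minimal sketch reusing `builder` and `config` from above (fresh thread id, so it is independent of the cleanup calls in the previous snippet):

```python
async with PostgresShortTerm(config) as memory:
    graph = builder.compile(checkpointer=memory.checkpointer)
    thread = {"configurable": {"thread_id": "thread-456"}}

    # First turn: the message is persisted in the checkpoint for thread-456
    await graph.ainvoke({"messages": [HumanMessage(content="my name is Ada")]}, thread)

    # Second turn: prior state is loaded from the checkpoint before this input is appended
    result = await graph.ainvoke({"messages": [HumanMessage(content="hello again")]}, thread)
    # result["messages"] contains both human messages; the first was restored from Postgres
```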

### Without this library

```python
# ~40 lines you copy-paste into every agent
conn_str = f"postgresql://{user}:{quote_plus(password)}@{host}:{port}/{db}"
conn_str += "?keepalives=1&keepalives_idle=30&..."
pool = AsyncConnectionPool(
    conninfo=conn_str, min_size=2, max_size=10,
    kwargs={"autocommit": True, "row_factory": dict_row},
    configure=..., check=...,
)
await pool.open()
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
# ... try/finally to close pool
# ... raw SQL to delete threads across 3 tables
# ... dig into checkpoint JSONB to extract messages
```

## Long-Term Memory (PostgresLongTerm)

Long-term memory persists knowledge across threads and sessions. Unlike the checkpointer (which saves every message automatically within one thread), the store requires explicit reads and writes — your agent decides what to remember. `PostgresLongTerm` handles pool, store lifecycle, TTL sweeper, retry, and convenience operations.

### Features

- **One-line setup** — connection pool, store, table migrations, and TTL sweeper via async context manager
- **Any embedding provider** — pass OpenAI, Bedrock, Cohere, or any custom embedding function. No provider lock-in
- **Optional semantic search** — embeddings are optional. Works as a pure key-value store without them
- **TTL auto-lifecycle** — sweeper starts/stops automatically with the context manager
- **Namespace CRUD with retry** — `put`, `get`, `search`, `delete`, `list_namespaces` all retried on transient errors
- **Bulk operations** — `delete_namespace` and `count` via raw SQL (Store API doesn't have these)
- **Health primitives** — `ping()` and `pool_stats()` for application health endpoints
- **Pydantic config** — inherits all pool/retry settings from `PostgresMemoryConfig`, adds embedding + TTL fields

### Usage

#### Basic key-value (no embeddings)

```python
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm

config = PostgresLongTermConfig(user="u", password="p", database="db")

async with PostgresLongTerm(config) as memory:
    # Store user preferences
    await memory.put(("users", "u1", "prefs"), "theme", {"value": "dark"})
    await memory.put(("users", "u1", "prefs"), "lang", {"value": "python"})

    # Retrieve
    item = await memory.get(("users", "u1", "prefs"), "theme")
    print(item.value)  # {"value": "dark"}

    # Search with filter
    results = await memory.search(("users", "u1", "prefs"), filter={"value": "python"})

    # List namespaces
    ns = await memory.list_namespaces(prefix=("users", "u1"))

    # Count items
    n = await memory.count(("users", "u1", "prefs"))  # 2

    # Bulk delete
    deleted = await memory.delete_namespace(("users", "u1", "prefs"))  # 2

    # Health check
    alive = await memory.ping()
    stats = memory.pool_stats()
```

#### With semantic search (OpenAI)

```python
from langchain_openai import OpenAIEmbeddings
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm

config = PostgresLongTermConfig(
    user="u", password="p", database="db",
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    embedding_dims=1536,
    embedding_fields=["text"],
)

async with PostgresLongTerm(config) as memory:
    await memory.put(
        ("users", "u1", "memories"), "m1",
        {"text": "User prefers Python for backend work"}
    )
    await memory.put(
        ("users", "u1", "memories"), "m2",
        {"text": "User's company runs on AWS with Kubernetes"}
    )

    # Semantic search
    results = await memory.search(
        ("users", "u1", "memories"),
        query="what cloud infrastructure does the user have?"
    )
    # Returns m2 ranked higher (semantic match)
```

#### With AWS Bedrock Titan

```python
import boto3
from langchain_aws import BedrockEmbeddings
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm

session = boto3.Session(profile_name="my-profile", region_name="us-east-1")
bedrock_client = session.client("bedrock-runtime")

config = PostgresLongTermConfig(
    user="u", password="p", database="db",
    embedding=BedrockEmbeddings(
        model_id="amazon.titan-embed-text-v2:0",
        client=bedrock_client
    ),
    embedding_dims=1024,
)

async with PostgresLongTerm(config) as memory:
    # Works exactly the same as OpenAI example
    ...
```
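
#### With a custom embedding function

Embedding providers are not limited to LangChain classes: per the config reference below, `embedding` also accepts a plain sync or async function. A minimal sketch (the encoder call is hypothetical; any callable returning one vector per input text works):

```python
from langgraph_postgres_memory import PostgresLongTermConfig, PostgresLongTerm

async def embed_texts(texts: list[str]) -> list[list[float]]:
    # Hypothetical local encoder; substitute your own model or API call
    return [my_local_model.encode(text) for text in texts]

config = PostgresLongTermConfig(
    user="u", password="p", database="db",
    embedding=embed_texts,
    embedding_dims=768,   # must match the vectors your function returns
)
```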

#### With both short-term and long-term

```python
from langgraph_postgres_memory import (
    PostgresMemoryConfig,
    PostgresShortTerm,
    PostgresLongTermConfig,
    PostgresLongTerm,
)

short_config = PostgresMemoryConfig(user="u", password="p", database="db")
long_config = PostgresLongTermConfig(
    user="u", password="p", database="db",
    embedding=my_embeddings, embedding_dims=1536,
)

async with PostgresShortTerm(short_config) as short, PostgresLongTerm(long_config) as long:
    graph = builder.compile(
        checkpointer=short.checkpointer,  # automatic per-thread state
        store=long.store,                  # explicit cross-thread memory
    )
```
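
Inside the graph, nodes read and write long-term memory explicitly. This is standard LangGraph behavior rather than something this library adds: a minimal sketch assuming LangGraph injects the compiled `store` into node signatures, with a hypothetical `user_id` key in the run config:

```python
from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.store.base import BaseStore

async def recall(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # `store` is the AsyncPostgresStore passed to builder.compile(store=...)
    user_id = config["configurable"]["user_id"]  # hypothetical config key
    memories = await store.asearch(
        ("users", user_id, "memories"),
        query=state["messages"][-1].content,
    )
    # ...use `memories` in your prompt, and store.aput(...) anything worth keeping
    return {"messages": []}
```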

#### With TTL (auto-expiring items)

```python
config = PostgresLongTermConfig(
    user="u", password="p", database="db",
    ttl_default_minutes=1440,          # 24 hours default
    ttl_sweep_interval_minutes=10,     # check every 10 minutes
)

async with PostgresLongTerm(config) as memory:
    # This item expires in 60 minutes (overrides default)
    await memory.put(("cache",), "temp", {"data": "..."}, ttl=60)

    # This item uses default TTL (24 hours)
    await memory.put(("users", "u1", "session"), "ctx", {"last_topic": "k8s"})

    # TTL sweeper runs automatically in background
```

## Installation

```bash
# pip
pip install langgraph-postgres-memory

# uv
uv add langgraph-postgres-memory
```

## Requirements

- Python >= 3.11
- PostgreSQL (tested with 16)
- pgvector extension (only if using semantic search)

## Configuration

All pool and retry settings have sensible defaults. Override what you need:

```python
config = PostgresMemoryConfig(
    user="myuser",
    password="mypass",
    database="mydb",

    # Connection (defaults shown)
    host="localhost",
    port=5432,
    schema_name="public",

    # Pool tuning
    pool_min_size=2,
    pool_max_size=20,
    pool_max_idle=300,       # seconds — tune down to ~30 for serverless (Neon, Supabase)
    pool_max_lifetime=1800,  # seconds — tune down to ~180 for serverless
    pool_timeout=30,         # seconds — acquisition timeout

    # Retry tuning
    retry_max_attempts=3,
    retry_max_wait=10,       # backoff cap in seconds
)
```

`PostgresLongTermConfig` inherits all fields above and adds:

```python
long_config = PostgresLongTermConfig(
    user="myuser",
    password="mypass",
    database="mydb",

    # Embedding (optional — omit for pure key-value mode)
    embedding=my_embeddings_object,      # LangChain Embeddings instance or a sync/async function
    embedding_dims=1536,                 # required if embedding is set
    embedding_fields=["$"],              # JSON paths to embed, default = entire value
    distance_type="cosine",              # "cosine", "l2", or "inner_product"

    # TTL (optional — omit for no expiry)
    ttl_default_minutes=1440,            # default TTL for new items (minutes)
    ttl_sweep_interval_minutes=5,        # background cleanup interval (minutes)
)
```
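
Both config classes are plain Pydantic models, so construct them from wherever your settings live. A minimal sketch using environment variables (the variable names are examples; this library does not read the environment itself):

```python
import os

from langgraph_postgres_memory import PostgresMemoryConfig

config = PostgresMemoryConfig(
    user=os.environ["PG_USER"],
    password=os.environ["PG_PASSWORD"],
    host=os.environ.get("PG_HOST", "localhost"),
    database=os.environ["PG_DATABASE"],
    pool_max_size=int(os.environ.get("PG_POOL_MAX_SIZE", "20")),
)
```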

## API Reference

### PostgresShortTerm

| Method | Description |
|--------|-------------|
| `PostgresShortTerm(config)` | Constructor, takes `PostgresMemoryConfig` |
| `async with PostgresShortTerm(config)` | Opens pool, initializes checkpointer, closes on exit |
| `.checkpointer` | `AsyncPostgresSaver` instance for `builder.compile(checkpointer=...)` |
| `await .get_messages(thread_id)` | Get messages from latest checkpoint |
| `await .delete_thread(thread_id)` | Delete all checkpoints, blobs, and writes for a thread |
| `await .delete_threads_older_than(days)` | Bulk delete threads older than N days |
| `await .ping()` | Returns `True` if database is reachable |
| `.pool_stats()` | Pool size, available connections, waiting requests |

### PostgresLongTerm

| Method | Description |
|--------|-------------|
| `PostgresLongTerm(config)` | Constructor, takes `PostgresLongTermConfig` |
| `async with PostgresLongTerm(config)` | Opens pool, initializes store, starts TTL sweeper, closes on exit |
| `.store` | `AsyncPostgresStore` instance for `builder.compile(store=...)` |
| `await .put(namespace, key, value)` | Store or update an item (supports `index` and `ttl` kwargs) |
| `await .get(namespace, key)` | Retrieve an item or `None` |
| `await .search(namespace_prefix)` | Search with optional `query`, `filter`, `limit`, `offset` |
| `await .delete(namespace, key)` | Delete a single item |
| `await .list_namespaces()` | List namespaces with optional `prefix`, `suffix`, `max_depth` |
| `await .delete_namespace(namespace)` | Delete ALL items in a namespace (returns count) |
| `await .count(namespace_prefix)` | Count items under a namespace prefix |
| `await .ping()` | Returns `True` if database is reachable |
| `.pool_stats()` | Pool size, available connections, waiting requests |
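
### Health endpoint example

Both classes expose the same health primitives; wiring them into an HTTP endpoint is up to the application. A minimal sketch assuming FastAPI, which is not a dependency of this library:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from langgraph_postgres_memory import PostgresMemoryConfig, PostgresShortTerm

memory = PostgresShortTerm(PostgresMemoryConfig(user="u", password="p", database="db"))

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep the pool open for the lifetime of the application
    async with memory:
        yield

app = FastAPI(lifespan=lifespan)

@app.get("/health")
async def health() -> dict:
    return {
        "database": "up" if await memory.ping() else "down",
        "pool": memory.pool_stats(),
    }
```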

## Project Structure

```
langgraph_postgres_memory/
  __init__.py        — public exports
  _core.py           — shared config, pool builder, retry builder, helpers
  shortterm.py       — PostgresShortTerm (checkpointer wrapper)
  longterm.py        — PostgresLongTermConfig + PostgresLongTerm (store wrapper)
pyproject.toml
Makefile
tests/
  conftest.py        — test config fixture + --run-integration flag
  docker-compose.yml — Postgres 16 on port 5433
  test_shortterm.py  — 20 unit + 7 integration tests
  test_longterm.py   — 17 unit + 21 integration tests
```

## Testing

```bash
# Unit tests only (no database needed)
make test-unit

# Full test suite (starts Postgres via Docker, runs all tests, stops Postgres)
make test-all

# Or manually
docker compose -f tests/docker-compose.yml up -d --wait
uv run pytest --run-integration -v
docker compose -f tests/docker-compose.yml down
```

## Acknowledgments

This project wraps [langgraph-checkpoint-postgres](https://github.com/langchain-ai/langgraph/tree/main/libs/checkpoint-postgres) and [langgraph](https://github.com/langchain-ai/langgraph) from the LangChain team. The checkpointing engine, store, serialization, and schema management are entirely theirs — this library handles pool lifecycle, retry, and convenience operations on top.

## License

MIT
