Metadata-Version: 2.4
Name: langgraph-ephemeral-checkpointer
Version: 0.1.0rc2
Summary: TTL-based thread retention for LangGraph checkpointers
Requires-Python: >=3.11
Requires-Dist: clypi
Requires-Dist: langgraph-checkpoint>=3.0.0
Provides-Extra: postgres
Requires-Dist: langgraph-checkpoint-postgres>=3.0.5; extra == 'postgres'
Provides-Extra: sqlite
Requires-Dist: langgraph-checkpoint-sqlite>=3.0.3; extra == 'sqlite'
Provides-Extra: test
Requires-Dist: aiosqlite>=0.20; extra == 'test'
Requires-Dist: langgraph-checkpoint-sqlite>=3.0.3; extra == 'test'
Requires-Dist: langgraph>=1.1.8; extra == 'test'
Requires-Dist: pytest-asyncio>=0.24; extra == 'test'
Requires-Dist: pytest>=8.0; extra == 'test'
Description-Content-Type: text/markdown

# langgraph-ephemeral-checkpointer

TTL-based thread retention for [LangGraph](https://github.com/langchain-ai/langgraph) checkpointers. Automatically delete conversation threads that have been idle too long or have exceeded an absolute age, and cap the number of checkpoints kept per thread, without touching your application's hot path.

---

## Table of Contents

- [Why](#why)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [TTLPolicy](#ttlpolicy)
- [TTLSweeper](#ttlsweeper)
  - [Running a sweep](#running-a-sweep)
  - [Background loop](#background-loop)
  - [Dry run](#dry-run)
  - [SweepResult](#sweepresult)
- [Per-thread policy overrides](#per-thread-policy-overrides)
- [Callbacks](#callbacks)
- [Safe delete](#safe-delete)
- [Multi-instance coordination](#multi-instance-coordination)
- [Backends](#backends)
- [CLI](#cli)
- [API reference](#api-reference)

---

## Why

LangGraph checkpointers persist each thread's conversation state to storage. Without a retention policy, threads accumulate indefinitely. This library runs as a **sidecar**: it never sits in the hot path of your graph, and instead periodically sweeps expired threads according to rules you define.

---

## Installation

```bash
pip install langgraph-ephemeral-checkpointer
```

Backend extras (install only what you use):

```bash
pip install "langgraph-ephemeral-checkpointer[sqlite]"    # SqliteSaver
pip install "langgraph-ephemeral-checkpointer[postgres]"  # PostgresSaver
```

Python 3.11+ is required.

---

## Quick Start

```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

# from_conn_string() is a context manager that opens and closes the SQLite connection
with SqliteSaver.from_conn_string("threads.db") as checkpointer:
    policy = TTLPolicy(idle_ttl_seconds=3600)  # delete threads idle for > 1 hour
    sweeper = TTLSweeper(checkpointer, policy)

    result = sweeper.sweep()
    print(f"Deleted {len(result.deleted_thread_ids)} thread(s)")
```

---

## TTLPolicy

`TTLPolicy` defines when a thread is considered expired. At least one rule is required.

```python
from langgraph_ephemeral_checkpointer import TTLPolicy

# Idle TTL — delete threads with no activity for 2 hours
policy = TTLPolicy(idle_ttl_seconds=7200)

# Hard age TTL — delete threads older than 7 days regardless of activity
policy = TTLPolicy(hard_age_ttl_seconds=604800)

# Checkpoint count limit — keep only the 20 most recent checkpoints per thread
policy = TTLPolicy(max_checkpoints_per_thread=20)

# Combine rules: a thread expires if either TTL rule matches;
# max_checkpoints_per_thread independently trims each thread's oldest checkpoints
policy = TTLPolicy(
    idle_ttl_seconds=3600,
    hard_age_ttl_seconds=86400,
    max_checkpoints_per_thread=50,
)
```

| Parameter | Type | Description |
|---|---|---|
| `idle_ttl_seconds` | `int \| None` | Expire threads with no checkpoint activity for this many seconds |
| `hard_age_ttl_seconds` | `int \| None` | Expire threads whose first checkpoint is older than this many seconds |
| `max_checkpoints_per_thread` | `int \| None` | Trim oldest checkpoints so each thread keeps at most this many |

`TTLPolicy` is a frozen dataclass — instances are immutable.

---

## TTLSweeper

`TTLSweeper` is the main entry point. It wraps a checkpointer and a default policy, picks a backend strategy automatically (see [Backends](#backends)), and runs sweeps on demand or on an interval.

```python
TTLSweeper(
    checkpointer,           # any LangGraph BaseCheckpointSaver
    policy,                 # TTLPolicy
    *,
    policy_resolver=None,   # per-thread overrides (see below)
    enable_coordination=False,  # PostgreSQL advisory locks
    safe_delete=True,       # re-verify timestamps before deleting
    on_before_delete=None,  # callback before each deletion
    on_sweep_complete=None, # callback after each sweep
)
```

### Running a sweep

```python
# synchronous
result = sweeper.sweep()

# asynchronous
result = await sweeper.asweep()
```

Both return a [`SweepResult`](#sweepresult).

### Background loop

For always-on applications, run a background sweep loop that fires on an interval:

```python
import asyncio
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

async def main():
    async with AsyncSqliteSaver.from_conn_string("threads.db") as checkpointer:
        policy = TTLPolicy(idle_ttl_seconds=3600)
        sweeper = TTLSweeper(checkpointer, policy)

        # sweep every 5 minutes; keep a reference so the task is not garbage-collected
        sweep_task = asyncio.create_task(sweeper.start(interval_seconds=300))

        # ... your application runs here ...

        await sweeper.stop()

asyncio.run(main())
```

### Dry run

Preview what would be deleted without actually deleting anything:

```python
result = sweeper.sweep(dry_run=True)
print(f"Would delete: {result.deleted_thread_ids}")
```

`on_before_delete` is skipped during dry runs. `on_sweep_complete` still fires.

### SweepResult

Every sweep returns a `SweepResult`:

```python
result = sweeper.sweep()

result.deleted_thread_ids      # list[str] — IDs of deleted threads
result.pruned_checkpoint_count # int — checkpoints removed by max_checkpoints pruning
result.active_thread_count     # int — threads still alive after this sweep
result.sweep_duration_seconds  # float — wall-clock time for the sweep
```

---

## Per-thread policy overrides

Supply a `policy_resolver` to apply different rules to individual threads. The resolver receives a `thread_id` and returns either a custom `TTLPolicy` or a `PolicyOverride`.

```python
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper
from langgraph_ephemeral_checkpointer.types import PolicyOverride

default_policy = TTLPolicy(idle_ttl_seconds=3600)

vip_policy = TTLPolicy(idle_ttl_seconds=604800)  # VIP threads last 7 days

def resolver(thread_id: str) -> TTLPolicy | PolicyOverride:
    if thread_id.startswith("vip:"):
        return vip_policy
    if thread_id.startswith("system:"):
        return PolicyOverride.EXEMPT  # never expire
    return PolicyOverride.USE_DEFAULT

sweeper = TTLSweeper(checkpointer, default_policy, policy_resolver=resolver)
```

| Return value | Behaviour |
|---|---|
| `TTLPolicy` | Use this policy for the thread instead of the global one |
| `PolicyOverride.USE_DEFAULT` | Apply the sweeper's global policy |
| `PolicyOverride.EXEMPT` | Never expire this thread |

If the resolver raises an exception, it is logged and the global policy is used as a fallback. The resolver is called exactly once per thread per sweep.

---

## Callbacks

### `on_before_delete`

Called before each thread is deleted. Return `False` to skip the deletion.

```python
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

def on_before_delete(thread_id: str, policy: TTLPolicy, reason: str) -> bool:
    print(f"Deleting {thread_id!r} (reason: {reason})")
    # return False to abort this specific deletion
    return True

sweeper = TTLSweeper(checkpointer, policy, on_before_delete=on_before_delete)
```

The `reason` argument is either `"idle_ttl"` or `"hard_age_ttl"`, naming the rule that matched.
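Because `on_before_delete` can be any callable matching `OnBeforeDelete`, a small class works well when you want both a veto rule and an audit trail. In the sketch below, `DeletionAuditor` and the `"pinned:"` prefix are hypothetical, and the policy argument is typed loosely as `Any` so the snippet runs without the library installed.

```python
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class DeletionAuditor:
    """Hypothetical on_before_delete callable: vetoes protected threads, records the rest."""

    protected_prefixes: tuple[str, ...] = ("pinned:",)
    approved: list[tuple[str, str]] = field(default_factory=list)
    vetoed: list[str] = field(default_factory=list)

    def __call__(self, thread_id: str, policy: Any, reason: str) -> bool:
        if thread_id.startswith(self.protected_prefixes):
            self.vetoed.append(thread_id)
            logger.info("kept %r despite %s", thread_id, reason)
            return False  # returning False skips this deletion
        self.approved.append((thread_id, reason))
        return True
```

Pass an instance as `on_before_delete=DeletionAuditor()`. The lists accumulate across sweeps, so clear them (for example from `on_sweep_complete`) if you want per-sweep numbers.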

### `on_sweep_complete`

Called once after every sweep cycle with the final `SweepResult`.

```python
import logging

logger = logging.getLogger(__name__)

def on_sweep_complete(result: SweepResult) -> None:
    logger.info(
        "sweep complete",
        extra={
            "deleted": len(result.deleted_thread_ids),
            "active": result.active_thread_count,
            "duration_s": result.sweep_duration_seconds,
        },
    )

sweeper = TTLSweeper(checkpointer, policy, on_sweep_complete=on_sweep_complete)
```

Exceptions raised inside either callback are caught, logged, and do not interrupt the sweep.

---

## Safe delete

By default (`safe_delete=True`), the sweeper re-reads each thread's latest checkpoint immediately before deleting it and compares its timestamp with the one seen during the scan. If the thread received a new checkpoint in between, meaning it became active again, the deletion is skipped.

```python
# disable to skip the extra per-thread read (slightly faster); a thread that
# resumes between the scan and the delete may then be deleted anyway
sweeper = TTLSweeper(checkpointer, policy, safe_delete=False)
```
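The check described above is a compare-before-delete. The sketch below models it against a plain dict mapping `thread_id` to the newest checkpoint timestamp; `safe_delete_threads` and the dict-backed store are hypothetical stand-ins for whatever the backend strategy actually reads and deletes. In a sketch like this, a small window between the re-read and the delete remains unless both happen atomically.

```python
from collections.abc import Iterable


def safe_delete_threads(
    latest_ts: dict[str, float],
    candidates: Iterable[tuple[str, float]],
) -> tuple[list[str], list[str]]:
    """Delete each candidate only if its newest checkpoint is unchanged since the scan.

    latest_ts:  hypothetical store mapping thread_id -> newest checkpoint timestamp
    candidates: (thread_id, newest timestamp observed during the scan)
    Returns (deleted, skipped).
    """
    deleted: list[str] = []
    skipped: list[str] = []
    for thread_id, scanned_ts in candidates:
        current_ts = latest_ts.get(thread_id)  # the re-read just before deleting
        if current_ts is None:
            continue  # already gone, nothing to do
        if current_ts != scanned_ts:
            skipped.append(thread_id)  # new activity since the scan
            continue
        del latest_ts[thread_id]
        deleted.append(thread_id)
    return deleted, skipped
```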

---

## Multi-instance coordination

If you run multiple application instances sharing a single PostgreSQL checkpointer, you can enable advisory locks so only one instance sweeps at a time:

```python
import asyncio

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, TTLSweeper

DSN = "postgresql://user:pass@localhost/mydb"  # the database shared by every instance

async def main():
    async with AsyncPostgresSaver.from_conn_string(DSN) as checkpointer:
        sweeper = TTLSweeper(
            checkpointer,
            TTLPolicy(idle_ttl_seconds=3600),
            enable_coordination=True,
        )
        result = await sweeper.asweep()

asyncio.run(main())
```

When `enable_coordination=True` and the backend is PostgreSQL, the sweeper acquires a session-level advisory lock before scanning. If another instance holds the lock, the sweep is skipped and the returned `SweepResult` has an empty `deleted_thread_ids` list. The lock is automatically released when the database connection closes, so crashed instances never block others permanently.

This option is a no-op for non-PostgreSQL backends (a warning is logged).

---

## Backends

The sweeper automatically selects the most efficient strategy for your checkpointer.

| Checkpointer | Strategy | Notes |
|---|---|---|
| `InMemorySaver` | `MemoryStrategy` | Reads storage dict directly; extracts timestamps from UUIDv6 checkpoint IDs |
| `SqliteSaver` | `SqliteStrategy` | Single `GROUP BY` query for all threads |
| `AsyncSqliteSaver` | `AsyncSqliteStrategy` | Async variant of the above |
| `PostgresSaver` | `PostgresStrategy` | `GROUP BY` query; advisory lock support |
| `AsyncPostgresSaver` | `AsyncPostgresStrategy` | Async variant; advisory lock support |
| Any other | `GenericStrategy` | Iterates via `.list()` / `.alist()` — works with any checkpointer |

`max_checkpoints_per_thread` pruning is supported on all backends except `GenericStrategy`.
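The `MemoryStrategy` note relies on UUIDv6 IDs being time-ordered. Per RFC 9562, a version-6 UUID stores a 60-bit count of 100-nanosecond intervals since 1582-10-15, most significant bits first, with the 4 version bits between the top 48 and bottom 12 timestamp bits. Assuming LangGraph checkpoint IDs follow that layout, the sketch below recovers a Unix timestamp from one. Both function names are hypothetical, and `make_uuid6` exists only to build deterministic test input.

```python
import uuid

# 100-ns intervals between the Gregorian epoch (1582-10-15) and the Unix epoch
GREGORIAN_TO_UNIX_100NS = 0x01B21DD213814000


def uuid6_unix_seconds(value: uuid.UUID) -> float:
    """Recover the Unix timestamp embedded in a version-6 UUID."""
    if value.version != 6:
        raise ValueError(f"expected a version-6 UUID, got version {value.version}")
    n = value.int
    high48 = n >> 80  # time_high + time_mid: top 48 bits of the timestamp
    low12 = (n >> 64) & 0x0FFF  # time_low: bottom 12 bits, just below the version bits
    ticks = (high48 << 12) | low12
    return (ticks - GREGORIAN_TO_UNIX_100NS) / 10_000_000


def make_uuid6(unix_seconds: float, rest: int = 0) -> uuid.UUID:
    """Hypothetical helper that builds a version-6 UUID for a given time (test input only)."""
    ticks = round(unix_seconds * 10_000_000) + GREGORIAN_TO_UNIX_100NS
    n = (ticks >> 12) << 80  # top 48 timestamp bits
    n |= 0x6 << 76  # version 6
    n |= (ticks & 0x0FFF) << 64  # bottom 12 timestamp bits
    n |= 0b10 << 62  # RFC variant
    n |= rest & ((1 << 62) - 1)  # clock sequence + node
    return uuid.UUID(int=n)
```

Because the timestamp occupies the most significant bits, IDs created later also sort later as strings, which is what makes them usable as a time index.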

---

## CLI

The package ships a CLI for one-off inspection and sweeps without writing any code.

```
python -m langgraph_ephemeral_checkpointer <command> [options]
```

### `status` — inspect threads

Show all threads with their age, idle time, checkpoint count, and whether they are expired under the given policy.

```bash
# in-memory backend (always empty in a fresh CLI process; only useful for smoke-testing the CLI)
python -m langgraph_ephemeral_checkpointer status \
  --backend memory \
  --idle-ttl 3600

# SQLite
python -m langgraph_ephemeral_checkpointer status \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600 \
  --hard-age-ttl 86400

# PostgreSQL
python -m langgraph_ephemeral_checkpointer status \
  --backend postgres \
  --dsn "postgresql://user:pass@localhost/mydb" \
  --idle-ttl 7200
```

### `sweep` — delete expired threads

```bash
# dry run first — see what would be deleted
python -m langgraph_ephemeral_checkpointer sweep \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600 \
  --dry-run

# live run
python -m langgraph_ephemeral_checkpointer sweep \
  --backend sqlite \
  --dsn threads.db \
  --idle-ttl 3600
```

### Options

| Option | Description |
|---|---|
| `--backend` | `memory`, `sqlite`, or `postgres` (required) |
| `--dsn` | File path or connection string (required for sqlite/postgres) |
| `--idle-ttl` | Idle TTL in seconds |
| `--hard-age-ttl` | Hard age TTL in seconds |
| `--max-checkpoints` | Max checkpoints per thread |
| `--dry-run` | Preview deletions without executing (`sweep` only) |

At least one of `--idle-ttl`, `--hard-age-ttl`, or `--max-checkpoints` must be provided.

---

## API reference

### `TTLPolicy`

```python
@dataclass(frozen=True)
class TTLPolicy:
    idle_ttl_seconds: int | None = None
    hard_age_ttl_seconds: int | None = None
    max_checkpoints_per_thread: int | None = None
```

### `TTLSweeper`

```python
class TTLSweeper:
    def __init__(
        self,
        checkpointer: BaseCheckpointSaver,
        policy: TTLPolicy,
        *,
        policy_resolver: PolicyResolver | None = None,
        enable_coordination: bool = False,
        safe_delete: bool = True,
        on_before_delete: OnBeforeDelete | None = None,
        on_sweep_complete: OnSweepComplete | None = None,
    ) -> None: ...

    def sweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def asweep(self, *, dry_run: bool = False) -> SweepResult: ...
    async def start(self, interval_seconds: int = 300) -> None: ...
    async def stop(self) -> None: ...
```

### `SweepResult`

```python
@dataclass
class SweepResult:
    deleted_thread_ids: list[str]
    pruned_checkpoint_count: int
    active_thread_count: int
    sweep_duration_seconds: float
```

### `PolicyOverride`

```python
class PolicyOverride(enum.Enum):
    USE_DEFAULT = "use_default"
    EXEMPT      = "exempt"
```

### Callable types

```python
PolicyResolver  = Callable[[str], TTLPolicy | PolicyOverride]
OnBeforeDelete  = Callable[[str, TTLPolicy, str], bool]
OnSweepComplete = Callable[[SweepResult], None]
```
