Metadata-Version: 2.4
Name: rag-timetravel
Version: 1.0.0
Summary: Event-sourced RAG pipeline with time-travel debugging and index versioning
Project-URL: Homepage, https://github.com/Ar-maan05/rag-timetravel
Project-URL: Repository, https://github.com/Ar-maan05/rag-timetravel
Project-URL: Issues, https://github.com/Ar-maan05/rag-timetravel/issues
License: MIT License
        
        Copyright (c) 2026 Armaan Sandhu
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
License-File: LICENSE
Keywords: debugging,event-sourcing,lancedb,llm,rag,vector-database
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.10
Requires-Dist: aiosqlite>=0.20.0
Requires-Dist: fastapi>=0.115.0
Requires-Dist: httpx>=0.27.0
Requires-Dist: lancedb>=0.10.0
Requires-Dist: litellm>=1.30.0
Requires-Dist: numpy>=1.26.0
Requires-Dist: pyarrow>=16.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: typer>=0.12.0
Requires-Dist: uvicorn[standard]>=0.30.0
Provides-Extra: dev
Requires-Dist: asyncpg>=0.29.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest-cov>=5.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.4.0; extra == 'dev'
Provides-Extra: local
Requires-Dist: sentence-transformers>=3.0.0; extra == 'local'
Provides-Extra: mcp
Requires-Dist: mcp>=0.1.0; extra == 'mcp'
Provides-Extra: postgres
Requires-Dist: asyncpg>=0.29.0; extra == 'postgres'
Description-Content-Type: text/markdown

# rag-timetravel

[![CI](https://github.com/Ar-maan05/rag-timetravel/actions/workflows/ci.yml/badge.svg)](https://github.com/Ar-maan05/rag-timetravel/actions/workflows/ci.yml) [![PyPI version](https://img.shields.io/pypi/v/rag-timetravel.svg?v=2)](https://pypi.org/project/rag-timetravel/) [![Python versions](https://img.shields.io/pypi/pyversions/rag-timetravel.svg?v=2)](https://pypi.org/project/rag-timetravel/) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Time-travel debugging for the RAG retrieval layer, built on LanceDB's native dataset versioning.

Every retrieval and generation step is recorded as an immutable event, and the vector index is versioned automatically. You can take any past query, reconstruct the exact index version it ran against, re-run retrieval, and diff what was retrieved then versus now. The defensible, deterministic core is the retrieval diff; answer re-generation is a best-effort layer on top (see [Determinism](#what-is-and-isnt-deterministic)).

---

## The problem

When a RAG answer regresses ("why did this get worse than last month?"), the hard question is *what changed*: the retrieved chunks, the index state at that time, or the generator. Tracing/eval tools (Langfuse, Arize Phoenix, LangSmith, TruLens) record traces and let you compare runs, but they treat the vector index as opaque: they cannot reconstruct the index as it existed at an arbitrary past moment and re-run retrieval against it.

rag-timetravel fills that specific gap. Because it owns ingestion, it can pin retrieval to a historical LanceDB version and show you exactly which chunks a query would have surfaced at any point in its history, with no full-index copies.

### What is and isn't deterministic

- **Retrieval replay is faithful.** Pinning to a snapshot's LanceDB version reproduces the exact chunk set and scores that existed at that time. This is the core guarantee, and it is tested against a real index.
- **Generation replay is best-effort.** Remote models drift and sample non-deterministically, so a replayed *answer* reflects today's generator over the historical retrieval, not a byte-for-byte reproduction of the old answer. Treat the replayed answer as informational; trust the retrieval diff.

---

## How it works

```
┌─────────────────────────────────────────────────────────────────┐
│                        RAGPipeline                              │
│                                                                 │
│  ingest(doc) ──► chunk ──► embed ──► LanceDB ──► emit event    │
│                                                                 │
│  query(text) ──► embed ──► search ──► LLM ──► emit 5 events    │
│                                         │                       │
│                                         ▼                       │
│                                   EventStore (SQLite)           │
└─────────────────────────────────────────────────────────────────┘
          │
          │  (at any later time)
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                      ReplayEngine                               │
│                                                                 │
│  1. Load QueryReceived event → get original timestamp T         │
│  2. Find IndexSnapshot with max(ts) where ts <= T               │
│  3. table.checkout(snapshot.lancedb_version) on a fresh handle  │
│     (read-only view, O(1), no data copy)                        │
│  4. Re-run retrieval + generation                               │
│  5. Return ReplayResult                                         │
└─────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Comparator                                │
│                                                                 │
│  compare(left_id, right_id) → DiffReport                        │
│    - chunks added / removed / score-delta                       │
│    - answer similarity (embedding cosine + token Jaccard)       │
│    - latency delta                                              │
└─────────────────────────────────────────────────────────────────┘
```

### Why LanceDB?

LanceDB's versioned storage creates a new dataset version on every write automatically. A "snapshot" in rag-timetravel is just a metadata record pointing to a specific LanceDB version integer. Checking out a past version is `O(1)`: no data is copied. This is the key primitive that makes cheap time-travel possible.

### Why SQLite for the event store?

Zero dependencies, file-portable, and fast enough for local workloads (< 1 ms per append). The `EventStore` interface is narrow enough to swap in a Postgres backend later with no changes to the pipeline.

---

## Installation

```bash
pip install rag-timetravel
```

The base install is lightweight. The default `PipelineConfig` uses a local
`sentence-transformers` embedder, which (because it pulls in torch) ships as an
optional extra:

```bash
pip install 'rag-timetravel[local]'    # local embeddings via sentence-transformers
```

If you use an OpenAI-compatible embedder instead (`embedder_model="openai/..."`
or `"ollama/..."`), the base install is all you need.

**Requirements:** Python 3.11+

For local generation (default config): install [Ollama](https://ollama.ai) and pull a model:

```bash
ollama pull gemma3
```

---

## Quickstart

### Python API

```python
import asyncio
from rag_timetravel import RAGPipeline, PipelineConfig, ReplayEngine, Comparator

async def main():
    # Create a pipeline
    config = PipelineConfig(model="ollama/gemma3", top_k=4)
    pipeline = await RAGPipeline.create("./my_project", config)

    # Ingest documents
    await pipeline.ingest("policy_v1.txt", open("policy_v1.txt").read())
    snap1 = await pipeline.take_snapshot(label="v1-docs")

    # Query
    result = await pipeline.query("What is the return window?")
    print(result.answer)
    print(result.query_id)  # save this

    # Ingest updated documents
    await pipeline.ingest("policy_v2.txt", open("policy_v2.txt").read())
    await pipeline.take_snapshot(label="v2-docs")

    # Replay the original query against the v1 index
    engine = ReplayEngine(pipeline)
    replay = await engine.replay(result.query_id)
    print(replay.answer)  # answer using only v1 docs

    # Diff v1 vs v2 answers
    result_v2 = await pipeline.query("What is the return window?")
    cmp = Comparator(pipeline)
    report = await cmp.compare(result.query_id, result_v2.query_id)
    print(report.to_text())

asyncio.run(main())
```

### CLI

```bash
# Ingest a directory of .txt and .md files
rag-timetravel ingest ./docs --project ./my_project

# Take a snapshot
rag-timetravel snapshot --label "after-v1" --project ./my_project

# Query
rag-timetravel query "What is the refund policy?" --project ./my_project

# Replay a past query
rag-timetravel replay --query-id <uuid> --project ./my_project

# Replay any text against the index at a past timestamp
rag-timetravel replay \
  --text "What is the refund policy?" \
  --as-of "2024-12-01T10:00:00" \
  --project ./my_project

# Diff two queries (add --html report.html to also write an HTML report)
rag-timetravel diff --left <uuid> --right <uuid> --project ./my_project

# Show the event trace for a query
rag-timetravel trace --query-id <uuid> --project ./my_project

# List all snapshots
rag-timetravel snapshots --project ./my_project

# Start the REST API
rag-timetravel serve --project ./my_project --port 8000
```

### REST API

```bash
rag-timetravel serve --project ./my_project
```

| Method | Path | Description |
|--------|------|-------------|
| POST | `/ingest` | Ingest a document |
| POST | `/query` | Run a RAG query |
| GET | `/query/{query_id}` | Get event trace for a query |
| POST | `/replay` | Replay a past query against its original index |
| POST | `/replay/as-of` | Replay query text at a historical timestamp |
| POST | `/diff` | Diff two query executions (JSON) |
| GET | `/diff/{left}/{right}/html` | Rendered HTML diff report |
| GET | `/snapshots` | List all snapshots |
| POST | `/snapshot` | Take a manual snapshot |
| GET | `/events` | Query the raw event log |
| GET | `/health` | Health check |

Interactive docs: `http://localhost:8000/docs`

---

## Configuration

```python
from rag_timetravel import PipelineConfig

config = PipelineConfig(
    embedder_model="all-MiniLM-L6-v2",   # local sentence-transformers model
    model="ollama/gemma3",                # litellm model string
    top_k=4,                              # chunks to retrieve per query
    chunk_size=512,                       # words per chunk
    chunk_overlap=64,                     # word overlap between chunks
    auto_snapshot_every=10,               # snapshot after every N docs (0=off)
    table_name="documents",               # LanceDB table name
)
```

### Using OpenAI

```python
import os
os.environ["OPENAI_API_KEY"] = "sk-..."

config = PipelineConfig(
    embedder_model="openai/text-embedding-3-small",
    model="gpt-4o",
)
```

### Using a custom embedder

```python
from rag_timetravel.index.embedder import Embedder

class MyEmbedder:
    @property
    def dim(self) -> int:
        return 768

    async def embed(self, texts: list[str]) -> list[list[float]]:
        # your implementation
        ...

pipeline = await RAGPipeline.create("./project", config, embedder=MyEmbedder())
```

---

## Project structure

```
my_project/
├── events.db          # SQLite event store (append-only log)
└── lancedb/           # LanceDB vector index (versioned automatically)
    └── documents.lance/
```

The event store and index are self-contained in a single directory. Copy or rsync the directory to back up the full history.

---

## Event types

| Event | When | Key payload fields |
|-------|------|-------------------|
| `query.received` | Query enters the pipeline | `text`, `pipeline_config_id` |
| `retrieval.executed` | Vector search completes | `chunk_ids`, `scores`, `latency_ms` |
| `context.assembled` | Chunks merged into prompt | `chunk_ids`, `token_count` |
| `generation.executed` | LLM produces answer | `answer`, `model`, token counts |
| `query.completed` | Full round-trip done | `total_latency_ms` |
| `document.ingested` | Document written to index | `doc_id`, `source`, `chunk_count` |
| `index.snapshot_taken` | Snapshot recorded | `snapshot_id`, `lancedb_version` |
| `replay.*` | Replay events (same structure, separate prefix) | |

---

## Development

```bash
git clone https://github.com/Ar-maan05/rag-timetravel
cd rag-timetravel
pip install -e ".[dev]"

# Run tests: they exercise a real local LanceDB but mock the LLM and use a
# stub embedder, so no GPU, model download, or API key is needed.
pytest tests/

# Lint and type-check
ruff check src/rag_timetravel/
mypy src/rag_timetravel/
```

---

## Roadmap

- [x] SQLite event store
- [x] LanceDB index versioning via `checkout`
- [x] Query, replay, diff (JSON + HTML report)
- [x] CLI + FastAPI server
- [x] PostgreSQL event store backend
- [x] Snapshot scheduling (cron-style)
- [x] `bisect` command: binary search across snapshots to find when an answer changed
- [x] Web UI for browsing event traces
- [x] MCP server wrapper

### Future Explorations

- [ ] **Advanced Retrieval Replay:** Hybrid search (vector + BM25) and re-ranker versioning
- [ ] **Visual Diffing & Analytics:** Ingested document diffs and embedding drift visualization
- [ ] **Cloud Storage Backends:** Support LanceDB checkouts directly from S3/GCS
- [ ] **Observability Integrations:** Export time-travel traces to OpenTelemetry, Langfuse, or Phoenix

---

## License

MIT

