Metadata-Version: 2.4
Name: corpulse
Version: 1.9.2
Summary: Corpus health analytics for RAG pipelines
Project-URL: Homepage, https://github.com/arkadyb/corpulse
Project-URL: Repository, https://github.com/arkadyb/corpulse
Project-URL: Source, https://github.com/arkadyb/corpulse
Project-URL: Issues, https://github.com/arkadyb/corpulse/issues
Author: Arkady B
License-Expression: MPL-2.0
License-File: LICENSE
Keywords: corpus-health,observability,rag,retrieval-augmented-generation,vector-database
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Requires-Dist: numpy>=1.24
Requires-Dist: scikit-learn>=1.3
Requires-Dist: typing-extensions>=4.8
Provides-Extra: dev
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Provides-Extra: fastapi
Requires-Dist: fastapi>=0.110.0; extra == 'fastapi'
Requires-Dist: pydantic>=2.0.0; extra == 'fastapi'
Provides-Extra: postgres
Requires-Dist: psycopg[binary,pool]>=3.2; extra == 'postgres'
Provides-Extra: postgres-async
Requires-Dist: asyncpg>=0.29; extra == 'postgres-async'
Provides-Extra: qdrant
Requires-Dist: qdrant-client>=1.7; extra == 'qdrant'
Description-Content-Type: text/markdown

# corpulse

Corpus health analytics for RAG pipelines. Track which documents help, which ones don't, and which ones are just noise.

---

## The Problem

Your vector database grows over time. Documents get added, re-chunked, and updated. Old versions linger. Near-identical content gets indexed twice. Outdated files keep surfacing in results. Without tracking, you're building on top of noise. corpulse surfaces these issues automatically.

---

## What corpulse is (and isn't)

corpulse measures **corpus health** — which documents are retrieved, how often, and whether users act on them.

It does **not** measure answer quality, faithfulness, or relevance. For that, see tools like Ragas or DeepEval.

Think of it as a fitness tracker for your document corpus, not a grade on your answers.

---

## Installation

```bash
# Core library
pip install corpulse

# With Qdrant wrapper support
pip install "corpulse[qdrant]"

# From source
pip install "git+https://github.com/arkadyb/corpulse.git"
```

Requires Python 3.10+. The `[qdrant]` extra installs `qdrant-client>=1.7`.

Maintainers should use [.github/RELEASE_CHECKLIST.md](.github/RELEASE_CHECKLIST.md) for the first-release flow.

---

## Contributing and Security

Contributions are welcome through reviewed pull requests. See
[CONTRIBUTING.md](CONTRIBUTING.md) before opening a larger change.

Please report suspected vulnerabilities privately using the process in
[SECURITY.md](SECURITY.md), not through public issues.

---

## Quickstart: Manual API

```python
from corpulse import Corpulse

corp = Corpulse()  # writes to ./corpulse.db

# After your vector DB search returns results
results = [
    {"doc_id": "abc123", "filename": "guide.md", "score": 0.91},
    {"doc_id": "def456", "filename": "faq.md",   "score": 0.87},
]
corp.log_retrieval(results, query="how to install?")

# When user acts on a result
corp.log_engagement("abc123", event="opened")

# Print corpus health table
corp.report()
```

`report()` pretty-prints with [tabulate](https://pypi.org/project/tabulate/) if it is installed and falls back to plain text otherwise.
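
The optional-dependency fallback can be pictured roughly as follows. This is a minimal sketch of the general pattern, not corpulse's actual implementation; `render_table` and the column names are hypothetical.

```python
def render_table(rows, headers):
    """Render rows as text, using tabulate only when it is importable."""
    try:
        from tabulate import tabulate  # optional dependency
    except ImportError:
        # Plain-text fallback: pad every column to its widest cell.
        table = [[str(h) for h in headers]] + [[str(c) for c in row] for row in rows]
        widths = [max(len(row[i]) for row in table) for i in range(len(headers))]
        return "\n".join(
            "  ".join(cell.ljust(w) for cell, w in zip(row, widths)).rstrip()
            for row in table
        )
    return tabulate(rows, headers=headers)


# Hypothetical per-document stats, for illustration only.
text = render_table(
    [("guide.md", 12, 5), ("faq.md", 7, 0)],
    ["filename", "retrievals", "engagements"],
)
print(text)
```

Either branch returns a string, so callers do not need to know whether tabulate is present.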

---

## Quickstart: Qdrant Wrapper

**Before (manual instrumentation):**

```python
from qdrant_client import QdrantClient
from corpulse import Corpulse

client = QdrantClient(":memory:")
corp = Corpulse()

result = client.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)
# Must manually extract results and call log_retrieval
records = [
    {"doc_id": str(p.id), "filename": p.payload.get("filename", str(p.id)), "score": p.score}
    for p in result.points
]
corp.log_retrieval(records, query="how to install?")
```

**After (automatic via wrapper):**

```python
from qdrant_client import QdrantClient
from corpulse import Corpulse, QdrantCorpulseClient

client = QdrantClient(":memory:")
corp = Corpulse()
wrapped = QdrantCorpulseClient(client, corp)

result = wrapped.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)
# log_retrieval() called automatically — result is unchanged
```

**Async variant:**

```python
import asyncio
from qdrant_client import AsyncQdrantClient
from corpulse import Corpulse, AsyncQdrantCorpulseClient

async def main():
    client = AsyncQdrantClient(":memory:")
    corp = Corpulse()
    wrapped = AsyncQdrantCorpulseClient(client, corp)

    result = await wrapped.query_points(
        collection_name="docs", query=[0.1, 0.2, ...], limit=5
    )
    # log_retrieval() called automatically

asyncio.run(main())
```

**Constructor parameters:**

- `payload_id_field` — payload key to use as document ID (default: `None`, uses Qdrant point ID)
- `payload_filename_key` — payload key for filename (default: `"filename"`)
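
To make the two parameters concrete, here is a sketch of how a single hit might be turned into a record. It mirrors the manual instrumentation shown earlier; `point_to_record` is a hypothetical helper, and the fallback behavior for missing payload keys is an assumption rather than the wrapper's documented behavior.

```python
def point_to_record(point_id, payload, score,
                    payload_id_field=None, payload_filename_key="filename"):
    """Turn one search hit into a record dict (hypothetical helper)."""
    payload = payload or {}
    if payload_id_field is None:
        # Default: the point ID doubles as the document ID.
        doc_id = str(point_id)
    else:
        # Assumption: fall back to the point ID when the payload key is missing.
        doc_id = str(payload.get(payload_id_field, point_id))
    filename = payload.get(payload_filename_key, str(point_id))
    return {"doc_id": doc_id, "filename": filename, "score": score}


# Illustrative payload; the key names "source_id" and "path" are made up.
hit_payload = {"source_id": "guide-001", "path": "docs/guide.md"}
default = point_to_record(17, hit_payload, 0.91)
custom = point_to_record(
    17, hit_payload, 0.91,
    payload_id_field="source_id", payload_filename_key="path",
)
print(default)
print(custom)
```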

**Advanced: generic wrapper engine**

If a client exposes stable query methods and you can normalize its native
response into Corpulse records, you can use the generic `wrap()` API instead of
writing a dedicated class:

```python
from corpulse import Corpulse, WrapMethod, wrap

wrapped = wrap(
    client,
    Corpulse(),
    methods={
        "search": WrapMethod(
            normalize=lambda result, args, kwargs: [
                {
                    "doc_id": hit["id"],
                    "filename": hit["name"],
                    "score": hit["score"],
                    "embedding": None,
                }
                for hit in result.hits
            ]
        )
    },
)
```

This removes most wrapper boilerplate, but each database still needs a
normalization recipe for its response shape.
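
For intuition, a generic wrapper of this kind can be built as a thin proxy that intercepts selected methods. The sketch below is a self-contained illustration of that idea, not the `wrap()` implementation; `MethodSpec`, `LoggingProxy`, and `FakeClient` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MethodSpec:  # hypothetical stand-in for WrapMethod
    normalize: Callable[[Any, tuple, dict], list]


class LoggingProxy:
    """Forward attribute access to the client, logging selected method results."""

    def __init__(self, client, log_retrieval, methods):
        self._client = client
        self._log_retrieval = log_retrieval
        self._methods = methods

    def __getattr__(self, name):
        target = getattr(self._client, name)
        spec = self._methods.get(name)
        if spec is None:
            return target  # methods without a spec pass straight through

        def wrapper(*args, **kwargs):
            result = target(*args, **kwargs)
            self._log_retrieval(spec.normalize(result, args, kwargs))
            return result  # the caller sees the native response unchanged

        return wrapper


class FakeClient:  # toy client used only for this demo
    def search(self, text):
        return {"hits": [{"id": "abc123", "name": "guide.md", "score": 0.91}]}

    def ping(self):
        return "pong"


logged = []
proxy = LoggingProxy(
    FakeClient(),
    logged.extend,  # stands in for a log_retrieval-style sink
    {"search": MethodSpec(normalize=lambda result, args, kwargs: [
        {"doc_id": h["id"], "filename": h["name"], "score": h["score"]}
        for h in result["hits"]
    ])},
)
response = proxy.search("how to install?")
print(logged)
```

The normalization callable is the only database-specific part; the proxy itself stays the same for every client.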

---

## Async usage

corpulse ships a fully async interface via `AsyncCorpulse`. It returns structured data instead of printing, making it ideal for web services and async pipelines.

```python
import asyncio
from corpulse import AsyncCorpulse
from corpulse.backends import AsyncPostgresBackend

async def main():
    backend = await AsyncPostgresBackend.create(
        "postgresql://user:pass@localhost/mydb"
    )
    async with AsyncCorpulse(backend=backend) as corp:
        # Ingest: called after every vector DB query in your RAG pipeline
        await corp.log_retrieval(
            [{"doc_id": "abc123", "filename": "guide.md", "score": 0.91}],
            query="how to install?",
        )
        await corp.log_engagement("abc123", event="opened")

        ghosts = await corp.get_ghosts()
        print(f"Ghost docs: {len(ghosts)}")

        report = await corp.report(window_days=30)
        print(report["summary"])
        print(report["rows"][:3])

        cleanup = await corp.cleanup_report()
        print(cleanup["ghosts"])
        print(cleanup["suspects"])

asyncio.run(main())
```

`AsyncCorpulse.report()` and `AsyncCorpulse.cleanup_report()` return dictionaries with structured payloads, so you can log them, send them over HTTP, or render them in your own UI without parsing stdout.

## Workload Trace Capture

corpulse can capture append-only RAG request traces for observability and later replay work. Raw query and component content are optional; you can store hashes and references instead.

```python
from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.log_rag_request(
    session_id="session-123",
    query="What is the answer?",
    request_id="req-123",
    components=[
        {"type": "system_prompt", "token_count": 12, "refs": None, "content_hash": "sp-1", "metadata": None},
        {"type": "vector_db", "token_count": 42, "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1", "metadata": {"top_k": 5}},
        {"type": "chat_history", "token_count": 18, "refs": [{"turn": 3}], "content_hash": None, "metadata": {"window": 4}},
        {"type": "user_input", "token_count": 9, "refs": None, "content_hash": "ui-1", "metadata": None},
    ],
    timings={"ttft_ms": 210, "tpot_ms": 18, "retrieval_ms": 42},
)

async def log_async(async_corp: AsyncCorpulse) -> None:
    await async_corp.alog_rag_request(
        session_id="session-123",
        query=None,
        request_id="req-124",
        components=[{"type": "other", "token_count": None, "refs": None, "content_hash": "fallback", "metadata": {"mode": "hash-only"}}],
        timings={"queue_ms": 7, "total_latency_ms": 409},
        timeout=False,
    )
```

### Workload Trace JSONL Import/Export

The JSONL schema version is `corpulse.rag_request_trace.v1`. Export is privacy-first by default: raw query text and component metadata are omitted unless you opt in.

```python
from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.export_rag_request_traces_jsonl("traces.jsonl")
corp.import_rag_request_traces_jsonl("traces.jsonl")

async def round_trip(async_corp: AsyncCorpulse) -> None:
    await async_corp.aexport_rag_request_traces_jsonl("traces.jsonl")
    await async_corp.aimport_rag_request_traces_jsonl("traces.jsonl")
```

Default export keeps the trace portable without raw content:

```json
{"captured_at":1710000000.0,"components":[{"content_hash":"vec-1","metadata":null,"refs":[{"doc_id":"abc123"}],"token_count":42,"type":"vector_db"}],"error":null,"input_token_count":42,"output_token_count":9,"query_hash":"abc123","query_text":null,"request_id":"req-123","schema_version":"corpulse.rag_request_trace.v1","session_id":"session-123","timeout":false,"timings":{"retrieval_ms":42.0,"ttft_ms":210.0}}
```

Use `include_raw_text=True` and `include_component_metadata=True` only when you explicitly want to export the raw trace payload.

Import is append-oriented and skips duplicate trace fingerprints by default, so re-importing the same JSONL file does not create duplicate analytics rows.
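
The privacy-first export and duplicate-skipping import can be sketched with the standard library alone. Everything here is an assumption made for illustration: the helper names, the choice of SHA-256, and fingerprinting the whole canonical record are not taken from corpulse's implementation.

```python
import hashlib
import json


def export_line(trace, include_raw_text=False):
    """Serialize one trace to a JSONL line, hashing the query unless raw text is requested."""
    record = dict(trace)
    query = record.pop("query", None)
    record["query_hash"] = (
        hashlib.sha256(query.encode("utf-8")).hexdigest() if query is not None else None
    )
    record["query_text"] = query if include_raw_text else None
    # Sorted keys and compact separators give a stable line for the same trace.
    return json.dumps(record, sort_keys=True, separators=(",", ":"))


def import_lines(lines, seen_fingerprints):
    """Return parsed traces whose fingerprint has not been seen before."""
    fresh = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        # Re-serialize canonically so the fingerprint ignores key order and spacing.
        canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
        fingerprint = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        if fingerprint in seen_fingerprints:
            continue
        seen_fingerprints.add(fingerprint)
        fresh.append(record)
    return fresh


trace = {"request_id": "req-123", "session_id": "session-123", "query": "What is the answer?"}
line = export_line(trace)
seen = set()
first = import_lines([line], seen)   # one new trace
second = import_lines([line], seen)  # same line again, skipped
print(len(first), len(second))
```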

### Callable Replay

Replay uses captured or JSONL-imported workload traces and invokes your supplied callable once per trace. corpulse sorts traces by capture time, builds a replay request envelope, records success or failure, and does not store the callable return value.

```python
from corpulse import Corpulse

corp = Corpulse()

def replay_handler(request):
    print(request["request_id"], request["query_hash"])

replay = corp.replay_rag_request_traces(
    replay_handler,
    window_days=30,
    time_scale=None,
)
print(replay["summary"])
```

```python
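from corpulse import AsyncCorpulse

async def async_replay_handler(request):
    print(request["request_id"], request["query_hash"])

async def replay_async(async_corp: AsyncCorpulse) -> None:
    # `await` is only valid inside a coroutine, so wrap the call.
    replay = await async_corp.areplay_rag_request_traces(async_replay_handler)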
```

`time_scale=None` means no sleeping. `time_scale=1.0` replays captured deltas in real time, and larger values replay faster. You can cap each scheduled delay with `max_delay_seconds`.
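
One way to read these rules is as a small delay schedule. The function below is a sketch under stated assumptions: `replay_delays` is a hypothetical helper, the delay is taken to be the captured delta divided by `time_scale`, and the cap is applied after scaling.

```python
def replay_delays(captured_at, time_scale=None, max_delay_seconds=None):
    """Return the sleep before each trace, given capture timestamps in seconds."""
    if time_scale is not None and time_scale <= 0:
        raise ValueError("time_scale must be positive")
    delays = []
    previous = None
    for ts in sorted(captured_at):  # traces are replayed in capture order
        if time_scale is None or previous is None:
            delay = 0.0  # no sleeping at all, or the first trace
        else:
            delay = (ts - previous) / time_scale
            if max_delay_seconds is not None:
                delay = min(delay, max_delay_seconds)
        delays.append(delay)
        previous = ts
    return delays


timestamps = [0, 10, 40]
print(replay_delays(timestamps))                  # back-to-back replay
print(replay_delays(timestamps, time_scale=1.0))  # real-time gaps
print(replay_delays(timestamps, time_scale=2.0))  # twice as fast
print(replay_delays(timestamps, time_scale=1.0, max_delay_seconds=12))
```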

Core corpulse does not ship an OpenAI SDK, HTTP client, or benchmark exporter for replay. If you need to replay against an OpenAI-compatible endpoint, implement the callable yourself, including raw prompt/message reconstruction, the endpoint client, and a result retention policy.

## Workload and Serving Reports

Use `workload_report()` to summarize request volume, throughput, burst windows, token pressure, and component composition. Use `serving_report()` to inspect TTFT, TPOT, total latency, stage latencies, percentiles, timeout rate, error rate, and slow-request contributors. Use `session_report()` to inspect conversation-level behavior across captured or JSONL-imported traces.

```python
from corpulse import Corpulse

corp = Corpulse()

workload = corp.workload_report(window_days=30)
print(workload["traffic"])
print(workload["tokens"])
print(workload["components"])

serving = corp.serving_report(window_days=30)
print(serving["ttft_ms"])
print(serving["slow_request_contributors"])

session = corp.session_report(window_days=30)
print(session["summary"])
print(session["sessions"])
print(session["context_reuse"])
```

These reports read the same captured or JSONL-imported traces exposed by `get_rag_request_traces()`. Session `summary` covers request count, turns per session, duration, follow-up rate, and history growth; `sessions` contains per-session timing and token growth details. `context_reuse` surfaces repeated refs or content hashes within the same session, without semantic matching, cache recommendations, LLM-as-judge, or online inference dependencies.
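
Exact-match context reuse of this kind can be approximated by counting repeated refs and content hashes per session. The sketch below assumes traces shaped like the JSONL example earlier; `context_reuse` here is a hypothetical standalone function, and its output shape is not the one `session_report()` returns.

```python
import json
from collections import Counter, defaultdict


def context_reuse(traces):
    """Count refs and content hashes that occur more than once within each session."""
    per_session = defaultdict(Counter)
    for trace in traces:
        keys = []
        for component in trace.get("components", []):
            if component.get("content_hash"):
                keys.append(("hash", component["content_hash"]))
            for ref in component.get("refs") or []:
                # Canonical JSON makes dict-shaped refs hashable and order-independent.
                keys.append(("ref", json.dumps(ref, sort_keys=True)))
        per_session[trace["session_id"]].update(keys)
    return {
        session: {key: n for key, n in counts.items() if n > 1}
        for session, counts in per_session.items()
    }


traces = [
    {"session_id": "s1", "components": [
        {"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1"}]},
    {"session_id": "s1", "components": [
        {"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-2"}]},
    {"session_id": "s2", "components": [
        {"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1"}]},
]
reuse = context_reuse(traces)
print(reuse)
```

Matching is purely by equality, so two chunks with the same meaning but different hashes are not counted as reuse.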

## Generation Trace Capture

corpulse also supports append-only trace capture for future generation metrics. Use it to store the prompt or query text, the retrieved context references you fed into generation, the final answer text, and optional evaluation labels.

```python
from corpulse import Corpulse

corp = Corpulse()
corp.log_generation_trace(
    prompt_text="Answer the user's question",
    retrieved_context_refs=[{"doc_id": "abc123", "chunk_id": "c-1"}],
    final_answer_text="Here is the response.",
    evaluation_labels=["grounded"],
)

traces = corp.get_generation_traces()
```

Trace records are read-only once written and do not change any existing corpus-health analytics.

---

## What It Measures

- Ghost documents — registered but never retrieved within a time window
- Near-duplicates — embedding pairs above a cosine similarity threshold (requires scikit-learn)
- Obsolete versions — e.g. `api-v1.md` superseded by `api-v2.md`
- Stale embeddings — source file updated but embedding not refreshed
- Low-engagement suspects — retrieved often but users rarely act on them
- Mean Reciprocal Rank — retrieval-order quality proxy based on existing ranks plus engagement overlap
- User Acceptance Rate — share of engagement rows whose `event_type` is one of `opened`, `clicked`, `copied`, or `thumbs_up`
- Generation trace capture — append-only prompt/query text, retrieved context refs, final answer text, and optional labels for future generation metrics
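
As an illustration of the near-duplicate check, the sketch below compares every pair of embeddings with cosine similarity. It uses plain Python rather than scikit-learn to stay self-contained; the function names and toy vectors are hypothetical, and the default of `0.92` is borrowed from the `duplicate_threshold` default in the configuration section.

```python
import math
from itertools import combinations


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def near_duplicates(embeddings, threshold=0.92):
    """Return (doc_a, doc_b, similarity) for every pair above the threshold."""
    pairs = []
    for (id_a, vec_a), (id_b, vec_b) in combinations(sorted(embeddings.items()), 2):
        similarity = cosine(vec_a, vec_b)
        if similarity > threshold:
            pairs.append((id_a, id_b, similarity))
    return pairs


# Toy 3-dimensional embeddings, for illustration only.
embeddings = {
    "api-v1.md": [1.0, 0.0, 0.0],
    "api-v1-copy.md": [0.99, 0.05, 0.0],
    "faq.md": [0.0, 1.0, 0.0],
}
pairs = near_duplicates(embeddings)
print(pairs)
```

The pairwise loop is quadratic in the number of documents, which is fine for a sketch but would need batching or an approximate index on a large corpus.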

---

## Configuration

```python
corp = Corpulse(
    db_path="./corpulse.db",         # SQLite database path
    ghost_threshold_days=30,         # Days before flagging as ghost
    duplicate_threshold=0.92,        # Cosine similarity threshold
    stale_threshold_days=14,         # Days of source-vs-embedding lag
    obsolete_pattern=r"v\d+",        # Regex for version detection in filenames
    top_k_report=20,                 # Documents shown in report()
)
```
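
To show how a pattern like `obsolete_pattern` could drive version detection, here is a standalone sketch. The grouping rule (same filename once the version token is removed) and the assumption that each match contains a number are mine, not corpulse's documented behavior; `find_obsolete` is a hypothetical name.

```python
import re
from collections import defaultdict


def find_obsolete(filenames, obsolete_pattern=r"v\d+"):
    """Map each superseded filename to the newest version sharing its stem."""
    pattern = re.compile(obsolete_pattern)
    groups = defaultdict(list)
    for name in filenames:
        match = pattern.search(name)
        if match is None:
            continue  # unversioned files are never flagged
        # Stem = filename with the version token removed, e.g. "api-.md".
        stem = name[:match.start()] + name[match.end():]
        # Assumption: the matched token contains a number, as "v2" does.
        version = int(re.search(r"\d+", match.group()).group())
        groups[stem].append((version, name))
    obsolete = {}
    for versions in groups.values():
        newest_version, newest_name = max(versions)
        for version, name in versions:
            if version < newest_version:
                obsolete[name] = newest_name
    return obsolete


files = ["api-v1.md", "api-v2.md", "api-v10.md", "guide-v1.md", "faq.md"]
obsolete = find_obsolete(files)
print(obsolete)
```

Versions are compared as integers, so `v10` correctly outranks `v2`, which a plain string comparison would get wrong.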

---

## Analysis Methods

All analysis methods use the configured lookback window. If you do not pass `window_days`, corpulse uses `ghost_threshold_days`.

| Method | What it measures | Example use |
|--------|------------------|-------------|
| `get_ghosts()` | Documents that were registered but not retrieved during the lookback window. | Find files that exist in the index but never show up in search results, such as a stale draft that no query surfaces. |
| `get_duplicates()` | Pairs of documents whose embeddings are above the configured cosine similarity threshold. | Spot near-identical files like `api-v1.md` and `api-v1-copy.md` that are both being indexed. |
| `get_obsolete()` | Older documents that appear to have been superseded by a newer filename version. | Detect versioned docs such as `guide-v1.md` that should probably be replaced by `guide-v2.md`. |
| `get_stale_embeddings()` | Documents whose source file timestamp is newer than the stored embedding timestamp. | Catch a document that was edited yesterday but still has an embedding from last week. |
| `get_suspects()` | Documents with high retrieval volume but low engagement rate. | Identify pages that are frequently returned by search but rarely opened or acted on. |
| `mean_reciprocal_rank()` | A retrieval-order quality proxy based on retrieval rank and whether the document was engaged with. Higher is better. | Use it to check whether documents that users actually interact with tend to appear near the top of results. |
| `acceptance_rate()` | The share of engagement events whose normalized `event_type` is in the accepted allowlist: `opened`, `clicked`, `copied`, or `thumbs_up`. | If you log 80 total engagement events and 60 are opens/clicks/copies/thumbs-up, the acceptance rate is `0.75`. |
| `corpus_health()` | A summary of corpus noise: ghosts, obsolete docs, stale embeddings, duplicates, plus a bloat warning and recommendation. | Get a quick “how healthy is my index?” snapshot before deciding whether cleanup is urgent. |
| `to_dataframe()` | A per-document pandas DataFrame with retrievals, engagements, engagement rate, and status. | Load the full stats into a notebook or BI tool to sort by retrievals and inspect outliers. |
| `report()` | A human-readable corpus health report printed to stdout. | Run it in a CLI job or cron task to print a quick snapshot without writing custom formatting code. |
| `cleanup_report()` | A prioritized cleanup payload with ghosts, obsolete docs, stale embeddings, and suspects. | Feed it into a maintenance workflow that decides what to delete, refresh, or review first. |
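
The two ratio metrics in the table can be reproduced by hand. The sketch below is a standalone approximation: the allowlist comes from the table, while the normalization step (strip and lowercase), the "first engaged document per query" reading of Mean Reciprocal Rank, and the `dismissed` event type are assumptions made for illustration.

```python
ACCEPTED = {"opened", "clicked", "copied", "thumbs_up"}


def acceptance_rate(event_types):
    """Share of engagement events whose normalized type is in the allowlist."""
    events = [e.strip().lower() for e in event_types]
    if not events:
        return 0.0
    return sum(e in ACCEPTED for e in events) / len(events)


def mean_reciprocal_rank(retrievals, engaged_doc_ids):
    """Average 1/rank of the first engaged document per query (0 when none engaged)."""
    if not retrievals:
        return 0.0
    total = 0.0
    for ranked_doc_ids in retrievals:
        for rank, doc_id in enumerate(ranked_doc_ids, start=1):
            if doc_id in engaged_doc_ids:
                total += 1.0 / rank
                break
    return total / len(retrievals)


# 60 accepted events out of 80, matching the example in the table.
events = ["opened"] * 30 + ["clicked"] * 20 + ["copied"] * 5 + ["thumbs_up"] * 5 + ["dismissed"] * 20
rate = acceptance_rate(events)

# Three queries: "a" is ranked first, then second, then not returned at all.
mrr = mean_reciprocal_rank([["a", "b", "c"], ["b", "a"], ["c", "d"]], {"a"})
print(rate, mrr)
```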

---

## License

MPL 2.0 — see [LICENSE](LICENSE) for details.
