"""In-memory vector store — cosine over a Python list.
Zero dependencies. Default for dev, tests, and small corpora (up
to ~10K chunks before search latency starts to bite). For larger
corpora swap to :class:`FAISSVectorStore` (in-process ANN) or
:class:`ChromaVectorStore` / :class:`PostgresVectorStore`
(persistent).
Beyond the protocol contract, this backend also supports:
* **Diversity (MMR)** — pass ``diversity=0.3`` to :meth:`search`
for varied top-k.
* **Hybrid search** — :meth:`search_hybrid` combines BM25 lexical
scores with vector similarity via Reciprocal Rank Fusion.
* **Persistence** — :meth:`save` / :meth:`load` round-trip the
store to JSON on disk.
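
A quick tour (``my_embedder`` is a stand-in for any :class:`Embedder`
implementation; every method is a coroutine)::

    store = await InMemoryVectorStore.from_texts(
        ["the cat sat", "the dog ran"], embedder=my_embedder
    )
    hits = await store.search("pets", k=4, diversity=0.3)
    fused = await store.search_hybrid("pets", k=4, alpha=0.7)
    await store.save("store.json")
    store = await InMemoryVectorStore.load(
        "store.json", embedder=my_embedder
    )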
"""
from __future__ import annotations
import json
import math
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from ..core.ids import new_id
from ..core.protocols import Embedder
from ..loader.base import Chunk
from ._bm25 import BM25Index, reciprocal_rank_fusion
from ._filter import evaluate_filter
from ._mmr import mmr_select
from .base import SearchResult, _chunks_from_texts
def _cosine(a: list[float], b: list[float]) -> float:
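    """Cosine similarity of ``a`` and ``b``; 0.0 when either has zero norm."""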
dot = sum(x * y for x, y in zip(a, b, strict=True))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
if na == 0 or nb == 0:
return 0.0
return dot / (na * nb)
_PERSIST_VERSION = 1
class InMemoryVectorStore:
"""In-process vector store backed by a Python list."""
name = "in-memory"
def __init__(self, embedder: Embedder) -> None:
if embedder is None:
raise ValueError("embedder is required")
self._embedder = embedder
# Parallel lists. Allocated per-instance so concurrent stores
# in the same process don't share state.
self._ids: list[str] = []
self._chunks: list[Chunk] = []
self._vectors: list[list[float]] = []
self._bm25: BM25Index | None = None # built lazily
@property
def embedder(self) -> Embedder:
return self._embedder
# ---------------------------------------------------------------
# Factory classmethods — explicit kwargs so IDEs autocomplete
# ---------------------------------------------------------------
@classmethod
async def from_chunks(
cls,
chunks: list[Chunk],
*,
embedder: Embedder,
ids: list[str] | None = None,
) -> InMemoryVectorStore:
"""One-shot: construct an InMemoryVectorStore + add ``chunks``."""
store = cls(embedder=embedder)
await store.add(chunks, ids=ids)
return store
@classmethod
async def from_texts(
cls,
texts: list[str],
*,
embedder: Embedder,
metadatas: list[dict[str, Any]] | None = None,
ids: list[str] | None = None,
) -> InMemoryVectorStore:
"""One-shot: construct an InMemoryVectorStore from raw text
strings (each becomes a :class:`Chunk` with the matching
metadata dict, or empty if ``metadatas`` is None)."""
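
        A minimal sketch (``my_embedder`` stands in for any
        :class:`Embedder`)::

            store = await InMemoryVectorStore.from_texts(
                ["alpha", "beta"],
                embedder=my_embedder,
                metadatas=[{"lang": "en"}, {"lang": "el"}],
            )
        """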
return await cls.from_chunks(
_chunks_from_texts(texts, metadatas),
embedder=embedder,
ids=ids,
)
# ---------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------
async def add(
self,
chunks: list[Chunk],
ids: list[str] | None = None,
) -> list[str]:
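        """Embed and append ``chunks``; returns the assigned ids
        (caller-supplied ``ids`` or fresh ``vec``-prefixed ones)."""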
if not chunks:
return []
if ids is not None and len(ids) != len(chunks):
raise ValueError(
f"ids length ({len(ids)}) must match chunks "
f"length ({len(chunks)})"
)
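        # Prefer the batch API; fall back to one embed() call per
        # chunk for embedders that don't implement embed_batch.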
try:
vectors = await self._embedder.embed_batch(
[c.content for c in chunks]
)
except (AttributeError, NotImplementedError):
vectors = [
await self._embedder.embed(c.content) for c in chunks
]
assigned_ids: list[str] = []
for i, (chunk, vec) in enumerate(
zip(chunks, vectors, strict=True)
):
cid = ids[i] if ids is not None else new_id("vec")
self._ids.append(cid)
self._chunks.append(chunk)
self._vectors.append(vec)
assigned_ids.append(cid)
# Invalidate BM25 index — built lazily on next hybrid query.
self._bm25 = None
return assigned_ids
async def delete(self, ids: list[str]) -> None:
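        """Drop every row whose id is in ``ids``; unknown ids are
        ignored."""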
if not ids:
return
kill = set(ids)
keep_indices = [
i for i, cid in enumerate(self._ids) if cid not in kill
]
self._ids = [self._ids[i] for i in keep_indices]
self._chunks = [self._chunks[i] for i in keep_indices]
self._vectors = [self._vectors[i] for i in keep_indices]
self._bm25 = None
async def get_by_ids(self, ids: list[str]) -> list[Chunk]:
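        """Return chunks for ``ids`` in input order, skipping ids
        not present in the store."""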
if not ids:
return []
index = {cid: i for i, cid in enumerate(self._ids)}
return [
self._chunks[index[cid]] for cid in ids if cid in index
]
async def count(self) -> int:
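        """Number of chunks currently stored."""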
return len(self._ids)
# ---------------------------------------------------------------
# Search
# ---------------------------------------------------------------
async def search(
self,
query: str,
*,
k: int = 4,
filter: Mapping[str, Any] | None = None,
diversity: float | None = None,
) -> list[SearchResult]:
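        """Embed ``query`` and delegate to :meth:`search_by_vector`."""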
q_vec = await self._embedder.embed(query)
return await self.search_by_vector(
q_vec, k=k, filter=filter, diversity=diversity
)
async def search_by_vector(
self,
vector: list[float],
*,
k: int = 4,
filter: Mapping[str, Any] | None = None,
diversity: float | None = None,
) -> list[SearchResult]:
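        """Cosine-rank everything that passes ``filter``; when
        ``diversity`` is set (> 0), MMR-rerank a wider candidate
        pool for a more varied top-``k``."""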
if not self._vectors:
return []
# Step 1: cosine-rank everything that survives the filter.
scored: list[tuple[int, float]] = []
for i, vec in enumerate(self._vectors):
if not evaluate_filter(filter, self._chunks[i].metadata):
continue
scored.append((i, _cosine(vector, vec)))
scored.sort(key=lambda x: x[1], reverse=True)
if diversity is None or diversity <= 0:
return [
SearchResult(
chunk=self._chunks[i],
score=score,
id=self._ids[i],
)
for i, score in scored[:k]
]
        # Step 2: MMR rerank over a candidate pool. The pool is the
        # top max(k * 4, 20) hits, capped at the full filtered set;
        # a wider pool buys more diversity for more compute. k * 4
        # matches the LangChain default, with a floor of 20.
        pool_size = min(len(scored), max(k * 4, 20))
pool = scored[:pool_size]
pool_indices = [i for i, _ in pool]
pool_vecs = [self._vectors[i] for i in pool_indices]
chosen_in_pool = mmr_select(
vector, pool_vecs, k, diversity=diversity
)
return [
SearchResult(
chunk=self._chunks[pool_indices[p]],
score=pool[p][1],
id=self._ids[pool_indices[p]],
)
for p in chosen_in_pool
]
async def search_hybrid(
self,
query: str,
*,
k: int = 4,
filter: Mapping[str, Any] | None = None,
alpha: float = 0.5,
) -> list[SearchResult]:
"""Hybrid lexical (BM25) + vector search via RRF.
``alpha`` is in [0, 1]: 0 = pure BM25, 1 = pure vector,
        0.5 = even weighting (the default). Both rankings are
computed independently and fused by Reciprocal Rank Fusion,
then the top-``k`` survivors are returned.
Embeddings catch semantic similarity ("automobile" ↔ "car"),
BM25 catches exact-term hits (model names, error codes,
person names) — together they outperform either alone on
most retrieval benchmarks.
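
        For example, leaning toward exact-term matching for a
        literal token (``store`` is any populated instance)::

            results = await store.search_hybrid(
                "error E1042", k=4, alpha=0.25
            )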
"""
if not self._vectors:
return []
alpha = max(0.0, min(1.0, alpha))
# Build BM25 index lazily (covers the whole corpus including
# filtered-out items; we apply the filter post-rank).
if self._bm25 is None:
self._bm25 = BM25Index()
self._bm25.add([c.content for c in self._chunks])
# Rank by vector.
q_vec = await self._embedder.embed(query)
vector_scored: list[tuple[int, float]] = []
for i, vec in enumerate(self._vectors):
if not evaluate_filter(filter, self._chunks[i].metadata):
continue
vector_scored.append((i, _cosine(q_vec, vec)))
vector_scored.sort(key=lambda x: x[1], reverse=True)
# Rank by BM25 (apply the same filter).
bm25_raw = self._bm25.search(query, k=len(self._ids))
bm25_scored = [
(i, s)
for i, s in bm25_raw
if evaluate_filter(filter, self._chunks[i].metadata)
]
        # Fuse. RRF looks only at ranks, never raw score magnitudes,
        # so ``alpha`` is applied by replicating the favored ranking:
        # alpha=0.75 adds two extra copies of the vector list,
        # weighting it roughly 3:1 over BM25.
rankings: list[list[tuple[int, float]]] = []
if alpha > 0:
rankings.append(vector_scored)
if alpha < 1:
rankings.append(bm25_scored)
        # Replication count grows with the distance from 0.5; alphas
        # within 0.05 of even weighting get no extra copies.
if 0 < alpha < 1 and abs(alpha - 0.5) > 0.05:
extra = vector_scored if alpha > 0.5 else bm25_scored
weight_replications = max(
1, int(round(abs(alpha - 0.5) * 8))
)
rankings.extend([extra] * weight_replications)
fused = reciprocal_rank_fusion(rankings)
top = fused[:k]
return [
SearchResult(
chunk=self._chunks[idx],
score=score,
id=self._ids[idx],
)
for idx, score in top
]
# ---------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------
async def save(self, path: str | Path) -> None:
"""Write the full store (chunks + vectors + ids) to a JSON
file. The embedder is NOT serialized — supply the same
embedder when calling :meth:`load`."""
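
        Round-trip sketch (``my_embedder`` is whatever embedder the
        store was built with)::

            await store.save("vectors.json")
            restored = await InMemoryVectorStore.load(
                "vectors.json", embedder=my_embedder
            )
        """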
target = Path(path)
data = {
"version": _PERSIST_VERSION,
"dimensions": (
len(self._vectors[0]) if self._vectors else None
),
"rows": [
{
"id": self._ids[i],
"content": self._chunks[i].content,
"metadata": self._chunks[i].metadata,
"vector": self._vectors[i],
}
for i in range(len(self._ids))
],
}
target.write_text(json.dumps(data)) # noqa: ASYNC240 — sync I/O is fine here, persistence is rare and small
@classmethod
async def load(
cls, path: str | Path, *, embedder: Embedder
) -> InMemoryVectorStore:
"""Restore a store previously :meth:`save`-d. Pass the same
embedder kind/dimensions or queries will produce nonsense
scores."""
data = json.loads(Path(path).read_text()) # noqa: ASYNC240 — sync I/O is fine here, called once at startup
if data.get("version") != _PERSIST_VERSION:
raise ValueError(
f"Unsupported persist version: "
f"{data.get('version')!r} (expected {_PERSIST_VERSION})"
)
store = cls(embedder=embedder)
for row in data["rows"]:
store._ids.append(row["id"])
store._chunks.append(
Chunk(
content=row["content"],
metadata=row["metadata"],
)
)
store._vectors.append(row["vector"])
return store