# Arke Terminal — Source Dump
# Generated: 2026-04-15T23:33:40Z


# ═══════════════════════════════════════════
# arke/api.py
# ═══════════════════════════════════════════

"""REST API for Arke. Start with `arke serve`."""
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel

from . import Arke
from .config import Config

# Directory holding the built SPA assets, served by the catch-all route below.
STATIC_DIR = Path(__file__).parent / "static"


# Module-level singletons, assigned by `lifespan` before any request is served.
# Bare annotations only — touching them before startup raises NameError
# instead of silently yielding None.
engine: Arke
cfg: Config


@asynccontextmanager
async def lifespan(_app: FastAPI):
    """FastAPI lifespan hook: builds and opens the engine on startup,
    closes it on shutdown. Populates the module-level `engine`/`cfg`."""
    global engine, cfg
    cfg = Config.from_env()
    engine = Arke(cfg)
    await engine.open()
    yield  # application serves requests while suspended here
    await engine.close()


app = FastAPI(title="Arke", lifespan=lifespan)

# NOTE(review): CORS is wide open (any origin/method/header). Acceptable for a
# local-only tool; revisit before exposing this server beyond localhost.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


class AskRequest(BaseModel):
    """Body for POST /ask."""
    query: str


class IngestRequest(BaseModel):
    """Body for POST /ingest."""
    source: str


class SweepRequest(BaseModel):
    """Body for POST /sweep."""
    level: str
    limit: int


@app.get("/health")
async def health():
    """Liveness probe — always returns ok."""
    return {"status": "ok"}


@app.post("/ask")
async def ask(req: AskRequest):
    """RAG query: returns the generated answer plus truncated source snippets."""
    result = await engine.ask(req.query)
    sources = [
        {
            "source": h.chunk.source,
            # Snippets are truncated to keep the payload small; the full
            # chunk text lives in the database.
            "content": h.chunk.content[:300],
            "similarity": round(h.similarity, 3),
        }
        for h in result.hits
    ]
    return {"answer": result.answer, "sources": sources}


@app.post("/ingest")
async def ingest(req: IngestRequest):
    """Ingests documents from a path on the server's filesystem."""
    await engine.ingest(req.source)
    return {"status": "ok"}


@app.post("/sweep")
async def sweep(req: SweepRequest):
    """Runs an eval sweep; each row flattens metrics and config into one dict
    (field names of the two dataclasses don't collide)."""
    rows = await Arke.sweep(cfg, req.level, req.limit)
    return {"rows": [{**vars(row.metrics), **vars(row.cfg)} for row in rows]}


@app.get("/open/{source:path}")
async def open_file(source: str):
    """Opens a document in the OS default application. Local-only.

    Resolves `source` against the configured data directory; when the relative
    path doesn't exist, falls back to a recursive filename search under it.
    """
    import subprocess, sys
    base = Path(cfg.data_path).resolve()
    full = (base / source).resolve()
    # Reject path traversal: the resolved target must stay inside the data
    # directory (e.g. source="../../etc/passwd" would escape otherwise).
    if not full.is_relative_to(base):
        return {"error": "not found"}
    if not full.exists():
        matches = list(base.rglob(source))
        if not matches:
            return {"error": "not found"}
        full = matches[0].resolve()

    if sys.platform == "linux":
        subprocess.Popen(["xdg-open", str(full)])
    elif sys.platform == "darwin":
        subprocess.Popen(["open", str(full)])
    else:
        # Windows: `start` is a shell builtin, so shell=True is required here;
        # `full` is now constrained to the data directory, limiting injection.
        subprocess.Popen(["start", str(full)], shell=True)

    return {"status": "ok"}


if STATIC_DIR.is_dir():
    @app.get("/{path:path}")
    async def spa_fallback(path: str):
        """Serves static assets; any unknown route falls back to index.html
        so client-side SPA routing works on hard refresh."""
        file = (STATIC_DIR / path).resolve()
        # Only serve files that actually live under STATIC_DIR — a request
        # path containing ".." must not escape the static root.
        if file.is_file() and file.is_relative_to(STATIC_DIR.resolve()):
            return FileResponse(file)
        return FileResponse(STATIC_DIR / "index.html")

# ═══════════════════════════════════════════
# arke/cache.py
# ═══════════════════════════════════════════

"""File-based JSONL cache for expensive operations."""
import hashlib
import json
from pathlib import Path

# All cache files live in this directory, named by a hash of their parameters.
CACHE_DIR = Path(".cache")


def corpus_hash(source_path: str) -> str:
    """Deterministic hash of source file(s) content."""
    digest = hashlib.md5()
    path = Path(source_path)
    if path.is_dir():
        # Sorted traversal keeps the hash stable across filesystems.
        for part in sorted(path.rglob("*.jsonl")):
            digest.update(part.read_bytes())
    elif path.is_file():
        digest.update(path.read_bytes())
    return digest.hexdigest()[:12]


class Cache:
    """Generic JSONL cache. Doesn't know what it stores — just dicts."""

    def __init__(self, **params: object) -> None:
        raw = json.dumps(params, sort_keys=True, default=str)
        key = hashlib.md5(raw.encode()).hexdigest()[:16]
        self._path = CACHE_DIR / f"{key}.jsonl"

    @property
    def path(self) -> Path:
        return self._path

    def exists(self) -> bool:
        return self._path.exists()

    def load(self) -> list[dict] | None:
        if not self._path.exists():
            return None
        try:
            return [json.loads(line) for line in self._path.read_text().splitlines()]
        except (json.JSONDecodeError, UnicodeDecodeError):
            self._path.unlink(missing_ok=True)
            print(f"cache corrupt, deleted: {self._path}")
            return None

    def save(self, rows: list[dict]) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._path, "w") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")

# ═══════════════════════════════════════════
# arke/chunker.py
# ═══════════════════════════════════════════

from dataclasses import dataclass

# Strongest separators first. Empty string is the final fallback (per-character).
SEPARATORS = ["\n\n", "\n", ". ", ", ", " ", ""]


@dataclass(frozen=True)
class ChunkData:
    """A chunk with its overlap context. The embedding is computed from
    `overlapped()` so the vector 'knows' about its neighbors, but only
    `clean` is stored in the database — no text duplication between rows."""
    clean: str
    head: str
    tail: str

    def overlapped(self) -> str:
        """Chunk text with neighbor overlap prepended/appended."""
        return self.head + self.clean + self.tail


def chunk(text: str, chunk_size: int, overlap: float) -> list[ChunkData]:
    """Recursive character text splitter: split by separator hierarchy,
    then greedily merge adjacent pieces up to chunk_size.

    `overlap` is a fraction of chunk_size borrowed from each neighbor
    (0.0 disables overlap entirely).
    """
    if not text:
        return []

    clean = _merge(_separate(text.strip(), chunk_size), chunk_size)
    overlap_chars = int(chunk_size * overlap)
    result: list[ChunkData] = []

    for i, piece in enumerate(clean):
        head = clean[i - 1][-overlap_chars:] if (overlap_chars > 0 and i > 0) else ""
        tail = clean[i + 1][:overlap_chars] if (overlap_chars > 0 and i < len(clean) - 1) else ""
        result.append(ChunkData(clean=piece, head=head, tail=tail))

    return result


def _separate(text: str, chunk_size: int, depth: int = 0) -> list[str]:
    """Splits text by SEPARATORS[depth], recursing on oversized parts."""
    if depth >= len(SEPARATORS):
        return [text]

    sep = SEPARATORS[depth]
    # BUG FIX: str.split("") raises ValueError, so the per-character fallback
    # crashed on any separator-free run >= chunk_size. Split to characters
    # explicitly for the empty separator.
    parts = list(text) if sep == "" else text.split(sep)
    result: list[str] = []

    for i, part in enumerate(parts):
        if len(part) < chunk_size:
            # Reattach the separator as a suffix so _merge can reconstruct
            # the original text without losing whitespace or newlines.
            suffix = sep if i < len(parts) - 1 else ""
            result.append(part + suffix)
        else:
            result.extend(_separate(part, chunk_size, depth + 1))

    return result


def _merge(splits: list[str], chunk_size: int) -> list[str]:
    """Greedily concatenates adjacent splits while the result stays under
    chunk_size; empty splits are dropped."""
    raw: list[str] = []
    for s in splits:
        if not s:
            continue

        if raw and len(raw[-1]) + len(s) < chunk_size:
            raw[-1] += s
        else:
            raw.append(s)

    return raw

# ═══════════════════════════════════════════
# arke/cli.py
# ═══════════════════════════════════════════

import asyncio

import click
from dotenv import load_dotenv

from . import Arke
from .config import Config


def _arke() -> Arke:
    """Builds an engine from environment config; the caller manages lifecycle."""
    return Arke(Config.from_env())


@click.group(help="Arke — RAG with built-in eval")
def app() -> None:
    """Root CLI group; subcommands are registered via @app.command below."""
    pass


@app.command(help="Digest raw source into cached JSONL.")
@click.argument("source", default="")
def digest(source: str) -> None:
    """CLI wrapper: SOURCE falls back to cfg.data_path when omitted."""
    cfg = Config.from_env()
    data_path = source or cfg.data_path
    Arke.digest(data_path)


@app.command(help="Ingest documents from a JSONL file or directory.")
@click.argument("source")
def ingest(source: str) -> None:
    """CLI wrapper around Arke.ingest with automatic engine open/close."""
    async def run() -> None:
        async with _arke() as m:
            await m.ingest(source)
    asyncio.run(run())


@app.command(help="Ask a question against the ingested corpus.")
@click.argument("query")
def ask(query: str) -> None:
    """CLI wrapper around Arke.ask; prints only the answer text."""
    async def run() -> None:
        async with _arke() as m:
            result = await m.ask(query)
            print(f"\n{result.answer}\n")
    asyncio.run(run())


@app.command(help="Run an eval sweep across preset configurations.")
@click.argument("level")
@click.option("--limit", "-l", default=30, type=int, help="Number of sample chunks for eval")
def sweep(level: str, limit: int) -> None:
    """CLI wrapper around Arke.sweep (Arke.sweep manages its own engines)."""
    async def run() -> None:
        cfg = Config.from_env()
        await Arke.sweep(cfg, level, limit)
    asyncio.run(run())


@app.command(help="Start the REST API server.")
@click.option("--port", "-p", default=8000, type=int, help="Port to listen on")
def serve(port: int) -> None:
    """Runs uvicorn with the FastAPI app. Imports are local so the rest of
    the CLI works even if the server dependencies are missing."""
    import uvicorn
    from .api import app as api_app
    # NOTE(review): binds to all interfaces while the API's /open endpoint
    # assumes a trusted caller — confirm before exposing beyond localhost.
    uvicorn.run(api_app, host="0.0.0.0", port=port)


def main() -> None:
    """Entry point: loads .env, runs the CLI, and maps expected errors
    (config/validation/runtime failures) to a clean exit code 1."""
    load_dotenv()
    try:
        app()
    except (RuntimeError, ValueError) as exc:
        print(f"error: {exc}")
        raise SystemExit(1)


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/config.py
# ═══════════════════════════════════════════

from __future__ import annotations

import os
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Config:
    """Immutable runtime configuration. Empty strings / zeros mean 'unset'
    and are filled from DEFAULTS by `resolved()`."""
    database_url: str = ""
    data_path: str = ""
    api_key: str = ""
    embedder_url: str = ""
    embedder_model: str = ""
    embedding_dim: int = 0
    inference_url: str = ""
    inference_model: str = ""
    chunk_size: int = 600
    overlap: float = 0.0
    alpha: float = 0.7
    k: int = 5

    def resolved(self) -> Config:
        """Fills empty fields from defaults, validates, returns new Config."""
        if not self.database_url:
            raise ValueError("config: database_url is required")
        if not self.data_path:
            raise ValueError("config: data_path is required — set DATA_PATH in .env")

        filled = replace(
            self,
            embedder_url=self.embedder_url or DEFAULTS.embedder_url,
            embedder_model=self.embedder_model or DEFAULTS.embedder_model,
            embedding_dim=self.embedding_dim or DEFAULTS.embedding_dim,
            inference_url=self.inference_url or DEFAULTS.inference_url,
            inference_model=self.inference_model or DEFAULTS.inference_model,
        )

        # Range checks run on the filled config so defaults are covered too.
        if filled.chunk_size < 100 or filled.chunk_size > 10000:
            raise ValueError(f"config: chunk_size must be 100..10000, got {filled.chunk_size}")
        if filled.overlap < 0 or filled.overlap > 0.5:
            raise ValueError(f"config: overlap must be 0..0.5, got {filled.overlap}")
        if filled.alpha < 0 or filled.alpha > 1:
            raise ValueError(f"config: alpha must be 0..1, got {filled.alpha}")
        if filled.k < 1 or filled.k > 20:
            raise ValueError(f"config: k must be 1..20, got {filled.k}")

        return filled

    @staticmethod
    def from_env() -> Config:
        """Builds a Config from environment variables (unset → field defaults)."""
        env = os.environ.get
        return Config(
            database_url=env("DATABASE_URL", ""),
            data_path=env("DATA_PATH", ""),
            api_key=env("API_KEY", ""),
            embedder_url=env("EMBEDDER_URL", ""),
            embedder_model=env("EMBEDDER_MODEL", ""),
            embedding_dim=int(env("EMBEDDING_DIM", "0")),
            inference_url=env("INFERENCE_URL", ""),
            inference_model=env("INFERENCE_MODEL", ""),
        )


# Fallback values used by resolved() for the optional model/endpoint settings.
DEFAULTS = Config(
    embedder_url="http://localhost:11434",
    embedder_model="bge-m3",
    embedding_dim=1024,
    inference_url="http://localhost:11434",
    inference_model="llama3:8b-instruct-q4_K_M",
)

# ═══════════════════════════════════════════
# arke/corpus.py
# ═══════════════════════════════════════════

"""LegalBench-RAG downloader. Used by digest when DATA_PATH is the Dropbox URL."""
import io
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen

LEGALBENCH_URL = "https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1"
DATA_DIR = Path.home() / ".arke" / "data" / "legalbench-rag"

# Abort the download if the server stalls instead of hanging forever.
DOWNLOAD_TIMEOUT = 120


def download_legalbench() -> str:
    """Download and extract LegalBench-RAG corpus. Returns path to corpus dir.
    Idempotent — skips download if already present.

    Raises RuntimeError when the download fails or the archive does not
    contain the expected `corpus` directory.
    """
    corpus_dir = DATA_DIR / "corpus"
    if corpus_dir.exists() and any(corpus_dir.rglob("*.txt")):
        print(f"legalbench-rag already at {corpus_dir}")
        return str(corpus_dir)

    print("downloading legalbench-rag (~87 MB)...")
    try:
        # Context manager closes the HTTP response even on a partial read
        # (the original leaked it); the timeout bounds a stalled connection.
        with urlopen(LEGALBENCH_URL, timeout=DOWNLOAD_TIMEOUT) as resp:
            raw = resp.read()
    except (URLError, TimeoutError) as exc:
        raise RuntimeError(f"failed to download legalbench-rag: {exc}") from exc

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        zf.extractall(DATA_DIR)

    if not corpus_dir.exists():
        raise RuntimeError(f"expected {corpus_dir} after extraction, not found")

    count = sum(1 for _ in corpus_dir.rglob("*.txt"))
    print(f"extracted {count} documents to {corpus_dir}")
    return str(corpus_dir)

# ═══════════════════════════════════════════
# arke/db.py
# ═══════════════════════════════════════════

import asyncpg
import pgvector.asyncpg

from .config import Config
from .types import Chunk, SearchHit


# Rows per executemany call during bulk insert.
BATCH_SIZE = 50


class Db:
    """Thin asyncpg wrapper around the single `chunks` table (pgvector + FTS)."""

    def __init__(self, dsn: str, embedding_dim: int) -> None:
        # embedding_dim fixes the vector column width at schema-creation time.
        self._dsn = dsn
        self._embedding_dim = embedding_dim

    async def open(self) -> None:
        """Creates the pool; pgvector codecs are registered on each connection."""
        try:
            self._pool = await asyncpg.create_pool(dsn=self._dsn, init=_init_conn)
        except (OSError, asyncpg.PostgresError) as exc:
            raise RuntimeError(f"database connection failed ({self._dsn}): {exc}") from exc

    async def close(self) -> None:
        await self._pool.close()

    async def init_schema(self) -> None:
        """Creates extension, table, and indexes; converts a missing pgvector
        extension into an actionable RuntimeError."""
        try:
            await self._init_schema()
        except asyncpg.UndefinedObjectError as exc:
            raise RuntimeError("pgvector extension is not installed on this PostgreSQL server") from exc

    async def _init_schema(self) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
            # tsv is a generated column, so full-text search needs no triggers.
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS chunks (
                    id          TEXT PRIMARY KEY,
                    source      TEXT NOT NULL CHECK (length(source) > 0),
                    chunk_index INTEGER NOT NULL CHECK (chunk_index >= 0),
                    content     TEXT NOT NULL CHECK (length(content) > 0),
                    embedding   vector({self._embedding_dim}) NOT NULL,
                    metadata    JSONB NOT NULL DEFAULT '{{}}' CHECK (jsonb_typeof(metadata) = 'object'),
                    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
                    tsv         tsvector GENERATED ALWAYS AS (to_tsvector('simple', content)) STORED NOT NULL
                )
            """)
            await conn.execute("CREATE INDEX IF NOT EXISTS chunks_tsv_idx ON chunks USING GIN (tsv)")
            await conn.execute(f"CREATE INDEX IF NOT EXISTS chunks_embedding_idx ON chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)")

    async def insert(self, chunks: list[Chunk]) -> None:
        """Bulk-inserts chunks in BATCH_SIZE batches inside one transaction.
        Duplicate ids are skipped (ON CONFLICT DO NOTHING), making re-ingest
        of identical content a no-op."""
        rows = [c.to_row() for c in chunks]
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                for offset in range(0, len(rows), BATCH_SIZE):
                    await conn.executemany(
                        """
                        INSERT INTO chunks (id, source, chunk_index, content, embedding, metadata, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7)
                        ON CONFLICT (id) DO NOTHING
                        """,
                        rows[offset : offset + BATCH_SIZE],
                    )

    async def search(self, cfg: Config, query_vec: list[float], query_text: str) -> list[SearchHit]:
        """Hybrid search: alpha*cosine + (1-alpha)*bm25_normalized."""
        # ts_rank scores are normalized by the per-query max so both signals
        # land in [0, 1] before blending with alpha.
        # NOTE(review): the CTE scores every row, so this query scans the whole
        # table rather than using the HNSW index — presumably fine at current
        # corpus sizes; confirm before scaling up.
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH scored AS (
                    SELECT id, content, source, chunk_index, metadata, created_at, embedding,
                           1 - (embedding <=> $1) AS cosine_raw,
                           ts_rank(tsv, plainto_tsquery('simple', $3)) AS bm25_raw
                    FROM chunks
                ),
                bounds AS (SELECT max(bm25_raw) AS max_bm25 FROM scored)
                SELECT s.id, s.content, s.source, s.chunk_index, s.metadata, s.created_at, s.embedding,
                       ($4 * s.cosine_raw +
                        (1.0 - $4) * CASE WHEN b.max_bm25 > 0 THEN s.bm25_raw / b.max_bm25 ELSE 0 END
                       ) AS similarity
                FROM scored s, bounds b
                ORDER BY similarity DESC
                LIMIT $2
                """,
                query_vec, cfg.k, query_text, cfg.alpha,
            )
        return [
            SearchHit(chunk=Chunk.from_dict(dict(r)), similarity=float(r["similarity"]))
            for r in rows
        ]

    async def fetch_all(self) -> list[Chunk]:
        """Returns every stored chunk (used for whole-corpus operations)."""
        async with self._pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, source, chunk_index, content, embedding, metadata, created_at FROM chunks")
        return [Chunk.from_dict(dict(r)) for r in rows]

    async def truncate(self) -> None:
        """Deletes all rows from the chunks table."""
        async with self._pool.acquire() as conn:
            await conn.execute("TRUNCATE chunks")

    async def sample(self, limit: int) -> list[Chunk]:
        """Returns up to `limit` uniformly random chunks (ORDER BY random())."""
        async with self._pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, source, chunk_index, content, embedding, metadata, created_at FROM chunks ORDER BY random() LIMIT $1", limit)
        return [Chunk.from_dict(dict(r)) for r in rows]


async def _init_conn(conn: asyncpg.Connection) -> None:
    # Per-connection hook: registers pgvector codecs so `vector` columns
    # round-trip as Python sequences.
    await pgvector.asyncpg.register_vector(conn)

# ═══════════════════════════════════════════
# arke/digest.py
# ═══════════════════════════════════════════

"""Digest: parse raw source into cached JSONL.

Converts any supported source (local JSONL, URL) into a normalized
cache file that sweep and ingest can consume. This is the single
entry point for all data — no silent downloads, no fallbacks.
"""
from .cache import Cache, corpus_hash
from .corpus import download_legalbench
from .loader import load_docs
from .types import Doc


def digest(data_path: str) -> str:
    """Parse source into cached JSONL. Returns path to cache file.
    Idempotent — if cache exists for this source, returns immediately."""
    if not data_path:
        raise ValueError("data_path is required — set DATA_PATH in .env")

    handler = _digest_url if data_path.startswith("http") else _digest_local
    return handler(data_path)


def _digest_url(url: str) -> str:
    """Downloads the known remote corpus, then digests it like a local path.

    NOTE(review): `url` itself is unused — any http(s) DATA_PATH triggers the
    LegalBench download. Confirm this is intended before adding other URLs.
    """
    corpus_path = download_legalbench()
    return _digest_local(corpus_path)


def _digest_local(path: str) -> str:
    """Digests a local file/directory into the cache, keyed by content hash,
    so re-digesting unchanged data is a no-op."""
    chash = corpus_hash(path)
    cache = Cache(local=chash)
    if cache.exists():
        return str(cache.path)

    docs = load_docs(path)
    rows = [_doc_to_dict(d) for d in docs]
    cache.save(rows)
    print(f"digested {len(rows)} docs from {path}")
    return str(cache.path)


def _doc_to_dict(doc: Doc) -> dict:
    """Serializes a Doc to a JSON-ready dict (datetime → ISO 8601 string)."""
    return dict(
        content=doc.content,
        source=doc.source,
        metadata=doc.metadata,
        created_at=doc.created_at.isoformat(),
    )

# ═══════════════════════════════════════════
# arke/gen.py
# ═══════════════════════════════════════════

from dataclasses import dataclass

import asyncio
import json
import re

from . import Arke
from .models import chat

# Maximum number of question-generation requests in flight at once.
CONCURRENCY = 10


@dataclass(frozen=True)
class EvalCase:
    """One eval query plus the chunk id(s) expected to be retrieved for it."""
    query: str
    expected_ids: list[str]


async def make_cases(engine: Arke, limit: int) -> list[EvalCase]:
    """Samples random chunks and generates eval questions concurrently."""
    samples = await engine.db.sample(limit)
    semaphore = asyncio.Semaphore(CONCURRENCY)
    done = 0

    async def process(chunk) -> list[EvalCase]:
        # At most CONCURRENCY LLM calls run at once; `done` is safe to mutate
        # here because coroutines on one event loop don't interleave mid-line.
        nonlocal done
        async with semaphore:
            questions = await _generate_questions(engine, chunk.content)
            done += 1
            print(f"  gen {done}/{len(samples)}")
            return [EvalCase(query=q, expected_ids=[chunk.id]) for q in questions]

    tasks = [process(chunk) for chunk in samples]
    results = await asyncio.gather(*tasks)

    # Flatten per-chunk question batches into a single flat case list.
    cases = [case for batch in results for case in batch]
    print(f"gen done: {len(cases)} cases from {len(samples)} chunks")
    return cases


async def _generate_questions(engine: Arke, content: str) -> list[str]:
    """Asks the LLM for 1-3 questions answerable only by `content`.

    Returns [] on any malformed model output — eval generation is best-effort
    and a dropped chunk beats a crashed sweep.
    """
    instruction = "You will receive a text chunk from a personal knowledge base. Generate 1 to 3 questions that ONLY this specific chunk can answer. Questions must include specific details from the text — names, numbers, dates, unique terms. Avoid generic questions. Return ONLY a JSON array of strings, nothing else. Example: [\"question 1\", \"question 2\"]"
    prompt = instruction + "\n\nChunk:\n" + content
    raw = await chat(engine.cfg, engine.http, None, prompt)
    return _parse_questions(raw)


def _parse_questions(raw: str) -> list[str]:
    """Extracts a JSON array of non-empty strings from raw LLM output.

    Pure parsing helper, split from the I/O path. Tolerates surrounding
    prose by grabbing the outermost [...] span.
    """
    match = re.search(r"\[[\s\S]*]", raw.strip())
    if not match:
        return []

    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    # Keep only non-empty strings; the model may emit nulls or numbers.
    return [q for q in parsed if isinstance(q, str) and q]

# ═══════════════════════════════════════════
# arke/__init__.py
# ═══════════════════════════════════════════

from __future__ import annotations

import hashlib

import httpx

from .chunker import chunk
from .config import Config
from .db import Db
from .loader import load_docs
from .models import chat, embed
from .types import Chunk, SearchAnswer, SearchHit


class Arke:
    """RAG engine with built-in eval. Lifecycle: Arke(cfg) → open() → work → close().
    Or use `async with Arke(cfg) as m:` for automatic cleanup."""

    def __init__(self, cfg: Config) -> None:
        # resolved() fills defaults and validates, so self.cfg is always usable.
        self.cfg = cfg.resolved()

    async def open(self) -> None:
        """Connects to the database, ensures the schema, opens the HTTP client."""
        self.db = Db(self.cfg.database_url, self.cfg.embedding_dim)
        await self.db.open()
        await self.db.init_schema()
        self.http = httpx.AsyncClient(timeout=60.0)

    async def close(self) -> None:
        await self.db.close()
        await self.http.aclose()

    async def __aenter__(self) -> Arke:
        await self.open()
        return self

    async def __aexit__(self, *_: object) -> None:
        await self.close()

    async def reset(self) -> None:
        """Deletes all stored chunks."""
        await self.db.truncate()

    async def ingest(self, source_path: str) -> None:
        """Loads, chunks, embeds (one batched call), and stores documents."""
        docs = load_docs(source_path)

        # Flatten all docs into (doc, chunk_index, chunk_data) triples so every
        # chunk can be embedded in a single batched request. (Previously this
        # defined a throwaway `Piece` class inside the method on every call.)
        pieces = [
            (doc, idx, raw)
            for doc in docs
            for idx, raw in enumerate(chunk(doc.content, self.cfg.chunk_size, self.cfg.overlap))
        ]

        texts = [raw.overlapped() for _, _, raw in pieces]
        vectors = await embed(self.cfg, self.http, texts)
        print(f"embedded {len(texts)} chunks in one call")

        chunks = [
            Chunk(
                # Deterministic id → re-ingesting identical content is a no-op
                # (Db.insert uses ON CONFLICT DO NOTHING).
                id=hashlib.md5(f"{doc.source}:{idx}:{raw.clean}".encode()).hexdigest(),
                source=doc.source,
                chunk_index=idx,
                content=raw.clean,
                embedding=vectors[i],
                metadata=doc.metadata,
                created_at=doc.created_at,
            )
            for i, (doc, idx, raw) in enumerate(pieces)
        ]
        await self.db.insert(chunks)
        print(f"ingest done: {len(chunks)} chunks from {len(docs)} docs")

    async def ask(self, query: str) -> SearchAnswer:
        """Answers a query from retrieved context, or from general knowledge
        when retrieval returns no hits."""
        vectors = await embed(self.cfg, self.http, [query])
        hits = await self.db.search(self.cfg, vectors[0], query)

        if hits:
            answer = await self._answer_with_context(query, hits)
        else:
            answer = await self._answer_without_context(query)
        return SearchAnswer(answer=answer, hits=hits)

    async def _answer_with_context(self, query: str, hits: list[SearchHit]) -> str:
        """Builds a numbered context block from the hits and asks the LLM
        to answer strictly from it."""
        prompt = "You are a document search assistant. Answer ONLY from the provided context. Be concise — use bullet points, cite document names. No recommendations, no disclaimers, no suggestions to contact anyone. If the context is insufficient, say what IS available instead."

        parts: list[str] = []
        for i, h in enumerate(hits):
            c = h.chunk
            date = c.created_at.date().isoformat()
            parts.append(f"[{i + 1}] ({date}, {c.source}, sim={h.similarity:.3f})\n{c.content}")
        context = "\n\n".join(parts)

        return await chat(self.cfg, self.http, prompt, f"Context:\n{context}\n\nQuestion: {query}")

    async def _answer_without_context(self, query: str) -> str:
        """Fallback when retrieval finds nothing: plain LLM answer."""
        prompt = "You are a knowledge assistant. Answer the question directly based on your general knowledge. Answer in the same language as the question. Be concise and direct."
        return await chat(self.cfg, self.http, prompt, query)


# Imported after Arke is defined to break the circular dependency:
# digest/sweep themselves import from this package.
from .digest import digest as _digest  # noqa: E402
from .sweep import run_sweep, SweepRow, EvalMetrics, EvalResult  # noqa: E402

# Attached as static methods so callers can use Arke.digest(...) / Arke.sweep(...)
# without importing the submodules directly.
Arke.digest = staticmethod(_digest)
Arke.sweep = staticmethod(run_sweep)

__all__ = [
    "Arke",
    "Config",
    "SweepRow",
    "EvalMetrics",
    "EvalResult",
]

# ═══════════════════════════════════════════
# arke/loader.py
# ═══════════════════════════════════════════

import json
from datetime import datetime, timezone
from pathlib import Path

from .types import Doc


def load_docs(source_path: str) -> list[Doc]:
    """Loads docs from a single JSONL file, or from a directory containing
    top-level *.jsonl files and/or nested *.txt files."""
    path = Path(source_path)
    if not path.is_dir():
        return _load_file(path)

    docs: list[Doc] = []
    # JSONL first (top level only), then text files (recursive) — the order
    # determines document ordering downstream.
    for jsonl in sorted(path.glob("*.jsonl")):
        docs.extend(_load_file(jsonl))
    for txt in sorted(path.rglob("*.txt")):
        docs.extend(_load_txt(txt, path))
    return docs


def _load_file(file: Path) -> list[Doc]:
    """Reads a JSONL file and returns valid docs. System boundary: invalid
    lines and invalid records are dropped silently. The file stem serves
    as a source fallback when a record omits its own."""
    fallback = file.stem
    docs: list[Doc] = []
    for line in file.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue

        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(record, dict):
            continue

        # A record without usable content is worthless — skip it entirely.
        content = record.get("content")
        if not isinstance(content, str) or not content.strip():
            continue

        source = record.get("source")
        if not (isinstance(source, str) and source):
            source = fallback

        # Default to "now" and only override with a parseable ISO timestamp.
        created_at = datetime.now(timezone.utc)
        raw_date = record.get("created_at")
        if isinstance(raw_date, str):
            try:
                created_at = datetime.fromisoformat(raw_date.replace("Z", "+00:00"))
            except ValueError:
                pass

        metadata = record.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}

        docs.append(Doc(content=content, source=source, created_at=created_at, metadata=metadata))

    return docs


def _load_txt(file: Path, root: Path) -> list[Doc]:
    """Reads a plain text file as a single Doc. Source = relative path from root."""
    body = file.read_text(encoding="utf-8").strip()
    if not body:
        return []

    return [Doc(
        content=body,
        source=str(file.relative_to(root)),
        created_at=datetime.now(timezone.utc),
        metadata={},
    )]

# ═══════════════════════════════════════════
# arke/models.py
# ═══════════════════════════════════════════

"""LLM API client. Speaks the /v1/ protocol supported by all major backends."""
import asyncio

import httpx

from .config import Config

# Batching / retry knobs shared by the client functions below.
EMBED_BATCH_SIZE = 64
RETRY_ATTEMPTS = 3
RETRY_DELAY = 2
RETRYABLE_STATUSES = (429, 502, 503, 504)


async def embed(cfg: Config, http: httpx.AsyncClient, texts: list[str]) -> list[list[float]]:
    """Embeds texts in batches to respect API limits."""
    total = len(texts)
    collected: list[list[float]] = []
    for start in range(0, total, EMBED_BATCH_SIZE):
        batch_vectors = await _embed_batch(cfg, http, texts[start: start + EMBED_BATCH_SIZE])
        collected.extend(batch_vectors)
        progress = min(start + EMBED_BATCH_SIZE, total)
        # \r keeps the progress counter on one line.
        print(f"\r  embed {progress}/{total}", end="", flush=True)
    if total > EMBED_BATCH_SIZE:
        print()
    return collected


async def _embed_batch(cfg: Config, http: httpx.AsyncClient, texts: list[str]) -> list[list[float]]:
    """Posts one /v1/embeddings request; returns vectors aligned to input order."""
    url = f"{cfg.embedder_url}/v1/embeddings"
    body = {"model": cfg.embedder_model, "input": texts}
    res = await _post(http, url, cfg.api_key, body)
    try:
        # Sort by the API-provided index so vectors line up with the inputs
        # regardless of the order items arrive in the response.
        data = sorted(res["data"], key=lambda d: d["index"])
        return [d["embedding"] for d in data]
    except (KeyError, TypeError, IndexError) as exc:
        raise RuntimeError(f"unexpected embed response from {url}: {exc}") from exc


async def chat(cfg: Config, http: httpx.AsyncClient, system: str | None, user: str) -> str:
    """Sends one chat-completion request; returns the assistant message text.
    A None `system` omits the system message entirely."""
    messages: list[dict[str, str]] = (
        [] if system is None else [{"role": "system", "content": system}]
    )
    messages.append({"role": "user", "content": user})

    url = f"{cfg.inference_url}/v1/chat/completions"
    payload = {"model": cfg.inference_model, "messages": messages}
    res = await _post(http, url, cfg.api_key, payload)
    try:
        return res["choices"][0]["message"]["content"]
    except (KeyError, TypeError, IndexError) as exc:
        raise RuntimeError(f"unexpected chat response from {url}: {exc}") from exc


async def _post(http: httpx.AsyncClient, url: str, api_key: str, body: dict) -> dict:
    """POSTs JSON with optional bearer auth, retrying timeouts, connection
    errors, and RETRYABLE_STATUSES with a fixed delay.

    Raises RuntimeError on exhausted retries or any >=400 response.
    """
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    last_err: Exception | None = None

    for attempt in range(RETRY_ATTEMPTS):
        try:
            res = await http.post(url, headers=headers, json=body, timeout=300.0)
        except httpx.TimeoutException as exc:
            last_err = exc
            if attempt < RETRY_ATTEMPTS - 1:
                await asyncio.sleep(RETRY_DELAY)
                continue
            raise RuntimeError(f"API timed out after {RETRY_ATTEMPTS} attempts ({url})") from exc
        except httpx.ConnectError as exc:
            last_err = exc
            if attempt < RETRY_ATTEMPTS - 1:
                await asyncio.sleep(RETRY_DELAY)
                continue
            raise RuntimeError(f"API unreachable after {RETRY_ATTEMPTS} attempts ({url})") from exc

        # Retryable status on a non-final attempt: back off and retry.
        # On the final attempt it falls through to the generic >=400 raise.
        if res.status_code in RETRYABLE_STATUSES and attempt < RETRY_ATTEMPTS - 1:
            last_err = RuntimeError(f"API {res.status_code}")
            await asyncio.sleep(RETRY_DELAY)
            continue

        if res.status_code >= 400:
            raise RuntimeError(f"API {res.status_code} from {url}: {res.text[:200]}")

        return res.json()

    # NOTE(review): every path above returns or raises on the final attempt,
    # so this line looks unreachable — kept as a defensive safety net.
    raise RuntimeError(f"API failed after {RETRY_ATTEMPTS} attempts ({url}): {last_err}")

# ═══════════════════════════════════════════
# arke/presets.py
# ═══════════════════════════════════════════

from dataclasses import replace
from itertools import product

from .config import Config

# Sweep level names accepted by get_preset.
FAST = "fast"
MEDIUM = "medium"
THOROUGH = "thorough"


def get_preset(level: str, base: Config) -> list[Config]:
    """Expands a sweep level into concrete configs with all parameter combinations."""
    # Grids shared by the medium and thorough levels.
    full_alphas = [0.0, 0.3, 0.5, 0.7, 1.0]
    full_ks = [5, 10, 20]

    if level == FAST:
        grid = dict(
            chunk_sizes=[base.chunk_size],
            overlaps=[base.overlap],
            alphas=[base.alpha],
            ks=[base.k],
        )
    elif level == MEDIUM:
        grid = dict(
            chunk_sizes=[base.chunk_size],
            overlaps=[base.overlap],
            alphas=full_alphas,
            ks=full_ks,
        )
    elif level == THOROUGH:
        grid = dict(
            chunk_sizes=[500, 1000],
            overlaps=[0.0, 0.2],
            alphas=full_alphas,
            ks=full_ks,
        )
    else:
        raise ValueError(f"unknown sweep level: {level} (expected: {FAST} | {MEDIUM} | {THOROUGH})")

    return _expand(base, **grid)


def _expand(
    base: Config,
    chunk_sizes: list[int],
    overlaps: list[float],
    alphas: list[float],
    ks: list[int],
) -> list[Config]:
    """Cartesian product of the parameter lists, each combination applied on
    top of `base` and resolved before being returned."""
    combos = product(chunk_sizes, overlaps, alphas, ks)
    return [
        replace(base, chunk_size=cs, overlap=ov, alpha=al, k=kk).resolved()
        for cs, ov, al, kk in combos
    ]

# ═══════════════════════════════════════════
# arke/sweep.py
# ═══════════════════════════════════════════

from dataclasses import dataclass

from . import Arke
from .config import Config
from .models import embed
from .types import Chunk, SearchHit
from .cache import Cache, corpus_hash
from .digest import digest
from .gen import make_cases, EvalCase
from .presets import get_preset


@dataclass(frozen=True)
class SweepRow:
    """One sweep result: the config that was evaluated and its metrics.

    The annotations are string forward references on purpose: `EvalMetrics` is
    declared further down in this module, and class-body annotations are
    evaluated eagerly (this module has no `from __future__ import annotations`),
    so an unquoted `EvalMetrics` would raise NameError at import time.
    """

    cfg: "Config"           # the resolved Config this row was scored with
    metrics: "EvalMetrics"  # averaged precision/recall/MRR for that config


@dataclass(frozen=True)
class EvalResult:
    """Raw retrieval output for a single eval query, consumed by _score."""

    hits: list[SearchHit]  # ranked hits returned by the search backend
    expected_ids: list[str]  # chunk ids counted as correct answers when scoring

@dataclass(frozen=True)
class EvalMetrics:
    """Retrieval quality scores averaged across all eval cases (see _score)."""

    precision: float  # avg fraction of returned hits that were expected
    recall: float  # avg fraction of expected chunk ids that were returned
    mrr: float  # Mean Reciprocal Rank: 1 / position of first correct result


async def run_sweep(base_cfg: Config, level: str, limit: int) -> list[SweepRow]:
    """Evaluates every config produced by `get_preset(level, base_cfg)`.

    For each config: resets the engine, loads chunks and eval cases (from the
    on-disk cache when possible), runs every eval query, and scores the
    results. Returns the rows sorted best-MRR-first and prints a summary table.
    """
    source_path = digest(base_cfg.data_path)
    chash = corpus_hash(source_path)
    configs = get_preset(level, base_cfg)

    rows: list[SweepRow] = []
    for idx, cfg in enumerate(configs):
        async with Arke(cfg) as m:
            await m.reset()
            await _ensure_chunks(m, chash, cfg, source_path)
            cases = await _ensure_cases(m, chash, cfg, limit)

            # Embed all queries in one batch, then search per case.
            vectors = await embed(m.cfg, m.http, [c.query for c in cases])
            results = [
                EvalResult(
                    hits=await m.db.search(cfg, vec, case.query),
                    expected_ids=case.expected_ids,
                )
                for vec, case in zip(vectors, cases)
            ]

            rows.append(SweepRow(cfg=cfg, metrics=_score(results)))
            print(
                f"  eval {idx + 1}/{len(configs)}: chunk={cfg.chunk_size} overlap={cfg.overlap} alpha={cfg.alpha:.1f} k={cfg.k}")

    rows.sort(key=lambda row: row.metrics.mrr, reverse=True)
    _print_table(rows)
    return rows


async def _ensure_chunks(m: Arke, chash: str, cfg: Config, source_path: str) -> None:
    """Populates the DB with chunks, restoring from the on-disk cache when the
    (corpus, chunk_size, overlap, embedder) combination has been seen before."""
    cache = Cache(corpus=chash, chunk_size=cfg.chunk_size, overlap=cfg.overlap, embedder=cfg.embedder_model)
    cached = cache.load()

    if cached is None:
        # Cache miss: ingest from source, then persist what landed in the DB.
        await m.ingest(source_path)
        cache.save([c.to_dict() for c in await m.db.fetch_all()])
        return

    restored = [Chunk.from_dict(d) for d in cached]
    await m.db.insert(restored)
    print(f"cache hit: {len(restored)} chunks from disk")


async def _ensure_cases(m: Arke, chash: str, cfg: Config, limit: int) -> list[EvalCase]:
    """Returns eval cases for this corpus/config, generating and caching them
    on first use (case generation depends on the inference model)."""
    cache = Cache(corpus=chash, chunk_size=cfg.chunk_size, overlap=cfg.overlap, inference=cfg.inference_model)
    cached = cache.load()

    if cached is None:
        fresh = await make_cases(m, limit)
        cache.save([{"query": c.query, "expected_ids": c.expected_ids} for c in fresh])
        return fresh

    restored = [EvalCase(query=entry["query"], expected_ids=entry["expected_ids"]) for entry in cached]
    print(f"cache hit: {len(restored)} eval cases from disk")
    return restored


def _score(results: list[EvalResult]) -> EvalMetrics:
    """Averages precision, recall and MRR across eval results.

    Per result: precision = correct hits / returned hits, recall = correct
    hits / expected ids, reciprocal rank = 1 / rank of the first correct hit
    (0 when none). All three are zero for empty hits/expectations.
    """
    if not results:
        return EvalMetrics(precision=0.0, recall=0.0, mrr=0.0)

    total_p = total_r = total_rr = 0.0

    for res in results:
        wanted = set(res.expected_ids)
        # One boolean per hit, in rank order; reused for count and first-hit rank.
        flags = [h.chunk.id in wanted for h in res.hits]
        found = sum(flags)

        if res.hits:
            total_p += found / len(res.hits)
        if wanted:
            total_r += found / len(wanted)
        total_rr += next((1.0 / (rank + 1) for rank, ok in enumerate(flags) if ok), 0.0)

    n = len(results)
    return EvalMetrics(precision=total_p / n, recall=total_r / n, mrr=total_rr / n)


def _print_table(rows: list[SweepRow]) -> None:
    """Prints an aligned results table; expects rows already sorted by MRR
    descending (the first row's MRR is treated as the best score)."""
    header = f"{'chunk':>6} {'overlap':>7} {'alpha':>6} {'k':>4} {'prec':>7} {'recall':>7} {'MRR':>7}"
    print(f"\n{'Sweep Results (sorted by MRR)':^50}")
    print(header)
    print("-" * len(header))

    best_mrr = rows[0].metrics.mrr if rows else 0
    for row in rows:
        # Tag every row that ties the best MRR.
        mrr_cell = f"{row.metrics.mrr:.3f}"
        if row.metrics.mrr == best_mrr:
            mrr_cell = f"{mrr_cell} <-- best"
        print(
            f"{row.cfg.chunk_size:>6} {row.cfg.overlap:>7.1f} {row.cfg.alpha:>6.1f} {row.cfg.k:>4}"
            f" {row.metrics.precision:>7.3f} {row.metrics.recall:>7.3f} {mrr_cell:>7}"
        )
    print()

# ═══════════════════════════════════════════
# arke/types.py
# ═══════════════════════════════════════════

import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass(frozen=True)
class Doc:
    """A raw source document."""

    content: str  # full document text
    source: str  # origin identifier — presumably a file path or URL; confirm against ingest
    created_at: datetime
    metadata: dict[str, Any]  # free-form per-document metadata


@dataclass(frozen=True)
class Chunk:
    """A slice of a source document together with its embedding vector."""

    id: str
    source: str
    chunk_index: int
    content: str
    embedding: list[float]
    created_at: datetime
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """JSON-safe dict for the on-disk cache; inverse of `from_dict`."""
        return dict(
            id=self.id,
            source=self.source,
            chunk_index=self.chunk_index,
            content=self.content,
            embedding=[float(v) for v in self.embedding],  # coerce numpy scalars etc.
            metadata=self.metadata,
            created_at=self.created_at.isoformat(),
        )

    def to_row(self) -> tuple:
        """Tuple for DB INSERT. Column order must match the INSERT statement in db.py."""
        return (
            self.id,
            self.source,
            self.chunk_index,
            self.content,
            self.embedding,
            json.dumps(self.metadata),
            self.created_at,
        )

    @staticmethod
    def from_dict(d: dict) -> "Chunk":
        """Rebuilds a Chunk from `to_dict` output or a DB-style mapping, where
        `created_at` may be an ISO string and `metadata` may be a JSON string."""
        created = d["created_at"]
        created = datetime.fromisoformat(created) if isinstance(created, str) else created

        meta = d.get("metadata", {})
        meta = json.loads(meta) if isinstance(meta, str) else meta

        return Chunk(
            id=d["id"],
            source=d["source"],
            chunk_index=d["chunk_index"],
            content=d["content"],
            embedding=list(d["embedding"]),
            metadata=meta,
            created_at=created,
        )


@dataclass(frozen=True)
class SearchHit:
    """A single retrieved chunk with its similarity score."""

    chunk: Chunk
    similarity: float  # similarity to the query; higher ranks first — confirm exact metric in db.py


@dataclass(frozen=True)
class SearchAnswer:
    """Generated answer text plus the supporting search hits."""

    answer: str
    hits: list[SearchHit]  # surfaced as "sources" by the /ask API endpoint

# ═══════════════════════════════════════════
# pyproject.toml
# ═══════════════════════════════════════════

[project]
name = "arke-terminal"
version = "0.1.120"
description = "Local-first RAG with built-in eval."
requires-python = ">=3.12"
license = "MIT"
authors = [
    { name = "Pavel Dolgoter", email = "mikepromogratus@proton.me" },
]
readme = "README.md"
keywords = ["rag", "eval", "search", "embeddings", "pgvector", "local"]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.12",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
    "python-dotenv>=1.0",
    "asyncpg>=0.29",
    "pgvector>=0.3",
    "httpx>=0.27",
    "numpy>=2.0",
    "click>=8.1",
    "fastapi>=0.115",
    "uvicorn>=0.34",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[project.urls]
Repository = "https://github.com/padolgot/arke-terminal"

[project.scripts]
arke = "arke.cli:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["arke"]

# ═══════════════════════════════════════════
# .env.example
# ═══════════════════════════════════════════

DATABASE_URL=postgresql://postgres@localhost:5432/arke

# Path to corpus. Local directory with .txt/.jsonl, or URL (auto-downloads LegalBench-RAG).
DATA_PATH=https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1

# API key for cloud providers. Leave empty for local (Ollama, vLLM).
API_KEY=

# Override defaults (Ollama localhost with bge-m3 / llama3).
# Works with any /v1/-compatible API (Ollama, vLLM, etc).
# EMBEDDER_URL=http://localhost:11434
# EMBEDDER_MODEL=bge-m3
# EMBEDDING_DIM=1024
# INFERENCE_URL=http://localhost:11434
# INFERENCE_MODEL=llama3:8b-instruct-q4_K_M
