# Arke Terminal — Source Dump
# Generated: 2026-04-16T17:42:36Z

# ═══════════════════════════════════════════
# README.md
# ═══════════════════════════════════════════

# Arke Terminal

AI document search for legal teams. Privilege-safe, on-premises.

![Dashboard](/.github/media/dashboard.png)

Sending documents to cloud AI can break attorney-client privilege (*United States v. Heppner*, *Hamid v SSHD*). Arke runs on your server. Your documents never leave your network.

Hybrid search (semantic + keyword) over your documents. Ask questions, get answers with source references. Click a source to open it in your default application.

## Quick start (Docker)

```bash
docker compose up --build
```

The dashboard is served at [localhost:8000](http://localhost:8000). Models are pulled automatically on first run.

## Quick start (local)

Requires Python 3.12+, Postgres with `pgvector`, and [Ollama](https://ollama.com).

```bash
pip install arke-terminal
cp .env.example .env   # fill in DATABASE_URL and DATA_PATH
arke ingest ./your-documents
arke serve
```

## CLI

```bash
arke digest [path]                       # parse raw source into cached JSONL
arke ingest <path>                       # index documents
arke ask "query"                         # search from terminal
arke serve                               # start dashboard + API
arke sweep <fast|medium|thorough> -l 30  # run eval benchmark
```
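
Once `arke serve` is running, the same operations are available over HTTP. A minimal sketch against the `/ask` endpoint from `arke/api.py` (query text is illustrative):

```bash
curl -s -X POST http://localhost:8000/ask \
  -H "Content-Type: application/json" \
  -d '{"query": "What are the termination clauses?"}'
```

The response carries `answer` plus a `sources` array of snippets with similarity scores.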

## Library

```python
from arke import Arke, Config

cfg = Config(database_url="postgresql://...", data_path="./docs")

async with Arke(cfg) as engine:
    await engine.ingest("./docs")
    result = await engine.ask("What are the termination clauses?")
    print(result.answer)
    for hit in result.hits:
        print(hit.chunk.source, hit.similarity)
```

## Input formats

**Plain text** (.txt) — loaded directly, source = relative path from root.

**JSONL** — one document per line:

```json
{"content": "...", "source": "optional", "created_at": "2026-04-01T12:00:00Z", "metadata": {}}
```

Only `content` is required.
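
A minimal end-to-end sketch (file name and text are illustrative):

```bash
printf '%s\n' '{"content": "Either party may terminate with 30 days notice."}' > docs.jsonl
arke ingest docs.jsonl
arke ask "What is the notice period for termination?"
```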


# ═══════════════════════════════════════════
# arke/api.py
# ═══════════════════════════════════════════

"""REST API for Arke. Start with `arke serve`."""
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel

from . import Arke
from .config import Config

STATIC_DIR = Path(__file__).parent / "static"


engine: Arke
cfg: Config


@asynccontextmanager
async def lifespan(_app: FastAPI):
    global engine, cfg
    cfg = Config.from_env()
    engine = Arke(cfg)
    await engine.open()
    yield
    await engine.close()


app = FastAPI(title="Arke", lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


class AskRequest(BaseModel):
    query: str


class IngestRequest(BaseModel):
    source: str


class SweepRequest(BaseModel):
    level: str
    limit: int


@app.get("/health")
async def health():
    return {"status": "ok"}


@app.post("/ask")
async def ask(req: AskRequest):
    result = await engine.ask(req.query)
    sources = [
        {
            "source": h.chunk.source,
            "content": h.chunk.content[:300],
            "similarity": round(h.similarity, 3),
        }
        for h in result.hits
    ]
    return {"answer": result.answer, "sources": sources}


@app.post("/ingest")
async def ingest(req: IngestRequest):
    await engine.ingest(req.source)
    return {"status": "ok"}


@app.post("/sweep")
async def sweep(req: SweepRequest):
    rows = await Arke.sweep(cfg, req.level, req.limit)
    return {"rows": [{**vars(row.metrics), **vars(row.cfg)} for row in rows]}


@app.get("/open/{source:path}")
async def open_file(source: str):
    """Opens a document in the OS default application. Local-only."""
    import os, subprocess, sys
    base = Path(cfg.data_path).resolve()
    full = (base / source).resolve()
    # Refuse paths that resolve outside the data directory.
    if not full.is_relative_to(base):
        return {"error": "invalid path"}
    if not full.exists():
        matches = list(base.rglob(source))
        if not matches:
            return {"error": "not found"}
        full = matches[0].resolve()

    if sys.platform == "linux":
        subprocess.Popen(["xdg-open", str(full)])
    elif sys.platform == "darwin":
        subprocess.Popen(["open", str(full)])
    else:
        # Windows: os.startfile avoids the `start` title-quoting pitfall.
        os.startfile(str(full))

    return {"status": "ok"}


if STATIC_DIR.is_dir():
    @app.get("/{path:path}")
    async def spa_fallback(path: str):
        file = (STATIC_DIR / path).resolve()
        # Serve only files inside the static dir; everything else falls
        # back to the SPA entry point.
        if file.is_relative_to(STATIC_DIR.resolve()) and file.is_file():
            return FileResponse(file)
        return FileResponse(STATIC_DIR / "index.html")

# ═══════════════════════════════════════════
# arke/cache.old.py
# ═══════════════════════════════════════════

"""File-based JSONL cache for expensive operations."""
import hashlib
import json
from pathlib import Path

CACHE_DIR = Path(".cache")


def corpus_hash(source_path: str) -> str:
    """Deterministic hash of source file(s) content."""
    h = hashlib.md5()
    p = Path(source_path)
    if p.is_file():
        h.update(p.read_bytes())
    elif p.is_dir():
        # Hash both supported formats; a .txt-only corpus would otherwise
        # always produce the same (empty) hash and collide in the cache.
        for f in sorted(p.rglob("*.jsonl")) + sorted(p.rglob("*.txt")):
            h.update(f.read_bytes())
    return h.hexdigest()[:12]


class Cache:
    """Generic JSONL cache. Doesn't know what it stores — just dicts."""

    def __init__(self, **params: object) -> None:
        raw = json.dumps(params, sort_keys=True, default=str)
        key = hashlib.md5(raw.encode()).hexdigest()[:16]
        self._path = CACHE_DIR / f"{key}.jsonl"

    @property
    def path(self) -> Path:
        return self._path

    def exists(self) -> bool:
        return self._path.exists()

    def load(self) -> list[dict] | None:
        if not self._path.exists():
            return None
        try:
            return [json.loads(line) for line in self._path.read_text().splitlines()]
        except (json.JSONDecodeError, UnicodeDecodeError):
            self._path.unlink(missing_ok=True)
            print(f"cache corrupt, deleted: {self._path}")
            return None

    def save(self, rows: list[dict]) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._path, "w") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")

# ═══════════════════════════════════════════
# arke/chunker.py
# ═══════════════════════════════════════════

from dataclasses import dataclass

# Strongest separators first. Empty string is the final fallback (per-character).
SEPARATORS = ["\n\n", "\n", ". ", ", ", " ", ""]


@dataclass(frozen=True)
class ChunkData:
    """A chunk with its overlap context. The embedding is computed from
    `overlapped()` so the vector 'knows' about its neighbors, but only
    `clean` is stored in the database — no text duplication between rows."""
    clean: str
    head: str
    tail: str

    def overlapped(self) -> str:
        return self.head + self.clean + self.tail


def chunk(text: str, chunk_size: int, overlap: float) -> list[ChunkData]:
    """Recursive character text splitter: split by separator hierarchy,
    then greedily merge adjacent pieces up to chunk_size."""
    if not text:
        return []

    clean = _merge(_separate(text.strip(), chunk_size), chunk_size)
    overlap_chars = int(chunk_size * overlap)
    result: list[ChunkData] = []

    for i, piece in enumerate(clean):
        head = clean[i - 1][-overlap_chars:] if (overlap_chars > 0 and i > 0) else ""
        tail = clean[i + 1][:overlap_chars] if (overlap_chars > 0 and i < len(clean) - 1) else ""
        result.append(ChunkData(clean=piece, head=head, tail=tail))

    return result
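
# Illustrative sketch (not part of the module): with chunk_size=12 and
# overlap=0.25 (3 overlap characters), neighbouring pieces share context:
#
#   pieces = chunk("alpha beta.\n\ngamma delta.", chunk_size=12, overlap=0.25)
#   assert pieces[1].head == pieces[0].clean[-3:]
#   assert pieces[1].overlapped() == pieces[1].head + pieces[1].clean + pieces[1].tail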


def _separate(text: str, chunk_size: int, depth: int = 0) -> list[str]:
    if depth >= len(SEPARATORS):
        return [text]

    sep = SEPARATORS[depth]
    # str.split("") raises ValueError, so the per-character fallback
    # is handled explicitly.
    parts = list(text) if sep == "" else text.split(sep)
    result: list[str] = []

    for i, part in enumerate(parts):
        if len(part) < chunk_size:
            # Reattach the separator as a suffix so _merge can reconstruct
            # the original text without losing whitespace or newlines.
            suffix = sep if i < len(parts) - 1 else ""
            result.append(part + suffix)
        else:
            result.extend(_separate(part, chunk_size, depth + 1))

    return result


def _merge(splits: list[str], chunk_size: int) -> list[str]:
    raw: list[str] = []
    for s in splits:
        if not s:
            continue

        if raw and len(raw[-1]) + len(s) < chunk_size:
            raw[-1] += s
        else:
            raw.append(s)

    return raw

# ═══════════════════════════════════════════
# arke/cli.py
# ═══════════════════════════════════════════

import asyncio

import click
from dotenv import load_dotenv

from . import Arke
from .config import Config


def _arke() -> Arke:
    return Arke(Config.from_env())


@click.group(help="Arke — RAG with built-in eval")
def app() -> None:
    pass


@app.command(help="Digest raw source into cached JSONL.")
@click.argument("source", default="")
def digest(source: str) -> None:
    cfg = Config.from_env()
    data_path = source or cfg.data_path
    Arke.digest(data_path)


@app.command(help="Ingest documents from a JSONL file or directory.")
@click.argument("source")
def ingest(source: str) -> None:
    async def run() -> None:
        async with _arke() as m:
            await m.ingest(source)
    asyncio.run(run())


@app.command(help="Ask a question against the ingested corpus.")
@click.argument("query")
def ask(query: str) -> None:
    async def run() -> None:
        async with _arke() as m:
            result = await m.ask(query)
            print(f"\n{result.answer}\n")
    asyncio.run(run())


@app.command(help="Run an eval sweep across preset configurations.")
@click.argument("level")
@click.option("--limit", "-l", default=30, type=int, help="Number of sample chunks for eval")
def sweep(level: str, limit: int) -> None:
    async def run() -> None:
        cfg = Config.from_env()
        await Arke.sweep(cfg, level, limit)
    asyncio.run(run())


@app.command(help="Start the REST API server.")
@click.option("--port", "-p", default=8000, type=int, help="Port to listen on")
def serve(port: int) -> None:
    import uvicorn
    from .api import app as api_app
    uvicorn.run(api_app, host="0.0.0.0", port=port)


def main() -> None:
    load_dotenv()
    try:
        app()
    except (RuntimeError, ValueError) as exc:
        print(f"error: {exc}")
        raise SystemExit(1)


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/config.py
# ═══════════════════════════════════════════

from __future__ import annotations

import os
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Config:
    database_url: str = ""
    data_path: str = ""
    api_key: str = ""
    embedder_url: str = ""
    embedder_model: str = ""
    embedding_dim: int = 0
    inference_url: str = ""
    inference_model: str = ""
    chunk_size: int = 600
    overlap: float = 0.0
    alpha: float = 0.7
    k: int = 5

    def resolved(self) -> Config:
        """Fills empty fields from defaults, validates, returns new Config."""
        if not self.database_url:
            raise ValueError("config: database_url is required")
        if not self.data_path:
            raise ValueError("config: data_path is required — set DATA_PATH in .env")

        cfg = replace(
            self,
            embedder_url=self.embedder_url or DEFAULTS.embedder_url,
            embedder_model=self.embedder_model or DEFAULTS.embedder_model,
            embedding_dim=self.embedding_dim or DEFAULTS.embedding_dim,
            inference_url=self.inference_url or DEFAULTS.inference_url,
            inference_model=self.inference_model or DEFAULTS.inference_model,
        )

        if cfg.chunk_size < 100 or cfg.chunk_size > 10000:
            raise ValueError(f"config: chunk_size must be 100..10000, got {cfg.chunk_size}")
        if cfg.overlap < 0 or cfg.overlap > 0.5:
            raise ValueError(f"config: overlap must be 0..0.5, got {cfg.overlap}")
        if cfg.alpha < 0 or cfg.alpha > 1:
            raise ValueError(f"config: alpha must be 0..1, got {cfg.alpha}")
        if cfg.k < 1 or cfg.k > 20:
            raise ValueError(f"config: k must be 1..20, got {cfg.k}")

        return cfg

    @staticmethod
    def from_env() -> Config:
        return Config(
            database_url=os.environ.get("DATABASE_URL", ""),
            data_path=os.environ.get("DATA_PATH", ""),
            api_key=os.environ.get("API_KEY", ""),
            embedder_url=os.environ.get("EMBEDDER_URL", ""),
            embedder_model=os.environ.get("EMBEDDER_MODEL", ""),
            embedding_dim=int(os.environ.get("EMBEDDING_DIM", "0")),
            inference_url=os.environ.get("INFERENCE_URL", ""),
            inference_model=os.environ.get("INFERENCE_MODEL", ""),
        )


DEFAULTS = Config(
    embedder_url="http://localhost:11434",
    embedder_model="bge-m3",
    embedding_dim=1024,
    inference_url="http://localhost:11434",
    inference_model="llama3:8b-instruct-q4_K_M",
)
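
# Usage sketch: only the two required fields need to be set explicitly;
# resolved() fills the rest from DEFAULTS above and validates ranges.
#
#   cfg = Config(database_url="postgresql://localhost/arke", data_path="./docs").resolved()
#   assert cfg.embedder_model == "bge-m3" and cfg.k == 5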

# ═══════════════════════════════════════════
# arke/corpus.py
# ═══════════════════════════════════════════

"""LegalBench-RAG downloader. Used by digest when DATA_PATH is the Dropbox URL."""
import io
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen

LEGALBENCH_URL = "https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1"
DATA_DIR = Path.home() / ".arke" / "data" / "legalbench-rag"


def download_legalbench() -> str:
    """Download and extract LegalBench-RAG corpus. Returns path to corpus dir.
    Idempotent — skips download if already present."""
    corpus_dir = DATA_DIR / "corpus"
    if corpus_dir.exists() and any(corpus_dir.rglob("*.txt")):
        print(f"legalbench-rag already at {corpus_dir}")
        return str(corpus_dir)

    print("downloading legalbench-rag (~87 MB)...")
    try:
        raw = urlopen(LEGALBENCH_URL).read()
    except URLError as exc:
        raise RuntimeError(f"failed to download legalbench-rag: {exc}") from exc

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        zf.extractall(DATA_DIR)

    if not corpus_dir.exists():
        raise RuntimeError(f"expected {corpus_dir} after extraction, not found")

    count = sum(1 for _ in corpus_dir.rglob("*.txt"))
    print(f"extracted {count} documents to {corpus_dir}")
    return str(corpus_dir)

# ═══════════════════════════════════════════
# arke/db.old.py
# ═══════════════════════════════════════════

import asyncpg
import pgvector.asyncpg

from .config import Config
from .types import Chunk, SearchHit


BATCH_SIZE = 50


class Db:
    def __init__(self, dsn: str, embedding_dim: int) -> None:
        self._dsn = dsn
        self._embedding_dim = embedding_dim

    async def open(self) -> None:
        try:
            self._pool = await asyncpg.create_pool(dsn=self._dsn, init=_init_conn)
        except (OSError, asyncpg.PostgresError) as exc:
            raise RuntimeError(f"database connection failed ({self._dsn}): {exc}") from exc

    async def close(self) -> None:
        await self._pool.close()

    async def init_schema(self) -> None:
        try:
            await self._init_schema()
        except asyncpg.UndefinedObjectError as exc:
            raise RuntimeError("pgvector extension is not installed on this PostgreSQL server") from exc

    async def _init_schema(self) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS chunks (
                    id          TEXT PRIMARY KEY,
                    source      TEXT NOT NULL CHECK (length(source) > 0),
                    chunk_index INTEGER NOT NULL CHECK (chunk_index >= 0),
                    content     TEXT NOT NULL CHECK (length(content) > 0),
                    embedding   vector({self._embedding_dim}) NOT NULL,
                    metadata    JSONB NOT NULL DEFAULT '{{}}' CHECK (jsonb_typeof(metadata) = 'object'),
                    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
                    tsv         tsvector GENERATED ALWAYS AS (to_tsvector('simple', content)) STORED NOT NULL
                )
            """)
            await conn.execute("CREATE INDEX IF NOT EXISTS chunks_tsv_idx ON chunks USING GIN (tsv)")
            await conn.execute(f"CREATE INDEX IF NOT EXISTS chunks_embedding_idx ON chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)")

    async def insert(self, chunks: list[Chunk]) -> None:
        rows = [c.to_row() for c in chunks]
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                for offset in range(0, len(rows), BATCH_SIZE):
                    await conn.executemany(
                        """
                        INSERT INTO chunks (id, source, chunk_index, content, embedding, metadata, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7)
                        ON CONFLICT (id) DO NOTHING
                        """,
                        rows[offset : offset + BATCH_SIZE],
                    )

    async def search(self, cfg: Config, query_vec: list[float], query_text: str) -> list[SearchHit]:
        """Hybrid search: alpha*cosine + (1-alpha)*bm25_normalized."""
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH scored AS (
                    SELECT id, content, source, chunk_index, metadata, created_at, embedding,
                           1 - (embedding <=> $1) AS cosine_raw,
                           ts_rank(tsv, plainto_tsquery('simple', $3)) AS bm25_raw
                    FROM chunks
                ),
                bounds AS (SELECT max(bm25_raw) AS max_bm25 FROM scored)
                SELECT s.id, s.content, s.source, s.chunk_index, s.metadata, s.created_at, s.embedding,
                       ($4 * s.cosine_raw +
                        (1.0 - $4) * CASE WHEN b.max_bm25 > 0 THEN s.bm25_raw / b.max_bm25 ELSE 0 END
                       ) AS similarity
                FROM scored s, bounds b
                ORDER BY similarity DESC
                LIMIT $2
                """,
                query_vec, cfg.k, query_text, cfg.alpha,
            )
        return [
            SearchHit(chunk=Chunk.from_dict(dict(r)), similarity=float(r["similarity"]))
            for r in rows
        ]
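
    # Worked example (illustrative): with alpha=0.7, a chunk with cosine
    # similarity 0.80 and max-normalized BM25 score 0.50 ranks at
    # 0.7*0.80 + 0.3*0.50 = 0.71. alpha=1.0 is pure vector search,
    # alpha=0.0 pure keyword search.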

    async def fetch_all(self) -> list[Chunk]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, source, chunk_index, content, embedding, metadata, created_at FROM chunks")
        return [Chunk.from_dict(dict(r)) for r in rows]

    async def truncate(self) -> None:
        async with self._pool.acquire() as conn:
            await conn.execute("TRUNCATE chunks")

    async def sample(self, limit: int) -> list[Chunk]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, source, chunk_index, content, embedding, metadata, created_at FROM chunks ORDER BY random() LIMIT $1", limit)
        return [Chunk.from_dict(dict(r)) for r in rows]


async def _init_conn(conn: asyncpg.Connection) -> None:
    await pgvector.asyncpg.register_vector(conn)

# ═══════════════════════════════════════════
# arke/digest.py
# ═══════════════════════════════════════════

"""Digest: parse raw source into cached JSONL.

Converts any supported source (local JSONL, URL) into a normalized
cache file that sweep and ingest can consume. This is the single
entry point for all data — no silent downloads, no fallbacks.
"""
from .cache import Cache, corpus_hash
from .corpus import download_legalbench
from .loader import load_docs
from .types import Doc


def digest(data_path: str) -> str:
    """Parse source into cached JSONL. Returns path to cache file.
    Idempotent — if cache exists for this source, returns immediately."""
    if not data_path:
        raise ValueError("data_path is required — set DATA_PATH in .env")

    if data_path.startswith("http"):
        return _digest_url(data_path)
    return _digest_local(data_path)


def _digest_url(url: str) -> str:
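    # Only LegalBench-RAG is supported today; any http(s) DATA_PATH triggers
    # the same download, so the url value itself is unused.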
    corpus_path = download_legalbench()
    return _digest_local(corpus_path)


def _digest_local(path: str) -> str:
    chash = corpus_hash(path)
    cache = Cache(local=chash)
    if cache.exists():
        return str(cache.path)

    docs = load_docs(path)
    rows = [_doc_to_dict(d) for d in docs]
    cache.save(rows)
    print(f"digested {len(rows)} docs from {path}")
    return str(cache.path)


def _doc_to_dict(doc: Doc) -> dict:
    return {
        "content": doc.content,
        "source": doc.source,
        "metadata": doc.metadata,
        "created_at": doc.created_at.isoformat(),
    }

# ═══════════════════════════════════════════
# arke/gen.py
# ═══════════════════════════════════════════

import asyncio
import json
import re
from dataclasses import dataclass

from . import Arke
from .models import chat

CONCURRENCY = 10


@dataclass(frozen=True)
class EvalCase:
    query: str
    expected_ids: list[str]


async def make_cases(engine: Arke, limit: int) -> list[EvalCase]:
    """Samples random chunks and generates eval questions concurrently."""
    samples = await engine.db.sample(limit)
    semaphore = asyncio.Semaphore(CONCURRENCY)
    done = 0

    async def process(chunk) -> list[EvalCase]:
        nonlocal done
        async with semaphore:
            questions = await _generate_questions(engine, chunk.content)
            done += 1
            print(f"  gen {done}/{len(samples)}")
            return [EvalCase(query=q, expected_ids=[chunk.id]) for q in questions]

    tasks = [process(chunk) for chunk in samples]
    results = await asyncio.gather(*tasks)

    cases = [case for batch in results for case in batch]
    print(f"gen done: {len(cases)} cases from {len(samples)} chunks")
    return cases


async def _generate_questions(engine: Arke, content: str) -> list[str]:
    instruction = (
        "You will receive a text chunk from a personal knowledge base. "
        "Generate 1 to 3 questions that ONLY this specific chunk can answer. "
        "Questions must include specific details from the text — names, numbers, "
        "dates, unique terms. Avoid generic questions. Return ONLY a JSON array "
        'of strings, nothing else. Example: ["question 1", "question 2"]'
    )
    prompt = instruction + "\n\nChunk:\n" + content
    raw = await chat(engine.cfg, engine.http, None, prompt)
    raw = raw.strip()

    match = re.search(r"\[[\s\S]*]", raw)
    if not match:
        return []

    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    return [q for q in parsed if isinstance(q, str) and q]
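
# Illustrative parse (not part of the module): the model often wraps the
# array in prose, so the regex grabs the outermost bracketed span.
#
#   raw = 'Sure! ["What does clause 4.2 require?", "Who signs the SOW?"]'
#   -> json.loads on the match yields two question strings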

# ═══════════════════════════════════════════
# arke/__init__.py
# ═══════════════════════════════════════════

from __future__ import annotations

from .types import Chunk, Doc, SearchAnswer, SearchHit

__all__ = ["Chunk", "Doc", "SearchAnswer", "SearchHit"]

# ═══════════════════════════════════════════
# arke/loader.py
# ═══════════════════════════════════════════

import json
from datetime import datetime, timezone
from pathlib import Path

from .types import Doc


def load_docs(source_path: str) -> list[Doc]:
    path = Path(source_path)
    if not path.is_dir():
        return _load_file(path)

    jsonl_files = sorted(path.glob("*.jsonl"))
    txt_files = sorted(path.rglob("*.txt"))

    docs: list[Doc] = []
    for file in jsonl_files:
        docs.extend(_load_file(file))
    for file in txt_files:
        docs.extend(_load_txt(file, path))
    return docs


def _load_file(file: Path) -> list[Doc]:
    """Reads a JSONL file and returns valid docs. System boundary: invalid
    lines and invalid records are dropped silently. The file stem serves
    as a source fallback when a record omits its own."""
    fallback = file.stem
    docs: list[Doc] = []
    for line in file.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue

        try:
            raw = json.loads(line)
        except json.JSONDecodeError:
            continue

        if not isinstance(raw, dict):
            continue

        content = raw.get("content")
        if not isinstance(content, str) or not content.strip():
            continue

        source_raw = raw.get("source")
        source = source_raw if isinstance(source_raw, str) and source_raw else fallback

        raw_date = raw.get("created_at")
        created_at = datetime.now(timezone.utc)
        if isinstance(raw_date, str):
            try:
                created_at = datetime.fromisoformat(raw_date.replace("Z", "+00:00"))
            except ValueError:
                pass

        metadata_raw = raw.get("metadata")
        metadata = metadata_raw if isinstance(metadata_raw, dict) else {}

        docs.append(Doc(content=content, source=source, created_at=created_at, metadata=metadata))

    return docs


def _load_txt(file: Path, root: Path) -> list[Doc]:
    """Reads a plain text file as a single Doc. Source = relative path from root."""
    content = file.read_text(encoding="utf-8").strip()
    if not content:
        return []

    source = str(file.relative_to(root))
    return [Doc(
        content=content,
        source=source,
        created_at=datetime.now(timezone.utc),
        metadata={},
    )]

# ═══════════════════════════════════════════
# arke/models.py
# ═══════════════════════════════════════════

"""LLM API client. Speaks the /v1/ protocol supported by all major backends."""
import asyncio

import httpx

from .config import Config

EMBED_BATCH_SIZE = 64
RETRY_ATTEMPTS = 3
RETRY_DELAY = 2
RETRYABLE_STATUSES = (429, 502, 503, 504)


async def embed(cfg: Config, http: httpx.AsyncClient, texts: list[str]) -> list[list[float]]:
    """Embeds texts in batches to respect API limits."""
    all_vectors: list[list[float]] = []
    total = len(texts)
    for offset in range(0, total, EMBED_BATCH_SIZE):
        batch = texts[offset: offset + EMBED_BATCH_SIZE]
        vectors = await _embed_batch(cfg, http, batch)
        all_vectors.extend(vectors)
        done = min(offset + EMBED_BATCH_SIZE, total)
        print(f"\r  embed {done}/{total}", end="", flush=True)
    if total:
        # Progress was written with \r and no newline; finish the line.
        print()
    return all_vectors


async def _embed_batch(cfg: Config, http: httpx.AsyncClient, texts: list[str]) -> list[list[float]]:
    url = f"{cfg.embedder_url}/v1/embeddings"
    body = {"model": cfg.embedder_model, "input": texts}
    res = await _post(http, url, cfg.api_key, body)
    try:
        data = sorted(res["data"], key=lambda d: d["index"])
        return [d["embedding"] for d in data]
    except (KeyError, TypeError, IndexError) as exc:
        raise RuntimeError(f"unexpected embed response from {url}: {exc}") from exc


async def chat(cfg: Config, http: httpx.AsyncClient, system: str | None, user: str) -> str:
    messages: list[dict[str, str]] = []
    if system is not None:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": user})

    url = f"{cfg.inference_url}/v1/chat/completions"
    body = {"model": cfg.inference_model, "messages": messages}
    res = await _post(http, url, cfg.api_key, body)
    try:
        return res["choices"][0]["message"]["content"]
    except (KeyError, TypeError, IndexError) as exc:
        raise RuntimeError(f"unexpected chat response from {url}: {exc}") from exc


async def _post(http: httpx.AsyncClient, url: str, api_key: str, body: dict) -> dict:
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    last_err: Exception | None = None

    for attempt in range(RETRY_ATTEMPTS):
        try:
            res = await http.post(url, headers=headers, json=body, timeout=300.0)
        except httpx.TimeoutException as exc:
            last_err = exc
            if attempt < RETRY_ATTEMPTS - 1:
                await asyncio.sleep(RETRY_DELAY)
                continue
            raise RuntimeError(f"API timed out after {RETRY_ATTEMPTS} attempts ({url})") from exc
        except httpx.ConnectError as exc:
            last_err = exc
            if attempt < RETRY_ATTEMPTS - 1:
                await asyncio.sleep(RETRY_DELAY)
                continue
            raise RuntimeError(f"API unreachable after {RETRY_ATTEMPTS} attempts ({url})") from exc

        if res.status_code in RETRYABLE_STATUSES and attempt < RETRY_ATTEMPTS - 1:
            last_err = RuntimeError(f"API {res.status_code}")
            await asyncio.sleep(RETRY_DELAY)
            continue

        if res.status_code >= 400:
            raise RuntimeError(f"API {res.status_code} from {url}: {res.text[:200]}")

        return res.json()

    raise RuntimeError(f"API failed after {RETRY_ATTEMPTS} attempts ({url}): {last_err}")

# ═══════════════════════════════════════════
# arke/presets.py
# ═══════════════════════════════════════════

from dataclasses import replace
from itertools import product

from .config import Config

FAST = "fast"
MEDIUM = "medium"
THOROUGH = "thorough"


def get_preset(level: str, base: Config) -> list[Config]:
    """Expands a sweep level into concrete configs with all parameter combinations."""
    if level == FAST:
        return _expand(
            base,
            chunk_sizes=[base.chunk_size],
            overlaps=[base.overlap],
            alphas=[base.alpha],
            ks=[base.k],
        )
    if level == MEDIUM:
        return _expand(
            base,
            chunk_sizes=[base.chunk_size],
            overlaps=[base.overlap],
            alphas=[0.0, 0.3, 0.5, 0.7, 1.0],
            ks=[5, 10, 20],
        )
    if level == THOROUGH:
        return _expand(
            base,
            chunk_sizes=[500, 1000],
            overlaps=[0.0, 0.2],
            alphas=[0.0, 0.3, 0.5, 0.7, 1.0],
            ks=[5, 10, 20],
        )

    raise ValueError(f"unknown sweep level: {level} (expected: {FAST} | {MEDIUM} | {THOROUGH})")


def _expand(
    base: Config,
    chunk_sizes: list[int],
    overlaps: list[float],
    alphas: list[float],
    ks: list[int],
) -> list[Config]:
    out: list[Config] = []
    for chunk_size, overlap, alpha, k in product(chunk_sizes, overlaps, alphas, ks):
        cfg = replace(base, chunk_size=chunk_size, overlap=overlap, alpha=alpha, k=k)
        out.append(cfg.resolved())
    return out

# ═══════════════════════════════════════════
# arke/sdb.py
# ═══════════════════════════════════════════

"""Unified file-based store for all of Arke's cached state.

Everything that passes through Arke is cache — documents, embeddings, source files,
sessions. The upstream document store (or the user's filesystem) is the source
of truth; sdb is the gut that digests and holds the processed form. Blow it
away at any time — it rebuilds from the source.

Layout:
    <root>/<table>/<id[:2]>/<id>.<ext>

Sharding by first two characters of the id keeps any single directory small
regardless of corpus size.
"""
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
import json
import os
import shutil

import numpy as np


_root: Path | None = None


def mount(path: str | Path) -> None:
    global _root
    _root = Path(path).expanduser()
    _root.mkdir(parents=True, exist_ok=True)
    for tmp in _root.rglob("*.tmp"):
        tmp.unlink(missing_ok=True)
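
# Usage sketch: mount once at startup, then put/get by table and id
# (path and ids below are illustrative):
#
#   mount("~/.arke/sdb")
#   put_json("documents", "ab12cd", {"x": 1})  # -> <root>/documents/ab/ab12cd.json
#   get_json("documents", "ab12cd")            # -> {"x": 1}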


def _table(table: str) -> Path:
    assert _root is not None, "sdb.mount() must be called before use"
    return _root / table


def _shard(table: str, id: str) -> Path:
    return _table(table) / id[:2]


def _path(table: str, id: str, ext: str) -> Path:
    suffix = f".{ext}" if ext else ""
    return _shard(table, id) / f"{id}{suffix}"


@contextmanager
def _atomic_open(path: Path):
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "wb") as f:
        yield f
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)


# JSON — documents, sessions, any structured record -----------------------

def put_json(table: str, id: str, data: dict) -> None:
    with _atomic_open(_path(table, id, "json")) as f:
        f.write(json.dumps(data, indent=2).encode())


def get_json(table: str, id: str) -> dict | None:
    p = _path(table, id, "json")
    if not p.exists():
        return None
    return json.loads(p.read_text())


def scan_json(table: str) -> Iterator[tuple[str, dict]]:
    table_dir = _table(table)
    if not table_dir.exists():
        return
    for f in table_dir.rglob("*.json"):
        yield f.stem, json.loads(f.read_text())


# Vectors — embeddings --------------------------------------------------------

def put_vec(table: str, id: str, vec: np.ndarray) -> None:
    with _atomic_open(_path(table, id, "npy")) as f:
        np.save(f, vec)


def get_vec(table: str, id: str) -> np.ndarray | None:
    p = _path(table, id, "npy")
    if not p.exists():
        return None
    return np.load(p)


# Raw bytes — source files (PDF, DOCX, MSG), any opaque blob ------------------

def put_bin(table: str, id: str, data: bytes) -> None:
    with _atomic_open(_path(table, id, "")) as f:
        f.write(data)


def get_bin(table: str, id: str) -> bytes | None:
    p = _path(table, id, "")
    if not p.exists():
        return None
    return p.read_bytes()


# Delete ----------------------------------------------------------------------

def delete(table: str, id: str) -> None:
    """Remove a single record (any extension) from the table."""
    shard = _shard(table, id)
    if not shard.exists():
        return
    for f in shard.iterdir():
        if f.name == id or f.name.startswith(f"{id}."):
            f.unlink(missing_ok=True)


def wipe(table: str) -> None:
    """Remove the entire table — every record under every shard."""
    table_dir = _table(table)
    if table_dir.exists():
        shutil.rmtree(table_dir)

# ═══════════════════════════════════════════
# arke/sweep.py
# ═══════════════════════════════════════════

# Postponed annotation evaluation lets SweepRow reference EvalMetrics
# before its definition below.
from __future__ import annotations

from dataclasses import dataclass

from . import Arke
from .config import Config
from .models import embed
from .types import Chunk, SearchHit
from .cache import Cache, corpus_hash
from .digest import digest
from .gen import make_cases, EvalCase
from .presets import get_preset


@dataclass(frozen=True)
class SweepRow:
    cfg: Config
    metrics: EvalMetrics


@dataclass(frozen=True)
class EvalResult:
    hits: list[SearchHit]
    expected_ids: list[str]


@dataclass(frozen=True)
class EvalMetrics:
    precision: float
    recall: float
    mrr: float  # Mean Reciprocal Rank: 1 / position of first correct result


async def run_sweep(base_cfg: Config, level: str, limit: int) -> list[SweepRow]:
    source_path = digest(base_cfg.data_path)

    configs = get_preset(level, base_cfg)
    rows: list[SweepRow] = []
    chash = corpus_hash(source_path)

    for idx, cfg in enumerate(configs):
        async with Arke(cfg) as m:
            await m.reset()
            await _ensure_chunks(m, chash, cfg, source_path)
            cases = await _ensure_cases(m, chash, cfg, limit)

            queries = [c.query for c in cases]
            vectors = await embed(m.cfg, m.http, queries)

            results: list[EvalResult] = []
            for i, case in enumerate(cases):
                hits = await m.db.search(cfg, vectors[i], case.query)
                results.append(EvalResult(hits=hits, expected_ids=case.expected_ids))

            metrics = _score(results)
            rows.append(SweepRow(cfg=cfg, metrics=metrics))
            print(f"  eval {idx + 1}/{len(configs)}: chunk={cfg.chunk_size} "
                  f"overlap={cfg.overlap} alpha={cfg.alpha:.1f} k={cfg.k}")

    rows.sort(key=lambda r: r.metrics.mrr, reverse=True)
    _print_table(rows)
    return rows


async def _ensure_chunks(m: Arke, chash: str, cfg: Config, source_path: str) -> None:
    cache = Cache(corpus=chash, chunk_size=cfg.chunk_size, overlap=cfg.overlap, embedder=cfg.embedder_model)
    cached = cache.load()
    if cached is not None:
        chunks = [Chunk.from_dict(d) for d in cached]
        await m.db.insert(chunks)
        print(f"cache hit: {len(chunks)} chunks from disk")
    else:
        await m.ingest(source_path)
        chunks = await m.db.fetch_all()
        cache.save([c.to_dict() for c in chunks])


async def _ensure_cases(m: Arke, chash: str, cfg: Config, limit: int) -> list[EvalCase]:
    cache = Cache(corpus=chash, chunk_size=cfg.chunk_size, overlap=cfg.overlap, inference=cfg.inference_model)
    cached = cache.load()
    if cached is not None:
        cases = [EvalCase(query=c["query"], expected_ids=c["expected_ids"]) for c in cached]
        print(f"cache hit: {len(cases)} eval cases from disk")
        return cases

    cases = await make_cases(m, limit)
    cache.save([{"query": c.query, "expected_ids": c.expected_ids} for c in cases])
    return cases


def _score(results: list[EvalResult]) -> EvalMetrics:
    """Averages precision, recall and MRR across eval results."""
    n = len(results)

    if n == 0:
        return EvalMetrics(precision=0.0, recall=0.0, mrr=0.0)

    sum_p = 0.0
    sum_r = 0.0
    sum_rr = 0.0

    for r in results:
        expected = set(r.expected_ids)
        matched = sum(1 for h in r.hits if h.chunk.id in expected)

        sum_p += matched / len(r.hits) if r.hits else 0.0
        sum_r += matched / len(expected) if expected else 0.0

        for i, h in enumerate(r.hits):
            if h.chunk.id in expected:
                sum_rr += 1.0 / (i + 1)
                break

    return EvalMetrics(precision=sum_p / n, recall=sum_r / n, mrr=sum_rr / n)
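
# Worked example (illustrative): one query with k=5 hits where the single
# expected chunk appears at rank 2 scores precision 1/5 = 0.2,
# recall 1/1 = 1.0, and reciprocal rank 1/2 = 0.5.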


def _print_table(rows: list[SweepRow]) -> None:
    header = f"{'chunk':>6} {'overlap':>7} {'alpha':>6} {'k':>4} {'prec':>7} {'recall':>7} {'MRR':>7}"
    print(f"\n{'Sweep Results (sorted by MRR)':^50}")
    print(header)
    print("-" * len(header))

    best_mrr = rows[0].metrics.mrr if rows else 0
    for r in rows:
        mrr_str = f"{r.metrics.mrr:.3f}"
        if r.metrics.mrr == best_mrr:
            mrr_str = f"{mrr_str} <-- best"
        print(
            f"{r.cfg.chunk_size:>6} {r.cfg.overlap:>7.1f} {r.cfg.alpha:>6.1f} {r.cfg.k:>4}"
            f" {r.metrics.precision:>7.3f} {r.metrics.recall:>7.3f} {mrr_str:>7}"
        )
    print()

# ═══════════════════════════════════════════
# arke/types.py
# ═══════════════════════════════════════════

import hashlib
from dataclasses import dataclass, field
from typing import Any, ClassVar, Iterator

import numpy as np

from . import sdb


@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    clean: str
    head: str
    tail: str

    # Runtime only — not serialized. Loaded from sdb.get_vec or computed on GPU.
    embedding: np.ndarray | None = field(default=None, compare=False, repr=False)

    def overlapped(self) -> str:
        return self.head + self.clean + self.tail

    def cache_key(self, model_id: str, model_version: str) -> str:
        # Key covers model id, model version, and the overlapped text, so a
        # stored embedding is reused only when all three are unchanged.
        raw = f"{model_id}:{model_version}:{self.overlapped()}"
        return hashlib.md5(raw.encode()).hexdigest()

    def save_embedding(self, model_id: str, model_version: str) -> None:
        if self.embedding is None:
            return
        sdb.put_vec("embeddings", self.cache_key(model_id, model_version), self.embedding)

    def load_embedding(self, model_id: str, model_version: str) -> bool:
        vec = sdb.get_vec("embeddings", self.cache_key(model_id, model_version))
        if vec is None:
            return False
        self.embedding = vec
        return True


@dataclass
class Doc:
    TABLE: ClassVar[str] = "documents"

    id: str
    source: str
    created: int
    modified: int
    metadata: dict[str, Any] = field(default_factory=dict)
    tags: list[str] = field(default_factory=list)

    # Runtime only — re-grown on every unwrap, never serialized.
    chunks: list[Chunk] = field(default_factory=list, compare=False, repr=False)
    _dirty: bool = field(default=False, compare=False, repr=False)

    def wrap(self) -> dict:
        return {
            "id": self.id,
            "source": self.source,
            "created": self.created,
            "modified": self.modified,
            "metadata": self.metadata,
            "tags": self.tags,
        }

    @classmethod
    def unwrap(cls, d: dict) -> "Doc":
        return cls(
            id=d["id"],
            source=d["source"],
            created=d["created"],
            modified=d["modified"],
            metadata=d.get("metadata", {}),
            tags=d.get("tags", []),
        )

    def save(self) -> None:
        sdb.put_json(self.TABLE, self.id, self.wrap())
        self._dirty = False

    @classmethod
    def load(cls, id: str) -> "Doc | None":
        data = sdb.get_json(cls.TABLE, id)
        return cls.unwrap(data) if data else None

    @classmethod
    def scan(cls) -> Iterator["Doc"]:
        for _, data in sdb.scan_json(cls.TABLE):
            yield cls.unwrap(data)


@dataclass(frozen=True)
class SearchHit:
    chunk: Chunk
    similarity: float


@dataclass(frozen=True)
class SearchAnswer:
    answer: str
    hits: list[SearchHit]

# ═══════════════════════════════════════════
# pyproject.toml
# ═══════════════════════════════════════════

[project]
name = "arke-terminal"
version = "0.1.163"
description = "Local-first RAG with built-in eval."
requires-python = ">=3.12"
license = "MIT"
authors = [
    { name = "Pavel Dolgoter", email = "mikepromogratus@proton.me" },
]
readme = "README.md"
keywords = ["rag", "eval", "search", "embeddings", "pgvector", "local"]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.12",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
    "python-dotenv>=1.0",
    "asyncpg>=0.29",
    "pgvector>=0.3",
    "httpx>=0.27",
    "numpy>=2.0",
    "click>=8.1",
    "fastapi>=0.115",
    "uvicorn>=0.34",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[project.urls]
Repository = "https://github.com/padolgot/arke-terminal"

[project.scripts]
arke = "arke.cli:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["arke"]

# ═══════════════════════════════════════════
# .env.example
# ═══════════════════════════════════════════

DATABASE_URL=postgresql://postgres@localhost:5432/arke

# Path to corpus. Local directory with .txt/.jsonl, or URL (auto-downloads LegalBench-RAG).
DATA_PATH=https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1

# API key for cloud providers. Leave empty for local (Ollama, vLLM).
API_KEY=

# Override defaults (Ollama localhost with bge-m3 / llama3).
# Works with any /v1/-compatible API (Ollama, vLLM, etc).
# EMBEDDER_URL=http://localhost:11434
# EMBEDDER_MODEL=bge-m3
# EMBEDDING_DIM=1024
# INFERENCE_URL=http://localhost:11434
# INFERENCE_MODEL=llama3:8b-instruct-q4_K_M
