# Arke Terminal — Source Dump
# Generated: 2026-04-17T23:26:20Z

# ═══════════════════════════════════════════
# README.md
# ═══════════════════════════════════════════

# Arke Terminal

AI document search for legal teams. Privilege-safe, on-premises.

![Dashboard](/.github/media/dashboard.png)

Cloud AI can break attorney-client privilege (*United States v. Heppner*, *Hamid v SSHD*). Arke runs on your server. Your documents never leave your network.

Hybrid search (semantic + keyword) over your documents. Ask questions, get answers with source references. Click a source to open it in your default application.
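
Ranking blends the two signals with a configurable weight. A simplified sketch (not the exact server code; `alpha` comes from the `ALPHA` setting, default 0.7):

```python
# Cosine similarity blended with max-normalized BM25.
def hybrid_score(cosine: float, bm25: float, bm25_max: float, alpha: float = 0.7) -> float:
    keyword = bm25 / bm25_max if bm25_max > 0 else 0.0
    return alpha * cosine + (1 - alpha) * keyword
```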

## Quick start (Docker)

```bash
docker compose up --build
```

Opens dashboard at [localhost:8000](http://localhost:8000). Pulls models automatically on first run.

## Quick start (local)

Requires Python 3.12+, Postgres with `pgvector`, and [Ollama](https://ollama.com).

```bash
pip install arke-terminal
cp .env.example .env   # fill in DATABASE_URL and DATA_PATH
arke ingest ./your-documents
arke serve
```

## CLI

```bash
arke ingest <path>                       # index documents
arke ask "query"                         # search from terminal
arke serve                               # start dashboard + API
arke sweep <fast|medium|thorough> -l 30  # run eval benchmark
```

## Library

```python
from arke import Arke, Config

cfg = Config(database_url="postgresql://...", data_path="./docs")

async with Arke(cfg) as engine:
    await engine.ingest("./docs")
    result = await engine.ask("What are the termination clauses?")
    print(result.answer)
    for hit in result.hits:
        print(hit.chunk.source, hit.similarity)
```

## Input formats

**Plain text** (.txt) — loaded directly, source = relative path from root.

**JSONL** — one document per line:

```json
{"content": "...", "source": "optional", "created_at": "2026-04-01T12:00:00Z", "metadata": {}}
```

Only `content` is required.


# ═══════════════════════════════════════════
# arke/clients/cli.py
# ═══════════════════════════════════════════

"""CLI client. Usage: arke ask "your question" """
import sys

from arke.server import mailbox


def ask(query: str) -> None:
    msg_id = mailbox.send({"cmd": "ask", "query": query})
    response = mailbox.receive(msg_id)

    if response is None:
        print("error: arke did not respond", file=sys.stderr)
        sys.exit(1)

    if not response.get("ok"):
        print(f"error: {response.get('error')}", file=sys.stderr)
        sys.exit(1)

    print(response["answer"])

    for cite in response.get("citations", []):
        print(f"  [{cite['source']}] {cite['text'][:80]}...")


def main() -> None:
    if len(sys.argv) < 3 or sys.argv[1] != "ask":
        print("usage: arke ask <query>")
        sys.exit(1)

    ask(" ".join(sys.argv[2:]))

# ═══════════════════════════════════════════
# arke/clients/email.py
# ═══════════════════════════════════════════

"""Email client via Microsoft Graph webhooks.

Steps covered: auth with Azure AD client credentials, Graph subscription
against the shared mailbox, echo reply to any new email. Arke RAG is wired
separately through arke.server.mailbox.

Setup (two terminals):
    $ cloudflared tunnel --url http://localhost:8080
    # copy the printed https://<slug>.trycloudflare.com URL into
    # M365_WEBHOOK_URL in .env, then:
    $ arke-mail

The webhook server runs on a daemon thread so Graph can validate the
URL synchronously during subscription creation (Graph holds the POST
open until our endpoint echoes back the validationToken). Everything
else — subscription lifecycle, renewal, shutdown — lives in the main
thread as a plain synchronous loop.
"""

import json
import logging
import os
import secrets
import signal
import threading
import time
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

import httpx
import msal

logger = logging.getLogger(__name__)

GRAPH = "https://graph.microsoft.com/v1.0"
SCOPE = ["https://graph.microsoft.com/.default"]
SUBSCRIPTION_TTL_MIN = 60
RENEWAL_INTERVAL_SEC = 50 * 60


@dataclass(frozen=True)
class EmailConfig:
    tenant_id: str
    client_id: str
    client_secret: str
    mailbox: str
    webhook_url: str
    webhook_port: int = 8080

    @staticmethod
    def from_env() -> "EmailConfig":
        def req(key: str) -> str:
            value = os.environ.get(key, "")
            if not value:
                raise ValueError(f"email config: {key} is required")
            return value

        return EmailConfig(
            tenant_id=req("M365_TENANT_ID"),
            client_id=req("M365_CLIENT_ID"),
            client_secret=req("M365_CLIENT_SECRET"),
            mailbox=req("M365_MAILBOX"),
            webhook_url=req("M365_WEBHOOK_URL"),
            webhook_port=int(os.environ.get("M365_WEBHOOK_PORT", "8080")),
        )


class _SeenIds:
    """Bounded FIFO of recently-processed message IDs. Graph occasionally
    redelivers a notification (e.g. on our 5xx or a transient network error);
    without dedup we would reply twice to the same email. Safe without a lock:
    HTTPServer serves one request at a time."""

    def __init__(self, max_size: int = 512) -> None:
        self._order: deque[str] = deque(maxlen=max_size)
        self._set: set[str] = set()

    def check_and_add(self, item: str) -> bool:
        """Return True if already seen; otherwise record and return False."""
        if item in self._set:
            return True
        if len(self._order) == self._order.maxlen:
            self._set.discard(self._order[0])
        self._order.append(item)
        self._set.add(item)
        return False


def _auth(token: str) -> dict[str, str]:
    return {"Authorization": f"Bearer {token}"}


def _iso_utc(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")


def _send(call: Callable[[], httpx.Response], max_tries: int = 3) -> httpx.Response:
    """Invoke a Graph request with retries on 429 and 5xx, exponential backoff.

    401 is not retried — MSAL caches a valid token, so a rejection means a
    deeper auth problem that should surface, not be masked.
    """
    delay = 1.0
    for attempt in range(max_tries):
        r = call()
        last_attempt = attempt == max_tries - 1
        if r.status_code == 429 and not last_attempt:
            time.sleep(float(r.headers.get("Retry-After", delay)))
            delay *= 2
            continue
        if 500 <= r.status_code < 600 and not last_attempt:
            time.sleep(delay)
            delay *= 2
            continue
        return r
    return r


def acquire_token(app: msal.ConfidentialClientApplication) -> str:
    result = app.acquire_token_for_client(scopes=SCOPE)
    if "access_token" not in result:
        raise RuntimeError(f"auth: {result.get('error_description', result)}")
    return result["access_token"]


def create_subscription(http: httpx.Client, token: str, cfg: EmailConfig, client_state: str) -> str:
    # Scope to the Inbox folder so outbound replies (saved in Sent Items) do
    # not trigger notifications. Without this the bot receives its own sent
    # messages and can recursively reply to itself.
    expires = datetime.now(timezone.utc) + timedelta(minutes=SUBSCRIPTION_TTL_MIN)
    body = {
        "changeType": "created",
        "notificationUrl": cfg.webhook_url,
        "resource": f"/users/{cfg.mailbox}/mailFolders('inbox')/messages",
        "expirationDateTime": _iso_utc(expires),
        "clientState": client_state,
    }
    r = _send(lambda: http.post(f"{GRAPH}/subscriptions", json=body, headers=_auth(token)))
    r.raise_for_status()
    return r.json()["id"]


def renew_subscription(http: httpx.Client, token: str, sub_id: str) -> None:
    expires = datetime.now(timezone.utc) + timedelta(minutes=SUBSCRIPTION_TTL_MIN)
    body = {"expirationDateTime": _iso_utc(expires)}
    url = f"{GRAPH}/subscriptions/{sub_id}"
    r = _send(lambda: http.patch(url, json=body, headers=_auth(token)))
    r.raise_for_status()


def delete_subscription(http: httpx.Client, token: str, sub_id: str) -> None:
    url = f"{GRAPH}/subscriptions/{sub_id}"
    r = _send(lambda: http.delete(url, headers=_auth(token)))
    if r.status_code >= 400:
        logger.warning("delete subscription %s: %d", sub_id, r.status_code)


def fetch_message(http: httpx.Client, token: str, mailbox: str, msg_id: str) -> dict:
    url = f"{GRAPH}/users/{mailbox}/messages/{msg_id}"
    params = {"$select": "id,subject,from"}
    r = _send(lambda: http.get(url, params=params, headers=_auth(token)))
    r.raise_for_status()
    return r.json()


def reply_to_message(http: httpx.Client, token: str, mailbox: str, msg_id: str, comment: str) -> None:
    url = f"{GRAPH}/users/{mailbox}/messages/{msg_id}/reply"
    body = {"comment": comment}
    r = _send(lambda: http.post(url, json=body, headers=_auth(token)))
    r.raise_for_status()


def mark_as_read(http: httpx.Client, token: str, mailbox: str, msg_id: str) -> None:
    url = f"{GRAPH}/users/{mailbox}/messages/{msg_id}"
    body = {"isRead": True}
    r = _send(lambda: http.patch(url, json=body, headers=_auth(token)))
    r.raise_for_status()


def process_message(http: httpx.Client, token: str, mailbox: str, msg_id: str) -> None:
    """Echo: fetch the message, reply with 'received: <subject>', mark read.

    RAG integration replaces the reply body and optionally routes through
    arke.server.mailbox; wiring lives outside this function.
    """
    msg = fetch_message(http, token, mailbox, msg_id)
    subject = msg.get("subject") or "(no subject)"
    sender = (msg.get("from") or {}).get("emailAddress", {}).get("address", "unknown")
    logger.info("received: %s (from %s)", subject, sender)

    reply_to_message(http, token, mailbox, msg_id, f"received: {subject}")
    mark_as_read(http, token, mailbox, msg_id)


def _build_handler(
    http: httpx.Client,
    msal_app: msal.ConfidentialClientApplication,
    cfg: EmailConfig,
    client_state: str,
) -> type[BaseHTTPRequestHandler]:
    seen = _SeenIds()

    class WebhookHandler(BaseHTTPRequestHandler):
        def log_message(self, fmt: str, *args: object) -> None:
            logger.debug("http: " + fmt, *args)

        def do_POST(self) -> None:
            query = parse_qs(urlparse(self.path).query)
            if "validationToken" in query:
                token_value = query["validationToken"][0]
                payload = token_value.encode()
                self.send_response(200)
                self.send_header("Content-Type", "text/plain")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)
                logger.info("subscription validated")
                return

            length = int(self.headers.get("Content-Length", "0"))
            raw = self.rfile.read(length)
            self.send_response(202)
            self.end_headers()

            try:
                body = json.loads(raw)
            except json.JSONDecodeError:
                logger.warning("invalid json body")
                return

            token = acquire_token(msal_app)
            for notif in body.get("value", []):
                if notif.get("clientState") != client_state:
                    logger.warning("clientState mismatch, dropping")
                    continue
                msg_id = (notif.get("resourceData") or {}).get("id")
                if not msg_id:
                    continue
                if seen.check_and_add(msg_id):
                    logger.info("duplicate notification for %s, skipping", msg_id)
                    continue
                try:
                    process_message(http, token, cfg.mailbox, msg_id)
                except httpx.HTTPError as exc:
                    logger.error("process %s failed: %s", msg_id, exc)

    return WebhookHandler


def _install_term_handler() -> None:
    """Translate SIGTERM into KeyboardInterrupt so systemd/docker stop paths run
    the same finally block as Ctrl-C, deleting the Graph subscription cleanly.
    Python's default SIGTERM kills the process mid-stride, skipping cleanup."""

    def handler(signum: int, frame: object) -> None:
        raise KeyboardInterrupt()

    signal.signal(signal.SIGTERM, handler)


def run(cfg: EmailConfig) -> None:
    logger.info("email client starting, mailbox=%s, webhook=%s", cfg.mailbox, cfg.webhook_url)
    _install_term_handler()

    msal_app = msal.ConfidentialClientApplication(
        client_id=cfg.client_id,
        client_credential=cfg.client_secret,
        authority=f"https://login.microsoftonline.com/{cfg.tenant_id}",
    )

    with httpx.Client(timeout=30) as http:
        client_state = secrets.token_urlsafe(32)
        handler_cls = _build_handler(http, msal_app, cfg, client_state)
        server = HTTPServer(("127.0.0.1", cfg.webhook_port), handler_cls)

        server_thread = threading.Thread(target=server.serve_forever, daemon=True)
        server_thread.start()
        logger.info("listening on 127.0.0.1:%d", cfg.webhook_port)

        sub_id: str | None = None
        try:
            token = acquire_token(msal_app)
            sub_id = create_subscription(http, token, cfg, client_state)
            logger.info("created subscription %s", sub_id)

            while True:
                time.sleep(RENEWAL_INTERVAL_SEC)
                renew_subscription(http, acquire_token(msal_app), sub_id)
                logger.info("renewed subscription")
        except KeyboardInterrupt:
            logger.info("shutting down")
        finally:
            if sub_id is not None:
                try:
                    delete_subscription(http, acquire_token(msal_app), sub_id)
                except Exception:
                    logger.exception("cleanup failed")
            server.shutdown()


def main() -> None:
    from dotenv import load_dotenv

    load_dotenv()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    run(EmailConfig.from_env())


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/clients/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/clients/socket.py
# ═══════════════════════════════════════════

"""Unix socket gateway. Knows nothing about Arke internals.

Accepts a connection, reads a JSON request, drops it in inbox,
polls outbox for the response, sends it back, closes the connection.
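
A one-shot client (sketch):

    import json, os, socket

    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect(os.path.expanduser("~/.arke/arke.sock"))
    s.sendall(json.dumps({"cmd": "ping"}).encode() + b"\n")
    print(s.makefile("rb").readline())   # one JSON line back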
"""
import json
import logging
import socket
from pathlib import Path

from arke.server import mailbox

logger = logging.getLogger(__name__)

SOCK_PATH = Path("~/.arke/arke.sock").expanduser()


def run(sock_path: Path = SOCK_PATH) -> None:
    """Start socket gateway. Blocks forever — run in a dedicated thread or process."""
    mailbox.setup()

    if sock_path.exists():
        sock_path.unlink()

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(str(sock_path))
    server.listen()
    logger.info("gateway listening on %s", sock_path)

    try:
        while True:
            conn, _ = server.accept()
            conn.settimeout(30)
            try:
                _handle(conn)
            except Exception as e:
                logger.warning("connection error: %s", e)
            finally:
                conn.close()
    finally:
        server.close()
        if sock_path.exists():
            sock_path.unlink()


def _handle(conn: socket.socket) -> None:
    f = conn.makefile("rb")
    line = f.readline()
    if not line:
        return

    try:
        request = json.loads(line)
    except json.JSONDecodeError as e:
        _send(conn, {"ok": False, "error": f"invalid JSON: {e}"})
        return

    logger.info("cmd=%s", request.get("cmd"))

    msg_id = mailbox.send(request)
    response = mailbox.receive(msg_id)

    if response is None:
        _send(conn, {"ok": False, "error": "arke did not respond in time"})
        return

    _send(conn, response)


def _send(conn: socket.socket, data: dict) -> None:
    conn.sendall(json.dumps(data).encode() + b"\n")

# ═══════════════════════════════════════════
# arke/clients/tui.py
# ═══════════════════════════════════════════

"""TUI client. Run: python -m arke.clients.tui"""
# TODO: replace with Textual when we wire up the UI
# For now: minimal readline loop so we can test mailbox end-to-end

import sys

from arke.server import mailbox


def run() -> None:
    print("Arke TUI — type your question, Ctrl-C to exit\n")

    while True:
        try:
            query = input("> ").strip()
        except (KeyboardInterrupt, EOFError):
            print()
            break

        if not query:
            continue

        msg_id = mailbox.send({"cmd": "ask", "query": query})
        response = mailbox.receive(msg_id)

        if response is None:
            print("error: arke did not respond\n")
            continue

        if not response.get("ok"):
            print(f"error: {response.get('error')}\n")
            continue

        print(f"\n{response['answer']}\n")

        for cite in response.get("citations", []):
            print(f"  [{cite['source']}] {cite['text'][:80]}...")
        print()


if __name__ == "__main__":
    run()

# ═══════════════════════════════════════════
# arke/digest/__init__.py
# ═══════════════════════════════════════════

from .sync import RcloneSource, run

__all__ = ["RcloneSource", "run"]

# ═══════════════════════════════════════════
# arke/digest/sync.py
# ═══════════════════════════════════════════

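"""Digest sync — mirror remote sources with rclone, publish atomically.

Each cycle: sync every source into staging/<name>, hash the staging tree,
and when the hash changes, copy staging → digest through a tmp/old rename
dance so Arke never observes a half-written digest.
"""
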
import hashlib
import logging
import shutil
import subprocess
import time
from pathlib import Path

logger = logging.getLogger(__name__)


class RcloneSource:
    def __init__(self, name: str, remote: str):
        self._name = name
        self._remote = remote

    @property
    def name(self) -> str:
        return self._name

    def sync_to(self, dest: Path) -> None:
        dest.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["rclone", "sync", self._remote, str(dest),
             "--timeout", "10m",
             "--contimeout", "60s"],
            check=True,
        )


def _dir_hash(path: Path) -> str:
    h = hashlib.md5()
    for f in sorted(path.rglob("*")):
        if f.is_file():
            st = f.stat()
            h.update(str(f.relative_to(path)).encode())
            h.update(str(st.st_size).encode())
            h.update(str(st.st_mtime_ns).encode())
    return h.hexdigest()


def _load_hash(space: Path) -> str:
    p = space / ".sync_hash"
    return p.read_text().strip() if p.exists() else ""


def _save_hash(space: Path, h: str) -> None:
    (space / ".sync_hash").write_text(h)


def run(space: Path, sources: list[RcloneSource], interval: int = 60) -> None:
    """Sync loop. Blocks forever — call from a dedicated process."""
    staging = space / "staging"
    digest = space / "digest"
    staging.mkdir(parents=True, exist_ok=True)

    # survive restarts without forcing a full re-ingest
    last_hash = _load_hash(space)

    while True:
        for source in sources:
            try:
                source.sync_to(staging / source.name)
            except Exception as e:
                logger.warning("source %s failed: %s", source.name, e)

        current_hash = _dir_hash(staging)
        if current_hash != last_hash:
            tmp = digest.with_name(digest.name + ".tmp")
            old = digest.with_name(digest.name + ".old")

            if tmp.exists():
                shutil.rmtree(tmp)
            if old.exists():
                shutil.rmtree(old)

            shutil.copytree(staging, tmp, symlinks=True)

            if digest.exists():
                # This rename is the only junction between the sync daemon and Arke.
                # If Arke consumed digest between exists() and here, this raises — let it.
                # systemd restarts in 5s, digest is gone, second attempt goes through clean.
                digest.rename(old)

            tmp.rename(digest)

            if old.exists():
                shutil.rmtree(old)

            last_hash = current_hash
            _save_hash(space, last_hash)
            logger.info("digest published")

        time.sleep(interval)

# ═══════════════════════════════════════════
# arke/eval/download_corpora.py
# ═══════════════════════════════════════════

"""Download eval corpora. Run once to populate ~/.arke/data/.

Usage:
    python -m arke.eval.download_corpora

Corpora:
    legalbench-rag  — ~87 MB, legal QA benchmark
    (more to be added)
"""
import io
import sys
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen

DATA_DIR = Path.home() / ".arke" / "data"

CORPORA = {
    "legalbench-rag": {
        "url": "https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1",
        "dest": DATA_DIR / "legalbench-rag",
        "check": "corpus/*.txt",
        "size": "~87 MB",
    },
}


def download(name: str) -> None:
    corpus = CORPORA[name]
    dest: Path = corpus["dest"]
    check_glob: str = corpus["check"]

    if dest.exists() and any(dest.rglob(check_glob.split("/")[-1])):
        print(f"{name}: already at {dest}")
        return

    print(f"{name}: downloading {corpus['size']}...")
    try:
        raw = urlopen(corpus["url"]).read()
    except URLError as exc:
        raise RuntimeError(f"{name}: download failed: {exc}") from exc

    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        zf.extractall(dest)

    count = sum(1 for _ in dest.rglob("*.txt"))
    print(f"{name}: extracted {count} files to {dest}")


def main() -> None:
    for name in CORPORA:
        try:
            download(name)
        except RuntimeError as exc:
            print(f"error: {exc}", file=sys.stderr)
            sys.exit(1)


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/eval/gen.py
# ═══════════════════════════════════════════

"""Generate eval cases from chunks using LLM.

For each sampled chunk, LLM generates questions that only that chunk can answer.
This creates ground-truth (query → expected_chunk) pairs for scoring.
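
Each case pairs a generated question with the key of its source chunk
(illustrative values):

    EvalCase(query="What notice period does clause 4.2 require?",
             expected_key="<doc_id>:<chunk_index>")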
"""
import json
import random
import re
from dataclasses import dataclass

from arke.server.models import LLM
from arke.server.types import Chunk

INSTRUCTION = (
    "You will receive a text chunk from a legal document. "
    "Generate 1 to 3 questions that ONLY this specific chunk can answer. "
    "Questions must include specific details — names, numbers, dates, case citations, unique terms. "
    "Avoid generic questions. "
    "Return ONLY a JSON array of strings, nothing else. "
    'Example: ["question 1", "question 2"]'
)


@dataclass(frozen=True)
class EvalCase:
    query: str
    expected_key: str  # "doc_id:chunk_index"


def make_cases(llm: LLM, chunks: list[Chunk], limit: int) -> list[EvalCase]:
    samples = random.sample(chunks, min(limit, len(chunks)))
    cases: list[EvalCase] = []

    for i, chunk in enumerate(samples):
        questions = _generate_questions(llm, chunk.clean)
        key = f"{chunk.doc_id}:{chunk.chunk_index}"
        cases.extend(EvalCase(query=q, expected_key=key) for q in questions)
        print(f"  gen {i + 1}/{len(samples)}: {len(questions)} questions")

    print(f"gen done: {len(cases)} cases from {len(samples)} chunks")
    return cases


def _generate_questions(llm: LLM, content: str) -> list[str]:
    raw = llm.chat(None, INSTRUCTION + "\n\nChunk:\n" + content).strip()

    match = re.search(r"\[[\s\S]*]", raw)
    if not match:
        return []

    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    return [q for q in parsed if isinstance(q, str) and q.strip()]

# ═══════════════════════════════════════════
# arke/eval/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/eval/presets.py
# ═══════════════════════════════════════════

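"""Sweep presets — expand a base Config into a grid of candidate configs.

THOROUGH, for example, expands 2 chunk sizes × 2 overlaps × 5 alphas × 3 ks
into 60 configs via itertools.product; FAST keeps the base config as-is.
"""
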
from dataclasses import replace
from itertools import product

from arke.server.config import Config

FAST = "fast"
MEDIUM = "medium"
THOROUGH = "thorough"


def get_preset(level: str, base: Config) -> list[Config]:
    if level == FAST:
        return _expand(base, chunk_sizes=[base.chunk_size], overlaps=[base.overlap], alphas=[base.alpha], ks=[base.k])
    if level == MEDIUM:
        return _expand(base, chunk_sizes=[base.chunk_size], overlaps=[base.overlap], alphas=[0.0, 0.3, 0.5, 0.7, 1.0], ks=[5, 10, 20])
    if level == THOROUGH:
        return _expand(base, chunk_sizes=[500, 1000], overlaps=[0.0, 0.2], alphas=[0.0, 0.3, 0.5, 0.7, 1.0], ks=[5, 10, 20])

    raise ValueError(f"unknown sweep level: {level} (expected: {FAST} | {MEDIUM} | {THOROUGH})")


def _expand(base: Config, chunk_sizes: list[int], overlaps: list[float], alphas: list[float], ks: list[int]) -> list[Config]:
    out: list[Config] = []
    for chunk_size, overlap, alpha, k in product(chunk_sizes, overlaps, alphas, ks):
        cfg = replace(base, chunk_size=chunk_size, overlap=overlap, alpha=alpha, k=k)
        out.append(cfg.resolved())
    return out

# ═══════════════════════════════════════════
# arke/eval/sweep.py
# ═══════════════════════════════════════════

"""Sweep — run eval across preset configs and rank by MRR.

Each config row = full arke-server restart with that config.
Communicates through mailbox like any real client.

Usage:
    python -m arke.eval.sweep --space legalbench --level medium --limit 50
"""
import os
import signal
import subprocess
import sys
import time
from dataclasses import dataclass

from arke.server import mailbox
from arke.server.config import Config
from arke.server.models import Models
from arke.server.types import Chunk

from .gen import EvalCase, make_cases
from .presets import get_preset


@dataclass(frozen=True)
class EvalMetrics:
    precision: float
    recall: float
    mrr: float


@dataclass(frozen=True)
class SweepRow:
    cfg: Config
    metrics: EvalMetrics


def run(space: str, level: str, limit: int) -> list[SweepRow]:
    base_cfg = Config.from_env().resolved()
    configs = get_preset(level, base_cfg)

    # Generate eval cases using base config (one-time)
    print("generating eval cases...")
    cases = _generate_cases(space, base_cfg, limit)
    if not cases:
        print("error: no eval cases generated", file=sys.stderr)
        sys.exit(1)
    print(f"generated {len(cases)} cases\n")

    rows: list[SweepRow] = []
    for idx, cfg in enumerate(configs):
        print(f"[{idx + 1}/{len(configs)}] chunk={cfg.chunk_size} overlap={cfg.overlap} alpha={cfg.alpha} k={cfg.k}")
        metrics = _run_row(space, cfg, cases)
        rows.append(SweepRow(cfg=cfg, metrics=metrics))
        print(f"  → precision={metrics.precision:.3f} recall={metrics.recall:.3f} MRR={metrics.mrr:.3f}\n")

    rows.sort(key=lambda r: r.metrics.mrr, reverse=True)
    _print_table(rows)
    return rows


def _generate_cases(space: str, cfg: Config, limit: int) -> list[EvalCase]:
    """Start server, sample chunks, generate questions, stop server."""
    proc = _start_server(space, cfg)
    try:
        _wait_ready()
        # ask server to sample chunks for case generation
        msg_id = mailbox.send({"cmd": "sample", "limit": limit})
        response = mailbox.receive(msg_id)
        if not response or not response.get("ok"):
            raise RuntimeError(f"sample failed: {response}")

        # load models for case generation (cloud preferred for quality)
        models = Models.load(cfg)
        chunks = [
            Chunk(doc_id=c["doc_id"], chunk_index=c["chunk_index"], clean=c["clean"], head=c["head"], tail=c["tail"])
            for c in response["chunks"]
        ]
        return make_cases(models.llm, chunks, limit)
    finally:
        _stop_server(proc)


def _run_row(space: str, cfg: Config, cases: list[EvalCase]) -> EvalMetrics:
    proc = _start_server(space, cfg)
    try:
        _wait_ready()
        results = []
        for case in cases:
            msg_id = mailbox.send({"cmd": "ask", "query": case.query})
            response = mailbox.receive(msg_id)
            hits = response.get("citations", []) if response and response.get("ok") else []
            results.append((hits, case.expected_key))
        return _score(results)
    finally:
        _stop_server(proc)


def _start_server(space: str, cfg: Config) -> subprocess.Popen:
    env = {
        **os.environ,
        "ARKE_SPACE": space,
        "CHUNK_SIZE": str(cfg.chunk_size),
        "OVERLAP": str(cfg.overlap),
        "ALPHA": str(cfg.alpha),
        "K": str(cfg.k),
    }
    proc = subprocess.Popen(["arke-server"], env=env)
    return proc


def _wait_ready(timeout: float = 60.0) -> None:
    """Ping the server until it responds or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        msg_id = mailbox.send({"cmd": "ping"})
        # Poll the outbox file directly with a short per-ping deadline;
        # mailbox.receive would block for its full timeout while the
        # server is still booting.
        pong = mailbox.OUTBOX / f"{msg_id}.json"
        ping_deadline = time.monotonic() + 2.0
        while time.monotonic() < ping_deadline:
            if pong.exists():
                pong.unlink(missing_ok=True)
                return
            time.sleep(0.1)
    raise RuntimeError("arke-server did not start in time")


def _stop_server(proc: subprocess.Popen) -> None:
    proc.send_signal(signal.SIGTERM)
    try:
        proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()


def _score(results: list[tuple[list, str]]) -> EvalMetrics:
    n = len(results)
    if n == 0:
        return EvalMetrics(precision=0.0, recall=0.0, mrr=0.0)

    sum_p = sum_r = sum_rr = 0.0
    for hits, expected_key in results:
        hit_keys = [f"{h['source']}:{h.get('chunk_index', '')}" for h in hits]
        matched = sum(1 for k in hit_keys if k == expected_key)

        sum_p += matched / len(hits) if hits else 0.0
        sum_r += float(matched > 0)

        for i, k in enumerate(hit_keys):
            if k == expected_key:
                sum_rr += 1.0 / (i + 1)
                break

    return EvalMetrics(precision=sum_p / n, recall=sum_r / n, mrr=sum_rr / n)


def _print_table(rows: list[SweepRow]) -> None:
    header = f"{'chunk':>6} {'overlap':>7} {'alpha':>6} {'k':>4} {'prec':>7} {'recall':>7} {'MRR':>7}"
    print(f"\n{'Sweep Results (sorted by MRR)':^50}")
    print(header)
    print("-" * len(header))
    best_mrr = rows[0].metrics.mrr if rows else 0.0
    for r in rows:
        mrr_str = f"{r.metrics.mrr:.3f}"
        if r.metrics.mrr == best_mrr:
            mrr_str += " <-- best"
        print(
            f"{r.cfg.chunk_size:>6} {r.cfg.overlap:>7.1f} {r.cfg.alpha:>6.1f} {r.cfg.k:>4}"
            f" {r.metrics.precision:>7.3f} {r.metrics.recall:>7.3f} {mrr_str}"
        )
    print()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--space", required=True)
    parser.add_argument("--level", default="medium")
    parser.add_argument("--limit", type=int, default=50)
    args = parser.parse_args()
    run(args.space, args.level, args.limit)

# ═══════════════════════════════════════════
# arke/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/server/backend_cloud.py
# ═══════════════════════════════════════════

"""Cloud backend — OpenAI API via HTTP."""
import json
import urllib.request
from dataclasses import dataclass

EMBED_BATCH_SIZE = 64


@dataclass
class CloudEmbedder:
    base_url: str
    api_key: str
    model: str

    def embed(self, texts: list[str]) -> list[list[float]]:
        result: list[list[float]] = []
        for offset in range(0, len(texts), EMBED_BATCH_SIZE):
            batch = texts[offset : offset + EMBED_BATCH_SIZE]
            res = _post(self.base_url, self.api_key, "/v1/embeddings", {"model": self.model, "input": batch})
            data = sorted(res["data"], key=lambda d: d["index"])
            result.extend(d["embedding"] for d in data)
        return result


@dataclass
class CloudLLM:
    base_url: str
    api_key: str
    model: str

    def chat(self, system: str | None, user: str) -> str:
        messages: list[dict] = []
        if system is not None:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        res = _post(self.base_url, self.api_key, "/v1/chat/completions", {"model": self.model, "messages": messages})
        return res["choices"][0]["message"]["content"]


def load(base_url: str, api_key: str, embed_model: str, inference_model: str) -> tuple[CloudEmbedder, CloudLLM]:
    return (
        CloudEmbedder(base_url, api_key, embed_model),
        CloudLLM(base_url, api_key, inference_model),
    )


def _post(base_url: str, api_key: str, path: str, body: dict) -> dict:
    req = urllib.request.Request(
        base_url + path,
        data=json.dumps(body).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        return json.loads(resp.read())

# ═══════════════════════════════════════════
# arke/server/backend_local.py
# ═══════════════════════════════════════════

"""Local backend — llama-cpp-python, models loaded in-process from .gguf files."""
from dataclasses import dataclass

EMBED_BATCH_SIZE = 64


@dataclass
class LocalEmbedder:
    _model: object

    def embed(self, texts: list[str]) -> list[list[float]]:
        result: list[list[float]] = []
        for offset in range(0, len(texts), EMBED_BATCH_SIZE):
            batch = texts[offset : offset + EMBED_BATCH_SIZE]
            result.extend(self._model.embed(batch))
        return result


@dataclass
class LocalLLM:
    _model: object

    def chat(self, system: str | None, user: str) -> str:
        messages: list[dict] = []
        if system is not None:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        response = self._model.create_chat_completion(messages=messages)
        return response["choices"][0]["message"]["content"]


def load(embed_model_path: str, inference_model_path: str) -> tuple[LocalEmbedder, LocalLLM]:
    from llama_cpp import Llama
    embedder = Llama(model_path=embed_model_path, embedding=True, n_ctx=512, verbose=False)
    llm = Llama(model_path=inference_model_path, n_ctx=4096, verbose=False)
    return LocalEmbedder(embedder), LocalLLM(llm)

# ═══════════════════════════════════════════
# arke/server/bm25.py
# ═══════════════════════════════════════════

"""In-memory BM25 index. Built once at ingest, queried on every ask.

Standard Okapi BM25 with k1=1.5, b=0.75.
Keys are arbitrary strings — we use "<doc_id>:<chunk_index>".
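
Usage (sketch):

    idx = BM25Index()
    idx.add("doc:0", "the quick brown fox")
    idx.build()               # finalize avgdl
    idx.scores("quick fox")   # → {"doc:0": <positive score>}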
"""
import math
import re
from dataclasses import dataclass, field

K1 = 1.5
B = 0.75


def _tokenize(text: str) -> list[str]:
    return re.findall(r"[a-z0-9]+", text.lower())


@dataclass
class BM25Index:
    _docs: dict[str, list[str]] = field(default_factory=dict)       # key → tokens
    _df: dict[str, int] = field(default_factory=dict)               # term → doc count
    _avgdl: float = 0.0

    def add(self, key: str, text: str) -> None:
        tokens = _tokenize(text)
        self._docs[key] = tokens
        for term in set(tokens):
            self._df[term] = self._df.get(term, 0) + 1

    def build(self) -> None:
        """Call after all add() calls to finalize avgdl."""
        if self._docs:
            self._avgdl = sum(len(t) for t in self._docs.values()) / len(self._docs)

    def scores(self, query: str) -> dict[str, float]:
        """Return BM25 score for every indexed key. Zero scores omitted."""
        terms = _tokenize(query)
        if not terms or not self._docs:
            return {}

        n = len(self._docs)
        result: dict[str, float] = {}

        for term in terms:
            df = self._df.get(term, 0)
            if df == 0:
                continue
            idf = math.log((n - df + 0.5) / (df + 0.5) + 1)

            for key, tokens in self._docs.items():
                tf = tokens.count(term)
                if tf == 0:
                    continue
                dl = len(tokens)
                norm = tf * (K1 + 1) / (tf + K1 * (1 - B + B * dl / self._avgdl))
                result[key] = result.get(key, 0.0) + idf * norm

        return result

    def clear(self) -> None:
        self._docs.clear()
        self._df.clear()
        self._avgdl = 0.0

# ═══════════════════════════════════════════
# arke/server/chunker.py
# ═══════════════════════════════════════════

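"""Recursive character text chunker.

Usage (sketch; ``embed`` and ``store`` stand in for the caller's pipeline):

    for piece in chunk(text, chunk_size=600, overlap=0.2):
        vector = embed(piece.overlapped())   # embedding sees neighbor overlap
        store(piece.clean, vector)           # only the clean text is persisted
"""
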
from dataclasses import dataclass

# Strongest separators first. Empty string is the final fallback (per-character).
SEPARATORS = ["\n\n", "\n", ". ", ", ", " ", ""]


@dataclass(frozen=True)
class ChunkData:
    """A chunk with its overlap context. The embedding is computed from
    `overlapped()` so the vector 'knows' about its neighbors, but only
    `clean` is stored in the database — no text duplication between rows."""
    clean: str
    head: str
    tail: str

    def overlapped(self) -> str:
        return self.head + self.clean + self.tail


def chunk(text: str, chunk_size: int, overlap: float) -> list[ChunkData]:
    """Recursive character text splitter: split by separator hierarchy,
    then greedily merge adjacent pieces up to chunk_size."""
    if not text:
        return []

    clean = _merge(_separate(text.strip(), chunk_size), chunk_size)
    overlap_chars = int(chunk_size * overlap)
    result: list[ChunkData] = []

    for i, piece in enumerate(clean):
        head = clean[i - 1][-overlap_chars:] if (overlap_chars > 0 and i > 0) else ""
        tail = clean[i + 1][:overlap_chars] if (overlap_chars > 0 and i < len(clean) - 1) else ""
        result.append(ChunkData(clean=piece, head=head, tail=tail))

    return result


def _separate(text: str, chunk_size: int, depth: int = 0) -> list[str]:
    if depth >= len(SEPARATORS):
        return [text]

    sep = SEPARATORS[depth]
    # str.split("") raises ValueError, so the per-character fallback is explicit.
    parts = text.split(sep) if sep else list(text)
    result: list[str] = []

    for i, part in enumerate(parts):
        if len(part) < chunk_size:
            # Reattach the separator as a suffix so _merge can reconstruct
            # the original text without losing whitespace or newlines.
            suffix = sep if i < len(parts) - 1 else ""
            result.append(part + suffix)
        else:
            result.extend(_separate(part, chunk_size, depth + 1))

    return result


def _merge(splits: list[str], chunk_size: int) -> list[str]:
    raw: list[str] = []
    for s in splits:
        if not s:
            continue

        if raw and len(raw[-1]) + len(s) < chunk_size:
            raw[-1] += s
        else:
            raw.append(s)

    return raw

# ═══════════════════════════════════════════
# arke/server/config.py
# ═══════════════════════════════════════════

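"""Config — frozen settings resolved from the environment.

Typical use, as in arke.server.main:

    cfg = Config.from_env().resolved()   # raises ValueError on invalid values
"""
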
from __future__ import annotations

import os
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Config:
    # Dataspace
    space: str = "default"

    # Backend: "local" (llama-cpp-python) or "cloud" (OpenAI)
    backend: str = "local"

    # Local backend — paths to .gguf files
    embed_model_path: str = ""
    inference_model_path: str = ""

    # Cloud backend
    cloud_api_key: str = ""
    cloud_base_url: str = "https://api.openai.com"
    cloud_embed_model: str = "text-embedding-3-small"
    cloud_inference_model: str = "gpt-4o"

    # RAG parameters
    embedding_dim: int = 0
    chunk_size: int = 600
    overlap: float = 0.0
    alpha: float = 0.7
    k: int = 5

    def resolved(self) -> Config:
        if self.backend == "local":
            if not self.embed_model_path:
                raise ValueError("config: EMBED_MODEL_PATH is required for local backend")
            if not self.inference_model_path:
                raise ValueError("config: INFERENCE_MODEL_PATH is required for local backend")
        elif self.backend == "cloud":
            if not self.cloud_api_key:
                raise ValueError("config: CLOUD_API_KEY is required for cloud backend")
        else:
            raise ValueError(f"config: BACKEND must be 'local' or 'cloud', got '{self.backend}'")

        cfg = replace(self, embedding_dim=self.embedding_dim or DEFAULTS.embedding_dim)

        if cfg.chunk_size < 100 or cfg.chunk_size > 10000:
            raise ValueError(f"config: chunk_size must be 100..10000, got {cfg.chunk_size}")
        if cfg.overlap < 0 or cfg.overlap > 0.5:
            raise ValueError(f"config: overlap must be 0..0.5, got {cfg.overlap}")
        if cfg.alpha < 0 or cfg.alpha > 1:
            raise ValueError(f"config: alpha must be 0..1, got {cfg.alpha}")
        if cfg.k < 1 or cfg.k > 20:
            raise ValueError(f"config: k must be 1..20, got {cfg.k}")

        return cfg

    @staticmethod
    def from_env() -> Config:
        return Config(
            space=os.environ.get("ARKE_SPACE", "default"),
            backend=os.environ.get("BACKEND", "local"),
            embed_model_path=os.environ.get("EMBED_MODEL_PATH", ""),
            inference_model_path=os.environ.get("INFERENCE_MODEL_PATH", ""),
            cloud_api_key=os.environ.get("CLOUD_API_KEY", ""),
            cloud_base_url=os.environ.get("CLOUD_BASE_URL", "https://api.openai.com"),
            cloud_embed_model=os.environ.get("CLOUD_EMBED_MODEL", "text-embedding-3-small"),
            cloud_inference_model=os.environ.get("CLOUD_INFERENCE_MODEL", "gpt-4o"),
            embedding_dim=int(os.environ.get("EMBEDDING_DIM", "0")),
            chunk_size=int(os.environ.get("CHUNK_SIZE", "600")),
            overlap=float(os.environ.get("OVERLAP", "0.0")),
            alpha=float(os.environ.get("ALPHA", "0.7")),
            k=int(os.environ.get("K", "5")),
        )


DEFAULTS = Config(
    embedding_dim=1024,
)

# ═══════════════════════════════════════════
# arke/server/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/server/loader.py
# ═══════════════════════════════════════════

"""Loader — turns raw files from digest/ into (Doc, text) pairs.

Dispatcher uses file extension. Supported: .txt .md .pdf .docx .msg
Unsupported files are skipped silently — unknown formats are normal in digest/.
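
Example (sketch):

    doc, text = load_file(Path("digest/contracts/msa.txt"), root=Path("digest"))
    doc.source   # → "contracts/msa.txt"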
"""
import hashlib
import logging
from pathlib import Path

from .types import Doc

logger = logging.getLogger(__name__)


def load_digest(digest_path: Path) -> list[tuple[Doc, str]]:
    """Parse all supported files under digest_path. Returns (Doc, text) pairs."""
    results: list[tuple[Doc, str]] = []
    for path in sorted(digest_path.rglob("*")):
        if not path.is_file() or path.name.startswith("."):
            continue
        result = load_file(path, root=digest_path)
        if result is not None:
            results.append(result)
    return results


def load_file(path: Path, root: Path | None = None) -> tuple[Doc, str] | None:
    """Parse a single file. Returns (Doc, text) or None if unsupported/empty."""
    suffix = path.suffix.lower()

    match suffix:
        case ".txt" | ".md":
            text = _load_txt(path)
        case ".pdf":
            text = _load_pdf(path)
        case ".docx":
            text = _load_docx(path)
        case ".msg":
            text = _load_msg(path)
        case _:
            logger.debug("skipping unsupported file: %s", path.name)
            return None

    if not text or not text.strip():
        return None

    source = str(path.relative_to(root)) if root else path.name
    stat = path.stat()
    doc = Doc(
        id=_content_id(path),
        source=source,
        created=int(stat.st_ctime),
        modified=int(stat.st_mtime),
        metadata={"filename": path.name, "suffix": suffix},
    )
    return doc, text.strip()


def _content_id(path: Path) -> str:
    return hashlib.md5(path.read_bytes()).hexdigest()


def _load_txt(path: Path) -> str:
    return path.read_text(encoding="utf-8", errors="replace")


def _load_pdf(path: Path) -> str:
    import pdfplumber
    pages: list[str] = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if text:
                pages.append(text)
    return "\n\n".join(pages)


def _load_docx(path: Path) -> str:
    from docx import Document
    doc = Document(path)
    return "\n\n".join(p.text for p in doc.paragraphs if p.text.strip())


def _load_msg(path: Path) -> str:
    import extract_msg
    with extract_msg.Message(str(path)) as msg:
        parts: list[str] = []
        if msg.subject:
            parts.append(f"Subject: {msg.subject}")
        if msg.body:
            parts.append(msg.body)
        return "\n\n".join(parts)

# ═══════════════════════════════════════════
# arke/server/mailbox.py
# ═══════════════════════════════════════════

"""Inbox/outbox file queue. Arke's only coordination primitive.

Layout:
    ~/.arke/inbox/<uuid>.json   — incoming request
    ~/.arke/outbox/<uuid>.json  — response written by Arke

Writers drop a file in inbox and poll outbox for the response.
Arke scans inbox on every tick, processes, writes outbox.
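
A client round-trip (sketch):

    from arke.server import mailbox

    mailbox.setup()
    msg_id = mailbox.send({"cmd": "ping"})
    mailbox.receive(msg_id)   # → {"ok": True, "pong": True}, or None on timeout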
"""
import json
import os
import time
import uuid
from pathlib import Path

ARKE_HOME = Path("~/.arke").expanduser()
INBOX = ARKE_HOME / "inbox"
OUTBOX = ARKE_HOME / "outbox"

POLL_INTERVAL = 0.1   # seconds between outbox polls
POLL_TIMEOUT  = 120.0 # max seconds to wait for a response


def setup() -> None:
    INBOX.mkdir(parents=True, exist_ok=True)
    OUTBOX.mkdir(parents=True, exist_ok=True)


def send(request: dict) -> str:
    """Write a request to inbox. Returns the message id."""
    msg_id = str(uuid.uuid4())
    _atomic_write(INBOX / f"{msg_id}.json", request)
    return msg_id


def receive(msg_id: str) -> dict | None:
    """Poll outbox for a response. Blocks until response arrives or timeout."""
    path = OUTBOX / f"{msg_id}.json"
    deadline = time.monotonic() + POLL_TIMEOUT

    while time.monotonic() < deadline:
        if path.exists():
            data = json.loads(path.read_text())
            path.unlink(missing_ok=True)
            return data
        time.sleep(POLL_INTERVAL)

    return None


def drain() -> list[tuple[str, dict]]:
    """Return all pending inbox messages as (msg_id, request) pairs. Removes files."""
    messages: list[tuple[str, dict]] = []
    for f in sorted(INBOX.glob("*.json")):
        try:
            data = json.loads(f.read_text())
            messages.append((f.stem, data))
            f.unlink(missing_ok=True)
        except Exception:
            f.unlink(missing_ok=True)
    return messages


def reply(msg_id: str, response: dict) -> None:
    """Write a response to outbox."""
    _atomic_write(OUTBOX / f"{msg_id}.json", response)


def _atomic_write(path: Path, data: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(data))
    tmp.replace(path)

# ═══════════════════════════════════════════
# arke/server/main.py
# ═══════════════════════════════════════════

"""Arke — the living organism.

Startup sequence:
  1. mount space (sdb)
  2. load config + models
  3. consume digest/ if present
  4. enter main loop

Main loop (1-second pulse):
  - drain inbox  → process requests → write outbox
  - check digest → re-ingest if hash changed
"""
import hashlib
import logging
import time
from pathlib import Path

import numpy as np

from . import chunker, loader, mailbox, sdb
from .bm25 import BM25Index
from .config import Config
from .models import Models
from .space import mount as mount_space
from .types import Chunk, Doc, SearchHit

logger = logging.getLogger(__name__)

TICK = 1.0  # seconds

SYSTEM_PROMPT = (
    "You are a legal research assistant. "
    "Answer based only on the provided documents. "
    "Be concise and cite the source for every claim."
)


def run() -> None:
    mailbox.setup()
    cfg = Config.from_env().resolved()
    space = mount_space(cfg.space)
    models = Models.load(cfg)

    digest_path = space.path / "digest"
    docs: dict[str, Doc] = {}
    bm25 = BM25Index()
    last_digest_hash = ""

    if digest_path.exists():
        logger.info("loading digest on startup...")
        last_digest_hash = _ingest(digest_path, cfg, models, docs, bm25)

    logger.info("arke ready — %d docs, %d chunks", len(docs), _chunk_count(docs))

    while True:
        _drain(docs, bm25, cfg, models)
        last_digest_hash = _watch_digest(digest_path, last_digest_hash, cfg, models, docs, bm25)
        time.sleep(TICK)


# --- ingest ------------------------------------------------------------------

def _ingest(digest_path: Path, cfg: Config, models: Models, docs: dict[str, Doc], bm25: BM25Index) -> str:
    docs.clear()
    bm25.clear()
    model_key = cfg.embed_model_path or cfg.cloud_embed_model

    for path in sorted(digest_path.rglob("*")):
        if not path.is_file() or path.name.startswith("."):
            continue

        result = loader.load_file(path, root=digest_path)
        if result is None:
            continue

        doc, text = result

        sdb.put_bin("sources", doc.id, path.read_bytes())

        chunk_datas = chunker.chunk(text, cfg.chunk_size, cfg.overlap)
        for i, cd in enumerate(chunk_datas):
            chunk = Chunk(doc_id=doc.id, chunk_index=i, clean=cd.clean, head=cd.head, tail=cd.tail)

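            # Reuse a cached embedding when one exists for this embed model and
            # version tag ("1"); otherwise embed and persist. (Cache semantics
            # assumed — load/save_embedding are defined on types.Chunk.)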
            if not chunk.load_embedding(model_key, "1"):
                vec = models.embedder.embed([chunk.overlapped()])[0]
                chunk.embedding = np.array(vec, dtype=np.float32)
                chunk.save_embedding(model_key, "1")

            bm25.add(f"{doc.id}:{i}", chunk.overlapped())
            doc.chunks.append(chunk)

        doc.save()
        docs[doc.id] = doc

    bm25.build()
    logger.info("ingest done — %d docs, %d chunks", len(docs), _chunk_count(docs))
    return _dir_hash(digest_path)


# --- main loop ---------------------------------------------------------------

def _drain(docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> None:
    for msg_id, request in mailbox.drain():
        try:
            response = _dispatch(request, docs, bm25, cfg, models)
        except Exception as e:
            logger.warning("handler error: %s", e)
            response = {"ok": False, "error": str(e)}
        mailbox.reply(msg_id, response)


def _dispatch(request: dict, docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> dict:
    cmd = request.get("cmd")

    if cmd == "ask":
        return _ask(request, docs, bm25, cfg, models)

    if cmd == "ping":
        return {"ok": True, "pong": True}

    if cmd == "sample":
        return _sample(request, docs)

    return {"ok": False, "error": f"unknown cmd: {cmd}"}


def _watch_digest(
    digest_path: Path,
    last_hash: str,
    cfg: Config,
    models: Models,
    docs: dict[str, Doc],
    bm25: BM25Index,
) -> str:
    if not digest_path.exists():
        return last_hash

    current_hash = _dir_hash(digest_path)
    if current_hash == last_hash:
        return last_hash

    logger.info("new digest detected, re-ingesting...")
    return _ingest(digest_path, cfg, models, docs, bm25)


# --- ask ---------------------------------------------------------------------

def _ask(request: dict, docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> dict:
    query = request.get("query", "")
    if not query:
        return {"ok": False, "error": "query is required"}

    q_vec = np.array(models.embedder.embed([query])[0], dtype=np.float32)
    hits = _hybrid_search(docs, bm25, q_vec, query, cfg.k, cfg.alpha)

    if not hits:
        return {"ok": True, "answer": "No relevant documents found.", "citations": []}

    context = "\n\n".join(
        f"[{i+1}] (source: {h.chunk.doc_id[:8]}) {h.chunk.clean}"
        for i, h in enumerate(hits)
    )
    answer = models.llm.chat(SYSTEM_PROMPT, f"Documents:\n{context}\n\nQuestion: {query}")

    citations = [
        {
            "source": h.chunk.doc_id,
            "chunk_index": h.chunk.chunk_index,  # used by eval sweep scoring
            "text": h.chunk.clean[:200],
            "score": round(h.similarity, 3),
        }
        for h in hits
    ]
    return {"ok": True, "answer": answer, "citations": citations}


def _sample(request: dict, docs: dict[str, Doc]) -> dict:
    import random
    limit = request.get("limit", 50)
    all_chunks = [chunk for doc in docs.values() for chunk in doc.chunks]
    sample = random.sample(all_chunks, min(limit, len(all_chunks)))
    return {"ok": True, "chunks": [
        {"doc_id": c.doc_id, "chunk_index": c.chunk_index, "clean": c.clean, "head": c.head, "tail": c.tail}
        for c in sample
    ]}


def _hybrid_search(
    docs: dict[str, Doc],
    bm25: BM25Index,
    q_vec: np.ndarray,
    query: str,
    k: int,
    alpha: float,
) -> list[SearchHit]:
    q_norm = np.linalg.norm(q_vec)
    if q_norm == 0:
        return []

    # cosine scores
    cosine: dict[str, float] = {}
    for doc in docs.values():
        for chunk in doc.chunks:
            if chunk.embedding is None:
                continue
            c_norm = np.linalg.norm(chunk.embedding)
            if c_norm == 0:
                continue
            key = f"{chunk.doc_id}:{chunk.chunk_index}"
            cosine[key] = float(np.dot(q_vec, chunk.embedding) / (q_norm * c_norm))

    # bm25 scores — normalize to [0, 1]
    bm25_raw = bm25.scores(query)
    bm25_max = max(bm25_raw.values(), default=1.0)
    bm25_norm = {key: v / bm25_max for key, v in bm25_raw.items()} if bm25_max > 0 else {}

    # hybrid score
    all_keys = set(cosine) | set(bm25_norm)
    scored: list[tuple[str, float]] = []
    for key in all_keys:
        score = alpha * cosine.get(key, 0.0) + (1 - alpha) * bm25_norm.get(key, 0.0)
        scored.append((key, score))

    scored.sort(key=lambda x: x[1], reverse=True)

    # resolve keys back to chunks
    chunk_map: dict[str, Chunk] = {
        f"{chunk.doc_id}:{chunk.chunk_index}": chunk
        for doc in docs.values()
        for chunk in doc.chunks
    }

    hits: list[SearchHit] = []
    for key, score in scored[:k]:
        chunk = chunk_map.get(key)
        if chunk:
            hits.append(SearchHit(chunk=chunk, similarity=score))
    return hits


# --- helpers -----------------------------------------------------------------

def _chunk_count(docs: dict[str, Doc]) -> int:
    return sum(len(d.chunks) for d in docs.values())


def _dir_hash(path: Path) -> str:
    h = hashlib.md5()
    for f in sorted(path.rglob("*")):
        if f.is_file():
            st = f.stat()
            h.update(str(f.relative_to(path)).encode())
            h.update(str(st.st_size).encode())
            h.update(str(st.st_mtime_ns).encode())
    return h.hexdigest()

# ═══════════════════════════════════════════
# arke/server/models.py
# ═══════════════════════════════════════════

"""Models — Embedder and LLM protocols + factory."""
from dataclasses import dataclass
from typing import Protocol

from .config import Config


class Embedder(Protocol):
    def embed(self, texts: list[str]) -> list[list[float]]: ...


class LLM(Protocol):
    def chat(self, system: str | None, user: str) -> str: ...


@dataclass
class Models:
    embedder: Embedder
    llm: LLM

    @staticmethod
    def load(cfg: Config) -> "Models":
        if cfg.backend == "cloud":
            from .backend_cloud import load
            embedder, llm = load(cfg.cloud_base_url, cfg.cloud_api_key, cfg.cloud_embed_model, cfg.cloud_inference_model)
        else:
            from .backend_local import load
            embedder, llm = load(cfg.embed_model_path, cfg.inference_model_path)

        return Models(embedder=embedder, llm=llm)

# ═══════════════════════════════════════════
# arke/server/sdb.py
# ═══════════════════════════════════════════

"""Unified file-based store for all of Arke's cached state.

Everything that passes through Arke is cache — documents, embeddings, source files,
sessions. Microsoft 365 (or the user's filesystem) is the source of truth; sdb is
the gut that digests and holds the processed form. Blow it away at any time — it
rebuilds from the source.

Layout:
    <root>/<table>/<id[:2]>/<id>.<ext>

Sharding by first two characters of the id keeps any single directory small
regardless of corpus size.
"""
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
import json
import os
import shutil

import numpy as np


_root: Path | None = None


def mount(path: str | Path) -> None:
    global _root
    _root = Path(path).expanduser()
    _root.mkdir(parents=True, exist_ok=True)
    for tmp in _root.rglob("*.tmp"):
        tmp.unlink(missing_ok=True)


def _table(table: str) -> Path:
    assert _root is not None, "sdb.mount() must be called before use"
    return _root / table


def _shard(table: str, id: str) -> Path:
    return _table(table) / id[:2]


def _path(table: str, id: str, ext: str) -> Path:
    suffix = f".{ext}" if ext else ""
    return _shard(table, id) / f"{id}{suffix}"


@contextmanager
def _atomic_open(path: Path):
    """Write to a temp file, fsync, then rename into place.

    Readers never observe a partial record. If the process dies mid-write,
    the orphaned *.tmp file is swept on the next mount() and any previous
    version of the record survives intact.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "wb") as f:
        yield f
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)

# JSON — documents, sessions, any structured record -----------------------

def put_json(table: str, id: str, data: dict) -> None:
    with _atomic_open(_path(table, id, "json")) as f:
        f.write(json.dumps(data, indent=2).encode())


def get_json(table: str, id: str) -> dict | None:
    p = _path(table, id, "json")
    if not p.exists():
        return None
    return json.loads(p.read_text())


def scan_json(table: str) -> Iterator[tuple[str, dict]]:
    table_dir = _table(table)
    if not table_dir.exists():
        return
    for f in table_dir.rglob("*.json"):
        yield f.stem, json.loads(f.read_text())


# Vectors — embeddings --------------------------------------------------------

def put_vec(table: str, id: str, vec: np.ndarray) -> None:
    with _atomic_open(_path(table, id, "npy")) as f:
        np.save(f, vec)


def get_vec(table: str, id: str) -> np.ndarray | None:
    p = _path(table, id, "npy")
    if not p.exists():
        return None
    return np.load(p)


# Raw bytes — source files (PDF, DOCX, MSG), any opaque blob ------------------

def put_bin(table: str, id: str, data: bytes) -> None:
    with _atomic_open(_path(table, id, "")) as f:
        f.write(data)


def get_bin(table: str, id: str) -> bytes | None:
    p = _path(table, id, "")
    if not p.exists():
        return None
    return p.read_bytes()


# Delete ----------------------------------------------------------------------

def delete(table: str, id: str) -> None:
    """Remove a single record (any extension) from the table."""
    shard = _shard(table, id)
    if not shard.exists():
        return
    for f in shard.iterdir():
        if f.name == id or f.name.startswith(f"{id}."):
            f.unlink(missing_ok=True)


def wipe(table: str) -> None:
    """Remove the entire table — every record under every shard."""
    table_dir = _table(table)
    if table_dir.exists():
        shutil.rmtree(table_dir)
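

# End-to-end usage sketch (hypothetical root, table, and ids):
def _example_sdb() -> None:
    mount("/tmp/arke-sdb-demo")
    put_json("documents", "ab12", {"id": "ab12", "source": "contracts/msa.txt"})
    assert get_json("documents", "ab12")["source"] == "contracts/msa.txt"
    put_vec("embeddings", "ab12", np.zeros(8, dtype=np.float32))
    assert get_vec("embeddings", "ab12").shape == (8,)
    delete("documents", "ab12")  # removes ab12.json from the "ab" shard
    wipe("embeddings")           # drops the whole table directory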

# ═══════════════════════════════════════════
# arke/server/space.py
# ═══════════════════════════════════════════

"""Space — an isolated container for all Arke state.

Each space has its own sdb root and its own config. Switching spaces
is just calling mount() with a different name — the old in-memory
objects become invalid, the new space's disk state takes over.

Layout:
    <home>/dataspaces/<name>/
        config.json    — chunk params, model, source_dirs
        data/          — sdb root (documents, chunks, embeddings, sessions)
"""
from __future__ import annotations

import shutil
from dataclasses import dataclass
from pathlib import Path

from . import sdb

ARKE_HOME = Path("~/.arke")


@dataclass(frozen=True)
class Space:
    name: str
    path: Path

    @property
    def data(self) -> Path:
        return self.path / "data"

    @property
    def config_path(self) -> Path:
        return self.path / "config.json"

    def wipe(self) -> None:
        """Erase all indexed data. Config is preserved."""
        if self.data.exists():
            shutil.rmtree(self.data)
        sdb.mount(self.data)


def mount(name: str, home: str | Path = ARKE_HOME) -> Space:
    """Mount a space by name. Sets the sdb root. Config is loaded separately."""
    path = Path(home).expanduser() / "dataspaces" / name
    path.mkdir(parents=True, exist_ok=True)
    space = Space(name=name, path=path)
    sdb.mount(space.data)
    return space
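

# Usage sketch (hypothetical space name):
def _example_space() -> None:
    space = mount("acme-v-birch")  # creates ~/.arke/dataspaces/acme-v-birch
    print(space.config_path)       # .../acme-v-birch/config.json
    space.wipe()                   # drops data/, keeps config.json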

# ═══════════════════════════════════════════
# arke/server/types.py
# ═══════════════════════════════════════════

import hashlib
from dataclasses import dataclass, field
from typing import Any, ClassVar, Iterator

import numpy as np

from . import sdb


@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    clean: str
    head: str
    tail: str

    # Runtime only — not serialized. Loaded from sdb.get_vec or computed on GPU.
    embedding: np.ndarray | None = field(default=None, compare=False, repr=False)

    def overlapped(self) -> str:
        return self.head + self.clean + self.tail

    def cache_key(self, model_id: str, model_version: str) -> str:
        """Content-addressed key: the same overlapped text under the same
        model always resolves to the same stored vector."""
        raw = f"{model_id}:{model_version}:{self.overlapped()}"
        return hashlib.md5(raw.encode()).hexdigest()

    def save_embedding(self, model_id: str, model_version: str) -> None:
        if self.embedding is None:
            return
        sdb.put_vec("embeddings", self.cache_key(model_id, model_version), self.embedding)

    def load_embedding(self, model_id: str, model_version: str) -> bool:
        vec = sdb.get_vec("embeddings", self.cache_key(model_id, model_version))
        if vec is None:
            return False
        self.embedding = vec
        return True


@dataclass
class Doc:
    TABLE: ClassVar[str] = "documents"

    id: str
    source: str
    created: int
    modified: int
    metadata: dict[str, Any] = field(default_factory=dict)
    tags: list[str] = field(default_factory=list)

    # Runtime only — re-grown on every unwrap, never serialized.
    chunks: list[Chunk] = field(default_factory=list, compare=False, repr=False)
    _dirty: bool = field(default=False, compare=False, repr=False)

    def wrap(self) -> dict:
        return {
            "id": self.id,
            "source": self.source,
            "created": self.created,
            "modified": self.modified,
            "metadata": self.metadata,
            "tags": self.tags,
        }

    @classmethod
    def unwrap(cls, d: dict) -> "Doc":
        return cls(
            id=d["id"],
            source=d["source"],
            created=d["created"],
            modified=d["modified"],
            metadata=d.get("metadata", {}),
            tags=d.get("tags", []),
        )

    def save(self) -> None:
        sdb.put_json(self.TABLE, self.id, self.wrap())
        self._dirty = False

    @classmethod
    def load(cls, id: str) -> "Doc | None":
        data = sdb.get_json(cls.TABLE, id)
        return cls.unwrap(data) if data else None

    @classmethod
    def scan(cls) -> Iterator["Doc"]:
        for _, data in sdb.scan_json(cls.TABLE):
            yield cls.unwrap(data)


@dataclass(frozen=True)
class SearchHit:
    chunk: Chunk
    similarity: float


@dataclass(frozen=True)
class SearchAnswer:
    answer: str
    hits: list[SearchHit]
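

# Round-trip sketch (hypothetical ids and model tag; assumes sdb.mount() ran):
def _example_roundtrip() -> None:
    doc = Doc(id="ab12", source="contracts/msa.txt", created=0, modified=0)
    doc.chunks.append(Chunk(doc_id="ab12", chunk_index=0,
                            clean="Either party may terminate...", head="", tail=""))
    doc.save()  # persists metadata only; chunks are runtime state
    chunk = doc.chunks[0]
    chunk.embedding = np.zeros(8, dtype=np.float32)
    chunk.save_embedding("demo-embed", "v1")  # keyed on model + overlapped text
    chunk.embedding = None
    assert chunk.load_embedding("demo-embed", "v1")  # disk cache hit, no re-embed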

# ═══════════════════════════════════════════
# pyproject.toml
# ═══════════════════════════════════════════

[project]
name = "arke-terminal"
version = "0.1.192"
description = "Local-first RAG for legal teams."
requires-python = ">=3.12"
license = "MIT"
authors = [
    { name = "Pavel Dolgoter", email = "mikepromogratus@proton.me" },
]
readme = "README.md"
keywords = ["rag", "search", "embeddings", "legal", "local"]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.12",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
    "python-dotenv>=1.0",
    "numpy>=2.0",
    "click>=8.1",
    "llama-cpp-python>=0.3",
    "pdfplumber>=0.11",
    "python-docx>=1.1",
    "extract-msg>=0.48",
    "msal>=1.36.0",
    "httpx>=0.28.1",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
]

[project.urls]
Repository = "https://github.com/padolgot/arke-terminal"

[project.scripts]
arke-server  = "arke.server.main:run"
arke-gateway = "arke.clients.socket:run"
arke         = "arke.clients.cli:main"
arke-eval    = "arke.eval.sweep:run"
arke-mail    = "arke.clients.email:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["arke"]

# ═══════════════════════════════════════════
# .env.example
# ═══════════════════════════════════════════


DATABASE_URL=
DATA_PATH=
M365_WEBHOOK_URL=
API_KEY=
