# Arke Terminal — Source Dump
# Generated: 2026-04-18T10:55:16Z

# ═══════════════════════════════════════════
# README.md
# ═══════════════════════════════════════════

# Arke Terminal

AI document search for legal teams. Privilege-safe, on-premise.

Cloud AI breaks attorney-client privilege (*United States v. Heppner*, *Hamid v SSHD*). Arke runs on your server. Your documents never leave your network.

---

## How it works

Arke is a single Python process with two pipes.

**Vertical pipe — digestion.** Drop documents into `digest/`. Arke detects the change, loads every file (PDF, DOCX, MSG, TXT), chunks and embeds it, keeps everything in memory and on local disk. On the next question, the answers are already there.

**Horizontal pipe — query.** A question arrives — from email, terminal, or CLI. Arke embeds the query, runs hybrid search (cosine + BM25), passes the top results to the LLM, and returns a cited answer. No round-trips, no external services: the LLM runs in the same process via `llama-cpp-python`.

The document is a seed on disk (plain JSON) and a tree in memory (chunks, embeddings, full text). The tree is always rebuilt from the seed. Nothing clever lives on disk.

---

## Install

```bash
pip install arke-terminal
cp .env.example .env   # fill in model paths or cloud API key
arke-server            # start the engine
```

No Docker. No Postgres. No Ollama daemon.

---

## Interfaces

**Email (Microsoft 365)**

Arke connects to a shared mailbox via Graph API webhooks. A lawyer sends a question to `ask@yourfirm.legal`, Arke replies with an answer and citations. Runs with `arke-mail`.

Requires: Azure AD app registration, `cloudflared` tunnel for the webhook endpoint.

```
M365_TENANT_ID, M365_CLIENT_ID, M365_CLIENT_SECRET
M365_MAILBOX, M365_WEBHOOK_URL
```

**Terminal**

```bash
arke ask "What are the termination clauses?"
```

**Eval sweep** — find the best retrieval config (chunk size, overlap, alpha, k) by running MRR benchmarks across presets:

```bash
python -m arke.eval.sweep --workspace legalbench --level medium --limit 50
```

---

## Backends

| Backend | When to use |
|---------|-------------|
| `local` | Production. GPU server, `.gguf` models. Zero external calls. |
| `cloud` | Eval and development. OpenAI-compatible API. |

```bash
# local
BACKEND=local
EMBED_MODEL_PATH=/models/nomic-embed.gguf
INFERENCE_MODEL_PATH=/models/mistral.gguf

# cloud
BACKEND=cloud
CLOUD_API_KEY=sk-...
CLOUD_EMBED_MODEL=text-embedding-3-small
CLOUD_INFERENCE_MODEL=gpt-4o
```

---

## Input formats

PDF, DOCX, MSG (Outlook), TXT, Markdown. Drop files into `digest/`, Arke picks them up automatically.

---

## Configuration

All via environment variables (`.env`):

| Variable | Default | Description |
|----------|---------|-------------|
| `ARKE_WORKSPACE` | `default` | Workspace name (isolates documents per client) |
| `CHUNK_SIZE` | `600` | Characters per chunk |
| `OVERLAP` | `0.0` | Overlap fraction (0–0.5) |
| `ALPHA` | `0.7` | Blend: 1.0 = pure semantic, 0.0 = pure keyword |
| `K` | `5` | Top-k results passed to LLM |


# ═══════════════════════════════════════════
# arke/clients/cli.py
# ═══════════════════════════════════════════

"""CLI client. Usage: arke ask "your question" """
import sys

from arke.server import mailbox


def ask(query: str) -> None:
    """Send one question through the mailbox and print the answer with citations."""
    reply = mailbox.receive(mailbox.send({"cmd": "ask", "query": query}))

    if reply is None:
        print("error: arke did not respond", file=sys.stderr)
        sys.exit(1)

    if not reply.get("ok"):
        print(f"error: {reply.get('error')}", file=sys.stderr)
        sys.exit(1)

    print(reply["answer"])

    for citation in reply.get("citations", []):
        print(f"  [{citation['source']}] {citation['text'][:80]}...")


def main() -> None:
    """Entry point for `arke ask <query>`; exits 1 on bad usage."""
    args = sys.argv
    if len(args) >= 3 and args[1] == "ask":
        ask(" ".join(args[2:]))
        return
    print("usage: arke ask <query>")
    sys.exit(1)

# ═══════════════════════════════════════════
# arke/clients/email.py
# ═══════════════════════════════════════════

"""Email client via Microsoft Graph webhooks.

Steps covered: auth with Azure AD client credentials, Graph subscription
against the shared mailbox, echo reply to any new email. Arke RAG is wired
separately through arke.server.mailbox.

Setup (two terminals):
    $ cloudflared tunnel --url http://localhost:8080
    # copy the printed https://<slug>.trycloudflare.com URL into
    # M365_WEBHOOK_URL in .env, then:
    $ arke-mail

The webhook server runs on a daemon thread so Graph can validate the
URL synchronously during subscription creation (Graph holds the POST
open until our endpoint echoes back the validationToken). Everything
else — subscription lifecycle, renewal, shutdown — lives in the main
thread as a plain synchronous loop.
"""

import json
import logging
import os
import secrets
import signal
import threading
import time
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

import httpx
import msal

logger = logging.getLogger(__name__)

GRAPH = "https://graph.microsoft.com/v1.0"
SCOPE = ["https://graph.microsoft.com/.default"]
SUBSCRIPTION_TTL_MIN = 60
RENEWAL_INTERVAL_SEC = 50 * 60


@dataclass(frozen=True)
class EmailConfig:
    """Connection settings for the Microsoft 365 email client."""

    tenant_id: str
    client_id: str
    client_secret: str
    mailbox: str
    webhook_url: str
    webhook_port: int = 8080

    @staticmethod
    def from_env() -> "EmailConfig":
        """Build the config from M365_* environment variables.

        Raises ValueError when a required variable is missing or empty.
        """
        values: dict[str, str] = {}
        for field_name, env_key in (
            ("tenant_id", "M365_TENANT_ID"),
            ("client_id", "M365_CLIENT_ID"),
            ("client_secret", "M365_CLIENT_SECRET"),
            ("mailbox", "M365_MAILBOX"),
            ("webhook_url", "M365_WEBHOOK_URL"),
        ):
            value = os.environ.get(env_key, "")
            if not value:
                raise ValueError(f"email config: {env_key} is required")
            values[field_name] = value

        return EmailConfig(
            webhook_port=int(os.environ.get("M365_WEBHOOK_PORT", "8080")),
            **values,
        )


class _SeenIds:
    """Bounded FIFO of recently-processed message IDs. Graph occasionally
    redelivers a notification (e.g. on our 5xx or a transient network error);
    without dedup we would reply twice to the same email. Safe without a lock:
    HTTPServer serves one request at a time."""

    def __init__(self, max_size: int = 512) -> None:
        self._order: deque[str] = deque(maxlen=max_size)
        self._set: set[str] = set()

    def check_and_add(self, item: str) -> bool:
        """Return True if already seen; otherwise record and return False."""
        if item in self._set:
            return True
        if len(self._order) == self._order.maxlen:
            self._set.discard(self._order[0])
        self._order.append(item)
        self._set.add(item)
        return False


def _auth(token: str) -> dict[str, str]:
    return {"Authorization": f"Bearer {token}"}


def _iso_utc(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")


def _send(call: Callable[[], httpx.Response], max_tries: int = 3) -> httpx.Response:
    """Invoke a Graph request with retries on 429 and 5xx, exponential backoff.

    401 is not retried — MSAL caches a valid token, so a rejection means a
    deeper auth problem that should surface, not be masked.

    Returns the last response (possibly still an error status after
    `max_tries` attempts); callers decide whether to raise.
    """
    delay = 1.0
    for attempt in range(max_tries):
        r = call()
        last_attempt = attempt == max_tries - 1
        if r.status_code == 429 and not last_attempt:
            # Honor a numeric Retry-After when present. RFC 9110 also allows
            # an HTTP-date here, which float() cannot parse — fall back to
            # our own backoff instead of crashing the webhook thread.
            try:
                wait = float(r.headers.get("Retry-After", delay))
            except (TypeError, ValueError):
                wait = delay
            time.sleep(wait)
            delay *= 2
            continue
        if 500 <= r.status_code < 600 and not last_attempt:
            time.sleep(delay)
            delay *= 2
            continue
        return r
    return r


def acquire_token(app: msal.ConfidentialClientApplication) -> str:
    """Get an app-only Graph access token via client credentials.

    Raises RuntimeError with the provider's error description on failure.
    """
    result = app.acquire_token_for_client(scopes=SCOPE)
    if "access_token" in result:
        return result["access_token"]
    raise RuntimeError(f"auth: {result.get('error_description', result)}")


def create_subscription(http: httpx.Client, token: str, cfg: EmailConfig, client_state: str) -> str:
    """Create a Graph change-notification subscription; return its id.

    Scoped to the Inbox folder so outbound replies (saved in Sent Items) do
    not trigger notifications. Without this the bot receives its own sent
    messages and can recursively reply to itself.
    """
    payload = {
        "changeType": "created",
        "notificationUrl": cfg.webhook_url,
        "resource": f"/users/{cfg.mailbox}/mailFolders('inbox')/messages",
        "expirationDateTime": _iso_utc(
            datetime.now(timezone.utc) + timedelta(minutes=SUBSCRIPTION_TTL_MIN)
        ),
        "clientState": client_state,
    }
    response = _send(lambda: http.post(f"{GRAPH}/subscriptions", json=payload, headers=_auth(token)))
    response.raise_for_status()
    return response.json()["id"]


def renew_subscription(http: httpx.Client, token: str, sub_id: str) -> None:
    """Push the subscription's expiry forward by SUBSCRIPTION_TTL_MIN minutes."""
    new_expiry = datetime.now(timezone.utc) + timedelta(minutes=SUBSCRIPTION_TTL_MIN)
    r = _send(
        lambda: http.patch(
            f"{GRAPH}/subscriptions/{sub_id}",
            json={"expirationDateTime": _iso_utc(new_expiry)},
            headers=_auth(token),
        )
    )
    r.raise_for_status()


def delete_subscription(http: httpx.Client, token: str, sub_id: str) -> None:
    """Best-effort delete of a Graph subscription; failures are only logged."""
    r = _send(lambda: http.delete(f"{GRAPH}/subscriptions/{sub_id}", headers=_auth(token)))
    if r.status_code >= 400:
        logger.warning("delete subscription %s: %d", sub_id, r.status_code)


def fetch_message(http: httpx.Client, token: str, mailbox: str, msg_id: str) -> dict:
    """Fetch one message (id, subject, from, body), asking Graph for a plain-text body."""
    r = _send(
        lambda: http.get(
            f"{GRAPH}/users/{mailbox}/messages/{msg_id}",
            params={"$select": "id,subject,from,body"},
            headers={**_auth(token), "Prefer": 'outlook.body-content-type="text"'},
        )
    )
    r.raise_for_status()
    return r.json()


def reply_to_message(http: httpx.Client, token: str, mailbox: str, msg_id: str, comment: str) -> None:
    """Send *comment* as a reply to the given message via the Graph /reply action."""
    endpoint = f"{GRAPH}/users/{mailbox}/messages/{msg_id}/reply"
    r = _send(lambda: http.post(endpoint, json={"comment": comment}, headers=_auth(token)))
    r.raise_for_status()


def mark_as_read(http: httpx.Client, token: str, mailbox: str, msg_id: str) -> None:
    """Flag the handled message as read in the shared mailbox."""
    endpoint = f"{GRAPH}/users/{mailbox}/messages/{msg_id}"
    r = _send(lambda: http.patch(endpoint, json={"isRead": True}, headers=_auth(token)))
    r.raise_for_status()


def process_message(
    http: httpx.Client, token: str, mailbox: str, msg_id: str, workspace_path: "Path"
) -> None:
    """Answer one email end-to-end: fetch it, ask Arke, reply, mark read."""
    # Imported here to avoid clashing with the `mailbox` parameter name.
    from arke.server import mailbox as arke_mailbox

    msg = fetch_message(http, token, mailbox, msg_id)
    subject = msg.get("subject") or "(no subject)"
    sender = (msg.get("from") or {}).get("emailAddress", {}).get("address", "unknown")
    body_text = (msg.get("body") or {}).get("content", "").strip()
    logger.info("received: %s (from %s)", subject, sender)

    # An empty body falls back to the subject line as the question.
    ticket = arke_mailbox.send({"cmd": "ask", "query": body_text or subject}, workspace_path)
    reply = arke_mailbox.receive(ticket, workspace_path)

    answer = "Arke could not process your request at this time."
    if reply and reply.get("ok"):
        answer = reply.get("answer", "No answer available.")

    reply_to_message(http, token, mailbox, msg_id, answer)
    mark_as_read(http, token, mailbox, msg_id)


def _build_handler(
    http: httpx.Client,
    msal_app: msal.ConfidentialClientApplication,
    cfg: EmailConfig,
    client_state: str,
    workspace_path: "Path",
) -> type[BaseHTTPRequestHandler]:
    """Build the webhook handler class with this run's clients closed over.

    HTTPServer instantiates the handler class anew per request, so the
    dependencies cannot go through __init__ — they are captured in this
    closure instead, together with a shared dedup cache.
    """
    seen = _SeenIds()

    class WebhookHandler(BaseHTTPRequestHandler):
        def log_message(self, fmt: str, *args: object) -> None:
            # Route the default stderr access log through our logger at DEBUG.
            logger.debug("http: " + fmt, *args)

        def do_POST(self) -> None:
            """Handle both Graph endpoint validation and change notifications."""
            # Subscription handshake: Graph POSTs ?validationToken=... and
            # expects the raw token echoed back as text/plain.
            query = parse_qs(urlparse(self.path).query)
            if "validationToken" in query:
                token_value = query["validationToken"][0]
                payload = token_value.encode()
                self.send_response(200)
                self.send_header("Content-Type", "text/plain")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)
                logger.info("subscription validated")
                return

            # Notification delivery: queue the 202 acknowledgment before
            # doing any processing work.
            length = int(self.headers.get("Content-Length", "0"))
            raw = self.rfile.read(length)
            self.send_response(202)
            self.end_headers()

            try:
                body = json.loads(raw)
            except json.JSONDecodeError:
                logger.warning("invalid json body")
                return

            token = acquire_token(msal_app)
            for notif in body.get("value", []):
                # clientState proves the notification came from our own
                # subscription, not an arbitrary caller of the public URL.
                if notif.get("clientState") != client_state:
                    logger.warning("clientState mismatch, dropping")
                    continue
                msg_id = (notif.get("resourceData") or {}).get("id")
                if not msg_id:
                    continue
                # Graph may redeliver a notification; skip IDs already handled.
                if seen.check_and_add(msg_id):
                    logger.info("duplicate notification for %s, skipping", msg_id)
                    continue
                try:
                    process_message(http, token, cfg.mailbox, msg_id, workspace_path)
                except httpx.HTTPError as exc:
                    logger.error("process %s failed: %s", msg_id, exc)

    return WebhookHandler


def _install_term_handler() -> None:
    """Translate SIGTERM into KeyboardInterrupt so systemd/docker stop paths run
    the same finally block as Ctrl-C, deleting the Graph subscription cleanly.
    Python's default SIGTERM kills the process mid-stride, skipping cleanup."""

    def handler(signum: int, frame: object) -> None:
        raise KeyboardInterrupt()

    signal.signal(signal.SIGTERM, handler)


def run(cfg: EmailConfig, workspace_path: "Path") -> None:
    """Main loop of the email client.

    Starts the webhook HTTP server on a daemon thread, creates the Graph
    subscription, then renews it periodically until interrupted. The
    subscription is deleted on the way out (Ctrl-C, or SIGTERM which
    _install_term_handler converts to KeyboardInterrupt).
    """
    logger.info("email client starting, mailbox=%s, webhook=%s", cfg.mailbox, cfg.webhook_url)
    _install_term_handler()

    msal_app = msal.ConfidentialClientApplication(
        client_id=cfg.client_id,
        client_credential=cfg.client_secret,
        authority=f"https://login.microsoftonline.com/{cfg.tenant_id}",
    )

    with httpx.Client(timeout=30) as http:
        # Per-run random secret; Graph echoes it in every notification and the
        # handler drops anything that does not carry it.
        client_state = secrets.token_urlsafe(32)
        handler_cls = _build_handler(http, msal_app, cfg, client_state, workspace_path)
        # Bind to loopback only — the cloudflared tunnel provides the public URL.
        server = HTTPServer(("127.0.0.1", cfg.webhook_port), handler_cls)

        # Daemon thread so Graph can validate the URL synchronously while the
        # main thread is still blocked inside create_subscription below.
        server_thread = threading.Thread(target=server.serve_forever, daemon=True)
        server_thread.start()
        logger.info("listening on 127.0.0.1:%d", cfg.webhook_port)

        sub_id: str | None = None
        try:
            token = acquire_token(msal_app)
            sub_id = create_subscription(http, token, cfg, client_state)
            logger.info("created subscription %s", sub_id)

            # Renew ahead of the TTL (RENEWAL_INTERVAL_SEC < SUBSCRIPTION_TTL_MIN)
            # with a freshly acquired token each time.
            while True:
                time.sleep(RENEWAL_INTERVAL_SEC)
                renew_subscription(http, acquire_token(msal_app), sub_id)
                logger.info("renewed subscription")
        except KeyboardInterrupt:
            logger.info("shutting down")
        finally:
            # Best-effort cleanup: remove the subscription so Graph stops
            # POSTing to a dead endpoint, then stop the webhook server.
            if sub_id is not None:
                try:
                    delete_subscription(http, acquire_token(msal_app), sub_id)
                except Exception:
                    logger.exception("cleanup failed")
            server.shutdown()


def main() -> None:
    """arke-mail entry point: load .env, configure logging, start the client."""
    from pathlib import Path
    from dotenv import load_dotenv

    load_dotenv()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    workspace = os.environ.get("ARKE_WORKSPACE", "default")
    run(EmailConfig.from_env(), Path.home() / ".arke" / "workspaces" / workspace)


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/clients/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/clients/socket.py
# ═══════════════════════════════════════════

"""Unix socket gateway. Knows nothing about Arke internals.

Accepts a connection, reads a JSON request, drops it in inbox,
polls outbox for the response, sends it back, closes the connection.
"""
import json
import logging
import socket
from pathlib import Path

from arke.server import mailbox

logger = logging.getLogger(__name__)

SOCK_PATH = Path("~/.arke/arke.sock").expanduser()


def run(sock_path: Path = SOCK_PATH) -> None:
    """Start socket gateway. Blocks forever — run in a dedicated thread or process."""
    mailbox.setup()

    # Remove a stale socket file left behind by a previous run.
    if sock_path.exists():
        sock_path.unlink()

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(str(sock_path))
    server.listen()
    logger.info("gateway listening on %s", sock_path)

    try:
        while True:
            conn, _addr = server.accept()
            conn.settimeout(30)
            try:
                _handle(conn)
            except Exception as exc:
                logger.warning("connection error: %s", exc)
            finally:
                conn.close()
    finally:
        server.close()
        if sock_path.exists():
            sock_path.unlink()


def _handle(conn: socket.socket) -> None:
    """Read one JSON request line, relay it through the mailbox, write the reply."""
    reader = conn.makefile("rb")
    raw = reader.readline()
    if not raw:
        return

    try:
        request = json.loads(raw)
    except json.JSONDecodeError as e:
        _send(conn, {"ok": False, "error": f"invalid JSON: {e}"})
        return

    logger.info("cmd=%s", request.get("cmd"))

    response = mailbox.receive(mailbox.send(request))
    if response is None:
        response = {"ok": False, "error": "arke did not respond in time"}
    _send(conn, response)


def _send(conn: socket.socket, data: dict) -> None:
    conn.sendall(json.dumps(data).encode() + b"\n")

# ═══════════════════════════════════════════
# arke/clients/tui.py
# ═══════════════════════════════════════════

"""TUI client. Run: python -m arke.clients.tui"""
# TODO: replace with Textual when we wire up the UI
# For now: minimal readline loop so we can test mailbox end-to-end

import sys

from arke.server import mailbox


def run() -> None:
    """Interactive prompt loop: read a question, print Arke's answer and citations."""
    print("Arke TUI — type your question, Ctrl-C to exit\n")

    while True:
        try:
            question = input("> ").strip()
        except (KeyboardInterrupt, EOFError):
            print()
            return

        if not question:
            continue

        reply = mailbox.receive(mailbox.send({"cmd": "ask", "query": question}))

        if reply is None:
            print("error: arke did not respond\n")
        elif not reply.get("ok"):
            print(f"error: {reply.get('error')}\n")
        else:
            print(f"\n{reply['answer']}\n")
            for cite in reply.get("citations", []):
                print(f"  [{cite['source']}] {cite['text'][:80]}...")
            print()


if __name__ == "__main__":
    run()

# ═══════════════════════════════════════════
# arke/digest/__init__.py
# ═══════════════════════════════════════════

from .sync import RcloneSource, run

__all__ = ["RcloneSource", "run"]

# ═══════════════════════════════════════════
# arke/digest/sync.py
# ═══════════════════════════════════════════

import hashlib
import logging
import shutil
import subprocess
import time
from pathlib import Path

logger = logging.getLogger(__name__)


class RcloneSource:
    """A remote rclone location that can be mirrored into a local directory."""

    def __init__(self, name: str, remote: str):
        self._name = name
        self._remote = remote

    @property
    def name(self) -> str:
        # Stable identifier; also used as the staging subdirectory name.
        return self._name

    def sync_to(self, dest: Path) -> None:
        """Mirror the remote into *dest* (created if missing); raises CalledProcessError on failure."""
        dest.mkdir(parents=True, exist_ok=True)
        cmd = [
            "rclone", "sync", self._remote, str(dest),
            "--timeout", "10m",
            "--contimeout", "60s",
        ]
        subprocess.run(cmd, check=True)


def _dir_hash(path: Path) -> str:
    h = hashlib.md5()
    for f in sorted(path.rglob("*")):
        if f.is_file():
            st = f.stat()
            h.update(str(f.relative_to(path)).encode())
            h.update(str(st.st_size).encode())
            h.update(str(st.st_mtime_ns).encode())
    return h.hexdigest()


def _load_hash(space: Path) -> str:
    p = space / ".sync_hash"
    return p.read_text().strip() if p.exists() else ""


def _save_hash(space: Path, h: str) -> None:
    (space / ".sync_hash").write_text(h)


def run(space: Path, sources: list[RcloneSource], interval: int = 60) -> None:
    """Sync loop. Blocks forever — call from a dedicated process.

    Each pass: mirror every source into staging/, fingerprint the staging
    tree, and if anything changed publish a fresh copy as digest/ via a
    copy-then-rename sequence so digest/ never exists half-written.
    """
    staging = space / "staging"
    digest = space / "digest"
    staging.mkdir(parents=True, exist_ok=True)

    # survive restarts without forcing a full re-ingest
    last_hash = _load_hash(space)

    while True:
        # A failing source is logged and skipped; the others still sync.
        for source in sources:
            try:
                source.sync_to(staging / source.name)
            except Exception as e:
                logger.warning("source %s failed: %s", source.name, e)

        current_hash = _dir_hash(staging)
        if current_hash != last_hash:
            tmp = digest.with_name(digest.name + ".tmp")
            old = digest.with_name(digest.name + ".old")

            # Clear leftovers from a previous crash mid-publish.
            if tmp.exists():
                shutil.rmtree(tmp)
            if old.exists():
                shutil.rmtree(old)

            shutil.copytree(staging, tmp, symlinks=True)

            if digest.exists():
                # This rename is the only junction between the sync daemon and Arke.
                # If Arke consumed digest between exists() and here, this raises — let it.
                # systemd restarts in 5s, digest is gone, second attempt goes through clean.
                digest.rename(old)

            tmp.rename(digest)

            if old.exists():
                shutil.rmtree(old)

            # Record the hash only after a successful publish so a crash retries.
            last_hash = current_hash
            _save_hash(space, last_hash)
            logger.info("digest published")

        time.sleep(interval)

# ═══════════════════════════════════════════
# arke/eval/download_corpora.py
# ═══════════════════════════════════════════

"""Download eval corpora. Run once to populate ~/.arke/data/.

Usage:
    python -m arke.eval.download_corpora

Corpora:
    legalbench-rag  — ~87 MB, legal QA benchmark
    (more to be added)
"""
import io
import sys
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import urlopen

DATA_DIR = Path.home() / ".arke" / "data"

CORPORA = {
    "legalbench-rag": {
        "url": "https://www.dropbox.com/scl/fo/r7xfa5i3hdsbxex1w6amw/AID389Olvtm-ZLTKAPrw6k4?rlkey=5n8zrbk4c08lbit3iiexofmwg&st=0hu354cq&dl=1",
        "dest": DATA_DIR / "legalbench-rag",
        "check": "corpus/*.txt",
        "size": "~87 MB",
    },
}


def download(name: str) -> None:
    """Download and extract one corpus into its destination under ~/.arke/data/.

    Skips work when the destination already contains extracted files.

    Raises RuntimeError when the download fails (network error or timeout).
    """
    corpus = CORPORA[name]
    dest: Path = corpus["dest"]
    check_glob: str = corpus["check"]

    # Idempotency check: match only the filename part of the check glob,
    # anywhere under dest, since archives differ in top-level layout.
    if dest.exists() and any(dest.rglob(check_glob.split("/")[-1])):
        print(f"{name}: already at {dest}")
        return

    print(f"{name}: downloading {corpus['size']}...")
    try:
        # Context manager closes the connection; the timeout prevents an
        # unresponsive host from hanging the run forever.
        with urlopen(corpus["url"], timeout=120) as resp:
            raw = resp.read()
    except (URLError, TimeoutError) as exc:
        raise RuntimeError(f"{name}: download failed: {exc}") from exc

    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(raw)) as zf:
        zf.extractall(dest)

    count = sum(1 for _ in dest.rglob("*.txt"))
    print(f"{name}: extracted {count} files to {dest}")


def main() -> None:
    """Download every known corpus; exit 1 on the first failure."""
    for corpus_name in CORPORA:
        try:
            download(corpus_name)
        except RuntimeError as exc:
            print(f"error: {exc}", file=sys.stderr)
            sys.exit(1)


if __name__ == "__main__":
    main()

# ═══════════════════════════════════════════
# arke/eval/gen.py
# ═══════════════════════════════════════════

"""Generate eval cases from chunks using LLM.

For each sampled chunk, LLM generates questions that only that chunk can answer.
This creates ground-truth (query → expected_chunk) pairs for scoring.
"""
import json
import random
import re
from dataclasses import dataclass

from arke.server.models import LLM
from arke.server.types import Chunk

# Prompt sent verbatim to the LLM ahead of each sampled chunk. It demands
# chunk-specific questions (names, numbers, dates) and a bare JSON array
# so _generate_questions can parse the reply.
INSTRUCTION = (
    "You will receive a text chunk from a legal document. "
    "Generate 1 to 3 questions that ONLY this specific chunk can answer. "
    "Questions must include specific details — names, numbers, dates, case citations, unique terms. "
    "Avoid generic questions. "
    "Return ONLY a JSON array of strings, nothing else. "
    'Example: ["question 1", "question 2"]'
)


@dataclass(frozen=True)
class EvalCase:
    """One ground-truth retrieval pair: a query and the chunk that answers it."""

    query: str
    expected_key: str  # "doc_id:chunk_index"


def make_cases(llm: LLM, chunks: list[Chunk], limit: int) -> list[EvalCase]:
    """Sample up to *limit* chunks and turn each into ground-truth eval cases."""
    sampled = random.sample(chunks, min(limit, len(chunks)))
    cases: list[EvalCase] = []

    for idx, ch in enumerate(sampled):
        questions = _generate_questions(llm, ch.clean)
        target = f"{ch.doc_id}:{ch.chunk_index}"
        for q in questions:
            cases.append(EvalCase(query=q, expected_key=target))
        print(f"  gen {idx + 1}/{len(sampled)}: {len(questions)} questions")

    print(f"gen done: {len(cases)} cases from {len(sampled)} chunks")
    return cases


def _generate_questions(llm: LLM, content: str) -> list[str]:
    """Ask the LLM for questions about *content*; return [] on any parse failure."""
    raw = llm.chat(None, INSTRUCTION + "\n\nChunk:\n" + content).strip()

    # Grab the first '[' through the last ']' — models often wrap the JSON
    # array in prose or code fences.
    bracketed = re.search(r"\[[\s\S]*]", raw)
    if bracketed is None:
        return []

    try:
        data = json.loads(bracketed.group(0))
    except json.JSONDecodeError:
        return []

    if not isinstance(data, list):
        return []
    return [item for item in data if isinstance(item, str) and item.strip()]

# ═══════════════════════════════════════════
# arke/eval/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/eval/presets.py
# ═══════════════════════════════════════════

from dataclasses import replace
from itertools import product

from arke.server.config import Config

# Sweep thoroughness levels accepted by get_preset.
FAST = "fast"
MEDIUM = "medium"
THOROUGH = "thorough"


def get_preset(level: str, base: Config) -> list[Config]:
    """Expand a sweep *level* into the list of configs to benchmark.

    Raises ValueError for an unknown level.
    """
    grids = {
        FAST: dict(chunk_sizes=[base.chunk_size], overlaps=[base.overlap], alphas=[base.alpha], ks=[base.k]),
        MEDIUM: dict(chunk_sizes=[base.chunk_size], overlaps=[base.overlap], alphas=[0.0, 0.3, 0.5, 0.7, 1.0], ks=[5, 10, 20]),
        THOROUGH: dict(chunk_sizes=[500, 1000], overlaps=[0.0, 0.2], alphas=[0.0, 0.3, 0.5, 0.7, 1.0], ks=[5, 10, 20]),
    }
    try:
        grid = grids[level]
    except KeyError:
        raise ValueError(f"unknown sweep level: {level} (expected: {FAST} | {MEDIUM} | {THOROUGH})") from None
    return _expand(base, **grid)


def _expand(base: Config, chunk_sizes: list[int], overlaps: list[float], alphas: list[float], ks: list[int]) -> list[Config]:
    """Cartesian product of the four knobs, each combination applied on top of *base*."""
    return [
        replace(base, chunk_size=cs, overlap=ov, alpha=al, k=kk).resolved()
        for cs, ov, al, kk in product(chunk_sizes, overlaps, alphas, ks)
    ]

# ═══════════════════════════════════════════
# arke/eval/sweep.py
# ═══════════════════════════════════════════

"""Sweep — run eval across preset configs and rank by MRR.

Each config row = full arke-server restart with that config.
Communicates through mailbox like any real client.

Usage:
    python -m arke.eval.sweep --workspace legalbench --level medium --limit 50
"""
import os
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path

from arke.server import mailbox
from arke.server.config import Config
from arke.server.models import Models
from arke.server.types import Doc

from .gen import EvalCase, make_cases
from .presets import get_preset


@dataclass(frozen=True)
class EvalMetrics:
    """Retrieval quality scores averaged over all eval cases (see _score)."""

    precision: float  # mean fraction of returned hits matching the expected chunk
    recall: float  # fraction of cases where the expected chunk appeared at all
    mrr: float  # mean reciprocal rank of the first matching hit


@dataclass(frozen=True)
class SweepRow:
    """One sweep result: the config tried and the metrics it achieved."""

    cfg: Config
    metrics: EvalMetrics


def run(workspace: str, level: str, limit: int) -> list[SweepRow]:
    """Benchmark every preset config for *level*; return rows sorted by MRR desc."""
    base_cfg = Config.from_env().resolved()
    configs = get_preset(level, base_cfg)

    # Cases are generated once with the base config and reused for every row,
    # so rows differ only in their retrieval settings.
    print("generating eval cases...")
    cases = _generate_cases(workspace, base_cfg, limit)
    if not cases:
        print("error: no eval cases generated", file=sys.stderr)
        sys.exit(1)
    print(f"generated {len(cases)} cases\n")

    rows: list[SweepRow] = []
    for idx, cfg in enumerate(configs):
        print(f"[{idx + 1}/{len(configs)}] chunk={cfg.chunk_size} overlap={cfg.overlap} alpha={cfg.alpha} k={cfg.k}")
        metrics = _run_row(workspace, cfg, cases)
        rows.append(SweepRow(cfg=cfg, metrics=metrics))
        print(f"  → precision={metrics.precision:.3f} recall={metrics.recall:.3f} MRR={metrics.mrr:.3f}\n")

    rows.sort(key=lambda row: row.metrics.mrr, reverse=True)
    _print_table(rows)
    return rows


def _ws_path(workspace: str) -> Path:
    return Path.home() / ".arke" / "workspaces" / workspace


def _generate_cases(workspace: str, cfg: Config, limit: int) -> list[EvalCase]:
    """Start server, sample chunks, generate questions, stop server.

    Runs one throwaway server with the base config purely to sample chunks
    via the mailbox; the LLM question generation then runs in this process.

    Raises RuntimeError when the sample command fails or the server never
    becomes ready.
    """
    proc = _start_server(workspace, cfg)
    ws_path = _ws_path(workspace)
    try:
        _wait_ready(ws_path)
        msg_id = mailbox.send({"cmd": "sample", "limit": limit}, ws_path)
        response = mailbox.receive(msg_id, ws_path)
        if not response or not response.get("ok"):
            raise RuntimeError(f"sample failed: {response}")

        # load models for case generation (cloud preferred for quality)
        models = Models.load(cfg)
        from arke.server.types import Chunk
        # Rehydrate the JSON chunk dicts from the mailbox into Chunk objects.
        chunks = [
            Chunk(doc_id=c["doc_id"], chunk_index=c["chunk_index"], clean=c["clean"], head=c["head"], tail=c["tail"])
            for c in response["chunks"]
        ]
        return make_cases(models.llm, chunks, limit)
    finally:
        # Always tear the throwaway server down, even on failure.
        _stop_server(proc)


def _run_row(workspace: str, cfg: Config, cases: list[EvalCase]) -> EvalMetrics:
    """Boot a server with *cfg*, replay every case through it, and score the hits."""
    proc = _start_server(workspace, cfg)
    ws_path = _ws_path(workspace)
    try:
        _wait_ready(ws_path)
        scored: list[tuple[list, str]] = []
        for case in cases:
            ticket = mailbox.send({"cmd": "ask", "query": case.query}, ws_path)
            reply = mailbox.receive(ticket, ws_path)
            if reply and reply.get("ok"):
                hits = reply.get("citations", [])
            else:
                hits = []
            scored.append((hits, case.expected_key))
        return _score(scored)
    finally:
        _stop_server(proc)


def _start_server(workspace: str, cfg: Config) -> subprocess.Popen:
    """Launch arke-server with *cfg* injected through environment variables."""
    env = dict(os.environ)
    env.update(
        ARKE_WORKSPACE=workspace,
        CHUNK_SIZE=str(cfg.chunk_size),
        OVERLAP=str(cfg.overlap),
        ALPHA=str(cfg.alpha),
        K=str(cfg.k),
    )
    return subprocess.Popen(["arke-server"], env=env)


def _wait_ready(ws_path: Path, timeout: float = 60.0) -> None:
    """Poll ping until server responds or timeout; raises RuntimeError on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            reply = mailbox.receive(mailbox.send({"cmd": "ping"}, ws_path), ws_path)
        except Exception:
            reply = None  # mailbox may not exist yet while the server boots
        if reply and reply.get("pong"):
            return
        time.sleep(1.0)
    raise RuntimeError("arke-server did not start in time")


def _stop_server(proc: subprocess.Popen) -> None:
    proc.send_signal(signal.SIGTERM)
    try:
        proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()


def _score(results: list[tuple[list, str]]) -> EvalMetrics:
    """Average precision, recall (hit-or-miss), and MRR over
    (citation hits, expected_key) pairs. All-zero metrics for no results."""
    if not results:
        return EvalMetrics(precision=0.0, recall=0.0, mrr=0.0)

    total_p = 0.0
    total_r = 0.0
    total_rr = 0.0
    for hits, expected_key in results:
        keys = [f"{h['source']}:{h.get('chunk_index', '')}" for h in hits]
        relevant = keys.count(expected_key)
        if keys:
            total_p += relevant / len(keys)
        total_r += 1.0 if relevant else 0.0
        if expected_key in keys:
            # reciprocal rank of the first matching hit
            total_rr += 1.0 / (keys.index(expected_key) + 1)

    n = len(results)
    return EvalMetrics(precision=total_p / n, recall=total_r / n, mrr=total_rr / n)


def _print_table(rows: list[SweepRow]) -> None:
    """Pretty-print sweep rows. Rows are expected to arrive already sorted
    by MRR descending, so the first row carries the best score."""
    header = f"{'chunk':>6} {'overlap':>7} {'alpha':>6} {'k':>4} {'prec':>7} {'recall':>7} {'MRR':>7}"
    print(f"\n{'Sweep Results (sorted by MRR)':^50}")
    print(header)
    print("-" * len(header))
    top = rows[0].metrics.mrr if rows else 0.0
    for row in rows:
        marker = " <-- best" if row.metrics.mrr == top else ""
        print(
            f"{row.cfg.chunk_size:>6} {row.cfg.overlap:>7.1f} {row.cfg.alpha:>6.1f} {row.cfg.k:>4}"
            f" {row.metrics.precision:>7.3f} {row.metrics.recall:>7.3f} {row.metrics.mrr:.3f}{marker}"
        )
    print()


if __name__ == "__main__":
    # CLI entry point for the sweep: workspace is mandatory, the rest default.
    import argparse
    cli = argparse.ArgumentParser()
    cli.add_argument("--workspace", required=True)
    cli.add_argument("--level", default="medium")
    cli.add_argument("--limit", type=int, default=50)
    ns = cli.parse_args()
    run(ns.workspace, ns.level, ns.limit)

# ═══════════════════════════════════════════
# arke/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/server/backend_cloud.py
# ═══════════════════════════════════════════

"""Cloud backend — OpenAI API via HTTP."""
import json
import urllib.request
from dataclasses import dataclass

EMBED_BATCH_SIZE = 64


@dataclass
class CloudEmbedder:
    """OpenAI-compatible /v1/embeddings client. Sends fixed-size batches and
    re-sorts each response by index so output order matches input order."""
    base_url: str
    api_key: str
    model: str

    def embed(self, texts: list[str]) -> list[list[float]]:
        vectors: list[list[float]] = []
        offset = 0
        while offset < len(texts):
            batch = texts[offset : offset + EMBED_BATCH_SIZE]
            res = _post(self.base_url, self.api_key, "/v1/embeddings", {"model": self.model, "input": batch})
            for item in sorted(res["data"], key=lambda d: d["index"]):
                vectors.append(item["embedding"])
            offset += EMBED_BATCH_SIZE
        return vectors


@dataclass
class CloudLLM:
    """OpenAI-compatible /v1/chat/completions client (single-turn)."""
    base_url: str
    api_key: str
    model: str

    def chat(self, system: str | None, user: str) -> str:
        messages = [{"role": "user", "content": user}]
        if system is not None:
            messages.insert(0, {"role": "system", "content": system})
        res = _post(self.base_url, self.api_key, "/v1/chat/completions", {"model": self.model, "messages": messages})
        return res["choices"][0]["message"]["content"]


def load(base_url: str, api_key: str, embed_model: str, inference_model: str) -> tuple[CloudEmbedder, CloudLLM]:
    """Build the (embedder, llm) pair for the cloud backend."""
    embedder = CloudEmbedder(base_url, api_key, embed_model)
    llm = CloudLLM(base_url, api_key, inference_model)
    return embedder, llm


def _post(base_url: str, api_key: str, path: str, body: dict) -> dict:
    """POST `body` as JSON to base_url+path and decode the JSON response.
    urllib raises HTTPError/URLError on failure; 120s timeout."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = json.dumps(body).encode()
    req = urllib.request.Request(base_url + path, data=payload, headers=headers)
    with urllib.request.urlopen(req, timeout=120) as resp:
        return json.loads(resp.read())

# ═══════════════════════════════════════════
# arke/server/backend_local.py
# ═══════════════════════════════════════════

"""Local backend — llama-cpp-python, models loaded in-process from .gguf files."""
from dataclasses import dataclass

EMBED_BATCH_SIZE = 64


@dataclass
class LocalEmbedder:
    """Wraps an in-process llama.cpp embedding model; embeds in fixed-size batches."""
    _model: object

    def embed(self, texts: list[str]) -> list[list[float]]:
        out: list[list[float]] = []
        for start in range(0, len(texts), EMBED_BATCH_SIZE):
            out += self._model.embed(texts[start : start + EMBED_BATCH_SIZE])
        return out


@dataclass
class LocalLLM:
    _model: object

    def chat(self, system: str | None, user: str) -> str:
        messages: list[dict] = []
        if system is not None:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        response = self._model.create_chat_completion(messages=messages)
        return response["choices"][0]["message"]["content"]


def load(embed_model_path: str, inference_model_path: str) -> tuple[LocalEmbedder, LocalLLM]:
    """Load both .gguf models into the current process via llama-cpp-python."""
    from llama_cpp import Llama
    return (
        LocalEmbedder(Llama(model_path=embed_model_path, embedding=True, n_ctx=512, verbose=False)),
        LocalLLM(Llama(model_path=inference_model_path, n_ctx=4096, verbose=False)),
    )

# ═══════════════════════════════════════════
# arke/server/bm25.py
# ═══════════════════════════════════════════

"""In-memory BM25 index. Built once at ingest, queried on every ask.

Standard Okapi BM25 with k1=1.5, b=0.75.
Keys are arbitrary strings — we use "<doc_id>:<chunk_index>".
"""
import math
import re
from dataclasses import dataclass, field

K1 = 1.5
B = 0.75


def _tokenize(text: str) -> list[str]:
    """Lowercase and split into alphanumeric runs."""
    return re.findall(r"[a-z0-9]+", text.lower())


@dataclass
class BM25Index:
    """Okapi BM25 over an inverted index.

    Usage: add() every document, then build() once, then scores().
    Keys are arbitrary strings — callers use "<doc_id>:<chunk_index>".

    The original implementation kept raw token lists and rescanned every
    document for every query term (tokens.count per doc per term). Storing
    postings (term → {key: tf}) and per-doc lengths makes scores() touch
    only documents that actually contain each query term, with identical
    scoring math.
    """
    _postings: dict[str, dict[str, int]] = field(default_factory=dict)  # term → key → tf
    _dl: dict[str, int] = field(default_factory=dict)                   # key → doc length
    _avgdl: float = 0.0

    def add(self, key: str, text: str) -> None:
        """Index one document under `key` (re-adding a key overwrites it)."""
        tokens = _tokenize(text)
        self._dl[key] = len(tokens)
        freq: dict[str, int] = {}
        for term in tokens:
            freq[term] = freq.get(term, 0) + 1
        for term, tf in freq.items():
            self._postings.setdefault(term, {})[key] = tf

    def build(self) -> None:
        """Call after all add() calls to finalize avgdl."""
        if self._dl:
            self._avgdl = sum(self._dl.values()) / len(self._dl)

    def scores(self, query: str) -> dict[str, float]:
        """Return BM25 score for every indexed key. Zero scores omitted.
        A term repeated in the query contributes once per occurrence,
        matching the original behavior."""
        terms = _tokenize(query)
        if not terms or not self._dl:
            return {}

        n = len(self._dl)
        result: dict[str, float] = {}

        for term in terms:
            postings = self._postings.get(term)
            if not postings:
                continue
            df = len(postings)
            idf = math.log((n - df + 0.5) / (df + 0.5) + 1)

            for key, tf in postings.items():
                dl = self._dl[key]
                norm = tf * (K1 + 1) / (tf + K1 * (1 - B + B * dl / self._avgdl))
                result[key] = result.get(key, 0.0) + idf * norm

        return result

    def clear(self) -> None:
        """Drop the whole index."""
        self._postings.clear()
        self._dl.clear()
        self._avgdl = 0.0

# ═══════════════════════════════════════════
# arke/server/chunker.py
# ═══════════════════════════════════════════

from dataclasses import dataclass

# Strongest separators first. Empty string is the final fallback (per-character).
SEPARATORS = ["\n\n", "\n", ". ", ", ", " ", ""]


@dataclass(frozen=True)
class ChunkData:
    """A chunk plus overlap context from its neighbors. Embeddings are
    computed over overlapped() so the vector carries neighbor context,
    while only `clean` is persisted — no text duplicated between rows."""
    clean: str
    head: str
    tail: str

    def overlapped(self) -> str:
        return "".join((self.head, self.clean, self.tail))


def chunk(text: str, chunk_size: int, overlap: float) -> list[ChunkData]:
    """Recursive character splitter: break on the separator hierarchy, then
    greedily merge adjacent pieces up to chunk_size. `overlap` is a fraction
    of chunk_size copied from each neighbor into head/tail."""
    if not text:
        return []

    pieces = _merge(_separate(text.strip(), chunk_size), chunk_size)
    ctx = int(chunk_size * overlap)
    last = len(pieces) - 1

    out: list[ChunkData] = []
    for i, piece in enumerate(pieces):
        head = pieces[i - 1][-ctx:] if ctx > 0 and i > 0 else ""
        tail = pieces[i + 1][:ctx] if ctx > 0 and i < last else ""
        out.append(ChunkData(clean=piece, head=head, tail=tail))
    return out


def _separate(text: str, chunk_size: int, depth: int = 0) -> list[str]:
    """Split on SEPARATORS[depth]; recurse into pieces still >= chunk_size.
    The separator is re-attached as a suffix so _merge can reconstruct the
    original text without losing whitespace or newlines."""
    if depth >= len(SEPARATORS):
        return [text]

    sep = SEPARATORS[depth]
    parts = text.split(sep)
    last = len(parts) - 1

    pieces: list[str] = []
    for i, part in enumerate(parts):
        if len(part) >= chunk_size:
            pieces.extend(_separate(part, chunk_size, depth + 1))
        else:
            pieces.append(part + (sep if i != last else ""))
    return pieces


def _merge(splits: list[str], chunk_size: int) -> list[str]:
    raw: list[str] = []
    for s in splits:
        if not s:
            continue

        if raw and len(raw[-1]) + len(s) < chunk_size:
            raw[-1] += s
        else:
            raw.append(s)

    return raw

# ═══════════════════════════════════════════
# arke/server/config.py
# ═══════════════════════════════════════════

from __future__ import annotations

import os
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Config:
    """All Arke settings. Immutable; build with from_env() and validate
    with resolved() before use."""

    # Workspace
    workspace: str = "default"

    # Backend: "local" (llama-cpp-python) or "cloud" (OpenAI)
    backend: str = "local"

    # Local backend — paths to .gguf files
    embed_model_path: str = ""
    inference_model_path: str = ""

    # Cloud backend
    cloud_api_key: str = ""
    cloud_base_url: str = "https://api.openai.com"
    cloud_embed_model: str = "text-embedding-3-small"
    cloud_inference_model: str = "gpt-4o"

    # RAG parameters
    embedding_dim: int = 0
    chunk_size: int = 600
    overlap: float = 0.0
    alpha: float = 0.7
    k: int = 5

    def resolved(self) -> Config:
        """Validate and return a copy with defaults filled in.

        Raises ValueError for a missing backend requirement or any
        out-of-range RAG parameter."""
        if self.backend == "local":
            if not self.embed_model_path:
                raise ValueError("config: EMBED_MODEL_PATH is required for local backend")
            if not self.inference_model_path:
                raise ValueError("config: INFERENCE_MODEL_PATH is required for local backend")
        elif self.backend == "cloud":
            if not self.cloud_api_key:
                raise ValueError("config: CLOUD_API_KEY is required for cloud backend")
        else:
            raise ValueError(f"config: BACKEND must be 'local' or 'cloud', got '{self.backend}'")

        # 0 means "unset" → take the default embedding dimension.
        cfg = replace(self, embedding_dim=self.embedding_dim or DEFAULTS.embedding_dim)

        if not 100 <= cfg.chunk_size <= 10000:
            raise ValueError(f"config: chunk_size must be 100..10000, got {cfg.chunk_size}")
        if not 0 <= cfg.overlap <= 0.5:
            raise ValueError(f"config: overlap must be 0..0.5, got {cfg.overlap}")
        if not 0 <= cfg.alpha <= 1:
            raise ValueError(f"config: alpha must be 0..1, got {cfg.alpha}")
        if not 1 <= cfg.k <= 20:
            raise ValueError(f"config: k must be 1..20, got {cfg.k}")

        return cfg

    @staticmethod
    def from_env() -> Config:
        """Read every setting from environment variables, with defaults."""
        env = os.environ.get
        return Config(
            workspace=env("ARKE_WORKSPACE", "default"),
            backend=env("BACKEND", "local"),
            embed_model_path=env("EMBED_MODEL_PATH", ""),
            inference_model_path=env("INFERENCE_MODEL_PATH", ""),
            cloud_api_key=env("CLOUD_API_KEY", ""),
            cloud_base_url=env("CLOUD_BASE_URL", "https://api.openai.com"),
            cloud_embed_model=env("CLOUD_EMBED_MODEL", "text-embedding-3-small"),
            cloud_inference_model=env("CLOUD_INFERENCE_MODEL", "gpt-4o"),
            embedding_dim=int(env("EMBEDDING_DIM", "0")),
            chunk_size=int(env("CHUNK_SIZE", "600")),
            overlap=float(env("OVERLAP", "0.0")),
            alpha=float(env("ALPHA", "0.7")),
            k=int(env("K", "5")),
        )


# Fallback values used by Config.resolved(); a zero embedding_dim in the
# env-derived Config means "unset" and is replaced by this default.
DEFAULTS = Config(
    embedding_dim=1024,
)

# ═══════════════════════════════════════════
# arke/server/__init__.py
# ═══════════════════════════════════════════


# ═══════════════════════════════════════════
# arke/server/loader.py
# ═══════════════════════════════════════════

"""Loader — turns raw files from digest/ into (Doc, text) pairs.

Dispatcher uses file extension. Supported: .txt .md .pdf .docx .msg
Unsupported files are skipped silently — unknown formats are normal in digest/.
"""
import hashlib
import logging
from pathlib import Path

from .types import Doc

logger = logging.getLogger(__name__)


def load_digest(digest_path: Path) -> list[tuple[Doc, str]]:
    """Parse every supported, non-hidden file under digest_path.
    Returns (Doc, text) pairs; unsupported/empty files are dropped."""
    parsed = (
        load_file(p, root=digest_path)
        for p in sorted(digest_path.rglob("*"))
        if p.is_file() and not p.name.startswith(".")
    )
    return [pair for pair in parsed if pair is not None]


def load_file(path: Path, root: Path | None = None) -> tuple[Doc, str] | None:
    """Parse a single file into (Doc, text), or None if unsupported or empty."""
    suffix = path.suffix.lower()

    parsers = {
        ".txt": _load_txt,
        ".md": _load_txt,
        ".pdf": _load_pdf,
        ".docx": _load_docx,
        ".msg": _load_msg,
    }
    parser = parsers.get(suffix)
    if parser is None:
        logger.debug("skipping unsupported file: %s", path.name)
        return None

    text = parser(path)
    if not text or not text.strip():
        return None

    stat = path.stat()
    doc = Doc(
        id=_content_id(path),
        source=str(path.relative_to(root)) if root else path.name,
        # NOTE(review): st_ctime is inode-change time on Linux, not creation time.
        created=int(stat.st_ctime),
        modified=int(stat.st_mtime),
        metadata={"filename": path.name, "suffix": suffix},
    )
    return doc, text.strip()


def _content_id(path: Path) -> str:
    return hashlib.md5(path.read_bytes()).hexdigest()


def _load_txt(path: Path) -> str:
    return path.read_text(encoding="utf-8", errors="replace")


def _load_pdf(path: Path) -> str:
    """Concatenate the text of all PDF pages; pages with no text layer are skipped."""
    import pdfplumber
    with pdfplumber.open(path) as pdf:
        extracted = (page.extract_text() for page in pdf.pages)
        return "\n\n".join(t for t in extracted if t)


def _load_docx(path: Path) -> str:
    """Join all non-blank DOCX paragraphs with blank lines."""
    from docx import Document
    paragraphs = Document(path).paragraphs
    return "\n\n".join(p.text for p in paragraphs if p.text.strip())


def _load_msg(path: Path) -> str:
    """Extract subject line and body text from an Outlook .msg file."""
    import extract_msg
    with extract_msg.Message(str(path)) as msg:
        pieces = [f"Subject: {msg.subject}"] if msg.subject else []
        if msg.body:
            pieces.append(msg.body)
        return "\n\n".join(pieces)

# ═══════════════════════════════════════════
# arke/server/mailbox.py
# ═══════════════════════════════════════════

"""Inbox/outbox file queue. Arke's only coordination primitive.

Layout:
    ~/.arke/workspaces/<name>/inbox/<uuid>.json   — incoming request
    ~/.arke/workspaces/<name>/outbox/<uuid>.json  — response written by Arke

Writers drop a file in inbox and poll outbox for the response.
Arke scans inbox on every tick, processes, writes outbox.
"""
import json
import time
import uuid
from pathlib import Path

POLL_INTERVAL = 0.1   # seconds between outbox polls
POLL_TIMEOUT  = 120.0 # max seconds to wait for a response

_inbox:  Path | None = None
_outbox: Path | None = None


def setup(inbox: Path, outbox: Path) -> None:
    global _inbox, _outbox
    _inbox  = inbox
    _outbox = outbox
    _inbox.mkdir(parents=True, exist_ok=True)
    _outbox.mkdir(parents=True, exist_ok=True)


def send(request: dict, workspace_path: Path) -> str:
    """Drop a request file into the workspace inbox. Returns the message id."""
    msg_id = str(uuid.uuid4())
    inbox_dir = workspace_path / "inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    _atomic_write(inbox_dir / f"{msg_id}.json", request)
    return msg_id


def receive(msg_id: str, workspace_path: Path) -> dict | None:
    """Poll the outbox for this message's response; None on timeout.
    The response file is consumed (deleted) once read."""
    response_path = workspace_path / "outbox" / f"{msg_id}.json"
    deadline = time.monotonic() + POLL_TIMEOUT

    while time.monotonic() < deadline:
        if response_path.exists():
            payload = json.loads(response_path.read_text())
            response_path.unlink(missing_ok=True)
            return payload
        time.sleep(POLL_INTERVAL)

    return None


def drain() -> list[tuple[str, dict]]:
    """Consume all pending inbox messages as (msg_id, request) pairs.
    Every scanned file is removed; unparseable ones are silently dropped."""
    assert _inbox is not None, "mailbox.setup() not called"
    drained: list[tuple[str, dict]] = []
    for entry in sorted(_inbox.glob("*.json")):
        try:
            drained.append((entry.stem, json.loads(entry.read_text())))
        except Exception:
            pass  # corrupt message — discard
        finally:
            entry.unlink(missing_ok=True)
    return drained


def reply(msg_id: str, response: dict) -> None:
    """Atomically write `response` to the outbox under the message id."""
    assert _outbox is not None, "mailbox.setup() not called"
    _atomic_write(_outbox / f"{msg_id}.json", response)


def _atomic_write(path: Path, data: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(data))
    tmp.replace(path)

# ═══════════════════════════════════════════
# arke/server/main.py
# ═══════════════════════════════════════════

"""Arke — the living organism.

Startup sequence:
  1. mount workspace (sdb)
  2. load config + models
  3. consume digest/ if present
  4. enter main loop

Main loop (1-second pulse):
  - drain inbox  → process requests → write outbox
  - check digest → re-ingest if hash changed
"""
import hashlib
import logging
import time
from pathlib import Path

import numpy as np

from . import chunker, loader, mailbox, sdb
from .bm25 import BM25Index
from .config import Config
from .models import Models
from .workspace import mount as mount_workspace
from .types import Chunk, Doc, SearchHit

logger = logging.getLogger(__name__)

TICK = 1.0  # seconds

SYSTEM_PROMPT = (
    "You are a legal research assistant. "
    "Answer based only on the provided documents. "
    "Be concise and cite the source for every claim."
)


def run() -> None:
    """Server entry point: mount the workspace, load models, ingest any
    existing digest/, then loop forever draining the mailbox and watching
    digest/ for changes (see module docstring for the pulse)."""
    cfg = Config.from_env().resolved()
    ws = mount_workspace(cfg.workspace)
    mailbox.setup(ws.inbox, ws.outbox)
    models = Models.load(cfg)

    digest_path = ws.path / "digest"
    docs: dict[str, Doc] = {}  # doc_id → Doc; the whole in-memory corpus
    bm25 = BM25Index()
    last_digest_hash = ""      # _dir_hash of digest/ at the last ingest

    if digest_path.exists():
        logger.info("loading digest on startup...")
        last_digest_hash = _ingest(digest_path, cfg, models, docs, bm25)

    logger.info("arke ready [%s] — %d docs, %d chunks", ws.name, len(docs), _chunk_count(docs))

    # 1-second pulse: answer pending requests first, then re-ingest if digest changed.
    while True:
        _drain(docs, bm25, cfg, models)
        last_digest_hash = _watch_digest(digest_path, last_digest_hash, cfg, models, docs, bm25)
        time.sleep(TICK)


# --- ingest ------------------------------------------------------------------

def _ingest(digest_path: Path, cfg: Config, models: Models, docs: dict[str, Doc], bm25: BM25Index) -> str:
    """Rebuild the whole corpus from digest/: parse, chunk, embed (with the
    sdb-backed embedding cache), and index every supported file.

    Mutates `docs` and `bm25` in place (both are cleared first). Returns the
    digest directory hash so the caller can detect later changes."""
    docs.clear()
    bm25.clear()
    # Embedding cache entries are scoped to whichever embedding model is active.
    model_key = cfg.embed_model_path or cfg.cloud_embed_model

    for path in sorted(digest_path.rglob("*")):
        if not path.is_file() or path.name.startswith("."):
            continue

        result = loader.load_file(path, root=digest_path)
        if result is None:
            continue

        doc, text = result

        # Keep the raw source bytes so the original file can be retrieved later.
        sdb.put_bin("sources", doc.id, path.read_bytes())

        chunk_datas = chunker.chunk(text, cfg.chunk_size, cfg.overlap)
        for i, cd in enumerate(chunk_datas):
            chunk = Chunk(doc_id=doc.id, chunk_index=i, clean=cd.clean, head=cd.head, tail=cd.tail)

            # Embed only on cache miss; "1" is the model_version tag in the cache key.
            if not chunk.load_embedding(model_key, "1"):
                vec = models.embedder.embed([chunk.overlapped()])[0]
                chunk.embedding = np.array(vec, dtype=np.float32)
                chunk.save_embedding(model_key, "1")

            bm25.add(f"{doc.id}:{i}", chunk.overlapped())
            doc.chunks.append(chunk)

        doc.save()
        docs[doc.id] = doc

    bm25.build()
    logger.info("ingest done — %d docs, %d chunks", len(docs), _chunk_count(docs))
    return _dir_hash(digest_path)


# --- main loop ---------------------------------------------------------------

def _drain(docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> None:
    """Process every pending inbox request and write a response for each.
    Handler failures become {"ok": False, "error": ...} responses."""
    for msg_id, request in mailbox.drain():
        try:
            result = _dispatch(request, docs, bm25, cfg, models)
        except Exception as exc:
            logger.warning("handler error: %s", exc)
            result = {"ok": False, "error": str(exc)}
        mailbox.reply(msg_id, result)


def _dispatch(request: dict, docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> dict:
    cmd = request.get("cmd")

    if cmd == "ask":
        return _ask(request, docs, bm25, cfg, models)

    if cmd == "ping":
        return {"ok": True, "pong": True}

    if cmd == "sample":
        return _sample(request, docs)

    return {"ok": False, "error": f"unknown cmd: {cmd}"}


def _watch_digest(
    digest_path: Path,
    last_hash: str,
    cfg: Config,
    models: Models,
    docs: dict[str, Doc],
    bm25: BM25Index,
) -> str:
    """Re-ingest when the digest directory's content hash changes.
    Returns the hash matching the currently ingested state."""
    if not digest_path.exists():
        return last_hash
    if _dir_hash(digest_path) == last_hash:
        return last_hash

    logger.info("new digest detected, re-ingesting...")
    return _ingest(digest_path, cfg, models, docs, bm25)


# --- ask ---------------------------------------------------------------------

def _ask(request: dict, docs: dict[str, Doc], bm25: BM25Index, cfg: Config, models: Models) -> dict:
    """Answer a question over the ingested corpus.

    Embeds the query, runs hybrid retrieval, and asks the LLM for a cited
    answer over the top-k chunks. Returns {"ok", "answer", "citations"}
    (or {"ok": False, "error"} for a missing query)."""
    query = request.get("query", "")
    if not query:
        return {"ok": False, "error": "query is required"}

    q_vec = np.array(models.embedder.embed([query])[0], dtype=np.float32)
    hits = _hybrid_search(docs, bm25, q_vec, query, cfg.k, cfg.alpha)

    if not hits:
        return {"ok": True, "answer": "No relevant documents found.", "citations": []}

    context = "\n\n".join(
        f"[{i+1}] (source: {h.chunk.doc_id[:8]}) {h.chunk.clean}"
        for i, h in enumerate(hits)
    )
    answer = models.llm.chat(SYSTEM_PROMPT, f"Documents:\n{context}\n\nQuestion: {query}")

    # chunk_index is included so consumers can key a citation as
    # "<source>:<chunk_index>" — the eval harness (_run_row/_score) builds
    # exactly that key and previously always missed because it was absent.
    citations = [
        {
            "source": h.chunk.doc_id,
            "chunk_index": h.chunk.chunk_index,
            "text": h.chunk.clean[:200],
            "score": round(h.similarity, 3),
        }
        for h in hits
    ]
    return {"ok": True, "answer": answer, "citations": citations}


def _sample(request: dict, docs: dict[str, Doc]) -> dict:
    """Return up to `limit` (default 50) randomly sampled chunks across all docs."""
    import random
    limit = request.get("limit", 50)
    pool = [c for doc in docs.values() for c in doc.chunks]
    picked = random.sample(pool, min(limit, len(pool)))
    serialized = [
        {"doc_id": c.doc_id, "chunk_index": c.chunk_index, "clean": c.clean, "head": c.head, "tail": c.tail}
        for c in picked
    ]
    return {"ok": True, "chunks": serialized}


def _hybrid_search(
    docs: dict[str, Doc],
    bm25: BM25Index,
    q_vec: np.ndarray,
    query: str,
    k: int,
    alpha: float,
) -> list[SearchHit]:
    """Blend dense and lexical scores: alpha * cosine + (1 - alpha) * bm25,
    with BM25 max-normalized to [0, 1]. Returns the top-k chunks."""
    q_norm = np.linalg.norm(q_vec)
    if q_norm == 0:
        return []

    # One pass over all chunks: record key→chunk and cosine similarity
    # (chunks without a usable embedding get no cosine entry).
    chunk_map: dict[str, Chunk] = {}
    cosine: dict[str, float] = {}
    for doc in docs.values():
        for ch in doc.chunks:
            key = f"{ch.doc_id}:{ch.chunk_index}"
            chunk_map[key] = ch
            if ch.embedding is None:
                continue
            c_norm = np.linalg.norm(ch.embedding)
            if c_norm == 0:
                continue
            cosine[key] = float(np.dot(q_vec, ch.embedding) / (q_norm * c_norm))

    # Lexical scores, scaled into [0, 1] by the max.
    raw = bm25.scores(query)
    top = max(raw.values(), default=1.0)
    lexical = {key: v / top for key, v in raw.items()} if top > 0 else {}

    blended = sorted(
        (
            (key, alpha * cosine.get(key, 0.0) + (1 - alpha) * lexical.get(key, 0.0))
            for key in set(cosine) | set(lexical)
        ),
        key=lambda kv: kv[1],
        reverse=True,
    )

    hits: list[SearchHit] = []
    for key, score in blended[:k]:
        ch = chunk_map.get(key)
        if ch:
            hits.append(SearchHit(chunk=ch, similarity=score))
    return hits


# --- helpers -----------------------------------------------------------------

def _chunk_count(docs: dict[str, Doc]) -> int:
    """Total number of chunks across every document."""
    total = 0
    for d in docs.values():
        total += len(d.chunks)
    return total


def _dir_hash(path: Path) -> str:
    h = hashlib.md5()
    for f in sorted(path.rglob("*")):
        if f.is_file():
            st = f.stat()
            h.update(str(f.relative_to(path)).encode())
            h.update(str(st.st_size).encode())
            h.update(str(st.st_mtime_ns).encode())
    return h.hexdigest()

# ═══════════════════════════════════════════
# arke/server/models.py
# ═══════════════════════════════════════════

"""Models — Embedder and LLM protocols + factory."""
from dataclasses import dataclass
from typing import Protocol

from .config import Config


class Embedder(Protocol):
    """Structural interface for embedding backends: batch-embed texts into
    float vectors (implemented by CloudEmbedder and LocalEmbedder)."""
    def embed(self, texts: list[str]) -> list[list[float]]: ...


class LLM(Protocol):
    """Structural interface for chat backends: single-turn chat with an
    optional system prompt (implemented by CloudLLM and LocalLLM)."""
    def chat(self, system: str | None, user: str) -> str: ...


@dataclass
class Models:
    """The loaded (embedder, llm) pair for whichever backend the config selects."""
    embedder: Embedder
    llm: LLM

    @staticmethod
    def load(cfg: Config) -> "Models":
        """Instantiate the backend named by cfg.backend ("cloud", otherwise local)."""
        if cfg.backend == "cloud":
            from .backend_cloud import load
            pair = load(cfg.cloud_base_url, cfg.cloud_api_key, cfg.cloud_embed_model, cfg.cloud_inference_model)
        else:
            from .backend_local import load
            pair = load(cfg.embed_model_path, cfg.inference_model_path)
        return Models(embedder=pair[0], llm=pair[1])

# ═══════════════════════════════════════════
# arke/server/sdb.py
# ═══════════════════════════════════════════

"""Unified file-based store for all of Arke's cached state.

Everything that passes through Arke is cache — documents, embeddings, source files,
sessions. Microsoft (or the user's filesystem) is the source of truth; sdb is the
gut that digests and holds the processed form. Blow it away at any time — it
rebuilds from the source.

Layout:
    <root>/<table>/<id[:2]>/<id>.<ext>

Sharding by first two characters of the id keeps any single directory small
regardless of corpus size.
"""
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
import json
import os
import shutil

import numpy as np


_root: Path | None = None


def mount(path: str | Path) -> None:
    global _root
    _root = Path(path).expanduser()
    _root.mkdir(parents=True, exist_ok=True)
    for tmp in _root.rglob("*.tmp"):
        tmp.unlink(missing_ok=True)


def _table(table: str) -> Path:
    """Directory holding every record of one table."""
    return _root / table


def _shard(table: str, id: str) -> Path:
    """Two-character shard directory for this id inside the table."""
    return _table(table) / id[:2]


def _path(table: str, id: str, ext: str) -> Path:
    """Full on-disk path for a record; an empty ext means no extension."""
    name = f"{id}.{ext}" if ext else id
    return _shard(table, id) / name


@contextmanager
def _atomic_open(path: Path):
    """Open <name>.tmp for binary write; on clean exit of the caller's block,
    flush + fsync and atomically rename over `path`, so readers never see a
    partial file. If the block raises, the rename is skipped and the .tmp
    file is left behind (mount() sweeps those on the next startup)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "wb") as f:
        yield f
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)


# JSON — documents, sessions, any structured record -----------------------

def put_json(table: str, id: str, data: dict) -> None:
    """Store a structured record as pretty-printed JSON."""
    payload = json.dumps(data, indent=2).encode()
    with _atomic_open(_path(table, id, "json")) as f:
        f.write(payload)


def get_json(table: str, id: str) -> dict | None:
    """Fetch a record, or None if it was never stored."""
    record = _path(table, id, "json")
    return json.loads(record.read_text()) if record.exists() else None


def scan_json(table: str) -> Iterator[tuple[str, dict]]:
    """Yield (id, record) for every JSON record in the table."""
    table_dir = _table(table)
    if table_dir.exists():
        for f in table_dir.rglob("*.json"):
            yield f.stem, json.loads(f.read_text())


# Vectors — embeddings --------------------------------------------------------

def put_vec(table: str, id: str, vec: np.ndarray) -> None:
    """Store an embedding vector as a .npy file."""
    with _atomic_open(_path(table, id, "npy")) as f:
        np.save(f, vec)


def get_vec(table: str, id: str) -> np.ndarray | None:
    """Load an embedding vector, or None if absent."""
    p = _path(table, id, "npy")
    return np.load(p) if p.exists() else None


# Raw bytes — source files (PDF, DOCX, MSG), any opaque blob ------------------

def put_bin(table: str, id: str, data: bytes) -> None:
    """Store an opaque blob (e.g. the original PDF/DOCX/MSG bytes), no extension."""
    with _atomic_open(_path(table, id, "")) as f:
        f.write(data)


def get_bin(table: str, id: str) -> bytes | None:
    """Load a blob, or None if absent."""
    p = _path(table, id, "")
    return p.read_bytes() if p.exists() else None


# Delete ----------------------------------------------------------------------

def delete(table: str, id: str) -> None:
    """Remove one record from its shard — every file whose name is exactly
    the id or starts with "<id>." (any extension)."""
    shard = _shard(table, id)
    if not shard.exists():
        return
    prefix = f"{id}."
    for entry in shard.iterdir():
        if entry.name == id or entry.name.startswith(prefix):
            entry.unlink(missing_ok=True)


def wipe(table: str) -> None:
    """Delete the whole table directory, every shard included."""
    table_dir = _table(table)
    if table_dir.exists():
        shutil.rmtree(table_dir)

# ═══════════════════════════════════════════
# arke/server/types.py
# ═══════════════════════════════════════════

import hashlib
from dataclasses import dataclass, field
from typing import Any, ClassVar, Iterator

import numpy as np

from . import sdb


@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    clean: str
    head: str
    tail: str

    # Runtime only — not serialized. Loaded from sdb.get_vec or computed on GPU.
    embedding: np.ndarray | None = field(default=None, compare=False, repr=False)

    def overlapped(self) -> str:
        return self.head + self.clean + self.tail

    def cache_key(self, model_id: str, model_version: str) -> str:
        raw = f"{model_id}:{model_version}:{self.overlapped()}"
        return hashlib.md5(raw.encode()).hexdigest()

    def save_embedding(self, model_id: str, model_version: str) -> None:
        if self.embedding is None:
            return
        sdb.put_vec("embeddings", self.cache_key(model_id, model_version), self.embedding)

    def load_embedding(self, model_id: str, model_version: str) -> bool:
        vec = sdb.get_vec("embeddings", self.cache_key(model_id, model_version))
        if vec is None:
            return False
        self.embedding = vec
        return True


@dataclass
class Doc:
    """A persisted document record. The JSON seed on disk holds only the
    fields in wrap(); chunks are runtime state re-grown at ingest."""
    TABLE: ClassVar[str] = "documents"

    id: str
    source: str
    created: int
    modified: int
    metadata: dict[str, Any] = field(default_factory=dict)
    tags: list[str] = field(default_factory=list)

    # Runtime only — re-grown on every unwrap, never serialized.
    chunks: list[Chunk] = field(default_factory=list, compare=False, repr=False)
    _dirty: bool = field(default=False, compare=False, repr=False)

    def wrap(self) -> dict:
        """Serializable view of the persistent fields only."""
        keys = ("id", "source", "created", "modified", "metadata", "tags")
        return {k: getattr(self, k) for k in keys}

    @classmethod
    def unwrap(cls, d: dict) -> "Doc":
        """Rebuild a Doc from its wrap() dict; optional fields default when missing."""
        return cls(
            id=d["id"],
            source=d["source"],
            created=d["created"],
            modified=d["modified"],
            metadata=d.get("metadata", {}),
            tags=d.get("tags", []),
        )

    def save(self) -> None:
        """Persist the seed to sdb and clear the dirty flag."""
        sdb.put_json(self.TABLE, self.id, self.wrap())
        self._dirty = False

    @classmethod
    def load(cls, id: str) -> "Doc | None":
        """Fetch one Doc by id, or None if absent."""
        data = sdb.get_json(cls.TABLE, id)
        return cls.unwrap(data) if data else None

    @classmethod
    def scan(cls) -> Iterator["Doc"]:
        """Iterate every persisted Doc in the table."""
        for _, data in sdb.scan_json(cls.TABLE):
            yield cls.unwrap(data)


@dataclass(frozen=True)
class SearchHit:
    """A single retrieved chunk paired with its retrieval score."""

    chunk: Chunk
    # NOTE(review): presumably the hybrid (cosine + BM25) score mentioned in
    # the README, higher = more relevant — confirm against the search code.
    similarity: float


@dataclass(frozen=True)
class SearchAnswer:
    """An answer string together with the search hits it was built from."""

    answer: str
    hits: list[SearchHit]

# ═══════════════════════════════════════════
# arke/server/workspace.py
# ═══════════════════════════════════════════

"""Workspace — an isolated container for all Arke state.

Each workspace has its own sdb root. Switching workspaces is just
calling mount() with a different name.

Layout:
    ~/.arke/workspaces/<name>/
        data/      — sdb root (documents, embeddings, sources)
        digest/    — drop documents here; Arke ingests on change
        inbox/     — incoming requests
        outbox/    — responses
"""
from __future__ import annotations

import shutil
from dataclasses import dataclass
from pathlib import Path

from . import sdb

ARKE_HOME = Path("~/.arke")


@dataclass(frozen=True)
class Workspace:
    """Handle to one named workspace directory.

    A Workspace is just a name plus its root path; all state lives on
    disk underneath (see the module docstring for the layout). The
    properties mirror that documented layout — including digest/, which
    previously had no accessor.
    """

    name: str
    path: Path

    @property
    def data(self) -> Path:
        """sdb root (documents, embeddings, sources)."""
        return self.path / "data"

    @property
    def digest(self) -> Path:
        """Drop directory — documents placed here get ingested on change."""
        return self.path / "digest"

    @property
    def inbox(self) -> Path:
        """Incoming requests."""
        return self.path / "inbox"

    @property
    def outbox(self) -> Path:
        """Responses."""
        return self.path / "outbox"

    def wipe(self) -> None:
        """Erase all indexed data."""
        if self.data.exists():
            shutil.rmtree(self.data)
        # Re-point sdb at the (now removed) data dir — presumably
        # sdb.mount recreates its root on demand; verify in sdb.
        sdb.mount(self.data)


def mount(name: str, home: str | Path = ARKE_HOME) -> Workspace:
    """Mount a workspace by name. Sets the sdb root."""
    # Workspaces live under <home>/workspaces/<name>; create on first use.
    root = Path(home).expanduser() / "workspaces" / name
    root.mkdir(parents=True, exist_ok=True)
    workspace = Workspace(name=name, path=root)
    sdb.mount(workspace.data)
    return workspace

# ═══════════════════════════════════════════
# pyproject.toml
# ═══════════════════════════════════════════

[project]
name = "arke-legal"
version = "0.1.193"
description = "Local-first RAG for legal teams."
requires-python = ">=3.12"
license = "MIT"
authors = [
    { name = "Pavel Dolgoter", email = "mikepromogratus@proton.me" },
]
readme = "README.md"
keywords = ["rag", "search", "embeddings", "legal", "local"]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3.12",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
    "python-dotenv>=1.0",
    "numpy>=2.0",
    "click>=8.1",
    "llama-cpp-python>=0.3",
    "pdfplumber>=0.11",
    "python-docx>=1.1",
    "extract-msg>=0.48",
    "msal>=1.36.0",
    "httpx>=0.28.1",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
]

[project.urls]
Repository = "https://github.com/padolgot/arke-legal"

[project.scripts]
arke-server  = "arke.server.main:run"
arke-gateway = "arke.clients.socket:run"
arke         = "arke.clients.cli:main"
arke-eval    = "arke.eval.sweep:run"
arke-mail    = "arke.clients.email:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["arke"]

# ═══════════════════════════════════════════
# .env.example
# ═══════════════════════════════════════════


API_KEY=
