#!/usr/bin/env python3
"""Move misrouted SUBAGENT records out of live <UUID>.jsonl files into
their proper <UUID>/subagents/agent-<agentId>.jsonl files.

Background: scripts/merge-recovered-into-profiles appended 20,856 records
to 156 live session files. ~99.7% of those records were subagent turns
(isSidechain truthy or agentId set) which Claude Code keeps in a SEPARATE
per-agent file under <UUID>/subagents/. Those records do not belong in
the main session jsonl. This script moves them.

For each DIVERGENT entry in the merge-log:
  1. Read backup records (pre-merge state) to compute the "belongs in
     main" key set.
  2. Read current live records (under flock(LOCK_EX) with --apply; dry-run
     reads without a lock). Live = backup + records appended by the merge
     + any records written by an active Claude Code process since the merge.
  3. Partition live records:
       - key in backup_keys           -> KEEP in main (was already there)
       - key not in backup_keys AND
         (isSidechain OR agentId)     -> MOVE to subagent file
       - key not in backup_keys AND
         not subagent                 -> KEEP in main (legitimate
                                        recovered main record OR a
                                        post-merge real-time write)
  4. Append moved records to <UUID>/subagents/agent-<agentId>.jsonl,
     creating the file 0600 / parent dir 0755 if missing. Append happens
     under flock(LOCK_EX) on an O_APPEND fd. flock is advisory: it only
     serializes against writers that also take the lock, while O_APPEND
     keeps each of our writes at end-of-file. Dedup by record key.
  5. Rewrite the live file IN PLACE under the same flock by lseek(0) +
     write + ftruncate. We deliberately do NOT use os.replace(): three
     Claude Code sessions are actively writing to these files and have
     them open by inode. Replacing the inode would orphan their writes.

Steps 4-then-5 ordering is deliberate: append-before-rewrite means a
crash between the two leaves duplicate data (recoverable on re-run),
while the reverse order would risk data loss (truncated main, never-
written subagent). Subagent appends dedup against on-disk content under
flock so re-runs are idempotent.

Default mode is dry-run: prints planned counts, writes nothing. Dry-run
also breaks the planned moves down: removed_main (records leaving the
main file) = new_append (not yet present at the destination) +
dedup_skip (already at the destination, e.g. because Claude Code wrote
them there at runtime).

Pass --apply to execute.
"""
from __future__ import annotations

import argparse
import datetime as _dt
import errno
import fcntl
import hashlib
import json
import os
import shutil
import sys
import traceback
from pathlib import Path

# Two directory levels above this script (presumably the repo root when the
# script lives in scripts/ -- confirm against the repo layout).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
BACKUP_ROOT = PROJECT_ROOT.joinpath("data", "merge-backups")
FIX_BACKUP_ROOT = PROJECT_ROOT.joinpath("data", "fix-backups")
# Profile roots that every merge-log "real" path must live under; paths
# outside all of them are rejected by the backup-path mappers.
REAL_PROFILES = [
    Path.home() / profile_dir
    for profile_dir in (".claude-personal", ".claude-work", ".claude-the-third")
]


def latest_merge_run() -> Path:
    """Return the lexically greatest BACKUP_ROOT subdirectory holding a merge-log.json.

    Directory names are presumably sortable timestamps, making "greatest"
    the newest run (assumption about the producer's naming -- confirm).

    Raises:
        SystemExit: if BACKUP_ROOT is missing or no run has a merge-log.json.
    """
    if not BACKUP_ROOT.is_dir():
        raise SystemExit(f"no merge-backups dir at {BACKUP_ROOT}")
    runs = [
        run_dir
        for run_dir in BACKUP_ROOT.iterdir()
        if run_dir.is_dir() and (run_dir / "merge-log.json").is_file()
    ]
    if not runs:
        raise SystemExit(f"no merge-log.json under {BACKUP_ROOT}/*/")
    return max(runs)


def record_key(obj: dict) -> str | None:
    """Stable identity key matching verify-recovered-duplicates.record_key.
    For our needs the uuid path always wins on subagent records (they all
    carry a uuid). Hash fallback is fine for control records, which we
    never move."""
    if not isinstance(obj, dict):
        return None
    uid = obj.get("uuid")
    if isinstance(uid, str) and uid:
        return f"uuid:{uid}"
    snap = obj.get("snapshot")
    if isinstance(snap, dict):
        sid = snap.get("messageId")
        if isinstance(sid, str) and sid:
            return f"snap:{sid}"
    mid = obj.get("messageId")
    if isinstance(mid, str) and mid:
        return f"mid:{mid}"
    canon = json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return "h:" + hashlib.sha1(canon).hexdigest()


def parse_jsonl_bytes(data: bytes, dropped: list[dict] | None = None,
                      source: str | None = None) -> list[dict]:
    """Parse all valid JSON-object lines from raw bytes. Skips blank
    lines. Unparseable / non-dict lines are dropped; if `dropped` is
    supplied, each drop is logged with file path, 1-based line number,
    reason, and the first 100 bytes of the raw line as hex.

    (One known torn-line file lives in this set; the torn line's bytes
    will be lost on rewrite. That is acceptable: the line was
    un-recoverable JSON pre-existing the merge.)"""
    out: list[dict] = []
    for line_no, raw in enumerate(data.splitlines(), start=1):
        s = raw.strip()
        if not s:
            continue
        try:
            obj = json.loads(s)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            if dropped is not None:
                dropped.append({
                    "file": source,
                    "line_no": line_no,
                    "reason": f"{type(e).__name__}: {e}",
                    "raw_first_100_bytes_hex": raw[:100].hex(),
                })
            continue
        if isinstance(obj, dict):
            out.append(obj)
        elif dropped is not None:
            dropped.append({
                "file": source,
                "line_no": line_no,
                "reason": f"non-dict json ({type(obj).__name__})",
                "raw_first_100_bytes_hex": raw[:100].hex(),
            })
    return out


def parse_jsonl_path(p: Path, dropped: list[dict] | None = None) -> list[dict]:
    """Read ``p`` as raw bytes and parse it with parse_jsonl_bytes.

    Drops (if any) are appended to ``dropped`` tagged with ``str(p)``.
    """
    raw = p.read_bytes()
    return parse_jsonl_bytes(raw, dropped=dropped, source=str(p))


def is_subagent_record(obj: dict) -> bool:
    """True when the record has a truthy ``isSidechain`` or a non-empty string ``agentId``."""
    if obj.get("isSidechain"):
        return True
    agent = obj.get("agentId")
    return isinstance(agent, str) and bool(agent)


def agent_id_of(obj: dict) -> str | None:
    a = obj.get("agentId")
    if isinstance(a, str) and a:
        return a
    return None


def profile_of(path: Path) -> Path | None:
    """Return the first REAL_PROFILES root that ``path`` is lexically under, or None.

    Purely lexical (``Path.relative_to``): no symlink resolution is done.
    """
    for candidate in REAL_PROFILES:
        try:
            path.relative_to(candidate)
        except ValueError:
            continue
        return candidate
    return None


def backup_path_for(real: Path, merge_run_root: Path) -> Path:
    """Map live path ``real`` to ``merge_run_root/<profile-dir-name>/<path-within-profile>``.

    Raises:
        RuntimeError: if ``real`` is not under any REAL_PROFILES root.
    """
    profile = profile_of(real)
    if profile is None:
        raise RuntimeError(f"path {real} not under any real profile")
    return merge_run_root.joinpath(profile.name, real.relative_to(profile))


def fix_backup_path_for(real: Path, fix_run_root: Path) -> Path:
    """Destination for the pre-fix (post-merge) copy of ``real`` under ``fix_run_root``.

    Uses the same <profile-dir-name>/<path-within-profile> layout as the
    merge backups, so it simply reuses backup_path_for's mapping.

    Raises:
        RuntimeError: if ``real`` is not under any REAL_PROFILES root.
    """
    return backup_path_for(real, fix_run_root)


def serialize_record(obj: dict) -> bytes:
    """Serialize one record as compact, single-line JSON encoded as UTF-8.

    Uses ``separators=(",", ":")`` and literal non-ASCII characters to match
    Claude Code's on-disk format. A record holding a lone UTF-16 surrogate
    (``json.loads`` produces one from an escaped ``\\ud83d`` that has no
    partner, e.g. after a string was cut mid-pair) cannot be encoded as
    UTF-8. For such a record fall back to ``ensure_ascii=True``, which writes
    every non-ASCII character as a ``\\uXXXX`` escape. The output is still
    valid JSON and decodes back to the same object, instead of raising
    UnicodeEncodeError mid-fix.
    """
    try:
        return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    except UnicodeEncodeError:
        return json.dumps(obj, separators=(",", ":"), ensure_ascii=True).encode("ascii")


def rewrite_live_in_place(real: Path, keep_records: list[dict]) -> None:
    """Overwrite ``real`` with ``keep_records``, one compact JSON object per line.

    The rewrite goes through the file's existing inode (lseek(0), write,
    ftruncate, fsync) while holding flock(LOCK_EX); nothing is renamed or
    replaced, so processes that already have the file open keep referring
    to the same file.
    """
    payload = b"".join([serialize_record(rec) + b"\n" for rec in keep_records])
    size = len(payload)
    fd = os.open(real, os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        os.lseek(fd, 0, os.SEEK_SET)
        # Lay the new bytes over the old ones first and only then cut the
        # file down to the new size.
        buf = memoryview(payload)
        done = 0
        while done < size:
            done += os.write(fd, buf[done:])
        os.ftruncate(fd, size)
        os.fsync(fd)
    finally:
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        finally:
            os.close(fd)


def existing_subagent_keys(agent_path: Path) -> set[str]:
    """Return the record keys already present in ``agent_path`` (empty if absent).

    Dry-run only: reads without a lock, so counts derived from it may be
    slightly stale if something is writing the file concurrently.
    """
    if not agent_path.is_file():
        return set()
    return {key for key in map(record_key, parse_jsonl_path(agent_path)) if key}


def append_to_subagent_file(agent_path: Path, records: list[dict]) -> int:
    """Append ``records`` not already in ``agent_path`` and return how many were written.

    Existing content is read and keyed under flock(LOCK_EX) on an
    O_RDWR|O_APPEND fd, so a re-run skips records that are already there.
    Duplicates within ``records`` are also written only once. A missing
    parent directory is created and chmod'ed 0755 (best-effort); a missing
    file is created 0600. If the file is non-empty and lacks a trailing
    newline, one is inserted before the first appended record.
    """
    if not records:
        return 0

    parent = agent_path.parent
    if not parent.exists():
        parent.mkdir(parents=True, exist_ok=True)
        try:
            os.chmod(parent, 0o755)
        except OSError:
            pass  # best-effort: keep the umask-derived mode

    fd = os.open(agent_path, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        # O_APPEND forces writes to EOF regardless of the offset, so it is
        # safe to rewind for the dedup read.
        os.lseek(fd, 0, os.SEEK_SET)
        pieces = []
        while True:
            piece = os.read(fd, 1 << 20)
            if not piece:
                break
            pieces.append(piece)
        on_disk = b"".join(pieces)
        seen = {key for key in map(record_key, parse_jsonl_bytes(on_disk)) if key}

        needs_newline = bool(on_disk) and not on_disk.endswith(b"\n")
        written = 0
        for rec in records:
            key = record_key(rec)
            if key and key in seen:
                continue
            line = serialize_record(rec) + b"\n"
            if needs_newline:
                line = b"\n" + line
                needs_newline = False
            buf = memoryview(line)
            offset = 0
            while offset < len(buf):
                offset += os.write(fd, buf[offset:])
            if key:
                seen.add(key)
            written += 1
        os.fsync(fd)
        return written
    finally:
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        finally:
            os.close(fd)


def _read_fd_to_end(fd: int) -> bytes:
    """Read from ``fd``'s current offset to EOF, joining chunks once (no quadratic +=)."""
    chunks: list[bytes] = []
    while True:
        chunk = os.read(fd, 1 << 20)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def _write_all_fd(fd: int, data: bytes) -> None:
    """Write all of ``data`` at ``fd``'s current offset, looping on short writes."""
    view = memoryview(data)
    n = 0
    while n < len(view):
        n += os.write(fd, view[n:])


def _partition_records(
    live_records: list[dict], backup_keys: set[str]
) -> tuple[list[dict], dict[str, list[dict]], list[dict]]:
    """Split live records into (keep, move_by_agent, no_agent), preserving order.

    keep          -- records that stay in main: keys present in the pre-merge
                     backup, non-subagent records, and subagent-looking
                     records without an agentId
    move_by_agent -- agentId -> subagent records (not in backup) to move
    no_agent      -- subagent-looking records lacking an agentId; these are
                     ALSO in keep (no target file to guess)
    """
    keep: list[dict] = []
    move_by_agent: dict[str, list[dict]] = {}
    no_agent: list[dict] = []
    for r in live_records:
        k = record_key(r)
        if k is not None and k in backup_keys:
            keep.append(r)
            continue
        if not is_subagent_record(r):
            keep.append(r)
            continue
        aid = agent_id_of(r)
        if aid:
            move_by_agent.setdefault(aid, []).append(r)
        else:
            no_agent.append(r)
            keep.append(r)
    return keep, move_by_agent, no_agent


def process_one(
    entry: dict,
    merge_run_root: Path,
    fix_run_root: Path | None,
    apply: bool,
    dropped_lines: list[dict] | None = None,
) -> dict:
    """Plan (and with ``apply`` perform) the fix for one DIVERGENT merge-log entry.

    Args:
        entry: merge-log entry; reads "real", "uuid" and optionally "appended".
        merge_run_root: merge-backups/<ts>/ dir holding the pre-merge backups.
        fix_run_root: where pre-fix copies go; required when ``apply``.
        apply: False = dry-run (read-only estimate); True = mutate files.
        dropped_lines: optional sink for unparseable-line reports.

    Returns:
        A per-file log dict. Problems are reported in its "errors" list.
        "removed_main" is the number of records actually taken out of (apply)
        or planned to leave (dry-run) the main file.

    Raises:
        RuntimeError: if ``apply`` is set without ``fix_run_root``, or the
            live path is outside every known profile.
    """
    real = Path(entry["real"])
    out: dict = {
        "uuid": entry["uuid"],
        "real": str(real),
        "merge_appended": entry.get("appended"),
        "errors": [],
    }
    if not real.is_file():
        out["errors"].append(f"live file missing: {real}")
        return out

    backup = backup_path_for(real, merge_run_root)
    if not backup.is_file():
        out["errors"].append(f"backup missing: {backup}")
        return out

    # Backup keys: what was there pre-merge and must remain in main.
    backup_records = parse_jsonl_path(backup, dropped=dropped_lines)
    backup_keys = {k for r in backup_records for k in [record_key(r)] if k}
    out["backup_records"] = len(backup_records)

    session_root = real.with_suffix("")  # <UUID>/ next to <UUID>.jsonl
    sub_dir = session_root / "subagents"

    if not apply:
        # Dry-run: read live without a lock. Estimate only.
        live_records = parse_jsonl_path(real, dropped=dropped_lines)
        keep, move_by_agent, move_no_agent = _partition_records(live_records, backup_keys)
        out["live_records_before"] = len(live_records)
        out["kept_main"] = len(keep)
        out["moved_per_agent_planned"] = {a: len(v) for a, v in move_by_agent.items()}
        out["moved_no_agent_kept_main"] = len(move_no_agent)
        out["removed_main"] = sum(len(v) for v in move_by_agent.values())

        # Split each agent's planned moves into NEW appends vs records
        # ALREADY at the destination (which apply would dedup-skip).
        new_appends_per_agent: dict[str, int] = {}
        already_per_agent: dict[str, int] = {}
        for aid, recs in move_by_agent.items():
            existing_keys = existing_subagent_keys(sub_dir / f"agent-{aid}.jsonl")
            new_count = 0
            for r in recs:
                k = record_key(r)
                if not (k and k in existing_keys):
                    new_count += 1
            new_appends_per_agent[aid] = new_count
            already_per_agent[aid] = len(recs) - new_count
        out["new_appends_per_agent"] = new_appends_per_agent
        out["already_at_destination_per_agent"] = already_per_agent
        out["new_appends_total"] = sum(new_appends_per_agent.values())
        out["already_at_destination_total"] = sum(already_per_agent.values())
        return out

    if fix_run_root is None:
        raise RuntimeError("apply requires fix_run_root")
    fb = fix_backup_path_for(real, fix_run_root)
    fb.parent.mkdir(parents=True, exist_ok=True)

    # One lock held for the whole backup -> read -> subagent-append ->
    # rewrite sequence.
    fd = os.open(real, os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        # Backup under the lock so lock-honoring writers cannot slip records
        # in between the backup and our read.
        shutil.copy2(real, fb)

        os.lseek(fd, 0, os.SEEK_SET)
        existing = _read_fd_to_end(fd)
        live_records = parse_jsonl_bytes(existing, dropped=dropped_lines, source=str(real))
        _, move_by_agent, move_no_agent = _partition_records(live_records, backup_keys)
        out["live_records_before"] = len(live_records)
        out["moved_per_agent_planned"] = {a: len(v) for a, v in move_by_agent.items()}
        out["moved_no_agent_kept_main"] = len(move_no_agent)

        # CRASH-SAFETY ORDERING: append to subagent files FIRST, then rewrite
        # main. A kill between the two leaves duplicates (main AND subagent),
        # which a re-run repairs; append_to_subagent_file dedups against
        # on-disk content, so re-runs are idempotent.
        moved_per_agent_actual: dict[str, int] = {}
        removed_ids: set[int] = set()
        failed_append_records = 0
        for aid, recs in move_by_agent.items():
            agent_path = sub_dir / f"agent-{aid}.jsonl"
            try:
                moved_per_agent_actual[aid] = append_to_subagent_file(agent_path, recs)
            except Exception as e:
                # The destination may hold none or only part of these
                # records, so they MUST stay in main; dropping them here
                # would lose data.
                out["errors"].append(f"append agent-{aid}: {e}")
                moved_per_agent_actual[aid] = -1
                failed_append_records += len(recs)
                continue
            removed_ids.update(id(r) for r in recs)
        out["moved_per_agent_actual"] = moved_per_agent_actual
        out["failed_append_kept_main"] = failed_append_records

        # `new_appends` = records actually written to subagent files;
        # `already_at_destination` = successfully-handled records that the
        # dedup skipped.
        actual_total = sum(v for v in moved_per_agent_actual.values() if v >= 0)
        out["new_appends_total"] = actual_total
        out["already_at_destination_total"] = max(0, len(removed_ids) - actual_total)

        # Main keeps every record whose move did not succeed, in file order.
        # id() is unique here because live_records keeps every object alive.
        keep = [r for r in live_records if id(r) not in removed_ids]
        out["kept_main"] = len(keep)
        out["removed_main"] = 0
        out["main_rewritten"] = False
        if not removed_ids:
            # Nothing leaves main: leave its bytes untouched.
            return out

        body = b"".join(serialize_record(r) + b"\n" for r in keep)

        # flock is advisory. A writer that does not take it may have changed
        # the file since our read. Carry appended bytes over verbatim rather
        # than letting ftruncate discard them, and refuse to rewrite a file
        # that shrank under us (main stays as-is; duplicates are recoverable).
        size_now = os.fstat(fd).st_size
        if size_now < len(existing):
            out["kept_main"] = len(live_records)
            out["errors"].append(
                f"live file shrank during fix ({len(existing)} -> {size_now} bytes);"
                " main NOT rewritten"
            )
            return out
        tail = b""
        if size_now > len(existing):
            os.lseek(fd, len(existing), os.SEEK_SET)
            tail = _read_fd_to_end(fd)
        out["concurrent_tail_bytes"] = len(tail)

        # Rewrite in place (same inode) under the lock we still hold:
        # write the new content first, then cut the file to its new length.
        os.lseek(fd, 0, os.SEEK_SET)
        _write_all_fd(fd, body + tail)
        os.ftruncate(fd, len(body) + len(tail))
        os.fsync(fd)
        out["removed_main"] = len(removed_ids)
        out["main_rewritten"] = True
    finally:
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        finally:
            os.close(fd)
    return out


def main() -> int:
    """Command-line entry point.

    Loads merge-log.json from --merge-run (default: latest_merge_run()),
    selects its DIVERGENT entries minus any --exclude-uuid matches (capped
    by --limit when > 0), runs process_one on each, and prints a per-profile
    summary. With --apply, pre-fix copies and fix-log.json are written under
    a fresh FIX_BACKUP_ROOT/<utc-timestamp>/ directory.

    Returns:
        0 if no entry reported errors, 2 if any did, 1 if merge-log.json is
        missing.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--apply", action="store_true",
                    help="Actually rewrite files. Without this, dry-run only.")
    ap.add_argument("--merge-run", type=Path, default=None,
                    help="Path to a merge-backups/<ts>/ dir (default: latest).")
    ap.add_argument("--limit", type=int, default=0,
                    help="Process only the first N DIVERGENT entries (debugging).")
    ap.add_argument("--exclude-uuid", action="append", default=[], metavar="UUID",
                    help="Skip merge-log entries whose 'uuid' equals UUID. "
                         "Repeatable. Use this to leave the live session of "
                         "the running Claude Code conversation untouched.")
    args = ap.parse_args()

    merge_run_root = args.merge_run or latest_merge_run()
    log_path = merge_run_root / "merge-log.json"
    if not log_path.is_file():
        print(f"ERROR: merge-log.json not found at {log_path}", file=sys.stderr)
        return 1
    print(f"[fix] merge-log: {log_path}", file=sys.stderr)
    merge_log = json.loads(log_path.read_text(encoding="utf-8"))

    excluded_uuids = set(args.exclude_uuid or [])
    all_divergent = [e for e in merge_log["entries"] if e.get("class") == "DIVERGENT"]
    excluded_entries = [
        {"uuid": e.get("uuid"), "real": e.get("real")}
        for e in all_divergent if e.get("uuid") in excluded_uuids
    ]
    divergent = [e for e in all_divergent if e.get("uuid") not in excluded_uuids]
    if excluded_uuids:
        print(f"[fix] excluded UUIDs: {sorted(excluded_uuids)}", file=sys.stderr)
        print(f"[fix] excluded entries matched: {len(excluded_entries)}", file=sys.stderr)
    if args.limit > 0:
        divergent = divergent[: args.limit]
    print(f"[fix] DIVERGENT entries to process: {len(divergent)}", file=sys.stderr)

    fix_run_root: Path | None = None
    if args.apply:
        ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%S_%fZ")
        fix_run_root = FIX_BACKUP_ROOT / ts
        fix_run_root.mkdir(parents=True, exist_ok=True)
        print(f"[fix] backups -> {fix_run_root}", file=sys.stderr)

    per_file_logs: list[dict] = []
    profile_totals: dict[str, dict[str, int]] = {}
    grand_kept = 0
    grand_moved = 0
    grand_new_appends = 0
    grand_already = 0
    grand_no_agent = 0
    failed = 0  # number of entries whose result carries errors
    dropped_lines: list[dict] = []

    for i, entry in enumerate(divergent):
        try:
            res = process_one(
                entry, merge_run_root, fix_run_root, args.apply,
                dropped_lines=dropped_lines,
            )
        except Exception as e:
            # Not counted here: the synthetic result below carries "errors",
            # so the check after this block counts the entry exactly once.
            tb = traceback.format_exc(limit=3)
            res = {
                "uuid": entry.get("uuid"),
                "real": entry.get("real"),
                "errors": [f"unhandled: {e}\n{tb}"],
            }
            print(f"  FAIL {entry.get('uuid')}: {e}", file=sys.stderr)

        per_file_logs.append(res)
        if res.get("errors"):
            failed += 1

        # An entry without "real" must not abort the whole run here.
        real_str = entry.get("real")
        prof = profile_of(Path(real_str)) if real_str else None
        prof_name = prof.name if prof else "?"
        slot = profile_totals.setdefault(
            prof_name,
            {"files": 0, "kept_main": 0, "moved": 0,
             "new_appends": 0, "already": 0, "no_agent": 0},
        )
        kept = res.get("kept_main", 0)
        # Prefer what process_one reports as actually removed (apply keeps
        # records whose subagent append failed); fall back to the plan.
        moved = res.get("removed_main")
        if moved is None:
            moved = sum((res.get("moved_per_agent_planned") or {}).values())
        new_appends = res.get("new_appends_total", 0)
        already = res.get("already_at_destination_total", 0)
        no_agent = res.get("moved_no_agent_kept_main", 0)

        slot["files"] += 1
        slot["kept_main"] += kept
        slot["moved"] += moved
        slot["new_appends"] += new_appends
        slot["already"] += already
        slot["no_agent"] += no_agent

        grand_kept += kept
        grand_moved += moved
        grand_new_appends += new_appends
        grand_already += already
        grand_no_agent += no_agent

        if (i + 1) % 25 == 0:
            print(f"  progress: {i+1}/{len(divergent)}", file=sys.stderr)

    # Per-profile breakdown.
    # Columns:
    #   removed_main = records leaving the main file for a subagent file
    #                  (no_agentid records are NOT included: they stay in main)
    #   new_append   = records that don't already exist at the destination
    #                  subagent file -- TRUE net additions written by us
    #   dedup_skip   = records that already exist at the destination
    #                  (e.g. written there at runtime); will be no-ops
    #   no_agentid   = subagent-looking records without agentId, kept in main
    print()
    print("Per-profile breakdown:")
    print(
        f"  {'profile':<22} {'files':>6} {'kept_main':>10} "
        f"{'removed_main':>13} {'new_append':>11} {'dedup_skip':>11} "
        f"{'no_agentid':>11}"
    )
    for prof_name in sorted(profile_totals):
        t = profile_totals[prof_name]
        removed = t['moved']
        print(
            f"  {prof_name:<22} {t['files']:>6} {t['kept_main']:>10} "
            f"{removed:>13} {t['new_appends']:>11} {t['already']:>11} "
            f"{t['no_agent']:>11}"
        )
    print(
        f"  {'TOTAL':<22} {len(divergent):>6} {grand_kept:>10} "
        f"{grand_moved:>13} {grand_new_appends:>11} {grand_already:>11} "
        f"{grand_no_agent:>11}"
    )
    print()
    if not args.apply:
        # Reconciliation hint: removed_main should == new_append + dedup_skip.
        recon = grand_new_appends + grand_already
        ok = "OK" if recon == grand_moved else "MISMATCH"
        print(
            f"Reconciliation [{ok}]: removed_main ({grand_moved}) "
            f"== new_append ({grand_new_appends}) + dedup_skip ({grand_already}) "
            f"= {recon}"
        )
    print(f"Errors: {failed}")
    if dropped_lines:
        print(f"Dropped (unparseable) lines: {len(dropped_lines)}")
    if excluded_entries:
        print(
            f"Excluded entries (skipped via --exclude-uuid): "
            f"{len(excluded_entries)}"
        )
        for e in excluded_entries:
            print(f"  {e['uuid']}  {e['real']}")

    if args.apply and fix_run_root is not None:
        log_out = fix_run_root / "fix-log.json"
        log_out.write_text(json.dumps({
            "merge_run": str(merge_run_root),
            "applied": True,
            "totals": {
                "files": len(divergent),
                "kept_main": grand_kept,
                "moved_subagent": grand_moved,
                "new_appends": grand_new_appends,
                "already_at_destination": grand_already,
                "no_agentid_kept_main": grand_no_agent,
                "errors": failed,
            },
            "per_profile": profile_totals,
            "excluded": excluded_entries,
            "dropped_lines": dropped_lines,
            "entries": per_file_logs,
        }, indent=2), encoding="utf-8")
        print(f"Run log: {log_out}")

    if not args.apply:
        print()
        print("Dry-run only. Pass --apply to execute.")

    return 0 if failed == 0 else 2


if __name__ == "__main__":
    # main()'s integer return value becomes the process exit status.
    raise SystemExit(main())
