You are a senior code reviewer. Review the following code changes.

## Specification

No specification provided. Focus on correctness, tests, and integration.





## Code Changes

```diff
diff --git a/expert_build/cli.py b/expert_build/cli.py
index b15f174..0f99b53 100644
--- a/expert_build/cli.py
+++ b/expert_build/cli.py
@@ -111,6 +111,18 @@ def main():
     pipe_p.add_argument("--resume", action="store_true",
                         help="Resume from last saved pipeline state")
 
+    # -- derive-review-repair --
+    drr_p = sub.add_parser("derive-review-repair",
+                           help="Run derive/review/repair loop on existing belief network")
+    drr_p.add_argument("--model", default="claude", help="Model to use (default: claude)")
+    drr_p.add_argument("--rounds", type=int, default=3,
+                       help="Max derive/review/repair cycles (default: 3)")
+    drr_p.add_argument("--max-derive-rounds", type=int, default=10,
+                       help="Max derive exhaust rounds per cycle (default: 10)")
+    drr_p.add_argument("--timeout", type=int, default=600,
+                       help="LLM timeout in seconds (default: 600)")
+    drr_p.add_argument("--domain", help="Domain description for derive context")
+
     # -- status --
     sub.add_parser("status", help="Show pipeline progress")
 
@@ -135,6 +147,7 @@ def main():
         "cert-coverage": lambda a: _lazy("coverage", "cmd_cert_coverage")(a),
         "exam": lambda a: _lazy("exam", "cmd_exam")(a),
         "pipeline": lambda a: _lazy("pipeline", "cmd_pipeline")(a),
+        "derive-review-repair": lambda a: _lazy("pipeline", "cmd_derive_review_repair")(a),
         "status": lambda a: _lazy("init_cmd", "cmd_status")(a),
         "install-skill": lambda a: _lazy("init_cmd", "cmd_install_skill")(a),
     }
diff --git a/expert_build/pipeline.py b/expert_build/pipeline.py
index 6dcd4d5..f266b48 100644
--- a/expert_build/pipeline.py
+++ b/expert_build/pipeline.py
@@ -314,6 +314,119 @@ def _stage_export(args):
     print(f"\nFinal: {in_count} IN / {total} total beliefs", file=sys.stderr)
 
 
+def _run_convergence_loop(args, rounds, start_cycle=1, total_rounds=None,
+                          on_stage=None):
+    """Run derive -> review -> repair -> dedup until convergence.
+
+    Args:
+        args: namespace with .model, .timeout, .domain, .max_derive_rounds
+        rounds: number of cycles to run
+        start_cycle: cycle number to start from (for labeling)
+        total_rounds: total cycles for labels (defaults to start_cycle + rounds - 1)
+        on_stage: optional callback(cycle, stage_num, event, **kwargs)
+            event is "start" or "end", kwargs has stage-specific data
+
+    Returns summary dict with totals across all cycles.
+    """
+    if total_rounds is None:
+        total_rounds = start_cycle + rounds - 1
+
+    summary = {
+        "cycles": 0,
+        "total_derived": 0,
+        "total_reviewed": 0,
+        "total_invalid": 0,
+        "total_linked": 0,
+        "total_softened": 0,
+        "total_abandoned": 0,
+        "converged": False,
+    }
+
+    for i in range(rounds):
+        cycle = start_cycle + i
+        label = f"cycle {cycle}/{total_rounds}"
+        summary["cycles"] = i + 1
+
+        if on_stage:
+            on_stage(cycle, 4, "start")
+        added = _stage_derive(args, round_label=label)
+        summary["total_derived"] += added
+        if on_stage:
+            on_stage(cycle, 4, "end", added=added)
+
+        if on_stage:
+            on_stage(cycle, 5, "start")
+        review_result = _stage_review(args, round_label=label)
+        invalid_count = review_result.get("invalid", 0)
+        summary["total_reviewed"] += review_result.get("reviewed", 0)
+        summary["total_invalid"] += invalid_count
+        if on_stage:
+            on_stage(cycle, 5, "end", reviewed=review_result.get("reviewed", 0),
+                     invalid=invalid_count)
+
+        repair_result = None
+        if invalid_count > 0:
+            if on_stage:
+                on_stage(cycle, 6, "start")
+            repair_result = _stage_repair(args, review_result, round_label=label)
+            summary["total_linked"] += repair_result.get("linked", 0)
+            summary["total_softened"] += repair_result.get("softened", 0)
+            summary["total_abandoned"] += repair_result.get("abandoned", 0)
+            if on_stage:
+                on_stage(cycle, 6, "end")
+        elif on_stage:
+            on_stage(cycle, 6, "end", skipped=True)
+
+        if on_stage:
+            on_stage(cycle, 7, "start")
+        _stage_deduplicate(args, round_label=label)
+        if on_stage:
+            on_stage(cycle, 7, "end")
+
+        if invalid_count == 0 and added == 0:
+            print(f"\nConverged after {cycle} cycles "
+                  f"(0 invalids, 0 new derivations)", file=sys.stderr)
+            summary["converged"] = True
+            break
+
+    return summary
+
+
+def cmd_derive_review_repair(args):
+    """Run derive/review/repair loop on existing belief network."""
+    from .caffeinate import hold as _caffeinate
+    _caffeinate()
+
+    if not check_model_available(args.model):
+        print(f"Model not available: {args.model}", file=sys.stderr)
+        sys.exit(1)
+
+    if not Path(REASONS_DB).exists():
+        print(f"Reasons database not found: {REASONS_DB}", file=sys.stderr)
+        print("Run: expert-build pipeline or expert-build accept-beliefs first",
+              file=sys.stderr)
+        sys.exit(1)
+
+    rounds = getattr(args, "rounds", 3)
+    print(f"=== Derive-Review-Repair ===", file=sys.stderr)
+    print(f"Model: {args.model}", file=sys.stderr)
+    print(f"Max rounds: {rounds}", file=sys.stderr)
+    print(f"Max derive rounds per cycle: {args.max_derive_rounds}\n",
+          file=sys.stderr)
+
+    summary = _run_convergence_loop(args, rounds)
+
+    print(f"\n=== Summary ===", file=sys.stderr)
+    print(f"Cycles: {summary['cycles']}", file=sys.stderr)
+    print(f"Derived: {summary['total_derived']}", file=sys.stderr)
+    print(f"Reviewed: {summary['total_reviewed']}", file=sys.stderr)
+    print(f"Invalid: {summary['total_invalid']}", file=sys.stderr)
+    print(f"  Linked: {summary['total_linked']}", file=sys.stderr)
+    print(f"  Softened: {summary['total_softened']}", file=sys.stderr)
+    print(f"  Abandoned: {summary['total_abandoned']}", file=sys.stderr)
+    print(f"Converged: {'yes' if summary['converged'] else 'no'}", file=sys.stderr)
+
+
 def cmd_pipeline(args):
     """Run end-to-end EEM construction pipeline."""
     from .caffeinate import hold as _caffeinate
@@ -382,41 +495,27 @@ def cmd_pipeline(args):
         # Stages 4-7: Derive → Review → Repair → Deduplicate (convergence loop)
         if not state.get("loop_completed"):
             start_cycle = state.get("current_cycle") or 1
-            for cycle in range(start_cycle, args.rounds + 1):
-                label = f"cycle {cycle}/{args.rounds}"
-                state["current_cycle"] = cycle
-                _save_state(state)
-
-                _banner(4, total_stages, f"DERIVE ({label})")
-                _mark_stage(state, 4, "running", cycle=cycle)
-                added = _stage_derive(args, round_label=label)
-                _mark_stage(state, 4, "completed", cycle=cycle, added=added)
-
-                _banner(5, total_stages, f"REVIEW ({label})")
-                _mark_stage(state, 5, "running", cycle=cycle)
-                review_result = _stage_review(args, round_label=label)
-                invalid_count = review_result.get("invalid", 0)
-                _mark_stage(state, 5, "completed", cycle=cycle,
-                            reviewed=review_result.get("reviewed", 0),
-                            invalid=invalid_count)
-
-                if invalid_count > 0:
-                    _banner(6, total_stages, f"REPAIR ({label})")
-                    _mark_stage(state, 6, "running", cycle=cycle)
-                    _stage_repair(args, review_result, round_label=label)
-                    _mark_stage(state, 6, "completed", cycle=cycle)
+            remaining_rounds = args.rounds - start_cycle + 1
+
+            def _pipeline_on_stage(cycle, stage_num, event, **kwargs):
+                if event == "start":
+                    label = f"cycle {cycle}/{args.rounds}"
+                    if stage_num == 4:
+                        state["current_cycle"] = cycle
+                        _save_state(state)
+                    _banner(stage_num, total_stages,
+                            f"{STAGE_NAMES[stage_num].upper()} ({label})")
+                    _mark_stage(state, stage_num, "running", cycle=cycle)
                 else:
-                    _mark_stage(state, 6, "completed", cycle=cycle, skipped=True)
-
-                _banner(7, total_stages, f"DEDUPLICATE ({label})")
-                _mark_stage(state, 7, "running", cycle=cycle)
-                _stage_deduplicate(args, round_label=label)
-                _mark_stage(state, 7, "completed", cycle=cycle)
-
-                if invalid_count == 0 and added == 0:
-                    print(f"\nConverged after {cycle} cycles "
-                          f"(0 invalids, 0 new derivations)", file=sys.stderr)
-                    break
+                    _mark_stage(state, stage_num, "completed",
+                                cycle=cycle, **kwargs)
+
+            _run_convergence_loop(
+                args, remaining_rounds,
+                start_cycle=start_cycle,
+                total_rounds=args.rounds,
+                on_stage=_pipeline_on_stage,
+            )
 
             state["loop_completed"] = True
             _save_state(state)
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 458fda6..1ce7775 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -8,6 +8,8 @@
 
 from expert_build.pipeline import (
     cmd_pipeline,
+    cmd_derive_review_repair,
+    _run_convergence_loop,
     _stage_ingest,
     _stage_extract,
     _stage_derive,
@@ -49,6 +51,18 @@ def make_pipeline_args(**overrides):
     return types.SimpleNamespace(**defaults)
 
 
+def make_drr_args(**overrides):
+    defaults = dict(
+        model="claude",
+        rounds=3,
+        max_derive_rounds=10,
+        timeout=600,
+        domain="Test domain",
+    )
+    defaults.update(overrides)
+    return types.SimpleNamespace(**defaults)
+
+
 # --- auto_accept_proposals ---
 
 class TestAutoAcceptProposals:
@@ -443,3 +457,121 @@ def test_corrupt_state_file_handled(self, work_dir):
              patch("expert_build.caffeinate.hold"), \
              pytest.raises(SystemExit):
             cmd_pipeline(args)
+
+
+# --- Derive-Review-Repair ---
+
+class TestConvergenceLoop:
+    def test_converges_on_zero_invalids_and_zero_derived(self, work_dir):
+        args = make_drr_args(rounds=3)
+        review_result = {"reviewed": 5, "invalid": 0, "results": []}
+
+        with patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"):
+            summary = _run_convergence_loop(args, rounds=3)
+
+        assert summary["converged"] is True
+        assert summary["cycles"] == 1
+
+    def test_runs_max_rounds_without_convergence(self, work_dir):
+        args = make_drr_args(rounds=2)
+        review_result = {"reviewed": 5, "invalid": 1, "results": [
+            {"belief_id": "b1", "valid": False},
+        ]}
+        repair_result = {"linked": 1, "softened": 0, "abandoned": 0}
+
+        with patch("expert_build.pipeline._stage_derive", return_value=1), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_repair", return_value=repair_result), \
+             patch("expert_build.pipeline._stage_deduplicate"):
+            summary = _run_convergence_loop(args, rounds=2)
+
+        assert summary["converged"] is False
+        assert summary["cycles"] == 2
+        assert summary["total_derived"] == 2
+        assert summary["total_linked"] == 2
+
+    def test_skips_repair_when_no_invalids(self, work_dir):
+        args = make_drr_args(rounds=1)
+        review_result = {"reviewed": 5, "invalid": 0, "results": []}
+
+        with patch("expert_build.pipeline._stage_derive", return_value=1), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_repair") as mock_repair, \
+             patch("expert_build.pipeline._stage_deduplicate"):
+            summary = _run_convergence_loop(args, rounds=1)
+
+        assert not mock_repair.called
+        assert summary["total_invalid"] == 0
+
+    def test_summary_accumulates_across_cycles(self, work_dir):
+        args = make_drr_args(rounds=2)
+        review_result = {"reviewed": 3, "invalid": 1, "results": [
+            {"belief_id": "b1", "valid": False},
+        ]}
+        repair_result = {"linked": 0, "softened": 1, "abandoned": 0}
+
+        with patch("expert_build.pipeline._stage_derive", return_value=2), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_repair", return_value=repair_result), \
+             patch("expert_build.pipeline._stage_deduplicate"):
+            summary = _run_convergence_loop(args, rounds=2)
+
+        assert summary["total_derived"] == 4
+        assert summary["total_reviewed"] == 6
+        assert summary["total_invalid"] == 2
+        assert summary["total_softened"] == 2
+
+    def test_on_stage_callback_called(self, work_dir):
+        args = make_drr_args(rounds=1)
+        review_result = {"reviewed": 1, "invalid": 0, "results": []}
+        events = []
+
+        def on_stage(cycle, stage_num, event, **kwargs):
+            events.append((cycle, stage_num, event))
+
+        with patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"):
+            _run_convergence_loop(args, rounds=1, on_stage=on_stage)
+
+        assert (1, 4, "start") in events
+        assert (1, 4, "end") in events
+        assert (1, 5, "start") in events
+        assert (1, 5, "end") in events
+        assert (1, 6, "end") in events
+        assert (1, 7, "start") in events
+        assert (1, 7, "end") in events
+
+
+class TestCmdDeriveReviewRepair:
+    def test_model_not_available_exits(self, work_dir):
+        args = make_drr_args(model="nonexistent")
+        with patch("expert_build.llm.check_model_available", return_value=False), \
+             pytest.raises(SystemExit):
+            cmd_derive_review_repair(args)
+
+    def test_missing_db_exits(self, work_dir):
+        (work_dir / "reasons.db").unlink()
+        args = make_drr_args()
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(SystemExit):
+            cmd_derive_review_repair(args)
+
+    def test_prints_summary(self, work_dir, capsys):
+        args = make_drr_args(rounds=1)
+        review_result = {"reviewed": 5, "invalid": 0, "results": []}
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_derive_review_repair(args)
+
+        captured = capsys.readouterr()
+        assert "Summary" in captured.err
+        assert "Derived: 0" in captured.err
+        assert "Converged: yes" in captured.err

```

## Observation Results

You previously requested observations. Here are the results:

```json
{
  "stage_repair_body": {
    "function": "_stage_repair",
    "file": "expert_build/pipeline.py",
    "start_line": 242,
    "end_line": 273,
    "source": "def _stage_repair(args, review_result, round_label=\"\"):\n    \"\"\"Stage 6: Research and repair invalid beliefs.\n\n    Returns the research results dict.\n    \"\"\"\n    from reasons_lib.api import research\n\n    prefix = f\"[{round_label}] \" if round_label else \"\"\n\n    invalid_ids = [\n        r[\"belief_id\"] for r in review_result.get(\"results\", [])\n        if not r.get(\"valid\", True)\n    ]\n\n    if not invalid_ids:\n        print(f\"{prefix}No invalid beliefs to repair\", file=sys.stderr)\n        return {\"total_invalid\": 0}\n\n    print(f\"{prefix}Researching {len(invalid_ids)} invalid beliefs...\", file=sys.stderr)\n\n    result = research(\n        belief_ids=invalid_ids,\n        model=args.model,\n        timeout=args.timeout,\n        db_path=REASONS_DB,\n    )\n\n    print(f\"{prefix}  Linked: {result.get('linked', 0)}, \"\n          f\"Softened: {result.get('softened', 0)}, \"\n          f\"Abandoned: {result.get('abandoned', 0)}\", file=sys.stderr)\n\n    return result"
  },
  "stage_derive_body": {
    "function": "_stage_derive",
    "file": "expert_build/pipeline.py",
    "start_line": 166,
    "end_line": 216,
    "source": "def _stage_derive(args, round_label=\"\"):\n    \"\"\"Stage 4: Derive new beliefs until saturated or max rounds hit.\n\n    Returns total number of beliefs added.\n    \"\"\"\n    from reasons_lib.api import export_network\n    from reasons_lib.derive import build_prompt, parse_proposals, validate_proposals, apply_proposals\n\n    total_added = 0\n    prefix = f\"[{round_label}] \" if round_label else \"\"\n\n    for derive_round in range(1, args.max_derive_rounds + 1):\n        print(f\"{prefix}Derive round {derive_round}/{args.max_derive_rounds}...\",\n              file=sys.stderr)\n\n        data = export_network(db_path=REASONS_DB)\n        nodes = data.get(\"nodes\", {})\n        if not nodes:\n            print(f\"{prefix}No nodes in network\", file=sys.stderr)\n            break\n\n        prompt, stats = build_prompt(nodes, domain=args.domain)\n        print(f\"{prefix}  Network: {stats['total_in']} IN, \"\n              f\"{stats['total_derived']} derived, depth {stats['max_depth']}\",\n              file=sys.stderr)\n\n        try:\n            response = invoke_sync(prompt, model=args.model, timeout=args.timeout)\n        except Exception as e:\n            print(f\"{prefix}  Derive error: {e}\", file=sys.stderr)\n            break\n\n        proposals = parse_proposals(response)\n        if not proposals:\n            print(f\"{prefix}  Saturated (no proposals)\", file=sys.stderr)\n            break\n\n        valid, skipped = validate_proposals(proposals, nodes)\n        for p, reason in skipped:\n            print(f\"{prefix}  SKIP {p['id']}: {reason}\", file=sys.stderr)\n\n        if not valid:\n            print(f\"{prefix}  Saturated (no valid proposals)\", file=sys.stderr)\n            break\n\n        results = apply_proposals(valid, db_path=REASONS_DB)\n        added = sum(1 for _, r in results if isinstance(r, dict))\n        total_added += added\n        print(f\"{prefix}  Added {added} beliefs\", file=sys.stderr)\n\n    return total_added"
  },
  "stage_review_body": {
    "function": "_stage_review",
    "file": "expert_build/pipeline.py",
    "start_line": 219,
    "end_line": 239,
    "source": "def _stage_review(args, round_label=\"\"):\n    \"\"\"Stage 5: Review derived beliefs for validity.\n\n    Returns the review results dict.\n    \"\"\"\n    from reasons_lib.api import review_beliefs\n\n    prefix = f\"[{round_label}] \" if round_label else \"\"\n    print(f\"{prefix}Reviewing beliefs...\", file=sys.stderr)\n\n    result = review_beliefs(\n        model=args.model,\n        timeout=args.timeout,\n        db_path=REASONS_DB,\n    )\n\n    reviewed = result.get(\"reviewed\", 0)\n    invalid = result.get(\"invalid\", 0)\n    print(f\"{prefix}  Reviewed {reviewed}, invalid {invalid}\", file=sys.stderr)\n\n    return result"
  },
  "stage_deduplicate_body": {
    "function": "_stage_deduplicate",
    "file": "expert_build/pipeline.py",
    "start_line": 276,
    "end_line": 293,
    "source": "def _stage_deduplicate(args, round_label=\"\"):\n    \"\"\"Stage 7: Remove duplicate beliefs.\"\"\"\n    from reasons_lib.api import deduplicate\n\n    prefix = f\"[{round_label}] \" if round_label else \"\"\n    print(f\"{prefix}Deduplicating...\", file=sys.stderr)\n\n    result = deduplicate(auto=True, db_path=REASONS_DB)\n    retracted = result.get(\"retracted\", [])\n    clusters = result.get(\"clusters\", [])\n\n    if retracted:\n        print(f\"{prefix}  {len(clusters)} clusters, retracted {len(retracted)}\",\n              file=sys.stderr)\n    else:\n        print(f\"{prefix}  No duplicates found\", file=sys.stderr)\n\n    return result"
  },
  "stage_names_definition": {
    "symbol": "STAGE_NAMES",
    "usages": [
      {
        "file": "expert_build/pipeline.py",
        "line": 15,
        "text": "STAGE_NAMES = {"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 46,
        "text": "for n, name in STAGE_NAMES.items()"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 71,
        "text": "key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 82,
        "text": "key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 507,
        "text": "f\"{STAGE_NAMES[stage_num].upper()} ({label})\")"
      }
    ],
    "production_usages": [
      {
        "file": "expert_build/pipeline.py",
        "line": 15,
        "text": "STAGE_NAMES = {"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 46,
        "text": "for n, name in STAGE_NAMES.items()"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 71,
        "text": "key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 82,
        "text": "key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 507,
        "text": "f\"{STAGE_NAMES[stage_num].upper()} ({label})\")"
      }
    ],
    "test_usages": [],
    "production_count": 5,
    "test_count": 0,
    "total_count": 5
  },
  "pipeline_imports": {
    "file": "expert_build/pipeline.py",
    "imports": [
      "json",
      "sys",
      "traceback"
    ],
    "from_imports": [
      {
        "module": "datetime",
        "names": [
          "datetime",
          "timezone"
        ]
      },
      {
        "module": "pathlib",
        "names": [
          "Path"
        ]
      },
      {
        "module": "types",
        "names": [
          "SimpleNamespace"
        ]
      },
      {
        "module": "llm",
        "names": [
          "check_model_available",
          "invoke_sync"
        ]
      },
      {
        "module": "propose",
        "names": [
          "PROJECT_DIR",
          "REASONS_DB"
        ]
      }
    ],
    "import_section": "\"\"\"End-to-end EEM construction pipeline.\"\"\"\n\nimport json\nimport sys\nimport traceback\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom types import SimpleNamespace\n\nfrom .llm import check_model_available, invoke_sync\nfrom .propose import PROJECT_DIR, REASONS_DB\n\nSTATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\"\n\nSTAGE_NAMES = {\n    1: \"ingest\",\n    2: \"summarize\",\n    3: \"extract\",\n    4: \"derive\",\n    5: \"review\",\n    6: \"repair\",\n    7: \"deduplicate\",\n    8: \"export\",\n}\n\n\ndef _now():\n    return datetime.now(timezone.utc).isoformat(timespec=\"seconds\")\n\n\ndef _init_state(args):\n    state = {\n        \"started_at\": _now(),\n        \"updated_at\": _now(),\n        \"status\": \"running\",\n        \"current_stage\": None,\n        \"current_cycle\": None,\n        \"args\": {\n            \"url\": getattr(args, \"url\", None),\n            \"model\": args.model,\n            \"rounds\": args.rounds,\n            \"domain\": getattr(args, \"domain\", None),\n        },\n        \"stages\": {\n            f\"{n}_{name}\": {\"status\": \"pending\"}\n            for n, name in STAGE_NAMES.items()\n        },\n    }\n    _save_state(state)\n    return state\n\n\ndef _load_state():\n    if not STATE_FILE.exists():\n        return None\n    try:\n        return json.loads(STATE_FILE.read_text())\n    except (json.JSONDecodeError, ValueError):\n        print(f\"WARNING: corrupt state file {STATE_FILE}, ignoring\",\n              file=sys.stderr)\n        return None\n\n\ndef _save_state(state):\n    state[\"updated_at\"] = _now()\n    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)\n    STATE_FILE.write_text(json.dumps(state, indent=2) + \"\\n\")\n\n\ndef _mark_stage(state, stage_num, status, **meta):\n    key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\"\n    state[\"stages\"][key][\"status\"] = status\n    if status == \"running\":\n        state[\"current_stage\"] = stage_num\n    elif status == \"completed\":\n        state[\"stages\"][key][\"completed_at\"] = _now()\n    state[\"stages\"][key].update(meta)\n    _save_state(state)\n\n\ndef _stage_completed(state, stage_num):\n    key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\"\n    return state[\"stages\"][key][\"status\"] == \"completed\"\n\n\ndef _banner(stage_num, total, name):\n    print(f\"\\n{'=' * 50}\", file=sys.stderr)\n    print(f\"  Stage {stage_num}/{total}: {name}\", file=sys.stderr)\n    print(f\"{'=' * 50}\\n\", file=sys.stderr)\n\n\ndef _stage_ingest(args):\n    \"\"\"Stage 1: Fetch docs or chunk PDFs into sources/.\"\"\""
  },
  "convergence_loop_callers": {
    "symbol": "_run_convergence_loop",
    "production_callers": [
      {
        "file": "expert_build/pipeline.py",
        "line": 317,
        "text": "def _run_convergence_loop(args, rounds, start_cycle=1, total_rounds=None,",
        "context_function": "_stage_export",
        "context_snippet": "   314:     print(f\"\\nFinal: {in_count} IN / {total} total beliefs\", file=sys.stderr)\n   315: \n   316: \n>> 317: def _run_convergence_loop(args, rounds, start_cycle=1, total_rounds=None,\n   318:                           on_stage=None):\n   319:     \"\"\"Run derive -> review -> repair -> dedup until convergence.\n   320: "
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 417,
        "text": "summary = _run_convergence_loop(args, rounds)",
        "context_function": "cmd_derive_review_repair",
        "context_snippet": "   414:     print(f\"Max derive rounds per cycle: {args.max_derive_rounds}\\n\",\n   415:           file=sys.stderr)\n   416: \n>> 417:     summary = _run_convergence_loop(args, rounds)\n   418: \n   419:     print(f\"\\n=== Summary ===\", file=sys.stderr)\n   420:     print(f\"Cycles: {summary['cycles']}\", file=sys.stderr)"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 513,
        "text": "_run_convergence_loop(",
        "context_function": "_pipeline_on_stage",
        "context_snippet": "   510:                     _mark_stage(state, stage_num, \"completed\",\n   511:                                 cycle=cycle, **kwargs)\n   512: \n>> 513:             _run_convergence_loop(\n   514:                 args, remaining_rounds,\n   515:                 start_cycle=start_cycle,\n   516:                 total_rounds=args.rounds,"
      }
    ],
    "test_callers": [
      {
        "file": "tests/test_pipeline.py",
        "line": 12,
        "text": "_run_convergence_loop,",
        "context_function": null,
        "context_snippet": "   9: from expert_build.pipeline import (\n   10:     cmd_pipeline,\n   11:     cmd_derive_review_repair,\n>> 12:     _run_convergence_loop,\n   13:     _stage_ingest,\n   14:     _stage_extract,\n   15:     _stage_derive,"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 472,
        "text": "summary = _run_convergence_loop(args, rounds=3)",
        "context_function": "test_converges_on_zero_invalids_and_zero_derived",
        "context_snippet": "   469:         with patch(\"expert_build.pipeline._stage_derive\", return_value=0), \\\n   470:              patch(\"expert_build.pipeline._stage_review\", return_value=review_result), \\\n   471:              patch(\"expert_build.pipeline._stage_deduplicate\"):\n>> 472:             summary = _run_convergence_loop(args, rounds=3)\n   473: \n   474:         assert summary[\"converged\"] is True\n   475:         assert summary[\"cycles\"] == 1"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 488,
        "text": "summary = _run_convergence_loop(args, rounds=2)",
        "context_function": "test_runs_max_rounds_without_convergence",
        "context_snippet": "   485:              patch(\"expert_build.pipeline._stage_review\", return_value=review_result), \\\n   486:              patch(\"expert_build.pipeline._stage_repair\", return_value=repair_result), \\\n   487:              patch(\"expert_build.pipeline._stage_deduplicate\"):\n>> 488:             summary = _run_convergence_loop(args, rounds=2)\n   489: \n   490:         assert summary[\"converged\"] is False\n   491:         assert summary[\"cycles\"] == 2"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 503,
        "text": "summary = _run_convergence_loop(args, rounds=1)",
        "context_function": "test_skips_repair_when_no_invalids",
        "context_snippet": "   500:              patch(\"expert_build.pipeline._stage_review\", return_value=review_result), \\\n   501:              patch(\"expert_build.pipeline._stage_repair\") as mock_repair, \\\n   502:              patch(\"expert_build.pipeline._stage_deduplicate\"):\n>> 503:             summary = _run_convergence_loop(args, rounds=1)\n   504: \n   505:         assert not mock_repair.called\n   506:         assert summary[\"total_invalid\"] == 0"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 519,
        "text": "summary = _run_convergence_loop(args, rounds=2)",
        "context_function": "test_summary_accumulates_across_cycles",
        "context_snippet": "   516:              patch(\"expert_build.pipeline._stage_review\", return_value=review_result), \\\n   517:              patch(\"expert_build.pipeline._stage_repair\", return_value=repair_result), \\\n   518:              patch(\"expert_build.pipeline._stage_deduplicate\"):\n>> 519:             summary = _run_convergence_loop(args, rounds=2)\n   520: \n   521:         assert summary[\"total_derived\"] == 4\n   522:         assert summary[\"total_reviewed\"] == 6"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 537,
        "text": "_run_convergence_loop(args, rounds=1, on_stage=on_stage)",
        "context_function": "on_stage",
        "context_snippet": "   534:         with patch(\"expert_build.pipeline._stage_derive\", return_value=0), \\\n   535:              patch(\"expert_build.pipeline._stage_review\", return_value=review_result), \\\n   536:              patch(\"expert_build.pipeline._stage_deduplicate\"):\n>> 537:             _run_convergence_loop(args, rounds=1, on_stage=on_stage)\n   538: \n   539:         assert (1, 4, \"start\") in events\n   540:         assert (1, 4, \"end\") in events"
      }
    ],
    "production_count": 3,
    "test_count": 6,
    "total_count": 9
  },
  "mark_stage_body": {
    "function": "_mark_stage",
    "file": "expert_build/pipeline.py",
    "start_line": 70,
    "end_line": 78,
    "source": "def _mark_stage(state, stage_num, status, **meta):\n    key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\"\n    state[\"stages\"][key][\"status\"] = status\n    if status == \"running\":\n        state[\"current_stage\"] = stage_num\n    elif status == \"completed\":\n        state[\"stages\"][key][\"completed_at\"] = _now()\n    state[\"stages\"][key].update(meta)\n    _save_state(state)"
  },
  "lazy_body": {
    "function": "_lazy",
    "file": "expert_build/cli.py",
    "start_line": 11,
    "end_line": 14,
    "source": "def _lazy(module_name, func_name):\n    \"\"\"Lazy import to keep startup fast.\"\"\"\n    mod = importlib.import_module(f\".{module_name}\", package=\"expert_build\")\n    return getattr(mod, func_name)"
  }
}
```

Use these results to inform your review. Do not request the same observations again.


## Instructions

For each significant change (new file, modified function, etc.), provide a structured verdict.

Use this exact format for each change:

### <file_path or file_path:function_name>
VERDICT: PASS | CONCERN | BLOCK
CORRECTNESS: VALID | QUESTIONABLE | BROKEN
SPEC_COMPLIANCE: MEETS | PARTIAL | VIOLATES | N/A
ISSUE_COMPLIANCE: ADDRESSES | PARTIAL | UNRELATED | N/A
BELIEF_COMPLIANCE: CONSISTENT | VIOLATES | N/A
TEST_COVERAGE: COVERED | PARTIAL | UNTESTED
INTEGRATION: WIRED | PARTIAL | MISSING
REASONING: <brief explanation of your assessment>
---

## Review Criteria

1. **CORRECTNESS**: Does the code do what it claims? Is the logic sound?
   - VALID: Logic is correct, no bugs apparent
   - QUESTIONABLE: Logic may have edge cases or unclear behavior
   - BROKEN: Clear bugs or incorrect behavior

2. **SPEC_COMPLIANCE**: Does it meet MUST requirements from the spec?
   - MEETS: All relevant spec requirements satisfied
   - PARTIAL: Some requirements met, others missing or incomplete
   - VIOLATES: Contradicts spec requirements
   - N/A: No spec provided or not applicable

3. **ISSUE_COMPLIANCE** (only when an issue is provided): Do the changes address the problem or feature described in the issue?
   - ADDRESSES: Changes directly solve the issue's stated problem or implement the requested feature
   - PARTIAL: Changes partially address the issue but leave some aspects unresolved
   - UNRELATED: Changes do not appear related to the issue
   - N/A: No issue provided

4. **TEST_COVERAGE**: Are there tests for the new/changed code?
   - COVERED: Tests exist and cover the changes
   - PARTIAL: Some tests exist but coverage is incomplete
   - UNTESTED: No tests for the changes

5. **INTEGRATION**: Are callers updated? Is the feature usable end-to-end?
   - WIRED: Feature is fully integrated and usable
   - PARTIAL: Interface exists but callers not updated, or integration incomplete
   - MISSING: No integration with existing code

6. **BELIEF_COMPLIANCE** (only when beliefs are provided): Do the changes respect known architectural invariants, contracts, and rules?
   - CONSISTENT: Changes align with or reinforce known beliefs
   - VIOLATES: Changes contradict a specific belief — cite the belief ID
   - N/A: No beliefs provided or no relevant beliefs apply

## Verdict Guidelines

- **BLOCK**: Security issues, broken functionality, spec violations, or missing critical integration
- **CONCERN**: Missing tests, partial integration, questionable patterns, or unclear logic
- **PASS**: Correct, tested, well-integrated code

## Important

- Full function bodies for modified functions may be available in the observations section — use them to verify the complete logic, not just the diff hunks
- Related test files (prefixed with ``related_test:``) may be included in observations — check whether existing test assertions still match modified return types, signatures, or behavior. Flag any test that would break due to the changes
- If duplicate test coverage is detected (multiple test files covering the same source), note it in your review
- Focus on actual issues, not style preferences
- If a method signature is added but callers aren't updated, that's PARTIAL integration
- Be specific in reasoning - reference line numbers or function names
- When in doubt, use CONCERN rather than PASS

## Self-Review

After completing your review, add a brief self-assessment:

### SELF_REVIEW
LIMITATIONS: <what context were you missing that affected review quality?>
---

Examples of limitations:
- "Could not see full class to verify no other methods access the modified field"
- "Test file not included in diff - cannot verify coverage claims"
- "Spec file referenced but not provided"


## Feature Requests

If this review tool could be improved to help you do a better job, suggest features:

### FEATURE_REQUESTS
- <suggestion 1>
- <suggestion 2>
---

Examples:
- "Include full file context for modified functions, not just diff hunks"
- "Show callers of modified methods to verify integration"
- "Include test file alongside implementation changes"

Only include this section if you have specific suggestions. Skip if none.
