You are a senior code reviewer. Review the following code changes.

## Specification

No specification provided. Focus on correctness, tests, and integration.





## Code Changes

```diff
diff --git a/expert_build/cli.py b/expert_build/cli.py
index 8dcd7a6..00ea9ad 100644
--- a/expert_build/cli.py
+++ b/expert_build/cli.py
@@ -108,6 +108,8 @@ def main():
     pipe_p.add_argument("--timeout", type=int, default=600,
                         help="LLM timeout in seconds (default: 600)")
     pipe_p.add_argument("--domain", help="Domain description for derive context")
+    pipe_p.add_argument("--resume", action="store_true",
+                        help="Resume from last saved pipeline state")
 
     # -- status --
     sub.add_parser("status", help="Show pipeline progress")
diff --git a/expert_build/pipeline.py b/expert_build/pipeline.py
index 82c7966..7da0b11 100644
--- a/expert_build/pipeline.py
+++ b/expert_build/pipeline.py
@@ -2,11 +2,80 @@
 
 import json
 import sys
+import traceback
+from datetime import datetime, timezone
 from pathlib import Path
 from types import SimpleNamespace
 
 from .llm import check_model_available, invoke_sync
-from .propose import REASONS_DB
+from .propose import PROJECT_DIR, REASONS_DB
+
+STATE_FILE = Path(PROJECT_DIR) / "pipeline-state.json"
+
+STAGE_NAMES = {
+    1: "ingest",
+    2: "summarize",
+    3: "extract",
+    4: "derive",
+    5: "review",
+    6: "repair",
+    7: "deduplicate",
+    8: "export",
+}
+
+
+def _now():
+    return datetime.now(timezone.utc).isoformat(timespec="seconds")
+
+
+def _init_state(args):
+    state = {
+        "started_at": _now(),
+        "updated_at": _now(),
+        "status": "running",
+        "current_stage": None,
+        "current_cycle": None,
+        "args": {
+            "url": getattr(args, "url", None),
+            "model": args.model,
+            "rounds": args.rounds,
+            "domain": getattr(args, "domain", None),
+        },
+        "stages": {
+            f"{n}_{name}": {"status": "pending"}
+            for n, name in STAGE_NAMES.items()
+        },
+    }
+    _save_state(state)
+    return state
+
+
+def _load_state():
+    if not STATE_FILE.exists():
+        return None
+    return json.loads(STATE_FILE.read_text())
+
+
+def _save_state(state):
+    state["updated_at"] = _now()
+    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+    STATE_FILE.write_text(json.dumps(state, indent=2) + "\n")
+
+
+def _mark_stage(state, stage_num, status, **meta):
+    key = f"{stage_num}_{STAGE_NAMES[stage_num]}"
+    state["stages"][key]["status"] = status
+    if status == "running":
+        state["current_stage"] = stage_num
+    elif status == "completed":
+        state["stages"][key]["completed_at"] = _now()
+    state["stages"][key].update(meta)
+    _save_state(state)
+
+
+def _stage_completed(state, stage_num):
+    key = f"{stage_num}_{STAGE_NAMES[stage_num]}"
+    return state["stages"][key]["status"] == "completed"
 
 
 def _banner(stage_num, total, name):
@@ -249,52 +318,109 @@ def cmd_pipeline(args):
         print(f"Model not available: {args.model}", file=sys.stderr)
         sys.exit(1)
 
-    total_stages = 8
-    has_sources = args.url or args.pdf
-
-    # Stage 1: Ingest
-    if has_sources:
-        _banner(1, total_stages, "INGEST")
-        _stage_ingest(args)
+    resume = getattr(args, "resume", False)
+    if resume:
+        state = _load_state()
+        if not state:
+            print("No pipeline state to resume. Run without --resume first.",
+                  file=sys.stderr)
+            sys.exit(1)
+        print(f"Resuming pipeline from state file", file=sys.stderr)
+        state["status"] = "running"
+        _save_state(state)
     else:
-        print("No --url or --pdf provided, skipping ingest", file=sys.stderr)
-
-    # Stage 2: Summarize
-    _banner(2, total_stages, "SUMMARIZE")
-    _stage_summarize(args)
-
-    # Stage 3: Extract
-    _banner(3, total_stages, "EXTRACT")
-    should_continue = _stage_extract(args)
-    if not should_continue:
-        return
-
-    # Stages 4-7: Derive → Review → Repair → Deduplicate (convergence loop)
-    for cycle in range(1, args.rounds + 1):
-        label = f"cycle {cycle}/{args.rounds}"
-
-        _banner(4, total_stages, f"DERIVE ({label})")
-        added = _stage_derive(args, round_label=label)
-
-        _banner(5, total_stages, f"REVIEW ({label})")
-        review_result = _stage_review(args, round_label=label)
+        state = _init_state(args)
 
-        invalid_count = review_result.get("invalid", 0)
-
-        if invalid_count > 0:
-            _banner(6, total_stages, f"REPAIR ({label})")
-            _stage_repair(args, review_result, round_label=label)
-
-        _banner(7, total_stages, f"DEDUPLICATE ({label})")
-        _stage_deduplicate(args, round_label=label)
-
-        if invalid_count == 0 and added == 0:
-            print(f"\nConverged after {cycle} cycles "
-                  f"(0 invalids, 0 new derivations)", file=sys.stderr)
-            break
-
-    # Stage 8: Export
-    _banner(8, total_stages, "EXPORT")
-    _stage_export(args)
+    total_stages = 8
+    has_sources = args.url or args.pdf
 
-    print("\nPipeline complete.", file=sys.stderr)
+    try:
+        # Stage 1: Ingest
+        if not _stage_completed(state, 1):
+            if has_sources:
+                _banner(1, total_stages, "INGEST")
+                _mark_stage(state, 1, "running")
+                _stage_ingest(args)
+                _mark_stage(state, 1, "completed")
+            else:
+                print("No --url or --pdf provided, skipping ingest", file=sys.stderr)
+                _mark_stage(state, 1, "completed", skipped=True)
+        else:
+            print("Stage 1 (INGEST) already completed, skipping", file=sys.stderr)
+
+        # Stage 2: Summarize
+        if not _stage_completed(state, 2):
+            _banner(2, total_stages, "SUMMARIZE")
+            _mark_stage(state, 2, "running")
+            _stage_summarize(args)
+            _mark_stage(state, 2, "completed")
+        else:
+            print("Stage 2 (SUMMARIZE) already completed, skipping", file=sys.stderr)
+
+        # Stage 3: Extract
+        if not _stage_completed(state, 3):
+            _banner(3, total_stages, "EXTRACT")
+            _mark_stage(state, 3, "running")
+            should_continue = _stage_extract(args)
+            _mark_stage(state, 3, "completed")
+            if not should_continue:
+                state["status"] = "paused"
+                _save_state(state)
+                return
+        else:
+            print("Stage 3 (EXTRACT) already completed, skipping", file=sys.stderr)
+
+        # Stages 4-7: Derive → Review → Repair → Deduplicate (convergence loop)
+        start_cycle = state.get("current_cycle") or 1
+        for cycle in range(start_cycle, args.rounds + 1):
+            label = f"cycle {cycle}/{args.rounds}"
+            state["current_cycle"] = cycle
+            _save_state(state)
+
+            _banner(4, total_stages, f"DERIVE ({label})")
+            _mark_stage(state, 4, "running", cycle=cycle)
+            added = _stage_derive(args, round_label=label)
+            _mark_stage(state, 4, "completed", cycle=cycle, added=added)
+
+            _banner(5, total_stages, f"REVIEW ({label})")
+            _mark_stage(state, 5, "running", cycle=cycle)
+            review_result = _stage_review(args, round_label=label)
+            invalid_count = review_result.get("invalid", 0)
+            _mark_stage(state, 5, "completed", cycle=cycle,
+                        reviewed=review_result.get("reviewed", 0),
+                        invalid=invalid_count)
+
+            if invalid_count > 0:
+                _banner(6, total_stages, f"REPAIR ({label})")
+                _mark_stage(state, 6, "running", cycle=cycle)
+                _stage_repair(args, review_result, round_label=label)
+                _mark_stage(state, 6, "completed", cycle=cycle)
+            else:
+                _mark_stage(state, 6, "completed", cycle=cycle, skipped=True)
+
+            _banner(7, total_stages, f"DEDUPLICATE ({label})")
+            _mark_stage(state, 7, "running", cycle=cycle)
+            _stage_deduplicate(args, round_label=label)
+            _mark_stage(state, 7, "completed", cycle=cycle)
+
+            if invalid_count == 0 and added == 0:
+                print(f"\nConverged after {cycle} cycles "
+                      f"(0 invalids, 0 new derivations)", file=sys.stderr)
+                break
+
+        # Stage 8: Export
+        _banner(8, total_stages, "EXPORT")
+        _mark_stage(state, 8, "running")
+        _stage_export(args)
+        _mark_stage(state, 8, "completed")
+
+        state["status"] = "completed"
+        _save_state(state)
+        print("\nPipeline complete.", file=sys.stderr)
+
+    except Exception as e:
+        state["status"] = "failed"
+        state["error"] = str(e)
+        state["error_traceback"] = traceback.format_exc()
+        _save_state(state)
+        raise
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index fd208ba..0906319 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -1,5 +1,6 @@
 """Tests for the pipeline command."""
 
+import json
 import types
 from pathlib import Path
 from unittest.mock import patch
@@ -14,6 +15,8 @@
     _stage_review,
     _stage_repair,
     _stage_deduplicate,
+    STATE_FILE,
+    _load_state,
 )
 from expert_build.propose import auto_accept_proposals
 
@@ -41,6 +44,7 @@ def make_pipeline_args(**overrides):
         depth=2,
         timeout=600,
         domain="Test domain",
+        resume=False,
     )
     defaults.update(overrides)
     return types.SimpleNamespace(**defaults)
@@ -303,3 +307,100 @@ def test_no_auto_accept_stops_early(self, work_dir, capsys):
 
         assert not mock_derive.called
         assert not mock_export.called
+
+
+# --- Pipeline State ---
+
+class TestPipelineState:
+    def test_state_file_created_on_run(self, work_dir):
+        args = make_pipeline_args(rounds=1, url=None, pdf=None)
+        review_result = {"reviewed": 0, "invalid": 0, "results": []}
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.pipeline._stage_export"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state is not None
+        assert state["status"] == "completed"
+        assert state["stages"]["8_export"]["status"] == "completed"
+
+    def test_state_records_failure(self, work_dir):
+        args = make_pipeline_args(url=None, pdf=None)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize",
+                   side_effect=RuntimeError("LLM exploded")), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(RuntimeError, match="LLM exploded"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["status"] == "failed"
+        assert "LLM exploded" in state["error"]
+        assert state["stages"]["2_summarize"]["status"] == "running"
+
+    def test_resume_skips_completed_stages(self, work_dir, capsys):
+        args = make_pipeline_args(rounds=1, url=None, pdf=None)
+        review_result = {"reviewed": 0, "invalid": 0, "results": []}
+
+        # First run: complete through summarize, then fail at extract
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract",
+                   side_effect=RuntimeError("crash")), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(RuntimeError):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["stages"]["1_ingest"]["status"] == "completed"
+        assert state["stages"]["2_summarize"]["status"] == "completed"
+        assert state["stages"]["3_extract"]["status"] == "running"
+
+        # Resume: should skip ingest and summarize
+        resume_args = make_pipeline_args(rounds=1, url=None, pdf=None, resume=True)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize") as mock_summarize, \
+             patch("expert_build.pipeline._stage_extract", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.pipeline._stage_export"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(resume_args)
+
+        assert not mock_summarize.called
+        captured = capsys.readouterr()
+        assert "already completed, skipping" in captured.err
+
+        state = _load_state()
+        assert state["status"] == "completed"
+
+    def test_resume_without_state_exits(self, work_dir):
+        args = make_pipeline_args(resume=True)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(SystemExit):
+            cmd_pipeline(args)
+
+    def test_no_auto_accept_sets_paused(self, work_dir):
+        args = make_pipeline_args(no_auto_accept=True, url=None, pdf=None)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract", return_value=False), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["status"] == "paused"
+        assert state["stages"]["3_extract"]["status"] == "completed"

```

## Observation Results

You previously requested observations. Here are the results:

```json
{
  "project_dir_definition": {
    "symbol": "PROJECT_DIR",
    "usages": [
      {
        "file": "expert_build/propose.py",
        "line": 15,
        "text": "PROJECT_DIR = \".expert-build\""
      },
      {
        "file": "expert_build/propose.py",
        "line": 285,
        "text": "processed_path = Path(PROJECT_DIR) / \"proposed-entries.json\""
      },
      {
        "file": "expert_build/propose.py",
        "line": 311,
        "text": "cache_path = Path(PROJECT_DIR) / \"belief-vectors.json\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 11,
        "text": "from .propose import PROJECT_DIR, REASONS_DB"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 13,
        "text": "STATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\""
      }
    ],
    "production_usages": [
      {
        "file": "expert_build/propose.py",
        "line": 15,
        "text": "PROJECT_DIR = \".expert-build\""
      },
      {
        "file": "expert_build/propose.py",
        "line": 285,
        "text": "processed_path = Path(PROJECT_DIR) / \"proposed-entries.json\""
      },
      {
        "file": "expert_build/propose.py",
        "line": 311,
        "text": "cache_path = Path(PROJECT_DIR) / \"belief-vectors.json\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 11,
        "text": "from .propose import PROJECT_DIR, REASONS_DB"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 13,
        "text": "STATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\""
      }
    ],
    "test_usages": [],
    "production_count": 5,
    "test_count": 0,
    "total_count": 5
  },
  "cmd_pipeline_full": {
    "function": "cmd_pipeline",
    "file": "expert_build/pipeline.py",
    "start_line": 312,
    "end_line": 426,
    "source": "def cmd_pipeline(args):\n    \"\"\"Run end-to-end EEM construction pipeline.\"\"\"\n    from .caffeinate import hold as _caffeinate\n    _caffeinate()\n\n    if not check_model_available(args.model):\n        print(f\"Model not available: {args.model}\", file=sys.stderr)\n        sys.exit(1)\n\n    resume = getattr(args, \"resume\", False)\n    if resume:\n        state = _load_state()\n        if not state:\n            print(\"No pipeline state to resume. Run without --resume first.\",\n                  file=sys.stderr)\n            sys.exit(1)\n        print(f\"Resuming pipeline from state file\", file=sys.stderr)\n        state[\"status\"] = \"running\"\n        _save_state(state)\n    else:\n        state = _init_state(args)\n\n    total_stages = 8\n    has_sources = args.url or args.pdf\n\n    try:\n        # Stage 1: Ingest\n        if not _stage_completed(state, 1):\n            if has_sources:\n                _banner(1, total_stages, \"INGEST\")\n                _mark_stage(state, 1, \"running\")\n                _stage_ingest(args)\n                _mark_stage(state, 1, \"completed\")\n            else:\n                print(\"No --url or --pdf provided, skipping ingest\", file=sys.stderr)\n                _mark_stage(state, 1, \"completed\", skipped=True)\n        else:\n            print(\"Stage 1 (INGEST) already completed, skipping\", file=sys.stderr)\n\n        # Stage 2: Summarize\n        if not _stage_completed(state, 2):\n            _banner(2, total_stages, \"SUMMARIZE\")\n            _mark_stage(state, 2, \"running\")\n            _stage_summarize(args)\n            _mark_stage(state, 2, \"completed\")\n        else:\n            print(\"Stage 2 (SUMMARIZE) already completed, skipping\", file=sys.stderr)\n\n        # Stage 3: Extract\n        if not _stage_completed(state, 3):\n            _banner(3, total_stages, \"EXTRACT\")\n            _mark_stage(state, 3, \"running\")\n            should_continue = _stage_extract(args)\n 
           _mark_stage(state, 3, \"completed\")\n            if not should_continue:\n                state[\"status\"] = \"paused\"\n                _save_state(state)\n                return\n        else:\n            print(\"Stage 3 (EXTRACT) already completed, skipping\", file=sys.stderr)\n\n        # Stages 4-7: Derive \u2192 Review \u2192 Repair \u2192 Deduplicate (convergence loop)\n        start_cycle = state.get(\"current_cycle\") or 1\n        for cycle in range(start_cycle, args.rounds + 1):\n            label = f\"cycle {cycle}/{args.rounds}\"\n            state[\"current_cycle\"] = cycle\n            _save_state(state)\n\n            _banner(4, total_stages, f\"DERIVE ({label})\")\n            _mark_stage(state, 4, \"running\", cycle=cycle)\n            added = _stage_derive(args, round_label=label)\n            _mark_stage(state, 4, \"completed\", cycle=cycle, added=added)\n\n            _banner(5, total_stages, f\"REVIEW ({label})\")\n            _mark_stage(state, 5, \"running\", cycle=cycle)\n            review_result = _stage_review(args, round_label=label)\n            invalid_count = review_result.get(\"invalid\", 0)\n            _mark_stage(state, 5, \"completed\", cycle=cycle,\n                        reviewed=review_result.get(\"reviewed\", 0),\n                        invalid=invalid_count)\n\n            if invalid_count > 0:\n                _banner(6, total_stages, f\"REPAIR ({label})\")\n                _mark_stage(state, 6, \"running\", cycle=cycle)\n                _stage_repair(args, review_result, round_label=label)\n                _mark_stage(state, 6, \"completed\", cycle=cycle)\n            else:\n                _mark_stage(state, 6, \"completed\", cycle=cycle, skipped=True)\n\n            _banner(7, total_stages, f\"DEDUPLICATE ({label})\")\n            _mark_stage(state, 7, \"running\", cycle=cycle)\n            _stage_deduplicate(args, round_label=label)\n            _mark_stage(state, 7, \"completed\", cycle=cycle)\n\n      
      if invalid_count == 0 and added == 0:\n                print(f\"\\nConverged after {cycle} cycles \"\n                      f\"(0 invalids, 0 new derivations)\", file=sys.stderr)\n                break\n\n        # Stage 8: Export\n        _banner(8, total_stages, \"EXPORT\")\n        _mark_stage(state, 8, \"running\")\n        _stage_export(args)\n        _mark_stage(state, 8, \"completed\")\n\n        state[\"status\"] = \"completed\"\n        _save_state(state)\n        print(\"\\nPipeline complete.\", file=sys.stderr)\n\n    except Exception as e:\n        state[\"status\"] = \"failed\"\n        state[\"error\"] = str(e)\n        state[\"error_traceback\"] = traceback.format_exc()\n        _save_state(state)\n        raise"
  },
  "stage_extract_body": {
    "function": "_stage_extract",
    "file": "expert_build/pipeline.py",
    "start_line": 128,
    "end_line": 158,
    "source": "def _stage_extract(args):\n    \"\"\"Stage 3: Extract beliefs from entries and optionally auto-accept.\"\"\"\n    from .propose import cmd_propose_beliefs, cmd_accept_beliefs\n\n    prop_args = SimpleNamespace(\n        input_dir=\"entries\",\n        output=\"proposed-beliefs.md\",\n        model=args.model,\n        batch_size=5,\n        entry=None,\n    )\n    setattr(prop_args, \"all\", False)\n\n    cmd_propose_beliefs(prop_args)\n\n    if args.no_auto_accept:\n        print(\"\\nStopping after propose-beliefs (--no-auto-accept)\", file=sys.stderr)\n        print(\"Review proposed-beliefs.md, mark entries as [ACCEPT], then run:\", file=sys.stderr)\n        print(\"  expert-build accept-beliefs\", file=sys.stderr)\n        return False\n\n    proposals_path = Path(\"proposed-beliefs.md\")\n    if proposals_path.exists():\n        from .propose import auto_accept_proposals\n        auto_accept_proposals(str(proposals_path))\n        print(\"Auto-accepted all proposed beliefs\", file=sys.stderr)\n\n        accept_args = SimpleNamespace(file=\"proposed-beliefs.md\")\n        cmd_accept_beliefs(accept_args)\n\n    return True"
  },
  "work_dir_fixture": {
    "function": "work_dir",
    "file": "tests/test_pipeline.py",
    "start_line": 24,
    "end_line": 31,
    "source": "@pytest.fixture\ndef work_dir(tmp_path, monkeypatch):\n    \"\"\"Set working directory to tmp_path for isolated pipeline runs.\"\"\"\n    monkeypatch.chdir(tmp_path)\n    (tmp_path / \"sources\").mkdir()\n    (tmp_path / \"entries\").mkdir()\n    (tmp_path / \"reasons.db\").touch()\n    return tmp_path"
  },
  "state_file_usages": {
    "symbol": "STATE_FILE",
    "usages": [
      {
        "file": "tests/test_pipeline.py",
        "line": 18,
        "text": "STATE_FILE,"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 13,
        "text": "STATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 54,
        "text": "if not STATE_FILE.exists():"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 56,
        "text": "return json.loads(STATE_FILE.read_text())"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 61,
        "text": "STATE_FILE.parent.mkdir(parents=True, exist_ok=True)"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 62,
        "text": "STATE_FILE.write_text(json.dumps(state, indent=2) + \"\\n\")"
      }
    ],
    "production_usages": [
      {
        "file": "expert_build/pipeline.py",
        "line": 13,
        "text": "STATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\""
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 54,
        "text": "if not STATE_FILE.exists():"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 56,
        "text": "return json.loads(STATE_FILE.read_text())"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 61,
        "text": "STATE_FILE.parent.mkdir(parents=True, exist_ok=True)"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 62,
        "text": "STATE_FILE.write_text(json.dumps(state, indent=2) + \"\\n\")"
      }
    ],
    "test_usages": [
      {
        "file": "tests/test_pipeline.py",
        "line": 18,
        "text": "STATE_FILE,"
      }
    ],
    "production_count": 5,
    "test_count": 1,
    "total_count": 6
  },
  "pipeline_imports": {
    "file": "expert_build/pipeline.py",
    "imports": [
      "json",
      "sys",
      "traceback"
    ],
    "from_imports": [
      {
        "module": "datetime",
        "names": [
          "datetime",
          "timezone"
        ]
      },
      {
        "module": "pathlib",
        "names": [
          "Path"
        ]
      },
      {
        "module": "types",
        "names": [
          "SimpleNamespace"
        ]
      },
      {
        "module": "llm",
        "names": [
          "check_model_available",
          "invoke_sync"
        ]
      },
      {
        "module": "propose",
        "names": [
          "PROJECT_DIR",
          "REASONS_DB"
        ]
      }
    ],
    "import_section": "\"\"\"End-to-end EEM construction pipeline.\"\"\"\n\nimport json\nimport sys\nimport traceback\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom types import SimpleNamespace\n\nfrom .llm import check_model_available, invoke_sync\nfrom .propose import PROJECT_DIR, REASONS_DB\n\nSTATE_FILE = Path(PROJECT_DIR) / \"pipeline-state.json\"\n\nSTAGE_NAMES = {\n    1: \"ingest\",\n    2: \"summarize\",\n    3: \"extract\",\n    4: \"derive\",\n    5: \"review\",\n    6: \"repair\",\n    7: \"deduplicate\",\n    8: \"export\",\n}\n\n\ndef _now():\n    return datetime.now(timezone.utc).isoformat(timespec=\"seconds\")\n\n\ndef _init_state(args):\n    state = {\n        \"started_at\": _now(),\n        \"updated_at\": _now(),\n        \"status\": \"running\",\n        \"current_stage\": None,\n        \"current_cycle\": None,\n        \"args\": {\n            \"url\": getattr(args, \"url\", None),\n            \"model\": args.model,\n            \"rounds\": args.rounds,\n            \"domain\": getattr(args, \"domain\", None),\n        },\n        \"stages\": {\n            f\"{n}_{name}\": {\"status\": \"pending\"}\n            for n, name in STAGE_NAMES.items()\n        },\n    }\n    _save_state(state)\n    return state\n\n\ndef _load_state():\n    if not STATE_FILE.exists():\n        return None\n    return json.loads(STATE_FILE.read_text())\n\n\ndef _save_state(state):\n    state[\"updated_at\"] = _now()\n    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)\n    STATE_FILE.write_text(json.dumps(state, indent=2) + \"\\n\")\n\n\ndef _mark_stage(state, stage_num, status, **meta):\n    key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\"\n    state[\"stages\"][key][\"status\"] = status\n    if status == \"running\":\n        state[\"current_stage\"] = stage_num\n    elif status == \"completed\":\n        state[\"stages\"][key][\"completed_at\"] = _now()\n    state[\"stages\"][key].update(meta)\n    _save_state(state)\n\n\ndef 
_stage_completed(state, stage_num):\n    key = f\"{stage_num}_{STAGE_NAMES[stage_num]}\"\n    return state[\"stages\"][key][\"status\"] == \"completed\"\n\n\ndef _banner(stage_num, total, name):\n    print(f\"\\n{'=' * 50}\", file=sys.stderr)\n    print(f\"  Stage {stage_num}/{total}: {name}\", file=sys.stderr)\n    print(f\"{'=' * 50}\\n\", file=sys.stderr)\n\n\ndef _stage_ingest(args):\n    \"\"\"Stage 1: Fetch docs or chunk PDFs into sources/.\"\"\""
  },
  "cmd_pipeline_callers": {
    "symbol": "cmd_pipeline",
    "production_callers": [
      {
        "file": "expert_build/cli.py",
        "line": 137,
        "text": "\"pipeline\": lambda a: _lazy(\"pipeline\", \"cmd_pipeline\")(a),",
        "context_function": "main",
        "context_snippet": "   134:         \"accept-beliefs\": lambda a: _lazy(\"propose\", \"cmd_accept_beliefs\")(a),\n   135:         \"cert-coverage\": lambda a: _lazy(\"coverage\", \"cmd_cert_coverage\")(a),\n   136:         \"exam\": lambda a: _lazy(\"exam\", \"cmd_exam\")(a),\n>> 137:         \"pipeline\": lambda a: _lazy(\"pipeline\", \"cmd_pipeline\")(a),\n   138:         \"status\": lambda a: _lazy(\"init_cmd\", \"cmd_status\")(a),\n   139:         \"install-skill\": lambda a: _lazy(\"init_cmd\", \"cmd_install_skill\")(a),\n   140:     }"
      },
      {
        "file": "expert_build/pipeline.py",
        "line": 312,
        "text": "def cmd_pipeline(args):",
        "context_function": "_stage_export",
        "context_snippet": "   309:     print(f\"\\nFinal: {in_count} IN / {total} total beliefs\", file=sys.stderr)\n   310: \n   311: \n>> 312: def cmd_pipeline(args):\n   313:     \"\"\"Run end-to-end EEM construction pipeline.\"\"\"\n   314:     from .caffeinate import hold as _caffeinate\n   315:     _caffeinate()"
      }
    ],
    "test_callers": [
      {
        "file": "tests/test_pipeline.py",
        "line": 11,
        "text": "cmd_pipeline,",
        "context_function": null,
        "context_snippet": "   8: import pytest\n   9: \n   10: from expert_build.pipeline import (\n>> 11:     cmd_pipeline,\n   12:     _stage_ingest,\n   13:     _stage_extract,\n   14:     _stage_derive,"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 257,
        "text": "cmd_pipeline(args)",
        "context_function": "test_model_not_available_exits",
        "context_snippet": "   254:         args = make_pipeline_args(model=\"nonexistent\")\n   255:         with patch(\"expert_build.llm.check_model_available\", return_value=False), \\\n   256:              pytest.raises(SystemExit):\n>> 257:             cmd_pipeline(args)\n   258: \n   259:     def test_converges_early_on_zero_invalids_and_zero_added(self, work_dir, capsys):\n   260:         args = make_pipeline_args(rounds=3, url=None, pdf=None)"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 271,
        "text": "cmd_pipeline(args)",
        "context_function": "test_converges_early_on_zero_invalids_and_zero_added",
        "context_snippet": "   268:              patch(\"expert_build.pipeline._stage_deduplicate\"), \\\n   269:              patch(\"expert_build.pipeline._stage_export\"), \\\n   270:              patch(\"expert_build.caffeinate.hold\"):\n>> 271:             cmd_pipeline(args)\n   272: \n   273:         captured = capsys.readouterr()\n   274:         assert \"Converged after 1 cycle\" in captured.err"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 291,
        "text": "cmd_pipeline(args)",
        "context_function": "test_runs_all_rounds_without_convergence",
        "context_snippet": "   288:              patch(\"expert_build.pipeline._stage_deduplicate\"), \\\n   289:              patch(\"expert_build.pipeline._stage_export\"), \\\n   290:              patch(\"expert_build.caffeinate.hold\"):\n>> 291:             cmd_pipeline(args)\n   292: \n   293:         captured = capsys.readouterr()\n   294:         assert \"Converged\" not in captured.err"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 306,
        "text": "cmd_pipeline(args)",
        "context_function": "test_no_auto_accept_stops_early",
        "context_snippet": "   303:              patch(\"expert_build.pipeline._stage_derive\") as mock_derive, \\\n   304:              patch(\"expert_build.pipeline._stage_export\") as mock_export, \\\n   305:              patch(\"expert_build.caffeinate.hold\"):\n>> 306:             cmd_pipeline(args)\n   307: \n   308:         assert not mock_derive.called\n   309:         assert not mock_export.called"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 327,
        "text": "cmd_pipeline(args)",
        "context_function": "test_state_file_created_on_run",
        "context_snippet": "   324:              patch(\"expert_build.pipeline._stage_deduplicate\"), \\\n   325:              patch(\"expert_build.pipeline._stage_export\"), \\\n   326:              patch(\"expert_build.caffeinate.hold\"):\n>> 327:             cmd_pipeline(args)\n   328: \n   329:         state = _load_state()\n   330:         assert state is not None"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 342,
        "text": "cmd_pipeline(args)",
        "context_function": "test_state_records_failure",
        "context_snippet": "   339:                    side_effect=RuntimeError(\"LLM exploded\")), \\\n   340:              patch(\"expert_build.caffeinate.hold\"), \\\n   341:              pytest.raises(RuntimeError, match=\"LLM exploded\"):\n>> 342:             cmd_pipeline(args)\n   343: \n   344:         state = _load_state()\n   345:         assert state[\"status\"] == \"failed\""
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 360,
        "text": "cmd_pipeline(args)",
        "context_function": "test_resume_skips_completed_stages",
        "context_snippet": "   357:                    side_effect=RuntimeError(\"crash\")), \\\n   358:              patch(\"expert_build.caffeinate.hold\"), \\\n   359:              pytest.raises(RuntimeError):\n>> 360:             cmd_pipeline(args)\n   361: \n   362:         state = _load_state()\n   363:         assert state[\"stages\"][\"1_ingest\"][\"status\"] == \"completed\""
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 378,
        "text": "cmd_pipeline(resume_args)",
        "context_function": "test_resume_skips_completed_stages",
        "context_snippet": "   375:              patch(\"expert_build.pipeline._stage_deduplicate\"), \\\n   376:              patch(\"expert_build.pipeline._stage_export\"), \\\n   377:              patch(\"expert_build.caffeinate.hold\"):\n>> 378:             cmd_pipeline(resume_args)\n   379: \n   380:         assert not mock_summarize.called\n   381:         captured = capsys.readouterr()"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 393,
        "text": "cmd_pipeline(args)",
        "context_function": "test_resume_without_state_exits",
        "context_snippet": "   390:         with patch(\"expert_build.llm.check_model_available\", return_value=True), \\\n   391:              patch(\"expert_build.caffeinate.hold\"), \\\n   392:              pytest.raises(SystemExit):\n>> 393:             cmd_pipeline(args)\n   394: \n   395:     def test_no_auto_accept_sets_paused(self, work_dir):\n   396:         args = make_pipeline_args(no_auto_accept=True, url=None, pdf=None)"
      },
      {
        "file": "tests/test_pipeline.py",
        "line": 402,
        "text": "cmd_pipeline(args)",
        "context_function": "test_no_auto_accept_sets_paused",
        "context_snippet": "   399:              patch(\"expert_build.pipeline._stage_summarize\"), \\\n   400:              patch(\"expert_build.pipeline._stage_extract\", return_value=False), \\\n   401:              patch(\"expert_build.caffeinate.hold\"):\n>> 402:             cmd_pipeline(args)\n   403: \n   404:         state = _load_state()\n   405:         assert state[\"status\"] == \"paused\""
      }
    ],
    "production_count": 2,
    "test_count": 11,
    "total_count": 13
  },
  "pipeline_tests": {
    "source_file": "expert_build/pipeline.py",
    "test_files": [
      {
        "path": "tests/test_pipeline.py",
        "exists": true,
        "line_count": 406
      },
      {
        "path": "tests/test_pipelinepy",
        "exists": false
      }
    ],
    "test_count": 2
  },
  "stage_ingest_raises": {
    "function": "_stage_ingest",
    "file": "expert_build/pipeline.py",
    "explicit_raises": [],
    "calls": [
      "print",
      "cmd_fetch_docs",
      "cmd_chunk_pdf",
      "SimpleNamespace"
    ]
  },
  "propose_project_dir": {
    "error": "No function found at 'PROJECT_DIR'",
    "file": "expert_build/propose.py"
  }
}
```

Use these results to inform your review. Do not request the same observations again.


## Instructions

For each significant change (new file, modified function, etc.), provide a structured verdict.

Use this exact format for each change:

### <file_path or file_path:function_name>
VERDICT: PASS | CONCERN | BLOCK
CORRECTNESS: VALID | QUESTIONABLE | BROKEN
SPEC_COMPLIANCE: MEETS | PARTIAL | VIOLATES | N/A
ISSUE_COMPLIANCE: ADDRESSES | PARTIAL | UNRELATED | N/A
BELIEF_COMPLIANCE: CONSISTENT | VIOLATES | N/A
TEST_COVERAGE: COVERED | PARTIAL | UNTESTED
INTEGRATION: WIRED | PARTIAL | MISSING
REASONING: <brief explanation of your assessment>
---

## Review Criteria

1. **CORRECTNESS**: Does the code do what it claims? Is the logic sound?
   - VALID: Logic is correct, no bugs apparent
   - QUESTIONABLE: Logic may have edge cases or unclear behavior
   - BROKEN: Clear bugs or incorrect behavior

2. **SPEC_COMPLIANCE**: Does it meet MUST requirements from the spec?
   - MEETS: All relevant spec requirements satisfied
   - PARTIAL: Some requirements met, others missing or incomplete
   - VIOLATES: Contradicts spec requirements
   - N/A: No spec provided or not applicable

3. **ISSUE_COMPLIANCE** (only when an issue is provided): Do the changes address the problem or feature described in the issue?
   - ADDRESSES: Changes directly solve the issue's stated problem or implement the requested feature
   - PARTIAL: Changes partially address the issue but leave some aspects unresolved
   - UNRELATED: Changes do not appear related to the issue
   - N/A: No issue provided

4. **TEST_COVERAGE**: Are there tests for the new/changed code?
   - COVERED: Tests exist and cover the changes
   - PARTIAL: Some tests exist but coverage is incomplete
   - UNTESTED: No tests for the changes

5. **INTEGRATION**: Are callers updated? Is the feature usable end-to-end?
   - WIRED: Feature is fully integrated and usable
   - PARTIAL: Interface exists but callers not updated, or integration incomplete
   - MISSING: No integration with existing code

6. **BELIEF_COMPLIANCE** (only when beliefs are provided): Do the changes respect known architectural invariants, contracts, and rules?
   - CONSISTENT: Changes align with or reinforce known beliefs
   - VIOLATES: Changes contradict a specific belief — cite the belief ID
   - N/A: No beliefs provided or no relevant beliefs apply

## Verdict Guidelines

- **BLOCK**: Security issues, broken functionality, spec violations, or missing critical integration
- **CONCERN**: Missing tests, partial integration, questionable patterns, or unclear logic
- **PASS**: Correct, tested, well-integrated code

## Important

- Full function bodies for modified functions may be available in the observations section — use them to verify the complete logic, not just the diff hunks
- Related test files (prefixed with ``related_test:``) may be included in observations — check whether existing test assertions still match modified return types, signatures, or behavior. Flag any test that would break due to the changes
- If duplicate test coverage is detected (multiple test files covering the same source), note it in your review
- Focus on actual issues, not style preferences
- If a method signature is added but callers aren't updated, that's PARTIAL integration
- Be specific in your reasoning — reference line numbers or function names
- When in doubt, use CONCERN rather than PASS

## Self-Review

After completing your review, add a brief self-assessment:

### SELF_REVIEW
LIMITATIONS: <what context were you missing that affected review quality?>
---

Examples of limitations:
- "Could not see full class to verify no other methods access the modified field"
- "Test file not included in diff - cannot verify coverage claims"
- "Spec file referenced but not provided"


## Feature Requests

If this review tool could be improved to help you do a better job, suggest features:

### FEATURE_REQUESTS
- <suggestion 1>
- <suggestion 2>
---

Examples:
- "Include full file context for modified functions, not just diff hunks"
- "Show callers of modified methods to verify integration"
- "Include test file alongside implementation changes"

Include this section only if you have specific suggestions; otherwise omit it entirely, heading included.
