You are a senior code reviewer preparing to review code changes.

## Code Changes

```diff
diff --git a/expert_build/cli.py b/expert_build/cli.py
index 8dcd7a6..00ea9ad 100644
--- a/expert_build/cli.py
+++ b/expert_build/cli.py
@@ -108,6 +108,8 @@ def main():
     pipe_p.add_argument("--timeout", type=int, default=600,
                         help="LLM timeout in seconds (default: 600)")
     pipe_p.add_argument("--domain", help="Domain description for derive context")
+    pipe_p.add_argument("--resume", action="store_true",
+                        help="Resume from last saved pipeline state")
 
     # -- status --
     sub.add_parser("status", help="Show pipeline progress")
diff --git a/expert_build/pipeline.py b/expert_build/pipeline.py
index 82c7966..f5504db 100644
--- a/expert_build/pipeline.py
+++ b/expert_build/pipeline.py
@@ -2,11 +2,85 @@
 
 import json
 import sys
+import traceback
+from datetime import datetime, timezone
 from pathlib import Path
 from types import SimpleNamespace
 
 from .llm import check_model_available, invoke_sync
-from .propose import REASONS_DB
+from .propose import PROJECT_DIR, REASONS_DB
+
+STATE_FILE = Path(PROJECT_DIR) / "pipeline-state.json"
+
+STAGE_NAMES = {
+    1: "ingest",
+    2: "summarize",
+    3: "extract",
+    4: "derive",
+    5: "review",
+    6: "repair",
+    7: "deduplicate",
+    8: "export",
+}
+
+
+def _now():
+    return datetime.now(timezone.utc).isoformat(timespec="seconds")
+
+
+def _init_state(args):
+    state = {
+        "started_at": _now(),
+        "updated_at": _now(),
+        "status": "running",
+        "current_stage": None,
+        "current_cycle": None,
+        "args": {
+            "url": getattr(args, "url", None),
+            "model": args.model,
+            "rounds": args.rounds,
+            "domain": getattr(args, "domain", None),
+        },
+        "stages": {
+            f"{n}_{name}": {"status": "pending"}
+            for n, name in STAGE_NAMES.items()
+        },
+    }
+    _save_state(state)
+    return state
+
+
+def _load_state():
+    if not STATE_FILE.exists():
+        return None
+    try:
+        return json.loads(STATE_FILE.read_text())
+    except (json.JSONDecodeError, ValueError):
+        print(f"WARNING: corrupt state file {STATE_FILE}, ignoring",
+              file=sys.stderr)
+        return None
+
+
+def _save_state(state):
+    state["updated_at"] = _now()
+    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+    STATE_FILE.write_text(json.dumps(state, indent=2) + "\n")
+
+
+def _mark_stage(state, stage_num, status, **meta):
+    key = f"{stage_num}_{STAGE_NAMES[stage_num]}"
+    state["stages"][key]["status"] = status
+    if status == "running":
+        state["current_stage"] = stage_num
+    elif status == "completed":
+        state["stages"][key]["completed_at"] = _now()
+    state["stages"][key].update(meta)
+    _save_state(state)
+
+
+def _stage_completed(state, stage_num):
+    key = f"{stage_num}_{STAGE_NAMES[stage_num]}"
+    return state["stages"][key]["status"] == "completed"
 
 
 def _banner(stage_num, total, name):
@@ -249,52 +323,119 @@ def cmd_pipeline(args):
         print(f"Model not available: {args.model}", file=sys.stderr)
         sys.exit(1)
 
-    total_stages = 8
-    has_sources = args.url or args.pdf
-
-    # Stage 1: Ingest
-    if has_sources:
-        _banner(1, total_stages, "INGEST")
-        _stage_ingest(args)
+    resume = getattr(args, "resume", False)
+    if resume:
+        state = _load_state()
+        if not state:
+            print("No pipeline state to resume. Run without --resume first.",
+                  file=sys.stderr)
+            sys.exit(1)
+        if state["status"] == "completed":
+            print("Pipeline already completed. Run without --resume to start fresh.",
+                  file=sys.stderr)
+            return
+        print("Resuming pipeline from state file", file=sys.stderr)
+        state["status"] = "running"
+        _save_state(state)
     else:
-        print("No --url or --pdf provided, skipping ingest", file=sys.stderr)
-
-    # Stage 2: Summarize
-    _banner(2, total_stages, "SUMMARIZE")
-    _stage_summarize(args)
+        state = _init_state(args)
 
-    # Stage 3: Extract
-    _banner(3, total_stages, "EXTRACT")
-    should_continue = _stage_extract(args)
-    if not should_continue:
-        return
-
-    # Stages 4-7: Derive → Review → Repair → Deduplicate (convergence loop)
-    for cycle in range(1, args.rounds + 1):
-        label = f"cycle {cycle}/{args.rounds}"
-
-        _banner(4, total_stages, f"DERIVE ({label})")
-        added = _stage_derive(args, round_label=label)
-
-        _banner(5, total_stages, f"REVIEW ({label})")
-        review_result = _stage_review(args, round_label=label)
-
-        invalid_count = review_result.get("invalid", 0)
-
-        if invalid_count > 0:
-            _banner(6, total_stages, f"REPAIR ({label})")
-            _stage_repair(args, review_result, round_label=label)
-
-        _banner(7, total_stages, f"DEDUPLICATE ({label})")
-        _stage_deduplicate(args, round_label=label)
-
-        if invalid_count == 0 and added == 0:
-            print(f"\nConverged after {cycle} cycles "
-                  f"(0 invalids, 0 new derivations)", file=sys.stderr)
-            break
-
-    # Stage 8: Export
-    _banner(8, total_stages, "EXPORT")
-    _stage_export(args)
+    total_stages = 8
+    has_sources = args.url or args.pdf
 
-    print("\nPipeline complete.", file=sys.stderr)
+    try:
+        # Stage 1: Ingest
+        if not _stage_completed(state, 1):
+            if has_sources:
+                _banner(1, total_stages, "INGEST")
+                _mark_stage(state, 1, "running")
+                _stage_ingest(args)
+                _mark_stage(state, 1, "completed")
+            else:
+                print("No --url or --pdf provided, skipping ingest", file=sys.stderr)
+                _mark_stage(state, 1, "completed", skipped=True)
+        else:
+            print("Stage 1 (INGEST) already completed, skipping", file=sys.stderr)
+
+        # Stage 2: Summarize
+        if not _stage_completed(state, 2):
+            _banner(2, total_stages, "SUMMARIZE")
+            _mark_stage(state, 2, "running")
+            _stage_summarize(args)
+            _mark_stage(state, 2, "completed")
+        else:
+            print("Stage 2 (SUMMARIZE) already completed, skipping", file=sys.stderr)
+
+        # Stage 3: Extract
+        if not _stage_completed(state, 3):
+            _banner(3, total_stages, "EXTRACT")
+            _mark_stage(state, 3, "running")
+            should_continue = _stage_extract(args)
+            _mark_stage(state, 3, "completed")
+            if not should_continue:
+                state["status"] = "paused"
+                _save_state(state)
+                return
+        else:
+            print("Stage 3 (EXTRACT) already completed, skipping", file=sys.stderr)
+
+        # Stages 4-7: Derive → Review → Repair → Deduplicate (convergence loop)
+        if not state.get("loop_completed"):
+            start_cycle = state.get("current_cycle") or 1
+            for cycle in range(start_cycle, args.rounds + 1):
+                label = f"cycle {cycle}/{args.rounds}"
+                state["current_cycle"] = cycle
+                _save_state(state)
+
+                _banner(4, total_stages, f"DERIVE ({label})")
+                _mark_stage(state, 4, "running", cycle=cycle)
+                added = _stage_derive(args, round_label=label)
+                _mark_stage(state, 4, "completed", cycle=cycle, added=added)
+
+                _banner(5, total_stages, f"REVIEW ({label})")
+                _mark_stage(state, 5, "running", cycle=cycle)
+                review_result = _stage_review(args, round_label=label)
+                invalid_count = review_result.get("invalid", 0)
+                _mark_stage(state, 5, "completed", cycle=cycle,
+                            reviewed=review_result.get("reviewed", 0),
+                            invalid=invalid_count)
+
+                if invalid_count > 0:
+                    _banner(6, total_stages, f"REPAIR ({label})")
+                    _mark_stage(state, 6, "running", cycle=cycle)
+                    _stage_repair(args, review_result, round_label=label)
+                    _mark_stage(state, 6, "completed", cycle=cycle)
+                else:
+                    _mark_stage(state, 6, "completed", cycle=cycle, skipped=True)
+
+                _banner(7, total_stages, f"DEDUPLICATE ({label})")
+                _mark_stage(state, 7, "running", cycle=cycle)
+                _stage_deduplicate(args, round_label=label)
+                _mark_stage(state, 7, "completed", cycle=cycle)
+
+                if invalid_count == 0 and added == 0:
+                    print(f"\nConverged after {cycle} cycles "
+                          f"(0 invalids, 0 new derivations)", file=sys.stderr)
+                    break
+
+            state["loop_completed"] = True
+            _save_state(state)
+        else:
+            print("Convergence loop already completed, skipping", file=sys.stderr)
+
+        # Stage 8: Export
+        _banner(8, total_stages, "EXPORT")
+        _mark_stage(state, 8, "running")
+        _stage_export(args)
+        _mark_stage(state, 8, "completed")
+
+        state["status"] = "completed"
+        _save_state(state)
+        print("\nPipeline complete.", file=sys.stderr)
+
+    except Exception as e:
+        state["status"] = "failed"
+        state["error"] = str(e)
+        state["error_traceback"] = traceback.format_exc()
+        _save_state(state)
+        raise
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index fd208ba..6efbd5d 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -1,5 +1,6 @@
 """Tests for the pipeline command."""
 
+import json
 import types
 from pathlib import Path
 from unittest.mock import patch
@@ -14,6 +15,9 @@
     _stage_review,
     _stage_repair,
     _stage_deduplicate,
+    _load_state,
+    _save_state,
+    STATE_FILE,
 )
 from expert_build.propose import auto_accept_proposals
 
@@ -41,6 +45,7 @@ def make_pipeline_args(**overrides):
         depth=2,
         timeout=600,
         domain="Test domain",
+        resume=False,
     )
     defaults.update(overrides)
     return types.SimpleNamespace(**defaults)
@@ -303,3 +308,140 @@ def test_no_auto_accept_stops_early(self, work_dir, capsys):
 
         assert not mock_derive.called
         assert not mock_export.called
+
+
+# --- Pipeline State ---
+
+class TestPipelineState:
+    def test_state_file_created_on_run(self, work_dir):
+        args = make_pipeline_args(rounds=1, url=None, pdf=None)
+        review_result = {"reviewed": 0, "invalid": 0, "results": []}
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.pipeline._stage_export"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state is not None
+        assert state["status"] == "completed"
+        assert state["stages"]["8_export"]["status"] == "completed"
+
+    def test_state_records_failure(self, work_dir):
+        args = make_pipeline_args(url=None, pdf=None)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize",
+                   side_effect=RuntimeError("LLM exploded")), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(RuntimeError, match="LLM exploded"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["status"] == "failed"
+        assert "LLM exploded" in state["error"]
+        assert state["stages"]["2_summarize"]["status"] == "running"
+
+    def test_resume_skips_completed_stages(self, work_dir, capsys):
+        args = make_pipeline_args(rounds=1, url=None, pdf=None)
+        review_result = {"reviewed": 0, "invalid": 0, "results": []}
+
+        # First run: complete through summarize, then fail at extract
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract",
+                   side_effect=RuntimeError("crash")), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(RuntimeError):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["stages"]["1_ingest"]["status"] == "completed"
+        assert state["stages"]["2_summarize"]["status"] == "completed"
+        assert state["stages"]["3_extract"]["status"] == "running"
+
+        # Resume: should skip ingest and summarize
+        resume_args = make_pipeline_args(rounds=1, url=None, pdf=None, resume=True)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize") as mock_summarize, \
+             patch("expert_build.pipeline._stage_extract", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.pipeline._stage_export"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(resume_args)
+
+        assert not mock_summarize.called
+        captured = capsys.readouterr()
+        assert "already completed, skipping" in captured.err
+
+        state = _load_state()
+        assert state["status"] == "completed"
+
+    def test_resume_without_state_exits(self, work_dir):
+        args = make_pipeline_args(resume=True)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(SystemExit):
+            cmd_pipeline(args)
+
+    def test_no_auto_accept_sets_paused(self, work_dir):
+        args = make_pipeline_args(no_auto_accept=True, url=None, pdf=None)
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract", return_value=False), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["status"] == "paused"
+        assert state["stages"]["3_extract"]["status"] == "completed"
+
+    def test_resume_completed_pipeline_returns_early(self, work_dir, capsys):
+        """Resuming an already-completed pipeline does nothing."""
+        args = make_pipeline_args(rounds=1, url=None, pdf=None)
+        review_result = {"reviewed": 0, "invalid": 0, "results": []}
+
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_summarize"), \
+             patch("expert_build.pipeline._stage_extract", return_value=True), \
+             patch("expert_build.pipeline._stage_derive", return_value=0), \
+             patch("expert_build.pipeline._stage_review", return_value=review_result), \
+             patch("expert_build.pipeline._stage_deduplicate"), \
+             patch("expert_build.pipeline._stage_export"), \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(args)
+
+        state = _load_state()
+        assert state["status"] == "completed"
+
+        # Now resume — should return early
+        resume_args = make_pipeline_args(resume=True)
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.pipeline._stage_export") as mock_export, \
+             patch("expert_build.caffeinate.hold"):
+            cmd_pipeline(resume_args)
+
+        assert not mock_export.called
+        captured = capsys.readouterr()
+        assert "already completed" in captured.err
+
+    def test_corrupt_state_file_handled(self, work_dir):
+        """Corrupt state file is treated as missing."""
+        STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+        STATE_FILE.write_text("{truncated")
+
+        args = make_pipeline_args(resume=True)
+        with patch("expert_build.llm.check_model_available", return_value=True), \
+             patch("expert_build.caffeinate.hold"), \
+             pytest.raises(SystemExit):
+            cmd_pipeline(args)

```

## Your Task

Analyze the diff and identify what additional information you need to render confident verdicts.
Do NOT render verdicts yet. Only request observations.

## Available Observation Tools

| Tool | Purpose | When to use |
|------|---------|-------------|
| `exception_hierarchy` | Show exception MRO and subclasses | Retry logic, exception handling |
| `raises_analysis` | What exceptions a function raises | New function calls, error paths |
| `call_graph` | What a function calls | Impact analysis |
| `find_usages` | Where a symbol is used (with prod/test split) | Quick integration lookup |
| `find_callers` | Caller analysis with prod/test split and calling context | Method signature changes, return type changes, constructor modifications, integration verification |
| `test_coverage` | Find tests for a file (uses coverage-map if available) | Test coverage claims |
| `coverage_map_tests` | Find tests covering a file (from coverage-map.json) | Precise test coverage from actual execution |
| `coverage_map_files` | Find files covered by tests matching a pattern | Impact analysis for test changes |
| `function_body` | Full source of a function/method | Need complete function context beyond diff hunks |
| `file_imports` | Extract imports from a file | Verify import changes, check dependencies |
| `project_dependencies` | Get pyproject.toml/requirements.txt | Verify new imports have dependencies |
| `related_test_files` | Find test files for a source file | Discover tests by naming, imports, and coverage map |
| `class_hierarchy` | Show base classes and their `__init__` signatures | Class changes its parent, modifies `__init__`, or uses `super()` |
| `symbol_migration` | Check if a rename is complete across the repo | Symbol renamed in diff — verify old name is fully removed |
| `generator_info` | Report whether a function uses `yield` | Function might be a generator — affects return value semantics |

## What to Look For

1. **Exception handling**: Any `retry_if_exception_type`, `except`, or exception class references
2. **New dependencies**: Calls to external libraries where you don't know the error behavior
3. **Behavioral changes**: Modified logic where you need to verify callers/callees
4. **Test claims**: References to tests you can't see in the diff
5. **Inheritance changes**: Class definition changes, new base classes, `super()` calls
6. **Renames**: Symbols that appear to have been renamed in the diff
7. **Factory methods**: Calls to `@classmethod` / `@staticmethod` constructors (e.g. `Result.error(...)`) — request `function_body` to see their implementation

## Output Format

Output a JSON array of observation requests:

```json
[
  {"name": "descriptive_name", "tool": "tool_name", "params": {"param": "value"}},
  ...
]
```

If you don't need any observations (simple changes, all context is in the diff), output:

```json
[]
```

## Examples

For a diff containing `retry_if_exception_type((OSError, httpx.TransportError))`:
```json
[
  {"name": "oserror_subclasses", "tool": "exception_hierarchy", "params": {"class_name": "builtins.OSError"}},
  {"name": "transport_errors", "tool": "exception_hierarchy", "params": {"class_name": "httpx.TransportError"}}
]
```

For a diff adding a new function that calls `oauth_client.get_access_token()`:
```json
[
  {"name": "oauth_exceptions", "tool": "raises_analysis", "params": {"file_path": "src/auth/oauth.py", "function_name": "get_access_token"}}
]
```

For a diff modifying a method but you need the full function to verify:
```json
[
  {"name": "full_getattr", "tool": "function_body", "params": {"file_path": "src/proxy.py", "function_name": "__getattr__"}}
]
```

For a diff changing a method signature or return type (verify all callers):
```json
[
  {"name": "handle_request_callers", "tool": "find_callers", "params": {"symbol": "handle_request"}}
]
```

For a diff adding new imports (e.g., `import httpx`):
```json
[
  {"name": "file_imports", "tool": "file_imports", "params": {"file_path": "src/client.py"}},
  {"name": "project_deps", "tool": "project_dependencies", "params": {}}
]
```

For a diff calling a factory method like `ModuleResult.error_result(msg)`:
```json
[
  {"name": "error_result_body", "tool": "function_body", "params": {"file_path": "src/models.py", "function_name": "error_result"}}
]
```

For a diff where a class changes its parent class:
```json
[
  {"name": "client_hierarchy", "tool": "class_hierarchy", "params": {"class_name": "MyClient", "file_path": "src/client.py"}}
]
```

For a diff that renames a symbol (e.g., `OldClient` to `NewClient`):
```json
[
  {"name": "client_rename", "tool": "symbol_migration", "params": {"old_name": "OldClient", "new_name": "NewClient"}}
]
```

For a diff modifying a function that might be a generator:
```json
[
  {"name": "process_gen", "tool": "generator_info", "params": {"file_path": "src/pipeline.py", "function_name": "process_items"}}
]
```

Now analyze the diff above and output your observation requests as JSON:
