# devfork focus pack (codex)

You are operating in a low-cognitive-load mode.
Prioritize small, verifiable steps and explicit outputs.

Goal snapshot (see the `Goal` sketch after this list):
- goal_id: V1.1
- title: Improve dry-run mode
- phase: legacy
- uncertainty: unknown
- allowed_changes:
  - src/devf/core/auto.py
- must_pass_tests:
  - tests/test_auto.py
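
For orientation, the sketch below shows how this snapshot maps onto the `Goal` dataclass. Field names mirror the `_make_goal` helper and the direct `Goal(...)` constructions in tests/test_auto.py; the exact constructor signature is assumed, not confirmed.

```python
# Illustrative sketch only: field names taken from tests/test_auto.py;
# the full Goal constructor signature is an assumption.
from devf.core.goals import Goal

goal = Goal(
    id="V1.1",
    title="Improve dry-run mode",
    status="active",
    allowed_changes=["src/devf/core/auto.py"],
    test_files=["tests/test_auto.py"],
)
```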

Execution checklist:
1. Implement minimal diff for the current goal.
2. Keep edits within allowed_changes (if configured).
3. Verify with: pytest (a minimal verification sketch follows this list).
4. Summarize changed files + risk notes + next action.
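
A minimal verification sketch in Python, assuming the must-pass test file from the goal snapshot (the checklist itself only mandates plain `pytest`):

```python
# Run the goal's must-pass tests and propagate the exit code.
# Assumes pytest is installed and this runs from the repository root.
import subprocess
import sys

result = subprocess.run(
    ["pytest", "tests/test_auto.py"],
    capture_output=True,
    text=True,
)
summary = result.stdout.strip().splitlines()
print(summary[-1] if summary else result.stderr.strip())  # pytest's final summary line
sys.exit(result.returncode)
```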

---

Context:
<context_pack version="1">
  <task id="V1.1">Improve dry-run mode</task>
  <constraints>
    <must_pass>tests/test_auto.py</must_pass>
    <allowed_changes>src/devf/core/auto.py</allowed_changes>
  </constraints>
  <notes>dry-run does not modify any files, so the dirty-tree and lock checks should be skipped</notes>
  <evidence>
    <last_session_status>complete</last_session_status>
    <suggested_tests>
      <test>tests/test_auto.py</test>
      <test>tests/test_auto_hard_policy.py</test>
      <test>tests/test_auto_parallel.py</test>
      <test>tests/test_auto_quality.py</test>
      <test>tests/test_auto_replan.py</test>
      <test>tests/test_auto_risk_merge.py</test>
      <test>tests/test_evidence_bdd.py</test>
    </suggested_tests>
  </evidence>
  <target_files>
    <source path="src/devf/core/auto.py" lines="501">
"""Automation loop."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import fnmatch
import os
import re
import sys
from pathlib import Path
import shlex
import subprocess
import threading
from typing import Iterable

import yaml

from devf.core.attempt import (
    AttemptLog,
    clear_attempts,
    load_attempts,
    save_attempt,
)
from devf.core.config import Config, load_config
from devf.core.contract import (
    AcceptanceContract,
    contract_prompt_lines,
    load_acceptance_contract,
    validate_forbidden_patterns,
    validate_required_patterns,
)
from devf.core.decision import load_decision_ticket
from devf.core.context import build_context
from devf.core.evidence import hash_text, new_run_id, write_evidence_row
from devf.core.errors import DevfError
from devf.core.feedback_infer import infer_and_store_feedback_notes
from devf.core.feedback_policy import load_feedback_policy
from devf.core.gate import run_gate
from devf.core.goals import (
    Goal,
    collect_goals,
    find_goal,
    load_goals,
    update_goal_fields,
    update_goal_status,
)
from devf.core.languages import (
    apply_pytest_reliability_flags,
    assertion_patterns as language_assertion_patterns,
    build_targeted_test_commands,
    collect_test_files,
    resolve_goal_languages,
    trivial_assertions as language_trivial_assertions,
)
from devf.core.immune_policy import evaluate_immune_changes
from devf.core.phase import load_phase_template, next_phase, parse_plan_output, regress_phase
from devf.core.policies import AutoPolicies, load_auto_policies
from devf.core.runner import GoalRunner, RunnerResult
from devf.core.retry_policy import (
    ADVANCE_ACTION,
    BLOCK_ACTION,
    RETRY_ACTION,
    decide_retry_action,
)
from devf.core.replan import InvalidationEvent, apply_post_goal_replan
from devf.core.risk_policy import compute_risk_score
from devf.core.runners.local import LocalRunner
from devf.core.runners.llm import LLMRunner
from devf.core.scheduler import build_execution_batches
from devf.core.session import generate_session_log, write_session_log
from devf.core.state_policy import decide_goal_state
from devf.core.triage import classify_failure
from devf.utils.codetools import complexity_check
from devf.utils.file_parser import parse_file_changes, apply_file_changes
from devf.utils.git import (
    commit_all,
    get_changed_files,
    get_head_commit,
    is_dirty,
    run_git,
    reset_hard,
    worktree_create,
    worktree_merge,
    worktree_remove,
)


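# Result of evaluating one goal attempt; classification feeds the retry/triage policies.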
@dataclass(frozen=True)
class Outcome:
    success: bool
    should_retry: bool
    classification: str
    reason: str | None = None


@dataclass(frozen=True)
class RedGateResult:
    passed: bool
    reason: str
    test_output: str = ""
    test_files: list[str] = field(default_factory=list)


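# Policy engine verdict for one attempt, recorded alongside evidence rows.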
@dataclass(frozen=True)
class PolicyDecision:
    policy_version: str
    failure_classification: str | None
    action_taken: str
    risk_score: int


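# Re-entrant lock serializing cross-goal mutations: goals.yaml updates and worktree create/merge/remove.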
_ROOT_LOCK = threading.RLock()


def run_auto(
    root: Path,
    goal_id: str | None,
    recursive: bool,
    dry_run: bool,
    explain: bool,
    tool_name: str | None,
    runner: GoalRunner | None = None,
    parallelism: int = 1,
) -> int:
    config, warnings = load_config(root / ".ai" / "config.yaml")
    for warning in warnings:
        _log_warning(warning)

    goals = load_goals(root / ".ai" / "goals.yaml")
    selected = collect_goals(goals, goal_id, recursive)
    if not selected:
        raise DevfError("no active goals to run")
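    # The scheduler groups goals into batches; goals within a batch may run concurrently.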
    batches = build_execution_batches(goals, selected)

    # dry-run: print prompt and exit, no lock or dirty check needed
    if dry_run:
        for goal in selected:
            if goal.spec_file:
                print(f"BDD Mode: {goal.spec_file}")
            elif goal.phase:
                print(build_phase_prompt(root, config, goal, goal.phase, []))
            else:
                print(build_prompt(root, config, goal, []))
        return 0

    # Select runner:
    # - if an explicit runner was provided (e.g. from tests), use it;
    # - if config.roles is set, prefer LLMRunner;
    # - otherwise fall back to LocalRunner.
    if runner is None:
        if config.roles.worker or config.roles.architect:
            runner = LLMRunner()
        else:
            runner = LocalRunner()

    run_id = new_run_id()
    policies = load_auto_policies(root)
    feedback_policy = load_feedback_policy(root)
    worker_count = max(1, parallelism)

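    # Cross-run lock; dry-run returned earlier, so it never holds this lock.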
    _acquire_lock(root)
    exit_code = 0
    try:
        has_failure = False
        cycle_count = 0
        no_progress_count = 0
        root_lock = _ROOT_LOCK
        stop_session = False

        for batch in batches:
            if stop_session:
                break

            for chunk in _chunked(batch, worker_count):
                prepared: list[tuple[Goal, Path, str]] = []
                for goal in chunk:
                    cycle_count += 1
                    if cycle_count > config.circuit_breakers.max_cycles_per_session:
                        _log_warning("Circuit breaker: max cycles per session reached")
                        stop_session = True
                        break
                    with root_lock:
                        wt_root = worktree_create(root, goal.id)
                    base_commit = get_head_commit(wt_root)
                    prepared.append((goal, wt_root, base_commit))

                if not prepared:
                    continue

                results: list[tuple[Goal, bool, str | None]] = []
                if len(prepared) == 1:
                    goal, wt_root, base_commit = prepared[0]
                    ok, phase = _execute_goal_once(
                        wt_root,
                        root,
                        config,
                        goal,
                        runner,
                        tool_name,
                        explain,
                        base_commit,
                        run_id,
                        policies,
                        root_lock,
                    )
                    results.append((goal, ok, phase))
                else:
                    with ThreadPoolExecutor(max_workers=min(worker_count, len(prepared))) as executor:
                        future_map = {
                            executor.submit(
                                _execute_goal_once,
                                wt_root,
                                root,
                                config,
                                goal,
                                runner,
                                tool_name,
                                explain,
                                base_commit,
                                run_id,
                                policies,
                                root_lock,
                            ): goal
                            for goal, wt_root, base_commit in prepared
                        }
                        for future in as_completed(future_map):
                            goal = future_map[future]
                            ok, phase = future.result()
                            results.append((goal, ok, phase))

                goals_path = root / ".ai" / "goals.yaml"
                for goal, goal_ok, phase in results:
                    if not goal_ok:
                        no_progress_count += 1
                        _safe_worktree_remove(root, goal.id, root_lock)
                        if phase is None or phase == "gate":
                            _safe_update_goal_status(goals_path, goal.id, "blocked", root_lock)
                        has_failure = True
                    else:
                        no_progress_count = 0

                if no_progress_count >= config.circuit_breakers.max_consecutive_no_progress:
                    _log_warning("Circuit breaker: max consecutive no-progress reached")
                    stop_session = True
                    break

        exit_code = 1 if has_failure else 0
    finally:
        _release_lock(root)

    if feedback_policy.enabled:
        try:
            infer_and_store_feedback_notes(root, run_id, feedback_policy)
        except Exception as exc:  # pragma: no cover - post-run feedback should be non-blocking
            _log_warning(f"failed to infer feedback notes: {exc}")
    return exit_code


def _execute_goal_once(
    wt_root: Path,
    root: Path,
    config: Config,
    goal: Goal,
    runner: GoalRunner,
    tool_name: str | None,
    explain: bool,
    base_commit: str,
    run_id: str,
    policies: AutoPolicies,
    root_lock: threading.RLock,
) -> tuple[bool, str | None]:
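    """Run one goal inside its worktree, dispatching on BDD/legacy/gate/merge/phase mode.

    Returns (goal_ok, phase).
    """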
    goals_path = root / ".ai" / "goals.yaml"
    phase = goal.phase
    goal_ok = False

    decision_ready, decision_reason = _validate_goal_decision_prerequisites(root, goal)
    if not decision_ready:
        reason = decision_reason or "decision prerequisites not satisfied"
        outcome = Outcome(
            success=False,
            should_retry=False,
            classification="decision-pending",
            reason=reason,
        )
        outcome, policy_decision = _apply_policy_decision(
            root,
            goal.id,
            phase,
            1,
            outcome,
            reason,
            wt_root,
            base_commit,
            config.max_retries,
            policies,
        )
        _record_evidence(
            root,
            run_id,
            goal.id,
            phase,
            1,
            outcome,
            wt_root,
            base_commit,
            reason,
            policy_decision=policy_decision,
        )
        _safe_update_goal_status(goals_path, goal.id, "blocked", root_lock)
        return False, phase

    if goal.spec_file:
        goal_ok = _run_bdd_goal(
            wt_root, root, config, goal, config.max_retries,
            runner, tool_name, explain, base_commit, run_id, policies, root_lock,
        )
    elif phase is None:
        goal_ok = _run_legacy_goal(
            wt_root, root, config, goal, config.max_retries,
            runner, tool_name, explain, base_commit, run_id, policies, root_lock,
        )
    elif phase == "gate":
        outcome, gate_output = evaluate_phase(wt_root, config, goal, "gate", base_commit)
        outcome, policy_decision = _apply_policy_decision(
            root,
            goal.id,
            "gate",
            1,
            outcome,
            gate_output,
            wt_root,
            base_commit,
            config.max_retries,
            policies,
        )
        _record_evidence(
            root, run_id, goal.id, "gate", 1, outcome, wt_root, base_commit, gate_output,
            policy_decision=policy_decision,
        )
        if outcome.success:
            nxt = next_phase("gate", phases=goal.phases)
            if nxt == "merge":
                goal_ok = _merge_goal_with_controls(
                    root=root,
                    wt_root=wt_root,
                    config=config,
                    goal=goal,
                    run_id=run_id,
                    attempt=1,
                    base_commit=base_commit,
                    policies=policies,
                    root_lock=root_lock,
                )
                if not goal_ok:
                    return False, phase
            else:
                _safe_update_goal_fields(goals_path, goal.id, {"phase": nxt}, root_lock)
            goal_ok = True
        else:
            _safe_update_goal_fields(goals_path, goal.id, {"phase": regress_phase("gate")}, root_lock)
            goal_ok = False
    elif phase == "merge":
        goal_ok = _merge_goal_with_controls(
            root=root,
            wt_root=wt_root,
            config=config,
            goal=goal,
            run_id=run_id,
            attempt=1,
            base_commit=base_commit,
            policies=policies,
            root_lock=root_lock,
        )
    else:
        goal_ok = _run_phased_goal(
            wt_root, root, config, goal, phase, config.max_retries,
            runner, tool_name, explain, base_commit, run_id, policies, root_lock,
        )

    return goal_ok, phase


def _chunked(goals: list[Goal], size: int) -> list[list[Goal]]:
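    """Split goals into consecutive chunks of at most ``size`` items."""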
    chunks: list[list[Goal]] = []
    for i in range(0, len(goals), size):
        chunks.append(goals[i:i + size])
    return chunks


def _safe_update_goal_status(path: Path, goal_id: str, status: str, lock: threading.RLock) -> None:
    with lock:
        update_goal_status(path, goal_id, status)


def _safe_update_goal_fields(
    path: Path, goal_id: str, fields: dict[str, object], lock: threading.RLock,
) -> None:
    with lock:
        update_goal_fields(path, goal_id, fields)


def _safe_worktree_merge(root: Path, goal_id: str, lock: threading.RLock) -> None:
    with lock:
        worktree_merge(root, goal_id)


def _safe_worktree_remove(root: Path, goal_id: str, lock: threading.RLock) -> None:
    with lock:
        worktree_remove(root, goal_id)


def _safe_apply_post_goal_replan(
    root: Path,
    completed_goal_id: str,
    lock: threading.RLock,
) -> list[InvalidationEvent]:
    with lock:
        return apply_post_goal_replan(root, completed_goal_id)


def _merge_goal_with_controls(
    root: Path,
    wt_root: Path,
    config: Config,
    goal: Goal,
    run_id: str,
    attempt: int,
    base_commit: str,
    policies: AutoPolicies,
    root_lock: threading.RLock,
    runner_result: RunnerResult | None = None,
) -> bool:
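    """Merge a goal's changes only if the risk score and the pre-merge test train allow it."""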
    goals_path = root / ".ai" / "goals.yaml"
    changed_files = get_changed_files(wt_root, base_commit)
    merge_decision = _success_policy_decision(policies, "merge", changed_files)

    if merge_decision.risk_score >= policies.risk.block_threshold:
        _record_evidence(
            root,
            run_id,
            goal.id,
            "merge",
            attempt,
            Outcome(
                success=False,
                should_retry=False,
                classification="risk-blocked",
                reason=(
                    f"risk_score {merge_decision.risk_score} >= "
                    f"block_threshold {policies.risk.block_threshold}"
                ),
            ),
            wt_root,
            base_commit,
            "",
            policy_decision=PolicyDecision(
                policy_version=policies.version,
                failure_classification="risk-threshold",
                action_taken=BLOCK_ACTION,
                risk_score=merge_decision.risk_score,
            ),
            runner_result=runner_result,
        )
        _safe_update_goal_status(goals_path, goal.id, "blocked", root_lock)
        _safe_update_goal_fields(goals_path, goal.id, {"state": "review_ready"}, root_lock)
        return False

    pre_merge_cmd = (config.merge_train.pre_merge_command or config.test_command).strip()
    pre_merge_ok, pre_merge_output = _run_tests(wt_root, pre_merge_cmd, config)
    if not pre_merge_ok:
        _record_evidence(
            root,
            run_id,
            goal.id,
            "merge-train",
            attempt,
            Outcome(
                success=False,
                should_retry=False,
                classification="merge-train-fail",
                reason=f"pre-merge command failed: {pre_merge_cmd}",
            ),
            wt_root,
            base_commit,
            pre_merge_output,
            policy_decision=PolicyDecision(
                policy_version=policies.version,
                failure_classification=classify_failure("failed", "pre-merge command failed", pre_merge_output),
                action_taken=BLOCK_ACTION,
                risk_score=merge_decision.risk_score,
            ),
            runner_result=runner_result,
        )
        _safe_update_goal_status(goals_path, goal.id, "blocked", root_lock)
        _safe_update_goal_fields(goals_path, goal.id, {"state": "review_ready"}, root_lock)
        return False

    if is_dirty(wt_root):
... (truncated, total 2645 lines)

    </source>
    <source path="tests/test_auto.py" lines="501">
"""Tests for automation loop."""

from __future__ import annotations

from pathlib import Path
import subprocess
import textwrap

import pytest
import yaml

from devf.core.attempt import AttemptLog
from devf.core.auto import (
    _changes_allowed,
    build_prompt,
    build_phase_prompt,
    evaluate,
    evaluate_phase,
    run_auto,
)
from devf.core.config import Config
from devf.core.errors import DevfError
from devf.core.goals import Goal, find_goal, load_goals
from devf.core.immune_policy import write_repair_grant
from devf.core.runner import GoalRunner, RunnerResult
from devf.core.runners.local import LocalRunner


def _make_config(**overrides: object) -> Config:
    defaults = {
        "test_command": "echo ok",
        "ai_tool": "echo {prompt}",
        "timeout_minutes": 30,
        "max_retries": 3,
        "max_context_bytes": 120_000,
        "ai_tools": {},
    }
    defaults.update(overrides)
    return Config(**defaults)  # type: ignore[arg-type]


def _make_goal(**overrides: object) -> Goal:
    defaults = {
        "id": "G1",
        "title": "Test Goal",
        "status": "active",
        "children": [],
        "expect_failure": False,
        "allowed_changes": [],
        "prompt_mode": None,
        "mode": None,
        "tool": None,
        "notes": None,
        "acceptance": [],
        "test_files": [],
    }
    defaults.update(overrides)
    return Goal(**defaults)  # type: ignore[arg-type]


def test_changes_allowed() -> None:
    assert _changes_allowed(["src/auth.py"], ["src/*.py"])
    assert not _changes_allowed(["src/auth.py", "docs/readme.md"], ["src/*.py"])
    assert _changes_allowed([], ["src/*.py"])


def test_changes_allowed_ai_dir_always_ok() -> None:
    """Changes to .ai/ should always be allowed (devf metadata)."""
    assert _changes_allowed(
        ["src/auth.py", ".ai/handoffs/2026-02-10_120000.md"],
        ["src/*.py"],
    )
    assert _changes_allowed([".ai/sessions/log.md"], ["src/*.py"])


def test_changes_allowed_with_always_allow_patterns() -> None:
    assert _changes_allowed(
        ["src/auth.py", "docs/ARCHITECTURE.md"],
        ["src/*.py"],
        always_allow=["docs/ARCHITECTURE.md"],
    )


def test_build_prompt(tmp_path: Path) -> None:
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("# Rules\n- Run tests\n", encoding="utf-8")

    config = _make_config(test_command="pytest")
    goal = _make_goal()
    prompt = build_prompt(tmp_path, config, goal)
    assert "pytest" in prompt
    assert "checklist" in prompt.lower()


def test_build_prompt_handoff_template(tmp_path: Path) -> None:
    """Prompt should include a task tag with goal_id pre-filled in XML."""
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("", encoding="utf-8")

    config = _make_config()
    goal = _make_goal(id="M1.2")
    prompt = build_prompt(tmp_path, config, goal)
    assert '<task id="M1.2">' in prompt
    assert "Work completion checklist" in prompt
    assert ".ai/handoffs/" in prompt


def test_build_prompt_expect_failure(tmp_path: Path) -> None:
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("", encoding="utf-8")

    config = _make_config()
    goal = _make_goal(expect_failure=True)
    prompt = build_prompt(tmp_path, config, goal)
    assert "RED" in prompt


def test_build_prompt_allowed_changes(tmp_path: Path) -> None:
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("", encoding="utf-8")

    # Create the allowed file so build_symbol_map picks it up
    (tmp_path / "src").mkdir()
    (tmp_path / "src" / "auth.py").write_text("def login(): pass\n", encoding="utf-8")

    config = _make_config()
    goal = _make_goal(allowed_changes=["src/auth.py"])
    prompt = build_prompt(tmp_path, config, goal)
    assert "src/auth.py" in prompt
    assert "class" in prompt or "def" in prompt  # Map should contain the symbol


def test_build_prompt_includes_file_contents(tmp_path: Path) -> None:
    """Prompt should include actual source code of target files."""
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("", encoding="utf-8")

    (tmp_path / "src").mkdir()
    (tmp_path / "src" / "queue.py").write_text(
        "class TaskQueue:\n    def push(self, item): pass\n", encoding="utf-8",
    )
    (tmp_path / "tests").mkdir()
    (tmp_path / "tests" / "test_queue.py").write_text(
        "def test_push(): assert True\n", encoding="utf-8",
    )

    config = _make_config()
    goal = _make_goal(
        allowed_changes=["src/queue.py"],
        test_files=["tests/test_queue.py"],
    )
    prompt = build_prompt(tmp_path, config, goal)
    assert "<target_files>" in prompt
    assert "class TaskQueue:" in prompt
    assert "def test_push():" in prompt


def test_build_prompt_adversarial(tmp_path: Path) -> None:
    ai = tmp_path / ".ai"
    ai.mkdir()
    (ai / "handoffs").mkdir()
    (ai / "sessions").mkdir()
    (ai / "config.yaml").write_text(
        'test_command: "pytest"\nai_tool: "echo {prompt}"\n', encoding="utf-8",
    )
    (ai / "goals.yaml").write_text("goals: []\n", encoding="utf-8")
    (ai / "rules.md").write_text("", encoding="utf-8")

    config = _make_config()
    goal = _make_goal(prompt_mode="adversarial")
    prompt = build_prompt(tmp_path, config, goal)
    assert "adversarial" in prompt.lower()


def test_evaluate_complete(tmp_project: Path) -> None:
    config = _make_config()
    goal = _make_goal()

    (tmp_project / "new_file.py").write_text("x = 1\n", encoding="utf-8")

    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, test_output = evaluate(tmp_project, config, goal, base_commit)
    assert outcome.success
    assert outcome.classification == "complete"
    assert isinstance(test_output, str)


def test_evaluate_no_changes(tmp_project: Path) -> None:
    config = _make_config()
    goal = _make_goal()
    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert not outcome.success
    assert outcome.classification == "no-progress"


def test_evaluate_expect_failure(tmp_project: Path) -> None:
    config = _make_config(test_command="false")  # always fail
    goal = _make_goal(expect_failure=True)

    (tmp_project / "new_file.py").write_text("x = 1\n", encoding="utf-8")
    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert outcome.success
    assert "expected failure" in outcome.classification


def test_evaluate_changes_outside_allowed(tmp_project: Path) -> None:
    config = _make_config()
    goal = _make_goal(allowed_changes=["src/*.py"])

    (tmp_project / "outside.txt").write_text("x\n", encoding="utf-8")
    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert not outcome.success
    assert outcome.should_retry
    assert "allowed scope" in (outcome.reason or "")


def test_evaluate_changes_outside_allowed_with_always_allow(tmp_project: Path) -> None:
    config = _make_config(always_allow_changes=["docs/ARCHITECTURE.md"])
    goal = _make_goal(allowed_changes=["src/*.py"])

    src_file = tmp_project / "src" / "auth.py"
    src_file.parent.mkdir(parents=True, exist_ok=True)
    src_file.write_text("x = 1\n", encoding="utf-8")
    (tmp_project / "docs" / "ARCHITECTURE.md").parent.mkdir(parents=True, exist_ok=True)
    (tmp_project / "docs" / "ARCHITECTURE.md").write_text("# generated\n", encoding="utf-8")

    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert outcome.success
    assert outcome.classification == "complete"


def test_evaluate_tests_failed(tmp_project: Path) -> None:
    config = _make_config(test_command="false")
    goal = _make_goal()

    (tmp_project / "new_file.py").write_text("x = 1\n", encoding="utf-8")
    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert not outcome.success
    assert outcome.should_retry
    assert outcome.reason == "tests failed"


def test_evaluate_complexity_warning(tmp_project: Path, capsys: pytest.CaptureFixture[str]) -> None:
    """Complexity warnings should go to stderr but not fail the evaluation."""
    config = _make_config()
    goal = _make_goal()

    # Create a file that exceeds line limit
    lines = "\n".join(f"x{i} = {i}" for i in range(500))
    (tmp_project / "big.py").write_text(lines, encoding="utf-8")

    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, _test_output = evaluate(tmp_project, config, goal, base_commit)
    assert outcome.success  # complexity is a warning, not failure
    captured = capsys.readouterr()
    assert "[complexity]" in captured.err


def test_build_prompt_acceptance_criteria(tmp_project: Path) -> None:
    """Acceptance criteria should appear in the XML constraints section."""
    config = _make_config()
    goal = Goal(
        id="V1", title="Login", status="active",
        notes="Use JWT tokens",
        acceptance=["pytest tests/test_auth.py passes", "POST /login returns 200"],
    )
    prompt = build_prompt(tmp_project, config, goal)

    assert "<criteria>pytest tests/test_auth.py passes</criteria>" in prompt
    assert "<criteria>POST /login returns 200</criteria>" in prompt
    assert "<notes>Use JWT tokens</notes>" in prompt


def test_build_prompt_no_acceptance(tmp_project: Path) -> None:
    """Without acceptance criteria, prompt should not have the section."""
    config = _make_config()
    goal = Goal(id="V1", title="Login", status="active")
    prompt = build_prompt(tmp_project, config, goal)

    assert "Acceptance criteria" not in prompt
    assert "Design notes" not in prompt


def test_build_prompt_retry_includes_diff(tmp_project: Path) -> None:
    """Retry prompt should include the actual diff from previous attempt."""
    config = _make_config()
    goal = _make_goal()
    attempts = [
        AttemptLog(
            attempt=1,
            classification="failed",
            reason="tests failed",
            diff_stat="src/foo.py | 3 +++",
            test_output="FAILED test_foo - AssertionError\n1 failed, 2 passed",
            diff="--- a/src/foo.py\n+++ b/src/foo.py\n@@ -1 +1 @@\n-old_code\n+new_code",
        ),
    ]
    prompt = build_prompt(tmp_project, config, goal, attempts)
    assert "DO NOT repeat the same approach" in prompt
    assert "-old_code" in prompt
    assert "+new_code" in prompt
    assert "FAILED test_foo" in prompt


def test_build_prompt_retry_without_diff(tmp_project: Path) -> None:
    """Retry with no diff should fall back to diff_stat."""
    config = _make_config()
    goal = _make_goal()
    attempts = [
        AttemptLog(
            attempt=1,
            classification="no-progress",
            reason="no file changes",
            diff_stat="",
            test_output="",
        ),
    ]
    prompt = build_prompt(tmp_project, config, goal, attempts)
    assert "Attempt 1" in prompt
    assert "no-progress" in prompt


def _make_dirty_project_with_goal(tmp_project: Path) -> None:
    """Add an active goal and make the working tree dirty."""
    (tmp_project / ".ai" / "goals.yaml").write_text(
        "goals:\n  - id: G1\n    title: Test\n    status: active\n",
        encoding="utf-8",
    )
    subprocess.run(
        ["git", "add", "-A"],
        cwd=str(tmp_project), capture_output=True, check=True,
    )
    subprocess.run(
        ["git", "commit", "-m", "add goal"],
        cwd=str(tmp_project), capture_output=True, check=True,
    )
    # Make tree dirty
    (tmp_project / "dirty.txt").write_text("uncommitted\n", encoding="utf-8")


def test_dry_run_works_on_dirty_tree(tmp_project: Path, capsys: pytest.CaptureFixture[str]) -> None:
    """--dry-run should print prompt without error even on a dirty tree."""
    _make_dirty_project_with_goal(tmp_project)

    ret = run_auto(tmp_project, goal_id=None, recursive=False, dry_run=True, explain=False, tool_name=None)
    assert ret == 0
    captured = capsys.readouterr()
    assert "G1" in captured.out  # prompt should contain the goal


def test_non_dry_run_rejects_dirty_tree(tmp_project: Path) -> None:
    """Normal run should still reject a dirty working tree."""
    _make_dirty_project_with_goal(tmp_project)

    with pytest.raises(DevfError, match="dirty"):
        run_auto(tmp_project, goal_id=None, recursive=False, dry_run=False, explain=False, tool_name=None)


def test_build_phase_prompt_implement_fallback(tmp_project: Path) -> None:
    """implement phase without template falls back to existing build_prompt."""
    config = _make_config()
    goal = _make_goal(phase="implement")
    prompt = build_phase_prompt(tmp_project, config, goal, "implement", [])
    # Should contain the standard checklist from build_prompt
    assert "checklist" in prompt.lower()


def test_build_phase_prompt_with_template(tmp_project: Path) -> None:
    """Phase prompt uses Jinja2 template when available."""
    templates_dir = tmp_project / ".ai" / "templates"
    templates_dir.mkdir(parents=True)
    (templates_dir / "implement.md.j2").write_text(
        "TEMPLATE: {{ goal.id }} - {{ goal.title }}", encoding="utf-8"
    )

    config = _make_config()
    goal = _make_goal(phase="implement")
    prompt = build_phase_prompt(tmp_project, config, goal, "implement", [])
    assert "TEMPLATE: G1 - Test Goal" in prompt


def test_evaluate_phase_gate(tmp_project: Path) -> None:
    """gate phase runs mechanical checks instead of AI evaluation."""
    config = _make_config()
    goal = _make_goal(phase="gate")

    (tmp_project / "new_file.py").write_text("x = 1\n", encoding="utf-8")
    base_commit = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=str(tmp_project), capture_output=True, text=True, check=True,
    ).stdout.strip()

    outcome, output = evaluate_phase(tmp_project, config, goal, "gate", base_commit)
    assert outcome.success
    assert "gate" in outcome.classification.lower()


# ---------------------------------------------------------------------------
# Phase-aware run_auto + circuit breaker tests
# ---------------------------------------------------------------------------


class MockRunner(GoalRunner):
    """Runner that writes a file and succeeds."""

    def __init__(self, filename: str = "output.py", content: str = "x = 1\n"):
        self.filename = filename
        self.content = content
        self.call_count = 0

    def run(self, root, config, goal, prompt, tool_name=None):
        self.call_count += 1
        (root / self.filename).write_text(self.content, encoding="utf-8")
        return RunnerResult(success=True, output="ok")


class NoopRunner(GoalRunner):
    """Runner that does nothing (no file changes)."""

    def __init__(self):
        self.call_count = 0

    def run(self, root, config, goal, prompt, tool_name=None):
        self.call_count += 1
        return RunnerResult(success=True, output="ok")


def test_run_auto_phase_implement_advances(tmp_project: Path) -> None:
    """run_auto with implement phase should advance phase on success."""
    goals_yaml = tmp_project / ".ai" / "goals.yaml"
    goals_yaml.write_text(
        textwrap.dedent("""\
... (truncated, total 1012 lines)

    </source>
  </target_files>
  <reference>
    <file>src/devf/core/auto.py</file>
    <file>tests/test_auto.py</file>
    <code_map>
      # Codebase Map
      src/devf/cli.py:
        def main() -> None
        def _emit_json(payload: object) -> None
        def _resolve_goal_for_focus(root: Path, preferred_goal_id: str | None)
        def _render_tool_launch_command(root: Path, tool_name: str, prompt_rel_path: Path) -> str
        def _render_focus_prompt(root: Path, goal, tool_name: str, context_text: str) -> str
        def _render_focus_brief(goal, tool_name: str, prompt_rel: Path, launch_command: str) -> str
        def init_command(json_output: bool) -> None
        def context_command(format_name: str, json_output: bool) -> None
        def focus_command(tool_name: str, goal_id: str | None, context_format: str, json_output: bool) -> None
        def map_command(json_output: bool) -> None
        def handoff_command(goal_id: str | None, to_stdout: bool, json_output: bool) -> None
        def merge_command(goal_id: str, json_output: bool) -> None
        def status_command(json_output: bool) -> None
        def metrics_command(window_days: int, json_output: bool) -> None
        def immune_group() -> None
        def immune_grant_command(allowed_changes: tuple[Any], approved_by: str, issued_by: str, ttl_minutes: int, reason: str, json_output: bool) -> None
        def docs_group() -> None
        def docs_generate_command(window_days: int, warn_stale: bool, render_mermaid: bool, open_mermaid_index: bool, json_output: bool) -> None
        def docs_mermaid_command(markdown_glob: str, mmdc_bin: str, open_index: bool, json_output: bool) -> None
        def docs_sync_vault_command(check_links: bool, strict: bool, json_output: bool) -> None
        def _open_path(path: Path) -> None
        def triage_command(run_id: str, goal_id: str | None, json_output: bool) -> None
        def feedback_group() -> None
        def feedback_note_command(run_id: str | None, goal_id: str | None, phase: str | None, category: str, impact: str, expected: str, actual: str, workaround: str, confidence: float, tool_name: str | None, json_output: bool) -> None
        def feedback_analyze_command(run_id: str, goal_id: str | None, json_output: bool) -> None
        def feedback_backlog_command(window_days: int, promote: bool, json_output: bool) -> None
        def feedback_publish_command(limit: int, dry_run: bool, json_output: bool) -> None
        def propose_group() -> None
        def propose_note_command(run_id: str | None, goal_id: str | None, source: str, category: str, impact: str, risk: str, confidence: float, effort_hint: str, title: str, why_now: str, evidence_refs: tuple[Any], affected_goals: tuple[Any], json_output: bool) -> None
        def propose_list_command(window_days: int, category: str | None, status: str | None, limit: int, json_output: bool) -> None
        def propose_promote_command(window_days: int, max_active: int, json_output: bool) -> None
        def decision_group() -> None
        def decision_new_command(goal_id: str, question: str, alternatives: str, decision_id: str | None, owner: str, file_path: str | None, json_output: bool) -> None
        def decision_evaluate_command(decision_file: str, accept: bool, run_id: str | None, actor: str, log_evidence: bool, json_output: bool) -> None
        def decision_spike_command(decision_file: str, parallel: int, command_template: str, backend: str, actor: str, log_evidence: bool, accept: bool, json_output: bool) -> None
        def auto_command(goal_id: str | None, recursive: bool, dry_run: bool, explain: bool, tool_name: str | None, parallelism: int) -> None
        def plan_command(instruction: str | None, autonomous: bool, tool_name: str | None) -> None
        def orchestrate_command(run_id: str | None, window_days: int, max_goals: int, publish: bool, publish_dry_run: bool, json_output: bool) -> None
      src/devf/core/attempt.py:
        class AttemptLog:
        def save_attempt(root: Path, goal_id: str, attempt: int, classification: str, reason: str | None, diff_stat: str, test_output: str, diff: str) -> None
        def load_attempts(root: Path, goal_id: str) -> list[AttemptLog]
        def clear_attempts(root: Path, goal_id: str) -> None
        def _get_attempt_dir(root: Path, goal_id: str) -> Path
      src/devf/core/auto.py:
        class Outcome:
        class RedGateResult:
        class PolicyDecision:
        def run_auto(root: Path, goal_id: str | None, recursive: bool, dry_run: bool, explain: bool, tool_name: str | None, runner: GoalRunner | None, parallelism: int) -> int
        def _execute_goal_once(wt_root: Path, root: Path, config: Config, goal: Goal, runner: GoalRunner, tool_name: str | None, explain: bool, base_commit: str, run_id: str, policies: AutoPolicies, root_lock: threading.RLock) -> tuple[Any]
        def _chunked(goals: list[Goal], size: int) -> list[list[Goal]]
        def _safe_update_goal_status(path: Path, goal_id: str, status: str, lock: threading.RLock) -> None
        def _safe_update_goal_fields(path: Path, goal_id: str, fields: dict[Any], lock: threading.RLock) -> None
        def _safe_worktree_merge(root: Path, goal_id: str, lock: threading.RLock) -> None
        def _safe_worktree_remove(root: Path, goal_id: str, lock: threading.RLock) -> None
        def _safe_apply_post_goal_replan(root: Path, completed_goal_id: str, lock: threading.RLock) -> list[InvalidationEvent]
        def _merge_goal_with_controls(root: Path, wt_root: Path, config: Config, goal: Goal, run_id: str, attempt: int, base_commit: str, policies: AutoPolicies, root_lock: threading.RLock, runner_result: RunnerResult | None) -> bool
        def _run_bdd_goal(wt_root: Path, root: Path, config: Config, goal: Goal, max_retries: int, runner: GoalRunner, tool_name: str | None, explain: bool, base_commit: str, run_id: str, policies: AutoPolicies, root_lock: threading.RLock) -> bool
        def _run_legacy_goal(wt_root: Path, root: Path, config: Config, goal: Goal, max_retries: int, runner: GoalRunner, tool_name: str | None, explain: bool, base_commit: str, run_id: str, policies: AutoPolicies, root_lock: threading.RLock) -> bool
        def _run_phased_goal(wt_root: Path, root: Path, config: Config, goal: Goal, phase: str, max_retries: int, runner: GoalRunner, tool_name: str | None, explain: bool, base_commit: str, run_id: str, policies: AutoPolicies, root_lock: threading.RLock) -> bool
        def build_prompt(root: Path, config: Config, goal: Goal, attempts: list[AttemptLog] | None) -> str
        def build_phase_prompt(root: Path, config: Config, goal: Goal, phase: str, attempts: list[AttemptLog] | None) -> str
        def _read_file_safe(path: Path, max_lines: int) -> str
        def _read_latest_handoff_content(root: Path) -> str
        def _read_unresolved_vulns(root: Path) -> str
        def evaluate_phase(root: Path, config: Config, goal: Goal, phase: str, base_commit: str, policy_root: Path | None) -> tuple[Any]
        def evaluate(root: Path, config: Config, goal: Goal, base_commit: str, policy_root: Path | None) -> tuple[Any]
        def _run_tests(root: Path, command: str, config: Config | None) -> tuple[Any]
        def _record_evidence(root: Path, run_id: str, goal_id: str, phase: str | None, attempt: int, outcome: Outcome, wt_root: Path, base_commit: str, test_output: str, policy_decision: PolicyDecision | None, runner_result: RunnerResult | None) -> None
        def _record_goal_invalidation_evidence(root: Path, run_id: str, event: InvalidationEvent) -> None
        def _parse_gate_checks_from_summary(summary: str) -> list[dict[Any]]
        def _update_goal_state_for_evidence(root: Path, goal_id: str, phase: str | None, outcome: Outcome) -> tuple[Any]
        def _apply_policy_decision(root: Path, goal_id: str, phase: str | None, attempt: int, outcome: Outcome, test_output: str, wt_root: Path, base_commit: str, max_retries: int, policies: AutoPolicies) -> tuple[Any]
        def _success_policy_decision(policies: AutoPolicies, phase: str | None, changed_files: list[str]) -> PolicyDecision
        def _verify_bdd_red_stage(root: Path, base_commit: str, contract_file: str | None, contract: AcceptanceContract | None, config: Config | None, languages: list[str] | None) -> RedGateResult
        def _run_targeted_pytest(root: Path, test_files: list[str], config: Config | None) -> tuple[Any]
        def _run_targeted_tests(root: Path, test_files: list[str], config: Config | None, languages: list[str] | None) -> tuple[Any]
        def _validate_bdd_impl_scope(changed_files: list[str], contract_file: str | None) -> tuple[Any]
        def _validate_role_scope(goal: Goal, changed_files: list[str]) -> tuple[Any]
        def _validate_planned_changes(root: Path, goal: Goal, changes: dict[Any] | list[object], stage: str | None, contract_file: str | None, attempt: int | None) -> tuple[Any]
        def _is_real_change_file(path: str) -> bool
        def _is_python_test_file(path: str) -> bool
        def _has_assertions(root: Path, test_files: list[str], config: Config | None, languages: list[str] | None) -> bool
        def _has_nontrivial_assertions(root: Path, test_files: list[str], config: Config | None, languages: list[str] | None) -> bool
        def _validate_required_assertions(root: Path, test_files: list[str], required_assertions: list[str]) -> tuple[Any]
        def _load_goal_contract(root: Path, goal: Goal) -> tuple[Any]
        def _validate_goal_decision_prerequisites(goal_root: Path, goal: Goal) -> tuple[Any]
        def _validate_contract_change_rules(changed_files: list[str], contract: AcceptanceContract, contract_file: str | None) -> tuple[Any]
        def _run_contract_pass_tests(root: Path, contract: AcceptanceContract | None, config: Config | None, languages: list[str] | None) -> tuple[Any]
        def _run_contract_fail_tests(root: Path, contract: AcceptanceContract, config: Config | None, languages: list[str] | None) -> tuple[Any]
        def _triage_test_failure(test_output: str) -> tuple[Any]
        def _looks_like_flaky_failure(text: str) -> bool
        def _get_diff_stat(root: Path, base_commit: str) -> str
        def _get_diff(root: Path, base_commit: str) -> str
        def _changes_allowed(files: Iterable[str], patterns: Iterable[str], always_allow: Iterable[str]) -> bool
        def _lock_path(root: Path) -> Path
        def _acquire_lock(root: Path) -> None
        def _release_lock(root: Path) -> None
        def _pid_alive(pid: int) -> bool
        def _log_warning(message: str) -> None
        def _log_info(message: str) -> None
      src/devf/core/config.py:
        class GateConfig:
        class CircuitBreakerConfig:
        class MergeTrainConfig:
        class ModelConfig:
        class RolesConfig:
        class LanguageProfileConfig:
        class Config:
        def _validate_positive_int(value: Any, field_name: str) -> int
        def _validate_tool_command(command: str, field_name: str) -> None
        def _parse_model_config(data: Any, field_name: str) -> ModelConfig
        def _parse_str_list(value: Any, field_name: str) -> list[str]
        def _default_language_profiles(test_command: str, gate: GateConfig) -> dict[Any]
        def _parse_language_profiles(raw: Any, test_command: str, gate: GateConfig) -> dict[Any]
        def _parse_gate_config(raw: Any) -> GateConfig
        def load_config(path: Path) -> tuple[Any]
      src/devf/core/context.py:
        class ContextData:
        def build_context(root: Path, format_name: str, max_context_bytes: int | None, goal_override: Any | None) -> str
        def build_context_data(root: Path, config: Config, goal_override: Any | None) -> ContextData
        def render_context(data: ContextData, format_name: str) -> str
        def render_pack(data: ContextData) -> str
        def render_plain(data: ContextData) -> str
        def render_markdown(data: ContextData) -> str
        def trim_context_data(data: ContextData) -> ContextData
        def _trim_lines_to_bytes(lines: list[str], max_bytes: int) -> list[str]
        def _load_rules(path: Path) -> list[str]
        def _load_config_or_default(root: Path) -> Config
        def find_root(start: Path) -> Path
        def _has_glob(value: str) -> bool
        def _normalize_relpath(root: Path, path: Path) -> str | None
        def _expand_paths(root: Path, items: Iterable[str]) -> tuple[Any]
        def _get_priority(path: str) -> int
        def _select_context_files(files: set[str], limit: int) -> list[str]
        def _extract_forward_deps(path: Path, module_to_file: dict[Any]) -> set[str]
        def _xml_escape(text: str) -> str
        def _read_file_contents(root: Path, files: set[str]) -> dict[Any]
      src/devf/core/contract.py:
        class AcceptanceContract:
        def load_acceptance_contract(root: Path, contract_file: str | None) -> AcceptanceContract | None
        def validate_required_patterns(items: list[str], patterns: list[str], label: str) -> tuple[Any]
        def validate_forbidden_patterns(items: list[str], patterns: list[str], label: str) -> tuple[Any]
        def contract_prompt_lines(contract: AcceptanceContract) -> list[str]
        def _parse_str_list(data: dict, key: str, contract_file: str, root: Path, normalize: bool) -> list[str]
      src/devf/core/decision.py:
        class AlternativeScore:
        class DecisionEvaluation:
        def normalize_decision_id(value: str) -> str
        def default_decision_id(goal_id: str) -> str
        def create_decision_ticket(goal_id: str, question: str, alternatives: list[str], decision_id: str, owner: str) -> dict[Any]
        def load_decision_ticket(path: Path) -> dict[Any]
        def validate_decision_ticket(ticket: dict[Any]) -> None
        def evaluate_decision_ticket(ticket: dict[Any]) -> DecisionEvaluation
        def apply_decision_result(ticket: dict[Any], evaluation: DecisionEvaluation, actor: str) -> dict[Any]
        def save_decision_ticket(path: Path, ticket: dict[Any]) -> None
        def append_decision_evidence(root: Path, decision_file: Path, ticket: dict[Any], evaluation: DecisionEvaluation, run_id: str | None, actor: str) -> Path
        def _relpath_or_abs(root: Path, path: Path) -> str
      src/devf/core/errors.py:
        class DevfError(Exception):
      src/devf/core/evidence.py:
        def new_run_id() -> str
        def hash_text(text: str) -> str
        def write_evidence_row(root: Path, run_id: str, row: dict[Any]) -> None
      src/devf/core/feedback_infer.py:
        def infer_and_store_feedback_notes(root: Path, run_id: str, policy: FeedbackPolicy, goal_id: str | None) -> list[dict[Any]]
        def infer_feedback_notes_from_rows(rows: list[dict[Any]], run_id: str) -> list[dict[Any]]
        def _infer_repeated_failures(rows: list[dict[Any]], run_id: str) -> list[dict[Any]]
        def _infer_no_progress_waste(rows: list[dict[Any]], run_id: str) -> list[dict[Any]]
        def _infer_retry_then_success(rows: list[dict[Any]], run_id: str) -> list[dict[Any]]
        def _infer_error_clarity(rows: list[dict[Any]], run_id: str) -> list[dict[Any]]
        def _load_run_evidence_rows(root: Path, run_id: str) -> list[dict[Any]]
        def _row_id(row: dict[Any]) -> str
      src/devf/core/feedback_policy.py:
        class FeedbackPromotionPolicy:
        class FeedbackDedupPolicy:
        class FeedbackPublishPolicy:
        class FeedbackPolicy:
        def load_feedback_policy(root: Path) -> FeedbackPolicy
        def _parse_positive_int(value: Any, default: int) -> int
        def _parse_ratio(value: Any, default: float) -> float
        def _parse_non_empty_str(value: Any, default: str) -> str
        def _parse_impact(value: Any, default: str) -> str
        def _parse_str_list(value: Any, default: list[str]) -> list[str]
      src/devf/core/gate.py:
        class CheckResult:
        class GateResult:
        def run_gate(root: Path, config: Config, goal: Goal, base_commit: str) -> GateResult
        def _run_gate_legacy(root: Path, config: Config, goal: Goal, base_commit: str) -> GateResult
        def _run_command_check(name: str, command: str, root: Path) -> CheckResult
        def _run_mutation_checks(checks: dict[Any], config: Config, goal: Goal, languages: list[str], root: Path) -> None
        def _run_mutation_command(name: str, command: str, min_score: int, root: Path) -> CheckResult
        def _extract_mutation_score(output: str) -> float | None
        def _check_diff_size(root: Path, base_commit: str, max_lines: int) -> CheckResult
        def _check_scope(root: Path, goal: Goal, base_commit: str, always_allow: list[str] | None) -> CheckResult
        def _format_summary(checks: dict[Any]) -> str
        def _run_security_checks(checks: dict[Any], security_commands: list[str], root: Path) -> None
        def _guess_security_check_name(command: str, idx: int) -> str
        def _check_required_checks(checks: dict[Any], required_checks: list[str], fail_on_skipped_required: bool) -> CheckResult
      src/devf/core/goals.py:
        class Goal:
        class GoalNode:
        def _parse_goal(data: dict[Any], root: Path) -> Goal
        def load_goals(path: Path) -> list[Goal]
        def _ensure_unique_ids(goals: Iterable[Goal]) -> None
        def iter_goals(goals: Iterable[Goal], depth: int, parent: Goal | None) -> Iterator[GoalNode]
        def find_goal(goals: Iterable[Goal], goal_id: str) -> Goal | None
        def find_goal_node(goals: Iterable[Goal], goal_id: str) -> GoalNode | None
        def select_active_goal(goals: list[Goal], preferred_id: str | None) -> Goal | None
        def collect_goals(goals: list[Goal], root_id: str | None, recursive: bool) -> list[Goal]
        def update_goal_status(path: Path, goal_id: str, status: str) -> None
        def _update_goal_status(raw_goals: list[dict[Any]], goal_id: str, status: str) -> bool
        def update_goal_fields(path: Path, goal_id: str, fields: dict[Any]) -> None
        def _update_goal_fields(raw_goals: list[dict[Any]], goal_id: str, fields: dict[Any]) -> bool
      src/devf/core/immune_policy.py:
        class ImmunePolicy:
        class RepairGrant:
        class ImmuneCheckResult:
        def load_immune_policy(root: Path) -> ImmunePolicy
        def evaluate_immune_changes(root: Path, changed_files: list[str]) -> ImmuneCheckResult
        def write_repair_grant(root: Path) -> Path
        def _load_repair_grant(root: Path, grant_file: str) -> RepairGrant | None
        def _append_immune_audit(root: Path, policy: ImmunePolicy, changed_files: list[str], result: ImmuneCheckResult, metadata: dict[Any] | None) -> None
        def _match_paths(paths: list[str], patterns: list[str]) -> list[str]
        def _out_of_scope_files(paths: list[str], patterns: list[str]) -> list[str]
        def _parse_non_empty_str(value: Any, default: str) -> str
        def _parse_str_list(value: Any, default: list[str]) -> list[str]
        def _parse_positive_int(value: Any, default: int) -> int
        def _parse_iso(value: Any) -> datetime | None
      src/devf/core/languages.py:
        def language_from_path(path: str) -> str | None
        def resolve_goal_languages(root: Path, goal: Goal, config: Config, changed_files: list[str] | None) -> list[str]
        def collect_test_files(changed_files: list[str], config: Config, languages: list[str]) -> list[str]
        def assertion_patterns(config: Config, languages: list[str]) -> list[str]
        def trivial_assertions(config: Config, languages: list[str]) -> list[str]
        def build_targeted_test_commands(config: Config, languages: list[str], test_files: list[str]) -> list[tuple[Any]]
        def gate_commands_for_languages(config: Config, languages: list[str]) -> list[tuple[Any]]
        def apply_pytest_reliability_flags(command: str, gate: GateConfig) -> str
        def _split_files_by_language(test_files: list[str]) -> dict[Any]
        def _is_pytest_invocation(tokens: list[str]) -> bool
        def _has_option(tokens: list[str]) -> bool
        def _is_enabled(config: Config, language: str) -> bool
        def _looks_like_python_repo(root: Path) -> bool
        def _looks_like_rust_repo(root: Path) -> bool
        def _guess_check_name(language: str, command: str, idx: int, config: Config) -> str
      src/devf/core/phase.py:
        def next_phase(current: str, phases: list[str] | None) -> str | None
        def regress_phase(current: str) -> str
        def advance_phase(current: str, phases: list[str] | None) -> str | None
        def load_phase_template(root: Path, phase: str) -> Any
        def parse_plan_output(output: str) -> dict[Any] | None
      src/devf/core/policies.py:
        class AutoPolicies:
          - version(self) -> str
        def load_auto_policies(root: Path) -> AutoPolicies
      src/devf/core/replan.py:
        class InvalidationEvent:
        def apply_post_goal_replan(root: Path, completed_goal_id: str) -> list[InvalidationEvent]
        def _index_goals(raw_goals: list[dict[Any]], index: dict[Any]) -> None
        def _as_id_list(value: Any) -> list[str]
        def _as_optional_str(value: Any) -> str | None
      src/devf/core/retry_policy.py:
        class RetryPolicy:
        def load_retry_policy(root: Path) -> RetryPolicy
        def decide_retry_action(policy: RetryPolicy, failure_classification: str, prior_failure_classifications: list[str], attempt: int, fallback_max_retries: int) -> Action
        def _parse_non_negative_int(value: Any, default: int) -> int
      src/devf/core/risk_policy.py:
        class RiskPolicy:
        def load_risk_policy(root: Path) -> RiskPolicy
        def compute_risk_score(policy: RiskPolicy, phase: str | None, changed_files: list[str], failure_classification: str | None) -> int
        def _touches_sensitive_paths(files: list[str], patterns: list[str]) -> bool
        def _parse_non_negative_int(value: Any, default: int) -> int
        def _parse_positive_int(value: Any, default: int) -> int
      src/devf/core/runner.py:
        class RunnerResult:
        class GoalRunner(ABC):
          - run(self, root: Path, config: Config, goal: Goal, prompt: str, tool_name: str | None) -> RunnerResult
      src/devf/core/runners/llm.py:
        class LLMRunner(GoalRunner):
          - run(self, root: Path, config: Config, goal: Goal, prompt: str, tool_name: str | None) -> RunnerResult
          - _build_messages_with_cache(self, prompt: str, model: str) -> list[dict]
          - _resolve_model_config(self, config: Config, goal: Goal, tool_name: str | None) -> ModelConfig | None
        def _get_usage_value(usage: object, field_name: str) -> int | None
      src/devf/core/runners/local.py:
        class LocalRunner(GoalRunner):
          - run(self, root: Path, config: Config, goal: Goal, prompt: str, tool_name: str | None) -> RunnerResult
          - _resolve_tool_command(self, config: Config, goal: Goal, tool_name: str | None) -> str
      src/devf/core/scheduler.py:
        def build_execution_batches(all_goals: list[Goal], selected: list[Goal]) -> list[list[Goal]]
      src/devf/core/session.py:
        class SessionLog:
        def generate_session_log(root: Path, goal: Goal, base_commit: str, test_output: str, status: str) -> str
        def write_session_log(session_dir: Path, content: str, suffix: str | None) -> Path
        def find_latest_session(session_dir: Path) -> SessionLog | None
        def parse_session(path: Path) -> SessionLog
        def _parse_frontmatter(text: str) -> tuple[Any]
        def _extract_section(body: str, name: str) -> str
        def _extract_test_summary(test_output: str) -> str
      src/devf/core/state_policy.py:
        def decide_goal_state(current_state: str | None, phase: str | None, success: bool, classification: str) -> str | None
        def _target_state_for_success(phase: str | None, classification: str) -> str | None
      src/devf/core/triage.py:
        def classify_failure(classification: str | None, reason: str | None, test_output: str | None) -> str
      src/devf/utils/codetools.py:
        def _iter_py_files(root: Path) -> list[Path]
        def _is_dataclass(node: ast.ClassDef) -> bool
        def _count_dataclass_fields(node: ast.ClassDef) -> int
        def code_structure_snapshot(root: Path) -> str
        def build_import_map(root: Path) -> tuple[Any]
        def _record_import(module: str, importing_file: str, known: set[str], result: dict[Any]) -> None
        def impact_analysis(changed_files: list[str], root: Path) -> str
        def find_related_tests(root: Path, target_files: list[str]) -> list[str]
        def complexity_check(files: list[str], root: Path) -> list[str]
        def _count_init_attrs(func: ast.FunctionDef | ast.AsyncFunctionDef) -> int
        def file_to_module(rel_path: str) -> str | None
        def _relpath(path: Path, root: Path) -> str
      src/devf/utils/file_parser.py:
        class FileChange:
        def parse_file_changes(text: str) -> list[FileChange]
        def apply_file_changes(root: Path, changes: list[FileChange]) -> list[str]
      src/devf/utils/git.py:
        class GitResult:
        def run_git(args: Iterable[str], root: Path, check: bool) -> GitResult
        def get_head_commit(root: Path) -> str
        def get_commit_time(root: Path, commit: str) -> datetime
        def is_dirty(root: Path) -> bool
        def get_changed_files(root: Path, base_commit: str) -> list[str]
        def reset_hard(root: Path, commit: str) -> None
        def commit_all(root: Path, message: str) -> str
        def get_diff_stat(root: Path, base_commit: str) -> str
        def get_log_since(root: Path, base_commit: str) -> list[tuple[Any]]
        def get_recent_log(root: Path, n: int) -> list[tuple[Any]]
        def git_change_summary(root: Path, since_commit: str | None) -> str
        def find_session_boundary(root: Path) -> str | None
        def get_full_messages(root: Path, base_commit: str) -> list[tuple[Any]]
        def get_committed_files(root: Path, base_commit: str) -> list[str]
        def worktree_path(root: Path, goal_id: str) -> Path
        def worktree_create(root: Path, goal_id: str) -> Path
        def worktree_remove(root: Path, goal_id: str) -> None
        def worktree_merge(root: Path, goal_id: str) -> str
        def worktree_list(root: Path) -> list[dict[Any]]
    </code_map>
  </reference>
  <rules>
    <rule>[PLAN_NOTE] Keep this active every session. Edit: .ai/plan_note.md</rule>
    <rule>Primary objective of this Codex session: improve devfork itself for LLM operators.</rule>
    <rule>If a task does not improve operator throughput, safety, or cognitive load, deprioritize it.</rule>
    <rule>Step 0: Read this note first.</rule>
    <rule>Step 1: Inspect latest friction signals (.ai/feedback/backlog.yaml, recent no-progress attempts, scope violations).</rule>
    <rule>Step 2: Pick one smallest high-leverage fix that can be validated by tests.</rule>
    <rule>Step 3: Ship the fix with explicit guardrails and low-risk rollout.</rule>
    <rule>P1: Question queue + resume loop (workers ask instead of repeating no-progress retries).</rule>
    <rule>P2: Strict write boundary enforcement with narrow generated-file exceptions only.</rule>
    <rule>P3: Retry strategy by failure class (scope/env/spec/test) instead of blind retries; see the retry sketch after the context pack.</rule>
    <rule>P4: Low-token session handoff format (focus pack + JSON outputs + explicit next action).</rule>
    <rule>P5: Telemetry -> backlog -> publish pipeline with explicit human/manager trigger.</rule>
    <rule>Prefer smaller diff with clear verification over broad refactor.</rule>
    <rule>If ambiguity blocks progress, ask a concrete multiple-choice question.</rule>
    <rule>Keep safety invariants stronger than speed.</rule>
    <rule>[High] Scope exceptions for auto-generated files are too manual.</rule>
    <rule>Symptom: pre-commit updates (docs/ARCHITECTURE.md, core/protocols.py, .codemap.json) trigger out-of-scope failures.</rule>
    <rule>Direction: keep a global allowlist in config (always_allow_changes) with narrow patterns; see the allowlist sketch after the context pack.</rule>
    <rule>[High] Scope failure diagnosis is not explicit enough.</rule>
    <rule>Symptom: "changes outside allowed scope" without immediate violating-file list in operator-facing output.</rule>
    <rule>Direction: print violating file paths directly in failure reason/summary.</rule>
    <rule>[Medium] Clean-tree enforcement is too strict for .ai operational artifacts.</rule>
    <rule>Symptom: init/run artifacts create dirty state and force extra commit churn.</rule>
    <rule>Direction: treat generated .ai operational artifacts as ignored/non-blocking by default where safe.</rule>
    <rule>[Medium] Recovery after failed attempt is too manual.</rule>
    <rule>Symptom: blocked -> re-activate -> commit hygiene -> rerun is multi-step.</rule>
    <rule>Direction: add one-command recovery path (scope fix + retry flow).</rule>
    <rule>[Low] Non-interactive runs still produce clarification questions from worker model.</rule>
    <rule>Symptom: worker asks questions and makes no edits -> no-progress.</rule>
    <rule>Direction: strengthen non-interactive execution contract in context/prompt and add ask-via-queue behavior.</rule>
    <rule>Run tests before committing</rule>
    <rule>Commit only after tests pass</rule>
    <rule>Commit message format: {type}({goal_id}): {description} (example after the context pack)</rule>
    <rule>Allowed types: feat, fix, refactor, test, docs, chore</rule>
  </rules>
</context_pack>
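---

The sketches below are illustrative, not authoritative implementations.

First, a minimal sketch of the dry-run precondition skip this goal asks for, assuming a `dry_run` flag threaded through the loop. `is_dirty` and `DevfError` are real (see the code map and imports); `check_write_preconditions` and `_acquire_lock` are hypothetical stand-ins for whatever auto.py actually does.

```python
from pathlib import Path

from devf.core.errors import DevfError
from devf.utils.git import is_dirty


def _acquire_lock(root: Path) -> None:
    # Hypothetical stand-in for the loop's real lock acquisition.
    ...


def check_write_preconditions(root: Path, dry_run: bool = False) -> None:
    """Skip write-safety checks when no files will be modified."""
    if dry_run:
        # A dry run writes nothing, so a dirty tree or a held lock
        # cannot corrupt state; both checks are safe to skip.
        return
    if is_dirty(root):
        raise DevfError("working tree is dirty; commit or stash first")
    _acquire_lock(root)
```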
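Next, a sketch of the `always_allow_changes` direction from the rules: merge a narrow global allowlist into the goal's scope patterns and report every violating path by name. `always_allow_changes` is a proposed config key, and `out_of_scope` is a hypothetical helper (the code map's `_out_of_scope_files` suggests the real shape).

```python
import fnmatch


def out_of_scope(changed: list[str], allowed: list[str], always_allow: list[str]) -> list[str]:
    """Return changed paths matching neither the goal scope nor the global allowlist."""
    patterns = [*allowed, *always_allow]
    return [
        path
        for path in changed
        if not any(fnmatch.fnmatch(path, pattern) for pattern in patterns)
    ]


violations = out_of_scope(
    changed=["src/devf/core/auto.py", "scripts/deploy.sh"],
    allowed=["src/devf/core/auto.py"],
    always_allow=["docs/ARCHITECTURE.md", ".codemap.json"],
)
if violations:
    # Name the violating files directly, per the scope-diagnosis rule.
    print("changes outside allowed scope: " + ", ".join(violations))
```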
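For P3, a usage sketch built from the signatures in the code map; it assumes a repo root where `load_retry_policy` finds its defaults, and the classification strings mirror the scope/env/spec/test classes named in the rule.

```python
from pathlib import Path

from devf.core.retry_policy import (
    BLOCK_ACTION,
    RETRY_ACTION,
    decide_retry_action,
    load_retry_policy,
)

policy = load_retry_policy(Path("."))
action = decide_retry_action(
    policy,
    failure_classification="scope",           # scope/env/spec/test, per P3
    prior_failure_classifications=["scope"],  # same class already failed once
    attempt=2,
    fallback_max_retries=3,
)
if action == BLOCK_ACTION:
    print("repeated scope failure: block and ask instead of retrying blindly")
elif action == RETRY_ACTION:
    print("retry with the violating paths spelled out in the prompt")
```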
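Finally, applied to this focus pack's goal, the commit message format yields, for example:

```
fix(V1.1): skip dirty-tree and lock checks in dry-run mode
```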