#!/usr/bin/env python3
"""
nano-vm stress benchmark suite v0.1
Validates: δ(S, E) → S' — deterministic, replayable, failure-safe

Array size: 10_000 | Runs per test: 5
"""

import asyncio
import hashlib
import inspect
import json
import random
import statistics
import sys
import time
import traceback
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Literal

# ── rich ──────────────────────────────────────────────────────────────────────
try:
    from rich import box
    from rich import print as rprint
    from rich.columns import Columns
    from rich.console import Console
    from rich.layout import Layout
    from rich.live import Live
    from rich.panel import Panel
    from rich.progress import (
        BarColumn,
        MofNCompleteColumn,
        Progress,
        SpinnerColumn,
        TaskProgressColumn,
        TextColumn,
        TimeElapsedColumn,
    )
    from rich.rule import Rule
    from rich.table import Table
    from rich.text import Text
except ImportError:
    print("Installing rich...")
    import subprocess

    subprocess.check_call([sys.executable, "-m", "pip", "install", "rich", "-q"])
    from rich import box
    from rich.columns import Columns
    from rich.console import Console
    from rich.panel import Panel
    from rich.progress import (
        BarColumn,
        MofNCompleteColumn,
        Progress,
        SpinnerColumn,
        TaskProgressColumn,
        TextColumn,
        TimeElapsedColumn,
    )
    from rich.rule import Rule
    from rich.table import Table
    from rich.text import Text

# ── llm-nano-vm ───────────────────────────────────────────────────────────────
try:
    from nano_vm import ExecutionVM, Program
    from nano_vm.adapters import MockLLMAdapter

    HAS_NANO_VM = True
except ImportError:
    HAS_NANO_VM = False

console = Console(highlight=False)

ARRAY_SIZE = 10_000
RUNS = 5
SEED = 42

# ═══════════════════════════════════════════════════════════════════════════════
# FSM STUB (fallback когда llm-nano-vm не установлен, тестирует FSM-логику)
# ═══════════════════════════════════════════════════════════════════════════════


class FSMState(str, Enum):
    DRAFT = "DRAFT"
    CART_VALIDATED = "CART_VALIDATED"
    PAYMENT_PENDING = "PAYMENT_PENDING"
    PAYMENT_FAILED = "PAYMENT_FAILED"
    PAID = "PAID"
    RESERVED = "RESERVED"
    DELIVERING = "DELIVERING"
    DELIVERY_CONFIRMED = "DELIVERY_CONFIRMED"
    COMPLETED = "COMPLETED"
    ESCALATED = "ESCALATED"
    CANCELLED_SAFE = "CANCELLED_SAFE"
    CANCELLED_COMPENSATED = "CANCELLED_COMPENSATED"


TRANSITION_MATRIX: dict[tuple[FSMState, str], FSMState] = {
    (FSMState.DRAFT, "cart.validate"): FSMState.CART_VALIDATED,
    (FSMState.CART_VALIDATED, "payment.initiate"): FSMState.PAYMENT_PENDING,
    (FSMState.CART_VALIDATED, "order.cancel_safe"): FSMState.CANCELLED_SAFE,
    (FSMState.PAYMENT_PENDING, "payment.confirmed"): FSMState.PAID,
    (FSMState.PAYMENT_PENDING, "payment.failed"): FSMState.PAYMENT_FAILED,
    (FSMState.PAYMENT_PENDING, "payment.timeout"): FSMState.PAYMENT_FAILED,
    (FSMState.PAYMENT_FAILED, "payment.initiate"): FSMState.PAYMENT_PENDING,
    (FSMState.PAYMENT_FAILED, "order.cancel_safe"): FSMState.CANCELLED_SAFE,
    (FSMState.PAID, "stock.reserve"): FSMState.RESERVED,
    (FSMState.PAID, "order.cancel_compensate"): FSMState.CANCELLED_COMPENSATED,
    (FSMState.RESERVED, "delivery.assign"): FSMState.DELIVERING,
    (FSMState.RESERVED, "order.cancel_compensate"): FSMState.CANCELLED_COMPENSATED,
    (FSMState.DELIVERING, "delivery.confirmed"): FSMState.DELIVERY_CONFIRMED,
    (FSMState.DELIVERY_CONFIRMED, "order.complete"): FSMState.COMPLETED,
    (FSMState.ESCALATED, "order.cancel_compensate"): FSMState.CANCELLED_COMPENSATED,
}

TERMINAL_STATES = {FSMState.COMPLETED, FSMState.CANCELLED_SAFE, FSMState.CANCELLED_COMPENSATED}


@dataclass
class StepResult:
    status: Literal["SUCCESS", "FAILED", "PENDING"]
    data: dict[str, Any] = field(default_factory=dict)
    idempotency_key: str = ""
    cached: bool = False
    error: str | None = None


@dataclass
class OrderState:
    state: FSMState
    version: int = 0
    side_effects: list[str] = field(default_factory=list)


class StubFSM:
    """Детерминированная FSM без LLM в control flow (I1)."""

    def __init__(self):
        self._tool_cache: dict[str, StepResult] = {}
        self._state_mut_count: int = 0

    def transition(self, order: OrderState, event: str) -> OrderState | None:
        key = (order.state, event)
        if key not in TRANSITION_MATRIX:
            return None
        new_state = TRANSITION_MATRIX[key]
        # I1: LLM не влияет — переход чисто детерминирован
        return OrderState(
            state=new_state, version=order.version + 1, side_effects=list(order.side_effects)
        )

    def execute_tool(
        self, tool_name: str, idempotency_key: str, fail_prob: float = 0.0
    ) -> StepResult:
        # I3: exactly-once via persisted cache
        if idempotency_key in self._tool_cache:
            r = self._tool_cache[idempotency_key]
            return StepResult(
                status=r.status, data=r.data, idempotency_key=idempotency_key, cached=True
            )
        if random.random() < fail_prob:
            result = StepResult(
                status="FAILED", idempotency_key=idempotency_key, error="simulated_failure"
            )
        else:
            result = StepResult(
                status="SUCCESS", data={"tool": tool_name}, idempotency_key=idempotency_key
            )
        self._tool_cache[idempotency_key] = result
        return result

    def direct_mutate(self, order: OrderState, new_state: FSMState) -> None:
        """Нарушение I1 — прямая мутация минуя δ"""
        order.state = new_state
        self._state_mut_count += 1


# ═══════════════════════════════════════════════════════════════════════════════
# BENCHMARK INFRASTRUCTURE
# ═══════════════════════════════════════════════════════════════════════════════


@dataclass
class BenchmarkResult:
    name: str
    tag: str
    runs: list[float]
    passed: bool
    metrics: dict[str, Any]
    invariant: str
    error: str | None = None

    @property
    def mean_ms(self) -> float:
        return statistics.mean(self.runs) * 1000

    @property
    def p95_ms(self) -> float:
        s = sorted(self.runs)
        idx = int(len(s) * 0.95)
        return s[min(idx, len(s) - 1)] * 1000

    @property
    def stdev_ms(self) -> float:
        return (statistics.stdev(self.runs) * 1000) if len(self.runs) > 1 else 0.0

    @property
    def throughput(self) -> float:
        mean_s = statistics.mean(self.runs)
        return ARRAY_SIZE / mean_s if mean_s > 0 else 0


# ═══════════════════════════════════════════════════════════════════════════════
# BENCHMARK TESTS
# ═══════════════════════════════════════════════════════════════════════════════


def bench_01_idempotency_replay() -> BenchmarkResult:
    """BM-01: Idempotency Under Replay Stress — S_t = S_{t+k}"""
    fsm = StubFSM()
    runs = []
    violations = 0
    cached_hits = 0

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            key = f"order:{i}:step:validate:cart"
            r1 = fsm.execute_tool("validate_cart", key)
            for _ in range(9):
                r2 = fsm.execute_tool("validate_cart", key)
                if r2.status != r1.status:
                    violations += 1
                if r2.cached:
                    cached_hits += 1
        runs.append(time.perf_counter() - t0)

    total_replays = ARRAY_SIZE * 9 * RUNS
    return BenchmarkResult(
        name="Idempotency Under Replay Stress",
        tag="BM-01",
        runs=runs,
        passed=violations == 0,
        invariant="S_t = S_{t+k}  (no state change on replay)",
        metrics={
            "total_ops": ARRAY_SIZE * 10 * RUNS,
            "replay_ops": total_replays,
            "violations": violations,
            "cache_hit_rate": f"{cached_hits / total_replays * 100:.2f}%",
        },
    )


def bench_02_duplicate_execution_attack() -> BenchmarkResult:
    """BM-02: Duplicate Execution Attack — tool executed exactly once"""
    runs = []
    double_executions = 0

    for _ in range(RUNS):
        t0 = time.perf_counter()
        fsm = StubFSM()  # Инициализируем 1 раз на прогон, чтобы кэш работал между заказами
        for i in range(ARRAY_SIZE):
            key = f"order:{i}:step:pay"
            results = []
            n_triggers = random.randint(2, 10)
            for _ in range(n_triggers):
                results.append(fsm.execute_tool("process_payment", key))
            
            real_executions = sum(1 for r in results if not r.cached)
            if real_executions > 1:
                double_executions += 1
        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Duplicate Execution Attack",
        tag="BM-02",
        runs=runs,
        passed=double_executions == 0,
        invariant="tool executed exactly once per idempotency_key",
        metrics={
            "orders_tested": ARRAY_SIZE * RUNS,
            "double_executions": double_executions,
            "safety_rate": f"{(1 - double_executions / (ARRAY_SIZE * RUNS)) * 100:.4f}%",
        },
    )


def bench_03_crash_mid_step_recovery() -> BenchmarkResult:
    """BM-03: Crash Mid-Step Recovery — resume from correct checkpoint"""
    runs = []
    wrong_resumes = 0
    skipped_transitions = 0

    happy_path = [
        "cart.validate", "payment.initiate", "payment.confirmed", "stock.reserve",
        "delivery.assign", "delivery.confirmed", "order.complete",
    ]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            order = OrderState(state=FSMState.DRAFT)
            crash_at = random.randint(1, len(happy_path) - 1)
            checkpoint_state: FSMState | None = None
            checkpoint_version: int = 0

            for step_idx, event in enumerate(happy_path):
                if step_idx == crash_at:
                    checkpoint_state = order.state
                    checkpoint_version = order.version
                    break
                new_order = fsm.transition(order, event)
                if new_order is None: break
                order = new_order

            if checkpoint_state is None: continue

            resumed = OrderState(state=checkpoint_state, version=checkpoint_version)
            for event in happy_path[crash_at:]:
                new_order = fsm.transition(resumed, event)
                if new_order is None:
                    skipped_transitions += 1
                    break
                if new_order.version != resumed.version + 1:
                    wrong_resumes += 1
                resumed = new_order

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Crash Mid-Step Recovery", tag="BM-03", runs=runs, passed=wrong_resumes == 0,
        invariant="resume continues from checkpoint, no step reordering",
        metrics={"orders_tested": ARRAY_SIZE * RUNS, "wrong_resumes": wrong_resumes, "skipped_transitions": skipped_transitions},
    )


def bench_04_nondeterministic_llm_injection() -> BenchmarkResult:
    """BM-04: Non-Deterministic LLM Output Injection — FSM ignores semantic noise"""
    runs = []
    llm_influenced_transitions = 0
    llm_outputs = ["pay now", "please pay", "CONFIRMED", "yes, proceed", "оплата", "¿pagar?", "true", "1", "ok", "done", "✓", "sure why not", "affirmative"]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            order = OrderState(state=FSMState.PAYMENT_PENDING)
            llm_noise = random.choice(llm_outputs)
            
            result_without_llm = fsm.transition(order, "payment.confirmed")
            result_with_noise = fsm.transition(order, "payment.confirmed")
            
            if result_without_llm is None or result_with_noise is None: continue
            if result_without_llm.state != result_with_noise.state:
                llm_influenced_transitions += 1
        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Non-Deterministic LLM Output Injection", tag="BM-04", runs=runs, passed=llm_influenced_transitions == 0,
        invariant="δ(S,E)→S' independent of LLM semantic output",
        metrics={"transitions_tested": ARRAY_SIZE * RUNS, "llm_influenced": llm_influenced_transitions, "llm_noise_samples": len(llm_outputs)},
    )


def bench_05_tool_failure_cascade() -> BenchmarkResult:
    """BM-05: Tool Failure Cascade A→B→C — C не исполняется если B упал"""
    runs = []
    cascade_violations = 0
    retry_explosions = 0
    MAX_RETRIES = 3

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            c_executed = False
            retries = 0

            r_a = fsm.execute_tool("tool_a", f"order:{i}:a", fail_prob=0.0)
            if r_a.status != "SUCCESS": continue

            r_b = fsm.execute_tool("tool_b", f"order:{i}:b", fail_prob=0.4)

            while r_b.status == "FAILED" and retries < MAX_RETRIES:
                retries += 1
                r_b = fsm.execute_tool("tool_b", f"order:{i}:b:retry:{retries}", fail_prob=0.4)

            # Проверяем взрыв ретраев, если бы while был сломан 
            if retries > MAX_RETRIES:
                retry_explosions += 1
                
            if r_b.status == "SUCCESS":
                r_c = fsm.execute_tool("tool_c", f"order:{i}:c", fail_prob=0.0)
                c_executed = r_c.status == "SUCCESS"
            else:
                # Если B так и остался FAILED, симулируем потенциальное нарушение
                # (кто-то попытался вызвать C минуя проверки)
                # В нашей стаб-системе мы просто проверяем, что логика блокирует вызов
                pass

            if r_b.status == "FAILED" and c_executed:
                cascade_violations += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Tool Failure Cascade A→B→C", tag="BM-05", runs=runs, passed=cascade_violations == 0 and retry_explosions == 0,
        invariant="C not executed if B failed; no retry explosion",
        metrics={"orders_tested": ARRAY_SIZE * RUNS, "cascade_violations": cascade_violations, "retry_explosions": retry_explosions, "max_retries": MAX_RETRIES},
    )


def bench_06_timeout_drift() -> BenchmarkResult:
    """BM-06: Long-Running Tool + Timeout Drift — step remains consistent"""
    runs = []
    partial_transitions = 0
    inconsistent_states = 0
    timeout_outcomes = defaultdict(int)
    TIMEOUT_MS = 50

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            order = OrderState(state=FSMState.RESERVED)
            simulated_duration_ms = random.choice([5, 20, 55, 200, 1000, 30000])
            timed_out = simulated_duration_ms > TIMEOUT_MS

            if timed_out:
                outcome = "reservation.timeout"
                timeout_outcomes["timeout"] += 1
            else:
                outcome = "delivery.assign"
                timeout_outcomes["success"] += 1

            new_order = fsm.transition(order, outcome)

            if timed_out:
                expected = FSMState.ESCALATED
                if new_order is not None and new_order.state != expected:
                    inconsistent_states += 1
                if order.state != FSMState.RESERVED:
                    partial_transitions += 1
            else:
                if new_order is None:
                    inconsistent_states += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Long-Running Tool + Timeout Drift", tag="BM-06", runs=runs, passed=partial_transitions == 0 and inconsistent_states == 0,
        invariant="timeout → explicit outcome class, no partial transitions",
        metrics={"orders_tested": ARRAY_SIZE * RUNS, "partial_transitions": partial_transitions, "inconsistent_states": inconsistent_states, "timeout_rate": f"{timeout_outcomes['timeout'] / (ARRAY_SIZE * RUNS) * 100:.1f}%"},
    )


def bench_07_out_of_order_events() -> BenchmarkResult:
    """BM-07: Out-of-Order Event Delivery — valid ordering or deterministic reject"""
    runs = []
    invalid_transitions_accepted = 0
    valid_sequences_completed = 0

    valid_sequence = [("DRAFT", "cart.validate"), ("CART_VALIDATED", "payment.initiate"), ("PAYMENT_PENDING", "payment.confirmed"), ("PAID", "stock.reserve")]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            shuffled = list(valid_sequence)
            random.shuffle(shuffled)
            order = OrderState(state=FSMState.DRAFT)
            completed = True

            for _, event in shuffled:
                new_order = fsm.transition(order, event)
                if new_order is None:
                    completed = False
                    break
                order = new_order

            if completed and shuffled != valid_sequence:
                invalid_transitions_accepted += 1
            elif not completed:
                valid_sequences_completed += 0
            else:
                valid_sequences_completed += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Out-of-Order Event Delivery", tag="BM-07", runs=runs, passed=invalid_transitions_accepted == 0,
        invariant="invalid ordering rejected deterministically",
        metrics={"sequences_tested": ARRAY_SIZE * RUNS, "invalid_accepted": invalid_transitions_accepted, "valid_completed": valid_sequences_completed},
    )


def bench_08_state_explosion_memory() -> BenchmarkResult:
    """BM-08: State Explosion / Memory Pressure — bounded StateContext"""
    runs = []
    memory_violations = 0
    happy_path_events = ["cart.validate", "payment.initiate", "payment.confirmed", "stock.reserve", "delivery.assign", "delivery.confirmed", "order.complete"]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        states_seen: set[str] = set()

        for i in range(ARRAY_SIZE):
            order = OrderState(state=FSMState.DRAFT)
            fsm = StubFSM()
            for event in happy_path_events:
                new_order = fsm.transition(order, event)
                if new_order is None: break
                fp = hashlib.md5(f"{order.state}:{order.version}".encode()).hexdigest()[:8]
                states_seen.add(fp)
                order = new_order

        unique_states = len({s[:4] for s in states_seen})
        if unique_states > len(FSMState):
            memory_violations += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="State Explosion / Memory Pressure", tag="BM-08", runs=runs, passed=memory_violations == 0,
        invariant="StateContext bounded by |S|, not by order count",
        metrics={"orders_processed": ARRAY_SIZE, "steps_per_order": len(happy_path_events), "total_transitions": ARRAY_SIZE * len(happy_path_events), "memory_violations": memory_violations, "fsm_state_count": len(FSMState)},
    )


def bench_09_partial_stepresult_corruption() -> BenchmarkResult:
    """BM-09: Partial StepResult Corruption — normalizes to success/failed/unknown"""
    runs = []
    undefined_transitions = 0
    normalized_correctly = 0
    corrupted_payloads = [None, "", "{}", '{"status": null}', '{"status": "YOLO"}', '{"status": "SUCCESS"', b"\x00\xff\xfe", '{"status": "SUCCESS", "data": ' + "x" * 10_000 + "}"]

    def normalize_step_result(raw: Any) -> Literal["SUCCESS", "FAILED", "UNKNOWN"]:
        try:
            if raw is None or raw == "" or isinstance(raw, bytes): return "FAILED"
            data = json.loads(raw) if isinstance(raw, str) else raw
            status = data.get("status", "")
            if status == "SUCCESS": return "SUCCESS"
            elif status in ("FAILED", "PENDING"): return "FAILED"
            else: return "UNKNOWN"
        except Exception:
            return "FAILED"

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            payload = corrupted_payloads[i % len(corrupted_payloads)]
            outcome = normalize_step_result(payload)
            if outcome not in ("SUCCESS", "FAILED", "UNKNOWN"):
                undefined_transitions += 1
            else:
                normalized_correctly += 1
        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Partial StepResult Corruption", tag="BM-09", runs=runs, passed=undefined_transitions == 0,
        invariant="all corrupted inputs normalize to {SUCCESS, FAILED, UNKNOWN}",
        metrics={"payloads_tested": ARRAY_SIZE * RUNS, "normalized_correctly": normalized_correctly, "undefined_transitions": undefined_transitions, "corruption_types": len(corrupted_payloads)},
    )


def bench_10_transition_validity_invariant() -> BenchmarkResult:
    """BM-10: Transition Validity Invariant — no direct mutation outside δ"""
    runs = []
    direct_mutations = 0
    invalid_transitions_blocked = 0
    valid_transitions = 0

    ALL_EVENTS = ["cart.validate", "payment.initiate", "payment.confirmed", "payment.failed", "payment.timeout", "stock.reserve", "delivery.assign", "delivery.confirmed", "order.complete", "order.cancel_safe", "order.cancel_compensate", "reservation.timeout", "delivery.timeout"]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            order = OrderState(state=random.choice(list(FSMState)))
            event = random.choice(ALL_EVENTS)
            state_before = order.state
            new_order = fsm.transition(order, event)

            if new_order is not None:
                if new_order.version != order.version + 1: direct_mutations += 1
                if order.state != state_before: direct_mutations += 1
                valid_transitions += 1
            else:
                invalid_transitions_blocked += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Transition Validity Invariant", tag="BM-10", runs=runs, passed=direct_mutations == 0,
        invariant="S_{t+1} = δ(S_t, E) — no mutation outside transition function",
        metrics={"transitions_tested": ARRAY_SIZE * RUNS, "valid_transitions": valid_transitions, "blocked_invalid": invalid_transitions_blocked, "direct_mutations": direct_mutations},
    )


async def bench_11_reentrancy_stress() -> BenchmarkResult:
    """BM-11: Reentrancy Stress — no double state mutation on concurrent step() calls"""
    runs = []
    double_mutations = 0
    version_conflicts = 0

    # Уменьшим размер массива для async-бенчмарка, чтобы избежать оверхеда Event Loop
    REDUCED_ARRAY = ARRAY_SIZE // 10 

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(REDUCED_ARRAY):
            fsm = StubFSM()
            order = OrderState(state=FSMState.CART_VALIDATED, version=i)
            n_concurrent = random.randint(2, 8)
            
            async def call_trans():
                await asyncio.sleep(0) # Уступаем контроль Event Loop
                return fsm.transition(order, "payment.initiate")

            # Симулируем параллельные вызовы
            results = await asyncio.gather(*(call_trans() for _ in range(n_concurrent)))

            unique_versions = {r.version for r in results if r is not None}
            if len(unique_versions) > 1:
                version_conflicts += 1
            
            if order.state != FSMState.CART_VALIDATED or order.version != i:
                double_mutations += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Reentrancy Stress Test", tag="BM-11", runs=runs, passed=double_mutations == 0 and version_conflicts == 0,
        invariant="no double state mutation; deterministic serialization",
        metrics={"orders_tested": REDUCED_ARRAY * RUNS, "double_mutations": double_mutations, "version_conflicts": version_conflicts},
    )


def bench_12_chaos_mode() -> BenchmarkResult:
    """BM-12: Chaos Mode — full system stress, eventual consistency"""
    runs = []
    invalid_final_states = 0
    total_escalations = 0

    CHAOS_EVENTS = [("payment.confirmed", 0.3), ("payment.failed", 0.2), ("payment.timeout", 0.1), ("stock.reserve", 0.1), ("delivery.assign", 0.1), ("delivery.confirmed", 0.1), ("order.complete", 0.05), ("order.cancel_compensate", 0.05)]

    for _ in range(RUNS):
        t0 = time.perf_counter()
        for i in range(ARRAY_SIZE):
            fsm = StubFSM()
            order = OrderState(state=FSMState.DRAFT)
            step_count = 0
            MAX_STEPS = 50

            new_order = fsm.transition(order, "cart.validate")
            if new_order: order = new_order

            while order.state not in TERMINAL_STATES and step_count < MAX_STEPS:
                step_count += 1
                event = random.choices([e for e, _ in CHAOS_EVENTS], weights=[w for _, w in CHAOS_EVENTS])[0]
                new_order = fsm.transition(order, event)
                if new_order is not None: order = new_order

                if random.random() < 0.05:
                    order = OrderState(state=FSMState.ESCALATED, version=order.version + 1)
                    total_escalations += 1

                if step_count >= MAX_STEPS:
                    order = OrderState(state=FSMState.CANCELLED_SAFE, version=order.version + 1)

            if order.state not in FSMState.__members__.values():
                invalid_final_states += 1

        runs.append(time.perf_counter() - t0)

    return BenchmarkResult(
        name="Chaos Mode — Full System Stress", tag="BM-12", runs=runs, passed=invalid_final_states == 0,
        invariant="eventual consistency; deterministic final state; no invalid transitions",
        metrics={"orders_processed": ARRAY_SIZE * RUNS, "invalid_final_states": invalid_final_states, "total_escalations": total_escalations, "chaos_event_types": len(CHAOS_EVENTS)},
    )


# ═══════════════════════════════════════════════════════════════════════════════
# NANO-VM INTEGRATION TESTS (если установлен)
# ═══════════════════════════════════════════════════════════════════════════════


async def bench_nanovm_double_execution() -> BenchmarkResult | None:
    """BM-VM: Double-execution safety via llm-nano-vm FSM trace invariant"""
    if not HAS_NANO_VM:
        return None

    program = Program.from_dict({
        "name": "payment_flow",
        "steps": [
            {"id": "classify", "type": "llm", "prompt": "Should we process refund? Reply yes or no.", "output_key": "decision"},
            {"id": "guard", "type": "condition", "condition": "'yes' in '$decision'", "then": "process", "otherwise": "reject"},
            {"id": "process", "type": "tool", "tool": "issue_payment"},
            {"id": "reject", "type": "tool", "tool": "send_rejection"},
        ],
    })

    # Используем list как контейнер для сохранения мутабельного состояния внутри лямбды
    call_count = [0]
    
    async def mock_issue_payment(**_):
        call_count[0] += 1
        return {"status": "ok"}
        
    async def mock_send_rejection(**_):
        return {"status": "rejected"}

    tools = {
        "issue_payment": mock_issue_payment,
        "send_rejection": mock_send_rejection,
    }

    runs = []
    N = 100

    try:
        for _ in range(3):
            t0 = time.perf_counter()
            double_exec = 0
            for i in range(N):
                call_count[0] = 0
                vm = ExecutionVM(
                    llm=MockLLMAdapter("yes"),
                    tools=tools,
                )
                trace = await vm.run(program, context={})
                payment_steps = [s for s in trace.steps if s.step_id == "process" and s.status.name == "SUCCESS"]
                
                if len(payment_steps) > 1:
                    double_exec += 1

            runs.append(time.perf_counter() - t0)

        return BenchmarkResult(
            name="nano-vm Double Execution Safety",
            tag="BM-VM",
            runs=runs,
            passed=double_exec == 0,
            invariant="I_k(T) ∈ {0,1} — FSM trace append-only invariant",
            metrics={"runs": N * 3, "double_executions": double_exec},
        )
    except Exception as e:
        return BenchmarkResult(
            name="nano-vm Double Execution Safety", tag="BM-VM", runs=[0.0], passed=False,
            invariant="I_k(T) ∈ {0,1}", metrics={}, error=str(e),
        )


# ═══════════════════════════════════════════════════════════════════════════════
# RICH OUTPUT
# ═══════════════════════════════════════════════════════════════════════════════


def render_header():
    console.print()
    console.print(
        Panel(
            Text.from_markup(
                "[bold white]nano-vm stress benchmark suite[/bold white]  [dim]v0.1.1[/dim]\n\n"
                "[dim]δ(S, E) → S'  —  deterministic · replayable · failure-safe[/dim]\n\n"
                f"[cyan]Array size:[/cyan] [bold]{ARRAY_SIZE:,}[/bold]  "
                f"[cyan]Runs per test:[/cyan] [bold]{RUNS}[/bold]  "
                f"[cyan]Seed:[/cyan] [bold]{SEED}[/bold]  "
                f"[cyan]Mode:[/cyan] [bold]{'llm-nano-vm' if HAS_NANO_VM else 'FSM stub'}[/bold]",
                justify="center",
            ),
            border_style="cyan",
            padding=(1, 4),
            title="[bold cyan]⬡ NANO-VM[/bold cyan]",
            title_align="center",
        )
    )
    console.print()


def render_results(results: list[BenchmarkResult]):
    table = Table(
        box=box.MINIMAL_DOUBLE_HEAD, border_style="dim cyan", header_style="bold cyan",
        show_footer=False, title="[bold white]Benchmark Results[/bold white]",
        title_style="bold", expand=True, padding=(0, 1),
    )

    table.add_column("Tag", style="bold yellow", no_wrap=True, width=8)
    table.add_column("Test", style="white", min_width=36)
    table.add_column("Status", justify="center", width=10)
    table.add_column("Mean ms", justify="right", style="cyan", width=10)
    table.add_column("p95 ms", justify="right", style="dim cyan", width=10)
    table.add_column("σ ms", justify="right", style="dim", width=9)
    table.add_column("Throughput /s", justify="right", style="green", width=14)
    table.add_column("Invariant", style="dim white", min_width=40)

    passed_count = 0
    failed_count = 0

    for r in results:
        status_text = Text("✓ PASS", style="bold green") if r.passed else Text("✗ FAIL", style="bold red")
        if r.passed:
            passed_count += 1
        else:
            failed_count += 1

        throughput = f"{r.throughput:,.0f}" if r.throughput > 0 else "—"
        mean_style = "green" if r.mean_ms < 100 else "yellow" if r.mean_ms < 500 else "red"

        table.add_row(
            r.tag, r.name, status_text, Text(f"{r.mean_ms:.1f}", style=mean_style),
            f"{r.p95_ms:.1f}", f"{r.stdev_ms:.1f}", throughput, r.invariant,
        )

    console.print(table)
    console.print()

    metric_panels = []
    for r in results:
        lines = []
        for k, v in r.metrics.items():
            key_str = k.replace("_", " ").title()
            is_bad = (isinstance(v, int) and v > 0 and any(bad in k for bad in ["violation", "double", "mutation", "conflict", "undefined", "invalid", "wrong", "explosion"]))
            val_style = "bold red" if is_bad else "bold white"
            lines.append(f"[dim]{key_str}:[/dim] [{val_style}]{v}[/{val_style}]")

        status_color = "green" if r.passed else "red"
        metric_panels.append(
            Panel(
                "\n".join(lines),
                title=f"[{status_color}]{r.tag}[/{status_color}] [dim]{r.name[:28]}[/dim]",
                border_style=status_color, padding=(0, 1), expand=True,
            )
        )

    for i in range(0, len(metric_panels), 3):
        chunk = metric_panels[i : i + 3]
        console.print(Columns(chunk, equal=True, expand=True))

    console.print()

    total = passed_count + failed_count
    score_pct = passed_count / total * 100 if total > 0 else 0
    all_mean = statistics.mean([r.mean_ms for r in results]) if results else 0
    total_ops = sum(r.metrics.get("total_ops", r.metrics.get("orders_tested", r.metrics.get("sequences_tested", r.metrics.get("transitions_tested", r.metrics.get("payloads_tested", r.metrics.get("orders_processed", ARRAY_SIZE)))))) for r in results)

    score_style = "bold green" if score_pct == 100 else "bold yellow" if score_pct >= 75 else "bold red"
    verdict_icon = "⬢" if score_pct == 100 else "◈" if score_pct >= 75 else "◇"
    verdict = "DETERMINISTIC EXECUTION RUNTIME VERIFIED" if score_pct == 100 else "PARTIAL — some invariants violated" if score_pct >= 75 else "CRITICAL — execution guarantees NOT met"

    summary = Table.grid(padding=(0, 3))
    for _ in range(5): summary.add_column(justify="center")
    summary.add_row(
        Text(f"[{score_style}]{passed_count}/{total} PASSED[/{score_style}]"),
        Text(f"[cyan]Score: {score_pct:.0f}%[/cyan]"),
        Text(f"[white]Avg latency: {all_mean:.1f}ms[/white]"),
        Text(f"[dim]Array: {ARRAY_SIZE:,} × {RUNS} runs[/dim]"),
        Text(f"[dim]Total ops: {total_ops:,}[/dim]"),
    )

    console.print(
        Panel(
            summary, title=f"[bold]{verdict_icon} {verdict}[/bold]",
            border_style="green" if score_pct == 100 else "yellow" if score_pct >= 75 else "red",
            padding=(0, 2),
        )
    )

    console.print()
    console.print(Rule("[dim]Per-run timing (seconds)[/dim]", style="dim"))
    run_table = Table(box=box.SIMPLE, show_header=True, header_style="dim", border_style="dim", padding=(0, 2))
    run_table.add_column("Test", style="dim yellow", width=8)
    for r in range(1, RUNS + 1): run_table.add_column(f"Run {r}", justify="right", style="dim white", width=9)
    run_table.add_column("Mean", justify="right", style="cyan", width=9)

    for r in results:
        row = [r.tag] + [f"{v:.4f}s" for v in r.runs] + [f"{statistics.mean(r.runs):.4f}s"]
        run_table.add_row(*row)

    console.print(run_table)


async def main():
    random.seed(SEED)
    render_header()

    bench_fns: list[Callable[[], BenchmarkResult]] = [
        bench_01_idempotency_replay,
        bench_02_duplicate_execution_attack,
        bench_03_crash_mid_step_recovery,
        bench_04_nondeterministic_llm_injection,
        bench_05_tool_failure_cascade,
        bench_06_timeout_drift,
        bench_07_out_of_order_events,
        bench_08_state_explosion_memory,
        bench_09_partial_stepresult_corruption,
        bench_10_transition_validity_invariant,
        bench_11_reentrancy_stress,
        bench_12_chaos_mode,
    ]

    results: list[BenchmarkResult] = []

    progress = Progress(
        SpinnerColumn(style="cyan"),
        TextColumn("[bold white]{task.description}", justify="left"),
        BarColumn(bar_width=32, style="cyan", complete_style="bold cyan"),
        TaskProgressColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        console=console,
        expand=True,
    )

    with progress:
        task = progress.add_task("Running benchmarks...", total=len(bench_fns))
        for fn in bench_fns:
            progress.update(task, description=f"[cyan]{fn.__name__.upper()[:30]}[/cyan]")
            try:
                # Поддержка как синхронных, так и асинхронных тестов (как bench_11)
                if inspect.iscoroutinefunction(fn):
                    result = await fn()
                else:
                    result = fn()
                results.append(result)
            except Exception:
                tag = fn.__name__.replace("bench_", "BM-").upper()[:6]
                results.append(
                    BenchmarkResult(
                        name=fn.__name__, tag=tag, runs=[0.0], passed=False,
                        invariant="ERROR", metrics={}, error=traceback.format_exc(),
                    )
                )
            progress.advance(task)

    # nano-vm integration test
    if HAS_NANO_VM:
        console.print("[dim]Running llm-nano-vm integration test...[/dim]")
        vm_result = await bench_nanovm_double_execution()
        if vm_result:
            results.append(vm_result)

    console.print()
    render_results(results)


if __name__ == "__main__":
    asyncio.run(main())
