Coverage for agentos/evaluation/suite.py: 0%
296 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
Agent Evaluation Suite v2 (v1.9.0)

Comprehensive agent evaluation framework — SWE-bench style
with multi-dimensional scoring, hallucination detection,
CI/CD integration, and statistical analysis.

Features:
  - SWE-Bench style: end-to-end task completion evaluation
  - Multi-round conversation eval: track accuracy over turns
  - Tool accuracy: did the agent call the right tools?
  - Hallucination detection: detect fabricated facts/outputs
  - Regression suite: prevent degradation across versions
  - CI exports: JUnit XML, JSON, Markdown reports
  - Statistical analysis: p-values, confidence intervals
  - Leaderboard: track agent performance over time
"""
19from __future__ import annotations
21import json
22import os
23import time
24from dataclasses import dataclass, field
25from datetime import datetime
26from enum import Enum
27from pathlib import Path
28from typing import Any, Optional
30from agentos.evaluation import GoldenCase, GoldenDataset
33# ── Scorers ─────────────────────────────────────────────────────────
@dataclass
class EvalScore:
    """Score card for a single evaluated case, split by dimension.

    Every numeric field defaults to ``0.0`` and ``details`` to a fresh empty
    dict, so a dimension that an evaluator never fills in cannot be told
    apart from one that genuinely scored zero.
    """

    # Combined score, annotated in this file as ranging over 0.0 - 1.0.
    overall: float = 0.0
    # Did the agent arrive at the right answer?
    accuracy: float = 0.0
    # Did the agent choose the right tools?
    tool_selection: float = 0.0
    # Did the agent reach the solution in minimal steps?
    efficiency: float = 0.0
    # Is the result repeatable across runs?
    consistency: float = 0.0
    # Is the output free of fabricated content?
    hallucination_free: float = 0.0
    # Response time in milliseconds.
    latency_ms: float = 0.0
    # Free-form extra data attached by whichever evaluator built the score.
    details: dict[str, Any] = field(default_factory=dict)
class EvalCategory(str, Enum):
    """Closed set of evaluation categories.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    CODING = "coding"
    REASONING = "reasoning"
    TOOL_USE = "tool_use"
    CONVERSATION = "conversation"
    KNOWLEDGE = "knowledge"
    SAFETY = "safety"
    MATH = "math"
58# ── Hallucination Detector ──────────────────────────────────────────
class HallucinationDetector:
    """Heuristic detector for fabricated content in agent outputs.

    Signals that are actually computed:
      - Reference check: the share of response tokens that do not occur in
        the expected output.
      - Source citation: the number of ``http://`` / ``https://`` URLs in the
        response (informational only, never affects the score).

    Factual-consistency and self-contradiction checks are not implemented.
    ``reference_kb`` is stored but not consulted by :meth:`detect`.
    """

    # Contribution of each finding severity to the hallucination score.
    # Severities not listed here (e.g. "info") contribute nothing.
    _SEVERITY_WEIGHTS = {"high": 0.3, "medium": 0.1}
    # Only real URL schemes count as a citation; a bare "http" prefix would
    # also match ordinary words such as "httpd".
    _URL_SCHEMES = ("http://", "https://")

    def __init__(self, reference_kb: dict[str, str] | None = None):
        """Store an optional reference knowledge base (currently unused)."""
        self._reference = reference_kb or {}

    def detect(self, response: str, expected: str = "", context: dict[str, Any] | None = None) -> dict[str, Any]:
        """Analyze a response for hallucination signals.

        Args:
            response: Agent's actual response.
            expected: Expected/ground-truth response. When empty, the
                reference check is skipped.
            context: Accepted for interface compatibility; not used.

        Returns:
            Dict with ``hallucination_score`` (0 = no hallucination,
            1 = complete hallucination, capped at 1.0), ``findings`` (list of
            finding dicts) and ``is_clean`` (True when the uncapped score is
            below 0.3).
        """
        findings: list[dict[str, Any]] = []

        # Reference check: a long response made up mostly of tokens that are
        # absent from the expected answer may contain unsupported claims.
        if expected:
            expected_tokens = set(expected.lower().split())
            response_tokens = set(response.lower().split())
            if response_tokens:
                extra_ratio = len(response_tokens - expected_tokens) / len(response_tokens)
                if extra_ratio > 0.5 and len(response) > 50:
                    findings.append({
                        "type": "possible_fabrication",
                        "severity": "medium",
                        "extra_token_ratio": round(extra_ratio, 3),
                    })

        # Source citation check. Counting scheme occurrences (rather than
        # whitespace-separated words) also finds URLs wrapped in brackets or
        # quotes, e.g. "(https://example.com)".
        url_count = sum(response.count(scheme) for scheme in self._URL_SCHEMES)
        if url_count:
            findings.append({
                "type": "external_source_cited",
                "severity": "info",
                "urls_found": url_count,
            })

        # Score: 0 = clean, 1 = severe hallucination. The 0.0 start value keeps
        # the result a float even when there are no findings.
        score = sum(
            (self._SEVERITY_WEIGHTS.get(finding.get("severity"), 0.0) for finding in findings),
            0.0,
        )

        return {
            "hallucination_score": min(score, 1.0),
            "findings": findings,
            "is_clean": score < 0.3,
        }
137# ── Multi-Round Evaluator ───────────────────────────────────────────
@dataclass
class MultiRoundCase:
    """Test case describing one multi-turn conversation."""

    # Identifier of the case.
    id: str
    # One dict per turn. MultiRoundEvaluator in this file reads the keys
    # "user_input", "expected_tools" and "expected_response" from each turn.
    turns: list[dict[str, Any]]
    # Category of the case; conversation by default.
    category: EvalCategory = EvalCategory.CONVERSATION
    # MultiRoundEvaluator evaluates at most this many turns, in order.
    max_turns: int = 10
    # Free-form labels.
    tags: list[str] = field(default_factory=list)
class MultiRoundEvaluator:
    """Evaluate agent performance over multi-turn conversations.

    Per-turn records accumulate in ``self._round_results`` across every call
    to :meth:`evaluate`; they are never cleared by this class.
    """

    def __init__(self, detector: HallucinationDetector | None = None):
        """Use ``detector`` for hallucination checks, or a default one."""
        self._detector = detector or HallucinationDetector()
        self._round_results: list[dict] = []

    async def evaluate(self, agent, case: MultiRoundCase) -> EvalScore:
        """Run a multi-round evaluation.

        Each turn is scored as ``accuracy * 0.5 + tool_selection * 0.3 +
        hallucination_free * 0.2``; the returned score is the arithmetic mean
        of the per-turn values. An exception raised by the agent does not
        abort the case: the turn is scored with an empty output and the error
        text is recorded in that turn's details.

        Args:
            agent: The agent to evaluate; ``await agent.run(text, context=...)``
                is called once per turn.
            case: Multi-round test case. Only the first ``case.max_turns``
                turns are evaluated.

        Returns:
            Aggregated EvalScore across all turns. ``details`` carries
            ``case_id``, ``category`` and ``total_turns`` (the number of turns
            actually evaluated, 0 for an empty case).
        """
        turn_scores: list[EvalScore] = []
        # The same (empty) context dict is handed to the agent on every turn;
        # this evaluator itself never writes to it.
        context: dict[str, Any] = {}

        for index, turn in enumerate(case.turns[: case.max_turns]):
            # "user" is accepted as a fallback alias for "user_input".
            prompt = turn.get("user_input", turn.get("user", ""))
            expected_tools = turn.get("expected_tools") or []
            expected_resp = turn.get("expected_response") or ""

            # perf_counter is monotonic, so the latency cannot go negative or
            # jump when the wall clock is adjusted.
            started = time.perf_counter()
            try:
                response = await agent.run(prompt, context=context)
            except Exception as exc:  # deliberate: a failing agent must not abort the suite
                response = {"error": str(exc), "output": ""}
            latency = (time.perf_counter() - started) * 1000

            if isinstance(response, dict):
                actual_tools = response.get("tools_used") or []
                raw_output = response.get("output", response)
                error = response.get("error")
            else:
                actual_tools, raw_output, error = [], response, None
            # Scoring below needs text; None becomes "" instead of crashing.
            actual_output = "" if raw_output is None else str(raw_output)

            tool_score = self._score_tool_selection(expected_tools, actual_tools)
            h_result = self._detector.detect(
                actual_output,
                expected=expected_resp,
                context=context,
            )
            hallucination_free = 1 - h_result["hallucination_score"]
            # Without an expected response there is nothing to match against.
            accuracy = self._score_text_match(expected_resp, actual_output) if expected_resp else 0.0

            details: dict[str, Any] = {
                "turn": index,
                "expected_tools": expected_tools,
                "actual_tools": actual_tools,
            }
            if error:
                details["error"] = error

            turn_score = EvalScore(
                overall=accuracy * 0.5 + tool_score * 0.3 + hallucination_free * 0.2,
                accuracy=accuracy,
                tool_selection=tool_score,
                hallucination_free=hallucination_free,
                latency_ms=latency,
                details=details,
            )
            turn_scores.append(turn_score)
            self._round_results.append({
                "case_id": case.id,
                "turn": index,
                "score": turn_score.overall,
                "latency_ms": latency,
            })

        turn_count = len(turn_scores)
        divisor = turn_count or 1  # avoid dividing by zero for an empty case
        return EvalScore(
            overall=sum(s.overall for s in turn_scores) / divisor,
            accuracy=sum(s.accuracy for s in turn_scores) / divisor,
            tool_selection=sum(s.tool_selection for s in turn_scores) / divisor,
            hallucination_free=sum(s.hallucination_free for s in turn_scores) / divisor,
            latency_ms=sum(s.latency_ms for s in turn_scores) / divisor,
            details={
                "case_id": case.id,
                # Store the plain string value so reports group and print it
                # like the string categories used elsewhere.
                "category": getattr(case.category, "value", case.category),
                "total_turns": turn_count,
            },
        )

    def _score_tool_selection(self, expected: list[str], actual: list[str]) -> float:
        """Return the F1 score of the tools used against the tools expected.

        Returns 1.0 when no tools were expected, and 0.0 when tools were
        expected but none of them were used (including ``actual`` being empty
        or ``None``). Duplicates and ordering are ignored.
        """
        if not expected:
            return 1.0
        expected_set = set(expected)
        actual_set = set(actual or ())
        hits = len(expected_set & actual_set)
        if not hits:
            return 0.0
        precision = hits / len(actual_set)
        recall = hits / len(expected_set)
        return 2 * precision * recall / (precision + recall)

    def _score_text_match(self, expected: str, actual: str) -> float:
        """Score how well ``actual`` matches ``expected``, case-insensitively.

        Returns 1.0 for an exact match, 0.0 for a blank response that is not
        an exact match, 0.7 when either text contains the other, and
        otherwise the fraction of whitespace-separated expected tokens that
        appear in the response.
        """
        expected_lower = expected.lower()
        actual_lower = actual.lower()
        if expected_lower == actual_lower:
            return 1.0
        # An empty string is a substring of everything, so without this guard
        # an agent that answers nothing would earn the 0.7 containment score.
        if not actual_lower.strip():
            return 0.0
        if expected_lower in actual_lower or actual_lower in expected_lower:
            return 0.7
        expected_tokens = set(expected_lower.split())
        if not expected_tokens:
            return 0.0
        overlap = len(expected_tokens & set(actual_lower.split())) / len(expected_tokens)
        return min(overlap, 1.0)
258# ── SWE-Bench Style Evaluator ───────────────────────────────────────
class SWEBenchEvaluator:
    """SWE-bench style: end-to-end task completion evaluation.

    Like SWE-bench, this evaluates whether the agent can:
      1. Understand a real-world task description
      2. Locate the relevant code
      3. Make the correct edits
      4. Pass all tests
    """

    def __init__(self, test_runner=None):
        """Store an optional test runner.

        NOTE(review): ``test_runner`` is kept on the instance but nothing in
        this class calls it; its expected signature is not visible here.
        """
        self._test_runner = test_runner

    async def evaluate(
        self,
        agent,
        task: dict[str, Any],
        repo_path: str = "",
    ) -> EvalScore:
        """Run a SWE-bench style evaluation.

        The overall score is ``patch_similarity * 0.6`` plus ``0.4`` when the
        test run passes. ``efficiency`` is always reported as 1.0.

        Args:
            agent: The agent to evaluate; ``await agent.run(problem,
                context={"repo_path": repo_path})`` is called once.
            task: Dict with 'problem_statement', 'patch', 'test_patch', 'repo'.
                Tests are only run when 'test_patch' is truthy.
            repo_path: Path to the repository.

        Returns:
            EvalScore with detailed results. If the agent raises, a zero score
            whose details carry the error text is returned instead of letting
            one failing task abort the whole suite, matching how the other
            evaluators in this file treat agent failures.
        """
        problem = task.get("problem_statement", "")
        expected_patch = task.get("patch", "")

        started = time.perf_counter()  # monotonic, unaffected by clock changes
        try:
            result = await agent.run(problem, context={"repo_path": repo_path})
        except Exception as exc:  # deliberate: keep the suite running
            return EvalScore(
                overall=0.0,
                details={"error": str(exc), "repo_path": repo_path},
            )
        latency = (time.perf_counter() - started) * 1000

        # Check whether the repository's tests pass after the agent ran.
        test_passed = False
        if task.get("test_patch"):
            test_passed = await self._run_tests(repo_path, task["test_patch"])

        # Compare the agent's patch with the reference patch.
        actual_patch = result.get("patch", "") if isinstance(result, dict) else ""
        patch_similarity = self._diff_similarity(expected_patch, actual_patch)

        return EvalScore(
            overall=patch_similarity * 0.6 + (0.4 if test_passed else 0.0),
            accuracy=patch_similarity,
            efficiency=1.0,
            latency_ms=latency,
            details={
                "test_passed": test_passed,
                "patch_similarity": patch_similarity,
                "repo_path": repo_path,
            },
        )

    async def _run_tests(self, repo_path: str, test_patch: str) -> bool:
        """Run pytest inside ``repo_path`` and report whether it succeeded.

        NOTE(review): ``test_patch`` is accepted but not applied; the
        repository's tests are run as they are on disk.

        Returns:
            True when pytest exits with status 0. False when ``repo_path`` is
            empty, the process cannot be started, or it exceeds 60 seconds.
        """
        if not repo_path:
            # There is no repository to test.
            return False

        import asyncio
        import functools
        import subprocess
        import sys

        run_pytest = functools.partial(
            subprocess.run,
            # Use the interpreter running this suite so pytest resolves from
            # the same environment; a "python3" found on PATH may be another one.
            [sys.executable or "python3", "-m", "pytest", "-x", "-q"],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=repo_path,
        )
        try:
            # subprocess.run blocks; run it on a worker thread so the event
            # loop stays responsive for the duration of the test run.
            completed = await asyncio.get_running_loop().run_in_executor(None, run_pytest)
        except (OSError, ValueError, subprocess.SubprocessError):
            return False
        return completed.returncode == 0

    def _diff_similarity(self, patch1: str, patch2: str) -> float:
        """Return the Jaccard similarity of the two patches' distinct lines.

        Returns 0.0 if either patch is empty and 1.0 if they are identical;
        line order and repetition are otherwise ignored.
        """
        if not patch1 or not patch2:
            return 0.0
        if patch1 == patch2:
            return 1.0
        lines1 = set(patch1.splitlines())
        lines2 = set(patch2.splitlines())
        if not lines1 or not lines2:
            return 0.0
        return len(lines1 & lines2) / len(lines1 | lines2)
345# ── Eval Suite Runner ───────────────────────────────────────────────
class EvalSuiteRunner:
    """Orchestrates full evaluation suites.

    Usage:
        runner = EvalSuiteRunner()
        runner.load_dataset("coding_tasks.json")
        runner.load_dataset("conversation_tasks.json")
        report = await runner.run_all(agent)
        runner.export_junit("report.xml")
    """

    def __init__(self):
        """Create an empty runner with default evaluators."""
        self._datasets: list[GoldenDataset] = []
        self._multi_round_cases: list[MultiRoundCase] = []
        self._swe_tasks: list[dict] = []
        self._results: list[EvalScore] = []
        self._detector = HallucinationDetector()
        self._multi_eval = MultiRoundEvaluator(self._detector)
        self._swe_eval = SWEBenchEvaluator()

    def load_dataset(self, path: str):
        """Load a golden dataset from a JSON file.

        Paths that do not end in ``.json`` (compared case-insensitively) are
        ignored without error.
        """
        if str(path).lower().endswith(".json"):
            self._datasets.append(GoldenDataset.from_json(path))

    def add_dataset(self, dataset: GoldenDataset):
        """Add a pre-loaded dataset."""
        self._datasets.append(dataset)

    def add_multi_round_case(self, case: MultiRoundCase):
        """Add a multi-round conversation test case."""
        self._multi_round_cases.append(case)

    def add_swe_task(self, task: dict[str, Any]):
        """Add a SWE-bench style task."""
        self._swe_tasks.append(task)

    async def run_all(self, agent) -> list[EvalScore]:
        """Run all loaded evaluation suites.

        Any results from a previous run are discarded first. Cases run
        sequentially: golden cases, then multi-round cases, then SWE tasks.

        Returns:
            List of EvalScore, one per test case, in execution order.
        """
        self._results = []

        # Standard golden cases
        for dataset in self._datasets:
            for case in dataset.cases:
                self._results.append(await self._run_golden_case(agent, case))

        # Multi-round conversation cases
        for case in self._multi_round_cases:
            self._results.append(await self._multi_eval.evaluate(agent, case))

        # SWE-bench style tasks; each task may carry its own "repo_path".
        for task in self._swe_tasks:
            repo_path = task.get("repo_path", "")
            self._results.append(await self._swe_eval.evaluate(agent, task, repo_path))

        return self._results

    async def _run_golden_case(self, agent, case: GoldenCase) -> EvalScore:
        """Evaluate a single golden test case.

        The overall score is ``accuracy * 0.4 + tool_selection * 0.3 +
        hallucination_free * 0.3``. If the agent raises, a zero score is
        returned whose details identify the case and carry the error text.
        """
        started = time.perf_counter()  # monotonic, unaffected by clock changes
        try:
            response = await agent.run(case.prompt, context=case.context)
        except Exception as exc:  # deliberate: keep the suite running
            # Keep the identifying fields so reports can still name and group
            # the failed case.
            return EvalScore(
                overall=0.0,
                details={
                    "case_id": case.id,
                    "category": case.category,
                    "difficulty": case.difficulty,
                    "error": str(exc),
                },
            )
        latency = (time.perf_counter() - started) * 1000

        # Parse response
        if isinstance(response, dict):
            raw_output = response.get("output", response)
            actual_tools = response.get("tools_used") or []
        else:
            raw_output, actual_tools = response, []
        # Scoring below needs text; None becomes "" instead of crashing.
        actual_output = "" if raw_output is None else str(raw_output)

        # Accuracy: simple match (extensible with ROUGE/BLEU)
        accuracy = self._fuzzy_match(case.expected, actual_output)
        tool_score = self._multi_eval._score_tool_selection(case.expected_tools, actual_tools)
        h_result = self._detector.detect(actual_output, expected=case.expected)
        hallucination_free = 1 - h_result["hallucination_score"]

        return EvalScore(
            overall=accuracy * 0.4 + tool_score * 0.3 + hallucination_free * 0.3,
            accuracy=accuracy,
            tool_selection=tool_score,
            hallucination_free=hallucination_free,
            latency_ms=latency,
            details={
                "case_id": case.id,
                "category": case.category,
                "difficulty": case.difficulty,
                # Truncated so reports stay readable.
                "expected": case.expected[:200],
                "actual": actual_output[:200],
            },
        )

    def _fuzzy_match(self, expected: str, actual: str) -> float:
        """Fuzzy text match (simple token-overlap baseline).

        With no expected text: 1.0 if the response is empty too, else 0.5.
        Otherwise 1.0 for a case-insensitive match after stripping, or the
        fraction of expected whitespace-separated tokens found in the
        response (0.5 when the expected text is only whitespace).
        """
        if not expected:
            return 0.5 if actual else 1.0
        if expected.strip().lower() == actual.strip().lower():
            return 1.0
        expected_tokens = set(expected.lower().split())
        if not expected_tokens:
            return 0.5
        actual_tokens = set(actual.lower().split())
        return len(expected_tokens & actual_tokens) / len(expected_tokens)

    # ── Reporting ──

    def summary(self) -> dict[str, Any]:
        """Generate a summary of all evaluation results.

        Returns:
            ``{"status": "no_results"}`` when nothing has been run. Otherwise
            a dict with ``total_cases``, ``average_score``, ``min_score``,
            ``max_score``, ``median_score``, ``by_category`` (mean overall
            score per ``details["category"]``, "unknown" when absent),
            ``average_latency_ms`` (over results with a positive latency) and
            ``hallucination_rate`` (share of results whose
            ``hallucination_free`` is below 0.7 — this includes results that
            left the field at its 0.0 default).
        """
        results = self._results
        if not results:
            return {"status": "no_results"}

        from statistics import median

        scores = [r.overall for r in results]
        latencies = [r.latency_ms for r in results if r.latency_ms > 0]

        by_category: dict[str, list[float]] = {}
        for r in results:
            by_category.setdefault(r.details.get("category", "unknown"), []).append(r.overall)

        flagged = sum(1 for r in results if r.hallucination_free < 0.7)

        return {
            "total_cases": len(results),
            "average_score": sum(scores) / len(scores),
            "min_score": min(scores),
            "max_score": max(scores),
            # A true median: the mean of the two middle values for an even
            # count, not simply the upper one.
            "median_score": median(scores),
            "by_category": {
                cat: sum(vals) / len(vals) for cat, vals in by_category.items()
            },
            "average_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "hallucination_rate": flagged / len(results),
        }

    def export_json(self, path: str):
        """Export the summary and per-case results as a UTF-8 JSON file.

        Missing parent directories are created.
        """
        report = {
            "generated_at": datetime.now().isoformat(),
            "summary": self.summary(),
            "results": [
                {
                    "overall": r.overall,
                    "accuracy": r.accuracy,
                    "tool_selection": r.tool_selection,
                    "hallucination_free": r.hallucination_free,
                    "latency_ms": r.latency_ms,
                    "details": r.details,
                }
                for r in self._results
            ],
        }
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

    def export_junit(self, path: str):
        """Export results as JUnit XML (CI/CD integration).

        A result passes when its overall score is at least 0.5. Test case
        names come from ``details["case_id"]`` (``case_<index>`` when absent)
        and are XML-escaped, so ids containing quotes, ``<`` or ``&`` still
        yield a well-formed document. Missing parent directories are created.
        """
        from xml.sax.saxutils import escape, quoteattr

        results = self._results
        passed = sum(1 for r in results if r.overall >= 0.5)
        failed = len(results) - passed

        lines = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            f'<testsuite name="AgentOS Eval Suite" tests="{len(results)}" '
            f'failures="{failed}" errors="0">',
        ]
        for index, r in enumerate(results):
            # quoteattr adds the surrounding quotes itself.
            name_attr = quoteattr(str(r.details.get("case_id", f"case_{index}")))
            seconds = f"{r.latency_ms / 1000:.3f}"
            if r.overall >= 0.5:
                lines.append(f'  <testcase name={name_attr} time="{seconds}"/>')
                continue
            breakdown = escape(
                f"Accuracy: {r.accuracy:.2f}, Tool: {r.tool_selection:.2f}, "
                f"Hallucination: {r.hallucination_free:.2f}"
            )
            lines.append(f'  <testcase name={name_attr} time="{seconds}">')
            lines.append(f'    <failure message="Score: {r.overall:.2f}">{breakdown}</failure>')
            lines.append("  </testcase>")
        lines.append("</testsuite>")

        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")

    def export_markdown(self, path: str):
        """Export results as a Markdown report.

        When there are no results, a short report stating so is written
        instead of failing. Missing parent directories are created.
        """
        summary = self.summary()
        lines = [
            "# AgentOS Evaluation Report",
            "",
            f"**Generated:** {datetime.now().isoformat()}",
        ]

        if summary.get("status") == "no_results":
            # summary() has none of the metric keys in this case.
            lines += ["**Total Cases:** 0", "", "No evaluation results to report."]
        else:
            lines += [
                f"**Total Cases:** {summary['total_cases']}",
                "",
                "## Summary",
                "",
                "| Metric | Value |",
                "|--------|-------|",
                f"| Average Score | {summary['average_score']:.2%} |",
                f"| Median Score | {summary['median_score']:.2%} |",
                f"| Min Score | {summary['min_score']:.2%} |",
                f"| Max Score | {summary['max_score']:.2%} |",
                f"| Avg Latency | {summary['average_latency_ms']:.0f}ms |",
                f"| Hallucination Rate | {summary['hallucination_rate']:.1%} |",
                "",
            ]
            if summary.get("by_category"):
                lines += [
                    "## By Category",
                    "",
                    "| Category | Average Score |",
                    "|----------|---------------|",
                ]
                for cat, score in summary["by_category"].items():
                    # Print an enum category by its plain value.
                    label = getattr(cat, "value", cat)
                    lines.append(f"| {label} | {score:.2%} |")

        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
564# ── Leaderboard ─────────────────────────────────────────────────────
@dataclass
class LeaderboardEntry:
    """One row of the agent leaderboard."""

    # Name of the agent the row belongs to.
    agent_name: str
    # Version label of that agent.
    version: str
    # Score the leaderboard ranks by.
    score: float
    # Date string; empty by default.
    date: str = ""
    # Score per category name.
    category_scores: dict[str, float] = field(default_factory=dict)
    # Free-form extra data.
    details: dict[str, Any] = field(default_factory=dict)
class Leaderboard:
    """Track and compare agent performance over time.

    Entries are kept in memory sorted by score, highest first, and can be
    persisted to / restored from a JSON file.
    """

    def __init__(self, storage_path: str = ""):
        """Use ``storage_path``, or ``~/.agentos/leaderboard.json`` if empty.

        Nothing is read from disk here; call :meth:`load` for that.
        """
        self._path = Path(storage_path) if storage_path else Path.home() / ".agentos" / "leaderboard.json"
        self._entries: list[LeaderboardEntry] = []

    def add_entry(self, entry: LeaderboardEntry):
        """Add a new leaderboard entry.

        An entry without a date is stamped, in place, with the current local
        time in ISO format.
        """
        if not entry.date:
            entry.date = datetime.now().isoformat()
        self._entries.append(entry)
        self._entries.sort(key=lambda e: e.score, reverse=True)

    def top(self, n: int = 10) -> list[LeaderboardEntry]:
        """Get the top N entries by score (an empty list when N <= 0)."""
        # Clamp so a negative n cannot turn into an "all but the last" slice.
        return self._entries[: max(n, 0)]

    def save(self):
        """Persist the leaderboard to disk as UTF-8 JSON.

        Missing parent directories are created. Every entry field is written,
        including ``details``, so that a save/load round trip is lossless for
        JSON-representable values.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        data = [
            {
                "agent_name": e.agent_name,
                "version": e.version,
                "score": e.score,
                "date": e.date,
                "category_scores": e.category_scores,
                "details": e.details,
            }
            for e in self._entries
        ]
        with open(self._path, "w", encoding="utf-8") as f:
            # details may hold arbitrary values; default=str is a deliberate
            # fallback so that persisting them cannot make save() fail.
            json.dump(data, f, indent=2, ensure_ascii=False, default=str)

    def load(self):
        """Load the leaderboard from disk, replacing the in-memory entries.

        Does nothing when the storage file does not exist.
        """
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        self._entries = sorted(
            (LeaderboardEntry(**row) for row in data),
            key=lambda e: e.score,
            reverse=True,
        )

    def compare_versions(self, agent_name: str) -> list[dict]:
        """Return version, score and date of one agent's entries, oldest first.

        Ordering compares the ``date`` strings directly.
        """
        history = sorted(
            (e for e in self._entries if e.agent_name == agent_name),
            key=lambda e: e.date,
        )
        return [
            {"version": e.version, "score": e.score, "date": e.date}
            for e in history
        ]