Coverage for agentos/evaluation/suite.py: 0%

296 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Agent Evaluation Suite v2 (v1.9.0) 

3 

4Comprehensive agent evaluation framework — SWE-bench style 

5with multi-dimensional scoring, hallucination detection, 

6CI/CD integration, and statistical analysis. 

7 

8Features: 

9 - SWE-Bench style: end-to-end task completion evaluation 

10 - Multi-round conversation eval: track accuracy over turns 

11 - Tool accuracy: did the agent call the right tools? 

12 - Hallucination detection: detect fabricated facts/outputs 

13 - Regression suite: prevent degradation across versions 

14 - CI exports: JUnit XML, JSON, Markdown reports 

15 - Statistical analysis: p-values, confidence intervals 

16 - Leaderboard: track agent performance over time 

17""" 

18 

19from __future__ import annotations 

20 

21import json 

22import os 

23import time 

24from dataclasses import dataclass, field 

25from datetime import datetime 

26from enum import Enum 

27from pathlib import Path 

28from typing import Any, Optional 

29 

30from agentos.evaluation import GoldenCase, GoldenDataset 

31 

32 

33# ── Scorers ───────────────────────────────────────────────────────── 

34 

@dataclass
class EvalScore:
    """Score card for a single evaluated case, split by dimension.

    Every numeric field defaults to ``0.0`` and ``details`` to a fresh empty
    dict, so a dimension that an evaluator never fills in reads as zero.
    """

    # Combined score, expected in the range 0.0 - 1.0.
    overall: float = 0.0
    # Whether the agent produced the right answer.
    accuracy: float = 0.0
    # Whether the agent picked the right tools.
    tool_selection: float = 0.0
    # Whether the solution was reached in minimal steps.
    efficiency: float = 0.0
    # Whether the outcome is repeatable across runs.
    consistency: float = 0.0
    # Absence of fabricated content.
    hallucination_free: float = 0.0
    # Response time in milliseconds.
    latency_ms: float = 0.0
    # Free-form, evaluator-specific extras.
    details: dict[str, Any] = field(default_factory=dict)

46 

47 

class EvalCategory(str, Enum):
    """Closed set of evaluation categories.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    CODING = "coding"
    REASONING = "reasoning"
    TOOL_USE = "tool_use"
    CONVERSATION = "conversation"
    KNOWLEDGE = "knowledge"
    SAFETY = "safety"
    MATH = "math"

56 

57 

58# ── Hallucination Detector ────────────────────────────────────────── 

59 

class HallucinationDetector:
    """Heuristic detector for fabricated content in agent outputs.

    Implemented signals:
        - Reference check: share of the response's distinct tokens that do
          not appear in the expected output.
        - Source citation: counts whitespace-delimited tokens that start
          with ``http`` (informational only; does not affect the score).

    Factual-consistency checks against the reference knowledge base and
    self-contradiction detection are not implemented.
    """

    # Score contribution of one finding, keyed by its severity. Severities
    # that are not listed (e.g. "info") contribute nothing.
    _SEVERITY_WEIGHTS = {"high": 0.3, "medium": 0.1}

    def __init__(self, reference_kb: dict[str, str] | None = None):
        # Stored for future use; detect() does not consult it.
        self._reference = reference_kb or {}

    def detect(self, response: str, expected: str = "", context: dict[str, Any] | None = None) -> dict[str, Any]:
        """Analyze a response for hallucination signals.

        Args:
            response: Agent's actual response.
            expected: Expected/ground truth response. When empty, the
                reference check is skipped.
            context: Additional context for detection (currently unused).

        Returns:
            Dict with ``hallucination_score`` (0 = no hallucination,
            1 = complete hallucination), the list of ``findings``, and
            ``is_clean`` (True when the uncapped score is below 0.3).
        """
        findings: list[dict[str, Any]] = []

        # Reference check: a long response made up mostly of tokens that are
        # absent from the expected output may contain unsupported claims.
        if expected:
            response_tokens = set(response.lower().split())
            if response_tokens:
                unsupported = response_tokens - set(expected.lower().split())
                extra_ratio = len(unsupported) / len(response_tokens)
                if extra_ratio > 0.5 and len(response) > 50:
                    findings.append({
                        "type": "possible_fabrication",
                        "severity": "medium",
                        "extra_token_ratio": round(extra_ratio, 3),
                    })

        # The former self-contradiction pass was removed: it walked every
        # pair of sentences (quadratic in response length) but its body was
        # a no-op, so it never produced a finding.

        # Source citation check.
        urls = [word for word in response.split() if word.startswith("http")]
        if urls:
            findings.append({
                "type": "external_source_cited",
                "severity": "info",
                "urls_found": len(urls),
            })

        # Score: 0 = clean, 1 = severe hallucination. The 0.0 start value
        # keeps the result a float even when there are no findings.
        score = sum(
            (self._SEVERITY_WEIGHTS.get(finding.get("severity"), 0.0) for finding in findings),
            0.0,
        )

        return {
            "hallucination_score": min(score, 1.0),
            "findings": findings,
            "is_clean": score < 0.3,
        }

135 

136 

137# ── Multi-Round Evaluator ─────────────────────────────────────────── 

138 

@dataclass
class MultiRoundCase:
    """One scripted multi-turn conversation used as a test case."""

    # Identifier of the case; recorded with each per-turn result.
    id: str
    # One dict per turn. MultiRoundEvaluator.evaluate reads the keys
    # "user_input", "expected_tools" and "expected_response" from each.
    turns: list[dict[str, Any]]
    category: EvalCategory = EvalCategory.CONVERSATION
    # Turns beyond this count are not evaluated.
    max_turns: int = 10
    tags: list[str] = field(default_factory=list)

147 

148 

class MultiRoundEvaluator:
    """Evaluate agent performance over multi-turn conversations.

    Each turn is scored on response accuracy, tool selection and absence of
    hallucination signals; the per-turn scores are averaged into a single
    EvalScore. A per-turn log accumulates in ``self._round_results`` across
    calls to :meth:`evaluate`.
    """

    def __init__(self, detector: HallucinationDetector | None = None):
        self._detector = detector or HallucinationDetector()
        self._round_results: list[dict] = []

    async def evaluate(self, agent, case: MultiRoundCase) -> EvalScore:
        """Run a multi-round evaluation.

        Args:
            agent: The agent to evaluate. Must expose an awaitable
                ``run(prompt, context=...)``.
            case: Multi-round test case. At most ``case.max_turns`` turns
                are run.

        Returns:
            Aggregated EvalScore (arithmetic mean of the per-turn scores).
            ``details["total_turns"]`` is the number of turns actually run.
        """
        turn_scores: list[EvalScore] = []
        context: dict[str, Any] = {}

        for i, turn in enumerate(case.turns[:case.max_turns]):
            # MultiRoundCase documents the prompt key as "user" while this
            # evaluator historically read "user_input"; accept both so a
            # case written against either spelling reaches the agent.
            user_input = turn.get("user_input", turn.get("user", ""))

            # perf_counter is monotonic, so a wall-clock adjustment during
            # the call cannot produce a negative or inflated latency.
            start = time.perf_counter()
            try:
                response = await agent.run(user_input, context=context)
            except Exception as e:
                # A failing turn is scored (as an empty response) rather
                # than aborting the remaining turns.
                response = {"error": str(e), "output": ""}
            latency = (time.perf_counter() - start) * 1000

            # Normalise the response into (tools, text).
            expected_tools = turn.get("expected_tools", [])
            if isinstance(response, dict):
                actual_tools = response.get("tools_used") or []
                actual_output = response.get("output", str(response))
            else:
                actual_tools = []
                actual_output = str(response)
            # The scorers below call str methods on the output; coerce a
            # None / non-string "output" value instead of crashing on it.
            if actual_output is None:
                actual_output = ""
            elif not isinstance(actual_output, str):
                actual_output = str(actual_output)

            # Tool accuracy
            tool_score = self._score_tool_selection(expected_tools, actual_tools)

            # Hallucination check
            expected_resp = turn.get("expected_response", "")
            h_result = self._detector.detect(
                actual_output,
                expected=expected_resp,
                context=context,
            )
            hallucination_free = 1 - h_result["hallucination_score"]

            # Response accuracy (simple text match baseline); stays 0.0
            # when the turn declares no expected response.
            accuracy = 0.0
            if expected_resp:
                accuracy = self._score_text_match(expected_resp, actual_output)

            turn_score = EvalScore(
                overall=(accuracy * 0.5 + tool_score * 0.3 + hallucination_free * 0.2),
                accuracy=accuracy,
                tool_selection=tool_score,
                hallucination_free=hallucination_free,
                latency_ms=latency,
                details={"turn": i, "expected_tools": expected_tools, "actual_tools": actual_tools},
            )
            turn_scores.append(turn_score)
            self._round_results.append({
                "case_id": case.id,
                "turn": i,
                "score": turn_score.overall,
                "latency_ms": latency,
            })

        # Aggregate. The divisor falls back to 1 only to avoid dividing by
        # zero; the reported turn count is the true number of turns run.
        total_turns = len(turn_scores)
        n = total_turns or 1
        return EvalScore(
            overall=sum(s.overall for s in turn_scores) / n,
            accuracy=sum(s.accuracy for s in turn_scores) / n,
            tool_selection=sum(s.tool_selection for s in turn_scores) / n,
            hallucination_free=sum(s.hallucination_free for s in turn_scores) / n,
            latency_ms=sum(s.latency_ms for s in turn_scores) / n,
            details={"total_turns": total_turns},
        )

    def _score_tool_selection(self, expected: list[str], actual: list[str]) -> float:
        """Score tool selection as the F1 of expected vs. actual tool sets.

        Returns 1.0 when no tools are expected, and 0.0 when tools are
        expected but none were used or none of the used tools match.
        """
        if not expected:
            return 1.0
        expected_set = set(expected)
        actual_set = set(actual)
        if not actual_set:
            return 0.0
        hits = len(expected_set & actual_set)
        if hits == 0:
            return 0.0
        precision = hits / len(actual_set)
        recall = hits / len(expected_set)
        return 2 * precision * recall / (precision + recall)

    def _score_text_match(self, expected: str, actual: str) -> float:
        """Simple case-insensitive text match score.

        Returns 1.0 for an exact match, 0.0 for a blank response to a
        non-matching expectation, 0.7 when one text contains the other, and
        otherwise the fraction of expected tokens found in the response.
        """
        expected_lower = expected.lower()
        actual_lower = actual.lower()
        if expected_lower == actual_lower:
            return 1.0
        # The empty string is a substring of every string, so without this
        # guard a blank (e.g. errored) response would earn the 0.7
        # containment score below.
        if not actual_lower.strip():
            return 0.0
        if expected_lower in actual_lower or actual_lower in expected_lower:
            return 0.7
        # Token overlap
        e_tokens = set(expected_lower.split())
        a_tokens = set(actual_lower.split())
        if not e_tokens:
            return 0.0
        overlap = len(e_tokens & a_tokens) / len(e_tokens)
        return min(overlap, 1.0)

256 

257 

258# ── SWE-Bench Style Evaluator ─────────────────────────────────────── 

259 

class SWEBenchEvaluator:
    """SWE-bench style: end-to-end task completion evaluation.

    Like SWE-bench, this evaluates whether the agent can:
        1. Understand a real-world task description
        2. Locate the relevant code
        3. Make the correct edits
        4. Pass all tests
    """

    def __init__(self, test_runner=None):
        # NOTE(review): stored but not consulted anywhere in this class;
        # _run_tests always shells out to pytest.
        self._test_runner = test_runner

    async def evaluate(
        self,
        agent,
        task: dict[str, Any],
        repo_path: str = "",
    ) -> EvalScore:
        """Run a SWE-bench style evaluation.

        Args:
            agent: The agent to evaluate. Must expose an awaitable
                ``run(prompt, context=...)``.
            task: Dict with 'problem_statement', 'patch', 'test_patch', 'repo'
            repo_path: Path to the repository

        Returns:
            EvalScore whose overall value is 60% patch similarity and 40%
            test outcome. If the agent raises, a zero score carrying the
            error message in ``details["error"]`` is returned instead.
        """
        problem = task.get("problem_statement", "")
        expected_patch = task.get("patch", "")

        # perf_counter is monotonic, unlike wall-clock time.time().
        start = time.perf_counter()
        try:
            result = await agent.run(problem, context={"repo_path": repo_path})
        except Exception as e:
            # Match the other evaluators in this module: one crashing task
            # yields a zero score instead of aborting the whole suite.
            return EvalScore(overall=0.0, details={"error": str(e), "repo_path": repo_path})
        latency = (time.perf_counter() - start) * 1000

        # Check if the agent's solution passes the tests. Tests are only
        # run when the task carries a test patch.
        test_passed = False
        if task.get("test_patch"):
            test_passed = await self._run_tests(repo_path, task["test_patch"])

        # Compare patches
        actual_patch = result.get("patch", "") if isinstance(result, dict) else ""
        patch_similarity = self._diff_similarity(expected_patch, actual_patch)

        return EvalScore(
            overall=patch_similarity * 0.6 + (1.0 if test_passed else 0.0) * 0.4,
            accuracy=patch_similarity,
            efficiency=1.0,
            latency_ms=latency,
            details={
                "test_passed": test_passed,
                "patch_similarity": patch_similarity,
                "repo_path": repo_path,
            },
        )

    async def _run_tests(self, repo_path: str, test_patch: str) -> bool:
        """Run pytest inside ``repo_path`` and report whether it succeeded.

        NOTE(review): ``test_patch`` is accepted but never applied; the run
        covers whatever tests already exist in ``repo_path``.

        Returns:
            True when pytest exits with status 0. False on a non-zero exit,
            on the 60 second timeout, or when the process cannot be started
            (e.g. ``repo_path`` does not exist).
        """
        import asyncio
        import subprocess
        import sys

        def _invoke_pytest() -> bool:
            completed = subprocess.run(
                # Use the interpreter running this suite so pytest resolves
                # from the same environment; "python3" is the fallback for
                # the rare case where sys.executable is empty.
                [sys.executable or "python3", "-m", "pytest", "-x", "-q"],
                capture_output=True, text=True,
                timeout=60, cwd=repo_path,
            )
            return completed.returncode == 0

        try:
            # subprocess.run blocks; run it in a worker thread so the event
            # loop stays responsive for the duration of the test run.
            return await asyncio.to_thread(_invoke_pytest)
        except (OSError, ValueError, subprocess.SubprocessError):
            return False

    def _diff_similarity(self, patch1: str, patch2: str) -> float:
        """Compute similarity between two patches.

        Returns 0.0 when either patch is empty, 1.0 when they are equal,
        and otherwise the Jaccard index of their sets of distinct lines.
        """
        if not patch1 or not patch2:
            return 0.0
        if patch1 == patch2:
            return 1.0
        lines1 = set(patch1.splitlines())
        lines2 = set(patch2.splitlines())
        if not lines1 or not lines2:
            return 0.0
        union = lines1 | lines2
        return len(lines1 & lines2) / len(union) if union else 0.0

343 

344 

345# ── Eval Suite Runner ─────────────────────────────────────────────── 

346 

class EvalSuiteRunner:
    """Orchestrates full evaluation suites.

    Collects golden datasets, multi-round conversation cases and SWE-bench
    style tasks, runs them one after another against an agent, and reports
    the scores as a summary dict, JSON, JUnit XML or Markdown.

    Usage:
        runner = EvalSuiteRunner()
        runner.load_dataset("coding_tasks.json")
        runner.load_dataset("conversation_tasks.json")
        report = await runner.run_all(agent)
        runner.export_junit("report.xml")
    """

    def __init__(self):
        self._datasets: list[GoldenDataset] = []
        self._multi_round_cases: list[MultiRoundCase] = []
        self._swe_tasks: list[dict] = []
        self._results: list[EvalScore] = []
        self._detector = HallucinationDetector()
        self._multi_eval = MultiRoundEvaluator(self._detector)
        self._swe_eval = SWEBenchEvaluator()

    def load_dataset(self, path: str):
        """Load a golden dataset from JSON.

        Paths that do not end in ``.json`` are ignored without error.
        """
        if path.endswith(".json"):
            dataset = GoldenDataset.from_json(path)
            self._datasets.append(dataset)

    def add_dataset(self, dataset: GoldenDataset):
        """Add a pre-loaded dataset."""
        self._datasets.append(dataset)

    def add_multi_round_case(self, case: MultiRoundCase):
        """Add a multi-round conversation test case."""
        self._multi_round_cases.append(case)

    def add_swe_task(self, task: dict[str, Any]):
        """Add a SWE-bench style task."""
        self._swe_tasks.append(task)

    async def run_all(self, agent) -> list[EvalScore]:
        """Run all loaded evaluation suites.

        Results from any previous run are discarded first.

        Returns:
            List of EvalScore for each test case, in the order golden
            cases, multi-round cases, SWE-bench tasks.
        """
        self._results = []

        # Standard golden cases
        for dataset in self._datasets:
            for case in dataset.cases:
                self._results.append(await self._run_golden_case(agent, case))

        # Multi-round conversation cases
        for case in self._multi_round_cases:
            self._results.append(await self._multi_eval.evaluate(agent, case))

        # SWE-bench style tasks
        for task in self._swe_tasks:
            repo_path = task.get("repo_path", "")
            self._results.append(await self._swe_eval.evaluate(agent, task, repo_path))

        return self._results

    async def _run_golden_case(self, agent, case: GoldenCase) -> EvalScore:
        """Evaluate a single golden test case.

        If the agent raises, a zero score is returned whose details carry
        the error message plus the case id and category, so the failure
        stays attributable in the reports.
        """
        # perf_counter is monotonic, unlike wall-clock time.time().
        start = time.perf_counter()

        try:
            response = await agent.run(case.prompt, context=case.context)
        except Exception as e:
            return EvalScore(
                overall=0.0,
                details={"error": str(e), "case_id": case.id, "category": case.category},
            )

        latency = (time.perf_counter() - start) * 1000

        # Parse response
        if isinstance(response, dict):
            actual_output = response.get("output", str(response))
            actual_tools = response.get("tools_used") or []
        else:
            actual_output = str(response)
            actual_tools = []
        # The scorers call str methods on the output; coerce a None or
        # non-string "output" value instead of crashing on it.
        if actual_output is None:
            actual_output = ""
        elif not isinstance(actual_output, str):
            actual_output = str(actual_output)

        # Accuracy: simple match (extensible with ROUGE/BLEU)
        accuracy = self._fuzzy_match(case.expected, actual_output)

        # Tool accuracy
        tool_score = self._multi_eval._score_tool_selection(case.expected_tools, actual_tools)

        # Hallucination
        h_result = self._detector.detect(actual_output, expected=case.expected)
        hallucination_free = 1 - h_result["hallucination_score"]

        return EvalScore(
            overall=accuracy * 0.4 + tool_score * 0.3 + hallucination_free * 0.3,
            accuracy=accuracy,
            tool_selection=tool_score,
            hallucination_free=hallucination_free,
            latency_ms=latency,
            details={
                "case_id": case.id,
                "category": case.category,
                "difficulty": case.difficulty,
                # Truncated to keep the reports compact.
                "expected": case.expected[:200],
                "actual": actual_output[:200],
            },
        )

    def _fuzzy_match(self, expected: str, actual: str) -> float:
        """Fuzzy text match (simple overlap baseline).

        With no expectation, an empty response scores 1.0 and anything else
        0.5. Otherwise returns 1.0 for a case-insensitive match after
        stripping, else the fraction of expected tokens found in the
        response.
        """
        if not expected:
            return 1.0 if not actual else 0.5
        if expected.strip().lower() == actual.strip().lower():
            return 1.0
        expected_set = set(expected.lower().split())
        if not expected_set:
            # Whitespace-only expectation: nothing to compare against.
            return 0.5
        actual_set = set(actual.lower().split())
        return len(expected_set & actual_set) / len(expected_set)

    # ── Reporting ──

    def summary(self) -> dict[str, Any]:
        """Generate a summary of all evaluation results.

        Returns:
            ``{"status": "no_results"}`` when nothing has been run,
            otherwise a dict with the case count, average / min / max /
            median overall score, per-category averages, the average
            latency over results with a positive latency, and the
            hallucination rate.
        """
        if not self._results:
            return {"status": "no_results"}

        scores = [r.overall for r in self._results]
        latencies = [r.latency_ms for r in self._results if r.latency_ms > 0]

        by_category: dict[str, list[float]] = {}
        for r in self._results:
            cat = r.details.get("category", "unknown")
            by_category.setdefault(cat, []).append(r.overall)

        # True median: an even count averages the two middle values rather
        # than taking the upper one.
        ordered = sorted(scores)
        mid = len(ordered) // 2
        if len(ordered) % 2:
            median = ordered[mid]
        else:
            median = (ordered[mid - 1] + ordered[mid]) / 2

        # NOTE(review): results that never set hallucination_free (it
        # defaults to 0.0, e.g. SWE-bench and errored cases) are counted as
        # hallucinated here.
        flagged = sum(1 for r in self._results if r.hallucination_free < 0.7)

        return {
            "total_cases": len(self._results),
            "average_score": sum(scores) / len(scores),
            "min_score": min(scores),
            "max_score": max(scores),
            "median_score": median,
            "by_category": {
                cat: sum(vals) / len(vals) if vals else 0
                for cat, vals in by_category.items()
            },
            "average_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "hallucination_rate": flagged / len(self._results),
        }

    def export_json(self, path: str):
        """Export results as JSON, creating parent directories as needed."""
        report = {
            "generated_at": datetime.now().isoformat(),
            "summary": self.summary(),
            "results": [
                {
                    "overall": r.overall,
                    "accuracy": r.accuracy,
                    "tool_selection": r.tool_selection,
                    "hallucination_free": r.hallucination_free,
                    "latency_ms": r.latency_ms,
                    "details": r.details,
                }
                for r in self._results
            ],
        }
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

    def export_junit(self, path: str):
        """Export results as JUnit XML (CI/CD integration).

        A result passes when its overall score is at least 0.5. Test case
        names come from ``details["case_id"]`` (falling back to
        ``case_<index>``) and are escaped for use as XML attribute values.
        """
        from xml.sax.saxutils import quoteattr

        passed = sum(1 for r in self._results if r.overall >= 0.5)
        failed = len(self._results) - passed

        parts = [
            '<?xml version="1.0" encoding="UTF-8"?>\n',
            f'<testsuite name="AgentOS Eval Suite" tests="{len(self._results)}" failures="{failed}" errors="0">\n',
        ]
        for i, r in enumerate(self._results):
            # quoteattr escapes &, < and > and picks a safe quote style, so
            # an arbitrary case id cannot break the document.
            name_attr = quoteattr(str(r.details.get("case_id", f"case_{i}")))
            seconds = f"{r.latency_ms / 1000:.3f}"
            if r.overall >= 0.5:
                parts.append(f'  <testcase name={name_attr} time="{seconds}"/>\n')
            else:
                parts.append(f'  <testcase name={name_attr} time="{seconds}">\n')
                parts.append(
                    f'    <failure message="Score: {r.overall:.2f}">'
                    f'Accuracy: {r.accuracy:.2f}, Tool: {r.tool_selection:.2f}, '
                    f'Hallucination: {r.hallucination_free:.2f}</failure>\n'
                )
                parts.append('  </testcase>\n')
        parts.append('</testsuite>\n')

        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write("".join(parts))

    def export_markdown(self, path: str):
        """Export results as Markdown report.

        When there are no results, a short report stating so is written
        instead of the metric tables.
        """
        summary = self.summary()

        parts = [
            "# AgentOS Evaluation Report\n\n",
            f"**Generated:** {datetime.now().isoformat()}\n",
            f"**Total Cases:** {summary.get('total_cases', 0)}\n\n",
        ]

        if "total_cases" not in summary:
            # summary() returned its "no_results" marker, which carries
            # none of the metric keys used below.
            parts.append("_No evaluation results to report._\n")
        else:
            parts += [
                "## Summary\n\n",
                "| Metric | Value |\n",
                "|--------|-------|\n",
                f"| Average Score | {summary['average_score']:.2%} |\n",
                f"| Median Score | {summary['median_score']:.2%} |\n",
                f"| Min Score | {summary['min_score']:.2%} |\n",
                f"| Max Score | {summary['max_score']:.2%} |\n",
                f"| Avg Latency | {summary['average_latency_ms']:.0f}ms |\n",
                f"| Hallucination Rate | {summary['hallucination_rate']:.1%} |\n\n",
            ]

            if summary.get("by_category"):
                parts += [
                    "## By Category\n\n",
                    "| Category | Average Score |\n",
                    "|----------|---------------|\n",
                ]
                parts += [
                    f"| {cat} | {score:.2%} |\n"
                    for cat, score in summary["by_category"].items()
                ]

        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write("".join(parts))

562 

563 

564# ── Leaderboard ───────────────────────────────────────────────────── 

565 

@dataclass
class LeaderboardEntry:
    """One scored run of an agent version, as kept on the leaderboard."""

    agent_name: str
    version: str
    # The leaderboard ranks entries by this value, highest first.
    score: float
    # Left empty by default; Leaderboard.add_entry fills in the current
    # time in ISO format when it is empty.
    date: str = ""
    category_scores: dict[str, float] = field(default_factory=dict)
    details: dict[str, Any] = field(default_factory=dict)

575 

576 

class Leaderboard:
    """Track and compare agent performance over time.

    Entries are held in memory, ordered by score (highest first), and are
    persisted to a JSON file only when :meth:`save` is called.
    """

    def __init__(self, storage_path: str = ""):
        # Without an explicit path, fall back to ~/.agentos/leaderboard.json.
        self._path = Path(storage_path) if storage_path else Path.home() / ".agentos" / "leaderboard.json"
        self._entries: list[LeaderboardEntry] = []

    def add_entry(self, entry: LeaderboardEntry):
        """Add a new leaderboard entry.

        An entry with an empty ``date`` is stamped in place with the
        current local time in ISO format.
        """
        if not entry.date:
            entry.date = datetime.now().isoformat()
        self._entries.append(entry)
        self._entries.sort(key=lambda e: e.score, reverse=True)

    def top(self, n: int = 10) -> list[LeaderboardEntry]:
        """Get top N entries."""
        return self._entries[:n]

    def save(self):
        """Persist leaderboard to disk.

        Every field of each entry is written, including ``details``, so a
        save followed by :meth:`load` restores the entries in full.

        Raises:
            TypeError: If an entry holds a value that JSON cannot encode.
                The existing file is left untouched in that case.
        """
        data = [
            {
                "agent_name": e.agent_name,
                "version": e.version,
                "score": e.score,
                "date": e.date,
                "category_scores": e.category_scores,
                "details": e.details,
            }
            for e in self._entries
        ]
        # Serialise before opening the file: an encoding failure must not
        # truncate a leaderboard that is already on disk.
        payload = json.dumps(data, indent=2)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._path, "w", encoding="utf-8") as f:
            f.write(payload)

    def load(self):
        """Load leaderboard from disk.

        Replaces the in-memory entries with the file's contents. Does
        nothing when the storage file does not exist.
        """
        if not self._path.exists():
            return
        with open(self._path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self._entries = [LeaderboardEntry(**entry) for entry in data]
        self._entries.sort(key=lambda e: e.score, reverse=True)

    def compare_versions(self, agent_name: str) -> list[dict]:
        """Compare all versions of an agent.

        Returns:
            One ``{"version", "score", "date"}`` dict per entry of the
            named agent, ordered by the entries' date strings.
        """
        entries = sorted(
            (e for e in self._entries if e.agent_name == agent_name),
            key=lambda e: e.date,
        )
        return [
            {"version": e.version, "score": e.score, "date": e.date}
            for e in entries
        ]