Coverage for agentos/evolution/autopilot.py: 30%

277 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Closed-Loop Self-Evolution v2 (v1.9.0) 

3 

4AutoPilot — from behavior signals to code changes, fully automated. 

5 

6Pipeline: 

7 1. SignalCollector gathers user behavior (corrections, ratings, tool usage) 

8 2. Learner detects patterns → generates EvolutionProposal 

9 3. AutoPilot validates → generates code change → auto-tests → applies 

10 4. Regression tests verify no breakage 

11 5. Proposal archived with before/after metrics 

12 

13v2 New Features: 

14 - CodeGenerator: LLM-based code diff generation from proposals 

15 - AutoTester: Run regression suite before/after each change 

16 - RollbackManager: Instant undo if regression detected 

17 - Confidence Gating: Only auto-apply proposals above confidence threshold 

18 - A/B Evaluator: Side-by-side before/after comparison 

19 - EvolutionJournal: Full audit trail of every evolution step 

20""" 

21 

22from __future__ import annotations 

23 

24import asyncio 

25import hashlib 

26import json 

27import os 

28import subprocess 

29import tempfile 

30import time 

31from dataclasses import dataclass, field 

32from datetime import datetime 

33from enum import Enum 

34from pathlib import Path 

35from typing import Optional, Callable, Any 

36 

37from agentos.evolution.engine import EvolutionEngine, EvolutionProposal, EvolutionStatus 

38from agentos.evolution.learner import Learner, LearningInsight 

39from agentos.evolution.signals import SignalCollector, BehaviorSignal 

40 

41 

42# ── Types ─────────────────────────────────────────────────────────── 

43 

class AutoPilotMode(str, Enum):
    """Operating modes that control how much AutoPilot does unattended."""

    # Only generate proposals, don't apply
    SUGGEST_ONLY = "suggest_only"

    # Generate + ask user before applying
    ASK_BEFORE = "ask_before"

    # Auto-apply if confidence > threshold
    CONFIDENCE_GATED = "confidence"

    # Auto-apply everything (⚠️ use with guardrails)
    FULL_AUTO = "full_auto"

50 

51 

class ChangeResult(str, Enum):
    """Outcome recorded for one proposal after AutoPilot has handled it."""

    SUCCESS = "success"
    FAILED = "failed"
    REGRESSION = "regression"
    ROLLED_BACK = "rolled_back"
    SKIPPED = "skipped"

59 

60 

@dataclass
class CodeChange:
    """A single-file edit derived from an evolution proposal."""

    # Id of the proposal this edit belongs to
    proposal_id: str
    # File the edit targets
    file_path: str
    description: str
    # Unified diff
    diff: str
    # Pre-change content (for rollback)
    old_content: str = ""
    # Post-change content
    new_content: str = ""
    language: str = "python"
    # low / medium / high
    risk_level: str = "medium"
    test_results: dict[str, Any] = field(default_factory=dict)

73 

@dataclass
class EvolutionRun:
    """Record of a single evolution execution (one proposal, one attempt)."""

    run_id: str
    proposal: EvolutionProposal
    # Code changes generated for the proposal (empty if none were produced)
    changes: list[CodeChange] = field(default_factory=list)
    # Final outcome; stays SKIPPED until the pipeline decides otherwise
    result: ChangeResult = ChangeResult.SKIPPED
    # Wall-clock timestamps (time.time()) bracketing the run
    started_at: float = 0.0
    finished_at: float = 0.0
    # Details recorded when changes had to be undone
    rollback_info: dict[str, Any] = field(default_factory=dict)
    # A/B evaluation data captured before and after the change
    metrics_before: dict[str, Any] = field(default_factory=dict)
    metrics_after: dict[str, Any] = field(default_factory=dict)
    # Test-suite results for the run. AutoPilot assigns ``run.test_results``
    # on success; declaring it here makes the attribute exist on every run
    # and show up in repr()/asdict()/equality.
    test_results: dict[str, Any] = field(default_factory=dict)

86 

87 

88# ── Code Generator ────────────────────────────────────────────────── 

89 

class CodeGenerator:
    """Generate code changes from evolution proposals using an LLM.

    Takes a high-level proposal (e.g., 'add retry logic to API calls')
    and generates concrete unified diffs, one per target file. When no LLM
    client is configured, placeholder "skeleton" changes are produced instead.
    """

    SYSTEM_PROMPT = """You are an expert Python code generator for an agent framework.
Given an evolution proposal, generate precise, minimal code changes.
Output ONLY a unified diff format. No explanations, no markdown code blocks.
Focus on: correctness, backward compatibility, performance, readability."""

    def __init__(self, llm_client=None):
        # Client is used as ``await llm_client.complete(prompt, system=...)``;
        # None selects the skeleton fallback in generate().
        self._llm = llm_client

    async def generate(self, proposal: EvolutionProposal, codebase: dict[str, str]) -> list[CodeChange]:
        """Generate code changes for a proposal.

        Args:
            proposal: The evolution proposal to implement
            codebase: Dict of {file_path: file_content} for context

        Returns:
            List of CodeChange objects with unified diffs. A target file whose
            LLM response yields no diff text is omitted. If a diff cannot be
            applied, ``new_content`` equals ``old_content``.
        """
        if not self._llm:
            # Fallback: generate skeleton changes based on proposal type
            return self._skeleton_generate(proposal)

        changes: list[CodeChange] = []
        for target_file in proposal.target_files:
            content = codebase.get(target_file, "")
            prompt = self._build_prompt(proposal, target_file, content)

            response = await self._llm.complete(prompt, system=self.SYSTEM_PROMPT)
            diff = self._extract_diff(response)
            if not diff:
                continue

            changes.append(CodeChange(
                proposal_id=proposal.id,
                file_path=target_file,
                description=proposal.description,
                diff=diff,
                old_content=content,
                new_content=self._apply_diff(content, diff),
                risk_level=proposal.risk_level,
            ))

        return changes

    def _skeleton_generate(self, proposal: EvolutionProposal) -> list[CodeChange]:
        """Skeleton code generation for proposals (no LLM available).

        The placeholder diff starts with ``# SKELETON`` and carries no file
        content, so nothing is written when such a change is "applied".
        """
        return [
            CodeChange(
                proposal_id=proposal.id,
                file_path=target_file,
                description=proposal.description,
                diff=f"# SKELETON: {proposal.description}\n# File: {target_file}",
                risk_level=proposal.risk_level,
            )
            for target_file in proposal.target_files
        ]

    def _build_prompt(self, proposal: EvolutionProposal, target_file: str, content: str) -> str:
        """Build the user prompt; only the first 3000 characters of the file are included."""
        return f"""Proposal: {proposal.description}
Category: {proposal.category}
File: {target_file}
Priority: {proposal.priority}

Current file content:
```python
{content[:3000]}
```

Generate a unified diff to implement this change. Focus only on {target_file}."""

    def _extract_diff(self, response: str) -> str:
        """Extract unified diff text from an LLM response.

        Returns the response itself when it already starts with a diff header
        (ignoring leading whitespace), the body of a ```diff fenced block when
        one is present, otherwise the stripped response. An empty or missing
        response yields "".
        """
        if not response:
            return ""
        text = response.lstrip()
        if text.startswith(("---", "diff ")):
            return text
        fence = "```diff"
        if fence in text:
            start = text.index(fence) + len(fence)
            end = text.find("```", start)
            # An unterminated fence runs to the end of the response.
            return text[start:end if end != -1 else len(text)].strip()
        return text.strip()

    def _apply_diff(self, content: str, diff: str) -> str:
        """Apply a unified diff to ``content`` and return the patched text.

        Implemented in pure Python so the result does not depend on an
        external ``patch`` binary. Hunks are located by their removed/context
        lines; the line number in the ``@@`` header is only a hint, so diffs
        with slightly wrong line numbers still apply. If any hunk cannot be
        located, or the diff contains no hunks (including skeleton diffs),
        ``content`` is returned unchanged.
        """
        if diff.startswith("# SKELETON"):
            return content

        hunks = self._parse_hunks(diff)
        if not hunks:
            return content

        # Preserve the file's line-ending convention and trailing newline.
        eol = "\r\n" if "\r\n" in content else "\n"
        had_trailing_eol = content.endswith(eol)
        body = content[: -len(eol)] if had_trailing_eol else content
        lines = body.split(eol) if content else []

        patched: list[str] = []
        cursor = 0  # index into ``lines`` of the first line not yet emitted
        for start, old_block, new_block in hunks:
            position = self._locate(lines, old_block, start, cursor)
            if position is None:
                return content
            patched.extend(lines[cursor:position])
            patched.extend(new_block)
            cursor = position + len(old_block)
        patched.extend(lines[cursor:])

        result = eol.join(patched)
        if patched and (had_trailing_eol or not content):
            result += eol
        return result

    @staticmethod
    def _parse_hunks(diff: str) -> list[tuple[int, list[str], list[str]]]:
        """Split a unified diff into ``(old_start, old_lines, new_lines)`` hunks.

        Everything before the first ``@@`` header (file headers, prose, code
        fences) is ignored, and any line that is not a valid hunk line ends
        the current hunk.
        """
        hunks: list[tuple[int, list[str], list[str]]] = []
        old_block = None
        new_block = None
        pending_blanks = 0

        for raw in diff.rstrip("\r\n").split("\n"):
            line = raw.rstrip("\r")
            if line.startswith("@@"):
                old_block, new_block = [], []
                hunks.append((CodeGenerator._hunk_start(line), old_block, new_block))
                pending_blanks = 0
                continue
            if old_block is None:
                continue  # preamble before the first hunk
            if line == "":
                # Blank context lines often lose their leading space; only
                # count them if more hunk lines follow.
                pending_blanks += 1
                continue

            marker, text = line[0], line[1:]
            if marker == "\\":
                continue  # "\ No newline at end of file"
            if marker not in "+- ":
                # Next file header, closing fence or commentary: hunk is over.
                old_block = new_block = None
                pending_blanks = 0
                continue

            if pending_blanks:
                old_block.extend([""] * pending_blanks)
                new_block.extend([""] * pending_blanks)
                pending_blanks = 0
            if marker != "+":
                old_block.append(text)
            if marker != "-":
                new_block.append(text)

        return [hunk for hunk in hunks if hunk[1] or hunk[2]]

    @staticmethod
    def _hunk_start(header: str) -> int:
        """Return the 1-based old-file start line from an ``@@ -a,b +c,d @@`` header (0 if unparsable)."""
        try:
            old_range = header.split()[1]
            return int(old_range.lstrip("-").split(",")[0])
        except (IndexError, ValueError):
            return 0

    @staticmethod
    def _locate(lines: list[str], old_block: list[str], start: int, cursor: int) -> Optional[int]:
        """Find the index in ``lines`` where a hunk applies, or None.

        Only positions at or after ``cursor`` are considered so hunks cannot
        overlap. Among matching positions the one closest to the header's
        line number wins.
        """
        if not old_block:
            # Pure insertion: "-N,0" means "insert after line N".
            return min(max(start, cursor), len(lines))

        size = len(old_block)
        hinted = start - 1
        matches = [
            position
            for position in range(cursor, len(lines) - size + 1)
            if lines[position:position + size] == old_block
        ]
        if not matches:
            return None
        return min(matches, key=lambda position: abs(position - hinted))

194 

195 

196# ── Auto Tester ───────────────────────────────────────────────────── 

197 

class AutoTester:
    """Run the pytest suite to validate changes."""

    # Outcome words in pytest's summary line that count as executed tests.
    _EXECUTED_OUTCOMES = ("passed", "failed", "error", "errors")

    def __init__(self, test_dir: str = "", pytest_args: str = ""):
        # Defaults: ./tests, stop at first failure, short tracebacks, quiet.
        self._test_dir = Path(test_dir) if test_dir else Path("tests")
        self._pytest_args = pytest_args or "-x --tb=short -q"

    async def run_tests(self) -> dict[str, Any]:
        """Run the test suite.

        The blocking pytest subprocess runs in the default executor so the
        event loop stays responsive while tests execute.

        Returns:
            Dict with keys ``passed`` (bool, true only when pytest exits with
            code 0), ``total`` (tests reported in the summary), ``failures``
            (failed + errored tests, at least 1 when not passed), ``output``
            (last 1000 characters of output) and ``duration`` (seconds).
        """
        started = time.monotonic()
        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, self._invoke_pytest)
        except FileNotFoundError:
            # NOTE(review): this reports success although no test ran (missing
            # interpreter or working directory). Kept for compatibility;
            # consider failing closed.
            return {"passed": True, "total": 0, "failures": 0, "output": "pytest not installed",
                    "duration": round(time.monotonic() - started, 3)}
        except Exception as e:
            # Includes subprocess.TimeoutExpired: a hung suite counts as failure.
            return {"passed": False, "total": 0, "failures": 1, "output": str(e),
                    "duration": round(time.monotonic() - started, 3)}

        return self._summarize(
            result.returncode,
            result.stdout or "",
            result.stderr or "",
            round(time.monotonic() - started, 3),
        )

    def _invoke_pytest(self) -> subprocess.CompletedProcess:
        """Run pytest synchronously against the configured test directory."""
        import sys  # local import: only needed here

        # Resolve once so the path argument stays valid whatever ``cwd`` is
        # (a relative test_dir would otherwise be applied twice).
        test_dir = self._test_dir.resolve()
        return subprocess.run(
            [sys.executable or "python3", "-m", "pytest", str(test_dir), *self._pytest_args.split()],
            capture_output=True, text=True, timeout=120,
            cwd=str(test_dir.parent),
        )

    def _summarize(self, returncode: int, stdout: str, stderr: str, duration: float) -> dict[str, Any]:
        """Turn a finished pytest process into the result dict of run_tests()."""
        counts = self._summary_counts(stdout)
        # The exit code is the only reliable signal: "1 failed, 5 passed"
        # contains the word "passed" but is a failing run.
        passed = returncode == 0
        failures = 0
        if not passed:
            reported = counts.get("failed", 0) + counts.get("error", 0) + counts.get("errors", 0)
            failures = max(1, reported)
        return {
            "passed": passed,
            "total": self._parse_test_count(stdout),
            "failures": failures,
            # Collection/usage errors are reported on stderr only.
            "output": (stdout or stderr)[-1000:],
            "duration": duration,
        }

    def _parse_test_count(self, output: str) -> int:
        """Parse the number of executed tests (passed + failed + errors) from pytest output."""
        counts = self._summary_counts(output)
        return sum(counts.get(outcome, 0) for outcome in self._EXECUTED_OUTCOMES)

    def _summary_counts(self, output: str) -> dict[str, int]:
        """Return ``{outcome: count}`` from the last pytest summary line, or {}.

        Handles both the quiet form (``1 failed, 5 passed in 0.12s``) and the
        decorated form (``==== 5 passed in 0.12s ====``).
        """
        for line in reversed(output.splitlines()):
            tokens = line.replace(",", " ").split()
            counts = {
                outcome.lower(): int(number)
                for number, outcome in zip(tokens, tokens[1:])
                if number.isdecimal()
            }
            if any(outcome in counts for outcome in self._EXECUTED_OUTCOMES):
                return counts
        return {}

239 

240 

241# ── Rollback Manager ───────────────────────────────────────────────── 

242 

class RollbackManager:
    """Instant undo of any auto-applied change.

    Each snapshot is stored as ``<snapshot_id>.bak`` in the backup directory
    together with a ``<snapshot_id>.json`` metadata file, so a snapshot can
    still be rolled back by a new manager instance (e.g. after a restart).
    """

    def __init__(self, backup_dir: str = ""):
        self._backup_dir = Path(backup_dir) if backup_dir else Path.home() / ".agentos" / "evolution" / "backups"
        self._backup_dir.mkdir(parents=True, exist_ok=True)
        # Snapshots taken by this instance, in creation order.
        self._history: list[dict[str, Any]] = []

    def snapshot(self, file_path: str, content: str) -> str:
        """Create a backup snapshot of a file before modification.

        Args:
            file_path: Path of the file the content belongs to.
            content: Text to preserve.

        Returns:
            The 12-character snapshot id to pass to rollback().
        """
        timestamp = time.time()
        # The history length salts the hash so two snapshots of the same file
        # within one clock tick still get distinct ids.
        seed = f"{file_path}:{timestamp}:{len(self._history)}"
        snapshot_id = hashlib.sha256(seed.encode()).hexdigest()[:12]

        entry = {
            "snapshot_id": snapshot_id,
            "file_path": file_path,
            "timestamp": timestamp,
            "size": len(content),
        }
        (self._backup_dir / f"{snapshot_id}.bak").write_text(content, encoding="utf-8")
        self._meta_path(snapshot_id).write_text(json.dumps(entry, ensure_ascii=False), encoding="utf-8")
        self._history.append(entry)
        return snapshot_id

    def rollback(self, snapshot_id: str) -> bool:
        """Restore file from snapshot.

        Returns:
            True if the file was rewritten from the snapshot. False if the
            snapshot file or its target path is unknown.
        """
        snapshot_path = self._backup_dir / f"{snapshot_id}.bak"
        if not snapshot_path.exists():
            return False

        entry = next(
            (item for item in self._history if item["snapshot_id"] == snapshot_id),
            None,
        )
        if entry is None:
            # Not taken by this instance: fall back to the persisted metadata.
            entry = self._load_entry(snapshot_id)
        if entry is None:
            return False

        target = Path(entry["file_path"])
        target.write_text(snapshot_path.read_text(encoding="utf-8"), encoding="utf-8")
        return True

    def get_history(self, limit: int = 20) -> list[dict[str, Any]]:
        """Get the most recent snapshots taken by this instance, newest first."""
        return sorted(self._history, key=lambda x: x["timestamp"], reverse=True)[:limit]

    def _meta_path(self, snapshot_id: str) -> Path:
        """Location of the metadata file that records a snapshot's target path."""
        return self._backup_dir / f"{snapshot_id}.json"

    def _load_entry(self, snapshot_id: str) -> Optional[dict[str, Any]]:
        """Read persisted snapshot metadata; None if missing or unusable."""
        try:
            entry = json.loads(self._meta_path(snapshot_id).read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return None
        if isinstance(entry, dict) and isinstance(entry.get("file_path"), str):
            return entry
        return None

281 

282 

283# ── A/B Evaluator ─────────────────────────────────────────────────── 

284 

class ABEvaluator:
    """Compare agent performance before and after evolution changes.

    Each test case is a dict read with the keys ``id``, ``input`` and
    ``expected``; a case passes when ``expected`` occurs in the string form
    of what ``agent.run(input)`` returns.
    """

    def __init__(self, test_cases: list[dict[str, str]] | None = None):
        self._test_cases = test_cases or []
        self._results_before: list[dict] = []
        self._results_after: list[dict] = []

    async def evaluate_before(self, agent) -> list[dict]:
        """Run evaluation before changes and remember the results."""
        self._results_before = await self._run_eval_loop(agent)
        return self._results_before

    async def evaluate_after(self, agent) -> list[dict]:
        """Run evaluation after changes and remember the results."""
        self._results_after = await self._run_eval_loop(agent)
        return self._results_after

    def compare(self) -> dict[str, Any]:
        """Compare before/after results.

        Returns:
            ``{"status": "no_data"}`` unless both evaluations produced
            results. Otherwise a dict with ``before_pass_rate``,
            ``after_pass_rate``, ``improvement``, ``total_cases``,
            ``regressed_cases`` (ids of cases that passed before and fail
            now) and ``regressions`` (true when the pass count dropped or any
            single case regressed).
        """
        if not self._results_before or not self._results_after:
            return {"status": "no_data"}

        before_success = sum(1 for r in self._results_before if r.get("passed", False))
        after_success = sum(1 for r in self._results_after if r.get("passed", False))
        total = max(len(self._results_before), len(self._results_after))

        # Both result lists come from the same ordered test cases, so pair
        # them by position. An aggregate comparison alone hides a case that
        # broke while another one started passing.
        regressed_cases = [
            before.get("case", "")
            for before, after in zip(self._results_before, self._results_after)
            if before.get("passed", False) and not after.get("passed", False)
        ]

        return {
            "before_pass_rate": before_success / total if total else 0,
            "after_pass_rate": after_success / total if total else 0,
            "improvement": (after_success - before_success) / total if total else 0,
            "regressions": after_success < before_success or bool(regressed_cases),
            "regressed_cases": regressed_cases,
            "total_cases": total,
        }

    async def _run_eval_loop(self, agent) -> list[dict]:
        """Run every test case against the agent, one after another.

        An exception raised by the agent marks that case as failed and is
        recorded under ``error`` instead of aborting the evaluation.
        """
        results = []
        for case in self._test_cases:
            case_id = case.get("id", "")
            try:
                output = str(await agent.run(case.get("input", "")))
            except Exception as e:
                results.append({"case": case_id, "passed": False, "error": str(e)})
            else:
                passed = case.get("expected", "") in output
                # Keep stored output bounded.
                results.append({"case": case_id, "passed": passed, "output": output[:500]})
        return results

331 

332 

333# ── Evolution Journal ─────────────────────────────────────────────── 

334 

class EvolutionJournal:
    """Complete audit trail of every evolution step.

    Entries are stored as JSON Lines: one JSON object per line, appended in
    chronological order.
    """

    def __init__(self, journal_path: str = ""):
        self._path = Path(journal_path) if journal_path else Path.home() / ".agentos" / "evolution" / "journal.jsonl"
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def log(self, entry: dict[str, Any]):
        """Append an entry to the journal.

        A ``_timestamp`` (local time, ISO format) is added to the stored
        record; the caller's dict is left untouched.
        """
        record = {**entry, "_timestamp": datetime.now().isoformat()}
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def read(self, limit: int = 50) -> list[dict]:
        """Read the most recent journal entries, oldest first.

        Blank lines and lines that are not a JSON object (e.g. a partially
        written record) are skipped so one damaged line cannot make the whole
        journal unreadable. A non-positive ``limit`` yields an empty list.
        """
        from collections import deque  # local import: only needed here

        if limit <= 0 or not self._path.exists():
            return []

        # A bounded deque keeps only the tail while streaming the file.
        recent = deque(maxlen=limit)
        with open(self._path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(entry, dict):
                    recent.append(entry)
        return list(recent)

    def stats(self) -> dict[str, Any]:
        """Compute evolution statistics from the last 10000 journal entries.

        Returns:
            {} when the journal is empty, otherwise entry counts grouped by
            ``type`` and ``result`` (missing keys count as "unknown") plus
            the first and last timestamps seen.
        """
        entries = self.read(limit=10000)
        if not entries:
            return {}

        by_type: dict[str, int] = {}
        by_result: dict[str, int] = {}
        for entry in entries:
            kind = entry.get("type", "unknown")
            result = entry.get("result", "unknown")
            by_type[kind] = by_type.get(kind, 0) + 1
            by_result[result] = by_result.get(result, 0) + 1

        return {
            "total_entries": len(entries),
            "by_type": by_type,
            "by_result": by_result,
            "first_entry": entries[0].get("_timestamp", ""),
            "last_entry": entries[-1].get("_timestamp", ""),
        }

377 

378 

379# ── AutoPilot ─────────────────────────────────────────────────────── 

380 

class AutoPilot:
    """Closed-loop self-evolution engine.

    The AutoPilot orchestrates the entire evolution pipeline:
    signals → insights → proposals → code changes → tests → apply/rollback.

    Usage:
        from agentos.evolution import SignalCollector, EvolutionEngine, Learner, AutoPilot

        collector = SignalCollector()
        engine = EvolutionEngine()
        learner = Learner(collector, engine)
        autopilot = AutoPilot(engine, learner, mode=AutoPilotMode.CONFIDENCE_GATED)

        # After accumulating signals...
        runs = await autopilot.evolve()
        for run in runs:
            print(f"Evolved: {run.result} — {len(run.changes)} changes")
    """

    def __init__(
        self,
        engine: EvolutionEngine,
        learner: Learner,
        mode: AutoPilotMode = AutoPilotMode.CONFIDENCE_GATED,
        confidence_threshold: float = 0.7,
        code_generator: Optional[CodeGenerator] = None,
        tester: Optional[AutoTester] = None,
        rollback: Optional[RollbackManager] = None,
        evaluator: Optional[ABEvaluator] = None,
        journal: Optional[EvolutionJournal] = None,
    ):
        """Wire up the pipeline; any collaborator left as None gets a default instance."""
        self.engine = engine
        self.learner = learner
        self.mode = mode
        self.confidence_threshold = confidence_threshold

        self.codegen = code_generator or CodeGenerator()
        self.tester = tester or AutoTester()
        self.rollback = rollback or RollbackManager()
        self.evaluator = evaluator or ABEvaluator()
        self.journal = journal or EvolutionJournal()

        self._run_history: list[EvolutionRun] = []

    async def evolve(self, agent=None) -> list[EvolutionRun]:
        """Execute one complete evolution cycle.

        1. Analyze signals → generate insights
        2. Convert insights → proposals
        3. For each proposal: generate code → test → apply/rollback
        4. Journal everything

        Args:
            agent: Optional agent instance for A/B evaluation.

        Returns:
            List of EvolutionRun records for this cycle, one per proposal.
        """
        # Step 1: Analyze signals
        insights = self.learner.analyze()

        # Step 2: Generate proposals
        proposals = [self.learner.propose_from_insight(insight) for insight in insights]

        # Step 3: Gate by confidence and mode, then process sequentially so
        # each proposal is tested against a known file state.
        runs: list[EvolutionRun] = []
        for proposal in proposals:
            run = await self._process_proposal(proposal, agent)
            runs.append(run)
            self._run_history.append(run)

        # Step 4: Journal results
        def count(result: ChangeResult) -> int:
            return sum(1 for r in runs if r.result == result)

        self.journal.log({
            "type": "evolution_cycle",
            "insights": len(insights),
            "proposals": len(proposals),
            "applied": count(ChangeResult.SUCCESS),
            "failed": count(ChangeResult.FAILED),
            "rolled_back": count(ChangeResult.ROLLED_BACK),
            "regressions": count(ChangeResult.REGRESSION),
            "skipped": count(ChangeResult.SKIPPED),
        })

        return runs

    async def _process_proposal(self, proposal: EvolutionProposal, agent=None) -> EvolutionRun:
        """Process a single proposal through the pipeline.

        Outcomes:
            SKIPPED      the current mode does not allow applying it
            FAILED       no code changes could be generated
            ROLLED_BACK  writing a change failed; written files were restored
            REGRESSION   tests failed after applying; written files were restored
            SUCCESS      changes applied and tests passed
        """
        run = EvolutionRun(
            run_id=f"ev_{proposal.id}",
            proposal=proposal,
            started_at=time.time(),
        )

        # Check if we should proceed based on mode
        if not self._should_proceed(proposal):
            run.result = ChangeResult.SKIPPED
            run.finished_at = time.time()
            return run

        # Read each existing target once: the content feeds the code generator
        # and is snapshotted so the file can be restored. The snapshot ids
        # must be kept; they are the only handle rollback() accepts.
        codebase: dict[str, str] = {}
        snapshots: dict[str, str] = {}
        absent_before: set[str] = set()
        for target in proposal.target_files:
            if os.path.exists(target):
                content = Path(target).read_text(encoding="utf-8")
                codebase[target] = content
                snapshots[target] = self.rollback.snapshot(target, content)
            else:
                absent_before.add(target)
        run.rollback_info = {"snapshots": dict(snapshots)}

        # Evaluate before (if agent provided)
        if agent:
            before = await self.evaluator.evaluate_before(agent)
            passed_before = sum(1 for r in before if r.get("passed", False))
            run.metrics_before = {
                "passed": passed_before,
                "total_cases": len(before),
                "pass_rate": passed_before / len(before) if before else 0,
            }

        # Generate code changes
        changes = await self.codegen.generate(proposal, codebase)

        if not changes:
            run.result = ChangeResult.FAILED
            run.finished_at = time.time()
            return run

        run.changes = changes

        # Apply changes. ``attempted`` includes the change whose write failed,
        # since that file may have been left partially written.
        attempted: list[CodeChange] = []
        try:
            for change in changes:
                if change.new_content:
                    attempted.append(change)
                    Path(change.file_path).write_text(change.new_content, encoding="utf-8")
        except Exception as e:
            # Rollback and fail
            unrestored = self._undo_changes(attempted, snapshots, absent_before)
            run.rollback_info["error"] = str(e)
            if unrestored:
                run.rollback_info["unrestored"] = unrestored
            run.result = ChangeResult.ROLLED_BACK
            run.finished_at = time.time()
            return run

        # Run tests
        test_results = await self.tester.run_tests()
        for change in changes:
            change.test_results = test_results

        if not test_results.get("passed", False):
            # Regression detected — rollback
            unrestored = self._undo_changes(attempted, snapshots, absent_before)
            if unrestored:
                run.rollback_info["unrestored"] = unrestored
            run.result = ChangeResult.REGRESSION
            run.rollback_info["test_results"] = test_results
            run.finished_at = time.time()
            return run

        # Success!
        run.result = ChangeResult.SUCCESS
        run.test_results = test_results

        # Evaluate after (if agent provided)
        if agent:
            await self.evaluator.evaluate_after(agent)
            run.metrics_after = self.evaluator.compare()

        # Update proposal status
        proposal.status = EvolutionStatus.APPLIED

        run.finished_at = time.time()
        return run

    def _undo_changes(
        self,
        changes: list[CodeChange],
        snapshots: dict[str, str],
        absent_before: set[str],
    ) -> list[str]:
        """Restore every file touched by ``changes`` to its pre-run state.

        Files with a snapshot are restored from it, falling back to the
        change's ``old_content``. Target files that did not exist before the
        run are removed again.

        Returns:
            Paths that could not be restored.
        """
        unrestored: list[str] = []
        for change in changes:
            path = change.file_path
            snapshot_id = snapshots.get(path)
            try:
                if snapshot_id is not None and self.rollback.rollback(snapshot_id):
                    continue
                if path in absent_before:
                    if os.path.exists(path):
                        os.remove(path)
                    continue
                if change.old_content:
                    Path(path).write_text(change.old_content, encoding="utf-8")
                    continue
                if snapshot_id is not None:
                    # Snapshot was unusable and there is nothing to fall back on.
                    unrestored.append(path)
            except OSError:
                unrestored.append(path)
        return unrestored

    def _should_proceed(self, proposal: EvolutionProposal) -> bool:
        """Determine if we should proceed with this proposal based on mode."""
        if self.mode == AutoPilotMode.CONFIDENCE_GATED:
            return proposal.confidence >= self.confidence_threshold

        if self.mode == AutoPilotMode.FULL_AUTO:
            # Only safe changes in FULL_AUTO
            return proposal.risk_level in ("low", "medium")

        # SUGGEST_ONLY never applies; ASK_BEFORE needs user interaction that
        # the caller must handle; unknown modes are refused.
        return False

    def get_run_history(self, limit: int = 20) -> list[EvolutionRun]:
        """Get the most recent evolution runs, oldest first (empty for limit <= 0)."""
        if limit <= 0:
            # ``[-0:]`` would return the whole history.
            return []
        return self._run_history[-limit:]

    def get_stats(self) -> dict[str, Any]:
        """Get AutoPilot statistics for all runs recorded by this instance."""
        runs = self._run_history

        def count(result: ChangeResult) -> int:
            return sum(1 for r in runs if r.result == result)

        return {
            "total_runs": len(runs),
            "successful": count(ChangeResult.SUCCESS),
            "failed": count(ChangeResult.FAILED),
            "regressions": count(ChangeResult.REGRESSION),
            "rolled_back": count(ChangeResult.ROLLED_BACK),
            "total_changes": sum(len(r.changes) for r in runs),
            "journal_stats": self.journal.stats(),
            "rollback_history": len(self.rollback.get_history()),
        }