Coverage for agentos/evolution/autopilot.py: 30%
277 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Closed-Loop Self-Evolution v2 (v1.9.0)
4AutoPilot — from behavior signals to code changes, fully automated.
6Pipeline:
7 1. SignalCollector gathers user behavior (corrections, ratings, tool usage)
8 2. Learner detects patterns → generates EvolutionProposal
9 3. AutoPilot validates → generates code change → auto-tests → applies
10 4. Regression tests verify no breakage
11 5. Proposal archived with before/after metrics
13v2 New Features:
14 - CodeGenerator: LLM-based code diff generation from proposals
15 - AutoTester: Run regression suite before/after each change
16 - RollbackManager: Instant undo if regression detected
17 - Confidence Gating: Only auto-apply proposals above confidence threshold
18 - A/B Evaluator: Side-by-side before/after comparison
19 - EvolutionJournal: Full audit trail of every evolution step
20"""
22from __future__ import annotations
24import asyncio
25import hashlib
26import json
27import os
28import subprocess
29import tempfile
30import time
31from dataclasses import dataclass, field
32from datetime import datetime
33from enum import Enum
34from pathlib import Path
35from typing import Optional, Callable, Any
37from agentos.evolution.engine import EvolutionEngine, EvolutionProposal, EvolutionStatus
38from agentos.evolution.learner import Learner, LearningInsight
39from agentos.evolution.signals import SignalCollector, BehaviorSignal
42# ── Types ───────────────────────────────────────────────────────────
class AutoPilotMode(str, Enum):
    """How much autonomy AutoPilot has when handling a proposal."""

    # Proposals are generated but never applied.
    SUGGEST_ONLY = "suggest_only"
    # Proposals are generated; the user is asked before anything is applied.
    ASK_BEFORE = "ask_before"
    # Proposals are applied automatically once confidence clears the threshold.
    CONFIDENCE_GATED = "confidence"
    # Proposals are applied automatically (⚠️ use with guardrails).
    FULL_AUTO = "full_auto"
class ChangeResult(str, Enum):
    """Outcome recorded for a proposal that went through the pipeline."""

    SUCCESS = "success"          # change applied and the test run passed
    FAILED = "failed"            # no change could be produced
    REGRESSION = "regression"    # tests failed after the change was applied
    ROLLED_BACK = "rolled_back"  # applying the change itself failed
    SKIPPED = "skipped"          # proposal was not processed
@dataclass
class CodeChange:
    """One file-level modification derived from an evolution proposal.

    The first four fields are required; the rest default to empty or
    neutral values so a change can be created before it has been applied
    or tested.
    """

    # Identity and payload
    proposal_id: str
    file_path: str
    description: str
    diff: str  # unified diff text

    # File content on either side of the change
    old_content: str = ""  # content before the change (kept for rollback)
    new_content: str = ""  # content after the change

    # Classification
    language: str = "python"
    risk_level: str = "medium"  # one of: low / medium / high

    # Filled in by whoever runs tests against this change
    test_results: dict[str, Any] = field(default_factory=dict)
@dataclass
class EvolutionRun:
    """Record of a single evolution execution.

    One instance is created per processed proposal and filled in as the
    proposal moves through the pipeline.
    """

    run_id: str
    proposal: EvolutionProposal
    changes: list[CodeChange] = field(default_factory=list)
    result: ChangeResult = ChangeResult.SKIPPED
    started_at: float = 0.0   # epoch seconds
    finished_at: float = 0.0  # epoch seconds
    rollback_info: dict[str, Any] = field(default_factory=dict)
    metrics_before: dict[str, Any] = field(default_factory=dict)
    metrics_after: dict[str, Any] = field(default_factory=dict)
    # The pipeline stores the test-run result on the run after a successful
    # apply; declaring it here makes the attribute always present (and part
    # of repr/eq) instead of appearing only on successful runs. It is the
    # last field, so positional construction is unaffected.
    test_results: dict[str, Any] = field(default_factory=dict)
88# ── Code Generator ──────────────────────────────────────────────────
class CodeGenerator:
    """Generate code changes from evolution proposals using an LLM.

    Takes a high-level proposal (e.g., 'add retry logic to API calls')
    and turns it into one ``CodeChange`` per target file. Without an LLM
    client only placeholder ("skeleton") changes are produced.
    """

    SYSTEM_PROMPT = """You are an expert Python code generator for an agent framework.
Given an evolution proposal, generate precise, minimal code changes.
Output ONLY a unified diff format. No explanations, no markdown code blocks.
Focus on: correctness, backward compatibility, performance, readability."""

    def __init__(self, llm_client=None):
        # The client is expected to expose ``await complete(prompt, system=...)``
        # (that is the only call made on it in this class).
        self._llm = llm_client

    async def generate(self, proposal: EvolutionProposal, codebase: dict[str, str]) -> list[CodeChange]:
        """Generate code changes for a proposal.

        Args:
            proposal: The evolution proposal to implement.
            codebase: Dict of {file_path: file_content} for context. A target
                file missing from the dict is treated as empty.

        Returns:
            List of CodeChange objects, one per target file for which the
            LLM returned a non-empty diff (or skeleton changes when no LLM
            client is configured).
        """
        if not self._llm:
            # Fallback: generate skeleton changes based on proposal type
            return self._skeleton_generate(proposal)

        changes: list[CodeChange] = []
        for target_file in proposal.target_files:
            content = codebase.get(target_file, "")
            prompt = self._build_prompt(proposal, target_file, content)

            response = await self._llm.complete(prompt, system=self.SYSTEM_PROMPT)
            diff = self._extract_diff(response)
            if not diff:
                continue

            changes.append(CodeChange(
                proposal_id=proposal.id,
                file_path=target_file,
                description=proposal.description,
                diff=diff,
                old_content=content,
                # Equals ``content`` when the diff could not be applied.
                new_content=self._apply_diff(content, diff),
                risk_level=proposal.risk_level,
            ))

        return changes

    def _skeleton_generate(self, proposal: EvolutionProposal) -> list[CodeChange]:
        """Build placeholder changes (no LLM available).

        The resulting changes carry a ``# SKELETON`` marker as their diff and
        no old/new content.
        """
        return [
            CodeChange(
                proposal_id=proposal.id,
                file_path=target_file,
                description=proposal.description,
                diff=f"# SKELETON: {proposal.description}\n# File: {target_file}",
                risk_level=proposal.risk_level,
            )
            for target_file in proposal.target_files
        ]

    def _build_prompt(self, proposal: EvolutionProposal, target_file: str, content: str) -> str:
        """Render the user prompt; file content is truncated to 3000 characters."""
        return f"""Proposal: {proposal.description}
Category: {proposal.category}
File: {target_file}
Priority: {proposal.priority}

Current file content:
```python
{content[:3000]}
```

Generate a unified diff to implement this change. Focus only on {target_file}."""

    def _extract_diff(self, response: str) -> str:
        """Extract unified diff text from an LLM response.

        A response that already starts like a diff is returned untouched; the
        body of the first ```diff fence is used when present; otherwise the
        stripped response is returned as-is.
        """
        if response.startswith(("---", "diff ")):
            return response

        fence = "```diff"
        if fence in response:
            start = response.index(fence) + len(fence)
            end = response.find("```", start)
            if end == -1:  # unterminated fence: take everything after it
                end = len(response)
            return response[start:end].strip()

        return response.strip()

    def _apply_diff(self, content: str, diff: str) -> str:
        """Apply a unified diff to ``content`` using the external ``patch`` tool.

        Args:
            content: Current text of the file.
            diff: Unified diff, with or without ``---``/``+++`` file headers.

        Returns:
            The patched text, or ``content`` unchanged when the diff is a
            skeleton marker, ``patch`` is unavailable, or the patch fails.
        """
        if diff.startswith("# SKELETON"):
            return content

        # Extracted diffs are stripped, so restore the final newline patch
        # expects on the last hunk line.
        patch_text = diff if diff.endswith("\n") else diff + "\n"
        # Bare hunks need a file header; diffs that bring their own must not
        # get a second one.
        if patch_text.lstrip().startswith("@@"):
            patch_text = f"--- a/file\n+++ b/file\n{patch_text}"

        try:
            with tempfile.TemporaryDirectory() as workdir:
                original = Path(workdir) / "original"
                patched = Path(workdir) / "patched"
                # patch needs the original text on disk; naming the file
                # explicitly makes it ignore the paths in the diff headers.
                original.write_text(content, encoding="utf-8")
                result = subprocess.run(
                    ["patch", "--batch", "-o", str(patched), str(original)],
                    input=patch_text.encode("utf-8"),
                    capture_output=True,
                    timeout=10,
                )
                if result.returncode == 0 and patched.exists():
                    return patched.read_text(encoding="utf-8")
        except (OSError, subprocess.SubprocessError, UnicodeError):
            # Best effort: missing `patch` binary, timeout or undecodable
            # output all mean "leave the content unchanged".
            pass
        return content
196# ── Auto Tester ─────────────────────────────────────────────────────
class AutoTester:
    """Run the pytest suite in a subprocess to validate changes."""

    # Outcome words of the pytest summary line that count as executed tests.
    _COUNTED_OUTCOMES = ("passed", "failed", "error", "errors")

    def __init__(self, test_dir: str = "", pytest_args: str = ""):
        """
        Args:
            test_dir: Directory holding the tests; defaults to ``tests``.
            pytest_args: Whitespace-separated extra pytest arguments;
                defaults to ``-x --tb=short -q``.
        """
        self._test_dir = Path(test_dir) if test_dir else Path("tests")
        self._pytest_args = pytest_args or "-x --tb=short -q"

    async def run_tests(self) -> dict[str, Any]:
        """Run the test suite.

        Returns:
            Dict with ``passed`` (bool), ``total`` (tests counted in the
            summary line), ``failures`` (failed + errored tests, at least 1
            when the run did not pass), ``output`` (last 1000 characters of
            stdout, or an error message) and ``duration`` (seconds).
        """
        # Resolve once so the pytest argument stays valid after changing cwd
        # to the parent directory (a relative nested path would otherwise be
        # interpreted twice).
        test_path = self._test_dir.resolve()
        command = ["python3", "-m", "pytest", str(test_path), *self._pytest_args.split()]

        started = time.monotonic()
        try:
            result = subprocess.run(
                command,
                capture_output=True, text=True, timeout=120,
                cwd=str(test_path.parent),
            )
        except FileNotFoundError:
            # Raised when the interpreter (or the working directory) cannot
            # be found. Kept as a non-blocking "pass" for compatibility.
            return {"passed": True, "total": 0, "failures": 0, "output": "pytest not installed", "duration": 0}
        except Exception as e:
            # Includes subprocess.TimeoutExpired: report as a failed run.
            return {
                "passed": False, "total": 0, "failures": 1, "output": str(e),
                "duration": round(time.monotonic() - started, 3),
            }

        # Only the exit status is authoritative: the word "passed" also
        # appears in summaries such as "1 failed, 3 passed".
        passed = result.returncode == 0
        counts = self._summary_counts(result.stdout)
        failing = sum(counts.get(word, 0) for word in ("failed", "error", "errors"))
        return {
            "passed": passed,
            "total": sum(counts.values()),
            "failures": 0 if passed else max(failing, 1),
            "output": result.stdout[-1000:],
            "duration": round(time.monotonic() - started, 3),
        }

    @classmethod
    def _summary_counts(cls, output: str) -> dict[str, int]:
        """Return ``{outcome: count}`` from the last summary-looking line.

        A line qualifies when it contains an integer immediately followed by
        one of the counted outcome words (e.g. ``1 failed, 3 passed in 0.1s``,
        with or without ``====`` decoration).
        """
        for line in reversed(output.splitlines()):
            tokens = line.replace(",", " ").split()
            counts: dict[str, int] = {}
            for number, word in zip(tokens, tokens[1:]):
                outcome = word.lower()
                if number.isdigit() and outcome in cls._COUNTED_OUTCOMES:
                    counts[outcome] = int(number)
            if counts:
                return counts
        return {}

    def _parse_test_count(self, output: str) -> int:
        """Parse the number of executed tests (passed + failed + errors)."""
        return sum(self._summary_counts(output).values())
241# ── Rollback Manager ─────────────────────────────────────────────────
class RollbackManager:
    """Keep file snapshots on disk so an applied change can be undone."""

    def __init__(self, backup_dir: str = ""):
        if backup_dir:
            self._backup_dir = Path(backup_dir)
        else:
            self._backup_dir = Path.home() / ".agentos" / "evolution" / "backups"
        # The backup directory is created eagerly.
        self._backup_dir.mkdir(parents=True, exist_ok=True)
        # In-memory index of snapshots taken by this instance.
        self._history: list[dict[str, Any]] = []

    def snapshot(self, file_path: str, content: str) -> str:
        """Store ``content`` as a backup of ``file_path`` and return its id.

        The id is the first 12 hex characters of a SHA-256 over the path and
        the current time.
        """
        seed = f"{file_path}:{time.time()}"
        snapshot_id = hashlib.sha256(seed.encode()).hexdigest()[:12]

        backup_file = self._backup_dir / f"{snapshot_id}.bak"
        backup_file.write_text(content, encoding="utf-8")

        record = {
            "snapshot_id": snapshot_id,
            "file_path": file_path,
            "timestamp": time.time(),
            "size": len(content),
        }
        self._history.append(record)
        return snapshot_id

    def rollback(self, snapshot_id: str) -> bool:
        """Write a snapshot back to the file it was taken from.

        Returns:
            True when the file was restored; False when the backup file is
            missing or the id is not in this instance's history.
        """
        backup_file = self._backup_dir / f"{snapshot_id}.bak"
        if not backup_file.exists():
            return False

        record = next(
            (item for item in self._history if item["snapshot_id"] == snapshot_id),
            None,
        )
        if record is None:
            return False

        saved_text = backup_file.read_text(encoding="utf-8")
        Path(record["file_path"]).write_text(saved_text, encoding="utf-8")
        return True

    def get_history(self, limit: int = 20) -> list[dict[str, Any]]:
        """Return up to ``limit`` snapshot records, newest first."""
        newest_first = sorted(
            self._history,
            key=lambda record: record["timestamp"],
            reverse=True,
        )
        return newest_first[:limit]
283# ── A/B Evaluator ───────────────────────────────────────────────────
class ABEvaluator:
    """Run a fixed set of cases against an agent before and after a change."""

    def __init__(self, test_cases: list[dict[str, str]] | None = None):
        # Each case is read through the keys "id", "input" and "expected".
        self._test_cases = test_cases or []
        self._results_before: list[dict] = []
        self._results_after: list[dict] = []

    async def evaluate_before(self, agent) -> list[dict]:
        """Evaluate the agent and remember the results as the baseline."""
        baseline = await self._run_eval_loop(agent)
        self._results_before = baseline
        return baseline

    async def evaluate_after(self, agent) -> list[dict]:
        """Evaluate the agent and remember the results as the post-change run."""
        outcome = await self._run_eval_loop(agent)
        self._results_after = outcome
        return outcome

    def compare(self) -> dict[str, Any]:
        """Summarise stored before/after results.

        Returns ``{"status": "no_data"}`` unless both runs have results;
        otherwise pass rates, their difference, a regression flag and the
        case count.
        """
        before, after = self._results_before, self._results_after
        if not (before and after):
            return {"status": "no_data"}

        def count_passed(results: list[dict]) -> int:
            return sum(1 for item in results if item.get("passed", False))

        passed_before = count_passed(before)
        passed_after = count_passed(after)
        total = max(len(before), len(after))

        def per_case(amount: int):
            return amount / total if total else 0

        return {
            "before_pass_rate": per_case(passed_before),
            "after_pass_rate": per_case(passed_after),
            "improvement": per_case(passed_after - passed_before),
            "regressions": passed_after < passed_before,
            "total_cases": total,
        }

    async def _run_eval_loop(self, agent) -> list[dict]:
        """Run every case through ``agent.run`` and record pass/fail.

        A case passes when its "expected" text occurs in the stringified
        result; an exception marks the case as failed and stores the error.
        """
        collected: list[dict] = []
        for case in self._test_cases:
            try:
                answer = await agent.run(case.get("input", ""))
                text = str(answer)
                record = {
                    "case": case.get("id", ""),
                    "passed": case.get("expected", "") in text,
                    "output": text[:500],
                }
            except Exception as e:
                record = {"case": case.get("id", ""), "passed": False, "error": str(e)}
            collected.append(record)
        return collected
333# ── Evolution Journal ───────────────────────────────────────────────
class EvolutionJournal:
    """Append-only JSON-lines audit trail of evolution steps."""

    def __init__(self, journal_path: str = ""):
        """
        Args:
            journal_path: Journal file location; defaults to
                ``~/.agentos/evolution/journal.jsonl``. The parent directory
                is created if needed.
        """
        self._path = Path(journal_path) if journal_path else Path.home() / ".agentos" / "evolution" / "journal.jsonl"
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def log(self, entry: dict[str, Any]):
        """Append an entry to the journal, stamped with the current time.

        The caller's dict is left untouched; the ``_timestamp`` key is added
        to a copy.
        """
        record = dict(entry)
        record["_timestamp"] = datetime.now().isoformat()
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    def read(self, limit: int = 50) -> list[dict]:
        """Read the most recent journal entries.

        Args:
            limit: Maximum number of entries to return; values <= 0 yield an
                empty list.

        Returns:
            Up to ``limit`` entries in file order (oldest first). Blank lines,
            lines that are not valid JSON (e.g. a truncated final write) and
            JSON values that are not objects are skipped.
        """
        # Guard explicitly: ``entries[-0:]`` would return the whole list.
        if limit <= 0 or not self._path.exists():
            return []

        entries: list[dict] = []
        with open(self._path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # One corrupt line must not make the whole journal unreadable.
                    continue
                if isinstance(record, dict):
                    entries.append(record)
        return entries[-limit:]

    def stats(self) -> dict[str, Any]:
        """Compute evolution statistics from the last 10000 journal entries.

        Returns:
            Empty dict when there are no entries; otherwise entry count,
            counts per ``type`` and per ``result`` (missing keys are counted
            as ``"unknown"``) and the first/last timestamps.
        """
        entries = self.read(limit=10000)
        if not entries:
            return {}

        by_type: dict[str, int] = {}
        by_result: dict[str, int] = {}
        for entry in entries:
            kind = entry.get("type", "unknown")
            outcome = entry.get("result", "unknown")
            by_type[kind] = by_type.get(kind, 0) + 1
            by_result[outcome] = by_result.get(outcome, 0) + 1

        return {
            "total_entries": len(entries),
            "by_type": by_type,
            "by_result": by_result,
            "first_entry": entries[0].get("_timestamp", ""),
            "last_entry": entries[-1].get("_timestamp", ""),
        }
379# ── AutoPilot ───────────────────────────────────────────────────────
class AutoPilot:
    """Closed-loop self-evolution engine.

    The AutoPilot orchestrates the entire evolution pipeline:
    signals → insights → proposals → code changes → tests → apply/rollback.

    Usage:
        from agentos.evolution import SignalCollector, EvolutionEngine, Learner, AutoPilot

        collector = SignalCollector()
        engine = EvolutionEngine()
        learner = Learner(collector, engine)
        autopilot = AutoPilot(engine, learner, mode=AutoPilotMode.CONFIDENCE_GATED)

        # After accumulating signals...
        runs = await autopilot.evolve()
        for run in runs:
            print(f"Evolved: {run.result} — {len(run.changes)} changes")
    """

    def __init__(
        self,
        engine: EvolutionEngine,
        learner: Learner,
        mode: AutoPilotMode = AutoPilotMode.CONFIDENCE_GATED,
        confidence_threshold: float = 0.7,
        code_generator: Optional[CodeGenerator] = None,
        tester: Optional[AutoTester] = None,
        rollback: Optional[RollbackManager] = None,
        evaluator: Optional[ABEvaluator] = None,
        journal: Optional[EvolutionJournal] = None,
    ):
        """
        Args:
            engine: Evolution engine (stored; not otherwise used in this class).
            learner: Source of insights and proposals.
            mode: Gating policy applied to every proposal.
            confidence_threshold: Minimum proposal confidence in
                CONFIDENCE_GATED mode.
            code_generator, tester, rollback, evaluator, journal: Optional
                collaborators; a default-constructed instance is used for
                each one that is omitted.
        """
        self.engine = engine
        self.learner = learner
        self.mode = mode
        self.confidence_threshold = confidence_threshold

        self.codegen = code_generator or CodeGenerator()
        self.tester = tester or AutoTester()
        self.rollback = rollback or RollbackManager()
        self.evaluator = evaluator or ABEvaluator()
        self.journal = journal or EvolutionJournal()

        self._run_history: list[EvolutionRun] = []

    async def evolve(self, agent=None) -> list[EvolutionRun]:
        """Execute one complete evolution cycle.

        1. Analyze signals → generate insights
        2. Convert insights → proposals
        3. For each proposal: generate code → test → apply/rollback
        4. Journal a summary of the cycle

        Args:
            agent: Optional agent instance for A/B evaluation.

        Returns:
            List of EvolutionRun records for this cycle, one per proposal.
        """
        # Step 1: Analyze signals
        insights = self.learner.analyze()

        # Step 2: Generate proposals
        proposals = [self.learner.propose_from_insight(insight) for insight in insights]

        # Step 3: Gate by confidence and mode, then process
        runs: list[EvolutionRun] = []
        for proposal in proposals:
            run = await self._process_proposal(proposal, agent)
            runs.append(run)
            self._run_history.append(run)

        # Step 4: Journal results
        def tally(outcome: ChangeResult) -> int:
            return sum(1 for r in runs if r.result == outcome)

        self.journal.log({
            "type": "evolution_cycle",
            "insights": len(insights),
            "proposals": len(proposals),
            "applied": tally(ChangeResult.SUCCESS),
            "failed": tally(ChangeResult.FAILED),
            "rolled_back": tally(ChangeResult.ROLLED_BACK),
            "regressions": tally(ChangeResult.REGRESSION),
            "skipped": tally(ChangeResult.SKIPPED),
        })

        return runs

    async def _process_proposal(self, proposal: EvolutionProposal, agent=None) -> EvolutionRun:
        """Process a single proposal through the pipeline.

        Order: gate → snapshot targets → baseline evaluation (if an agent is
        given) → generate changes → write files → run tests → undo on
        failure. ``run.rollback_info`` records the snapshot ids taken, and on
        failure the error / test results plus any paths that could not be
        restored.
        """
        run = EvolutionRun(
            run_id=f"ev_{proposal.id}",
            proposal=proposal,
            started_at=time.time(),
        )

        # Check if we should proceed based on mode
        if not self._should_proceed(proposal):
            return self._finish(run, ChangeResult.SKIPPED)

        # Snapshot before, and collect current content as generator context.
        # pre_state maps path -> snapshot id, or None when the file is absent.
        pre_state: dict[str, Optional[str]] = {}
        codebase: dict[str, str] = {}
        for target in proposal.target_files:
            if os.path.exists(target):
                content = Path(target).read_text(encoding="utf-8")
                codebase[target] = content
                pre_state[target] = self.rollback.snapshot(target, content)
            else:
                pre_state[target] = None

        # Evaluate before (if agent provided). compare() is synchronous and
        # only meaningful after both evaluations, so run the baseline here.
        if agent is not None:
            baseline = await self.evaluator.evaluate_before(agent)
            run.metrics_before = self._pass_summary(baseline)

        # Generate code changes
        changes = await self.codegen.generate(proposal, codebase)
        if not changes:
            return self._finish(run, ChangeResult.FAILED)
        run.changes = changes

        # A generator may touch files outside proposal.target_files; record
        # their pre-change state too so they can be undone.
        for change in changes:
            if change.file_path not in pre_state:
                pre_state[change.file_path] = self._snapshot_file(change.file_path)
        run.rollback_info["snapshots"] = {
            path: snapshot_id for path, snapshot_id in pre_state.items() if snapshot_id
        }

        # Apply changes
        written: list[str] = []
        for change in changes:
            if not change.new_content:
                continue  # nothing to write (e.g. skeleton changes)
            # Track the path before writing: a failed write may already have
            # truncated the file.
            written.append(change.file_path)
            try:
                Path(change.file_path).write_text(change.new_content, encoding="utf-8")
            except Exception as exc:
                # Rollback and fail
                run.rollback_info["error"] = str(exc)
                run.rollback_info["unrestored"] = self._undo(written, pre_state)
                return self._finish(run, ChangeResult.ROLLED_BACK)

        # Run tests
        test_results = await self.tester.run_tests()

        if not test_results.get("passed", False):
            # Regression detected — rollback
            run.rollback_info["test_results"] = test_results
            run.rollback_info["unrestored"] = self._undo(written, pre_state)
            return self._finish(run, ChangeResult.REGRESSION)

        # Success!
        run.test_results = test_results
        for change in changes:
            change.test_results = test_results

        # Evaluate after (if agent provided)
        if agent is not None:
            outcome = await self.evaluator.evaluate_after(agent)
            run.metrics_after = {
                **self._pass_summary(outcome),
                "comparison": self.evaluator.compare(),
            }

        # Update proposal status
        proposal.status = EvolutionStatus.APPLIED

        return self._finish(run, ChangeResult.SUCCESS)

    @staticmethod
    def _finish(run: EvolutionRun, result: ChangeResult) -> EvolutionRun:
        """Stamp the final result and finish time on a run and return it."""
        run.result = result
        run.finished_at = time.time()
        return run

    @staticmethod
    def _pass_summary(results: list[dict]) -> dict[str, Any]:
        """Condense per-case evaluation results into pass counts and rate."""
        total = len(results)
        passed = sum(1 for item in results if item.get("passed", False))
        return {
            "passed": passed,
            "total": total,
            "pass_rate": passed / total if total else 0.0,
        }

    def _snapshot_file(self, path: str) -> Optional[str]:
        """Snapshot ``path`` if it exists; return the snapshot id, else None."""
        if not os.path.exists(path):
            return None
        return self.rollback.snapshot(path, Path(path).read_text(encoding="utf-8"))

    def _undo(self, paths: list[str], pre_state: dict[str, Optional[str]]) -> list[str]:
        """Return written files to their pre-change state.

        Files with a snapshot are restored from it; files that did not exist
        before the change are removed.

        Returns:
            Paths that could not be restored.
        """
        unrestored: list[str] = []
        for path in dict.fromkeys(paths):  # de-duplicate, keep order
            snapshot_id = pre_state.get(path)
            try:
                if snapshot_id:
                    if not self.rollback.rollback(snapshot_id):
                        unrestored.append(path)
                elif os.path.exists(path):
                    os.remove(path)
            except OSError:
                unrestored.append(path)
        return unrestored

    def _should_proceed(self, proposal: EvolutionProposal) -> bool:
        """Determine if we should proceed with this proposal based on mode."""
        if self.mode == AutoPilotMode.CONFIDENCE_GATED:
            return proposal.confidence >= self.confidence_threshold

        if self.mode == AutoPilotMode.FULL_AUTO:
            # Only safe changes in FULL_AUTO
            return proposal.risk_level in ("low", "medium")

        # SUGGEST_ONLY never applies; ASK_BEFORE requires user interaction,
        # which the caller must handle. Unknown modes are refused as well.
        return False

    def get_run_history(self, limit: int = 20) -> list[EvolutionRun]:
        """Get the most recent evolution runs (oldest first).

        ``limit`` <= 0 yields an empty list.
        """
        # Guard explicitly: ``history[-0:]`` would return the whole list.
        if limit <= 0:
            return []
        return self._run_history[-limit:]

    def get_stats(self) -> dict[str, Any]:
        """Get AutoPilot statistics over all runs recorded by this instance."""
        runs = self._run_history

        def tally(outcome: ChangeResult) -> int:
            return sum(1 for r in runs if r.result == outcome)

        return {
            "total_runs": len(runs),
            "successful": tally(ChangeResult.SUCCESS),
            "failed": tally(ChangeResult.FAILED),
            "regressions": tally(ChangeResult.REGRESSION),
            "rolled_back": tally(ChangeResult.ROLLED_BACK),
            "skipped": tally(ChangeResult.SKIPPED),
            "total_changes": sum(len(r.changes) for r in runs),
            "journal_stats": self.journal.stats(),
            "rollback_history": len(self.rollback.get_history()),
        }