Coverage for mcp_bridge/tools/agent_manager.py: 70%
363 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2Agent Manager for Stravinsky.
4Spawns background agents using Claude Code CLI with full tool access.
5This replaces the simple model-only invocation with true agentic execution.
6"""
8import json
9import logging
10import os
11import shutil
12import signal
13import subprocess
14import threading
15import time
16from dataclasses import asdict, dataclass
17from datetime import datetime
18from pathlib import Path
19from typing import Any
logger = logging.getLogger(__name__)

# --- Model routing -----------------------------------------------------------
# Value = the `--model` flag handed to the Claude CLI for that agent type.
# None means "pass no --model flag": those agents are instructed (via their
# system prompt) to call an external model through an MCP tool instead:
#   explore / dewey / document_writer / multimodal -> invoke_gemini(gemini-3-flash)
#   frontend                                       -> invoke_gemini(gemini-3-pro-high)
#   delphi                                         -> invoke_openai(gpt-5.2)
# Agent types that are not listed fall back to the "_default" entry.
AGENT_MODEL_ROUTING = {
    # Specialized agents and hierarchical orchestrators: no CLI model flag.
    **dict.fromkeys(
        (
            "explore",
            "dewey",
            "document_writer",
            "multimodal",
            "frontend",
            "delphi",
            "research-lead",  # orchestrator, synthesizes with gemini-3-flash
            "implementation-lead",  # orchestrator, uses haiku
        ),
        None,
    ),
    # Planning benefits from Opus-level reasoning about dependencies.
    "planner": "opus",
    # Anything else is treated as a coding task and runs on Sonnet 4.5.
    "_default": "sonnet",
}

# --- Cost tiers (pattern borrowed from oh-my-opencode) -----------------------
AGENT_COST_TIERS = {
    # gemini-3-flash backed
    "explore": "CHEAP",
    "dewey": "CHEAP",
    "document_writer": "CHEAP",
    "multimodal": "CHEAP",
    "research-lead": "CHEAP",
    # haiku backed
    "implementation-lead": "CHEAP",
    # gemini-3-pro-high backed
    "frontend": "MEDIUM",
    # gpt-5.2 backed
    "delphi": "EXPENSIVE",
    # Claude Opus 4.5
    "planner": "EXPENSIVE",
    # Claude Sonnet 4.5 via CLI
    "_default": "EXPENSIVE",
}

# --- User-visible model names, used only when formatting output --------------
AGENT_DISPLAY_MODELS = {
    "explore": "gemini-3-flash",
    "dewey": "gemini-3-flash",
    "document_writer": "gemini-3-flash",
    "multimodal": "gemini-3-flash",
    "research-lead": "gemini-3-flash",
    "implementation-lead": "haiku",
    "frontend": "gemini-3-pro-high",
    "delphi": "gpt-5.2",
    "planner": "opus-4.5",
    "_default": "sonnet-4.5",
}

# --- Emoji per cost tier ------------------------------------------------------
# Only three tiers exist here; the orange marker is a *model family* colour
# (see MODEL_FAMILY_EMOJI), not a cost tier.
COST_TIER_EMOJI = {
    "CHEAP": "🟢",  # free/cheap models (gemini-3-flash, haiku)
    "MEDIUM": "🔵",  # medium cost (gemini-3-pro-high)
    "EXPENSIVE": "🟣",  # expensive models (gpt-5.2, opus)
}

# --- Emoji per model family ---------------------------------------------------
MODEL_FAMILY_EMOJI = {
    "gemini-3-flash": "🟢",
    "gemini-3-pro-high": "🔵",
    "haiku": "🟢",
    "sonnet-4.5": "🟠",
    "opus-4.5": "🟣",
    "gpt-5.2": "🟣",
}
# ANSI escape sequences used to colorize terminal output
class Colors:
    """Namespace of ANSI SGR escape sequences (plain string constants)."""

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Standard foreground colors (SGR 30-37)
    BLACK = "\033[30m"
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
    WHITE = "\033[37m"

    # Bright foreground colors (SGR 90-97)
    BRIGHT_BLACK = "\033[90m"
    BRIGHT_RED = "\033[91m"
    BRIGHT_GREEN = "\033[92m"
    BRIGHT_YELLOW = "\033[93m"
    BRIGHT_BLUE = "\033[94m"
    BRIGHT_MAGENTA = "\033[95m"
    BRIGHT_CYAN = "\033[96m"
    BRIGHT_WHITE = "\033[97m"
def get_agent_emoji(agent_type: str) -> str:
    """Map an agent type to the emoji of its cost tier.

    Agent types missing from AGENT_COST_TIERS use the "_default" tier; a tier
    with no emoji entry yields the neutral "⚪" marker.
    """
    fallback_tier = AGENT_COST_TIERS["_default"]
    tier = AGENT_COST_TIERS.get(agent_type, fallback_tier)
    return COST_TIER_EMOJI.get(tier, "⚪")
def get_model_emoji(model_name: str) -> str:
    """Look up the model-family emoji for ``model_name`` ("⚪" when unknown)."""
    try:
        return MODEL_FAMILY_EMOJI[model_name]
    except KeyError:
        return "⚪"
def colorize_agent_spawn_message(
    cost_emoji: str,
    agent_type: str,
    display_model: str,
    description: str,
    task_id: str,
) -> str:
    """
    Build the two-line, ANSI-colored announcement shown when an agent is spawned.

    Line 1: ``<emoji> <agent_type>:<display_model>('<description>') ⏳`` with the
    agent type in cyan, the model in yellow, the description in bold and the
    hourglass in bright green.
    Line 2: ``task_id=<task_id>`` with the id in bright black.

    The description is cut to its first 50 characters and then stripped; a
    falsy description is rendered as an empty string.
    """
    summary = description or ""
    summary = summary[:50].strip()

    first_line = "".join(
        (
            f"{cost_emoji} ",
            f"{Colors.CYAN}{agent_type}{Colors.RESET}:",
            f"{Colors.YELLOW}{display_model}{Colors.RESET}",
            f"('{Colors.BOLD}{summary}{Colors.RESET}') ",
            f"{Colors.BRIGHT_GREEN}⏳{Colors.RESET}",
        )
    )
    second_line = f"task_id={Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}"
    return first_line + "\n" + second_line
162@dataclass
163class AgentTask:
164 """Represents a background agent task with full tool access."""
166 id: str
167 prompt: str
168 agent_type: str # explore, dewey, frontend, delphi, etc.
169 description: str
170 status: str # pending, running, completed, failed, cancelled
171 created_at: str
172 parent_session_id: str | None = None
173 started_at: str | None = None
174 completed_at: str | None = None
175 result: str | None = None
176 error: str | None = None
177 pid: int | None = None
178 timeout: int = 300 # Default 5 minutes
179 progress: dict[str, Any] | None = None # tool calls, last update
182@dataclass
183class AgentProgress:
184 """Progress tracking for a running agent."""
186 tool_calls: int = 0
187 last_tool: str | None = None
188 last_message: str | None = None
189 last_update: str | None = None
192class AgentManager:
193 """
194 Manages background agent execution using Claude Code CLI.
196 Key features:
197 - Spawns agents with full tool access via `claude -p`
198 - Tracks task status and progress
199 - Persists state to .stravinsky/agents.json
200 - Provides notification mechanism for task completion
201 """
203 # Dynamic CLI path - find claude in PATH, fallback to common locations
204 CLAUDE_CLI = shutil.which("claude") or "/opt/homebrew/bin/claude"
206 def __init__(self, base_dir: str | None = None):
207 # Initialize lock FIRST - used by _save_tasks and _load_tasks
208 self._lock = threading.RLock()
210 if base_dir:
211 self.base_dir = Path(base_dir)
212 else:
213 self.base_dir = Path.cwd() / ".stravinsky"
215 self.agents_dir = self.base_dir / "agents"
216 self.state_file = self.base_dir / "agents.json"
218 self.base_dir.mkdir(parents=True, exist_ok=True)
219 self.agents_dir.mkdir(parents=True, exist_ok=True)
221 if not self.state_file.exists():
222 self._save_tasks({})
224 # In-memory tracking for running processes
225 self._processes: dict[str, subprocess.Popen] = {}
226 self._notification_queue: dict[str, list[dict[str, Any]]] = {}
228 def _load_tasks(self) -> dict[str, Any]:
229 """Load tasks from persistent storage."""
230 with self._lock:
231 try:
232 if not self.state_file.exists():
233 return {}
234 with open(self.state_file) as f:
235 return json.load(f)
236 except (json.JSONDecodeError, FileNotFoundError):
237 return {}
239 def _save_tasks(self, tasks: dict[str, Any]):
240 """Save tasks to persistent storage."""
241 with self._lock, open(self.state_file, "w") as f:
242 json.dump(tasks, f, indent=2)
244 def _update_task(self, task_id: str, **kwargs):
245 """Update a task's fields."""
246 with self._lock:
247 tasks = self._load_tasks()
248 if task_id in tasks:
249 tasks[task_id].update(kwargs)
250 self._save_tasks(tasks)
252 def get_task(self, task_id: str) -> dict[str, Any] | None:
253 """Get a task by ID."""
254 tasks = self._load_tasks()
255 return tasks.get(task_id)
257 def list_tasks(self, parent_session_id: str | None = None) -> list[dict[str, Any]]:
258 """List all tasks, optionally filtered by parent session."""
259 tasks = self._load_tasks()
260 task_list = list(tasks.values())
262 if parent_session_id:
263 task_list = [t for t in task_list if t.get("parent_session_id") == parent_session_id]
265 return task_list
267 def spawn(
268 self,
269 token_store: Any,
270 prompt: str,
271 agent_type: str = "explore",
272 description: str = "",
273 parent_session_id: str | None = None,
274 system_prompt: str | None = None,
275 model: str = "gemini-3-flash",
276 thinking_budget: int = 0,
277 timeout: int = 300,
278 ) -> str:
279 """
280 Spawn a new background agent.
282 Args:
283 prompt: The task prompt for the agent
284 agent_type: Type of agent (explore, dewey, frontend, delphi)
285 description: Short description for status display
286 parent_session_id: Optional parent session for notifications
287 system_prompt: Optional custom system prompt
288 model: Model to use (gemini-3-flash, claude, etc.)
289 timeout: Maximum execution time in seconds
291 Returns:
292 Task ID for tracking
293 """
294 import uuid as uuid_module # Local import for MCP context
296 task_id = f"agent_{uuid_module.uuid4().hex[:8]}"
298 task = AgentTask(
299 id=task_id,
300 prompt=prompt,
301 agent_type=agent_type,
302 description=description or prompt[:50],
303 status="pending",
304 created_at=datetime.now().isoformat(),
305 parent_session_id=parent_session_id,
306 timeout=timeout,
307 )
309 # Persist task
310 with self._lock:
311 tasks = self._load_tasks()
312 tasks[task_id] = asdict(task)
313 self._save_tasks(tasks)
315 # Start background execution
316 self._execute_agent(
317 task_id, token_store, prompt, agent_type, system_prompt, model, thinking_budget, timeout
318 )
320 return task_id
322 def _execute_agent(
323 self,
324 task_id: str,
325 token_store: Any,
326 prompt: str,
327 agent_type: str,
328 system_prompt: str | None = None,
329 model: str = "gemini-3-flash",
330 thinking_budget: int = 0,
331 timeout: int = 300,
332 ):
333 """Execute agent using Claude CLI with full tool access.
335 Uses `claude -p` to spawn a background agent with complete tool access,
336 just like oh-my-opencode's Sisyphus implementation.
337 """
339 def run_agent():
340 log_file = self.agents_dir / f"{task_id}.log"
341 output_file = self.agents_dir / f"{task_id}.out"
343 self._update_task(task_id, status="running", started_at=datetime.now().isoformat())
345 try:
346 # Prepare full prompt with system prompt if provided
347 full_prompt = prompt
348 if system_prompt:
349 full_prompt = f"{system_prompt}\n\n---\n\n{prompt}"
351 logger.info(f"[AgentManager] Spawning Claude CLI agent {task_id} ({agent_type})")
353 # Build Claude CLI command with full tool access
354 # Using `claude -p` for non-interactive mode with prompt
355 cmd = [
356 self.CLAUDE_CLI,
357 "-p",
358 full_prompt,
359 "--output-format",
360 "text",
361 "--dangerously-skip-permissions", # Critical: bypass permission prompts
362 ]
364 # Model routing:
365 # - Specialized agents (explore/dewey/etc): None = use CLI default, they call invoke_*
366 # - Unknown agent types (coding tasks): Use Sonnet 4.5
367 if agent_type in AGENT_MODEL_ROUTING:
368 cli_model = AGENT_MODEL_ROUTING[agent_type] # None for specialized
369 else:
370 cli_model = AGENT_MODEL_ROUTING.get("_default", "sonnet")
372 if cli_model:
373 cmd.extend(["--model", cli_model])
374 logger.info(f"[AgentManager] Using --model {cli_model} for {agent_type} agent")
376 # Add system prompt file if we have one
377 if system_prompt:
378 system_file = self.agents_dir / f"{task_id}.system"
379 system_file.write_text(system_prompt)
380 cmd.extend(["--system-prompt", str(system_file)])
382 # Execute Claude CLI as subprocess with full tool access
383 logger.info(f"[AgentManager] Running: {' '.join(cmd[:3])}...")
385 # Use PIPE for stderr to capture it properly
386 # (Previously used file handle which was closed before process finished)
387 process = subprocess.Popen(
388 cmd,
389 stdin=subprocess.DEVNULL, # Critical: prevent stdin blocking
390 stdout=subprocess.PIPE,
391 stderr=subprocess.PIPE,
392 text=True,
393 cwd=str(Path.cwd()),
394 env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "stravinsky-agent"},
395 start_new_session=True, # Allow process group management
396 )
398 # Track the process
399 self._processes[task_id] = process
400 self._update_task(task_id, pid=process.pid)
402 # Wait for completion with timeout
403 try:
404 stdout, stderr = process.communicate(timeout=timeout)
405 result = stdout.strip() if stdout else ""
407 # Write stderr to log file
408 if stderr:
409 log_file.write_text(stderr)
411 if process.returncode == 0:
412 output_file.write_text(result)
413 self._update_task(
414 task_id,
415 status="completed",
416 result=result,
417 completed_at=datetime.now().isoformat(),
418 )
419 logger.info(f"[AgentManager] Agent {task_id} completed successfully")
420 else:
421 error_msg = f"Claude CLI exited with code {process.returncode}"
422 if stderr:
423 error_msg += f"\n{stderr}"
424 self._update_task(
425 task_id,
426 status="failed",
427 error=error_msg,
428 completed_at=datetime.now().isoformat(),
429 )
430 logger.error(f"[AgentManager] Agent {task_id} failed: {error_msg}")
432 except subprocess.TimeoutExpired:
433 process.kill()
434 self._update_task(
435 task_id,
436 status="failed",
437 error=f"Agent timed out after {timeout}s",
438 completed_at=datetime.now().isoformat(),
439 )
440 logger.warning(f"[AgentManager] Agent {task_id} timed out")
442 except FileNotFoundError:
443 error_msg = f"Claude CLI not found at {self.CLAUDE_CLI}. Install with: npm install -g @anthropic-ai/claude-code"
444 log_file.write_text(error_msg)
445 self._update_task(
446 task_id,
447 status="failed",
448 error=error_msg,
449 completed_at=datetime.now().isoformat(),
450 )
451 logger.error(f"[AgentManager] {error_msg}")
453 except Exception as e:
454 error_msg = str(e)
455 log_file.write_text(error_msg)
456 self._update_task(
457 task_id,
458 status="failed",
459 error=error_msg,
460 completed_at=datetime.now().isoformat(),
461 )
462 logger.exception(f"[AgentManager] Agent {task_id} exception")
464 finally:
465 self._processes.pop(task_id, None)
466 self._notify_completion(task_id)
468 # Run in background thread
469 thread = threading.Thread(target=run_agent, daemon=True)
470 thread.start()
472 def _notify_completion(self, task_id: str):
473 """Queue notification for parent session."""
474 task = self.get_task(task_id)
475 if not task:
476 return
478 parent_id = task.get("parent_session_id")
479 if parent_id:
480 if parent_id not in self._notification_queue:
481 self._notification_queue[parent_id] = []
483 self._notification_queue[parent_id].append(task)
484 logger.info(f"[AgentManager] Queued notification for {parent_id}: task {task_id}")
486 def get_pending_notifications(self, session_id: str) -> list[dict[str, Any]]:
487 """Get and clear pending notifications for a session."""
488 notifications = self._notification_queue.pop(session_id, [])
489 return notifications
491 def cancel(self, task_id: str) -> bool:
492 """Cancel a running agent task."""
493 task = self.get_task(task_id)
494 if not task:
495 return False
497 if task["status"] != "running":
498 return False
500 process = self._processes.get(task_id)
501 if process:
502 try:
503 os.killpg(os.getpgid(process.pid), signal.SIGTERM)
504 process.wait(timeout=5)
505 except Exception as e:
506 logger.warning(f"[AgentManager] Failed to kill process for {task_id}: {e}")
507 try:
508 process.kill()
509 except:
510 pass
512 self._update_task(task_id, status="cancelled", completed_at=datetime.now().isoformat())
514 return True
516 def stop_all(self, clear_history: bool = False) -> int:
517 """
518 Stop all running agents and optionally clear task history.
520 Args:
521 clear_history: If True, also remove completed/failed tasks from history
523 Returns:
524 Number of tasks stopped/cleared
525 """
526 tasks = self._load_tasks()
527 stopped_count = 0
529 # Stop running tasks
530 for task_id, task in list(tasks.items()):
531 if task.get("status") == "running":
532 self.cancel(task_id)
533 stopped_count += 1
535 # Optionally clear history
536 if clear_history:
537 cleared = len(tasks)
538 self._save_tasks({})
539 self._processes.clear()
540 logger.info(f"[AgentManager] Cleared all {cleared} agent tasks")
541 return cleared
543 return stopped_count
545 def get_output(self, task_id: str, block: bool = False, timeout: float = 30.0) -> str:
546 """
547 Get output from an agent task.
549 Args:
550 task_id: The task ID
551 block: If True, wait for completion
552 timeout: Max seconds to wait if blocking
554 Returns:
555 Formatted task output/status
556 """
557 task = self.get_task(task_id)
558 if not task:
559 return f"Task {task_id} not found."
561 if block and task["status"] == "running":
562 # Poll for completion
563 start = datetime.now()
564 while (datetime.now() - start).total_seconds() < timeout:
565 task = self.get_task(task_id)
566 if not task or task["status"] != "running":
567 break
568 time.sleep(0.5)
570 # Refresh task state after potential blocking wait
571 if not task:
572 return f"Task {task_id} not found."
574 status = task["status"]
575 description = task.get("description", "")
576 agent_type = task.get("agent_type", "unknown")
578 # Get cost-tier emoji for visual differentiation
579 cost_emoji = get_agent_emoji(agent_type)
580 display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"])
582 if status == "completed":
583 result = task.get("result", "(no output)")
584 return f"""{cost_emoji} {Colors.BRIGHT_GREEN}✅ Agent Task Completed{Colors.RESET}
586**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}
587**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}')
589**Result**:
590{result}"""
592 elif status == "failed":
593 error = task.get("error", "(no error details)")
594 return f"""{cost_emoji} {Colors.BRIGHT_RED}❌ Agent Task Failed{Colors.RESET}
596**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}
597**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}')
599**Error**:
600{error}"""
602 elif status == "cancelled":
603 return f"""{cost_emoji} {Colors.BRIGHT_YELLOW}⚠️ Agent Task Cancelled{Colors.RESET}
605**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}
606**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}')"""
608 else: # pending or running
609 pid = task.get("pid", "N/A")
610 started = task.get("started_at", "N/A")
611 return f"""{cost_emoji} {Colors.BRIGHT_YELLOW}⏳ Agent Task Running{Colors.RESET}
613**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}
614**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}')
615**PID**: {Colors.DIM}{pid}{Colors.RESET}
616**Started**: {Colors.DIM}{started}{Colors.RESET}
618Use `agent_output` with block=true to wait for completion."""
620 def get_progress(self, task_id: str, lines: int = 20) -> str:
621 """
622 Get real-time progress from a running agent's output.
624 Args:
625 task_id: The task ID
626 lines: Number of lines to show from the end
628 Returns:
629 Recent output lines and status
630 """
631 task = self.get_task(task_id)
632 if not task:
633 return f"Task {task_id} not found."
635 output_file = self.agents_dir / f"{task_id}.out"
636 log_file = self.agents_dir / f"{task_id}.log"
638 status = task["status"]
639 description = task.get("description", "")
640 agent_type = task.get("agent_type", "unknown")
641 pid = task.get("pid")
643 # Zombie Detection: If running but process is gone
644 if status == "running" and pid:
645 try:
646 import psutil
648 if not psutil.pid_exists(pid):
649 status = "failed"
650 self._update_task(
651 task_id,
652 status="failed",
653 error="Agent process died unexpectedly (Zombie detected)",
654 completed_at=datetime.now().isoformat(),
655 )
656 logger.warning(f"[AgentManager] Zombie agent detected: {task_id}")
657 except ImportError:
658 pass
660 # Read recent output
661 output_content = ""
662 if output_file.exists():
663 try:
664 full_content = output_file.read_text()
665 if full_content:
666 output_lines = full_content.strip().split("\n")
667 recent = output_lines[-lines:] if len(output_lines) > lines else output_lines
668 output_content = "\n".join(recent)
669 except Exception:
670 pass
672 # Check log for errors
673 log_content = ""
674 if log_file.exists():
675 try:
676 log_content = log_file.read_text().strip()
677 except Exception:
678 pass
680 # Status emoji
681 status_emoji = {
682 "pending": "⏳",
683 "running": "🔄",
684 "completed": "✅",
685 "failed": "❌",
686 "cancelled": "⚠️",
687 }.get(status, "❓")
689 # Get cost-tier emoji for visual differentiation
690 cost_emoji = get_agent_emoji(agent_type)
691 display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"])
693 result = f"""{cost_emoji} {status_emoji} **Agent Progress**
695**Task ID**: {task_id}
696**Agent**: {agent_type}:{display_model}('{description}')
697**Status**: {status}
698"""
700 if output_content:
701 result += f"\n**Recent Output** (last {lines} lines):\n```\n{output_content}\n```"
702 elif status == "running":
703 result += "\n*Agent is working... no output yet.*"
705 if log_content and status == "failed":
706 # Truncate log if too long
707 if len(log_content) > 500:
708 log_content = log_content[:500] + "..."
709 result += f"\n\n**Error Log**:\n```\n{log_content}\n```"
711 return result
# Process-wide singleton and the lock guarding its creation
_manager: AgentManager | None = None
_manager_lock = threading.Lock()


def get_manager() -> AgentManager:
    """Return the shared AgentManager, constructing it on first use."""
    global _manager
    # Fast path: already built, no locking needed.
    if _manager is not None:
        return _manager
    with _manager_lock:
        # Re-check under the lock: another thread may have built it meanwhile.
        if _manager is None:
            _manager = AgentManager()
    return _manager
730# Tool interface functions
async def agent_spawn(
    prompt: str,
    agent_type: str = "explore",
    description: str = "",
    model: str = "gemini-3-flash",
    thinking_budget: int = 0,
    timeout: int = 300,
    blocking: bool = False,
) -> str:
    """
    Spawn a background agent.

    Args:
        prompt: The task for the agent to perform
        agent_type: Type of agent (explore, dewey, frontend, delphi)
        description: Short description shown in status
        model: Model to use (gemini-3-flash, gemini-2.0-flash, claude)
        thinking_budget: Reserved reasoning tokens
        timeout: Execution timeout in seconds
        blocking: If True, wait for completion and return result directly (use for delphi)

    Returns:
        Task ID and instructions, or full result if blocking=True
    """
    manager = get_manager()

    # Map agent types to system prompts.
    # Specialized agents are told to delegate to an external model:
    #   explore/dewey/document_writer/multimodal -> gemini-3-flash
    #   frontend -> gemini-3-pro-high
    #   delphi -> openai gpt-5.2
    # planner and the two *-lead orchestrators get role prompts instead.
    system_prompts = {
        "explore": """You are a codebase exploration specialist. Find files, patterns, and answer 'where is X?' questions.

MODEL ROUTING (MANDATORY):
You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL analysis and reasoning.
The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file.

WORKFLOW:
1. Call invoke_gemini_agentic(prompt="<task description>", model="gemini-3-flash", max_turns=5, agent_context={"agent_type": "explore"})
2. The agentic model will autonomously explore the codebase using available tools
3. Return the Gemini response with findings

RECOMMENDED: max_turns=5 for thorough exploration""",
        "dewey": """You are a documentation and research specialist. Find implementation examples and official docs.

MODEL ROUTING (MANDATORY):
You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL analysis, summarization, and reasoning.
The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file.

WORKFLOW:
1. Call invoke_gemini_agentic(prompt="<task description>", model="gemini-3-flash", max_turns=5, agent_context={"agent_type": "dewey"})
2. The agentic model will autonomously research and gather information using available tools
3. Return the Gemini response with findings

RECOMMENDED: max_turns=5 for comprehensive research""",
        "frontend": """You are a Senior Frontend Architect & UI Designer.

MODEL ROUTING (MANDATORY):
You MUST use invoke_gemini_agentic with model="gemini-3-pro-high" for ALL code generation and design work.
The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file.

DESIGN PHILOSOPHY:
- Anti-Generic: Reject standard layouts. Bespoke, asymmetric, distinctive.
- Library Discipline: Use existing UI libraries (Shadcn, Radix, MUI) if detected.
- Stack: React/Vue/Svelte, Tailwind/Custom CSS, semantic HTML5.

WORKFLOW:
1. Call invoke_gemini_agentic(prompt="Generate frontend code for: <task>", model="gemini-3-pro-high", max_turns=3, agent_context={"agent_type": "frontend"})
2. The agentic model will autonomously analyze the codebase and generate code using available tools
3. Return the generated code

RECOMMENDED: max_turns=3 for focused code generation""",
        "delphi": """You are a strategic technical advisor for architecture and hard debugging.

MODEL ROUTING (MANDATORY):
You MUST use invoke_openai with model="gpt-5.2" for ALL strategic advice and analysis.

WORKFLOW:
1. Gather context about the problem
2. Call invoke_openai(prompt="<problem description>", model="gpt-5.2", agent_context={"agent_type": "delphi"})
3. Return the GPT response""",
        "document_writer": """You are a Technical Documentation Specialist.

MODEL ROUTING (MANDATORY):
You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL documentation generation.
The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file.

DOCUMENT TYPES: README, API docs, ADRs, user guides, inline docs.

WORKFLOW:
1. Call invoke_gemini_agentic(prompt="Write documentation for: <topic>", model="gemini-3-flash", max_turns=3, agent_context={"agent_type": "document_writer"})
2. The agentic model will autonomously gather context and generate documentation using available tools
3. Return the documentation

RECOMMENDED: max_turns=3 for focused documentation generation""",
        "multimodal": """You interpret media files (PDFs, images, diagrams, screenshots).

MODEL ROUTING (MANDATORY):
You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL visual analysis.
The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file.

WORKFLOW:
1. Call invoke_gemini_agentic(prompt="Analyze this file: <path>. Extract: <goal>", model="gemini-3-flash", max_turns=3, agent_context={"agent_type": "multimodal"})
2. The agentic model will autonomously access and analyze the file using available tools
3. Return extracted information only

RECOMMENDED: max_turns=3 for focused visual analysis""",
        "planner": """You are a pre-implementation planning specialist. You analyze requests and produce structured implementation plans BEFORE any code changes begin.

PURPOSE:
- Analyze requests and produce actionable implementation plans
- Identify dependencies and parallelization opportunities
- Enable efficient parallel execution by the orchestrator
- Prevent wasted effort through upfront planning

METHODOLOGY:
1. EXPLORE FIRST: Spawn explore agents IN PARALLEL to understand the codebase
2. DECOMPOSE: Break request into atomic, single-purpose tasks
3. ANALYZE DEPENDENCIES: What blocks what? What can run in parallel?
4. ASSIGN AGENTS: Map each task to the right specialist (explore/dewey/frontend/delphi)
5. OUTPUT STRUCTURED PLAN: Use the required format below

REQUIRED OUTPUT FORMAT:
```
## PLAN: [Brief title]

### ANALYSIS
- **Request**: [One sentence summary]
- **Scope**: [What's in/out of scope]
- **Risk Level**: [Low/Medium/High]

### EXECUTION PHASES

#### Phase 1: [Name] (PARALLEL)
| Task | Agent | Files | Est |
|------|-------|-------|-----|
| [description] | explore | file.py | S/M/L |

#### Phase 2: [Name] (SEQUENTIAL after Phase 1)
| Task | Agent | Files | Est |
|------|-------|-------|-----|

### AGENT SPAWN COMMANDS
```python
# Phase 1 - Fire all in parallel
agent_spawn(prompt="...", agent_type="explore", description="...")
```
```

CONSTRAINTS:
- You ONLY plan. You NEVER execute code changes.
- Every task must have a clear agent assignment
- Parallel phases must be truly independent
- Include ready-to-use agent_spawn commands""",
        "research-lead": """You coordinate research tasks by spawning explore and dewey agents in parallel.

## Your Role
1. Receive research objective from Stravinsky
2. Decompose into parallel search tasks
3. Spawn explore/dewey agents for each task
4. Collect and SYNTHESIZE results
5. Return structured findings (not raw outputs)

## Output Format
Always return a Research Brief:
```json
{
  "objective": "Original research goal",
  "findings": [
    {"source": "agent_id", "summary": "Key finding", "confidence": "high/medium/low"},
    ...
  ],
  "synthesis": "Combined analysis of all findings",
  "gaps": ["Information we couldn't find"],
  "recommendations": ["Suggested next steps"]
}
```

MODEL ROUTING:
Use invoke_gemini with model="gemini-3-flash" for ALL synthesis work.
""",
        "implementation-lead": """You coordinate implementation based on research findings.

## Your Role
1. Receive Research Brief from Stravinsky
2. Create implementation plan
3. Delegate to specialists:
   - frontend: UI/visual work
   - debugger: Fix failures
   - code-reviewer: Quality checks
4. Verify with lsp_diagnostics
5. Return Implementation Report

## Output Format
```json
{
  "objective": "What was implemented",
  "files_changed": ["path/to/file.py"],
  "tests_status": "pass/fail/skipped",
  "diagnostics": "clean/warnings/errors",
  "blockers": ["Issues preventing completion"]
}
```

## Escalation Rules
- After 2 failed attempts → spawn debugger
- After debugger fails → escalate to Stravinsky with context
- NEVER call delphi directly
""",
    }

    # Agent types without an entry run with no custom system prompt
    system_prompt = system_prompts.get(agent_type)

    # Model routing (MANDATORY - enforced in system prompts):
    # - explore, dewey, document_writer, multimodal → invoke_gemini(gemini-3-flash)
    # - frontend → invoke_gemini(gemini-3-pro-high)
    # - delphi → invoke_openai(gpt-5.2)
    # - Unknown agent types (coding tasks) → Claude CLI --model sonnet

    # Get token store for authentication
    from ..auth.token_store import TokenStore

    token_store = TokenStore()

    task_id = manager.spawn(
        token_store=token_store,
        prompt=prompt,
        agent_type=agent_type,
        description=description or prompt[:50],
        system_prompt=system_prompt,
        model=model,  # Not used for Claude CLI, kept for API compatibility
        thinking_budget=thinking_budget,  # Not used for Claude CLI, kept for API compatibility
        timeout=timeout,
    )

    # Get display model and cost tier emoji for concise output
    display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"])
    cost_emoji = get_agent_emoji(agent_type)
    short_desc = (description or prompt[:50]).strip()

    # Enhanced format with ANSI colors: cost_emoji agent:model('description') status_emoji
    #   🟢 explore:gemini-3-flash('Find auth...') ⏳
    # With colors: agent type in cyan, model in yellow, description bold
    spawn_msg = colorize_agent_spawn_message(
        cost_emoji, agent_type, display_model, short_desc, task_id
    )

    # If blocking mode (recommended for delphi), wait for completion
    if blocking:
        import asyncio  # local import: only needed on the blocking path

        # get_output(block=True) polls with time.sleep for up to `timeout`
        # seconds; run it in a worker thread so the event loop stays free.
        result = await asyncio.to_thread(
            manager.get_output, task_id, block=True, timeout=timeout
        )
        return f"{spawn_msg} {Colors.BOLD}[BLOCKING]{Colors.RESET}\n\n{result}"

    return spawn_msg
async def agent_output(task_id: str, block: bool = False) -> str:
    """
    Fetch the status and output of a background agent task.

    Thin async wrapper that forwards the request to the shared agent
    manager's ``get_output``.

    Args:
        task_id: The task ID returned by agent_spawn
        block: If True, ask the manager to wait for the task to finish.
            No explicit timeout is passed here, so the wait limit is
            whatever ``get_output`` applies by default (historically
            documented as up to 30s - TODO confirm against the manager)

    Returns:
        The text produced by ``get_output`` for this task
    """
    output = get_manager().get_output(task_id, block=block)
    return output
async def agent_retry(
    task_id: str,
    new_prompt: str | None = None,
    new_timeout: int | None = None,
) -> str:
    """
    Spawn a fresh agent based on a previous task that is no longer active.

    The earlier task is looked up through the manager. Tasks whose status is
    "running" or "pending" are refused; any other status is re-spawned via
    agent_spawn with the same agent type and a "Retry of ..." description.

    Args:
        task_id: The ID of the task to retry
        new_prompt: Replacement prompt; when empty or None the original
            task's prompt is reused
        new_timeout: Replacement timeout in seconds; when 0 or None the
            original task's "timeout" is reused (300 if the task record
            has no such key)

    Returns:
        The agent_spawn result for the new task, or an error/warning
        message when the task is missing or still active
    """
    previous = get_manager().get_task(task_id)

    # Guard: nothing to retry if the manager does not know this task.
    if not previous:
        return f"❌ Task {task_id} not found."

    # Guard: an active task has to be cancelled before it can be retried.
    current_status = previous["status"]
    if current_status in ("running", "pending"):
        return f"⚠️ Task {task_id} is still {current_status}. Cancel it first if you want to retry."

    # Caller-supplied overrides win; falsy values fall back to the old task.
    spawn_kwargs = {
        "prompt": new_prompt if new_prompt else previous["prompt"],
        "agent_type": previous["agent_type"],
        "description": f"Retry of {task_id}: {previous['description']}",
        "timeout": new_timeout if new_timeout else previous.get("timeout", 300),
    }
    return await agent_spawn(**spawn_kwargs)
async def agent_cancel(task_id: str) -> str:
    """
    Ask the manager to cancel a background agent task.

    Args:
        task_id: The task ID to cancel

    Returns:
        A confirmation when the manager reports success; otherwise a
        message saying the task was not found, or that it exists but is
        not running (including its current status)
    """
    manager = get_manager()

    if manager.cancel(task_id):
        return f"✅ Agent task {task_id} has been cancelled."

    # Cancellation was refused: tell apart "unknown task" from "not running".
    task = manager.get_task(task_id)
    if task:
        return f"⚠️ Task {task_id} is not running (status: {task['status']}). Cannot cancel."
    return f"❌ Task {task_id} not found."
async def agent_list() -> str:
    """
    Render every background agent task as one colorized line each.

    Tasks are ordered by their "created_at" value, newest first (tasks
    without that key sort as an empty string).

    Returns:
        Newline-joined task lines, or a fixed message when the manager
        reports no tasks
    """
    tasks = get_manager().list_tasks()
    if not tasks:
        return "No background agent tasks found."

    # Status -> emoji lookup; statuses not listed here render as "❓".
    status_icons = {
        "pending": "⏳",
        "running": "🔄",
        "completed": "✅",
        "failed": "❌",
        "cancelled": "⚠️",
    }

    def _render(entry: dict[str, Any]) -> str:
        """Format a single task record as a one-line summary."""
        status_emoji = status_icons.get(entry["status"], "❓")

        agent_type = entry.get("agent_type", "unknown")
        display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"])
        cost_emoji = get_agent_emoji(agent_type)

        # Fall back to the first 40 characters of the prompt only when the
        # record has no "description" key at all.
        prompt_excerpt = entry.get("prompt", "")[:40]
        desc = entry.get("description", prompt_excerpt)
        task_id = entry["id"]

        # Concise format with colors: cost_emoji status agent:model('desc') id=xxx
        # Agent type in cyan, model in yellow, task_id in dim
        return (
            f"{cost_emoji} {status_emoji} "
            f"{Colors.CYAN}{agent_type}{Colors.RESET}:"
            f"{Colors.YELLOW}{display_model}{Colors.RESET}"
            f"('{Colors.BOLD}{desc}{Colors.RESET}') "
            f"id={Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}"
        )

    newest_first = sorted(
        tasks,
        key=lambda entry: entry.get("created_at", ""),
        reverse=True,
    )
    return "\n".join([_render(entry) for entry in newest_first])
async def agent_progress(task_id: str, lines: int = 20) -> str:
    """
    Report recent progress for a background agent task.

    Thin async wrapper that forwards to the shared agent manager's
    ``get_progress``; intended for checking on what an agent is doing
    while it runs.

    Args:
        task_id: The task ID returned by agent_spawn
        lines: How many recent output lines to request (default 20)

    Returns:
        The text produced by ``get_progress`` for this task
    """
    progress = get_manager().get_progress(task_id, lines=lines)
    return progress