Coverage for mcp_bridge/tools/agent_manager.py: 70%

363 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2026-01-10 00:20 -0500

1""" 

2Agent Manager for Stravinsky. 

3 

4Spawns background agents using Claude Code CLI with full tool access. 

5This replaces the simple model-only invocation with true agentic execution. 

6""" 

7 

8import json 

9import logging 

10import os 

11import shutil 

12import signal 

13import subprocess 

14import threading 

15import time 

16from dataclasses import asdict, dataclass 

17from datetime import datetime 

18from pathlib import Path 

19from typing import Any 

20 

21logger = logging.getLogger(__name__) 

22 

23# Model routing configuration 

24# Specialized agents call external models via MCP tools: 

25# explore/dewey/document_writer/multimodal → invoke_gemini(gemini-3-flash) 

26# frontend → invoke_gemini(gemini-3-pro-high) 

27# delphi → invoke_openai(gpt-5.2) 

28# Non-specialized coding tasks use Claude CLI with --model sonnet 

29AGENT_MODEL_ROUTING = { 

30 # Specialized agents - no CLI model flag, they call invoke_* tools 

31 "explore": None, 

32 "dewey": None, 

33 "document_writer": None, 

34 "multimodal": None, 

35 "frontend": None, 

36 "delphi": None, 

37 "research-lead": None, # Hierarchical orchestrator using gemini-3-flash 

38 "implementation-lead": None, # Hierarchical orchestrator using haiku 

39 # Planner uses Opus for superior reasoning about dependencies and parallelization 

40 "planner": "opus", 

41 # Default for unknown agent types (coding tasks) - use Sonnet 4.5 

42 "_default": "sonnet", 

43} 

44 

# Cost tier per agent type (pattern borrowed from oh-my-opencode).
# Written tier-first so it is obvious which agents share a price bracket;
# the resulting mapping is still agent_type -> tier.
AGENT_COST_TIERS: dict[str, str] = {
    agent: tier
    for tier, agents in (
        # gemini-3-flash backed agents, plus the haiku-backed implementation lead
        (
            "CHEAP",
            (
                "explore",
                "dewey",
                "document_writer",
                "multimodal",
                "research-lead",
                "implementation-lead",
            ),
        ),
        # gemini-3-pro-high
        ("MEDIUM", ("frontend",)),
        # gpt-5.2, Claude Opus 4.5, and the Claude Sonnet 4.5 CLI fallback
        ("EXPENSIVE", ("delphi", "planner", "_default")),
    )
    for agent in agents
}

58 

# Human-readable model name shown next to each agent type in status output.
# Purely cosmetic: the model actually invoked is decided by AGENT_MODEL_ROUTING
# and the agent's system prompt.
AGENT_DISPLAY_MODELS: dict[str, str] = {
    # Gemini flash family
    "explore": "gemini-3-flash",
    "dewey": "gemini-3-flash",
    "document_writer": "gemini-3-flash",
    "multimodal": "gemini-3-flash",
    "research-lead": "gemini-3-flash",
    # Everything else
    "implementation-lead": "haiku",
    "frontend": "gemini-3-pro-high",
    "delphi": "gpt-5.2",
    "planner": "opus-4.5",
    # Label used for agent types that are not listed above
    "_default": "sonnet-4.5",
}

72 

# Emoji shown for each cost tier so cheap and expensive agents are easy to
# tell apart at a glance: green = cheap/free, blue = medium, purple = expensive.
COST_TIER_EMOJI: dict[str, str] = dict(
    CHEAP="🟢",  # gemini-3-flash, haiku
    MEDIUM="🔵",  # gemini-3-pro-high
    EXPENSIVE="🟣",  # gpt-5.2, opus
)

80 

# Emoji shown for a specific display-model name (see AGENT_DISPLAY_MODELS).
# Orange is reserved for Claude Sonnet; the other colors mirror the cost tiers.
MODEL_FAMILY_EMOJI: dict[str, str] = {
    "gemini-3-flash": "🟢",
    "gemini-3-pro-high": "🔵",
    "haiku": "🟢",
    "sonnet-4.5": "🟠",
    "opus-4.5": "🟣",
    "gpt-5.2": "🟣",
}

90 

class Colors:
    """Namespace of ANSI SGR escape sequences used to colorize terminal output.

    Each attribute is a ready-to-concatenate string; follow colored text with
    ``RESET`` to restore the terminal's default rendering.
    """

    # Text attributes
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Standard foreground palette (SGR 30-37)
    BLACK = "\033[30m"
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
    WHITE = "\033[37m"

    # High-intensity foreground palette (SGR 90-97)
    BRIGHT_BLACK = "\033[90m"
    BRIGHT_RED = "\033[91m"
    BRIGHT_GREEN = "\033[92m"
    BRIGHT_YELLOW = "\033[93m"
    BRIGHT_BLUE = "\033[94m"
    BRIGHT_MAGENTA = "\033[95m"
    BRIGHT_CYAN = "\033[96m"
    BRIGHT_WHITE = "\033[97m"

117 

118 

def get_agent_emoji(agent_type: str) -> str:
    """Return the cost-tier emoji for ``agent_type``.

    Agent types missing from ``AGENT_COST_TIERS`` use the ``"_default"`` tier;
    a tier missing from ``COST_TIER_EMOJI`` yields a neutral white circle.
    """
    fallback_tier = AGENT_COST_TIERS["_default"]
    tier = AGENT_COST_TIERS.get(agent_type, fallback_tier)
    return COST_TIER_EMOJI.get(tier, "⚪")

123 

124 

def get_model_emoji(model_name: str) -> str:
    """Return the emoji registered for ``model_name`` in ``MODEL_FAMILY_EMOJI``.

    Unregistered model names map to a neutral white circle.
    """
    try:
        return MODEL_FAMILY_EMOJI[model_name]
    except KeyError:
        return "⚪"

128 

129 

def colorize_agent_spawn_message(
    cost_emoji: str,
    agent_type: str,
    display_model: str,
    description: str,
    task_id: str,
) -> str:
    """
    Build the two-line, ANSI-colored banner announcing a freshly spawned agent.

    Plain-text shape:
        🟢 explore:gemini-3-flash('Find auth...') ⏳
        task_id=agent_abc123

    Coloring: agent type in cyan, model in yellow, description in bold,
    hourglass in bright green, task id in bright black.

    Args:
        cost_emoji: Emoji prefix, normally from ``get_agent_emoji``.
        agent_type: Agent type name.
        display_model: User-facing model name.
        description: Task description; ``None``/empty is allowed. Only the
            first 50 characters are used, then surrounding whitespace is
            stripped.
        task_id: Identifier printed on the second line.

    Returns:
        The banner as a single string with an embedded newline.
    """
    summary = (description or "")[:50].strip()

    headline = (
        f"{cost_emoji} "
        f"{Colors.CYAN}{agent_type}{Colors.RESET}:"
        f"{Colors.YELLOW}{display_model}{Colors.RESET}"
        f"('{Colors.BOLD}{summary}{Colors.RESET}') "
        f"{Colors.BRIGHT_GREEN}⏳{Colors.RESET}"
    )
    id_line = f"task_id={Colors.BRIGHT_BLACK}{task_id}{Colors.RESET}"

    return f"{headline}\n{id_line}"

160 

161 

162@dataclass 

163class AgentTask: 

164 """Represents a background agent task with full tool access.""" 

165 

166 id: str 

167 prompt: str 

168 agent_type: str # explore, dewey, frontend, delphi, etc. 

169 description: str 

170 status: str # pending, running, completed, failed, cancelled 

171 created_at: str 

172 parent_session_id: str | None = None 

173 started_at: str | None = None 

174 completed_at: str | None = None 

175 result: str | None = None 

176 error: str | None = None 

177 pid: int | None = None 

178 timeout: int = 300 # Default 5 minutes 

179 progress: dict[str, Any] | None = None # tool calls, last update 

180 

181 

182@dataclass 

183class AgentProgress: 

184 """Progress tracking for a running agent.""" 

185 

186 tool_calls: int = 0 

187 last_tool: str | None = None 

188 last_message: str | None = None 

189 last_update: str | None = None 

190 

191 

192class AgentManager: 

193 """ 

194 Manages background agent execution using Claude Code CLI. 

195 

196 Key features: 

197 - Spawns agents with full tool access via `claude -p` 

198 - Tracks task status and progress 

199 - Persists state to .stravinsky/agents.json 

200 - Provides notification mechanism for task completion 

201 """ 

202 

203 # Dynamic CLI path - find claude in PATH, fallback to common locations 

204 CLAUDE_CLI = shutil.which("claude") or "/opt/homebrew/bin/claude" 

205 

206 def __init__(self, base_dir: str | None = None): 

207 # Initialize lock FIRST - used by _save_tasks and _load_tasks 

208 self._lock = threading.RLock() 

209 

210 if base_dir: 

211 self.base_dir = Path(base_dir) 

212 else: 

213 self.base_dir = Path.cwd() / ".stravinsky" 

214 

215 self.agents_dir = self.base_dir / "agents" 

216 self.state_file = self.base_dir / "agents.json" 

217 

218 self.base_dir.mkdir(parents=True, exist_ok=True) 

219 self.agents_dir.mkdir(parents=True, exist_ok=True) 

220 

221 if not self.state_file.exists(): 

222 self._save_tasks({}) 

223 

224 # In-memory tracking for running processes 

225 self._processes: dict[str, subprocess.Popen] = {} 

226 self._notification_queue: dict[str, list[dict[str, Any]]] = {} 

227 

228 def _load_tasks(self) -> dict[str, Any]: 

229 """Load tasks from persistent storage.""" 

230 with self._lock: 

231 try: 

232 if not self.state_file.exists(): 

233 return {} 

234 with open(self.state_file) as f: 

235 return json.load(f) 

236 except (json.JSONDecodeError, FileNotFoundError): 

237 return {} 

238 

239 def _save_tasks(self, tasks: dict[str, Any]): 

240 """Save tasks to persistent storage.""" 

241 with self._lock, open(self.state_file, "w") as f: 

242 json.dump(tasks, f, indent=2) 

243 

244 def _update_task(self, task_id: str, **kwargs): 

245 """Update a task's fields.""" 

246 with self._lock: 

247 tasks = self._load_tasks() 

248 if task_id in tasks: 

249 tasks[task_id].update(kwargs) 

250 self._save_tasks(tasks) 

251 

252 def get_task(self, task_id: str) -> dict[str, Any] | None: 

253 """Get a task by ID.""" 

254 tasks = self._load_tasks() 

255 return tasks.get(task_id) 

256 

257 def list_tasks(self, parent_session_id: str | None = None) -> list[dict[str, Any]]: 

258 """List all tasks, optionally filtered by parent session.""" 

259 tasks = self._load_tasks() 

260 task_list = list(tasks.values()) 

261 

262 if parent_session_id: 

263 task_list = [t for t in task_list if t.get("parent_session_id") == parent_session_id] 

264 

265 return task_list 

266 

267 def spawn( 

268 self, 

269 token_store: Any, 

270 prompt: str, 

271 agent_type: str = "explore", 

272 description: str = "", 

273 parent_session_id: str | None = None, 

274 system_prompt: str | None = None, 

275 model: str = "gemini-3-flash", 

276 thinking_budget: int = 0, 

277 timeout: int = 300, 

278 ) -> str: 

279 """ 

280 Spawn a new background agent. 

281 

282 Args: 

283 prompt: The task prompt for the agent 

284 agent_type: Type of agent (explore, dewey, frontend, delphi) 

285 description: Short description for status display 

286 parent_session_id: Optional parent session for notifications 

287 system_prompt: Optional custom system prompt 

288 model: Model to use (gemini-3-flash, claude, etc.) 

289 timeout: Maximum execution time in seconds 

290 

291 Returns: 

292 Task ID for tracking 

293 """ 

294 import uuid as uuid_module # Local import for MCP context 

295 

296 task_id = f"agent_{uuid_module.uuid4().hex[:8]}" 

297 

298 task = AgentTask( 

299 id=task_id, 

300 prompt=prompt, 

301 agent_type=agent_type, 

302 description=description or prompt[:50], 

303 status="pending", 

304 created_at=datetime.now().isoformat(), 

305 parent_session_id=parent_session_id, 

306 timeout=timeout, 

307 ) 

308 

309 # Persist task 

310 with self._lock: 

311 tasks = self._load_tasks() 

312 tasks[task_id] = asdict(task) 

313 self._save_tasks(tasks) 

314 

315 # Start background execution 

316 self._execute_agent( 

317 task_id, token_store, prompt, agent_type, system_prompt, model, thinking_budget, timeout 

318 ) 

319 

320 return task_id 

321 

322 def _execute_agent( 

323 self, 

324 task_id: str, 

325 token_store: Any, 

326 prompt: str, 

327 agent_type: str, 

328 system_prompt: str | None = None, 

329 model: str = "gemini-3-flash", 

330 thinking_budget: int = 0, 

331 timeout: int = 300, 

332 ): 

333 """Execute agent using Claude CLI with full tool access. 

334 

335 Uses `claude -p` to spawn a background agent with complete tool access, 

336 just like oh-my-opencode's Sisyphus implementation. 

337 """ 

338 

339 def run_agent(): 

340 log_file = self.agents_dir / f"{task_id}.log" 

341 output_file = self.agents_dir / f"{task_id}.out" 

342 

343 self._update_task(task_id, status="running", started_at=datetime.now().isoformat()) 

344 

345 try: 

346 # Prepare full prompt with system prompt if provided 

347 full_prompt = prompt 

348 if system_prompt: 

349 full_prompt = f"{system_prompt}\n\n---\n\n{prompt}" 

350 

351 logger.info(f"[AgentManager] Spawning Claude CLI agent {task_id} ({agent_type})") 

352 

353 # Build Claude CLI command with full tool access 

354 # Using `claude -p` for non-interactive mode with prompt 

355 cmd = [ 

356 self.CLAUDE_CLI, 

357 "-p", 

358 full_prompt, 

359 "--output-format", 

360 "text", 

361 "--dangerously-skip-permissions", # Critical: bypass permission prompts 

362 ] 

363 

364 # Model routing: 

365 # - Specialized agents (explore/dewey/etc): None = use CLI default, they call invoke_* 

366 # - Unknown agent types (coding tasks): Use Sonnet 4.5 

367 if agent_type in AGENT_MODEL_ROUTING: 

368 cli_model = AGENT_MODEL_ROUTING[agent_type] # None for specialized 

369 else: 

370 cli_model = AGENT_MODEL_ROUTING.get("_default", "sonnet") 

371 

372 if cli_model: 

373 cmd.extend(["--model", cli_model]) 

374 logger.info(f"[AgentManager] Using --model {cli_model} for {agent_type} agent") 

375 

376 # Add system prompt file if we have one 

377 if system_prompt: 

378 system_file = self.agents_dir / f"{task_id}.system" 

379 system_file.write_text(system_prompt) 

380 cmd.extend(["--system-prompt", str(system_file)]) 

381 

382 # Execute Claude CLI as subprocess with full tool access 

383 logger.info(f"[AgentManager] Running: {' '.join(cmd[:3])}...") 

384 

385 # Use PIPE for stderr to capture it properly 

386 # (Previously used file handle which was closed before process finished) 

387 process = subprocess.Popen( 

388 cmd, 

389 stdin=subprocess.DEVNULL, # Critical: prevent stdin blocking 

390 stdout=subprocess.PIPE, 

391 stderr=subprocess.PIPE, 

392 text=True, 

393 cwd=str(Path.cwd()), 

394 env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "stravinsky-agent"}, 

395 start_new_session=True, # Allow process group management 

396 ) 

397 

398 # Track the process 

399 self._processes[task_id] = process 

400 self._update_task(task_id, pid=process.pid) 

401 

402 # Wait for completion with timeout 

403 try: 

404 stdout, stderr = process.communicate(timeout=timeout) 

405 result = stdout.strip() if stdout else "" 

406 

407 # Write stderr to log file 

408 if stderr: 

409 log_file.write_text(stderr) 

410 

411 if process.returncode == 0: 

412 output_file.write_text(result) 

413 self._update_task( 

414 task_id, 

415 status="completed", 

416 result=result, 

417 completed_at=datetime.now().isoformat(), 

418 ) 

419 logger.info(f"[AgentManager] Agent {task_id} completed successfully") 

420 else: 

421 error_msg = f"Claude CLI exited with code {process.returncode}" 

422 if stderr: 

423 error_msg += f"\n{stderr}" 

424 self._update_task( 

425 task_id, 

426 status="failed", 

427 error=error_msg, 

428 completed_at=datetime.now().isoformat(), 

429 ) 

430 logger.error(f"[AgentManager] Agent {task_id} failed: {error_msg}") 

431 

432 except subprocess.TimeoutExpired: 

433 process.kill() 

434 self._update_task( 

435 task_id, 

436 status="failed", 

437 error=f"Agent timed out after {timeout}s", 

438 completed_at=datetime.now().isoformat(), 

439 ) 

440 logger.warning(f"[AgentManager] Agent {task_id} timed out") 

441 

442 except FileNotFoundError: 

443 error_msg = f"Claude CLI not found at {self.CLAUDE_CLI}. Install with: npm install -g @anthropic-ai/claude-code" 

444 log_file.write_text(error_msg) 

445 self._update_task( 

446 task_id, 

447 status="failed", 

448 error=error_msg, 

449 completed_at=datetime.now().isoformat(), 

450 ) 

451 logger.error(f"[AgentManager] {error_msg}") 

452 

453 except Exception as e: 

454 error_msg = str(e) 

455 log_file.write_text(error_msg) 

456 self._update_task( 

457 task_id, 

458 status="failed", 

459 error=error_msg, 

460 completed_at=datetime.now().isoformat(), 

461 ) 

462 logger.exception(f"[AgentManager] Agent {task_id} exception") 

463 

464 finally: 

465 self._processes.pop(task_id, None) 

466 self._notify_completion(task_id) 

467 

468 # Run in background thread 

469 thread = threading.Thread(target=run_agent, daemon=True) 

470 thread.start() 

471 

472 def _notify_completion(self, task_id: str): 

473 """Queue notification for parent session.""" 

474 task = self.get_task(task_id) 

475 if not task: 

476 return 

477 

478 parent_id = task.get("parent_session_id") 

479 if parent_id: 

480 if parent_id not in self._notification_queue: 

481 self._notification_queue[parent_id] = [] 

482 

483 self._notification_queue[parent_id].append(task) 

484 logger.info(f"[AgentManager] Queued notification for {parent_id}: task {task_id}") 

485 

486 def get_pending_notifications(self, session_id: str) -> list[dict[str, Any]]: 

487 """Get and clear pending notifications for a session.""" 

488 notifications = self._notification_queue.pop(session_id, []) 

489 return notifications 

490 

491 def cancel(self, task_id: str) -> bool: 

492 """Cancel a running agent task.""" 

493 task = self.get_task(task_id) 

494 if not task: 

495 return False 

496 

497 if task["status"] != "running": 

498 return False 

499 

500 process = self._processes.get(task_id) 

501 if process: 

502 try: 

503 os.killpg(os.getpgid(process.pid), signal.SIGTERM) 

504 process.wait(timeout=5) 

505 except Exception as e: 

506 logger.warning(f"[AgentManager] Failed to kill process for {task_id}: {e}") 

507 try: 

508 process.kill() 

509 except: 

510 pass 

511 

512 self._update_task(task_id, status="cancelled", completed_at=datetime.now().isoformat()) 

513 

514 return True 

515 

516 def stop_all(self, clear_history: bool = False) -> int: 

517 """ 

518 Stop all running agents and optionally clear task history. 

519 

520 Args: 

521 clear_history: If True, also remove completed/failed tasks from history 

522 

523 Returns: 

524 Number of tasks stopped/cleared 

525 """ 

526 tasks = self._load_tasks() 

527 stopped_count = 0 

528 

529 # Stop running tasks 

530 for task_id, task in list(tasks.items()): 

531 if task.get("status") == "running": 

532 self.cancel(task_id) 

533 stopped_count += 1 

534 

535 # Optionally clear history 

536 if clear_history: 

537 cleared = len(tasks) 

538 self._save_tasks({}) 

539 self._processes.clear() 

540 logger.info(f"[AgentManager] Cleared all {cleared} agent tasks") 

541 return cleared 

542 

543 return stopped_count 

544 

545 def get_output(self, task_id: str, block: bool = False, timeout: float = 30.0) -> str: 

546 """ 

547 Get output from an agent task. 

548 

549 Args: 

550 task_id: The task ID 

551 block: If True, wait for completion 

552 timeout: Max seconds to wait if blocking 

553 

554 Returns: 

555 Formatted task output/status 

556 """ 

557 task = self.get_task(task_id) 

558 if not task: 

559 return f"Task {task_id} not found." 

560 

561 if block and task["status"] == "running": 

562 # Poll for completion 

563 start = datetime.now() 

564 while (datetime.now() - start).total_seconds() < timeout: 

565 task = self.get_task(task_id) 

566 if not task or task["status"] != "running": 

567 break 

568 time.sleep(0.5) 

569 

570 # Refresh task state after potential blocking wait 

571 if not task: 

572 return f"Task {task_id} not found." 

573 

574 status = task["status"] 

575 description = task.get("description", "") 

576 agent_type = task.get("agent_type", "unknown") 

577 

578 # Get cost-tier emoji for visual differentiation 

579 cost_emoji = get_agent_emoji(agent_type) 

580 display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"]) 

581 

582 if status == "completed": 

583 result = task.get("result", "(no output)") 

584 return f"""{cost_emoji} {Colors.BRIGHT_GREEN}✅ Agent Task Completed{Colors.RESET} 

585 

586**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET} 

587**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}') 

588 

589**Result**: 

590{result}""" 

591 

592 elif status == "failed": 

593 error = task.get("error", "(no error details)") 

594 return f"""{cost_emoji} {Colors.BRIGHT_RED}❌ Agent Task Failed{Colors.RESET} 

595 

596**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET} 

597**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}') 

598 

599**Error**: 

600{error}""" 

601 

602 elif status == "cancelled": 

603 return f"""{cost_emoji} {Colors.BRIGHT_YELLOW}⚠️ Agent Task Cancelled{Colors.RESET} 

604 

605**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET} 

606**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}')""" 

607 

608 else: # pending or running 

609 pid = task.get("pid", "N/A") 

610 started = task.get("started_at", "N/A") 

611 return f"""{cost_emoji} {Colors.BRIGHT_YELLOW}⏳ Agent Task Running{Colors.RESET} 

612 

613**Task ID**: {Colors.BRIGHT_BLACK}{task_id}{Colors.RESET} 

614**Agent**: {Colors.CYAN}{agent_type}{Colors.RESET}:{Colors.YELLOW}{display_model}{Colors.RESET}('{Colors.BOLD}{description}{Colors.RESET}') 

615**PID**: {Colors.DIM}{pid}{Colors.RESET} 

616**Started**: {Colors.DIM}{started}{Colors.RESET} 

617 

618Use `agent_output` with block=true to wait for completion.""" 

619 

620 def get_progress(self, task_id: str, lines: int = 20) -> str: 

621 """ 

622 Get real-time progress from a running agent's output. 

623 

624 Args: 

625 task_id: The task ID 

626 lines: Number of lines to show from the end 

627 

628 Returns: 

629 Recent output lines and status 

630 """ 

631 task = self.get_task(task_id) 

632 if not task: 

633 return f"Task {task_id} not found." 

634 

635 output_file = self.agents_dir / f"{task_id}.out" 

636 log_file = self.agents_dir / f"{task_id}.log" 

637 

638 status = task["status"] 

639 description = task.get("description", "") 

640 agent_type = task.get("agent_type", "unknown") 

641 pid = task.get("pid") 

642 

643 # Zombie Detection: If running but process is gone 

644 if status == "running" and pid: 

645 try: 

646 import psutil 

647 

648 if not psutil.pid_exists(pid): 

649 status = "failed" 

650 self._update_task( 

651 task_id, 

652 status="failed", 

653 error="Agent process died unexpectedly (Zombie detected)", 

654 completed_at=datetime.now().isoformat(), 

655 ) 

656 logger.warning(f"[AgentManager] Zombie agent detected: {task_id}") 

657 except ImportError: 

658 pass 

659 

660 # Read recent output 

661 output_content = "" 

662 if output_file.exists(): 

663 try: 

664 full_content = output_file.read_text() 

665 if full_content: 

666 output_lines = full_content.strip().split("\n") 

667 recent = output_lines[-lines:] if len(output_lines) > lines else output_lines 

668 output_content = "\n".join(recent) 

669 except Exception: 

670 pass 

671 

672 # Check log for errors 

673 log_content = "" 

674 if log_file.exists(): 

675 try: 

676 log_content = log_file.read_text().strip() 

677 except Exception: 

678 pass 

679 

680 # Status emoji 

681 status_emoji = { 

682 "pending": "⏳", 

683 "running": "🔄", 

684 "completed": "✅", 

685 "failed": "❌", 

686 "cancelled": "⚠️", 

687 }.get(status, "❓") 

688 

689 # Get cost-tier emoji for visual differentiation 

690 cost_emoji = get_agent_emoji(agent_type) 

691 display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"]) 

692 

693 result = f"""{cost_emoji} {status_emoji} **Agent Progress** 

694 

695**Task ID**: {task_id} 

696**Agent**: {agent_type}:{display_model}('{description}') 

697**Status**: {status} 

698""" 

699 

700 if output_content: 

701 result += f"\n**Recent Output** (last {lines} lines):\n```\n{output_content}\n```" 

702 elif status == "running": 

703 result += "\n*Agent is working... no output yet.*" 

704 

705 if log_content and status == "failed": 

706 # Truncate log if too long 

707 if len(log_content) > 500: 

708 log_content = log_content[:500] + "..." 

709 result += f"\n\n**Error Log**:\n```\n{log_content}\n```" 

710 

711 return result 

712 

713 

# Process-wide AgentManager singleton, created lazily by get_manager().
_manager: AgentManager | None = None
_manager_lock = threading.Lock()


def get_manager() -> AgentManager:
    """Return the shared AgentManager, creating it on first use.

    The instance is built with default arguments while holding
    ``_manager_lock``; the check is repeated under the lock so that
    concurrent first callers end up with the same instance.
    """
    global _manager
    if _manager is not None:
        return _manager

    with _manager_lock:
        # Another thread may have created it while we waited for the lock.
        if _manager is None:
            _manager = AgentManager()
        return _manager

728 

729 

730# Tool interface functions 

731 

732 

733async def agent_spawn( 

734 prompt: str, 

735 agent_type: str = "explore", 

736 description: str = "", 

737 model: str = "gemini-3-flash", 

738 thinking_budget: int = 0, 

739 timeout: int = 300, 

740 blocking: bool = False, 

741) -> str: 

742 """ 

743 Spawn a background agent. 

744 

745 Args: 

746 prompt: The task for the agent to perform 

747 agent_type: Type of agent (explore, dewey, frontend, delphi) 

748 description: Short description shown in status 

749 model: Model to use (gemini-3-flash, gemini-2.0-flash, claude) 

750 thinking_budget: Reserved reasoning tokens 

751 timeout: Execution timeout in seconds 

752 blocking: If True, wait for completion and return result directly (use for delphi) 

753 

754 Returns: 

755 Task ID and instructions, or full result if blocking=True 

756 """ 

757 manager = get_manager() 

758 

759 # Map agent types to system prompts 

760 # ALL agents use invoke_gemini or invoke_openai - NOT Claude directly 

761 # explore/dewey/document_writer/multimodal/frontend → gemini-3-flash 

762 # delphi → openai gpt-5.2 

763 system_prompts = { 

764 "explore": """You are a codebase exploration specialist. Find files, patterns, and answer 'where is X?' questions. 

765 

766MODEL ROUTING (MANDATORY): 

767You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL analysis and reasoning. 

768The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file. 

769 

770WORKFLOW: 

7711. Call invoke_gemini_agentic(prompt="<task description>", model="gemini-3-flash", max_turns=5, agent_context={"agent_type": "explore"}) 

7722. The agentic model will autonomously explore the codebase using available tools 

7733. Return the Gemini response with findings 

774 

775RECOMMENDED: max_turns=5 for thorough exploration""", 

776 "dewey": """You are a documentation and research specialist. Find implementation examples and official docs. 

777 

778MODEL ROUTING (MANDATORY): 

779You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL analysis, summarization, and reasoning. 

780The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file. 

781 

782WORKFLOW: 

7831. Call invoke_gemini_agentic(prompt="<task description>", model="gemini-3-flash", max_turns=5, agent_context={"agent_type": "dewey"}) 

7842. The agentic model will autonomously research and gather information using available tools 

7853. Return the Gemini response with findings 

786 

787RECOMMENDED: max_turns=5 for comprehensive research""", 

788 "frontend": """You are a Senior Frontend Architect & UI Designer. 

789 

790MODEL ROUTING (MANDATORY): 

791You MUST use invoke_gemini_agentic with model="gemini-3-pro-high" for ALL code generation and design work. 

792The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file. 

793 

794DESIGN PHILOSOPHY: 

795- Anti-Generic: Reject standard layouts. Bespoke, asymmetric, distinctive. 

796- Library Discipline: Use existing UI libraries (Shadcn, Radix, MUI) if detected. 

797- Stack: React/Vue/Svelte, Tailwind/Custom CSS, semantic HTML5. 

798 

799WORKFLOW: 

8001. Call invoke_gemini_agentic(prompt="Generate frontend code for: <task>", model="gemini-3-pro-high", max_turns=3, agent_context={"agent_type": "frontend"}) 

8012. The agentic model will autonomously analyze the codebase and generate code using available tools 

8023. Return the generated code 

803 

804RECOMMENDED: max_turns=3 for focused code generation""", 

805 "delphi": """You are a strategic technical advisor for architecture and hard debugging. 

806 

807MODEL ROUTING (MANDATORY): 

808You MUST use invoke_openai with model="gpt-5.2" for ALL strategic advice and analysis. 

809 

810WORKFLOW: 

8111. Gather context about the problem 

8122. Call invoke_openai(prompt="<problem description>", model="gpt-5.2", agent_context={"agent_type": "delphi"}) 

8133. Return the GPT response""", 

814 "document_writer": """You are a Technical Documentation Specialist. 

815 

816MODEL ROUTING (MANDATORY): 

817You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL documentation generation. 

818The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file. 

819 

820DOCUMENT TYPES: README, API docs, ADRs, user guides, inline docs. 

821 

822WORKFLOW: 

8231. Call invoke_gemini_agentic(prompt="Write documentation for: <topic>", model="gemini-3-flash", max_turns=3, agent_context={"agent_type": "document_writer"}) 

8242. The agentic model will autonomously gather context and generate documentation using available tools 

8253. Return the documentation 

826 

827RECOMMENDED: max_turns=3 for focused documentation generation""", 

828 "multimodal": """You interpret media files (PDFs, images, diagrams, screenshots). 

829 

830MODEL ROUTING (MANDATORY): 

831You MUST use invoke_gemini_agentic with model="gemini-3-flash" for ALL visual analysis. 

832The agentic mode gives you autonomous tool access: read_file, list_directory, grep_search, write_file. 

833 

834WORKFLOW: 

8351. Call invoke_gemini_agentic(prompt="Analyze this file: <path>. Extract: <goal>", model="gemini-3-flash", max_turns=3, agent_context={"agent_type": "multimodal"}) 

8362. The agentic model will autonomously access and analyze the file using available tools 

8373. Return extracted information only 

838 

839RECOMMENDED: max_turns=3 for focused visual analysis""", 

840 "planner": """You are a pre-implementation planning specialist. You analyze requests and produce structured implementation plans BEFORE any code changes begin. 

841 

842PURPOSE: 

843- Analyze requests and produce actionable implementation plans 

844- Identify dependencies and parallelization opportunities 

845- Enable efficient parallel execution by the orchestrator 

846- Prevent wasted effort through upfront planning 

847 

848METHODOLOGY: 

8491. EXPLORE FIRST: Spawn explore agents IN PARALLEL to understand the codebase 

8502. DECOMPOSE: Break request into atomic, single-purpose tasks 

8513. ANALYZE DEPENDENCIES: What blocks what? What can run in parallel? 

8524. ASSIGN AGENTS: Map each task to the right specialist (explore/dewey/frontend/delphi) 

8535. OUTPUT STRUCTURED PLAN: Use the required format below 

854 

855REQUIRED OUTPUT FORMAT: 

856``` 

857## PLAN: [Brief title] 

858 

859### ANALYSIS 

860- **Request**: [One sentence summary] 

861- **Scope**: [What's in/out of scope] 

862- **Risk Level**: [Low/Medium/High] 

863 

864### EXECUTION PHASES 

865 

866#### Phase 1: [Name] (PARALLEL) 

867| Task | Agent | Files | Est | 

868|------|-------|-------|-----| 

869| [description] | explore | file.py | S/M/L | 

870 

871#### Phase 2: [Name] (SEQUENTIAL after Phase 1) 

872| Task | Agent | Files | Est | 

873|------|-------|-------|-----| 

874 

875### AGENT SPAWN COMMANDS 

876```python 

877# Phase 1 - Fire all in parallel 

878agent_spawn(prompt="...", agent_type="explore", description="...") 

879``` 

880``` 

881 

882CONSTRAINTS: 

883- You ONLY plan. You NEVER execute code changes. 

884- Every task must have a clear agent assignment 

885- Parallel phases must be truly independent 

886- Include ready-to-use agent_spawn commands""", 

887 "research-lead": """You coordinate research tasks by spawning explore and dewey agents in parallel. 

888 

889## Your Role 

8901. Receive research objective from Stravinsky 

8912. Decompose into parallel search tasks 

8923. Spawn explore/dewey agents for each task 

8934. Collect and SYNTHESIZE results 

8945. Return structured findings (not raw outputs) 

895 

896## Output Format 

897Always return a Research Brief: 

898```json 

899{ 

900 "objective": "Original research goal", 

901 "findings": [ 

902 {"source": "agent_id", "summary": "Key finding", "confidence": "high/medium/low"}, 

903 ... 

904 ], 

905 "synthesis": "Combined analysis of all findings", 

906 "gaps": ["Information we couldn't find"], 

907 "recommendations": ["Suggested next steps"] 

908} 

909``` 

910 

911MODEL ROUTING: 

912Use invoke_gemini with model="gemini-3-flash" for ALL synthesis work. 

913""", 

914 "implementation-lead": """You coordinate implementation based on research findings. 

915 

916## Your Role 

9171. Receive Research Brief from Stravinsky 

9182. Create implementation plan 

9193. Delegate to specialists: 

920 - frontend: UI/visual work 

921 - debugger: Fix failures 

922 - code-reviewer: Quality checks 

9234. Verify with lsp_diagnostics 

9245. Return Implementation Report 

925 

926## Output Format 

927```json 

928{ 

929 "objective": "What was implemented", 

930 "files_changed": ["path/to/file.py"], 

931 "tests_status": "pass/fail/skipped", 

932 "diagnostics": "clean/warnings/errors", 

933 "blockers": ["Issues preventing completion"] 

934} 

935``` 

936 

937## Escalation Rules 

938- After 2 failed attempts → spawn debugger 

939- After debugger fails → escalate to Stravinsky with context 

940- NEVER call delphi directly 

941""", 

942 } 

943 

944 system_prompt = system_prompts.get(agent_type) 

945 

946 # Model routing (MANDATORY - enforced in system prompts): 

947 # - explore, dewey, document_writer, multimodal → invoke_gemini(gemini-3-flash) 

948 # - frontend → invoke_gemini(gemini-3-pro-high) 

949 # - delphi → invoke_openai(gpt-5.2) 

950 # - Unknown agent types (coding tasks) → Claude CLI --model sonnet 

951 

952 # Get token store for authentication 

953 from ..auth.token_store import TokenStore 

954 

955 token_store = TokenStore() 

956 

957 task_id = manager.spawn( 

958 token_store=token_store, 

959 prompt=prompt, 

960 agent_type=agent_type, 

961 description=description or prompt[:50], 

962 system_prompt=system_prompt, 

963 model=model, # Not used for Claude CLI, kept for API compatibility 

964 thinking_budget=thinking_budget, # Not used for Claude CLI, kept for API compatibility 

965 timeout=timeout, 

966 ) 

967 

968 # Get display model and cost tier emoji for concise output 

969 display_model = AGENT_DISPLAY_MODELS.get(agent_type, AGENT_DISPLAY_MODELS["_default"]) 

970 cost_emoji = get_agent_emoji(agent_type) 

971 short_desc = (description or prompt[:50]).strip() 

972 

973 # If blocking mode (recommended for delphi), wait for completion 

974 if blocking: 

975 result = manager.get_output(task_id, block=True, timeout=timeout) 

976 blocking_msg = colorize_agent_spawn_message( 

977 cost_emoji, agent_type, display_model, short_desc, task_id 

978 ) 

979 return f"{blocking_msg} {Colors.BOLD}[BLOCKING]{Colors.RESET}\n\n{result}" 

980 

981 # Enhanced format with ANSI colors: cost_emoji agent:model('description') status_emoji 

982 # 🟢 explore:gemini-3-flash('Find auth...') ⏳ 

983 # With colors: agent type in cyan, model in yellow, description bold 

984 return colorize_agent_spawn_message( 

985 cost_emoji, agent_type, display_model, short_desc, task_id 

986 ) 

987 

988 

async def agent_output(task_id: str, block: bool = False) -> str:
    """
    Fetch the status and output of a background agent task.

    This is a thin wrapper: the lookup and any waiting are performed by the
    shared manager's ``get_output`` method.

    Args:
        task_id: The task ID returned by agent_spawn
        block: If True, wait for the task to complete before returning.
            NOTE(review): the wait limit (previously documented as "up to
            30s") is whatever default ``get_output`` applies; no timeout is
            passed from here - confirm against the manager.

    Returns:
        Task status and output, as produced by the manager
    """
    return get_manager().get_output(task_id, block=block)

1002 

1003 

async def agent_retry(
    task_id: str,
    new_prompt: str | None = None,
    new_timeout: int | None = None,
) -> str:
    """
    Re-run a background agent task that is no longer active.

    A fresh task is spawned through agent_spawn; only the prompt, agent type,
    a "Retry of ..." description and the timeout are forwarded to it.

    Args:
        task_id: The ID of the task to retry
        new_prompt: Optional refined prompt; when falsy the stored prompt is reused
        new_timeout: Optional new timeout in seconds; when falsy the stored
            timeout is reused (300 if the task record has no "timeout" key)

    Returns:
        The result of agent_spawn for the new task, or a message explaining
        why the retry was refused (unknown task, or task still active)
    """
    manager = get_manager()
    previous = manager.get_task(task_id)

    # Unknown (or empty) task record: nothing to retry.
    if not previous:
        return f"❌ Task {task_id} not found."

    # An active task must be cancelled by the caller before it can be retried.
    current_status = previous["status"]
    if current_status in ("running", "pending"):
        return f"⚠️ Task {task_id} is still {current_status}. Cancel it first if you want to retry."

    # Fall back to the stored values whenever no (truthy) override was given.
    retry_prompt = new_prompt if new_prompt else previous["prompt"]
    retry_timeout = new_timeout if new_timeout else previous.get("timeout", 300)

    return await agent_spawn(
        prompt=retry_prompt,
        agent_type=previous["agent_type"],
        description=f"Retry of {task_id}: {previous['description']}",
        timeout=retry_timeout,
    )

1038 

1039 

async def agent_cancel(task_id: str) -> str:
    """
    Ask the manager to cancel a background agent task.

    Args:
        task_id: The task ID to cancel

    Returns:
        A confirmation message when the manager reports success; otherwise a
        message saying the task was not found, or that it is not running
        (including its current status)
    """
    manager = get_manager()

    if manager.cancel(task_id):
        return f"✅ Agent task {task_id} has been cancelled."

    # Cancellation was refused: look the task up to explain why.
    record = manager.get_task(task_id)
    if record:
        return f"⚠️ Task {task_id} is not running (status: {record['status']}). Cannot cancel."
    return f"❌ Task {task_id} not found."

1061 

1062 

async def agent_list() -> str:
    """
    Render every background agent task known to the manager.

    Tasks are sorted by their "created_at" value in descending order (a
    missing value sorts as the empty string) and printed one per line as:
    cost_emoji status_emoji agent:model('description') id=task_id

    Returns:
        The newline-joined, ANSI-colored task lines, or a fixed notice when
        the manager has no tasks
    """
    # Status -> emoji table; any status not listed here is shown as a question mark.
    status_icons = {
        "pending": "⏳",
        "running": "🔄",
        "completed": "✅",
        "failed": "❌",
        "cancelled": "⚠️",
    }

    tasks = get_manager().list_tasks()
    if not tasks:
        return "No background agent tasks found."

    newest_first = sorted(
        tasks,
        key=lambda entry: entry.get("created_at", ""),
        reverse=True,
    )

    rendered = []
    for entry in newest_first:
        status_icon = status_icons.get(entry["status"], "❓")

        kind = entry.get("agent_type", "unknown")
        model_label = AGENT_DISPLAY_MODELS.get(kind, AGENT_DISPLAY_MODELS["_default"])
        tier_icon = get_agent_emoji(kind)
        # Prefer the stored description; otherwise show the first 40 chars of the prompt.
        summary = entry.get("description", entry.get("prompt", "")[:40])
        identifier = entry["id"]

        # Agent type in cyan, model in yellow, description bold, task id dimmed.
        agent_part = (
            f"{Colors.CYAN}{kind}{Colors.RESET}:"
            f"{Colors.YELLOW}{model_label}{Colors.RESET}"
        )
        summary_part = f"('{Colors.BOLD}{summary}{Colors.RESET}') "
        id_part = f"id={Colors.BRIGHT_BLACK}{identifier}{Colors.RESET}"

        rendered.append(f"{tier_icon} {status_icon} {agent_part}{summary_part}{id_part}")

    return "\n".join(rendered)

1104 

1105 

async def agent_progress(task_id: str, lines: int = 20) -> str:
    """
    Report recent progress for a background agent task.

    Delegates to the shared manager's ``get_progress`` method, which supplies
    the most recent output lines so callers can monitor what the agent is
    currently doing.

    Args:
        task_id: The task ID returned by agent_spawn
        lines: How many recent output lines to request (default 20)

    Returns:
        Recent agent output and status, as produced by the manager
    """
    return get_manager().get_progress(task_id, lines=lines)

1121 return manager.get_progress(task_id, lines=lines)