Coverage for mcp_bridge/tools/task_runner.py: 0%
66 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2Task Runner for Stravinsky background sub-agents.
4This script is executed as a background process to handle agent tasks,
5capture output, and update status in tasks.json.
6"""
8import argparse
9import asyncio
10import json
11import logging
12import os
13from datetime import datetime
14from pathlib import Path
16# Setup logging
17logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
18logger = logging.getLogger("task_runner")
21async def run_task(task_id: str, base_dir: str):
22 base_path = Path(base_dir)
23 tasks_file = base_path / "tasks.json"
24 agents_dir = base_path / "agents"
26 # Load task details
27 try:
28 with open(tasks_file) as f:
29 tasks = json.load(f)
30 task = tasks.get(task_id)
31 except Exception as e:
32 logger.error(f"Failed to load tasks: {e}")
33 return
35 if not task:
36 logger.error(f"Task {task_id} not found")
37 return
39 prompt = task.get("prompt")
40 model = task.get("model", "gemini-2.0-flash")
42 output_file = agents_dir / f"{task_id}.out"
43 agents_dir.mkdir(parents=True, exist_ok=True)
45 try:
46 # Use Claude CLI for background tasks to ensure tool access
47 # Discover CLI path
48 claude_cli = os.environ.get("CLAUDE_CLI", "/opt/homebrew/bin/claude")
50 cmd = [
51 claude_cli,
52 "-p",
53 ]
55 if model:
56 cmd.extend(["--model", model])
58 cmd.append(prompt)
60 logger.info(f"Executing task {task_id} via CLI ({model})...")
62 # Open output and log files
63 with (
64 open(output_file, "w") as out_f,
65 open(base_path / "tasks" / f"{task_id}.log", "a") as log_f,
66 ):
67 process = await asyncio.create_subprocess_exec(
68 *cmd, stdout=out_f, stderr=log_f, cwd=os.getcwd()
69 )
71 await process.wait()
73 # Read result
74 if output_file.exists():
75 result = output_file.read_text()
76 else:
77 result = ""
79 # Save result
80 with open(output_file, "w") as f:
81 f.write(result)
83 # Update status
84 with open(tasks_file) as f:
85 tasks = json.load(f)
87 if task_id in tasks:
88 tasks[task_id].update(
89 {
90 "status": "completed",
91 "result": result,
92 "completed_at": datetime.now().isoformat(),
93 }
94 )
95 with open(tasks_file, "w") as f:
96 json.dump(tasks, f, indent=2)
98 logger.info(f"Task {task_id} completed successfully")
100 except Exception as e:
101 logger.exception(f"Task {task_id} failed: {e}")
103 # Update status with error
104 try:
105 with open(tasks_file) as f:
106 tasks = json.load(f)
107 if task_id in tasks:
108 tasks[task_id].update(
109 {
110 "status": "failed",
111 "error": str(e),
112 "completed_at": datetime.now().isoformat(),
113 }
114 )
115 with open(tasks_file, "w") as f:
116 json.dump(tasks, f, indent=2)
117 except:
118 pass
121if __name__ == "__main__":
122 parser = argparse.ArgumentParser(
123 description="Internal task runner for Stravinsky background agents. "
124 "Executes agent tasks via Claude CLI and manages output/status.",
125 prog="task_runner",
126 )
127 parser.add_argument(
128 "--task-id",
129 required=True,
130 help="Unique identifier for the task to execute",
131 )
132 parser.add_argument(
133 "--base-dir",
134 required=True,
135 help="Base directory containing tasks.json and agents/ output folder",
136 )
137 args = parser.parse_args()
139 asyncio.run(run_task(args.task_id, args.base_dir))