Coverage for mcp_bridge/tools/task_runner.py: 0%

66 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2026-01-10 00:20 -0500

1""" 

2Task Runner for Stravinsky background sub-agents. 

3 

4This script is executed as a background process to handle agent tasks, 

5capture output, and update status in tasks.json. 

6""" 

7 

8import argparse 

9import asyncio 

10import json 

11import logging 

12import os 

13from datetime import datetime 

14from pathlib import Path 

15 

16# Setup logging 

17logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") 

18logger = logging.getLogger("task_runner") 

19 

20 

21async def run_task(task_id: str, base_dir: str): 

22 base_path = Path(base_dir) 

23 tasks_file = base_path / "tasks.json" 

24 agents_dir = base_path / "agents" 

25 

26 # Load task details 

27 try: 

28 with open(tasks_file) as f: 

29 tasks = json.load(f) 

30 task = tasks.get(task_id) 

31 except Exception as e: 

32 logger.error(f"Failed to load tasks: {e}") 

33 return 

34 

35 if not task: 

36 logger.error(f"Task {task_id} not found") 

37 return 

38 

39 prompt = task.get("prompt") 

40 model = task.get("model", "gemini-2.0-flash") 

41 

42 output_file = agents_dir / f"{task_id}.out" 

43 agents_dir.mkdir(parents=True, exist_ok=True) 

44 

45 try: 

46 # Use Claude CLI for background tasks to ensure tool access 

47 # Discover CLI path 

48 claude_cli = os.environ.get("CLAUDE_CLI", "/opt/homebrew/bin/claude") 

49 

50 cmd = [ 

51 claude_cli, 

52 "-p", 

53 ] 

54 

55 if model: 

56 cmd.extend(["--model", model]) 

57 

58 cmd.append(prompt) 

59 

60 logger.info(f"Executing task {task_id} via CLI ({model})...") 

61 

62 # Open output and log files 

63 with ( 

64 open(output_file, "w") as out_f, 

65 open(base_path / "tasks" / f"{task_id}.log", "a") as log_f, 

66 ): 

67 process = await asyncio.create_subprocess_exec( 

68 *cmd, stdout=out_f, stderr=log_f, cwd=os.getcwd() 

69 ) 

70 

71 await process.wait() 

72 

73 # Read result 

74 if output_file.exists(): 

75 result = output_file.read_text() 

76 else: 

77 result = "" 

78 

79 # Save result 

80 with open(output_file, "w") as f: 

81 f.write(result) 

82 

83 # Update status 

84 with open(tasks_file) as f: 

85 tasks = json.load(f) 

86 

87 if task_id in tasks: 

88 tasks[task_id].update( 

89 { 

90 "status": "completed", 

91 "result": result, 

92 "completed_at": datetime.now().isoformat(), 

93 } 

94 ) 

95 with open(tasks_file, "w") as f: 

96 json.dump(tasks, f, indent=2) 

97 

98 logger.info(f"Task {task_id} completed successfully") 

99 

100 except Exception as e: 

101 logger.exception(f"Task {task_id} failed: {e}") 

102 

103 # Update status with error 

104 try: 

105 with open(tasks_file) as f: 

106 tasks = json.load(f) 

107 if task_id in tasks: 

108 tasks[task_id].update( 

109 { 

110 "status": "failed", 

111 "error": str(e), 

112 "completed_at": datetime.now().isoformat(), 

113 } 

114 ) 

115 with open(tasks_file, "w") as f: 

116 json.dump(tasks, f, indent=2) 

117 except: 

118 pass 

119 

120 

121if __name__ == "__main__": 

122 parser = argparse.ArgumentParser( 

123 description="Internal task runner for Stravinsky background agents. " 

124 "Executes agent tasks via Claude CLI and manages output/status.", 

125 prog="task_runner", 

126 ) 

127 parser.add_argument( 

128 "--task-id", 

129 required=True, 

130 help="Unique identifier for the task to execute", 

131 ) 

132 parser.add_argument( 

133 "--base-dir", 

134 required=True, 

135 help="Base directory containing tasks.json and agents/ output folder", 

136 ) 

137 args = parser.parse_args() 

138 

139 asyncio.run(run_task(args.task_id, args.base_dir))