Coverage for mcp_bridge/tools/background_tasks.py: 33%

94 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2026-01-10 00:20 -0500

1""" 

2Background Task Manager for Stravinsky. 

3 

4Provides mechanisms to spawn, monitor, and manage async sub-agents. 

5Tasks are persisted to .stravinsky/tasks.json. 

6""" 

7 

8import json 

9import subprocess 

10import sys 

11from dataclasses import asdict, dataclass 

12from datetime import datetime 

13from pathlib import Path 

14from typing import Any 

15 

16 

17@dataclass 

18class BackgroundTask: 

19 id: str 

20 prompt: str 

21 model: str 

22 status: str # pending, running, completed, failed 

23 created_at: str 

24 started_at: str | None = None 

25 completed_at: str | None = None 

26 result: str | None = None 

27 error: str | None = None 

28 pid: int | None = None 

29 

30 

31class BackgroundManager: 

32 def __init__(self, base_dir: str | None = None): 

33 if base_dir: 

34 self.base_dir = Path(base_dir) 

35 else: 

36 # Default to .stravinsky in the current working directory 

37 self.base_dir = Path.cwd() / ".stravinsky" 

38 

39 self.tasks_dir = self.base_dir / "tasks" 

40 self.state_file = self.base_dir / "tasks.json" 

41 

42 self.base_dir.mkdir(parents=True, exist_ok=True) 

43 self.tasks_dir.mkdir(parents=True, exist_ok=True) 

44 

45 if not self.state_file.exists(): 

46 self._save_tasks({}) 

47 

48 def _load_tasks(self) -> dict[str, Any]: 

49 try: 

50 with open(self.state_file) as f: 

51 return json.load(f) 

52 except (json.JSONDecodeError, FileNotFoundError): 

53 return {} 

54 

55 def _save_tasks(self, tasks: dict[str, Any]): 

56 with open(self.state_file, "w") as f: 

57 json.dump(tasks, f, indent=2) 

58 

59 def create_task(self, prompt: str, model: str) -> str: 

60 import uuid as uuid_module # Local import for MCP context 

61 task_id = str(uuid_module.uuid4())[:8] 

62 task = BackgroundTask( 

63 id=task_id, 

64 prompt=prompt, 

65 model=model, 

66 status="pending", 

67 created_at=datetime.isoformat(datetime.now()), 

68 ) 

69 

70 tasks = self._load_tasks() 

71 tasks[task_id] = asdict(task) 

72 self._save_tasks(tasks) 

73 return task_id 

74 

75 def update_task(self, task_id: str, **kwargs): 

76 tasks = self._load_tasks() 

77 if task_id in tasks: 

78 tasks[task_id].update(kwargs) 

79 self._save_tasks(tasks) 

80 

81 def get_task(self, task_id: str) -> dict[str, Any] | None: 

82 tasks = self._load_tasks() 

83 return tasks.get(task_id) 

84 

85 def list_tasks(self) -> list[dict[str, Any]]: 

86 tasks = self._load_tasks() 

87 return list(tasks.values()) 

88 

89 def spawn(self, task_id: str): 

90 """ 

91 Spawns a background process to execute the task. 

92 In this implementation, it uses another instance of the MCP bridge 

93 or a dedicated tool invoker script. 

94 """ 

95 task = self.get_task(task_id) 

96 if not task: 

97 return 

98 

99 # We'll use a wrapper script that handles the model invocation and updates the status 

100 # mcp_bridge/tools/task_runner.py (we will create this) 

101 

102 log_file = self.tasks_dir / f"{task_id}.log" 

103 

104 # Start the background process 

105 cmd = [ 

106 sys.executable, 

107 "-m", "mcp_bridge.tools.task_runner", 

108 "--task-id", task_id, 

109 "--base-dir", str(self.base_dir) 

110 ] 

111 

112 try: 

113 # Using Popen to run in background 

114 process = subprocess.Popen( 

115 cmd, 

116 stdout=open(log_file, "w"), 

117 stderr=subprocess.STDOUT, 

118 start_new_session=True # Run in its own session so it doesn't die with the server 

119 ) 

120 

121 self.update_task(task_id, status="running", pid=process.pid, started_at=datetime.isoformat(datetime.now())) 

122 except Exception as e: 

123 self.update_task(task_id, status="failed", error=str(e)) 

124 

125 

126# Tool interface functions 

127 

128async def task_spawn(prompt: str, model: str = "gemini-3-flash") -> str: 

129 """Spawns a new background task.""" 

130 manager = BackgroundManager() 

131 task_id = manager.create_task(prompt, model) 

132 manager.spawn(task_id) 

133 return f"Task spawned with ID: {task_id}. Use task_status('{task_id}') to check progress." 

134 

135 

136async def task_status(task_id: str) -> str: 

137 """Checks the status of a background task.""" 

138 manager = BackgroundManager() 

139 task = manager.get_task(task_id) 

140 if not task: 

141 return f"Task {task_id} not found." 

142 

143 status = task["status"] 

144 if status == "completed": 

145 return f"Task {task_id} COMPLETED:\n\n{task.get('result')}" 

146 elif status == "failed": 

147 return f"Task {task_id} FAILED:\n\n{task.get('error')}" 

148 else: 

149 return f"Task {task_id} is currently {status} (PID: {task.get('pid')})." 

150 

151 

152async def task_list() -> str: 

153 """Lists all background tasks.""" 

154 manager = BackgroundManager() 

155 tasks = manager.list_tasks() 

156 if not tasks: 

157 return "No background tasks found." 

158 

159 lines = ["Background Tasks:"] 

160 for t in tasks: 

161 lines.append(f"- [{t['id']}] {t['status']}: {t['prompt'][:50]}...") 

162 

163 return "\n".join(lines)