Coverage for mcp_bridge/tools/background_tasks.py: 33%
94 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2Background Task Manager for Stravinsky.
4Provides mechanisms to spawn, monitor, and manage async sub-agents.
5Tasks are persisted to .stravinsky/tasks.json.
6"""
8import json
9import subprocess
10import sys
11from dataclasses import asdict, dataclass
12from datetime import datetime
13from pathlib import Path
14from typing import Any
17@dataclass
18class BackgroundTask:
19 id: str
20 prompt: str
21 model: str
22 status: str # pending, running, completed, failed
23 created_at: str
24 started_at: str | None = None
25 completed_at: str | None = None
26 result: str | None = None
27 error: str | None = None
28 pid: int | None = None
31class BackgroundManager:
32 def __init__(self, base_dir: str | None = None):
33 if base_dir:
34 self.base_dir = Path(base_dir)
35 else:
36 # Default to .stravinsky in the current working directory
37 self.base_dir = Path.cwd() / ".stravinsky"
39 self.tasks_dir = self.base_dir / "tasks"
40 self.state_file = self.base_dir / "tasks.json"
42 self.base_dir.mkdir(parents=True, exist_ok=True)
43 self.tasks_dir.mkdir(parents=True, exist_ok=True)
45 if not self.state_file.exists():
46 self._save_tasks({})
48 def _load_tasks(self) -> dict[str, Any]:
49 try:
50 with open(self.state_file) as f:
51 return json.load(f)
52 except (json.JSONDecodeError, FileNotFoundError):
53 return {}
55 def _save_tasks(self, tasks: dict[str, Any]):
56 with open(self.state_file, "w") as f:
57 json.dump(tasks, f, indent=2)
59 def create_task(self, prompt: str, model: str) -> str:
60 import uuid as uuid_module # Local import for MCP context
61 task_id = str(uuid_module.uuid4())[:8]
62 task = BackgroundTask(
63 id=task_id,
64 prompt=prompt,
65 model=model,
66 status="pending",
67 created_at=datetime.isoformat(datetime.now()),
68 )
70 tasks = self._load_tasks()
71 tasks[task_id] = asdict(task)
72 self._save_tasks(tasks)
73 return task_id
75 def update_task(self, task_id: str, **kwargs):
76 tasks = self._load_tasks()
77 if task_id in tasks:
78 tasks[task_id].update(kwargs)
79 self._save_tasks(tasks)
81 def get_task(self, task_id: str) -> dict[str, Any] | None:
82 tasks = self._load_tasks()
83 return tasks.get(task_id)
85 def list_tasks(self) -> list[dict[str, Any]]:
86 tasks = self._load_tasks()
87 return list(tasks.values())
89 def spawn(self, task_id: str):
90 """
91 Spawns a background process to execute the task.
92 In this implementation, it uses another instance of the MCP bridge
93 or a dedicated tool invoker script.
94 """
95 task = self.get_task(task_id)
96 if not task:
97 return
99 # We'll use a wrapper script that handles the model invocation and updates the status
100 # mcp_bridge/tools/task_runner.py (we will create this)
102 log_file = self.tasks_dir / f"{task_id}.log"
104 # Start the background process
105 cmd = [
106 sys.executable,
107 "-m", "mcp_bridge.tools.task_runner",
108 "--task-id", task_id,
109 "--base-dir", str(self.base_dir)
110 ]
112 try:
113 # Using Popen to run in background
114 process = subprocess.Popen(
115 cmd,
116 stdout=open(log_file, "w"),
117 stderr=subprocess.STDOUT,
118 start_new_session=True # Run in its own session so it doesn't die with the server
119 )
121 self.update_task(task_id, status="running", pid=process.pid, started_at=datetime.isoformat(datetime.now()))
122 except Exception as e:
123 self.update_task(task_id, status="failed", error=str(e))
126# Tool interface functions
128async def task_spawn(prompt: str, model: str = "gemini-3-flash") -> str:
129 """Spawns a new background task."""
130 manager = BackgroundManager()
131 task_id = manager.create_task(prompt, model)
132 manager.spawn(task_id)
133 return f"Task spawned with ID: {task_id}. Use task_status('{task_id}') to check progress."
136async def task_status(task_id: str) -> str:
137 """Checks the status of a background task."""
138 manager = BackgroundManager()
139 task = manager.get_task(task_id)
140 if not task:
141 return f"Task {task_id} not found."
143 status = task["status"]
144 if status == "completed":
145 return f"Task {task_id} COMPLETED:\n\n{task.get('result')}"
146 elif status == "failed":
147 return f"Task {task_id} FAILED:\n\n{task.get('error')}"
148 else:
149 return f"Task {task_id} is currently {status} (PID: {task.get('pid')})."
152async def task_list() -> str:
153 """Lists all background tasks."""
154 manager = BackgroundManager()
155 tasks = manager.list_tasks()
156 if not tasks:
157 return "No background tasks found."
159 lines = ["Background Tasks:"]
160 for t in tasks:
161 lines.append(f"- [{t['id']}] {t['status']}: {t['prompt'][:50]}...")
163 return "\n".join(lines)