Coverage for agentos/tools/task_scheduler.py: 0%

209 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 06:58 +0800

"""
Lightweight Task Scheduler & Job Queue for AgentOS.

PriorityTaskQueue — bounded priority queue with deadline-aware scheduling.
TaskScheduler — interval/delay/cron-like scheduling with persistence hooks.
WorkerPool — simple thread-pool backed by the task queue.
"""

8 

9import heapq 

10import threading 

11import time 

12import uuid 

13from dataclasses import dataclass, field 

14from enum import Enum, auto 

15from typing import Any, Callable, Dict, List, Optional, Tuple 

16 

17 

18# ============================================================================ 

19# Task & Result Types 

20# ============================================================================ 

21 

class TaskStatus(Enum):
    """Lifecycle states of a Task.

    Values are produced by ``auto()``, i.e. the integers 1..6 in declaration
    order.
    """

    PENDING = auto()    # default status of a freshly created Task
    RUNNING = auto()    # set by Task.run() right before the callable is invoked
    COMPLETED = auto()  # the callable returned without raising
    FAILED = auto()     # the callable raised an Exception
    EXPIRED = auto()    # set by PriorityTaskQueue when the deadline has passed
    CANCELLED = auto()  # set by PriorityTaskQueue.cancel()

29 

30 

31@dataclass(order=True) 

32class _PrioritizedTask: 

33 """Internal heap item. Lower priority value = higher urgency.""" 

34 priority: int 

35 deadline: float 

36 seq: int # tiebreaker for stable ordering 

37 task: "Task" = field(compare=False) 

38 

39 

@dataclass
class Task:
    """A schedulable unit of work: a callable plus metadata and its outcome.

    Only ``task_id`` and ``func`` are required. ``run()`` fills in ``status``,
    ``result`` / ``error`` and the timing fields. All timestamps are
    ``time.monotonic()`` readings.
    """

    task_id: str  # caller-chosen identifier
    func: Callable  # invoked by run() as func(*args, **kwargs)
    args: tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    priority: int = 0  # lower value = higher urgency
    deadline: Optional[float] = None  # monotonic timestamp, or None for no deadline
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None  # return value of func after a successful run
    error: Optional[str] = None  # description of the exception after a failed run
    created_at: float = field(default_factory=time.monotonic)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    def run(self) -> None:
        """Invoke the callable and record the outcome on this task.

        On success ``result`` holds the return value and ``status`` becomes
        COMPLETED. If the callable raises an ``Exception`` it is swallowed,
        ``error`` is set and ``status`` becomes FAILED. ``completed_at`` is
        recorded in both cases.
        """
        self.started_at = time.monotonic()
        self.status = TaskStatus.RUNNING
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as exc:
            # str(exc) is empty for exceptions raised without a message
            # (e.g. ``ValueError()``); fall back to repr so a failed task
            # never ends up with a falsy error description.
            self.error = str(exc) or repr(exc)
            self.status = TaskStatus.FAILED
        else:
            self.status = TaskStatus.COMPLETED
        finally:
            self.completed_at = time.monotonic()

    @property
    def elapsed(self) -> Optional[float]:
        """Seconds spent running, or None if the task has not started.

        While the task is still running, the time since it started is
        returned.
        """
        if self.started_at is None:
            return None
        # Explicit None check: a 0.0 timestamp is a valid monotonic reading.
        if self.completed_at is not None:
            end = self.completed_at
        else:
            end = time.monotonic()
        return end - self.started_at

73 

74 

75# ============================================================================ 

76# PriorityTaskQueue 

77# ============================================================================ 

78 

class PriorityTaskQueue:
    """Thread-safe bounded priority queue for tasks.

    Entries are ordered by ``(priority, deadline, insertion order)``; a lower
    priority value is served first. A task whose deadline has passed is
    marked EXPIRED and dropped instead of being handed out. A falsy deadline
    (``None`` or ``0``) means the task never expires.

    Task ids are expected to be unique among queued tasks: if the same id is
    enqueued twice, only the most recent entry can be cancelled by id.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty queue holding at most ``max_size`` entries."""
        self._max_size = max_size
        self._heap: List[_PrioritizedTask] = []
        # task_id -> heap entry, used by cancel()
        self._lookup: Dict[str, _PrioritizedTask] = {}
        # Re-entrant so that helpers can be called with the lock already held.
        self._lock = threading.RLock()
        self._seq: int = 0
        self._total_enqueued: int = 0
        self._total_dequeued: int = 0
        self._total_expired: int = 0

    def enqueue(self, task: Task) -> bool:
        """Add ``task`` to the queue.

        Returns:
            True if the task was queued, False if the queue is full.
        """
        with self._lock:
            if len(self._heap) >= self._max_size:
                return False
            self._seq += 1
            entry = _PrioritizedTask(
                priority=task.priority,
                deadline=task.deadline or float("inf"),
                seq=self._seq,
                task=task,
            )
            heapq.heappush(self._heap, entry)
            self._lookup[task.task_id] = entry
            self._total_enqueued += 1
            return True

    def dequeue(self) -> Optional[Task]:
        """Remove and return the most urgent runnable task.

        Cancelled entries are skipped; entries past their deadline are marked
        EXPIRED, counted and skipped.

        Returns:
            The next task, or None if nothing runnable is queued.
        """
        with self._lock:
            self._clean_expired()
            while self._heap:
                entry = heapq.heappop(self._heap)
                self._forget(entry)
                task = entry.task
                if task.status == TaskStatus.CANCELLED:
                    continue
                if task.deadline and time.monotonic() > task.deadline:
                    task.status = TaskStatus.EXPIRED
                    self._total_expired += 1
                    continue
                self._total_dequeued += 1
                return task
            return None

    def cancel(self, task_id: str) -> bool:
        """Cancel the queued task registered under ``task_id``.

        The task is marked CANCELLED and removed from the heap right away, so
        it no longer counts toward ``size`` or the ``max_size`` limit.

        Returns:
            True if a queued task was cancelled, False if the id is unknown.
        """
        with self._lock:
            entry = self._lookup.pop(task_id, None)
            if entry is None:
                return False
            entry.task.status = TaskStatus.CANCELLED
            try:
                self._heap.remove(entry)
            except ValueError:
                # Entry is no longer in the heap; nothing left to remove.
                pass
            else:
                # Removing from the middle breaks the heap invariant.
                heapq.heapify(self._heap)
            return True

    def _forget(self, entry: _PrioritizedTask) -> None:
        """Drop the lookup record for ``entry`` if it is still the registered one.

        The identity check keeps a newer entry that reuses the same task id
        cancellable after an older one has left the heap.
        """
        task_id = entry.task.task_id
        if self._lookup.get(task_id) is entry:
            del self._lookup[task_id]

    def _clean_expired(self):
        """Drop cancelled and expired entries from the top of the heap.

        Stops at the first entry that is still runnable; stale entries buried
        deeper in the heap are handled when they are popped. Must be called
        with the lock held.
        """
        now = time.monotonic()
        while self._heap:
            entry = self._heap[0]
            if entry.task.status == TaskStatus.CANCELLED:
                heapq.heappop(self._heap)
                self._forget(entry)
            elif entry.deadline != float("inf") and now > entry.deadline:
                entry.task.status = TaskStatus.EXPIRED
                heapq.heappop(self._heap)
                self._forget(entry)
                self._total_expired += 1
            else:
                break

    @property
    def size(self) -> int:
        """Number of entries currently held in the heap."""
        with self._lock:
            return len(self._heap)

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of the current size, the capacity and the lifetime counters."""
        with self._lock:
            return {
                "size": len(self._heap),
                "max_size": self._max_size,
                "total_enqueued": self._total_enqueued,
                "total_dequeued": self._total_dequeued,
                "total_expired": self._total_expired,
            }

169 

170 

171# ============================================================================ 

172# TaskScheduler 

173# ============================================================================ 

174 

class TaskScheduler:
    """Schedule tasks for immediate, delayed or periodic execution.

    The scheduler does not execute anything by itself: call ``run_once()`` or
    ``run_loop()`` to run whatever is ready. ``schedule_at_interval()`` uses a
    daemon thread only to *submit* tasks periodically.
    """

    def __init__(self):
        self._queue = PriorityTaskQueue()
        # True between start() and stop(); gates the interval producers.
        self._running = False
        # True while run_loop() is executing; cleared by stop() to abort it.
        self._draining = False
        self._thread: Optional[threading.Thread] = None
        # Guards the counters and the delayed-task heap.
        self._lock = threading.Lock()
        # Min-heap of (due time, tie-breaker, task) for schedule_after().
        self._delayed: List[Tuple[float, int, Task]] = []
        self._delay_seq: int = 0
        self._scheduled_count: int = 0
        self._executed_count: int = 0

    def submit(self, task: Task) -> bool:
        """Submit a task for immediate execution.

        Returns:
            True if the task was queued, False if the queue is full.
        """
        ok = self._queue.enqueue(task)
        if ok:
            with self._lock:
                self._scheduled_count += 1
        return ok

    def schedule_after(self, func: Callable, delay: float, *args, **kwargs) -> Task:
        """Schedule a task to run after delay seconds.

        The task is held back until ``delay`` seconds have passed and is then
        moved to the ready queue by the next ``run_once()`` / ``run_loop()``
        call. It carries no deadline: in the queue a deadline is an *expiry*
        time, so using it for the delay would let the task run early and then
        drop it once the delay had elapsed.

        Returns:
            The created task.
        """
        task = Task(
            task_id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
            priority=int(delay * 1000),  # shorter delay = more urgent
        )
        due = time.monotonic() + delay
        with self._lock:
            self._delay_seq += 1
            # The unique sequence number ensures Task objects are never compared.
            heapq.heappush(self._delayed, (due, self._delay_seq, task))
            self._scheduled_count += 1
        return task

    def schedule_at_interval(
        self, func: Callable, interval: float, *args, **kwargs
    ) -> None:
        """Repeatedly submit a task at fixed intervals. Uses a daemon thread.

        The producer only runs while the scheduler is started: call
        ``start()`` first. It exits after ``stop()``; the flag is checked once
        per interval.
        """

        def _loop():
            while self._running:
                self.submit(
                    Task(
                        task_id=str(uuid.uuid4()),
                        func=func,
                        args=args,
                        kwargs=kwargs,
                    )
                )
                time.sleep(interval)

        threading.Thread(target=_loop, daemon=True).start()

    def start(self) -> None:
        """Mark the scheduler as started, enabling interval producers."""
        self._running = True

    def stop(self) -> None:
        """Stop interval producers and abort a ``run_loop()`` in progress."""
        self._running = False
        self._draining = False

    def _release_due(self) -> None:
        """Move delayed tasks whose due time has arrived into the ready queue."""
        now = time.monotonic()
        with self._lock:
            while self._delayed and self._delayed[0][0] <= now:
                if not self._queue.enqueue(self._delayed[0][2]):
                    # Ready queue is full; keep the task and retry next time.
                    break
                heapq.heappop(self._delayed)

    def run_once(self) -> Optional[Task]:
        """Pull and run one task. Returns the executed task or None."""
        self._release_due()
        task = self._queue.dequeue()
        if task is None:
            return None
        task.run()
        with self._lock:
            self._executed_count += 1
        return task

    def run_loop(self, max_tasks: int = 0) -> int:
        """Run tasks until queue empty or max_tasks reached. Returns count executed.

        ``max_tasks <= 0`` means no limit. Delayed tasks that are not due yet
        are left for a later call. ``stop()`` aborts the loop after the
        current task.
        """
        count = 0
        self._draining = True
        try:
            while self._draining:
                self._release_due()
                task = self._queue.dequeue()
                if task is None:
                    break
                task.run()
                count += 1
                if 0 < max_tasks <= count:
                    break
        finally:
            self._draining = False
            with self._lock:
                self._executed_count += count
        return count

    @property
    def pending(self) -> int:
        """Number of tasks waiting to run: ready tasks plus delayed ones."""
        with self._lock:
            delayed = len(self._delayed)
        return self._queue.size + delayed

    @property
    def stats(self) -> Dict[str, Any]:
        """Queue statistics plus the scheduler's own counters."""
        queue_stats = self._queue.stats
        with self._lock:
            scheduled = self._scheduled_count
            executed = self._executed_count
            delayed = len(self._delayed)
        return {
            **queue_stats,
            "scheduled_count": scheduled,
            "executed_count": executed,
            "pending": queue_stats["size"] + delayed,
            "delayed": delayed,
        }

269 

270 

271# ============================================================================ 

272# WorkerPool 

273# ============================================================================ 

274 

class WorkerPool:
    """Simple thread pool pulling from a PriorityTaskQueue.

    Workers are daemon threads that poll the queue and call ``Task.run()`` on
    each task they dequeue.
    """

    def __init__(self, num_workers: int = 4, max_queue_size: int = 1000):
        """Create a stopped pool; call ``start()`` to launch the workers."""
        self._num_workers = num_workers
        self._queue = PriorityTaskQueue(max_size=max_queue_size)
        self._running = False
        self._workers: List[threading.Thread] = []
        # Serialises start()/stop() so concurrent calls cannot double-spawn.
        self._lock = threading.Lock()

    def submit(self, func: Callable, *args, **kwargs) -> Task:
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        Returns:
            The created task; inspect its ``status`` / ``result`` later.

        Raises:
            RuntimeError: if the queue is full.
        """
        task = Task(
            task_id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
        )
        if not self._queue.enqueue(task):
            raise RuntimeError("WorkerPool queue is full")
        return task

    def start(self) -> None:
        """Launch the worker threads. Does nothing if the pool is already running."""
        with self._lock:
            if self._running:
                # A second start() must not spawn another set of workers.
                return
            self._running = True
            # Forget threads from earlier runs that have already finished.
            self._workers = [t for t in self._workers if t.is_alive()]
            for _ in range(self._num_workers):
                worker = threading.Thread(target=self._worker_loop, daemon=True)
                worker.start()
                self._workers.append(worker)

    def stop(self, wait: bool = True) -> None:
        """Ask the workers to exit after their current task.

        Args:
            wait: when True, join each worker for up to 5 seconds.
        """
        with self._lock:
            self._running = False
            workers = list(self._workers)
        if not wait:
            return
        current = threading.current_thread()
        for worker in workers:
            # A task that calls stop() runs on a worker; a thread cannot
            # join itself.
            if worker is not current:
                worker.join(timeout=5.0)

    def _worker_loop(self) -> None:
        """Worker body: run queued tasks until the pool is stopped."""
        while self._running:
            task = self._queue.dequeue()
            if task is None:
                # Nothing to do; back off briefly instead of spinning.
                time.sleep(0.01)
                continue
            task.run()

    @property
    def pending(self) -> int:
        """Number of entries currently held in the queue."""
        return self._queue.size

    @property
    def stats(self) -> Dict[str, Any]:
        """Queue statistics plus the worker count and the running flag."""
        return {
            **self._queue.stats,
            "workers": self._num_workers,
            "running": self._running,
        }