Coverage for agentos/tools/task_scheduler.py: 0%
209 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
1"""
2Lightweight Task Scheduler & Job Queue for AgentOS.
4PriorityTaskQueue — bounded priority queue with deadline-aware scheduling.
5TaskScheduler — interval/delay/cron-like scheduling with persistence hooks.
6WorkerPool — simple thread-pool backed by the task queue.
7"""
9import heapq
10import threading
11import time
12import uuid
13from dataclasses import dataclass, field
14from enum import Enum, auto
15from typing import Any, Callable, Dict, List, Optional, Tuple
18# ============================================================================
19# Task & Result Types
20# ============================================================================
class TaskStatus(Enum):
    """Lifecycle states a task can be in."""

    # Values are spelled out; they match what ``auto()`` assigns (1, 2, 3, ...).
    PENDING = 1    # created, not yet picked up
    RUNNING = 2    # callable is currently executing
    COMPLETED = 3  # callable returned normally
    FAILED = 4     # callable raised an exception
    EXPIRED = 5    # deadline passed before the task was dequeued
    CANCELLED = 6  # cancelled while still waiting in a queue
31@dataclass(order=True)
32class _PrioritizedTask:
33 """Internal heap item. Lower priority value = higher urgency."""
34 priority: int
35 deadline: float
36 seq: int # tiebreaker for stable ordering
37 task: "Task" = field(compare=False)
@dataclass
class Task:
    """A schedulable unit of work together with its bookkeeping."""

    # Identifier chosen by the creator of the task.
    task_id: str
    # Callable invoked by ``run`` as ``func(*args, **kwargs)``.
    func: Callable
    args: tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    # Smaller value = more urgent.
    priority: int = 0
    # ``time.monotonic()`` timestamp, or None for "no deadline".
    deadline: Optional[float] = None
    status: TaskStatus = TaskStatus.PENDING
    # Return value of ``func`` once the task has completed.
    result: Any = None
    # ``str()`` of the exception raised by ``func``, if it failed.
    error: Optional[str] = None
    # All timestamps below are ``time.monotonic()`` readings.
    created_at: float = field(default_factory=time.monotonic)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    def run(self):
        """Execute the callable and record the outcome on this task.

        Exceptions derived from ``Exception`` are captured: the status becomes
        FAILED and ``error`` holds the exception text. On success the status
        becomes COMPLETED and ``result`` holds the return value.
        ``completed_at`` is stamped in both cases.
        """
        self.started_at = time.monotonic()
        self.status = TaskStatus.RUNNING
        try:
            self.result = self.func(*self.args, **self.kwargs)
            self.status = TaskStatus.COMPLETED
        except Exception as e:
            self.error = str(e)
            self.status = TaskStatus.FAILED
        finally:
            self.completed_at = time.monotonic()

    @property
    def elapsed(self) -> Optional[float]:
        """Seconds spent running, or None if the task has not started.

        While the task is still running, the time elapsed so far is returned.
        """
        if self.started_at is None:
            return None
        # Compare against None explicitly: 0.0 is a valid monotonic reading
        # and must not be mistaken for "not finished yet".
        if self.completed_at is None:
            end = time.monotonic()
        else:
            end = self.completed_at
        return end - self.started_at
75# ============================================================================
76# PriorityTaskQueue
77# ============================================================================
class PriorityTaskQueue:
    """Thread-safe bounded priority queue for tasks.

    Lower priority value = executed first; ties are broken by the earlier
    deadline and then by insertion order. A task whose deadline has passed is
    marked EXPIRED and dropped instead of being returned.

    Cancellation is lazy: ``cancel`` only flags the task, and its heap entry
    is discarded once it reaches the top of the heap. ``size`` and the
    ``max_size`` bound therefore still count cancelled entries that have not
    been discarded yet.
    """

    def __init__(self, max_size: int = 1000):
        self._max_size = max_size
        self._heap: List[_PrioritizedTask] = []
        # task_id -> heap entry, for tasks that can still be cancelled.
        self._lookup: Dict[str, _PrioritizedTask] = {}
        self._lock = threading.RLock()
        self._seq: int = 0
        self._total_enqueued: int = 0
        self._total_dequeued: int = 0
        self._total_expired: int = 0

    def enqueue(self, task: Task) -> bool:
        """Add *task* to the queue.

        Returns:
            True if the task was stored, False if the queue is full.
        """
        with self._lock:
            if len(self._heap) >= self._max_size:
                return False
            self._seq += 1
            # Only None means "no deadline"; a deadline of 0.0 is a real
            # (already passed) point in time and must stay finite.
            if task.deadline is None:
                deadline = float("inf")
            else:
                deadline = task.deadline
            entry = _PrioritizedTask(
                priority=task.priority,
                deadline=deadline,
                seq=self._seq,
                task=task,
            )
            heapq.heappush(self._heap, entry)
            self._lookup[task.task_id] = entry
            self._total_enqueued += 1
            return True

    def dequeue(self) -> Optional[Task]:
        """Remove and return the most urgent runnable task.

        Cancelled entries are skipped and tasks past their deadline are marked
        EXPIRED and skipped.

        Returns:
            The next task, or None if nothing runnable is queued.
        """
        with self._lock:
            self._clean_expired()
            while self._heap:
                entry = heapq.heappop(self._heap)
                task = entry.task
                self._forget(entry)
                if task.status == TaskStatus.CANCELLED:
                    continue
                if task.deadline is not None and time.monotonic() > task.deadline:
                    task.status = TaskStatus.EXPIRED
                    self._total_expired += 1
                    continue
                self._total_dequeued += 1
                return task
            return None

    def cancel(self, task_id: str) -> bool:
        """Mark the queued task with *task_id* as CANCELLED.

        Returns:
            True if a queued task was found and flagged, False otherwise
            (unknown id, already dequeued, or already cancelled).
        """
        with self._lock:
            entry = self._lookup.pop(task_id, None)
            if entry is None:
                return False
            entry.task.status = TaskStatus.CANCELLED
            return True

    def _forget(self, entry: _PrioritizedTask) -> None:
        """Drop the lookup record for *entry*, if it is still the registered one.

        The identity check keeps a newer entry that reuses the same task_id
        (for example one enqueued after a cancel) registered and cancellable.
        """
        task_id = entry.task.task_id
        if self._lookup.get(task_id) is entry:
            del self._lookup[task_id]

    def _clean_expired(self):
        """Discard cancelled and expired entries sitting at the top of the heap.

        Only the heap top is inspected; stale entries deeper in the heap are
        handled when they surface.
        """
        now = time.monotonic()
        while self._heap:
            entry = self._heap[0]
            if entry.task.status == TaskStatus.CANCELLED:
                heapq.heappop(self._heap)
                self._forget(entry)
            elif now > entry.deadline:
                # Entries without a deadline carry +inf and never match here.
                entry.task.status = TaskStatus.EXPIRED
                heapq.heappop(self._heap)
                self._forget(entry)
                self._total_expired += 1
            else:
                break

    @property
    def size(self) -> int:
        """Number of entries currently held in the heap."""
        with self._lock:
            return len(self._heap)

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of the queue size, capacity, and lifetime counters."""
        with self._lock:
            return {
                "size": len(self._heap),
                "max_size": self._max_size,
                "total_enqueued": self._total_enqueued,
                "total_dequeued": self._total_dequeued,
                "total_expired": self._total_expired,
            }
171# ============================================================================
172# TaskScheduler
173# ============================================================================
class TaskScheduler:
    """Queue tasks for immediate, delayed, or periodic execution.

    Tasks are executed by whoever calls ``run_once`` or ``run_loop``. The only
    background threads are the producers created by ``schedule_at_interval``,
    which merely enqueue tasks.
    """

    def __init__(self):
        self._queue = PriorityTaskQueue()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Guards the counters and the delayed heap, which are touched both by
        # callers and by interval producer threads.
        self._lock = threading.Lock()
        self._scheduled_count: int = 0
        self._executed_count: int = 0
        # Min-heap of (due_time, sequence, task) for tasks that must not run
        # before ``due_time``; the sequence keeps tuple comparison away from
        # the task objects.
        self._delayed: List[Tuple[float, int, Task]] = []
        self._delayed_seq: int = 0

    def submit(self, task: Task) -> bool:
        """Submit a task for immediate execution.

        Returns:
            True if the task was queued, False if the queue is full.
        """
        ok = self._queue.enqueue(task)
        if ok:
            with self._lock:
                self._scheduled_count += 1
        return ok

    def schedule_after(self, func: Callable, delay: float, *args, **kwargs) -> Task:
        """Schedule a task to run after delay seconds.

        The task is parked until its due time and then moved into the run
        queue by the next ``run_once`` / ``run_loop`` call. It carries no
        expiry deadline, so it is not dropped when it is picked up late.
        Negative delays are treated as zero.

        Returns:
            The created task.
        """
        delay = max(0.0, delay)
        task = Task(
            task_id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
            # Shorter delays sort ahead of longer ones once both are due.
            priority=int(delay * 1000),
        )
        due_at = time.monotonic() + delay
        with self._lock:
            self._delayed_seq += 1
            heapq.heappush(self._delayed, (due_at, self._delayed_seq, task))
            self._scheduled_count += 1
        return task

    def schedule_at_interval(
        self, func: Callable, interval: float, *args, **kwargs
    ) -> None:
        """Repeatedly schedule a task at fixed intervals. Uses a daemon thread.

        The producer thread only runs while the scheduler is running, so call
        ``start()`` first; it exits the first time it observes the scheduler
        stopped, which can take up to *interval* seconds after ``stop()``.
        """

        def _produce():
            while self._running:
                # submit() already counts the task as scheduled; executions
                # are counted where tasks are actually run.
                self.submit(
                    Task(
                        task_id=str(uuid.uuid4()),
                        func=func,
                        args=args,
                        kwargs=kwargs,
                    )
                )
                time.sleep(interval)

        producer = threading.Thread(target=_produce, daemon=True)
        producer.start()

    def start(self) -> None:
        """Mark the scheduler as running."""
        self._running = True

    def stop(self) -> None:
        """Mark the scheduler as stopped; loops exit at their next check."""
        self._running = False

    def _promote_due(self) -> None:
        """Move delayed tasks whose due time has arrived into the run queue."""
        now = time.monotonic()
        with self._lock:
            while self._delayed and self._delayed[0][0] <= now:
                task = self._delayed[0][2]
                if not self._queue.enqueue(task):
                    # Run queue is full; leave the task parked and retry on
                    # the next call.
                    break
                heapq.heappop(self._delayed)

    def run_once(self) -> Optional[Task]:
        """Pull and run one task. Returns the executed task or None."""
        self._promote_due()
        task = self._queue.dequeue()
        if task is None:
            return None
        task.run()
        with self._lock:
            self._executed_count += 1
        return task

    def run_loop(self, max_tasks: int = 0) -> int:
        """Run tasks until queue empty or max_tasks reached. Returns count executed.

        Delayed tasks that are not yet due are left parked. ``max_tasks=0``
        means no limit. ``stop()`` ends the loop after the current task.
        """
        count = 0
        # Remember whether start() was already in effect so that draining the
        # queue does not switch off interval producers as a side effect.
        was_running = self._running
        self._running = True
        while self._running:
            self._promote_due()
            task = self._queue.dequeue()
            if task is None:
                break
            task.run()
            count += 1
            if 0 < max_tasks <= count:
                break
        if not was_running:
            self._running = False
        with self._lock:
            self._executed_count += count
        return count

    @property
    def pending(self) -> int:
        """Number of tasks waiting to run, including delayed ones."""
        with self._lock:
            delayed = len(self._delayed)
        return self._queue.size + delayed

    @property
    def stats(self) -> Dict[str, Any]:
        """Queue statistics plus scheduler counters."""
        with self._lock:
            scheduled = self._scheduled_count
            executed = self._executed_count
            delayed = len(self._delayed)
        return {
            **self._queue.stats,
            "scheduled_count": scheduled,
            "executed_count": executed,
            "delayed": delayed,
            "pending": self.pending,
        }
271# ============================================================================
272# WorkerPool
273# ============================================================================
class WorkerPool:
    """Simple thread pool pulling from a PriorityTaskQueue."""

    def __init__(self, num_workers: int = 4, max_queue_size: int = 1000):
        self._num_workers = num_workers
        self._queue = PriorityTaskQueue(max_size=max_queue_size)
        self._running = False
        self._workers: List[threading.Thread] = []
        # Guards the worker list against concurrent start()/stop() calls.
        self._lock = threading.Lock()

    def submit(self, func: Callable, *args, **kwargs) -> Task:
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        Returns:
            The created task; inspect its status/result once it has run.

        Raises:
            RuntimeError: if the queue is full.
        """
        task = Task(
            task_id=str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
        )
        if not self._queue.enqueue(task):
            raise RuntimeError("WorkerPool queue is full")
        return task

    def start(self) -> None:
        """Start worker threads until ``num_workers`` of them are alive.

        Calling this repeatedly, or again after ``stop()``, only starts the
        workers that are missing, so the pool never exceeds its size.
        """
        with self._lock:
            # Forget workers that have already exited.
            self._workers = [t for t in self._workers if t.is_alive()]
            self._running = True
            missing = self._num_workers - len(self._workers)
            for _ in range(missing):
                worker = threading.Thread(target=self._worker_loop, daemon=True)
                worker.start()
                self._workers.append(worker)

    def stop(self, wait: bool = True) -> None:
        """Ask the workers to exit after their current task.

        Args:
            wait: when True, join each worker for up to 5 seconds.
        """
        self._running = False
        if not wait:
            return
        with self._lock:
            workers = list(self._workers)
        current = threading.current_thread()
        for worker in workers:
            # A task may call stop() from inside a worker; joining the
            # current thread would raise RuntimeError.
            if worker is not current:
                worker.join(timeout=5.0)
        with self._lock:
            self._workers = [t for t in self._workers if t.is_alive()]

    def _worker_loop(self) -> None:
        """Run queued tasks until the pool is stopped, polling when idle."""
        while self._running:
            task = self._queue.dequeue()
            if task is None:
                # Nothing runnable; back off briefly instead of spinning.
                time.sleep(0.01)
                continue
            task.run()

    @property
    def pending(self) -> int:
        """Number of entries waiting in the queue."""
        return self._queue.size

    @property
    def stats(self) -> Dict[str, Any]:
        """Queue statistics plus the pool size and running flag."""
        return {
            **self._queue.stats,
            "workers": self._num_workers,
            "running": self._running,
        }