Coverage for agentos/tools/scheduler.py: 0%
237 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:00 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 08:00 +0800
1"""
2Scheduler — lightweight interval + cron-style job scheduler.
4Supports:
5 - Fixed interval scheduling (every N seconds)
6 - Delayed one-shot scheduling
7 - Cron-style scheduling (minute, hour, day of month, month, day of week)
8 - Job lifecycle (start, stop, pause, resume)
9 - Execution history / stats
10 - Thread-safe
11"""
13from __future__ import annotations
15import threading
16import time
17from dataclasses import dataclass, field
18from typing import Any, Callable, Dict, List, Optional, Set
21# ============================================================================
22# Job
23# ============================================================================
class JobState:
    """Namespace of the plain-string lifecycle states stored in a job's ``state``."""

    # Plain strings (not an Enum) so the values can be compared and
    # serialised directly.
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"
@dataclass
class Job:
    """One registered callable together with its schedule and run statistics.

    Exactly how the job recurs is described by ``interval``, ``cron`` or
    ``delay``; the remaining fields are bookkeeping.
    """

    # --- identity and call target ---
    id: str
    func: Callable[..., Any]
    args: tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    # --- schedule description ---
    interval: Optional[float] = None  # seconds between runs (fixed interval)
    cron: Optional[Dict[str, str]] = None  # e.g. {"minute": "*", "hour": "*", ...}
    delay: Optional[float] = None  # seconds to wait before a single run

    # --- runtime bookkeeping ---
    state: str = JobState.PENDING
    next_run: float = 0.0  # epoch timestamp of the next planned run
    last_run: Optional[float] = None  # epoch timestamp of the latest run, if any
    run_count: int = 0
    error_count: int = 0
    last_error: Optional[str] = None
    # Timer currently armed for this job; kept out of repr().
    _timer: Optional[threading.Timer] = field(default=None, repr=False)
52# ============================================================================
53# Cron Parser
54# ============================================================================
def _cron_next(cron: Dict[str, str], now: float) -> float:
    """Return the first cron match strictly after ``now`` as an epoch timestamp.

    Args:
        cron: Mapping with the optional keys ``minute``, ``hour``, ``day``
            (day of month), ``month`` and ``day_of_week`` (0 = Sunday ..
            6 = Saturday). A missing key is treated as ``"*"``.
        now: Reference time in seconds since the epoch. It is broken down
            with ``time.localtime``, so matching happens in local time.

    Returns:
        Timestamp of the matching minute (its seconds component is zero).

    Raises:
        RuntimeError: If the bounded search finds no matching minute.
    """
    import calendar

    minutes = _parse_cron_field(cron.get("minute", "*"), 0, 59)
    hours = _parse_cron_field(cron.get("hour", "*"), 0, 23)
    doms = _parse_cron_field(cron.get("day", "*"), 1, 31)
    months = _parse_cron_field(cron.get("month", "*"), 1, 12)
    dows = _parse_cron_field(cron.get("day_of_week", "*"), 0, 6)

    # A day field counts as "restricted" when it excludes at least one value.
    # Standard cron semantics: if BOTH day fields are restricted, a day
    # matches when EITHER matches; otherwise both must match (the
    # unrestricted one matches trivially). Testing only "either matches"
    # would make "0 0 15 * *" fire every day.
    dom_restricted = doms != set(range(1, 32))
    dow_restricted = dows != set(range(0, 7))

    t = time.localtime(now)
    year = t.tm_year
    month = t.tm_mon
    day = t.tm_mday
    hour = t.tm_hour
    minute = t.tm_min

    # Bounded search: every pass moves the candidate strictly forward.
    for _ in range(366 * 5):
        # Month mismatch: jump to the first minute of the next month.
        if month not in months:
            month += 1
            if month > 12:
                month = 1
                year += 1
            day = 1
            hour = 0
            minute = 0
            continue

        # Day ran past the end of the month: roll over into the next month.
        if day > calendar.monthrange(year, month)[1]:
            day = 1
            month += 1
            if month > 12:
                month = 1
                year += 1
            hour = 0
            minute = 0
            continue

        # calendar.weekday() is Monday=0..Sunday=6; cron is Sunday=0..Saturday=6.
        dow = (calendar.weekday(year, month, day) + 1) % 7
        if dom_restricted and dow_restricted:
            day_matches = day in doms or dow in dows
        else:
            day_matches = day in doms and dow in dows
        if not day_matches:
            day += 1
            hour = 0
            minute = 0
            continue

        if hour not in hours:
            hour += 1
            if hour > 23:
                hour = 0
                day += 1
            minute = 0
            continue

        if minute not in minutes:
            minute += 1
            if minute > 59:
                minute = 0
                hour += 1
            continue

        # tm_isdst=-1 lets mktime decide whether DST applies; forcing 0
        # would shift the result by an hour while DST is in effect.
        result = time.mktime((year, month, day, hour, minute, 0, 0, 0, -1))
        if result > now:
            return result
        # The candidate is the current minute itself: step past it.
        minute += 1

    raise RuntimeError("No cron match within 5 years")
def _parse_cron_field(field: str, lo: int, hi: int) -> Set[int]:
    """Expand one cron field into the set of integers it selects.

    Comma-separated items may be mixed freely. Each item is one of:

    - ``*``      every value in ``lo..hi``
    - ``N``      the single value N
    - ``A-B``    the inclusive range A..B
    - ``*/S``    every S-th value starting at ``lo``
    - ``N/S``    every S-th value from N up to ``hi``
    - ``A-B/S``  every S-th value within A..B

    Args:
        field: The raw field text, e.g. ``"*/15"`` or ``"1,3,5"``.
        lo: Smallest value the field may contain.
        hi: Largest value the field may contain.

    Returns:
        The selected values.

    Raises:
        ValueError: If an item is empty, not numeric, has a step below 1,
            describes a reversed range, or falls outside ``lo..hi``.
    """
    selected: Set[int] = set()
    for raw_item in field.split(","):
        item = raw_item.strip()
        if not item:
            raise ValueError(f"Empty item in cron field {field!r}")

        # Split off an optional "/step" suffix; int() raises ValueError on junk.
        span, slash, step_text = item.partition("/")
        step = int(step_text) if slash else 1
        if step < 1:
            raise ValueError(f"Step must be >= 1 in cron field {field!r}")

        if span == "*":
            start, end = lo, hi
        elif "-" in span:
            first, last = span.split("-", 1)
            start, end = int(first), int(last)
        else:
            start = int(span)
            # "N/S" runs from N to the top of the range; a bare "N" is just N.
            end = hi if slash else start

        if not lo <= start <= end <= hi:
            raise ValueError(
                f"Cron item {item!r} is outside the allowed range {lo}-{hi}"
            )
        selected.update(range(start, end + 1, step))
    return selected
155# ============================================================================
156# Scheduler
157# ============================================================================
class Scheduler:
    """Lightweight job scheduler.

    Jobs are registered through a small fluent API and executed on
    ``threading.Timer`` threads once the scheduler has been started.

    Usage:
        sched = Scheduler()

        # Every 5 seconds
        sched.every(5).do(my_func, arg1, arg2)

        # Every minute
        sched.cron("*/1 * * * *").do(my_func)

        # One-shot delay
        sched.delay(10).do(my_func)

        sched.start()
    """

    def __init__(self):
        self._jobs: Dict[str, Job] = {}
        self._lock = threading.RLock()
        self._running = False
        self._id_counter = 0
        # Schedule chosen by the latest every()/cron()/delay() call; read by do().
        self._last_interval: Optional[float] = None
        self._last_cron: Optional[Dict[str, str]] = None
        self._last_delay: Optional[float] = None

    # ---------- Fluent API ----------

    def every(self, seconds: float) -> "Scheduler":
        """Select a fixed interval of ``seconds`` for the next ``do()``.

        Raises:
            ValueError: If ``seconds`` is not greater than zero (a
                non-positive interval would reschedule without pause).
        """
        if seconds <= 0:
            raise ValueError("Interval must be greater than zero seconds")
        self._last_interval = seconds
        self._last_cron = None
        self._last_delay = None
        return self

    def cron(self, expression: str) -> "Scheduler":
        """Cron expression: 'minute hour day month day_of_week'

        Raises:
            ValueError: If the expression does not have exactly 5 fields.
        """
        parts = expression.strip().split()
        if len(parts) != 5:
            raise ValueError("Cron expression must have 5 fields: 'minute hour day month day_of_week'")
        self._last_cron = {
            "minute": parts[0],
            "hour": parts[1],
            "day": parts[2],
            "month": parts[3],
            "day_of_week": parts[4],
        }
        self._last_interval = None
        self._last_delay = None
        return self

    def delay(self, seconds: float) -> "Scheduler":
        """Select a one-shot run ``seconds`` from now for the next ``do()``."""
        self._last_delay = seconds
        self._last_interval = None
        self._last_cron = None
        return self

    def do(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> "Job":
        """Register ``func`` under the most recently selected schedule.

        Args:
            func: Callable to run.
            *args: Positional arguments passed to ``func`` on every run.
            **kwargs: Keyword arguments passed to ``func`` on every run.

        Returns:
            The newly registered job.

        Raises:
            ValueError: If none of every(), cron() or delay() was called first.
        """
        interval = self._last_interval
        cron = self._last_cron
        delay = self._last_delay

        now = time.time()
        # Compare against None so that a zero delay ("run as soon as the
        # scheduler is running") is a valid schedule rather than "not set".
        if interval is not None:
            next_run = now + interval
        elif cron is not None:
            next_run = _cron_next(cron, now)
        elif delay is not None:
            next_run = now + delay
        else:
            raise ValueError("Must call every(), cron(), or delay() before do()")

        # Id allocation, registration and scheduling happen under one lock so
        # concurrent do() calls cannot share an id or race with start()/stop().
        with self._lock:
            self._id_counter += 1
            job = Job(
                id=f"job_{self._id_counter}",
                func=func,
                args=args,
                kwargs=kwargs,
                interval=interval,
                cron=cron,
                delay=delay,
                next_run=next_run,
            )
            self._jobs[job.id] = job
            if self._running:
                self._schedule_job(job)

        return job

    # ---------- Lifecycle ----------

    def start(self) -> None:
        """Start the scheduler and arm a timer for every pending job."""
        with self._lock:
            if self._running:
                return
            self._running = True
            for job in self._jobs.values():
                if job.state == JobState.PENDING:
                    self._schedule_job(job)

    def stop(self) -> None:
        """Stop the scheduler and cancel all armed timers.

        Jobs that are executing at this moment are marked paused, so they
        are not rescheduled when their current run finishes.
        """
        with self._lock:
            self._running = False
            for job in self._jobs.values():
                if job._timer:
                    job._timer.cancel()
                    job._timer = None
                if job.state == JobState.RUNNING:
                    job.state = JobState.PAUSED

    def pause(self, job_id: str) -> bool:
        """Pause a job; return False if ``job_id`` is unknown."""
        with self._lock:
            job = self._jobs.get(job_id)
            if not job:
                return False
            if job._timer:
                job._timer.cancel()
                job._timer = None
            job.state = JobState.PAUSED
            return True

    def resume(self, job_id: str) -> bool:
        """Resume a paused job; return False if it is unknown or not paused."""
        with self._lock:
            job = self._jobs.get(job_id)
            if not job:
                return False
            if job.state != JobState.PAUSED:
                return False
            job.state = JobState.PENDING
            job.next_run = time.time()  # run immediately
            if self._running:
                self._schedule_job(job)
            return True

    def remove(self, job_id: str) -> bool:
        """Unregister a job and cancel its timer; return False if unknown."""
        with self._lock:
            job = self._jobs.pop(job_id, None)
            if job and job._timer:
                job._timer.cancel()
            return job is not None

    # ---------- Internal ----------

    def _schedule_job(self, job: "Job") -> None:
        """Arm a daemon timer that runs ``job`` at ``job.next_run``."""
        with self._lock:
            if job.state == JobState.PAUSED:
                return
            # Never leave two timers armed for the same job.
            if job._timer is not None:
                job._timer.cancel()
            job.state = JobState.PENDING
            delay = max(0, job.next_run - time.time())
            timer = threading.Timer(delay, self._run_job, args=[job.id])
            timer.daemon = True
            job._timer = timer
            timer.start()

    def _run_job(self, job_id: str) -> None:
        """Timer callback: run the job once, record the outcome, reschedule."""
        with self._lock:
            job = self._jobs.get(job_id)
            if not job or not self._running:
                return
            # The timer may have fired while pause() was holding the lock.
            if job.state == JobState.PAUSED:
                return
            job.state = JobState.RUNNING

        # The job itself runs outside the lock so a slow job does not block
        # the scheduler's other operations.
        try:
            job.func(*job.args, **job.kwargs)
        except Exception as e:
            with self._lock:
                job.error_count += 1
                job.last_error = str(e)
        else:
            with self._lock:
                job.last_error = None
        finally:
            # Bookkeeping lives in a helper: a `return` written directly in
            # this `finally` would swallow any exception still propagating.
            self._after_run(job)

    def _after_run(self, job: "Job") -> None:
        """Update run statistics and decide whether ``job`` runs again."""
        with self._lock:
            job.last_run = time.time()
            job.run_count += 1

            # Removed while it was executing: do not bring it back.
            if self._jobs.get(job.id) is not job:
                job.state = JobState.STOPPED
                return

            if job.delay is not None:
                # One-shot — mark done
                job.state = JobState.STOPPED
                return

            if job.state == JobState.PAUSED:
                # Paused (or the scheduler was stopped) during the run.
                return

            if job.interval:
                job.next_run = time.time() + job.interval
            elif job.cron:
                job.next_run = _cron_next(job.cron, time.time())
            else:
                job.state = JobState.STOPPED
                return

            if self._running:
                self._schedule_job(job)

    # ---------- Query ----------

    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Return a snapshot dict for ``job_id``, or None if it is unknown."""
        with self._lock:
            job = self._jobs.get(job_id)
            if not job:
                return None
            return self._job_info(job)

    def list_jobs(self) -> List[Dict[str, Any]]:
        """Return snapshot dicts for all registered jobs."""
        with self._lock:
            return [self._job_info(j) for j in self._jobs.values()]

    @staticmethod
    def _job_info(job: "Job") -> Dict[str, Any]:
        """Build the public snapshot of a job (no callable, no timer)."""
        return {
            "id": job.id,
            "state": job.state,
            "interval": job.interval,
            "cron": job.cron,
            "delay": job.delay,
            "next_run": job.next_run,
            "last_run": job.last_run,
            "run_count": job.run_count,
            "error_count": job.error_count,
            "last_error": job.last_error,
        }