Coverage for agentos/tools/scheduler.py: 0%

237 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 08:00 +0800

1""" 

2Scheduler — lightweight interval + cron-style job scheduler. 

3 

4Supports: 

5 - Fixed interval scheduling (every N seconds) 

6 - Delayed one-shot scheduling 

7 - Cron-style scheduling (minute, hour, day of month, month, day of week) 

8 - Job lifecycle (start, stop, pause, resume) 

9 - Execution history / stats 

10 - Thread-safe 

11""" 

12 

13from __future__ import annotations 

14 

15import threading 

16import time 

17from dataclasses import dataclass, field 

18from typing import Any, Callable, Dict, List, Optional, Set 

19 

20 

21# ============================================================================ 

22# Job 

23# ============================================================================ 

24 

25class JobState: 

26 PENDING = "pending" 

27 RUNNING = "running" 

28 PAUSED = "paused" 

29 STOPPED = "stopped" 

30 

31 

32@dataclass 

33class Job: 

34 id: str 

35 func: Callable[..., Any] 

36 args: tuple = () 

37 kwargs: Dict[str, Any] = field(default_factory=dict) 

38 # Scheduling 

39 interval: Optional[float] = None # Fixed interval in seconds 

40 cron: Optional[Dict[str, str]] = None # {"minute": "*", "hour": "*", ...} 

41 delay: Optional[float] = None # One-shot delay 

42 # State 

43 state: str = JobState.PENDING 

44 next_run: float = 0.0 

45 last_run: Optional[float] = None 

46 run_count: int = 0 

47 error_count: int = 0 

48 last_error: Optional[str] = None 

49 _timer: Optional[threading.Timer] = field(default=None, repr=False) 

50 

51 

52# ============================================================================ 

53# Cron Parser 

54# ============================================================================ 

55 

56def _cron_next(cron: Dict[str, str], now: float) -> float: 

57 """Compute next run time from cron expression. Returns timestamp.""" 

58 import calendar 

59 t = time.localtime(now) 

60 # Parse fields 

61 minutes = _parse_cron_field(cron.get("minute", "*"), 0, 59) 

62 hours = _parse_cron_field(cron.get("hour", "*"), 0, 23) 

63 doms = _parse_cron_field(cron.get("day", "*"), 1, 31) 

64 months = _parse_cron_field(cron.get("month", "*"), 1, 12) 

65 dows = _parse_cron_field(cron.get("day_of_week", "*"), 0, 6) 

66 

67 # Search forward 

68 year = t.tm_year 

69 month = t.tm_mon 

70 day = t.tm_mday 

71 hour = t.tm_hour 

72 minute = t.tm_min 

73 

74 for _ in range(366 * 5): # max 5 years search 

75 # Check month 

76 if month not in months: 

77 month += 1 

78 if month > 12: 

79 month = 1 

80 year += 1 

81 day = 1 

82 hour = 0 

83 minute = 0 

84 continue 

85 

86 # Check day (DOM and DOW) 

87 if day > calendar.monthrange(year, month)[1]: 

88 day = 1 

89 month += 1 

90 if month > 12: 

91 month = 1 

92 year += 1 

93 hour = 0 

94 minute = 0 

95 continue 

96 

97 dow = calendar.weekday(year, month, day) 

98 # Convert Monday=0..Sunday=6 to Sunday=0..Saturday=6 

99 dow = (dow + 1) % 7 

100 

101 if day not in doms and dow not in dows: 

102 day += 1 

103 hour = 0 

104 minute = 0 

105 continue 

106 

107 # Check hour 

108 if hour not in hours: 

109 hour += 1 

110 if hour > 23: 

111 hour = 0 

112 day += 1 

113 minute = 0 

114 continue 

115 

116 # Check minute 

117 if minute not in minutes: 

118 minute += 1 

119 if minute > 59: 

120 minute = 0 

121 hour += 1 

122 continue 

123 

124 # Found 

125 result = time.mktime((year, month, day, hour, minute, 0, 0, 0, 0)) 

126 if result > now: 

127 return result 

128 # Advance past current instant 

129 minute += 1 

130 

131 raise RuntimeError("No cron match within 5 years") 

132 

133 

134def _parse_cron_field(field: str, lo: int, hi: int) -> Set[int]: 

135 """Parse cron field like '*' or '1,3,5' or '*/15'.""" 

136 if field == "*": 

137 return set(range(lo, hi + 1)) 

138 result = set() 

139 for part in field.split(","): 

140 part = part.strip() 

141 if "/" in part: 

142 base, step = part.split("/") 

143 base = lo if base == "*" else int(base) 

144 step = int(step) 

145 for v in range(base, hi + 1, step): 

146 result.add(v) 

147 elif "-" in part: 

148 a, b = part.split("-") 

149 result.update(range(int(a), int(b) + 1)) 

150 else: 

151 result.add(int(part)) 

152 return result 

153 

154 

155# ============================================================================ 

156# Scheduler 

157# ============================================================================ 

158 

159class Scheduler: 

160 """Lightweight job scheduler. 

161 

162 Usage: 

163 sched = Scheduler() 

164 

165 # Every 5 seconds 

166 sched.every(5).do(my_func, arg1, arg2) 

167 

168 # Every minute 

169 sched.cron("*/1 * * * *").do(my_func) 

170 

171 # One-shot delay 

172 sched.delay(10).do(my_func) 

173 

174 sched.start() 

175 """ 

176 

177 def __init__(self): 

178 self._jobs: Dict[str, Job] = {} 

179 self._lock = threading.RLock() 

180 self._running = False 

181 self._id_counter = 0 

182 

183 # ---------- Fluent API ---------- 

184 

185 def every(self, seconds: float) -> Scheduler: 

186 self._last_interval = seconds 

187 self._last_cron = None 

188 self._last_delay = None 

189 return self 

190 

191 def cron(self, expression: str) -> Scheduler: 

192 """Cron expression: 'minute hour day month day_of_week'""" 

193 parts = expression.strip().split() 

194 if len(parts) != 5: 

195 raise ValueError("Cron expression must have 5 fields: 'minute hour day month day_of_week'") 

196 self._last_cron = { 

197 "minute": parts[0], 

198 "hour": parts[1], 

199 "day": parts[2], 

200 "month": parts[3], 

201 "day_of_week": parts[4], 

202 } 

203 self._last_interval = None 

204 self._last_delay = None 

205 return self 

206 

207 def delay(self, seconds: float) -> Scheduler: 

208 self._last_delay = seconds 

209 self._last_interval = None 

210 self._last_cron = None 

211 return self 

212 

213 def do(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Job: 

214 """Register the job and return it.""" 

215 self._id_counter += 1 

216 job_id = f"job_{self._id_counter}" 

217 

218 interval = getattr(self, "_last_interval", None) 

219 cron = getattr(self, "_last_cron", None) 

220 delay = getattr(self, "_last_delay", None) 

221 

222 now = time.time() 

223 if interval: 

224 next_run = now + interval 

225 elif cron: 

226 next_run = _cron_next(cron, now) 

227 elif delay: 

228 next_run = now + delay 

229 else: 

230 raise ValueError("Must call every(), cron(), or delay() before do()") 

231 

232 job = Job( 

233 id=job_id, 

234 func=func, 

235 args=args, 

236 kwargs=kwargs, 

237 interval=interval, 

238 cron=cron, 

239 delay=delay, 

240 next_run=next_run, 

241 ) 

242 

243 with self._lock: 

244 self._jobs[job_id] = job 

245 

246 # Schedule if running 

247 if self._running: 

248 self._schedule_job(job) 

249 

250 return job 

251 

252 # ---------- Lifecycle ---------- 

253 

254 def start(self) -> None: 

255 with self._lock: 

256 if self._running: 

257 return 

258 self._running = True 

259 for job in self._jobs.values(): 

260 if job.state in (JobState.PENDING,): 

261 self._schedule_job(job) 

262 

263 def stop(self) -> None: 

264 with self._lock: 

265 self._running = False 

266 for job in self._jobs.values(): 

267 if job._timer: 

268 job._timer.cancel() 

269 job._timer = None 

270 if job.state == JobState.RUNNING: 

271 job.state = JobState.PAUSED 

272 

273 def pause(self, job_id: str) -> bool: 

274 with self._lock: 

275 job = self._jobs.get(job_id) 

276 if not job: 

277 return False 

278 if job._timer: 

279 job._timer.cancel() 

280 job._timer = None 

281 job.state = JobState.PAUSED 

282 return True 

283 

284 def resume(self, job_id: str) -> bool: 

285 with self._lock: 

286 job = self._jobs.get(job_id) 

287 if not job: 

288 return False 

289 if job.state != JobState.PAUSED: 

290 return False 

291 job.state = JobState.PENDING 

292 job.next_run = time.time() # run immediately 

293 if self._running: 

294 self._schedule_job(job) 

295 return True 

296 

297 def remove(self, job_id: str) -> bool: 

298 with self._lock: 

299 job = self._jobs.pop(job_id, None) 

300 if job and job._timer: 

301 job._timer.cancel() 

302 return job is not None 

303 

304 # ---------- Internal ---------- 

305 

306 def _schedule_job(self, job: Job) -> None: 

307 if job.state == JobState.PAUSED: 

308 return 

309 job.state = JobState.PENDING 

310 delay = max(0, job.next_run - time.time()) 

311 timer = threading.Timer(delay, self._run_job, args=[job.id]) 

312 timer.daemon = True 

313 job._timer = timer 

314 timer.start() 

315 

316 def _run_job(self, job_id: str) -> None: 

317 with self._lock: 

318 job = self._jobs.get(job_id) 

319 if not job or not self._running: 

320 return 

321 job.state = JobState.RUNNING 

322 

323 try: 

324 job.func(*job.args, **job.kwargs) 

325 job.last_error = None 

326 except Exception as e: 

327 job.error_count += 1 

328 job.last_error = str(e) 

329 finally: 

330 with self._lock: 

331 job.last_run = time.time() 

332 job.run_count += 1 

333 

334 if job.delay: 

335 # One-shot — mark done 

336 job.state = JobState.STOPPED 

337 return 

338 

339 if job.state != JobState.PAUSED: 

340 # Compute next run 

341 if job.interval: 

342 job.next_run = time.time() + job.interval 

343 elif job.cron: 

344 job.next_run = _cron_next(job.cron, time.time()) 

345 else: 

346 job.state = JobState.STOPPED 

347 return 

348 

349 if self._running: 

350 self._schedule_job(job) 

351 

352 # ---------- Query ---------- 

353 

354 def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: 

355 with self._lock: 

356 job = self._jobs.get(job_id) 

357 if not job: 

358 return None 

359 return self._job_info(job) 

360 

361 def list_jobs(self) -> List[Dict[str, Any]]: 

362 with self._lock: 

363 return [self._job_info(j) for j in self._jobs.values()] 

364 

365 @staticmethod 

366 def _job_info(job: Job) -> Dict[str, Any]: 

367 return { 

368 "id": job.id, 

369 "state": job.state, 

370 "interval": job.interval, 

371 "cron": job.cron, 

372 "delay": job.delay, 

373 "next_run": job.next_run, 

374 "last_run": job.last_run, 

375 "run_count": job.run_count, 

376 "error_count": job.error_count, 

377 "last_error": job.last_error, 

378 }