Coverage for session_buddy / natural_scheduler.py: 16.82%

255 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Natural Language Scheduling module for time-based reminders and triggers. 

2 

3This module provides intelligent scheduling capabilities including: 

4- Natural language time parsing ("in 30 minutes", "tomorrow at 9am") 

5- Recurring reminders and cron-like scheduling 

6- Context-aware reminder triggers 

7- Integration with session workflow 

8""" 

9 

10import asyncio 

11import importlib.util 

12import json 

13import logging 

14import sqlite3 

15import threading 

16import time 

17from collections.abc import Callable 

18from datetime import datetime, timedelta 

19from pathlib import Path 

20from typing import Any 

21 

22DATEUTIL_AVAILABLE = importlib.util.find_spec("dateutil") is not None 

23CRONTAB_AVAILABLE = importlib.util.find_spec("python_crontab") is not None 

24SCHEDULE_AVAILABLE = importlib.util.find_spec("schedule") is not None 

25 

26if DATEUTIL_AVAILABLE: 26 ↛ 29line 26 didn't jump to line 29 because the condition on line 26 was always true

27 from dateutil.relativedelta import relativedelta 

28 

29from .session_types import RecurrenceInterval 

30from .utils.scheduler import ( 

31 NaturalLanguageParser, 

32 NaturalReminder, 

33 ReminderStatus, 

34 ReminderType, 

35) 

36 

37logger = logging.getLogger(__name__) 

38 

39 

40class ReminderScheduler: 

41 """Manages scheduling and execution of reminders.""" 

42 

43 def __init__(self, db_path: str | None = None) -> None: 

44 """Initialize reminder scheduler.""" 

45 self.db_path = db_path or str( 

46 Path.home() / ".claude" / "data" / "natural_scheduler.db", 

47 ) 

48 self.parser = NaturalLanguageParser() 

49 self._lock = threading.Lock() 

50 self._running = False 

51 self._scheduler_thread: threading.Thread | None = None 

52 self._callbacks: dict[str, list[Callable[..., Any]]] = {} 

53 self._init_database() 

54 

55 def _init_database(self) -> None: 

56 """Initialize SQLite database for reminders.""" 

57 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) 

58 

59 with sqlite3.connect(self.db_path) as conn: 

60 conn.execute(""" 

61 CREATE TABLE IF NOT EXISTS reminders ( 

62 reminder_id TEXT PRIMARY KEY, 

63 reminder_type TEXT NOT NULL, 

64 expression TEXT, 

65 scheduled_time TIMESTAMP NOT NULL, 

66 action TEXT, 

67 status TEXT NOT NULL, 

68 created_at TIMESTAMP, 

69 executed_at TIMESTAMP, 

70 recurrence_pattern TEXT, 

71 failure_reason TEXT, 

72 metadata TEXT -- JSON object 

73 ) 

74 """) 

75 

76 conn.execute(""" 

77 CREATE TABLE IF NOT EXISTS reminder_history ( 

78 id INTEGER PRIMARY KEY AUTOINCREMENT, 

79 reminder_id TEXT NOT NULL, 

80 action TEXT NOT NULL, 

81 timestamp TIMESTAMP, 

82 result TEXT, 

83 details TEXT -- JSON object 

84 ) 

85 """) 

86 

87 # Create indices 

88 conn.execute( 

89 "CREATE INDEX IF NOT EXISTS idx_reminders_scheduled ON reminders(scheduled_for)", 

90 ) 

91 conn.execute( 

92 "CREATE INDEX IF NOT EXISTS idx_reminders_status ON reminders(status)", 

93 ) 

94 conn.execute( 

95 "CREATE INDEX IF NOT EXISTS idx_reminders_user ON reminders(user_id)", 

96 ) 

97 conn.execute( 

98 "CREATE INDEX IF NOT EXISTS idx_reminders_project ON reminders(project_id)", 

99 ) 

100 

101 async def create_reminder( 

102 self, 

103 title: str, 

104 time_expression: str, 

105 description: str = "", 

106 user_id: str = "default", 

107 project_id: str | None = None, 

108 notification_method: str = "session", 

109 context_triggers: list[str] | None = None, 

110 metadata: dict[str, Any] | None = None, 

111 ) -> str | None: 

112 """Create a new reminder from natural language.""" 

113 # Parse the time expression 

114 scheduled_time = self.parser.parse_time_expression(time_expression) 

115 if not scheduled_time: 

116 return None 

117 

118 # Check for recurrence 

119 recurrence_pattern = self.parser.parse_recurrence(time_expression) 

120 reminder_type = ( 

121 ReminderType.RECURRING if recurrence_pattern else ReminderType.TASK 

122 ) 

123 

124 # Generate reminder ID 

125 reminder_id = f"rem_{int(time.time() * 1000)}" 

126 

127 # Build metadata from additional fields 

128 reminder_metadata: dict[str, Any] = { 

129 "title": title, 

130 "description": description, 

131 "user_id": user_id, 

132 "project_id": project_id, 

133 "context_triggers": context_triggers or [], 

134 "notification_method": notification_method, 

135 } 

136 if metadata: 

137 reminder_metadata.update(metadata) 

138 

139 reminder = NaturalReminder( 

140 reminder_id=reminder_id, 

141 reminder_type=reminder_type, 

142 expression=time_expression, 

143 scheduled_time=scheduled_time, 

144 action=title or description, 

145 status=ReminderStatus.PENDING, 

146 created_at=datetime.now(), 

147 executed_at=None, 

148 recurrence_pattern=recurrence_pattern, 

149 metadata=reminder_metadata, 

150 ) 

151 

152 # Store in database 

153 with sqlite3.connect(self.db_path) as conn: 

154 conn.execute( 

155 """ 

156 INSERT INTO reminders (reminder_id, reminder_type, expression, scheduled_time, action, 

157 status, created_at, executed_at, recurrence_pattern, metadata) 

158 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

159 """, 

160 ( 

161 reminder.reminder_id, 

162 reminder.reminder_type.value, 

163 reminder.expression, 

164 reminder.scheduled_time, 

165 reminder.action, 

166 reminder.status.value, 

167 reminder.created_at, 

168 reminder.executed_at, 

169 reminder.recurrence_pattern, 

170 json.dumps(reminder.metadata), 

171 ), 

172 ) 

173 

174 # Log creation 

175 await self._log_reminder_action( 

176 reminder_id, 

177 "created", 

178 "success", 

179 { 

180 "scheduled_for": scheduled_time.isoformat(), 

181 "time_expression": time_expression, 

182 }, 

183 ) 

184 

185 return reminder_id 

186 

187 async def get_pending_reminders( 

188 self, 

189 user_id: str | None = None, 

190 project_id: str | None = None, 

191 ) -> list[dict[str, Any]]: 

192 """Get pending reminders.""" 

193 with sqlite3.connect(self.db_path) as conn: 

194 conn.row_factory = sqlite3.Row 

195 

196 where_conditions = ["status IN ('pending', 'active')"] 

197 params = [] 

198 

199 if user_id: 

200 where_conditions.append("user_id = ?") 

201 params.append(user_id) 

202 

203 if project_id: 

204 where_conditions.append("project_id = ?") 

205 params.append(project_id) 

206 

207 # Build SQL safely - all user input is parameterized via params list 

208 query = ( 

209 "SELECT * FROM reminders WHERE " 

210 + " AND ".join(where_conditions) 

211 + " ORDER BY scheduled_for" 

212 ) 

213 

214 cursor = conn.execute(query, params) 

215 results = [] 

216 

217 for row in cursor.fetchall(): 

218 result = dict(row) 

219 result["context_triggers"] = json.loads( 

220 result["context_triggers"] or "[]", 

221 ) 

222 result["metadata"] = json.loads(result["metadata"] or "{}") 

223 results.append(result) 

224 

225 return results 

226 

227 async def get_due_reminders( 

228 self, 

229 check_time: datetime | None = None, 

230 ) -> list[dict[str, Any]]: 

231 """Get reminders that are due for execution.""" 

232 check_time = check_time or datetime.now() 

233 

234 with sqlite3.connect(self.db_path) as conn: 

235 conn.row_factory = sqlite3.Row 

236 

237 cursor = conn.execute( 

238 """ 

239 SELECT * FROM reminders 

240 WHERE status = 'pending' AND scheduled_for <= ? 

241 ORDER BY scheduled_for 

242 """, 

243 (check_time,), 

244 ) 

245 

246 results = [] 

247 for row in cursor.fetchall(): 

248 result = dict(row) 

249 result["context_triggers"] = json.loads( 

250 result["context_triggers"] or "[]", 

251 ) 

252 result["metadata"] = json.loads(result["metadata"] or "{}") 

253 results.append(result) 

254 

255 return results 

256 

257 async def execute_reminder(self, reminder_id: str) -> bool: 

258 """Execute a due reminder.""" 

259 try: 

260 # Get reminder details 

261 reminder_data = await self._get_reminder_by_id(reminder_id) 

262 if not reminder_data: 

263 return False 

264 

265 # Execute callbacks 

266 await self._execute_notification_callbacks(reminder_id, reminder_data) 

267 

268 # Handle recurring reminders or mark as executed 

269 recurrence_pattern = ( 

270 reminder_data.get("recurrence_pattern") 

271 or reminder_data.get("recurrence_rule") # type: ignore[arg-type] 

272 ) 

273 if recurrence_pattern: 

274 return await self._handle_recurring_reminder( 

275 reminder_id, 

276 reminder_data, 

277 recurrence_pattern, 

278 ) 

279 return await self._mark_reminder_executed(reminder_id) 

280 

281 except Exception as e: 

282 await self._log_reminder_action( 

283 reminder_id, 

284 "executed", 

285 "failed", 

286 {"error": str(e)}, 

287 ) 

288 return False 

289 

290 async def _get_reminder_by_id(self, reminder_id: str) -> dict[str, Any] | None: 

291 """Fetch and parse reminder data from database.""" 

292 with sqlite3.connect(self.db_path) as conn: 

293 conn.row_factory = sqlite3.Row 

294 row = conn.execute( 

295 "SELECT * FROM reminders WHERE id = ?", 

296 (reminder_id,), 

297 ).fetchone() 

298 

299 if not row: 

300 return None 

301 

302 reminder_data = dict(row) 

303 reminder_data["context_triggers"] = json.loads( 

304 reminder_data.get("context_triggers") or "[]", 

305 ) 

306 reminder_data["metadata"] = json.loads( 

307 reminder_data.get("metadata") or "{}", 

308 ) 

309 return reminder_data 

310 

311 async def _execute_notification_callbacks( 

312 self, 

313 reminder_id: str, 

314 reminder_data: dict[str, Any], 

315 ) -> None: 

316 """Execute all registered callbacks for a reminder.""" 

317 callbacks = self._callbacks.get( 

318 reminder_data.get("notification_method", ""), [] 

319 ) 

320 for callback in callbacks: 

321 try: 

322 await callback(reminder_data) 

323 except Exception as e: 

324 logger.exception(f"Callback error for reminder {reminder_id}: {e}") 

325 

326 async def _handle_recurring_reminder( 

327 self, 

328 reminder_id: str, 

329 reminder_data: dict[str, Any], 

330 recurrence_pattern: Any, 

331 ) -> bool: 

332 """Schedule next occurrence for recurring reminder.""" 

333 next_time = self._calculate_next_occurrence( 

334 reminder_data.get("scheduled_time") or reminder_data.get("scheduled_for"), # type: ignore[arg-type] 

335 recurrence_pattern, 

336 ) 

337 

338 if next_time: 

339 with sqlite3.connect(self.db_path) as conn: 

340 conn.execute( 

341 """ 

342 UPDATE reminders 

343 SET scheduled_for = ?, status = 'pending', executed_at = NULL 

344 WHERE id = ? 

345 """, 

346 (next_time, reminder_id), 

347 ) 

348 

349 await self._log_reminder_action( 

350 reminder_id, 

351 "rescheduled", 

352 "success", 

353 {"next_occurrence": next_time.isoformat()}, 

354 ) 

355 return True 

356 

357 # If no next occurrence, mark as executed 

358 return await self._mark_reminder_executed(reminder_id) 

359 

360 async def _mark_reminder_executed(self, reminder_id: str) -> bool: 

361 """Mark reminder as executed in database.""" 

362 now = datetime.now() 

363 with sqlite3.connect(self.db_path) as conn: 

364 conn.execute( 

365 """ 

366 UPDATE reminders 

367 SET status = ?, executed_at = ? 

368 WHERE id = ? 

369 """, 

370 (ReminderStatus.EXECUTED.value, now, reminder_id), 

371 ) 

372 

373 await self._log_reminder_action( 

374 reminder_id, 

375 "executed", 

376 "success", 

377 {"executed_at": now.isoformat()}, 

378 ) 

379 return True 

380 

381 async def cancel_reminder(self, reminder_id: str) -> bool: 

382 """Cancel a pending reminder.""" 

383 try: 

384 with sqlite3.connect(self.db_path) as conn: 

385 result = conn.execute( 

386 """ 

387 UPDATE reminders 

388 SET status = ? 

389 WHERE id = ? AND status IN ('pending', 'active') 

390 """, 

391 (ReminderStatus.CANCELLED.value, reminder_id), 

392 ) 

393 

394 success = result.rowcount > 0 

395 

396 if success: 

397 await self._log_reminder_action(reminder_id, "cancelled", "success", {}) 

398 

399 return success 

400 

401 except Exception as e: 

402 await self._log_reminder_action( 

403 reminder_id, 

404 "cancelled", 

405 "failed", 

406 {"error": str(e)}, 

407 ) 

408 return False 

409 

410 def register_notification_callback( 

411 self, 

412 method: str, 

413 callback: Callable[..., Any], 

414 ) -> None: 

415 """Register callback for notification method.""" 

416 if method not in self._callbacks: 

417 self._callbacks[method] = [] 

418 self._callbacks[method].append(callback) 

419 

420 def start_scheduler(self) -> None: 

421 """Start the background scheduler.""" 

422 if self._running: 

423 return 

424 

425 self._running = True 

426 self._scheduler_thread = threading.Thread( 

427 target=self._scheduler_loop, 

428 daemon=True, 

429 ) 

430 self._scheduler_thread.start() 

431 

432 def stop_scheduler(self) -> None: 

433 """Stop the background scheduler.""" 

434 self._running = False 

435 if self._scheduler_thread and self._scheduler_thread.is_alive(): 

436 self._scheduler_thread.join(timeout=5.0) 

437 

438 def _scheduler_loop(self) -> None: 

439 """Background scheduler loop.""" 

440 while self._running: 

441 loop = None # Initialize to prevent "possibly unbound" error 

442 try: 

443 loop = asyncio.new_event_loop() 

444 asyncio.set_event_loop(loop) 

445 loop.run_until_complete(self._check_and_execute_reminders()) 

446 except Exception as e: 

447 logger.exception(f"Scheduler loop error: {e}") 

448 finally: 

449 if loop and not loop.is_closed(): 

450 loop.close() 

451 time.sleep(60) # Check every minute 

452 

453 async def _check_and_execute_reminders(self) -> None: 

454 """Check for due reminders and execute them.""" 

455 due_reminders = await self.get_due_reminders() 

456 

457 for reminder in due_reminders: 

458 await self.execute_reminder(reminder["id"]) 

459 

460 def _parse_recurrence_interval(self, recurrence_rule: str) -> RecurrenceInterval: 

461 """Parse frequency and interval from recurrence rule.""" 

462 parts = recurrence_rule.split(";") 

463 interval = 1 

464 freq = None 

465 

466 for part in parts: 

467 if part.startswith("FREQ="): 

468 freq = part.split("=")[1] 

469 elif part.startswith("INTERVAL="): 

470 interval = int(part.split("=")[1]) 

471 

472 return RecurrenceInterval(frequency=freq, interval=interval) 

473 

474 def _calculate_simple_occurrence( 

475 self, 

476 last_time: datetime, 

477 recurrence_rule: str, 

478 ) -> datetime | None: 

479 """Calculate simple recurrence occurrences (daily, weekly, monthly).""" 

480 if recurrence_rule.startswith("FREQ=DAILY"): 

481 return last_time + timedelta(days=1) # type: ignore[no-any-return] 

482 if recurrence_rule.startswith("FREQ=WEEKLY"): 

483 return last_time + timedelta(weeks=1) # type: ignore[no-any-return] 

484 if recurrence_rule.startswith("FREQ=MONTHLY"): 

485 return last_time + relativedelta(months=1) # type: ignore[no-any-return] 

486 return None 

487 

488 def _calculate_interval_occurrence( 

489 self, 

490 last_time: datetime, 

491 recurrence_rule: str, 

492 ) -> datetime | None: 

493 """Calculate interval-based recurrence occurrences.""" 

494 if "INTERVAL=" in recurrence_rule: 

495 recurrence = self._parse_recurrence_interval(recurrence_rule) 

496 freq = recurrence.frequency 

497 interval = recurrence.interval 

498 

499 if freq == "HOURLY": 

500 return last_time + timedelta(hours=interval) # type: ignore[no-any-return] 

501 if freq == "MINUTELY": 

502 return last_time + timedelta(minutes=interval) # type: ignore[no-any-return] 

503 if freq == "DAILY": 

504 return last_time + timedelta(days=interval) # type: ignore[no-any-return] 

505 return None 

506 

507 def _check_dateutil_availability(self) -> bool: 

508 """Check if dateutil is available for processing.""" 

509 return DATEUTIL_AVAILABLE 

510 

511 def _attempt_simple_calculation( 

512 self, 

513 last_time: datetime, 

514 recurrence_rule: str, 

515 ) -> datetime | None: 

516 """Attempt to calculate using simple occurrence rules.""" 

517 try: 

518 return self._calculate_simple_occurrence(last_time, recurrence_rule) 

519 except Exception: 

520 return None 

521 

522 def _attempt_interval_calculation( 

523 self, 

524 last_time: datetime, 

525 recurrence_rule: str, 

526 ) -> datetime | None: 

527 """Attempt to calculate using interval occurrence rules.""" 

528 try: 

529 return self._calculate_interval_occurrence(last_time, recurrence_rule) 

530 except Exception: 

531 return None 

532 

533 def _calculate_next_occurrence( 

534 self, 

535 last_time: datetime, 

536 recurrence_rule: str, 

537 ) -> datetime | None: 

538 """Calculate next occurrence for recurring reminder.""" 

539 if not DATEUTIL_AVAILABLE: 

540 return None 

541 

542 try: 

543 # Try simple rule parsing first 

544 result = self._calculate_simple_occurrence(last_time, recurrence_rule) 

545 if result: 

546 return result 

547 

548 # Try interval-based recurrence rules 

549 result = self._calculate_interval_occurrence(last_time, recurrence_rule) 

550 if result: 

551 return result 

552 

553 except Exception as e: 

554 logger.exception(f"Error calculating next occurrence: {e}") 

555 

556 return None 

557 

558 async def _log_reminder_action( 

559 self, 

560 reminder_id: str, 

561 action: str, 

562 result: str, 

563 details: dict[str, Any], 

564 ) -> None: 

565 """Log reminder action for audit trail.""" 

566 with sqlite3.connect(self.db_path) as conn: 

567 conn.execute( 

568 """ 

569 INSERT INTO reminder_history (reminder_id, action, timestamp, result, details) 

570 VALUES (?, ?, ?, ?, ?) 

571 """, 

572 (reminder_id, action, datetime.now(), result, json.dumps(details)), 

573 ) 

574 

575 

# Process-wide scheduler shared by the module-level helper functions;
# created lazily by get_reminder_scheduler().
_reminder_scheduler = None


def get_reminder_scheduler() -> "ReminderScheduler":
    """Return the shared ReminderScheduler, constructing it on first use."""
    global _reminder_scheduler
    if _reminder_scheduler is not None:
        return _reminder_scheduler
    _reminder_scheduler = ReminderScheduler()
    return _reminder_scheduler

586 

587 

588# Public API functions for MCP tools 

async def create_natural_reminder(
    title: str,
    time_expression: str,
    description: str = "",
    user_id: str = "default",
    project_id: str | None = None,
    notification_method: str = "session",
) -> str | None:
    """Create a reminder on the shared scheduler from a natural-language time.

    Forwards all arguments to ``ReminderScheduler.create_reminder`` and returns
    its result (the new reminder id, or ``None``).
    """
    return await get_reminder_scheduler().create_reminder(
        title,
        time_expression,
        description,
        user_id,
        project_id,
        notification_method,
    )

607 

608 

async def list_user_reminders(
    user_id: str = "default",
    project_id: str | None = None,
) -> list[dict[str, Any]]:
    """Return the shared scheduler's pending reminders for a user/project."""
    return await get_reminder_scheduler().get_pending_reminders(user_id, project_id)

616 

617 

async def cancel_user_reminder(reminder_id: str) -> bool:
    """Cancel one reminder on the shared scheduler; returns its success flag."""
    return await get_reminder_scheduler().cancel_reminder(reminder_id)

622 

623 

async def check_due_reminders() -> list[dict[str, Any]]:
    """Return the reminders the shared scheduler reports as due right now."""
    return await get_reminder_scheduler().get_due_reminders()

628 

629 

def start_reminder_service() -> None:
    """Start the shared scheduler's background thread."""
    get_reminder_scheduler().start_scheduler()

634 

635 

def stop_reminder_service() -> None:
    """Stop the shared scheduler's background thread."""
    get_reminder_scheduler().stop_scheduler()

640 

641 

def register_session_notifications() -> None:
    """Register session-based notification callbacks.

    Adds a handler for the ``"session"`` notification method on the shared
    scheduler; the handler logs the reminder's title and description.
    """
    scheduler = get_reminder_scheduler()

    async def session_notification(reminder_data: dict[str, Any]) -> None:
        """Default session notification handler."""
        # create_reminder() stores title and description inside the JSON
        # metadata (the row itself only has an ``action`` column), so look
        # there when they are not present at the top level.
        metadata = reminder_data.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}

        title = reminder_data.get("title")
        if title is None:
            title = metadata.get("title", reminder_data.get("action", ""))

        description = reminder_data.get("description")
        if description is None:
            description = metadata.get("description", "")

        logger.info(
            f"Reminder: {title} - {description}",
        )

    scheduler.register_notification_callback("session", session_notification)