Coverage for session_buddy / natural_scheduler.py: 16.82%
255 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Natural Language Scheduling module for time-based reminders and triggers.
3This module provides intelligent scheduling capabilities including:
4- Natural language time parsing ("in 30 minutes", "tomorrow at 9am")
5- Recurring reminders and cron-like scheduling
6- Context-aware reminder triggers
7- Integration with session workflow
8"""
10import asyncio
11import importlib.util
12import json
13import logging
14import sqlite3
15import threading
16import time
17from collections.abc import Callable
18from datetime import datetime, timedelta
19from pathlib import Path
20from typing import Any
22DATEUTIL_AVAILABLE = importlib.util.find_spec("dateutil") is not None
23CRONTAB_AVAILABLE = importlib.util.find_spec("python_crontab") is not None
24SCHEDULE_AVAILABLE = importlib.util.find_spec("schedule") is not None
26if DATEUTIL_AVAILABLE: 26 ↛ 29line 26 didn't jump to line 29 because the condition on line 26 was always true
27 from dateutil.relativedelta import relativedelta
29from .session_types import RecurrenceInterval
30from .utils.scheduler import (
31 NaturalLanguageParser,
32 NaturalReminder,
33 ReminderStatus,
34 ReminderType,
35)
37logger = logging.getLogger(__name__)
40class ReminderScheduler:
41 """Manages scheduling and execution of reminders."""
43 def __init__(self, db_path: str | None = None) -> None:
44 """Initialize reminder scheduler."""
45 self.db_path = db_path or str(
46 Path.home() / ".claude" / "data" / "natural_scheduler.db",
47 )
48 self.parser = NaturalLanguageParser()
49 self._lock = threading.Lock()
50 self._running = False
51 self._scheduler_thread: threading.Thread | None = None
52 self._callbacks: dict[str, list[Callable[..., Any]]] = {}
53 self._init_database()
55 def _init_database(self) -> None:
56 """Initialize SQLite database for reminders."""
57 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
59 with sqlite3.connect(self.db_path) as conn:
60 conn.execute("""
61 CREATE TABLE IF NOT EXISTS reminders (
62 reminder_id TEXT PRIMARY KEY,
63 reminder_type TEXT NOT NULL,
64 expression TEXT,
65 scheduled_time TIMESTAMP NOT NULL,
66 action TEXT,
67 status TEXT NOT NULL,
68 created_at TIMESTAMP,
69 executed_at TIMESTAMP,
70 recurrence_pattern TEXT,
71 failure_reason TEXT,
72 metadata TEXT -- JSON object
73 )
74 """)
76 conn.execute("""
77 CREATE TABLE IF NOT EXISTS reminder_history (
78 id INTEGER PRIMARY KEY AUTOINCREMENT,
79 reminder_id TEXT NOT NULL,
80 action TEXT NOT NULL,
81 timestamp TIMESTAMP,
82 result TEXT,
83 details TEXT -- JSON object
84 )
85 """)
87 # Create indices
88 conn.execute(
89 "CREATE INDEX IF NOT EXISTS idx_reminders_scheduled ON reminders(scheduled_for)",
90 )
91 conn.execute(
92 "CREATE INDEX IF NOT EXISTS idx_reminders_status ON reminders(status)",
93 )
94 conn.execute(
95 "CREATE INDEX IF NOT EXISTS idx_reminders_user ON reminders(user_id)",
96 )
97 conn.execute(
98 "CREATE INDEX IF NOT EXISTS idx_reminders_project ON reminders(project_id)",
99 )
101 async def create_reminder(
102 self,
103 title: str,
104 time_expression: str,
105 description: str = "",
106 user_id: str = "default",
107 project_id: str | None = None,
108 notification_method: str = "session",
109 context_triggers: list[str] | None = None,
110 metadata: dict[str, Any] | None = None,
111 ) -> str | None:
112 """Create a new reminder from natural language."""
113 # Parse the time expression
114 scheduled_time = self.parser.parse_time_expression(time_expression)
115 if not scheduled_time:
116 return None
118 # Check for recurrence
119 recurrence_pattern = self.parser.parse_recurrence(time_expression)
120 reminder_type = (
121 ReminderType.RECURRING if recurrence_pattern else ReminderType.TASK
122 )
124 # Generate reminder ID
125 reminder_id = f"rem_{int(time.time() * 1000)}"
127 # Build metadata from additional fields
128 reminder_metadata: dict[str, Any] = {
129 "title": title,
130 "description": description,
131 "user_id": user_id,
132 "project_id": project_id,
133 "context_triggers": context_triggers or [],
134 "notification_method": notification_method,
135 }
136 if metadata:
137 reminder_metadata.update(metadata)
139 reminder = NaturalReminder(
140 reminder_id=reminder_id,
141 reminder_type=reminder_type,
142 expression=time_expression,
143 scheduled_time=scheduled_time,
144 action=title or description,
145 status=ReminderStatus.PENDING,
146 created_at=datetime.now(),
147 executed_at=None,
148 recurrence_pattern=recurrence_pattern,
149 metadata=reminder_metadata,
150 )
152 # Store in database
153 with sqlite3.connect(self.db_path) as conn:
154 conn.execute(
155 """
156 INSERT INTO reminders (reminder_id, reminder_type, expression, scheduled_time, action,
157 status, created_at, executed_at, recurrence_pattern, metadata)
158 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
159 """,
160 (
161 reminder.reminder_id,
162 reminder.reminder_type.value,
163 reminder.expression,
164 reminder.scheduled_time,
165 reminder.action,
166 reminder.status.value,
167 reminder.created_at,
168 reminder.executed_at,
169 reminder.recurrence_pattern,
170 json.dumps(reminder.metadata),
171 ),
172 )
174 # Log creation
175 await self._log_reminder_action(
176 reminder_id,
177 "created",
178 "success",
179 {
180 "scheduled_for": scheduled_time.isoformat(),
181 "time_expression": time_expression,
182 },
183 )
185 return reminder_id
187 async def get_pending_reminders(
188 self,
189 user_id: str | None = None,
190 project_id: str | None = None,
191 ) -> list[dict[str, Any]]:
192 """Get pending reminders."""
193 with sqlite3.connect(self.db_path) as conn:
194 conn.row_factory = sqlite3.Row
196 where_conditions = ["status IN ('pending', 'active')"]
197 params = []
199 if user_id:
200 where_conditions.append("user_id = ?")
201 params.append(user_id)
203 if project_id:
204 where_conditions.append("project_id = ?")
205 params.append(project_id)
207 # Build SQL safely - all user input is parameterized via params list
208 query = (
209 "SELECT * FROM reminders WHERE "
210 + " AND ".join(where_conditions)
211 + " ORDER BY scheduled_for"
212 )
214 cursor = conn.execute(query, params)
215 results = []
217 for row in cursor.fetchall():
218 result = dict(row)
219 result["context_triggers"] = json.loads(
220 result["context_triggers"] or "[]",
221 )
222 result["metadata"] = json.loads(result["metadata"] or "{}")
223 results.append(result)
225 return results
227 async def get_due_reminders(
228 self,
229 check_time: datetime | None = None,
230 ) -> list[dict[str, Any]]:
231 """Get reminders that are due for execution."""
232 check_time = check_time or datetime.now()
234 with sqlite3.connect(self.db_path) as conn:
235 conn.row_factory = sqlite3.Row
237 cursor = conn.execute(
238 """
239 SELECT * FROM reminders
240 WHERE status = 'pending' AND scheduled_for <= ?
241 ORDER BY scheduled_for
242 """,
243 (check_time,),
244 )
246 results = []
247 for row in cursor.fetchall():
248 result = dict(row)
249 result["context_triggers"] = json.loads(
250 result["context_triggers"] or "[]",
251 )
252 result["metadata"] = json.loads(result["metadata"] or "{}")
253 results.append(result)
255 return results
257 async def execute_reminder(self, reminder_id: str) -> bool:
258 """Execute a due reminder."""
259 try:
260 # Get reminder details
261 reminder_data = await self._get_reminder_by_id(reminder_id)
262 if not reminder_data:
263 return False
265 # Execute callbacks
266 await self._execute_notification_callbacks(reminder_id, reminder_data)
268 # Handle recurring reminders or mark as executed
269 recurrence_pattern = (
270 reminder_data.get("recurrence_pattern")
271 or reminder_data.get("recurrence_rule") # type: ignore[arg-type]
272 )
273 if recurrence_pattern:
274 return await self._handle_recurring_reminder(
275 reminder_id,
276 reminder_data,
277 recurrence_pattern,
278 )
279 return await self._mark_reminder_executed(reminder_id)
281 except Exception as e:
282 await self._log_reminder_action(
283 reminder_id,
284 "executed",
285 "failed",
286 {"error": str(e)},
287 )
288 return False
290 async def _get_reminder_by_id(self, reminder_id: str) -> dict[str, Any] | None:
291 """Fetch and parse reminder data from database."""
292 with sqlite3.connect(self.db_path) as conn:
293 conn.row_factory = sqlite3.Row
294 row = conn.execute(
295 "SELECT * FROM reminders WHERE id = ?",
296 (reminder_id,),
297 ).fetchone()
299 if not row:
300 return None
302 reminder_data = dict(row)
303 reminder_data["context_triggers"] = json.loads(
304 reminder_data.get("context_triggers") or "[]",
305 )
306 reminder_data["metadata"] = json.loads(
307 reminder_data.get("metadata") or "{}",
308 )
309 return reminder_data
311 async def _execute_notification_callbacks(
312 self,
313 reminder_id: str,
314 reminder_data: dict[str, Any],
315 ) -> None:
316 """Execute all registered callbacks for a reminder."""
317 callbacks = self._callbacks.get(
318 reminder_data.get("notification_method", ""), []
319 )
320 for callback in callbacks:
321 try:
322 await callback(reminder_data)
323 except Exception as e:
324 logger.exception(f"Callback error for reminder {reminder_id}: {e}")
326 async def _handle_recurring_reminder(
327 self,
328 reminder_id: str,
329 reminder_data: dict[str, Any],
330 recurrence_pattern: Any,
331 ) -> bool:
332 """Schedule next occurrence for recurring reminder."""
333 next_time = self._calculate_next_occurrence(
334 reminder_data.get("scheduled_time") or reminder_data.get("scheduled_for"), # type: ignore[arg-type]
335 recurrence_pattern,
336 )
338 if next_time:
339 with sqlite3.connect(self.db_path) as conn:
340 conn.execute(
341 """
342 UPDATE reminders
343 SET scheduled_for = ?, status = 'pending', executed_at = NULL
344 WHERE id = ?
345 """,
346 (next_time, reminder_id),
347 )
349 await self._log_reminder_action(
350 reminder_id,
351 "rescheduled",
352 "success",
353 {"next_occurrence": next_time.isoformat()},
354 )
355 return True
357 # If no next occurrence, mark as executed
358 return await self._mark_reminder_executed(reminder_id)
360 async def _mark_reminder_executed(self, reminder_id: str) -> bool:
361 """Mark reminder as executed in database."""
362 now = datetime.now()
363 with sqlite3.connect(self.db_path) as conn:
364 conn.execute(
365 """
366 UPDATE reminders
367 SET status = ?, executed_at = ?
368 WHERE id = ?
369 """,
370 (ReminderStatus.EXECUTED.value, now, reminder_id),
371 )
373 await self._log_reminder_action(
374 reminder_id,
375 "executed",
376 "success",
377 {"executed_at": now.isoformat()},
378 )
379 return True
381 async def cancel_reminder(self, reminder_id: str) -> bool:
382 """Cancel a pending reminder."""
383 try:
384 with sqlite3.connect(self.db_path) as conn:
385 result = conn.execute(
386 """
387 UPDATE reminders
388 SET status = ?
389 WHERE id = ? AND status IN ('pending', 'active')
390 """,
391 (ReminderStatus.CANCELLED.value, reminder_id),
392 )
394 success = result.rowcount > 0
396 if success:
397 await self._log_reminder_action(reminder_id, "cancelled", "success", {})
399 return success
401 except Exception as e:
402 await self._log_reminder_action(
403 reminder_id,
404 "cancelled",
405 "failed",
406 {"error": str(e)},
407 )
408 return False
410 def register_notification_callback(
411 self,
412 method: str,
413 callback: Callable[..., Any],
414 ) -> None:
415 """Register callback for notification method."""
416 if method not in self._callbacks:
417 self._callbacks[method] = []
418 self._callbacks[method].append(callback)
420 def start_scheduler(self) -> None:
421 """Start the background scheduler."""
422 if self._running:
423 return
425 self._running = True
426 self._scheduler_thread = threading.Thread(
427 target=self._scheduler_loop,
428 daemon=True,
429 )
430 self._scheduler_thread.start()
432 def stop_scheduler(self) -> None:
433 """Stop the background scheduler."""
434 self._running = False
435 if self._scheduler_thread and self._scheduler_thread.is_alive():
436 self._scheduler_thread.join(timeout=5.0)
438 def _scheduler_loop(self) -> None:
439 """Background scheduler loop."""
440 while self._running:
441 loop = None # Initialize to prevent "possibly unbound" error
442 try:
443 loop = asyncio.new_event_loop()
444 asyncio.set_event_loop(loop)
445 loop.run_until_complete(self._check_and_execute_reminders())
446 except Exception as e:
447 logger.exception(f"Scheduler loop error: {e}")
448 finally:
449 if loop and not loop.is_closed():
450 loop.close()
451 time.sleep(60) # Check every minute
453 async def _check_and_execute_reminders(self) -> None:
454 """Check for due reminders and execute them."""
455 due_reminders = await self.get_due_reminders()
457 for reminder in due_reminders:
458 await self.execute_reminder(reminder["id"])
460 def _parse_recurrence_interval(self, recurrence_rule: str) -> RecurrenceInterval:
461 """Parse frequency and interval from recurrence rule."""
462 parts = recurrence_rule.split(";")
463 interval = 1
464 freq = None
466 for part in parts:
467 if part.startswith("FREQ="):
468 freq = part.split("=")[1]
469 elif part.startswith("INTERVAL="):
470 interval = int(part.split("=")[1])
472 return RecurrenceInterval(frequency=freq, interval=interval)
474 def _calculate_simple_occurrence(
475 self,
476 last_time: datetime,
477 recurrence_rule: str,
478 ) -> datetime | None:
479 """Calculate simple recurrence occurrences (daily, weekly, monthly)."""
480 if recurrence_rule.startswith("FREQ=DAILY"):
481 return last_time + timedelta(days=1) # type: ignore[no-any-return]
482 if recurrence_rule.startswith("FREQ=WEEKLY"):
483 return last_time + timedelta(weeks=1) # type: ignore[no-any-return]
484 if recurrence_rule.startswith("FREQ=MONTHLY"):
485 return last_time + relativedelta(months=1) # type: ignore[no-any-return]
486 return None
488 def _calculate_interval_occurrence(
489 self,
490 last_time: datetime,
491 recurrence_rule: str,
492 ) -> datetime | None:
493 """Calculate interval-based recurrence occurrences."""
494 if "INTERVAL=" in recurrence_rule:
495 recurrence = self._parse_recurrence_interval(recurrence_rule)
496 freq = recurrence.frequency
497 interval = recurrence.interval
499 if freq == "HOURLY":
500 return last_time + timedelta(hours=interval) # type: ignore[no-any-return]
501 if freq == "MINUTELY":
502 return last_time + timedelta(minutes=interval) # type: ignore[no-any-return]
503 if freq == "DAILY":
504 return last_time + timedelta(days=interval) # type: ignore[no-any-return]
505 return None
507 def _check_dateutil_availability(self) -> bool:
508 """Check if dateutil is available for processing."""
509 return DATEUTIL_AVAILABLE
511 def _attempt_simple_calculation(
512 self,
513 last_time: datetime,
514 recurrence_rule: str,
515 ) -> datetime | None:
516 """Attempt to calculate using simple occurrence rules."""
517 try:
518 return self._calculate_simple_occurrence(last_time, recurrence_rule)
519 except Exception:
520 return None
522 def _attempt_interval_calculation(
523 self,
524 last_time: datetime,
525 recurrence_rule: str,
526 ) -> datetime | None:
527 """Attempt to calculate using interval occurrence rules."""
528 try:
529 return self._calculate_interval_occurrence(last_time, recurrence_rule)
530 except Exception:
531 return None
533 def _calculate_next_occurrence(
534 self,
535 last_time: datetime,
536 recurrence_rule: str,
537 ) -> datetime | None:
538 """Calculate next occurrence for recurring reminder."""
539 if not DATEUTIL_AVAILABLE:
540 return None
542 try:
543 # Try simple rule parsing first
544 result = self._calculate_simple_occurrence(last_time, recurrence_rule)
545 if result:
546 return result
548 # Try interval-based recurrence rules
549 result = self._calculate_interval_occurrence(last_time, recurrence_rule)
550 if result:
551 return result
553 except Exception as e:
554 logger.exception(f"Error calculating next occurrence: {e}")
556 return None
558 async def _log_reminder_action(
559 self,
560 reminder_id: str,
561 action: str,
562 result: str,
563 details: dict[str, Any],
564 ) -> None:
565 """Log reminder action for audit trail."""
566 with sqlite3.connect(self.db_path) as conn:
567 conn.execute(
568 """
569 INSERT INTO reminder_history (reminder_id, action, timestamp, result, details)
570 VALUES (?, ?, ?, ?, ?)
571 """,
572 (reminder_id, action, datetime.now(), result, json.dumps(details)),
573 )
576# Global scheduler instance
577_reminder_scheduler = None
580def get_reminder_scheduler() -> "ReminderScheduler":
581 """Get global reminder scheduler instance."""
582 global _reminder_scheduler
583 if _reminder_scheduler is None:
584 _reminder_scheduler = ReminderScheduler()
585 return _reminder_scheduler
588# Public API functions for MCP tools
589async def create_natural_reminder(
590 title: str,
591 time_expression: str,
592 description: str = "",
593 user_id: str = "default",
594 project_id: str | None = None,
595 notification_method: str = "session",
596) -> str | None:
597 """Create reminder from natural language time expression."""
598 scheduler = get_reminder_scheduler()
599 return await scheduler.create_reminder(
600 title,
601 time_expression,
602 description,
603 user_id,
604 project_id,
605 notification_method,
606 )
609async def list_user_reminders(
610 user_id: str = "default",
611 project_id: str | None = None,
612) -> list[dict[str, Any]]:
613 """List pending reminders for user/project."""
614 scheduler = get_reminder_scheduler()
615 return await scheduler.get_pending_reminders(user_id, project_id)
618async def cancel_user_reminder(reminder_id: str) -> bool:
619 """Cancel a specific reminder."""
620 scheduler = get_reminder_scheduler()
621 return await scheduler.cancel_reminder(reminder_id)
624async def check_due_reminders() -> list[dict[str, Any]]:
625 """Check for reminders that are due now."""
626 scheduler = get_reminder_scheduler()
627 return await scheduler.get_due_reminders()
630def start_reminder_service() -> None:
631 """Start the background reminder service."""
632 scheduler = get_reminder_scheduler()
633 scheduler.start_scheduler()
636def stop_reminder_service() -> None:
637 """Stop the background reminder service."""
638 scheduler = get_reminder_scheduler()
639 scheduler.stop_scheduler()
642def register_session_notifications() -> None:
643 """Register session-based notification callbacks."""
644 scheduler = get_reminder_scheduler()
646 async def session_notification(reminder_data: dict[str, Any]) -> None:
647 """Default session notification handler."""
648 logger.info(
649 f"Reminder: {reminder_data['title']} - {reminder_data['description']}",
650 )
652 scheduler.register_notification_callback("session", session_notification)