Coverage for session_buddy / utils / scheduler / time_parser.py: 15.87%
150 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
"""Natural language time expression parser.

This module provides the NaturalLanguageParser class for parsing
natural language time expressions into datetime objects.
"""

from __future__ import annotations

import contextlib
import re
from datetime import datetime, timedelta
from re import Match
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from dateutil import parser as date_parser
    from dateutil.relativedelta import relativedelta

# dateutil is optional: it enables calendar-accurate month offsets and
# free-form absolute date parsing. When it is missing, ``date_parser`` and
# ``relativedelta`` are undefined at runtime, so every use must be guarded
# by ``DATEUTIL_AVAILABLE``.
try:
    from dateutil import parser as date_parser
    from dateutil.relativedelta import relativedelta

    DATEUTIL_AVAILABLE = True
except ImportError:
    DATEUTIL_AVAILABLE = False
class NaturalLanguageParser:
    """Parse natural language time and recurrence expressions.

    Time expressions are matched against an ordered table of regular
    expressions (``time_patterns``); the first pattern whose handler yields
    a usable value wins. A handler returns either an offset (``timedelta``
    or a ``relativedelta``-like object) that is added to the base time, or
    an absolute ``datetime``. When no pattern matches and ``dateutil`` is
    installed, the expression is handed to ``dateutil.parser`` as a last
    resort.

    Recurrence expressions ("every 2 hours") are translated to RRULE-style
    strings such as ``FREQ=HOURLY;INTERVAL=2``.
    """

    # Weekday names mapped to ``datetime.weekday()`` numbers (Monday is 0).
    _WEEKDAYS: dict[str, int] = {
        "monday": 0,
        "tuesday": 1,
        "wednesday": 2,
        "thursday": 3,
        "friday": 4,
        "saturday": 5,
        "sunday": 6,
    }

    def __init__(self) -> None:
        """Build the time and recurrence pattern tables."""
        self.time_patterns = self._create_time_patterns()
        self.recurrence_patterns = self._create_recurrence_patterns()

    # ------------------------------------------------------------------
    # Pattern tables
    # ------------------------------------------------------------------

    def _create_time_patterns(self) -> dict[str, Any]:
        """Return the ordered ``{regex: handler}`` table for time parsing.

        Insertion order is the matching priority: relative offsets first,
        then calendar expressions, then session-relative shortcuts.
        """
        return {
            **self._get_relative_time_patterns(),
            **self._get_specific_time_patterns(),
            **self._get_session_relative_patterns(),
        }

    def _get_relative_time_patterns(self) -> dict[str, Any]:
        """Return the "in N minutes/hours/days/weeks/months" patterns."""
        return {
            r"in (\d+) (minute|min|minutes|mins)": lambda m: timedelta(
                minutes=int(m.group(1))
            ),
            r"in (\d+) (hour|hours|hr|hrs)": lambda m: timedelta(hours=int(m.group(1))),
            r"in (\d+) (day|days)": lambda m: timedelta(days=int(m.group(1))),
            r"in (\d+) (week|weeks)": lambda m: timedelta(weeks=int(m.group(1))),
            r"in (\d+) (month|months)": self._create_month_handler(),
        }

    def _get_specific_time_patterns(self) -> dict[str, Any]:
        """Return the calendar patterns (tomorrow, next monday, at 3pm, ...)."""
        return {
            # ``\s*`` lets "tomorrow at 3 pm" carry its am/pm marker, in
            # line with the other clock patterns below.
            r"tomorrow( at (\d{1,2}):?(\d{2})?)?\s*(am|pm)?": self._parse_tomorrow,
            r"next (monday|tuesday|wednesday|thursday|friday|saturday|sunday)": self._parse_next_weekday,
            # Must come before the bare "at H:MM" pattern: every
            # "<weekday> at ..." string also matches that one, which would
            # otherwise shadow this entry and drop the weekday.
            r"(monday|tuesday|wednesday|thursday|friday|saturday|sunday) at (\d{1,2}):?(\d{2})?\s*(am|pm)?": self._parse_weekday_time,
            r"at (\d{1,2}):?(\d{2})?\s*(am|pm)?": self._parse_specific_time,
        }

    def _get_session_relative_patterns(self) -> dict[str, Any]:
        """Return the session-relative shortcuts with their fixed offsets."""
        return {
            r"end of (session|work)": lambda m: timedelta(hours=2),
            r"after (break|lunch)": lambda m: timedelta(hours=1),
            r"before (meeting|call)": lambda m: timedelta(minutes=15),
        }

    def _create_month_handler(self) -> Any:
        """Return the handler for "in N months".

        Uses ``relativedelta`` when dateutil is available; otherwise a
        month is approximated as 30 days.
        """
        if DATEUTIL_AVAILABLE:
            return lambda m: relativedelta(months=int(m.group(1)))
        return lambda m: timedelta(days=int(m.group(1)) * 30)

    def _create_recurrence_patterns(self) -> dict[str, Any]:
        """Return the ``{regex: rule}`` table for recurrence parsing.

        A value is either a ready RRULE-style string or a callable that
        builds one from the regex match.
        """
        return {
            r"every (day|daily)": "FREQ=DAILY",
            r"every (week|weekly)": "FREQ=WEEKLY",
            r"every (month|monthly)": "FREQ=MONTHLY",
            r"every (\d+) (minute|minutes)": lambda m: f"FREQ=MINUTELY;INTERVAL={m.group(1)}",
            r"every (\d+) (hour|hours)": lambda m: f"FREQ=HOURLY;INTERVAL={m.group(1)}",
            r"every (\d+) (day|days)": lambda m: f"FREQ=DAILY;INTERVAL={m.group(1)}",
            r"every (\d+) (week|weeks)": lambda m: f"FREQ=WEEKLY;INTERVAL={m.group(1)}",
            r"every (\d+) (month|months)": lambda m: f"FREQ=MONTHLY;INTERVAL={m.group(1)}",
        }

    # ------------------------------------------------------------------
    # Matching machinery
    # ------------------------------------------------------------------

    def _try_parse_relative_pattern(
        self, expression: str, base_time: datetime, time_patterns: dict[str, Any]
    ) -> datetime | None:
        """Return the datetime for the first pattern that parses *expression*.

        Args:
            expression: Text to parse.
            base_time: Reference instant for offsets and calendar handlers.
            time_patterns: Ordered ``{regex: handler}`` table to try.

        Returns:
            The resolved datetime, or ``None`` when no pattern produces one.
        """
        for pattern, handler in time_patterns.items():
            match = self._try_pattern_match(pattern, expression)
            if match is None:
                continue
            # Compare against None explicitly: a zero offset ("in 0
            # minutes") is falsy but still a valid result.
            result = self._process_pattern_handler(handler, match, base_time)
            if result is None:
                continue
            resolved = self._convert_result_to_datetime(result, base_time)
            if resolved is not None:
                return resolved
        return None

    def _try_pattern_match(self, pattern: str, expression: str) -> Match[str] | None:
        """Search *expression* for *pattern*, ignoring case."""
        return re.search(pattern, expression, re.IGNORECASE)  # REGEX OK: Time parsing

    def _handler_uses_base_time(self, handler: Any) -> bool:
        """Return True if *handler* is one of this class's base-aware handlers.

        Only the built-in calendar handlers accept a second ``base_time``
        argument; lambdas, custom callables and subclass overrides keep the
        original single-argument calling convention.
        """
        func = getattr(handler, "__func__", None)
        return any(func is known for known in self._BASE_TIME_HANDLERS)

    def _process_pattern_handler(
        self,
        handler: Any,
        match: Match[str],
        base_time: datetime | None = None,
    ) -> Any:
        """Invoke *handler* on *match*, returning ``None`` if it fails.

        Parsing is best-effort: out-of-range values (hour 25, an offset too
        large for ``timedelta``) make this pattern a non-match instead of
        aborting the whole parse.
        """
        if not callable(handler):
            return None
        with contextlib.suppress(
            TypeError, ValueError, RuntimeError, AttributeError, OverflowError
        ):
            if base_time is not None and self._handler_uses_base_time(handler):
                return handler(match, base_time)
            return handler(match)  # type: ignore[no-untyped-call]
        return None

    def _convert_result_to_datetime(
        self, result: Any, base_time: datetime
    ) -> datetime | None:
        """Turn a handler result into an absolute datetime.

        Datetimes are returned as-is; ``timedelta`` and relativedelta-like
        objects (anything exposing ``days`` or ``months``) are added to
        *base_time*. Returns ``None`` for any other type or when the sum
        falls outside the representable date range.
        """
        if isinstance(result, datetime):
            return result
        is_offset = (
            isinstance(result, timedelta)
            or hasattr(result, "days")
            or hasattr(result, "months")
        )
        if is_offset:
            with contextlib.suppress(OverflowError, ValueError, TypeError):
                return base_time + result  # type: ignore[no-any-return]
        return None

    def _try_parse_absolute_date(
        self, expression: str, base_time: datetime
    ) -> datetime | None:
        """Parse *expression* with dateutil, accepting only future dates.

        Fields missing from the expression are taken from *base_time*. The
        result is rebuilt from its date and time fields, which drops
        microseconds and any timezone info. Returns ``None`` when dateutil
        is unavailable, the text is not a date, or the date is not after
        *base_time*.
        """
        if not DATEUTIL_AVAILABLE:
            return None
        try:
            parsed_date = date_parser.parse(expression, default=base_time)
            # Comparing aware and naive datetimes raises TypeError, so the
            # comparison stays inside the try block.
            is_future = isinstance(parsed_date, datetime) and parsed_date > base_time
        except (ValueError, TypeError, OverflowError):
            return None
        if not is_future:
            return None
        return datetime(
            parsed_date.year,
            parsed_date.month,
            parsed_date.day,
            parsed_date.hour,
            parsed_date.minute,
            parsed_date.second,
        )

    def _validate_input(self, expression: str) -> str | None:
        """Return *expression* lower-cased and stripped, or None if blank."""
        if not expression or not expression.strip():
            return None
        return expression.lower().strip()

    def _try_parsing_strategies(
        self, expression: str, base_time: datetime
    ) -> datetime | None:
        """Try the pattern table first, then absolute date parsing."""
        result = self._try_parse_relative_pattern(
            expression, base_time, self.time_patterns
        )
        if result is not None:
            return result
        return self._try_parse_absolute_date(expression, base_time)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def parse_time_expression(
        self,
        expression: str,
        base_time: datetime | None = None,
    ) -> datetime | None:
        """Parse a natural language time expression.

        Args:
            expression: Text such as "in 5 minutes" or "tomorrow at 3pm".
            base_time: Instant the expression is relative to; defaults to
                the current local time.

        Returns:
            The resolved datetime, or ``None`` if the text is blank or
            cannot be parsed.
        """
        normalized_expression = self._validate_input(expression)
        if not normalized_expression:
            return None

        if base_time is None:
            base_time = datetime.now()
        return self._try_parsing_strategies(normalized_expression, base_time)

    def parse_recurrence(self, expression: str) -> str | None:
        """Parse a recurrence phrase into an RRULE-style string.

        Args:
            expression: Text such as "every day" or "every 2 hours".

        Returns:
            A string like ``"FREQ=HOURLY;INTERVAL=2"``, or ``None`` when the
            text is empty or matches no recurrence pattern.
        """
        if not expression:
            return None

        normalized = expression.lower().strip()

        for pattern, handler in self.recurrence_patterns.items():
            match = re.search(
                pattern, normalized, re.IGNORECASE
            )  # REGEX OK: Recurrence parsing
            if match is None:
                continue
            rule = handler(match) if callable(handler) else handler
            if isinstance(rule, str):
                return rule

        return None

    # ------------------------------------------------------------------
    # Calendar handlers
    # ------------------------------------------------------------------

    def _parse_tomorrow(
        self, match: Match[str], base_time: datetime | None = None
    ) -> datetime:
        """Resolve "tomorrow" with an optional clock time.

        Without a clock time the result is 9:00 on the following day.
        Minutes are optional, so "tomorrow at 3pm" resolves to 15:00.
        """
        now = datetime.now() if base_time is None else base_time
        tomorrow = now + timedelta(days=1)

        if match.group(2):  # An hour was given
            hour, minute = self._parse_hour_minute(
                match.group(2), match.group(3), match.group(4)
            )
        else:
            hour, minute = 9, 0

        return tomorrow.replace(hour=hour, minute=minute, second=0, microsecond=0)

    def _parse_next_weekday(
        self, match: Match[str], base_time: datetime | None = None
    ) -> datetime:
        """Resolve "next <weekday>" to 1-7 days ahead, keeping the time of day."""
        now = datetime.now() if base_time is None else base_time
        target_weekday = self._get_weekday_number(match.group(1))

        days_ahead = target_weekday - now.weekday()
        if days_ahead <= 0:  # Same weekday or earlier in the week
            days_ahead += 7

        return now + timedelta(days=days_ahead)

    def _parse_specific_time(
        self, match: Match[str], base_time: datetime | None = None
    ) -> datetime:
        """Resolve "at 3:30pm" to its next occurrence (today or tomorrow)."""
        # Read the clock once so the candidate and the comparison agree.
        now = datetime.now() if base_time is None else base_time
        hour, minute = self._parse_hour_minute(
            match.group(1), match.group(2), match.group(3)
        )

        target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if target_time <= now:  # Already passed today
            target_time += timedelta(days=1)

        return target_time

    def _parse_weekday_time(
        self, match: Match[str], base_time: datetime | None = None
    ) -> datetime:
        """Resolve "<weekday> at 3pm" to the next such weekday and time."""
        now = datetime.now() if base_time is None else base_time
        target_weekday = self._get_weekday_number(match.group(1))
        hour, minute = self._parse_hour_minute(
            match.group(2), match.group(3), match.group(4)
        )

        days_ahead = self._calculate_days_ahead(target_weekday, now, hour, minute)
        target_date = now + timedelta(days=days_ahead)
        return target_date.replace(hour=hour, minute=minute, second=0, microsecond=0)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _get_weekday_number(self, weekday_name: str) -> int:
        """Return the ``datetime.weekday()`` number for a weekday name.

        The lookup is case-insensitive. Raises ``KeyError`` for a name that
        is not a weekday.
        """
        return self._WEEKDAYS[weekday_name.lower()]

    def _parse_hour_minute(
        self, hour_str: str, minute_str: str | None, am_pm: str | None
    ) -> tuple[int, int]:
        """Convert matched clock components to a 24-hour ``(hour, minute)``.

        Missing minutes default to 0. "12am" maps to hour 0 and "12pm"
        stays 12. The values are not range-checked here.
        """
        hour = int(hour_str)
        minute = int(minute_str) if minute_str else 0

        meridiem = am_pm.lower() if am_pm else None
        if meridiem == "pm" and hour != 12:
            hour += 12
        elif meridiem == "am" and hour == 12:
            hour = 0

        return hour, minute

    def _calculate_days_ahead(
        self, target_weekday: int, today: datetime, hour: int, minute: int
    ) -> int:
        """Return how many days from *today* the target weekday/time falls.

        Returns 0 when the target is today and the time is still ahead, and
        7 when it is today but the time has already passed.
        """
        days_ahead = target_weekday - today.weekday()

        if days_ahead < 0:  # Target day already happened this week
            days_ahead += 7
        elif days_ahead == 0:  # Today - check if time has passed
            target_time = today.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
            if target_time <= today:
                days_ahead = 7

        return days_ahead

    # Built-in handlers that accept ``base_time`` as a second argument.
    # Identity-checked against a handler's ``__func__`` so that subclass
    # overrides keeping the one-argument signature are still called that way.
    _BASE_TIME_HANDLERS: tuple[Any, ...] = (
        _parse_tomorrow,
        _parse_next_weekday,
        _parse_specific_time,
        _parse_weekday_time,
    )