Coverage for session_buddy / utils / scheduler / time_parser.py: 15.87%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Natural language time expression parser. 

2 

3This module provides the NaturalLanguageParser class for parsing 

4natural language time expressions into datetime objects. 

5""" 

6 

7from __future__ import annotations 

8 

9import contextlib 

10import re 

11from datetime import datetime, timedelta 

12from re import Match 

13from typing import TYPE_CHECKING, Any 

14 

15if TYPE_CHECKING: 

16 from dateutil import parser as date_parser 

17 from dateutil.relativedelta import relativedelta 

18 

19# Try to import dateutil for better date handling 

20try: 

21 from dateutil import parser as date_parser 

22 from dateutil.relativedelta import relativedelta 

23 

24 DATEUTIL_AVAILABLE = True 

25except ImportError: 

26 DATEUTIL_AVAILABLE = False 

27 

28 

29class NaturalLanguageParser: 

30 """Parses natural language time expressions.""" 

31 

32 def __init__(self) -> None: 

33 """Initialize natural language parser.""" 

34 self.time_patterns = self._create_time_patterns() 

35 self.recurrence_patterns = self._create_recurrence_patterns() 

36 

37 def _create_time_patterns(self) -> dict[str, Any]: 

38 """Create time parsing patterns dictionary.""" 

39 patterns = {} 

40 

41 # Add relative time patterns 

42 patterns.update(self._get_relative_time_patterns()) 

43 

44 # Add specific time patterns 

45 patterns.update(self._get_specific_time_patterns()) 

46 

47 # Add session-relative patterns 

48 patterns.update(self._get_session_relative_patterns()) 

49 

50 return patterns 

51 

52 def _get_relative_time_patterns(self) -> dict[str, Any]: 

53 """Get relative time patterns (in X minutes/hours/days).""" 

54 return { 

55 r"in (\d+) (minute|min|minutes|mins)": lambda m: timedelta( 

56 minutes=int(m.group(1)) 

57 ), 

58 r"in (\d+) (hour|hours|hr|hrs)": lambda m: timedelta(hours=int(m.group(1))), 

59 r"in (\d+) (day|days)": lambda m: timedelta(days=int(m.group(1))), 

60 r"in (\d+) (week|weeks)": lambda m: timedelta(weeks=int(m.group(1))), 

61 r"in (\d+) (month|months)": self._create_month_handler(), 

62 } 

63 

64 def _get_specific_time_patterns(self) -> dict[str, Any]: 

65 """Get specific time patterns (tomorrow, next monday, etc).""" 

66 return { 

67 r"tomorrow( at (\d{1,2}):?(\d{2})?)?(am|pm)?": self._parse_tomorrow, 

68 r"next (monday|tuesday|wednesday|thursday|friday|saturday|sunday)": self._parse_next_weekday, 

69 r"at (\d{1,2}):?(\d{2})?\s*(am|pm)?": self._parse_specific_time, 

70 r"(monday|tuesday|wednesday|thursday|friday|saturday|sunday) at (\d{1,2}):?(\d{2})?\s*(am|pm)?": self._parse_weekday_time, 

71 } 

72 

73 def _get_session_relative_patterns(self) -> dict[str, Any]: 

74 """Get session-relative time patterns.""" 

75 return { 

76 r"end of (session|work)": lambda m: timedelta(hours=2), 

77 r"after (break|lunch)": lambda m: timedelta(hours=1), 

78 r"before (meeting|call)": lambda m: timedelta(minutes=15), 

79 } 

80 

81 def _create_month_handler(self) -> Any: 

82 """Create month duration handler with dateutil fallback.""" 

83 if DATEUTIL_AVAILABLE: 

84 return lambda m: relativedelta(months=int(m.group(1))) 

85 return lambda m: timedelta(days=int(m.group(1)) * 30) 

86 

87 def _create_recurrence_patterns(self) -> dict[str, Any]: 

88 """Create recurrence parsing patterns dictionary.""" 

89 return { 

90 r"every (day|daily)": "FREQ=DAILY", 

91 r"every (week|weekly)": "FREQ=WEEKLY", 

92 r"every (month|monthly)": "FREQ=MONTHLY", 

93 r"every (\d+) (minute|minutes)": lambda m: f"FREQ=MINUTELY;INTERVAL={m.group(1)}", 

94 r"every (\d+) (hour|hours)": lambda m: f"FREQ=HOURLY;INTERVAL={m.group(1)}", 

95 r"every (\d+) (day|days)": lambda m: f"FREQ=DAILY;INTERVAL={m.group(1)}", 

96 } 

97 

98 def _try_parse_relative_pattern( 

99 self, expression: str, base_time: datetime, time_patterns: dict[str, Any] 

100 ) -> datetime | None: 

101 """Try to parse the expression using relative time patterns.""" 

102 for pattern, handler in time_patterns.items(): 

103 match = self._try_pattern_match(pattern, expression) 

104 if match: 

105 result = self._process_pattern_handler(handler, match) 

106 if result: 

107 return self._convert_result_to_datetime(result, base_time) 

108 return None 

109 

110 def _try_pattern_match(self, pattern: str, expression: str) -> Match[str] | None: 

111 """Try to match a pattern against the expression.""" 

112 return re.search(pattern, expression, re.IGNORECASE) # REGEX OK: Time parsing 

113 

114 def _process_pattern_handler(self, handler: Any, match: Match[str]) -> Any: 

115 """Process a pattern handler with exception handling.""" 

116 with contextlib.suppress(TypeError, ValueError, RuntimeError, AttributeError): 

117 if callable(handler): 

118 return handler(match) # type: ignore[no-untyped-call] 

119 return None 

120 

121 def _convert_result_to_datetime( 

122 self, result: Any, base_time: datetime 

123 ) -> datetime | None: 

124 """Convert handler result to datetime with base time.""" 

125 if isinstance(result, timedelta): 

126 return base_time + result 

127 if isinstance(result, datetime): 

128 return result 

129 if hasattr(result, "days") or hasattr(result, "months"): 

130 return base_time + result # type: ignore[no-any-return] 

131 return None 

132 

133 def _try_parse_absolute_date( 

134 self, expression: str, base_time: datetime 

135 ) -> datetime | None: 

136 """Try to parse the expression using absolute date parsing.""" 

137 if DATEUTIL_AVAILABLE: 

138 try: 

139 parsed_date = date_parser.parse(expression, default=base_time) 

140 # Ensure parsed_date is a datetime object 

141 if ( 

142 isinstance(parsed_date, datetime) and parsed_date > base_time 

143 ): # Only future dates 

144 return datetime( 

145 parsed_date.year, 

146 parsed_date.month, 

147 parsed_date.day, 

148 parsed_date.hour, 

149 parsed_date.minute, 

150 parsed_date.second, 

151 ) 

152 except (ValueError, TypeError): 

153 with contextlib.suppress(ValueError, TypeError): 

154 pass 

155 return None 

156 

157 def _validate_input(self, expression: str) -> str | None: 

158 """Validate and normalize input expression.""" 

159 if not expression or not expression.strip(): 

160 return None 

161 return expression.lower().strip() 

162 

163 def _try_parsing_strategies( 

164 self, expression: str, base_time: datetime 

165 ) -> datetime | None: 

166 """Try multiple parsing strategies in order.""" 

167 # Strategy 1: Relative patterns 

168 result = self._try_parse_relative_pattern( 

169 expression, base_time, self.time_patterns 

170 ) 

171 if result: 

172 return result 

173 

174 # Strategy 2: Absolute date parsing 

175 result = self._try_parse_absolute_date(expression, base_time) 

176 if result: 

177 return result 

178 

179 return None 

180 

181 def parse_time_expression( 

182 self, 

183 expression: str, 

184 base_time: datetime | None = None, 

185 ) -> datetime | None: 

186 """Parse natural language time expression.""" 

187 normalized_expression = self._validate_input(expression) 

188 if not normalized_expression: 

189 return None 

190 

191 base_time = base_time or datetime.now() 

192 return self._try_parsing_strategies(normalized_expression, base_time) 

193 

194 def parse_recurrence(self, expression: str) -> str | None: 

195 """Parse recurrence pattern from natural language.""" 

196 if not expression: 

197 return None 

198 

199 expression = expression.lower().strip() 

200 

201 for pattern, handler in self.recurrence_patterns.items(): 

202 match = re.search( 

203 pattern, expression, re.IGNORECASE 

204 ) # REGEX OK: Recurrence parsing 

205 if match: 

206 if callable(handler): 

207 result = handler(match) 

208 if isinstance(result, str): 

209 return result 

210 elif isinstance(handler, str): 

211 return handler 

212 

213 return None 

214 

215 def _parse_tomorrow(self, match: Match[str]) -> datetime: 

216 """Parse 'tomorrow' with optional time.""" 

217 tomorrow = datetime.now() + timedelta(days=1) 

218 

219 if match.group(2) and match.group(3): # Has time 

220 hour = int(match.group(2)) 

221 minute = int(match.group(3)) 

222 am_pm = match.group(4) 

223 

224 if am_pm and am_pm.lower() == "pm" and hour != 12: 

225 hour += 12 

226 elif am_pm and am_pm.lower() == "am" and hour == 12: 

227 hour = 0 

228 

229 return tomorrow.replace(hour=hour, minute=minute, second=0, microsecond=0) 

230 # Default to 9 AM tomorrow 

231 return tomorrow.replace(hour=9, minute=0, second=0, microsecond=0) 

232 

233 def _parse_next_weekday(self, match: Match[str]) -> datetime: 

234 """Parse 'next monday', etc.""" 

235 weekdays = { 

236 "monday": 0, 

237 "tuesday": 1, 

238 "wednesday": 2, 

239 "thursday": 3, 

240 "friday": 4, 

241 "saturday": 5, 

242 "sunday": 6, 

243 } 

244 

245 target_weekday = weekdays[match.group(1)] 

246 today = datetime.now() 

247 days_ahead = target_weekday - today.weekday() 

248 

249 if days_ahead <= 0: # Target day already happened this week 

250 days_ahead += 7 

251 

252 return today + timedelta(days=days_ahead) 

253 

254 def _parse_specific_time(self, match: Match[str]) -> datetime: 

255 """Parse 'at 3:30pm' for today.""" 

256 hour = int(match.group(1)) 

257 minute = int(match.group(2)) if match.group(2) else 0 

258 am_pm = match.group(3) 

259 

260 if am_pm and am_pm.lower() == "pm" and hour != 12: 

261 hour += 12 

262 elif am_pm and am_pm.lower() == "am" and hour == 12: 

263 hour = 0 

264 

265 target_time = datetime.now().replace( 

266 hour=hour, 

267 minute=minute, 

268 second=0, 

269 microsecond=0, 

270 ) 

271 

272 # If time has passed today, schedule for tomorrow 

273 if target_time <= datetime.now(): 

274 target_time += timedelta(days=1) 

275 

276 return target_time 

277 

278 def _parse_weekday_time(self, match: Match[str]) -> datetime: 

279 """Parse 'monday at 3pm'.""" 

280 target_weekday = self._get_weekday_number(match.group(1)) 

281 hour, minute = self._parse_hour_minute( 

282 match.group(2), match.group(3), match.group(4) 

283 ) 

284 

285 today = datetime.now() 

286 days_ahead = self._calculate_days_ahead(target_weekday, today, hour, minute) 

287 

288 target_date = today + timedelta(days=days_ahead) 

289 return target_date.replace(hour=hour, minute=minute, second=0, microsecond=0) 

290 

291 def _get_weekday_number(self, weekday_name: str) -> int: 

292 """Get weekday number from name.""" 

293 weekdays = { 

294 "monday": 0, 

295 "tuesday": 1, 

296 "wednesday": 2, 

297 "thursday": 3, 

298 "friday": 4, 

299 "saturday": 5, 

300 "sunday": 6, 

301 } 

302 return weekdays[weekday_name] 

303 

304 def _parse_hour_minute( 

305 self, hour_str: str, minute_str: str | None, am_pm: str | None 

306 ) -> tuple[int, int]: 

307 """Parse hour and minute from time components.""" 

308 hour = int(hour_str) 

309 minute = int(minute_str) if minute_str else 0 

310 

311 if am_pm and am_pm.lower() == "pm" and hour != 12: 

312 hour += 12 

313 elif am_pm and am_pm.lower() == "am" and hour == 12: 

314 hour = 0 

315 

316 return hour, minute 

317 

318 def _calculate_days_ahead( 

319 self, target_weekday: int, today: datetime, hour: int, minute: int 

320 ) -> int: 

321 """Calculate how many days ahead the target weekday is.""" 

322 days_ahead = target_weekday - today.weekday() 

323 

324 if days_ahead < 0: # Target day already happened this week 

325 days_ahead += 7 

326 elif days_ahead == 0: # Today - check if time has passed 

327 target_time = today.replace( 

328 hour=hour, minute=minute, second=0, microsecond=0 

329 ) 

330 if target_time <= today: 

331 days_ahead = 7 

332 

333 return days_ahead