# Coverage report header (coverage.py v7.13.1, created 2026-01-04 00:43 -0800):
# session_buddy/app_monitor.py — 66.60% of 416 statements covered.

1#!/usr/bin/env python3 

2"""Application-Aware Context Monitoring for Session Management MCP Server. 

3 

4Monitors IDE activity and browser documentation to enrich session context. 

5Excludes Slack/Discord as per Phase 4 requirements. 

6""" 

7 

8import asyncio 

9import json 

10import operator 

11import sqlite3 

12from collections import defaultdict 

13from contextlib import suppress 

14from dataclasses import dataclass 

15from datetime import datetime, timedelta 

16from pathlib import Path 

17from typing import TYPE_CHECKING, Any 

18 

19if TYPE_CHECKING: 

20 import psutil 

21 # FileSystemEventHandler and Observer imported at runtime below 

22 

# Optional dependency: watchdog provides real-time filesystem event
# monitoring. When it is missing, the stubs below keep this module
# importable and monitoring degrades to a no-op (start_monitoring
# reports failure via WATCHDOG_AVAILABLE).
try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False

    # Create stub for FileSystemEventHandler when watchdog is not available
    class FileSystemEventHandler:  # type: ignore[no-redef]
        """Stub base class when watchdog is not available."""

        def __init__(self) -> None:  # type: ignore[no-redef]
            super().__init__()  # type: ignore[misc]

    # Create stub for Observer when watchdog is not available.
    # Mirrors the subset of the watchdog Observer interface used below
    # (schedule/start/stop/join) so callers need no availability checks.
    class Observer:  # type: ignore[no-redef]
        def __init__(self) -> None:
            pass

        def schedule(
            self, event_handler: Any, path: str, recursive: bool = True
        ) -> None:
            pass

        def start(self) -> None:
            pass

        def stop(self) -> None:
            pass

        def join(self) -> None:
            pass

56 

57 

# Optional dependency: psutil enables process inspection (browser and
# focused-application detection). Callers must check PSUTIL_AVAILABLE
# before touching the psutil module.
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

64 

65 

@dataclass
class ActivityEvent:
    """Represents a monitored activity event."""

    # ISO-8601 timestamp (datetime.isoformat()); producers all use the
    # same format, so plain string comparison orders events correctly.
    timestamp: str
    event_type: str  # 'file_change', 'app_focus', 'browser_nav'
    application: str
    details: dict[str, Any]
    project_path: str | None = None
    relevance_score: float = 0.0  # heuristic importance in [0.0, 1.0]

76 

77 

class ProjectActivityMonitor:
    """Monitors project activity including file changes and application focus.

    File-change events are produced by watchdog observers (one per project
    path) via :class:`IDEFileHandler` and accumulated in ``activity_buffer``
    for later summarization.
    """

    def __init__(self, project_paths: list[str] | None = None) -> None:
        """Initialize activity monitor.

        Args:
            project_paths: Directories to watch for file changes. Defaults
                to an empty list, in which case nothing is monitored.
        """
        from session_buddy.settings import get_settings

        self._settings = get_settings()
        self.project_paths = project_paths or []
        self.db_path = str(Path.home() / ".claude" / "data" / "activity.db")
        self.observers: list[Any] = []
        self.activity_buffer: list[ActivityEvent] = []
        self.last_activity: dict[str, Any] = {}
        # Source-file extensions that count as IDE activity; anything else
        # is ignored by the file handler.
        self.ide_extensions = {
            ".py",
            ".js",
            ".ts",
            ".jsx",
            ".tsx",
            ".java",
            ".cpp",
            ".c",
            ".h",
            ".rs",
            ".go",
            ".php",
            ".rb",
            ".swift",
            ".kt",
            ".scala",
            ".cs",
            ".html",
            ".css",
            ".scss",
            ".vue",
            ".svelte",
            ".json",
            ".yaml",
        }

    def _init_database(self) -> None:
        """Create the activity_events table if it does not exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS activity_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    application TEXT NOT NULL,
                    details TEXT NOT NULL,
                    project_path TEXT,
                    relevance_score REAL DEFAULT 0.0,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def start_monitoring(self) -> bool:
        """Start file system monitoring.

        Returns:
            True when at least one observer was started; False when watchdog
            is unavailable or none of the configured paths exist.
        """
        # Single availability guard (the original double-checked
        # WATCHDOG_AVAILABLE in a second, unreachable-by-design branch).
        if not WATCHDOG_AVAILABLE:
            return False

        for path in self.project_paths:
            if Path(path).exists():
                event_handler = IDEFileHandler(self)
                observer = Observer()
                observer.schedule(event_handler, path, recursive=True)
                observer.start()
                self.observers.append(observer)

        return len(self.observers) > 0

    def stop_monitoring(self) -> None:
        """Stop and join all file system observers."""
        if WATCHDOG_AVAILABLE:
            for observer in self.observers:
                observer.stop()
                observer.join()
            self.observers.clear()

    def add_activity(self, event: ActivityEvent) -> None:
        """Append an activity event to the buffer, trimming when oversized."""
        self.activity_buffer.append(event)

        # Keep buffer size manageable: halve it once it exceeds 1000 events.
        if len(self.activity_buffer) > 1000:
            self.activity_buffer = self.activity_buffer[-500:]

    def get_recent_activity(self, minutes: int = 30) -> list[ActivityEvent]:
        """Get recent activity within the specified number of minutes."""
        cutoff = datetime.now() - timedelta(minutes=minutes)
        cutoff_str = cutoff.isoformat()

        # ISO-8601 timestamps compare correctly as plain strings.
        return [
            event for event in self.activity_buffer if event.timestamp >= cutoff_str
        ]

    def get_active_files(self, minutes: int = 60) -> list[dict[str, Any]]:
        """Get files actively being worked on, sorted by activity score."""
        recent_events = self.get_recent_activity(minutes)
        file_activity: dict[str, list[ActivityEvent]] = defaultdict(list)

        for event in recent_events:
            if event.event_type == "file_change" and "file_path" in event.details:
                file_activity[event.details["file_path"]].append(event)

        # Score files by activity frequency and recency.
        active_files = []
        for file_path, events in file_activity.items():
            score = len(events)
            latest_event = max(events, key=lambda e: e.timestamp)

            # Boost score for very recent activity (within 5 minutes).
            time_diff = datetime.now() - datetime.fromisoformat(latest_event.timestamp)
            if time_diff.total_seconds() < 300:
                score *= 2

            active_files.append(
                {
                    "file_path": file_path,
                    "activity_score": score,
                    "event_count": len(events),
                    "last_activity": latest_event.timestamp,
                    "project_path": latest_event.project_path,
                },
            )

        return sorted(
            active_files, key=operator.itemgetter("activity_score"), reverse=True
        )

209 

210 

class IDEFileHandler(FileSystemEventHandler):  # type: ignore[misc]
    """Handles file system events for IDE monitoring.

    Filters noise (ignored dirs, irrelevant extensions, oversized and
    temporary files), records :class:`ActivityEvent`s on the owning
    monitor, and optionally kicks off async entity extraction.
    """

    def __init__(self, monitor: ProjectActivityMonitor) -> None:
        self.monitor = monitor
        # Merge settings-driven ignore dirs
        self.ignore_patterns = set(self.monitor._settings.filesystem_ignore_dirs)
        self.ignore_patterns.add(".vscode/settings.json")

        # Critical file patterns for smart thresholding
        self.critical_patterns = {
            "auth": ["auth", "login", "session", "jwt", "oauth"],
            "database": ["db", "database", "migration", "schema"],
            "config": ["config", "settings", "env"],
            "api": ["api", "endpoint", "route", "controller"],
            "security": ["security", "encrypt", "hash", "crypto"],
        }
        self._recent_ttl_seconds = self.monitor._settings.filesystem_dedupe_ttl_seconds
        # Strong references to fire-and-forget extraction tasks. asyncio
        # keeps only a weak reference to running tasks, so without this
        # set a pending task could be garbage-collected mid-flight.
        self._background_tasks: set[Any] = set()

    def should_ignore(self, file_path: str) -> bool:
        """Return True when the file is noise and must not produce events."""
        path = Path(file_path)

        # Check ignore patterns (any path component matching is enough)
        for part in path.parts:
            if part in self.ignore_patterns:
                return True

        # Check if it's a relevant file extension
        if path.suffix not in self.monitor.ide_extensions:
            return True

        # Ignore large or temporary files; stat() may race with deletion,
        # hence the broad suppress.
        max_size = self.monitor._settings.filesystem_max_file_size_bytes
        with suppress(Exception):
            if path.exists() and path.is_file() and path.stat().st_size > max_size:
                return True

        # Hidden files and editor backup files ("name~") are noise too.
        return bool(path.name.startswith(".") or path.name.endswith("~"))

    def on_modified(self, event: Any) -> None:
        """Handle file modification events from watchdog."""
        if event.is_directory or self.should_ignore(event.src_path):
            return

        src_path = Path(event.src_path)
        project_path = self._determine_project_path(src_path)

        activity_event = self._create_activity_event(src_path, project_path)
        self.monitor.add_activity(activity_event)

        self._try_entity_extraction(activity_event, src_path, project_path)

    def _determine_project_path(self, src_path: Path) -> str | None:
        """Determine which monitored project this file belongs to, if any."""
        for proj_path in self.monitor.project_paths:
            if src_path.is_relative_to(proj_path):
                return proj_path
        return None

    def _create_activity_event(
        self,
        src_path: Path,
        project_path: str | None,
    ) -> ActivityEvent:
        """Create an activity event for a file modification."""
        return ActivityEvent(
            timestamp=datetime.now().isoformat(),
            event_type="file_change",
            application="ide",
            details={
                "file_path": str(src_path),
                "file_name": src_path.name,
                "file_extension": src_path.suffix,
                "change_type": "modified",
            },
            project_path=project_path,
            relevance_score=self._estimate_relevance(src_path),
        )

    def _try_entity_extraction(
        self,
        activity_event: ActivityEvent,
        src_path: Path,
        project_path: str | None,
    ) -> None:
        """Try to extract entities if feature flags are enabled.

        Skips low-relevance events and files already processed within the
        dedupe TTL window.
        """
        from session_buddy.config.feature_flags import get_feature_flags

        flags = get_feature_flags()
        if not (
            flags.enable_filesystem_extraction and flags.enable_llm_entity_extraction
        ):
            return

        if not self._passes_threshold(activity_event):
            return

        if self._recently_processed_persisted(str(src_path)):
            return

        self._fire_and_forget_extraction(activity_event, src_path, project_path)

    def _fire_and_forget_extraction(
        self,
        activity_event: ActivityEvent,
        src_path: Path,
        project_path: str | None,
    ) -> None:
        """Schedule entity extraction as a background asyncio task.

        Best-effort: any failure (no running loop, import error, bad file)
        is suppressed so file monitoring is never disrupted.
        """
        with suppress(Exception):
            import asyncio as _asyncio

            from session_buddy.memory.file_context import build_file_context
            from session_buddy.tools.entity_extraction_tools import (
                extract_and_store_memory as _extract,
            )

            ctx = build_file_context(str(src_path))
            snippet = ctx.get("snippet", "")

            task = _asyncio.create_task(
                _extract(
                    user_input=f"Updated file: {src_path.name}\nContext: {ctx['metadata']}",
                    ai_output=snippet,
                    project=project_path,
                    activity_score=activity_event.relevance_score,
                )
            )
            # Retain a strong reference until completion (see __init__ note).
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    def _recently_processed_persisted(self, file_path: str) -> bool:
        """Check and update persistent recent-extractions cache.

        Returns True if the given file was processed within the TTL window.
        """
        try:
            # Use the same SQLite DB as activity log
            import sqlite3 as _sql
            from time import time as _now

            self._ensure_recent_table()
            with _sql.connect(self.monitor.db_path) as conn:
                cur = conn.cursor()
                cur.execute(
                    "SELECT last_extracted FROM recent_extractions WHERE file_path=?",
                    (file_path,),
                )
                row = cur.fetchone()
                now = int(_now())
                if row:
                    last = int(row[0])
                    if now - last < self._recent_ttl_seconds:
                        return True
                    # Update timestamp
                    cur.execute(
                        "UPDATE recent_extractions SET last_extracted=? WHERE file_path=?",
                        (now, file_path),
                    )
                else:
                    cur.execute(
                        "INSERT INTO recent_extractions (file_path, last_extracted) VALUES (?, ?)",
                        (file_path, now),
                    )
                conn.commit()
        except Exception:
            # On any error, fall back to allowing processing to avoid missing events
            return False
        return False

    def _ensure_recent_table(self) -> None:
        """Ensure the persistent recent_extractions table exists."""
        with suppress(Exception):
            import sqlite3 as _sql

            with _sql.connect(self.monitor.db_path) as conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS recent_extractions (
                        file_path TEXT PRIMARY KEY,
                        last_extracted INTEGER
                    )
                """
                )

    def _estimate_relevance(self, path: Path) -> float:
        """Score a file 0.95 when its name matches a critical keyword, else 0.8."""
        name = path.name.lower()
        for keywords in self.critical_patterns.values():
            if any(k in name for k in keywords):
                return 0.95
        return 0.8

    def _passes_threshold(self, event: ActivityEvent) -> bool:
        """Apply relevance thresholds: 0.7 for critical files, 0.9 otherwise."""
        # Base threshold
        if event.relevance_score < 0.7:
            return False
        # Stricter threshold for non-critical files
        name = event.details.get("file_name", "").lower()
        critical = any(
            any(k in name for k in kw) for kw in self.critical_patterns.values()
        )
        return not (not critical and event.relevance_score < 0.9)

412 

413 

class BrowserDocumentationMonitor:
    """Monitors browser activity for documentation sites."""

    def __init__(self) -> None:
        # Known documentation domains used to gauge relevance of browsing.
        self.doc_domains = {
            "docs.python.org",
            "developer.mozilla.org",
            "docs.rs",
            "docs.oracle.com",
            "docs.microsoft.com",
            "docs.aws.amazon.com",
            "cloud.google.com",
            "docs.github.com",
            "docs.gitlab.com",
            "stackoverflow.com",
            "github.com",
            "fastapi.tiangolo.com",
            "pydantic-docs.helpmanual.io",
            "django-documentation",
            "flask.palletsprojects.com",
            "nodejs.org",
            "reactjs.org",
            "vuejs.org",
            "angular.io",
            "svelte.dev",
        }
        self.activity_buffer: list[ActivityEvent] = []
        self.browser_processes: set[str] = set()

    def get_browser_processes(self) -> list[dict[str, Any]]:
        """Get currently running browser processes (empty without psutil)."""
        if not PSUTIL_AVAILABLE:
            return []

        browsers = []
        browser_names = {"chrome", "firefox", "safari", "edge", "brave"}

        try:
            for proc in psutil.process_iter(["pid", "name", "create_time"]):
                proc_name = proc.info["name"].lower()
                if any(browser in proc_name for browser in browser_names):
                    browsers.append(
                        {
                            "pid": proc.info["pid"],
                            "name": proc.info["name"],
                            "create_time": proc.info["create_time"],
                        },
                    )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Processes can vanish or be inaccessible mid-iteration.
            pass

        return browsers

    def extract_documentation_context(self, url: str) -> dict[str, Any]:
        """Extract domain/technology/topic context from a documentation URL.

        Malformed URLs simply yield the empty default context; ``suppress``
        comes from the module-level contextlib import (the previous local
        re-import shadowed it redundantly).
        """
        context = {"domain": "", "technology": "", "topic": "", "relevance": 0.0}

        with suppress(ValueError, AttributeError):
            from urllib.parse import urlparse

            parsed = urlparse(url)
            domain = parsed.netloc
            path = parsed.path

            context["domain"] = domain
            technology, relevance = self._determine_technology(domain, path)
            context["technology"] = technology
            context["relevance"] = relevance
            context["topic"] = self._extract_topic(path)

        return context

    def _determine_technology(self, domain: str, path: str) -> tuple[str, float]:
        """Return technology label and relevance score for a domain/path.

        Rules are evaluated in order; the first match wins.
        """
        normalized_path = path.lower()
        normalized_domain = domain.lower()

        tech_rules = [
            (
                "python" in normalized_domain or "python" in normalized_path,
                "python",
                0.9,
            ),
            (
                "javascript" in normalized_path
                or "js" in normalized_path
                or normalized_domain in {"developer.mozilla.org", "nodejs.org"},
                "javascript",
                0.8,
            ),
            (
                "rust" in normalized_domain or "docs.rs" in normalized_domain,
                "rust",
                0.8,
            ),
            (
                any(
                    framework in normalized_domain
                    for framework in ("django", "flask", "fastapi")
                ),
                "python-web",
                0.9,
            ),
            (
                any(
                    framework in normalized_domain
                    for framework in ("react", "vue", "angular", "svelte")
                ),
                "frontend",
                0.8,
            ),
        ]

        for condition, label, relevance in tech_rules:
            if condition:
                return label, relevance

        return "", 0.0

    def _extract_topic(self, path: str) -> str:
        """Derive a topic from a URL path (last meaningful segment)."""
        path_parts = [p for p in path.split("/") if p]
        if not path_parts:
            return ""

        tail = path_parts[-1]
        # "index.html" carries no topic; fall back to its parent segment.
        if tail == "index.html" and len(path_parts) > 1:
            return path_parts[-2]
        return tail

    def add_browser_activity(self, url: str, title: str = "") -> None:
        """Record a browser navigation event with extracted doc context."""
        context = self.extract_documentation_context(url)

        activity_event = ActivityEvent(
            timestamp=datetime.now().isoformat(),
            event_type="browser_nav",
            application="browser",
            details={
                "url": url,
                "title": title,
                "domain": context["domain"],
                "technology": context["technology"],
                "topic": context["topic"],
            },
            relevance_score=context["relevance"],
        )

        self.activity_buffer.append(activity_event)

        # Keep buffer manageable: halve it once it exceeds 500 events.
        if len(self.activity_buffer) > 500:
            self.activity_buffer = self.activity_buffer[-250:]

569 

570 

class ApplicationFocusMonitor:
    """Monitors application focus changes."""

    def __init__(self) -> None:
        self.focus_history: list[ActivityEvent] = []
        self.current_app: str | None = None
        # Keyword sets used to bucket process names into coarse categories.
        self.app_categories = {
            "ide": {
                "code",
                "pycharm",
                "vscode",
                "sublime",
                "atom",
                "vim",
                "emacs",
                "intellij",
            },
            "browser": {"chrome", "firefox", "safari", "edge", "brave"},
            "terminal": {
                "terminal",
                "term",
                "console",
                "cmd",
                "powershell",
                "zsh",
                "bash",
            },
            "documentation": {"devdocs", "dash", "zeal"},
        }

    def get_focused_application(self) -> dict[str, Any] | None:
        """Return the first running process that matches a known category.

        Simplified heuristic: true window-focus detection would require a
        platform-specific implementation. Returns None without psutil.
        """
        if not PSUTIL_AVAILABLE:
            return None

        try:
            for process in psutil.process_iter(["pid", "name"]):
                info = process.info
                matched_category = self._categorize_app(info["name"].lower())
                if matched_category:
                    return {
                        "name": info["name"],
                        "category": matched_category,
                        "pid": info["pid"],
                    }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Processes may disappear or be protected mid-iteration.
            pass

        return None

    def _categorize_app(self, app_name: str) -> str | None:
        """Map a lowercase process name to a category, or None if unknown."""
        return next(
            (
                category
                for category, keywords in self.app_categories.items()
                if any(keyword in app_name for keyword in keywords)
            ),
            None,
        )

    def add_focus_event(self, app_info: dict[str, Any]) -> None:
        """Record an application focus event in the history."""
        category = app_info["category"]
        # Development surfaces (IDE/terminal) are weighted higher.
        score = 0.6 if category in {"ide", "terminal"} else 0.3
        focus_event = ActivityEvent(
            timestamp=datetime.now().isoformat(),
            event_type="app_focus",
            application=app_info["name"],
            details={"category": category, "pid": app_info["pid"]},
            relevance_score=score,
        )

        self.focus_history.append(focus_event)

        # Trim the history so it cannot grow without bound.
        if len(self.focus_history) > 200:
            self.focus_history = self.focus_history[-100:]

645 

646 

class ActivityDatabase:
    """SQLite database for storing activity events."""

    def __init__(self, db_path: str) -> None:
        """Open/create the database at ``db_path`` and ensure the schema.

        The parent directory must already exist (ApplicationMonitor
        creates it before constructing this class).
        """
        self.db_path = db_path
        self._init_database()

    def _init_database(self) -> None:
        """Initialize database tables and indexes (idempotent)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS activity_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    application TEXT NOT NULL,
                    details TEXT NOT NULL,
                    project_path TEXT,
                    relevance_score REAL,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)

            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON activity_events(timestamp)
            """)

            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_event_type ON activity_events(event_type)
            """)

    def store_event(self, event: ActivityEvent) -> None:
        """Store one activity event (details serialized as JSON)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT INTO activity_events
                (timestamp, event_type, application, details, project_path, relevance_score)
                VALUES (?, ?, ?, ?, ?, ?)
            """,
                (
                    event.timestamp,
                    event.event_type,
                    event.application,
                    json.dumps(event.details),
                    event.project_path,
                    event.relevance_score,
                ),
            )

    def get_events(
        self,
        start_time: str | None = None,
        end_time: str | None = None,
        event_types: list[str] | None = None,
        limit: int = 100,
    ) -> list[ActivityEvent]:
        """Retrieve activity events, newest first.

        Selects explicit columns rather than ``SELECT *`` so the row
        indices below cannot silently break if the schema gains columns.
        """
        with sqlite3.connect(self.db_path) as conn:
            query = (
                "SELECT timestamp, event_type, application, details, "
                "project_path, relevance_score FROM activity_events WHERE 1=1"
            )
            params: list[Any] = []

            if start_time:
                query += " AND timestamp >= ?"
                params.append(start_time)

            if end_time:
                query += " AND timestamp <= ?"
                params.append(end_time)

            if event_types:
                placeholders = ",".join("?" * len(event_types))
                query += f" AND event_type IN ({placeholders})"
                params.extend(event_types)

            query += " ORDER BY timestamp DESC LIMIT ?"
            params.append(limit)

            cursor = conn.execute(query, params)
            rows = cursor.fetchall()

            return [
                ActivityEvent(
                    timestamp=row[0],
                    event_type=row[1],
                    application=row[2],
                    details=json.loads(row[3]),
                    project_path=row[4],
                    relevance_score=row[5] or 0.0,
                )
                for row in rows
            ]

    def cleanup_old_events(self, days_to_keep: int = 30) -> None:
        """Remove activity events older than ``days_to_keep`` days."""
        cutoff = datetime.now() - timedelta(days=days_to_keep)
        cutoff_str = cutoff.isoformat()

        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "DELETE FROM activity_events WHERE timestamp < ?",
                (cutoff_str,),
            )

750 

751 

class ApplicationMonitor:
    """Main application monitoring coordinator.

    Owns the activity database plus the IDE, browser, and focus monitors,
    and runs a periodic asyncio loop that samples focus and persists
    buffered events.
    """

    def __init__(self, data_dir: str, project_paths: list[str] | None = None) -> None:
        # data_dir hosts the SQLite activity database; created if missing.
        self.data_dir = Path(data_dir)
        self.project_paths = project_paths or []

        self._setup_directory()
        self._initialize_components()
        self._setup_monitoring_state()

    def _setup_directory(self) -> None:
        """Set up the data directory."""
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def _initialize_components(self) -> None:
        """Initialize monitoring components."""
        self.db = ActivityDatabase(str(self.data_dir / "activity.db"))
        self.ide_monitor = ProjectActivityMonitor(self.project_paths)
        self.browser_monitor = BrowserDocumentationMonitor()
        self.focus_monitor = ApplicationFocusMonitor()

    def _setup_monitoring_state(self) -> None:
        """Set up monitoring state variables."""
        self.monitoring_active = False
        self._monitoring_task: asyncio.Task[Any] | None = None

    async def start_monitoring(self) -> dict[str, Any] | None:
        """Start all monitoring components.

        Returns:
            A status dict describing what was started, or None when
            monitoring is already active (idempotency guard).
        """
        if self.monitoring_active:
            return None

        self.monitoring_active = True

        # Start IDE monitoring
        ide_started = self.ide_monitor.start_monitoring()

        # Start background monitoring task
        self._monitoring_task = asyncio.create_task(self._monitoring_loop())

        return {
            "ide_monitoring": ide_started,
            "watchdog_available": WATCHDOG_AVAILABLE,
            "psutil_available": PSUTIL_AVAILABLE,
            "project_paths": self.project_paths,
        }

    async def stop_monitoring(self) -> None:
        """Stop all monitoring and await cancellation of the loop task."""
        # Flip the flag first so the loop exits even if cancel is missed.
        self.monitoring_active = False

        if self._monitoring_task:
            self._monitoring_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._monitoring_task

        self.ide_monitor.stop_monitoring()

    async def _monitoring_loop(self) -> None:
        """Background monitoring loop; one cycle every 30 seconds."""
        while self.monitoring_active:
            try:
                await self._process_monitoring_cycle()
                await asyncio.sleep(30)  # Check every 30 seconds
            except asyncio.CancelledError:
                break
            except Exception as e:
                await self._handle_monitoring_error(e)

    async def _process_monitoring_cycle(self) -> None:
        """Process a single monitoring cycle."""
        await self._check_application_focus()
        await self._persist_buffered_events()

    async def _check_application_focus(self) -> None:
        """Check and update application focus if changed."""
        focused_app = self.focus_monitor.get_focused_application()
        # _is_focus_changed already implies focused_app is not None; the
        # extra check keeps the type-checker satisfied.
        if self._is_focus_changed(focused_app) and focused_app is not None:
            self.focus_monitor.add_focus_event(focused_app)
            self.focus_monitor.current_app = focused_app.get("name")

    def _is_focus_changed(self, focused_app: dict[str, Any] | None) -> bool:
        """Check if application focus has changed."""
        return (
            focused_app is not None
            and focused_app.get("name") != self.focus_monitor.current_app
        )

    async def _persist_buffered_events(self) -> None:
        """Persist the tail of each monitor's buffer to the database."""
        # Use generator expressions for memory efficiency
        await self._persist_event_batch(
            event for event in self.ide_monitor.activity_buffer[-10:]
        )
        await self._persist_event_batch(
            event for event in self.browser_monitor.activity_buffer[-10:]
        )
        await self._persist_event_batch(
            event for event in self.focus_monitor.focus_history[-5:]
        )

    async def _persist_event_batch(
        self,
        events: Any,  # Generator of ActivityEvent
    ) -> None:
        """Persist a batch of events to the database."""
        for event in events:
            self.db.store_event(event)

    async def _handle_monitoring_error(self, error: Exception) -> None:
        """Back off after a monitoring error so the loop keeps running.

        NOTE(review): the error is currently swallowed without logging —
        consider adding a logger call here.
        """
        await asyncio.sleep(60)

    def get_activity_summary(self, hours: int = 2) -> dict[str, Any]:
        """Get activity summary for the last ``hours`` hours (max 500 events)."""
        start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
        events = self.db.get_events(start_time=start_time, limit=500)

        summary = self._create_activity_summary_template(hours, events)
        self._aggregate_event_data(events, summary)
        self._add_additional_context(hours, summary)
        return self._finalize_summary(summary)

    def _create_activity_summary_template(
        self,
        hours: int,
        events: list[Any],
    ) -> dict[str, Any]:
        """Create the base summary template."""
        return {
            "total_events": len(events),
            "time_range_hours": hours,
            "event_types": defaultdict(int),
            "applications": defaultdict(int),
            "active_files": [],
            "documentation_sites": [],
            "average_relevance": 0.0,
        }

    def _aggregate_event_data(self, events: list[Any], summary: dict[str, Any]) -> None:
        """Aggregate event counts, relevance, and doc domains into the summary."""
        total_relevance: float = 0.0
        doc_sites = set()

        for event in events:
            summary["event_types"][event.event_type] += 1
            summary["applications"][event.application] += 1
            total_relevance += event.relevance_score

            if event.event_type == "browser_nav" and event.details.get("domain"):
                doc_sites.add(event.details["domain"])

        if events:
            summary["average_relevance"] = total_relevance / len(events)

        summary["documentation_sites"] = list(doc_sites)

    def _add_additional_context(self, hours: int, summary: dict[str, Any]) -> None:
        """Add additional context information to the summary."""
        # get_active_files expects minutes, hence the * 60 conversion.
        summary["active_files"] = self.ide_monitor.get_active_files(hours * 60)

    def _finalize_summary(self, summary: dict[str, Any]) -> dict[str, Any]:
        """Finalize summary by converting collections for JSON serialization."""
        summary["event_types"] = dict(summary["event_types"])
        summary["applications"] = dict(summary["applications"])
        return summary

    def get_context_insights(self, hours: int = 1) -> dict[str, Any]:
        """Get contextual insights from recent activity (max 200 events)."""
        start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
        events = self.db.get_events(start_time=start_time, limit=200)

        insights = self._create_insights_template()
        if not events:
            return insights

        app_time = self._analyze_events(events, insights)
        self._determine_primary_focus(app_time, insights)
        self._calculate_productivity_score(events, insights)
        return self._finalize_insights(insights)

    def _create_insights_template(self) -> dict[str, Any]:
        """Create the insights template dictionary."""
        return {
            "primary_focus": None,
            "technologies_used": set(),
            "active_projects": set(),
            "documentation_topics": [],
            "productivity_score": 0.0,
            "context_switches": 0,
        }

    def _analyze_events(
        self,
        events: list[Any],
        insights: dict[str, Any],
    ) -> dict[str, int]:
        """Analyze events, counting per-app activity and context switches.

        Returns:
            Mapping of application name to event count.
        """
        app_time: dict[str, int] = defaultdict(int)
        last_app = None

        for event in events:
            app_time[event.application] += 1

            # A switch is any consecutive pair of events from different apps.
            if last_app and last_app != event.application:
                insights["context_switches"] += 1
            last_app = event.application

            self._extract_event_data(event, insights)

        return app_time

    def _extract_event_data(self, event: Any, insights: dict[str, Any]) -> None:
        """Extract relevant data from a single event."""
        self._extract_technologies(event, insights)
        self._extract_projects(event, insights)
        self._extract_documentation_topics(event, insights)

    def _extract_technologies(self, event: Any, insights: dict[str, Any]) -> None:
        """Extract technology information from file change events."""
        if event.event_type == "file_change":
            ext = event.details.get("file_extension", "")
            if ext == ".py":
                insights["technologies_used"].add("python")
            elif ext in {".js", ".ts"}:
                insights["technologies_used"].add("javascript")
            elif ext == ".rs":
                insights["technologies_used"].add("rust")

    def _extract_projects(self, event: Any, insights: dict[str, Any]) -> None:
        """Extract project information from events."""
        if event.project_path:
            insights["active_projects"].add(event.project_path)

    def _extract_documentation_topics(
        self,
        event: Any,
        insights: dict[str, Any],
    ) -> None:
        """Extract documentation topics from browser navigation events."""
        if event.event_type == "browser_nav":
            topic = event.details.get("topic")
            technology = event.details.get("technology")
            if topic and technology:
                insights["documentation_topics"].append(f"{technology}: {topic}")

    def _determine_primary_focus(
        self,
        app_time: dict[str, int],
        insights: dict[str, Any],
    ) -> None:
        """Determine the primary application focus (most event-heavy app)."""
        if app_time:
            insights["primary_focus"] = max(
                app_time.items(), key=operator.itemgetter(1)
            )[0]

    def _calculate_productivity_score(
        self,
        events: list[Any],
        insights: dict[str, Any],
    ) -> None:
        """Calculate productivity as the fraction of events above 0.5 relevance."""
        relevant_events = [e for e in events if e.relevance_score > 0.5]
        if events:
            insights["productivity_score"] = len(relevant_events) / len(events)

    def _finalize_insights(self, insights: dict[str, Any]) -> dict[str, Any]:
        """Convert sets to lists for JSON serialization."""
        insights["technologies_used"] = list(insights["technologies_used"])
        insights["active_projects"] = list(insights["active_projects"])
        return insights