Coverage for session_mgmt_mcp/app_monitor.py: 13.68%

310 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Application-Aware Context Monitoring for Session Management MCP Server. 

3 

4Monitors IDE activity and browser documentation to enrich session context. 

5Excludes Slack/Discord as per Phase 4 requirements. 

6""" 

7 

8import asyncio 

9import contextlib 

10import json 

11import sqlite3 

12from collections import defaultdict 

13from dataclasses import dataclass 

14from datetime import datetime, timedelta 

15from pathlib import Path 

16from typing import Any 

17 

try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer
except ImportError:
    # watchdog is an optional dependency; record that it is missing so
    # callers can skip file-system monitoring.
    WATCHDOG_AVAILABLE = False

    class FileSystemEventHandler:
        """Placeholder base class used when watchdog is not installed."""
else:
    WATCHDOG_AVAILABLE = True

29 

30 

try:
    import psutil
except ImportError:
    # psutil is an optional dependency; code that needs it checks this flag
    # before touching the module.
    PSUTIL_AVAILABLE = False
else:
    PSUTIL_AVAILABLE = True

37 

38 

39@dataclass 

40class ActivityEvent: 

41 """Represents a monitored activity event.""" 

42 

43 timestamp: str 

44 event_type: str # 'file_change', 'app_focus', 'browser_nav' 

45 application: str 

46 details: dict[str, Any] 

47 project_path: str | None = None 

48 relevance_score: float = 0.0 

49 

50 

class IDEActivityMonitor:
    """Collect file-change activity from watched project directories.

    Events arrive through :meth:`add_activity` and are kept in a size-capped
    in-memory buffer that the query helpers read from.
    """

    # The buffer is cut back to its newest TRIMMED_BUFFER_SIZE entries as
    # soon as it grows beyond MAX_BUFFER_SIZE.
    MAX_BUFFER_SIZE = 1000
    TRIMMED_BUFFER_SIZE = 500

    def __init__(self, project_paths: list[str]) -> None:
        """Remember the directories to watch; no observer is started yet."""
        self.project_paths = project_paths
        self.observers = []
        self.activity_buffer = []
        self.last_activity = {}
        # File suffixes that are treated as IDE-relevant.
        self.ide_extensions = {
            ".py",
            ".js",
            ".ts",
            ".jsx",
            ".tsx",
            ".java",
            ".cpp",
            ".c",
            ".h",
            ".rs",
            ".go",
            ".php",
            ".rb",
            ".swift",
            ".kt",
            ".scala",
            ".cs",
            ".html",
            ".css",
            ".scss",
            ".vue",
            ".svelte",
            ".json",
            ".yaml",
            ".yml",
            ".toml",
            ".ini",
            ".md",
            ".txt",
            ".sql",
            ".sh",
            ".bat",
        }

    def start_monitoring(self) -> bool:
        """Start one recursive watchdog observer per existing project path.

        Calling this while observers are already running is a no-op, so a
        directory is never watched twice (which would record every change
        more than once).

        Returns:
            True when at least one observer is running; False when watchdog
            is unavailable or none of the project paths exist.
        """
        if self.observers:
            return True
        if not WATCHDOG_AVAILABLE:
            return False

        for path in self.project_paths:
            if not Path(path).exists():
                continue
            observer = Observer()
            observer.schedule(IDEFileHandler(self), path, recursive=True)
            observer.start()
            self.observers.append(observer)

        return bool(self.observers)

    def stop_monitoring(self) -> None:
        """Stop every running observer and forget them."""
        # Signal all observers first so they shut down concurrently, then
        # wait for each of them to finish.
        for observer in self.observers:
            observer.stop()
        for observer in self.observers:
            observer.join()
        self.observers.clear()

    def add_activity(self, event: "ActivityEvent") -> None:
        """Append ``event`` to the buffer, trimming the oldest entries."""
        self.activity_buffer.append(event)

        if len(self.activity_buffer) > self.MAX_BUFFER_SIZE:
            self.activity_buffer = self.activity_buffer[-self.TRIMMED_BUFFER_SIZE:]

    def get_recent_activity(self, minutes: int = 30) -> list["ActivityEvent"]:
        """Return buffered events from the last ``minutes`` minutes.

        Timestamps are compared as ISO-8601 strings against a cutoff derived
        from the local clock.
        """
        cutoff_str = (datetime.now() - timedelta(minutes=minutes)).isoformat()
        return [
            event for event in self.activity_buffer if event.timestamp >= cutoff_str
        ]

    def get_active_files(self, minutes: int = 60) -> list[dict[str, Any]]:
        """Rank files by how much recent change activity they received.

        The score of a file is its number of ``file_change`` events in the
        window, doubled when the latest one is less than five minutes old.

        Returns:
            One dict per file (``file_path``, ``activity_score``,
            ``event_count``, ``last_activity``, ``project_path``), highest
            score first.
        """
        events_by_file = defaultdict(list)
        for event in self.get_recent_activity(minutes):
            if event.event_type == "file_change" and "file_path" in event.details:
                events_by_file[event.details["file_path"]].append(event)

        now = datetime.now()
        active_files = []
        for file_path, events in events_by_file.items():
            latest_event = max(events, key=lambda e: e.timestamp)
            score = len(events)

            # Boost files touched within the last 5 minutes.
            age = now - datetime.fromisoformat(latest_event.timestamp)
            if age.total_seconds() < 300:
                score *= 2

            active_files.append(
                {
                    "file_path": file_path,
                    "activity_score": score,
                    "event_count": len(events),
                    "last_activity": latest_event.timestamp,
                    "project_path": latest_event.project_path,
                },
            )

        return sorted(active_files, key=lambda x: x["activity_score"], reverse=True)

165 

166 

class IDEFileHandler(FileSystemEventHandler):
    """Turn file-modification events into ``ActivityEvent`` records."""

    def __init__(self, monitor: IDEActivityMonitor) -> None:
        """Bind the handler to the monitor that receives its events."""
        super().__init__()
        self.monitor = monitor
        # Entries without "/" match any single path component; entries with
        # "/" match a run of consecutive components.
        self.ignore_patterns = {
            ".git",
            "__pycache__",
            "node_modules",
            ".venv",
            "venv",
            ".pytest_cache",
            ".mypy_cache",
            ".ruff_cache",
            "dist",
            "build",
            ".DS_Store",
            ".idea",
            ".vscode/settings.json",
        }

    def should_ignore(self, file_path: str) -> bool:
        """Return True when ``file_path`` should not produce an event.

        A path is ignored when it matches an ignore pattern, when its suffix
        is not one of the monitor's ``ide_extensions``, or when its name
        starts with "." or ends with "~".
        """
        path = Path(file_path)

        # Single-name patterns: any component equal to the pattern.
        if not self.ignore_patterns.isdisjoint(path.parts):
            return True

        # Multi-component patterns (e.g. ".vscode/settings.json") can never
        # equal a single component, so match them against the slash-joined
        # components; the surrounding slashes force whole-component matches.
        joined = f"/{'/'.join(path.parts)}/"
        for pattern in self.ignore_patterns:
            if "/" in pattern and f"/{pattern.strip('/')}/" in joined:
                return True

        if path.suffix not in self.monitor.ide_extensions:
            return True

        # Hidden and editor backup files.
        return path.name.startswith(".") or path.name.endswith("~")

    def on_modified(self, event) -> None:
        """Record a ``file_change`` activity for a modified, relevant file."""
        if event.is_directory or self.should_ignore(event.src_path):
            return

        src_path = Path(event.src_path)
        # First configured project that contains the file, if any.
        project_path = next(
            (
                candidate
                for candidate in self.monitor.project_paths
                if src_path.is_relative_to(candidate)
            ),
            None,
        )

        self.monitor.add_activity(
            ActivityEvent(
                timestamp=datetime.now().isoformat(),
                event_type="file_change",
                application="ide",
                details={
                    "file_path": event.src_path,
                    "file_name": src_path.name,
                    "file_extension": src_path.suffix,
                    "change_type": "modified",
                },
                project_path=project_path,
                relevance_score=0.8,
            ),
        )

231 

232 

class BrowserDocumentationMonitor:
    """Record browser navigation and classify documentation URLs."""

    def __init__(self) -> None:
        """Set up the known-domain list and an empty activity buffer."""
        # NOTE(review): none of the methods in this class consult
        # ``doc_domains``; classification is keyword-based.
        self.doc_domains = {
            "docs.python.org",
            "developer.mozilla.org",
            "docs.rs",
            "docs.oracle.com",
            "docs.microsoft.com",
            "docs.aws.amazon.com",
            "cloud.google.com",
            "docs.github.com",
            "docs.gitlab.com",
            "stackoverflow.com",
            "github.com",
            "fastapi.tiangolo.com",
            "pydantic-docs.helpmanual.io",
            "django-documentation",
            "flask.palletsprojects.com",
            "nodejs.org",
            "reactjs.org",
            "vuejs.org",
            "angular.io",
            "svelte.dev",
        }
        self.activity_buffer = []
        self.browser_processes = set()

    def get_browser_processes(self) -> list[dict[str, Any]]:
        """List running processes whose name looks like a known browser.

        Returns:
            Dicts with ``pid``, ``name`` and ``create_time``; an empty list
            when psutil is unavailable.
        """
        if not PSUTIL_AVAILABLE:
            return []

        browser_names = {"chrome", "firefox", "safari", "edge", "brave"}
        browsers = []

        try:
            for proc in psutil.process_iter(["pid", "name", "create_time"]):
                name = proc.info.get("name")
                # psutil reports None for attributes it could not read;
                # such processes cannot be classified.
                if not name:
                    continue
                lowered = name.lower()
                if any(browser in lowered for browser in browser_names):
                    browsers.append(
                        {
                            "pid": proc.info["pid"],
                            "name": name,
                            "create_time": proc.info["create_time"],
                        },
                    )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

        return browsers

    def extract_documentation_context(self, url: str) -> dict[str, Any]:
        """Derive domain, technology, topic and relevance from ``url``.

        Matching is best-effort and case-insensitive. Anything that cannot be
        determined keeps its default ("" or 0.0); a URL that cannot be parsed
        yields the defaults instead of raising.

        Returns:
            Dict with keys ``domain``, ``technology``, ``topic``,
            ``relevance``.
        """
        from urllib.parse import urlparse

        context = {"domain": "", "technology": "", "topic": "", "relevance": 0.0}

        try:
            parsed = urlparse(url)
            # Host names are case-insensitive, so normalise before matching.
            domain = parsed.netloc.lower()
            path = parsed.path
            path_lc = path.lower()

            context["domain"] = domain

            # The first matching rule wins.
            if "python" in domain or "python" in path_lc:
                technology, relevance = "python", 0.9
            elif (
                "javascript" in path_lc
                or "js" in path_lc
                or domain in ("developer.mozilla.org", "nodejs.org")
            ):
                technology, relevance = "javascript", 0.8
            elif "rust" in domain or "docs.rs" in domain:
                technology, relevance = "rust", 0.8
            elif any(name in domain for name in ("django", "flask", "fastapi")):
                technology, relevance = "python-web", 0.9
            elif any(
                name in domain for name in ("react", "vue", "angular", "svelte")
            ):
                technology, relevance = "frontend", 0.8
            else:
                technology, relevance = "", 0.0

            if technology:
                context["technology"] = technology
                context["relevance"] = relevance

            # Topic is the last path segment, or the one before it when the
            # last segment is just "index.html".
            segments = [segment for segment in path.split("/") if segment]
            if segments:
                if segments[-1] != "index.html":
                    context["topic"] = segments[-1]
                elif len(segments) > 1:
                    context["topic"] = segments[-2]
                else:
                    context["topic"] = ""

        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string URL: keep whatever was filled in so far.
            pass

        return context

    def add_browser_activity(self, url: str, title: str = "") -> None:
        """Buffer a ``browser_nav`` event for ``url``.

        The buffer is cut back to its newest 250 entries once it holds more
        than 500.
        """
        context = self.extract_documentation_context(url)

        self.activity_buffer.append(
            ActivityEvent(
                timestamp=datetime.now().isoformat(),
                event_type="browser_nav",
                application="browser",
                details={
                    "url": url,
                    "title": title,
                    "domain": context["domain"],
                    "technology": context["technology"],
                    "topic": context["topic"],
                },
                relevance_score=context["relevance"],
            ),
        )

        if len(self.activity_buffer) > 500:
            self.activity_buffer = self.activity_buffer[-250:]

364 

365 

366class ApplicationFocusMonitor: 

367 """Monitors application focus changes.""" 

368 

369 def __init__(self) -> None: 

370 self.focus_history = [] 

371 self.current_app = None 

372 self.app_categories = { 

373 "ide": { 

374 "code", 

375 "pycharm", 

376 "vscode", 

377 "sublime", 

378 "atom", 

379 "vim", 

380 "emacs", 

381 "intellij", 

382 }, 

383 "browser": {"chrome", "firefox", "safari", "edge", "brave"}, 

384 "terminal": { 

385 "terminal", 

386 "term", 

387 "console", 

388 "cmd", 

389 "powershell", 

390 "zsh", 

391 "bash", 

392 }, 

393 "documentation": {"devdocs", "dash", "zeal"}, 

394 } 

395 

396 def get_focused_application(self) -> dict[str, Any] | None: 

397 """Get currently focused application.""" 

398 if not PSUTIL_AVAILABLE: 

399 return None 

400 

401 try: 

402 # This is a simplified version - would need platform-specific implementation 

403 # for full window focus detection 

404 for proc in psutil.process_iter(["pid", "name"]): 

405 proc_name = proc.info["name"].lower() 

406 category = self._categorize_app(proc_name) 

407 if category: 

408 return { 

409 "name": proc.info["name"], 

410 "category": category, 

411 "pid": proc.info["pid"], 

412 } 

413 except (psutil.NoSuchProcess, psutil.AccessDenied): 

414 pass 

415 

416 return None 

417 

418 def _categorize_app(self, app_name: str) -> str | None: 

419 """Categorize application by name.""" 

420 for category, keywords in self.app_categories.items(): 

421 if any(keyword in app_name for keyword in keywords): 

422 return category 

423 return None 

424 

425 def add_focus_event(self, app_info: dict[str, Any]) -> None: 

426 """Add application focus event.""" 

427 activity_event = ActivityEvent( 

428 timestamp=datetime.now().isoformat(), 

429 event_type="app_focus", 

430 application=app_info["name"], 

431 details={"category": app_info["category"], "pid": app_info["pid"]}, 

432 relevance_score=0.6 if app_info["category"] in ["ide", "terminal"] else 0.3, 

433 ) 

434 

435 self.focus_history.append(activity_event) 

436 

437 # Keep history manageable 

438 if len(self.focus_history) > 200: 

439 self.focus_history = self.focus_history[-100:] 

440 

441 

442class ActivityDatabase: 

443 """SQLite database for storing activity events.""" 

444 

445 def __init__(self, db_path: str) -> None: 

446 self.db_path = db_path 

447 self._init_database() 

448 

449 def _init_database(self) -> None: 

450 """Initialize database tables.""" 

451 with sqlite3.connect(self.db_path) as conn: 

452 conn.execute(""" 

453 CREATE TABLE IF NOT EXISTS activity_events ( 

454 id INTEGER PRIMARY KEY AUTOINCREMENT, 

455 timestamp TEXT NOT NULL, 

456 event_type TEXT NOT NULL, 

457 application TEXT NOT NULL, 

458 details TEXT NOT NULL, 

459 project_path TEXT, 

460 relevance_score REAL, 

461 created_at TEXT DEFAULT CURRENT_TIMESTAMP 

462 ) 

463 """) 

464 

465 conn.execute(""" 

466 CREATE INDEX IF NOT EXISTS idx_timestamp ON activity_events(timestamp) 

467 """) 

468 

469 conn.execute(""" 

470 CREATE INDEX IF NOT EXISTS idx_event_type ON activity_events(event_type) 

471 """) 

472 

473 def store_event(self, event: ActivityEvent) -> None: 

474 """Store activity event in database.""" 

475 with sqlite3.connect(self.db_path) as conn: 

476 conn.execute( 

477 """ 

478 INSERT INTO activity_events 

479 (timestamp, event_type, application, details, project_path, relevance_score) 

480 VALUES (?, ?, ?, ?, ?, ?) 

481 """, 

482 ( 

483 event.timestamp, 

484 event.event_type, 

485 event.application, 

486 json.dumps(event.details), 

487 event.project_path, 

488 event.relevance_score, 

489 ), 

490 ) 

491 

492 def get_events( 

493 self, 

494 start_time: str | None = None, 

495 end_time: str | None = None, 

496 event_types: list[str] | None = None, 

497 limit: int = 100, 

498 ) -> list[ActivityEvent]: 

499 """Retrieve activity events from database.""" 

500 with sqlite3.connect(self.db_path) as conn: 

501 query = "SELECT * FROM activity_events WHERE 1=1" 

502 params = [] 

503 

504 if start_time: 

505 query += " AND timestamp >= ?" 

506 params.append(start_time) 

507 

508 if end_time: 

509 query += " AND timestamp <= ?" 

510 params.append(end_time) 

511 

512 if event_types: 

513 placeholders = ",".join("?" * len(event_types)) 

514 query += f" AND event_type IN ({placeholders})" 

515 params.extend(event_types) 

516 

517 query += " ORDER BY timestamp DESC LIMIT ?" 

518 params.append(limit) 

519 

520 cursor = conn.execute(query, params) 

521 rows = cursor.fetchall() 

522 

523 events = [] 

524 for row in rows: 

525 events.append( 

526 ActivityEvent( 

527 timestamp=row[1], 

528 event_type=row[2], 

529 application=row[3], 

530 details=json.loads(row[4]), 

531 project_path=row[5], 

532 relevance_score=row[6] or 0.0, 

533 ), 

534 ) 

535 

536 return events 

537 

538 def cleanup_old_events(self, days_to_keep: int = 30): 

539 """Remove old activity events.""" 

540 cutoff = datetime.now() - timedelta(days=days_to_keep) 

541 cutoff_str = cutoff.isoformat() 

542 

543 with sqlite3.connect(self.db_path) as conn: 

544 cursor = conn.execute( 

545 "DELETE FROM activity_events WHERE timestamp < ?", 

546 (cutoff_str,), 

547 ) 

548 return cursor.rowcount 

549 

550 

551class ApplicationMonitor: 

552 """Main application monitoring coordinator.""" 

553 

554 def __init__(self, data_dir: str, project_paths: list[str] | None = None) -> None: 

555 self.data_dir = Path(data_dir) 

556 self.data_dir.mkdir(parents=True, exist_ok=True) 

557 

558 self.project_paths = project_paths or [] 

559 self.db = ActivityDatabase(str(self.data_dir / "activity.db")) 

560 

561 self.ide_monitor = IDEActivityMonitor(self.project_paths) 

562 self.browser_monitor = BrowserDocumentationMonitor() 

563 self.focus_monitor = ApplicationFocusMonitor() 

564 

565 self.monitoring_active = False 

566 self._monitoring_task = None 

567 

568 async def start_monitoring(self): 

569 """Start all monitoring components.""" 

570 if self.monitoring_active: 

571 return None 

572 

573 self.monitoring_active = True 

574 

575 # Start IDE monitoring 

576 ide_started = self.ide_monitor.start_monitoring() 

577 

578 # Start background monitoring task 

579 self._monitoring_task = asyncio.create_task(self._monitoring_loop()) 

580 

581 return { 

582 "ide_monitoring": ide_started, 

583 "watchdog_available": WATCHDOG_AVAILABLE, 

584 "psutil_available": PSUTIL_AVAILABLE, 

585 "project_paths": self.project_paths, 

586 } 

587 

588 async def stop_monitoring(self) -> None: 

589 """Stop all monitoring.""" 

590 self.monitoring_active = False 

591 

592 if self._monitoring_task: 

593 self._monitoring_task.cancel() 

594 with contextlib.suppress(asyncio.CancelledError): 

595 await self._monitoring_task 

596 

597 self.ide_monitor.stop_monitoring() 

598 

599 async def _monitoring_loop(self) -> None: 

600 """Background monitoring loop.""" 

601 while self.monitoring_active: 

602 try: 

603 # Check application focus 

604 focused_app = self.focus_monitor.get_focused_application() 

605 if focused_app and focused_app != self.focus_monitor.current_app: 

606 self.focus_monitor.add_focus_event(focused_app) 

607 self.focus_monitor.current_app = focused_app 

608 

609 # Persist buffered IDE events 

610 for event in self.ide_monitor.activity_buffer[-10:]: # Last 10 events 

611 self.db.store_event(event) 

612 

613 # Persist buffered browser events 

614 for event in self.browser_monitor.activity_buffer[-10:]: 

615 self.db.store_event(event) 

616 

617 # Persist focus events 

618 for event in self.focus_monitor.focus_history[-5:]: 

619 self.db.store_event(event) 

620 

621 await asyncio.sleep(30) # Check every 30 seconds 

622 

623 except asyncio.CancelledError: 

624 break 

625 except Exception as e: 

626 # Log error but continue monitoring 

627 print(f"Monitoring error: {e}") 

628 await asyncio.sleep(60) 

629 

630 def get_activity_summary(self, hours: int = 2) -> dict[str, Any]: 

631 """Get activity summary for specified hours.""" 

632 start_time = (datetime.now() - timedelta(hours=hours)).isoformat() 

633 events = self.db.get_events(start_time=start_time, limit=500) 

634 

635 summary = { 

636 "total_events": len(events), 

637 "time_range_hours": hours, 

638 "event_types": defaultdict(int), 

639 "applications": defaultdict(int), 

640 "active_files": [], 

641 "documentation_sites": [], 

642 "average_relevance": 0.0, 

643 } 

644 

645 total_relevance = 0 

646 doc_sites = set() 

647 

648 for event in events: 

649 summary["event_types"][event.event_type] += 1 

650 summary["applications"][event.application] += 1 

651 total_relevance += event.relevance_score 

652 

653 if event.event_type == "browser_nav" and event.details.get("domain"): 

654 doc_sites.add(event.details["domain"]) 

655 

656 if events: 

657 summary["average_relevance"] = total_relevance / len(events) 

658 

659 summary["active_files"] = self.ide_monitor.get_active_files(hours * 60) 

660 summary["documentation_sites"] = list(doc_sites) 

661 

662 # Convert defaultdict to regular dict for JSON serialization 

663 summary["event_types"] = dict(summary["event_types"]) 

664 summary["applications"] = dict(summary["applications"]) 

665 

666 return summary 

667 

668 def get_context_insights(self, hours: int = 1) -> dict[str, Any]: 

669 """Get contextual insights from recent activity.""" 

670 start_time = (datetime.now() - timedelta(hours=hours)).isoformat() 

671 events = self.db.get_events(start_time=start_time, limit=200) 

672 

673 insights = { 

674 "primary_focus": None, 

675 "technologies_used": set(), 

676 "active_projects": set(), 

677 "documentation_topics": [], 

678 "productivity_score": 0.0, 

679 "context_switches": 0, 

680 } 

681 

682 if not events: 

683 return insights 

684 

685 # Analyze primary focus 

686 app_time = defaultdict(int) 

687 last_app = None 

688 

689 for event in events: 

690 app_time[event.application] += 1 

691 

692 # Count context switches 

693 if last_app and last_app != event.application: 

694 insights["context_switches"] += 1 

695 last_app = event.application 

696 

697 # Extract technologies 

698 if event.event_type == "file_change": 

699 ext = event.details.get("file_extension", "") 

700 if ext == ".py": 

701 insights["technologies_used"].add("python") 

702 elif ext in [".js", ".ts"]: 

703 insights["technologies_used"].add("javascript") 

704 elif ext == ".rs": 

705 insights["technologies_used"].add("rust") 

706 

707 # Extract projects 

708 if event.project_path: 

709 insights["active_projects"].add(event.project_path) 

710 

711 # Extract documentation topics 

712 if event.event_type == "browser_nav": 

713 topic = event.details.get("topic") 

714 technology = event.details.get("technology") 

715 if topic and technology: 

716 insights["documentation_topics"].append(f"{technology}: {topic}") 

717 

718 # Determine primary focus 

719 if app_time: 

720 insights["primary_focus"] = max(app_time.items(), key=lambda x: x[1])[0] 

721 

722 # Calculate productivity score based on relevant activity 

723 relevant_events = [e for e in events if e.relevance_score > 0.5] 

724 if events: 

725 insights["productivity_score"] = len(relevant_events) / len(events) 

726 

727 # Convert sets to lists for JSON serialization 

728 insights["technologies_used"] = list(insights["technologies_used"]) 

729 insights["active_projects"] = list(insights["active_projects"]) 

730 

731 return insights