Coverage for session_mgmt_mcp/app_monitor.py: 13.68%
310 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Application-Aware Context Monitoring for Session Management MCP Server.
4Monitors IDE activity and browser documentation to enrich session context.
5Excludes Slack/Discord as per Phase 4 requirements.
6"""
8import asyncio
9import contextlib
10import json
11import sqlite3
12from collections import defaultdict
13from dataclasses import dataclass
14from datetime import datetime, timedelta
15from pathlib import Path
16from typing import Any
try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False

    # Create stub for FileSystemEventHandler when watchdog is not available,
    # so IDEFileHandler below can still subclass it at import time. Actual
    # file-system monitoring is gated on the WATCHDOG_AVAILABLE flag.
    class FileSystemEventHandler:
        pass
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    # psutil is optional: process/focus monitors degrade to empty results
    # (see get_browser_processes / get_focused_application) when it is missing.
    PSUTIL_AVAILABLE = False
@dataclass
class ActivityEvent:
    """Represents a monitored activity event."""

    timestamp: str  # ISO-8601 string, produced via datetime.now().isoformat()
    event_type: str  # 'file_change', 'app_focus', 'browser_nav'
    application: str  # source application, e.g. "ide", "browser", or a process name
    details: dict[str, Any]  # event-specific payload (JSON-serialized when persisted)
    project_path: str | None = None  # owning project root, when it can be determined
    relevance_score: float = 0.0  # heuristic weight (0.0-1.0) used by summaries/insights
class IDEActivityMonitor:
    """Monitors IDE file changes and activity.

    Keeps a bounded in-memory buffer of ActivityEvent objects fed by
    watchdog observers, and answers queries about recent activity and
    actively-edited files.
    """

    def __init__(self, project_paths: list[str]) -> None:
        self.project_paths = project_paths
        self.observers = []  # running watchdog Observer instances
        self.activity_buffer = []  # bounded list of buffered ActivityEvents
        self.last_activity = {}  # reserved for per-file bookkeeping
        # File extensions considered relevant IDE work.
        self.ide_extensions = {
            ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c",
            ".h", ".rs", ".go", ".php", ".rb", ".swift", ".kt", ".scala",
            ".cs", ".html", ".css", ".scss", ".vue", ".svelte", ".json",
            ".yaml", ".yml", ".toml", ".ini", ".md", ".txt", ".sql",
            ".sh", ".bat",
        }

    def start_monitoring(self):
        """Attach a recursive watchdog observer to each existing project path.

        Returns True when at least one observer started, False otherwise
        (including when watchdog is not installed).
        """
        if not WATCHDOG_AVAILABLE:
            return False

        for root in (p for p in self.project_paths if Path(p).exists()):
            watcher = Observer()
            watcher.schedule(IDEFileHandler(self), root, recursive=True)
            watcher.start()
            self.observers.append(watcher)

        return bool(self.observers)

    def stop_monitoring(self) -> None:
        """Stop and join every active observer, then drop all of them."""
        while self.observers:
            watcher = self.observers.pop()
            watcher.stop()
            watcher.join()

    def add_activity(self, event: "ActivityEvent") -> None:
        """Append an event; trim to the newest 500 once the buffer tops 1000."""
        self.activity_buffer.append(event)

        if len(self.activity_buffer) > 1000:
            del self.activity_buffer[:-500]

    def get_recent_activity(self, minutes: int = 30) -> "list[ActivityEvent]":
        """Return buffered events no older than *minutes*.

        ISO-8601 timestamps compare correctly as plain strings, so no
        parsing is needed for the cutoff test.
        """
        threshold = (datetime.now() - timedelta(minutes=minutes)).isoformat()
        return [evt for evt in self.activity_buffer if evt.timestamp >= threshold]

    def get_active_files(self, minutes: int = 60) -> list[dict[str, Any]]:
        """Rank recently-changed files by activity.

        Score = event count, doubled when the latest change is within the
        last five minutes. Result is sorted by descending score.
        """
        per_file: dict[str, list] = defaultdict(list)
        for evt in self.get_recent_activity(minutes):
            if evt.event_type == "file_change" and "file_path" in evt.details:
                per_file[evt.details["file_path"]].append(evt)

        ranked = []
        for path, evts in per_file.items():
            newest = max(evts, key=lambda e: e.timestamp)
            weight = len(evts)

            # Boost files touched within the last 5 minutes.
            age = datetime.now() - datetime.fromisoformat(newest.timestamp)
            if age.total_seconds() < 300:
                weight *= 2

            ranked.append(
                {
                    "file_path": path,
                    "activity_score": weight,
                    "event_count": len(evts),
                    "last_activity": newest.timestamp,
                    "project_path": newest.project_path,
                },
            )

        ranked.sort(key=lambda entry: entry["activity_score"], reverse=True)
        return ranked
class IDEFileHandler(FileSystemEventHandler):
    """Routes watchdog file-system events into an IDEActivityMonitor."""

    def __init__(self, monitor: "IDEActivityMonitor") -> None:
        self.monitor = monitor
        # Directory/file names that never count as IDE activity.
        self.ignore_patterns = {
            ".git", "__pycache__", "node_modules", ".venv", "venv",
            ".pytest_cache", ".mypy_cache", ".ruff_cache", "dist", "build",
            ".DS_Store", ".idea", ".vscode/settings.json",
        }

    def should_ignore(self, file_path: str) -> bool:
        """Return True when *file_path* should not produce activity events."""
        candidate = Path(file_path)

        # Anywhere in the path: ignored directories/names.
        if any(part in self.ignore_patterns for part in candidate.parts):
            return True

        # Only files with a known IDE-relevant extension are interesting.
        if candidate.suffix not in self.monitor.ide_extensions:
            return True

        # Hidden files and editor backup files (trailing ~) are noise.
        return bool(candidate.name.startswith(".") or candidate.name.endswith("~"))

    def on_modified(self, event) -> None:
        """Record a 'file_change' ActivityEvent for relevant modifications."""
        if event.is_directory or self.should_ignore(event.src_path):
            return

        changed = Path(event.src_path)
        # Attribute the file to the first project root that contains it.
        owner = next(
            (
                root
                for root in self.monitor.project_paths
                if changed.is_relative_to(root)
            ),
            None,
        )

        self.monitor.add_activity(
            ActivityEvent(
                timestamp=datetime.now().isoformat(),
                event_type="file_change",
                application="ide",
                details={
                    "file_path": event.src_path,
                    "file_name": changed.name,
                    "file_extension": changed.suffix,
                    "change_type": "modified",
                },
                project_path=owner,
                relevance_score=0.8,
            ),
        )
class BrowserDocumentationMonitor:
    """Monitors browser activity for documentation sites.

    Classifies visited URLs into technology/topic context and keeps a
    bounded buffer of browser navigation events.
    """

    def __init__(self) -> None:
        # Domains recognized as documentation sources.
        self.doc_domains = {
            "docs.python.org",
            "developer.mozilla.org",
            "docs.rs",
            "docs.oracle.com",
            "docs.microsoft.com",
            "docs.aws.amazon.com",
            "cloud.google.com",
            "docs.github.com",
            "docs.gitlab.com",
            "stackoverflow.com",
            "github.com",
            "fastapi.tiangolo.com",
            "pydantic-docs.helpmanual.io",
            "django-documentation",
            "flask.palletsprojects.com",
            "nodejs.org",
            "reactjs.org",
            "vuejs.org",
            "angular.io",
            "svelte.dev",
        }
        self.activity_buffer = []  # bounded list of browser_nav ActivityEvents
        self.browser_processes = set()  # reserved for future process tracking

    def get_browser_processes(self) -> list[dict[str, Any]]:
        """Return running browser processes as pid/name/create_time dicts.

        Returns an empty list when psutil is unavailable; process lookup
        errors are swallowed (best effort).
        """
        if not PSUTIL_AVAILABLE:
            return []

        browsers = []
        browser_names = {"chrome", "firefox", "safari", "edge", "brave"}

        try:
            for proc in psutil.process_iter(["pid", "name", "create_time"]):
                proc_name = proc.info["name"].lower()
                if any(browser in proc_name for browser in browser_names):
                    browsers.append(
                        {
                            "pid": proc.info["pid"],
                            "name": proc.info["name"],
                            "create_time": proc.info["create_time"],
                        },
                    )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

        return browsers

    def extract_documentation_context(self, url: str) -> dict[str, Any]:
        """Extract technology/topic context from a documentation URL.

        Returns a dict with ``domain``, ``technology``, ``topic`` and
        ``relevance`` keys; all stay at their defaults for unrecognized
        or malformed URLs.
        """
        context = {"domain": "", "technology": "", "topic": "", "relevance": 0.0}

        try:
            from urllib.parse import urlparse

            parsed = urlparse(url)
            domain = parsed.netloc
            path = parsed.path

            context["domain"] = domain

            # Tokenize the path so short keywords only match whole segments.
            # BUG FIX: the previous substring test (`"js" in path`) misread
            # any URL containing "json", "jsp", etc. as JavaScript docs.
            path_segments = [seg for seg in path.lower().split("/") if seg]
            looks_like_js = (
                "javascript" in path
                or any(seg == "js" or seg.endswith(".js") for seg in path_segments)
                or domain in ["developer.mozilla.org", "nodejs.org"]
            )

            # Determine technology and relevance (first match wins).
            if "python" in domain or "python" in path:
                context["technology"] = "python"
                context["relevance"] = 0.9
            elif looks_like_js:
                context["technology"] = "javascript"
                context["relevance"] = 0.8
            elif "rust" in domain or "docs.rs" in domain:
                context["technology"] = "rust"
                context["relevance"] = 0.8
            elif any(
                framework in domain for framework in ["django", "flask", "fastapi"]
            ):
                context["technology"] = "python-web"
                context["relevance"] = 0.9
            elif any(
                framework in domain
                for framework in ["react", "vue", "angular", "svelte"]
            ):
                context["technology"] = "frontend"
                context["relevance"] = 0.8

            # Topic = last path segment, skipping a trailing "index.html".
            path_parts = [p for p in path.split("/") if p]
            if path_parts:
                topic = path_parts[-1]
                if topic == "index.html":
                    topic = path_parts[-2] if len(path_parts) > 1 else ""
                context["topic"] = topic

        except Exception:
            # Best effort: a malformed URL yields the default context.
            pass

        return context

    def add_browser_activity(self, url: str, title: str = "") -> None:
        """Record a browser navigation event enriched with URL context."""
        context = self.extract_documentation_context(url)

        activity_event = ActivityEvent(
            timestamp=datetime.now().isoformat(),
            event_type="browser_nav",
            application="browser",
            details={
                "url": url,
                "title": title,
                "domain": context["domain"],
                "technology": context["technology"],
                "topic": context["topic"],
            },
            relevance_score=context["relevance"],
        )

        self.activity_buffer.append(activity_event)

        # Keep buffer manageable: trim to the newest 250 past 500 entries.
        if len(self.activity_buffer) > 500:
            self.activity_buffer = self.activity_buffer[-250:]
366class ApplicationFocusMonitor:
367 """Monitors application focus changes."""
369 def __init__(self) -> None:
370 self.focus_history = []
371 self.current_app = None
372 self.app_categories = {
373 "ide": {
374 "code",
375 "pycharm",
376 "vscode",
377 "sublime",
378 "atom",
379 "vim",
380 "emacs",
381 "intellij",
382 },
383 "browser": {"chrome", "firefox", "safari", "edge", "brave"},
384 "terminal": {
385 "terminal",
386 "term",
387 "console",
388 "cmd",
389 "powershell",
390 "zsh",
391 "bash",
392 },
393 "documentation": {"devdocs", "dash", "zeal"},
394 }
396 def get_focused_application(self) -> dict[str, Any] | None:
397 """Get currently focused application."""
398 if not PSUTIL_AVAILABLE:
399 return None
401 try:
402 # This is a simplified version - would need platform-specific implementation
403 # for full window focus detection
404 for proc in psutil.process_iter(["pid", "name"]):
405 proc_name = proc.info["name"].lower()
406 category = self._categorize_app(proc_name)
407 if category:
408 return {
409 "name": proc.info["name"],
410 "category": category,
411 "pid": proc.info["pid"],
412 }
413 except (psutil.NoSuchProcess, psutil.AccessDenied):
414 pass
416 return None
418 def _categorize_app(self, app_name: str) -> str | None:
419 """Categorize application by name."""
420 for category, keywords in self.app_categories.items():
421 if any(keyword in app_name for keyword in keywords):
422 return category
423 return None
425 def add_focus_event(self, app_info: dict[str, Any]) -> None:
426 """Add application focus event."""
427 activity_event = ActivityEvent(
428 timestamp=datetime.now().isoformat(),
429 event_type="app_focus",
430 application=app_info["name"],
431 details={"category": app_info["category"], "pid": app_info["pid"]},
432 relevance_score=0.6 if app_info["category"] in ["ide", "terminal"] else 0.3,
433 )
435 self.focus_history.append(activity_event)
437 # Keep history manageable
438 if len(self.focus_history) > 200:
439 self.focus_history = self.focus_history[-100:]
442class ActivityDatabase:
443 """SQLite database for storing activity events."""
445 def __init__(self, db_path: str) -> None:
446 self.db_path = db_path
447 self._init_database()
449 def _init_database(self) -> None:
450 """Initialize database tables."""
451 with sqlite3.connect(self.db_path) as conn:
452 conn.execute("""
453 CREATE TABLE IF NOT EXISTS activity_events (
454 id INTEGER PRIMARY KEY AUTOINCREMENT,
455 timestamp TEXT NOT NULL,
456 event_type TEXT NOT NULL,
457 application TEXT NOT NULL,
458 details TEXT NOT NULL,
459 project_path TEXT,
460 relevance_score REAL,
461 created_at TEXT DEFAULT CURRENT_TIMESTAMP
462 )
463 """)
465 conn.execute("""
466 CREATE INDEX IF NOT EXISTS idx_timestamp ON activity_events(timestamp)
467 """)
469 conn.execute("""
470 CREATE INDEX IF NOT EXISTS idx_event_type ON activity_events(event_type)
471 """)
473 def store_event(self, event: ActivityEvent) -> None:
474 """Store activity event in database."""
475 with sqlite3.connect(self.db_path) as conn:
476 conn.execute(
477 """
478 INSERT INTO activity_events
479 (timestamp, event_type, application, details, project_path, relevance_score)
480 VALUES (?, ?, ?, ?, ?, ?)
481 """,
482 (
483 event.timestamp,
484 event.event_type,
485 event.application,
486 json.dumps(event.details),
487 event.project_path,
488 event.relevance_score,
489 ),
490 )
492 def get_events(
493 self,
494 start_time: str | None = None,
495 end_time: str | None = None,
496 event_types: list[str] | None = None,
497 limit: int = 100,
498 ) -> list[ActivityEvent]:
499 """Retrieve activity events from database."""
500 with sqlite3.connect(self.db_path) as conn:
501 query = "SELECT * FROM activity_events WHERE 1=1"
502 params = []
504 if start_time:
505 query += " AND timestamp >= ?"
506 params.append(start_time)
508 if end_time:
509 query += " AND timestamp <= ?"
510 params.append(end_time)
512 if event_types:
513 placeholders = ",".join("?" * len(event_types))
514 query += f" AND event_type IN ({placeholders})"
515 params.extend(event_types)
517 query += " ORDER BY timestamp DESC LIMIT ?"
518 params.append(limit)
520 cursor = conn.execute(query, params)
521 rows = cursor.fetchall()
523 events = []
524 for row in rows:
525 events.append(
526 ActivityEvent(
527 timestamp=row[1],
528 event_type=row[2],
529 application=row[3],
530 details=json.loads(row[4]),
531 project_path=row[5],
532 relevance_score=row[6] or 0.0,
533 ),
534 )
536 return events
538 def cleanup_old_events(self, days_to_keep: int = 30):
539 """Remove old activity events."""
540 cutoff = datetime.now() - timedelta(days=days_to_keep)
541 cutoff_str = cutoff.isoformat()
543 with sqlite3.connect(self.db_path) as conn:
544 cursor = conn.execute(
545 "DELETE FROM activity_events WHERE timestamp < ?",
546 (cutoff_str,),
547 )
548 return cursor.rowcount
551class ApplicationMonitor:
552 """Main application monitoring coordinator."""
554 def __init__(self, data_dir: str, project_paths: list[str] | None = None) -> None:
555 self.data_dir = Path(data_dir)
556 self.data_dir.mkdir(parents=True, exist_ok=True)
558 self.project_paths = project_paths or []
559 self.db = ActivityDatabase(str(self.data_dir / "activity.db"))
561 self.ide_monitor = IDEActivityMonitor(self.project_paths)
562 self.browser_monitor = BrowserDocumentationMonitor()
563 self.focus_monitor = ApplicationFocusMonitor()
565 self.monitoring_active = False
566 self._monitoring_task = None
568 async def start_monitoring(self):
569 """Start all monitoring components."""
570 if self.monitoring_active:
571 return None
573 self.monitoring_active = True
575 # Start IDE monitoring
576 ide_started = self.ide_monitor.start_monitoring()
578 # Start background monitoring task
579 self._monitoring_task = asyncio.create_task(self._monitoring_loop())
581 return {
582 "ide_monitoring": ide_started,
583 "watchdog_available": WATCHDOG_AVAILABLE,
584 "psutil_available": PSUTIL_AVAILABLE,
585 "project_paths": self.project_paths,
586 }
588 async def stop_monitoring(self) -> None:
589 """Stop all monitoring."""
590 self.monitoring_active = False
592 if self._monitoring_task:
593 self._monitoring_task.cancel()
594 with contextlib.suppress(asyncio.CancelledError):
595 await self._monitoring_task
597 self.ide_monitor.stop_monitoring()
599 async def _monitoring_loop(self) -> None:
600 """Background monitoring loop."""
601 while self.monitoring_active:
602 try:
603 # Check application focus
604 focused_app = self.focus_monitor.get_focused_application()
605 if focused_app and focused_app != self.focus_monitor.current_app:
606 self.focus_monitor.add_focus_event(focused_app)
607 self.focus_monitor.current_app = focused_app
609 # Persist buffered IDE events
610 for event in self.ide_monitor.activity_buffer[-10:]: # Last 10 events
611 self.db.store_event(event)
613 # Persist buffered browser events
614 for event in self.browser_monitor.activity_buffer[-10:]:
615 self.db.store_event(event)
617 # Persist focus events
618 for event in self.focus_monitor.focus_history[-5:]:
619 self.db.store_event(event)
621 await asyncio.sleep(30) # Check every 30 seconds
623 except asyncio.CancelledError:
624 break
625 except Exception as e:
626 # Log error but continue monitoring
627 print(f"Monitoring error: {e}")
628 await asyncio.sleep(60)
630 def get_activity_summary(self, hours: int = 2) -> dict[str, Any]:
631 """Get activity summary for specified hours."""
632 start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
633 events = self.db.get_events(start_time=start_time, limit=500)
635 summary = {
636 "total_events": len(events),
637 "time_range_hours": hours,
638 "event_types": defaultdict(int),
639 "applications": defaultdict(int),
640 "active_files": [],
641 "documentation_sites": [],
642 "average_relevance": 0.0,
643 }
645 total_relevance = 0
646 doc_sites = set()
648 for event in events:
649 summary["event_types"][event.event_type] += 1
650 summary["applications"][event.application] += 1
651 total_relevance += event.relevance_score
653 if event.event_type == "browser_nav" and event.details.get("domain"):
654 doc_sites.add(event.details["domain"])
656 if events:
657 summary["average_relevance"] = total_relevance / len(events)
659 summary["active_files"] = self.ide_monitor.get_active_files(hours * 60)
660 summary["documentation_sites"] = list(doc_sites)
662 # Convert defaultdict to regular dict for JSON serialization
663 summary["event_types"] = dict(summary["event_types"])
664 summary["applications"] = dict(summary["applications"])
666 return summary
668 def get_context_insights(self, hours: int = 1) -> dict[str, Any]:
669 """Get contextual insights from recent activity."""
670 start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
671 events = self.db.get_events(start_time=start_time, limit=200)
673 insights = {
674 "primary_focus": None,
675 "technologies_used": set(),
676 "active_projects": set(),
677 "documentation_topics": [],
678 "productivity_score": 0.0,
679 "context_switches": 0,
680 }
682 if not events:
683 return insights
685 # Analyze primary focus
686 app_time = defaultdict(int)
687 last_app = None
689 for event in events:
690 app_time[event.application] += 1
692 # Count context switches
693 if last_app and last_app != event.application:
694 insights["context_switches"] += 1
695 last_app = event.application
697 # Extract technologies
698 if event.event_type == "file_change":
699 ext = event.details.get("file_extension", "")
700 if ext == ".py":
701 insights["technologies_used"].add("python")
702 elif ext in [".js", ".ts"]:
703 insights["technologies_used"].add("javascript")
704 elif ext == ".rs":
705 insights["technologies_used"].add("rust")
707 # Extract projects
708 if event.project_path:
709 insights["active_projects"].add(event.project_path)
711 # Extract documentation topics
712 if event.event_type == "browser_nav":
713 topic = event.details.get("topic")
714 technology = event.details.get("technology")
715 if topic and technology:
716 insights["documentation_topics"].append(f"{technology}: {topic}")
718 # Determine primary focus
719 if app_time:
720 insights["primary_focus"] = max(app_time.items(), key=lambda x: x[1])[0]
722 # Calculate productivity score based on relevant activity
723 relevant_events = [e for e in events if e.relevance_score > 0.5]
724 if events:
725 insights["productivity_score"] = len(relevant_events) / len(events)
727 # Convert sets to lists for JSON serialization
728 insights["technologies_used"] = list(insights["technologies_used"])
729 insights["active_projects"] = list(insights["active_projects"])
731 return insights