Coverage for session_buddy / app_monitor.py: 66.60%
416 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Application-Aware Context Monitoring for Session Management MCP Server.
4Monitors IDE activity and browser documentation to enrich session context.
5Excludes Slack/Discord as per Phase 4 requirements.
6"""
8import asyncio
9import json
10import operator
11import sqlite3
12from collections import defaultdict
13from contextlib import suppress
14from dataclasses import dataclass
15from datetime import datetime, timedelta
16from pathlib import Path
17from typing import TYPE_CHECKING, Any
19if TYPE_CHECKING:
20 import psutil
21 # FileSystemEventHandler and Observer imported at runtime below
23try:
24 from watchdog.events import FileSystemEventHandler
25 from watchdog.observers import Observer
27 WATCHDOG_AVAILABLE = True
28except ImportError:
29 WATCHDOG_AVAILABLE = False
31 # Create stub for FileSystemEventHandler when watchdog is not available
32 class FileSystemEventHandler: # type: ignore[no-redef]
33 """Stub base class when watchdog is not available."""
35 def __init__(self) -> None: # type: ignore[no-redef]
36 super().__init__() # type: ignore[misc]
38 # Create stub for Observer when watchdog is not available
39 class Observer: # type: ignore[no-redef]
40 def __init__(self) -> None:
41 pass
43 def schedule(
44 self, event_handler: Any, path: str, recursive: bool = True
45 ) -> None:
46 pass
48 def start(self) -> None:
49 pass
51 def stop(self) -> None:
52 pass
54 def join(self) -> None:
55 pass
58try:
59 import psutil
61 PSUTIL_AVAILABLE = True
62except ImportError:
63 PSUTIL_AVAILABLE = False
66@dataclass
67class ActivityEvent:
68 """Represents a monitored activity event."""
70 timestamp: str
71 event_type: str # 'file_change', 'app_focus', 'browser_nav'
72 application: str
73 details: dict[str, Any]
74 project_path: str | None = None
75 relevance_score: float = 0.0
78class ProjectActivityMonitor:
79 """Monitors project activity including file changes and application focus."""
81 def __init__(self, project_paths: list[str] | None = None) -> None:
82 """Initialize activity monitor."""
83 from session_buddy.settings import get_settings
85 self._settings = get_settings()
86 self.project_paths = project_paths or []
87 self.db_path = str(Path.home() / ".claude" / "data" / "activity.db")
88 self.observers: list[Any] = []
89 self.activity_buffer: list[ActivityEvent] = []
90 self.last_activity: dict[str, Any] = {}
91 self.ide_extensions = {
92 ".py",
93 ".js",
94 ".ts",
95 ".jsx",
96 ".tsx",
97 ".java",
98 ".cpp",
99 ".c",
100 ".h",
101 ".rs",
102 ".go",
103 ".php",
104 ".rb",
105 ".swift",
106 ".kt",
107 ".scala",
108 ".cs",
109 ".html",
110 ".css",
111 ".scss",
112 ".vue",
113 ".svelte",
114 ".json",
115 ".yaml",
116 }
118 def _init_database(self) -> None:
119 """Initialize database tables."""
120 with sqlite3.connect(self.db_path) as conn:
121 conn.execute("""
122 CREATE TABLE IF NOT EXISTS activity_events (
123 id INTEGER PRIMARY KEY AUTOINCREMENT,
124 timestamp TEXT NOT NULL,
125 event_type TEXT NOT NULL,
126 application TEXT NOT NULL,
127 details TEXT NOT NULL,
128 project_path TEXT,
129 relevance_score REAL DEFAULT 0.0,
130 created_at TEXT DEFAULT CURRENT_TIMESTAMP
131 )
132 """)
134 def start_monitoring(self) -> bool:
135 """Start file system monitoring."""
136 if not WATCHDOG_AVAILABLE: 136 ↛ 139line 136 didn't jump to line 139 because the condition on line 136 was always true
137 return False
139 if WATCHDOG_AVAILABLE:
140 for path in self.project_paths:
141 if Path(path).exists():
142 event_handler = IDEFileHandler(self)
143 observer = Observer()
144 observer.schedule(event_handler, path, recursive=True)
145 observer.start()
146 self.observers.append(observer)
148 return len(self.observers) > 0
150 def stop_monitoring(self) -> None:
151 """Stop file system monitoring."""
152 if WATCHDOG_AVAILABLE: 152 ↛ 156line 152 didn't jump to line 156 because the condition on line 152 was always true
153 for observer in self.observers:
154 observer.stop()
155 observer.join()
156 self.observers.clear()
158 def add_activity(self, event: ActivityEvent) -> None:
159 """Add activity event to buffer."""
160 self.activity_buffer.append(event)
162 # Keep buffer size manageable
163 if len(self.activity_buffer) > 1000:
164 self.activity_buffer = self.activity_buffer[-500:]
166 def get_recent_activity(self, minutes: int = 30) -> list[ActivityEvent]:
167 """Get recent activity within specified minutes."""
168 cutoff = datetime.now() - timedelta(minutes=minutes)
169 cutoff_str = cutoff.isoformat()
171 return [
172 event for event in self.activity_buffer if event.timestamp >= cutoff_str
173 ]
175 def get_active_files(self, minutes: int = 60) -> list[dict[str, Any]]:
176 """Get files actively being worked on."""
177 recent_events = self.get_recent_activity(minutes)
178 file_activity: dict[str, list[ActivityEvent]] = defaultdict(list)
180 for event in recent_events:
181 if event.event_type == "file_change" and "file_path" in event.details: 181 ↛ 180line 181 didn't jump to line 180 because the condition on line 181 was always true
182 file_path = event.details["file_path"]
183 file_activity[file_path].append(event)
185 # Score files by activity frequency and recency
186 active_files = []
187 for file_path, events in file_activity.items():
188 score = len(events)
189 latest_event = max(events, key=lambda e: e.timestamp)
191 # Boost score for recent activity
192 time_diff = datetime.now() - datetime.fromisoformat(latest_event.timestamp)
193 if time_diff.total_seconds() < 300: # 5 minutes 193 ↛ 196line 193 didn't jump to line 196 because the condition on line 193 was always true
194 score *= 2
196 active_files.append(
197 {
198 "file_path": file_path,
199 "activity_score": score,
200 "event_count": len(events),
201 "last_activity": latest_event.timestamp,
202 "project_path": latest_event.project_path,
203 },
204 )
206 return sorted(
207 active_files, key=operator.itemgetter("activity_score"), reverse=True
208 )
211class IDEFileHandler(FileSystemEventHandler): # type: ignore[misc]
212 """Handles file system events for IDE monitoring."""
214 def __init__(self, monitor: ProjectActivityMonitor) -> None:
215 self.monitor = monitor
216 # Merge settings-driven ignore dirs
217 self.ignore_patterns = set(self.monitor._settings.filesystem_ignore_dirs)
218 self.ignore_patterns.add(".vscode/settings.json")
220 # Critical file patterns for smart thresholding
221 self.critical_patterns = {
222 "auth": ["auth", "login", "session", "jwt", "oauth"],
223 "database": ["db", "database", "migration", "schema"],
224 "config": ["config", "settings", "env"],
225 "api": ["api", "endpoint", "route", "controller"],
226 "security": ["security", "encrypt", "hash", "crypto"],
227 }
228 self._recent_ttl_seconds = self.monitor._settings.filesystem_dedupe_ttl_seconds
230 def should_ignore(self, file_path: str) -> bool:
231 """Check if file should be ignored."""
232 path = Path(file_path)
234 # Check ignore patterns
235 for part in path.parts:
236 if part in self.ignore_patterns: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true
237 return True
239 # Check if it's a relevant file extension
240 if path.suffix not in self.monitor.ide_extensions: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 return True
243 # Ignore large or temporary files
244 max_size = self.monitor._settings.filesystem_max_file_size_bytes
245 with suppress(Exception):
246 if path.exists() and path.is_file() and path.stat().st_size > max_size: 246 ↛ 249line 246 didn't jump to line 249
247 return True
249 return bool(path.name.startswith(".") or path.name.endswith("~"))
251 def on_modified(self, event: Any) -> None:
252 """Handle file modification events."""
253 if event.is_directory or self.should_ignore(event.src_path):
254 return
256 src_path = Path(event.src_path)
257 project_path = self._determine_project_path(src_path)
259 activity_event = self._create_activity_event(src_path, project_path)
260 self.monitor.add_activity(activity_event)
262 self._try_entity_extraction(activity_event, src_path, project_path)
264 def _determine_project_path(self, src_path: Path) -> str | None:
265 """Determine which project this file belongs to."""
266 for proj_path in self.monitor.project_paths:
267 if src_path.is_relative_to(proj_path):
268 return proj_path
269 return None
271 def _create_activity_event(
272 self,
273 src_path: Path,
274 project_path: str | None,
275 ) -> ActivityEvent:
276 """Create an activity event for file modification."""
277 return ActivityEvent(
278 timestamp=datetime.now().isoformat(),
279 event_type="file_change",
280 application="ide",
281 details={
282 "file_path": str(src_path),
283 "file_name": src_path.name,
284 "file_extension": src_path.suffix,
285 "change_type": "modified",
286 },
287 project_path=project_path,
288 relevance_score=self._estimate_relevance(src_path),
289 )
291 def _try_entity_extraction(
292 self,
293 activity_event: ActivityEvent,
294 src_path: Path,
295 project_path: str | None,
296 ) -> None:
297 """Try to extract entities if feature flags are enabled."""
298 from session_buddy.config.feature_flags import get_feature_flags
300 flags = get_feature_flags()
301 if not (
302 flags.enable_filesystem_extraction and flags.enable_llm_entity_extraction
303 ):
304 return
306 if not self._passes_threshold(activity_event):
307 return
309 if self._recently_processed_persisted(str(src_path)):
310 return
312 self._fire_and_forget_extraction(activity_event, src_path, project_path)
314 def _fire_and_forget_extraction(
315 self,
316 activity_event: ActivityEvent,
317 src_path: Path,
318 project_path: str | None,
319 ) -> None:
320 """Fire-and-forget entity extraction task."""
321 with suppress(Exception):
322 import asyncio as _asyncio
324 from session_buddy.memory.file_context import build_file_context
325 from session_buddy.tools.entity_extraction_tools import (
326 extract_and_store_memory as _extract,
327 )
329 ctx = build_file_context(str(src_path))
330 snippet = ctx.get("snippet", "")
332 _asyncio.create_task(
333 _extract(
334 user_input=f"Updated file: {src_path.name}\nContext: {ctx['metadata']}",
335 ai_output=snippet,
336 project=project_path,
337 activity_score=activity_event.relevance_score,
338 )
339 )
341 def _recently_processed_persisted(self, file_path: str) -> bool:
342 """Check and update persistent recent-extractions cache.
344 Returns True if the given file was processed within the TTL window.
345 """
346 try:
347 # Use the same SQLite DB as activity log
348 import sqlite3 as _sql
349 from time import time as _now
351 self._ensure_recent_table()
352 with _sql.connect(self.monitor.db_path) as conn:
353 cur = conn.cursor()
354 cur.execute(
355 "SELECT last_extracted FROM recent_extractions WHERE file_path=?",
356 (file_path,),
357 )
358 row = cur.fetchone()
359 now = int(_now())
360 if row:
361 last = int(row[0])
362 if now - last < self._recent_ttl_seconds:
363 return True
364 # Update timestamp
365 cur.execute(
366 "UPDATE recent_extractions SET last_extracted=? WHERE file_path=?",
367 (now, file_path),
368 )
369 else:
370 cur.execute(
371 "INSERT INTO recent_extractions (file_path, last_extracted) VALUES (?, ?)",
372 (file_path, now),
373 )
374 conn.commit()
375 except Exception:
376 # On any error, fall back to allowing processing to avoid missing events
377 return False
378 return False
380 def _ensure_recent_table(self) -> None:
381 """Ensure the persistent recent_extractions table exists."""
382 with suppress(Exception):
383 import sqlite3 as _sql
385 with _sql.connect(self.monitor.db_path) as conn:
386 conn.execute(
387 """
388 CREATE TABLE IF NOT EXISTS recent_extractions (
389 file_path TEXT PRIMARY KEY,
390 last_extracted INTEGER
391 )
392 """
393 )
395 def _estimate_relevance(self, path: Path) -> float:
396 name = path.name.lower()
397 for keywords in self.critical_patterns.values():
398 if any(k in name for k in keywords):
399 return 0.95
400 return 0.8
402 def _passes_threshold(self, event: ActivityEvent) -> bool:
403 # Base threshold
404 if event.relevance_score < 0.7:
405 return False
406 # Stricter threshold for non-critical files
407 name = event.details.get("file_name", "").lower()
408 critical = any(
409 any(k in name for k in kw) for kw in self.critical_patterns.values()
410 )
411 return not (not critical and event.relevance_score < 0.9)
414class BrowserDocumentationMonitor:
415 """Monitors browser activity for documentation sites."""
417 def __init__(self) -> None:
418 self.doc_domains = {
419 "docs.python.org",
420 "developer.mozilla.org",
421 "docs.rs",
422 "docs.oracle.com",
423 "docs.microsoft.com",
424 "docs.aws.amazon.com",
425 "cloud.google.com",
426 "docs.github.com",
427 "docs.gitlab.com",
428 "stackoverflow.com",
429 "github.com",
430 "fastapi.tiangolo.com",
431 "pydantic-docs.helpmanual.io",
432 "django-documentation",
433 "flask.palletsprojects.com",
434 "nodejs.org",
435 "reactjs.org",
436 "vuejs.org",
437 "angular.io",
438 "svelte.dev",
439 }
440 self.activity_buffer: list[ActivityEvent] = []
441 self.browser_processes: set[str] = set()
443 def get_browser_processes(self) -> list[dict[str, Any]]:
444 """Get currently running browser processes."""
445 if not PSUTIL_AVAILABLE: 445 ↛ 448line 445 didn't jump to line 448 because the condition on line 445 was always true
446 return []
448 browsers = []
449 browser_names = {"chrome", "firefox", "safari", "edge", "brave"}
451 try:
452 for proc in psutil.process_iter(["pid", "name", "create_time"]):
453 proc_name = proc.info["name"].lower()
454 if any(browser in proc_name for browser in browser_names):
455 browsers.append(
456 {
457 "pid": proc.info["pid"],
458 "name": proc.info["name"],
459 "create_time": proc.info["create_time"],
460 },
461 )
462 except (psutil.NoSuchProcess, psutil.AccessDenied):
463 pass
465 return browsers
467 def extract_documentation_context(self, url: str) -> dict[str, Any]:
468 """Extract context from documentation URLs."""
469 context = {"domain": "", "technology": "", "topic": "", "relevance": 0.0}
471 from contextlib import suppress
473 with suppress(ValueError, AttributeError):
474 from urllib.parse import urlparse
476 parsed = urlparse(url)
477 domain = parsed.netloc
478 path = parsed.path
480 context["domain"] = domain
481 technology, relevance = self._determine_technology(domain, path)
482 context["technology"] = technology
483 context["relevance"] = relevance
484 context["topic"] = self._extract_topic(path)
486 return context
488 def _determine_technology(self, domain: str, path: str) -> tuple[str, float]:
489 """Return technology label and relevance score for a domain/path."""
490 normalized_path = path.lower()
491 normalized_domain = domain.lower()
493 tech_rules = [
494 (
495 "python" in normalized_domain or "python" in normalized_path,
496 "python",
497 0.9,
498 ),
499 (
500 "javascript" in normalized_path
501 or "js" in normalized_path
502 or normalized_domain in {"developer.mozilla.org", "nodejs.org"},
503 "javascript",
504 0.8,
505 ),
506 (
507 "rust" in normalized_domain or "docs.rs" in normalized_domain,
508 "rust",
509 0.8,
510 ),
511 (
512 any(
513 framework in normalized_domain
514 for framework in ("django", "flask", "fastapi")
515 ),
516 "python-web",
517 0.9,
518 ),
519 (
520 any(
521 framework in normalized_domain
522 for framework in ("react", "vue", "angular", "svelte")
523 ),
524 "frontend",
525 0.8,
526 ),
527 ]
529 for condition, label, relevance in tech_rules: 529 ↛ 533line 529 didn't jump to line 533 because the loop on line 529 didn't complete
530 if condition: 530 ↛ 529line 530 didn't jump to line 529 because the condition on line 530 was always true
531 return label, relevance
533 return "", 0.0
535 def _extract_topic(self, path: str) -> str:
536 """Derive topic from a URL path."""
537 path_parts = [p for p in path.split("/") if p]
538 if not path_parts: 538 ↛ 539line 538 didn't jump to line 539 because the condition on line 538 was never true
539 return ""
541 tail = path_parts[-1]
542 if tail == "index.html" and len(path_parts) > 1: 542 ↛ 543line 542 didn't jump to line 543 because the condition on line 542 was never true
543 return path_parts[-2]
544 return tail
546 def add_browser_activity(self, url: str, title: str = "") -> None:
547 """Add browser navigation activity."""
548 context = self.extract_documentation_context(url)
550 activity_event = ActivityEvent(
551 timestamp=datetime.now().isoformat(),
552 event_type="browser_nav",
553 application="browser",
554 details={
555 "url": url,
556 "title": title,
557 "domain": context["domain"],
558 "technology": context["technology"],
559 "topic": context["topic"],
560 },
561 relevance_score=context["relevance"],
562 )
564 self.activity_buffer.append(activity_event)
566 # Keep buffer manageable
567 if len(self.activity_buffer) > 500: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true
568 self.activity_buffer = self.activity_buffer[-250:]
571class ApplicationFocusMonitor:
572 """Monitors application focus changes."""
574 def __init__(self) -> None:
575 self.focus_history: list[ActivityEvent] = []
576 self.current_app: str | None = None
577 self.app_categories = {
578 "ide": {
579 "code",
580 "pycharm",
581 "vscode",
582 "sublime",
583 "atom",
584 "vim",
585 "emacs",
586 "intellij",
587 },
588 "browser": {"chrome", "firefox", "safari", "edge", "brave"},
589 "terminal": {
590 "terminal",
591 "term",
592 "console",
593 "cmd",
594 "powershell",
595 "zsh",
596 "bash",
597 },
598 "documentation": {"devdocs", "dash", "zeal"},
599 }
601 def get_focused_application(self) -> dict[str, Any] | None:
602 """Get currently focused application."""
603 if not PSUTIL_AVAILABLE:
604 return None
606 try:
607 # This is a simplified version - would need platform-specific implementation
608 # for full window focus detection
609 for proc in psutil.process_iter(["pid", "name"]): 609 ↛ 621line 609 didn't jump to line 621 because the loop on line 609 didn't complete
610 proc_name = proc.info["name"].lower()
611 category = self._categorize_app(proc_name)
612 if category:
613 return {
614 "name": proc.info["name"],
615 "category": category,
616 "pid": proc.info["pid"],
617 }
618 except (psutil.NoSuchProcess, psutil.AccessDenied):
619 pass
621 return None
623 def _categorize_app(self, app_name: str) -> str | None:
624 """Categorize application by name."""
625 for category, keywords in self.app_categories.items():
626 if any(keyword in app_name for keyword in keywords):
627 return category
628 return None
630 def add_focus_event(self, app_info: dict[str, Any]) -> None:
631 """Add application focus event."""
632 activity_event = ActivityEvent(
633 timestamp=datetime.now().isoformat(),
634 event_type="app_focus",
635 application=app_info["name"],
636 details={"category": app_info["category"], "pid": app_info["pid"]},
637 relevance_score=0.6 if app_info["category"] in {"ide", "terminal"} else 0.3,
638 )
640 self.focus_history.append(activity_event)
642 # Keep history manageable
643 if len(self.focus_history) > 200: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true
644 self.focus_history = self.focus_history[-100:]
647class ActivityDatabase:
648 """SQLite database for storing activity events."""
650 def __init__(self, db_path: str) -> None:
651 self.db_path = db_path
652 self._init_database()
654 def _init_database(self) -> None:
655 """Initialize database tables."""
656 with sqlite3.connect(self.db_path) as conn:
657 conn.execute("""
658 CREATE TABLE IF NOT EXISTS activity_events (
659 id INTEGER PRIMARY KEY AUTOINCREMENT,
660 timestamp TEXT NOT NULL,
661 event_type TEXT NOT NULL,
662 application TEXT NOT NULL,
663 details TEXT NOT NULL,
664 project_path TEXT,
665 relevance_score REAL,
666 created_at TEXT DEFAULT CURRENT_TIMESTAMP
667 )
668 """)
670 conn.execute("""
671 CREATE INDEX IF NOT EXISTS idx_timestamp ON activity_events(timestamp)
672 """)
674 conn.execute("""
675 CREATE INDEX IF NOT EXISTS idx_event_type ON activity_events(event_type)
676 """)
678 def store_event(self, event: ActivityEvent) -> None:
679 """Store activity event in database."""
680 with sqlite3.connect(self.db_path) as conn:
681 conn.execute(
682 """
683 INSERT INTO activity_events
684 (timestamp, event_type, application, details, project_path, relevance_score)
685 VALUES (?, ?, ?, ?, ?, ?)
686 """,
687 (
688 event.timestamp,
689 event.event_type,
690 event.application,
691 json.dumps(event.details),
692 event.project_path,
693 event.relevance_score,
694 ),
695 )
697 def get_events(
698 self,
699 start_time: str | None = None,
700 end_time: str | None = None,
701 event_types: list[str] | None = None,
702 limit: int = 100,
703 ) -> list[ActivityEvent]:
704 """Retrieve activity events from database."""
705 with sqlite3.connect(self.db_path) as conn:
706 query = "SELECT * FROM activity_events WHERE 1=1"
707 params: list[Any] = []
709 if start_time:
710 query += " AND timestamp >= ?"
711 params.append(start_time)
713 if end_time: 713 ↛ 714line 713 didn't jump to line 714 because the condition on line 713 was never true
714 query += " AND timestamp <= ?"
715 params.append(end_time)
717 if event_types: 717 ↛ 718line 717 didn't jump to line 718 because the condition on line 717 was never true
718 placeholders = ",".join("?" * len(event_types))
719 query += f" AND event_type IN ({placeholders})"
720 params.extend(event_types)
722 query += " ORDER BY timestamp DESC LIMIT ?"
723 params.append(limit)
725 cursor = conn.execute(query, params)
726 rows = cursor.fetchall()
728 return [
729 ActivityEvent(
730 timestamp=row[1],
731 event_type=row[2],
732 application=row[3],
733 details=json.loads(row[4]),
734 project_path=row[5],
735 relevance_score=row[6] or 0.0,
736 )
737 for row in rows
738 ]
740 def cleanup_old_events(self, days_to_keep: int = 30) -> None:
741 """Remove old activity events."""
742 cutoff = datetime.now() - timedelta(days=days_to_keep)
743 cutoff_str = cutoff.isoformat()
745 with sqlite3.connect(self.db_path) as conn:
746 conn.execute(
747 "DELETE FROM activity_events WHERE timestamp < ?",
748 (cutoff_str,),
749 )
752class ApplicationMonitor:
753 """Main application monitoring coordinator."""
755 def __init__(self, data_dir: str, project_paths: list[str] | None = None) -> None:
756 self.data_dir = Path(data_dir)
757 self.project_paths = project_paths or []
759 self._setup_directory()
760 self._initialize_components()
761 self._setup_monitoring_state()
763 def _setup_directory(self) -> None:
764 """Set up the data directory."""
765 self.data_dir.mkdir(parents=True, exist_ok=True)
767 def _initialize_components(self) -> None:
768 """Initialize monitoring components."""
769 self.db = ActivityDatabase(str(self.data_dir / "activity.db"))
770 self.ide_monitor = ProjectActivityMonitor(self.project_paths)
771 self.browser_monitor = BrowserDocumentationMonitor()
772 self.focus_monitor = ApplicationFocusMonitor()
774 def _setup_monitoring_state(self) -> None:
775 """Set up monitoring state variables."""
776 self.monitoring_active = False
777 self._monitoring_task: asyncio.Task[Any] | None = None
779 async def start_monitoring(self) -> dict[str, Any] | None:
780 """Start all monitoring components."""
781 if self.monitoring_active: 781 ↛ 782line 781 didn't jump to line 782 because the condition on line 781 was never true
782 return None
784 self.monitoring_active = True
786 # Start IDE monitoring
787 ide_started = self.ide_monitor.start_monitoring()
789 # Start background monitoring task
790 self._monitoring_task = asyncio.create_task(self._monitoring_loop())
792 return {
793 "ide_monitoring": ide_started,
794 "watchdog_available": WATCHDOG_AVAILABLE,
795 "psutil_available": PSUTIL_AVAILABLE,
796 "project_paths": self.project_paths,
797 }
799 async def stop_monitoring(self) -> None:
800 """Stop all monitoring."""
801 self.monitoring_active = False
803 if self._monitoring_task: 803 ↛ 804line 803 didn't jump to line 804 because the condition on line 803 was never true
804 self._monitoring_task.cancel()
805 with suppress(asyncio.CancelledError):
806 await self._monitoring_task
808 self.ide_monitor.stop_monitoring()
810 async def _monitoring_loop(self) -> None:
811 """Background monitoring loop."""
812 while self.monitoring_active: 812 ↛ exitline 812 didn't return from function '_monitoring_loop' because the condition on line 812 was always true
813 try:
814 await self._process_monitoring_cycle()
815 await asyncio.sleep(30) # Check every 30 seconds
816 except asyncio.CancelledError:
817 break
818 except Exception as e:
819 await self._handle_monitoring_error(e)
821 async def _process_monitoring_cycle(self) -> None:
822 """Process a single monitoring cycle."""
823 await self._check_application_focus()
824 await self._persist_buffered_events()
826 async def _check_application_focus(self) -> None:
827 """Check and update application focus if changed."""
828 focused_app = self.focus_monitor.get_focused_application()
829 if self._is_focus_changed(focused_app) and focused_app is not None: 829 ↛ exitline 829 didn't return from function '_check_application_focus' because the condition on line 829 was always true
830 self.focus_monitor.add_focus_event(focused_app)
831 self.focus_monitor.current_app = focused_app.get("name")
833 def _is_focus_changed(self, focused_app: dict[str, Any] | None) -> bool:
834 """Check if application focus has changed."""
835 return (
836 focused_app is not None
837 and focused_app.get("name") != self.focus_monitor.current_app
838 )
840 async def _persist_buffered_events(self) -> None:
841 """Persist all buffered events to database."""
842 # Use generator expressions for memory efficiency
843 await self._persist_event_batch(
844 event for event in self.ide_monitor.activity_buffer[-10:]
845 )
846 await self._persist_event_batch(
847 event for event in self.browser_monitor.activity_buffer[-10:]
848 )
849 await self._persist_event_batch(
850 event for event in self.focus_monitor.focus_history[-5:]
851 )
853 async def _persist_event_batch(
854 self,
855 events: Any, # Generator of ActivityEvent
856 ) -> None:
857 """Persist a batch of events to the database."""
858 for event in events:
859 self.db.store_event(event)
861 async def _handle_monitoring_error(self, error: Exception) -> None:
862 """Handle monitoring errors with appropriate logging and delay."""
863 # Log error but continue monitoring
864 await asyncio.sleep(60)
866 def get_activity_summary(self, hours: int = 2) -> dict[str, Any]:
867 """Get activity summary for specified hours."""
868 start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
869 events = self.db.get_events(start_time=start_time, limit=500)
871 summary = self._create_activity_summary_template(hours, events)
872 self._aggregate_event_data(events, summary)
873 self._add_additional_context(hours, summary)
874 return self._finalize_summary(summary)
876 def _create_activity_summary_template(
877 self,
878 hours: int,
879 events: list[Any],
880 ) -> dict[str, Any]:
881 """Create the base summary template."""
882 return {
883 "total_events": len(events),
884 "time_range_hours": hours,
885 "event_types": defaultdict(int),
886 "applications": defaultdict(int),
887 "active_files": [],
888 "documentation_sites": [],
889 "average_relevance": 0.0,
890 }
892 def _aggregate_event_data(self, events: list[Any], summary: dict[str, Any]) -> None:
893 """Aggregate event data into the summary."""
894 total_relevance: float = 0.0
895 doc_sites = set()
897 for event in events:
898 summary["event_types"][event.event_type] += 1
899 summary["applications"][event.application] += 1
900 total_relevance += event.relevance_score
902 if event.event_type == "browser_nav" and event.details.get("domain"):
903 doc_sites.add(event.details["domain"])
905 if events:
906 summary["average_relevance"] = total_relevance / len(events)
908 summary["documentation_sites"] = list(doc_sites)
910 def _add_additional_context(self, hours: int, summary: dict[str, Any]) -> None:
911 """Add additional context information to the summary."""
912 summary["active_files"] = self.ide_monitor.get_active_files(hours * 60)
914 def _finalize_summary(self, summary: dict[str, Any]) -> dict[str, Any]:
915 """Finalize summary by converting collections for JSON serialization."""
916 summary["event_types"] = dict(summary["event_types"])
917 summary["applications"] = dict(summary["applications"])
918 return summary
920 def get_context_insights(self, hours: int = 1) -> dict[str, Any]:
921 """Get contextual insights from recent activity."""
922 start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
923 events = self.db.get_events(start_time=start_time, limit=200)
925 insights = self._create_insights_template()
926 if not events: 926 ↛ 927line 926 didn't jump to line 927 because the condition on line 926 was never true
927 return insights
929 app_time = self._analyze_events(events, insights)
930 self._determine_primary_focus(app_time, insights)
931 self._calculate_productivity_score(events, insights)
932 return self._finalize_insights(insights)
934 def _create_insights_template(self) -> dict[str, Any]:
935 """Create the insights template dictionary."""
936 return {
937 "primary_focus": None,
938 "technologies_used": set(),
939 "active_projects": set(),
940 "documentation_topics": [],
941 "productivity_score": 0.0,
942 "context_switches": 0,
943 }
945 def _analyze_events(
946 self,
947 events: list[Any],
948 insights: dict[str, Any],
949 ) -> dict[str, int]:
950 """Analyze events and extract insights data."""
951 app_time: dict[str, int] = defaultdict(int)
952 last_app = None
954 for event in events:
955 app_time[event.application] += 1
957 if last_app and last_app != event.application: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true
958 insights["context_switches"] += 1
959 last_app = event.application
961 self._extract_event_data(event, insights)
963 return app_time
965 def _extract_event_data(self, event: Any, insights: dict[str, Any]) -> None:
966 """Extract relevant data from a single event."""
967 self._extract_technologies(event, insights)
968 self._extract_projects(event, insights)
969 self._extract_documentation_topics(event, insights)
971 def _extract_technologies(self, event: Any, insights: dict[str, Any]) -> None:
972 """Extract technology information from file change events."""
973 if event.event_type == "file_change": 973 ↛ 974line 973 didn't jump to line 974 because the condition on line 973 was never true
974 ext = event.details.get("file_extension", "")
975 if ext == ".py":
976 insights["technologies_used"].add("python")
977 elif ext in {".js", ".ts"}:
978 insights["technologies_used"].add("javascript")
979 elif ext == ".rs":
980 insights["technologies_used"].add("rust")
982 def _extract_projects(self, event: Any, insights: dict[str, Any]) -> None:
983 """Extract project information from events."""
984 if event.project_path: 984 ↛ 985line 984 didn't jump to line 985 because the condition on line 984 was never true
985 insights["active_projects"].add(event.project_path)
987 def _extract_documentation_topics(
988 self,
989 event: Any,
990 insights: dict[str, Any],
991 ) -> None:
992 """Extract documentation topics from browser navigation events."""
993 if event.event_type == "browser_nav": 993 ↛ 994line 993 didn't jump to line 994 because the condition on line 993 was never true
994 topic = event.details.get("topic")
995 technology = event.details.get("technology")
996 if topic and technology:
997 insights["documentation_topics"].append(f"{technology}: {topic}")
999 def _determine_primary_focus(
1000 self,
1001 app_time: dict[str, int],
1002 insights: dict[str, Any],
1003 ) -> None:
1004 """Determine the primary application focus."""
1005 if app_time: 1005 ↛ exitline 1005 didn't return from function '_determine_primary_focus' because the condition on line 1005 was always true
1006 insights["primary_focus"] = max(
1007 app_time.items(), key=operator.itemgetter(1)
1008 )[0]
1010 def _calculate_productivity_score(
1011 self,
1012 events: list[Any],
1013 insights: dict[str, Any],
1014 ) -> None:
1015 """Calculate productivity score based on relevant activity."""
1016 relevant_events = [e for e in events if e.relevance_score > 0.5]
1017 if events: 1017 ↛ exitline 1017 didn't return from function '_calculate_productivity_score' because the condition on line 1017 was always true
1018 insights["productivity_score"] = len(relevant_events) / len(events)
1020 def _finalize_insights(self, insights: dict[str, Any]) -> dict[str, Any]:
1021 """Convert sets to lists for JSON serialization."""
1022 insights["technologies_used"] = list(insights["technologies_used"])
1023 insights["active_projects"] = list(insights["active_projects"])
1024 return insights