Coverage for session_buddy / tools / monitoring_tools.py: 21.97%
220 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Application monitoring and activity tracking MCP tools.
4This module provides tools for monitoring application activity, tracking interruptions,
5and managing session context following crackerjack architecture patterns.
7Refactored to use utility modules for reduced code duplication.
8"""
10from __future__ import annotations
12from typing import TYPE_CHECKING, Any
14from session_buddy.utils.error_handlers import _get_logger
15from session_buddy.utils.instance_managers import (
16 get_app_monitor as resolve_app_monitor,
17)
18from session_buddy.utils.instance_managers import (
19 get_interruption_manager as resolve_interruption_manager,
20)
21from session_buddy.utils.messages import ToolMessages
23if TYPE_CHECKING:
24 from collections.abc import Awaitable, Callable
26 from fastmcp import FastMCP
29# ============================================================================
30# Service Resolution Helpers
31# ============================================================================
34async def _require_app_monitor() -> Any:
35 """Get application monitor instance or raise error."""
36 monitor = await resolve_app_monitor()
37 if monitor is None:
38 msg = "Application monitoring not available. Features may be limited"
39 raise RuntimeError(msg)
40 return monitor
43async def _require_interruption_manager() -> Any:
44 """Get interruption manager instance or raise error."""
45 manager = await resolve_interruption_manager()
46 if manager is None:
47 msg = "Interruption management not available. Features may be limited"
48 raise RuntimeError(msg)
49 return manager
52async def _execute_monitor_operation(
53 operation_name: str, operation: Callable[[Any], Awaitable[str]]
54) -> str:
55 """Execute a monitoring operation with error handling."""
56 try:
57 monitor = await _require_app_monitor()
58 return await operation(monitor)
59 except RuntimeError as e:
60 return f"❌ {e!s}"
61 except Exception as e:
62 _get_logger().exception(f"Error in {operation_name}: {e}")
63 return ToolMessages.operation_failed(operation_name, e)
66async def _execute_interruption_operation(
67 operation_name: str, operation: Callable[[Any], Awaitable[str]]
68) -> str:
69 """Execute an interruption management operation with error handling."""
70 try:
71 manager = await _require_interruption_manager()
72 return await operation(manager)
73 except RuntimeError as e:
74 return f"❌ {e!s}"
75 except Exception as e:
76 _get_logger().exception(f"Error in {operation_name}: {e}")
77 return ToolMessages.operation_failed(operation_name, e)
80# ============================================================================
81# App Monitoring Tools
82# ============================================================================
85async def _start_app_monitoring_operation(
86 monitor: Any, project_paths: list[str] | None
87) -> str:
88 """Start monitoring IDE activity and browser documentation usage."""
89 await monitor.start_monitoring(project_paths=project_paths)
91 lines = ["🔍 Application Monitoring Started", ""]
93 if project_paths:
94 lines.append("📁 Monitoring project paths:")
95 lines.extend([f" • {path}" for path in project_paths])
96 else:
97 lines.append("📁 Monitoring all accessible paths")
99 lines.extend(
100 [
101 "",
102 "👁️ Now tracking:",
103 " • IDE file access and editing patterns",
104 " • Browser documentation and research activity",
105 " • Application focus and context switches",
106 " • File system changes and development flow",
107 "",
108 "💡 Use `get_activity_summary` to view tracked activity",
109 "💡 Use `stop_app_monitoring` to end tracking",
110 ]
111 )
113 return "\n".join(lines)
116async def _start_app_monitoring_impl(project_paths: list[str] | None = None) -> str:
117 """Start monitoring IDE activity and browser documentation usage."""
119 async def operation_wrapper(monitor: Any) -> str:
120 return await _start_app_monitoring_operation(monitor, project_paths)
122 return await _execute_monitor_operation(
123 "Start app monitoring",
124 operation_wrapper,
125 )
128async def _stop_app_monitoring_operation(monitor: Any) -> str:
129 """Stop all application monitoring."""
130 summary = await monitor.stop_monitoring()
132 lines = [
133 "⏹️ Application Monitoring Stopped",
134 "",
135 "📊 Session summary:",
136 f" • Duration: {summary.get('duration_minutes', 0):.1f} minutes",
137 f" • Files tracked: {summary.get('files_tracked', 0)}",
138 f" • Applications monitored: {summary.get('apps_monitored', 0)}",
139 f" • Context switches: {summary.get('context_switches', 0)}",
140 "",
141 "✅ All monitoring stopped successfully",
142 ]
144 return "\n".join(lines)
147async def _stop_app_monitoring_impl() -> str:
148 """Stop all application monitoring."""
149 return await _execute_monitor_operation(
150 "Stop app monitoring", _stop_app_monitoring_operation
151 )
154# ============================================================================
155# Activity Summary Helpers
156# ============================================================================
159def _format_file_activity(files: list[dict[str, Any]]) -> list[str]:
160 """Format file activity section."""
161 if not files:
162 return []
164 lines = [f"📄 File Activity ({len(files)} files):"]
165 for file_info in files[:10]: # Show top 10
166 lines.append(f" • {file_info['path']} ({file_info['access_count']} accesses)")
167 if len(files) > 10:
168 lines.append(f" • ... and {len(files) - 10} more files")
169 return lines
172def _format_app_activity(apps: list[dict[str, Any]]) -> list[str]:
173 """Format application activity section."""
174 if not apps:
175 return []
177 lines = ["\n🖥️ Application Focus:"]
178 for app_info in apps[:5]: # Show top 5
179 duration = app_info["focus_time_minutes"]
180 lines.append(f" • {app_info['name']}: {duration:.1f} minutes")
181 return lines
184def _format_productivity_metrics(metrics: dict[str, Any]) -> list[str]:
185 """Format productivity metrics section."""
186 if not metrics:
187 return []
189 return [
190 "\n📈 Productivity Metrics:",
191 f" • Focus time: {metrics.get('focus_time_minutes', 0):.1f} minutes",
192 f" • Context switches: {metrics.get('context_switches', 0)}",
193 f" • Deep work periods: {metrics.get('deep_work_periods', 0)}",
194 ]
197async def _get_activity_summary_operation(monitor: Any, hours: int) -> str:
198 """Get activity summary for the specified number of hours."""
199 summary = await monitor.get_activity_summary(hours=hours)
200 lines = [f"📊 Activity Summary - Last {hours} Hours", ""]
202 if not summary.get("has_data"):
203 lines.extend(
204 [
205 "🔍 No activity data available",
206 "💡 Start monitoring with `start_app_monitoring`",
207 ]
208 )
209 return "\n".join(lines)
211 # Add all sections
212 lines.extend(_format_file_activity(summary.get("file_activity", [])))
213 lines.extend(_format_app_activity(summary.get("app_activity", [])))
214 lines.extend(_format_productivity_metrics(summary.get("productivity_metrics", {})))
216 return "\n".join(lines)
219async def _get_activity_summary_impl(hours: int = 2) -> str:
220 """Get activity summary for the specified number of hours."""
222 async def operation_wrapper(monitor: Any) -> str:
223 return await _get_activity_summary_operation(monitor, hours)
225 return await _execute_monitor_operation("Get activity summary", operation_wrapper)
228# ============================================================================
229# Context Insights
230# ============================================================================
233def _format_context_insights_output(insights: dict[str, Any], hours: int) -> list[str]:
234 """Format context insights output."""
235 lines = [f"🧠 Context Insights - Last {hours} Hours", ""]
237 if not insights.get("has_data"):
238 lines.append("🔍 No context data available")
239 return lines
241 # Current focus area
242 focus = insights.get("current_focus")
243 if focus:
244 lines.extend(
245 (
246 f"🎯 Current Focus: {focus['area']}",
247 f" Duration: {focus['duration_minutes']:.1f} minutes",
248 )
249 )
251 # Project patterns
252 patterns = insights.get("project_patterns", [])
253 if patterns:
254 lines.append("\n📋 Project Patterns:")
255 lines.extend([f" • {pattern['description']}" for pattern in patterns[:3]])
257 # Technology context
258 tech_context = insights.get("technology_context", [])
259 if tech_context:
260 lines.append("\n⚙️ Technology Context:")
261 lines.extend(
262 [
263 f" • {tech['name']}: {tech['confidence']:.0%} confidence"
264 for tech in tech_context[:5]
265 ]
266 )
268 # Recommendations
269 recommendations = insights.get("recommendations", [])
270 if recommendations:
271 lines.append("\n💡 Recommendations:")
272 lines.extend([f" • {rec}" for rec in recommendations[:3]])
274 return lines
277async def _get_context_insights_operation(monitor: Any, hours: int) -> str:
278 """Get contextual insights from recent activity."""
279 insights = await monitor.get_context_insights(hours=hours)
280 lines = _format_context_insights_output(insights, hours)
281 return "\n".join(lines)
284async def _get_context_insights_impl(hours: int = 1) -> str:
285 """Get contextual insights from recent activity."""
287 async def operation_wrapper(monitor: Any) -> str:
288 return await _get_context_insights_operation(monitor, hours)
290 return await _execute_monitor_operation("Get context insights", operation_wrapper)
293async def _get_active_files_operation(monitor: Any, minutes: int) -> str:
294 """Get list of actively edited files in recent minutes."""
295 files = await monitor.get_active_files(minutes=minutes)
297 lines = [f"📄 Active Files - Last {minutes} Minutes", ""]
299 if not files:
300 lines.extend(
301 [
302 "🔍 No active files in this period",
303 "💡 Files will appear here when you edit them during monitoring",
304 ]
305 )
306 return "\n".join(lines)
308 lines.append(f"📝 Found {len(files)} active files:")
309 for file_info in files[:20]: # Show top 20
310 timestamp = file_info.get("last_modified", "Unknown")
311 lines.extend(
312 (
313 f" • {file_info['path']}",
314 f" Last modified: {timestamp}",
315 f" Changes: {file_info.get('change_count', 0)}",
316 )
317 )
319 if len(files) > 20:
320 lines.append(f"\n... and {len(files) - 20} more files")
322 return "\n".join(lines)
325async def _get_active_files_impl(minutes: int = 60) -> str:
326 """Get list of actively edited files in recent minutes."""
328 async def operation_wrapper(monitor: Any) -> str:
329 return await _get_active_files_operation(monitor, minutes)
331 return await _execute_monitor_operation("Get active files", operation_wrapper)
334# ============================================================================
335# Interruption Management Tools
336# ============================================================================
339async def _start_interruption_monitoring_operation(
340 manager: Any, session_id: str, user_id: str
341) -> str:
342 """Start monitoring for interruptions and context switches."""
343 await manager.start_monitoring(session_id=session_id, user_id=user_id)
345 return "\n".join(
346 [
347 "🔔 Interruption Monitoring Started",
348 "",
349 f"📝 Session ID: {session_id}",
350 f"👤 User: {user_id}",
351 "",
352 "🎯 Now detecting:",
353 " • System sleep/wake events",
354 " • Network disconnections",
355 " • Application crashes",
356 " • Long periods of inactivity",
357 "",
358 "💡 Context will be automatically preserved on interruptions",
359 "💡 Use `get_interruption_history` to view past events",
360 ]
361 )
364async def _start_interruption_monitoring_impl(
365 session_id: str, user_id: str = "default_user"
366) -> str:
367 """Start monitoring for interruptions and context switches."""
369 async def operation_wrapper(manager: Any) -> str:
370 return await _start_interruption_monitoring_operation(
371 manager, session_id, user_id
372 )
374 return await _execute_interruption_operation(
375 "Start interruption monitoring",
376 operation_wrapper,
377 )
380async def _stop_interruption_monitoring_operation(manager: Any) -> str:
381 """Stop interruption monitoring."""
382 summary = await manager.stop_monitoring()
384 return "\n".join(
385 [
386 "⏹️ Interruption Monitoring Stopped",
387 "",
388 "📊 Session summary:",
389 f" • Duration: {summary.get('duration_minutes', 0):.1f} minutes",
390 f" • Interruptions detected: {summary.get('interruption_count', 0)}",
391 f" • Contexts preserved: {summary.get('contexts_saved', 0)}",
392 "",
393 "✅ Monitoring stopped successfully",
394 ]
395 )
398async def _stop_interruption_monitoring_impl() -> str:
399 """Stop interruption monitoring."""
400 return await _execute_interruption_operation(
401 "Stop interruption monitoring", _stop_interruption_monitoring_operation
402 )
405async def _create_session_context_operation(
406 manager: Any, session_id: str, context_data: dict[str, Any]
407) -> str:
408 """Create a new session context snapshot."""
409 context_id = await manager.create_context_snapshot(
410 session_id=session_id, context_data=context_data
411 )
413 return "\n".join(
414 [
415 "📸 Session Context Created",
416 "",
417 f"🆔 Context ID: {context_id}",
418 f"📝 Session: {session_id}",
419 f"📦 Data items: {len(context_data)}",
420 "",
421 "✅ Context snapshot saved successfully",
422 "💡 Use `restore_session_context` to restore this context",
423 ]
424 )
427async def _create_session_context_impl(
428 session_id: str, context_data: dict[str, Any]
429) -> str:
430 """Create a new session context snapshot."""
432 async def operation_wrapper(manager: Any) -> str:
433 return await _create_session_context_operation(
434 manager, session_id, context_data
435 )
437 return await _execute_interruption_operation(
438 "Create session context",
439 operation_wrapper,
440 )
443async def _preserve_current_context_operation(
444 manager: Any, session_id: str, reason: str
445) -> str:
446 """Preserve current development context before an interruption."""
447 context_snapshot = await manager.preserve_context(
448 session_id=session_id, interruption_reason=reason
449 )
451 return "\n".join(
452 [
453 "💾 Context Preserved",
454 "",
455 f"🆔 Snapshot ID: {context_snapshot['id']}",
456 f"📝 Reason: {reason}",
457 f"📦 Items preserved: {context_snapshot['item_count']}",
458 "",
459 "✅ Context saved successfully",
460 "💡 Use `restore_session_context` to restore this context",
461 ]
462 )
465async def _preserve_current_context_impl(
466 session_id: str, reason: str = "manual_checkpoint"
467) -> str:
468 """Preserve current development context before an interruption."""
470 async def operation_wrapper(manager: Any) -> str:
471 return await _preserve_current_context_operation(manager, session_id, reason)
473 return await _execute_interruption_operation(
474 "Preserve current context",
475 operation_wrapper,
476 )
479async def _restore_session_context_operation(manager: Any, session_id: str) -> str:
480 """Restore a previously saved session context."""
481 restored = await manager.restore_context(session_id=session_id)
483 if not restored.get("success"):
484 return f"❌ Failed to restore context: {restored.get('error', 'Unknown error')}"
486 return "\n".join(
487 [
488 "♻️ Context Restored",
489 "",
490 f"📝 Session ID: {session_id}",
491 f"📦 Items restored: {restored['item_count']}",
492 f"📅 Original timestamp: {restored['original_timestamp']}",
493 "",
494 "✅ Context restored successfully",
495 "💡 Resume work from where you left off",
496 ]
497 )
500async def _restore_session_context_impl(session_id: str) -> str:
501 """Restore a previously saved session context."""
503 async def operation_wrapper(manager: Any) -> str:
504 return await _restore_session_context_operation(manager, session_id)
506 return await _execute_interruption_operation(
507 "Restore session context",
508 operation_wrapper,
509 )
512async def _get_interruption_history_operation(
513 manager: Any, user_id: str, hours: int
514) -> str:
515 """Get history of interruptions for debugging and analysis."""
516 history = await manager.get_interruption_history(user_id=user_id, hours=hours)
518 lines = [f"📜 Interruption History - Last {hours} Hours", f"👤 User: {user_id}", ""]
520 if not history:
521 lines.extend(
522 [
523 "🔍 No interruptions recorded",
524 "💡 Interruptions will appear here when detected during monitoring",
525 ]
526 )
527 return "\n".join(lines)
529 lines.append(f"⚠️ Found {len(history)} interruptions:")
530 for event in history[:10]: # Show last 10
531 lines.extend(
532 [
533 f"\n📍 {event['timestamp']}",
534 f" Type: {event['type']}",
535 f" Reason: {event.get('reason', 'N/A')}",
536 f" Recovery: {event.get('recovery_action', 'None')}",
537 ]
538 )
540 if len(history) > 10:
541 lines.append(f"\n... and {len(history) - 10} more events")
543 return "\n".join(lines)
546async def _get_interruption_history_impl(user_id: str, hours: int = 24) -> str:
547 """Get history of interruptions for debugging and analysis."""
549 async def operation_wrapper(manager: Any) -> str:
550 return await _get_interruption_history_operation(manager, user_id, hours)
552 return await _execute_interruption_operation(
553 "Get interruption history",
554 operation_wrapper,
555 )
558# ============================================================================
559# MCP Tool Registration
560# ============================================================================
563def register_monitoring_tools(mcp: FastMCP) -> None:
564 """Register all monitoring and interruption management tools."""
566 @mcp.tool() # type: ignore[misc]
567 async def start_app_monitoring(project_paths: list[str] | None = None) -> str:
568 """Start monitoring IDE activity and browser documentation usage."""
569 return await _start_app_monitoring_impl(project_paths)
571 @mcp.tool() # type: ignore[misc]
572 async def stop_app_monitoring() -> str:
573 """Stop all application monitoring."""
574 return await _stop_app_monitoring_impl()
576 @mcp.tool() # type: ignore[misc]
577 async def get_activity_summary(hours: int = 2) -> str:
578 """Get activity summary for the specified number of hours."""
579 return await _get_activity_summary_impl(hours)
581 @mcp.tool() # type: ignore[misc]
582 async def get_context_insights(hours: int = 1) -> str:
583 """Get contextual insights from recent activity."""
584 return await _get_context_insights_impl(hours)
586 @mcp.tool() # type: ignore[misc]
587 async def get_active_files(minutes: int = 60) -> str:
588 """Get list of actively edited files in recent minutes."""
589 return await _get_active_files_impl(minutes)
591 @mcp.tool() # type: ignore[misc]
592 async def start_interruption_monitoring(
593 session_id: str, user_id: str = "default_user"
594 ) -> str:
595 """Start monitoring for interruptions and context switches."""
596 return await _start_interruption_monitoring_impl(session_id, user_id)
598 @mcp.tool() # type: ignore[misc]
599 async def stop_interruption_monitoring() -> str:
600 """Stop interruption monitoring."""
601 return await _stop_interruption_monitoring_impl()
603 @mcp.tool() # type: ignore[misc]
604 async def create_session_context(
605 session_id: str, context_data: dict[str, Any]
606 ) -> str:
607 """Create a new session context snapshot."""
608 return await _create_session_context_impl(session_id, context_data)
610 @mcp.tool() # type: ignore[misc]
611 async def preserve_current_context(
612 session_id: str, reason: str = "manual_checkpoint"
613 ) -> str:
614 """Preserve current development context before an interruption."""
615 return await _preserve_current_context_impl(session_id, reason)
617 @mcp.tool() # type: ignore[misc]
618 async def restore_session_context(session_id: str) -> str:
619 """Restore a previously saved session context."""
620 return await _restore_session_context_impl(session_id)
622 @mcp.tool() # type: ignore[misc]
623 async def get_interruption_history(user_id: str, hours: int = 24) -> str:
624 """Get history of interruptions for debugging and analysis."""
625 return await _get_interruption_history_impl(user_id, hours)