Coverage for session_buddy / tools / monitoring_tools.py: 21.97%

220 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Application monitoring and activity tracking MCP tools. 

3 

4This module provides tools for monitoring application activity, tracking interruptions, 

5and managing session context following crackerjack architecture patterns. 

6 

7Refactored to use utility modules for reduced code duplication. 

8""" 

9 

10from __future__ import annotations 

11 

12from typing import TYPE_CHECKING, Any 

13 

14from session_buddy.utils.error_handlers import _get_logger 

15from session_buddy.utils.instance_managers import ( 

16 get_app_monitor as resolve_app_monitor, 

17) 

18from session_buddy.utils.instance_managers import ( 

19 get_interruption_manager as resolve_interruption_manager, 

20) 

21from session_buddy.utils.messages import ToolMessages 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Awaitable, Callable 

25 

26 from fastmcp import FastMCP 

27 

28 

29# ============================================================================ 

30# Service Resolution Helpers 

31# ============================================================================ 

32 

33 

34async def _require_app_monitor() -> Any: 

35 """Get application monitor instance or raise error.""" 

36 monitor = await resolve_app_monitor() 

37 if monitor is None: 

38 msg = "Application monitoring not available. Features may be limited" 

39 raise RuntimeError(msg) 

40 return monitor 

41 

42 

43async def _require_interruption_manager() -> Any: 

44 """Get interruption manager instance or raise error.""" 

45 manager = await resolve_interruption_manager() 

46 if manager is None: 

47 msg = "Interruption management not available. Features may be limited" 

48 raise RuntimeError(msg) 

49 return manager 

50 

51 

52async def _execute_monitor_operation( 

53 operation_name: str, operation: Callable[[Any], Awaitable[str]] 

54) -> str: 

55 """Execute a monitoring operation with error handling.""" 

56 try: 

57 monitor = await _require_app_monitor() 

58 return await operation(monitor) 

59 except RuntimeError as e: 

60 return f"{e!s}" 

61 except Exception as e: 

62 _get_logger().exception(f"Error in {operation_name}: {e}") 

63 return ToolMessages.operation_failed(operation_name, e) 

64 

65 

66async def _execute_interruption_operation( 

67 operation_name: str, operation: Callable[[Any], Awaitable[str]] 

68) -> str: 

69 """Execute an interruption management operation with error handling.""" 

70 try: 

71 manager = await _require_interruption_manager() 

72 return await operation(manager) 

73 except RuntimeError as e: 

74 return f"{e!s}" 

75 except Exception as e: 

76 _get_logger().exception(f"Error in {operation_name}: {e}") 

77 return ToolMessages.operation_failed(operation_name, e) 

78 

79 

80# ============================================================================ 

81# App Monitoring Tools 

82# ============================================================================ 

83 

84 

85async def _start_app_monitoring_operation( 

86 monitor: Any, project_paths: list[str] | None 

87) -> str: 

88 """Start monitoring IDE activity and browser documentation usage.""" 

89 await monitor.start_monitoring(project_paths=project_paths) 

90 

91 lines = ["🔍 Application Monitoring Started", ""] 

92 

93 if project_paths: 

94 lines.append("📁 Monitoring project paths:") 

95 lines.extend([f"{path}" for path in project_paths]) 

96 else: 

97 lines.append("📁 Monitoring all accessible paths") 

98 

99 lines.extend( 

100 [ 

101 "", 

102 "👁️ Now tracking:", 

103 " • IDE file access and editing patterns", 

104 " • Browser documentation and research activity", 

105 " • Application focus and context switches", 

106 " • File system changes and development flow", 

107 "", 

108 "💡 Use `get_activity_summary` to view tracked activity", 

109 "💡 Use `stop_app_monitoring` to end tracking", 

110 ] 

111 ) 

112 

113 return "\n".join(lines) 

114 

115 

116async def _start_app_monitoring_impl(project_paths: list[str] | None = None) -> str: 

117 """Start monitoring IDE activity and browser documentation usage.""" 

118 

119 async def operation_wrapper(monitor: Any) -> str: 

120 return await _start_app_monitoring_operation(monitor, project_paths) 

121 

122 return await _execute_monitor_operation( 

123 "Start app monitoring", 

124 operation_wrapper, 

125 ) 

126 

127 

128async def _stop_app_monitoring_operation(monitor: Any) -> str: 

129 """Stop all application monitoring.""" 

130 summary = await monitor.stop_monitoring() 

131 

132 lines = [ 

133 "⏹️ Application Monitoring Stopped", 

134 "", 

135 "📊 Session summary:", 

136 f" • Duration: {summary.get('duration_minutes', 0):.1f} minutes", 

137 f" • Files tracked: {summary.get('files_tracked', 0)}", 

138 f" • Applications monitored: {summary.get('apps_monitored', 0)}", 

139 f" • Context switches: {summary.get('context_switches', 0)}", 

140 "", 

141 "✅ All monitoring stopped successfully", 

142 ] 

143 

144 return "\n".join(lines) 

145 

146 

147async def _stop_app_monitoring_impl() -> str: 

148 """Stop all application monitoring.""" 

149 return await _execute_monitor_operation( 

150 "Stop app monitoring", _stop_app_monitoring_operation 

151 ) 

152 

153 

154# ============================================================================ 

155# Activity Summary Helpers 

156# ============================================================================ 

157 

158 

159def _format_file_activity(files: list[dict[str, Any]]) -> list[str]: 

160 """Format file activity section.""" 

161 if not files: 

162 return [] 

163 

164 lines = [f"📄 File Activity ({len(files)} files):"] 

165 for file_info in files[:10]: # Show top 10 

166 lines.append(f"{file_info['path']} ({file_info['access_count']} accesses)") 

167 if len(files) > 10: 

168 lines.append(f" • ... and {len(files) - 10} more files") 

169 return lines 

170 

171 

172def _format_app_activity(apps: list[dict[str, Any]]) -> list[str]: 

173 """Format application activity section.""" 

174 if not apps: 

175 return [] 

176 

177 lines = ["\n🖥️ Application Focus:"] 

178 for app_info in apps[:5]: # Show top 5 

179 duration = app_info["focus_time_minutes"] 

180 lines.append(f"{app_info['name']}: {duration:.1f} minutes") 

181 return lines 

182 

183 

184def _format_productivity_metrics(metrics: dict[str, Any]) -> list[str]: 

185 """Format productivity metrics section.""" 

186 if not metrics: 

187 return [] 

188 

189 return [ 

190 "\n📈 Productivity Metrics:", 

191 f" • Focus time: {metrics.get('focus_time_minutes', 0):.1f} minutes", 

192 f" • Context switches: {metrics.get('context_switches', 0)}", 

193 f" • Deep work periods: {metrics.get('deep_work_periods', 0)}", 

194 ] 

195 

196 

197async def _get_activity_summary_operation(monitor: Any, hours: int) -> str: 

198 """Get activity summary for the specified number of hours.""" 

199 summary = await monitor.get_activity_summary(hours=hours) 

200 lines = [f"📊 Activity Summary - Last {hours} Hours", ""] 

201 

202 if not summary.get("has_data"): 

203 lines.extend( 

204 [ 

205 "🔍 No activity data available", 

206 "💡 Start monitoring with `start_app_monitoring`", 

207 ] 

208 ) 

209 return "\n".join(lines) 

210 

211 # Add all sections 

212 lines.extend(_format_file_activity(summary.get("file_activity", []))) 

213 lines.extend(_format_app_activity(summary.get("app_activity", []))) 

214 lines.extend(_format_productivity_metrics(summary.get("productivity_metrics", {}))) 

215 

216 return "\n".join(lines) 

217 

218 

219async def _get_activity_summary_impl(hours: int = 2) -> str: 

220 """Get activity summary for the specified number of hours.""" 

221 

222 async def operation_wrapper(monitor: Any) -> str: 

223 return await _get_activity_summary_operation(monitor, hours) 

224 

225 return await _execute_monitor_operation("Get activity summary", operation_wrapper) 

226 

227 

228# ============================================================================ 

229# Context Insights 

230# ============================================================================ 

231 

232 

233def _format_context_insights_output(insights: dict[str, Any], hours: int) -> list[str]: 

234 """Format context insights output.""" 

235 lines = [f"🧠 Context Insights - Last {hours} Hours", ""] 

236 

237 if not insights.get("has_data"): 

238 lines.append("🔍 No context data available") 

239 return lines 

240 

241 # Current focus area 

242 focus = insights.get("current_focus") 

243 if focus: 

244 lines.extend( 

245 ( 

246 f"🎯 Current Focus: {focus['area']}", 

247 f" Duration: {focus['duration_minutes']:.1f} minutes", 

248 ) 

249 ) 

250 

251 # Project patterns 

252 patterns = insights.get("project_patterns", []) 

253 if patterns: 

254 lines.append("\n📋 Project Patterns:") 

255 lines.extend([f"{pattern['description']}" for pattern in patterns[:3]]) 

256 

257 # Technology context 

258 tech_context = insights.get("technology_context", []) 

259 if tech_context: 

260 lines.append("\n⚙️ Technology Context:") 

261 lines.extend( 

262 [ 

263 f"{tech['name']}: {tech['confidence']:.0%} confidence" 

264 for tech in tech_context[:5] 

265 ] 

266 ) 

267 

268 # Recommendations 

269 recommendations = insights.get("recommendations", []) 

270 if recommendations: 

271 lines.append("\n💡 Recommendations:") 

272 lines.extend([f"{rec}" for rec in recommendations[:3]]) 

273 

274 return lines 

275 

276 

277async def _get_context_insights_operation(monitor: Any, hours: int) -> str: 

278 """Get contextual insights from recent activity.""" 

279 insights = await monitor.get_context_insights(hours=hours) 

280 lines = _format_context_insights_output(insights, hours) 

281 return "\n".join(lines) 

282 

283 

284async def _get_context_insights_impl(hours: int = 1) -> str: 

285 """Get contextual insights from recent activity.""" 

286 

287 async def operation_wrapper(monitor: Any) -> str: 

288 return await _get_context_insights_operation(monitor, hours) 

289 

290 return await _execute_monitor_operation("Get context insights", operation_wrapper) 

291 

292 

293async def _get_active_files_operation(monitor: Any, minutes: int) -> str: 

294 """Get list of actively edited files in recent minutes.""" 

295 files = await monitor.get_active_files(minutes=minutes) 

296 

297 lines = [f"📄 Active Files - Last {minutes} Minutes", ""] 

298 

299 if not files: 

300 lines.extend( 

301 [ 

302 "🔍 No active files in this period", 

303 "💡 Files will appear here when you edit them during monitoring", 

304 ] 

305 ) 

306 return "\n".join(lines) 

307 

308 lines.append(f"📝 Found {len(files)} active files:") 

309 for file_info in files[:20]: # Show top 20 

310 timestamp = file_info.get("last_modified", "Unknown") 

311 lines.extend( 

312 ( 

313 f"{file_info['path']}", 

314 f" Last modified: {timestamp}", 

315 f" Changes: {file_info.get('change_count', 0)}", 

316 ) 

317 ) 

318 

319 if len(files) > 20: 

320 lines.append(f"\n... and {len(files) - 20} more files") 

321 

322 return "\n".join(lines) 

323 

324 

325async def _get_active_files_impl(minutes: int = 60) -> str: 

326 """Get list of actively edited files in recent minutes.""" 

327 

328 async def operation_wrapper(monitor: Any) -> str: 

329 return await _get_active_files_operation(monitor, minutes) 

330 

331 return await _execute_monitor_operation("Get active files", operation_wrapper) 

332 

333 

334# ============================================================================ 

335# Interruption Management Tools 

336# ============================================================================ 

337 

338 

339async def _start_interruption_monitoring_operation( 

340 manager: Any, session_id: str, user_id: str 

341) -> str: 

342 """Start monitoring for interruptions and context switches.""" 

343 await manager.start_monitoring(session_id=session_id, user_id=user_id) 

344 

345 return "\n".join( 

346 [ 

347 "🔔 Interruption Monitoring Started", 

348 "", 

349 f"📝 Session ID: {session_id}", 

350 f"👤 User: {user_id}", 

351 "", 

352 "🎯 Now detecting:", 

353 " • System sleep/wake events", 

354 " • Network disconnections", 

355 " • Application crashes", 

356 " • Long periods of inactivity", 

357 "", 

358 "💡 Context will be automatically preserved on interruptions", 

359 "💡 Use `get_interruption_history` to view past events", 

360 ] 

361 ) 

362 

363 

364async def _start_interruption_monitoring_impl( 

365 session_id: str, user_id: str = "default_user" 

366) -> str: 

367 """Start monitoring for interruptions and context switches.""" 

368 

369 async def operation_wrapper(manager: Any) -> str: 

370 return await _start_interruption_monitoring_operation( 

371 manager, session_id, user_id 

372 ) 

373 

374 return await _execute_interruption_operation( 

375 "Start interruption monitoring", 

376 operation_wrapper, 

377 ) 

378 

379 

380async def _stop_interruption_monitoring_operation(manager: Any) -> str: 

381 """Stop interruption monitoring.""" 

382 summary = await manager.stop_monitoring() 

383 

384 return "\n".join( 

385 [ 

386 "⏹️ Interruption Monitoring Stopped", 

387 "", 

388 "📊 Session summary:", 

389 f" • Duration: {summary.get('duration_minutes', 0):.1f} minutes", 

390 f" • Interruptions detected: {summary.get('interruption_count', 0)}", 

391 f" • Contexts preserved: {summary.get('contexts_saved', 0)}", 

392 "", 

393 "✅ Monitoring stopped successfully", 

394 ] 

395 ) 

396 

397 

398async def _stop_interruption_monitoring_impl() -> str: 

399 """Stop interruption monitoring.""" 

400 return await _execute_interruption_operation( 

401 "Stop interruption monitoring", _stop_interruption_monitoring_operation 

402 ) 

403 

404 

405async def _create_session_context_operation( 

406 manager: Any, session_id: str, context_data: dict[str, Any] 

407) -> str: 

408 """Create a new session context snapshot.""" 

409 context_id = await manager.create_context_snapshot( 

410 session_id=session_id, context_data=context_data 

411 ) 

412 

413 return "\n".join( 

414 [ 

415 "📸 Session Context Created", 

416 "", 

417 f"🆔 Context ID: {context_id}", 

418 f"📝 Session: {session_id}", 

419 f"📦 Data items: {len(context_data)}", 

420 "", 

421 "✅ Context snapshot saved successfully", 

422 "💡 Use `restore_session_context` to restore this context", 

423 ] 

424 ) 

425 

426 

427async def _create_session_context_impl( 

428 session_id: str, context_data: dict[str, Any] 

429) -> str: 

430 """Create a new session context snapshot.""" 

431 

432 async def operation_wrapper(manager: Any) -> str: 

433 return await _create_session_context_operation( 

434 manager, session_id, context_data 

435 ) 

436 

437 return await _execute_interruption_operation( 

438 "Create session context", 

439 operation_wrapper, 

440 ) 

441 

442 

443async def _preserve_current_context_operation( 

444 manager: Any, session_id: str, reason: str 

445) -> str: 

446 """Preserve current development context before an interruption.""" 

447 context_snapshot = await manager.preserve_context( 

448 session_id=session_id, interruption_reason=reason 

449 ) 

450 

451 return "\n".join( 

452 [ 

453 "💾 Context Preserved", 

454 "", 

455 f"🆔 Snapshot ID: {context_snapshot['id']}", 

456 f"📝 Reason: {reason}", 

457 f"📦 Items preserved: {context_snapshot['item_count']}", 

458 "", 

459 "✅ Context saved successfully", 

460 "💡 Use `restore_session_context` to restore this context", 

461 ] 

462 ) 

463 

464 

465async def _preserve_current_context_impl( 

466 session_id: str, reason: str = "manual_checkpoint" 

467) -> str: 

468 """Preserve current development context before an interruption.""" 

469 

470 async def operation_wrapper(manager: Any) -> str: 

471 return await _preserve_current_context_operation(manager, session_id, reason) 

472 

473 return await _execute_interruption_operation( 

474 "Preserve current context", 

475 operation_wrapper, 

476 ) 

477 

478 

479async def _restore_session_context_operation(manager: Any, session_id: str) -> str: 

480 """Restore a previously saved session context.""" 

481 restored = await manager.restore_context(session_id=session_id) 

482 

483 if not restored.get("success"): 

484 return f"❌ Failed to restore context: {restored.get('error', 'Unknown error')}" 

485 

486 return "\n".join( 

487 [ 

488 "♻️ Context Restored", 

489 "", 

490 f"📝 Session ID: {session_id}", 

491 f"📦 Items restored: {restored['item_count']}", 

492 f"📅 Original timestamp: {restored['original_timestamp']}", 

493 "", 

494 "✅ Context restored successfully", 

495 "💡 Resume work from where you left off", 

496 ] 

497 ) 

498 

499 

500async def _restore_session_context_impl(session_id: str) -> str: 

501 """Restore a previously saved session context.""" 

502 

503 async def operation_wrapper(manager: Any) -> str: 

504 return await _restore_session_context_operation(manager, session_id) 

505 

506 return await _execute_interruption_operation( 

507 "Restore session context", 

508 operation_wrapper, 

509 ) 

510 

511 

512async def _get_interruption_history_operation( 

513 manager: Any, user_id: str, hours: int 

514) -> str: 

515 """Get history of interruptions for debugging and analysis.""" 

516 history = await manager.get_interruption_history(user_id=user_id, hours=hours) 

517 

518 lines = [f"📜 Interruption History - Last {hours} Hours", f"👤 User: {user_id}", ""] 

519 

520 if not history: 

521 lines.extend( 

522 [ 

523 "🔍 No interruptions recorded", 

524 "💡 Interruptions will appear here when detected during monitoring", 

525 ] 

526 ) 

527 return "\n".join(lines) 

528 

529 lines.append(f"⚠️ Found {len(history)} interruptions:") 

530 for event in history[:10]: # Show last 10 

531 lines.extend( 

532 [ 

533 f"\n📍 {event['timestamp']}", 

534 f" Type: {event['type']}", 

535 f" Reason: {event.get('reason', 'N/A')}", 

536 f" Recovery: {event.get('recovery_action', 'None')}", 

537 ] 

538 ) 

539 

540 if len(history) > 10: 

541 lines.append(f"\n... and {len(history) - 10} more events") 

542 

543 return "\n".join(lines) 

544 

545 

546async def _get_interruption_history_impl(user_id: str, hours: int = 24) -> str: 

547 """Get history of interruptions for debugging and analysis.""" 

548 

549 async def operation_wrapper(manager: Any) -> str: 

550 return await _get_interruption_history_operation(manager, user_id, hours) 

551 

552 return await _execute_interruption_operation( 

553 "Get interruption history", 

554 operation_wrapper, 

555 ) 

556 

557 

558# ============================================================================ 

559# MCP Tool Registration 

560# ============================================================================ 

561 

562 

563def register_monitoring_tools(mcp: FastMCP) -> None: 

564 """Register all monitoring and interruption management tools.""" 

565 

566 @mcp.tool() # type: ignore[misc] 

567 async def start_app_monitoring(project_paths: list[str] | None = None) -> str: 

568 """Start monitoring IDE activity and browser documentation usage.""" 

569 return await _start_app_monitoring_impl(project_paths) 

570 

571 @mcp.tool() # type: ignore[misc] 

572 async def stop_app_monitoring() -> str: 

573 """Stop all application monitoring.""" 

574 return await _stop_app_monitoring_impl() 

575 

576 @mcp.tool() # type: ignore[misc] 

577 async def get_activity_summary(hours: int = 2) -> str: 

578 """Get activity summary for the specified number of hours.""" 

579 return await _get_activity_summary_impl(hours) 

580 

581 @mcp.tool() # type: ignore[misc] 

582 async def get_context_insights(hours: int = 1) -> str: 

583 """Get contextual insights from recent activity.""" 

584 return await _get_context_insights_impl(hours) 

585 

586 @mcp.tool() # type: ignore[misc] 

587 async def get_active_files(minutes: int = 60) -> str: 

588 """Get list of actively edited files in recent minutes.""" 

589 return await _get_active_files_impl(minutes) 

590 

591 @mcp.tool() # type: ignore[misc] 

592 async def start_interruption_monitoring( 

593 session_id: str, user_id: str = "default_user" 

594 ) -> str: 

595 """Start monitoring for interruptions and context switches.""" 

596 return await _start_interruption_monitoring_impl(session_id, user_id) 

597 

598 @mcp.tool() # type: ignore[misc] 

599 async def stop_interruption_monitoring() -> str: 

600 """Stop interruption monitoring.""" 

601 return await _stop_interruption_monitoring_impl() 

602 

603 @mcp.tool() # type: ignore[misc] 

604 async def create_session_context( 

605 session_id: str, context_data: dict[str, Any] 

606 ) -> str: 

607 """Create a new session context snapshot.""" 

608 return await _create_session_context_impl(session_id, context_data) 

609 

610 @mcp.tool() # type: ignore[misc] 

611 async def preserve_current_context( 

612 session_id: str, reason: str = "manual_checkpoint" 

613 ) -> str: 

614 """Preserve current development context before an interruption.""" 

615 return await _preserve_current_context_impl(session_id, reason) 

616 

617 @mcp.tool() # type: ignore[misc] 

618 async def restore_session_context(session_id: str) -> str: 

619 """Restore a previously saved session context.""" 

620 return await _restore_session_context_impl(session_id) 

621 

622 @mcp.tool() # type: ignore[misc] 

623 async def get_interruption_history(user_id: str, hours: int = 24) -> str: 

624 """Get history of interruptions for debugging and analysis.""" 

625 return await _get_interruption_history_impl(user_id, hours)