Coverage for session_buddy / tools / conscious_agent_tools.py: 34.21%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2from __future__ import annotations 

3 

4import typing as t 

5from typing import TYPE_CHECKING 

6 

7from session_buddy.config.feature_flags import get_feature_flags 

8from session_buddy.memory.conscious_agent import ConsciousAgent 

9from session_buddy.reflection_tools import get_reflection_database 

10 

11if TYPE_CHECKING: 

12 from fastmcp import FastMCP 

13 

14_agent: ConsciousAgent | None = None 

15 

16 

17def register_conscious_agent_tools(mcp: FastMCP) -> None: 

18 @mcp.tool() # type: ignore[no-untyped-call] 

19 async def start_conscious_agent(interval_hours: int = 6) -> dict[str, t.Any]: 

20 """Start background Conscious Agent if enabled by flags.""" 

21 flags = get_feature_flags() 

22 if not flags.enable_conscious_agent: 

23 return {"status": "disabled"} 

24 global _agent 

25 if _agent is None: 

26 db = await get_reflection_database() 

27 _agent = ConsciousAgent(db, analysis_interval_hours=interval_hours) 

28 await _agent.start() 

29 return {"status": "started", "interval_hours": interval_hours} 

30 

31 @mcp.tool() # type: ignore[no-untyped-call] 

32 async def stop_conscious_agent() -> dict[str, t.Any]: 

33 """Stop background Conscious Agent if running.""" 

34 global _agent 

35 if _agent is None: 

36 return {"status": "not_running"} 

37 await _agent.stop() 

38 _agent = None 

39 return {"status": "stopped"} 

40 

41 @mcp.tool() # type: ignore[no-untyped-call] 

42 async def force_conscious_analysis() -> dict[str, t.Any]: 

43 """Force a one-time analysis run.""" 

44 global _agent 

45 if _agent is None: 

46 db = await get_reflection_database() 

47 _agent = ConsciousAgent(db) 

48 return await _agent.force_analysis()