Coverage for session_buddy / tools / access_log_tools.py: 86.30%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""MCP tools for memory access log statistics.""" 

3 

4from __future__ import annotations 

5 

6from datetime import datetime, timedelta 

7from typing import TYPE_CHECKING, Any 

8 

9if TYPE_CHECKING: 

10 from fastmcp import FastMCP 

11 

12 

13def register_access_log_tools(mcp: FastMCP) -> None: 

14 @mcp.tool() # type: ignore[no-untyped-call] 

15 async def access_log_stats( 

16 hours: int = 24, 

17 top_n: int = 10, 

18 project: str | None = None, 

19 namespace: str | None = None, 

20 ) -> dict[str, Any]: 

21 """Return access statistics from memory_access_log. 

22 

23 Filters by time window and optional project/namespace. 

24 """ 

25 try: 

26 import duckdb 

27 from session_buddy.settings import get_database_path 

28 

29 db_path = get_database_path() 

30 cutoff = datetime.now() - timedelta(hours=hours) 

31 

32 with duckdb.connect( 

33 db_path, config={"allow_unsigned_extensions": True} 

34 ) as conn: 

35 query_config = _build_query_config(cutoff, project, namespace) 

36 

37 total_accesses = _get_total_accesses(conn, query_config) 

38 distinct_memories = _get_distinct_memories(conn, query_config) 

39 by_type = _get_access_type_stats(conn, query_config) 

40 by_provider = _get_provider_stats(by_type) 

41 top_memories = _get_top_memories(conn, query_config, top_n) 

42 recent = _get_recent_accesses(conn, query_config, top_n) 

43 

44 return { 

45 "window_hours": hours, 

46 "total_accesses": total_accesses, 

47 "distinct_memories": distinct_memories, 

48 "by_type": by_type, 

49 "by_provider": by_provider, 

50 "top_memories": top_memories, 

51 "recent": recent, 

52 "filters": {"project": project, "namespace": namespace}, 

53 } 

54 except Exception as e: 

55 return { 

56 "error": f"Access log stats unavailable: {e}", 

57 "hint": "Ensure schema_v2 is enabled and memory_access_log exists.", 

58 } 

59 

60 

61def _build_query_config( 

62 cutoff: datetime, project: str | None, namespace: str | None 

63) -> dict[str, Any]: 

64 """Build configuration for queries.""" 

65 where = "l.timestamp >= ?" 

66 params: list[Any] = [cutoff] 

67 if project or namespace: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true

68 where += " AND c.id = l.memory_id" 

69 if project: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true

70 where += " AND c.project = ?" 

71 params.append(project) 

72 if namespace: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 where += " AND c.namespace = ?" 

74 params.append(namespace) 

75 

76 join_clause = ( 

77 "JOIN conversations_v2 c ON c.id=l.memory_id" if (project or namespace) else "" 

78 ) 

79 

80 return {"where": where, "params": params, "join_clause": join_clause} 

81 

82 

83def _get_total_accesses(conn: Any, config: dict[str, Any]) -> int: 

84 """Get total access count.""" 

85 total_sql = f"SELECT COUNT(*) FROM memory_access_log l {config['join_clause']} WHERE {config['where']}" 

86 total_result = conn.execute(total_sql, config["params"]).fetchone() 

87 return int(total_result[0]) if total_result else 0 

88 

89 

90def _get_distinct_memories(conn: Any, config: dict[str, Any]) -> int: 

91 """Get distinct memories count.""" 

92 distinct_sql = f"SELECT COUNT(DISTINCT l.memory_id) FROM memory_access_log l {config['join_clause']} WHERE {config['where']}" 

93 distinct_result = conn.execute(distinct_sql, config["params"]).fetchone() 

94 return int(distinct_result[0]) if distinct_result else 0 

95 

96 

97def _get_access_type_stats(conn: Any, config: dict[str, Any]) -> dict[str, int]: 

98 """Get access type statistics.""" 

99 by_type_sql = f"SELECT l.access_type, COUNT(*) FROM memory_access_log l {config['join_clause']} WHERE {config['where']} GROUP BY l.access_type" 

100 by_type_rows = conn.execute(by_type_sql, config["params"]).fetchall() 

101 return {str(r[0] or ""): int(r[1]) for r in by_type_rows} 

102 

103 

104def _get_provider_stats(by_type: dict[str, int]) -> dict[str, int]: 

105 """Parse provider usage from access type.""" 

106 by_provider: dict[str, int] = {} 

107 for k, v in by_type.items(): 

108 if k.startswith("extract:"): 

109 prov = k.split(":", 1)[1] or "unknown" 

110 by_provider[prov] = by_provider.get(prov, 0) + v 

111 return by_provider 

112 

113 

114def _get_top_memories( 

115 conn: Any, config: dict[str, Any], top_n: int 

116) -> list[dict[str, Any]]: 

117 """Get top accessed memories.""" 

118 top_sql = f""" 

119 SELECT l.memory_id, 

120 COUNT(*) AS cnt, 

121 MAX(l.timestamp) AS last_access, 

122 c.category, 

123 c.memory_tier, 

124 c.importance_score, 

125 c.project, 

126 c.namespace 

127 FROM memory_access_log l 

128 JOIN conversations_v2 c ON c.id = l.memory_id 

129 WHERE {config["where"]} 

130 GROUP BY l.memory_id, c.category, c.memory_tier, c.importance_score, c.project, c.namespace 

131 ORDER BY cnt DESC, last_access DESC 

132 LIMIT ? 

133 """ 

134 top_params = [*config["params"], top_n] 

135 top_rows = conn.execute(top_sql, top_params).fetchall() 

136 return [ 

137 { 

138 "memory_id": str(r[0]), 

139 "count": int(r[1]), 

140 "last_access": str(r[2]), 

141 "category": str(r[3]) if r[3] is not None else None, 

142 "memory_tier": str(r[4]) if r[4] is not None else None, 

143 "importance_score": float(r[5]) if r[5] is not None else None, 

144 "project": str(r[6]) if r[6] is not None else None, 

145 "namespace": str(r[7]) if r[7] is not None else None, 

146 } 

147 for r in top_rows 

148 ] 

149 

150 

151def _get_recent_accesses( 

152 conn: Any, config: dict[str, Any], top_n: int 

153) -> list[dict[str, Any]]: 

154 """Get recent access samples.""" 

155 recent_sql = f""" 

156 SELECT l.memory_id, l.access_type, l.timestamp 

157 FROM memory_access_log l {config["join_clause"]} 

158 WHERE {config["where"]} 

159 ORDER BY l.timestamp DESC 

160 LIMIT ? 

161 """ 

162 recent_params = [*config["params"], top_n] 

163 recent_rows = conn.execute(recent_sql, recent_params).fetchall() 

164 return [ 

165 { 

166 "memory_id": str(r[0]), 

167 "access_type": str(r[1]) if r[1] is not None else None, 

168 "timestamp": str(r[2]), 

169 } 

170 for r in recent_rows 

171 ]