Coverage for mcp_bridge/tools/session_manager.py: 5%

155 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2026-01-10 00:20 -0500

1""" 

2Session Manager Tools 

3 

4Tools for navigating and searching Claude Code session history. 

5Sessions are stored in ~/.claude/projects/ as JSONL files. 

6""" 

7 

8import json 

9from datetime import datetime 

10from pathlib import Path 

11 

12 

def get_sessions_directory() -> Path:
    """Return the folder under the user's home where Claude sessions live.

    The location is ``<home>/.claude/projects``; the path is only computed
    here, it is neither created nor checked for existence.
    """
    home = Path.home()
    return home.joinpath(".claude", "projects")

16 

17 

def list_sessions(
    project_path: str | None = None,
    limit: int = 20,
    from_date: str | None = None,
    to_date: str | None = None,
) -> str:
    """
    List Claude Code sessions, newest first, with optional date filtering.

    Every ``*.jsonl`` file found directly inside a sub-directory of the
    sessions directory counts as one session. The file's modification time
    is used both for the date filters and for ordering.

    Args:
        project_path: Accepted for interface compatibility but currently
            ignored. The original author's note says project directories
            are hashed, so a path-to-directory mapping would be needed
            before this filter can be applied.
        limit: Maximum sessions to return. ``None`` returns all of them;
            values below 1 return none.
        from_date: Keep sessions modified at or after this ISO date/time.
        to_date: Keep sessions modified at or before this ISO date/time.
            A date without a time component parses as midnight, so
            sessions modified later that same day are excluded.

    Returns:
        Formatted list of sessions, or a short message when the sessions
        directory is missing, a date filter cannot be parsed, or nothing
        matches.
    """
    sessions_dir = get_sessions_directory()
    if not sessions_dir.exists():
        return "No sessions directory found"

    def _parse_bound(raw: str) -> datetime:
        """Parse an ISO date string into a naive local datetime."""
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is not None:
            # File mtimes below are naive local datetimes; comparing them
            # with an aware value would raise TypeError.
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    # Parse the bounds once, up front, so a malformed date is reported to
    # the caller instead of silently filtering out every session.
    try:
        from_dt = _parse_bound(from_date) if from_date else None
        to_dt = _parse_bound(to_date) if to_date else None
    except (TypeError, ValueError) as exc:
        return f"Invalid date filter: {exc}"

    sessions = []

    # Walk through project directories
    for project_dir in sessions_dir.iterdir():
        if not project_dir.is_dir():
            continue

        for session_file in project_dir.glob("*.jsonl"):
            try:
                stat = session_file.stat()
                mtime = datetime.fromtimestamp(stat.st_mtime)
            except (OSError, OverflowError, ValueError):
                # Best effort: skip files that vanish, are unreadable, or
                # carry a timestamp the platform cannot represent.
                continue

            if from_dt is not None and mtime < from_dt:
                continue
            if to_dt is not None and mtime > to_dt:
                continue

            sessions.append({
                "id": session_file.stem,
                "path": str(session_file),
                "project": project_dir.name,
                "modified": mtime.isoformat(),
                "mtime": mtime,
                "size": stat.st_size,
            })

    # Sort by modified time, newest first
    sessions.sort(key=lambda s: s["mtime"], reverse=True)
    if limit is not None:
        # Clamp so a negative limit means "nothing" rather than slicing
        # from the tail of the list.
        sessions = sessions[:max(limit, 0)]

    if not sessions:
        return "No sessions found"

    lines = [f"Found {len(sessions)} sessions:\n"]
    lines.extend(
        f" {s['id'][:12]}... ({s['modified'][:10]})" for s in sessions
    )

    return "\n".join(lines)

91 

92 

def read_session(
    session_id: str,
    limit: int | None = None,
    include_metadata: bool = False,
) -> str:
    """
    Read messages from a session.

    The session file is looked up in every project directory below the
    sessions directory. A file whose stem equals ``session_id`` is
    preferred; otherwise the first file (in sorted order) whose stem
    starts with ``session_id`` is used.

    Args:
        session_id: Session ID (filename stem) or a prefix of it. It is
            compared against file stems only, never used to build a path
            or glob pattern, so separators and wildcards match nothing.
        limit: Maximum messages to read; ``None`` or a value below 1 reads
            the whole file.
        include_metadata: Accepted for interface compatibility; it
            currently has no effect on the output.

    Returns:
        Formatted session content (at most 50 messages are displayed, each
        truncated to 200 characters), or a short message when the session
        cannot be found, cannot be read, or is empty.
    """
    sessions_dir = get_sessions_directory()
    if not session_id or not sessions_dir.is_dir():
        return f"Session not found: {session_id}"

    # Find session file: exact stem match wins, otherwise first prefix match.
    exact_match = None
    prefix_match = None
    for project_dir in sorted(sessions_dir.iterdir()):
        if not project_dir.is_dir():
            continue
        for candidate in sorted(project_dir.glob("*.jsonl")):
            if candidate.stem == session_id:
                exact_match = candidate
                break
            if prefix_match is None and candidate.stem.startswith(session_id):
                prefix_match = candidate
        if exact_match is not None:
            break

    session_file = exact_match or prefix_match
    if session_file is None:
        return f"Session not found: {session_id}"

    wanted = limit if limit and limit > 0 else None

    messages = []
    try:
        # Read explicitly as UTF-8 rather than the platform default, and
        # tolerate stray bytes instead of failing the whole file.
        with open(session_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    msg = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(msg, dict):
                    # Only JSON objects can carry role/content fields.
                    continue
                messages.append(msg)
                if wanted is not None and len(messages) >= wanted:
                    # Stop early instead of parsing the rest of the file.
                    break
    except OSError as e:
        return f"Error reading session: {e}"

    if not messages:
        return "Session is empty"

    lines = [f"Session: {session_id}\nMessages: {len(messages)}\n"]

    # NOTE(review): assumes "role" and "content" are top-level keys of each
    # record - confirm against the actual transcript schema.
    for index, msg in enumerate(messages[:50], start=1):  # Limit display
        role = msg.get("role", "unknown")
        content = msg.get("content", "")
        if isinstance(content, list):
            content = " ".join(
                str(c.get("text", "")) for c in content if isinstance(c, dict)
            )
        elif not isinstance(content, str):
            # Guard against null / numeric / object content values.
            content = "" if content is None else str(content)
        if len(content) > 200:
            content = content[:200] + "..."
        lines.append(f"[{index}] {role}: {content}")

    if len(messages) > 50:
        lines.append(f"\n... and {len(messages) - 50} more messages")

    return "\n".join(lines)

161 

162 

def search_sessions(
    query: str,
    session_id: str | None = None,
    case_sensitive: bool = False,
    limit: int = 20,
) -> str:
    """
    Search across session messages.

    Matching is done on the raw text of each JSONL line, so a hit may come
    from any field of the record, not only from its content. Matching lines
    are then parsed to build the snippet. At most the 50 most recently
    modified session files are searched.

    Args:
        query: Search query (plain substring, not a pattern).
        session_id: Search only in sessions whose file stem starts with
            this value. It is compared against file stems only, so
            separators and wildcards match nothing.
        case_sensitive: Case-sensitive search
        limit: Maximum results; values below 1 return no results.

    Returns:
        Search results with a snippet of up to 150 characters each, or a
        "No results" message.
    """
    sessions_dir = get_sessions_directory()
    if limit <= 0 or not sessions_dir.is_dir():
        return f"No results for: {query}"

    search_query = query if case_sensitive else query.lower()

    # Find session files to search
    session_files = []
    for project_dir in sessions_dir.iterdir():
        if not project_dir.is_dir():
            continue
        for candidate in project_dir.glob("*.jsonl"):
            if session_id and not candidate.stem.startswith(session_id):
                continue
            session_files.append(candidate)

    def _mtime(path: Path) -> float:
        """Return the file's mtime, or 0.0 when it cannot be stat'ed."""
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    # Newest first, so the cap below keeps the most recent sessions rather
    # than whatever order the filesystem happened to return.
    session_files.sort(key=_mtime, reverse=True)

    results = []
    for session_file in session_files[:50]:  # Limit sessions to search
        try:
            with open(session_file, encoding="utf-8", errors="replace") as f:
                for line in f:
                    if not line.strip():
                        continue

                    check_line = line if case_sensitive else line.lower()
                    if search_query not in check_line:
                        continue

                    try:
                        msg = json.loads(line)
                    except ValueError:
                        continue
                    if not isinstance(msg, dict):
                        # Skip just this record instead of abandoning the
                        # remainder of the file.
                        continue

                    content = msg.get("content", "")
                    if isinstance(content, list):
                        content = " ".join(
                            str(c.get("text", ""))
                            for c in content
                            if isinstance(c, dict)
                        )
                    elif not isinstance(content, str):
                        content = "" if content is None else str(content)

                    results.append({
                        "session": session_file.stem[:12],
                        "role": msg.get("role", "unknown"),
                        "snippet": content[:150],
                    })

                    if len(results) >= limit:
                        break
        except OSError:
            # Best effort: an unreadable file must not abort the search.
            continue

        if len(results) >= limit:
            break

    if not results:
        return f"No results for: {query}"

    lines = [f"Found {len(results)} matches for '{query}':\n"]
    lines.extend(
        f" [{r['session']}] {r['role']}: {r['snippet']}..." for r in results
    )

    return "\n".join(lines)

240 

241 

def get_session_info(session_id: str) -> str:
    """
    Get metadata about a session.

    The session file is looked up in every project directory below the
    sessions directory. A file whose stem equals ``session_id`` is
    preferred; otherwise the first file (in sorted order) whose stem
    starts with ``session_id`` is used.

    Args:
        session_id: Session ID (filename stem) or a prefix of it. It is
            compared against file stems only, so separators and wildcards
            match nothing.

    Returns:
        Session metadata and statistics (file path, size, modification
        time, and message counts by role), or a short message when the
        session cannot be found or read.
    """
    sessions_dir = get_sessions_directory()
    if not session_id or not sessions_dir.is_dir():
        return f"Session not found: {session_id}"

    # Find session file: exact stem match wins, otherwise first prefix match.
    exact_match = None
    prefix_match = None
    for project_dir in sorted(sessions_dir.iterdir()):
        if not project_dir.is_dir():
            continue
        for candidate in sorted(project_dir.glob("*.jsonl")):
            if candidate.stem == session_id:
                exact_match = candidate
                break
            if prefix_match is None and candidate.stem.startswith(session_id):
                prefix_match = candidate
        if exact_match is not None:
            break

    session_file = exact_match or prefix_match
    if session_file is None:
        return f"Session not found: {session_id}"

    try:
        stat = session_file.stat()
        message_count = 0
        user_count = 0
        assistant_count = 0

        # Read explicitly as UTF-8 rather than the platform default.
        with open(session_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    msg = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(msg, dict):
                    # A stray non-object line must not fail the whole
                    # report; it simply is not counted as a message.
                    continue
                message_count += 1
                role = msg.get("role", "")
                if role == "user":
                    user_count += 1
                elif role == "assistant":
                    assistant_count += 1

        lines = [
            f"Session: {session_id}",
            f"File: {session_file}",
            f"Size: {stat.st_size / 1024:.1f} KB",
            f"Modified: {datetime.fromtimestamp(stat.st_mtime).isoformat()}",
            f"Messages: {message_count}",
            f" User: {user_count}",
            f" Assistant: {assistant_count}",
        ]
        return "\n".join(lines)

    except (OSError, OverflowError, ValueError) as e:
        return f"Error: {e}"

300 return f"Error: {e}"