Coverage for mcp_bridge/tools/session_manager.py: 5%
155 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2Session Manager Tools
4Tools for navigating and searching Claude Code session history.
5Sessions are stored in ~/.claude/projects/ as JSONL files.
6"""
8import json
9from datetime import datetime
10from pathlib import Path
def get_sessions_directory() -> Path:
    """Return the directory under the user's home that holds Claude sessions."""
    home = Path.home()
    return home.joinpath(".claude", "projects")
def list_sessions(
    project_path: str | None = None,
    limit: int = 20,
    from_date: str | None = None,
    to_date: str | None = None,
) -> str:
    """
    List Claude Code sessions with optional filtering.

    A session is any ``*.jsonl`` file found directly inside a sub-directory of
    the sessions directory. Each session is dated by its file modification
    time (local time).

    Args:
        project_path: Accepted for interface compatibility.
            NOTE: not applied yet - project directories are not named by
            their plain path, so a path-to-directory mapping would be needed
            before this filter can work.
        limit: Maximum sessions to return (values below zero act as zero).
        from_date: Keep sessions modified at or after this ISO-format
            date/time. Empty or None disables the filter.
        to_date: Keep sessions modified at or before this ISO-format
            date/time. A date without a time means midnight at the start of
            that day. Empty or None disables the filter.

    Returns:
        Formatted list of sessions, newest first, or a short message when
        the directory is missing, a date filter is malformed, or nothing
        matches.
    """
    sessions_dir = get_sessions_directory()
    if not sessions_dir.exists():
        return "No sessions directory found"

    def _parse_bound(value: str | None) -> datetime | None:
        # File mtimes below are naive local datetimes; normalise aware inputs
        # to naive local time so the comparison cannot raise TypeError.
        if not value:
            return None
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    # Parse the filters once, up front, so a malformed value is reported to
    # the caller instead of silently excluding every session.
    try:
        from_dt = _parse_bound(from_date)
        to_dt = _parse_bound(to_date)
    except ValueError as exc:
        return f"Invalid date filter: {exc}"

    sessions = []

    # Walk through project directories
    for project_dir in sessions_dir.iterdir():
        if not project_dir.is_dir():
            continue

        # Find session files
        for session_file in project_dir.glob("*.jsonl"):
            try:
                stat = session_file.stat()
                mtime = datetime.fromtimestamp(stat.st_mtime)
            except (OSError, OverflowError, ValueError):
                # File vanished, is unreadable, or has an unusable timestamp.
                continue

            # Date filters
            if from_dt is not None and mtime < from_dt:
                continue
            if to_dt is not None and mtime > to_dt:
                continue

            sessions.append({
                "id": session_file.stem,
                "path": str(session_file),
                "project": project_dir.name,
                "modified": mtime.isoformat(),
                "size": stat.st_size,
            })

    # Sort by modified time, newest first
    sessions.sort(key=lambda s: s["modified"], reverse=True)
    # Clamp so a negative limit cannot slice from the tail of the list.
    sessions = sessions[:max(limit, 0)]

    if not sessions:
        return "No sessions found"

    lines = [f"Found {len(sessions)} sessions:\n"]
    lines.extend(f" {s['id'][:12]}... ({s['modified'][:10]})" for s in sessions)

    return "\n".join(lines)
def read_session(
    session_id: str,
    limit: int | None = None,
    include_metadata: bool = False,
) -> str:
    """
    Read messages from a session.

    The session file is looked up in every project directory. An exact
    ``<session_id>.jsonl`` match always wins; otherwise the first file whose
    name starts with ``session_id`` is used.

    Args:
        session_id: Session ID (filename stem) or a prefix of it
        limit: Maximum messages to read; None or a non-positive value means
            no limit
        include_metadata: Include message metadata (currently has no effect
            on the output)

    Returns:
        Formatted session content (at most 50 messages are displayed, each
        truncated to 200 characters), or a short message when the session
        is missing, unreadable or empty.
    """
    max_displayed = 50
    max_content_chars = 200

    sessions_dir = get_sessions_directory()
    # iterdir() raises on a missing directory, so guard first.
    if not sessions_dir.is_dir():
        return f"Session not found: {session_id}"

    # Find session file
    session_file = None
    prefix_match = None
    for project_dir in sessions_dir.iterdir():
        if not project_dir.is_dir():
            continue
        candidate = project_dir / f"{session_id}.jsonl"
        if candidate.exists():
            session_file = candidate
            break
        # Remember only the first partial match; keep scanning in case a
        # later directory holds the exact file.
        if prefix_match is None:
            prefix_match = next(project_dir.glob(f"{session_id}*.jsonl"), None)

    if session_file is None:
        session_file = prefix_match
    if session_file is None:
        return f"Session not found: {session_id}"

    messages = []
    try:
        # Undecodable bytes are replaced rather than failing the whole read.
        with open(session_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    msg = json.loads(line)
                except (ValueError, RecursionError):
                    continue
                # Only JSON objects can carry role/content; skip other values.
                if isinstance(msg, dict):
                    messages.append(msg)
    except OSError as e:
        return f"Error reading session: {e}"

    if limit and limit > 0:
        messages = messages[:limit]

    if not messages:
        return "Session is empty"

    lines = [f"Session: {session_id}\nMessages: {len(messages)}\n"]

    for i, msg in enumerate(messages[:max_displayed], start=1):  # Limit display
        role = msg.get("role", "unknown")
        content = msg.get("content", "")
        if isinstance(content, list):
            content = " ".join(
                str(c.get("text", "")) for c in content if isinstance(c, dict)
            )
        elif not isinstance(content, str):
            # Non-text payloads (None, numbers, objects) must not break len().
            content = "" if content is None else str(content)
        if len(content) > max_content_chars:
            content = content[:max_content_chars] + "..."
        lines.append(f"[{i}] {role}: {content}")

    if len(messages) > max_displayed:
        lines.append(f"\n... and {len(messages) - max_displayed} more messages")

    return "\n".join(lines)
def search_sessions(
    query: str,
    session_id: str | None = None,
    case_sensitive: bool = False,
    limit: int = 20,
) -> str:
    """
    Search across session messages.

    The query is matched as a plain substring against each raw JSONL line;
    matching lines are then parsed to extract the role and a content snippet.
    At most 50 session files are searched.

    Args:
        query: Search query
        session_id: Search only sessions whose filename starts with this ID
        case_sensitive: Case-sensitive search
        limit: Maximum results; a non-positive value yields no results

    Returns:
        Search results with context, or a "No results" message.
    """
    max_files = 50
    snippet_chars = 150
    no_results = f"No results for: {query}"

    sessions_dir = get_sessions_directory()
    # iterdir() raises on a missing directory, so guard first.
    if limit <= 0 or not sessions_dir.is_dir():
        return no_results

    needle = query if case_sensitive else query.lower()

    # Find session files to search
    pattern = f"{session_id}*.jsonl" if session_id else "*.jsonl"
    session_files = []
    for project_dir in sessions_dir.iterdir():
        if project_dir.is_dir():
            session_files.extend(project_dir.glob(pattern))

    results = []
    for session_file in session_files[:max_files]:  # Limit sessions to search
        try:
            # Undecodable bytes are replaced rather than skipping the file.
            with open(session_file, encoding="utf-8", errors="replace") as f:
                for line_num, line in enumerate(f):
                    if not line.strip():
                        continue

                    haystack = line if case_sensitive else line.lower()
                    if needle not in haystack:
                        continue

                    try:
                        msg = json.loads(line)
                    except (ValueError, RecursionError):
                        continue
                    # A single odd record must not abandon the rest of the
                    # file, so skip anything that is not a JSON object.
                    if not isinstance(msg, dict):
                        continue

                    content = msg.get("content", "")
                    if isinstance(content, list):
                        content = " ".join(
                            str(c.get("text", "")) for c in content if isinstance(c, dict)
                        )
                    elif not isinstance(content, str):
                        content = "" if content is None else str(content)

                    results.append({
                        "session": session_file.stem[:12],
                        "line": line_num,
                        "role": msg.get("role", "unknown"),
                        "snippet": content[:snippet_chars],
                    })

                    if len(results) >= limit:
                        break
        except OSError:
            # Unreadable file: move on to the next session.
            continue

        if len(results) >= limit:
            break

    if not results:
        return no_results

    lines = [f"Found {len(results)} matches for '{query}':\n"]
    lines.extend(f" [{r['session']}] {r['role']}: {r['snippet']}..." for r in results)

    return "\n".join(lines)
def get_session_info(session_id: str) -> str:
    """
    Get metadata about a session.

    The session file is looked up in every project directory. An exact
    ``<session_id>.jsonl`` match always wins; otherwise the first file whose
    name starts with ``session_id`` is used.

    Args:
        session_id: Session ID (filename stem) or a prefix of it

    Returns:
        Session metadata and statistics (file path, size in KB, modification
        time, and message counts by role), or a short message when the
        session is missing or cannot be read.
    """
    not_found = f"Session not found: {session_id}"

    sessions_dir = get_sessions_directory()
    # iterdir() raises on a missing directory, so guard first.
    if not sessions_dir.is_dir():
        return not_found

    # Find session file
    session_file = None
    prefix_match = None
    for project_dir in sessions_dir.iterdir():
        if not project_dir.is_dir():
            continue
        candidate = project_dir / f"{session_id}.jsonl"
        if candidate.exists():
            session_file = candidate
            break
        if prefix_match is None:
            prefix_match = next(project_dir.glob(f"{session_id}*.jsonl"), None)

    if session_file is None:
        session_file = prefix_match
    if session_file is None:
        return not_found

    try:
        stat = session_file.stat()
        message_count = 0
        user_count = 0
        assistant_count = 0

        # Undecodable bytes are replaced rather than failing the whole report.
        with open(session_file, encoding="utf-8", errors="replace") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    msg = json.loads(line)
                except (ValueError, RecursionError):
                    continue
                message_count += 1
                # Records that are not JSON objects count as messages but
                # have no role to tally.
                role = msg.get("role", "") if isinstance(msg, dict) else ""
                if role == "user":
                    user_count += 1
                elif role == "assistant":
                    assistant_count += 1

        modified = datetime.fromtimestamp(stat.st_mtime).isoformat()
    except (OSError, OverflowError, ValueError) as e:
        return f"Error: {e}"

    lines = [
        f"Session: {session_id}",
        f"File: {session_file}",
        f"Size: {stat.st_size / 1024:.1f} KB",
        f"Modified: {modified}",
        f"Messages: {message_count}",
        f" User: {user_count}",
        f" Assistant: {assistant_count}",
    ]
    return "\n".join(lines)