Coverage for little_loops / cli / logs.py: 87%
236 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-logs: Discover and extract ll-relevant JSONL entries from ~/.claude/projects/."""
3from __future__ import annotations
5import argparse
6import json
7import re
8import shutil
9import sys
10import time
11from pathlib import Path
13from little_loops.cli.loop.info import ( # private symbol: cross-module coupling; verify signature on upgrade
14 _format_history_event,
15)
16from little_loops.cli.output import configure_output, use_color_enabled
17from little_loops.config import BRConfig
18from little_loops.logger import Logger
19from little_loops.user_messages import get_project_folder
_COMMAND_NAME_RE = re.compile(r"<command-name>/ll:")
# Matches an ll- tool name (e.g. "ll-history") anywhere in a shell command line.
_LL_TOOL_RE = re.compile(r"\bll-\w+")


def _is_ll_relevant(record: dict) -> bool:
    """Return True if a JSONL record indicates ll activity.

    Detects three signal types:
    (a) queue-operation enqueue records whose content starts with /ll:
    (b) user records with a <command-name>/ll: pattern in the message content
    (c) assistant records with a Bash tool-use whose command names an ll- tool

    Malformed input (a non-dict record or message, a non-dict tool input, a
    non-string command) is treated as "not relevant" rather than raising, so
    one odd record cannot abort a scan of a whole log file.

    Args:
        record: One decoded JSONL record.

    Returns:
        True when the record matches any signal above, otherwise False.
    """
    if not isinstance(record, dict):
        return False

    record_type = record.get("type")

    # (a) queue-operation: only enqueue records with /ll: content signal ll activity
    if record_type == "queue-operation":
        queued = record.get("content")
        return (
            record.get("operation") == "enqueue"
            and isinstance(queued, str)
            and queued.startswith("/ll:")
        )

    if record_type not in ("user", "assistant"):
        return False

    # Both remaining signals live under message.content.
    message = record.get("message", {})
    if not isinstance(message, dict):
        return False
    content = message.get("content")

    # (b) user records: content is either a plain string or a list of blocks
    if record_type == "user":
        if isinstance(content, str):
            return bool(_COMMAND_NAME_RE.search(content))
        if isinstance(content, list):
            for block in content:
                if not isinstance(block, dict):
                    continue
                text = block.get("text", "")
                if isinstance(text, str) and _COMMAND_NAME_RE.search(text):
                    return True
        return False

    # (c) assistant records: look for a Bash tool-use invoking an ll- command
    if isinstance(content, list):
        for block in content:
            if not isinstance(block, dict):
                continue
            if block.get("type") != "tool_use" or block.get("name") != "Bash":
                continue
            tool_input = block.get("input")
            command = tool_input.get("command") if isinstance(tool_input, dict) else None
            if isinstance(command, str) and _LL_TOOL_RE.search(command):
                return True

    return False
def _has_ll_activity(project_folder: Path) -> bool:
    """Return True if any non-agent JSONL file in project_folder has ll activity.

    Files whose names start with "agent-" are ignored. The scan is best
    effort: unreadable files (I/O errors or bytes that are not valid UTF-8),
    blank lines, lines that are not valid JSON, and JSON values that are not
    objects are all skipped silently.

    Args:
        project_folder: Directory containing ``*.jsonl`` session files.

    Returns:
        True as soon as one record satisfies ``_is_ll_relevant``; False when
        no file contains such a record.
    """
    for jsonl_file in project_folder.glob("*.jsonl"):
        if jsonl_file.name.startswith("agent-"):
            continue
        try:
            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    # A JSON line may decode to a list/number/string; only
                    # objects can carry the fields the relevance check reads.
                    if isinstance(record, dict) and _is_ll_relevant(record):
                        return True
        except (OSError, UnicodeDecodeError):
            # UnicodeDecodeError is not an OSError; a file with invalid UTF-8
            # must not abort the scan of the remaining files.
            continue

    return False
def discover_all_projects(logger: Logger) -> list[Path]:
    """Find every Claude project on this machine that shows ll activity.

    Walks the sub-directories of ~/.claude/projects/, turns each directory
    name back into a filesystem path by replacing every "-" with "/", and
    keeps the paths that exist on disk and whose project folder contains
    ll-relevant JSONL records.

    Because every "-" is replaced, a project whose real path contains a
    hyphen decodes to a different path; if that path does not exist the
    project is skipped with a warning.

    Args:
        logger: Logger instance; receives a warning for each decoded path
            that does not exist.

    Returns:
        Sorted list of decoded paths for projects with ll activity. Empty
        when ~/.claude/projects/ does not exist.
    """
    projects_root = Path.home() / ".claude" / "projects"
    if not projects_root.exists():
        return []

    found: list[Path] = []
    for entry in projects_root.iterdir():
        if entry.is_dir():
            # e.g. "-Users-foo-bar" -> "/Users/foo/bar"
            candidate = Path(entry.name.replace("-", "/"))
            if candidate.exists():
                if _has_ll_activity(entry):
                    found.append(candidate)
            else:
                logger.warning(f"Decoded path does not exist: {candidate}")

    found.sort()
    return found
def _cmd_matches(record: dict, cmd: str) -> bool:
    """Return True if record contains a Bash tool-use whose command includes cmd.

    The match is a plain substring test against the ``command`` string of
    each ``tool_use`` block named ``Bash`` in ``record["message"]["content"]``.
    Records that lack that structure (no message, non-dict message or tool
    input, non-list content, non-string command) simply do not match.

    Args:
        record: One decoded JSONL record.
        cmd: Substring to look for in the Bash command line.

    Returns:
        True if at least one Bash tool-use command contains ``cmd``.
    """
    message = record.get("message") if isinstance(record, dict) else None
    if not isinstance(message, dict):
        return False

    content = message.get("content")
    if not isinstance(content, list):
        return False

    for block in content:
        if not isinstance(block, dict):
            continue
        if block.get("type") != "tool_use" or block.get("name") != "Bash":
            continue
        tool_input = block.get("input")
        command = tool_input.get("command") if isinstance(tool_input, dict) else None
        # Guard the substring test: `cmd in command` raises on non-strings.
        if isinstance(command, str) and cmd in command:
            return True
    return False
def generate_index(logs_dir: Path) -> None:
    """Generate logs/index.md summarising extracted projects.

    Each sub-directory of ``logs_dir`` that contains at least one non-agent
    ``*.jsonl`` file becomes one table row: directory name, number of such
    files, and the date range spanned by the string ``timestamp`` fields
    found in them (first 10 characters of the smallest and largest value).
    When there are no rows, a placeholder line is written instead of a table.

    Reading is best effort: unreadable or non-UTF-8 files, blank or invalid
    JSON lines, non-object JSON values, and non-string timestamps are skipped.

    Args:
        logs_dir: Root directory of extracted logs; created if missing.

    Returns:
        None. Writes ``logs_dir / "index.md"``.
    """
    rows: list[tuple[str, int, str]] = []

    if logs_dir.exists():
        for subdir in sorted(logs_dir.iterdir()):
            if not subdir.is_dir():
                continue

            jsonl_files = [f for f in subdir.glob("*.jsonl") if not f.name.startswith("agent-")]
            if not jsonl_files:
                continue

            timestamps: list[str] = []
            for jsonl_file in jsonl_files:
                try:
                    with open(jsonl_file, encoding="utf-8") as f:
                        for line in f:
                            line = line.strip()
                            if not line:
                                continue
                            try:
                                record = json.loads(line)
                            except json.JSONDecodeError:
                                continue
                            if not isinstance(record, dict):
                                continue
                            ts = record.get("timestamp")
                            # Only strings can be compared and sliced below.
                            if isinstance(ts, str) and ts:
                                timestamps.append(ts)
                except (OSError, UnicodeDecodeError):
                    continue

            if timestamps:
                # Lexicographic min/max; the first 10 characters are kept as the date.
                earliest = min(timestamps)[:10]
                latest = max(timestamps)[:10]
                date_range = f"{earliest} – {latest}" if earliest != latest else earliest
            else:
                date_range = ""

            rows.append((subdir.name, len(jsonl_files), date_range))

    lines = ["# Logs Index", ""]
    if rows:
        lines.append("| Project | Sessions | Date Range |")
        lines.append("|---------|----------|------------|")
        for name, count, date_range in rows:
            lines.append(f"| {name} | {count} | {date_range} |")
    else:
        lines.append("*No projects extracted yet.*")
    # Trailing empty element makes the joined text end with a newline.
    lines.append("")

    logs_dir.mkdir(parents=True, exist_ok=True)
    (logs_dir / "index.md").write_text("\n".join(lines), encoding="utf-8")
def _cmd_extract(args: argparse.Namespace, logger: Logger) -> int:
    """Extract ll-relevant JSONL records to logs/<slug>/<session-id>.jsonl.

    With ``args.project`` set, only that project is processed; otherwise every
    project returned by ``discover_all_projects`` that has a Claude project
    folder is processed. For each project, the non-agent ``*.jsonl`` files are
    scanned, records accepted by ``_is_ll_relevant`` are grouped by their
    ``sessionId`` (empty string when absent), optionally narrowed to records
    matching ``args.cmd``, and written one JSON object per line under
    ``<cwd>/logs/<slug>/``, where ``<slug>`` is the final component of the
    resolved project path. The index file is regenerated at the end.

    Reading is best effort: unreadable or non-UTF-8 files, blank or invalid
    JSON lines, and non-object JSON values are skipped.

    Args:
        args: Parsed arguments; reads ``project`` and ``cmd``.
        logger: Logger used for error and warning messages.

    Returns:
        0 on success, 1 when ``args.project`` has no Claude project folder.
    """
    if args.project:
        cwd_path: Path = args.project
        project_folder = get_project_folder(cwd_path)
        if project_folder is None:
            logger.error(f"No Claude project folder found for: {cwd_path}")
            logger.error(f"Expected: ~/.claude/projects/{str(cwd_path).replace('/', '-')}")
            return 1
        project_items = [(cwd_path, project_folder)]
    else:
        project_items = []
        for decoded_path in discover_all_projects(logger):
            folder = get_project_folder(decoded_path)
            if folder is not None:
                project_items.append((decoded_path, folder))

    logs_root = Path.cwd() / "logs"

    for cwd_path, project_folder in project_items:
        slug = cwd_path.resolve().name
        buckets: dict[str, list[dict]] = {}

        for jsonl_file in project_folder.glob("*.jsonl"):
            if jsonl_file.name.startswith("agent-"):
                continue
            try:
                with open(jsonl_file, encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            record = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        # Only JSON objects can carry the fields inspected below.
                        if isinstance(record, dict) and _is_ll_relevant(record):
                            session_id = record.get("sessionId", "")
                            buckets.setdefault(session_id, []).append(record)
            except (OSError, UnicodeDecodeError):
                # A file with invalid UTF-8 must not abort the whole extraction.
                continue

        if args.cmd:
            filtered: dict[str, list[dict]] = {}
            for session_id, records in buckets.items():
                matching = [r for r in records if _cmd_matches(r, args.cmd)]
                if matching:
                    filtered[session_id] = matching
            buckets = filtered

        out_base = logs_root / slug
        if buckets:
            # Created only when there is something to write for this project.
            out_base.mkdir(parents=True, exist_ok=True)
        for session_id, records in buckets.items():
            out_file = out_base / f"{session_id}.jsonl"
            with open(out_file, "w", encoding="utf-8") as f:
                for record in records:
                    f.write(json.dumps(record) + "\n")

    generate_index(logs_root)
    return 0
def _cmd_tail(args: argparse.Namespace, loops_dir: Path) -> int:
    """Stream live events from an active loop session.

    Follows ``<loops_dir>/.running/<loop>.events.jsonl`` starting at its
    current end, so only events appended after the command starts are shown.
    Each complete line is parsed as JSON, formatted with
    ``_format_history_event`` and printed; lines that are not valid JSON and
    events the formatter maps to None are skipped. When no new data is
    available the file is polled again after 0.1 seconds. The loop runs until
    interrupted with Ctrl-C.

    Args:
        args: Parsed arguments; reads ``loop`` (the loop name).
        loops_dir: Directory that holds the ``.running`` folder.

    Returns:
        1 if the events file does not exist, 0 after a KeyboardInterrupt.
    """
    events_file = loops_dir / ".running" / f"{args.loop}.events.jsonl"

    if not events_file.exists():
        print(f"No active session for loop '{args.loop}'", file=sys.stderr)
        return 1

    width = shutil.get_terminal_size().columns
    # Holds a line the writer has only partly flushed. readline() at EOF
    # returns whatever is there, which may lack its trailing newline; parsing
    # such a fragment would fail and lose the event.
    pending = ""
    try:
        with open(events_file, encoding="utf-8") as f:
            f.seek(0, 2)  # whence=2: jump to end of file
            while True:
                chunk = f.readline()
                if not chunk:
                    time.sleep(0.1)
                    continue
                pending += chunk
                if not pending.endswith("\n"):
                    # Mid-line: wait for the rest before parsing.
                    continue
                line, pending = pending.strip(), ""
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue
                formatted = _format_history_event(event, verbose=False, width=width)
                if formatted is not None:
                    print(formatted)
    except KeyboardInterrupt:
        return 0

    # Not reached in practice: the loop above only exits via an exception.
    return 0
def _build_parser() -> argparse.ArgumentParser:
    """Construct the ll-logs command-line parser.

    Sub-commands (stored in ``args.command``):
        discover: takes no options.
        tail: requires ``--loop NAME``.
        extract: requires exactly one of ``--project DIR`` / ``--all`` and
            accepts an optional ``--cmd TOOL`` filter.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    usage_examples = """
Examples:
  %(prog)s discover                        # List all projects with ll activity
  %(prog)s tail --loop <name>              # Stream live events from an active loop session
  %(prog)s extract --all                   # Extract all projects to logs/
  %(prog)s extract --project /path         # Extract one project to logs/<slug>/
  %(prog)s extract --all --cmd ll-history  # Filter to ll-history invocations
"""
    root = argparse.ArgumentParser(
        prog="ll-logs",
        description="Discover and extract ll-relevant JSONL entries from Claude Code logs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    commands = root.add_subparsers(dest="command", help="Available commands")

    # discover: no options of its own
    commands.add_parser(
        "discover",
        help="List all Claude projects with ll activity (one path per line, sorted)",
    )

    # tail: follow one loop's live event stream
    tail_cmd = commands.add_parser(
        "tail",
        help="Stream live events from an active loop session",
    )
    tail_cmd.add_argument("--loop", required=True, metavar="NAME", help="Loop name to tail")

    # extract: one project or all of them, never both
    extract_cmd = commands.add_parser(
        "extract",
        help="Extract ll-relevant JSONL records to logs/<slug>/<session-id>.jsonl",
    )
    scope = extract_cmd.add_mutually_exclusive_group(required=True)
    scope.add_argument(
        "--project",
        type=Path,
        metavar="DIR",
        help="Working directory of the target project",
    )
    scope.add_argument(
        "--all",
        action="store_true",
        help="Extract all projects with ll activity",
    )
    extract_cmd.add_argument(
        "--cmd",
        metavar="TOOL",
        help="Filter to records containing this ll- tool name (e.g. ll-history)",
    )

    return root
def _parse_args() -> argparse.Namespace:
    """Parse the process command line with the ll-logs parser.

    Kept as a separate function so tests can call or replace it.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    parser = _build_parser()
    return parser.parse_args()
def main_logs() -> int:
    """Run the ll-logs command-line tool.

    Configures output, parses the command line, and dispatches to the
    selected sub-command. With no sub-command the help text is printed.

    Returns:
        0 on success, 1 when no subcommand given or on error.
    """
    configure_output()
    logger = Logger(use_color=use_color_enabled())

    parser = _build_parser()
    args = parser.parse_args()
    command = args.command

    if not command:
        parser.print_help()
        return 1

    if command == "extract":
        return _cmd_extract(args, logger)

    if command == "tail":
        # The loops directory comes from the config rooted at the current directory.
        loops_dir = Path(BRConfig(Path.cwd()).loops.loops_dir)
        return _cmd_tail(args, loops_dir)

    if command == "discover":
        for project_path in discover_all_projects(logger):
            print(project_path)
        return 0

    # Any other command value falls through to the error code.
    return 1