Coverage for little_loops / cli / logs.py: 87%

236 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-logs: Discover and extract ll-relevant JSONL entries from ~/.claude/projects/.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import json 

7import re 

8import shutil 

9import sys 

10import time 

11from pathlib import Path 

12 

13from little_loops.cli.loop.info import ( # private symbol: cross-module coupling; verify signature on upgrade 

14 _format_history_event, 

15) 

16from little_loops.cli.output import configure_output, use_color_enabled 

17from little_loops.config import BRConfig 

18from little_loops.logger import Logger 

19from little_loops.user_messages import get_project_folder 

20 

21_COMMAND_NAME_RE = re.compile(r"<command-name>/ll:") 

22 

23 

24def _is_ll_relevant(record: dict) -> bool: 

25 """Return True if a JSONL record indicates ll activity. 

26 

27 Detects three signal types: 

28 (a) queue-operation enqueue with /ll: content 

29 (b) user records with <command-name>/ll: pattern in message content 

30 """ 

31 record_type = record.get("type") 

32 

33 # (a) queue-operation: only enqueue records with /ll: content signal ll activity 

34 if record_type == "queue-operation": 

35 return ( 

36 record.get("operation") == "enqueue" 

37 and isinstance(record.get("content"), str) 

38 and record["content"].startswith("/ll:") 

39 ) 

40 

41 # (b) user records: check message content for <command-name>/ll: pattern 

42 if record_type == "user": 

43 message = record.get("message", {}) 

44 if not isinstance(message, dict): 

45 return False 

46 content = message.get("content") 

47 if isinstance(content, str): 

48 return bool(_COMMAND_NAME_RE.search(content)) 

49 if isinstance(content, list): 

50 for block in content: 

51 if isinstance(block, dict): 

52 text = block.get("text", "") 

53 if isinstance(text, str) and _COMMAND_NAME_RE.search(text): 

54 return True 

55 

56 # (c) assistant records: check for Bash tool-use invoking an ll- command 

57 if record_type == "assistant": 

58 message = record.get("message", {}) 

59 content = message.get("content", []) 

60 if isinstance(content, list): 

61 for block in content: 

62 if ( 

63 isinstance(block, dict) 

64 and block.get("type") == "tool_use" 

65 and block.get("name") == "Bash" 

66 ): 

67 cmd = block.get("input", {}).get("command", "") 

68 if re.search(r"\bll-\w+", cmd): 

69 return True 

70 

71 return False 

72 

73 

74def _has_ll_activity(project_folder: Path) -> bool: 

75 """Return True if any non-agent JSONL file in project_folder has ll activity.""" 

76 jsonl_files = [f for f in project_folder.glob("*.jsonl") if not f.name.startswith("agent-")] 

77 

78 for jsonl_file in jsonl_files: 

79 try: 

80 with open(jsonl_file, encoding="utf-8") as f: 

81 for line in f: 

82 line = line.strip() 

83 if not line: 

84 continue 

85 try: 

86 record = json.loads(line) 

87 except json.JSONDecodeError: 

88 continue 

89 if _is_ll_relevant(record): 

90 return True 

91 except OSError: 

92 continue 

93 

94 return False 

95 

96 

97def discover_all_projects(logger: Logger) -> list[Path]: 

98 """Discover all Claude projects with ll activity. 

99 

100 Iterates ~/.claude/projects/, decodes each directory name back to an 

101 absolute path, checks for ll-relevant JSONL records, and returns a 

102 sorted list of paths that exist on disk. 

103 

104 Args: 

105 logger: Logger instance for warnings. 

106 

107 Returns: 

108 Sorted list of decoded absolute paths for projects with ll activity. 

109 """ 

110 claude_projects = Path.home() / ".claude" / "projects" 

111 

112 if not claude_projects.exists(): 

113 return [] 

114 

115 results: list[Path] = [] 

116 

117 for project_dir in claude_projects.iterdir(): 

118 if not project_dir.is_dir(): 

119 continue 

120 

121 # Decode directory name to path: "-Users-foo-bar" -> "/Users/foo/bar" 

122 decoded_path = Path(project_dir.name.replace("-", "/")) 

123 

124 if not decoded_path.exists(): 

125 logger.warning(f"Decoded path does not exist: {decoded_path}") 

126 continue 

127 

128 if _has_ll_activity(project_dir): 

129 results.append(decoded_path) 

130 

131 return sorted(results) 

132 

133 

134def _cmd_matches(record: dict, cmd: str) -> bool: 

135 """Return True if record contains a Bash tool-use whose command includes cmd.""" 

136 message = record.get("message", {}) 

137 content = message.get("content", []) 

138 if isinstance(content, list): 

139 for block in content: 

140 if ( 

141 isinstance(block, dict) 

142 and block.get("type") == "tool_use" 

143 and block.get("name") == "Bash" 

144 ): 

145 command = block.get("input", {}).get("command", "") 

146 if cmd in command: 

147 return True 

148 return False 

149 

150 

151def generate_index(logs_dir: Path) -> None: 

152 """Generate logs/index.md summarising extracted projects.""" 

153 rows = [] 

154 

155 if logs_dir.exists(): 

156 for subdir in sorted(logs_dir.iterdir()): 

157 if not subdir.is_dir(): 

158 continue 

159 

160 jsonl_files = [f for f in subdir.glob("*.jsonl") if not f.name.startswith("agent-")] 

161 if not jsonl_files: 

162 continue 

163 

164 timestamps: list[str] = [] 

165 for jsonl_file in jsonl_files: 

166 try: 

167 with open(jsonl_file, encoding="utf-8") as f: 

168 for line in f: 

169 line = line.strip() 

170 if not line: 

171 continue 

172 try: 

173 record = json.loads(line) 

174 except json.JSONDecodeError: 

175 continue 

176 ts = record.get("timestamp") 

177 if ts: 

178 timestamps.append(ts) 

179 except OSError: 

180 continue 

181 

182 if timestamps: 

183 earliest = min(timestamps)[:10] 

184 latest = max(timestamps)[:10] 

185 date_range = f"{earliest}{latest}" if earliest != latest else earliest 

186 else: 

187 date_range = "" 

188 

189 rows.append((subdir.name, len(jsonl_files), date_range)) 

190 

191 lines = ["# Logs Index", ""] 

192 if rows: 

193 lines.append("| Project | Sessions | Date Range |") 

194 lines.append("|---------|----------|------------|") 

195 for name, count, date_range in rows: 

196 lines.append(f"| {name} | {count} | {date_range} |") 

197 else: 

198 lines.append("*No projects extracted yet.*") 

199 lines.append("") 

200 

201 logs_dir.mkdir(parents=True, exist_ok=True) 

202 (logs_dir / "index.md").write_text("\n".join(lines), encoding="utf-8") 

203 

204 

205def _cmd_extract(args: argparse.Namespace, logger: Logger) -> int: 

206 """Extract ll-relevant JSONL records to logs/<slug>/<session-id>.jsonl.""" 

207 if args.project: 

208 cwd_path: Path = args.project 

209 project_folder = get_project_folder(cwd_path) 

210 if project_folder is None: 

211 logger.error(f"No Claude project folder found for: {cwd_path}") 

212 logger.error(f"Expected: ~/.claude/projects/{str(cwd_path).replace('/', '-')}") 

213 return 1 

214 project_items = [(cwd_path, project_folder)] 

215 else: 

216 decoded_paths = discover_all_projects(logger) 

217 project_items = [] 

218 for decoded_path in decoded_paths: 

219 folder = get_project_folder(decoded_path) 

220 if folder is not None: 

221 project_items.append((decoded_path, folder)) 

222 

223 for cwd_path, project_folder in project_items: 

224 slug = cwd_path.resolve().name 

225 buckets: dict[str, list[dict]] = {} 

226 

227 jsonl_files = [f for f in project_folder.glob("*.jsonl") if not f.name.startswith("agent-")] 

228 for jsonl_file in jsonl_files: 

229 try: 

230 with open(jsonl_file, encoding="utf-8") as f: 

231 for line in f: 

232 line = line.strip() 

233 if not line: 

234 continue 

235 try: 

236 record = json.loads(line) 

237 except json.JSONDecodeError: 

238 continue 

239 if _is_ll_relevant(record): 

240 session_id = record.get("sessionId", "") 

241 buckets.setdefault(session_id, []).append(record) 

242 except OSError: 

243 continue 

244 

245 if args.cmd: 

246 filtered: dict[str, list[dict]] = {} 

247 for session_id, records in buckets.items(): 

248 matching = [r for r in records if _cmd_matches(r, args.cmd)] 

249 if matching: 

250 filtered[session_id] = matching 

251 buckets = filtered 

252 

253 out_base = Path.cwd() / "logs" / slug 

254 for session_id, records in buckets.items(): 

255 out_file = out_base / f"{session_id}.jsonl" 

256 out_file.parent.mkdir(parents=True, exist_ok=True) 

257 with open(out_file, "w", encoding="utf-8") as f: 

258 for record in records: 

259 f.write(json.dumps(record) + "\n") 

260 

261 generate_index(Path.cwd() / "logs") 

262 return 0 

263 

264 

265def _cmd_tail(args: argparse.Namespace, loops_dir: Path) -> int: 

266 """Stream live events from an active loop session.""" 

267 events_file = loops_dir / ".running" / f"{args.loop}.events.jsonl" 

268 

269 if not events_file.exists(): 

270 print(f"No active session for loop '{args.loop}'", file=sys.stderr) 

271 return 1 

272 

273 width = shutil.get_terminal_size().columns 

274 try: 

275 with open(events_file, encoding="utf-8") as f: 

276 f.seek(0, 2) 

277 while True: 

278 line = f.readline() 

279 if line: 

280 line = line.strip() 

281 if line: 

282 try: 

283 event = json.loads(line) 

284 except json.JSONDecodeError: 

285 continue 

286 formatted = _format_history_event(event, verbose=False, width=width) 

287 if formatted is not None: 

288 print(formatted) 

289 else: 

290 time.sleep(0.1) 

291 except KeyboardInterrupt: 

292 return 0 

293 

294 return 0 

295 

296 

297def _build_parser() -> argparse.ArgumentParser: 

298 """Build the argument parser for ll-logs.""" 

299 parser = argparse.ArgumentParser( 

300 prog="ll-logs", 

301 description="Discover and extract ll-relevant JSONL entries from Claude Code logs", 

302 formatter_class=argparse.RawDescriptionHelpFormatter, 

303 epilog=""" 

304Examples: 

305 %(prog)s discover # List all projects with ll activity 

306 %(prog)s tail --loop <name> # Stream live events from an active loop session 

307 %(prog)s extract --all # Extract all projects to logs/ 

308 %(prog)s extract --project /path # Extract one project to logs/<slug>/ 

309 %(prog)s extract --all --cmd ll-history # Filter to ll-history invocations 

310""", 

311 ) 

312 

313 subparsers = parser.add_subparsers(dest="command", help="Available commands") 

314 subparsers.add_parser( 

315 "discover", 

316 help="List all Claude projects with ll activity (one path per line, sorted)", 

317 ) 

318 

319 tail_parser = subparsers.add_parser( 

320 "tail", 

321 help="Stream live events from an active loop session", 

322 ) 

323 tail_parser.add_argument("--loop", required=True, metavar="NAME", help="Loop name to tail") 

324 

325 extract_parser = subparsers.add_parser( 

326 "extract", 

327 help="Extract ll-relevant JSONL records to logs/<slug>/<session-id>.jsonl", 

328 ) 

329 target_group = extract_parser.add_mutually_exclusive_group(required=True) 

330 target_group.add_argument( 

331 "--project", 

332 type=Path, 

333 metavar="DIR", 

334 help="Working directory of the target project", 

335 ) 

336 target_group.add_argument( 

337 "--all", 

338 action="store_true", 

339 help="Extract all projects with ll activity", 

340 ) 

341 extract_parser.add_argument( 

342 "--cmd", 

343 metavar="TOOL", 

344 help="Filter to records containing this ll- tool name (e.g. ll-history)", 

345 ) 

346 

347 return parser 

348 

349 

350def _parse_args() -> argparse.Namespace: 

351 """Parse command-line arguments. Exposed for testing.""" 

352 return _build_parser().parse_args() 

353 

354 

355def main_logs() -> int: 

356 """Entry point for ll-logs command. 

357 

358 Returns: 

359 0 on success, 1 when no subcommand given or on error. 

360 """ 

361 configure_output() 

362 logger = Logger(use_color=use_color_enabled()) 

363 

364 parser = _build_parser() 

365 args = parser.parse_args() 

366 

367 if not args.command: 

368 parser.print_help() 

369 return 1 

370 

371 if args.command == "discover": 

372 projects = discover_all_projects(logger) 

373 for path in projects: 

374 print(path) 

375 return 0 

376 

377 if args.command == "tail": 

378 config = BRConfig(Path.cwd()) 

379 loops_dir = Path(config.loops.loops_dir) 

380 return _cmd_tail(args, loops_dir) 

381 

382 if args.command == "extract": 

383 return _cmd_extract(args, logger) 

384 

385 return 1