Coverage for little_loops / user_messages.py: 92%
332 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Extract and analyze user messages from Claude Code logs.
3Provides functionality to extract user messages from Claude Code session
4logs stored in ~/.claude/projects/.
6Usage as CLI:
7 ll-messages # Last 100 messages to file
8 ll-messages -n 50 # Last 50 messages
9 ll-messages --since 2026-01-01 # Since date
10 ll-messages -o output.jsonl # Custom output path
11 ll-messages --stdout # Print to terminal instead of file
13Usage as library:
14 from little_loops.user_messages import extract_user_messages, get_project_folder
16 project_folder = get_project_folder()
17 messages = extract_user_messages(project_folder, limit=50)
18"""
20from __future__ import annotations
22import json
23from dataclasses import dataclass
24from datetime import datetime
25from pathlib import Path
27__all__ = [
28 "UserMessage",
29 "ResponseMetadata",
30 "CommandRecord",
31 "ExampleRecord",
32 "get_project_folder",
33 "extract_user_messages",
34 "extract_commands",
35 "build_examples",
36 "save_messages",
37]
@dataclass
class UserMessage:
    """A single user-authored message pulled from a session log.

    Attributes:
        content: Text of the user message.
        timestamp: Time the message was sent.
        session_id: Identifier of the Claude Code session.
        uuid: Identifier of this message.
        cwd: Working directory recorded with the message, if any.
        git_branch: Git branch recorded with the message, if any.
        is_sidechain: True when the record was flagged as a sidechain message.
        response_metadata: Summary of the assistant reply, when one is attached.
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    cwd: str | None = None
    git_branch: str | None = None
    is_sidechain: bool = False

    response_metadata: ResponseMetadata | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of this message.

        The timestamp is rendered with ``isoformat()``. The
        ``response_metadata`` key is emitted only when metadata is attached.
        """
        payload: dict[str, object] = dict(
            content=self.content,
            timestamp=self.timestamp.isoformat(),
            session_id=self.session_id,
            uuid=self.uuid,
            cwd=self.cwd,
            git_branch=self.git_branch,
            is_sidechain=self.is_sidechain,
        )
        metadata = self.response_metadata
        if metadata is None:
            return payload
        payload["response_metadata"] = metadata.to_dict()
        return payload
@dataclass
class ResponseMetadata:
    """Summary of what an assistant response did.

    Attributes:
        tools_used: One entry per tool, each holding the tool name and a count.
        files_read: Paths accessed through the Read tool.
        files_modified: Paths changed through the Edit/Write tools.
        completion_status: One of "success", "failure", or "partial".
        error_message: Error text when a failure was detected, else None.
    """

    tools_used: list[dict[str, str | int]]
    files_read: list[str]
    files_modified: list[str]
    completion_status: str
    error_message: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of every field."""
        return dict(
            tools_used=self.tools_used,
            files_read=self.files_read,
            files_modified=self.files_modified,
            completion_status=self.completion_status,
            error_message=self.error_message,
        )
@dataclass
class CommandRecord:
    """A CLI command taken from an assistant ``tool_use`` block.

    Attributes:
        content: The command string that was executed.
        timestamp: Time the command was issued.
        session_id: Identifier of the Claude Code session.
        uuid: Identifier of the source record.
        tool: Name of the tool that ran the command (e.g., "Bash").
        cwd: Working directory recorded with the command, if any.
        git_branch: Git branch recorded with the command, if any.
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    tool: str
    cwd: str | None = None
    git_branch: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping tagged with ``"type": "command"``."""
        serialized: dict[str, object] = {"type": "command"}
        serialized.update(
            content=self.content,
            timestamp=self.timestamp.isoformat(),
            session_id=self.session_id,
            uuid=self.uuid,
            tool=self.tool,
            cwd=self.cwd,
            git_branch=self.git_branch,
        )
        return serialized
@dataclass
class ExampleRecord:
    """A training example pair built from a skill invocation.

    Attributes:
        skill: Name of the skill (e.g., "capture-issue").
        input: Preceding user messages concatenated as context.
        output: JSON-serialized ResponseMetadata summary (tools_used,
            files_modified, completion_status); capturing the free-text
            assistant response is deferred.
        session_id: Identifier of the Claude Code session.
        timestamp: Time the skill was invoked.
        context_window: Number of preceding messages used as context.
    """

    skill: str
    input: str
    output: str
    session_id: str
    timestamp: datetime
    context_window: int

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping tagged with ``"type": "example"``."""
        serialized: dict[str, object] = {"type": "example"}
        serialized["skill"] = self.skill
        serialized["input"] = self.input
        serialized["output"] = self.output
        serialized["session_id"] = self.session_id
        serialized["timestamp"] = self.timestamp.isoformat()
        serialized["context_window"] = self.context_window
        return serialized
179def _extract_response_metadata(response_record: dict) -> ResponseMetadata | None:
180 """Extract metadata from an assistant response record.
182 Args:
183 response_record: The assistant record from JSONL
185 Returns:
186 ResponseMetadata if parseable, None otherwise
187 """
188 message_data = response_record.get("message", {})
189 content = message_data.get("content", [])
191 if not isinstance(content, list):
192 return None
194 tools_used: dict[str, int] = {}
195 files_read: list[str] = []
196 files_modified: list[str] = []
198 for block in content:
199 if not isinstance(block, dict):
200 continue
201 if block.get("type") != "tool_use":
202 continue
204 tool_name = block.get("name", "")
205 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1
207 tool_input = block.get("input", {})
208 if tool_name == "Read":
209 file_path = tool_input.get("file_path")
210 if file_path:
211 files_read.append(file_path)
212 elif tool_name in ("Edit", "Write"):
213 file_path = tool_input.get("file_path")
214 if file_path:
215 files_modified.append(file_path)
217 # Detect completion status from text content
218 completion_status = _detect_completion_status(content)
219 error_message = _detect_error_message(content) if completion_status == "failure" else None
221 # Convert tools_used dict to list format
222 tools_list: list[dict[str, str | int]] = [
223 {"tool": name, "count": count} for name, count in tools_used.items()
224 ]
226 return ResponseMetadata(
227 tools_used=tools_list,
228 files_read=files_read,
229 files_modified=files_modified,
230 completion_status=completion_status,
231 error_message=error_message,
232 )
235def _aggregate_response_metadata(responses: list[dict]) -> ResponseMetadata | None:
236 """Aggregate metadata from multiple assistant response records.
238 Combines tool counts, file lists, and uses completion status from final response.
240 Args:
241 responses: List of assistant records from JSONL
243 Returns:
244 Aggregated ResponseMetadata, or None if no valid responses
245 """
246 if not responses:
247 return None
249 tools_used: dict[str, int] = {}
250 files_read: set[str] = set()
251 files_modified: set[str] = set()
252 completion_status = "success"
253 error_message: str | None = None
255 for response_record in responses:
256 message_data = response_record.get("message", {})
257 content = message_data.get("content", [])
259 if not isinstance(content, list):
260 continue
262 for block in content:
263 if not isinstance(block, dict):
264 continue
265 if block.get("type") != "tool_use":
266 continue
268 tool_name = block.get("name", "")
269 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1
271 tool_input = block.get("input", {})
272 if tool_name == "Read":
273 file_path = tool_input.get("file_path")
274 if file_path:
275 files_read.add(file_path)
276 elif tool_name in ("Edit", "Write"):
277 file_path = tool_input.get("file_path")
278 if file_path:
279 files_modified.add(file_path)
281 # Use completion status from the final response
282 final_content = responses[-1].get("message", {}).get("content", [])
283 if isinstance(final_content, list):
284 completion_status = _detect_completion_status(final_content)
285 if completion_status == "failure":
286 error_message = _detect_error_message(final_content)
288 # Convert to output format
289 tools_list: list[dict[str, str | int]] = [
290 {"tool": name, "count": count} for name, count in tools_used.items()
291 ]
293 return ResponseMetadata(
294 tools_used=tools_list,
295 files_read=sorted(files_read),
296 files_modified=sorted(files_modified),
297 completion_status=completion_status,
298 error_message=error_message,
299 )
302def _detect_completion_status(content: list) -> str:
303 """Detect completion status from response content.
305 Args:
306 content: List of content blocks from assistant response
308 Returns:
309 "success", "failure", or "partial"
310 """
311 text_parts = []
312 for block in content:
313 if isinstance(block, dict) and block.get("type") == "text":
314 text_parts.append(block.get("text", ""))
316 text = " ".join(text_parts).lower()
318 # Check for error indicators
319 error_patterns = ["error", "failed", "couldn't", "unable to", "cannot"]
320 if any(pattern in text for pattern in error_patterns):
321 return "failure"
323 # Check for partial completion
324 partial_patterns = ["partially", "some of", "not all", "incomplete"]
325 if any(pattern in text for pattern in partial_patterns):
326 return "partial"
328 return "success"
331def _detect_error_message(content: list) -> str | None:
332 """Extract error message from response content.
334 Args:
335 content: List of content blocks from assistant response
337 Returns:
338 Error message if found, None otherwise
339 """
340 for block in content:
341 if isinstance(block, dict) and block.get("type") == "text":
342 text = block.get("text", "")
343 # Look for common error message patterns
344 lower_text = text.lower()
345 if "error:" in lower_text or "failed:" in lower_text:
346 # Extract the line containing the error
347 for line in text.split("\n"):
348 if "error" in line.lower() or "failed" in line.lower():
349 result = line.strip()[:200] # Limit length
350 return result if isinstance(result, str) else None
351 return None
def get_project_folder(cwd: Path | None = None) -> Path | None:
    """Locate the Claude Code project folder for a working directory.

    The resolved path has every "/" replaced by "-" and is looked up under
    ``~/.claude/projects``, e.g.
    /home/user/foo/bar -> ~/.claude/projects/-home-user-foo-bar

    Args:
        cwd: Working directory to map. If None, uses current directory.

    Returns:
        Path to the Claude project folder, or None if it doesn't exist.
    """
    base_dir = Path.cwd() if cwd is None else cwd

    # NOTE(review): only "/" is rewritten; any other character in the path is
    # kept as-is. Confirm this matches how Claude Code names project folders.
    folder_name = "-".join(str(base_dir.resolve()).split("/"))

    candidate = Path.home() / ".claude" / "projects" / folder_name
    return candidate if candidate.exists() else None
def extract_user_messages(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    include_response_context: bool = False,
) -> list[UserMessage]:
    """Extract user messages from all JSONL session files.

    Filters:
    - type == "user"
    - message.content is string (real user input)
    - message.content is array but [0].type != "tool_result"

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Files that cannot be read or decoded as UTF-8 are
    skipped as well.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of messages to return
        since: Only include messages after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        include_response_context: Whether to include metadata from assistant responses

    Returns:
        Messages sorted by timestamp, most recent first.
    """
    messages: list[UserMessage] = []

    for jsonl_file in project_folder.glob("*.jsonl"):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            # Only filled when response context is requested: pairing user and
            # assistant records needs the whole file in memory first.
            all_records: list[dict] = []

            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # A line such as `[1, 2]` or `"text"` is valid JSON but not
                    # a record; the parsers below rely on dict access.
                    if not isinstance(record, dict):
                        continue

                    if include_response_context:
                        all_records.append(record)
                        continue

                    msg = _parse_user_record(record, jsonl_file, since)
                    if msg is not None:
                        messages.append(msg)

            if include_response_context:
                messages.extend(_extract_messages_with_context(all_records, jsonl_file, since))

        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read (I/O failure or not valid UTF-8)
            continue

    # Sort by timestamp, most recent first
    messages.sort(key=lambda m: m.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        messages = messages[:limit]

    return messages
def extract_commands(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    tools: list[str] | None = None,
) -> list[CommandRecord]:
    """Extract CLI commands from assistant tool_use messages.

    Parses assistant messages for tool_use blocks and extracts command strings.
    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Files that cannot be read or decoded as UTF-8 are
    skipped as well.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of commands to return
        since: Only include commands after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        tools: Filter to specific tools (default: ["Bash"])

    Returns:
        Commands sorted by timestamp, most recent first.
    """
    if tools is None:
        tools = ["Bash"]

    commands: list[CommandRecord] = []

    for jsonl_file in project_folder.glob("*.jsonl"):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # Valid JSON that is not an object (list, string, number,
                    # null) cannot be a session record.
                    if not isinstance(record, dict):
                        continue

                    commands.extend(_parse_command_record(record, jsonl_file, since, tools))

        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read (I/O failure or not valid UTF-8)
            continue

    # Sort by timestamp, most recent first
    commands.sort(key=lambda c: c.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        commands = commands[:limit]

    return commands
530def _parse_command_record(
531 record: dict,
532 jsonl_file: Path,
533 since: datetime | None,
534 tools: list[str],
535) -> list[CommandRecord]:
536 """Parse CLI commands from an assistant record.
538 Args:
539 record: The JSON record from JSONL
540 jsonl_file: Source file (for fallback timestamp)
541 since: Filter for commands after this datetime
542 tools: Tool names to extract (e.g., ["Bash"])
544 Returns:
545 List of CommandRecord for each matching tool_use block
546 """
547 # Filter for assistant messages only
548 if record.get("type") != "assistant":
549 return []
551 message_data = record.get("message", {})
552 content = message_data.get("content", [])
554 if not isinstance(content, list):
555 return []
557 # Parse timestamp
558 timestamp_str = record.get("timestamp", "")
559 try:
560 timestamp_str = timestamp_str.replace("Z", "+00:00")
561 timestamp = datetime.fromisoformat(timestamp_str)
562 if timestamp.tzinfo is not None:
563 timestamp = timestamp.replace(tzinfo=None)
564 except (ValueError, AttributeError):
565 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime)
567 # Apply since filter
568 if since and timestamp < since:
569 return []
571 commands: list[CommandRecord] = []
573 for block in content:
574 if not isinstance(block, dict):
575 continue
576 if block.get("type") != "tool_use":
577 continue
579 tool_name = block.get("name", "")
580 if tool_name not in tools:
581 continue
583 tool_input = block.get("input", {})
584 command_str = tool_input.get("command", "")
585 if not command_str:
586 continue
588 commands.append(
589 CommandRecord(
590 content=command_str,
591 timestamp=timestamp,
592 session_id=record.get("sessionId", ""),
593 uuid=record.get("uuid", ""),
594 tool=tool_name,
595 cwd=record.get("cwd"),
596 git_branch=record.get("gitBranch"),
597 )
598 )
600 return commands
603def _parse_user_record(
604 record: dict,
605 jsonl_file: Path,
606 since: datetime | None,
607) -> UserMessage | None:
608 """Parse a single user record into a UserMessage.
610 Args:
611 record: The JSON record from JSONL
612 jsonl_file: Source file (for fallback timestamp)
613 since: Filter for messages after this datetime
615 Returns:
616 UserMessage if valid user message, None otherwise
617 """
618 # Filter for user messages only
619 if record.get("type") != "user":
620 return None
622 message_data = record.get("message", {})
623 content = message_data.get("content")
625 # Skip if no content
626 if content is None:
627 return None
629 # Check if this is a real user message or tool_result
630 if isinstance(content, str):
631 # String content = real user message
632 message_content = content
633 elif isinstance(content, list):
634 # Array content - check first element
635 if len(content) > 0 and content[0].get("type") == "tool_result":
636 # This is a tool result, skip it
637 return None
638 # Extract text from array (could be text blocks)
639 text_parts = []
640 for block in content:
641 if isinstance(block, dict):
642 if block.get("type") == "text":
643 text_parts.append(block.get("text", ""))
644 elif "content" in block:
645 text_parts.append(str(block.get("content", "")))
646 message_content = "\n".join(text_parts) if text_parts else str(content)
647 else:
648 return None
650 # Parse timestamp
651 timestamp_str = record.get("timestamp", "")
652 try:
653 # Handle ISO 8601 format with Z suffix
654 timestamp_str = timestamp_str.replace("Z", "+00:00")
655 timestamp = datetime.fromisoformat(timestamp_str)
656 # Convert to naive datetime for consistent comparison
657 if timestamp.tzinfo is not None:
658 timestamp = timestamp.replace(tzinfo=None)
659 except (ValueError, AttributeError):
660 # Use file modification time as fallback
661 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime)
663 # Apply since filter
664 if since and timestamp < since:
665 return None
667 # Create message object
668 return UserMessage(
669 content=message_content,
670 timestamp=timestamp,
671 session_id=record.get("sessionId", ""),
672 uuid=record.get("uuid", ""),
673 cwd=record.get("cwd"),
674 git_branch=record.get("gitBranch"),
675 is_sidechain=record.get("isSidechain", False),
676 )
679def _extract_messages_with_context(
680 records: list[dict],
681 jsonl_file: Path,
682 since: datetime | None,
683) -> list[UserMessage]:
684 """Extract user messages with response context from a list of records.
686 Pairs each user message with ALL following assistant responses until the
687 next user message, aggregating tool usage and file changes.
689 Args:
690 records: List of all records from a JSONL file
691 jsonl_file: Source file (for fallback timestamp)
692 since: Filter for messages after this datetime
694 Returns:
695 List of UserMessages with response_metadata populated
696 """
697 messages: list[UserMessage] = []
699 current_msg: UserMessage | None = None
700 current_responses: list[dict] = []
702 for record in records:
703 if record.get("type") == "user":
704 if current_msg is not None:
705 current_msg.response_metadata = _aggregate_response_metadata(current_responses)
706 messages.append(current_msg)
707 current_msg = _parse_user_record(record, jsonl_file, since)
708 current_responses = []
709 elif record.get("type") == "assistant" and current_msg is not None:
710 current_responses.append(record)
712 # Emit the final group
713 if current_msg is not None:
714 current_msg.response_metadata = _aggregate_response_metadata(current_responses)
715 messages.append(current_msg)
717 return messages
def save_messages(
    messages: list[UserMessage],
    output_path: Path | None = None,
) -> Path:
    """Write messages to a JSONL file, one JSON object per line.

    Args:
        messages: List of UserMessage objects to save
        output_path: Output file path. If None, a timestamped file
            ``user-messages-{YYYYmmdd-HHMMSS}.jsonl`` is created under
            ``.ll`` in the current directory.

    Returns:
        Path to the saved file.
    """
    if output_path is None:
        # Default: ./.ll/user-messages-{timestamp}.jsonl
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        output_path = Path.cwd() / ".ll" / f"user-messages-{stamp}.jsonl"

    destination = Path(output_path)
    # Missing parent directories are created for default and custom paths alike.
    destination.parent.mkdir(parents=True, exist_ok=True)

    with open(destination, "w", encoding="utf-8") as handle:
        for message in messages:
            serialized = json.dumps(message.to_dict())
            handle.write(serialized + "\n")

    return destination
def build_examples(
    messages: list[UserMessage],
    skill: str,
    context_window: int = 3,
) -> list[ExampleRecord]:
    """Build training example pairs from skill invocation sessions.

    Messages are grouped by session and ordered by timestamp. Every message
    whose content contains ``<command-name>/ll:SKILL_NAME</command-name>`` is
    a trigger; it is paired with up to ``context_window`` messages that precede
    it in the same session, joined by blank lines, as the input.

    Args:
        messages: UserMessage list (already filtered to skill-matching sessions)
        skill: The skill name to build examples for (e.g. "capture-issue")
        context_window: Number of preceding messages to include as context (default 3)

    Returns:
        List of ExampleRecord objects, one per skill trigger record found.
    """
    import re

    trigger = re.compile(rf"<command-name>/ll:{re.escape(skill)}</command-name>")

    # Bucket messages per session, keeping sessions in first-seen order.
    by_session: dict[str, list[UserMessage]] = {}
    for message in messages:
        if message.session_id not in by_session:
            by_session[message.session_id] = []
        by_session[message.session_id].append(message)

    results: list[ExampleRecord] = []
    for session_id, bucket in by_session.items():
        # Oldest first, so "preceding" means earlier in time.
        bucket.sort(key=lambda m: m.timestamp)

        for position, candidate in enumerate(bucket):
            if trigger.search(candidate.content) is None:
                continue

            # Collect N preceding messages as context
            window_start = max(0, position - context_window)
            context_parts = [earlier.content for earlier in bucket[window_start:position]]

            # Serialize response_metadata as output; "{}" when there is none.
            metadata = candidate.response_metadata
            serialized = "{}" if metadata is None else json.dumps(metadata.to_dict())

            results.append(
                ExampleRecord(
                    skill=skill,
                    input="\n\n".join(context_parts),
                    output=serialized,
                    session_id=session_id,
                    timestamp=candidate.timestamp,
                    context_window=context_window,
                )
            )

    return results
def print_messages_to_stdout(messages: list[UserMessage]) -> None:
    """Write messages to stdout as JSONL, one JSON object per line.

    Args:
        messages: List of UserMessage objects to print
    """
    import sys

    for message in messages:
        serialized = json.dumps(message.to_dict())
        sys.stdout.write(serialized + "\n")