Coverage for little_loops / user_messages.py: 92%

332 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Extract and analyze user messages from Claude Code logs. 

2 

3Provides functionality to extract user messages from Claude Code session 

4logs stored in ~/.claude/projects/. 

5 

6Usage as CLI: 

7 ll-messages # Last 100 messages to file 

8 ll-messages -n 50 # Last 50 messages 

9 ll-messages --since 2026-01-01 # Since date 

10 ll-messages -o output.jsonl # Custom output path 

11 ll-messages --stdout # Print to terminal instead of file 

12 

13Usage as library: 

14 from little_loops.user_messages import extract_user_messages, get_project_folder 

15 

16 project_folder = get_project_folder() 

17 messages = extract_user_messages(project_folder, limit=50) 

18""" 

19 

20from __future__ import annotations 

21 

22import json 

23from dataclasses import dataclass 

24from datetime import datetime 

25from pathlib import Path 

26 

27__all__ = [ 

28 "UserMessage", 

29 "ResponseMetadata", 

30 "CommandRecord", 

31 "ExampleRecord", 

32 "get_project_folder", 

33 "extract_user_messages", 

34 "extract_commands", 

35 "build_examples", 

36 "save_messages", 

37] 

38 

39 

@dataclass
class UserMessage:
    """A single user-authored message pulled from a Claude Code session log.

    Attributes:
        content: Text of the message
        timestamp: Time the message was sent
        session_id: Identifier of the Claude Code session
        uuid: Identifier of this individual message
        cwd: Working directory recorded with the message, if any
        git_branch: Git branch recorded with the message, if any
        is_sidechain: True when the record was flagged as a sidechain message
        response_metadata: Summary of the assistant's reply, when collected
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    cwd: str | None = None
    git_branch: str | None = None
    is_sidechain: bool = False

    response_metadata: ResponseMetadata | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of this message.

        The timestamp is rendered in ISO 8601 form. The ``response_metadata``
        key is emitted only when metadata is attached to the message.
        """
        payload: dict[str, object] = dict(
            content=self.content,
            timestamp=self.timestamp.isoformat(),
            session_id=self.session_id,
            uuid=self.uuid,
            cwd=self.cwd,
            git_branch=self.git_branch,
            is_sidechain=self.is_sidechain,
        )
        metadata = self.response_metadata
        if metadata is None:
            return payload
        payload["response_metadata"] = metadata.to_dict()
        return payload

78 

79 

@dataclass
class ResponseMetadata:
    """Summary of what an assistant response did.

    Attributes:
        tools_used: Tools invoked, each with its usage count
        files_read: Paths accessed through the Read tool
        files_modified: Paths changed through the Edit/Write tools
        completion_status: One of "success", "failure", or "partial"
        error_message: Error text when a failure was detected
    """

    tools_used: list[dict[str, str | int]]
    files_read: list[str]
    files_modified: list[str]
    completion_status: str
    error_message: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of every field, in declaration order."""
        field_names = (
            "tools_used",
            "files_read",
            "files_modified",
            "completion_status",
            "error_message",
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}

107 

108 

@dataclass
class CommandRecord:
    """A CLI command taken from an assistant ``tool_use`` block.

    Attributes:
        content: The command string that was run
        timestamp: Time the command was issued
        session_id: Identifier of the Claude Code session
        uuid: Identifier of the source record
        tool: Name of the tool that ran the command (e.g., "Bash")
        cwd: Working directory recorded with the command, if any
        git_branch: Git branch recorded with the command, if any
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    tool: str
    cwd: str | None = None
    git_branch: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping tagged with ``"type": "command"``."""
        serialized: dict[str, object] = {"type": "command"}
        serialized.update(
            content=self.content,
            timestamp=self.timestamp.isoformat(),
            session_id=self.session_id,
            uuid=self.uuid,
            tool=self.tool,
            cwd=self.cwd,
            git_branch=self.git_branch,
        )
        return serialized

143 

144 

@dataclass
class ExampleRecord:
    """An input/output training pair built around one skill invocation.

    Attributes:
        skill: Name of the skill (e.g., "capture-issue")
        input: Preceding user messages, concatenated, used as context
        output: JSON-serialized ResponseMetadata summary (tools_used,
            files_modified, completion_status); capturing the assistant's
            free-text response is deferred
        session_id: Identifier of the Claude Code session
        timestamp: Time the skill was invoked
        context_window: How many preceding messages were requested as context
    """

    skill: str
    input: str
    output: str
    session_id: str
    timestamp: datetime
    context_window: int

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping tagged with ``"type": "example"``."""
        serialized: dict[str, object] = {"type": "example"}
        serialized.update(
            skill=self.skill,
            input=self.input,
            output=self.output,
            session_id=self.session_id,
            timestamp=self.timestamp.isoformat(),
            context_window=self.context_window,
        )
        return serialized

177 

178 

179def _extract_response_metadata(response_record: dict) -> ResponseMetadata | None: 

180 """Extract metadata from an assistant response record. 

181 

182 Args: 

183 response_record: The assistant record from JSONL 

184 

185 Returns: 

186 ResponseMetadata if parseable, None otherwise 

187 """ 

188 message_data = response_record.get("message", {}) 

189 content = message_data.get("content", []) 

190 

191 if not isinstance(content, list): 

192 return None 

193 

194 tools_used: dict[str, int] = {} 

195 files_read: list[str] = [] 

196 files_modified: list[str] = [] 

197 

198 for block in content: 

199 if not isinstance(block, dict): 

200 continue 

201 if block.get("type") != "tool_use": 

202 continue 

203 

204 tool_name = block.get("name", "") 

205 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1 

206 

207 tool_input = block.get("input", {}) 

208 if tool_name == "Read": 

209 file_path = tool_input.get("file_path") 

210 if file_path: 

211 files_read.append(file_path) 

212 elif tool_name in ("Edit", "Write"): 

213 file_path = tool_input.get("file_path") 

214 if file_path: 

215 files_modified.append(file_path) 

216 

217 # Detect completion status from text content 

218 completion_status = _detect_completion_status(content) 

219 error_message = _detect_error_message(content) if completion_status == "failure" else None 

220 

221 # Convert tools_used dict to list format 

222 tools_list: list[dict[str, str | int]] = [ 

223 {"tool": name, "count": count} for name, count in tools_used.items() 

224 ] 

225 

226 return ResponseMetadata( 

227 tools_used=tools_list, 

228 files_read=files_read, 

229 files_modified=files_modified, 

230 completion_status=completion_status, 

231 error_message=error_message, 

232 ) 

233 

234 

235def _aggregate_response_metadata(responses: list[dict]) -> ResponseMetadata | None: 

236 """Aggregate metadata from multiple assistant response records. 

237 

238 Combines tool counts, file lists, and uses completion status from final response. 

239 

240 Args: 

241 responses: List of assistant records from JSONL 

242 

243 Returns: 

244 Aggregated ResponseMetadata, or None if no valid responses 

245 """ 

246 if not responses: 

247 return None 

248 

249 tools_used: dict[str, int] = {} 

250 files_read: set[str] = set() 

251 files_modified: set[str] = set() 

252 completion_status = "success" 

253 error_message: str | None = None 

254 

255 for response_record in responses: 

256 message_data = response_record.get("message", {}) 

257 content = message_data.get("content", []) 

258 

259 if not isinstance(content, list): 

260 continue 

261 

262 for block in content: 

263 if not isinstance(block, dict): 

264 continue 

265 if block.get("type") != "tool_use": 

266 continue 

267 

268 tool_name = block.get("name", "") 

269 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1 

270 

271 tool_input = block.get("input", {}) 

272 if tool_name == "Read": 

273 file_path = tool_input.get("file_path") 

274 if file_path: 

275 files_read.add(file_path) 

276 elif tool_name in ("Edit", "Write"): 

277 file_path = tool_input.get("file_path") 

278 if file_path: 

279 files_modified.add(file_path) 

280 

281 # Use completion status from the final response 

282 final_content = responses[-1].get("message", {}).get("content", []) 

283 if isinstance(final_content, list): 

284 completion_status = _detect_completion_status(final_content) 

285 if completion_status == "failure": 

286 error_message = _detect_error_message(final_content) 

287 

288 # Convert to output format 

289 tools_list: list[dict[str, str | int]] = [ 

290 {"tool": name, "count": count} for name, count in tools_used.items() 

291 ] 

292 

293 return ResponseMetadata( 

294 tools_used=tools_list, 

295 files_read=sorted(files_read), 

296 files_modified=sorted(files_modified), 

297 completion_status=completion_status, 

298 error_message=error_message, 

299 ) 

300 

301 

302def _detect_completion_status(content: list) -> str: 

303 """Detect completion status from response content. 

304 

305 Args: 

306 content: List of content blocks from assistant response 

307 

308 Returns: 

309 "success", "failure", or "partial" 

310 """ 

311 text_parts = [] 

312 for block in content: 

313 if isinstance(block, dict) and block.get("type") == "text": 

314 text_parts.append(block.get("text", "")) 

315 

316 text = " ".join(text_parts).lower() 

317 

318 # Check for error indicators 

319 error_patterns = ["error", "failed", "couldn't", "unable to", "cannot"] 

320 if any(pattern in text for pattern in error_patterns): 

321 return "failure" 

322 

323 # Check for partial completion 

324 partial_patterns = ["partially", "some of", "not all", "incomplete"] 

325 if any(pattern in text for pattern in partial_patterns): 

326 return "partial" 

327 

328 return "success" 

329 

330 

331def _detect_error_message(content: list) -> str | None: 

332 """Extract error message from response content. 

333 

334 Args: 

335 content: List of content blocks from assistant response 

336 

337 Returns: 

338 Error message if found, None otherwise 

339 """ 

340 for block in content: 

341 if isinstance(block, dict) and block.get("type") == "text": 

342 text = block.get("text", "") 

343 # Look for common error message patterns 

344 lower_text = text.lower() 

345 if "error:" in lower_text or "failed:" in lower_text: 

346 # Extract the line containing the error 

347 for line in text.split("\n"): 

348 if "error" in line.lower() or "failed" in line.lower(): 

349 result = line.strip()[:200] # Limit length 

350 return result if isinstance(result, str) else None 

351 return None 

352 

353 

def get_project_folder(cwd: Path | None = None) -> Path | None:
    """Locate the Claude Code project folder that corresponds to a directory.

    The resolved directory path is turned into a folder name by replacing
    every "/" with "-", e.g.
    /home/user/foo/bar -> ~/.claude/projects/-home-user-foo-bar

    Args:
        cwd: Working directory to map. If None, uses current directory.

    Returns:
        Path to Claude project folder, or None if it doesn't exist.
    """
    base_dir = Path.cwd() if cwd is None else cwd

    # /home/user/foo/bar -> -home-user-foo-bar
    folder_name = str(base_dir.resolve()).replace("/", "-")

    candidate = Path.home() / ".claude" / "projects" / folder_name
    return candidate if candidate.exists() else None

381 

382 

def extract_user_messages(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    include_response_context: bool = False,
) -> list[UserMessage]:
    """Extract user messages from all JSONL session files.

    Filters:
    - type == "user"
    - message.content is string (real user input)
    - message.content is array but [0].type != "tool_result"

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are ignored. A file that cannot be opened, read, or decoded as
    UTF-8 is skipped.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of messages to return
        since: Only include messages after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        include_response_context: Whether to include metadata from assistant responses

    Returns:
        Messages sorted by timestamp, most recent first.
    """
    messages: list[UserMessage] = []

    # Files are visited in sorted order so that messages sharing a timestamp
    # come out in a reproducible order: glob order depends on the filesystem
    # and the sort at the end is stable.
    for jsonl_file in sorted(project_folder.glob("*.jsonl")):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        # Only filled when response context is requested: pairing user
        # messages with their responses needs the whole session at once.
        session_records: list[dict] = []

        try:
            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # Valid JSON that is not an object (a list, number,
                    # string, null) cannot be a session record.
                    if not isinstance(record, dict):
                        continue

                    if include_response_context:
                        session_records.append(record)
                        continue

                    # Without response context each record is handled as it
                    # is read, so the file is never held in memory.
                    msg = _parse_user_record(record, jsonl_file, since)
                    if msg is not None:
                        messages.append(msg)

            if include_response_context:
                messages.extend(
                    _extract_messages_with_context(session_records, jsonl_file, since)
                )

        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read. UnicodeDecodeError is listed
            # explicitly because it is a ValueError, not an OSError.
            continue

    # Sort by timestamp, most recent first
    messages.sort(key=lambda m: m.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        messages = messages[:limit]

    return messages

464 

465 

def extract_commands(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    tools: list[str] | None = None,
) -> list[CommandRecord]:
    """Extract CLI commands from assistant tool_use messages.

    Parses assistant messages for tool_use blocks and extracts command strings.
    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are ignored. A file that cannot be opened, read, or decoded as
    UTF-8 is skipped.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of commands to return
        since: Only include commands after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        tools: Filter to specific tools (default: ["Bash"])

    Returns:
        Commands sorted by timestamp, most recent first.
    """
    if tools is None:
        tools = ["Bash"]

    commands: list[CommandRecord] = []

    # Files are visited in sorted order so that commands sharing a timestamp
    # come out in a reproducible order: glob order depends on the filesystem
    # and the sort at the end is stable.
    for jsonl_file in sorted(project_folder.glob("*.jsonl")):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # Valid JSON that is not an object (a list, number,
                    # string, null) cannot be a session record.
                    if not isinstance(record, dict):
                        continue

                    commands.extend(_parse_command_record(record, jsonl_file, since, tools))

        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read. UnicodeDecodeError is listed
            # explicitly because it is a ValueError, not an OSError.
            continue

    # Sort by timestamp, most recent first
    commands.sort(key=lambda c: c.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        commands = commands[:limit]

    return commands

528 

529 

530def _parse_command_record( 

531 record: dict, 

532 jsonl_file: Path, 

533 since: datetime | None, 

534 tools: list[str], 

535) -> list[CommandRecord]: 

536 """Parse CLI commands from an assistant record. 

537 

538 Args: 

539 record: The JSON record from JSONL 

540 jsonl_file: Source file (for fallback timestamp) 

541 since: Filter for commands after this datetime 

542 tools: Tool names to extract (e.g., ["Bash"]) 

543 

544 Returns: 

545 List of CommandRecord for each matching tool_use block 

546 """ 

547 # Filter for assistant messages only 

548 if record.get("type") != "assistant": 

549 return [] 

550 

551 message_data = record.get("message", {}) 

552 content = message_data.get("content", []) 

553 

554 if not isinstance(content, list): 

555 return [] 

556 

557 # Parse timestamp 

558 timestamp_str = record.get("timestamp", "") 

559 try: 

560 timestamp_str = timestamp_str.replace("Z", "+00:00") 

561 timestamp = datetime.fromisoformat(timestamp_str) 

562 if timestamp.tzinfo is not None: 

563 timestamp = timestamp.replace(tzinfo=None) 

564 except (ValueError, AttributeError): 

565 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime) 

566 

567 # Apply since filter 

568 if since and timestamp < since: 

569 return [] 

570 

571 commands: list[CommandRecord] = [] 

572 

573 for block in content: 

574 if not isinstance(block, dict): 

575 continue 

576 if block.get("type") != "tool_use": 

577 continue 

578 

579 tool_name = block.get("name", "") 

580 if tool_name not in tools: 

581 continue 

582 

583 tool_input = block.get("input", {}) 

584 command_str = tool_input.get("command", "") 

585 if not command_str: 

586 continue 

587 

588 commands.append( 

589 CommandRecord( 

590 content=command_str, 

591 timestamp=timestamp, 

592 session_id=record.get("sessionId", ""), 

593 uuid=record.get("uuid", ""), 

594 tool=tool_name, 

595 cwd=record.get("cwd"), 

596 git_branch=record.get("gitBranch"), 

597 ) 

598 ) 

599 

600 return commands 

601 

602 

603def _parse_user_record( 

604 record: dict, 

605 jsonl_file: Path, 

606 since: datetime | None, 

607) -> UserMessage | None: 

608 """Parse a single user record into a UserMessage. 

609 

610 Args: 

611 record: The JSON record from JSONL 

612 jsonl_file: Source file (for fallback timestamp) 

613 since: Filter for messages after this datetime 

614 

615 Returns: 

616 UserMessage if valid user message, None otherwise 

617 """ 

618 # Filter for user messages only 

619 if record.get("type") != "user": 

620 return None 

621 

622 message_data = record.get("message", {}) 

623 content = message_data.get("content") 

624 

625 # Skip if no content 

626 if content is None: 

627 return None 

628 

629 # Check if this is a real user message or tool_result 

630 if isinstance(content, str): 

631 # String content = real user message 

632 message_content = content 

633 elif isinstance(content, list): 

634 # Array content - check first element 

635 if len(content) > 0 and content[0].get("type") == "tool_result": 

636 # This is a tool result, skip it 

637 return None 

638 # Extract text from array (could be text blocks) 

639 text_parts = [] 

640 for block in content: 

641 if isinstance(block, dict): 

642 if block.get("type") == "text": 

643 text_parts.append(block.get("text", "")) 

644 elif "content" in block: 

645 text_parts.append(str(block.get("content", ""))) 

646 message_content = "\n".join(text_parts) if text_parts else str(content) 

647 else: 

648 return None 

649 

650 # Parse timestamp 

651 timestamp_str = record.get("timestamp", "") 

652 try: 

653 # Handle ISO 8601 format with Z suffix 

654 timestamp_str = timestamp_str.replace("Z", "+00:00") 

655 timestamp = datetime.fromisoformat(timestamp_str) 

656 # Convert to naive datetime for consistent comparison 

657 if timestamp.tzinfo is not None: 

658 timestamp = timestamp.replace(tzinfo=None) 

659 except (ValueError, AttributeError): 

660 # Use file modification time as fallback 

661 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime) 

662 

663 # Apply since filter 

664 if since and timestamp < since: 

665 return None 

666 

667 # Create message object 

668 return UserMessage( 

669 content=message_content, 

670 timestamp=timestamp, 

671 session_id=record.get("sessionId", ""), 

672 uuid=record.get("uuid", ""), 

673 cwd=record.get("cwd"), 

674 git_branch=record.get("gitBranch"), 

675 is_sidechain=record.get("isSidechain", False), 

676 ) 

677 

678 

679def _extract_messages_with_context( 

680 records: list[dict], 

681 jsonl_file: Path, 

682 since: datetime | None, 

683) -> list[UserMessage]: 

684 """Extract user messages with response context from a list of records. 

685 

686 Pairs each user message with ALL following assistant responses until the 

687 next user message, aggregating tool usage and file changes. 

688 

689 Args: 

690 records: List of all records from a JSONL file 

691 jsonl_file: Source file (for fallback timestamp) 

692 since: Filter for messages after this datetime 

693 

694 Returns: 

695 List of UserMessages with response_metadata populated 

696 """ 

697 messages: list[UserMessage] = [] 

698 

699 current_msg: UserMessage | None = None 

700 current_responses: list[dict] = [] 

701 

702 for record in records: 

703 if record.get("type") == "user": 

704 if current_msg is not None: 

705 current_msg.response_metadata = _aggregate_response_metadata(current_responses) 

706 messages.append(current_msg) 

707 current_msg = _parse_user_record(record, jsonl_file, since) 

708 current_responses = [] 

709 elif record.get("type") == "assistant" and current_msg is not None: 

710 current_responses.append(record) 

711 

712 # Emit the final group 

713 if current_msg is not None: 

714 current_msg.response_metadata = _aggregate_response_metadata(current_responses) 

715 messages.append(current_msg) 

716 

717 return messages 

718 

719 

def save_messages(
    messages: list[UserMessage],
    output_path: Path | None = None,
) -> Path:
    """Write messages to a JSONL file, one JSON object per line.

    Missing parent directories of the destination are created. Any existing
    file at the destination is overwritten.

    Args:
        messages: List of UserMessage objects to save
        output_path: Output file path. If None, a timestamped file
            ./.ll/user-messages-{timestamp}.jsonl is used.

    Returns:
        Path to the saved file.
    """
    if output_path is None:
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        destination = Path.cwd() / ".ll" / f"user-messages-{stamp}.jsonl"
    else:
        destination = Path(output_path)

    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open("w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(entry.to_dict()) + "\n" for entry in messages)

    return destination

748 

749 

def build_examples(
    messages: list[UserMessage],
    skill: str,
    context_window: int = 3,
) -> list[ExampleRecord]:
    """Build training example pairs from skill invocation sessions.

    Messages are grouped by session and ordered oldest-first. Every message
    whose content contains ``<command-name>/ll:SKILL_NAME</command-name>`` is a
    trigger; it is paired with up to ``context_window`` messages that precede
    it in the same session (joined with blank lines) as the input, and with
    its JSON-serialized response metadata (``"{}"`` when absent) as the output.

    Args:
        messages: UserMessage list (already filtered to skill-matching sessions)
        skill: The skill name to build examples for (e.g. "capture-issue")
        context_window: Number of preceding messages to include as context (default 3)

    Returns:
        List of ExampleRecord objects, one per skill trigger record found.
    """
    import re

    trigger = re.compile(rf"<command-name>/ll:{re.escape(skill)}</command-name>")

    # Bucket messages per session, keeping first-seen session order
    by_session: dict[str, list[UserMessage]] = {}
    for message in messages:
        if message.session_id not in by_session:
            by_session[message.session_id] = []
        by_session[message.session_id].append(message)

    examples: list[ExampleRecord] = []
    for session_id, unordered in by_session.items():
        ordered = sorted(unordered, key=lambda m: m.timestamp)

        for position, candidate in enumerate(ordered):
            if trigger.search(candidate.content) is None:
                continue

            # The N messages immediately before the trigger form the input
            window_start = max(0, position - context_window)
            context_text = "\n\n".join(
                prior.content for prior in ordered[window_start:position]
            )

            # The response metadata, serialized, forms the output
            metadata = candidate.response_metadata
            serialized = "{}" if metadata is None else json.dumps(metadata.to_dict())

            examples.append(
                ExampleRecord(
                    skill=skill,
                    input=context_text,
                    output=serialized,
                    session_id=session_id,
                    timestamp=candidate.timestamp,
                    context_window=context_window,
                )
            )

    return examples

808 

809 

def print_messages_to_stdout(messages: list[UserMessage]) -> None:
    """Write messages to stdout as JSONL, one JSON object per line.

    Args:
        messages: List of UserMessage objects to print
    """
    import sys

    serialized_lines = (json.dumps(entry.to_dict()) for entry in messages)
    for serialized in serialized_lines:
        print(serialized, file=sys.stdout)