Coverage for little_loops / subprocess_utils.py: 86%
174 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Subprocess utilities for Claude CLI invocation.
3Provides shared functionality for running Claude CLI commands with
4real-time output streaming, timeout handling, and context handoff detection.
5"""
7from __future__ import annotations
9import json
10import logging
11import os
12import re
13import selectors
14import subprocess
15import time
16from collections.abc import Callable
17from pathlib import Path
19from little_loops.host_runner import resolve_host
21logger = logging.getLogger(__name__)
23# Callback type: (line: str, is_stderr: bool) -> None
24OutputCallback = Callable[[str, bool], None]
26# Process lifecycle callback: (process: Popen) -> None
27ProcessCallback = Callable[[subprocess.Popen[str]], None]
29# Model detection callback: (model: str) -> None
30ModelCallback = Callable[[str], None]
32# Usage callback: (input_tokens: int, output_tokens: int) -> None
33UsageCallback = Callable[[int, int], None]
35# Context handoff detection pattern
36CONTEXT_HANDOFF_PATTERN = re.compile(r"CONTEXT_HANDOFF:\s*Ready for fresh session")
37CONTINUATION_PROMPT_PATH = Path(".ll/ll-continue-prompt.md")
39# Sentinel file written when a session ends with high context usage (Option G).
40# Consumed by run_with_continuation; NOT deleted by session-cleanup.sh.
41SENTINEL_PATH = Path(".ll/ll-context-handoff-needed")
43# Chars of captured_stdout to include in Option J guillotine prompt (≈3K tokens).
44_GUILLOTINE_TAIL_CHARS = 12_000
45# Lines of original_command to include for task intent.
46_GUILLOTINE_MAX_TASK_LINES = 20
def detect_context_handoff(output: str) -> bool:
    """Report whether command output carries the context-handoff marker.

    Args:
        output: Text produced by the command.

    Returns:
        True when ``CONTEXT_HANDOFF_PATTERN`` matches anywhere in ``output``.
    """
    found = CONTEXT_HANDOFF_PATTERN.search(output)
    return found is not None
def read_continuation_prompt(repo_path: Path | None = None) -> str | None:
    """Read the continuation prompt file if it exists.

    Args:
        repo_path: Optional repository root path. Defaults to the current
            working directory.

    Returns:
        Contents of the continuation prompt, or None if the file is not found.
    """
    prompt_path = (repo_path or Path.cwd()) / CONTINUATION_PROMPT_PATH
    try:
        # Read directly instead of checking exists() first: the file can be
        # removed between the check and the read. UTF-8 is requested
        # explicitly so the result does not depend on the platform locale.
        return prompt_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Missing file, or a path component that is not a directory: both
        # are cases where exists() would have reported False.
        return None
def read_sentinel(repo_path: Path | None = None) -> dict | None:
    """Read and consume the context-handoff sentinel file if it exists.

    The sentinel is written by context-handoff-sentinel.sh (Stop hook) or
    the Python layer in run_with_continuation when a session ends with high
    context usage but no CONTEXT_HANDOFF signal.

    The file is deleted whether or not its contents could be parsed, so each
    sentinel is reported at most once.

    Args:
        repo_path: Optional repository root path. Defaults to the current
            working directory.

    Returns:
        None if no sentinel file is present. Otherwise the parsed sentinel
        dict, or an empty dict when the file could not be read, was not valid
        JSON, or did not contain a JSON object.
    """
    sentinel_path = (repo_path or Path.cwd()) / SENTINEL_PATH
    if not sentinel_path.exists():
        return None

    data: object = {}
    try:
        data = json.loads(sentinel_path.read_text(encoding="utf-8"))
    except Exception as exc:
        # Deliberately best-effort: an unreadable sentinel still counts as
        # "present", it just carries no details. Leave a trace for debugging.
        logger.debug("Discarding unreadable sentinel %s: %s", sentinel_path, exc)
    finally:
        sentinel_path.unlink(missing_ok=True)

    # Valid JSON is not necessarily an object; callers are promised a dict.
    return data if isinstance(data, dict) else {}
def write_sentinel(
    repo_path: Path | None = None,
    token_count: int = 0,
    context_limit: int = 200_000,
) -> None:
    """Write the context-handoff sentinel file.

    Writing is best-effort: a failure to create the directory or the file is
    logged at debug level and otherwise ignored.

    Args:
        repo_path: Optional repository root path. Defaults to the current
            working directory.
        token_count: Total tokens used in the session
        context_limit: Context window size
    """
    import datetime

    sentinel_path = (repo_path or Path.cwd()) / SENTINEL_PATH
    usage_percent = int(token_count * 100 / context_limit) if context_limit > 0 else 0
    payload = {
        # timezone.utc rather than datetime.UTC: the latter only exists on
        # Python 3.11+, and the resulting AttributeError used to be swallowed
        # below, silently skipping the write on older interpreters.
        "written_at": datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
        "token_count": token_count,
        "context_limit": context_limit,
        "usage_percent": usage_percent,
    }
    try:
        sentinel_path.parent.mkdir(parents=True, exist_ok=True)
        sentinel_path.write_text(json.dumps(payload), encoding="utf-8")
    except (OSError, TypeError, ValueError) as exc:
        # OSError: filesystem failure; TypeError/ValueError: payload that
        # json cannot serialize. Neither should abort the caller.
        logger.debug("Could not write sentinel %s: %s", sentinel_path, exc)
def assemble_guillotine_prompt(
    original_command: str,
    captured_stdout: str,
    token_stats: dict,
) -> str:
    """Build the fresh-session continuation prompt for Option J (parent-side guillotine).

    Used when context > 90% or "Prompt is too long" is detected with no handoff.
    The returned prompt is meant for a BRAND-NEW claude -p session (not --resume),
    which therefore starts with 0 tokens.

    Args:
        original_command: The original task command / skill invocation. Only
            its first ``_GUILLOTINE_MAX_TASK_LINES`` lines are quoted.
        captured_stdout: All Claude text output captured so far. Only its last
            ``_GUILLOTINE_TAIL_CHARS`` characters are quoted.
        token_stats: Dict with keys: input_tokens, output_tokens, context_limit,
            trigger_reason (optional)

    Returns:
        Assembled continuation prompt string
    """
    # Task intent: leading lines of the command, with a marker when cut short.
    command_lines = original_command.strip().splitlines()
    task_excerpt = "\n".join(command_lines[:_GUILLOTINE_MAX_TASK_LINES])
    if len(command_lines) > _GUILLOTINE_MAX_TASK_LINES:
        task_excerpt += f"\n... (truncated to {_GUILLOTINE_MAX_TASK_LINES} lines)"

    # Most recent output, or a placeholder when nothing was captured.
    stdout_tail = (captured_stdout or "")[-_GUILLOTINE_TAIL_CHARS:] or (
        "(no output captured before interruption)"
    )

    input_tokens = token_stats.get("input_tokens", 0)
    output_tokens = token_stats.get("output_tokens", 0)
    context_limit = token_stats.get("context_limit", 200_000)
    trigger_reason = token_stats.get("trigger_reason", "context > 90%")

    scratch_listing = _list_scratch_files()
    tokens_used = input_tokens + output_tokens

    return f"""\
⚠ CONTEXT LIMIT REACHED — FRESH SESSION CONTINUATION

The previous automation session exhausted its context window before completing.
This fresh session (new context window, starts at 0 tokens) is continuing from
that interrupted session.

## Original Task
{task_excerpt}

## Session Progress at Interruption
- Approximate tokens used: {tokens_used:,} / {context_limit:,}
- Trigger reason: {trigger_reason}

## Last Session Output (what was happening at interruption)
{stdout_tail}

## Scratch Pad Files Available
{scratch_listing}

## Instructions for This Session
1. Do NOT restart from scratch — the previous session made progress (see above)
2. Read the "Last Session Output" section to understand exactly where we were
3. Check the scratch pad files before re-running expensive operations
4. Continue implementation from the interruption point
5. Complete normally: test, commit, close the issue as usual
"""
200def _list_scratch_files() -> str:
201 """List files in .loops/tmp/scratch/ with sizes for the guillotine prompt."""
202 scratch_dir = Path(".loops/tmp/scratch")
203 if not scratch_dir.exists():
204 return "None"
205 try:
206 files = sorted(scratch_dir.iterdir())
207 if not files:
208 return "None"
209 lines = []
210 for f in files:
211 try:
212 size_kb = f.stat().st_size // 1024
213 lines.append(f" {f.name} ({size_kb}KB)")
214 except Exception:
215 lines.append(f" {f.name}")
216 return "\n".join(lines)
217 except Exception:
218 return "None"
def _kill_and_reap(process: subprocess.Popen[str]) -> None:
    """Kill ``process`` and wait up to 10 seconds for it to exit, warning if it does not."""
    process.kill()
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        logger.warning(
            "Process %s did not terminate within 10s after kill",
            process.pid,
        )


def _decode_stream_event(line: str) -> dict | None:
    """Decode one stdout line as a stream-json event; None if it is not a JSON object."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    # Valid JSON is not necessarily an object: a raw output line such as
    # "42" or "null" decodes fine but has no .get() and is not an event.
    return event if isinstance(event, dict) else None


def _assistant_text(event: dict) -> str | None:
    """Join the text blocks of an assistant event; None if the payload is malformed."""
    try:
        message = event.get("message", {})
        return "\n\n".join(
            block["text"] for block in message.get("content", []) if block.get("type") == "text"
        )
    except (AttributeError, KeyError, TypeError):
        return None


def _usage_token_counts(event: dict) -> tuple[int, int] | None:
    """Return (input incl. cache reads, output) tokens of a result event, or None if absent.

    Raises AttributeError or TypeError when the usage payload is malformed.
    """
    usage = event.get("usage", {})
    if not usage:
        return None
    return (
        usage.get("input_tokens", 0) + usage.get("cache_read_input_tokens", 0),
        usage.get("output_tokens", 0),
    )


def run_claude_command(
    command: str,
    timeout: int = 3600,
    working_dir: Path | None = None,
    stream_callback: OutputCallback | None = None,
    on_process_start: ProcessCallback | None = None,
    on_process_end: ProcessCallback | None = None,
    idle_timeout: int = 0,
    on_model_detected: ModelCallback | None = None,
    on_usage: UsageCallback | None = None,
    agent: str | None = None,
    tools: list[str] | None = None,
    resume_session: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Invoke Claude CLI command with real-time output streaming.

    Stdout is interpreted line by line as stream-json events. Assistant text
    is collected as stdout, error results are collected as stderr, and other
    events are dropped. Lines that are not JSON objects, or whose event
    payload is malformed, are passed through unchanged as raw stdout text.

    Args:
        command: Command to pass to Claude CLI
        timeout: Timeout in seconds (0 for no timeout)
        working_dir: Optional working directory for the command
        stream_callback: Optional callback for streaming output lines.
            Called with (line, is_stderr) for each line of output.
        on_process_start: Optional callback invoked after process starts.
            Receives the Popen object for tracking/management.
        on_process_end: Optional callback invoked after process completes.
            Receives the Popen object. Called in finally block.
        idle_timeout: Kill process if no output for this many seconds (0 to disable).
        on_model_detected: Optional callback invoked with the model name from the
            stream-json system/init event.
        on_usage: Optional callback invoked with (input_tokens, output_tokens) from
            the stream-json result event. input_tokens includes cache_read_input_tokens.
        agent: Forwarded unchanged to the host runner's build_streaming call.
        tools: Forwarded unchanged to the host runner's build_streaming call.
        resume_session: Forwarded to the host runner as ``resume``. Used for the
            Option E explicit-handoff path.

    Returns:
        CompletedProcess with stdout/stderr captured. The return code is -9 when
        the process has no exit status (it did not terminate after being killed).

    Raises:
        subprocess.TimeoutExpired: If command exceeds timeout or idle timeout.
            When triggered by idle timeout, the output field is set to "idle_timeout".
    """
    runner = resolve_host()
    invocation = runner.build_streaming(
        prompt=command,
        working_dir=working_dir,
        resume=resume_session,
        agent=agent,
        tools=tools,
    )
    cmd_args = [invocation.binary, *invocation.args]

    env = os.environ.copy()
    env.update(invocation.env)
    if "GIT_DIR" in invocation.env:
        logger.debug("Worktree detected: GIT_DIR=%s", invocation.env["GIT_DIR"])

    process = subprocess.Popen(
        cmd_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # Line buffered
        cwd=working_dir,
        env=env,
    )

    if on_process_start:
        on_process_start(process)

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []

    # Use selectors for non-blocking read from both streams
    with selectors.DefaultSelector() as sel:
        if process.stdout:
            sel.register(process.stdout, selectors.EVENT_READ)
        if process.stderr:
            sel.register(process.stderr, selectors.EVENT_READ)

        start_time = time.time()
        last_output_time = start_time

        try:
            # Loop until both streams have reached EOF and been unregistered.
            while sel.get_map():
                now = time.time()
                if timeout and (now - start_time) > timeout:
                    _kill_and_reap(process)
                    raise subprocess.TimeoutExpired(cmd_args, timeout)

                if idle_timeout and (now - last_output_time) > idle_timeout:
                    _kill_and_reap(process)
                    raise subprocess.TimeoutExpired(cmd_args, idle_timeout, output="idle_timeout")

                # Wake at least once per second so the timeouts above are
                # re-checked even while the child is silent.
                for key, _ in sel.select(timeout=1.0):
                    line = key.fileobj.readline()  # type: ignore[union-attr]
                    if not line:
                        # EOF on this stream
                        sel.unregister(key.fileobj)
                        continue

                    last_output_time = time.time()
                    line = line.rstrip("\n")
                    is_stderr = key.fileobj is process.stderr

                    # Only stdout carries stream-json events. Parsing is kept
                    # separate from the callbacks so that an exception raised
                    # by a callback is never mistaken for a malformed event.
                    event = None if is_stderr else _decode_stream_event(line)
                    if event is not None:
                        etype = event.get("type")
                        if etype == "system" and event.get("subtype") == "init":
                            if on_model_detected and "model" in event:
                                on_model_detected(event["model"])
                            continue  # don't add to stdout_lines
                        if etype == "assistant":
                            text = _assistant_text(event)
                            if text is not None:
                                for sub_line in text.splitlines():
                                    stdout_lines.append(sub_line)
                                    if stream_callback:
                                        stream_callback(sub_line, is_stderr)
                                continue
                            # malformed payload: fall through as raw text
                        elif etype == "result":
                            usage_ok = True
                            if on_usage:
                                try:
                                    counts = _usage_token_counts(event)
                                except (AttributeError, TypeError):
                                    usage_ok = False
                                else:
                                    if counts is not None:
                                        on_usage(*counts)
                            if usage_ok:
                                if event.get("is_error"):
                                    error_text = event.get("error") or event.get("result", "")
                                    if error_text:
                                        stderr_lines.append(f"[result] {error_text}")
                                continue
                            # malformed usage: fall through as raw text
                        else:
                            continue  # skip other event types (tool_use, etc.)

                    # Raw pass-through: stderr, non-JSON lines, malformed events.
                    if is_stderr:
                        stderr_lines.append(line)
                    else:
                        stdout_lines.append(line)

                    if stream_callback:
                        stream_callback(line, is_stderr)

            # Streams are closed; give the process a chance to exit by itself.
            try:
                process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Process %s did not exit within 30s after streams closed, killing",
                    process.pid,
                )
                _kill_and_reap(process)
        finally:
            if on_process_end:
                on_process_end(process)
            # Release our ends of the pipes once the child is gone; nothing
            # reads from them after this point. A child that is still alive
            # is left untouched.
            if process.poll() is not None:
                for pipe in (process.stdout, process.stderr):
                    if pipe:
                        pipe.close()

    return subprocess.CompletedProcess(
        cmd_args,
        process.returncode if process.returncode is not None else -9,
        stdout="\n".join(stdout_lines),
        stderr="\n".join(stderr_lines),
    )