Coverage for little_loops / subprocess_utils.py: 86%

174 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Subprocess utilities for Claude CLI invocation. 

2 

3Provides shared functionality for running Claude CLI commands with 

4real-time output streaming, timeout handling, and context handoff detection. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import logging 

11import os 

12import re 

13import selectors 

14import subprocess 

15import time 

16from collections.abc import Callable 

17from pathlib import Path 

18 

19from little_loops.host_runner import resolve_host 

20 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Callback type: (line: str, is_stderr: bool) -> None
# Invoked by run_claude_command for each line of output it streams.
OutputCallback = Callable[[str, bool], None]

# Process lifecycle callback: (process: Popen) -> None
# Used for both the on_process_start and on_process_end hooks.
ProcessCallback = Callable[[subprocess.Popen[str]], None]

# Model detection callback: (model: str) -> None
ModelCallback = Callable[[str], None]

# Usage callback: (input_tokens: int, output_tokens: int) -> None
UsageCallback = Callable[[int, int], None]

# Context handoff detection pattern (searched anywhere in the output text).
CONTEXT_HANDOFF_PATTERN = re.compile(r"CONTEXT_HANDOFF:\s*Ready for fresh session")
# Relative path of the continuation prompt; joined onto the repo root (or the
# current working directory) by read_continuation_prompt.
CONTINUATION_PROMPT_PATH = Path(".ll/ll-continue-prompt.md")

# Sentinel file written when a session ends with high context usage (Option G).
# Consumed by run_with_continuation; NOT deleted by session-cleanup.sh.
# Relative path; joined onto the repo root (or cwd) by read_sentinel/write_sentinel.
SENTINEL_PATH = Path(".ll/ll-context-handoff-needed")

# Chars of captured_stdout to include in Option J guillotine prompt (≈3K tokens).
_GUILLOTINE_TAIL_CHARS = 12_000
# Lines of original_command to include for task intent.
_GUILLOTINE_MAX_TASK_LINES = 20

47 

48 

def detect_context_handoff(output: str) -> bool:
    """Report whether the given output carries the context-handoff marker.

    Args:
        output: Command output text to scan.

    Returns:
        True when CONTEXT_HANDOFF_PATTERN matches anywhere in the text,
        False otherwise.
    """
    marker = CONTEXT_HANDOFF_PATTERN.search(output)
    return marker is not None

59 

60 

def read_continuation_prompt(repo_path: Path | None = None) -> str | None:
    """Return the text of the continuation prompt file, when one is present.

    Args:
        repo_path: Repository root to look under; the current working
            directory is used when omitted.

    Returns:
        The file's contents, or None when the file does not exist.
    """
    base_dir = repo_path or Path.cwd()
    candidate = base_dir / CONTINUATION_PROMPT_PATH
    if not candidate.exists():
        return None
    return candidate.read_text()

74 

75 

def read_sentinel(repo_path: Path | None = None) -> dict | None:
    """Read and consume the context-handoff sentinel file if it exists.

    The sentinel is written by context-handoff-sentinel.sh (Stop hook) or
    the Python layer in run_with_continuation when a session ends with high
    context usage but no CONTEXT_HANDOFF signal.

    The file is deleted whether or not its contents could be parsed, so a
    sentinel is only ever reported once.

    Args:
        repo_path: Optional repository root path (defaults to the current
            working directory)

    Returns:
        None if the sentinel file is not present. Otherwise the parsed
        sentinel dict; an empty dict is returned when the file exists but is
        unreadable, is not valid JSON, or does not contain a JSON object.
    """
    sentinel_path = (repo_path or Path.cwd()) / SENTINEL_PATH
    if not sentinel_path.exists():
        return None
    try:
        data = json.loads(sentinel_path.read_text())
    except Exception:
        # Deliberately best-effort: the mere presence of the file is the
        # signal, so a corrupt payload degrades to "present, no details".
        logger.debug(
            "Ignoring unreadable context-handoff sentinel %s",
            sentinel_path,
            exc_info=True,
        )
        data = {}
    # Consume the sentinel on both the success and the failure path.
    sentinel_path.unlink(missing_ok=True)
    # json.loads happily returns lists, numbers, strings or None for valid
    # non-object JSON; honour the declared ``dict | None`` contract.
    return data if isinstance(data, dict) else {}

99 

100 

def write_sentinel(
    repo_path: Path | None = None,
    token_count: int = 0,
    context_limit: int = 200_000,
) -> None:
    """Write the context-handoff sentinel file.

    The file holds a JSON object with the keys ``written_at`` (UTC timestamp
    formatted as ``%Y-%m-%dT%H:%M:%SZ``), ``token_count``, ``context_limit``
    and ``usage_percent``. Writing is best-effort: any failure is logged and
    swallowed so that it never aborts the caller.

    Args:
        repo_path: Optional repository root path (defaults to the current
            working directory)
        token_count: Total tokens used in the session
        context_limit: Context window size; when it is not positive,
            ``usage_percent`` is recorded as 0
    """
    import datetime

    sentinel_path = (repo_path or Path.cwd()) / SENTINEL_PATH
    # Truncating integer percentage; guard against division by zero.
    usage_percent = int(token_count * 100 / context_limit) if context_limit > 0 else 0
    try:
        sentinel_path.parent.mkdir(parents=True, exist_ok=True)
        sentinel_path.write_text(
            json.dumps(
                {
                    "written_at": datetime.datetime.now(datetime.UTC).strftime(
                        "%Y-%m-%dT%H:%M:%SZ"
                    ),
                    "token_count": token_count,
                    "context_limit": context_limit,
                    "usage_percent": usage_percent,
                }
            )
        )
    except Exception:
        # Still best-effort, but no longer silent: a sentinel that could not
        # be written means the handoff signal is lost, which is worth a trace.
        logger.warning(
            "Failed to write context-handoff sentinel %s",
            sentinel_path,
            exc_info=True,
        )

133 

134 

def assemble_guillotine_prompt(
    original_command: str,
    captured_stdout: str,
    token_stats: dict,
) -> str:
    """Build the continuation prompt for Option J (parent-side guillotine).

    Used when context > 90% or "Prompt is too long" is detected with no
    handoff. The prompt is intended for a BRAND-NEW claude -p session (not
    --resume), i.e. one that starts with 0 tokens.

    Args:
        original_command: The original task command / skill invocation.
            Only its first _GUILLOTINE_MAX_TASK_LINES lines are quoted.
        captured_stdout: All Claude text output captured so far. Only the
            last _GUILLOTINE_TAIL_CHARS characters are quoted; a placeholder
            is used when nothing was captured.
        token_stats: Dict with keys: input_tokens, output_tokens,
            context_limit, trigger_reason (optional). Missing keys fall back
            to 0, 0, 200_000 and "context > 90%" respectively.

    Returns:
        Assembled continuation prompt string
    """
    command_lines = original_command.strip().splitlines()
    task_excerpt = "\n".join(command_lines[:_GUILLOTINE_MAX_TASK_LINES])
    if len(command_lines) > _GUILLOTINE_MAX_TASK_LINES:
        task_excerpt += f"\n... (truncated to {_GUILLOTINE_MAX_TASK_LINES} lines)"

    # Keep only the tail of the output; fall back to a placeholder when empty.
    stdout_tail = (
        (captured_stdout or "")[-_GUILLOTINE_TAIL_CHARS:]
        or "(no output captured before interruption)"
    )

    input_tokens = token_stats.get("input_tokens", 0)
    output_tokens = token_stats.get("output_tokens", 0)
    context_limit = token_stats.get("context_limit", 200_000)
    trigger_reason = token_stats.get("trigger_reason", "context > 90%")

    scratch_listing = _list_scratch_files()

    return f"""\
⚠ CONTEXT LIMIT REACHED — FRESH SESSION CONTINUATION

The previous automation session exhausted its context window before completing.
This fresh session (new context window, starts at 0 tokens) is continuing from
that interrupted session.

## Original Task
{task_excerpt}

## Session Progress at Interruption
- Approximate tokens used: {input_tokens + output_tokens:,} / {context_limit:,}
- Trigger reason: {trigger_reason}

## Last Session Output (what was happening at interruption)
{stdout_tail}

## Scratch Pad Files Available
{scratch_listing}

## Instructions for This Session
1. Do NOT restart from scratch — the previous session made progress (see above)
2. Read the "Last Session Output" section to understand exactly where we were
3. Check the scratch pad files before re-running expensive operations
4. Continue implementation from the interruption point
5. Complete normally: test, commit, close the issue as usual
"""

198 

199 

def _list_scratch_files() -> str:
    """Describe the entries of .loops/tmp/scratch/ for the guillotine prompt.

    Returns:
        One line per directory entry (sorted), each giving the entry name and
        its size in whole kilobytes; the size is omitted for an entry whose
        stat() fails. The string "None" is returned when the directory is
        missing, empty, or cannot be listed.
    """
    scratch_root = Path(".loops/tmp/scratch")
    if not scratch_root.exists():
        return "None"

    def describe(entry: Path) -> str:
        # Fall back to the bare name when the size cannot be determined.
        try:
            kilobytes = entry.stat().st_size // 1024
        except Exception:
            return f" {entry.name}"
        return f" {entry.name} ({kilobytes}KB)"

    try:
        entries = sorted(scratch_root.iterdir())
        if not entries:
            return "None"
        return "\n".join(describe(entry) for entry in entries)
    except Exception:
        return "None"

219 

220 

def run_claude_command(
    command: str,
    timeout: int = 3600,
    working_dir: Path | None = None,
    stream_callback: OutputCallback | None = None,
    on_process_start: ProcessCallback | None = None,
    on_process_end: ProcessCallback | None = None,
    idle_timeout: int = 0,
    on_model_detected: ModelCallback | None = None,
    on_usage: UsageCallback | None = None,
    agent: str | None = None,
    tools: list[str] | None = None,
    resume_session: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Invoke Claude CLI command with real-time output streaming.

    Stdout lines are parsed as stream-json events: ``system``/``init`` events
    feed ``on_model_detected``, ``assistant`` events contribute their text
    blocks to the captured stdout, ``result`` events feed ``on_usage`` (and
    record an error line on stderr when flagged ``is_error``), and all other
    event types are dropped. Stdout lines that are not JSON objects, and all
    stderr lines, are passed through verbatim.

    Args:
        command: Command to pass to Claude CLI
        timeout: Timeout in seconds (0 for no timeout)
        working_dir: Optional working directory for the command
        stream_callback: Optional callback for streaming output lines.
            Called with (line, is_stderr) for each line of output.
        on_process_start: Optional callback invoked after process starts.
            Receives the Popen object for tracking/management.
        on_process_end: Optional callback invoked after process completes.
            Receives the Popen object. Called in finally block.
        idle_timeout: Kill process if no output for this many seconds (0 to disable).
        on_model_detected: Optional callback invoked with the model name from the
            stream-json system/init event. Called at most once per invocation.
        on_usage: Optional callback invoked with (input_tokens, output_tokens) from
            the stream-json result event. input_tokens includes cache_read_input_tokens.
        agent: Optional agent value, forwarded unchanged to the host runner's
            ``build_streaming``.
        tools: Optional list of strings, forwarded unchanged to the host runner's
            ``build_streaming``.
        resume_session: If True, passes --continue to the Claude CLI to continue the
            most recent conversation. Used for the Option E explicit-handoff path.

    Returns:
        CompletedProcess with stdout/stderr captured. The return code is -9
        when the process never reported one (it did not terminate after kill).

    Raises:
        subprocess.TimeoutExpired: If command exceeds timeout or idle timeout.
            When triggered by idle timeout, the output field is set to "idle_timeout".
    """
    runner = resolve_host()
    invocation = runner.build_streaming(
        prompt=command,
        working_dir=working_dir,
        resume=resume_session,
        agent=agent,
        tools=tools,
    )
    cmd_args = [invocation.binary, *invocation.args]

    # Child environment = our environment plus whatever the host runner adds.
    env = os.environ.copy()
    env.update(invocation.env)
    if "GIT_DIR" in invocation.env:
        logger.debug("Worktree detected: GIT_DIR=%s", invocation.env["GIT_DIR"])

    process = subprocess.Popen(
        cmd_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # Line buffered
        cwd=working_dir,
        env=env,
    )

    if on_process_start:
        on_process_start(process)

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []

    # Use selectors for non-blocking read from both streams
    with selectors.DefaultSelector() as sel:
        if process.stdout:
            sel.register(process.stdout, selectors.EVENT_READ)
        if process.stderr:
            sel.register(process.stderr, selectors.EVENT_READ)

        start_time = time.time()
        last_output_time = start_time

        try:
            # Loop until both streams have hit EOF and been unregistered.
            while sel.get_map():
                now = time.time()
                if timeout and (now - start_time) > timeout:
                    process.kill()
                    try:
                        process.wait(timeout=10)
                    except subprocess.TimeoutExpired:
                        logger.warning(
                            "Process %s did not terminate within 10s after kill",
                            process.pid,
                        )
                    raise subprocess.TimeoutExpired(cmd_args, timeout)

                if idle_timeout and (now - last_output_time) > idle_timeout:
                    process.kill()
                    try:
                        process.wait(timeout=10)
                    except subprocess.TimeoutExpired:
                        logger.warning(
                            "Process %s did not terminate within 10s after kill",
                            process.pid,
                        )
                    raise subprocess.TimeoutExpired(cmd_args, idle_timeout, output="idle_timeout")

                # Wake at least once a second so the timeouts above are re-checked.
                ready = sel.select(timeout=1.0)
                for key, _ in ready:
                    line = key.fileobj.readline()  # type: ignore[union-attr]
                    if not line:
                        # Empty read means EOF on this stream.
                        sel.unregister(key.fileobj)
                        continue

                    last_output_time = time.time()
                    line = line.rstrip("\n")
                    is_stderr = key.fileobj is process.stderr

                    if not is_stderr:
                        try:
                            event = json.loads(line)
                            if not isinstance(event, dict):
                                # Valid JSON that is not an object (a bare number,
                                # string, list, null, ...) has no .get(); without
                                # this guard it raised an uncaught AttributeError
                                # and aborted the whole run. Treat it as raw text.
                                raise TypeError("stream-json event is not an object")
                            etype = event.get("type")
                            if etype == "system" and event.get("subtype") == "init":
                                if on_model_detected and "model" in event:
                                    on_model_detected(event["model"])
                                continue  # don't add to stdout_lines
                            elif etype == "assistant":
                                msg = event.get("message", {})
                                text_parts = [
                                    block["text"]
                                    for block in msg.get("content", [])
                                    if block.get("type") == "text"
                                ]
                                text = "\n\n".join(text_parts)
                                if not text:
                                    continue
                                for sub_line in text.splitlines() or [""]:
                                    stdout_lines.append(sub_line)
                                    if stream_callback:
                                        stream_callback(sub_line, is_stderr)
                                continue
                            elif etype == "result":
                                usage = event.get("usage", {})
                                if on_usage and usage:
                                    on_usage(
                                        usage.get("input_tokens", 0)
                                        + usage.get("cache_read_input_tokens", 0),
                                        usage.get("output_tokens", 0),
                                    )
                                if event.get("is_error"):
                                    error_text = event.get("error") or event.get("result", "")
                                    if error_text:
                                        stderr_lines.append(f"[result] {error_text}")
                                continue  # result event handled; don't echo the raw JSON
                            else:
                                continue  # skip other event types (tool_use, etc.)
                        except (json.JSONDecodeError, KeyError, TypeError):
                            pass  # non-JSON line: pass through as raw text

                    if is_stderr:
                        stderr_lines.append(line)
                    else:
                        stdout_lines.append(line)

                    if stream_callback:
                        stream_callback(line, is_stderr)

            # Streams are closed; give the process a moment to exit on its own.
            try:
                process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Process %s did not exit within 30s after streams closed, killing",
                    process.pid,
                )
                process.kill()
                try:
                    process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    logger.warning(
                        "Process %s did not terminate within 10s after kill",
                        process.pid,
                    )
        finally:
            if on_process_end:
                on_process_end(process)

    return subprocess.CompletedProcess(
        cmd_args,
        process.returncode if process.returncode is not None else -9,
        stdout="\n".join(stdout_lines),
        stderr="\n".join(stderr_lines),
    )

412 )