Coverage for little_loops / cli / loop / run.py: 89%

224 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-loop run subcommand.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import atexit 

7import json 

8import os 

9import re 

10import time 

11import uuid 

12from datetime import UTC, datetime 

13from pathlib import Path 

14 

15from little_loops.cli.loop._helpers import ( 

16 _is_earliest_waiter, 

17 _make_instance_id, 

18 get_builtin_loops_dir, 

19 print_execution_plan, 

20 register_loop_signal_handlers, 

21 resolve_loop_path, 

22 run_background, 

23 run_foreground, 

24) 

25from little_loops.logger import Logger 

26 

27 

28def _parse_program_md(path: Path) -> dict[str, str]: 

29 """Parse .ll/program.md heading sections into context key-value pairs. 

30 

31 Sections mapped: 

32 ## Directive → directive (prose) 

33 ## Targets → targets (space-joined list items) 

34 ## Benchmark → each key: value pair injected directly 

35 ## Budget → budget (prose) 

36 ## Constraints → constraints (prose) 

37 """ 

38 if not path.exists(): 

39 return {} 

40 try: 

41 content = path.read_text() 

42 except OSError: 

43 return {} 

44 

45 def _extract(heading: str) -> str: 

46 m = re.search(rf"^##\s+{re.escape(heading)}\s*$", content, re.MULTILINE | re.IGNORECASE) 

47 if not m: 

48 return "" 

49 start = m.end() 

50 nxt = re.search(r"^##\s", content[start:], re.MULTILINE) 

51 return content[start : start + nxt.start()].strip() if nxt else content[start:].strip() 

52 

53 result: dict[str, str] = {} 

54 

55 directive = _extract("Directive") 

56 if directive: 

57 result["directive"] = directive 

58 

59 targets_text = _extract("Targets") 

60 if targets_text: 

61 items = [ 

62 line.lstrip("-* \t").strip() 

63 for line in targets_text.splitlines() 

64 if line.strip().startswith(("-", "*")) 

65 ] 

66 result["targets"] = " ".join(items) if items else targets_text 

67 

68 benchmark_text = _extract("Benchmark") 

69 if benchmark_text: 

70 for line in benchmark_text.splitlines(): 

71 if ":" in line: 

72 k, _, v = line.partition(":") 

73 k, v = k.strip(), v.strip() 

74 if k and v: 

75 result[k] = v 

76 

77 budget = _extract("Budget") 

78 if budget: 

79 result["budget"] = budget 

80 

81 constraints = _extract("Constraints") 

82 if constraints: 

83 result["constraints"] = constraints 

84 

85 return result 

86 

87 

def cmd_run(
    loop_name: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Run a loop.

    Resolves and validates the loop definition, applies CLI, program.md and
    ``--context`` overrides, then does one of three things:

    - prints the execution plan (``--dry-run``);
    - spawns a detached run (``--background``);
    - runs the loop in the foreground under a scope lock, optionally inside a
      fresh git worktree (``--worktree``).

    Args:
        loop_name: Loop to run (looked up among built-ins with ``--builtin``,
            otherwise resolved via ``resolve_loop_path`` against *loops_dir*).
        args: Parsed ``ll-loop run`` arguments.
        loops_dir: Loops directory; also hosts the ``.running`` PID files and
            ``.queue`` entries written here.
        logger: Logger for user-facing messages.

    Returns:
        0 after a dry run, 1 on load/validation/context/lock errors, otherwise
        the exit code returned by ``run_background`` or ``run_foreground``.

    Raises:
        SystemExit: On a malformed ``--context`` entry, an out-of-range
            ``--handoff-threshold``, or ``--worktree`` combined with ``--background``.
    """
    from little_loops.fsm.concurrency import LockManager
    from little_loops.fsm.persistence import PersistentExecutor, _reconcile_stale_runs
    from little_loops.fsm.rate_limit_circuit import RateLimitCircuit
    from little_loops.fsm.validation import load_and_validate

    try:
        if getattr(args, "builtin", False):
            path = get_builtin_loops_dir() / f"{loop_name}.yaml"
            if not path.exists():
                logger.error(f"Built-in loop not found: {loop_name!r}")
                return 1
        else:
            path = resolve_loop_path(loop_name, loops_dir)
        fsm, _ = load_and_validate(path)
    except FileNotFoundError as e:
        logger.error(str(e))
        return 1
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return 1

    # Apply overrides
    if args.max_iterations:
        fsm.max_iterations = args.max_iterations
    if args.delay is not None:
        fsm.backoff = args.delay
    if args.no_llm:
        fsm.llm.enabled = False
    if args.llm_model:
        fsm.llm.model = args.llm_model
    # Inject positional input arg before --context so --context can override.
    # A JSON object updates only keys already declared in the loop's context;
    # anything else (non-JSON, non-object, or no matching keys) is stored
    # verbatim under fsm.input_key.
    if getattr(args, "input", None) is not None:
        raw = args.input
        try:
            parsed = json.loads(raw)
            if isinstance(parsed, dict):
                matched = {k: v for k, v in parsed.items() if k in fsm.context}
                if matched:
                    fsm.context.update(matched)
                else:
                    fsm.context[fsm.input_key] = raw
            else:
                fsm.context[fsm.input_key] = raw
        except (json.JSONDecodeError, ValueError):
            fsm.context[fsm.input_key] = raw
    # Inject program.md fields before --context so --context can override
    program_md_arg = getattr(args, "program_md", None)
    # Path(...) accepts both str and Path, so a plain-string argument also works.
    program_md_path = (
        Path(program_md_arg) if program_md_arg is not None else Path.cwd() / ".ll" / "program.md"
    )
    for key, value in _parse_program_md(program_md_path).items():
        fsm.context[key] = value
    for kv in getattr(args, "context", None) or []:
        if "=" not in kv:
            raise SystemExit(f"Invalid --context format: {kv!r} (expected KEY=VALUE)")
        key, _, value = kv.partition("=")
        fsm.context[key.strip()] = value.strip()

    # Apply YAML loop config env-var overrides (CLI flags below overwrite these)
    if fsm.config is not None and isinstance(fsm.config.handoff_threshold, int):
        os.environ["LL_HANDOFF_THRESHOLD"] = str(fsm.config.handoff_threshold)

    if getattr(args, "handoff_threshold", None) is not None:
        if not (1 <= args.handoff_threshold <= 100):
            raise SystemExit("--handoff-threshold must be between 1 and 100")
        os.environ["LL_HANDOFF_THRESHOLD"] = str(args.handoff_threshold)

    if getattr(args, "context_limit", None) is not None:
        os.environ["LL_CONTEXT_LIMIT"] = str(args.context_limit)

    from little_loops.config import BRConfig

    # Project config for the directory the command was launched from; reused
    # below for worktree setup, rate limits, extensions and event transports.
    _config = BRConfig(Path.cwd())
    _edge_label_colors = _config.cli.colors.fsm_edge_labels.to_dict()
    _highlight_color = _config.cli.colors.fsm_active_state
    _badges = _config.loops.glyphs.to_dict()

    # Dry run
    if args.dry_run:
        print_execution_plan(fsm, edge_label_colors=_edge_label_colors)
        return 0

    # Pre-run validation: every ${context.<key>} referenced by a state's action
    # or evaluate prompt must already be present in fsm.context.
    _ctx_var_re = re.compile(r"\$\{context\.([^}.]+)")
    missing_keys: set[str] = set()
    for state in fsm.states.values():
        templates = [state.action] if state.action else []
        if state.evaluate and state.evaluate.prompt:
            templates.append(state.evaluate.prompt)
        for template in templates:
            for m in _ctx_var_re.finditer(template):
                key = m.group(1)
                if key not in fsm.context:
                    missing_keys.add(key)
    if missing_keys:
        for key in sorted(missing_keys):
            logger.error(
                f"Missing required context variable: '{key}'. "
                f"Run with: ll-loop run {loop_name} --context {key}=VALUE"
            )
        return 1

    # Background mode: spawn detached process and return
    if getattr(args, "background", False):
        if getattr(args, "worktree", False):
            raise SystemExit("--worktree and --background cannot be combined")
        return run_background(loop_name, args, loops_dir)

    # Register PID file for all foreground runs so cmd_stop can send SIGTERM (BUG-639).
    # Background-spawned processes (foreground_internal=True) have their PID written by the
    # parent in run_background(); plain foreground runs must write their own PID here.
    running_dir = loops_dir / ".running"
    running_dir.mkdir(parents=True, exist_ok=True)
    _reconcile_stale_runs(loops_dir)
    if getattr(args, "foreground_internal", False):
        instance_id: str | None = getattr(args, "instance_id", None)
    else:
        instance_id = _make_instance_id(loop_name)
    # Absolute so the atexit/signal cleanup still finds the file after the
    # --worktree path below changes the working directory.
    pid_file = (running_dir / f"{instance_id or loop_name}.pid").absolute()
    foreground_pid_file: Path | None = pid_file

    if not getattr(args, "foreground_internal", False):
        pid_file.write_text(str(os.getpid()))

    def _cleanup_pid() -> None:
        pid_file.unlink(missing_ok=True)

    atexit.register(_cleanup_pid)

    # Scope-based locking
    lock_manager = LockManager(loops_dir)
    scope = fsm.scope or ["."]
    _queue_entry_file: Path | None = None

    def _cleanup_queue_entry() -> None:
        # Reads the enclosing variable at call time, so it sees the entry file
        # assigned later in the --queue branch.
        if _queue_entry_file is not None:
            _queue_entry_file.unlink(missing_ok=True)

    if not lock_manager.acquire(fsm.name, scope, instance_id=instance_id):
        conflict = lock_manager.find_conflict(scope)
        if conflict and getattr(args, "queue", False):
            # Write queue entry so dashboard shows the waiting loop
            queue_dir = loops_dir / ".queue"
            queue_dir.mkdir(parents=True, exist_ok=True)
            entry_id = str(uuid.uuid4())
            entry = {
                "id": entry_id,
                "loopName": loop_name,
                "enqueuedAt": datetime.now(UTC).isoformat(),
                "context": {
                    "waitingFor": conflict.loop_name,
                    "scope": conflict.scope,
                    "pid": os.getpid(),
                },
            }
            _queue_entry_file = queue_dir / f"{entry_id}.json"
            _queue_entry_file.write_text(json.dumps(entry, indent=2))
            atexit.register(_cleanup_queue_entry)

            logger.info(f"Waiting for conflicting loop '{conflict.loop_name}' to finish...")
            # Retry loop: when N waiters are released simultaneously, only one wins
            # acquire(); losers loop back and wait again rather than exiting (BUG-1281).
            acquired = False
            # Monotonic clock: the wait budget must not be skewed by wall-clock
            # adjustments (NTP, manual changes) while the loop sits in the queue.
            _wait_start = time.monotonic()
            _budget = _config.loops.queue_wait_timeout_seconds
            while time.monotonic() - _wait_start < _budget:
                _remaining = _budget - (time.monotonic() - _wait_start)
                # int() would truncate a sub-second remainder to timeout=0, whose
                # meaning for wait_for_scope is not visible here; always pass >= 1s.
                if not lock_manager.wait_for_scope(scope, timeout=max(1, int(_remaining))):
                    _cleanup_queue_entry()
                    logger.error("Timeout waiting for scope to become available")
                    return 1
                if not _is_earliest_waiter(entry_id, queue_dir):
                    # Not this entry's turn yet; let the earlier waiter acquire first.
                    time.sleep(1)
                    continue
                if lock_manager.acquire(fsm.name, scope, instance_id=instance_id):
                    acquired = True
                    break
            if not acquired:
                _cleanup_queue_entry()
                logger.error("Failed to acquire lock after waiting")
                return 1
            # Lock acquired - no longer queued
            _cleanup_queue_entry()
        elif conflict:
            logger.error(f"Scope conflict with running loop: {conflict.loop_name}")
            logger.info(f" Conflicting scope: {conflict.scope}")
            logger.info(" Use --queue to wait for it to finish")
            return 1
        else:
            # Unexpected: find_conflict returned None but acquire failed
            logger.error("Failed to acquire scope lock (unknown reason)")
            return 1

    # From here on the scope lock is held; the finally block releases it.
    executor: PersistentExecutor | None = None
    try:
        # Worktree isolation: create branch + directory before anything reads Path.cwd()
        if getattr(args, "worktree", False):
            from little_loops.parallel.git_lock import GitLock
            from little_loops.worktree_utils import cleanup_worktree, setup_worktree

            # The working directory has not changed since _config was built, so it
            # already is the main-repo config; no need to parse it a second time.
            _worktree_base = _config.get_worktree_base()
            _copy_files = _config.parallel.worktree_copy_files
            _repo_path = Path.cwd()

            _timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            _safe_name = re.sub(r"[^a-zA-Z0-9-]", "-", loop_name)
            _branch_name = f"{_timestamp}-{_safe_name}"
            _worktree_path = _worktree_base / _branch_name

            _worktree_base.mkdir(parents=True, exist_ok=True)
            _git_lock = GitLock(logger)

            setup_worktree(
                repo_path=_repo_path,
                worktree_path=_worktree_path,
                branch_name=_branch_name,
                copy_files=_copy_files,
                logger=logger,
                git_lock=_git_lock,
            )

            logger.info(f"Worktree: {_worktree_path}")
            logger.info(f"Branch: {_branch_name}")

            def _cleanup_worktree_on_exit() -> None:
                cleanup_worktree(
                    worktree_path=_worktree_path,
                    repo_path=_repo_path,
                    logger=logger,
                    git_lock=_git_lock,
                    delete_branch=True,
                )

            atexit.register(_cleanup_worktree_on_exit)

            os.environ["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1"
            # NOTE(review): relative paths used after this chdir (e.g. a relative
            # loops_dir handed to PersistentExecutor or used by lock_manager.release
            # in the finally block) resolve inside the worktree; confirm callers
            # pass an absolute loops_dir.
            os.chdir(_worktree_path)

        circuit = (
            RateLimitCircuit(Path(_config.commands.rate_limits.circuit_breaker_path))
            if _config.commands.rate_limits.circuit_breaker_enabled
            else None
        )
        executor = PersistentExecutor(
            fsm, loops_dir=loops_dir, circuit=circuit, instance_id=instance_id, pid=os.getpid()
        )

        # Register signal handlers for graceful shutdown
        register_loop_signal_handlers(executor, pid_file=foreground_pid_file)

        from little_loops.extension import wire_extensions
        from little_loops.transport import wire_transports

        wire_extensions(executor.event_bus, _config.extensions, executor=executor)
        wire_transports(executor.event_bus, _config.events)
        return run_foreground(
            executor,
            fsm,
            args,
            highlight_color=_highlight_color,
            edge_label_colors=_edge_label_colors,
            badges=_badges,
        )
    finally:
        if executor is not None:
            executor.close_transports()
        lock_manager.release(fsm.name, instance_id=instance_id)