Coverage for little_loops / cli / loop / run.py: 89%
224 statements
Generated by coverage.py v7.12.0 at 2026-05-22 16:19 -0500
1"""ll-loop run subcommand."""
3from __future__ import annotations
5import argparse
6import atexit
7import json
8import os
9import re
10import time
11import uuid
12from datetime import UTC, datetime
13from pathlib import Path
15from little_loops.cli.loop._helpers import (
16 _is_earliest_waiter,
17 _make_instance_id,
18 get_builtin_loops_dir,
19 print_execution_plan,
20 register_loop_signal_handlers,
21 resolve_loop_path,
22 run_background,
23 run_foreground,
24)
25from little_loops.logger import Logger
def _parse_program_md(path: Path) -> dict[str, str]:
    """Parse ``.ll/program.md`` heading sections into context key-value pairs.

    Sections mapped (headings are matched case-insensitively):
        ## Directive   → ``directive`` (prose)
        ## Targets     → ``targets`` (bullet items joined by single spaces;
                         the raw section text when it has no bullet items)
        ## Benchmark   → each ``key: value`` line injected directly
        ## Budget      → ``budget`` (prose)
        ## Constraints → ``constraints`` (prose)

    A section's body runs from its heading line to the next ``## `` heading
    (or end of file). Missing or empty sections contribute nothing.

    Args:
        path: Location of the program file (a ``str`` path is also accepted).

    Returns:
        The extracted context values, or ``{}`` if the file is missing,
        unreadable, or not valid UTF-8.
    """
    try:
        content = Path(path).read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # program.md is optional: any read/decode problem means "no overrides".
        return {}

    def _extract(heading: str) -> str:
        """Return the stripped body of ``## <heading>``, or ``""`` if absent."""
        # [ \t] (not \s) so the heading text must sit on the same line as "##".
        m = re.search(
            rf"^##[ \t]+{re.escape(heading)}[ \t]*$",
            content,
            re.MULTILINE | re.IGNORECASE,
        )
        if not m:
            return ""
        body = content[m.end() :]
        nxt = re.search(r"^##\s", body, re.MULTILINE)
        return (body[: nxt.start()] if nxt else body).strip()

    result: dict[str, str] = {}

    directive = _extract("Directive")
    if directive:
        result["directive"] = directive

    targets_text = _extract("Targets")
    if targets_text:
        # A bullet is "-" or "*" followed by whitespace (CommonMark). Only that one
        # marker is removed, so glob targets such as "*.py" or "**/*.md" survive.
        bullet = re.compile(r"[-*][ \t]+(.+)")
        items = [
            m.group(1)
            for m in (bullet.match(line.strip()) for line in targets_text.splitlines())
            if m
        ]
        result["targets"] = " ".join(items) if items else targets_text

    benchmark_text = _extract("Benchmark")
    if benchmark_text:
        for line in benchmark_text.splitlines():
            if ":" in line:
                k, _, v = line.partition(":")
                k, v = k.strip(), v.strip()
                if k and v:
                    result[k] = v

    budget = _extract("Budget")
    if budget:
        result["budget"] = budget

    constraints = _extract("Constraints")
    if constraints:
        result["constraints"] = constraints

    return result
# Matches ``${context.KEY`` template references; group 1 is the top-level key
# (everything up to the first "}" or ".").
_CONTEXT_VAR_RE = re.compile(r"\$\{context\.([^}.]+)")


def _inject_input_arg(fsm, raw: str) -> None:
    """Merge the positional ``input`` argument into ``fsm.context``.

    If *raw* parses as a JSON object and at least one of its keys already
    exists in ``fsm.context``, only those matching keys are updated. In every
    other case (invalid JSON, a non-object JSON value, or an object with no
    matching keys) *raw* is stored verbatim under ``fsm.input_key``.
    """
    try:
        parsed = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        parsed = None
    if isinstance(parsed, dict):
        matched = {k: v for k, v in parsed.items() if k in fsm.context}
        if matched:
            fsm.context.update(matched)
            return
    fsm.context[fsm.input_key] = raw


def _apply_cli_overrides(fsm, args: argparse.Namespace) -> None:
    """Apply CLI flag overrides and context injections to *fsm* in place.

    Context precedence, lowest to highest: positional ``input``, then the
    program.md file (``args.program_md`` or ``./.ll/program.md``), then each
    ``--context KEY=VALUE`` pair.

    Raises:
        SystemExit: If a ``--context`` entry contains no ``=``.
    """
    if args.max_iterations:
        fsm.max_iterations = args.max_iterations
    if args.delay is not None:
        fsm.backoff = args.delay
    if args.no_llm:
        fsm.llm.enabled = False
    if args.llm_model:
        fsm.llm.model = args.llm_model

    raw_input = getattr(args, "input", None)
    if raw_input is not None:
        _inject_input_arg(fsm, raw_input)

    program_md_arg = getattr(args, "program_md", None)
    program_md_path = (
        Path(program_md_arg)  # accept str as well as Path
        if program_md_arg is not None
        else Path.cwd() / ".ll" / "program.md"
    )
    fsm.context.update(_parse_program_md(program_md_path))

    for kv in getattr(args, "context", None) or []:
        if "=" not in kv:
            raise SystemExit(f"Invalid --context format: {kv!r} (expected KEY=VALUE)")
        key, _, value = kv.partition("=")
        fsm.context[key.strip()] = value.strip()


def _apply_env_overrides(fsm, args: argparse.Namespace) -> None:
    """Export handoff / context-limit settings as environment variables.

    ``LL_HANDOFF_THRESHOLD`` is taken from the loop's YAML config when it is an
    ``int``; ``args.handoff_threshold`` (validated to 1-100) overrides it.
    ``LL_CONTEXT_LIMIT`` is set from ``args.context_limit`` when given.

    Raises:
        SystemExit: If ``args.handoff_threshold`` is outside 1-100.
    """
    if fsm.config is not None and isinstance(fsm.config.handoff_threshold, int):
        os.environ["LL_HANDOFF_THRESHOLD"] = str(fsm.config.handoff_threshold)

    threshold = getattr(args, "handoff_threshold", None)
    if threshold is not None:
        if not (1 <= threshold <= 100):
            raise SystemExit("--handoff-threshold must be between 1 and 100")
        os.environ["LL_HANDOFF_THRESHOLD"] = str(threshold)

    context_limit = getattr(args, "context_limit", None)
    if context_limit is not None:
        os.environ["LL_CONTEXT_LIMIT"] = str(context_limit)


def _find_missing_context_keys(fsm) -> list[str]:
    """Return, sorted, the ``${context.KEY}`` keys referenced but absent from context.

    Scans each state's ``action`` and, when present, its ``evaluate.prompt``.
    """
    missing: set[str] = set()
    for state in fsm.states.values():
        templates = [state.action] if state.action else []
        if state.evaluate and state.evaluate.prompt:
            templates.append(state.evaluate.prompt)
        for template in templates:
            missing.update(
                m.group(1)
                for m in _CONTEXT_VAR_RE.finditer(template)
                if m.group(1) not in fsm.context
            )
    return sorted(missing)


def _enter_worktree(loop_name: str, config, logger: Logger) -> None:
    """Create a fresh git worktree and branch for this run, then chdir into it.

    The branch is ``<YYYYmmdd-HHMMSS>-<loop_name>`` (characters outside
    ``[A-Za-z0-9-]`` replaced by ``-``) under ``config.get_worktree_base()``.
    An atexit hook removes the worktree and deletes the branch.

    Args:
        loop_name: Loop name used to derive the branch name.
        config: The project ``BRConfig`` loaded from the current directory.
        logger: Logger passed through to the worktree helpers.
    """
    from little_loops.parallel.git_lock import GitLock
    from little_loops.worktree_utils import cleanup_worktree, setup_worktree

    worktree_base = config.get_worktree_base()
    repo_path = Path.cwd()
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    safe_name = re.sub(r"[^a-zA-Z0-9-]", "-", loop_name)
    branch_name = f"{timestamp}-{safe_name}"
    worktree_path = worktree_base / branch_name

    worktree_base.mkdir(parents=True, exist_ok=True)
    git_lock = GitLock(logger)
    setup_worktree(
        repo_path=repo_path,
        worktree_path=worktree_path,
        branch_name=branch_name,
        copy_files=config.parallel.worktree_copy_files,
        logger=logger,
        git_lock=git_lock,
    )

    logger.info(f"Worktree: {worktree_path}")
    logger.info(f"Branch: {branch_name}")

    def _cleanup_worktree_on_exit() -> None:
        cleanup_worktree(
            worktree_path=worktree_path,
            repo_path=repo_path,
            logger=logger,
            git_lock=git_lock,
            delete_branch=True,
        )

    atexit.register(_cleanup_worktree_on_exit)

    os.environ["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1"
    os.chdir(worktree_path)


def cmd_run(
    loop_name: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Run a loop.

    Loads and validates the loop (a built-in when ``args.builtin`` is set,
    otherwise resolved from *loops_dir*) and applies CLI/context/env overrides.
    It then does one of three things:

    * prints the execution plan (``args.dry_run``);
    * spawns a background process;
    * runs in the foreground while holding the loop's scope lock. In this mode
      it can queue behind a conflicting loop and/or run inside a fresh git
      worktree.

    Args:
        loop_name: Name of the loop to run.
        args: Parsed ``ll-loop run`` arguments.
        loops_dir: Directory holding loop definitions and runtime state
            (``.running`` PID files, ``.queue`` entries).
        logger: Logger for user-facing messages.

    Returns:
        0 after a dry run; 1 on load, validation, missing-context, or lock
        failures; otherwise the exit code from ``run_background`` or
        ``run_foreground``.

    Raises:
        SystemExit: On a malformed ``--context`` entry, an out-of-range
            ``--handoff-threshold``, or ``--worktree`` combined with
            ``--background``.
    """
    from little_loops.fsm.concurrency import LockManager
    from little_loops.fsm.persistence import PersistentExecutor, _reconcile_stale_runs
    from little_loops.fsm.rate_limit_circuit import RateLimitCircuit
    from little_loops.fsm.validation import load_and_validate

    try:
        if getattr(args, "builtin", False):
            path = get_builtin_loops_dir() / f"{loop_name}.yaml"
            if not path.exists():
                logger.error(f"Built-in loop not found: {loop_name!r}")
                return 1
        else:
            path = resolve_loop_path(loop_name, loops_dir)
        fsm, _ = load_and_validate(path)
    except FileNotFoundError as e:
        logger.error(str(e))
        return 1
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return 1

    _apply_cli_overrides(fsm, args)
    # YAML loop config env-var overrides first; CLI flags overwrite them.
    _apply_env_overrides(fsm, args)

    from little_loops.config import BRConfig

    config = BRConfig(Path.cwd())
    edge_label_colors = config.cli.colors.fsm_edge_labels.to_dict()
    highlight_color = config.cli.colors.fsm_active_state
    badges = config.loops.glyphs.to_dict()

    if args.dry_run:
        print_execution_plan(fsm, edge_label_colors=edge_label_colors)
        return 0

    # Pre-run validation: every ${context.KEY} referenced must be provided.
    missing_keys = _find_missing_context_keys(fsm)
    if missing_keys:
        for key in missing_keys:
            logger.error(
                f"Missing required context variable: '{key}'. "
                f"Run with: ll-loop run {loop_name} --context {key}=VALUE"
            )
        return 1

    # Background mode: spawn detached process and return
    if getattr(args, "background", False):
        if getattr(args, "worktree", False):
            raise SystemExit("--worktree and --background cannot be combined")
        return run_background(loop_name, args, loops_dir)

    # Anchor loops_dir to an absolute path. --worktree chdirs into the worktree
    # below. A relative loops_dir would then send three things to the worktree
    # copy instead of where the PID file was written and the lock was taken:
    # the atexit PID cleanup, the lock release in `finally`, and executor state.
    loops_dir = Path(loops_dir).absolute()

    # Register PID file for all foreground runs so cmd_stop can send SIGTERM (BUG-639).
    # Background-spawned processes (foreground_internal=True) have their PID written by the
    # parent in run_background(); plain foreground runs must write their own PID here.
    running_dir = loops_dir / ".running"
    running_dir.mkdir(parents=True, exist_ok=True)
    _reconcile_stale_runs(loops_dir)
    foreground_internal = getattr(args, "foreground_internal", False)
    instance_id: str | None = (
        getattr(args, "instance_id", None)
        if foreground_internal
        else _make_instance_id(loop_name)
    )
    pid_file = running_dir / f"{instance_id or loop_name}.pid"
    if not foreground_internal:
        pid_file.write_text(str(os.getpid()))

    def _cleanup_pid() -> None:
        pid_file.unlink(missing_ok=True)

    atexit.register(_cleanup_pid)

    # Scope-based locking
    lock_manager = LockManager(loops_dir)
    scope = fsm.scope or ["."]
    queue_entry_file: Path | None = None

    def _cleanup_queue_entry() -> None:
        if queue_entry_file is not None:
            queue_entry_file.unlink(missing_ok=True)

    if not lock_manager.acquire(fsm.name, scope, instance_id=instance_id):
        conflict = lock_manager.find_conflict(scope)
        if not conflict:
            # Unexpected: find_conflict returned None but acquire failed
            logger.error("Failed to acquire scope lock (unknown reason)")
            return 1
        if not getattr(args, "queue", False):
            logger.error(f"Scope conflict with running loop: {conflict.loop_name}")
            logger.info(f" Conflicting scope: {conflict.scope}")
            logger.info(" Use --queue to wait for it to finish")
            return 1

        # Write queue entry so dashboard shows the waiting loop
        queue_dir = loops_dir / ".queue"
        queue_dir.mkdir(parents=True, exist_ok=True)
        entry_id = str(uuid.uuid4())
        entry = {
            "id": entry_id,
            "loopName": loop_name,
            "enqueuedAt": datetime.now(UTC).isoformat(),
            "context": {
                "waitingFor": conflict.loop_name,
                "scope": conflict.scope,
                "pid": os.getpid(),
            },
        }
        queue_entry_file = queue_dir / f"{entry_id}.json"
        queue_entry_file.write_text(json.dumps(entry, indent=2))
        atexit.register(_cleanup_queue_entry)

        logger.info(f"Waiting for conflicting loop '{conflict.loop_name}' to finish...")
        # Retry loop: when N waiters are released simultaneously, only one wins
        # acquire(); losers loop back and wait again rather than exiting (BUG-1281).
        acquired = False
        wait_start = time.time()
        budget = config.loops.queue_wait_timeout_seconds
        while time.time() - wait_start < budget:
            remaining = budget - (time.time() - wait_start)
            if not lock_manager.wait_for_scope(scope, timeout=int(remaining)):
                _cleanup_queue_entry()
                logger.error("Timeout waiting for scope to become available")
                return 1
            if not _is_earliest_waiter(entry_id, queue_dir):
                time.sleep(1)
                continue
            if lock_manager.acquire(fsm.name, scope, instance_id=instance_id):
                acquired = True
                break
        # Either way we are no longer queued.
        _cleanup_queue_entry()
        if not acquired:
            logger.error("Failed to acquire lock after waiting")
            return 1

    executor: PersistentExecutor | None = None
    try:
        # Worktree isolation: create branch + directory before anything else in
        # this block resolves paths against the current directory.
        if getattr(args, "worktree", False):
            _enter_worktree(loop_name, config, logger)

        # NOTE(review): after a --worktree chdir, a relative circuit_breaker_path
        # resolves inside the worktree — confirm that is intended.
        circuit = (
            RateLimitCircuit(Path(config.commands.rate_limits.circuit_breaker_path))
            if config.commands.rate_limits.circuit_breaker_enabled
            else None
        )
        executor = PersistentExecutor(
            fsm, loops_dir=loops_dir, circuit=circuit, instance_id=instance_id, pid=os.getpid()
        )

        # Register signal handlers for graceful shutdown
        register_loop_signal_handlers(executor, pid_file=pid_file)

        from little_loops.extension import wire_extensions
        from little_loops.transport import wire_transports

        wire_extensions(executor.event_bus, config.extensions, executor=executor)
        wire_transports(executor.event_bus, config.events)
        return run_foreground(
            executor,
            fsm,
            args,
            highlight_color=highlight_color,
            edge_label_colors=edge_label_colors,
            badges=badges,
        )
    finally:
        if executor is not None:
            executor.close_transports()
        lock_manager.release(fsm.name, instance_id=instance_id)