Coverage for little_loops / cli / loop / lifecycle.py: 91%
298 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-loop lifecycle subcommands: status, stop, resume."""
3from __future__ import annotations
5import argparse
6import atexit
7import json
8import os
9import signal
10import time
11from pathlib import Path
13from little_loops.cli.loop._helpers import (
14 EXIT_CODES,
15 load_loop,
16 register_loop_signal_handlers,
17 run_background,
18)
19from little_loops.fsm.concurrency import _process_alive
20from little_loops.fsm.persistence import LoopState, _find_instances
21from little_loops.logger import Logger
def _format_relative_time(seconds: float) -> str:
    """Return *seconds* rendered as a relative-time label such as ``'3m ago'``.

    Kept as a module-local alias for the callers in this file; the actual
    formatting is done by ``little_loops.cli.output.format_relative_time``.
    """
    from little_loops.cli.output import format_relative_time as _shared_formatter

    label = _shared_formatter(seconds)
    return label
def _read_pid_file(pid_file: Path) -> int | None:
    """Read and validate a PID file.

    Args:
        pid_file: Path to a file expected to contain a single decimal PID
            (surrounding whitespace is ignored).

    Returns:
        The PID as a positive integer, or None if the file does not exist,
        cannot be read or decoded, does not hold an integer, or holds a
        non-positive value.
    """
    try:
        pid = int(pid_file.read_text().strip())
    except (ValueError, OSError):
        # OSError covers a missing/unreadable file (FileNotFoundError is an
        # OSError); ValueError covers non-numeric text and undecodable bytes
        # (UnicodeDecodeError is a ValueError subclass).
        return None
    # 0 and negative numbers never name a single process: os.kill(0, ...)
    # signals the caller's own process group and os.kill(-1, ...) signals every
    # process the user may signal. Reject them so callers never act on them.
    return pid if pid > 0 else None
def _kill_with_timeout(pid: int, label: str, logger: Logger) -> None:
    """Send SIGTERM to *pid*; escalate to SIGKILL after 10 s if still alive.

    After SIGTERM the process is polled once per second for up to 10 seconds.
    If it is still alive after that, SIGKILL is sent and a warning is logged.

    Args:
        pid: Process to terminate. Non-positive values are refused (with a
            warning) because ``os.kill`` treats 0 and negative numbers as
            process-group / broadcast targets rather than a single process.
        label: Human-readable name used in log messages.
        logger: Logger used for warnings.

    Raises:
        OSError: Errors from the initial SIGTERM other than "no such process"
            (e.g. PermissionError) propagate to the caller.
    """
    if pid <= 0:
        logger.warning(f"Refusing to signal invalid PID {pid} for {label}")
        return
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the caller's liveness check and now;
        # the goal (process gone) is already met.
        return
    for _ in range(10):
        time.sleep(1)
        if not _process_alive(pid):
            return
    try:
        os.kill(pid, signal.SIGKILL)
        logger.warning(f"Sent SIGKILL to {label} (PID: {pid})")
    except OSError:
        # Best effort: the process may have exited just before SIGKILL.
        pass
def _read_lock_pid(lock_file: Path) -> int | None:
    """Return the PID recorded in a JSON lock file, or None.

    None is returned when the file is missing or unreadable, does not contain
    a JSON object, or its ``"pid"`` entry is absent or not a positive integer.
    Rejecting strings, bools, 0 and negatives means ``_process_alive`` is only
    ever asked about a real single-process PID.
    """
    try:
        with open(lock_file) as fh:
            lock_data = json.load(fh)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers a missing or unreadable file.
        return None
    if not isinstance(lock_data, dict):
        return None
    pid = lock_data.get("pid")
    if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 0:
        return None
    return pid


def _resolve_pid(running_dir: Path, stem: str) -> tuple[int | None, str | None]:
    """Find the PID for instance *stem* and report where it came from.

    ``<stem>.pid`` is preferred; ``<stem>.lock`` is the fallback. Returns
    ``(pid, source)`` where *source* is ``"pid_file"``, ``"lock_file"``, or
    None when neither file yields a PID.
    """
    pid = _read_pid_file(running_dir / f"{stem}.pid")
    if pid is not None:
        return pid, "pid_file"
    pid = _read_lock_pid(running_dir / f"{stem}.lock")
    if pid is not None:
        return pid, "lock_file"
    return None, None


def _log_updated_ago(log_file: Path) -> str | None:
    """Return how long ago *log_file* was modified, or None if it is missing.

    Uses a single ``stat()`` call so a log deleted concurrently is reported as
    missing instead of raising.
    """
    try:
        mtime = log_file.stat().st_mtime
    except OSError:
        return None
    return _format_relative_time(time.time() - mtime)


def _last_log_event(log_file: Path) -> str | None:
    """Return the last non-blank line of *log_file*, or None.

    Undecodable bytes are replaced rather than raising, so a log containing
    invalid text cannot crash ``status``.
    """
    try:
        text = log_file.read_text(errors="replace")
    except OSError:
        return None
    for line in reversed(text.splitlines()):
        if line.strip():
            return line
    return None


def _status_single(
    instance_id: str | None,
    state: LoopState,
    loop_name: str,
    running_dir: Path,
    args: argparse.Namespace | None,
) -> int:
    """Render status for one instance (human-readable or JSON).

    Args:
        instance_id: Instance identifier, or None for the un-suffixed loop;
            files under *running_dir* are named after ``instance_id or loop_name``.
        state: Persisted loop state to display.
        loop_name: Name of the loop.
        running_dir: Directory holding the ``.pid``/``.lock``/``.log`` files.
        args: Parsed CLI args; ``args.json`` selects JSON output.

    Returns:
        Always 0.
    """
    from little_loops.cli.output import print_json

    stem = instance_id or loop_name
    pid, pid_source = _resolve_pid(running_dir, stem)

    log_file = running_dir / f"{stem}.log"
    log_updated_ago = _log_updated_ago(log_file)
    log_file_str = str(log_file) if log_updated_ago is not None else None
    last_event = _last_log_event(log_file) if log_file_str is not None else None

    if getattr(args, "json", False):
        d = state.to_dict()
        d["pid"] = pid
        d["pid_source"] = pid_source
        d["log_file"] = log_file_str
        d["log_updated_ago"] = log_updated_ago
        d["last_event"] = last_event
        print_json(d)
        return 0

    print(f"Loop: {state.loop_name}")
    print(f"Status: {state.status}")
    print(f"Current state: {state.current_state}")
    print(f"Iteration: {state.iteration}")
    print(f"Started: {state.started_at}")
    print(f"Updated: {state.updated_at}")

    if pid is not None:
        if _process_alive(pid):
            print(f"PID: {pid} (running)")
        else:
            print(f"PID: {pid} (not running - stale PID file)")

    if log_file_str is not None:
        print(f"Log: {log_file_str}")
        print(f"Log updated: {log_updated_ago}")
        if last_event:
            print(f"Last event: {last_event}")
    else:
        print("Log: (not found)")

    if state.continuation_prompt:
        prompt_preview = state.continuation_prompt[:200]
        if len(state.continuation_prompt) > 200:
            prompt_preview += "..."
        print(f"Continuation context: {prompt_preview}")
    return 0


def cmd_status(
    loop_name: str,
    loops_dir: Path,
    logger: Logger,
    args: argparse.Namespace | None = None,
) -> int:
    """Show loop status.

    A single instance is delegated to ``_status_single``. Several instances
    are listed together, either as a JSON array (``args.json``) or as a
    numbered human-readable summary.

    Returns:
        1 if no state exists for *loop_name*, otherwise 0.
    """
    from little_loops.cli.output import print_json

    running_dir = loops_dir / ".running"
    instances = _find_instances(loop_name, running_dir)

    if not instances:
        logger.error(f"No state found for: {loop_name}")
        return 1

    if len(instances) == 1:
        instance_id, state = instances[0]
        return _status_single(instance_id, state, loop_name, running_dir, args)

    # Multiple instances: aggregate display
    if getattr(args, "json", False):
        result_list = []
        for instance_id, state in instances:
            stem = instance_id or loop_name
            pid, pid_source = _resolve_pid(running_dir, stem)
            log_file = running_dir / f"{stem}.log"
            log_updated_ago = _log_updated_ago(log_file)
            d = state.to_dict()
            d["instance_id"] = instance_id
            d["pid"] = pid
            d["pid_source"] = pid_source
            d["log_file"] = str(log_file) if log_updated_ago is not None else None
            d["log_updated_ago"] = log_updated_ago
            result_list.append(d)
        print_json(result_list)
        return 0

    print(f"{len(instances)} instances of '{loop_name}':")
    for i, (instance_id, state) in enumerate(instances, 1):
        stem = instance_id or loop_name
        pid, _ = _resolve_pid(running_dir, stem)
        log_file = running_dir / f"{stem}.log"
        log_updated_ago = _log_updated_ago(log_file)

        print()
        print(f"[{i}] {stem}")
        print(f" Status: {state.status}")
        print(f" Current state: {state.current_state}")
        print(f" Iteration: {state.iteration}")
        if pid is not None:
            if _process_alive(pid):
                print(f" PID: {pid} (running)")
            else:
                print(f" PID: {pid} (not running - stale PID file)")
        if log_updated_ago is not None:
            print(f" Log: {log_file}")
            print(f" Log updated: {log_updated_ago}")
        else:
            print(" Log: (not found)")
    return 0
def cmd_stop(
    loop_name: str,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Stop a running loop.

    Each instance whose persisted status is ``"running"`` is handled by its
    PID file:

    - Live process: it is terminated (SIGTERM, escalating to SIGKILL), the
      state is saved as ``"interrupted"`` and the PID file is removed.
    - Dead process: only the PID file is removed.
    - No PID file: the state is marked ``"interrupted"``.

    When no instance is ``"running"``, the first instance whose lock file has
    a ``"pid"`` entry is handled instead. A live holder is killed and the lock
    released; otherwise the lock is removed as stale.

    Returns:
        0 when something was stopped or cleaned up, 1 when no state exists or
        nothing needed stopping.
    """
    from little_loops.fsm.persistence import StatePersistence

    running_dir = loops_dir / ".running"
    instances = _find_instances(loop_name, running_dir)

    if not instances:
        logger.error(f"No state found for: {loop_name}")
        return 1

    running_instances = [(iid, s) for iid, s in instances if s.status == "running"]

    if not running_instances:
        # Secondary check: an orphaned lock-file with a live PID blocks scope
        # acquisition even when state is not "running". Kill the holder and release.
        for instance_id, state in instances:
            stem = instance_id or loop_name
            lock_file = running_dir / f"{stem}.lock"
            if not lock_file.exists():
                continue
            raw_pid: object = None
            try:
                with open(lock_file) as _lf:
                    lock_data = json.load(_lf)
                if isinstance(lock_data, dict):
                    raw_pid = lock_data.get("pid")
            except (ValueError, OSError):
                # Unreadable or partially written lock file: leave it alone.
                pass
            # Only a positive int names a single process. os.kill treats 0 and
            # negatives as process groups (-1 = every signalable process), and
            # a bool would be used as PID 0/1, so never signal anything else.
            lock_pid = (
                raw_pid
                if isinstance(raw_pid, int) and not isinstance(raw_pid, bool) and raw_pid > 0
                else None
            )
            if lock_pid is not None and _process_alive(lock_pid):
                logger.warning(
                    f"Loop state is '{state.status}' but lock file holds live PID {lock_pid}. "
                    "Killing orphaned lock holder..."
                )
                _kill_with_timeout(lock_pid, stem, logger)
                lock_file.unlink(missing_ok=True)
                logger.success(f"Released orphaned scope lock for {stem}")
                return 0
            if raw_pid is not None:
                # Dead or invalid PID: stale lock file, just remove it
                lock_file.unlink(missing_ok=True)
                logger.info(f"Removed stale lock file for {stem}")
                return 0
        _, state = instances[0]
        logger.error(f"Loop not running: {loop_name} (status: {state.status})")
        return 1

    for instance_id, state in running_instances:
        persistence = StatePersistence(loop_name, loops_dir, instance_id=instance_id)
        stem = instance_id or loop_name
        pid_file = running_dir / f"{stem}.pid"
        pid = _read_pid_file(pid_file)

        if pid is None:
            # No PID file: no background process tracked, update state only
            state.status = "interrupted"
            persistence.save_state(state)
            logger.success(f"Marked {stem} as interrupted")
        elif _process_alive(pid):
            _kill_with_timeout(pid, stem, logger)
            state.status = "interrupted"
            persistence.save_state(state)
            pid_file.unlink(missing_ok=True)
            logger.success(f"Stopped {stem} (PID: {pid})")
        else:
            # Process already exited: preserve its final status, only clean up PID file
            logger.info(f"Process {pid} not running, cleaning up PID file")
            pid_file.unlink(missing_ok=True)

    return 0
def _format_duration(duration_ms: float) -> str:
    """Format a millisecond duration as ``"12.3s"`` (< 60 s) or ``"4m 5s"``.

    The minute form rounds to whole seconds *before* splitting into minutes
    and seconds, so 119.6 s renders as ``"2m 0s"`` rather than ``"1m 60s"``.
    """
    duration_sec = duration_ms / 1000
    if duration_sec < 60:
        return f"{duration_sec:.1f}s"
    minutes, seconds = divmod(round(duration_sec), 60)
    return f"{minutes}m {seconds}s"


def cmd_resume(
    loop_name: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Resume an interrupted loop.

    With ``args.background`` the work is delegated to ``run_background`` and
    this returns immediately. Otherwise:

    1. The single resumable instance of *loop_name* is selected;
       ``args.instance_id`` narrows the choice when several exist.
    2. A PID file is written so ``cmd_stop`` can signal this process.
    3. CLI overrides are applied to the loaded FSM.
    4. The loop is resumed in the foreground.

    Returns:
        1 when several instances are resumable, the loop fails to load or
        validate, or there is nothing to resume. Otherwise, the code that
        ``EXIT_CODES`` maps ``result.terminated_by`` to (1 if unmapped).

    Raises:
        SystemExit: On a malformed ``--context`` entry or an out-of-range
            ``--handoff-threshold``.
    """
    from little_loops.fsm.persistence import RESUMABLE_STATUSES, PersistentExecutor

    # Background mode: spawn detached process and return
    if getattr(args, "background", False):
        return run_background(loop_name, args, loops_dir, subcommand="resume")

    running_dir = loops_dir / ".running"
    running_dir.mkdir(parents=True, exist_ok=True)

    # Discover all instances and resolve to a single resumable one.
    instances = _find_instances(loop_name, running_dir)
    resumable = [(iid, s) for iid, s in instances if s.status in RESUMABLE_STATUSES]

    # Honour --instance-id: the ambiguity message below tells the user to pass
    # it, so it must actually narrow the candidates. Match on the same name
    # that message prints (the instance id, or the loop name when there is none).
    requested_id: str | None = getattr(args, "instance_id", None)
    if requested_id is not None:
        resumable = [(iid, s) for iid, s in resumable if (iid or loop_name) == requested_id]

    if len(resumable) > 1:
        print(f"Multiple instances of '{loop_name}' are resumable:")
        for iid, _ in resumable:
            print(f" {iid or loop_name}")
        print("Use --instance-id to select one.")
        return 1

    # Use discovered instance_id (or fall back to args / None for no-state case)
    instance_id: str | None
    state_for_display: LoopState | None
    if resumable:
        instance_id, state_for_display = resumable[0]
    else:
        instance_id, state_for_display = requested_id, None

    # Register PID file for all foreground runs so cmd_stop can send SIGTERM (BUG-639).
    # Background-spawned processes (foreground_internal=True) have their PID written by the
    # parent in run_background(); plain foreground runs must write their own PID here.
    # NOTE(review): write_text overwrites any existing PID file for this instance,
    # even one naming another live process; consider refusing in that case.
    pid_file = running_dir / f"{instance_id or loop_name}.pid"
    if not getattr(args, "foreground_internal", False):
        pid_file.write_text(str(os.getpid()))

    def _cleanup_pid() -> None:
        pid_file.unlink(missing_ok=True)

    atexit.register(_cleanup_pid)

    try:
        fsm = load_loop(loop_name, loops_dir, logger)
    except FileNotFoundError as e:
        logger.error(str(e))
        return 1
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return 1

    for kv in getattr(args, "context", None) or []:
        if "=" not in kv:
            raise SystemExit(f"Invalid --context format: {kv!r} (expected KEY=VALUE)")
        key, _, value = kv.partition("=")
        fsm.context[key.strip()] = value.strip()

    if getattr(args, "delay", None) is not None:
        fsm.backoff = args.delay

    # Apply YAML loop config env-var overrides (CLI flags below overwrite these)
    if fsm.config is not None and isinstance(fsm.config.handoff_threshold, int):
        os.environ["LL_HANDOFF_THRESHOLD"] = str(fsm.config.handoff_threshold)

    if getattr(args, "handoff_threshold", None) is not None:
        if not (1 <= args.handoff_threshold <= 100):
            raise SystemExit("--handoff-threshold must be between 1 and 100")
        os.environ["LL_HANDOFF_THRESHOLD"] = str(args.handoff_threshold)

    # Show context if resuming from a handoff
    if state_for_display and state_for_display.status == "awaiting_continuation":
        print(f"Resuming from context handoff (iteration {state_for_display.iteration})...")
        if state_for_display.continuation_prompt:
            prompt_preview = state_for_display.continuation_prompt[:500]
            if len(state_for_display.continuation_prompt) > 500:
                prompt_preview += "..."
            print(f"Context: {prompt_preview}")
        print()

    from little_loops.config import BRConfig
    from little_loops.extension import wire_extensions
    from little_loops.fsm.rate_limit_circuit import RateLimitCircuit
    from little_loops.transport import wire_transports

    config = BRConfig(Path.cwd())
    circuit = (
        RateLimitCircuit(Path(config.commands.rate_limits.circuit_breaker_path))
        if config.commands.rate_limits.circuit_breaker_enabled
        else None
    )
    executor = PersistentExecutor(
        fsm, loops_dir=loops_dir, circuit=circuit, instance_id=instance_id
    )

    # Register signal handlers for graceful shutdown (same as cmd_run)
    register_loop_signal_handlers(executor, pid_file=pid_file)

    wire_extensions(executor.event_bus, config.extensions, executor=executor)
    wire_transports(executor.event_bus, config.events)

    try:
        result = executor.resume()

        if result is None:
            logger.warning(f"Nothing to resume for: {loop_name}")
            return 1

        logger.success(
            f"Resumed and completed: {result.final_state} "
            f"({result.iterations} iterations, {_format_duration(result.duration_ms)})"
        )
        return EXIT_CODES.get(result.terminated_by, 1)
    finally:
        executor.close_transports()