Coverage for little_loops / cli / loop / lifecycle.py: 91%

298 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-loop lifecycle subcommands: status, stop, resume.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import atexit 

7import json 

8import os 

9import signal 

10import time 

11from pathlib import Path 

12 

13from little_loops.cli.loop._helpers import ( 

14 EXIT_CODES, 

15 load_loop, 

16 register_loop_signal_handlers, 

17 run_background, 

18) 

19from little_loops.fsm.concurrency import _process_alive 

20from little_loops.fsm.persistence import LoopState, _find_instances 

21from little_loops.logger import Logger 

22 

23 

def _format_relative_time(seconds: float) -> str:
    """Turn an elapsed duration in seconds into a relative-time phrase (e.g., '3m ago').

    Thin wrapper around ``format_relative_time`` from ``little_loops.cli.output``;
    the import happens at call time rather than at module import.
    """
    from little_loops.cli.output import format_relative_time as _shared_formatter

    formatted = _shared_formatter(seconds)
    return formatted

32 

33 

def _read_pid_file(pid_file: Path) -> int | None:
    """Read and validate a PID file.

    The file is expected to contain a single positive decimal integer,
    optionally surrounded by whitespace.

    Args:
        pid_file: Path of the PID file to read.

    Returns:
        The PID as an integer, or None if the file doesn't exist, can't be
        read, doesn't contain an integer, or contains a non-positive value.
    """
    if not pid_file.exists():
        return None
    try:
        pid = int(pid_file.read_text().strip())
    except (ValueError, OSError):
        # ValueError also covers undecodable bytes (UnicodeDecodeError);
        # OSError covers the file vanishing after the exists() check.
        return None
    # A PID <= 0 never names a single process: kill() interprets such values
    # as process-group / broadcast targets, so they must not reach callers
    # that go on to signal the returned PID.
    return pid if pid > 0 else None

46 

47 

def _kill_with_timeout(pid: int, label: str, logger: Logger) -> None:
    """Send SIGTERM to pid; escalate to SIGKILL after 10 s if still alive.

    Args:
        pid: Process ID to terminate.
        label: Human-readable name used in the SIGKILL warning message.
        logger: Logger that receives a warning when SIGKILL is sent.

    Raises:
        OSError: If SIGTERM cannot be delivered for a reason other than the
            process no longer existing (for example ``PermissionError``).
    """
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the caller's liveness check and this
        # call; there is nothing left to stop.
        return
    # Poll once per second, up to 10 times, for the process to exit.
    for _ in range(10):
        time.sleep(1)
        if not _process_alive(pid):
            return
    try:
        os.kill(pid, signal.SIGKILL)
        logger.warning(f"Sent SIGKILL to {label} (PID: {pid})")
    except OSError:
        # Best effort: the process may have exited just before the SIGKILL.
        pass

60 

61 

def _status_single(
    instance_id: str | None,
    state: LoopState,
    loop_name: str,
    running_dir: Path,
    args: argparse.Namespace | None,
) -> int:
    """Render status for one instance (human-readable or JSON).

    Args:
        instance_id: Instance identifier; when falsy, ``loop_name`` is used as
            the stem of the instance's ``.pid`` / ``.lock`` / ``.log`` files.
        state: Persisted state of the instance being reported.
        loop_name: Name of the loop; fallback file stem.
        running_dir: Directory containing the per-instance runtime files.
        args: Parsed CLI arguments; a truthy ``json`` attribute selects JSON
            output. May be None, which selects human-readable output.

    Returns:
        Always 0.
    """
    from little_loops.cli.output import print_json

    stem = instance_id or loop_name
    pid_file = running_dir / f"{stem}.pid"
    pid = _read_pid_file(pid_file)
    pid_source: str | None = "pid_file" if pid is not None else None

    if pid is None:
        # No usable PID file: fall back to the PID recorded in the lock file.
        lock_file_path = running_dir / f"{stem}.lock"
        if lock_file_path.exists():
            lock_data = None
            try:
                with open(lock_file_path) as _lf:
                    lock_data = json.load(_lf)
            except (ValueError, OSError):
                # ValueError covers json.JSONDecodeError and undecodable bytes.
                pass
            # Only a JSON object carrying an integer "pid" is usable; any other
            # payload (list, string, null, non-integer pid) is ignored instead
            # of raising or being handed to _process_alive below.
            lock_pid = lock_data.get("pid") if isinstance(lock_data, dict) else None
            if isinstance(lock_pid, int) and not isinstance(lock_pid, bool):
                pid = lock_pid
                pid_source = "lock_file"

    log_file = running_dir / f"{stem}.log"
    log_file_str: str | None = None
    log_updated_ago: str | None = None
    last_event: str | None = None
    if log_file.exists():
        log_file_str = str(log_file)
        age_seconds = time.time() - log_file.stat().st_mtime
        log_updated_ago = _format_relative_time(age_seconds)
        try:
            # errors="replace" keeps undecodable bytes in the log from raising
            # UnicodeDecodeError, which the OSError handler would not catch.
            lines = log_file.read_text(errors="replace").splitlines()
            # The last non-blank line of the log is reported as the last event.
            last_event = next((ln for ln in reversed(lines) if ln.strip()), None)
        except OSError:
            last_event = None

    if getattr(args, "json", False):
        d = state.to_dict()
        d["pid"] = pid
        d["pid_source"] = pid_source
        d["log_file"] = log_file_str
        d["log_updated_ago"] = log_updated_ago
        d["last_event"] = last_event
        print_json(d)
        return 0

    print(f"Loop: {state.loop_name}")
    print(f"Status: {state.status}")
    print(f"Current state: {state.current_state}")
    print(f"Iteration: {state.iteration}")
    print(f"Started: {state.started_at}")
    print(f"Updated: {state.updated_at}")

    if pid is not None:
        if _process_alive(pid):
            print(f"PID: {pid} (running)")
        else:
            print(f"PID: {pid} (not running - stale PID file)")

    if log_file_str is not None:
        print(f"Log: {log_file_str}")
        print(f"Log updated: {log_updated_ago}")
        if last_event:
            print(f"Last event: {last_event}")
    else:
        print("Log: (not found)")

    if state.continuation_prompt:
        # Show at most the first 200 characters of the continuation prompt.
        prompt_preview = state.continuation_prompt[:200]
        if len(state.continuation_prompt) > 200:
            prompt_preview += "..."
        print(f"Continuation context: {prompt_preview}")
    return 0

141 

142 

def _resolve_instance_pid(running_dir: Path, stem: str) -> tuple[int | None, str | None]:
    """Look up an instance's PID, preferring its PID file over its lock file.

    Returns:
        ``(pid, source)`` where ``source`` is ``"pid_file"`` or ``"lock_file"``,
        or ``(None, None)`` when neither file yields an integer PID.
    """
    pid = _read_pid_file(running_dir / f"{stem}.pid")
    if pid is not None:
        return pid, "pid_file"

    lock_path = running_dir / f"{stem}.lock"
    if not lock_path.exists():
        return None, None
    try:
        with open(lock_path) as lock_handle:
            lock_data = json.load(lock_handle)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        return None, None
    # Only a JSON object carrying an integer "pid" is usable; any other
    # payload is ignored rather than raising or reaching _process_alive.
    lock_pid = lock_data.get("pid") if isinstance(lock_data, dict) else None
    if isinstance(lock_pid, int) and not isinstance(lock_pid, bool):
        return lock_pid, "lock_file"
    return None, None


def cmd_status(
    loop_name: str,
    loops_dir: Path,
    logger: Logger,
    args: argparse.Namespace | None = None,
) -> int:
    """Show loop status.

    A single instance is rendered by ``_status_single``; several instances are
    rendered as an aggregate (a JSON list, or a numbered text listing).

    Args:
        loop_name: Name of the loop to report on.
        loops_dir: Loops directory; runtime files are read from its
            ``.running`` subdirectory.
        logger: Logger used to report a missing state.
        args: Parsed CLI arguments; a truthy ``json`` attribute selects JSON
            output. May be None.

    Returns:
        1 if no state exists for ``loop_name``, otherwise 0.
    """
    from little_loops.cli.output import print_json

    running_dir = loops_dir / ".running"
    instances = _find_instances(loop_name, running_dir)

    if not instances:
        logger.error(f"No state found for: {loop_name}")
        return 1

    if len(instances) == 1:
        instance_id, state = instances[0]
        return _status_single(instance_id, state, loop_name, running_dir, args)

    # Multiple instances: aggregate display
    if getattr(args, "json", False):
        result_list = []
        for instance_id, state in instances:
            stem = instance_id or loop_name
            pid, pid_source = _resolve_instance_pid(running_dir, stem)
            log_file = running_dir / f"{stem}.log"
            d = state.to_dict()
            d["instance_id"] = instance_id
            d["pid"] = pid
            d["pid_source"] = pid_source
            if log_file.exists():
                d["log_file"] = str(log_file)
                d["log_updated_ago"] = _format_relative_time(time.time() - log_file.stat().st_mtime)
            else:
                d["log_file"] = None
                d["log_updated_ago"] = None
            result_list.append(d)
        print_json(result_list)
        return 0

    print(f"{len(instances)} instances of '{loop_name}':")
    for i, (instance_id, state) in enumerate(instances, 1):
        stem = instance_id or loop_name
        # The text listing does not display where the PID came from.
        pid, _ = _resolve_instance_pid(running_dir, stem)
        log_file = running_dir / f"{stem}.log"

        print()
        print(f"[{i}] {stem}")
        print(f" Status: {state.status}")
        print(f" Current state: {state.current_state}")
        print(f" Iteration: {state.iteration}")
        if pid is not None:
            if _process_alive(pid):
                print(f" PID: {pid} (running)")
            else:
                print(f" PID: {pid} (not running - stale PID file)")
        if log_file.exists():
            age_seconds = time.time() - log_file.stat().st_mtime
            print(f" Log: {log_file}")
            print(f" Log updated: {_format_relative_time(age_seconds)}")
        else:
            print(" Log: (not found)")
    return 0

230 

231 

def cmd_stop(
    loop_name: str,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Stop a running loop.

    Every instance whose persisted status is ``"running"`` is stopped. When no
    instance is running, the first instance lock file that records a PID is
    cleaned up instead (its holder is killed first if it is still alive).

    Args:
        loop_name: Name of the loop to stop.
        loops_dir: Loops directory; runtime files live in its ``.running``
            subdirectory.
        logger: Logger used for progress and error messages.

    Returns:
        0 if at least one instance was handled or a lock file was released;
        1 if no state exists or nothing was running and no lock was released.
    """
    from little_loops.fsm.persistence import StatePersistence

    running_dir = loops_dir / ".running"
    instances = _find_instances(loop_name, running_dir)

    if not instances:
        logger.error(f"No state found for: {loop_name}")
        return 1

    running_instances = [(iid, s) for iid, s in instances if s.status == "running"]

    if not running_instances:
        # Secondary check: an orphaned lock-file with a live PID blocks scope
        # acquisition even when state is not "running". Kill the holder and release.
        for instance_id, state in instances:
            stem = instance_id or loop_name
            lock_file = running_dir / f"{stem}.lock"
            if lock_file.exists():
                lock_data = None
                try:
                    with open(lock_file) as _lf:
                        lock_data = json.load(_lf)
                except (ValueError, OSError):
                    # ValueError covers json.JSONDecodeError and undecodable bytes.
                    pass
                # Only a JSON object carrying an integer "pid" is usable; any
                # other payload is ignored rather than raising or being signalled.
                candidate = lock_data.get("pid") if isinstance(lock_data, dict) else None
                lock_pid: int | None = None
                if isinstance(candidate, int) and not isinstance(candidate, bool):
                    lock_pid = candidate
                # Require a positive PID before signalling: zero and negative
                # values address process groups, not a single lock holder.
                if lock_pid is not None and lock_pid > 0 and _process_alive(lock_pid):
                    logger.warning(
                        f"Loop state is '{state.status}' but lock file holds live PID {lock_pid}. "
                        "Killing orphaned lock holder..."
                    )
                    _kill_with_timeout(lock_pid, stem, logger)
                    lock_file.unlink(missing_ok=True)
                    logger.success(f"Released orphaned scope lock for {stem}")
                    return 0
                elif lock_pid is not None:
                    # Dead (or non-positive) PID: stale lock file, just remove it
                    lock_file.unlink(missing_ok=True)
                    logger.info(f"Removed stale lock file for {stem}")
                    return 0
        _, state = instances[0]
        logger.error(f"Loop not running: {loop_name} (status: {state.status})")
        return 1

    for instance_id, state in running_instances:
        persistence = StatePersistence(loop_name, loops_dir, instance_id=instance_id)
        stem = instance_id or loop_name
        pid_file = running_dir / f"{stem}.pid"
        pid = _read_pid_file(pid_file)

        if pid is not None:
            if _process_alive(pid):
                _kill_with_timeout(pid, stem, logger)
                state.status = "interrupted"
                persistence.save_state(state)
                pid_file.unlink(missing_ok=True)
                logger.success(f"Stopped {stem} (PID: {pid})")
            else:
                # Process already exited: preserve its final status, only clean up PID file
                logger.info(f"Process {pid} not running, cleaning up PID file")
                pid_file.unlink(missing_ok=True)
        else:
            # No PID file: no background process tracked, update state only
            state.status = "interrupted"
            persistence.save_state(state)
            logger.success(f"Marked {stem} as interrupted")

    return 0

305 

306 

def cmd_resume(
    loop_name: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Resume an interrupted loop.

    Args:
        loop_name: Name of the loop to resume.
        args: Parsed CLI arguments. Attributes read (all optional):
            ``background``, ``foreground_internal``, ``instance_id``,
            ``context``, ``delay`` and ``handoff_threshold``.
        loops_dir: Loops directory; runtime files live in its ``.running``
            subdirectory.
        logger: Logger used for progress and error messages.

    Returns:
        The result of ``run_background`` in background mode; 1 when the
        instance choice is ambiguous, the loop cannot be loaded, or there is
        nothing to resume; otherwise the ``EXIT_CODES`` entry for how the run
        terminated (1 if unmapped).

    Raises:
        SystemExit: If a ``--context`` entry is not ``KEY=VALUE`` or
            ``--handoff-threshold`` is outside 1..100.
    """
    from little_loops.fsm.persistence import RESUMABLE_STATUSES, PersistentExecutor

    # Background mode: spawn detached process and return
    if getattr(args, "background", False):
        return run_background(loop_name, args, loops_dir, subcommand="resume")

    running_dir = loops_dir / ".running"
    running_dir.mkdir(parents=True, exist_ok=True)

    # Discover all instances and resolve to a single resumable one.
    instances = _find_instances(loop_name, running_dir)
    resumable = [(iid, s) for iid, s in instances if s.status in RESUMABLE_STATUSES]
    if len(resumable) > 1:
        # Honor an explicit --instance-id so the ambiguity reported below can
        # actually be resolved by following its own advice.
        requested_id = getattr(args, "instance_id", None)
        selected = [
            pair for pair in resumable if requested_id is not None and pair[0] == requested_id
        ]
        if not selected:
            print(f"Multiple instances of '{loop_name}' are resumable:")
            for iid, _ in resumable:
                print(f" {iid or loop_name}")
            print("Use --instance-id to select one.")
            return 1
        resumable = selected

    # Use discovered instance_id (or fall back to args / None for no-state case)
    if resumable:
        instance_id: str | None = resumable[0][0]
        state_for_display = resumable[0][1]
    else:
        instance_id = getattr(args, "instance_id", None)
        state_for_display = None

    pid_file = running_dir / f"{instance_id or loop_name}.pid"
    foreground_pid_file: Path | None = pid_file

    # Register PID file for all foreground runs so cmd_stop can send SIGTERM (BUG-639).
    # Background-spawned processes (foreground_internal=True) have their PID written by the
    # parent in run_background(); plain foreground runs must write their own PID here.
    if not getattr(args, "foreground_internal", False):
        pid_file.write_text(str(os.getpid()))

        def _cleanup_pid() -> None:
            pid_file.unlink(missing_ok=True)

        atexit.register(_cleanup_pid)

    try:
        fsm = load_loop(loop_name, loops_dir, logger)
    except FileNotFoundError as e:
        logger.error(str(e))
        return 1
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return 1

    for kv in getattr(args, "context", None) or []:
        if "=" not in kv:
            raise SystemExit(f"Invalid --context format: {kv!r} (expected KEY=VALUE)")
        key, _, value = kv.partition("=")
        fsm.context[key.strip()] = value.strip()

    if getattr(args, "delay", None) is not None:
        fsm.backoff = args.delay

    # Apply YAML loop config env-var overrides (CLI flags below overwrite these)
    if fsm.config is not None and isinstance(fsm.config.handoff_threshold, int):
        os.environ["LL_HANDOFF_THRESHOLD"] = str(fsm.config.handoff_threshold)

    if getattr(args, "handoff_threshold", None) is not None:
        if not (1 <= args.handoff_threshold <= 100):
            raise SystemExit("--handoff-threshold must be between 1 and 100")
        os.environ["LL_HANDOFF_THRESHOLD"] = str(args.handoff_threshold)

    # Show context if resuming from a handoff
    if state_for_display and state_for_display.status == "awaiting_continuation":
        print(f"Resuming from context handoff (iteration {state_for_display.iteration})...")
        if state_for_display.continuation_prompt:
            # Show at most the first 500 characters of the continuation prompt.
            prompt_preview = state_for_display.continuation_prompt[:500]
            if len(state_for_display.continuation_prompt) > 500:
                prompt_preview += "..."
            print(f"Context: {prompt_preview}")
        print()

    from little_loops.config import BRConfig
    from little_loops.extension import wire_extensions
    from little_loops.fsm.rate_limit_circuit import RateLimitCircuit
    from little_loops.transport import wire_transports

    config = BRConfig(Path.cwd())
    circuit = (
        RateLimitCircuit(Path(config.commands.rate_limits.circuit_breaker_path))
        if config.commands.rate_limits.circuit_breaker_enabled
        else None
    )
    executor = PersistentExecutor(
        fsm, loops_dir=loops_dir, circuit=circuit, instance_id=instance_id
    )

    # Register signal handlers for graceful shutdown (same as cmd_run)
    register_loop_signal_handlers(executor, pid_file=foreground_pid_file)

    wire_extensions(executor.event_bus, config.extensions, executor=executor)
    wire_transports(executor.event_bus, config.events)

    try:
        result = executor.resume()

        if result is None:
            logger.warning(f"Nothing to resume for: {loop_name}")
            return 1

        duration_sec = result.duration_ms / 1000
        if duration_sec < 60:
            duration_str = f"{duration_sec:.1f}s"
        else:
            # Round to whole seconds before splitting into minutes/seconds so
            # that e.g. 119.6 s renders as "2m 0s" rather than "1m 60s".
            minutes, seconds = divmod(round(duration_sec), 60)
            duration_str = f"{minutes}m {seconds}s"

        logger.success(
            f"Resumed and completed: {result.final_state} "
            f"({result.iterations} iterations, {duration_str})"
        )
        return EXIT_CODES.get(result.terminated_by, 1)
    finally:
        # Always release transports, including on the early "nothing to resume" return.
        executor.close_transports()