Coverage for little_loops / fsm / persistence.py: 90%

363 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

"""State persistence and event streaming for FSM loops.

This module provides persistence capabilities for FSM loop execution:
- LoopState: Dataclass representing loop execution state
- StatePersistence: File I/O for state and events
- PersistentExecutor: Wrapper that persists state during execution
- Utility functions for listing running loops and reading history

File structure:
    .loops/
    ├── fix-types.yaml                              # Loop definition
    ├── .running/                                   # Runtime state (auto-managed)
    │   ├── fix-types-20260503T122306.state.json
    │   ├── fix-types-20260503T122306.events.jsonl
    │   └── fix-types-20260503T122306.pid          # PID checked by the stale-run sweep (if present)
    └── .history/                                   # Archived run logs (auto-populated)
        └── 2024-01-15T103000-fix-types/
            ├── state.json
            └── events.jsonl

Files in ``.running/`` are named after an instance ID of the form
``<loop_name>-YYYYMMDDTHHMMSS``; legacy runs use the bare loop name
(``<loop_name>.state.json``). History folders are named
``<run_id>-<loop_name>``, where run_id is the compact form of the run's
``started_at`` timestamp (``YYYY-MM-DDTHHMMSS``).
"""

20 

21from __future__ import annotations 

22 

23import json 

24import logging 

25import os 

26import re 

27import shutil 

28import tempfile 

29import time 

30from dataclasses import dataclass, field 

31from datetime import UTC, datetime 

32from pathlib import Path 

33from typing import Any 

34 

35from little_loops.events import EventBus 

36from little_loops.fsm.concurrency import _process_alive 

37from little_loops.fsm.executor import EventCallback, ExecutionResult, FSMExecutor 

38from little_loops.fsm.schema import FSMLoop 

39 

logger = logging.getLogger(__name__)

# Sub-directories of the loops directory holding live and archived run files.
RUNNING_DIR = ".running"
HISTORY_DIR = ".history"

# Saved statuses from which PersistentExecutor.resume() will continue a run.
RESUMABLE_STATUSES: frozenset[str] = frozenset(("running", "awaiting_continuation", "interrupted"))

# Flat history folder name: "<YYYY-MM-DDTHHMMSS>-<loop_name>" -> (run_id, loop_name).
_RUN_FOLDER = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{6})-(.+)$")

# Trailing "-YYYYMMDDTHHMMSS" that marks an instance-scoped file stem.
_INSTANCE_SUFFIX = re.compile(r"-\d{8}T\d{6}$")

49 

50 

def _parse_run_folder(name: str) -> tuple[str, str] | None:
    """Split a flat history folder name into its run ID and loop name.

    Args:
        name: Folder name such as ``"2024-01-15T103000-fix-types"``.

    Returns:
        ``(run_id, loop_name)`` when *name* follows the ``<run_id>-<loop_name>``
        layout, otherwise ``None``.
    """
    found = _RUN_FOLDER.match(name)
    if found is None:
        return None
    run_id, loop_name = found.groups()
    return run_id, loop_name

55 

56 

57def _iso_now() -> str: 

58 """Return current time as ISO 8601 string.""" 

59 return datetime.now(UTC).isoformat() 

60 

61 

def _now_ms() -> int:
    """Return the current Unix time in whole milliseconds (truncated toward zero)."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)

65 

66 

@dataclass
class LoopState:
    """Persistent state for an FSM loop execution.

    Holds every piece of runtime state needed to resume a loop: the current
    FSM state and iteration, captured variables and the previous result, the
    last evaluation result, timestamps and status, plus the retry and
    cycle-detection bookkeeping carried between segments.

    Attributes:
        loop_name: Name of the loop.
        current_state: Current FSM state name.
        iteration: Current iteration count (1-based).
        captured: Captured action outputs keyed by variable name.
        prev_result: Previous state's result (output, exit_code, state).
        last_result: Last evaluation result (verdict, details).
        started_at: ISO timestamp when the loop started.
        updated_at: ISO timestamp when the state was last saved.
        status: Execution status ("running", "completed", "failed",
            "interrupted", "awaiting_continuation", "timed_out").
        continuation_prompt: Continuation context from a handoff signal
            (set when status is "awaiting_continuation").
        accumulated_ms: Total milliseconds elapsed across all segments up to
            this save; used after a resume so duration_ms and
            ${loop.elapsed_ms} reflect the full loop lifetime rather than
            only the most recent segment.
        retry_counts: Per-state retry counters.
        rate_limit_retries: Per-state rate-limit retry records (ENH-1133),
            each ``{"short_retries": int, "long_retries": int,
            "total_wait_seconds": float, "first_seen_at": float | None}``.
            Legacy ``int`` values are coerced by :meth:`from_dict`.
        consecutive_rate_limit_exhaustions: Count of consecutive
            rate_limit_exhausted emissions across states; reset on any
            non-rate-limited state outcome and persisted for resume durability.
        edge_revisit_counts: Per-edge revisit counters used for cycle detection.
        active_sub_loop: Name of the currently executing sub-loop (observability).
        pid: OS PID of the process that started this run (for the
            reconciliation sweep).
    """

    loop_name: str
    current_state: str
    iteration: int
    captured: dict[str, dict[str, Any]]
    prev_result: dict[str, Any] | None
    last_result: dict[str, Any] | None
    started_at: str
    updated_at: str
    # One of "running", "completed", "failed", "interrupted",
    # "awaiting_continuation", "timed_out".
    status: str
    continuation_prompt: str | None = None
    accumulated_ms: int = 0
    retry_counts: dict[str, int] = field(default_factory=dict)
    rate_limit_retries: dict[str, dict[str, Any]] = field(default_factory=dict)
    consecutive_rate_limit_exhaustions: int = 0
    edge_revisit_counts: dict[str, int] = field(default_factory=dict)
    active_sub_loop: str | None = None
    pid: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-ready dictionary.

        Core fields are always present. Optional fields are emitted only when
        meaningful: ``None``-able fields when not ``None``, counters and maps
        when non-zero / non-empty. Key order is stable.
        """
        serialized: dict[str, Any] = {
            "loop_name": self.loop_name,
            "current_state": self.current_state,
            "iteration": self.iteration,
            "captured": self.captured,
            "prev_result": self.prev_result,
            "last_result": self.last_result,
            "started_at": self.started_at,
            "updated_at": self.updated_at,
            "status": self.status,
            "accumulated_ms": self.accumulated_ms,
        }
        # (key, value, include?) in the order the keys should appear.
        optional_entries = (
            ("continuation_prompt", self.continuation_prompt, self.continuation_prompt is not None),
            ("retry_counts", self.retry_counts, bool(self.retry_counts)),
            ("rate_limit_retries", self.rate_limit_retries, bool(self.rate_limit_retries)),
            (
                "consecutive_rate_limit_exhaustions",
                self.consecutive_rate_limit_exhaustions,
                bool(self.consecutive_rate_limit_exhaustions),
            ),
            ("edge_revisit_counts", self.edge_revisit_counts, bool(self.edge_revisit_counts)),
            ("active_sub_loop", self.active_sub_loop, self.active_sub_loop is not None),
            ("pid", self.pid, self.pid is not None),
        )
        for key, value, include in optional_entries:
            if include:
                serialized[key] = value
        return serialized

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LoopState:
        """Build a LoopState from a dictionary produced by :meth:`to_dict`.

        Legacy ``rate_limit_retries`` entries stored as plain ints (BUG-1107,
        pre-ENH-1133 shape) are upgraded to ``{"short_retries": <int>,
        "long_retries": 0, "total_wait_seconds": 0.0, "first_seen_at": None}``.
        Dict entries are kept as-is; entries of any other type are dropped.

        Args:
            data: Dictionary with loop state fields

        Returns:
            LoopState instance

        Raises:
            KeyError: If a required field (loop_name, current_state, iteration,
                started_at, status) is missing.
        """

        def upgrade(raw: Any) -> dict[str, Any] | None:
            # Current shape: already a record.
            if isinstance(raw, dict):
                return raw
            # Legacy shape: a bare short-retry counter.
            if isinstance(raw, int):
                return {
                    "short_retries": raw,
                    "long_retries": 0,
                    "total_wait_seconds": 0.0,
                    "first_seen_at": None,
                }
            return None

        stored_rate_limits = data.get("rate_limit_retries") or {}
        rate_limit_retries = {
            state_name: record
            for state_name, raw in stored_rate_limits.items()
            if (record := upgrade(raw)) is not None
        }

        return cls(
            loop_name=data["loop_name"],
            current_state=data["current_state"],
            iteration=data["iteration"],
            captured=data.get("captured", {}),
            prev_result=data.get("prev_result"),
            last_result=data.get("last_result"),
            started_at=data["started_at"],
            updated_at=data.get("updated_at", ""),
            status=data["status"],
            continuation_prompt=data.get("continuation_prompt"),
            accumulated_ms=data.get("accumulated_ms", 0),
            retry_counts=data.get("retry_counts", {}),
            rate_limit_retries=rate_limit_retries,
            consecutive_rate_limit_exhaustions=data.get("consecutive_rate_limit_exhaustions", 0),
            edge_revisit_counts=data.get("edge_revisit_counts", {}),
            active_sub_loop=data.get("active_sub_loop"),
            pid=data.get("pid"),
        )

196 

197 

198class StatePersistence: 

199 """Manage loop state persistence and event streaming. 

200 

201 Handles file I/O for: 

202 - State file: JSON file with current execution state 

203 - Events file: JSONL file with execution events (append-only) 

204 

205 Files are stored in .loops/.running/<instance_id>.* 

206 """ 

207 

208 def __init__( 

209 self, loop_name: str, loops_dir: Path | None = None, instance_id: str | None = None 

210 ) -> None: 

211 """Initialize persistence for a loop. 

212 

213 Args: 

214 loop_name: Name of the loop 

215 loops_dir: Base directory for loops (default: .loops) 

216 instance_id: Optional unique instance identifier; falls back to loop_name when None 

217 """ 

218 self.loop_name = loop_name 

219 self.loops_dir = loops_dir or Path(".loops") 

220 self.running_dir = self.loops_dir / RUNNING_DIR 

221 stem = instance_id or loop_name 

222 self.state_file = self.running_dir / f"{stem}.state.json" 

223 self.events_file = self.running_dir / f"{stem}.events.jsonl" 

224 

225 def initialize(self) -> None: 

226 """Create running directory if needed.""" 

227 self.running_dir.mkdir(parents=True, exist_ok=True) 

228 

229 def save_state(self, state: LoopState) -> None: 

230 """Save current state to file using an atomic write. 

231 

232 Updates the updated_at timestamp before saving. Writes to a temporary 

233 file first, then renames it over the target to avoid leaving a corrupt 

234 or empty state file if the process is killed mid-write. 

235 

236 Args: 

237 state: LoopState to save 

238 """ 

239 state.updated_at = _iso_now() 

240 data = json.dumps(state.to_dict(), indent=2) 

241 tmp_fd, tmp_path = tempfile.mkstemp(dir=self.state_file.parent, suffix=".tmp") 

242 try: 

243 with os.fdopen(tmp_fd, "w") as f: 

244 f.write(data) 

245 os.replace(tmp_path, self.state_file) 

246 except Exception: 

247 os.unlink(tmp_path) 

248 raise 

249 

250 def load_state(self) -> LoopState | None: 

251 """Load state from file, or None if not exists. 

252 

253 Returns: 

254 LoopState if file exists and is valid, None otherwise 

255 """ 

256 if not self.state_file.exists(): 

257 return None 

258 try: 

259 data = json.loads(self.state_file.read_text()) 

260 except json.JSONDecodeError: 

261 return None 

262 try: 

263 return LoopState.from_dict(data) 

264 except KeyError as e: 

265 logger.warning("Corrupted state file %s: missing key %s", self.state_file, e) 

266 return None 

267 

268 def clear_state(self) -> None: 

269 """Remove state file.""" 

270 if self.state_file.exists(): 

271 self.state_file.unlink() 

272 

273 def append_event(self, event: dict[str, Any]) -> None: 

274 """Append event to JSONL file. 

275 

276 Args: 

277 event: Event dictionary to append 

278 """ 

279 with open(self.events_file, "a", encoding="utf-8") as f: 

280 f.write(json.dumps(event) + "\n") 

281 

282 def read_events(self) -> list[dict[str, Any]]: 

283 """Read all events from file. 

284 

285 Returns: 

286 List of event dictionaries, empty if file doesn't exist 

287 """ 

288 if not self.events_file.exists(): 

289 return [] 

290 events: list[dict[str, Any]] = [] 

291 with open(self.events_file, encoding="utf-8") as f: 

292 for line in f: 

293 line = line.strip() 

294 if line: 

295 try: 

296 events.append(json.loads(line)) 

297 except json.JSONDecodeError: 

298 continue # Skip malformed lines 

299 return events 

300 

301 def clear_events(self) -> None: 

302 """Remove events file.""" 

303 if self.events_file.exists(): 

304 self.events_file.unlink() 

305 

306 def archive_run(self) -> Path | None: 

307 """Archive current run files to .history/ before clearing. 

308 

309 Reads the current state to derive the run timestamp, then copies 

310 both state.json and events.jsonl into: 

311 <loops_dir>/.history/<run_id>-<loop_name>/ 

312 

313 where run_id is a compact ISO timestamp derived from started_at 

314 (e.g. "2024-01-15T103000" from "2024-01-15T10:30:00.123456+00:00"). 

315 

316 Returns: 

317 Path to the archive directory if files were archived, None if 

318 there were no files to archive (fresh run). 

319 """ 

320 has_state = self.state_file.exists() 

321 has_events = self.events_file.exists() 

322 if not has_state and not has_events: 

323 return None 

324 

325 # Derive run ID from started_at in state file, or fall back to now 

326 state = self.load_state() 

327 if state is not None and state.started_at: 

328 # Compact ISO: strip colons, dots, plus signs; take first 19 chars 

329 # e.g. "2024-01-15T10:30:00.123+00:00" → "2024-01-15T103000" 

330 run_id = state.started_at.replace(":", "").replace(".", "").replace("+", "")[:17] 

331 else: 

332 run_id = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S") 

333 

334 run_folder = f"{run_id}-{self.loop_name}" 

335 archive_dir = self.loops_dir / HISTORY_DIR / run_folder 

336 archive_dir.mkdir(parents=True, exist_ok=True) 

337 

338 if has_state: 

339 shutil.copy2(self.state_file, archive_dir / "state.json") 

340 if has_events: 

341 shutil.copy2(self.events_file, archive_dir / "events.jsonl") 

342 

343 return archive_dir 

344 

345 def clear_all(self) -> None: 

346 """Archive current run files then clear state and events (for new run).""" 

347 self.archive_run() 

348 self.clear_state() 

349 self.clear_events() 

350 

351 

def _reconcile_stale_runs(loops_dir: Path) -> int:
    """Archive state files in .running/ that belong to dead or terminal processes.

    Called at loop startup to clean up files left by crashed or interrupted runs.

    Strategy (mirrors LockManager.find_conflict() stale-lock cleanup):
    - Terminal-status files (completed/failed/timed_out) are archived
      unconditionally — they are definitionally stale by invariant.
    - status="interrupted" (and any other non-terminal, non-"running" status)
      files are left alone so the user can resume them.
    - status="running" files are checked via their sibling .pid file; archived
      only if the PID is confirmed dead. No readable .pid file → leave alone
      (can't confirm).
    - State files that cannot be read or parsed are skipped (logged at debug
      level) rather than aborting the sweep.

    Args:
        loops_dir: Base loops directory containing ``.running/``.

    Returns:
        Number of runs archived.
    """
    running_dir = loops_dir / RUNNING_DIR
    if not running_dir.exists():
        return 0

    terminal_statuses = {"completed", "failed", "timed_out"}
    archived = 0

    for state_file in running_dir.glob("*.state.json"):
        stem = state_file.name.removesuffix(".state.json")
        try:
            data = json.loads(state_file.read_text())
            state = LoopState.from_dict(data)
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # Best-effort startup sweep: one bad file must not abort it.
            # ValueError covers JSONDecodeError and UnicodeDecodeError; TypeError and
            # AttributeError cover JSON that parses but is not a state-shaped object.
            logger.debug("Skipping unreadable state file %s: %s", state_file, e)
            continue

        is_stale = state.status in terminal_statuses

        if not is_stale and state.status == "running":
            pid_file = running_dir / f"{stem}.pid"
            if pid_file.exists():
                try:
                    pid = int(pid_file.read_text().strip())
                    is_stale = not _process_alive(pid)
                except (OSError, ValueError):
                    pass  # Unreadable/garbled PID: cannot confirm death, leave alone.

        if not is_stale:
            continue

        # A stem equal to the loop name is the legacy bare-name layout.
        instance_id = stem if stem != state.loop_name else None
        persistence = StatePersistence(
            loop_name=state.loop_name,
            loops_dir=loops_dir,
            instance_id=instance_id,
        )
        try:
            persistence.clear_all()
            (running_dir / f"{stem}.pid").unlink(missing_ok=True)
            archived += 1
            logger.debug("Archived stale run: %s (status=%s)", stem, state.status)
        except OSError as e:
            logger.warning("Failed to archive stale run %s: %s", stem, e)

    if archived:
        logger.info("Reconciliation sweep archived %d stale run(s) from .running/", archived)

    return archived

413 

414 

class PersistentExecutor:
    """FSM Executor with state persistence and event streaming.

    Wraps FSMExecutor to:
    - Save state after each state transition
    - Append events to JSONL file as they occur
    - Support resuming from saved state
    - Support graceful shutdown via signal handling
    """

    # Maps ExecutionResult.terminated_by to the status persisted after run().
    # Any value not listed here is recorded as "failed".
    _FINAL_STATUS_BY_TERMINATION: dict[str, str] = {
        "terminal": "completed",
        "max_iterations": "interrupted",
        "signal": "interrupted",
        "handoff": "awaiting_continuation",
        "timeout": "timed_out",
        "cycle_detected": "failed",
    }

    def __init__(
        self,
        fsm: FSMLoop,
        persistence: StatePersistence | None = None,
        loops_dir: Path | None = None,
        instance_id: str | None = None,
        pid: int | None = None,
        **executor_kwargs: Any,
    ) -> None:
        """Initialize persistent executor.

        Args:
            fsm: FSM loop definition
            persistence: Optional pre-configured persistence (for testing)
            loops_dir: Base directory for loops (default: .loops)
            instance_id: Optional unique instance identifier for file path scoping
            pid: OS PID of the running process; stored in saved state for reconciliation
            **executor_kwargs: Additional kwargs for FSMExecutor
        """
        from little_loops.fsm.handoff_handler import HandoffBehavior, HandoffHandler
        from little_loops.fsm.signal_detector import SignalDetector

        self.fsm = fsm
        self.loops_dir = loops_dir
        self._run_pid = pid
        self.persistence = persistence or StatePersistence(
            fsm.name, loops_dir or Path(".loops"), instance_id=instance_id
        )
        self.persistence.initialize()

        # Create signal detector and handler based on FSM config
        signal_detector = SignalDetector()
        handoff_handler = HandoffHandler(HandoffBehavior(fsm.on_handoff))

        # Create base executor with event callback that persists
        self._executor = FSMExecutor(
            fsm,
            event_callback=self._handle_event,
            signal_detector=signal_detector,
            handoff_handler=handoff_handler,
            loops_dir=self.loops_dir,
            **executor_kwargs,
        )
        self._last_result: dict[str, Any] | None = None
        self._continuation_prompt: str | None = None
        self.event_bus = EventBus()

    @property
    def _on_event(self) -> EventCallback | None:
        """Backward-compatible access to the first observer on the event bus."""
        return self.event_bus._observers[0][0] if self.event_bus._observers else None

    @_on_event.setter
    def _on_event(self, callback: EventCallback | None) -> None:
        """Backward-compatible setter: replaces all observers with this one."""
        self.event_bus._observers.clear()
        if callback is not None:
            self.event_bus.register(callback)

    def close_transports(self) -> None:
        """Close all transports registered on the underlying EventBus."""
        self.event_bus.close_transports()

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Delegates to the underlying FSMExecutor's request_shutdown method.
        The loop will exit cleanly after the current state completes,
        saving state as "interrupted" so it can be resumed later.
        """
        self._executor.request_shutdown()

    def _handle_event(self, event: dict[str, Any]) -> None:
        """Handle event: persist to file and save state.

        Args:
            event: Event dictionary from executor
        """
        self.persistence.append_event(event)

        # Save state after state transitions
        event_type = event.get("event")
        if event_type in ("state_enter", "loop_complete"):
            self._save_state()

        # Track evaluation results for state persistence
        if event_type == "evaluate":
            self._last_result = {
                "verdict": event.get("verdict"),
                "details": {
                    k: v for k, v in event.items() if k not in ("event", "ts", "type", "verdict")
                },
            }

        # Track handoff events for continuation prompt
        if event_type == "handoff_detected":
            self._continuation_prompt = event.get("continuation")

        # Delegate to registered observers (e.g. progress display, extensions)
        self.event_bus.emit(event)

    def _tracking_snapshot(self) -> dict[str, Any]:
        """Copy the executor's retry / cycle-detection bookkeeping for a LoopState.

        Returns LoopState keyword arguments (retry_counts, rate_limit_retries,
        consecutive_rate_limit_exhaustions, edge_revisit_counts, pid). The dicts
        are copied so the saved state does not alias the executor's live dicts.
        """
        executor = self._executor
        return {
            "retry_counts": dict(executor._retry_counts),
            "rate_limit_retries": {k: dict(v) for k, v in executor._rate_limit_retries.items()},
            "consecutive_rate_limit_exhaustions": executor._consecutive_rate_limit_exhaustions,
            "edge_revisit_counts": dict(executor._edge_revisit_counts),
            "pid": self._run_pid,
        }

    def _save_state(self) -> None:
        """Save the executor's in-progress state to file.

        Status is "completed" when the executor currently sits in a terminal
        state, otherwise "running".
        """
        status = "running"
        if self._executor.current_state:
            state_config = self.fsm.states.get(self._executor.current_state)
            if state_config and state_config.terminal:
                status = "completed"

        state = LoopState(
            loop_name=self.fsm.name,
            current_state=self._executor.current_state,
            iteration=self._executor.iteration,
            captured=self._executor.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",  # Will be set by save_state
            status=status,
            # This segment's elapsed time plus time carried over from earlier
            # segments (elapsed_offset_ms is restored by resume()).
            accumulated_ms=_now_ms()
            - self._executor.start_time_ms
            + self._executor.elapsed_offset_ms,
            **self._tracking_snapshot(),
        )
        self.persistence.save_state(state)

    def run(self, clear_previous: bool = True) -> ExecutionResult:
        """Run the FSM with persistence.

        Args:
            clear_previous: If True, clear previous state/events before running

        Returns:
            ExecutionResult from the execution
        """
        if clear_previous:
            self.persistence.clear_all()

        result = self._executor.run()

        final_status = self._FINAL_STATUS_BY_TERMINATION.get(result.terminated_by, "failed")

        # The tracking fields must be carried into the final save: for
        # "interrupted" and "awaiting_continuation" runs this is the state
        # resume() restores from, and omitting them would silently reset retry
        # and cycle-detection bookkeeping across the resume.
        final_state = LoopState(
            loop_name=self.fsm.name,
            current_state=result.final_state,
            iteration=result.iterations,
            captured=result.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",
            status=final_status,
            continuation_prompt=self._continuation_prompt,
            accumulated_ms=result.duration_ms,
            **self._tracking_snapshot(),
        )
        self.persistence.save_state(final_state)
        self.persistence.archive_run()

        return result

    def resume(self) -> ExecutionResult | None:
        """Resume from saved state, or None if no resumable state.

        Resumable states are: "running", "awaiting_continuation", and "interrupted".

        Returns:
            ExecutionResult if resumed and completed, None if no resumable state
        """
        state = self.persistence.load_state()
        if state is None:
            return None

        if state.status not in RESUMABLE_STATUSES:
            return None  # Already completed/failed

        # Restore executor state
        self._executor.current_state = state.current_state
        self._executor.iteration = state.iteration
        self._executor.captured = state.captured
        self._executor.prev_result = state.prev_result
        self._executor.started_at = state.started_at
        self._last_result = state.last_result
        self._executor._retry_counts = dict(state.retry_counts)
        self._executor._rate_limit_retries = {
            k: dict(v) for k, v in state.rate_limit_retries.items()
        }
        self._executor._consecutive_rate_limit_exhaustions = (
            state.consecutive_rate_limit_exhaustions
        )
        self._executor._edge_revisit_counts = dict(state.edge_revisit_counts)

        # Restore accumulated elapsed time so duration_ms and ${loop.elapsed_ms} reflect
        # the full loop lifetime (all segments), not just the resumed segment.
        # FSMExecutor.run() will reset start_time_ms to _now_ms(), so we use elapsed_offset_ms
        # to carry forward the time already spent before this resume.
        self._executor.elapsed_offset_ms = state.accumulated_ms

        # Clear any pending signals from previous run
        self._executor._pending_handoff = None
        self._executor._pending_error = None

        # Emit resume event with continuation context if available
        resume_event: dict[str, Any] = {
            "event": "loop_resume",
            "ts": _iso_now(),
            "loop": self.fsm.name,
            "from_state": state.current_state,
            "iteration": state.iteration,
        }
        if state.status == "awaiting_continuation" and state.continuation_prompt:
            resume_event["from_handoff"] = True
            resume_event["continuation_prompt"] = state.continuation_prompt
        self.persistence.append_event(resume_event)
        self.event_bus.emit(resume_event)

        # Continue execution (don't clear previous events)
        return self.run(clear_previous=False)

655 

656 

def _find_instances(loop_name: str, running_dir: Path) -> list[tuple[str | None, LoopState]]:
    """Discover all state-file instances for *loop_name* in *running_dir*.

    Instance-scoped files are named ``{loop_name}-YYYYMMDDTHHMMSS.state.json``;
    the legacy bare-name file is ``{loop_name}.state.json``. Instance file stems
    must match that pattern exactly, so instances of a different loop whose
    name merely starts with ``{loop_name}-`` (e.g. ``fix-types-...`` when
    looking up ``fix``) are not returned. Files that vanish mid-scan or cannot
    be parsed are skipped.

    Returns:
        List of ``(instance_id, LoopState)`` tuples, instance-scoped files first
        (sorted by file name), then the legacy file if present.
        *instance_id* is the file stem (e.g. ``"autodev-20260503T122306"``)
        for instance-scoped files, or ``None`` for legacy bare-name files.
    """
    if not running_dir.exists():
        return []

    # Exact "<loop_name>-YYYYMMDDTHHMMSS" stem (same timestamp shape as
    # _INSTANCE_SUFFIX). re.escape also neutralises regex/glob metacharacters
    # in the loop name, which a "{loop_name}-*" glob would have interpreted.
    instance_stem = re.compile(re.escape(loop_name) + r"-\d{8}T\d{6}")

    instances: list[tuple[str | None, LoopState]] = []

    for state_file in sorted(running_dir.glob("*.state.json")):
        base_stem = state_file.name.removesuffix(".state.json")
        if not instance_stem.fullmatch(base_stem):
            continue
        try:
            data = json.loads(state_file.read_text())
            instances.append((base_stem, LoopState.from_dict(data)))
        except (OSError, json.JSONDecodeError, KeyError):
            continue  # archived/cleared mid-scan, or malformed

    # Legacy bare-name file: {loop_name}.state.json
    legacy_file = running_dir / f"{loop_name}.state.json"
    if legacy_file.exists():
        try:
            data = json.loads(legacy_file.read_text())
            instances.append((None, LoopState.from_dict(data)))
        except (OSError, json.JSONDecodeError, KeyError):
            pass

    return instances

695 

696 

def list_running_loops(loops_dir: Path | None = None) -> list[LoopState]:
    """List all loops with saved state, plus loops that are still starting up.

    Args:
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        LoopState objects for every readable state file in ``.running/``, plus
        a placeholder (status "starting", current_state "(initializing)") for
        each live process whose ``.pid`` file has no state file under the same
        logical loop name yet. Unreadable or malformed state files are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    running_dir = base_dir / RUNNING_DIR

    if not running_dir.exists():
        return []

    states: list[LoopState] = []
    for state_file in running_dir.glob("*.state.json"):
        try:
            data = json.loads(state_file.read_text())
            states.append(LoopState.from_dict(data))
        except (OSError, ValueError, KeyError):
            # OSError: archived/cleared by another process between glob and read.
            # ValueError: malformed JSON (JSONDecodeError) or undecodable bytes.
            # KeyError: a required state field is missing.
            continue

    # Include loops that have a PID file but no state file yet (still starting up).
    # Strip instance-ID timestamp suffix (e.g. "autodev-20240115T103000" → "autodev")
    # before the known_names check to avoid spurious "starting" entries for loops
    # that already have a state file under their logical name.
    known_names = {s.loop_name for s in states}
    for pid_file in running_dir.glob("*.pid"):
        logical_name = _INSTANCE_SUFFIX.sub("", pid_file.stem)
        if logical_name in known_names:
            continue  # state file already covers this loop
        try:
            pid = int(pid_file.read_text().strip())
        except (ValueError, OSError):
            continue
        if _process_alive(pid):
            states.append(
                LoopState(
                    loop_name=logical_name,
                    current_state="(initializing)",
                    iteration=0,
                    captured={},
                    prev_result=None,
                    last_result=None,
                    started_at="",
                    updated_at="",
                    status="starting",
                )
            )

    return states

749 

750 

def list_run_history(loop_name: str, loops_dir: Path | None = None) -> list[LoopState]:
    """List archived runs for a loop, newest first.

    Reads state files from .loops/.history/<run_id>-<loop_name>/state.json and
    returns them sorted by started_at descending (most recent run first). The
    loop-name part of each folder is compared exactly, so runs of a loop whose
    name merely ends with ``-<loop_name>`` are not included.

    Also checks the legacy nested layout .loops/.history/<loop_name>/*/state.json
    for backward compatibility with existing history folders.

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all archived runs, newest first.
        Returns an empty list if no history exists. Unreadable or malformed
        state files are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    history_dir = base_dir / HISTORY_DIR

    if not history_dir.exists():
        return []

    states: list[LoopState] = []

    # Flat layout: <run_id>-<loop_name>/state.json. A "*-{loop_name}" glob would also
    # match e.g. "2024-01-15T103000-fix-types" when loop_name is "types", so parse
    # the folder name and compare its loop-name part exactly.
    for state_file in history_dir.glob("*/state.json"):
        parsed = _parse_run_folder(state_file.parent.name)
        if parsed is None or parsed[1] != loop_name:
            continue
        try:
            data = json.loads(state_file.read_text())
            states.append(LoopState.from_dict(data))
        except (OSError, json.JSONDecodeError, KeyError):
            continue

    # Backward compat: legacy nested layout <loop_name>/<run_id>/state.json
    old_loop_dir = history_dir / loop_name
    if old_loop_dir.exists():
        logger.warning(
            "Found legacy nested history at %s; migrate to flat layout by moving "
            "each run to .history/<run_id>-%s/",
            old_loop_dir,
            loop_name,
        )
        for state_file in old_loop_dir.glob("*/state.json"):
            try:
                data = json.loads(state_file.read_text())
                states.append(LoopState.from_dict(data))
            except (OSError, json.JSONDecodeError, KeyError):
                continue

    states.sort(key=lambda s: s.started_at, reverse=True)
    return states

802 

803 

def get_archived_events(
    loop_name: str, run_id: str, loops_dir: Path | None = None
) -> list[dict[str, Any]]:
    """Return the events recorded for one archived run of *loop_name*.

    The run lives in ``<loops_dir>/.history/<run_id>-<loop_name>/events.jsonl``.
    Blank and malformed JSONL lines are ignored.

    Args:
        loop_name: Name of the loop
        run_id: The run directory name (compact timestamp)
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of event dictionaries in file order; empty if the run has no
        events file.
    """
    history_root = (loops_dir or Path(".loops")) / HISTORY_DIR
    events_path = history_root / f"{run_id}-{loop_name}" / "events.jsonl"
    if not events_path.exists():
        return []

    parsed: list[dict[str, Any]] = []
    with events_path.open(encoding="utf-8") as stream:
        for raw in stream:
            text = raw.strip()
            if not text:
                continue
            try:
                parsed.append(json.loads(text))
            except json.JSONDecodeError:
                pass  # tolerate a truncated or garbled line
    return parsed

834 

835 

def get_loop_history(loop_name: str, loops_dir: Path | None = None) -> list[dict[str, Any]]:
    """Return the event history recorded for *loop_name*.

    StatePersistence is built without an instance ID here, so the events are
    read from the bare-name ``.running/<loop_name>.events.jsonl`` file.

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of event dictionaries (empty when no events file exists).
    """
    store = StatePersistence(loop_name, loops_dir)
    history = store.read_events()
    return history