Coverage for little_loops / fsm / persistence.py: 90%
363 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""State persistence and event streaming for FSM loops.
3This module provides persistence capabilities for FSM loop execution:
4- LoopState: Dataclass representing loop execution state
5- StatePersistence: File I/O for state and events
6- PersistentExecutor: Wrapper that persists state during execution
7- Utility functions for listing running loops and reading history
9File structure:
10 .loops/
11 ├── fix-types.yaml # Loop definition
12 ├── .running/ # Runtime state (auto-managed)
13 │ ├── fix-types-20260503T122306.state.json
14 │ └── fix-types-20260503T122306.events.jsonl
15 └── .history/ # Archived run logs (auto-populated)
16 └── 2024-01-15T103000-fix-types/
17 ├── state.json
18 └── events.jsonl
19"""
21from __future__ import annotations
23import json
24import logging
25import os
26import re
27import shutil
28import tempfile
29import time
30from dataclasses import dataclass, field
31from datetime import UTC, datetime
32from pathlib import Path
33from typing import Any
35from little_loops.events import EventBus
36from little_loops.fsm.concurrency import _process_alive
37from little_loops.fsm.executor import EventCallback, ExecutionResult, FSMExecutor
38from little_loops.fsm.schema import FSMLoop
40RUNNING_DIR = ".running"
41HISTORY_DIR = ".history"
43RESUMABLE_STATUSES: frozenset[str] = frozenset({"running", "awaiting_continuation", "interrupted"})
45logger = logging.getLogger(__name__)
47_RUN_FOLDER = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{6})-(.+)$")
48_INSTANCE_SUFFIX = re.compile(r"-\d{8}T\d{6}$")
def _parse_run_folder(name: str) -> tuple[str, str] | None:
    """Split a flat history folder name into ``(run_id, loop_name)``.

    Returns None when *name* is not of the form
    ``<YYYY-MM-DDTHHMMSS>-<loop_name>``.
    """
    match = _RUN_FOLDER.match(name)
    if match is None:
        return None
    run_id, loop = match.groups()
    return run_id, loop
57def _iso_now() -> str:
58 """Return current time as ISO 8601 string."""
59 return datetime.now(UTC).isoformat()
62def _now_ms() -> int:
63 """Return current time in milliseconds."""
64 return int(time.time() * 1000)
@dataclass
class LoopState:
    """Snapshot of an FSM loop run, holding everything needed to resume it.

    The snapshot covers the current state and iteration, captured variables,
    the previous and last evaluation results, timing, status, and the
    per-state retry / rate-limit / edge-revisit bookkeeping.

    Attributes:
        loop_name: Name of the loop
        current_state: Current FSM state name
        iteration: Current iteration count (1-based)
        captured: Captured action outputs by variable name
        prev_result: Previous state's result (output, exit_code, state)
        last_result: Last evaluation result (verdict, details)
        started_at: ISO timestamp when loop started
        updated_at: ISO timestamp when state was last saved
        status: Execution status (running, completed, failed, interrupted,
            awaiting_continuation, timed_out)
        continuation_prompt: Continuation context from a handoff signal (set
            when status is awaiting_continuation)
        accumulated_ms: Total milliseconds elapsed across all segments up to
            this save, so that after a resume duration_ms and
            ${loop.elapsed_ms} reflect the whole loop lifetime rather than
            only the latest segment
        retry_counts: Per-state retry counters
        rate_limit_retries: Per-state rate-limit retry records (see field comment)
        consecutive_rate_limit_exhaustions: Consecutive rate_limit_exhausted
            emissions across states
        edge_revisit_counts: Per-edge revisit counters used for cycle detection
        active_sub_loop: Name of the currently executing sub-loop, if any
        pid: OS PID of the process that started this run
    """

    loop_name: str
    current_state: str
    iteration: int
    captured: dict[str, dict[str, Any]]
    prev_result: dict[str, Any] | None
    last_result: dict[str, Any] | None
    started_at: str
    updated_at: str
    status: str  # "running", "completed", "failed", "interrupted", "awaiting_continuation", "timed_out"
    continuation_prompt: str | None = None
    accumulated_ms: int = 0  # total elapsed ms across all segments (for resume offset)
    retry_counts: dict[str, int] = field(default_factory=dict)  # per-state retry tracking
    # Per-state rate-limit retry tracking (ENH-1133: dict-of-record).
    # Each record: {"short_retries": int, "long_retries": int,
    #               "total_wait_seconds": float, "first_seen_at": float | None}.
    # Legacy int values (dict[str, int]) are coerced in from_dict.
    rate_limit_retries: dict[str, dict[str, Any]] = field(default_factory=dict)
    # Consecutive rate_limit_exhausted emissions across states. Reset on any
    # non-rate-limited state outcome; persisted so it survives a resume.
    consecutive_rate_limit_exhaustions: int = 0
    # Per-edge revisit tracking for cycle detection.
    edge_revisit_counts: dict[str, int] = field(default_factory=dict)
    active_sub_loop: str | None = None  # currently executing sub-loop (observability)
    pid: int | None = None  # OS PID of the process that started this run

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-ready dict.

        The core fields are always present. Optional fields are emitted only
        when they hold a non-default value (non-None, non-empty, or non-zero).
        """
        out: dict[str, Any] = dict(
            loop_name=self.loop_name,
            current_state=self.current_state,
            iteration=self.iteration,
            captured=self.captured,
            prev_result=self.prev_result,
            last_result=self.last_result,
            started_at=self.started_at,
            updated_at=self.updated_at,
            status=self.status,
            accumulated_ms=self.accumulated_ms,
        )
        # (key, value, include?) in the order they should appear in the output.
        optional_fields: list[tuple[str, Any, bool]] = [
            ("continuation_prompt", self.continuation_prompt, self.continuation_prompt is not None),
            ("retry_counts", self.retry_counts, bool(self.retry_counts)),
            ("rate_limit_retries", self.rate_limit_retries, bool(self.rate_limit_retries)),
            (
                "consecutive_rate_limit_exhaustions",
                self.consecutive_rate_limit_exhaustions,
                bool(self.consecutive_rate_limit_exhaustions),
            ),
            ("edge_revisit_counts", self.edge_revisit_counts, bool(self.edge_revisit_counts)),
            ("active_sub_loop", self.active_sub_loop, self.active_sub_loop is not None),
            ("pid", self.pid, self.pid is not None),
        ]
        out.update((key, value) for key, value, wanted in optional_fields if wanted)
        return out

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LoopState:
        """Build a LoopState from a dict produced by :meth:`to_dict`.

        Legacy ``rate_limit_retries`` entries stored as plain ints (BUG-1107,
        pre-ENH-1133 shape) are upgraded to the record shape
        ``{"short_retries": <int>, "long_retries": 0,
        "total_wait_seconds": 0.0, "first_seen_at": None}``. Entries of any
        other non-dict type are dropped.

        Args:
            data: Dictionary with loop state fields

        Returns:
            LoopState instance

        Raises:
            KeyError: if a required field (loop_name, current_state,
                iteration, started_at, status) is missing.
        """
        stored_rl = data.get("rate_limit_retries", {}) or {}
        rate_limit_retries: dict[str, dict[str, Any]] = {}
        for state_name, entry in stored_rl.items():
            if isinstance(entry, dict):
                rate_limit_retries[state_name] = entry
            elif isinstance(entry, int):
                # Pre-ENH-1133 files stored a bare short-retry count.
                rate_limit_retries[state_name] = {
                    "short_retries": entry,
                    "long_retries": 0,
                    "total_wait_seconds": 0.0,
                    "first_seen_at": None,
                }

        return cls(
            loop_name=data["loop_name"],
            current_state=data["current_state"],
            iteration=data["iteration"],
            captured=data.get("captured", {}),
            prev_result=data.get("prev_result"),
            last_result=data.get("last_result"),
            started_at=data["started_at"],
            updated_at=data.get("updated_at", ""),
            status=data["status"],
            continuation_prompt=data.get("continuation_prompt"),
            accumulated_ms=data.get("accumulated_ms", 0),
            retry_counts=data.get("retry_counts", {}),
            rate_limit_retries=rate_limit_retries,
            consecutive_rate_limit_exhaustions=data.get("consecutive_rate_limit_exhaustions", 0),
            edge_revisit_counts=data.get("edge_revisit_counts", {}),
            active_sub_loop=data.get("active_sub_loop"),
            pid=data.get("pid"),
        )
198class StatePersistence:
199 """Manage loop state persistence and event streaming.
201 Handles file I/O for:
202 - State file: JSON file with current execution state
203 - Events file: JSONL file with execution events (append-only)
205 Files are stored in .loops/.running/<instance_id>.*
206 """
208 def __init__(
209 self, loop_name: str, loops_dir: Path | None = None, instance_id: str | None = None
210 ) -> None:
211 """Initialize persistence for a loop.
213 Args:
214 loop_name: Name of the loop
215 loops_dir: Base directory for loops (default: .loops)
216 instance_id: Optional unique instance identifier; falls back to loop_name when None
217 """
218 self.loop_name = loop_name
219 self.loops_dir = loops_dir or Path(".loops")
220 self.running_dir = self.loops_dir / RUNNING_DIR
221 stem = instance_id or loop_name
222 self.state_file = self.running_dir / f"{stem}.state.json"
223 self.events_file = self.running_dir / f"{stem}.events.jsonl"
225 def initialize(self) -> None:
226 """Create running directory if needed."""
227 self.running_dir.mkdir(parents=True, exist_ok=True)
229 def save_state(self, state: LoopState) -> None:
230 """Save current state to file using an atomic write.
232 Updates the updated_at timestamp before saving. Writes to a temporary
233 file first, then renames it over the target to avoid leaving a corrupt
234 or empty state file if the process is killed mid-write.
236 Args:
237 state: LoopState to save
238 """
239 state.updated_at = _iso_now()
240 data = json.dumps(state.to_dict(), indent=2)
241 tmp_fd, tmp_path = tempfile.mkstemp(dir=self.state_file.parent, suffix=".tmp")
242 try:
243 with os.fdopen(tmp_fd, "w") as f:
244 f.write(data)
245 os.replace(tmp_path, self.state_file)
246 except Exception:
247 os.unlink(tmp_path)
248 raise
250 def load_state(self) -> LoopState | None:
251 """Load state from file, or None if not exists.
253 Returns:
254 LoopState if file exists and is valid, None otherwise
255 """
256 if not self.state_file.exists():
257 return None
258 try:
259 data = json.loads(self.state_file.read_text())
260 except json.JSONDecodeError:
261 return None
262 try:
263 return LoopState.from_dict(data)
264 except KeyError as e:
265 logger.warning("Corrupted state file %s: missing key %s", self.state_file, e)
266 return None
268 def clear_state(self) -> None:
269 """Remove state file."""
270 if self.state_file.exists():
271 self.state_file.unlink()
273 def append_event(self, event: dict[str, Any]) -> None:
274 """Append event to JSONL file.
276 Args:
277 event: Event dictionary to append
278 """
279 with open(self.events_file, "a", encoding="utf-8") as f:
280 f.write(json.dumps(event) + "\n")
282 def read_events(self) -> list[dict[str, Any]]:
283 """Read all events from file.
285 Returns:
286 List of event dictionaries, empty if file doesn't exist
287 """
288 if not self.events_file.exists():
289 return []
290 events: list[dict[str, Any]] = []
291 with open(self.events_file, encoding="utf-8") as f:
292 for line in f:
293 line = line.strip()
294 if line:
295 try:
296 events.append(json.loads(line))
297 except json.JSONDecodeError:
298 continue # Skip malformed lines
299 return events
301 def clear_events(self) -> None:
302 """Remove events file."""
303 if self.events_file.exists():
304 self.events_file.unlink()
306 def archive_run(self) -> Path | None:
307 """Archive current run files to .history/ before clearing.
309 Reads the current state to derive the run timestamp, then copies
310 both state.json and events.jsonl into:
311 <loops_dir>/.history/<run_id>-<loop_name>/
313 where run_id is a compact ISO timestamp derived from started_at
314 (e.g. "2024-01-15T103000" from "2024-01-15T10:30:00.123456+00:00").
316 Returns:
317 Path to the archive directory if files were archived, None if
318 there were no files to archive (fresh run).
319 """
320 has_state = self.state_file.exists()
321 has_events = self.events_file.exists()
322 if not has_state and not has_events:
323 return None
325 # Derive run ID from started_at in state file, or fall back to now
326 state = self.load_state()
327 if state is not None and state.started_at:
328 # Compact ISO: strip colons, dots, plus signs; take first 19 chars
329 # e.g. "2024-01-15T10:30:00.123+00:00" → "2024-01-15T103000"
330 run_id = state.started_at.replace(":", "").replace(".", "").replace("+", "")[:17]
331 else:
332 run_id = datetime.now(UTC).strftime("%Y-%m-%dT%H%M%S")
334 run_folder = f"{run_id}-{self.loop_name}"
335 archive_dir = self.loops_dir / HISTORY_DIR / run_folder
336 archive_dir.mkdir(parents=True, exist_ok=True)
338 if has_state:
339 shutil.copy2(self.state_file, archive_dir / "state.json")
340 if has_events:
341 shutil.copy2(self.events_file, archive_dir / "events.jsonl")
343 return archive_dir
345 def clear_all(self) -> None:
346 """Archive current run files then clear state and events (for new run)."""
347 self.archive_run()
348 self.clear_state()
349 self.clear_events()
def _reconcile_stale_runs(loops_dir: Path) -> int:
    """Archive state files in .running/ that belong to dead or terminal runs.

    Called at loop startup to clean up files left by crashed or interrupted
    runs. Returns the number of runs archived.

    Strategy (mirrors LockManager.find_conflict() stale-lock cleanup):
    - Terminal-status files (completed/failed/timed_out) are archived
      unconditionally. They are stale by definition.
    - Any other status except "running" (e.g. interrupted,
      awaiting_continuation) is left alone so the user can resume it.
    - status="running" files are archived only if the owning PID is confirmed
      dead. The PID is read from the sibling ``<stem>.pid`` file. If that file
      does not exist, the positive ``pid`` recorded in the state file itself
      is used instead. If neither yields a PID, including when the .pid file
      is present but unreadable, the run is left alone because it cannot be
      confirmed dead.
    - Unreadable or structurally malformed state files are skipped.
    """
    running_dir = loops_dir / RUNNING_DIR
    if not running_dir.exists():
        return 0

    terminal_statuses = {"completed", "failed", "timed_out"}
    archived = 0

    for state_file in running_dir.glob("*.state.json"):
        try:
            data = json.loads(state_file.read_text())
            state = LoopState.from_dict(data)
        except (ValueError, KeyError, TypeError, AttributeError, OSError):
            # ValueError covers JSONDecodeError; TypeError/AttributeError cover
            # malformed structure (e.g. a top-level JSON list). OSError covers a
            # file that vanished or cannot be read.
            continue

        stem = state_file.name.removesuffix(".state.json")
        pid_file = running_dir / f"{stem}.pid"

        is_stale = state.status in terminal_statuses
        if not is_stale and state.status == "running":
            pid: int | None = None
            if pid_file.exists():
                try:
                    pid = int(pid_file.read_text().strip())
                except (OSError, ValueError):
                    pid = None  # present but unreadable: can't confirm
            elif isinstance(state.pid, int) and state.pid > 0:
                # No .pid file: fall back to the PID persisted in the state
                # itself (LoopState.pid exists for this sweep).
                pid = state.pid
            if pid is not None:
                is_stale = not _process_alive(pid)

        if not is_stale:
            continue

        instance_id = stem if stem != state.loop_name else None
        persistence = StatePersistence(
            loop_name=state.loop_name,
            loops_dir=loops_dir,
            instance_id=instance_id,
        )
        try:
            persistence.clear_all()
            pid_file.unlink(missing_ok=True)
            archived += 1
            logger.debug("Archived stale run: %s (status=%s)", stem, state.status)
        except OSError as e:
            logger.warning("Failed to archive stale run %s: %s", stem, e)

    if archived:
        logger.info("Reconciliation sweep archived %d stale run(s) from .running/", archived)

    return archived
class PersistentExecutor:
    """FSM Executor with state persistence and event streaming.

    Wraps FSMExecutor to:
    - Save state after each state transition
    - Append events to JSONL file as they occur
    - Support resuming from saved state
    - Support graceful shutdown via signal handling
    """

    def __init__(
        self,
        fsm: FSMLoop,
        persistence: StatePersistence | None = None,
        loops_dir: Path | None = None,
        instance_id: str | None = None,
        pid: int | None = None,
        **executor_kwargs: Any,
    ) -> None:
        """Initialize persistent executor.

        Args:
            fsm: FSM loop definition
            persistence: Optional pre-configured persistence (for testing)
            loops_dir: Base directory for loops (default: .loops)
            instance_id: Optional unique instance identifier for file path scoping
            pid: OS PID of the running process; stored in saved state for reconciliation
            **executor_kwargs: Additional kwargs for FSMExecutor
        """
        from little_loops.fsm.handoff_handler import HandoffBehavior, HandoffHandler
        from little_loops.fsm.signal_detector import SignalDetector

        self.fsm = fsm
        self.loops_dir = loops_dir
        self._run_pid = pid
        self.persistence = persistence or StatePersistence(
            fsm.name, loops_dir or Path(".loops"), instance_id=instance_id
        )
        self.persistence.initialize()

        # Create signal detector and handler based on FSM config
        signal_detector = SignalDetector()
        handoff_handler = HandoffHandler(HandoffBehavior(fsm.on_handoff))

        # Create base executor with event callback that persists
        self._executor = FSMExecutor(
            fsm,
            event_callback=self._handle_event,
            signal_detector=signal_detector,
            handoff_handler=handoff_handler,
            loops_dir=self.loops_dir,
            **executor_kwargs,
        )
        self._last_result: dict[str, Any] | None = None
        self._continuation_prompt: str | None = None
        self.event_bus = EventBus()

    @property
    def _on_event(self) -> EventCallback | None:
        """Backward-compatible access to the first observer on the event bus."""
        return self.event_bus._observers[0][0] if self.event_bus._observers else None

    @_on_event.setter
    def _on_event(self, callback: EventCallback | None) -> None:
        """Backward-compatible setter: replaces all observers with this one."""
        self.event_bus._observers.clear()
        if callback is not None:
            self.event_bus.register(callback)

    def close_transports(self) -> None:
        """Close all transports registered on the underlying EventBus."""
        self.event_bus.close_transports()

    def request_shutdown(self) -> None:
        """Request graceful shutdown of the executor.

        Delegates to the underlying FSMExecutor's request_shutdown method.
        The loop will exit cleanly after the current state completes,
        saving state as "interrupted" so it can be resumed later.
        """
        self._executor.request_shutdown()

    def _handle_event(self, event: dict[str, Any]) -> None:
        """Persist *event*, update tracked state, then forward it to observers.

        Args:
            event: Event dictionary from executor
        """
        self.persistence.append_event(event)

        # Save state after state transitions
        event_type = event.get("event")
        if event_type in ("state_enter", "loop_complete"):
            self._save_state()

        # Track evaluation results for state persistence
        if event_type == "evaluate":
            self._last_result = {
                "verdict": event.get("verdict"),
                "details": {
                    k: v for k, v in event.items() if k not in ("event", "ts", "type", "verdict")
                },
            }

        # Track handoff events for continuation prompt
        if event_type == "handoff_detected":
            self._continuation_prompt = event.get("continuation")

        # Delegate to registered observers (e.g. progress display, extensions)
        self.event_bus.emit(event)

    def _snapshot_counters(self) -> dict[str, Any]:
        """Copy the executor's retry / rate-limit / cycle-detection counters.

        Returned as LoopState keyword arguments. These are exactly the
        counters :meth:`resume` restores, so every saved state includes them.
        The containers are copied so later executor mutations don't leak into
        an already-built snapshot.
        """
        executor = self._executor
        return {
            "retry_counts": dict(executor._retry_counts),
            "rate_limit_retries": {k: dict(v) for k, v in executor._rate_limit_retries.items()},
            "consecutive_rate_limit_exhaustions": executor._consecutive_rate_limit_exhaustions,
            "edge_revisit_counts": dict(executor._edge_revisit_counts),
        }

    def _save_state(self) -> None:
        """Save an in-progress snapshot of the executor to the state file.

        Status is "completed" when the current state is terminal, else "running".
        """
        status = "running"
        if self._executor.current_state:
            state_config = self.fsm.states.get(self._executor.current_state)
            if state_config and state_config.terminal:
                status = "completed"

        state = LoopState(
            loop_name=self.fsm.name,
            current_state=self._executor.current_state,
            iteration=self._executor.iteration,
            captured=self._executor.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",  # Will be set by save_state
            status=status,
            # Elapsed time of this segment plus time carried over from earlier segments.
            accumulated_ms=_now_ms()
            - self._executor.start_time_ms
            + self._executor.elapsed_offset_ms,
            pid=self._run_pid,
            **self._snapshot_counters(),
        )
        self.persistence.save_state(state)

    def run(self, clear_previous: bool = True) -> ExecutionResult:
        """Run the FSM with persistence.

        After the executor returns, the final state is saved. That save
        includes the retry / rate-limit / cycle counters and the PID, so a
        later :meth:`resume` continues with them intact. The run is then
        archived to .history/.

        Args:
            clear_previous: If True, archive and clear previous state/events before running

        Returns:
            ExecutionResult from the execution
        """
        if clear_previous:
            self.persistence.clear_all()

        result = self._executor.run()

        # Map how the executor stopped to the persisted status. Any
        # termination reason not listed here is recorded as "failed".
        status_by_termination = {
            "terminal": "completed",
            "max_iterations": "interrupted",
            "signal": "interrupted",
            "handoff": "awaiting_continuation",
            "timeout": "timed_out",
            "cycle_detected": "failed",
        }
        final_status = status_by_termination.get(result.terminated_by, "failed")

        final_state = LoopState(
            loop_name=self.fsm.name,
            current_state=result.final_state,
            iteration=result.iterations,
            captured=result.captured,
            prev_result=self._executor.prev_result,
            last_result=self._last_result,
            started_at=self._executor.started_at,
            updated_at="",
            status=final_status,
            continuation_prompt=self._continuation_prompt,
            accumulated_ms=result.duration_ms,
            # resume() restores these counters from this file. Omitting them
            # would silently reset retry, rate-limit and cycle-detection
            # tracking on every interrupt/handoff -> resume.
            pid=self._run_pid,
            **self._snapshot_counters(),
        )
        self.persistence.save_state(final_state)
        self.persistence.archive_run()

        return result

    def resume(self) -> ExecutionResult | None:
        """Resume from saved state, or None if no resumable state.

        Resumable states are: "running", "awaiting_continuation", and "interrupted".

        Returns:
            ExecutionResult if resumed and completed, None if no resumable state
        """
        state = self.persistence.load_state()
        if state is None:
            return None

        if state.status not in RESUMABLE_STATUSES:
            return None  # Already completed/failed

        # Restore executor state
        self._executor.current_state = state.current_state
        self._executor.iteration = state.iteration
        self._executor.captured = state.captured
        self._executor.prev_result = state.prev_result
        self._executor.started_at = state.started_at
        self._last_result = state.last_result
        self._executor._retry_counts = dict(state.retry_counts)
        self._executor._rate_limit_retries = {
            k: dict(v) for k, v in state.rate_limit_retries.items()
        }
        self._executor._consecutive_rate_limit_exhaustions = (
            state.consecutive_rate_limit_exhaustions
        )
        self._executor._edge_revisit_counts = dict(state.edge_revisit_counts)

        # Restore accumulated elapsed time so duration_ms and ${loop.elapsed_ms} reflect
        # the full loop lifetime (all segments), not just the resumed segment.
        # FSMExecutor.run() will reset start_time_ms to _now_ms(), so we use elapsed_offset_ms
        # to carry forward the time already spent before this resume.
        self._executor.elapsed_offset_ms = state.accumulated_ms

        # Clear any pending signals from previous run
        self._executor._pending_handoff = None
        self._executor._pending_error = None

        # Emit resume event with continuation context if available
        resume_event: dict[str, Any] = {
            "event": "loop_resume",
            "ts": _iso_now(),
            "loop": self.fsm.name,
            "from_state": state.current_state,
            "iteration": state.iteration,
        }
        if state.status == "awaiting_continuation" and state.continuation_prompt:
            resume_event["from_handoff"] = True
            resume_event["continuation_prompt"] = state.continuation_prompt
        self.persistence.append_event(resume_event)
        self.event_bus.emit(resume_event)

        # Continue execution (don't clear previous events)
        return self.run(clear_previous=False)
def _find_instances(loop_name: str, running_dir: Path) -> list[tuple[str | None, LoopState]]:
    """Discover all state-file instances for *loop_name* in *running_dir*.

    Instance-scoped files are named ``{loop_name}-YYYYMMDDTHHMMSS.state.json``.
    The legacy bare-name file is ``{loop_name}.state.json``. A file is
    accepted only if its stem is exactly the loop name plus one timestamp
    suffix. Instances of another loop that merely shares the prefix (e.g.
    ``fix-types-<ts>`` when looking up ``fix``) are therefore not reported.
    Unreadable or malformed files are skipped.

    Returns:
        List of ``(instance_id, LoopState)`` tuples: instance-scoped files
        sorted by file name, followed by the legacy file if present.
        *instance_id* is the file stem (e.g. ``"autodev-20260503T122306"``)
        for instance-scoped files, or ``None`` for the legacy bare-name file.
    """
    if not running_dir.exists():
        return []

    instances: list[tuple[str | None, LoopState]] = []

    # Filter in Python rather than globbing "{loop_name}-*". The glob would
    # match other loops sharing the prefix, and breaks when loop_name contains
    # glob metacharacters.
    for state_file in sorted(running_dir.glob("*.state.json")):
        base_stem = state_file.name.removesuffix(".state.json")
        if not base_stem.startswith(loop_name):
            continue
        # What follows the loop name must be exactly "-YYYYMMDDTHHMMSS". This
        # also excludes the legacy bare-name file, whose remainder is "".
        if not _INSTANCE_SUFFIX.fullmatch(base_stem[len(loop_name) :]):
            continue
        try:
            data = json.loads(state_file.read_text())
            instances.append((base_stem, LoopState.from_dict(data)))
        except (json.JSONDecodeError, KeyError, OSError):
            continue  # malformed, or removed since the directory scan

    # Legacy bare-name file: {loop_name}.state.json
    legacy_file = running_dir / f"{loop_name}.state.json"
    try:
        data = json.loads(legacy_file.read_text())
        instances.append((None, LoopState.from_dict(data)))
    except (json.JSONDecodeError, KeyError, OSError):
        pass  # absent (FileNotFoundError), unreadable, or malformed

    return instances
def list_running_loops(loops_dir: Path | None = None) -> list[LoopState]:
    """List all loops with saved state in ``.running/``.

    Also adds a placeholder entry with ``status="starting"`` and
    ``current_state="(initializing)"`` for each live process that has a
    ``.pid`` file but no state file under its logical loop name yet.

    Args:
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects. State-file entries come first (in file-name
        order), followed by any "starting" placeholders. Malformed state files,
        and files that vanish mid-scan, are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    running_dir = base_dir / RUNNING_DIR

    if not running_dir.exists():
        return []

    states: list[LoopState] = []
    for state_file in sorted(running_dir.glob("*.state.json")):
        try:
            data = json.loads(state_file.read_text())
            states.append(LoopState.from_dict(data))
        except (json.JSONDecodeError, KeyError, OSError):
            # Malformed file, or one removed between glob() and read_text()
            # (e.g. archived by a reconciliation sweep).
            continue

    # Include loops that have a PID file but no state file yet (still starting up).
    # Strip instance-ID timestamp suffix (e.g. "autodev-20240115T103000" -> "autodev")
    # before the known_names check to avoid spurious "starting" entries for loops
    # that already have a state file under their logical name.
    known_names = {s.loop_name for s in states}
    for pid_file in sorted(running_dir.glob("*.pid")):
        logical_name = _INSTANCE_SUFFIX.sub("", pid_file.stem)
        if logical_name in known_names:
            continue  # state file already covers this loop
        try:
            pid = int(pid_file.read_text().strip())
        except (ValueError, OSError):
            continue
        if _process_alive(pid):
            states.append(
                LoopState(
                    loop_name=logical_name,
                    current_state="(initializing)",
                    iteration=0,
                    captured={},
                    prev_result=None,
                    last_result=None,
                    started_at="",
                    updated_at="",
                    status="starting",
                )
            )

    return states
def list_run_history(loop_name: str, loops_dir: Path | None = None) -> list[LoopState]:
    """List archived runs for a loop, newest first.

    Reads state files from ``.loops/.history/<run_id>-<loop_name>/state.json``
    and returns them sorted by ``started_at``, descending (most recent first).

    A folder whose name parses as ``<run_id>-<name>`` is included only if
    ``<name>`` equals *loop_name* exactly. This stops a loop called "types"
    from also picking up runs of "fix-types".

    Also checks the legacy nested layout ``.loops/.history/<loop_name>/*/state.json``
    for backward compatibility with existing history folders.

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        List of LoopState objects for all archived runs, newest first.
        Returns an empty list if no history exists. Unreadable or malformed
        state files are skipped.
    """
    base_dir = loops_dir or Path(".loops")
    history_dir = base_dir / HISTORY_DIR

    if not history_dir.exists():
        return []

    states: list[LoopState] = []

    # Flat layout: <run_id>-<loop_name>/state.json
    suffix = f"-{loop_name}"
    for run_dir in history_dir.iterdir():
        if not run_dir.name.endswith(suffix):
            continue
        parsed = _parse_run_folder(run_dir.name)
        if parsed is not None and parsed[1] != loop_name:
            continue  # another loop whose name ends in "-<loop_name>"
        try:
            data = json.loads((run_dir / "state.json").read_text())
            states.append(LoopState.from_dict(data))
        except (json.JSONDecodeError, KeyError, OSError):
            # OSError: no state.json in this folder, or run_dir is not a directory.
            continue

    # Backward compat: legacy nested layout <loop_name>/<run_id>/state.json
    old_loop_dir = history_dir / loop_name
    if old_loop_dir.exists():
        logger.warning(
            "Found legacy nested history at %s; migrate to flat layout by moving "
            "each run to .history/<run_id>-%s/",
            old_loop_dir,
            loop_name,
        )
        for state_file in old_loop_dir.glob("*/state.json"):
            try:
                data = json.loads(state_file.read_text())
                states.append(LoopState.from_dict(data))
            except (json.JSONDecodeError, KeyError, OSError):
                continue

    states.sort(key=lambda s: s.started_at, reverse=True)
    return states
def get_archived_events(
    loop_name: str, run_id: str, loops_dir: Path | None = None
) -> list[dict[str, Any]]:
    """Return the events recorded for one archived run.

    Reads ``<loops_dir>/.history/<run_id>-<loop_name>/events.jsonl``. Blank
    lines and lines that are not valid JSON are skipped.

    Args:
        loop_name: Name of the loop
        run_id: The run directory name (compact timestamp)
        loops_dir: Base directory for loops (default: .loops)

    Returns:
        Event dictionaries in file order; empty if the archive has no events file.
    """
    archive_root = (loops_dir or Path(".loops")) / HISTORY_DIR
    events_path = archive_root / f"{run_id}-{loop_name}" / "events.jsonl"
    if not events_path.exists():
        return []

    collected: list[dict[str, Any]] = []
    with open(events_path, encoding="utf-8") as stream:
        for raw in stream:
            text = raw.strip()
            if not text:
                continue
            try:
                collected.append(json.loads(text))
            except json.JSONDecodeError:
                pass
    return collected
def get_loop_history(
    loop_name: str, loops_dir: Path | None = None, instance_id: str | None = None
) -> list[dict[str, Any]]:
    """Get the live event history for a loop from ``.running/``.

    Without *instance_id*, this reads the bare-name ``<loop_name>.events.jsonl``
    file. Runs started with an instance ID write to
    ``<instance_id>.events.jsonl`` instead, so pass that ID to read their events.

    Args:
        loop_name: Name of the loop
        loops_dir: Base directory for loops (default: .loops)
        instance_id: Optional instance identifier selecting an instance-scoped
            events file (default: None, i.e. the bare-name file)

    Returns:
        List of event dictionaries; empty if the events file doesn't exist
    """
    persistence = StatePersistence(loop_name, loops_dir, instance_id=instance_id)
    return persistence.read_events()