Coverage for little_loops / state.py: 96%
101 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""State persistence for little-loops automation.
3Provides state management for resume capability during automated processing.
4"""
6from __future__ import annotations
8import json
9import os
10import tempfile
11from dataclasses import dataclass, field
12from datetime import UTC, datetime
13from pathlib import Path
14from typing import Any
16from little_loops.events import EventBus
17from little_loops.logger import Logger
20def _iso_now() -> str:
21 """Get current time as ISO 8601 string."""
22 return datetime.now(UTC).isoformat()
@dataclass
class ProcessingState:
    """Persistent state for automated issue processing.

    Enables resume capability after interruption by tracking:
    - Currently processing issue
    - Completed issues
    - Failed issues with reasons
    - Timing information
    - Auto-corrections made during validation

    Attributes:
        current_issue: Path to currently processing issue file
        phase: Current processing phase
        timestamp: Last update timestamp
        completed_issues: List of completed issue IDs
        failed_issues: Mapping of issue ID to failure reason
        attempted_issues: Set of issues already attempted
        timing: Per-issue timing breakdown
        corrections: Mapping of issue ID to list of corrections made
    """

    current_issue: str = ""
    phase: str = "idle"
    timestamp: str = ""
    completed_issues: list[str] = field(default_factory=list)
    failed_issues: dict[str, str] = field(default_factory=dict)
    attempted_issues: set[str] = field(default_factory=set)
    timing: dict[str, dict[str, float]] = field(default_factory=dict)
    corrections: dict[str, list[str]] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert state to a dictionary suitable for JSON serialization.

        Returns:
            A dict keyed by attribute name. ``attempted_issues`` is emitted as
            a sorted list because JSON has no set type.
        """
        return {
            "current_issue": self.current_issue,
            "phase": self.phase,
            "timestamp": self.timestamp,
            "completed_issues": self.completed_issues,
            "failed_issues": self.failed_issues,
            # Set iteration order is arbitrary; sorting keeps the serialized
            # state identical from one save to the next for the same content.
            "attempted_issues": sorted(self.attempted_issues),
            "timing": self.timing,
            "corrections": self.corrections,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ProcessingState:
        """Create state from a dictionary (JSON deserialization).

        Missing keys fall back to the field defaults. Collection-valued keys
        that are present but ``None`` (JSON ``null``) are treated as empty
        instead of raising ``TypeError``.

        Args:
            data: Mapping as produced by ``to_dict``.

        Returns:
            A new ``ProcessingState`` holding shallow copies of the collections.
        """
        return cls(
            current_issue=data.get("current_issue", ""),
            phase=data.get("phase", "idle"),
            timestamp=data.get("timestamp", ""),
            # ``or`` covers both a missing key and an explicit null value.
            completed_issues=list(data.get("completed_issues") or []),
            failed_issues=dict(data.get("failed_issues") or {}),
            attempted_issues=set(data.get("attempted_issues") or []),
            timing=dict(data.get("timing") or {}),
            corrections=dict(data.get("corrections") or {}),
        )
class StateManager:
    """Manages persistence of processing state.

    Handles loading, saving, and cleanup of state files for
    automated issue processing with resume capability.
    """

    def __init__(self, state_file: Path, logger: Logger, event_bus: EventBus | None = None) -> None:
        """Initialize state manager.

        Args:
            state_file: Path to the state file
            logger: Logger instance for output
            event_bus: Optional EventBus for emitting state transition events
        """
        self.state_file = state_file
        self.logger = logger
        self._event_bus = event_bus
        # Created lazily by the ``state`` property or populated by ``load``.
        self._state: ProcessingState | None = None

    def _emit(self, event_type: str, payload: dict[str, Any]) -> None:
        """Emit an event via the EventBus if one was supplied.

        Args:
            event_type: Value stored under the ``"event"`` key
            payload: Extra keys merged into the emitted event dict
        """
        # Compare against None explicitly so a bus object that happens to be
        # falsy still receives events.
        if self._event_bus is not None:
            self._event_bus.emit({"event": event_type, "ts": _iso_now(), **payload})

    @property
    def state(self) -> ProcessingState:
        """Get current state, creating a fresh one if none is held yet."""
        if self._state is None:
            self._state = ProcessingState(timestamp=_iso_now())
        return self._state

    def load(self) -> ProcessingState | None:
        """Load state from file.

        Errors are logged rather than raised.

        Returns:
            Loaded state, or None if the file doesn't exist or could not be
            read or parsed
        """
        try:
            if self.state_file.exists():
                data = json.loads(self.state_file.read_text(encoding="utf-8"))
                self._state = ProcessingState.from_dict(data)
                self.logger.info(f"State loaded from {self.state_file}")
                return self._state
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse state file: {e}")
        except Exception as e:
            self.logger.error(f"Failed to load state: {e}")
        return None

    def save(self) -> None:
        """Save current state to file using an atomic write.

        Writes to a temporary file in the same directory, then renames it over
        the target path via os.replace, so the target is swapped in a single
        step instead of being rewritten in place. Failures are logged rather
        than raised.
        """
        try:
            self.state.timestamp = _iso_now()
            data = json.dumps(self.state.to_dict(), indent=2)
            tmp_fd, tmp_path = tempfile.mkstemp(dir=self.state_file.parent, suffix=".tmp")
            try:
                with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
                    f.write(data)
                os.replace(tmp_path, self.state_file)
            except BaseException:
                # BaseException so an interrupt (e.g. Ctrl-C) mid-write does not
                # leave a stray temp file behind. Removal is best-effort: a
                # failure here must not hide the error that brought us here.
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise
            self.logger.info(f"State saved to {self.state_file}")
        except Exception as e:
            self.logger.error(f"Failed to save state: {e}")

    def cleanup(self) -> None:
        """Remove the state file if present; failures are logged, not raised."""
        try:
            # Attempt the unlink directly instead of checking exists() first,
            # which would race with another process removing the file.
            self.state_file.unlink()
            self.logger.info("State file cleaned up")
        except FileNotFoundError:
            # Nothing to remove; same outcome as a successful cleanup.
            pass
        except Exception as e:
            self.logger.error(f"Failed to cleanup state file: {e}")

    def update_current(self, issue_path: str, phase: str) -> None:
        """Update current issue and phase, then persist.

        Args:
            issue_path: Path to current issue file
            phase: Current processing phase
        """
        self.state.current_issue = issue_path
        self.state.phase = phase
        self.save()

    def mark_attempted(self, issue_id: str, *, save: bool = True) -> None:
        """Mark an issue as attempted.

        Args:
            issue_id: Issue identifier
            save: Whether to persist state immediately (default True)
        """
        self.state.attempted_issues.add(issue_id)
        if save:
            self.save()

    def mark_completed(self, issue_id: str, timing: dict[str, float] | None = None) -> None:
        """Mark an issue as completed, reset the current issue, and persist.

        Emits a ``state.issue_completed`` event after saving.

        Args:
            issue_id: Issue identifier
            timing: Optional timing breakdown; stored only when non-empty
        """
        self.state.completed_issues.append(issue_id)
        if timing:
            self.state.timing[issue_id] = timing
        self.state.current_issue = ""
        self.state.phase = "idle"
        self.save()
        self._emit("state.issue_completed", {"issue_id": issue_id, "status": "completed"})

    def mark_failed(self, issue_id: str, reason: str) -> None:
        """Mark an issue as failed and persist.

        Emits a ``state.issue_failed`` event after saving.

        Args:
            issue_id: Issue identifier
            reason: Failure reason
        """
        self.state.failed_issues[issue_id] = reason
        self.save()
        self._emit(
            "state.issue_failed", {"issue_id": issue_id, "reason": reason, "status": "failed"}
        )

    def is_attempted(self, issue_id: str) -> bool:
        """Check if an issue has been attempted.

        Args:
            issue_id: Issue identifier

        Returns:
            True if issue was already attempted
        """
        return issue_id in self.state.attempted_issues

    def record_corrections(self, issue_id: str, corrections: list[str]) -> None:
        """Record corrections made to an issue and persist.

        Does nothing when ``corrections`` is empty.

        Args:
            issue_id: Issue identifier
            corrections: List of correction descriptions
        """
        if corrections:
            self.state.corrections[issue_id] = corrections
            self.save()