Coverage for little_loops / state.py: 96%

101 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""State persistence for little-loops automation. 

2 

3Provides state management for resume capability during automated processing. 

4""" 

5 

6from __future__ import annotations 

7 

8import json 

9import os 

10import tempfile 

11from dataclasses import dataclass, field 

12from datetime import UTC, datetime 

13from pathlib import Path 

14from typing import Any 

15 

16from little_loops.events import EventBus 

17from little_loops.logger import Logger 

18 

19 

20def _iso_now() -> str: 

21 """Get current time as ISO 8601 string.""" 

22 return datetime.now(UTC).isoformat() 

23 

24 

25@dataclass 

26class ProcessingState: 

27 """Persistent state for automated issue processing. 

28 

29 Enables resume capability after interruption by tracking: 

30 - Currently processing issue 

31 - Completed issues 

32 - Failed issues with reasons 

33 - Timing information 

34 - Auto-corrections made during validation 

35 

36 Attributes: 

37 current_issue: Path to currently processing issue file 

38 phase: Current processing phase 

39 timestamp: Last update timestamp 

40 completed_issues: List of completed issue IDs 

41 failed_issues: Mapping of issue ID to failure reason 

42 attempted_issues: Set of issues already attempted 

43 timing: Per-issue timing breakdown 

44 corrections: Mapping of issue ID to list of corrections made 

45 """ 

46 

47 current_issue: str = "" 

48 phase: str = "idle" 

49 timestamp: str = "" 

50 completed_issues: list[str] = field(default_factory=list) 

51 failed_issues: dict[str, str] = field(default_factory=dict) 

52 attempted_issues: set[str] = field(default_factory=set) 

53 timing: dict[str, dict[str, float]] = field(default_factory=dict) 

54 corrections: dict[str, list[str]] = field(default_factory=dict) 

55 

56 def to_dict(self) -> dict[str, Any]: 

57 """Convert state to dictionary for JSON serialization.""" 

58 return { 

59 "current_issue": self.current_issue, 

60 "phase": self.phase, 

61 "timestamp": self.timestamp, 

62 "completed_issues": self.completed_issues, 

63 "failed_issues": self.failed_issues, 

64 "attempted_issues": list(self.attempted_issues), 

65 "timing": self.timing, 

66 "corrections": self.corrections, 

67 } 

68 

69 @classmethod 

70 def from_dict(cls, data: dict[str, Any]) -> ProcessingState: 

71 """Create state from dictionary (JSON deserialization).""" 

72 return cls( 

73 current_issue=data.get("current_issue", ""), 

74 phase=data.get("phase", "idle"), 

75 timestamp=data.get("timestamp", ""), 

76 completed_issues=list(data.get("completed_issues", [])), 

77 failed_issues=dict(data.get("failed_issues", {})), 

78 attempted_issues=set(data.get("attempted_issues", [])), 

79 timing=dict(data.get("timing", {})), 

80 corrections=dict(data.get("corrections", {})), 

81 ) 

82 

83 

84class StateManager: 

85 """Manages persistence of processing state. 

86 

87 Handles loading, saving, and cleanup of state files for 

88 automated issue processing with resume capability. 

89 """ 

90 

91 def __init__(self, state_file: Path, logger: Logger, event_bus: EventBus | None = None) -> None: 

92 """Initialize state manager. 

93 

94 Args: 

95 state_file: Path to the state file 

96 logger: Logger instance for output 

97 event_bus: Optional EventBus for emitting state transition events 

98 """ 

99 self.state_file = state_file 

100 self.logger = logger 

101 self._event_bus = event_bus 

102 self._state: ProcessingState | None = None 

103 

104 def _emit(self, event_type: str, payload: dict[str, Any]) -> None: 

105 """Emit an event via the EventBus if available.""" 

106 if self._event_bus: 

107 self._event_bus.emit({"event": event_type, "ts": _iso_now(), **payload}) 

108 

109 @property 

110 def state(self) -> ProcessingState: 

111 """Get current state, creating new if needed.""" 

112 if self._state is None: 

113 self._state = ProcessingState(timestamp=_iso_now()) 

114 return self._state 

115 

116 def load(self) -> ProcessingState | None: 

117 """Load state from file. 

118 

119 Returns: 

120 Loaded state or None if file doesn't exist 

121 """ 

122 try: 

123 if self.state_file.exists(): 

124 data = json.loads(self.state_file.read_text()) 

125 self._state = ProcessingState.from_dict(data) 

126 self.logger.info(f"State loaded from {self.state_file}") 

127 return self._state 

128 except json.JSONDecodeError as e: 

129 self.logger.error(f"Failed to parse state file: {e}") 

130 except Exception as e: 

131 self.logger.error(f"Failed to load state: {e}") 

132 return None 

133 

134 def save(self) -> None: 

135 """Save current state to file using an atomic write. 

136 

137 Writes to a temporary file in the same directory, then renames it over 

138 the target path via os.replace. This ensures the state file is always 

139 either the previous valid version or the new valid version — never an 

140 empty or partially-written file. 

141 """ 

142 try: 

143 self.state.timestamp = _iso_now() 

144 data = json.dumps(self.state.to_dict(), indent=2) 

145 tmp_fd, tmp_path = tempfile.mkstemp(dir=self.state_file.parent, suffix=".tmp") 

146 try: 

147 with os.fdopen(tmp_fd, "w") as f: 

148 f.write(data) 

149 os.replace(tmp_path, self.state_file) 

150 except Exception: 

151 os.unlink(tmp_path) 

152 raise 

153 self.logger.info(f"State saved to {self.state_file}") 

154 except Exception as e: 

155 self.logger.error(f"Failed to save state: {e}") 

156 

157 def cleanup(self) -> None: 

158 """Remove state file.""" 

159 try: 

160 if self.state_file.exists(): 

161 self.state_file.unlink() 

162 self.logger.info("State file cleaned up") 

163 except Exception as e: 

164 self.logger.error(f"Failed to cleanup state file: {e}") 

165 

166 def update_current(self, issue_path: str, phase: str) -> None: 

167 """Update current issue and phase. 

168 

169 Args: 

170 issue_path: Path to current issue file 

171 phase: Current processing phase 

172 """ 

173 self.state.current_issue = issue_path 

174 self.state.phase = phase 

175 self.save() 

176 

177 def mark_attempted(self, issue_id: str, *, save: bool = True) -> None: 

178 """Mark an issue as attempted. 

179 

180 Args: 

181 issue_id: Issue identifier 

182 save: Whether to persist state immediately (default True) 

183 """ 

184 self.state.attempted_issues.add(issue_id) 

185 if save: 

186 self.save() 

187 

188 def mark_completed(self, issue_id: str, timing: dict[str, float] | None = None) -> None: 

189 """Mark an issue as completed. 

190 

191 Args: 

192 issue_id: Issue identifier 

193 timing: Optional timing breakdown 

194 """ 

195 self.state.completed_issues.append(issue_id) 

196 if timing: 

197 self.state.timing[issue_id] = timing 

198 self.state.current_issue = "" 

199 self.state.phase = "idle" 

200 self.save() 

201 self._emit("state.issue_completed", {"issue_id": issue_id, "status": "completed"}) 

202 

203 def mark_failed(self, issue_id: str, reason: str) -> None: 

204 """Mark an issue as failed. 

205 

206 Args: 

207 issue_id: Issue identifier 

208 reason: Failure reason 

209 """ 

210 self.state.failed_issues[issue_id] = reason 

211 self.save() 

212 self._emit( 

213 "state.issue_failed", {"issue_id": issue_id, "reason": reason, "status": "failed"} 

214 ) 

215 

216 def is_attempted(self, issue_id: str) -> bool: 

217 """Check if an issue has been attempted. 

218 

219 Args: 

220 issue_id: Issue identifier 

221 

222 Returns: 

223 True if issue was already attempted 

224 """ 

225 return issue_id in self.state.attempted_issues 

226 

227 def record_corrections(self, issue_id: str, corrections: list[str]) -> None: 

228 """Record corrections made to an issue. 

229 

230 Args: 

231 issue_id: Issue identifier 

232 corrections: List of correction descriptions 

233 """ 

234 if corrections: 

235 self.state.corrections[issue_id] = corrections 

236 self.save()