Coverage for little_loops / fsm / concurrency.py: 99%

122 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Scope-based concurrency control for FSM loops. 

2 

3Prevents concurrent loops from conflicting when operating on 

4the same files or directories through file-based locking. 

5 

6Public exports: 

7 ScopeLock: Dataclass representing a scope lock 

8 LockManager: Manager for acquiring/releasing scope locks 

9""" 

10 

11from __future__ import annotations 

12 

13import errno 

14import fcntl 

15import json 

16import os 

17import time 

18from dataclasses import dataclass 

19from datetime import UTC, datetime 

20from pathlib import Path 

21from typing import Any 

22 

23RUNNING_DIR = ".running" 

24 

25 

26def _process_alive(pid: int) -> bool: 

27 """Check if a process is still running. 

28 

29 Returns True if alive (or alive but unreadable due to EPERM), 

30 False only if process does not exist (ESRCH). 

31 """ 

32 try: 

33 os.kill(pid, 0) 

34 return True 

35 except OSError as e: 

36 if e.errno == errno.ESRCH: 

37 return False # No such process 

38 return True # EPERM or other: process exists, no permission 

39 

40 

41def _iso_now() -> str: 

42 """Return current time as ISO8601 string.""" 

43 return datetime.now(UTC).isoformat() 

44 

45 

46@dataclass 

47class ScopeLock: 

48 """Represents a lock on a set of paths for a running loop. 

49 

50 Attributes: 

51 loop_name: Name of the loop holding the lock 

52 scope: List of paths this loop operates on 

53 pid: Process ID of the lock holder 

54 started_at: ISO timestamp when lock was acquired 

55 """ 

56 

57 loop_name: str 

58 scope: list[str] 

59 pid: int 

60 started_at: str 

61 

62 def to_dict(self) -> dict[str, Any]: 

63 """Convert to dictionary for JSON serialization.""" 

64 return { 

65 "loop_name": self.loop_name, 

66 "scope": self.scope, 

67 "pid": self.pid, 

68 "started_at": self.started_at, 

69 } 

70 

71 @classmethod 

72 def from_dict(cls, data: dict[str, Any]) -> ScopeLock: 

73 """Create from dictionary (JSON deserialization).""" 

74 return cls( 

75 loop_name=str(data["loop_name"]), 

76 scope=list(data["scope"]) if isinstance(data["scope"], list) else [str(data["scope"])], 

77 pid=int(data["pid"]), 

78 started_at=str(data["started_at"]), 

79 ) 

80 

81 

82class LockManager: 

83 """Manage scope-based locks for concurrent loop execution. 

84 

85 Lock files are stored in .loops/.running/<instance_id>.lock 

86 and contain JSON with ScopeLock data. 

87 """ 

88 

89 def __init__(self, loops_dir: Path | None = None) -> None: 

90 """Initialize the lock manager. 

91 

92 Args: 

93 loops_dir: Base directory for loops (default: .loops) 

94 """ 

95 self.loops_dir = loops_dir or Path(".loops") 

96 self.running_dir = self.loops_dir / RUNNING_DIR 

97 

98 def acquire(self, loop_name: str, scope: list[str], instance_id: str | None = None) -> bool: 

99 """Attempt to acquire lock for the given scope. 

100 

101 Args: 

102 loop_name: Name of the loop to acquire lock for 

103 scope: List of paths the loop operates on 

104 instance_id: Optional unique instance identifier; falls back to loop_name when None 

105 

106 Returns: 

107 True if lock acquired, False if conflict exists 

108 """ 

109 # Normalize scope - empty means whole project 

110 if not scope: 

111 scope = ["."] 

112 scope = [self._normalize_path(p) for p in scope] 

113 

114 # Ensure running directory exists before opening sentinel lock 

115 self.running_dir.mkdir(parents=True, exist_ok=True) 

116 

117 # Serialize the check-and-create sequence across processes using a 

118 # sentinel file. This eliminates the TOCTOU window between 

119 # find_conflict() (read) and lock-file creation (write). 

120 # .acquire.lock is a dotfile so Path.glob("*.lock") will not match it 

121 # and stale-lock cleanup in find_conflict/list_locks ignores it. 

122 dir_lock_path = self.running_dir / ".acquire.lock" 

123 with open(dir_lock_path, "w") as dir_lock: 

124 fcntl.flock(dir_lock, fcntl.LOCK_EX) 

125 

126 # Check for conflicts (now atomic with write below) 

127 conflict = self.find_conflict(scope) 

128 if conflict: 

129 return False 

130 

131 # Create lock file 

132 lock_file = self.running_dir / f"{instance_id or loop_name}.lock" 

133 lock = ScopeLock( 

134 loop_name=loop_name, 

135 scope=scope, 

136 pid=os.getpid(), 

137 started_at=_iso_now(), 

138 ) 

139 with open(lock_file, "w") as f: 

140 json.dump(lock.to_dict(), f) 

141 

142 return True 

143 

144 def release(self, loop_name: str, instance_id: str | None = None) -> None: 

145 """Release lock for a loop. 

146 

147 Args: 

148 loop_name: Name of the loop to release lock for 

149 instance_id: Optional unique instance identifier; falls back to loop_name when None 

150 """ 

151 lock_file = self.running_dir / f"{instance_id or loop_name}.lock" 

152 lock_file.unlink(missing_ok=True) 

153 

154 def find_conflict(self, scope: list[str]) -> ScopeLock | None: 

155 """Find any running loop with overlapping scope. 

156 

157 Also cleans up stale locks from dead processes. 

158 

159 Args: 

160 scope: Scope to check for conflicts 

161 

162 Returns: 

163 ScopeLock of conflicting loop, or None if no conflict 

164 """ 

165 if not self.running_dir.exists(): 

166 return None 

167 

168 # Normalize once before the comparison loop to avoid O(n*m) stat calls 

169 normalized_scope = [self._normalize_path(p) for p in scope] 

170 

171 for lock_file in self.running_dir.glob("*.lock"): 

172 try: 

173 with open(lock_file) as f: 

174 data = json.load(f) 

175 lock = ScopeLock.from_dict(data) 

176 

177 # Check if process is still alive 

178 if not self._process_alive(lock.pid): 

179 # Stale lock, remove it 

180 lock_file.unlink(missing_ok=True) 

181 continue 

182 

183 # Normalize lock scope (lock files from acquire() are already 

184 # absolute, but normalize defensively in case of legacy files) 

185 lock_scope = [self._normalize_path(p) for p in lock.scope] 

186 if self._scopes_overlap(normalized_scope, lock_scope): 

187 return lock 

188 

189 except (json.JSONDecodeError, KeyError, FileNotFoundError): 

190 # Malformed or deleted lock file, skip 

191 continue 

192 

193 return None 

194 

195 def list_locks(self) -> list[ScopeLock]: 

196 """List all active locks. 

197 

198 Cleans up stale locks as a side effect. 

199 

200 Returns: 

201 List of active ScopeLock objects 

202 """ 

203 locks: list[ScopeLock] = [] 

204 if not self.running_dir.exists(): 

205 return locks 

206 

207 for lock_file in self.running_dir.glob("*.lock"): 

208 try: 

209 with open(lock_file) as f: 

210 data = json.load(f) 

211 lock = ScopeLock.from_dict(data) 

212 

213 if self._process_alive(lock.pid): 

214 locks.append(lock) 

215 else: 

216 # Stale lock, remove it 

217 lock_file.unlink(missing_ok=True) 

218 except (json.JSONDecodeError, KeyError, FileNotFoundError): 

219 continue 

220 

221 return locks 

222 

223 def wait_for_scope(self, scope: list[str], timeout: int = 300) -> bool: 

224 """Wait until scope is available. 

225 

226 Args: 

227 scope: Scope to wait for 

228 timeout: Maximum time to wait in seconds 

229 

230 Returns: 

231 True if scope became available, False if timeout 

232 """ 

233 start = time.time() 

234 while time.time() - start < timeout: 

235 conflict = self.find_conflict(scope) 

236 if conflict is None: 

237 return True 

238 time.sleep(1) 

239 

240 return False 

241 

242 def _scopes_overlap(self, scope1: list[str], scope2: list[str]) -> bool: 

243 """Check if two scopes have any overlapping paths.""" 

244 for p1 in scope1: 

245 for p2 in scope2: 

246 if self._paths_overlap(p1, p2): 

247 return True 

248 return False 

249 

250 def _paths_overlap(self, path1: str, path2: str) -> bool: 

251 """Check if two paths overlap (same, or one contains the other). 

252 

253 Assumes paths are already normalized (pre-resolved absolute strings). 

254 """ 

255 p1 = Path(path1) 

256 p2 = Path(path2) 

257 

258 # Same path 

259 if p1 == p2: 

260 return True 

261 

262 # One is parent of other 

263 try: 

264 p1.relative_to(p2) 

265 return True 

266 except ValueError: 

267 pass 

268 

269 try: 

270 p2.relative_to(p1) 

271 return True 

272 except ValueError: 

273 pass 

274 

275 return False 

276 

277 def _normalize_path(self, path: str) -> str: 

278 """Normalize path for consistent comparison.""" 

279 return str(Path(path).resolve()) 

280 

281 def _process_alive(self, pid: int) -> bool: 

282 """Check if process is still running.""" 

283 return _process_alive(pid)