Coverage for little_loops / fsm / concurrency.py: 99%
122 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Scope-based concurrency control for FSM loops.
3Prevents concurrent loops from conflicting when operating on
4the same files or directories through file-based locking.
6Public exports:
7 ScopeLock: Dataclass representing a scope lock
8 LockManager: Manager for acquiring/releasing scope locks
9"""
11from __future__ import annotations
13import errno
14import fcntl
15import json
16import os
17import time
18from dataclasses import dataclass
19from datetime import UTC, datetime
20from pathlib import Path
21from typing import Any
# Name of the subdirectory, created under the loops directory, that holds the
# per-loop lock files (LockManager joins it onto its loops_dir).
RUNNING_DIR = ".running"
def _process_alive(pid: int) -> bool:
    """Check if a process is still running.

    Probes the process with signal 0, which performs the existence and
    permission checks without actually delivering a signal.

    Args:
        pid: Process ID to probe.

    Returns:
        True if the process exists, including when it exists but cannot be
        signalled (e.g. EPERM). False if the process does not exist (ESRCH)
        or if ``pid`` cannot name a single process (non-positive, or too
        large for the platform's pid type).
    """
    # kill() gives pid <= 0 a special meaning (0: the caller's process group,
    # -1: every process, -N: process group N). Probing such a value would
    # report "alive" although it can never identify one lock holder.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except OverflowError:
        # pid does not fit the platform's pid type, so no such process exists.
        return False
    except OSError as e:
        # ESRCH means no such process; anything else (notably EPERM) means
        # the process exists but we lack permission to signal it.
        return e.errno != errno.ESRCH
    return True
41def _iso_now() -> str:
42 """Return current time as ISO8601 string."""
43 return datetime.now(UTC).isoformat()
@dataclass
class ScopeLock:
    """Represents a lock on a set of paths for a running loop.

    Attributes:
        loop_name: Name of the loop holding the lock
        scope: List of paths this loop operates on
        pid: Process ID of the lock holder
        started_at: ISO timestamp when lock was acquired
    """

    loop_name: str
    scope: list[str]
    pid: int
    started_at: str

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict with the keys ``loop_name``, ``scope``, ``pid`` and
            ``started_at``. ``scope`` is a copy, so mutating the returned
            dict does not alter this lock.
        """
        return {
            "loop_name": self.loop_name,
            "scope": list(self.scope),
            "pid": self.pid,
            "started_at": self.started_at,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ScopeLock:
        """Create from dictionary (JSON deserialization).

        Every field is coerced to its declared type so that a hand-edited or
        legacy lock file still yields a well-typed object.

        Args:
            data: Mapping with the keys ``loop_name``, ``scope``, ``pid`` and
                ``started_at``. ``scope`` may be a list or a single value.

        Returns:
            The reconstructed ScopeLock.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``pid`` cannot be converted to an int.
            TypeError: If ``data`` is not subscriptable by key, or ``pid``
                has a type ``int()`` rejects.
        """
        raw_scope = data["scope"]
        if isinstance(raw_scope, list):
            # Coerce each element, matching the scalar branch below and the
            # declared list[str] type.
            scope = [str(entry) for entry in raw_scope]
        else:
            scope = [str(raw_scope)]
        return cls(
            loop_name=str(data["loop_name"]),
            scope=scope,
            pid=int(data["pid"]),
            started_at=str(data["started_at"]),
        )
class LockManager:
    """Manage scope-based locks for concurrent loop execution.

    Each running loop is represented by a JSON lock file (serialized
    ScopeLock data) stored at <loops_dir>/.running/<instance_id>.lock.
    Lock files whose recorded process no longer exists are treated as stale
    and deleted whenever the directory is scanned.
    """

    # Sentinel file that acquire() flock()s to serialize its check-and-create
    # sequence. Path.glob("*.lock") DOES match dotfiles, so directory scans
    # must skip this name explicitly instead of relying on the leading dot.
    _SENTINEL_NAME = ".acquire.lock"

    def __init__(self, loops_dir: Path | None = None) -> None:
        """Initialize the lock manager.

        Args:
            loops_dir: Base directory for loops (default: .loops)
        """
        self.loops_dir = loops_dir or Path(".loops")
        self.running_dir = self.loops_dir / RUNNING_DIR

    def acquire(self, loop_name: str, scope: list[str], instance_id: str | None = None) -> bool:
        """Attempt to acquire lock for the given scope.

        Args:
            loop_name: Name of the loop to acquire lock for
            scope: List of paths the loop operates on; empty means the whole
                project (".")
            instance_id: Optional unique instance identifier; falls back to
                loop_name when None

        Returns:
            True if lock acquired, False if conflict exists
        """
        # Normalize scope - empty means whole project
        if not scope:
            scope = ["."]
        scope = [self._normalize_path(p) for p in scope]

        # Ensure running directory exists before opening sentinel lock
        self.running_dir.mkdir(parents=True, exist_ok=True)

        # Serialize the check-and-create sequence across processes using a
        # sentinel file. This eliminates the TOCTOU window between
        # find_conflict() (read) and lock-file creation (write). The flock is
        # released when the sentinel file is closed on leaving the block.
        dir_lock_path = self.running_dir / self._SENTINEL_NAME
        with open(dir_lock_path, "w") as dir_lock:
            fcntl.flock(dir_lock, fcntl.LOCK_EX)

            # Check for conflicts (atomic with the write below)
            if self.find_conflict(scope) is not None:
                return False

            # Create lock file
            lock_file = self.running_dir / f"{instance_id or loop_name}.lock"
            lock = ScopeLock(
                loop_name=loop_name,
                scope=scope,
                pid=os.getpid(),
                started_at=_iso_now(),
            )
            with open(lock_file, "w", encoding="utf-8") as f:
                json.dump(lock.to_dict(), f)

        return True

    def release(self, loop_name: str, instance_id: str | None = None) -> None:
        """Release lock for a loop.

        Releasing a lock that does not exist is a no-op.

        Args:
            loop_name: Name of the loop to release lock for
            instance_id: Optional unique instance identifier; falls back to
                loop_name when None
        """
        lock_file = self.running_dir / f"{instance_id or loop_name}.lock"
        lock_file.unlink(missing_ok=True)

    def find_conflict(self, scope: list[str]) -> ScopeLock | None:
        """Find any running loop with overlapping scope.

        Also cleans up stale locks from dead processes encountered before the
        first conflict is found.

        Args:
            scope: Scope to check for conflicts

        Returns:
            ScopeLock of conflicting loop, or None if no conflict
        """
        if not self.running_dir.exists():
            return None

        # Normalize once before the comparison loop to avoid O(n*m) stat calls
        normalized_scope = [self._normalize_path(p) for p in scope]

        for lock_file in self._lock_files():
            lock = self._load_live_lock(lock_file)
            if lock is None:
                continue

            # Normalize lock scope (lock files from acquire() are already
            # absolute, but normalize defensively in case of legacy files)
            lock_scope = [self._normalize_path(p) for p in lock.scope]
            if self._scopes_overlap(normalized_scope, lock_scope):
                return lock

        return None

    def list_locks(self) -> list[ScopeLock]:
        """List all active locks.

        Cleans up stale locks as a side effect.

        Returns:
            List of active ScopeLock objects
        """
        if not self.running_dir.exists():
            return []

        candidates = (self._load_live_lock(path) for path in self._lock_files())
        return [lock for lock in candidates if lock is not None]

    def wait_for_scope(self, scope: list[str], timeout: int = 300) -> bool:
        """Wait until scope is available.

        The scope is always checked at least once, so ``timeout=0`` acts as a
        non-blocking probe. Polls roughly once per second and never sleeps
        past the deadline.

        Args:
            scope: Scope to wait for
            timeout: Maximum time to wait in seconds

        Returns:
            True if scope became available, False if timeout
        """
        # Monotonic clock: immune to wall-clock adjustments while waiting.
        deadline = time.monotonic() + timeout
        while True:
            if self.find_conflict(scope) is None:
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(1.0, remaining))

    def _lock_files(self) -> list[Path]:
        """Return the per-loop lock files in the running directory.

        The acquire() sentinel is excluded: it matches the "*.lock" pattern
        but is not a loop lock.
        """
        return [
            path
            for path in self.running_dir.glob("*.lock")
            if path.name != self._SENTINEL_NAME
        ]

    def _load_live_lock(self, lock_file: Path) -> ScopeLock | None:
        """Read one lock file, returning its lock only if the holder is alive.

        Args:
            lock_file: Path of the lock file to read

        Returns:
            The parsed ScopeLock, or None when the file is missing, malformed,
            or stale. A stale file (holder process gone) is deleted.
        """
        try:
            with open(lock_file, encoding="utf-8") as f:
                data = json.load(f)
            lock = ScopeLock.from_dict(data)
        except (ValueError, KeyError, TypeError, FileNotFoundError):
            # Malformed or concurrently deleted lock file: skip it.
            # ValueError covers invalid JSON, undecodable bytes and a
            # non-numeric pid; TypeError covers a non-dict payload.
            return None

        if not self._process_alive(lock.pid):
            # Stale lock, remove it
            lock_file.unlink(missing_ok=True)
            return None
        return lock

    def _scopes_overlap(self, scope1: list[str], scope2: list[str]) -> bool:
        """Check if two scopes have any overlapping paths."""
        return any(self._paths_overlap(p1, p2) for p1 in scope1 for p2 in scope2)

    def _paths_overlap(self, path1: str, path2: str) -> bool:
        """Check if two paths overlap (same, or one contains the other).

        Assumes paths are already normalized (pre-resolved absolute strings).
        The comparison is by path components, so "/a/bc" does not overlap
        "/a/b".
        """
        p1 = Path(path1)
        p2 = Path(path2)
        # is_relative_to() is also true for identical paths.
        return p1.is_relative_to(p2) or p2.is_relative_to(p1)

    def _normalize_path(self, path: str) -> str:
        """Normalize path for consistent comparison (resolved absolute path)."""
        return str(Path(path).resolve())

    def _process_alive(self, pid: int) -> bool:
        """Check if process is still running."""
        return _process_alive(pid)