Coverage for greyhorse / app / resources / slot.py: 97%

209 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 00:50 +0300

1from __future__ import annotations 

2 

3import enum 

4from dataclasses import dataclass 

5from typing import Any 

6 

7from greyhorse.app.abc.resources.borrow import BorrowError, BorrowMutError 

8from greyhorse.app.abc.resources.common import ResourceStatus 

9from greyhorse.result import Ok, Result 

10 

11from .errors import ProviderReadyError 

12from .stream import ResourceEvent, ResourceEventKind, ResourceEventStream 

13 

14 

15type ProviderAccessError = ProviderReadyError | BorrowError 

16type ProviderMutAccessError = ProviderReadyError | BorrowMutError 

17 

18 

19@dataclass(slots=True, frozen=True) 

20class ResourceSnapshot[T]: 

21 status: ResourceStatus 

22 epoch: int 

23 value: T | None = None 

24 reason: str | None = None 

25 

26 

class _PendingKind(enum.Enum):
    """Kind of transition that has been deferred on a slot."""

    # A new value is waiting to be swapped in.
    REPLACE = enum.auto()
    # The current value is waiting to be removed.
    REMOVE = enum.auto()

30 

31 

32@dataclass(slots=True) 

33class _PendingTransition[T]: 

34 kind: _PendingKind 

35 next_value: T | None = None 

36 reason: str | None = None 

37 

38 

39class _SlotState[T]: 

40 __slots__ = ( 

41 '_epoch', 

42 '_mut_count', 

43 '_name', 

44 '_pending', 

45 '_reason', 

46 '_shared_count', 

47 '_status', 

48 '_stream', 

49 '_value', 

50 ) 

51 

52 def __init__(self, name: str) -> None: 

53 self._name = name 

54 self._value: T | None = None 

55 self._status: ResourceStatus = ResourceStatus.Idle 

56 self._epoch = 0 

57 self._shared_count = 0 

58 self._mut_count = 0 

59 self._pending: _PendingTransition[T] | None = None 

60 self._reason: str | None = None 

61 self._stream: ResourceEventStream[T] = ResourceEventStream() 

62 

63 @property 

64 def name(self) -> str: 

65 return self._name 

66 

67 @property 

68 def epoch(self) -> int: 

69 return self._epoch 

70 

71 def snapshot(self) -> ResourceSnapshot[T]: 

72 return ResourceSnapshot( 

73 status=self._status, 

74 epoch=self._epoch, 

75 value=self._value, 

76 reason=self._reason, 

77 ) 

78 

79 def events(self) -> ResourceEventStream[T]: 

80 return self._stream 

81 

82 def has_active_scopes(self) -> bool: 

83 return self._shared_count > 0 or self._mut_count > 0 

84 

85 def check_readiness(self) -> ProviderReadyError | None: 

86 if self._status == ResourceStatus.Idle: 

87 return ProviderReadyError.NotStarted(name=self._name) 

88 if isinstance(self._status, ResourceStatus.Failed): 

89 return ProviderReadyError.Degraded( 

90 name=self._name, 

91 reason=self._status.message, # type: ignore[attr-defined] 

92 ) 

93 if self._pending is not None: 

94 match self._pending.kind: 

95 case _PendingKind.REPLACE: 

96 return ProviderReadyError.Replacing(name=self._name) 

97 case _PendingKind.REMOVE: 

98 return ProviderReadyError.Removing(name=self._name) 

99 return None 

100 

101 def try_set_pending( 

102 self, 

103 kind: _PendingKind, 

104 next_value: Any = None, 

105 reason: str | None = None, 

106 ) -> bool: 

107 if ( 

108 self._pending is not None 

109 and self._pending.kind == _PendingKind.REMOVE 

110 and kind == _PendingKind.REPLACE 

111 ): 

112 return False 

113 self._pending = _PendingTransition(kind=kind, next_value=next_value, reason=reason) 

114 return True 

115 

116 # ------------------------------------------------------------------ 

117 # Scope management 

118 # ------------------------------------------------------------------ 

119 

120 def enter_shared(self) -> Result[T, ProviderAccessError]: 

121 err = self.check_readiness() 

122 if err is not None: 

123 return err.to_result() # type: ignore[return-value] 

124 

125 if self._value is None: 

126 return BorrowError.Empty(name=self._name).to_result() # type: ignore[return-value] 

127 

128 if self._mut_count > 0: 

129 return BorrowError.BorrowedAsMutable(name=self._name).to_result() # type: ignore[return-value] 

130 

131 self._shared_count += 1 

132 self._status = ResourceStatus.Available(alive=True, in_use=True) 

133 return Ok(self._value) 

134 

135 def exit_shared(self) -> None: 

136 if self._shared_count > 0: 

137 self._shared_count -= 1 

138 if not self.has_active_scopes(): 

139 self._update_status_after_scope_exit() 

140 self._apply_pending() 

141 

142 def enter_mut(self) -> Result[T, ProviderMutAccessError]: 

143 err = self.check_readiness() 

144 if err is not None: 

145 return err.to_result() # type: ignore[return-value] 

146 

147 if self._value is None: 

148 return BorrowMutError.Empty(name=self._name).to_result() # type: ignore[return-value] 

149 

150 if self._mut_count > 0: 

151 return BorrowMutError.AlreadyBorrowed(name=self._name).to_result() # type: ignore[return-value] 

152 

153 if self._shared_count > 0: 

154 return BorrowMutError.BorrowedAsImmutable(name=self._name).to_result() # type: ignore[return-value] 

155 

156 self._mut_count = 1 

157 self._status = ResourceStatus.Acquired(alive=True) 

158 return Ok(self._value) 

159 

160 def exit_mut(self) -> None: 

161 self._mut_count = 0 

162 if not self.has_active_scopes(): 

163 self._update_status_after_scope_exit() 

164 self._apply_pending() 

165 

166 # ------------------------------------------------------------------ 

167 # Owner API — returns (epoch, applied) 

168 # ------------------------------------------------------------------ 

169 

170 def publish(self, value: T) -> tuple[int, bool]: 

171 if self.has_active_scopes(): 

172 applied = self.try_set_pending(_PendingKind.REPLACE, next_value=value) 

173 return self._epoch, applied 

174 self._value = value 

175 self._epoch += 1 

176 self._status = ResourceStatus.Available(alive=True, in_use=False) 

177 self._pending = None 

178 self._reason = None 

179 self._stream.emit( 

180 ResourceEvent(kind=ResourceEventKind.PUBLISHED, epoch=self._epoch, value=value) 

181 ) 

182 return self._epoch, True 

183 

184 def replace(self, value: T) -> tuple[int, bool]: 

185 if self.has_active_scopes(): 

186 applied = self.try_set_pending(_PendingKind.REPLACE, next_value=value) 

187 return self._epoch, applied 

188 self._do_replace(value) 

189 return self._epoch, True 

190 

191 def remove(self, reason: str | None = None) -> tuple[int, bool]: 

192 if self.has_active_scopes(): 

193 applied = self.try_set_pending(_PendingKind.REMOVE, reason=reason) 

194 return self._epoch, applied 

195 self._do_remove(reason) 

196 return self._epoch, True 

197 

198 def degrade(self, reason: str) -> tuple[int, bool]: 

199 if self._pending is not None and self._pending.kind == _PendingKind.REMOVE: 

200 return self._epoch, False 

201 self._status = ResourceStatus.Failed(message=reason) 

202 self._reason = reason 

203 self._epoch += 1 

204 self._stream.emit( 

205 ResourceEvent(kind=ResourceEventKind.DEGRADED, epoch=self._epoch, reason=reason) 

206 ) 

207 return self._epoch, True 

208 

209 def mark_ready(self) -> tuple[int, bool]: 

210 if self._value is not None and self._pending is None: 

211 self._status = ResourceStatus.Available(alive=True, in_use=self.has_active_scopes()) 

212 self._reason = None 

213 self._epoch += 1 

214 self._stream.emit(ResourceEvent(kind=ResourceEventKind.READY, epoch=self._epoch)) 

215 return self._epoch, True 

216 return self._epoch, False 

217 

218 # ------------------------------------------------------------------ 

219 # Internal 

220 # ------------------------------------------------------------------ 

221 

222 def _update_status_after_scope_exit(self) -> None: 

223 if self._value is not None and not isinstance(self._status, ResourceStatus.Failed): 

224 self._status = ResourceStatus.Available(alive=True, in_use=False) 

225 

226 def _apply_pending(self) -> None: 

227 if self._pending is None: 

228 return 

229 pending = self._pending 

230 self._pending = None 

231 match pending.kind: 

232 case _PendingKind.REPLACE: 

233 self._do_replace(pending.next_value) 

234 case _PendingKind.REMOVE: 

235 self._do_remove(pending.reason) 

236 

237 def _do_replace(self, value: Any) -> None: 

238 prev_status = self._status 

239 self._value = value 

240 self._epoch += 1 

241 if not isinstance(prev_status, ResourceStatus.Failed): 

242 self._status = ResourceStatus.Available(alive=True, in_use=False) 

243 self._stream.emit( 

244 ResourceEvent(kind=ResourceEventKind.REPLACED, epoch=self._epoch, value=value) 

245 ) 

246 

247 def _do_remove(self, reason: str | None) -> None: 

248 self._value = None 

249 self._reason = reason 

250 self._epoch += 1 

251 self._status = ResourceStatus.Idle 

252 self._stream.emit( 

253 ResourceEvent(kind=ResourceEventKind.REMOVED, epoch=self._epoch, reason=reason) 

254 ) 

255 

256 

257class ConsumerSlot[T]: 

258 __slots__ = ('_state',) 

259 

260 def __init__(self, state: _SlotState[T]) -> None: 

261 self._state = state 

262 

263 @property 

264 def name(self) -> str: 

265 return self._state.name 

266 

267 def snapshot(self) -> ResourceSnapshot[T]: 

268 return self._state.snapshot() 

269 

270 def events(self) -> ResourceEventStream[T]: 

271 return self._state.events() 

272 

273 def try_enter_shared(self) -> Result[T, ProviderAccessError]: 

274 return self._state.enter_shared() 

275 

276 def exit_shared(self) -> None: 

277 self._state.exit_shared() 

278 

279 def try_enter_mut(self) -> Result[T, ProviderMutAccessError]: 

280 return self._state.enter_mut() 

281 

282 def exit_mut(self) -> None: 

283 self._state.exit_mut() 

284 

285 

286class OwnerSlot[T]: 

287 __slots__ = ('_consumer', '_state') 

288 

289 def __init__(self, name: str) -> None: 

290 self._state: _SlotState[T] = _SlotState(name) 

291 self._consumer: ConsumerSlot[T] = ConsumerSlot(self._state) 

292 

293 @property 

294 def name(self) -> str: 

295 return self._state.name 

296 

297 def consumer(self) -> ConsumerSlot[T]: 

298 return self._consumer 

299 

300 def snapshot(self) -> ResourceSnapshot[T]: 

301 return self._state.snapshot() 

302 

303 def events(self) -> ResourceEventStream[T]: 

304 return self._state.events() 

305 

306 def publish(self, value: T) -> tuple[int, bool]: 

307 return self._state.publish(value) 

308 

309 def replace(self, value: T) -> tuple[int, bool]: 

310 return self._state.replace(value) 

311 

312 def remove(self, reason: str | None = None) -> tuple[int, bool]: 

313 return self._state.remove(reason) 

314 

315 def degrade(self, reason: str) -> tuple[int, bool]: 

316 return self._state.degrade(reason) 

317 

318 def mark_ready(self) -> tuple[int, bool]: 

319 return self._state.mark_ready()