Coverage for greyhorse / app / resources / slot.py: 97%
209 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 00:50 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 00:50 +0300
1from __future__ import annotations
3import enum
4from dataclasses import dataclass
5from typing import Any
7from greyhorse.app.abc.resources.borrow import BorrowError, BorrowMutError
8from greyhorse.app.abc.resources.common import ResourceStatus
9from greyhorse.result import Ok, Result
11from .errors import ProviderReadyError
12from .stream import ResourceEvent, ResourceEventKind, ResourceEventStream
15type ProviderAccessError = ProviderReadyError | BorrowError
16type ProviderMutAccessError = ProviderReadyError | BorrowMutError
19@dataclass(slots=True, frozen=True)
20class ResourceSnapshot[T]:
21 status: ResourceStatus
22 epoch: int
23 value: T | None = None
24 reason: str | None = None
27class _PendingKind(enum.Enum):
28 REPLACE = enum.auto()
29 REMOVE = enum.auto()
32@dataclass(slots=True)
33class _PendingTransition[T]:
34 kind: _PendingKind
35 next_value: T | None = None
36 reason: str | None = None
39class _SlotState[T]:
40 __slots__ = (
41 '_epoch',
42 '_mut_count',
43 '_name',
44 '_pending',
45 '_reason',
46 '_shared_count',
47 '_status',
48 '_stream',
49 '_value',
50 )
52 def __init__(self, name: str) -> None:
53 self._name = name
54 self._value: T | None = None
55 self._status: ResourceStatus = ResourceStatus.Idle
56 self._epoch = 0
57 self._shared_count = 0
58 self._mut_count = 0
59 self._pending: _PendingTransition[T] | None = None
60 self._reason: str | None = None
61 self._stream: ResourceEventStream[T] = ResourceEventStream()
63 @property
64 def name(self) -> str:
65 return self._name
67 @property
68 def epoch(self) -> int:
69 return self._epoch
71 def snapshot(self) -> ResourceSnapshot[T]:
72 return ResourceSnapshot(
73 status=self._status,
74 epoch=self._epoch,
75 value=self._value,
76 reason=self._reason,
77 )
79 def events(self) -> ResourceEventStream[T]:
80 return self._stream
82 def has_active_scopes(self) -> bool:
83 return self._shared_count > 0 or self._mut_count > 0
85 def check_readiness(self) -> ProviderReadyError | None:
86 if self._status == ResourceStatus.Idle:
87 return ProviderReadyError.NotStarted(name=self._name)
88 if isinstance(self._status, ResourceStatus.Failed):
89 return ProviderReadyError.Degraded(
90 name=self._name,
91 reason=self._status.message, # type: ignore[attr-defined]
92 )
93 if self._pending is not None:
94 match self._pending.kind:
95 case _PendingKind.REPLACE:
96 return ProviderReadyError.Replacing(name=self._name)
97 case _PendingKind.REMOVE:
98 return ProviderReadyError.Removing(name=self._name)
99 return None
101 def try_set_pending(
102 self,
103 kind: _PendingKind,
104 next_value: Any = None,
105 reason: str | None = None,
106 ) -> bool:
107 if (
108 self._pending is not None
109 and self._pending.kind == _PendingKind.REMOVE
110 and kind == _PendingKind.REPLACE
111 ):
112 return False
113 self._pending = _PendingTransition(kind=kind, next_value=next_value, reason=reason)
114 return True
116 # ------------------------------------------------------------------
117 # Scope management
118 # ------------------------------------------------------------------
120 def enter_shared(self) -> Result[T, ProviderAccessError]:
121 err = self.check_readiness()
122 if err is not None:
123 return err.to_result() # type: ignore[return-value]
125 if self._value is None:
126 return BorrowError.Empty(name=self._name).to_result() # type: ignore[return-value]
128 if self._mut_count > 0:
129 return BorrowError.BorrowedAsMutable(name=self._name).to_result() # type: ignore[return-value]
131 self._shared_count += 1
132 self._status = ResourceStatus.Available(alive=True, in_use=True)
133 return Ok(self._value)
135 def exit_shared(self) -> None:
136 if self._shared_count > 0:
137 self._shared_count -= 1
138 if not self.has_active_scopes():
139 self._update_status_after_scope_exit()
140 self._apply_pending()
142 def enter_mut(self) -> Result[T, ProviderMutAccessError]:
143 err = self.check_readiness()
144 if err is not None:
145 return err.to_result() # type: ignore[return-value]
147 if self._value is None:
148 return BorrowMutError.Empty(name=self._name).to_result() # type: ignore[return-value]
150 if self._mut_count > 0:
151 return BorrowMutError.AlreadyBorrowed(name=self._name).to_result() # type: ignore[return-value]
153 if self._shared_count > 0:
154 return BorrowMutError.BorrowedAsImmutable(name=self._name).to_result() # type: ignore[return-value]
156 self._mut_count = 1
157 self._status = ResourceStatus.Acquired(alive=True)
158 return Ok(self._value)
160 def exit_mut(self) -> None:
161 self._mut_count = 0
162 if not self.has_active_scopes():
163 self._update_status_after_scope_exit()
164 self._apply_pending()
166 # ------------------------------------------------------------------
167 # Owner API — returns (epoch, applied)
168 # ------------------------------------------------------------------
170 def publish(self, value: T) -> tuple[int, bool]:
171 if self.has_active_scopes():
172 applied = self.try_set_pending(_PendingKind.REPLACE, next_value=value)
173 return self._epoch, applied
174 self._value = value
175 self._epoch += 1
176 self._status = ResourceStatus.Available(alive=True, in_use=False)
177 self._pending = None
178 self._reason = None
179 self._stream.emit(
180 ResourceEvent(kind=ResourceEventKind.PUBLISHED, epoch=self._epoch, value=value)
181 )
182 return self._epoch, True
184 def replace(self, value: T) -> tuple[int, bool]:
185 if self.has_active_scopes():
186 applied = self.try_set_pending(_PendingKind.REPLACE, next_value=value)
187 return self._epoch, applied
188 self._do_replace(value)
189 return self._epoch, True
191 def remove(self, reason: str | None = None) -> tuple[int, bool]:
192 if self.has_active_scopes():
193 applied = self.try_set_pending(_PendingKind.REMOVE, reason=reason)
194 return self._epoch, applied
195 self._do_remove(reason)
196 return self._epoch, True
198 def degrade(self, reason: str) -> tuple[int, bool]:
199 if self._pending is not None and self._pending.kind == _PendingKind.REMOVE:
200 return self._epoch, False
201 self._status = ResourceStatus.Failed(message=reason)
202 self._reason = reason
203 self._epoch += 1
204 self._stream.emit(
205 ResourceEvent(kind=ResourceEventKind.DEGRADED, epoch=self._epoch, reason=reason)
206 )
207 return self._epoch, True
209 def mark_ready(self) -> tuple[int, bool]:
210 if self._value is not None and self._pending is None:
211 self._status = ResourceStatus.Available(alive=True, in_use=self.has_active_scopes())
212 self._reason = None
213 self._epoch += 1
214 self._stream.emit(ResourceEvent(kind=ResourceEventKind.READY, epoch=self._epoch))
215 return self._epoch, True
216 return self._epoch, False
218 # ------------------------------------------------------------------
219 # Internal
220 # ------------------------------------------------------------------
222 def _update_status_after_scope_exit(self) -> None:
223 if self._value is not None and not isinstance(self._status, ResourceStatus.Failed):
224 self._status = ResourceStatus.Available(alive=True, in_use=False)
226 def _apply_pending(self) -> None:
227 if self._pending is None:
228 return
229 pending = self._pending
230 self._pending = None
231 match pending.kind:
232 case _PendingKind.REPLACE:
233 self._do_replace(pending.next_value)
234 case _PendingKind.REMOVE:
235 self._do_remove(pending.reason)
237 def _do_replace(self, value: Any) -> None:
238 prev_status = self._status
239 self._value = value
240 self._epoch += 1
241 if not isinstance(prev_status, ResourceStatus.Failed):
242 self._status = ResourceStatus.Available(alive=True, in_use=False)
243 self._stream.emit(
244 ResourceEvent(kind=ResourceEventKind.REPLACED, epoch=self._epoch, value=value)
245 )
247 def _do_remove(self, reason: str | None) -> None:
248 self._value = None
249 self._reason = reason
250 self._epoch += 1
251 self._status = ResourceStatus.Idle
252 self._stream.emit(
253 ResourceEvent(kind=ResourceEventKind.REMOVED, epoch=self._epoch, reason=reason)
254 )
257class ConsumerSlot[T]:
258 __slots__ = ('_state',)
260 def __init__(self, state: _SlotState[T]) -> None:
261 self._state = state
263 @property
264 def name(self) -> str:
265 return self._state.name
267 def snapshot(self) -> ResourceSnapshot[T]:
268 return self._state.snapshot()
270 def events(self) -> ResourceEventStream[T]:
271 return self._state.events()
273 def try_enter_shared(self) -> Result[T, ProviderAccessError]:
274 return self._state.enter_shared()
276 def exit_shared(self) -> None:
277 self._state.exit_shared()
279 def try_enter_mut(self) -> Result[T, ProviderMutAccessError]:
280 return self._state.enter_mut()
282 def exit_mut(self) -> None:
283 self._state.exit_mut()
286class OwnerSlot[T]:
287 __slots__ = ('_consumer', '_state')
289 def __init__(self, name: str) -> None:
290 self._state: _SlotState[T] = _SlotState(name)
291 self._consumer: ConsumerSlot[T] = ConsumerSlot(self._state)
293 @property
294 def name(self) -> str:
295 return self._state.name
297 def consumer(self) -> ConsumerSlot[T]:
298 return self._consumer
300 def snapshot(self) -> ResourceSnapshot[T]:
301 return self._state.snapshot()
303 def events(self) -> ResourceEventStream[T]:
304 return self._state.events()
306 def publish(self, value: T) -> tuple[int, bool]:
307 return self._state.publish(value)
309 def replace(self, value: T) -> tuple[int, bool]:
310 return self._state.replace(value)
312 def remove(self, reason: str | None = None) -> tuple[int, bool]:
313 return self._state.remove(reason)
315 def degrade(self, reason: str) -> tuple[int, bool]:
316 return self._state.degrade(reason)
318 def mark_ready(self) -> tuple[int, bool]:
319 return self._state.mark_ready()