Coverage for greyhorse / app / resources / stream.py: 100%

36 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 00:49 +0300

1from __future__ import annotations 

2 

3import enum 

4import logging 

5from collections.abc import Callable 

6from dataclasses import dataclass 

7 

8 

9logger = logging.getLogger(__name__) 

10 

11 

class ResourceEventKind(enum.Enum):
    """Closed set of event kinds that a ``ResourceEvent`` can carry.

    Member values are produced by ``enum.auto()``, i.e. consecutive integers
    starting at 1 in declaration order. Reordering or inserting members
    therefore changes the numeric values; compare members by identity or
    name rather than persisting the raw integers.
    """

    # Declaration order below is significant for the auto-assigned values.
    PUBLISHED = enum.auto()
    REMOVED = enum.auto()
    REPLACED = enum.auto()
    DEGRADED = enum.auto()
    READY = enum.auto()

18 

19 

20@dataclass(slots=True, frozen=True) 

21class ResourceEvent[T]: 

22 kind: ResourceEventKind 

23 epoch: int 

24 value: T | None = None 

25 reason: str | None = None 

26 

27 

28class ResourceEventStream[T]: 

29 __slots__ = ('_next_id', '_subscribers') 

30 

31 def __init__(self) -> None: 

32 self._next_id = 0 

33 self._subscribers: dict[int, Callable[[ResourceEvent[T]], None]] = {} 

34 

35 def subscribe(self, callback: Callable[[ResourceEvent[T]], None]) -> int: 

36 sub_id = self._next_id 

37 self._next_id += 1 

38 self._subscribers[sub_id] = callback 

39 return sub_id 

40 

41 def unsubscribe(self, sub_id: int) -> bool: 

42 return self._subscribers.pop(sub_id, None) is not None 

43 

44 def emit(self, event: ResourceEvent[T]) -> None: 

45 for callback in list(self._subscribers.values()): 

46 try: 

47 callback(event) 

48 except Exception: 

49 logger.debug('ResourceEventStream: callback error suppressed')