Coverage for greyhorse / river / private / monads / io.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-18 11:33 +0300

1# mypy: disable_error_code="misc,override,operator" 

2"""IO monad — synchronous effectful computation. 

3 

4``IO[T]`` wraps a ``Callable[[], T]`` and defers execution until ``__call__``. 

5It is the canonical ``Effect[T]`` implementation used by ``Resource``. 

6 

7Examples: 

8 :: 

9 

10 io = IO.pure(42) 

11 assert io() == 42 

12 

13 io2 = IO.delay(lambda: 7) 

14 assert io2() == 7 

15 

16 io3 = IO.into(int.__add__, 3, 4) 

17 assert io3() == 7 

18""" 

19 

20from __future__ import annotations 

21 

22import asyncio 

23import logging 

24from collections.abc import Callable 

25from functools import partial 

26from typing import Any 

27 

28from greyhorse.utils.types import TypeWrapper 

29 

30from .effect import Effect, ExitCase 

31 

32 

33logger = logging.getLogger(__name__) 

34 

35 

36class IO[T](Effect[T], TypeWrapper[T]): 

37 """Lazy synchronous effect. 

38 

39 Wraps a thunk (zero-argument callable) and executes it on ``__call__``. 

40 Supports monadic composition via ``map``/``and_then`` and bracket 

41 resource management via ``bracket_case``. 

42 

43 Type parameters: 

44 T: The result type of the computation. 

45 """ 

46 

47 __slots__ = ('_run',) 

48 

49 def __init__(self, run: Callable[[], T]) -> None: 

50 self._run = run 

51 

52 def map[U](self, f: Callable[[T], U]) -> IO[U]: 

53 """Apply a pure function to the result. 

54 

55 Args: 

56 f: Function to apply to the computed value. 

57 

58 Returns: 

59 A new IO that computes ``f(self())``. 

60 """ 

61 def run() -> U: 

62 a = self._run() 

63 return f(a) 

64 

65 return IO(run) 

66 

67 def and_then[U](self, f: Callable[[T], IO[U]]) -> IO[U]: 

68 """Monadic bind — chain this IO with a function returning IO. 

69 

70 Args: 

71 f: Function that takes the result and returns a new IO. 

72 

73 Returns: 

74 A new IO that runs ``self``, then runs ``f(result)``. 

75 """ 

76 def run() -> U: 

77 a = self._run() 

78 return f(a)() 

79 

80 return IO(run) 

81 

82 def __call__(self) -> T: 

83 """Execute the effect and return the result.""" 

84 return self._run() 

85 

86 @staticmethod 

87 def pure(value: T) -> IO[T]: 

88 """Wrap a pure value in IO (no side effects). 

89 

90 Args: 

91 value: The value to wrap. 

92 

93 Examples: 

94 :: 

95 

96 assert IO.pure(42)() == 42 

97 """ 

98 return IO[type(value)](lambda: value) 

99 

100 @staticmethod 

101 def delay(f: Callable[[], T]) -> IO[T]: 

102 """Create IO from a zero-argument callable. 

103 

104 Args: 

105 f: Thunk to defer. 

106 

107 Examples: 

108 :: 

109 

110 assert IO.delay(lambda: 7)() == 7 

111 """ 

112 return IO(f) 

113 

114 @staticmethod 

115 def into[T, **P](f: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> IO[T]: 

116 """Create IO by partially applying a function. 

117 

118 Args: 

119 f: Function to partially apply. 

120 *args: Positional arguments to bind. 

121 **kwargs: Keyword arguments to bind. 

122 

123 Returns: 

124 An IO that calls ``f(*args, **kwargs)`` on execution. 

125 

126 Examples: 

127 :: 

128 

129 def add(a: int, b: int) -> int: 

130 return a + b 

131 

132 assert IO.into(add, 3, 4)() == 7 

133 """ 

134 return IO(partial(f, *args, **kwargs)) 

135 

136 @staticmethod 

137 def wrap[T, **P]( 

138 f: Callable[P, T], *args: P.args, **kwargs: P.kwargs 

139 ) -> Callable[..., IO[T]]: 

140 """Create a factory that produces IO from additional arguments. 

141 

142 Pre-binds some arguments; the returned callable accepts the rest. 

143 

144 Args: 

145 f: Function to partially apply. 

146 *args: Pre-bound positional arguments. 

147 **kwargs: Pre-bound keyword arguments. 

148 

149 Returns: 

150 A callable ``(*extra_args, **extra_kwargs) -> IO[T]``. 

151 """ 

152 def run(*a: Any, **kw: Any) -> IO[T]: 

153 return IO(partial(f, *args, *a, **kwargs, **kw)) 

154 

155 return run 

156 

157 def bracket_case[U]( # type: ignore 

158 self, use: Callable[[T], IO[U]], release: Callable[[T, ExitCase], IO[None]] 

159 ) -> IO[U]: 

160 """Acquire-use-release with ExitCase feedback to the release function. 

161 

162 Executes: ``acquire → use(value) → release(value, exit_case)``. 

163 Release always runs, even on exception or cancellation. 

164 

165 Args: 

166 use: Function applied to the acquired value. 

167 release: Cleanup function receiving the value and ``ExitCase``. 

168 

169 Returns: 

170 An IO that manages the full acquire-use-release lifecycle. 

171 """ 

172 def run() -> U: 

173 value = self._run() 

174 try: 

175 res = use(value)() 

176 except asyncio.CancelledError: 

177 try: 

178 release(value, ExitCase.Canceled)() 

179 except Exception: 

180 logger.debug('IO.bracket_case: ошибка release подавлена (Canceled)') 

181 raise 

182 except Exception as e: 

183 try: 

184 release(value, ExitCase.Errored(exc=e))() 

185 except Exception: 

186 logger.debug('IO.bracket_case: ошибка release подавлена (Errored)') 

187 raise 

188 else: 

189 try: 

190 release(value, ExitCase.Succeeded)() 

191 except Exception: 

192 logger.debug('IO.bracket_case: ошибка release подавлена (Succeeded)') 

193 return res 

194 

195 return IO(run)