Coverage for greyhorse / river / private / monads / io.py: 100%
62 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
1# mypy: disable_error_code="misc,override,operator"
2"""IO monad — synchronous effectful computation.
4``IO[T]`` wraps a ``Callable[[], T]`` and defers execution until ``__call__``.
5It is the canonical ``Effect[T]`` implementation used by ``Resource``.
7Examples:
8 ::
10 io = IO.pure(42)
11 assert io() == 42
13 io2 = IO.delay(lambda: 7)
14 assert io2() == 7
16 io3 = IO.into(int.__add__, 3, 4)
17 assert io3() == 7
18"""
20from __future__ import annotations
22import asyncio
23import logging
24from collections.abc import Callable
25from functools import partial
26from typing import Any
28from greyhorse.utils.types import TypeWrapper
30from .effect import Effect, ExitCase
33logger = logging.getLogger(__name__)
36class IO[T](Effect[T], TypeWrapper[T]):
37 """Lazy synchronous effect.
39 Wraps a thunk (zero-argument callable) and executes it on ``__call__``.
40 Supports monadic composition via ``map``/``and_then`` and bracket
41 resource management via ``bracket_case``.
43 Type parameters:
44 T: The result type of the computation.
45 """
47 __slots__ = ('_run',)
49 def __init__(self, run: Callable[[], T]) -> None:
50 self._run = run
52 def map[U](self, f: Callable[[T], U]) -> IO[U]:
53 """Apply a pure function to the result.
55 Args:
56 f: Function to apply to the computed value.
58 Returns:
59 A new IO that computes ``f(self())``.
60 """
61 def run() -> U:
62 a = self._run()
63 return f(a)
65 return IO(run)
67 def and_then[U](self, f: Callable[[T], IO[U]]) -> IO[U]:
68 """Monadic bind — chain this IO with a function returning IO.
70 Args:
71 f: Function that takes the result and returns a new IO.
73 Returns:
74 A new IO that runs ``self``, then runs ``f(result)``.
75 """
76 def run() -> U:
77 a = self._run()
78 return f(a)()
80 return IO(run)
82 def __call__(self) -> T:
83 """Execute the effect and return the result."""
84 return self._run()
86 @staticmethod
87 def pure(value: T) -> IO[T]:
88 """Wrap a pure value in IO (no side effects).
90 Args:
91 value: The value to wrap.
93 Examples:
94 ::
96 assert IO.pure(42)() == 42
97 """
98 return IO[type(value)](lambda: value)
100 @staticmethod
101 def delay(f: Callable[[], T]) -> IO[T]:
102 """Create IO from a zero-argument callable.
104 Args:
105 f: Thunk to defer.
107 Examples:
108 ::
110 assert IO.delay(lambda: 7)() == 7
111 """
112 return IO(f)
114 @staticmethod
115 def into[T, **P](f: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> IO[T]:
116 """Create IO by partially applying a function.
118 Args:
119 f: Function to partially apply.
120 *args: Positional arguments to bind.
121 **kwargs: Keyword arguments to bind.
123 Returns:
124 An IO that calls ``f(*args, **kwargs)`` on execution.
126 Examples:
127 ::
129 def add(a: int, b: int) -> int:
130 return a + b
132 assert IO.into(add, 3, 4)() == 7
133 """
134 return IO(partial(f, *args, **kwargs))
136 @staticmethod
137 def wrap[T, **P](
138 f: Callable[P, T], *args: P.args, **kwargs: P.kwargs
139 ) -> Callable[..., IO[T]]:
140 """Create a factory that produces IO from additional arguments.
142 Pre-binds some arguments; the returned callable accepts the rest.
144 Args:
145 f: Function to partially apply.
146 *args: Pre-bound positional arguments.
147 **kwargs: Pre-bound keyword arguments.
149 Returns:
150 A callable ``(*extra_args, **extra_kwargs) -> IO[T]``.
151 """
152 def run(*a: Any, **kw: Any) -> IO[T]:
153 return IO(partial(f, *args, *a, **kwargs, **kw))
155 return run
157 def bracket_case[U]( # type: ignore
158 self, use: Callable[[T], IO[U]], release: Callable[[T, ExitCase], IO[None]]
159 ) -> IO[U]:
160 """Acquire-use-release with ExitCase feedback to the release function.
162 Executes: ``acquire → use(value) → release(value, exit_case)``.
163 Release always runs, even on exception or cancellation.
165 Args:
166 use: Function applied to the acquired value.
167 release: Cleanup function receiving the value and ``ExitCase``.
169 Returns:
170 An IO that manages the full acquire-use-release lifecycle.
171 """
172 def run() -> U:
173 value = self._run()
174 try:
175 res = use(value)()
176 except asyncio.CancelledError:
177 try:
178 release(value, ExitCase.Canceled)()
179 except Exception:
180 logger.debug('IO.bracket_case: ошибка release подавлена (Canceled)')
181 raise
182 except Exception as e:
183 try:
184 release(value, ExitCase.Errored(exc=e))()
185 except Exception:
186 logger.debug('IO.bracket_case: ошибка release подавлена (Errored)')
187 raise
188 else:
189 try:
190 release(value, ExitCase.Succeeded)()
191 except Exception:
192 logger.debug('IO.bracket_case: ошибка release подавлена (Succeeded)')
193 return res
195 return IO(run)