Coverage for greyhorse / river / private / monads / resource.py: 97%
116 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
1# mypy: warn_no_return=false,disable_error_code="misc,type-arg,valid-type,return-value,operator,call-arg,override"
2"""Resource monad — bracket-based lifecycle management.
4``Resource[F, T]`` is an ADT (Pure/Allocate/Bind/Eval) that describes
5acquire/release pairs for managed resources. Parameterized by effect type ``F``
6(typically ``IO``) and value type ``T``.
8The primary API is ``Resource.make(acquire, release)`` + ``.run()`` for
9immediate execution, or ``.open()`` for ``with``-statement usage via
10``ResourceHandle``.
12Examples:
13 Simple value::
15 res = Resource[IO, int].pure(42)
16 assert res.run() == 42
18 Managed resource with acquire/release::
20 pool = Resource[IO, Pool].make(
21 IO(lambda: Pool.create(size=10)),
22 lambda p: IO(lambda: p.close()),
23 )
24 with pool.open() as p:
25 p.query(...)
26 # p.close() called automatically
28 Chaining resources::
30 db = Resource[IO, Db].make(IO(connect), lambda c: IO(c.close))
31 tx = db.and_then(lambda c: Resource[IO, Tx].make(
32 IO(c.begin), lambda t: IO(t.rollback)
33 ))
34 result = tx.run() # connect → begin → use → rollback → close
35"""
37from __future__ import annotations
39import logging
40from collections.abc import Callable
41from functools import partial
42from typing import Any, ForwardRef, TypeVar
44from greyhorse.enum import Enum, Struct, Tuple
45from greyhorse.utils.types import TypeWrapper
47from .effect import Effect, ExitCase
48from .io import IO
51logger = logging.getLogger(__name__)
54F = TypeVar('F', bound=Effect, covariant=True)
55T = TypeVar('T', covariant=True)
56U = TypeVar('U', covariant=True)
59class Resource(Enum, TypeWrapper[F, T]):
60 """Bracket-based resource lifecycle as an algebraic data type.
62 Four variants:
64 - ``Pure(value)`` — wraps a ready value, no cleanup.
65 - ``Allocate(acquire, release)`` — acquire/release pair with ExitCase.
66 - ``Bind(source, continuation)`` — monadic chaining.
67 - ``Eval(thunk, suspend)`` — wraps an effect, optionally suspending.
69 Type parameters:
70 F: Effect type (typically ``IO``).
71 T: Value type produced by the resource.
72 """
74 Pure = Tuple(T)
75 Allocate = Struct(acquire=Effect[T], release=Callable[[T, ExitCase], Effect[None]])
76 Bind = Struct(
77 source=ForwardRef('Resource'), continuation=Callable[[T], ForwardRef('Resource')]
78 )
79 Eval = Struct(thunk=Effect[Any], suspend=bool)
81 def use(self, f: Callable[[T], Effect[U]]) -> F[U]:
82 """Execute the resource, applying ``f`` to the acquired value.
84 Release is guaranteed via ``bracket_case`` — runs even on exception.
86 Args:
87 f: Function to apply to the acquired value, returning an Effect.
89 Returns:
90 The Effect produced by ``f``, with release guaranteed after.
92 Examples:
93 ::
95 res = Resource[IO, int].pure(42)
96 result = res.use(lambda v: IO.pure(v * 2))()
97 assert result == 84
98 """
99 match self:
100 case Resource.Pure(value):
101 return f(value)
103 case Resource.Allocate(acquire, release):
104 return acquire.bracket_case(f, release)
106 case Resource.Bind(source, continuation):
108 def run(value: T) -> F[U]:
109 next_value = continuation(value)
110 return next_value.use(f)
112 return source.use(run)
114 case Resource.Eval(thunk, suspend):
115 if suspend:
116 return thunk.and_then(lambda v: v.use(f))
117 return thunk.and_then(f)
119 def and_then(self, f: Callable[[T], Resource[F, U]]) -> Resource[F, U]:
120 """Monadic bind — chain with a function returning another Resource.
122 Args:
123 f: Function that takes the acquired value and returns a new Resource.
125 Returns:
126 A new Resource that acquires both in sequence, releases in LIFO order.
128 Examples:
129 ::
131 r1 = Resource[IO, str].pure('hello')
132 r2 = r1.and_then(lambda s: Resource[IO, str].pure(s + ' world'))
133 assert r2.run() == 'hello world'
134 """
135 return self.Bind[*self.__wrapped_type__](source=self, continuation=f)
137 def run(self) -> T:
138 """Execute the resource and return the value.
140 Shortcut for ``use(IO.pure)()``. Acquires, extracts value, releases.
142 Returns:
143 The acquired value after release.
145 Examples:
146 ::
148 assert Resource[IO, int].pure(7).run() == 7
149 """
150 return self.use(IO.pure)()
152 def open(self) -> ResourceHandle[T]:
153 """Create a context manager for ``with``-statement usage.
155 Each call returns an independent ``ResourceHandle`` with its own
156 cleanup stack. Safe for multiple concurrent opens from the same Resource.
158 Returns:
159 A ``ResourceHandle`` that acquires on ``__enter__`` and releases
160 on ``__exit__``.
162 Examples:
163 ::
165 res = Resource[IO, str].make(IO.pure('x'), lambda v: IO(lambda: None))
166 with res.open() as value:
167 assert value == 'x'
168 """
169 return ResourceHandle(self)
171 @classmethod
172 def pure(cls, value: T) -> Resource[F, T]:
173 """Wrap a pure value — no acquire/release needed.
175 Args:
176 value: The value to wrap.
178 Returns:
179 A Resource that immediately yields the value with no cleanup.
181 Examples:
182 ::
184 res = Resource[IO, int].pure(42)
185 assert res.run() == 42
186 """
187 return cls.Pure[*cls.__wrapped_type__](value)
189 @classmethod
190 def make(
191 cls, acquire: F[T], release: Callable[[T], F[None]]
192 ) -> Resource[F, T]:
193 """Create a resource with simple release (no ExitCase info).
195 Args:
196 acquire: Effect that produces the resource value.
197 release: Function that takes the value and returns a cleanup Effect.
199 Returns:
200 A Resource with bracket semantics (acquire → use → release).
202 Examples:
203 ::
205 acquired, released = [], []
206 res = Resource[IO, str].make(
207 IO(lambda: (acquired.append(1), 'val')[1]),
208 lambda v: IO(lambda: released.append(v)),
209 )
210 assert res.run() == 'val'
211 assert acquired == [1] and released == ['val']
212 """
213 return cls.Allocate[*cls.__wrapped_type__](
214 acquire=acquire, release=lambda v, _: release(v)
215 )
217 @classmethod
218 def make_case(
219 cls, acquire: F[T], release: Callable[[T, ExitCase], F[None]]
220 ) -> Resource[F, T]:
221 """Create a resource with ExitCase-aware release.
223 The release function receives the ``ExitCase`` indicating how the
224 use-phase ended (Succeeded, Canceled, or Errored).
226 Args:
227 acquire: Effect that produces the resource value.
228 release: Function ``(value, exit_case) -> cleanup Effect``.
230 Returns:
231 A Resource with ExitCase-aware bracket semantics.
233 Examples:
234 ::
236 cases = []
237 res = Resource[IO, str].make_case(
238 IO.pure('val'),
239 lambda v, ec: IO(lambda: cases.append(ec)),
240 )
241 res.run()
242 assert cases[0] == ExitCase.Succeeded
243 """
244 return cls.Allocate[*cls.__wrapped_type__](acquire=acquire, release=release)
246 @classmethod
247 def eval(cls, effect: F[T]) -> Resource[F, T]:
248 """Wrap an effect as a resource (no cleanup needed).
250 Args:
251 effect: The Effect to wrap.
253 Returns:
254 A Resource that runs the effect and yields its result.
256 Examples:
257 ::
259 res = Resource[IO, int].eval(IO.pure(7))
260 assert res.run() == 7
261 """
262 return cls.Eval[*cls.__wrapped_type__](thunk=effect, suspend=False)
264 @classmethod
265 def suspend(cls, effect: F[Resource[F, T]]) -> Resource[F, T]:
266 """Suspend a resource-producing effect for lazy evaluation.
268 The outer effect produces a Resource, which is then flattened
269 into a single Resource.
271 Args:
272 effect: An Effect that returns a Resource.
274 Returns:
275 A Resource that lazily evaluates the outer effect.
276 """
277 return cls.Eval[*cls.__wrapped_type__](thunk=effect, suspend=True)
280class ResourceHandle[T]:
281 """Context manager wrapping a ``Resource`` for ``with``-statement usage.
283 Each handle is independent — multiple ``open()`` calls on the same
284 Resource produce independent handles with separate cleanup stacks.
286 Handles partial-acquire rollback: if acquisition fails mid-chain,
287 already-acquired resources are released before re-raising.
289 Examples:
290 ::
292 res = Resource[IO, str].make(IO.pure('x'), lambda v: IO(lambda: None))
293 with res.open() as value:
294 assert value == 'x'
295 # release runs here
296 """
298 __slots__ = ('_active', '_resource', '_stack', '_value')
300 def __init__(self, resource: Resource) -> None:
301 self._resource = resource
302 self._stack: list = []
303 self._value: T | None = None
304 self._active = False
306 def __enter__(self) -> T:
307 """Acquire the resource. Rolls back partial acquisitions on failure."""
308 if self._active:
309 raise RuntimeError('ResourceHandle is not reentrant')
310 self._stack = []
311 try:
312 self._value = resource_acquire(self._resource, self._stack)()
313 except BaseException as exc:
314 release_io = resource_release(ExitCase.from_exc(exc), self._stack)
315 if release_io is not None:
316 release_io()
317 self._stack = []
318 raise
319 self._active = True
320 return self._value
322 def __exit__(
323 self,
324 exc_type: type[BaseException] | None,
325 exc_val: BaseException | None,
326 exc_tb: Any,
327 ) -> None:
328 """Release the resource. Passes ExitCase to release callbacks."""
329 exit_case = ExitCase.from_exc(exc_val)
330 release_io = resource_release(exit_case, self._stack)
331 if release_io is not None:
332 release_io()
333 self._stack = []
334 self._value = None
335 self._active = False
338def resource_acquire[F: Effect, T](
339 resource: Resource[F, T], stack: list[Callable[[ExitCase], Effect[None]]]
340) -> F[T]:
341 """Unwrap a Resource tree, pushing release callbacks onto the stack.
343 Args:
344 resource: The Resource to acquire.
345 stack: Mutable list where release callbacks are appended (LIFO order).
347 Returns:
348 An Effect that produces the acquired value.
349 """
350 match resource:
351 case Resource.Pure(value):
352 return IO.pure(value)
354 case Resource.Allocate(acquire, release):
356 def run(value: T) -> T:
357 stack.append(partial(release, value))
358 return value
360 return acquire.and_then(IO.wrap(run))
362 case Resource.Bind(source, continuation):
364 def run(value: T) -> F[U]:
365 next_value = continuation(value)
366 return resource_acquire(next_value, stack)
368 return resource_acquire(source, stack).and_then(run)
370 case Resource.Eval(thunk, suspend):
371 if suspend:
372 return thunk.and_then(lambda v: resource_acquire(v, stack))
373 return thunk
376def resource_release(
377 exit_case: ExitCase, stack: list[Callable[[ExitCase], Effect[None]]]
378) -> Effect[None] | None:
379 """Execute all release callbacks from the stack in LIFO order.
381 Every finalizer runs regardless of prior failures. The first exception
382 is re-raised after all finalizers have been attempted.
384 Args:
385 exit_case: How the use-phase ended (Succeeded/Canceled/Errored).
386 stack: Release callbacks accumulated by ``resource_acquire``.
388 Returns:
389 An IO that runs all releases, or None if the stack is empty.
390 """
391 if not stack:
392 return None
394 def run_all() -> None:
395 first_exc: BaseException | None = None
396 for fn in reversed(stack):
397 try:
398 fn(exit_case)()
399 except Exception as e:
400 if first_exc is None:
401 first_exc = e
402 else:
403 logger.debug('resource_release: ошибка подавлена: %s', e)
404 if first_exc is not None:
405 raise first_exc
407 return IO(run_all)