Coverage for greyhorse / river / private / monads / resource_reader.py: 29%
73 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
1# mypy: disable_error_code="name-defined,arg-type"
2"""ResourceReader — Reader transformer producing Resource[IO, T] from a context."""
4from __future__ import annotations
6import logging
7from typing import Any
9from greyhorse.app.abc.collections.selectors import Selector
10from greyhorse.app.private.runtime.invoke import invoke_sync
11from greyhorse.factory import Factory
13from .effect import ExitCase
14from .io import IO
15from .reader_t import ReaderT
16from .resource import Resource
19logger = logging.getLogger(__name__)
22OperationSelector = Selector[type, Any]
25class ResourceReader[T](ReaderT[OperationSelector, Resource[IO, 'T'], 'T']):
26 """Reader transformer that produces ``Resource[IO, T]`` from a context.
28 Two factory methods:
30 - ``from_factory(factory)`` — injects dependencies via ``Selector``
31 (prototype-compatible, O(N) per-param lookup).
32 - ``from_factory_injected(factory)`` — injects via ``FragmentLinker``
33 (overrides-first, cross-fragment resolution, bracket-safe cleanup).
35 The returned ``ResourceReader`` is callable: ``reader(ctx) -> Resource[IO, T]``.
36 The Resource is lazy — execution happens on ``.run()`` or ``with .open()``.
38 For scoped factories (generators), acquire/release bracket semantics
39 are preserved. For unscoped factories, resolver cleanup is deferred
40 until the Resource is released.
42 Type parameters:
43 T: Value type produced by the factory.
44 """
46 @classmethod
47 def from_factory(
48 cls, factory: Factory[T], *args: Any, **kwargs: Any
49 ) -> ResourceReader[T]:
50 """Create a ResourceReader using Selector-based DI.
52 Injects dependencies by iterating factory params and looking up
53 each type in the selector. Ported from the prototype.
55 Args:
56 factory: The Factory to inject into and invoke.
57 *args: Extra positional args passed to ``factory.create()``.
58 **kwargs: Extra keyword args passed to ``factory.create()``.
60 Returns:
61 A ResourceReader that, given a Selector, produces Resource[IO, T].
62 """
63 from ..injection import inject_functor
65 args = tuple(a for a in args if a is not None)
67 def run(selector: OperationSelector) -> Resource[IO, T]:
68 cloned_fn = factory.clone()
69 inject_functor(cloned_fn, selector)
71 if cloned_fn.scoped:
72 resource = Resource[IO, cloned_fn.return_type].make(
73 IO.into(invoke_sync, cloned_fn.create, *args, **kwargs),
74 IO.wrap(invoke_sync, cloned_fn.destroy),
75 )
76 else:
77 resource = Resource[IO, cloned_fn.return_type].eval(
78 IO.into(invoke_sync, cloned_fn.create, *args, **kwargs)
79 )
80 return resource
82 return ResourceReader[T](run)
84 @classmethod
85 def from_factory_injected(
86 cls, factory: Factory[T], *args: Any, **kwargs: Any
87 ) -> ResourceReader[T]:
88 """Create a ResourceReader using FragmentLinker-based DI.
90 Uses ``inject_functor_via_linker`` for multi-root cross-fragment
91 resolution with overrides-first semantics. Resolvers are kept alive
92 until the Resource is released (bracket-safe).
94 Args:
95 factory: The Factory to inject into and invoke.
96 *args: Extra positional args passed to ``factory.create()``.
97 **kwargs: Extra keyword args passed to ``factory.create()``.
99 Returns:
100 A ResourceReader that, given an OperatorResolver, produces Resource[IO, T].
101 """
102 from ..injection import inject_functor_via_linker
104 args = tuple(a for a in args if a is not None)
106 def run(ctx: Any) -> Resource[IO, T]:
107 if factory.scoped:
109 def acquire() -> _Held:
110 cloned = factory.clone()
111 resolvers = inject_functor_via_linker(
112 cloned, ctx.linker, ctx.scope, external=dict(ctx.context_values)
113 )
114 try:
115 value = invoke_sync(cloned.create, *args, **kwargs)
116 return _Held(value=value, resolvers=resolvers, cloned=cloned)
117 except Exception:
118 _close_resolvers(resolvers)
119 raise
121 def release(held: _Held, exit_case: ExitCase) -> None:
122 try:
123 invoke_sync(held.cloned.destroy, held.value)
124 finally:
125 _close_resolvers(held.resolvers)
127 return Resource[IO, factory.return_type].make_case(
128 acquire=IO(acquire), release=lambda h, ec: IO(lambda: release(h, ec))
129 )
131 pending_resolvers: list[list] = []
133 def acquire_unscoped() -> Any:
134 cloned = factory.clone()
135 resolvers = inject_functor_via_linker(
136 cloned, ctx.linker, ctx.scope, external=dict(ctx.context_values)
137 )
138 try:
139 value = invoke_sync(cloned.create, *args, **kwargs)
140 except Exception:
141 _close_resolvers(resolvers)
142 raise
143 pending_resolvers.append(resolvers)
144 return value
146 def release_unscoped(value: Any, exit_case: ExitCase) -> None:
147 if pending_resolvers:
148 _close_resolvers(pending_resolvers.pop())
150 return Resource[IO, factory.return_type].make_case(
151 acquire=IO(acquire_unscoped),
152 release=lambda v, ec: IO(lambda: release_unscoped(v, ec)),
153 )
155 return ResourceReader[T](run)
158class _Held:
159 """Per-acquisition state for scoped ResourceReader cleanup."""
161 __slots__ = ('cloned', 'resolvers', 'value')
163 def __init__(self, value: Any, resolvers: list, cloned: Factory) -> None:
164 self.value = value
165 self.resolvers = resolvers
166 self.cloned = cloned
169def _close_resolvers(resolvers: list) -> None:
170 """Close all ValueResolvers in reverse order, suppressing exceptions."""
171 for r in reversed(resolvers):
172 try:
173 r.__exit__(None, None, None)
174 except Exception:
175 logger.debug('_close_resolvers: ошибка подавлена')