Coverage for greyhorse / river / private / monads / resource_reader.py: 29%

73 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-18 11:33 +0300

1# mypy: disable_error_code="name-defined,arg-type" 

2"""ResourceReader — Reader transformer producing Resource[IO, T] from a context.""" 

3 

4from __future__ import annotations 

5 

6import logging 

7from typing import Any 

8 

9from greyhorse.app.abc.collections.selectors import Selector 

10from greyhorse.app.private.runtime.invoke import invoke_sync 

11from greyhorse.factory import Factory 

12 

13from .effect import ExitCase 

14from .io import IO 

15from .reader_t import ReaderT 

16from .resource import Resource 

17 

18 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Context type accepted by readers built with ``ResourceReader.from_factory``:
# the project's ``Selector`` parameterized as ``Selector[type, Any]``.
OperationSelector = Selector[type, Any]

23 

24 

25class ResourceReader[T](ReaderT[OperationSelector, Resource[IO, 'T'], 'T']): 

26 """Reader transformer that produces ``Resource[IO, T]`` from a context. 

27 

28 Two factory methods: 

29 

30 - ``from_factory(factory)`` — injects dependencies via ``Selector`` 

31 (prototype-compatible, O(N) per-param lookup). 

32 - ``from_factory_injected(factory)`` — injects via ``FragmentLinker`` 

33 (overrides-first, cross-fragment resolution, bracket-safe cleanup). 

34 

35 The returned ``ResourceReader`` is callable: ``reader(ctx) -> Resource[IO, T]``. 

36 The Resource is lazy — execution happens on ``.run()`` or ``with .open()``. 

37 

38 For scoped factories (generators), acquire/release bracket semantics 

39 are preserved. For unscoped factories, resolver cleanup is deferred 

40 until the Resource is released. 

41 

42 Type parameters: 

43 T: Value type produced by the factory. 

44 """ 

45 

46 @classmethod 

47 def from_factory( 

48 cls, factory: Factory[T], *args: Any, **kwargs: Any 

49 ) -> ResourceReader[T]: 

50 """Create a ResourceReader using Selector-based DI. 

51 

52 Injects dependencies by iterating factory params and looking up 

53 each type in the selector. Ported from the prototype. 

54 

55 Args: 

56 factory: The Factory to inject into and invoke. 

57 *args: Extra positional args passed to ``factory.create()``. 

58 **kwargs: Extra keyword args passed to ``factory.create()``. 

59 

60 Returns: 

61 A ResourceReader that, given a Selector, produces Resource[IO, T]. 

62 """ 

63 from ..injection import inject_functor 

64 

65 args = tuple(a for a in args if a is not None) 

66 

67 def run(selector: OperationSelector) -> Resource[IO, T]: 

68 cloned_fn = factory.clone() 

69 inject_functor(cloned_fn, selector) 

70 

71 if cloned_fn.scoped: 

72 resource = Resource[IO, cloned_fn.return_type].make( 

73 IO.into(invoke_sync, cloned_fn.create, *args, **kwargs), 

74 IO.wrap(invoke_sync, cloned_fn.destroy), 

75 ) 

76 else: 

77 resource = Resource[IO, cloned_fn.return_type].eval( 

78 IO.into(invoke_sync, cloned_fn.create, *args, **kwargs) 

79 ) 

80 return resource 

81 

82 return ResourceReader[T](run) 

83 

84 @classmethod 

85 def from_factory_injected( 

86 cls, factory: Factory[T], *args: Any, **kwargs: Any 

87 ) -> ResourceReader[T]: 

88 """Create a ResourceReader using FragmentLinker-based DI. 

89 

90 Uses ``inject_functor_via_linker`` for multi-root cross-fragment 

91 resolution with overrides-first semantics. Resolvers are kept alive 

92 until the Resource is released (bracket-safe). 

93 

94 Args: 

95 factory: The Factory to inject into and invoke. 

96 *args: Extra positional args passed to ``factory.create()``. 

97 **kwargs: Extra keyword args passed to ``factory.create()``. 

98 

99 Returns: 

100 A ResourceReader that, given an OperatorResolver, produces Resource[IO, T]. 

101 """ 

102 from ..injection import inject_functor_via_linker 

103 

104 args = tuple(a for a in args if a is not None) 

105 

106 def run(ctx: Any) -> Resource[IO, T]: 

107 if factory.scoped: 

108 

109 def acquire() -> _Held: 

110 cloned = factory.clone() 

111 resolvers = inject_functor_via_linker( 

112 cloned, ctx.linker, ctx.scope, external=dict(ctx.context_values) 

113 ) 

114 try: 

115 value = invoke_sync(cloned.create, *args, **kwargs) 

116 return _Held(value=value, resolvers=resolvers, cloned=cloned) 

117 except Exception: 

118 _close_resolvers(resolvers) 

119 raise 

120 

121 def release(held: _Held, exit_case: ExitCase) -> None: 

122 try: 

123 invoke_sync(held.cloned.destroy, held.value) 

124 finally: 

125 _close_resolvers(held.resolvers) 

126 

127 return Resource[IO, factory.return_type].make_case( 

128 acquire=IO(acquire), release=lambda h, ec: IO(lambda: release(h, ec)) 

129 ) 

130 

131 pending_resolvers: list[list] = [] 

132 

133 def acquire_unscoped() -> Any: 

134 cloned = factory.clone() 

135 resolvers = inject_functor_via_linker( 

136 cloned, ctx.linker, ctx.scope, external=dict(ctx.context_values) 

137 ) 

138 try: 

139 value = invoke_sync(cloned.create, *args, **kwargs) 

140 except Exception: 

141 _close_resolvers(resolvers) 

142 raise 

143 pending_resolvers.append(resolvers) 

144 return value 

145 

146 def release_unscoped(value: Any, exit_case: ExitCase) -> None: 

147 if pending_resolvers: 

148 _close_resolvers(pending_resolvers.pop()) 

149 

150 return Resource[IO, factory.return_type].make_case( 

151 acquire=IO(acquire_unscoped), 

152 release=lambda v, ec: IO(lambda: release_unscoped(v, ec)), 

153 ) 

154 

155 return ResourceReader[T](run) 

156 

157 

158class _Held: 

159 """Per-acquisition state for scoped ResourceReader cleanup.""" 

160 

161 __slots__ = ('cloned', 'resolvers', 'value') 

162 

163 def __init__(self, value: Any, resolvers: list, cloned: Factory) -> None: 

164 self.value = value 

165 self.resolvers = resolvers 

166 self.cloned = cloned 

167 

168 

169def _close_resolvers(resolvers: list) -> None: 

170 """Close all ValueResolvers in reverse order, suppressing exceptions.""" 

171 for r in reversed(resolvers): 

172 try: 

173 r.__exit__(None, None, None) 

174 except Exception: 

175 logger.debug('_close_resolvers: ошибка подавлена')