Coverage for greyhorse / river / invocation.py: 93%

106 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-18 11:33 +0300

1"""Operator invocation builder with dependency binding and lazy execution. 

2 

3``OperatorInvocation`` is the bridge between controller-owned runtime state 

4and the operator's execution pipeline. A controller creates an invocation, 

5binds contingent dependencies, and calls ``prepare(resolver)`` to get a 

6``Resource[IO, T]`` that can be executed via ``.run()`` or ``with .open()``. 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12from collections.abc import Callable 

13from typing import Any 

14 

15from greyhorse.app.private.resolving import _type_to_param_name 

16from greyhorse.app.private.runtime.invoke import invoke_sync 

17from greyhorse.factory import Factory 

18 

19from .private.injection import inject_functor_via_linker 

20from .private.monads.effect import ExitCase 

21from .private.monads.io import IO 

22from .private.monads.resource import Resource, resource_acquire, resource_release 

23from .resolver import OperatorResolver 

24 

25 

26logger = logging.getLogger(__name__) 

27 

28 

29class OperatorInvocation[T]: 

30 """Builder for a single operator execution. 

31 

32 Holds the factory (from ``@operator`` decorator), user-supplied args, 

33 and bindings (contingent dependencies from the controller). 

34 

35 Typical usage:: 

36 

37 inv = ops.publish_pool() 

38 inv.bind(DbPool, pool_value) 

39 program = inv.prepare(resolver) # Resource[IO, T] 

40 result = program.run() 

41 """ 

42 

43 __slots__ = ('_args', '_bindings', '_factory', '_kwargs') 

44 

45 def __init__(self, factory: Factory[T], *args: Any, **kwargs: Any) -> None: 

46 self._factory = factory 

47 self._args = args 

48 self._kwargs = kwargs 

49 self._bindings: dict[type, Any] = {} 

50 

51 def bind(self, key: type, value: Any) -> OperatorInvocation[T]: 

52 """Bind a contingent dependency by type. 

53 

54 Pure values are wrapped in ``Resource.pure``. ``Resource`` instances 

55 are stored as-is (bracket semantics preserved). 

56 

57 Args: 

58 key: The type slot to fill (e.g. ``DbPool``). 

59 value: A ready value or a ``Resource[IO, T]`` program. 

60 

61 Returns: 

62 Self for chaining. 

63 """ 

64 if isinstance(value, Resource): 

65 self._bindings[key] = value 

66 else: 

67 self._bindings[key] = Resource[IO, type(value)].pure(value) # type: ignore[misc] 

68 return self 

69 

70 def bind_resource( 

71 self, key: type, *, acquire: Callable[[], Any], release: Callable[[Any], None] 

72 ) -> OperatorInvocation[T]: 

73 """Bind a managed contingent resource with acquire/release pair. 

74 

75 Internally wraps into ``Resource.make(IO.into(acquire), IO.wrap(release))``. 

76 Controller code stays clean — no IO/Resource visible. 

77 

78 Args: 

79 key: The type slot to fill. 

80 acquire: Callable that creates the resource. 

81 release: Callable that destroys the resource. 

82 

83 Returns: 

84 Self for chaining. 

85 """ 

86 res = Resource[IO, key].make(IO.into(acquire), IO.wrap(release)) # type: ignore[valid-type,call-arg] 

87 self._bindings[key] = res 

88 return self 

89 

90 def prepare(self, resolver: OperatorResolver) -> Resource[IO, T]: 

91 """Build a lazy ``Resource[IO, T]`` program from bindings + fragment DI. 

92 

93 The returned Resource is not executed until ``.run()`` or ``with .open()``. 

94 Resolvers and bound resources are acquired during execution and released 

95 when the Resource is released (bracket semantics). 

96 

97 Args: 

98 resolver: Provides ``linker``, ``scope``, and ``context_values`` 

99 for fragment-based dependency injection. 

100 

101 Returns: 

102 A ``Resource[IO, T]`` that executes the operator when used. 

103 """ 

104 factory = self._factory 

105 args = self._args 

106 kwargs = self._kwargs 

107 bindings = dict(self._bindings) 

108 

109 if factory.scoped: 

110 

111 def acquire() -> _Held: 

112 cloned = factory.clone() 

113 external, bound_stacks = _materialize_bindings(bindings, resolver) 

114 resolvers: list = [] 

115 try: 

116 skip_slots = set(bindings.keys()) 

117 resolvers = inject_functor_via_linker( 

118 cloned, 

119 resolver.linker, 

120 resolver.scope, 

121 external=external, 

122 skip_slots=skip_slots, 

123 ) 

124 value = invoke_sync(cloned.create, *args, **kwargs) 

125 return _Held( 

126 value=value, 

127 resolvers=resolvers, 

128 cloned=cloned, 

129 bound_stacks=bound_stacks, 

130 ) 

131 except Exception: 

132 _cleanup(resolvers, bound_stacks) 

133 raise 

134 

135 def release(held: _Held, exit_case: ExitCase) -> None: 

136 try: 

137 invoke_sync(held.cloned.destroy, held.value) 

138 finally: 

139 _cleanup(held.resolvers, held.bound_stacks, exit_case) 

140 

141 held_res = Resource[IO, T].make_case( 

142 acquire=IO(acquire), 

143 release=lambda h, ec: IO(lambda: release(h, ec)), # type: ignore[arg-type] 

144 ) 

145 return held_res.and_then( # type: ignore[return-value] 

146 lambda h: Resource[IO, T].pure(h.value) # type: ignore[union-attr] 

147 ) 

148 

149 pending_cleanup: list[tuple[list, dict[type, list]]] = [] 

150 

151 def acquire_unscoped() -> Any: 

152 cloned = factory.clone() 

153 external, bound_stacks = _materialize_bindings(bindings, resolver) 

154 resolvers: list = [] 

155 try: 

156 skip_slots = set(bindings.keys()) 

157 resolvers = inject_functor_via_linker( 

158 cloned, 

159 resolver.linker, 

160 resolver.scope, 

161 external=external, 

162 skip_slots=skip_slots, 

163 ) 

164 value = invoke_sync(cloned.create, *args, **kwargs) 

165 except Exception: 

166 _cleanup(resolvers, bound_stacks) 

167 raise 

168 pending_cleanup.append((resolvers, bound_stacks)) 

169 return value 

170 

171 def release_unscoped(value: Any, exit_case: ExitCase) -> None: 

172 if pending_cleanup: 

173 resolvers, bound_stacks = pending_cleanup.pop() 

174 _cleanup(resolvers, bound_stacks, exit_case) 

175 

176 return Resource[IO, T].make_case( 

177 acquire=IO(acquire_unscoped), 

178 release=lambda v, ec: IO(lambda: release_unscoped(v, ec)), 

179 ) 

180 

181 

182class _Held: 

183 """Per-acquisition state for scoped operator cleanup.""" 

184 

185 __slots__ = ('bound_stacks', 'cloned', 'resolvers', 'value') 

186 

187 def __init__( 

188 self, value: Any, resolvers: list, cloned: Factory, bound_stacks: dict[type, list] 

189 ) -> None: 

190 self.value = value 

191 self.resolvers = resolvers 

192 self.cloned = cloned 

193 self.bound_stacks = bound_stacks 

194 

195 

196def _materialize_bindings( 

197 bindings: dict[type, Resource], resolver: OperatorResolver 

198) -> tuple[dict[str, Any], dict[type, list]]: 

199 """Acquire all bound resources and build the external dict for DI.""" 

200 external = dict(resolver.context_values) 

201 bound_stacks: dict[type, list] = {} 

202 try: 

203 for bound_type, bound_res in bindings.items(): 

204 stack: list = [] 

205 val = resource_acquire(bound_res, stack)() 

206 external[_type_to_param_name(bound_type)] = val 

207 bound_stacks[bound_type] = stack 

208 except Exception: 

209 _cleanup([], bound_stacks) 

210 raise 

211 return external, bound_stacks 

212 

213 

214def _cleanup( 

215 resolvers: list, bound_stacks: dict[type, list], exit_case: ExitCase | None = None 

216) -> None: 

217 """Release resolvers and bound resource stacks in reverse order.""" 

218 for r in reversed(resolvers): 

219 try: 

220 r.__exit__(None, None, None) 

221 except Exception: 

222 logger.debug('_cleanup: ошибка resolver __exit__ подавлена') 

223 

224 ec = exit_case if exit_case is not None else ExitCase.Succeeded 

225 for stack in reversed(list(bound_stacks.values())): 

226 release_io = resource_release(ec, stack) 

227 if release_io is not None: 

228 try: 

229 release_io() # type: ignore[operator] 

230 except Exception: 

231 logger.debug('_cleanup: ошибка release bound-ресурса подавлена')