Coverage for greyhorse / river / invocation.py: 93%
106 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-18 11:33 +0300
1"""Operator invocation builder with dependency binding and lazy execution.
3``OperatorInvocation`` is the bridge between controller-owned runtime state
4and the operator's execution pipeline. A controller creates an invocation,
5binds contingent dependencies, and calls ``prepare(resolver)`` to get a
6``Resource[IO, T]`` that can be executed via ``.run()`` or ``with .open()``.
7"""
9from __future__ import annotations
11import logging
12from collections.abc import Callable
13from typing import Any
15from greyhorse.app.private.resolving import _type_to_param_name
16from greyhorse.app.private.runtime.invoke import invoke_sync
17from greyhorse.factory import Factory
19from .private.injection import inject_functor_via_linker
20from .private.monads.effect import ExitCase
21from .private.monads.io import IO
22from .private.monads.resource import Resource, resource_acquire, resource_release
23from .resolver import OperatorResolver
26logger = logging.getLogger(__name__)
29class OperatorInvocation[T]:
30 """Builder for a single operator execution.
32 Holds the factory (from ``@operator`` decorator), user-supplied args,
33 and bindings (contingent dependencies from the controller).
35 Typical usage::
37 inv = ops.publish_pool()
38 inv.bind(DbPool, pool_value)
39 program = inv.prepare(resolver) # Resource[IO, T]
40 result = program.run()
41 """
43 __slots__ = ('_args', '_bindings', '_factory', '_kwargs')
45 def __init__(self, factory: Factory[T], *args: Any, **kwargs: Any) -> None:
46 self._factory = factory
47 self._args = args
48 self._kwargs = kwargs
49 self._bindings: dict[type, Any] = {}
51 def bind(self, key: type, value: Any) -> OperatorInvocation[T]:
52 """Bind a contingent dependency by type.
54 Pure values are wrapped in ``Resource.pure``. ``Resource`` instances
55 are stored as-is (bracket semantics preserved).
57 Args:
58 key: The type slot to fill (e.g. ``DbPool``).
59 value: A ready value or a ``Resource[IO, T]`` program.
61 Returns:
62 Self for chaining.
63 """
64 if isinstance(value, Resource):
65 self._bindings[key] = value
66 else:
67 self._bindings[key] = Resource[IO, type(value)].pure(value) # type: ignore[misc]
68 return self
70 def bind_resource(
71 self, key: type, *, acquire: Callable[[], Any], release: Callable[[Any], None]
72 ) -> OperatorInvocation[T]:
73 """Bind a managed contingent resource with acquire/release pair.
75 Internally wraps into ``Resource.make(IO.into(acquire), IO.wrap(release))``.
76 Controller code stays clean — no IO/Resource visible.
78 Args:
79 key: The type slot to fill.
80 acquire: Callable that creates the resource.
81 release: Callable that destroys the resource.
83 Returns:
84 Self for chaining.
85 """
86 res = Resource[IO, key].make(IO.into(acquire), IO.wrap(release)) # type: ignore[valid-type,call-arg]
87 self._bindings[key] = res
88 return self
90 def prepare(self, resolver: OperatorResolver) -> Resource[IO, T]:
91 """Build a lazy ``Resource[IO, T]`` program from bindings + fragment DI.
93 The returned Resource is not executed until ``.run()`` or ``with .open()``.
94 Resolvers and bound resources are acquired during execution and released
95 when the Resource is released (bracket semantics).
97 Args:
98 resolver: Provides ``linker``, ``scope``, and ``context_values``
99 for fragment-based dependency injection.
101 Returns:
102 A ``Resource[IO, T]`` that executes the operator when used.
103 """
104 factory = self._factory
105 args = self._args
106 kwargs = self._kwargs
107 bindings = dict(self._bindings)
109 if factory.scoped:
111 def acquire() -> _Held:
112 cloned = factory.clone()
113 external, bound_stacks = _materialize_bindings(bindings, resolver)
114 resolvers: list = []
115 try:
116 skip_slots = set(bindings.keys())
117 resolvers = inject_functor_via_linker(
118 cloned,
119 resolver.linker,
120 resolver.scope,
121 external=external,
122 skip_slots=skip_slots,
123 )
124 value = invoke_sync(cloned.create, *args, **kwargs)
125 return _Held(
126 value=value,
127 resolvers=resolvers,
128 cloned=cloned,
129 bound_stacks=bound_stacks,
130 )
131 except Exception:
132 _cleanup(resolvers, bound_stacks)
133 raise
135 def release(held: _Held, exit_case: ExitCase) -> None:
136 try:
137 invoke_sync(held.cloned.destroy, held.value)
138 finally:
139 _cleanup(held.resolvers, held.bound_stacks, exit_case)
141 held_res = Resource[IO, T].make_case(
142 acquire=IO(acquire),
143 release=lambda h, ec: IO(lambda: release(h, ec)), # type: ignore[arg-type]
144 )
145 return held_res.and_then( # type: ignore[return-value]
146 lambda h: Resource[IO, T].pure(h.value) # type: ignore[union-attr]
147 )
149 pending_cleanup: list[tuple[list, dict[type, list]]] = []
151 def acquire_unscoped() -> Any:
152 cloned = factory.clone()
153 external, bound_stacks = _materialize_bindings(bindings, resolver)
154 resolvers: list = []
155 try:
156 skip_slots = set(bindings.keys())
157 resolvers = inject_functor_via_linker(
158 cloned,
159 resolver.linker,
160 resolver.scope,
161 external=external,
162 skip_slots=skip_slots,
163 )
164 value = invoke_sync(cloned.create, *args, **kwargs)
165 except Exception:
166 _cleanup(resolvers, bound_stacks)
167 raise
168 pending_cleanup.append((resolvers, bound_stacks))
169 return value
171 def release_unscoped(value: Any, exit_case: ExitCase) -> None:
172 if pending_cleanup:
173 resolvers, bound_stacks = pending_cleanup.pop()
174 _cleanup(resolvers, bound_stacks, exit_case)
176 return Resource[IO, T].make_case(
177 acquire=IO(acquire_unscoped),
178 release=lambda v, ec: IO(lambda: release_unscoped(v, ec)),
179 )
class _Held:
    """State captured by one acquisition and handed back to its release step.

    Attributes are plain storage:

    * ``value`` - the object produced by the operator's ``create`` call.
    * ``resolvers`` - resolvers returned by linker-based injection.
    * ``cloned`` - the factory clone used for this acquisition.
    * ``bound_stacks`` - per-binding finalizer stacks, keyed by bound type.
    """

    __slots__ = ('bound_stacks', 'cloned', 'resolvers', 'value')

    def __init__(
        self, value: Any, resolvers: list, cloned: Factory, bound_stacks: dict[type, list]
    ) -> None:
        # Pure storage: no validation and no copying of the arguments.
        self.bound_stacks = bound_stacks
        self.cloned = cloned
        self.resolvers = resolvers
        self.value = value
def _materialize_bindings(
    bindings: dict[type, Resource], resolver: OperatorResolver
) -> tuple[dict[str, Any], dict[type, list]]:
    """Acquire all bound resources and build the external dict for DI.

    Args:
        bindings: Bound type -> ``Resource`` program to acquire.
        resolver: Source of ``context_values``, which seed the external dict
            (copied, never mutated).

    Returns:
        ``(external, bound_stacks)``: ``external`` maps parameter names
        (derived via ``_type_to_param_name``) to acquired values on top of the
        resolver's context values; ``bound_stacks`` maps each bound type to the
        finalizer stack filled while acquiring it.

    Raises:
        Exception: Whatever an acquisition raises, re-raised after every stack
            registered so far has been passed to ``_cleanup``.
    """
    external = dict(resolver.context_values)
    bound_stacks: dict[type, list] = {}
    try:
        for bound_type, bound_res in bindings.items():
            stack: list = []
            # Register the stack BEFORE acquiring: if acquisition fails after
            # pushing some finalizers, the except-branch below still sees this
            # stack and releases them instead of leaking them.
            # NOTE(review): assumes resource_acquire does not unwind the stack
            # itself on failure - if it does, this is a harmless no-op.
            bound_stacks[bound_type] = stack
            val = resource_acquire(bound_res, stack)()
            external[_type_to_param_name(bound_type)] = val
    except Exception:
        _cleanup([], bound_stacks)
        raise
    return external, bound_stacks
def _cleanup(
    resolvers: list, bound_stacks: dict[type, list], exit_case: ExitCase | None = None
) -> None:
    """Release resolvers and bound resource stacks in reverse order.

    Cleanup is best-effort: an exception raised by one resolver or one stack
    is logged at DEBUG level (with traceback) and suppressed, so the remaining
    items are still released.

    Args:
        resolvers: Objects whose ``__exit__(None, None, None)`` is called,
            last one first.
        bound_stacks: Finalizer stacks per bound type, released last
            registered first.
        exit_case: Exit case passed to ``resource_release``; ``None`` means
            ``ExitCase.Succeeded``.
    """
    for r in reversed(resolvers):
        try:
            r.__exit__(None, None, None)
        except Exception:
            # exc_info keeps the swallowed error diagnosable from the logs.
            logger.debug('_cleanup: ошибка resolver __exit__ подавлена', exc_info=True)

    ec = exit_case if exit_case is not None else ExitCase.Succeeded
    for stack in reversed(list(bound_stacks.values())):
        release_io = resource_release(ec, stack)
        if release_io is not None:
            try:
                release_io()  # type: ignore[operator]
            except Exception:
                logger.debug(
                    '_cleanup: ошибка release bound-ресурса подавлена', exc_info=True
                )