Coverage for greyhorse / app / resources / injection.py: 0%
94 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 00:48 +0300
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 00:48 +0300
1import inspect
2import sys
3from collections.abc import Callable, Iterable
4from dataclasses import dataclass, field
5from functools import wraps
6from typing import Any, TypeVar, get_type_hints
8from greyhorse.app.abc.collections.selectors import Selector
9from greyhorse.app.private.runtime.invoke import caller_path
10from greyhorse.maybe import Just, Maybe, Nothing
11from greyhorse.utils.types import is_maybe, is_optional, unwrap_maybe, unwrap_optional
13from ..abc.module import Module
16Target = TypeVar('Target', bound=Callable | type)
19@dataclass(slots=True)
20class _ModuleNode:
21 children: dict[str, '_ModuleNode'] = field(default_factory=dict)
22 targets: list[Target] = field(default_factory=list)
25class _TargetCollector:
26 __slots__ = ('_root',)
28 def __init__(self) -> None:
29 self._root = _ModuleNode()
31 def add_target(self, target: Target) -> bool:
32 path = target.__module__
33 current = self._root
35 for path_entry in path.split('.'):
36 if path_entry not in current.children:
37 current.children[path_entry] = _ModuleNode()
38 current = current.children[path_entry]
40 if target in current.targets:
41 return False
43 current.targets.append(target)
44 return True
46 def list_targets(self, path: str) -> list[Target]:
47 current = self._root
49 for path_entry in path.split('.'):
50 if path_entry not in current.children:
51 return []
52 current = current.children[path_entry]
54 result = current.targets.copy()
55 nodes = list(current.children.values())
56 curr_idx = 0
58 while curr_idx < len(nodes):
59 node = nodes[curr_idx]
60 nodes += node.children.values()
61 curr_idx += 1
63 for node in nodes:
64 result += node.targets.copy()
66 return result
69_collector = _TargetCollector()
72def get_current_module(depth: int = 2) -> Maybe[Module]:
73 current_module_path = caller_path(depth)
75 while len(current_module_path):
76 dotted_module_path = '.'.join(current_module_path)
77 if (mod := sys.modules.get(dotted_module_path)) and (
78 instance := getattr(mod, '__gh_module__', None)
79 ):
80 return Just(instance)
82 current_module_path = current_module_path[: len(current_module_path) - 1]
84 return Nothing
87def inject_targets(container: Selector[type, Any], paths: Iterable[str]) -> int:
88 targets = set()
90 for path in paths:
91 targets.update(set(_collector.list_targets(path)))
93 for target in targets:
94 target.__container__ = container
96 return len(targets)
99def uninject_targets(paths: Iterable[str]) -> int:
100 targets = set()
102 for path in paths:
103 targets.update(set(_collector.list_targets(path)))
105 for target in targets:
106 delattr(target, '__container__')
108 return len(targets)
111def invoke_injected[T, **P](
112 func: Callable[P, T],
113 hints: dict[str, Any],
114 selector: Selector[type, Any],
115 /,
116 *args: P.args,
117 **kwargs: P.kwargs,
118) -> T:
119 injected_args = {}
120 args_count = len(args) if inspect.ismethod(func) else len(args) - 1
122 for i, (k, v) in enumerate(hints.items()):
123 if i < args_count or k in kwargs:
124 continue
125 if value := selector.get(unwrap_maybe(unwrap_optional(v))):
126 injected_args[k] = value if is_maybe(v) else value.unwrap()
127 elif is_maybe(v):
128 injected_args[k] = Nothing
129 elif is_optional(v):
130 injected_args[k] = None
132 kwargs.update(injected_args)
133 return func(*args, **kwargs)
136def inject[T, **P](target: Callable[P, T] | type[T]) -> Callable[P, T]:
137 hints = get_type_hints(target)
138 hints.pop('return', None)
140 @wraps(target)
141 def decorator(*args: P.args, **kwargs: P.kwargs) -> T:
142 if container := getattr(target, '__container__', None):
143 return invoke_injected(target, hints, container, *args, **kwargs)
144 return target(*args, **kwargs)
146 _collector.add_target(target)
147 return decorator