Coverage for greyhorse / app / resources / injection.py: 0%

94 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 00:48 +0300

1import inspect 

2import sys 

3from collections.abc import Callable, Iterable 

4from dataclasses import dataclass, field 

5from functools import wraps 

6from typing import Any, TypeVar, get_type_hints 

7 

8from greyhorse.app.abc.collections.selectors import Selector 

9from greyhorse.app.private.runtime.invoke import caller_path 

10from greyhorse.maybe import Just, Maybe, Nothing 

11from greyhorse.utils.types import is_maybe, is_optional, unwrap_maybe, unwrap_optional 

12 

13from ..abc.module import Module 

14 

15 

16Target = TypeVar('Target', bound=Callable | type) 

17 

18 

19@dataclass(slots=True) 

20class _ModuleNode: 

21 children: dict[str, '_ModuleNode'] = field(default_factory=dict) 

22 targets: list[Target] = field(default_factory=list) 

23 

24 

25class _TargetCollector: 

26 __slots__ = ('_root',) 

27 

28 def __init__(self) -> None: 

29 self._root = _ModuleNode() 

30 

31 def add_target(self, target: Target) -> bool: 

32 path = target.__module__ 

33 current = self._root 

34 

35 for path_entry in path.split('.'): 

36 if path_entry not in current.children: 

37 current.children[path_entry] = _ModuleNode() 

38 current = current.children[path_entry] 

39 

40 if target in current.targets: 

41 return False 

42 

43 current.targets.append(target) 

44 return True 

45 

46 def list_targets(self, path: str) -> list[Target]: 

47 current = self._root 

48 

49 for path_entry in path.split('.'): 

50 if path_entry not in current.children: 

51 return [] 

52 current = current.children[path_entry] 

53 

54 result = current.targets.copy() 

55 nodes = list(current.children.values()) 

56 curr_idx = 0 

57 

58 while curr_idx < len(nodes): 

59 node = nodes[curr_idx] 

60 nodes += node.children.values() 

61 curr_idx += 1 

62 

63 for node in nodes: 

64 result += node.targets.copy() 

65 

66 return result 

67 

68 

69_collector = _TargetCollector() 

70 

71 

72def get_current_module(depth: int = 2) -> Maybe[Module]: 

73 current_module_path = caller_path(depth) 

74 

75 while len(current_module_path): 

76 dotted_module_path = '.'.join(current_module_path) 

77 if (mod := sys.modules.get(dotted_module_path)) and ( 

78 instance := getattr(mod, '__gh_module__', None) 

79 ): 

80 return Just(instance) 

81 

82 current_module_path = current_module_path[: len(current_module_path) - 1] 

83 

84 return Nothing 

85 

86 

87def inject_targets(container: Selector[type, Any], paths: Iterable[str]) -> int: 

88 targets = set() 

89 

90 for path in paths: 

91 targets.update(set(_collector.list_targets(path))) 

92 

93 for target in targets: 

94 target.__container__ = container 

95 

96 return len(targets) 

97 

98 

99def uninject_targets(paths: Iterable[str]) -> int: 

100 targets = set() 

101 

102 for path in paths: 

103 targets.update(set(_collector.list_targets(path))) 

104 

105 for target in targets: 

106 delattr(target, '__container__') 

107 

108 return len(targets) 

109 

110 

111def invoke_injected[T, **P]( 

112 func: Callable[P, T], 

113 hints: dict[str, Any], 

114 selector: Selector[type, Any], 

115 /, 

116 *args: P.args, 

117 **kwargs: P.kwargs, 

118) -> T: 

119 injected_args = {} 

120 args_count = len(args) if inspect.ismethod(func) else len(args) - 1 

121 

122 for i, (k, v) in enumerate(hints.items()): 

123 if i < args_count or k in kwargs: 

124 continue 

125 if value := selector.get(unwrap_maybe(unwrap_optional(v))): 

126 injected_args[k] = value if is_maybe(v) else value.unwrap() 

127 elif is_maybe(v): 

128 injected_args[k] = Nothing 

129 elif is_optional(v): 

130 injected_args[k] = None 

131 

132 kwargs.update(injected_args) 

133 return func(*args, **kwargs) 

134 

135 

136def inject[T, **P](target: Callable[P, T] | type[T]) -> Callable[P, T]: 

137 hints = get_type_hints(target) 

138 hints.pop('return', None) 

139 

140 @wraps(target) 

141 def decorator(*args: P.args, **kwargs: P.kwargs) -> T: 

142 if container := getattr(target, '__container__', None): 

143 return invoke_injected(target, hints, container, *args, **kwargs) 

144 return target(*args, **kwargs) 

145 

146 _collector.add_target(target) 

147 return decorator