Coverage for greyhorse/app/private/functional/linker.py: 92%

88 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-05-12 22:07 +0300

1from __future__ import annotations 

2 

3from collections import deque 

4from collections.abc import Callable, Collection, Iterable, Mapping 

5from dataclasses import dataclass 

6from enum import IntEnum 

7 

8from greyhorse.result import Err, Ok, Result 

9 

10from ..fragment import Fragment 

11from ..resolving.schemas import PlanAction, PlanResolveError, ResolvePlan 

12 

13 

14@dataclass(slots=True, frozen=True, kw_only=True) 

15class Linkage: 

16 """Result of cross-fragment dependency analysis. 

17 

18 ``resolved``: types mapped to (fragment, plan) pairs in discovery order. 

19 ``order``: types in topological execution order (dependencies first). 

20 ``pending``: types not found in any fragment (require external values). 

21 """ 

22 

23 resolved: Mapping[type, tuple[Fragment, ResolvePlan]] 

24 order: tuple[type, ...] 

25 pending: tuple[PlanAction, ...] 

26 

27 

28def _has_factory_call(plan: ResolvePlan, t: type) -> bool: 

29 return any( 

30 isinstance(a, PlanAction.FactoryCall) and a.target_type is t for a in plan.actions 

31 ) 

32 

33 

34class FragmentLinker: 

35 """Analyzes cross-fragment dependencies and produces a topologically sorted Linkage. 

36 

37 Pure analyzer — does not create instances, touch buckets, or call invoke_sync. 

38 """ 

39 

40 __slots__ = ('_fragments',) 

41 

42 def __init__(self, fragments: Iterable[Fragment]) -> None: 

43 self._fragments = list(fragments) 

44 

45 def link( 

46 self, 

47 requested: Collection[type], 

48 optional: Collection[type] = (), 

49 from_scope: IntEnum | None = None, 

50 ) -> Result[Linkage, PlanResolveError]: 

51 resolved, pending, err = self._collect(requested, optional, from_scope) 

52 if err is not None: 

53 return err.to_result() 

54 

55 order_result = self._toposort(resolved) 

56 if order_result is None: 

57 cycle_types = ', '.join(t.__qualname__ for t in resolved) 

58 return PlanResolveError.CyclicDependency(types=cycle_types).to_result() 

59 

60 return Ok(Linkage(resolved=resolved, order=tuple(order_result), pending=tuple(pending))) 

61 

62 def _collect( 

63 self, 

64 requested: Collection[type], 

65 optional: Collection[type], 

66 from_scope: IntEnum | None, 

67 ) -> tuple[ 

68 dict[type, tuple[Fragment, ResolvePlan]], list[PlanAction], PlanResolveError | None 

69 ]: 

70 resolved: dict[type, tuple[Fragment, ResolvePlan]] = {} 

71 pending: list[PlanAction] = [] 

72 required_types: set[type] = set(requested) 

73 queue = list(requested) + [t for t in optional if t not in required_types] 

74 

75 while queue: 

76 t = queue.pop() 

77 if t in resolved: 

78 continue 

79 

80 fragment_found = False 

81 first_error: PlanResolveError | None = None 

82 

83 for frag in self._fragments: 

84 match frag.compile_plan(t, from_scope): 

85 case Err(err): 

86 if first_error is None: 

87 first_error = err 

88 continue 

89 case Ok(plan) if not plan.actions: 

90 continue 

91 case Ok(plan) if not _has_factory_call(plan, t): 

92 continue 

93 case Ok(plan): 

94 pass 

95 case _: 

96 continue 

97 

98 resolved[t] = (frag, plan) 

99 fragment_found = True 

100 

101 for action in plan.actions: 

102 if ( 

103 isinstance(action, PlanAction.ProviderCall) 

104 and action.target_type not in resolved 

105 ): 

106 queue.append(action.target_type) 

107 if not action.optional: 

108 required_types.add(action.target_type) 

109 break 

110 

111 if not fragment_found and t in required_types: 

112 if first_error is not None: 

113 return resolved, pending, first_error 

114 pending.append( 

115 PlanAction.ProviderCall( 

116 target_type=t, signature=Callable[[], t], optional=False 

117 ) 

118 ) 

119 

120 return resolved, pending, None 

121 

122 def _toposort( 

123 self, resolved: dict[type, tuple[Fragment, ResolvePlan]] 

124 ) -> list[type] | None: 

125 deps: dict[type, set[type]] = {} 

126 for t, (_, plan) in resolved.items(): 

127 deps[t] = set() 

128 for action in plan.actions: 

129 if ( 

130 isinstance(action, PlanAction.ProviderCall) 

131 and action.target_type in resolved 

132 ): 

133 deps[t].add(action.target_type) 

134 

135 in_degree = {t: len(d) for t, d in deps.items()} 

136 zero_queue: deque[type] = deque(t for t, d in in_degree.items() if d == 0) 

137 order: list[type] = [] 

138 

139 while zero_queue: 

140 t = zero_queue.popleft() 

141 order.append(t) 

142 for other, other_deps in deps.items(): 

143 if t in other_deps: 

144 in_degree[other] -= 1 

145 if in_degree[other] == 0: 

146 zero_queue.append(other) 

147 

148 if len(order) < len(resolved): 

149 return None 

150 

151 return order 

152 

153 

# Names exported by ``from <module> import *``.
__all__ = [
    'FragmentLinker',
    'Linkage',
]