Coverage for greyhorse/app/private/functional/linker.py: 92%
88 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-05-12 22:07 +0300
« prev ^ index » next coverage.py v7.11.3, created at 2026-05-12 22:07 +0300
1from __future__ import annotations
3from collections import deque
4from collections.abc import Callable, Collection, Iterable, Mapping
5from dataclasses import dataclass
6from enum import IntEnum
8from greyhorse.result import Err, Ok, Result
10from ..fragment import Fragment
11from ..resolving.schemas import PlanAction, PlanResolveError, ResolvePlan
14@dataclass(slots=True, frozen=True, kw_only=True)
15class Linkage:
16 """Result of cross-fragment dependency analysis.
18 ``resolved``: types mapped to (fragment, plan) pairs in discovery order.
19 ``order``: types in topological execution order (dependencies first).
20 ``pending``: types not found in any fragment (require external values).
21 """
23 resolved: Mapping[type, tuple[Fragment, ResolvePlan]]
24 order: tuple[type, ...]
25 pending: tuple[PlanAction, ...]
28def _has_factory_call(plan: ResolvePlan, t: type) -> bool:
29 return any(
30 isinstance(a, PlanAction.FactoryCall) and a.target_type is t for a in plan.actions
31 )
34class FragmentLinker:
35 """Analyzes cross-fragment dependencies and produces a topologically sorted Linkage.
37 Pure analyzer — does not create instances, touch buckets, or call invoke_sync.
38 """
40 __slots__ = ('_fragments',)
42 def __init__(self, fragments: Iterable[Fragment]) -> None:
43 self._fragments = list(fragments)
45 def link(
46 self,
47 requested: Collection[type],
48 optional: Collection[type] = (),
49 from_scope: IntEnum | None = None,
50 ) -> Result[Linkage, PlanResolveError]:
51 resolved, pending, err = self._collect(requested, optional, from_scope)
52 if err is not None:
53 return err.to_result()
55 order_result = self._toposort(resolved)
56 if order_result is None:
57 cycle_types = ', '.join(t.__qualname__ for t in resolved)
58 return PlanResolveError.CyclicDependency(types=cycle_types).to_result()
60 return Ok(Linkage(resolved=resolved, order=tuple(order_result), pending=tuple(pending)))
62 def _collect(
63 self,
64 requested: Collection[type],
65 optional: Collection[type],
66 from_scope: IntEnum | None,
67 ) -> tuple[
68 dict[type, tuple[Fragment, ResolvePlan]], list[PlanAction], PlanResolveError | None
69 ]:
70 resolved: dict[type, tuple[Fragment, ResolvePlan]] = {}
71 pending: list[PlanAction] = []
72 required_types: set[type] = set(requested)
73 queue = list(requested) + [t for t in optional if t not in required_types]
75 while queue:
76 t = queue.pop()
77 if t in resolved:
78 continue
80 fragment_found = False
81 first_error: PlanResolveError | None = None
83 for frag in self._fragments:
84 match frag.compile_plan(t, from_scope):
85 case Err(err):
86 if first_error is None:
87 first_error = err
88 continue
89 case Ok(plan) if not plan.actions:
90 continue
91 case Ok(plan) if not _has_factory_call(plan, t):
92 continue
93 case Ok(plan):
94 pass
95 case _:
96 continue
98 resolved[t] = (frag, plan)
99 fragment_found = True
101 for action in plan.actions:
102 if (
103 isinstance(action, PlanAction.ProviderCall)
104 and action.target_type not in resolved
105 ):
106 queue.append(action.target_type)
107 if not action.optional:
108 required_types.add(action.target_type)
109 break
111 if not fragment_found and t in required_types:
112 if first_error is not None:
113 return resolved, pending, first_error
114 pending.append(
115 PlanAction.ProviderCall(
116 target_type=t, signature=Callable[[], t], optional=False
117 )
118 )
120 return resolved, pending, None
122 def _toposort(
123 self, resolved: dict[type, tuple[Fragment, ResolvePlan]]
124 ) -> list[type] | None:
125 deps: dict[type, set[type]] = {}
126 for t, (_, plan) in resolved.items():
127 deps[t] = set()
128 for action in plan.actions:
129 if (
130 isinstance(action, PlanAction.ProviderCall)
131 and action.target_type in resolved
132 ):
133 deps[t].add(action.target_type)
135 in_degree = {t: len(d) for t, d in deps.items()}
136 zero_queue: deque[type] = deque(t for t, d in in_degree.items() if d == 0)
137 order: list[type] = []
139 while zero_queue:
140 t = zero_queue.popleft()
141 order.append(t)
142 for other, other_deps in deps.items():
143 if t in other_deps:
144 in_degree[other] -= 1
145 if in_degree[other] == 0:
146 zero_queue.append(other)
148 if len(order) < len(resolved):
149 return None
151 return order
154__all__ = ['FragmentLinker', 'Linkage']