Coverage for little_loops / fsm / fragments.py: 96%
109 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Fragment library resolution for FSM loops.
3Implements parse-time expansion of ``fragment:`` references in loop YAML files.
4Fragment libraries define named partial state definitions that any loop can import
5and reference, eliminating copy-paste duplication across loop files.
7Fragment resolution happens before ``FSMLoop.from_dict`` is called, so the engine
8never sees ``fragment:`` keys.
10Example loop YAML::
12 import:
13 - lib/common.yaml
15 states:
16 lint:
17 fragment: shell_exit # inherits action_type + evaluate from fragment
18 action: "ruff check ."
19 on_yes: done
20 on_no: fix
22Example library YAML (``lib/common.yaml``)::
24 fragments:
25 shell_exit:
26 action_type: shell
27 evaluate:
28 type: exit_code
29"""
31from __future__ import annotations
33from pathlib import Path
34from typing import Any
36import yaml
38_BUILTIN_LOOPS_DIR = Path(__file__).parent.parent / "loops"
41def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
42 """Deep-merge two dicts; override keys win at every nesting level.
44 For nested dicts, recursively merges instead of replacing. For all other
45 value types (str, int, bool, list, None), the override value wins outright.
46 Returns a new dict; neither input is mutated.
48 Args:
49 base: Base dict (e.g. a fragment definition).
50 override: Override dict (e.g. state-level fields).
52 Returns:
53 New merged dict with override taking precedence.
54 """
55 result = dict(base)
56 for key, value in override.items():
57 if key in result and isinstance(result[key], dict) and isinstance(value, dict):
58 result[key] = _deep_merge(result[key], value)
59 else:
60 result[key] = value
61 return result
64def resolve_fragments(raw_loop_dict: dict[str, Any], loop_dir: Path) -> dict[str, Any]:
65 """Load fragment libraries, merge namespaces, and expand ``fragment:`` references.
67 Resolution steps:
68 1. Load each path in ``import:`` (relative to ``loop_dir``), collecting all
69 named fragments. Later imports override earlier imports for the same name.
70 2. Merge the loop's own ``fragments:`` block on top (local overrides imports).
71 3. For each state that has a ``fragment:`` key, deep-merge the named fragment
72 into the state dict (state-level keys win), then remove the ``fragment:`` key.
74 Returns a new dict with all ``fragment:`` keys expanded. The ``import:`` and
75 ``fragments:`` keys remain in the returned dict so callers can access them for
76 display or validation purposes.
78 Args:
79 raw_loop_dict: Raw YAML dict loaded from a loop file.
80 loop_dir: Directory containing the loop file; import paths are resolved
81 relative to this directory.
83 Returns:
84 New dict with fragment references expanded.
86 Raises:
87 FileNotFoundError: If an ``import:`` path does not exist relative to ``loop_dir``.
88 ValueError: If a state references a ``fragment:`` name that is not defined in
89 any imported library or the local ``fragments:`` block.
90 """
91 # Step 1: load imported fragment libraries
92 imported_fragments: dict[str, dict[str, Any]] = {}
93 for import_path in raw_loop_dict.get("import", []):
94 lib_path = loop_dir / import_path
95 if not lib_path.exists():
96 builtin_path = _BUILTIN_LOOPS_DIR / import_path
97 if builtin_path.exists():
98 lib_path = builtin_path
99 else:
100 raise FileNotFoundError(
101 f"Fragment library not found: {import_path} "
102 f"(checked '{loop_dir / import_path}' and '{builtin_path}')"
103 )
104 with open(lib_path) as f:
105 lib_data = yaml.safe_load(f)
106 if isinstance(lib_data, dict):
107 for name, frag in lib_data.get("fragments", {}).items():
108 imported_fragments[name] = frag
110 # Step 2: merge local fragments: block on top (local wins)
111 all_fragments: dict[str, dict[str, Any]] = {
112 **imported_fragments,
113 **raw_loop_dict.get("fragments", {}),
114 }
116 result = dict(raw_loop_dict)
117 states: dict[str, Any] = dict(result.get("states", {}))
119 # Step 3: if no states reference a fragment, return early (no-op)
120 if not any(isinstance(s, dict) and s.get("fragment") is not None for s in states.values()):
121 return result
123 for state_name, state_dict in states.items():
124 if not isinstance(state_dict, dict):
125 continue
126 fragment_name = state_dict.get("fragment")
127 if fragment_name is None:
128 continue
129 if fragment_name not in all_fragments:
130 available = ", ".join(sorted(all_fragments))
131 raise ValueError(
132 f"State '{state_name}': fragment '{fragment_name}' not found. "
133 f"Available fragments: {available or '(none)'}"
134 )
135 # Deep merge: fragment is the base, state fields override
136 # Strip description before merge — it is metadata, not a state field
137 frag_copy = dict(all_fragments[fragment_name])
138 frag_copy.pop("description", None)
139 merged = _deep_merge(frag_copy, state_dict)
140 del merged["fragment"] # consume the fragment: key
141 states[state_name] = merged
143 result["states"] = states
144 return result
147def resolve_inheritance(
148 raw_loop_dict: dict[str, Any],
149 loop_dir: Path,
150 _seen: tuple[str, ...] = (),
151) -> dict[str, Any]:
152 """Resolve ``from:`` template inheritance by deep-merging parent into child.
154 A loop YAML with ``from: <name>`` inherits all top-level fields and states
155 from the named parent loop. The child's own fields override the parent's at
156 every nesting level (per :func:`_deep_merge` semantics): scalars and lists
157 are replaced by the child, dicts are merged recursively. The ``from:`` key
158 itself is stripped from the returned dict.
160 Parent lookup uses :func:`little_loops.cli.loop._helpers.resolve_loop_path`,
161 which searches ``loop_dir`` first then falls back to the bundled built-in
162 loops directory. Cycles in the ``from:`` chain raise ``ValueError`` with the
163 full chain path; missing parents raise ``FileNotFoundError``.
165 Resolution must run *before* :func:`resolve_fragments` and *before* the
166 required-fields check in :func:`load_and_validate`, so a child can omit
167 fields its parent provides (including ``initial`` and ``states``) and so a
168 parent's ``import:``/``fragments:`` blocks survive into the merged result.
170 Args:
171 raw_loop_dict: Raw YAML dict loaded from a loop file.
172 loop_dir: Directory containing the (child) loop file; parent names are
173 resolved relative to this directory.
174 _seen: Internal tuple of parent names already visited during recursion;
175 used for cycle detection.
177 Returns:
178 New dict with ``from:`` resolved and stripped. If the input has no
179 ``from:`` key, returns it unchanged.
181 Raises:
182 ValueError: If ``from:`` is not a string, the parent is not a YAML
183 mapping, or a cycle is detected in the inheritance chain.
184 FileNotFoundError: If the parent loop name cannot be resolved.
185 """
186 if "from" not in raw_loop_dict:
187 return raw_loop_dict
189 parent_name = raw_loop_dict["from"]
190 if not isinstance(parent_name, str):
191 raise ValueError(f"`from:` must be a string, got {type(parent_name).__name__}")
193 if parent_name in _seen:
194 chain = " -> ".join(_seen + (parent_name,))
195 raise ValueError(f"Circular `from:` chain: {chain}")
197 # Lazy import to avoid circular import at module load (cli.loop._helpers
198 # imports from fsm.* indirectly). Mirrors the pattern used in
199 # fsm/executor.py:410 for sub-loop calls.
200 from little_loops.cli.loop._helpers import resolve_loop_path
202 parent_path = resolve_loop_path(parent_name, loop_dir)
203 with open(parent_path) as f:
204 parent_data = yaml.safe_load(f)
206 if not isinstance(parent_data, dict):
207 raise ValueError(
208 f"Parent loop '{parent_name}' is not a YAML mapping (got {type(parent_data).__name__})"
209 )
211 parent_data = resolve_inheritance(parent_data, parent_path.parent, _seen + (parent_name,))
213 child_without_from = {k: v for k, v in raw_loop_dict.items() if k != "from"}
214 merged = _deep_merge(parent_data, child_without_from)
215 merged.pop("from", None)
216 return merged
219def resolve_flow(raw_loop_dict: dict[str, Any]) -> dict[str, Any]:
220 """Expand ``flow:`` linear shorthand into a verbose ``states:`` map.
222 A loop YAML with ``flow: [<state>, ...]`` declares an ordered linear chain
223 of states. Each entry is either a bare name (unconditional forward
224 transition) or a ternary ``name?yes_target:no_target`` (conditional
225 branching). The last entry is implicitly ``terminal: true``.
227 Optional ``state_defs:`` supplies prompt/action/evaluate bodies that are
228 deep-merged into the generated state skeletons. If both ``flow:`` and
229 ``states:`` are present, raises ``ValueError`` — the two are mutually
230 exclusive.
232 Resolution runs *after* :func:`resolve_inheritance` (so a child can
233 override a parent's states with its own ``flow:``) and *before* the
234 required-fields check in :func:`load_and_validate` (so the expanded
235 ``states:`` key satisfies the validator).
237 Args:
238 raw_loop_dict: Raw YAML dict loaded from a loop file.
240 Returns:
241 New dict with ``flow:`` expanded to ``states:`` and both ``flow:``
242 and ``state_defs:`` stripped. If the input has no ``flow:`` key,
243 returns it unchanged.
245 Raises:
246 ValueError: If ``flow:`` is not a list, is empty, or contains a
247 malformed ternary entry.
248 """
249 if "flow" not in raw_loop_dict:
250 return raw_loop_dict
252 # If both flow: and states: are present, flow: takes precedence. This
253 # handles the case where states was inherited via `from:` — the child's
254 # explicit flow: overrides the parent's states.
255 #
257 flow = raw_loop_dict["flow"]
258 if not isinstance(flow, list):
259 raise ValueError(f"'flow:' must be a list, got {type(flow).__name__}")
260 if len(flow) < 1:
261 raise ValueError("'flow:' must contain at least one state")
263 state_defs: dict[str, dict[str, Any]] = raw_loop_dict.get("state_defs", {})
264 if not isinstance(state_defs, dict):
265 state_defs = {}
267 generated_states: dict[str, dict[str, Any]] = {}
269 # Pre-parse all entries to extract state names (strip ternary suffixes)
270 def _parse_name(raw: str) -> str:
271 return raw.split("?", 1)[0] if "?" in raw else raw
273 parsed_names = [_parse_name(e) if isinstance(e, str) else e for e in flow]
275 for i, entry in enumerate(flow):
276 is_last = i == len(flow) - 1
277 next_name = parsed_names[i + 1] if not is_last else None
279 if not isinstance(entry, str):
280 raise ValueError(f"'flow:' entry {i} must be a string, got {type(entry).__name__}")
282 if "?" in entry:
283 # Ternary form: name?yes_target:no_target
284 parts = entry.split("?", 1)
285 state_name = parts[0]
286 targets = parts[1]
288 if ":" not in targets or targets.endswith(":") or targets.startswith(":"):
289 raise ValueError(
290 f"Malformed ternary in flow entry '{entry}': must be name?yes_target:no_target"
291 )
292 yes_target, no_target = targets.split(":", 1)
293 if not yes_target or not no_target:
294 raise ValueError(
295 f"Malformed ternary in flow entry '{entry}': "
296 f"both yes and no targets must be non-empty"
297 )
299 skeleton: dict[str, Any] = {"on_yes": yes_target, "on_no": no_target}
300 else:
301 state_name = entry
302 skeleton = {}
303 if next_name is not None:
304 skeleton["next"] = next_name
306 if is_last:
307 skeleton["terminal"] = True
309 # Deep-merge state_defs body into generated skeleton
310 if state_name in state_defs:
311 skeleton = _deep_merge(skeleton, state_defs[state_name])
313 generated_states[state_name] = skeleton
315 result = {k: v for k, v in raw_loop_dict.items() if k not in ("flow", "state_defs")}
316 result["states"] = generated_states
317 return result