Coverage for little_loops / fsm / fragments.py: 96%

109 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Fragment library resolution for FSM loops. 

2 

3Implements parse-time expansion of ``fragment:`` references in loop YAML files. 

4Fragment libraries define named partial state definitions that any loop can import 

5and reference, eliminating copy-paste duplication across loop files. 

6 

7Fragment resolution happens before ``FSMLoop.from_dict`` is called, so the engine 

8never sees ``fragment:`` keys. 

9 

10Example loop YAML:: 

11 

12 import: 

13 - lib/common.yaml 

14 

15 states: 

16 lint: 

17 fragment: shell_exit # inherits action_type + evaluate from fragment 

18 action: "ruff check ." 

19 on_yes: done 

20 on_no: fix 

21 

22Example library YAML (``lib/common.yaml``):: 

23 

24 fragments: 

25 shell_exit: 

26 action_type: shell 

27 evaluate: 

28 type: exit_code 

29""" 

30 

from __future__ import annotations

import copy
from pathlib import Path
from typing import Any

import yaml

37 

# Fallback location for ``import:`` paths that do not exist relative to the
# loop's own directory: the ``loops`` directory two levels above this file.
_BUILTIN_LOOPS_DIR = Path(__file__).parent.parent / "loops"

39 

40 

41def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: 

42 """Deep-merge two dicts; override keys win at every nesting level. 

43 

44 For nested dicts, recursively merges instead of replacing. For all other 

45 value types (str, int, bool, list, None), the override value wins outright. 

46 Returns a new dict; neither input is mutated. 

47 

48 Args: 

49 base: Base dict (e.g. a fragment definition). 

50 override: Override dict (e.g. state-level fields). 

51 

52 Returns: 

53 New merged dict with override taking precedence. 

54 """ 

55 result = dict(base) 

56 for key, value in override.items(): 

57 if key in result and isinstance(result[key], dict) and isinstance(value, dict): 

58 result[key] = _deep_merge(result[key], value) 

59 else: 

60 result[key] = value 

61 return result 

62 

63 

def resolve_fragments(raw_loop_dict: dict[str, Any], loop_dir: Path) -> dict[str, Any]:
    """Load fragment libraries, merge namespaces, and expand ``fragment:`` references.

    Resolution steps:
        1. Load each path in ``import:``, collecting all named fragments. A
           path is looked up relative to ``loop_dir`` first and, if it does
           not exist there, relative to the bundled built-in loops directory.
           Later imports override earlier imports for the same name.
        2. Merge the loop's own ``fragments:`` block on top (local overrides imports).
        3. For each state that has a ``fragment:`` key, deep-merge the named fragment
           into the state dict (state-level keys win), then remove the ``fragment:`` key.
           A fragment's ``description`` is metadata and is not copied into the state.

    Returns a new dict with all ``fragment:`` keys expanded. The ``import:`` and
    ``fragments:`` keys remain in the returned dict so callers can access them for
    display or validation purposes. When no state references a fragment, the
    result is a shallow copy of the input with ``states`` left exactly as given.

    ``import:``, ``fragments:`` and ``states:`` keys that are present but empty
    (YAML loads them as ``None``) are treated as if they were absent.

    Args:
        raw_loop_dict: Raw YAML dict loaded from a loop file.
        loop_dir: Directory containing the loop file; import paths are resolved
            relative to this directory.

    Returns:
        New dict with fragment references expanded.

    Raises:
        FileNotFoundError: If an ``import:`` path exists neither relative to
            ``loop_dir`` nor in the built-in loops directory.
        ValueError: If a state references a ``fragment:`` name that is not defined in
            any imported library or the local ``fragments:`` block, or if the
            referenced fragment definition is not a mapping.
    """
    # Step 1: load imported fragment libraries.
    imported_fragments: dict[str, dict[str, Any]] = {}
    for import_path in raw_loop_dict.get("import") or []:
        local_path = loop_dir / import_path
        lib_path = local_path
        if not lib_path.exists():
            builtin_path = _BUILTIN_LOOPS_DIR / import_path
            if not builtin_path.exists():
                raise FileNotFoundError(
                    f"Fragment library not found: {import_path} "
                    f"(checked '{local_path}' and '{builtin_path}')"
                )
            lib_path = builtin_path

        # Explicit UTF-8 so parsing does not depend on the platform's locale.
        with open(lib_path, encoding="utf-8") as f:
            lib_data = yaml.safe_load(f)

        # Libraries that are empty or not a mapping contribute no fragments.
        if isinstance(lib_data, dict):
            for name, frag in (lib_data.get("fragments") or {}).items():
                imported_fragments[name] = frag

    # Step 2: merge local fragments: block on top (local wins).
    all_fragments: dict[str, dict[str, Any]] = {
        **imported_fragments,
        **(raw_loop_dict.get("fragments") or {}),
    }

    result = dict(raw_loop_dict)
    states: dict[str, Any] = dict(result.get("states") or {})

    # Step 3: expand every state that names a fragment.
    expanded_any = False
    for state_name, state_dict in states.items():
        if not isinstance(state_dict, dict):
            continue
        fragment_name = state_dict.get("fragment")
        if fragment_name is None:
            continue
        if fragment_name not in all_fragments:
            available = ", ".join(sorted(all_fragments))
            raise ValueError(
                f"State '{state_name}': fragment '{fragment_name}' not found. "
                f"Available fragments: {available or '(none)'}"
            )

        fragment = all_fragments[fragment_name]
        if not isinstance(fragment, dict):
            raise ValueError(
                f"State '{state_name}': fragment '{fragment_name}' must be a mapping, "
                f"got {type(fragment).__name__}"
            )

        # Deep merge: fragment is the base, state fields override.
        # Strip description before merge - it is metadata, not a state field.
        frag_copy = dict(fragment)
        frag_copy.pop("description", None)
        merged = _deep_merge(frag_copy, state_dict)
        del merged["fragment"]  # consume the fragment: key
        # Replacing the value of an existing key is safe while iterating.
        states[state_name] = merged
        expanded_any = True

    # Only swap in the rebuilt mapping when something was expanded, so a loop
    # without fragment references keeps its ``states`` entry (or lack of one).
    if expanded_any:
        result["states"] = states
    return result

145 

146 

def resolve_inheritance(
    raw_loop_dict: dict[str, Any],
    loop_dir: Path,
    _seen: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Resolve ``from:`` template inheritance by deep-merging parent into child.

    A loop YAML with ``from: <name>`` inherits all top-level fields and states
    from the named parent loop. The child's own fields override the parent's at
    every nesting level (per :func:`_deep_merge` semantics): scalars and lists
    are replaced by the child, dicts are merged recursively. The ``from:`` key
    itself is stripped from the returned dict.

    Parent lookup uses :func:`little_loops.cli.loop._helpers.resolve_loop_path`,
    which searches ``loop_dir`` first then falls back to the bundled built-in
    loops directory. A grandparent is looked up relative to the directory of
    the parent file that names it. Cycles in the ``from:`` chain raise
    ``ValueError`` with the full chain path; missing parents raise
    ``FileNotFoundError``.

    Resolution must run *before* :func:`resolve_fragments` and *before* the
    required-fields check in :func:`load_and_validate`, so a child can omit
    fields its parent provides (including ``initial`` and ``states``) and so a
    parent's ``import:``/``fragments:`` blocks survive into the merged result.

    Args:
        raw_loop_dict: Raw YAML dict loaded from a loop file.
        loop_dir: Directory containing the (child) loop file; parent names are
            resolved relative to this directory.
        _seen: Internal tuple of parent names already visited during recursion;
            used for cycle detection.

    Returns:
        New dict with ``from:`` resolved and stripped. If the input has no
        ``from:`` key, the very same dict object is returned unchanged.

    Raises:
        ValueError: If ``from:`` is not a string, the parent is not a YAML
            mapping, or a cycle is detected in the inheritance chain.
        FileNotFoundError: If the parent loop name cannot be resolved.
    """
    if "from" not in raw_loop_dict:
        return raw_loop_dict

    parent_name = raw_loop_dict["from"]
    if not isinstance(parent_name, str):
        raise ValueError(f"`from:` must be a string, got {type(parent_name).__name__}")

    if parent_name in _seen:
        chain = " -> ".join(_seen + (parent_name,))
        raise ValueError(f"Circular `from:` chain: {chain}")

    # Lazy import to avoid a circular import at module load (cli.loop._helpers
    # imports from fsm.* indirectly).
    from little_loops.cli.loop._helpers import resolve_loop_path

    parent_path = resolve_loop_path(parent_name, loop_dir)
    # Explicit UTF-8 so parsing does not depend on the platform's locale.
    with open(parent_path, encoding="utf-8") as f:
        parent_data = yaml.safe_load(f)

    if not isinstance(parent_data, dict):
        raise ValueError(
            f"Parent loop '{parent_name}' is not a YAML mapping (got {type(parent_data).__name__})"
        )

    # Resolve the parent's own ``from:`` first so the chain is merged from the
    # most distant ancestor down to this child.
    parent_data = resolve_inheritance(parent_data, parent_path.parent, _seen + (parent_name,))

    # Neither operand carries a top-level ``from`` key at this point: the
    # resolved parent has had it stripped and the child copy excludes it.
    child_without_from = {k: v for k, v in raw_loop_dict.items() if k != "from"}
    return _deep_merge(parent_data, child_without_from)

217 

218 

def resolve_flow(raw_loop_dict: dict[str, Any]) -> dict[str, Any]:
    """Expand ``flow:`` linear shorthand into a verbose ``states:`` map.

    A loop YAML with ``flow: [<state>, ...]`` declares an ordered linear chain
    of states. Each entry is either a bare name (unconditional forward
    transition via ``next`` to the following entry) or a ternary
    ``name?yes_target:no_target`` (conditional branching via ``on_yes`` /
    ``on_no``). The last entry is implicitly ``terminal: true``.

    Optional ``state_defs:`` supplies prompt/action/evaluate bodies that are
    deep-merged into the generated state skeletons (the body wins on
    conflicts). A ``state_defs:`` value that is not a mapping is ignored, and
    an entry whose body is empty (``None``) contributes nothing.

    If both ``flow:`` and ``states:`` are present, ``flow:`` takes precedence
    and the existing ``states:`` value is replaced. This covers the case where
    ``states`` was inherited via ``from:`` and the child declares its own
    ``flow:``.

    Resolution runs *after* :func:`resolve_inheritance` (so a child can
    override a parent's states with its own ``flow:``) and *before* the
    required-fields check in :func:`load_and_validate` (so the expanded
    ``states:`` key satisfies the validator).

    Args:
        raw_loop_dict: Raw YAML dict loaded from a loop file.

    Returns:
        New dict with ``flow:`` expanded to ``states:`` and both ``flow:``
        and ``state_defs:`` stripped. If the input has no ``flow:`` key, the
        very same dict object is returned unchanged.

    Raises:
        ValueError: If ``flow:`` is not a list, is empty, contains a
            non-string entry or a malformed ternary entry, or if a
            ``state_defs:`` body for a flow state is not a mapping.
    """
    if "flow" not in raw_loop_dict:
        return raw_loop_dict

    flow = raw_loop_dict["flow"]
    if not isinstance(flow, list):
        raise ValueError(f"'flow:' must be a list, got {type(flow).__name__}")
    if not flow:
        raise ValueError("'flow:' must contain at least one state")

    state_defs = raw_loop_dict.get("state_defs", {})
    if not isinstance(state_defs, dict):
        state_defs = {}

    # State name of every entry with any ternary suffix removed, so a bare
    # entry can point ``next`` at the entry that follows it. Non-string
    # entries are kept as-is here; they are rejected when their turn comes.
    names = [e.partition("?")[0] if isinstance(e, str) else e for e in flow]
    last_index = len(flow) - 1

    generated_states: dict[str, dict[str, Any]] = {}
    for i, entry in enumerate(flow):
        if not isinstance(entry, str):
            raise ValueError(f"'flow:' entry {i} must be a string, got {type(entry).__name__}")

        state_name, question_mark, targets = entry.partition("?")
        skeleton: dict[str, Any]
        if question_mark:
            # Ternary form: name?yes_target:no_target (split on the first ':').
            yes_target, _, no_target = targets.partition(":")
            malformed = (
                not state_name
                or not yes_target
                or not no_target  # also covers a missing ':' separator
                or no_target.endswith(":")
            )
            if malformed:
                raise ValueError(
                    f"Malformed ternary in flow entry '{entry}': must be name?yes_target:no_target"
                )
            skeleton = {"on_yes": yes_target, "on_no": no_target}
        else:
            skeleton = {}
            if i < last_index:
                skeleton["next"] = names[i + 1]

        if i == last_index:
            skeleton["terminal"] = True

        # Deep-merge the state_defs body into the generated skeleton.
        body = state_defs.get(state_name)
        if body is not None:
            if not isinstance(body, dict):
                raise ValueError(
                    f"'state_defs:' entry '{state_name}' must be a mapping, "
                    f"got {type(body).__name__}"
                )
            skeleton = _deep_merge(skeleton, body)

        generated_states[state_name] = skeleton

    result = {k: v for k, v in raw_loop_dict.items() if k not in ("flow", "state_defs")}
    result["states"] = generated_states
    return result