Coverage for src / invariant / yaml_serialization.py: 84.14%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-06 12:18 +0000

1"""YAML authoring format for Invariant graphs. 

2 

3YAML is a human-editable input format over the existing graph JSON model. It is 

4load-only: canonical serialization remains JSON through graph_serialization.py. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from typing import Any 

11 

12from invariant.graph import Graph 

13from invariant.graph_serialization import ( 

14 dump_graph_to_dict, 

15 load_graph_document_from_dict, 

16 load_graph_from_dict, 

17) 

18 

19YAML_INSTALL_GUIDANCE = ( 

20 "YAML graph loading requires PyYAML. Install it with: " 

21 "pip install invariant-core[yaml]" 

22) 

23RESOURCES_INSTALL_GUIDANCE = ( 

24 "YAML resource subgraph loading requires JustMyResource. Install it with: " 

25 "pip install invariant-core[resources]" 

26) 

27 

28JSON_GRAPH_CONTENT_TYPES = frozenset( 

29 {"application/vnd.invariant.graph+json", "application/json"} 

30) 

31YAML_GRAPH_CONTENT_TYPES = frozenset( 

32 { 

33 "application/vnd.invariant.graph+yaml", 

34 "application/yaml", 

35 "text/yaml", 

36 "application/x-yaml", 

37 } 

38) 

39YAML_GRAPH_SUFFIXES = (".yaml", ".yml") 

40 

41 

42def _require_yaml() -> Any: 

43 try: 

44 import yaml 

45 except ImportError as exc: 

46 raise RuntimeError(YAML_INSTALL_GUIDANCE) from exc 

47 return yaml 

48 

49 

50def _require_resource_registry() -> Any: 

51 try: 

52 from justmyresource import get_default_registry 

53 except ImportError as exc: 

54 raise RuntimeError(RESOURCES_INSTALL_GUIDANCE) from exc 

55 return get_default_registry() 

56 

57 

58def _construct_ref(loader: Any, node: Any) -> dict[str, str]: 

59 return {"$ref": loader.construct_scalar(node)} 

60 

61 

62def _construct_cel(loader: Any, node: Any) -> dict[str, str]: 

63 return {"$cel": loader.construct_scalar(node)} 

64 

65 

66def _construct_decimal(loader: Any, node: Any) -> dict[str, str]: 

67 return {"$decimal": loader.construct_scalar(node)} 

68 

69 

70def _construct_tuple(loader: Any, node: Any) -> dict[str, list[Any]]: 

71 if node.id != "sequence": 

72 raise ValueError("!tuple value must be a sequence") 

73 return {"$tuple": loader.construct_sequence(node, deep=True)} 

74 

75 

76def _construct_plain_value(loader: Any, node: Any) -> Any: 

77 if node.id == "scalar": 

78 return loader.construct_scalar(node) 

79 if node.id == "sequence": 

80 return loader.construct_sequence(node, deep=True) 

81 if node.id == "mapping": 

82 return loader.construct_mapping(node, deep=True) 

83 return loader.construct_object(node, deep=True) 

84 

85 

86def _construct_literal(loader: Any, node: Any) -> dict[str, Any]: 

87 return {"$literal": _construct_plain_value(loader, node)} 

88 

89 

90def _construct_icacheable(loader: Any, node: Any) -> dict[str, dict[str, Any]]: 

91 if node.id != "mapping": 

92 raise ValueError("!icacheable value must be a mapping") 

93 return {"$icacheable": loader.construct_mapping(node, deep=True)} 

94 

95 

96def _normalize_content_type(content_type: Any) -> str | None: 

97 if not isinstance(content_type, str) or not content_type.strip(): 

98 return None 

99 return content_type.split(";", 1)[0].strip().lower() 

100 

101 

102def _detect_resource_document_format( 

103 resource_name: str, content_type: Any 

104) -> str: 

105 normalized = _normalize_content_type(content_type) 

106 if normalized in JSON_GRAPH_CONTENT_TYPES: 

107 return "json" 

108 if normalized in YAML_GRAPH_CONTENT_TYPES: 

109 return "yaml" 

110 if resource_name.lower().endswith(YAML_GRAPH_SUFFIXES): 

111 return "yaml" 

112 content_type_label = normalized if normalized is not None else "<missing>" 

113 raise ValueError( 

114 f"YAML subgraph resource {resource_name!r} has unsupported content type " 

115 f"{content_type_label!r}" 

116 ) 

117 

118 

119def _resource_text(resource: Any, resource_name: str) -> str: 

120 text = getattr(resource, "text", None) 

121 if isinstance(text, str): 

122 return text 

123 

124 data = getattr(resource, "data", None) 

125 if isinstance(data, str): 

126 return data 

127 if isinstance(data, bytes): 

128 encoding = getattr(resource, "encoding", None) or "utf-8" 

129 return data.decode(encoding) 

130 

131 raise ValueError(f"YAML subgraph resource {resource_name!r} has no text data") 

132 

133 

134def _load_resource_document( 

135 resource_name: str, resource_stack: tuple[str, ...] 

136) -> tuple[Graph, str | None]: 

137 if resource_name in resource_stack: 

138 cycle = " -> ".join((*resource_stack, resource_name)) 

139 raise ValueError(f"YAML subgraph resource include cycle detected: {cycle}") 

140 

141 registry = _require_resource_registry() 

142 try: 

143 resource = registry.get_resource(resource_name) 

144 except Exception as exc: 

145 raise ValueError( 

146 f"YAML subgraph resource {resource_name!r} could not be resolved: {exc}" 

147 ) from exc 

148 

149 document_format = _detect_resource_document_format( 

150 resource_name, getattr(resource, "content_type", None) 

151 ) 

152 text = _resource_text(resource, resource_name) 

153 

154 if document_format == "json": 

155 obj = json.loads(text) 

156 if not isinstance(obj, dict): 

157 raise ValueError( 

158 f"YAML subgraph resource {resource_name!r} document must be an object" 

159 ) 

160 else: 

161 obj = _load_yaml_document( 

162 text, 

163 resource_stack=(*resource_stack, resource_name), 

164 ) 

165 

166 return load_graph_document_from_dict(obj) 

167 

168 

169def _validate_subgraph_resource_mapping(mapping: dict[str, Any]) -> None: 

170 allowed_keys = {"resource", "deps", "params", "output"} 

171 unknown_keys = set(mapping) - allowed_keys 

172 if unknown_keys: 

173 raise ValueError( 

174 "!subgraph has unsupported keys: " + ", ".join(sorted(unknown_keys)) 

175 ) 

176 

177 for key in ("resource", "deps", "params"): 

178 if key not in mapping: 

179 raise ValueError(f"!subgraph must have '{key}'") 

180 

181 resource = mapping["resource"] 

182 if not isinstance(resource, str) or not resource: 

183 raise ValueError("!subgraph 'resource' must be a non-empty string") 

184 

185 deps = mapping["deps"] 

186 if not isinstance(deps, list): 

187 raise ValueError("!subgraph 'deps' must be a list") 

188 for i, dep in enumerate(deps): 

189 if not isinstance(dep, str): 

190 raise ValueError( 

191 f"!subgraph deps[{i}] must be string, got {type(dep).__name__}" 

192 ) 

193 

194 if not isinstance(mapping["params"], dict): 

195 raise ValueError("!subgraph 'params' must be a mapping") 

196 

197 output = mapping.get("output") 

198 if "output" in mapping and (not isinstance(output, str) or not output): 

199 raise ValueError("!subgraph 'output' must be a non-empty string when present") 

200 

201 

202def _construct_subgraph_resource(loader: Any, node: Any) -> dict[str, Any]: 

203 if node.id != "mapping": 

204 raise ValueError("!subgraph value must be a mapping") 

205 

206 mapping = loader.construct_mapping(node, deep=True) 

207 _validate_subgraph_resource_mapping(mapping) 

208 

209 graph, document_output = _load_resource_document( 

210 mapping["resource"], 

211 getattr(loader, "resource_stack", ()), 

212 ) 

213 output = mapping.get("output", document_output) 

214 if output is None: 

215 raise ValueError( 

216 f"!subgraph resource {mapping['resource']!r} has no default output; " 

217 "provide 'output'" 

218 ) 

219 

220 resource_document = dump_graph_to_dict(graph, output=output) 

221 return { 

222 "kind": "subgraph", 

223 "deps": list(mapping["deps"]), 

224 "params": dict(mapping["params"]), 

225 "graph": resource_document["graph"], 

226 "output": output, 

227 } 

228 

229 

230def _make_loader(yaml: Any, resource_stack: tuple[str, ...] = ()) -> type: 

231 class InvariantYamlLoader(yaml.SafeLoader): 

232 pass 

233 

234 InvariantYamlLoader.resource_stack = resource_stack 

235 InvariantYamlLoader.add_constructor("!ref", _construct_ref) 

236 InvariantYamlLoader.add_constructor("!cel", _construct_cel) 

237 InvariantYamlLoader.add_constructor("!decimal", _construct_decimal) 

238 InvariantYamlLoader.add_constructor("!tuple", _construct_tuple) 

239 InvariantYamlLoader.add_constructor("!literal", _construct_literal) 

240 InvariantYamlLoader.add_constructor("!icacheable", _construct_icacheable) 

241 InvariantYamlLoader.add_constructor("!subgraph", _construct_subgraph_resource) 

242 return InvariantYamlLoader 

243 

244 

245def _load_yaml_document( 

246 data: str | bytes, resource_stack: tuple[str, ...] = () 

247) -> dict[str, Any]: 

248 yaml = _require_yaml() 

249 if isinstance(data, bytes): 

250 data = data.decode("utf-8") 

251 

252 obj = yaml.load(data, Loader=_make_loader(yaml, resource_stack)) 

253 if not isinstance(obj, dict): 

254 raise ValueError("YAML graph document must be an object") 

255 return obj 

256 

257 

258def load_graph_yaml( 

259 data: str | bytes, legacy_kind_inference: bool = False 

260) -> Graph: 

261 """Load a graph envelope from YAML. 

262 

263 YAML tags map to the same marker objects as the JSON graph model: 

264 ``!ref``, ``!cel``, ``!decimal``, ``!tuple``, ``!literal``, and 

265 ``!icacheable``. The YAML-only ``!subgraph`` vertex tag is resolved to a 

266 normal SubGraphNode before graph validation. 

267 """ 

268 

269 return load_graph_from_dict(_load_yaml_document(data), legacy_kind_inference) 

270 

271 

272def load_graph_document_yaml( 

273 data: str | bytes, legacy_kind_inference: bool = False 

274) -> tuple[Graph, str | None]: 

275 """Load a graph document from YAML, preserving optional output.""" 

276 

277 return load_graph_document_from_dict( 

278 _load_yaml_document(data), legacy_kind_inference 

279 )