Coverage for src / invariant / cli.py: 0.00%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-03 19:45 +0000

1"""Command-line interface for executing serialized Invariant graphs.""" 

2 

3import argparse 

4import json 

5import sys 

6from dataclasses import dataclass 

7from pathlib import Path 

8from typing import Any, TextIO 

9 

10from invariant.executor import Executor 

11from invariant.graph import Graph 

12from invariant.graph_serialization import ( 

13 FORMAT_ID, 

14 dump_value_to_jsonable, 

15 load_graph_from_dict, 

16 load_graph_output_from_dict, 

17 load_value_from_jsonable, 

18) 

19from invariant.protocol import ICacheable 

20from invariant.registry import OpRegistry 

21from invariant.store.null import NullStore 

22 

23 

24@dataclass(frozen=True) 

25class _CliOutput: 

26 value: Any 

27 is_context: bool 

28 selected_key: str | None 

29 

30 

31def _build_parser() -> argparse.ArgumentParser: 

32 parser = argparse.ArgumentParser( 

33 prog="invariant", 

34 description="Execute a serialized Invariant graph and emit JSON results.", 

35 ) 

36 parser.add_argument( 

37 "graph", 

38 nargs="?", 

39 default="-", 

40 help="Path to graph JSON. Reads stdin when omitted or '-'.", 

41 ) 

42 parser.add_argument( 

43 "--context", 

44 metavar="CONTEXT_FILE", 

45 help="Path to a JSON object containing external context values.", 

46 ) 

47 parser.add_argument( 

48 "--param", 

49 action="append", 

50 default=[], 

51 metavar="KEY=VALUE", 

52 help=( 

53 "Override or add one external context value. VALUE accepts JSON " 

54 "scalars/objects, Invariant JSON markers, and bare strings. " 

55 "Missing external graph dependencies are supplied as null." 

56 ), 

57 ) 

58 parser.add_argument( 

59 "--pick", 

60 metavar="KEY", 

61 help="Emit only one key from the execution result.", 

62 ) 

63 parser.add_argument( 

64 "--all", 

65 action="store_true", 

66 help="Emit the full execution context.", 

67 ) 

68 parser.add_argument( 

69 "--pretty", 

70 action="store_true", 

71 help="Emit indented JSON.", 

72 ) 

73 parser.add_argument( 

74 "-o", 

75 "--output", 

76 metavar="FILE", 

77 help="Write output to FILE instead of stdout.", 

78 ) 

79 parser.add_argument( 

80 "--output-format", 

81 choices=["auto", "json", "binary"], 

82 default="auto", 

83 help=( 

84 "File output format. auto writes selected ICacheable values as binary " 

85 "and everything else as JSON." 

86 ), 

87 ) 

88 return parser 

89 

90 

91def _read_graph_arg(graph_arg: str, stdin: TextIO) -> str: 

92 if graph_arg == "-": 

93 return stdin.read() 

94 return Path(graph_arg).read_text(encoding="utf-8") 

95 

96 

97def _load_input_document(data: str) -> tuple[Graph, str | None]: 

98 obj = json.loads(data) 

99 if not isinstance(obj, dict): 

100 raise ValueError("Graph document must be a JSON object") 

101 

102 if obj.get("format") == FORMAT_ID: 

103 return load_graph_from_dict(obj), None 

104 

105 return load_graph_output_from_dict(obj) 

106 

107 

108def _load_context(path: str | None) -> dict[str, Any]: 

109 if path is None: 

110 return {} 

111 

112 obj = json.loads(Path(path).read_text(encoding="utf-8")) 

113 if not isinstance(obj, dict): 

114 raise ValueError("Context document must be a JSON object") 

115 

116 return {key: load_value_from_jsonable(value) for key, value in obj.items()} 

117 

118 

119def _parse_param_value(value: str) -> Any: 

120 try: 

121 return load_value_from_jsonable(json.loads(value)) 

122 except json.JSONDecodeError: 

123 return value 

124 

125 

126def _parse_param(param: str) -> tuple[str, Any]: 

127 key, separator, value = param.partition("=") 

128 if separator == "" or not key: 

129 raise ValueError("--param must be in KEY=VALUE form") 

130 return key, _parse_param_value(value) 

131 

132 

133def _load_params(params: list[str]) -> dict[str, Any]: 

134 return dict(_parse_param(param) for param in params) 

135 

136 

137def _external_deps(graph: Graph) -> set[str]: 

138 graph_keys = set(graph) 

139 return { 

140 dep 

141 for vertex in graph.values() 

142 for dep in vertex.deps 

143 if dep not in graph_keys 

144 } 

145 

146 

147def _encode_result_context(results: dict[str, Any]) -> dict[str, Any]: 

148 return {key: dump_value_to_jsonable(value) for key, value in results.items()} 

149 

150 

151def _select_output( 

152 results: dict[str, Any], 

153 *, 

154 pick: str | None, 

155 wrapper_output: str | None, 

156 emit_all: bool, 

157) -> _CliOutput: 

158 if emit_all: 

159 return _CliOutput(results, is_context=True, selected_key=None) 

160 

161 selected_key = pick if pick is not None else wrapper_output 

162 if selected_key is None: 

163 return _CliOutput(results, is_context=True, selected_key=None) 

164 

165 if selected_key not in results: 

166 available = ", ".join(sorted(results)) 

167 raise ValueError(f"Key '{selected_key}' not found. Available keys: {available}") 

168 

169 return _CliOutput( 

170 results[selected_key], is_context=False, selected_key=selected_key 

171 ) 

172 

173 

174def _execute_cli(args: argparse.Namespace, stdin: TextIO) -> _CliOutput: 

175 graph, wrapper_output = _load_input_document(_read_graph_arg(args.graph, stdin)) 

176 context = _load_context(args.context) 

177 context.update(_load_params(args.param)) 

178 for dep in _external_deps(graph): 

179 context.setdefault(dep, None) 

180 

181 registry = OpRegistry() 

182 registry.clear() 

183 registry.auto_discover() 

184 

185 executor = Executor(registry, NullStore()) 

186 results = executor.execute(graph, context=context) 

187 return _select_output( 

188 results, 

189 pick=args.pick, 

190 wrapper_output=wrapper_output, 

191 emit_all=args.all, 

192 ) 

193 

194 

195def _jsonable_output(output: _CliOutput) -> Any: 

196 if output.is_context: 

197 return _encode_result_context(output.value) 

198 return dump_value_to_jsonable(output.value) 

199 

200 

201def _write_json_output( 

202 output: _CliOutput, stream: TextIO, *, pretty: bool 

203) -> None: 

204 json.dump( 

205 _jsonable_output(output), 

206 stream, 

207 indent=2 if pretty else None, 

208 separators=None if pretty else (",", ":"), 

209 sort_keys=True, 

210 ) 

211 stream.write("\n") 

212 

213 

214def _write_binary_output(output: _CliOutput, path: Path) -> None: 

215 if output.is_context: 

216 raise ValueError("Binary output requires a single selected result") 

217 

218 value = output.value 

219 if not isinstance(value, ICacheable): 

220 selected = f" '{output.selected_key}'" if output.selected_key else "" 

221 raise ValueError( 

222 f"Output{selected} is {type(value).__name__}, not an ICacheable value" 

223 ) 

224 

225 path.parent.mkdir(parents=True, exist_ok=True) 

226 to_file = getattr(value, "to_file", None) 

227 if callable(to_file): 

228 to_file(path) 

229 return 

230 

231 with path.open("wb") as stream: 

232 value.to_stream(stream) 

233 

234 

235def _write_output_file( 

236 output: _CliOutput, 

237 *, 

238 path: Path, 

239 output_format: str, 

240 pretty: bool, 

241) -> None: 

242 if output_format == "binary" or ( 

243 output_format == "auto" 

244 and not output.is_context 

245 and isinstance(output.value, ICacheable) 

246 ): 

247 _write_binary_output(output, path) 

248 return 

249 

250 if output_format == "auto" or output_format == "json": 

251 path.parent.mkdir(parents=True, exist_ok=True) 

252 with path.open("w", encoding="utf-8") as stream: 

253 _write_json_output(output, stream, pretty=pretty) 

254 return 

255 

256 raise ValueError(f"Unsupported output format: {output_format}") 

257 

258 

259def main(argv: list[str] | None = None) -> int: 

260 parser = _build_parser() 

261 args = parser.parse_args(argv) 

262 

263 try: 

264 output = _execute_cli(args, sys.stdin) 

265 if args.output: 

266 _write_output_file( 

267 output, 

268 path=Path(args.output), 

269 output_format=args.output_format, 

270 pretty=args.pretty, 

271 ) 

272 else: 

273 _write_json_output(output, sys.stdout, pretty=args.pretty) 

274 except Exception as e: 

275 print(f"invariant: error: {e}", file=sys.stderr) 

276 return 1 

277 

278 return 0 

279 

280 

281if __name__ == "__main__": 

282 raise SystemExit(main())