Coverage for src / apcore_cli / output.py: 90%

247 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Output Formatter — TTY-adaptive output rendering (FE-08).""" 

2 

3from __future__ import annotations 

4 

5import json 

6import logging 

7import sys 

8from typing import TYPE_CHECKING, Any 

9 

10import click 

11from rich.console import Console 

12from rich.panel import Panel 

13from rich.syntax import Syntax 

14from rich.table import Table 

15 

16from apcore_cli.display_helpers import get_cli_display_fields as _get_cli_fields 

17 

18if TYPE_CHECKING: 

19 from apcore.registry.types import ModuleDescriptor 

20 

21logger = logging.getLogger(__name__) 

22 

23 

def resolve_format(explicit_format: str | None) -> str:
    """Resolve the output format, defaulting by whether stdout is a terminal.

    Args:
        explicit_format: Format requested by the caller. Any value other than
            ``None`` is returned unchanged; no validation happens here.

    Returns:
        ``explicit_format`` when it is not ``None``; otherwise ``"table"`` if
        ``sys.stdout`` reports a TTY and ``"json"`` in every other case.
    """
    if explicit_format is not None:
        return explicit_format

    stream = sys.stdout
    try:
        # sys.stdout may be None, may be a replacement object without
        # isatty(), or may already be closed (isatty() then raises
        # ValueError). None of these is an interactive terminal.
        interactive = stream is not None and stream.isatty()
    except (AttributeError, ValueError):
        interactive = False
    return "table" if interactive else "json"

31 

32 

def _truncate(text: str, max_length: int = 80) -> str:
    """Shorten *text* so the result never exceeds *max_length* characters.

    Args:
        text: The string to shorten.
        max_length: Maximum length of the returned string.

    Returns:
        ``text`` unchanged when it already fits. Otherwise a prefix of
        ``text`` followed by ``"..."``, with the total length equal to
        ``max_length``. When ``max_length`` is too small to hold any text
        plus the ellipsis, as much of ``"..."`` as fits is returned (the
        empty string for ``max_length <= 0``).
    """
    if len(text) <= max_length:
        return text

    ellipsis = "..."
    if max_length <= len(ellipsis):
        # A negative slice end would make the result longer than requested,
        # so clamp and return only the part of the marker that fits.
        return ellipsis[: max(max_length, 0)]
    return text[: max_length - len(ellipsis)] + ellipsis

38 

39 

def format_module_list(
    modules: list[ModuleDescriptor],
    format: str,
    filter_tags: tuple[str, ...] = (),
    show_deps: bool = False,
    exposure_filter: Any | None = None,
) -> None:
    """Format and print a list of modules.

    Args:
        modules: Module descriptors to display.
        format: Output format string (table, json, csv, yaml, jsonl). Any
            other value prints nothing.
        filter_tags: Tags used for filtering (shown in empty-result message).
        show_deps: When True, adds a dependency count (a "Deps" column in the
            table, a ``dependency_count`` key in structured formats).
        exposure_filter: When not None, adds each module's exposure status per
            FE-12 (an "Exposure" column with ✓/— in the table, an ``exposed``
            key in structured formats). The object must provide
            ``is_exposed(module_id)``.
    """
    if format == "table":
        if not modules:
            if filter_tags:
                click.echo(f"No modules found matching tags: {', '.join(filter_tags)}.")
            else:
                click.echo("No modules found.")
            return

        table = Table(title="Modules")
        table.add_column("ID")
        table.add_column("Description")
        table.add_column("Tags")
        if show_deps:
            table.add_column("Deps", justify="right")
        if exposure_filter is not None:
            table.add_column("Exposure", justify="center")

        for m in modules:
            display_name, desc, tags_val = _get_cli_fields(m)
            tags = ", ".join(tags_val) if tags_val else ""
            row: list[str] = [display_name, _truncate(desc, 80), tags]
            if show_deps:
                deps = getattr(m, "dependencies", None) or []
                row.append(str(len(deps)))
            if exposure_filter is not None:
                exposure_id = getattr(m, "module_id", display_name)
                row.append("\u2713" if exposure_filter.is_exposed(exposure_id) else "\u2014")
            table.add_row(*row)

        Console().print(table)
    elif format in ("json", "csv", "yaml", "jsonl"):
        records: list[dict[str, Any]] = []
        for m in modules:
            name, desc, tags_val = _get_cli_fields(m)
            entry: dict[str, Any] = {
                "id": name,
                "description": desc,
                "tags": tags_val,
            }
            if show_deps:
                deps = getattr(m, "dependencies", None) or []
                entry["dependency_count"] = len(deps)
            if exposure_filter is not None:
                # Query exposure with the same identifier the table branch
                # uses (module_id, falling back to the displayed name) so the
                # reported status cannot differ between output formats.
                exposure_id = getattr(m, "module_id", name)
                entry["exposed"] = exposure_filter.is_exposed(exposure_id)
            records.append(entry)

        if format == "json":
            click.echo(json.dumps(records, indent=2))
        elif format == "csv":
            import csv
            import io

            # With no records there is no header to derive, so nothing is printed.
            if records:
                buf = io.StringIO()
                writer = csv.DictWriter(buf, fieldnames=list(records[0].keys()))
                writer.writeheader()
                for record in records:
                    writer.writerow({k: str(v) for k, v in record.items()})
                click.echo(buf.getvalue().rstrip())
        elif format == "yaml":
            try:
                import yaml

                click.echo(yaml.dump(records, default_flow_style=False, allow_unicode=True).rstrip())
            except ImportError:
                # PyYAML is optional; fall back to JSON when it is missing.
                click.echo(json.dumps(records, indent=2))
        elif format == "jsonl":
            for record in records:
                click.echo(json.dumps(record))

126 

127 

def _annotations_to_dict(annotations: Any) -> dict | None:
    """Convert annotations (dict or dataclass-like object) to a plain dict.

    Args:
        annotations: ``None``, a dict, a dataclass instance, or any object
            whose attributes can be read with ``vars()``.

    Returns:
        * ``None`` when *annotations* is ``None`` or nothing is left to report.
        * A dict input is returned as-is (unfiltered) unless it is empty.
        * For other objects, a dict of the fields whose value is not ``None``,
          ``False``, zero, or an empty list. In the ``vars()`` fallback,
          attributes starting with ``_`` are also skipped.
    """
    if annotations is None:
        return None
    if isinstance(annotations, dict):
        return annotations or None

    import dataclasses

    # Dataclass-like object (e.g. ModuleAnnotations) — keep non-default fields.
    try:
        if dataclasses.is_dataclass(annotations):
            kept = {
                k: v
                for k, v in dataclasses.asdict(annotations).items()
                if _is_informative_annotation(v)
            }
            # Every branch reports "nothing to show" as None, never {}.
            return kept or None
    except (TypeError, AttributeError) as e:
        # is_dataclass() is also true for dataclass *types*, on which
        # asdict() raises TypeError; fall through to vars() in that case.
        logger.debug("Could not extract annotations via dataclasses.asdict: %s", e)

    # Fallback: try vars()
    try:
        kept = {
            k: v
            for k, v in vars(annotations).items()
            if not k.startswith("_") and _is_informative_annotation(v)
        }
        return kept or None
    except (TypeError, AttributeError) as e:
        logger.debug("Could not extract annotations via vars(): %s", e)
    return None


def _is_informative_annotation(value: Any) -> bool:
    """Return True unless *value* is None, False, zero, or an empty list."""
    return value is not None and value is not False and value != 0 and value != []

157 

158 

def format_module_detail(module_def: ModuleDescriptor, format: str) -> None:
    """Format and print full module metadata.

    Args:
        module_def: Descriptor of the module to describe. ``canonical_id`` is
            preferred over ``module_id`` when the attribute exists. The
            attributes ``input_schema``, ``output_schema``, ``annotations``,
            ``metadata`` and ``tags`` are optional and skipped when missing
            or empty.
        format: ``"table"`` prints a human-readable layout using Rich. Any
            other value prints a single JSON document, so a machine-readable
            format never results in empty output.
    """
    from apcore_cli.display_helpers import get_display

    mid = module_def.canonical_id if hasattr(module_def, "canonical_id") else module_def.module_id

    # Resolve display overlay fields (§5.13)
    display = get_display(module_def)
    cli_display = display.get("cli") or {}
    display_description: str = cli_display.get("description") or module_def.description
    display_guidance: str | None = cli_display.get("guidance") or display.get("guidance")

    input_schema = getattr(module_def, "input_schema", None)
    output_schema = getattr(module_def, "output_schema", None)
    ann_dict = _annotations_to_dict(getattr(module_def, "annotations", None))
    x_fields = _collect_extension_fields(module_def)
    tags = getattr(module_def, "tags", [])

    if format == "table":
        console = Console()
        console.print(Panel(f"Module: {mid}"))
        click.echo(f"\nDescription:\n {display_description}\n")
        if display_guidance:
            click.echo(f"Guidance:\n{display_guidance}\n")

        # default=str keeps rendering from raising on values json cannot
        # encode natively; the same fallback format_exec_result uses.
        if input_schema:
            click.echo("\nInput Schema:")
            console.print(Syntax(json.dumps(input_schema, indent=2, default=str), "json", theme="monokai"))

        if output_schema:
            click.echo("\nOutput Schema:")
            console.print(Syntax(json.dumps(output_schema, indent=2, default=str), "json", theme="monokai"))

        if ann_dict:
            click.echo("\nAnnotations:")
            for k, v in ann_dict.items():
                click.echo(f" {k}: {v}")

        # Extension metadata (x- prefixed)
        if x_fields:
            click.echo("\nExtension Metadata:")
            for k, v in x_fields.items():
                click.echo(f" {k}: {v}")

        if tags:
            click.echo(f"\nTags: {', '.join(tags)}")
        return

    # Every non-table format is rendered as JSON.
    result: dict[str, Any] = {
        "id": mid,
        "description": display_description,
    }
    if display_guidance:
        result["guidance"] = display_guidance
    if input_schema:
        result["input_schema"] = input_schema
    if output_schema:
        result["output_schema"] = output_schema
    if ann_dict:
        result["annotations"] = ann_dict
    if tags:
        result["tags"] = tags
    # Extension keys always start with "x-"/"x_", so they cannot collide with
    # the fixed keys set above.
    result.update(x_fields)

    # Annotation and extension values come from arbitrary attributes and may
    # not be JSON-native; stringify them instead of raising TypeError.
    click.echo(json.dumps(result, indent=2, default=str))


def _collect_extension_fields(module_def: Any) -> dict[str, Any]:
    """Gather ``x-``/``x_`` prefixed entries from metadata and instance attributes.

    Entries from a dict-valued ``metadata`` attribute take precedence over
    same-named instance attributes found via ``vars()``.
    """
    prefixes = ("x-", "x_")
    found: dict[str, Any] = {}

    metadata = getattr(module_def, "metadata", None)
    if isinstance(metadata, dict):
        found.update((k, v) for k, v in metadata.items() if k.startswith(prefixes))

    # Also check vars() for x_ prefixed attributes
    try:
        attrs = vars(module_def)
    except TypeError as e:
        # Objects without a __dict__ simply have no attribute-based extensions.
        logger.debug("Could not extract x- fields via vars(): %s", e)
    else:
        for k, v in attrs.items():
            if k.startswith(prefixes) and k not in found:
                found[k] = v
    return found

245 

246 

def format_grouped_module_list(
    grouped: dict[str | None, list[tuple[str, str, list[str]]]],
    filter_tags: tuple[str, ...] = (),
) -> None:
    """Format and print modules grouped by namespace.

    One Rich table is printed per named group, in sorted group-name order,
    followed by an "Other" table for ungrouped modules. Rows within each
    table are sorted by command name. The *grouped* mapping is not modified.

    Parameters
    ----------
    grouped:
        Mapping of group name (or ``None`` for ungrouped) to a list of
        ``(command_name, description, tags)`` tuples.
    filter_tags:
        Tags used for filtering (shown in the empty-state message).
    """
    # Read the ungrouped entry without pop(): removing the None key would
    # silently alter the caller's dict.
    top = grouped.get(None) or []
    named = {name: members for name, members in grouped.items() if name is not None}

    if not top and not named:
        if filter_tags:
            click.echo(f"No modules found matching tags: {', '.join(filter_tags)}.")
        else:
            click.echo("No modules found.")
        return

    console = Console()

    def _print_group(title: str, members: list[tuple[str, str, list[str]]]) -> None:
        """Render one group as a Command/Description/Tags table."""
        table = Table(title=title)
        table.add_column("Command")
        table.add_column("Description")
        table.add_column("Tags")
        for cmd, desc, tags in sorted(members, key=lambda item: item[0]):
            table.add_row(cmd, _truncate(desc, 80), ", ".join(tags) if tags else "")
        console.print(table)

    # Named groups
    for group_name in sorted(named):
        _print_group(f"{group_name}", named[group_name])

    # Top-level / ungrouped
    if top:
        _print_group("Other", top)

294 

295 

def format_exec_result(result: Any, format: str | None = None, fields: str | None = None) -> None:
    """Format and print module execution result.

    Uses ``resolve_format(format)`` for TTY-adaptive defaulting:
    - json (or non-TTY default): JSON-pretty-printed output.
    - table: Rich table for dict results; falls back to JSON for lists.
    - csv: Comma-separated values (dict keys as header). For a list of
      dicts the header is the union of all rows' keys in first-seen order;
      anything else is printed as compact JSON.
    - yaml: YAML format (JSON when PyYAML is not installed).
    - jsonl: JSON Lines (one object per line).

    Args:
        result: Value returned by the module. ``None`` prints nothing.
        format: Explicit format name, or ``None`` to auto-detect.
        fields: Optional comma-separated list of dot-paths (e.g.
            ``"a,b.c"``). Only applied when *result* is a dict; each path
            becomes a key of the printed dict, with ``None`` for paths that
            cannot be resolved. Empty entries are ignored.
    """
    if result is None:
        return

    # Apply field selection if specified
    if fields and isinstance(result, dict):
        result = _select_fields(result, fields)

    effective = resolve_format(format)

    if effective == "csv":
        if isinstance(result, dict):
            click.echo(_rows_to_csv([result]))
        elif isinstance(result, list) and result and all(isinstance(item, dict) for item in result):
            click.echo(_rows_to_csv(result))
        else:
            # Scalars, empty lists and lists with non-dict items have no
            # tabular shape; emit compact JSON instead.
            click.echo(json.dumps(result, default=str))
    elif effective == "yaml":
        try:
            import yaml

            click.echo(yaml.dump(result, default_flow_style=False, allow_unicode=True).rstrip())
        except ImportError:
            click.echo(json.dumps(result, indent=2, default=str))
    elif effective == "jsonl":
        if isinstance(result, list):
            for item in result:
                click.echo(json.dumps(item, default=str))
        else:
            click.echo(json.dumps(result, default=str))
    elif effective == "table" and isinstance(result, dict):
        table = Table()
        table.add_column("Key")
        table.add_column("Value")
        for k, v in result.items():
            table.add_row(str(k), str(v))
        Console().print(table)
    elif isinstance(result, dict | list):
        click.echo(json.dumps(result, indent=2, default=str))
    elif isinstance(result, str):
        click.echo(result)
    else:
        click.echo(str(result))


def _select_fields(data: dict, fields: str) -> dict[str, Any]:
    """Pick comma-separated dot-paths out of *data*; unresolved paths map to None."""
    selected: dict[str, Any] = {}
    for raw in fields.split(","):
        path = raw.strip()
        if not path:
            # Stray commas ("a,,b" or "a,") would otherwise add a "" key.
            continue
        val: Any = data
        for part in path.split("."):
            if not isinstance(val, dict):
                val = None
                break
            val = val.get(part)
        selected[path] = val
    return selected


def _rows_to_csv(rows: list[dict]) -> str:
    """Render dict rows as CSV text with a header row and no trailing newline."""
    import csv
    import io

    # Union of keys across all rows: a header taken from the first row only
    # makes DictWriter raise ValueError on any later row with an extra key.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames)
    writer.writeheader()
    for row in rows:
        # Keys missing from a row are left as empty cells by DictWriter.
        writer.writerow({k: str(v) for k, v in row.items()})
    return buf.getvalue().rstrip()