Coverage for src / apcore_cli / output.py: 90%
247 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Output Formatter — TTY-adaptive output rendering (FE-08)."""
3from __future__ import annotations
5import json
6import logging
7import sys
8from typing import TYPE_CHECKING, Any
10import click
11from rich.console import Console
12from rich.panel import Panel
13from rich.syntax import Syntax
14from rich.table import Table
16from apcore_cli.display_helpers import get_cli_display_fields as _get_cli_fields
18if TYPE_CHECKING:
19 from apcore.registry.types import ModuleDescriptor
21logger = logging.getLogger(__name__)
24def resolve_format(explicit_format: str | None) -> str:
25 """Resolve output format with TTY-adaptive default."""
26 if explicit_format is not None:
27 return explicit_format
28 if sys.stdout.isatty():
29 return "table"
30 return "json"
33def _truncate(text: str, max_length: int = 80) -> str:
34 """Truncate text to max_length, appending '...' if needed."""
35 if len(text) <= max_length:
36 return text
37 return text[: max_length - 3] + "..."
40def format_module_list(
41 modules: list[ModuleDescriptor],
42 format: str,
43 filter_tags: tuple[str, ...] = (),
44 show_deps: bool = False,
45 exposure_filter: Any | None = None,
46) -> None:
47 """Format and print a list of modules.
49 Args:
50 modules: Module descriptors to display.
51 format: Output format string (table, json, csv, yaml, jsonl).
52 filter_tags: Tags used for filtering (shown in empty-result message).
53 show_deps: When True, adds a dependency count column to table output.
54 exposure_filter: When not None, adds an "Exposure" column (✓/—) showing
55 each module's exposure status per FE-12.
56 """
57 if format == "table":
58 if not modules and filter_tags:
59 click.echo(f"No modules found matching tags: {', '.join(filter_tags)}.")
60 return
61 if not modules:
62 click.echo("No modules found.")
63 return
65 table = Table(title="Modules")
66 table.add_column("ID")
67 table.add_column("Description")
68 table.add_column("Tags")
69 if show_deps:
70 table.add_column("Deps", justify="right")
71 if exposure_filter is not None:
72 table.add_column("Exposure", justify="center")
74 for m in modules:
75 display_name, desc, tags_val = _get_cli_fields(m)
76 tags = ", ".join(tags_val) if tags_val else ""
77 row: list[str] = [display_name, _truncate(desc, 80), tags]
78 if show_deps:
79 deps = getattr(m, "dependencies", None) or []
80 row.append(str(len(deps)))
81 if exposure_filter is not None:
82 mid = getattr(m, "module_id", display_name)
83 row.append("\u2713" if exposure_filter.is_exposed(mid) else "\u2014")
84 table.add_row(*row)
86 Console().print(table)
87 elif format in ("json", "csv", "yaml", "jsonl"):
88 rows: list[dict[str, Any]] = []
89 for m in modules:
90 mid, desc, tags_val = _get_cli_fields(m)
91 entry: dict[str, Any] = {
92 "id": mid,
93 "description": desc,
94 "tags": tags_val,
95 }
96 if show_deps:
97 deps = getattr(m, "dependencies", None) or []
98 entry["dependency_count"] = len(deps)
99 if exposure_filter is not None:
100 entry["exposed"] = exposure_filter.is_exposed(mid)
101 rows.append(entry)
103 if format == "json":
104 click.echo(json.dumps(rows, indent=2))
105 elif format == "csv":
106 import csv
107 import io
109 if rows:
110 buf = io.StringIO()
111 writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
112 writer.writeheader()
113 for row in rows:
114 writer.writerow({k: str(v) for k, v in row.items()})
115 click.echo(buf.getvalue().rstrip())
116 elif format == "yaml":
117 try:
118 import yaml
120 click.echo(yaml.dump(rows, default_flow_style=False, allow_unicode=True).rstrip())
121 except ImportError:
122 click.echo(json.dumps(rows, indent=2))
123 elif format == "jsonl":
124 for row in rows:
125 click.echo(json.dumps(row))
128def _annotations_to_dict(annotations: Any) -> dict | None:
129 """Convert annotations (dict or dataclass) to a plain dict, or None."""
130 if annotations is None:
131 return None
132 if isinstance(annotations, dict):
133 return annotations if annotations else None
134 # Dataclass-like object (e.g. ModuleAnnotations) — convert non-default fields
135 try:
136 import dataclasses
138 if dataclasses.is_dataclass(annotations):
139 return {
140 k: v
141 for k, v in dataclasses.asdict(annotations).items()
142 if v is not None and v is not False and v != 0 and v != []
143 }
144 except (TypeError, AttributeError) as e:
145 logger.debug("Could not extract annotations via dataclasses.asdict: %s", e)
146 # Fallback: try vars()
147 try:
148 d = {
149 k: v
150 for k, v in vars(annotations).items()
151 if not k.startswith("_") and v is not None and v is not False and v != 0
152 }
153 return d if d else None
154 except (TypeError, AttributeError) as e:
155 logger.debug("Could not extract annotations via vars(): %s", e)
156 return None
159def format_module_detail(module_def: ModuleDescriptor, format: str) -> None:
160 """Format and print full module metadata."""
161 from apcore_cli.display_helpers import get_display
163 mid = module_def.canonical_id if hasattr(module_def, "canonical_id") else module_def.module_id
165 # Resolve display overlay fields (§5.13)
166 display = get_display(module_def)
167 cli_display = display.get("cli") or {}
168 display_description: str = cli_display.get("description") or module_def.description
169 display_guidance: str | None = cli_display.get("guidance") or display.get("guidance")
171 if format == "table":
172 console = Console()
173 console.print(Panel(f"Module: {mid}"))
174 click.echo(f"\nDescription:\n {display_description}\n")
175 if display_guidance:
176 click.echo(f"Guidance:\n{display_guidance}\n")
178 if hasattr(module_def, "input_schema") and module_def.input_schema:
179 click.echo("\nInput Schema:")
180 console.print(Syntax(json.dumps(module_def.input_schema, indent=2), "json", theme="monokai"))
182 if hasattr(module_def, "output_schema") and module_def.output_schema:
183 click.echo("\nOutput Schema:")
184 console.print(Syntax(json.dumps(module_def.output_schema, indent=2), "json", theme="monokai"))
186 ann_dict = _annotations_to_dict(getattr(module_def, "annotations", None))
187 if ann_dict:
188 click.echo("\nAnnotations:")
189 for k, v in ann_dict.items():
190 click.echo(f" {k}: {v}")
192 # Extension metadata (x- prefixed)
193 x_fields = {}
194 if hasattr(module_def, "metadata") and isinstance(module_def.metadata, dict):
195 x_fields = {k: v for k, v in module_def.metadata.items() if k.startswith("x-") or k.startswith("x_")}
196 # Also check vars() for x_ prefixed attributes
197 try:
198 for k, v in vars(module_def).items():
199 if (k.startswith("x_") or k.startswith("x-")) and k not in x_fields:
200 x_fields[k] = v
201 except TypeError as e:
202 logger.debug("Could not extract x- fields via vars(): %s", e)
203 if x_fields:
204 click.echo("\nExtension Metadata:")
205 for k, v in x_fields.items():
206 click.echo(f" {k}: {v}")
208 tags = getattr(module_def, "tags", [])
209 if tags:
210 click.echo(f"\nTags: {', '.join(tags)}")
212 elif format == "json":
213 result: dict[str, Any] = {
214 "id": mid,
215 "description": display_description,
216 }
217 if display_guidance:
218 result["guidance"] = display_guidance
219 if hasattr(module_def, "input_schema") and module_def.input_schema:
220 result["input_schema"] = module_def.input_schema
221 if hasattr(module_def, "output_schema") and module_def.output_schema:
222 result["output_schema"] = module_def.output_schema
224 ann_dict = _annotations_to_dict(getattr(module_def, "annotations", None))
225 if ann_dict:
226 result["annotations"] = ann_dict
228 tags = getattr(module_def, "tags", [])
229 if tags:
230 result["tags"] = tags
232 # Extension metadata
233 if hasattr(module_def, "metadata") and isinstance(module_def.metadata, dict):
234 for k, v in module_def.metadata.items():
235 if k.startswith("x-") or k.startswith("x_"):
236 result[k] = v
237 try:
238 for k, v in vars(module_def).items():
239 if (k.startswith("x_") or k.startswith("x-")) and k not in result:
240 result[k] = v
241 except TypeError as e:
242 logger.debug("Could not extract x- fields via vars(): %s", e)
244 click.echo(json.dumps(result, indent=2))
247def format_grouped_module_list(
248 grouped: dict[str | None, list[tuple[str, str, list[str]]]],
249 filter_tags: tuple[str, ...] = (),
250) -> None:
251 """Format and print modules grouped by namespace.
253 Parameters
254 ----------
255 grouped:
256 Mapping of group name (or ``None`` for ungrouped) to a list of
257 ``(command_name, description, tags)`` tuples.
258 filter_tags:
259 Tags used for filtering (shown in the empty-state message).
260 """
261 console = Console()
263 # Separate top-level (ungrouped) modules
264 top = grouped.pop(None, [])
266 all_empty = not top and not grouped
267 if all_empty and filter_tags:
268 click.echo(f"No modules found matching tags: {', '.join(filter_tags)}.")
269 return
270 if all_empty:
271 click.echo("No modules found.")
272 return
274 # Named groups
275 for group_name in sorted(grouped.keys()):
276 members = grouped[group_name]
277 table = Table(title=f"{group_name}")
278 table.add_column("Command")
279 table.add_column("Description")
280 table.add_column("Tags")
281 for cmd, desc, tags in sorted(members, key=lambda x: x[0]):
282 table.add_row(cmd, _truncate(desc, 80), ", ".join(tags) if tags else "")
283 console.print(table)
285 # Top-level / ungrouped
286 if top:
287 table = Table(title="Other")
288 table.add_column("Command")
289 table.add_column("Description")
290 table.add_column("Tags")
291 for cmd, desc, tags in sorted(top, key=lambda x: x[0]):
292 table.add_row(cmd, _truncate(desc, 80), ", ".join(tags) if tags else "")
293 console.print(table)
296def format_exec_result(result: Any, format: str | None = None, fields: str | None = None) -> None:
297 """Format and print module execution result.
299 Uses ``resolve_format(format)`` for TTY-adaptive defaulting:
300 - json (or non-TTY default): JSON-pretty-printed output.
301 - table: Rich table for dict results; falls back to JSON for lists.
302 - csv: Comma-separated values (dict keys as header).
303 - yaml: YAML format.
304 - jsonl: JSON Lines (one object per line).
305 """
306 if result is None:
307 return
309 # Apply field selection if specified
310 if fields and isinstance(result, dict):
311 selected = {}
312 for f in fields.split(","):
313 f = f.strip()
314 val = result
315 for part in f.split("."):
316 if isinstance(val, dict):
317 val = val.get(part)
318 else:
319 val = None
320 break
321 selected[f] = val
322 result = selected
324 effective = resolve_format(format)
326 if effective == "csv":
327 import csv
328 import io
330 if isinstance(result, dict):
331 buf = io.StringIO()
332 writer = csv.DictWriter(buf, fieldnames=list(result.keys()))
333 writer.writeheader()
334 writer.writerow({k: str(v) for k, v in result.items()})
335 click.echo(buf.getvalue().rstrip())
336 elif isinstance(result, list) and result and isinstance(result[0], dict):
337 buf = io.StringIO()
338 writer = csv.DictWriter(buf, fieldnames=list(result[0].keys()))
339 writer.writeheader()
340 for row in result:
341 writer.writerow({k: str(v) for k, v in row.items()})
342 click.echo(buf.getvalue().rstrip())
343 else:
344 click.echo(json.dumps(result, default=str))
345 elif effective == "yaml":
346 try:
347 import yaml
349 click.echo(yaml.dump(result, default_flow_style=False, allow_unicode=True).rstrip())
350 except ImportError:
351 click.echo(json.dumps(result, indent=2, default=str))
352 elif effective == "jsonl":
353 if isinstance(result, list):
354 for item in result:
355 click.echo(json.dumps(item, default=str))
356 else:
357 click.echo(json.dumps(result, default=str))
358 elif effective == "table" and isinstance(result, dict):
359 table = Table()
360 table.add_column("Key")
361 table.add_column("Value")
362 for k, v in result.items():
363 table.add_row(str(k), str(v))
364 Console().print(table)
365 elif isinstance(result, dict | list):
366 click.echo(json.dumps(result, indent=2, default=str))
367 elif isinstance(result, str):
368 click.echo(result)
369 else:
370 click.echo(str(result))