Coverage for src / apcore_cli / cli.py: 76%
511 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Core Dispatcher — CLI entry point and module routing (FE-01)."""
3from __future__ import annotations
5import json
6import logging
7import re
8import sys
9import time
10from pathlib import Path
11from typing import TYPE_CHECKING, Any
13import click
14import jsonschema
16from apcore_cli.approval import check_approval
17from apcore_cli.builtin_group import RESERVED_GROUP_NAMES as RESERVED_GROUP_NAMES # noqa: PLC0414
18from apcore_cli.display_helpers import get_display as _get_display
19from apcore_cli.output import format_exec_result
20from apcore_cli.ref_resolver import RefResolverError, resolve_refs
21from apcore_cli.schema_parser import reconvert_enum_values, schema_to_click_options
22from apcore_cli.security.sandbox import Sandbox
24# FE-13 §11.4: ``BUILTIN_COMMANDS`` was retired in v0.7.0. All apcore-cli-provided
25# commands now live under the single reserved ``apcli`` group, so the only
26# collision surface is the group name itself (see ``RESERVED_GROUP_NAMES`` above,
27# imported from :mod:`apcore_cli.builtin_group`). Stale ``from apcore_cli.cli
28# import BUILTIN_COMMANDS`` imports will fail at import time rather than drift
29# silently — intentional per spec §11.4.
31if TYPE_CHECKING:
32 from apcore import Executor, Registry
33 from apcore.registry.types import ModuleDescriptor
35 from apcore_cli.security.audit import AuditLogger
37logger = logging.getLogger("apcore_cli.cli")
39# Module-level audit logger, set during CLI init
40_audit_logger: AuditLogger | None = None
# Process-wide verbose-help switch; assigned during CLI init via the setter below.
_verbose_help: bool = False


def set_verbose_help(verbose: bool) -> None:
    """Record whether help output should be verbose.

    While the flag is False, built-in options are hidden from help.
    """
    global _verbose_help
    _verbose_help = verbose
# Module-level docs URL, set by downstream projects
_docs_url: str | None = None


def set_docs_url(url: str | None) -> None:
    """Set the base URL for online documentation links in help and man pages.

    Pass None to disable. Command-level help appends ``/commands/{name}``
    automatically, so trailing slashes are stripped from *url* here to avoid
    links containing ``//commands/``.

    Args:
        url: Base documentation URL, or None to disable documentation links.

    Example::

        set_docs_url("https://docs.apcore.dev/cli")
    """
    global _docs_url
    _docs_url = url.rstrip("/") if url is not None else None
def set_audit_logger(audit_logger: AuditLogger | None) -> None:
    """Install *audit_logger* as the module-wide audit logger.

    Passing None removes any previously installed logger.
    """
    global _audit_logger
    _audit_logger = audit_logger
class _LazyGroup(click.Group):
    """Click group for one command group whose subcommands are built on demand."""

    def __init__(
        self,
        members: dict[str, tuple[str, Any]],
        executor: Any,
        help_text_max_length: int = 1000,
        extensions_root: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        # Maps command name -> (module_id, descriptor).
        self._members = members
        self._executor = executor
        self._help_text_max_length = help_text_max_length
        self._extensions_root = extensions_root
        # Commands already built by get_command, keyed by command name.
        self._cmd_cache: dict[str, click.Command] = {}

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return the member command names in sorted order."""
        return sorted(self._members)

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Return the command for *cmd_name*, building and caching it on first use.

        Returns None when *cmd_name* is not a member of this group.
        """
        try:
            return self._cmd_cache[cmd_name]
        except KeyError:
            pass

        member = self._members.get(cmd_name)
        if member is None:
            return None

        descriptor = member[1]
        built = build_module_command(
            descriptor,
            self._executor,
            help_text_max_length=self._help_text_max_length,
            cmd_name=cmd_name,
            extensions_root=self._extensions_root,
        )
        self._cmd_cache[cmd_name] = built
        return built
class LazyModuleGroup(click.Group):
    """Custom Click Group that lazily loads apcore modules as subcommands.

    Commands registered directly on the group take precedence. Any other name
    is resolved through the alias map to a module ID, and the module's Click
    command is built on first use and cached.
    """

    def __init__(
        self,
        registry: Registry,
        executor: Executor,
        help_text_max_length: int = 1000,
        extensions_root: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self._registry = registry
        self._executor = executor
        self._help_text_max_length = help_text_max_length
        self._extensions_root = extensions_root
        self._module_cache: dict[str, click.Command] = {}
        # alias → canonical module_id (populated lazily)
        self._alias_map: dict[str, str] = {}
        # module_id → descriptor cache (populated during alias map build)
        self._descriptor_cache: dict[str, Any] = {}
        self._alias_map_built: bool = False

    def _build_alias_map(self) -> None:
        """Build alias→module_id map from display overlay metadata.

        Also fills the descriptor cache. If the registry raises, the error is
        logged and the map is left unmarked so the next call tries again;
        entries collected before the failure are kept.
        """
        if self._alias_map_built:
            return
        try:
            for module_id in self._registry.list():
                descriptor = self._registry.get_definition(module_id)
                if descriptor is None:
                    continue
                self._descriptor_cache[module_id] = descriptor
                display = _get_display(descriptor)
                cli_alias: str | None = (display.get("cli") or {}).get("alias")
                if not cli_alias or cli_alias == module_id:
                    continue
                # Last declaration still wins, but a clash is no longer silent.
                previous = self._alias_map.get(cli_alias)
                if previous is not None and previous != module_id:
                    logger.warning(
                        "CLI alias '%s' is declared by both '%s' and '%s'; using '%s'.",
                        cli_alias,
                        previous,
                        module_id,
                        module_id,
                    )
                self._alias_map[cli_alias] = module_id
            self._alias_map_built = True
        except Exception as exc:
            # Best-effort: keep the CLI usable, but say why the build failed.
            logger.warning("Failed to build alias map from registry: %s", exc)

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return the sorted, de-duplicated names of visible commands.

        Combines non-hidden registered commands with one name per registry
        module (its alias when one is declared, otherwise its module ID). A
        registry failure is logged and only registered commands are listed.
        """
        # Root-level commands are whatever the factory has registered
        # (post-FE-13: the `apcli` group + any deprecation shims + caller
        # extras). Module-derived commands live on top of those.
        registered = [name for name, cmd in self.commands.items() if not cmd.hidden]
        try:
            self._build_alias_map()
            reverse: dict[str, str] = {v: k for k, v in self._alias_map.items()}
            module_ids = self._registry.list()
            names = [reverse.get(mid, mid) for mid in module_ids]
        except Exception as exc:
            logger.warning("Failed to list modules from registry: %s", exc)
            names = []
        return sorted(set(registered + names))

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Return the command for *cmd_name*, or None when nothing matches.

        Lookup order: registered commands, the built-command cache, then the
        registry (after resolving *cmd_name* through the alias map).
        """
        # Check built-in commands first
        if cmd_name in self.commands:
            return self.commands[cmd_name]

        # Check cache
        if cmd_name in self._module_cache:
            return self._module_cache[cmd_name]

        # Resolve alias → canonical module_id
        self._build_alias_map()
        module_id = self._alias_map.get(cmd_name, cmd_name)

        # Look up in descriptor cache (populated during alias map build) or registry
        module_def = self._descriptor_cache.get(module_id)
        if module_def is None:
            module_def = self._registry.get_definition(module_id)
        if module_def is None:
            return None

        cmd = build_module_command(
            module_def,
            self._executor,
            help_text_max_length=self._help_text_max_length,
            cmd_name=cmd_name,
            extensions_root=self._extensions_root,
        )
        self._module_cache[cmd_name] = cmd
        return cmd
class GroupedModuleGroup(LazyModuleGroup):
    """Extended LazyModuleGroup that organises modules into named groups.

    Each exposed module becomes either a top-level command or a subcommand of
    a lazily built group, as decided by :meth:`_resolve_group`.
    """

    def __init__(self, **kwargs: Any) -> None:
        from apcore_cli.exposure import ExposureFilter

        exposure_filter = kwargs.pop("exposure_filter", None)
        super().__init__(**kwargs)
        self._exposure_filter: ExposureFilter = exposure_filter or ExposureFilter()
        # group name -> {command name -> (module_id, descriptor)}
        self._group_map: dict[str, dict[str, tuple[str, Any]]] = {}
        # command name -> (module_id, descriptor) for modules outside any group
        self._top_level_modules: dict[str, tuple[str, Any]] = {}
        self._group_cache: dict[str, _LazyGroup] = {}
        self._group_map_built: bool = False

    @staticmethod
    def _resolve_group(module_id: str, descriptor: Any) -> tuple[str | None, str]:
        """Determine (group, command_name) for a module from its display overlay.

        Resolution order:

        1. A non-empty string ``display.cli.group`` is used as the group.
        2. An empty-string group forces a top-level command.
        3. Otherwise the CLI name (alias or module ID) is split at its first
           dot into ``group.command``; a name without a dot is top-level.

        An empty *module_id* yields ``(None, "")`` after logging a warning.
        """
        if not module_id:
            logger.warning("Empty module_id encountered in _resolve_group")
            return (None, "")

        display = _get_display(descriptor)
        cli_display = display.get("cli") or {}
        explicit_group = cli_display.get("group")

        if isinstance(explicit_group, str) and explicit_group != "":
            return (explicit_group, cli_display.get("alias") or module_id)
        if explicit_group == "":
            return (None, cli_display.get("alias") or module_id)

        cli_name = cli_display.get("alias") or module_id
        if "." in cli_name:
            group, _, cmd = cli_name.partition(".")
            return (group, cmd)
        return (None, cli_name)

    def _build_group_map(self) -> None:
        """Build the group map from registry modules.

        Applies the exposure filter before building groups — hidden modules are
        excluded from --help and tab-completion but remain invocable via exec.

        FE-13 §4.10: rejects modules whose explicit group name, dotted-ID
        auto-extracted group, or top-level CLI name is ``apcli`` (reserved).
        Rejection is a hard :class:`click.UsageError`, which always propagates.

        Any other exception is logged and the map is left unmarked so the next
        call retries. The map is also left unmarked when the alias map build
        did not complete, because the descriptor cache this method reads from
        may then be missing modules.

        Raises:
            click.UsageError: If a module resolves to a reserved name.
        """
        if self._group_map_built:
            return
        try:
            self._build_alias_map()
            for module_id in self._registry.list():
                descriptor = self._descriptor_cache.get(module_id)
                if descriptor is None:
                    continue
                # Skip modules that should not appear as CLI commands (FE-12).
                if not self._exposure_filter.is_exposed(module_id):
                    continue
                group, cmd = self._resolve_group(module_id, descriptor)
                # FE-13: reserved-name enforcement covers all three collision
                # shapes — explicit group, auto-grouped from dotted module_id,
                # and top-level CLI name. Raised as UsageError so Click exits
                # with code 2 per spec §7 FR-13-09. UsageError must propagate
                # past the below try/except so test fixtures and live CLI see
                # the intended failure path.
                if group is not None and group in RESERVED_GROUP_NAMES:
                    raise click.UsageError(
                        f"Module '{module_id}': group name '{group}' is reserved. "
                        f"Use a different CLI alias or set display.cli.group to another value."
                    )
                if group is None and cmd in RESERVED_GROUP_NAMES:
                    raise click.UsageError(
                        f"Module '{module_id}': top-level CLI name '{cmd}' is reserved. Use a different CLI alias."
                    )
                if group is None:
                    self._top_level_modules[cmd] = (module_id, descriptor)
                elif not re.fullmatch(r"[a-z][a-z0-9_-]*", group):
                    logger.warning(
                        "Module '%s': group name '%s' is not shell-safe — treating as top-level.",
                        module_id,
                        group,
                    )
                    self._top_level_modules[cmd] = (module_id, descriptor)
                else:
                    self._group_map.setdefault(group, {})[cmd] = (module_id, descriptor)
            # _build_alias_map swallows registry errors and can leave the
            # descriptor cache incomplete. Marking the map as built in that
            # state would hide the missing modules for the rest of the process.
            self._group_map_built = self._alias_map_built
        except click.UsageError:
            raise
        except Exception as exc:
            # Transient registry errors — allow retry on next call.
            logger.warning("Failed to build group map: %s", exc)

    def list_commands(self, ctx: click.Context) -> list[str]:
        """Return sorted names of visible registered commands, groups and top-level modules."""
        self._build_group_map()
        # Registered root commands (post-FE-13: `apcli` group + deprecation
        # shims + caller extras). Hidden entries (e.g. the apcli group when
        # mode=none) are filtered out so tab-completion matches --help.
        registered = [name for name, cmd in self.commands.items() if not cmd.hidden]
        group_names = list(self._group_map.keys())
        top_names = list(self._top_level_modules.keys())
        return sorted(set(registered + group_names + top_names))

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Return the command or group for *cmd_name*, or None when unknown.

        Lookup order: registered commands, cached groups, groups from the
        group map, then top-level modules.
        """
        # Check built-in commands first
        if cmd_name in self.commands:
            return self.commands[cmd_name]

        self._build_group_map()

        # Check group cache
        if cmd_name in self._group_cache:
            return self._group_cache[cmd_name]

        # Check if it's a group
        if cmd_name in self._group_map:
            grp = _LazyGroup(
                members=self._group_map[cmd_name],
                executor=self._executor,
                help_text_max_length=self._help_text_max_length,
                extensions_root=self._extensions_root,
                name=cmd_name,
            )
            self._group_cache[cmd_name] = grp
            return grp

        # Check top-level modules
        if cmd_name in self._top_level_modules:
            if cmd_name in self._module_cache:
                return self._module_cache[cmd_name]
            _, descriptor = self._top_level_modules[cmd_name]
            cmd = build_module_command(
                descriptor,
                self._executor,
                help_text_max_length=self._help_text_max_length,
                cmd_name=cmd_name,
                extensions_root=self._extensions_root,
            )
            self._module_cache[cmd_name] = cmd
            return cmd

        return None

    def format_help(self, ctx: click.Context, formatter: click.HelpFormatter) -> None:
        """Write root help: usage, help text, Options, Commands, Modules, Groups, footer."""
        self._build_group_map()
        self.format_usage(ctx, formatter)
        if self.help:
            formatter.write_paragraph()
            formatter.write(self.help)

        # Options section
        opts = []
        for p in self.get_params(ctx):
            rv = p.get_help_record(ctx)
            if rv is not None:
                opts.append(rv)
        if opts:
            with formatter.section("Options"):
                formatter.write_dl(opts)

        # Commands section. Post-FE-13, all apcore-cli built-ins live under
        # the `apcli` group — at root we only show non-hidden registered
        # commands (the apcli group itself when visible, help, and any
        # user-added extras via extra_commands / add_command).
        cmd_records = []
        for name in sorted(self.commands):
            cmd = self.commands[name]
            if cmd is None or cmd.hidden:
                continue
            cmd_records.append((name, cmd.get_short_help_str()))
        if cmd_records:
            with formatter.section("Commands"):
                formatter.write_dl(cmd_records)

        # Modules section (top-level)
        if self._top_level_modules:
            module_records = []
            for name in sorted(self._top_level_modules.keys()):
                _, descriptor = self._top_level_modules[name]
                desc = getattr(descriptor, "description", "") or ""
                module_records.append((name, desc))
            with formatter.section("Modules"):
                formatter.write_dl(module_records)

        # Groups section (business-module-derived groups only — the `apcli`
        # reserved group was rejected in _build_group_map, so filtering here
        # is defensive rather than load-bearing).
        if self._group_map:
            group_records = []
            for group_name in sorted(self._group_map.keys()):
                count = len(self._group_map[group_name])
                suffix = "s" if count != 1 else ""
                group_records.append((group_name, f"({count} command{suffix})"))
            if group_records:
                with formatter.section("Groups"):
                    formatter.write_dl(group_records)

        # Footer hints for discoverability
        formatter.write_paragraph()
        formatter.write(
            "Use --help --verbose to show all options (including built-in apcore options).\n"
            "Use --help --man to display a formatted man page."
        )
# Translation table: apcore error code -> exit status used by the CLI.
# Codes that share an exit status are grouped; key order is unchanged.
_ERROR_CODE_MAP = {
    **dict.fromkeys(("MODULE_NOT_FOUND", "MODULE_LOAD_ERROR", "MODULE_DISABLED"), 44),
    "SCHEMA_VALIDATION_ERROR": 45,
    "SCHEMA_CIRCULAR_REF": 48,
    **dict.fromkeys(("APPROVAL_DENIED", "APPROVAL_TIMEOUT", "APPROVAL_PENDING"), 46),
    **dict.fromkeys(("CONFIG_NOT_FOUND", "CONFIG_INVALID"), 47),
    **dict.fromkeys(("MODULE_EXECUTE_ERROR", "MODULE_TIMEOUT"), 1),
    "ACL_DENIED": 77,
    # Config Bus errors (apcore >= 0.15.0)
    **dict.fromkeys(
        (
            "CONFIG_NAMESPACE_RESERVED",
            "CONFIG_NAMESPACE_DUPLICATE",
            "CONFIG_ENV_PREFIX_CONFLICT",
            "CONFIG_ENV_MAP_CONFLICT",
        ),
        78,
    ),
    "CONFIG_MOUNT_ERROR": 66,
    "CONFIG_BIND_ERROR": 65,
    "ERROR_FORMATTER_DUPLICATE": 70,
}
431def _first_failed_exit_code(result: Any) -> int:
432 """Return the exit code for the first failed check in a PreflightResult."""
433 _check_to_exit = {
434 "module_id": 2,
435 "module_lookup": 44,
436 "call_chain": 1,
437 "acl": 77,
438 "schema": 45,
439 "approval": 46,
440 "module_preflight": 1,
441 }
442 for check in getattr(result, "checks", []):
443 if not check.passed:
444 return _check_to_exit.get(check.check, 1)
445 return 1
def format_preflight_result(result: Any, fmt: str | None = None) -> None:
    """Format and print a PreflightResult to stdout.

    JSON is emitted when the resolved format is ``json`` or stdout is not a
    TTY; otherwise one line per check is printed, followed by a summary.

    A check's ``passed`` is treated as tri-state: truthy means passed,
    ``False`` means failed, and any other value (e.g. None) means skipped.
    Skipped checks are not counted as errors in the summary.

    Args:
        result: Object exposing ``valid``, ``requires_approval`` and
            ``checks``; each check exposes ``check``, ``passed``, ``error``
            and optionally ``warnings``.
        fmt: Requested output format, passed to ``resolve_format``.
    """
    from apcore_cli.output import resolve_format

    resolved = resolve_format(fmt)
    if resolved == "json" or not sys.stdout.isatty():
        payload: dict[str, Any] = {
            "valid": result.valid,
            "requires_approval": result.requires_approval,
            "checks": [],
        }
        for c in result.checks:
            entry: dict[str, Any] = {"check": c.check, "passed": c.passed}
            if c.error is not None:
                entry["error"] = c.error
            check_warnings = getattr(c, "warnings", None)
            if check_warnings:
                entry["warnings"] = check_warnings
            payload["checks"].append(entry)
        click.echo(json.dumps(payload, indent=2, default=str))
        return

    # TTY table format
    error_count = 0
    warning_count = 0
    for c in result.checks:
        # A missing or None ``warnings`` attribute means "no warnings".
        check_warnings = getattr(c, "warnings", None) or []
        has_warnings = bool(check_warnings)
        failed = c.passed is False

        if c.passed and has_warnings:
            sym = "\u26a0"  # ⚠ passed with warnings
        elif c.passed:
            sym = "\u2713"  # ✓ passed
        elif failed:
            sym = "\u2717"  # ✗ failed
        else:
            sym = "\u25cb"  # ○ skipped

        status = f" {sym} {c.check:<20}"
        if c.error:
            detail = json.dumps(c.error, default=str) if isinstance(c.error, dict) else str(c.error)
            status += f" {detail}"
        elif c.passed:
            # Passed-with-warnings gets no suffix; the warnings follow below.
            if not has_warnings:
                status += " OK"
        elif failed:
            # A failed check without error detail must not read as "Skipped".
            status += " Failed"
        else:
            status += " Skipped"
        click.echo(status)
        for w in check_warnings:
            click.echo(f" Warning: {w}")

        if failed:
            error_count += 1
        warning_count += len(check_warnings)

    tag = "PASS" if result.valid else "FAIL"
    click.echo(f"\nResult: {tag} ({error_count} error(s), {warning_count} warning(s))")
def _emit_error_json(e: Exception, exit_code: int) -> None:
    """Write *e* to stderr as one JSON object, intended for AI agents.

    The object always has ``error``, ``code`` (``"UNKNOWN"`` when the
    exception has no truthy ``code``), ``message`` and ``exit_code``. The
    optional guidance attributes are copied over only when they are not None.
    """
    optional_fields = ("details", "suggestion", "ai_guidance", "retryable", "user_fixable")
    payload: dict[str, Any] = {
        "error": True,
        "code": getattr(e, "code", None) or "UNKNOWN",
        "message": str(e),
        "exit_code": exit_code,
    }
    for name in optional_fields:
        value = getattr(e, name, None)
        if value is None:
            continue
        payload[name] = value
    serialized = json.dumps(payload, default=str)
    click.echo(serialized, err=True)
def _emit_error_tty(e: Exception, exit_code: int) -> None:
    """Print *e* to stderr in human-readable form.

    Output order: header (with the error code when present), a details list
    when ``details`` is a non-empty dict, the suggestion, the retryable hint,
    and finally the exit code.
    """

    def say(text: str) -> None:
        click.echo(text, err=True)

    code = getattr(e, "code", None)
    say(f"Error [{code}]: {e}" if code else f"Error: {e}")

    details = getattr(e, "details", None)
    if isinstance(details, dict) and details:
        say("")
        say(" Details:")
        for key, value in details.items():
            say(f" {key}: {value}")

    suggestion = getattr(e, "suggestion", None)
    if suggestion:
        say(f"\n Suggestion: {suggestion}")

    retryable = getattr(e, "retryable", None)
    if retryable is not None:
        if retryable:
            label = "Yes"
        else:
            label = "No (same input will fail again)"
        say(f" Retryable: {label}")

    say(f"\n Exit code: {exit_code}")
537def _get_module_id(module_def: ModuleDescriptor) -> str:
538 """Get the canonical module ID, falling back to module_id."""
539 cid = getattr(module_def, "canonical_id", None)
540 if isinstance(cid, str):
541 return cid
542 return module_def.module_id
# Destination names of the built-in options; schema properties must not reuse them.
_RESERVED_OPTION_NAMES = frozenset(
    {
        "input",
        "yes",
        "large_input",
        "format",
        "fields",
        "sandbox",
        "verbose",
        "dry_run",
        "trace",
        "stream",
        "strategy",
        "approval_timeout",
        "approval_token",
    }
)

# Pipeline steps listed by the ``--trace --dry-run`` preview, in order.
_PIPELINE_STEPS = (
    "context_creation",
    "call_chain_guard",
    "module_lookup",
    "acl_check",
    "approval_gate",
    "middleware_before",
    "input_validation",
    "execute",
    "output_validation",
    "middleware_after",
    "return_result",
)

# Steps the preview reports as "pure — would execute".
_DRY_RUN_PURE_STEPS = frozenset(
    {
        "context_creation",
        "call_chain_guard",
        "module_lookup",
        "acl_check",
        "input_validation",
    }
)


def _coerce_input_schema(raw_schema: Any) -> dict:
    """Return *raw_schema* as a schema dict; unsupported values yield ``{}``."""
    if raw_schema is None:
        return {}
    if isinstance(raw_schema, dict):
        return raw_schema
    if hasattr(raw_schema, "model_json_schema"):
        # Pydantic v2 BaseModel class
        return raw_schema.model_json_schema()
    if hasattr(raw_schema, "schema"):
        # Pydantic v1 BaseModel class
        return raw_schema.schema()
    return {}


def _echo_dry_run_pipeline() -> None:
    """Print the dry-run pipeline preview to stderr."""
    click.echo("\nPipeline preview (dry-run):", err=True)
    for step in _PIPELINE_STEPS:
        if step in _DRY_RUN_PURE_STEPS:
            click.echo(f" \u2713 {step:<24} (pure — would execute)", err=True)
        else:
            click.echo(f" \u25cb {step:<24} (impure — skipped in dry-run)", err=True)


def _trace_to_dict(trace: Any) -> dict[str, Any]:
    """Convert a pipeline trace into the ``_trace`` payload for JSON output."""
    return {
        "strategy": trace.strategy_name,
        "total_duration_ms": trace.total_duration_ms,
        "success": trace.success,
        "steps": [
            {
                "name": s.name,
                "duration_ms": s.duration_ms,
                "skipped": s.skipped,
                **({"skip_reason": s.skip_reason} if s.skipped else {}),
            }
            for s in trace.steps
        ],
    }


def _echo_trace_table(trace: Any) -> None:
    """Print a per-step pipeline trace table to stderr."""
    step_count = len(trace.steps)
    click.echo(
        f"\nPipeline Trace (strategy: {trace.strategy_name}, "
        f"{step_count} steps, {trace.total_duration_ms:.1f}ms)",
        err=True,
    )
    for s in trace.steps:
        if s.skipped:
            sym = "\u25cb"
            dur = "\u2014"
            reason = f" skipped ({s.skip_reason or 'n/a'})"
        else:
            sym = "\u2713"
            dur = f"{s.duration_ms:.1f}ms"
            reason = ""
        click.echo(f" {sym} {s.name:<24} {dur:>8}{reason}", err=True)


def _builtin_options(hide: bool) -> list[click.Option]:
    """Return the built-in options attached to every module command.

    All options use *hide* for their ``hidden`` flag except ``--sandbox``,
    which is always hidden.
    """
    return [
        click.Option(
            ["--input"],
            default=None,
            help="Read JSON input from a file path, or use '-' to read from stdin pipe.",
            hidden=hide,
        ),
        click.Option(
            ["--yes", "-y"],
            is_flag=True,
            default=False,
            help="Skip interactive approval prompts (for scripts and CI).",
            hidden=hide,
        ),
        click.Option(
            ["--large-input"],
            is_flag=True,
            default=False,
            help="Allow stdin input larger than 10MB (default limit protects against accidental pipes).",
            hidden=hide,
        ),
        click.Option(
            ["--format"],
            type=click.Choice(["json", "table", "csv", "yaml", "jsonl"]),
            default=None,
            help="Output format: json, table, csv, yaml, jsonl.",
            hidden=hide,
        ),
        click.Option(
            ["--fields"],
            default=None,
            help="Comma-separated dot-paths to select from the result (e.g., 'status,data.count').",
            hidden=hide,
        ),
        # --sandbox is always hidden (not yet implemented)
        click.Option(
            ["--sandbox"],
            is_flag=True,
            default=False,
            help="Run module in an isolated subprocess with restricted filesystem and env access.",
            hidden=True,
        ),
        click.Option(
            ["--dry-run"],
            is_flag=True,
            default=False,
            help="Run preflight checks without executing the module. Shows validation results.",
            hidden=hide,
        ),
        click.Option(
            ["--trace"],
            is_flag=True,
            default=False,
            help="Show execution pipeline trace with per-step timing after the result.",
            hidden=hide,
        ),
        click.Option(
            ["--stream"],
            is_flag=True,
            default=False,
            help="Stream module output as JSONL (one JSON object per line, flushed immediately).",
            hidden=hide,
        ),
        click.Option(
            ["--strategy"],
            type=click.Choice(["standard", "internal", "testing", "performance", "minimal"]),
            default=None,
            # The help text lists every accepted choice, including ``minimal``.
            help="Execution pipeline strategy: standard (default), internal, testing, performance, minimal.",
            hidden=hide,
        ),
        click.Option(
            ["--approval-timeout"],
            type=int,
            default=None,
            help="Override approval prompt timeout in seconds (default: 60).",
            hidden=hide,
        ),
        click.Option(
            ["--approval-token"],
            default=None,
            help="Resume a pending approval with the given token (for async approval flows).",
            hidden=hide,
        ),
    ]


def build_module_command(
    module_def: ModuleDescriptor,
    executor: Executor,
    help_text_max_length: int = 1000,
    cmd_name: str | None = None,
    extensions_root: str | None = None,
) -> click.Command:
    """Build a Click command from an apcore module definition.

    Generates Click options from the module's input_schema, wires up
    STDIN input collection, schema validation, approval gating,
    execution, audit logging, and output formatting.

    Args:
        module_def: Module descriptor supplying the ID, description,
            ``input_schema`` and display overlay.
        executor: Executor used for validate / call / stream.
        help_text_max_length: Passed to ``schema_to_click_options`` as
            ``max_help_length``.
        cmd_name: User-facing command name; defaults to the display alias,
            then the module ID.
        extensions_root: Forwarded to the ``Sandbox``.

    Returns:
        The configured :class:`click.Command`.

    Exits the process with status 2 when the schema cannot be converted to
    options or a schema property collides with a built-in option name.
    """
    # Resolve display overlay fields (§5.13)
    display = _get_display(module_def)
    cli_display = display.get("cli") or {}

    raw_schema = getattr(module_def, "input_schema", None)
    module_id = _get_module_id(module_def)
    # cmd_name is the user-facing name (alias or module_id)
    effective_cmd_name: str = cmd_name or cli_display.get("alias") or module_id
    cmd_help: str = cli_display.get("description") or module_def.description

    # Defensively convert Pydantic model class to dict
    input_schema = _coerce_input_schema(raw_schema)

    if input_schema.get("properties"):
        try:
            resolved_schema = resolve_refs(input_schema, max_depth=32, module_id=module_id)
        except (SystemExit, RefResolverError):
            raise
        except Exception as e:
            logger.warning("Failed to resolve $refs in schema for '%s', using raw schema: %s", module_id, e)
            resolved_schema = input_schema
    else:
        resolved_schema = input_schema

    try:
        schema_options = schema_to_click_options(resolved_schema, max_help_length=help_text_max_length)
    except ValueError as e:
        click.echo(f"Error: Module '{module_id}' schema error: {e}", err=True)
        sys.exit(2)

    def callback(**kwargs: Any) -> None:
        """Run the module: collect input, validate, gate, execute, format, audit."""
        # Separate built-in options from schema-generated kwargs
        stdin_input = kwargs.pop("input", None)
        auto_approve = kwargs.pop("yes", False)
        large_input = kwargs.pop("large_input", False)
        output_format = kwargs.pop("format", None)
        output_fields = kwargs.pop("fields", None)
        sandbox_flag = kwargs.pop("sandbox", False)
        dry_run = kwargs.pop("dry_run", False)
        trace_flag = kwargs.pop("trace", False)
        stream_flag = kwargs.pop("stream", False)
        strategy_name = kwargs.pop("strategy", None)
        approval_timeout = kwargs.pop("approval_timeout", None) or 60
        approval_token = kwargs.pop("approval_token", None)

        merged: dict[str, Any] = {}
        audit_start = time.monotonic()
        try:
            # 1. Collect and merge input (STDIN + CLI flags)
            merged = collect_input(stdin_input, kwargs, large_input)

            # 2. Reconvert enum values to original types
            merged = reconvert_enum_values(merged, schema_options)

            # -- Dry-run: preflight validation only, no execution --
            if dry_run:
                preflight = executor.validate(module_id, merged)
                format_preflight_result(preflight, output_format)
                # --trace --dry-run: show which pipeline steps would run
                if trace_flag and hasattr(preflight, "checks"):
                    _echo_dry_run_pipeline()
                sys.exit(0 if preflight.valid else _first_failed_exit_code(preflight))

            # 3. Validate against schema (if schema has properties)
            if resolved_schema.get("properties"):
                try:
                    jsonschema.validate(merged, resolved_schema)
                except jsonschema.ValidationError as ve:
                    # ``ve.path`` is a deque of keys/indices; join it so the
                    # message reads "a.b" instead of "deque(['a', 'b'])".
                    location = ".".join(str(part) for part in ve.path) or "<root>"
                    click.echo(
                        f"Error: Validation failed for '{location}': {ve.message}.",
                        err=True,
                    )
                    sys.exit(45)

            # -- Inject approval token if provided --
            if approval_token:
                merged["_approval_token"] = approval_token

            # 4. Check approval gate
            check_approval(module_def, auto_approve, timeout=approval_timeout)

            # 5. Execute (optionally sandboxed)

            # -- Streaming execution --
            if stream_flag:
                import asyncio

                # Streaming always outputs JSONL; --format table is ignored (spec §3.6.2)
                if output_format == "table":
                    logger.warning("Streaming mode always outputs JSONL; --format table is ignored.")

                annotations = getattr(module_def, "annotations", None)
                is_streaming = getattr(annotations, "streaming", False)
                if not is_streaming:
                    logger.warning(
                        "Module '%s' does not declare streaming support. Falling back to standard execution.",
                        module_id,
                    )

                if is_streaming and hasattr(executor, "stream"):

                    async def _do_stream() -> None:
                        chunks = 0
                        async for chunk in executor.stream(module_id, merged):
                            chunks += 1
                            click.echo(json.dumps(chunk, default=str))
                            sys.stdout.flush()
                            if sys.stderr.isatty():
                                click.echo(
                                    f"\rStreaming {module_id}... ({chunks} chunks)",
                                    err=True,
                                    nl=False,
                                )
                        if sys.stderr.isatty():
                            click.echo("", err=True)

                    asyncio.run(_do_stream())
                    duration_ms = int((time.monotonic() - audit_start) * 1000)
                    if _audit_logger is not None:
                        _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms)
                    return
                # else: fall through to normal execution

            # -- Traced execution --
            if trace_flag and hasattr(executor, "call_with_trace"):
                result, trace = executor.call_with_trace(
                    module_id,
                    merged,
                    strategy=strategy_name,
                )
                duration_ms = int((time.monotonic() - audit_start) * 1000)

                # Print result (before audit so a formatter raise doesn't double-log)
                if output_format == "json" or not sys.stdout.isatty():
                    # Merge _trace into JSON output
                    trace_data = _trace_to_dict(trace)
                    if isinstance(result, dict):
                        output = {**result, "_trace": trace_data}
                    else:
                        output = {"result": result, "_trace": trace_data}
                    click.echo(json.dumps(output, indent=2, default=str))
                else:
                    format_exec_result(result, output_format, fields=output_fields)
                    # Print trace to stderr
                    _echo_trace_table(trace)

                # Audit after formatting — formatter raise will not produce a
                # duplicate success+error pair
                if _audit_logger is not None:
                    _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms)
                return

            # -- Standard execution (with optional strategy) --
            sandbox = Sandbox(enabled=sandbox_flag, extensions_root=extensions_root)
            if strategy_name and hasattr(executor, "call_with_trace"):
                if sandbox_flag:
                    # Sandbox mode: delegate to subprocess (strategy not available in sandbox)
                    logger.warning("--sandbox ignores --strategy; sandboxed execution uses default strategy.")
                    result = sandbox.execute(module_id, merged, executor)
                else:
                    # Strategy requires call_with_trace to pass strategy param
                    result, _trace = executor.call_with_trace(
                        module_id,
                        merged,
                        strategy=strategy_name,
                    )
                    if strategy_name != "standard" and sys.stderr.isatty():
                        click.echo(
                            f"Warning: Using '{strategy_name}' strategy.",
                            err=True,
                        )
            else:
                if strategy_name and not hasattr(executor, "call_with_trace"):
                    logger.warning(
                        "--strategy '%s' requested but executor does not support call_with_trace; "
                        "using default pipeline.",
                        strategy_name,
                    )
                result = sandbox.execute(module_id, merged, executor)
            duration_ms = int((time.monotonic() - audit_start) * 1000)

            # 6. Format and print result (before audit so a formatter raise doesn't
            # produce a spurious success entry followed by an error entry)
            format_exec_result(result, output_format, fields=output_fields)

            # 7. Audit log (success) — only reached if formatting did not raise
            if _audit_logger is not None:
                _audit_logger.log_execution(module_id, merged, "success", 0, duration_ms)

        except KeyboardInterrupt:
            click.echo("Execution cancelled.", err=True)
            sys.exit(130)
        except SystemExit:
            # Deliberate exits above (dry-run, validation) pass through untouched.
            raise
        except Exception as e:
            error_code = getattr(e, "code", None)
            exit_code = _ERROR_CODE_MAP.get(error_code, 1) if isinstance(error_code, str) else 1

            # Audit log (error)
            if _audit_logger is not None:
                duration_ms = int((time.monotonic() - audit_start) * 1000)
                _audit_logger.log_execution(module_id, merged, "error", exit_code, duration_ms)

            if output_format == "json" or not sys.stderr.isatty():
                _emit_error_json(e, exit_code)
            else:
                _emit_error_tty(e, exit_code)
            sys.exit(exit_code)

    # Build the command with schema-generated options + built-in options
    _epilog_parts: list[str] = []
    if not _verbose_help:
        _epilog_parts.append("Use --verbose to show all options (including built-in apcore options).")
    if _docs_url:
        # Tolerate a base URL configured with a trailing slash.
        _epilog_parts.append(f"Docs: {_docs_url.rstrip('/')}/commands/{effective_cmd_name}")
    _epilog = "\n".join(_epilog_parts) if _epilog_parts else None
    cmd = click.Command(
        name=effective_cmd_name,
        help=cmd_help,
        callback=callback,
        epilog=_epilog,
    )

    # Add built-in options (hidden unless --verbose is passed with --help)
    cmd.params.extend(_builtin_options(not _verbose_help))

    # Guard: schema property names must not collide with built-in option names.
    for opt in schema_options:
        if opt.name in _RESERVED_OPTION_NAMES:
            click.echo(
                f"Error: Module '{module_id}' schema property '{opt.name}' conflicts "
                f"with a reserved CLI option name. Rename the property.",
                err=True,
            )
            sys.exit(2)

    # Add schema-generated options
    cmd.params.extend(schema_options)

    return cmd
def validate_module_id(module_id: str) -> None:
    """Check a module ID's length and format; exit with status 2 if invalid.

    The 192-character limit tracks PROTOCOL_SPEC §2.7 EBNF constraint #1,
    raised from 128 in spec 1.6.0-draft for Java/.NET deep-namespace
    FQN-derived IDs. It stays filesystem-safe:
    192 + len('.binding.yaml') = 205 < 255.

    A valid ID is one or more dot-separated segments, each starting with a
    lowercase letter followed by lowercase letters, digits or underscores.
    """
    if len(module_id) > 192:
        problem = f"Error: Invalid module ID format: '{module_id}'. Maximum length is 192 characters."
    elif re.fullmatch(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*", module_id) is None:
        problem = f"Error: Invalid module ID format: '{module_id}'."
    else:
        return

    click.echo(problem, err=True)
    sys.exit(2)
# Size cap (10 MiB) for STDIN / --input payloads unless --large-input is given.
_MAX_INPUT_BYTES = 10_485_760


def collect_input(
    stdin_flag: str | None,
    cli_kwargs: dict[str, Any],
    large_input: bool = False,
) -> dict[str, Any]:
    """Collect and merge input from STDIN, a file path, and CLI flags.

    ``stdin_flag`` accepts three forms:
    * ``None`` or empty — use CLI kwargs only.
    * ``"-"`` — read JSON from STDIN (10 MB cap unless ``large_input``).
    * any other string — treat as a file path; open and read JSON
      (same cap applies).

    CLI flags override STDIN/file values for duplicate keys. CLI kwargs whose
    value is None are dropped before merging. Empty or whitespace-only input
    is treated as an empty object.

    Args:
        stdin_flag: Value of ``--input`` (see the forms above).
        cli_kwargs: Option values parsed from the command line.
        large_input: When True, the 10 MB size cap is not enforced.

    Returns:
        The merged input mapping.

    Exits the process with status 2 when the file is missing, unreadable or
    not valid UTF-8, the payload exceeds the cap, the payload is not valid
    JSON, or the JSON value is not an object.
    """
    # Remove None values from CLI kwargs
    cli_kwargs_non_none = {k: v for k, v in cli_kwargs.items() if v is not None}

    if not stdin_flag:
        return cli_kwargs_non_none

    if stdin_flag == "-":
        source_label = "STDIN"
        too_large_msg = "Error: STDIN input exceeds 10MB limit. Use --large-input to override."
        raw = sys.stdin.read()
    else:
        # File-path form.
        source_label = f"--input file '{stdin_flag}'"
        too_large_msg = f"Error: --input file '{stdin_flag}' exceeds 10MB limit. Use --large-input to override."
        try:
            raw = Path(stdin_flag).read_text(encoding="utf-8")
        except FileNotFoundError:
            click.echo(f"Error: --input file '{stdin_flag}' does not exist.", err=True)
            sys.exit(2)
        except UnicodeDecodeError as e:
            # UnicodeDecodeError is a ValueError, not an OSError, so it needs
            # its own handler to exit cleanly like the other read failures.
            click.echo(f"Error: --input file '{stdin_flag}' is not valid UTF-8: {e}.", err=True)
            sys.exit(2)
        except OSError as e:
            click.echo(f"Error: could not read --input file '{stdin_flag}': {e}.", err=True)
            sys.exit(2)

    if not large_input and len(raw.encode("utf-8")) > _MAX_INPUT_BYTES:
        click.echo(too_large_msg, err=True)
        sys.exit(2)

    if not raw.strip():
        # Nothing but whitespace (e.g. `echo "" | ...`) counts as no input.
        stdin_data: dict[str, Any] = {}
    else:
        try:
            stdin_data = json.loads(raw)
        except json.JSONDecodeError as e:
            click.echo(
                f"Error: {source_label} does not contain valid JSON: {e.msg}.",
                err=True,
            )
            sys.exit(2)

    if not isinstance(stdin_data, dict):
        click.echo(
            f"Error: {source_label} JSON must be an object, got {type(stdin_data).__name__}.",
            err=True,
        )
        sys.exit(2)

    # CLI flags override STDIN/file for duplicate keys
    return {**stdin_data, **cli_kwargs_non_none}