Coverage for src / sql_tool / cli / commands / config.py: 94%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-14 15:28 -0500

1"""Configuration management CLI commands.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING 

6 

7import typer 

8 

9from sql_tool.core.config import DEFAULT_CONFIG_PATH, load_config, resolve_config 

10 

11if TYPE_CHECKING: 

12 from pathlib import Path 

13 

14 from sql_tool.core.config import ResolvedConfig 

15 

16config_app = typer.Typer(help="Configuration management commands") 

17 

18 

19@config_app.callback(invoke_without_command=True) 

20def config_callback(ctx: typer.Context) -> None: 

21 if not ctx.invoked_subcommand: 

22 typer.echo(ctx.get_help()) 

23 raise typer.Exit() 

24 

25 

26def _get_resolved_config(ctx: typer.Context) -> tuple[ResolvedConfig, Path | None]: 

27 config_path: Path | None = ctx.obj.get("config_file") 

28 app_config = load_config(config_path) 

29 resolved = resolve_config( 

30 app_config, 

31 profile_name=ctx.obj.get("profile"), 

32 dsn=ctx.obj.get("dsn"), 

33 host=ctx.obj.get("host"), 

34 port=ctx.obj.get("port"), 

35 database=ctx.obj.get("database"), 

36 user=ctx.obj.get("user"), 

37 password=ctx.obj.get("password"), 

38 ) 

39 return resolved, config_path 

40 

41 

42def _mask_password(value: str | None) -> str: 

43 if value is None: 

44 return "not set" 

45 return "***" 

46 

47 

48@config_app.command("show") 

49def config_show(ctx: typer.Context) -> None: 

50 """Display resolved configuration with source attribution.""" 

51 resolved, config_path = _get_resolved_config(ctx) 

52 sources = resolved.sources 

53 

54 typer.echo("Connection Settings (resolved):") 

55 connection_fields = [ 

56 ("host", resolved.host), 

57 ("port", str(resolved.port)), 

58 ("database", resolved.dbname), 

59 ("user", resolved.user or "not set"), 

60 ("password", _mask_password(resolved.password)), 

61 ("sslmode", resolved.sslmode), 

62 ] 

63 for field_name, value in connection_fields: 

64 source_key = "dbname" if field_name == "database" else field_name 

65 source = sources.get(source_key, "default") 

66 typer.echo(f" {field_name}: {value} ({source})") 

67 

68 typer.echo("") 

69 typer.echo("General:") 

70 timeout_source = sources.get("default_timeout", "default") 

71 typer.echo(f" timeout: {resolved.default_timeout}s ({timeout_source})") 

72 format_source = sources.get("default_format", "default") 

73 typer.echo(f" format: {resolved.default_format} ({format_source})") 

74 

75 typer.echo("") 

76 if resolved.active_profile: 

77 typer.echo(f"Active Profile: {resolved.active_profile}") 

78 else: 

79 typer.echo("Active Profile: none") 

80 

81 display_path = config_path or DEFAULT_CONFIG_PATH 

82 typer.echo(f"Config File: {display_path}") 

83 

84 

85@config_app.command("profiles") 

86def config_profiles(ctx: typer.Context) -> None: 

87 """List available connection profiles.""" 

88 config_path: Path | None = ctx.obj.get("config_file") 

89 app_config = load_config(config_path) 

90 active_profile = ctx.obj.get("profile") or None 

91 

92 if not app_config.profiles: 

93 typer.echo("No profiles configured.") 

94 display_path = config_path or DEFAULT_CONFIG_PATH 

95 typer.echo(f"Add profiles to: {display_path}") 

96 return 

97 

98 typer.echo("Available Profiles:") 

99 typer.echo("") 

100 for name, profile in sorted(app_config.profiles.items()): 

101 is_active = name == active_profile 

102 marker = "* " if is_active else " " 

103 label = " (active)" if is_active else "" 

104 typer.echo(f"{marker}{name}{label}") 

105 

106 display_fields = [ 

107 ("host", profile.host), 

108 ("port", str(profile.port)), 

109 ("database", profile.dbname), 

110 ] 

111 if profile.user: 

112 display_fields.append(("user", profile.user)) 

113 if profile.sslmode != "prefer": 

114 display_fields.append(("sslmode", profile.sslmode)) 

115 

116 for field_name, value in display_fields: 

117 typer.echo(f" {field_name}: {value}") 

118 typer.echo("")