Coverage for src/meshadmin/cli/main.py: 64%

70 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 08:49 +0200

1from importlib.metadata import PackageNotFoundError, version 

2from pathlib import Path 

3from time import sleep 

4from typing import Annotated 

5 

6import httpx 

7import jwt 

8import structlog 

9import typer 

10import yaml 

11 

12from meshadmin.cli.commands import ( 

13 context_app, 

14 host_app, 

15 nebula_app, 

16 network_app, 

17 service_app, 

18 system_app, 

19 template_app, 

20) 

21from meshadmin.cli.config import get_config, load_config, set_config 

22from meshadmin.common.utils import get_default_config_path 

23 

24app = typer.Typer() 

25logger = structlog.get_logger(__name__) 

26 

27app.add_typer(nebula_app, name="nebula", help="Manage the nebula service") 

28app.add_typer(service_app, name="service", help="Manage the meshadmin service") 

29app.add_typer(network_app, name="network", help="Manage networks") 

30app.add_typer(template_app, name="template", help="Manage templates") 

31app.add_typer(host_app, name="host", help="Manage hosts") 

32app.add_typer(context_app, name="context", help="Manage network contexts") 

33app.add_typer(system_app, name="system", help="System maintenance commands") 

34 

35 

36def version_callback(value: bool): 

37 if value: 

38 try: 

39 installed_version = version("meshadmin") 

40 typer.echo(f"meshadmin version {installed_version}") 

41 except PackageNotFoundError: 

42 typer.echo("meshadmin is not installed") 

43 raise typer.Exit() 

44 

45 

46@app.callback() 

47def main( 

48 ctx: typer.Context, 

49 version: bool = typer.Option( 

50 None, 

51 "--version", 

52 callback=version_callback, 

53 is_eager=True, 

54 help="Show the version and exit.", 

55 ), 

56 config_path: Annotated[ 

57 Path, 

58 typer.Option( 

59 "--config-path", 

60 "-c", 

61 envvar="MESHADMIN_CONFIG_PATH", 

62 help="Path to the configuration directory", 

63 ), 

64 ] = get_default_config_path(), 

65 context: Annotated[ 

66 str, 

67 typer.Option( 

68 "--context", 

69 envvar="MESH_CONTEXT", 

70 help="Name of the context to use", 

71 ), 

72 ] = None, 

73): 

74 config = load_config(config_path) 

75 set_config(config) 

76 

77 if context: 

78 if not config.contexts_file.exists(): 

79 print("No contexts found") 

80 raise typer.Exit(1) 

81 

82 with open(config.contexts_file) as f: 

83 contexts = yaml.safe_load(f) or {} 

84 

85 if context not in contexts: 

86 print(f"Context '{context}' not found") 

87 raise typer.Exit(1) 

88 

89 for ctx_name in contexts: 

90 contexts[ctx_name]["active"] = ctx_name == context 

91 

92 with open(config.contexts_file, "w") as f: 

93 yaml.dump(contexts, f) 

94 

95 

96@app.command() 

97def login(): 

98 config = get_config() 

99 res = httpx.post( 

100 config.keycloak_device_auth_url, 

101 data={ 

102 "client_id": config.keycloak_admin_client, 

103 }, 

104 ) 

105 res.raise_for_status() 

106 

107 device_auth_response = res.json() 

108 print(device_auth_response) 

109 print( 

110 "Please open the verification url", 

111 device_auth_response["verification_uri_complete"], 

112 ) 

113 

114 while True: 

115 res = httpx.post( 

116 config.keycloak_token_url, 

117 data={ 

118 "grant_type": "urn:ietf:params:oauth:grant-type:device_code", 

119 "client_id": config.keycloak_admin_client, 

120 "device_code": device_auth_response["device_code"], 

121 }, 

122 ) 

123 if res.status_code == 200: 

124 logger.info("Received auth token") 

125 config.authentication_path.write_bytes(res.content) 

126 config.authentication_path.chmod(0o600) 

127 

128 access_token = res.json()["access_token"] 

129 refresh_token = res.json()["refresh_token"] 

130 print( 

131 jwt.decode( 

132 refresh_token, 

133 algorithms=["RS256"], 

134 options={"verify_signature": False}, 

135 ) 

136 ) 

137 logger.info("access_token", access_token=access_token) 

138 print("successfully authenticated") 

139 break 

140 else: 

141 print(res.json()) 

142 sleep(device_auth_response["interval"]) 

143 

144 

145if __name__ == "__main__": 

146 app()