Coverage for src/meshadmin/cli/main.py: 64%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-22 07:26 +0200

1from importlib.metadata import PackageNotFoundError, version 

2from pathlib import Path 

3from time import sleep 

4from typing import Annotated 

5 

6import httpx 

7import jwt 

8import structlog 

9import typer 

10import yaml 

11 

12from meshadmin.cli.commands import ( 

13 context_app, 

14 host_app, 

15 nebula_app, 

16 network_app, 

17 service_app, 

18 template_app, 

19) 

20from meshadmin.cli.config import get_config, load_config, set_config 

21from meshadmin.common.utils import get_default_config_path 

22 

# Root Typer application and the module-level structured logger.
app = typer.Typer()
logger = structlog.get_logger(__name__)

# Mount every command group on the root application. The order of this table
# is the order in which the groups are registered.
for _sub_app, _group_name, _group_help in (
    (nebula_app, "nebula", "Manage the nebula service"),
    (service_app, "service", "Manage the meshadmin service"),
    (network_app, "network", "Manage networks"),
    (template_app, "template", "Manage templates"),
    (host_app, "host", "Manage hosts"),
    (context_app, "context", "Manage network contexts"),
):
    app.add_typer(_sub_app, name=_group_name, help=_group_help)

32 

33 

def version_callback(value: bool):
    """Print the installed meshadmin version and stop the CLI.

    Does nothing when ``value`` is falsy. Otherwise echoes either the
    installed version or a note that the package is not installed, then
    raises ``typer.Exit`` so no further processing happens.
    """
    if not value:
        return

    try:
        message = f"meshadmin version {version('meshadmin')}"
    except PackageNotFoundError:
        # No installed distribution metadata is available for "meshadmin".
        message = "meshadmin is not installed"

    typer.echo(message)
    raise typer.Exit()

42 

43 

# Root callback: runs before any sub-command.
#
# Loads the configuration from `config_path` and publishes it via
# `set_config`. When a context name is supplied (option or MESH_CONTEXT), the
# contexts file is rewritten so that exactly that context is flagged active;
# an absent contexts file or an unknown context name exits with status 1.
#
# NOTE: documented with comments rather than a docstring on purpose; Typer
# would surface a docstring here as the application's help text.
@app.callback()
def main(
    ctx: typer.Context,
    version: bool = typer.Option(
        None,
        "--version",
        callback=version_callback,
        is_eager=True,
        help="Show the version and exit.",
    ),
    config_path: Annotated[
        Path,
        typer.Option(
            "--config-path",
            "-c",
            envvar="MESHADMIN_CONFIG_PATH",
            help="Path to the configuration directory",
        ),
    ] = get_default_config_path(),
    context: Annotated[
        str,
        typer.Option(
            "--context",
            envvar="MESH_CONTEXT",
            help="Name of the context to use",
        ),
    ] = None,
):
    config = load_config(config_path)
    set_config(config)

    # Nothing more to do unless a context switch was requested.
    if not context:
        return

    contexts_file = config.contexts_file
    if not contexts_file.exists():
        print("No contexts found")
        raise typer.Exit(1)

    with open(contexts_file) as handle:
        # An empty YAML document loads as None; treat it as "no contexts".
        known_contexts = yaml.safe_load(handle) or {}

    if context not in known_contexts:
        print(f"Context '{context}' not found")
        raise typer.Exit(1)

    # Flag the requested context as active and every other one as inactive.
    for name, entry in known_contexts.items():
        entry["active"] = name == context

    with open(contexts_file, "w") as handle:
        yaml.dump(known_contexts, handle)

92 

93 

@app.command()
def login():
    """Log in via the Keycloak device authorization flow."""
    # Kept short on purpose: Typer shows this docstring as the command help.
    #
    # Flow:
    #   1. Request a device code from `config.keycloak_device_auth_url`.
    #   2. Ask the user to open the verification URL.
    #   3. Poll `config.keycloak_token_url` until a token is issued or the
    #      server reports a terminal error.
    #   4. Store the raw token response at `config.authentication_path`
    #      with owner-only permissions.
    #
    # Exits with status 1 when the token endpoint answers with anything other
    # than "authorization_pending" or "slow_down".
    import os  # local import: only needed for the restrictive file creation

    config = get_config()
    res = httpx.post(
        config.keycloak_device_auth_url,
        data={
            "client_id": config.keycloak_admin_client,
        },
    )
    res.raise_for_status()

    device_auth_response = res.json()
    # The raw response is deliberately not printed: it contains the
    # device_code, which is the secret used to redeem the token.
    print(
        "Please open the verification url",
        device_auth_response["verification_uri_complete"],
    )

    # `interval` is optional in the device authorization response; RFC 8628
    # specifies a default of 5 seconds when it is absent.
    poll_interval = device_auth_response.get("interval", 5)

    while True:
        res = httpx.post(
            config.keycloak_token_url,
            data={
                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                "client_id": config.keycloak_admin_client,
                "device_code": device_auth_response["device_code"],
            },
        )

        if res.status_code == 200:
            logger.info("Received auth token")

            # Create the file with 0o600 from the start so the tokens are
            # never readable by other users, not even briefly.
            fd = os.open(
                config.authentication_path,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, "wb") as token_file:
                token_file.write(res.content)
            # The mode passed to os.open only applies to newly created files;
            # tighten a pre-existing file as well.
            config.authentication_path.chmod(0o600)

            refresh_token = res.json()["refresh_token"]
            # Show the (unverified) refresh token claims to the user.
            print(
                jwt.decode(
                    refresh_token,
                    algorithms=["RS256"],
                    options={"verify_signature": False},
                )
            )
            # The access token itself is intentionally not logged.
            print("successfully authenticated")
            break

        # Error responses are expected to be JSON, but do not crash if a
        # proxy or gateway answers with something else.
        try:
            payload = res.json()
        except ValueError:
            payload = res.text
        error = payload.get("error") if isinstance(payload, dict) else None

        print(payload)
        if error == "slow_down":
            # RFC 8628 section 3.5: increase the polling interval by 5 seconds.
            poll_interval += 5
        elif error != "authorization_pending":
            # expired_token, access_denied, or any other error is terminal;
            # continuing to poll would loop forever.
            print("authentication failed")
            raise typer.Exit(1)

        sleep(poll_interval)

141 

142 

# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()