Coverage for src/meshadmin/cli/commands/host.py: 94%

117 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-22 07:09 +0200

1import asyncio 

2import os 

3import platform 

4from pathlib import Path 

5from typing import Annotated 

6from uuid import uuid4 

7 

8import httpx 

9import structlog 

10import typer 

11from jwcrypto.jwk import JWK 

12 

13from meshadmin.cli.commands.nebula import download 

14from meshadmin.cli.config import get_config 

15from meshadmin.cli.utils import ( 

16 get_access_token, 

17 get_config_from_mesh, 

18 get_context_config, 

19) 

20from meshadmin.common import schemas 

21from meshadmin.common.utils import create_keys, get_default_config_path, get_public_ip 

22 

# Structured logger named after this module.
logger = structlog.get_logger(__name__)

# Typer application that collects the host commands defined in this module.
host_app = typer.Typer()

# Nested Typer application, mounted on `host_app` as the `config` sub-command group.
host_config_app = typer.Typer()
host_app.add_typer(
    host_config_app,
    name="config",
    help="Manage host configurations",
)

28 

29 

# Enroll this host at the mesh endpoint of the current context.
#
# Flow:
#   1. call `download()` (imported from meshadmin.cli.commands.nebula),
#   2. make sure the shared auth key (JWK) exists, creating it if missing,
#   3. resolve the public IP and preferred hostname, falling back to
#      `get_public_ip()` / `platform.node()` when they were not supplied,
#   4. reuse the network key pair in the context's network directory, or
#      create and store a new one,
#   5. POST a `ClientEnrollment` to `<endpoint>/api/v1/enroll`,
#   6. call `get_host_config()` once the request succeeded.
#
# Parameters:
#   enrollment_key:     enrollment key sent to the server (env MESH_ENROLLMENT_KEY).
#   preferred_hostname: hostname to request (env MESH_HOSTNAME); optional.
#   public_ip:          public IP to report (env MESH_PUBLIC_IP); optional.
#
# Raises whatever `res.raise_for_status()` raises when the server answers
# with an error status.
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_app.command(name="enroll")
def host_enroll(
    enrollment_key: Annotated[
        str,
        typer.Argument(envvar="MESH_ENROLLMENT_KEY"),
    ],
    preferred_hostname: Annotated[
        str,
        typer.Option(envvar="MESH_HOSTNAME"),
    ] = None,
    public_ip: Annotated[
        str,
        typer.Option(envvar="MESH_PUBLIC_IP"),
    ] = None,
):
    config = get_config()
    context = get_context_config()
    network_dir = context["network_dir"]

    download()
    logger.info("enrolling")

    network_dir.mkdir(parents=True, exist_ok=True)

    # Use shared auth key for all contexts
    private_auth_key_path = config.contexts_file.parent / config.private_key
    if not private_auth_key_path.exists():
        logger.info("creating auth key")
        create_auth_key(private_auth_key_path.parent)

    jwk = JWK.from_json(private_auth_key_path.read_text())
    public_auth_key = jwk.export_public()
    logger.info("public key for registration", public_key=public_auth_key)

    private_net_key_path = network_dir / config.private_net_key_file
    public_net_key_path = network_dir / config.public_net_key_file

    if public_ip is None:
        public_ip = get_public_ip()
        logger.info(
            "public ip not set, using ip reported by https://checkip.amazonaws.com/",
            public_ip=public_ip,
        )

    if preferred_hostname is None:
        preferred_hostname = platform.node()
        logger.info(
            "preferred hostname not set, using system hostname",
            hostname=preferred_hostname,
        )

    if private_net_key_path.exists() and public_net_key_path.exists():
        # Both halves are present: reuse them instead of generating new keys.
        public_nebula_key = public_net_key_path.read_text()
        logger.info(
            "private and public nebula key already exists",
            public_key=public_nebula_key,
        )
    else:
        logger.info("creating private and public nebula key")
        private, public_nebula_key = create_keys()
        # Restrict the private key file to the owner *before* the secret is
        # written. touch() does not change the mode of an existing file, so
        # chmod() is applied as well. (Previously the chmod targeted the auth
        # key path by mistake, leaving this file with default permissions.)
        private_net_key_path.touch(mode=0o600, exist_ok=True)
        private_net_key_path.chmod(0o600)
        private_net_key_path.write_text(private)
        public_net_key_path.write_text(public_nebula_key)
        public_net_key_path.chmod(0o600)
        logger.info(
            "private and public nebula key created", public_nebula_key=public_nebula_key
        )

    enrollment = schemas.ClientEnrollment(
        enrollment_key=enrollment_key,
        public_net_key=public_nebula_key,
        public_auth_key=public_auth_key,
        preferred_hostname=preferred_hostname,
        public_ip=public_ip,
        interface=context["interface"],
    )

    res = httpx.post(
        f"{context['endpoint']}/api/v1/enroll",
        content=enrollment.model_dump_json(),
        headers={"Content-Type": "application/json"},
    )
    res.raise_for_status()

    # Only reached when the enroll request succeeded.
    get_host_config()
    logger.info("enrollment response", enrollment=res.content)
    logger.info("enrollment finished")

117 

118 

# Generate a new 2048-bit RSA JWK whose key id is a random UUID and store its
# private form at `mesh_config_path / config.private_key`, with the file
# restricted to the owner (mode 0o600). An existing file is overwritten.
#
# Parameters:
#   mesh_config_path: directory that receives the key file
#                     (env MESH_CONFIG_PATH; defaults to get_default_config_path()).
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_app.command()
def create_auth_key(
    mesh_config_path: Annotated[
        Path,
        typer.Argument(envvar="MESH_CONFIG_PATH"),
    ] = get_default_config_path(),
):
    config = get_config()
    jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048)
    auth_key = mesh_config_path / config.private_key
    # Restrict permissions before the secret hits the disk so the private key
    # is never readable by other users, not even briefly. touch() leaves the
    # mode of a pre-existing file alone, hence the explicit chmod().
    auth_key.touch(mode=0o600, exist_ok=True)
    auth_key.chmod(0o600)
    auth_key.write_text(jwk.export_private())

131 

132 

# Print the public part of the auth key stored under `mesh_config_path`.
#
# Parameters:
#   mesh_config_path: directory containing the private key file named by
#                     `config.private_key` (env MESH_CONFIG_PATH; defaults to
#                     get_default_config_path()).
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_app.command()
def show_auth_public_key(
    mesh_config_path: Annotated[
        Path,
        typer.Argument(envvar="MESH_CONFIG_PATH"),
    ] = get_default_config_path(),
):
    key_file = mesh_config_path / get_config().private_key
    stored_key = JWK.from_json(key_file.read_text())
    print(stored_key.export_public())

143 

144 

# Fetch the host configuration from the mesh endpoint of the current context
# and write it to `<network_dir>/<config.config_path>`.
#
# The request is authenticated with the shared private auth key stored next
# to the contexts file. `get_config_from_mesh` is awaited to completion on an
# event loop; the second element of its result is ignored.
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_config_app.command()
def get_host_config():
    config = get_config()
    context = get_context_config()
    private_auth_key = JWK.from_json(
        (config.contexts_file.parent / config.private_key).read_text()
    )

    # asyncio.get_event_loop() no longer reliably creates a loop on demand
    # (deprecated, and a RuntimeError on newer Python versions when no loop is
    # set), so fall back to creating and registering one explicitly.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    result, _ = loop.run_until_complete(
        get_config_from_mesh(context["endpoint"], private_auth_key)
    )
    (context["network_dir"] / config.config_path).write_text(result)

160 

161 

# Delete the host called `name` via `DELETE <endpoint>/api/v1/hosts/<name>`
# and print the JSON body of the response.
#
# Parameters:
#   name: name of the host to delete.
#
# Exits with status 1 (SystemExit) when no access token can be obtained.
# Raises whatever `res.raise_for_status()` raises on an error response.
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_app.command(name="delete")
def delete_host(name: str):
    from urllib.parse import quote

    try:
        access_token = get_access_token()
    except Exception:
        logger.exception("failed to get access token")
        # Same exception the `exit()` helper raises, without depending on the
        # `site`-provided builtin that is meant for interactive sessions.
        raise SystemExit(1)

    context = get_context_config()
    # Percent-encode the name so characters such as "/", "?" or "#" cannot
    # change which resource the request addresses.
    host_segment = quote(name, safe="")
    res = httpx.delete(
        f"{context['endpoint']}/api/v1/hosts/{host_segment}",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    res.raise_for_status()
    print(res.json())

177 

178 

# Print an overview of the configuration: the global paths, the current
# context, and whether the context's files and service file exist on disk.
# When `get_context_config()` raises `typer.Exit`, only the global paths are
# shown, followed by a "No active context found" notice.
#
# (Comment, not a docstring: Typer would surface a docstring as --help text.)
@host_config_app.command(name="info")
def show_config_info():
    def presence(path):
        # Marker appended to every path that is reported.
        if path.exists():
            return "(exists)"
        return "(not found)"

    config = get_config()
    print("\nConfiguration Paths:")
    print(f"Contexts file: {config.contexts_file}")
    print(f"Networks directory: {config.networks_dir}")

    try:
        context = get_context_config()

        print("\nCurrent Context:")
        for label, key in (
            ("Name", "name"),
            ("Endpoint", "endpoint"),
            ("Interface", "interface"),
            ("Network directory", "network_dir"),
        ):
            print(f"{label}: {context[key]}")

        network_dir = context["network_dir"]
        context_files = (
            ("Config file", network_dir / config.config_path),
            ("Environment file", network_dir / "env"),
            ("Private key", network_dir / config.private_net_key_file),
        )

        print("\nContext Files:")
        for label, path in context_files:
            print(f"{label}: {path} {presence(path)}")

        # The service definition lives in a platform-specific location.
        if platform.system() != "Darwin":
            service_file = Path(
                f"/usr/lib/systemd/system/meshadmin-{context['name']}.service"
            )
        else:
            service_file = Path(
                os.path.expanduser(
                    f"~/Library/LaunchAgents/com.meshadmin.{context['name']}.plist"
                )
            )
        print(f"Service file: {service_file} {presence(service_file)}")
    except typer.Exit:
        print("\nNo active context found")