Coverage for src/meshadmin/cli/utils.py: 25%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-22 07:09 +0200

1import asyncio 

2import json 

3import os 

4import signal 

5from datetime import datetime, timedelta 

6from pathlib import Path 

7 

8import httpx 

9import structlog 

10import typer 

11import yaml 

12from jwcrypto.jwk import JWK 

13from jwcrypto.jwt import JWT 

14from jwt import decode 

15 

16from meshadmin.cli.config import get_config 

17from meshadmin.common.utils import create_expiration_date, get_nebula_path 

18 

# Module-level structlog logger shared by the helpers in this module; the async
# functions use its awaitable ``a*`` methods (``ainfo``, ``aerror``, ...).
logger = structlog.get_logger(__name__)

20 

21 

def get_access_token():
    """Return an access token for the meshadmin API, refreshing it if needed.

    When ``MESHADMIN_TEST_MODE`` is ``"true"`` a fixed dummy token is returned
    without touching the filesystem or the network.

    Otherwise the JSON stored at ``config.authentication_path`` is read. Its
    ``access_token`` is returned as-is while it still has at least ten seconds
    of validity left. Once it is closer to expiry, the stored
    ``refresh_token`` is exchanged at ``config.keycloak_token_url`` using the
    ``refresh_token`` grant. The full response body is then written back to
    the authentication file, with permissions restricted to the owner.

    Returns:
        The access token string, or ``None`` (after printing
        "authentication failed") when the authentication file does not exist.

    Raises:
        httpx.HTTPStatusError: if the token refresh request is rejected.
    """
    if os.getenv("MESHADMIN_TEST_MODE") == "true":
        return "test-token"

    config = get_config()
    auth_path = config.authentication_path
    if not auth_path.exists():
        # NOTE(review): callers receive None here rather than an exception;
        # confirm every caller handles a missing token.
        print("authentication failed")
        return None

    auth = json.loads(auth_path.read_text())
    access_token = auth["access_token"]

    # The token is only inspected locally to read its expiry claim, so the
    # signature (and expiry) are deliberately not verified here.
    claims = decode(
        access_token, options={"verify_signature": False, "verify_exp": False}
    )

    # Reuse the current token only if it stays valid for at least another
    # 10 seconds, so it cannot expire while a request is in flight.
    min_expiry = (datetime.now() + timedelta(seconds=10)).timestamp()
    if claims["exp"] >= min_expiry:
        return access_token

    res = httpx.post(
        config.keycloak_token_url,
        data={
            "grant_type": "refresh_token",
            "refresh_token": auth["refresh_token"],
            "client_id": config.keycloak_admin_client,
        },
    )
    res.raise_for_status()
    auth_path.write_bytes(res.content)
    # The file holds a refresh token (a long-lived credential); keep it
    # readable by the owner only.
    auth_path.chmod(0o600)
    return res.json()["access_token"]

54 

55 

def get_context_config():
    """Resolve the mesh context to operate on and describe it.

    The context name is taken from the ``MESH_CONTEXT`` environment variable.
    When that is unset or empty, the first entry flagged ``active`` in the
    contexts YAML file is used instead.

    Returns:
        A dict with ``name``, ``endpoint``, ``interface`` (both read from the
        context entry) and ``network_dir`` (``config.networks_dir / name``).

    Raises:
        typer.Exit: with code 1 if the contexts file is missing, or if no
            context could be resolved to an entry in that file.
    """
    cfg = get_config()
    contexts_path = cfg.contexts_file
    if not contexts_path.exists():
        print("No contexts found")
        raise typer.Exit(1)

    with open(contexts_path) as stream:
        # An empty YAML document loads as None; treat it as "no contexts".
        all_contexts = yaml.safe_load(stream) or {}

    selected = os.getenv("MESH_CONTEXT")
    if not selected:
        flagged = [
            ctx_name
            for ctx_name, settings in all_contexts.items()
            if settings.get("active")
        ]
        selected = next(iter(flagged), None)

    known = bool(selected) and selected in all_contexts
    if not known:
        print("No active context. Please select a context with 'meshadmin context use'")
        raise typer.Exit(1)

    entry = all_contexts[selected]
    target_dir = cfg.networks_dir / selected

    return dict(
        name=selected,
        endpoint=entry["endpoint"],
        interface=entry["interface"],
        network_dir=target_dir,
    )

85 

86 

# Polling interval (seconds) used when the server sends no usable
# X-Update-Interval header.
_DEFAULT_UPDATE_INTERVAL = 5


def _host_auth_token(private_auth_key):
    """Build and serialize the RS256 JWT that authenticates this host."""
    # The key thumbprint identifies the host both in the header and the claims.
    kid = private_auth_key.thumbprint()
    token = JWT(
        header={"alg": "RS256", "kid": kid},
        claims={
            "exp": create_expiration_date(10),
            "kid": kid,
        },
    )
    token.make_signed_token(private_auth_key)
    return token.serialize()


def _parse_update_interval(raw):
    """Convert an X-Update-Interval header value into a positive int of seconds."""
    if raw is None:
        return _DEFAULT_UPDATE_INTERVAL
    try:
        interval = int(raw)
    except ValueError:
        logger.warning("ignoring malformed X-Update-Interval header", value=raw)
        return _DEFAULT_UPDATE_INTERVAL
    if interval < 1:
        # A zero/negative value would make a caller that sleeps for this
        # interval between polls hammer the server without pause.
        logger.warning("ignoring non-positive X-Update-Interval header", value=raw)
        return _DEFAULT_UPDATE_INTERVAL
    return interval


async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key):
    """Download this host's nebula config from ``{endpoint}/api/v1/config``.

    Args:
        mesh_admin_endpoint: Base URL of the meshadmin server.
        private_auth_key: The host's private JWK, used to sign the bearer token.

    Returns:
        ``(config_text, update_interval)``. ``update_interval`` is taken from
        the ``X-Update-Interval`` response header. It falls back to 5 seconds
        when the header is absent, not an integer, or less than 1.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    token = _host_auth_token(private_auth_key)

    async with httpx.AsyncClient() as client:
        res = await client.get(
            f"{mesh_admin_endpoint}/api/v1/config",
            headers={"Authorization": f"Bearer {token}"},
        )
        res.raise_for_status()
        update_interval = _parse_update_interval(res.headers.get("X-Update-Interval"))
        return res.text, update_interval


async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key):
    """Ask the server to remove stale ephemeral hosts.

    Sends an authenticated POST to ``{endpoint}/api/v1/cleanup-ephemeral``.

    Returns:
        The decoded JSON response body.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    token = _host_auth_token(private_auth_key)

    async with httpx.AsyncClient() as client:
        res = await client.post(
            f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral",
            headers={"Authorization": f"Bearer {token}"},
        )
        res.raise_for_status()
        return res.json()

127 

128 

async def _report_auth_failure(action: str, error: "httpx.HTTPStatusError") -> None:
    """Log and print a 401 from meshadmin while trying to *action*."""
    message = (
        f"Could not {action} because of authentication error. "
        "Host may have been deleted."
    )
    await logger.aerror(
        message,
        error=str(error),
        response_text=error.response.text,
    )
    print(f"Error: {message}")
    print(f"Server message: {error.response.text}")


async def _stop_process(proc: asyncio.subprocess.Process) -> None:
    """Terminate *proc* if still running, killing it if it ignores SIGTERM for 5s."""
    if proc.returncode is not None:
        return
    await logger.ainfo("shutting down nebula process")
    try:
        proc.terminate()
    except ProcessLookupError:
        # The process exited between the returncode check and terminate().
        return
    try:
        await asyncio.wait_for(proc.wait(), timeout=5.0)
    except asyncio.TimeoutError:
        await logger.awarning("nebula process didn't terminate, killing it")
        proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        await proc.wait()


async def start_nebula(network_dir: Path, mesh_admin_endpoint: str):
    """Run nebula for *network_dir* and keep its config in sync with meshadmin.

    Starts the nebula binary with ``-config`` set to
    ``network_dir / config.config_path``. It then loops, sleeping
    ``update_interval`` seconds per cycle. The interval is 5 initially and
    afterwards whatever ``get_config_from_mesh`` reports. Each cycle:

    1. fetches the config; if it differs from the file on disk, rewrites the
       file (mode 0600) and sends nebula SIGHUP to reload it, restarting
       nebula if it is no longer running;
    2. asks the server to clean up stale ephemeral hosts;
    3. restarts nebula if the process has exited.

    A 401 from either endpoint stops the loop. Other errors are logged and the
    loop continues. Whenever the function exits, including on cancellation, a
    still-running nebula process is terminated.

    Raises:
        AssertionError: if the nebula config or the private key file is missing.
    """
    config = get_config()
    await logger.ainfo("starting nebula")
    conf_path = network_dir / config.config_path
    assert conf_path.exists(), f"Config at {conf_path} does not exist"

    # The key path is fixed for the whole run. The key file itself is re-read
    # every cycle, so a replaced key is picked up without a restart.
    private_auth_key_path = config.contexts_file.parent / config.private_key
    assert private_auth_key_path.exists(), (
        f"private_key at {private_auth_key_path} does not exist"
    )

    async def start_process():
        return await asyncio.create_subprocess_exec(
            get_nebula_path(),
            "-config",
            str(conf_path),
            cwd=network_dir,
        )

    proc = await start_process()

    # Default update interval in seconds; the server may change it.
    update_interval = 5

    try:
        while True:
            await asyncio.sleep(update_interval)
            try:
                private_auth_key = JWK.from_json(private_auth_key_path.read_text())

                # Check for config updates.
                try:
                    new_config, new_update_interval = await get_config_from_mesh(
                        mesh_admin_endpoint, private_auth_key
                    )

                    if update_interval != new_update_interval:
                        await logger.ainfo(
                            "update interval changed",
                            old_interval=update_interval,
                            new_interval=new_update_interval,
                        )
                        update_interval = new_update_interval

                    old_config = conf_path.read_text()
                    if new_config != old_config:
                        await logger.ainfo("config changed, reloading")
                        conf_path.write_text(new_config)
                        conf_path.chmod(0o600)

                        try:
                            # SIGHUP asks nebula to reload its config in place.
                            proc.send_signal(signal.SIGHUP)
                        except ProcessLookupError:
                            await logger.ainfo("process died, restarting")
                            proc = await start_process()
                    else:
                        await logger.ainfo("config not changed")
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 401:
                        await _report_auth_failure("get config", e)
                        break
                    await logger.aerror("error getting config", error=str(e))

                # Cleanup ephemeral hosts.
                try:
                    result = await cleanup_ephemeral_hosts(
                        mesh_admin_endpoint, private_auth_key
                    )
                    if result.get("removed_count", 0) > 0:
                        await logger.ainfo(
                            "removed stale ephemeral hosts",
                            count=result["removed_count"],
                        )
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 401:
                        await _report_auth_failure("clean up ephemeral hosts", e)
                        break
                    await logger.aerror("error during cleanup operation", error=str(e))

            except Exception:
                await logger.aexception("could not refresh token")

            if proc.returncode is not None:
                await logger.ainfo("process died, restarting")
                proc = await start_process()
    finally:
        # Runs on a normal break as well as on cancellation (CancelledError is
        # not an Exception subclass), so nebula is never left running orphaned.
        await _stop_process(proc)