Coverage for src/meshadmin/cli/utils.py: 25%
130 statements
coverage.py v7.6.12, created at 2025-04-22 07:09 +0200
1import asyncio
2import json
3import os
4import signal
5from datetime import datetime, timedelta
6from pathlib import Path
8import httpx
9import structlog
10import typer
11import yaml
12from jwcrypto.jwk import JWK
13from jwcrypto.jwt import JWT
14from jwt import decode
16from meshadmin.cli.config import get_config
17from meshadmin.common.utils import create_expiration_date, get_nebula_path
19logger = structlog.get_logger(__name__)
def get_access_token():
    """Return an access token for authenticating CLI requests.

    When the environment variable ``MESHADMIN_TEST_MODE`` is ``"true"`` a fixed
    placeholder token is returned without touching any files.

    Otherwise the token response cached at ``config.authentication_path`` is
    used. Its access token is returned as-is while it stays valid for at least
    another 10 seconds. If it expires sooner (or already has), a
    ``refresh_token`` grant is posted to ``config.keycloak_token_url``. The new
    token response then replaces the cache file, which is restricted to the
    owner (0o600), and its access token is returned.

    Returns:
        The access token string, or ``None`` (after printing
        "authentication failed") when no cached credentials exist.

    Raises:
        httpx.HTTPStatusError: if the token refresh request is rejected.
        KeyError: if the cached or refreshed token data lacks an expected key.
    """
    if os.getenv("MESHADMIN_TEST_MODE") == "true":
        return "test-token"

    config = get_config()
    auth_path = config.authentication_path
    if not auth_path.exists():
        print("authentication failed")
        return None

    auth = json.loads(auth_path.read_text())
    access_token = auth["access_token"]

    # Only the expiry claim of our own cached token is inspected here to decide
    # whether to refresh; no trust decision is made from it, so the signature
    # and expiry are not verified locally.
    claims = decode(
        access_token, options={"verify_signature": False, "verify_exp": False}
    )

    # Refresh a little early so the token does not expire mid-request.
    expiry_leeway = timedelta(seconds=10)
    if claims["exp"] >= (datetime.now() + expiry_leeway).timestamp():
        return access_token

    res = httpx.post(
        config.keycloak_token_url,
        data={
            "grant_type": "refresh_token",
            "refresh_token": auth["refresh_token"],
            "client_id": config.keycloak_admin_client,
        },
    )
    res.raise_for_status()
    # Parse before writing: a malformed body must not overwrite the cached
    # refresh token that the next attempt would need.
    tokens = res.json()
    auth_path.write_bytes(res.content)
    # The file holds bearer and refresh tokens; keep it owner-only.
    auth_path.chmod(0o600)
    return tokens["access_token"]
def get_context_config():
    """Resolve the mesh context to operate on and return its settings.

    The context name is taken from the ``MESH_CONTEXT`` environment variable
    when it is set and non-empty. Otherwise it is the first entry in the
    contexts YAML file whose ``active`` field is truthy.

    Returns:
        dict with keys ``name``, ``endpoint``, ``interface`` (the latter two
        read from the context's entry) and ``network_dir``
        (``config.networks_dir / <name>``).

    Raises:
        typer.Exit: with code 1 if the contexts file does not exist, or if no
            context is selected or the selected name is not in the file.
    """
    cfg = get_config()
    contexts_path = cfg.contexts_file
    if not contexts_path.exists():
        print("No contexts found")
        raise typer.Exit(1)

    with open(contexts_path) as handle:
        all_contexts = yaml.safe_load(handle) or {}

    # An empty env var is treated the same as an unset one.
    selected = os.getenv("MESH_CONTEXT") or None
    if selected is None:
        flagged = [
            ctx_name
            for ctx_name, ctx_entry in all_contexts.items()
            if ctx_entry.get("active")
        ]
        if flagged:
            selected = flagged[0]

    if not selected or selected not in all_contexts:
        print("No active context. Please select a context with 'meshadmin context use'")
        raise typer.Exit(1)

    entry = all_contexts[selected]
    networks_root = cfg.networks_dir
    return dict(
        name=selected,
        endpoint=entry["endpoint"],
        interface=entry["interface"],
        network_dir=networks_root / selected,
    )
def _signed_auth_token(private_auth_key):
    """Return a serialized RS256 JWT signed with ``private_auth_key``.

    The key's thumbprint is placed both in the ``kid`` header and in a ``kid``
    claim. The ``exp`` claim comes from ``create_expiration_date(10)``.
    """
    thumbprint = private_auth_key.thumbprint()
    token = JWT(
        header={"alg": "RS256", "kid": thumbprint},
        claims={
            "exp": create_expiration_date(10),
            "kid": thumbprint,
        },
    )
    token.make_signed_token(private_auth_key)
    return token.serialize()


def _parse_update_interval(raw, default=5, minimum=1):
    """Convert an ``X-Update-Interval`` header value to a polling interval in seconds.

    A missing or non-integer value yields ``default``. Values below ``minimum``
    are clamped up, so a server-sent ``0`` or negative value cannot turn the
    caller's polling loop into a busy loop.
    """
    if raw is None:
        return default
    try:
        interval = int(raw)
    except ValueError:
        return default
    return max(interval, minimum)


async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key):
    """Fetch this host's config from ``{mesh_admin_endpoint}/api/v1/config``.

    The request is authenticated with a bearer JWT signed by
    ``private_auth_key``.

    Returns:
        ``(config_text, update_interval)``. ``update_interval`` is taken from the
        ``X-Update-Interval`` response header: 5 if the header is absent or not
        an integer, and never less than 1.

    Raises:
        httpx.HTTPStatusError: on a non-success HTTP status.
    """
    token = _signed_auth_token(private_auth_key)
    async with httpx.AsyncClient() as client:
        res = await client.get(
            f"{mesh_admin_endpoint}/api/v1/config",
            headers={"Authorization": f"Bearer {token}"},
        )
        res.raise_for_status()
    update_interval = _parse_update_interval(res.headers.get("X-Update-Interval"))
    return res.text, update_interval


async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key):
    """POST to ``{mesh_admin_endpoint}/api/v1/cleanup-ephemeral`` and return the decoded JSON body.

    The request is authenticated with a bearer JWT signed by
    ``private_auth_key``.

    Raises:
        httpx.HTTPStatusError: on a non-success HTTP status.
    """
    token = _signed_auth_token(private_auth_key)
    async with httpx.AsyncClient() as client:
        res = await client.post(
            f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral",
            headers={"Authorization": f"Bearer {token}"},
        )
        res.raise_for_status()
    return res.json()
async def _report_auth_failure(action: str, error: "httpx.HTTPStatusError") -> None:
    """Log and print an HTTP 401 received while performing ``action``.

    ``action`` completes the sentence "Could not <action> because of
    authentication error.", e.g. ``"get config"``.
    """
    message = (
        f"Could not {action} because of authentication error. "
        "Host may have been deleted."
    )
    await logger.aerror(
        message,
        error=str(error),
        response_text=error.response.text,
    )
    print(f"Error: {message}")
    print(f"Server message: {error.response.text}")


async def _stop_process(proc: "asyncio.subprocess.Process", timeout: float = 5.0) -> None:
    """Terminate ``proc`` if it is still running, killing it after ``timeout`` seconds."""
    if proc.returncode is not None:
        return
    await logger.ainfo("shutting down nebula process")
    try:
        proc.terminate()
    except ProcessLookupError:
        # Exited between the returncode check and the signal.
        return
    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        await logger.awarning("nebula process didn't terminate, killing it")
        try:
            proc.kill()
        except ProcessLookupError:
            return
        # Wait for the killed process to actually exit before returning.
        await proc.wait()


async def start_nebula(network_dir: Path, mesh_admin_endpoint: str):
    """Run nebula for ``network_dir`` and keep its config in sync with the mesh admin.

    Starts ``<nebula> -config <conf_path>`` with ``network_dir`` as working
    directory. It then loops, sleeping ``update_interval`` seconds per cycle
    (initially 5, then whatever ``get_config_from_mesh`` reports). Each cycle:

    * reloads the private auth key from disk;
    * fetches the config and, if it differs from the local file, rewrites the
      file (mode 0600) and sends SIGHUP to nebula, restarting nebula if it is
      gone;
    * asks the server to clean up stale ephemeral hosts;
    * restarts nebula if it has exited.

    The loop stops on an HTTP 401 from either request. Other errors are logged
    and the loop continues. However the function exits (401, cancellation, or
    an unexpected exception), a still-running nebula process is terminated.

    Raises:
        AssertionError: if the nebula config or the private key file is missing
            at startup.
    """
    config = get_config()
    await logger.ainfo("starting nebula")
    conf_path = network_dir / config.config_path
    # NOTE(review): asserts are stripped under `python -O`; kept for
    # compatibility with callers that may expect AssertionError.
    assert conf_path.exists(), f"Config at {conf_path} does not exist"

    private_auth_key_path = config.contexts_file.parent / config.private_key
    assert private_auth_key_path.exists(), (
        f"private_key at {private_auth_key_path} does not exist"
    )

    async def start_process():
        return await asyncio.create_subprocess_exec(
            get_nebula_path(),
            "-config",
            str(conf_path),
            cwd=network_dir,
        )

    proc = await start_process()

    # Seconds between sync cycles; the server can change it per response.
    update_interval = 5

    try:
        while True:
            await asyncio.sleep(update_interval)
            try:
                # Re-read every cycle so a key replaced on disk is picked up.
                private_auth_key = JWK.from_json(private_auth_key_path.read_text())

                # Check for config updates
                try:
                    new_config, new_update_interval = await get_config_from_mesh(
                        mesh_admin_endpoint, private_auth_key
                    )

                    if update_interval != new_update_interval:
                        await logger.ainfo(
                            "update interval changed",
                            old_interval=update_interval,
                            new_interval=new_update_interval,
                        )
                        update_interval = new_update_interval

                    if new_config != conf_path.read_text():
                        await logger.ainfo("config changed, reloading")
                        conf_path.write_text(new_config)
                        conf_path.chmod(0o600)
                        try:
                            # Ask nebula to reload the rewritten config file.
                            proc.send_signal(signal.SIGHUP)
                        except ProcessLookupError:
                            await logger.ainfo("process died, restarting")
                            proc = await start_process()
                    else:
                        await logger.ainfo("config not changed")
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 401:
                        await _report_auth_failure("get config", e)
                        break
                    await logger.aerror("error getting config", error=str(e))

                # Cleanup ephemeral hosts
                try:
                    result = await cleanup_ephemeral_hosts(
                        mesh_admin_endpoint, private_auth_key
                    )
                    if result.get("removed_count", 0) > 0:
                        await logger.ainfo(
                            "removed stale ephemeral hosts",
                            count=result["removed_count"],
                        )
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 401:
                        await _report_auth_failure("clean up ephemeral hosts", e)
                        break
                    await logger.aerror("error during cleanup operation", error=str(e))
            except Exception:
                # Network errors, unreadable key, etc.: log and retry next cycle.
                await logger.aexception("error in nebula update loop")

            # Checked every cycle, so a crashed nebula is restarted promptly
            # rather than only when a reload or an error happens.
            if proc.returncode is not None:
                await logger.ainfo("process died, restarting")
                proc = await start_process()
    finally:
        # Runs on break, cancellation and unexpected errors alike, so the
        # nebula child is not left running behind us.
        await _stop_process(proc)