Coverage for src/meshadmin/cli/main.py: 20%
378 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 14:54 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 14:54 +0200
1import asyncio
2import json
3import os
4import platform
5import shutil
6import signal
7import subprocess
8from datetime import datetime, timedelta
9from importlib import resources
10from importlib.metadata import PackageNotFoundError, version
11from pathlib import Path
12from time import sleep
13from typing import Annotated
14from uuid import uuid4
16import httpx
17import jwt
18import structlog
19import typer
20from jwcrypto.jwk import JWK
21from jwcrypto.jwt import JWT
22from jwt import decode
23from rich import print, print_json
25from meshadmin.cli.config import load_config
26from meshadmin.cli.schemas import ClientEnrollment, NetworkCreate, TemplateCreate
27from meshadmin.common.utils import (
28 create_expiration_date,
29 create_keys,
30 download_nebula_binaries,
31 get_nebula_path,
32 get_public_ip,
33)
35app = typer.Typer()
36logger = structlog.get_logger(__name__)
37config = load_config()
39def version_callback(value: bool):
40 if value:
41 try:
42 installed_version = version("meshadmin")
43 typer.echo(f"meshadmin version {installed_version}")
44 except PackageNotFoundError:
45 typer.echo("meshadmin is not installed")
46 raise typer.Exit()
48@app.callback()
49def main(
50 ctx: typer.Context,
51 version: bool = typer.Option(
52 None,
53 "--version",
54 callback=version_callback,
55 is_eager=True,
56 help="Show the version and exit.",
57 ),
58):
59 pass
61@app.command()
62def download():
63 logger.info("Downloading nebula binaries")
64 try:
65 install_path = download_nebula_binaries(config.api_endpoint)
66 logger.info("Nebula binaries downloaded successfully", path=str(install_path))
67 except Exception as e:
68 logger.error("Failed to download nebula binaries", error=str(e))
69 raise typer.Exit(code=1)
72@app.command()
73def enroll(
74 enrollment_key: Annotated[
75 str,
76 typer.Argument(envvar="MESH_ENROLLMENT_KEY"),
77 ],
78 preferred_hostname: Annotated[
79 str,
80 typer.Option(envvar="MESH_HOSTNAME"),
81 ] = None,
82 public_ip: Annotated[
83 str,
84 typer.Option(envvar="MESH_PUBLIC_IP"),
85 ] = None,
86 mesh_config_path: Annotated[
87 Path,
88 typer.Option(envvar="MESH_CONFIG_PATH"),
89 ] = ".",
90 mesh_admin_endpoint: Annotated[
91 str,
92 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
93 ] = config.server_url,
94):
95 logger.info("enrolling")
97 if not mesh_config_path.exists():
98 mesh_config_path.mkdir(exist_ok=True, parents=True)
100 private_auth_key_path = mesh_config_path / config.private_key
101 if private_auth_key_path.exists():
102 logger.info("auth key already exists")
103 else:
104 logger.info("creating auth key")
105 create_auth_key(mesh_config_path)
107 jwk = JWK.from_json(private_auth_key_path.read_text())
108 public_auth_key = jwk.export_public()
109 logger.info("public key for registration", public_key=public_auth_key)
111 private_net_key_path = mesh_config_path / config.private_net_key_file
112 public_net_key_path = mesh_config_path / config.public_net_key_file
114 if public_ip is None:
115 public_ip = get_public_ip()
116 logger.info(
117 "public ip not set, using ip reported by https://checkip.amazonaws.com/",
118 public_ip=public_ip,
119 )
121 if preferred_hostname is None:
122 preferred_hostname = platform.node()
123 logger.info(
124 "preferred hostname not set, using system hostname",
125 hostname=preferred_hostname,
126 )
128 if private_net_key_path.exists() and public_net_key_path.exists():
129 public_nebula_key = public_net_key_path.read_text()
130 logger.info(
131 "private and public nebula key already exists",
132 public_key=public_nebula_key,
133 )
134 else:
135 logger.info("creating private and public nebula key")
136 private, public_nebula_key = create_keys()
137 private_net_key_path.write_text(private)
138 private_auth_key_path.chmod(0o600)
139 public_net_key_path.write_text(public_nebula_key)
140 public_net_key_path.chmod(0o600)
141 logger.info(
142 "private and public nebula key created", public_nebula_key=public_nebula_key
143 )
145 enrollment = ClientEnrollment(
146 enrollment_key=enrollment_key,
147 public_net_key=public_nebula_key,
148 public_auth_key=public_auth_key,
149 preferred_hostname=preferred_hostname,
150 public_ip=public_ip,
151 )
153 res = httpx.post(
154 f"{mesh_admin_endpoint}/api/v1/enroll",
155 content=enrollment.model_dump_json(),
156 headers={"Content-Type": "application/json"},
157 )
158 res.raise_for_status()
160 get_config(mesh_config_path, mesh_admin_endpoint)
161 logger.info("enrollment response", enrollment=res.content)
162 logger.info("enrollment finished")
165@app.command()
166def install_service(
167 mesh_config_path: Annotated[
168 Path,
169 typer.Option(envvar="MESH_CONFIG_PATH"),
170 ] = None,
171 mesh_admin_endpoint: Annotated[
172 str,
173 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
174 ] = config.server_url,
175):
176 os_name = platform.system()
177 meshadmin_path = shutil.which("meshadmin")
179 if not meshadmin_path:
180 logger.error("meshadmin executable not found in PATH")
181 exit(1)
183 if mesh_config_path is None:
184 if os_name == "Darwin":
185 mesh_config_path = Path(
186 os.path.expanduser("~/Library/Application Support/meshadmin")
187 )
188 else:
189 mesh_config_path = Path("/etc/meshadmin")
191 mesh_config_path = Path(os.path.expanduser(str(mesh_config_path)))
192 if not mesh_config_path.exists():
193 mesh_config_path.mkdir(exist_ok=True, parents=True)
195 (mesh_config_path / "env").write_text(
196 f"""MESH_ADMIN_ENDPOINT={mesh_admin_endpoint}
197 MESH_CONFIG_PATH={mesh_config_path.absolute()}
198 """
199 )
200 if os_name == "Darwin":
201 plist_content = f"""<?xml version="1.0" encoding="UTF-8"?>
202<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
203<plist version="1.0">
204<dict>
205 <key>Label</key>
206 <string>com.meshadmin.service</string>
207 <key>ProgramArguments</key>
208 <array>
209 <string>{meshadmin_path}</string>
210 <string>start</string>
211 </array>
212 <key>EnvironmentVariables</key>
213 <dict>
214 <key>MESH_ADMIN_ENDPOINT</key>
215 <string>{mesh_admin_endpoint}</string>
216 <key>MESH_CONFIG_PATH</key>
217 <string>{mesh_config_path.absolute()}</string>
218 </dict>
219 <key>RunAtLoad</key>
220 <true/>
221 <key>KeepAlive</key>
222 <true/>
223 <key>StandardErrorPath</key>
224 <string>{mesh_config_path.absolute()}/error.log</string>
225 <key>StandardOutPath</key>
226 <string>{mesh_config_path.absolute()}/output.log</string>
227</dict>
228</plist>
229"""
230 launch_agents_dir = Path(os.path.expanduser("~/Library/LaunchAgents"))
231 if not launch_agents_dir.exists():
232 launch_agents_dir.mkdir(exist_ok=True, parents=True)
233 plist_path = launch_agents_dir / "com.meshadmin.service.plist"
234 plist_path.write_text(plist_content)
235 subprocess.run(["launchctl", "load", str(plist_path)])
236 logger.info(
237 "meshadmin service installed and started",
238 plist_path=str(plist_path),
239 config_path=str(mesh_config_path),
240 )
241 print(f"meshadmin service installed at {plist_path}")
242 print(f"Configuration directory: {mesh_config_path}")
243 print("Service has been loaded and will start automatically on login")
245 else:
246 systemd_unit = f"""[Unit]
247Description=Meshadmin
248Wants=basic.target network-online.target nss-lookup.target time-sync.target
249After=basic.target network.target network-online.target
250Before=sshd.service
252[Service]
253#Type=notify
254#NotifyAccess=main
255SyslogIdentifier=meshadmin
256EnvironmentFile={mesh_config_path.absolute()}/env
257ExecReload=/bin/kill -HUP $MAINPID
258ExecStart={meshadmin_path} start
259Restart=always
261[Install]
262WantedBy=multi-user.target
263"""
264 config.systemd_service_path.write_text(systemd_unit)
265 subprocess.run(["systemctl", "daemon-reload"])
266 subprocess.run(["systemctl", "enable", "meshadmin"])
267 print(f"meshadmin service installed at {config.systemd_service_path}")
268 print(f"Configuration directory: {mesh_config_path}")
269 print("Service has been enabled and will start automatically on boot")
272@app.command()
273def uninstall_service():
274 os_name = platform.system()
275 if os_name == "Darwin":
276 plist_path = Path(
277 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
278 )
279 if plist_path.exists():
280 subprocess.run(["launchctl", "unload", str(plist_path)])
281 plist_path.unlink()
282 logger.info("meshadmin service uninstalled", plist_path=str(plist_path))
283 print(f"meshadmin service uninstalled from {plist_path}")
284 else:
285 logger.warning("meshadmin service not found", plist_path=str(plist_path))
286 print("meshadmin service not found, nothing to uninstall")
287 else:
288 if config.systemd_service_path.exists():
289 subprocess.run(["systemctl", "stop", "meshadmin"])
290 subprocess.run(["systemctl", "disable", "meshadmin"])
291 subprocess.run(["systemctl", "daemon-reload"])
292 config.systemd_service_path.unlink()
293 env_path = Path("/etc/meshadmin/env")
294 if env_path.exists():
295 env_path.unlink()
296 logger.info("meshadmin service uninstalled")
297 print("meshadmin service uninstalled")
298 else:
299 logger.warning("meshadmin service not found")
300 print("meshadmin service not found, nothing to uninstall")
303@app.command()
304def start_service():
305 os_name = platform.system()
306 if os_name == "Darwin":
307 plist_path = Path(
308 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
309 )
310 if plist_path.exists():
311 subprocess.run(["launchctl", "load", str(plist_path)])
312 logger.info("meshadmin service started")
313 print("meshadmin service started")
314 else:
315 logger.error("meshadmin service not installed", plist_path=str(plist_path))
316 print(
317 "meshadmin service not installed. Run 'meshadmin install_service' first."
318 )
319 else:
320 subprocess.run(["systemctl", "start", "meshadmin"])
321 print("meshadmin service started")
324@app.command()
325def stop_service():
326 os_name = platform.system()
327 if os_name == "Darwin":
328 plist_path = Path(
329 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
330 )
331 if plist_path.exists():
332 subprocess.run(["launchctl", "unload", str(plist_path)])
333 logger.info("meshadmin service stopped")
334 print("meshadmin service stopped")
335 else:
336 logger.error("meshadmin service not installed", plist_path=str(plist_path))
337 print("meshadmin service not installed. Nothing to stop.")
338 else:
339 subprocess.run(["systemctl", "stop", "meshadmin"])
340 print("meshadmin service stopped")
343@app.command()
344def create_auth_key(
345 mesh_config_path: Annotated[
346 Path,
347 typer.Argument(envvar="MESH_CONFIG_PATH"),
348 ] = ".",
349):
350 jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048)
351 auth_key = mesh_config_path / config.private_key
352 auth_key.write_text(jwk.export_private())
353 auth_key.chmod(0o600)
356@app.command()
357def show_auth_public_key(
358 mesh_config_path: Annotated[
359 Path,
360 typer.Argument(envvar="MESH_CONFIG_PATH"),
361 ] = ".",
362):
363 jwk = JWK.from_json((mesh_config_path / config.private_key).read_text())
364 print(jwk.export_public())
367@app.command()
368def create_net_keys(
369 mesh_config_path: Annotated[
370 Path,
371 typer.Argument(envvar="MESH_CONFIG_PATH"),
372 ] = ".",
373):
374 private, public = create_keys()
375 private_net_key_path = mesh_config_path / config.private_net_key_file
376 private_net_key_path.write_text(private)
377 private_net_key_path.chmod(0o600)
378 public_net_key_path = mesh_config_path / config.public_net_key_file
379 public_net_key_path.write_text(public)
380 public_net_key_path.chmod(0o600)
383@app.command()
384def get_config(
385 mesh_config_path: Annotated[
386 Path,
387 typer.Option(envvar="MESH_CONFIG_PATH"),
388 ] = ".",
389 mesh_admin_endpoint: Annotated[
390 str,
391 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
392 ] = config.server_url,
393):
394 private_net_key, public_net_key = create_keys()
395 private_auth_key = JWK.from_json(
396 (mesh_config_path / config.private_key).read_text()
397 )
399 loop = asyncio.get_event_loop()
401 result, _ = loop.run_until_complete(
402 get_config_from_mesh(mesh_admin_endpoint, private_auth_key)
403 )
404 (mesh_config_path / config.config_path).write_text(result)
407async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key):
408 jwt = JWT(
409 header={"alg": "RS256", "kid": private_auth_key.thumbprint()},
410 claims={
411 "exp": create_expiration_date(10),
412 "kid": private_auth_key.thumbprint(),
413 },
414 )
415 jwt.make_signed_token(private_auth_key)
416 token = jwt.serialize()
418 async with httpx.AsyncClient() as client:
419 res = await client.get(
420 f"{mesh_admin_endpoint}/api/v1/config",
421 headers={"Authorization": f"Bearer {token}"},
422 )
423 res.raise_for_status()
424 config = res.text
425 update_interval = int(res.headers.get("X-Update-Interval", "5"))
426 return config, update_interval
429async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key):
430 jwt_token = JWT(
431 header={"alg": "RS256", "kid": private_auth_key.thumbprint()},
432 claims={
433 "exp": create_expiration_date(10),
434 "kid": private_auth_key.thumbprint(),
435 },
436 )
437 jwt_token.make_signed_token(private_auth_key)
438 token = jwt_token.serialize()
440 async with httpx.AsyncClient() as client:
441 res = await client.post(
442 f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral",
443 headers={"Authorization": f"Bearer {token}"},
444 )
445 res.raise_for_status()
446 return res.json()
449async def start_nebula(mesh_config_path, mesh_admin_endpoint):
450 await logger.ainfo("starting nebula")
451 conf_path = mesh_config_path / config.config_path
452 assert conf_path.exists(), f"Config at {conf_path} does not exist"
454 private_auth_key_path = mesh_config_path / config.private_key
455 assert private_auth_key_path.exists(), (
456 f"private_key at {private_auth_key_path} does not exist"
457 )
459 async def start_process():
460 return await asyncio.create_subprocess_exec(
461 get_nebula_path(),
462 "-config",
463 str(conf_path),
464 cwd=mesh_config_path,
465 )
467 proc = await start_process()
469 # Default update interval in seconds
470 update_interval = 5
472 while True:
473 await asyncio.sleep(update_interval)
474 try:
475 private_auth_key_path = mesh_config_path / config.private_key
476 private_auth_key = JWK.from_json(private_auth_key_path.read_text())
478 # Check for config updates
479 try:
480 new_config, new_update_interval = await get_config_from_mesh(
481 mesh_admin_endpoint, private_auth_key
482 )
484 if update_interval != new_update_interval:
485 await logger.ainfo(
486 "update interval changed",
487 old_interval=update_interval,
488 new_interval=new_update_interval,
489 )
490 update_interval = new_update_interval
492 old_config = conf_path.read_text()
493 if new_config != old_config:
494 await logger.ainfo("config changed, reloading")
495 conf_path.write_text(new_config)
496 conf_path.chmod(0o600)
498 try:
499 proc.send_signal(signal.SIGHUP)
500 except ProcessLookupError:
501 await logger.ainfo("process died, restarting")
502 proc = await start_process()
503 else:
504 await logger.ainfo("config not changed")
505 except httpx.HTTPStatusError as e:
506 if e.response.status_code == 401:
507 await logger.aerror(
508 "Could not get config because of authentication error. Host may have been deleted.",
509 error=str(e),
510 response_text=e.response.text,
511 )
512 print(
513 "Error: Could not get config because of authentication error. Host may have been deleted."
514 )
515 print(f"Server message: {e.response.text}")
516 break
517 else:
518 await logger.aerror("error getting config", error=str(e))
520 # Cleanup ephemeral hosts
521 try:
522 result = await cleanup_ephemeral_hosts(
523 mesh_admin_endpoint, private_auth_key
524 )
525 if result.get("removed_count", 0) > 0:
526 await logger.ainfo(
527 "removed stale ephemeral hosts",
528 count=result["removed_count"],
529 )
530 except httpx.HTTPStatusError as e:
531 if e.response.status_code == 401:
532 await logger.aerror(
533 "Could not clean up ephemeral hosts because of authentication error. Host may have been deleted.",
534 error=str(e),
535 response_text=e.response.text,
536 )
537 print(
538 "Error: Could not clean up ephemeral hosts because of authentication error. Host may have been deleted."
539 )
540 print(f"Server message: {e.response.text}")
541 break
542 else:
543 await logger.aerror("error during cleanup operation", error=str(e))
545 except Exception:
546 await logger.aexception("could not refresh token")
547 if proc.returncode is not None:
548 await logger.ainfo("process died, restarting")
549 proc = await start_process()
551 # Clean shutdown if we get here
552 if proc.returncode is None:
553 await logger.ainfo("shutting down nebula process")
554 proc.terminate()
555 try:
556 await asyncio.wait_for(proc.wait(), timeout=5.0)
557 except asyncio.TimeoutError:
558 await logger.awarning("nebula process didn't terminate, killing it")
559 proc.kill()
562@app.command()
563def start(
564 mesh_config_path: Annotated[
565 Path,
566 typer.Option(envvar="MESH_CONFIG_PATH"),
567 ] = ".",
568 mesh_admin_endpoint: Annotated[
569 str,
570 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
571 ] = config.server_url,
572):
573 asyncio.run(start_nebula(mesh_config_path, mesh_admin_endpoint))
576@app.command()
577def show_public_key(private_key: Path):
578 jwk = JWK.from_json(private_key.read_text())
579 print(jwk.export_public())
582@app.command()
583def login():
584 res = httpx.post(
585 config.keycloak_device_auth_url,
586 data={
587 "client_id": config.keycloak_admin_client,
588 },
589 )
590 res.raise_for_status()
592 device_auth_response = res.json()
593 print(device_auth_response)
594 print(
595 "Please open the verification url",
596 device_auth_response["verification_uri_complete"],
597 )
599 while True:
600 res = httpx.post(
601 config.keycloak_token_url,
602 data={
603 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
604 "client_id": config.keycloak_admin_client,
605 "device_code": device_auth_response["device_code"],
606 },
607 )
608 if res.status_code == 200:
609 logger.info("Received auth token")
610 config.authentication_path.write_bytes(res.content)
611 config.authentication_path.chmod(0o600)
613 access_token = res.json()["access_token"]
614 refresh_token = res.json()["refresh_token"]
615 print(
616 jwt.decode(
617 refresh_token,
618 algorithms=["RS256"],
619 options={"verify_signature": False},
620 )
621 )
622 logger.info("access_token", access_token=access_token)
623 print("successfully authenticated")
624 break
625 else:
626 print(res.json())
627 sleep(device_auth_response["interval"])
630def get_access_token():
631 if config.authentication_path.exists():
632 auth = json.loads(config.authentication_path.read_text())
633 access_token = auth["access_token"]
635 decoded_token = decode(
636 access_token, options={"verify_signature": False, "verify_exp": False}
637 )
639 # is exp still 2/3 of the time
640 if decoded_token["exp"] >= (datetime.now() + timedelta(seconds=10)).timestamp():
641 return access_token
642 else:
643 refresh_token = auth["refresh_token"]
644 res = httpx.post(
645 config.keycloak_token_url,
646 data={
647 "grant_type": "refresh_token",
648 "refresh_token": refresh_token,
649 "client_id": config.keycloak_admin_client,
650 },
651 )
652 res.raise_for_status()
653 config.authentication_path.write_bytes(res.content)
654 return res.json()["access_token"]
656 else:
657 print("authentication failed")
660@app.command()
661def create_network(name: str, cidr: str):
662 try:
663 access_token = get_access_token()
664 except Exception:
665 logger.exception("failed to get access token")
666 exit(1)
668 res = httpx.post(
669 f"{config.api_endpoint}/networks",
670 content=NetworkCreate(name=name, cidr=cidr).model_dump_json(),
671 headers={"Authorization": f"Bearer {access_token}"},
672 )
674 if res.status_code >= 400:
675 print("could not create network:", res.text)
676 exit(1)
678 print_json(res.content.decode("utf-8"))
681@app.command()
682def list_networks():
683 try:
684 access_token = get_access_token()
685 except Exception:
686 logger.exception("failed to get access token")
687 exit(1)
689 res = httpx.get(
690 f"{config.api_endpoint}/networks",
691 headers={"Authorization": f"Bearer {access_token}"},
692 )
693 res.raise_for_status()
694 print(res.json())
697@app.command()
698def create_template(
699 name: str, network_name: str, is_lighthouse: bool, is_relay: bool, use_relay: bool
700):
701 try:
702 access_token = get_access_token()
703 except Exception:
704 logger.exception("failed to get access token")
705 exit(1)
707 res = httpx.post(
708 f"{config.api_endpoint}/templates",
709 content=TemplateCreate(
710 name=name,
711 network_name=network_name,
712 is_lighthouse=is_lighthouse,
713 is_relay=is_relay,
714 use_relay=use_relay,
715 ).model_dump_json(),
716 headers={"Authorization": f"Bearer {access_token}"},
717 )
718 res.raise_for_status()
719 print_json(res.content.decode("utf-8"))
722@app.command()
723def delete_template(name: str):
724 try:
725 access_token = get_access_token()
726 except Exception:
727 logger.exception("failed to get access token")
728 exit(1)
730 res = httpx.delete(
731 f"{config.api_endpoint}/templates/{name}",
732 headers={"Authorization": f"Bearer {access_token}"},
733 )
734 res.raise_for_status()
735 print(res.json())
738@app.command()
739def delete_host(name: str):
740 try:
741 access_token = get_access_token()
742 except Exception:
743 logger.exception("failed to get access token")
744 exit(1)
746 res = httpx.delete(
747 f"{config.api_endpoint}/hosts/{name}",
748 headers={"Authorization": f"Bearer {access_token}"},
749 )
750 res.raise_for_status()
751 print(res.json())
754@app.command()
755def nebula_cert():
756 binary_name = "nebula-cert"
757 with resources.path("meshadmin.assets", binary_name) as binary_path:
758 if not os.access(binary_path, os.X_OK):
759 raise PermissionError(f"{binary_path} is not executable.")
760 result = subprocess.run([binary_path, "--help"], text=True, capture_output=True)
761 print(result.stdout)
764if __name__ == "__main__":
765 app()