Coverage for src/meshadmin/cli/main.py: 19%
366 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 09:21 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 09:21 +0200
1import asyncio
2import json
3import os
4import platform
5import shutil
6import signal
7import subprocess
8from datetime import datetime, timedelta
9from importlib import resources
10from pathlib import Path
11from time import sleep
12from typing import Annotated
13from uuid import uuid4
15import httpx
16import jwt
17import structlog
18import typer
19from jwcrypto.jwk import JWK
20from jwcrypto.jwt import JWT
21from jwt import decode
22from rich import print, print_json
24from meshadmin.cli.config import load_config
25from meshadmin.cli.schemas import ClientEnrollment, NetworkCreate, TemplateCreate
26from meshadmin.common.utils import (
27 create_expiration_date,
28 create_keys,
29 download_nebula_binaries,
30 get_nebula_path,
31 get_public_ip,
32)
34app = typer.Typer()
35logger = structlog.get_logger(__name__)
36config = load_config()
39@app.command()
40def download():
41 logger.info("Downloading nebula binaries")
42 try:
43 install_path = download_nebula_binaries(config.api_endpoint)
44 logger.info("Nebula binaries downloaded successfully", path=str(install_path))
45 except Exception as e:
46 logger.error("Failed to download nebula binaries", error=str(e))
47 raise typer.Exit(code=1)
50@app.command()
51def enroll(
52 enrollment_key: Annotated[
53 str,
54 typer.Argument(envvar="MESH_ENROLLMENT_KEY"),
55 ],
56 preferred_hostname: Annotated[
57 str,
58 typer.Option(envvar="MESH_HOSTNAME"),
59 ] = None,
60 public_ip: Annotated[
61 str,
62 typer.Option(envvar="MESH_PUBLIC_IP"),
63 ] = None,
64 mesh_config_path: Annotated[
65 Path,
66 typer.Option(envvar="MESH_CONFIG_PATH"),
67 ] = ".",
68 mesh_admin_endpoint: Annotated[
69 str,
70 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
71 ] = config.server_url,
72):
73 logger.info("enrolling")
75 if not mesh_config_path.exists():
76 mesh_config_path.mkdir(exist_ok=True, parents=True)
78 private_auth_key_path = mesh_config_path / config.private_key
79 if private_auth_key_path.exists():
80 logger.info("auth key already exists")
81 else:
82 logger.info("creating auth key")
83 create_auth_key(mesh_config_path)
85 jwk = JWK.from_json(private_auth_key_path.read_text())
86 public_auth_key = jwk.export_public()
87 logger.info("public key for registration", public_key=public_auth_key)
89 private_net_key_path = mesh_config_path / config.private_net_key_file
90 public_net_key_path = mesh_config_path / config.public_net_key_file
92 if public_ip is None:
93 public_ip = get_public_ip()
94 logger.info(
95 "public ip not set, using ip reported by https://checkip.amazonaws.com/",
96 public_ip=public_ip,
97 )
99 if preferred_hostname is None:
100 preferred_hostname = platform.node()
101 logger.info(
102 "preferred hostname not set, using system hostname",
103 hostname=preferred_hostname,
104 )
106 if private_net_key_path.exists() and public_net_key_path.exists():
107 public_nebula_key = public_net_key_path.read_text()
108 logger.info(
109 "private and public nebula key already exists",
110 public_key=public_nebula_key,
111 )
112 else:
113 logger.info("creating private and public nebula key")
114 private, public_nebula_key = create_keys()
115 private_net_key_path.write_text(private)
116 private_auth_key_path.chmod(0o600)
117 public_net_key_path.write_text(public_nebula_key)
118 public_net_key_path.chmod(0o600)
119 logger.info(
120 "private and public nebula key created", public_nebula_key=public_nebula_key
121 )
123 enrollment = ClientEnrollment(
124 enrollment_key=enrollment_key,
125 public_net_key=public_nebula_key,
126 public_auth_key=public_auth_key,
127 preferred_hostname=preferred_hostname,
128 public_ip=public_ip,
129 )
131 res = httpx.post(
132 f"{mesh_admin_endpoint}/api/v1/enroll",
133 content=enrollment.model_dump_json(),
134 headers={"Content-Type": "application/json"},
135 )
136 res.raise_for_status()
138 get_config(mesh_config_path, mesh_admin_endpoint)
139 logger.info("enrollment response", enrollment=res.content)
140 logger.info("enrollment finished")
143@app.command()
144def install_service(
145 mesh_config_path: Annotated[
146 Path,
147 typer.Option(envvar="MESH_CONFIG_PATH"),
148 ] = None,
149 mesh_admin_endpoint: Annotated[
150 str,
151 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
152 ] = config.server_url,
153):
154 os_name = platform.system()
155 meshadmin_path = shutil.which("meshadmin")
157 if not meshadmin_path:
158 logger.error("meshadmin executable not found in PATH")
159 exit(1)
161 if mesh_config_path is None:
162 if os_name == "Darwin":
163 mesh_config_path = Path(
164 os.path.expanduser("~/Library/Application Support/meshadmin")
165 )
166 else:
167 mesh_config_path = Path("/etc/meshadmin")
169 mesh_config_path = Path(os.path.expanduser(str(mesh_config_path)))
170 if not mesh_config_path.exists():
171 mesh_config_path.mkdir(exist_ok=True, parents=True)
173 (mesh_config_path / "env").write_text(
174 f"""MESH_ADMIN_ENDPOINT={mesh_admin_endpoint}
175 MESH_CONFIG_PATH={mesh_config_path.absolute()}
176 """
177 )
178 if os_name == "Darwin":
179 plist_content = f"""<?xml version="1.0" encoding="UTF-8"?>
180<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
181<plist version="1.0">
182<dict>
183 <key>Label</key>
184 <string>com.meshadmin.service</string>
185 <key>ProgramArguments</key>
186 <array>
187 <string>{meshadmin_path}</string>
188 <string>start</string>
189 </array>
190 <key>EnvironmentVariables</key>
191 <dict>
192 <key>MESH_ADMIN_ENDPOINT</key>
193 <string>{mesh_admin_endpoint}</string>
194 <key>MESH_CONFIG_PATH</key>
195 <string>{mesh_config_path.absolute()}</string>
196 </dict>
197 <key>RunAtLoad</key>
198 <true/>
199 <key>KeepAlive</key>
200 <true/>
201 <key>StandardErrorPath</key>
202 <string>{mesh_config_path.absolute()}/error.log</string>
203 <key>StandardOutPath</key>
204 <string>{mesh_config_path.absolute()}/output.log</string>
205</dict>
206</plist>
207"""
208 launch_agents_dir = Path(os.path.expanduser("~/Library/LaunchAgents"))
209 if not launch_agents_dir.exists():
210 launch_agents_dir.mkdir(exist_ok=True, parents=True)
211 plist_path = launch_agents_dir / "com.meshadmin.service.plist"
212 plist_path.write_text(plist_content)
213 subprocess.run(["launchctl", "load", str(plist_path)])
214 logger.info(
215 "meshadmin service installed and started",
216 plist_path=str(plist_path),
217 config_path=str(mesh_config_path),
218 )
219 print(f"meshadmin service installed at {plist_path}")
220 print(f"Configuration directory: {mesh_config_path}")
221 print("Service has been loaded and will start automatically on login")
223 else:
224 systemd_unit = f"""[Unit]
225Description=Meshadmin
226Wants=basic.target network-online.target nss-lookup.target time-sync.target
227After=basic.target network.target network-online.target
228Before=sshd.service
230[Service]
231#Type=notify
232#NotifyAccess=main
233SyslogIdentifier=meshadmin
234EnvironmentFile={mesh_config_path.absolute()}/env
235ExecReload=/bin/kill -HUP $MAINPID
236ExecStart={meshadmin_path} start
237Restart=always
239[Install]
240WantedBy=multi-user.target
241"""
242 config.systemd_service_path.write_text(systemd_unit)
243 subprocess.run(["systemctl", "daemon-reload"])
244 subprocess.run(["systemctl", "enable", "meshadmin"])
245 print(f"meshadmin service installed at {config.systemd_service_path}")
246 print(f"Configuration directory: {mesh_config_path}")
247 print("Service has been enabled and will start automatically on boot")
250@app.command()
251def uninstall_service():
252 os_name = platform.system()
253 if os_name == "Darwin":
254 plist_path = Path(
255 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
256 )
257 if plist_path.exists():
258 subprocess.run(["launchctl", "unload", str(plist_path)])
259 plist_path.unlink()
260 logger.info("meshadmin service uninstalled", plist_path=str(plist_path))
261 print(f"meshadmin service uninstalled from {plist_path}")
262 else:
263 logger.warning("meshadmin service not found", plist_path=str(plist_path))
264 print("meshadmin service not found, nothing to uninstall")
265 else:
266 if config.systemd_service_path.exists():
267 subprocess.run(["systemctl", "stop", "meshadmin"])
268 subprocess.run(["systemctl", "disable", "meshadmin"])
269 subprocess.run(["systemctl", "daemon-reload"])
270 config.systemd_service_path.unlink()
271 env_path = Path("/etc/meshadmin/env")
272 if env_path.exists():
273 env_path.unlink()
274 logger.info("meshadmin service uninstalled")
275 print("meshadmin service uninstalled")
276 else:
277 logger.warning("meshadmin service not found")
278 print("meshadmin service not found, nothing to uninstall")
281@app.command()
282def start_service():
283 os_name = platform.system()
284 if os_name == "Darwin":
285 plist_path = Path(
286 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
287 )
288 if plist_path.exists():
289 subprocess.run(["launchctl", "load", str(plist_path)])
290 logger.info("meshadmin service started")
291 print("meshadmin service started")
292 else:
293 logger.error("meshadmin service not installed", plist_path=str(plist_path))
294 print(
295 "meshadmin service not installed. Run 'meshadmin install_service' first."
296 )
297 else:
298 subprocess.run(["systemctl", "start", "meshadmin"])
299 print("meshadmin service started")
302@app.command()
303def stop_service():
304 os_name = platform.system()
305 if os_name == "Darwin":
306 plist_path = Path(
307 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist")
308 )
309 if plist_path.exists():
310 subprocess.run(["launchctl", "unload", str(plist_path)])
311 logger.info("meshadmin service stopped")
312 print("meshadmin service stopped")
313 else:
314 logger.error("meshadmin service not installed", plist_path=str(plist_path))
315 print("meshadmin service not installed. Nothing to stop.")
316 else:
317 subprocess.run(["systemctl", "stop", "meshadmin"])
318 print("meshadmin service stopped")
321@app.command()
322def create_auth_key(
323 mesh_config_path: Annotated[
324 Path,
325 typer.Argument(envvar="MESH_CONFIG_PATH"),
326 ] = ".",
327):
328 jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048)
329 auth_key = mesh_config_path / config.private_key
330 auth_key.write_text(jwk.export_private())
331 auth_key.chmod(0o600)
334@app.command()
335def show_auth_public_key(
336 mesh_config_path: Annotated[
337 Path,
338 typer.Argument(envvar="MESH_CONFIG_PATH"),
339 ] = ".",
340):
341 jwk = JWK.from_json((mesh_config_path / config.private_key).read_text())
342 print(jwk.export_public())
345@app.command()
346def create_net_keys(
347 mesh_config_path: Annotated[
348 Path,
349 typer.Argument(envvar="MESH_CONFIG_PATH"),
350 ] = ".",
351):
352 private, public = create_keys()
353 private_net_key_path = mesh_config_path / config.private_net_key_file
354 private_net_key_path.write_text(private)
355 private_net_key_path.chmod(0o600)
356 public_net_key_path = mesh_config_path / config.public_net_key_file
357 public_net_key_path.write_text(public)
358 public_net_key_path.chmod(0o600)
361@app.command()
362def get_config(
363 mesh_config_path: Annotated[
364 Path,
365 typer.Option(envvar="MESH_CONFIG_PATH"),
366 ] = ".",
367 mesh_admin_endpoint: Annotated[
368 str,
369 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
370 ] = config.server_url,
371):
372 private_net_key, public_net_key = create_keys()
373 private_auth_key = JWK.from_json(
374 (mesh_config_path / config.private_key).read_text()
375 )
377 loop = asyncio.get_event_loop()
379 result, _ = loop.run_until_complete(
380 get_config_from_mesh(mesh_admin_endpoint, private_auth_key)
381 )
382 (mesh_config_path / config.config_path).write_text(result)
385async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key):
386 jwt = JWT(
387 header={"alg": "RS256", "kid": private_auth_key.thumbprint()},
388 claims={
389 "exp": create_expiration_date(10),
390 "kid": private_auth_key.thumbprint(),
391 },
392 )
393 jwt.make_signed_token(private_auth_key)
394 token = jwt.serialize()
396 async with httpx.AsyncClient() as client:
397 res = await client.get(
398 f"{mesh_admin_endpoint}/api/v1/config",
399 headers={"Authorization": f"Bearer {token}"},
400 )
401 res.raise_for_status()
402 config = res.text
403 update_interval = int(res.headers.get("X-Update-Interval", "5"))
404 return config, update_interval
407async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key):
408 jwt_token = JWT(
409 header={"alg": "RS256", "kid": private_auth_key.thumbprint()},
410 claims={
411 "exp": create_expiration_date(10),
412 "kid": private_auth_key.thumbprint(),
413 },
414 )
415 jwt_token.make_signed_token(private_auth_key)
416 token = jwt_token.serialize()
418 async with httpx.AsyncClient() as client:
419 res = await client.post(
420 f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral",
421 headers={"Authorization": f"Bearer {token}"},
422 )
423 res.raise_for_status()
424 return res.json()
427async def start_nebula(mesh_config_path, mesh_admin_endpoint):
428 await logger.ainfo("starting nebula")
429 conf_path = mesh_config_path / config.config_path
430 assert conf_path.exists(), f"Config at {conf_path} does not exist"
432 private_auth_key_path = mesh_config_path / config.private_key
433 assert private_auth_key_path.exists(), (
434 f"private_key at {private_auth_key_path} does not exist"
435 )
437 async def start_process():
438 return await asyncio.create_subprocess_exec(
439 get_nebula_path(),
440 "-config",
441 str(conf_path),
442 cwd=mesh_config_path,
443 )
445 proc = await start_process()
447 # Default update interval in seconds
448 update_interval = 5
450 while True:
451 await asyncio.sleep(update_interval)
452 try:
453 private_auth_key_path = mesh_config_path / config.private_key
454 private_auth_key = JWK.from_json(private_auth_key_path.read_text())
456 # Check for config updates
457 try:
458 new_config, new_update_interval = await get_config_from_mesh(
459 mesh_admin_endpoint, private_auth_key
460 )
462 if update_interval != new_update_interval:
463 await logger.ainfo(
464 "update interval changed",
465 old_interval=update_interval,
466 new_interval=new_update_interval,
467 )
468 update_interval = new_update_interval
470 old_config = conf_path.read_text()
471 if new_config != old_config:
472 await logger.ainfo("config changed, reloading")
473 conf_path.write_text(new_config)
474 conf_path.chmod(0o600)
476 try:
477 proc.send_signal(signal.SIGHUP)
478 except ProcessLookupError:
479 await logger.ainfo("process died, restarting")
480 proc = await start_process()
481 else:
482 await logger.ainfo("config not changed")
483 except httpx.HTTPStatusError as e:
484 if e.response.status_code == 401:
485 await logger.aerror(
486 "Could not get config because of authentication error. Host may have been deleted.",
487 error=str(e),
488 response_text=e.response.text,
489 )
490 print(
491 "Error: Could not get config because of authentication error. Host may have been deleted."
492 )
493 print(f"Server message: {e.response.text}")
494 break
495 else:
496 await logger.aerror("error getting config", error=str(e))
498 # Cleanup ephemeral hosts
499 try:
500 result = await cleanup_ephemeral_hosts(
501 mesh_admin_endpoint, private_auth_key
502 )
503 if result.get("removed_count", 0) > 0:
504 await logger.ainfo(
505 "removed stale ephemeral hosts",
506 count=result["removed_count"],
507 )
508 except httpx.HTTPStatusError as e:
509 if e.response.status_code == 401:
510 await logger.aerror(
511 "Could not clean up ephemeral hosts because of authentication error. Host may have been deleted.",
512 error=str(e),
513 response_text=e.response.text,
514 )
515 print(
516 "Error: Could not clean up ephemeral hosts because of authentication error. Host may have been deleted."
517 )
518 print(f"Server message: {e.response.text}")
519 break
520 else:
521 await logger.aerror("error during cleanup operation", error=str(e))
523 except Exception:
524 await logger.aexception("could not refresh token")
525 if proc.returncode is not None:
526 await logger.ainfo("process died, restarting")
527 proc = await start_process()
529 # Clean shutdown if we get here
530 if proc.returncode is None:
531 await logger.ainfo("shutting down nebula process")
532 proc.terminate()
533 try:
534 await asyncio.wait_for(proc.wait(), timeout=5.0)
535 except asyncio.TimeoutError:
536 await logger.awarning("nebula process didn't terminate, killing it")
537 proc.kill()
540@app.command()
541def start(
542 mesh_config_path: Annotated[
543 Path,
544 typer.Option(envvar="MESH_CONFIG_PATH"),
545 ] = ".",
546 mesh_admin_endpoint: Annotated[
547 str,
548 typer.Option(envvar="MESH_ADMIN_ENDPOINT"),
549 ] = config.server_url,
550):
551 asyncio.run(start_nebula(mesh_config_path, mesh_admin_endpoint))
554@app.command()
555def show_public_key(private_key: Path):
556 jwk = JWK.from_json(private_key.read_text())
557 print(jwk.export_public())
560@app.command()
561def login():
562 res = httpx.post(
563 config.keycloak_device_auth_url,
564 data={
565 "client_id": config.keycloak_admin_client,
566 },
567 )
568 res.raise_for_status()
570 device_auth_response = res.json()
571 print(device_auth_response)
572 print(
573 "Please open the verification url",
574 device_auth_response["verification_uri_complete"],
575 )
577 while True:
578 res = httpx.post(
579 config.keycloak_token_url,
580 data={
581 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
582 "client_id": config.keycloak_admin_client,
583 "device_code": device_auth_response["device_code"],
584 },
585 )
586 if res.status_code == 200:
587 logger.info("Received auth token")
588 config.authentication_path.write_bytes(res.content)
589 config.authentication_path.chmod(0o600)
591 access_token = res.json()["access_token"]
592 refresh_token = res.json()["refresh_token"]
593 print(
594 jwt.decode(
595 refresh_token,
596 algorithms=["RS256"],
597 options={"verify_signature": False},
598 )
599 )
600 logger.info("access_token", access_token=access_token)
601 print("successfully authenticated")
602 break
603 else:
604 print(res.json())
605 sleep(device_auth_response["interval"])
608def get_access_token():
609 if config.authentication_path.exists():
610 auth = json.loads(config.authentication_path.read_text())
611 access_token = auth["access_token"]
613 decoded_token = decode(
614 access_token, options={"verify_signature": False, "verify_exp": False}
615 )
617 # is exp still 2/3 of the time
618 if decoded_token["exp"] >= (datetime.now() + timedelta(seconds=10)).timestamp():
619 return access_token
620 else:
621 refresh_token = auth["refresh_token"]
622 res = httpx.post(
623 config.keycloak_token_url,
624 data={
625 "grant_type": "refresh_token",
626 "refresh_token": refresh_token,
627 "client_id": config.keycloak_admin_client,
628 },
629 )
630 res.raise_for_status()
631 config.authentication_path.write_bytes(res.content)
632 return res.json()["access_token"]
634 else:
635 print("authentication failed")
638@app.command()
639def create_network(name: str, cidr: str):
640 try:
641 access_token = get_access_token()
642 except Exception:
643 logger.exception("failed to get access token")
644 exit(1)
646 res = httpx.post(
647 f"{config.api_endpoint}/networks",
648 content=NetworkCreate(name=name, cidr=cidr).model_dump_json(),
649 headers={"Authorization": f"Bearer {access_token}"},
650 )
652 if res.status_code >= 400:
653 print("could not create network:", res.text)
654 exit(1)
656 print_json(res.content.decode("utf-8"))
659@app.command()
660def list_networks():
661 try:
662 access_token = get_access_token()
663 except Exception:
664 logger.exception("failed to get access token")
665 exit(1)
667 res = httpx.get(
668 f"{config.api_endpoint}/networks",
669 headers={"Authorization": f"Bearer {access_token}"},
670 )
671 res.raise_for_status()
672 print(res.json())
675@app.command()
676def create_template(
677 name: str, network_name: str, is_lighthouse: bool, is_relay: bool, use_relay: bool
678):
679 try:
680 access_token = get_access_token()
681 except Exception:
682 logger.exception("failed to get access token")
683 exit(1)
685 res = httpx.post(
686 f"{config.api_endpoint}/templates",
687 content=TemplateCreate(
688 name=name,
689 network_name=network_name,
690 is_lighthouse=is_lighthouse,
691 is_relay=is_relay,
692 use_relay=use_relay,
693 ).model_dump_json(),
694 headers={"Authorization": f"Bearer {access_token}"},
695 )
696 res.raise_for_status()
697 print_json(res.content.decode("utf-8"))
700@app.command()
701def delete_template(name: str):
702 try:
703 access_token = get_access_token()
704 except Exception:
705 logger.exception("failed to get access token")
706 exit(1)
708 res = httpx.delete(
709 f"{config.api_endpoint}/templates/{name}",
710 headers={"Authorization": f"Bearer {access_token}"},
711 )
712 res.raise_for_status()
713 print(res.json())
716@app.command()
717def delete_host(name: str):
718 try:
719 access_token = get_access_token()
720 except Exception:
721 logger.exception("failed to get access token")
722 exit(1)
724 res = httpx.delete(
725 f"{config.api_endpoint}/hosts/{name}",
726 headers={"Authorization": f"Bearer {access_token}"},
727 )
728 res.raise_for_status()
729 print(res.json())
732@app.command()
733def nebula_cert():
734 binary_name = "nebula-cert"
735 with resources.path("meshadmin.assets", binary_name) as binary_path:
736 if not os.access(binary_path, os.X_OK):
737 raise PermissionError(f"{binary_path} is not executable.")
738 result = subprocess.run([binary_path, "--help"], text=True, capture_output=True)
739 print(result.stdout)
742if __name__ == "__main__":
743 app()