Coverage for src/meshadmin/cli/main.py: 20%

378 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-04 14:54 +0200

1import asyncio 

2import json 

3import os 

4import platform 

5import shutil 

6import signal 

7import subprocess 

8from datetime import datetime, timedelta 

9from importlib import resources 

10from importlib.metadata import PackageNotFoundError, version 

11from pathlib import Path 

12from time import sleep 

13from typing import Annotated 

14from uuid import uuid4 

15 

16import httpx 

17import jwt 

18import structlog 

19import typer 

20from jwcrypto.jwk import JWK 

21from jwcrypto.jwt import JWT 

22from jwt import decode 

23from rich import print, print_json 

24 

25from meshadmin.cli.config import load_config 

26from meshadmin.cli.schemas import ClientEnrollment, NetworkCreate, TemplateCreate 

27from meshadmin.common.utils import ( 

28 create_expiration_date, 

29 create_keys, 

30 download_nebula_binaries, 

31 get_nebula_path, 

32 get_public_ip, 

33) 

34 

35app = typer.Typer() 

36logger = structlog.get_logger(__name__) 

37config = load_config() 

38 

39def version_callback(value: bool): 

40 if value: 

41 try: 

42 installed_version = version("meshadmin") 

43 typer.echo(f"meshadmin version {installed_version}") 

44 except PackageNotFoundError: 

45 typer.echo("meshadmin is not installed") 

46 raise typer.Exit() 

47 

48@app.callback() 

49def main( 

50 ctx: typer.Context, 

51 version: bool = typer.Option( 

52 None, 

53 "--version", 

54 callback=version_callback, 

55 is_eager=True, 

56 help="Show the version and exit.", 

57 ), 

58): 

59 pass 

60 

61@app.command() 

62def download(): 

63 logger.info("Downloading nebula binaries") 

64 try: 

65 install_path = download_nebula_binaries(config.api_endpoint) 

66 logger.info("Nebula binaries downloaded successfully", path=str(install_path)) 

67 except Exception as e: 

68 logger.error("Failed to download nebula binaries", error=str(e)) 

69 raise typer.Exit(code=1) 

70 

71 

72@app.command() 

73def enroll( 

74 enrollment_key: Annotated[ 

75 str, 

76 typer.Argument(envvar="MESH_ENROLLMENT_KEY"), 

77 ], 

78 preferred_hostname: Annotated[ 

79 str, 

80 typer.Option(envvar="MESH_HOSTNAME"), 

81 ] = None, 

82 public_ip: Annotated[ 

83 str, 

84 typer.Option(envvar="MESH_PUBLIC_IP"), 

85 ] = None, 

86 mesh_config_path: Annotated[ 

87 Path, 

88 typer.Option(envvar="MESH_CONFIG_PATH"), 

89 ] = ".", 

90 mesh_admin_endpoint: Annotated[ 

91 str, 

92 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

93 ] = config.server_url, 

94): 

95 logger.info("enrolling") 

96 

97 if not mesh_config_path.exists(): 

98 mesh_config_path.mkdir(exist_ok=True, parents=True) 

99 

100 private_auth_key_path = mesh_config_path / config.private_key 

101 if private_auth_key_path.exists(): 

102 logger.info("auth key already exists") 

103 else: 

104 logger.info("creating auth key") 

105 create_auth_key(mesh_config_path) 

106 

107 jwk = JWK.from_json(private_auth_key_path.read_text()) 

108 public_auth_key = jwk.export_public() 

109 logger.info("public key for registration", public_key=public_auth_key) 

110 

111 private_net_key_path = mesh_config_path / config.private_net_key_file 

112 public_net_key_path = mesh_config_path / config.public_net_key_file 

113 

114 if public_ip is None: 

115 public_ip = get_public_ip() 

116 logger.info( 

117 "public ip not set, using ip reported by https://checkip.amazonaws.com/", 

118 public_ip=public_ip, 

119 ) 

120 

121 if preferred_hostname is None: 

122 preferred_hostname = platform.node() 

123 logger.info( 

124 "preferred hostname not set, using system hostname", 

125 hostname=preferred_hostname, 

126 ) 

127 

128 if private_net_key_path.exists() and public_net_key_path.exists(): 

129 public_nebula_key = public_net_key_path.read_text() 

130 logger.info( 

131 "private and public nebula key already exists", 

132 public_key=public_nebula_key, 

133 ) 

134 else: 

135 logger.info("creating private and public nebula key") 

136 private, public_nebula_key = create_keys() 

137 private_net_key_path.write_text(private) 

138 private_auth_key_path.chmod(0o600) 

139 public_net_key_path.write_text(public_nebula_key) 

140 public_net_key_path.chmod(0o600) 

141 logger.info( 

142 "private and public nebula key created", public_nebula_key=public_nebula_key 

143 ) 

144 

145 enrollment = ClientEnrollment( 

146 enrollment_key=enrollment_key, 

147 public_net_key=public_nebula_key, 

148 public_auth_key=public_auth_key, 

149 preferred_hostname=preferred_hostname, 

150 public_ip=public_ip, 

151 ) 

152 

153 res = httpx.post( 

154 f"{mesh_admin_endpoint}/api/v1/enroll", 

155 content=enrollment.model_dump_json(), 

156 headers={"Content-Type": "application/json"}, 

157 ) 

158 res.raise_for_status() 

159 

160 get_config(mesh_config_path, mesh_admin_endpoint) 

161 logger.info("enrollment response", enrollment=res.content) 

162 logger.info("enrollment finished") 

163 

164 

165@app.command() 

166def install_service( 

167 mesh_config_path: Annotated[ 

168 Path, 

169 typer.Option(envvar="MESH_CONFIG_PATH"), 

170 ] = None, 

171 mesh_admin_endpoint: Annotated[ 

172 str, 

173 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

174 ] = config.server_url, 

175): 

176 os_name = platform.system() 

177 meshadmin_path = shutil.which("meshadmin") 

178 

179 if not meshadmin_path: 

180 logger.error("meshadmin executable not found in PATH") 

181 exit(1) 

182 

183 if mesh_config_path is None: 

184 if os_name == "Darwin": 

185 mesh_config_path = Path( 

186 os.path.expanduser("~/Library/Application Support/meshadmin") 

187 ) 

188 else: 

189 mesh_config_path = Path("/etc/meshadmin") 

190 

191 mesh_config_path = Path(os.path.expanduser(str(mesh_config_path))) 

192 if not mesh_config_path.exists(): 

193 mesh_config_path.mkdir(exist_ok=True, parents=True) 

194 

195 (mesh_config_path / "env").write_text( 

196 f"""MESH_ADMIN_ENDPOINT={mesh_admin_endpoint} 

197 MESH_CONFIG_PATH={mesh_config_path.absolute()} 

198 """ 

199 ) 

200 if os_name == "Darwin": 

201 plist_content = f"""<?xml version="1.0" encoding="UTF-8"?> 

202<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> 

203<plist version="1.0"> 

204<dict> 

205 <key>Label</key> 

206 <string>com.meshadmin.service</string> 

207 <key>ProgramArguments</key> 

208 <array> 

209 <string>{meshadmin_path}</string> 

210 <string>start</string> 

211 </array> 

212 <key>EnvironmentVariables</key> 

213 <dict> 

214 <key>MESH_ADMIN_ENDPOINT</key> 

215 <string>{mesh_admin_endpoint}</string> 

216 <key>MESH_CONFIG_PATH</key> 

217 <string>{mesh_config_path.absolute()}</string> 

218 </dict> 

219 <key>RunAtLoad</key> 

220 <true/> 

221 <key>KeepAlive</key> 

222 <true/> 

223 <key>StandardErrorPath</key> 

224 <string>{mesh_config_path.absolute()}/error.log</string> 

225 <key>StandardOutPath</key> 

226 <string>{mesh_config_path.absolute()}/output.log</string> 

227</dict> 

228</plist> 

229""" 

230 launch_agents_dir = Path(os.path.expanduser("~/Library/LaunchAgents")) 

231 if not launch_agents_dir.exists(): 

232 launch_agents_dir.mkdir(exist_ok=True, parents=True) 

233 plist_path = launch_agents_dir / "com.meshadmin.service.plist" 

234 plist_path.write_text(plist_content) 

235 subprocess.run(["launchctl", "load", str(plist_path)]) 

236 logger.info( 

237 "meshadmin service installed and started", 

238 plist_path=str(plist_path), 

239 config_path=str(mesh_config_path), 

240 ) 

241 print(f"meshadmin service installed at {plist_path}") 

242 print(f"Configuration directory: {mesh_config_path}") 

243 print("Service has been loaded and will start automatically on login") 

244 

245 else: 

246 systemd_unit = f"""[Unit] 

247Description=Meshadmin 

248Wants=basic.target network-online.target nss-lookup.target time-sync.target 

249After=basic.target network.target network-online.target 

250Before=sshd.service 

251 

252[Service] 

253#Type=notify 

254#NotifyAccess=main 

255SyslogIdentifier=meshadmin 

256EnvironmentFile={mesh_config_path.absolute()}/env 

257ExecReload=/bin/kill -HUP $MAINPID 

258ExecStart={meshadmin_path} start  

259Restart=always 

260 

261[Install] 

262WantedBy=multi-user.target 

263""" 

264 config.systemd_service_path.write_text(systemd_unit) 

265 subprocess.run(["systemctl", "daemon-reload"]) 

266 subprocess.run(["systemctl", "enable", "meshadmin"]) 

267 print(f"meshadmin service installed at {config.systemd_service_path}") 

268 print(f"Configuration directory: {mesh_config_path}") 

269 print("Service has been enabled and will start automatically on boot") 

270 

271 

272@app.command() 

273def uninstall_service(): 

274 os_name = platform.system() 

275 if os_name == "Darwin": 

276 plist_path = Path( 

277 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

278 ) 

279 if plist_path.exists(): 

280 subprocess.run(["launchctl", "unload", str(plist_path)]) 

281 plist_path.unlink() 

282 logger.info("meshadmin service uninstalled", plist_path=str(plist_path)) 

283 print(f"meshadmin service uninstalled from {plist_path}") 

284 else: 

285 logger.warning("meshadmin service not found", plist_path=str(plist_path)) 

286 print("meshadmin service not found, nothing to uninstall") 

287 else: 

288 if config.systemd_service_path.exists(): 

289 subprocess.run(["systemctl", "stop", "meshadmin"]) 

290 subprocess.run(["systemctl", "disable", "meshadmin"]) 

291 subprocess.run(["systemctl", "daemon-reload"]) 

292 config.systemd_service_path.unlink() 

293 env_path = Path("/etc/meshadmin/env") 

294 if env_path.exists(): 

295 env_path.unlink() 

296 logger.info("meshadmin service uninstalled") 

297 print("meshadmin service uninstalled") 

298 else: 

299 logger.warning("meshadmin service not found") 

300 print("meshadmin service not found, nothing to uninstall") 

301 

302 

303@app.command() 

304def start_service(): 

305 os_name = platform.system() 

306 if os_name == "Darwin": 

307 plist_path = Path( 

308 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

309 ) 

310 if plist_path.exists(): 

311 subprocess.run(["launchctl", "load", str(plist_path)]) 

312 logger.info("meshadmin service started") 

313 print("meshadmin service started") 

314 else: 

315 logger.error("meshadmin service not installed", plist_path=str(plist_path)) 

316 print( 

317 "meshadmin service not installed. Run 'meshadmin install_service' first." 

318 ) 

319 else: 

320 subprocess.run(["systemctl", "start", "meshadmin"]) 

321 print("meshadmin service started") 

322 

323 

324@app.command() 

325def stop_service(): 

326 os_name = platform.system() 

327 if os_name == "Darwin": 

328 plist_path = Path( 

329 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

330 ) 

331 if plist_path.exists(): 

332 subprocess.run(["launchctl", "unload", str(plist_path)]) 

333 logger.info("meshadmin service stopped") 

334 print("meshadmin service stopped") 

335 else: 

336 logger.error("meshadmin service not installed", plist_path=str(plist_path)) 

337 print("meshadmin service not installed. Nothing to stop.") 

338 else: 

339 subprocess.run(["systemctl", "stop", "meshadmin"]) 

340 print("meshadmin service stopped") 

341 

342 

343@app.command() 

344def create_auth_key( 

345 mesh_config_path: Annotated[ 

346 Path, 

347 typer.Argument(envvar="MESH_CONFIG_PATH"), 

348 ] = ".", 

349): 

350 jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048) 

351 auth_key = mesh_config_path / config.private_key 

352 auth_key.write_text(jwk.export_private()) 

353 auth_key.chmod(0o600) 

354 

355 

356@app.command() 

357def show_auth_public_key( 

358 mesh_config_path: Annotated[ 

359 Path, 

360 typer.Argument(envvar="MESH_CONFIG_PATH"), 

361 ] = ".", 

362): 

363 jwk = JWK.from_json((mesh_config_path / config.private_key).read_text()) 

364 print(jwk.export_public()) 

365 

366 

367@app.command() 

368def create_net_keys( 

369 mesh_config_path: Annotated[ 

370 Path, 

371 typer.Argument(envvar="MESH_CONFIG_PATH"), 

372 ] = ".", 

373): 

374 private, public = create_keys() 

375 private_net_key_path = mesh_config_path / config.private_net_key_file 

376 private_net_key_path.write_text(private) 

377 private_net_key_path.chmod(0o600) 

378 public_net_key_path = mesh_config_path / config.public_net_key_file 

379 public_net_key_path.write_text(public) 

380 public_net_key_path.chmod(0o600) 

381 

382 

383@app.command() 

384def get_config( 

385 mesh_config_path: Annotated[ 

386 Path, 

387 typer.Option(envvar="MESH_CONFIG_PATH"), 

388 ] = ".", 

389 mesh_admin_endpoint: Annotated[ 

390 str, 

391 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

392 ] = config.server_url, 

393): 

394 private_net_key, public_net_key = create_keys() 

395 private_auth_key = JWK.from_json( 

396 (mesh_config_path / config.private_key).read_text() 

397 ) 

398 

399 loop = asyncio.get_event_loop() 

400 

401 result, _ = loop.run_until_complete( 

402 get_config_from_mesh(mesh_admin_endpoint, private_auth_key) 

403 ) 

404 (mesh_config_path / config.config_path).write_text(result) 

405 

406 

407async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key): 

408 jwt = JWT( 

409 header={"alg": "RS256", "kid": private_auth_key.thumbprint()}, 

410 claims={ 

411 "exp": create_expiration_date(10), 

412 "kid": private_auth_key.thumbprint(), 

413 }, 

414 ) 

415 jwt.make_signed_token(private_auth_key) 

416 token = jwt.serialize() 

417 

418 async with httpx.AsyncClient() as client: 

419 res = await client.get( 

420 f"{mesh_admin_endpoint}/api/v1/config", 

421 headers={"Authorization": f"Bearer {token}"}, 

422 ) 

423 res.raise_for_status() 

424 config = res.text 

425 update_interval = int(res.headers.get("X-Update-Interval", "5")) 

426 return config, update_interval 

427 

428 

429async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key): 

430 jwt_token = JWT( 

431 header={"alg": "RS256", "kid": private_auth_key.thumbprint()}, 

432 claims={ 

433 "exp": create_expiration_date(10), 

434 "kid": private_auth_key.thumbprint(), 

435 }, 

436 ) 

437 jwt_token.make_signed_token(private_auth_key) 

438 token = jwt_token.serialize() 

439 

440 async with httpx.AsyncClient() as client: 

441 res = await client.post( 

442 f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral", 

443 headers={"Authorization": f"Bearer {token}"}, 

444 ) 

445 res.raise_for_status() 

446 return res.json() 

447 

448 

449async def start_nebula(mesh_config_path, mesh_admin_endpoint): 

450 await logger.ainfo("starting nebula") 

451 conf_path = mesh_config_path / config.config_path 

452 assert conf_path.exists(), f"Config at {conf_path} does not exist" 

453 

454 private_auth_key_path = mesh_config_path / config.private_key 

455 assert private_auth_key_path.exists(), ( 

456 f"private_key at {private_auth_key_path} does not exist" 

457 ) 

458 

459 async def start_process(): 

460 return await asyncio.create_subprocess_exec( 

461 get_nebula_path(), 

462 "-config", 

463 str(conf_path), 

464 cwd=mesh_config_path, 

465 ) 

466 

467 proc = await start_process() 

468 

469 # Default update interval in seconds 

470 update_interval = 5 

471 

472 while True: 

473 await asyncio.sleep(update_interval) 

474 try: 

475 private_auth_key_path = mesh_config_path / config.private_key 

476 private_auth_key = JWK.from_json(private_auth_key_path.read_text()) 

477 

478 # Check for config updates 

479 try: 

480 new_config, new_update_interval = await get_config_from_mesh( 

481 mesh_admin_endpoint, private_auth_key 

482 ) 

483 

484 if update_interval != new_update_interval: 

485 await logger.ainfo( 

486 "update interval changed", 

487 old_interval=update_interval, 

488 new_interval=new_update_interval, 

489 ) 

490 update_interval = new_update_interval 

491 

492 old_config = conf_path.read_text() 

493 if new_config != old_config: 

494 await logger.ainfo("config changed, reloading") 

495 conf_path.write_text(new_config) 

496 conf_path.chmod(0o600) 

497 

498 try: 

499 proc.send_signal(signal.SIGHUP) 

500 except ProcessLookupError: 

501 await logger.ainfo("process died, restarting") 

502 proc = await start_process() 

503 else: 

504 await logger.ainfo("config not changed") 

505 except httpx.HTTPStatusError as e: 

506 if e.response.status_code == 401: 

507 await logger.aerror( 

508 "Could not get config because of authentication error. Host may have been deleted.", 

509 error=str(e), 

510 response_text=e.response.text, 

511 ) 

512 print( 

513 "Error: Could not get config because of authentication error. Host may have been deleted." 

514 ) 

515 print(f"Server message: {e.response.text}") 

516 break 

517 else: 

518 await logger.aerror("error getting config", error=str(e)) 

519 

520 # Cleanup ephemeral hosts 

521 try: 

522 result = await cleanup_ephemeral_hosts( 

523 mesh_admin_endpoint, private_auth_key 

524 ) 

525 if result.get("removed_count", 0) > 0: 

526 await logger.ainfo( 

527 "removed stale ephemeral hosts", 

528 count=result["removed_count"], 

529 ) 

530 except httpx.HTTPStatusError as e: 

531 if e.response.status_code == 401: 

532 await logger.aerror( 

533 "Could not clean up ephemeral hosts because of authentication error. Host may have been deleted.", 

534 error=str(e), 

535 response_text=e.response.text, 

536 ) 

537 print( 

538 "Error: Could not clean up ephemeral hosts because of authentication error. Host may have been deleted." 

539 ) 

540 print(f"Server message: {e.response.text}") 

541 break 

542 else: 

543 await logger.aerror("error during cleanup operation", error=str(e)) 

544 

545 except Exception: 

546 await logger.aexception("could not refresh token") 

547 if proc.returncode is not None: 

548 await logger.ainfo("process died, restarting") 

549 proc = await start_process() 

550 

551 # Clean shutdown if we get here 

552 if proc.returncode is None: 

553 await logger.ainfo("shutting down nebula process") 

554 proc.terminate() 

555 try: 

556 await asyncio.wait_for(proc.wait(), timeout=5.0) 

557 except asyncio.TimeoutError: 

558 await logger.awarning("nebula process didn't terminate, killing it") 

559 proc.kill() 

560 

561 

562@app.command() 

563def start( 

564 mesh_config_path: Annotated[ 

565 Path, 

566 typer.Option(envvar="MESH_CONFIG_PATH"), 

567 ] = ".", 

568 mesh_admin_endpoint: Annotated[ 

569 str, 

570 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

571 ] = config.server_url, 

572): 

573 asyncio.run(start_nebula(mesh_config_path, mesh_admin_endpoint)) 

574 

575 

576@app.command() 

577def show_public_key(private_key: Path): 

578 jwk = JWK.from_json(private_key.read_text()) 

579 print(jwk.export_public()) 

580 

581 

582@app.command() 

583def login(): 

584 res = httpx.post( 

585 config.keycloak_device_auth_url, 

586 data={ 

587 "client_id": config.keycloak_admin_client, 

588 }, 

589 ) 

590 res.raise_for_status() 

591 

592 device_auth_response = res.json() 

593 print(device_auth_response) 

594 print( 

595 "Please open the verification url", 

596 device_auth_response["verification_uri_complete"], 

597 ) 

598 

599 while True: 

600 res = httpx.post( 

601 config.keycloak_token_url, 

602 data={ 

603 "grant_type": "urn:ietf:params:oauth:grant-type:device_code", 

604 "client_id": config.keycloak_admin_client, 

605 "device_code": device_auth_response["device_code"], 

606 }, 

607 ) 

608 if res.status_code == 200: 

609 logger.info("Received auth token") 

610 config.authentication_path.write_bytes(res.content) 

611 config.authentication_path.chmod(0o600) 

612 

613 access_token = res.json()["access_token"] 

614 refresh_token = res.json()["refresh_token"] 

615 print( 

616 jwt.decode( 

617 refresh_token, 

618 algorithms=["RS256"], 

619 options={"verify_signature": False}, 

620 ) 

621 ) 

622 logger.info("access_token", access_token=access_token) 

623 print("successfully authenticated") 

624 break 

625 else: 

626 print(res.json()) 

627 sleep(device_auth_response["interval"]) 

628 

629 

630def get_access_token(): 

631 if config.authentication_path.exists(): 

632 auth = json.loads(config.authentication_path.read_text()) 

633 access_token = auth["access_token"] 

634 

635 decoded_token = decode( 

636 access_token, options={"verify_signature": False, "verify_exp": False} 

637 ) 

638 

639 # is exp still 2/3 of the time 

640 if decoded_token["exp"] >= (datetime.now() + timedelta(seconds=10)).timestamp(): 

641 return access_token 

642 else: 

643 refresh_token = auth["refresh_token"] 

644 res = httpx.post( 

645 config.keycloak_token_url, 

646 data={ 

647 "grant_type": "refresh_token", 

648 "refresh_token": refresh_token, 

649 "client_id": config.keycloak_admin_client, 

650 }, 

651 ) 

652 res.raise_for_status() 

653 config.authentication_path.write_bytes(res.content) 

654 return res.json()["access_token"] 

655 

656 else: 

657 print("authentication failed") 

658 

659 

660@app.command() 

661def create_network(name: str, cidr: str): 

662 try: 

663 access_token = get_access_token() 

664 except Exception: 

665 logger.exception("failed to get access token") 

666 exit(1) 

667 

668 res = httpx.post( 

669 f"{config.api_endpoint}/networks", 

670 content=NetworkCreate(name=name, cidr=cidr).model_dump_json(), 

671 headers={"Authorization": f"Bearer {access_token}"}, 

672 ) 

673 

674 if res.status_code >= 400: 

675 print("could not create network:", res.text) 

676 exit(1) 

677 

678 print_json(res.content.decode("utf-8")) 

679 

680 

681@app.command() 

682def list_networks(): 

683 try: 

684 access_token = get_access_token() 

685 except Exception: 

686 logger.exception("failed to get access token") 

687 exit(1) 

688 

689 res = httpx.get( 

690 f"{config.api_endpoint}/networks", 

691 headers={"Authorization": f"Bearer {access_token}"}, 

692 ) 

693 res.raise_for_status() 

694 print(res.json()) 

695 

696 

697@app.command() 

698def create_template( 

699 name: str, network_name: str, is_lighthouse: bool, is_relay: bool, use_relay: bool 

700): 

701 try: 

702 access_token = get_access_token() 

703 except Exception: 

704 logger.exception("failed to get access token") 

705 exit(1) 

706 

707 res = httpx.post( 

708 f"{config.api_endpoint}/templates", 

709 content=TemplateCreate( 

710 name=name, 

711 network_name=network_name, 

712 is_lighthouse=is_lighthouse, 

713 is_relay=is_relay, 

714 use_relay=use_relay, 

715 ).model_dump_json(), 

716 headers={"Authorization": f"Bearer {access_token}"}, 

717 ) 

718 res.raise_for_status() 

719 print_json(res.content.decode("utf-8")) 

720 

721 

722@app.command() 

723def delete_template(name: str): 

724 try: 

725 access_token = get_access_token() 

726 except Exception: 

727 logger.exception("failed to get access token") 

728 exit(1) 

729 

730 res = httpx.delete( 

731 f"{config.api_endpoint}/templates/{name}", 

732 headers={"Authorization": f"Bearer {access_token}"}, 

733 ) 

734 res.raise_for_status() 

735 print(res.json()) 

736 

737 

738@app.command() 

739def delete_host(name: str): 

740 try: 

741 access_token = get_access_token() 

742 except Exception: 

743 logger.exception("failed to get access token") 

744 exit(1) 

745 

746 res = httpx.delete( 

747 f"{config.api_endpoint}/hosts/{name}", 

748 headers={"Authorization": f"Bearer {access_token}"}, 

749 ) 

750 res.raise_for_status() 

751 print(res.json()) 

752 

753 

754@app.command() 

755def nebula_cert(): 

756 binary_name = "nebula-cert" 

757 with resources.path("meshadmin.assets", binary_name) as binary_path: 

758 if not os.access(binary_path, os.X_OK): 

759 raise PermissionError(f"{binary_path} is not executable.") 

760 result = subprocess.run([binary_path, "--help"], text=True, capture_output=True) 

761 print(result.stdout) 

762 

763 

764if __name__ == "__main__": 

765 app()