Coverage for src/meshadmin/cli/main.py: 19%

366 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-04 09:21 +0200

1import asyncio 

2import json 

3import os 

4import platform 

5import shutil 

6import signal 

7import subprocess 

8from datetime import datetime, timedelta 

9from importlib import resources 

10from pathlib import Path 

11from time import sleep 

12from typing import Annotated 

13from uuid import uuid4 

14 

15import httpx 

16import jwt 

17import structlog 

18import typer 

19from jwcrypto.jwk import JWK 

20from jwcrypto.jwt import JWT 

21from jwt import decode 

22from rich import print, print_json 

23 

24from meshadmin.cli.config import load_config 

25from meshadmin.cli.schemas import ClientEnrollment, NetworkCreate, TemplateCreate 

26from meshadmin.common.utils import ( 

27 create_expiration_date, 

28 create_keys, 

29 download_nebula_binaries, 

30 get_nebula_path, 

31 get_public_ip, 

32) 

33 

34app = typer.Typer() 

35logger = structlog.get_logger(__name__) 

36config = load_config() 

37 

38 

39@app.command() 

40def download(): 

41 logger.info("Downloading nebula binaries") 

42 try: 

43 install_path = download_nebula_binaries(config.api_endpoint) 

44 logger.info("Nebula binaries downloaded successfully", path=str(install_path)) 

45 except Exception as e: 

46 logger.error("Failed to download nebula binaries", error=str(e)) 

47 raise typer.Exit(code=1) 

48 

49 

50@app.command() 

51def enroll( 

52 enrollment_key: Annotated[ 

53 str, 

54 typer.Argument(envvar="MESH_ENROLLMENT_KEY"), 

55 ], 

56 preferred_hostname: Annotated[ 

57 str, 

58 typer.Option(envvar="MESH_HOSTNAME"), 

59 ] = None, 

60 public_ip: Annotated[ 

61 str, 

62 typer.Option(envvar="MESH_PUBLIC_IP"), 

63 ] = None, 

64 mesh_config_path: Annotated[ 

65 Path, 

66 typer.Option(envvar="MESH_CONFIG_PATH"), 

67 ] = ".", 

68 mesh_admin_endpoint: Annotated[ 

69 str, 

70 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

71 ] = config.server_url, 

72): 

73 logger.info("enrolling") 

74 

75 if not mesh_config_path.exists(): 

76 mesh_config_path.mkdir(exist_ok=True, parents=True) 

77 

78 private_auth_key_path = mesh_config_path / config.private_key 

79 if private_auth_key_path.exists(): 

80 logger.info("auth key already exists") 

81 else: 

82 logger.info("creating auth key") 

83 create_auth_key(mesh_config_path) 

84 

85 jwk = JWK.from_json(private_auth_key_path.read_text()) 

86 public_auth_key = jwk.export_public() 

87 logger.info("public key for registration", public_key=public_auth_key) 

88 

89 private_net_key_path = mesh_config_path / config.private_net_key_file 

90 public_net_key_path = mesh_config_path / config.public_net_key_file 

91 

92 if public_ip is None: 

93 public_ip = get_public_ip() 

94 logger.info( 

95 "public ip not set, using ip reported by https://checkip.amazonaws.com/", 

96 public_ip=public_ip, 

97 ) 

98 

99 if preferred_hostname is None: 

100 preferred_hostname = platform.node() 

101 logger.info( 

102 "preferred hostname not set, using system hostname", 

103 hostname=preferred_hostname, 

104 ) 

105 

106 if private_net_key_path.exists() and public_net_key_path.exists(): 

107 public_nebula_key = public_net_key_path.read_text() 

108 logger.info( 

109 "private and public nebula key already exists", 

110 public_key=public_nebula_key, 

111 ) 

112 else: 

113 logger.info("creating private and public nebula key") 

114 private, public_nebula_key = create_keys() 

115 private_net_key_path.write_text(private) 

116 private_auth_key_path.chmod(0o600) 

117 public_net_key_path.write_text(public_nebula_key) 

118 public_net_key_path.chmod(0o600) 

119 logger.info( 

120 "private and public nebula key created", public_nebula_key=public_nebula_key 

121 ) 

122 

123 enrollment = ClientEnrollment( 

124 enrollment_key=enrollment_key, 

125 public_net_key=public_nebula_key, 

126 public_auth_key=public_auth_key, 

127 preferred_hostname=preferred_hostname, 

128 public_ip=public_ip, 

129 ) 

130 

131 res = httpx.post( 

132 f"{mesh_admin_endpoint}/api/v1/enroll", 

133 content=enrollment.model_dump_json(), 

134 headers={"Content-Type": "application/json"}, 

135 ) 

136 res.raise_for_status() 

137 

138 get_config(mesh_config_path, mesh_admin_endpoint) 

139 logger.info("enrollment response", enrollment=res.content) 

140 logger.info("enrollment finished") 

141 

142 

143@app.command() 

144def install_service( 

145 mesh_config_path: Annotated[ 

146 Path, 

147 typer.Option(envvar="MESH_CONFIG_PATH"), 

148 ] = None, 

149 mesh_admin_endpoint: Annotated[ 

150 str, 

151 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

152 ] = config.server_url, 

153): 

154 os_name = platform.system() 

155 meshadmin_path = shutil.which("meshadmin") 

156 

157 if not meshadmin_path: 

158 logger.error("meshadmin executable not found in PATH") 

159 exit(1) 

160 

161 if mesh_config_path is None: 

162 if os_name == "Darwin": 

163 mesh_config_path = Path( 

164 os.path.expanduser("~/Library/Application Support/meshadmin") 

165 ) 

166 else: 

167 mesh_config_path = Path("/etc/meshadmin") 

168 

169 mesh_config_path = Path(os.path.expanduser(str(mesh_config_path))) 

170 if not mesh_config_path.exists(): 

171 mesh_config_path.mkdir(exist_ok=True, parents=True) 

172 

173 (mesh_config_path / "env").write_text( 

174 f"""MESH_ADMIN_ENDPOINT={mesh_admin_endpoint} 

175 MESH_CONFIG_PATH={mesh_config_path.absolute()} 

176 """ 

177 ) 

178 if os_name == "Darwin": 

179 plist_content = f"""<?xml version="1.0" encoding="UTF-8"?> 

180<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> 

181<plist version="1.0"> 

182<dict> 

183 <key>Label</key> 

184 <string>com.meshadmin.service</string> 

185 <key>ProgramArguments</key> 

186 <array> 

187 <string>{meshadmin_path}</string> 

188 <string>start</string> 

189 </array> 

190 <key>EnvironmentVariables</key> 

191 <dict> 

192 <key>MESH_ADMIN_ENDPOINT</key> 

193 <string>{mesh_admin_endpoint}</string> 

194 <key>MESH_CONFIG_PATH</key> 

195 <string>{mesh_config_path.absolute()}</string> 

196 </dict> 

197 <key>RunAtLoad</key> 

198 <true/> 

199 <key>KeepAlive</key> 

200 <true/> 

201 <key>StandardErrorPath</key> 

202 <string>{mesh_config_path.absolute()}/error.log</string> 

203 <key>StandardOutPath</key> 

204 <string>{mesh_config_path.absolute()}/output.log</string> 

205</dict> 

206</plist> 

207""" 

208 launch_agents_dir = Path(os.path.expanduser("~/Library/LaunchAgents")) 

209 if not launch_agents_dir.exists(): 

210 launch_agents_dir.mkdir(exist_ok=True, parents=True) 

211 plist_path = launch_agents_dir / "com.meshadmin.service.plist" 

212 plist_path.write_text(plist_content) 

213 subprocess.run(["launchctl", "load", str(plist_path)]) 

214 logger.info( 

215 "meshadmin service installed and started", 

216 plist_path=str(plist_path), 

217 config_path=str(mesh_config_path), 

218 ) 

219 print(f"meshadmin service installed at {plist_path}") 

220 print(f"Configuration directory: {mesh_config_path}") 

221 print("Service has been loaded and will start automatically on login") 

222 

223 else: 

224 systemd_unit = f"""[Unit] 

225Description=Meshadmin 

226Wants=basic.target network-online.target nss-lookup.target time-sync.target 

227After=basic.target network.target network-online.target 

228Before=sshd.service 

229 

230[Service] 

231#Type=notify 

232#NotifyAccess=main 

233SyslogIdentifier=meshadmin 

234EnvironmentFile={mesh_config_path.absolute()}/env 

235ExecReload=/bin/kill -HUP $MAINPID 

236ExecStart={meshadmin_path} start  

237Restart=always 

238 

239[Install] 

240WantedBy=multi-user.target 

241""" 

242 config.systemd_service_path.write_text(systemd_unit) 

243 subprocess.run(["systemctl", "daemon-reload"]) 

244 subprocess.run(["systemctl", "enable", "meshadmin"]) 

245 print(f"meshadmin service installed at {config.systemd_service_path}") 

246 print(f"Configuration directory: {mesh_config_path}") 

247 print("Service has been enabled and will start automatically on boot") 

248 

249 

250@app.command() 

251def uninstall_service(): 

252 os_name = platform.system() 

253 if os_name == "Darwin": 

254 plist_path = Path( 

255 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

256 ) 

257 if plist_path.exists(): 

258 subprocess.run(["launchctl", "unload", str(plist_path)]) 

259 plist_path.unlink() 

260 logger.info("meshadmin service uninstalled", plist_path=str(plist_path)) 

261 print(f"meshadmin service uninstalled from {plist_path}") 

262 else: 

263 logger.warning("meshadmin service not found", plist_path=str(plist_path)) 

264 print("meshadmin service not found, nothing to uninstall") 

265 else: 

266 if config.systemd_service_path.exists(): 

267 subprocess.run(["systemctl", "stop", "meshadmin"]) 

268 subprocess.run(["systemctl", "disable", "meshadmin"]) 

269 subprocess.run(["systemctl", "daemon-reload"]) 

270 config.systemd_service_path.unlink() 

271 env_path = Path("/etc/meshadmin/env") 

272 if env_path.exists(): 

273 env_path.unlink() 

274 logger.info("meshadmin service uninstalled") 

275 print("meshadmin service uninstalled") 

276 else: 

277 logger.warning("meshadmin service not found") 

278 print("meshadmin service not found, nothing to uninstall") 

279 

280 

281@app.command() 

282def start_service(): 

283 os_name = platform.system() 

284 if os_name == "Darwin": 

285 plist_path = Path( 

286 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

287 ) 

288 if plist_path.exists(): 

289 subprocess.run(["launchctl", "load", str(plist_path)]) 

290 logger.info("meshadmin service started") 

291 print("meshadmin service started") 

292 else: 

293 logger.error("meshadmin service not installed", plist_path=str(plist_path)) 

294 print( 

295 "meshadmin service not installed. Run 'meshadmin install_service' first." 

296 ) 

297 else: 

298 subprocess.run(["systemctl", "start", "meshadmin"]) 

299 print("meshadmin service started") 

300 

301 

302@app.command() 

303def stop_service(): 

304 os_name = platform.system() 

305 if os_name == "Darwin": 

306 plist_path = Path( 

307 os.path.expanduser("~/Library/LaunchAgents/com.meshadmin.service.plist") 

308 ) 

309 if plist_path.exists(): 

310 subprocess.run(["launchctl", "unload", str(plist_path)]) 

311 logger.info("meshadmin service stopped") 

312 print("meshadmin service stopped") 

313 else: 

314 logger.error("meshadmin service not installed", plist_path=str(plist_path)) 

315 print("meshadmin service not installed. Nothing to stop.") 

316 else: 

317 subprocess.run(["systemctl", "stop", "meshadmin"]) 

318 print("meshadmin service stopped") 

319 

320 

321@app.command() 

322def create_auth_key( 

323 mesh_config_path: Annotated[ 

324 Path, 

325 typer.Argument(envvar="MESH_CONFIG_PATH"), 

326 ] = ".", 

327): 

328 jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048) 

329 auth_key = mesh_config_path / config.private_key 

330 auth_key.write_text(jwk.export_private()) 

331 auth_key.chmod(0o600) 

332 

333 

334@app.command() 

335def show_auth_public_key( 

336 mesh_config_path: Annotated[ 

337 Path, 

338 typer.Argument(envvar="MESH_CONFIG_PATH"), 

339 ] = ".", 

340): 

341 jwk = JWK.from_json((mesh_config_path / config.private_key).read_text()) 

342 print(jwk.export_public()) 

343 

344 

345@app.command() 

346def create_net_keys( 

347 mesh_config_path: Annotated[ 

348 Path, 

349 typer.Argument(envvar="MESH_CONFIG_PATH"), 

350 ] = ".", 

351): 

352 private, public = create_keys() 

353 private_net_key_path = mesh_config_path / config.private_net_key_file 

354 private_net_key_path.write_text(private) 

355 private_net_key_path.chmod(0o600) 

356 public_net_key_path = mesh_config_path / config.public_net_key_file 

357 public_net_key_path.write_text(public) 

358 public_net_key_path.chmod(0o600) 

359 

360 

361@app.command() 

362def get_config( 

363 mesh_config_path: Annotated[ 

364 Path, 

365 typer.Option(envvar="MESH_CONFIG_PATH"), 

366 ] = ".", 

367 mesh_admin_endpoint: Annotated[ 

368 str, 

369 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

370 ] = config.server_url, 

371): 

372 private_net_key, public_net_key = create_keys() 

373 private_auth_key = JWK.from_json( 

374 (mesh_config_path / config.private_key).read_text() 

375 ) 

376 

377 loop = asyncio.get_event_loop() 

378 

379 result, _ = loop.run_until_complete( 

380 get_config_from_mesh(mesh_admin_endpoint, private_auth_key) 

381 ) 

382 (mesh_config_path / config.config_path).write_text(result) 

383 

384 

385async def get_config_from_mesh(mesh_admin_endpoint, private_auth_key): 

386 jwt = JWT( 

387 header={"alg": "RS256", "kid": private_auth_key.thumbprint()}, 

388 claims={ 

389 "exp": create_expiration_date(10), 

390 "kid": private_auth_key.thumbprint(), 

391 }, 

392 ) 

393 jwt.make_signed_token(private_auth_key) 

394 token = jwt.serialize() 

395 

396 async with httpx.AsyncClient() as client: 

397 res = await client.get( 

398 f"{mesh_admin_endpoint}/api/v1/config", 

399 headers={"Authorization": f"Bearer {token}"}, 

400 ) 

401 res.raise_for_status() 

402 config = res.text 

403 update_interval = int(res.headers.get("X-Update-Interval", "5")) 

404 return config, update_interval 

405 

406 

407async def cleanup_ephemeral_hosts(mesh_admin_endpoint, private_auth_key): 

408 jwt_token = JWT( 

409 header={"alg": "RS256", "kid": private_auth_key.thumbprint()}, 

410 claims={ 

411 "exp": create_expiration_date(10), 

412 "kid": private_auth_key.thumbprint(), 

413 }, 

414 ) 

415 jwt_token.make_signed_token(private_auth_key) 

416 token = jwt_token.serialize() 

417 

418 async with httpx.AsyncClient() as client: 

419 res = await client.post( 

420 f"{mesh_admin_endpoint}/api/v1/cleanup-ephemeral", 

421 headers={"Authorization": f"Bearer {token}"}, 

422 ) 

423 res.raise_for_status() 

424 return res.json() 

425 

426 

427async def start_nebula(mesh_config_path, mesh_admin_endpoint): 

428 await logger.ainfo("starting nebula") 

429 conf_path = mesh_config_path / config.config_path 

430 assert conf_path.exists(), f"Config at {conf_path} does not exist" 

431 

432 private_auth_key_path = mesh_config_path / config.private_key 

433 assert private_auth_key_path.exists(), ( 

434 f"private_key at {private_auth_key_path} does not exist" 

435 ) 

436 

437 async def start_process(): 

438 return await asyncio.create_subprocess_exec( 

439 get_nebula_path(), 

440 "-config", 

441 str(conf_path), 

442 cwd=mesh_config_path, 

443 ) 

444 

445 proc = await start_process() 

446 

447 # Default update interval in seconds 

448 update_interval = 5 

449 

450 while True: 

451 await asyncio.sleep(update_interval) 

452 try: 

453 private_auth_key_path = mesh_config_path / config.private_key 

454 private_auth_key = JWK.from_json(private_auth_key_path.read_text()) 

455 

456 # Check for config updates 

457 try: 

458 new_config, new_update_interval = await get_config_from_mesh( 

459 mesh_admin_endpoint, private_auth_key 

460 ) 

461 

462 if update_interval != new_update_interval: 

463 await logger.ainfo( 

464 "update interval changed", 

465 old_interval=update_interval, 

466 new_interval=new_update_interval, 

467 ) 

468 update_interval = new_update_interval 

469 

470 old_config = conf_path.read_text() 

471 if new_config != old_config: 

472 await logger.ainfo("config changed, reloading") 

473 conf_path.write_text(new_config) 

474 conf_path.chmod(0o600) 

475 

476 try: 

477 proc.send_signal(signal.SIGHUP) 

478 except ProcessLookupError: 

479 await logger.ainfo("process died, restarting") 

480 proc = await start_process() 

481 else: 

482 await logger.ainfo("config not changed") 

483 except httpx.HTTPStatusError as e: 

484 if e.response.status_code == 401: 

485 await logger.aerror( 

486 "Could not get config because of authentication error. Host may have been deleted.", 

487 error=str(e), 

488 response_text=e.response.text, 

489 ) 

490 print( 

491 "Error: Could not get config because of authentication error. Host may have been deleted." 

492 ) 

493 print(f"Server message: {e.response.text}") 

494 break 

495 else: 

496 await logger.aerror("error getting config", error=str(e)) 

497 

498 # Cleanup ephemeral hosts 

499 try: 

500 result = await cleanup_ephemeral_hosts( 

501 mesh_admin_endpoint, private_auth_key 

502 ) 

503 if result.get("removed_count", 0) > 0: 

504 await logger.ainfo( 

505 "removed stale ephemeral hosts", 

506 count=result["removed_count"], 

507 ) 

508 except httpx.HTTPStatusError as e: 

509 if e.response.status_code == 401: 

510 await logger.aerror( 

511 "Could not clean up ephemeral hosts because of authentication error. Host may have been deleted.", 

512 error=str(e), 

513 response_text=e.response.text, 

514 ) 

515 print( 

516 "Error: Could not clean up ephemeral hosts because of authentication error. Host may have been deleted." 

517 ) 

518 print(f"Server message: {e.response.text}") 

519 break 

520 else: 

521 await logger.aerror("error during cleanup operation", error=str(e)) 

522 

523 except Exception: 

524 await logger.aexception("could not refresh token") 

525 if proc.returncode is not None: 

526 await logger.ainfo("process died, restarting") 

527 proc = await start_process() 

528 

529 # Clean shutdown if we get here 

530 if proc.returncode is None: 

531 await logger.ainfo("shutting down nebula process") 

532 proc.terminate() 

533 try: 

534 await asyncio.wait_for(proc.wait(), timeout=5.0) 

535 except asyncio.TimeoutError: 

536 await logger.awarning("nebula process didn't terminate, killing it") 

537 proc.kill() 

538 

539 

540@app.command() 

541def start( 

542 mesh_config_path: Annotated[ 

543 Path, 

544 typer.Option(envvar="MESH_CONFIG_PATH"), 

545 ] = ".", 

546 mesh_admin_endpoint: Annotated[ 

547 str, 

548 typer.Option(envvar="MESH_ADMIN_ENDPOINT"), 

549 ] = config.server_url, 

550): 

551 asyncio.run(start_nebula(mesh_config_path, mesh_admin_endpoint)) 

552 

553 

554@app.command() 

555def show_public_key(private_key: Path): 

556 jwk = JWK.from_json(private_key.read_text()) 

557 print(jwk.export_public()) 

558 

559 

560@app.command() 

561def login(): 

562 res = httpx.post( 

563 config.keycloak_device_auth_url, 

564 data={ 

565 "client_id": config.keycloak_admin_client, 

566 }, 

567 ) 

568 res.raise_for_status() 

569 

570 device_auth_response = res.json() 

571 print(device_auth_response) 

572 print( 

573 "Please open the verification url", 

574 device_auth_response["verification_uri_complete"], 

575 ) 

576 

577 while True: 

578 res = httpx.post( 

579 config.keycloak_token_url, 

580 data={ 

581 "grant_type": "urn:ietf:params:oauth:grant-type:device_code", 

582 "client_id": config.keycloak_admin_client, 

583 "device_code": device_auth_response["device_code"], 

584 }, 

585 ) 

586 if res.status_code == 200: 

587 logger.info("Received auth token") 

588 config.authentication_path.write_bytes(res.content) 

589 config.authentication_path.chmod(0o600) 

590 

591 access_token = res.json()["access_token"] 

592 refresh_token = res.json()["refresh_token"] 

593 print( 

594 jwt.decode( 

595 refresh_token, 

596 algorithms=["RS256"], 

597 options={"verify_signature": False}, 

598 ) 

599 ) 

600 logger.info("access_token", access_token=access_token) 

601 print("successfully authenticated") 

602 break 

603 else: 

604 print(res.json()) 

605 sleep(device_auth_response["interval"]) 

606 

607 

608def get_access_token(): 

609 if config.authentication_path.exists(): 

610 auth = json.loads(config.authentication_path.read_text()) 

611 access_token = auth["access_token"] 

612 

613 decoded_token = decode( 

614 access_token, options={"verify_signature": False, "verify_exp": False} 

615 ) 

616 

617 # is exp still 2/3 of the time 

618 if decoded_token["exp"] >= (datetime.now() + timedelta(seconds=10)).timestamp(): 

619 return access_token 

620 else: 

621 refresh_token = auth["refresh_token"] 

622 res = httpx.post( 

623 config.keycloak_token_url, 

624 data={ 

625 "grant_type": "refresh_token", 

626 "refresh_token": refresh_token, 

627 "client_id": config.keycloak_admin_client, 

628 }, 

629 ) 

630 res.raise_for_status() 

631 config.authentication_path.write_bytes(res.content) 

632 return res.json()["access_token"] 

633 

634 else: 

635 print("authentication failed") 

636 

637 

638@app.command() 

639def create_network(name: str, cidr: str): 

640 try: 

641 access_token = get_access_token() 

642 except Exception: 

643 logger.exception("failed to get access token") 

644 exit(1) 

645 

646 res = httpx.post( 

647 f"{config.api_endpoint}/networks", 

648 content=NetworkCreate(name=name, cidr=cidr).model_dump_json(), 

649 headers={"Authorization": f"Bearer {access_token}"}, 

650 ) 

651 

652 if res.status_code >= 400: 

653 print("could not create network:", res.text) 

654 exit(1) 

655 

656 print_json(res.content.decode("utf-8")) 

657 

658 

659@app.command() 

660def list_networks(): 

661 try: 

662 access_token = get_access_token() 

663 except Exception: 

664 logger.exception("failed to get access token") 

665 exit(1) 

666 

667 res = httpx.get( 

668 f"{config.api_endpoint}/networks", 

669 headers={"Authorization": f"Bearer {access_token}"}, 

670 ) 

671 res.raise_for_status() 

672 print(res.json()) 

673 

674 

675@app.command() 

676def create_template( 

677 name: str, network_name: str, is_lighthouse: bool, is_relay: bool, use_relay: bool 

678): 

679 try: 

680 access_token = get_access_token() 

681 except Exception: 

682 logger.exception("failed to get access token") 

683 exit(1) 

684 

685 res = httpx.post( 

686 f"{config.api_endpoint}/templates", 

687 content=TemplateCreate( 

688 name=name, 

689 network_name=network_name, 

690 is_lighthouse=is_lighthouse, 

691 is_relay=is_relay, 

692 use_relay=use_relay, 

693 ).model_dump_json(), 

694 headers={"Authorization": f"Bearer {access_token}"}, 

695 ) 

696 res.raise_for_status() 

697 print_json(res.content.decode("utf-8")) 

698 

699 

700@app.command() 

701def delete_template(name: str): 

702 try: 

703 access_token = get_access_token() 

704 except Exception: 

705 logger.exception("failed to get access token") 

706 exit(1) 

707 

708 res = httpx.delete( 

709 f"{config.api_endpoint}/templates/{name}", 

710 headers={"Authorization": f"Bearer {access_token}"}, 

711 ) 

712 res.raise_for_status() 

713 print(res.json()) 

714 

715 

716@app.command() 

717def delete_host(name: str): 

718 try: 

719 access_token = get_access_token() 

720 except Exception: 

721 logger.exception("failed to get access token") 

722 exit(1) 

723 

724 res = httpx.delete( 

725 f"{config.api_endpoint}/hosts/{name}", 

726 headers={"Authorization": f"Bearer {access_token}"}, 

727 ) 

728 res.raise_for_status() 

729 print(res.json()) 

730 

731 

732@app.command() 

733def nebula_cert(): 

734 binary_name = "nebula-cert" 

735 with resources.path("meshadmin.assets", binary_name) as binary_path: 

736 if not os.access(binary_path, os.X_OK): 

737 raise PermissionError(f"{binary_path} is not executable.") 

738 result = subprocess.run([binary_path, "--help"], text=True, capture_output=True) 

739 print(result.stdout) 

740 

741 

742if __name__ == "__main__": 

743 app()