Coverage for src/meshadmin/cli/commands/host.py: 94%
117 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
1import asyncio
2import os
3import platform
4from pathlib import Path
5from typing import Annotated
6from uuid import uuid4
8import httpx
9import structlog
10import typer
11from jwcrypto.jwk import JWK
13from meshadmin.cli.commands.nebula import download
14from meshadmin.cli.config import get_config
15from meshadmin.cli.utils import (
16 get_access_token,
17 get_config_from_mesh,
18 get_context_config,
19)
20from meshadmin.common import schemas
21from meshadmin.common.utils import create_keys, get_default_config_path, get_public_ip
23host_app = typer.Typer(no_args_is_help=True)
24logger = structlog.get_logger(__name__)
26host_config_app = typer.Typer(no_args_is_help=True)
27host_app.add_typer(host_config_app, name="config", help="Manage host configurations")
30@host_app.command(name="enroll")
31def host_enroll(
32 enrollment_key: Annotated[
33 str,
34 typer.Argument(envvar="MESH_ENROLLMENT_KEY"),
35 ],
36 preferred_hostname: Annotated[
37 str,
38 typer.Option(envvar="MESH_HOSTNAME"),
39 ] = None,
40 public_ip: Annotated[
41 str,
42 typer.Option(envvar="MESH_PUBLIC_IP"),
43 ] = None,
44):
45 config = get_config()
46 context = get_context_config()
47 network_dir = context["network_dir"]
49 download()
50 logger.info("enrolling")
52 network_dir.mkdir(parents=True, exist_ok=True)
54 # Use shared auth key for all contexts
55 private_auth_key_path = config.contexts_file.parent / config.private_key
56 if not private_auth_key_path.exists():
57 logger.info("creating auth key")
58 create_auth_key(private_auth_key_path.parent)
60 jwk = JWK.from_json(private_auth_key_path.read_text())
61 public_auth_key = jwk.export_public()
62 logger.info("public key for registration", public_key=public_auth_key)
64 private_net_key_path = network_dir / config.private_net_key_file
65 public_net_key_path = network_dir / config.public_net_key_file
67 if public_ip is None:
68 public_ip = get_public_ip()
69 logger.info(
70 "public ip not set, using ip reported by https://checkip.amazonaws.com/",
71 public_ip=public_ip,
72 )
74 if preferred_hostname is None:
75 preferred_hostname = platform.node()
76 logger.info(
77 "preferred hostname not set, using system hostname",
78 hostname=preferred_hostname,
79 )
81 if private_net_key_path.exists() and public_net_key_path.exists():
82 public_nebula_key = public_net_key_path.read_text()
83 logger.info(
84 "private and public nebula key already exists",
85 public_key=public_nebula_key,
86 )
87 else:
88 logger.info("creating private and public nebula key")
89 private, public_nebula_key = create_keys()
90 private_net_key_path.write_text(private)
91 private_auth_key_path.chmod(0o600)
92 public_net_key_path.write_text(public_nebula_key)
93 public_net_key_path.chmod(0o600)
94 logger.info(
95 "private and public nebula key created", public_nebula_key=public_nebula_key
96 )
98 enrollment = schemas.ClientEnrollment(
99 enrollment_key=enrollment_key,
100 public_net_key=public_nebula_key,
101 public_auth_key=public_auth_key,
102 preferred_hostname=preferred_hostname,
103 public_ip=public_ip,
104 interface=context["interface"],
105 )
107 res = httpx.post(
108 f"{context['endpoint']}/api/v1/enroll",
109 content=enrollment.model_dump_json(),
110 headers={"Content-Type": "application/json"},
111 )
112 res.raise_for_status()
114 get_host_config()
115 logger.info("enrollment response", enrollment=res.content)
116 logger.info("enrollment finished")
119@host_app.command()
120def create_auth_key(
121 mesh_config_path: Annotated[
122 Path,
123 typer.Argument(envvar="MESH_CONFIG_PATH"),
124 ] = get_default_config_path(),
125):
126 config = get_config()
127 jwk = JWK.generate(kty="RSA", kid=str(uuid4()), size=2048)
128 auth_key = mesh_config_path / config.private_key
129 auth_key.write_text(jwk.export_private())
130 auth_key.chmod(0o600)
133@host_app.command()
134def show_auth_public_key(
135 mesh_config_path: Annotated[
136 Path,
137 typer.Argument(envvar="MESH_CONFIG_PATH"),
138 ] = get_default_config_path(),
139):
140 config = get_config()
141 jwk = JWK.from_json((mesh_config_path / config.private_key).read_text())
142 print(jwk.export_public())
145@host_config_app.command()
146def get_host_config():
147 config = get_config()
148 private_net_key, public_net_key = create_keys()
149 context = get_context_config()
150 private_auth_key = JWK.from_json(
151 (config.contexts_file.parent / config.private_key).read_text()
152 )
154 loop = asyncio.get_event_loop()
156 result, _, _ = loop.run_until_complete(
157 get_config_from_mesh(context["endpoint"], private_auth_key)
158 )
159 (context["network_dir"] / config.config_path).write_text(result)
162@host_app.command(name="delete")
163def delete_host(name: str):
164 try:
165 access_token = get_access_token()
166 except Exception:
167 logger.exception("failed to get access token")
168 exit(1)
170 context = get_context_config()
171 res = httpx.delete(
172 f"{context['endpoint']}/api/v1/hosts/{name}",
173 headers={"Authorization": f"Bearer {access_token}"},
174 )
175 res.raise_for_status()
176 print(res.json())
179@host_config_app.command(name="info")
180def show_config_info():
181 config = get_config()
182 print("\nConfiguration Paths:")
183 print(f"Contexts file: {config.contexts_file}")
184 print(f"Networks directory: {config.networks_dir}")
185 try:
186 context = get_context_config()
187 print("\nCurrent Context:")
188 print(f"Name: {context['name']}")
189 print(f"Endpoint: {context['endpoint']}")
190 print(f"Interface: {context['interface']}")
191 print(f"Network directory: {context['network_dir']}")
193 config_file = context["network_dir"] / config.config_path
194 env_file = context["network_dir"] / "env"
195 private_key = context["network_dir"] / config.private_net_key_file
197 print("\nContext Files:")
198 print(
199 f"Config file: {config_file} {'(exists)' if config_file.exists() else '(not found)'}"
200 )
201 print(
202 f"Environment file: {env_file} {'(exists)' if env_file.exists() else '(not found)'}"
203 )
204 print(
205 f"Private key: {private_key} {'(exists)' if private_key.exists() else '(not found)'}"
206 )
207 if platform.system() == "Darwin":
208 service_file = Path(
209 os.path.expanduser(
210 f"~/Library/LaunchAgents/com.meshadmin.{context['name']}.plist"
211 )
212 )
213 print(
214 f"Service file: {service_file} {'(exists)' if service_file.exists() else '(not found)'}"
215 )
216 else:
217 service_file = Path(
218 f"/usr/lib/systemd/system/meshadmin-{context['name']}.service"
219 )
220 print(
221 f"Service file: {service_file} {'(exists)' if service_file.exists() else '(not found)'}"
222 )
223 except typer.Exit:
224 print("\nNo active context found")