Coverage for src/meshadmin/cli/main.py: 64%
70 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 11:34 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 11:34 +0200
1from importlib.metadata import PackageNotFoundError, version
2from pathlib import Path
3from time import sleep
4from typing import Annotated
6import httpx
7import jwt
8import structlog
9import typer
10import yaml
12from meshadmin.cli.commands import (
13 context_app,
14 host_app,
15 nebula_app,
16 network_app,
17 service_app,
18 system_app,
19 template_app,
20)
21from meshadmin.cli.config import get_config, load_config, set_config
22from meshadmin.common.utils import get_default_config_path
24app = typer.Typer()
25logger = structlog.get_logger(__name__)
27app.add_typer(nebula_app, name="nebula", help="Manage the nebula service")
28app.add_typer(service_app, name="service", help="Manage the meshadmin service")
29app.add_typer(network_app, name="network", help="Manage networks")
30app.add_typer(template_app, name="template", help="Manage templates")
31app.add_typer(host_app, name="host", help="Manage hosts")
32app.add_typer(context_app, name="context", help="Manage network contexts")
33app.add_typer(system_app, name="system", help="System maintenance commands")
36def version_callback(value: bool):
37 if value:
38 try:
39 installed_version = version("meshadmin")
40 typer.echo(f"meshadmin version {installed_version}")
41 except PackageNotFoundError:
42 typer.echo("meshadmin is not installed")
43 raise typer.Exit()
46@app.callback()
47def main(
48 ctx: typer.Context,
49 version: bool = typer.Option(
50 None,
51 "--version",
52 callback=version_callback,
53 is_eager=True,
54 help="Show the version and exit.",
55 ),
56 config_path: Annotated[
57 Path,
58 typer.Option(
59 "--config-path",
60 "-c",
61 envvar="MESHADMIN_CONFIG_PATH",
62 help="Path to the configuration directory",
63 ),
64 ] = get_default_config_path(),
65 context: Annotated[
66 str,
67 typer.Option(
68 "--context",
69 envvar="MESH_CONTEXT",
70 help="Name of the context to use",
71 ),
72 ] = None,
73):
74 config = load_config(config_path)
75 set_config(config)
77 if context:
78 if not config.contexts_file.exists():
79 print("No contexts found")
80 raise typer.Exit(1)
82 with open(config.contexts_file) as f:
83 contexts = yaml.safe_load(f) or {}
85 if context not in contexts:
86 print(f"Context '{context}' not found")
87 raise typer.Exit(1)
89 for ctx_name in contexts:
90 contexts[ctx_name]["active"] = ctx_name == context
92 with open(config.contexts_file, "w") as f:
93 yaml.dump(contexts, f)
96@app.command()
97def login():
98 config = get_config()
99 res = httpx.post(
100 config.keycloak_device_auth_url,
101 data={
102 "client_id": config.keycloak_admin_client,
103 },
104 )
105 res.raise_for_status()
107 device_auth_response = res.json()
108 print(device_auth_response)
109 print(
110 "Please open the verification url",
111 device_auth_response["verification_uri_complete"],
112 )
114 while True:
115 res = httpx.post(
116 config.keycloak_token_url,
117 data={
118 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
119 "client_id": config.keycloak_admin_client,
120 "device_code": device_auth_response["device_code"],
121 },
122 )
123 if res.status_code == 200:
124 logger.info("Received auth token")
125 config.authentication_path.write_bytes(res.content)
126 config.authentication_path.chmod(0o600)
128 access_token = res.json()["access_token"]
129 refresh_token = res.json()["refresh_token"]
130 print(
131 jwt.decode(
132 refresh_token,
133 algorithms=["RS256"],
134 options={"verify_signature": False},
135 )
136 )
137 logger.info("access_token", access_token=access_token)
138 print("successfully authenticated")
139 break
140 else:
141 print(res.json())
142 sleep(device_auth_response["interval"])
145if __name__ == "__main__":
146 app()