Coverage for src/meshadmin/cli/main.py: 64%
69 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:09 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:09 +0200
1from importlib.metadata import PackageNotFoundError, version
2from pathlib import Path
3from time import sleep
4from typing import Annotated
6import httpx
7import jwt
8import structlog
9import typer
10import yaml
12from meshadmin.cli.commands import (
13 context_app,
14 host_app,
15 nebula_app,
16 network_app,
17 service_app,
18 template_app,
19)
20from meshadmin.cli.config import get_config, load_config, set_config
21from meshadmin.common.utils import get_default_config_path
23app = typer.Typer()
24logger = structlog.get_logger(__name__)
26app.add_typer(nebula_app, name="nebula", help="Manage the nebula service")
27app.add_typer(service_app, name="service", help="Manage the meshadmin service")
28app.add_typer(network_app, name="network", help="Manage networks")
29app.add_typer(template_app, name="template", help="Manage templates")
30app.add_typer(host_app, name="host", help="Manage hosts")
31app.add_typer(context_app, name="context", help="Manage network contexts")
34def version_callback(value: bool):
35 if value:
36 try:
37 installed_version = version("meshadmin")
38 typer.echo(f"meshadmin version {installed_version}")
39 except PackageNotFoundError:
40 typer.echo("meshadmin is not installed")
41 raise typer.Exit()
44@app.callback()
45def main(
46 ctx: typer.Context,
47 version: bool = typer.Option(
48 None,
49 "--version",
50 callback=version_callback,
51 is_eager=True,
52 help="Show the version and exit.",
53 ),
54 config_path: Annotated[
55 Path,
56 typer.Option(
57 "--config-path",
58 "-c",
59 envvar="MESHADMIN_CONFIG_PATH",
60 help="Path to the configuration directory",
61 ),
62 ] = get_default_config_path(),
63 context: Annotated[
64 str,
65 typer.Option(
66 "--context",
67 envvar="MESH_CONTEXT",
68 help="Name of the context to use",
69 ),
70 ] = None,
71):
72 config = load_config(config_path)
73 set_config(config)
75 if context:
76 if not config.contexts_file.exists():
77 print("No contexts found")
78 raise typer.Exit(1)
80 with open(config.contexts_file) as f:
81 contexts = yaml.safe_load(f) or {}
83 if context not in contexts:
84 print(f"Context '{context}' not found")
85 raise typer.Exit(1)
87 for ctx_name in contexts:
88 contexts[ctx_name]["active"] = ctx_name == context
90 with open(config.contexts_file, "w") as f:
91 yaml.dump(contexts, f)
94@app.command()
95def login():
96 config = get_config()
97 res = httpx.post(
98 config.keycloak_device_auth_url,
99 data={
100 "client_id": config.keycloak_admin_client,
101 },
102 )
103 res.raise_for_status()
105 device_auth_response = res.json()
106 print(device_auth_response)
107 print(
108 "Please open the verification url",
109 device_auth_response["verification_uri_complete"],
110 )
112 while True:
113 res = httpx.post(
114 config.keycloak_token_url,
115 data={
116 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
117 "client_id": config.keycloak_admin_client,
118 "device_code": device_auth_response["device_code"],
119 },
120 )
121 if res.status_code == 200:
122 logger.info("Received auth token")
123 config.authentication_path.write_bytes(res.content)
124 config.authentication_path.chmod(0o600)
126 access_token = res.json()["access_token"]
127 refresh_token = res.json()["refresh_token"]
128 print(
129 jwt.decode(
130 refresh_token,
131 algorithms=["RS256"],
132 options={"verify_signature": False},
133 )
134 )
135 logger.info("access_token", access_token=access_token)
136 print("successfully authenticated")
137 break
138 else:
139 print(res.json())
140 sleep(device_auth_response["interval"])
143if __name__ == "__main__":
144 app()