Coverage for src/meshadmin/cli/commands/template.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-09 15:09 +0200

1import httpx 

2import structlog 

3import typer 

4from rich import print, print_json 

5 

6from meshadmin.cli.utils import get_access_token, get_context_config 

7from meshadmin.common import schemas 

8 

9template_app = typer.Typer(no_args_is_help=True) 

10logger = structlog.get_logger(__name__) 

11 

12 

13@template_app.command(name="create") 

14def create_template( 

15 name: str, network_name: str, is_lighthouse: bool, is_relay: bool, use_relay: bool 

16): 

17 try: 

18 access_token = get_access_token() 

19 except Exception: 

20 logger.exception("failed to get access token") 

21 exit(1) 

22 

23 context = get_context_config() 

24 res = httpx.post( 

25 f"{context['endpoint']}/api/v1/templates", 

26 content=schemas.TemplateCreate( 

27 name=name, 

28 network_name=network_name, 

29 is_lighthouse=is_lighthouse, 

30 is_relay=is_relay, 

31 use_relay=use_relay, 

32 ).model_dump_json(), 

33 headers={"Authorization": f"Bearer {access_token}"}, 

34 ) 

35 res.raise_for_status() 

36 print_json(res.content.decode("utf-8")) 

37 

38 

39@template_app.command() 

40def get_token( 

41 name: str, ttl: int = typer.Option(None, help="Token validity duration in seconds") 

42): 

43 try: 

44 access_token = get_access_token() 

45 except Exception: 

46 logger.exception("failed to get access token") 

47 exit(1) 

48 

49 context = get_context_config() 

50 res = httpx.get( 

51 f"{context['endpoint']}/api/v1/templates/{name}/token", 

52 params={"ttl": ttl} if ttl else None, 

53 headers={"Authorization": f"Bearer {access_token}"}, 

54 ) 

55 res.raise_for_status() 

56 print_json(res.content.decode("utf-8")) 

57 

58 

59@template_app.command(name="delete") 

60def delete_template(name: str): 

61 try: 

62 access_token = get_access_token() 

63 except Exception: 

64 logger.exception("failed to get access token") 

65 exit(1) 

66 

67 context = get_context_config() 

68 res = httpx.delete( 

69 f"{context['endpoint']}/api/v1/templates/{name}", 

70 headers={"Authorization": f"Bearer {access_token}"}, 

71 ) 

72 res.raise_for_status() 

73 print(res.json())