Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-uboot/jumpstarter_driver_uboot/client.py: 19%

90 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 20:29 +0000

1import sys 

2from contextlib import contextmanager 

3from functools import cached_property 

4 

5import pexpect 

6from jumpstarter_driver_composite.client import CompositeClient 

7 

8from .common import ESC, DhcpInfo 

9 

10 

class UbootConsoleClient(CompositeClient):
    """
    Client for interacting with a target's U-Boot console

    The ``power`` child is used to power cycle the target and the pexpect
    session of the ``serial`` child is used to talk to the console. Every
    command method requires an active ``reboot_to_console`` context.
    """

    @cached_property
    def prompt(self) -> str:
        """
        U-Boot prompt to expect

        Obtained through the "get_prompt" call on first access and cached
        on the instance afterwards.
        """

        return self.call("get_prompt")

    @contextmanager
    def reboot_to_console(self, *, debug=False) -> None:
        """
        Reboot to U-Boot console

        Power cycle the target and wait for the U-Boot prompt

        Must be used as a context manager, other methods can only be
        used within the reboot_to_console context

        >>> with uboot.reboot_to_console(debug=True): # doctest: +SKIP
        ...     uboot.set_env("foo", "bar")
        ...     uboot.setup_dhcp()
        >>> # uboot.set_env("foo", "baz") # invalid use

        :param debug: when true, everything read from the console is
            mirrored to ``sys.stdout.buffer``
        :raises RuntimeError: if the prompt is not seen after 100 attempts
        """

        self.logger.info("Power cycling target...")
        self.power.cycle()

        self.logger.info("Waiting for U-Boot prompt...")

        with self.serial.pexpect() as p:
            if debug:
                p.logfile_read = sys.stdout.buffer

            # Keep sending ESC until the prompt shows up; each attempt only
            # waits 0.1s so the key is sent repeatedly while the target boots.
            for _ in range(100):  # TODO: configurable retries
                try:
                    p.send(ESC)
                    p.expect_exact(self.prompt, timeout=0.1)
                except pexpect.TIMEOUT:
                    continue

                break
            else:
                raise RuntimeError("Failed to get U-Boot prompt")

            # Expose the session to the command methods only for the
            # lifetime of the context; run_command checks for its presence.
            self.p = p
            try:
                yield
            finally:
                delattr(self, "p")

    def run_command(self, cmd: str, timeout: int = 60, *, _internal_log=True) -> bytes:
        """
        Run raw command in the U-Boot console

        :param cmd: command line to send
        :param timeout: seconds to wait for each prompt
        :param _internal_log: set to false to suppress the info log line
        :return: raw bytes received between sending the command and the
            next prompt
        :raises RuntimeError: if called outside a reboot_to_console context
        """

        if _internal_log:
            self.logger.info(f"Running command: {cmd}")
        if not hasattr(self, "p"):
            raise RuntimeError("Not in a reboot_to_console context")
        # Send an empty line first and wait for a prompt, so that the output
        # captured below starts right at the command being run.
        self.p.sendline("")
        self.p.expect_exact(self.prompt, timeout=timeout)
        self.p.sendline(cmd)
        self.p.expect_exact(self.prompt, timeout=timeout)
        return self.p.before

    def run_command_checked(self, cmd: str, timeout: int = 60, check=True) -> list[str]:
        """
        Run command in the U-Boot console and check the exit code

        The command is suffixed with ``; echo $?`` and the last output line
        is parsed as its return value.

        :param cmd: command line to send
        :param timeout: seconds to wait for each prompt, forwarded to
            run_command
        :param check: when true, a non-zero return value raises
        :return: output lines with the first line and the trailing return
            value line removed
        :raises RuntimeError: if fewer than two lines come back, or if
            ``check`` is true and the return value is non-zero
        :raises ValueError: if the last line is not an integer
        """

        self.logger.info(f"Running command checked: {cmd}")
        # Forward the caller's timeout; it used to be dropped here, so every
        # checked command silently ran with run_command's default.
        output = self.run_command("{}; echo $?".format(cmd), timeout=timeout, _internal_log=False)
        parsed = output.strip().decode().splitlines()

        if len(parsed) < 2:
            raise RuntimeError("Insufficient lines returned from command execution, raw output: {}".format(output))

        try:
            retval = int(parsed[-1])
        except ValueError:
            # Format the offending text into the message instead of passing
            # it as a second exception argument.
            raise ValueError("Failed to parse command return value: {}".format(parsed[-1])) from None

        if check and retval != 0:
            raise RuntimeError("Command failed with return value: {}, output: {}".format(retval, output))

        return parsed[1:-1]

    def setup_dhcp(self, timeout: int = 60) -> DhcpInfo:
        """
        Setup dhcp in U-Boot

        Runs the ``dhcp`` command with ``autoload`` temporarily set to "no"
        and reads the resulting ``ipaddr``, ``gatewayip`` and ``netmask``
        variables.

        :param timeout: seconds allowed for reading ``autoload`` and for the
            ``dhcp`` command
        :return: the obtained network configuration; the netmask falls back
            to "255.255.255.0" when the variable is unset or empty
        :raises ValueError: if ``ipaddr`` or ``gatewayip`` is missing
        """

        self.logger.info("Running DHCP to obtain network configuration...")

        autoload = self.get_env("autoload", timeout=timeout)
        self.set_env("autoload", "no")
        try:
            self.run_command_checked("dhcp", timeout=timeout)
        finally:
            # Put the previous value back even when dhcp fails, so the
            # environment is not left with autoload forced to "no".
            self.set_env("autoload", autoload)

        ipaddr = self.get_env("ipaddr")
        gatewayip = self.get_env("gatewayip")
        netmask = self.get_env("netmask") or "255.255.255.0"

        if not ipaddr or not gatewayip:
            raise ValueError("Could not extract complete network information")

        return DhcpInfo(ip_address=ipaddr, gateway=gatewayip, netmask=netmask)

    def get_env(self, key: str, timeout: int = 5) -> str | None:
        """
        Get U-Boot environment variable value

        :param key: variable name
        :param timeout: seconds to wait for each prompt
        :return: the variable's value, or None when printenv reports it as
            not defined
        :raises RuntimeError: if printenv does not return exactly one line
            or that line has no ``=``
        """

        self.logger.debug(f"Getting U-Boot env var: {key}")
        # NOTE(review): reboot_to_console catches console timeouts as
        # pexpect.TIMEOUT; confirm whether TimeoutError can actually reach
        # the handler below.
        try:
            # check=False: an undefined variable is reported through the
            # output line handled below rather than as a failure.
            output = self.run_command_checked("printenv {}".format(key), timeout, check=False)
            if len(output) != 1:
                raise RuntimeError(
                    "Invalid number of lines returned from printenv command, output: {}".format(output),
                )

            if output[0].startswith("## Error") and output[0].endswith("not defined"):
                return None

            # Split on the first "=" only, the value itself may contain more.
            parsed = output[0].split("=", 1)
            if len(parsed) != 2:
                raise RuntimeError(
                    "Failed to parse output of printenv command, output: {}".format(output[0]),
                )

            return parsed[1]
        except TimeoutError as err:
            raise TimeoutError(f"Timed out getting var {key}") from err

    def set_env(self, key: str, value: str | None, timeout: int = 5) -> None:
        """
        Set U-Boot environment variable value

        :param key: variable name
        :param value: new value, wrapped in single quotes on the command
            line; None runs ``setenv`` with the name only
        :param timeout: seconds to wait for each prompt
        :raises RuntimeError: if the setenv command returns non-zero
        """

        if value is not None:
            cmd = "setenv {} '{}'".format(key, value)
        else:
            cmd = "setenv {}".format(key)

        # NOTE(review): see get_env, the TimeoutError handler may never fire
        # if timeouts surface as pexpect.TIMEOUT.
        try:
            # Honour the caller's timeout rather than a hard-coded 5 seconds.
            self.run_command_checked(cmd, timeout=timeout)
        except TimeoutError as err:
            raise TimeoutError(f"Timed out setting var {key}") from err

    def set_env_dict(self, env: dict[str, str | None]) -> None:
        """
        Set multiple U-Boot environment variable value

        :param env: mapping of variable name to value, applied one by one
            with set_env in the mapping's iteration order
        """

        for key, value in env.items():
            self.set_env(key, value)