Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-uboot/jumpstarter_driver_uboot/client.py: 20%

91 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1import sys 

2from collections.abc import Generator 

3from contextlib import contextmanager 

4from functools import cached_property 

5 

6import pexpect 

7from jumpstarter_driver_composite.client import CompositeClient 

8 

9from .common import ESC, DhcpInfo 

10 

11 

class UbootConsoleClient(CompositeClient):
    """
    Client for driving a U-Boot console

    Uses the ``power`` and ``serial`` children of this client to power cycle
    the target and talk to the U-Boot prompt. Every command method requires an
    active ``reboot_to_console`` context.
    """

    @cached_property
    def prompt(self) -> str:
        """
        U-Boot prompt to expect

        Fetched once through the ``get_prompt`` driver call and then cached
        for the lifetime of this client.
        """

        return self.call("get_prompt")

    @contextmanager
    def reboot_to_console(self, *, debug=False) -> Generator[None]:
        """
        Reboot to U-Boot console

        Power cycle the target and wait for the U-Boot prompt

        Must be used as a context manager, other methods can only be
        used within the reboot_to_console context

        When ``debug`` is true, everything read from the serial console is
        mirrored to stdout.

        Raises RuntimeError if the prompt does not show up after all retries.

        >>> with uboot.reboot_to_console(debug=True): # doctest: +SKIP
        ...     uboot.set_env("foo", "bar")
        ...     uboot.setup_dhcp()
        >>> # uboot.set_env("foo", "baz") # invalid use
        """

        self.logger.info("Power cycling target...")
        self.power.cycle()

        self.logger.info("Waiting for U-Boot prompt...")

        with self.serial.pexpect() as p:
            if debug:
                p.logfile_read = sys.stdout.buffer

            # Keep sending ESC until the prompt is seen; each attempt waits
            # at most 0.1s for it.
            for _ in range(100):  # TODO: configurable retries
                try:
                    p.send(ESC)
                    p.expect_exact(self.prompt, timeout=0.1)
                except pexpect.TIMEOUT:
                    continue

                break
            else:
                raise RuntimeError("Failed to get U-Boot prompt")

            # The presence of the ``p`` attribute is what marks the context
            # as active for run_command().
            self.p = p
            try:
                yield
            finally:
                delattr(self, "p")

    def run_command(self, cmd: str, timeout: int = 60, *, _internal_log=True) -> bytes:
        """
        Run raw command in the U-Boot console

        Returns the bytes received between sending ``cmd`` and the next
        prompt. ``timeout`` applies to each wait for the prompt.

        Raises RuntimeError when called outside a reboot_to_console context.
        """

        if _internal_log:
            self.logger.info(f"Running command: {cmd}")
        if not hasattr(self, "p"):
            raise RuntimeError("Not in a reboot_to_console context")
        # Send an empty line first and wait for a prompt so that the output
        # captured below starts right after a known prompt.
        self.p.sendline("")
        self.p.expect_exact(self.prompt, timeout=timeout)
        self.p.sendline(cmd)
        self.p.expect_exact(self.prompt, timeout=timeout)
        return self.p.before

    def run_command_checked(self, cmd: str, timeout: int = 60, check=True) -> list[str]:
        """
        Run command in the U-Boot console and check the exit code

        ``; echo $?`` is appended to ``cmd`` and the last output line is
        parsed as the return value. Returns the output lines with the first
        line (presumably the echoed command) and the return value line
        removed.

        Raises RuntimeError if fewer than two lines come back, or if
        ``check`` is true and the return value is non-zero. Raises ValueError
        if the return value line is not an integer.
        """

        self.logger.info(f"Running command checked: {cmd}")
        # Forward the caller's timeout; it was previously dropped, so every
        # checked command silently used run_command's default.
        output = self.run_command("{}; echo $?".format(cmd), timeout=timeout, _internal_log=False)
        parsed = output.strip().decode().splitlines()

        if len(parsed) < 2:
            raise RuntimeError("Insufficient lines returned from command execution, raw output: {}".format(output))

        try:
            retval = int(parsed[-1])
        except ValueError:
            raise ValueError("Failed to parse command return value: {}".format(parsed[-1])) from None

        if check and retval != 0:
            raise RuntimeError("Command failed with return value: {}, output: {}".format(retval, output))

        return parsed[1:-1]

    def setup_dhcp(self, timeout: int = 60) -> DhcpInfo:
        """
        Setup dhcp in U-Boot

        Runs the ``dhcp`` command with ``autoload`` temporarily set to ``no``
        and returns the resulting ``ipaddr``, ``gatewayip`` and ``netmask``
        values. A missing netmask falls back to ``255.255.255.0``.

        Raises ValueError if ``ipaddr`` or ``gatewayip`` is not set afterwards.
        """

        self.logger.info("Running DHCP to obtain network configuration...")

        autoload = self.get_env("autoload", timeout=timeout)
        self.set_env("autoload", "no")
        try:
            self.run_command_checked("dhcp", timeout=timeout)
        finally:
            # Restore the previous value (or unset it again when it was not
            # defined) even if the dhcp command failed.
            self.set_env("autoload", autoload)

        ipaddr = self.get_env("ipaddr")
        gatewayip = self.get_env("gatewayip")
        netmask = self.get_env("netmask") or "255.255.255.0"

        if not ipaddr or not gatewayip:
            raise ValueError("Could not extract complete network information")

        return DhcpInfo(ip_address=ipaddr, gateway=gatewayip, netmask=netmask)

    def get_env(self, key: str, timeout: int = 5) -> str | None:
        """
        Get U-Boot environment variable value

        Returns the text after the first ``=`` of the ``printenv`` output, or
        None when U-Boot reports the variable as not defined.

        Raises RuntimeError if the output is not exactly one line or has no
        ``=`` in it.
        """

        self.logger.debug(f"Getting U-Boot env var: {key}")
        try:
            output = self.run_command_checked("printenv {}".format(key), timeout, check=False)
        except TimeoutError as err:
            # NOTE(review): pexpect reports timeouts as pexpect.TIMEOUT, which
            # is not a TimeoutError subclass - confirm whether the serial
            # wrapper translates it, otherwise this handler never fires.
            raise TimeoutError(f"Timed out getting var {key}") from err

        if len(output) != 1:
            raise RuntimeError(
                "Invalid number of lines returned from printenv command, output: {}".format(output),
            )

        line = output[0]
        if line.startswith("## Error") and line.endswith("not defined"):
            return None

        parsed = line.split("=", 1)
        if len(parsed) != 2:
            raise RuntimeError(
                "Failed to parse output of printenv command, output: {}".format(line),
            )

        return parsed[1]

    def set_env(self, key: str, value: str | None, timeout: int = 5) -> None:
        """
        Set U-Boot environment variable value

        A ``value`` of None runs ``setenv`` without a value for ``key``.
        """

        if value is not None:
            # NOTE(review): the value is wrapped in single quotes without
            # escaping, so a value containing "'" will break the command.
            cmd = "setenv {} '{}'".format(key, value)
        else:
            cmd = "setenv {}".format(key)

        try:
            # Honour the caller's timeout instead of a hard-coded 5.
            self.run_command_checked(cmd, timeout=timeout)
        except TimeoutError as err:
            # NOTE(review): see get_env - pexpect.TIMEOUT is not a
            # TimeoutError, confirm this handler is reachable.
            raise TimeoutError(f"Timed out setting var {key}") from err

    def set_env_dict(self, env: dict[str, str | None]) -> None:
        """
        Set multiple U-Boot environment variable value

        Calls set_env once per item, in the dict's iteration order.
        """

        for key, value in env.items():
            self.set_env(key, value)