Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-uboot/jumpstarter_driver_uboot/client.py: 20%
91 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 15:50 +0200
1import sys
2from collections.abc import Generator
3from contextlib import contextmanager
4from functools import cached_property
6import pexpect
7from jumpstarter_driver_composite.client import CompositeClient
9from .common import ESC, DhcpInfo
class UbootConsoleClient(CompositeClient):
    """
    Client for driving a U-Boot console

    Relies on the ``power`` and ``serial`` children of the composite client.
    Inside ``reboot_to_console`` the active pexpect session is stored on
    ``self.p`` and every other method talks to U-Boot through it.
    """

    @cached_property
    def prompt(self) -> str:
        """
        U-Boot prompt to expect

        Fetched with the ``get_prompt`` call and cached for the lifetime of
        this client instance.
        """

        return self.call("get_prompt")

    @contextmanager
    def reboot_to_console(self, *, debug=False, retries: int = 100) -> Generator[None]:
        """
        Reboot to U-Boot console

        Power cycle the target and wait for the U-Boot prompt

        Must be used as a context manager, other methods can only be
        used within the reboot_to_console context

        ``debug`` mirrors everything read from the serial console to stdout.
        ``retries`` is the number of ESC/expect attempts (0.1 s each) made
        before giving up with ``RuntimeError``.

        >>> with uboot.reboot_to_console(debug=True): # doctest: +SKIP
        ...     uboot.set_env("foo", "bar")
        ...     uboot.setup_dhcp()
        >>> # uboot.set_env("foo", "baz") # invalid use
        """

        self.logger.info("Power cycling target...")
        self.power.cycle()

        self.logger.info("Waiting for U-Boot prompt...")

        with self.serial.pexpect() as p:
            if debug:
                p.logfile_read = sys.stdout.buffer

            # Keep sending ESC until the prompt shows up; each attempt waits
            # at most 0.1 s so the key presses are sent in quick succession.
            for _ in range(retries):
                try:
                    p.send(ESC)
                    p.expect_exact(self.prompt, timeout=0.1)
                except pexpect.TIMEOUT:
                    continue

                break
            else:
                # Loop ran out of attempts without ever seeing the prompt.
                raise RuntimeError("Failed to get U-Boot prompt")

            # Expose the session to the other methods only while the context
            # is active; it is removed again even if the caller's body raises.
            self.p = p
            try:
                yield
            finally:
                del self.p

    def run_command(self, cmd: str, timeout: int = 60, *, _internal_log=True) -> bytes:
        """
        Run raw command in the U-Boot console

        Returns the bytes received between sending ``cmd`` and the next
        prompt. ``timeout`` (seconds) applies to each wait for the prompt.
        Raises ``RuntimeError`` when called outside ``reboot_to_console``.
        """

        if _internal_log:
            self.logger.info(f"Running command: {cmd}")
        if not hasattr(self, "p"):
            raise RuntimeError("Not in a reboot_to_console context")

        # Send an empty line and wait for a prompt first, so that the output
        # captured below starts from a fresh prompt.
        self.p.sendline("")
        self.p.expect_exact(self.prompt, timeout=timeout)

        self.p.sendline(cmd)
        self.p.expect_exact(self.prompt, timeout=timeout)
        return self.p.before

    def run_command_checked(self, cmd: str, timeout: int = 60, check=True) -> list[str]:
        """
        Run command in the U-Boot console and check the exit code

        ``; echo $?`` is appended to ``cmd`` so the last output line carries
        the return value. Returns the output lines with the first line
        (presumably the echoed command) and the return-value line removed.

        Raises ``RuntimeError`` when fewer than two lines come back, or when
        ``check`` is true and the return value is non-zero; raises
        ``ValueError`` when the return value is not an integer.
        """

        self.logger.info(f"Running command checked: {cmd}")
        # Forward the caller's timeout instead of silently falling back to
        # run_command's default.
        output = self.run_command("{}; echo $?".format(cmd), timeout, _internal_log=False)
        parsed = output.strip().decode().splitlines()

        if len(parsed) < 2:
            raise RuntimeError("Insufficient lines returned from command execution, raw output: {}".format(output))

        try:
            retval = int(parsed[-1])
        except ValueError:
            raise ValueError("Failed to parse command return value: {}".format(parsed[-1])) from None

        if check and retval != 0:
            raise RuntimeError("Command failed with return value: {}, output: {}".format(retval, output))

        return parsed[1:-1]

    def setup_dhcp(self, timeout: int = 60) -> DhcpInfo:
        """
        Setup dhcp in U-Boot

        Runs ``dhcp`` with ``autoload`` temporarily set to ``no`` and returns
        the resulting address, gateway and netmask (netmask falls back to
        ``255.255.255.0`` when unset). Raises ``ValueError`` when ``ipaddr``
        or ``gatewayip`` is missing afterwards.
        """

        self.logger.info("Running DHCP to obtain network configuration...")

        autoload = self.get_env("autoload", timeout=timeout)
        self.set_env("autoload", "no")
        try:
            self.run_command_checked("dhcp", timeout=timeout)
        finally:
            # Put the previous value back (or unset it if it was undefined)
            # even when the dhcp command fails.
            self.set_env("autoload", autoload)

        ipaddr = self.get_env("ipaddr")
        gatewayip = self.get_env("gatewayip")
        netmask = self.get_env("netmask") or "255.255.255.0"

        if not ipaddr or not gatewayip:
            raise ValueError("Could not extract complete network information")

        return DhcpInfo(ip_address=ipaddr, gateway=gatewayip, netmask=netmask)

    def get_env(self, key: str, timeout: int = 5) -> str | None:
        """
        Get U-Boot environment variable value

        Returns the text after the first ``=`` of the ``printenv`` output, or
        ``None`` when U-Boot reports the variable as not defined. Raises
        ``RuntimeError`` when the output is not exactly one ``key=value`` line.
        """

        self.logger.debug(f"Getting U-Boot env var: {key}")
        # NOTE(review): reboot_to_console catches pexpect.TIMEOUT for the same
        # session, so the builtin TimeoutError handled here may never be what
        # the console raises - confirm before relying on this message.
        try:
            # check=False: an undefined variable makes printenv fail, which is
            # reported as None below rather than as an error.
            output = self.run_command_checked("printenv {}".format(key), timeout, check=False)
        except TimeoutError as err:
            raise TimeoutError(f"Timed out getting var {key}") from err

        if len(output) != 1:
            raise RuntimeError(
                "Invalid number of lines returned from printenv command, output: {}".format(output),
            )

        line = output[0]
        if line.startswith("## Error") and line.endswith("not defined"):
            return None

        parsed = line.split("=", 1)
        if len(parsed) != 2:
            raise RuntimeError(
                "Failed to parse output of printenv command, output: {}".format(line),
            )

        return parsed[1]

    def set_env(self, key: str, value: str | None, timeout: int = 5) -> None:
        """
        Set U-Boot environment variable value

        With ``value=None`` the command is ``setenv <key>`` with no value;
        otherwise the value is wrapped in single quotes. ``timeout`` is in
        seconds.
        """

        # NOTE(review): the value is not escaped, so a value that itself
        # contains a single quote will break the quoting.
        if value is not None:
            cmd = "setenv {} '{}'".format(key, value)
        else:
            cmd = "setenv {}".format(key)

        try:
            # Honor the caller's timeout rather than a hard-coded 5 seconds.
            self.run_command_checked(cmd, timeout=timeout)
        except TimeoutError as err:
            raise TimeoutError(f"Timed out setting var {key}") from err

    def set_env_dict(self, env: dict[str, str | None]) -> None:
        """
        Set multiple U-Boot environment variable value

        Each entry is applied with ``set_env`` in the dict's iteration order;
        a ``None`` value is passed through unchanged.
        """

        for key, value in env.items():
            self.set_env(key, value)