Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-uboot/jumpstarter_driver_uboot/client.py: 19%
90 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1import sys
2from contextlib import contextmanager
3from functools import cached_property
5import pexpect
6from jumpstarter_driver_composite.client import CompositeClient
8from .common import ESC, DhcpInfo
11class UbootConsoleClient(CompositeClient):
12 @cached_property
13 def prompt(self) -> str:
14 """
15 U-Boot prompt to expect
16 """
18 return self.call("get_prompt")
20 @contextmanager
21 def reboot_to_console(self, *, debug=False) -> None:
22 """
23 Reboot to U-Boot console
25 Power cycle the target and wait for the U-Boot prompt
27 Must be used as a context manager, other methods can only be
28 used within the reboot_to_console context
30 >>> with uboot.reboot_to_console(debug=True): # doctest: +SKIP
31 ... uboot.set_env("foo", "bar")
32 ... uboot.setup_dhcp()
33 >>> # uboot.set_env("foo", "baz") # invalid use
34 """
36 self.logger.info("Power cycling target...")
37 self.power.cycle()
39 self.logger.info("Waiting for U-Boot prompt...")
41 with self.serial.pexpect() as p:
42 if debug:
43 p.logfile_read = sys.stdout.buffer
45 for _ in range(100): # TODO: configurable retries
46 try:
47 p.send(ESC)
48 p.expect_exact(self.prompt, timeout=0.1)
49 except pexpect.TIMEOUT:
50 continue
52 break
53 else:
54 raise RuntimeError("Failed to get U-Boot prompt")
56 self.p = p
57 try:
58 yield
59 finally:
60 delattr(self, "p")
62 def run_command(self, cmd: str, timeout: int = 60, *, _internal_log=True) -> bytes:
63 """
64 Run raw command in the U-Boot console
65 """
67 if _internal_log:
68 self.logger.info(f"Running command: {cmd}")
69 if not hasattr(self, "p"):
70 raise RuntimeError("Not in a reboot_to_console context")
71 self.p.sendline("")
72 self.p.expect_exact(self.prompt, timeout=timeout)
73 self.p.sendline(cmd)
74 self.p.expect_exact(self.prompt, timeout=timeout)
75 return self.p.before
77 def run_command_checked(self, cmd: str, timeout: int = 60, check=True) -> list[str]:
78 """
79 Run command in the U-Boot console and check the exit code
80 """
82 self.logger.info(f"Running command checked: {cmd}")
83 output = self.run_command("{}; echo $?".format(cmd), _internal_log=False)
84 parsed = output.strip().decode().splitlines()
86 if len(parsed) < 2:
87 raise RuntimeError("Insufficient lines returned from command execution, raw output: {}".format(output))
89 try:
90 retval = int(parsed[-1])
91 except ValueError:
92 raise ValueError("Failed to parse command return value: {}", parsed[-1]) from None
94 if check and retval != 0:
95 raise RuntimeError("Command failed with return value: {}, output: {}".format(retval, output))
97 return parsed[1:-1]
99 def setup_dhcp(self, timeout: int = 60) -> DhcpInfo:
100 """
101 Setup dhcp in U-Boot
102 """
104 self.logger.info("Running DHCP to obtain network configuration...")
106 autoload = self.get_env("autoload", timeout=timeout)
107 self.set_env("autoload", "no")
108 self.run_command_checked("dhcp", timeout=timeout)
109 self.set_env("autoload", autoload)
111 ipaddr = self.get_env("ipaddr")
112 gatewayip = self.get_env("gatewayip")
113 netmask = self.get_env("netmask") or "255.255.255.0"
115 if not ipaddr or not gatewayip:
116 raise ValueError("Could not extract complete network information")
118 return DhcpInfo(ip_address=ipaddr, gateway=gatewayip, netmask=netmask)
120 def get_env(self, key: str, timeout: int = 5) -> str | None:
121 """
122 Get U-Boot environment variable value
123 """
125 self.logger.debug(f"Getting U-Boot env var: {key}")
126 try:
127 output = self.run_command_checked("printenv {}".format(key), timeout, check=False)
128 if len(output) != 1:
129 raise RuntimeError(
130 "Invalid number of lines returned from printenv command, output: {}".format(output),
131 )
133 if output[0].startswith("## Error") and output[0].endswith("not defined"):
134 return None
136 parsed = output[0].split("=", 1)
137 if len(parsed) != 2:
138 raise RuntimeError(
139 "Failed to parse output of printenv command, output: {}".format(output[0]),
140 )
142 return parsed[1]
143 except TimeoutError as err:
144 raise TimeoutError(f"Timed out getting var {key}") from err
146 def set_env(self, key: str, value: str | None, timeout: int = 5) -> None:
147 """
148 Set U-Boot environment variable value
149 """
151 if value is not None:
152 cmd = "setenv {} '{}'".format(key, value)
153 else:
154 cmd = "setenv {}".format(key)
156 try:
157 self.run_command_checked(cmd, timeout=5)
158 except TimeoutError as err:
159 raise TimeoutError(f"Timed out setting var {key}") from err
161 def set_env_dict(self, env: dict[str, str | None]) -> None:
162 """
163 Set multiple U-Boot environment variable value
164 """
166 for key, value in env.items():
167 self.set_env(key, value)