Coverage for jumpstarter_driver_flashers/client.py: 19%
312 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-30 18:45 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-30 18:45 +0200
1import hashlib
2import json
3import os
4import re
5import sys
6import threading
7import time
8from contextlib import contextmanager
9from dataclasses import dataclass
10from pathlib import Path, PosixPath
11from queue import Queue
12from urllib.parse import urlparse
14import click
15from jumpstarter_driver_composite.client import CompositeClient
16from jumpstarter_driver_opendal.client import FlasherClient, OpendalClient, operator_for_path
17from jumpstarter_driver_opendal.common import PathBuf
18from jumpstarter_driver_pyserial.client import Console
19from opendal import Metadata, Operator
21from jumpstarter_driver_flashers.bundle import FlasherBundleManifestV1Alpha1
23from jumpstarter.common.exceptions import ArgumentError
# Reusable click flag: the CLI commands decorated with it receive a ``console_debug`` boolean.
debug_console_option = click.option("--console-debug", is_flag=True, help="Enable console debug mode")

# Timeout handed to console.expect() for ordinary prompt waits
# (presumably seconds, as with pexpect -- TODO confirm).
EXPECT_TIMEOUT_DEFAULT = 60
# Much longer timeout, used when waiting for the prompt after `sync` flushes buffers.
EXPECT_TIMEOUT_SYNC = 1200
31@dataclass(kw_only=True)
32class BaseFlasherClient(FlasherClient, CompositeClient):
33 """
34 Client interface for software driven flashing
36 This client provides methods to flash and dump images to a device under test (DUT)
37 """
def __post_init__(self):
    """Run the parent initialisation, then reset this client's private state."""
    super().__post_init__()
    # Console debugging starts switched off; no manifest has been loaded yet.
    self._console_debug = False
    self._manifest = None
def set_console_debug(self, debug: bool):
    """Store the console debug flag that later console sessions read.

    Args:
        debug: True to enable console debug mode, False to disable it.
    """
    # TODO: also set console debug on uboot client
    self._console_debug = debug
@contextmanager
def busybox_shell(self):
    """Context manager that boots into the busybox shell and yields the serial client.

    The exporter is first asked to set up the flasher bundle, then the
    services and the busybox console are brought up for the duration of the
    ``with`` body.
    """
    # The exporter must download the bundle and put its files in place first.
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")
    with self._services_up(), self._busybox() as shell_console:
        shell_console.send("\n\n")
        yield self.serial
@contextmanager
def bootloader_shell(self):
    """Context manager that reboots to the uboot/bootloader console and yields the serial client.

    The services stay up for the whole ``with`` body; the uboot console
    context itself is entered and left before the serial client is handed out.
    """
    # The exporter must download the bundle and put its files in place first.
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")
    with self._services_up():
        uboot_console = self.uboot.reboot_to_console(debug=self._console_debug)
        # Nothing to do inside: the context is only entered and exited
        # (presumably leaving the DUT at the bootloader prompt -- TODO confirm).
        with uboot_console:
            pass
        yield self.serial
def flash(
    self,
    path: PathBuf,
    *,
    partition: str | None = None,
    operator: Operator | None = None,
    os_image_checksum: str | None = None,
    force_exporter_http: bool = False,
    force_flash_bundle: str | None = None,
):
    """Flash image to DUT.

    Args:
        path: Image location. A plain ``http://`` URL is handed to the DUT
            directly; anything else is served through the exporter HTTP service.
        partition: Target name from the manifest. Falls back to the exporter's
            default target, then to the manifest's default target.
        operator: Operator used to read ``path``; resolved with
            ``operator_for_path`` when omitted.
        os_image_checksum: Known hash of the image, forwarded to the
            background storage transfer.
        force_exporter_http: Route ``http://`` images through the exporter too.
        force_flash_bundle: Forwarded to the exporter's ``setup_flasher_bundle`` call.

    Raises:
        ArgumentError: If no partition or default target can be determined.
        Exception: Any error raised by the background storage transfer.
    """
    skip_exporter_http = False
    image_url = ""
    operator_scheme = None
    # PathBuf may be a path-like object without startswith(); compare on the string form.
    if str(path).startswith("http://") and not force_exporter_http:
        # busybox can handle the http from a remote directly, unless target is isolated
        image_url = path
        skip_exporter_http = True
    else:
        if operator is None:
            path, operator, operator_scheme = operator_for_path(path)
        # A caller-supplied operator can come with a plain string path, which has no
        # ``.name`` attribute; Path() accepts both strings and path objects.
        image_url = self.http.get_url() + "/" + Path(path).name

    # start counting time for the flash operation
    start_time = time.time()

    if not skip_exporter_http:
        # Create a queue to handle exceptions from the thread
        error_queue = Queue()

        # Start the storage write operation in the background
        storage_thread = threading.Thread(
            target=self._transfer_bg_thread,
            args=(path, operator, operator_scheme, os_image_checksum, self.http.storage, error_queue),
            name="storage_transfer",
        )
        storage_thread.start()

    # Make the exporter download the bundle contents and set files in the right places
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle", force_flash_bundle)

    # Early exit if there was an error in the background thread
    if not skip_exporter_http and not error_queue.empty():
        raise error_queue.get()

    with self._services_up():
        with self._busybox() as console:
            manifest = self.manifest
            target = partition or self.call("get_default_target") or manifest.spec.default_target
            if not target:
                raise ArgumentError("No partition or default target specified")

            target_device = self._get_target_device(target, manifest, console)

            self.logger.info(f"Using target block device: {target_device}")
            console.sendline(f"export dhcp_addr={self._dhcp_details.ip_address}")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
            console.sendline(f"export gw_addr={self._dhcp_details.gateway}")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            # Preflash commands are executed before the flash operation
            # generally used to clean up boot entries in existing devices
            for preflash_command in manifest.spec.preflash_commands:
                self.logger.info(f"Running preflash command: {preflash_command}")
                console.sendline(preflash_command)
                console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            # make sure that the device is connected to the network and has an IP address
            console.sendline("udhcpc")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            if not skip_exporter_http:
                # Wait for the storage write operation to complete before proceeding
                self.logger.info("Waiting until the http image preparation in storage is completed")
                storage_thread.join()

                # Check if there were any exceptions in the background thread
                if not error_queue.empty():
                    raise error_queue.get()

            self._flash_with_progress(console, manifest, path, image_url, target_device)

            total_time = time.time() - start_time
            # total time in minutes:seconds
            minutes, seconds = divmod(total_time, 60)
            self.logger.info(f"Flashing completed in {int(minutes)}m {int(seconds):02d}s")
            console.sendline("reboot")
            # Short pause after sending the reboot command before cutting power.
            time.sleep(2)
            self.logger.info("Powering off target")
            self.power.off()
def _flash_with_progress(self, console, manifest, path, image_url, target_path):
    """Flash image to target device with progress monitoring.

    Starts a background ``wget | [decompress |] dd`` pipeline on the device,
    polls dd's write position through ``/proc/<pid>/fdinfo/1`` about once per
    second, and finally runs ``sync``.

    Args:
        console: Console object for device interaction
        manifest: Flasher manifest containing target definitions
        path: Path to the source image
        image_url: URL to download the image from
        target_path: Target device path to flash to
    """
    prompt = manifest.spec.login.prompt

    # Flash the image
    decompress_cmd = _get_decompression_command(path)
    flash_cmd = (
        f'( wget -q -O - "{image_url}" | '
        f"{decompress_cmd} "
        f"dd of={target_path} bs=64k iflag=fullblock oflag=direct) &"
    )
    console.sendline(flash_cmd)
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 2)

    console.sendline("pidof dd")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
    # Take the first line that starts with a number instead of blindly indexing
    # line [1]: when dd has already exited there is no such line, and when
    # several PIDs are printed only the first one is usable in a /proc path.
    pid_output = console.before.decode(errors="ignore")
    pid_match = re.search(r"^\s*(\d+)", pid_output, re.MULTILINE)
    dd_pid = pid_match.group(1) if pid_match else None
    if dd_pid is None:
        self.logger.warning("No running dd process found, skipping progress monitoring")

    # Initialize progress tracking variables
    last_pos = 0
    last_time = time.time()

    while dd_pid is not None:
        console.sendline(f"cat /proc/{dd_pid}/fdinfo/1")
        console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
        data = console.before.decode(errors="ignore")
        # Once the dd process is gone its /proc entry disappears: flashing is over.
        if "No such file or directory" in data:
            break
        match = re.search(r"pos:\s+(\d+)", data)
        if match:
            current_bytes = int(match.group(1))
            current_time = time.time()
            elapsed = current_time - last_time

            if elapsed >= 1.0:  # Update speed every second
                bytes_diff = current_bytes - last_pos
                speed_mb = (bytes_diff / (1024 * 1024)) / elapsed
                total_mb = current_bytes / (1024 * 1024)
                self.logger.info(f"Flash progress: {total_mb:.2f} MB, Speed: {speed_mb:.2f} MB/s")

                last_pos = current_bytes
                last_time = current_time
        time.sleep(1)

    self.logger.info("Flushing buffers")
    console.sendline("sync")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_SYNC)
def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha1, console) -> str:
    """Resolve a manifest target name to the device path to flash.

    Args:
        target: Target name from manifest
        manifest: Flasher manifest containing target definitions
        console: Console object for device interaction

    Returns:
        The path stored in the manifest, or, for entries of the form
        ``/sys/class/block#<address>``, the block device looked up on the
        device for that address.

    Raises:
        ArgumentError: If target is not found in manifest
    """
    device = manifest.spec.targets.get(target)
    if device is None:
        raise ArgumentError(f"Target {target} not found in manifest")

    if not device.startswith("/sys/class/block#"):
        return device

    # The part after the '#' is the address to search for on the device.
    address = device.split("#")[1]
    return self._lookup_block_device(console, manifest.spec.login.prompt, address)
def _transfer_bg_thread(
    self,
    src_path: PathBuf,
    src_operator: Operator,
    src_operator_scheme: str,
    known_hash: str | None,
    to_storage: OpendalClient,
    error_queue,
):
    """Copy the image into storage; meant to run as a background thread target.

    Args:
        src_path: Path to the source image
        src_operator: Operator to read the source image
        src_operator_scheme: Scheme of the source operator; for "fs" the
            hash is computed from the file, otherwise known_hash is used
        known_hash: Known hash of the image
        to_storage: Storage operator to write the image to
        error_queue: Queue to put exceptions in if any

    The upload is skipped when storage already holds a file of the same name
    with the same hash. Any exception is logged, put on error_queue and
    re-raised.
    """
    self.logger.info(f"Writing image to storage in the background: {src_path}")
    try:
        if isinstance(src_path, (str, os.PathLike)):
            filename = Path(src_path).name
        else:
            filename = src_path.name

        if src_operator_scheme != "fs":
            file_hash = known_hash
            self.logger.info(f"Using provided hash for {filename}: {known_hash}")
        else:
            file_hash = self._sha256_file(src_operator, src_path)
            self.logger.info(f"Hash of {filename} is {file_hash}")

        # Without a hash there is nothing to compare, so the file is always uploaded.
        if file_hash and to_storage.exists(filename):
            stored_hash = to_storage.hash(filename)
            self.logger.info(f"Hash of existing file in storage: {stored_hash}")

            if stored_hash == file_hash:
                self.logger.info(f"Image {filename} already exists in storage with matching hash, skipping")
                return
            self.logger.info(f"Image {filename} exists in storage but hash differs, will overwrite")

        self.logger.info(f"Uploading image to storage: {filename}")
        to_storage.write_from_path(filename, src_path, src_operator)

        # Store a small JSON sidecar next to the image.
        _, sidecar_json = self._create_metadata_and_json(src_operator, src_path, file_hash)
        sidecar_name = filename + ".metadata"
        to_storage.write_bytes(sidecar_name, sidecar_json.encode(errors="ignore"))

        self.logger.info(f"Image written to storage: {filename}")

    except Exception as exc:
        self.logger.error(f"Error writing image to storage: {exc}")
        error_queue.put(exc)
        raise
def _sha256_file(self, src_operator, src_path) -> str:
    """Return the hex SHA-256 digest of ``src_path`` read through ``src_operator``.

    The file is opened in "rb" mode and consumed in 65536-byte reads until
    an empty chunk comes back.
    """
    digest = hashlib.sha256()
    with src_operator.open(src_path, "rb") as stream:
        while len(chunk := stream.read(size=65536)) != 0:
            digest.update(chunk)
    return digest.hexdigest()
def _create_metadata_and_json(self, src_operator, src_path, file_hash=None) -> tuple[Metadata, str]:
    """Stat the source file and describe it as a JSON string.

    Returns:
        A ``(metadata, json_text)`` pair: the object returned by
        ``src_operator.stat(src_path)`` and a JSON document with "path",
        "content_length" and "etag", plus "hash" when ``file_hash`` is truthy.
    """
    stat_result = src_operator.stat(src_path)
    fields = dict(
        path=str(src_path),
        content_length=stat_result.content_length,
        etag=stat_result.etag,
    )
    if file_hash:
        fields.update(hash=file_hash)
    return stat_result, json.dumps(fields)
def _lookup_block_device(self, console, prompt, address: str) -> str:
    """Find the block device whose /sys/class/block entry mentions ``address``.

    Sometimes targets don't get assigned block device numbers in a predictable way,
    so the device is looked up by address on the running system.

    Returns:
        "/dev/" followed by the device name found in the listing.

    Raises:
        ArgumentError: If the listing contains no "<name> ->" symlink entry.
    """
    lookup_cmd = f"ls -l /sys/class/block/ | grep {address} | head -n 1"
    console.send(lookup_cmd + "\n")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
    # This produces an output like:
    # ls /sys/class/block/ -la | grep 4fb0000
    # lrwxrwxrwx 1 root root 0 Jan 1
    # 00:00 mmcblk1 -> ../../devices/platform/bus@100000/4fb0000.mmc/mmc_host/mmc1/mmc1:aaaa/block/mmcblk1
    listing = console.before.decode(errors="ignore")
    found = re.search(r"\s(\w+)\s->", listing)
    if found is None:
        raise ArgumentError(f"No block device found for address {address}, output was: {listing}")
    return "/dev/" + found.group(1)
def dump(
    self,
    path: PathBuf,
    *,
    partition: str | None = None,
    operator: Operator | None = None,
):
    """Dump image from DUT.

    Raises:
        NotImplementedError: Always; this driver does not support dumping yet.
    """
    reason = "Dump is not implemented for this driver yet"
    raise NotImplementedError(reason)
def _filename(self, path: PathBuf) -> str:
    """Extract filename from url or path.

    Args:
        path: An ``http(s)://`` URL string, a plain path string, or a
            path-like object.

    Returns:
        The last path component. For URLs the query string and fragment
        are not part of the result.
    """
    # Only strings can be URLs; path-like objects have no startswith() and
    # go straight to pathlib.
    if isinstance(path, str) and path.startswith(("http://", "https://")):
        return urlparse(path).path.split("/")[-1]
    return Path(path).name
def _upload_artifact(self, storage, path: PathBuf, operator: Operator):
    """Upload artifact to storage unless a file with the same name is already there.

    Args:
        storage: Storage client providing ``exists`` and ``write_from_path``.
        path: Source location of the artifact.
        operator: Operator used to read ``path``.

    Note:
        Existing files are matched by name only, so a stale file with the
        same name is kept (see the TODO below).
    """
    filename = self._filename(path)
    if storage.exists(filename):
        # TODO: check hash for existing files
        self.logger.info(f"Artifact {filename} already exists in storage, skipping")
        # Actually skip: previously the upload still ran after this message.
        return
    storage.write_from_path(filename, path, operator=operator)
@contextmanager
def _services_up(self):
    """Make sure that the http and tftp services are up and running in this context.

    Both services are started on entry and both are stopped on exit, even
    when the body raises or when stopping the http service itself fails.
    """
    try:
        self.http.start()
        self.tftp.start()
        yield
    finally:
        # Nested so that a failure in http.stop() cannot leave tftp running.
        try:
            self.http.stop()
        finally:
            self.tftp.stop()
def _generate_uboot_env(self):
    """Build the uboot environment dictionary; targets may need their own overrides.

    Returns:
        A dict whose "serverip" entry is the host reported by the tftp service.
    """
    return {"serverip": self.tftp.get_host()}
@contextmanager
def _busybox(self):
    """Start a busybox shell.

    This is a helper context manager that boots the device into uboot and returns a console object.

    Steps, in order: reboot into the uboot console, run DHCP discovery (the
    result is kept in ``self._dhcp_details``), apply the generated uboot
    environment, ``tftpboot`` the kernel and, when the manifest provides them,
    the initramfs and DTB. Then the boot command obtained from the exporter is
    sent on a pexpect console, the login is performed if the manifest has a
    username, and the console is yielded once the manifest's prompt is seen.
    """

    # make sure that the device is booted into the uboot console
    with self.uboot.reboot_to_console(debug=self._console_debug):
        # run dhcp discovery and gather details useful for later
        self._dhcp_details = self.uboot.setup_dhcp()
        self.logger.info(f"discovered dhcp details: {self._dhcp_details}")

        # configure the environment necessary
        env = self._generate_uboot_env()
        self.uboot.set_env_dict(env)

        # load any necessary files to RAM from the tftp storage
        manifest = self.manifest
        # only the base name is used in the tftpboot command
        kernel_filename = Path(manifest.get_kernel_file()).name
        kernel_address = manifest.get_kernel_address()

        self.uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)

        # the initramfs is optional, and is only loaded when it also has a load address
        if manifest.get_initram_file():
            initram_filename = Path(manifest.get_initram_file()).name
            initram_address = manifest.get_initram_address()
            if initram_address:
                self.uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)

        try:
            dtb_file = manifest.get_dtb_file()
            if dtb_file:
                dtb_filename = Path(dtb_file).name
                dtb_address = manifest.get_dtb_address()
                if dtb_address:
                    self.uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)
        except ValueError:
            # DTB variant not found, skip DTB loading
            pass

    # the uboot console context is closed before a separate pexpect session is
    # opened on the serial client
    with self.serial.pexpect() as console:
        if self._console_debug:
            # echo everything read from the console to stdout
            console.logfile_read = sys.stdout.buffer

        bootcmd = self.call("get_bootcmd")

        self.logger.info(f"Running boot command: {bootcmd}")
        console.send(bootcmd + "\n")

        # if manifest has login details, we need to login
        if manifest.spec.login.username:
            console.expect(manifest.spec.login.login_prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 3)
            console.send(manifest.spec.login.username + "\n")

            # if manifest has password, we need to send it
            if manifest.spec.login.password:
                # "ssword:" matches the prompt regardless of the case of its first letter
                console.expect("ssword:", timeout=EXPECT_TIMEOUT_DEFAULT)
                console.send(manifest.spec.login.password + "\n")

        # wait for the shell prompt before handing the console to the caller
        console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 3)
        yield console
def use_dtb(self, path: PathBuf, operator: Operator | None = None):
    """Use DTB file.

    Placeholder: an operator is resolved for ``path`` when none is given,
    but nothing is done with the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    return None
def use_initram(self, path: PathBuf, operator: Operator | None = None):
    """Use initramfs file.

    Placeholder: an operator is resolved for ``path`` when none is given,
    but nothing is done with the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    return None
def use_kernel(self, path: PathBuf, operator: Operator | None = None):
    """Use kernel file.

    Placeholder: an operator is resolved for ``path`` when none is given,
    but nothing is done with the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    return None
@property
def manifest(self):
    """Get flasher bundle manifest.

    The manifest YAML is requested from the exporter and parsed the first
    time it is needed; the parsed object is then reused.
    """
    if not self._manifest:
        yaml_str = self.call("get_flasher_manifest_yaml")
        self._manifest = FlasherBundleManifestV1Alpha1.from_string(yaml_str)
    return self._manifest
def cli(self):
    """Build the click command group exposing flash and the two interactive shells.

    Returns:
        The ``base`` click group with the ``flash``, ``bootloader-shell`` and
        ``busybox-shell`` commands attached.
    """

    @click.group
    def base():
        """Software-defined flasher interface"""
        pass

    @base.command()
    @click.argument("file")
    @click.option("--partition", type=str)
    @click.option("--os-image-checksum", help="SHA256 checksum of OS image (direct value)")
    @click.option(
        "--os-image-checksum-file",
        help="File containing SHA256 checksum of OS image",
        type=click.Path(exists=True, dir_okay=False),
    )
    @click.option("--force-exporter-http", is_flag=True, help="Force use of exporter HTTP")
    @click.option("--force-flash-bundle", type=str, help="Force use of a specific flasher OCI bundle")
    @debug_console_option
    def flash(
        file,
        partition,
        os_image_checksum,
        os_image_checksum_file,
        console_debug,
        force_exporter_http,
        force_flash_bundle,
    ):
        """Flash image to DUT from file"""
        # A checksum file takes precedence over the direct value.
        if os_image_checksum_file and os.path.exists(os_image_checksum_file):
            with open(os_image_checksum_file) as f:
                # The checksum is the first whitespace-separated token of the file.
                tokens = f.read().split()
            if not tokens:
                raise click.BadParameter(
                    f"checksum file {os_image_checksum_file} is empty",
                    param_hint="--os-image-checksum-file",
                )
            os_image_checksum = tokens[0]
            self.logger.info(f"Read checksum from file: {os_image_checksum}")

        self.set_console_debug(console_debug)
        self.flash(
            file,
            partition=partition,
            # Previously the checksum was read but never handed to flash().
            os_image_checksum=os_image_checksum,
            force_exporter_http=force_exporter_http,
            force_flash_bundle=force_flash_bundle,
        )

    @base.command()
    @debug_console_option
    def bootloader_shell(console_debug):
        """Start a uboot/bootloader interactive console"""
        self.set_console_debug(console_debug)
        with self.bootloader_shell() as serial:
            # Print a prompt-like marker before the interactive console starts.
            print("=> ", end="", flush=True)
            c = Console(serial)
            c.run()

    @base.command()
    @debug_console_option
    def busybox_shell(console_debug):
        """Start a busybox shell"""
        self.set_console_debug(console_debug)
        with self.busybox_shell() as serial:
            # Print a prompt-like marker before the interactive console starts.
            print("# ", end="", flush=True)
            c = Console(serial)
            c.run()

    return base
def _get_decompression_command(filename_or_url) -> str:
    """
    Determine the appropriate decompression command based on file extension

    Args:
        filename_or_url: File name, path-like object, or http(s) URL of the image.
            For URLs only the path component is inspected, so query strings
            do not affect the result.

    Returns:
        str: Pipeline fragment including the trailing pipe ('zcat |' or 'xzcat |'),
        or an empty string for uncompressed images
    """
    if isinstance(filename_or_url, os.PathLike):
        filename = Path(filename_or_url).name
    else:
        text = str(filename_or_url)
        if text.startswith(("http://", "https://")):
            filename = urlparse(text).path.split("/")[-1]
        else:
            # Plain string file names and paths used to leave ``filename`` unbound.
            filename = Path(text).name

    # The extension match is case-insensitive.
    filename = filename.lower()
    if filename.endswith((".gz", ".gzip")):
        return "zcat |"
    elif filename.endswith(".xz"):
        return "xzcat |"
    return ""