Coverage for jumpstarter_driver_flashers/client.py: 19%
299 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1import hashlib
2import json
3import os
4import re
5import sys
6import threading
7import time
8from contextlib import contextmanager
9from dataclasses import dataclass
10from pathlib import Path, PosixPath
11from queue import Queue
12from urllib.parse import urlparse
14import asyncclick as click
15from jumpstarter_driver_composite.client import CompositeClient
16from jumpstarter_driver_opendal.client import FlasherClient, OpendalClient, operator_for_path
17from jumpstarter_driver_opendal.common import PathBuf
18from jumpstarter_driver_pyserial.client import Console
19from opendal import Metadata, Operator
21from jumpstarter_driver_flashers.bundle import FlasherBundleManifestV1Alpha1
23from jumpstarter.common.exceptions import ArgumentError
25debug_console_option = click.option("--console-debug", is_flag=True, help="Enable console debug mode")
28@dataclass(kw_only=True)
29class BaseFlasherClient(FlasherClient, CompositeClient):
30 """
31 Client interface for software driven flashing
33 This client provides methods to flash and dump images to a device under test (DUT)
34 """
36 def __post_init__(self):
37 super().__post_init__()
38 self._manifest = None
39 self._console_debug = False
41 def set_console_debug(self, debug: bool):
42 """Set console debug mode"""
43 self._console_debug = debug
44 # TODO: also set console debug on uboot client
46 @contextmanager
47 def busybox_shell(self):
48 """Start a context manager busybox interactive console"""
49 # Make the exporter download the bundle contents and set files in the right places
50 self.logger.info("Setting up flasher bundle files in exporter")
51 self.call("setup_flasher_bundle")
52 with self._services_up():
53 with self._busybox() as busybox:
54 busybox.send("\n\n")
55 yield self.serial
57 @contextmanager
58 def bootloader_shell(self):
59 """Start a context manager uboot/bootloader for interactive console"""
60 # Make the exporter download the bundle contents and set files in the right places
61 self.logger.info("Setting up flasher bundle files in exporter")
62 self.call("setup_flasher_bundle")
63 with self._services_up():
64 with self.uboot.reboot_to_console(debug=self._console_debug):
65 pass
66 yield self.serial
68 def flash(
69 self,
70 path: PathBuf,
71 *,
72 partition: str | None = None,
73 operator: Operator | None = None,
74 os_image_checksum: str | None = None,
75 force_exporter_http: bool = False,
76 force_flash_bundle: str | None = None,
77 ):
78 """Flash image to DUT"""
79 skip_exporter_http = False
80 image_url = ""
81 operator_scheme = None
82 if path.startswith("http://") and not force_exporter_http:
83 # busybox can handle the http from a remote directly, unless target is isolated
84 image_url = path
85 skip_exporter_http = True
86 else:
87 if operator is None:
88 path, operator, operator_scheme = operator_for_path(path)
89 image_url = self.http.get_url() + "/" + path.name
91 # start counting time for the flash operation
92 start_time = time.time()
94 if not skip_exporter_http:
95 # Create a queue to handle exceptions from the thread
96 error_queue = Queue()
98 # Start the storage write operation in the background
99 storage_thread = threading.Thread(
100 target=self._transfer_bg_thread,
101 args=(path, operator, operator_scheme, os_image_checksum, self.http.storage, error_queue),
102 name="storage_transfer",
103 )
104 storage_thread.start()
106 # Make the exporter download the bundle contents and set files in the right places
107 self.logger.info("Setting up flasher bundle files in exporter")
108 self.call("setup_flasher_bundle", force_flash_bundle)
110 # Early exit if there was an error in the background thread
111 if not skip_exporter_http and not error_queue.empty():
112 raise error_queue.get()
114 with self._services_up():
115 with self._busybox() as console:
116 manifest = self.manifest
117 target = partition or manifest.spec.default_target
118 if not target:
119 raise ArgumentError("No partition or default target specified")
121 target_device = self._get_target_device(target, manifest, console)
123 self.logger.info(f"Using target block device: {target_device}")
125 # Preflash commands are executed before the flash operation
126 # generally used to clean up boot entries in existing devices
127 for preflash_command in manifest.spec.preflash_commands:
128 self.logger.info(f"Running preflash command: {preflash_command}")
129 console.sendline(preflash_command)
130 console.expect(manifest.spec.login.prompt, timeout=5)
132 # make sure that the device is connected to the network and has an IP address
133 console.sendline("udhcpc")
134 console.expect(manifest.spec.login.prompt, timeout=10)
136 if not skip_exporter_http:
137 # Wait for the storage write operation to complete before proceeding
138 self.logger.info("Waiting until the http image preparation in storage is completed")
139 storage_thread.join()
141 # Check if there were any exceptions in the background thread
142 if not error_queue.empty():
143 raise error_queue.get()
145 self._flash_with_progress(console, manifest, path, image_url, target_device)
147 total_time = time.time() - start_time
148 # total time in minutes:seconds
149 minutes, seconds = divmod(total_time, 60)
150 self.logger.info(f"Flashing completed in {int(minutes)}m {int(seconds):02d}s")
151 console.sendline("reboot")
152 time.sleep(2)
153 self.logger.info("Powering off target")
154 self.power.off()
156 def _flash_with_progress(self, console, manifest, path, image_url, target_path):
157 """Flash image to target device with progress monitoring.
159 Args:
160 console: Console object for device interaction
161 manifest: Flasher manifest containing target definitions
162 path: Path to the source image
163 image_url: URL to download the image from
164 target_path: Target device path to flash to
165 """
166 # Flash the image
167 decompress_cmd = _get_decompression_command(path)
168 flash_cmd = (
169 f'( wget -q -O - "{image_url}" | '
170 f"{decompress_cmd} "
171 f"dd of={target_path} bs=64k iflag=fullblock oflag=direct) &"
172 )
173 console.sendline(flash_cmd)
174 console.expect(manifest.spec.login.prompt, timeout=60)
176 console.sendline("pidof dd")
177 console.expect(manifest.spec.login.prompt, timeout=3)
178 dd_pid = console.before.decode(errors="ignore").splitlines()[1].strip()
180 # Initialize progress tracking variables
181 last_pos = 0
182 last_time = time.time()
184 while True:
185 console.sendline(f"cat /proc/{dd_pid}/fdinfo/1")
186 console.expect(manifest.spec.login.prompt, timeout=3)
187 if "No such file or directory" in console.before.decode(errors="ignore"):
188 break
189 data = console.before.decode(errors="ignore")
190 match = re.search(r"pos:\s+(\d+)", data)
191 if match:
192 current_bytes = int(match.group(1))
193 current_time = time.time()
194 elapsed = current_time - last_time
196 if elapsed >= 1.0: # Update speed every second
197 bytes_diff = current_bytes - last_pos
198 speed_mb = (bytes_diff / (1024 * 1024)) / elapsed
199 total_mb = current_bytes / (1024 * 1024)
200 self.logger.info(f"Flash progress: {total_mb:.2f} MB, Speed: {speed_mb:.2f} MB/s")
202 last_pos = current_bytes
203 last_time = current_time
204 time.sleep(1)
205 self.logger.info("Flushing buffers")
206 console.sendline("sync")
207 console.expect(manifest.spec.login.prompt, timeout=1200)
209 def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha1, console) -> str:
210 """Get the target device path from the manifest, resolving block devices if needed.
212 Args:
213 target: Target name from manifest
214 manifest: Flasher manifest containing target definitions
215 console: Console object for device interaction
217 Returns:
218 Resolved target device path
220 Raises:
221 ArgumentError: If target is not found in manifest
222 """
223 target_path = manifest.spec.targets.get(target)
224 if target_path is None:
225 raise ArgumentError(f"Target {target} not found in manifest")
227 if target_path.startswith("/sys/class/block#"):
228 target_path = self._lookup_block_device(console, manifest.spec.login.prompt, target_path.split("#")[1])
230 return target_path
232 def _transfer_bg_thread(
233 self,
234 src_path: PathBuf,
235 src_operator: Operator,
236 src_operator_scheme: str,
237 known_hash: str | None,
238 to_storage: OpendalClient,
239 error_queue,
240 ):
241 """Transfer image to storage in the background
242 Args:
243 src_path: Path to the source image
244 src_operator: Operator to read the source image
245 to_storage: Storage operator to write the image to
246 error_queue: Queue to put exceptions in if any
247 known_hash: Known hash of the image
248 """
249 self.logger.info(f"Writing image to storage in the background: {src_path}")
250 try:
251 filename = Path(src_path).name if isinstance(src_path, (str, os.PathLike)) else src_path.name
253 if src_operator_scheme == "fs":
254 file_hash = self._sha256_file(src_operator, src_path)
255 self.logger.info(f"Hash of {filename} is {file_hash}")
256 else:
257 file_hash = known_hash
258 self.logger.info(f"Using provided hash for {filename}: {known_hash}")
260 if file_hash and to_storage.exists(filename):
261 to_storage_hash = to_storage.hash(filename)
262 self.logger.info(f"Hash of existing file in storage: {to_storage_hash}")
264 if to_storage_hash == file_hash:
265 self.logger.info(f"Image {filename} already exists in storage with matching hash, skipping")
266 return
267 else:
268 self.logger.info(f"Image {filename} exists in storage but hash differs, will overwrite")
270 self.logger.info(f"Uploading image to storage: {filename}")
271 to_storage.write_from_path(filename, src_path, src_operator)
273 metadata, metadata_json = self._create_metadata_and_json(src_operator, src_path, file_hash)
274 metadata_file = filename + ".metadata"
275 to_storage.write_bytes(metadata_file, metadata_json.encode(errors="ignore"))
277 self.logger.info(f"Image written to storage: {filename}")
279 except Exception as e:
280 self.logger.error(f"Error writing image to storage: {e}")
281 error_queue.put(e)
282 raise
284 def _sha256_file(self, src_operator, src_path) -> str:
285 m = hashlib.sha256()
286 with src_operator.open(src_path, "rb") as f:
287 while True:
288 data = f.read(size=65536)
289 if len(data) == 0:
290 break
291 m.update(data)
293 return m.hexdigest()
295 def _create_metadata_and_json(self, src_operator, src_path, file_hash=None) -> tuple[Metadata, str]:
296 """Create a metadata json string from a metadata object"""
297 metadata = src_operator.stat(src_path)
298 metadata_dict = {
299 "path": str(src_path),
300 "content_length": metadata.content_length,
301 "etag": metadata.etag,
302 }
304 if file_hash:
305 metadata_dict["hash"] = file_hash
307 return metadata, json.dumps(metadata_dict)
309 def _lookup_block_device(self, console, prompt, address: str) -> str:
310 """Lookup block device for a given address.
311 Sometimes targets don't get assigned block device numbers in a predictable way,
312 so we need to lookup the block device by address.
313 """
314 console.send(f"ls -l /sys/class/block/ | grep {address} | head -n 1" + "\n")
315 console.expect(prompt, timeout=5)
316 # This produces an output like:
317 # ls /sys/class/block/ -la | grep 4fb0000
318 # lrwxrwxrwx 1 root root 0 Jan 1
319 # 00:00 mmcblk1 -> ../../devices/platform/bus@100000/4fb0000.mmc/mmc_host/mmc1/mmc1:aaaa/block/mmcblk1
320 output = console.before.decode(errors="ignore")
321 match = re.search(r"\s(\w+)\s->", output)
322 if match:
323 return "/dev/" + match.group(1)
324 else:
325 raise ArgumentError(f"No block device found for address {address}, output was: {output}")
327 def dump(
328 self,
329 path: PathBuf,
330 *,
331 partition: str | None = None,
332 operator: Operator | None = None,
333 ):
334 """Dump image from DUT"""
335 raise NotImplementedError("Dump is not implemented for this driver yet")
337 def _filename(self, path: PathBuf) -> str:
338 """Extract filename from url or path"""
339 if path.startswith(("http://", "https://")):
340 return urlparse(path).path.split("/")[-1]
341 else:
342 return Path(path).name
344 def _upload_artifact(self, storage, path: PathBuf, operator: Operator):
345 """Upload artifact to storage"""
346 filename = self._filename(path)
347 if storage.exists(filename):
348 # TODO: check hash for existing files
349 self.logger.info(f"Artifact {filename} already exists in storage, skipping")
350 storage.write_from_path(filename, path, operator=operator)
352 @contextmanager
353 def _services_up(self):
354 """Make sure that the http and tftp services are up an running in this context"""
355 try:
356 self.http.start()
357 self.tftp.start()
358 yield
359 finally:
360 self.http.stop()
361 self.tftp.stop()
363 def _generate_uboot_env(self):
364 """Generate a uboot environment dictionary, may need specific overrides for different targets"""
365 tftp_host = self.tftp.get_host()
366 return {
367 "serverip": tftp_host,
368 }
370 @contextmanager
371 def _busybox(self):
372 """Start a busybox shell.
374 This is a helper context manager that boots the device into uboot and returns a console object.
375 """
377 # make sure that the device is booted into the uboot console
378 with self.uboot.reboot_to_console(debug=self._console_debug):
379 # run dhcp discovery and gather details useful for later
380 self._dhcp_details = self.uboot.setup_dhcp()
381 self.logger.info(f"discovered dhcp details: {self._dhcp_details}")
383 # configure the environment necessary
384 env = self._generate_uboot_env()
385 self.uboot.set_env_dict(env)
387 # load any necessary files to RAM from the tftp storage
388 manifest = self.manifest
389 kernel_filename = Path(manifest.get_kernel_file()).name
390 kernel_address = manifest.get_kernel_address()
392 self.uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)
394 if manifest.get_initram_file():
395 initram_filename = Path(manifest.get_initram_file()).name
396 initram_address = manifest.get_initram_address()
397 self.uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)
399 if manifest.get_dtb_file():
400 dtb_filename = Path(manifest.get_dtb_file()).name
401 dtb_address = manifest.get_dtb_address()
402 self.uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)
404 with self.serial.pexpect() as console:
405 if self._console_debug:
406 console.logfile_read = sys.stdout.buffer
408 self.logger.info(f"Running boot command: {manifest.spec.bootcmd}")
409 console.send(manifest.spec.bootcmd + "\n")
411 # if manifest has login details, we need to login
412 if manifest.spec.login.username:
413 console.expect(manifest.spec.login.login_prompt, timeout=120)
414 console.send(manifest.spec.login.username + "\n")
416 # if manifest has password, we need to send it
417 if manifest.spec.login.password:
418 console.expect("ssword:", timeout=30)
419 console.send(manifest.spec.login.password + "\n")
421 console.expect(manifest.spec.login.prompt, timeout=120)
422 yield console
424 def use_dtb(self, path: PathBuf, operator: Operator | None = None):
425 """Use DTB file"""
426 if operator is None:
427 path, operator, operator_scheme = operator_for_path(path)
429 ...
431 def use_initram(self, path: PathBuf, operator: Operator | None = None):
432 """Use initramfs file"""
433 if operator is None:
434 path, operator, operator_scheme = operator_for_path(path)
436 ...
438 def use_kernel(self, path: PathBuf, operator: Operator | None = None):
439 """Use kernel file"""
440 if operator is None:
441 path, operator, operator_scheme = operator_for_path(path)
443 ...
445 @property
446 def manifest(self):
447 """Get flasher bundle manifest"""
448 if self._manifest:
449 return self._manifest
451 yaml_str = self.call("get_flasher_manifest_yaml")
452 self._manifest = FlasherBundleManifestV1Alpha1.from_string(yaml_str)
453 return self._manifest
455 def cli(self):
456 @click.group
457 def base():
458 """Software-defined flasher interface"""
459 pass
461 @base.command()
462 @click.argument("file")
463 @click.option("--partition", type=str)
464 @click.option("--os-image-checksum", help="SHA256 checksum of OS image (direct value)")
465 @click.option(
466 "--os-image-checksum-file",
467 help="File containing SHA256 checksum of OS image",
468 type=click.Path(exists=True, dir_okay=False),
469 )
470 @click.option("--force-exporter-http", is_flag=True, help="Force use of exporter HTTP")
471 @click.option("--force-flash-bundle", type=str, help="Force use of a specific flasher OCI bundle")
472 @debug_console_option
473 def flash(
474 file,
475 partition,
476 os_image_checksum,
477 os_image_checksum_file,
478 console_debug,
479 force_exporter_http,
480 force_flash_bundle,
481 ):
482 """Flash image to DUT from file"""
483 if os_image_checksum_file and os.path.exists(os_image_checksum_file):
484 with open(os_image_checksum_file) as f:
485 os_image_checksum = f.read().strip().split()[0]
486 self.logger.info(f"Read checksum from file: {os_image_checksum}")
488 self.set_console_debug(console_debug)
489 self.flash(
490 file,
491 partition=partition,
492 force_exporter_http=force_exporter_http,
493 force_flash_bundle=force_flash_bundle,
494 )
496 @base.command()
497 @debug_console_option
498 def bootloader_shell(console_debug):
499 """Start a uboot/bootloader interactive console"""
500 self.set_console_debug(console_debug)
501 with self.bootloader_shell() as serial:
502 print("=> ", end="", flush=True)
503 c = Console(serial)
504 c.run()
506 @base.command()
507 @debug_console_option
508 def busybox_shell(console_debug):
509 """Start a busybox shell"""
510 self.set_console_debug(console_debug)
511 with self.busybox_shell() as serial:
512 print("# ", end="", flush=True)
513 c = Console(serial)
514 c.run()
516 return base
519def _get_decompression_command(filename_or_url) -> str:
520 """
521 Determine the appropriate decompression command based on file extension
523 Args:
524 filename (str): Name of the file to check
526 Returns:
527 str: Decompression command ('zcat', 'xzcat', or 'cat' for uncompressed)
528 """
529 if type(filename_or_url) is PosixPath:
530 filename = filename_or_url.name
531 elif filename_or_url.startswith(("http://", "https://")):
532 filename = urlparse(filename_or_url).path.split("/")[-1]
534 filename = filename.lower()
535 if filename.endswith((".gz", ".gzip")):
536 return "zcat |"
537 elif filename.endswith(".xz"):
538 return "xzcat |"
539 return ""