Coverage for jumpstarter_driver_flashers/client.py: 19%

312 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-30 18:45 +0200

1import hashlib 

2import json 

3import os 

4import re 

5import sys 

6import threading 

7import time 

8from contextlib import contextmanager 

9from dataclasses import dataclass 

10from pathlib import Path, PosixPath 

11from queue import Queue 

12from urllib.parse import urlparse 

13 

14import click 

15from jumpstarter_driver_composite.client import CompositeClient 

16from jumpstarter_driver_opendal.client import FlasherClient, OpendalClient, operator_for_path 

17from jumpstarter_driver_opendal.common import PathBuf 

18from jumpstarter_driver_pyserial.client import Console 

19from opendal import Metadata, Operator 

20 

21from jumpstarter_driver_flashers.bundle import FlasherBundleManifestV1Alpha1 

22 

23from jumpstarter.common.exceptions import ArgumentError 

24 

# Reusable click decorator that adds the --console-debug flag to a command.
debug_console_option = click.option("--console-debug", is_flag=True, help="Enable console debug mode")

# Default timeout handed to console.expect() while waiting for a prompt.
EXPECT_TIMEOUT_DEFAULT = 60
# Timeout used while waiting for the prompt after the final `sync`.
EXPECT_TIMEOUT_SYNC = 1200

29 

30 

31@dataclass(kw_only=True) 

32class BaseFlasherClient(FlasherClient, CompositeClient): 

33 """ 

34 Client interface for software driven flashing 

35 

36 This client provides methods to flash and dump images to a device under test (DUT) 

37 """ 

38 

def __post_init__(self):
    """Initialise client-side state once the dataclass fields are set."""
    super().__post_init__()
    # Filled in lazily by the `manifest` property on first access.
    self._manifest = None
    # Console echo stays off until set_console_debug() turns it on.
    self._console_debug = False

43 

def set_console_debug(self, debug: bool):
    """Set console debug mode.

    Args:
        debug: Value stored in ``self._console_debug``. It is later passed
            to ``uboot.reboot_to_console`` and, when true, makes the
            busybox console mirror what it reads to stdout.
    """
    self._console_debug = debug
    # TODO: also set console debug on uboot client

48 

@contextmanager
def busybox_shell(self):
    """Start a context manager busybox interactive console.

    Yields:
        ``self.serial`` while the http/tftp services are up and the target
        has been booted into the busybox shell.
    """
    # Make the exporter download the bundle contents and set files in the right places
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")
    with self._services_up():
        with self._busybox() as busybox:
            # Two newlines so a fresh prompt is printed for the interactive user.
            busybox.send("\n\n")
            yield self.serial

59 

@contextmanager
def bootloader_shell(self):
    """Start a context manager uboot/bootloader for interactive console.

    Yields:
        ``self.serial`` while the http/tftp services are up, after the
        target has been rebooted into the bootloader console.
    """
    # Make the exporter download the bundle contents and set files in the right places
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")
    with self._services_up():
        # Entering the context reboots into the bootloader console; nothing
        # else is done inside it before the serial port is handed over.
        with self.uboot.reboot_to_console(debug=self._console_debug):
            pass
        yield self.serial

70 

def flash(
    self,
    path: PathBuf,
    *,
    partition: str | None = None,
    operator: Operator | None = None,
    os_image_checksum: str | None = None,
    force_exporter_http: bool = False,
    force_flash_bundle: str | None = None,
):
    """Flash image to DUT.

    Args:
        path: Image location. A plain ``http://`` URL is fetched by the
            target directly unless ``force_exporter_http`` is set; anything
            else is first copied into the exporter's http storage.
        partition: Target name to flash. Falls back to the exporter's
            default target, then to the manifest's ``default_target``.
        operator: Operator used to read ``path``; resolved with
            ``operator_for_path`` when omitted.
        os_image_checksum: Known hash of the image, forwarded to the
            background transfer (used there when the source is not "fs").
        force_exporter_http: Always serve the image through the exporter.
        force_flash_bundle: Passed to the exporter's ``setup_flasher_bundle``.

    Raises:
        ArgumentError: If no target can be determined or it is unknown.
        Exception: Any error raised by the background storage transfer.
    """
    skip_exporter_http = False
    image_url = ""
    operator_scheme = None
    # PathBuf may be a str or a path object; only str offers startswith().
    path_text = str(path)
    if path_text.startswith("http://") and not force_exporter_http:
        # busybox can handle the http from a remote directly, unless target is isolated
        image_url = path_text
        skip_exporter_http = True
    else:
        if operator is None:
            path, operator, operator_scheme = operator_for_path(path)
        # Same file-name rule as _transfer_bg_thread, so a caller-supplied
        # operator combined with a str path does not crash on `.name`.
        image_name = Path(path).name if isinstance(path, (str, os.PathLike)) else path.name
        image_url = self.http.get_url() + "/" + image_name

    # start counting time for the flash operation
    start_time = time.time()

    if not skip_exporter_http:
        # Create a queue to handle exceptions from the thread
        error_queue = Queue()

        # Start the storage write operation in the background
        storage_thread = threading.Thread(
            target=self._transfer_bg_thread,
            args=(path, operator, operator_scheme, os_image_checksum, self.http.storage, error_queue),
            name="storage_transfer",
        )
        storage_thread.start()

    # Make the exporter download the bundle contents and set files in the right places
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle", force_flash_bundle)

    # Early exit if there was an error in the background thread
    if not skip_exporter_http and not error_queue.empty():
        raise error_queue.get()

    with self._services_up():
        with self._busybox() as console:
            manifest = self.manifest
            target = partition or self.call("get_default_target") or manifest.spec.default_target
            if not target:
                raise ArgumentError("No partition or default target specified")

            target_device = self._get_target_device(target, manifest, console)

            self.logger.info(f"Using target block device: {target_device}")
            console.sendline(f"export dhcp_addr={self._dhcp_details.ip_address}")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
            console.sendline(f"export gw_addr={self._dhcp_details.gateway}")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            # Preflash commands are executed before the flash operation
            # generally used to clean up boot entries in existing devices
            for preflash_command in manifest.spec.preflash_commands:
                self.logger.info(f"Running preflash command: {preflash_command}")
                console.sendline(preflash_command)
                console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            # make sure that the device is connected to the network and has an IP address
            console.sendline("udhcpc")
            console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT)

            if not skip_exporter_http:
                # Wait for the storage write operation to complete before proceeding
                self.logger.info("Waiting until the http image preparation in storage is completed")
                storage_thread.join()

                # Check if there were any exceptions in the background thread
                if not error_queue.empty():
                    raise error_queue.get()

            self._flash_with_progress(console, manifest, path, image_url, target_device)

            total_time = time.time() - start_time
            # total time in minutes:seconds
            minutes, seconds = divmod(total_time, 60)
            self.logger.info(f"Flashing completed in {int(minutes)}m {int(seconds):02d}s")
            console.sendline("reboot")
            time.sleep(2)
            self.logger.info("Powering off target")
            self.power.off()

162 

def _flash_with_progress(self, console, manifest, path, image_url, target_path):
    """Flash image to target device with progress monitoring.

    Starts a backgrounded ``wget | [decompress |] dd`` pipeline on the
    target, then polls ``/proc/<pid>/fdinfo/1`` of ``dd`` about once per
    second and logs the written size and throughput. When the process is
    gone it runs ``sync`` and waits for the prompt.

    Args:
        console: Console object for device interaction
        manifest: Flasher manifest; ``spec.login.prompt`` is the shell prompt
        path: Path to the source image (used to pick the decompressor)
        image_url: URL to download the image from
        target_path: Target device path to flash to

    Raises:
        RuntimeError: If no PID for ``dd`` can be read from the console.
    """
    prompt = manifest.spec.login.prompt

    # Flash the image
    decompress_cmd = _get_decompression_command(path)
    flash_cmd = (
        f'( wget -q -O - "{image_url}" | '
        f"{decompress_cmd} "
        f"dd of={target_path} bs=64k iflag=fullblock oflag=direct) &"
    )
    console.sendline(flash_cmd)
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 2)

    console.sendline("pidof dd")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
    pid_output = console.before.decode(errors="ignore")
    # First line that begins with a number. If pidof prints several PIDs
    # only the first is used, so the /proc path below stays valid.
    pid_match = re.search(r"^\s*(\d+)\b", pid_output, re.MULTILINE)
    if pid_match is None:
        raise RuntimeError(f"Could not determine the PID of dd, output was: {pid_output}")
    dd_pid = pid_match.group(1)

    # Initialize progress tracking variables
    last_pos = 0
    last_time = time.time()

    while True:
        console.sendline(f"cat /proc/{dd_pid}/fdinfo/1")
        console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
        data = console.before.decode(errors="ignore")
        # The fdinfo entry disappears once dd has exited.
        if "No such file or directory" in data:
            break
        match = re.search(r"pos:\s+(\d+)", data)
        if match:
            current_bytes = int(match.group(1))
            current_time = time.time()
            elapsed = current_time - last_time

            if elapsed >= 1.0:  # Update speed every second
                bytes_diff = current_bytes - last_pos
                speed_mb = (bytes_diff / (1024 * 1024)) / elapsed
                total_mb = current_bytes / (1024 * 1024)
                self.logger.info(f"Flash progress: {total_mb:.2f} MB, Speed: {speed_mb:.2f} MB/s")

                last_pos = current_bytes
                last_time = current_time
        time.sleep(1)
    self.logger.info("Flushing buffers")
    console.sendline("sync")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_SYNC)

215 

def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha1, console) -> str:
    """Get the target device path from the manifest, resolving block devices if needed.

    Args:
        target: Target name from manifest
        manifest: Flasher manifest containing target definitions
        console: Console object for device interaction

    Returns:
        Resolved target device path

    Raises:
        ArgumentError: If target is not found in manifest
    """
    target_path = manifest.spec.targets.get(target)
    if target_path is None:
        raise ArgumentError(f"Target {target} not found in manifest")

    # "/sys/class/block#<address>" entries are resolved on the running target.
    if target_path.startswith("/sys/class/block#"):
        address = target_path.split("#")[1]
        target_path = self._lookup_block_device(console, manifest.spec.login.prompt, address)

    return target_path

238 

def _transfer_bg_thread(
    self,
    src_path: PathBuf,
    src_operator: Operator,
    src_operator_scheme: str,
    known_hash: str | None,
    to_storage: OpendalClient,
    error_queue,
):
    """Transfer image to storage in the background.

    The upload is skipped when a file of the same name already exists in
    storage and its hash equals the source hash. After an upload a
    ``<filename>.metadata`` JSON file is written next to the image.

    Args:
        src_path: Path to the source image
        src_operator: Operator to read the source image
        src_operator_scheme: Scheme of ``src_operator``; for "fs" the hash
            is computed from the file, otherwise ``known_hash`` is used
        known_hash: Known hash of the image
        to_storage: Storage operator to write the image to
        error_queue: Queue to put exceptions in if any
    """
    self.logger.info(f"Writing image to storage in the background: {src_path}")
    try:
        filename = Path(src_path).name if isinstance(src_path, (str, os.PathLike)) else src_path.name

        if src_operator_scheme == "fs":
            file_hash = self._sha256_file(src_operator, src_path)
            self.logger.info(f"Hash of {filename} is {file_hash}")
        else:
            file_hash = known_hash
            self.logger.info(f"Using provided hash for {filename}: {known_hash}")

        if file_hash and to_storage.exists(filename):
            to_storage_hash = to_storage.hash(filename)
            self.logger.info(f"Hash of existing file in storage: {to_storage_hash}")

            if to_storage_hash == file_hash:
                self.logger.info(f"Image {filename} already exists in storage with matching hash, skipping")
                return
            else:
                self.logger.info(f"Image {filename} exists in storage but hash differs, will overwrite")

        self.logger.info(f"Uploading image to storage: {filename}")
        to_storage.write_from_path(filename, src_path, src_operator)

        _, metadata_json = self._create_metadata_and_json(src_operator, src_path, file_hash)
        metadata_file = filename + ".metadata"
        to_storage.write_bytes(metadata_file, metadata_json.encode(errors="ignore"))

        self.logger.info(f"Image written to storage: {filename}")

    except Exception as e:
        self.logger.error(f"Error writing image to storage: {e}")
        # The starting thread reads this queue to surface the failure.
        error_queue.put(e)
        raise

290 

def _sha256_file(self, src_operator, src_path) -> str:
    """Return the hex SHA-256 digest of ``src_path`` read through ``src_operator``.

    The file is read in 64 KiB chunks, so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with src_operator.open(src_path, "rb") as f:
        # An empty read marks end of file.
        while chunk := f.read(size=65536):
            digest.update(chunk)

    return digest.hexdigest()

301 

def _create_metadata_and_json(self, src_operator, src_path, file_hash=None) -> tuple[Metadata, str]:
    """Create a metadata json string from a metadata object.

    Args:
        src_operator: Operator whose ``stat`` describes ``src_path``
        src_path: Path of the source image
        file_hash: Optional hash stored under the "hash" key when truthy

    Returns:
        The stat result and a JSON string with "path", "content_length",
        "etag" and, when given, "hash".
    """
    metadata = src_operator.stat(src_path)
    metadata_dict = {
        "path": str(src_path),
        "content_length": metadata.content_length,
        "etag": metadata.etag,
    }

    if file_hash:
        metadata_dict["hash"] = file_hash

    return metadata, json.dumps(metadata_dict)

315 

def _lookup_block_device(self, console, prompt, address: str) -> str:
    """Lookup block device for a given address.

    Sometimes targets don't get assigned block device numbers in a predictable way,
    so we need to lookup the block device by address.

    Args:
        console: Console object for device interaction
        prompt: Shell prompt to wait for after the command
        address: Text to grep for in the ``/sys/class/block`` listing

    Returns:
        ``/dev/<name>`` of the first matching entry.

    Raises:
        ArgumentError: If the output contains no ``<name> ->`` symlink entry.
    """
    console.send(f"ls -l /sys/class/block/ | grep {address} | head -n 1" + "\n")
    console.expect(prompt, timeout=EXPECT_TIMEOUT_DEFAULT)
    # This produces an output like:
    # ls /sys/class/block/ -la | grep 4fb0000
    # lrwxrwxrwx 1 root root 0 Jan 1
    # 00:00 mmcblk1 -> ../../devices/platform/bus@100000/4fb0000.mmc/mmc_host/mmc1/mmc1:aaaa/block/mmcblk1
    output = console.before.decode(errors="ignore")
    match = re.search(r"\s(\w+)\s->", output)
    if match is None:
        raise ArgumentError(f"No block device found for address {address}, output was: {output}")
    return "/dev/" + match.group(1)

333 

def dump(
    self,
    path: PathBuf,
    *,
    partition: str | None = None,
    operator: Operator | None = None,
):
    """Dump image from DUT.

    Raises:
        NotImplementedError: Always; dumping is not supported by this driver.
    """
    raise NotImplementedError("Dump is not implemented for this driver yet")

343 

def _filename(self, path: PathBuf) -> str:
    """Extract filename from url or path.

    Accepts both strings and path objects: the value is converted to text
    before the URL check, because path objects have no ``startswith``.
    """
    text = str(path)
    if text.startswith(("http://", "https://")):
        # Last segment of the URL path; query string and fragment are dropped.
        return urlparse(text).path.split("/")[-1]
    return Path(path).name

350 

def _upload_artifact(self, storage, path: PathBuf, operator: Operator):
    """Upload artifact to storage unless a file of that name is already there.

    Args:
        storage: Storage client offering ``exists`` and ``write_from_path``
        path: Source path or URL of the artifact
        operator: Operator used to read ``path``
    """
    filename = self._filename(path)
    if storage.exists(filename):
        # TODO: check hash for existing files
        self.logger.info(f"Artifact {filename} already exists in storage, skipping")
        # Actually skip, as the log line says; previously the upload still ran.
        return
    storage.write_from_path(filename, path, operator=operator)

358 

@contextmanager
def _services_up(self):
    """Make sure that the http and tftp services are up and running in this context.

    Both services are stopped on exit, including when the body or one of
    the ``start`` calls raises.
    """
    try:
        self.http.start()
        self.tftp.start()
        yield
    finally:
        # Nested so a failure stopping http cannot leave tftp running.
        try:
            self.http.stop()
        finally:
            self.tftp.stop()

369 

def _generate_uboot_env(self):
    """Generate a uboot environment dictionary, may need specific overrides for different targets.

    Returns:
        A dict with "serverip" set to the tftp service host.
    """
    return {
        "serverip": self.tftp.get_host(),
    }

376 

@contextmanager
def _busybox(self):
    """Start a busybox shell.

    This is a helper context manager that boots the device into uboot,
    loads kernel / initramfs / DTB over tftp, runs the boot command, logs
    in when the manifest defines credentials, and yields the console.

    Yields:
        The pexpect console, positioned at the shell prompt.
    """

    # make sure that the device is booted into the uboot console
    with self.uboot.reboot_to_console(debug=self._console_debug):
        # run dhcp discovery and gather details useful for later
        self._dhcp_details = self.uboot.setup_dhcp()
        self.logger.info(f"discovered dhcp details: {self._dhcp_details}")

        # configure the environment necessary
        env = self._generate_uboot_env()
        self.uboot.set_env_dict(env)

        # load any necessary files to RAM from the tftp storage
        manifest = self.manifest
        kernel_filename = Path(manifest.get_kernel_file()).name
        kernel_address = manifest.get_kernel_address()

        self.uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)

        initram_file = manifest.get_initram_file()
        if initram_file:
            initram_filename = Path(initram_file).name
            initram_address = manifest.get_initram_address()
            if initram_address:
                self.uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)

        try:
            dtb_file = manifest.get_dtb_file()
            if dtb_file:
                dtb_filename = Path(dtb_file).name
                dtb_address = manifest.get_dtb_address()
                if dtb_address:
                    self.uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)
        except ValueError:
            # DTB variant not found, skip DTB loading
            pass

    with self.serial.pexpect() as console:
        if self._console_debug:
            # Mirror everything read from the target to stdout.
            console.logfile_read = sys.stdout.buffer

        bootcmd = self.call("get_bootcmd")

        self.logger.info(f"Running boot command: {bootcmd}")
        console.send(bootcmd + "\n")

        # if manifest has login details, we need to login
        if manifest.spec.login.username:
            console.expect(manifest.spec.login.login_prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 3)
            console.send(manifest.spec.login.username + "\n")

        # if manifest has password, we need to send it
        if manifest.spec.login.password:
            # "ssword:" matches both "Password:" and "password:".
            console.expect("ssword:", timeout=EXPECT_TIMEOUT_DEFAULT)
            console.send(manifest.spec.login.password + "\n")

        console.expect(manifest.spec.login.prompt, timeout=EXPECT_TIMEOUT_DEFAULT * 3)
        yield console

439 

def use_dtb(self, path: PathBuf, operator: Operator | None = None):
    """Use DTB file.

    Placeholder: only resolves the operator for ``path``; nothing is
    uploaded or configured yet.
    """
    if operator is None:
        path, operator, _ = operator_for_path(path)

    ...

446 

def use_initram(self, path: PathBuf, operator: Operator | None = None):
    """Use initramfs file.

    Placeholder: only resolves the operator for ``path``; nothing is
    uploaded or configured yet.
    """
    if operator is None:
        path, operator, _ = operator_for_path(path)

    ...

453 

def use_kernel(self, path: PathBuf, operator: Operator | None = None):
    """Use kernel file.

    Placeholder: only resolves the operator for ``path``; nothing is
    uploaded or configured yet.
    """
    if operator is None:
        path, operator, _ = operator_for_path(path)

    ...

460 

@property
def manifest(self):
    """Get flasher bundle manifest.

    The YAML is requested from the exporter on first access and the parsed
    manifest is kept in ``self._manifest`` for later calls.
    """
    if self._manifest:
        return self._manifest

    yaml_str = self.call("get_flasher_manifest_yaml")
    self._manifest = FlasherBundleManifestV1Alpha1.from_string(yaml_str)
    return self._manifest

470 

def cli(self):
    """Build the click command group for this client.

    Returns:
        The ``base`` click group holding the ``flash``, ``bootloader_shell``
        and ``busybox_shell`` commands defined below.
    """

    @click.group
    def base():
        """Software-defined flasher interface"""
        pass

    @base.command()
    @click.argument("file")
    @click.option("--partition", type=str)
    @click.option("--os-image-checksum", help="SHA256 checksum of OS image (direct value)")
    @click.option(
        "--os-image-checksum-file",
        help="File containing SHA256 checksum of OS image",
        type=click.Path(exists=True, dir_okay=False),
    )
    @click.option("--force-exporter-http", is_flag=True, help="Force use of exporter HTTP")
    @click.option("--force-flash-bundle", type=str, help="Force use of a specific flasher OCI bundle")
    @debug_console_option
    def flash(
        file,
        partition,
        os_image_checksum,
        os_image_checksum_file,
        console_debug,
        force_exporter_http,
        force_flash_bundle,
    ):
        """Flash image to DUT from file"""
        if os_image_checksum_file and os.path.exists(os_image_checksum_file):
            with open(os_image_checksum_file) as f:
                fields = f.read().split()
            if not fields:
                raise click.BadParameter(
                    f"Checksum file {os_image_checksum_file} is empty",
                    param_hint="--os-image-checksum-file",
                )
            # First whitespace-separated token, as in `sha256sum` output.
            os_image_checksum = fields[0]
            self.logger.info(f"Read checksum from file: {os_image_checksum}")

        self.set_console_debug(console_debug)
        self.flash(
            file,
            partition=partition,
            # Previously parsed but never forwarded, so it had no effect.
            os_image_checksum=os_image_checksum,
            force_exporter_http=force_exporter_http,
            force_flash_bundle=force_flash_bundle,
        )

    @base.command()
    @debug_console_option
    def bootloader_shell(console_debug):
        """Start a uboot/bootloader interactive console"""
        self.set_console_debug(console_debug)
        with self.bootloader_shell() as serial:
            print("=> ", end="", flush=True)
            c = Console(serial)
            c.run()

    @base.command()
    @debug_console_option
    def busybox_shell(console_debug):
        """Start a busybox shell"""
        self.set_console_debug(console_debug)
        with self.busybox_shell() as serial:
            print("# ", end="", flush=True)
            c = Console(serial)
            c.run()

    return base

533 

534 

def _get_decompression_command(filename_or_url) -> str:
    """
    Determine the appropriate decompression command based on file extension

    Args:
        filename_or_url: Path object, plain file name/path string, or
            http(s) URL of the image. The check is case-insensitive.

    Returns:
        str: Shell pipeline fragment: 'zcat |' for .gz/.gzip, 'xzcat |'
        for .xz, or '' for anything else (no decompression stage).
    """
    if isinstance(filename_or_url, os.PathLike):
        filename = Path(filename_or_url).name
    else:
        text = str(filename_or_url)
        if text.startswith(("http://", "https://")):
            # Use only the URL path so a query string cannot hide the suffix.
            filename = urlparse(text).path.split("/")[-1]
        else:
            # Plain string path: previously left `filename` unbound.
            filename = Path(text).name

    filename = filename.lower()
    if filename.endswith((".gz", ".gzip")):
        return "zcat |"
    elif filename.endswith(".xz"):
        return "xzcat |"
    return ""