Coverage for jumpstarter_driver_flashers/client.py: 19%

299 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 20:29 +0000

1import hashlib 

2import json 

3import os 

4import re 

5import sys 

6import threading 

7import time 

8from contextlib import contextmanager 

9from dataclasses import dataclass 

10from pathlib import Path, PosixPath 

11from queue import Queue 

12from urllib.parse import urlparse 

13 

14import asyncclick as click 

15from jumpstarter_driver_composite.client import CompositeClient 

16from jumpstarter_driver_opendal.client import FlasherClient, OpendalClient, operator_for_path 

17from jumpstarter_driver_opendal.common import PathBuf 

18from jumpstarter_driver_pyserial.client import Console 

19from opendal import Metadata, Operator 

20 

21from jumpstarter_driver_flashers.bundle import FlasherBundleManifestV1Alpha1 

22 

23from jumpstarter.common.exceptions import ArgumentError 

24 

# Reusable click option that adds a ``--console-debug`` flag to a CLI command.
debug_console_option = click.option(
    "--console-debug",
    is_flag=True,
    help="Enable console debug mode",
)

26 

27 

28@dataclass(kw_only=True) 

29class BaseFlasherClient(FlasherClient, CompositeClient): 

30 """ 

31 Client interface for software driven flashing 

32 

33 This client provides methods to flash and dump images to a device under test (DUT) 

34 """ 

35 

def __post_init__(self):
    """Run the parent initialisation and set up per-instance state."""
    super().__post_init__()
    # Console debugging stays off until set_console_debug() turns it on
    self._console_debug = False
    # No manifest is cached yet; the ``manifest`` property fills this lazily
    self._manifest = None

40 

def set_console_debug(self, debug: bool):
    """Enable or disable console debug mode.

    Args:
        debug: Value stored in ``self._console_debug`` and later handed to
            the bootloader/console helpers of this client.
    """
    # TODO: also set console debug on uboot client
    self._console_debug = debug

45 

@contextmanager
def busybox_shell(self):
    """Context manager that boots the target into busybox and yields the serial client.

    The exporter is first asked to set up the flasher bundle, the http/tftp
    services are kept running for the lifetime of the context, and the
    target is booted through ``_busybox()`` before the serial client is
    handed to the caller.
    """
    # The exporter downloads the bundle contents and puts the files in place
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")

    with self._services_up():
        with self._busybox() as shell:
            # A couple of newlines leave the shell at a fresh prompt
            shell.send("\n\n")
        yield self.serial

56 

@contextmanager
def bootloader_shell(self):
    """Context manager that stops the target in the bootloader and yields the serial client.

    The exporter is first asked to set up the flasher bundle and the
    http/tftp services are kept running for the lifetime of the context.
    """
    # The exporter downloads the bundle contents and puts the files in place
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle")

    with self._services_up():
        # Entering and immediately leaving the reboot context leaves the
        # target sitting at the bootloader console.
        with self.uboot.reboot_to_console(debug=self._console_debug):
            pass
        yield self.serial

67 

def flash(
    self,
    path: PathBuf,
    *,
    partition: str | None = None,
    operator: Operator | None = None,
    os_image_checksum: str | None = None,
    force_exporter_http: bool = False,
    force_flash_bundle: str | None = None,
):
    """Flash image to DUT

    Args:
        path: Location of the image. A plain ``http://`` URL string is
            fetched by the target directly unless ``force_exporter_http``
            is set; any other source is first copied into the exporter's
            HTTP storage by a background thread.
        partition: Target name to flash; falls back to the manifest's
            default target when omitted.
        operator: Operator used to read ``path``. When omitted it is
            resolved with ``operator_for_path``.
        os_image_checksum: Known hash of the image, forwarded to the
            background transfer.
        force_exporter_http: Always serve the image through the exporter's
            HTTP service, even for ``http://`` sources.
        force_flash_bundle: Forwarded to the exporter's
            ``setup_flasher_bundle`` call.

    Raises:
        ArgumentError: If neither ``partition`` nor a manifest default
            target is available.
        Exception: Any exception captured from the background storage
            transfer is re-raised in the calling thread.
    """
    skip_exporter_http = False
    image_url = ""
    operator_scheme = None
    # Only str inputs can be URLs; path-like objects have no startswith()
    if isinstance(path, str) and path.startswith("http://") and not force_exporter_http:
        # busybox can handle the http from a remote directly, unless target is isolated
        image_url = path
        skip_exporter_http = True
    else:
        if operator is None:
            path, operator, operator_scheme = operator_for_path(path)
        # A caller-supplied operator may come with a plain string path, so
        # derive the file name the same way the background transfer does.
        image_name = Path(path).name if isinstance(path, (str, os.PathLike)) else path.name
        image_url = self.http.get_url() + "/" + image_name

    # start counting time for the flash operation
    start_time = time.time()

    if not skip_exporter_http:
        # Create a queue to handle exceptions from the thread
        error_queue = Queue()

        # Start the storage write operation in the background
        storage_thread = threading.Thread(
            target=self._transfer_bg_thread,
            args=(path, operator, operator_scheme, os_image_checksum, self.http.storage, error_queue),
            name="storage_transfer",
        )
        storage_thread.start()

    # Make the exporter download the bundle contents and set files in the right places
    self.logger.info("Setting up flasher bundle files in exporter")
    self.call("setup_flasher_bundle", force_flash_bundle)

    # Early exit if there was an error in the background thread
    if not skip_exporter_http and not error_queue.empty():
        raise error_queue.get()

    with self._services_up():
        with self._busybox() as console:
            manifest = self.manifest
            target = partition or manifest.spec.default_target
            if not target:
                raise ArgumentError("No partition or default target specified")

            target_device = self._get_target_device(target, manifest, console)

            self.logger.info(f"Using target block device: {target_device}")

            # Preflash commands are executed before the flash operation
            # generally used to clean up boot entries in existing devices
            for preflash_command in manifest.spec.preflash_commands:
                self.logger.info(f"Running preflash command: {preflash_command}")
                console.sendline(preflash_command)
                console.expect(manifest.spec.login.prompt, timeout=5)

            # make sure that the device is connected to the network and has an IP address
            console.sendline("udhcpc")
            console.expect(manifest.spec.login.prompt, timeout=10)

            if not skip_exporter_http:
                # Wait for the storage write operation to complete before proceeding
                self.logger.info("Waiting until the http image preparation in storage is completed")
                storage_thread.join()

                # Check if there were any exceptions in the background thread
                if not error_queue.empty():
                    raise error_queue.get()

            self._flash_with_progress(console, manifest, path, image_url, target_device)

            total_time = time.time() - start_time
            # total time in minutes:seconds
            minutes, seconds = divmod(total_time, 60)
            self.logger.info(f"Flashing completed in {int(minutes)}m {int(seconds):02d}s")
            console.sendline("reboot")
            # Give the reboot command a moment before cutting power
            time.sleep(2)
            self.logger.info("Powering off target")
            self.power.off()

155 

def _flash_with_progress(self, console, manifest, path, image_url, target_path):
    """Flash image to target device with progress monitoring.

    A ``wget | [decompress |] dd`` pipeline is started in the background on
    the target. While it runs, the write offset of ``dd`` is polled through
    ``/proc/<pid>/fdinfo/1`` about once a second and logged. Buffers are
    flushed with ``sync`` at the end.

    Args:
        console: Console object for device interaction
        manifest: Flasher manifest containing target definitions
        path: Path to the source image
        image_url: URL to download the image from
        target_path: Target device path to flash to
    """
    prompt = manifest.spec.login.prompt

    # Flash the image
    decompress_cmd = _get_decompression_command(path)
    flash_cmd = (
        f'( wget -q -O - "{image_url}" | '
        f"{decompress_cmd} "
        f"dd of={target_path} bs=64k iflag=fullblock oflag=direct) &"
    )
    console.sendline(flash_cmd)
    console.expect(prompt, timeout=60)

    console.sendline("pidof dd")
    console.expect(prompt, timeout=3)
    pid_output = console.before.decode(errors="ignore")
    # Accept only a line made purely of PIDs, so that neither the echoed
    # command nor dd's own "N+0 records" summary is mistaken for one.
    pid_match = re.search(r"^[ \t]*(\d+)(?:[ \t]+\d+)*[ \t]*\r?$", pid_output, re.MULTILINE)

    if pid_match is None:
        # dd has most likely finished already (small image). Block on the
        # shell's background jobs instead of failing on a missing PID.
        self.logger.warning("Could not find a running dd process, waiting for background jobs to finish")
        console.sendline("wait")
        console.expect(prompt, timeout=1200)
    else:
        dd_pid = pid_match.group(1)

        # Initialize progress tracking variables
        last_pos = 0
        last_time = time.time()

        while True:
            console.sendline(f"cat /proc/{dd_pid}/fdinfo/1")
            console.expect(prompt, timeout=3)
            data = console.before.decode(errors="ignore")
            # The fdinfo entry disappears once dd has exited
            if "No such file or directory" in data:
                break
            match = re.search(r"pos:\s+(\d+)", data)
            if match:
                current_bytes = int(match.group(1))
                current_time = time.time()
                elapsed = current_time - last_time

                if elapsed >= 1.0:  # Update speed every second
                    bytes_diff = current_bytes - last_pos
                    speed_mb = (bytes_diff / (1024 * 1024)) / elapsed
                    total_mb = current_bytes / (1024 * 1024)
                    self.logger.info(f"Flash progress: {total_mb:.2f} MB, Speed: {speed_mb:.2f} MB/s")

                    last_pos = current_bytes
                    last_time = current_time
            time.sleep(1)

    self.logger.info("Flushing buffers")
    console.sendline("sync")
    console.expect(prompt, timeout=1200)

208 

def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha1, console) -> str:
    """Resolve a manifest target name to the device path to flash.

    Args:
        target: Target name from manifest
        manifest: Flasher manifest containing target definitions
        console: Console object for device interaction

    Returns:
        The path stored in the manifest, or, for entries of the form
        ``/sys/class/block#<address>``, the block device looked up on the
        running target.

    Raises:
        ArgumentError: If target is not found in manifest
    """
    device = manifest.spec.targets.get(target)
    if device is None:
        raise ArgumentError(f"Target {target} not found in manifest")

    # Plain paths are used as-is
    if not device.startswith("/sys/class/block#"):
        return device

    # The part after '#' is the address to search for under /sys/class/block
    address = device.split("#")[1]
    return self._lookup_block_device(console, manifest.spec.login.prompt, address)

231 

def _transfer_bg_thread(
    self,
    src_path: PathBuf,
    src_operator: Operator,
    src_operator_scheme: str,
    known_hash: str | None,
    to_storage: OpendalClient,
    error_queue,
):
    """Copy the image into storage; intended to run in a background thread.

    The upload is skipped when a file of the same name already exists in
    storage and its hash equals the source hash. After an upload, a
    ``<filename>.metadata`` JSON file is written next to the image.

    Args:
        src_path: Path to the source image
        src_operator: Operator to read the source image
        src_operator_scheme: Scheme of the source operator; for ``"fs"``
            the hash is computed from the file, otherwise ``known_hash``
            is used
        known_hash: Known hash of the image
        to_storage: Storage operator to write the image to
        error_queue: Queue to put exceptions in if any

    Raises:
        Exception: Any failure is logged, pushed to ``error_queue`` and
            re-raised.
    """
    self.logger.info(f"Writing image to storage in the background: {src_path}")
    try:
        if isinstance(src_path, (str, os.PathLike)):
            filename = Path(src_path).name
        else:
            filename = src_path.name

        if src_operator_scheme != "fs":
            # Only filesystem sources are hashed here; other sources rely
            # on the hash supplied by the caller.
            file_hash = known_hash
            self.logger.info(f"Using provided hash for {filename}: {known_hash}")
        else:
            file_hash = self._sha256_file(src_operator, src_path)
            self.logger.info(f"Hash of {filename} is {file_hash}")

        if file_hash and to_storage.exists(filename):
            stored_hash = to_storage.hash(filename)
            self.logger.info(f"Hash of existing file in storage: {stored_hash}")

            if stored_hash == file_hash:
                self.logger.info(f"Image {filename} already exists in storage with matching hash, skipping")
                return
            self.logger.info(f"Image {filename} exists in storage but hash differs, will overwrite")

        self.logger.info(f"Uploading image to storage: {filename}")
        to_storage.write_from_path(filename, src_path, src_operator)

        # Only the JSON half of the pair is needed for the sidecar file
        _, metadata_json = self._create_metadata_and_json(src_operator, src_path, file_hash)
        to_storage.write_bytes(filename + ".metadata", metadata_json.encode(errors="ignore"))

        self.logger.info(f"Image written to storage: {filename}")

    except Exception as e:
        self.logger.error(f"Error writing image to storage: {e}")
        error_queue.put(e)
        raise

283 

def _sha256_file(self, src_operator, src_path) -> str:
    """Return the hex SHA-256 digest of ``src_path`` read through ``src_operator``.

    The file is opened in binary mode and consumed in 64 KiB chunks until
    a read returns no data.
    """
    digest = hashlib.sha256()
    with src_operator.open(src_path, "rb") as stream:
        while len(chunk := stream.read(size=65536)) != 0:
            digest.update(chunk)

    return digest.hexdigest()

294 

def _create_metadata_and_json(self, src_operator, src_path, file_hash=None) -> tuple[Metadata, str]:
    """Stat the source file and build its metadata JSON document.

    Args:
        src_operator: Operator used to ``stat`` the source
        src_path: Path of the source file
        file_hash: Optional hash, stored under ``"hash"`` when truthy

    Returns:
        A tuple of the stat result and a JSON string holding ``path``,
        ``content_length``, ``etag`` and, when given, ``hash``.
    """
    stat = src_operator.stat(src_path)
    fields = {
        "path": str(src_path),
        "content_length": stat.content_length,
        "etag": stat.etag,
    }
    if file_hash:
        fields["hash"] = file_hash

    return stat, json.dumps(fields)

308 

def _lookup_block_device(self, console, prompt, address: str) -> str:
    """Find the ``/dev`` node whose ``/sys/class/block`` entry mentions ``address``.

    Sometimes targets don't get assigned block device numbers in a
    predictable way, so the block device is looked up by address.

    Args:
        console: Console object used to run the lookup on the target
        prompt: Shell prompt to wait for after the command
        address: Text to grep for in the ``/sys/class/block`` listing

    Returns:
        ``/dev/<name>`` for the first matching entry.

    Raises:
        ArgumentError: If no symlink entry is found in the output.
    """
    console.send(f"ls -l /sys/class/block/ | grep {address} | head -n 1\n")
    console.expect(prompt, timeout=5)
    # This produces an output like:
    # ls /sys/class/block/ -la | grep 4fb0000
    # lrwxrwxrwx 1 root root 0 Jan 1
    # 00:00 mmcblk1 -> ../../devices/platform/bus@100000/4fb0000.mmc/mmc_host/mmc1/mmc1:aaaa/block/mmcblk1
    output = console.before.decode(errors="ignore")

    # The device name is the word right before the symlink arrow
    found = re.search(r"\s(\w+)\s->", output)
    if found is None:
        raise ArgumentError(f"No block device found for address {address}, output was: {output}")
    return "/dev/" + found.group(1)

326 

def dump(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None):
    """Dump image from DUT

    Raises:
        NotImplementedError: Always; dumping is not supported by this
            driver yet.
    """
    raise NotImplementedError("Dump is not implemented for this driver yet")

336 

def _filename(self, path: PathBuf) -> str:
    """Extract filename from url or path

    Args:
        path: An ``http://``/``https://`` URL string, or a filesystem path
            given as a string or path-like object.

    Returns:
        The last path segment of the URL, or the final component of the
        filesystem path.
    """
    # Only strings can be URLs; path-like objects have no startswith()
    if isinstance(path, str) and path.startswith(("http://", "https://")):
        return urlparse(path).path.split("/")[-1]
    return Path(path).name

343 

def _upload_artifact(self, storage, path: PathBuf, operator: Operator):
    """Upload artifact to storage

    The artifact is always written, replacing any file of the same name:
    without a hash comparison there is no way to tell whether an existing
    copy is still current.

    Args:
        storage: Storage client to write the artifact to
        path: Source location of the artifact
        operator: Operator used to read ``path``
    """
    filename = self._filename(path)
    if storage.exists(filename):
        # TODO: check hash for existing files
        # The log used to say "skipping" although the write below always runs
        self.logger.info(f"Artifact {filename} already exists in storage, overwriting")
    storage.write_from_path(filename, path, operator=operator)

351 

@contextmanager
def _services_up(self):
    """Make sure that the http and tftp services are up and running in this context

    Both services are started on entry and both are asked to stop on exit,
    whether the body (or the start-up itself) succeeded or not.
    """
    try:
        self.http.start()
        self.tftp.start()
        yield
    finally:
        # Nested so that a failure while stopping http cannot leave tftp running
        try:
            self.http.stop()
        finally:
            self.tftp.stop()

362 

def _generate_uboot_env(self):
    """Build the uboot environment dictionary.

    Only ``serverip`` is set here, to the host reported by the tftp
    service; targets that need more may override this method.
    """
    env = {}
    env["serverip"] = self.tftp.get_host()
    return env

369 

@contextmanager
def _busybox(self):
    """Boot the target into the flasher's busybox shell and yield a console.

    The device is first stopped in the uboot console, where DHCP is set up,
    the environment is configured, and the kernel plus optional initramfs
    and DTB are fetched over tftp. The manifest boot command is then sent
    on the serial console, the login sequence from the manifest is
    followed, and the pexpect console is yielded once the shell prompt has
    been seen.
    """

    # make sure that the device is booted into the uboot console
    with self.uboot.reboot_to_console(debug=self._console_debug):
        # run dhcp discovery and gather details useful for later
        self._dhcp_details = self.uboot.setup_dhcp()
        self.logger.info(f"discovered dhcp details: {self._dhcp_details}")

        # configure the environment necessary
        self.uboot.set_env_dict(self._generate_uboot_env())

        # load any necessary files to RAM from the tftp storage
        manifest = self.manifest
        kernel_filename = Path(manifest.get_kernel_file()).name
        kernel_address = manifest.get_kernel_address()
        self.uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)

        # the initramfs is optional
        if manifest.get_initram_file():
            initram_filename = Path(manifest.get_initram_file()).name
            initram_address = manifest.get_initram_address()
            self.uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)

        # so is the device tree blob
        if manifest.get_dtb_file():
            dtb_filename = Path(manifest.get_dtb_file()).name
            dtb_address = manifest.get_dtb_address()
            self.uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)

    with self.serial.pexpect() as console:
        if self._console_debug:
            # mirror everything read from the console to stdout
            console.logfile_read = sys.stdout.buffer

        boot_command = manifest.spec.bootcmd
        self.logger.info(f"Running boot command: {boot_command}")
        console.send(boot_command + "\n")

        login = manifest.spec.login

        # if manifest has login details, we need to login
        if login.username:
            console.expect(login.login_prompt, timeout=120)
            console.send(login.username + "\n")

        # if manifest has password, we need to send it
        if login.password:
            console.expect("ssword:", timeout=30)
            console.send(login.password + "\n")

        console.expect(login.prompt, timeout=120)
        yield console

423 

def use_dtb(self, path: PathBuf, operator: Operator | None = None):
    """Use DTB file

    Placeholder: the source operator is resolved but nothing is done with
    the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    # Not implemented yet
    ...

430 

def use_initram(self, path: PathBuf, operator: Operator | None = None):
    """Use initramfs file

    Placeholder: the source operator is resolved but nothing is done with
    the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    # Not implemented yet
    ...

437 

def use_kernel(self, path: PathBuf, operator: Operator | None = None):
    """Use kernel file

    Placeholder: the source operator is resolved but nothing is done with
    the file yet.
    """
    if operator is None:
        path, operator, _scheme = operator_for_path(path)

    # Not implemented yet
    ...

444 

@property
def manifest(self):
    """Get flasher bundle manifest

    The manifest YAML is requested from the exporter on first access,
    parsed, and cached on the instance for later calls.
    """
    if not self._manifest:
        yaml_str = self.call("get_flasher_manifest_yaml")
        self._manifest = FlasherBundleManifestV1Alpha1.from_string(yaml_str)
    return self._manifest

454 

def cli(self):
    """Build the click command group for this client.

    Returns:
        A click group with the ``flash``, ``bootloader-shell`` and
        ``busybox-shell`` commands bound to this client instance.
    """

    @click.group
    def base():
        """Software-defined flasher interface"""
        pass

    @base.command()
    @click.argument("file")
    @click.option("--partition", type=str)
    @click.option("--os-image-checksum", help="SHA256 checksum of OS image (direct value)")
    @click.option(
        "--os-image-checksum-file",
        help="File containing SHA256 checksum of OS image",
        type=click.Path(exists=True, dir_okay=False),
    )
    @click.option("--force-exporter-http", is_flag=True, help="Force use of exporter HTTP")
    @click.option("--force-flash-bundle", type=str, help="Force use of a specific flasher OCI bundle")
    @debug_console_option
    def flash(
        file,
        partition,
        os_image_checksum,
        os_image_checksum_file,
        console_debug,
        force_exporter_http,
        force_flash_bundle,
    ):
        """Flash image to DUT from file"""
        # A checksum file takes precedence over a checksum given directly
        if os_image_checksum_file and os.path.exists(os_image_checksum_file):
            with open(os_image_checksum_file) as f:
                tokens = f.read().split()
            if not tokens:
                # An empty file would otherwise surface as a bare IndexError
                raise click.BadParameter(
                    f"No checksum found in {os_image_checksum_file}",
                    param_hint="--os-image-checksum-file",
                )
            # Only the first token is used; anything after it is ignored
            os_image_checksum = tokens[0]
            self.logger.info(f"Read checksum from file: {os_image_checksum}")

        self.set_console_debug(console_debug)
        self.flash(
            file,
            partition=partition,
            # Previously computed but never forwarded to flash()
            os_image_checksum=os_image_checksum,
            force_exporter_http=force_exporter_http,
            force_flash_bundle=force_flash_bundle,
        )

    @base.command()
    @debug_console_option
    def bootloader_shell(console_debug):
        """Start a uboot/bootloader interactive console"""
        self.set_console_debug(console_debug)
        with self.bootloader_shell() as serial:
            # Print a prompt so the user sees where to start typing
            print("=> ", end="", flush=True)
            c = Console(serial)
            c.run()

    @base.command()
    @debug_console_option
    def busybox_shell(console_debug):
        """Start a busybox shell"""
        self.set_console_debug(console_debug)
        with self.busybox_shell() as serial:
            # Print a prompt so the user sees where to start typing
            print("# ", end="", flush=True)
            c = Console(serial)
            c.run()

    return base

517 

518 

def _get_decompression_command(filename_or_url) -> str:
    """
    Determine the appropriate decompression command based on file extension

    Args:
        filename_or_url: File name or path (string or path-like object), or
            an ``http://``/``https://`` URL string

    Returns:
        str: Shell pipeline fragment to insert before ``dd``: ``"zcat |"``
        for ``.gz``/``.gzip``, ``"xzcat |"`` for ``.xz``, or an empty string
        for anything else
    """
    if isinstance(filename_or_url, str) and filename_or_url.startswith(("http://", "https://")):
        # Use only the URL path so that a query string cannot hide the extension
        filename = urlparse(filename_or_url).path.split("/")[-1]
    else:
        # Plain strings and every path-like flavour are handled the same way
        filename = Path(filename_or_url).name

    filename = filename.lower()
    if filename.endswith((".gz", ".gzip")):
        return "zcat |"
    if filename.endswith(".xz"):
        return "xzcat |"
    return ""