Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py: 39%

349 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 17:10 +0200

1from __future__ import annotations 

2 

3from abc import ABCMeta, abstractmethod 

4from collections.abc import Generator 

5from contextlib import closing 

6from dataclasses import dataclass 

7from io import BytesIO 

8from pathlib import Path 

9from urllib.parse import urlparse 

10from uuid import UUID 

11 

12import click 

13from anyio import EndOfStream 

14from anyio.abc import ObjectStream 

15from opendal import Operator 

16from pydantic import ConfigDict, validate_call 

17 

18from .adapter import OpendalAdapter 

19from .common import Capability, HashAlgo, Metadata, Mode, PathBuf, PresignedRequest 

20from jumpstarter.client import DriverClient 

21from jumpstarter.common.exceptions import ArgumentError 

22from jumpstarter.streams.encoding import Compression 

23 

24 

25@dataclass(kw_only=True) 

26class BytesIOStream(ObjectStream[bytes]): 

27 buf: BytesIO 

28 

29 async def send(self, item: bytes): 

30 self.buf.write(item) 

31 

32 async def receive(self) -> bytes: 

33 item = self.buf.read(65535) 

34 if len(item) == 0: 

35 raise EndOfStream 

36 return item 

37 

38 async def send_eof(self): 

39 pass 

40 

41 async def aclose(self): 

42 pass 

43 

44 

45def operator_for_path(path: PathBuf) -> tuple[PathBuf, Operator, str]: 

46 """Create an operator for the given path 

47 Return a tuple of: 

48 - the path 

49 - the operator for the given path 

50 - the scheme of the operator. 

51 """ 

52 if type(path) is str and path.startswith(("http://", "https://")): 

53 parsed_url = urlparse(path) 

54 operator = Operator("http", root="/", endpoint=f"{parsed_url.scheme}://{parsed_url.netloc}") 

55 return Path(parsed_url.path), operator, "http" 

56 else: 

57 return Path(path).resolve(), Operator("fs", root="/"), "fs" 

58 

59 

60@dataclass(kw_only=True) 

61class OpendalFile: 

62 """ 

63 A file-like object representing a remote file 

64 """ 

65 

66 client: OpendalClient 

67 fd: UUID 

68 

69 def __write(self, handle): 

70 return self.client.call("file_write", self.fd, handle) 

71 

72 def __read(self, handle): 

73 return self.client.call("file_read", self.fd, handle) 

74 

75 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

76 def write_from_path(self, path: PathBuf, operator: Operator | None = None): 

77 """ 

78 Write into remote file with content from local file 

79 """ 

80 if operator is None: 

81 path, operator, _ = operator_for_path(path) 

82 

83 with OpendalAdapter(client=self.client, operator=operator, path=path) as handle: 

84 return self.__write(handle) 

85 

86 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

87 def read_into_path(self, path: PathBuf, operator: Operator | None = None): 

88 """ 

89 Read content from remote file into local file 

90 """ 

91 if operator is None: 

92 path, operator, _ = operator_for_path(path) 

93 

94 with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: 

95 return self.__read(handle) 

96 

97 @validate_call(validate_return=True) 

98 def write_bytes(self, data: bytes) -> None: 

99 buf = BytesIO(data) 

100 with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: 

101 with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: 

102 self.__write(handle) 

103 

104 @validate_call(validate_return=True) 

105 def read_bytes(self) -> bytes: 

106 buf = BytesIO() 

107 with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: 

108 with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: 

109 self.__read(handle) 

110 return buf.getvalue() 

111 

112 @validate_call(validate_return=True) 

113 def seek(self, pos: int, whence: int = 0) -> int: 

114 """ 

115 Change the cursor position to the given byte offset. 

116 Offset is interpreted relative to the position indicated by whence. 

117 The default value for whence is SEEK_SET. Values for whence are: 

118 

119 SEEK_SET or 0 – start of the file (the default); offset should be zero or positive 

120 

121 SEEK_CUR or 1 – current cursor position; offset may be negative 

122 

123 SEEK_END or 2 – end of the file; offset is usually negative 

124 

125 Return the new cursor position 

126 """ 

127 return self.client.call("file_seek", self.fd, pos, whence) 

128 

129 @validate_call(validate_return=True) 

130 def tell(self) -> int: 

131 """ 

132 Return the current cursor position 

133 """ 

134 return self.client.call("file_tell", self.fd) 

135 

136 @validate_call(validate_return=True) 

137 def close(self) -> None: 

138 """ 

139 Close the file 

140 """ 

141 return self.client.call("file_close", self.fd) 

142 

143 @property 

144 @validate_call(validate_return=True) 

145 def closed(self) -> bool: 

146 """ 

147 Check if the file is closed 

148 """ 

149 return self.client.call("file_closed", self.fd) 

150 

151 @validate_call(validate_return=True) 

152 def readable(self) -> bool: 

153 """ 

154 Check if the file is readable 

155 """ 

156 return self.client.call("file_readable", self.fd) 

157 

158 @validate_call(validate_return=True) 

159 def seekable(self) -> bool: 

160 """ 

161 Check if the file is seekable 

162 """ 

163 return self.client.call("file_seekable", self.fd) 

164 

165 @validate_call(validate_return=True) 

166 def writable(self) -> bool: 

167 """ 

168 Check if the file is writable 

169 """ 

170 return self.client.call("file_writable", self.fd) 

171 

172 

173class OpendalClient(DriverClient): 

174 @validate_call(validate_return=True) 

175 def write_bytes(self, /, path: PathBuf, data: bytes) -> None: 

176 """ 

177 Write data into path 

178 

179 >>> opendal.write_bytes("file.txt", b"content") 

180 """ 

181 with closing(self.open(path, "wb")) as f: 

182 f.write_bytes(data) 

183 

184 @validate_call(validate_return=True) 

185 def read_bytes(self, /, path: PathBuf) -> bytes: 

186 """ 

187 Read data from path 

188 

189 >>> opendal.write_bytes("file.txt", b"content") 

190 >>> opendal.read_bytes("file.txt") 

191 b'content' 

192 """ 

193 with closing(self.open(path, "rb")) as f: 

194 return f.read_bytes() 

195 

196 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

197 def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None: 

198 """ 

199 Write data from src into dst 

200 

201 >>> _ = (tmp / "src").write_bytes(b"content") 

202 >>> opendal.write_from_path("file.txt", tmp / "src") 

203 >>> opendal.read_bytes("file.txt") 

204 b'content' 

205 """ 

206 with closing(self.open(dst, "wb")) as f: 

207 f.write_from_path(src, operator) 

208 

209 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

210 def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None: 

211 """ 

212 Read data into dst from src 

213 

214 >>> opendal.write_bytes("file.txt", b"content") 

215 >>> opendal.read_into_path("file.txt", tmp / "dst") 

216 >>> (tmp / "dst").read_bytes() 

217 b'content' 

218 """ 

219 with closing(self.open(src, "rb")) as f: 

220 f.read_into_path(dst, operator) 

221 

222 @validate_call 

223 def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile: 

224 """ 

225 Open a file-like reader for the given path 

226 

227 >>> file = opendal.open("file.txt", "wb") 

228 >>> file.write_bytes(b"content") 

229 >>> file.close() 

230 """ 

231 return OpendalFile(client=self, fd=self.call("open", path, mode)) 

232 

233 @validate_call(validate_return=True) 

234 def stat(self, /, path: PathBuf) -> Metadata: 

235 """ 

236 Get current path's metadata 

237 

238 >>> opendal.write_bytes("file.txt", b"content") 

239 >>> opendal.stat("file.txt").mode.is_file() 

240 True 

241 """ 

242 return self.call("stat", path) 

243 

244 @validate_call(validate_return=True) 

245 def hash(self, /, path: PathBuf, algo: HashAlgo = "sha256") -> str: 

246 """ 

247 Get current path's hash 

248 

249 >>> opendal.write_bytes("file.txt", b"content") 

250 >>> opendal.hash("file.txt") 

251 'ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73' 

252 """ 

253 return self.call("hash", path, algo) 

254 

255 @validate_call(validate_return=True) 

256 def copy(self, /, source: PathBuf, target: PathBuf): 

257 """ 

258 Copy source to target 

259 

260 >>> opendal.write_bytes("file.txt", b"content") 

261 >>> opendal.copy("file.txt", "copy.txt") 

262 >>> opendal.exists("copy.txt") 

263 True 

264 """ 

265 self.call("copy", source, target) 

266 

267 @validate_call(validate_return=True) 

268 def rename(self, /, source: PathBuf, target: PathBuf): 

269 """ 

270 Rename source to target 

271 

272 >>> opendal.write_bytes("file.txt", b"content") 

273 >>> opendal.rename("file.txt", "rename.txt") 

274 >>> opendal.exists("file.txt") 

275 False 

276 >>> opendal.exists("rename.txt") 

277 True 

278 """ 

279 self.call("rename", source, target) 

280 

281 @validate_call(validate_return=True) 

282 def remove_all(self, /, path: PathBuf): 

283 """ 

284 Remove all file under path 

285 

286 >>> opendal.write_bytes("dir/file.txt", b"content") 

287 >>> opendal.remove_all("dir/") 

288 >>> opendal.exists("dir/file.txt") 

289 False 

290 """ 

291 self.call("remove_all", path) 

292 

293 @validate_call(validate_return=True) 

294 def create_dir(self, /, path: PathBuf): 

295 """ 

296 Create a dir at given path 

297 

298 To indicate that a path is a directory, it is compulsory to include a trailing / in the path. 

299 

300 Create on existing dir will succeed. 

301 Create dir is always recursive, works like mkdir -p. 

302 

303 >>> opendal.create_dir("a/b/c/") 

304 >>> opendal.exists("a/b/c/") 

305 True 

306 """ 

307 self.call("create_dir", path) 

308 

309 @validate_call(validate_return=True) 

310 def delete(self, /, path: PathBuf): 

311 """ 

312 Delete given path 

313 

314 Delete not existing error won't return errors 

315 

316 >>> opendal.write_bytes("file.txt", b"content") 

317 >>> opendal.exists("file.txt") 

318 True 

319 >>> opendal.delete("file.txt") 

320 >>> opendal.exists("file.txt") 

321 False 

322 """ 

323 self.call("delete", path) 

324 

325 @validate_call(validate_return=True) 

326 def exists(self, /, path: PathBuf) -> bool: 

327 """ 

328 Check if given path exists 

329 

330 >>> opendal.exists("file.txt") 

331 False 

332 >>> opendal.write_bytes("file.txt", b"content") 

333 >>> opendal.exists("file.txt") 

334 True 

335 """ 

336 return self.call("exists", path) 

337 

338 @validate_call 

339 def list(self, /, path: PathBuf) -> Generator[str, None, None]: 

340 """ 

341 List files and directories under given path 

342 

343 >>> opendal.write_bytes("dir/file.txt", b"content") 

344 >>> opendal.write_bytes("dir/another.txt", b"content") 

345 >>> sorted(opendal.list("dir/")) 

346 ['dir/', 'dir/another.txt', 'dir/file.txt'] 

347 """ 

348 yield from self.streamingcall("list", path) 

349 

350 @validate_call 

351 def scan(self, /, path: PathBuf) -> Generator[str, None, None]: 

352 """ 

353 List files and directories under given path recursively 

354 

355 >>> opendal.write_bytes("dir/a/file.txt", b"content") 

356 >>> opendal.write_bytes("dir/b/another.txt", b"content") 

357 >>> sorted(opendal.scan("dir/")) 

358 ['dir/', 'dir/a/', 'dir/a/file.txt', 'dir/b/', 'dir/b/another.txt'] 

359 """ 

360 yield from self.streamingcall("scan", path) 

361 

362 @validate_call(validate_return=True) 

363 def presign_stat(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

364 """ 

365 Presign an operation for stat (HEAD) which expires after expire_second seconds 

366 """ 

367 return self.call("presign_stat", path, expire_second) 

368 

369 @validate_call(validate_return=True) 

370 def presign_read(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

371 """ 

372 Presign an operation for read (GET) which expires after expire_second seconds 

373 """ 

374 return self.call("presign_read", path, expire_second) 

375 

376 @validate_call(validate_return=True) 

377 def presign_write(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

378 """ 

379 Presign an operation for write (PUT) which expires after expire_second seconds 

380 """ 

381 return self.call("presign_write", path, expire_second) 

382 

383 @validate_call(validate_return=True) 

384 def capability(self, /) -> Capability: 

385 """ 

386 Get capabilities of the underlying storage 

387 

388 >>> cap = opendal.capability() 

389 >>> cap.copy 

390 True 

391 >>> cap.presign_read 

392 False 

393 """ 

394 return self.call("capability") 

395 

396 def cli(self): # noqa: C901 

397 arg_path = click.argument("path", type=click.Path()) 

398 arg_source = click.argument("source", type=click.Path()) 

399 arg_target = click.argument("target", type=click.Path()) 

400 arg_src = click.argument("src", type=click.Path()) 

401 arg_dst = click.argument("dst", type=click.Path()) 

402 opt_expire_second = click.option("--expire-second", type=int, required=True) 

403 

404 @click.group 

405 def base(): 

406 """Opendal Storage""" 

407 

408 @base.command 

409 @arg_path 

410 def write_bytes(path): 

411 data = click.get_binary_stream("stdin").read() 

412 self.write_bytes(path, data) 

413 

414 @base.command 

415 @arg_path 

416 def read_bytes(path): 

417 data = self.read_bytes(path) 

418 click.echo(data, nl=False) 

419 

420 @base.command 

421 @arg_dst 

422 @arg_src 

423 def write_from_path(dst, src): 

424 self.write_from_path(dst, src) 

425 

426 @base.command 

427 @arg_src 

428 @arg_dst 

429 def read_into_path(src, dst): 

430 self.read_into_path(src, dst) 

431 

432 @base.command 

433 @arg_path 

434 def stat(path): 

435 click.echo(self.stat(path).model_dump_json(indent=2, by_alias=True)) 

436 

437 @base.command 

438 @arg_path 

439 @click.option("--algo", type=click.Choice(["md5", "sha256"])) 

440 def hash(path, algo): 

441 click.echo(self.hash(path, algo)) 

442 

443 @base.command 

444 @arg_source 

445 @arg_target 

446 def copy(source, target): 

447 self.copy(source, target) 

448 

449 @base.command 

450 @arg_source 

451 @arg_target 

452 def rename(source, target): 

453 self.rename(source, target) 

454 

455 @base.command 

456 @arg_path 

457 def remove_all(path): 

458 self.remove_all(path) 

459 

460 @base.command 

461 @arg_path 

462 def create_dir(path): 

463 self.create_dir(path) 

464 

465 @base.command 

466 @arg_path 

467 def delete(path): 

468 self.delete(path) 

469 

470 @base.command 

471 @arg_path 

472 def exists(path): 

473 if not self.exists(path): 

474 raise click.ClickException(f"path {path} does not exist") 

475 

476 @base.command 

477 @arg_path 

478 def list(path): 

479 for entry in self.list(path): 

480 click.echo(entry) 

481 

482 @base.command 

483 @arg_path 

484 def scan(path): 

485 for entry in self.scan(path): 

486 click.echo(entry) 

487 

488 @base.command 

489 @arg_path 

490 @opt_expire_second 

491 def presign_stat(path, expire_second): 

492 click.echo(self.presign_stat(path, expire_second).model_dump_json(indent=2)) 

493 

494 @base.command 

495 @arg_path 

496 @opt_expire_second 

497 def presign_read(path, expire_second): 

498 click.echo(self.presign_read(path, expire_second).model_dump_json(indent=2)) 

499 

500 @base.command 

501 @arg_path 

502 @opt_expire_second 

503 def presign_write(path, expire_second): 

504 click.echo(self.presign_write(path, expire_second).model_dump_json(indent=2)) 

505 

506 @base.command 

507 def capability(): 

508 click.echo(self.capability().model_dump_json(indent=2)) 

509 

510 return base 

511 

512 

513class FlasherClientInterface(metaclass=ABCMeta): 

514 @abstractmethod 

515 def flash( 

516 self, 

517 path: PathBuf, 

518 *, 

519 partition: str | None = None, 

520 operator: Operator | None = None, 

521 compression: Compression | None = None, 

522 ): 

523 """Flash image to DUT""" 

524 ... 

525 

526 @abstractmethod 

527 def dump( 

528 self, 

529 path: PathBuf, 

530 *, 

531 partition: str | None = None, 

532 operator: Operator | None = None, 

533 compression: Compression | None = None, 

534 ): 

535 """Dump image from DUT""" 

536 ... 

537 

538 def cli(self): 

539 @click.group 

540 def base(): 

541 """Generic flasher interface""" 

542 pass 

543 

544 @base.command() 

545 @click.argument("file") 

546 @click.option("--partition", type=str) 

547 @click.option("--compression", type=click.Choice(Compression, case_sensitive=False)) 

548 def flash(file, partition, compression): 

549 """Flash image to DUT from file""" 

550 self.flash(file, partition=partition, compression=compression) 

551 

552 @base.command() 

553 @click.argument("file") 

554 @click.option("--partition", type=str) 

555 @click.option("--compression", type=click.Choice(Compression, case_sensitive=False)) 

556 def dump(file, partition, compression): 

557 """Dump image from DUT to file""" 

558 self.dump(file, partition=partition, compression=compression) 

559 

560 return base 

561 

562 

563class FlasherClient(FlasherClientInterface, DriverClient): 

564 def flash( 

565 self, 

566 path: PathBuf, 

567 *, 

568 partition: str | None = None, 

569 operator: Operator | None = None, 

570 compression: Compression | None = None, 

571 ): 

572 """Flash image to DUT""" 

573 if operator is None: 

574 path, operator, _ = operator_for_path(path) 

575 

576 with OpendalAdapter(client=self, operator=operator, path=path, mode="rb", compression=compression) as handle: 

577 return self.call("flash", handle, partition) 

578 

579 def dump( 

580 self, 

581 path: PathBuf, 

582 *, 

583 partition: str | None = None, 

584 operator: Operator | None = None, 

585 compression: Compression | None = None, 

586 ): 

587 """Dump image from DUT""" 

588 if operator is None: 

589 path, operator, _ = operator_for_path(path) 

590 

591 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb", compression=compression) as handle: 

592 return self.call("dump", handle, partition) 

593 

594 

595class StorageMuxClient(DriverClient): 

596 def host(self): 

597 """Connect storage to host""" 

598 return self.call("host") 

599 

600 def dut(self): 

601 """Connect storage to dut""" 

602 return self.call("dut") 

603 

604 def off(self): 

605 """Disconnect storage""" 

606 return self.call("off") 

607 

608 def write(self, handle): 

609 return self.call("write", handle) 

610 

611 def read(self, handle): 

612 return self.call("read", handle) 

613 

614 def write_file(self, operator: Operator, path: str): 

615 with OpendalAdapter(client=self, operator=operator, path=path) as handle: 

616 return self.write(handle) 

617 

618 def read_file(self, operator: Operator, path: str): 

619 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb") as handle: 

620 return self.read(handle) 

621 

622 def write_local_file(self, filepath): 

623 """Write a local file to the storage device""" 

624 absolute = Path(filepath).resolve() 

625 return self.write_file(operator=Operator("fs", root="/"), path=str(absolute)) 

626 

627 def read_local_file(self, filepath): 

628 """Read into a local file from the storage device""" 

629 absolute = Path(filepath).resolve() 

630 return self.read_file(operator=Operator("fs", root="/"), path=str(absolute)) 

631 

632 def cli(self, base=None): 

633 if base is None: 

634 base = click.group(lambda: None) 

635 

636 @base.command() 

637 def host(): 

638 """Connect storage to host""" 

639 self.host() 

640 

641 @base.command() 

642 def dut(): 

643 """Connect storage to dut""" 

644 self.dut() 

645 

646 @base.command() 

647 def off(): 

648 """Disconnect storage""" 

649 self.off() 

650 

651 @base.command() 

652 @click.argument("file") 

653 def write_local_file(file): 

654 self.write_local_file(file) 

655 

656 return base 

657 

658 

659class StorageMuxFlasherClient(FlasherClient, StorageMuxClient): 

660 def flash( 

661 self, 

662 path: PathBuf, 

663 *, 

664 partition: str | None = None, 

665 operator: Operator | None = None, 

666 compression: Compression | None = None, 

667 ): 

668 """Flash image to DUT""" 

669 if partition is not None: 

670 raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided") 

671 

672 self.host() 

673 

674 if operator is None: 

675 path, operator, _ = operator_for_path(path) 

676 

677 with OpendalAdapter(client=self, operator=operator, path=path, mode="rb", compression=compression) as handle: 

678 try: 

679 return self.write(handle) 

680 finally: 

681 self.dut() 

682 

683 def dump( 

684 self, 

685 path: PathBuf, 

686 *, 

687 partition: str | None = None, 

688 operator: Operator | None = None, 

689 compression: Compression | None = None, 

690 ): 

691 """Dump image from DUT""" 

692 if partition is not None: 

693 raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided") 

694 

695 self.call("host") 

696 

697 if operator is None: 

698 path, operator, _ = operator_for_path(path) 

699 

700 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb", compression=compression) as handle: 

701 try: 

702 return self.call("read", handle) 

703 finally: 

704 self.call("dut") 

705 

706 def cli(self): 

707 top_cli = FlasherClient.cli(self) 

708 return StorageMuxClient.cli(self, top_cli)