Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py: 39%

346 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 20:29 +0000

1from __future__ import annotations 

2 

3from abc import ABCMeta, abstractmethod 

4from collections.abc import Generator 

5from contextlib import closing 

6from dataclasses import dataclass 

7from io import BytesIO 

8from pathlib import Path 

9from urllib.parse import urlparse 

10from uuid import UUID 

11 

12import asyncclick as click 

13from anyio import EndOfStream 

14from anyio.abc import ObjectStream 

15from opendal import Operator 

16from pydantic import ConfigDict, validate_call 

17 

18from .adapter import OpendalAdapter 

19from .common import Capability, HashAlgo, Metadata, Mode, PathBuf, PresignedRequest 

20from jumpstarter.client import DriverClient 

21from jumpstarter.common.exceptions import ArgumentError 

22 

23 

24@dataclass(kw_only=True) 

25class BytesIOStream(ObjectStream[bytes]): 

26 buf: BytesIO 

27 

28 async def send(self, item: bytes): 

29 self.buf.write(item) 

30 

31 async def receive(self) -> bytes: 

32 item = self.buf.read(65535) 

33 if len(item) == 0: 

34 raise EndOfStream 

35 return item 

36 

37 async def send_eof(self): 

38 pass 

39 

40 async def aclose(self): 

41 pass 

42 

43 

44def operator_for_path(path: PathBuf) -> tuple[PathBuf, Operator, str]: 

45 """Create an operator for the given path 

46 Return a tuple of: 

47 - the path 

48 - the operator for the given path 

49 - the scheme of the operator. 

50 """ 

51 if type(path) is str and path.startswith(("http://", "https://")): 

52 parsed_url = urlparse(path) 

53 operator = Operator("http", root="/", endpoint=f"{parsed_url.scheme}://{parsed_url.netloc}") 

54 return Path(parsed_url.path), operator, "http" 

55 else: 

56 return Path(path).resolve(), Operator("fs", root="/"), "fs" 

57 

58 

59@dataclass(kw_only=True) 

60class OpendalFile: 

61 """ 

62 A file-like object representing a remote file 

63 """ 

64 

65 client: OpendalClient 

66 fd: UUID 

67 

68 def __write(self, handle): 

69 return self.client.call("file_write", self.fd, handle) 

70 

71 def __read(self, handle): 

72 return self.client.call("file_read", self.fd, handle) 

73 

74 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

75 def write_from_path(self, path: PathBuf, operator: Operator | None = None): 

76 """ 

77 Write into remote file with content from local file 

78 """ 

79 if operator is None: 

80 path, operator, _ = operator_for_path(path) 

81 

82 with OpendalAdapter(client=self.client, operator=operator, path=path) as handle: 

83 return self.__write(handle) 

84 

85 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

86 def read_into_path(self, path: PathBuf, operator: Operator | None = None): 

87 """ 

88 Read content from remote file into local file 

89 """ 

90 if operator is None: 

91 path, operator, _ = operator_for_path(path) 

92 

93 with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: 

94 return self.__read(handle) 

95 

96 @validate_call(validate_return=True) 

97 def write_bytes(self, data: bytes) -> None: 

98 buf = BytesIO(data) 

99 with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: 

100 with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: 

101 self.__write(handle) 

102 

103 @validate_call(validate_return=True) 

104 def read_bytes(self) -> bytes: 

105 buf = BytesIO() 

106 with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: 

107 with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: 

108 self.__read(handle) 

109 return buf.getvalue() 

110 

111 @validate_call(validate_return=True) 

112 def seek(self, pos: int, whence: int = 0) -> int: 

113 """ 

114 Change the cursor position to the given byte offset. 

115 Offset is interpreted relative to the position indicated by whence. 

116 The default value for whence is SEEK_SET. Values for whence are: 

117 

118 SEEK_SET or 0 – start of the file (the default); offset should be zero or positive 

119 

120 SEEK_CUR or 1 – current cursor position; offset may be negative 

121 

122 SEEK_END or 2 – end of the file; offset is usually negative 

123 

124 Return the new cursor position 

125 """ 

126 return self.client.call("file_seek", self.fd, pos, whence) 

127 

128 @validate_call(validate_return=True) 

129 def tell(self) -> int: 

130 """ 

131 Return the current cursor position 

132 """ 

133 return self.client.call("file_tell", self.fd) 

134 

135 @validate_call(validate_return=True) 

136 def close(self) -> None: 

137 """ 

138 Close the file 

139 """ 

140 return self.client.call("file_close", self.fd) 

141 

142 @property 

143 @validate_call(validate_return=True) 

144 def closed(self) -> bool: 

145 """ 

146 Check if the file is closed 

147 """ 

148 return self.client.call("file_closed", self.fd) 

149 

150 @validate_call(validate_return=True) 

151 def readable(self) -> bool: 

152 """ 

153 Check if the file is readable 

154 """ 

155 return self.client.call("file_readable", self.fd) 

156 

157 @validate_call(validate_return=True) 

158 def seekable(self) -> bool: 

159 """ 

160 Check if the file is seekable 

161 """ 

162 return self.client.call("file_seekable", self.fd) 

163 

164 @validate_call(validate_return=True) 

165 def writable(self) -> bool: 

166 """ 

167 Check if the file is writable 

168 """ 

169 return self.client.call("file_writable", self.fd) 

170 

171 

172class OpendalClient(DriverClient): 

173 @validate_call(validate_return=True) 

174 def write_bytes(self, /, path: PathBuf, data: bytes) -> None: 

175 """ 

176 Write data into path 

177 

178 >>> opendal.write_bytes("file.txt", b"content") 

179 """ 

180 with closing(self.open(path, "wb")) as f: 

181 f.write_bytes(data) 

182 

183 @validate_call(validate_return=True) 

184 def read_bytes(self, /, path: PathBuf) -> bytes: 

185 """ 

186 Read data from path 

187 

188 >>> opendal.write_bytes("file.txt", b"content") 

189 >>> opendal.read_bytes("file.txt") 

190 b'content' 

191 """ 

192 with closing(self.open(path, "rb")) as f: 

193 return f.read_bytes() 

194 

195 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

196 def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None: 

197 """ 

198 Write data from src into dst 

199 

200 >>> _ = (tmp / "src").write_bytes(b"content") 

201 >>> opendal.write_from_path("file.txt", tmp / "src") 

202 >>> opendal.read_bytes("file.txt") 

203 b'content' 

204 """ 

205 with closing(self.open(dst, "wb")) as f: 

206 f.write_from_path(src, operator) 

207 

208 @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) 

209 def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None: 

210 """ 

211 Read data into dst from src 

212 

213 >>> opendal.write_bytes("file.txt", b"content") 

214 >>> opendal.read_into_path("file.txt", tmp / "dst") 

215 >>> (tmp / "dst").read_bytes() 

216 b'content' 

217 """ 

218 with closing(self.open(src, "rb")) as f: 

219 f.read_into_path(dst, operator) 

220 

221 @validate_call 

222 def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile: 

223 """ 

224 Open a file-like reader for the given path 

225 

226 >>> file = opendal.open("file.txt", "wb") 

227 >>> file.write_bytes(b"content") 

228 >>> file.close() 

229 """ 

230 return OpendalFile(client=self, fd=self.call("open", path, mode)) 

231 

232 @validate_call(validate_return=True) 

233 def stat(self, /, path: PathBuf) -> Metadata: 

234 """ 

235 Get current path's metadata 

236 

237 >>> opendal.write_bytes("file.txt", b"content") 

238 >>> opendal.stat("file.txt").mode.is_file() 

239 True 

240 """ 

241 return self.call("stat", path) 

242 

243 @validate_call(validate_return=True) 

244 def hash(self, /, path: PathBuf, algo: HashAlgo = "sha256") -> str: 

245 """ 

246 Get current path's hash 

247 

248 >>> opendal.write_bytes("file.txt", b"content") 

249 >>> opendal.hash("file.txt") 

250 'ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73' 

251 """ 

252 return self.call("hash", path, algo) 

253 

254 @validate_call(validate_return=True) 

255 def copy(self, /, source: PathBuf, target: PathBuf): 

256 """ 

257 Copy source to target 

258 

259 >>> opendal.write_bytes("file.txt", b"content") 

260 >>> opendal.copy("file.txt", "copy.txt") 

261 >>> opendal.exists("copy.txt") 

262 True 

263 """ 

264 self.call("copy", source, target) 

265 

266 @validate_call(validate_return=True) 

267 def rename(self, /, source: PathBuf, target: PathBuf): 

268 """ 

269 Rename source to target 

270 

271 >>> opendal.write_bytes("file.txt", b"content") 

272 >>> opendal.rename("file.txt", "rename.txt") 

273 >>> opendal.exists("file.txt") 

274 False 

275 >>> opendal.exists("rename.txt") 

276 True 

277 """ 

278 self.call("rename", source, target) 

279 

280 @validate_call(validate_return=True) 

281 def remove_all(self, /, path: PathBuf): 

282 """ 

283 Remove all file under path 

284 

285 >>> opendal.write_bytes("dir/file.txt", b"content") 

286 >>> opendal.remove_all("dir/") 

287 >>> opendal.exists("dir/file.txt") 

288 False 

289 """ 

290 self.call("remove_all", path) 

291 

292 @validate_call(validate_return=True) 

293 def create_dir(self, /, path: PathBuf): 

294 """ 

295 Create a dir at given path 

296 

297 To indicate that a path is a directory, it is compulsory to include a trailing / in the path. 

298 

299 Create on existing dir will succeed. 

300 Create dir is always recursive, works like mkdir -p. 

301 

302 >>> opendal.create_dir("a/b/c/") 

303 >>> opendal.exists("a/b/c/") 

304 True 

305 """ 

306 self.call("create_dir", path) 

307 

308 @validate_call(validate_return=True) 

309 def delete(self, /, path: PathBuf): 

310 """ 

311 Delete given path 

312 

313 Delete not existing error won't return errors 

314 

315 >>> opendal.write_bytes("file.txt", b"content") 

316 >>> opendal.exists("file.txt") 

317 True 

318 >>> opendal.delete("file.txt") 

319 >>> opendal.exists("file.txt") 

320 False 

321 """ 

322 self.call("delete", path) 

323 

324 @validate_call(validate_return=True) 

325 def exists(self, /, path: PathBuf) -> bool: 

326 """ 

327 Check if given path exists 

328 

329 >>> opendal.exists("file.txt") 

330 False 

331 >>> opendal.write_bytes("file.txt", b"content") 

332 >>> opendal.exists("file.txt") 

333 True 

334 """ 

335 return self.call("exists", path) 

336 

337 @validate_call 

338 def list(self, /, path: PathBuf) -> Generator[str, None, None]: 

339 """ 

340 List files and directories under given path 

341 

342 >>> opendal.write_bytes("dir/file.txt", b"content") 

343 >>> opendal.write_bytes("dir/another.txt", b"content") 

344 >>> sorted(opendal.list("dir/")) 

345 ['dir/', 'dir/another.txt', 'dir/file.txt'] 

346 """ 

347 yield from self.streamingcall("list", path) 

348 

349 @validate_call 

350 def scan(self, /, path: PathBuf) -> Generator[str, None, None]: 

351 """ 

352 List files and directories under given path recursively 

353 

354 >>> opendal.write_bytes("dir/a/file.txt", b"content") 

355 >>> opendal.write_bytes("dir/b/another.txt", b"content") 

356 >>> sorted(opendal.scan("dir/")) 

357 ['dir/', 'dir/a/', 'dir/a/file.txt', 'dir/b/', 'dir/b/another.txt'] 

358 """ 

359 yield from self.streamingcall("scan", path) 

360 

361 @validate_call(validate_return=True) 

362 def presign_stat(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

363 """ 

364 Presign an operation for stat (HEAD) which expires after expire_second seconds 

365 """ 

366 return self.call("presign_stat", path, expire_second) 

367 

368 @validate_call(validate_return=True) 

369 def presign_read(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

370 """ 

371 Presign an operation for read (GET) which expires after expire_second seconds 

372 """ 

373 return self.call("presign_read", path, expire_second) 

374 

375 @validate_call(validate_return=True) 

376 def presign_write(self, /, path: PathBuf, expire_second: int) -> PresignedRequest: 

377 """ 

378 Presign an operation for write (PUT) which expires after expire_second seconds 

379 """ 

380 return self.call("presign_write", path, expire_second) 

381 

382 @validate_call(validate_return=True) 

383 def capability(self, /) -> Capability: 

384 """ 

385 Get capabilities of the underlying storage 

386 

387 >>> cap = opendal.capability() 

388 >>> cap.copy 

389 True 

390 >>> cap.presign_read 

391 False 

392 """ 

393 return self.call("capability") 

394 

395 def cli(self): # noqa: C901 

396 arg_path = click.argument("path", type=click.Path()) 

397 arg_source = click.argument("source", type=click.Path()) 

398 arg_target = click.argument("target", type=click.Path()) 

399 arg_src = click.argument("src", type=click.Path()) 

400 arg_dst = click.argument("dst", type=click.Path()) 

401 opt_expire_second = click.option("--expire-second", type=int, required=True) 

402 

403 @click.group 

404 def base(): 

405 """Opendal Storage""" 

406 

407 @base.command 

408 @arg_path 

409 def write_bytes(path): 

410 data = click.get_binary_stream("stdin").read() 

411 self.write_bytes(path, data) 

412 

413 @base.command 

414 @arg_path 

415 def read_bytes(path): 

416 data = self.read_bytes(path) 

417 click.echo(data, nl=False) 

418 

419 @base.command 

420 @arg_dst 

421 @arg_src 

422 def write_from_path(dst, src): 

423 self.write_from_path(dst, src) 

424 

425 @base.command 

426 @arg_src 

427 @arg_dst 

428 def read_into_path(src, dst): 

429 self.read_into_path(src, dst) 

430 

431 @base.command 

432 @arg_path 

433 def stat(path): 

434 click.echo(self.stat(path).model_dump_json(indent=2, by_alias=True)) 

435 

436 @base.command 

437 @arg_path 

438 @click.option("--algo", type=click.Choice(["md5", "sha256"])) 

439 def hash(path, algo): 

440 click.echo(self.hash(path, algo)) 

441 

442 @base.command 

443 @arg_source 

444 @arg_target 

445 def copy(source, target): 

446 self.copy(source, target) 

447 

448 @base.command 

449 @arg_source 

450 @arg_target 

451 def rename(source, target): 

452 self.rename(source, target) 

453 

454 @base.command 

455 @arg_path 

456 def remove_all(path): 

457 self.remove_all(path) 

458 

459 @base.command 

460 @arg_path 

461 def create_dir(path): 

462 self.create_dir(path) 

463 

464 @base.command 

465 @arg_path 

466 def delete(path): 

467 self.delete(path) 

468 

469 @base.command 

470 @arg_path 

471 def exists(path): 

472 if not self.exists(path): 

473 raise click.ClickException(f"path {path} does not exist") 

474 

475 @base.command 

476 @arg_path 

477 def list(path): 

478 for entry in self.list(path): 

479 click.echo(entry) 

480 

481 @base.command 

482 @arg_path 

483 def scan(path): 

484 for entry in self.scan(path): 

485 click.echo(entry) 

486 

487 @base.command 

488 @arg_path 

489 @opt_expire_second 

490 def presign_stat(path, expire_second): 

491 click.echo(self.presign_stat(path, expire_second).model_dump_json(indent=2)) 

492 

493 @base.command 

494 @arg_path 

495 @opt_expire_second 

496 def presign_read(path, expire_second): 

497 click.echo(self.presign_read(path, expire_second).model_dump_json(indent=2)) 

498 

499 @base.command 

500 @arg_path 

501 @opt_expire_second 

502 def presign_write(path, expire_second): 

503 click.echo(self.presign_write(path, expire_second).model_dump_json(indent=2)) 

504 

505 @base.command 

506 def capability(): 

507 click.echo(self.capability().model_dump_json(indent=2)) 

508 

509 return base 

510 

511 

512class FlasherClientInterface(metaclass=ABCMeta): 

513 @abstractmethod 

514 def flash( 

515 self, 

516 path: PathBuf, 

517 *, 

518 partition: str | None = None, 

519 operator: Operator | None = None, 

520 ): 

521 """Flash image to DUT""" 

522 ... 

523 

524 @abstractmethod 

525 def dump(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None): 

526 """Dump image from DUT""" 

527 ... 

528 

529 def cli(self): 

530 @click.group 

531 def base(): 

532 """Generic flasher interface""" 

533 pass 

534 

535 @base.command() 

536 @click.argument("file") 

537 @click.option("--partition", type=str) 

538 def flash(file, partition): 

539 """Flash image to DUT from file""" 

540 self.flash(file, partition=partition) 

541 

542 @base.command() 

543 @click.argument("file") 

544 @click.option("--partition", type=str) 

545 def dump(file, partition): 

546 """Dump image from DUT to file""" 

547 self.dump(file, partition=partition) 

548 

549 return base 

550 

551 

552class FlasherClient(FlasherClientInterface, DriverClient): 

553 def flash( 

554 self, 

555 path: PathBuf, 

556 *, 

557 partition: str | None = None, 

558 operator: Operator | None = None, 

559 ): 

560 """Flash image to DUT""" 

561 if operator is None: 

562 path, operator, _ = operator_for_path(path) 

563 

564 with OpendalAdapter(client=self, operator=operator, path=path, mode="rb") as handle: 

565 return self.call("flash", handle, partition) 

566 

567 def dump( 

568 self, 

569 path: PathBuf, 

570 *, 

571 partition: str | None = None, 

572 operator: Operator | None = None, 

573 ): 

574 """Dump image from DUT""" 

575 if operator is None: 

576 path, operator, _ = operator_for_path(path) 

577 

578 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb") as handle: 

579 return self.call("dump", handle, partition) 

580 

581 

582class StorageMuxClient(DriverClient): 

583 def host(self): 

584 """Connect storage to host""" 

585 return self.call("host") 

586 

587 def dut(self): 

588 """Connect storage to dut""" 

589 return self.call("dut") 

590 

591 def off(self): 

592 """Disconnect storage""" 

593 return self.call("off") 

594 

595 def write(self, handle): 

596 return self.call("write", handle) 

597 

598 def read(self, handle): 

599 return self.call("read", handle) 

600 

601 def write_file(self, operator: Operator, path: str): 

602 with OpendalAdapter(client=self, operator=operator, path=path) as handle: 

603 return self.write(handle) 

604 

605 def read_file(self, operator: Operator, path: str): 

606 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb") as handle: 

607 return self.read(handle) 

608 

609 def write_local_file(self, filepath): 

610 """Write a local file to the storage device""" 

611 absolute = Path(filepath).resolve() 

612 return self.write_file(operator=Operator("fs", root="/"), path=str(absolute)) 

613 

614 def read_local_file(self, filepath): 

615 """Read into a local file from the storage device""" 

616 absolute = Path(filepath).resolve() 

617 return self.read_file(operator=Operator("fs", root="/"), path=str(absolute)) 

618 

619 def cli(self, base=None): 

620 if base is None: 

621 base = click.group(lambda: None) 

622 

623 @base.command() 

624 def host(): 

625 """Connect storage to host""" 

626 self.host() 

627 

628 @base.command() 

629 def dut(): 

630 """Connect storage to dut""" 

631 self.dut() 

632 

633 @base.command() 

634 def off(): 

635 """Disconnect storage""" 

636 self.off() 

637 

638 @base.command() 

639 @click.argument("file") 

640 def write_local_file(file): 

641 self.write_local_file(file) 

642 

643 return base 

644 

645 

646class StorageMuxFlasherClient(FlasherClient, StorageMuxClient): 

647 def flash( 

648 self, 

649 path: PathBuf, 

650 *, 

651 partition: str | None = None, 

652 operator: Operator | None = None, 

653 ): 

654 """Flash image to DUT""" 

655 if partition is not None: 

656 raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided") 

657 

658 self.host() 

659 

660 if operator is None: 

661 path, operator, _ = operator_for_path(path) 

662 

663 with OpendalAdapter(client=self, operator=operator, path=path, mode="rb") as handle: 

664 try: 

665 return self.write(handle) 

666 finally: 

667 self.dut() 

668 

669 def dump( 

670 self, 

671 path: PathBuf, 

672 *, 

673 partition: str | None = None, 

674 operator: Operator | None = None, 

675 ): 

676 """Dump image from DUT""" 

677 if partition is not None: 

678 raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided") 

679 

680 self.call("host") 

681 

682 if operator is None: 

683 path, operator, _ = operator_for_path(path) 

684 

685 with OpendalAdapter(client=self, operator=operator, path=path, mode="wb") as handle: 

686 try: 

687 return self.call("read", handle) 

688 finally: 

689 self.call("dut") 

690 

691 def cli(self): 

692 top_cli = FlasherClient.cli(self) 

693 return StorageMuxClient.cli(self, top_cli)