Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py: 39%
346 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from __future__ import annotations
3from abc import ABCMeta, abstractmethod
4from collections.abc import Generator
5from contextlib import closing
6from dataclasses import dataclass
7from io import BytesIO
8from pathlib import Path
9from urllib.parse import urlparse
10from uuid import UUID
12import asyncclick as click
13from anyio import EndOfStream
14from anyio.abc import ObjectStream
15from opendal import Operator
16from pydantic import ConfigDict, validate_call
18from .adapter import OpendalAdapter
19from .common import Capability, HashAlgo, Metadata, Mode, PathBuf, PresignedRequest
20from jumpstarter.client import DriverClient
21from jumpstarter.common.exceptions import ArgumentError
@dataclass(kw_only=True)
class BytesIOStream(ObjectStream[bytes]):
    """Expose an in-memory ``BytesIO`` buffer through the anyio ``ObjectStream`` interface."""

    # Backing buffer: ``send`` writes into it, ``receive`` reads from it.
    buf: BytesIO

    async def send(self, item: bytes):
        """Write ``item`` into the backing buffer."""
        self.buf.write(item)

    async def receive(self) -> bytes:
        """Return the next chunk (at most 65535 bytes) from the backing buffer.

        Raises:
            EndOfStream: when the buffer has nothing left to read.
        """
        chunk = self.buf.read(65535)
        if not chunk:
            raise EndOfStream
        return chunk

    async def send_eof(self):
        """Do nothing; the in-memory buffer needs no end-of-stream signalling."""
        pass

    async def aclose(self):
        """Do nothing; the buffer is left open for its owner to inspect."""
        pass
def operator_for_path(path: PathBuf) -> tuple[PathBuf, Operator, str]:
    """Create an operator for the given path

    Strings starting with ``http://`` or ``https://`` get an ``http`` operator whose
    endpoint is the URL's scheme and network location; everything else is treated as
    a local filesystem path and resolved to an absolute path under an ``fs`` operator
    rooted at ``/``.

    Return a tuple of:
    - the path
    - the operator for the given path
    - the scheme of the operator.
    """
    # isinstance (not an exact type comparison) so that str subclasses carrying a URL
    # are routed to the http operator instead of being resolved as local paths.
    if isinstance(path, str) and path.startswith(("http://", "https://")):
        parsed_url = urlparse(path)
        operator = Operator("http", root="/", endpoint=f"{parsed_url.scheme}://{parsed_url.netloc}")
        # Only the path component is handed back; query string and fragment are not forwarded.
        return Path(parsed_url.path), operator, "http"

    return Path(path).resolve(), Operator("fs", root="/"), "fs"
@dataclass(kw_only=True)
class OpendalFile:
    """
    A file-like object representing a remote file
    """

    # Driver client used to issue every remote call for this file.
    client: OpendalClient
    # Identifier of the remote file, as returned by the driver's "open" call.
    fd: UUID

    def __write(self, handle):
        # Ask the driver to write the content behind `handle` into the remote file.
        return self.client.call("file_write", self.fd, handle)

    def __read(self, handle):
        # Ask the driver to read the remote file into the resource behind `handle`.
        return self.client.call("file_read", self.fd, handle)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def write_from_path(self, path: PathBuf, operator: Operator | None = None):
        """
        Write into remote file with content from local file

        When no operator is supplied, one is derived from ``path`` via
        ``operator_for_path`` (which also normalises the path).
        """
        if operator is None:
            path, operator, _ = operator_for_path(path)

        adapter = OpendalAdapter(client=self.client, operator=operator, path=path)
        with adapter as source_handle:
            return self.__write(source_handle)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def read_into_path(self, path: PathBuf, operator: Operator | None = None):
        """
        Read content from remote file into local file

        When no operator is supplied, one is derived from ``path`` via
        ``operator_for_path`` (which also normalises the path).
        """
        if operator is None:
            path, operator, _ = operator_for_path(path)

        adapter = OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb")
        with adapter as sink_handle:
            return self.__read(sink_handle)

    @validate_call(validate_return=True)
    def write_bytes(self, data: bytes) -> None:
        """
        Write the given bytes into the remote file
        """
        source = BytesIO(data)
        with (
            self.client.portal.wrap_async_context_manager(BytesIOStream(buf=source)) as stream,
            self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle,
        ):
            self.__write(handle)

    @validate_call(validate_return=True)
    def read_bytes(self) -> bytes:
        """
        Read the remote file into an in-memory buffer and return its content
        """
        sink = BytesIO()
        with (
            self.client.portal.wrap_async_context_manager(BytesIOStream(buf=sink)) as stream,
            self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle,
        ):
            self.__read(handle)
        return sink.getvalue()

    @validate_call(validate_return=True)
    def seek(self, pos: int, whence: int = 0) -> int:
        """
        Change the cursor position to the given byte offset.
        Offset is interpreted relative to the position indicated by whence.
        The default value for whence is SEEK_SET. Values for whence are:

            SEEK_SET or 0 – start of the file (the default); offset should be zero or positive

            SEEK_CUR or 1 – current cursor position; offset may be negative

            SEEK_END or 2 – end of the file; offset is usually negative

        Return the new cursor position
        """
        return self.client.call("file_seek", self.fd, pos, whence)

    @validate_call(validate_return=True)
    def tell(self) -> int:
        """
        Return the current cursor position
        """
        return self.client.call("file_tell", self.fd)

    @validate_call(validate_return=True)
    def close(self) -> None:
        """
        Close the file
        """
        return self.client.call("file_close", self.fd)

    @property
    @validate_call(validate_return=True)
    def closed(self) -> bool:
        """
        Check if the file is closed
        """
        return self.client.call("file_closed", self.fd)

    @validate_call(validate_return=True)
    def readable(self) -> bool:
        """
        Check if the file is readable
        """
        return self.client.call("file_readable", self.fd)

    @validate_call(validate_return=True)
    def seekable(self) -> bool:
        """
        Check if the file is seekable
        """
        return self.client.call("file_seekable", self.fd)

    @validate_call(validate_return=True)
    def writable(self) -> bool:
        """
        Check if the file is writable
        """
        return self.client.call("file_writable", self.fd)
class OpendalClient(DriverClient):
    """Client for the opendal storage driver.

    Each method forwards to the driver call of the same name; ``cli`` exposes the
    same operations as a click command group.
    """

    @validate_call(validate_return=True)
    def write_bytes(self, /, path: PathBuf, data: bytes) -> None:
        """
        Write data into path

        The file is opened in "wb" mode and closed again once the data is written.

        >>> opendal.write_bytes("file.txt", b"content")
        """
        with closing(self.open(path, "wb")) as f:
            f.write_bytes(data)

    @validate_call(validate_return=True)
    def read_bytes(self, /, path: PathBuf) -> bytes:
        """
        Read data from path

        The file is opened in "rb" mode and closed again once the data is read.

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.read_bytes("file.txt")
        b'content'
        """
        with closing(self.open(path, "rb")) as f:
            return f.read_bytes()

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None:
        """
        Write data from src into dst

        ``dst`` is the remote path; ``src`` is read through ``operator`` when one is
        given, otherwise through an operator derived from ``src`` itself.

        >>> _ = (tmp / "src").write_bytes(b"content")
        >>> opendal.write_from_path("file.txt", tmp / "src")
        >>> opendal.read_bytes("file.txt")
        b'content'
        """
        with closing(self.open(dst, "wb")) as f:
            f.write_from_path(src, operator)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None:
        """
        Read data into dst from src

        ``src`` is the remote path; ``dst`` is written through ``operator`` when one is
        given, otherwise through an operator derived from ``dst`` itself.

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.read_into_path("file.txt", tmp / "dst")
        >>> (tmp / "dst").read_bytes()
        b'content'
        """
        with closing(self.open(src, "rb")) as f:
            f.read_into_path(dst, operator)

    @validate_call
    def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile:
        """
        Open a file-like reader for the given path

        The caller is responsible for closing the returned file.

        >>> file = opendal.open("file.txt", "wb")
        >>> file.write_bytes(b"content")
        >>> file.close()
        """
        return OpendalFile(client=self, fd=self.call("open", path, mode))

    @validate_call(validate_return=True)
    def stat(self, /, path: PathBuf) -> Metadata:
        """
        Get current path's metadata

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.stat("file.txt").mode.is_file()
        True
        """
        return self.call("stat", path)

    @validate_call(validate_return=True)
    def hash(self, /, path: PathBuf, algo: HashAlgo = "sha256") -> str:
        """
        Get current path's hash

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.hash("file.txt")
        'ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73'
        """
        return self.call("hash", path, algo)

    @validate_call(validate_return=True)
    def copy(self, /, source: PathBuf, target: PathBuf):
        """
        Copy source to target

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.copy("file.txt", "copy.txt")
        >>> opendal.exists("copy.txt")
        True
        """
        self.call("copy", source, target)

    @validate_call(validate_return=True)
    def rename(self, /, source: PathBuf, target: PathBuf):
        """
        Rename source to target

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.rename("file.txt", "rename.txt")
        >>> opendal.exists("file.txt")
        False
        >>> opendal.exists("rename.txt")
        True
        """
        self.call("rename", source, target)

    @validate_call(validate_return=True)
    def remove_all(self, /, path: PathBuf):
        """
        Remove all files under path

        >>> opendal.write_bytes("dir/file.txt", b"content")
        >>> opendal.remove_all("dir/")
        >>> opendal.exists("dir/file.txt")
        False
        """
        self.call("remove_all", path)

    @validate_call(validate_return=True)
    def create_dir(self, /, path: PathBuf):
        """
        Create a dir at given path

        To indicate that a path is a directory, it is compulsory to include a trailing / in the path.

        Create on existing dir will succeed.
        Create dir is always recursive, works like mkdir -p.

        >>> opendal.create_dir("a/b/c/")
        >>> opendal.exists("a/b/c/")
        True
        """
        self.call("create_dir", path)

    @validate_call(validate_return=True)
    def delete(self, /, path: PathBuf):
        """
        Delete given path

        Deleting a path that does not exist is not reported as an error.

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.exists("file.txt")
        True
        >>> opendal.delete("file.txt")
        >>> opendal.exists("file.txt")
        False
        """
        self.call("delete", path)

    @validate_call(validate_return=True)
    def exists(self, /, path: PathBuf) -> bool:
        """
        Check if given path exists

        >>> opendal.exists("file.txt")
        False
        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.exists("file.txt")
        True
        """
        return self.call("exists", path)

    @validate_call
    def list(self, /, path: PathBuf) -> Generator[str, None, None]:
        """
        List files and directories under given path

        Entries are yielded as the driver streams them.

        >>> opendal.write_bytes("dir/file.txt", b"content")
        >>> opendal.write_bytes("dir/another.txt", b"content")
        >>> sorted(opendal.list("dir/"))
        ['dir/', 'dir/another.txt', 'dir/file.txt']
        """
        yield from self.streamingcall("list", path)

    @validate_call
    def scan(self, /, path: PathBuf) -> Generator[str, None, None]:
        """
        List files and directories under given path recursively

        Entries are yielded as the driver streams them.

        >>> opendal.write_bytes("dir/a/file.txt", b"content")
        >>> opendal.write_bytes("dir/b/another.txt", b"content")
        >>> sorted(opendal.scan("dir/"))
        ['dir/', 'dir/a/', 'dir/a/file.txt', 'dir/b/', 'dir/b/another.txt']
        """
        yield from self.streamingcall("scan", path)

    @validate_call(validate_return=True)
    def presign_stat(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for stat (HEAD) which expires after expire_second seconds
        """
        return self.call("presign_stat", path, expire_second)

    @validate_call(validate_return=True)
    def presign_read(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for read (GET) which expires after expire_second seconds
        """
        return self.call("presign_read", path, expire_second)

    @validate_call(validate_return=True)
    def presign_write(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for write (PUT) which expires after expire_second seconds
        """
        return self.call("presign_write", path, expire_second)

    @validate_call(validate_return=True)
    def capability(self, /) -> Capability:
        """
        Get capabilities of the underlying storage

        >>> cap = opendal.capability()
        >>> cap.copy
        True
        >>> cap.presign_read
        False
        """
        return self.call("capability")

    def cli(self):  # noqa: C901
        """Build the click command group exposing the storage operations of this client.

        Each sub-command is a thin wrapper around the method of the same name.
        The command callbacks deliberately carry no docstrings of their own:
        click would turn them into help text.
        """
        # Shared argument/option decorators, reused by the commands below.
        arg_path = click.argument("path", type=click.Path())
        arg_source = click.argument("source", type=click.Path())
        arg_target = click.argument("target", type=click.Path())
        arg_src = click.argument("src", type=click.Path())
        arg_dst = click.argument("dst", type=click.Path())
        opt_expire_second = click.option("--expire-second", type=int, required=True)

        @click.group
        def base():
            """Opendal Storage"""

        @base.command
        @arg_path
        def write_bytes(path):
            # Content to write is taken from standard input.
            data = click.get_binary_stream("stdin").read()
            self.write_bytes(path, data)

        @base.command
        @arg_path
        def read_bytes(path):
            data = self.read_bytes(path)
            # nl=False: emit the content as-is, without appending a newline.
            click.echo(data, nl=False)

        @base.command
        @arg_dst
        @arg_src
        def write_from_path(dst, src):
            self.write_from_path(dst, src)

        @base.command
        @arg_src
        @arg_dst
        def read_into_path(src, dst):
            self.read_into_path(src, dst)

        @base.command
        @arg_path
        def stat(path):
            click.echo(self.stat(path).model_dump_json(indent=2, by_alias=True))

        @base.command
        @arg_path
        # Without a default click passes None when --algo is omitted, and an explicit
        # None overrides the "sha256" default declared on self.hash.
        @click.option("--algo", type=click.Choice(["md5", "sha256"]), default="sha256")
        def hash(path, algo):
            click.echo(self.hash(path, algo))

        @base.command
        @arg_source
        @arg_target
        def copy(source, target):
            self.copy(source, target)

        @base.command
        @arg_source
        @arg_target
        def rename(source, target):
            self.rename(source, target)

        @base.command
        @arg_path
        def remove_all(path):
            self.remove_all(path)

        @base.command
        @arg_path
        def create_dir(path):
            self.create_dir(path)

        @base.command
        @arg_path
        def delete(path):
            self.delete(path)

        @base.command
        @arg_path
        def exists(path):
            # A missing path is reported as a click error rather than printed output.
            if not self.exists(path):
                raise click.ClickException(f"path {path} does not exist")

        @base.command
        @arg_path
        def list(path):
            for entry in self.list(path):
                click.echo(entry)

        @base.command
        @arg_path
        def scan(path):
            for entry in self.scan(path):
                click.echo(entry)

        @base.command
        @arg_path
        @opt_expire_second
        def presign_stat(path, expire_second):
            click.echo(self.presign_stat(path, expire_second).model_dump_json(indent=2))

        @base.command
        @arg_path
        @opt_expire_second
        def presign_read(path, expire_second):
            click.echo(self.presign_read(path, expire_second).model_dump_json(indent=2))

        @base.command
        @arg_path
        @opt_expire_second
        def presign_write(path, expire_second):
            click.echo(self.presign_write(path, expire_second).model_dump_json(indent=2))

        @base.command
        def capability():
            click.echo(self.capability().model_dump_json(indent=2))

        return base
class FlasherClientInterface(metaclass=ABCMeta):
    """Abstract interface for clients that can flash and dump DUT images.

    Implementations provide ``flash`` and ``dump``; ``cli`` wraps both in a
    click command group.
    """

    @abstractmethod
    def flash(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None):
        """Flash image to DUT"""

    @abstractmethod
    def dump(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None):
        """Dump image from DUT"""

    def cli(self):
        """Build the click group offering ``flash`` and ``dump`` commands."""

        @click.group
        def base():
            """Generic flasher interface"""

        @base.command()
        @click.argument("file")
        @click.option("--partition", type=str)
        def flash(file, partition):
            """Flash image to DUT from file"""
            self.flash(file, partition=partition)

        @base.command()
        @click.argument("file")
        @click.option("--partition", type=str)
        def dump(file, partition):
            """Dump image from DUT to file"""
            self.dump(file, partition=partition)

        return base
class FlasherClient(FlasherClientInterface, DriverClient):
    """Flasher client that streams images through an ``OpendalAdapter`` handle."""

    def flash(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None):
        """Flash image to DUT"""
        # No explicit operator: derive one (and the normalised path) from the path itself.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        source = OpendalAdapter(client=self, operator=operator, path=path, mode="rb")
        with source as handle:
            return self.call("flash", handle, partition)

    def dump(self, path: PathBuf, *, partition: str | None = None, operator: Operator | None = None):
        """Dump image from DUT"""
        # No explicit operator: derive one (and the normalised path) from the path itself.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        sink = OpendalAdapter(client=self, operator=operator, path=path, mode="wb")
        with sink as handle:
            return self.call("dump", handle, partition)
class StorageMuxClient(DriverClient):
    """Client for a storage mux driver: switch storage between host and DUT, and transfer its content."""

    def host(self):
        """Connect storage to host"""
        return self.call("host")

    def dut(self):
        """Connect storage to dut"""
        return self.call("dut")

    def off(self):
        """Disconnect storage"""
        return self.call("off")

    def write(self, handle):
        """Forward ``handle`` to the driver's "write" call."""
        return self.call("write", handle)

    def read(self, handle):
        """Forward ``handle`` to the driver's "read" call."""
        return self.call("read", handle)

    def write_file(self, operator: Operator, path: str):
        """Write the file at ``path`` (accessed through ``operator``) to the storage device."""
        source = OpendalAdapter(client=self, operator=operator, path=path)
        with source as handle:
            return self.write(handle)

    def read_file(self, operator: Operator, path: str):
        """Read from the storage device into the file at ``path`` (accessed through ``operator``)."""
        sink = OpendalAdapter(client=self, operator=operator, path=path, mode="wb")
        with sink as handle:
            return self.read(handle)

    def write_local_file(self, filepath):
        """Write a local file to the storage device"""
        resolved = Path(filepath).resolve()
        local_fs = Operator("fs", root="/")
        return self.write_file(operator=local_fs, path=str(resolved))

    def read_local_file(self, filepath):
        """Read into a local file from the storage device"""
        resolved = Path(filepath).resolve()
        local_fs = Operator("fs", root="/")
        return self.read_file(operator=local_fs, path=str(resolved))

    def cli(self, base=None):
        """Register the storage mux commands on ``base`` and return it.

        A fresh, empty click group is created when ``base`` is not supplied.
        """
        if base is None:
            base = click.group(lambda: None)

        @base.command()
        def host():
            """Connect storage to host"""
            self.host()

        @base.command()
        def dut():
            """Connect storage to dut"""
            self.dut()

        @base.command()
        def off():
            """Disconnect storage"""
            self.off()

        @base.command()
        @click.argument("file")
        def write_local_file(file):
            # No docstring on purpose: click would turn it into help text.
            self.write_local_file(file)

        return base
class StorageMuxFlasherClient(FlasherClient, StorageMuxClient):
    """Flasher implemented on top of a storage mux.

    Storage is connected to the host for the duration of the transfer and handed
    back to the DUT afterwards, whether or not the transfer succeeded.
    """

    def flash(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
    ):
        """Flash image to DUT

        Raises:
            ArgumentError: if ``partition`` is given; partitions are not supported here.
        """
        if partition is not None:
            raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided")

        # Build the operator before switching the mux, so a failure here has no side effects.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        self.host()
        # The guard covers adapter setup as well as the transfer: if entering the adapter
        # fails, storage is still handed back to the DUT. dut() now runs after the adapter
        # has been closed.
        try:
            with OpendalAdapter(client=self, operator=operator, path=path, mode="rb") as handle:
                return self.write(handle)
        finally:
            self.dut()

    def dump(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
    ):
        """Dump image from DUT

        Raises:
            ArgumentError: if ``partition`` is given; partitions are not supported here.
        """
        if partition is not None:
            raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided")

        # Build the operator before switching the mux, so a failure here has no side effects.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        # Use the same host()/read()/dut() helpers as flash() instead of raw driver calls.
        self.host()
        # The guard covers adapter setup as well as the transfer: if entering the adapter
        # fails, storage is still handed back to the DUT. dut() now runs after the adapter
        # has been closed.
        try:
            with OpendalAdapter(client=self, operator=operator, path=path, mode="wb") as handle:
                return self.read(handle)
        finally:
            self.dut()

    def cli(self):
        """Return the flasher command group extended with the storage mux commands."""
        top_cli = FlasherClient.cli(self)
        return StorageMuxClient.cli(self, top_cli)