Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py: 39%
349 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
1from __future__ import annotations
3from abc import ABCMeta, abstractmethod
4from collections.abc import Generator
5from contextlib import closing
6from dataclasses import dataclass
7from io import BytesIO
8from pathlib import Path
9from urllib.parse import urlparse
10from uuid import UUID
12import click
13from anyio import EndOfStream
14from anyio.abc import ObjectStream
15from opendal import Operator
16from pydantic import ConfigDict, validate_call
18from .adapter import OpendalAdapter
19from .common import Capability, HashAlgo, Metadata, Mode, PathBuf, PresignedRequest
20from jumpstarter.client import DriverClient
21from jumpstarter.common.exceptions import ArgumentError
22from jumpstarter.streams.encoding import Compression
@dataclass(kw_only=True)
class BytesIOStream(ObjectStream[bytes]):
    """Object stream of byte chunks backed by an in-memory ``BytesIO`` buffer."""

    # Buffer that ``send`` writes into and ``receive`` reads from.
    buf: BytesIO

    async def send(self, item: bytes):
        """Write ``item`` into the buffer at its current position."""
        self.buf.write(item)

    async def receive(self) -> bytes:
        """Read and return the next chunk (at most 65535 bytes) from the buffer.

        Raises:
            EndOfStream: if the read yields no data.
        """
        chunk = self.buf.read(65535)
        if chunk:
            return chunk
        raise EndOfStream

    async def send_eof(self):
        """No-op."""

    async def aclose(self):
        """No-op; the buffer is not closed."""
def operator_for_path(path: PathBuf) -> tuple[PathBuf, Operator, str]:
    """Create an operator for the given path

    A string starting with ``http://`` or ``https://`` gets an ``http``
    operator whose endpoint is the URL's scheme and network location; the
    returned path is the URL's path component. Anything else is treated as a
    local path: it is resolved and paired with an ``fs`` operator rooted at
    ``/``.

    Return a tuple of:
        - the path
        - the operator for the given path
        - the scheme of the operator.
    """
    # isinstance instead of an exact type comparison, so that str subclasses
    # carrying a URL are not mistaken for local filesystem paths.
    if isinstance(path, str) and path.startswith(("http://", "https://")):
        parsed_url = urlparse(path)
        operator = Operator("http", root="/", endpoint=f"{parsed_url.scheme}://{parsed_url.netloc}")
        # Only the path component is returned; query string and fragment are not included.
        return Path(parsed_url.path), operator, "http"

    return Path(path).resolve(), Operator("fs", root="/"), "fs"
@dataclass(kw_only=True)
class OpendalFile:
    """
    A file-like object representing a remote file

    Every operation is forwarded to ``client`` as a ``file_*`` call that
    carries this file's descriptor ``fd``.
    """

    # Client used to issue the ``file_*`` calls.
    client: OpendalClient
    # Descriptor identifying the opened remote file in those calls.
    fd: UUID

    def __write(self, handle):
        # Issue the "file_write" call for this descriptor with the given resource handle.
        return self.client.call("file_write", self.fd, handle)

    def __read(self, handle):
        # Issue the "file_read" call for this descriptor with the given resource handle.
        return self.client.call("file_read", self.fd, handle)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def write_from_path(self, path: PathBuf, operator: Operator | None = None):
        """
        Write into remote file with content from local file

        When ``operator`` is None, both the operator and the effective path
        are derived from ``path`` via ``operator_for_path``.
        """
        if operator is None:
            path, operator, _scheme = operator_for_path(path)

        source = OpendalAdapter(client=self.client, operator=operator, path=path)
        with source as handle:
            return self.__write(handle)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def read_into_path(self, path: PathBuf, operator: Operator | None = None):
        """
        Read content from remote file into local file

        When ``operator`` is None, both the operator and the effective path
        are derived from ``path`` via ``operator_for_path``. The adapter is
        opened with mode ``"wb"``.
        """
        if operator is None:
            path, operator, _scheme = operator_for_path(path)

        sink = OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb")
        with sink as handle:
            return self.__read(handle)

    @validate_call(validate_return=True)
    def write_bytes(self, data: bytes) -> None:
        """
        Write ``data`` into the remote file

        The bytes are wrapped in an in-memory stream which is registered as a
        client resource for the duration of the write call.
        """
        portal = self.client.portal
        with (
            portal.wrap_async_context_manager(BytesIOStream(buf=BytesIO(data))) as stream,
            portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle,
        ):
            self.__write(handle)

    @validate_call(validate_return=True)
    def read_bytes(self) -> bytes:
        """
        Read from the remote file and return the received bytes

        The read call is given an initially empty in-memory stream; whatever
        that stream's buffer holds afterwards is returned.
        """
        received = BytesIO()
        portal = self.client.portal
        with (
            portal.wrap_async_context_manager(BytesIOStream(buf=received)) as stream,
            portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle,
        ):
            self.__read(handle)
        return received.getvalue()

    @validate_call(validate_return=True)
    def seek(self, pos: int, whence: int = 0) -> int:
        """
        Change the cursor position to the given byte offset.

        Offset is interpreted relative to the position indicated by whence.
        The default value for whence is SEEK_SET. Values for whence are:

        - SEEK_SET or 0 – start of the file (the default); offset should be zero or positive
        - SEEK_CUR or 1 – current cursor position; offset may be negative
        - SEEK_END or 2 – end of the file; offset is usually negative

        Return the new cursor position
        """
        return self.client.call("file_seek", self.fd, pos, whence)

    @validate_call(validate_return=True)
    def tell(self) -> int:
        """
        Return the current cursor position
        """
        return self.client.call("file_tell", self.fd)

    @validate_call(validate_return=True)
    def close(self) -> None:
        """
        Close the file
        """
        return self.client.call("file_close", self.fd)

    @property
    @validate_call(validate_return=True)
    def closed(self) -> bool:
        """
        Check if the file is closed
        """
        return self.client.call("file_closed", self.fd)

    @validate_call(validate_return=True)
    def readable(self) -> bool:
        """
        Check if the file is readable
        """
        return self.client.call("file_readable", self.fd)

    @validate_call(validate_return=True)
    def seekable(self) -> bool:
        """
        Check if the file is seekable
        """
        return self.client.call("file_seekable", self.fd)

    @validate_call(validate_return=True)
    def writable(self) -> bool:
        """
        Check if the file is writable
        """
        return self.client.call("file_writable", self.fd)
class OpendalClient(DriverClient):
    """
    Client for an Opendal storage driver

    Path-level operations are forwarded to the driver call of the same name.
    The byte/path transfer helpers open a remote file via ``open`` and close
    it again once the transfer is done. ``cli`` exposes the same operations as
    a click command group.
    """

    @validate_call(validate_return=True)
    def write_bytes(self, /, path: PathBuf, data: bytes) -> None:
        """
        Write data into path

        >>> opendal.write_bytes("file.txt", b"content")
        """
        with closing(self.open(path, "wb")) as f:
            f.write_bytes(data)

    @validate_call(validate_return=True)
    def read_bytes(self, /, path: PathBuf) -> bytes:
        """
        Read data from path

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.read_bytes("file.txt")
        b'content'
        """
        with closing(self.open(path, "rb")) as f:
            return f.read_bytes()

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None:
        """
        Write data from src into dst

        ``dst`` is the remote path; ``src`` and the optional ``operator`` are
        passed on to ``OpendalFile.write_from_path``.

        >>> _ = (tmp / "src").write_bytes(b"content")
        >>> opendal.write_from_path("file.txt", tmp / "src")
        >>> opendal.read_bytes("file.txt")
        b'content'
        """
        with closing(self.open(dst, "wb")) as f:
            f.write_from_path(src, operator)

    @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True))
    def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None:
        """
        Read data into dst from src

        ``src`` is the remote path; ``dst`` and the optional ``operator`` are
        passed on to ``OpendalFile.read_into_path``.

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.read_into_path("file.txt", tmp / "dst")
        >>> (tmp / "dst").read_bytes()
        b'content'
        """
        with closing(self.open(src, "rb")) as f:
            f.read_into_path(dst, operator)

    @validate_call
    def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile:
        """
        Open a file-like reader for the given path

        The caller is responsible for closing the returned file.

        >>> file = opendal.open("file.txt", "wb")
        >>> file.write_bytes(b"content")
        >>> file.close()
        """
        return OpendalFile(client=self, fd=self.call("open", path, mode))

    @validate_call(validate_return=True)
    def stat(self, /, path: PathBuf) -> Metadata:
        """
        Get current path's metadata

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.stat("file.txt").mode.is_file()
        True
        """
        return self.call("stat", path)

    @validate_call(validate_return=True)
    def hash(self, /, path: PathBuf, algo: HashAlgo = "sha256") -> str:
        """
        Get current path's hash

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.hash("file.txt")
        'ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73'
        """
        return self.call("hash", path, algo)

    @validate_call(validate_return=True)
    def copy(self, /, source: PathBuf, target: PathBuf):
        """
        Copy source to target

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.copy("file.txt", "copy.txt")
        >>> opendal.exists("copy.txt")
        True
        """
        self.call("copy", source, target)

    @validate_call(validate_return=True)
    def rename(self, /, source: PathBuf, target: PathBuf):
        """
        Rename source to target

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.rename("file.txt", "rename.txt")
        >>> opendal.exists("file.txt")
        False
        >>> opendal.exists("rename.txt")
        True
        """
        self.call("rename", source, target)

    @validate_call(validate_return=True)
    def remove_all(self, /, path: PathBuf):
        """
        Remove all files under path

        >>> opendal.write_bytes("dir/file.txt", b"content")
        >>> opendal.remove_all("dir/")
        >>> opendal.exists("dir/file.txt")
        False
        """
        self.call("remove_all", path)

    @validate_call(validate_return=True)
    def create_dir(self, /, path: PathBuf):
        """
        Create a dir at given path

        To indicate that a path is a directory, it is compulsory to include a trailing / in the path.

        Create on existing dir will succeed.
        Create dir is always recursive, works like mkdir -p.

        >>> opendal.create_dir("a/b/c/")
        >>> opendal.exists("a/b/c/")
        True
        """
        self.call("create_dir", path)

    @validate_call(validate_return=True)
    def delete(self, /, path: PathBuf):
        """
        Delete given path

        Deleting a path that does not exist does not raise an error.

        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.exists("file.txt")
        True
        >>> opendal.delete("file.txt")
        >>> opendal.exists("file.txt")
        False
        """
        self.call("delete", path)

    @validate_call(validate_return=True)
    def exists(self, /, path: PathBuf) -> bool:
        """
        Check if given path exists

        >>> opendal.exists("file.txt")
        False
        >>> opendal.write_bytes("file.txt", b"content")
        >>> opendal.exists("file.txt")
        True
        """
        return self.call("exists", path)

    @validate_call
    def list(self, /, path: PathBuf) -> Generator[str, None, None]:
        """
        List files and directories under given path

        >>> opendal.write_bytes("dir/file.txt", b"content")
        >>> opendal.write_bytes("dir/another.txt", b"content")
        >>> sorted(opendal.list("dir/"))
        ['dir/', 'dir/another.txt', 'dir/file.txt']
        """
        yield from self.streamingcall("list", path)

    @validate_call
    def scan(self, /, path: PathBuf) -> Generator[str, None, None]:
        """
        List files and directories under given path recursively

        >>> opendal.write_bytes("dir/a/file.txt", b"content")
        >>> opendal.write_bytes("dir/b/another.txt", b"content")
        >>> sorted(opendal.scan("dir/"))
        ['dir/', 'dir/a/', 'dir/a/file.txt', 'dir/b/', 'dir/b/another.txt']
        """
        yield from self.streamingcall("scan", path)

    @validate_call(validate_return=True)
    def presign_stat(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for stat (HEAD) which expires after expire_second seconds
        """
        return self.call("presign_stat", path, expire_second)

    @validate_call(validate_return=True)
    def presign_read(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for read (GET) which expires after expire_second seconds
        """
        return self.call("presign_read", path, expire_second)

    @validate_call(validate_return=True)
    def presign_write(self, /, path: PathBuf, expire_second: int) -> PresignedRequest:
        """
        Presign an operation for write (PUT) which expires after expire_second seconds
        """
        return self.call("presign_write", path, expire_second)

    @validate_call(validate_return=True)
    def capability(self, /) -> Capability:
        """
        Get capabilities of the underlying storage

        >>> cap = opendal.capability()
        >>> cap.copy
        True
        >>> cap.presign_read
        False
        """
        return self.call("capability")

    def cli(self):  # noqa: C901
        # Build and return a click group with one sub-command per client operation.
        # The argument/option decorators are created once and shared by the commands below.
        arg_path = click.argument("path", type=click.Path())
        arg_source = click.argument("source", type=click.Path())
        arg_target = click.argument("target", type=click.Path())
        arg_src = click.argument("src", type=click.Path())
        arg_dst = click.argument("dst", type=click.Path())
        opt_expire_second = click.option("--expire-second", type=int, required=True)

        @click.group
        def base():
            """Opendal Storage"""

        @base.command
        @arg_path
        def write_bytes(path):
            # Content to write is taken from stdin as raw bytes.
            data = click.get_binary_stream("stdin").read()
            self.write_bytes(path, data)

        @base.command
        @arg_path
        def read_bytes(path):
            data = self.read_bytes(path)
            # No trailing newline, so the output is exactly the file content.
            click.echo(data, nl=False)

        @base.command
        @arg_dst
        @arg_src
        def write_from_path(dst, src):
            self.write_from_path(dst, src)

        @base.command
        @arg_src
        @arg_dst
        def read_into_path(src, dst):
            self.read_into_path(src, dst)

        @base.command
        @arg_path
        def stat(path):
            click.echo(self.stat(path).model_dump_json(indent=2, by_alias=True))

        @base.command
        @arg_path
        # Default mirrors OpendalClient.hash. Without it click passes None when
        # --algo is omitted, overriding the method default; None is presumably
        # not a valid HashAlgo (its definition is outside this file).
        @click.option("--algo", type=click.Choice(["md5", "sha256"]), default="sha256", show_default=True)
        def hash(path, algo):
            click.echo(self.hash(path, algo))

        @base.command
        @arg_source
        @arg_target
        def copy(source, target):
            self.copy(source, target)

        @base.command
        @arg_source
        @arg_target
        def rename(source, target):
            self.rename(source, target)

        @base.command
        @arg_path
        def remove_all(path):
            self.remove_all(path)

        @base.command
        @arg_path
        def create_dir(path):
            self.create_dir(path)

        @base.command
        @arg_path
        def delete(path):
            self.delete(path)

        @base.command
        @arg_path
        def exists(path):
            # A missing path is reported as a click error rather than printed output.
            if not self.exists(path):
                raise click.ClickException(f"path {path} does not exist")

        @base.command
        @arg_path
        def list(path):
            for entry in self.list(path):
                click.echo(entry)

        @base.command
        @arg_path
        def scan(path):
            for entry in self.scan(path):
                click.echo(entry)

        @base.command
        @arg_path
        @opt_expire_second
        def presign_stat(path, expire_second):
            click.echo(self.presign_stat(path, expire_second).model_dump_json(indent=2))

        @base.command
        @arg_path
        @opt_expire_second
        def presign_read(path, expire_second):
            click.echo(self.presign_read(path, expire_second).model_dump_json(indent=2))

        @base.command
        @arg_path
        @opt_expire_second
        def presign_write(path, expire_second):
            click.echo(self.presign_write(path, expire_second).model_dump_json(indent=2))

        @base.command
        def capability():
            click.echo(self.capability().model_dump_json(indent=2))

        return base
class FlasherClientInterface(metaclass=ABCMeta):
    """Abstract interface of flasher clients: ``flash`` and ``dump`` plus a shared CLI."""

    @abstractmethod
    def flash(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Flash image to DUT"""

    @abstractmethod
    def dump(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Dump image from DUT"""

    def cli(self):
        # Build and return a click group exposing ``flash`` and ``dump``.
        # Both commands take the same argument and options, so the decorators are built once.
        arg_file = click.argument("file")
        opt_partition = click.option("--partition", type=str)
        opt_compression = click.option("--compression", type=click.Choice(Compression, case_sensitive=False))

        @click.group
        def base():
            """Generic flasher interface"""

        @base.command()
        @arg_file
        @opt_partition
        @opt_compression
        def flash(file, partition, compression):
            """Flash image to DUT from file"""
            self.flash(file, partition=partition, compression=compression)

        @base.command()
        @arg_file
        @opt_partition
        @opt_compression
        def dump(file, partition, compression):
            """Dump image from DUT to file"""
            self.dump(file, partition=partition, compression=compression)

        return base
class FlasherClient(FlasherClientInterface, DriverClient):
    """Flasher client that passes an ``OpendalAdapter`` handle to the driver's ``flash``/``dump`` calls."""

    def flash(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Flash image to DUT

        When ``operator`` is None, the operator and effective path are derived
        from ``path`` via ``operator_for_path``. The adapter is opened with
        mode ``"rb"`` and its handle is passed to the ``flash`` driver call
        together with ``partition``.
        """
        if operator is None:
            path, operator, _scheme = operator_for_path(path)

        image = OpendalAdapter(client=self, operator=operator, path=path, mode="rb", compression=compression)
        with image as handle:
            return self.call("flash", handle, partition)

    def dump(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Dump image from DUT

        When ``operator`` is None, the operator and effective path are derived
        from ``path`` via ``operator_for_path``. The adapter is opened with
        mode ``"wb"`` and its handle is passed to the ``dump`` driver call
        together with ``partition``.
        """
        if operator is None:
            path, operator, _scheme = operator_for_path(path)

        target = OpendalAdapter(client=self, operator=operator, path=path, mode="wb", compression=compression)
        with target as handle:
            return self.call("dump", handle, partition)
class StorageMuxClient(DriverClient):
    """Client for a storage mux driver: switch the storage between host and DUT and transfer data."""

    def host(self):
        """Connect storage to host"""
        return self.call("host")

    def dut(self):
        """Connect storage to dut"""
        return self.call("dut")

    def off(self):
        """Disconnect storage"""
        return self.call("off")

    def write(self, handle):
        """Issue the ``write`` driver call with the given resource handle."""
        return self.call("write", handle)

    def read(self, handle):
        """Issue the ``read`` driver call with the given resource handle."""
        return self.call("read", handle)

    def write_file(self, operator: Operator, path: str):
        """Open ``path`` on ``operator`` through an ``OpendalAdapter`` and pass its handle to ``write``."""
        source = OpendalAdapter(client=self, operator=operator, path=path)
        with source as handle:
            return self.write(handle)

    def read_file(self, operator: Operator, path: str):
        """Open ``path`` on ``operator`` in ``"wb"`` mode and pass its handle to ``read``."""
        sink = OpendalAdapter(client=self, operator=operator, path=path, mode="wb")
        with sink as handle:
            return self.read(handle)

    def write_local_file(self, filepath):
        """Write a local file to the storage device"""
        # The fs operator is rooted at "/", so the path is resolved before it is handed over.
        local_path = str(Path(filepath).resolve())
        return self.write_file(operator=Operator("fs", root="/"), path=local_path)

    def read_local_file(self, filepath):
        """Read into a local file from the storage device"""
        # The fs operator is rooted at "/", so the path is resolved before it is handed over.
        local_path = str(Path(filepath).resolve())
        return self.read_file(operator=Operator("fs", root="/"), path=local_path)

    def cli(self, base=None):
        # Register the mux commands on ``base``; a fresh anonymous group is created when none is given.
        group = click.group(lambda: None) if base is None else base

        @group.command()
        def host():
            """Connect storage to host"""
            self.host()

        @group.command()
        def dut():
            """Connect storage to dut"""
            self.dut()

        @group.command()
        def off():
            """Disconnect storage"""
            self.off()

        @group.command()
        @click.argument("file")
        def write_local_file(file):
            self.write_local_file(file)

        return group
class StorageMuxFlasherClient(FlasherClient, StorageMuxClient):
    """Flasher client implemented on top of a storage mux.

    ``flash`` and ``dump`` connect the storage to the host, transfer the
    image, and then connect the storage to the DUT again.
    """

    def flash(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Flash image to DUT

        When ``operator`` is None, the operator and effective path are derived
        from ``path`` via ``operator_for_path``.

        Raises:
            ArgumentError: if ``partition`` is given; partitions are not supported here.
        """
        if partition is not None:
            raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided")

        # Resolve the source before touching the mux, so a bad path cannot
        # leave the storage attached to the host.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        self.host()
        try:
            with OpendalAdapter(client=self, operator=operator, path=path, mode="rb", compression=compression) as handle:
                return self.write(handle)
        finally:
            # Also runs when opening the adapter fails, not only when the write does.
            self.dut()

    def dump(
        self,
        path: PathBuf,
        *,
        partition: str | None = None,
        operator: Operator | None = None,
        compression: Compression | None = None,
    ):
        """Dump image from DUT

        When ``operator`` is None, the operator and effective path are derived
        from ``path`` via ``operator_for_path``.

        Raises:
            ArgumentError: if ``partition`` is given; partitions are not supported here.
        """
        if partition is not None:
            raise ArgumentError(f"partition is not supported for StorageMuxFlasherClient, {partition} provided")

        # Resolve the destination before touching the mux, so a bad path
        # cannot leave the storage attached to the host.
        if operator is None:
            path, operator, _ = operator_for_path(path)

        # Use the same host()/read()/dut() wrappers as flash() instead of raw driver calls.
        self.host()
        try:
            with OpendalAdapter(client=self, operator=operator, path=path, mode="wb", compression=compression) as handle:
                return self.read(handle)
        finally:
            # Also runs when opening the adapter fails, not only when the read does.
            self.dut()

    def cli(self):
        # Start from the flasher command group and add the storage mux commands to it.
        top_cli = FlasherClient.cli(self)
        return StorageMuxClient.cli(self, top_cli)