Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver.py: 72%
223 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1import hashlib
2from abc import ABCMeta, abstractmethod
3from collections.abc import AsyncGenerator
4from dataclasses import dataclass, field
5from pathlib import Path
6from tempfile import NamedTemporaryFile, TemporaryDirectory, _TemporaryFileWrapper
7from typing import Any
8from uuid import UUID, uuid4
10from anyio.streams.file import FileReadStream, FileWriteStream
11from opendal import AsyncFile, AsyncOperator
12from pydantic import validate_call
14from .adapter import AsyncFileStream
15from .common import Capability, HashAlgo, Metadata, Mode, PresignedRequest
16from jumpstarter.driver import Driver, export
19@dataclass(kw_only=True)
20class Opendal(Driver):
21 scheme: str
22 kwargs: dict[str, str]
24 _operator: AsyncOperator = field(init=False)
25 _fds: dict[UUID, AsyncFile] = field(init=False, default_factory=dict)
27 @classmethod
28 def client(cls) -> str:
29 return "jumpstarter_driver_opendal.client.OpendalClient"
31 def __post_init__(self):
32 if hasattr(super(), "__post_init__"):
33 super().__post_init__()
35 self._operator = AsyncOperator(self.scheme, **self.kwargs)
37 @export
38 @validate_call(validate_return=True)
39 async def open(self, /, path: str, mode: Mode) -> UUID:
40 file = await self._operator.open(path, mode)
41 uuid = uuid4()
43 self._fds[uuid] = file
45 return uuid
47 @export
48 @validate_call(validate_return=True)
49 async def file_read(self, /, fd: UUID, dst: Any) -> None:
50 async with self.resource(dst) as res:
51 stream = AsyncFileStream(file=self._fds[fd])
52 async for chunk in stream:
53 await res.send(chunk)
55 @export
56 @validate_call(validate_return=True)
57 async def file_write(self, /, fd: UUID, src: Any) -> None:
58 async with self.resource(src) as res:
59 stream = AsyncFileStream(file=self._fds[fd])
60 async for chunk in res:
61 await stream.send(chunk)
63 @export
64 @validate_call(validate_return=True)
65 async def file_seek(self, /, fd: UUID, pos: int, whence: int = 0) -> int:
66 return await self._fds[fd].seek(pos, whence)
68 @export
69 @validate_call(validate_return=True)
70 async def file_tell(self, /, fd: UUID) -> int:
71 return await self._fds[fd].tell()
73 @export
74 @validate_call(validate_return=True)
75 async def file_close(self, /, fd: UUID) -> None:
76 await self._fds[fd].close()
78 @export
79 @validate_call(validate_return=True)
80 async def file_closed(self, /, fd: UUID) -> bool:
81 return await self._fds[fd].closed
83 @export
84 @validate_call(validate_return=True)
85 async def file_readable(self, /, fd: UUID) -> bool:
86 return await self._fds[fd].readable()
88 @export
89 @validate_call(validate_return=True)
90 async def file_seekable(self, /, fd: UUID) -> bool:
91 return await self._fds[fd].seekable()
93 @export
94 @validate_call(validate_return=True)
95 async def file_writable(self, /, fd: UUID) -> bool:
96 return await self._fds[fd].writable()
98 @export
99 @validate_call(validate_return=True)
100 async def stat(self, /, path: str) -> Metadata:
101 return Metadata.model_validate(await self._operator.stat(path), from_attributes=True)
103 @export
104 @validate_call(validate_return=True)
105 async def hash(self, /, path: str, algo: HashAlgo = "sha256") -> str:
106 match algo:
107 case "md5":
108 m = hashlib.md5()
109 case "sha256":
110 m = hashlib.sha256()
111 async with await self._operator.open(path, "rb") as f:
112 while True:
113 data = await f.read(size=65536)
114 if len(data) == 0:
115 break
116 m.update(data)
118 return m.hexdigest()
120 @export
121 @validate_call(validate_return=True)
122 async def copy(self, /, source: str, target: str):
123 await self._operator.copy(source, target)
125 @export
126 @validate_call(validate_return=True)
127 async def rename(self, /, source: str, target: str):
128 await self._operator.rename(source, target)
130 @export
131 @validate_call(validate_return=True)
132 async def remove_all(self, /, path: str):
133 await self._operator.remove_all(path)
135 @export
136 @validate_call(validate_return=True)
137 async def create_dir(self, /, path: str):
138 await self._operator.create_dir(path)
140 @export
141 @validate_call(validate_return=True)
142 async def delete(self, /, path: str):
143 await self._operator.delete(path)
145 @export
146 @validate_call(validate_return=True)
147 async def exists(self, /, path: str) -> bool:
148 return await self._operator.exists(path)
150 @export
151 async def list(self, /, path: str) -> AsyncGenerator[str, None]:
152 async for entry in await self._operator.list(path):
153 yield entry.path
155 @export
156 async def scan(self, /, path: str) -> AsyncGenerator[str, None]:
157 async for entry in await self._operator.scan(path):
158 yield entry.path
160 @export
161 @validate_call(validate_return=True)
162 async def presign_stat(self, /, path: str, expire_second: int) -> PresignedRequest:
163 return PresignedRequest.model_validate(
164 await self._operator.presign_stat(path, expire_second), from_attributes=True
165 )
167 @export
168 @validate_call(validate_return=True)
169 async def presign_read(self, /, path: str, expire_second: int) -> PresignedRequest:
170 return PresignedRequest.model_validate(
171 await self._operator.presign_read(path, expire_second), from_attributes=True
172 )
174 @export
175 @validate_call(validate_return=True)
176 async def presign_write(self, /, path: str, expire_second: int) -> PresignedRequest:
177 return PresignedRequest.model_validate(
178 await self._operator.presign_write(path, expire_second), from_attributes=True
179 )
181 @export
182 @validate_call(validate_return=True)
183 async def capability(self, /) -> Capability:
184 return Capability.model_validate(self._operator.capability(), from_attributes=True)
186 async def copy_exporter_file(self, /, source: Path, target: str):
187 """Copy a file from the exporter to the target path.
188 This function is intended to be used on the exporter side to copy files to the target path.
189 """
190 async with await AsyncOperator("fs", root=source.parent.as_posix()).open(source.name, "rb") as src:
191 async with await self._operator.open(target, "wb") as dst:
192 while True:
193 data = await src.read(size=65536)
194 if len(data) == 0:
195 break
196 await dst.write(bs=data)
199class FlasherInterface(metaclass=ABCMeta):
200 @classmethod
201 def client(cls) -> str:
202 return "jumpstarter_driver_opendal.client.FlasherClient"
204 @abstractmethod
205 def flash(self, source, partition: str | None = None): ...
207 @abstractmethod
208 def dump(self, target, partition: str | None = None): ...
211@dataclass
212class MockFlasher(FlasherInterface, Driver):
213 _tempdir: TemporaryDirectory = field(default_factory=TemporaryDirectory)
215 def __path(self, partition: str | None = None) -> str:
216 if partition is None:
217 partition = "default"
218 return str(Path(self._tempdir.name) / partition)
220 @export
221 async def flash(self, source, partition: str | None = None):
222 async with await FileWriteStream.from_path(self.__path(partition)) as stream:
223 async with self.resource(source) as res:
224 async for chunk in res:
225 await stream.send(chunk)
227 @export
228 async def dump(self, target, partition: str | None = None):
229 async with await FileReadStream.from_path(self.__path(partition)) as stream:
230 async with self.resource(target) as res:
231 async for chunk in stream:
232 await res.send(chunk)
235class StorageMuxInterface(metaclass=ABCMeta):
236 @classmethod
237 def client(cls) -> str:
238 return "jumpstarter_driver_opendal.client.StorageMuxClient"
240 @abstractmethod
241 async def host(self): ...
243 @abstractmethod
244 async def dut(self): ...
246 @abstractmethod
247 async def off(self): ...
249 @abstractmethod
250 async def write(self, src: str): ...
252 @abstractmethod
253 async def read(self, dst: str): ...
256class StorageMuxFlasherInterface(StorageMuxInterface):
257 @classmethod
258 def client(cls) -> str:
259 return "jumpstarter_driver_opendal.client.StorageMuxFlasherClient"
262@dataclass
263class MockStorageMux(StorageMuxInterface, Driver):
264 file: _TemporaryFileWrapper = field(default_factory=NamedTemporaryFile)
266 @export
267 async def host(self):
268 pass
270 @export
271 async def dut(self):
272 pass
274 @export
275 async def off(self):
276 pass
278 @export
279 async def write(self, src: str):
280 async with await FileWriteStream.from_path(self.file.name) as stream:
281 async with self.resource(src) as res:
282 async for chunk in res:
283 await stream.send(chunk)
285 @export
286 async def read(self, dst: str):
287 async with await FileReadStream.from_path(self.file.name) as stream:
288 async with self.resource(dst) as res:
289 async for chunk in stream:
290 await res.send(chunk)
293@dataclass
294class MockStorageMuxFlasher(StorageMuxFlasherInterface, MockStorageMux):
295 pass