Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver.py: 72%

223 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 20:29 +0000

1import hashlib 

2from abc import ABCMeta, abstractmethod 

3from collections.abc import AsyncGenerator 

4from dataclasses import dataclass, field 

5from pathlib import Path 

6from tempfile import NamedTemporaryFile, TemporaryDirectory, _TemporaryFileWrapper 

7from typing import Any 

8from uuid import UUID, uuid4 

9 

10from anyio.streams.file import FileReadStream, FileWriteStream 

11from opendal import AsyncFile, AsyncOperator 

12from pydantic import validate_call 

13 

14from .adapter import AsyncFileStream 

15from .common import Capability, HashAlgo, Metadata, Mode, PresignedRequest 

16from jumpstarter.driver import Driver, export 

17 

18 

19@dataclass(kw_only=True) 

20class Opendal(Driver): 

21 scheme: str 

22 kwargs: dict[str, str] 

23 

24 _operator: AsyncOperator = field(init=False) 

25 _fds: dict[UUID, AsyncFile] = field(init=False, default_factory=dict) 

26 

27 @classmethod 

28 def client(cls) -> str: 

29 return "jumpstarter_driver_opendal.client.OpendalClient" 

30 

31 def __post_init__(self): 

32 if hasattr(super(), "__post_init__"): 

33 super().__post_init__() 

34 

35 self._operator = AsyncOperator(self.scheme, **self.kwargs) 

36 

37 @export 

38 @validate_call(validate_return=True) 

39 async def open(self, /, path: str, mode: Mode) -> UUID: 

40 file = await self._operator.open(path, mode) 

41 uuid = uuid4() 

42 

43 self._fds[uuid] = file 

44 

45 return uuid 

46 

47 @export 

48 @validate_call(validate_return=True) 

49 async def file_read(self, /, fd: UUID, dst: Any) -> None: 

50 async with self.resource(dst) as res: 

51 stream = AsyncFileStream(file=self._fds[fd]) 

52 async for chunk in stream: 

53 await res.send(chunk) 

54 

55 @export 

56 @validate_call(validate_return=True) 

57 async def file_write(self, /, fd: UUID, src: Any) -> None: 

58 async with self.resource(src) as res: 

59 stream = AsyncFileStream(file=self._fds[fd]) 

60 async for chunk in res: 

61 await stream.send(chunk) 

62 

63 @export 

64 @validate_call(validate_return=True) 

65 async def file_seek(self, /, fd: UUID, pos: int, whence: int = 0) -> int: 

66 return await self._fds[fd].seek(pos, whence) 

67 

68 @export 

69 @validate_call(validate_return=True) 

70 async def file_tell(self, /, fd: UUID) -> int: 

71 return await self._fds[fd].tell() 

72 

73 @export 

74 @validate_call(validate_return=True) 

75 async def file_close(self, /, fd: UUID) -> None: 

76 await self._fds[fd].close() 

77 

78 @export 

79 @validate_call(validate_return=True) 

80 async def file_closed(self, /, fd: UUID) -> bool: 

81 return await self._fds[fd].closed 

82 

83 @export 

84 @validate_call(validate_return=True) 

85 async def file_readable(self, /, fd: UUID) -> bool: 

86 return await self._fds[fd].readable() 

87 

88 @export 

89 @validate_call(validate_return=True) 

90 async def file_seekable(self, /, fd: UUID) -> bool: 

91 return await self._fds[fd].seekable() 

92 

93 @export 

94 @validate_call(validate_return=True) 

95 async def file_writable(self, /, fd: UUID) -> bool: 

96 return await self._fds[fd].writable() 

97 

98 @export 

99 @validate_call(validate_return=True) 

100 async def stat(self, /, path: str) -> Metadata: 

101 return Metadata.model_validate(await self._operator.stat(path), from_attributes=True) 

102 

103 @export 

104 @validate_call(validate_return=True) 

105 async def hash(self, /, path: str, algo: HashAlgo = "sha256") -> str: 

106 match algo: 

107 case "md5": 

108 m = hashlib.md5() 

109 case "sha256": 

110 m = hashlib.sha256() 

111 async with await self._operator.open(path, "rb") as f: 

112 while True: 

113 data = await f.read(size=65536) 

114 if len(data) == 0: 

115 break 

116 m.update(data) 

117 

118 return m.hexdigest() 

119 

120 @export 

121 @validate_call(validate_return=True) 

122 async def copy(self, /, source: str, target: str): 

123 await self._operator.copy(source, target) 

124 

125 @export 

126 @validate_call(validate_return=True) 

127 async def rename(self, /, source: str, target: str): 

128 await self._operator.rename(source, target) 

129 

130 @export 

131 @validate_call(validate_return=True) 

132 async def remove_all(self, /, path: str): 

133 await self._operator.remove_all(path) 

134 

135 @export 

136 @validate_call(validate_return=True) 

137 async def create_dir(self, /, path: str): 

138 await self._operator.create_dir(path) 

139 

140 @export 

141 @validate_call(validate_return=True) 

142 async def delete(self, /, path: str): 

143 await self._operator.delete(path) 

144 

145 @export 

146 @validate_call(validate_return=True) 

147 async def exists(self, /, path: str) -> bool: 

148 return await self._operator.exists(path) 

149 

150 @export 

151 async def list(self, /, path: str) -> AsyncGenerator[str, None]: 

152 async for entry in await self._operator.list(path): 

153 yield entry.path 

154 

155 @export 

156 async def scan(self, /, path: str) -> AsyncGenerator[str, None]: 

157 async for entry in await self._operator.scan(path): 

158 yield entry.path 

159 

160 @export 

161 @validate_call(validate_return=True) 

162 async def presign_stat(self, /, path: str, expire_second: int) -> PresignedRequest: 

163 return PresignedRequest.model_validate( 

164 await self._operator.presign_stat(path, expire_second), from_attributes=True 

165 ) 

166 

167 @export 

168 @validate_call(validate_return=True) 

169 async def presign_read(self, /, path: str, expire_second: int) -> PresignedRequest: 

170 return PresignedRequest.model_validate( 

171 await self._operator.presign_read(path, expire_second), from_attributes=True 

172 ) 

173 

174 @export 

175 @validate_call(validate_return=True) 

176 async def presign_write(self, /, path: str, expire_second: int) -> PresignedRequest: 

177 return PresignedRequest.model_validate( 

178 await self._operator.presign_write(path, expire_second), from_attributes=True 

179 ) 

180 

181 @export 

182 @validate_call(validate_return=True) 

183 async def capability(self, /) -> Capability: 

184 return Capability.model_validate(self._operator.capability(), from_attributes=True) 

185 

186 async def copy_exporter_file(self, /, source: Path, target: str): 

187 """Copy a file from the exporter to the target path. 

188 This function is intended to be used on the exporter side to copy files to the target path. 

189 """ 

190 async with await AsyncOperator("fs", root=source.parent.as_posix()).open(source.name, "rb") as src: 

191 async with await self._operator.open(target, "wb") as dst: 

192 while True: 

193 data = await src.read(size=65536) 

194 if len(data) == 0: 

195 break 

196 await dst.write(bs=data) 

197 

198 

199class FlasherInterface(metaclass=ABCMeta): 

200 @classmethod 

201 def client(cls) -> str: 

202 return "jumpstarter_driver_opendal.client.FlasherClient" 

203 

204 @abstractmethod 

205 def flash(self, source, partition: str | None = None): ... 

206 

207 @abstractmethod 

208 def dump(self, target, partition: str | None = None): ... 

209 

210 

@dataclass
class MockFlasher(FlasherInterface, Driver):
    """Flasher implementation backed by files inside a temporary directory.

    Each partition maps to one file named after the partition within
    ``_tempdir``.
    """

    # Directory holding one file per partition; created per instance.
    _tempdir: TemporaryDirectory = field(default_factory=TemporaryDirectory)

    def __path(self, partition: str | None = None) -> str:
        """Return the file path backing ``partition`` (``"default"`` when ``None``)."""
        name = "default" if partition is None else partition
        return str(Path(self._tempdir.name) / name)

    @export
    async def flash(self, source, partition: str | None = None):
        """Write every chunk received from the resource ``source`` to the partition file."""
        # The file stream is opened before the resource, as in the nested form.
        async with (
            await FileWriteStream.from_path(self.__path(partition)) as stream,
            self.resource(source) as res,
        ):
            async for chunk in res:
                await stream.send(chunk)

    @export
    async def dump(self, target, partition: str | None = None):
        """Send the contents of the partition file to the resource ``target``."""
        async with (
            await FileReadStream.from_path(self.__path(partition)) as stream,
            self.resource(target) as res,
        ):
            async for chunk in stream:
                await res.send(chunk)

233 

234 

class StorageMuxInterface(metaclass=ABCMeta):
    """Abstract interface for storage multiplexer drivers.

    Subclasses must implement all five coroutine methods below.
    """

    @classmethod
    def client(cls) -> str:
        """Return the dotted path string naming the client class for storage muxes."""
        return "jumpstarter_driver_opendal.client.StorageMuxClient"

    @abstractmethod
    async def host(self):
        """Presumably connect the storage to the host - confirm with implementations."""
        ...

    @abstractmethod
    async def dut(self):
        """Presumably connect the storage to the device under test - confirm with implementations."""
        ...

    @abstractmethod
    async def off(self):
        """Presumably disconnect the storage - confirm with implementations."""
        ...

    @abstractmethod
    async def write(self, src: str):
        """Write data taken from the resource ``src`` to the storage."""
        ...

    @abstractmethod
    async def read(self, dst: str):
        """Read data from the storage into the resource ``dst``."""
        ...

254 

255 

class StorageMuxFlasherInterface(StorageMuxInterface):
    """Storage mux interface that only overrides which client class is used."""

    @classmethod
    def client(cls) -> str:
        """Return the dotted path string naming the storage-mux flasher client class."""
        return "jumpstarter_driver_opendal.client.StorageMuxFlasherClient"

260 

261 

@dataclass
class MockStorageMux(StorageMuxInterface, Driver):
    """Storage mux implementation backed by a single named temporary file.

    ``host``, ``dut`` and ``off`` do nothing; ``write`` and ``read`` stream
    data to and from the temporary file.
    """

    # Backing file; only its ``name`` is used to reopen it for each transfer.
    file: _TemporaryFileWrapper = field(default_factory=NamedTemporaryFile)

    @export
    async def host(self):
        """Do nothing."""
        pass

    @export
    async def dut(self):
        """Do nothing."""
        pass

    @export
    async def off(self):
        """Do nothing."""
        pass

    @export
    async def write(self, src: str):
        """Write every chunk received from the resource ``src`` to the backing file."""
        # The file stream is opened before the resource, as in the nested form.
        async with (
            await FileWriteStream.from_path(self.file.name) as stream,
            self.resource(src) as res,
        ):
            async for chunk in res:
                await stream.send(chunk)

    @export
    async def read(self, dst: str):
        """Send the contents of the backing file to the resource ``dst``."""
        async with (
            await FileReadStream.from_path(self.file.name) as stream,
            self.resource(dst) as res,
        ):
            async for chunk in stream:
                await res.send(chunk)

291 

292 

@dataclass
class MockStorageMuxFlasher(StorageMuxFlasherInterface, MockStorageMux):
    """Mock storage mux that adds no fields or methods of its own.

    It combines the ``MockStorageMux`` behaviour with the client class named
    by ``StorageMuxFlasherInterface.client``.
    """