Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py: 62%
42 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-05 20:29 +0000
1from contextlib import asynccontextmanager, suppress
2from dataclasses import dataclass
3from typing import Literal
5from anyio import BrokenResourceError, EndOfStream
6from anyio.abc import ObjectStream
7from opendal import AsyncFile, Operator
8from opendal.exceptions import Error
10from jumpstarter.client import DriverClient
11from jumpstarter.client.adapters import blocking
12from jumpstarter.common.resources import PresignedRequestResource
@dataclass(frozen=True, kw_only=True, slots=True)
class AsyncFileStream(ObjectStream[bytes]):
    """
    Adapter exposing an opendal.AsyncFile through the anyio ObjectStream[bytes] interface.

    opendal errors raised while reading or writing are re-raised as
    BrokenResourceError; running out of data is reported as EndOfStream.
    """

    # underlying opendal file handle that every stream operation delegates to
    file: AsyncFile

    async def send(self, item: bytes):
        """Write ``item`` to the file, raising BrokenResourceError on an opendal Error."""
        try:
            await self.file.write(item)
        except Error as exc:
            raise BrokenResourceError from exc

    async def receive(self) -> bytes:
        """
        Read and return the next chunk of the file.

        Each call passes ``size=65536`` to the underlying read.

        Raises:
            EndOfStream: the file reports it is not readable, or the read
                returned zero bytes.
            BrokenResourceError: the underlying read raised an opendal Error.
        """
        readable = await self.file.readable()
        if not readable:
            raise EndOfStream
        try:
            chunk = await self.file.read(size=65536)
        except Error as exc:
            raise BrokenResourceError from exc
        if len(chunk) > 0:
            return chunk
        # an empty read means there is nothing left to hand out
        raise EndOfStream

    async def send_eof(self):
        """Do nothing; no end-of-file action is performed on the underlying file."""
        return None

    async def aclose(self):
        """Close the file, ignoring any opendal Error raised while closing."""
        with suppress(Error):
            await self.file.close()
@blocking
@asynccontextmanager
async def OpendalAdapter(
    *,
    client: DriverClient,
    operator: Operator,  # opendal.Operator for the storage backend
    path: str,  # file path in storage backend relative to the storage root
    mode: Literal["rb", "wb"] = "rb",  # binary read or binary write mode
):
    """
    Yield a resource handle for ``path`` on the given opendal operator.

    For mode "rb" on a backend whose capability reports ``presign_read``, a
    presigned read request (60 second expiration) is created and yielded as a
    JSON-mode dump of a PresignedRequestResource. In every other case the file
    is opened with ``mode``, wrapped in an AsyncFileStream, and whatever
    ``client.resource_async`` provides for that stream is yielded.
    """
    # the capability lookup only happens for binary reads (short-circuit `and`)
    use_presigned = mode == "rb" and operator.capability().presign_read

    if not use_presigned:
        # no presigned read available: stream the file content through the client
        handle = await operator.to_async_operator().open(path, mode)
        stream = AsyncFileStream(file=handle)
        async with client.resource_async(stream) as resource:
            yield resource
        return

    # create a presigned url for the specified file with a 60 second expiration
    signed = await operator.to_async_operator().presign_read(path, expire_second=60)
    request = PresignedRequestResource(
        headers=signed.headers,
        url=signed.url,
        method=signed.method,
    )
    yield request.model_dump(mode="json")