Coverage for /home/fedora/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py: 62%

42 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-05 20:29 +0000

1from contextlib import asynccontextmanager, suppress 

2from dataclasses import dataclass 

3from typing import Literal 

4 

5from anyio import BrokenResourceError, EndOfStream 

6from anyio.abc import ObjectStream 

7from opendal import AsyncFile, Operator 

8from opendal.exceptions import Error 

9 

10from jumpstarter.client import DriverClient 

11from jumpstarter.client.adapters import blocking 

12from jumpstarter.common.resources import PresignedRequestResource 

13 

14 

15@dataclass(frozen=True, kw_only=True, slots=True) 

16class AsyncFileStream(ObjectStream[bytes]): 

17 """ 

18 wrapper type for opendal.AsyncFile to make it compatible with anyio streams 

19 """ 

20 

21 file: AsyncFile 

22 

23 async def send(self, item: bytes): 

24 try: 

25 await self.file.write(item) 

26 except Error as e: 

27 raise BrokenResourceError from e 

28 

29 async def receive(self) -> bytes: 

30 if not await self.file.readable(): 

31 raise EndOfStream 

32 try: 

33 item = await self.file.read(size=65536) 

34 except Error as e: 

35 raise BrokenResourceError from e 

36 if len(item) == 0: 

37 raise EndOfStream 

38 return item 

39 

40 async def send_eof(self): 

41 pass 

42 

43 async def aclose(self): 

44 with suppress(Error): 

45 await self.file.close() 

46 

47 

48@blocking 

49@asynccontextmanager 

50async def OpendalAdapter( 

51 *, 

52 client: DriverClient, 

53 operator: Operator, # opendal.Operator for the storage backend 

54 path: str, # file path in storage backend relative to the storage root 

55 mode: Literal["rb", "wb"] = "rb", # binary read or binary write mode 

56): 

57 # if the access mode is binary read, and the storage backend supports presigned read requests 

58 if mode == "rb" and operator.capability().presign_read: 

59 # create presigned url for the specified file with a 60 second expiration 

60 presigned = await operator.to_async_operator().presign_read(path, expire_second=60) 

61 yield PresignedRequestResource( 

62 headers=presigned.headers, url=presigned.url, method=presigned.method 

63 ).model_dump(mode="json") 

64 # otherwise stream the file content from the client to the exporter 

65 else: 

66 file = await operator.to_async_operator().open(path, mode) 

67 async with client.resource_async(AsyncFileStream(file=file)) as res: 

68 yield res