Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py: 57%

54 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 17:10 +0200

1from contextlib import asynccontextmanager, suppress 

2from dataclasses import dataclass, field 

3from typing import Any, Callable, Literal, Mapping 

4 

5from anyio import BrokenResourceError, EndOfStream 

6from anyio.abc import ObjectStream 

7from opendal import AsyncFile, Metadata, Operator 

8from opendal.exceptions import Error 

9 

10from jumpstarter.client import DriverClient 

11from jumpstarter.client.adapters import blocking 

12from jumpstarter.common.resources import PresignedRequestResource 

13from jumpstarter.streams.encoding import Compression 

14from jumpstarter.streams.progress import ProgressAttribute 

15 

16 

@dataclass(frozen=True, kw_only=True, slots=True)
class AsyncFileStream(ObjectStream[bytes]):
    """
    Adapter that exposes an opendal.AsyncFile through the anyio
    ObjectStream[bytes] interface.

    opendal errors raised while reading or writing are re-raised as
    anyio BrokenResourceError; running out of data is signalled with EndOfStream.
    """

    # underlying opendal file handle that all operations are forwarded to
    file: AsyncFile
    # optional stat result for the file; only consulted by extra_attributes
    metadata: Metadata | None = field(default=None)

    async def send(self, item: bytes):
        """Write ``item`` to the file, mapping opendal ``Error`` to ``BrokenResourceError``."""
        try:
            await self.file.write(item)
        except Error as err:
            raise BrokenResourceError from err

    async def receive(self) -> bytes:
        """
        Read the next chunk from the file (requested size: 65536 bytes).

        Raises EndOfStream when the file reports it is not readable or when the
        read returns no data, and BrokenResourceError when opendal raises Error.
        """
        can_read = await self.file.readable()
        if not can_read:
            raise EndOfStream
        try:
            chunk = await self.file.read(size=65536)
        except Error as err:
            raise BrokenResourceError from err
        if len(chunk) > 0:
            return chunk
        # an empty read marks the end of the file
        raise EndOfStream

    async def send_eof(self):
        """No-op: nothing is done to signal end-of-file on the write side."""
        return None

    async def aclose(self):
        """Close the underlying file, ignoring any opendal ``Error`` raised while closing."""
        with suppress(Error):
            await self.file.close()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Typed attributes advertised by this stream.

        Provides ProgressAttribute.total (content length as float) when metadata
        is available and its content_length is non-zero; otherwise nothing.
        """
        meta = self.metadata
        if meta is None or meta.content_length == 0:
            return {}
        return {ProgressAttribute.total: lambda: float(self.metadata.content_length)}

56 

57 

@blocking
@asynccontextmanager
async def OpendalAdapter(
    *,
    client: DriverClient,
    operator: Operator,  # opendal.Operator for the storage backend
    path: str,  # file path in storage backend relative to the storage root
    mode: Literal["rb", "wb"] = "rb",  # binary read or binary write mode
    compression: Compression | None = None,  # compression algorithm
):
    """
    Make a file in an opendal storage backend available as a resource.

    Yields the JSON-mode dump of a PresignedRequestResource when the mode is
    "rb", the backend reports the presign_read capability and no compression
    is requested. In every other case the file is opened and wrapped in an
    AsyncFileStream, and whatever client.resource_async produces for that
    stream is yielded instead.
    """
    # presigned reads are only used for uncompressed binary reads on backends that support them
    use_presigned = mode == "rb" and operator.capability().presign_read and compression is None

    if use_presigned:
        # the presigned url for the specified file expires after 60 seconds
        request = await operator.to_async_operator().presign_read(path, expire_second=60)
        resource = PresignedRequestResource(
            headers=request.headers,
            url=request.url,
            method=request.method,
        )
        yield resource.model_dump(mode="json")
        return

    # fall back to streaming the file content between the client and the exporter
    try:
        file_metadata = await operator.to_async_operator().stat(path)
    except Exception:
        # metadata is optional (AsyncFileStream only uses it for the progress total),
        # so a failed stat is tolerated — presumably e.g. a not-yet-existing file in "wb" mode
        file_metadata = None

    handle = await operator.to_async_operator().open(path, mode)
    stream = AsyncFileStream(file=handle, metadata=file_metadata)
    async with client.resource_async(stream, content_encoding=compression) as res:
        yield res