Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py: 57%
54 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
1from contextlib import asynccontextmanager, suppress
2from dataclasses import dataclass, field
3from typing import Any, Callable, Literal, Mapping
5from anyio import BrokenResourceError, EndOfStream
6from anyio.abc import ObjectStream
7from opendal import AsyncFile, Metadata, Operator
8from opendal.exceptions import Error
10from jumpstarter.client import DriverClient
11from jumpstarter.client.adapters import blocking
12from jumpstarter.common.resources import PresignedRequestResource
13from jumpstarter.streams.encoding import Compression
14from jumpstarter.streams.progress import ProgressAttribute
@dataclass(frozen=True, kw_only=True, slots=True)
class AsyncFileStream(ObjectStream[bytes]):
    """
    Expose an opendal.AsyncFile through the anyio ObjectStream[bytes] interface.

    Errors raised by opendal while reading or writing are re-raised as
    anyio's BrokenResourceError; running out of data is reported as
    EndOfStream.
    """

    # the underlying opendal file handle that all reads/writes go through
    file: AsyncFile
    # optional stat result for the file; only consulted by extra_attributes
    metadata: Metadata | None = None

    async def send(self, item: bytes):
        """Write one chunk to the file, mapping opendal errors to BrokenResourceError."""
        try:
            await self.file.write(item)
        except Error as exc:
            raise BrokenResourceError from exc

    async def receive(self) -> bytes:
        """
        Read the next chunk (at most 65536 bytes) from the file.

        Raises EndOfStream when the file reports it is not readable or the
        read comes back empty, and BrokenResourceError when opendal fails.
        """
        if await self.file.readable():
            try:
                chunk = await self.file.read(size=65536)
            except Error as exc:
                raise BrokenResourceError from exc
            if chunk:
                return chunk
        # either the file is not readable or the read returned no data
        raise EndOfStream

    async def send_eof(self):
        """Do nothing; this stream takes no action to signal end-of-file."""

    async def aclose(self):
        """Close the underlying file, ignoring any opendal error raised by close."""
        try:
            await self.file.close()
        except Error:
            pass

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Typed attributes advertised by this stream.

        Provides ProgressAttribute.total (the content length as a float) when
        metadata is present and reports a non-zero content length; otherwise
        the mapping is empty.
        """
        meta = self.metadata
        if meta is None or meta.content_length == 0:
            return {}
        return {ProgressAttribute.total: lambda: float(self.metadata.content_length)}
@blocking
@asynccontextmanager
async def OpendalAdapter(
    *,
    client: DriverClient,
    operator: Operator,  # opendal.Operator for the storage backend
    path: str,  # file path in storage backend relative to the storage root
    mode: Literal["rb", "wb"] = "rb",  # binary read or binary write mode
    compression: Compression | None = None,  # compression algorithm
):
    """
    Make a file from an opendal storage backend available as a resource.

    For uncompressed binary reads on a backend that supports presigned read
    requests, yields the JSON-mode dump of a PresignedRequestResource whose
    URL expires after 60 seconds. In every other case the file is opened with
    the given mode and yielded through client.resource_async, wrapped in an
    AsyncFileStream.
    """
    # fast path: binary read, presign-capable backend, and no compression requested
    if mode == "rb" and operator.capability().presign_read and compression is None:
        # create presigned url for the specified file with a 60 second expiration
        presigned = await operator.to_async_operator().presign_read(path, expire_second=60)
        request = PresignedRequestResource(
            headers=presigned.headers,
            url=presigned.url,
            method=presigned.method,
        )
        yield request.model_dump(mode="json")
        return

    # otherwise stream the file content from the client to the exporter

    # metadata is best-effort: if stat fails for any reason, carry on without it
    file_metadata = None
    with suppress(Exception):
        file_metadata = await operator.to_async_operator().stat(path)

    handle = await operator.to_async_operator().open(path, mode)
    stream = AsyncFileStream(file=handle, metadata=file_metadata)
    async with client.resource_async(stream, content_encoding=compression) as resource:
        yield resource