Coverage for gcsfs/zb_hns_utils.py: 98%
62 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1import logging
2import os
3from io import BytesIO
5from google.api_core.exceptions import NotFound
6from google.cloud.storage.asyncio.async_appendable_object_writer import (
7 _DEFAULT_FLUSH_INTERVAL_BYTES,
8 AsyncAppendableObjectWriter,
9)
10from google.cloud.storage.asyncio.async_multi_range_downloader import (
11 AsyncMultiRangeDownloader,
12)
# MRD supports up to 1000 ranges per request.
MRD_MAX_RANGES = 1_000
# Read once at import time; the DEFAULT_GCSFS_CONCURRENCY environment
# variable overrides the fallback of 1.
DEFAULT_CONCURRENCY = int(os.getenv("DEFAULT_GCSFS_CONCURRENCY", "1"))
# 256 MiB expressed in bytes.
MAX_PREFETCH_SIZE = 256 * 1024**2
# Module-wide logger registered under the "gcsfs" name.
logger = logging.getLogger("gcsfs")
async def init_mrd(grpc_client, bucket_name, object_name, generation=None):
    """
    Creates the AsyncMultiRangeDownloader using an existing client.
    Wraps Google API errors into standard Python exceptions.

    Args:
        grpc_client: Client forwarded to AsyncMultiRangeDownloader.create_mrd.
        bucket_name: Name of the bucket that holds the object.
        object_name: Name of the object to open.
        generation: Optional generation, forwarded unchanged.

    Returns:
        The value returned by AsyncMultiRangeDownloader.create_mrd.

    Raises:
        FileNotFoundError: If create_mrd raises NotFound. The NotFound
            instance is attached as ``__cause__``.
    """
    try:
        return await AsyncMultiRangeDownloader.create_mrd(
            grpc_client, bucket_name, object_name, generation
        )
    except NotFound as exc:
        # Callers get the built-in exception type instead of the Google API
        # one. Chaining explicitly marks the API error as the direct cause in
        # tracebacks rather than as an unrelated error raised during handling.
        raise FileNotFoundError(f"{bucket_name}/{object_name}") from exc
async def download_range(offset, length, mrd):
    """
    Downloads a byte range from the file asynchronously.

    Args:
        offset: Start offset of the range.
        length: Number of bytes requested. A length of 0 returns b"" without
            calling the downloader.
        mrd: Downloader object; its ``download_ranges`` coroutine is awaited
            and its ``bucket_name`` / ``object_name`` are used in log messages.

    Returns:
        The bytes written by the downloader. This may be shorter than
        ``length``, in which case a warning is logged.
    """
    # If length = 0, mrd returns till end of file, so handle that case here
    if length == 0:
        return b""

    buffer = BytesIO()
    await mrd.download_ranges([(offset, length, buffer)])
    data = buffer.getvalue()
    bytes_downloaded = len(data)

    # Logging uses %-style arguments so the message text is only rendered
    # when a record is actually emitted; this function runs on every read.
    if length != bytes_downloaded:
        logger.warning(
            "Short read detected for %s/%s! "
            "Requested %s bytes but downloaded %s bytes.",
            mrd.bucket_name,
            mrd.object_name,
            length,
            bytes_downloaded,
        )

    logger.debug(
        "Requested %s bytes from offset %s, downloaded %s "
        "bytes from mrd path: %s/%s",
        length,
        offset,
        bytes_downloaded,
        mrd.bucket_name,
        mrd.object_name,
    )
    return data
async def download_ranges(ranges, mrd):
    """
    Downloads multiple byte ranges from the file asynchronously in a single batch.

    Args:
        ranges: List of (offset, length) tuples to download. At most
            MRD_MAX_RANGES ranges are allowed.
        mrd: AsyncMultiRangeDownloader instance

    Returns:
        List of bytes objects, one for each range, in the same order as
        ``ranges``. Ranges whose length is not positive yield b"".

    Raises:
        ValueError: If more than MRD_MAX_RANGES ranges are supplied.
    """
    if len(ranges) > MRD_MAX_RANGES:
        # Build the message from the constant so the two cannot drift apart.
        raise ValueError(
            f"Invalid input - number of ranges cannot be more than {MRD_MAX_RANGES}"
        )

    # Prepare tasks: Filter out empty ranges and create buffers immediately
    # Structure: (original_index, offset, length, buffer)
    # Calling MRD with length=0 returns till end of file. We handle zero-length
    # ranges by returning b"" without calling MRD. So only create tasks for length > 0
    tasks = [
        (i, off, length, BytesIO())
        for i, (off, length) in enumerate(ranges)
        if length > 0
    ]

    # Execute Download
    if tasks:
        # The MRD expects list of (offset, length, buffer)
        # We extract these from our task list
        await mrd.download_ranges([(off, length, buf) for _, off, length, buf in tasks])

    # Map results back to their original positions
    results = [b""] * len(ranges)
    for i, _, _, buffer in tasks:
        results[i] = buffer.getvalue()

    # Log stats
    total_requested = sum(length for _, length in ranges)
    total_downloaded = sum(len(chunk) for chunk in results)

    # %-style arguments defer message rendering until a record is emitted.
    if total_requested != total_downloaded:
        logger.warning(
            "Short read detected for %s/%s! "
            "Requested %s bytes but downloaded %s bytes.",
            mrd.bucket_name,
            mrd.object_name,
            total_requested,
            total_downloaded,
        )

    # Guarded so the per-range list is not built when DEBUG is disabled.
    if logger.isEnabledFor(logging.DEBUG):
        requested_ranges_to_log = [(off, length) for off, length in ranges]
        logger.debug(
            "mrd path: %s/%s | "
            "Requested %s ranges: %s | "
            "total bytes requested: %s | "
            "total bytes downloaded: %s",
            mrd.bucket_name,
            mrd.object_name,
            len(ranges),
            requested_ranges_to_log,
            total_requested,
            total_downloaded,
        )

    return results
async def init_aaow(
    grpc_client, bucket_name, object_name, generation=None, flush_interval_bytes=None
):
    """
    Builds an AsyncAppendableObjectWriter, opens it, and returns it.

    The FLUSH_INTERVAL_BYTES writer option is only supplied when the caller
    passed a truthy ``flush_interval_bytes`` that differs from
    _DEFAULT_FLUSH_INTERVAL_BYTES; otherwise an empty options dict is used.
    """
    has_custom_interval = (
        flush_interval_bytes and flush_interval_bytes != _DEFAULT_FLUSH_INTERVAL_BYTES
    )
    options = (
        {"FLUSH_INTERVAL_BYTES": flush_interval_bytes} if has_custom_interval else {}
    )

    appendable_writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation,
        writer_options=options,
    )
    await appendable_writer.open()
    return appendable_writer
async def close_mrd(mrd):
    """
    Best-effort close of an AsyncMultiRangeDownloader.

    A falsy ``mrd`` is ignored. Any Exception raised by ``mrd.close()`` is
    logged as a warning and not propagated.
    """
    if not mrd:
        return

    try:
        await mrd.close()
    except Exception as e:
        message = f"Error closing AsyncMultiRangeDownloader for {mrd.bucket_name}/{mrd.object_name}: {e}"
        logger.warning(message)
async def close_aaow(aaow, finalize_on_close=False):
    """
    Best-effort close of an AsyncAppendableObjectWriter.

    A falsy ``aaow`` is ignored. ``finalize_on_close`` is forwarded to
    ``aaow.close``. Any Exception raised while closing is logged as a
    warning and not propagated.
    """
    if not aaow:
        return

    try:
        await aaow.close(finalize_on_close=finalize_on_close)
    except Exception as e:
        message = f"Error closing AsyncAppendableObjectWriter for {aaow.bucket_name}/{aaow.object_name}: {e}"
        logger.warning(message)