Coverage for gcsfs/zb_hns_utils.py: 98%

62 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import logging 

2import os 

3from io import BytesIO 

4 

5from google.api_core.exceptions import NotFound 

6from google.cloud.storage.asyncio.async_appendable_object_writer import ( 

7 _DEFAULT_FLUSH_INTERVAL_BYTES, 

8 AsyncAppendableObjectWriter, 

9) 

10from google.cloud.storage.asyncio.async_multi_range_downloader import ( 

11 AsyncMultiRangeDownloader, 

12) 

13 

# Upper bound on the number of ranges MRD accepts in a single request.
MRD_MAX_RANGES = 1000
# Default concurrency, overridable through the DEFAULT_GCSFS_CONCURRENCY
# environment variable. It is parsed with int(), so a non-integer value
# raises ValueError when the module is imported.
DEFAULT_CONCURRENCY = int(os.getenv("DEFAULT_GCSFS_CONCURRENCY", "1"))
# 256 MiB, presumably the prefetch ceiling in bytes (usage not visible here).
MAX_PREFETCH_SIZE = 256 << 20
logger = logging.getLogger("gcsfs")

18 

19 

async def init_mrd(grpc_client, bucket_name, object_name, generation=None):
    """
    Create an AsyncMultiRangeDownloader for ``bucket_name/object_name``.

    Args:
        grpc_client: Existing client passed through to
            ``AsyncMultiRangeDownloader.create_mrd``.
        bucket_name: Bucket that holds the object.
        object_name: Name of the object to read.
        generation: Optional object generation, forwarded unchanged.

    Returns:
        The downloader returned by ``AsyncMultiRangeDownloader.create_mrd``.

    Raises:
        FileNotFoundError: if the Google API reports ``NotFound``.
    """
    try:
        return await AsyncMultiRangeDownloader.create_mrd(
            grpc_client, bucket_name, object_name, generation
        )
    except NotFound as exc:
        # Translate to the standard Python error so callers need not depend on
        # google.api_core exception types. Chaining explicitly keeps the
        # original API error as __cause__ for debugging.
        raise FileNotFoundError(f"{bucket_name}/{object_name}") from exc

33 

34 

async def download_range(offset, length, mrd):
    """
    Download ``length`` bytes starting at ``offset`` through ``mrd``.

    Args:
        offset: Byte offset to start reading from.
        length: Number of bytes requested. ``0`` returns ``b""`` immediately,
            because MRD would interpret a zero length as "read to end of file".
        mrd: AsyncMultiRangeDownloader (must expose an async
            ``download_ranges`` plus ``bucket_name`` / ``object_name``).

    Returns:
        bytes: the downloaded data. It may be shorter than ``length``, in
        which case a warning is logged.
    """
    # MRD treats length == 0 as "until end of file", so answer it locally.
    if length == 0:
        return b""

    buffer = BytesIO()
    await mrd.download_ranges([(offset, length, buffer)])
    data = buffer.getvalue()
    bytes_downloaded = len(data)

    if length != bytes_downloaded:
        logger.warning(
            "Short read detected for %s/%s! "
            "Requested %s bytes but downloaded %s bytes.",
            mrd.bucket_name,
            mrd.object_name,
            length,
            bytes_downloaded,
        )

    # This runs on every read. Lazy %-style args mean the message is only
    # formatted when a DEBUG record is actually emitted.
    logger.debug(
        "Requested %s bytes from offset %s, downloaded %s "
        "bytes from mrd path: %s/%s",
        length,
        offset,
        bytes_downloaded,
        mrd.bucket_name,
        mrd.object_name,
    )
    return data

58 

59 

async def download_ranges(ranges, mrd):
    """
    Download multiple byte ranges from the file asynchronously in a single batch.

    Args:
        ranges: Iterable of ``(offset, length)`` pairs, at most
            ``MRD_MAX_RANGES`` entries. It is materialized once, so a
            generator is accepted as well as a list.
        mrd: AsyncMultiRangeDownloader instance.

    Returns:
        list[bytes]: one entry per input range, in input order. Ranges with
        ``length <= 0`` yield ``b""`` and are never sent to MRD.

    Raises:
        ValueError: if more than ``MRD_MAX_RANGES`` ranges are given.
    """
    # The input is walked several times below (validation, task building,
    # stats, debug log). A one-shot iterator must be materialized first or
    # the later passes would see it exhausted.
    ranges = list(ranges)
    if len(ranges) > MRD_MAX_RANGES:
        raise ValueError(
            f"Invalid input - number of ranges cannot be more than {MRD_MAX_RANGES}"
        )

    # Task layout: (original_index, offset, length, buffer). Calling MRD with
    # length == 0 returns data to end of file, so only positive-length ranges
    # become tasks. All other slots stay b"" in the result list.
    tasks = [
        (index, offset, length, BytesIO())
        for index, (offset, length) in enumerate(ranges)
        if length > 0
    ]

    if tasks:
        # MRD expects a list of (offset, length, buffer).
        await mrd.download_ranges(
            [(offset, length, buf) for _, offset, length, buf in tasks]
        )

    # Map results back to their original positions.
    results = [b""] * len(ranges)
    for index, _, _, buf in tasks:
        results[index] = buf.getvalue()

    total_requested = sum(length for _, length in ranges)
    total_downloaded = sum(len(chunk) for chunk in results)

    if total_requested != total_downloaded:
        logger.warning(
            "Short read detected for %s/%s! "
            "Requested %s bytes but downloaded %s bytes.",
            mrd.bucket_name,
            mrd.object_name,
            total_requested,
            total_downloaded,
        )

    # Building the per-range list is O(len(ranges)), so skip it unless DEBUG
    # output is enabled.
    if logger.isEnabledFor(logging.DEBUG):
        requested_ranges_to_log = [(offset, length) for offset, length in ranges]
        logger.debug(
            "mrd path: %s/%s | Requested %s ranges: %s | "
            "total bytes requested: %s | total bytes downloaded: %s",
            mrd.bucket_name,
            mrd.object_name,
            len(ranges),
            requested_ranges_to_log,
            total_requested,
            total_downloaded,
        )

    return results

116 

117 

async def init_aaow(
    grpc_client, bucket_name, object_name, generation=None, flush_interval_bytes=None
):
    """
    Build an AsyncAppendableObjectWriter for ``bucket_name/object_name`` and open it.

    ``flush_interval_bytes`` is forwarded as the ``FLUSH_INTERVAL_BYTES``
    writer option only when it is truthy and differs from the library's
    ``_DEFAULT_FLUSH_INTERVAL_BYTES``. Otherwise an empty options dict is
    passed.

    Returns:
        The writer, after ``await writer.open()`` has completed.
    """
    wants_custom_interval = bool(flush_interval_bytes) and (
        flush_interval_bytes != _DEFAULT_FLUSH_INTERVAL_BYTES
    )
    options = (
        {"FLUSH_INTERVAL_BYTES": flush_interval_bytes}
        if wants_custom_interval
        else {}
    )

    aaow = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation,
        writer_options=options,
    )
    await aaow.open()
    return aaow

138 

139 

async def close_mrd(mrd):
    """
    Best-effort close of an AsyncMultiRangeDownloader.

    A falsy ``mrd`` (e.g. ``None``) is ignored. Any ``Exception`` raised by
    ``mrd.close()`` is logged as a warning and not propagated.
    """
    if not mrd:
        return
    try:
        await mrd.close()
    except Exception as e:
        logger.warning(
            f"Error closing AsyncMultiRangeDownloader for {mrd.bucket_name}/{mrd.object_name}: {e}"
        )

152 

153 

async def close_aaow(aaow, finalize_on_close=False):
    """
    Best-effort close of an AsyncAppendableObjectWriter.

    A falsy ``aaow`` (e.g. ``None``) is ignored. ``finalize_on_close`` is
    passed through to ``aaow.close``. Any ``Exception`` raised there is logged
    as a warning and not propagated.
    """
    if not aaow:
        return
    try:
        await aaow.close(finalize_on_close=finalize_on_close)
    except Exception as e:
        logger.warning(
            f"Error closing AsyncAppendableObjectWriter for {aaow.bucket_name}/{aaow.object_name}: {e}"
        )