Coverage for gcsfs/zonal_file.py: 19%

113 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import logging 

2 

3from fsspec import asyn 

4from google.cloud.storage.asyncio.async_appendable_object_writer import ( 

5 _DEFAULT_FLUSH_INTERVAL_BYTES, 

6) 

7 

8from gcsfs import zb_hns_utils 

9from gcsfs.core import DEFAULT_BLOCK_SIZE, GCSFile 

10 

11from .caching import ( # noqa: F401 Unused import to register GCS-Specific caches, Please do not remove it. 

12 ReadAheadChunked, 

13) 

14 

15logger = logging.getLogger("gcsfs.zonal_file") 

16 

17 

18class ZonalFile(GCSFile): 

19 """ 

20 ZonalFile is subclass of GCSFile and handles data operations from 

21 Zonal buckets only using a high-performance gRPC path. 

22 """ 

23 

def __init__(
    self,
    gcsfs,
    path,
    mode="rb",
    block_size=DEFAULT_BLOCK_SIZE,
    autocommit=True,
    cache_type="readahead_chunked",
    cache_options=None,
    acl=None,
    consistency="md5",
    metadata=None,
    content_type=None,
    timeout=None,
    fixed_key_metadata=None,
    generation=None,
    kms_key_name=None,
    finalize_on_close=False,
    flush_interval_bytes=_DEFAULT_FLUSH_INTERVAL_BYTES,
    **kwargs,
):
    """
    Initializes the ZonalFile object.

    For Zonal buckets, `finalize_on_close` is set to `False` by default to optimize
    for write throughput and keep the file appendable. This means that when exiting
    a `with` block or closing, the file will not be automatically finalized. To
    ensure the write is finalized, `.commit()` must be called explicitly or
    `finalize_on_close` must be set to `True` when opening the file.

    For Zonal buckets, `flush_interval_bytes` controls the write buffer size before
    persisting data to GCS (default: 16 MiB). This value must be a multiple
    of `_MAX_CHUNK_SIZE_BYTES` (2 MiB). Note that this higher default value may
    increase memory usage.

    `generation` selects the object generation to open. A generation parsed
    from `path` takes precedence; the `generation` argument is used when the
    path does not carry one.

    In read mode the AsyncMultiRangeDownloader is created eagerly and its
    `persisted_size` is forwarded to `GCSFile.__init__` as `size`. In write and
    append modes the AsyncAppendableObjectWriter is created lazily (see
    `_ensure_aaow`). All remaining arguments are forwarded unchanged to
    `GCSFile.__init__`.

    Raises:
        OSError: if `path` has no object key (i.e. it names a bucket).
        NotImplementedError: if `mode` contains none of "r", "w" or "a".
    """
    # Unpack the path's generation into its own name: reusing `generation`
    # here would silently discard the value supplied by the caller.
    bucket, key, path_generation = gcsfs._split_path(path)
    if not key:
        raise OSError("Attempt to open a bucket")
    if path_generation is not None:
        generation = path_generation

    # These attributes are assigned before super().__init__() runs.
    self.mrd = None
    self.aaow = None
    self.finalize_on_close = finalize_on_close
    self.finalized = False
    self.mode = mode
    self.flush_interval_bytes = flush_interval_bytes
    self.gcsfs = gcsfs

    object_size = None
    if "r" in self.mode:
        self.mrd = asyn.sync(
            self.gcsfs.loop, self._init_mrd, bucket, key, generation
        )
        object_size = self.mrd.persisted_size
        if object_size is None:
            logger.warning(
                "AsyncMultiRangeDownloader (MRD) exists but has no 'persisted_size'. "
                "This may result in incorrect behavior for unfinalized objects."
            )
    elif "w" in self.mode or "a" in self.mode:
        # The writer is created lazily on first write/flush/commit.
        pass
    else:
        raise NotImplementedError(
            "Only read, write and append operations are currently supported for Zonal buckets."
        )

    super().__init__(
        gcsfs,
        path,
        mode,
        block_size,
        autocommit,
        cache_type,
        cache_options,
        acl,
        consistency,
        metadata,
        content_type,
        timeout,
        fixed_key_metadata,
        generation,
        kms_key_name,
        # Zonal buckets support append; this prevents GCSFile from forcing 'w' mode
        _supports_append="a" in mode,
        # pass persisted_size here so that Cache is initialized with correct object size
        size=object_size,
        **kwargs,
    )

109 

async def _init_mrd(self, bucket_name, object_name, generation=None):
    """
    Creates the AsyncMultiRangeDownloader for the given object.

    Awaits `gcsfs._get_grpc_client()` first, then passes `gcsfs.grpc_client`
    together with the bucket name, object name and generation to
    `zb_hns_utils.init_mrd` and returns whatever that coroutine yields.
    """
    fs = self.gcsfs
    await fs._get_grpc_client()
    downloader = await zb_hns_utils.init_mrd(
        fs.grpc_client, bucket_name, object_name, generation
    )
    return downloader

118 

async def _init_aaow(
    self, bucket_name, object_name, generation=None, flush_interval_bytes=None
):
    """
    Creates the AsyncAppendableObjectWriter for the given object.

    In append mode with no explicit generation, the object's current
    generation is looked up through `gcsfs._info`, because appending to an
    existing object requires it. A missing object is not an error: the
    generation simply stays `None`.
    """
    if "a" in self.mode and generation is None:
        # self.path may not be populated yet, so build the full path by hand.
        full_path = f"{bucket_name}/{object_name}"
        try:
            details = await self.gcsfs._info(full_path)
        except FileNotFoundError:
            # Nothing to append to yet; no generation is needed.
            pass
        else:
            generation = details.get("generation")

    fs = self.gcsfs
    await fs._get_grpc_client()
    writer = await zb_hns_utils.init_aaow(
        fs.grpc_client,
        bucket_name,
        object_name,
        generation,
        flush_interval_bytes,
    )
    return writer

142 

def _ensure_aaow(self):
    """
    Creates `self.aaow` on first use; does nothing if it already exists.

    The writer is built by running `_init_aaow` on the filesystem's event
    loop with this file's bucket, key, generation and flush interval.
    """
    if self.aaow is not None:
        return
    self.aaow = asyn.sync(
        self.gcsfs.loop,
        self._init_aaow,
        self.bucket,
        self.key,
        self.generation,
        self.flush_interval_bytes,
    )

153 

def _fetch_range(self, start=None, end=None, chunk_lengths=None):
    """
    Reads bytes through the gRPC path, replacing the default _fetch_range.

    With `chunk_lengths` the read is delegated to `gcsfs._fetch_range_split`;
    otherwise `gcsfs.cat_file` is called with `start` and `end`. Both calls
    receive this file's multi-range downloader. A RuntimeError whose message
    contains "not satisfiable" is turned into an empty result (`b""`, or
    `[b""]` for the split form); any other RuntimeError propagates.

    Raises:
        ValueError: if both `end` and `chunk_lengths` are given.
    """
    split_read = chunk_lengths is not None
    if end is not None and split_read:
        raise ValueError(
            "The end and chunk_lengths arguments are mutually exclusive and cannot be used together."
        )

    try:
        if not split_read:
            return self.gcsfs.cat_file(
                self.path, start=start, end=end, mrd=self.mrd
            )
        return asyn.sync(
            self.fs.loop,
            self.gcsfs._fetch_range_split,
            self.path,
            start=start,
            chunk_lengths=chunk_lengths,
            size=self.size,
            mrd=self.mrd,
        )
    except RuntimeError as err:
        if "not satisfiable" not in str(err):
            raise
        return [b""] if split_read else b""

181 

def write(self, data):
    """
    Appends `data` to the object through the AsyncAppendableObjectWriter.

    The writer is created on the first call, so a file that is opened but
    never written to does not create an object. Returns `len(data)` and
    advances `self.loc` by the same amount.

    For more details, see the documentation for AsyncAppendableObjectWriter:
    https://github.com/googleapis/python-storage/blob/9e6fefdc24a12a9189f7119bc9119e84a061842f/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py#L38

    Raises:
        ValueError: if the file is closed, is not writable, or has already
            been force-flushed.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file.")
    if not self.writable():
        raise ValueError("File not in write mode.")
    if self.forced:
        raise ValueError("This file has been force-flushed, can only close")

    self._ensure_aaow()
    asyn.sync(self.gcsfs.loop, self.aaow.append, data)

    count = len(data)
    self.loc += count
    return count

203 

def flush(self, force=False):
    """
    Sends all data buffered in the AsyncAppendableObjectWriter to the server.

    A flush on an already finalized file only logs a warning. On a readable
    file the call is a no-op (although `force=True` still marks the file as
    force-flushed). A non-forced flush with no writer yet has nothing to
    send. A forced flush creates the writer if needed, so that the object
    exists even when nothing was written.

    Raises:
        ValueError: if the file is closed, or if `force` is requested a
            second time.
    """
    if self.closed:
        raise ValueError("Flush on closed file.")
    if force and self.forced:
        raise ValueError("Force flush cannot be called more than once.")

    if self.finalized:
        logger.warning("File is already finalized. Ignoring flush call.")
        return

    if force:
        self.forced = True

    # Read mode never flushes; an intermediate flush with no writer has
    # nothing buffered to send.
    if self.readable() or (self.aaow is None and not force):
        return

    # Either a closing flush (the object must be created even if empty) or a
    # writer already exists and may hold buffered data.
    self._ensure_aaow()
    asyn.sync(self.gcsfs.loop, self.aaow.flush)

234 

def commit(self):
    """
    Finalizes the object via the AsyncAppendableObjectWriter.

    Logs a warning and does nothing when the file is not writable or has
    already been finalized. After a successful finalize, `finalized` is set
    and `finalize_on_close` is cleared so close() will not finalize again.
    """
    if not self.writable():
        logger.warning("File not in write mode. Ignoring commit call.")
    elif self.finalized:
        logger.warning(
            "This file has already been finalized. Ignoring commit call."
        )
    else:
        self._ensure_aaow()
        asyn.sync(self.gcsfs.loop, self.aaow.finalize)
        self.finalized = True
        # Already finalized here; close() must not attempt it a second time.
        self.finalize_on_close = False

253 

def discard(self):
    """Discard is not applicable for Zonal Buckets. Log a warning instead."""
    # Adjacent string literals are used instead of a backslash continuation
    # inside one literal: with the backslash form, the indentation of the
    # continuation line ends up inside the logged message.
    logger.warning(
        "Discard is not applicable for Zonal Buckets. "
        "Data is uploaded via streaming and cannot be cancelled."
    )

260 

def _initiate_upload(self):
    """
    Starts an upload for a Zonal bucket over gRPC.

    Runs `gcsfs.extended_gcsfs.initiate_upload` on the filesystem's event
    loop and stores its result in `self.location`. The upload mode is
    "create" when the file mode contains "x", otherwise "overwrite".
    """
    from gcsfs.extended_gcsfs import initiate_upload

    if "x" in self.mode:
        upload_mode = "create"
    else:
        upload_mode = "overwrite"

    self.location = asyn.sync(
        self.gcsfs.loop,
        initiate_upload,
        self.gcsfs,
        self.bucket,
        self.key,
        self.content_type,
        self.metadata,
        self.fixed_key_metadata,
        mode=upload_mode,
        kms_key_name=self.kms_key_name,
        timeout=self.timeout,
    )

278 

def _simple_upload(self):
    """
    Uploads the whole write buffer to a Zonal bucket in one gRPC call.

    Rewinds `self.buffer`, reads its full contents and passes them to
    `gcsfs.extended_gcsfs.simple_upload` on the filesystem's event loop.
    The upload mode is "create" when the file mode contains "x", otherwise
    "overwrite".
    """
    from gcsfs.extended_gcsfs import simple_upload

    self.buffer.seek(0)
    payload = self.buffer.read()

    if "x" in self.mode:
        upload_mode = "create"
    else:
        upload_mode = "overwrite"

    asyn.sync(
        self.gcsfs.loop,
        simple_upload,
        self.gcsfs,
        self.bucket,
        self.key,
        payload,
        self.metadata,
        self.consistency,
        self.content_type,
        self.fixed_key_metadata,
        mode=upload_mode,
        kms_key_name=self.kms_key_name,
        timeout=self.timeout,
        finalize_on_close=self.finalize_on_close,
    )

301 

def _upload_chunk(self, final=False):
    """
    Chunked upload hook; not supported by ZonalFile.

    Raises:
        NotImplementedError: always. Callers should use write() instead.
    """
    message = (
        "_upload_chunk is not implemented yet for ZonalFile. "
        "Please use write() instead."
    )
    raise NotImplementedError(message)

306 

def close(self):
    """
    Closes the file together with its downloader and appendable writer.

    Calling close() on an already closed file does nothing. In write mode
    the object is finalized on close only when `finalize_on_close` is True.
    """
    if self.closed:
        return

    # The base class is closed first because its flush may still need the
    # writer.
    super().close()

    loop = self.gcsfs.loop
    # close_mrd copes with mrd being None.
    asyn.sync(loop, zb_hns_utils.close_mrd, self.mrd)

    writer = self.aaow
    # The writer only needs closing while its stream is still open.
    if not writer or not writer._is_stream_open:
        return
    asyn.sync(
        loop,
        zb_hns_utils.close_aaow,
        writer,
        finalize_on_close=self.finalize_on_close,
    )