Coverage for gcsfs/zonal_file.py: 19%
113 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1import logging
3from fsspec import asyn
4from google.cloud.storage.asyncio.async_appendable_object_writer import (
5 _DEFAULT_FLUSH_INTERVAL_BYTES,
6)
8from gcsfs import zb_hns_utils
9from gcsfs.core import DEFAULT_BLOCK_SIZE, GCSFile
11from .caching import ( # noqa: F401 Unused import to register GCS-Specific caches, Please do not remove it.
12 ReadAheadChunked,
13)
15logger = logging.getLogger("gcsfs.zonal_file")
18class ZonalFile(GCSFile):
19 """
20 ZonalFile is subclass of GCSFile and handles data operations from
21 Zonal buckets only using a high-performance gRPC path.
22 """
24 def __init__(
25 self,
26 gcsfs,
27 path,
28 mode="rb",
29 block_size=DEFAULT_BLOCK_SIZE,
30 autocommit=True,
31 cache_type="readahead_chunked",
32 cache_options=None,
33 acl=None,
34 consistency="md5",
35 metadata=None,
36 content_type=None,
37 timeout=None,
38 fixed_key_metadata=None,
39 generation=None,
40 kms_key_name=None,
41 finalize_on_close=False,
42 flush_interval_bytes=_DEFAULT_FLUSH_INTERVAL_BYTES,
43 **kwargs,
44 ):
45 """
46 Initializes the ZonalFile object.
48 For Zonal buckets, `finalize_on_close` is set to `False` by default to optimize
49 for write throughput and keep the file appendable. This means that when exiting
50 a `with` block or closing, the file will not be automatically finalized. To
51 ensure the write is finalized, `.commit()` must be called explicitly or
52 `finalize_on_close` must be set to `True` when opening the file.
54 For Zonal buckets, `flush_interval_bytes` controls the write buffer size before
55 persisting data to GCS (default: 16 MiB). This value must be a multiple
56 of `_MAX_CHUNK_SIZE_BYTES` (2 MiB). Note that this higher default value may
57 increase memory usage.
58 """
59 bucket, key, generation = gcsfs._split_path(path)
60 if not key:
61 raise OSError("Attempt to open a bucket")
62 self.mrd = None
63 self.aaow = None
64 self.finalize_on_close = finalize_on_close
65 self.finalized = False
66 self.mode = mode
67 self.flush_interval_bytes = flush_interval_bytes
68 self.gcsfs = gcsfs
69 object_size = None
70 if "r" in self.mode:
71 self.mrd = asyn.sync(
72 self.gcsfs.loop, self._init_mrd, bucket, key, generation
73 )
74 object_size = self.mrd.persisted_size
75 if object_size is None:
76 logger.warning(
77 "AsyncMultiRangeDownloader (MRD) exists but has no 'persisted_size'. "
78 "This may result in incorrect behavior for unfinalized objects."
79 )
80 elif "w" in self.mode or "a" in self.mode:
81 pass
82 else:
83 raise NotImplementedError(
84 "Only read, write and append operations are currently supported for Zonal buckets."
85 )
87 super().__init__(
88 gcsfs,
89 path,
90 mode,
91 block_size,
92 autocommit,
93 cache_type,
94 cache_options,
95 acl,
96 consistency,
97 metadata,
98 content_type,
99 timeout,
100 fixed_key_metadata,
101 generation,
102 kms_key_name,
103 # Zonal buckets support append; this prevents GCSFile from forcing 'w' mode
104 _supports_append="a" in mode,
105 # pass persisted_size here so that Cache is initialized with correct object size
106 size=object_size,
107 **kwargs,
108 )
110 async def _init_mrd(self, bucket_name, object_name, generation=None):
111 """
112 Initializes the AsyncMultiRangeDownloader.
113 """
114 await self.gcsfs._get_grpc_client()
115 return await zb_hns_utils.init_mrd(
116 self.gcsfs.grpc_client, bucket_name, object_name, generation
117 )
119 async def _init_aaow(
120 self, bucket_name, object_name, generation=None, flush_interval_bytes=None
121 ):
122 """
123 Initializes the AsyncAppendableObjectWriter.
124 """
125 # generation is needed while creating aaow to append to existing objects
126 if "a" in self.mode and generation is None:
127 try:
128 # self.path might not be set yet, so reconstruct full path
129 info = await self.gcsfs._info(f"{bucket_name}/{object_name}")
130 generation = info.get("generation")
131 except FileNotFoundError:
132 # if file doesn't exist, we don't need generation
133 pass
134 await self.gcsfs._get_grpc_client()
135 return await zb_hns_utils.init_aaow(
136 self.gcsfs.grpc_client,
137 bucket_name,
138 object_name,
139 generation,
140 flush_interval_bytes,
141 )
143 def _ensure_aaow(self):
144 if self.aaow is None:
145 self.aaow = asyn.sync(
146 self.gcsfs.loop,
147 self._init_aaow,
148 self.bucket,
149 self.key,
150 self.generation,
151 self.flush_interval_bytes,
152 )
154 def _fetch_range(self, start=None, end=None, chunk_lengths=None):
155 """
156 Overrides the default _fetch_range to implement the gRPC read path.
158 See super() class for documentation.
159 """
160 if end is not None and chunk_lengths is not None:
161 raise ValueError(
162 "The end and chunk_lengths arguments are mutually exclusive and cannot be used together."
163 )
165 try:
166 if chunk_lengths is not None:
167 return asyn.sync(
168 self.fs.loop,
169 self.gcsfs._fetch_range_split,
170 self.path,
171 start=start,
172 chunk_lengths=chunk_lengths,
173 size=self.size,
174 mrd=self.mrd,
175 )
176 return self.gcsfs.cat_file(self.path, start=start, end=end, mrd=self.mrd)
177 except RuntimeError as e:
178 if "not satisfiable" in str(e):
179 return b"" if chunk_lengths is None else [b""]
180 raise
182 def write(self, data):
183 """
184 Writes data using AsyncAppendableObjectWriter.
186 For more details, see the documentation for AsyncAppendableObjectWriter:
187 https://github.com/googleapis/python-storage/blob/9e6fefdc24a12a9189f7119bc9119e84a061842f/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py#L38
188 """
189 if self.closed:
190 raise ValueError("I/O operation on closed file.")
191 if not self.writable():
192 raise ValueError("File not in write mode.")
193 if self.forced:
194 raise ValueError("This file has been force-flushed, can only close")
196 # Lazily initialize the AsyncAppendableObjectWriter on the first write to avoid
197 # unnecessary object creation for files that are opened but never written to.
198 self._ensure_aaow()
199 asyn.sync(self.gcsfs.loop, self.aaow.append, data)
200 bytes_written = len(data)
201 self.loc += bytes_written
202 return bytes_written
204 def flush(self, force=False):
205 """
206 Flushes the AsyncAppendableObjectWriter, sending all buffered data
207 to the server.
208 """
209 if self.closed:
210 raise ValueError("Flush on closed file.")
211 if force and self.forced:
212 raise ValueError("Force flush cannot be called more than once.")
213 if self.finalized:
214 logger.warning("File is already finalized. Ignoring flush call.")
215 return
216 if force:
217 self.forced = True
219 if self.readable():
220 # no-op to flush on read-mode
221 return
223 # Case 1: Intermediate flush (force=False)
224 # If no data has been written (aaow is None), there is nothing to flush.
225 if self.aaow is None and not force:
226 return
228 # Case 2: Closing flush (force=True) or some data has been written (AAOW exists)
229 # We must ensure aaow exists so that the file is created even for empty writes,
230 # and to flush any buffered data if it exists.
231 self._ensure_aaow()
233 asyn.sync(self.gcsfs.loop, self.aaow.flush)
235 def commit(self):
236 """
237 Commits the write by finalizing the AsyncAppendableObjectWriter.
238 """
239 if not self.writable(): # No-op
240 logger.warning("File not in write mode. Ignoring commit call.")
241 return
242 if self.finalized: # No-op
243 logger.warning(
244 "This file has already been finalized. Ignoring commit call."
245 )
246 return
248 self._ensure_aaow()
249 asyn.sync(self.gcsfs.loop, self.aaow.finalize)
250 self.finalized = True
251 # File is already finalized, avoid finalizing again on close
252 self.finalize_on_close = False
254 def discard(self):
255 """Discard is not applicable for Zonal Buckets. Log a warning instead."""
256 logger.warning(
257 "Discard is not applicable for Zonal Buckets. \
258 Data is uploaded via streaming and cannot be cancelled."
259 )
261 def _initiate_upload(self):
262 """Initiates the upload for Zonal buckets using gRPC."""
263 from gcsfs.extended_gcsfs import initiate_upload
265 self.location = asyn.sync(
266 self.gcsfs.loop,
267 initiate_upload,
268 self.gcsfs,
269 self.bucket,
270 self.key,
271 self.content_type,
272 self.metadata,
273 self.fixed_key_metadata,
274 mode="create" if "x" in self.mode else "overwrite",
275 kms_key_name=self.kms_key_name,
276 timeout=self.timeout,
277 )
279 def _simple_upload(self):
280 """Performs a simple upload for Zonal buckets using gRPC."""
281 from gcsfs.extended_gcsfs import simple_upload
283 self.buffer.seek(0)
284 data = self.buffer.read()
285 asyn.sync(
286 self.gcsfs.loop,
287 simple_upload,
288 self.gcsfs,
289 self.bucket,
290 self.key,
291 data,
292 self.metadata,
293 self.consistency,
294 self.content_type,
295 self.fixed_key_metadata,
296 mode="create" if "x" in self.mode else "overwrite",
297 kms_key_name=self.kms_key_name,
298 timeout=self.timeout,
299 finalize_on_close=self.finalize_on_close,
300 )
302 def _upload_chunk(self, final=False):
303 raise NotImplementedError(
304 "_upload_chunk is not implemented yet for ZonalFile. Please use write() instead."
305 )
307 def close(self):
308 """
309 Closes the ZonalFile and the underlying AsyncMultiRangeDownloader and AsyncAppendableObjectWriter.
310 If in write mode, finalizes the write if finalize_on_close is True.
311 """
312 if self.closed:
313 return
314 # super is closed before aaow since flush may need aaow
315 super().close()
316 # Helper method safely handles mrd=None.
317 asyn.sync(self.gcsfs.loop, zb_hns_utils.close_mrd, self.mrd)
319 # Only close aaow if the stream is open
320 if self.aaow and self.aaow._is_stream_open:
321 asyn.sync(
322 self.gcsfs.loop,
323 zb_hns_utils.close_aaow,
324 self.aaow,
325 finalize_on_close=self.finalize_on_close,
326 )