Coverage for gcsfs/extended_gcsfs.py: 33%
598 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1import asyncio
2import logging
3import os
4import uuid
5from enum import Enum
6from glob import has_magic
8from fsspec import asyn
9from fsspec.callbacks import NoOpCallback
10from google.api_core import exceptions as api_exceptions
11from google.api_core.client_info import ClientInfo
12from google.api_core.client_options import ClientOptions
13from google.auth.credentials import AnonymousCredentials
14from google.cloud import storage_control_v2
15from google.cloud.storage.asyncio.async_appendable_object_writer import (
16 AsyncAppendableObjectWriter,
17)
18from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
20from gcsfs import __version__ as version
21from gcsfs import zb_hns_utils
22from gcsfs.core import GCSFile, GCSFileSystem
23from gcsfs.zonal_file import ZonalFile
# Module logger; it is named "gcsfs" explicitly rather than via __name__.
logger = logging.getLogger("gcsfs")
# Product token combined with the package version ("python-gcsfs/<version>")
# to form the user agent given to the gRPC and Storage Control clients.
USER_AGENT = "python-gcsfs"
class BucketType(Enum):
    """Storage layout of a GCS bucket as detected via the Storage Control API.

    Each member's value is identical to its name. UNKNOWN is used when the
    layout could not be determined.
    """

    # Zonal bucket (location type "zone"); always hierarchical.
    ZONAL_HIERARCHICAL = "ZONAL_HIERARCHICAL"
    # Regional/multi-regional bucket with hierarchical namespace enabled.
    HIERARCHICAL = "HIERARCHICAL"
    # Bucket with a flat (non-hierarchical) namespace.
    NON_HIERARCHICAL = "NON_HIERARCHICAL"
    # Layout lookup failed; callers fall back to default behaviour.
    UNKNOWN = "UNKNOWN"
# File class instantiated by `_open` for each detected bucket type. Only
# zonal buckets use ZonalFile; every other layout, including an undetermined
# one, uses the standard GCSFile.
gcs_file_types = dict(
    [
        (BucketType.ZONAL_HIERARCHICAL, ZonalFile),
        (BucketType.NON_HIERARCHICAL, GCSFile),
        (BucketType.HIERARCHICAL, GCSFile),
        (BucketType.UNKNOWN, GCSFile),
    ]
)
45class ExtendedGcsFileSystem(GCSFileSystem):
46 """
47 This class will be used when GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT env variable is set to true.
48 ExtendedGcsFileSystem is a subclass of GCSFileSystem that adds new logic for bucket types
49 including zonal and hierarchical. For buckets without special properties, it forwards requests
50 to the parent class GCSFileSystem for default processing.
51 """
def __init__(self, *args, finalize_on_close=False, **kwargs):
    """
    Initialise the filesystem and the state used by the gRPC-based paths.

    Args:
        *args, **kwargs: Forwarded unchanged to GCSFileSystem.
        finalize_on_close (bool): Default for whether files opened in zonal
            buckets are finalized on close. Defaults to False so those files
            stay appendable.
    """
    super().__init__(*args, **kwargs)
    self.finalize_on_close = finalize_on_close
    # gRPC and Storage Control clients are created lazily on first use.
    self._grpc_client = None
    self._storage_control_client = None

    # self.credentials is the GCSFS wrapper; the gRPC/Storage Control clients
    # need the wrapped google.auth credentials object instead. For
    # token="anon" (typical with emulators) the wrapped object is None, so an
    # AnonymousCredentials instance is substituted to allow unauthenticated use.
    wrapper = self.credentials
    underlying = wrapper.credentials
    self.credential = (
        AnonymousCredentials() if wrapper.token == "anon" else underlying
    )
    # bucket name -> BucketType; UNKNOWN results are never stored.
    self._storage_layout_cache = {}
@property
def _user_project(self):
    """Project to bill for "requester pays" access, or None.

    A string ``requester_pays`` names the billing project directly; any
    other truthy value bills ``self.project``; a falsy value disables it.
    """
    payer = self.requester_pays
    if not payer:
        return None
    return payer if isinstance(payer, str) else self.project
@property
def grpc_client(self):
    """Return the gRPC client, creating it synchronously when allowed.

    Raises:
        RuntimeError: In asynchronous mode when ``_get_grpc_client()`` has
            not been awaited yet, since the client cannot be built by
            blocking on the event loop there.
    """
    if self._grpc_client is None:
        if self.asynchronous:
            raise RuntimeError(
                "Please await _get_grpc_client() before accessing grpc_client"
            )
        self._grpc_client = asyn.sync(self.loop, self._get_grpc_client)
    return self._grpc_client
async def _get_grpc_client(self):
    """Return the AsyncGrpcClient, constructing it on first call.

    When ``self._location`` is set, its host:port part (any "scheme://"
    prefix removed) is used as the API endpoint.
    """
    if self._grpc_client is not None:
        return self._grpc_client
    # ClientOptions wants only host:port, without the protocol.
    options = (
        ClientOptions(api_endpoint=self._location.split("://")[-1])
        if self._location
        else None
    )
    self._grpc_client = AsyncGrpcClient(
        credentials=self.credential,
        client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"),
        client_options=options,
    )
    return self._grpc_client
async def _get_control_plane_client(self):
    """Return the Storage Control async client, creating it on first call.

    The client is used for bucket/folder metadata operations. It is built on
    a "grpc_asyncio" transport whose channel carries the package user agent
    and bills ``self._user_project`` as the quota project.
    """
    if self._storage_control_client is not None:
        return self._storage_control_client
    client_cls = storage_control_v2.StorageControlAsyncClient
    transport_cls = client_cls.get_transport_class("grpc_asyncio")
    channel = transport_cls.create_channel(
        credentials=self.credential,
        options=[("grpc.primary_user_agent", f"{USER_AGENT}/{version}")],
        quota_project_id=self._user_project,
    )
    self._storage_control_client = client_cls(
        transport=transport_cls(channel=channel)
    )
    return self._storage_control_client
async def _lookup_bucket_type(self, bucket):
    """Return the BucketType of *bucket*, consulting the per-instance cache.

    A cache miss queries ``_get_bucket_type``. The result is cached unless it
    is UNKNOWN, so a failed detection is retried on the next lookup.
    """
    try:
        return self._storage_layout_cache[bucket]
    except KeyError:
        pass
    detected = await self._get_bucket_type(bucket)
    if detected != BucketType.UNKNOWN:
        self._storage_layout_cache[bucket] = detected
    return detected
# Blocking counterpart of `_lookup_bucket_type`, used by the synchronous `_open`.
_sync_lookup_bucket_type = asyn.sync_wrapper(_lookup_bucket_type)
async def _get_bucket_type(self, bucket):
    """
    Detect the storage layout of *bucket* with the Storage Control API.

    Returns:
        BucketType: ZONAL_HIERARCHICAL when the layout's location type is
        "zone"; HIERARCHICAL when hierarchical namespace is enabled;
        NON_HIERARCHICAL otherwise. UNKNOWN (with a logged warning) when the
        API reports NotFound or any other exception is raised.
    """
    try:
        control_client = await self._get_control_plane_client()
        layout_name = f"projects/_/buckets/{bucket}/storageLayout"
        logger.debug(f"get_storage_layout request for name: {layout_name}")
        layout = await control_client.get_storage_layout(name=layout_name)

        if layout.location_type == "zone":
            return BucketType.ZONAL_HIERARCHICAL
        namespace = layout.hierarchical_namespace
        hns_enabled = namespace and namespace.enabled
        return BucketType.HIERARCHICAL if hns_enabled else BucketType.NON_HIERARCHICAL
    except api_exceptions.NotFound:
        logger.warning(
            f"Error: Bucket {bucket} not found or you lack permissions for "
            f"storage layout api used to detect bucket type. Falling back to GCSFileSystem."
        )
        return BucketType.UNKNOWN
    except Exception as exc:
        # Any other failure is non-fatal: report UNKNOWN so callers use the
        # default GCSFileSystem behaviour.
        logger.warning(
            f"Could not determine bucket type for bucket name {bucket}: {exc}, falling back to GCSFileSystem"
        )
        return BucketType.UNKNOWN
def _open(
    self,
    path,
    mode="rb",
    block_size=None,
    cache_options=None,
    acl=None,
    consistency=None,
    metadata=None,
    autocommit=True,
    fixed_key_metadata=None,
    generation=None,
    **kwargs,
):
    """
    Open a file, choosing the file class from the bucket's detected type.

    The bucket type comes from ``_sync_lookup_bucket_type`` and is mapped
    through ``gcs_file_types``. ``block_size`` and ``consistency`` fall back
    to the filesystem defaults when not given. A ``finalize_on_close``
    keyword overrides the instance default; all other extra keywords are
    forwarded unchanged.
    """
    bucket, _, _ = self.split_path(path)
    file_cls = gcs_file_types[self._sync_lookup_bucket_type(bucket)]
    # Remove finalize_on_close from kwargs first so it is passed exactly once.
    finalize = kwargs.pop("finalize_on_close", self.finalize_on_close)
    return file_cls(
        self,
        path,
        mode,
        block_size=block_size or self.default_block_size,
        cache_options=cache_options,
        consistency=consistency or self.consistency,
        metadata=metadata,
        acl=acl,
        autocommit=autocommit,
        fixed_key_metadata=fixed_key_metadata,
        generation=generation,
        finalize_on_close=finalize,
        **kwargs,
    )
# Replacement method for _process_limits to support new params (offset and length) for MRD.
async def _process_limits_to_offset_and_length(
    self, path, start, end, file_size=None
):
    """
    Translate slice-style start/end bounds into an (offset, length) pair.

    Negative bounds count back from the end of the object. The object size
    is fetched through ``_info()`` at most once, and only when a bound needs
    it and ``file_size`` was not supplied.

    Args:
        path (str): Path of the object being read.
        start (int | None): First byte to read; None means 0.
        end (int | None): Position to stop before; None means end of object.
        file_size (int | None): Known object size, if any.

    Returns:
        tuple: (offset, length). length is 0 for an empty range and is
        clamped so the range never extends past the end of the object.
    """
    size = file_size

    async def total_size():
        # Resolve the object size lazily and remember it.
        nonlocal size
        if size is None:
            size = (await self._info(path))["size"]
        return size

    if start is None:
        offset = 0
    elif start >= 0:
        offset = start
    else:
        # A negative start that reaches before byte 0 is clamped to 0.
        offset = max(0, await total_size() + start)

    if end is None:
        effective_end = await total_size()
    elif end >= 0:
        effective_end = end
    else:
        effective_end = await total_size() + end

    if effective_end <= offset:
        return offset, 0

    known_size = await total_size()
    if effective_end > known_size:
        return offset, max(0, known_size - offset)
    return offset, effective_end - offset
# Blocking counterpart of `_process_limits_to_offset_and_length`.
sync_process_limits_to_offset_and_length = asyn.sync_wrapper(
    _process_limits_to_offset_and_length
)
async def _is_zonal_bucket(self, bucket):
    """Return True when *bucket* is detected as ZONAL_HIERARCHICAL."""
    detected = await self._lookup_bucket_type(bucket)
    return detected == BucketType.ZONAL_HIERARCHICAL
async def _fetch_range_split(
    self, path, start=None, chunk_lengths=None, mrd=None, size=None, **kwargs
):
    """
    Read one or several consecutive byte ranges of a zonal-bucket object.

    Optimized for Zonal Buckets: when ``chunk_lengths`` is given, all chunks
    are requested in a single ``download_ranges`` batch through the
    AsyncMultiRangeDownloader (MRD), which benefits ReadAheadV2.

    Args:
        path (str): Full GCS path of the object.
        start (int | None): Offset of the first byte; None means 0.
        chunk_lengths (list[int] | None): Lengths of consecutive chunks
            starting at ``start``. When empty/None, a single range bounded by
            ``start`` and ``kwargs["end"]`` is read instead.
        mrd: Existing MRD to reuse. When None, one is created for this call
            and closed before returning.
        size (int | None): Known object size. An explicit value (including 0)
            is used as-is; otherwise ``mrd.persisted_size``, then ``_info()``.

    Returns:
        The result of ``zb_hns_utils.download_ranges`` for chunked reads, or a
        one-element list holding the downloaded range otherwise.

    Raises:
        RuntimeError: If an MRD must be created but the bucket is not zonal,
            or if the requested chunks start at/after the end of the object
            or extend past it.
    """
    bucket, object_name, generation = self.split_path(path)
    mrd_created = False
    try:
        if mrd is None:
            # Only zonal buckets can be read through an MRD.
            if not await self._is_zonal_bucket(bucket):
                raise RuntimeError(
                    "Internal error, this method is only supported for zonal buckets!"
                )
            await self._get_grpc_client()
            mrd = await zb_hns_utils.init_mrd(
                self.grpc_client, bucket, object_name, generation
            )
            mrd_created = True

        # `size or ...` would silently discard an explicit size of 0.
        file_size = size if size is not None else mrd.persisted_size
        if file_size is None:
            logger.warning(
                f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. "
                "Falling back to _info() to get the file size."
            )
            file_size = (await self._info(path))["size"]

        if chunk_lengths:
            start_offset = start if start is not None else 0
            if (
                start_offset >= file_size
                or start_offset + sum(chunk_lengths) > file_size
            ):
                raise RuntimeError("Request not satisfiable.")

            # Lay the chunks out back to back as (offset, length) pairs.
            read_ranges = []
            current_offset = start_offset
            for length in chunk_lengths:
                read_ranges.append((current_offset, length))
                current_offset += length

            return await zb_hns_utils.download_ranges(read_ranges, mrd)

        offset, length = await self._process_limits_to_offset_and_length(
            path, start, kwargs.get("end"), file_size
        )
        data = await zb_hns_utils.download_range(
            offset=offset, length=length, mrd=mrd
        )
        return [data]
    finally:
        # Only close an MRD this call created; a caller-supplied one is theirs.
        if mrd_created:
            await zb_hns_utils.close_mrd(mrd)
async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
    """Return the bytes of a file or byte range, reading zonal objects over gRPC.

    Non-zonal buckets are delegated to the parent ``_cat_file`` unless an MRD
    is supplied.

    Args:
        path (str): The full GCS path to the file (e.g., "bucket/object").
        start (int, optional): The starting byte position to read from.
        end (int, optional): The ending byte position to read to.
        mrd (AsyncMultiRangeDownloader, optional): An existing multi-range
            downloader. If omitted for a Zonal bucket, one is created for
            this call and closed before returning.

    Returns:
        bytes: The content of the file or file range.
    """
    owns_mrd = False
    try:
        # A fresh MRD is needed when the read comes straight from the
        # filesystem rather than through an already-open file object.
        if mrd is None:
            bucket, object_name, generation = self.split_path(path)
            if not await self._is_zonal_bucket(bucket):
                return await super()._cat_file(path, start=start, end=end, **kwargs)
            await self._get_grpc_client()
            mrd = await zb_hns_utils.init_mrd(
                self.grpc_client, bucket, object_name, generation
            )
            owns_mrd = True

        known_size = mrd.persisted_size
        if known_size is None:
            logger.warning(
                f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. "
                "Falling back to _info() to get the file size. "
                "This may result in incorrect behavior for unfinalized objects."
            )
        offset, length = await self._process_limits_to_offset_and_length(
            path, start, end, known_size
        )
        return await zb_hns_utils.download_range(
            offset=offset, length=length, mrd=mrd
        )
    finally:
        if owns_mrd:
            await zb_hns_utils.close_mrd(mrd)
async def _is_bucket_hns_enabled(self, bucket):
    """Return True if *bucket* is zonal or hierarchical (HNS-enabled).

    Any exception raised by the type lookup is logged with stack info and the
    bucket is then treated as non-HNS.
    """
    try:
        detected = await self._lookup_bucket_type(bucket)
    except Exception as err:
        logger.warning(
            f"Could not determine if bucket '{bucket}' is HNS-enabled, falling back to default non-HNS: {err}",
            stack_info=True,
        )
        return False
    return detected in (BucketType.ZONAL_HIERARCHICAL, BucketType.HIERARCHICAL)
def _update_dircache_after_rename(self, path1, path2):
    """
    Patch the directory cache after a successful folder rename.

    Steps:
    1. Drop the source folder's own listing and every cached descendant.
    2. Remove the source folder from its parent's cached listing.
    3. Add the destination folder to its parent's cached listing, replacing
       any existing entry with the same name.

    Both paths are normalized (protocol and trailing "/" removed) first, so
    callers may pass raw user paths such as "gs://bucket/dir/".

    Args:
        path1 (str): The source path that was renamed.
        path2 (str): The destination path.
    """
    # Raw paths may carry a protocol or a trailing slash. Normalize them to
    # the form compared against dircache keys and entry "name" values.
    src = self._strip_protocol(path1).rstrip("/")
    dst = self._strip_protocol(path2).rstrip("/")

    # 1. The source listing itself and all descendant listings are now stale.
    src_prefix = f"{src}/"
    stale_keys = [k for k in self.dircache if k == src or k.startswith(src_prefix)]
    for stale in stale_keys:
        self.dircache.pop(stale, None)

    # 2. Remove the source entry from its parent's listing.
    parent1 = self._parent(src)
    if parent1 in self.dircache:
        self.dircache[parent1] = [
            e for e in self.dircache[parent1] if e.get("name") != src
        ]

    # 3. Invalidate the destination listing and register the new folder in
    #    its parent's listing without creating a duplicate entry.
    self.dircache.pop(dst, None)
    parent2 = self._parent(dst)
    if parent2 in self.dircache:
        _, key2, _ = self.split_path(dst)
        listing = [e for e in self.dircache[parent2] if e.get("name") != dst]
        listing.append(
            {
                "Key": key2.rstrip("/"),
                "Size": 0,
                "name": dst,
                "size": 0,
                "type": "directory",
                "storageClass": "DIRECTORY",
            }
        )
        self.dircache[parent2] = listing
async def _mv_file_cache_update(self, path1, path2, response=None):
    """
    Update the directory cache after a single-file move.

    For a move within one HNS-enabled bucket, cached parent listings are
    patched in place:
    - the source entry is removed from its parent's listing;
    - when ``response`` is provided, an entry built from it via
      ``_process_object`` is added to the destination parent's listing,
      replacing any existing entry with the same name (for example when the
      move overwrote an existing file).

    This avoids invalidating whole parent listings. Cross-bucket moves and
    non-HNS buckets use the parent implementation, which invalidates the
    affected caches.

    Args:
        path1 (str): Source path of the moved file.
        path2 (str): Destination path.
        response: Object metadata of the destination, if available.
    """
    src_bucket, _, _ = self.split_path(path1)
    dest_bucket, _, _ = self.split_path(path2)

    # Compare buckets first so cross-bucket moves skip the layout lookup.
    if src_bucket != dest_bucket or not await self._is_bucket_hns_enabled(
        src_bucket
    ):
        await super()._mv_file_cache_update(path1, path2, response)
        return

    src_parent = self._parent(path1)
    if src_parent in self.dircache:
        path1_stripped = self._strip_protocol(path1)
        self.dircache[src_parent] = [
            e for e in self.dircache[src_parent] if e.get("name") != path1_stripped
        ]

    dest_parent = self._parent(path2)
    if dest_parent in self.dircache and response:
        new_entry = self._process_object(dest_bucket, response)
        # Drop any stale entry for the destination before adding the new one.
        listing = [
            e
            for e in self.dircache[dest_parent]
            if e.get("name") != new_entry.get("name")
        ]
        listing.append(new_entry)
        self.dircache[dest_parent] = listing
async def _mv(self, path1, path2, **kwargs):
    """
    Move a file or directory. Overrides the parent `_mv` to provide an
    optimized, atomic implementation for renaming folders and moving files
    in HNS-enabled buckets. Falls back to the parent's object-level
    copy-and-delete implementation for non-HNS buckets.

    Args:
        path1 (str | list): Source path(s). Lists, and glob patterns in a
            string ``path1``, are always delegated to the parent `_mv`.
        path2 (str | list): Destination path(s).
        **kwargs: Forwarded to the parent `_mv` on every fallback path.

    Raises:
        FileNotFoundError: If `_info` cannot find the source, or an API
            NotFound is raised during the HNS move.
        FileExistsError: If the HNS move raises an API Conflict (for example
            when the destination folder already exists).
        OSError: If the HNS move raises an API FailedPrecondition.
    """
    if path1 == path2:
        logger.debug(
            "%s mv: The paths are the same, so no files/directories were moved.",
            self,
        )
        return

    # Multi-path and glob moves are left entirely to the parent implementation.
    if (
        isinstance(path1, list)
        or isinstance(path2, list)
        or (isinstance(path1, str) and has_magic(path1))
    ):
        return await super()._mv(path1, path2, **kwargs)

    bucket1, key1, _ = self.split_path(path1)
    bucket2, key2, _ = self.split_path(path2)

    # Only the source bucket's layout decides whether HNS operations are used.
    is_hns = await self._is_bucket_hns_enabled(bucket1)

    if not is_hns:
        logger.debug(
            f"Not an HNS bucket. Falling back to object-level mv for '{path1}' to '{path2}'."
        )
        return await super()._mv(path1, path2, **kwargs)

    try:
        info1 = await self._info(path1)
        is_folder = info1.get("type") == "directory"

        # We only use HNS rename if the source is a folder and the move is
        # within the same bucket.
        if is_folder and bucket1 == bucket2 and key1:
            logger.debug(
                f"Using HNS-aware folder rename for '{path1}' to '{path2}'."
            )
            source_folder_name = f"projects/_/buckets/{bucket1}/folders/{key1}"
            # With no destination key (bucket root), the folder keeps its last
            # path component as its new id.
            destination_folder_id = key2 or key1.rstrip("/").split("/")[-1]

            request = storage_control_v2.RenameFolderRequest(
                name=source_folder_name,
                destination_folder_id=destination_folder_id,
                request_id=str(uuid.uuid4()),
            )

            logger.debug(f"rename_folder request: {request}")
            client = await self._get_control_plane_client()
            # rename_folder returns an operation; wait for it to complete
            # before patching the cache.
            operation = await client.rename_folder(request=request)
            await operation.result()
            self._update_dircache_after_rename(path1, path2)

            logger.debug(
                "Successfully renamed folder from '%s' to '%s'", path1, path2
            )
            return
        elif not is_folder:
            # Single objects in HNS buckets go through _mv_file.
            await self._mv_file(path1, path2)
            return
        # Otherwise (folder moved across buckets, or empty source key) fall
        # through to the object-level fallback below.
    except Exception as e:
        if isinstance(e, FileNotFoundError):
            # If the source doesn't exist, fail fast.
            raise
        if isinstance(e, api_exceptions.NotFound):
            raise FileNotFoundError(
                f"Source '{path1}' not found for move operation."
            ) from e
        if isinstance(e, api_exceptions.Conflict):
            # This occurs if the destination folder already exists.
            # Raise FileExistsError for fsspec compatibility.
            raise FileExistsError(
                f"HNS rename failed due to conflict for '{path1}' to '{path2}'"
            ) from e
        if isinstance(e, api_exceptions.FailedPrecondition):
            raise OSError(f"HNS rename failed: {e}") from e

        # Any other error is logged and the parent implementation is tried.
        logger.warning(f"Could not perform HNS-aware mv: {e}")

    logger.debug(f"Falling back to object-level mv for '{path1}' to '{path2}'.")
    return await super()._mv(path1, path2, **kwargs)

# Blocking counterpart of `_mv`.
mv = asyn.sync_wrapper(_mv)
async def _list_objects(self, path, prefix="", versions=False, **kwargs):
    """
    List objects under *path*, tolerating empty native folders in HNS buckets.

    Delegates to the parent implementation. If that raises FileNotFoundError
    for a path with an object key in an HNS-enabled bucket, the path is
    probed with ``_get_directory_info``. If the probe succeeds (an existing,
    empty native folder), an empty list is returned.

    Raises:
        FileNotFoundError: Re-raised from the parent when the folder probe
            does not confirm the path exists.
    """
    try:
        return await super()._list_objects(
            path, prefix=prefix, versions=versions, **kwargs
        )
    except FileNotFoundError:
        bucket, key, _ = self.split_path(path)
        if key and await self._is_bucket_hns_enabled(bucket):
            try:
                await self._get_directory_info(path, bucket, key, None)
            except FileNotFoundError:
                pass  # Genuinely missing: re-raise the original error below.
            except Exception as err:
                # Best-effort probe: record why it failed, then keep the
                # original FileNotFoundError.
                logger.debug(f"Folder existence check failed for '{path}': {err}")
            else:
                return []
        # Bare raise re-raises the outer FileNotFoundError from the parent.
        raise
async def _mkdir(
    self,
    path,
    create_parents=False,
    enable_hierarchical_namespace=False,
    placement=None,
    location=None,
    **kwargs,
):
    """
    Create a bucket or a directory.

    A path without an object key creates a bucket; a path with a key creates
    a directory inside its bucket.

    Parameters
    ----------
    path : str
        Bucket or directory path to create.
    create_parents : bool
        Create missing parent directories. When the bucket does not exist
        and an HNS/Zonal bucket is requested, it is created first with that
        configuration.
    enable_hierarchical_namespace : bool
        When a bucket is created, enable Hierarchical Namespace (HNS) on it.
    placement : str, optional
        A zone (e.g. "us-central1-a"). When set, a Zonal bucket is requested.
        It uses the "RAPID" storage class and is always HNS-enabled.
        ``location`` must then be the region containing that zone (e.g.
        "us-central1"); if omitted it defaults to ``self.default_location``.
    location : str, optional
        Bucket location such as 'US' or 'EUROPE-WEST3'. Defaults to
        ``self.default_location`` when not provided.
    **kwargs : dict
        Extra arguments forwarded to the bucket-creation API.

    Notes
    -----
    - In HNS-enabled buckets (Zonal included) a native folder is created.
    - In an HNS/Zonal bucket, a missing parent with ``create_parents=False``
      raises FileNotFoundError.
    - Non-HNS buckets use the parent implementation. Plain GCS has no real
      directories, so there a keyed path is typically a no-op unless
      ``create_parents=True`` triggers bucket creation.
    """
    path = self._strip_protocol(path)
    bucket, key, _ = self.split_path(path)

    wants_zonal = placement is not None
    wants_hns = enable_hierarchical_namespace or wants_zonal

    # Keyword arguments for any bucket creation performed below.
    bucket_kwargs = dict(kwargs)
    if location:
        bucket_kwargs["location"] = location
    if wants_zonal:
        bucket_kwargs.update(
            customPlacementConfig={"dataLocations": [placement]},
            storageClass="RAPID",
        )
    if wants_hns:
        # HNS requires uniform bucket-level access, which forbids ACLs; the
        # explicit None values stop the parent from applying default ACLs.
        bucket_kwargs.update(
            hierarchicalNamespace={"enabled": True},
            iamConfiguration={"uniformBucketLevelAccess": {"enabled": True}},
            acl=None,
            default_acl=None,
        )

    # Bare bucket path: bucket creation is handled by the parent.
    if not key:
        return await super()._mkdir(
            path, create_parents=create_parents, **bucket_kwargs
        )

    # Folder path. A bucket created here with HNS settings is known to be
    # HNS without a layout lookup.
    hns_bucket = False
    if create_parents and wants_hns and not await self._exists(bucket):
        await super()._mkdir(bucket, create_parents=True, **bucket_kwargs)
        hns_bucket = True

    hns_bucket = hns_bucket or await self._is_bucket_hns_enabled(bucket)
    if hns_bucket:
        return await self._create_hns_folder(path, bucket, key, create_parents)

    return await super()._mkdir(
        path, create_parents=create_parents, **bucket_kwargs
    )
async def _create_hns_folder(self, path, bucket, key, create_parents):
    """
    Create a native folder in an HNS-enabled bucket via Storage Control.

    Args:
        path (str): Protocol-stripped path of the folder ("bucket/key").
        bucket (str): Bucket name.
        key (str): Folder path inside the bucket.
        create_parents (bool): Request recursive creation of missing parents.

    On success the new folder is appended to its parent's cached listing.
    With ``create_parents``, intermediate folders may also have been created,
    so cached listings of the grandparent and higher ancestors are dropped;
    otherwise they could omit the new intermediate folders. An already
    existing folder (API Conflict) is treated as success.

    Raises:
        FileNotFoundError: The API raised FailedPrecondition, e.g. a parent
            is missing and ``create_parents`` is False.
    """
    logger.debug(f"Using HNS-aware mkdir for '{path}'.")
    folder_id = key.rstrip("/")
    request = storage_control_v2.CreateFolderRequest(
        parent=f"projects/_/buckets/{bucket}",
        folder_id=folder_id,
        recursive=create_parents,
        request_id=str(uuid.uuid4()),
    )
    try:
        logger.debug(f"create_folder request: {request}")
        client = await self._get_control_plane_client()
        await client.create_folder(request=request)
    except api_exceptions.Conflict as e:
        logger.debug(f"Directory already exists: {path}: {e}")
        return
    except api_exceptions.FailedPrecondition as e:
        # This error can occur if create_parents=False and the parent dir doesn't exist.
        # Translate it to FileNotFoundError for fsspec compatibility.
        raise FileNotFoundError(
            f"mkdir for '{path}' failed due to a precondition error: {e}"
        ) from e

    # Update the parent's listing in place instead of invalidating it.
    name = path.rstrip("/")
    parent_path = self._parent(name)
    if parent_path in self.dircache:
        self.dircache[parent_path].append(
            {
                "Key": folder_id,
                "Size": 0,
                "name": name,
                "size": 0,
                "type": "directory",
                "storageClass": "DIRECTORY",
            }
        )

    if create_parents:
        # Recursive creation may have added intermediate folders that cached
        # ancestor listings do not know about; drop those listings. The
        # `ancestor != child` guard stops at a fixed point of _parent.
        child, ancestor = parent_path, self._parent(parent_path)
        while ancestor and ancestor != child:
            self.dircache.pop(ancestor, None)
            child, ancestor = ancestor, self._parent(ancestor)
# Blocking counterpart of `_mkdir`.
mkdir = asyn.sync_wrapper(_mkdir)
async def _get_directory_info(self, path, bucket, key, generation):
    """
    Return directory metadata, using Storage Control ``get_folder`` for HNS.

    In HNS-enabled buckets the folder is looked up directly rather than by
    listing objects. Other buckets use the parent implementation.

    Returns:
        dict: bucket, name, size (0), storageClass "DIRECTORY", type
        "directory", plus ctime/mtime/metageneration from the folder
        resource.

    Raises:
        FileNotFoundError: The folder does not exist (API NotFound).
        Exception: Any other ``get_folder`` failure, logged and re-raised.
    """
    if not await self._is_bucket_hns_enabled(bucket):
        # Fallback to standard GCS behavior for non-HNS buckets.
        return await super()._get_directory_info(path, bucket, key, generation)

    # folder_id is the path relative to the bucket, without a trailing slash.
    folder_id = key.rstrip("/")
    request = storage_control_v2.GetFolderRequest(
        name=f"projects/_/buckets/{bucket}/folders/{folder_id}",
        request_id=str(uuid.uuid4()),
    )
    try:
        client = await self._get_control_plane_client()
        response = await client.get_folder(request=request)
    except api_exceptions.NotFound as err:
        raise FileNotFoundError(path) from err
    except Exception as err:
        logger.error(f"Error fetching folder metadata for {path}: {err}")
        raise

    return {
        "bucket": bucket,
        "name": path,
        "size": 0,
        "storageClass": "DIRECTORY",
        "type": "directory",
        "ctime": response.create_time,
        "mtime": response.update_time,
        "metageneration": response.metageneration,
    }
async def _rmdir(self, path):
    """
    Delete an empty directory, with native folder support for HNS buckets.

    Bucket paths (no object key) and paths in non-HNS buckets go to the
    parent implementation. For HNS buckets, a placeholder object named
    ``path + "/"`` is deleted first if present. The native folder is then
    deleted with ``delete_folder`` and the cached listings are updated.

    Raises:
        FileNotFoundError: API NotFound. The folder is missing, or the path
            names a file object rather than a folder.
        OSError: API FailedPrecondition, e.g. the folder is not empty.
    """
    path = self._strip_protocol(path)
    bucket, key, _ = self.split_path(path)

    # The parent _rmdir only deletes buckets; it also serves non-HNS buckets.
    if not key or not await self._is_bucket_hns_enabled(bucket):
        return await super()._rmdir(path)

    # A placeholder object ("dir/") would make the folder look non-empty, so
    # delete it first; a missing placeholder is fine. The placeholder may be
    # removed even if delete_folder later fails because other objects remain.
    # That is acceptable: HNS buckets have native folders and do not need
    # placeholders.
    placeholder_path = f"{path.rstrip('/')}/"
    try:
        await self._rm_file(placeholder_path)
    except FileNotFoundError:
        pass
    else:
        logger.debug(
            f"Removed placeholder object '{placeholder_path}' before rmdir."
        )

    try:
        logger.debug(f"Using HNS-aware rmdir for '{path}'.")
        request = storage_control_v2.DeleteFolderRequest(
            name=f"projects/_/buckets/{bucket}/folders/{key.rstrip('/')}",
            request_id=str(uuid.uuid4()),
        )
        logger.debug(f"delete_folder request: {request}")
        control_client = await self._get_control_plane_client()
        await control_client.delete_folder(request=request)

        # Forget the folder's own listing and remove it from its parent's.
        self.dircache.pop(path, None)
        parent = self._parent(path)
        if parent in self.dircache:
            self.dircache[parent] = [
                entry for entry in self.dircache[parent] if entry.get("name") != path
            ]
    except api_exceptions.NotFound as e:
        raise FileNotFoundError(f"rmdir failed for path: {path}: {e}") from e
    except api_exceptions.FailedPrecondition as e:
        raise OSError(
            f"Pre condition failed for rmdir for path: {path}: {e}"
        ) from e
    except Exception as e:
        logger.error(f"HNS rmdir: Failed to delete folder '{path}': {e}")
        raise

# Blocking counterpart of `_rmdir`.
rmdir = asyn.sync_wrapper(_rmdir)
# TODO: This method is only added to be used in rm method, can be deleted once
# rm method is integrated with recursive API
async def _expand_path_with_details(
    self, path, recursive=False, maxdepth=None, detail=False
):
    """
    Expand path(s) and glob patterns into the concrete paths they denote.

    Args:
        path (str | list[str]): Path(s) to expand. A single string is
            wrapped in a list. Protocols are stripped.
        recursive (bool): Also include everything below matched paths, via
            ``_find(..., withdirs=True)`` or recursive expansion of glob
            matches.
        maxdepth (int | None): Depth limit. Must be at least 1 if given.
        detail (bool): If True, return info dicts (from ``_glob``, ``_find``
            or ``_info``) instead of path strings.

    Returns:
        list: Info dicts when ``detail`` is True, otherwise a sorted list of
        path strings.

    Raises:
        ValueError: If ``maxdepth`` is less than 1.
        FileNotFoundError: If nothing matched.
    """
    if maxdepth is not None and maxdepth < 1:
        raise ValueError("maxdepth must be at least 1")

    if isinstance(path, str):
        return await self._expand_path_with_details(
            [path], recursive, maxdepth, detail=detail
        )
    else:
        # name -> info dict in detail mode, otherwise a set of names.
        out = {} if detail else set()
        path = [self._strip_protocol(p) for p in path]

        for p in path:
            if has_magic(p):
                bit = await self._glob(p, maxdepth=maxdepth, detail=detail)
                if detail:
                    out.update(bit)
                    bit_paths = list(bit.keys())
                else:
                    bit_set = set(bit)
                    out |= bit_set
                    bit_paths = list(bit_set)

                if recursive:
                    # The glob already consumed one level, so recurse with a
                    # decremented maxdepth, or stop if none is left.
                    if maxdepth is not None and maxdepth <= 1:
                        continue
                    rec = await self._expand_path_with_details(
                        bit_paths,
                        recursive=recursive,
                        maxdepth=maxdepth - 1 if maxdepth is not None else None,
                        detail=detail,
                    )
                    if detail:
                        for info in rec:
                            out[info["name"]] = info
                    else:
                        out |= set(rec)
                # A glob pattern is never itself added as a literal path.
                continue
            elif recursive:
                rec = await self._find(
                    p, maxdepth=maxdepth, withdirs=True, detail=detail
                )
                if detail:
                    out.update(rec)
                else:
                    out |= set(rec)

            # Ensure the root path itself is included when it exists.
            if p not in out:
                if detail:
                    try:
                        info = await self._info(p)
                        out[p] = info
                    except (FileNotFoundError, OSError):
                        pass
                elif recursive is False or (await self._exists(p)):
                    out.add(p)

        if detail:
            out = list(out.values())
        else:
            out = sorted(out)

    if not out:
        raise FileNotFoundError(path)
    return out
async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
    """
    Delete files and directories, with native folder deletion for HNS buckets.

    Non-HNS buckets use the parent ``_rm``. For HNS buckets the path is
    expanded into info dicts. Files and directories are separated, and
    ``_perform_rm`` deletes files in batches and then directories from the
    deepest level up.

    Args:
        path (str or list): The path(s) to delete. For a list, the bucket of
            the first entry decides whether HNS handling is used.
        recursive (bool): If True, also delete directory contents.
        maxdepth (int, optional): Maximum depth to traverse.
        batchsize (int): Number of files deleted per batch request.
    """
    if isinstance(path, list) and not path:
        bucket = None
    else:
        first_path = path[0] if isinstance(path, list) else path
        bucket, _, _ = self.split_path(first_path)

    if not await self._is_bucket_hns_enabled(bucket):
        return await super()._rm(
            path, recursive=recursive, maxdepth=maxdepth, batchsize=batchsize
        )

    entries = await self._expand_path_with_details(
        path, recursive=recursive, maxdepth=maxdepth, detail=True
    )

    file_names = {entry["name"] for entry in entries if entry["type"] == "file"}
    dir_names = {entry["name"] for entry in entries if entry["type"] == "directory"}
    files = list(file_names)
    # Reverse lexical order puts children before their parents.
    dirs = sorted(dir_names, reverse=True)

    return await self._perform_rm(files, dirs, path, batchsize=batchsize)
async def _perform_rm(self, files, dirs, path, batchsize):
    """
    Delete the given files, then the given directories level by level.

    Args:
        files (list[str]): File paths to delete.
        dirs (list[str]): Directory paths to delete.
        path (str): The original ``rm`` target; used only in the
            ``FileNotFoundError`` raised when there is nothing to delete.
        batchsize (int): Batch/chunk size for the deletions.

    Returns:
        list: The entries collected from ``_delete_files`` and from the
        directory removals that are neither ``None`` nor "not found" errors.
        If any such entry is an ``Exception``, the first one is raised
        instead of returning.

    Raises:
        FileNotFoundError: If both ``files`` and ``dirs`` are empty.
        Exception: The first collected error that is not a "not found" error.
    """
    if not (files or dirs):
        raise FileNotFoundError(path)

    exs = await self._delete_files(files, batchsize)

    # Group directories by depth ("/" count). Each level is removed
    # concurrently, and a level starts only after every deeper one finished,
    # so children are always gone before their parents.
    by_depth = {}
    for dirname in dirs:
        by_depth.setdefault(dirname.count("/"), []).append(dirname)

    for depth in sorted(by_depth, reverse=True):
        outcomes = await asyn._run_coros_in_chunks(
            [self._rmdir(dirname) for dirname in by_depth[depth]],
            batch_size=batchsize,
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                exs.append(outcome)

    def _is_not_found(exc):
        # "Not found" outcomes are treated as harmless for an rm.
        return isinstance(
            exc, (FileNotFoundError, api_exceptions.NotFound)
        ) or "No such object" in str(exc)

    first_error = next(
        (ex for ex in exs if isinstance(ex, Exception) and not _is_not_found(ex)),
        None,
    )
    if first_error is not None:
        raise first_error

    return [e for e in exs if e is not None and not _is_not_found(e)]


rm = asyn.sync_wrapper(_rm)
async def _find(
    self,
    path,
    withdirs=False,
    detail=False,
    prefix="",
    versions=False,
    maxdepth=None,
    **kwargs,
):
    """
    HNS-aware ``find`` that also reports folders, including empty ones.

    Buckets without hierarchical namespace use the parent implementation
    unchanged. For HNS buckets two listings run concurrently:

    * the parent ``_find`` for files only, with no depth limit and with
      its own cache update disabled, and
    * ``_get_all_folders`` for folder entries from the Storage Control API.

    The combined files and folders are written to the listing cache. The
    result is then built from files only (``withdirs=False``) or from files
    plus folders. It is sorted by name, trimmed to ``maxdepth`` levels below
    ``path``, and labelled ``name#generation`` when ``versions`` is set and
    the entry carries a generation.

    Returns:
        dict mapping label -> info when ``detail`` is true, otherwise a list
        of labels.

    Raises:
        ValueError: If ``maxdepth`` is given and is less than 1.
    """
    path = self._strip_protocol(path)
    if maxdepth is not None and maxdepth < 1:
        raise ValueError("maxdepth must be at least 1")
    bucket, _key, _generation = self.split_path(path)

    if not await self._is_bucket_hns_enabled(bucket):
        return await super()._find(
            path,
            withdirs=withdirs,
            detail=detail,
            prefix=prefix,
            versions=versions,
            maxdepth=maxdepth,
            **kwargs,
        )

    # Files: one full-depth listing (depth is filtered at the end) with the
    # cache update deferred until folders are known too.
    file_listing = asyncio.create_task(
        super()._find(
            path,
            withdirs=False,
            detail=True,
            prefix=prefix,
            versions=versions,
            maxdepth=None,
            update_cache=False,
            **kwargs,
        )
    )
    # Folders: needed separately because empty folders have no objects.
    folder_listing = asyncio.create_task(
        self._get_all_folders(path, bucket, prefix=prefix)
    )
    files_by_name, folder_entries = await asyncio.gather(
        file_listing, folder_listing
    )

    file_entries = list(files_by_name.values())
    merged = file_entries + folder_entries
    # The cache always receives both files and folders.
    self._get_dirs_and_update_cache(path, merged, prefix=prefix)

    selected = merged if withdirs else file_entries
    selected.sort(key=lambda entry: entry["name"])

    if maxdepth:
        limit = path.rstrip("/").count("/") + maxdepth
        selected = [
            entry for entry in selected if entry["name"].count("/") <= limit
        ]

    def _label(entry):
        if versions and "generation" in entry:
            return f"{entry['name']}#{entry['generation']}"
        return entry["name"]

    if detail:
        return {_label(entry): entry for entry in selected}
    return [_label(entry) for entry in selected]
async def _get_all_folders(self, path, bucket, prefix=""):
    """
    Fetch all folder entries under ``path`` using the Storage Control API.

    Args:
        path (str): Path whose key part is used as the listing root.
        bucket (str): Bucket name, used for the request parent and entries.
        prefix (str): Partial-name filter relative to ``path``. It is
            appended to the directory part as-is, without a trailing "/",
            so ``prefix="ab"`` under ``bucket/dir`` matches ``dir/ab/``,
            ``dir/abc/`` and their descendants. This is the same ``prefix``
            value that ``_find`` passes to the parent file listing.

    Returns:
        list[dict]: One entry per folder, built by ``_create_folder_entry``.
    """
    base_path = self.split_path(path)[1].rstrip("/")
    # Only the directory part is "/"-terminated, so "dir" never matches
    # "dir2/". The user prefix stays open-ended so that partial names match
    # (previously a "/" was appended after the prefix too, which restricted
    # the listing to folders *under* "dir/<prefix>/").
    folder_prefix = f"{base_path}/{prefix}" if base_path else prefix

    request = storage_control_v2.ListFoldersRequest(
        parent=f"projects/_/buckets/{bucket}",
        prefix=folder_prefix,
        request_id=str(uuid.uuid4()),
    )
    logger.debug("list_folders request: %s", request)

    client = await self._get_control_plane_client()
    return [
        self._create_folder_entry(bucket, folder)
        async for folder in await client.list_folders(request=request)
    ]
1129 def _create_folder_entry(self, bucket, folder):
1130 """Helper to create a dictionary representing a folder entry."""
1131 path = f"{bucket}/{folder.name.split('/folders/')[1]}".rstrip("/")
1132 _, key, _ = self.split_path(path)
1133 return {
1134 "Key": key,
1135 "Size": 0,
1136 "name": path,
1137 "size": 0,
1138 "type": "directory",
1139 "storageClass": "DIRECTORY",
1140 "ctime": folder.create_time,
1141 "mtime": folder.update_time,
1142 "metageneration": folder.metageneration,
1143 }
async def _put_file(
    self,
    lpath,
    rpath,
    metadata=None,
    consistency=None,
    content_type=None,
    chunksize=50 * 2**20,
    callback=None,
    fixed_key_metadata=None,
    mode="overwrite",
    **kwargs,
):
    """Upload a local file.

    This method is optimized for Zonal buckets, using gRPC for uploads.
    In zonal buckets, file is left *unfinalized* by default unless
    `finalize_on_close` is set to True (per call via ``kwargs`` or on the
    filesystem instance).
    For non-Zonal buckets, it delegates to the parent class's implementation.

    Parameters
    ----------
    lpath: str
        Path to the local file to be uploaded. If it is a directory,
        nothing is uploaded for Zonal buckets.
    rpath: str
        Path on GCS to upload the file to.
    metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    consistency: str, optional
        Unsupported for Zonal buckets and will be ignored.
    content_type: str, optional
        Unsupported for Zonal buckets and will be ignored except for the default.
    chunksize: int, optional
        The size of chunks to upload data in.
    callback: fsspec.callbacks.Callback, optional
        Callback to monitor the upload progress. For Zonal buckets the total
        size is set up front, and progress is reported once the whole file
        has been appended.
    fixed_key_metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    mode: str, optional
        The write mode, either 'overwrite' or 'create'.

    Raises
    ------
    ValueError
        If ``rpath`` names a specific object generation (Zonal buckets).
    """
    bucket, key, generation = self.split_path(rpath)
    if not await self._is_zonal_bucket(bucket):
        return await super()._put_file(
            lpath,
            rpath,
            metadata=metadata,
            consistency=consistency,
            content_type=content_type,
            chunksize=chunksize,
            callback=callback,
            fixed_key_metadata=fixed_key_metadata,
            mode=mode,
            **kwargs,
        )

    if os.path.isdir(lpath):
        return

    if generation:
        raise ValueError("Cannot write to specific object generation")

    if (
        metadata
        or fixed_key_metadata
        or consistency
        or (content_type and content_type != "application/octet-stream")
    ):
        logger.warning(
            "Zonal buckets do not support content_type, metadata, "
            "fixed_key_metadata or consistency during upload. "
            "These parameters will be ignored."
        )
    # The callback was previously ignored on this path; fall back to a no-op
    # so progress reporting below is unconditional.
    callback = callback or NoOpCallback()
    await self._get_grpc_client()
    # Works for both 'overwrite' and 'create' modes
    writer = await zb_hns_utils.init_aaow(self.grpc_client, bucket, key)

    try:
        with open(lpath, "rb") as f:
            size = os.fstat(f.fileno()).st_size
            callback.set_size(size)
            await writer.append_from_file(f, block_size=chunksize)
            # append_from_file uploads the whole file in one call, so
            # progress can only be reported once it has returned.
            callback.relative_update(size)
    finally:
        finalize_on_close = kwargs.get("finalize_on_close", self.finalize_on_close)
        await zb_hns_utils.close_aaow(writer, finalize_on_close=finalize_on_close)

    self.invalidate_cache(self._parent(rpath))
async def _pipe_file(
    self,
    path,
    data,
    metadata=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    chunksize=50 * 2**20,
    mode="overwrite",
    **kwargs,
):
    """Upload bytes to a file.

    This method is optimized for Zonal buckets, using gRPC for uploads.
    In zonal buckets, file is left *unfinalized* by default unless
    `finalize_on_close` is set to True (per call via ``kwargs`` or on the
    filesystem instance).
    For non-Zonal buckets, it delegates to the parent class's implementation.

    Parameters
    ----------
    path: str
        Path to the file to be written.
    data: bytes
        The content to write to the file.
    metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    consistency: str, optional
        Unsupported for Zonal buckets and will be ignored.
    content_type: str, optional
        Unsupported for Zonal buckets and will be ignored, except for the default.
    fixed_key_metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    chunksize: int, optional
        The size of chunks to upload data in.
    mode: str, optional
        The write mode, either 'overwrite' or 'create'.

    Raises
    ------
    ValueError
        If ``path`` names a specific object generation (Zonal buckets),
        matching the behaviour of ``_put_file``.
    """
    bucket, key, generation = self.split_path(path)
    if not await self._is_zonal_bucket(bucket):
        return await super()._pipe_file(
            path,
            data,
            metadata=metadata,
            consistency=consistency,
            content_type=content_type,
            fixed_key_metadata=fixed_key_metadata,
            chunksize=chunksize,
            mode=mode,
        )

    # Previously the generation was silently dropped and the latest object
    # at ``key`` was written instead; refuse, as _put_file does.
    if generation:
        raise ValueError("Cannot write to specific object generation")

    if (
        metadata
        or fixed_key_metadata
        or consistency
        or (content_type and content_type != "application/octet-stream")
    ):
        logger.warning(
            "Zonal buckets do not support content_type, metadata, "
            "fixed_key_metadata or consistency during upload. "
            "These parameters will be ignored."
        )
    await self._get_grpc_client()
    # Works for both 'overwrite' and 'create' modes
    writer = await zb_hns_utils.init_aaow(self.grpc_client, bucket, key)
    try:
        for start in range(0, len(data), chunksize):
            await writer.append(data[start : start + chunksize])
    finally:
        finalize_on_close = kwargs.get("finalize_on_close", self.finalize_on_close)
        await zb_hns_utils.close_aaow(writer, finalize_on_close=finalize_on_close)

    self.invalidate_cache(self._parent(path))
async def _get_file(self, rpath, lpath, callback=None, **kwargs):
    """
    Downloads a file from GCS to a local path.

    For Zonal buckets, it uses gRPC client for optimized downloads.
    For Standard buckets, it delegates to the parent class implementation.

    Parameters
    ----------
    rpath: str
        Path on GCS to download the file from.
    lpath: str
        Path to the local file to be downloaded. If it is an existing
        directory, nothing is done for Zonal buckets.
    callback: fsspec.callbacks.Callback, optional
        Callback to monitor the download progress.
    **kwargs:
        For Zonal buckets, `chunksize` bytes (int) can be provided to control
        the download chunk size (default is 128KB).

    Notes
    -----
    If the download fails or is cancelled after ``lpath`` has been opened
    for writing, the partially written file is removed. Failures before
    that point (client setup, size lookup, directory creation) leave any
    pre-existing ``lpath`` untouched.
    """
    bucket, key, generation = self.split_path(rpath)
    if not await self._is_zonal_bucket(bucket):
        return await super()._get_file(
            rpath,
            lpath,
            callback=callback,
            **kwargs,
        )

    if os.path.isdir(lpath):
        return
    callback = callback or NoOpCallback()

    mrd = None
    # Only delete lpath on failure once we have truncated it ourselves;
    # otherwise an early error would destroy the user's existing file.
    local_file_opened = False
    try:
        await self._get_grpc_client()
        mrd = await zb_hns_utils.init_mrd(self.grpc_client, bucket, key, generation)

        size = mrd.persisted_size
        if size is None:
            logger.warning(
                f"AsyncMultiRangeDownloader (MRD) for {rpath} has no 'persisted_size'. "
                "Falling back to _info() to get the file size. "
                "This may result in incorrect behavior for unfinalized objects."
            )
            size = (await self._info(rpath))["size"]
        callback.set_size(size)

        lparent = os.path.dirname(lpath) or os.curdir
        os.makedirs(lparent, exist_ok=True)

        chunksize = kwargs.get("chunksize", 4096 * 32)  # 128KB default
        offset = 0

        with open(lpath, "wb") as f2:
            local_file_opened = True
            while offset < size:
                data = await zb_hns_utils.download_range(
                    offset=offset, length=chunksize, mrd=mrd
                )
                if not data:
                    break

                f2.write(data)
                offset += len(data)
                callback.relative_update(len(data))
    except BaseException:
        # BaseException so that task cancellation (asyncio.CancelledError)
        # also removes the partially written file.
        if local_file_opened and os.path.exists(lpath):
            os.remove(lpath)
        raise
    finally:
        await zb_hns_utils.close_mrd(mrd)
async def _do_list_objects(
    self,
    path,
    max_results=None,
    delimiter="/",
    prefix="",
    versions=False,
    **kwargs,
):
    """
    List objects, asking HNS buckets to report folders as prefixes.

    The bucket's HNS status is always checked first. If it is enabled and
    the listing is delimited by ``"/"``, ``includeFoldersAsPrefixes="true"``
    is added to the keyword arguments forwarded to the parent
    ``_do_list_objects``. All other arguments are passed through unchanged.
    """
    bucket, _key, _generation = self.split_path(path)
    hns_enabled = await self._is_bucket_hns_enabled(bucket)
    if hns_enabled and delimiter == "/":
        kwargs["includeFoldersAsPrefixes"] = "true"

    listing = await super()._do_list_objects(
        path,
        max_results=max_results,
        delimiter=delimiter,
        prefix=prefix,
        versions=versions,
        **kwargs,
    )
    return listing
1406 async def _cp_file(self, path1, path2, acl=None, **kwargs):
1407 """Duplicate remote file.
1409 For Standard GCS buckets, falls back to the parent class's implementation
1411 Zonal Bucket Support:
1412 Server-side copy is currently NOT supported for Zonal buckets because
1413 the `RewriteObject` API is unavailable for them.
1415 The following scenarios will raise a `NotImplementedError`:
1416 * Intra-zonal: Copying within the same Zonal bucket.
1417 * Inter-zonal: Copying between two different Zonal buckets.
1418 * Mixed: Copying between a Zonal bucket and a Standard bucket.
1420 """
1421 b1, _, _ = self.split_path(path1)
1422 b2, _, _ = self.split_path(path2)
1424 is_zonal_source, is_zonal_dest = await asyncio.gather(
1425 self._is_zonal_bucket(b1), self._is_zonal_bucket(b2)
1426 )
1428 # 1. Standard -> Standard (Delegate to core implementation)
1429 if not is_zonal_source and not is_zonal_dest:
1430 return await super()._cp_file(path1, path2, acl=acl, **kwargs)
1432 # 2. Zonal Scenarios (Currently Unsupported)
1433 raise NotImplementedError(
1434 "Server-side copy involving Zonal buckets is not supported. "
1435 "Zonal objects do not support rewrite."
1436 )
async def _merge(self, path, paths, acl=None):
    """Concatenate objects within a single bucket.

    Non-Zonal buckets use the parent class's implementation. Only the
    bucket of the destination ``path`` is checked.

    Zonal Bucket Support:
        Server-side compose is not available for Zonal buckets, so
        ``NotImplementedError`` is raised for them.
    """
    target_bucket, _key, _generation = self.split_path(path)
    zonal = await self._is_zonal_bucket(target_bucket)

    if not zonal:
        return await super()._merge(path, paths, acl=acl)

    raise NotImplementedError(
        "Server-side compose/merge is not supported for Zonal buckets."
    )


merge = asyn.sync_wrapper(_merge)
async def upload_chunk(fs, location, data, offset, size, content_type):
    """
    Upload one chunk of data, finalizing Zonal uploads once ``size`` is reached.

    Parameters
    ----------
    fs: GCSFileSystem
        The filesystem instance (only used when delegating to core).
    location: str, bytes or AsyncAppendableObjectWriter
        A resumable-upload location (str/bytes) for Standard buckets, which
        is delegated to ``gcsfs.core.upload_chunk``, or an open writer for
        Zonal buckets.
    data: bytes
        The chunk to append.
    offset: int
        Ignored for Zonal buckets (the writer tracks its own ``offset``);
        a non-zero value triggers a warning.
    size: int or None
        Total expected object size. Once the writer's offset reaches it, the
        object is finalized and the writer closed. ``None`` never finalizes.
    content_type: str
        Ignored for Zonal buckets; a non-default value triggers a warning.

    Raises
    ------
    TypeError
        If ``location`` is neither str/bytes nor an AsyncAppendableObjectWriter.
    ValueError
        If the writer's stream is no longer open.
    """
    # If `location` is an HTTP resumable-upload URL (string), delegate to core upload_chunk
    # for Standard buckets.
    if isinstance(location, (str, bytes)):
        from gcsfs.core import upload_chunk as core_upload_chunk

        return await core_upload_chunk(fs, location, data, offset, size, content_type)

    if not isinstance(location, AsyncAppendableObjectWriter):
        raise TypeError(
            "upload_chunk for Zonal buckets expects an AsyncAppendableObjectWriter instance."
        )

    # ``size`` is used below to decide finalization, so it must not trigger
    # this warning; neither should the default content type (matching the
    # other upload entry points in this module).
    if offset or (content_type and content_type != "application/octet-stream"):
        logger.warning(
            "Zonal buckets do not support offset, or content_type during upload. These parameters will be ignored."
        )

    if not location._is_stream_open:
        raise ValueError("Writer is closed. Please initiate a new upload.")

    try:
        await location.append(data)
    except Exception as e:
        logger.error(
            f"Error uploading chunk at offset {location.offset}: {e}. Closing stream."
        )
        # Don't finalize the upload on error
        await zb_hns_utils.close_aaow(location, finalize_on_close=False)
        raise

    # Comparing against ``None`` would raise TypeError after the data was
    # already appended; an unknown size simply means "not final yet".
    if size is not None and (location.offset or 0) >= size:
        logger.debug("Uploaded data is equal or greater than size. Finalizing upload.")
        await zb_hns_utils.close_aaow(location, finalize_on_close=True)
async def initiate_upload(
    fs,
    bucket,
    key,
    content_type="application/octet-stream",
    metadata=None,
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Start an upload and return the handle used for subsequent chunks.

    For Zonal buckets the handle is an ``AsyncAppendableObjectWriter`` built
    over gRPC. Every other bucket type is handed to
    ``gcsfs.core.initiate_upload`` with the same arguments.

    Parameters
    ----------
    fs: GCSFileSystem
        Filesystem used for the bucket-type check and the gRPC client.
    bucket: str
        Destination bucket.
    key: str
        Destination object key.
    content_type: str, optional
        Ignored for Zonal buckets; a non-default value logs a warning.
    metadata: dict, optional
        Ignored for Zonal buckets (logs a warning when given).
    fixed_key_metadata: dict, optional
        Ignored for Zonal buckets (logs a warning when given).
    mode: str, optional
        'overwrite' or 'create'; forwarded only on the non-Zonal path.
    kms_key_name: str, optional
        Ignored for Zonal buckets (logs a warning when given).
    """
    if await fs._is_zonal_bucket(bucket):
        custom_content_type = bool(content_type) and (
            content_type != "application/octet-stream"
        )
        if any((metadata, fixed_key_metadata, kms_key_name, custom_content_type)):
            logger.warning(
                "Zonal buckets do not support content_type, metadata, fixed_key_metadata, "
                "or kms_key_name during upload. These parameters will be ignored."
            )

        await fs._get_grpc_client()
        # Without a generation, init_aaow creates the object or overwrites an
        # existing one, which is why 'overwrite' and 'create' share this path.
        return await zb_hns_utils.init_aaow(fs.grpc_client, bucket, key)

    from gcsfs.core import initiate_upload as core_initiate_upload

    return await core_initiate_upload(
        fs,
        bucket,
        key,
        content_type,
        metadata,
        fixed_key_metadata,
        mode,
        kms_key_name,
    )
async def simple_upload(
    fs,
    bucket,
    key,
    datain,
    metadatain=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
    **kwargs,
):
    """
    Upload ``datain`` in one shot.

    Zonal buckets are written over gRPC with a single append. The object is
    left *unfinalized* unless ``finalize_on_close`` is truthy, taken from
    ``kwargs`` first and otherwise from ``fs.finalize_on_close`` (default
    False). Every other bucket type goes to ``gcsfs.core.simple_upload``.

    Parameters
    ----------
    fs: GCSFileSystem
        Filesystem used for the bucket-type check and the gRPC client.
    bucket: str
        Destination bucket.
    key: str
        Destination object key.
    datain: bytes
        Payload to upload.
    metadatain: dict, optional
        Ignored for Zonal buckets (logs a warning when given).
    consistency: str, optional
        Ignored for Zonal buckets (logs a warning when given).
    content_type: str, optional
        Ignored for Zonal buckets; a non-default value logs a warning.
    fixed_key_metadata: dict, optional
        Ignored for Zonal buckets (logs a warning when given).
    mode: str, optional
        'overwrite' or 'create'; forwarded only on the non-Zonal path.
    kms_key_name: str, optional
        Ignored for Zonal buckets (logs a warning when given).

    Returns
    -------
    None for Zonal buckets; otherwise whatever the core implementation returns.
    """
    if await fs._is_zonal_bucket(bucket):
        custom_content_type = bool(content_type) and (
            content_type != "application/octet-stream"
        )
        ignored_given = any(
            (metadatain, fixed_key_metadata, kms_key_name, consistency)
        )
        if ignored_given or custom_content_type:
            logger.warning(
                "Zonal buckets do not support content_type, metadatain, fixed_key_metadata, "
                "consistency or kms_key_name during upload. These parameters will be ignored."
            )

        await fs._get_grpc_client()
        # Without a generation, init_aaow creates the object or overwrites an
        # existing one, which is why 'overwrite' and 'create' share this path.
        writer = await zb_hns_utils.init_aaow(fs.grpc_client, bucket, key)
        try:
            await writer.append(datain)
        finally:
            should_finalize = kwargs.get(
                "finalize_on_close", getattr(fs, "finalize_on_close", False)
            )
            await zb_hns_utils.close_aaow(writer, finalize_on_close=should_finalize)
        return None

    from gcsfs.core import simple_upload as core_simple_upload

    return await core_simple_upload(
        fs,
        bucket,
        key,
        datain,
        metadatain,
        consistency,
        content_type,
        fixed_key_metadata,
        mode,
        kms_key_name,
    )