Coverage for gcsfs/extended_gcsfs.py: 33%

598 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import asyncio 

2import logging 

3import os 

4import uuid 

5from enum import Enum 

6from glob import has_magic 

7 

8from fsspec import asyn 

9from fsspec.callbacks import NoOpCallback 

10from google.api_core import exceptions as api_exceptions 

11from google.api_core.client_info import ClientInfo 

12from google.api_core.client_options import ClientOptions 

13from google.auth.credentials import AnonymousCredentials 

14from google.cloud import storage_control_v2 

15from google.cloud.storage.asyncio.async_appendable_object_writer import ( 

16 AsyncAppendableObjectWriter, 

17) 

18from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient 

19 

20from gcsfs import __version__ as version 

21from gcsfs import zb_hns_utils 

22from gcsfs.core import GCSFile, GCSFileSystem 

23from gcsfs.zonal_file import ZonalFile 

24 

25logger = logging.getLogger("gcsfs") 

26 

27USER_AGENT = "python-gcsfs" 

28 

29 

class BucketType(Enum):
    """Storage layout categories a bucket can be classified into.

    Every member's value is the same string as its name.
    """

    # Zonal bucket; also treated as hierarchical-namespace enabled.
    ZONAL_HIERARCHICAL = "ZONAL_HIERARCHICAL"
    # Bucket with hierarchical namespace (HNS) enabled.
    HIERARCHICAL = "HIERARCHICAL"
    # Bucket without hierarchical namespace.
    NON_HIERARCHICAL = "NON_HIERARCHICAL"
    # The layout could not be determined.
    UNKNOWN = "UNKNOWN"

35 

36 

# File class instantiated by ``_open`` for each bucket type: only zonal buckets
# get ZonalFile, every other type (UNKNOWN included) uses the regular GCSFile.
gcs_file_types = {
    BucketType.ZONAL_HIERARCHICAL: ZonalFile,
    **dict.fromkeys(
        (
            BucketType.NON_HIERARCHICAL,
            BucketType.HIERARCHICAL,
            BucketType.UNKNOWN,
        ),
        GCSFile,
    ),
}

43 

44 

45class ExtendedGcsFileSystem(GCSFileSystem): 

46 """ 

47 This class will be used when GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT env variable is set to true. 

48 ExtendedGcsFileSystem is a subclass of GCSFileSystem that adds new logic for bucket types 

49 including zonal and hierarchical. For buckets without special properties, it forwards requests 

50 to the parent class GCSFileSystem for default processing. 

51 """ 

52 

def __init__(self, *args, finalize_on_close=False, **kwargs):
    """Set up the filesystem and the state used by the gRPC/control clients.

    Args:
        *args: Positional arguments forwarded to ``GCSFileSystem``.
        finalize_on_close (bool): Default passed to file objects created by
            ``_open``. Files in zonal buckets are left unfinalized by default
            so that they can still be appended to.
        **kwargs: Keyword arguments forwarded to ``GCSFileSystem``.
    """
    super().__init__(*args, **kwargs)
    self.finalize_on_close = finalize_on_close

    # Both clients are created lazily on first use.
    self._grpc_client = None
    self._storage_control_client = None
    # Per-bucket cache of detected bucket types.
    self._storage_layout_cache = {}

    # self.credentials is the GCSFS wrapper; the gRPC / Storage Control
    # clients need the underlying google.auth credentials object instead.
    wrapper = self.credentials
    underlying = wrapper.credentials
    # With token="anon" (typical for emulators) there is no underlying
    # credentials object, yet the clients still require one, so anonymous
    # credentials are supplied explicitly.
    self.credential = AnonymousCredentials() if wrapper.token == "anon" else underlying

70 

71 @property 

72 def _user_project(self): 

73 """Value used for billing - enabling "requestor pays" access""" 

74 if self.requester_pays: 

75 return ( 

76 self.requester_pays 

77 if isinstance(self.requester_pays, str) 

78 else self.project 

79 ) 

80 return None 

81 

@property
def grpc_client(self):
    """Return the gRPC client, creating it synchronously when allowed.

    Raises:
        RuntimeError: If the filesystem is asynchronous and the client has
            not been created yet via ``_get_grpc_client()``.
    """
    existing = self._grpc_client
    if existing is not None:
        return existing
    if self.asynchronous:
        # No blocking on the event loop from async code.
        raise RuntimeError(
            "Please await _get_grpc_client() before accessing grpc_client"
        )
    self._grpc_client = asyn.sync(self.loop, self._get_grpc_client)
    return self._grpc_client

91 

92 async def _get_grpc_client(self): 

93 if self._grpc_client is None: 

94 client_options = None 

95 if self._location: 

96 # client_options expects only the host:port, without the protocol. 

97 endpoint = self._location.split("://")[-1] 

98 client_options = ClientOptions(api_endpoint=endpoint) 

99 self._grpc_client = AsyncGrpcClient( 

100 credentials=self.credential, 

101 client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"), 

102 client_options=client_options, 

103 ) 

104 return self._grpc_client 

105 

106 async def _get_control_plane_client(self): 

107 if self._storage_control_client is None: 

108 

109 # Initialize the storage control plane client for bucket 

110 # metadata operations 

111 transport_cls = ( 

112 storage_control_v2.StorageControlAsyncClient.get_transport_class( 

113 "grpc_asyncio" 

114 ) 

115 ) 

116 channel = transport_cls.create_channel( 

117 credentials=self.credential, 

118 options=[("grpc.primary_user_agent", f"{USER_AGENT}/{version}")], 

119 quota_project_id=self._user_project, 

120 ) 

121 transport = transport_cls(channel=channel) 

122 self._storage_control_client = storage_control_v2.StorageControlAsyncClient( 

123 transport=transport 

124 ) 

125 return self._storage_control_client 

126 

127 async def _lookup_bucket_type(self, bucket): 

128 if bucket in self._storage_layout_cache: 

129 return self._storage_layout_cache[bucket] 

130 bucket_type = await self._get_bucket_type(bucket) 

131 # Dont cache UNKNOWN type 

132 if bucket_type == BucketType.UNKNOWN: 

133 return bucket_type 

134 self._storage_layout_cache[bucket] = bucket_type 

135 return self._storage_layout_cache[bucket] 

136 

# Blocking counterpart of _lookup_bucket_type, used by the synchronous _open.
_sync_lookup_bucket_type = asyn.sync_wrapper(_lookup_bucket_type)

138 

async def _get_bucket_type(self, bucket):
    """Classify a bucket through the Storage Control ``get_storage_layout`` call.

    Never raises: any failure is logged as a warning and reported as
    ``BucketType.UNKNOWN``.

    Args:
        bucket (str): Bucket name.

    Returns:
        BucketType: ZONAL_HIERARCHICAL when the layout's location type is
        "zone", HIERARCHICAL when hierarchical namespace is enabled,
        NON_HIERARCHICAL otherwise, UNKNOWN on error.
    """
    try:
        control_client = await self._get_control_plane_client()
        layout_name = f"projects/_/buckets/{bucket}/storageLayout"
        logger.debug(f"get_storage_layout request for name: {layout_name}")
        layout = await control_client.get_storage_layout(name=layout_name)

        if layout.location_type == "zone":
            detected = BucketType.ZONAL_HIERARCHICAL
        elif layout.hierarchical_namespace and layout.hierarchical_namespace.enabled:
            detected = BucketType.HIERARCHICAL
        else:
            detected = BucketType.NON_HIERARCHICAL
        return detected
    except api_exceptions.NotFound:
        logger.warning(
            f"Error: Bucket {bucket} not found or you lack permissions for "
            "storage layout api used to detect bucket type. "
            "Falling back to GCSFileSystem."
        )
        return BucketType.UNKNOWN
    except Exception as e:
        logger.warning(
            f"Could not determine bucket type for bucket name {bucket}: {e}, "
            "falling back to GCSFileSystem"
        )
        # UNKNOWN is the default whenever the type could not be obtained.
        return BucketType.UNKNOWN

166 

def _open(
    self,
    path,
    mode="rb",
    block_size=None,
    cache_options=None,
    acl=None,
    consistency=None,
    metadata=None,
    autocommit=True,
    fixed_key_metadata=None,
    generation=None,
    **kwargs,
):
    """Open a file, using the file class registered for the bucket's type.

    ``block_size`` and ``consistency`` fall back to the filesystem defaults
    when falsy, and ``finalize_on_close`` may be overridden through
    ``kwargs``. All remaining arguments are forwarded to the file class.
    """
    bucket, _, _ = self.split_path(path)
    file_cls = gcs_file_types[self._sync_lookup_bucket_type(bucket)]

    # Pull the override out of kwargs so it is not passed twice.
    finalize = kwargs.pop("finalize_on_close", self.finalize_on_close)
    effective_block_size = block_size or self.default_block_size
    effective_consistency = consistency or self.consistency

    return file_cls(
        self,
        path,
        mode,
        block_size=effective_block_size,
        cache_options=cache_options,
        consistency=effective_consistency,
        metadata=metadata,
        acl=acl,
        autocommit=autocommit,
        fixed_key_metadata=fixed_key_metadata,
        generation=generation,
        finalize_on_close=finalize,
        **kwargs,
    )

202 

203 # Replacement method for _process_limits to support new params (offset and length) for MRD. 

204 async def _process_limits_to_offset_and_length( 

205 self, path, start, end, file_size=None 

206 ): 

207 """ 

208 Calculates the read offset and length from start and end parameters. 

209 

210 Args: 

211 path (str): The path to the file. 

212 start (int | None): The starting byte position. 

213 end (int | None): The ending byte position. 

214 file_size (int | None): The total size of the file. If None, it will be fetched via _info(). 

215 

216 Returns: 

217 tuple: A tuple containing (offset, length). 

218 """ 

219 size = file_size 

220 

221 if start is None: 

222 offset = 0 

223 elif start < 0: 

224 size = (await self._info(path))["size"] if size is None else size 

225 # If start is negative and larger than the file size, we should start from 0. 

226 offset = max(0, size + start) 

227 else: 

228 offset = start 

229 

230 if end is None: 

231 size = (await self._info(path))["size"] if size is None else size 

232 effective_end = size 

233 elif end < 0: 

234 size = (await self._info(path))["size"] if size is None else size 

235 effective_end = size + end 

236 else: 

237 effective_end = end 

238 

239 # If the requested end is before/ same as the start, return empty. 

240 if effective_end <= offset: 

241 return offset, 0 

242 else: 

243 length = effective_end - offset # Normal case 

244 size = (await self._info(path))["size"] if size is None else size 

245 if effective_end > size: 

246 length = max(0, size - offset) # Clamp and ensure non-negative 

247 

248 return offset, length 

249 

# Blocking counterpart of _process_limits_to_offset_and_length.
sync_process_limits_to_offset_and_length = asyn.sync_wrapper(
    _process_limits_to_offset_and_length
)

253 

async def _is_zonal_bucket(self, bucket):
    """Return True when the bucket's (possibly cached) type is zonal."""
    return (await self._lookup_bucket_type(bucket)) == BucketType.ZONAL_HIERARCHICAL

257 

258 async def _fetch_range_split( 

259 self, path, start=None, chunk_lengths=None, mrd=None, size=None, **kwargs 

260 ): 

261 """ 

262 Reading multiple reads in one large stream. 

263 

264 Optimized for Zonal Buckets: 

265 Leverages AsyncMultiRangeDownloader.download_ranges() to fetch all requested 

266 'chunk_lengths' (chunks) concurrently in a single batch request, significantly 

267 improving performance for ReadAheadV2. 

268 """ 

269 

270 bucket, object_name, generation = self.split_path(path) 

271 mrd_created = False 

272 try: 

273 if mrd is None: 

274 # Check before creating MRD 

275 if not await self._is_zonal_bucket(bucket): 

276 raise RuntimeError( 

277 "Internal error, this method is only supported for zonal buckets!" 

278 ) 

279 

280 await self._get_grpc_client() 

281 mrd = await zb_hns_utils.init_mrd( 

282 self.grpc_client, bucket, object_name, generation 

283 ) 

284 mrd_created = True 

285 

286 file_size = size or mrd.persisted_size 

287 if file_size is None: 

288 logger.warning( 

289 f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. " 

290 "Falling back to _info() to get the file size." 

291 ) 

292 file_size = (await self._info(path))["size"] 

293 

294 if chunk_lengths: 

295 start_offset = start if start is not None else 0 

296 current_offset = start_offset 

297 

298 if start_offset >= file_size or ( 

299 chunk_lengths is not None 

300 and start_offset + sum(chunk_lengths) > file_size 

301 ): 

302 raise RuntimeError("Request not satisfiable.") 

303 

304 read_ranges = [] # To pass to MRD 

305 

306 for length in chunk_lengths: 

307 read_ranges.append((current_offset, length)) 

308 current_offset += length 

309 

310 return await zb_hns_utils.download_ranges(read_ranges, mrd) 

311 else: 

312 end = kwargs.get("end") 

313 offset, length = await self._process_limits_to_offset_and_length( 

314 path, start, end, file_size 

315 ) 

316 

317 data = await zb_hns_utils.download_range( 

318 offset=offset, length=length, mrd=mrd 

319 ) 

320 return [data] 

321 finally: 

322 if mrd_created: 

323 await zb_hns_utils.close_mrd(mrd) 

324 

async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
    """Return a file's bytes, reading zonal-bucket objects through gRPC.

    Overrides the parent ``_cat_file``. When no downloader is supplied and
    the bucket is not zonal, the call is delegated to the parent.

    Args:
        path (str): The full GCS path to the file (e.g., "bucket/object").
        start (int, optional): The starting byte position to read from.
        end (int, optional): The ending byte position to read to.
        mrd (AsyncMultiRangeDownloader, optional): An existing multi-range
            downloader. When omitted, one is created for zonal buckets and
            closed again before returning.

    Returns:
        bytes: The content of the file or file range.
    """
    owns_mrd = False
    try:
        # A downloader has to be created here when the read is issued
        # directly on the filesystem rather than through a file object.
        if mrd is None:
            bucket, object_name, generation = self.split_path(path)
            is_zonal = await self._is_zonal_bucket(bucket)
            if not is_zonal:
                return await super()._cat_file(path, start=start, end=end, **kwargs)

            await self._get_grpc_client()
            mrd = await zb_hns_utils.init_mrd(
                self.grpc_client, bucket, object_name, generation
            )
            owns_mrd = True

        persisted = mrd.persisted_size
        if persisted is None:
            logger.warning(
                f"AsyncMultiRangeDownloader (MRD) for {path} has no 'persisted_size'. "
                "Falling back to _info() to get the file size. "
                "This may result in incorrect behavior for unfinalized objects."
            )

        # A None size makes the helper look the size up via _info().
        offset, length = await self._process_limits_to_offset_and_length(
            path, start, end, persisted
        )
        return await zb_hns_utils.download_range(
            offset=offset, length=length, mrd=mrd
        )
    finally:
        # Close the downloader only if this call created it.
        if owns_mrd:
            await zb_hns_utils.close_mrd(mrd)

375 

async def _is_bucket_hns_enabled(self, bucket):
    """Return True for zonal and hierarchical buckets, False otherwise.

    A failure while looking up the bucket type is logged and treated as
    "not HNS-enabled".
    """
    try:
        detected = await self._lookup_bucket_type(bucket)
    except Exception as e:
        logger.warning(
            f"Could not determine if bucket '{bucket}' is HNS-enabled, "
            f"falling back to default non-HNS: {e}",
            stack_info=True,
        )
        return False

    hns_types = (BucketType.ZONAL_HIERARCHICAL, BucketType.HIERARCHICAL)
    return detected in hns_types

388 

389 def _update_dircache_after_rename(self, path1, path2): 

390 """ 

391 Performs a targeted update of the directory cache after a successful 

392 folder rename operation. 

393 

394 This involves three main steps: 

395 1. Removing the source folder and all its descendants from the cache. 

396 2. Removing the source folder's entry from its parent's listing. 

397 3. Adding the new destination folder's entry to its parent's listing. 

398 

399 Args: 

400 path1 (str): The source path that was renamed. 

401 path2 (str): The destination path. 

402 """ 

403 # 1. Find and remove all descendant paths of the source from the cache. 

404 source_prefix = f"{path1.rstrip('/')}/" 

405 for key in list(self.dircache): 

406 if key.startswith(source_prefix): 

407 self.dircache.pop(key, None) 

408 

409 # 2. Remove the old source entry from its parent's listing. 

410 self.dircache.pop(path1, None) 

411 parent1 = self._parent(path1) 

412 if parent1 in self.dircache: 

413 self.dircache[parent1] = [ 

414 e for e in self.dircache[parent1] if e.get("name") != path1 

415 ] 

416 

417 # 3. Invalidate the destination path and update its parent's cache. 

418 self.dircache.pop(path2, None) 

419 parent2 = self._parent(path2) 

420 if parent2 in self.dircache: 

421 _, key2, _ = self.split_path(path2) 

422 new_entry = { 

423 "Key": key2, 

424 "Size": 0, 

425 "name": path2, 

426 "size": 0, 

427 "type": "directory", 

428 "storageClass": "DIRECTORY", 

429 } 

430 self.dircache[parent2].append(new_entry) 

431 

432 async def _mv_file_cache_update(self, path1, path2, response=None): 

433 """ 

434 Update the cache after a file move operation. 

435 

436 For HNS-enabled buckets where the move is within the same bucket, this method 

437 directly updates the directory cache by removing the source entry from it's 

438 parent cache and adding destination path as a new entry in it's corresponding parent cache. 

439 This avoids invalidating the entire parent directory cache, which is beneficial for HNS 

440 performance. 

441 

442 For non-HNS buckets or cross-bucket moves, it falls back to the default 

443 behavior (invalidating the cache for both source and destination parents). 

444 """ 

445 src_bucket, _, _ = self.split_path(path1) 

446 dest_bucket, _, _ = self.split_path(path2) 

447 

448 if await self._is_bucket_hns_enabled(src_bucket) and src_bucket == dest_bucket: 

449 src_parent = self._parent(path1) 

450 if src_parent in self.dircache: 

451 path1_stripped = self._strip_protocol(path1) 

452 self.dircache[src_parent] = [ 

453 e 

454 for e in self.dircache[src_parent] 

455 if e.get("name") != path1_stripped 

456 ] 

457 dest_parent = self._parent(path2) 

458 if dest_parent in self.dircache and response: 

459 new_entry = self._process_object(dest_bucket, response) 

460 self.dircache[dest_parent].append(new_entry) 

461 else: 

462 await super()._mv_file_cache_update(path1, path2, response) 

463 

async def _mv(self, path1, path2, **kwargs):
    """
    Move a file or directory.

    In HNS-enabled buckets a folder moved within the same bucket is
    renamed through the Storage Control ``rename_folder`` call, and a
    non-folder source is moved with ``_mv_file``. Lists, glob patterns,
    non-HNS buckets and any case not handled above are delegated to the
    parent's object-level implementation.

    Raises:
        FileNotFoundError: If the source does not exist.
        FileExistsError: If the rename conflicts with an existing folder.
        OSError: If the rename fails a precondition.
    """
    if path1 == path2:
        logger.debug(
            "%s mv: The paths are the same, so no files/directories were moved.",
            self,
        )
        return

    needs_generic_mv = (
        isinstance(path1, list)
        or isinstance(path2, list)
        or (isinstance(path1, str) and has_magic(path1))
    )
    if needs_generic_mv:
        return await super()._mv(path1, path2, **kwargs)

    bucket1, key1, _ = self.split_path(path1)
    bucket2, key2, _ = self.split_path(path2)

    if not await self._is_bucket_hns_enabled(bucket1):
        logger.debug(
            f"Not an HNS bucket. Falling back to object-level mv for '{path1}' to '{path2}'."
        )
        return await super()._mv(path1, path2, **kwargs)

    try:
        source_info = await self._info(path1)
        if source_info.get("type") != "directory":
            await self._mv_file(path1, path2)
            return

        # The native rename only applies to a folder that stays in the
        # same bucket; anything else drops through to the fallback below.
        if bucket1 == bucket2 and key1:
            logger.debug(f"Using HNS-aware folder rename for '{path1}' to '{path2}'.")
            request = storage_control_v2.RenameFolderRequest(
                name=f"projects/_/buckets/{bucket1}/folders/{key1}",
                destination_folder_id=key2 or key1.rstrip("/").split("/")[-1],
                request_id=str(uuid.uuid4()),
            )

            logger.debug(f"rename_folder request: {request}")
            control_client = await self._get_control_plane_client()
            operation = await control_client.rename_folder(request=request)
            await operation.result()
            self._update_dircache_after_rename(path1, path2)

            logger.debug("Successfully renamed folder from '%s' to '%s'", path1, path2)
            return
    except FileNotFoundError:
        # If the source doesn't exist, fail fast.
        raise
    except api_exceptions.NotFound as e:
        raise FileNotFoundError(
            f"Source '{path1}' not found for move operation."
        ) from e
    except api_exceptions.Conflict as e:
        # The destination folder already exists; FileExistsError is what
        # fsspec callers expect.
        raise FileExistsError(
            f"HNS rename failed due to conflict for '{path1}' to '{path2}'"
        ) from e
    except api_exceptions.FailedPrecondition as e:
        raise OSError(f"HNS rename failed: {e}") from e
    except Exception as e:
        # Any other failure is not fatal: try the object-level move instead.
        logger.warning(f"Could not perform HNS-aware mv: {e}")

    logger.debug(f"Falling back to object-level mv for '{path1}' to '{path2}'.")
    return await super()._mv(path1, path2, **kwargs)


# Blocking counterpart of _mv.
mv = asyn.sync_wrapper(_mv)

551 

async def _list_objects(self, path, prefix="", versions=False, **kwargs):
    """List objects under ``path``, treating empty HNS folders as existing.

    The parent listing raises FileNotFoundError when nothing is found. In
    an HNS-enabled bucket the path may still be an empty folder, so the
    folder is looked up and an empty listing is returned when it exists.

    Returns:
        The parent's listing, or ``[]`` for an existing but empty folder.

    Raises:
        FileNotFoundError: If the parent listing found nothing and the
            path is not an existing folder in an HNS-enabled bucket.
    """
    try:
        return await super()._list_objects(
            path, prefix=prefix, versions=versions, **kwargs
        )
    except FileNotFoundError:
        bucket, key, _ = self.split_path(path)
        if key and await self._is_bucket_hns_enabled(bucket):
            try:
                await self._get_directory_info(path, bucket, key, None)
            except Exception as e:
                # Best effort only: the original FileNotFoundError is
                # re-raised below, but the reason is kept for debugging.
                logger.debug(
                    "Folder lookup for '%s' after empty listing failed: %s", path, e
                )
            else:
                return []
        raise

566 

async def _mkdir(
    self,
    path,
    create_parents=False,
    enable_hierarchical_namespace=False,
    placement=None,
    location=None,
    **kwargs,
):
    """
    Create a bucket or a directory.

    A path without an object key creates a bucket; a path with a key
    creates a directory.

    Parameters
    ----------
    path : str
        Path to create.
    create_parents : bool
        If True, create missing parent directories. If the bucket in the
        path does not exist, it is created as well.
    enable_hierarchical_namespace : bool
        If True, and a bucket is being created, the bucket is created with
        Hierarchical Namespace (HNS) enabled.
    placement : str, optional
        A zone (e.g. "us-central1-a"). When given, a Zonal bucket is
        created; Zonal buckets are HNS-enabled by default. `location` must
        then be the region (e.g. "us-central1") that the zone belongs to;
        when omitted it defaults to `self.default_location`.
    location : str, optional
        Location where buckets are created, like 'US' or 'EUROPE-WEST3'.
        If not provided, defaults to `self.default_location`.
    **kwargs : dict
        Additional arguments passed to the bucket creation API.

    Notes
    -----
    - In HNS-enabled buckets (Zonal buckets included) a native folder is
      created.
    - With `create_parents=False`, a missing parent directory in an
      HNS/Zonal bucket results in FileNotFoundError.
    - Non-HNS buckets use the parent implementation. Standard GCS has no
      true directories, so `mkdir` on a path with a key is typically a
      no-op unless `create_parents=True` triggers bucket creation.
    """
    path = self._strip_protocol(path)
    bucket, key, _ = self.split_path(path)

    # What kind of bucket is being asked for, should one be created?
    wants_zonal = placement is not None
    wants_hns = enable_hierarchical_namespace or wants_zonal

    # Assemble the arguments used for any bucket creation below.
    bucket_kwargs = dict(kwargs)
    if location:
        bucket_kwargs["location"] = location
    if wants_zonal:
        bucket_kwargs.update(
            customPlacementConfig={"dataLocations": [placement]},
            storageClass="RAPID",
        )
    if wants_hns:
        bucket_kwargs.update(
            hierarchicalNamespace={"enabled": True},
            # HNS buckets require uniform bucket-level access ...
            iamConfiguration={"uniformBucketLevelAccess": {"enabled": True}},
            # ... under which ACLs cannot be used, so they are set to None
            # explicitly to stop the parent from applying default ACLs.
            acl=None,
            default_acl=None,
        )

    # Case 1: the path is just a bucket.
    if not key:
        return await super()._mkdir(
            path, create_parents=create_parents, **bucket_kwargs
        )

    # Case 2: the path is a folder. When parents may be created and an
    # HNS/Zonal bucket was requested, make sure a missing bucket is created
    # with that configuration.
    created_hns_bucket = False
    if create_parents and wants_hns and not await self._exists(bucket):
        await super()._mkdir(bucket, create_parents=True, **bucket_kwargs)
        created_hns_bucket = True

    # A bucket created just above is known to be HNS; otherwise look it up.
    if created_hns_bucket or await self._is_bucket_hns_enabled(bucket):
        return await self._create_hns_folder(path, bucket, key, create_parents)

    return await super()._mkdir(path, create_parents=create_parents, **bucket_kwargs)

668 

async def _create_hns_folder(self, path, bucket, key, create_parents):
    """Create a native folder through the Storage Control ``create_folder`` call.

    On success the parent's cached listing, if any, gains an entry for the
    new folder. An already existing folder is only logged.

    Raises:
        FileNotFoundError: If the service reports a failed precondition,
            which can happen when ``create_parents`` is False and the
            parent directory does not exist.
    """
    logger.debug(f"Using HNS-aware mkdir for '{path}'.")
    folder_id = key.rstrip("/")
    request = storage_control_v2.CreateFolderRequest(
        parent=f"projects/_/buckets/{bucket}",
        folder_id=folder_id,
        recursive=create_parents,
        request_id=str(uuid.uuid4()),
    )
    try:
        logger.debug(f"create_folder request: {request}")
        control_client = await self._get_control_plane_client()
        await control_client.create_folder(request=request)

        # Patch the parent's cached listing rather than invalidating it.
        parent_path = self._parent(path)
        if parent_path in self.dircache:
            self.dircache[parent_path].append(
                {
                    "Key": folder_id,
                    "Size": 0,
                    "name": path,
                    "size": 0,
                    "type": "directory",
                    "storageClass": "DIRECTORY",
                }
            )
    except api_exceptions.Conflict as e:
        logger.debug(f"Directory already exists: {path}: {e}")
    except api_exceptions.FailedPrecondition as e:
        # Translated to FileNotFoundError for fsspec compatibility.
        raise FileNotFoundError(
            f"mkdir for '{path}' failed due to a precondition error: {e}"
        ) from e

703 

# Blocking counterpart of _mkdir.
mkdir = asyn.sync_wrapper(_mkdir)

705 

async def _get_directory_info(self, path, bucket, key, generation):
    """
    Return directory metadata, using the Storage Control ``get_folder``
    call for HNS-enabled buckets.

    For HNS buckets no object listing is performed; the folder is looked
    up directly. Other buckets use the parent implementation.

    Args:
        path (str): Full path of the directory, used as the entry name.
        bucket (str): Bucket name.
        key (str): Path of the directory relative to the bucket.
        generation: Passed through to the parent implementation only.

    Returns:
        dict: Directory entry with size 0 and the folder's create/update
        times and metageneration (HNS), or the parent's result (non-HNS).

    Raises:
        FileNotFoundError: If the folder does not exist in an HNS bucket.
    """
    if not await self._is_bucket_hns_enabled(bucket):
        # Standard GCS behaviour for non-HNS buckets.
        return await super()._get_directory_info(path, bucket, key, generation)

    try:
        # The folder id is the path relative to the bucket.
        folder_id = key.rstrip("/")
        request = storage_control_v2.GetFolderRequest(
            name=f"projects/_/buckets/{bucket}/folders/{folder_id}",
            request_id=str(uuid.uuid4()),
        )

        # A successful get_folder call proves that the folder exists.
        client = await self._get_control_plane_client()
        response = await client.get_folder(request=request)

        return {
            "bucket": bucket,
            "name": path,
            "size": 0,
            "storageClass": "DIRECTORY",
            "type": "directory",
            "ctime": response.create_time,
            "mtime": response.update_time,
            "metageneration": response.metageneration,
        }
    except api_exceptions.NotFound as e:
        # Chain the API error explicitly, as the other translations in
        # this class do.
        raise FileNotFoundError(path) from e
    except Exception as e:
        # Log unexpected errors, then re-raise them unchanged.
        logger.error(f"Error fetching folder metadata for {path}: {e}")
        raise

751 

async def _rmdir(self, path):
    """
    Delete an empty directory.

    Bucket paths and non-HNS buckets are handled by the parent `_rmdir`.
    In HNS-enabled buckets the folder is deleted through the Storage
    Control ``delete_folder`` call and the directory cache is updated.

    Raises:
        FileNotFoundError: If the folder does not exist (or the path is a
            file object rather than a directory).
        OSError: If a precondition fails, e.g. the directory is not empty.
    """
    path = self._strip_protocol(path)
    bucket, key, _ = self.split_path(path)

    # The parent _rmdir only deletes buckets; it also serves as the
    # fallback for buckets without HNS.
    if not key:
        return await super()._rmdir(path)
    if not await self._is_bucket_hns_enabled(bucket):
        return await super()._rmdir(path)

    # A placeholder object (e.g. 'a/b/c/') may exist in an HNS bucket and
    # would make the folder look non-empty, so it is removed first. A
    # missing placeholder is expected and ignored.
    #
    # Note: the placeholder is removed even if delete_folder then fails
    # because the directory holds other files. That is acceptable:
    # placeholders only simulate folders and are not needed in HNS
    # buckets, which have native folder entities.
    try:
        placeholder_path = f"{path.rstrip('/')}/"
        await self._rm_file(placeholder_path)
    except FileNotFoundError:
        pass
    else:
        logger.debug(f"Removed placeholder object '{placeholder_path}' before rmdir.")

    try:
        logger.debug(f"Using HNS-aware rmdir for '{path}'.")
        request = storage_control_v2.DeleteFolderRequest(
            name=f"projects/_/buckets/{bucket}/folders/{key.rstrip('/')}",
            request_id=str(uuid.uuid4()),
        )

        logger.debug(f"delete_folder request: {request}")
        control_client = await self._get_control_plane_client()
        await control_client.delete_folder(request=request)

        # Forget the directory's own listing and its entry in the parent.
        self.dircache.pop(path, None)
        parent = self._parent(path)
        if parent in self.dircache:
            self.dircache[parent] = [
                entry for entry in self.dircache[parent] if entry.get("name") != path
            ]
        return
    except api_exceptions.NotFound as e:
        raise FileNotFoundError(f"rmdir failed for path: {path}: {e}") from e
    except api_exceptions.FailedPrecondition as e:
        raise OSError(f"Pre condition failed for rmdir for path: {path}: {e}") from e
    except Exception as e:
        logger.error(f"HNS rmdir: Failed to delete folder '{path}': {e}")
        raise


# Blocking counterpart of _rmdir.
rmdir = asyn.sync_wrapper(_rmdir)

826 

827 # TODO: This method is only added to be used in rm method, can be deleted once 

828 # rm method is integrated with recursive API 

829 async def _expand_path_with_details( 

830 self, path, recursive=False, maxdepth=None, detail=False 

831 ): 

832 if maxdepth is not None and maxdepth < 1: 

833 raise ValueError("maxdepth must be at least 1") 

834 

835 if isinstance(path, str): 

836 return await self._expand_path_with_details( 

837 [path], recursive, maxdepth, detail=detail 

838 ) 

839 else: 

840 out = {} if detail else set() 

841 path = [self._strip_protocol(p) for p in path] 

842 

843 for p in path: 

844 if has_magic(p): 

845 bit = await self._glob(p, maxdepth=maxdepth, detail=detail) 

846 if detail: 

847 out.update(bit) 

848 bit_paths = list(bit.keys()) 

849 else: 

850 bit_set = set(bit) 

851 out |= bit_set 

852 bit_paths = list(bit_set) 

853 

854 if recursive: 

855 if maxdepth is not None and maxdepth <= 1: 

856 continue 

857 rec = await self._expand_path_with_details( 

858 bit_paths, 

859 recursive=recursive, 

860 maxdepth=maxdepth - 1 if maxdepth is not None else None, 

861 detail=detail, 

862 ) 

863 if detail: 

864 for info in rec: 

865 out[info["name"]] = info 

866 else: 

867 out |= set(rec) 

868 continue 

869 elif recursive: 

870 rec = await self._find( 

871 p, maxdepth=maxdepth, withdirs=True, detail=detail 

872 ) 

873 if detail: 

874 out.update(rec) 

875 else: 

876 out |= set(rec) 

877 

878 if p not in out: 

879 if detail: 

880 try: 

881 info = await self._info(p) 

882 out[p] = info 

883 except (FileNotFoundError, OSError): 

884 pass 

885 elif recursive is False or (await self._exists(p)): 

886 out.add(p) 

887 

888 if detail: 

889 out = list(out.values()) 

890 else: 

891 out = sorted(out) 

892 

893 if not out: 

894 raise FileNotFoundError(path) 

895 return out 

896 

897 async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20): 

898 """ 

899 Deletes files and directories. 

900 

901 This method overrides the parent `_rm` to correctly handle directory 

902 deletion in HNS-enabled buckets. For non-HNS buckets, it falls back 

903 to the parent implementation. 

904 

905 For HNS buckets, it first expands the path to get a list of all files 

906 and directories, then categorizes them. It deletes files in batches 

907 and then deletes directories individually. 

908 

909 Args: 

910 path (str or list): The path(s) to delete. 

911 recursive (bool): If True, deletes directories and their contents. 

912 maxdepth (int, optional): The maximum depth to traverse for deletion. 

913 batchsize (int): The number of files to delete in a single batch request. 

914 """ 

915 if isinstance(path, list): 

916 # For HNS check, we can check for bucket type from the first path. 

917 bucket, _, _ = self.split_path(path[0]) if path else (None, None, None) 

918 else: 

919 bucket, _, _ = self.split_path(path) 

920 

921 is_hns = await self._is_bucket_hns_enabled(bucket) 

922 

923 if not is_hns: 

924 # Fall back to the parent's async rm implementation for non-HNS buckets. 

925 return await super()._rm( 

926 path, recursive=recursive, maxdepth=maxdepth, batchsize=batchsize 

927 ) 

928 

929 paths = await self._expand_path_with_details( 

930 path, recursive=recursive, maxdepth=maxdepth, detail=True 

931 ) 

932 

933 # Separate files and directories based on their type. 

934 # Directories must be deleted from the deepest first. 

935 files = list({p["name"] for p in paths if p["type"] == "file"}) 

936 dirs = sorted( 

937 list({p["name"] for p in paths if p["type"] == "directory"}), 

938 reverse=True, 

939 ) 

940 

941 return await self._perform_rm(files, dirs, path, batchsize=batchsize) 

942 

943 async def _perform_rm(self, files, dirs, path, batchsize): 

944 """ 

945 Helper method to perform the deletion of files and directories. 

946 

947 Args: 

948 files (list[str]): A list of file paths to delete. 

949 dirs (list[str]): A list of directory paths to delete, sorted from 

950 deepest to shallowest. 

951 path (str): The original path for the rm operation, for error reporting. 

952 batchsize (int): The number of files to delete in a single batch request. 

953 

954 Returns: 

955 list: A list of exceptions that occurred during deletion. 

956 """ 

957 # If no files or directories were found to delete, raise FileNotFoundError. 

958 if not files and not dirs: 

959 raise FileNotFoundError(path) 

960 

961 exs = await self._delete_files(files, batchsize) 

962 # For directories, we must delete them from deepest to shallowest 

963 # to avoid race conditions where a parent is deleted before its child. 

964 # We group directories by depth and delete them level by level. 

965 dirs_by_depth = {} 

966 for d in dirs: 

967 depth = d.count("/") 

968 dirs_by_depth.setdefault(depth, []).append(d) 

969 

970 for depth in sorted(dirs_by_depth.keys(), reverse=True): 

971 level_dirs = dirs_by_depth[depth] 

972 results = await asyn._run_coros_in_chunks( 

973 [self._rmdir(d) for d in level_dirs], 

974 batch_size=batchsize, 

975 return_exceptions=True, 

976 ) 

977 for res in results: 

978 if isinstance(res, Exception): 

979 exs.append(res) 

980 

981 errors = [ 

982 ex 

983 for ex in exs 

984 if isinstance(ex, Exception) 

985 and not isinstance(ex, (FileNotFoundError, api_exceptions.NotFound)) 

986 and "No such object" not in str(ex) 

987 ] 

988 

989 if errors: 

990 raise errors[0] 

991 

992 # Filter out non-critical "not found" errors from the final list. 

993 # A successful rm should return an empty list. 

994 return [ 

995 e 

996 for e in exs 

997 if e is not None 

998 and not isinstance(e, (FileNotFoundError, api_exceptions.NotFound)) 

999 and "No such object" not in str(e) 

1000 ] 

1001 

1002 rm = asyn.sync_wrapper(_rm) 

1003 

1004 async def _find( 

1005 self, 

1006 path, 

1007 withdirs=False, 

1008 detail=False, 

1009 prefix="", 

1010 versions=False, 

1011 maxdepth=None, 

1012 **kwargs, 

1013 ): 

1014 """ 

1015 HNS-aware find. Overrides the parent to correctly list empty folders in HNS buckets. 

1016 

1017 For HNS buckets this method uses a hybrid approach for fetching files and directories: 

1018 1. It fetches all files in a single recursive API call (like the parent). 

1019 2. It concurrently fetches all folder objects (including empty ones) 

1020 using a recursive walk with the Storage Control API. 

1021 3. It merges these results for a complete listing. 

1022 

1023 For buckets with flat structure, it falls back to the parent implementation. 

1024 """ 

1025 path = self._strip_protocol(path) 

1026 if maxdepth is not None and maxdepth < 1: 

1027 raise ValueError("maxdepth must be at least 1") 

1028 bucket, _, _ = self.split_path(path) 

1029 

1030 is_hns = await self._is_bucket_hns_enabled(bucket) 

1031 if not is_hns: 

1032 # Use parent implementation if bucket is not HNS. 

1033 return await super()._find( 

1034 path, 

1035 withdirs=withdirs, 

1036 detail=detail, 

1037 prefix=prefix, 

1038 versions=versions, 

1039 maxdepth=maxdepth, 

1040 **kwargs, 

1041 ) 

1042 

1043 # Hybrid approach for HNS enabled buckets 

1044 # 1. Fetch all files from super find() method by passing withdirs as False. 

1045 # We pass maxdepth as None here to ensure we fetch all files for caching, 

1046 # and then filter by maxdepth at the end of this method. 

1047 files_task = asyncio.create_task( 

1048 super()._find( 

1049 path, 

1050 withdirs=False, # Fetch files only 

1051 detail=True, # Get full details for merging and populating cache 

1052 prefix=prefix, 

1053 versions=versions, 

1054 maxdepth=None, 

1055 update_cache=False, # Defer caching until merging files and folders 

1056 **kwargs, 

1057 ) 

1058 ) 

1059 

1060 # 2. Fetch all folders recursively. This is necessary to find all folders, 

1061 # especially empty ones. 

1062 folders_task = asyncio.create_task( 

1063 self._get_all_folders(path, bucket, prefix=prefix) 

1064 ) 

1065 # 3. Run tasks concurrently and merge results. 

1066 files_result, folders_result = await asyncio.gather(files_task, folders_task) 

1067 

1068 # Always update the cache with both files and folders for consistency. 

1069 cacheable_objects = list(files_result.values()) + folders_result 

1070 self._get_dirs_and_update_cache(path, cacheable_objects, prefix=prefix) 

1071 

1072 if not withdirs: 

1073 # If not including directories, the final output should only contain files. 

1074 all_objects = list(files_result.values()) 

1075 else: 

1076 all_objects = cacheable_objects 

1077 

1078 all_objects.sort(key=lambda o: o["name"]) 

1079 

1080 # Final filtering and formatting. `all_objects` now contains a complete 

1081 # list of all files and folders. 

1082 if maxdepth: 

1083 depth = path.rstrip("/").count("/") + maxdepth 

1084 all_objects = [o for o in all_objects if o["name"].count("/") <= depth] 

1085 

1086 if detail: 

1087 if versions: 

1088 return { 

1089 ( 

1090 f"{o['name']}#{o['generation']}" 

1091 if "generation" in o 

1092 else o["name"] 

1093 ): o 

1094 for o in all_objects 

1095 } 

1096 return {o["name"]: o for o in all_objects} 

1097 

1098 if versions: 

1099 return [ 

1100 f"{o['name']}#{o['generation']}" if "generation" in o else o["name"] 

1101 for o in all_objects 

1102 ] 

1103 return [o["name"] for o in all_objects] 

1104 

1105 async def _get_all_folders(self, path, bucket, prefix=""): 

1106 """ 

1107 Recursively fetches all folder objects under a given path using the 

1108 Storage Control API. 

1109 """ 

1110 folders = [] 

1111 base_path = self.split_path(path)[1].rstrip("/") 

1112 full_prefix = f"{base_path}/{prefix}".strip("/") if base_path else prefix 

1113 

1114 folder_id = full_prefix 

1115 if folder_id and not folder_id.endswith("/"): 

1116 folder_id += "/" 

1117 parent = f"projects/_/buckets/{bucket}" 

1118 request = storage_control_v2.ListFoldersRequest( 

1119 parent=parent, prefix=folder_id, request_id=str(uuid.uuid4()) 

1120 ) 

1121 logger.debug(f"list_folders request: {request}") 

1122 

1123 client = await self._get_control_plane_client() 

1124 async for folder in await client.list_folders(request=request): 

1125 folders.append(self._create_folder_entry(bucket, folder)) 

1126 

1127 return folders 

1128 

1129 def _create_folder_entry(self, bucket, folder): 

1130 """Helper to create a dictionary representing a folder entry.""" 

1131 path = f"{bucket}/{folder.name.split('/folders/')[1]}".rstrip("/") 

1132 _, key, _ = self.split_path(path) 

1133 return { 

1134 "Key": key, 

1135 "Size": 0, 

1136 "name": path, 

1137 "size": 0, 

1138 "type": "directory", 

1139 "storageClass": "DIRECTORY", 

1140 "ctime": folder.create_time, 

1141 "mtime": folder.update_time, 

1142 "metageneration": folder.metageneration, 

1143 } 

1144 

1145 async def _put_file( 

1146 self, 

1147 lpath, 

1148 rpath, 

1149 metadata=None, 

1150 consistency=None, 

1151 content_type=None, 

1152 chunksize=50 * 2**20, 

1153 callback=None, 

1154 fixed_key_metadata=None, 

1155 mode="overwrite", 

1156 **kwargs, 

1157 ): 

1158 """Upload a local file. 

1159 

1160 This method is optimized for Zonal buckets, using gRPC for uploads. 

1161 In zonal buckets, file is left *unfinalized* by default unless 

1162 `finalize_on_close` is set to True. 

1163 For non-Zonal buckets, it delegates to the parent class's implementation. 

1164 

1165 Parameters 

1166 ---------- 

1167 lpath: str 

1168 Path to the local file to be uploaded. 

1169 rpath: str 

1170 Path on GCS to upload the file to. 

1171 metadata: dict, optional 

1172 Unsupported for Zonal buckets and will be ignored. 

1173 consistency: str, optional 

1174 Unsupported for Zonal buckets and will be ignored. 

1175 content_type: str, optional 

1176 Unsupported for Zonal buckets and will be ignored except for the default. 

1177 chunksize: int, optional 

1178 The size of chunks to upload data in. 

1179 callback: fsspec.callbacks.Callback, optional 

1180 Callback to monitor the upload progress. 

1181 fixed_key_metadata: dict, optional 

1182 Unsupported for Zonal buckets and will be ignored. 

1183 mode: str, optional 

1184 The write mode, either 'overwrite' or 'create'. 

1185 """ 

1186 bucket, key, generation = self.split_path(rpath) 

1187 if not await self._is_zonal_bucket(bucket): 

1188 return await super()._put_file( 

1189 lpath, 

1190 rpath, 

1191 metadata=metadata, 

1192 consistency=consistency, 

1193 content_type=content_type, 

1194 chunksize=chunksize, 

1195 callback=callback, 

1196 fixed_key_metadata=fixed_key_metadata, 

1197 mode=mode, 

1198 **kwargs, 

1199 ) 

1200 

1201 if os.path.isdir(lpath): 

1202 return 

1203 

1204 if generation: 

1205 raise ValueError("Cannot write to specific object generation") 

1206 

1207 if ( 

1208 metadata 

1209 or fixed_key_metadata 

1210 or consistency 

1211 or (content_type and content_type != "application/octet-stream") 

1212 ): 

1213 logger.warning( 

1214 "Zonal buckets do not support content_type, metadata, " 

1215 "fixed_key_metadata or consistency during upload. " 

1216 "These parameters will be ignored." 

1217 ) 

1218 await self._get_grpc_client() 

1219 # Works for both 'overwrite' and 'create' modes 

1220 writer = await zb_hns_utils.init_aaow(self.grpc_client, bucket, key) 

1221 

1222 try: 

1223 with open(lpath, "rb") as f: 

1224 await writer.append_from_file(f, block_size=chunksize) 

1225 finally: 

1226 finalize_on_close = kwargs.get("finalize_on_close", self.finalize_on_close) 

1227 await zb_hns_utils.close_aaow(writer, finalize_on_close=finalize_on_close) 

1228 

1229 self.invalidate_cache(self._parent(rpath)) 

1230 

1231 async def _pipe_file( 

1232 self, 

1233 path, 

1234 data, 

1235 metadata=None, 

1236 consistency=None, 

1237 content_type="application/octet-stream", 

1238 fixed_key_metadata=None, 

1239 chunksize=50 * 2**20, 

1240 mode="overwrite", 

1241 **kwargs, 

1242 ): 

1243 """Upload bytes to a file. 

1244 

1245 This method is optimized for Zonal buckets, using gRPC for uploads. 

1246 In zonal buckets, file is left *unfinalized* by default unless 

1247 `finalize_on_close` is set to True. 

1248 For non-Zonal buckets, it delegates to the parent class's implementation. 

1249 

1250 Parameters 

1251 ---------- 

1252 path: str 

1253 Path to the file to be written. 

1254 data: bytes 

1255 The content to write to the file. 

1256 metadata: dict, optional 

1257 Unsupported for Zonal buckets and will be ignored. 

1258 consistency: str, optional 

1259 Unsupported for Zonal buckets and will be ignored. 

1260 content_type: str, optional 

1261 Unsupported for Zonal buckets and will be ignored, except for the default. 

1262 fixed_key_metadata: dict, optional 

1263 Unsupported for Zonal buckets and will be ignored. 

1264 chunksize: int, optional 

1265 The size of chunks to upload data in. 

1266 mode: str, optional 

1267 The write mode, either 'overwrite' or 'create'. 

1268 """ 

1269 bucket, key, generation = self.split_path(path) 

1270 if not await self._is_zonal_bucket(bucket): 

1271 return await super()._pipe_file( 

1272 path, 

1273 data, 

1274 metadata=metadata, 

1275 consistency=consistency, 

1276 content_type=content_type, 

1277 fixed_key_metadata=fixed_key_metadata, 

1278 chunksize=chunksize, 

1279 mode=mode, 

1280 ) 

1281 

1282 if ( 

1283 metadata 

1284 or fixed_key_metadata 

1285 or (content_type and content_type != "application/octet-stream") 

1286 ): 

1287 logger.warning( 

1288 "Zonal buckets do not support content_type, metadata or " 

1289 "fixed_key_metadata during upload. These parameters will be ignored." 

1290 ) 

1291 await self._get_grpc_client() 

1292 # Works for both 'overwrite' and 'create' modes 

1293 writer = await zb_hns_utils.init_aaow(self.grpc_client, bucket, key) 

1294 try: 

1295 for i in range(0, len(data), chunksize): 

1296 await writer.append(data[i : i + chunksize]) 

1297 finally: 

1298 finalize_on_close = kwargs.get("finalize_on_close", self.finalize_on_close) 

1299 await zb_hns_utils.close_aaow(writer, finalize_on_close=finalize_on_close) 

1300 

1301 self.invalidate_cache(self._parent(path)) 

1302 

1303 async def _get_file(self, rpath, lpath, callback=None, **kwargs): 

1304 """ 

1305 Downloads a file from GCS to a local path. 

1306 

1307 For Zonal buckets, it uses gRPC client for optimized downloads. 

1308 For Standard buckets, it delegates to the parent class implementation. 

1309 

1310 Parameters 

1311 ---------- 

1312 rpath: str 

1313 Path on GCS to download the file from. 

1314 lpath: str 

1315 Path to the local file to be downloaded. 

1316 callback: fsspec.callbacks.Callback, optional 

1317 Callback to monitor the download progress. 

1318 **kwargs: 

1319 For Zonal buckets, `chunksize` bytes (int) can be provided to control 

1320 the download chunk size (default is 128KB). 

1321 """ 

1322 bucket, key, generation = self.split_path(rpath) 

1323 if not await self._is_zonal_bucket(bucket): 

1324 return await super()._get_file( 

1325 rpath, 

1326 lpath, 

1327 callback=callback, 

1328 **kwargs, 

1329 ) 

1330 

1331 if os.path.isdir(lpath): 

1332 return 

1333 callback = callback or NoOpCallback() 

1334 

1335 mrd = None 

1336 try: 

1337 await self._get_grpc_client() 

1338 mrd = await zb_hns_utils.init_mrd(self.grpc_client, bucket, key, generation) 

1339 

1340 size = mrd.persisted_size 

1341 if size is None: 

1342 logger.warning( 

1343 f"AsyncMultiRangeDownloader (MRD) for {rpath} has no 'persisted_size'. " 

1344 "Falling back to _info() to get the file size. " 

1345 "This may result in incorrect behavior for unfinalized objects." 

1346 ) 

1347 size = (await self._info(rpath))["size"] 

1348 callback.set_size(size) 

1349 

1350 lparent = os.path.dirname(lpath) or os.curdir 

1351 os.makedirs(lparent, exist_ok=True) 

1352 

1353 chunksize = kwargs.get("chunksize", 4096 * 32) # 128KB default 

1354 offset = 0 

1355 

1356 with open(lpath, "wb") as f2: 

1357 while True: 

1358 if offset >= size: 

1359 break 

1360 

1361 data = await zb_hns_utils.download_range( 

1362 offset=offset, length=chunksize, mrd=mrd 

1363 ) 

1364 if not data: 

1365 break 

1366 

1367 f2.write(data) 

1368 offset += len(data) 

1369 callback.relative_update(len(data)) 

1370 except Exception as e: 

1371 # Clean up the corrupted file before raising error 

1372 if os.path.exists(lpath): 

1373 os.remove(lpath) 

1374 raise e 

1375 finally: 

1376 await zb_hns_utils.close_mrd(mrd) 

1377 

1378 async def _do_list_objects( 

1379 self, 

1380 path, 

1381 max_results=None, 

1382 delimiter="/", 

1383 prefix="", 

1384 versions=False, 

1385 **kwargs, 

1386 ): 

1387 """ 

1388 Lists objects in a bucket. 

1389 

1390 For HNS-enabled buckets, it sets `includeFoldersAsPrefixes` to True 

1391 when the delimiter is '/'. 

1392 """ 

1393 bucket, _, _ = self.split_path(path) 

1394 if await self._is_bucket_hns_enabled(bucket) and delimiter == "/": 

1395 kwargs["includeFoldersAsPrefixes"] = "true" 

1396 

1397 return await super()._do_list_objects( 

1398 path, 

1399 max_results=max_results, 

1400 delimiter=delimiter, 

1401 prefix=prefix, 

1402 versions=versions, 

1403 **kwargs, 

1404 ) 

1405 

1406 async def _cp_file(self, path1, path2, acl=None, **kwargs): 

1407 """Duplicate remote file. 

1408 

1409 For Standard GCS buckets, falls back to the parent class's implementation 

1410 

1411 Zonal Bucket Support: 

1412 Server-side copy is currently NOT supported for Zonal buckets because 

1413 the `RewriteObject` API is unavailable for them. 

1414 

1415 The following scenarios will raise a `NotImplementedError`: 

1416 * Intra-zonal: Copying within the same Zonal bucket. 

1417 * Inter-zonal: Copying between two different Zonal buckets. 

1418 * Mixed: Copying between a Zonal bucket and a Standard bucket. 

1419 

1420 """ 

1421 b1, _, _ = self.split_path(path1) 

1422 b2, _, _ = self.split_path(path2) 

1423 

1424 is_zonal_source, is_zonal_dest = await asyncio.gather( 

1425 self._is_zonal_bucket(b1), self._is_zonal_bucket(b2) 

1426 ) 

1427 

1428 # 1. Standard -> Standard (Delegate to core implementation) 

1429 if not is_zonal_source and not is_zonal_dest: 

1430 return await super()._cp_file(path1, path2, acl=acl, **kwargs) 

1431 

1432 # 2. Zonal Scenarios (Currently Unsupported) 

1433 raise NotImplementedError( 

1434 "Server-side copy involving Zonal buckets is not supported. " 

1435 "Zonal objects do not support rewrite." 

1436 ) 

1437 

1438 async def _merge(self, path, paths, acl=None): 

1439 """Concatenate objects within a single bucket. 

1440 

1441 For Standard GCS buckets, falls back to the parent class's implementation 

1442 

1443 Zonal Bucket Support: 

1444 Server-side compose is currently NOT supported for Zonal buckets. 

1445 """ 

1446 bucket, _, _ = self.split_path(path) 

1447 

1448 if await self._is_zonal_bucket(bucket): 

1449 raise NotImplementedError( 

1450 "Server-side compose/merge is not supported for Zonal buckets." 

1451 ) 

1452 

1453 return await super()._merge(path, paths, acl=acl) 

1454 

1455 merge = asyn.sync_wrapper(_merge) 

1456 

1457 

async def upload_chunk(fs, location, data, offset, size, content_type):
    """
    Uploads a chunk of data using AsyncAppendableObjectWriter for zonal buckets.
    Finalizes the upload when the total uploaded data size reaches the specified size.
    Delegates to core upload_chunk implementation for Non-Zonal buckets.

    Parameters
    ----------
    fs: GCSFileSystem
        The GCS filesystem instance; only used when delegating.
    location: str, bytes or AsyncAppendableObjectWriter
        A str/bytes value is forwarded to the core ``upload_chunk``
        (resumable-upload URL); a writer instance selects the Zonal path.
    data: bytes
        The chunk to append.
    offset: int
        Ignored on the Zonal path (a warning is logged when set).
    size: int
        Total expected size; once the writer's offset reaches it, the writer
        is closed with finalization.
    content_type: str
        Ignored on the Zonal path (a warning is logged when set).

    Raises
    ------
    TypeError
        If ``location`` is neither str/bytes nor an AsyncAppendableObjectWriter.
    ValueError
        If the writer's stream is no longer open.
    """
    # If `location` is an HTTP resumable-upload URL (string), delegate to core upload_chunk
    # for Standard buckets.
    if isinstance(location, (str, bytes)):
        from gcsfs.core import upload_chunk as core_upload_chunk

        return await core_upload_chunk(fs, location, data, offset, size, content_type)

    if not isinstance(location, AsyncAppendableObjectWriter):
        raise TypeError(
            "upload_chunk for Zonal buckets expects an AsyncAppendableObjectWriter instance."
        )

    # `size` is deliberately not part of this check: it is honored below to
    # decide when to finalize, so it must not trigger the "ignored" warning.
    if offset or content_type:
        logger.warning(
            "Zonal buckets do not support offset, or content_type during upload. These parameters will be ignored."
        )

    if not location._is_stream_open:
        raise ValueError("Writer is closed. Please initiate a new upload.")

    try:
        await location.append(data)
    except Exception as e:
        logger.error(
            f"Error uploading chunk at offset {location.offset}: {e}. Closing stream."
        )
        # Don't finalize the upload on error
        await zb_hns_utils.close_aaow(location, finalize_on_close=False)
        raise

    if (location.offset or 0) >= size:
        logger.debug("Uploaded data is equal or greater than size. Finalizing upload.")
        await zb_hns_utils.close_aaow(location, finalize_on_close=True)

1497 

1498 

async def initiate_upload(
    fs,
    bucket,
    key,
    content_type="application/octet-stream",
    metadata=None,
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Start an upload and return the handle used for subsequent chunks.

    For a bucket that is not Zonal, the core ``initiate_upload`` is called with
    the same arguments and its result is returned. For a Zonal bucket a writer
    is created through ``zb_hns_utils.init_aaow`` and returned.

    Parameters
    ----------
    fs: GCSFileSystem
        The GCS filesystem instance.
    bucket: str
        The target bucket name.
    key: str
        The target object key.
    content_type: str, optional
        Unsupported for Zonal buckets and will be ignored, except for the default.
    metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    fixed_key_metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    mode: str, optional
        The write mode, either 'overwrite' or 'create'.
    kms_key_name: str, optional
        Unsupported for Zonal buckets and will be ignored.
    """
    zonal = await fs._is_zonal_bucket(bucket)
    if not zonal:
        from gcsfs.core import initiate_upload as core_initiate_upload

        return await core_initiate_upload(
            fs,
            bucket,
            key,
            content_type,
            metadata,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    custom_content_type = content_type and content_type != "application/octet-stream"
    if metadata or fixed_key_metadata or kms_key_name or custom_content_type:
        logger.warning(
            "Zonal buckets do not support content_type, metadata, fixed_key_metadata, "
            "or kms_key_name during upload. These parameters will be ignored."
        )

    await fs._get_grpc_client()
    # If generation is not passed to init_aaow, it creates a new object and overwrites if object already exists.
    # Hence it works for both 'overwrite' and 'create' modes.
    writer = await zb_hns_utils.init_aaow(fs.grpc_client, bucket, key)
    return writer

1561 

1562 

async def simple_upload(
    fs,
    bucket,
    key,
    datain,
    metadatain=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
    **kwargs,
):
    """
    Upload ``datain`` in a single append.

    For a bucket that is not Zonal, the core ``simple_upload`` is called with
    the same positional arguments (extra ``kwargs`` are not forwarded). For a
    Zonal bucket the data is appended to a freshly created writer, which is
    then closed with ``finalize_on_close`` taken from ``kwargs`` or, if absent,
    from ``fs.finalize_on_close`` (False when the attribute is missing).

    Parameters
    ----------
    fs: GCSFileSystem
        The GCS filesystem instance.
    bucket: str
        The target bucket name.
    key: str
        The target object key.
    datain: bytes
        The data to be uploaded.
    metadatain: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    consistency: str, optional
        Unsupported for Zonal buckets and will be ignored.
    content_type: str, optional
        Unsupported for Zonal buckets and will be ignored, except for the default.
    fixed_key_metadata: dict, optional
        Unsupported for Zonal buckets and will be ignored.
    mode: str, optional
        The write mode, either 'overwrite' or 'create'.
    kms_key_name: str, optional
        Unsupported for Zonal buckets and will be ignored.
    """
    zonal = await fs._is_zonal_bucket(bucket)
    if not zonal:
        from gcsfs.core import simple_upload as core_simple_upload

        return await core_simple_upload(
            fs,
            bucket,
            key,
            datain,
            metadatain,
            consistency,
            content_type,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    custom_content_type = content_type and content_type != "application/octet-stream"
    if (
        metadatain
        or fixed_key_metadata
        or kms_key_name
        or consistency
        or custom_content_type
    ):
        logger.warning(
            "Zonal buckets do not support content_type, metadatain, fixed_key_metadata, "
            "consistency or kms_key_name during upload. These parameters will be ignored."
        )

    await fs._get_grpc_client()
    # If generation is not passed to init_aaow, it creates a new object and overwrites if object already exists.
    # Hence it works for both 'overwrite' and 'create' modes.
    writer = await zb_hns_utils.init_aaow(fs.grpc_client, bucket, key)
    try:
        await writer.append(datain)
    finally:
        # The writer is closed whether or not the append succeeded.
        should_finalize = kwargs.get(
            "finalize_on_close", getattr(fs, "finalize_on_close", False)
        )
        await zb_hns_utils.close_aaow(writer, finalize_on_close=should_finalize)