Coverage for gcsfs/core.py: 80%

1024 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1""" 

2Google Cloud Storage pythonic interface 

3""" 

4 

5import asyncio 

6import io 

7import json 

8import logging 

9import mimetypes 

10import os 

11import posixpath 

12import re 

13import uuid 

14import warnings 

15import weakref 

16from datetime import datetime, timedelta 

17from glob import has_magic 

18from urllib.parse import parse_qs 

19from urllib.parse import quote as quote_urllib 

20from urllib.parse import urlsplit 

21 

22import aiohttp 

23import fsspec 

24from fsspec import asyn 

25from fsspec.callbacks import NoOpCallback 

26from fsspec.implementations.http import get_client 

27from fsspec.utils import other_paths, setup_logging, stringify_path 

28 

29from . import __version__ as version 

30from .checkers import get_consistency_checker 

31from .concurrency import parallel_tasks_first_completed 

32from .credentials import GoogleCredentials 

33from .inventory_report import InventoryReport 

34from .retry import errs, retry_request, validate_response 

35from .zb_hns_utils import DEFAULT_CONCURRENCY, MAX_PREFETCH_SIZE 

36 

# Package-wide logger shared by the gcsfs modules.
logger = logging.getLogger("gcsfs")

# Opt-in debug logging: the value of GCSFS_DEBUG is used as the log level.
if "GCSFS_DEBUG" in os.environ:
    setup_logging(logger=logger, level=os.environ["GCSFS_DEBUG"])

42 

43 

# client created 2018-01-16
# Predefined ACL names accepted for objects.
ACLs = {
    "authenticatedread",
    "bucketownerfullcontrol",
    "bucketownerread",
    "private",
    "projectprivate",
    "publicread",
}
# Predefined ACL names accepted for buckets.
bACLs = {
    "authenticatedRead",
    "private",
    "projectPrivate",
    "publicRead",
    "publicReadWrite",
}
# Project used when the caller does not pass one explicitly.
DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "")

# Block-size bounds and default, in bytes.
GCS_MIN_BLOCK_SIZE = 1 << 18  # 256 KiB
GCS_MAX_BLOCK_SIZE = 1 << 28  # 256 MiB
DEFAULT_BLOCK_SIZE = 5 * (1 << 20)  # 5 MiB

# Maps snake_case keyword names to the camelCase keys of the GCS object resource.
SUPPORTED_FIXED_KEY_METADATA = {
    "content_encoding": "contentEncoding",
    "cache_control": "cacheControl",
    "content_disposition": "contentDisposition",
    "content_language": "contentLanguage",
    "custom_time": "customTime",
}

# Define allowed parameters for the GCS list API
_VALID_LIST_PARAMS = {
    "delimiter",
    "prefix",
    "startOffset",
    "endOffset",
    "maxResults",
    "versions",
    "pageToken",
    "includeFoldersAsPrefixes",
}

85 

86 

def quote(s):
    """
    Percent-encode a string for use inside a URL path.

    No character is treated as safe, so '/' is encoded as well.

    Parameters
    ----------
    s: input URL/portion

    Returns
    -------
    corrected URL
    """
    # An empty ``safe`` set means slashes are escaped too.
    encoded = quote_urllib(s, safe="")
    return encoded

102 

103 

def norm_path(path):
    """
    Return ``path`` in canonical '{bucket}/{name}' form.

    Used by petastorm, do not remove.
    """
    # The third element (generation) is not part of the canonical form.
    bucket_name, object_name, _generation = GCSFileSystem._split_path(path)
    return "/".join([bucket_name, object_name])

112 

113 

114async def _req_to_text(r): 

115 async with r: 

116 return (await r.read()).decode() 

117 

118 

class UnclosableBytesIO(io.BytesIO):
    """BytesIO whose ``close`` rewinds instead of closing, so retries can reuse it."""

    def close(self):
        """Rewind to the start of the buffer; the stream stays open."""
        self.seek(0, io.SEEK_SET)

125 

126 

127def _gcp_universe_domain(): 

128 return os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN", "googleapis.com") 

129 

130 

131def _location(): 

132 """ 

133 Resolves GCS HTTP location as http[s]://host 

134 

135 Enables storage emulation for integration tests. 

136 

137 Returns 

138 ------- 

139 valid http location 

140 """ 

141 _emulator_location = os.getenv("STORAGE_EMULATOR_HOST", "") 

142 if _emulator_location not in {"default", "", None}: 

143 if not any( 

144 _emulator_location.startswith(scheme) for scheme in ("http://", "https://") 

145 ): 

146 _emulator_location = f"http://{_emulator_location}" 

147 return _emulator_location 

148 

149 return f"https://storage.{_gcp_universe_domain()}" 

150 

151 

152def _chunks(lst, n): 

153 """ 

154 Yield evenly-sized chunks from a list. 

155 

156 Implementation based on https://stackoverflow.com/a/312464. 

157 """ 

158 for i in range(0, len(lst), n): 

159 yield lst[i : i + n] 

160 

161 

162def _coalesce_generation(*args): 

163 """Helper to coalesce a list of object generations down to one.""" 

164 generations = set(args) 

165 if None in generations: 

166 generations.remove(None) 

167 if len(generations) > 1: 

168 raise ValueError( 

169 "Cannot coalesce generations where more than one are defined," 

170 f" {generations}" 

171 ) 

172 elif len(generations) == 0: 

173 return None 

174 else: 

175 return generations.pop() 

176 

177 

178def _is_directory_marker(entry): 

179 return entry["size"] == 0 and entry["name"].endswith("/") 

180 

181 

182class GCSFileSystem(asyn.AsyncFileSystem): 

183 r""" 

184 Connect to Google Cloud Storage. 

185 

186 The following modes of authentication are supported: 

187 

188 - ``token=None``, GCSFS will attempt to guess your credentials in the 

189 following order: gcloud CLI default, gcsfs cached token, google compute 

190 metadata service, anonymous. 

191 - ``token='google_default'``, your default gcloud credentials will be used, 

192 which are typically established by doing ``gcloud login`` in a terminal. 

193 - ``token='cache'``, credentials from previously successful gcsfs 

194 authentication will be used (use this after "browser" auth succeeded) 

195 - ``token='anon'``, no authentication is performed, and you can only 

196 access data which is accessible to allUsers (in this case, the project and 

197 access level parameters are meaningless) 

198 - ``token='browser'``, you get an access code with which you can 

199 authenticate via a specially provided URL 

200 - if ``token='cloud'``, we assume we are running within google compute 

201 or google container engine, and query the internal metadata directly for 

202 a token. 

203 - you may supply a token generated by the 

204 [gcloud](https://cloud.google.com/sdk/docs/) 

205 utility; this is either a python dictionary, the name of a file 

206 containing the JSON returned by logging in with the gcloud CLI tool, 

207 or a Credentials object. gcloud typically stores its tokens in locations 

208 such as 

209 ``~/.config/gcloud/application_default_credentials.json``, 

210 ``~/.config/gcloud/credentials``, or 

211 ``~\AppData\Roaming\gcloud\credentials``, etc. 

212 

213 Specific methods, (eg. ``ls``, ``info``, ...) may return object details from GCS. 

214 These detailed listings include the 

215 [object resource](https://cloud.google.com/storage/docs/json_api/v1/objects#resource) 

216 

217 GCS *does not* include "directory" objects but instead generates 

218 directories by splitting 

219 [object names](https://cloud.google.com/storage/docs/key-terms). 

220 This means that, for example, 

221 a directory does not need to exist for an object to be created within it. 

222 Creating an object implicitly creates its parent directories, and removing 

223 all objects from a directory implicitly deletes the empty directory. 

224 

225 `GCSFileSystem` generates listing entries for these implied directories in 

226 listing apis with the object properties: 

227 

228 - "name" : string 

229 The "{bucket}/{name}" path of the dir, used in calls to 

230 GCSFileSystem or GCSFile. 

231 - "bucket" : string 

232 The name of the bucket containing this object. 

233 - "kind" : 'storage#object' 

234 - "size" : 0 

235 - "storageClass" : 'DIRECTORY' 

236 - type: 'directory' (fsspec compat) 

237 

238 GCSFileSystem maintains a per-implied-directory cache of object listings and 

239 fulfills all object information and listing requests from cache. This implies, for example, that objects 

240 created via other processes *will not* be visible to the GCSFileSystem until the cache 

241 is refreshed. Calls to GCSFileSystem.open and calls to GCSFile are not affected by this cache. 

242 

243 Note that directory listings are cached by default, because fetching those listings can be expensive. This is 

244 contrary to local filesystem behaviour. The cache will be cleared if writing from this instance, but it can 

245 become stale and return incorrect results if the storage is written to from another process/machine. 

246 If you anticipate this possibility, you can set the use_listings_cache and listings_expiry_time arguments 

247 to configure the caching, call `.invalidate_cache()` when required, or pass `refresh=True` to the 

248 various listing methods. 

249 

250 In the default case the cache is never expired. This may be controlled via the ``cache_timeout`` 

251 GCSFileSystem parameter or via explicit calls to ``GCSFileSystem.invalidate_cache``. 

252 

253 NOTE on "exclusive" mode: mode="create" (in pipe and put) and open(mode="xb") are supported on an 

254 experimental basis. The test harness does not currently support this, so use at your 

255 own risk. 

256 

257 Parameters 

258 ---------- 

259 project : string 

260 project_id to work under. Note that this is not the same as, but often 

261 very similar to, the project name. 

262 This is required in order 

263 to list all the buckets you have access to within a project and to 

264 create/delete buckets, or update their access policies. 

265 If ``token='google_default'``, the value is overridden by the default, 

266 if ``token='anon'``, the value is ignored. 

267 access : one of {'read_only', 'read_write', 'full_control'} 

268 Full control implies read/write as well as modifying metadata, 

269 e.g., access control. 

270 token: None, dict or string 

271 (see description of authentication methods, above) 

272 consistency: 'none', 'size', 'md5' 

273 Check method when writing files. Can be overridden in open(). 

274 cache_timeout: float, seconds 

275 Cache expiration time in seconds for object metadata cache. 

276 Set cache_timeout <= 0 for no caching, None for no cache expiration. 

277 secure_serialize: bool (deprecated) 

278 requester_pays : bool, or str default False 

279 Whether to use requester-pays requests. This will include your 

280 project ID `project` in requests as the `userProject`, and you'll be 

281 billed for accessing data from requester-pays buckets. Optionally, 

282 pass a project-id here as a string to use that as the `userProject`. 

283 session_kwargs: dict 

284 passed on to ``aiohttp.ClientSession``; can contain, for example, 

285 proxy settings. 

286 endpoint_url: str 

287 If given, use this URL (format protocol://host:port , *without* any 

288 path part) for communication. If not given, defaults to the value 

289 of environment variable "STORAGE_EMULATOR_HOST"; if that is not set 

290 either, will use the standard Google endpoint. 

291 default_location: str 

292 Default location where buckets are created, like 'US' or 'EUROPE-WEST3'. 

293 You can find a list of all available locations here: 

294 https://cloud.google.com/storage/docs/locations#available-locations 

295 version_aware: bool 

296 Whether to support object versioning. If enabled this will require the 

297 user to have the necessary permissions for dealing with versioned objects. 

298 """ 

299 

# Accepted values for the ``access`` constructor argument.
scopes = {"read_only", "read_write", "full_control"}
retries = 6  # number of retries on http failure
default_block_size = DEFAULT_BLOCK_SIZE
# URL schemes handled by this filesystem.
protocol = ("gs", "gcs")
async_impl = True
MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 2**20  # 5 MiB

306 

def __init__(
    self,
    project=DEFAULT_PROJECT,
    access="full_control",
    token=None,
    block_size=None,
    consistency="none",
    cache_timeout=None,
    secure_serialize=True,
    check_connection=None,
    requests_timeout=None,
    requester_pays=False,
    asynchronous=False,
    session_kwargs=None,
    loop=None,
    timeout=None,
    endpoint_url=None,
    default_location=None,
    version_aware=False,
    **kwargs,
):
    """Store the connection settings and build the credentials object.

    See the class docstring for the meaning of each parameter. No HTTP
    session is created here; ``self._session`` starts as ``None``.

    Raises
    ------
    ValueError
        If ``access`` is not one of ``self.scopes``.
    """
    # ``cache_timeout`` is forwarded to the base class under the name
    # ``listings_expiry_time``.
    if cache_timeout is not None:
        kwargs["listings_expiry_time"] = cache_timeout
    # NOTE(review): ``self`` is also passed as a positional argument here,
    # as in the original code; it looks redundant - confirm before removing.
    super().__init__(
        self,
        asynchronous=asynchronous,
        loop=loop,
        **kwargs,
    )
    if access not in self.scopes:
        # The message used to be passed unformatted next to the scopes,
        # leaving a literal "{}" in the error text.
        raise ValueError(f"access must be one of {sorted(self.scopes)}")
    if project is None:
        warnings.warn("GCS project not set - cannot list or create buckets")
    if block_size is not None:
        self.default_block_size = block_size
    self.requester_pays = requester_pays
    self.consistency = consistency
    self.cache_timeout = cache_timeout or kwargs.pop("listings_expiry_time", None)
    self.requests_timeout = requests_timeout
    self.timeout = timeout
    self._session = None
    self._endpoint = endpoint_url
    self.session_kwargs = session_kwargs or {}
    self.default_location = default_location
    self.version_aware = version_aware

    if check_connection:
        warnings.warn(
            "The `check_connection` argument is deprecated and will be removed in a future release.",
            DeprecationWarning,
            stacklevel=2,
        )

    self.credentials = GoogleCredentials(
        project, access, token, on_google=self.on_google
    )

362 

363 @property 

364 def _location(self): 

365 return self._endpoint or _location() 

366 

@property
def base(self):
    """Root URL of the storage v1 JSON API, with a trailing slash."""
    host = self._location
    return f"{host}/storage/v1/"

370 

@property
def batch_url_base(self):
    """Root URL of the storage v1 batch endpoint (no trailing slash)."""
    host = self._location
    return f"{host}/batch/storage/v1"

374 

@property
def project(self):
    """Project id as reported by the credentials object."""
    credentials = self.credentials
    return credentials.project

378 

# Clean up the aiohttp session
#
# This can run from the main thread if invoked via the weakref callback.
# This can happen even if the `loop` parameter belongs to another thread
# (e.g. the fsspec IO worker). The control flow here is intended to attempt
# in-thread asynchronous cleanup first, then fallback to synchronous
# cleanup (which can handle cross-thread calls).
@staticmethod
def close_session(loop, session: aiohttp.ClientSession, asynchronous=False):
    """Close ``session``, scheduling the close on a running loop when one exists.

    Falls back to closing the session's connector directly when no running
    loop can take the close, or when the synchronous close times out.
    """
    if session.closed:
        return

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    hard_close = False
    if loop:
        # an explicit loop was set
        if loop.is_running():
            loop.create_task(session.close())
        else:
            hard_close = True
    elif running_loop is not None and running_loop.is_running() and asynchronous:
        # running in a concurrent context
        running_loop.create_task(session.close())
    else:
        shared_loop = asyn.loop[0]
        if shared_loop is not None and shared_loop.is_running():
            try:
                asyn.sync(shared_loop, session.close, timeout=0.1)
            except fsspec.FSTimeoutError:
                hard_close = True
        else:
            hard_close = True

    if not hard_close:
        return

    # during shutdown, this is the fallback
    connector = getattr(session, "_connector", None)
    if connector is not None:
        # close after loop is dead
        connector._close()

417 

418 async def _set_session(self): 

419 if self._session is None: 

420 self._session = await get_client(**self.session_kwargs) 

421 weakref.finalize( 

422 self, self.close_session, self.loop, self._session, self.asynchronous 

423 ) 

424 return self._session 

425 

@property
def session(self):
    """Current HTTP session, or None when not created yet.

    Raises
    ------
    RuntimeError
        In asynchronous mode when no session has been created.
    """
    current = self._session
    if self.asynchronous and current is None:
        raise RuntimeError("Please await _connect* before anything else")
    return current

431 

@classmethod
def _strip_protocol(cls, path):
    """Remove a leading ``<protocol>://`` or ``<protocol>::`` from ``path``.

    Lists are handled element-wise. An empty result is replaced by
    ``cls.root_marker``.
    """
    if isinstance(path, list):
        return [cls._strip_protocol(item) for item in path]

    path = stringify_path(path)
    schemes = cls.protocol
    if isinstance(schemes, str):
        schemes = (schemes,)
    for scheme in schemes:
        url_prefix = scheme + "://"
        chain_prefix = scheme + "::"
        if path.startswith(url_prefix):
            path = path[len(url_prefix) :]
        elif path.startswith(chain_prefix):
            path = path[len(chain_prefix) :]
    # use of root_marker to make minimum required path, e.g., "/"
    return path or cls.root_marker

445 

@classmethod
def _get_kwargs_from_urls(cls, path):
    """Return ``{"version_aware": True}`` when ``path`` carries a generation, else ``{}``."""
    _bucket, _key, generation = cls._split_path(path, version_aware=True)
    if generation is None:
        return {}
    return {"version_aware": True}

452 

453 def _get_params(self, kwargs): 

454 params = {k: v for k, v in kwargs.items() if v is not None} 

455 # needed for requester pays buckets 

456 if self.requester_pays: 

457 if isinstance(self.requester_pays, str): 

458 user_project = self.requester_pays 

459 else: 

460 user_project = self.project 

461 params["userProject"] = user_project 

462 return params 

463 

464 def _get_headers(self, headers): 

465 out = {} 

466 if headers is not None: 

467 out.update(headers) 

468 if "User-Agent" not in out: 

469 out["User-Agent"] = "python-gcsfs/" + version 

470 self.credentials.apply(out) 

471 return out 

472 

473 def _format_path(self, path, args): 

474 if not path.startswith("http"): 

475 path = self.base + path 

476 

477 if args: 

478 path = path.format(*[quote(p) for p in args]) 

479 return path 

480 

@retry_request(retries=retries)
async def _request(
    self, method, path, *args, headers=None, json=None, data=None, **kwargs
):
    """Send one HTTP request and return ``(status, headers, info, contents)``.

    ``path`` and ``args`` are combined by ``_format_path``; remaining keyword
    arguments become query parameters via ``_get_params``. The response is
    passed to ``validate_response`` before returning.
    """
    await self._set_session()
    # Seekable payloads are rewound before every send.
    if hasattr(data, "seek"):
        data.seek(0)

    pending = self.session.request(
        method=method,
        url=self._format_path(path, args),
        params=self._get_params(kwargs),
        json=json,
        headers=self._get_headers(headers),
        data=data,
        timeout=self.requests_timeout,
    )
    async with pending as response:
        status = response.status
        headers = response.headers
        info = response.request_info  # for debug only
        contents = await response.read()

    validate_response(status, contents, path, args)
    return status, headers, info, contents

504 

async def _call(
    self, method, path, *args, json_out=False, info_out=False, **kwargs
):
    """Perform a request and return the part of the response the caller asked for.

    Returns the JSON-decoded body when ``json_out`` is set, the request info
    when ``info_out`` is set, and ``(headers, contents)`` otherwise.
    """
    logger.debug(f"{method.upper()}: {path}, {args}, {kwargs.get('headers')}")
    _status, headers, info, contents = await self._request(
        method, path, *args, **kwargs
    )
    if json_out:
        return json.loads(contents)
    if info_out:
        return info
    return headers, contents

# Blocking counterpart of ``_call``.
call = asyn.sync_wrapper(_call)

520 

@property
def buckets(self):
    """Return list of available project buckets."""
    listing = asyn.sync(self.loop, self._list_buckets, timeout=self.timeout)
    return [entry["name"] for entry in listing]

528 

529 def _process_object(self, bucket, object_metadata): 

530 """Process object resource into gcsfs object information format. 

531 

532 Process GCS object resource via type casting and attribute updates to 

533 the cache-able gcsfs object information format. Returns an updated copy 

534 of the object resource. 

535 

536 (See https://cloud.google.com/storage/docs/json_api/v1/objects#resource) 

537 """ 

538 result = dict(object_metadata) 

539 result["size"] = int(object_metadata.get("size", 0)) 

540 result["name"] = posixpath.join(bucket, object_metadata["name"]) 

541 result["type"] = "file" 

542 # Translate time metadata from GCS names to fsspec standard names. 

543 # TODO(issues/559): Remove legacy names `updated` and `timeCreated`? 

544 if "updated" in object_metadata: 

545 result["mtime"] = self._parse_timestamp(object_metadata["updated"]) 

546 if "timeCreated" in object_metadata: 

547 result["ctime"] = self._parse_timestamp(object_metadata["timeCreated"]) 

548 if "generation" in object_metadata or "metageneration" in object_metadata: 

549 result["generation"] = object_metadata.get("generation") 

550 result["metageneration"] = object_metadata.get("metageneration") 

551 

552 return result 

553 

async def _make_bucket_requester_pays(self, path, state=True):
    """PATCH bucket ``path`` so that its ``billing.requesterPays`` flag equals ``state``."""
    # this is really some form of setACL/chmod
    # perhaps should be automatic if gcs.requester_pays
    billing_patch = {"billing": {"requesterPays": state}}
    await self._call("PATCH", f"b/{path}", json=billing_patch)

# Blocking counterpart of ``_make_bucket_requester_pays``.
make_bucket_requester_pays = asyn.sync_wrapper(_make_bucket_requester_pays)

561 

562 async def _get_object(self, path): 

563 """Return object information at the given path.""" 

564 bucket, key, generation = self.split_path(path) 

565 

566 # Check if parent dir is in listing cache 

567 listing = self._ls_from_cache(path) 

568 if listing: 

569 name = "/".join((bucket, key)) 

570 for file_details in listing: 

571 if ( 

572 file_details["type"] == "file" 

573 and file_details["name"] == name 

574 and (not generation or file_details.get("generation") == generation) 

575 ): 

576 return file_details 

577 else: 

578 raise FileNotFoundError(path) 

579 

580 if not key: 

581 # Attempt to "get" the bucket root, return error instead of 

582 # listing. 

583 raise FileNotFoundError(path) 

584 

585 res = None 

586 # Work around various permission settings. Prefer an object get (storage.objects.get), but 

587 # fall back to a bucket list + filter to object name (storage.objects.list). 

588 try: 

589 res = await self._call( 

590 "GET", "b/{}/o/{}", bucket, key, json_out=True, generation=generation 

591 ) 

592 except OSError as e: 

593 if not str(e).startswith("Forbidden"): 

594 raise 

595 resp = await self._call( 

596 "GET", 

597 "b/{}/o", 

598 bucket, 

599 json_out=True, 

600 prefix=key, 

601 maxResults=1 if not generation else None, 

602 versions="true" if generation else None, 

603 ) 

604 for item in resp.get("items", []): 

605 if item["name"] == key and ( 

606 not generation or item.get("generation") == generation 

607 ): 

608 res = item 

609 break 

610 if res is None: 

611 raise FileNotFoundError(path) 

612 return self._process_object(bucket, res) 

613 

614 async def _list_objects(self, path, prefix="", versions=False, **kwargs): 

615 bucket, key, generation = self.split_path(path) 

616 path = path.rstrip("/") 

617 

618 # NOTE: the inventory report logic is experimental. 

619 inventory_report_info = kwargs.get("inventory_report_info", None) 

620 

621 # Only attempt to list from the cache when the user does not use 

622 # the inventory report service. 

623 if not inventory_report_info: 

624 try: 

625 clisting = self._ls_from_cache(path) 

626 hassubdirs = clisting and any( 

627 c["name"].rstrip("/") == path and c["type"] == "directory" 

628 for c in clisting 

629 ) 

630 if clisting and not hassubdirs: 

631 return clisting 

632 except FileNotFoundError: 

633 # not finding a bucket in list of "my" buckets is OK 

634 if key: 

635 raise 

636 

637 items, prefixes = await self._do_list_objects( 

638 path, 

639 prefix=prefix, 

640 versions=versions, 

641 **kwargs, 

642 ) 

643 

644 pseudodirs = [ 

645 { 

646 "bucket": bucket, 

647 "name": bucket + "/" + prefix.strip("/"), 

648 "size": 0, 

649 "storageClass": "DIRECTORY", 

650 "type": "directory", 

651 } 

652 for prefix in prefixes 

653 ] 

654 if not (items + pseudodirs): 

655 if key: 

656 return [await self._get_object(path)] 

657 else: 

658 return [] 

659 out = pseudodirs + items 

660 

661 use_snapshot_listing = inventory_report_info and inventory_report_info.get( 

662 "use_snapshot_listing" 

663 ) 

664 

665 max_results = kwargs.get("max_results") 

666 

667 # Don't cache prefixed/partial listings, in addition to 

668 # not using the inventory report service to do listing directly. 

669 if not prefix and not use_snapshot_listing and not max_results: 

670 self.dircache[path] = out 

671 return out 

672 

673 async def _do_list_objects( 

674 self, 

675 path, 

676 max_results=None, 

677 delimiter="/", 

678 prefix="", 

679 versions=False, 

680 **kwargs, 

681 ): 

682 """Object listing for the given {bucket}/{prefix}/ path.""" 

683 bucket, _path, generation = self.split_path(path) 

684 _path = "" if not _path else _path.rstrip("/") + "/" 

685 prefix = f"{_path}{prefix}" or None 

686 versions = bool(versions or generation) 

687 

688 # Page size of 5000 is officially supported across GCS. 

689 default_page_size = 5000 

690 

691 # NOTE: the inventory report logic is experimental. 

692 inventory_report_info = kwargs.get("inventory_report_info", None) 

693 

694 # Check if the user has configured inventory report option. 

695 if inventory_report_info is not None: 

696 items, prefixes = await InventoryReport.fetch_snapshot( 

697 gcs_file_system=self, 

698 inventory_report_info=inventory_report_info, 

699 prefix=prefix, 

700 ) 

701 

702 use_snapshot_listing = inventory_report_info.get("use_snapshot_listing") 

703 

704 # If the user wants to rely on the snapshot from the inventory report 

705 # for listing, directly return the results. 

706 if use_snapshot_listing: 

707 return items, prefixes 

708 

709 # Otherwise, use the snapshot to initiate concurrent listing. 

710 return await self._concurrent_list_objects_helper( 

711 items=items, 

712 bucket=bucket, 

713 delimiter=delimiter, 

714 prefix=prefix, 

715 versions=versions, 

716 page_size=default_page_size, 

717 **kwargs, 

718 ) 

719 

720 # If the user has not configured inventory report, proceed to use 

721 # sequential listing. 

722 else: 

723 return await self._sequential_list_objects_helper( 

724 bucket=bucket, 

725 delimiter=delimiter, 

726 start_offset=None, 

727 end_offset=None, 

728 prefix=prefix, 

729 versions=versions, 

730 max_results=max_results, 

731 **kwargs, 

732 ) 

733 

734 async def _concurrent_list_objects_helper( 

735 self, items, bucket, delimiter, prefix, versions, page_size, **kwargs 

736 ): 

737 """ 

738 Lists objects using coroutines, using the object names from the inventory 

739 report to split up the ranges. 

740 """ 

741 

742 # Extract out the names of the objects fetched from the inventory report. 

743 snapshot_object_names = sorted([item["name"] for item in items]) 

744 

745 # Determine the number of coroutines needed to concurrent listing. 

746 # Ideally, want each coroutine to fetch a single page of objects. 

747 num_coroutines = len(snapshot_object_names) // page_size + 1 

748 num_objects_per_coroutine = len(snapshot_object_names) // num_coroutines 

749 

750 start_offsets = [] 

751 end_offsets = [] 

752 

753 # Calculate the split splits of each coroutine (start offset and end offset). 

754 for i in range(num_coroutines): 

755 range_start = i * num_objects_per_coroutine 

756 if i == num_coroutines - 1: 

757 range_end = len(snapshot_object_names) 

758 else: 

759 range_end = range_start + num_objects_per_coroutine 

760 

761 if range_start == 0: 

762 prefix_start = None 

763 else: 

764 prefix_start = snapshot_object_names[range_start] 

765 

766 if range_end == len(snapshot_object_names): 

767 prefix_end = None 

768 else: 

769 prefix_end = snapshot_object_names[range_end] 

770 

771 start_offsets.append(prefix_start) 

772 end_offsets.append(prefix_end) 

773 

774 # Assign the coroutine all at once, and wait for them to finish listing. 

775 results = await asyncio.gather( 

776 *[ 

777 self._sequential_list_objects_helper( 

778 bucket=bucket, 

779 delimiter=delimiter, 

780 start_offset=start_offsets[i], 

781 end_offset=end_offsets[i], 

782 prefix=prefix, 

783 versions=versions, 

784 max_results=page_size, 

785 **kwargs, 

786 ) 

787 for i in range(0, len(start_offsets)) 

788 ] 

789 ) 

790 

791 items = [] 

792 prefixes = [] 

793 

794 # Concatenate the items and prefixes from each coroutine for final results. 

795 for i in range(len(results)): 

796 items_from_process, prefixes_from_process = results[i] 

797 items.extend(items_from_process) 

798 prefixes.extend(prefixes_from_process) 

799 

800 return items, prefixes 

801 

802 async def _sequential_list_objects_helper( 

803 self, 

804 bucket, 

805 delimiter, 

806 start_offset, 

807 end_offset, 

808 prefix, 

809 versions, 

810 max_results, 

811 items_per_call=1000, 

812 **kwargs, 

813 ): 

814 """ 

815 Sequential list objects within the start and end offset range. 

816 """ 

817 max_results = max_results if max_results else 10_000_000 

818 prefixes = [] 

819 items = [] 

820 num_items = min(items_per_call, max_results, 1000) 

821 page = await self._call_list_objects( 

822 bucket, 

823 delimiter=delimiter, 

824 prefix=prefix, 

825 startOffset=start_offset, 

826 endOffset=end_offset, 

827 maxResults=num_items, 

828 versions="true" if versions else None, 

829 **kwargs, 

830 ) 

831 

832 prefixes.extend(page.get("prefixes", [])) 

833 items.extend(page.get("items", [])) 

834 next_page_token = page.get("nextPageToken", None) 

835 

836 while len(items) + len(prefixes) < max_results and next_page_token is not None: 

837 num_items = min( 

838 items_per_call, max_results - (len(items) + len(prefixes)), 1000 

839 ) 

840 page = await self._call_list_objects( 

841 bucket, 

842 delimiter=delimiter, 

843 prefix=prefix, 

844 startOffset=start_offset, 

845 endOffset=end_offset, 

846 maxResults=num_items, 

847 pageToken=next_page_token, 

848 versions="true" if versions else None, 

849 **kwargs, 

850 ) 

851 

852 assert page["kind"] == "storage#objects" 

853 prefixes.extend(page.get("prefixes", [])) 

854 items.extend(page.get("items", [])) 

855 next_page_token = page.get("nextPageToken", None) 

856 

857 items = [self._process_object(bucket, i) for i in items] 

858 

859 return items, prefixes 

860 

async def _call_list_objects(self, bucket, **kwargs):
    """
    Helper method to fetch a single page of object listing.
    Extracts valid GCS parameters from kwargs to prevent parameter pollution.
    """
    # Only pass valid parameters to the API call
    valid_kwargs = {}
    for name, value in kwargs.items():
        if name in _VALID_LIST_PARAMS:
            valid_kwargs[name] = value

    return await self._call(
        "GET",
        "b/{}/o",
        bucket,
        json_out=True,
        **valid_kwargs,
    )

877 

878 async def _list_buckets(self): 

879 """Return list of all buckets under the current project.""" 

880 if "" not in self.dircache: 

881 items = [] 

882 page = await self._call("GET", "b", project=self.project, json_out=True) 

883 

884 assert page["kind"] == "storage#buckets" 

885 items.extend(page.get("items", [])) 

886 next_page_token = page.get("nextPageToken", None) 

887 

888 while next_page_token is not None: 

889 page = await self._call( 

890 "GET", 

891 "b", 

892 project=self.project, 

893 pageToken=next_page_token, 

894 json_out=True, 

895 ) 

896 

897 assert page["kind"] == "storage#buckets" 

898 items.extend(page.get("items", [])) 

899 next_page_token = page.get("nextPageToken", None) 

900 

901 buckets = [ 

902 {**i, "name": i["name"] + "/", "size": 0, "type": "directory"} 

903 for i in items 

904 ] 

905 self.dircache[""] = buckets 

906 return buckets 

907 return self.dircache[""] 

908 

def invalidate_cache(self, path=None):
    """
    Drop cached listings so that they are reloaded on next use.

    Parameters
    ----------
    path: string or None
        If None, every cached listing is cleared. Otherwise the listing for
        the given path and for each of its parents is removed.
    """
    if path is None:
        logger.debug("invalidate_cache clearing cache")
        self.dircache.clear()
        return

    current = self._strip_protocol(path).rstrip("/")
    # Walk upwards until the parent is empty.
    while current:
        self.dircache.pop(current, None)
        current = self._parent(current)

928 

929 async def _mkdir( 

930 self, 

931 path, 

932 acl="projectPrivate", 

933 default_acl="bucketOwnerFullControl", 

934 location=None, 

935 create_parents=False, 

936 enable_versioning=False, 

937 enable_object_retention=False, 

938 iam_configuration=None, 

939 **kwargs, 

940 ): 

941 """ 

942 New bucket 

943 

944 If path is more than just a bucket, will create bucket if create_parents=True; 

945 otherwise is a noop. If create_parents is False and bucket does not exist, 

946 will produce FileNotFoundError. 

947 

948 Parameters 

949 ---------- 

950 path: str 

951 bucket name. If contains '/' (i.e., looks like subdir), will 

952 have no effect because GCS doesn't have real directories. 

953 acl: string, one of bACLs 

954 access for the bucket itself. See: 

955 https://cloud.google.com/storage/docs/access-control/lists#predefined-acl 

956 default_acl: str, one of ACLs 

957 default ACL for objects created in this bucket 

958 location: Optional[str] 

959 Location where buckets are created, like 'US' or 'EUROPE-WEST3'. 

960 If not provided, defaults to `self.default_location`. 

961 You can find a list of all available locations here: 

962 https://cloud.google.com/storage/docs/locations#available-locations 

963 create_parents: bool 

964 If True, creates the bucket in question, if it doesn't already exist 

965 enable_versioning: bool 

966 If True, creates the bucket in question with object versioning 

967 enabled. 

968 enable_object_retention: bool 

969 If True, creates the bucket in question with object retention 

970 permanently enabled. 

971 iam_configuration: dict 

972 If provided, sets the IAM policy for the bucket. This argument 

973 allows setting properties such as `{publicAccessPrevention: "enforced"}` 

974 and `{"uniformBucketLevelAccess": {"enabled": True}}`. If passed, `acl` 

975 and `default_acl` are explicitly ignored. 

976 **kwargs 

977 Additional parameters passed to the API call request body. See: 

978 https://cloud.google.com/storage/docs/json_api/v1/buckets/insert#request-body 

979 for all possible options. Pass nested parameters as dictionaries, e.g.: 

980 `{"autoclass": {"enabled": True}}` 

981 """ 

982 bucket, object, generation = self.split_path(path) 

983 if bucket in ["", "/"]: 

984 raise ValueError("Cannot create root bucket") 

985 if "/" in path and create_parents and await self._exists(bucket): 

986 # nothing to do 

987 return 

988 if "/" in path: 

989 if await self._exists(bucket): 

990 return 

991 if not create_parents: 

992 raise FileNotFoundError(bucket) 

993 

994 json_data = {"name": bucket} 

995 location = location or self.default_location 

996 if location: 

997 json_data["location"] = location 

998 if enable_versioning: 

999 json_data["versioning"] = {"enabled": True} 

1000 if iam_configuration: 

1001 json_data["iamConfiguration"] = iam_configuration 

1002 acl = None 

1003 default_acl = None 

1004 if kwargs: 

1005 json_data.update(kwargs) 

1006 

1007 await self._call( 

1008 method="POST", 

1009 path="b", 

1010 predefinedAcl=acl, 

1011 project=self.project, 

1012 predefinedDefaultObjectAcl=default_acl, 

1013 enableObjectRetention=str(enable_object_retention).lower(), 

1014 json=json_data, 

1015 json_out=True, 

1016 ) 

1017 self.invalidate_cache(bucket) 

1018 

1019 mkdir = asyn.sync_wrapper(_mkdir) 

1020 

1021 async def _rmdir(self, bucket): 

1022 """Delete an empty bucket 

1023 

1024 Parameters 

1025 ---------- 

1026 bucket: str 

1027 bucket name. If contains '/' (i.e., looks like subdir), will 

1028 have no effect because GCS doesn't have real directories. 

1029 """ 

1030 bucket = bucket.rstrip("/") 

1031 if "/" in bucket: 

1032 return 

1033 await self._call("DELETE", "b/" + bucket, json_out=False) 

1034 self.invalidate_cache(bucket) 

1035 self.invalidate_cache("") 

1036 

1037 rmdir = asyn.sync_wrapper(_rmdir) 

1038 

def modified(self, path):
    """Return the ``mtime`` entry of the info record for ``path``."""
    details = self.info(path)
    return details["mtime"]

1041 

def created(self, path):
    """Return the ``ctime`` entry of the info record for ``path``."""
    details = self.info(path)
    return details["ctime"]

1044 

1045 def _parse_timestamp(self, timestamp): 

1046 assert timestamp.endswith("Z") 

1047 timestamp = timestamp[:-1] 

1048 timestamp = timestamp + "0" * (6 - len(timestamp.rsplit(".", 1)[-1])) 

1049 return datetime.fromisoformat(timestamp + "+00:00") 

1050 

async def _info(self, path, generation=None, **kwargs):
    """File information about this path.

    Bucket-level paths (no ``/`` after stripping) are answered with a GET
    on the bucket, falling back to a listing. Other paths are looked up in
    the listing cache first, then resolved remotely as either an object or
    a directory.

    Raises
    ------
    FileNotFoundError
        For a bucket-level path when both the GET and the listing fallback
        find nothing. For other paths it may propagate from the directory
        lookup.
    """
    path = self._strip_protocol(path).rstrip("/")
    if "/" not in path:
        try:
            out = await self._call("GET", f"b/{path}", json_out=True)
            out.update(size=0, type="directory")
        except OSError:
            # GET bucket failed, try ls; will have no metadata
            exists = await self._ls(path)
            if exists:
                out = {"name": path, "size": 0, "type": "directory"}
            else:
                raise FileNotFoundError(path)
        return out
    # Check directory cache for parent dir
    parent_path = self._parent(path)
    parent_cache = self._ls_from_cache(parent_path)
    bucket, key, path_generation = self.split_path(path)
    generation = _coalesce_generation(generation, path_generation)
    if parent_cache:
        name = "/".join((bucket, key))
        for o in parent_cache:
            # Match on name; when a generation was requested it must match too.
            if o["name"].rstrip("/") == name and (
                not generation or o.get("generation") == generation
            ):
                return o
    if self._ls_from_cache(path):
        # this is a directory
        return {
            "bucket": bucket,
            "name": path,
            "size": 0,
            "storageClass": "DIRECTORY",
            "type": "directory",
        }

    # NOTE(review): presumably the helper starts both lookups concurrently;
    # confirm against .concurrency.parallel_tasks_first_completed.
    async with parallel_tasks_first_completed(
        [
            self._get_object(path),
            self._get_directory_info(path, bucket, key, generation),
        ]
    ) as (tasks, done, pending):
        get_object_task, get_directory_info_task = tasks

        try:
            get_object_res = await get_object_task
            # A directory-marker object is not reported as a file; the
            # directory lookup result is used instead.
            if not _is_directory_marker(get_object_res):
                return get_object_res
        except FileNotFoundError:
            pass
        return await get_directory_info_task

1103 

async def _get_directory_info(self, path, bucket, key, generation):
    """
    Internal method to check if a path is a directory by listing objects.

    Returns the listed entry whose name equals ``path`` (ignoring trailing
    slashes) when it is not a directory marker, a synthetic directory
    record when the listing holds anything else, and raises
    FileNotFoundError when the listing is empty.
    """
    listing = await self._list_objects(path, max_results=1)

    hit = None
    for entry in listing:
        if entry["name"].rstrip("/") == path:
            hit = entry
            break

    if hit and not _is_directory_marker(hit):
        # exact hit
        return hit
    if not listing:
        raise FileNotFoundError(path)
    # other stuff - must be a directory
    return {
        "bucket": bucket,
        "name": path,
        "size": 0,
        "storageClass": "DIRECTORY",
        "type": "directory",
    }

1124 

async def _ls(
    self, path, detail=False, prefix="", versions=False, refresh=False, **kwargs
):
    """List objects under the given '/{bucket}/{prefix} path.

    With ``refresh`` the cached listing for the path is invalidated first.
    The root path lists buckets; any other path lists objects, collapsing
    directory markers and duplicate directory entries. With ``versions``,
    entries carrying a generation get ``#<generation>`` appended to their
    name. Returns the entry dicts when ``detail`` is true, otherwise only
    their names.
    """
    path = self._strip_protocol(path).rstrip("/")

    if refresh:
        self.invalidate_cache(path)

    if path in ["/", ""]:
        out = await self._list_buckets()
    else:
        out = []
        seen_dirs = set()
        listed = await self._list_objects(
            path, prefix=prefix, versions=versions, **kwargs
        )
        for entry in listed:
            if _is_directory_marker(entry):
                # Report the marker object as the directory itself.
                entry = {
                    "bucket": entry["bucket"],
                    "name": path.rstrip("/"),
                    "size": 0,
                    "storageClass": "DIRECTORY",
                    "type": "directory",
                }

            if entry["type"] == "directory":
                dir_name = entry["name"]
                if dir_name in seen_dirs:
                    continue
                seen_dirs.add(dir_name)

            if versions and "generation" in entry:
                entry = entry.copy()
                entry["name"] = f"{entry['name']}#{entry['generation']}"

            out.append(entry)

    out.sort(key=lambda e: (e["name"]))

    if not detail:
        return [o["name"] for o in out]
    return out

1167 

def url(self, path):
    """Get HTTP URL of the given path"""
    bucket, key, generation = self.split_path(path)
    encoded_key = quote(key)
    # The generation query argument is only appended when one was parsed.
    generation_arg = f"&generation={generation}" if generation else ""
    return (
        f"{self._location}/download/storage/v1/b/{bucket}/o/{encoded_key}"
        f"?alt=media{generation_arg}"
    )

1179 

1180 async def _cat_file_sequential(self, path, start=None, end=None, **kwargs): 

1181 """Simple one-shot get of file data""" 

1182 # if start and end are both provided and valid, but start >= end, return empty bytes 

1183 # Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4" 

1184 # for start=5, end=5), causing the server to return the whole file instead of nothing. 

1185 if start is not None and end is not None and start >= end >= 0: 

1186 return b"" 

1187 

1188 u2 = self.url(path) 

1189 if start is not None or end is not None: 

1190 head = {"Range": await self._process_limits(path, start, end)} 

1191 else: 

1192 head = {} 

1193 

1194 headers, out = await self._call("GET", u2, headers=head) 

1195 return out 

1196 

async def _cat_file_concurrent(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Concurrent fetch of file data

    The byte range ``[start, end)`` is split into ``concurrency`` parts
    that are fetched as separate tasks and concatenated in order. Small
    ranges, ``concurrency <= 1`` and negative offsets are served by a
    single sequential request instead.

    If any part fails, the unfinished tasks are cancelled and awaited
    before the original exception is re-raised.
    """
    if (start is not None and start < 0) or (end is not None and end < 0):
        # The splitting arithmetic below needs absolute offsets; a negative
        # bound would be treated as an empty or shifted range. Leave such
        # requests to the single-request path.
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    if start is None:
        start = 0
    if end is None:
        end = (await self._info(path))["size"]
    if start >= end:
        return b""

    if concurrency <= 1 or end - start < self.MIN_CHUNK_SIZE_FOR_CONCURRENCY:
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    total_size = end - start
    part_size = total_size // concurrency
    tasks = []

    for i in range(concurrency):
        offset = start + (i * part_size)
        # The last part also takes the remainder of the integer division.
        actual_size = (
            part_size if i < concurrency - 1 else total_size - (i * part_size)
        )
        tasks.append(
            asyncio.create_task(
                self._cat_file_sequential(
                    path, start=offset, end=offset + actual_size, **kwargs
                )
            )
        )

    try:
        results = await asyncio.gather(*tasks)
        return b"".join(results)
    except BaseException:
        for t in tasks:
            if not t.done():
                t.cancel()
        # Wait for the cancellations to settle before propagating.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

1237 

async def _cat_file(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Simple one-shot, or concurrent get of file data"""
    use_concurrent = concurrency > 1
    if not use_concurrent:
        # _cat_file_concurrent(concurrency=1) would also work, but the
        # sequential path is kept separate while the concurrent code path
        # is still experimental. Once it is stabilized this branch can go.
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    return await self._cat_file_concurrent(
        path, start=start, end=end, concurrency=concurrency, **kwargs
    )

1251 

1252 async def _getxattr(self, path, attr): 

1253 """Get user-defined metadata attribute""" 

1254 meta = (await self._info(path)).get("metadata", {}) 

1255 return meta[attr] 

1256 

1257 getxattr = asyn.sync_wrapper(_getxattr) 

1258 

async def _setxattrs(
    self,
    path,
    content_type=None,
    content_encoding=None,
    fixed_key_metadata=None,
    **kwargs,
):
    """Set/delete/add writable metadata attributes

    Note: uses PATCH method (update), leaving unedited keys alone.
    fake-gcs-server:latest does not seem to support this.

    Parameters
    ----------
    content_type: str
        If not None, set the content-type to this value
    content_encoding: str
        This parameter is deprecated, you may use fixed_key_metadata instead.
        If not None, set the content-encoding.
        See https://cloud.google.com/storage/docs/transcoding
    fixed_key_metadata: dict
        Google metadata, in key/value pairs, supported keys:
            - cache_control
            - content_disposition
            - content_encoding
            - content_language
            - custom_time

        More info:
        https://cloud.google.com/storage/docs/metadata#mutable
    **kwargs: key-value pairs like field="value" or field=None
        value must be string to add or modify, or None to delete

    Returns
    -------
    Entire metadata after update (even if only path is passed)
    """
    i_json = {"metadata": kwargs}
    if content_type is not None:
        i_json["contentType"] = content_type
    if content_encoding is not None:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning(
            "setxattrs: content_encoding parameter is now deprecated "
            "you may use `fixed_key_metadata` instead"
        )
        i_json["contentEncoding"] = content_encoding
    i_json.update(_convert_fixed_key_metadata(fixed_key_metadata))

    bucket, key, generation = self.split_path(path)
    o_json = await self._call(
        "PATCH",
        "b/{}/o/{}",
        bucket,
        key,
        fields="metadata",
        json=i_json,
        json_out=True,
    )
    return o_json.get("metadata", {})

1319 

1320 setxattrs = asyn.sync_wrapper(_setxattrs) 

1321 

1322 async def _merge(self, path, paths, acl=None): 

1323 """Concatenate objects within a single bucket""" 

1324 bucket, key, generation = self.split_path(path) 

1325 source = [{"name": self.split_path(p)[1]} for p in paths] 

1326 await self._call( 

1327 "POST", 

1328 "b/{}/o/{}/compose", 

1329 bucket, 

1330 key, 

1331 destinationPredefinedAcl=acl, 

1332 headers={"Content-Type": "application/json"}, 

1333 json={ 

1334 "sourceObjects": source, 

1335 "kind": "storage#composeRequest", 

1336 "destination": {"name": key, "bucket": bucket}, 

1337 }, 

1338 ) 

1339 

1340 merge = asyn.sync_wrapper(_merge) 

1341 

1342 # TODO: Add async mv method in the async.py and remove from GCSFileSystem. 

async def _mv(
    self, path1, path2, recursive=False, maxdepth=None, batch_size=None, **kwargs
):
    """Move one or more paths from ``path1`` to ``path2``.

    When both arguments are lists they are paired as given. Otherwise the
    source is expanded with ``_expand_path`` and destinations are derived
    with ``other_paths``. Each pair is moved with ``_mv_file``, run in
    chunks of ``batch_size`` (defaulting to ``self.batch_size``).

    The first exception returned by a move is raised, except a
    FileNotFoundError during a recursive move for a source that is a
    prefix of another source path.
    """
    if path1 == path2:
        return

    if isinstance(path1, list) and isinstance(path2, list):
        # No need to expand paths when both source and destination
        # are provided as lists
        paths1 = path1
        paths2 = path2
    else:
        source_is_str = isinstance(path1, str)
        paths1 = await self._expand_path(
            path1, maxdepth=maxdepth, recursive=recursive
        )
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not move directories
            paths1 = [
                p
                for p in paths1
                if not (asyn.trailing_sep(p) or await self._isdir(p))
            ]
            if not paths1:
                return

        source_is_file = len(paths1) == 1
        dest_is_dir = isinstance(path2, str) and (
            asyn.trailing_sep(path2) or await self._isdir(path2)
        )

        # NOTE(review): `exists` is handed to other_paths unchanged; its
        # exact effect on destination naming is defined there - confirm
        # against fsspec.utils.other_paths.
        exists = source_is_str and (
            (has_magic(path1) and source_is_file)
            or (
                not has_magic(path1)
                and dest_is_dir
                and not asyn.trailing_sep(path1)
            )
        )
        paths2 = other_paths(
            paths1,
            path2,
            exists=exists,
            flatten=not source_is_str,
        )

    batch_size = batch_size or self.batch_size
    # Exceptions are collected rather than raised so they can be filtered below.
    result = await asyn._run_coros_in_chunks(
        [self._mv_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)],
        batch_size=batch_size,
        return_exceptions=True,
        nofiles=True,
    )

    for res, p1 in zip(result, paths1):
        if isinstance(res, Exception):
            if isinstance(res, FileNotFoundError) and recursive:
                # Ignore FileNotFoundError for implicit directories returned by _expand_path.
                if any(p.startswith(p1.rstrip("/") + "/") for p in paths1):
                    continue
            raise res

1404 

1405 mv = asyn.sync_wrapper(_mv) 

1406 

1407 async def _cp_file(self, path1, path2, acl=None, **kwargs): 

1408 """Duplicate remote file""" 

1409 b1, k1, g1 = self.split_path(path1) 

1410 b2, k2, g2 = self.split_path(path2) 

1411 if g2: 

1412 raise ValueError("Cannot write to specific object generation") 

1413 out = await self._call( 

1414 "POST", 

1415 "b/{}/o/{}/rewriteTo/b/{}/o/{}", 

1416 b1, 

1417 k1, 

1418 b2, 

1419 k2, 

1420 headers={"Content-Type": "application/json"}, 

1421 destinationPredefinedAcl=acl, 

1422 json_out=True, 

1423 sourceGeneration=g1, 

1424 ) 

1425 while out["done"] is not True: 

1426 out = await self._call( 

1427 "POST", 

1428 "b/{}/o/{}/rewriteTo/b/{}/o/{}", 

1429 b1, 

1430 k1, 

1431 b2, 

1432 k2, 

1433 headers={"Content-Type": "application/json"}, 

1434 rewriteToken=out["rewriteToken"], 

1435 destinationPredefinedAcl=acl, 

1436 json_out=True, 

1437 sourceGeneration=g1, 

1438 ) 

1439 self.invalidate_cache(self._parent(path2)) 

1440 

1441 async def _mv_file_cache_update(self, path1, path2, response=None): 

1442 self.invalidate_cache(self._parent(path1)) 

1443 self.invalidate_cache(self._parent(path2)) 

1444 

async def _mv_file(self, path1, path2, **kwargs):
    """Move a single object, using the moveTo API when possible.

    When source and destination are in the same bucket and both have a
    key, a ``moveTo`` request is tried first. FileNotFoundError from that
    attempt is re-raised; any other exception is logged and the parent
    class's ``_mv_file`` is used instead, as it is for all other moves.
    A destination carrying a generation raises ValueError.
    """
    src_bucket, src_key, src_generation = self.split_path(path1)
    dest_bucket, dest_key, dest_generation = self.split_path(path2)

    if dest_generation:
        raise ValueError("Cannot move to specific object generation")

    can_move_in_place = src_bucket == dest_bucket and src_key and dest_key
    if can_move_in_place:
        try:
            response = await self._call(
                "POST",
                "b/{}/o/{}/moveTo/o/{}",
                src_bucket,
                src_key,
                dest_key,
                sourceGeneration=src_generation,
                headers={
                    "Content-Type": "application/json",
                    "X-Goog-GCS-Idempotency-Token": str(uuid.uuid4()),
                },
                json_out=True,
            )
            await self._mv_file_cache_update(path1, path2, response)
            return
        except FileNotFoundError:
            # Raise immediately because fallback will also fail when file is not found.
            raise
        except Exception as e:
            # TODO: Fallback is added to make sure there is smooth transition, it can be removed
            # once we have metrics proving that moveTo API is working properly for all bucket types.
            logger.warning(
                f"Failed to move file using moveTo API: {e}. Falling back to copy/delete."
            )

    await super()._mv_file(path1, path2, **kwargs)

1480 

1481 mv_file = asyn.sync_wrapper(_mv_file) 

1482 

1483 async def _rm_file(self, path, **kwargs): 

1484 bucket, key, generation = self.split_path(path) 

1485 if key: 

1486 await self._call("DELETE", "b/{}/o/{}", bucket, key, generation=generation) 

1487 # TODO: This can be optimized for HNS buckets by not invalidating the entire parent 

1488 # directory structure from cache but to just remove the deleted file entry from immediate parent's cache. 

1489 self.invalidate_cache(posixpath.dirname(self._strip_protocol(path))) 

1490 else: 

1491 await self._rmdir(path) 

1492 

async def _rm_files(self, paths):
    """Delete several objects with one batch request per attempt.

    Up to five attempts are made. Paths whose sub-response status is in
    ``errs`` are retried after a randomized, exponentially growing sleep
    (capped at 32 seconds). After each attempt the cached listings of the
    submitted paths and of their parents are invalidated.

    Returns
    -------
    list
        One entry per processed sub-response: the path itself when the
        status was 200 or 204, otherwise an ``OSError`` describing the
        failure.
    """
    import random

    template = (
        "\n--===============7330845974216740156==\n"
        "Content-Type: application/http\n"
        "Content-Transfer-Encoding: binary\n"
        "Content-ID: <b29c5de2-0db4-490b-b421-6a51b598bd11+{i}>"
        "\n\nDELETE /storage/v1/b/{bucket}/o/{key}{query} HTTP/1.1\n"
        "Content-Type: application/json\n"
        "accept: application/json\ncontent-length: 0\n"
    )
    out = []
    # Splitting requests into batches
    # See https://cloud.google.com/storage/docs/batch
    for retry in range(1, 6):
        remaining = []
        parts = []
        for i, p in enumerate(paths):
            bucket, key, generation = self.split_path(p)
            query_params = self._get_params(
                {"generation": generation} if generation else {}
            )
            query = (
                ("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
                if query_params
                else ""
            )
            parts.append(
                template.format(
                    i=i + 1,
                    bucket=quote(bucket),
                    key=quote(key),
                    query=query,
                )
            )
        body = "".join(parts)
        headers, content = await self._call(
            "POST",
            self.batch_url_base,
            headers={
                "Content-Type": 'multipart/mixed; boundary="=========='
                '=====7330845974216740156=="'
            },
            data=body + "\n--===============7330845974216740156==--",
        )

        boundary = headers["Content-Type"].split("=", 1)[1]
        for stale in {self._parent(p) for p in paths} | set(paths):
            self.invalidate_cache(stale)
        txt = content.decode()
        # The first and last pieces around the boundary are not sub-responses.
        responses = txt.split(boundary)[1:-1]
        for path, response in zip(paths, responses):
            m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
            code = int(m.groups()[0]) if m else None
            if code in [200, 204]:
                out.append(path)
            elif code in errs and retry < 5:
                remaining.append(path)
            else:
                msg = re.search("{(.*)}", response.replace("\n", ""))
                if msg:
                    msg2 = re.search("({.*})", msg.groups()[0])
                else:
                    msg2 = None
                if msg and msg2:
                    out.append(OSError(msg2.groups()[0]))
                else:
                    # No JSON error body could be extracted; report the
                    # path and the status code (None when none was found).
                    out.append(
                        OSError(f"Failed to delete {path} (HTTP status {code})")
                    )
        if remaining:
            paths = remaining
            await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
        else:
            break
    return out

1569 

@property
def on_google(self):
    """Whether ``self._location`` contains ``torage.<universe domain>``."""
    # match "torage" to handle both "storage" and "Storage"
    host_fragment = f"torage.{_gcp_universe_domain()}"
    return host_fragment in self._location

1574 

async def _delete_files(self, files, batchsize):
    """Helper to delete files in batches.

    When ``self.on_google`` is true, ``files`` is cut into slices of
    ``batchsize`` that are each deleted with ``_rm_files``; otherwise each
    file is deleted individually with ``_rm_file``.

    Returns
    -------
    list
        The collected results. Exceptions are returned as list items
        rather than raised.
    """
    if not self.on_google:
        # emulators do not support batch
        return await asyn._run_coros_in_chunks(
            [self._rm_file(f) for f in files], return_exceptions=True, batch_size=5
        )

    batch_results = await asyn._run_coros_in_chunks(
        [
            self._rm_files(files[i : i + batchsize])
            for i in range(0, len(files), batchsize)
        ],
        return_exceptions=True,
    )
    flattened = []
    for batch in batch_results:
        if isinstance(batch, BaseException):
            # A whole batch failed: keep the exception as one entry.
            # Concatenating it with the result lists would raise TypeError
            # and hide the original error.
            flattened.append(batch)
        else:
            flattened.extend(batch)
    return flattened

1593 

1594 async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20): 

1595 paths = await self._expand_path(path, recursive=recursive, maxdepth=maxdepth) 

1596 files = [p for p in paths if self.split_path(p)[1]] 

1597 dirs = [p for p in paths if not self.split_path(p)[1]] 

1598 exs = await self._delete_files(files, batchsize) 

1599 

1600 # buckets 

1601 exs.extend( 

1602 await asyncio.gather( 

1603 *[self._rmdir(d) for d in dirs], return_exceptions=True 

1604 ) 

1605 ) 

1606 errors = [ 

1607 ex 

1608 for ex in exs 

1609 if isinstance(ex, Exception) 

1610 and "No such object" not in str(ex) 

1611 and not isinstance(ex, FileNotFoundError) 

1612 ] 

1613 if errors: 

1614 raise errors[0] 

1615 exs = [ 

1616 ex 

1617 for ex in exs 

1618 if "No such object" not in str(ex) and not isinstance(ex, FileNotFoundError) 

1619 ] 

1620 if not exs: 

1621 # nothing got deleted 

1622 raise FileNotFoundError(path) 

1623 return exs 

1624 

1625 rm = asyn.sync_wrapper(_rm) 

1626 

async def _pipe_file(
    self,
    path,
    data,
    metadata=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    chunksize=50 * 2**20,
    mode="overwrite",
):
    """Write ``data`` to ``path``.

    Data shorter than ``chunksize`` goes up in a single simple upload.
    Longer data uses an initiated upload fed in ``chunksize`` pieces and
    is checked by the consistency checker afterwards. If a chunk fails,
    a DELETE is issued on the upload location before re-raising. The
    cached listing of the parent is invalidated and the upload location
    is returned.
    """
    # enforce blocksize should be a multiple of 2**18
    consistency = consistency or self.consistency
    bucket, key, generation = self.split_path(path)
    size = len(data)

    if size < chunksize:
        location = await simple_upload(
            self,
            bucket,
            key,
            data,
            metadata,
            consistency,
            content_type,
            fixed_key_metadata=fixed_key_metadata,
            mode=mode,
        )
        self.invalidate_cache(self._parent(path))
        return location

    location = await initiate_upload(
        self,
        bucket,
        key,
        content_type,
        metadata,
        fixed_key_metadata=fixed_key_metadata,
        mode=mode,
    )
    last_response = None
    try:
        for offset in range(0, size, chunksize):
            piece = data[offset : offset + chunksize]
            last_response = await upload_chunk(
                self, location, piece, offset, size, content_type
            )
    except Exception:
        await self._call(
            "DELETE",
            location.replace("&ifGenerationMatch=0", ""),
        )
        raise

    checker = get_consistency_checker(consistency)
    checker.update(data)
    checker.validate_json_response(last_response)

    self.invalidate_cache(self._parent(path))
    return location

1684 

async def _put_file(
    self,
    lpath,
    rpath,
    metadata=None,
    consistency=None,
    content_type=None,
    chunksize=50 * 2**20,
    callback=None,
    fixed_key_metadata=None,
    mode="overwrite",
    **kwargs,
):
    """Upload the local file ``lpath`` to ``rpath``.

    Nothing happens when ``lpath`` is a directory. Without an explicit
    ``content_type``, one is guessed from the file name and falls back to
    ``application/octet-stream``. Files under 5 MiB go up in one simple
    upload; larger files use an initiated upload fed in ``chunksize``
    pieces, with progress reported through ``callback``.

    Raises
    ------
    ValueError
        If ``rpath`` names a specific object generation.
    """
    # enforce blocksize should be a multiple of 2**18
    if os.path.isdir(lpath):
        return
    if content_type is None:
        content_type, _ = mimetypes.guess_type(lpath)
    if content_type is None:
        content_type = "application/octet-stream"
    callback = callback or NoOpCallback()
    consistency = consistency or self.consistency
    checker = get_consistency_checker(consistency)
    bucket, key, generation = self.split_path(rpath)
    if generation:
        raise ValueError("Cannot write to specific object generation")
    with open(lpath, "rb") as f0:
        # Seek to the end to learn the size, then rewind for reading.
        size = f0.seek(0, 2)
        f0.seek(0)
        callback.set_size(size)

        if size < 5 * 2**20:
            await simple_upload(
                self,
                bucket,
                key,
                f0.read(),
                consistency=consistency,
                metadatain=metadata,
                content_type=content_type,
                fixed_key_metadata=fixed_key_metadata,
                mode=mode,
            )
            callback.absolute_update(size)

        else:
            location = await initiate_upload(
                self,
                bucket,
                key,
                content_type,
                metadata=metadata,
                fixed_key_metadata=fixed_key_metadata,
                mode=mode,
            )
            offset = 0
            try:
                while True:
                    bit = f0.read(chunksize)
                    if not bit:
                        break
                    out = await upload_chunk(
                        self, location, bit, offset, size, content_type
                    )
                    offset += len(bit)
                    callback.absolute_update(offset)
                    checker.update(bit)
            except Exception:
                # Clean up with the local upload location, as _pipe_file
                # does. The former `self.location` lookup raised
                # AttributeError here and hid the real upload error.
                await self._call(
                    "DELETE",
                    location.replace("&ifGenerationMatch=0", ""),
                )
                raise

            checker.validate_json_response(out)

    self.invalidate_cache(self._parent(rpath))

1762 

1763 async def _isdir(self, path): 

1764 

1765 try: 

1766 return (await self._info(path))["type"] == "directory" 

1767 except OSError: 

1768 return False 

1769 

1770 async def _find( 

1771 self, 

1772 path, 

1773 withdirs=False, 

1774 detail=False, 

1775 prefix="", 

1776 versions=False, 

1777 maxdepth=None, 

1778 update_cache=True, 

1779 **kwargs, 

1780 ): 

1781 path = self._strip_protocol(path) 

1782 

1783 if maxdepth is not None and maxdepth < 1: 

1784 raise ValueError("maxdepth must be at least 1") 

1785 

1786 # Fetch objects as if the path is a directory 

1787 objects, _ = await self._do_list_objects( 

1788 path, delimiter="", prefix=prefix, versions=versions 

1789 ) 

1790 

1791 if not objects: 

1792 # Fetch objects as if the path is a file 

1793 bucket, key, _ = self.split_path(path) 

1794 if prefix: 

1795 _path = "" if not key else key.rstrip("/") + "/" 

1796 _prefix = f"{_path}{prefix}" 

1797 else: 

1798 _prefix = key 

1799 objects, _ = await self._do_list_objects( 

1800 bucket, delimiter="", prefix=_prefix, versions=versions 

1801 ) 

1802 else: 

1803 _prefix = prefix 

1804 

1805 path2 = path.rstrip("/") + "/" 

1806 

1807 if not prefix: 

1808 objects = [ 

1809 o for o in objects if o["name"].startswith(path2) or o["name"] == path 

1810 ] 

1811 

1812 dirs = self._get_dirs_and_update_cache( 

1813 path, objects, prefix=prefix, update_cache=update_cache 

1814 ) 

1815 

1816 if withdirs: 

1817 objects = sorted(objects + list(dirs.values()), key=lambda x: x["name"]) 

1818 

1819 if maxdepth: 

1820 # Filter returned objects based on requested maxdepth 

1821 depth = path.rstrip("/").count("/") + maxdepth 

1822 objects = list(filter(lambda o: o["name"].count("/") <= depth, objects)) 

1823 

1824 if detail: 

1825 if versions: 

1826 return {f"{o['name']}#{o['generation']}": o for o in objects} 

1827 return {o["name"]: o for o in objects} 

1828 

1829 if versions: 

1830 return [f"{o['name']}#{o['generation']}" for o in objects] 

1831 return [o["name"] for o in objects] 

1832 

1833 def _get_dirs_and_update_cache(self, path, objects, prefix="", update_cache=True): 

1834 """ 

1835 Populates the directory cache from a list of object details. 

1836 

1837 This method reconstructs the directory hierarchy from a flat list 

1838 of objects and update the cache, which improves the performance of 

1839 subsequent `ls` calls. 

1840 

1841 Parameters 

1842 ---------- 

1843 path: str 

1844 The root path of the find operation. 

1845 objects: list[dict] 

1846 A list of objects from which directories are extracted and cache is updated. 

1847 prefix: str 

1848 If a prefix is provided, the directory cache will *not* be updated, 

1849 as the object list is considered partial. 

1850 update_cache: bool 

1851 Cache won't be updated if update_cache is False. 

1852 

1853 Returns 

1854 ------- 

1855 dict: A dictionary of all pseudo-directory entries created. 

1856 """ 

1857 dirs = {} 

1858 cache_entries = {} 

1859 

1860 for obj in objects: 

1861 # For native HNS empty folders, which are returned as directory types 

1862 # but are not placeholders, we need to ensure they have an entry in the cache. 

1863 if obj.get("type") == "directory": 

1864 cache_entries.setdefault(obj["name"], {}) 

1865 

1866 parent = self._parent(obj["name"]) 

1867 previous = obj 

1868 

1869 while parent: 

1870 dir_key = self.split_path(parent)[1] 

1871 if not dir_key or len(parent) < len(path.rstrip("/")): 

1872 break 

1873 

1874 dirs[parent] = { 

1875 "Key": dir_key, 

1876 "Size": 0, 

1877 "name": parent, 

1878 "StorageClass": "DIRECTORY", 

1879 "type": "directory", 

1880 "size": 0, 

1881 } 

1882 

1883 listing = cache_entries.setdefault(parent, {}) 

1884 name = previous["name"] 

1885 if name not in listing: 

1886 listing[name] = previous 

1887 

1888 previous = dirs[parent] 

1889 parent = self._parent(parent) 

1890 if not prefix and update_cache: 

1891 cache_entries_list = {k: list(v.values()) for k, v in cache_entries.items()} 

1892 self.dircache.update(cache_entries_list) 

1893 return dirs 

1894 

@retry_request(retries=retries)
async def _get_file_request(
    self, rpath, lpath, *args, headers=None, callback=None, **kwargs
):
    """Stream the response of a GET on ``rpath`` into the local file ``lpath``.

    ``consistency`` is taken out of ``kwargs`` (defaulting to
    ``self.consistency``); the remaining keyword arguments are sent as
    request parameters. Missing parent directories of ``lpath`` are
    created.

    Returns
    -------
    tuple
        ``(status, headers, request_info, data)`` where ``data`` is the
        last chunk read, i.e. the empty read that ended the loop.
    """
    consistency = kwargs.pop("consistency", self.consistency)
    await self._set_session()
    async with self.session.get(
        url=rpath,
        params=self._get_params(kwargs),
        headers=self._get_headers(headers),
        timeout=self.requests_timeout,
    ) as r:
        # Check the status before any bytes are written locally.
        validate_response(r.status, None, rpath)
        try:
            size = int(r.headers["content-length"])
        except (KeyError, ValueError):
            # No usable content-length header: size is reported as unknown.
            size = None
        # NOTE(review): `callback` defaults to None but is used
        # unconditionally; callers apparently always pass one - confirm.
        callback.set_size(size)

        checker = get_consistency_checker(consistency)
        lparent = os.path.dirname(lpath) or os.curdir
        os.makedirs(lparent, exist_ok=True)
        with open(lpath, "wb") as f2:
            while True:
                data = await r.content.read(4096 * 32)
                if not data:
                    break
                f2.write(data)
                checker.update(data)
                callback.relative_update(len(data))

        validate_response(r.status, data, rpath)  # validate http request
        checker.validate_http_response(r)  # validate file consistency
        return r.status, r.headers, r.request_info, data

1929 

async def _get_file(self, rpath, lpath, callback=None, **kwargs):
    """Download ``rpath`` to ``lpath``; does nothing if ``lpath`` is a directory."""
    download_url = self.url(rpath)
    if os.path.isdir(lpath):
        return
    progress = callback or NoOpCallback()
    await self._get_file_request(download_url, lpath, callback=progress, **kwargs)

1936 

def _open(
    self,
    path,
    mode="rb",
    block_size=None,
    cache_options=None,
    acl=None,
    consistency=None,
    metadata=None,
    autocommit=True,
    fixed_key_metadata=None,
    generation=None,
    **kwargs,
):
    """
    See ``GCSFile``.

    block_size: None or int
        If None, use ``self.default_block_size``
    consistency: None or str
        If None, use default for this instance
    generation: None or str
        Object generation to open; forwarded to ``GCSFile``.
    """
    if block_size is None:
        block_size = self.default_block_size
    const = consistency or self.consistency
    return GCSFile(
        self,
        path,
        mode,
        block_size,
        cache_options=cache_options,
        consistency=const,
        metadata=metadata,
        acl=acl,
        autocommit=autocommit,
        fixed_key_metadata=fixed_key_metadata,
        # Previously captured by the signature but never forwarded, so an
        # explicitly requested generation was silently ignored.
        generation=generation,
        **kwargs,
    )

1973 

1974 @classmethod 

1975 def _split_path(cls, path, version_aware=False): 

1976 """ 

1977 Normalise GCS path string into bucket and key. 

1978 

1979 Parameters 

1980 ---------- 

1981 path : string 

1982 Input path, like `gcs://mybucket/path/to/file`. 

1983 Path is of the form: '[gs|gcs://]bucket[/key][?querystring][#fragment]' 

1984 

1985 GCS allows object generation (object version) to be specified in either 

1986 the URL fragment or the `generation` query parameter. When provided, 

1987 the fragment will take priority over the `generation` query parameter. 

1988 

1989 Returns 

1990 ------- 

1991 (bucket, key, generation) tuple 

1992 """ 

1993 path = cls._strip_protocol(path).lstrip("/") 

1994 if "/" not in path: 

1995 return path, "", None 

1996 bucket, keypart = path.split("/", 1) 

1997 key = keypart 

1998 generation = None 

1999 if version_aware: 

2000 parts = urlsplit(keypart) 

2001 try: 

2002 if parts.fragment: 

2003 generation = parts.fragment 

2004 elif parts.query: 

2005 parsed = parse_qs(parts.query) 

2006 if "generation" in parsed: 

2007 generation = parsed["generation"][0] 

2008 # Sanity check whether this could be a valid generation ID. If 

2009 # it is not, assume that # or ? characters are supposed to be 

2010 # part of the object name. 

2011 if generation is not None: 

2012 int(generation) 

2013 key = parts.path 

2014 except ValueError: 

2015 generation = None 

2016 return ( 

2017 bucket, 

2018 key, 

2019 generation, 

2020 ) 

2021 

def split_path(self, path):
    """Split ``path`` via ``_split_path`` using this instance's ``version_aware`` flag."""
    aware = self.version_aware
    return self._split_path(path, version_aware=aware)

2024 

def sign(self, path, expiration=100, **kwargs):
    """Build a signed URL for ``path``.

    Parameters
    ----------
    path : str
         The path on the filesystem
    expiration : int
        Lifetime of the URL, in seconds
    **kwargs
        Forwarded to ``blob.generate_signed_url``

    Returns
    -------
    URL : str
        The signed URL
    """
    from google.cloud import storage

    storage_client = storage.Client(
        credentials=self.credentials.credentials,
        project=self.project,
    )

    bucket_name, key, generation = self.split_path(path)
    blob = storage_client.bucket(bucket_name).blob(key)
    lifetime = timedelta(seconds=expiration)

    return blob.generate_signed_url(
        expiration=lifetime,
        generation=generation,
        api_access_endpoint=self._endpoint,
        **kwargs,
    )

2057 

2058 

2059GoogleCredentials.load_tokens() 

2060 

2061 

class GCSFile(fsspec.spec.AbstractBufferedFile):
    """Buffered file object for reading or writing a single GCS object."""

    def __init__(
        self,
        gcsfs,
        path,
        mode="rb",
        block_size=DEFAULT_BLOCK_SIZE,
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        acl=None,
        consistency="md5",
        metadata=None,
        content_type=None,
        timeout=None,
        fixed_key_metadata=None,
        generation=None,
        kms_key_name=None,
        **kwargs,
    ):
        """
        Open a file.

        Parameters
        ----------
        gcsfs: instance of GCSFileSystem
        path: str
            location in GCS, like 'bucket/path/to/file'
        mode: str
            Normal file modes. Currently only 'wb' and 'rb'.
        block_size: int
            Buffer size for reading or writing
        acl: str
            ACL to apply, if any, one of ``ACLs``. New files are normally
            "bucketownerfullcontrol", but a default can be configured per
            bucket.
        consistency: str, 'none', 'size', 'md5', 'crc32c'
            Check for success in writing, applied at file close.
            'size' ensures that the number of bytes reported by GCS matches
            the number we wrote; 'md5' does a full checksum. Any value other
            than 'size' or 'md5' or 'crc32c' is assumed to mean no checking.
        content_type: str
            default when unspecified is provided by mimetypes.guess_type or
            otherwise `application/octet-stream`. See the list of available
            content types at https://www.iana.org/assignments/media-types/media-types.txt
        metadata: dict
            Custom metadata, in key/value pairs, added at file creation
        fixed_key_metadata: dict
            Google metadata, in key/value pairs, supported keys:
                - cache_control
                - content_disposition
                - content_encoding
                - content_language
                - custom_time
            More info:
            https://cloud.google.com/storage/docs/metadata#mutable
        kms_key_name: str
            Resource name of the Cloud KMS key that will be used to encrypt
            the object.
            More info:
            https://cloud.google.com/storage/docs/encryption/customer-managed-keys
        timeout: int
            Timeout seconds for the asynchronous callback.
        generation: str
            Object generation.

        Raises
        ------
        OSError
            If ``path`` names a bucket without a key.
        """
        bucket, key, path_generation = gcsfs.split_path(path)
        if not key:
            raise OSError("Attempt to open a bucket")
        self.generation = _coalesce_generation(generation, path_generation)
        self.concurrency = kwargs.get("concurrency", DEFAULT_CONCURRENCY)
        super().__init__(
            gcsfs,
            path,
            mode,
            block_size,
            autocommit=autocommit,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )
        self.gcsfs = gcsfs
        self.bucket = bucket
        self.key = key
        self.acl = acl
        self.consistency = consistency
        self.checker = get_consistency_checker(consistency)

        # Ideally, all of these fields should be part of `cache_options`. Because current
        # `fsspec` caches do not accept arbitrary `*args` and `**kwargs`, passing them
        # there currently causes instantiation errors. We are holding off on introducing
        # them as explicit keyword arguments to ensure existing user workloads are not
        # disrupted. This will be refactored once the upstream `fsspec` changes are merged.
        use_prefetch_reader = kwargs.get(
            "use_experimental_adaptive_prefetching", False
        ) or os.environ.get(
            "USE_EXPERIMENTAL_ADAPTIVE_PREFETCHING", "false"
        ).lower() in (
            "true",
            "1",
        )

        if "r" in mode and use_prefetch_reader:
            max_prefetch_size = kwargs.get("max_prefetch_size", MAX_PREFETCH_SIZE)
            from .prefetcher import BackgroundPrefetcher

            self._prefetch_engine = BackgroundPrefetcher(
                self._async_fetch_range,
                self.size,
                max_prefetch_size=max_prefetch_size,
                concurrency=self.concurrency,
            )
        else:
            self._prefetch_engine = None

        # _supports_append is an internal argument not meant to be used directly.
        # If True, allows opening file in append mode. This is generally not supported
        # by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag should
        # be set by subclasses that support append operations. Otherwise, the mode
        # will be overwritten to "wb" mode with a warning.
        _supports_append = kwargs.pop("_supports_append", False)
        if "a" in self.mode and not _supports_append:
            warnings.warn(
                "Append mode 'a' is not supported in GCS. Using overwrite mode instead."
            )
            self.mode = self.mode.replace("a", "w")

        if "r" in self.mode:
            det = self.details
        else:
            det = {}
        self.content_type = content_type or det.get(
            "contentType",
            mimetypes.guess_type(self.path)[0] or "application/octet-stream",
        )
        self.metadata = metadata or det.get("metadata", {})
        self.fixed_key_metadata = _convert_fixed_key_metadata(det, from_google=True)
        self.fixed_key_metadata.update(fixed_key_metadata or {})
        self.kms_key_name = kms_key_name
        self.timeout = timeout
        # Test the effective mode, not the ``mode`` argument: an append
        # request downgraded to "wb" above must get the same write set-up,
        # otherwise ``self.location`` is never initialised and ``discard()``
        # raises AttributeError.
        if self.mode in {"wb", "xb"}:
            if self.blocksize < GCS_MIN_BLOCK_SIZE:
                warnings.warn("Setting block size to minimum value, 2**18")
                self.blocksize = GCS_MIN_BLOCK_SIZE
            self.location = None

    @property
    def details(self):
        """Info for this path and generation, fetched on first access and then reused."""
        if self._details is None:
            self._details = self.fs.info(self.path, generation=self.generation)
        return self._details

    def info(self):
        """File information about this path"""
        return self.details

    def url(self):
        """HTTP link to this file's data"""
        return self.fs.url(self.path)

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ----------
        final: bool
            Complete and commit upload

        Returns
        -------
        bool
            False when the buffer was left untouched because it holds less
            than ``GCS_MIN_BLOCK_SIZE`` bytes and this is not a committing
            final call; True once the buffer has been uploaded and reset.
        """
        while True:
            # shortfall splits blocks bigger than max allowed upload
            data = self.buffer.getvalue()
            head = {}
            l = len(data)

            if (l < GCS_MIN_BLOCK_SIZE) and (not final or not self.autocommit):
                # either flush() was called, but we don't have enough to
                # push, or we split a big upload, and have less left than one
                # block. If this is the final part, OK to violate those
                # terms.
                return False

            # Select the biggest possible chunk of data to be uploaded
            chunk_length = min(l, GCS_MAX_BLOCK_SIZE)
            chunk = data[:chunk_length]
            if final and self.autocommit and chunk_length == l:
                if l:
                    # last chunk
                    head["Content-Range"] = "bytes %i-%i/%i" % (
                        self.offset,
                        self.offset + chunk_length - 1,
                        self.offset + l,
                    )
                else:
                    # closing when buffer is empty
                    head["Content-Range"] = "bytes */%i" % self.offset
                    data = None
            else:
                # Total size not declared yet: more chunks will follow.
                head["Content-Range"] = "bytes %i-%i/*" % (
                    self.offset,
                    self.offset + chunk_length - 1,
                )
            head.update(
                {"Content-Type": self.content_type, "Content-Length": str(chunk_length)}
            )
            headers, contents = self.gcsfs.call(
                "POST", self.location, headers=head, data=chunk
            )
            if "Range" in headers:
                # ``end`` is the last byte position reported in the Range header.
                end = int(headers["Range"].split("-")[1])
                shortfall = (self.offset + l - 1) - end
                if shortfall > 0:
                    # Keep the unreported tail in the buffer and loop to send it.
                    self.checker.update(data[:-shortfall])
                    self.buffer = UnclosableBytesIO(data[-shortfall:])
                    self.buffer.seek(shortfall)
                    self.offset += l - shortfall
                    continue
                else:
                    self.checker.update(data)
                    if final and contents:
                        j = json.loads(contents)
                        self.generation = j.get("generation")
            else:
                assert final, "Response looks like upload is over"
                if l:
                    j = json.loads(contents)
                    self.checker.update(data)
                    self.checker.validate_json_response(j)
                    self.generation = j.get("generation")
            # Clear buffer and update offset when all is received
            self.buffer = UnclosableBytesIO()
            self.offset += l
            break
        return True

    def commit(self):
        """If not auto-committing, finalize file"""
        self.autocommit = True
        self._upload_chunk(final=True)

    def _initiate_upload(self):
        """Create multi-upload"""
        self.location = asyn.sync(
            self.gcsfs.loop,
            initiate_upload,
            self.gcsfs,
            self.bucket,
            self.key,
            self.content_type,
            self.metadata,
            self.fixed_key_metadata,
            mode="create" if "x" in self.mode else "overwrite",
            kms_key_name=self.kms_key_name,
            timeout=self.timeout,
        )

    def discard(self):
        """Cancel in-progress multi-upload

        Should only happen during discarding this write-mode file
        """
        if self.location is None:
            return
        self.gcsfs.call(
            "DELETE",
            self.location.replace("&ifGenerationMatch=0", ""),
        )
        self.location = None
        self.closed = True

    def _simple_upload(self):
        """One-shot upload, less than 5MB"""
        self.buffer.seek(0)
        data = self.buffer.read()
        j = asyn.sync(
            self.gcsfs.loop,
            simple_upload,
            self.gcsfs,
            self.bucket,
            self.key,
            data,
            self.metadata,
            self.consistency,
            self.content_type,
            self.fixed_key_metadata,
            mode="create" if "x" in self.mode else "overwrite",
            kms_key_name=self.kms_key_name,
            timeout=self.timeout,
        )
        self.generation = j.get("generation")

    def _fetch_range(self, start=None, end=None):
        """Get data from GCS

        start, end : None or integers
            if not both None, fetch only given range

        Returns ``b""`` when the request fails with a RuntimeError whose
        message contains "not satisfiable".
        """
        try:
            # getattr: the attribute may be missing if __init__ did not finish.
            engine = getattr(self, "_prefetch_engine", None)
            if engine:
                return engine._fetch(start=start, end=end)
            return self.fs.cat_file(
                self.path, start=start, end=end, concurrency=self.concurrency
            )
        except RuntimeError as e:
            if "not satisfiable" in str(e):
                return b""
            raise

    async def _async_fetch_range(self, start_offset, total_size, split_factor=1):
        """Async fetcher mapped to the Prefetcher engine for regional buckets."""
        return await self.gcsfs._cat_file_concurrent(
            self.path,
            start=start_offset,
            end=start_offset + total_size,
            concurrency=split_factor,
        )

    def close(self):
        """Close the file and, if one was created, the prefetch engine."""
        try:
            super().close()
        finally:
            # Release the prefetch engine even if closing the file raised.
            engine = getattr(self, "_prefetch_engine", None)
            if engine:
                engine.close()

2382 

2383 

def _convert_fixed_key_metadata(metadata, *, from_google=False):
    """
    Translate fixed_key_metadata between the exposed and the GCS spelling.

    The exposed keys are snake_case while Google uses camelCase; only the
    keys listed in ``SUPPORTED_FIXED_KEY_METADATA`` are carried over.

    Parameters
    ----------
    metadata: dict or None
        Key/value pairs to convert. ``None`` yields an empty dict.
    from_google: bool
        True when ``metadata`` comes from Google (camelCase) and should be
        converted to snake_case; False for the opposite direction.

    Returns
    -------
    dict
        A new dict holding only the supported keys, renamed.
    """
    if metadata is None:
        return {}

    converted = {}
    for snake_key, camel_key in SUPPORTED_FIXED_KEY_METADATA.items():
        if from_google:
            source_key, target_key = camel_key, snake_key
        else:
            source_key, target_key = snake_key, camel_key
        if source_key in metadata:
            converted[target_key] = metadata[source_key]
    return converted

2410 

2411 

async def upload_chunk(fs, location, data, offset, size, content_type):
    """
    Uploads a chunk of data. This function has a conditional path to support
    experimental features for Zonal buckets to append data using gRPC.

    Parameters
    ----------
    fs : filesystem instance
        Provides ``_call`` for issuing the request.
    location : str or AsyncAppendableObjectWriter
        Upload target; a writer instance selects the gRPC path.
    data : bytes
        Chunk payload.
    offset : int
        Position of ``data[0]`` within the whole object.
    size : int
        Total object size, sent in the ``Content-Range`` header.
    content_type : str
        Sent as the ``Content-Type`` header.

    Returns
    -------
    Parsed JSON response body, or None when the response body is empty.
    """
    from google.cloud.storage.asyncio.async_appendable_object_writer import (
        AsyncAppendableObjectWriter,
    )

    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import upload_chunk as ext_upload_chunk

    # location is AsyncAppendableObjectWriter only when ExtendedGcsFileSystem is used
    if isinstance(fs, ExtendedGcsFileSystem) and isinstance(
        location, AsyncAppendableObjectWriter
    ):
        return await ext_upload_chunk(fs, location, data, offset, size, content_type)

    length = len(data)
    head = {
        "Content-Range": "bytes %i-%i/%i" % (offset, offset + length - 1, size),
        "Content-Type": content_type,
        "Content-Length": str(length),
    }
    headers, txt = await fs._call(
        "POST", location, headers=head, data=UnclosableBytesIO(data)
    )
    if "Range" in headers:
        # ``end`` is the last byte position reported in the Range header.
        end = int(headers["Range"].split("-")[1])
        shortfall = (offset + length - 1) - end
        if shortfall > 0:
            # data[-shortfall:] begins at byte ``end + 1``, so the retry must
            # declare that offset (the same position GCSFile._upload_chunk
            # advances to), not ``end`` itself.
            return await upload_chunk(
                fs, location, data[-shortfall:], end + 1, size, content_type
            )
    return json.loads(txt) if txt else None

2446 

2447 

async def initiate_upload(
    fs,
    bucket,
    key,
    content_type="application/octet-stream",
    metadata=None,
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Start a resumable upload and return its location.

    For an ``ExtendedGcsFileSystem`` whose bucket is zonal, the work is handed
    to the experimental gRPC implementation, which returns an
    "AsyncAppendableObjectWriter" instance as location. Otherwise the value
    of the ``Location`` response header is returned (its first element when
    the header value is a list).
    """
    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import initiate_upload as ext_initiate_upload

    # Only a genuine ExtendedGcsFileSystem may take the zonal path; the
    # isinstance check keeps mocks and look-alike objects out of it.
    if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
        return await ext_initiate_upload(
            fs,
            bucket,
            key,
            content_type,
            metadata,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    # JSON body describing the object to create.
    body = {"name": key}
    if metadata:
        body["metadata"] = metadata
    body.update(_convert_fixed_key_metadata(fixed_key_metadata))

    # Extra keyword arguments for the call.
    extra = {}
    if mode == "create":
        extra["ifGenerationMatch"] = "0"
    if kms_key_name:
        extra["kmsKeyName"] = kms_key_name

    upload_url = (
        f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o?name={quote(key)}"
    )
    response_headers, _ = await fs._call(
        method="POST",
        path=upload_url,
        uploadType="resumable",
        json=body,
        headers={"X-Upload-Content-Type": content_type},
        **extra,
    )
    location = response_headers["Location"]
    if len(str(location)) < 20:
        logger.error("Location failed: %s" % response_headers)
    if isinstance(location, list):  # <- for CVR responses
        return location[0]
    return location

2501 

2502 

async def simple_upload(
    fs,
    bucket,
    key,
    datain,
    metadatain=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Upload ``datain`` as one multipart request and return the JSON response.

    For an ``ExtendedGcsFileSystem`` whose bucket is zonal, the work is handed
    to the experimental gRPC implementation instead. On the regular path the
    response is checked with the consistency checker selected by
    ``consistency`` before being returned.
    """
    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import simple_upload as ext_simple_upload

    # Only a genuine ExtendedGcsFileSystem may take the zonal path; the
    # isinstance check keeps mocks and look-alike objects out of it.
    if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
        return await ext_simple_upload(
            fs,
            bucket,
            key,
            datain,
            metadatain,
            consistency,
            content_type,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    checker = get_consistency_checker(consistency)
    upload_url = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o"

    # First multipart section: the object resource as JSON.
    resource = {"name": key}
    if metadatain is not None:
        resource["metadata"] = metadatain
    resource.update(_convert_fixed_key_metadata(fixed_key_metadata))
    resource_json = json.dumps(resource)

    # Extra keyword arguments for the call.
    extra = {}
    if mode == "create":
        extra["ifGenerationMatch"] = "0"
    if kms_key_name:
        extra["kmsKeyName"] = kms_key_name

    preamble = (
        "--==0=="
        "\nContent-Type: application/json; charset=UTF-8"
        "\n\n" + resource_json + "\n--==0==" + f"\nContent-Type: {content_type}" + "\n\n"
    )
    payload = preamble.encode() + datain + b"\n--==0==--"

    response = await fs._call(
        "POST",
        upload_url,
        uploadType="multipart",
        headers={"Content-Type": 'multipart/related; boundary="==0=="'},
        data=UnclosableBytesIO(payload),
        json_out=True,
        **extra,
    )
    checker.update(datain)
    checker.validate_json_response(response)
    return response