Coverage for gcsfs/core.py: 80%
1024 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
"""
Google Cloud Storage pythonic interface.

Defines ``GCSFileSystem``, an fsspec asynchronous filesystem
(``fsspec.asyn.AsyncFileSystem`` subclass) handling the "gs" and "gcs"
protocols, together with the small module-level helpers it relies on.
"""
5import asyncio
6import io
7import json
8import logging
9import mimetypes
10import os
11import posixpath
12import re
13import uuid
14import warnings
15import weakref
16from datetime import datetime, timedelta
17from glob import has_magic
18from urllib.parse import parse_qs
19from urllib.parse import quote as quote_urllib
20from urllib.parse import urlsplit
22import aiohttp
23import fsspec
24from fsspec import asyn
25from fsspec.callbacks import NoOpCallback
26from fsspec.implementations.http import get_client
27from fsspec.utils import other_paths, setup_logging, stringify_path
29from . import __version__ as version
30from .checkers import get_consistency_checker
31from .concurrency import parallel_tasks_first_completed
32from .credentials import GoogleCredentials
33from .inventory_report import InventoryReport
34from .retry import errs, retry_request, validate_response
35from .zb_hns_utils import DEFAULT_CONCURRENCY, MAX_PREFETCH_SIZE
logger = logging.getLogger("gcsfs")

# Opt-in debug logging: when GCSFS_DEBUG is present in the environment, its
# value is handed to fsspec's setup_logging as the log level.
if os.environ.get("GCSFS_DEBUG") is not None:
    setup_logging(logger=logger, level=os.environ["GCSFS_DEBUG"])
# Predefined object ACL names (the ``default_acl`` choices of ``_mkdir``),
# as listed for the client on 2018-01-16.
# NOTE(review): these are all-lowercase while _mkdir's default
# "bucketOwnerFullControl" is camelCase; confirm how callers compare them.
ACLs = {
    "authenticatedread",
    "bucketownerfullcontrol",
    "bucketownerread",
    "private",
    "projectprivate",
    "publicread",
}
# Predefined bucket ACL names (the ``acl`` choices of ``_mkdir``).
bACLs = {
    "authenticatedRead",
    "private",
    "projectPrivate",
    "publicRead",
    "publicReadWrite",
}
# Project used when none is passed explicitly; empty string if unset.
DEFAULT_PROJECT = os.environ.get("GCSFS_DEFAULT_PROJECT", "")

GCS_MIN_BLOCK_SIZE = 256 * 1024  # 256 KiB
GCS_MAX_BLOCK_SIZE = 256 * 1024 * 1024  # 256 MiB
DEFAULT_BLOCK_SIZE = 5 * 1024 * 1024  # 5 MiB

# Python-style metadata keyword -> GCS JSON API object resource field.
SUPPORTED_FIXED_KEY_METADATA = {
    "content_encoding": "contentEncoding",
    "cache_control": "cacheControl",
    "content_disposition": "contentDisposition",
    "content_language": "contentLanguage",
    "custom_time": "customTime",
}

# Query parameters forwarded to the GCS objects.list API; anything else in
# the listing kwargs is filtered out by _call_list_objects.
_VALID_LIST_PARAMS = {
    "delimiter",
    "prefix",
    "startOffset",
    "endOffset",
    "maxResults",
    "versions",
    "pageToken",
    "includeFoldersAsPrefixes",
}
def quote(s):
    """
    Percent-encode ``s`` so it can be embedded as a single URL path segment.

    Unlike ``urllib.parse.quote``'s default, ``/`` is encoded as well, so an
    object name containing slashes stays one segment.

    Parameters
    ----------
    s: input URL/portion

    Returns
    -------
    the fully percent-encoded string
    """
    encoded = quote_urllib(s, safe="")
    return encoded
def norm_path(path):
    """
    Canonicalize path to '{bucket}/{name}' form.

    The generation part returned by ``GCSFileSystem._split_path`` is
    discarded. Used by petastorm, do not remove.
    """
    bucket, name, _generation = GCSFileSystem._split_path(path)
    return "/".join([bucket, name])
async def _req_to_text(r):
    """Read the whole body of response ``r`` and return it decoded as text.

    ``r`` is entered as an async context manager around the read (presumably
    so the underlying connection is released afterwards). Decoding uses the
    ``bytes.decode`` default codec.
    """
    async with r:
        payload = await r.read()
        return payload.decode()
class UnclosableBytesIO(io.BytesIO):
    """In-memory byte buffer whose ``close()`` only rewinds it.

    A request body may be consumed more than once when a request is retried.
    A plain ``BytesIO`` becomes unusable after ``close()``, so this subclass
    stays open and resets the read position instead.
    """

    def close(self):
        """Rewind to offset 0 instead of closing, so the data can be re-read."""
        self.seek(0, io.SEEK_SET)
def _gcp_universe_domain():
    """Return the Google Cloud universe domain (``googleapis.com`` unless
    overridden by ``GOOGLE_CLOUD_UNIVERSE_DOMAIN``)."""
    return os.environ.get("GOOGLE_CLOUD_UNIVERSE_DOMAIN", "googleapis.com")
def _location():
    """
    Resolve the GCS HTTP location as ``http[s]://host``.

    If ``STORAGE_EMULATOR_HOST`` is set to something other than ``""`` or
    ``"default"``, that emulator address is used (``http://`` is prepended
    when it carries no scheme), enabling storage emulation for integration
    tests. Otherwise the public endpoint for the current universe domain is
    returned.

    Returns
    -------
    valid http location
    """
    emulator = os.getenv("STORAGE_EMULATOR_HOST", "")
    if emulator in ("default", ""):
        return f"https://storage.{_gcp_universe_domain()}"
    has_scheme = emulator.startswith(("http://", "https://"))
    return emulator if has_scheme else f"http://{emulator}"
def _chunks(lst, n):
    """
    Yield consecutive slices of ``lst`` holding ``n`` elements each.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    Implementation based on https://stackoverflow.com/a/312464.
    """
    yield from (lst[start : start + n] for start in range(0, len(lst), n))
def _coalesce_generation(*args):
    """Reduce several optional object generations to a single value.

    ``None`` entries are ignored. Returns ``None`` when nothing is defined,
    the single defined generation otherwise, and raises ``ValueError`` when
    more than one distinct generation is given.
    """
    generations = set(args)
    generations.discard(None)
    if not generations:
        return None
    if len(generations) > 1:
        raise ValueError(
            "Cannot coalesce generations where more than one are defined,"
            f" {generations}"
        )
    (only,) = generations
    return only
def _is_directory_marker(entry):
    """Return True for a zero-byte listing entry whose name ends with '/'."""
    if entry["size"] != 0:
        return False
    return entry["name"].endswith("/")
class GCSFileSystem(asyn.AsyncFileSystem):
    r"""
    Connect to Google Cloud Storage.

    The following modes of authentication are supported:

    - ``token=None``, GCSFS will attempt to guess your credentials in the
      following order: gcloud CLI default, gcsfs cached token, google compute
      metadata service, anonymous.
    - ``token='google_default'``, your default gcloud credentials will be used,
      which are typically established by doing ``gcloud login`` in a terminal.
    - ``token='cache'``, credentials from previously successful gcsfs
      authentication will be used (use this after "browser" auth succeeded)
    - ``token='anon'``, no authentication is performed, and you can only
      access data which is accessible to allUsers (in this case, the project and
      access level parameters are meaningless)
    - ``token='browser'``, you get an access code with which you can
      authenticate via a specially provided URL
    - if ``token='cloud'``, we assume we are running within google compute
      or google container engine, and query the internal metadata directly for
      a token.
    - you may supply a token generated by the
      [gcloud](https://cloud.google.com/sdk/docs/)
      utility; this is either a python dictionary, the name of a file
      containing the JSON returned by logging in with the gcloud CLI tool,
      or a Credentials object. gcloud typically stores its tokens in locations
      such as
      ``~/.config/gcloud/application_default_credentials.json``,
      ``~/.config/gcloud/credentials``, or
      ``~\AppData\Roaming\gcloud\credentials``, etc.

    Specific methods (e.g. ``ls``, ``info``, ...) may return object details from GCS.
    These detailed listings include the
    [object resource](https://cloud.google.com/storage/docs/json_api/v1/objects#resource)

    GCS *does not* include "directory" objects but instead generates
    directories by splitting
    [object names](https://cloud.google.com/storage/docs/key-terms).
    This means that, for example,
    a directory does not need to exist for an object to be created within it.
    Creating an object implicitly creates its parent directories, and removing
    all objects from a directory implicitly deletes the empty directory.

    `GCSFileSystem` generates listing entries for these implied directories in
    listing apis with the object properties:

        - "name" : string
            The "{bucket}/{name}" path of the dir, used in calls to
            GCSFileSystem or GCSFile.
        - "bucket" : string
            The name of the bucket containing this object.
        - "kind" : 'storage#object'
        - "size" : 0
        - "storageClass" : 'DIRECTORY'
        - type: 'directory' (fsspec compat)

    GCSFileSystem maintains a per-implied-directory cache of object listings and
    fulfills all object information and listing requests from cache. This implies, for example, that objects
    created via other processes *will not* be visible to the GCSFileSystem until the cache
    is refreshed. Calls to GCSFileSystem.open and calls to GCSFile are not affected by this cache.

    Note that directory listings are cached by default, because fetching those listings can be expensive. This is
    contrary to local filesystem behaviour. The cache will be cleared if writing from this instance, but it can
    become stale and return incorrect results if the storage is written to from another process/machine.
    If you anticipate this possibility, you can set the use_listings_cache and listings_expiry_time arguments
    to configure the caching, call `.invalidate_cache()` when required, or pass `refresh=True` to the
    various listing methods.

    In the default case the cache is never expired. This may be controlled via the ``cache_timeout``
    GCSFileSystem parameter or via explicit calls to ``GCSFileSystem.invalidate_cache``.

    NOTE on "exclusive" mode: mode="create" (in pipe and put) and open(mode="xb") are supported on an
    experimental basis. The test harness does not currently support this, so use at your
    own risk.

    Parameters
    ----------
    project : string
        project_id to work under. Note that this is not the same as, but often
        very similar to, the project name.
        This is required in order
        to list all the buckets you have access to within a project and to
        create/delete buckets, or update their access policies.
        If ``token='google_default'``, the value is overridden by the default,
        if ``token='anon'``, the value is ignored.
    access : one of {'read_only', 'read_write', 'full_control'}
        Full control implies read/write as well as modifying metadata,
        e.g., access control.
    token: None, dict or string
        (see description of authentication methods, above)
    block_size: int or None
        If given, overrides ``default_block_size`` for this instance.
    consistency: 'none', 'size', 'md5'
        Check method when writing files. Can be overridden in open().
    cache_timeout: float, seconds
        Cache expiration time in seconds for object metadata cache.
        Set cache_timeout <= 0 for no caching, None for no cache expiration.
    secure_serialize: bool (deprecated)
    check_connection: bool (deprecated)
        Passing a truthy value only emits a DeprecationWarning.
    requests_timeout: float or None
        Passed as ``timeout`` to each underlying HTTP request.
    requester_pays : bool, or str default False
        Whether to use requester-pays requests. This will include your
        project ID `project` in requests as the `userProject`, and you'll be
        billed for accessing data from requester-pays buckets. Optionally,
        pass a project-id here as a string to use that as the `userProject`.
    session_kwargs: dict
        passed on to ``aiohttp.ClientSession``; can contain, for example,
        proxy settings.
    timeout: float or None
        Timeout used when synchronously waiting on the event loop (e.g. in
        the ``buckets`` property).
    endpoint_url: str
        If given, use this URL (format protocol://host:port , *without* any
        path part) for communication. If not given, defaults to the value
        of environment variable "STORAGE_EMULATOR_HOST"; if that is not set
        either, will use the standard Google endpoint.
    default_location: str
        Default location where buckets are created, like 'US' or 'EUROPE-WEST3'.
        You can find a list of all available locations here:
        https://cloud.google.com/storage/docs/locations#available-locations
    version_aware: bool
        Whether to support object versioning. If enabled this will require the
        user to have the necessary permissions for dealing with versioned objects.
    """

    # Valid values of the ``access`` argument.
    scopes = {"read_only", "read_write", "full_control"}
    retries = 6  # number of retries on http failure
    default_block_size = DEFAULT_BLOCK_SIZE
    protocol = "gs", "gcs"
    async_impl = True
    # 5 MiB; presumably the smallest chunk for which concurrent transfer is
    # attempted (its users are outside this view) - confirm.
    MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024
307 def __init__(
308 self,
309 project=DEFAULT_PROJECT,
310 access="full_control",
311 token=None,
312 block_size=None,
313 consistency="none",
314 cache_timeout=None,
315 secure_serialize=True,
316 check_connection=None,
317 requests_timeout=None,
318 requester_pays=False,
319 asynchronous=False,
320 session_kwargs=None,
321 loop=None,
322 timeout=None,
323 endpoint_url=None,
324 default_location=None,
325 version_aware=False,
326 **kwargs,
327 ):
328 if cache_timeout is not None:
329 kwargs["listings_expiry_time"] = cache_timeout
330 super().__init__(
331 self,
332 asynchronous=asynchronous,
333 loop=loop,
334 **kwargs,
335 )
336 if access not in self.scopes:
337 raise ValueError("access must be one of {}", self.scopes)
338 if project is None:
339 warnings.warn("GCS project not set - cannot list or create buckets")
340 if block_size is not None:
341 self.default_block_size = block_size
342 self.requester_pays = requester_pays
343 self.consistency = consistency
344 self.cache_timeout = cache_timeout or kwargs.pop("listings_expiry_time", None)
345 self.requests_timeout = requests_timeout
346 self.timeout = timeout
347 self._session = None
348 self._endpoint = endpoint_url
349 self.session_kwargs = session_kwargs or {}
350 self.default_location = default_location
351 self.version_aware = version_aware
353 if check_connection:
354 warnings.warn(
355 "The `check_connection` argument is deprecated and will be removed in a future release.",
356 DeprecationWarning,
357 )
359 self.credentials = GoogleCredentials(
360 project, access, token, on_google=self.on_google
361 )
363 @property
364 def _location(self):
365 return self._endpoint or _location()
367 @property
368 def base(self):
369 return f"{self._location}/storage/v1/"
371 @property
372 def batch_url_base(self):
373 return f"{self._location}/batch/storage/v1"
375 @property
376 def project(self):
377 return self.credentials.project
    # Clean up the aiohttp session
    #
    # This can run from the main thread if invoked via the weakref callback.
    # This can happen even if the `loop` parameter belongs to another thread
    # (e.g. the fsspec IO worker). The control flow here is intended to attempt
    # in-thread asynchronous cleanup first, then fallback to synchronous
    # cleanup (which can handle cross-thread calls).
    @staticmethod
    def close_session(loop, session: aiohttp.ClientSession, asynchronous=False):
        """Close ``session`` without requiring the caller to await anything.

        Returns immediately if the session is already closed. Otherwise the
        strategies are tried in this order:

        1. An explicit ``loop`` was given: if it is running, schedule
           ``session.close()`` on it; if not, force-close.
        2. ``asynchronous`` is true and this thread has a running loop:
           schedule ``session.close()`` on that loop.
        3. fsspec's shared IO loop (``asyn.loop[0]``) is running: run
           ``session.close()`` on it via ``asyn.sync`` with a 0.1 s timeout,
           force-closing if that times out.
        4. Otherwise force-close.

        Force-closing calls the private ``_connector._close()`` of the session
        directly, without going through an event loop.
        """
        if session.closed:
            return
        force_close = False
        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None
        if loop:
            # an explicit loop was set
            # NOTE(review): loop.create_task is not thread-safe; if `loop`
            # belongs to another thread (as the comment above says can happen),
            # asyncio.run_coroutine_threadsafe may be needed instead - confirm.
            if loop.is_running():
                loop.create_task(session.close())
            else:
                force_close = True
        elif current_loop is not None and current_loop.is_running() and asynchronous:
            # running in a concurrent context
            current_loop.create_task(session.close())
        elif asyn.loop[0] is not None and asyn.loop[0].is_running():
            try:
                asyn.sync(asyn.loop[0], session.close, timeout=0.1)
            except fsspec.FSTimeoutError:
                force_close = True
        else:
            force_close = True
        if force_close:
            # during shutdown, this is the fallback
            connector = getattr(session, "_connector", None)
            if connector is not None:
                # close after loop is dead
                connector._close()
418 async def _set_session(self):
419 if self._session is None:
420 self._session = await get_client(**self.session_kwargs)
421 weakref.finalize(
422 self, self.close_session, self.loop, self._session, self.asynchronous
423 )
424 return self._session
426 @property
427 def session(self):
428 if self.asynchronous and self._session is None:
429 raise RuntimeError("Please await _connect* before anything else")
430 return self._session
432 @classmethod
433 def _strip_protocol(cls, path):
434 if isinstance(path, list):
435 return [cls._strip_protocol(p) for p in path]
436 path = stringify_path(path)
437 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
438 for protocol in protos:
439 if path.startswith(protocol + "://"):
440 path = path[len(protocol) + 3 :]
441 elif path.startswith(protocol + "::"):
442 path = path[len(protocol) + 2 :]
443 # use of root_marker to make minimum required path, e.g., "/"
444 return path or cls.root_marker
446 @classmethod
447 def _get_kwargs_from_urls(cls, path):
448 _, _, generation = cls._split_path(path, version_aware=True)
449 if generation is not None:
450 return {"version_aware": True}
451 return {}
453 def _get_params(self, kwargs):
454 params = {k: v for k, v in kwargs.items() if v is not None}
455 # needed for requester pays buckets
456 if self.requester_pays:
457 if isinstance(self.requester_pays, str):
458 user_project = self.requester_pays
459 else:
460 user_project = self.project
461 params["userProject"] = user_project
462 return params
464 def _get_headers(self, headers):
465 out = {}
466 if headers is not None:
467 out.update(headers)
468 if "User-Agent" not in out:
469 out["User-Agent"] = "python-gcsfs/" + version
470 self.credentials.apply(out)
471 return out
473 def _format_path(self, path, args):
474 if not path.startswith("http"):
475 path = self.base + path
477 if args:
478 path = path.format(*[quote(p) for p in args])
479 return path
481 @retry_request(retries=retries)
482 async def _request(
483 self, method, path, *args, headers=None, json=None, data=None, **kwargs
484 ):
485 await self._set_session()
486 if hasattr(data, "seek"):
487 data.seek(0)
488 async with self.session.request(
489 method=method,
490 url=self._format_path(path, args),
491 params=self._get_params(kwargs),
492 json=json,
493 headers=self._get_headers(headers),
494 data=data,
495 timeout=self.requests_timeout,
496 ) as r:
497 status = r.status
498 headers = r.headers
499 info = r.request_info # for debug only
500 contents = await r.read()
502 validate_response(status, contents, path, args)
503 return status, headers, info, contents
505 async def _call(
506 self, method, path, *args, json_out=False, info_out=False, **kwargs
507 ):
508 logger.debug(f"{method.upper()}: {path}, {args}, {kwargs.get('headers')}")
509 status, headers, info, contents = await self._request(
510 method, path, *args, **kwargs
511 )
512 if json_out:
513 return json.loads(contents)
514 elif info_out:
515 return info
516 else:
517 return headers, contents
519 call = asyn.sync_wrapper(_call)
521 @property
522 def buckets(self):
523 """Return list of available project buckets."""
524 return [
525 b["name"]
526 for b in asyn.sync(self.loop, self._list_buckets, timeout=self.timeout)
527 ]
529 def _process_object(self, bucket, object_metadata):
530 """Process object resource into gcsfs object information format.
532 Process GCS object resource via type casting and attribute updates to
533 the cache-able gcsfs object information format. Returns an updated copy
534 of the object resource.
536 (See https://cloud.google.com/storage/docs/json_api/v1/objects#resource)
537 """
538 result = dict(object_metadata)
539 result["size"] = int(object_metadata.get("size", 0))
540 result["name"] = posixpath.join(bucket, object_metadata["name"])
541 result["type"] = "file"
542 # Translate time metadata from GCS names to fsspec standard names.
543 # TODO(issues/559): Remove legacy names `updated` and `timeCreated`?
544 if "updated" in object_metadata:
545 result["mtime"] = self._parse_timestamp(object_metadata["updated"])
546 if "timeCreated" in object_metadata:
547 result["ctime"] = self._parse_timestamp(object_metadata["timeCreated"])
548 if "generation" in object_metadata or "metageneration" in object_metadata:
549 result["generation"] = object_metadata.get("generation")
550 result["metageneration"] = object_metadata.get("metageneration")
552 return result
554 async def _make_bucket_requester_pays(self, path, state=True):
555 # this is really some form of setACL/chmod
556 # perhaps should be automatic if gcs.requester_pays
557 json = {"billing": {"requesterPays": state}}
558 await self._call("PATCH", f"b/{path}", json=json)
560 make_bucket_requester_pays = asyn.sync_wrapper(_make_bucket_requester_pays)
562 async def _get_object(self, path):
563 """Return object information at the given path."""
564 bucket, key, generation = self.split_path(path)
566 # Check if parent dir is in listing cache
567 listing = self._ls_from_cache(path)
568 if listing:
569 name = "/".join((bucket, key))
570 for file_details in listing:
571 if (
572 file_details["type"] == "file"
573 and file_details["name"] == name
574 and (not generation or file_details.get("generation") == generation)
575 ):
576 return file_details
577 else:
578 raise FileNotFoundError(path)
580 if not key:
581 # Attempt to "get" the bucket root, return error instead of
582 # listing.
583 raise FileNotFoundError(path)
585 res = None
586 # Work around various permission settings. Prefer an object get (storage.objects.get), but
587 # fall back to a bucket list + filter to object name (storage.objects.list).
588 try:
589 res = await self._call(
590 "GET", "b/{}/o/{}", bucket, key, json_out=True, generation=generation
591 )
592 except OSError as e:
593 if not str(e).startswith("Forbidden"):
594 raise
595 resp = await self._call(
596 "GET",
597 "b/{}/o",
598 bucket,
599 json_out=True,
600 prefix=key,
601 maxResults=1 if not generation else None,
602 versions="true" if generation else None,
603 )
604 for item in resp.get("items", []):
605 if item["name"] == key and (
606 not generation or item.get("generation") == generation
607 ):
608 res = item
609 break
610 if res is None:
611 raise FileNotFoundError(path)
612 return self._process_object(bucket, res)
    async def _list_objects(self, path, prefix="", versions=False, **kwargs):
        """List the entries directly under ``path``.

        Returns a list of pseudo-directory entries followed by processed object
        entries. Serves from the listing cache when possible. If the listing
        is empty but ``path`` names an object, returns ``[that object's info]``.
        Full, unprefixed, non-snapshot, unlimited listings are written to
        ``self.dircache[path]``.

        NOTE(review): both the cache read and the cache write ignore
        ``versions`` and the cache read ignores ``max_results``, so a versioned
        and a non-versioned listing of the same path can be served from each
        other's cache entry - confirm this is acceptable.
        """
        bucket, key, generation = self.split_path(path)
        path = path.rstrip("/")

        # NOTE: the inventory report logic is experimental.
        inventory_report_info = kwargs.get("inventory_report_info", None)

        # Only attempt to list from the cache when the user does not use
        # the inventory report service.
        if not inventory_report_info:
            try:
                clisting = self._ls_from_cache(path)
                # A cached directory entry named like `path` itself means the
                # cached data is not a listing of `path`'s contents; re-list.
                hassubdirs = clisting and any(
                    c["name"].rstrip("/") == path and c["type"] == "directory"
                    for c in clisting
                )
                if clisting and not hassubdirs:
                    return clisting
            except FileNotFoundError:
                # not finding a bucket in list of "my" buckets is OK
                if key:
                    raise

        items, prefixes = await self._do_list_objects(
            path,
            prefix=prefix,
            versions=versions,
            **kwargs,
        )

        # Note: the comprehension variable `prefix` is local to the
        # comprehension; the `prefix` argument is unchanged below.
        pseudodirs = [
            {
                "bucket": bucket,
                "name": bucket + "/" + prefix.strip("/"),
                "size": 0,
                "storageClass": "DIRECTORY",
                "type": "directory",
            }
            for prefix in prefixes
        ]
        if not (items + pseudodirs):
            if key:
                # Nothing "under" path: it may be a single object instead.
                return [await self._get_object(path)]
            else:
                return []
        out = pseudodirs + items

        use_snapshot_listing = inventory_report_info and inventory_report_info.get(
            "use_snapshot_listing"
        )

        max_results = kwargs.get("max_results")

        # Don't cache prefixed/partial listings, in addition to
        # not using the inventory report service to do listing directly.
        if not prefix and not use_snapshot_listing and not max_results:
            self.dircache[path] = out
        return out
673 async def _do_list_objects(
674 self,
675 path,
676 max_results=None,
677 delimiter="/",
678 prefix="",
679 versions=False,
680 **kwargs,
681 ):
682 """Object listing for the given {bucket}/{prefix}/ path."""
683 bucket, _path, generation = self.split_path(path)
684 _path = "" if not _path else _path.rstrip("/") + "/"
685 prefix = f"{_path}{prefix}" or None
686 versions = bool(versions or generation)
688 # Page size of 5000 is officially supported across GCS.
689 default_page_size = 5000
691 # NOTE: the inventory report logic is experimental.
692 inventory_report_info = kwargs.get("inventory_report_info", None)
694 # Check if the user has configured inventory report option.
695 if inventory_report_info is not None:
696 items, prefixes = await InventoryReport.fetch_snapshot(
697 gcs_file_system=self,
698 inventory_report_info=inventory_report_info,
699 prefix=prefix,
700 )
702 use_snapshot_listing = inventory_report_info.get("use_snapshot_listing")
704 # If the user wants to rely on the snapshot from the inventory report
705 # for listing, directly return the results.
706 if use_snapshot_listing:
707 return items, prefixes
709 # Otherwise, use the snapshot to initiate concurrent listing.
710 return await self._concurrent_list_objects_helper(
711 items=items,
712 bucket=bucket,
713 delimiter=delimiter,
714 prefix=prefix,
715 versions=versions,
716 page_size=default_page_size,
717 **kwargs,
718 )
720 # If the user has not configured inventory report, proceed to use
721 # sequential listing.
722 else:
723 return await self._sequential_list_objects_helper(
724 bucket=bucket,
725 delimiter=delimiter,
726 start_offset=None,
727 end_offset=None,
728 prefix=prefix,
729 versions=versions,
730 max_results=max_results,
731 **kwargs,
732 )
734 async def _concurrent_list_objects_helper(
735 self, items, bucket, delimiter, prefix, versions, page_size, **kwargs
736 ):
737 """
738 Lists objects using coroutines, using the object names from the inventory
739 report to split up the ranges.
740 """
742 # Extract out the names of the objects fetched from the inventory report.
743 snapshot_object_names = sorted([item["name"] for item in items])
745 # Determine the number of coroutines needed to concurrent listing.
746 # Ideally, want each coroutine to fetch a single page of objects.
747 num_coroutines = len(snapshot_object_names) // page_size + 1
748 num_objects_per_coroutine = len(snapshot_object_names) // num_coroutines
750 start_offsets = []
751 end_offsets = []
753 # Calculate the split splits of each coroutine (start offset and end offset).
754 for i in range(num_coroutines):
755 range_start = i * num_objects_per_coroutine
756 if i == num_coroutines - 1:
757 range_end = len(snapshot_object_names)
758 else:
759 range_end = range_start + num_objects_per_coroutine
761 if range_start == 0:
762 prefix_start = None
763 else:
764 prefix_start = snapshot_object_names[range_start]
766 if range_end == len(snapshot_object_names):
767 prefix_end = None
768 else:
769 prefix_end = snapshot_object_names[range_end]
771 start_offsets.append(prefix_start)
772 end_offsets.append(prefix_end)
774 # Assign the coroutine all at once, and wait for them to finish listing.
775 results = await asyncio.gather(
776 *[
777 self._sequential_list_objects_helper(
778 bucket=bucket,
779 delimiter=delimiter,
780 start_offset=start_offsets[i],
781 end_offset=end_offsets[i],
782 prefix=prefix,
783 versions=versions,
784 max_results=page_size,
785 **kwargs,
786 )
787 for i in range(0, len(start_offsets))
788 ]
789 )
791 items = []
792 prefixes = []
794 # Concatenate the items and prefixes from each coroutine for final results.
795 for i in range(len(results)):
796 items_from_process, prefixes_from_process = results[i]
797 items.extend(items_from_process)
798 prefixes.extend(prefixes_from_process)
800 return items, prefixes
802 async def _sequential_list_objects_helper(
803 self,
804 bucket,
805 delimiter,
806 start_offset,
807 end_offset,
808 prefix,
809 versions,
810 max_results,
811 items_per_call=1000,
812 **kwargs,
813 ):
814 """
815 Sequential list objects within the start and end offset range.
816 """
817 max_results = max_results if max_results else 10_000_000
818 prefixes = []
819 items = []
820 num_items = min(items_per_call, max_results, 1000)
821 page = await self._call_list_objects(
822 bucket,
823 delimiter=delimiter,
824 prefix=prefix,
825 startOffset=start_offset,
826 endOffset=end_offset,
827 maxResults=num_items,
828 versions="true" if versions else None,
829 **kwargs,
830 )
832 prefixes.extend(page.get("prefixes", []))
833 items.extend(page.get("items", []))
834 next_page_token = page.get("nextPageToken", None)
836 while len(items) + len(prefixes) < max_results and next_page_token is not None:
837 num_items = min(
838 items_per_call, max_results - (len(items) + len(prefixes)), 1000
839 )
840 page = await self._call_list_objects(
841 bucket,
842 delimiter=delimiter,
843 prefix=prefix,
844 startOffset=start_offset,
845 endOffset=end_offset,
846 maxResults=num_items,
847 pageToken=next_page_token,
848 versions="true" if versions else None,
849 **kwargs,
850 )
852 assert page["kind"] == "storage#objects"
853 prefixes.extend(page.get("prefixes", []))
854 items.extend(page.get("items", []))
855 next_page_token = page.get("nextPageToken", None)
857 items = [self._process_object(bucket, i) for i in items]
859 return items, prefixes
861 async def _call_list_objects(self, bucket, **kwargs):
862 """
863 Helper method to fetch a single page of object listing.
864 Extracts valid GCS parameters from kwargs to prevent parameter pollution.
865 """
867 # Only pass valid parameters to the API call
868 valid_kwargs = {k: v for k, v in kwargs.items() if k in _VALID_LIST_PARAMS}
870 return await self._call(
871 "GET",
872 "b/{}/o",
873 bucket,
874 json_out=True,
875 **valid_kwargs,
876 )
878 async def _list_buckets(self):
879 """Return list of all buckets under the current project."""
880 if "" not in self.dircache:
881 items = []
882 page = await self._call("GET", "b", project=self.project, json_out=True)
884 assert page["kind"] == "storage#buckets"
885 items.extend(page.get("items", []))
886 next_page_token = page.get("nextPageToken", None)
888 while next_page_token is not None:
889 page = await self._call(
890 "GET",
891 "b",
892 project=self.project,
893 pageToken=next_page_token,
894 json_out=True,
895 )
897 assert page["kind"] == "storage#buckets"
898 items.extend(page.get("items", []))
899 next_page_token = page.get("nextPageToken", None)
901 buckets = [
902 {**i, "name": i["name"] + "/", "size": 0, "type": "directory"}
903 for i in items
904 ]
905 self.dircache[""] = buckets
906 return buckets
907 return self.dircache[""]
909 def invalidate_cache(self, path=None):
910 """
911 Invalidate listing cache for given path, it is reloaded on next use.
913 Parameters
914 ----------
915 path: string or None
916 If None, clear all listings cached else listings at or under given
917 path.
918 """
919 if path is None:
920 logger.debug("invalidate_cache clearing cache")
921 self.dircache.clear()
922 else:
923 path = self._strip_protocol(path).rstrip("/")
925 while path:
926 self.dircache.pop(path, None)
927 path = self._parent(path)
929 async def _mkdir(
930 self,
931 path,
932 acl="projectPrivate",
933 default_acl="bucketOwnerFullControl",
934 location=None,
935 create_parents=False,
936 enable_versioning=False,
937 enable_object_retention=False,
938 iam_configuration=None,
939 **kwargs,
940 ):
941 """
942 New bucket
944 If path is more than just a bucket, will create bucket if create_parents=True;
945 otherwise is a noop. If create_parents is False and bucket does not exist,
946 will produce FileNotFoundError.
948 Parameters
949 ----------
950 path: str
951 bucket name. If contains '/' (i.e., looks like subdir), will
952 have no effect because GCS doesn't have real directories.
953 acl: string, one of bACLs
954 access for the bucket itself. See:
955 https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
956 default_acl: str, one of ACLs
957 default ACL for objects created in this bucket
958 location: Optional[str]
959 Location where buckets are created, like 'US' or 'EUROPE-WEST3'.
960 If not provided, defaults to `self.default_location`.
961 You can find a list of all available locations here:
962 https://cloud.google.com/storage/docs/locations#available-locations
963 create_parents: bool
964 If True, creates the bucket in question, if it doesn't already exist
965 enable_versioning: bool
966 If True, creates the bucket in question with object versioning
967 enabled.
968 enable_object_retention: bool
969 If True, creates the bucket in question with object retention
970 permanently enabled.
971 iam_configuration: dict
972 If provided, sets the IAM policy for the bucket. This argument
973 allows setting properties such as `{publicAccessPrevention: "enforced"}`
974 and `{"uniformBucketLevelAccess": {"enabled": True}}`. If passed, `acl`
975 and `default_acl` are explicitly ignored.
976 **kwargs
977 Additional parameters passed to the API call request body. See:
978 https://cloud.google.com/storage/docs/json_api/v1/buckets/insert#request-body
979 for all possible options. Pass nested parameters as dictionaries, e.g.:
980 `{"autoclass": {"enabled": True}}`
981 """
982 bucket, object, generation = self.split_path(path)
983 if bucket in ["", "/"]:
984 raise ValueError("Cannot create root bucket")
985 if "/" in path and create_parents and await self._exists(bucket):
986 # nothing to do
987 return
988 if "/" in path:
989 if await self._exists(bucket):
990 return
991 if not create_parents:
992 raise FileNotFoundError(bucket)
994 json_data = {"name": bucket}
995 location = location or self.default_location
996 if location:
997 json_data["location"] = location
998 if enable_versioning:
999 json_data["versioning"] = {"enabled": True}
1000 if iam_configuration:
1001 json_data["iamConfiguration"] = iam_configuration
1002 acl = None
1003 default_acl = None
1004 if kwargs:
1005 json_data.update(kwargs)
1007 await self._call(
1008 method="POST",
1009 path="b",
1010 predefinedAcl=acl,
1011 project=self.project,
1012 predefinedDefaultObjectAcl=default_acl,
1013 enableObjectRetention=str(enable_object_retention).lower(),
1014 json=json_data,
1015 json_out=True,
1016 )
1017 self.invalidate_cache(bucket)
# Blocking (synchronous) counterpart of the coroutine ``_mkdir``, built with
# fsspec's ``sync_wrapper``.
mkdir = asyn.sync_wrapper(_mkdir)
1021 async def _rmdir(self, bucket):
1022 """Delete an empty bucket
1024 Parameters
1025 ----------
1026 bucket: str
1027 bucket name. If contains '/' (i.e., looks like subdir), will
1028 have no effect because GCS doesn't have real directories.
1029 """
1030 bucket = bucket.rstrip("/")
1031 if "/" in bucket:
1032 return
1033 await self._call("DELETE", "b/" + bucket, json_out=False)
1034 self.invalidate_cache(bucket)
1035 self.invalidate_cache("")
# Blocking (synchronous) counterpart of the coroutine ``_rmdir``, built with
# fsspec's ``sync_wrapper``.
rmdir = asyn.sync_wrapper(_rmdir)
def modified(self, path):
    """Return the ``mtime`` field reported by ``self.info(path)``."""
    details = self.info(path)
    return details["mtime"]
def created(self, path):
    """Return the ``ctime`` field reported by ``self.info(path)``."""
    details = self.info(path)
    return details["ctime"]
1045 def _parse_timestamp(self, timestamp):
1046 assert timestamp.endswith("Z")
1047 timestamp = timestamp[:-1]
1048 timestamp = timestamp + "0" * (6 - len(timestamp.rsplit(".", 1)[-1]))
1049 return datetime.fromisoformat(timestamp + "+00:00")
async def _info(self, path, generation=None, **kwargs):
    """File information about this path.

    Resolution order:

    1. A bare bucket name is looked up with ``GET b/{bucket}``. If that
       raises OSError, ``_ls`` is used as a fallback, and the entry then has
       no bucket metadata. FileNotFoundError is raised when that listing is
       empty.
    2. The cached listing of the parent directory is searched for an entry
       with the same name, and with the same generation if one was
       requested.
    3. If ``path`` itself has a cached listing, it is reported as a
       directory.
    4. Otherwise an object lookup (``_get_object``) and a directory probe
       (``_get_directory_info``) are started together. A found object that
       is not a directory marker is returned. If the object is missing, or
       is only a directory marker, the directory probe's outcome is returned
       or raised.

    Parameters
    ----------
    path: str
        Path to inspect; the protocol prefix and trailing "/" are ignored.
    generation: optional
        Object generation to match. It is combined with any generation
        embedded in ``path`` via ``_coalesce_generation``.
    kwargs:
        Accepted for interface compatibility; not used here.
    """
    path = self._strip_protocol(path).rstrip("/")
    if "/" not in path:
        try:
            out = await self._call("GET", f"b/{path}", json_out=True)
            out.update(size=0, type="directory")
        except OSError:
            # GET bucket failed, try ls; will have no metadata
            exists = await self._ls(path)
            if exists:
                out = {"name": path, "size": 0, "type": "directory"}
            else:
                raise FileNotFoundError(path)
        return out
    # Check directory cache for parent dir
    parent_path = self._parent(path)
    parent_cache = self._ls_from_cache(parent_path)
    bucket, key, path_generation = self.split_path(path)
    generation = _coalesce_generation(generation, path_generation)
    if parent_cache:
        name = "/".join((bucket, key))
        for o in parent_cache:
            # Names are compared without a trailing "/"; a generation only
            # has to match when one was requested.
            if o["name"].rstrip("/") == name and (
                not generation or o.get("generation") == generation
            ):
                return o
    if self._ls_from_cache(path):
        # this is a directory
        return {
            "bucket": bucket,
            "name": path,
            "size": 0,
            "storageClass": "DIRECTORY",
            "type": "directory",
        }

    # NOTE(review): ``_get_object`` only receives ``path``, so a generation
    # passed through the ``generation`` argument (rather than embedded in
    # the path) is presumably not applied to this lookup -- confirm against
    # ``_get_object``.
    async with parallel_tasks_first_completed(
        [
            self._get_object(path),
            self._get_directory_info(path, bucket, key, generation),
        ]
    ) as (tasks, done, pending):
        get_object_task, get_directory_info_task = tasks

        try:
            get_object_res = await get_object_task
            # Directory-marker objects are not reported as files; fall
            # through to the directory probe instead.
            if not _is_directory_marker(get_object_res):
                return get_object_res
        except FileNotFoundError:
            pass
        return await get_directory_info_task
1104 async def _get_directory_info(self, path, bucket, key, generation):
1105 """
1106 Internal method to check if a path is a directory by listing objects.
1107 """
1108 out = await self._list_objects(path, max_results=1)
1109 exact = next((o for o in out if o["name"].rstrip("/") == path), None)
1110 if exact and not _is_directory_marker(exact):
1111 # exact hit
1112 return exact
1113 elif out:
1114 # other stuff - must be a directory
1115 return {
1116 "bucket": bucket,
1117 "name": path,
1118 "size": 0,
1119 "storageClass": "DIRECTORY",
1120 "type": "directory",
1121 }
1122 else:
1123 raise FileNotFoundError(path)
1125 async def _ls(
1126 self, path, detail=False, prefix="", versions=False, refresh=False, **kwargs
1127 ):
1128 """List objects under the given '/{bucket}/{prefix} path."""
1129 path = self._strip_protocol(path).rstrip("/")
1131 if refresh:
1132 self.invalidate_cache(path)
1133 if path in ["/", ""]:
1134 out = await self._list_buckets()
1135 else:
1136 out = []
1137 dir_names = set()
1138 for entry in await self._list_objects(
1139 path, prefix=prefix, versions=versions, **kwargs
1140 ):
1141 if _is_directory_marker(entry):
1142 entry = {
1143 "bucket": entry["bucket"],
1144 "name": path.rstrip("/"),
1145 "size": 0,
1146 "storageClass": "DIRECTORY",
1147 "type": "directory",
1148 }
1150 if entry["type"] == "directory":
1151 if entry["name"] in dir_names:
1152 continue
1153 dir_names.add(entry["name"])
1155 if versions and "generation" in entry:
1156 entry = entry.copy()
1157 entry["name"] = f"{entry['name']}#{entry['generation']}"
1159 out.append(entry)
1161 out.sort(key=lambda e: (e["name"]))
1163 if detail:
1164 return out
1165 else:
1166 return [o["name"] for o in out]
def url(self, path):
    """Get the HTTP download URL of the given path.

    Builds a JSON-API media-download URL (``alt=media``) under
    ``self._location``. The object name is escaped with ``quote``, and a
    generation parsed from ``path`` is appended as ``&generation=...``.
    """
    bucket, key, generation = self.split_path(path)
    escaped_key = quote(key)
    generation_param = f"&generation={generation}" if generation else ""
    return "{}/download/storage/v1/b/{}/o/{}?alt=media{}".format(
        self._location, bucket, escaped_key, generation_param
    )
1180 async def _cat_file_sequential(self, path, start=None, end=None, **kwargs):
1181 """Simple one-shot get of file data"""
1182 # if start and end are both provided and valid, but start >= end, return empty bytes
1183 # Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4"
1184 # for start=5, end=5), causing the server to return the whole file instead of nothing.
1185 if start is not None and end is not None and start >= end >= 0:
1186 return b""
1188 u2 = self.url(path)
1189 if start is not None or end is not None:
1190 head = {"Range": await self._process_limits(path, start, end)}
1191 else:
1192 head = {}
1194 headers, out = await self._call("GET", u2, headers=head)
1195 return out
async def _cat_file_concurrent(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Fetch bytes ``[start, end)`` of ``path`` using parallel ranged GETs.

    ``start``/``end`` follow the fsspec/Python-slice convention:
    - ``None`` means the beginning or end of the object;
    - negative values count back from the end.

    The range is split into ``concurrency`` contiguous parts, with the last
    part absorbing the remainder. The parts are fetched concurrently with
    ``_cat_file_sequential`` and joined in order. Ranges smaller than
    ``self.MIN_CHUNK_SIZE_FOR_CONCURRENCY``, or ``concurrency <= 1``, fall
    back to a single sequential request.

    If any part fails, the still-running parts are cancelled and awaited,
    then the error is re-raised.
    """
    if start is None:
        start = 0
    if end is None or end < 0 or start < 0:
        # Resolve relative offsets against the object size before
        # splitting. Otherwise the parts would mix relative and absolute
        # positions (and a negative ``end`` would yield b"").
        size = (await self._info(path))["size"]
        if end is None:
            end = size
        elif end < 0:
            end = max(size + end, 0)
        if start < 0:
            start = max(size + start, 0)
    if start >= end:
        return b""

    if concurrency <= 1 or end - start < self.MIN_CHUNK_SIZE_FOR_CONCURRENCY:
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    total_size = end - start
    part_size = total_size // concurrency
    tasks = []
    for i in range(concurrency):
        offset = start + i * part_size
        # The last part also takes the remainder of the integer division.
        actual_size = (
            part_size if i < concurrency - 1 else total_size - i * part_size
        )
        tasks.append(
            asyncio.create_task(
                self._cat_file_sequential(
                    path, start=offset, end=offset + actual_size, **kwargs
                )
            )
        )

    try:
        results = await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for the cancellations so no task is left running.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    return b"".join(results)
async def _cat_file(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Return the bytes of ``path``, optionally only ``[start, end)``.

    With ``concurrency > 1`` the data is fetched by ``_cat_file_concurrent``
    using parallel ranged requests. Otherwise a single request is made via
    ``_cat_file_sequential``.
    """
    if concurrency <= 1:
        # Kept separate from _cat_file_concurrent(concurrency=1) on purpose:
        # the concurrent code path is still experimental. Merge the two once
        # it has stabilised.
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)
    return await self._cat_file_concurrent(
        path, start=start, end=end, concurrency=concurrency, **kwargs
    )
1252 async def _getxattr(self, path, attr):
1253 """Get user-defined metadata attribute"""
1254 meta = (await self._info(path)).get("metadata", {})
1255 return meta[attr]
# Blocking (synchronous) counterpart of the coroutine ``_getxattr``, built
# with fsspec's ``sync_wrapper``.
getxattr = asyn.sync_wrapper(_getxattr)
async def _setxattrs(
    self,
    path,
    content_type=None,
    content_encoding=None,
    fixed_key_metadata=None,
    **kwargs,
):
    """Set/delete/add writable metadata attributes.

    Note: uses the PATCH method (update), leaving unedited keys alone.
    fake-gcs-server:latest does not seem to support this.

    Parameters
    ----------
    content_type: str
        If not None, set the content-type to this value.
    content_encoding: str
        This parameter is deprecated; use fixed_key_metadata instead.
        If not None, set the content-encoding.
        See https://cloud.google.com/storage/docs/transcoding
    fixed_key_metadata: dict
        Google metadata, in key/value pairs, supported keys:
            - cache_control
            - content_disposition
            - content_encoding
            - content_language
            - custom_time
        More info:
        https://cloud.google.com/storage/docs/metadata#mutable
    **kwargs: key-value pairs like field="value" or field=None
        The value must be a string to add or modify, or None to delete.

    Returns
    -------
    The entire user metadata after the update (even if only path is passed).
    """
    i_json = {"metadata": kwargs}
    if content_type is not None:
        i_json["contentType"] = content_type
    if content_encoding is not None:
        # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
        logger.warning(
            "setxattrs: content_encoding parameter is now deprecated "
            "you may use `fixed_key_metadata` instead"
        )
        i_json["contentEncoding"] = content_encoding
    i_json.update(_convert_fixed_key_metadata(fixed_key_metadata))

    bucket, key, generation = self.split_path(path)
    o_json = await self._call(
        "PATCH",
        "b/{}/o/{}",
        bucket,
        key,
        fields="metadata",
        json=i_json,
        json_out=True,
    )
    # ``_info`` serves object details from the parent's cached listing.
    # Drop it so the updated metadata is visible on the next lookup.
    self.invalidate_cache(self._parent(path))
    return o_json.get("metadata", {})
# Blocking (synchronous) counterpart of the coroutine ``_setxattrs``, built
# with fsspec's ``sync_wrapper``.
setxattrs = asyn.sync_wrapper(_setxattrs)
1322 async def _merge(self, path, paths, acl=None):
1323 """Concatenate objects within a single bucket"""
1324 bucket, key, generation = self.split_path(path)
1325 source = [{"name": self.split_path(p)[1]} for p in paths]
1326 await self._call(
1327 "POST",
1328 "b/{}/o/{}/compose",
1329 bucket,
1330 key,
1331 destinationPredefinedAcl=acl,
1332 headers={"Content-Type": "application/json"},
1333 json={
1334 "sourceObjects": source,
1335 "kind": "storage#composeRequest",
1336 "destination": {"name": key, "bucket": bucket},
1337 },
1338 )
# Blocking (synchronous) counterpart of the coroutine ``_merge``, built with
# fsspec's ``sync_wrapper``.
merge = asyn.sync_wrapper(_merge)
# TODO: Add async mv method in the async.py and remove from GCSFileSystem.
async def _mv(
    self, path1, path2, recursive=False, maxdepth=None, batch_size=None, **kwargs
):
    """Move one or more objects from ``path1`` to ``path2``.

    Source/destination expansion mirrors fsspec's copy logic:
    - when both arguments are lists, they are paired up as given;
    - otherwise ``path1`` is expanded with ``_expand_path`` and the
      destinations are derived with ``other_paths``.

    Each pair is moved with ``_mv_file``, in chunks of ``batch_size``
    (default ``self.batch_size``). Extra keyword arguments are forwarded to
    ``_mv_file``.

    Raises the first per-file exception in source order. With
    ``recursive=True``, a FileNotFoundError is ignored for a source that is a
    prefix ("directory") of other source paths.
    """
    if path1 == path2:
        return

    if isinstance(path1, list) and isinstance(path2, list):
        # No need to expand paths when both source and destination
        # are provided as lists
        paths1 = path1
        paths2 = path2
    else:
        source_is_str = isinstance(path1, str)
        paths1 = await self._expand_path(
            path1, maxdepth=maxdepth, recursive=recursive
        )
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not move directories
            paths1 = [
                p
                for p in paths1
                if not (asyn.trailing_sep(p) or await self._isdir(p))
            ]
            if not paths1:
                return

        source_is_file = len(paths1) == 1
        dest_is_dir = isinstance(path2, str) and (
            asyn.trailing_sep(path2) or await self._isdir(path2)
        )

        # Passed as ``exists`` to fsspec's ``other_paths``. It decides
        # whether sources are placed inside ``path2`` rather than replacing
        # it.
        exists = source_is_str and (
            (has_magic(path1) and source_is_file)
            or (
                not has_magic(path1)
                and dest_is_dir
                and not asyn.trailing_sep(path1)
            )
        )
        paths2 = other_paths(
            paths1,
            path2,
            exists=exists,
            flatten=not source_is_str,
        )

    batch_size = batch_size or self.batch_size
    # return_exceptions=True: per-file failures come back as values, so they
    # can be filtered below instead of aborting the remaining moves.
    result = await asyn._run_coros_in_chunks(
        [self._mv_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)],
        batch_size=batch_size,
        return_exceptions=True,
        nofiles=True,
    )

    for res, p1 in zip(result, paths1):
        if isinstance(res, Exception):
            if isinstance(res, FileNotFoundError) and recursive:
                # Ignore FileNotFoundError for implicit directories returned by _expand_path.
                if any(p.startswith(p1.rstrip("/") + "/") for p in paths1):
                    continue
            raise res
# Blocking (synchronous) counterpart of the coroutine ``_mv``, built with
# fsspec's ``sync_wrapper``.
mv = asyn.sync_wrapper(_mv)
1407 async def _cp_file(self, path1, path2, acl=None, **kwargs):
1408 """Duplicate remote file"""
1409 b1, k1, g1 = self.split_path(path1)
1410 b2, k2, g2 = self.split_path(path2)
1411 if g2:
1412 raise ValueError("Cannot write to specific object generation")
1413 out = await self._call(
1414 "POST",
1415 "b/{}/o/{}/rewriteTo/b/{}/o/{}",
1416 b1,
1417 k1,
1418 b2,
1419 k2,
1420 headers={"Content-Type": "application/json"},
1421 destinationPredefinedAcl=acl,
1422 json_out=True,
1423 sourceGeneration=g1,
1424 )
1425 while out["done"] is not True:
1426 out = await self._call(
1427 "POST",
1428 "b/{}/o/{}/rewriteTo/b/{}/o/{}",
1429 b1,
1430 k1,
1431 b2,
1432 k2,
1433 headers={"Content-Type": "application/json"},
1434 rewriteToken=out["rewriteToken"],
1435 destinationPredefinedAcl=acl,
1436 json_out=True,
1437 sourceGeneration=g1,
1438 )
1439 self.invalidate_cache(self._parent(path2))
1441 async def _mv_file_cache_update(self, path1, path2, response=None):
1442 self.invalidate_cache(self._parent(path1))
1443 self.invalidate_cache(self._parent(path2))
1445 async def _mv_file(self, path1, path2, **kwargs):
1446 src_bucket, src_key, generation1 = self.split_path(path1)
1447 dest_bucket, dest_key, generation2 = self.split_path(path2)
1449 if generation2:
1450 raise ValueError("Cannot move to specific object generation")
1452 if src_bucket == dest_bucket and src_key and dest_key:
1453 try:
1454 out = await self._call(
1455 "POST",
1456 "b/{}/o/{}/moveTo/o/{}",
1457 src_bucket,
1458 src_key,
1459 dest_key,
1460 sourceGeneration=generation1,
1461 headers={
1462 "Content-Type": "application/json",
1463 "X-Goog-GCS-Idempotency-Token": str(uuid.uuid4()),
1464 },
1465 json_out=True,
1466 )
1467 await self._mv_file_cache_update(path1, path2, out)
1468 return
1469 except FileNotFoundError:
1470 # Raise immediately because fallback will also fail when file is not found.
1471 raise
1472 except Exception as e:
1473 # TODO: Fallback is added to make sure there is smooth transition, it can be removed
1474 # once we have metrics proving that moveTo API is working properly for all bucket types.
1475 logger.warning(
1476 f"Failed to move file using moveTo API: {e}. Falling back to copy/delete."
1477 )
1479 await super()._mv_file(path1, path2, **kwargs)
# Blocking (synchronous) counterpart of the coroutine ``_mv_file``, built
# with fsspec's ``sync_wrapper``.
mv_file = asyn.sync_wrapper(_mv_file)
1483 async def _rm_file(self, path, **kwargs):
1484 bucket, key, generation = self.split_path(path)
1485 if key:
1486 await self._call("DELETE", "b/{}/o/{}", bucket, key, generation=generation)
1487 # TODO: This can be optimized for HNS buckets by not invalidating the entire parent
1488 # directory structure from cache but to just remove the deleted file entry from immediate parent's cache.
1489 self.invalidate_cache(posixpath.dirname(self._strip_protocol(path)))
1490 else:
1491 await self._rmdir(path)
async def _rm_files(self, paths):
    """Delete ``paths`` with GCS JSON-API batch requests, retrying transient errors.

    All paths are sent in one multipart batch request
    (see https://cloud.google.com/storage/docs/batch). Paths whose
    sub-response status is in ``errs`` are resent, for up to five attempts
    in total. Attempts are separated by exponential backoff plus random
    jitter, capped at 32 seconds.

    Returns
    -------
    list
        One item per input path. On success (HTTP 200/204) the item is the
        path itself. On failure it is an exception instance:
        - an OSError carrying the JSON error body, when one can be extracted;
        - otherwise a FileNotFoundError for a 404;
        - otherwise an OSError naming the path and status code.
    """
    import random

    template = (
        "\n--===============7330845974216740156==\n"
        "Content-Type: application/http\n"
        "Content-Transfer-Encoding: binary\n"
        "Content-ID: <b29c5de2-0db4-490b-b421-6a51b598bd11+{i}>"
        "\n\nDELETE /storage/v1/b/{bucket}/o/{key}{query} HTTP/1.1\n"
        "Content-Type: application/json\n"
        "accept: application/json\ncontent-length: 0\n"
    )
    out = []
    for retry in range(1, 6):
        remaining = []
        parts = []
        for i, p in enumerate(paths):
            bucket, key, generation = self.split_path(p)
            query_params = self._get_params(
                {"generation": generation} if generation else {}
            )
            query = (
                ("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
                if query_params
                else ""
            )
            parts.append(
                template.format(
                    i=i + 1,
                    bucket=quote(bucket),
                    key=quote(key),
                    query=query,
                )
            )
        body = "".join(parts)
        headers, content = await self._call(
            "POST",
            self.batch_url_base,
            headers={
                "Content-Type": 'multipart/mixed; boundary="=========='
                '=====7330845974216740156=="'
            },
            data=body + "\n--===============7330845974216740156==--",
        )

        boundary = headers["Content-Type"].split("=", 1)[1]
        # Drop the cached listings of the deleted objects and their parents.
        for cached in {self._parent(p) for p in paths} | set(paths):
            self.invalidate_cache(cached)
        txt = content.decode()
        # Sub-responses appear in request order between boundary markers.
        responses = txt.split(boundary)[1:-1]
        for path, response in zip(paths, responses):
            m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
            code = int(m.groups()[0]) if m else None
            if code in [200, 204]:
                out.append(path)
            elif code in errs and retry < 5:
                remaining.append(path)
            else:
                msg = re.search("{(.*)}", response.replace("\n", ""))
                msg2 = re.search("({.*})", msg.groups()[0]) if msg else None
                if msg2:
                    out.append(OSError(msg2.groups()[0]))
                elif code == 404:
                    out.append(FileNotFoundError(path))
                else:
                    # Build the message explicitly: ``str(path, code)``
                    # would itself raise TypeError.
                    out.append(OSError(f"Failed to delete {path}: HTTP status {code}"))
        if not remaining:
            break
        paths = remaining
        await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
    return out
@property
def on_google(self):
    """Whether ``self._location`` points at the Google Cloud Storage service.

    ``_delete_files`` uses this to decide whether batch requests can be used.
    """
    # Matching "torage." without its first letter accepts both
    # "storage." and "Storage." hosts.
    needle = f"torage.{_gcp_universe_domain()}"
    return needle in self._location
async def _delete_files(self, files, batchsize):
    """Delete ``files`` and return a flat list of per-file outcomes.

    On Google Cloud Storage, files are deleted through ``_rm_files`` in batch
    requests of ``batchsize`` paths. Elsewhere (emulators do not support
    batch requests), ``_rm_file`` is called per file, five at a time.

    Returns a list whose items are either the per-file result (the deleted
    path or an exception instance) or, when a whole batch request failed,
    that batch's exception as a single item.
    """
    if not self.on_google:
        # emulators do not support batch
        return await asyn._run_coros_in_chunks(
            [self._rm_file(f) for f in files], return_exceptions=True, batch_size=5
        )

    batch_results = await asyn._run_coros_in_chunks(
        [
            self._rm_files(files[i : i + batchsize])
            for i in range(0, len(files), batchsize)
        ],
        return_exceptions=True,
    )
    results = []
    for batch in batch_results:
        if isinstance(batch, list):
            results.extend(batch)
        else:
            # With return_exceptions=True a failed batch yields an exception
            # instead of a list. Keep it for the caller rather than letting
            # ``sum(..., [])`` fail with TypeError.
            results.append(batch)
    return results
1594 async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
1595 paths = await self._expand_path(path, recursive=recursive, maxdepth=maxdepth)
1596 files = [p for p in paths if self.split_path(p)[1]]
1597 dirs = [p for p in paths if not self.split_path(p)[1]]
1598 exs = await self._delete_files(files, batchsize)
1600 # buckets
1601 exs.extend(
1602 await asyncio.gather(
1603 *[self._rmdir(d) for d in dirs], return_exceptions=True
1604 )
1605 )
1606 errors = [
1607 ex
1608 for ex in exs
1609 if isinstance(ex, Exception)
1610 and "No such object" not in str(ex)
1611 and not isinstance(ex, FileNotFoundError)
1612 ]
1613 if errors:
1614 raise errors[0]
1615 exs = [
1616 ex
1617 for ex in exs
1618 if "No such object" not in str(ex) and not isinstance(ex, FileNotFoundError)
1619 ]
1620 if not exs:
1621 # nothing got deleted
1622 raise FileNotFoundError(path)
1623 return exs
# Blocking (synchronous) counterpart of the coroutine ``_rm``, built with
# fsspec's ``sync_wrapper``.
rm = asyn.sync_wrapper(_rm)
async def _pipe_file(
    self,
    path,
    data,
    metadata=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    chunksize=50 * 2**20,
    mode="overwrite",
):
    """Upload the bytes ``data`` to ``path``.

    Data shorter than ``chunksize`` goes up in a single ``simple_upload``
    call, which is handed ``consistency`` directly. Larger data uses a
    resumable upload: ``initiate_upload`` followed by ``upload_chunk`` for
    each ``chunksize`` slice. If any chunk fails, the session is cancelled
    with a DELETE. In that chunked path, the ``consistency`` checker (default
    ``self.consistency``) validates the final chunk response against the
    whole payload.

    Returns the value produced by ``simple_upload``/``initiate_upload``.

    Raises
    ------
    ValueError
        If ``path`` names a specific object generation (as in ``_put_file``).
    """
    consistency = consistency or self.consistency
    bucket, key, generation = self.split_path(path)
    if generation:
        raise ValueError("Cannot write to specific object generation")
    size = len(data)
    if size < chunksize:
        location = await simple_upload(
            self,
            bucket,
            key,
            data,
            metadata,
            consistency,
            content_type,
            fixed_key_metadata=fixed_key_metadata,
            mode=mode,
        )
    else:
        location = await initiate_upload(
            self,
            bucket,
            key,
            content_type,
            metadata,
            fixed_key_metadata=fixed_key_metadata,
            mode=mode,
        )
        out = None
        try:
            for offset in range(0, size, chunksize):
                bit = data[offset : offset + chunksize]
                out = await upload_chunk(
                    self, location, bit, offset, size, content_type
                )
        except Exception:
            # Cancel the resumable session so no partial upload lingers.
            await self._call(
                "DELETE",
                location.replace("&ifGenerationMatch=0", ""),
            )
            raise

        checker = get_consistency_checker(consistency)
        checker.update(data)
        checker.validate_json_response(out)

    self.invalidate_cache(self._parent(path))
    return location
async def _put_file(
    self,
    lpath,
    rpath,
    metadata=None,
    consistency=None,
    content_type=None,
    chunksize=50 * 2**20,
    callback=None,
    fixed_key_metadata=None,
    mode="overwrite",
    **kwargs,
):
    """Upload the local file ``lpath`` to ``rpath``.

    Does nothing if ``lpath`` is a directory. The content type defaults to
    ``mimetypes.guess_type(lpath)``, falling back to
    ``application/octet-stream``.

    Files under 5 MiB go up with a single ``simple_upload``. Larger files use
    a resumable upload in ``chunksize`` pieces, with progress reported to
    ``callback``. If any chunk fails, that session is cancelled with a DELETE
    and the error is re-raised. In the chunked path, the ``consistency``
    checker validates the final chunk response.

    Raises
    ------
    ValueError
        If ``rpath`` names a specific object generation.
    """
    if os.path.isdir(lpath):
        return
    if content_type is None:
        content_type, _ = mimetypes.guess_type(lpath)
        if content_type is None:
            content_type = "application/octet-stream"
    callback = callback or NoOpCallback()
    consistency = consistency or self.consistency
    checker = get_consistency_checker(consistency)
    bucket, key, generation = self.split_path(rpath)
    if generation:
        raise ValueError("Cannot write to specific object generation")
    with open(lpath, "rb") as f0:
        size = f0.seek(0, 2)
        f0.seek(0)
        callback.set_size(size)

        if size < 5 * 2**20:
            await simple_upload(
                self,
                bucket,
                key,
                f0.read(),
                consistency=consistency,
                metadatain=metadata,
                content_type=content_type,
                fixed_key_metadata=fixed_key_metadata,
                mode=mode,
            )
            callback.absolute_update(size)
        else:
            location = await initiate_upload(
                self,
                bucket,
                key,
                content_type,
                metadata=metadata,
                fixed_key_metadata=fixed_key_metadata,
                mode=mode,
            )
            offset = 0
            out = None
            try:
                while True:
                    bit = f0.read(chunksize)
                    if not bit:
                        break
                    out = await upload_chunk(
                        self, location, bit, offset, size, content_type
                    )
                    offset += len(bit)
                    callback.absolute_update(offset)
                    checker.update(bit)
            except Exception:
                # Cancel using the session URL returned by initiate_upload
                # (the local ``location``), as _pipe_file does. Referring to
                # ``self.location`` here masked the original error.
                await self._call(
                    "DELETE",
                    location.replace("&ifGenerationMatch=0", ""),
                )
                raise

            checker.validate_json_response(out)

    self.invalidate_cache(self._parent(rpath))
1763 async def _isdir(self, path):
1765 try:
1766 return (await self._info(path))["type"] == "directory"
1767 except OSError:
1768 return False
1770 async def _find(
1771 self,
1772 path,
1773 withdirs=False,
1774 detail=False,
1775 prefix="",
1776 versions=False,
1777 maxdepth=None,
1778 update_cache=True,
1779 **kwargs,
1780 ):
1781 path = self._strip_protocol(path)
1783 if maxdepth is not None and maxdepth < 1:
1784 raise ValueError("maxdepth must be at least 1")
1786 # Fetch objects as if the path is a directory
1787 objects, _ = await self._do_list_objects(
1788 path, delimiter="", prefix=prefix, versions=versions
1789 )
1791 if not objects:
1792 # Fetch objects as if the path is a file
1793 bucket, key, _ = self.split_path(path)
1794 if prefix:
1795 _path = "" if not key else key.rstrip("/") + "/"
1796 _prefix = f"{_path}{prefix}"
1797 else:
1798 _prefix = key
1799 objects, _ = await self._do_list_objects(
1800 bucket, delimiter="", prefix=_prefix, versions=versions
1801 )
1802 else:
1803 _prefix = prefix
1805 path2 = path.rstrip("/") + "/"
1807 if not prefix:
1808 objects = [
1809 o for o in objects if o["name"].startswith(path2) or o["name"] == path
1810 ]
1812 dirs = self._get_dirs_and_update_cache(
1813 path, objects, prefix=prefix, update_cache=update_cache
1814 )
1816 if withdirs:
1817 objects = sorted(objects + list(dirs.values()), key=lambda x: x["name"])
1819 if maxdepth:
1820 # Filter returned objects based on requested maxdepth
1821 depth = path.rstrip("/").count("/") + maxdepth
1822 objects = list(filter(lambda o: o["name"].count("/") <= depth, objects))
1824 if detail:
1825 if versions:
1826 return {f"{o['name']}#{o['generation']}": o for o in objects}
1827 return {o["name"]: o for o in objects}
1829 if versions:
1830 return [f"{o['name']}#{o['generation']}" for o in objects]
1831 return [o["name"] for o in objects]
1833 def _get_dirs_and_update_cache(self, path, objects, prefix="", update_cache=True):
1834 """
1835 Populates the directory cache from a list of object details.
1837 This method reconstructs the directory hierarchy from a flat list
1838 of objects and update the cache, which improves the performance of
1839 subsequent `ls` calls.
1841 Parameters
1842 ----------
1843 path: str
1844 The root path of the find operation.
1845 objects: list[dict]
1846 A list of objects from which directories are extracted and cache is updated.
1847 prefix: str
1848 If a prefix is provided, the directory cache will *not* be updated,
1849 as the object list is considered partial.
1850 update_cache: bool
1851 Cache won't be updated if update_cache is False.
1853 Returns
1854 -------
1855 dict: A dictionary of all pseudo-directory entries created.
1856 """
1857 dirs = {}
1858 cache_entries = {}
1860 for obj in objects:
1861 # For native HNS empty folders, which are returned as directory types
1862 # but are not placeholders, we need to ensure they have an entry in the cache.
1863 if obj.get("type") == "directory":
1864 cache_entries.setdefault(obj["name"], {})
1866 parent = self._parent(obj["name"])
1867 previous = obj
1869 while parent:
1870 dir_key = self.split_path(parent)[1]
1871 if not dir_key or len(parent) < len(path.rstrip("/")):
1872 break
1874 dirs[parent] = {
1875 "Key": dir_key,
1876 "Size": 0,
1877 "name": parent,
1878 "StorageClass": "DIRECTORY",
1879 "type": "directory",
1880 "size": 0,
1881 }
1883 listing = cache_entries.setdefault(parent, {})
1884 name = previous["name"]
1885 if name not in listing:
1886 listing[name] = previous
1888 previous = dirs[parent]
1889 parent = self._parent(parent)
1890 if not prefix and update_cache:
1891 cache_entries_list = {k: list(v.values()) for k, v in cache_entries.items()}
1892 self.dircache.update(cache_entries_list)
1893 return dirs
@retry_request(retries=retries)
async def _get_file_request(
    self, rpath, lpath, *args, headers=None, callback=None, **kwargs
):
    """Stream the object at URL ``rpath`` into the local file ``lpath``.

    The call is wrapped by ``retry_request``. A ``consistency`` entry in
    ``kwargs`` (default ``self.consistency``) selects the checksum used to
    validate the download; the remaining ``kwargs`` become query
    parameters. Missing parent directories of ``lpath`` are created.
    ``*args`` is accepted but not used.

    ``callback`` is used unconditionally, so it must not be None
    (``_get_file`` always supplies one).

    Returns ``(status, headers, request_info, data)``, where ``data`` is the
    final, empty read that ended the download loop, not the file contents.
    """
    consistency = kwargs.pop("consistency", self.consistency)
    await self._set_session()
    async with self.session.get(
        url=rpath,
        params=self._get_params(kwargs),
        headers=self._get_headers(headers),
        timeout=self.requests_timeout,
    ) as r:
        validate_response(r.status, None, rpath)
        try:
            size = int(r.headers["content-length"])
        except (KeyError, ValueError):
            # Missing or malformed Content-Length: size unknown.
            size = None
        callback.set_size(size)

        checker = get_consistency_checker(consistency)
        lparent = os.path.dirname(lpath) or os.curdir
        os.makedirs(lparent, exist_ok=True)
        with open(lpath, "wb") as f2:
            # Copy in 128 KiB pieces, feeding the checksum and progress callback.
            while True:
                data = await r.content.read(4096 * 32)
                if not data:
                    break
                f2.write(data)
                checker.update(data)
                callback.relative_update(len(data))

        validate_response(r.status, data, rpath)  # validate http request
        checker.validate_http_response(r)  # validate file consistency
        return r.status, r.headers, r.request_info, data
1930 async def _get_file(self, rpath, lpath, callback=None, **kwargs):
1931 u2 = self.url(rpath)
1932 if os.path.isdir(lpath):
1933 return
1934 callback = callback or NoOpCallback()
1935 await self._get_file_request(u2, lpath, callback=callback, **kwargs)
def _open(
    self,
    path,
    mode="rb",
    block_size=None,
    cache_options=None,
    acl=None,
    consistency=None,
    metadata=None,
    autocommit=True,
    fixed_key_metadata=None,
    generation=None,
    **kwargs,
):
    """
    See ``GCSFile``.

    block_size: int or None
        If None, use ``self.default_block_size``.
    consistency: None or str
        If None, use default for this instance
    generation: str or None
        Object generation to open. It is forwarded to ``GCSFile``, which
        combines it with any generation embedded in ``path``. Previously
        this argument was accepted but silently dropped.
    """
    if block_size is None:
        block_size = self.default_block_size
    const = consistency or self.consistency
    return GCSFile(
        self,
        path,
        mode,
        block_size,
        cache_options=cache_options,
        consistency=const,
        metadata=metadata,
        acl=acl,
        autocommit=autocommit,
        fixed_key_metadata=fixed_key_metadata,
        generation=generation,
        **kwargs,
    )
1974 @classmethod
1975 def _split_path(cls, path, version_aware=False):
1976 """
1977 Normalise GCS path string into bucket and key.
1979 Parameters
1980 ----------
1981 path : string
1982 Input path, like `gcs://mybucket/path/to/file`.
1983 Path is of the form: '[gs|gcs://]bucket[/key][?querystring][#fragment]'
1985 GCS allows object generation (object version) to be specified in either
1986 the URL fragment or the `generation` query parameter. When provided,
1987 the fragment will take priority over the `generation` query parameter.
1989 Returns
1990 -------
1991 (bucket, key, generation) tuple
1992 """
1993 path = cls._strip_protocol(path).lstrip("/")
1994 if "/" not in path:
1995 return path, "", None
1996 bucket, keypart = path.split("/", 1)
1997 key = keypart
1998 generation = None
1999 if version_aware:
2000 parts = urlsplit(keypart)
2001 try:
2002 if parts.fragment:
2003 generation = parts.fragment
2004 elif parts.query:
2005 parsed = parse_qs(parts.query)
2006 if "generation" in parsed:
2007 generation = parsed["generation"][0]
2008 # Sanity check whether this could be a valid generation ID. If
2009 # it is not, assume that # or ? characters are supposed to be
2010 # part of the object name.
2011 if generation is not None:
2012 int(generation)
2013 key = parts.path
2014 except ValueError:
2015 generation = None
2016 return (
2017 bucket,
2018 key,
2019 generation,
2020 )
def split_path(self, path):
    """Split ``path`` into ``(bucket, key, generation)``.

    Generations are only parsed when ``self.version_aware`` is set.
    """
    aware = self.version_aware
    return self._split_path(path, version_aware=aware)
def sign(self, path, expiration=100, **kwargs):
    """Create a signed URL representing the given path.

    Parameters
    ----------
    path : str
        The path on the filesystem. A generation parsed from it is included
        in the signed URL.
    expiration : int
        Number of seconds for which the URL stays valid.
    **kwargs
        Passed on to ``Blob.generate_signed_url``.

    Returns
    -------
    URL : str
        The signed URL
    """
    from google.cloud import storage

    client = storage.Client(
        credentials=self.credentials.credentials,
        project=self.project,
    )

    bucket_name, key, generation = self.split_path(path)
    blob = client.bucket(bucket_name).blob(key)
    lifetime = timedelta(seconds=expiration)

    return blob.generate_signed_url(
        expiration=lifetime,
        generation=generation,
        api_access_endpoint=self._endpoint,
        **kwargs,
    )
# Module-level side effect executed once at import time. NOTE(review):
# presumably restores previously cached credentials/tokens into
# ``GoogleCredentials`` -- confirm in credentials.py.
GoogleCredentials.load_tokens()
class GCSFile(fsspec.spec.AbstractBufferedFile):
    def __init__(
        self,
        gcsfs,
        path,
        mode="rb",
        block_size=DEFAULT_BLOCK_SIZE,
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        acl=None,
        consistency="md5",
        metadata=None,
        content_type=None,
        timeout=None,
        fixed_key_metadata=None,
        generation=None,
        kms_key_name=None,
        **kwargs,
    ):
        """
        Open a file.

        Parameters
        ----------
        gcsfs: instance of GCSFileSystem
        path: str
            location in GCS, like 'bucket/path/to/file'
        mode: str
            Normal binary file modes: 'rb', 'wb' or 'xb'. Append mode 'ab' is
            only kept by subclasses that pass ``_supports_append=True``;
            otherwise it is replaced by 'wb' (overwrite) with a warning.
        block_size: int
            Buffer size for reading or writing. In 'wb'/'xb' mode, values
            below ``GCS_MIN_BLOCK_SIZE`` (2**18) are raised to that minimum.
        autocommit, cache_type, cache_options:
            Passed through to ``fsspec.spec.AbstractBufferedFile``.
        acl: str
            ACL to apply, if any, one of ``ACLs``. New files are normally
            "bucketownerfullcontrol", but a default can be configured per
            bucket.
        consistency: str, 'none', 'size', 'md5', 'crc32c'
            Check for success in writing, applied at file close.
            'size' ensures that the number of bytes reported by GCS matches
            the number we wrote; 'md5' does a full checksum. Any value other
            than 'size' or 'md5' or 'crc32c' is assumed to mean no checking.
        content_type: str
            default when unspecified is provided by mimetypes.guess_type or
            otherwise `application/octet-stream`. See the list of available
            content types at https://www.iana.org/assignments/media-types/media-types.txt
        metadata: dict
            Custom metadata, in key/value pairs, added at file creation
        fixed_key_metadata: dict
            Google metadata, in key/value pairs, supported keys:
                - cache_control
                - content_disposition
                - content_encoding
                - content_language
                - custom_time
            More info:
            https://cloud.google.com/storage/docs/metadata#mutable
        kms_key_name: str
            Resource name of the Cloud KMS key that will be used to encrypt
            the object.
            More info:
            https://cloud.google.com/storage/docs/encryption/customer-managed-keys
        timeout: int
            Timeout seconds for the asynchronous callback.
        generation: str
            Object generation. Reconciled with any generation encoded in
            ``path`` via ``_coalesce_generation``.
        **kwargs:
            Passed through to ``AbstractBufferedFile``. The following keys
            are also read here: ``concurrency`` (default
            ``DEFAULT_CONCURRENCY``), ``use_experimental_adaptive_prefetching``
            (also enabled by the ``USE_EXPERIMENTAL_ADAPTIVE_PREFETCHING``
            environment variable set to "true"/"1"), ``max_prefetch_size``
            (default ``MAX_PREFETCH_SIZE``) and the internal
            ``_supports_append`` flag.
        """
        bucket, key, path_generation = gcsfs.split_path(path)
        if not key:
            raise OSError("Attempt to open a bucket")
        self.generation = _coalesce_generation(generation, path_generation)
        self.concurrency = kwargs.get("concurrency", DEFAULT_CONCURRENCY)
        super().__init__(
            gcsfs,
            path,
            mode,
            block_size,
            autocommit=autocommit,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )
        self.gcsfs = gcsfs
        self.bucket = bucket
        self.key = key
        self.acl = acl
        self.consistency = consistency
        self.checker = get_consistency_checker(consistency)

        # Ideally, all of these fields should be part of `cache_options`. Because current
        # `fsspec` caches do not accept arbitrary `*args` and `**kwargs`, passing them
        # there currently causes instantiation errors. We are holding off on introducing
        # them as explicit keyword arguments to ensure existing user workloads are not
        # disrupted. This will be refactored once the upstream `fsspec` changes are merged.
        use_prefetch_reader = kwargs.get(
            "use_experimental_adaptive_prefetching", False
        ) or os.environ.get(
            "USE_EXPERIMENTAL_ADAPTIVE_PREFETCHING", "false"
        ).lower() in (
            "true",
            "1",
        )

        if "r" in mode and use_prefetch_reader:
            max_prefetch_size = kwargs.get("max_prefetch_size", MAX_PREFETCH_SIZE)
            from .prefetcher import BackgroundPrefetcher

            self._prefetch_engine = BackgroundPrefetcher(
                self._async_fetch_range,
                self.size,
                max_prefetch_size=max_prefetch_size,
                concurrency=self.concurrency,
            )
        else:
            self._prefetch_engine = None

        # _supports_append is an internal argument not meant to be used directly.
        # If True, allows opening file in append mode. This is generally not supported
        # by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag should
        # be set by subclasses that support append operations. Otherwise, the mode
        # will be overwritten to "wb" mode with a warning.
        _supports_append = kwargs.pop("_supports_append", False)
        if "a" in self.mode and not _supports_append:
            warnings.warn(
                "Append mode 'a' is not supported in GCS. Using overwrite mode instead."
            )
            self.mode = self.mode.replace("a", "w")

        if "r" in self.mode:
            det = self.details
        else:
            det = {}
        self.content_type = content_type or det.get(
            "contentType",
            mimetypes.guess_type(self.path)[0] or "application/octet-stream",
        )
        self.metadata = metadata or det.get("metadata", {})
        self.fixed_key_metadata = _convert_fixed_key_metadata(det, from_google=True)
        self.fixed_key_metadata.update(fixed_key_metadata or {})
        self.kms_key_name = kms_key_name
        self.timeout = timeout
        # Check the effective mode (self.mode), not the argument: an 'ab'
        # request that was converted to 'wb' above must get the same
        # minimum-block-size treatment as an explicit 'wb'.
        if self.mode in {"wb", "xb"}:
            if self.blocksize < GCS_MIN_BLOCK_SIZE:
                warnings.warn("Setting block size to minimum value, 2**18")
                self.blocksize = GCS_MIN_BLOCK_SIZE
            self.location = None

    @property
    def details(self):
        """Object info for this path/generation, fetched once via ``fs.info``."""
        if self._details is None:
            self._details = self.fs.info(self.path, generation=self.generation)
        return self._details

    def info(self):
        """File information about this path"""
        return self.details

    def url(self):
        """HTTP link to this file's data"""
        return self.fs.url(self.path)

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ----------
        final: bool
            Complete and commit upload

        Returns
        -------
        False when nothing was sent because the buffer holds fewer than
        ``GCS_MIN_BLOCK_SIZE`` bytes and this is not a committing call;
        True once the buffered data has been accepted and the buffer reset.
        """
        while True:
            # shortfall splits blocks bigger than max allowed upload: loop until
            # GCS has persisted everything currently in the buffer
            data = self.buffer.getvalue()
            head = {}
            length = len(data)

            if (length < GCS_MIN_BLOCK_SIZE) and (not final or not self.autocommit):
                # either flush() was called, but we don't have enough to
                # push, or we split a big upload, and have less left than one
                # block. If this is the final part, OK to violate those
                # terms.
                return False

            # Select the biggest possible chunk of data to be uploaded
            chunk_length = min(length, GCS_MAX_BLOCK_SIZE)
            chunk = data[:chunk_length]
            if final and self.autocommit and chunk_length == length:
                if length:
                    # last chunk: the total object size is now known
                    head["Content-Range"] = "bytes %i-%i/%i" % (
                        self.offset,
                        self.offset + chunk_length - 1,
                        self.offset + length,
                    )
                else:
                    # closing when buffer is empty
                    head["Content-Range"] = "bytes */%i" % self.offset
                    data = None
            else:
                # intermediate chunk: total size still unknown ("*")
                head["Content-Range"] = "bytes %i-%i/*" % (
                    self.offset,
                    self.offset + chunk_length - 1,
                )
            head.update(
                {"Content-Type": self.content_type, "Content-Length": str(chunk_length)}
            )
            headers, contents = self.gcsfs.call(
                "POST", self.location, headers=head, data=chunk
            )
            if "Range" in headers:
                # Range ends with the index of the last byte GCS persisted
                end = int(headers["Range"].split("-")[1])
                shortfall = (self.offset + length - 1) - end
                if shortfall > 0:
                    # Keep the unpersisted tail as the new buffer, positioned
                    # at its end so further writes append to it, and retry.
                    self.checker.update(data[:-shortfall])
                    self.buffer = UnclosableBytesIO(data[-shortfall:])
                    self.buffer.seek(shortfall)
                    self.offset += length - shortfall
                    continue
                else:
                    self.checker.update(data)
                    if final and contents:
                        j = json.loads(contents)
                        self.generation = j.get("generation")
            else:
                # No Range header: treated as the upload having completed.
                assert final, "Response looks like upload is over"
                if length:
                    j = json.loads(contents)
                    self.checker.update(data)
                    self.checker.validate_json_response(j)
                    self.generation = j.get("generation")
            # Clear buffer and update offset when all is received
            self.buffer = UnclosableBytesIO()
            self.offset += length
            break
        return True

    def commit(self):
        """If not auto-committing, finalize file"""
        self.autocommit = True
        self._upload_chunk(final=True)

    def _initiate_upload(self):
        """Create multi-upload; stores the resumable session in ``self.location``."""
        self.location = asyn.sync(
            self.gcsfs.loop,
            initiate_upload,
            self.gcsfs,
            self.bucket,
            self.key,
            self.content_type,
            self.metadata,
            self.fixed_key_metadata,
            mode="create" if "x" in self.mode else "overwrite",
            kms_key_name=self.kms_key_name,
            timeout=self.timeout,
        )

    def discard(self):
        """Cancel in-progress multi-upload

        Should only happen during discarding this write-mode file
        """
        if self.location is None:
            return
        self.gcsfs.call(
            "DELETE",
            self.location.replace("&ifGenerationMatch=0", ""),
        )
        self.location = None
        self.closed = True

    def _simple_upload(self):
        """One-shot upload, less than 5MB"""
        self.buffer.seek(0)
        data = self.buffer.read()
        j = asyn.sync(
            self.gcsfs.loop,
            simple_upload,
            self.gcsfs,
            self.bucket,
            self.key,
            data,
            self.metadata,
            self.consistency,
            self.content_type,
            self.fixed_key_metadata,
            mode="create" if "x" in self.mode else "overwrite",
            kms_key_name=self.kms_key_name,
            timeout=self.timeout,
        )
        self.generation = j.get("generation")

    def _fetch_range(self, start=None, end=None):
        """Get data from GCS

        start, end : None or integers
            if not both None, fetch only given range
        """
        try:
            # hasattr guard: this may be invoked before __init__ has assigned
            # _prefetch_engine (e.g. by the cache built in the parent __init__)
            if hasattr(self, "_prefetch_engine") and self._prefetch_engine:
                return self._prefetch_engine._fetch(start=start, end=end)
            return self.fs.cat_file(
                self.path, start=start, end=end, concurrency=self.concurrency
            )
        except RuntimeError as e:
            # A range past the end of the object yields no data
            if "not satisfiable" in str(e):
                return b""
            raise

    async def _async_fetch_range(self, start_offset, total_size, split_factor=1):
        """Async fetcher mapped to the Prefetcher engine for regional buckets."""
        return await self.gcsfs._cat_file_concurrent(
            self.path,
            start=start_offset,
            end=start_offset + total_size,
            concurrency=split_factor,
        )

    def close(self):
        """Close the file, then shut down the background prefetcher, if any.

        The prefetcher is shut down even if the parent ``close`` raises, and
        it is released afterwards so a repeated ``close()`` does not close it
        a second time.
        """
        try:
            super().close()
        finally:
            # getattr: __init__ may have failed before the attribute was set
            engine = getattr(self, "_prefetch_engine", None)
            if engine:
                self._prefetch_engine = None
                engine.close()
2384def _convert_fixed_key_metadata(metadata, *, from_google=False):
2385 """
2386 Convert fixed_key_metadata to/from GCS format.
2388 Google uses camelCase for its parameters, this function transform
2389 exposed fixed_key_metadata (snake_case) to or from GCS(google) format
2391 Parameters
2392 ----------
2393 metadata: dict
2394 A key value pair of fixed_key_metadata, key can be either
2395 camel case or snake case.
2396 from_google: bool
2397 True means that the metadata come from google and thus should be converted
2398 to snake_case
2399 """
2400 out = {}
2401 if metadata is None:
2402 return out
2404 for key, attribute_name in SUPPORTED_FIXED_KEY_METADATA.items():
2405 src = key if not from_google else attribute_name
2406 dst = attribute_name if not from_google else key
2407 if src in metadata:
2408 out[dst] = metadata[src]
2409 return out
async def upload_chunk(fs, location, data, offset, size, content_type):
    """
    Uploads a chunk of data. This function has a conditional path to support
    experimental features for Zonal buckets to append data using gRPC.

    Parameters
    ----------
    fs: GCSFileSystem
        Filesystem used to issue the request.
    location: str or AsyncAppendableObjectWriter
        Resumable-upload session URL (as returned by ``initiate_upload``);
        an ``AsyncAppendableObjectWriter`` only when ``fs`` is an
        ``ExtendedGcsFileSystem``.
    data: bytes
        Bytes to send.
    offset: int
        Absolute position of ``data[0]`` within the object.
    size: int
        Total size of the object, sent in ``Content-Range``.
    content_type: str
        Value for the ``Content-Type`` header.

    Returns
    -------
    The parsed JSON response body, or None if the final response had no body.
    """
    from google.cloud.storage.asyncio.async_appendable_object_writer import (
        AsyncAppendableObjectWriter,
    )

    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import upload_chunk as ext_upload_chunk

    # location is AsyncAppendableObjectWriter only when ExtendedGcsFileSystem is used
    if isinstance(fs, ExtendedGcsFileSystem) and isinstance(
        location, AsyncAppendableObjectWriter
    ):
        return await ext_upload_chunk(fs, location, data, offset, size, content_type)

    while True:
        length = len(data)
        head = {
            "Content-Range": "bytes %i-%i/%i" % (offset, offset + length - 1, size),
            "Content-Type": content_type,
            "Content-Length": str(length),
        }
        headers, txt = await fs._call(
            "POST", location, headers=head, data=UnclosableBytesIO(data)
        )
        if "Range" in headers:
            # "Range: bytes=0-<end>" is inclusive: bytes up to and including
            # <end> are persisted, so any remainder must resume at end + 1.
            end = int(headers["Range"].split("-")[1])
            shortfall = (offset + length - 1) - end
            if shortfall > 0:
                data = data[-shortfall:]
                offset = end + 1
                continue
        return json.loads(txt) if txt else None
async def initiate_upload(
    fs,
    bucket,
    key,
    content_type="application/octet-stream",
    metadata=None,
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Initiates a resumable upload. This function has a conditional path to support
    experimental features for Zonal buckets to append data using gRPC, returning an
    "AsyncAppendableObjectWriter" instance as location.

    Parameters
    ----------
    fs: GCSFileSystem
        Filesystem used to issue the request.
    bucket, key: str
        Target object.
    content_type: str
        Sent as the ``X-Upload-Content-Type`` header.
    metadata: dict or None
        Custom metadata; omitted from the request body when empty.
    fixed_key_metadata: dict or None
        snake_case fixed-key metadata, converted to GCS camelCase names.
    mode: str
        "create" adds ``ifGenerationMatch=0`` (only succeed if the object does
        not already exist); any other value overwrites.
    kms_key_name: str or None
        Cloud KMS key to encrypt the object with, if given.

    Returns
    -------
    The resumable session URL taken from the ``Location`` response header
    (or whatever the extended implementation returns on the zonal path).
    """
    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import initiate_upload as ext_initiate_upload

    # Explicit type checking is used to ensure only the ExtendedGcsFileSystem
    # enters this path, ruling out false positives from mocks or coincidentally matching attributes.
    if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
        return await ext_initiate_upload(
            fs,
            bucket,
            key,
            content_type,
            metadata,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    j = {"name": key}
    if metadata:
        j["metadata"] = metadata
    kw = {"ifGenerationMatch": "0"} if mode == "create" else {}
    if kms_key_name:
        kw["kmsKeyName"] = kms_key_name
    j.update(_convert_fixed_key_metadata(fixed_key_metadata))
    headers, _ = await fs._call(
        method="POST",
        path=f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o?name={quote(key)}",
        uploadType="resumable",
        json=j,
        headers={"X-Upload-Content-Type": content_type},
        **kw,
    )
    loc = headers["Location"]
    out = loc[0] if isinstance(loc, list) else loc  # <- for CVR responses
    # Sanity-check the URL actually returned (not the repr of a list wrapper);
    # a session URL this short indicates the request did not succeed.
    if len(str(out)) < 20:
        logger.error("Location failed: %s", headers)
    return out
async def simple_upload(
    fs,
    bucket,
    key,
    datain,
    metadatain=None,
    consistency=None,
    content_type="application/octet-stream",
    fixed_key_metadata=None,
    mode="overwrite",
    kms_key_name=None,
):
    """
    Performs a simple, single-request upload. This function has a conditional path to support
    experimental features for Zonal buckets to upload data using gRPC.

    The object resource (name, custom metadata, fixed-key metadata) and the
    payload are sent together as one ``multipart/related`` request. The
    response JSON is then checked with the consistency checker selected by
    ``consistency`` and returned.

    Parameters
    ----------
    fs: GCSFileSystem
        Filesystem used to issue the request.
    bucket, key: str
        Target object.
    datain: bytes
        Full object contents.
    metadatain: dict or None
        Custom metadata; included whenever it is not None.
    consistency: str or None
        Passed to ``get_consistency_checker``.
    content_type: str
        Content type of the payload part.
    fixed_key_metadata: dict or None
        snake_case fixed-key metadata, converted to GCS camelCase names.
    mode: str
        "create" adds ``ifGenerationMatch=0``; any other value overwrites.
    kms_key_name: str or None
        Cloud KMS key to encrypt the object with, if given.
    """
    from .extended_gcsfs import ExtendedGcsFileSystem
    from .extended_gcsfs import simple_upload as ext_simple_upload

    # Explicit type checking is used to ensure only the ExtendedGcsFileSystem
    # enters this path, ruling out false positives from mocks or coincidentally matching attributes.
    zonal = isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket)
    if zonal:
        return await ext_simple_upload(
            fs,
            bucket,
            key,
            datain,
            metadatain,
            consistency,
            content_type,
            fixed_key_metadata,
            mode,
            kms_key_name,
        )

    checker = get_consistency_checker(consistency)
    endpoint = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o"

    resource = {"name": key}
    if metadatain is not None:
        resource["metadata"] = metadatain

    query = {}
    if mode == "create":
        query["ifGenerationMatch"] = "0"
    if kms_key_name:
        query["kmsKeyName"] = kms_key_name

    resource.update(_convert_fixed_key_metadata(fixed_key_metadata))
    resource_json = json.dumps(resource)

    # Part 1: JSON object resource; part 2: the raw payload.
    preamble = (
        "--==0==\n"
        "Content-Type: application/json; charset=UTF-8\n\n"
        f"{resource_json}\n"
        "--==0==\n"
        f"Content-Type: {content_type}\n\n"
    )
    body = preamble.encode() + datain + b"\n--==0==--"

    response = await fs._call(
        "POST",
        endpoint,
        uploadType="multipart",
        headers={"Content-Type": 'multipart/related; boundary="==0=="'},
        data=UnclosableBytesIO(body),
        json_out=True,
        **query,
    )
    checker.update(datain)
    checker.validate_json_response(response)
    return response