Coverage for gcsfs/credentials.py: 79%
170 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1import json
2import logging
3import os
4import pickle
5import textwrap
6import threading
7import warnings
8from datetime import datetime, timezone
10import google.auth as gauth
11import google.auth.compute_engine
12import google.auth.credentials
13import google.auth.exceptions
14import requests
15from google.auth.transport.requests import Request
16from google.oauth2 import service_account
17from google.oauth2.credentials import Credentials
18from google_auth_oauthlib.flow import InstalledAppFlow
20from gcsfs.retry import HttpError, NonRetryableError
# Module logger; the name is fixed rather than derived from ``__name__``.
logger = logging.getLogger("gcsfs.credentials")

# Path of the pickle file used to cache "browser" tokens between sessions.
tfile = os.path.join(os.path.expanduser("~"), ".gcs_tokens")

# OAuth client identity used for the installed-application flow.  As the
# variable name says, these values are not treated as secrets.
not_secret = dict(
    client_id=(
        "586241054156-9kst7ltfj66svc342pcn43vp6ta3idin"
        ".apps.googleusercontent.com"
    ),
    client_secret="xto0LIFYX35mmHF9T1R2QBqT",
)

# Client configuration handed to ``InstalledAppFlow.from_client_config``.
client_config = {
    "installed": {
        **not_secret,
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://accounts.google.com/o/oauth2/token",
    }
}

# Timeout, in seconds, for the tokeninfo HTTP lookup of raw tokens.
TOKEN_INFO_TIMEOUT_SECONDS = 10
# Seconds of remaining lifetime below which credentials are refreshed.
LOCAL_REFRESH_BUFFER = 300  # Greater than google.auth._helpers.REFRESH_THRESHOLD
def _get_creds_from_raw_token(token):
    """
    Wrap a raw (non-renewable) access token in a ``Credentials`` object.

    Unless the environment variable ``FETCH_RAW_TOKEN_EXPIRY`` is set to one
    of ``false``, ``0``, ``off`` or ``no`` (case-insensitive), the token's
    expiry is looked up at Google's ``tokeninfo`` endpoint and tokens that
    are about to expire are rejected.

    Parameters
    ----------
    token: str
        The raw access token.

    Returns
    -------
    Credentials
        Credentials carrying ``token``. ``expiry`` is a naive UTC datetime
        when the lookup ran, or None when the lookup was disabled.

    Raises
    ------
    ValueError
        If the tokeninfo endpoint answers with status 400, or if the token
        has ``LOCAL_REFRESH_BUFFER`` seconds or less left to live.
    requests.HTTPError
        For any other unsuccessful tokeninfo response (``raise_for_status``).
    """
    # Default to True. Only disable if the user explicitly says
    # 'false', '0', 'off' or 'no'.
    env_val = os.environ.get("FETCH_RAW_TOKEN_EXPIRY", "true").lower()
    should_fetch_expiry = env_val not in ("false", "0", "off", "no")

    if not should_fetch_expiry:
        # Expiry stays unknown; no network call and no safety-buffer check.
        return Credentials(token, expiry=None)

    response = requests.get(
        "https://oauth2.googleapis.com/tokeninfo",
        params={"access_token": token},
        timeout=TOKEN_INFO_TIMEOUT_SECONDS,
    )

    if response.status_code == 400:
        # Token is likely expired or invalid format
        raise ValueError("Provided token is either not valid, or expired.")

    response.raise_for_status()

    # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12, so build
    # an aware UTC datetime and derive the naive value from it.
    expiry_utc = datetime.fromtimestamp(
        float(response.json()["exp"]), tz=timezone.utc
    )

    time_remaining = max(
        0,
        (expiry_utc - datetime.now(timezone.utc)).total_seconds(),
    )
    if time_remaining <= LOCAL_REFRESH_BUFFER:
        raise ValueError(
            f"The provided raw token expires in {time_remaining} seconds, "
            f"which is less than the safety buffer ({LOCAL_REFRESH_BUFFER}). "
            "This may cause immediate authentication failures. "
            "To bypass this check and safety buffer, you can set the environment "
            "variable FETCH_RAW_TOKEN_EXPIRY=false (expiry will be unknown)."
        )

    # Hand over a naive datetime expressed in UTC, exactly as the previous
    # ``utcfromtimestamp`` call produced.
    return Credentials(token, expiry=expiry_utc.replace(tzinfo=None))
class GoogleCredentials:
    """
    Obtain, hold and refresh the credentials used to authorise GCS requests.

    An instance connects on construction using the ``token`` argument (see
    ``connect``), keeps the resulting credentials object in
    ``self.credentials`` (None means anonymous access) and keeps a copy of
    the authorisation headers in ``self.heads``.

    The class attribute ``tokens`` (a cache of "browser" credentials keyed by
    ``(project, access)``) is only created by ``load_tokens``; the "cache"
    and "browser" methods read it, so ``load_tokens`` must have been called
    before they are used.
    """

    def __init__(self, project, access, token, check_credentials=None, on_google=True):
        """
        Parameters
        ----------
        project: str or None
            Project to use; may be replaced by the google default project
            when connecting with "google_default".
        access: str
            Suffix appended to
            ``https://www.googleapis.com/auth/devstorage.`` to form the scope.
        token: str, dict, Credentials or None
            Passed to ``connect`` as ``method``.
        check_credentials: any
            Deprecated; a truthy value only triggers a DeprecationWarning.
        on_google: bool
            When false, the "cloud" method refuses to connect.
        """
        self.scope = "https://www.googleapis.com/auth/devstorage." + access
        self.project = project
        self.access = access
        self.heads = {}

        self.credentials = None
        self.method = None
        self.lock = threading.Lock()
        self.token = token
        self.on_google = on_google
        self.connect(method=token)

        if check_credentials:
            warnings.warn(
                "The `check_credentials` argument is deprecated and will be removed in a future release.",
                DeprecationWarning,
            )

    @classmethod
    def load_tokens(cls):
        """Get "browser" tokens from disc"""
        try:
            with open(tfile, "rb") as f:
                # SECURITY: unpickling can execute arbitrary code. The cache
                # file must only be writable by the current user.
                tokens = pickle.load(f)
        except Exception:
            # Best effort: a missing or unreadable cache means "no tokens".
            tokens = {}
        GoogleCredentials.tokens = tokens

    @staticmethod
    def _save_tokens():
        """Write the token cache to disc; on failure only emit a warning."""
        try:
            with open(tfile, "wb") as f:
                pickle.dump(GoogleCredentials.tokens, f, 2)
        except Exception as e:
            warnings.warn("Saving token cache failed: " + str(e))

    def _connect_google_default(self):
        """
        Connect with the credentials found by ``google.auth.default``.

        Adopts the default project reported alongside the credentials.

        Raises
        ------
        ValueError
            If a project was supplied by the user and it differs from the
            google default project.
        """
        with requests.Session() as session:
            req = Request(session)
            credentials, project = gauth.default(scopes=[self.scope], request=req)

        msg = textwrap.dedent(
            """\
            User-provided project '{}' does not match the google default project '{}'. Either

              1. Accept the google-default project by not passing a `project` to GCSFileSystem
              2. Configure the default project to match the user-provided project (gcloud config set project)
              3. Use an authorization method other than 'google_default' by providing 'token=...'
            """
        )
        if self.project and self.project != project:
            raise ValueError(msg.format(self.project, project))
        self.project = project
        self.credentials = credentials

    def _connect_cloud(self):
        """
        Connect with compute-engine credentials, refreshing them immediately.

        Raises
        ------
        ValueError
            If ``on_google`` is false, or if the refresh fails.
        """
        if not self.on_google:
            raise ValueError(
                "Compute engine credentials are not available: on_google is False"
            )
        self.credentials = gauth.compute_engine.Credentials()
        try:
            with requests.Session() as session:
                req = Request(session)
                self.credentials.refresh(req)
        except gauth.exceptions.RefreshError as error:
            raise ValueError("Invalid gcloud credentials") from error

    def _connect_cache(self):
        """
        Connect with credentials cached for this ``(project, access)`` pair.

        If the cache is non-empty but holds no entry for the pair, the
        current credentials are left untouched and no error is raised.

        Raises
        ------
        ValueError
            If the token cache is empty.
        """
        if not self.tokens:
            raise ValueError("No cached tokens")

        key = (self.project, self.access)
        if key in self.tokens:
            self.credentials = self.tokens[key]

    def _dict_to_credentials(self, token):
        """
        Convert old dict-style token.

        Does not preserve access token itself, assumes refresh required.
        """
        try:
            token = service_account.Credentials.from_service_account_info(
                token, scopes=[self.scope]
            )
        except Exception:
            # TODO: catch specific exceptions
            # According https://github.com/googleapis/python-cloud-core/blob/master/google/cloud/client.py
            # Scopes required for authenticating with a service. User authentication fails
            # with invalid_scope if scope is specified.
            token = Credentials(
                None,
                refresh_token=token["refresh_token"],
                client_secret=token["client_secret"],
                client_id=token["client_id"],
                token_uri="https://oauth2.googleapis.com/token",
            )
        return token

    def _connect_token(self, token):
        """
        Connect using a concrete token

        Parameters
        ----------
        token: str, dict or Credentials
            If a str and a valid file name, try to load as a Service file, or next as a JSON;
            if not a valid file name, assume it's a valid raw (non-renewable/session) token, and pass to Credentials. If
            dict, try to interpret as credentials; if Credentials, use directly.

        Raises
        ------
        ValueError
            If the token is none of the supported kinds.
        """
        if isinstance(token, str):
            if os.path.exists(token):
                try:
                    # is this a "service" token?
                    self._connect_service(token)
                    return
                except Exception:
                    # TODO: catch specific exceptions
                    # some other kind of token file
                    # will raise exception if is not json
                    with open(token) as data:
                        token = json.load(data)
            else:
                token = _get_creds_from_raw_token(token)
        if isinstance(token, dict):
            credentials = self._dict_to_credentials(token)
        elif isinstance(token, google.auth.credentials.Credentials):
            credentials = token
        else:
            raise ValueError("Token format not understood")
        self.credentials = credentials
        if self.credentials.valid:
            self.credentials.apply(self.heads)

    def _credentials_valid(self, refresh_buffer):
        """
        Tell whether the current credentials can be used without a refresh.

        They must be valid now and, if they carry an expiry, must have more
        than ``refresh_buffer`` seconds left; credentials without an expiry
        pass as long as they are valid.
        """
        return (
            self.credentials.valid
            # In addition to checking current validity, we ensure that there is
            # not a near-future expiry to avoid errors when expiration hits.
            and (
                (
                    self.credentials.expiry
                    and (
                        # expiry is treated as a naive datetime in UTC
                        self.credentials.expiry.replace(tzinfo=timezone.utc)
                        - datetime.now(timezone.utc)
                    ).total_seconds()
                    > refresh_buffer
                )
                or not self.credentials.expiry
            )
        )

    def maybe_refresh(self, refresh_buffer=LOCAL_REFRESH_BUFFER):
        """
        Check and refresh credentials if needed

        Parameters
        ----------
        refresh_buffer: int or float
            Minimum remaining lifetime, in seconds, below which the
            credentials are refreshed.

        Raises
        ------
        NonRetryableError
            If the refresh fails because the credentials lack the fields
            needed to refresh.
        HttpError
            For any other refresh failure (reported as code 401).
        """
        if self.credentials is None:
            return  # anon

        if self._credentials_valid(refresh_buffer):
            return  # still good, with buffer

        with requests.Session() as session:
            req = Request(session)
            with self.lock:
                if self._credentials_valid(refresh_buffer):
                    return  # repeat check to avoid race conditions

                logger.debug("GCS refresh")
                try:
                    self.credentials.refresh(req)
                except gauth.exceptions.RefreshError as error:
                    # There may be scenarios where this error is raised from the client side due
                    # to missing necessary attributes to refresh the token, For instance
                    # https://github.com/googleapis/google-auth-library-python/blob/main/google/oauth2/_credentials_async.py#L51
                    # In such cases, the request gets retried
                    # with backoff strategy, which can be avoided.

                    # Check for client side errors (if any)
                    if (
                        "credentials do not contain the necessary fields need to refresh"
                        in str(error)
                    ):
                        raise NonRetryableError(
                            "Got error while refreshing credentials."
                        ) from error

                    # Re-raise as HttpError with a 401 code and the expected message
                    raise HttpError(
                        {"code": 401, "message": "Invalid Credentials"}
                    ) from error

                # https://github.com/fsspec/filesystem_spec/issues/565
                self.credentials.apply(self.heads)

    def apply(self, out):
        """Insert credential headers in-place to a dictionary"""
        self.maybe_refresh()
        if self.credentials is not None:
            self.credentials.apply(out)

    def _connect_service(self, fn):
        """Connect with the service-account file ``fn``."""
        # raises exception if the file does not match expectation
        credentials = service_account.Credentials.from_service_account_file(
            fn, scopes=[self.scope]
        )
        self.credentials = credentials

    def _connect_anon(self):
        """Use no credentials at all (anonymous access)."""
        self.credentials = None

    def _connect_browser(self):
        """
        Run the installed-app OAuth flow via a local server, then store the
        resulting credentials in the token cache and save it to disc.
        """
        flow = InstalledAppFlow.from_client_config(client_config, [self.scope])
        credentials = flow.run_local_server()
        self.tokens[(self.project, self.access)] = credentials
        self._save_tokens()
        self.credentials = credentials

    def connect(self, method=None):
        """
        Establish credentials using the given method.

        Parameters
        ----------
        method: str (google_default|cache|cloud|token|anon|browser) or None
            Type of authorisation to implement - calls `_connect_*` methods.
            If None, will try sequence of methods. Any other value is
            treated as a concrete token and passed to ``_connect_token``.

        Raises
        ------
        RuntimeError
            If ``method`` is None and every method in the sequence failed.
        """
        if method not in [
            "google_default",
            "cache",
            "cloud",
            "token",
            "anon",
            # Without this entry "browser" would be mistaken for a raw token
            # and ``_connect_browser`` could never be reached.
            "browser",
            None,
        ]:
            self._connect_token(method)
        elif method is None:
            methods = ["google_default", "cache", "cloud", "anon"]
            if os.environ.get("NO_GCE_CHECK") == "true":
                methods.remove("cloud")
            for meth in methods:
                try:
                    self.connect(method=meth)
                    logger.debug("Connected with method %s", meth)
                    break
                except (google.auth.exceptions.GoogleAuthError, ValueError) as e:
                    # GoogleAuthError is the base class for all authentication
                    # errors
                    logger.debug(
                        'Connection with method "%s" failed', meth, exc_info=e
                    )
                    # Reset credentials if they were set but the authentication failed
                    # (reverts to 'anon' behavior)
                    self.credentials = None
            else:
                # Since the 'anon' connection method should always succeed,
                # getting here means something has gone terribly wrong.
                raise RuntimeError("All connection methods have failed!")
        else:
            # NOTE(review): "token" is accepted here, but ``_connect_token``
            # requires an argument, so that name cannot succeed - confirm
            # whether it should remain in the list above.
            getattr(self, "_connect_" + method)()
            self.method = method