Coverage for gcsfs/credentials.py: 79%

170 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import json 

2import logging 

3import os 

4import pickle 

5import textwrap 

6import threading 

7import warnings 

8from datetime import datetime, timezone 

9 

10import google.auth as gauth 

11import google.auth.compute_engine 

12import google.auth.credentials 

13import google.auth.exceptions 

14import requests 

15from google.auth.transport.requests import Request 

16from google.oauth2 import service_account 

17from google.oauth2.credentials import Credentials 

18from google_auth_oauthlib.flow import InstalledAppFlow 

19 

20from gcsfs.retry import HttpError, NonRetryableError 

21 

# Module-level logger shared by everything in this file.
logger = logging.getLogger("gcsfs.credentials")

# Pickle file in the user's home directory where "browser" tokens are cached.
tfile = os.path.join(os.path.expanduser("~"), ".gcs_tokens")

# OAuth client identity used for the installed-application (browser) flow.
not_secret = dict(
    client_id=(
        "586241054156-9kst7ltfj66svc342pcn43vp6ta3idin"
        ".apps.googleusercontent.com"
    ),
    client_secret="xto0LIFYX35mmHF9T1R2QBqT",
)

# Client configuration handed to InstalledAppFlow.from_client_config().
client_config = {
    "installed": {
        **not_secret,
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://accounts.google.com/o/oauth2/token",
    }
}

# Timeout, in seconds, for the tokeninfo lookup of a raw token.
TOKEN_INFO_TIMEOUT_SECONDS = 10
# Seconds of remaining lifetime below which credentials are treated as due for
# refresh. Greater than google.auth._helpers.REFRESH_THRESHOLD.
LOCAL_REFRESH_BUFFER = 5 * 60

43 

44 

def _get_creds_from_raw_token(token):
    """Build ``Credentials`` from a raw (non-renewable) access token.

    Unless disabled through the ``FETCH_RAW_TOKEN_EXPIRY`` environment
    variable, the token's expiry is looked up at Google's ``tokeninfo``
    endpoint so that an invalid or nearly expired token is rejected here
    rather than on first use.

    Parameters
    ----------
    token: str
        The raw access token.

    Returns
    -------
    Credentials
        Credentials wrapping ``token``. ``expiry`` is a naive UTC datetime, or
        None when the expiry lookup is disabled.

    Raises
    ------
    ValueError
        If the endpoint answers with HTTP 400, or if the token expires within
        ``LOCAL_REFRESH_BUFFER`` seconds.
    requests.exceptions.HTTPError
        For any other error status (raised by ``raise_for_status``).
    """
    # The lookup is on by default; it is skipped only when the variable is
    # explicitly set to 'false', '0', 'off' or 'no' (case-insensitive).
    env_val = os.environ.get("FETCH_RAW_TOKEN_EXPIRY", "true").lower()
    should_fetch_expiry = env_val not in ("false", "0", "off", "no")

    if not should_fetch_expiry:
        return Credentials(token, expiry=None)

    response = requests.get(
        "https://oauth2.googleapis.com/tokeninfo",
        params={"access_token": token},
        timeout=TOKEN_INFO_TIMEOUT_SECONDS,
    )

    if response.status_code == 400:
        # Token is likely expired or invalid format
        raise ValueError("Provided token is either not valid, or expired.")

    response.raise_for_status()

    # datetime.utcfromtimestamp() is deprecated since Python 3.12, so build a
    # timezone-aware UTC datetime from the epoch value instead.
    expiry_utc = datetime.fromtimestamp(
        float(response.json()["exp"]), tz=timezone.utc
    )

    time_remaining = max(
        0,
        (expiry_utc - datetime.now(timezone.utc)).total_seconds(),
    )
    if time_remaining <= LOCAL_REFRESH_BUFFER:
        raise ValueError(
            f"The provided raw token expires in {time_remaining} seconds, "
            f"which is less than the safety buffer ({LOCAL_REFRESH_BUFFER}). "
            "This may cause immediate authentication failures. "
            "To bypass this check and safety buffer, you can set the environment "
            "variable FETCH_RAW_TOKEN_EXPIRY=false (expiry will be unknown)."
        )

    # The expiry is stored as a naive UTC datetime; the validity check in
    # this module attaches tzinfo=UTC itself before comparing.
    return Credentials(token, expiry=expiry_utc.replace(tzinfo=None))

82 

83 

class GoogleCredentials:
    """Obtain, hold and refresh the Google credentials used for GCS requests.

    On construction, ``connect(method=token)`` is called, which dispatches to
    one of the ``_connect_*`` methods. Afterwards ``apply`` / ``maybe_refresh``
    keep the credentials fresh and write the authorization headers.

    Parameters
    ----------
    project: str or None
        Project to use; may be replaced by the google-default project.
    access: str
        Suffix appended to the ``devstorage.`` scope URL.
    token: str, dict, Credentials or None
        A connection method name, or a concrete token (see ``connect`` and
        ``_connect_token``).
    check_credentials: object, optional
        Deprecated; a truthy value only triggers a ``DeprecationWarning``.
    on_google: bool
        When False, ``_connect_cloud`` refuses to run.
    """

    # Cache of "browser" credentials keyed by (project, access); replaced as a
    # whole by load_tokens(). The empty default makes cache lookups performed
    # before load_tokens() raise ValueError ("No cached tokens"), which the
    # automatic method sequence handles, rather than AttributeError.
    tokens = {}

    def __init__(self, project, access, token, check_credentials=None, on_google=True):
        self.scope = "https://www.googleapis.com/auth/devstorage." + access
        self.project = project
        self.access = access
        self.heads = {}

        self.credentials = None
        self.method = None
        self.lock = threading.Lock()
        self.token = token
        self.on_google = on_google
        self.connect(method=token)

        if check_credentials:
            warnings.warn(
                "The `check_credentials` argument is deprecated and will be removed in a future release.",
                DeprecationWarning,
                # attribute the warning to the code constructing this object
                stacklevel=2,
            )

    @classmethod
    def load_tokens(cls):
        """Get "browser" tokens from disc"""
        # SECURITY NOTE: pickle.load can execute arbitrary code; the cache
        # file must only ever be writable by the current user.
        try:
            with open(tfile, "rb") as f:
                tokens = pickle.load(f)
        except Exception:
            # Missing or unreadable cache: deliberately start with none.
            tokens = {}
        GoogleCredentials.tokens = tokens

    @staticmethod
    def _save_tokens():
        """Write the token cache to disc; a failure only emits a warning."""
        try:
            with open(tfile, "wb") as f:
                pickle.dump(GoogleCredentials.tokens, f, 2)
        except Exception as e:
            warnings.warn("Saving token cache failed: " + str(e))

    def _connect_google_default(self):
        """Connect with the application-default credentials.

        Raises
        ------
        ValueError
            If a project was given and it differs from the default project.
        """
        with requests.Session() as session:
            req = Request(session)
            credentials, project = gauth.default(scopes=[self.scope], request=req)

        msg = textwrap.dedent(
            """\
        User-provided project '{}' does not match the google default project '{}'. Either

          1. Accept the google-default project by not passing a `project` to GCSFileSystem
          2. Configure the default project to match the user-provided project (gcloud config set project)
          3. Use an authorization method other than 'google_default' by providing 'token=...'
        """
        )
        if self.project and self.project != project:
            raise ValueError(msg.format(self.project, project))
        self.project = project
        self.credentials = credentials

    def _connect_cloud(self):
        """Connect with Compute Engine credentials and refresh them once.

        Raises
        ------
        ValueError
            If ``on_google`` is False, or if the refresh fails.
        """
        if not self.on_google:
            raise ValueError(
                "Compute Engine credentials are disabled because on_google is False"
            )
        self.credentials = gauth.compute_engine.Credentials()
        try:
            with requests.Session() as session:
                req = Request(session)
                self.credentials.refresh(req)
        except gauth.exceptions.RefreshError as error:
            raise ValueError("Invalid gcloud credentials") from error

    def _connect_cache(self):
        """Use credentials cached for this (project, access) pair, if any.

        Raises
        ------
        ValueError
            If the token cache is empty.
        """
        if not self.tokens:
            raise ValueError("No cached tokens")

        # NOTE(review): when the cache is non-empty but has no entry for this
        # pair, nothing is assigned and no error is raised, so the caller
        # treats the connection as successful with unchanged credentials.
        key = (self.project, self.access)
        if key in self.tokens:
            self.credentials = self.tokens[key]

    def _dict_to_credentials(self, token):
        """
        Convert old dict-style token.

        Does not preserve access token itself, assumes refresh required.

        Parameters
        ----------
        token: dict
            Either service-account info, or a mapping with the keys
            ``refresh_token``, ``client_secret`` and ``client_id``.
        """
        try:
            token = service_account.Credentials.from_service_account_info(
                token, scopes=[self.scope]
            )
        except Exception:
            # Not service-account info, so treat it as a user refresh token.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            # According https://github.com/googleapis/python-cloud-core/blob/master/google/cloud/client.py
            # Scopes required for authenticating with a service. User authentication fails
            # with invalid_scope if scope is specified.
            token = Credentials(
                None,
                refresh_token=token["refresh_token"],
                client_secret=token["client_secret"],
                client_id=token["client_id"],
                token_uri="https://oauth2.googleapis.com/token",
            )
        return token

    def _connect_token(self, token):
        """
        Connect using a concrete token

        Parameters
        ----------
        token: str, dict or Credentials
            If a str and a valid file name, try to load as a Service file, or next as a JSON;
            if not a valid file name, assume it's a valid raw (non-renewable/session) token, and pass to Credentials. If
            dict, try to interpret as credentials; if Credentials, use directly.

        Raises
        ------
        ValueError
            If the token is none of the supported kinds.
        """
        if isinstance(token, str):
            if os.path.exists(token):
                try:
                    # is this a "service" token?
                    self._connect_service(token)
                    return
                except Exception:
                    # some other kind of token file
                    # will raise exception if is not json
                    with open(token, encoding="utf-8") as data:
                        token = json.load(data)
            else:
                token = _get_creds_from_raw_token(token)
        if isinstance(token, dict):
            credentials = self._dict_to_credentials(token)
        elif isinstance(token, google.auth.credentials.Credentials):
            credentials = token
        else:
            raise ValueError("Token format not understood")
        self.credentials = credentials
        if self.credentials.valid:
            self.credentials.apply(self.heads)

    def _credentials_valid(self, refresh_buffer):
        """Return True if credentials are valid and not about to expire.

        Credentials without an expiry only need to be currently valid;
        otherwise more than ``refresh_buffer`` seconds must remain, to avoid
        errors when expiration hits mid-use.
        """
        if not self.credentials.valid:
            return False
        expiry = self.credentials.expiry
        if not expiry:
            return True
        # expiry is read as UTC: tzinfo is attached here before comparing
        remaining = (
            expiry.replace(tzinfo=timezone.utc) - datetime.now(timezone.utc)
        ).total_seconds()
        return remaining > refresh_buffer

    def maybe_refresh(self, refresh_buffer=LOCAL_REFRESH_BUFFER):
        """
        Check and refresh credentials if needed

        Parameters
        ----------
        refresh_buffer: int or float
            Minimum remaining lifetime, in seconds, for the current
            credentials to be kept without a refresh.

        Raises
        ------
        NonRetryableError
            If the refresh fails because the credentials lack the fields
            needed to refresh.
        HttpError
            With code 401 for any other refresh failure.
        """
        if self.credentials is None:
            return  # anon

        if self._credentials_valid(refresh_buffer):
            return  # still good, with buffer

        with requests.Session() as session:
            req = Request(session)
            with self.lock:
                if self._credentials_valid(refresh_buffer):
                    return  # repeat check to avoid race conditions

                logger.debug("GCS refresh")
                try:
                    self.credentials.refresh(req)
                except gauth.exceptions.RefreshError as error:
                    # There may be scenarios where this error is raised from the client side due
                    # to missing necessary attributes to refresh the token, For instance
                    # https://github.com/googleapis/google-auth-library-python/blob/main/google/oauth2/_credentials_async.py#L51
                    # In such cases, the request gets retried
                    # with backoff strategy, which can be avoided.

                    # Check for client side errors (if any)
                    if (
                        "credentials do not contain the necessary fields need to refresh"
                        in str(error)
                    ):
                        raise NonRetryableError(
                            "Got error while refreshing credentials."
                        ) from error

                    # Re-raise as HttpError with a 401 code and the expected message
                    raise HttpError(
                        {"code": 401, "message": "Invalid Credentials"}
                    ) from error

                # https://github.com/fsspec/filesystem_spec/issues/565
                self.credentials.apply(self.heads)

    def apply(self, out):
        """Insert credential headers in-place to a dictionary"""
        self.maybe_refresh()
        if self.credentials is not None:
            self.credentials.apply(out)

    def _connect_service(self, fn):
        """Load service-account credentials from the file ``fn``."""
        # raises exception if the file does not match expectation
        credentials = service_account.Credentials.from_service_account_file(
            fn, scopes=[self.scope]
        )
        self.credentials = credentials

    def _connect_anon(self):
        """Use no credentials at all."""
        self.credentials = None

    def _connect_browser(self):
        """Run the installed-app flow and cache the resulting credentials."""
        flow = InstalledAppFlow.from_client_config(client_config, [self.scope])
        credentials = flow.run_local_server()
        self.tokens[(self.project, self.access)] = credentials
        self._save_tokens()
        self.credentials = credentials

    def connect(self, method=None):
        """
        Establish credentials using the given method.

        Parameters
        ----------
        method: str (google_default|cache|cloud|token|anon|browser) or None
            Type of authorisation to implement - calls `_connect_*` methods.
            If None, will try sequence of methods. Any other value is treated
            as a concrete token and passed to `_connect_token`.

        Raises
        ------
        RuntimeError
            If ``method`` is None and every method in the sequence failed.
        """
        # "browser" must be listed here: otherwise it would be treated as a
        # raw token string and _connect_browser could never be reached.
        # NOTE(review): "token" dispatches to _connect_token() without its
        # required argument.
        known_methods = (
            "google_default",
            "cache",
            "cloud",
            "token",
            "anon",
            "browser",
            None,
        )
        if method not in known_methods:
            self._connect_token(method)
        elif method is None:
            methods = ["google_default", "cache", "cloud", "anon"]
            if os.environ.get("NO_GCE_CHECK") == "true":
                methods.remove("cloud")
            for meth in methods:
                try:
                    self.connect(method=meth)
                    logger.debug("Connected with method %s", meth)
                    break
                except (google.auth.exceptions.GoogleAuthError, ValueError) as e:
                    # GoogleAuthError is the base class for all authentication
                    # errors
                    logger.debug(
                        'Connection with method "%s" failed', meth, exc_info=e
                    )
                    # Reset credentials if they were set but the authentication failed
                    # (reverts to 'anon' behavior)
                    self.credentials = None
            else:
                # Since the 'anon' connection method should always succeed,
                # getting here means something has gone terribly wrong.
                raise RuntimeError("All connection methods have failed!")
        else:
            getattr(self, "_connect_" + method)()
            self.method = method