Coverage for gcsfs/retry.py: 85%

94 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import asyncio 

2import json 

3import logging 

4import random 

5 

6import aiohttp.client_exceptions 

7import google.auth.exceptions 

8import requests.exceptions 

9from decorator import decorator 

10 

11logger = logging.getLogger("gcsfs") 

12 

13 

class HttpError(Exception):
    """Holds the message and code from cloud errors.

    Parameters
    ----------
    error_response : dict or None
        Mapping that may carry ``"code"`` and ``"message"`` entries. A falsy
        value produces an error with an empty message and ``code`` of ``None``.

    Attributes
    ----------
    code
        The ``"code"`` entry of ``error_response``, or ``None``.
    message : str or bytes
        The ``"message"`` entry; when ``code`` is truthy, ``", <code>"`` is
        appended (encoded to bytes if the message itself is bytes).
    """

    def __init__(self, error_response=None):
        # Save error_response for potential pickle.
        self._error_response = error_response
        if error_response:
            self.code = error_response.get("code", None)
            message = error_response.get("message", "")
            if message is None:
                # An explicit null message would otherwise make the
                # concatenation below fail with a TypeError.
                message = ""
            if self.code:
                suffix = f", {self.code}"
                if isinstance(message, bytes):
                    message += suffix.encode()
                else:
                    message += suffix
            self.message = message
        else:
            self.message = ""
            self.code = None
        # Call the base class constructor with the parameters it needs
        super().__init__(self.message)

    def __reduce__(self):
        """Make the exception pickleable by rebuilding it from the raw response.

        ``type(self)`` is used instead of a hard-coded class so that
        subclasses keep their own type after a pickle round-trip.
        """
        return type(self), (self._error_response,)

39 

40 

class ChecksumError(Exception):
    """Signals that the md5 hash of the content disagrees with the header."""

45 

46 

class NonRetryableError(Exception):
    """Signals an underlying error that can be neither retried nor continued."""

51 

52 

# Exception types that `is_retriable` treats as transient (checked with
# ``isinstance``, so subclasses of these also match).
RETRIABLE_EXCEPTIONS = (
    # requests transport-level failures
    requests.exceptions.ChunkedEncodingError,
    requests.exceptions.ConnectionError,
    requests.exceptions.ReadTimeout,
    requests.exceptions.Timeout,
    requests.exceptions.ProxyError,
    requests.exceptions.SSLError,
    requests.exceptions.ContentDecodingError,
    # credential refresh failures from google-auth
    google.auth.exceptions.RefreshError,
    # any aiohttp client-side error
    aiohttp.client_exceptions.ClientError,
    # content hash mismatch defined in this module
    ChecksumError,
)

65 

66 

# HTTP status codes considered retriable, stored both as ints and as their
# string forms so that either representation of a code matches.
errs = {
    as_type(status)
    for status in (
        *range(500, 505),
        # Request Timeout
        408,
        # Too Many Requests
        429,
    )
    for as_type in (int, str)
}

74 

75 

def is_retriable(exception):
    """Decide whether a failed request may be attempted again.

    ``NonRetryableError`` is never retriable. An ``HttpError`` is retriable
    when its code is listed in ``errs``, or when it is a 401 whose message
    contains "Invalid Credentials". Any other exception is retriable only if
    it is an instance of one of ``RETRIABLE_EXCEPTIONS``.
    """
    if isinstance(exception, NonRetryableError):
        return False

    if not isinstance(exception, HttpError):
        return isinstance(exception, RETRIABLE_EXCEPTIONS)

    # Add 401 to retriable errors when it's an auth expiration issue
    expired_credentials = exception.code == 401 and "Invalid Credentials" in str(
        exception.message
    )
    return expired_credentials or exception.code in errs

88 

89 

def validate_response(status, content, path, args=None):
    """
    Check an HTTP response, raise an error if it's not ok.

    Statuses below 400, and the special status 499, are accepted silently.

    Parameters
    ----------
    status: HTTP status code of the response
    content: response body; decoded first if it has a ``decode`` method
    path: associated URL path, for error messages
    args: optional values that are quoted and substituted into ``path``
        with ``str.format`` before it is used in a message

    Raises
    ------
    FileNotFoundError
        status 404
    OSError
        status 403
    FileExistsError
        status 412
    requests.exceptions.ProxyError
        status 502
    ValueError
        the error message contains "invalid"
    HttpError
        any other failing status
    """
    if status < 400 or status == 499:
        # 499 is special "upload was cancelled" status
        return

    if args:
        from .core import quote

        path = path.format(*[quote(p) for p in args])
    if status == 404:
        raise FileNotFoundError(path)

    if hasattr(content, "decode"):
        content = content.decode()

    # Fall back to the raw body unless a well-formed error payload is found.
    # Bodies that are not JSON, are not JSON objects, lack an "error" entry,
    # or carry an error object without "message" must not mask the HTTP
    # failure with a KeyError/TypeError.
    error = None
    msg = content
    try:
        payload = json.loads(content)
    except (json.decoder.JSONDecodeError, TypeError):
        payload = None
    if isinstance(payload, dict):
        candidate = payload.get("error")
        # Sometimes the error message is a string.
        if isinstance(candidate, str):
            error = msg = candidate
        elif isinstance(candidate, dict) and "message" in candidate:
            error = candidate
            msg = candidate["message"]

    if status == 403:
        raise OSError(f"Forbidden: {path}\n{msg}")
    if status == 412:
        raise FileExistsError(path)
    if status == 502:
        raise requests.exceptions.ProxyError()
    if "invalid" in str(msg):
        raise ValueError(f"Bad Request: {path}\n{msg}")
    if error and not isinstance(error, str):
        raise HttpError(error)
    # status is always >= 400 here, so a status-based error is always possible.
    raise HttpError({"code": status, "message": msg})  # text-like

135 

136 

@decorator
async def retry_request(func, retries=6, *args, **kwargs):
    """Await ``func(*args, **kwargs)``, retrying on retriable failures.

    Parameters
    ----------
    func: coroutine function being wrapped
    retries: maximum number of attempts; with a value of 0 the function is
        never called and ``None`` is returned
    *args, **kwargs: passed through to ``func`` unchanged

    Before every attempt after the first, sleeps for
    ``2 ** (attempt - 1)`` seconds plus ``random.random()`` jitter, capped at
    32 seconds.

    Raises
    ------
    ValueError
        on an ``HttpError`` with code 400 whose message mentions
        "requester pays"
    The caught exception itself
        on a 404, when it is not retriable according to ``is_retriable``, or
        when the final attempt fails
    """
    for retry in range(retries):
        try:
            if retry > 0:
                await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
            return await func(*args, **kwargs)
        except (
            HttpError,
            requests.exceptions.RequestException,
            google.auth.exceptions.GoogleAuthError,
            ChecksumError,
            aiohttp.client_exceptions.ClientError,
        ) as e:
            if (
                isinstance(e, HttpError)
                and e.code == 400
                # HttpError.message may be bytes; str() keeps the substring
                # test from raising TypeError, as in is_retriable.
                and "requester pays" in str(e.message)
            ):
                msg = (
                    "Bucket is requester pays. "
                    "Set `requester_pays=True` when creating the GCSFileSystem."
                )
                raise ValueError(msg) from e
            # Special test for 404 to avoid retrying the request
            not_found = (
                isinstance(e, aiohttp.client_exceptions.ClientResponseError)
                and e.status == 404
            ) or (isinstance(e, HttpError) and e.code == 404)
            if not_found:
                logger.debug("Request returned 404, no retries.")
                raise
            if retry == retries - 1:
                logger.exception(f"{func.__name__} out of retries on exception: {e}")
                raise
            if is_retriable(e):
                logger.debug(f"{func.__name__} retrying after exception: {e}")
                continue
            logger.exception(f"{func.__name__} non-retriable exception: {e}")
            raise