Coverage for gcsfs/checkers.py: 91%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1import base64 

2from base64 import b64encode 

3from hashlib import md5 

4 

5from .retry import ChecksumError 

6 

7try: 

8 import crcmod 

9except ImportError: 

10 crcmod = None 

11 

12 

class ConsistencyChecker:
    """Base consistency checker whose hooks all do nothing.

    Subclasses override the hooks they need: ``update`` is fed the
    transferred bytes, and the ``validate_*`` methods compare what was
    accumulated against metadata supplied by the caller.
    """

    def __init__(self):
        """Create a checker; the base class keeps no state."""

    def update(self, data: bytes):
        """Accept a chunk of transferred data (ignored here)."""
        return None

    def validate_json_response(self, gcs_object):
        """Check against an object-metadata mapping (no-op here)."""
        return None

    def validate_headers(self, headers):
        """Check against response headers (no-op here)."""
        return None

    def validate_http_response(self, r):
        """Check against an HTTP response object (no-op here)."""
        return None

28 

29 

class MD5Checker(ConsistencyChecker):
    """Consistency checker that compares an MD5 digest of the data.

    The digest is accumulated through ``update`` and compared, base64
    encoded, against either the ``md5Hash`` entry of an object-metadata
    mapping or the ``md5=`` entry of an ``X-Goog-Hash`` header.
    """

    def __init__(self):
        # Running MD5 state, fed incrementally by update().
        self.md = md5()

    def update(self, data):
        """Feed another chunk of data into the running digest."""
        self.md.update(data)

    def validate_json_response(self, gcs_object):
        """Compare the digest with ``gcs_object["md5Hash"]``.

        Raises ``ChecksumError`` on mismatch; a missing ``md5Hash`` key
        propagates as ``KeyError``.
        """
        reported = gcs_object["md5Hash"]
        computed = b64encode(self.md.digest())
        if computed != reported.encode():
            raise ChecksumError("MD5 checksum failed")

    def validate_headers(self, headers):
        """Compare the digest with the ``md5=`` entry of ``X-Goog-Hash``.

        Does nothing when ``headers`` is None or has no ``X-Goog-Hash``
        entry. Raises ``NotImplementedError`` when the header is present
        but carries no md5 value, and ``ChecksumError`` on mismatch.
        """
        if headers is None or "X-Goog-Hash" not in headers:
            return

        reported = None
        for entry in headers["X-Goog-Hash"].split(","):
            if entry.strip().startswith("md5="):
                # Splitting on every "=" also drops the base64 padding,
                # which is why the computed digest is stripped of "=" below.
                reported = entry.split("=")[1]
                break

        if reported is None:
            raise NotImplementedError(
                "No md5 checksum available to do consistency check. GCS does "
                "not provide md5 sums for composite objects."
            )

        computed = b64encode(self.md.digest()).decode().rstrip("=")
        if computed != reported:
            raise ChecksumError("Checksum failure")

    def validate_http_response(self, r):
        """Validate using the headers of response ``r``."""
        return self.validate_headers(r.headers)

61 

62 

class SizeChecker(ConsistencyChecker):
    """Consistency checker that compares the number of bytes transferred.

    Bytes are counted through ``update`` and compared with the size
    reported by an object-metadata mapping or an HTTP response.

    Mismatches raise ``AssertionError`` explicitly rather than through an
    ``assert`` statement, so the check still runs when Python is started
    with ``-O`` (which removes ``assert`` statements).
    """

    def __init__(self):
        # Total number of bytes seen so far.
        self.size = 0

    def update(self, data: bytes):
        """Add the length of ``data`` to the running byte count."""
        self.size += len(data)

    def validate_json_response(self, gcs_object):
        """Compare the byte count with ``int(gcs_object["size"])``.

        Raises ``AssertionError`` on mismatch.
        """
        if int(gcs_object["size"]) != self.size:
            raise AssertionError("Size mismatch")

    def validate_http_response(self, r):
        """Compare the byte count with ``r.content_length``.

        Raises ``AssertionError`` on mismatch (including when
        ``r.content_length`` is not equal to an integer count, e.g. None).
        """
        reported = r.content_length
        if reported != self.size:
            raise AssertionError(
                f"Size mismatch: response reports {reported} bytes, "
                f"counted {self.size}"
            )

75 

76 

class Crc32cChecker(ConsistencyChecker):
    """Consistency checker that compares a CRC32C checksum of the data.

    The checksum is accumulated through ``update`` and compared, base64
    encoded, against either the ``crc32c`` entry of an object-metadata
    mapping or the ``crc32c=`` entry of an ``X-Goog-Hash`` header.

    Requires the optional ``crcmod`` package.
    """

    def __init__(self):
        """Create the running checksum.

        Raises ``ImportError`` if ``crcmod`` is not installed, instead of
        failing with an attribute error on ``None``.
        """
        if crcmod is None:
            raise ImportError(
                "The python package `crcmod` is required for `consistency='crc32c'`. "
                "This can be installed with `pip install gcsfs[crc]`"
            )
        self.crc32c = crcmod.Crc(0x11EDC6F41, initCrc=0, xorOut=0xFFFFFFFF)

    def update(self, data: bytes):
        """Feed another chunk of data into the running checksum."""
        self.crc32c.update(data)

    def _digest_b64(self):
        """Return the current checksum as a base64 text string."""
        return b64encode(self.crc32c.digest()).decode()

    def validate_json_response(self, gcs_object):
        """Compare the checksum with ``gcs_object["crc32c"]``.

        Raises ``ChecksumError`` on mismatch.
        """
        # docs for gcs_object: https://cloud.google.com/storage/docs/json_api/v1/objects
        digest_b64 = self._digest_b64()
        expected = gcs_object["crc32c"]

        if digest_b64 != expected:
            raise ChecksumError(f'Expected "{expected}". Got "{digest_b64}"')

    def validate_headers(self, headers):
        """Compare the checksum with the ``crc32c=`` entry of ``X-Goog-Hash``.

        Does nothing when ``headers`` is None. Raises
        ``NotImplementedError`` when no crc32c entry is present and
        ``ChecksumError`` on mismatch.
        """
        if headers is None:
            return

        expected = None
        for entry in headers.get("X-Goog-Hash", "").split(","):
            # Match on the key exactly: a substring test could be fooled by
            # another entry whose base64 value happens to contain "crc32c".
            # Partitioning on the first "=" keeps the value's padding intact.
            key, sep, value = entry.partition("=")
            if sep and key.strip() == "crc32c":
                expected = value.strip()
                break

        if expected is None:
            raise NotImplementedError("No crc32c checksum was provided by google!")

        digest_b64 = self._digest_b64()
        if expected != digest_b64:
            raise ChecksumError(f'Expected "{expected}". Got "{digest_b64}"')

    def validate_http_response(self, r):
        """Validate using the headers of response ``r``."""
        return self.validate_headers(r.headers)

104 

105 

def get_consistency_checker(consistency: str | None) -> ConsistencyChecker:
    """Return the checker instance selected by ``consistency``.

    ``"size"``, ``"md5"`` and ``"crc32c"`` select the matching checker;
    any other value (including None) yields the no-op base checker.

    Raises ``ImportError`` for ``"crc32c"`` when ``crcmod`` is not
    installed.
    """
    if consistency == "size":
        return SizeChecker()

    if consistency == "md5":
        return MD5Checker()

    if consistency == "crc32c":
        # crcmod is an optional dependency; fail with install guidance.
        if crcmod is None:
            raise ImportError(
                "The python package `crcmod` is required for `consistency='crc32c'`. "
                "This can be installed with `pip install gcsfs[crc]`"
            )
        return Crc32cChecker()

    return ConsistencyChecker()