Coverage for gcsfs/checkers.py: 91%
77 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1import base64
2from base64 import b64encode
3from hashlib import md5
5from .retry import ChecksumError
7try:
8 import crcmod
9except ImportError:
10 crcmod = None
class ConsistencyChecker:
    """Base consistency checker whose hooks all do nothing.

    Subclasses override the hooks below to accumulate state from the data
    they are fed and to compare that state with what the server reports.
    """

    def __init__(self):
        """Create a checker that keeps no state."""
        return None

    def update(self, data: bytes):
        """Accept a chunk of data; the base implementation ignores it."""
        return None

    def validate_json_response(self, gcs_object):
        """Accept an object-metadata mapping; the base implementation ignores it."""
        return None

    def validate_headers(self, headers):
        """Accept response headers; the base implementation ignores them."""
        return None

    def validate_http_response(self, r):
        """Accept an HTTP response; the base implementation ignores it."""
        return None
class MD5Checker(ConsistencyChecker):
    """Consistency checker that keeps a running MD5 digest of the data seen."""

    def __init__(self):
        self.md = md5()

    def update(self, data):
        """Feed ``data`` into the running MD5 digest."""
        self.md.update(data)

    def validate_json_response(self, gcs_object):
        """Compare the local digest with ``gcs_object["md5Hash"]``.

        Raises
        ------
        ChecksumError
            If the base64-encoded local digest differs from the reported one.
        """
        reported = gcs_object["md5Hash"]
        computed = b64encode(self.md.digest())
        if computed == reported.encode():
            return
        raise ChecksumError("MD5 checksum failed")

    def validate_headers(self, headers):
        """Compare the local digest with the md5 entry of ``X-Goog-Hash``.

        Nothing is checked when ``headers`` is None or carries no
        ``X-Goog-Hash`` entry.

        Raises
        ------
        NotImplementedError
            If the header is present but contains no ``md5=`` entry.
        ChecksumError
            If the md5 entry does not match the local digest.
        """
        if headers is None or "X-Goog-Hash" not in headers:
            return

        md5_values = []
        for part in headers["X-Goog-Hash"].split(","):
            if part and part.strip().startswith("md5="):
                # Splitting on every "=" also discards the base64 padding.
                md5_values.append(part.split("=")[1])

        if not md5_values:
            raise NotImplementedError(
                "No md5 checksum available to do consistency check. GCS does "
                "not provide md5 sums for composite objects."
            )

        # Strip padding locally too, so both sides are compared unpadded.
        local = b64encode(self.md.digest()).decode().rstrip("=")
        if local != md5_values[0]:
            raise ChecksumError("Checksum failure")

    def validate_http_response(self, r):
        """Validate the headers of HTTP response ``r``."""
        response_headers = r.headers
        return self.validate_headers(response_headers)
class SizeChecker(ConsistencyChecker):
    """Consistency checker that counts the number of bytes seen."""

    def __init__(self):
        self.size = 0

    def update(self, data: bytes):
        """Add the length of ``data`` to the running byte count."""
        self.size += len(data)

    def validate_json_response(self, gcs_object):
        """Compare the byte count with ``gcs_object["size"]``.

        Raises
        ------
        AssertionError
            If the reported size (converted with ``int``) differs from the
            number of bytes seen.
        """
        # Raise explicitly rather than using ``assert``: assert statements
        # are removed under ``python -O``, which would silently disable
        # this check.
        if int(gcs_object["size"]) != self.size:
            raise AssertionError("Size mismatch")

    def validate_http_response(self, r):
        """Compare the byte count with ``r.content_length``.

        Raises
        ------
        AssertionError
            If ``r.content_length`` differs from the number of bytes seen.
        """
        reported = r.content_length
        if reported != self.size:
            raise AssertionError(
                f"Size mismatch: response reports {reported}, got {self.size}"
            )
class Crc32cChecker(ConsistencyChecker):
    """Consistency checker that keeps a running crc32c of the data seen.

    Requires the optional ``crcmod`` package.
    """

    def __init__(self):
        self.crc32c = crcmod.Crc(0x11EDC6F41, initCrc=0, xorOut=0xFFFFFFFF)

    def update(self, data: bytes):
        """Feed ``data`` into the running checksum."""
        self.crc32c.update(data)

    def _digest_b64(self) -> str:
        """Return the current checksum digest as a base64 text string."""
        return base64.b64encode(self.crc32c.digest()).decode()

    def validate_json_response(self, gcs_object):
        """Compare the local checksum with ``gcs_object["crc32c"]``.

        Raises
        ------
        ChecksumError
            If the base64-encoded local digest differs from the reported one.
        """
        # docs for gcs_object: https://cloud.google.com/storage/docs/json_api/v1/objects
        digest_b64 = self._digest_b64()
        expected = gcs_object["crc32c"]

        if digest_b64 != expected:
            raise ChecksumError(f'Expected "{expected}". Got "{digest_b64}"')

    def validate_headers(self, headers):
        """Compare the local checksum with the crc32c entry of ``X-Goog-Hash``.

        Nothing is checked when ``headers`` is None.

        Raises
        ------
        NotImplementedError
            If the headers contain no ``crc32c=`` entry.
        ChecksumError
            If the crc32c entry does not match the local digest.
        """
        if headers is None:
            return

        hasher = headers.get("X-Goog-Hash", "")
        # Match on the "crc32c=" prefix of each comma-separated entry, not a
        # substring test: this cannot pick up another hash whose value merely
        # contains "crc32c", and it guarantees there is an "=" to split on.
        crc = [
            entry.strip().split("=", 1)[1]
            for entry in hasher.split(",")
            if entry.strip().startswith("crc32c=")
        ]
        if not crc:
            raise NotImplementedError("No crc32c checksum was provided by google!")

        digest_b64 = self._digest_b64()
        if crc[0] != digest_b64:
            raise ChecksumError(f'Expected "{crc[0]}". Got "{digest_b64}"')

    def validate_http_response(self, r):
        """Validate the headers of HTTP response ``r``."""
        return self.validate_headers(r.headers)
def get_consistency_checker(consistency: str | None) -> ConsistencyChecker:
    """Return a new checker instance for the requested consistency mode.

    ``"size"``, ``"md5"`` and ``"crc32c"`` select the matching checker
    class; any other value yields the no-op ``ConsistencyChecker``.

    Raises
    ------
    ImportError
        If ``"crc32c"`` is requested but ``crcmod`` could not be imported.
    """
    if consistency == "size":
        return SizeChecker()

    if consistency == "md5":
        return MD5Checker()

    if consistency == "crc32c":
        # crcmod is an optional dependency; refuse clearly when it is absent.
        if crcmod is None:
            raise ImportError(
                "The python package `crcmod` is required for `consistency='crc32c'`. "
                "This can be installed with `pip install gcsfs[crc]`"
            )
        return Crc32cChecker()

    return ConsistencyChecker()