Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import abc 

7import typing 

8 

9from cryptography import utils 

10from cryptography.exceptions import ( 

11 AlreadyFinalized, 

12 AlreadyUpdated, 

13 NotYetFinalized, 

14 UnsupportedAlgorithm, 

15 _Reasons, 

16) 

17from cryptography.hazmat.backends import _get_backend 

18from cryptography.hazmat.backends.interfaces import CipherBackend 

19from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 

20from cryptography.hazmat.primitives.ciphers import modes 

21 

22 

class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
    """
    Abstract interface for cipher algorithms that operate on fixed-size
    blocks. Concrete implementations must provide ``block_size``.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling
    # and keeps the attribute both abstract and a property.
    @property
    @abc.abstractmethod
    def block_size(self) -> int:
        """
        The size of a block as an integer in bits (e.g. 64, 128).
        """

29 

30 

class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a streaming cipher operation: data is fed in
    through ``update``/``update_into`` and completed with ``finalize``.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Feed ``data`` through the cipher and return the resulting bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf) -> int:
        """
        Feed ``data`` through the cipher, writing the output into ``buf``
        instead of returning it. Returns how many bytes were written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Process the final block and return the resulting bytes.
        """

51 

52 

class AEADCipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for cipher contexts that can authenticate
    additional data alongside the data being processed.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticate ``data`` (the provided bytes).
        """

59 

60 

class AEADDecryptionContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for decryption contexts that accept the
    authentication tag at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Process the final block and return the resulting bytes, taking the
        authentication tag here so that it can be supplied late.
        """

68 

69 

class AEADEncryptionContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for encryption contexts that expose an
    authentication tag. Concrete implementations must provide ``tag``.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling
    # and keeps the attribute both abstract and a property.
    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        Returns tag bytes. This is only available after encryption is
        finalized.
        """

77 

78 

class Cipher:
    """
    Pairs a cipher algorithm with an optional mode and hands out
    encryption/decryption contexts created by the backend.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: typing.Optional[modes.Mode],
        backend=None,
    ):
        """
        Validate and store the algorithm, mode and backend.

        Raises ``UnsupportedAlgorithm`` when the resolved backend is not a
        ``CipherBackend`` and ``TypeError`` when ``algorithm`` is not a
        ``CipherAlgorithm``. A non-``None`` mode is asked to validate itself
        against the algorithm.
        """
        resolved_backend = _get_backend(backend)
        if not isinstance(resolved_backend, CipherBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement CipherBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE,
            )

        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        # A mode of None is allowed; only a real mode needs to be checked
        # against the algorithm.
        if mode is not None:
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode
        self._backend = resolved_backend

    def encryptor(self):
        """
        Create an encryption context from the backend and wrap it.

        Raises ``ValueError`` when the mode carries an authentication tag
        that is already set, since the tag must be None when encrypting.
        """
        mode = self.mode
        has_preset_tag = (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        )
        if has_preset_tag:
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )
        backend_ctx = self._backend.create_symmetric_encryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=True)

    def decryptor(self):
        """
        Create a decryption context from the backend and wrap it.
        """
        backend_ctx = self._backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=False)

    def _wrap_ctx(self, ctx, encrypt):
        """
        Pick the wrapper class for a backend context: the plain wrapper for
        modes without an authentication tag, otherwise the AEAD encryption
        wrapper when ``encrypt`` is true and the base AEAD wrapper when not.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        wrapper = _AEADEncryptionContext if encrypt else _AEADCipherContext
        return wrapper(ctx)

128 

129 

@utils.register_interface(CipherContext)
class _CipherContext:
    """
    Thin wrapper around a backend cipher context that refuses any further
    use once ``finalize`` has completed.
    """

    def __init__(self, ctx):
        # ``_ctx`` is reset to None by ``finalize``; None therefore means
        # "already finalized".
        self._ctx = ctx

    def _active_ctx(self):
        """
        Return the wrapped context, raising ``AlreadyFinalized`` if it has
        been dropped by a previous ``finalize`` call.
        """
        active = self._ctx
        if active is None:
            raise AlreadyFinalized("Context was already finalized.")
        return active

    def update(self, data: bytes) -> bytes:
        """
        Forward ``data`` to the wrapped context's ``update`` and return its
        result. Raises ``AlreadyFinalized`` after finalization.
        """
        return self._active_ctx().update(data)

    def update_into(self, data: bytes, buf) -> int:
        """
        Forward ``data`` and ``buf`` to the wrapped context's ``update_into``
        and return its result. Raises ``AlreadyFinalized`` after
        finalization.
        """
        return self._active_ctx().update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the wrapped context, drop the reference to it and return
        the final bytes. Raises ``AlreadyFinalized`` on a repeated call.
        """
        result = self._active_ctx().finalize()
        # Only reached when the backend finalize did not raise.
        self._ctx = None
        return result

151 

152 

@utils.register_interface(AEADCipherContext)
@utils.register_interface(CipherContext)
@utils.register_interface(AEADDecryptionContext)
class _AEADCipherContext:
    """
    Wrapper around a backend AEAD context that tracks how many data and AAD
    bytes have been processed, enforces the limits declared on the
    context's mode, and refuses use after finalization.
    """

    def __init__(self, ctx):
        self._ctx = ctx  # reset to None once finalized
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        self._tag = None  # copied from the backend context on finalize
        self._updated = False  # set by the first update/update_into call

    def _check_limit(self, data_size: int):
        """
        Record ``data_size`` more processed bytes and raise ``ValueError``
        if the running total exceeds the mode's ``_MAX_ENCRYPTED_BYTES``.
        Raises ``AlreadyFinalized`` if the context was already finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")

        # Both pieces of state are updated before the limit is compared, so
        # they stay updated even when the ValueError below is raised.
        self._updated = True
        self._bytes_processed += data_size

        mode = backend_ctx._mode
        limit = mode._MAX_ENCRYPTED_BYTES
        if self._bytes_processed <= limit:
            return
        raise ValueError(
            "{} has a maximum encrypted byte limit of {}".format(
                mode.name, limit
            )
        )

    def update(self, data: bytes) -> bytes:
        """
        Account for ``len(data)`` bytes against the limit, then forward
        ``data`` to the wrapped context's ``update``.
        """
        self._check_limit(len(data))
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf) -> int:
        """
        Account for ``len(data)`` bytes against the limit, then forward
        ``data`` and ``buf`` to the wrapped context's ``update_into``.
        """
        self._check_limit(len(data))
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the wrapped context, remember its tag, drop the reference
        to it and return the final bytes. Raises ``AlreadyFinalized`` on a
        repeated call.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        result = backend_ctx.finalize()
        self._tag = backend_ctx.tag
        self._ctx = None
        return result

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Same as ``finalize`` but passes ``tag`` through to the wrapped
        context's ``finalize_with_tag``.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        result = backend_ctx.finalize_with_tag(tag)
        self._tag = backend_ctx.tag
        self._ctx = None
        return result

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Forward ``data`` to the wrapped context as additional authenticated
        data.

        Raises ``AlreadyFinalized`` after finalization, ``AlreadyUpdated``
        once ``update``/``update_into`` has been called, and ``ValueError``
        if the running AAD total exceeds the mode's ``_MAX_AAD_BYTES``.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        # The AAD counter is bumped before the comparison, mirroring
        # ``_check_limit``.
        self._aad_bytes_processed += len(data)
        mode = backend_ctx._mode
        aad_limit = mode._MAX_AAD_BYTES
        if self._aad_bytes_processed > aad_limit:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, aad_limit
                )
            )

        backend_ctx.authenticate_additional_data(data)

215 

216 

@utils.register_interface(AEADEncryptionContext)
class _AEADEncryptionContext(_AEADCipherContext):
    """
    AEAD context used for encryption; adds read access to the tag that the
    parent class stores when the context is finalized.
    """

    @property
    def tag(self) -> bytes:
        """
        Return the stored tag. Raises ``NotYetFinalized`` while the wrapped
        context is still present, i.e. before finalization.
        """
        if self._ctx is None:
            # Finalization stores the tag before clearing ``_ctx``, so it is
            # expected to be set here.
            assert self._tag is not None
            return self._tag
        raise NotYetFinalized(
            "You must finalize encryption before getting the tag."
        )