Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/cryptography/hazmat/primitives/ciphers/base.py : 42%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
6import abc
7import typing
9from cryptography import utils
10from cryptography.exceptions import (
11 AlreadyFinalized,
12 AlreadyUpdated,
13 NotYetFinalized,
14 UnsupportedAlgorithm,
15 _Reasons,
16)
17from cryptography.hazmat.backends import _get_backend
18from cryptography.hazmat.backends.interfaces import CipherBackend
19from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
20from cryptography.hazmat.primitives.ciphers import modes
class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
    """Abstract interface exposing the block size of a cipher algorithm."""

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling
    # and keeps ``block_size`` both a property and an abstract member.
    @property
    @abc.abstractmethod
    def block_size(self) -> int:
        """
        The size of a block as an integer in bits (e.g. 64, 128).
        """
class CipherContext(metaclass=abc.ABCMeta):
    """Abstract interface for an in-progress cipher operation.

    Implementations must provide ``update``, ``update_into`` and
    ``finalize``.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Processes the provided bytes through the cipher and returns the results
        as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf) -> int:
        """
        Processes the provided bytes and writes the resulting data into the
        provided buffer. Returns the number of bytes written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Returns the results of processing the final block as bytes.
        """
class AEADCipherContext(metaclass=abc.ABCMeta):
    """Abstract interface for contexts that accept additional data to
    authenticate."""

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticates the provided bytes.
        """
class AEADDecryptionContext(metaclass=abc.ABCMeta):
    """Abstract interface for decryption contexts that can be finalized
    with an authentication tag supplied at the end."""

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Returns the results of processing the final block as bytes and allows
        delayed passing of the authentication tag.
        """
class AEADEncryptionContext(metaclass=abc.ABCMeta):
    """Abstract interface for encryption contexts that expose a tag."""

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; use the
    # supported ``property`` + ``abc.abstractmethod`` stack instead.
    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        Returns tag bytes. This is only available after encryption is
        finalized.
        """
class Cipher:
    """Pairs a cipher algorithm with an optional mode and a backend, and
    hands out encryption/decryption contexts created by that backend."""

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: typing.Optional[modes.Mode],
        backend=None,
    ):
        """
        Validate and store the algorithm, mode and backend.

        :param algorithm: Must be a ``CipherAlgorithm`` instance.
        :param mode: A mode object, or ``None``. When given, its
            ``validate_for_algorithm`` is called with ``algorithm``.
        :param backend: Passed through ``_get_backend`` to obtain the
            backend actually used.
        :raises UnsupportedAlgorithm: If the resolved backend is not a
            ``CipherBackend``.
        :raises TypeError: If ``algorithm`` is not a ``CipherAlgorithm``.
        """
        backend = _get_backend(backend)
        if not isinstance(backend, CipherBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement CipherBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE,
            )

        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode
        self._backend = backend

    def encryptor(self):
        """
        Return a wrapped encryption context created by the backend.

        :raises ValueError: If the mode is a ``ModeWithAuthenticationTag``
            whose ``tag`` is already set.
        """
        tagged_mode = isinstance(self.mode, modes.ModeWithAuthenticationTag)
        if tagged_mode and self.mode.tag is not None:
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )
        ctx = self._backend.create_symmetric_encryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(ctx, encrypt=True)

    def decryptor(self):
        """Return a wrapped decryption context created by the backend."""
        ctx = self._backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(ctx, encrypt=False)

    def _wrap_ctx(self, ctx, encrypt):
        """
        Wrap a backend context in the public context class for this mode.

        Modes without an authentication tag get ``_CipherContext``; tagged
        modes get ``_AEADEncryptionContext`` when encrypting and
        ``_AEADCipherContext`` otherwise.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADCipherContext(ctx)
@utils.register_interface(CipherContext)
class _CipherContext:
    """Thin wrapper around a backend cipher context.

    Delegates every call to the wrapped context and drops the reference
    once ``finalize`` has run, so any later call raises
    ``AlreadyFinalized``.
    """

    def __init__(self, ctx):
        # ``None`` here later means "already finalized".
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """
        Pass ``data`` to the wrapped context's ``update`` and return its
        result.

        :raises AlreadyFinalized: If ``finalize`` was already called.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf) -> int:
        """
        Pass ``data`` and ``buf`` to the wrapped context's ``update_into``
        and return its result.

        :raises AlreadyFinalized: If ``finalize`` was already called.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the wrapped context, release it, and return its output.

        :raises AlreadyFinalized: If ``finalize`` was already called.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize()
        self._ctx = None
        return data
@utils.register_interface(AEADCipherContext)
@utils.register_interface(CipherContext)
@utils.register_interface(AEADDecryptionContext)
class _AEADCipherContext:
    """Wrapper around a backend context for a mode with an authentication
    tag.

    On top of delegating to the wrapped context, it counts processed data
    and additional-data bytes against the limits read from
    ``self._ctx._mode`` and remembers the tag once finalized.
    """

    def __init__(self, ctx):
        self._ctx = ctx  # set to None once finalized
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        self._tag = None
        self._updated = False

    def _check_limit(self, data_size: int):
        """
        Record ``data_size`` more processed bytes and enforce the mode limit.

        Note that ``_updated`` and ``_bytes_processed`` are modified before
        the limit comparison, so they stay modified even when it raises.

        :raises AlreadyFinalized: If the context was already finalized.
        :raises ValueError: If the running total exceeds the mode's
            ``_MAX_ENCRYPTED_BYTES``.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        self._updated = True
        self._bytes_processed += data_size
        if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
                )
            )

    def update(self, data: bytes) -> bytes:
        """Check the byte limit, then delegate to the wrapped ``update``."""
        self._check_limit(len(data))
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf) -> int:
        """Check the byte limit, then delegate to the wrapped
        ``update_into``."""
        self._check_limit(len(data))
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalize the wrapped context, keep its tag, and release it.

        :raises AlreadyFinalized: If the context was already finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize()
        self._tag = self._ctx.tag
        self._ctx = None
        return data

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalize the wrapped context with ``tag``, keep its tag, and
        release it.

        :raises AlreadyFinalized: If the context was already finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize_with_tag(tag)
        self._tag = self._ctx.tag
        self._ctx = None
        return data

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Feed additional data to authenticate into the wrapped context.

        :raises AlreadyFinalized: If the context was already finalized.
        :raises AlreadyUpdated: If ``update``/``update_into`` was already
            called on this context.
        :raises ValueError: If the running additional-data total exceeds
            the mode's ``_MAX_AAD_BYTES``.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)
        if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
                )
            )

        self._ctx.authenticate_additional_data(data)
@utils.register_interface(AEADEncryptionContext)
class _AEADEncryptionContext(_AEADCipherContext):
    """Encryption-side AEAD context that additionally exposes the tag."""

    @property
    def tag(self) -> bytes:
        """
        Return the tag captured when the context was finalized.

        :raises NotYetFinalized: If the wrapped context is still held,
            i.e. the context has not been finalized yet.
        """
        if self._ctx is not None:
            raise NotYetFinalized(
                "You must finalize encryption before getting the tag."
            )
        # Narrowing check only: the finalize methods set ``_tag`` in the
        # same step that clears ``_ctx``.
        assert self._tag is not None
        return self._tag