1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import abc 8 9import six 10 11from cryptography import utils 12from cryptography.exceptions import ( 13 AlreadyFinalized, AlreadyUpdated, NotYetFinalized, UnsupportedAlgorithm, 14 _Reasons 15) 16from cryptography.hazmat.backends.interfaces import CipherBackend 17from cryptography.hazmat.primitives.ciphers import modes 18 19 20@six.add_metaclass(abc.ABCMeta) 21class CipherAlgorithm(object): 22 @abc.abstractproperty 23 def name(self): 24 """ 25 A string naming this mode (e.g. "AES", "Camellia"). 26 """ 27 28 @abc.abstractproperty 29 def key_size(self): 30 """ 31 The size of the key being used as an integer in bits (e.g. 128, 256). 32 """ 33 34 35@six.add_metaclass(abc.ABCMeta) 36class BlockCipherAlgorithm(object): 37 @abc.abstractproperty 38 def block_size(self): 39 """ 40 The size of a block as an integer in bits (e.g. 64, 128). 41 """ 42 43 44@six.add_metaclass(abc.ABCMeta) 45class CipherContext(object): 46 @abc.abstractmethod 47 def update(self, data): 48 """ 49 Processes the provided bytes through the cipher and returns the results 50 as bytes. 51 """ 52 53 @abc.abstractmethod 54 def update_into(self, data, buf): 55 """ 56 Processes the provided bytes and writes the resulting data into the 57 provided buffer. Returns the number of bytes written. 58 """ 59 60 @abc.abstractmethod 61 def finalize(self): 62 """ 63 Returns the results of processing the final block as bytes. 64 """ 65 66 67@six.add_metaclass(abc.ABCMeta) 68class AEADCipherContext(object): 69 @abc.abstractmethod 70 def authenticate_additional_data(self, data): 71 """ 72 Authenticates the provided bytes. 73 """ 74 75 76@six.add_metaclass(abc.ABCMeta) 77class AEADDecryptionContext(object): 78 @abc.abstractmethod 79 def finalize_with_tag(self, tag): 80 """ 81 Returns the results of processing the final block as bytes and allows 82 delayed passing of the authentication tag. 83 """ 84 85 86@six.add_metaclass(abc.ABCMeta) 87class AEADEncryptionContext(object): 88 @abc.abstractproperty 89 def tag(self): 90 """ 91 Returns tag bytes. This is only available after encryption is 92 finalized. 93 """ 94 95 96class Cipher(object): 97 def __init__(self, algorithm, mode, backend): 98 if not isinstance(backend, CipherBackend): 99 raise UnsupportedAlgorithm( 100 "Backend object does not implement CipherBackend.", 101 _Reasons.BACKEND_MISSING_INTERFACE 102 ) 103 104 if not isinstance(algorithm, CipherAlgorithm): 105 raise TypeError("Expected interface of CipherAlgorithm.") 106 107 if mode is not None: 108 mode.validate_for_algorithm(algorithm) 109 110 self.algorithm = algorithm 111 self.mode = mode 112 self._backend = backend 113 114 def encryptor(self): 115 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 116 if self.mode.tag is not None: 117 raise ValueError( 118 "Authentication tag must be None when encrypting." 119 ) 120 ctx = self._backend.create_symmetric_encryption_ctx( 121 self.algorithm, self.mode 122 ) 123 return self._wrap_ctx(ctx, encrypt=True) 124 125 def decryptor(self): 126 ctx = self._backend.create_symmetric_decryption_ctx( 127 self.algorithm, self.mode 128 ) 129 return self._wrap_ctx(ctx, encrypt=False) 130 131 def _wrap_ctx(self, ctx, encrypt): 132 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 133 if encrypt: 134 return _AEADEncryptionContext(ctx) 135 else: 136 return _AEADCipherContext(ctx) 137 else: 138 return _CipherContext(ctx) 139 140 141@utils.register_interface(CipherContext) 142class _CipherContext(object): 143 def __init__(self, ctx): 144 self._ctx = ctx 145 146 def update(self, data): 147 if self._ctx is None: 148 raise AlreadyFinalized("Context was already finalized.") 149 return self._ctx.update(data) 150 151 def update_into(self, data, buf): 152 if self._ctx is None: 153 raise AlreadyFinalized("Context was already finalized.") 154 return self._ctx.update_into(data, buf) 155 156 def finalize(self): 157 if self._ctx is None: 158 raise AlreadyFinalized("Context was already finalized.") 159 data = self._ctx.finalize() 160 self._ctx = None 161 return data 162 163 164@utils.register_interface(AEADCipherContext) 165@utils.register_interface(CipherContext) 166@utils.register_interface(AEADDecryptionContext) 167class _AEADCipherContext(object): 168 def __init__(self, ctx): 169 self._ctx = ctx 170 self._bytes_processed = 0 171 self._aad_bytes_processed = 0 172 self._tag = None 173 self._updated = False 174 175 def _check_limit(self, data_size): 176 if self._ctx is None: 177 raise AlreadyFinalized("Context was already finalized.") 178 self._updated = True 179 self._bytes_processed += data_size 180 if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES: 181 raise ValueError( 182 "{0} has a maximum encrypted byte limit of {1}".format( 183 self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES 184 ) 185 ) 186 187 def update(self, data): 188 self._check_limit(len(data)) 189 return self._ctx.update(data) 190 191 def update_into(self, data, buf): 192 self._check_limit(len(data)) 193 return self._ctx.update_into(data, buf) 194 195 def finalize(self): 196 if self._ctx is None: 197 raise AlreadyFinalized("Context was already finalized.") 198 data = self._ctx.finalize() 199 self._tag = self._ctx.tag 200 self._ctx = None 201 return data 202 203 def finalize_with_tag(self, tag): 204 if self._ctx is None: 205 raise AlreadyFinalized("Context was already finalized.") 206 data = self._ctx.finalize_with_tag(tag) 207 self._tag = self._ctx.tag 208 self._ctx = None 209 return data 210 211 def authenticate_additional_data(self, data): 212 if self._ctx is None: 213 raise AlreadyFinalized("Context was already finalized.") 214 if self._updated: 215 raise AlreadyUpdated("Update has been called on this context.") 216 217 self._aad_bytes_processed += len(data) 218 if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES: 219 raise ValueError( 220 "{0} has a maximum AAD byte limit of {1}".format( 221 self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES 222 ) 223 ) 224 225 self._ctx.authenticate_additional_data(data) 226 227 228@utils.register_interface(AEADEncryptionContext) 229class _AEADEncryptionContext(_AEADCipherContext): 230 @property 231 def tag(self): 232 if self._ctx is not None: 233 raise NotYetFinalized("You must finalize encryption before " 234 "getting the tag.") 235 return self._tag 236