1## This file is part of Scapy
2## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
3##               2015, 2016, 2017 Maxence Tury
4## This program is published under a GPLv2 license
5
6"""
7Authenticated Encryption with Associated Data ciphers.
8
9RFC 5288 introduces new ciphersuites for TLS 1.2 which are based on AES in
10Galois/Counter Mode (GCM). RFC 6655 in turn introduces AES_CCM ciphersuites.
11The related AEAD algorithms are defined in RFC 5116. Later on, RFC 7905
12introduced cipher suites based on a ChaCha20-Poly1305 construction.
13"""
14
15from __future__ import absolute_import
16import struct
17
18from scapy.config import conf
19from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip
20from scapy.layers.tls.crypto.ciphers import CipherError
21from scapy.utils import strxor
22import scapy.modules.six as six
23
24if conf.crypto_valid:
25    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
26    from cryptography.hazmat.backends import default_backend
27    from cryptography.exceptions import InvalidTag
28if conf.crypto_valid_advanced:
29    from cryptography.hazmat.primitives.ciphers.aead import (AESCCM,
30                                                             ChaCha20Poly1305)
31
32
# Registry of the concrete AEAD cipher classes. It is filled in by
# _AEADCipherMetaclass and keyed by cipher name, i.e. the class name without
# its leading "Cipher_".
_tls_aead_cipher_algs = {}


class _AEADCipherMetaclass(type):
    """
    Cipher classes are automatically registered through this metaclass.
    Furthermore, their name attribute is extracted from their class name.

    Classes whose name starts with "_AEADCipher" are treated as abstract
    bases: they get no 'name' attribute and are not registered.
    """
    def __new__(cls, ciph_name, bases, dct):
        is_concrete = not ciph_name.startswith("_AEADCipher")
        if is_concrete:
            prefix = "Cipher_"
            # Only strip the prefix when it is really there. Slicing a fixed
            # number of characters would leave a class with another naming
            # scheme with a truncated (possibly empty) name, and make such
            # classes overwrite each other in the registry.
            if ciph_name.startswith(prefix):
                short_name = ciph_name[len(prefix):]
            else:
                short_name = ciph_name
            dct["name"] = short_name
        the_class = super(_AEADCipherMetaclass, cls).__new__(cls, ciph_name,
                                                             bases, dct)
        if is_concrete:
            _tls_aead_cipher_algs[short_name] = the_class
        return the_class
48
49
class AEADTagError(Exception):
    """
    Signals that the authentication tag (MAC) of an AEAD-protected message
    could not be verified.
    """
55
class _AEADCipher(six.with_metaclass(_AEADCipherMetaclass, object)):
    """
    Base class for the AEAD ciphers which use an explicit per-record nonce.

    Subclasses provide 'key_len' and 'tag_len', plus either the pair
    'pc_cls'/'pc_cls_mode' (cipher built through the generic Cipher API) or
    'cipher_cls' (one-shot AEAD class, such as AESCCM).

    The hasattr(self, "pc_cls") tests correspond to the legacy API of the
    crypto library. With cryptography v2.0, both CCM and GCM should follow
    the else case.

    Note that the "fixed_iv" in TLS RFCs is called "salt" in the AEAD RFC 5116.
    """
    type = "aead"
    fixed_iv_len = 4
    nonce_explicit_len = 8

    def __init__(self, key=None, fixed_iv=None, nonce_explicit=None):
        """
        'key' and 'fixed_iv' are to be provided as byte strings, whereas the
        internal 'nonce_explicit' is an integer (it is simpler for
        incrementation). A byte string 'nonce_explicit' is converted.

        Any argument left to None is replaced with a zero placeholder and
        flagged as missing in self.ready; auth_encrypt() and auth_decrypt()
        raise a CipherError as long as something is missing.

        Warning: the whole 'nonce' may be called IV in certain RFCs.
        """
        self.ready = {"key": True, "fixed_iv": True, "nonce_explicit": True}
        if key is None:
            self.ready["key"] = False
            key = b"\0" * self.key_len
        if fixed_iv is None:
            self.ready["fixed_iv"] = False
            fixed_iv = b"\0" * self.fixed_iv_len
        if nonce_explicit is None:
            self.ready["nonce_explicit"] = False
            nonce_explicit = 0

        # 'bytes' is 'str' on Python 2 and the byte string type on Python 3:
        # testing against both catches a raw nonce on either version.
        if isinstance(nonce_explicit, (bytes, str)):
            nonce_explicit = pkcs_os2ip(nonce_explicit)

        # We bypass our own __setattr__ here: it reads self._cipher, which
        # does not exist yet, and it would flag the placeholders as ready.
        super(_AEADCipher, self).__setattr__("key", key)
        super(_AEADCipher, self).__setattr__("fixed_iv", fixed_iv)
        super(_AEADCipher, self).__setattr__("nonce_explicit", nonce_explicit)

        if hasattr(self, "pc_cls"):
            self._cipher = Cipher(self.pc_cls(key),
                                  self.pc_cls_mode(self._get_nonce()),
                                  backend=default_backend())
        elif conf.crypto_valid_advanced and self.cipher_cls is AESCCM:
            # AESCCM takes its tag length at construction time; its
            # encrypt()/decrypt() methods have no such argument.
            self._cipher = self.cipher_cls(key, tag_length=self.tag_len)
        else:
            self._cipher = self.cipher_cls(key)

    def __setattr__(self, name, val):
        """
        Keep the underlying cipher object and the self.ready flags in sync
        whenever 'key', 'fixed_iv' or 'nonce_explicit' gets assigned.
        """
        if name == "key":
            if self._cipher is not None:
                # The key is swapped inside the existing cipher object.
                if hasattr(self, "pc_cls"):
                    self._cipher.algorithm.key = val
                else:
                    self._cipher._key = val
            self.ready["key"] = True
        elif name == "fixed_iv":
            self.ready["fixed_iv"] = True
        elif name == "nonce_explicit":
            # Same byte string test as in __init__ (Python 2 and 3).
            if isinstance(val, (bytes, str)):
                val = pkcs_os2ip(val)
            self.ready["nonce_explicit"] = True
        super(_AEADCipher, self).__setattr__(name, val)

    def _get_nonce(self):
        """
        Return the full nonce: the fixed IV followed by the explicit nonce
        encoded over nonce_explicit_len bytes.
        """
        return (self.fixed_iv +
                pkcs_i2osp(self.nonce_explicit, self.nonce_explicit_len))

    def _update_nonce_explicit(self):
        """
        Increment the explicit nonce while avoiding any overflow.
        """
        ne = self.nonce_explicit + 1
        self.nonce_explicit = ne % 2**(self.nonce_explicit_len*8)

    def auth_encrypt(self, P, A, seq_num=None):
        """
        Encrypt the data then prepend the explicit part of the nonce. The
        authentication tag is directly appended with the most recent crypto
        API. Additional data may be authenticated without encryption (as A).

        Return explicit nonce + ciphertext + tag, and increment the explicit
        nonce afterwards. Raise a CipherError if the key, the fixed IV or the
        explicit nonce is still missing.

        The 'seq_num' should never be used here, it is only a safeguard needed
        because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py
        actually is a _AEADCipher_TLS13 (even though others are not).
        """
        if False in six.itervalues(self.ready):
            raise CipherError(P, A)

        if hasattr(self, "pc_cls"):
            # The mode object is reused: refresh its IV and drop any tag
            # left over from a previous decryption.
            self._cipher.mode._initialization_vector = self._get_nonce()
            self._cipher.mode._tag = None
            encryptor = self._cipher.encryptor()
            encryptor.authenticate_additional_data(A)
            res = encryptor.update(P) + encryptor.finalize()
            res += encryptor.tag
        else:
            # One-shot API: the returned value already ends with the tag.
            res = self._cipher.encrypt(self._get_nonce(), P, A)

        nonce_explicit = pkcs_i2osp(self.nonce_explicit,
                                    self.nonce_explicit_len)
        self._update_nonce_explicit()
        return nonce_explicit + res

    def auth_decrypt(self, A, C, seq_num=None, add_length=True):
        """
        Decrypt the data and authenticate the associated data (i.e. A).
        If the verification fails, an AEADTagError is raised. It is the user's
        responsibility to catch it if deemed useful. If we lack the key, we
        raise a CipherError which contains the encrypted input.

        C is expected as explicit nonce + ciphertext + tag. The return value
        is the tuple (explicit nonce as bytes, plaintext, tag).

        Note that we add the TLSCiphertext length to A although we're supposed
        to add the TLSCompressed length. Fortunately, they are the same,
        but the specifications actually messed up here. :'(

        The 'add_length' switch should always be True for TLS, but we provide
        it anyway (mostly for test cases, hum).

        The 'seq_num' should never be used here, it is only a safeguard needed
        because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py
        actually is a _AEADCipher_TLS13 (even though others are not).
        """
        nonce_explicit_str, C, mac = (C[:self.nonce_explicit_len],
                                      C[self.nonce_explicit_len:-self.tag_len],
                                      C[-self.tag_len:])

        if False in six.itervalues(self.ready):
            raise CipherError(nonce_explicit_str, C, mac)

        # The nonce received with the record replaces our own counter.
        self.nonce_explicit = pkcs_os2ip(nonce_explicit_str)
        if add_length:
            A += struct.pack("!H", len(C))

        if hasattr(self, "pc_cls"):
            self._cipher.mode._initialization_vector = self._get_nonce()
            self._cipher.mode._tag = mac
            decryptor = self._cipher.decryptor()
            decryptor.authenticate_additional_data(A)
            P = decryptor.update(C)
            try:
                decryptor.finalize()
            except InvalidTag:
                raise AEADTagError(nonce_explicit_str, P, mac)
        else:
            try:
                P = self._cipher.decrypt(self._get_nonce(), C + mac, A)
            except InvalidTag:
                # No plaintext is available when the one-shot call fails.
                raise AEADTagError(nonce_explicit_str,
                                   "<unauthenticated data>",
                                   mac)
        return nonce_explicit_str, P, mac

    def snapshot(self):
        """
        Return a new cipher of the same class with the same key, fixed IV,
        explicit nonce and readiness flags.
        """
        c = self.__class__(self.key, self.fixed_iv, self.nonce_explicit)
        c.ready = self.ready.copy()
        return c
216
217
if conf.crypto_valid:
    class Cipher_AES_128_GCM(_AEADCipher):
        # TODO: use the new AESGCM class (as 'cipher_cls') when
        # conf.crypto_valid_advanced is set. Until then, GCM is built
        # through the generic Cipher API ('pc_cls' and 'pc_cls_mode').
        key_len = 16
        tag_len = 16
        pc_cls = algorithms.AES
        pc_cls_mode = modes.GCM

    class Cipher_AES_256_GCM(Cipher_AES_128_GCM):
        # Same construction, only the key length differs.
        key_len = 32
231
232
if conf.crypto_valid_advanced:
    class Cipher_AES_128_CCM(_AEADCipher):
        # AES-CCM relies on the one-shot AESCCM class ('cipher_cls').
        key_len = 16
        tag_len = 16
        cipher_cls = AESCCM

    class Cipher_AES_256_CCM(Cipher_AES_128_CCM):
        # Only the key length differs from the 128-bit variant.
        key_len = 32

    class Cipher_AES_128_CCM_8(Cipher_AES_128_CCM):
        # The "_8" variants use an 8-byte authentication tag.
        tag_len = 8

    class Cipher_AES_256_CCM_8(Cipher_AES_128_CCM_8):
        # 8-byte tag (inherited), with a 32-byte key.
        key_len = 32
247
248
class _AEADCipher_TLS13(six.with_metaclass(_AEADCipherMetaclass, object)):
    """
    Base class for the AEAD ciphers whose nonce is computed from a sequence
    number and a fixed IV (see _get_nonce), without any explicit nonce.

    Subclasses provide 'key_len', 'tag_len' and 'fixed_iv_len', plus either
    the pair 'pc_cls'/'pc_cls_mode' or 'cipher_cls'.

    The hasattr(self, "pc_cls") enable support for the legacy implementation
    of GCM in the cryptography library. They should not be used, and might
    eventually be removed, with cryptography v2.0. XXX
    """
    type = "aead"

    def __init__(self, key=None, fixed_iv=None, nonce_explicit=None):
        """
        'key' and 'fixed_iv' are to be provided as byte strings. This IV never
        changes: it is either the client_write_IV or server_write_IV.

        Any of them left to None is replaced with a zero placeholder and
        flagged as missing in self.ready.

        Note that 'nonce_explicit' is never used. It is only a safeguard for a
        call in session.py to the TLS 1.2/ChaCha20Poly1305 case (see RFC 7905).
        """
        self.ready = {"key": True, "fixed_iv": True}
        if key is None:
            self.ready["key"] = False
            key = b"\0" * self.key_len
        if fixed_iv is None:
            self.ready["fixed_iv"] = False
            fixed_iv = b"\0" * self.fixed_iv_len

        # We bypass our own __setattr__ here: it reads self._cipher, which
        # does not exist yet, and it would flag the placeholders as ready.
        super(_AEADCipher_TLS13, self).__setattr__("key", key)
        super(_AEADCipher_TLS13, self).__setattr__("fixed_iv", fixed_iv)

        if hasattr(self, "pc_cls"):
            self._cipher = Cipher(self.pc_cls(key),
                                  self.pc_cls_mode(fixed_iv),
                                  backend=default_backend())
        elif conf.crypto_valid_advanced and self.cipher_cls is AESCCM:
            # AESCCM takes its tag length at construction time; its
            # encrypt()/decrypt() methods have no such argument.
            self._cipher = self.cipher_cls(key, tag_length=self.tag_len)
        else:
            self._cipher = self.cipher_cls(key)

    def __setattr__(self, name, val):
        """
        Keep the underlying cipher object and the self.ready flags in sync
        whenever 'key' or 'fixed_iv' gets assigned.
        """
        if name == "key":
            if self._cipher is not None:
                # The key is swapped inside the existing cipher object.
                if hasattr(self, "pc_cls"):
                    self._cipher.algorithm.key = val
                else:
                    self._cipher._key = val
            self.ready["key"] = True
        elif name == "fixed_iv":
            self.ready["fixed_iv"] = True
        super(_AEADCipher_TLS13, self).__setattr__(name, val)

    def _get_nonce(self, seq_num):
        """
        Return the nonce for 'seq_num' (a byte string): the sequence number
        is left-padded with zeros up to fixed_iv_len bytes, then XORed with
        the fixed IV.
        """
        padlen = self.fixed_iv_len - len(seq_num)
        padded_seq_num = b"\x00" * padlen + seq_num
        return strxor(padded_seq_num, self.fixed_iv)

    def auth_encrypt(self, P, A, seq_num):
        """
        Encrypt the data, and append the computed authentication code.
        TLS 1.3 does not use additional data, but we leave this option to the
        user nonetheless.

        Return ciphertext + tag. Raise a CipherError if the key or the fixed
        IV is still missing.

        Note that the cipher's authentication tag must be None when encrypting.
        """
        if False in six.itervalues(self.ready):
            raise CipherError(P, A)

        if hasattr(self, "pc_cls"):
            # The mode object is reused: drop any tag left over from a
            # previous decryption and refresh the IV.
            self._cipher.mode._tag = None
            self._cipher.mode._initialization_vector = self._get_nonce(seq_num)
            encryptor = self._cipher.encryptor()
            encryptor.authenticate_additional_data(A)
            res = encryptor.update(P) + encryptor.finalize()
            res += encryptor.tag
        else:
            # One-shot API: the returned value already ends with the tag.
            res = self._cipher.encrypt(self._get_nonce(seq_num), P, A)
        return res

    def auth_decrypt(self, A, C, seq_num):
        """
        Decrypt the data and verify the authentication code (in this order).
        Note that TLS 1.3 is not supposed to use any additional data A.
        If the verification fails, an AEADTagError is raised. It is the user's
        responsibility to catch it if deemed useful. If we lack the key, we
        raise a CipherError which contains the encrypted input.

        C is expected as ciphertext + tag. The return value is the tuple
        (plaintext, tag).
        """
        C, mac = C[:-self.tag_len], C[-self.tag_len:]
        if False in six.itervalues(self.ready):
            raise CipherError(C, mac)

        if hasattr(self, "pc_cls"):
            self._cipher.mode._initialization_vector = self._get_nonce(seq_num)
            self._cipher.mode._tag = mac
            decryptor = self._cipher.decryptor()
            decryptor.authenticate_additional_data(A)
            P = decryptor.update(C)
            try:
                decryptor.finalize()
            except InvalidTag:
                raise AEADTagError(P, mac)
        else:
            if (conf.crypto_valid_advanced and
                    isinstance(self, Cipher_CHACHA20_POLY1305)):
                # TLS 1.2 ChaCha20Poly1305 case: the length of the ciphertext
                # (tag excluded) is appended to the additional data here.
                A += struct.pack("!H", len(C))
            try:
                P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A)
            except InvalidTag:
                # No plaintext is available when the one-shot call fails.
                raise AEADTagError("<unauthenticated data>", mac)
        return P, mac

    def snapshot(self):
        """
        Return a new cipher of the same class with the same key, fixed IV
        and readiness flags.
        """
        c = self.__class__(self.key, self.fixed_iv)
        c.ready = self.ready.copy()
        return c
369
370
if conf.crypto_valid_advanced:
    class Cipher_CHACHA20_POLY1305_TLS13(_AEADCipher_TLS13):
        # ChaCha20-Poly1305 through the one-shot class of the crypto library.
        cipher_cls = ChaCha20Poly1305
        # Lengths are expressed in bytes; there is no explicit nonce.
        fixed_iv_len = 12
        nonce_explicit_len = 0
        key_len = 32
        tag_len = 16

    class Cipher_CHACHA20_POLY1305(Cipher_CHACHA20_POLY1305_TLS13):
        """
        TLS 1.2 flavour of ChaCha20-Poly1305. As per RFC 7905, it actually
        follows the TLS 1.3 logic, hence the inheritance. The differences
        are dealt with at the record layer (in record.py).
        """
385
386
if conf.crypto_valid:
    class Cipher_AES_128_GCM_TLS13(_AEADCipher_TLS13):
        # TODO: use the new AESGCM class (as 'cipher_cls') when
        # conf.crypto_valid_advanced is set. Until then, GCM is built
        # through the generic Cipher API ('pc_cls' and 'pc_cls_mode').
        key_len = 16
        tag_len = 16
        fixed_iv_len = 12
        pc_cls = algorithms.AES
        pc_cls_mode = modes.GCM

    class Cipher_AES_256_GCM_TLS13(Cipher_AES_128_GCM_TLS13):
        # Same construction, only the key length differs.
        key_len = 32
401
402
if conf.crypto_valid_advanced:
    class Cipher_AES_128_CCM_TLS13(_AEADCipher_TLS13):
        # AES-CCM relies on the one-shot AESCCM class ('cipher_cls').
        cipher_cls = AESCCM
        key_len = 16
        tag_len = 16
        # _AEADCipher_TLS13 reads fixed_iv_len both for its placeholder IV
        # and to pad the sequence number in _get_nonce(), and it defines no
        # default: without this attribute, these ciphers raise an
        # AttributeError. 12 bytes is the value used by the other TLS 1.3
        # ciphers of this file, and the nonce length of AES-CCM in RFC 5116.
        fixed_iv_len = 12

    class Cipher_AES_128_CCM_8_TLS13(Cipher_AES_128_CCM_TLS13):
        # Same cipher with an 8-byte authentication tag.
        tag_len = 8
411
412