1## This file is part of Scapy
2## Copyright (C) 2017 Maxence Tury
3## This program is published under a GPLv2 license
4
5"""
6SSLv2 handshake fields & logic.
7"""
8
9import math
10
11from scapy.error import log_runtime, warning
12from scapy.fields import *
13from scapy.packet import Packet, Raw, Padding
14from scapy.layers.tls.cert import Cert, PrivKey, PubKey
15from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
16from scapy.layers.tls.handshake import _CipherSuitesField
17from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
18from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
19                                      readConnState, writeConnState)
20from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
21                                            _tls_cipher_suites_cls,
22                                            _GenericCipherSuite,
23                                            _GenericCipherSuiteMetaclass,
24                                            get_usable_ciphersuites,
25                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)
26
27
28###############################################################################
29### Generic SSLv2 Handshake message                                         ###
30###############################################################################
31
32_sslv2_handshake_type = { 0: "error",                1: "client_hello",
33                          2: "client_master_key",    3: "client_finished",
34                          4: "server_hello",         5: "server_verify",
35                          6: "server_finished",      7: "request_certificate",
36                          8: "client_certificate" }
37
38
39class _SSLv2Handshake(_GenericTLSSessionInheritance):
40    """
41    Inherited by other Handshake classes to get post_build().
42    Also used as a fallback for unknown TLS Handshake packets.
43    """
44    name = "SSLv2 Handshake Generic message"
45    fields_desc = [ ByteEnumField("msgtype", None, _sslv2_handshake_type) ]
46
47    def guess_payload_class(self, p):
48        return Padding
49
50    def tls_session_update(self, msg_str):
51        """
52        Covers both post_build- and post_dissection- context updates.
53        """
54        self.tls_session.handshake_messages.append(msg_str)
55        self.tls_session.handshake_messages_parsed.append(self)
56
57
58###############################################################################
59### Error                                                                   ###
60###############################################################################
61
62_tls_error_code = { 1: "no_cipher",         2: "no_certificate",
63                    4: "bad_certificate",   6: "unsupported_certificate_type" }
64
65class SSLv2Error(_SSLv2Handshake):
66    """
67    SSLv2 Error.
68    """
69    name = "SSLv2 Handshake - Error"
70    fields_desc = [ ByteEnumField("msgtype", 0, _sslv2_handshake_type),
71                    ShortEnumField("code", None, _tls_error_code) ]
72
73
74###############################################################################
75### ClientHello                                                             ###
76###############################################################################
77
78class _SSLv2CipherSuitesField(_CipherSuitesField):
79    def __init__(self, name, default, dico, length_from=None):
80        _CipherSuitesField.__init__(self, name, default, dico,
81                                    length_from=length_from)
82        self.itemfmt = b""
83        self.itemsize = 3
84
85    def i2m(self, pkt, val):
86        if val is None:
87            val2 = []
88        val2 = [(x >> 16, x & 0x00ffff) for x in val]
89        return b"".join([struct.pack(">BH", x[0], x[1]) for x in val2])
90
91    def m2i(self, pkt, m):
92        res = []
93        while m:
94            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
95            m = m[3:]
96        return res
97
98
99class SSLv2ClientHello(_SSLv2Handshake):
100    """
101    SSLv2 ClientHello.
102    """
103    name = "SSLv2 Handshake - Client Hello"
104    fields_desc = [ ByteEnumField("msgtype", 1, _sslv2_handshake_type),
105                    _TLSVersionField("version", 0x0002, _tls_version),
106
107                    FieldLenField("cipherslen", None, fmt="!H",
108                                  length_of="ciphers"),
109                    FieldLenField("sidlen", None, fmt="!H",
110                                  length_of="sid"),
111                    FieldLenField("challengelen", None, fmt="!H",
112                                  length_of="challenge"),
113
114                    XStrLenField("sid", b"",
115                                 length_from=lambda pkt:pkt.sidlen),
116                    _SSLv2CipherSuitesField("ciphers",
117                                      [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
118                                      _tls_cipher_suites,
119                                      length_from=lambda pkt: pkt.cipherslen),
120                    XStrLenField("challenge", b"",
121                                 length_from=lambda pkt:pkt.challengelen) ]
122
123    def tls_session_update(self, msg_str):
124        super(SSLv2ClientHello, self).tls_session_update(msg_str)
125        self.tls_session.advertised_tls_version = self.version
126        self.tls_session.sslv2_common_cs = self.ciphers
127        self.tls_session.sslv2_challenge = self.challenge
128
129
130###############################################################################
131### ServerHello                                                             ###
132###############################################################################
133
134class _SSLv2CertDataField(StrLenField):
135    def getfield(self, pkt, s):
136        l = 0
137        if self.length_from is not None:
138            l = self.length_from(pkt)
139        try:
140            certdata = Cert(s[:l])
141        except:
142            certdata = s[:l]
143        return s[l:], certdata
144
145    def i2len(self, pkt, i):
146        if isinstance(i, Cert):
147            return len(i.der)
148        return len(i)
149
150    def i2m(self, pkt, i):
151        if isinstance(i, Cert):
152            return i.der
153        return i
154
155
156class SSLv2ServerHello(_SSLv2Handshake):
157    """
158    SSLv2 ServerHello.
159    """
160    name = "SSLv2 Handshake - Server Hello"
161    fields_desc = [ ByteEnumField("msgtype", 4, _sslv2_handshake_type),
162
163                    ByteField("sid_hit", 0),
164                    ByteEnumField("certtype", 1, {1: "x509_cert"}),
165                    _TLSVersionField("version", 0x0002, _tls_version),
166
167                    FieldLenField("certlen", None, fmt="!H",
168                                  length_of="cert"),
169                    FieldLenField("cipherslen", None, fmt="!H",
170                                  length_of="ciphers"),
171                    FieldLenField("connection_idlen", None, fmt="!H",
172                                  length_of="connection_id"),
173
174                    _SSLv2CertDataField("cert", b"",
175                                        length_from=lambda pkt: pkt.certlen),
176                    _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
177                                length_from=lambda pkt: pkt.cipherslen),
178                    XStrLenField("connection_id", b"",
179                                length_from=lambda pkt: pkt.connection_idlen) ]
180
181    def tls_session_update(self, msg_str):
182        """
183        XXX Something should be done about the session ID here.
184        """
185        super(SSLv2ServerHello, self).tls_session_update(msg_str)
186
187        s = self.tls_session
188        client_cs = s.sslv2_common_cs
189        css = [cs for cs in client_cs if cs in self.ciphers]
190        s.sslv2_common_cs = css
191        s.sslv2_connection_id = self.connection_id
192        s.tls_version = self.version
193        if self.cert is not None:
194            s.server_certs = [self.cert]
195
196
197###############################################################################
198### ClientMasterKey                                                         ###
199###############################################################################
200
201class _SSLv2CipherSuiteField(EnumField):
202    def __init__(self, name, default, dico):
203        EnumField.__init__(self, name, default, dico)
204
205    def i2m(self, pkt, val):
206        if val is None:
207            return b""
208        val2 = (val >> 16, val & 0x00ffff)
209        return struct.pack(">BH", val2[0], val2[1])
210
211    def addfield(self, pkt, s, val):
212        return s + self.i2m(pkt, val)
213
214    def m2i(self, pkt, m):
215        return struct.unpack("!I", b"\x00" + m[:3])[0]
216
217    def getfield(self, pkt, s):
218        return s[3:], self.m2i(pkt, s)
219
220class _SSLv2EncryptedKeyField(XStrLenField):
221    def i2repr(self, pkt, x):
222        s = super(_SSLv2EncryptedKeyField, self).i2repr(pkt, x)
223        if pkt.decryptedkey is not None:
224            dx = pkt.decryptedkey
225            ds = super(_SSLv2EncryptedKeyField, self).i2repr(pkt, dx)
226            s += "    [decryptedkey= %s]" % ds
227        return s
228
229class SSLv2ClientMasterKey(_SSLv2Handshake):
230    """
231    SSLv2 ClientMasterKey.
232    """
233    __slots__ = ["decryptedkey"]
234    name = "SSLv2 Handshake - Client Master Key"
235    fields_desc = [ ByteEnumField("msgtype", 2, _sslv2_handshake_type),
236                    _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),
237
238                    FieldLenField("clearkeylen", None, fmt="!H",
239                                  length_of="clearkey"),
240                    FieldLenField("encryptedkeylen", None, fmt="!H",
241                                  length_of="encryptedkey"),
242                    FieldLenField("keyarglen", None, fmt="!H",
243                                  length_of="keyarg"),
244
245                    XStrLenField("clearkey", "",
246                                length_from=lambda pkt: pkt.clearkeylen),
247                    _SSLv2EncryptedKeyField("encryptedkey", "",
248                                length_from=lambda pkt: pkt.encryptedkeylen),
249                    XStrLenField("keyarg", "",
250                                length_from=lambda pkt: pkt.keyarglen) ]
251
252    def __init__(self, *args, **kargs):
253        """
254        When post_building, the packets fields are updated (this is somewhat
255        non-standard). We might need these fields later, but calling __str__
256        on a new packet (i.e. not dissected from a raw string) applies
257        post_build to an object different from the original one... unless
258        we hackishly always set self.explicit to 1.
259        """
260        if "decryptedkey" in kargs:
261            self.decryptedkey = kargs["decryptedkey"]
262            del kargs["decryptedkey"]
263        else:
264            self.decryptedkey = b""
265        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
266        self.explicit = 1
267
268    def pre_dissect(self, s):
269        clearkeylen = struct.unpack("!H", s[4:6])[0]
270        encryptedkeylen = struct.unpack("!H", s[6:8])[0]
271        encryptedkeystart = 10 + clearkeylen
272        encryptedkey = s[encryptedkeystart:encryptedkeystart+encryptedkeylen]
273        if self.tls_session.server_rsa_key:
274            self.decryptedkey = \
275                    self.tls_session.server_rsa_key.decrypt(encryptedkey)
276        else:
277            self.decryptedkey = None
278        return s
279
280    def post_build(self, pkt, pay):
281        cs_val = None
282        if self.cipher is None:
283            common_cs = self.tls_session.sslv2_common_cs
284            cs_vals = get_usable_ciphersuites(common_cs, "SSLv2")
285            if len(cs_vals) == 0:
286                warning("No known common cipher suite between SSLv2 Hellos.")
287                cs_val = 0x0700c0
288                cipher = b"\x07\x00\xc0"
289            else:
290                cs_val = cs_vals[0]         #XXX choose the best one
291                cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
292            cs_cls = _tls_cipher_suites_cls[cs_val]
293            self.cipher = cs_val
294        else:
295            cipher = pkt[1:4]
296            cs_val = struct.unpack("!I", b"\x00" + cipher)[0]
297            if cs_val not in _tls_cipher_suites_cls:
298                warning("Unknown ciphersuite %d from ClientMasterKey" % cs_val)
299                cs_cls = None
300            else:
301                cs_cls = _tls_cipher_suites_cls[cs_val]
302
303        if cs_cls:
304            if (self.encryptedkey == b"" and
305                len(self.tls_session.server_certs) > 0):
306                # else, the user is responsible for export slicing & encryption
307                key = randstring(cs_cls.cipher_alg.key_len)
308
309                if self.clearkey == b"" and cs_cls.kx_alg.export:
310                    self.clearkey = key[:-5]
311
312                if self.decryptedkey == b"":
313                    if cs_cls.kx_alg.export:
314                        self.decryptedkey = key[-5:]
315                    else:
316                        self.decryptedkey = key
317
318                pubkey = self.tls_session.server_certs[0].pubKey
319                self.encryptedkey = pubkey.encrypt(self.decryptedkey)
320
321            if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
322                self.keyarg = randstring(cs_cls.cipher_alg.block_size)
323
324        clearkey = self.clearkey or b""
325        if self.clearkeylen is None:
326            self.clearkeylen = len(clearkey)
327        clearkeylen = struct.pack("!H", self.clearkeylen)
328
329        encryptedkey = self.encryptedkey or b""
330        if self.encryptedkeylen is None:
331            self.encryptedkeylen = len(encryptedkey)
332        encryptedkeylen = struct.pack("!H", self.encryptedkeylen)
333
334        keyarg = self.keyarg or b""
335        if self.keyarglen is None:
336            self.keyarglen = len(keyarg)
337        keyarglen = struct.pack("!H", self.keyarglen)
338
339        s = (chb(pkt[0]) + cipher
340             + clearkeylen + encryptedkeylen + keyarglen
341             + clearkey + encryptedkey + keyarg)
342        return s + pay
343
344    def tls_session_update(self, msg_str):
345        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)
346
347        s = self.tls_session
348        cs_val = self.cipher
349        if cs_val not in _tls_cipher_suites_cls:
350            warning("Unknown cipher suite %d from ClientMasterKey" % cs_val)
351            cs_cls = None
352        else:
353            cs_cls = _tls_cipher_suites_cls[cs_val]
354
355        tls_version = s.tls_version or 0x0002
356        connection_end = s.connection_end
357        wcs_seq_num = s.wcs.seq_num
358        s.pwcs = writeConnState(ciphersuite=cs_cls,
359                                            connection_end=connection_end,
360                                            seq_num=wcs_seq_num,
361                                            tls_version=tls_version)
362        rcs_seq_num = s.rcs.seq_num
363        s.prcs = readConnState(ciphersuite=cs_cls,
364                                           connection_end=connection_end,
365                                           seq_num=rcs_seq_num,
366                                           tls_version=tls_version)
367
368        if self.decryptedkey is not None:
369            s.master_secret = self.clearkey + self.decryptedkey
370            s.compute_sslv2_km_and_derive_keys()
371
372            if s.pwcs.cipher.type == "block":
373                s.pwcs.cipher.iv = self.keyarg
374            if s.prcs.cipher.type == "block":
375                s.prcs.cipher.iv = self.keyarg
376
377            s.triggered_prcs_commit = True
378            s.triggered_pwcs_commit = True
379
380
381###############################################################################
382### ServerVerify                                                            ###
383###############################################################################
384
385class SSLv2ServerVerify(_SSLv2Handshake):
386    """
387    In order to parse a ServerVerify, the exact message string should be
388    fed to the class. This is how SSLv2 defines the challenge length...
389    """
390    name = "SSLv2 Handshake - Server Verify"
391    fields_desc = [ ByteEnumField("msgtype", 5, _sslv2_handshake_type),
392                    XStrField("challenge", "") ]
393
394    def build(self, *args, **kargs):
395        fval = self.getfieldval("challenge")
396        if fval is None:
397            self.challenge = self.tls_session.sslv2_challenge
398        return super(SSLv2ServerVerify, self).build(*args, **kargs)
399
400    def post_dissection(self, pkt):
401        s = self.tls_session
402        if s.sslv2_challenge is not None:
403            if self.challenge != s.sslv2_challenge:
404                pkt_info = pkt.firstlayer().summary()
405                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)
406
407
408###############################################################################
409### RequestCertificate                                                      ###
410###############################################################################
411
412class SSLv2RequestCertificate(_SSLv2Handshake):
413    """
414    In order to parse a RequestCertificate, the exact message string should be
415    fed to the class. This is how SSLv2 defines the challenge length...
416    """
417    name = "SSLv2 Handshake - Request Certificate"
418    fields_desc = [ ByteEnumField("msgtype", 7, _sslv2_handshake_type),
419                    ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
420                    XStrField("challenge", "") ]
421
422    def tls_session_update(self, msg_str):
423        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
424        self.tls_session.sslv2_challenge_clientcert = self.challenge
425
426
427###############################################################################
428### ClientCertificate                                                       ###
429###############################################################################
430
431class SSLv2ClientCertificate(_SSLv2Handshake):
432    """
433    SSLv2 ClientCertificate.
434    """
435    name = "SSLv2 Handshake - Client Certificate"
436    fields_desc = [ ByteEnumField("msgtype", 8, _sslv2_handshake_type),
437
438                    ByteEnumField("certtype", 1, {1: "x509_cert"}),
439                    FieldLenField("certlen", None, fmt="!H",
440                                  length_of="certdata"),
441                    FieldLenField("responselen", None, fmt="!H",
442                                  length_of="responsedata"),
443
444                    _SSLv2CertDataField("certdata", b"",
445                                      length_from=lambda pkt: pkt.certlen),
446                    _TLSSignatureField("responsedata", None,
447                                length_from=lambda pkt: pkt.responselen) ]
448
449    def build(self, *args, **kargs):
450        s = self.tls_session
451        sig = self.getfieldval("responsedata")
452        test = (sig is None and
453                s.sslv2_key_material is not None and
454                s.sslv2_challenge_clientcert is not None and
455                len(s.server_certs) > 0)
456        if test:
457            s = self.tls_session
458            m = (s.sslv2_key_material +
459                 s.sslv2_challenge_clientcert +
460                 s.server_certs[0].der)
461            self.responsedata = _TLSSignature(tls_session=s)
462            self.responsedata._update_sig(m, s.client_key)
463        else:
464            self.responsedata = b""
465        return super(SSLv2ClientCertificate, self).build(*args, **kargs)
466
467    def post_dissection_tls_session_update(self, msg_str):
468        self.tls_session_update(msg_str)
469
470        s = self.tls_session
471        test = (len(s.client_certs) > 0 and
472                s.sslv2_key_material is not None and
473                s.sslv2_challenge_clientcert is not None and
474                len(s.server_certs) > 0)
475        if test:
476            m = (s.sslv2_key_material +
477                 s.sslv2_challenge_clientcert +
478                 s.server_certs[0].der)
479            sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
480            if not sig_test:
481                pkt_info = self.firstlayer().summary()
482                log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)
483
484    def tls_session_update(self, msg_str):
485        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
486        if self.certdata:
487            self.tls_session.client_certs = [self.certdata]
488
489
490###############################################################################
491### Finished                                                                ###
492###############################################################################
493
494class SSLv2ClientFinished(_SSLv2Handshake):
495    """
496    In order to parse a ClientFinished, the exact message string should be fed
497    to the class. SSLv2 does not offer any other way to know the c_id length.
498    """
499    name = "SSLv2 Handshake - Client Finished"
500    fields_desc = [ ByteEnumField("msgtype", 3, _sslv2_handshake_type),
501                    XStrField("connection_id", "") ]
502
503    def build(self, *args, **kargs):
504        fval = self.getfieldval("connection_id")
505        if fval == b"":
506            self.connection_id = self.tls_session.sslv2_connection_id
507        return super(SSLv2ClientFinished, self).build(*args, **kargs)
508
509    def post_dissection(self, pkt):
510        s = self.tls_session
511        if s.sslv2_connection_id is not None:
512            if self.connection_id != s.sslv2_connection_id:
513                pkt_info = pkt.firstlayer().summary()
514                log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)
515
516
517class SSLv2ServerFinished(_SSLv2Handshake):
518    """
519    In order to parse a ServerFinished, the exact message string should be fed
520    to the class. SSLv2 does not offer any other way to know the sid length.
521    """
522    name = "SSLv2 Handshake - Server Finished"
523    fields_desc = [ ByteEnumField("msgtype", 6, _sslv2_handshake_type),
524                    XStrField("sid", "") ]
525
526    def build(self, *args, **kargs):
527        fval = self.getfieldval("sid")
528        if fval == b"":
529            self.sid = self.tls_session.sid
530        return super(SSLv2ServerFinished, self).build(*args, **kargs)
531
532    def post_dissection_tls_session_update(self, msg_str):
533        self.tls_session_update(msg_str)
534        self.tls_session.sid = self.sid
535
536
537###############################################################################
538### All handshake messages defined in this module                           ###
539###############################################################################
540
541_sslv2_handshake_cls = { 0: SSLv2Error,             1: SSLv2ClientHello,
542                         2: SSLv2ClientMasterKey,   3: SSLv2ClientFinished,
543                         4: SSLv2ServerHello,       5: SSLv2ServerVerify,
544                         6: SSLv2ServerFinished,    7: SSLv2RequestCertificate,
545                         8: SSLv2ClientCertificate }
546
547