1#!/usr/bin/env python
2
3## This file is part of Scapy
4## This program is published under a GPLv2 license
5
6"""
7TLS client used in unit tests.
8
9Start our TLS client, send our send_data, and terminate session with an Alert.
10Optional cipher_cuite_code and version may be provided as hexadecimal strings
11(e.g. c09e for TLS_DHE_RSA_WITH_AES_128_CCM and 0303 for TLS 1.2).
12Reception of the exact send_data on the server is to be checked externally.
13"""
14
15import sys, os, time
16import multiprocessing
17
18basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../"))
19sys.path=[basedir]+sys.path
20
21from scapy.layers.tls.automaton_cli import TLSClientAutomaton
22from scapy.layers.tls.handshake import TLSClientHello
23
24
25send_data = cipher_suite_code = version = None
26
27def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
28    if version == "0002":
29        t = TLSClientAutomaton(data=[send_data, b"stop_server", b"quit"], version="sslv2")
30    else:
31        ch = TLSClientHello(version=int(version, 16), ciphers=int(cipher_suite_code, 16))
32        t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], debug=1)
33    t.run()
34
35from travis_test_server import run_tls_test_server
36
37def test_tls_client(suite, version, q):
38    msg = ("TestC_%s_data" % suite).encode()
39    # Run server
40    q_ = multiprocessing.Manager().Queue()
41    th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
42    th_.start()
43    # Synchronise threads
44    assert q_.get() is True
45    time.sleep(1)
46    # Run client
47    run_tls_test_client(msg, suite, version)
48    # Wait for server
49    th_.join(60)
50    if th_.is_alive():
51        th_.terminate()
52        raise RuntimeError("Test timed out")
53    # Return values
54    q.put(q_.get())
55    q.put(th_.exitcode)
56