1import socket
2import nntplib
3import time
4import unittest
5
6try:
7    import threading
8except ImportError:
9    threading = None
10
11
12from unittest import TestCase
13from test import test_support
14
15HOST = test_support.HOST
16
17
def server(evt, serv, evil=False):
    """Serve one canned NNTP response line on *serv*, then signal *evt*.

    Intended to run in a helper thread.  Accepts a single connection on
    the socket *serv* and answers it with exactly one line: a short
    response by default, or one very long line when *evil* is true.
    If ``accept()`` raises ``socket.timeout``, nothing is sent.

    Whatever happens, *serv* is closed and *evt* is set before this
    function returns, so a caller waiting on the event is always released.
    """
    serv.listen(5)
    try:
        conn, _addr = serv.accept()
    except socket.timeout:
        # Nobody connected in time; fall through to the cleanup below.
        pass
    else:
        try:
            if evil:
                reply = b"1 I'm too long response" * 3000 + b"\n"
            else:
                reply = b"1 I'm OK response\n"
            # sendall() keeps writing until the whole reply has been handed
            # to the OS; a bare send() may transmit only a prefix of it.
            conn.sendall(reply)
        finally:
            # Release the connection even when sending fails.
            conn.close()
    finally:
        serv.close()
        evt.set()
33
34
class BaseServerTest(TestCase):
    """Shared fixture: a one-shot fake server running in a helper thread.

    Subclasses must define the class attribute ``evil``; it is forwarded
    to ``server()`` and selects which canned response gets sent.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        # Start listening before the thread exists so that a client which
        # connects immediately is queued instead of refused; this removes
        # the need to sleep and hope the thread has reached listen().
        self.sock.listen(5)
        self.thread = threading.Thread(
            target=server,
            args=(self.evt, self.sock, self.evil))
        self.thread.start()

    def tearDown(self):
        # Wait for the server to finish, then reap its thread so it does
        # not outlive the test that started it.
        self.evt.wait()
        self.thread.join()
48
49
@unittest.skipUnless(threading, 'threading required')
class ServerTests(BaseServerTest):
    """Client behaviour against a server that sends a short response line."""

    evil = False

    def test_basic_connect(self):
        # Use the module-level HOST (taken from test_support) rather than a
        # hard-coded host name.
        nntp = nntplib.NNTP(HOST, self.port)
        # The server never answers QUIT, so tear the client down by hand;
        # close the file wrapper as well as the socket it was made from.
        try:
            nntp.file.close()
        finally:
            nntp.sock.close()
57
58
@unittest.skipUnless(threading, 'threading required')
class EvilServerTests(BaseServerTest):
    """Client behaviour against a server that sends one oversized line."""

    evil = True

    def test_too_long_line(self):
        # Constructing the client is the call expected to raise on the
        # oversized response.  HOST is the module-level constant taken
        # from test_support, used instead of a hard-coded host name.
        with self.assertRaises(nntplib.NNTPDataError):
            nntplib.NNTP(HOST, self.port)
66
67
def test_main(verbose=None):
    """Run the evil-server tests, then the well-behaved-server tests.

    Each test class is handed to ``test_support.run_unittest`` in its own
    call, in that order.  *verbose* is accepted but not used.
    """
    for case in (EvilServerTests, ServerTests):
        test_support.run_unittest(case)
71
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
74