1import socket
2import telnetlib
3import time
4import Queue
5
6import unittest
7from unittest import TestCase
8from test import test_support
9threading = test_support.import_module('threading')
10
11HOST = test_support.HOST
12EOF_sigil = object()
13
14def server(evt, serv, dataq=None):
15    """ Open a tcp server in three steps
16        1) set evt to true to let the parent know we are ready
17        2) [optional] if is not False, write the list of data from dataq.get()
18           to the socket.
19    """
20    serv.listen(5)
21    evt.set()
22    try:
23        conn, addr = serv.accept()
24        if dataq:
25            data = ''
26            new_data = dataq.get(True, 0.5)
27            dataq.task_done()
28            for item in new_data:
29                if item == EOF_sigil:
30                    break
31                if type(item) in [int, float]:
32                    time.sleep(item)
33                else:
34                    data += item
35                written = conn.send(data)
36                data = data[written:]
37        conn.close()
38    except socket.timeout:
39        pass
40    finally:
41        serv.close()
42
43class GeneralTests(TestCase):
44
45    def setUp(self):
46        self.evt = threading.Event()
47        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48        self.sock.settimeout(60)  # Safety net. Look issue 11812
49        self.port = test_support.bind_port(self.sock)
50        self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
51        self.thread.setDaemon(True)
52        self.thread.start()
53        self.evt.wait()
54
55    def tearDown(self):
56        self.thread.join()
57
58    def testBasic(self):
59        # connects
60        telnet = telnetlib.Telnet(HOST, self.port)
61        telnet.sock.close()
62
63    def testTimeoutDefault(self):
64        self.assertTrue(socket.getdefaulttimeout() is None)
65        socket.setdefaulttimeout(30)
66        try:
67            telnet = telnetlib.Telnet(HOST, self.port)
68        finally:
69            socket.setdefaulttimeout(None)
70        self.assertEqual(telnet.sock.gettimeout(), 30)
71        telnet.sock.close()
72
73    def testTimeoutNone(self):
74        # None, having other default
75        self.assertTrue(socket.getdefaulttimeout() is None)
76        socket.setdefaulttimeout(30)
77        try:
78            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
79        finally:
80            socket.setdefaulttimeout(None)
81        self.assertTrue(telnet.sock.gettimeout() is None)
82        telnet.sock.close()
83
84    def testTimeoutValue(self):
85        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
86        self.assertEqual(telnet.sock.gettimeout(), 30)
87        telnet.sock.close()
88
89    def testTimeoutOpen(self):
90        telnet = telnetlib.Telnet()
91        telnet.open(HOST, self.port, timeout=30)
92        self.assertEqual(telnet.sock.gettimeout(), 30)
93        telnet.sock.close()
94
95    def testGetters(self):
96        # Test telnet getter methods
97        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
98        t_sock = telnet.sock
99        self.assertEqual(telnet.get_socket(), t_sock)
100        self.assertEqual(telnet.fileno(), t_sock.fileno())
101        telnet.sock.close()
102
103def _read_setUp(self):
104    self.evt = threading.Event()
105    self.dataq = Queue.Queue()
106    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107    self.sock.settimeout(10)
108    self.port = test_support.bind_port(self.sock)
109    self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
110    self.thread.start()
111    self.evt.wait()
112
113def _read_tearDown(self):
114    self.thread.join()
115
116class ReadTests(TestCase):
117    setUp = _read_setUp
118    tearDown = _read_tearDown
119
120    # use a similar approach to testing timeouts as test_timeout.py
121    # these will never pass 100% but make the fuzz big enough that it is rare
122    block_long = 0.6
123    block_short = 0.3
124    def test_read_until_A(self):
125        """
126        read_until(expected, [timeout])
127          Read until the expected string has been seen, or a timeout is
128          hit (default is no timeout); may block.
129        """
130        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
131        self.dataq.put(want)
132        telnet = telnetlib.Telnet(HOST, self.port)
133        self.dataq.join()
134        data = telnet.read_until('match')
135        self.assertEqual(data, ''.join(want[:-2]))
136
137    def test_read_until_B(self):
138        # test the timeout - it does NOT raise socket.timeout
139        want = ['hello', self.block_long, 'not seen', EOF_sigil]
140        self.dataq.put(want)
141        telnet = telnetlib.Telnet(HOST, self.port)
142        self.dataq.join()
143        data = telnet.read_until('not seen', self.block_short)
144        self.assertEqual(data, want[0])
145        self.assertEqual(telnet.read_all(), 'not seen')
146
147    def test_read_until_with_poll(self):
148        """Use select.poll() to implement telnet.read_until()."""
149        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
150        self.dataq.put(want)
151        telnet = telnetlib.Telnet(HOST, self.port)
152        if not telnet._has_poll:
153            raise unittest.SkipTest('select.poll() is required')
154        telnet._has_poll = True
155        self.dataq.join()
156        data = telnet.read_until('match')
157        self.assertEqual(data, ''.join(want[:-2]))
158
159    def test_read_until_with_select(self):
160        """Use select.select() to implement telnet.read_until()."""
161        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
162        self.dataq.put(want)
163        telnet = telnetlib.Telnet(HOST, self.port)
164        telnet._has_poll = False
165        self.dataq.join()
166        data = telnet.read_until('match')
167        self.assertEqual(data, ''.join(want[:-2]))
168
169    def test_read_all_A(self):
170        """
171        read_all()
172          Read all data until EOF; may block.
173        """
174        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
175        self.dataq.put(want)
176        telnet = telnetlib.Telnet(HOST, self.port)
177        self.dataq.join()
178        data = telnet.read_all()
179        self.assertEqual(data, ''.join(want[:-1]))
180
181    def _test_blocking(self, func):
182        self.dataq.put([self.block_long, EOF_sigil])
183        self.dataq.join()
184        start = time.time()
185        data = func()
186        self.assertTrue(self.block_short <= time.time() - start)
187
188    def test_read_all_B(self):
189        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
190
191    def test_read_all_C(self):
192        self.dataq.put([EOF_sigil])
193        telnet = telnetlib.Telnet(HOST, self.port)
194        self.dataq.join()
195        telnet.read_all()
196        telnet.read_all() # shouldn't raise
197
198    def test_read_some_A(self):
199        """
200        read_some()
201          Read at least one byte or EOF; may block.
202        """
203        # test 'at least one byte'
204        want = ['x' * 500, EOF_sigil]
205        self.dataq.put(want)
206        telnet = telnetlib.Telnet(HOST, self.port)
207        self.dataq.join()
208        data = telnet.read_all()
209        self.assertTrue(len(data) >= 1)
210
211    def test_read_some_B(self):
212        # test EOF
213        self.dataq.put([EOF_sigil])
214        telnet = telnetlib.Telnet(HOST, self.port)
215        self.dataq.join()
216        self.assertEqual('', telnet.read_some())
217
218    def test_read_some_C(self):
219        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
220
221    def _test_read_any_eager_A(self, func_name):
222        """
223        read_very_eager()
224          Read all data available already queued or on the socket,
225          without blocking.
226        """
227        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
228        expects = want[1] + want[2]
229        self.dataq.put(want)
230        telnet = telnetlib.Telnet(HOST, self.port)
231        self.dataq.join()
232        func = getattr(telnet, func_name)
233        data = ''
234        while True:
235            try:
236                data += func()
237                self.assertTrue(expects.startswith(data))
238            except EOFError:
239                break
240        self.assertEqual(expects, data)
241
242    def _test_read_any_eager_B(self, func_name):
243        # test EOF
244        self.dataq.put([EOF_sigil])
245        telnet = telnetlib.Telnet(HOST, self.port)
246        self.dataq.join()
247        time.sleep(self.block_short)
248        func = getattr(telnet, func_name)
249        self.assertRaises(EOFError, func)
250
251    # read_eager and read_very_eager make the same guarantees
252    # (they behave differently but we only test the guarantees)
253    def test_read_very_eager_A(self):
254        self._test_read_any_eager_A('read_very_eager')
255    def test_read_very_eager_B(self):
256        self._test_read_any_eager_B('read_very_eager')
257    def test_read_eager_A(self):
258        self._test_read_any_eager_A('read_eager')
259    def test_read_eager_B(self):
260        self._test_read_any_eager_B('read_eager')
261    # NB -- we need to test the IAC block which is mentioned in the docstring
262    # but not in the module docs
263
264    def _test_read_any_lazy_B(self, func_name):
265        self.dataq.put([EOF_sigil])
266        telnet = telnetlib.Telnet(HOST, self.port)
267        self.dataq.join()
268        func = getattr(telnet, func_name)
269        telnet.fill_rawq()
270        self.assertRaises(EOFError, func)
271
272    def test_read_lazy_A(self):
273        want = ['x' * 100, EOF_sigil]
274        self.dataq.put(want)
275        telnet = telnetlib.Telnet(HOST, self.port)
276        self.dataq.join()
277        time.sleep(self.block_short)
278        self.assertEqual('', telnet.read_lazy())
279        data = ''
280        while True:
281            try:
282                read_data = telnet.read_lazy()
283                data += read_data
284                if not read_data:
285                    telnet.fill_rawq()
286            except EOFError:
287                break
288            self.assertTrue(want[0].startswith(data))
289        self.assertEqual(data, want[0])
290
291    def test_read_lazy_B(self):
292        self._test_read_any_lazy_B('read_lazy')
293
294    def test_read_very_lazy_A(self):
295        want = ['x' * 100, EOF_sigil]
296        self.dataq.put(want)
297        telnet = telnetlib.Telnet(HOST, self.port)
298        self.dataq.join()
299        time.sleep(self.block_short)
300        self.assertEqual('', telnet.read_very_lazy())
301        data = ''
302        while True:
303            try:
304                read_data = telnet.read_very_lazy()
305            except EOFError:
306                break
307            data += read_data
308            if not read_data:
309                telnet.fill_rawq()
310                self.assertEqual('', telnet.cookedq)
311                telnet.process_rawq()
312            self.assertTrue(want[0].startswith(data))
313        self.assertEqual(data, want[0])
314
315    def test_read_very_lazy_B(self):
316        self._test_read_any_lazy_B('read_very_lazy')
317
318class nego_collector(object):
319    def __init__(self, sb_getter=None):
320        self.seen = ''
321        self.sb_getter = sb_getter
322        self.sb_seen = ''
323
324    def do_nego(self, sock, cmd, opt):
325        self.seen += cmd + opt
326        if cmd == tl.SE and self.sb_getter:
327            sb_data = self.sb_getter()
328            self.sb_seen += sb_data
329
330tl = telnetlib
331class OptionTests(TestCase):
332    setUp = _read_setUp
333    tearDown = _read_tearDown
334    # RFC 854 commands
335    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
336
337    def _test_command(self, data):
338        """ helper for testing IAC + cmd """
339        self.setUp()
340        self.dataq.put(data)
341        telnet = telnetlib.Telnet(HOST, self.port)
342        self.dataq.join()
343        nego = nego_collector()
344        telnet.set_option_negotiation_callback(nego.do_nego)
345        txt = telnet.read_all()
346        cmd = nego.seen
347        self.assertTrue(len(cmd) > 0) # we expect at least one command
348        self.assertIn(cmd[0], self.cmds)
349        self.assertEqual(cmd[1], tl.NOOPT)
350        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
351        nego.sb_getter = None # break the nego => telnet cycle
352        self.tearDown()
353
354    def test_IAC_commands(self):
355        # reset our setup
356        self.dataq.put([EOF_sigil])
357        telnet = telnetlib.Telnet(HOST, self.port)
358        self.dataq.join()
359        self.tearDown()
360
361        for cmd in self.cmds:
362            self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
363            self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
364            self._test_command([tl.IAC + cmd, EOF_sigil])
365        # all at once
366        self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
367        self.assertEqual('', telnet.read_sb_data())
368
369    def test_SB_commands(self):
370        # RFC 855, subnegotiations portion
371        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
372                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
373                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
374                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
375                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
376                EOF_sigil,
377               ]
378        self.dataq.put(send)
379        telnet = telnetlib.Telnet(HOST, self.port)
380        self.dataq.join()
381        nego = nego_collector(telnet.read_sb_data)
382        telnet.set_option_negotiation_callback(nego.do_nego)
383        txt = telnet.read_all()
384        self.assertEqual(txt, '')
385        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
386        self.assertEqual(nego.sb_seen, want_sb_data)
387        self.assertEqual('', telnet.read_sb_data())
388        nego.sb_getter = None # break the nego => telnet cycle
389
390
391class ExpectTests(TestCase):
392    def setUp(self):
393        self.evt = threading.Event()
394        self.dataq = Queue.Queue()
395        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
396        self.sock.settimeout(10)
397        self.port = test_support.bind_port(self.sock)
398        self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
399                                                            self.dataq))
400        self.thread.start()
401        self.evt.wait()
402
403    def tearDown(self):
404        self.thread.join()
405
406    # use a similar approach to testing timeouts as test_timeout.py
407    # these will never pass 100% but make the fuzz big enough that it is rare
408    block_long = 0.6
409    block_short = 0.3
410    def test_expect_A(self):
411        """
412        expect(expected, [timeout])
413          Read until the expected string has been seen, or a timeout is
414          hit (default is no timeout); may block.
415        """
416        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
417        self.dataq.put(want)
418        telnet = telnetlib.Telnet(HOST, self.port)
419        self.dataq.join()
420        (_,_,data) = telnet.expect(['match'])
421        self.assertEqual(data, ''.join(want[:-2]))
422
423    def test_expect_B(self):
424        # test the timeout - it does NOT raise socket.timeout
425        want = ['hello', self.block_long, 'not seen', EOF_sigil]
426        self.dataq.put(want)
427        telnet = telnetlib.Telnet(HOST, self.port)
428        self.dataq.join()
429        (_,_,data) = telnet.expect(['not seen'], self.block_short)
430        self.assertEqual(data, want[0])
431        self.assertEqual(telnet.read_all(), 'not seen')
432
433    def test_expect_with_poll(self):
434        """Use select.poll() to implement telnet.expect()."""
435        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
436        self.dataq.put(want)
437        telnet = telnetlib.Telnet(HOST, self.port)
438        if not telnet._has_poll:
439            raise unittest.SkipTest('select.poll() is required')
440        telnet._has_poll = True
441        self.dataq.join()
442        (_,_,data) = telnet.expect(['match'])
443        self.assertEqual(data, ''.join(want[:-2]))
444
445    def test_expect_with_select(self):
446        """Use select.select() to implement telnet.expect()."""
447        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
448        self.dataq.put(want)
449        telnet = telnetlib.Telnet(HOST, self.port)
450        telnet._has_poll = False
451        self.dataq.join()
452        (_,_,data) = telnet.expect(['match'])
453        self.assertEqual(data, ''.join(want[:-2]))
454
455
456def test_main(verbose=None):
457    test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
458                              ExpectTests)
459
460if __name__ == '__main__':
461    test_main()
462