1# -*- coding: utf-8 -*-
2#
3
4import sys
5sys.path[0:0] = [""]
6
7import os
8import os.path
9import socket
10
11import six
12
13# websocket-client
14import websocket as ws
15from websocket._handshake import _create_sec_websocket_key, \
16    _validate as _validate_header
17from websocket._http import read_headers
18from websocket._url import get_proxy_info, parse_url
19from websocket._utils import validate_utf8
20
21if six.PY3:
22    from base64 import decodebytes as base64decode
23else:
24    from base64 import decodestring as base64decode
25
26if sys.version_info[0] == 2 and sys.version_info[1] < 7:
27    import unittest2 as unittest
28else:
29    import unittest
30
31try:
32    from ssl import SSLError
33except ImportError:
34    # dummy class of SSLError for ssl none-support environment.
35    class SSLError(Exception):
36        pass
37
38# Skip test to access the internet.
39TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
40
41# Skip Secure WebSocket test.
42TEST_SECURE_WS = True
43TRACEABLE = False
44
45
46def create_mask_key(_):
47    return "abcd"
48
49
50class SockMock(object):
51    def __init__(self):
52        self.data = []
53        self.sent = []
54
55    def add_packet(self, data):
56        self.data.append(data)
57
58    def recv(self, bufsize):
59        if self.data:
60            e = self.data.pop(0)
61            if isinstance(e, Exception):
62                raise e
63            if len(e) > bufsize:
64                self.data.insert(0, e[bufsize:])
65            return e[:bufsize]
66
67    def send(self, data):
68        self.sent.append(data)
69        return len(data)
70
71    def close(self):
72        pass
73
74
75class HeaderSockMock(SockMock):
76
77    def __init__(self, fname):
78        SockMock.__init__(self)
79        path = os.path.join(os.path.dirname(__file__), fname)
80        with open(path, "rb") as f:
81            self.add_packet(f.read())
82
83
84class WebSocketTest(unittest.TestCase):
85    def setUp(self):
86        ws.enableTrace(TRACEABLE)
87
88    def tearDown(self):
89        pass
90
91    def testDefaultTimeout(self):
92        self.assertEqual(ws.getdefaulttimeout(), None)
93        ws.setdefaulttimeout(10)
94        self.assertEqual(ws.getdefaulttimeout(), 10)
95        ws.setdefaulttimeout(None)
96
97    def testParseUrl(self):
98        p = parse_url("ws://www.example.com/r")
99        self.assertEqual(p[0], "www.example.com")
100        self.assertEqual(p[1], 80)
101        self.assertEqual(p[2], "/r")
102        self.assertEqual(p[3], False)
103
104        p = parse_url("ws://www.example.com/r/")
105        self.assertEqual(p[0], "www.example.com")
106        self.assertEqual(p[1], 80)
107        self.assertEqual(p[2], "/r/")
108        self.assertEqual(p[3], False)
109
110        p = parse_url("ws://www.example.com/")
111        self.assertEqual(p[0], "www.example.com")
112        self.assertEqual(p[1], 80)
113        self.assertEqual(p[2], "/")
114        self.assertEqual(p[3], False)
115
116        p = parse_url("ws://www.example.com")
117        self.assertEqual(p[0], "www.example.com")
118        self.assertEqual(p[1], 80)
119        self.assertEqual(p[2], "/")
120        self.assertEqual(p[3], False)
121
122        p = parse_url("ws://www.example.com:8080/r")
123        self.assertEqual(p[0], "www.example.com")
124        self.assertEqual(p[1], 8080)
125        self.assertEqual(p[2], "/r")
126        self.assertEqual(p[3], False)
127
128        p = parse_url("ws://www.example.com:8080/")
129        self.assertEqual(p[0], "www.example.com")
130        self.assertEqual(p[1], 8080)
131        self.assertEqual(p[2], "/")
132        self.assertEqual(p[3], False)
133
134        p = parse_url("ws://www.example.com:8080")
135        self.assertEqual(p[0], "www.example.com")
136        self.assertEqual(p[1], 8080)
137        self.assertEqual(p[2], "/")
138        self.assertEqual(p[3], False)
139
140        p = parse_url("wss://www.example.com:8080/r")
141        self.assertEqual(p[0], "www.example.com")
142        self.assertEqual(p[1], 8080)
143        self.assertEqual(p[2], "/r")
144        self.assertEqual(p[3], True)
145
146        p = parse_url("wss://www.example.com:8080/r?key=value")
147        self.assertEqual(p[0], "www.example.com")
148        self.assertEqual(p[1], 8080)
149        self.assertEqual(p[2], "/r?key=value")
150        self.assertEqual(p[3], True)
151
152        self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
153
154        if sys.version_info[0] == 2 and sys.version_info[1] < 7:
155            return
156
157        p = parse_url("ws://[2a03:4000:123:83::3]/r")
158        self.assertEqual(p[0], "2a03:4000:123:83::3")
159        self.assertEqual(p[1], 80)
160        self.assertEqual(p[2], "/r")
161        self.assertEqual(p[3], False)
162
163        p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
164        self.assertEqual(p[0], "2a03:4000:123:83::3")
165        self.assertEqual(p[1], 8080)
166        self.assertEqual(p[2], "/r")
167        self.assertEqual(p[3], False)
168
169        p = parse_url("wss://[2a03:4000:123:83::3]/r")
170        self.assertEqual(p[0], "2a03:4000:123:83::3")
171        self.assertEqual(p[1], 443)
172        self.assertEqual(p[2], "/r")
173        self.assertEqual(p[3], True)
174
175        p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
176        self.assertEqual(p[0], "2a03:4000:123:83::3")
177        self.assertEqual(p[1], 8080)
178        self.assertEqual(p[2], "/r")
179        self.assertEqual(p[3], True)
180
181    def testWSKey(self):
182        key = _create_sec_websocket_key()
183        self.assertTrue(key != 24)
184        self.assertTrue(six.u("¥n") not in key)
185
186    def testWsUtils(self):
187        key = "c6b8hTg4EeGb2gQMztV1/g=="
188        required_header = {
189            "upgrade": "websocket",
190            "connection": "upgrade",
191            "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=",
192            }
193        self.assertEqual(_validate_header(required_header, key, None), (True, None))
194
195        header = required_header.copy()
196        header["upgrade"] = "http"
197        self.assertEqual(_validate_header(header, key, None), (False, None))
198        del header["upgrade"]
199        self.assertEqual(_validate_header(header, key, None), (False, None))
200
201        header = required_header.copy()
202        header["connection"] = "something"
203        self.assertEqual(_validate_header(header, key, None), (False, None))
204        del header["connection"]
205        self.assertEqual(_validate_header(header, key, None), (False, None))
206
207        header = required_header.copy()
208        header["sec-websocket-accept"] = "something"
209        self.assertEqual(_validate_header(header, key, None), (False, None))
210        del header["sec-websocket-accept"]
211        self.assertEqual(_validate_header(header, key, None), (False, None))
212
213        header = required_header.copy()
214        header["sec-websocket-protocol"] = "sub1"
215        self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1"))
216        self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
217
218        header = required_header.copy()
219        header["sec-websocket-protocol"] = "sUb1"
220        self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1"))
221
222
223    def testReadHeader(self):
224        status, header = read_headers(HeaderSockMock("data/header01.txt"))
225        self.assertEqual(status, 101)
226        self.assertEqual(header["connection"], "Upgrade")
227
228        HeaderSockMock("data/header02.txt")
229        self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
230
231    def testSend(self):
232        # TODO: add longer frame data
233        sock = ws.WebSocket()
234        sock.set_mask_key(create_mask_key)
235        s = sock.sock = HeaderSockMock("data/header01.txt")
236        sock.send("Hello")
237        self.assertEqual(s.sent[0], six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))
238
239        sock.send("こんにちは")
240        self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))
241
242        sock.send(u"こんにちは")
243        self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))
244
245        sock.send("x" * 127)
246
247    def testRecv(self):
248        # TODO: add longer frame data
249        sock = ws.WebSocket()
250        s = sock.sock = SockMock()
251        something = six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
252        s.add_packet(something)
253        data = sock.recv()
254        self.assertEqual(data, "こんにちは")
255
256        s.add_packet(six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))
257        data = sock.recv()
258        self.assertEqual(data, "Hello")
259
260    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
261    def testIter(self):
262        count = 2
263        for _ in ws.create_connection('ws://stream.meetup.com/2/rsvps'):
264            count -= 1
265            if count == 0:
266                break
267
268    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
269    def testNext(self):
270        sock = ws.create_connection('ws://stream.meetup.com/2/rsvps')
271        self.assertEqual(str, type(next(sock)))
272
273    def testInternalRecvStrict(self):
274        sock = ws.WebSocket()
275        s = sock.sock = SockMock()
276        s.add_packet(six.b("foo"))
277        s.add_packet(socket.timeout())
278        s.add_packet(six.b("bar"))
279        # s.add_packet(SSLError("The read operation timed out"))
280        s.add_packet(six.b("baz"))
281        with self.assertRaises(ws.WebSocketTimeoutException):
282            sock.frame_buffer.recv_strict(9)
283        # if six.PY2:
284        #     with self.assertRaises(ws.WebSocketTimeoutException):
285        #         data = sock._recv_strict(9)
286        # else:
287        #     with self.assertRaises(SSLError):
288        #         data = sock._recv_strict(9)
289        data = sock.frame_buffer.recv_strict(9)
290        self.assertEqual(data, six.b("foobarbaz"))
291        with self.assertRaises(ws.WebSocketConnectionClosedException):
292            sock.frame_buffer.recv_strict(1)
293
294    def testRecvTimeout(self):
295        sock = ws.WebSocket()
296        s = sock.sock = SockMock()
297        s.add_packet(six.b("\x81"))
298        s.add_packet(socket.timeout())
299        s.add_packet(six.b("\x8dabcd\x29\x07\x0f\x08\x0e"))
300        s.add_packet(socket.timeout())
301        s.add_packet(six.b("\x4e\x43\x33\x0e\x10\x0f\x00\x40"))
302        with self.assertRaises(ws.WebSocketTimeoutException):
303            sock.recv()
304        with self.assertRaises(ws.WebSocketTimeoutException):
305            sock.recv()
306        data = sock.recv()
307        self.assertEqual(data, "Hello, World!")
308        with self.assertRaises(ws.WebSocketConnectionClosedException):
309            sock.recv()
310
311    def testRecvWithSimpleFragmentation(self):
312        sock = ws.WebSocket()
313        s = sock.sock = SockMock()
314        # OPCODE=TEXT, FIN=0, MSG="Brevity is "
315        s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
316        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
317        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
318        data = sock.recv()
319        self.assertEqual(data, "Brevity is the soul of wit")
320        with self.assertRaises(ws.WebSocketConnectionClosedException):
321            sock.recv()
322
323    def testRecvWithFireEventOfFragmentation(self):
324        sock = ws.WebSocket(fire_cont_frame=True)
325        s = sock.sock = SockMock()
326        # OPCODE=TEXT, FIN=0, MSG="Brevity is "
327        s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
328        # OPCODE=CONT, FIN=0, MSG="Brevity is "
329        s.add_packet(six.b("\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
330        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
331        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
332
333        _, data = sock.recv_data()
334        self.assertEqual(data, six.b("Brevity is "))
335        _, data = sock.recv_data()
336        self.assertEqual(data, six.b("Brevity is "))
337        _, data = sock.recv_data()
338        self.assertEqual(data, six.b("the soul of wit"))
339
340        # OPCODE=CONT, FIN=0, MSG="Brevity is "
341        s.add_packet(six.b("\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
342
343        with self.assertRaises(ws.WebSocketException):
344            sock.recv_data()
345
346        with self.assertRaises(ws.WebSocketConnectionClosedException):
347            sock.recv()
348
349    def testClose(self):
350        sock = ws.WebSocket()
351        sock.sock = SockMock()
352        sock.connected = True
353        sock.close()
354        self.assertEqual(sock.connected, False)
355
356        sock = ws.WebSocket()
357        s = sock.sock = SockMock()
358        sock.connected = True
359        s.add_packet(six.b('\x88\x80\x17\x98p\x84'))
360        sock.recv()
361        self.assertEqual(sock.connected, False)
362
363    def testRecvContFragmentation(self):
364        sock = ws.WebSocket()
365        s = sock.sock = SockMock()
366        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
367        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
368        self.assertRaises(ws.WebSocketException, sock.recv)
369
370    def testRecvWithProlongedFragmentation(self):
371        sock = ws.WebSocket()
372        s = sock.sock = SockMock()
373        # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
374        s.add_packet(six.b("\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15"
375                           "\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC"))
376        # OPCODE=CONT, FIN=0, MSG="dear friends, "
377        s.add_packet(six.b("\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07"
378                           "\x17MB"))
379        # OPCODE=CONT, FIN=1, MSG="once more"
380        s.add_packet(six.b("\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04"))
381        data = sock.recv()
382        self.assertEqual(
383            data,
384            "Once more unto the breach, dear friends, once more")
385        with self.assertRaises(ws.WebSocketConnectionClosedException):
386            sock.recv()
387
388    def testRecvWithFragmentationAndControlFrame(self):
389        sock = ws.WebSocket()
390        sock.set_mask_key(create_mask_key)
391        s = sock.sock = SockMock()
392        # OPCODE=TEXT, FIN=0, MSG="Too much "
393        s.add_packet(six.b("\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA"))
394        # OPCODE=PING, FIN=1, MSG="Please PONG this"
395        s.add_packet(six.b("\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))
396        # OPCODE=CONT, FIN=1, MSG="of a good thing"
397        s.add_packet(six.b("\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c"
398                           "\x08\x0c\x04"))
399        data = sock.recv()
400        self.assertEqual(data, "Too much of a good thing")
401        with self.assertRaises(ws.WebSocketConnectionClosedException):
402            sock.recv()
403        self.assertEqual(
404            s.sent[0],
405            six.b("\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))
406
407    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
408    def testWebSocket(self):
409        s = ws.create_connection("ws://echo.websocket.org/")
410        self.assertNotEqual(s, None)
411        s.send("Hello, World")
412        result = s.recv()
413        self.assertEqual(result, "Hello, World")
414
415        s.send(u"こにゃにゃちは、世界")
416        result = s.recv()
417        self.assertEqual(result, "こにゃにゃちは、世界")
418        s.close()
419
420    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
421    def testPingPong(self):
422        s = ws.create_connection("ws://echo.websocket.org/")
423        self.assertNotEqual(s, None)
424        s.ping("Hello")
425        s.pong("Hi")
426        s.close()
427
428    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
429    @unittest.skipUnless(TEST_SECURE_WS, "wss://echo.websocket.org doesn't work well.")
430    def testSecureWebSocket(self):
431        if 1:
432            import ssl
433            s = ws.create_connection("wss://echo.websocket.org/")
434            self.assertNotEqual(s, None)
435            self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
436            s.send("Hello, World")
437            result = s.recv()
438            self.assertEqual(result, "Hello, World")
439            s.send(u"こにゃにゃちは、世界")
440            result = s.recv()
441            self.assertEqual(result, "こにゃにゃちは、世界")
442            s.close()
443        #except:
444        #    pass
445
446    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
447    def testWebSocketWihtCustomHeader(self):
448        s = ws.create_connection("ws://echo.websocket.org/",
449                                 headers={"User-Agent": "PythonWebsocketClient"})
450        self.assertNotEqual(s, None)
451        s.send("Hello, World")
452        result = s.recv()
453        self.assertEqual(result, "Hello, World")
454        s.close()
455
456    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
457    def testAfterClose(self):
458        s = ws.create_connection("ws://echo.websocket.org/")
459        self.assertNotEqual(s, None)
460        s.close()
461        self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
462        self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
463
464    def testNonce(self):
465        """ WebSocket key should be a random 16-byte nonce.
466        """
467        key = _create_sec_websocket_key()
468        nonce = base64decode(key.encode("utf-8"))
469        self.assertEqual(16, len(nonce))
470
471
472class WebSocketAppTest(unittest.TestCase):
473
474    class NotSetYet(object):
475        """ A marker class for signalling that a value hasn't been set yet.
476        """
477
478    def setUp(self):
479        ws.enableTrace(TRACEABLE)
480
481        WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
482        WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
483        WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
484
485    def tearDown(self):
486        WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
487        WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
488        WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
489
490    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
491    def testKeepRunning(self):
492        """ A WebSocketApp should keep running as long as its self.keep_running
493        is not False (in the boolean context).
494        """
495
496        def on_open(self, *args, **kwargs):
497            """ Set the keep_running flag for later inspection and immediately
498            close the connection.
499            """
500            WebSocketAppTest.keep_running_open = self.keep_running
501
502            self.close()
503
504        def on_close(self, *args, **kwargs):
505            """ Set the keep_running flag for the test to use.
506            """
507            WebSocketAppTest.keep_running_close = self.keep_running
508
509        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close)
510        app.run_forever()
511
512        self.assertFalse(isinstance(WebSocketAppTest.keep_running_open,
513                                    WebSocketAppTest.NotSetYet))
514
515        self.assertFalse(isinstance(WebSocketAppTest.keep_running_close,
516                                    WebSocketAppTest.NotSetYet))
517
518        self.assertEqual(True, WebSocketAppTest.keep_running_open)
519        self.assertEqual(False, WebSocketAppTest.keep_running_close)
520
521    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
522    def testSockMaskKey(self):
523        """ A WebSocketApp should forward the received mask_key function down
524        to the actual socket.
525        """
526
527        def my_mask_key_func():
528            pass
529
530        def on_open(self, *args, **kwargs):
531            """ Set the value so the test can use it later on and immediately
532            close the connection.
533            """
534            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
535            self.close()
536
537        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
538        app.run_forever()
539
540        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
541        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
542
543
544class SockOptTest(unittest.TestCase):
545    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
546    def testSockOpt(self):
547        sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
548        s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
549        self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
550        s.close()
551
552class UtilsTest(unittest.TestCase):
553    def testUtf8Validator(self):
554        state = validate_utf8(six.b('\xf0\x90\x80\x80'))
555        self.assertEqual(state, True)
556        state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'))
557        self.assertEqual(state, False)
558        state = validate_utf8(six.b(''))
559        self.assertEqual(state, True)
560
561class ProxyInfoTest(unittest.TestCase):
562    def setUp(self):
563        self.http_proxy = os.environ.get("http_proxy", None)
564        self.https_proxy = os.environ.get("https_proxy", None)
565        if "http_proxy" in os.environ:
566            del os.environ["http_proxy"]
567        if "https_proxy" in os.environ:
568            del os.environ["https_proxy"]
569
570    def tearDown(self):
571        if self.http_proxy:
572            os.environ["http_proxy"] = self.http_proxy
573        elif "http_proxy" in os.environ:
574            del os.environ["http_proxy"]
575
576        if self.https_proxy:
577            os.environ["https_proxy"] = self.https_proxy
578        elif "https_proxy" in os.environ:
579            del os.environ["https_proxy"]
580
581
582    def testProxyFromArgs(self):
583        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None))
584        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None))
585        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None))
586        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None))
587
588        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")),
589            ("localhost", 0, ("a", "b")))
590        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
591            ("localhost", 3128, ("a", "b")))
592        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")),
593            ("localhost", 0, ("a", "b")))
594        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
595            ("localhost", 3128, ("a", "b")))
596
597        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["example.com"], proxy_auth=("a", "b")),
598            ("localhost", 3128, ("a", "b")))
599        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")),
600            (None, 0, None))
601
602
603    def testProxyFromEnv(self):
604        os.environ["http_proxy"] = "http://localhost/"
605        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
606        os.environ["http_proxy"] = "http://localhost:3128/"
607        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
608
609        os.environ["http_proxy"] = "http://localhost/"
610        os.environ["https_proxy"] = "http://localhost2/"
611        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
612        os.environ["http_proxy"] = "http://localhost:3128/"
613        os.environ["https_proxy"] = "http://localhost2:3128/"
614        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
615
616        os.environ["http_proxy"] = "http://localhost/"
617        os.environ["https_proxy"] = "http://localhost2/"
618        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
619        os.environ["http_proxy"] = "http://localhost:3128/"
620        os.environ["https_proxy"] = "http://localhost2:3128/"
621        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
622
623
624        os.environ["http_proxy"] = "http://a:b@localhost/"
625        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
626        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
627        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
628
629        os.environ["http_proxy"] = "http://a:b@localhost/"
630        os.environ["https_proxy"] = "http://a:b@localhost2/"
631        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
632        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
633        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
634        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
635
636        os.environ["http_proxy"] = "http://a:b@localhost/"
637        os.environ["https_proxy"] = "http://a:b@localhost2/"
638        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
639        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
640        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
641        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
642
643        os.environ["http_proxy"] = "http://a:b@localhost/"
644        os.environ["https_proxy"] = "http://a:b@localhost2/"
645        os.environ["no_proxy"] = "example1.com,example2.com"
646        self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
647        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
648        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
649        os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
650        self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
651
652        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
653        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
654        os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
655        self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
656        self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
657
658
659if __name__ == "__main__":
660    unittest.main()
661