1# -*- coding: utf-8 -*- 2# 3 4import sys 5sys.path[0:0] = [""] 6 7import os 8import os.path 9import socket 10 11import six 12 13# websocket-client 14import websocket as ws 15from websocket._handshake import _create_sec_websocket_key, \ 16 _validate as _validate_header 17from websocket._http import read_headers 18from websocket._url import get_proxy_info, parse_url 19from websocket._utils import validate_utf8 20 21if six.PY3: 22 from base64 import decodebytes as base64decode 23else: 24 from base64 import decodestring as base64decode 25 26if sys.version_info[0] == 2 and sys.version_info[1] < 7: 27 import unittest2 as unittest 28else: 29 import unittest 30 31try: 32 from ssl import SSLError 33except ImportError: 34 # dummy class of SSLError for ssl none-support environment. 35 class SSLError(Exception): 36 pass 37 38# Skip test to access the internet. 39TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' 40 41# Skip Secure WebSocket test. 42TEST_SECURE_WS = True 43TRACEABLE = False 44 45 46def create_mask_key(_): 47 return "abcd" 48 49 50class SockMock(object): 51 def __init__(self): 52 self.data = [] 53 self.sent = [] 54 55 def add_packet(self, data): 56 self.data.append(data) 57 58 def recv(self, bufsize): 59 if self.data: 60 e = self.data.pop(0) 61 if isinstance(e, Exception): 62 raise e 63 if len(e) > bufsize: 64 self.data.insert(0, e[bufsize:]) 65 return e[:bufsize] 66 67 def send(self, data): 68 self.sent.append(data) 69 return len(data) 70 71 def close(self): 72 pass 73 74 75class HeaderSockMock(SockMock): 76 77 def __init__(self, fname): 78 SockMock.__init__(self) 79 path = os.path.join(os.path.dirname(__file__), fname) 80 with open(path, "rb") as f: 81 self.add_packet(f.read()) 82 83 84class WebSocketTest(unittest.TestCase): 85 def setUp(self): 86 ws.enableTrace(TRACEABLE) 87 88 def tearDown(self): 89 pass 90 91 def testDefaultTimeout(self): 92 self.assertEqual(ws.getdefaulttimeout(), None) 93 ws.setdefaulttimeout(10) 94 self.assertEqual(ws.getdefaulttimeout(), 10) 95 ws.setdefaulttimeout(None) 96 97 def testParseUrl(self): 98 p = parse_url("ws://www.example.com/r") 99 self.assertEqual(p[0], "www.example.com") 100 self.assertEqual(p[1], 80) 101 self.assertEqual(p[2], "/r") 102 self.assertEqual(p[3], False) 103 104 p = parse_url("ws://www.example.com/r/") 105 self.assertEqual(p[0], "www.example.com") 106 self.assertEqual(p[1], 80) 107 self.assertEqual(p[2], "/r/") 108 self.assertEqual(p[3], False) 109 110 p = parse_url("ws://www.example.com/") 111 self.assertEqual(p[0], "www.example.com") 112 self.assertEqual(p[1], 80) 113 self.assertEqual(p[2], "/") 114 self.assertEqual(p[3], False) 115 116 p = parse_url("ws://www.example.com") 117 self.assertEqual(p[0], "www.example.com") 118 self.assertEqual(p[1], 80) 119 self.assertEqual(p[2], "/") 120 self.assertEqual(p[3], False) 121 122 p = parse_url("ws://www.example.com:8080/r") 123 self.assertEqual(p[0], "www.example.com") 124 self.assertEqual(p[1], 8080) 125 self.assertEqual(p[2], "/r") 126 self.assertEqual(p[3], False) 127 128 p = parse_url("ws://www.example.com:8080/") 129 self.assertEqual(p[0], "www.example.com") 130 self.assertEqual(p[1], 8080) 131 self.assertEqual(p[2], "/") 132 self.assertEqual(p[3], False) 133 134 p = parse_url("ws://www.example.com:8080") 135 self.assertEqual(p[0], "www.example.com") 136 self.assertEqual(p[1], 8080) 137 self.assertEqual(p[2], "/") 138 self.assertEqual(p[3], False) 139 140 p = parse_url("wss://www.example.com:8080/r") 141 self.assertEqual(p[0], "www.example.com") 142 self.assertEqual(p[1], 8080) 143 self.assertEqual(p[2], "/r") 144 self.assertEqual(p[3], True) 145 146 p = parse_url("wss://www.example.com:8080/r?key=value") 147 self.assertEqual(p[0], "www.example.com") 148 self.assertEqual(p[1], 8080) 149 self.assertEqual(p[2], "/r?key=value") 150 self.assertEqual(p[3], True) 151 152 self.assertRaises(ValueError, parse_url, "http://www.example.com/r") 153 154 if sys.version_info[0] == 2 and sys.version_info[1] < 7: 155 return 156 157 p = parse_url("ws://[2a03:4000:123:83::3]/r") 158 self.assertEqual(p[0], "2a03:4000:123:83::3") 159 self.assertEqual(p[1], 80) 160 self.assertEqual(p[2], "/r") 161 self.assertEqual(p[3], False) 162 163 p = parse_url("ws://[2a03:4000:123:83::3]:8080/r") 164 self.assertEqual(p[0], "2a03:4000:123:83::3") 165 self.assertEqual(p[1], 8080) 166 self.assertEqual(p[2], "/r") 167 self.assertEqual(p[3], False) 168 169 p = parse_url("wss://[2a03:4000:123:83::3]/r") 170 self.assertEqual(p[0], "2a03:4000:123:83::3") 171 self.assertEqual(p[1], 443) 172 self.assertEqual(p[2], "/r") 173 self.assertEqual(p[3], True) 174 175 p = parse_url("wss://[2a03:4000:123:83::3]:8080/r") 176 self.assertEqual(p[0], "2a03:4000:123:83::3") 177 self.assertEqual(p[1], 8080) 178 self.assertEqual(p[2], "/r") 179 self.assertEqual(p[3], True) 180 181 def testWSKey(self): 182 key = _create_sec_websocket_key() 183 self.assertTrue(key != 24) 184 self.assertTrue(six.u("¥n") not in key) 185 186 def testWsUtils(self): 187 key = "c6b8hTg4EeGb2gQMztV1/g==" 188 required_header = { 189 "upgrade": "websocket", 190 "connection": "upgrade", 191 "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=", 192 } 193 self.assertEqual(_validate_header(required_header, key, None), (True, None)) 194 195 header = required_header.copy() 196 header["upgrade"] = "http" 197 self.assertEqual(_validate_header(header, key, None), (False, None)) 198 del header["upgrade"] 199 self.assertEqual(_validate_header(header, key, None), (False, None)) 200 201 header = required_header.copy() 202 header["connection"] = "something" 203 self.assertEqual(_validate_header(header, key, None), (False, None)) 204 del header["connection"] 205 self.assertEqual(_validate_header(header, key, None), (False, None)) 206 207 header = required_header.copy() 208 header["sec-websocket-accept"] = "something" 209 self.assertEqual(_validate_header(header, key, None), (False, None)) 210 del header["sec-websocket-accept"] 211 self.assertEqual(_validate_header(header, key, None), (False, None)) 212 213 header = required_header.copy() 214 header["sec-websocket-protocol"] = "sub1" 215 self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1")) 216 self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None)) 217 218 header = required_header.copy() 219 header["sec-websocket-protocol"] = "sUb1" 220 self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1")) 221 222 223 def testReadHeader(self): 224 status, header = read_headers(HeaderSockMock("data/header01.txt")) 225 self.assertEqual(status, 101) 226 self.assertEqual(header["connection"], "Upgrade") 227 228 HeaderSockMock("data/header02.txt") 229 self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")) 230 231 def testSend(self): 232 # TODO: add longer frame data 233 sock = ws.WebSocket() 234 sock.set_mask_key(create_mask_key) 235 s = sock.sock = HeaderSockMock("data/header01.txt") 236 sock.send("Hello") 237 self.assertEqual(s.sent[0], six.b("\x81\x85abcd)\x07\x0f\x08\x0e")) 238 239 sock.send("こんにちは") 240 self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")) 241 242 sock.send(u"こんにちは") 243 self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")) 244 245 sock.send("x" * 127) 246 247 def testRecv(self): 248 # TODO: add longer frame data 249 sock = ws.WebSocket() 250 s = sock.sock = SockMock() 251 something = six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") 252 s.add_packet(something) 253 data = sock.recv() 254 self.assertEqual(data, "こんにちは") 255 256 s.add_packet(six.b("\x81\x85abcd)\x07\x0f\x08\x0e")) 257 data = sock.recv() 258 self.assertEqual(data, "Hello") 259 260 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 261 def testIter(self): 262 count = 2 263 for _ in ws.create_connection('ws://stream.meetup.com/2/rsvps'): 264 count -= 1 265 if count == 0: 266 break 267 268 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 269 def testNext(self): 270 sock = ws.create_connection('ws://stream.meetup.com/2/rsvps') 271 self.assertEqual(str, type(next(sock))) 272 273 def testInternalRecvStrict(self): 274 sock = ws.WebSocket() 275 s = sock.sock = SockMock() 276 s.add_packet(six.b("foo")) 277 s.add_packet(socket.timeout()) 278 s.add_packet(six.b("bar")) 279 # s.add_packet(SSLError("The read operation timed out")) 280 s.add_packet(six.b("baz")) 281 with self.assertRaises(ws.WebSocketTimeoutException): 282 sock.frame_buffer.recv_strict(9) 283 # if six.PY2: 284 # with self.assertRaises(ws.WebSocketTimeoutException): 285 # data = sock._recv_strict(9) 286 # else: 287 # with self.assertRaises(SSLError): 288 # data = sock._recv_strict(9) 289 data = sock.frame_buffer.recv_strict(9) 290 self.assertEqual(data, six.b("foobarbaz")) 291 with self.assertRaises(ws.WebSocketConnectionClosedException): 292 sock.frame_buffer.recv_strict(1) 293 294 def testRecvTimeout(self): 295 sock = ws.WebSocket() 296 s = sock.sock = SockMock() 297 s.add_packet(six.b("\x81")) 298 s.add_packet(socket.timeout()) 299 s.add_packet(six.b("\x8dabcd\x29\x07\x0f\x08\x0e")) 300 s.add_packet(socket.timeout()) 301 s.add_packet(six.b("\x4e\x43\x33\x0e\x10\x0f\x00\x40")) 302 with self.assertRaises(ws.WebSocketTimeoutException): 303 sock.recv() 304 with self.assertRaises(ws.WebSocketTimeoutException): 305 sock.recv() 306 data = sock.recv() 307 self.assertEqual(data, "Hello, World!") 308 with self.assertRaises(ws.WebSocketConnectionClosedException): 309 sock.recv() 310 311 def testRecvWithSimpleFragmentation(self): 312 sock = ws.WebSocket() 313 s = sock.sock = SockMock() 314 # OPCODE=TEXT, FIN=0, MSG="Brevity is " 315 s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")) 316 # OPCODE=CONT, FIN=1, MSG="the soul of wit" 317 s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")) 318 data = sock.recv() 319 self.assertEqual(data, "Brevity is the soul of wit") 320 with self.assertRaises(ws.WebSocketConnectionClosedException): 321 sock.recv() 322 323 def testRecvWithFireEventOfFragmentation(self): 324 sock = ws.WebSocket(fire_cont_frame=True) 325 s = sock.sock = SockMock() 326 # OPCODE=TEXT, FIN=0, MSG="Brevity is " 327 s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")) 328 # OPCODE=CONT, FIN=0, MSG="Brevity is " 329 s.add_packet(six.b("\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")) 330 # OPCODE=CONT, FIN=1, MSG="the soul of wit" 331 s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")) 332 333 _, data = sock.recv_data() 334 self.assertEqual(data, six.b("Brevity is ")) 335 _, data = sock.recv_data() 336 self.assertEqual(data, six.b("Brevity is ")) 337 _, data = sock.recv_data() 338 self.assertEqual(data, six.b("the soul of wit")) 339 340 # OPCODE=CONT, FIN=0, MSG="Brevity is " 341 s.add_packet(six.b("\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C")) 342 343 with self.assertRaises(ws.WebSocketException): 344 sock.recv_data() 345 346 with self.assertRaises(ws.WebSocketConnectionClosedException): 347 sock.recv() 348 349 def testClose(self): 350 sock = ws.WebSocket() 351 sock.sock = SockMock() 352 sock.connected = True 353 sock.close() 354 self.assertEqual(sock.connected, False) 355 356 sock = ws.WebSocket() 357 s = sock.sock = SockMock() 358 sock.connected = True 359 s.add_packet(six.b('\x88\x80\x17\x98p\x84')) 360 sock.recv() 361 self.assertEqual(sock.connected, False) 362 363 def testRecvContFragmentation(self): 364 sock = ws.WebSocket() 365 s = sock.sock = SockMock() 366 # OPCODE=CONT, FIN=1, MSG="the soul of wit" 367 s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17")) 368 self.assertRaises(ws.WebSocketException, sock.recv) 369 370 def testRecvWithProlongedFragmentation(self): 371 sock = ws.WebSocket() 372 s = sock.sock = SockMock() 373 # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, " 374 s.add_packet(six.b("\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15" 375 "\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC")) 376 # OPCODE=CONT, FIN=0, MSG="dear friends, " 377 s.add_packet(six.b("\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07" 378 "\x17MB")) 379 # OPCODE=CONT, FIN=1, MSG="once more" 380 s.add_packet(six.b("\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04")) 381 data = sock.recv() 382 self.assertEqual( 383 data, 384 "Once more unto the breach, dear friends, once more") 385 with self.assertRaises(ws.WebSocketConnectionClosedException): 386 sock.recv() 387 388 def testRecvWithFragmentationAndControlFrame(self): 389 sock = ws.WebSocket() 390 sock.set_mask_key(create_mask_key) 391 s = sock.sock = SockMock() 392 # OPCODE=TEXT, FIN=0, MSG="Too much " 393 s.add_packet(six.b("\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA")) 394 # OPCODE=PING, FIN=1, MSG="Please PONG this" 395 s.add_packet(six.b("\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17")) 396 # OPCODE=CONT, FIN=1, MSG="of a good thing" 397 s.add_packet(six.b("\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c" 398 "\x08\x0c\x04")) 399 data = sock.recv() 400 self.assertEqual(data, "Too much of a good thing") 401 with self.assertRaises(ws.WebSocketConnectionClosedException): 402 sock.recv() 403 self.assertEqual( 404 s.sent[0], 405 six.b("\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17")) 406 407 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 408 def testWebSocket(self): 409 s = ws.create_connection("ws://echo.websocket.org/") 410 self.assertNotEqual(s, None) 411 s.send("Hello, World") 412 result = s.recv() 413 self.assertEqual(result, "Hello, World") 414 415 s.send(u"こにゃにゃちは、世界") 416 result = s.recv() 417 self.assertEqual(result, "こにゃにゃちは、世界") 418 s.close() 419 420 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 421 def testPingPong(self): 422 s = ws.create_connection("ws://echo.websocket.org/") 423 self.assertNotEqual(s, None) 424 s.ping("Hello") 425 s.pong("Hi") 426 s.close() 427 428 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 429 @unittest.skipUnless(TEST_SECURE_WS, "wss://echo.websocket.org doesn't work well.") 430 def testSecureWebSocket(self): 431 if 1: 432 import ssl 433 s = ws.create_connection("wss://echo.websocket.org/") 434 self.assertNotEqual(s, None) 435 self.assertTrue(isinstance(s.sock, ssl.SSLSocket)) 436 s.send("Hello, World") 437 result = s.recv() 438 self.assertEqual(result, "Hello, World") 439 s.send(u"こにゃにゃちは、世界") 440 result = s.recv() 441 self.assertEqual(result, "こにゃにゃちは、世界") 442 s.close() 443 #except: 444 # pass 445 446 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 447 def testWebSocketWihtCustomHeader(self): 448 s = ws.create_connection("ws://echo.websocket.org/", 449 headers={"User-Agent": "PythonWebsocketClient"}) 450 self.assertNotEqual(s, None) 451 s.send("Hello, World") 452 result = s.recv() 453 self.assertEqual(result, "Hello, World") 454 s.close() 455 456 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 457 def testAfterClose(self): 458 s = ws.create_connection("ws://echo.websocket.org/") 459 self.assertNotEqual(s, None) 460 s.close() 461 self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello") 462 self.assertRaises(ws.WebSocketConnectionClosedException, s.recv) 463 464 def testNonce(self): 465 """ WebSocket key should be a random 16-byte nonce. 466 """ 467 key = _create_sec_websocket_key() 468 nonce = base64decode(key.encode("utf-8")) 469 self.assertEqual(16, len(nonce)) 470 471 472class WebSocketAppTest(unittest.TestCase): 473 474 class NotSetYet(object): 475 """ A marker class for signalling that a value hasn't been set yet. 476 """ 477 478 def setUp(self): 479 ws.enableTrace(TRACEABLE) 480 481 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() 482 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() 483 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() 484 485 def tearDown(self): 486 WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() 487 WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() 488 WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() 489 490 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 491 def testKeepRunning(self): 492 """ A WebSocketApp should keep running as long as its self.keep_running 493 is not False (in the boolean context). 494 """ 495 496 def on_open(self, *args, **kwargs): 497 """ Set the keep_running flag for later inspection and immediately 498 close the connection. 499 """ 500 WebSocketAppTest.keep_running_open = self.keep_running 501 502 self.close() 503 504 def on_close(self, *args, **kwargs): 505 """ Set the keep_running flag for the test to use. 506 """ 507 WebSocketAppTest.keep_running_close = self.keep_running 508 509 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close) 510 app.run_forever() 511 512 self.assertFalse(isinstance(WebSocketAppTest.keep_running_open, 513 WebSocketAppTest.NotSetYet)) 514 515 self.assertFalse(isinstance(WebSocketAppTest.keep_running_close, 516 WebSocketAppTest.NotSetYet)) 517 518 self.assertEqual(True, WebSocketAppTest.keep_running_open) 519 self.assertEqual(False, WebSocketAppTest.keep_running_close) 520 521 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 522 def testSockMaskKey(self): 523 """ A WebSocketApp should forward the received mask_key function down 524 to the actual socket. 525 """ 526 527 def my_mask_key_func(): 528 pass 529 530 def on_open(self, *args, **kwargs): 531 """ Set the value so the test can use it later on and immediately 532 close the connection. 533 """ 534 WebSocketAppTest.get_mask_key_id = id(self.get_mask_key) 535 self.close() 536 537 app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func) 538 app.run_forever() 539 540 # Note: We can't use 'is' for comparing the functions directly, need to use 'id'. 541 self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func)) 542 543 544class SockOptTest(unittest.TestCase): 545 @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") 546 def testSockOpt(self): 547 sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),) 548 s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt) 549 self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) 550 s.close() 551 552class UtilsTest(unittest.TestCase): 553 def testUtf8Validator(self): 554 state = validate_utf8(six.b('\xf0\x90\x80\x80')) 555 self.assertEqual(state, True) 556 state = validate_utf8(six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited')) 557 self.assertEqual(state, False) 558 state = validate_utf8(six.b('')) 559 self.assertEqual(state, True) 560 561class ProxyInfoTest(unittest.TestCase): 562 def setUp(self): 563 self.http_proxy = os.environ.get("http_proxy", None) 564 self.https_proxy = os.environ.get("https_proxy", None) 565 if "http_proxy" in os.environ: 566 del os.environ["http_proxy"] 567 if "https_proxy" in os.environ: 568 del os.environ["https_proxy"] 569 570 def tearDown(self): 571 if self.http_proxy: 572 os.environ["http_proxy"] = self.http_proxy 573 elif "http_proxy" in os.environ: 574 del os.environ["http_proxy"] 575 576 if self.https_proxy: 577 os.environ["https_proxy"] = self.https_proxy 578 elif "https_proxy" in os.environ: 579 del os.environ["https_proxy"] 580 581 582 def testProxyFromArgs(self): 583 self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None)) 584 self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None)) 585 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None)) 586 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None)) 587 588 self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")), 589 ("localhost", 0, ("a", "b"))) 590 self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), 591 ("localhost", 3128, ("a", "b"))) 592 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")), 593 ("localhost", 0, ("a", "b"))) 594 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), 595 ("localhost", 3128, ("a", "b"))) 596 597 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["example.com"], proxy_auth=("a", "b")), 598 ("localhost", 3128, ("a", "b"))) 599 self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")), 600 (None, 0, None)) 601 602 603 def testProxyFromEnv(self): 604 os.environ["http_proxy"] = "http://localhost/" 605 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None)) 606 os.environ["http_proxy"] = "http://localhost:3128/" 607 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None)) 608 609 os.environ["http_proxy"] = "http://localhost/" 610 os.environ["https_proxy"] = "http://localhost2/" 611 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None)) 612 os.environ["http_proxy"] = "http://localhost:3128/" 613 os.environ["https_proxy"] = "http://localhost2:3128/" 614 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None)) 615 616 os.environ["http_proxy"] = "http://localhost/" 617 os.environ["https_proxy"] = "http://localhost2/" 618 self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None)) 619 os.environ["http_proxy"] = "http://localhost:3128/" 620 os.environ["https_proxy"] = "http://localhost2:3128/" 621 self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None)) 622 623 624 os.environ["http_proxy"] = "http://a:b@localhost/" 625 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b"))) 626 os.environ["http_proxy"] = "http://a:b@localhost:3128/" 627 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b"))) 628 629 os.environ["http_proxy"] = "http://a:b@localhost/" 630 os.environ["https_proxy"] = "http://a:b@localhost2/" 631 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b"))) 632 os.environ["http_proxy"] = "http://a:b@localhost:3128/" 633 os.environ["https_proxy"] = "http://a:b@localhost2:3128/" 634 self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b"))) 635 636 os.environ["http_proxy"] = "http://a:b@localhost/" 637 os.environ["https_proxy"] = "http://a:b@localhost2/" 638 self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b"))) 639 os.environ["http_proxy"] = "http://a:b@localhost:3128/" 640 os.environ["https_proxy"] = "http://a:b@localhost2:3128/" 641 self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b"))) 642 643 os.environ["http_proxy"] = "http://a:b@localhost/" 644 os.environ["https_proxy"] = "http://a:b@localhost2/" 645 os.environ["no_proxy"] = "example1.com,example2.com" 646 self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b"))) 647 os.environ["http_proxy"] = "http://a:b@localhost:3128/" 648 os.environ["https_proxy"] = "http://a:b@localhost2:3128/" 649 os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org" 650 self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None)) 651 652 os.environ["http_proxy"] = "http://a:b@localhost:3128/" 653 os.environ["https_proxy"] = "http://a:b@localhost2:3128/" 654 os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16" 655 self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None)) 656 self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None)) 657 658 659if __name__ == "__main__": 660 unittest.main() 661