1#!/usr/bin/python 2""" 3Copyright 2016 Google Inc. All Rights Reserved. 4 5Licensed under the Apache License, Version 2.0 (the "License"); 6you may not use this file except in compliance with the License. 7You may obtain a copy of the License at 8 9 http://www.apache.org/licenses/LICENSE-2.0 10 11Unless required by applicable law or agreed to in writing, software 12distributed under the License is distributed on an "AS IS" BASIS, 13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14See the License for the specific language governing permissions and 15limitations under the License. 16""" 17import asyncore 18import gc 19import logging 20import platform 21import Queue 22import re 23import signal 24import socket 25import sys 26import threading 27import time 28 29server = None 30in_pipe = None 31out_pipe = None 32must_exit = False 33options = None 34dest_addresses = None 35connections = {} 36dns_cache = {} 37port_mappings = None 38map_localhost = False 39needs_flush = False 40flush_pipes = False 41last_activity = None 42REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0 43lock = threading.Lock() 44background_activity_count = 0 45 46 47def PrintMessage(msg): 48 # Print the message to stdout & flush to make sure that the message is not 49 # buffered when tsproxy is run as a subprocess. 50 print >> sys.stdout, msg 51 sys.stdout.flush() 52 53######################################################################################################################## 54# Traffic-shaping pipe (just passthrough for now) 55######################################################################################################################## 56class TSPipe(): 57 PIPE_IN = 0 58 PIPE_OUT = 1 59 60 def __init__(self, direction, latency, kbps): 61 self.direction = direction 62 self.latency = latency 63 self.kbps = kbps 64 self.queue = Queue.Queue() 65 self.last_tick = time.clock() 66 self.next_message = None 67 self.available_bytes = .0 68 self.peer = 'server' 69 if self.direction == self.PIPE_IN: 70 self.peer = 'client' 71 72 def SendMessage(self, message, main_thread = True): 73 global connections, in_pipe, out_pipe 74 message_sent = False 75 now = time.clock() 76 if message['message'] == 'closed': 77 message['time'] = now 78 else: 79 message['time'] = time.clock() + self.latency 80 message['size'] = .0 81 if 'data' in message: 82 message['size'] = float(len(message['data'])) 83 try: 84 connection_id = message['connection'] 85 # Send messages directly, bypassing the queues is throttling is disabled and we are on the main thread 86 if main_thread and connection_id in connections and self.peer in connections[connection_id]and self.latency == 0 and self.kbps == .0: 87 message_sent = self.SendPeerMessage(message) 88 except: 89 pass 90 if not message_sent: 91 try: 92 self.queue.put(message) 93 except: 94 pass 95 96 def SendPeerMessage(self, message): 97 global last_activity 98 last_activity = time.clock() 99 message_sent = False 100 connection_id = message['connection'] 101 if connection_id in connections: 102 if self.peer in connections[connection_id]: 103 try: 104 connections[connection_id][self.peer].handle_message(message) 105 message_sent = True 106 except: 107 # Clean up any disconnected connections 108 try: 109 connections[connection_id]['server'].close() 110 except: 111 pass 112 try: 113 connections[connection_id]['client'].close() 114 except: 115 pass 116 del connections[connection_id] 117 return message_sent 118 119 def tick(self): 120 global connections 121 global flush_pipes 122 processed_messages = False 123 now = time.clock() 124 try: 125 if self.next_message is None: 126 self.next_message = self.queue.get_nowait() 127 128 # Accumulate bandwidth if an available packet/message was waiting since our last tick 129 if self.next_message is not None and self.kbps > .0 and self.next_message['time'] <= now: 130 elapsed = now - self.last_tick 131 accumulated_bytes = elapsed * self.kbps * 1000.0 / 8.0 132 self.available_bytes += accumulated_bytes 133 134 # process messages as long as the next message is sendable (latency or available bytes) 135 while (self.next_message is not None) and\ 136 (flush_pipes or ((self.next_message['time'] <= now) and 137 (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))): 138 self.queue.task_done() 139 processed_messages = True 140 if self.kbps > .0: 141 self.available_bytes -= self.next_message['size'] 142 self.SendPeerMessage(self.next_message) 143 self.next_message = None 144 self.next_message = self.queue.get_nowait() 145 except: 146 pass 147 148 # Only accumulate bytes while we have messages that are ready to send 149 if self.next_message is None or self.next_message['time'] > now: 150 self.available_bytes = .0 151 self.last_tick = now 152 153 return processed_messages 154 155 156######################################################################################################################## 157# Threaded DNS resolver 158######################################################################################################################## 159class AsyncDNS(threading.Thread): 160 def __init__(self, client_id, hostname, port, result_pipe): 161 threading.Thread.__init__(self) 162 self.hostname = hostname 163 self.port = port 164 self.client_id = client_id 165 self.result_pipe = result_pipe 166 167 def run(self): 168 global lock, background_activity_count 169 try: 170 logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port)) 171 addresses = socket.getaddrinfo(self.hostname, self.port) 172 logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port)) 173 except: 174 addresses = () 175 logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port)) 176 message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses} 177 self.result_pipe.SendMessage(message, False) 178 lock.acquire() 179 if background_activity_count > 0: 180 background_activity_count -= 1 181 lock.release() 182 # open and close a local socket which will interrupt the long polling loop to process the message 183 s = socket.socket() 184 s.connect((server.ipaddr, server.port)) 185 s.close() 186 187 188######################################################################################################################## 189# TCP Client 190######################################################################################################################## 191class TCPConnection(asyncore.dispatcher): 192 STATE_ERROR = -1 193 STATE_IDLE = 0 194 STATE_RESOLVING = 1 195 STATE_CONNECTING = 2 196 STATE_CONNECTED = 3 197 198 def __init__(self, client_id): 199 global options 200 asyncore.dispatcher.__init__(self) 201 self.client_id = client_id 202 self.state = self.STATE_IDLE 203 self.buffer = '' 204 self.addr = None 205 self.dns_thread = None 206 self.hostname = None 207 self.port = None 208 self.needs_config = True 209 self.needs_close = False 210 self.is_localhost = False 211 self.did_resolve = False 212 213 def SendMessage(self, type, message): 214 message['message'] = type 215 message['connection'] = self.client_id 216 in_pipe.SendMessage(message) 217 218 def handle_message(self, message): 219 if message['message'] == 'data' and 'data' in message and len(message['data']): 220 self.buffer += message['data'] 221 if self.state == self.STATE_CONNECTED: 222 self.handle_write() 223 elif message['message'] == 'resolve': 224 self.HandleResolve(message) 225 elif message['message'] == 'connect': 226 self.HandleConnect(message) 227 elif message['message'] == 'closed': 228 if len(self.buffer) == 0: 229 self.handle_close() 230 else: 231 self.needs_close = True 232 233 def handle_error(self): 234 logging.warning('[{0:d}] Error'.format(self.client_id)) 235 if self.state == self.STATE_CONNECTING: 236 self.SendMessage('connected', {'success': False, 'address': self.addr}) 237 238 def handle_close(self): 239 logging.info('[{0:d}] Server Connection Closed'.format(self.client_id)) 240 self.state = self.STATE_ERROR 241 self.close() 242 try: 243 if self.client_id in connections: 244 if 'server' in connections[self.client_id]: 245 del connections[self.client_id]['server'] 246 if 'client' in connections[self.client_id]: 247 self.SendMessage('closed', {}) 248 else: 249 del connections[self.client_id] 250 except: 251 pass 252 253 def handle_connect(self): 254 if self.state == self.STATE_CONNECTING: 255 self.state = self.STATE_CONNECTED 256 self.SendMessage('connected', {'success': True, 'address': self.addr}) 257 logging.info('[{0:d}] Connected'.format(self.client_id)) 258 self.handle_write() 259 260 def writable(self): 261 if self.state == self.STATE_CONNECTING: 262 return True 263 return len(self.buffer) > 0 264 265 def handle_write(self): 266 if self.needs_config: 267 self.needs_config = False 268 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 269 self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024) 270 self.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024) 271 if len(self.buffer) > 0: 272 sent = self.send(self.buffer) 273 logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent)) 274 self.buffer = self.buffer[sent:] 275 if self.needs_close and len(self.buffer) == 0: 276 self.needs_close = False 277 self.handle_close() 278 279 def handle_read(self): 280 try: 281 while True: 282 data = self.recv(1460) 283 if data: 284 if self.state == self.STATE_CONNECTED: 285 logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data))) 286 self.SendMessage('data', {'data': data}) 287 else: 288 return 289 except: 290 pass 291 292 def HandleResolve(self, message): 293 global in_pipe, map_localhost, lock, background_activity_count 294 self.did_resolve = True 295 if 'hostname' in message: 296 self.hostname = message['hostname'] 297 self.port = 0 298 if 'port' in message: 299 self.port = message['port'] 300 logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port)) 301 if self.hostname == 'localhost': 302 self.hostname = '127.0.0.1' 303 if self.hostname == '127.0.0.1': 304 logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id)) 305 self.is_localhost = True 306 if (dest_addresses is not None) and (not self.is_localhost or map_localhost): 307 logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses)) 308 self.SendMessage('resolved', {'addresses': dest_addresses}) 309 else: 310 lock.acquire() 311 background_activity_count += 1 312 lock.release() 313 self.state = self.STATE_RESOLVING 314 self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe) 315 self.dns_thread.start() 316 317 def HandleConnect(self, message): 318 global map_localhost 319 if 'addresses' in message and len(message['addresses']): 320 self.state = self.STATE_CONNECTING 321 if not self.did_resolve and message['addresses'][0] == '127.0.0.1': 322 logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id)) 323 self.is_localhost = True 324 if (dest_addresses is not None) and (not self.is_localhost or map_localhost): 325 self.addr = dest_addresses[0] 326 else: 327 self.addr = message['addresses'][0] 328 self.create_socket(self.addr[0], socket.SOCK_STREAM) 329 addr = self.addr[4][0] 330 if not self.is_localhost or map_localhost: 331 port = GetDestPort(message['port']) 332 else: 333 port = message['port'] 334 logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port)) 335 self.connect((addr, port)) 336 337 338######################################################################################################################## 339# Socks5 Server 340######################################################################################################################## 341class Socks5Server(asyncore.dispatcher): 342 343 def __init__(self, host, port): 344 asyncore.dispatcher.__init__(self) 345 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 346 try: 347 #self.set_reuse_addr() 348 self.bind((host, port)) 349 self.listen(socket.SOMAXCONN) 350 self.ipaddr, self.port = self.getsockname() 351 self.current_client_id = 0 352 except: 353 PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port)) 354 exit(1) 355 356 def handle_accept(self): 357 global connections 358 pair = self.accept() 359 if pair is not None: 360 sock, addr = pair 361 self.current_client_id += 1 362 logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr))) 363 connections[self.current_client_id] = { 364 'client' : Socks5Connection(sock, self.current_client_id), 365 'server' : None 366 } 367 368 369# Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5 370class Socks5Connection(asyncore.dispatcher): 371 STATE_ERROR = -1 372 STATE_WAITING_FOR_HANDSHAKE = 0 373 STATE_WAITING_FOR_CONNECT_REQUEST = 1 374 STATE_RESOLVING = 2 375 STATE_CONNECTING = 3 376 STATE_CONNECTED = 4 377 378 def __init__(self, connected_socket, client_id): 379 global options 380 asyncore.dispatcher.__init__(self, connected_socket) 381 self.client_id = client_id 382 self.state = self.STATE_WAITING_FOR_HANDSHAKE 383 self.ip = None 384 self.addresses = None 385 self.hostname = None 386 self.port = None 387 self.requested_address = None 388 self.buffer = '' 389 self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 390 self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024) 391 self.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024) 392 self.needs_close = False 393 394 def SendMessage(self, type, message): 395 message['message'] = type 396 message['connection'] = self.client_id 397 out_pipe.SendMessage(message) 398 399 def handle_message(self, message): 400 if message['message'] == 'data' and 'data' in message and len(message['data']) > 0: 401 self.buffer += message['data'] 402 if self.state == self.STATE_CONNECTED: 403 self.handle_write() 404 elif message['message'] == 'resolved': 405 self.HandleResolved(message) 406 elif message['message'] == 'connected': 407 self.HandleConnected(message) 408 self.handle_write() 409 elif message['message'] == 'closed': 410 if len(self.buffer) == 0: 411 logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id)) 412 self.handle_close() 413 else: 414 logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id)) 415 self.needs_close = True 416 417 def writable(self): 418 return len(self.buffer) > 0 419 420 def handle_write(self): 421 if len(self.buffer) > 0: 422 sent = self.send(self.buffer) 423 logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent)) 424 self.buffer = self.buffer[sent:] 425 if self.needs_close and len(self.buffer) == 0: 426 logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id)) 427 self.needs_close = False 428 self.handle_close() 429 430 def handle_read(self): 431 global connections 432 global dns_cache 433 try: 434 while True: 435 # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames) 436 data = self.recv(1460) 437 if data: 438 data_len = len(data) 439 if self.state == self.STATE_CONNECTED: 440 logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, data_len)) 441 self.SendMessage('data', {'data': data}) 442 elif self.state == self.STATE_WAITING_FOR_HANDSHAKE: 443 self.state = self.STATE_ERROR #default to an error state, set correctly if things work out 444 if data_len >= 2 and ord(data[0]) == 0x05: 445 supports_no_auth = False 446 auth_count = ord(data[1]) 447 if data_len == auth_count + 2: 448 for i in range(auth_count): 449 offset = i + 2 450 if ord(data[offset]) == 0: 451 supports_no_auth = True 452 if supports_no_auth: 453 # Respond with a message that "No Authentication" was agreed to 454 logging.info('[{0:d}] New Socks5 client'.format(self.client_id)) 455 response = chr(0x05) + chr(0x00) 456 self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST 457 self.buffer += response 458 self.handle_write() 459 elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST: 460 self.state = self.STATE_ERROR #default to an error state, set correctly if things work out 461 if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00: 462 if ord(data[1]) == 0x01: #TCP connection (only supported method for now) 463 connections[self.client_id]['server'] = TCPConnection(self.client_id) 464 self.requested_address = data[3:] 465 port_offset = 0 466 if ord(data[3]) == 0x01: 467 port_offset = 8 468 self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7])) 469 elif ord(data[3]) == 0x03: 470 name_len = ord(data[4]) 471 if data_len >= 6 + name_len: 472 port_offset = 5 + name_len 473 self.hostname = data[5:5 + name_len] 474 elif ord(data[3]) == 0x04 and data_len >= 22: 475 port_offset = 20 476 self.ip = '' 477 for i in range(16): 478 self.ip += '{0:02x}'.format(ord(data[4 + i])) 479 if i % 2 and i < 15: 480 self.ip += ':' 481 if port_offset and connections[self.client_id]['server'] is not None: 482 self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1]) 483 if self.port: 484 if self.ip is None and self.hostname is not None: 485 if self.hostname in dns_cache: 486 self.state = self.STATE_CONNECTING 487 self.addresses = dns_cache[self.hostname] 488 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port}) 489 else: 490 self.state = self.STATE_RESOLVING 491 self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port}) 492 elif self.ip is not None: 493 self.state = self.STATE_CONNECTING 494 logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port)) 495 self.addresses = socket.getaddrinfo(self.ip, self.port) 496 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port}) 497 else: 498 return 499 except: 500 pass 501 502 def handle_close(self): 503 logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id)) 504 self.state = self.STATE_ERROR 505 self.close() 506 try: 507 if self.client_id in connections: 508 if 'client' in connections[self.client_id]: 509 del connections[self.client_id]['client'] 510 if 'server' in connections[self.client_id]: 511 self.SendMessage('closed', {}) 512 else: 513 del connections[self.client_id] 514 except: 515 pass 516 517 def HandleResolved(self, message): 518 global dns_cache 519 if self.state == self.STATE_RESOLVING: 520 if 'addresses' in message and len(message['addresses']): 521 self.state = self.STATE_CONNECTING 522 self.addresses = message['addresses'] 523 dns_cache[self.hostname] = self.addresses 524 logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname)) 525 self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port}) 526 else: 527 # Send host unreachable error 528 self.state = self.STATE_ERROR 529 self.buffer += chr(0x05) + chr(0x04) + self.requested_address 530 self.handle_write() 531 532 def HandleConnected(self, message): 533 if 'success' in message and self.state == self.STATE_CONNECTING: 534 response = chr(0x05) 535 if message['success']: 536 response += chr(0x00) 537 logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname)) 538 self.state = self.STATE_CONNECTED 539 else: 540 response += chr(0x04) 541 self.state = self.STATE_ERROR 542 response += chr(0x00) 543 response += self.requested_address 544 self.buffer += response 545 self.handle_write() 546 547 548######################################################################################################################## 549# stdin command processor 550######################################################################################################################## 551class CommandProcessor(): 552 def __init__(self): 553 thread = threading.Thread(target = self.run, args=()) 554 thread.daemon = True 555 thread.start() 556 557 def run(self): 558 global must_exit 559 while not must_exit: 560 for line in iter(sys.stdin.readline, ''): 561 self.ProcessCommand(line.strip()) 562 563 def ProcessCommand(self, input): 564 global in_pipe 565 global out_pipe 566 global needs_flush 567 global REMOVE_TCP_OVERHEAD 568 global port_mappings 569 global server 570 if len(input): 571 ok = False 572 try: 573 command = input.split() 574 if len(command) and len(command[0]): 575 if command[0].lower() == 'flush': 576 ok = True 577 elif command[0].lower() == 'set' and len(command) >= 3: 578 if command[1].lower() == 'rtt' and len(command[2]): 579 rtt = float(command[2]) 580 latency = rtt / 2000.0 581 in_pipe.latency = latency 582 out_pipe.latency = latency 583 ok = True 584 elif command[1].lower() == 'inkbps' and len(command[2]): 585 in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD 586 ok = True 587 elif command[1].lower() == 'outkbps' and len(command[2]): 588 out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD 589 ok = True 590 elif command[1].lower() == 'mapports' and len(command[2]): 591 SetPortMappings(command[2]) 592 ok = True 593 elif command[0].lower() == 'reset' and len(command) >= 2: 594 if command[1].lower() == 'rtt' or command[1].lower() == 'all': 595 in_pipe.latency = 0 596 out_pipe.latency = 0 597 ok = True 598 if command[1].lower() == 'inkbps' or command[1].lower() == 'all': 599 in_pipe.kbps = 0 600 ok = True 601 if command[1].lower() == 'outkbps' or command[1].lower() == 'all': 602 out_pipe.kbps = 0 603 ok = True 604 if command[1].lower() == 'mapports' or command[1].lower() == 'all': 605 port_mappings = {} 606 ok = True 607 608 if ok: 609 needs_flush = True 610 except: 611 pass 612 if not ok: 613 PrintMessage('ERROR') 614 # open and close a local socket which will interrupt the long polling loop to process the flush 615 if needs_flush: 616 s = socket.socket() 617 s.connect((server.ipaddr, server.port)) 618 s.close() 619 620 621######################################################################################################################## 622# Main Entry Point 623######################################################################################################################## 624def main(): 625 global server 626 global options 627 global in_pipe 628 global out_pipe 629 global dest_addresses 630 global port_mappings 631 global map_localhost 632 import argparse 633 global REMOVE_TCP_OVERHEAD 634 parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.', 635 prog='tsproxy') 636 parser.add_argument('-v', '--verbose', action='count', help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.") 637 parser.add_argument('--logfile', help="Write log messages to given file instead of stdout.") 638 parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).") 639 parser.add_argument('-p', '--port', type=int, default=1080, help="Server port (defaults to 1080, use 0 for randomly assigned).") 640 parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).") 641 parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).") 642 parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).") 643 parser.add_argument('-w', '--window', type=int, default=10, help="Emulated TCP initial congestion window (defaults to 10).") 644 parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.") 645 parser.add_argument('-m', '--mapports', help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'") 646 parser.add_argument('-l', '--localhost', action='store_true', default=False, 647 help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.") 648 options = parser.parse_args() 649 650 # Set up logging 651 log_level = logging.CRITICAL 652 if options.verbose == 1: 653 log_level = logging.ERROR 654 elif options.verbose == 2: 655 log_level = logging.WARNING 656 elif options.verbose == 3: 657 log_level = logging.INFO 658 elif options.verbose >= 4: 659 log_level = logging.DEBUG 660 if options.logfile is not None: 661 logging.basicConfig(filename=options.logfile, level=log_level, 662 format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S") 663 else: 664 logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S") 665 666 # Parse any port mappings 667 if options.mapports: 668 SetPortMappings(options.mapports) 669 670 map_localhost = options.localhost 671 672 # Resolve the address for a rewrite destination host if one was specified 673 if options.desthost: 674 logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80))) 675 dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80)) 676 677 # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1000 to convert to seconds) 678 in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE_TCP_OVERHEAD) 679 out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REMOVE_TCP_OVERHEAD) 680 681 signal.signal(signal.SIGINT, signal_handler) 682 server = Socks5Server(options.bind, options.port) 683 command_processor = CommandProcessor() 684 PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port)) 685 run_loop() 686 687def signal_handler(signal, frame): 688 global server 689 global must_exit 690 logging.error('Exiting...') 691 must_exit = True 692 del server 693 694 695# Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms 696def run_loop(): 697 global must_exit 698 global in_pipe 699 global out_pipe 700 global needs_flush 701 global flush_pipes 702 global last_activity 703 winmm = None 704 705 # increase the windows timer resolution to 1ms 706 if platform.system() == "Windows": 707 try: 708 import ctypes 709 winmm = ctypes.WinDLL('winmm') 710 winmm.timeBeginPeriod(1) 711 except: 712 pass 713 714 last_activity = time.clock() 715 last_check = time.clock() 716 # disable gc to avoid pauses during traffic shaping/proxying 717 gc.disable() 718 while not must_exit: 719 # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise 720 lock.acquire() 721 tick_interval = 0.001 722 if background_activity_count == 0: 723 if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty(): 724 tick_interval = 1.0 725 elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0: 726 tick_interval = 1.0 727 lock.release() 728 asyncore.poll(tick_interval, asyncore.socket_map) 729 if needs_flush: 730 flush_pipes = True 731 needs_flush = False 732 out_pipe.tick() 733 in_pipe.tick() 734 if flush_pipes: 735 PrintMessage('OK') 736 flush_pipes = False 737 # Every 500 ms check to see if it is a good time to do a gc 738 now = time.clock() 739 if now - last_check > 0.5: 740 last_check = now 741 # manually gc after 5 seconds of idle 742 if now - last_activity >= 5: 743 last_activity = now 744 logging.debug("Triggering manual GC") 745 gc.collect() 746 747 if winmm is not None: 748 winmm.timeEndPeriod(1) 749 750def GetDestPort(port): 751 global port_mappings 752 if port_mappings is not None: 753 src_port = str(port) 754 if src_port in port_mappings: 755 return port_mappings[src_port] 756 elif 'default' in port_mappings: 757 return port_mappings['default'] 758 return port 759 760 761def SetPortMappings(map_string): 762 global port_mappings 763 port_mappings = {} 764 map_string = map_string.strip('\'" \t\r\n') 765 for pair in map_string.split(','): 766 (src, dest) = pair.split(':') 767 if src == '*': 768 port_mappings['default'] = int(dest) 769 logging.debug("Default port mapped to port {0}".format(dest)) 770 else: 771 logging.debug("Port {0} mapped to port {1}".format(src, dest)) 772 port_mappings[src] = int(dest) 773 774 775if '__main__' == __name__: 776 main() 777