1#!/usr/bin/python
2"""
3Copyright 2016 Google Inc. All Rights Reserved.
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16"""
17import asyncore
18import gc
19import logging
20import platform
21import Queue
22import re
23import signal
24import socket
25import sys
26import threading
27import time
28
# Socks5Server instance accepting client connections; assigned in main().
server = None
# TSPipe that delivers messages to the 'client' (SOCKS) side of a connection.
in_pipe = None
# TSPipe that delivers messages to the 'server' (outbound TCP) side of a connection.
out_pipe = None
# Set by signal_handler(); run_loop() and the command thread stop when it is True.
must_exit = False
# argparse namespace produced by main().
options = None
# getaddrinfo() results for --desthost, or None when no redirect host was given.
dest_addresses = None
# client_id -> {'client': Socks5Connection, 'server': TCPConnection or None}.
connections = {}
# hostname -> addresses from a successful lookup (filled in Socks5Connection.HandleResolved).
dns_cache = {}
# Source port (as a string, or 'default') -> destination port; None until mappings are configured.
port_mappings = None
# Value of --localhost: also apply host/port remapping to connections aimed at localhost.
map_localhost = False
# Set by the command thread after a successful command; run_loop() converts it to flush_pipes.
needs_flush = False
# While True, TSPipe.tick() delivers queued messages regardless of latency/bandwidth.
flush_pipes = False
# time.clock() value of the latest TSPipe.SendPeerMessage() call; run_loop() uses it to
# decide when the proxy has been idle long enough for a manual garbage collection.
last_activity = None
# Factor applied to configured kbps values (1460 payload bytes per 1500-byte frame).
REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
# Held around updates and reads of background_activity_count.
lock = threading.Lock()
# Number of AsyncDNS lookups that have been started and have not yet reported back.
background_activity_count = 0
45
46
def PrintMessage(msg):
  """Write msg followed by a newline to stdout and flush it immediately.

  The flush makes sure the message is not buffered when tsproxy is run as a
  subprocess.

  Args:
    msg: Object to print; it is converted with the usual %s formatting.
  """
  # Emit the text and its newline in ONE write() call. The print statement
  # issues them as two separate writes, which lets lines printed from the
  # command thread ('ERROR') and the main loop ('OK') interleave.
  sys.stdout.write('%s\n' % (msg,))
  sys.stdout.flush()
52
53########################################################################################################################
#   Traffic-shaping pipe (delays messages by a latency and throttles them to a bandwidth limit)
55########################################################################################################################
class TSPipe(object):
  """One direction of the traffic-shaping pipeline.

  Messages are dicts carrying at least 'message' and 'connection' keys.
  SendMessage() stamps each one with the time at which it becomes deliverable
  ('time') and its payload size ('size'); tick() later hands deliverable
  messages, in FIFO order, to the peer object registered for that connection
  by calling its handle_message().
  """
  PIPE_IN = 0
  PIPE_OUT = 1

  def __init__(self, direction, latency, kbps):
    """Create a pipe.

    Args:
      direction: PIPE_IN to deliver to the 'client' peer, anything else
        (normally PIPE_OUT) to deliver to the 'server' peer.
      latency: Delay in seconds added to every message except 'closed'.
      kbps: Bandwidth limit in kilobits per second; 0 disables throttling.
    """
    self.direction = direction
    self.latency = latency
    self.kbps = kbps
    self.queue = Queue.Queue()
    self.last_tick = time.clock()
    # Head-of-line message that was dequeued but is not yet deliverable.
    self.next_message = None
    self.available_bytes = .0
    self.peer = 'client' if self.direction == self.PIPE_IN else 'server'

  def SendMessage(self, message, main_thread = True):
    """Timestamp a message and either deliver it right away or queue it.

    Args:
      message: Message dict; 'time' and 'size' are added to it.
      main_thread: True when called from the thread running the event loop.
        Only then may the message be delivered directly to the peer.
    """
    global connections, in_pipe, out_pipe
    now = time.clock()
    # 'closed' notifications are not held back by the latency
    if message['message'] == 'closed':
      message['time'] = now
    else:
      message['time'] = now + self.latency
    message['size'] = float(len(message['data'])) if 'data' in message else .0

    message_sent = False
    try:
      connection_id = message['connection']
      # Send messages directly, bypassing the queue, if throttling is disabled and we are on the main thread
      if main_thread and self.latency == 0 and self.kbps == .0 and \
          connection_id in connections and self.peer in connections[connection_id]:
        message_sent = self.SendPeerMessage(message)
    except Exception:
      # Best effort: fall back to queueing the message below.
      logging.debug('Direct delivery failed, queueing message', exc_info=True)
    if not message_sent:
      self.queue.put(message)

  def SendPeerMessage(self, message):
    """Deliver a message to the peer of its connection.

    If the peer's handle_message() raises, both ends of the connection are
    closed (best effort) and the connection is forgotten.

    Returns:
      True if the peer handled the message, False otherwise.
    """
    global last_activity
    last_activity = time.clock()
    connection_id = message['connection']
    endpoints = connections.get(connection_id)
    if endpoints is None or self.peer not in endpoints:
      return False
    try:
      endpoints[self.peer].handle_message(message)
      return True
    except Exception:
      # Clean up any disconnected connections
      for side in ('server', 'client'):
        try:
          endpoints[side].close()
        except Exception:
          pass
      # The handler may already have removed the entry itself, so a plain
      # `del` could raise KeyError here and escape to the caller.
      connections.pop(connection_id, None)
    return False

  def _NextQueued(self):
    """Return the next queued message, or None when the queue is empty."""
    try:
      return self.queue.get_nowait()
    except Queue.Empty:
      return None

  def tick(self):
    """Deliver every queued message that is currently sendable.

    A message is sendable when flush_pipes is set, or when its delivery time
    has passed and (if a bandwidth limit is active) enough bytes have
    accumulated to cover its size.

    Returns:
      True if at least one message was taken off the pipe.
    """
    global connections
    global flush_pipes
    processed_messages = False
    now = time.clock()
    if self.next_message is None:
      self.next_message = self._NextQueued()

    # Accumulate bandwidth if an available packet/message was waiting since our last tick
    if self.next_message is not None and self.kbps > .0 and self.next_message['time'] <= now:
      elapsed = now - self.last_tick
      self.available_bytes += elapsed * self.kbps * 1000.0 / 8.0

    # process messages as long as the next message is sendable (latency or available bytes)
    while (self.next_message is not None) and\
        (flush_pipes or ((self.next_message['time'] <= now) and
                        (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))):
      message = self.next_message
      # Detach the message before delivering it so that a failing delivery
      # can never leave it stuck at the head of the pipe.
      self.next_message = None
      self.queue.task_done()
      processed_messages = True
      if self.kbps > .0:
        self.available_bytes -= message['size']
      try:
        self.SendPeerMessage(message)
      except Exception:
        logging.debug('Dropping undeliverable message', exc_info=True)
      self.next_message = self._NextQueued()

    # Only accumulate bytes while we have messages that are ready to send
    if self.next_message is None or self.next_message['time'] > now:
      self.available_bytes = .0
    self.last_tick = now

    return processed_messages
154
155
156########################################################################################################################
157#   Threaded DNS resolver
158########################################################################################################################
class AsyncDNS(threading.Thread):
  """Worker thread that resolves one hostname and reports the result.

  The outcome is posted to result_pipe as a 'resolved' message whose
  'addresses' entry holds the getaddrinfo() results, or an empty tuple when
  the lookup failed.
  """

  def __init__(self, client_id, hostname, port, result_pipe):
    """Remember what to resolve.

    Args:
      client_id: Connection id the lookup belongs to.
      hostname: Host name to resolve.
      port: Port passed to getaddrinfo() together with the host name.
      result_pipe: Object whose SendMessage(message, main_thread) receives
        the result.
    """
    threading.Thread.__init__(self)
    self.hostname = hostname
    self.port = port
    self.client_id = client_id
    self.result_pipe = result_pipe

  def run(self):
    """Resolve the host name, post the result and wake up the event loop."""
    global lock, background_activity_count
    try:
      logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
      addresses = socket.getaddrinfo(self.hostname, self.port)
      logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
    except Exception:
      addresses = ()
      logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
    message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
    # False: we are not on the event-loop thread, so the message must be queued
    self.result_pipe.SendMessage(message, False)
    with lock:
      if background_activity_count > 0:
        background_activity_count -= 1
    # open and close a local socket which will interrupt the long polling loop to process the message
    wakeup = socket.socket()
    try:
      wakeup.connect((server.ipaddr, server.port))
    except Exception:
      # The wake-up is only an optimisation: the message is already queued
      # and is picked up when the current poll times out.
      logging.debug('[{0:d}] Unable to wake up the polling loop'.format(self.client_id), exc_info=True)
    finally:
      wakeup.close()
186
187
188########################################################################################################################
189#   TCP Client
190########################################################################################################################
class TCPConnection(asyncore.dispatcher):
  """Outbound (destination-facing) half of a proxied connection.

  It is driven by messages from the SOCKS side ('resolve', 'connect', 'data',
  'closed') delivered through handle_message(), and reports 'resolved',
  'connected', 'data' and 'closed' events back through in_pipe.
  """
  STATE_ERROR = -1
  STATE_IDLE = 0
  STATE_RESOLVING = 1
  STATE_CONNECTING = 2
  STATE_CONNECTED = 3

  def __init__(self, client_id):
    """Create an idle connection (no socket yet) for the given connection id."""
    asyncore.dispatcher.__init__(self)
    self.client_id = client_id
    self.state = self.STATE_IDLE
    # Data received from the SOCKS side that still has to be sent out
    self.buffer = ''
    self.addr = None
    self.dns_thread = None
    self.hostname = None
    self.port = None
    # Socket options are applied on the first handle_write()
    self.needs_config = True
    # Close was requested while data was still buffered
    self.needs_close = False
    self.is_localhost = False
    self.did_resolve = False

  def SendMessage(self, type, message):
    """Tag message with its type and connection id and post it to in_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    in_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process a message coming from the SOCKS side of the connection."""
    kind = message['message']
    if kind == 'data' and 'data' in message and len(message['data']):
      self.buffer += message['data']
      if self.state == self.STATE_CONNECTED:
        self.handle_write()
    elif kind == 'resolve':
      self.HandleResolve(message)
    elif kind == 'connect':
      self.HandleConnect(message)
    elif kind == 'closed':
      if len(self.buffer) == 0:
        self.handle_close()
      else:
        # Flush what is still buffered first; handle_write() closes afterwards
        self.needs_close = True

  def handle_error(self):
    """Log the error and report a failed connect if one was in progress."""
    logging.warning('[{0:d}] Error'.format(self.client_id))
    if self.state == self.STATE_CONNECTING:
      self.SendMessage('connected', {'success': False, 'address': self.addr})

  def handle_close(self):
    """Close the socket and tell the SOCKS side, or forget the connection."""
    logging.info('[{0:d}] Server Connection Closed'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    try:
      if self.client_id in connections:
        if 'server' in connections[self.client_id]:
          del connections[self.client_id]['server']
        if 'client' in connections[self.client_id]:
          self.SendMessage('closed', {})
        else:
          del connections[self.client_id]
    except Exception:
      pass

  def handle_connect(self):
    """Report the established connection and start sending buffered data."""
    if self.state == self.STATE_CONNECTING:
      self.state = self.STATE_CONNECTED
      self.SendMessage('connected', {'success': True, 'address': self.addr})
      logging.info('[{0:d}] Connected'.format(self.client_id))
    self.handle_write()

  def writable(self):
    """Ask for write events while connecting or while data is buffered."""
    if self.state == self.STATE_CONNECTING:
      return True
    return len(self.buffer) > 0

  def handle_write(self):
    """Send as much buffered data as the socket accepts."""
    if self.needs_config:
      self.needs_config = False
      self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
      self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024)
      self.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024)
    if len(self.buffer) > 0:
      sent = self.send(self.buffer)
      logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
      self.buffer = self.buffer[sent:]
      if self.needs_close and len(self.buffer) == 0:
        self.needs_close = False
        self.handle_close()

  def handle_read(self):
    """Drain the socket, forwarding the data in packet-sized chunks."""
    try:
      while True:
        data = self.recv(1460)
        if not data:
          return
        # Data arriving in any other state is dropped
        if self.state == self.STATE_CONNECTED:
          logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
          self.SendMessage('data', {'data': data})
    except socket.error:
      # The non-blocking recv() raises once no more data is pending, which
      # is how the loop above normally ends.
      pass

  def HandleResolve(self, message):
    """Resolve the requested host, using the --desthost mapping when it applies."""
    global in_pipe,  map_localhost, lock, background_activity_count
    self.did_resolve = True
    if 'hostname' in message:
      self.hostname = message['hostname']
    self.port = 0
    if 'port' in message:
      self.port = message['port']
    logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
    if self.hostname == 'localhost':
      self.hostname = '127.0.0.1'
    if self.hostname == '127.0.0.1':
      logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
      self.is_localhost = True
    if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
      logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses))
      self.SendMessage('resolved', {'addresses': dest_addresses})
    else:
      with lock:
        background_activity_count += 1
      self.state = self.STATE_RESOLVING
      self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
      self.dns_thread.start()

  def HandleConnect(self, message):
    """Start connecting to the first address listed in a 'connect' message.

    message['addresses'] holds getaddrinfo() style entries
    (family, socktype, proto, canonname, sockaddr).
    """
    global map_localhost
    if 'addresses' in message and len(message['addresses']):
      self.state = self.STATE_CONNECTING
      requested = message['addresses'][0]
      # The host lives in the sockaddr part of the entry. Comparing the whole
      # entry to the string could never match, so direct connections to
      # 127.0.0.1 were not recognised as localhost.
      if not self.did_resolve and requested[4][0] == '127.0.0.1':
        logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
        self.is_localhost = True
      if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
        self.addr = dest_addresses[0]
      else:
        self.addr = requested
      self.create_socket(self.addr[0], socket.SOCK_STREAM)
      addr = self.addr[4][0]
      if not self.is_localhost or map_localhost:
        port = GetDestPort(message['port'])
      else:
        port = message['port']
      logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port))
      self.connect((addr, port))
336
337
338########################################################################################################################
339#   Socks5 Server
340########################################################################################################################
class Socks5Server(asyncore.dispatcher):
  """Listening socket that creates a Socks5Connection for every client."""

  def __init__(self, host, port):
    """Bind and listen on host:port, or report the failure and exit.

    Args:
      host: Interface address to bind to.
      port: Port to listen on; the port actually bound is stored in
        self.port (together with the address in self.ipaddr).
    """
    asyncore.dispatcher.__init__(self)
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      #self.set_reuse_addr()
      self.bind((host, port))
      self.listen(socket.SOMAXCONN)
      self.ipaddr, self.port = self.getsockname()
      self.current_client_id = 0
    except Exception:
      PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
      # sys.exit() instead of the exit() helper, which is installed by the
      # site module for interactive use and is not guaranteed to exist.
      sys.exit(1)

  def handle_accept(self):
    """Accept a pending client and register it under a fresh connection id."""
    global connections
    pair = self.accept()
    if pair is not None:
      sock, addr = pair
      self.current_client_id += 1
      logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr)))
      # The outbound side is created later, when the connect request arrives
      connections[self.current_client_id] = {
        'client' : Socks5Connection(sock, self.current_client_id),
        'server' : None
      }
367
368
369# Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5
class Socks5Connection(asyncore.dispatcher):
  """Client-facing half of a proxied connection, speaking SOCKS5.

  Handles the method negotiation and the connect request, then relays data.
  Events for the outbound side are posted through out_pipe; events from the
  outbound side arrive through handle_message().
  """
  STATE_ERROR = -1
  STATE_WAITING_FOR_HANDSHAKE = 0
  STATE_WAITING_FOR_CONNECT_REQUEST = 1
  STATE_RESOLVING = 2
  STATE_CONNECTING = 3
  STATE_CONNECTED = 4

  def __init__(self, connected_socket, client_id):
    """Wrap an accepted client socket.

    Args:
      connected_socket: Socket returned by accept().
      client_id: Connection id used as key in the connections table.
    """
    asyncore.dispatcher.__init__(self, connected_socket)
    self.client_id = client_id
    self.state = self.STATE_WAITING_FOR_HANDSHAKE
    self.ip = None
    self.addresses = None
    self.hostname = None
    self.port = None
    # ATYP + address + port exactly as sent by the client; echoed in replies
    self.requested_address = None
    # Data that still has to be written to the client
    self.buffer = ''
    self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024)
    self.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024)
    # Close was requested while data was still buffered
    self.needs_close = False

  def SendMessage(self, type, message):
    """Tag message with its type and connection id and post it to out_pipe."""
    message['message'] = type
    message['connection'] = self.client_id
    out_pipe.SendMessage(message)

  def handle_message(self, message):
    """Process a message coming from the outbound side of the connection."""
    kind = message['message']
    if kind == 'data' and 'data' in message and len(message['data']) > 0:
      self.buffer += message['data']
      if self.state == self.STATE_CONNECTED:
        self.handle_write()
    elif kind == 'resolved':
      self.HandleResolved(message)
    elif kind == 'connected':
      self.HandleConnected(message)
      self.handle_write()
    elif kind == 'closed':
      if len(self.buffer) == 0:
        logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id))
        self.handle_close()
      else:
        logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id))
        self.needs_close = True

  def writable(self):
    """Ask for write events only while data is buffered."""
    return len(self.buffer) > 0

  def handle_write(self):
    """Send as much buffered data as the socket accepts."""
    if len(self.buffer) > 0:
      sent = self.send(self.buffer)
      logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
      self.buffer = self.buffer[sent:]
      if self.needs_close and len(self.buffer) == 0:
        logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id))
        self.needs_close = False
        self.handle_close()

  def handle_read(self):
    """Drain the socket and feed each chunk to the current protocol state."""
    try:
      while True:
        # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
        data = self.recv(1460)
        if not data:
          return
        if self.state == self.STATE_CONNECTED:
          logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, len(data)))
          self.SendMessage('data', {'data': data})
        elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
          self._HandleHandshake(data)
        elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
          self._HandleConnectRequest(data)
    except Exception:
      # Ends the loop once the non-blocking recv() has nothing left, and
      # deliberately ignores malformed requests (the state stays at ERROR).
      pass

  def _HandleHandshake(self, data):
    """Answer the method negotiation if the client offers "no authentication"."""
    self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
    data_len = len(data)
    if data_len >= 2 and ord(data[0]) == 0x05:
      auth_count = ord(data[1])
      # The message must consist of exactly the announced number of methods
      supports_no_auth = (data_len == auth_count + 2) and (chr(0x00) in data[2:])
      if supports_no_auth:
        # Respond with a message that "No Authentication" was agreed to
        logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
        response = chr(0x05) + chr(0x00)
        self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
        self.buffer += response
        self.handle_write()

  def _HandleConnectRequest(self, data):
    """Parse a SOCKS5 request (VER CMD RSV ATYP ADDR PORT) and start resolving or connecting."""
    global connections
    global dns_cache
    self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
    data_len = len(data)
    # Need the 4 header bytes plus at least the first address byte; each
    # address type checks its own full length below. (A blanket 10-byte
    # minimum would reject valid requests for 1-2 character host names.)
    if data_len < 5 or ord(data[0]) != 0x05 or ord(data[2]) != 0x00:
      return
    if ord(data[1]) == 0x01: #TCP connection (only supported method for now)
      connections[self.client_id]['server'] = TCPConnection(self.client_id)
    self.requested_address = data[3:]
    address_type = ord(data[3])
    port_offset = 0
    if address_type == 0x01:
      # IPv4: 4 address bytes + 2 port bytes
      if data_len >= 10:
        port_offset = 8
        self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7]))
    elif address_type == 0x03:
      # Domain name: length byte + name + 2 port bytes
      name_len = ord(data[4])
      if data_len >= 7 + name_len:
        port_offset = 5 + name_len
        self.hostname = data[5:5 + name_len]
    elif address_type == 0x04 and data_len >= 22:
      # IPv6: 16 address bytes + 2 port bytes, formatted as 8 hex groups
      port_offset = 20
      groups = ['{0:02x}{1:02x}'.format(ord(data[4 + i]), ord(data[5 + i])) for i in range(0, 16, 2)]
      self.ip = ':'.join(groups)
    if port_offset and connections[self.client_id]['server'] is not None:
      self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1])
      if self.port:
        if self.ip is None and self.hostname is not None:
          if self.hostname in dns_cache:
            self.state = self.STATE_CONNECTING
            self.addresses = dns_cache[self.hostname]
            self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
          else:
            self.state = self.STATE_RESOLVING
            self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
        elif self.ip is not None:
          self.state = self.STATE_CONNECTING
          logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port))
          self.addresses = socket.getaddrinfo(self.ip, self.port)
          self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})

  def handle_close(self):
    """Close the socket and tell the outbound side, or forget the connection."""
    logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id))
    self.state = self.STATE_ERROR
    self.close()
    try:
      if self.client_id in connections:
        if 'client' in connections[self.client_id]:
          del connections[self.client_id]['client']
        if 'server' in connections[self.client_id]:
          self.SendMessage('closed', {})
        else:
          del connections[self.client_id]
    except Exception:
      pass

  def HandleResolved(self, message):
    """Continue with the connect once the host name lookup has finished."""
    global dns_cache
    if self.state == self.STATE_RESOLVING:
      if 'addresses' in message and len(message['addresses']):
        self.state = self.STATE_CONNECTING
        self.addresses = message['addresses']
        dns_cache[self.hostname] = self.addresses
        logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
        self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
      else:
        # Send host unreachable error (VER, REP, RSV, then the address).
        # The reserved byte is required between the status and the address.
        self.state = self.STATE_ERROR
        self.buffer += chr(0x05) + chr(0x04) + chr(0x00) + self.requested_address
        self.handle_write()

  def HandleConnected(self, message):
    """Send the SOCKS5 reply for a finished connection attempt."""
    if 'success' in message and self.state == self.STATE_CONNECTING:
      response = chr(0x05)
      if message['success']:
        response += chr(0x00)
        logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname))
        self.state = self.STATE_CONNECTED
      else:
        response += chr(0x04)
        self.state = self.STATE_ERROR
      response += chr(0x00)
      response += self.requested_address
      self.buffer += response
      self.handle_write()
546
547
548########################################################################################################################
549#   stdin command processor
550########################################################################################################################
class CommandProcessor(object):
  """Reads commands from stdin on a daemon thread and applies them.

  Supported commands: 'flush', 'set rtt|inkbps|outkbps|mapports <value>' and
  'reset rtt|inkbps|outkbps|mapports|all'. A rejected command prints 'ERROR';
  an accepted one requests a flush of the pipes (the main loop prints 'OK'
  after performing it).
  """

  def __init__(self):
    """Start the background thread that reads stdin."""
    thread = threading.Thread(target = self.run, args=())
    thread.daemon = True
    thread.start()

  def run(self):
    """Process stdin line by line until EOF or until exit is requested."""
    global must_exit
    while not must_exit:
      line = sys.stdin.readline()
      if not line:
        # readline() returns '' at EOF and keeps doing so; stop here instead
        # of spinning in a tight loop for the rest of the process lifetime.
        break
      self.ProcessCommand(line.strip())

  def ProcessCommand(self, input):
    """Execute one command line.

    Args:
      input: The command text with surrounding whitespace removed. An empty
        string is ignored.
    """
    global in_pipe
    global out_pipe
    global needs_flush
    global REMOVE_TCP_OVERHEAD
    global port_mappings
    global server
    if len(input):
      ok = False
      try:
        command = input.split()
        if command:
          verb = command[0].lower()
          if verb == 'flush':
            ok = True
          elif verb == 'set' and len(command) >= 3:
            ok = self._Set(command[1].lower(), command[2])
          elif verb == 'reset' and len(command) >= 2:
            ok = self._Reset(command[1].lower())

          if ok:
            needs_flush = True
      except Exception:
        # Invalid values (e.g. a non-numeric rtt) are reported as ERROR below
        pass
      if not ok:
        PrintMessage('ERROR')
      # open and close a local socket which will interrupt the long polling loop to process the flush
      if needs_flush:
        try:
          wakeup = socket.socket()
          try:
            wakeup.connect((server.ipaddr, server.port))
          finally:
            wakeup.close()
        except Exception:
          # The wake-up is only an optimisation: the flush is still performed
          # when the current poll times out. Failing here must not kill the
          # command thread.
          logging.debug('Unable to wake up the polling loop', exc_info=True)

  def _Set(self, name, value):
    """Apply a 'set <name> <value>' command; return False for an unknown name."""
    if name == 'rtt':
      # Half of the round trip time per direction, converted from ms to seconds
      latency = float(value) / 2000.0
      in_pipe.latency = latency
      out_pipe.latency = latency
    elif name == 'inkbps':
      in_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
    elif name == 'outkbps':
      out_pipe.kbps = float(value) * REMOVE_TCP_OVERHEAD
    elif name == 'mapports':
      SetPortMappings(value)
    else:
      return False
    return True

  def _Reset(self, name):
    """Apply a 'reset <name>' command; return False for an unknown name."""
    global port_mappings
    everything = (name == 'all')
    ok = False
    if everything or name == 'rtt':
      in_pipe.latency = 0
      out_pipe.latency = 0
      ok = True
    if everything or name == 'inkbps':
      in_pipe.kbps = 0
      ok = True
    if everything or name == 'outkbps':
      out_pipe.kbps = 0
      ok = True
    if everything or name == 'mapports':
      port_mappings = {}
      ok = True
    return ok
619
620
621########################################################################################################################
622#   Main Entry Point
623########################################################################################################################
def main():
  """Parse the command line, set up logging and shaping, then run the proxy.

  Fills in the module-level configuration (options, port mappings, redirect
  addresses, the two pipes and the server) and blocks in run_loop() until an
  exit is requested.
  """
  global server
  global options
  global in_pipe
  global out_pipe
  global dest_addresses
  global port_mappings
  global map_localhost
  global REMOVE_TCP_OVERHEAD
  import argparse
  parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.',
                                   prog='tsproxy')
  parser.add_argument('-v', '--verbose', action='count', help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.")
  parser.add_argument('--logfile', help="Write log messages to given file instead of stdout.")
  parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).")
  parser.add_argument('-p', '--port', type=int, default=1080, help="Server port (defaults to 1080, use 0 for randomly assigned).")
  parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).")
  parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).")
  parser.add_argument('-w', '--window', type=int, default=10, help="Emulated TCP initial congestion window (defaults to 10).")
  parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.")
  parser.add_argument('-m', '--mapports', help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'")
  parser.add_argument('-l', '--localhost', action='store_true', default=False,
                      help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.")
  options = parser.parse_args()

  # Set up logging
  # The count action leaves options.verbose at None when -v is not given, so
  # normalise it instead of comparing None with integers.
  verbosity = options.verbose or 0
  levels = (logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG)
  log_level = levels[min(verbosity, len(levels) - 1)]
  log_config = {
    'level': log_level,
    'format': "%(asctime)s.%(msecs)03d - %(message)s",
    'datefmt': "%H:%M:%S"
  }
  if options.logfile is not None:
    log_config['filename'] = options.logfile
  logging.basicConfig(**log_config)

  # Parse any port mappings
  if options.mapports:
    SetPortMappings(options.mapports)

  map_localhost = options.localhost

  # Resolve the address for a rewrite destination host if one was specified
  if options.desthost:
    logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80)))
    dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))

  # Set up the pipes.  1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
  one_way_latency = options.rtt / 2000.0
  in_pipe = TSPipe(TSPipe.PIPE_IN, one_way_latency, options.inkbps * REMOVE_TCP_OVERHEAD)
  out_pipe = TSPipe(TSPipe.PIPE_OUT, one_way_latency, options.outkbps * REMOVE_TCP_OVERHEAD)

  signal.signal(signal.SIGINT, signal_handler)
  server = Socks5Server(options.bind, options.port)
  # Constructing the processor starts its stdin reader thread
  command_processor = CommandProcessor()
  PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port))
  run_loop()
686
def signal_handler(signal, frame):
  """SIGINT handler: ask the main loop to exit.

  Args:
    signal: Number of the signal that was received (unused).
    frame: Stack frame that was interrupted (unused).
  """
  global server
  global must_exit
  logging.error('Exiting...')
  must_exit = True
  # Reset to the "no server" value instead of `del server`: deleting the
  # global raises NameError when the handler runs a second time (e.g. Ctrl-C
  # pressed twice before the loop has finished its current poll).
  server = None
693
694
695# Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
def run_loop():
  """Run the event loop until must_exit is set.

  Each iteration polls the asyncore sockets, ticks both pipes and, when a
  flush was requested, flushes them and prints 'OK'. Automatic garbage
  collection is switched off while the loop runs; a manual collection is
  triggered after 5 seconds without pipe activity.
  """
  global must_exit
  global in_pipe
  global out_pipe
  global needs_flush
  global flush_pipes
  global last_activity
  winmm = None

  # increase the windows timer resolution to 1ms
  if platform.system() == "Windows":
    try:
      import ctypes
      winmm = ctypes.WinDLL('winmm')
      winmm.timeBeginPeriod(1)
    except Exception:
      pass

  last_activity = time.clock()
  last_check = time.clock()
  # disable gc to avoid pauses during traffic shaping/proxying
  gc_was_enabled = gc.isenabled()
  gc.disable()
  try:
    while not must_exit:
      # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
      with lock:
        tick_interval = 0.001
        if background_activity_count == 0:
          pipes_empty = in_pipe.next_message is None and in_pipe.queue.empty() and \
                        out_pipe.next_message is None and out_pipe.queue.empty()
          if pipes_empty or (in_pipe.kbps == .0 and in_pipe.latency == 0 and
                             out_pipe.kbps == .0 and out_pipe.latency == 0):
            tick_interval = 1.0
      asyncore.poll(tick_interval, asyncore.socket_map)
      if needs_flush:
        flush_pipes = True
        needs_flush = False
      out_pipe.tick()
      in_pipe.tick()
      if flush_pipes:
        PrintMessage('OK')
        flush_pipes = False
      # Every 500 ms check to see if it is a good time to do a gc
      now = time.clock()
      if now - last_check > 0.5:
        last_check = now
        # manually gc after 5 seconds of idle
        if now - last_activity >= 5:
          last_activity = now
          logging.debug("Triggering manual GC")
          gc.collect()
  finally:
    # Undo the process-wide changes even when the loop ends with an exception
    if gc_was_enabled:
      gc.enable()
    if winmm is not None:
      winmm.timeEndPeriod(1)
749
def GetDestPort(port):
  """Translate an outbound port through the configured port mappings.

  Args:
    port: The port the client asked for.

  Returns:
    The port mapped explicitly to it, else the 'default' mapping if one
    exists, else the port unchanged (also when no mappings are configured).
  """
  global port_mappings
  mappings = port_mappings
  if mappings is None:
    return port
  # Mapping keys are the source ports in string form
  key = str(port)
  if key in mappings:
    return mappings[key]
  return mappings.get('default', port)
759
760
def SetPortMappings(map_string):
  """Replace the port mappings with the ones described by map_string.

  Args:
    map_string: Comma-separated list of original:new pairs, with * as the
      original to set the default, e.g. '443:8443,*:8080'. Surrounding
      quotes and whitespace are ignored.

  Raises:
    ValueError: If an entry is not of the form original:new or the new port
      is not an integer. The existing mappings are left untouched then.
  """
  global port_mappings
  # Build the table on the side and publish it only once the whole string
  # has parsed, so a bad string cannot leave the mappings cleared or
  # half-updated.
  new_mappings = {}
  map_string = map_string.strip('\'" \t\r\n')
  for pair in map_string.split(','):
    (src, dest) = pair.split(':')
    src = src.strip()
    dest_port = int(dest)
    if src == '*':
      new_mappings['default'] = dest_port
      logging.debug("Default port mapped to port {0}".format(dest_port))
    else:
      logging.debug("Port {0} mapped to port {1}".format(src, dest_port))
      new_mappings[src] = dest_port
  port_mappings = new_mappings
773
774
# Start the proxy only when this file is executed as a script.
if __name__ == '__main__':
  main()
777