1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.server.handler;
20 
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketException;
24 import java.net.SocketTimeoutException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.SocketChannel;
28 import java.util.Arrays;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.TimeUnit;
33 import javax.servlet.ServletException;
34 import javax.servlet.http.HttpServletRequest;
35 import javax.servlet.http.HttpServletResponse;
36 
37 import org.eclipse.jetty.http.HttpMethods;
38 import org.eclipse.jetty.http.HttpParser;
39 import org.eclipse.jetty.io.AsyncEndPoint;
40 import org.eclipse.jetty.io.Buffer;
41 import org.eclipse.jetty.io.ConnectedEndPoint;
42 import org.eclipse.jetty.io.Connection;
43 import org.eclipse.jetty.io.EndPoint;
44 import org.eclipse.jetty.io.nio.AsyncConnection;
45 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
46 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
47 import org.eclipse.jetty.io.nio.SelectorManager;
48 import org.eclipse.jetty.server.AbstractHttpConnection;
49 import org.eclipse.jetty.server.Handler;
50 import org.eclipse.jetty.server.Request;
51 import org.eclipse.jetty.server.Server;
52 import org.eclipse.jetty.util.HostMap;
53 import org.eclipse.jetty.util.TypeUtil;
54 import org.eclipse.jetty.util.component.LifeCycle;
55 import org.eclipse.jetty.util.log.Log;
56 import org.eclipse.jetty.util.log.Logger;
57 import org.eclipse.jetty.util.thread.ThreadPool;
58 
59 /**
60  * <p>Implementation of a tunneling proxy that supports HTTP CONNECT.</p>
61  * <p>To work as CONNECT proxy, objects of this class must be instantiated using the no-arguments
62  * constructor, since the remote server information will be present in the CONNECT URI.</p>
63  */
64 public class ConnectHandler extends HandlerWrapper
65 {
    // Logger shared by this handler and its nested connection classes.
    private static final Logger LOG = Log.getLogger(ConnectHandler.class);
    // Selector manager that the proxy-to-server channels are registered with (see register()).
    private final SelectorManager _selectorManager = new Manager();
    // Timeout, in milliseconds, to connect to the remote server; register() also uses it
    // as the maximum wait for the proxy-to-server endPoint to become ready.
    private volatile int _connectTimeout = 5000;
    // Timeout, in milliseconds, to write data to a peer; Manager.newEndPoint() also applies it
    // as the max idle time of proxy-to-server endPoints.
    private volatile int _writeTimeout = 30000;
    // Thread pool the selector manager dispatches its tasks to.
    private volatile ThreadPool _threadPool;
    // True when the thread pool was supplied through setThreadPool() instead of being taken from the server.
    private volatile boolean _privateThreadPool;
    // Host white list, filled by the constructor through set(white, _white).
    private HostMap<String> _white = new HostMap<String>();
    // Host black list, filled by the constructor through set(black, _black).
    private HostMap<String> _black = new HostMap<String>();
74 
ConnectHandler()75     public ConnectHandler()
76     {
77         this(null);
78     }
79 
ConnectHandler(String[] white, String[] black)80     public ConnectHandler(String[] white, String[] black)
81     {
82         this(null, white, black);
83     }
84 
ConnectHandler(Handler handler)85     public ConnectHandler(Handler handler)
86     {
87         setHandler(handler);
88     }
89 
ConnectHandler(Handler handler, String[] white, String[] black)90     public ConnectHandler(Handler handler, String[] white, String[] black)
91     {
92         setHandler(handler);
93         set(white, _white);
94         set(black, _black);
95     }
96 
97     /**
98      * @return the timeout, in milliseconds, to connect to the remote server
99      */
getConnectTimeout()100     public int getConnectTimeout()
101     {
102         return _connectTimeout;
103     }
104 
105     /**
106      * @param connectTimeout the timeout, in milliseconds, to connect to the remote server
107      */
setConnectTimeout(int connectTimeout)108     public void setConnectTimeout(int connectTimeout)
109     {
110         _connectTimeout = connectTimeout;
111     }
112 
113     /**
114      * @return the timeout, in milliseconds, to write data to a peer
115      */
getWriteTimeout()116     public int getWriteTimeout()
117     {
118         return _writeTimeout;
119     }
120 
121     /**
122      * @param writeTimeout the timeout, in milliseconds, to write data to a peer
123      */
setWriteTimeout(int writeTimeout)124     public void setWriteTimeout(int writeTimeout)
125     {
126         _writeTimeout = writeTimeout;
127     }
128 
129     @Override
setServer(Server server)130     public void setServer(Server server)
131     {
132         super.setServer(server);
133 
134         server.getContainer().update(this, null, _selectorManager, "selectManager");
135 
136         if (_privateThreadPool)
137             server.getContainer().update(this, null, _privateThreadPool, "threadpool", true);
138         else
139             _threadPool = server.getThreadPool();
140     }
141 
142     /**
143      * @return the thread pool
144      */
getThreadPool()145     public ThreadPool getThreadPool()
146     {
147         return _threadPool;
148     }
149 
150     /**
151      * @param threadPool the thread pool
152      */
setThreadPool(ThreadPool threadPool)153     public void setThreadPool(ThreadPool threadPool)
154     {
155         if (getServer() != null)
156             getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true);
157         _privateThreadPool = threadPool != null;
158         _threadPool = threadPool;
159     }
160 
161     @Override
doStart()162     protected void doStart() throws Exception
163     {
164         super.doStart();
165 
166         if (_threadPool == null)
167         {
168             _threadPool = getServer().getThreadPool();
169             _privateThreadPool = false;
170         }
171         if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning())
172             ((LifeCycle)_threadPool).start();
173 
174         _selectorManager.start();
175     }
176 
177     @Override
doStop()178     protected void doStop() throws Exception
179     {
180         _selectorManager.stop();
181 
182         ThreadPool threadPool = _threadPool;
183         if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle)
184             ((LifeCycle)threadPool).stop();
185 
186         super.doStop();
187     }
188 
189     @Override
handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)190     public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
191     {
192         if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
193         {
194             LOG.debug("CONNECT request for {}", request.getRequestURI());
195             try
196             {
197                 handleConnect(baseRequest, request, response, request.getRequestURI());
198             }
199             catch(Exception e)
200             {
201                 LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
202                 LOG.debug(e);
203             }
204         }
205         else
206         {
207             super.handle(target, baseRequest, request, response);
208         }
209     }
210 
211     /**
212      * <p>Handles a CONNECT request.</p>
213      * <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code>
214      * that authenticate the client with the proxy.</p>
215      *
216      * @param baseRequest   Jetty-specific http request
217      * @param request       the http request
218      * @param response      the http response
219      * @param serverAddress the remote server address in the form {@code host:port}
220      * @throws ServletException if an application error occurs
221      * @throws IOException      if an I/O error occurs
222      */
handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress)223     protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException
224     {
225         boolean proceed = handleAuthentication(request, response, serverAddress);
226         if (!proceed)
227             return;
228 
229         String host = serverAddress;
230         int port = 80;
231         int colon = serverAddress.indexOf(':');
232         if (colon > 0)
233         {
234             host = serverAddress.substring(0, colon);
235             port = Integer.parseInt(serverAddress.substring(colon + 1));
236         }
237 
238         if (!validateDestination(host))
239         {
240             LOG.info("ProxyHandler: Forbidden destination " + host);
241             response.setStatus(HttpServletResponse.SC_FORBIDDEN);
242             baseRequest.setHandled(true);
243             return;
244         }
245 
246         SocketChannel channel;
247 
248         try
249         {
250             channel = connectToServer(request,host,port);
251         }
252         catch (SocketException se)
253         {
254             LOG.info("ConnectHandler: SocketException " + se.getMessage());
255             response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
256             baseRequest.setHandled(true);
257             return;
258         }
259         catch (SocketTimeoutException ste)
260         {
261             LOG.info("ConnectHandler: SocketTimeoutException" + ste.getMessage());
262             response.setStatus(HttpServletResponse.SC_GATEWAY_TIMEOUT);
263             baseRequest.setHandled(true);
264             return;
265         }
266         catch (IOException ioe)
267         {
268             LOG.info("ConnectHandler: IOException" + ioe.getMessage());
269             response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
270             baseRequest.setHandled(true);
271             return;
272         }
273 
274         // Transfer unread data from old connection to new connection
275         // We need to copy the data to avoid races:
276         // 1. when this unread data is written and the server replies before the clientToProxy
277         // connection is installed (it is only installed after returning from this method)
278         // 2. when the client sends data before this unread data has been written.
279         AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection();
280         Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer();
281         Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer();
282         int length = headerBuffer == null ? 0 : headerBuffer.length();
283         length += bodyBuffer == null ? 0 : bodyBuffer.length();
284         IndirectNIOBuffer buffer = null;
285         if (length > 0)
286         {
287             buffer = new IndirectNIOBuffer(length);
288             if (headerBuffer != null)
289             {
290                 buffer.put(headerBuffer);
291                 headerBuffer.clear();
292             }
293             if (bodyBuffer != null)
294             {
295                 buffer.put(bodyBuffer);
296                 bodyBuffer.clear();
297             }
298         }
299 
300         ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
301         prepareContext(request, context);
302 
303         ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
304 
305         // CONNECT expects a 200 response
306         response.setStatus(HttpServletResponse.SC_OK);
307 
308         // Prevent close
309         baseRequest.getConnection().getGenerator().setPersistent(true);
310 
311         // Close to force last flush it so that the client receives it
312         response.getOutputStream().close();
313 
314         upgradeConnection(request, response, clientToProxy);
315     }
316 
prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)317     private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
318     {
319         AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection();
320         ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
321         ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
322         clientToProxy.setConnection(proxyToServer);
323         proxyToServer.setConnection(clientToProxy);
324         return clientToProxy;
325     }
326 
327     /**
328      * <p>Handles the authentication before setting up the tunnel to the remote server.</p>
329      * <p>The default implementation returns true.</p>
330      *
331      * @param request  the HTTP request
332      * @param response the HTTP response
333      * @param address  the address of the remote server in the form {@code host:port}.
334      * @return true to allow to connect to the remote host, false otherwise
335      * @throws ServletException to report a server error to the caller
336      * @throws IOException      to report a server error to the caller
337      */
handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address)338     protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
339     {
340         return true;
341     }
342 
newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)343     protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
344     {
345         return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
346     }
347 
newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)348     protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
349     {
350         return new ProxyToServerConnection(context, buffer);
351     }
352 
353     // may return null
connectToServer(HttpServletRequest request, String host, int port)354     private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
355     {
356         SocketChannel channel = connect(request, host, port);
357         channel.configureBlocking(false);
358         return channel;
359     }
360 
361     /**
362      * <p>Establishes a connection to the remote server.</p>
363      *
364      * @param request the HTTP request that initiated the tunnel
365      * @param host    the host to connect to
366      * @param port    the port to connect to
367      * @return a {@link SocketChannel} connected to the remote server
368      * @throws IOException if the connection cannot be established
369      */
connect(HttpServletRequest request, String host, int port)370     protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
371     {
372         SocketChannel channel = SocketChannel.open();
373 
374         if (channel == null)
375         {
376             throw new IOException("unable to connect to " + host + ":" + port);
377         }
378 
379         try
380         {
381             // Connect to remote server
382             LOG.debug("Establishing connection to {}:{}", host, port);
383             channel.socket().setTcpNoDelay(true);
384             channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
385             LOG.debug("Established connection to {}:{}", host, port);
386             return channel;
387         }
388         catch (IOException x)
389         {
390             LOG.debug("Failed to establish connection to " + host + ":" + port, x);
391             try
392             {
393                 channel.close();
394             }
395             catch (IOException xx)
396             {
397                 LOG.ignore(xx);
398             }
399             throw x;
400         }
401     }
402 
prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)403     protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
404     {
405     }
406 
upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection)407     private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
408     {
409         // Set the new connection as request attribute and change the status to 101
410         // so that Jetty understands that it has to upgrade the connection
411         request.setAttribute("org.eclipse.jetty.io.Connection", connection);
412         response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
413         LOG.debug("Upgraded connection to {}", connection);
414     }
415 
register(SocketChannel channel, ProxyToServerConnection proxyToServer)416     private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
417     {
418         _selectorManager.register(channel, proxyToServer);
419         proxyToServer.waitReady(_connectTimeout);
420     }
421 
422     /**
423      * <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p>
424      *
425      * @param endPoint the endPoint to read from
426      * @param buffer   the buffer to read data into
427      * @param context  the context information related to the connection
428      * @return the number of bytes read (possibly 0 since the read is non-blocking)
429      *         or -1 if the channel has been closed remotely
430      * @throws IOException if the endPoint cannot be read
431      */
read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context)432     protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
433     {
434         return endPoint.fill(buffer);
435     }
436 
437     /**
438      * <p>Writes (with blocking semantic) the given buffer of data onto the given endPoint.</p>
439      *
440      * @param endPoint the endPoint to write to
441      * @param buffer   the buffer to write
442      * @param context  the context information related to the connection
443      * @throws IOException if the buffer cannot be written
444      * @return the number of bytes written
445      */
write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context)446     protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
447     {
448         if (buffer == null)
449             return 0;
450 
451         int length = buffer.length();
452         final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null;
453         int flushed = endPoint.flush(buffer);
454         if (debug!=null)
455             debug.append(flushed);
456 
457         // Loop until all written
458         while (buffer.length()>0 && !endPoint.isOutputShutdown())
459         {
460             if (!endPoint.isBlocking())
461             {
462                 boolean ready = endPoint.blockWritable(getWriteTimeout());
463                 if (!ready)
464                     throw new IOException("Write timeout");
465             }
466             flushed = endPoint.flush(buffer);
467             if (debug!=null)
468                 debug.append("+").append(flushed);
469         }
470 
471         LOG.debug("Written {}/{} bytes {}", debug, length, endPoint);
472         buffer.compact();
473         return length;
474     }
475 
476     private class Manager extends SelectorManager
477     {
478         @Override
newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)479         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
480         {
481             SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, key, channel.socket().getSoTimeout());
482             endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
483             endp.setMaxIdleTime(_writeTimeout);
484             return endp;
485         }
486 
487         @Override
newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)488         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
489         {
490             ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment;
491             proxyToServer.setTimeStamp(System.currentTimeMillis());
492             proxyToServer.setEndPoint(endpoint);
493             return proxyToServer;
494         }
495 
496         @Override
endPointOpened(SelectChannelEndPoint endpoint)497         protected void endPointOpened(SelectChannelEndPoint endpoint)
498         {
499             ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
500             proxyToServer.ready();
501         }
502 
503         @Override
dispatch(Runnable task)504         public boolean dispatch(Runnable task)
505         {
506             return _threadPool.dispatch(task);
507         }
508 
509         @Override
endPointClosed(SelectChannelEndPoint endpoint)510         protected void endPointClosed(SelectChannelEndPoint endpoint)
511         {
512         }
513 
514         @Override
endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)515         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
516         {
517         }
518     }
519 
520     public class ProxyToServerConnection implements AsyncConnection
521     {
522         private final CountDownLatch _ready = new CountDownLatch(1);
523         private final Buffer _buffer = new IndirectNIOBuffer(4096);
524         private final ConcurrentMap<String, Object> _context;
525         private volatile Buffer _data;
526         private volatile ClientToProxyConnection _toClient;
527         private volatile long _timestamp;
528         private volatile AsyncEndPoint _endPoint;
529 
ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)530         public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
531         {
532             _context = context;
533             _data = data;
534         }
535 
536         @Override
toString()537         public String toString()
538         {
539             StringBuilder builder = new StringBuilder("ProxyToServer");
540             builder.append("(:").append(_endPoint.getLocalPort());
541             builder.append("<=>:").append(_endPoint.getRemotePort());
542             return builder.append(")").toString();
543         }
544 
handle()545         public Connection handle() throws IOException
546         {
547             LOG.debug("{}: begin reading from server", this);
548             try
549             {
550                 writeData();
551 
552                 while (true)
553                 {
554                     int read = read(_endPoint, _buffer, _context);
555 
556                     if (read == -1)
557                     {
558                         LOG.debug("{}: server closed connection {}", this, _endPoint);
559 
560                         if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
561                             closeClient();
562                         else
563                             _toClient.shutdownOutput();
564 
565                         break;
566                     }
567 
568                     if (read == 0)
569                         break;
570 
571                     LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint);
572                     int written = write(_toClient._endPoint, _buffer, _context);
573                     LOG.debug("{}: written to {} {} bytes", this, _toClient, written);
574                 }
575                 return this;
576             }
577             catch (ClosedChannelException x)
578             {
579                 LOG.debug(x);
580                 throw x;
581             }
582             catch (IOException x)
583             {
584                 LOG.warn(this + ": unexpected exception", x);
585                 close();
586                 throw x;
587             }
588             catch (RuntimeException x)
589             {
590                 LOG.warn(this + ": unexpected exception", x);
591                 close();
592                 throw x;
593             }
594             finally
595             {
596                 LOG.debug("{}: end reading from server", this);
597             }
598         }
599 
onInputShutdown()600         public void onInputShutdown() throws IOException
601         {
602         }
603 
writeData()604         private void writeData() throws IOException
605         {
606             // This method is called from handle() and closeServer()
607             // which may happen concurrently (e.g. a client closing
608             // while reading from the server), so needs synchronization
609             synchronized (this)
610             {
611                 if (_data != null)
612                 {
613                     try
614                     {
615                         int written = write(_endPoint, _data, _context);
616                         LOG.debug("{}: written to server {} bytes", this, written);
617                     }
618                     finally
619                     {
620                         // Attempt once to write the data; if the write fails (for example
621                         // because the connection is already closed), clear the data and
622                         // give up to avoid to continue to write data to a closed connection
623                         _data = null;
624                     }
625                 }
626             }
627         }
628 
setConnection(ClientToProxyConnection connection)629         public void setConnection(ClientToProxyConnection connection)
630         {
631             _toClient = connection;
632         }
633 
getTimeStamp()634         public long getTimeStamp()
635         {
636             return _timestamp;
637         }
638 
setTimeStamp(long timestamp)639         public void setTimeStamp(long timestamp)
640         {
641             _timestamp = timestamp;
642         }
643 
setEndPoint(AsyncEndPoint endpoint)644         public void setEndPoint(AsyncEndPoint endpoint)
645         {
646             _endPoint = endpoint;
647         }
648 
isIdle()649         public boolean isIdle()
650         {
651             return false;
652         }
653 
isSuspended()654         public boolean isSuspended()
655         {
656             return false;
657         }
658 
onClose()659         public void onClose()
660         {
661         }
662 
ready()663         public void ready()
664         {
665             _ready.countDown();
666         }
667 
waitReady(long timeout)668         public void waitReady(long timeout) throws IOException
669         {
670             try
671             {
672                 _ready.await(timeout, TimeUnit.MILLISECONDS);
673             }
674             catch (final InterruptedException x)
675             {
676                 throw new IOException()
677                 {{
678                         initCause(x);
679                     }};
680             }
681         }
682 
closeClient()683         public void closeClient() throws IOException
684         {
685             _toClient.closeClient();
686         }
687 
closeServer()688         public void closeServer() throws IOException
689         {
690             _endPoint.close();
691         }
692 
close()693         public void close()
694         {
695             try
696             {
697                 closeClient();
698             }
699             catch (IOException x)
700             {
701                 LOG.debug(this + ": unexpected exception closing the client", x);
702             }
703 
704             try
705             {
706                 closeServer();
707             }
708             catch (IOException x)
709             {
710                 LOG.debug(this + ": unexpected exception closing the server", x);
711             }
712         }
713 
shutdownOutput()714         public void shutdownOutput() throws IOException
715         {
716             writeData();
717             _endPoint.shutdownOutput();
718         }
719 
onIdleExpired(long idleForMs)720         public void onIdleExpired(long idleForMs)
721         {
722             try
723             {
724                 LOG.debug("{} idle expired", this);
725                 if (_endPoint.isOutputShutdown())
726                     close();
727                 else
728                     shutdownOutput();
729             }
730             catch(Exception e)
731             {
732                 LOG.debug(e);
733                 close();
734             }
735         }
736     }
737 
738     public class ClientToProxyConnection implements AsyncConnection
739     {
        // Buffer that handle() reads client data into before forwarding it to the server.
        private final Buffer _buffer = new IndirectNIOBuffer(4096);
        // Context information passed to the enclosing handler's read() and write().
        private final ConcurrentMap<String, Object> _context;
        // Channel to the remote server; registered with the selector on the first handle() call.
        private final SocketChannel _channel;
        // EndPoint of the client connection.
        private final EndPoint _endPoint;
        // Value returned by getTimeStamp().
        private final long _timestamp;
        // Peer connection towards the remote server.
        private volatile ProxyToServerConnection _toServer;
        // True until handle() has run once; guards the one-time channel registration.
        private boolean _firstTime = true;
747 
ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)748         public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
749         {
750             _context = context;
751             _channel = channel;
752             _endPoint = endPoint;
753             _timestamp = timestamp;
754         }
755 
756         @Override
toString()757         public String toString()
758         {
759             StringBuilder builder = new StringBuilder("ClientToProxy");
760             builder.append("(:").append(_endPoint.getLocalPort());
761             builder.append("<=>:").append(_endPoint.getRemotePort());
762             return builder.append(")").toString();
763         }
764 
handle()765         public Connection handle() throws IOException
766         {
767             LOG.debug("{}: begin reading from client", this);
768             try
769             {
770                 if (_firstTime)
771                 {
772                     _firstTime = false;
773                     register(_channel, _toServer);
774                     LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
775                 }
776 
777                 while (true)
778                 {
779                     int read = read(_endPoint, _buffer, _context);
780 
781                     if (read == -1)
782                     {
783                         LOG.debug("{}: client closed connection {}", this, _endPoint);
784 
785                         if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
786                             closeServer();
787                         else
788                             _toServer.shutdownOutput();
789 
790                         break;
791                     }
792 
793                     if (read == 0)
794                         break;
795 
796                     LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint);
797                     int written = write(_toServer._endPoint, _buffer, _context);
798                     LOG.debug("{}: written to {} {} bytes", this, _toServer, written);
799                 }
800                 return this;
801             }
802             catch (ClosedChannelException x)
803             {
804                 LOG.debug(x);
805                 closeServer();
806                 throw x;
807             }
808             catch (IOException x)
809             {
810                 LOG.warn(this + ": unexpected exception", x);
811                 close();
812                 throw x;
813             }
814             catch (RuntimeException x)
815             {
816                 LOG.warn(this + ": unexpected exception", x);
817                 close();
818                 throw x;
819             }
820             finally
821             {
822                 LOG.debug("{}: end reading from client", this);
823             }
824         }
825 
onInputShutdown()826         public void onInputShutdown() throws IOException
827         {
828         }
829 
getTimeStamp()830         public long getTimeStamp()
831         {
832             return _timestamp;
833         }
834 
isIdle()835         public boolean isIdle()
836         {
837             return false;
838         }
839 
isSuspended()840         public boolean isSuspended()
841         {
842             return false;
843         }
844 
onClose()845         public void onClose()
846         {
847         }
848 
setConnection(ProxyToServerConnection connection)849         public void setConnection(ProxyToServerConnection connection)
850         {
851             _toServer = connection;
852         }
853 
closeClient()854         public void closeClient() throws IOException
855         {
856             _endPoint.close();
857         }
858 
closeServer()859         public void closeServer() throws IOException
860         {
861             _toServer.closeServer();
862         }
863 
close()864         public void close()
865         {
866             try
867             {
868                 closeClient();
869             }
870             catch (IOException x)
871             {
872                 LOG.debug(this + ": unexpected exception closing the client", x);
873             }
874 
875             try
876             {
877                 closeServer();
878             }
879             catch (IOException x)
880             {
881                 LOG.debug(this + ": unexpected exception closing the server", x);
882             }
883         }
884 
shutdownOutput()885         public void shutdownOutput() throws IOException
886         {
887             _endPoint.shutdownOutput();
888         }
889 
onIdleExpired(long idleForMs)890         public void onIdleExpired(long idleForMs)
891         {
892             try
893             {
894                 LOG.debug("{} idle expired", this);
895                 if (_endPoint.isOutputShutdown())
896                     close();
897                 else
898                     shutdownOutput();
899             }
900             catch(Exception e)
901             {
902                 LOG.debug(e);
903                 close();
904             }
905         }
906     }
907 
908     /**
909      * Add a whitelist entry to an existing handler configuration
910      *
911      * @param entry new whitelist entry
912      */
addWhite(String entry)913     public void addWhite(String entry)
914     {
915         add(entry, _white);
916     }
917 
918     /**
919      * Add a blacklist entry to an existing handler configuration
920      *
921      * @param entry new blacklist entry
922      */
addBlack(String entry)923     public void addBlack(String entry)
924     {
925         add(entry, _black);
926     }
927 
928     /**
929      * Re-initialize the whitelist of existing handler object
930      *
931      * @param entries array of whitelist entries
932      */
setWhite(String[] entries)933     public void setWhite(String[] entries)
934     {
935         set(entries, _white);
936     }
937 
938     /**
939      * Re-initialize the blacklist of existing handler object
940      *
941      * @param entries array of blacklist entries
942      */
setBlack(String[] entries)943     public void setBlack(String[] entries)
944     {
945         set(entries, _black);
946     }
947 
948     /**
949      * Helper method to process a list of new entries and replace
950      * the content of the specified host map
951      *
952      * @param entries new entries
953      * @param hostMap target host map
954      */
set(String[] entries, HostMap<String> hostMap)955     protected void set(String[] entries, HostMap<String> hostMap)
956     {
957         hostMap.clear();
958 
959         if (entries != null && entries.length > 0)
960         {
961             for (String addrPath : entries)
962             {
963                 add(addrPath, hostMap);
964             }
965         }
966     }
967 
968     /**
969      * Helper method to process the new entry and add it to
970      * the specified host map.
971      *
972      * @param entry      new entry
973      * @param hostMap target host map
974      */
add(String entry, HostMap<String> hostMap)975     private void add(String entry, HostMap<String> hostMap)
976     {
977         if (entry != null && entry.length() > 0)
978         {
979             entry = entry.trim();
980             if (hostMap.get(entry) == null)
981             {
982                 hostMap.put(entry, entry);
983             }
984         }
985     }
986 
987     /**
988      * Check the request hostname against white- and blacklist.
989      *
990      * @param host hostname to check
991      * @return true if hostname is allowed to be proxied
992      */
validateDestination(String host)993     public boolean validateDestination(String host)
994     {
995         if (_white.size() > 0)
996         {
997             Object whiteObj = _white.getLazyMatches(host);
998             if (whiteObj == null)
999             {
1000                 return false;
1001             }
1002         }
1003 
1004         if (_black.size() > 0)
1005         {
1006             Object blackObj = _black.getLazyMatches(host);
1007             if (blackObj != null)
1008             {
1009                 return false;
1010             }
1011         }
1012 
1013         return true;
1014     }
1015 
1016     @Override
dump(Appendable out, String indent)1017     public void dump(Appendable out, String indent) throws IOException
1018     {
1019         dumpThis(out);
1020         if (_privateThreadPool)
1021             dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans());
1022         else
1023             dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans());
1024     }
1025 }
1026