1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.server.handler; 20 21 import java.io.IOException; 22 import java.net.InetSocketAddress; 23 import java.net.SocketException; 24 import java.net.SocketTimeoutException; 25 import java.nio.channels.ClosedChannelException; 26 import java.nio.channels.SelectionKey; 27 import java.nio.channels.SocketChannel; 28 import java.util.Arrays; 29 import java.util.concurrent.ConcurrentHashMap; 30 import java.util.concurrent.ConcurrentMap; 31 import java.util.concurrent.CountDownLatch; 32 import java.util.concurrent.TimeUnit; 33 import javax.servlet.ServletException; 34 import javax.servlet.http.HttpServletRequest; 35 import javax.servlet.http.HttpServletResponse; 36 37 import org.eclipse.jetty.http.HttpMethods; 38 import org.eclipse.jetty.http.HttpParser; 39 import org.eclipse.jetty.io.AsyncEndPoint; 40 import org.eclipse.jetty.io.Buffer; 41 import org.eclipse.jetty.io.ConnectedEndPoint; 42 import org.eclipse.jetty.io.Connection; 43 import org.eclipse.jetty.io.EndPoint; 44 import org.eclipse.jetty.io.nio.AsyncConnection; 45 import org.eclipse.jetty.io.nio.IndirectNIOBuffer; 46 import org.eclipse.jetty.io.nio.SelectChannelEndPoint; 47 import org.eclipse.jetty.io.nio.SelectorManager; 48 import org.eclipse.jetty.server.AbstractHttpConnection; 49 import org.eclipse.jetty.server.Handler; 50 import org.eclipse.jetty.server.Request; 51 import org.eclipse.jetty.server.Server; 52 import org.eclipse.jetty.util.HostMap; 53 import org.eclipse.jetty.util.TypeUtil; 54 import org.eclipse.jetty.util.component.LifeCycle; 55 import org.eclipse.jetty.util.log.Log; 56 import org.eclipse.jetty.util.log.Logger; 57 import org.eclipse.jetty.util.thread.ThreadPool; 58 59 /** 60 * <p>Implementation of a tunneling proxy that supports HTTP CONNECT.</p> 61 * <p>To work as CONNECT proxy, objects of this class must be instantiated using the no-arguments 62 * constructor, since the remote server information will be present in the CONNECT URI.</p> 63 */ 64 public class ConnectHandler extends HandlerWrapper 65 { 66 private static final Logger LOG = Log.getLogger(ConnectHandler.class); 67 private final SelectorManager _selectorManager = new Manager(); 68 private volatile int _connectTimeout = 5000; 69 private volatile int _writeTimeout = 30000; 70 private volatile ThreadPool _threadPool; 71 private volatile boolean _privateThreadPool; 72 private HostMap<String> _white = new HostMap<String>(); 73 private HostMap<String> _black = new HostMap<String>(); 74 ConnectHandler()75 public ConnectHandler() 76 { 77 this(null); 78 } 79 ConnectHandler(String[] white, String[] black)80 public ConnectHandler(String[] white, String[] black) 81 { 82 this(null, white, black); 83 } 84 ConnectHandler(Handler handler)85 public ConnectHandler(Handler handler) 86 { 87 setHandler(handler); 88 } 89 ConnectHandler(Handler handler, String[] white, String[] black)90 public ConnectHandler(Handler handler, String[] white, String[] black) 91 { 92 setHandler(handler); 93 set(white, _white); 94 set(black, _black); 95 } 96 97 /** 98 * @return the timeout, in milliseconds, to connect to the remote server 99 */ getConnectTimeout()100 public int getConnectTimeout() 101 { 102 return _connectTimeout; 103 } 104 105 /** 106 * @param connectTimeout the timeout, in milliseconds, to connect to the remote server 107 */ setConnectTimeout(int connectTimeout)108 public void setConnectTimeout(int connectTimeout) 109 { 110 _connectTimeout = connectTimeout; 111 } 112 113 /** 114 * @return the timeout, in milliseconds, to write data to a peer 115 */ getWriteTimeout()116 public int getWriteTimeout() 117 { 118 return _writeTimeout; 119 } 120 121 /** 122 * @param writeTimeout the timeout, in milliseconds, to write data to a peer 123 */ setWriteTimeout(int writeTimeout)124 public void setWriteTimeout(int writeTimeout) 125 { 126 _writeTimeout = writeTimeout; 127 } 128 129 @Override setServer(Server server)130 public void setServer(Server server) 131 { 132 super.setServer(server); 133 134 server.getContainer().update(this, null, _selectorManager, "selectManager"); 135 136 if (_privateThreadPool) 137 server.getContainer().update(this, null, _privateThreadPool, "threadpool", true); 138 else 139 _threadPool = server.getThreadPool(); 140 } 141 142 /** 143 * @return the thread pool 144 */ getThreadPool()145 public ThreadPool getThreadPool() 146 { 147 return _threadPool; 148 } 149 150 /** 151 * @param threadPool the thread pool 152 */ setThreadPool(ThreadPool threadPool)153 public void setThreadPool(ThreadPool threadPool) 154 { 155 if (getServer() != null) 156 getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true); 157 _privateThreadPool = threadPool != null; 158 _threadPool = threadPool; 159 } 160 161 @Override doStart()162 protected void doStart() throws Exception 163 { 164 super.doStart(); 165 166 if (_threadPool == null) 167 { 168 _threadPool = getServer().getThreadPool(); 169 _privateThreadPool = false; 170 } 171 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning()) 172 ((LifeCycle)_threadPool).start(); 173 174 _selectorManager.start(); 175 } 176 177 @Override doStop()178 protected void doStop() throws Exception 179 { 180 _selectorManager.stop(); 181 182 ThreadPool threadPool = _threadPool; 183 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle) 184 ((LifeCycle)threadPool).stop(); 185 186 super.doStop(); 187 } 188 189 @Override handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)190 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 191 { 192 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) 193 { 194 LOG.debug("CONNECT request for {}", request.getRequestURI()); 195 try 196 { 197 handleConnect(baseRequest, request, response, request.getRequestURI()); 198 } 199 catch(Exception e) 200 { 201 LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e); 202 LOG.debug(e); 203 } 204 } 205 else 206 { 207 super.handle(target, baseRequest, request, response); 208 } 209 } 210 211 /** 212 * <p>Handles a CONNECT request.</p> 213 * <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code> 214 * that authenticate the client with the proxy.</p> 215 * 216 * @param baseRequest Jetty-specific http request 217 * @param request the http request 218 * @param response the http response 219 * @param serverAddress the remote server address in the form {@code host:port} 220 * @throws ServletException if an application error occurs 221 * @throws IOException if an I/O error occurs 222 */ handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress)223 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException 224 { 225 boolean proceed = handleAuthentication(request, response, serverAddress); 226 if (!proceed) 227 return; 228 229 String host = serverAddress; 230 int port = 80; 231 int colon = serverAddress.indexOf(':'); 232 if (colon > 0) 233 { 234 host = serverAddress.substring(0, colon); 235 port = Integer.parseInt(serverAddress.substring(colon + 1)); 236 } 237 238 if (!validateDestination(host)) 239 { 240 LOG.info("ProxyHandler: Forbidden destination " + host); 241 response.setStatus(HttpServletResponse.SC_FORBIDDEN); 242 baseRequest.setHandled(true); 243 return; 244 } 245 246 SocketChannel channel; 247 248 try 249 { 250 channel = connectToServer(request,host,port); 251 } 252 catch (SocketException se) 253 { 254 LOG.info("ConnectHandler: SocketException " + se.getMessage()); 255 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 256 baseRequest.setHandled(true); 257 return; 258 } 259 catch (SocketTimeoutException ste) 260 { 261 LOG.info("ConnectHandler: SocketTimeoutException" + ste.getMessage()); 262 response.setStatus(HttpServletResponse.SC_GATEWAY_TIMEOUT); 263 baseRequest.setHandled(true); 264 return; 265 } 266 catch (IOException ioe) 267 { 268 LOG.info("ConnectHandler: IOException" + ioe.getMessage()); 269 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 270 baseRequest.setHandled(true); 271 return; 272 } 273 274 // Transfer unread data from old connection to new connection 275 // We need to copy the data to avoid races: 276 // 1. when this unread data is written and the server replies before the clientToProxy 277 // connection is installed (it is only installed after returning from this method) 278 // 2. when the client sends data before this unread data has been written. 279 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection(); 280 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer(); 281 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer(); 282 int length = headerBuffer == null ? 0 : headerBuffer.length(); 283 length += bodyBuffer == null ? 0 : bodyBuffer.length(); 284 IndirectNIOBuffer buffer = null; 285 if (length > 0) 286 { 287 buffer = new IndirectNIOBuffer(length); 288 if (headerBuffer != null) 289 { 290 buffer.put(headerBuffer); 291 headerBuffer.clear(); 292 } 293 if (bodyBuffer != null) 294 { 295 buffer.put(bodyBuffer); 296 bodyBuffer.clear(); 297 } 298 } 299 300 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>(); 301 prepareContext(request, context); 302 303 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer); 304 305 // CONNECT expects a 200 response 306 response.setStatus(HttpServletResponse.SC_OK); 307 308 // Prevent close 309 baseRequest.getConnection().getGenerator().setPersistent(true); 310 311 // Close to force last flush it so that the client receives it 312 response.getOutputStream().close(); 313 314 upgradeConnection(request, response, clientToProxy); 315 } 316 prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)317 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer) 318 { 319 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection(); 320 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer); 321 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp()); 322 clientToProxy.setConnection(proxyToServer); 323 proxyToServer.setConnection(clientToProxy); 324 return clientToProxy; 325 } 326 327 /** 328 * <p>Handles the authentication before setting up the tunnel to the remote server.</p> 329 * <p>The default implementation returns true.</p> 330 * 331 * @param request the HTTP request 332 * @param response the HTTP response 333 * @param address the address of the remote server in the form {@code host:port}. 334 * @return true to allow to connect to the remote host, false otherwise 335 * @throws ServletException to report a server error to the caller 336 * @throws IOException to report a server error to the caller 337 */ handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address)338 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException 339 { 340 return true; 341 } 342 newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)343 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp) 344 { 345 return new ClientToProxyConnection(context, channel, endPoint, timeStamp); 346 } 347 newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)348 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer) 349 { 350 return new ProxyToServerConnection(context, buffer); 351 } 352 353 // may return null connectToServer(HttpServletRequest request, String host, int port)354 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException 355 { 356 SocketChannel channel = connect(request, host, port); 357 channel.configureBlocking(false); 358 return channel; 359 } 360 361 /** 362 * <p>Establishes a connection to the remote server.</p> 363 * 364 * @param request the HTTP request that initiated the tunnel 365 * @param host the host to connect to 366 * @param port the port to connect to 367 * @return a {@link SocketChannel} connected to the remote server 368 * @throws IOException if the connection cannot be established 369 */ connect(HttpServletRequest request, String host, int port)370 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException 371 { 372 SocketChannel channel = SocketChannel.open(); 373 374 if (channel == null) 375 { 376 throw new IOException("unable to connect to " + host + ":" + port); 377 } 378 379 try 380 { 381 // Connect to remote server 382 LOG.debug("Establishing connection to {}:{}", host, port); 383 channel.socket().setTcpNoDelay(true); 384 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); 385 LOG.debug("Established connection to {}:{}", host, port); 386 return channel; 387 } 388 catch (IOException x) 389 { 390 LOG.debug("Failed to establish connection to " + host + ":" + port, x); 391 try 392 { 393 channel.close(); 394 } 395 catch (IOException xx) 396 { 397 LOG.ignore(xx); 398 } 399 throw x; 400 } 401 } 402 prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)403 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context) 404 { 405 } 406 upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection)407 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException 408 { 409 // Set the new connection as request attribute and change the status to 101 410 // so that Jetty understands that it has to upgrade the connection 411 request.setAttribute("org.eclipse.jetty.io.Connection", connection); 412 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); 413 LOG.debug("Upgraded connection to {}", connection); 414 } 415 register(SocketChannel channel, ProxyToServerConnection proxyToServer)416 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException 417 { 418 _selectorManager.register(channel, proxyToServer); 419 proxyToServer.waitReady(_connectTimeout); 420 } 421 422 /** 423 * <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p> 424 * 425 * @param endPoint the endPoint to read from 426 * @param buffer the buffer to read data into 427 * @param context the context information related to the connection 428 * @return the number of bytes read (possibly 0 since the read is non-blocking) 429 * or -1 if the channel has been closed remotely 430 * @throws IOException if the endPoint cannot be read 431 */ read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context)432 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException 433 { 434 return endPoint.fill(buffer); 435 } 436 437 /** 438 * <p>Writes (with blocking semantic) the given buffer of data onto the given endPoint.</p> 439 * 440 * @param endPoint the endPoint to write to 441 * @param buffer the buffer to write 442 * @param context the context information related to the connection 443 * @throws IOException if the buffer cannot be written 444 * @return the number of bytes written 445 */ write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context)446 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException 447 { 448 if (buffer == null) 449 return 0; 450 451 int length = buffer.length(); 452 final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null; 453 int flushed = endPoint.flush(buffer); 454 if (debug!=null) 455 debug.append(flushed); 456 457 // Loop until all written 458 while (buffer.length()>0 && !endPoint.isOutputShutdown()) 459 { 460 if (!endPoint.isBlocking()) 461 { 462 boolean ready = endPoint.blockWritable(getWriteTimeout()); 463 if (!ready) 464 throw new IOException("Write timeout"); 465 } 466 flushed = endPoint.flush(buffer); 467 if (debug!=null) 468 debug.append("+").append(flushed); 469 } 470 471 LOG.debug("Written {}/{} bytes {}", debug, length, endPoint); 472 buffer.compact(); 473 return length; 474 } 475 476 private class Manager extends SelectorManager 477 { 478 @Override newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)479 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException 480 { 481 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, key, channel.socket().getSoTimeout()); 482 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); 483 endp.setMaxIdleTime(_writeTimeout); 484 return endp; 485 } 486 487 @Override newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)488 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) 489 { 490 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment; 491 proxyToServer.setTimeStamp(System.currentTimeMillis()); 492 proxyToServer.setEndPoint(endpoint); 493 return proxyToServer; 494 } 495 496 @Override endPointOpened(SelectChannelEndPoint endpoint)497 protected void endPointOpened(SelectChannelEndPoint endpoint) 498 { 499 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment(); 500 proxyToServer.ready(); 501 } 502 503 @Override dispatch(Runnable task)504 public boolean dispatch(Runnable task) 505 { 506 return _threadPool.dispatch(task); 507 } 508 509 @Override endPointClosed(SelectChannelEndPoint endpoint)510 protected void endPointClosed(SelectChannelEndPoint endpoint) 511 { 512 } 513 514 @Override endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)515 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) 516 { 517 } 518 } 519 520 public class ProxyToServerConnection implements AsyncConnection 521 { 522 private final CountDownLatch _ready = new CountDownLatch(1); 523 private final Buffer _buffer = new IndirectNIOBuffer(4096); 524 private final ConcurrentMap<String, Object> _context; 525 private volatile Buffer _data; 526 private volatile ClientToProxyConnection _toClient; 527 private volatile long _timestamp; 528 private volatile AsyncEndPoint _endPoint; 529 ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)530 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data) 531 { 532 _context = context; 533 _data = data; 534 } 535 536 @Override toString()537 public String toString() 538 { 539 StringBuilder builder = new StringBuilder("ProxyToServer"); 540 builder.append("(:").append(_endPoint.getLocalPort()); 541 builder.append("<=>:").append(_endPoint.getRemotePort()); 542 return builder.append(")").toString(); 543 } 544 handle()545 public Connection handle() throws IOException 546 { 547 LOG.debug("{}: begin reading from server", this); 548 try 549 { 550 writeData(); 551 552 while (true) 553 { 554 int read = read(_endPoint, _buffer, _context); 555 556 if (read == -1) 557 { 558 LOG.debug("{}: server closed connection {}", this, _endPoint); 559 560 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) 561 closeClient(); 562 else 563 _toClient.shutdownOutput(); 564 565 break; 566 } 567 568 if (read == 0) 569 break; 570 571 LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint); 572 int written = write(_toClient._endPoint, _buffer, _context); 573 LOG.debug("{}: written to {} {} bytes", this, _toClient, written); 574 } 575 return this; 576 } 577 catch (ClosedChannelException x) 578 { 579 LOG.debug(x); 580 throw x; 581 } 582 catch (IOException x) 583 { 584 LOG.warn(this + ": unexpected exception", x); 585 close(); 586 throw x; 587 } 588 catch (RuntimeException x) 589 { 590 LOG.warn(this + ": unexpected exception", x); 591 close(); 592 throw x; 593 } 594 finally 595 { 596 LOG.debug("{}: end reading from server", this); 597 } 598 } 599 onInputShutdown()600 public void onInputShutdown() throws IOException 601 { 602 } 603 writeData()604 private void writeData() throws IOException 605 { 606 // This method is called from handle() and closeServer() 607 // which may happen concurrently (e.g. a client closing 608 // while reading from the server), so needs synchronization 609 synchronized (this) 610 { 611 if (_data != null) 612 { 613 try 614 { 615 int written = write(_endPoint, _data, _context); 616 LOG.debug("{}: written to server {} bytes", this, written); 617 } 618 finally 619 { 620 // Attempt once to write the data; if the write fails (for example 621 // because the connection is already closed), clear the data and 622 // give up to avoid to continue to write data to a closed connection 623 _data = null; 624 } 625 } 626 } 627 } 628 setConnection(ClientToProxyConnection connection)629 public void setConnection(ClientToProxyConnection connection) 630 { 631 _toClient = connection; 632 } 633 getTimeStamp()634 public long getTimeStamp() 635 { 636 return _timestamp; 637 } 638 setTimeStamp(long timestamp)639 public void setTimeStamp(long timestamp) 640 { 641 _timestamp = timestamp; 642 } 643 setEndPoint(AsyncEndPoint endpoint)644 public void setEndPoint(AsyncEndPoint endpoint) 645 { 646 _endPoint = endpoint; 647 } 648 isIdle()649 public boolean isIdle() 650 { 651 return false; 652 } 653 isSuspended()654 public boolean isSuspended() 655 { 656 return false; 657 } 658 onClose()659 public void onClose() 660 { 661 } 662 ready()663 public void ready() 664 { 665 _ready.countDown(); 666 } 667 waitReady(long timeout)668 public void waitReady(long timeout) throws IOException 669 { 670 try 671 { 672 _ready.await(timeout, TimeUnit.MILLISECONDS); 673 } 674 catch (final InterruptedException x) 675 { 676 throw new IOException() 677 {{ 678 initCause(x); 679 }}; 680 } 681 } 682 closeClient()683 public void closeClient() throws IOException 684 { 685 _toClient.closeClient(); 686 } 687 closeServer()688 public void closeServer() throws IOException 689 { 690 _endPoint.close(); 691 } 692 close()693 public void close() 694 { 695 try 696 { 697 closeClient(); 698 } 699 catch (IOException x) 700 { 701 LOG.debug(this + ": unexpected exception closing the client", x); 702 } 703 704 try 705 { 706 closeServer(); 707 } 708 catch (IOException x) 709 { 710 LOG.debug(this + ": unexpected exception closing the server", x); 711 } 712 } 713 shutdownOutput()714 public void shutdownOutput() throws IOException 715 { 716 writeData(); 717 _endPoint.shutdownOutput(); 718 } 719 onIdleExpired(long idleForMs)720 public void onIdleExpired(long idleForMs) 721 { 722 try 723 { 724 LOG.debug("{} idle expired", this); 725 if (_endPoint.isOutputShutdown()) 726 close(); 727 else 728 shutdownOutput(); 729 } 730 catch(Exception e) 731 { 732 LOG.debug(e); 733 close(); 734 } 735 } 736 } 737 738 public class ClientToProxyConnection implements AsyncConnection 739 { 740 private final Buffer _buffer = new IndirectNIOBuffer(4096); 741 private final ConcurrentMap<String, Object> _context; 742 private final SocketChannel _channel; 743 private final EndPoint _endPoint; 744 private final long _timestamp; 745 private volatile ProxyToServerConnection _toServer; 746 private boolean _firstTime = true; 747 ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)748 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp) 749 { 750 _context = context; 751 _channel = channel; 752 _endPoint = endPoint; 753 _timestamp = timestamp; 754 } 755 756 @Override toString()757 public String toString() 758 { 759 StringBuilder builder = new StringBuilder("ClientToProxy"); 760 builder.append("(:").append(_endPoint.getLocalPort()); 761 builder.append("<=>:").append(_endPoint.getRemotePort()); 762 return builder.append(")").toString(); 763 } 764 handle()765 public Connection handle() throws IOException 766 { 767 LOG.debug("{}: begin reading from client", this); 768 try 769 { 770 if (_firstTime) 771 { 772 _firstTime = false; 773 register(_channel, _toServer); 774 LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer); 775 } 776 777 while (true) 778 { 779 int read = read(_endPoint, _buffer, _context); 780 781 if (read == -1) 782 { 783 LOG.debug("{}: client closed connection {}", this, _endPoint); 784 785 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) 786 closeServer(); 787 else 788 _toServer.shutdownOutput(); 789 790 break; 791 } 792 793 if (read == 0) 794 break; 795 796 LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint); 797 int written = write(_toServer._endPoint, _buffer, _context); 798 LOG.debug("{}: written to {} {} bytes", this, _toServer, written); 799 } 800 return this; 801 } 802 catch (ClosedChannelException x) 803 { 804 LOG.debug(x); 805 closeServer(); 806 throw x; 807 } 808 catch (IOException x) 809 { 810 LOG.warn(this + ": unexpected exception", x); 811 close(); 812 throw x; 813 } 814 catch (RuntimeException x) 815 { 816 LOG.warn(this + ": unexpected exception", x); 817 close(); 818 throw x; 819 } 820 finally 821 { 822 LOG.debug("{}: end reading from client", this); 823 } 824 } 825 onInputShutdown()826 public void onInputShutdown() throws IOException 827 { 828 } 829 getTimeStamp()830 public long getTimeStamp() 831 { 832 return _timestamp; 833 } 834 isIdle()835 public boolean isIdle() 836 { 837 return false; 838 } 839 isSuspended()840 public boolean isSuspended() 841 { 842 return false; 843 } 844 onClose()845 public void onClose() 846 { 847 } 848 setConnection(ProxyToServerConnection connection)849 public void setConnection(ProxyToServerConnection connection) 850 { 851 _toServer = connection; 852 } 853 closeClient()854 public void closeClient() throws IOException 855 { 856 _endPoint.close(); 857 } 858 closeServer()859 public void closeServer() throws IOException 860 { 861 _toServer.closeServer(); 862 } 863 close()864 public void close() 865 { 866 try 867 { 868 closeClient(); 869 } 870 catch (IOException x) 871 { 872 LOG.debug(this + ": unexpected exception closing the client", x); 873 } 874 875 try 876 { 877 closeServer(); 878 } 879 catch (IOException x) 880 { 881 LOG.debug(this + ": unexpected exception closing the server", x); 882 } 883 } 884 shutdownOutput()885 public void shutdownOutput() throws IOException 886 { 887 _endPoint.shutdownOutput(); 888 } 889 onIdleExpired(long idleForMs)890 public void onIdleExpired(long idleForMs) 891 { 892 try 893 { 894 LOG.debug("{} idle expired", this); 895 if (_endPoint.isOutputShutdown()) 896 close(); 897 else 898 shutdownOutput(); 899 } 900 catch(Exception e) 901 { 902 LOG.debug(e); 903 close(); 904 } 905 } 906 } 907 908 /** 909 * Add a whitelist entry to an existing handler configuration 910 * 911 * @param entry new whitelist entry 912 */ addWhite(String entry)913 public void addWhite(String entry) 914 { 915 add(entry, _white); 916 } 917 918 /** 919 * Add a blacklist entry to an existing handler configuration 920 * 921 * @param entry new blacklist entry 922 */ addBlack(String entry)923 public void addBlack(String entry) 924 { 925 add(entry, _black); 926 } 927 928 /** 929 * Re-initialize the whitelist of existing handler object 930 * 931 * @param entries array of whitelist entries 932 */ setWhite(String[] entries)933 public void setWhite(String[] entries) 934 { 935 set(entries, _white); 936 } 937 938 /** 939 * Re-initialize the blacklist of existing handler object 940 * 941 * @param entries array of blacklist entries 942 */ setBlack(String[] entries)943 public void setBlack(String[] entries) 944 { 945 set(entries, _black); 946 } 947 948 /** 949 * Helper method to process a list of new entries and replace 950 * the content of the specified host map 951 * 952 * @param entries new entries 953 * @param hostMap target host map 954 */ set(String[] entries, HostMap<String> hostMap)955 protected void set(String[] entries, HostMap<String> hostMap) 956 { 957 hostMap.clear(); 958 959 if (entries != null && entries.length > 0) 960 { 961 for (String addrPath : entries) 962 { 963 add(addrPath, hostMap); 964 } 965 } 966 } 967 968 /** 969 * Helper method to process the new entry and add it to 970 * the specified host map. 971 * 972 * @param entry new entry 973 * @param hostMap target host map 974 */ add(String entry, HostMap<String> hostMap)975 private void add(String entry, HostMap<String> hostMap) 976 { 977 if (entry != null && entry.length() > 0) 978 { 979 entry = entry.trim(); 980 if (hostMap.get(entry) == null) 981 { 982 hostMap.put(entry, entry); 983 } 984 } 985 } 986 987 /** 988 * Check the request hostname against white- and blacklist. 989 * 990 * @param host hostname to check 991 * @return true if hostname is allowed to be proxied 992 */ validateDestination(String host)993 public boolean validateDestination(String host) 994 { 995 if (_white.size() > 0) 996 { 997 Object whiteObj = _white.getLazyMatches(host); 998 if (whiteObj == null) 999 { 1000 return false; 1001 } 1002 } 1003 1004 if (_black.size() > 0) 1005 { 1006 Object blackObj = _black.getLazyMatches(host); 1007 if (blackObj != null) 1008 { 1009 return false; 1010 } 1011 } 1012 1013 return true; 1014 } 1015 1016 @Override dump(Appendable out, String indent)1017 public void dump(Appendable out, String indent) throws IOException 1018 { 1019 dumpThis(out); 1020 if (_privateThreadPool) 1021 dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans()); 1022 else 1023 dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans()); 1024 } 1025 } 1026