1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.client; 20 21 import java.io.IOException; 22 import java.lang.reflect.Constructor; 23 import java.net.ProtocolException; 24 import java.util.ArrayList; 25 import java.util.LinkedList; 26 import java.util.List; 27 import java.util.concurrent.ArrayBlockingQueue; 28 import java.util.concurrent.BlockingQueue; 29 import java.util.concurrent.RejectedExecutionException; 30 31 import org.eclipse.jetty.client.HttpClient.Connector; 32 import org.eclipse.jetty.client.security.Authentication; 33 import org.eclipse.jetty.client.security.SecurityListener; 34 import org.eclipse.jetty.http.HttpCookie; 35 import org.eclipse.jetty.http.HttpHeaders; 36 import org.eclipse.jetty.http.HttpMethods; 37 import org.eclipse.jetty.http.HttpStatus; 38 import org.eclipse.jetty.http.PathMap; 39 import org.eclipse.jetty.io.Buffer; 40 import org.eclipse.jetty.io.ByteArrayBuffer; 41 import org.eclipse.jetty.io.Connection; 42 import org.eclipse.jetty.io.EndPoint; 43 import org.eclipse.jetty.util.component.AggregateLifeCycle; 44 import org.eclipse.jetty.util.component.Dumpable; 45 import org.eclipse.jetty.util.log.Log; 46 import org.eclipse.jetty.util.log.Logger; 47 import org.eclipse.jetty.util.ssl.SslContextFactory; 48 49 /** 50 * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $ 51 */ 52 public class HttpDestination implements Dumpable 53 { 54 private static final Logger LOG = Log.getLogger(HttpDestination.class); 55 56 private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>(); 57 private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>(); 58 private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true); 59 private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>(); 60 private final HttpClient _client; 61 private final Address _address; 62 private final boolean _ssl; 63 private final SslContextFactory _sslContextFactory; 64 private final ByteArrayBuffer _hostHeader; 65 private volatile int _maxConnections; 66 private volatile int _maxQueueSize; 67 private int _pendingConnections = 0; 68 private int _pendingReservedConnections = 0; 69 private volatile Address _proxy; 70 private Authentication _proxyAuthentication; 71 private PathMap _authorizations; 72 private List<HttpCookie> _cookies; 73 HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory)74 HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory) 75 { 76 _client = client; 77 _address = address; 78 _ssl = ssl; 79 _sslContextFactory = sslContextFactory; 80 _maxConnections = _client.getMaxConnectionsPerAddress(); 81 _maxQueueSize = _client.getMaxQueueSizePerAddress(); 82 String addressString = address.getHost(); 83 if (address.getPort() != (_ssl ? 443 : 80)) 84 addressString += ":" + address.getPort(); 85 _hostHeader = new ByteArrayBuffer(addressString); 86 } 87 getHttpClient()88 public HttpClient getHttpClient() 89 { 90 return _client; 91 } 92 getAddress()93 public Address getAddress() 94 { 95 return _address; 96 } 97 isSecure()98 public boolean isSecure() 99 { 100 return _ssl; 101 } 102 getSslContextFactory()103 public SslContextFactory getSslContextFactory() 104 { 105 return _sslContextFactory; 106 } 107 getHostHeader()108 public Buffer getHostHeader() 109 { 110 return _hostHeader; 111 } 112 getMaxConnections()113 public int getMaxConnections() 114 { 115 return _maxConnections; 116 } 117 setMaxConnections(int maxConnections)118 public void setMaxConnections(int maxConnections) 119 { 120 this._maxConnections = maxConnections; 121 } 122 getMaxQueueSize()123 public int getMaxQueueSize() 124 { 125 return _maxQueueSize; 126 } 127 setMaxQueueSize(int maxQueueSize)128 public void setMaxQueueSize(int maxQueueSize) 129 { 130 this._maxQueueSize = maxQueueSize; 131 } 132 getConnections()133 public int getConnections() 134 { 135 synchronized (this) 136 { 137 return _connections.size(); 138 } 139 } 140 getIdleConnections()141 public int getIdleConnections() 142 { 143 synchronized (this) 144 { 145 return _idleConnections.size(); 146 } 147 } 148 addAuthorization(String pathSpec, Authentication authorization)149 public void addAuthorization(String pathSpec, Authentication authorization) 150 { 151 synchronized (this) 152 { 153 if (_authorizations == null) 154 _authorizations = new PathMap(); 155 _authorizations.put(pathSpec, authorization); 156 } 157 158 // TODO query and remove methods 159 } 160 addCookie(HttpCookie cookie)161 public void addCookie(HttpCookie cookie) 162 { 163 synchronized (this) 164 { 165 if (_cookies == null) 166 _cookies = new ArrayList<HttpCookie>(); 167 _cookies.add(cookie); 168 } 169 170 // TODO query, remove and age methods 171 } 172 173 /** 174 * Get a connection. We either get an idle connection if one is available, or 175 * we make a new connection, if we have not yet reached maxConnections. If we 176 * have reached maxConnections, we wait until the number reduces. 177 * 178 * @param timeout max time prepared to block waiting to be able to get a connection 179 * @return a HttpConnection for this destination 180 * @throws IOException if an I/O error occurs 181 */ getConnection(long timeout)182 private AbstractHttpConnection getConnection(long timeout) throws IOException 183 { 184 AbstractHttpConnection connection = null; 185 186 while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0) 187 { 188 boolean startConnection = false; 189 synchronized (this) 190 { 191 int totalConnections = _connections.size() + _pendingConnections; 192 if (totalConnections < _maxConnections) 193 { 194 _pendingReservedConnections++; 195 startConnection = true; 196 } 197 } 198 199 if (startConnection) 200 { 201 startNewConnection(); 202 try 203 { 204 Object o = _reservedConnections.take(); 205 if (o instanceof AbstractHttpConnection) 206 { 207 connection = (AbstractHttpConnection)o; 208 } 209 else 210 throw (IOException)o; 211 } 212 catch (InterruptedException e) 213 { 214 LOG.ignore(e); 215 } 216 } 217 else 218 { 219 try 220 { 221 Thread.currentThread(); 222 Thread.sleep(200); 223 timeout -= 200; 224 } 225 catch (InterruptedException e) 226 { 227 LOG.ignore(e); 228 } 229 } 230 } 231 return connection; 232 } 233 reserveConnection(long timeout)234 public AbstractHttpConnection reserveConnection(long timeout) throws IOException 235 { 236 AbstractHttpConnection connection = getConnection(timeout); 237 if (connection != null) 238 connection.setReserved(true); 239 return connection; 240 } 241 getIdleConnection()242 public AbstractHttpConnection getIdleConnection() throws IOException 243 { 244 AbstractHttpConnection connection = null; 245 while (true) 246 { 247 synchronized (this) 248 { 249 if (connection != null) 250 { 251 _connections.remove(connection); 252 connection.close(); 253 connection = null; 254 } 255 if (_idleConnections.size() > 0) 256 connection = _idleConnections.remove(_idleConnections.size() - 1); 257 } 258 259 if (connection == null) 260 { 261 return null; 262 } 263 264 // Check if the connection was idle, 265 // but it expired just a moment ago 266 if (connection.cancelIdleTimeout()) 267 { 268 return connection; 269 } 270 } 271 } 272 startNewConnection()273 protected void startNewConnection() 274 { 275 try 276 { 277 synchronized (this) 278 { 279 _pendingConnections++; 280 } 281 final Connector connector = _client._connector; 282 if (connector != null) 283 connector.startConnection(this); 284 } 285 catch (Exception e) 286 { 287 LOG.debug(e); 288 onConnectionFailed(e); 289 } 290 } 291 onConnectionFailed(Throwable throwable)292 public void onConnectionFailed(Throwable throwable) 293 { 294 Throwable connect_failure = null; 295 296 boolean startConnection = false; 297 synchronized (this) 298 { 299 _pendingConnections--; 300 if (_pendingReservedConnections > 0) 301 { 302 connect_failure = throwable; 303 _pendingReservedConnections--; 304 } 305 else if (_exchanges.size() > 0) 306 { 307 HttpExchange ex = _exchanges.remove(0); 308 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED)) 309 ex.getEventListener().onConnectionFailed(throwable); 310 311 // Since an existing connection had failed, we need to create a 312 // connection if the queue is not empty and client is running. 313 if (!_exchanges.isEmpty() && _client.isStarted()) 314 startConnection = true; 315 } 316 } 317 318 if (startConnection) 319 startNewConnection(); 320 321 if (connect_failure != null) 322 { 323 try 324 { 325 _reservedConnections.put(connect_failure); 326 } 327 catch (InterruptedException e) 328 { 329 LOG.ignore(e); 330 } 331 } 332 } 333 onException(Throwable throwable)334 public void onException(Throwable throwable) 335 { 336 synchronized (this) 337 { 338 _pendingConnections--; 339 if (_exchanges.size() > 0) 340 { 341 HttpExchange ex = _exchanges.remove(0); 342 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED)) 343 ex.getEventListener().onException(throwable); 344 } 345 } 346 } 347 onNewConnection(final AbstractHttpConnection connection)348 public void onNewConnection(final AbstractHttpConnection connection) throws IOException 349 { 350 Connection reservedConnection = null; 351 352 synchronized (this) 353 { 354 _pendingConnections--; 355 _connections.add(connection); 356 357 if (_pendingReservedConnections > 0) 358 { 359 reservedConnection = connection; 360 _pendingReservedConnections--; 361 } 362 else 363 { 364 // Establish the tunnel if needed 365 EndPoint endPoint = connection.getEndPoint(); 366 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint) 367 { 368 SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint; 369 ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint); 370 connect.setAddress(getProxy()); 371 LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy()); 372 send(connection, connect); 373 } 374 else 375 { 376 // Another connection stole the exchange that caused the creation of this connection ? 377 if (_exchanges.size() == 0) 378 { 379 LOG.debug("No exchanges for new connection {}", connection); 380 connection.setIdleTimeout(); 381 _idleConnections.add(connection); 382 } 383 else 384 { 385 HttpExchange exchange = _exchanges.remove(0); 386 send(connection, exchange); 387 } 388 } 389 } 390 } 391 392 if (reservedConnection != null) 393 { 394 try 395 { 396 _reservedConnections.put(reservedConnection); 397 } 398 catch (InterruptedException e) 399 { 400 LOG.ignore(e); 401 } 402 } 403 } 404 returnConnection(AbstractHttpConnection connection, boolean close)405 public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException 406 { 407 if (connection.isReserved()) 408 connection.setReserved(false); 409 410 if (close) 411 { 412 try 413 { 414 connection.close(); 415 } 416 catch (IOException e) 417 { 418 LOG.ignore(e); 419 } 420 } 421 422 if (!_client.isStarted()) 423 return; 424 425 if (!close && connection.getEndPoint().isOpen()) 426 { 427 synchronized (this) 428 { 429 if (_exchanges.size() == 0) 430 { 431 connection.setIdleTimeout(); 432 _idleConnections.add(connection); 433 } 434 else 435 { 436 HttpExchange ex = _exchanges.remove(0); 437 send(connection, ex); 438 } 439 this.notifyAll(); 440 } 441 } 442 else 443 { 444 boolean startConnection = false; 445 synchronized (this) 446 { 447 _connections.remove(connection); 448 if (!_exchanges.isEmpty()) 449 startConnection = true; 450 } 451 452 if (startConnection) 453 startNewConnection(); 454 } 455 } 456 returnIdleConnection(AbstractHttpConnection connection)457 public void returnIdleConnection(AbstractHttpConnection connection) 458 { 459 // TODO work out the real idle time; 460 long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1; 461 connection.onIdleExpired(idleForMs); 462 463 boolean startConnection = false; 464 synchronized (this) 465 { 466 _idleConnections.remove(connection); 467 _connections.remove(connection); 468 469 if (!_exchanges.isEmpty() && _client.isStarted()) 470 startConnection = true; 471 } 472 473 if (startConnection) 474 startNewConnection(); 475 } 476 send(HttpExchange ex)477 public void send(HttpExchange ex) throws IOException 478 { 479 ex.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION); 480 481 LinkedList<String> listeners = _client.getRegisteredListeners(); 482 if (listeners != null) 483 { 484 // Add registered listeners, fail if we can't load them 485 for (int i = listeners.size(); i > 0; --i) 486 { 487 String listenerClass = listeners.get(i - 1); 488 try 489 { 490 Class<?> listener = Class.forName(listenerClass); 491 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class); 492 HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex); 493 ex.setEventListener(elistener); 494 } 495 catch (final Exception e) 496 { 497 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass) 498 { 499 { 500 initCause(e); 501 } 502 }; 503 } 504 } 505 } 506 507 // Security is supported by default and should be the first consulted 508 if (_client.hasRealms()) 509 { 510 ex.setEventListener(new SecurityListener(this, ex)); 511 } 512 513 doSend(ex); 514 } 515 resend(HttpExchange ex)516 public void resend(HttpExchange ex) throws IOException 517 { 518 ex.getEventListener().onRetry(); 519 ex.reset(); 520 doSend(ex); 521 } 522 doSend(HttpExchange ex)523 protected void doSend(HttpExchange ex) throws IOException 524 { 525 // add cookies 526 // TODO handle max-age etc. 527 if (_cookies != null) 528 { 529 StringBuilder buf = null; 530 for (HttpCookie cookie : _cookies) 531 { 532 if (buf == null) 533 buf = new StringBuilder(); 534 else 535 buf.append("; "); 536 buf.append(cookie.getName()); // TODO quotes 537 buf.append("="); 538 buf.append(cookie.getValue()); // TODO quotes 539 } 540 if (buf != null) 541 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString()); 542 } 543 544 // Add any known authorizations 545 if (_authorizations != null) 546 { 547 Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI()); 548 if (auth != null) 549 (auth).setCredentials(ex); 550 } 551 552 // Schedule the timeout here, before we queue the exchange 553 // so that we count also the queue time in the timeout 554 ex.scheduleTimeout(this); 555 556 AbstractHttpConnection connection = getIdleConnection(); 557 if (connection != null) 558 { 559 send(connection, ex); 560 } 561 else 562 { 563 boolean startConnection = false; 564 synchronized (this) 565 { 566 if (_exchanges.size() == _maxQueueSize) 567 throw new RejectedExecutionException("Queue full for address " + _address); 568 569 _exchanges.add(ex); 570 if (_connections.size() + _pendingConnections < _maxConnections) 571 startConnection = true; 572 } 573 574 if (startConnection) 575 startNewConnection(); 576 } 577 } 578 exchangeExpired(HttpExchange exchange)579 protected void exchangeExpired(HttpExchange exchange) 580 { 581 // The exchange may expire while waiting in the 582 // destination queue, make sure it is removed 583 synchronized (this) 584 { 585 _exchanges.remove(exchange); 586 } 587 } 588 send(AbstractHttpConnection connection, HttpExchange exchange)589 protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException 590 { 591 synchronized (this) 592 { 593 // If server closes the connection, put the exchange back 594 // to the exchange queue and recycle the connection 595 if (!connection.send(exchange)) 596 { 597 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION) 598 _exchanges.add(0, exchange); 599 returnIdleConnection(connection); 600 } 601 } 602 } 603 604 @Override toString()605 public synchronized String toString() 606 { 607 return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize); 608 } 609 toDetailString()610 public synchronized String toDetailString() 611 { 612 StringBuilder b = new StringBuilder(); 613 b.append(toString()); 614 b.append('\n'); 615 synchronized (this) 616 { 617 for (AbstractHttpConnection connection : _connections) 618 { 619 b.append(connection.toDetailString()); 620 if (_idleConnections.contains(connection)) 621 b.append(" IDLE"); 622 b.append('\n'); 623 } 624 } 625 b.append("--"); 626 b.append('\n'); 627 628 return b.toString(); 629 } 630 setProxy(Address proxy)631 public void setProxy(Address proxy) 632 { 633 _proxy = proxy; 634 } 635 getProxy()636 public Address getProxy() 637 { 638 return _proxy; 639 } 640 getProxyAuthentication()641 public Authentication getProxyAuthentication() 642 { 643 return _proxyAuthentication; 644 } 645 setProxyAuthentication(Authentication authentication)646 public void setProxyAuthentication(Authentication authentication) 647 { 648 _proxyAuthentication = authentication; 649 } 650 isProxied()651 public boolean isProxied() 652 { 653 return _proxy != null; 654 } 655 close()656 public void close() throws IOException 657 { 658 synchronized (this) 659 { 660 for (AbstractHttpConnection connection : _connections) 661 { 662 connection.close(); 663 } 664 } 665 } 666 dump()667 public String dump() 668 { 669 return AggregateLifeCycle.dump(this); 670 } 671 dump(Appendable out, String indent)672 public void dump(Appendable out, String indent) throws IOException 673 { 674 synchronized (this) 675 { 676 out.append(String.valueOf(this)); 677 out.append("idle="); 678 out.append(String.valueOf(_idleConnections.size())); 679 out.append(" pending="); 680 out.append(String.valueOf(_pendingConnections)); 681 out.append("\n"); 682 AggregateLifeCycle.dump(out, indent, _connections); 683 } 684 } 685 686 private class ConnectExchange extends ContentExchange 687 { 688 private final SelectConnector.UpgradableEndPoint proxyEndPoint; 689 ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)690 public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint) 691 { 692 this.proxyEndPoint = proxyEndPoint; 693 setMethod(HttpMethods.CONNECT); 694 String serverHostAndPort = serverAddress.toString(); 695 setRequestURI(serverHostAndPort); 696 addRequestHeader(HttpHeaders.HOST, serverHostAndPort); 697 addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive"); 698 addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client"); 699 } 700 701 @Override onResponseComplete()702 protected void onResponseComplete() throws IOException 703 { 704 int responseStatus = getResponseStatus(); 705 if (responseStatus == HttpStatus.OK_200) 706 { 707 proxyEndPoint.upgrade(); 708 } 709 else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504) 710 { 711 onExpire(); 712 } 713 else 714 { 715 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus)); 716 } 717 } 718 719 @Override onConnectionFailed(Throwable x)720 protected void onConnectionFailed(Throwable x) 721 { 722 HttpDestination.this.onConnectionFailed(x); 723 } 724 725 @Override onException(Throwable x)726 protected void onException(Throwable x) 727 { 728 HttpExchange exchange = null; 729 synchronized (HttpDestination.this) 730 { 731 if (!_exchanges.isEmpty()) 732 exchange = _exchanges.remove(0); 733 } 734 if (exchange != null && exchange.setStatus(STATUS_EXCEPTED)) 735 exchange.getEventListener().onException(x); 736 } 737 738 @Override onExpire()739 protected void onExpire() 740 { 741 HttpExchange exchange = null; 742 synchronized (HttpDestination.this) 743 { 744 if (!_exchanges.isEmpty()) 745 exchange = _exchanges.remove(0); 746 } 747 if (exchange != null && exchange.setStatus(STATUS_EXPIRED)) 748 exchange.getEventListener().onExpire(); 749 } 750 } 751 } 752