1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.client;
20 
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.net.ProtocolException;
24 import java.util.ArrayList;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30 
31 import org.eclipse.jetty.client.HttpClient.Connector;
32 import org.eclipse.jetty.client.security.Authentication;
33 import org.eclipse.jetty.client.security.SecurityListener;
34 import org.eclipse.jetty.http.HttpCookie;
35 import org.eclipse.jetty.http.HttpHeaders;
36 import org.eclipse.jetty.http.HttpMethods;
37 import org.eclipse.jetty.http.HttpStatus;
38 import org.eclipse.jetty.http.PathMap;
39 import org.eclipse.jetty.io.Buffer;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.component.AggregateLifeCycle;
44 import org.eclipse.jetty.util.component.Dumpable;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47 import org.eclipse.jetty.util.ssl.SslContextFactory;
48 
49 /**
50  * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
51  */
52 public class HttpDestination implements Dumpable
53 {
54     private static final Logger LOG = Log.getLogger(HttpDestination.class);
55 
56     private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>();
57     private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
58     private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true);
59     private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>();
60     private final HttpClient _client;
61     private final Address _address;
62     private final boolean _ssl;
63     private final SslContextFactory _sslContextFactory;
64     private final ByteArrayBuffer _hostHeader;
65     private volatile int _maxConnections;
66     private volatile int _maxQueueSize;
67     private int _pendingConnections = 0;
68     private int _pendingReservedConnections = 0;
69     private volatile Address _proxy;
70     private Authentication _proxyAuthentication;
71     private PathMap _authorizations;
72     private List<HttpCookie> _cookies;
73 
HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory)74     HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory)
75     {
76         _client = client;
77         _address = address;
78         _ssl = ssl;
79         _sslContextFactory = sslContextFactory;
80         _maxConnections = _client.getMaxConnectionsPerAddress();
81         _maxQueueSize = _client.getMaxQueueSizePerAddress();
82         String addressString = address.getHost();
83         if (address.getPort() != (_ssl ? 443 : 80))
84             addressString += ":" + address.getPort();
85         _hostHeader = new ByteArrayBuffer(addressString);
86     }
87 
getHttpClient()88     public HttpClient getHttpClient()
89     {
90         return _client;
91     }
92 
getAddress()93     public Address getAddress()
94     {
95         return _address;
96     }
97 
isSecure()98     public boolean isSecure()
99     {
100         return _ssl;
101     }
102 
getSslContextFactory()103     public SslContextFactory getSslContextFactory()
104     {
105         return _sslContextFactory;
106     }
107 
getHostHeader()108     public Buffer getHostHeader()
109     {
110         return _hostHeader;
111     }
112 
getMaxConnections()113     public int getMaxConnections()
114     {
115         return _maxConnections;
116     }
117 
setMaxConnections(int maxConnections)118     public void setMaxConnections(int maxConnections)
119     {
120         this._maxConnections = maxConnections;
121     }
122 
getMaxQueueSize()123     public int getMaxQueueSize()
124     {
125         return _maxQueueSize;
126     }
127 
setMaxQueueSize(int maxQueueSize)128     public void setMaxQueueSize(int maxQueueSize)
129     {
130         this._maxQueueSize = maxQueueSize;
131     }
132 
getConnections()133     public int getConnections()
134     {
135         synchronized (this)
136         {
137             return _connections.size();
138         }
139     }
140 
getIdleConnections()141     public int getIdleConnections()
142     {
143         synchronized (this)
144         {
145             return _idleConnections.size();
146         }
147     }
148 
addAuthorization(String pathSpec, Authentication authorization)149     public void addAuthorization(String pathSpec, Authentication authorization)
150     {
151         synchronized (this)
152         {
153             if (_authorizations == null)
154                 _authorizations = new PathMap();
155             _authorizations.put(pathSpec, authorization);
156         }
157 
158         // TODO query and remove methods
159     }
160 
addCookie(HttpCookie cookie)161     public void addCookie(HttpCookie cookie)
162     {
163         synchronized (this)
164         {
165             if (_cookies == null)
166                 _cookies = new ArrayList<HttpCookie>();
167             _cookies.add(cookie);
168         }
169 
170         // TODO query, remove and age methods
171     }
172 
173     /**
174      * Get a connection. We either get an idle connection if one is available, or
175      * we make a new connection, if we have not yet reached maxConnections. If we
176      * have reached maxConnections, we wait until the number reduces.
177      *
178      * @param timeout max time prepared to block waiting to be able to get a connection
179      * @return a HttpConnection for this destination
180      * @throws IOException if an I/O error occurs
181      */
getConnection(long timeout)182     private AbstractHttpConnection getConnection(long timeout) throws IOException
183     {
184         AbstractHttpConnection connection = null;
185 
186         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
187         {
188             boolean startConnection = false;
189             synchronized (this)
190             {
191                 int totalConnections = _connections.size() + _pendingConnections;
192                 if (totalConnections < _maxConnections)
193                 {
194                     _pendingReservedConnections++;
195                     startConnection = true;
196                 }
197             }
198 
199             if (startConnection)
200             {
201                 startNewConnection();
202                 try
203                 {
204                     Object o = _reservedConnections.take();
205                     if (o instanceof AbstractHttpConnection)
206                     {
207                         connection = (AbstractHttpConnection)o;
208                     }
209                     else
210                         throw (IOException)o;
211                 }
212                 catch (InterruptedException e)
213                 {
214                     LOG.ignore(e);
215                 }
216             }
217             else
218             {
219                 try
220                 {
221                     Thread.currentThread();
222                     Thread.sleep(200);
223                     timeout -= 200;
224                 }
225                 catch (InterruptedException e)
226                 {
227                     LOG.ignore(e);
228                 }
229             }
230         }
231         return connection;
232     }
233 
reserveConnection(long timeout)234     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
235     {
236         AbstractHttpConnection connection = getConnection(timeout);
237         if (connection != null)
238             connection.setReserved(true);
239         return connection;
240     }
241 
getIdleConnection()242     public AbstractHttpConnection getIdleConnection() throws IOException
243     {
244         AbstractHttpConnection connection = null;
245         while (true)
246         {
247             synchronized (this)
248             {
249                 if (connection != null)
250                 {
251                     _connections.remove(connection);
252                     connection.close();
253                     connection = null;
254                 }
255                 if (_idleConnections.size() > 0)
256                     connection = _idleConnections.remove(_idleConnections.size() - 1);
257             }
258 
259             if (connection == null)
260             {
261                 return null;
262             }
263 
264             // Check if the connection was idle,
265             // but it expired just a moment ago
266             if (connection.cancelIdleTimeout())
267             {
268                 return connection;
269             }
270         }
271     }
272 
startNewConnection()273     protected void startNewConnection()
274     {
275         try
276         {
277             synchronized (this)
278             {
279                 _pendingConnections++;
280             }
281             final Connector connector = _client._connector;
282             if (connector != null)
283                 connector.startConnection(this);
284         }
285         catch (Exception e)
286         {
287             LOG.debug(e);
288             onConnectionFailed(e);
289         }
290     }
291 
onConnectionFailed(Throwable throwable)292     public void onConnectionFailed(Throwable throwable)
293     {
294         Throwable connect_failure = null;
295 
296         boolean startConnection = false;
297         synchronized (this)
298         {
299             _pendingConnections--;
300             if (_pendingReservedConnections > 0)
301             {
302                 connect_failure = throwable;
303                 _pendingReservedConnections--;
304             }
305             else if (_exchanges.size() > 0)
306             {
307                 HttpExchange ex = _exchanges.remove(0);
308                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
309                     ex.getEventListener().onConnectionFailed(throwable);
310 
311                 // Since an existing connection had failed, we need to create a
312                 // connection if the  queue is not empty and client is running.
313                 if (!_exchanges.isEmpty() && _client.isStarted())
314                     startConnection = true;
315             }
316         }
317 
318         if (startConnection)
319             startNewConnection();
320 
321         if (connect_failure != null)
322         {
323             try
324             {
325                 _reservedConnections.put(connect_failure);
326             }
327             catch (InterruptedException e)
328             {
329                 LOG.ignore(e);
330             }
331         }
332     }
333 
onException(Throwable throwable)334     public void onException(Throwable throwable)
335     {
336         synchronized (this)
337         {
338             _pendingConnections--;
339             if (_exchanges.size() > 0)
340             {
341                 HttpExchange ex = _exchanges.remove(0);
342                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
343                     ex.getEventListener().onException(throwable);
344             }
345         }
346     }
347 
onNewConnection(final AbstractHttpConnection connection)348     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
349     {
350         Connection reservedConnection = null;
351 
352         synchronized (this)
353         {
354             _pendingConnections--;
355             _connections.add(connection);
356 
357             if (_pendingReservedConnections > 0)
358             {
359                 reservedConnection = connection;
360                 _pendingReservedConnections--;
361             }
362             else
363             {
364                 // Establish the tunnel if needed
365                 EndPoint endPoint = connection.getEndPoint();
366                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
367                 {
368                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
369                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint);
370                     connect.setAddress(getProxy());
371                     LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy());
372                     send(connection, connect);
373                 }
374                 else
375                 {
376                     // Another connection stole the exchange that caused the creation of this connection ?
377                     if (_exchanges.size() == 0)
378                     {
379                         LOG.debug("No exchanges for new connection {}", connection);
380                         connection.setIdleTimeout();
381                         _idleConnections.add(connection);
382                     }
383                     else
384                     {
385                         HttpExchange exchange = _exchanges.remove(0);
386                         send(connection, exchange);
387                     }
388                 }
389             }
390         }
391 
392         if (reservedConnection != null)
393         {
394             try
395             {
396                 _reservedConnections.put(reservedConnection);
397             }
398             catch (InterruptedException e)
399             {
400                 LOG.ignore(e);
401             }
402         }
403     }
404 
returnConnection(AbstractHttpConnection connection, boolean close)405     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
406     {
407         if (connection.isReserved())
408             connection.setReserved(false);
409 
410         if (close)
411         {
412             try
413             {
414                 connection.close();
415             }
416             catch (IOException e)
417             {
418                 LOG.ignore(e);
419             }
420         }
421 
422         if (!_client.isStarted())
423             return;
424 
425         if (!close && connection.getEndPoint().isOpen())
426         {
427             synchronized (this)
428             {
429                 if (_exchanges.size() == 0)
430                 {
431                     connection.setIdleTimeout();
432                     _idleConnections.add(connection);
433                 }
434                 else
435                 {
436                     HttpExchange ex = _exchanges.remove(0);
437                     send(connection, ex);
438                 }
439                 this.notifyAll();
440             }
441         }
442         else
443         {
444             boolean startConnection = false;
445             synchronized (this)
446             {
447                 _connections.remove(connection);
448                 if (!_exchanges.isEmpty())
449                     startConnection = true;
450             }
451 
452             if (startConnection)
453                 startNewConnection();
454         }
455     }
456 
returnIdleConnection(AbstractHttpConnection connection)457     public void returnIdleConnection(AbstractHttpConnection connection)
458     {
459         // TODO work out the real idle time;
460         long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1;
461         connection.onIdleExpired(idleForMs);
462 
463         boolean startConnection = false;
464         synchronized (this)
465         {
466             _idleConnections.remove(connection);
467             _connections.remove(connection);
468 
469             if (!_exchanges.isEmpty() && _client.isStarted())
470                 startConnection = true;
471         }
472 
473         if (startConnection)
474             startNewConnection();
475     }
476 
send(HttpExchange ex)477     public void send(HttpExchange ex) throws IOException
478     {
479         ex.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION);
480 
481         LinkedList<String> listeners = _client.getRegisteredListeners();
482         if (listeners != null)
483         {
484             // Add registered listeners, fail if we can't load them
485             for (int i = listeners.size(); i > 0; --i)
486             {
487                 String listenerClass = listeners.get(i - 1);
488                 try
489                 {
490                     Class<?> listener = Class.forName(listenerClass);
491                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
492                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
493                     ex.setEventListener(elistener);
494                 }
495                 catch (final Exception e)
496                 {
497                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
498                     {
499                         {
500                             initCause(e);
501                         }
502                     };
503                 }
504             }
505         }
506 
507         // Security is supported by default and should be the first consulted
508         if (_client.hasRealms())
509         {
510             ex.setEventListener(new SecurityListener(this, ex));
511         }
512 
513         doSend(ex);
514     }
515 
resend(HttpExchange ex)516     public void resend(HttpExchange ex) throws IOException
517     {
518         ex.getEventListener().onRetry();
519         ex.reset();
520         doSend(ex);
521     }
522 
doSend(HttpExchange ex)523     protected void doSend(HttpExchange ex) throws IOException
524     {
525         // add cookies
526         // TODO handle max-age etc.
527         if (_cookies != null)
528         {
529             StringBuilder buf = null;
530             for (HttpCookie cookie : _cookies)
531             {
532                 if (buf == null)
533                     buf = new StringBuilder();
534                 else
535                     buf.append("; ");
536                 buf.append(cookie.getName()); // TODO quotes
537                 buf.append("=");
538                 buf.append(cookie.getValue()); // TODO quotes
539             }
540             if (buf != null)
541                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
542         }
543 
544         // Add any known authorizations
545         if (_authorizations != null)
546         {
547             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
548             if (auth != null)
549                 (auth).setCredentials(ex);
550         }
551 
552         // Schedule the timeout here, before we queue the exchange
553         // so that we count also the queue time in the timeout
554         ex.scheduleTimeout(this);
555 
556         AbstractHttpConnection connection = getIdleConnection();
557         if (connection != null)
558         {
559             send(connection, ex);
560         }
561         else
562         {
563             boolean startConnection = false;
564             synchronized (this)
565             {
566                 if (_exchanges.size() == _maxQueueSize)
567                     throw new RejectedExecutionException("Queue full for address " + _address);
568 
569                 _exchanges.add(ex);
570                 if (_connections.size() + _pendingConnections < _maxConnections)
571                     startConnection = true;
572             }
573 
574             if (startConnection)
575                 startNewConnection();
576         }
577     }
578 
exchangeExpired(HttpExchange exchange)579     protected void exchangeExpired(HttpExchange exchange)
580     {
581         // The exchange may expire while waiting in the
582         // destination queue, make sure it is removed
583         synchronized (this)
584         {
585             _exchanges.remove(exchange);
586         }
587     }
588 
send(AbstractHttpConnection connection, HttpExchange exchange)589     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
590     {
591         synchronized (this)
592         {
593             // If server closes the connection, put the exchange back
594             // to the exchange queue and recycle the connection
595             if (!connection.send(exchange))
596             {
597                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
598                     _exchanges.add(0, exchange);
599                 returnIdleConnection(connection);
600             }
601         }
602     }
603 
604     @Override
toString()605     public synchronized String toString()
606     {
607         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize);
608     }
609 
toDetailString()610     public synchronized String toDetailString()
611     {
612         StringBuilder b = new StringBuilder();
613         b.append(toString());
614         b.append('\n');
615         synchronized (this)
616         {
617             for (AbstractHttpConnection connection : _connections)
618             {
619                 b.append(connection.toDetailString());
620                 if (_idleConnections.contains(connection))
621                     b.append(" IDLE");
622                 b.append('\n');
623             }
624         }
625         b.append("--");
626         b.append('\n');
627 
628         return b.toString();
629     }
630 
setProxy(Address proxy)631     public void setProxy(Address proxy)
632     {
633         _proxy = proxy;
634     }
635 
getProxy()636     public Address getProxy()
637     {
638         return _proxy;
639     }
640 
getProxyAuthentication()641     public Authentication getProxyAuthentication()
642     {
643         return _proxyAuthentication;
644     }
645 
setProxyAuthentication(Authentication authentication)646     public void setProxyAuthentication(Authentication authentication)
647     {
648         _proxyAuthentication = authentication;
649     }
650 
isProxied()651     public boolean isProxied()
652     {
653         return _proxy != null;
654     }
655 
close()656     public void close() throws IOException
657     {
658         synchronized (this)
659         {
660             for (AbstractHttpConnection connection : _connections)
661             {
662                 connection.close();
663             }
664         }
665     }
666 
dump()667     public String dump()
668     {
669         return AggregateLifeCycle.dump(this);
670     }
671 
dump(Appendable out, String indent)672     public void dump(Appendable out, String indent) throws IOException
673     {
674         synchronized (this)
675         {
676             out.append(String.valueOf(this));
677             out.append("idle=");
678             out.append(String.valueOf(_idleConnections.size()));
679             out.append(" pending=");
680             out.append(String.valueOf(_pendingConnections));
681             out.append("\n");
682             AggregateLifeCycle.dump(out, indent, _connections);
683         }
684     }
685 
686     private class ConnectExchange extends ContentExchange
687     {
688         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
689 
ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)690         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)
691         {
692             this.proxyEndPoint = proxyEndPoint;
693             setMethod(HttpMethods.CONNECT);
694             String serverHostAndPort = serverAddress.toString();
695             setRequestURI(serverHostAndPort);
696             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
697             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
698             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
699         }
700 
701         @Override
onResponseComplete()702         protected void onResponseComplete() throws IOException
703         {
704             int responseStatus = getResponseStatus();
705             if (responseStatus == HttpStatus.OK_200)
706             {
707                 proxyEndPoint.upgrade();
708             }
709             else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
710             {
711                 onExpire();
712             }
713             else
714             {
715                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus));
716             }
717         }
718 
719         @Override
onConnectionFailed(Throwable x)720         protected void onConnectionFailed(Throwable x)
721         {
722             HttpDestination.this.onConnectionFailed(x);
723         }
724 
725         @Override
onException(Throwable x)726         protected void onException(Throwable x)
727         {
728             HttpExchange exchange = null;
729             synchronized (HttpDestination.this)
730             {
731                 if (!_exchanges.isEmpty())
732                     exchange = _exchanges.remove(0);
733             }
734             if (exchange != null && exchange.setStatus(STATUS_EXCEPTED))
735                 exchange.getEventListener().onException(x);
736         }
737 
738         @Override
onExpire()739         protected void onExpire()
740         {
741             HttpExchange exchange = null;
742             synchronized (HttpDestination.this)
743             {
744                 if (!_exchanges.isEmpty())
745                     exchange = _exchanges.remove(0);
746             }
747             if (exchange != null && exchange.setStatus(STATUS_EXPIRED))
748                 exchange.getEventListener().onExpire();
749         }
750     }
751 }
752