1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.client;
20 
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 
24 import org.eclipse.jetty.http.AbstractGenerator;
25 import org.eclipse.jetty.http.HttpStatus;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.Buffers;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 
33 
34 /* ------------------------------------------------------------ */
35 /** Blocking HTTP Connection
36  */
37 public class BlockingHttpConnection extends AbstractHttpConnection
38 {
39     private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
40 
41     private boolean _requestComplete;
42     private Buffer _requestContentChunk;
43     private boolean _expired=false;
44 
BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)45     BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
46     {
47         super(requestBuffers, responseBuffers, endPoint);
48     }
49 
reset()50     protected void reset() throws IOException
51     {
52         _requestComplete = false;
53         _expired = false;
54         super.reset();
55     }
56 
57 
58     @Override
exchangeExpired(HttpExchange exchange)59     protected void exchangeExpired(HttpExchange exchange)
60     {
61         synchronized (this)
62         {
63            super.exchangeExpired(exchange);
64            _expired = true;
65            this.notifyAll();
66         }
67     }
68 
69 
70 
71     @Override
onIdleExpired(long idleForMs)72     public void onIdleExpired(long idleForMs)
73     {
74         try
75         {
76             LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
77             _expired = true;
78             _endp.close();
79         }
80         catch(IOException e)
81         {
82             LOG.ignore(e);
83 
84             try
85             {
86                 _endp.close();
87             }
88             catch(IOException e2)
89             {
90                 LOG.ignore(e2);
91             }
92         }
93 
94         synchronized(this)
95         {
96             this.notifyAll();
97         }
98     }
99 
100     @Override
handle()101     public Connection handle() throws IOException
102     {
103         Connection connection = this;
104 
105         try
106         {
107             boolean failed = false;
108 
109 
110             // While we are making progress and have not changed connection
111             while (_endp.isOpen() && connection==this)
112             {
113                 LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
114 
115                 HttpExchange exchange;
116                 synchronized (this)
117                 {
118                     exchange=_exchange;
119                     while (exchange == null)
120                     {
121                         try
122                         {
123                             this.wait();
124                             exchange=_exchange;
125                             if (_expired)
126                             {
127                                 failed = true;
128                                 throw new InterruptedException();
129                             }
130 
131                         }
132                         catch (InterruptedException e)
133                         {
134                             throw new InterruptedIOException();
135                         }
136                     }
137                 }
138                 LOG.debug("exchange {}",exchange);
139 
140                 try
141                 {
142                     // Should we commit the request?
143                     if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
144                     {
145                         LOG.debug("commit");
146                         commitRequest();
147                     }
148 
149                     // Generate output
150                     while (_generator.isCommitted() && !_generator.isComplete())
151                     {
152                         if (_generator.flushBuffer()>0)
153                         {
154                             LOG.debug("flushed");
155                         }
156 
157                         // Is there more content to send or should we complete the generator
158                         if (_generator.isState(AbstractGenerator.STATE_CONTENT))
159                         {
160                             // Look for more content to send.
161                             if (_requestContentChunk==null)
162                                 _requestContentChunk = exchange.getRequestContentChunk(null);
163 
164                             if (_requestContentChunk==null)
165                             {
166                                 LOG.debug("complete");
167                                 _generator.complete();
168                             }
169                             else if (_generator.isEmpty())
170                             {
171                                 LOG.debug("addChunk");
172                                 Buffer chunk=_requestContentChunk;
173                                 _requestContentChunk=exchange.getRequestContentChunk(null);
174                                 _generator.addContent(chunk,_requestContentChunk==null);
175                                 if (_requestContentChunk==null)
176                                     exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
177                             }
178                         }
179                     }
180 
181                     // Signal request completion
182                     if (_generator.isComplete() && !_requestComplete)
183                     {
184                         LOG.debug("requestComplete");
185                         _requestComplete = true;
186                         exchange.getEventListener().onRequestComplete();
187                     }
188 
189                     // Read any input that is available
190                     if (!_parser.isComplete() && _parser.parseAvailable())
191                     {
192                         LOG.debug("parsed");
193                     }
194 
195                     // Flush output
196                     _endp.flush();
197                 }
198                 catch (Throwable e)
199                 {
200                     LOG.debug("Failure on " + _exchange, e);
201 
202                     failed = true;
203 
204                     synchronized (this)
205                     {
206                         if (exchange != null)
207                         {
208                             // Cancelling the exchange causes an exception as we close the connection,
209                             // but we don't report it as it is normal cancelling operation
210                             if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
211                                     exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
212                                     !exchange.isDone())
213                             {
214                                 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
215                                     exchange.getEventListener().onException(e);
216                             }
217                         }
218                         else
219                         {
220                             if (e instanceof IOException)
221                                 throw (IOException)e;
222                             if (e instanceof Error)
223                                 throw (Error)e;
224                             if (e instanceof RuntimeException)
225                                 throw (RuntimeException)e;
226                             throw new RuntimeException(e);
227                         }
228                     }
229                 }
230                 finally
231                 {
232                     LOG.debug("{} {}",_generator, _parser);
233                     LOG.debug("{}",_endp);
234 
235                     boolean complete = failed || _generator.isComplete() && _parser.isComplete();
236 
237                     if (complete)
238                     {
239                         boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
240                         _generator.setPersistent(persistent);
241                         reset();
242                         if (persistent)
243                             _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
244 
245                         synchronized (this)
246                         {
247                             exchange=_exchange;
248                             _exchange = null;
249 
250                             // Cancel the exchange
251                             if (exchange!=null)
252                             {
253                                 exchange.cancelTimeout(_destination.getHttpClient());
254 
255                                 // TODO should we check the exchange is done?
256                             }
257 
258                             // handle switched protocols
259                             if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
260                             {
261                                 Connection switched=exchange.onSwitchProtocol(_endp);
262                                 if (switched!=null)
263                                     connection=switched;
264                                 {
265                                     // switched protocol!
266                                     _pipeline = null;
267                                     if (_pipeline!=null)
268                                         _destination.send(_pipeline);
269                                     _pipeline = null;
270 
271                                     connection=switched;
272                                 }
273                             }
274 
275                             // handle pipelined requests
276                             if (_pipeline!=null)
277                             {
278                                 if (!persistent || connection!=this)
279                                     _destination.send(_pipeline);
280                                 else
281                                     _exchange=_pipeline;
282                                 _pipeline=null;
283                             }
284 
285                             if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
286                                 _destination.returnConnection(this, !persistent);
287                         }
288                     }
289                 }
290             }
291         }
292         finally
293         {
294             _parser.returnBuffers();
295             _generator.returnBuffers();
296         }
297 
298         return connection;
299     }
300 
301     @Override
send(HttpExchange ex)302     public boolean send(HttpExchange ex) throws IOException
303     {
304         boolean sent=super.send(ex);
305         if (sent)
306         {
307             synchronized (this)
308             {
309                 notifyAll();
310             }
311         }
312         return sent;
313     }
314 }
315