1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.client; 20 21 import java.io.IOException; 22 import java.io.InterruptedIOException; 23 24 import org.eclipse.jetty.http.AbstractGenerator; 25 import org.eclipse.jetty.http.HttpStatus; 26 import org.eclipse.jetty.io.Buffer; 27 import org.eclipse.jetty.io.Buffers; 28 import org.eclipse.jetty.io.Connection; 29 import org.eclipse.jetty.io.EndPoint; 30 import org.eclipse.jetty.util.log.Log; 31 import org.eclipse.jetty.util.log.Logger; 32 33 34 /* ------------------------------------------------------------ */ 35 /** Blocking HTTP Connection 36 */ 37 public class BlockingHttpConnection extends AbstractHttpConnection 38 { 39 private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class); 40 41 private boolean _requestComplete; 42 private Buffer _requestContentChunk; 43 private boolean _expired=false; 44 BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)45 BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint) 46 { 47 super(requestBuffers, responseBuffers, endPoint); 48 } 49 reset()50 protected void reset() throws IOException 51 { 52 _requestComplete = false; 53 _expired = false; 54 super.reset(); 55 } 56 57 58 @Override exchangeExpired(HttpExchange exchange)59 protected void exchangeExpired(HttpExchange exchange) 60 { 61 synchronized (this) 62 { 63 super.exchangeExpired(exchange); 64 _expired = true; 65 this.notifyAll(); 66 } 67 } 68 69 70 71 @Override onIdleExpired(long idleForMs)72 public void onIdleExpired(long idleForMs) 73 { 74 try 75 { 76 LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); 77 _expired = true; 78 _endp.close(); 79 } 80 catch(IOException e) 81 { 82 LOG.ignore(e); 83 84 try 85 { 86 _endp.close(); 87 } 88 catch(IOException e2) 89 { 90 LOG.ignore(e2); 91 } 92 } 93 94 synchronized(this) 95 { 96 this.notifyAll(); 97 } 98 } 99 100 @Override handle()101 public Connection handle() throws IOException 102 { 103 Connection connection = this; 104 105 try 106 { 107 boolean failed = false; 108 109 110 // While we are making progress and have not changed connection 111 while (_endp.isOpen() && connection==this) 112 { 113 LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer()); 114 115 HttpExchange exchange; 116 synchronized (this) 117 { 118 exchange=_exchange; 119 while (exchange == null) 120 { 121 try 122 { 123 this.wait(); 124 exchange=_exchange; 125 if (_expired) 126 { 127 failed = true; 128 throw new InterruptedException(); 129 } 130 131 } 132 catch (InterruptedException e) 133 { 134 throw new InterruptedIOException(); 135 } 136 } 137 } 138 LOG.debug("exchange {}",exchange); 139 140 try 141 { 142 // Should we commit the request? 143 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT) 144 { 145 LOG.debug("commit"); 146 commitRequest(); 147 } 148 149 // Generate output 150 while (_generator.isCommitted() && !_generator.isComplete()) 151 { 152 if (_generator.flushBuffer()>0) 153 { 154 LOG.debug("flushed"); 155 } 156 157 // Is there more content to send or should we complete the generator 158 if (_generator.isState(AbstractGenerator.STATE_CONTENT)) 159 { 160 // Look for more content to send. 161 if (_requestContentChunk==null) 162 _requestContentChunk = exchange.getRequestContentChunk(null); 163 164 if (_requestContentChunk==null) 165 { 166 LOG.debug("complete"); 167 _generator.complete(); 168 } 169 else if (_generator.isEmpty()) 170 { 171 LOG.debug("addChunk"); 172 Buffer chunk=_requestContentChunk; 173 _requestContentChunk=exchange.getRequestContentChunk(null); 174 _generator.addContent(chunk,_requestContentChunk==null); 175 if (_requestContentChunk==null) 176 exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 177 } 178 } 179 } 180 181 // Signal request completion 182 if (_generator.isComplete() && !_requestComplete) 183 { 184 LOG.debug("requestComplete"); 185 _requestComplete = true; 186 exchange.getEventListener().onRequestComplete(); 187 } 188 189 // Read any input that is available 190 if (!_parser.isComplete() && _parser.parseAvailable()) 191 { 192 LOG.debug("parsed"); 193 } 194 195 // Flush output 196 _endp.flush(); 197 } 198 catch (Throwable e) 199 { 200 LOG.debug("Failure on " + _exchange, e); 201 202 failed = true; 203 204 synchronized (this) 205 { 206 if (exchange != null) 207 { 208 // Cancelling the exchange causes an exception as we close the connection, 209 // but we don't report it as it is normal cancelling operation 210 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING && 211 exchange.getStatus() != HttpExchange.STATUS_CANCELLED && 212 !exchange.isDone()) 213 { 214 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED)) 215 exchange.getEventListener().onException(e); 216 } 217 } 218 else 219 { 220 if (e instanceof IOException) 221 throw (IOException)e; 222 if (e instanceof Error) 223 throw (Error)e; 224 if (e instanceof RuntimeException) 225 throw (RuntimeException)e; 226 throw new RuntimeException(e); 227 } 228 } 229 } 230 finally 231 { 232 LOG.debug("{} {}",_generator, _parser); 233 LOG.debug("{}",_endp); 234 235 boolean complete = failed || _generator.isComplete() && _parser.isComplete(); 236 237 if (complete) 238 { 239 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent(); 240 _generator.setPersistent(persistent); 241 reset(); 242 if (persistent) 243 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout()); 244 245 synchronized (this) 246 { 247 exchange=_exchange; 248 _exchange = null; 249 250 // Cancel the exchange 251 if (exchange!=null) 252 { 253 exchange.cancelTimeout(_destination.getHttpClient()); 254 255 // TODO should we check the exchange is done? 256 } 257 258 // handle switched protocols 259 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101) 260 { 261 Connection switched=exchange.onSwitchProtocol(_endp); 262 if (switched!=null) 263 connection=switched; 264 { 265 // switched protocol! 266 _pipeline = null; 267 if (_pipeline!=null) 268 _destination.send(_pipeline); 269 _pipeline = null; 270 271 connection=switched; 272 } 273 } 274 275 // handle pipelined requests 276 if (_pipeline!=null) 277 { 278 if (!persistent || connection!=this) 279 _destination.send(_pipeline); 280 else 281 _exchange=_pipeline; 282 _pipeline=null; 283 } 284 285 if (_exchange==null && !isReserved()) // TODO how do we return switched connections? 286 _destination.returnConnection(this, !persistent); 287 } 288 } 289 } 290 } 291 } 292 finally 293 { 294 _parser.returnBuffers(); 295 _generator.returnBuffers(); 296 } 297 298 return connection; 299 } 300 301 @Override send(HttpExchange ex)302 public boolean send(HttpExchange ex) throws IOException 303 { 304 boolean sent=super.send(ex); 305 if (sent) 306 { 307 synchronized (this) 308 { 309 notifyAll(); 310 } 311 } 312 return sent; 313 } 314 } 315