1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.client; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.util.Collections; 24 import java.util.concurrent.atomic.AtomicBoolean; 25 26 import org.eclipse.jetty.client.security.Authentication; 27 import org.eclipse.jetty.http.HttpFields; 28 import org.eclipse.jetty.http.HttpGenerator; 29 import org.eclipse.jetty.http.HttpHeaderValues; 30 import org.eclipse.jetty.http.HttpHeaders; 31 import org.eclipse.jetty.http.HttpMethods; 32 import org.eclipse.jetty.http.HttpParser; 33 import org.eclipse.jetty.http.HttpSchemes; 34 import org.eclipse.jetty.http.HttpStatus; 35 import org.eclipse.jetty.http.HttpVersions; 36 import org.eclipse.jetty.io.AbstractConnection; 37 import org.eclipse.jetty.io.Buffer; 38 import org.eclipse.jetty.io.Buffers; 39 import org.eclipse.jetty.io.Connection; 40 import org.eclipse.jetty.io.EndPoint; 41 import org.eclipse.jetty.io.EofException; 42 import org.eclipse.jetty.io.View; 43 import org.eclipse.jetty.util.component.AggregateLifeCycle; 44 import org.eclipse.jetty.util.component.Dumpable; 45 import org.eclipse.jetty.util.log.Log; 46 import org.eclipse.jetty.util.log.Logger; 47 import org.eclipse.jetty.util.thread.Timeout; 48 49 /** 50 * 51 * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $ 52 */ 53 public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable 54 { 55 private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class); 56 57 protected HttpDestination _destination; 58 protected HttpGenerator _generator; 59 protected HttpParser _parser; 60 protected boolean _http11 = true; 61 protected int _status; 62 protected Buffer _connectionHeader; 63 protected boolean _reserved; 64 65 // The current exchange waiting for a response 66 protected volatile HttpExchange _exchange; 67 protected HttpExchange _pipeline; 68 private final Timeout.Task _idleTimeout = new ConnectionIdleTask(); 69 private AtomicBoolean _idle = new AtomicBoolean(false); 70 71 AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)72 AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) 73 { 74 super(endp); 75 76 _generator = new HttpGenerator(requestBuffers,endp); 77 _parser = new HttpParser(responseBuffers,endp,new Handler()); 78 } 79 setReserved(boolean reserved)80 public void setReserved (boolean reserved) 81 { 82 _reserved = reserved; 83 } 84 isReserved()85 public boolean isReserved() 86 { 87 return _reserved; 88 } 89 getDestination()90 public HttpDestination getDestination() 91 { 92 return _destination; 93 } 94 setDestination(HttpDestination destination)95 public void setDestination(HttpDestination destination) 96 { 97 _destination = destination; 98 } 99 send(HttpExchange ex)100 public boolean send(HttpExchange ex) throws IOException 101 { 102 LOG.debug("Send {} on {}",ex,this); 103 synchronized (this) 104 { 105 if (_exchange != null) 106 { 107 if (_pipeline != null) 108 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange); 109 _pipeline = ex; 110 return true; 111 } 112 113 _exchange = ex; 114 _exchange.associate(this); 115 116 // The call to associate() may have closed the connection, check if it's the case 117 if (!_endp.isOpen()) 118 { 119 _exchange.disassociate(); 120 _exchange = null; 121 return false; 122 } 123 124 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT); 125 126 adjustIdleTimeout(); 127 128 return true; 129 } 130 } 131 adjustIdleTimeout()132 private void adjustIdleTimeout() throws IOException 133 { 134 // Adjusts the idle timeout in case the default or exchange timeout 135 // are greater. This is needed for long polls, where one wants an 136 // aggressive releasing of idle connections (so idle timeout is small) 137 // but still allow long polls to complete normally 138 139 long timeout = _exchange.getTimeout(); 140 if (timeout <= 0) 141 timeout = _destination.getHttpClient().getTimeout(); 142 143 long endPointTimeout = _endp.getMaxIdleTime(); 144 145 if (timeout > 0 && timeout > endPointTimeout) 146 { 147 // Make it larger than the exchange timeout so that there are 148 // no races between the idle timeout and the exchange timeout 149 // when trying to close the endpoint 150 _endp.setMaxIdleTime(2 * (int)timeout); 151 } 152 } 153 handle()154 public abstract Connection handle() throws IOException; 155 156 isIdle()157 public boolean isIdle() 158 { 159 synchronized (this) 160 { 161 return _exchange == null; 162 } 163 } 164 isSuspended()165 public boolean isSuspended() 166 { 167 return false; 168 } 169 onClose()170 public void onClose() 171 { 172 } 173 174 /** 175 * @throws IOException 176 */ commitRequest()177 protected void commitRequest() throws IOException 178 { 179 synchronized (this) 180 { 181 _status=0; 182 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT) 183 throw new IllegalStateException(); 184 185 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST); 186 _generator.setVersion(_exchange.getVersion()); 187 188 String method=_exchange.getMethod(); 189 String uri = _exchange.getRequestURI(); 190 if (_destination.isProxied()) 191 { 192 if (!HttpMethods.CONNECT.equals(method) && uri.startsWith("/")) 193 { 194 boolean secure = _destination.isSecure(); 195 String host = _destination.getAddress().getHost(); 196 int port = _destination.getAddress().getPort(); 197 StringBuilder absoluteURI = new StringBuilder(); 198 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP); 199 absoluteURI.append("://"); 200 absoluteURI.append(host); 201 // Avoid adding default ports 202 if (!(secure && port == 443 || !secure && port == 80)) 203 absoluteURI.append(":").append(port); 204 absoluteURI.append(uri); 205 uri = absoluteURI.toString(); 206 } 207 Authentication auth = _destination.getProxyAuthentication(); 208 if (auth != null) 209 auth.setCredentials(_exchange); 210 } 211 212 _generator.setRequest(method, uri); 213 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method)); 214 215 HttpFields requestHeaders = _exchange.getRequestFields(); 216 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL) 217 { 218 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER)) 219 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader()); 220 } 221 222 Buffer requestContent = _exchange.getRequestContent(); 223 if (requestContent != null) 224 { 225 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length()); 226 _generator.completeHeader(requestHeaders,false); 227 _generator.addContent(new View(requestContent),true); 228 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 229 } 230 else 231 { 232 InputStream requestContentStream = _exchange.getRequestContentSource(); 233 if (requestContentStream != null) 234 { 235 _generator.completeHeader(requestHeaders, false); 236 } 237 else 238 { 239 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH); 240 _generator.completeHeader(requestHeaders, true); 241 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 242 } 243 } 244 } 245 } 246 reset()247 protected void reset() throws IOException 248 { 249 _connectionHeader = null; 250 _parser.reset(); 251 _generator.reset(); 252 _http11 = true; 253 } 254 255 256 private class Handler extends HttpParser.EventHandler 257 { 258 @Override startRequest(Buffer method, Buffer url, Buffer version)259 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException 260 { 261 // System.out.println( method.toString() + "///" + url.toString() + 262 // "///" + version.toString() ); 263 // TODO validate this is acceptable, the <!DOCTYPE goop was coming 264 // out here 265 // throw new IllegalStateException(); 266 } 267 268 @Override startResponse(Buffer version, int status, Buffer reason)269 public void startResponse(Buffer version, int status, Buffer reason) throws IOException 270 { 271 HttpExchange exchange = _exchange; 272 if (exchange==null) 273 { 274 LOG.warn("No exchange for response"); 275 _endp.close(); 276 return; 277 } 278 279 switch(status) 280 { 281 case HttpStatus.CONTINUE_100: 282 case HttpStatus.PROCESSING_102: 283 // TODO check if appropriate expect was sent in the request. 284 exchange.setEventListener(new NonFinalResponseListener(exchange)); 285 break; 286 287 case HttpStatus.OK_200: 288 // handle special case for CONNECT 200 responses 289 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod())) 290 _parser.setHeadResponse(true); 291 break; 292 } 293 294 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version); 295 _status=status; 296 exchange.getEventListener().onResponseStatus(version,status,reason); 297 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS); 298 299 } 300 301 @Override parsedHeader(Buffer name, Buffer value)302 public void parsedHeader(Buffer name, Buffer value) throws IOException 303 { 304 HttpExchange exchange = _exchange; 305 if (exchange!=null) 306 { 307 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL) 308 { 309 _connectionHeader = HttpHeaderValues.CACHE.lookup(value); 310 } 311 exchange.getEventListener().onResponseHeader(name,value); 312 } 313 } 314 315 @Override headerComplete()316 public void headerComplete() throws IOException 317 { 318 HttpExchange exchange = _exchange; 319 if (exchange!=null) 320 { 321 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT); 322 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod())) 323 _parser.setPersistent(true); 324 } 325 } 326 327 @Override content(Buffer ref)328 public void content(Buffer ref) throws IOException 329 { 330 HttpExchange exchange = _exchange; 331 if (exchange!=null) 332 exchange.getEventListener().onResponseContent(ref); 333 } 334 335 @Override messageComplete(long contextLength)336 public void messageComplete(long contextLength) throws IOException 337 { 338 HttpExchange exchange = _exchange; 339 if (exchange!=null) 340 exchange.setStatus(HttpExchange.STATUS_COMPLETED); 341 } 342 343 @Override earlyEOF()344 public void earlyEOF() 345 { 346 HttpExchange exchange = _exchange; 347 if (exchange!=null) 348 { 349 if (!exchange.isDone()) 350 { 351 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED)) 352 exchange.getEventListener().onException(new EofException("early EOF")); 353 } 354 } 355 } 356 } 357 358 @Override toString()359 public String toString() 360 { 361 return String.format("%s %s g=%s p=%s", 362 super.toString(), 363 _destination == null ? "?.?.?.?:??" : _destination.getAddress(), 364 _generator, 365 _parser); 366 } 367 toDetailString()368 public String toDetailString() 369 { 370 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge(); 371 } 372 close()373 public void close() throws IOException 374 { 375 //if there is a live, unfinished exchange, set its status to be 376 //excepted and wake up anyone waiting on waitForDone() 377 378 HttpExchange exchange = _exchange; 379 if (exchange != null && !exchange.isDone()) 380 { 381 switch (exchange.getStatus()) 382 { 383 case HttpExchange.STATUS_CANCELLED: 384 case HttpExchange.STATUS_CANCELLING: 385 case HttpExchange.STATUS_COMPLETED: 386 case HttpExchange.STATUS_EXCEPTED: 387 case HttpExchange.STATUS_EXPIRED: 388 break; 389 case HttpExchange.STATUS_PARSING_CONTENT: 390 if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT)) 391 break; 392 default: 393 String exch= exchange.toString(); 394 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: "; 395 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED)) 396 exchange.getEventListener().onException(new EofException(reason+exch)); 397 } 398 } 399 400 if (_endp.isOpen()) 401 { 402 _endp.close(); 403 _destination.returnConnection(this, true); 404 } 405 } 406 setIdleTimeout()407 public void setIdleTimeout() 408 { 409 synchronized (this) 410 { 411 if (_idle.compareAndSet(false, true)) 412 _destination.getHttpClient().scheduleIdle(_idleTimeout); 413 else 414 throw new IllegalStateException(); 415 } 416 } 417 cancelIdleTimeout()418 public boolean cancelIdleTimeout() 419 { 420 synchronized (this) 421 { 422 if (_idle.compareAndSet(true, false)) 423 { 424 _destination.getHttpClient().cancel(_idleTimeout); 425 return true; 426 } 427 } 428 429 return false; 430 } 431 exchangeExpired(HttpExchange exchange)432 protected void exchangeExpired(HttpExchange exchange) 433 { 434 synchronized (this) 435 { 436 // We are expiring an exchange, but the exchange is pending 437 // Cannot reuse the connection because the reply may arrive, so close it 438 if (_exchange == exchange) 439 { 440 try 441 { 442 _destination.returnConnection(this, true); 443 } 444 catch (IOException x) 445 { 446 LOG.ignore(x); 447 } 448 } 449 } 450 } 451 452 /* ------------------------------------------------------------ */ 453 /** 454 * @see org.eclipse.jetty.util.component.Dumpable#dump() 455 */ dump()456 public String dump() 457 { 458 return AggregateLifeCycle.dump(this); 459 } 460 461 /* ------------------------------------------------------------ */ 462 /** 463 * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String) 464 */ dump(Appendable out, String indent)465 public void dump(Appendable out, String indent) throws IOException 466 { 467 synchronized (this) 468 { 469 out.append(String.valueOf(this)).append("\n"); 470 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp)); 471 } 472 } 473 474 /* ------------------------------------------------------------ */ 475 private class ConnectionIdleTask extends Timeout.Task 476 { 477 /* ------------------------------------------------------------ */ 478 @Override expired()479 public void expired() 480 { 481 // Connection idle, close it 482 if (_idle.compareAndSet(true, false)) 483 { 484 _destination.returnIdleConnection(AbstractHttpConnection.this); 485 } 486 } 487 } 488 489 490 /* ------------------------------------------------------------ */ 491 private class NonFinalResponseListener implements HttpEventListener 492 { 493 final HttpExchange _exchange; 494 final HttpEventListener _next; 495 496 /* ------------------------------------------------------------ */ NonFinalResponseListener(HttpExchange exchange)497 public NonFinalResponseListener(HttpExchange exchange) 498 { 499 _exchange=exchange; 500 _next=exchange.getEventListener(); 501 } 502 503 /* ------------------------------------------------------------ */ onRequestCommitted()504 public void onRequestCommitted() throws IOException 505 { 506 } 507 508 /* ------------------------------------------------------------ */ onRequestComplete()509 public void onRequestComplete() throws IOException 510 { 511 } 512 513 /* ------------------------------------------------------------ */ onResponseStatus(Buffer version, int status, Buffer reason)514 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException 515 { 516 } 517 518 /* ------------------------------------------------------------ */ onResponseHeader(Buffer name, Buffer value)519 public void onResponseHeader(Buffer name, Buffer value) throws IOException 520 { 521 _next.onResponseHeader(name,value); 522 } 523 524 /* ------------------------------------------------------------ */ onResponseHeaderComplete()525 public void onResponseHeaderComplete() throws IOException 526 { 527 _next.onResponseHeaderComplete(); 528 } 529 530 /* ------------------------------------------------------------ */ onResponseContent(Buffer content)531 public void onResponseContent(Buffer content) throws IOException 532 { 533 } 534 535 /* ------------------------------------------------------------ */ onResponseComplete()536 public void onResponseComplete() throws IOException 537 { 538 _exchange.setEventListener(_next); 539 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 540 _parser.reset(); 541 } 542 543 /* ------------------------------------------------------------ */ onConnectionFailed(Throwable ex)544 public void onConnectionFailed(Throwable ex) 545 { 546 _exchange.setEventListener(_next); 547 _next.onConnectionFailed(ex); 548 } 549 550 /* ------------------------------------------------------------ */ onException(Throwable ex)551 public void onException(Throwable ex) 552 { 553 _exchange.setEventListener(_next); 554 _next.onException(ex); 555 } 556 557 /* ------------------------------------------------------------ */ onExpire()558 public void onExpire() 559 { 560 _exchange.setEventListener(_next); 561 _next.onExpire(); 562 } 563 564 /* ------------------------------------------------------------ */ onRetry()565 public void onRetry() 566 { 567 _exchange.setEventListener(_next); 568 _next.onRetry(); 569 } 570 } 571 } 572