1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.server.nio; 20 21 import java.io.IOException; 22 import java.net.InetSocketAddress; 23 import java.net.Socket; 24 import java.nio.channels.ByteChannel; 25 import java.nio.channels.SelectionKey; 26 import java.nio.channels.ServerSocketChannel; 27 import java.nio.channels.SocketChannel; 28 import java.util.Set; 29 30 import org.eclipse.jetty.http.HttpException; 31 import org.eclipse.jetty.io.Buffer; 32 import org.eclipse.jetty.io.ConnectedEndPoint; 33 import org.eclipse.jetty.io.Connection; 34 import org.eclipse.jetty.io.EndPoint; 35 import org.eclipse.jetty.io.EofException; 36 import org.eclipse.jetty.io.nio.ChannelEndPoint; 37 import org.eclipse.jetty.server.BlockingHttpConnection; 38 import org.eclipse.jetty.server.Request; 39 import org.eclipse.jetty.util.ConcurrentHashSet; 40 import org.eclipse.jetty.util.log.Log; 41 import org.eclipse.jetty.util.log.Logger; 42 43 44 /* ------------------------------------------------------------------------------- */ 45 /** Blocking NIO connector. 46 * This connector uses efficient NIO buffers with a traditional blocking thread model. 47 * Direct NIO buffers are used and a thread is allocated per connections. 48 * 49 * This connector is best used when there are a few very active connections. 50 * 51 * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector" 52 * 53 * 54 * 55 */ 56 public class BlockingChannelConnector extends AbstractNIOConnector 57 { 58 private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class); 59 60 private transient ServerSocketChannel _acceptChannel; 61 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>(); 62 63 64 /* ------------------------------------------------------------ */ 65 /** Constructor. 66 * 67 */ BlockingChannelConnector()68 public BlockingChannelConnector() 69 { 70 } 71 72 /* ------------------------------------------------------------ */ getConnection()73 public Object getConnection() 74 { 75 return _acceptChannel; 76 } 77 78 /* ------------------------------------------------------------ */ 79 /** 80 * @see org.eclipse.jetty.server.AbstractConnector#doStart() 81 */ 82 @Override doStart()83 protected void doStart() throws Exception 84 { 85 super.doStart(); 86 getThreadPool().dispatch(new Runnable() 87 { 88 89 public void run() 90 { 91 while (isRunning()) 92 { 93 try 94 { 95 Thread.sleep(400); 96 long now=System.currentTimeMillis(); 97 for (BlockingChannelEndPoint endp : _endpoints) 98 { 99 endp.checkIdleTimestamp(now); 100 } 101 } 102 catch(InterruptedException e) 103 { 104 LOG.ignore(e); 105 } 106 catch(Exception e) 107 { 108 LOG.warn(e); 109 } 110 } 111 } 112 113 }); 114 115 } 116 117 118 /* ------------------------------------------------------------ */ open()119 public void open() throws IOException 120 { 121 // Create a new server socket and set to non blocking mode 122 _acceptChannel= ServerSocketChannel.open(); 123 _acceptChannel.configureBlocking(true); 124 125 // Bind the server socket to the local host and port 126 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); 127 _acceptChannel.socket().bind(addr,getAcceptQueueSize()); 128 } 129 130 /* ------------------------------------------------------------ */ close()131 public void close() throws IOException 132 { 133 if (_acceptChannel != null) 134 _acceptChannel.close(); 135 _acceptChannel=null; 136 } 137 138 /* ------------------------------------------------------------ */ 139 @Override accept(int acceptorID)140 public void accept(int acceptorID) 141 throws IOException, InterruptedException 142 { 143 SocketChannel channel = _acceptChannel.accept(); 144 channel.configureBlocking(true); 145 Socket socket=channel.socket(); 146 configure(socket); 147 148 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel); 149 connection.dispatch(); 150 } 151 152 /* ------------------------------------------------------------------------------- */ 153 @Override customize(EndPoint endpoint, Request request)154 public void customize(EndPoint endpoint, Request request) 155 throws IOException 156 { 157 super.customize(endpoint, request); 158 endpoint.setMaxIdleTime(_maxIdleTime); 159 configure(((SocketChannel)endpoint.getTransport()).socket()); 160 } 161 162 163 /* ------------------------------------------------------------------------------- */ getLocalPort()164 public int getLocalPort() 165 { 166 if (_acceptChannel==null || !_acceptChannel.isOpen()) 167 return -1; 168 return _acceptChannel.socket().getLocalPort(); 169 } 170 171 /* ------------------------------------------------------------------------------- */ 172 /* ------------------------------------------------------------------------------- */ 173 /* ------------------------------------------------------------------------------- */ 174 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint 175 { 176 private Connection _connection; 177 private int _timeout; 178 private volatile long _idleTimestamp; 179 BlockingChannelEndPoint(ByteChannel channel)180 BlockingChannelEndPoint(ByteChannel channel) 181 throws IOException 182 { 183 super(channel,BlockingChannelConnector.this._maxIdleTime); 184 _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer()); 185 } 186 187 /* ------------------------------------------------------------ */ 188 /** Get the connection. 189 * @return the connection 190 */ getConnection()191 public Connection getConnection() 192 { 193 return _connection; 194 } 195 196 /* ------------------------------------------------------------ */ setConnection(Connection connection)197 public void setConnection(Connection connection) 198 { 199 _connection=connection; 200 } 201 202 /* ------------------------------------------------------------ */ checkIdleTimestamp(long now)203 public void checkIdleTimestamp(long now) 204 { 205 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout)) 206 { 207 idleExpired(); 208 } 209 } 210 211 /* ------------------------------------------------------------ */ idleExpired()212 protected void idleExpired() 213 { 214 try 215 { 216 super.close(); 217 } 218 catch (IOException e) 219 { 220 LOG.ignore(e); 221 } 222 } 223 224 /* ------------------------------------------------------------ */ dispatch()225 void dispatch() throws IOException 226 { 227 if (!getThreadPool().dispatch(this)) 228 { 229 LOG.warn("dispatch failed for {}",_connection); 230 super.close(); 231 } 232 } 233 234 /* ------------------------------------------------------------ */ 235 /** 236 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer) 237 */ 238 @Override fill(Buffer buffer)239 public int fill(Buffer buffer) throws IOException 240 { 241 _idleTimestamp=System.currentTimeMillis(); 242 return super.fill(buffer); 243 } 244 245 /* ------------------------------------------------------------ */ 246 /** 247 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer) 248 */ 249 @Override flush(Buffer buffer)250 public int flush(Buffer buffer) throws IOException 251 { 252 _idleTimestamp=System.currentTimeMillis(); 253 return super.flush(buffer); 254 } 255 256 /* ------------------------------------------------------------ */ 257 /** 258 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer) 259 */ 260 @Override flush(Buffer header, Buffer buffer, Buffer trailer)261 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException 262 { 263 _idleTimestamp=System.currentTimeMillis(); 264 return super.flush(header,buffer,trailer); 265 } 266 267 /* ------------------------------------------------------------ */ run()268 public void run() 269 { 270 try 271 { 272 _timeout=getMaxIdleTime(); 273 connectionOpened(_connection); 274 _endpoints.add(this); 275 276 while (isOpen()) 277 { 278 _idleTimestamp=System.currentTimeMillis(); 279 if (_connection.isIdle()) 280 { 281 if (getServer().getThreadPool().isLowOnThreads()) 282 { 283 int lrmit = getLowResourcesMaxIdleTime(); 284 if (lrmit>=0 && _timeout!= lrmit) 285 { 286 _timeout=lrmit; 287 } 288 } 289 } 290 else 291 { 292 if (_timeout!=getMaxIdleTime()) 293 { 294 _timeout=getMaxIdleTime(); 295 } 296 } 297 298 _connection = _connection.handle(); 299 300 } 301 } 302 catch (EofException e) 303 { 304 LOG.debug("EOF", e); 305 try{BlockingChannelEndPoint.this.close();} 306 catch(IOException e2){LOG.ignore(e2);} 307 } 308 catch (HttpException e) 309 { 310 LOG.debug("BAD", e); 311 try{super.close();} 312 catch(IOException e2){LOG.ignore(e2);} 313 } 314 catch(Throwable e) 315 { 316 LOG.warn("handle failed",e); 317 try{super.close();} 318 catch(IOException e2){LOG.ignore(e2);} 319 } 320 finally 321 { 322 connectionClosed(_connection); 323 _endpoints.remove(this); 324 325 // wait for client to close, but if not, close ourselves. 326 try 327 { 328 if (!_socket.isClosed()) 329 { 330 long timestamp=System.currentTimeMillis(); 331 int max_idle=getMaxIdleTime(); 332 333 _socket.setSoTimeout(getMaxIdleTime()); 334 int c=0; 335 do 336 { 337 c = _socket.getInputStream().read(); 338 } 339 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); 340 if (!_socket.isClosed()) 341 _socket.close(); 342 } 343 } 344 catch(IOException e) 345 { 346 LOG.ignore(e); 347 } 348 } 349 } 350 351 /* ------------------------------------------------------------ */ 352 @Override toString()353 public String toString() 354 { 355 return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}", 356 hashCode(), 357 _socket.getRemoteSocketAddress(), 358 _socket.getLocalSocketAddress(), 359 isOpen(), 360 isInputShutdown(), 361 isOutputShutdown(), 362 _connection); 363 } 364 365 } 366 } 367