1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.server.nio;
20 
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.channels.ByteChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.Set;
29 
30 import org.eclipse.jetty.http.HttpException;
31 import org.eclipse.jetty.io.Buffer;
32 import org.eclipse.jetty.io.ConnectedEndPoint;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.io.EofException;
36 import org.eclipse.jetty.io.nio.ChannelEndPoint;
37 import org.eclipse.jetty.server.BlockingHttpConnection;
38 import org.eclipse.jetty.server.Request;
39 import org.eclipse.jetty.util.ConcurrentHashSet;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 
43 
44 /* ------------------------------------------------------------------------------- */
45 /**  Blocking NIO connector.
46  * This connector uses efficient NIO buffers with a traditional blocking thread model.
47  * Direct NIO buffers are used and a thread is allocated per connections.
48  *
49  * This connector is best used when there are a few very active connections.
50  *
51  * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
52  *
53  *
54  *
55  */
56 public class BlockingChannelConnector extends AbstractNIOConnector
57 {
58     private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
59 
60     private transient ServerSocketChannel _acceptChannel;
61     private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
62 
63 
64     /* ------------------------------------------------------------ */
65     /** Constructor.
66      *
67      */
BlockingChannelConnector()68     public BlockingChannelConnector()
69     {
70     }
71 
72     /* ------------------------------------------------------------ */
getConnection()73     public Object getConnection()
74     {
75         return _acceptChannel;
76     }
77 
78     /* ------------------------------------------------------------ */
79     /**
80      * @see org.eclipse.jetty.server.AbstractConnector#doStart()
81      */
82     @Override
doStart()83     protected void doStart() throws Exception
84     {
85         super.doStart();
86         getThreadPool().dispatch(new Runnable()
87         {
88 
89             public void run()
90             {
91                 while (isRunning())
92                 {
93                     try
94                     {
95                         Thread.sleep(400);
96                         long now=System.currentTimeMillis();
97                         for (BlockingChannelEndPoint endp : _endpoints)
98                         {
99                             endp.checkIdleTimestamp(now);
100                         }
101                     }
102                     catch(InterruptedException e)
103                     {
104                         LOG.ignore(e);
105                     }
106                     catch(Exception e)
107                     {
108                         LOG.warn(e);
109                     }
110                 }
111             }
112 
113         });
114 
115     }
116 
117 
118     /* ------------------------------------------------------------ */
open()119     public void open() throws IOException
120     {
121         // Create a new server socket and set to non blocking mode
122         _acceptChannel= ServerSocketChannel.open();
123         _acceptChannel.configureBlocking(true);
124 
125         // Bind the server socket to the local host and port
126         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
127         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
128     }
129 
130     /* ------------------------------------------------------------ */
close()131     public void close() throws IOException
132     {
133         if (_acceptChannel != null)
134             _acceptChannel.close();
135         _acceptChannel=null;
136     }
137 
138     /* ------------------------------------------------------------ */
139     @Override
accept(int acceptorID)140     public void accept(int acceptorID)
141     	throws IOException, InterruptedException
142     {
143         SocketChannel channel = _acceptChannel.accept();
144         channel.configureBlocking(true);
145         Socket socket=channel.socket();
146         configure(socket);
147 
148         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
149         connection.dispatch();
150     }
151 
152     /* ------------------------------------------------------------------------------- */
153     @Override
customize(EndPoint endpoint, Request request)154     public void customize(EndPoint endpoint, Request request)
155         throws IOException
156     {
157         super.customize(endpoint, request);
158         endpoint.setMaxIdleTime(_maxIdleTime);
159         configure(((SocketChannel)endpoint.getTransport()).socket());
160     }
161 
162 
163     /* ------------------------------------------------------------------------------- */
getLocalPort()164     public int getLocalPort()
165     {
166         if (_acceptChannel==null || !_acceptChannel.isOpen())
167             return -1;
168         return _acceptChannel.socket().getLocalPort();
169     }
170 
171     /* ------------------------------------------------------------------------------- */
172     /* ------------------------------------------------------------------------------- */
173     /* ------------------------------------------------------------------------------- */
174     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
175     {
176         private Connection _connection;
177         private int _timeout;
178         private volatile long _idleTimestamp;
179 
BlockingChannelEndPoint(ByteChannel channel)180         BlockingChannelEndPoint(ByteChannel channel)
181             throws IOException
182         {
183             super(channel,BlockingChannelConnector.this._maxIdleTime);
184             _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
185         }
186 
187         /* ------------------------------------------------------------ */
188         /** Get the connection.
189          * @return the connection
190          */
getConnection()191         public Connection getConnection()
192         {
193             return _connection;
194         }
195 
196         /* ------------------------------------------------------------ */
setConnection(Connection connection)197         public void setConnection(Connection connection)
198         {
199             _connection=connection;
200         }
201 
202         /* ------------------------------------------------------------ */
checkIdleTimestamp(long now)203         public void checkIdleTimestamp(long now)
204         {
205             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
206             {
207                 idleExpired();
208             }
209         }
210 
211         /* ------------------------------------------------------------ */
idleExpired()212         protected void idleExpired()
213         {
214             try
215             {
216                 super.close();
217             }
218             catch (IOException e)
219             {
220                 LOG.ignore(e);
221             }
222         }
223 
224         /* ------------------------------------------------------------ */
dispatch()225         void dispatch() throws IOException
226         {
227             if (!getThreadPool().dispatch(this))
228             {
229                 LOG.warn("dispatch failed for  {}",_connection);
230                 super.close();
231             }
232         }
233 
234         /* ------------------------------------------------------------ */
235         /**
236          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
237          */
238         @Override
fill(Buffer buffer)239         public int fill(Buffer buffer) throws IOException
240         {
241             _idleTimestamp=System.currentTimeMillis();
242             return super.fill(buffer);
243         }
244 
245         /* ------------------------------------------------------------ */
246         /**
247          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
248          */
249         @Override
flush(Buffer buffer)250         public int flush(Buffer buffer) throws IOException
251         {
252             _idleTimestamp=System.currentTimeMillis();
253             return super.flush(buffer);
254         }
255 
256         /* ------------------------------------------------------------ */
257         /**
258          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
259          */
260         @Override
flush(Buffer header, Buffer buffer, Buffer trailer)261         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
262         {
263             _idleTimestamp=System.currentTimeMillis();
264             return super.flush(header,buffer,trailer);
265         }
266 
267         /* ------------------------------------------------------------ */
run()268         public void run()
269         {
270             try
271             {
272                 _timeout=getMaxIdleTime();
273                 connectionOpened(_connection);
274                 _endpoints.add(this);
275 
276                 while (isOpen())
277                 {
278                     _idleTimestamp=System.currentTimeMillis();
279                     if (_connection.isIdle())
280                     {
281                         if (getServer().getThreadPool().isLowOnThreads())
282                         {
283                             int lrmit = getLowResourcesMaxIdleTime();
284                             if (lrmit>=0 && _timeout!= lrmit)
285                             {
286                                 _timeout=lrmit;
287                             }
288                         }
289                     }
290                     else
291                     {
292                         if (_timeout!=getMaxIdleTime())
293                         {
294                             _timeout=getMaxIdleTime();
295                         }
296                     }
297 
298                     _connection = _connection.handle();
299 
300                 }
301             }
302             catch (EofException e)
303             {
304                 LOG.debug("EOF", e);
305                 try{BlockingChannelEndPoint.this.close();}
306                 catch(IOException e2){LOG.ignore(e2);}
307             }
308             catch (HttpException e)
309             {
310                 LOG.debug("BAD", e);
311                 try{super.close();}
312                 catch(IOException e2){LOG.ignore(e2);}
313             }
314             catch(Throwable e)
315             {
316                 LOG.warn("handle failed",e);
317                 try{super.close();}
318                 catch(IOException e2){LOG.ignore(e2);}
319             }
320             finally
321             {
322                 connectionClosed(_connection);
323                 _endpoints.remove(this);
324 
325                 // wait for client to close, but if not, close ourselves.
326                 try
327                 {
328                     if (!_socket.isClosed())
329                     {
330                         long timestamp=System.currentTimeMillis();
331                         int max_idle=getMaxIdleTime();
332 
333                         _socket.setSoTimeout(getMaxIdleTime());
334                         int c=0;
335                         do
336                         {
337                             c = _socket.getInputStream().read();
338                         }
339                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
340                         if (!_socket.isClosed())
341                             _socket.close();
342                     }
343                 }
344                 catch(IOException e)
345                 {
346                     LOG.ignore(e);
347                 }
348             }
349         }
350 
351         /* ------------------------------------------------------------ */
352         @Override
toString()353         public String toString()
354         {
355             return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
356                     hashCode(),
357                     _socket.getRemoteSocketAddress(),
358                     _socket.getLocalSocketAddress(),
359                     isOpen(),
360                     isInputShutdown(),
361                     isOutputShutdown(),
362                     _connection);
363         }
364 
365     }
366 }
367