1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 
20 package org.eclipse.jetty.util.thread;
21 
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 
35 import org.eclipse.jetty.util.BlockingArrayQueue;
36 import org.eclipse.jetty.util.component.AbstractLifeCycle;
37 import org.eclipse.jetty.util.component.AggregateLifeCycle;
38 import org.eclipse.jetty.util.component.Dumpable;
39 import org.eclipse.jetty.util.component.LifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
43 
44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
45 {
46     private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
47 
48     private final AtomicInteger _threadsStarted = new AtomicInteger();
49     private final AtomicInteger _threadsIdle = new AtomicInteger();
50     private final AtomicLong _lastShrink = new AtomicLong();
51     private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
52     private final Object _joinLock = new Object();
53     private BlockingQueue<Runnable> _jobs;
54     private String _name;
55     private int _maxIdleTimeMs=60000;
56     private int _maxThreads=254;
57     private int _minThreads=8;
58     private int _maxQueued=-1;
59     private int _priority=Thread.NORM_PRIORITY;
60     private boolean _daemon=false;
61     private int _maxStopTime=100;
62     private boolean _detailedDump=false;
63 
64     /* ------------------------------------------------------------------- */
65     /** Construct
66      */
QueuedThreadPool()67     public QueuedThreadPool()
68     {
69         _name="qtp"+super.hashCode();
70     }
71 
72     /* ------------------------------------------------------------------- */
73     /** Construct
74      */
QueuedThreadPool(int maxThreads)75     public QueuedThreadPool(int maxThreads)
76     {
77         this();
78         setMaxThreads(maxThreads);
79     }
80 
81     /* ------------------------------------------------------------------- */
82     /** Construct
83      */
QueuedThreadPool(BlockingQueue<Runnable> jobQ)84     public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
85     {
86         this();
87         _jobs=jobQ;
88         _jobs.clear();
89     }
90 
91 
92     /* ------------------------------------------------------------ */
93     @Override
doStart()94     protected void doStart() throws Exception
95     {
96         super.doStart();
97         _threadsStarted.set(0);
98 
99         if (_jobs==null)
100         {
101             _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
102                 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
103         }
104 
105         int threads=_threadsStarted.get();
106         while (isRunning() && threads<_minThreads)
107         {
108             startThread(threads);
109             threads=_threadsStarted.get();
110         }
111     }
112 
113     /* ------------------------------------------------------------ */
114     @Override
doStop()115     protected void doStop() throws Exception
116     {
117         super.doStop();
118         long start=System.currentTimeMillis();
119 
120         // let jobs complete naturally for a while
121         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
122             Thread.sleep(1);
123 
124         // kill queued jobs and flush out idle jobs
125         _jobs.clear();
126         Runnable noop = new Runnable(){public void run(){}};
127         for  (int i=_threadsIdle.get();i-->0;)
128             _jobs.offer(noop);
129         Thread.yield();
130 
131         // interrupt remaining threads
132         if (_threadsStarted.get()>0)
133             for (Thread thread : _threads)
134                 thread.interrupt();
135 
136         // wait for remaining threads to die
137         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
138         {
139             Thread.sleep(1);
140         }
141         Thread.yield();
142         int size=_threads.size();
143         if (size>0)
144         {
145             LOG.warn(size+" threads could not be stopped");
146 
147             if (size==1 || LOG.isDebugEnabled())
148             {
149                 for (Thread unstopped : _threads)
150                 {
151                     LOG.info("Couldn't stop "+unstopped);
152                     for (StackTraceElement element : unstopped.getStackTrace())
153                     {
154                         LOG.info(" at "+element);
155                     }
156                 }
157             }
158         }
159 
160         synchronized (_joinLock)
161         {
162             _joinLock.notifyAll();
163         }
164     }
165 
166     /* ------------------------------------------------------------ */
167     /**
168      * Delegated to the named or anonymous Pool.
169      */
setDaemon(boolean daemon)170     public void setDaemon(boolean daemon)
171     {
172         _daemon=daemon;
173     }
174 
175     /* ------------------------------------------------------------ */
176     /** Set the maximum thread idle time.
177      * Threads that are idle for longer than this period may be
178      * stopped.
179      * Delegated to the named or anonymous Pool.
180      * @see #getMaxIdleTimeMs
181      * @param maxIdleTimeMs Max idle time in ms.
182      */
setMaxIdleTimeMs(int maxIdleTimeMs)183     public void setMaxIdleTimeMs(int maxIdleTimeMs)
184     {
185         _maxIdleTimeMs=maxIdleTimeMs;
186     }
187 
188     /* ------------------------------------------------------------ */
189     /**
190      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
191      */
setMaxStopTimeMs(int stopTimeMs)192     public void setMaxStopTimeMs(int stopTimeMs)
193     {
194         _maxStopTime = stopTimeMs;
195     }
196 
197     /* ------------------------------------------------------------ */
198     /** Set the maximum number of threads.
199      * Delegated to the named or anonymous Pool.
200      * @see #getMaxThreads
201      * @param maxThreads maximum number of threads.
202      */
setMaxThreads(int maxThreads)203     public void setMaxThreads(int maxThreads)
204     {
205         _maxThreads=maxThreads;
206         if (_minThreads>_maxThreads)
207             _minThreads=_maxThreads;
208     }
209 
210     /* ------------------------------------------------------------ */
211     /** Set the minimum number of threads.
212      * Delegated to the named or anonymous Pool.
213      * @see #getMinThreads
214      * @param minThreads minimum number of threads
215      */
setMinThreads(int minThreads)216     public void setMinThreads(int minThreads)
217     {
218         _minThreads=minThreads;
219 
220         if (_minThreads>_maxThreads)
221             _maxThreads=_minThreads;
222 
223         int threads=_threadsStarted.get();
224         while (isStarted() && threads<_minThreads)
225         {
226             startThread(threads);
227             threads=_threadsStarted.get();
228         }
229     }
230 
231     /* ------------------------------------------------------------ */
232     /**
233      * @param name Name of the BoundedThreadPool to use when naming Threads.
234      */
setName(String name)235     public void setName(String name)
236     {
237         if (isRunning())
238             throw new IllegalStateException("started");
239         _name= name;
240     }
241 
242     /* ------------------------------------------------------------ */
243     /** Set the priority of the pool threads.
244      *  @param priority the new thread priority.
245      */
setThreadsPriority(int priority)246     public void setThreadsPriority(int priority)
247     {
248         _priority=priority;
249     }
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * @return maximum queue size
254      */
getMaxQueued()255     public int getMaxQueued()
256     {
257         return _maxQueued;
258     }
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * @param max job queue size
263      */
setMaxQueued(int max)264     public void setMaxQueued(int max)
265     {
266         if (isRunning())
267             throw new IllegalStateException("started");
268         _maxQueued=max;
269     }
270 
271     /* ------------------------------------------------------------ */
272     /** Get the maximum thread idle time.
273      * Delegated to the named or anonymous Pool.
274      * @see #setMaxIdleTimeMs
275      * @return Max idle time in ms.
276      */
getMaxIdleTimeMs()277     public int getMaxIdleTimeMs()
278     {
279         return _maxIdleTimeMs;
280     }
281 
282     /* ------------------------------------------------------------ */
283     /**
284      * @return maximum total time that stop() will wait for threads to die.
285      */
getMaxStopTimeMs()286     public int getMaxStopTimeMs()
287     {
288         return _maxStopTime;
289     }
290 
291     /* ------------------------------------------------------------ */
292     /** Set the maximum number of threads.
293      * Delegated to the named or anonymous Pool.
294      * @see #setMaxThreads
295      * @return maximum number of threads.
296      */
getMaxThreads()297     public int getMaxThreads()
298     {
299         return _maxThreads;
300     }
301 
302     /* ------------------------------------------------------------ */
303     /** Get the minimum number of threads.
304      * Delegated to the named or anonymous Pool.
305      * @see #setMinThreads
306      * @return minimum number of threads.
307      */
getMinThreads()308     public int getMinThreads()
309     {
310         return _minThreads;
311     }
312 
313     /* ------------------------------------------------------------ */
314     /**
315      * @return The name of the BoundedThreadPool.
316      */
getName()317     public String getName()
318     {
319         return _name;
320     }
321 
322     /* ------------------------------------------------------------ */
323     /** Get the priority of the pool threads.
324      *  @return the priority of the pool threads.
325      */
getThreadsPriority()326     public int getThreadsPriority()
327     {
328         return _priority;
329     }
330 
331     /* ------------------------------------------------------------ */
332     /**
333      * Delegated to the named or anonymous Pool.
334      */
isDaemon()335     public boolean isDaemon()
336     {
337         return _daemon;
338     }
339 
340     /* ------------------------------------------------------------ */
isDetailedDump()341     public boolean isDetailedDump()
342     {
343         return _detailedDump;
344     }
345 
346     /* ------------------------------------------------------------ */
setDetailedDump(boolean detailedDump)347     public void setDetailedDump(boolean detailedDump)
348     {
349         _detailedDump = detailedDump;
350     }
351 
352     /* ------------------------------------------------------------ */
dispatch(Runnable job)353     public boolean dispatch(Runnable job)
354     {
355         if (isRunning())
356         {
357             final int jobQ = _jobs.size();
358             final int idle = getIdleThreads();
359             if(_jobs.offer(job))
360             {
361                 // If we had no idle threads or the jobQ is greater than the idle threads
362                 if (idle==0 || jobQ>idle)
363                 {
364                     int threads=_threadsStarted.get();
365                     if (threads<_maxThreads)
366                         startThread(threads);
367                 }
368                 return true;
369             }
370         }
371         LOG.debug("Dispatched {} to stopped {}",job,this);
372         return false;
373     }
374 
375     /* ------------------------------------------------------------ */
execute(Runnable job)376     public void execute(Runnable job)
377     {
378         if (!dispatch(job))
379             throw new RejectedExecutionException();
380     }
381 
382     /* ------------------------------------------------------------ */
383     /**
384      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
385      */
join()386     public void join() throws InterruptedException
387     {
388         synchronized (_joinLock)
389         {
390             while (isRunning())
391                 _joinLock.wait();
392         }
393 
394         while (isStopping())
395             Thread.sleep(1);
396     }
397 
398     /* ------------------------------------------------------------ */
399     /**
400      * @return The total number of threads currently in the pool
401      */
getThreads()402     public int getThreads()
403     {
404         return _threadsStarted.get();
405     }
406 
407     /* ------------------------------------------------------------ */
408     /**
409      * @return The number of idle threads in the pool
410      */
getIdleThreads()411     public int getIdleThreads()
412     {
413         return _threadsIdle.get();
414     }
415 
416     /* ------------------------------------------------------------ */
417     /**
418      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
419      */
isLowOnThreads()420     public boolean isLowOnThreads()
421     {
422         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
423     }
424 
425     /* ------------------------------------------------------------ */
startThread(int threads)426     private boolean startThread(int threads)
427     {
428         final int next=threads+1;
429         if (!_threadsStarted.compareAndSet(threads,next))
430             return false;
431 
432         boolean started=false;
433         try
434         {
435             Thread thread=newThread(_runnable);
436             thread.setDaemon(_daemon);
437             thread.setPriority(_priority);
438             thread.setName(_name+"-"+thread.getId());
439             _threads.add(thread);
440 
441             thread.start();
442             started=true;
443         }
444         finally
445         {
446             if (!started)
447                 _threadsStarted.decrementAndGet();
448         }
449         return started;
450     }
451 
452     /* ------------------------------------------------------------ */
newThread(Runnable runnable)453     protected Thread newThread(Runnable runnable)
454     {
455         return new Thread(runnable);
456     }
457 
458 
459     /* ------------------------------------------------------------ */
dump()460     public String dump()
461     {
462         return AggregateLifeCycle.dump(this);
463     }
464 
465     /* ------------------------------------------------------------ */
dump(Appendable out, String indent)466     public void dump(Appendable out, String indent) throws IOException
467     {
468         List<Object> dump = new ArrayList<Object>(getMaxThreads());
469         for (final Thread thread: _threads)
470         {
471             final StackTraceElement[] trace=thread.getStackTrace();
472             boolean inIdleJobPoll=false;
473             // trace can be null on early java 6 jvms
474             if (trace != null)
475             {
476                 for (StackTraceElement t : trace)
477                 {
478                     if ("idleJobPoll".equals(t.getMethodName()))
479                     {
480                         inIdleJobPoll = true;
481                         break;
482                     }
483                 }
484             }
485             final boolean idle=inIdleJobPoll;
486 
487             if (_detailedDump)
488             {
489                 dump.add(new Dumpable()
490                 {
491                     public void dump(Appendable out, String indent) throws IOException
492                     {
493                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
494                         if (!idle)
495                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
496                     }
497 
498                     public String dump()
499                     {
500                         return null;
501                     }
502                 });
503             }
504             else
505             {
506                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
507             }
508         }
509 
510         AggregateLifeCycle.dumpObject(out,this);
511         AggregateLifeCycle.dump(out,indent,dump);
512 
513     }
514 
515 
516     /* ------------------------------------------------------------ */
517     @Override
toString()518     public String toString()
519     {
520         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
521     }
522 
523     /* ------------------------------------------------------------ */
idleJobPoll()524     private Runnable idleJobPoll() throws InterruptedException
525     {
526         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
527     }
528 
529     /* ------------------------------------------------------------ */
530     private Runnable _runnable = new Runnable()
531     {
532         public void run()
533         {
534             boolean shrink=false;
535             try
536             {
537                 Runnable job=_jobs.poll();
538                 while (isRunning())
539                 {
540                     // Job loop
541                     while (job!=null && isRunning())
542                     {
543                         runJob(job);
544                         job=_jobs.poll();
545                     }
546 
547                     // Idle loop
548                     try
549                     {
550                         _threadsIdle.incrementAndGet();
551 
552                         while (isRunning() && job==null)
553                         {
554                             if (_maxIdleTimeMs<=0)
555                                 job=_jobs.take();
556                             else
557                             {
558                                 // maybe we should shrink?
559                                 final int size=_threadsStarted.get();
560                                 if (size>_minThreads)
561                                 {
562                                     long last=_lastShrink.get();
563                                     long now=System.currentTimeMillis();
564                                     if (last==0 || (now-last)>_maxIdleTimeMs)
565                                     {
566                                         shrink=_lastShrink.compareAndSet(last,now) &&
567                                         _threadsStarted.compareAndSet(size,size-1);
568                                         if (shrink)
569                                             return;
570                                     }
571                                 }
572                                 job=idleJobPoll();
573                             }
574                         }
575                     }
576                     finally
577                     {
578                         _threadsIdle.decrementAndGet();
579                     }
580                 }
581             }
582             catch(InterruptedException e)
583             {
584                 LOG.ignore(e);
585             }
586             catch(Exception e)
587             {
588                 LOG.warn(e);
589             }
590             finally
591             {
592                 if (!shrink)
593                     _threadsStarted.decrementAndGet();
594                 _threads.remove(Thread.currentThread());
595             }
596         }
597     };
598 
599     /* ------------------------------------------------------------ */
600     /**
601      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
602      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
603      *
604      * @param job the job to run
605      */
runJob(Runnable job)606     protected void runJob(Runnable job)
607     {
608         job.run();
609     }
610 
611     /* ------------------------------------------------------------ */
612     /**
613      * @return the job queue
614      */
getQueue()615     protected BlockingQueue<Runnable> getQueue()
616     {
617         return _jobs;
618     }
619 
620     /* ------------------------------------------------------------ */
621     /**
622      * @param id The thread ID to stop.
623      * @return true if the thread was found and stopped.
624      * @deprecated Use {@link #interruptThread(long)} in preference
625      */
626     @Deprecated
stopThread(long id)627     public boolean stopThread(long id)
628     {
629         for (Thread thread: _threads)
630         {
631             if (thread.getId()==id)
632             {
633                 thread.stop();
634                 return true;
635             }
636         }
637         return false;
638     }
639 
640     /* ------------------------------------------------------------ */
641     /**
642      * @param id The thread ID to interrupt.
643      * @return true if the thread was found and interrupted.
644      */
interruptThread(long id)645     public boolean interruptThread(long id)
646     {
647         for (Thread thread: _threads)
648         {
649             if (thread.getId()==id)
650             {
651                 thread.interrupt();
652                 return true;
653             }
654         }
655         return false;
656     }
657 
658     /* ------------------------------------------------------------ */
659     /**
660      * @param id The thread ID to interrupt.
661      * @return true if the thread was found and interrupted.
662      */
dumpThread(long id)663     public String dumpThread(long id)
664     {
665         for (Thread thread: _threads)
666         {
667             if (thread.getId()==id)
668             {
669                 StringBuilder buf = new StringBuilder();
670                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
671                 for (StackTraceElement element : thread.getStackTrace())
672                     buf.append("  at ").append(element.toString()).append('\n');
673                 return buf.toString();
674             }
675         }
676         return null;
677     }
678 }
679