1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 20 package org.eclipse.jetty.util.thread; 21 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.Arrays; 25 import java.util.List; 26 import java.util.concurrent.ArrayBlockingQueue; 27 import java.util.concurrent.BlockingQueue; 28 import java.util.concurrent.ConcurrentLinkedQueue; 29 import java.util.concurrent.Executor; 30 import java.util.concurrent.RejectedExecutionException; 31 import java.util.concurrent.TimeUnit; 32 import java.util.concurrent.atomic.AtomicInteger; 33 import java.util.concurrent.atomic.AtomicLong; 34 35 import org.eclipse.jetty.util.BlockingArrayQueue; 36 import org.eclipse.jetty.util.component.AbstractLifeCycle; 37 import org.eclipse.jetty.util.component.AggregateLifeCycle; 38 import org.eclipse.jetty.util.component.Dumpable; 39 import org.eclipse.jetty.util.component.LifeCycle; 40 import org.eclipse.jetty.util.log.Log; 41 import org.eclipse.jetty.util.log.Logger; 42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; 43 44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable 45 { 46 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); 47 48 private final AtomicInteger _threadsStarted = new AtomicInteger(); 49 private final AtomicInteger _threadsIdle = new AtomicInteger(); 50 private final AtomicLong _lastShrink = new AtomicLong(); 51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); 52 private final Object _joinLock = new Object(); 53 private BlockingQueue<Runnable> _jobs; 54 private String _name; 55 private int _maxIdleTimeMs=60000; 56 private int _maxThreads=254; 57 private int _minThreads=8; 58 private int _maxQueued=-1; 59 private int _priority=Thread.NORM_PRIORITY; 60 private boolean _daemon=false; 61 private int _maxStopTime=100; 62 private boolean _detailedDump=false; 63 64 /* ------------------------------------------------------------------- */ 65 /** Construct 66 */ QueuedThreadPool()67 public QueuedThreadPool() 68 { 69 _name="qtp"+super.hashCode(); 70 } 71 72 /* ------------------------------------------------------------------- */ 73 /** Construct 74 */ QueuedThreadPool(int maxThreads)75 public QueuedThreadPool(int maxThreads) 76 { 77 this(); 78 setMaxThreads(maxThreads); 79 } 80 81 /* ------------------------------------------------------------------- */ 82 /** Construct 83 */ QueuedThreadPool(BlockingQueue<Runnable> jobQ)84 public QueuedThreadPool(BlockingQueue<Runnable> jobQ) 85 { 86 this(); 87 _jobs=jobQ; 88 _jobs.clear(); 89 } 90 91 92 /* ------------------------------------------------------------ */ 93 @Override doStart()94 protected void doStart() throws Exception 95 { 96 super.doStart(); 97 _threadsStarted.set(0); 98 99 if (_jobs==null) 100 { 101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) 102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); 103 } 104 105 int threads=_threadsStarted.get(); 106 while (isRunning() && threads<_minThreads) 107 { 108 startThread(threads); 109 threads=_threadsStarted.get(); 110 } 111 } 112 113 /* ------------------------------------------------------------ */ 114 @Override doStop()115 protected void doStop() throws Exception 116 { 117 super.doStop(); 118 long start=System.currentTimeMillis(); 119 120 // let jobs complete naturally for a while 121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) 122 Thread.sleep(1); 123 124 // kill queued jobs and flush out idle jobs 125 _jobs.clear(); 126 Runnable noop = new Runnable(){public void run(){}}; 127 for (int i=_threadsIdle.get();i-->0;) 128 _jobs.offer(noop); 129 Thread.yield(); 130 131 // interrupt remaining threads 132 if (_threadsStarted.get()>0) 133 for (Thread thread : _threads) 134 thread.interrupt(); 135 136 // wait for remaining threads to die 137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) 138 { 139 Thread.sleep(1); 140 } 141 Thread.yield(); 142 int size=_threads.size(); 143 if (size>0) 144 { 145 LOG.warn(size+" threads could not be stopped"); 146 147 if (size==1 || LOG.isDebugEnabled()) 148 { 149 for (Thread unstopped : _threads) 150 { 151 LOG.info("Couldn't stop "+unstopped); 152 for (StackTraceElement element : unstopped.getStackTrace()) 153 { 154 LOG.info(" at "+element); 155 } 156 } 157 } 158 } 159 160 synchronized (_joinLock) 161 { 162 _joinLock.notifyAll(); 163 } 164 } 165 166 /* ------------------------------------------------------------ */ 167 /** 168 * Delegated to the named or anonymous Pool. 169 */ setDaemon(boolean daemon)170 public void setDaemon(boolean daemon) 171 { 172 _daemon=daemon; 173 } 174 175 /* ------------------------------------------------------------ */ 176 /** Set the maximum thread idle time. 177 * Threads that are idle for longer than this period may be 178 * stopped. 179 * Delegated to the named or anonymous Pool. 180 * @see #getMaxIdleTimeMs 181 * @param maxIdleTimeMs Max idle time in ms. 182 */ setMaxIdleTimeMs(int maxIdleTimeMs)183 public void setMaxIdleTimeMs(int maxIdleTimeMs) 184 { 185 _maxIdleTimeMs=maxIdleTimeMs; 186 } 187 188 /* ------------------------------------------------------------ */ 189 /** 190 * @param stopTimeMs maximum total time that stop() will wait for threads to die. 191 */ setMaxStopTimeMs(int stopTimeMs)192 public void setMaxStopTimeMs(int stopTimeMs) 193 { 194 _maxStopTime = stopTimeMs; 195 } 196 197 /* ------------------------------------------------------------ */ 198 /** Set the maximum number of threads. 199 * Delegated to the named or anonymous Pool. 200 * @see #getMaxThreads 201 * @param maxThreads maximum number of threads. 202 */ setMaxThreads(int maxThreads)203 public void setMaxThreads(int maxThreads) 204 { 205 _maxThreads=maxThreads; 206 if (_minThreads>_maxThreads) 207 _minThreads=_maxThreads; 208 } 209 210 /* ------------------------------------------------------------ */ 211 /** Set the minimum number of threads. 212 * Delegated to the named or anonymous Pool. 213 * @see #getMinThreads 214 * @param minThreads minimum number of threads 215 */ setMinThreads(int minThreads)216 public void setMinThreads(int minThreads) 217 { 218 _minThreads=minThreads; 219 220 if (_minThreads>_maxThreads) 221 _maxThreads=_minThreads; 222 223 int threads=_threadsStarted.get(); 224 while (isStarted() && threads<_minThreads) 225 { 226 startThread(threads); 227 threads=_threadsStarted.get(); 228 } 229 } 230 231 /* ------------------------------------------------------------ */ 232 /** 233 * @param name Name of the BoundedThreadPool to use when naming Threads. 234 */ setName(String name)235 public void setName(String name) 236 { 237 if (isRunning()) 238 throw new IllegalStateException("started"); 239 _name= name; 240 } 241 242 /* ------------------------------------------------------------ */ 243 /** Set the priority of the pool threads. 244 * @param priority the new thread priority. 245 */ setThreadsPriority(int priority)246 public void setThreadsPriority(int priority) 247 { 248 _priority=priority; 249 } 250 251 /* ------------------------------------------------------------ */ 252 /** 253 * @return maximum queue size 254 */ getMaxQueued()255 public int getMaxQueued() 256 { 257 return _maxQueued; 258 } 259 260 /* ------------------------------------------------------------ */ 261 /** 262 * @param max job queue size 263 */ setMaxQueued(int max)264 public void setMaxQueued(int max) 265 { 266 if (isRunning()) 267 throw new IllegalStateException("started"); 268 _maxQueued=max; 269 } 270 271 /* ------------------------------------------------------------ */ 272 /** Get the maximum thread idle time. 273 * Delegated to the named or anonymous Pool. 274 * @see #setMaxIdleTimeMs 275 * @return Max idle time in ms. 276 */ getMaxIdleTimeMs()277 public int getMaxIdleTimeMs() 278 { 279 return _maxIdleTimeMs; 280 } 281 282 /* ------------------------------------------------------------ */ 283 /** 284 * @return maximum total time that stop() will wait for threads to die. 285 */ getMaxStopTimeMs()286 public int getMaxStopTimeMs() 287 { 288 return _maxStopTime; 289 } 290 291 /* ------------------------------------------------------------ */ 292 /** Set the maximum number of threads. 293 * Delegated to the named or anonymous Pool. 294 * @see #setMaxThreads 295 * @return maximum number of threads. 296 */ getMaxThreads()297 public int getMaxThreads() 298 { 299 return _maxThreads; 300 } 301 302 /* ------------------------------------------------------------ */ 303 /** Get the minimum number of threads. 304 * Delegated to the named or anonymous Pool. 305 * @see #setMinThreads 306 * @return minimum number of threads. 307 */ getMinThreads()308 public int getMinThreads() 309 { 310 return _minThreads; 311 } 312 313 /* ------------------------------------------------------------ */ 314 /** 315 * @return The name of the BoundedThreadPool. 316 */ getName()317 public String getName() 318 { 319 return _name; 320 } 321 322 /* ------------------------------------------------------------ */ 323 /** Get the priority of the pool threads. 324 * @return the priority of the pool threads. 325 */ getThreadsPriority()326 public int getThreadsPriority() 327 { 328 return _priority; 329 } 330 331 /* ------------------------------------------------------------ */ 332 /** 333 * Delegated to the named or anonymous Pool. 334 */ isDaemon()335 public boolean isDaemon() 336 { 337 return _daemon; 338 } 339 340 /* ------------------------------------------------------------ */ isDetailedDump()341 public boolean isDetailedDump() 342 { 343 return _detailedDump; 344 } 345 346 /* ------------------------------------------------------------ */ setDetailedDump(boolean detailedDump)347 public void setDetailedDump(boolean detailedDump) 348 { 349 _detailedDump = detailedDump; 350 } 351 352 /* ------------------------------------------------------------ */ dispatch(Runnable job)353 public boolean dispatch(Runnable job) 354 { 355 if (isRunning()) 356 { 357 final int jobQ = _jobs.size(); 358 final int idle = getIdleThreads(); 359 if(_jobs.offer(job)) 360 { 361 // If we had no idle threads or the jobQ is greater than the idle threads 362 if (idle==0 || jobQ>idle) 363 { 364 int threads=_threadsStarted.get(); 365 if (threads<_maxThreads) 366 startThread(threads); 367 } 368 return true; 369 } 370 } 371 LOG.debug("Dispatched {} to stopped {}",job,this); 372 return false; 373 } 374 375 /* ------------------------------------------------------------ */ execute(Runnable job)376 public void execute(Runnable job) 377 { 378 if (!dispatch(job)) 379 throw new RejectedExecutionException(); 380 } 381 382 /* ------------------------------------------------------------ */ 383 /** 384 * Blocks until the thread pool is {@link LifeCycle#stop stopped}. 385 */ join()386 public void join() throws InterruptedException 387 { 388 synchronized (_joinLock) 389 { 390 while (isRunning()) 391 _joinLock.wait(); 392 } 393 394 while (isStopping()) 395 Thread.sleep(1); 396 } 397 398 /* ------------------------------------------------------------ */ 399 /** 400 * @return The total number of threads currently in the pool 401 */ getThreads()402 public int getThreads() 403 { 404 return _threadsStarted.get(); 405 } 406 407 /* ------------------------------------------------------------ */ 408 /** 409 * @return The number of idle threads in the pool 410 */ getIdleThreads()411 public int getIdleThreads() 412 { 413 return _threadsIdle.get(); 414 } 415 416 /* ------------------------------------------------------------ */ 417 /** 418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs 419 */ isLowOnThreads()420 public boolean isLowOnThreads() 421 { 422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); 423 } 424 425 /* ------------------------------------------------------------ */ startThread(int threads)426 private boolean startThread(int threads) 427 { 428 final int next=threads+1; 429 if (!_threadsStarted.compareAndSet(threads,next)) 430 return false; 431 432 boolean started=false; 433 try 434 { 435 Thread thread=newThread(_runnable); 436 thread.setDaemon(_daemon); 437 thread.setPriority(_priority); 438 thread.setName(_name+"-"+thread.getId()); 439 _threads.add(thread); 440 441 thread.start(); 442 started=true; 443 } 444 finally 445 { 446 if (!started) 447 _threadsStarted.decrementAndGet(); 448 } 449 return started; 450 } 451 452 /* ------------------------------------------------------------ */ newThread(Runnable runnable)453 protected Thread newThread(Runnable runnable) 454 { 455 return new Thread(runnable); 456 } 457 458 459 /* ------------------------------------------------------------ */ dump()460 public String dump() 461 { 462 return AggregateLifeCycle.dump(this); 463 } 464 465 /* ------------------------------------------------------------ */ dump(Appendable out, String indent)466 public void dump(Appendable out, String indent) throws IOException 467 { 468 List<Object> dump = new ArrayList<Object>(getMaxThreads()); 469 for (final Thread thread: _threads) 470 { 471 final StackTraceElement[] trace=thread.getStackTrace(); 472 boolean inIdleJobPoll=false; 473 // trace can be null on early java 6 jvms 474 if (trace != null) 475 { 476 for (StackTraceElement t : trace) 477 { 478 if ("idleJobPoll".equals(t.getMethodName())) 479 { 480 inIdleJobPoll = true; 481 break; 482 } 483 } 484 } 485 final boolean idle=inIdleJobPoll; 486 487 if (_detailedDump) 488 { 489 dump.add(new Dumpable() 490 { 491 public void dump(Appendable out, String indent) throws IOException 492 { 493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); 494 if (!idle) 495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); 496 } 497 498 public String dump() 499 { 500 return null; 501 } 502 }); 503 } 504 else 505 { 506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); 507 } 508 } 509 510 AggregateLifeCycle.dumpObject(out,this); 511 AggregateLifeCycle.dump(out,indent,dump); 512 513 } 514 515 516 /* ------------------------------------------------------------ */ 517 @Override toString()518 public String toString() 519 { 520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; 521 } 522 523 /* ------------------------------------------------------------ */ idleJobPoll()524 private Runnable idleJobPoll() throws InterruptedException 525 { 526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); 527 } 528 529 /* ------------------------------------------------------------ */ 530 private Runnable _runnable = new Runnable() 531 { 532 public void run() 533 { 534 boolean shrink=false; 535 try 536 { 537 Runnable job=_jobs.poll(); 538 while (isRunning()) 539 { 540 // Job loop 541 while (job!=null && isRunning()) 542 { 543 runJob(job); 544 job=_jobs.poll(); 545 } 546 547 // Idle loop 548 try 549 { 550 _threadsIdle.incrementAndGet(); 551 552 while (isRunning() && job==null) 553 { 554 if (_maxIdleTimeMs<=0) 555 job=_jobs.take(); 556 else 557 { 558 // maybe we should shrink? 559 final int size=_threadsStarted.get(); 560 if (size>_minThreads) 561 { 562 long last=_lastShrink.get(); 563 long now=System.currentTimeMillis(); 564 if (last==0 || (now-last)>_maxIdleTimeMs) 565 { 566 shrink=_lastShrink.compareAndSet(last,now) && 567 _threadsStarted.compareAndSet(size,size-1); 568 if (shrink) 569 return; 570 } 571 } 572 job=idleJobPoll(); 573 } 574 } 575 } 576 finally 577 { 578 _threadsIdle.decrementAndGet(); 579 } 580 } 581 } 582 catch(InterruptedException e) 583 { 584 LOG.ignore(e); 585 } 586 catch(Exception e) 587 { 588 LOG.warn(e); 589 } 590 finally 591 { 592 if (!shrink) 593 _threadsStarted.decrementAndGet(); 594 _threads.remove(Thread.currentThread()); 595 } 596 } 597 }; 598 599 /* ------------------------------------------------------------ */ 600 /** 601 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> 602 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> 603 * 604 * @param job the job to run 605 */ runJob(Runnable job)606 protected void runJob(Runnable job) 607 { 608 job.run(); 609 } 610 611 /* ------------------------------------------------------------ */ 612 /** 613 * @return the job queue 614 */ getQueue()615 protected BlockingQueue<Runnable> getQueue() 616 { 617 return _jobs; 618 } 619 620 /* ------------------------------------------------------------ */ 621 /** 622 * @param id The thread ID to stop. 623 * @return true if the thread was found and stopped. 624 * @deprecated Use {@link #interruptThread(long)} in preference 625 */ 626 @Deprecated stopThread(long id)627 public boolean stopThread(long id) 628 { 629 for (Thread thread: _threads) 630 { 631 if (thread.getId()==id) 632 { 633 thread.stop(); 634 return true; 635 } 636 } 637 return false; 638 } 639 640 /* ------------------------------------------------------------ */ 641 /** 642 * @param id The thread ID to interrupt. 643 * @return true if the thread was found and interrupted. 644 */ interruptThread(long id)645 public boolean interruptThread(long id) 646 { 647 for (Thread thread: _threads) 648 { 649 if (thread.getId()==id) 650 { 651 thread.interrupt(); 652 return true; 653 } 654 } 655 return false; 656 } 657 658 /* ------------------------------------------------------------ */ 659 /** 660 * @param id The thread ID to interrupt. 661 * @return true if the thread was found and interrupted. 662 */ dumpThread(long id)663 public String dumpThread(long id) 664 { 665 for (Thread thread: _threads) 666 { 667 if (thread.getId()==id) 668 { 669 StringBuilder buf = new StringBuilder(); 670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); 671 for (StackTraceElement element : thread.getStackTrace()) 672 buf.append(" at ").append(element.toString()).append('\n'); 673 return buf.toString(); 674 } 675 } 676 return null; 677 } 678 } 679