1 package org.testng.internal;
2 
3 import org.testng.TestNGException;
4 import org.testng.collections.Lists;
5 
6 import java.util.List;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.ExecutorCompletionService;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.ThreadFactory;
14 
15 /**
16  * Simple wrapper for an ExecutorCompletionService.
17  */
18 public class PoolService<FutureType> {
19 
20   private ExecutorCompletionService<FutureType> m_completionService;
21   private ThreadFactory m_threadFactory;
22   private ExecutorService m_executor;
23 
PoolService(int threadPoolSize)24   public PoolService(int threadPoolSize) {
25     m_threadFactory = new ThreadFactory() {
26       private int m_threadIndex = 0;
27 
28       @Override
29       public Thread newThread(Runnable r) {
30         Thread result = new Thread(r);
31         result.setName("PoolService-" + m_threadIndex);
32         m_threadIndex++;
33         return result;
34       }
35     };
36     m_executor = Executors.newFixedThreadPool(threadPoolSize, m_threadFactory);
37     m_completionService = new ExecutorCompletionService<>(m_executor);
38   }
39 
submitTasksAndWait(List<? extends Callable<FutureType>> tasks)40   public List<FutureType> submitTasksAndWait(List<? extends Callable<FutureType>> tasks) {
41     List<FutureType> result = Lists.newArrayList();
42 
43     for (Callable<FutureType> callable : tasks) {
44       m_completionService.submit(callable);
45     }
46     for (int i = 0; i < tasks.size(); i++) {
47       try {
48         Future<FutureType> take = m_completionService.take();
49         result.add(take.get());
50       } catch (InterruptedException | ExecutionException e) {
51         throw new TestNGException(e);
52       }
53     }
54 
55     m_executor.shutdown();
56     return result;
57   }
58 }
59