1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package vogar.tasks;
18 
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import vogar.Console;
27 import vogar.Result;
28 import vogar.util.Threads;
29 
30 /**
31  * A set of tasks to execute.
32  */
33 public final class TaskQueue {
34     private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks
35     private final Console console;
36     private int runningTasks;
37     private int runningActions;
38     private int maxConcurrentActions;
39     private final LinkedList<Task> tasks = new LinkedList<Task>();
40     private final LinkedList<Task> runnableActions = new LinkedList<Task>();
41     private final LinkedList<Task> runnableTasks = new LinkedList<Task>();
42     private final List<Task> failedTasks = new ArrayList<Task>();
43 
TaskQueue(Console console, int maxConcurrentActions)44     public TaskQueue(Console console, int maxConcurrentActions) {
45         this.console = console;
46         this.maxConcurrentActions = maxConcurrentActions;
47     }
48 
49     /**
50      * Adds a task to the queue.
51      */
enqueue(Task task)52     public synchronized void enqueue(Task task) {
53         tasks.add(task);
54     }
55 
enqueueAll(Collection<Task> tasks)56     public void enqueueAll(Collection<Task> tasks) {
57         this.tasks.addAll(tasks);
58     }
59 
getTasks()60     public synchronized List<Task> getTasks() {
61         return new ArrayList<Task>(tasks);
62     }
63 
runTasks()64     public void runTasks() {
65         promoteBlockedTasks();
66 
67         ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue");
68         for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
69             runners.execute(new Runnable() {
70                 @Override public void run() {
71                     while (runOneTask()) {
72                     }
73                 }
74             });
75         }
76 
77         runners.shutdown();
78         try {
79             runners.awaitTermination(FOREVER, TimeUnit.SECONDS);
80         } catch (InterruptedException e) {
81             throw new AssertionError();
82         }
83     }
84 
printTasks()85     public void printTasks() {
86         if (!console.isVerbose()) {
87             return;
88         }
89 
90         int i = 0;
91         for (Task task : tasks) {
92             StringBuilder message = new StringBuilder()
93                     .append("Task ").append(i++).append(": ").append(task);
94             for (Task blocker : task.tasksThatMustFinishFirst) {
95                 message.append("\n  depends on completed task: ").append(blocker);
96             }
97             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
98                 message.append("\n  depends on successful task: ").append(blocker);
99             }
100             console.verbose(message.toString());
101         }
102     }
103 
hasFailedTasks()104     public boolean hasFailedTasks() {
105         return !failedTasks.isEmpty();
106     }
107 
printProblemTasks()108     public void printProblemTasks() {
109         for (Task task : failedTasks) {
110             String message = "Failed task: " + task + " " + task.result;
111             if (task.thrown != null) {
112                 console.info(message, task.thrown);
113             } else {
114                 console.info(message);
115             }
116         }
117         if (!console.isVerbose()) {
118             return;
119         }
120         for (Task task : tasks) {
121             StringBuilder message = new StringBuilder()
122                     .append("Failed to execute task: ").append(task);
123             for (Task blocker : task.tasksThatMustFinishFirst) {
124                 if (blocker.result == null) {
125                     message.append("\n  blocked by unexecuted task: ").append(blocker);
126                 }
127             }
128             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
129                 if (blocker.result == null) {
130                     message.append("\n  blocked by unexecuted task: ").append(blocker);
131                 } else if (blocker.result != Result.SUCCESS) {
132                     message.append("\n  blocked by unsuccessful task: ").append(blocker);
133                 }
134             }
135             console.verbose(message.toString());
136         }
137     }
138 
runOneTask()139     private boolean runOneTask() {
140         Task task = takeTask();
141         if (task == null) {
142             return false;
143         }
144         String threadName = Thread.currentThread().getName();
145         Thread.currentThread().setName(task.toString());
146         try {
147             task.run(console);
148         } finally {
149             doneTask(task);
150             Thread.currentThread().setName(threadName);
151         }
152         return true;
153     }
154 
takeTask()155     private synchronized Task takeTask() {
156         while (true) {
157             Task task = null;
158             if (runningActions < maxConcurrentActions) {
159                 task = runnableActions.poll();
160             }
161             if (task == null) {
162                 task = runnableTasks.poll();
163             }
164 
165             if (task != null) {
166                 runningTasks++;
167                 if (task.isAction()) {
168                     runningActions++;
169                 }
170                 return task;
171             }
172 
173             if (isExhausted()) {
174                 return null;
175             }
176 
177             try {
178                 wait();
179             } catch (InterruptedException e) {
180                 throw new AssertionError();
181             }
182         }
183     }
184 
doneTask(Task task)185     private synchronized void doneTask(Task task) {
186         if (task.result != Result.SUCCESS) {
187             failedTasks.add(task);
188         }
189         runningTasks--;
190         if (task.isAction()) {
191             runningActions--;
192         }
193         promoteBlockedTasks();
194         if (isExhausted()) {
195             notifyAll();
196         }
197     }
198 
promoteBlockedTasks()199     private synchronized void promoteBlockedTasks() {
200         for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
201             Task potentiallyUnblocked = it.next();
202             if (potentiallyUnblocked.isRunnable()) {
203                 it.remove();
204                 if (potentiallyUnblocked.isAction()) {
205                     runnableActions.add(potentiallyUnblocked);
206                 } else {
207                     runnableTasks.add(potentiallyUnblocked);
208                 }
209                 notifyAll();
210             }
211         }
212     }
213 
214     /**
215      * Returns true if there are no tasks to run and no tasks currently running.
216      */
isExhausted()217     private boolean isExhausted() {
218         return runnableTasks.isEmpty() && runnableActions.isEmpty() && runningTasks == 0;
219     }
220 }
221