1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package vogar.tasks;
18 
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import vogar.Console;
27 import vogar.Result;
28 import vogar.util.Threads;
29 
30 /**
31  * A set of tasks to execute.
32  */
33 public final class TaskQueue {
34     private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks
35     private final Console console;
36     private int runningTasks;
37     private int runningActions;
38     private int maxConcurrentActions;
39     private final LinkedList<Task> tasks = new LinkedList<Task>();
40     private final LinkedList<Task> runnableActions = new LinkedList<Task>();
41     private final LinkedList<Task> runnableTasks = new LinkedList<Task>();
42     private final List<Task> failedTasks = new ArrayList<Task>();
43 
TaskQueue(Console console, int maxConcurrentActions)44     public TaskQueue(Console console, int maxConcurrentActions) {
45         this.console = console;
46         this.maxConcurrentActions = maxConcurrentActions;
47     }
48 
49     /**
50      * Adds a task to the queue.
51      */
enqueue(Task task)52     public synchronized void enqueue(Task task) {
53         tasks.add(task);
54     }
55 
enqueueAll(Collection<Task> tasks)56     public void enqueueAll(Collection<Task> tasks) {
57         this.tasks.addAll(tasks);
58     }
59 
getTasks()60     public synchronized List<Task> getTasks() {
61         return new ArrayList<Task>(tasks);
62     }
63 
runTasks()64     public void runTasks() {
65         promoteBlockedTasks();
66 
67         ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue");
68         for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
69             runners.execute(new Runnable() {
70                 @Override public void run() {
71                     while (runOneTask()) {
72                     }
73                 }
74             });
75         }
76 
77         runners.shutdown();
78         try {
79             runners.awaitTermination(FOREVER, TimeUnit.SECONDS);
80         } catch (InterruptedException e) {
81             throw new AssertionError();
82         }
83     }
84 
printTasks()85     public void printTasks() {
86         if (!console.isVerbose()) {
87             return;
88         }
89 
90         int i = 0;
91         for (Task task : tasks) {
92             StringBuilder message = new StringBuilder()
93                     .append("Task ").append(i++).append(": ").append(task);
94             for (Task blocker : task.tasksThatMustFinishFirst) {
95                 message.append("\n  depends on completed task: ").append(blocker);
96             }
97             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
98                 message.append("\n  depends on successful task: ").append(blocker);
99             }
100             console.verbose(message.toString());
101         }
102     }
103 
printProblemTasks()104     public void printProblemTasks() {
105         for (Task task : failedTasks) {
106             String message = "Failed task: " + task + " " + task.result;
107             if (task.thrown != null) {
108                 console.info(message, task.thrown);
109             } else {
110                 console.info(message);
111             }
112         }
113         if (!console.isVerbose()) {
114             return;
115         }
116         for (Task task : tasks) {
117             StringBuilder message = new StringBuilder()
118                     .append("Failed to execute task: ").append(task);
119             for (Task blocker : task.tasksThatMustFinishFirst) {
120                 if (blocker.result == null) {
121                     message.append("\n  blocked by unexecuted task: ").append(blocker);
122                 }
123             }
124             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
125                 if (blocker.result == null) {
126                     message.append("\n  blocked by unexecuted task: ").append(blocker);
127                 } else if (blocker.result != Result.SUCCESS) {
128                     message.append("\n  blocked by unsuccessful task: ").append(blocker);
129                 }
130             }
131             console.verbose(message.toString());
132         }
133     }
134 
runOneTask()135     private boolean runOneTask() {
136         Task task = takeTask();
137         if (task == null) {
138             return false;
139         }
140         String threadName = Thread.currentThread().getName();
141         Thread.currentThread().setName(task.toString());
142         try {
143             task.run(console);
144         } finally {
145             doneTask(task);
146             Thread.currentThread().setName(threadName);
147         }
148         return true;
149     }
150 
takeTask()151     private synchronized Task takeTask() {
152         while (true) {
153             Task task = null;
154             if (runningActions < maxConcurrentActions) {
155                 task = runnableActions.poll();
156             }
157             if (task == null) {
158                 task = runnableTasks.poll();
159             }
160 
161             if (task != null) {
162                 runningTasks++;
163                 if (task.isAction()) {
164                     runningActions++;
165                 }
166                 return task;
167             }
168 
169             if (isExhausted()) {
170                 return null;
171             }
172 
173             try {
174                 wait();
175             } catch (InterruptedException e) {
176                 throw new AssertionError();
177             }
178         }
179     }
180 
doneTask(Task task)181     private synchronized void doneTask(Task task) {
182         if (task.result != Result.SUCCESS) {
183             failedTasks.add(task);
184         }
185         runningTasks--;
186         if (task.isAction()) {
187             runningActions--;
188         }
189         promoteBlockedTasks();
190         if (isExhausted()) {
191             notifyAll();
192         }
193     }
194 
promoteBlockedTasks()195     private synchronized void promoteBlockedTasks() {
196         for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
197             Task potentiallyUnblocked = it.next();
198             if (potentiallyUnblocked.isRunnable()) {
199                 it.remove();
200                 if (potentiallyUnblocked.isAction()) {
201                     runnableActions.add(potentiallyUnblocked);
202                 } else {
203                     runnableTasks.add(potentiallyUnblocked);
204                 }
205                 notifyAll();
206             }
207         }
208     }
209 
210     /**
211      * Returns true if there are no tasks to run and no tasks currently running.
212      */
isExhausted()213     private boolean isExhausted() {
214         return runnableTasks.isEmpty() && runnableActions.isEmpty() && runningTasks == 0;
215     }
216 }
217