1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.gallery3d.util;
18 
19 import android.util.Log;
20 
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.ThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 
26 public class ThreadPool {
27     @SuppressWarnings("unused")
28     private static final String TAG = "ThreadPool";
29     private static final int CORE_POOL_SIZE = 4;
30     private static final int MAX_POOL_SIZE = 8;
31     private static final int KEEP_ALIVE_TIME = 10; // 10 seconds
32 
33     // Resource type
34     public static final int MODE_NONE = 0;
35     public static final int MODE_CPU = 1;
36     public static final int MODE_NETWORK = 2;
37 
38     public static final JobContext JOB_CONTEXT_STUB = new JobContextStub();
39 
40     ResourceCounter mCpuCounter = new ResourceCounter(2);
41     ResourceCounter mNetworkCounter = new ResourceCounter(2);
42 
43     // A Job is like a Callable, but it has an addition JobContext parameter.
44     public interface Job<T> {
run(JobContext jc)45         public T run(JobContext jc);
46     }
47 
48     public interface JobContext {
isCancelled()49         boolean isCancelled();
setCancelListener(CancelListener listener)50         void setCancelListener(CancelListener listener);
setMode(int mode)51         boolean setMode(int mode);
52     }
53 
54     private static class JobContextStub implements JobContext {
55         @Override
isCancelled()56         public boolean isCancelled() {
57             return false;
58         }
59 
60         @Override
setCancelListener(CancelListener listener)61         public void setCancelListener(CancelListener listener) {
62         }
63 
64         @Override
setMode(int mode)65         public boolean setMode(int mode) {
66             return true;
67         }
68     }
69 
70     public interface CancelListener {
onCancel()71         public void onCancel();
72     }
73 
74     private static class ResourceCounter {
75         public int value;
ResourceCounter(int v)76         public ResourceCounter(int v) {
77             value = v;
78         }
79     }
80 
81     private final Executor mExecutor;
82 
ThreadPool()83     public ThreadPool() {
84         this(CORE_POOL_SIZE, MAX_POOL_SIZE);
85     }
86 
ThreadPool(int initPoolSize, int maxPoolSize)87     public ThreadPool(int initPoolSize, int maxPoolSize) {
88         mExecutor = new ThreadPoolExecutor(
89                 initPoolSize, maxPoolSize, KEEP_ALIVE_TIME,
90                 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
91                 new PriorityThreadFactory("thread-pool",
92                 android.os.Process.THREAD_PRIORITY_BACKGROUND));
93     }
94 
95     // Submit a job to the thread pool. The listener will be called when the
96     // job is finished (or cancelled).
submit(Job<T> job, FutureListener<T> listener)97     public <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
98         Worker<T> w = new Worker<T>(job, listener);
99         mExecutor.execute(w);
100         return w;
101     }
102 
submit(Job<T> job)103     public <T> Future<T> submit(Job<T> job) {
104         return submit(job, null);
105     }
106 
107     private class Worker<T> implements Runnable, Future<T>, JobContext {
108         @SuppressWarnings("hiding")
109         private static final String TAG = "Worker";
110         private Job<T> mJob;
111         private FutureListener<T> mListener;
112         private CancelListener mCancelListener;
113         private ResourceCounter mWaitOnResource;
114         private volatile boolean mIsCancelled;
115         private boolean mIsDone;
116         private T mResult;
117         private int mMode;
118 
Worker(Job<T> job, FutureListener<T> listener)119         public Worker(Job<T> job, FutureListener<T> listener) {
120             mJob = job;
121             mListener = listener;
122         }
123 
124         // This is called by a thread in the thread pool.
125         @Override
run()126         public void run() {
127             T result = null;
128 
129             // A job is in CPU mode by default. setMode returns false
130             // if the job is cancelled.
131             if (setMode(MODE_CPU)) {
132                 try {
133                     result = mJob.run(this);
134                 } catch (Throwable ex) {
135                     Log.w(TAG, "Exception in running a job", ex);
136                 }
137             }
138 
139             synchronized(this) {
140                 setMode(MODE_NONE);
141                 mResult = result;
142                 mIsDone = true;
143                 notifyAll();
144             }
145             if (mListener != null) mListener.onFutureDone(this);
146         }
147 
148         // Below are the methods for Future.
149         @Override
cancel()150         public synchronized void cancel() {
151             if (mIsCancelled) return;
152             mIsCancelled = true;
153             if (mWaitOnResource != null) {
154                 synchronized (mWaitOnResource) {
155                     mWaitOnResource.notifyAll();
156                 }
157             }
158             if (mCancelListener != null) {
159                 mCancelListener.onCancel();
160             }
161         }
162 
163         @Override
isCancelled()164         public boolean isCancelled() {
165             return mIsCancelled;
166         }
167 
168         @Override
isDone()169         public synchronized boolean isDone() {
170             return mIsDone;
171         }
172 
173         @Override
get()174         public synchronized T get() {
175             while (!mIsDone) {
176                 try {
177                     wait();
178                 } catch (Exception ex) {
179                     Log.w(TAG, "ingore exception", ex);
180                     // ignore.
181                 }
182             }
183             return mResult;
184         }
185 
186         @Override
waitDone()187         public void waitDone() {
188             get();
189         }
190 
191         // Below are the methods for JobContext (only called from the
192         // thread running the job)
193         @Override
setCancelListener(CancelListener listener)194         public synchronized void setCancelListener(CancelListener listener) {
195             mCancelListener = listener;
196             if (mIsCancelled && mCancelListener != null) {
197                 mCancelListener.onCancel();
198             }
199         }
200 
201         @Override
setMode(int mode)202         public boolean setMode(int mode) {
203             // Release old resource
204             ResourceCounter rc = modeToCounter(mMode);
205             if (rc != null) releaseResource(rc);
206             mMode = MODE_NONE;
207 
208             // Acquire new resource
209             rc = modeToCounter(mode);
210             if (rc != null) {
211                 if (!acquireResource(rc)) {
212                     return false;
213                 }
214                 mMode = mode;
215             }
216 
217             return true;
218         }
219 
modeToCounter(int mode)220         private ResourceCounter modeToCounter(int mode) {
221             if (mode == MODE_CPU) {
222                 return mCpuCounter;
223             } else if (mode == MODE_NETWORK) {
224                 return mNetworkCounter;
225             } else {
226                 return null;
227             }
228         }
229 
acquireResource(ResourceCounter counter)230         private boolean acquireResource(ResourceCounter counter) {
231             while (true) {
232                 synchronized (this) {
233                     if (mIsCancelled) {
234                         mWaitOnResource = null;
235                         return false;
236                     }
237                     mWaitOnResource = counter;
238                 }
239 
240                 synchronized (counter) {
241                     if (counter.value > 0) {
242                         counter.value--;
243                         break;
244                     } else {
245                         try {
246                             counter.wait();
247                         } catch (InterruptedException ex) {
248                             // ignore.
249                         }
250                     }
251                 }
252             }
253 
254             synchronized (this) {
255                 mWaitOnResource = null;
256             }
257 
258             return true;
259         }
260 
releaseResource(ResourceCounter counter)261         private void releaseResource(ResourceCounter counter) {
262             synchronized (counter) {
263                 counter.value++;
264                 counter.notifyAll();
265             }
266         }
267     }
268 }
269