1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "WorkQueue"
19 
20 #include <utils/Log.h>
21 #include "WorkQueue.h"
22 
23 namespace android {
24 
25 // --- WorkQueue ---
26 
WorkQueue(size_t maxThreads,bool canCallJava)27 WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
28         mMaxThreads(maxThreads), mCanCallJava(canCallJava),
29         mCanceled(false), mFinished(false), mIdleThreads(0) {
30 }
31 
~WorkQueue()32 WorkQueue::~WorkQueue() {
33     if (!cancel()) {
34         finish();
35     }
36 }
37 
schedule(WorkUnit * workUnit,size_t backlog)38 status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
39     AutoMutex _l(mLock);
40 
41     if (mFinished || mCanceled) {
42         return INVALID_OPERATION;
43     }
44 
45     if (mWorkThreads.size() < mMaxThreads
46             && mIdleThreads < mWorkUnits.size() + 1) {
47         sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
48         status_t status = workThread->run("WorkQueue::WorkThread");
49         if (status) {
50             return status;
51         }
52         mWorkThreads.add(workThread);
53         mIdleThreads += 1;
54     } else if (backlog) {
55         while (mWorkUnits.size() >= mMaxThreads * backlog) {
56             mWorkDequeuedCondition.wait(mLock);
57             if (mFinished || mCanceled) {
58                 return INVALID_OPERATION;
59             }
60         }
61     }
62 
63     mWorkUnits.add(workUnit);
64     mWorkChangedCondition.broadcast();
65     return OK;
66 }
67 
cancel()68 status_t WorkQueue::cancel() {
69     AutoMutex _l(mLock);
70 
71     return cancelLocked();
72 }
73 
cancelLocked()74 status_t WorkQueue::cancelLocked() {
75     if (mFinished) {
76         return INVALID_OPERATION;
77     }
78 
79     if (!mCanceled) {
80         mCanceled = true;
81 
82         size_t count = mWorkUnits.size();
83         for (size_t i = 0; i < count; i++) {
84             delete mWorkUnits.itemAt(i);
85         }
86         mWorkUnits.clear();
87         mWorkChangedCondition.broadcast();
88         mWorkDequeuedCondition.broadcast();
89     }
90     return OK;
91 }
92 
finish()93 status_t WorkQueue::finish() {
94     { // acquire lock
95         AutoMutex _l(mLock);
96 
97         if (mFinished) {
98             return INVALID_OPERATION;
99         }
100 
101         mFinished = true;
102         mWorkChangedCondition.broadcast();
103     } // release lock
104 
105     // It is not possible for the list of work threads to change once the mFinished
106     // flag has been set, so we can access mWorkThreads outside of the lock here.
107     size_t count = mWorkThreads.size();
108     for (size_t i = 0; i < count; i++) {
109         mWorkThreads.itemAt(i)->join();
110     }
111     mWorkThreads.clear();
112     return OK;
113 }
114 
threadLoop()115 bool WorkQueue::threadLoop() {
116     WorkUnit* workUnit;
117     { // acquire lock
118         AutoMutex _l(mLock);
119 
120         for (;;) {
121             if (mCanceled) {
122                 return false;
123             }
124 
125             if (!mWorkUnits.isEmpty()) {
126                 workUnit = mWorkUnits.itemAt(0);
127                 mWorkUnits.removeAt(0);
128                 mIdleThreads -= 1;
129                 mWorkDequeuedCondition.broadcast();
130                 break;
131             }
132 
133             if (mFinished) {
134                 return false;
135             }
136 
137             mWorkChangedCondition.wait(mLock);
138         }
139     } // release lock
140 
141     bool shouldContinue = workUnit->run();
142     delete workUnit;
143 
144     { // acquire lock
145         AutoMutex _l(mLock);
146 
147         mIdleThreads += 1;
148 
149         if (!shouldContinue) {
150             cancelLocked();
151             return false;
152         }
153     } // release lock
154 
155     return true;
156 }
157 
158 // --- WorkQueue::WorkThread ---
159 
WorkThread(WorkQueue * workQueue,bool canCallJava)160 WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
161         Thread(canCallJava), mWorkQueue(workQueue) {
162 }
163 
~WorkThread()164 WorkQueue::WorkThread::~WorkThread() {
165 }
166 
threadLoop()167 bool WorkQueue::WorkThread::threadLoop() {
168     return mWorkQueue->threadLoop();
169 }
170 
171 };  // namespace android
172