1 /*
2  * workqueue.cpp, workqueue class
3  *
4  * Copyright (c) 2009-2010 Wind River Systems, Inc.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #include <workqueue.h>
20 #include <sys/prctl.h>
21 
WorkQueue()22 WorkQueue::WorkQueue()
23 {
24     stop = false;
25     executing = true;
26     wait_for_works = false;
27     works = NULL;
28 
29     pthread_mutex_init(&wlock, NULL);
30     pthread_cond_init(&wcond, NULL);
31 
32     pthread_mutex_init(&executing_lock, NULL);
33     pthread_cond_init(&executing_wait, NULL);
34     pthread_cond_init(&paused_wait, NULL);
35 }
36 
~WorkQueue()37 WorkQueue::~WorkQueue()
38 {
39     StopWork();
40 
41     pthread_cond_destroy(&wcond);
42     pthread_mutex_destroy(&wlock);
43 
44     pthread_cond_destroy(&paused_wait);
45     pthread_cond_destroy(&executing_wait);
46     pthread_mutex_destroy(&executing_lock);
47 }
48 
StartWork(bool executing)49 int WorkQueue::StartWork(bool executing)
50 {
51     this->executing = executing;
52 
53     pthread_mutex_lock(&wlock);
54     stop = false;
55     pthread_mutex_unlock(&wlock);
56     return Start();
57 }
58 
StopWork(void)59 void WorkQueue::StopWork(void)
60 {
61     /* discard all scheduled works */
62     pthread_mutex_lock(&wlock);
63     while (works)
64         works = __list_delete(works, works);
65     pthread_mutex_unlock(&wlock);
66 
67     /*  wakeup DoWork() if it's sleeping */
68     /*
69      * FIXME
70      *
71      * if DoWork() is sleeping, Work()'s called one more time at this moment.
72      * if DoWork()::wi->Work() called ScheduleWork() (self-rescheduling),
73      * this function would be sleeping forever at Join() because works list
74      * never be empty.
75      */
76     ResumeWork();
77 
78     pthread_mutex_lock(&wlock);
79     stop = true;
80     pthread_cond_signal(&wcond); /* wakeup Run() if it's sleeping */
81     pthread_mutex_unlock(&wlock);
82 
83     Join();
84 }
85 
86 /* it returns when Run() is sleeping at executing_wait or at wcond */
PauseWork(void)87 void WorkQueue::PauseWork(void)
88 {
89     pthread_mutex_lock(&executing_lock);
90     executing = false;
91     /* this prevents deadlock if Run() is sleeping with locking wcond */
92     if (!wait_for_works)
93         pthread_cond_wait(&paused_wait, &executing_lock); /* wokeup by Run() */
94     pthread_mutex_unlock(&executing_lock);
95 }
96 
ResumeWork(void)97 void WorkQueue::ResumeWork(void)
98 {
99     pthread_mutex_lock(&executing_lock);
100     executing = true;
101     pthread_cond_signal(&executing_wait);
102     pthread_mutex_unlock(&executing_lock);
103 }
104 
Run(void)105 void WorkQueue::Run(void)
106 {
107     prctl(PR_SET_NAME, (unsigned long)"IntelHwCodec", 0, 0, 0);
108 
109     while (true) {
110         pthread_mutex_lock(&wlock);
111         if (stop) {
112             pthread_mutex_unlock(&wlock);
113             break;
114         }
115 
116         if (!works) {
117             pthread_mutex_lock(&executing_lock);
118             wait_for_works = true;
119             /* wake up PauseWork() if it's sleeping */
120             pthread_cond_signal(&paused_wait);
121             pthread_mutex_unlock(&executing_lock);
122 
123             /*
124              * sleeps until works're available.
125              * wokeup by ScheduleWork() or FlushWork() or ~WorkQueue()
126              */
127             pthread_cond_wait(&wcond, &wlock);
128 
129             pthread_mutex_lock(&executing_lock);
130             wait_for_works = false;
131             pthread_mutex_unlock(&executing_lock);
132         }
133 
134         while (works) {
135             struct list *entry = works;
136             WorkableInterface *wi =
137                 static_cast<WorkableInterface *>(entry->data);
138 
139             works = __list_delete(works, entry);
140             pthread_mutex_unlock(&wlock);
141 
142             /*
143              * 1. if PauseWork() locks executing_lock right before Run() locks
144              *    the lock, Run() sends the paused signal and go to sleep.
145              * 2. if Run() locks executing_lock first, DoWork() is called and
146              *    PausedWork() waits for paused_wait signal. Run() sends the
147              *    signal during next loop processing or at the end of loop
148              *    in case of works're not available.
149              */
150             pthread_mutex_lock(&executing_lock);
151             if (!executing) {
152                 pthread_cond_signal(&paused_wait);
153                 pthread_cond_wait(&executing_wait, &executing_lock);
154             }
155             pthread_mutex_unlock(&executing_lock);
156 
157             DoWork(wi);
158 
159             pthread_mutex_lock(&wlock);
160         }
161 
162         pthread_mutex_unlock(&wlock);
163     }
164 }
165 
DoWork(WorkableInterface * wi)166 void WorkQueue::DoWork(WorkableInterface *wi)
167 {
168     if (wi)
169         wi->Work();
170 }
171 
Work(void)172 void WorkQueue::Work(void)
173 {
174     return;
175 }
176 
ScheduleWork(void)177 void WorkQueue::ScheduleWork(void)
178 {
179     pthread_mutex_lock(&wlock);
180     works = list_add_tail(works, static_cast<WorkableInterface *>(this));
181     pthread_cond_signal(&wcond); /* wakeup Run() if it's sleeping */
182     pthread_mutex_unlock(&wlock);
183 }
184 
ScheduleWork(WorkableInterface * wi)185 void WorkQueue::ScheduleWork(WorkableInterface *wi)
186 {
187     pthread_mutex_lock(&wlock);
188     if (wi)
189         works = list_add_tail(works, wi);
190     else
191         works = list_add_tail(works, static_cast<WorkableInterface *>(this));
192     pthread_cond_signal(&wcond); /* wakeup Run() if it's sleeping */
193     pthread_mutex_unlock(&wlock);
194 }
195 
CancelScheduledWork(WorkableInterface * wi)196 void WorkQueue::CancelScheduledWork(WorkableInterface *wi)
197 {
198     pthread_mutex_lock(&wlock);
199     works = list_delete_all(works, wi);
200     pthread_mutex_unlock(&wlock);
201 }
202 
FlushWork(void)203 void WorkQueue::FlushWork(void)
204 {
205     FlushBarrier fb;
206     bool needtowait = false;
207 
208     pthread_mutex_lock(&wlock);
209     if (works) {
210         list_add_tail(works, &fb);
211         pthread_cond_signal(&wcond); /* wakeup Run() if it's sleeping */
212 
213         needtowait = true;
214     }
215     pthread_mutex_unlock(&wlock);
216 
217     if (needtowait)
218         fb.WaitCompletion(); /* wokeup by FlushWork::Work() */
219 }
220 
FlushBarrier()221 WorkQueue::FlushBarrier::FlushBarrier()
222 {
223     pthread_mutex_init(&cplock, NULL);
224     pthread_cond_init(&complete, NULL);
225 }
226 
~FlushBarrier()227 WorkQueue::FlushBarrier::~FlushBarrier()
228 {
229     pthread_cond_destroy(&complete);
230     pthread_mutex_destroy(&cplock);
231 }
232 
WaitCompletion(void)233 void WorkQueue::FlushBarrier::WaitCompletion(void)
234 {
235     pthread_mutex_lock(&cplock);
236     pthread_cond_wait(&complete, &cplock);
237     pthread_mutex_unlock(&cplock);
238 }
239 
Work(void)240 void WorkQueue::FlushBarrier::Work(void)
241 {
242     pthread_mutex_lock(&cplock);
243     pthread_cond_signal(&complete); /* wakeup WaitCompletion() */
244     pthread_mutex_unlock(&cplock);
245 }
246