1 /*
2  * thread_pool.cpp - Thread Pool
3  *
4  *  Copyright (c) 2017 Intel Corporation
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * Author: Wind Yuan <feng.yuan@intel.com>
19  */
20 
#include "thread_pool.h"

#include <stdlib.h>
22 
/* Default worker count of a new pool and the floor set_threads() applies to `min`. */
#define XCAM_POOL_MIN_THREADS 2
/* Ceiling set_threads() applies to `max`. */
#define XCAM_POOL_MAX_THREADS 1024
25 
26 namespace XCam {
27 
28 class UserThread
29     : public Thread
30 {
31 public:
UserThread(const SmartPtr<ThreadPool> & pool,const char * name)32     UserThread (const SmartPtr<ThreadPool> &pool, const char *name)
33         : Thread (name)
34         , _pool (pool)
35     {}
36 
37 protected:
38     virtual bool started ();
39     virtual void stopped ();
40     virtual bool loop ();
41 
42 private:
43     SmartPtr<ThreadPool> _pool;
44 };
45 
46 bool
started()47 UserThread::started ()
48 {
49     XCAM_ASSERT (_pool.ptr ());
50     SmartLock lock (_pool->_mutex);
51     return true;
52 }
53 
54 void
stopped()55 UserThread::stopped ()
56 {
57     XCAM_LOG_DEBUG ("thread(%s, %p) stopped", XCAM_STR(get_name ()), this);
58 }
59 
60 bool
loop()61 UserThread::loop ()
62 {
63     XCAM_ASSERT (_pool.ptr ());
64     {
65         SmartLock lock (_pool->_mutex);
66         if (!_pool->_running)
67             return false;
68     }
69 
70     SmartPtr<ThreadPool::UserData> data = _pool->_data_queue.pop ();
71     if (!data.ptr ()) {
72         XCAM_LOG_DEBUG ("user thread(%s) get null data, need stop", XCAM_STR (_pool->get_name ()));
73         return false;
74     }
75 
76     {
77         SmartLock lock (_pool->_mutex);
78         XCAM_ASSERT (_pool->_free_threads > 0);
79         --_pool->_free_threads;
80     }
81 
82     bool ret = _pool->dispatch (data);
83 
84     if (ret) {
85         SmartLock lock (_pool->_mutex);
86         ++_pool->_free_threads;
87     }
88     return ret;
89 }
90 
91 bool
dispatch(const SmartPtr<ThreadPool::UserData> & data)92 ThreadPool::dispatch (const SmartPtr<ThreadPool::UserData> &data)
93 {
94     XCAM_FAIL_RETURN (
95         ERROR, data.ptr(), true,
96         "ThreadPool(%s) dispatch NULL data", XCAM_STR (get_name ()));
97     XCamReturn err = data->run ();
98     data->done (err);
99     return true;
100 }
101 
ThreadPool(const char * name)102 ThreadPool::ThreadPool (const char *name)
103     : _name (NULL)
104     , _min_threads (XCAM_POOL_MIN_THREADS)
105     , _max_threads (XCAM_POOL_MIN_THREADS)
106     , _allocated_threads (0)
107     , _free_threads (0)
108     , _running (false)
109 {
110     if (name)
111         _name = strndup (name, XCAM_MAX_STR_SIZE);
112 }
113 
~ThreadPool()114 ThreadPool::~ThreadPool ()
115 {
116     stop ();
117 
118     xcam_mem_clear (_name);
119 }
120 
121 bool
set_threads(uint32_t min,uint32_t max)122 ThreadPool::set_threads (uint32_t min, uint32_t max)
123 {
124     XCAM_FAIL_RETURN (
125         ERROR, !_running, false,
126         "ThreadPool(%s) set threads failed, need stop the pool first", XCAM_STR(get_name ()));
127 
128     if (min < XCAM_POOL_MIN_THREADS)
129         min = XCAM_POOL_MIN_THREADS;
130     if (max > XCAM_POOL_MAX_THREADS)
131         max = XCAM_POOL_MAX_THREADS;
132 
133     if (min > max)
134         min = max;
135 
136     _min_threads = min;
137     _max_threads = max;
138     return true;
139 }
140 
141 bool
is_running()142 ThreadPool::is_running ()
143 {
144     SmartLock locker(_mutex);
145     return _running;
146 }
147 
148 XCamReturn
start()149 ThreadPool::start ()
150 {
151     SmartLock locker(_mutex);
152     if (_running)
153         return XCAM_RETURN_NO_ERROR;
154 
155     _free_threads = 0;
156     _allocated_threads = 0;
157     _data_queue.resume_pop ();
158 
159     for (uint32_t i = 0; i < _min_threads; ++i) {
160         XCamReturn ret = create_user_thread_unsafe ();
161         XCAM_FAIL_RETURN (
162             ERROR, xcam_ret_is_ok (ret), ret,
163             "thread pool(%s) start failed by creating user thread", XCAM_STR (get_name()));
164     }
165 
166     XCAM_ASSERT (_allocated_threads == _min_threads);
167 
168     _running = true;
169     return XCAM_RETURN_NO_ERROR;
170 }
171 
172 XCamReturn
stop()173 ThreadPool::stop ()
174 {
175     UserThreadList threads;
176     {
177         SmartLock locker(_mutex);
178         if (!_running)
179             return XCAM_RETURN_NO_ERROR;
180 
181         _running = false;
182         threads = _thread_list;
183         _thread_list.clear ();
184     }
185 
186     for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
187     {
188         SmartPtr<UserThread> t = *i;
189         XCAM_ASSERT (t.ptr ());
190         t->emit_stop ();
191     }
192 
193     _data_queue.pause_pop ();
194     _data_queue.clear ();
195 
196     for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
197     {
198         SmartPtr<UserThread> t = *i;
199         XCAM_ASSERT (t.ptr ());
200         t->stop ();
201     }
202 
203     {
204         SmartLock locker(_mutex);
205         _free_threads = 0;
206         _allocated_threads = 0;
207     }
208 
209     return XCAM_RETURN_NO_ERROR;
210 }
211 
212 XCamReturn
create_user_thread_unsafe()213 ThreadPool::create_user_thread_unsafe ()
214 {
215     char name[256];
216     snprintf (name, 255, "%s-%d", XCAM_STR (get_name()), _allocated_threads);
217     SmartPtr<UserThread> thread = new UserThread (this, name);
218     XCAM_ASSERT (thread.ptr ());
219     XCAM_FAIL_RETURN (
220         ERROR, thread.ptr () && thread->start (), XCAM_RETURN_ERROR_THREAD,
221         "ThreadPool(%s) create user thread failed by starting error", XCAM_STR (get_name()));
222 
223     _thread_list.push_back (thread);
224 
225     ++_allocated_threads;
226     ++_free_threads;
227     XCAM_ASSERT (_free_threads <= _allocated_threads);
228 
229     return XCAM_RETURN_NO_ERROR;
230 }
231 
232 XCamReturn
queue(const SmartPtr<UserData> & data)233 ThreadPool::queue (const SmartPtr<UserData> &data)
234 {
235     XCAM_ASSERT (data.ptr ());
236     {
237         SmartLock locker (_mutex);
238         if (!_running)
239             return XCAM_RETURN_ERROR_THREAD;
240     }
241 
242     if (!_data_queue.push (data))
243         return XCAM_RETURN_ERROR_THREAD;
244 
245     do {
246         SmartLock locker(_mutex);
247         if (!_running) {
248             _data_queue.erase (data);
249             return XCAM_RETURN_ERROR_THREAD;
250         }
251 
252         if (_allocated_threads >= _max_threads)
253             break;
254 
255         if (!_free_threads)
256             break;
257 
258         XCamReturn err = create_user_thread_unsafe ();
259         if (!xcam_ret_is_ok (err) && _allocated_threads) {
260             XCAM_LOG_WARNING ("thread pool(%s) create new thread failed but queue data can continue");
261             break;
262         }
263 
264         XCAM_FAIL_RETURN (
265             ERROR, xcam_ret_is_ok (err), err,
266             "thread pool(%s) queue data failed by creating user thread", XCAM_STR (get_name()));
267 
268     } while (0);
269 
270     return XCAM_RETURN_NO_ERROR;
271 }
272 
273 }
274