1 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
2 // -*- Mode: C++ -*-
3 //
4 // Copyright (C) 2013-2020 Red Hat, Inc.
5 //
6 // Author: Dodji Seketeli
7 
8 /// @file
9 ///
10 /// This file implements the worker threads (or thread pool) design
11 /// pattern.  It aims at performing a set of tasks in parallel, using
12 /// the multi-threading capabilities of the underlying processor(s).
13 
14 #include <assert.h>
15 #include <unistd.h>
16 #include <pthread.h>
17 #include <queue>
18 #include <vector>
19 #include <iostream>
20 
21 #include "abg-fwd.h"
22 #include "abg-internal.h"
23 // <headers defining libabigail's API go under here>
24 ABG_BEGIN_EXPORT_DECLARATIONS
25 
26 #include "abg-workers.h"
27 
28 ABG_END_EXPORT_DECLARATIONS
29 // </headers defining libabigail's API>
30 
31 namespace abigail
32 {
33 
34 namespace workers
35 {
36 
37 /// @defgroup thread_pool Worker Threads
38 /// @{
39 ///
40 /// \brief Libabigail's implementation of Thread Pools.
41 ///
/// The main interface of this pattern is a @ref queue of @ref tasks
/// to be performed.  Associated with that queue is a set of worker
/// threads (these are native posix threads) that sit there, idle,
/// until at least one @ref task is added to the queue.
46 ///
47 /// When a @ref task is added to the @ref queue, one thread is woken
48 /// up, picks the @ref task, removes it from the @ref queue, and
49 /// executes the instructions it carries.  We say the worker thread
50 /// performs the @ref task.
51 ///
52 /// When the worker thread is done performing the @ref task, the
53 /// performed @ref task is added to another queue, named as the "done
54 /// queue".  Then the thread looks at the @ref queue of tasks to be
55 /// performed again, and if there is at least one task in that queue,
56 /// the same process as above is done.  Otherwise, the thread blocks,
57 /// waiting for a new task to be added to the queue.
58 ///
59 /// By default, the number of worker threads is equal to the number of
60 /// execution threads advertised by the underlying processor.
61 ///
/// Note that the user of the queue can either wait for all the tasks
/// to be performed by the pool of threads, and then stop them, get the
/// vector of done tasks and proceed to whatever computation she may
/// need next.
66 ///
67 /// Or she can choose to be asynchronously notified whenever a task is
68 /// performed and added to the "done queue".
69 ///
70 ///@}
71 
/// Get the number of processors currently online, as reported by
/// sysconf(_SC_NPROCESSORS_ONLN).
///
/// sysconf() returns -1 when the value cannot be determined.  Letting
/// that negative value be converted to size_t would yield a gigantic
/// thread count, so in that case (or if zero is reported) this
/// function falls back to 1.
///
/// @return The number of hardware threads of executions advertised by
/// the underlying processor; always at least 1.
size_t
get_number_of_threads()
{
  const long nb_online = sysconf(_SC_NPROCESSORS_ONLN);
  if (nb_online < 1)
    return 1;
  return static_cast<size_t>(nb_online);
}
77 
78 /// The abstraction of a worker thread.
79 ///
80 /// This is an implementation detail of the @ref queue public
81 /// interface type of this worker thread design pattern.
82 struct worker
83 {
84   pthread_t tid;
85 
workerabigail::workers::worker86   worker()
87     : tid()
88   {}
89 
90   static queue::priv*
91   wait_to_execute_a_task(queue::priv*);
92 }; // end struct worker
93 
94 // </worker declarations>
95 
96 // <queue stuff>
97 
98 /// The private data structure of the task queue.
99 struct queue::priv
100 {
101   // A boolean to say if the user wants to shutdown the worker
102   // threads. guarded by tasks_todo_mutex.
103   // TODO: once we have std::atomic<bool>, use it and reconsider the
104   // synchronization around its reads and writes
105   bool				bring_workers_down;
106   // The number of worker threads.
107   size_t			num_workers;
108   // A mutex that protects the todo tasks queue from being accessed in
109   // read/write by two threads at the same time.
110   pthread_mutex_t		tasks_todo_mutex;
111   // The queue condition variable.  This condition is used to make the
112   // worker threads sleep until a new task is added to the queue of
113   // todo tasks.  Whenever a new task is added to that queue, a signal
114   // is sent to all a thread sleeping on this condition variable.
115   pthread_cond_t		tasks_todo_cond;
116   // A mutex that protects the done tasks queue from being accessed in
117   // read/write by two threads at the same time.
118   pthread_mutex_t		tasks_done_mutex;
119   // A condition to be signalled whenever there is a task done. That is being
120   // used to wait for tasks completed when bringing the workers down.
121   pthread_cond_t		tasks_done_cond;
122   // The todo task queue itself.
123   std::queue<task_sptr>	tasks_todo;
124   // The done task queue itself.
125   std::vector<task_sptr>	tasks_done;
126   // This functor is invoked to notify the user of this queue that a
127   // task has been completed and has been added to the done tasks
128   // vector.  We call it a notifier.  This notifier is the default
129   // notifier of the work queue; the one that is used when the user
130   // has specified no notifier.  It basically does nothing.
131   static task_done_notify	default_notify;
132   // This is a reference to the the notifier that is actually used in
133   // the queue.  It's either the one specified by the user or the
134   // default one.
135   task_done_notify&		notify;
136   // A vector of the worker threads.
137   std::vector<worker>		workers;
138 
139   /// A constructor of @ref queue::priv.
140   ///
141   /// @param nb_workers the number of worker threads to have in the
142   /// thread pool.
143   ///
144   /// @param task_done_notify a functor object that is invoked by the
145   /// worker thread which has performed the task, right after it's
146   /// added that task to the vector of the done tasks.
privabigail::workers::queue::priv147   priv(size_t nb_workers = get_number_of_threads(),
148 	      task_done_notify& n = default_notify)
149     : bring_workers_down(),
150       num_workers(nb_workers),
151       tasks_todo_mutex(),
152       tasks_todo_cond(),
153       tasks_done_mutex(),
154       tasks_done_cond(),
155       notify(n)
156   {create_workers();}
157 
158   /// Create the worker threads pool and have all threads sit idle,
159   /// waiting for a task to be added to the todo queue.
160   void
create_workersabigail::workers::queue::priv161   create_workers()
162   {
163     for (unsigned i = 0; i < num_workers; ++i)
164       {
165 	worker w;
166 	ABG_ASSERT(pthread_create(&w.tid,
167 			      /*attr=*/0,
168 			      (void*(*)(void*))&worker::wait_to_execute_a_task,
169 			      this) == 0);
170 	workers.push_back(w);
171       }
172   }
173 
174   /// Submit a task to the queue of tasks to be performed.
175   ///
176   /// This wakes up one thread from the pool which immediatly starts
177   /// performing the task.  When it's done with the task, it goes back
178   /// to be suspended, waiting for a new task to be scheduled.
179   ///
180   /// @param t the task to schedule.  Note that a nil task won't be
181   /// scheduled.  If the queue is empty, the task @p t won't be
182   /// scheduled either.
183   ///
184   /// @return true iff the task @p t was successfully scheduled.
185   bool
schedule_taskabigail::workers::queue::priv186   schedule_task(const task_sptr& t)
187   {
188     if (workers.empty() || !t)
189       return false;
190 
191     pthread_mutex_lock(&tasks_todo_mutex);
192     tasks_todo.push(t);
193     pthread_mutex_unlock(&tasks_todo_mutex);
194     pthread_cond_signal(&tasks_todo_cond);
195     return true;
196   }
197 
198   /// Submit a vector of task to the queue of tasks to be performed.
199   ///
200   /// This wakes up threads of the pool which immediatly start
201   /// performing the tasks.  When they are done with the task, they go
202   /// back to be suspended, waiting for new tasks to be scheduled.
203   ///
204   /// @param tasks the tasks to schedule.
205   bool
schedule_tasksabigail::workers::queue::priv206   schedule_tasks(const tasks_type& tasks)
207   {
208     bool is_ok= true;
209     for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
210       is_ok &= schedule_task(*t);
211     return is_ok;
212   }
213 
214   /// Signal all the threads (of the pool) which are suspended and
215   /// waiting to perform a task, so that they wake up and end up their
216   /// execution.  If there is no task to perform, they just end their
217   /// execution.  If there are tasks to perform, they finish them and
218   /// then end their execution.
219   ///
220   /// This function then joins all the tasks of the pool, waiting for
221   /// them to finish, and then it returns.  In other words, this
222   /// function suspends the thread of the caller, waiting for the
223   /// worker threads to finish their tasks, and end their execution.
224   ///
225   /// If the user code wants to work with the thread pool again,
226   /// she'll need to create them again, using the member function
227   /// create_workers().
228   void
do_bring_workers_downabigail::workers::queue::priv229   do_bring_workers_down()
230   {
231     if (workers.empty())
232       return;
233 
234     // Wait for the todo list to be empty to make sure all tasks got picked up
235     pthread_mutex_lock(&tasks_todo_mutex);
236     while (!tasks_todo.empty())
237       pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
238 
239     bring_workers_down = true;
240     pthread_mutex_unlock(&tasks_todo_mutex);
241 
242     // Now that the task queue is empty, drain the workers by waking them up,
243     // letting them finish their final task before termination.
244     ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
245 
246     for (std::vector<worker>::const_iterator i = workers.begin();
247 	 i != workers.end();
248 	 ++i)
249       ABG_ASSERT(pthread_join(i->tid, /*thread_return=*/0) == 0);
250     workers.clear();
251   }
252 
253   /// Destructors of @ref queue::priv type.
~privabigail::workers::queue::priv254   ~priv()
255   {do_bring_workers_down();}
256 
257 }; //end struct queue::priv
258 
// Definition of the static default notifier declared in queue::priv.
// It is default-initialized and is the notifier a queue uses when the
// user provides none.
queue::task_done_notify queue::priv::default_notify;
261 
262 /// Default constructor of the @ref queue type.
263 ///
264 /// By default the queue is created with a number of worker threaders
265 /// which is equals to the number of simultaneous execution threads
266 /// supported by the underlying processor.
queue()267 queue::queue()
268   : p_(new priv())
269 {}
270 
271 /// Constructor of the @ref queue type.
272 ///
273 /// @param number_of_workers the number of worker threads to have in
274 /// the pool.
queue(unsigned number_of_workers)275 queue::queue(unsigned number_of_workers)
276   : p_(new priv(number_of_workers))
277 {}
278 
279 /// Constructor of the @ref queue type.
280 ///
281 /// @param number_of_workers the number of worker threads to have in
282 /// the pool.
283 ///
284 /// @param the notifier to invoke when a task is done doing its job.
285 /// Users should create a type that inherit this @ref task_done_notify
286 /// class and overload its virtual task_done_notify::operator()
287 /// operator function.  Note that the code of that
288 /// task_done_notify::operator() is assured to run in *sequence*, with
289 /// respect to the code of other task_done_notify::operator() from
290 /// other tasks.
queue(unsigned number_of_workers,task_done_notify & notifier)291 queue::queue(unsigned number_of_workers,
292 	     task_done_notify& notifier)
293   : p_(new priv(number_of_workers, notifier))
294 {}
295 
296 /// Getter of the size of the queue.  This gives the number of task
297 /// still present in the queue.
298 ///
299 /// @return the number of task still present in the queue.
300 size_t
get_size() const301 queue::get_size() const
302 {return p_->tasks_todo.size();}
303 
304 /// Submit a task to the queue of tasks to be performed.
305 ///
306 /// This wakes up one thread from the pool which immediatly starts
307 /// performing the task.  When it's done with the task, it goes back
308 /// to be suspended, waiting for a new task to be scheduled.
309 ///
310 /// @param t the task to schedule.  Note that if the queue is empty or
311 /// if the task is nil, the task is not scheduled.
312 ///
313 /// @return true iff the task was successfully scheduled.
314 bool
schedule_task(const task_sptr & t)315 queue::schedule_task(const task_sptr& t)
316 {return p_->schedule_task(t);}
317 
318 /// Submit a vector of tasks to the queue of tasks to be performed.
319 ///
320 /// This wakes up one or more threads from the pool which immediatly
321 /// start performing the tasks.  When the threads are done with the
322 /// tasks, they goes back to be suspended, waiting for a new task to
323 /// be scheduled.
324 ///
325 /// @param tasks the tasks to schedule.
326 bool
schedule_tasks(const tasks_type & tasks)327 queue::schedule_tasks(const tasks_type& tasks)
328 {return p_->schedule_tasks(tasks);}
329 
330 /// Suspends the current thread until all worker threads finish
331 /// performing the tasks they are executing.
332 ///
333 /// If the worker threads were suspended waiting for a new task to
334 /// perform, they are woken up and their execution ends.
335 ///
336 /// The execution of the current thread is resumed when all the
337 /// threads of the pool have finished their execution and are
338 /// terminated.
339 void
wait_for_workers_to_complete()340 queue::wait_for_workers_to_complete()
341 {p_->do_bring_workers_down();}
342 
343 /// Getter of the vector of tasks that got performed.
344 ///
345 /// @return the vector of tasks that got performed.
346 std::vector<task_sptr>&
get_completed_tasks() const347 queue::get_completed_tasks() const
348 {return p_->tasks_done;}
349 
350 /// Destructor for the @ref queue type.
~queue()351 queue::~queue()
352 {}
353 
354 /// The default function invocation operator of the @ref queue type.
355 ///
356 /// This does nothing.
357 void
operator ()(const task_sptr &)358 queue::task_done_notify::operator()(const task_sptr&/*task_done*/)
359 {
360 }
361 
362 // </queue stuff>
363 
364 // <worker definitions>
365 
366 /// Wait to be woken up by a thread condition signal, then look if
367 /// there is a task to be executed.  If there is, then pick one (in a
368 /// FIFO manner), execute it, and put the executed task into the set
369 /// of done tasks.
370 ///
371 /// @param t the private data of the "task queue" type to consider.
372 ///
373 /// @param return the same private data of the task queue type we got
374 /// in argument.
375 queue::priv*
wait_to_execute_a_task(queue::priv * p)376 worker::wait_to_execute_a_task(queue::priv* p)
377 {
378   while (true)
379     {
380       pthread_mutex_lock(&p->tasks_todo_mutex);
381       // If there is no more tasks to perform and the queue is not to
382       // be brought down then wait (sleep) for new tasks to come up.
383       while (p->tasks_todo.empty() && !p->bring_workers_down)
384 	pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
385 
386       // We were woken up.  So maybe there are tasks to perform?  If
387       // so, get a task from the queue ...
388       task_sptr t;
389       if (!p->tasks_todo.empty())
390 	{
391 	  t = p->tasks_todo.front();
392 	  p->tasks_todo.pop();
393 	}
394       pthread_mutex_unlock(&p->tasks_todo_mutex);
395 
396       // If we've got a task to perform then perform it and when it's
397       // done then add to the set of tasks that are done.
398       if (t)
399 	{
400 	  t->perform();
401 
402 	  // Add the task to the vector of tasks that are done and
403 	  // notify listeners about the fact that the task is done.
404 	  //
405 	  // Note that this (including the notification) is not
406 	  // happening in parallel.  So the code performed by the
407 	  // notifier during the notification is running sequentially,
408 	  // not in parallel with any other task that was just done
409 	  // and that is notifying its listeners.
410 	  pthread_mutex_lock(&p->tasks_done_mutex);
411 	  p->tasks_done.push_back(t);
412 	  p->notify(t);
413 	  pthread_mutex_unlock(&p->tasks_done_mutex);
414 	  pthread_cond_signal(&p->tasks_done_cond);
415 	}
416 
417       // ensure we access bring_workers_down always guarded
418       bool drop_out = false;
419       pthread_mutex_lock(&p->tasks_todo_mutex);
420       drop_out = p->bring_workers_down;
421       pthread_mutex_unlock(&p->tasks_todo_mutex);
422       if (drop_out)
423 	break;
424     }
425 
426   return p;
427 }
428 // </worker definitions>
429 } //end namespace workers
430 } //end namespace abigail
431