1 /*
2   This file is part of Valgrind, a dynamic binary instrumentation
3   framework.
4 
5   Copyright (C) 2008-2008 Google Inc
6      opensource@google.com
7 
8   This program is free software; you can redistribute it and/or
9   modify it under the terms of the GNU General Public License as
10   published by the Free Software Foundation; either version 2 of the
11   License, or (at your option) any later version.
12 
13   This program is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16   General Public License for more details.
17 
18   You should have received a copy of the GNU General Public License
19   along with this program; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
21   02111-1307, USA.
22 
23   The GNU General Public License is contained in the file COPYING.
24 */
25 
26 // Author: Konstantin Serebryany <opensource@google.com>
27 //
// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or a similar tool)
// that will work with different threading frameworks.
32 //
33 // If one needs to test helgrind's support for another threading library,
34 // he/she can create a copy of this file and replace pthread_ calls
35 // with appropriate calls to his/her library.
36 //
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h (this copy includes
// ../../drd/drd.h instead).
39 //
40 // DISCLAIMER: the classes defined in this header file
41 // are NOT intended for general use -- only for unit tests.
42 //
43 
44 #ifndef THREAD_WRAPPERS_PTHREAD_H
45 #define THREAD_WRAPPERS_PTHREAD_H
46 
#include <errno.h>    // ENOMEM
#include <limits.h>   // INT_MAX
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>   // int64_t
#include <stdio.h>
#include <unistd.h>

#include <queue>
#include <vector>
53 
54 #ifdef VGO_darwin
55 #include <libkern/OSAtomic.h>
56 #define NO_BARRIER
57 #define NO_TLS
58 #endif
59 
60 #include <string>
61 using namespace std;
62 
63 #include <sys/time.h>
64 #include <time.h>
65 
66 #include "../../drd/drd.h"
// No-op annotation: expands to an empty statement; `arg` is not evaluated.
#define ANNOTATE_NO_OP(arg) do { } while(0)
// Marks a race as expected by reporting it as a benign race.
// Note: `descr` is ignored, and the annotated range is always 4 bytes
// starting at `addr`, with the fixed description "expected race".
// NOTE(review): ANNOTATE_BENIGN_RACE_SIZED presumably comes from drd.h.
#define ANNOTATE_EXPECT_RACE(addr, descr)                \
    ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race")
RunningOnValgrind()70 static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; }
71 
72 #include <assert.h>
// CHECK() is plain assert(), and throughout this file it wraps calls whose
// side effects are required (e.g. CHECK(0 == pthread_mutex_init(...))).
// Building with NDEBUG would compile those calls away, so NDEBUG is rejected.
#ifdef NDEBUG
# error "Pleeease, do not define NDEBUG"
#endif
#define CHECK assert

/// Set this to true if malloc() uses a mutex on your platform, as this may
/// introduce a happens-before arc for a pure happens-before race detector.
const bool kMallocUsesMutex = false;
81 
/// Current wall-clock time, in milliseconds, as reported by gettimeofday().
///
/// Both timeval fields are widened to int64_t *before* the arithmetic: on
/// targets where time_t is 32 bits, `tv_sec * 1000` would otherwise be
/// computed in 32-bit signed arithmetic and overflow (undefined behaviour).
static inline int64_t GetCurrentTimeMillis() {
  struct timeval now;
  gettimeofday(&now, NULL);
  return static_cast<int64_t>(now.tv_sec) * 1000 +
         static_cast<int64_t>(now.tv_usec) / 1000;
}
88 
/// Writes into *ts the time *tv shifted by offset_milli milliseconds.
/// Used to turn "now" (from gettimeofday) into an absolute deadline for
/// pthread_cond_timedwait().
static inline void timeval2timespec(timeval *const tv,
                                     timespec *ts,
                                     int64_t offset_milli) {
  const int64_t kNanosPerSecond = 1000000000LL;
  const int64_t kNanosPerMilli  = 1000000LL;
  const int64_t kNanosPerMicro  = 1000LL;
  // Work entirely in int64_t nanoseconds, then split back into sec/nsec.
  const int64_t deadline_nsec =
      static_cast<int64_t>(tv->tv_sec) * kNanosPerSecond +
      static_cast<int64_t>(tv->tv_usec) * kNanosPerMicro +
      offset_milli * kNanosPerMilli;
  ts->tv_sec  = deadline_nsec / kNanosPerSecond;
  ts->tv_nsec = deadline_nsec % kNanosPerSecond;
}
102 
103 
// Forward declaration: Mutex (below) names CondVar as a friend so that
// CondVar can wait on Mutex's underlying pthread_mutex_t.
class CondVar;
105 
106 #ifndef NO_SPINLOCK
107 /// helgrind does not (yet) support spin locks, so we annotate them.
108 
109 #ifndef VGO_darwin
110 class SpinLock {
111  public:
SpinLock()112   SpinLock() {
113     CHECK(0 == pthread_spin_init(&mu_, 0));
114     ANNOTATE_RWLOCK_CREATE((void*)&mu_);
115   }
~SpinLock()116   ~SpinLock() {
117     ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
118     CHECK(0 == pthread_spin_destroy(&mu_));
119   }
Lock()120   void Lock() {
121     CHECK(0 == pthread_spin_lock(&mu_));
122     ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
123   }
Unlock()124   void Unlock() {
125     ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
126     CHECK(0 == pthread_spin_unlock(&mu_));
127   }
128  private:
129   pthread_spinlock_t mu_;
130 };
131 
132 #else
133 
134 class SpinLock {
135  public:
136   // Mac OS X version.
SpinLock()137   SpinLock() : mu_(OS_SPINLOCK_INIT) {
138     ANNOTATE_RWLOCK_CREATE((void*)&mu_);
139   }
~SpinLock()140   ~SpinLock() {
141     ANNOTATE_RWLOCK_DESTROY((void*)&mu_);
142   }
Lock()143   void Lock() {
144     OSSpinLockLock(&mu_);
145     ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1);
146   }
Unlock()147   void Unlock() {
148     ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1);
149     OSSpinLockUnlock(&mu_);
150   }
151  private:
152   OSSpinLock mu_;
153 };
154 #endif // VGO_darwin
155 
156 #endif // NO_SPINLOCK
157 
/// Just a boolean condition. Used by Mutex::LockWhen and similar.
///
/// Wraps either a `bool (*)(T*)` predicate together with its argument, or a
/// `bool (*)()` predicate with no argument.
///
/// The predicate is stored type-erased as func_t, but it is always converted
/// back to its exact original type before being called, via the per-type
/// invoke_ trampoline.
///   - Calling a function through a pointer of a different function type is
///     undefined behaviour.
///   - A round-trip reinterpret_cast between function-pointer types yields
///     the original pointer.
class Condition {
 public:
  typedef bool (*func_t)(void*);

  template <typename T>
  Condition(bool (*func)(T*), T* arg)
  : func_(reinterpret_cast<func_t>(func)), arg_(arg),
    invoke_(&InvokeUnary<T>) {}

  Condition(bool (*func)())
  : func_(reinterpret_cast<func_t>(func)), arg_(NULL),
    invoke_(&InvokeNullary) {}

  /// Evaluates the wrapped predicate and returns its result.
  bool Eval() { return invoke_(func_, arg_); }

 private:
  typedef bool (*invoker_t)(func_t, void*);

  // Restores the predicate's real type `bool (*)(T*)` and calls it.
  template <typename T>
  static bool InvokeUnary(func_t func, void *arg) {
    return reinterpret_cast<bool (*)(T*)>(func)(static_cast<T*>(arg));
  }

  // Restores the predicate's real type `bool (*)()` and calls it.
  static bool InvokeNullary(func_t func, void * /*unused*/) {
    return reinterpret_cast<bool (*)()>(func)();
  }

  func_t func_;       // Type-erased predicate; see invoke_.
  void *arg_;         // Argument for unary predicates, NULL otherwise.
  invoker_t invoke_;  // Calls func_ through its original type.
};
176 
177 
178 /// Wrapper for pthread_mutex_t.
179 ///
180 /// pthread_mutex_t is *not* a reader-writer lock,
181 /// so the methods like ReaderLock() aren't really reader locks.
182 /// We can not use pthread_rwlock_t because it
183 /// does not work with pthread_cond_t.
184 ///
185 /// TODO: We still need to test reader locks with this class.
186 /// Implement a mode where pthread_rwlock_t will be used
187 /// instead of pthread_mutex_t (only when not used with CondVar or LockWhen).
188 ///
189 class Mutex {
190   friend class CondVar;
191  public:
Mutex()192   Mutex() {
193     CHECK(0 == pthread_mutex_init(&mu_, NULL));
194     CHECK(0 == pthread_cond_init(&cv_, NULL));
195     signal_at_unlock_ = true;  // Always signal at Unlock to make
196                                // Mutex more friendly to hybrid detectors.
197   }
~Mutex()198   ~Mutex() {
199     CHECK(0 == pthread_cond_destroy(&cv_));
200     CHECK(0 == pthread_mutex_destroy(&mu_));
201   }
Lock()202   void Lock()          { CHECK(0 == pthread_mutex_lock(&mu_));}
TryLock()203   bool TryLock()       { return (0 == pthread_mutex_trylock(&mu_));}
Unlock()204   void Unlock() {
205     if (signal_at_unlock_) {
206       CHECK(0 == pthread_cond_signal(&cv_));
207     }
208     CHECK(0 == pthread_mutex_unlock(&mu_));
209   }
ReaderLock()210   void ReaderLock()    { Lock(); }
ReaderTryLock()211   bool ReaderTryLock() { return TryLock();}
ReaderUnlock()212   void ReaderUnlock()  { Unlock(); }
213 
LockWhen(Condition cond)214   void LockWhen(Condition cond)            { Lock(); WaitLoop(cond); }
ReaderLockWhen(Condition cond)215   void ReaderLockWhen(Condition cond)      { Lock(); WaitLoop(cond); }
Await(Condition cond)216   void Await(Condition cond)               { WaitLoop(cond); }
217 
ReaderLockWhenWithTimeout(Condition cond,int millis)218   bool ReaderLockWhenWithTimeout(Condition cond, int millis)
219     { Lock(); return WaitLoopWithTimeout(cond, millis); }
LockWhenWithTimeout(Condition cond,int millis)220   bool LockWhenWithTimeout(Condition cond, int millis)
221     { Lock(); return WaitLoopWithTimeout(cond, millis); }
AwaitWithTimeout(Condition cond,int millis)222   bool AwaitWithTimeout(Condition cond, int millis)
223     { return WaitLoopWithTimeout(cond, millis); }
224 
225  private:
226 
WaitLoop(Condition cond)227   void WaitLoop(Condition cond) {
228     signal_at_unlock_ = true;
229     while(cond.Eval() == false) {
230       pthread_cond_wait(&cv_, &mu_);
231     }
232     ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
233   }
234 
WaitLoopWithTimeout(Condition cond,int millis)235   bool WaitLoopWithTimeout(Condition cond, int millis) {
236     struct timeval now;
237     struct timespec timeout;
238     int retcode = 0;
239     gettimeofday(&now, NULL);
240     timeval2timespec(&now, &timeout, millis);
241 
242     signal_at_unlock_ = true;
243     while (cond.Eval() == false && retcode == 0) {
244       retcode = pthread_cond_timedwait(&cv_, &mu_, &timeout);
245     }
246     if(retcode == 0) {
247       ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_);
248     }
249     return cond.Eval();
250   }
251 
252   // A hack. cv_ should be the first data member so that
253   // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) works.
254   // (See also racecheck_unittest.cc)
255   pthread_cond_t  cv_;
256   pthread_mutex_t mu_;
257   bool            signal_at_unlock_;  // Set to true if Wait was called.
258 };
259 
260 
261 class MutexLock {  // Scoped Mutex Locker/Unlocker
262  public:
MutexLock(Mutex * mu)263   MutexLock(Mutex *mu)
264     : mu_(mu) {
265     mu_->Lock();
266   }
~MutexLock()267   ~MutexLock() {
268     mu_->Unlock();
269   }
270  private:
271   Mutex *mu_;
272 };
273 
274 
275 /// Wrapper for pthread_cond_t.
276 class CondVar {
277  public:
CondVar()278   CondVar()   { CHECK(0 == pthread_cond_init(&cv_, NULL)); }
~CondVar()279   ~CondVar()  { CHECK(0 == pthread_cond_destroy(&cv_)); }
Wait(Mutex * mu)280   void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); }
WaitWithTimeout(Mutex * mu,int millis)281   bool WaitWithTimeout(Mutex *mu, int millis) {
282     struct timeval now;
283     struct timespec timeout;
284     gettimeofday(&now, NULL);
285     timeval2timespec(&now, &timeout, millis);
286     return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout);
287   }
Signal()288   void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); }
SignalAll()289   void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); }
290  private:
291   pthread_cond_t cv_;
292 };
293 
294 
295 // pthreads do not allow to use condvar with rwlock so we can't make
296 // ReaderLock method of Mutex to be the real rw-lock.
297 // So, we need a special lock class to test reader locks.
298 #define NEEDS_SEPERATE_RW_LOCK
299 class RWLock {
300  public:
RWLock()301   RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); }
~RWLock()302   ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); }
Lock()303   void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); }
ReaderLock()304   void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); }
Unlock()305   void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
ReaderUnlock()306   void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); }
307  private:
308   pthread_cond_t dummy; // Damn, this requires some redesign...
309   pthread_rwlock_t mu_;
310 };
311 
312 class ReaderLockScoped {  // Scoped RWLock Locker/Unlocker
313  public:
ReaderLockScoped(RWLock * mu)314   ReaderLockScoped(RWLock *mu)
315     : mu_(mu) {
316     mu_->ReaderLock();
317   }
~ReaderLockScoped()318   ~ReaderLockScoped() {
319     mu_->ReaderUnlock();
320   }
321  private:
322   RWLock *mu_;
323 };
324 
325 class WriterLockScoped {  // Scoped RWLock Locker/Unlocker
326  public:
WriterLockScoped(RWLock * mu)327   WriterLockScoped(RWLock *mu)
328     : mu_(mu) {
329     mu_->Lock();
330   }
~WriterLockScoped()331   ~WriterLockScoped() {
332     mu_->Unlock();
333   }
334  private:
335   RWLock *mu_;
336 };
337 
338 
339 
340 
341 /// Wrapper for pthread_create()/pthread_join().
342 class MyThread {
343  public:
344   typedef void *(*worker_t)(void*);
345 
346   MyThread(worker_t worker, void *arg = NULL, const char *name = NULL)
w_(worker)347       :w_(worker), arg_(arg), name_(name) {}
348   MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL)
w_(reinterpret_cast<worker_t> (worker))349       :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
350   MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL)
w_(reinterpret_cast<worker_t> (worker))351       :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {}
352 
~MyThread()353   ~MyThread(){ w_ = NULL; arg_ = NULL;}
Start()354   void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));}
Join()355   void Join()  { CHECK(0 == pthread_join(t_, NULL));}
tid()356   pthread_t tid() const { return t_; }
357  private:
ThreadBody(MyThread * my_thread)358   static void ThreadBody(MyThread *my_thread) {
359     if (my_thread->name_) {
360       ANNOTATE_THREAD_NAME(my_thread->name_);
361     }
362     my_thread->w_(my_thread->arg_);
363   }
364   pthread_t t_;
365   worker_t  w_;
366   void     *arg_;
367   const char *name_;
368 };
369 
370 
371 /// Just a message queue.
372 class ProducerConsumerQueue {
373  public:
ProducerConsumerQueue(int unused)374   ProducerConsumerQueue(int unused) {
375     //ANNOTATE_PCQ_CREATE(this);
376   }
~ProducerConsumerQueue()377   ~ProducerConsumerQueue() {
378     CHECK(q_.empty());
379     //ANNOTATE_PCQ_DESTROY(this);
380   }
381 
382   // Put.
Put(void * item)383   void Put(void *item) {
384     mu_.Lock();
385       q_.push(item);
386       ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
387       //ANNOTATE_PCQ_PUT(this);
388     mu_.Unlock();
389   }
390 
391   // Get.
392   // Blocks if the queue is empty.
Get()393   void *Get() {
394     mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
395       void * item = NULL;
396       bool ok = TryGetInternal(&item);
397       CHECK(ok);
398     mu_.Unlock();
399     return item;
400   }
401 
402   // If queue is not empty,
403   // remove an element from queue, put it into *res and return true.
404   // Otherwise return false.
TryGet(void ** res)405   bool TryGet(void **res) {
406     mu_.Lock();
407       bool ok = TryGetInternal(res);
408     mu_.Unlock();
409     return ok;
410   }
411 
412  private:
413   Mutex mu_;
414   std::queue<void*> q_; // protected by mu_
415 
416   // Requires mu_
TryGetInternal(void ** item_ptr)417   bool TryGetInternal(void ** item_ptr) {
418     if (q_.empty())
419       return false;
420     *item_ptr = q_.front();
421     q_.pop();
422     //ANNOTATE_PCQ_GET(this);
423     return true;
424   }
425 
IsQueueNotEmpty(std::queue<void * > * queue)426   static bool IsQueueNotEmpty(std::queue<void*> * queue) {
427      return !queue->empty();
428   }
429 };
430 
431 
432 
433 /// Function pointer with zero, one or two parameters.
434 struct Closure {
435   typedef void (*F0)();
436   typedef void (*F1)(void *arg1);
437   typedef void (*F2)(void *arg1, void *arg2);
438   int  n_params;
439   void *f;
440   void *param1;
441   void *param2;
442 
ExecuteClosure443   void Execute() {
444     if (n_params == 0) {
445       (F0(f))();
446     } else if (n_params == 1) {
447       (F1(f))(param1);
448     } else {
449       CHECK(n_params == 2);
450       (F2(f))(param1, param2);
451     }
452     delete this;
453   }
454 };
455 
NewCallback(void (* f)())456 Closure *NewCallback(void (*f)()) {
457   Closure *res = new Closure;
458   res->n_params = 0;
459   res->f = (void*)(f);
460   res->param1 = NULL;
461   res->param2 = NULL;
462   return res;
463 }
464 
465 template <class P1>
NewCallback(void (* f)(P1),P1 p1)466 Closure *NewCallback(void (*f)(P1), P1 p1) {
467   CHECK(sizeof(P1) <= sizeof(void*));
468   Closure *res = new Closure;
469   res->n_params = 1;
470   res->f = (void*)(f);
471   res->param1 = (void*)p1;
472   res->param2 = NULL;
473   return res;
474 }
475 
476 template <class T, class P1, class P2>
NewCallback(void (* f)(P1,P2),P1 p1,P2 p2)477 Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
478   CHECK(sizeof(P1) <= sizeof(void*));
479   Closure *res = new Closure;
480   res->n_params = 2;
481   res->f = (void*)(f);
482   res->param1 = (void*)p1;
483   res->param2 = (void*)p2;
484   return res;
485 }
486 
487 /*! A thread pool that uses ProducerConsumerQueue.
488   Usage:
489   {
490     ThreadPool pool(n_workers);
491     pool.StartWorkers();
492     pool.Add(NewCallback(func_with_no_args));
493     pool.Add(NewCallback(func_with_one_arg, arg));
494     pool.Add(NewCallback(func_with_two_args, arg1, arg2));
495     ... // more calls to pool.Add()
496 
497     // the ~ThreadPool() is called: we wait workers to finish
498     // and then join all threads in the pool.
499   }
500 */
501 class ThreadPool {
502  public:
503   //! Create n_threads threads, but do not start.
ThreadPool(int n_threads)504   explicit ThreadPool(int n_threads)
505     : queue_(INT_MAX) {
506     for (int i = 0; i < n_threads; i++) {
507       MyThread *thread = new MyThread(&ThreadPool::Worker, this);
508       workers_.push_back(thread);
509     }
510   }
511 
512   //! Start all threads.
StartWorkers()513   void StartWorkers() {
514     for (size_t i = 0; i < workers_.size(); i++) {
515       workers_[i]->Start();
516     }
517   }
518 
519   //! Add a closure.
Add(Closure * closure)520   void Add(Closure *closure) {
521     queue_.Put(closure);
522   }
523 
num_threads()524   int num_threads() { return workers_.size();}
525 
526   //! Wait workers to finish, then join all threads.
~ThreadPool()527   ~ThreadPool() {
528     for (size_t i = 0; i < workers_.size(); i++) {
529       Add(NULL);
530     }
531     for (size_t i = 0; i < workers_.size(); i++) {
532       workers_[i]->Join();
533       delete workers_[i];
534     }
535   }
536  private:
537   std::vector<MyThread*>   workers_;
538   ProducerConsumerQueue  queue_;
539 
Worker(void * p)540   static void *Worker(void *p) {
541     ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
542     while (true) {
543       Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
544       if(closure == NULL) {
545         return NULL;
546       }
547       closure->Execute();
548     }
549   }
550 };
551 
552 #ifndef NO_BARRIER
553 /// Wrapper for pthread_barrier_t.
554 class Barrier{
555  public:
Barrier(int n_threads)556   explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));}
~Barrier()557   ~Barrier()                      {CHECK(0 == pthread_barrier_destroy(&b_));}
Block()558   void Block() {
559     // helgrind 3.3.0 does not have an interceptor for barrier.
560     // but our current local version does.
561     // ANNOTATE_CONDVAR_SIGNAL(this);
562     pthread_barrier_wait(&b_);
563     // ANNOTATE_CONDVAR_WAIT(this, this);
564   }
565  private:
566   pthread_barrier_t b_;
567 };
568 
569 #endif // NO_BARRIER
570 
571 class BlockingCounter {
572  public:
BlockingCounter(int initial_count)573   explicit BlockingCounter(int initial_count) :
574     count_(initial_count) {}
DecrementCount()575   bool DecrementCount() {
576     MutexLock lock(&mu_);
577     count_--;
578     return count_ == 0;
579   }
Wait()580   void Wait() {
581     mu_.LockWhen(Condition(&IsZero, &count_));
582     mu_.Unlock();
583   }
584  private:
IsZero(int * arg)585   static bool IsZero(int *arg) { return *arg == 0; }
586   Mutex mu_;
587   int count_;
588 };
589 
/// Atomically adds `increment` to *value and returns the resulting value.
/// Defined below separately for non-Darwin and Darwin builds.
int AtomicIncrement(volatile int *value, int increment);
591 
592 #ifndef VGO_darwin
inline int AtomicIncrement(volatile int *value, int increment) {
  // GCC/Clang builtin: atomic add that yields the updated value.
  const int updated = __sync_add_and_fetch(value, increment);
  return updated;
}
596 
597 #else
598 // Mac OS X version.
AtomicIncrement(volatile int * value,int increment)599 inline int AtomicIncrement(volatile int *value, int increment) {
600   return OSAtomicAdd32(increment, value);
601 }
602 
// TODO(timurrrr) this is a hack
// memalign() is mapped to malloc(); the requested alignment is ignored.
// NOTE(review): confirm callers only need malloc()'s default alignment.
#define memalign(A,B) malloc(B)

// TODO(timurrrr) this is a hack
/// Minimal posix_memalign() replacement built on the memalign() macro above
/// (i.e. on malloc()); `al` is ignored.
///
/// Follows the POSIX contract: returns 0 on success and ENOMEM if the
/// allocation fails. Declared `inline` because it is defined in a header:
/// including the header from several translation units must not produce
/// duplicate definitions.
inline int posix_memalign(void **out, size_t al, size_t size) {
  *out = memalign(al, size);
  return (*out != 0) ? 0 : ENOMEM;
}
611 #endif // VGO_darwin
612 
613 #endif // THREAD_WRAPPERS_PTHREAD_H
614 // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker
615