1 /* Test program that performs producer-consumer style communication through
2  * a circular buffer. This test program is a slightly modified version of the
3  * test program made available by Miguel Ojeda
4  * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
5  */
6 
7 
8 #include <stdio.h>
9 #include <string.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <time.h>
13 #include <pthread.h>
14 #include <semaphore.h>
15 #include <fcntl.h>
16 #include "../../config.h"
17 
18 
19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
20 
21 #ifndef HAVE_BUILTIN_ATOMIC
22 #error Sorry, but this test program can only be compiled by a compiler that\
23 has built-in functions for atomic memory access.
24 #endif
25 
26 
27 #define BUFFER_MAX (2)
28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore"
29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore"
30 
31 
32 typedef int data_t;
33 
34 typedef struct {
35   /* Counting semaphore representing the number of data items in the buffer. */
36   sem_t* data;
37   /* Counting semaphore representing the number of free elements. */
38   sem_t* free;
39   /* Position where a new elements should be written. */
40   int in;
41   /* Position from where an element can be removed. */
42   int out;
43   /* Mutex that protects 'in'. */
44   pthread_mutex_t mutex_in;
45   /* Mutex that protects 'out'. */
46   pthread_mutex_t mutex_out;
47   /* Data buffer. */
48   data_t buffer[BUFFER_MAX];
49 } buffer_t;
50 
51 static int quiet = 0;
52 static int use_locking = 1;
53 
54 static __inline__
fetch_and_add(int * p,int i)55 int fetch_and_add(int* p, int i)
56 {
57   return __sync_fetch_and_add(p, i);
58 }
59 
create_semaphore(const char * const name,const int value)60 static sem_t* create_semaphore(const char* const name, const int value)
61 {
62 #ifdef VGO_darwin
63   char name_and_pid[32];
64   snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid());
65   sem_t* p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
66   if (p == SEM_FAILED) {
67     perror("sem_open");
68     return NULL;
69   }
70   return p;
71 #else
72   sem_t* p = malloc(sizeof(*p));
73   if (p)
74     sem_init(p, 0, value);
75   return p;
76 #endif
77 }
78 
destroy_semaphore(const char * const name,sem_t * p)79 static void destroy_semaphore(const char* const name, sem_t* p)
80 {
81 #ifdef VGO_darwin
82   sem_close(p);
83   sem_unlink(name);
84 #else
85   sem_destroy(p);
86   free(p);
87 #endif
88 }
89 
buffer_init(buffer_t * b)90 static void buffer_init(buffer_t * b)
91 {
92   b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
93   b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
94 
95   pthread_mutex_init(&b->mutex_in, NULL);
96   pthread_mutex_init(&b->mutex_out, NULL);
97 
98   b->in = 0;
99   b->out = 0;
100 }
101 
buffer_recv(buffer_t * b,data_t * d)102 static void buffer_recv(buffer_t* b, data_t* d)
103 {
104   int out;
105   sem_wait(b->data);
106   if (use_locking)
107     pthread_mutex_lock(&b->mutex_out);
108   out = fetch_and_add(&b->out, 1);
109   if (out >= BUFFER_MAX)
110   {
111     fetch_and_add(&b->out, -BUFFER_MAX);
112     out -= BUFFER_MAX;
113   }
114   *d = b->buffer[out];
115   if (use_locking)
116     pthread_mutex_unlock(&b->mutex_out);
117   if (! quiet)
118   {
119     printf("received %d from buffer[%d]\n", *d, out);
120     fflush(stdout);
121   }
122   sem_post(b->free);
123 }
124 
buffer_send(buffer_t * b,data_t * d)125 static void buffer_send(buffer_t* b, data_t* d)
126 {
127   int in;
128   sem_wait(b->free);
129   if (use_locking)
130     pthread_mutex_lock(&b->mutex_in);
131   in = fetch_and_add(&b->in, 1);
132   if (in >= BUFFER_MAX)
133   {
134     fetch_and_add(&b->in, -BUFFER_MAX);
135     in -= BUFFER_MAX;
136   }
137   b->buffer[in] = *d;
138   if (use_locking)
139     pthread_mutex_unlock(&b->mutex_in);
140   if (! quiet)
141   {
142     printf("sent %d to buffer[%d]\n", *d, in);
143     fflush(stdout);
144   }
145   sem_post(b->data);
146 }
147 
buffer_destroy(buffer_t * b)148 static void buffer_destroy(buffer_t* b)
149 {
150   destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
151   destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
152 
153   pthread_mutex_destroy(&b->mutex_in);
154   pthread_mutex_destroy(&b->mutex_out);
155 }
156 
157 static buffer_t b;
158 
producer(int * id)159 static void producer(int* id)
160 {
161   buffer_send(&b, id);
162   pthread_exit(NULL);
163 }
164 
165 #define MAXSLEEP (100 * 1000)
166 
consumer(int * id)167 static void consumer(int* id)
168 {
169   int d;
170   usleep(rand() % MAXSLEEP);
171   buffer_recv(&b, &d);
172   if (! quiet)
173   {
174     printf("%i: %i\n", *id, d);
175     fflush(stdout);
176   }
177   pthread_exit(NULL);
178 }
179 
180 #define THREADS (10)
181 
main(int argc,char ** argv)182 int main(int argc, char** argv)
183 {
184   pthread_t producers[THREADS];
185   pthread_t consumers[THREADS];
186   int thread_arg[THREADS];
187   int i;
188   int optchar;
189 
190   while ((optchar = getopt(argc, argv, "nq")) != EOF)
191   {
192     switch (optchar)
193     {
194     case 'n':
195       use_locking = 0;
196       break;
197     case 'q':
198       quiet = 1;
199       break;
200     }
201   }
202 
203   srand(time(NULL));
204 
205   buffer_init(&b);
206 
207   for (i = 0; i < THREADS; ++i)
208   {
209     thread_arg[i] = i;
210     pthread_create(producers + i, NULL,
211                    (void * (*)(void *)) producer, &thread_arg[i]);
212   }
213 
214   for (i = 0; i < THREADS; ++i)
215     pthread_create(consumers + i, NULL,
216                    (void * (*)(void *)) consumer, &thread_arg[i]);
217 
218   for (i = 0; i < THREADS; ++i)
219   {
220     pthread_join(producers[i], NULL);
221     pthread_join(consumers[i], NULL);
222   }
223 
224   buffer_destroy(&b);
225 
226   return 0;
227 }
228