1 /*
2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like. Any license provided herein, whether implied or
15 * otherwise, applies only to this software file. Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22 *
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA 94043, or:
25 *
26 *
27 * aio-stress
28 *
29 * will open or create each file on the command line, and start a series
30 * of aio to it.
31 *
32 * aio is done in a rotating loop. first file1 gets 8 requests, then
33 * file2, then file3 etc. As each file finishes writing, it is switched
34 * to reads
35 *
36 * io buffers are aligned in case you want to do raw io
37 *
 * compile with gcc -Wall -o aio-stress aio-stress.c -laio -lpthread
39 *
40 * run aio-stress -h to see the options
41 *
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43 */
/*
 * glibc feature-test macro requesting a 64-bit off_t and 64-bit file APIs;
 * it only takes effect because it is defined before any system header.
 */
#define _FILE_OFFSET_BITS 64
/* program version string (presumably shown by the usage/help output) */
#define PROG_VERSION "0.21"
/* use the five-argument io_getevents(ctx, min_nr, nr, events, timeout) form */
#define NEW_GETEVENTS
47
48 #include <stdio.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <stdlib.h>
52
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <fcntl.h>
56 #include <unistd.h>
57 #include <sys/time.h>
58 #include <libaio.h>
59 #include <sys/ipc.h>
60 #include <sys/shm.h>
61 #include <sys/mman.h>
62 #include <string.h>
63 #include <pthread.h>
64
/* states of io_unit.busy */
#define IO_FREE 0
#define IO_PENDING 1
/* value of the `iterations` global meaning "no limit on submission passes" */
#define RUN_FOREVER -1

/*
 * Fallback for C libraries that do not expose O_DIRECT.
 * NOTE(review): the numeric value of O_DIRECT is architecture specific;
 * confirm 040000 is correct for the targets this is built on.
 */
#ifndef O_DIRECT
#define O_DIRECT 040000 /* direct disk access hint */
#endif

/*
 * io stages, in the order restart_oper() advances an oper through them.
 * Each value is also a bit position in the `stages` mask (1 << stage).
 * LAST_STAGE is a count, not a stage.
 */
enum {
	WRITE,
	READ,
	RWRITE,
	RREAD,
	LAST_STAGE,
};

/* values for use_shm: where setup_shared_mem() obtains the io buffers */
#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2
84
/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
/* bitmask of enabled stages: bit (1 << WRITE), (1 << READ), ... */
long stages = 0;
/*
 * page size minus one (set outside this view): used as an alignment mask
 * (& ~page_size_mask) and, plus one, as the page size
 */
unsigned long page_size_mask;
/*
 * NOTE(review): o_direct/o_sync are not read in this part of the file;
 * presumably they add O_DIRECT/O_SYNC when the files are opened
 */
int o_direct = 0;
int o_sync = 0;
/* print io_submit() / completion latency statistics after each stage */
int latency_stats = 0;
int completion_latency_stats = 0;
/*
 * ios built per oper in each run_active_list() pass; also the preferred
 * minimum number of events reaped by read_some_events()
 */
int io_iter = 8;
/* cap on submission passes per stage in worker(); RUN_FOREVER = no cap */
int iterations = RUN_FOREVER;
/* max iocbs gathered into one io_submit() batch; sizes thread_info.iocbs */
int max_io_submit = 0;
/* record size in bytes (presumably the reclen handed to create_oper) */
long rec_len = 64 * 1024;
/* presumably the per-file request depth handed to setup_ious()/create_oper() */
int depth = 64;
/* number of worker threads */
int num_threads = 1;
/*
 * NOTE(review): only used outside this view; presumably the number of
 * regions ("contexts") per file and the byte spacing of their starts
 */
int num_contexts = 1;
off_t context_offset = 2 * 1024 * 1024;
/* fsync every file after each stage (see worker()) */
int fsync_stages = 1;
/* buffer allocation strategy: USE_MALLOC, USE_SHM or USE_SHMFS */
int use_shm = 0;
/* SysV segment id for USE_SHM, or the /dev/shm file descriptor for USE_SHMFS */
int shm_id;
/*
 * io buffer region from setup_shared_mem(); aligned_buffer is the
 * page-aligned cursor that setup_ious() advances as it hands out buffers
 */
char *unaligned_buffer = NULL;
char *aligned_buffer = NULL;
/* record size rounded up to a whole number of pages */
int padded_reclen = 0;
/*
 * when set, a thread stops submitting new ios for the current stage as
 * soon as another thread has reached the end-of-stage barrier
 */
int stonewall = 1;
/* compare READ results with verify_buf (a record filled with 'b') */
int verify = 0;
char *verify_buf = NULL;
/* NOTE(review): not used in this view; presumably unlink the files when done */
int unlink_files = 0;

struct io_unit;
struct thread_info;

/* pthread mutexes and other globals for keeping the threads in sync */
/* stage_mutex/stage_cond implement worker()'s start and end stage barriers */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * threads that reached the end barrier.  NOTE(review): the stonewall check
 * in worker() reads this without holding stage_mutex.
 */
int threads_ending = 0;
/* threads that reached the start barrier */
int threads_starting = 0;
/* stage start time, recorded by the last thread to reach the start barrier */
struct timeval global_stage_start_time;
/* array of num_threads thread_info structs, indexed by thread number */
struct thread_info *global_thread_info;
124
/*
 * io_submit and completion latencies are measured; these are the
 * granularities for deviations.
 *
 * Bucket upper bounds in milliseconds (calc_latency() converts to ms before
 * comparing).  A sample is counted in the first bucket whose bound it is
 * below; samples >= the last bound are in no bucket.
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
/* running latency statistics; all times are in milliseconds */
struct io_latency {
	double max;		/* largest sample */
	double min;		/* smallest sample */
	double total_io;	/* number of samples */
	double total_lat;	/* sum of all samples, for the average */
	double deviations[DEVIATIONS];	/* per-bucket sample counts */
};
138
/*
 * container for a series of operations to a file: one byte range of one
 * file, driven through the stages by restart_oper().  Opers live on the
 * circular lists maintained by oper_list_add()/oper_list_del().
 */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation (exclusive) */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/*
	 * max number of pending requests before a wait is triggered
	 * NOTE(review): only stored by create_oper() in this view
	 */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error (the io result recorded by check_finished_io), zero if none */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* current stage: WRITE, READ, RWRITE or RREAD */
	int rw;

	/* number of ios that will get sent to aio: whole records in [start, end) */
	int total_ios;

	/* number of ios we've already sent (counted once io_submit accepts them) */
	int started_ios;

	/*
	 * sequential stages: offset the next io will use;
	 * random stages: offset of the most recent io
	 */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our ios */
	int stonewalled;

	/* list management */
	struct io_oper *next;
	struct io_oper *prev;

	/* when build_oper() began the current stage */
	struct timeval start_time;

	/* name used in messages; stored as given, never copied or freed here */
	char *file_name;
};
188
/* a single io, and all the tracking needed for it */
struct io_unit {
	/*
	 * note, iocb must go first!  iocb pointers are cast straight back to
	 * the containing io_unit (update_iou_counters, read_some_events)
	 */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer, carved out of aligned_buffer by setup_ious() */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit: IO_FREE or IO_PENDING */
	int busy;

	/* result of last operation (io_event res; negative errno on failure) */
	long res;

	/* link in the owning thread's free_ious list */
	struct io_unit *next;

	struct timeval io_start_time; /* time of io_submit */
};
213
/* per worker thread state; one entry of global_thread_info per thread */
struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the ios array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active_list */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage (MiB) */
	double stage_mb_trans;

	/* latency completion stats i/o time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};
255
/*
 * Elapsed wall-clock time from start_tv to stop_tv, in seconds, as a double.
 * An interval that runs backwards (stop before start) is clamped to zero.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double secs = stop_tv->tv_sec - start_tv->tv_sec;
	double usecs = stop_tv->tv_usec - start_tv->tv_usec;
	double elapsed;

	/* borrow one second when the microsecond part went negative */
	if (usecs < 0 && secs > 0) {
		usecs += 1000000;
		secs -= 1;
	}

	elapsed = secs + usecs / (double)1000000;
	return (elapsed < 0) ? 0 : elapsed;
}
274
/*
 * Seconds elapsed between start_tv and the current time of day
 * (never negative: time_since() clamps at zero).
 */
static double time_since_now(struct timeval *start_tv)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	return time_since(start_tv, &now);
}
284
285 /*
286 * Add latency info to latency struct
287 */
calc_latency(struct timeval * start_tv,struct timeval * stop_tv,struct io_latency * lat)288 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
289 struct io_latency *lat)
290 {
291 double delta;
292 int i;
293 delta = time_since(start_tv, stop_tv);
294 delta = delta * 1000;
295
296 if (delta > lat->max)
297 lat->max = delta;
298 if (!lat->min || delta < lat->min)
299 lat->min = delta;
300 lat->total_io++;
301 lat->total_lat += delta;
302 for (i = 0 ; i < DEVIATIONS ; i++) {
303 if (delta < deviations[i]) {
304 lat->deviations[i]++;
305 break;
306 }
307 }
308 }
309
oper_list_add(struct io_oper * oper,struct io_oper ** list)310 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
311 {
312 if (!*list) {
313 *list = oper;
314 oper->prev = oper->next = oper;
315 return;
316 }
317 oper->prev = (*list)->prev;
318 oper->next = *list;
319 (*list)->prev->next = oper;
320 (*list)->prev = oper;
321 return;
322 }
323
oper_list_del(struct io_oper * oper,struct io_oper ** list)324 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
325 {
326 if ((*list)->next == (*list)->prev && *list == (*list)->next) {
327 *list = NULL;
328 return;
329 }
330 oper->prev->next = oper->next;
331 oper->next->prev = oper->prev;
332 if (*list == oper)
333 *list = oper->next;
334 }
335
336 /* worker func to check error fields in the io unit */
check_finished_io(struct io_unit * io)337 static int check_finished_io(struct io_unit *io) {
338 int i;
339 if (io->res != io->buf_size) {
340
341 struct stat s;
342 fstat(io->io_oper->fd, &s);
343
344 /*
345 * If file size is large enough for the read, then this short
346 * read is an error.
347 */
348 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
349 s.st_size > (io->iocb.u.c.offset + io->res)) {
350
351 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
352 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
353 io->iocb.u.c.offset, io->buf_size);
354 io->io_oper->last_err = io->res;
355 io->io_oper->num_err++;
356 return -1;
357 }
358 }
359 if (verify && io->io_oper->rw == READ) {
360 if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
361 fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
362 io->io_oper->file_name, io->iocb.u.c.offset);
363
364 for (i = 0 ; i < io->io_oper->reclen ; i++) {
365 if (io->buf[i] != verify_buf[i]) {
366 fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
367 }
368 }
369 fprintf(stderr, "\n");
370 }
371
372 }
373 return 0;
374 }
375
376 /* worker func to check the busy bits and get an io unit ready for use */
grab_iou(struct io_unit * io,struct io_oper * oper)377 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
378 if (io->busy == IO_PENDING)
379 return -1;
380
381 io->busy = IO_PENDING;
382 io->res = 0;
383 io->io_oper = oper;
384 return 0;
385 }
386
stage_name(int rw)387 char *stage_name(int rw) {
388 switch(rw) {
389 case WRITE:
390 return "write";
391 case READ:
392 return "read";
393 case RWRITE:
394 return "random write";
395 case RREAD:
396 return "random read";
397 }
398 return "unknown";
399 }
400
oper_mb_trans(struct io_oper * oper)401 static inline double oper_mb_trans(struct io_oper *oper) {
402 return ((double)oper->started_ios * (double)oper->reclen) /
403 (double)(1024 * 1024);
404 }
405
print_time(struct io_oper * oper)406 static void print_time(struct io_oper *oper) {
407 double runtime;
408 double tput;
409 double mb;
410
411 runtime = time_since_now(&oper->start_time);
412 mb = oper_mb_trans(oper);
413 tput = mb / runtime;
414 fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
415 stage_name(oper->rw), oper->file_name, tput, mb, runtime);
416 }
417
print_lat(char * str,struct io_latency * lat)418 static void print_lat(char *str, struct io_latency *lat) {
419 double avg = lat->total_lat / lat->total_io;
420 int i;
421 double total_counted = 0;
422 fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
423 str, lat->min, avg, lat->max);
424
425 for (i = 0 ; i < DEVIATIONS ; i++) {
426 fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
427 total_counted += lat->deviations[i];
428 }
429 if (total_counted && lat->total_io - total_counted)
430 fprintf(stderr, " < %.0f", lat->total_io - total_counted);
431 fprintf(stderr, "\n");
432 memset(lat, 0, sizeof(*lat));
433 }
434
print_latency(struct thread_info * t)435 static void print_latency(struct thread_info *t)
436 {
437 struct io_latency *lat = &t->io_submit_latency;
438 print_lat("latency", lat);
439 }
440
print_completion_latency(struct thread_info * t)441 static void print_completion_latency(struct thread_info *t)
442 {
443 struct io_latency *lat = &t->io_completion_latency;
444 print_lat("completion latency", lat);
445 }
446
447 /*
448 * updates the fields in the io operation struct that belongs to this
449 * io unit, and make the io unit reusable again
450 */
finish_io(struct thread_info * t,struct io_unit * io,long result,struct timeval * tv_now)451 void finish_io(struct thread_info *t, struct io_unit *io, long result,
452 struct timeval *tv_now) {
453 struct io_oper *oper = io->io_oper;
454
455 calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
456 io->res = result;
457 io->busy = IO_FREE;
458 io->next = t->free_ious;
459 t->free_ious = io;
460 oper->num_pending--;
461 t->num_global_pending--;
462 check_finished_io(io);
463 if (oper->num_pending == 0 &&
464 (oper->started_ios == oper->total_ios || oper->stonewalled))
465 {
466 print_time(oper);
467 }
468 }
469
read_some_events(struct thread_info * t)470 int read_some_events(struct thread_info *t) {
471 struct io_unit *event_io;
472 struct io_event *event;
473 int nr;
474 int i;
475 int min_nr = io_iter;
476 struct timeval stop_time;
477
478 if (t->num_global_pending < io_iter)
479 min_nr = t->num_global_pending;
480
481 #ifdef NEW_GETEVENTS
482 nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
483 #else
484 nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
485 #endif
486 if (nr <= 0)
487 return nr;
488
489 gettimeofday(&stop_time, NULL);
490 for (i = 0 ; i < nr ; i++) {
491 event = t->events + i;
492 event_io = (struct io_unit *)((unsigned long)event->obj);
493 finish_io(t, event_io, event->res, &stop_time);
494 }
495 return nr;
496 }
497
498 /*
499 * finds a free io unit, waiting for pending requests if required. returns
500 * null if none could be found
501 */
find_iou(struct thread_info * t,struct io_oper * oper)502 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
503 {
504 struct io_unit *event_io;
505 int nr;
506
507 retry:
508 if (t->free_ious) {
509 event_io = t->free_ious;
510 t->free_ious = t->free_ious->next;
511 if (grab_iou(event_io, oper)) {
512 fprintf(stderr, "io unit on free list but not free\n");
513 abort();
514 }
515 return event_io;
516 }
517 nr = read_some_events(t);
518 if (nr > 0)
519 goto retry;
520 else
521 fprintf(stderr, "no free ious after read_some_events\n");
522 return NULL;
523 }
524
525 /*
526 * wait for all pending requests for this io operation to finish
527 */
io_oper_wait(struct thread_info * t,struct io_oper * oper)528 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
529 struct io_event event;
530 struct io_unit *event_io;
531
532 if (oper == NULL) {
533 return 0;
534 }
535
536 if (oper->num_pending == 0)
537 goto done;
538
539 /* this func is not speed sensitive, no need to go wild reading
540 * more than one event at a time
541 */
542 #ifdef NEW_GETEVENTS
543 while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
544 #else
545 while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
546 #endif
547 struct timeval tv_now;
548 event_io = (struct io_unit *)((unsigned long)event.obj);
549
550 gettimeofday(&tv_now, NULL);
551 finish_io(t, event_io, event.res, &tv_now);
552
553 if (oper->num_pending == 0)
554 break;
555 }
556 done:
557 if (oper->num_err) {
558 fprintf(stderr, "%u errors on oper, last %u\n",
559 oper->num_err, oper->last_err);
560 }
561 return 0;
562 }
563
564 off_t random_byte_offset(struct io_oper *oper) {
565 off_t num;
566 off_t rand_byte = oper->start;
567 off_t range;
568 off_t offset = 1;
569
570 range = (oper->end - oper->start) / (1024 * 1024);
571 if ((page_size_mask+1) > (1024 * 1024))
572 offset = (page_size_mask+1) / (1024 * 1024);
573 if (range < offset)
574 range = 0;
575 else
576 range -= offset;
577
578 /* find a random mb offset */
579 num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
580 rand_byte += num * 1024 * 1024;
581
582 /* find a random byte offset */
583 num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
584
585 /* page align */
586 num = (num + page_size_mask) & ~page_size_mask;
587 rand_byte += num;
588
589 if (rand_byte + oper->reclen > oper->end) {
590 rand_byte -= oper->reclen;
591 }
592 return rand_byte;
593 }
594
595 /*
596 * build an aio iocb for an operation, based on oper->rw and the
597 * last offset used. This finds the struct io_unit that will be attached
598 * to the iocb, and things are ready for submission to aio after this
599 * is called.
600 *
601 * returns null on error
602 */
603 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
604 {
605 struct io_unit *io;
606 off_t rand_byte;
607
608 io = find_iou(t, oper);
609 if (!io) {
610 fprintf(stderr, "unable to find io unit\n");
611 return NULL;
612 }
613
614 switch(oper->rw) {
615 case WRITE:
616 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
617 oper->last_offset);
618 oper->last_offset += oper->reclen;
619 break;
620 case READ:
621 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
622 oper->last_offset);
623 oper->last_offset += oper->reclen;
624 break;
625 case RREAD:
626 rand_byte = random_byte_offset(oper);
627 oper->last_offset = rand_byte;
628 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
629 rand_byte);
630 break;
631 case RWRITE:
632 rand_byte = random_byte_offset(oper);
633 oper->last_offset = rand_byte;
634 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
635 rand_byte);
636
637 break;
638 }
639
640 return io;
641 }
642
643 /*
644 * wait for any pending requests, and then free all ram associated with
645 * an operation. returns the last error the operation hit (zero means none)
646 */
647 static int
648 finish_oper(struct thread_info *t, struct io_oper *oper)
649 {
650 unsigned long last_err;
651
652 io_oper_wait(t, oper);
653 last_err = oper->last_err;
654 if (oper->num_pending > 0) {
655 fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
656 }
657 close(oper->fd);
658 free(oper);
659 return last_err;
660 }
661
662 /*
663 * allocates an io operation and fills in all the fields. returns
664 * null on error
665 */
666 static struct io_oper *
667 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
668 int iter, char *file_name)
669 {
670 struct io_oper *oper;
671
672 oper = malloc (sizeof(*oper));
673 if (!oper) {
674 fprintf(stderr, "unable to allocate io oper\n");
675 return NULL;
676 }
677 memset(oper, 0, sizeof(*oper));
678
679 oper->depth = depth;
680 oper->start = start;
681 oper->end = end;
682 oper->last_offset = oper->start;
683 oper->fd = fd;
684 oper->reclen = reclen;
685 oper->rw = rw;
686 oper->total_ios = (oper->end - oper->start) / oper->reclen;
687 oper->file_name = file_name;
688
689 return oper;
690 }
691
692 /*
693 * does setup on num_ios worth of iocbs, but does not actually
694 * start any io
695 */
696 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
697 struct iocb **my_iocbs)
698 {
699 int i;
700 struct io_unit *io;
701
702 if (oper->started_ios == 0)
703 gettimeofday(&oper->start_time, NULL);
704
705 if (num_ios == 0)
706 num_ios = oper->total_ios;
707
708 if ((oper->started_ios + num_ios) > oper->total_ios)
709 num_ios = oper->total_ios - oper->started_ios;
710
711 for( i = 0 ; i < num_ios ; i++) {
712 io = build_iocb(t, oper);
713 if (!io) {
714 return -1;
715 }
716 my_iocbs[i] = &io->iocb;
717 }
718 return num_ios;
719 }
720
721 /*
722 * runs through the iocbs in the array provided and updates
723 * counters in the associated oper struct
724 */
725 static void update_iou_counters(struct iocb **my_iocbs, int nr,
726 struct timeval *tv_now)
727 {
728 struct io_unit *io;
729 int i;
730 for (i = 0 ; i < nr ; i++) {
731 io = (struct io_unit *)(my_iocbs[i]);
732 io->io_oper->num_pending++;
733 io->io_oper->started_ios++;
734 io->io_start_time = *tv_now; /* set time of io_submit */
735 }
736 }
737
738 /* starts some io for a given file, returns zero if all went well */
739 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
740 {
741 int ret;
742 struct timeval start_time;
743 struct timeval stop_time;
744
745 resubmit:
746 gettimeofday(&start_time, NULL);
747 ret = io_submit(t->io_ctx, num_ios, my_iocbs);
748 gettimeofday(&stop_time, NULL);
749 calc_latency(&start_time, &stop_time, &t->io_submit_latency);
750
751 if (ret != num_ios) {
752 /* some ios got through */
753 if (ret > 0) {
754 update_iou_counters(my_iocbs, ret, &stop_time);
755 my_iocbs += ret;
756 t->num_global_pending += ret;
757 num_ios -= ret;
758 }
759 /*
760 * we've used all the requests allocated in aio_init, wait and
761 * retry
762 */
763 if (ret > 0 || ret == -EAGAIN) {
764 int old_ret = ret;
765 if ((ret = read_some_events(t) > 0)) {
766 goto resubmit;
767 } else {
768 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
769 abort();
770 }
771 }
772
773 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
774 return -1;
775 }
776 update_iou_counters(my_iocbs, ret, &stop_time);
777 t->num_global_pending += ret;
778 return 0;
779 }
780
781 /*
782 * changes oper->rw to the next in a command sequence, or returns zero
783 * to say this operation is really, completely done for
784 */
785 static int restart_oper(struct io_oper *oper) {
786 int new_rw = 0;
787 if (oper->last_err)
788 return 0;
789
790 /* this switch falls through */
791 switch(oper->rw) {
792 case WRITE:
793 if (stages & (1 << READ))
794 new_rw = READ;
795 case READ:
796 if (!new_rw && stages & (1 << RWRITE))
797 new_rw = RWRITE;
798 case RWRITE:
799 if (!new_rw && stages & (1 << RREAD))
800 new_rw = RREAD;
801 }
802
803 if (new_rw) {
804 oper->started_ios = 0;
805 oper->last_offset = oper->start;
806 oper->stonewalled = 0;
807
808 /*
809 * we're restarting an operation with pending requests, so the
810 * timing info won't be printed by finish_io. Printing it here
811 */
812 if (oper->num_pending)
813 print_time(oper);
814
815 oper->rw = new_rw;
816 return 1;
817 }
818 return 0;
819 }
820
821 static int oper_runnable(struct io_oper *oper) {
822 struct stat buf;
823 int ret;
824
825 /* first context is always runnable, if started_ios > 0, no need to
826 * redo the calculations
827 */
828 if (oper->started_ios || oper->start == 0)
829 return 1;
830 /*
831 * only the sequential phases force delays in starting */
832 if (oper->rw >= RWRITE)
833 return 1;
834 ret = fstat(oper->fd, &buf);
835 if (ret < 0) {
836 perror("fstat");
837 exit(1);
838 }
839 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
840 return 0;
841 return 1;
842 }
843
844 /*
845 * runs through all the io operations on the active list, and starts
846 * a chunk of io on each. If any io operations are completely finished,
847 * it either switches them to the next stage or puts them on the
848 * finished list.
849 *
850 * this function stops after max_io_submit iocbs are sent down the
851 * pipe, even if it has not yet touched all the operations on the
852 * active list. Any operations that have finished are moved onto
853 * the finished_opers list.
854 */
855 static int run_active_list(struct thread_info *t,
856 int io_iter,
857 int max_io_submit)
858 {
859 struct io_oper *oper;
860 struct io_oper *built_opers = NULL;
861 struct iocb **my_iocbs = t->iocbs;
862 int ret = 0;
863 int num_built = 0;
864
865 oper = t->active_opers;
866 while(oper) {
867 if (!oper_runnable(oper)) {
868 oper = oper->next;
869 if (oper == t->active_opers)
870 break;
871 continue;
872 }
873 ret = build_oper(t, oper, io_iter, my_iocbs);
874 if (ret >= 0) {
875 my_iocbs += ret;
876 num_built += ret;
877 oper_list_del(oper, &t->active_opers);
878 oper_list_add(oper, &built_opers);
879 oper = t->active_opers;
880 if (num_built + io_iter > max_io_submit)
881 break;
882 } else
883 break;
884 }
885 if (num_built) {
886 ret = run_built(t, num_built, t->iocbs);
887 if (ret < 0) {
888 fprintf(stderr, "error %d on run_built\n", ret);
889 exit(1);
890 }
891 while(built_opers) {
892 oper = built_opers;
893 oper_list_del(oper, &built_opers);
894 oper_list_add(oper, &t->active_opers);
895 if (oper->started_ios == oper->total_ios) {
896 oper_list_del(oper, &t->active_opers);
897 oper_list_add(oper, &t->finished_opers);
898 }
899 }
900 }
901 return 0;
902 }
903
904 void drop_shm() {
905 int ret;
906 struct shmid_ds ds;
907 if (use_shm != USE_SHM)
908 return;
909
910 ret = shmctl(shm_id, IPC_RMID, &ds);
911 if (ret) {
912 perror("shmctl IPC_RMID");
913 }
914 }
915
916 void aio_setup(io_context_t *io_ctx, int n)
917 {
918 int res = io_queue_init(n, io_ctx);
919 if (res != 0) {
920 fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
921 n, res, strerror(-res));
922 exit(3);
923 }
924 }
925
926 /*
927 * allocate io operation and event arrays for a given thread
928 */
929 int setup_ious(struct thread_info *t,
930 int num_files, int depth,
931 int reclen, int max_io_submit) {
932 int i;
933 size_t bytes = num_files * depth * sizeof(*t->ios);
934
935 t->ios = malloc(bytes);
936 if (!t->ios) {
937 fprintf(stderr, "unable to allocate io units\n");
938 return -1;
939 }
940 memset(t->ios, 0, bytes);
941
942 for (i = 0 ; i < depth * num_files; i++) {
943 t->ios[i].buf = aligned_buffer;
944 aligned_buffer += padded_reclen;
945 t->ios[i].buf_size = reclen;
946 if (verify)
947 memset(t->ios[i].buf, 'b', reclen);
948 else
949 memset(t->ios[i].buf, 0, reclen);
950 t->ios[i].next = t->free_ious;
951 t->free_ious = t->ios + i;
952 }
953 if (verify) {
954 verify_buf = aligned_buffer;
955 memset(verify_buf, 'b', reclen);
956 }
957
958 t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
959 if (!t->iocbs) {
960 fprintf(stderr, "unable to allocate iocbs\n");
961 goto free_buffers;
962 }
963
964 memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
965
966 t->events = malloc(sizeof(struct io_event) * depth * num_files);
967 if (!t->events) {
968 fprintf(stderr, "unable to allocate ram for events\n");
969 goto free_buffers;
970 }
971 memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
972
973 t->num_global_ios = num_files * depth;
974 t->num_global_events = t->num_global_ios;
975 return 0;
976
977 free_buffers:
978 if (t->ios)
979 free(t->ios);
980 if (t->iocbs)
981 free(t->iocbs);
982 if (t->events)
983 free(t->events);
984 return -1;
985 }
986
987 /*
988 * The buffers used for file data are allocated as a single big
989 * malloc, and then each thread and operation takes a piece and uses
990 * that for file data. This lets us do a large shm or bigpages alloc
991 * and without trying to find a special place in each thread to map the
992 * buffers to
993 */
994 int setup_shared_mem(int num_threads, int num_files, int depth,
995 int reclen, int max_io_submit)
996 {
997 char *p = NULL;
998 size_t total_ram;
999
1000 padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
1001 padded_reclen = padded_reclen * (page_size_mask+1);
1002 total_ram = num_files * depth * padded_reclen + num_threads;
1003 if (verify)
1004 total_ram += padded_reclen;
1005
1006 if (use_shm == USE_MALLOC) {
1007 p = malloc(total_ram + page_size_mask);
1008 } else if (use_shm == USE_SHM) {
1009 shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1010 if (shm_id < 0) {
1011 perror("shmget");
1012 drop_shm();
1013 goto free_buffers;
1014 }
1015 p = shmat(shm_id, (char *)0x50000000, 0);
1016 if ((long)p == -1) {
1017 perror("shmat");
1018 goto free_buffers;
1019 }
1020 /* won't really be dropped until we shmdt */
1021 drop_shm();
1022 } else if (use_shm == USE_SHMFS) {
1023 char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
1024 int fd;
1025
1026 strcpy(mmap_name, "/dev/shm/XXXXXX");
1027 fd = mkstemp(mmap_name);
1028 if (fd < 0) {
1029 perror("mkstemp");
1030 goto free_buffers;
1031 }
1032 unlink(mmap_name);
1033 ftruncate(fd, total_ram);
1034 shm_id = fd;
1035 p = mmap((char *)0x50000000, total_ram,
1036 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1037
1038 if (p == MAP_FAILED) {
1039 perror("mmap");
1040 goto free_buffers;
1041 }
1042 }
1043 if (!p) {
1044 fprintf(stderr, "unable to allocate buffers\n");
1045 goto free_buffers;
1046 }
1047 unaligned_buffer = p;
1048 p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1049 aligned_buffer = p;
1050 return 0;
1051
1052 free_buffers:
1053 drop_shm();
1054 if (unaligned_buffer)
1055 free(unaligned_buffer);
1056 return -1;
1057 }
1058
1059 /*
1060 * runs through all the thread_info structs and calculates a combined
1061 * throughput
1062 */
1063 void global_thread_throughput(struct thread_info *t, char *this_stage) {
1064 int i;
1065 double runtime = time_since_now(&global_stage_start_time);
1066 double total_mb = 0;
1067 double min_trans = 0;
1068
1069 for (i = 0 ; i < num_threads ; i++) {
1070 total_mb += global_thread_info[i].stage_mb_trans;
1071 if (!min_trans || t->stage_mb_trans < min_trans)
1072 min_trans = t->stage_mb_trans;
1073 }
1074 if (total_mb) {
1075 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1076 total_mb / runtime);
1077 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1078 if (stonewall)
1079 fprintf(stderr, " min transfer %.2fMB", min_trans);
1080 fprintf(stderr, "\n");
1081 }
1082 }
1083
1084
1085 /* this is the meat of the state machine. There is a list of
1086 * active operations structs, and as each one finishes the required
1087 * io it is moved to a list of finished operations. Once they have
1088 * all finished whatever stage they were in, they are given the chance
1089 * to restart and pick a different stage (read/write/random read etc)
1090 *
1091 * various timings are printed in between the stages, along with
1092 * thread synchronization if there are more than one threads.
1093 */
1094 int worker(struct thread_info *t)
1095 {
1096 struct io_oper *oper;
1097 char *this_stage = NULL;
1098 struct timeval stage_time;
1099 int status = 0;
1100 int iteration = 0;
1101 int cnt;
1102
1103 aio_setup(&t->io_ctx, 512);
1104
1105 restart:
1106 if (num_threads > 1) {
1107 pthread_mutex_lock(&stage_mutex);
1108 threads_starting++;
1109 if (threads_starting == num_threads) {
1110 threads_ending = 0;
1111 gettimeofday(&global_stage_start_time, NULL);
1112 pthread_cond_broadcast(&stage_cond);
1113 }
1114 while (threads_starting != num_threads)
1115 pthread_cond_wait(&stage_cond, &stage_mutex);
1116 pthread_mutex_unlock(&stage_mutex);
1117 }
1118 if (t->active_opers) {
1119 this_stage = stage_name(t->active_opers->rw);
1120 gettimeofday(&stage_time, NULL);
1121 t->stage_mb_trans = 0;
1122 }
1123
1124 cnt = 0;
1125 /* first we send everything through aio */
1126 while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1127 if (stonewall && threads_ending) {
1128 oper = t->active_opers;
1129 oper->stonewalled = 1;
1130 oper_list_del(oper, &t->active_opers);
1131 oper_list_add(oper, &t->finished_opers);
1132 } else {
1133 run_active_list(t, io_iter, max_io_submit);
1134 }
1135 cnt++;
1136 }
1137 if (latency_stats)
1138 print_latency(t);
1139
1140 if (completion_latency_stats)
1141 print_completion_latency(t);
1142
1143 /* then we wait for all the operations to finish */
1144 oper = t->finished_opers;
1145 do {
1146 if (!oper)
1147 break;
1148 io_oper_wait(t, oper);
1149 oper = oper->next;
1150 } while(oper != t->finished_opers);
1151
1152 /* then we do an fsync to get the timing for any future operations
1153 * right, and check to see if any of these need to get restarted
1154 */
1155 oper = t->finished_opers;
1156 while(oper) {
1157 if (fsync_stages)
1158 fsync(oper->fd);
1159 t->stage_mb_trans += oper_mb_trans(oper);
1160 if (restart_oper(oper)) {
1161 oper_list_del(oper, &t->finished_opers);
1162 oper_list_add(oper, &t->active_opers);
1163 oper = t->finished_opers;
1164 continue;
1165 }
1166 oper = oper->next;
1167 if (oper == t->finished_opers)
1168 break;
1169 }
1170
1171 if (t->stage_mb_trans && t->num_files > 0) {
1172 double seconds = time_since_now(&stage_time);
1173 fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1174 t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
1175 t->stage_mb_trans, seconds);
1176 }
1177
1178 if (num_threads > 1) {
1179 pthread_mutex_lock(&stage_mutex);
1180 threads_ending++;
1181 if (threads_ending == num_threads) {
1182 threads_starting = 0;
1183 pthread_cond_broadcast(&stage_cond);
1184 global_thread_throughput(t, this_stage);
1185 }
1186 while(threads_ending != num_threads)
1187 pthread_cond_wait(&stage_cond, &stage_mutex);
1188 pthread_mutex_unlock(&stage_mutex);
1189 }
1190
1191 /* someone got restarted, go back to the beginning */
1192 if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1193 iteration++;
1194 goto restart;
1195 }
1196
1197 /* finally, free all the ram */
1198 while(t->finished_opers) {
1199 oper = t->finished_opers;
1200 oper_list_del(oper, &t->finished_opers);
1201 status = finish_oper(t, oper);
1202 }
1203
1204 if (t->num_global_pending) {
1205 fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1206 }
1207 io_queue_release(t->io_ctx);
1208
1209 return status;
1210 }
1211
1212 typedef void * (*start_routine)(void *);
1213 int run_workers(struct thread_info *t, int num_threads)
1214 {
1215 int ret;
1216 int thread_ret;
1217 int i;
1218
1219 for(i = 0 ; i < num_threads ; i++) {
1220 ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1221 if (ret) {
1222 perror("pthread_create");
1223 exit(1);
1224 }
1225 }
1226 for(i = 0 ; i < num_threads ; i++) {
1227 ret = pthread_join(t[i].tid, (void *)&thread_ret);
1228 if (ret) {
1229 perror("pthread_join");
1230 exit(1);
1231 }
1232 }
1233 return 0;
1234 }
1235
/*
 * parse_size - convert a command-line size argument to a byte count.
 * @size_arg: decimal number, optionally followed by a unit suffix:
 *            b/B (bytes), k/K (KB), m/M (MB) or g/G (GB)
 * @mult:     multiplier used when the last character is not a known suffix
 *
 * Like atoi(), number parsing stops at the first character that is not part
 * of a decimal integer, so trailing text is ignored.  An empty string yields
 * 0.  size_arg is not modified.
 */
off_t parse_size(char *size_arg, off_t mult) {
    size_t len = strlen(size_arg);
    long long num;

    /* nothing to parse; also avoids reading size_arg[-1] */
    if (len == 0)
        return 0;

    switch (size_arg[len - 1]) {
    case 'g':
    case 'G':
        mult = (off_t)1024 * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    case 'b':
    case 'B':
        mult = 1;
        break;
    default:
        /* no unit suffix: keep the caller's default multiplier */
        break;
    }

    /*
     * strtoll stops at the suffix by itself, so the string need not be
     * truncated, and a long long holds sizes that overflow int.
     */
    num = strtoll(size_arg, NULL, 10);
    return mult * num;
}
1266
1267 void print_usage(void) {
1268 printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1269 printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1270 printf(" file1 [file2 ...]\n");
1271 printf("\t-a size in KB at which to align buffers\n");
1272 printf("\t-b max number of iocbs to give io_submit at once\n");
1273 printf("\t-c number of io contexts per file\n");
1274 printf("\t-C offset between contexts, default 2MB\n");
1275 printf("\t-s size in MB of the test file(s), default 1024MB\n");
1276 printf("\t-r record size in KB used for each io, default 64KB\n");
1277 printf("\t-d number of pending aio requests for each file, default 64\n");
1278 printf("\t-i number of ios per file sent before switching\n\t to the next file, default 8\n");
1279 printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1280 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1281 printf("\t-S Use O_SYNC for writes\n");
1282 printf("\t-o add an operation to the list: write=0, read=1,\n");
1283 printf("\t random write=2, random read=3.\n");
1284 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1285 printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1286 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1287 printf("\t-n no fsyncs between write stage and read stage\n");
1288 printf("\t-l print io_submit latencies after each stage\n");
1289 printf("\t-L print io completion latencies after each stage\n");
1290 printf("\t-t number of threads to run\n");
1291 printf("\t-u unlink files after completion\n");
1292 printf("\t-v verification of bytes written\n");
1293 printf("\t-x turn off thread stonewalling\n");
1294 printf("\t-h this message\n");
1295 printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1296 printf("\t translate to 400KB, 400MB and 400GB\n");
1297 printf("version %s\n", PROG_VERSION);
1298 }
1299
1300 int main(int ac, char **av)
1301 {
1302 int rwfd;
1303 int i;
1304 int j;
1305 int c;
1306
1307 off_t file_size = 1 * 1024 * 1024 * 1024;
1308 int first_stage = WRITE;
1309 struct io_oper *oper;
1310 int status = 0;
1311 int num_files = 0;
1312 int open_fds = 0;
1313 struct thread_info *t;
1314
1315 page_size_mask = getpagesize() - 1;
1316
1317 while(1) {
1318 c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1319 if (c < 0)
1320 break;
1321
1322 switch(c) {
1323 case 'a':
1324 page_size_mask = parse_size(optarg, 1024);
1325 page_size_mask--;
1326 break;
1327 case 'c':
1328 num_contexts = atoi(optarg);
1329 break;
1330 case 'C':
1331 context_offset = parse_size(optarg, 1024 * 1024);
1332 case 'b':
1333 max_io_submit = atoi(optarg);
1334 break;
1335 case 's':
1336 file_size = parse_size(optarg, 1024 * 1024);
1337 break;
1338 case 'd':
1339 depth = atoi(optarg);
1340 break;
1341 case 'r':
1342 rec_len = parse_size(optarg, 1024);
1343 break;
1344 case 'i':
1345 io_iter = atoi(optarg);
1346 break;
1347 case 'I':
1348 iterations = atoi(optarg);
1349 break;
1350 case 'n':
1351 fsync_stages = 0;
1352 break;
1353 case 'l':
1354 latency_stats = 1;
1355 break;
1356 case 'L':
1357 completion_latency_stats = 1;
1358 break;
1359 case 'm':
1360 if (!strcmp(optarg, "shm")) {
1361 fprintf(stderr, "using ipc shm\n");
1362 use_shm = USE_SHM;
1363 } else if (!strcmp(optarg, "shmfs")) {
1364 fprintf(stderr, "using /dev/shm for buffers\n");
1365 use_shm = USE_SHMFS;
1366 }
1367 break;
1368 case 'o':
1369 i = atoi(optarg);
1370 stages |= 1 << i;
1371 fprintf(stderr, "adding stage %s\n", stage_name(i));
1372 break;
1373 case 'O':
1374 o_direct = O_DIRECT;
1375 break;
1376 case 'S':
1377 o_sync = O_SYNC;
1378 break;
1379 case 't':
1380 num_threads = atoi(optarg);
1381 break;
1382 case 'x':
1383 stonewall = 0;
1384 break;
1385 case 'u':
1386 unlink_files = 1;
1387 break;
1388 case 'v':
1389 verify = 1;
1390 break;
1391 case 'h':
1392 default:
1393 print_usage();
1394 exit(1);
1395 }
1396 }
1397
1398 /*
1399 * make sure we don't try to submit more ios than we have allocated
1400 * memory for
1401 */
1402 if (depth < io_iter) {
1403 io_iter = depth;
1404 fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1405 }
1406
1407 if (optind >= ac) {
1408 print_usage();
1409 exit(1);
1410 }
1411
1412 num_files = ac - optind;
1413
1414 if (num_threads > (num_files * num_contexts)) {
1415 num_threads = num_files * num_contexts;
1416 fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1417 num_threads);
1418 }
1419
1420 t = malloc(num_threads * sizeof(*t));
1421 if (!t) {
1422 perror("malloc");
1423 exit(1);
1424 }
1425 global_thread_info = t;
1426
1427 /* by default, allow a huge number of iocbs to be sent towards
1428 * io_submit
1429 */
1430 if (!max_io_submit)
1431 max_io_submit = num_files * io_iter * num_contexts;
1432
1433 /*
1434 * make sure we don't try to submit more ios than max_io_submit allows
1435 */
1436 if (max_io_submit < io_iter) {
1437 io_iter = max_io_submit;
1438 fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1439 }
1440
1441 if (!stages) {
1442 stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1443 } else {
1444 for (i = 0 ; i < LAST_STAGE; i++) {
1445 if (stages & (1 << i)) {
1446 first_stage = i;
1447 fprintf(stderr, "starting with %s\n", stage_name(i));
1448 break;
1449 }
1450 }
1451 }
1452
1453 if (file_size < num_contexts * context_offset) {
1454 fprintf(stderr, "file size %Lu too small for %d contexts\n",
1455 file_size, num_contexts);
1456 exit(1);
1457 }
1458
1459 fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1460 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1461 max_io_submit, (page_size_mask + 1)/1024);
1462 fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1463 num_threads, num_files, num_contexts,
1464 context_offset / (1024 * 1024), verify ? "on" : "off");
1465 /* open all the files and do any required setup for them */
1466 for (i = optind ; i < ac ; i++) {
1467 int thread_index;
1468 for (j = 0 ; j < num_contexts ; j++) {
1469 thread_index = open_fds % num_threads;
1470 open_fds++;
1471
1472 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1473 assert(rwfd != -1);
1474
1475 oper = create_oper(rwfd, first_stage, j * context_offset,
1476 file_size - j * context_offset, rec_len,
1477 depth, io_iter, av[i]);
1478 if (!oper) {
1479 fprintf(stderr, "error in create_oper\n");
1480 exit(-1);
1481 }
1482 oper_list_add(oper, &t[thread_index].active_opers);
1483 t[thread_index].num_files++;
1484 }
1485 }
1486 if (setup_shared_mem(num_threads, num_files * num_contexts,
1487 depth, rec_len, max_io_submit))
1488 {
1489 exit(1);
1490 }
1491 for (i = 0 ; i < num_threads ; i++) {
1492 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1493 exit(1);
1494 }
1495 if (num_threads > 1){
1496 printf("Running multi thread version num_threads:%d\n", num_threads);
1497 run_workers(t, num_threads);
1498 } else {
1499 printf("Running single thread version \n");
1500 status = worker(t);
1501 }
1502 if (unlink_files) {
1503 for (i = optind ; i < ac ; i++) {
1504 printf("Cleaning up file %s \n", av[i]);
1505 unlink(av[i]);
1506 }
1507 }
1508
1509 if (status) {
1510 exit(1);
1511 }
1512 return status;
1513 }
1514
1515