1 /*
2  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of version 2 of the GNU General Public License as
6  * published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it would be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  *
12  * Further, this software is distributed without any warranty that it is
13  * free of the rightful claim of any third person regarding infringement
14  * or the like.  Any license provided herein, whether implied or
15  * otherwise, applies only to this software file.  Patent licenses, if
16  * any, provided herein do not apply to combinations of this program with
17  * other software, or any other product whatsoever.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write the Free Software Foundation, Inc.,
21  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22  *
23  * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24  * Mountain View, CA  94043, or:
25  *
26  *
27  * aio-stress
28  *
29  * will open or create each file on the command line, and start a series
30  * of aio to it.
31  *
32  * aio is done in a rotating loop.  first file1 gets 8 requests, then
33  * file2, then file3 etc.  As each file finishes writing, it is switched
34  * to reads
35  *
36  * io buffers are aligned in case you want to do raw io
37  *
38  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39  *
40  * run aio-stress -h to see the options
41  *
42  * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43  */
44 #define _FILE_OFFSET_BITS 64
45 #define PROG_VERSION "0.21"
46 #define NEW_GETEVENTS
47 
48 #define _GNU_SOURCE
49 #include <stdio.h>
50 #include <errno.h>
51 #include <assert.h>
52 #include <stdlib.h>
53 
54 #include <sys/types.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <unistd.h>
58 #include <sys/time.h>
59 #include <libaio.h>
60 #include <sys/ipc.h>
61 #include <sys/shm.h>
62 #include <sys/mman.h>
63 #include <string.h>
64 #include <pthread.h>
65 
66 #define IO_FREE 0
67 #define IO_PENDING 1
68 #define RUN_FOREVER -1
69 
/*
 * the four io stages a file can cycle through; LAST_STAGE is a
 * sentinel giving the number of real stages
 */
enum {
	WRITE,
	READ,
	RWRITE,		/* random write */
	RREAD,		/* random read */
	LAST_STAGE,
};
77 
/* buffer allocation strategies for the shared io buffers */
#define USE_MALLOC 0		/* plain malloc() */
#define USE_SHM 1		/* SysV shared memory (shmget/shmat) */
#define USE_SHMFS 2		/* mmap of a temp file on /dev/shm */

/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
long stages = 0;		/* bitmask of enabled stages (1 << WRITE etc.) */
unsigned long page_size_mask;	/* page size - 1, used for alignment math */
int o_direct = 0;		/* open files with O_DIRECT */
int o_sync = 0;			/* open files with O_SYNC */
int latency_stats = 0;		/* print io_submit latency histograms */
int completion_latency_stats = 0;	/* print completion latency histograms */
int io_iter = 8;		/* ios built per file before moving on */
int iterations = RUN_FOREVER;	/* passes through the active list */
int max_io_submit = 0;		/* cap on iocbs handed to one io_submit */
long rec_len = 64 * 1024;	/* io record (buffer) size in bytes */
int depth = 64;			/* max pending requests per file */
int num_threads = 1;		/* number of worker threads */
int num_contexts = 1;		/* aio contexts per file */
off_t context_offset = 2 * 1024 * 1024;	/* bytes between context regions */
int fsync_stages = 1;		/* fsync files between stages */
int use_shm = 0;		/* one of USE_MALLOC/USE_SHM/USE_SHMFS */
int shm_id;			/* SysV shm id (or shmfs fd, see setup_shared_mem) */
char *unaligned_buffer = NULL;	/* raw allocation backing all io buffers */
char *aligned_buffer = NULL;	/* page-aligned cursor into unaligned_buffer */
int padded_reclen = 0;		/* rec_len rounded up to a page multiple */
int stonewall = 1;		/* stop all threads when the first finishes */
int verify = 0;			/* compare read data against a pattern */
char *verify_buf = NULL;	/* reference pattern buffer for verify */
int unlink_files = 0;		/* unlink work files when done */
110 
111 struct io_unit;
112 struct thread_info;
113 
/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;	/* broadcast at stage start */
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;	/* guards the counters below */
int threads_ending = 0;		/* set once the first thread finishes a stage */
int threads_starting = 0;	/* barrier count of threads ready for the stage */
struct timeval global_stage_start_time;	/* when the current stage began */
struct thread_info *global_thread_info;	/* array of per-thread state, one per worker */
121 
122 /*
123  * latencies during io_submit are measured, these are the
124  * granularities for deviations
125  */
#define DEVIATIONS 6
/* latency histogram bucket upper bounds, in milliseconds */
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
128 
/* accumulated latency statistics; all times in milliseconds */
struct io_latency {
	double max;			/* slowest single io */
	double min;			/* fastest single io (0 until first sample) */
	double total_io;		/* number of ios measured */
	double total_lat;		/* sum of all measured latencies */
	double deviations[DEVIATIONS];	/* histogram, bucketed by deviations[] */
};
136 
137 /* container for a series of operations to a file */
/* container for a series of operations to a file */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/* max number of pending requests before a wait is triggered */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error, zero if there were none */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* read,write, random, etc: one of the WRITE/READ/RWRITE/RREAD stages */
	int rw;

	/* number of I/O that will get sent to aio */
	int total_ios;

	/* number of I/O we've already sent */
	int started_ios;

	/* last offset used in an io operation */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our I/O */
	int stonewalled;

	/* list management: circular doubly linked, see oper_list_add/del */
	struct io_oper *next;
	struct io_oper *prev;

	/* when the first io of the current stage was built */
	struct timeval start_time;

	char *file_name;
};
186 
187 /* a single io, and all the tracking needed for it */
/* a single io, and all the tracking needed for it */
struct io_unit {
	/* note, iocb must go first!  completion code casts the event's
	 * iocb pointer straight back to a struct io_unit
	 */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit (free, pending, done) */
	int busy;

	/* result of last operation: bytes moved, or a negative errno */
	long res;

	/* next unit on the thread's free list */
	struct io_unit *next;

	struct timeval io_start_time;	/* time of io_submit */
};
211 
/* all of the state one worker thread needs to drive its files */
struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the I/O array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage */
	double stage_mb_trans;

	/* latency completion stats i/o time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};
253 
254 /*
255  * return seconds between start_tv and stop_tv in double precision
256  */
/*
 * return seconds between start_tv and stop_tv in double precision,
 * clamped at zero if the clock appears to have gone backwards
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double secs = stop_tv->tv_sec - start_tv->tv_sec;
	double usecs = stop_tv->tv_usec - start_tv->tv_usec;
	double elapsed;

	/* borrow a second when the microsecond part went negative */
	if (secs > 0 && usecs < 0) {
		secs--;
		usecs += 1000000;
	}
	elapsed = secs + usecs / (double)1000000;
	return elapsed < 0 ? 0 : elapsed;
}
272 
273 /*
274  * return seconds between start_tv and now in double precision
275  */
time_since_now(struct timeval * start_tv)276 static double time_since_now(struct timeval *start_tv)
277 {
278 	struct timeval stop_time;
279 	gettimeofday(&stop_time, NULL);
280 	return time_since(start_tv, &stop_time);
281 }
282 
283 /*
284  * Add latency info to latency struct
285  */
calc_latency(struct timeval * start_tv,struct timeval * stop_tv,struct io_latency * lat)286 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
287 			 struct io_latency *lat)
288 {
289 	double delta;
290 	int i;
291 	delta = time_since(start_tv, stop_tv);
292 	delta = delta * 1000;
293 
294 	if (delta > lat->max)
295 		lat->max = delta;
296 	if (!lat->min || delta < lat->min)
297 		lat->min = delta;
298 	lat->total_io++;
299 	lat->total_lat += delta;
300 	for (i = 0; i < DEVIATIONS; i++) {
301 		if (delta < deviations[i]) {
302 			lat->deviations[i]++;
303 			break;
304 		}
305 	}
306 }
307 
oper_list_add(struct io_oper * oper,struct io_oper ** list)308 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
309 {
310 	if (!*list) {
311 		*list = oper;
312 		oper->prev = oper->next = oper;
313 		return;
314 	}
315 	oper->prev = (*list)->prev;
316 	oper->next = *list;
317 	(*list)->prev->next = oper;
318 	(*list)->prev = oper;
319 	return;
320 }
321 
oper_list_del(struct io_oper * oper,struct io_oper ** list)322 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
323 {
324 	if ((*list)->next == (*list)->prev && *list == (*list)->next) {
325 		*list = NULL;
326 		return;
327 	}
328 	oper->prev->next = oper->next;
329 	oper->next->prev = oper->prev;
330 	if (*list == oper)
331 		*list = oper->next;
332 }
333 
334 /* worker func to check error fields in the io unit */
check_finished_io(struct io_unit * io)335 static int check_finished_io(struct io_unit *io)
336 {
337 	int i;
338 	if (io->res != io->buf_size) {
339 
340 		struct stat s;
341 		fstat(io->io_oper->fd, &s);
342 
343 		/*
344 		 * If file size is large enough for the read, then this short
345 		 * read is an error.
346 		 */
347 		if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
348 		    s.st_size > (io->iocb.u.c.offset + io->res)) {
349 
350 			fprintf(stderr,
351 				"io err %lu (%s) op %d, off %Lu size %d\n",
352 				io->res, strerror(-io->res),
353 				io->iocb.aio_lio_opcode, io->iocb.u.c.offset,
354 				io->buf_size);
355 			io->io_oper->last_err = io->res;
356 			io->io_oper->num_err++;
357 			return -1;
358 		}
359 	}
360 	if (verify && io->io_oper->rw == READ) {
361 		if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
362 			fprintf(stderr,
363 				"verify error, file %s offset %Lu contents (offset:bad:good):\n",
364 				io->io_oper->file_name, io->iocb.u.c.offset);
365 
366 			for (i = 0; i < io->io_oper->reclen; i++) {
367 				if (io->buf[i] != verify_buf[i]) {
368 					fprintf(stderr, "%d:%c:%c ", i,
369 						io->buf[i], verify_buf[i]);
370 				}
371 			}
372 			fprintf(stderr, "\n");
373 		}
374 
375 	}
376 	return 0;
377 }
378 
379 /* worker func to check the busy bits and get an io unit ready for use */
grab_iou(struct io_unit * io,struct io_oper * oper)380 static int grab_iou(struct io_unit *io, struct io_oper *oper)
381 {
382 	if (io->busy == IO_PENDING)
383 		return -1;
384 
385 	io->busy = IO_PENDING;
386 	io->res = 0;
387 	io->io_oper = oper;
388 	return 0;
389 }
390 
stage_name(int rw)391 char *stage_name(int rw)
392 {
393 	switch (rw) {
394 	case WRITE:
395 		return "write";
396 	case READ:
397 		return "read";
398 	case RWRITE:
399 		return "random write";
400 	case RREAD:
401 		return "random read";
402 	}
403 	return "unknown";
404 }
405 
oper_mb_trans(struct io_oper * oper)406 static inline double oper_mb_trans(struct io_oper *oper)
407 {
408 	return ((double)oper->started_ios * (double)oper->reclen) /
409 	    (double)(1024 * 1024);
410 }
411 
print_time(struct io_oper * oper)412 static void print_time(struct io_oper *oper)
413 {
414 	double runtime;
415 	double tput;
416 	double mb;
417 
418 	runtime = time_since_now(&oper->start_time);
419 	mb = oper_mb_trans(oper);
420 	tput = mb / runtime;
421 	fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
422 		stage_name(oper->rw), oper->file_name, tput, mb, runtime);
423 }
424 
print_lat(char * str,struct io_latency * lat)425 static void print_lat(char *str, struct io_latency *lat)
426 {
427 	double avg = lat->total_lat / lat->total_io;
428 	int i;
429 	double total_counted = 0;
430 	fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
431 		str, lat->min, avg, lat->max);
432 
433 	for (i = 0; i < DEVIATIONS; i++) {
434 		fprintf(stderr, " %.0f < %d", lat->deviations[i],
435 			deviations[i]);
436 		total_counted += lat->deviations[i];
437 	}
438 	if (total_counted && lat->total_io - total_counted)
439 		fprintf(stderr, " < %.0f", lat->total_io - total_counted);
440 	fprintf(stderr, "\n");
441 	memset(lat, 0, sizeof(*lat));
442 }
443 
print_latency(struct thread_info * t)444 static void print_latency(struct thread_info *t)
445 {
446 	struct io_latency *lat = &t->io_submit_latency;
447 	print_lat("latency", lat);
448 }
449 
print_completion_latency(struct thread_info * t)450 static void print_completion_latency(struct thread_info *t)
451 {
452 	struct io_latency *lat = &t->io_completion_latency;
453 	print_lat("completion latency", lat);
454 }
455 
456 /*
457  * updates the fields in the io operation struct that belongs to this
458  * io unit, and make the io unit reusable again
459  */
/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and make the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result,
	       struct timeval *tv_now)
{
	struct io_oper *oper = io->io_oper;

	/* tv_now is when the event was reaped; io_start_time was io_submit */
	calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
	io->res = result;
	io->busy = IO_FREE;
	/* push the unit back onto the thread's free list */
	io->next = t->free_ious;
	t->free_ious = io;
	oper->num_pending--;
	t->num_global_pending--;
	check_finished_io(io);
	/* print timing once the operation's last pending io completes */
	if (oper->num_pending == 0 &&
	    (oper->started_ios == oper->total_ios || oper->stonewalled)) {
		print_time(oper);
	}
}
478 
/*
 * reap completed events (waiting for at least min_nr of them) and
 * finish the corresponding io units.  Returns the number of events
 * reaped, or the io_getevents result (<= 0) on failure.
 */
int read_some_events(struct thread_info *t)
{
	struct io_unit *event_io;
	struct io_event *event;
	int nr;
	int i;
	int min_nr = io_iter;
	struct timeval stop_time;

	/* never wait for more events than there are ios in flight */
	if (t->num_global_pending < io_iter)
		min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
	nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,
			  NULL);
#else
	nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
	if (nr <= 0)
		return nr;

	gettimeofday(&stop_time, NULL);
	for (i = 0; i < nr; i++) {
		event = t->events + i;
		/* event->obj is the iocb, which sits first in struct io_unit */
		event_io = (struct io_unit *)((unsigned long)event->obj);
		finish_io(t, event_io, event->res, &stop_time);
	}
	return nr;
}
508 
509 /*
510  * finds a free io unit, waiting for pending requests if required.  returns
511  * null if none could be found
512  */
/*
 * finds a free io unit, waiting for pending requests if required.  returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *event_io;
	int nr;

retry:
	if (t->free_ious) {
		/* pop the head of the free list and mark it pending */
		event_io = t->free_ious;
		t->free_ious = t->free_ious->next;
		if (grab_iou(event_io, oper)) {
			fprintf(stderr, "io unit on free list but not free\n");
			abort();
		}
		return event_io;
	}
	/* nothing free: reaping completions refills the free list */
	nr = read_some_events(t);
	if (nr > 0)
		goto retry;
	else
		fprintf(stderr, "no free ious after read_some_events\n");
	return NULL;
}
535 
536 /*
537  * wait for all pending requests for this io operation to finish
538  */
/*
 * wait for all pending requests for this io operation to finish.
 * Always returns 0; errors are only reported on stderr.
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper)
{
	struct io_event event;
	struct io_unit *event_io;

	if (oper == NULL) {
		return 0;
	}

	if (oper->num_pending == 0)
		goto done;

	/* this func is not speed sensitive, no need to go wild reading
	 * more than one event at a time
	 */
#ifdef NEW_GETEVENTS
	while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
	while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
		struct timeval tv_now;
		/* event.obj is the iocb, first member of struct io_unit */
		event_io = (struct io_unit *)((unsigned long)event.obj);

		gettimeofday(&tv_now, NULL);
		/* finish_io decrements num_pending on the owning oper */
		finish_io(t, event_io, event.res, &tv_now);

		if (oper->num_pending == 0)
			break;
	}
done:
	if (oper->num_err) {
		fprintf(stderr, "%u errors on oper, last %u\n",
			oper->num_err, oper->last_err);
	}
	return 0;
}
575 
/*
 * pick a page aligned random byte offset inside [oper->start, oper->end),
 * leaving room for one record of oper->reclen bytes
 */
off_t random_byte_offset(struct io_oper * oper)
{
	off_t num;
	off_t rand_byte = oper->start;
	off_t range;
	off_t offset = 1;

	/* work in whole megabytes so the rand() scaling stays in int range */
	range = (oper->end - oper->start) / (1024 * 1024);
	if ((page_size_mask + 1) > (1024 * 1024))
		offset = (page_size_mask + 1) / (1024 * 1024);
	if (range < offset)
		range = 0;
	else
		range -= offset;

	/* find a random mb offset */
	num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
	rand_byte += num * 1024 * 1024;

	/* find a random byte offset */
	num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

	/* page align */
	num = (num + page_size_mask) & ~page_size_mask;
	rand_byte += num;

	/* pull the io back so it does not run past the end of the region */
	if (rand_byte + oper->reclen > oper->end) {
		rand_byte -= oper->reclen;
	}
	return rand_byte;
}
607 
608 /*
609  * build an aio iocb for an operation, based on oper->rw and the
610  * last offset used.  This finds the struct io_unit that will be attached
611  * to the iocb, and things are ready for submission to aio after this
612  * is called.
613  *
614  * returns null on error
615  */
/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *io;
	off_t rand_byte;

	io = find_iou(t, oper);
	if (!io) {
		fprintf(stderr, "unable to find io unit\n");
		return NULL;
	}

	switch (oper->rw) {
	case WRITE:
		/* sequential stages advance last_offset by one record */
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case READ:
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case RREAD:
		/* random stages record last_offset only for bookkeeping */
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      rand_byte);
		break;
	case RWRITE:
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       rand_byte);

		break;
	}

	return io;
}
655 
656 /*
657  * wait for any pending requests, and then free all ram associated with
658  * an operation.  returns the last error the operation hit (zero means none)
659  */
660 static int finish_oper(struct thread_info *t, struct io_oper *oper)
661 {
662 	unsigned long last_err;
663 
664 	io_oper_wait(t, oper);
665 	last_err = oper->last_err;
666 	if (oper->num_pending > 0) {
667 		fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
668 	}
669 	close(oper->fd);
670 	free(oper);
671 	return last_err;
672 }
673 
674 /*
675  * allocates an io operation and fills in all the fields.  returns
676  * null on error
677  */
678 static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end,
679 				   int reclen, int depth, int iter,
680 				   char *file_name)
681 {
682 	struct io_oper *oper;
683 
684 	oper = malloc(sizeof(*oper));
685 	if (!oper) {
686 		fprintf(stderr, "unable to allocate io oper\n");
687 		return NULL;
688 	}
689 	memset(oper, 0, sizeof(*oper));
690 
691 	oper->depth = depth;
692 	oper->start = start;
693 	oper->end = end;
694 	oper->last_offset = oper->start;
695 	oper->fd = fd;
696 	oper->reclen = reclen;
697 	oper->rw = rw;
698 	oper->total_ios = (oper->end - oper->start) / oper->reclen;
699 	oper->file_name = file_name;
700 
701 	return oper;
702 }
703 
704 /*
705  * does setup on num_ios worth of iocbs, but does not actually
706  * start any io
707  */
/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io.  Fills my_iocbs with pointers to the prepared iocbs;
 * returns the number built, or -1 when no free io unit was available.
 */
int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
	       struct iocb **my_iocbs)
{
	int i;
	struct io_unit *io;

	/* the operation's timing starts with its first io */
	if (oper->started_ios == 0)
		gettimeofday(&oper->start_time, NULL);

	/* num_ios == 0 means build everything the operation has */
	if (num_ios == 0)
		num_ios = oper->total_ios;

	/* don't build past the end of the operation */
	if ((oper->started_ios + num_ios) > oper->total_ios)
		num_ios = oper->total_ios - oper->started_ios;

	for (i = 0; i < num_ios; i++) {
		io = build_iocb(t, oper);
		if (!io) {
			return -1;
		}
		my_iocbs[i] = &io->iocb;
	}
	return num_ios;
}
732 
733 /*
734  * runs through the iocbs in the array provided and updates
735  * counters in the associated oper struct
736  */
737 static void update_iou_counters(struct iocb **my_iocbs, int nr,
738 				struct timeval *tv_now)
739 {
740 	struct io_unit *io;
741 	int i;
742 	for (i = 0; i < nr; i++) {
743 		io = (struct io_unit *)(my_iocbs[i]);
744 		io->io_oper->num_pending++;
745 		io->io_oper->started_ios++;
746 		io->io_start_time = *tv_now;	/* set time of io_submit */
747 	}
748 }
749 
750 /* starts some io for a given file, returns zero if all went well */
751 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
752 {
753 	int ret;
754 	struct timeval start_time;
755 	struct timeval stop_time;
756 
757 resubmit:
758 	gettimeofday(&start_time, NULL);
759 	ret = io_submit(t->io_ctx, num_ios, my_iocbs);
760 	gettimeofday(&stop_time, NULL);
761 	calc_latency(&start_time, &stop_time, &t->io_submit_latency);
762 
763 	if (ret != num_ios) {
764 		/* some I/O got through */
765 		if (ret > 0) {
766 			update_iou_counters(my_iocbs, ret, &stop_time);
767 			my_iocbs += ret;
768 			t->num_global_pending += ret;
769 			num_ios -= ret;
770 		}
771 		/*
772 		 * we've used all the requests allocated in aio_init, wait and
773 		 * retry
774 		 */
775 		if (ret > 0 || ret == -EAGAIN) {
776 			int old_ret = ret;
777 			if ((ret = read_some_events(t) > 0)) {
778 				goto resubmit;
779 			} else {
780 				fprintf(stderr, "ret was %d and now is %d\n",
781 					ret, old_ret);
782 				abort();
783 			}
784 		}
785 
786 		fprintf(stderr, "ret %d (%s) on io_submit\n", ret,
787 			strerror(-ret));
788 		return -1;
789 	}
790 	update_iou_counters(my_iocbs, ret, &stop_time);
791 	t->num_global_pending += ret;
792 	return 0;
793 }
794 
795 /*
796  * changes oper->rw to the next in a command sequence, or returns zero
797  * to say this operation is really, completely done for
798  */
/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper)
{
	int new_rw = 0;
	/* an operation that hit an error is never restarted */
	if (oper->last_err)
		return 0;

	/* this switch falls through */
	switch (oper->rw) {
	case WRITE:
		if (stages & (1 << READ))
			new_rw = READ;
		/* fallthrough */
	case READ:
		if (!new_rw && stages & (1 << RWRITE))
			new_rw = RWRITE;
		/* fallthrough */
	case RWRITE:
		if (!new_rw && stages & (1 << RREAD))
			new_rw = RREAD;
	}

	if (new_rw) {
		/* reset progress counters for the new stage */
		oper->started_ios = 0;
		oper->last_offset = oper->start;
		oper->stonewalled = 0;

		/*
		 * we're restarting an operation with pending requests, so the
		 * timing info won't be printed by finish_io.  Printing it here
		 */
		if (oper->num_pending)
			print_time(oper);

		oper->rw = new_rw;
		return 1;
	}
	return 0;
}
835 
836 static int oper_runnable(struct io_oper *oper)
837 {
838 	struct stat buf;
839 	int ret;
840 
841 	/* first context is always runnable, if started_ios > 0, no need to
842 	 * redo the calculations
843 	 */
844 	if (oper->started_ios || oper->start == 0)
845 		return 1;
846 	/*
847 	 * only the sequential phases force delays in starting */
848 	if (oper->rw >= RWRITE)
849 		return 1;
850 	ret = fstat(oper->fd, &buf);
851 	if (ret < 0) {
852 		perror("fstat");
853 		exit(1);
854 	}
855 	if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
856 		return 0;
857 	return 1;
858 }
859 
860 /*
861  * runs through all the io operations on the active list, and starts
862  * a chunk of io on each.  If any io operations are completely finished,
863  * it either switches them to the next stage or puts them on the
864  * finished list.
865  *
866  * this function stops after max_io_submit iocbs are sent down the
867  * pipe, even if it has not yet touched all the operations on the
868  * active list.  Any operations that have finished are moved onto
869  * the finished_opers list.
870  */
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
			   int io_iter, int max_io_submit)
{
	struct io_oper *oper;
	struct io_oper *built_opers = NULL;
	struct iocb **my_iocbs = t->iocbs;
	int ret = 0;
	int num_built = 0;

	oper = t->active_opers;
	while (oper) {
		if (!oper_runnable(oper)) {
			/* skip it; stop once we've wrapped back to the head */
			oper = oper->next;
			if (oper == t->active_opers)
				break;
			continue;
		}
		ret = build_oper(t, oper, io_iter, my_iocbs);
		if (ret >= 0) {
			my_iocbs += ret;
			num_built += ret;
			/* park the oper on built_opers while its io is built */
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &built_opers);
			oper = t->active_opers;
			/* respect the max_io_submit cap for this pass */
			if (num_built + io_iter > max_io_submit)
				break;
		} else
			break;
	}
	if (num_built) {
		ret = run_built(t, num_built, t->iocbs);
		if (ret < 0) {
			fprintf(stderr, "error %d on run_built\n", ret);
			exit(1);
		}
		/* move built opers back to active, or to finished if done */
		while (built_opers) {
			oper = built_opers;
			oper_list_del(oper, &built_opers);
			oper_list_add(oper, &t->active_opers);
			if (oper->started_ios == oper->total_ios) {
				oper_list_del(oper, &t->active_opers);
				oper_list_add(oper, &t->finished_opers);
			}
		}
	}
	return 0;
}
918 
919 void drop_shm()
920 {
921 	int ret;
922 	struct shmid_ds ds;
923 	if (use_shm != USE_SHM)
924 		return;
925 
926 	ret = shmctl(shm_id, IPC_RMID, &ds);
927 	if (ret) {
928 		perror("shmctl IPC_RMID");
929 	}
930 }
931 
932 void aio_setup(io_context_t * io_ctx, int n)
933 {
934 	int res = io_queue_init(n, io_ctx);
935 	if (res != 0) {
936 		fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
937 			n, res, strerror(-res));
938 		exit(3);
939 	}
940 }
941 
942 /*
943  * allocate io operation and event arrays for a given thread
944  */
945 int setup_ious(struct thread_info *t,
946 	       int num_files, int depth, int reclen, int max_io_submit)
947 {
948 	int i;
949 	size_t bytes = num_files * depth * sizeof(*t->ios);
950 
951 	t->ios = malloc(bytes);
952 	if (!t->ios) {
953 		fprintf(stderr, "unable to allocate io units\n");
954 		return -1;
955 	}
956 	memset(t->ios, 0, bytes);
957 
958 	for (i = 0; i < depth * num_files; i++) {
959 		t->ios[i].buf = aligned_buffer;
960 		aligned_buffer += padded_reclen;
961 		t->ios[i].buf_size = reclen;
962 		if (verify)
963 			memset(t->ios[i].buf, 'b', reclen);
964 		else
965 			memset(t->ios[i].buf, 0, reclen);
966 		t->ios[i].next = t->free_ious;
967 		t->free_ious = t->ios + i;
968 	}
969 	if (verify) {
970 		verify_buf = aligned_buffer;
971 		memset(verify_buf, 'b', reclen);
972 	}
973 
974 	t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
975 	if (!t->iocbs) {
976 		fprintf(stderr, "unable to allocate iocbs\n");
977 		goto free_buffers;
978 	}
979 
980 	memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
981 
982 	t->events = malloc(sizeof(struct io_event) * depth * num_files);
983 	if (!t->events) {
984 		fprintf(stderr, "unable to allocate ram for events\n");
985 		goto free_buffers;
986 	}
987 	memset(t->events, 0, num_files * sizeof(struct io_event) * depth);
988 
989 	t->num_global_ios = num_files * depth;
990 	t->num_global_events = t->num_global_ios;
991 	return 0;
992 
993 free_buffers:
994 	free(t->ios);
995 	free(t->iocbs);
996 	free(t->events);
997 	return -1;
998 }
999 
1000 /*
1001  * The buffers used for file data are allocated as a single big
1002  * malloc, and then each thread and operation takes a piece and uses
1003  * that for file data.  This lets us do a large shm or bigpages alloc
1004  * and without trying to find a special place in each thread to map the
1005  * buffers to
1006  */
1007 int setup_shared_mem(int num_threads, int num_files, int depth,
1008 		     int reclen, int max_io_submit)
1009 {
1010 	char *p = NULL;
1011 	size_t total_ram;
1012 
1013 	padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1);
1014 	padded_reclen = padded_reclen * (page_size_mask + 1);
1015 	total_ram = num_files * depth * padded_reclen + num_threads;
1016 	if (verify)
1017 		total_ram += padded_reclen;
1018 
1019 	/* for aligning buffer after the allocation */
1020 	total_ram += page_size_mask;
1021 
1022 	if (use_shm == USE_MALLOC) {
1023 		p = malloc(total_ram);
1024 	} else if (use_shm == USE_SHM) {
1025 		shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1026 		if (shm_id < 0) {
1027 			perror("shmget");
1028 			drop_shm();
1029 			goto free_buffers;
1030 		}
1031 		p = shmat(shm_id, (char *)0x50000000, 0);
1032 		if ((long)p == -1) {
1033 			perror("shmat");
1034 			goto free_buffers;
1035 		}
1036 		/* won't really be dropped until we shmdt */
1037 		drop_shm();
1038 	} else if (use_shm == USE_SHMFS) {
1039 		char mmap_name[16];	/* /dev/shm/ + null + XXXXXX */
1040 		int fd;
1041 
1042 		strcpy(mmap_name, "/dev/shm/XXXXXX");
1043 		fd = mkstemp(mmap_name);
1044 		if (fd < 0) {
1045 			perror("mkstemp");
1046 			goto free_buffers;
1047 		}
1048 		unlink(mmap_name);
1049 		ftruncate(fd, total_ram);
1050 		shm_id = fd;
1051 		p = mmap((char *)0x50000000, total_ram,
1052 			 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1053 
1054 		if (p == MAP_FAILED) {
1055 			perror("mmap");
1056 			goto free_buffers;
1057 		}
1058 	}
1059 	if (!p) {
1060 		fprintf(stderr, "unable to allocate buffers\n");
1061 		goto free_buffers;
1062 	}
1063 	unaligned_buffer = p;
1064 	p = (char *)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1065 	aligned_buffer = p;
1066 	return 0;
1067 
1068 free_buffers:
1069 	drop_shm();
1070 	if (unaligned_buffer)
1071 		free(unaligned_buffer);
1072 	return -1;
1073 }
1074 
1075 /*
1076  * runs through all the thread_info structs and calculates a combined
1077  * throughput
1078  */
1079 void global_thread_throughput(struct thread_info *t, char *this_stage)
1080 {
1081 	int i;
1082 	double runtime = time_since_now(&global_stage_start_time);
1083 	double total_mb = 0;
1084 	double min_trans = 0;
1085 
1086 	for (i = 0; i < num_threads; i++) {
1087 		total_mb += global_thread_info[i].stage_mb_trans;
1088 		if (!min_trans || t->stage_mb_trans < min_trans)
1089 			min_trans = t->stage_mb_trans;
1090 	}
1091 	if (total_mb) {
1092 		fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1093 			total_mb / runtime);
1094 		fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1095 		if (stonewall)
1096 			fprintf(stderr, " min transfer %.2fMB", min_trans);
1097 		fprintf(stderr, "\n");
1098 	}
1099 }
1100 
1101 /* this is the meat of the state machine.  There is a list of
1102  * active operations structs, and as each one finishes the required
1103  * io it is moved to a list of finished operations.  Once they have
1104  * all finished whatever stage they were in, they are given the chance
1105  * to restart and pick a different stage (read/write/random read etc)
1106  *
1107  * various timings are printed in between the stages, along with
1108  * thread synchronization if there are more than one threads.
1109  */
int worker(struct thread_info *t)
{
	struct io_oper *oper;
	char *this_stage = NULL;
	struct timeval stage_time;
	int status = 0;
	int iteration = 0;
	int cnt;

	/* create this thread's io context; 512 is presumably the max number
	 * of in-flight events — confirm against aio_setup
	 */
	aio_setup(&t->io_ctx, 512);

restart:
	if (num_threads > 1) {
		/* stage-start barrier: the last thread to arrive resets the
		 * ending counter, timestamps the stage and wakes everyone
		 */
		pthread_mutex_lock(&stage_mutex);
		threads_starting++;
		if (threads_starting == num_threads) {
			threads_ending = 0;
			gettimeofday(&global_stage_start_time, NULL);
			pthread_cond_broadcast(&stage_cond);
		}
		while (threads_starting != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}
	if (t->active_opers) {
		this_stage = stage_name(t->active_opers->rw);
		gettimeofday(&stage_time, NULL);
		t->stage_mb_trans = 0;
	}

	cnt = 0;
	/* first we send everything through aio */
	while (t->active_opers
	       && (cnt < iterations || iterations == RUN_FOREVER)) {
		/* stonewalling: once any other thread has reached the end of
		 * the stage, stop submitting and park opers on the finished
		 * list so all threads end the stage together
		 */
		if (stonewall && threads_ending) {
			oper = t->active_opers;
			oper->stonewalled = 1;
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &t->finished_opers);
		} else {
			run_active_list(t, io_iter, max_io_submit);
		}
		cnt++;
	}
	if (latency_stats)
		print_latency(t);

	if (completion_latency_stats)
		print_completion_latency(t);

	/* then we wait for all the operations to finish */
	oper = t->finished_opers;
	do {
		if (!oper)
			break;
		io_oper_wait(t, oper);
		oper = oper->next;
	} while (oper != t->finished_opers);

	/* then we do an fsync to get the timing for any future operations
	 * right, and check to see if any of these need to get restarted
	 */
	oper = t->finished_opers;
	while (oper) {
		if (fsync_stages)
			fsync(oper->fd);
		t->stage_mb_trans += oper_mb_trans(oper);
		/* restart_oper moves an oper with stages left back to the
		 * active list; rescan from the head since the circular list
		 * just changed under us
		 */
		if (restart_oper(oper)) {
			oper_list_del(oper, &t->finished_opers);
			oper_list_add(oper, &t->active_opers);
			oper = t->finished_opers;
			continue;
		}
		oper = oper->next;
		if (oper == t->finished_opers)
			break;
	}

	/* per-thread throughput summary for the stage just completed */
	if (t->stage_mb_trans && t->num_files > 0) {
		double seconds = time_since_now(&stage_time);
		fprintf(stderr,
			"thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
			t - global_thread_info, this_stage,
			t->stage_mb_trans / seconds, t->stage_mb_trans,
			seconds);
	}

	if (num_threads > 1) {
		/* stage-end barrier: the last thread out resets the starting
		 * counter and prints the combined throughput
		 */
		pthread_mutex_lock(&stage_mutex);
		threads_ending++;
		if (threads_ending == num_threads) {
			threads_starting = 0;
			pthread_cond_broadcast(&stage_cond);
			global_thread_throughput(t, this_stage);
		}
		while (threads_ending != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}

	/* someone got restarted, go back to the beginning */
	if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
		iteration++;
		goto restart;
	}

	/* finally, free all the ram */
	while (t->finished_opers) {
		oper = t->finished_opers;
		oper_list_del(oper, &t->finished_opers);
		status = finish_oper(t, oper);
	}

	/* NOTE(review): a non-zero count here suggests events were left
	 * pending at teardown — confirm against the completion path
	 */
	if (t->num_global_pending) {
		fprintf(stderr, "global num pending is %d\n",
			t->num_global_pending);
	}
	io_queue_release(t->io_ctx);

	return status;
}
1231 
1232 typedef void *(*start_routine) (void *);
1233 int run_workers(struct thread_info *t, int num_threads)
1234 {
1235 	int ret;
1236 	int i;
1237 
1238 	for (i = 0; i < num_threads; i++) {
1239 		ret =
1240 		    pthread_create(&t[i].tid, NULL, (start_routine) worker,
1241 				   t + i);
1242 		if (ret) {
1243 			perror("pthread_create");
1244 			exit(1);
1245 		}
1246 	}
1247 	for (i = 0; i < num_threads; i++) {
1248 		ret = pthread_join(t[i].tid, NULL);
1249 		if (ret) {
1250 			perror("pthread_join");
1251 			exit(1);
1252 		}
1253 	}
1254 	return 0;
1255 }
1256 
/*
 * parses a size argument of the form "<num>[gGmMkKbB]".  A recognized
 * suffix overrides the default multiplier passed in as mult; any
 * trailing non-digit is stripped from size_arg in place.
 */
off_t parse_size(char *size_arg, off_t mult)
{
	char c;
	long long num;
	off_t ret;
	size_t len = strlen(size_arg);

	/* guard: the original indexed size_arg[-1] on an empty string */
	if (len == 0)
		return 0;

	c = size_arg[len - 1];
	if (c > '9') {
		/* strip the suffix so the numeric conversion below works */
		size_arg[len - 1] = '\0';
	}
	/* strtoll instead of atoi: avoids int overflow on large sizes */
	num = strtoll(size_arg, NULL, 10);
	switch (c) {
	case 'g':
	case 'G':
		mult = 1024 * 1024 * 1024;
		break;
	case 'm':
	case 'M':
		mult = 1024 * 1024;
		break;
	case 'k':
	case 'K':
		mult = 1024;
		break;
	case 'b':
	case 'B':
		mult = 1;
		break;
	}
	ret = mult * num;
	return ret;
}
1288 
1289 void print_usage(void)
1290 {
1291 	printf
1292 	    ("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1293 	printf
1294 	    ("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1295 	printf("                  file1 [file2 ...]\n");
1296 	printf("\t-a size in KB at which to align buffers\n");
1297 	printf("\t-b max number of iocbs to give io_submit at once\n");
1298 	printf("\t-c number of io contexts per file\n");
1299 	printf("\t-C offset between contexts, default 2MB\n");
1300 	printf("\t-s size in MB of the test file(s), default 1024MB\n");
1301 	printf("\t-r record size in KB used for each io, default 64KB\n");
1302 	printf
1303 	    ("\t-d number of pending aio requests for each file, default 64\n");
1304 	printf("\t-i number of I/O per file sent before switching\n"
1305 	       "\t   to the next file, default 8\n");
1306 	printf("\t-I total number of ayncs I/O the program will run, "
1307 	       "default is run until Cntl-C\n");
1308 	printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1309 	printf("\t-S Use O_SYNC for writes\n");
1310 	printf("\t-o add an operation to the list: write=0, read=1,\n");
1311 	printf("\t   random write=2, random read=3.\n");
1312 	printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1313 	printf
1314 	    ("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1315 	printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1316 	printf("\t-n no fsyncs between write stage and read stage\n");
1317 	printf("\t-l print io_submit latencies after each stage\n");
1318 	printf("\t-L print io completion latencies after each stage\n");
1319 	printf("\t-t number of threads to run\n");
1320 	printf("\t-u unlink files after completion\n");
1321 	printf("\t-v verification of bytes written\n");
1322 	printf("\t-x turn off thread stonewalling\n");
1323 	printf("\t-h this message\n");
1324 	printf
1325 	    ("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1326 	printf("\t   translate to 400KB, 400MB and 400GB\n");
1327 	printf("version %s\n", PROG_VERSION);
1328 }
1329 
1330 int main(int ac, char **av)
1331 {
1332 	int rwfd;
1333 	int i;
1334 	int j;
1335 	int c;
1336 
1337 	off_t file_size = 1 * 1024 * 1024 * 1024;
1338 	int first_stage = WRITE;
1339 	struct io_oper *oper;
1340 	int status = 0;
1341 	int num_files = 0;
1342 	int open_fds = 0;
1343 	struct thread_info *t;
1344 
1345 	page_size_mask = getpagesize() - 1;
1346 
1347 	while (1) {
1348 		c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1349 		if (c < 0)
1350 			break;
1351 
1352 		switch (c) {
1353 		case 'a':
1354 			page_size_mask = parse_size(optarg, 1024);
1355 			page_size_mask--;
1356 			break;
1357 		case 'c':
1358 			num_contexts = atoi(optarg);
1359 			break;
1360 		case 'C':
1361 			context_offset = parse_size(optarg, 1024 * 1024);
1362 		case 'b':
1363 			max_io_submit = atoi(optarg);
1364 			break;
1365 		case 's':
1366 			file_size = parse_size(optarg, 1024 * 1024);
1367 			break;
1368 		case 'd':
1369 			depth = atoi(optarg);
1370 			break;
1371 		case 'r':
1372 			rec_len = parse_size(optarg, 1024);
1373 			break;
1374 		case 'i':
1375 			io_iter = atoi(optarg);
1376 			break;
1377 		case 'I':
1378 			iterations = atoi(optarg);
1379 			break;
1380 		case 'n':
1381 			fsync_stages = 0;
1382 			break;
1383 		case 'l':
1384 			latency_stats = 1;
1385 			break;
1386 		case 'L':
1387 			completion_latency_stats = 1;
1388 			break;
1389 		case 'm':
1390 			if (!strcmp(optarg, "shm")) {
1391 				fprintf(stderr, "using ipc shm\n");
1392 				use_shm = USE_SHM;
1393 			} else if (!strcmp(optarg, "shmfs")) {
1394 				fprintf(stderr, "using /dev/shm for buffers\n");
1395 				use_shm = USE_SHMFS;
1396 			}
1397 			break;
1398 		case 'o':
1399 			i = atoi(optarg);
1400 			stages |= 1 << i;
1401 			fprintf(stderr, "adding stage %s\n", stage_name(i));
1402 			break;
1403 		case 'O':
1404 			o_direct = O_DIRECT;
1405 			break;
1406 		case 'S':
1407 			o_sync = O_SYNC;
1408 			break;
1409 		case 't':
1410 			num_threads = atoi(optarg);
1411 			break;
1412 		case 'x':
1413 			stonewall = 0;
1414 			break;
1415 		case 'u':
1416 			unlink_files = 1;
1417 			break;
1418 		case 'v':
1419 			verify = 1;
1420 			break;
1421 		case 'h':
1422 		default:
1423 			print_usage();
1424 			exit(1);
1425 		}
1426 	}
1427 
1428 	/*
1429 	 * make sure we don't try to submit more I/O than we have allocated
1430 	 * memory for
1431 	 */
1432 	if (depth < io_iter) {
1433 		io_iter = depth;
1434 		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1435 	}
1436 
1437 	if (optind >= ac) {
1438 		print_usage();
1439 		exit(1);
1440 	}
1441 
1442 	num_files = ac - optind;
1443 
1444 	if (num_threads > (num_files * num_contexts)) {
1445 		num_threads = num_files * num_contexts;
1446 		fprintf(stderr,
1447 			"dropping thread count to the number of contexts %d\n",
1448 			num_threads);
1449 	}
1450 
1451 	t = malloc(num_threads * sizeof(*t));
1452 	if (!t) {
1453 		perror("malloc");
1454 		exit(1);
1455 	}
1456 	global_thread_info = t;
1457 
1458 	/* by default, allow a huge number of iocbs to be sent towards
1459 	 * io_submit
1460 	 */
1461 	if (!max_io_submit)
1462 		max_io_submit = num_files * io_iter * num_contexts;
1463 
1464 	/*
1465 	 * make sure we don't try to submit more I/O than max_io_submit allows
1466 	 */
1467 	if (max_io_submit < io_iter) {
1468 		io_iter = max_io_submit;
1469 		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1470 	}
1471 
1472 	if (!stages) {
1473 		stages =
1474 		    (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1475 	} else {
1476 		for (i = 0; i < LAST_STAGE; i++) {
1477 			if (stages & (1 << i)) {
1478 				first_stage = i;
1479 				fprintf(stderr, "starting with %s\n",
1480 					stage_name(i));
1481 				break;
1482 			}
1483 		}
1484 	}
1485 
1486 	if (file_size < num_contexts * context_offset) {
1487 		fprintf(stderr, "file size %ld too small for %d contexts\n",
1488 			(long)file_size, num_contexts);
1489 		exit(1);
1490 	}
1491 
1492 	fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, "
1493 		"I/O per iteration %d\n",
1494 		(long)(file_size / (1024 * 1024)),
1495 		rec_len / 1024, depth, io_iter);
1496 	fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1497 		max_io_submit, (page_size_mask + 1) / 1024);
1498 	fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB "
1499 		"verification %s\n", num_threads, num_files, num_contexts,
1500 		(long)(context_offset / (1024 * 1024)), verify ? "on" : "off");
1501 	/* open all the files and do any required setup for them */
1502 	for (i = optind; i < ac; i++) {
1503 		int thread_index;
1504 		for (j = 0; j < num_contexts; j++) {
1505 			thread_index = open_fds % num_threads;
1506 			open_fds++;
1507 
1508 			rwfd =
1509 			    open(av[i], O_CREAT | O_RDWR | o_direct | o_sync,
1510 				 0600);
1511 			if (rwfd == -1) {
1512 				fprintf(stderr,
1513 					"error while creating file %s: %s",
1514 					av[i], strerror(errno));
1515 				exit(1);
1516 			}
1517 
1518 			oper =
1519 			    create_oper(rwfd, first_stage, j * context_offset,
1520 					file_size - j * context_offset, rec_len,
1521 					depth, io_iter, av[i]);
1522 			if (!oper) {
1523 				fprintf(stderr, "error in create_oper\n");
1524 				exit(-1);
1525 			}
1526 			oper_list_add(oper, &t[thread_index].active_opers);
1527 			t[thread_index].num_files++;
1528 		}
1529 	}
1530 	if (setup_shared_mem(num_threads, num_files * num_contexts,
1531 			     depth, rec_len, max_io_submit)) {
1532 		exit(1);
1533 	}
1534 	for (i = 0; i < num_threads; i++) {
1535 		if (setup_ious
1536 		    (&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1537 			exit(1);
1538 	}
1539 	if (num_threads > 1) {
1540 		printf("Running multi thread version num_threads:%d\n",
1541 		       num_threads);
1542 		run_workers(t, num_threads);
1543 	} else {
1544 		printf("Running single thread version \n");
1545 		status = worker(t);
1546 	}
1547 	if (unlink_files) {
1548 		for (i = optind; i < ac; i++) {
1549 			printf("Cleaning up file %s \n", av[i]);
1550 			unlink(av[i]);
1551 		}
1552 	}
1553 
1554 	if (status) {
1555 		exit(1);
1556 	}
1557 	return status;
1558 }
1559