1 /*****************************************************************************/
2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator.          */
3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc.      */
4 /*                                                                           */
5 /* This program is free software; you can redistribute it and/or modify      */
6 /* it under the terms of the GNU General Public License as published by      */
7 /* the Free Software Foundation.  You should have received a copy of the     */
8 /* GNU General Public License along with this program; if not, write to the  */
9 /* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.   */
10 /*                                                                           */
11 /* Files needed for use:                                                     */
12 /*     * netpipe.c          ---- Driver source                               */
13 /*     * netpipe.h          ---- General include file                        */
14 /*     * TCP.c              ---- TCP calls source                            */
15 /*     * TCP.h              ---- Include file for TCP calls and data structs */
16 /*     * MPI.c              ---- MPI calls source                            */
17 /*     * MPI.h              ---- Include file for MPI calls and data structs */
18 /*     * PVM.c              ---- PVM calls source                            */
19 /*     * PVM.h              ---- Include file for PVM calls and data structs */
20 /* 2002/03/18 --- Modified to specify server interfaces - Robbie Williamson  */
21 /*		  (robbiew@us.ibm.com)					     */
22 /*****************************************************************************/
23 #include "netpipe.h"
24 
25 extern char *optarg;
26 
/*
 * When - read the wall clock via gettimeofday() and return it as a single
 * double: whole seconds plus the microsecond field scaled by 1e-6.
 */
double When()
{
	struct timeval now;
	double whole_seconds;
	double microseconds;

	gettimeofday(&now, NULL);

	whole_seconds = (double)now.tv_sec;
	microseconds = (double)now.tv_usec;

	return (whole_seconds + microseconds * 1e-6);
}
34 
/*
 * PrintUsage - print the command-line option summary to stdout and
 * terminate the process with exit(-12).
 *
 * The function never returns to its caller; the int return type is part of
 * the existing interface and is left unchanged.  The TCP-only options
 * (-b, -h, -H, -p) are listed only when the file is built with TCP defined.
 */
int PrintUsage()
{
	printf("\n NETPIPE USAGE \n\n");
	printf("A: specify buffers alignment e.g.: <-A 1024>\n");
	printf("a: asynchronous receive (a.k.a. preposted receive)\n");
#if defined(TCP)
	printf("b: specify send and receive buffer sizes e.g. <-b 32768>\n");
	printf("h: specify hostname <-h host>\n");
	printf("H: specify server hostname <-H HOST> e.g. Multiple NICs\n");
#endif
	printf("i: specify increment step size e.g. <-i 64>\n");
	/* The lower bound is selected with -l (the old text showed -i). */
	printf("l: lower bound start value e.g. <-l 1>\n");
	printf("O: specify buffer offset e.g. <-O 127>\n");
	printf("o: specify output filename <-o fn>\n");
	printf("P: print on screen\n");
#if defined(TCP)
	printf("p: specify port e.g. <-p 5150>\n");
#endif
	printf("r: receiver\n");
	printf("s: stream option\n");
	printf("t: transmitter\n");
	printf("u: upper bound stop value e.g. <-u 1048576>\n");
	printf("\n");
	exit(-12);
}
60 
main(int argc,char * argv[])61 int main(int argc, char *argv[])
62 {
63 	FILE *out;		/* Output data file                          */
64 	char s[255];		/* Generic string                            */
65 	char *memtmp;
66 	char *memtmp1;
67 
68 	int c,			/* option index                              */
69 	 i, j, n, nq,		/* Loop indices                              */
70 	 asyncReceive = 0,	/* Pre-post a receive buffer?                */
71 	    bufoffset = 0,	/* Align buffer to this                      */
72 	    bufalign = 16 * 1024,	/* Boundary to align buffer to              */
73 	    errFlag,		/* Error occurred in inner testing loop      */
74 	    nrepeat,		/* Number of time to do the transmission     */
75 	    len,		/* Number of bytes to be transmitted         */
76 	    inc = 0,		/* Increment value                           */
77 	    trans = -1,		/* Transmitter flag. 1 if transmitting.      */
78 	    server = 0,		/* Server flag. 1 if specifying server.      */
79 	    detailflag = 0,	/* Set to examine the signature curve detail */
80 	    bufszflag = 0,	/* Set to change the TCP socket buffer size  */
81 	    pert,		/* Perturbation value                        */
82 	    start = 1,		/* Starting value for signature curve        */
83 	    end = MAXINT,	/* Ending value for signature curve          */
84 	    streamopt = 0,	/* Streaming mode flag                       */
85 	    printopt = 0;	/* Debug print statements flag               */
86 
87 	ArgStruct args;		/* Argumentsfor all the calls                */
88 
89 	double t, t0, t1, t2,	/* Time variables                            */
90 	 tlast,			/* Time for the last transmission            */
91 	 latency;		/* Network message latency                   */
92 
93 	Data bwdata[NSAMP];	/* Bandwidth curve data                      */
94 
95 	short port = DEFPORT;	/* Port number for connection                */
96 #ifdef HAVE_GETRUSAGE
97 	struct rusage prev_rusage, curr_rusage;	/* Resource usage                */
98 	double user_time, sys_time;	/* User & system time used                   */
99 	double best_user_time, best_sys_time;	/* Total user & system time used     */
100 	double ut1, ut2, st1, st2;	/* User & system time ctrs for variance  */
101 	double ut_var, st_var;	/* Variance in user & system time            */
102 #endif
103 
104 #ifdef MPI
105 	MPI_Init(&argc, &argv);
106 #endif
107 
108 	strcpy(s, "NetPIPE.out");
109 #ifndef MPI
110 	if (argc < 2)
111 		PrintUsage();
112 #endif
113 	/* Parse the arguments. See Usage for description */
114 	while ((c = getopt(argc, argv, "PstrhH:p:o:A:O:l:u:i:b:a")) != -1) {
115 		switch (c) {
116 		case 'o':
117 			strcpy(s, optarg);
118 			break;
119 
120 		case 't':
121 			trans = 1;
122 			break;
123 
124 		case 'r':
125 			trans = 0;
126 			break;
127 
128 		case 's':
129 			streamopt = 1;
130 			break;
131 
132 		case 'l':	/*detailflag = 1; */
133 			start = atoi(optarg);
134 			if (start < 1) {
135 				fprintf(stderr, "Need a starting value >= 1\n");
136 				exit(743);
137 			}
138 			break;
139 
140 		case 'u':	/*detailflag = 1; */
141 			end = atoi(optarg);
142 			break;
143 
144 		case 'i':
145 			detailflag = 1;
146 			inc = atoi(optarg);
147 			break;
148 
149 		case 'b':
150 			bufszflag = 1;
151 #ifdef TCP
152 			args.prot.rcvbufsz = atoi(optarg);
153 			args.prot.sndbufsz = args.prot.rcvbufsz;
154 #endif
155 			break;
156 
157 		case 'P':
158 			printopt = 1;
159 			break;
160 
161 		case 'A':
162 			bufalign = atoi(optarg);
163 			break;
164 
165 		case 'O':
166 			bufoffset = atoi(optarg);
167 			break;
168 
169 		case 'p':
170 			port = atoi(optarg);
171 			break;
172 
173 		case 'h':
174 			if (trans == 1) {
175 				args.host = (char *)malloc(strlen(optarg) + 1);
176 				strcpy(args.host, optarg);
177 				printf("host is %s\n", args.host);
178 			} else {
179 				fprintf(stderr,
180 					"Error: -t must be specified before -h\n");
181 				exit(-11);
182 			}
183 			break;
184 
185 		case 'H':
186 			if (trans == 0) {
187 				args.server_host =
188 				    (char *)malloc(strlen(optarg) + 1);
189 				strcpy(args.server_host, optarg);
190 				printf("server is %s\n", args.server_host);
191 				server = 1;
192 			} else {
193 				fprintf(stderr,
194 					"Error: -r must be specified before -H\n");
195 				exit(-11);
196 			}
197 			break;
198 
199 		case 'a':
200 			asyncReceive = 1;
201 			break;
202 
203 		default:
204 			PrintUsage();
205 			exit(-12);
206 		}
207 	}
208 	if (start > end) {
209 		fprintf(stderr, "Start MUST be LESS than end\n");
210 		exit(420132);
211 	}
212 #if defined(TCP) || defined(PVM)
213 	/*
214 	   It should be explicitly specified whether this is the transmitter
215 	   or the receiver.
216 	 */
217 	if (trans < 0) {
218 		fprintf(stderr, "Error: either -t or -r must be specified\n");
219 		exit(-11);
220 	}
221 #endif
222 
223 	args.nbuff = TRIALS;
224 	args.tr = trans;
225 	args.sr = server;
226 	args.port = port;
227 
228 #if defined(TCP)
229 	if (!bufszflag) {
230 		args.prot.sndbufsz = 0;
231 		args.prot.rcvbufsz = 0;
232 	} else
233 		fprintf(stderr, "Send and Recv Buffers are %d bytes\n",
234 			args.prot.sndbufsz);
235 #endif
236 
237 	Setup(&args);
238 	Establish(&args);
239 
240 	if (args.tr) {
241 		if ((out = fopen(s, "w")) == NULL) {
242 			fprintf(stderr, "Can't open %s for output\n", s);
243 			exit(1);
244 		}
245 	} else
246 		out = stdout;
247 
248 	args.bufflen = 1;
249 	args.buff = (char *)malloc(args.bufflen);
250 	args.buff1 = (char *)malloc(args.bufflen);
251 	if (asyncReceive)
252 		PrepareToReceive(&args);
253 	Sync(&args);
254 	t0 = When();
255 	t0 = When();
256 	t0 = When();
257 #ifdef HAVE_GETRUSAGE
258 	getrusage(RUSAGE_SELF, &prev_rusage);
259 #endif
260 	t0 = When();
261 	for (i = 0; i < LATENCYREPS; i++) {
262 		if (args.tr) {
263 			SendData(&args);
264 			RecvData(&args);
265 			if (asyncReceive && (i < LATENCYREPS - 1)) {
266 				PrepareToReceive(&args);
267 			}
268 		} else {
269 			RecvData(&args);
270 			if (asyncReceive && (i < LATENCYREPS - 1)) {
271 				PrepareToReceive(&args);
272 			}
273 			SendData(&args);
274 		}
275 	}
276 	latency = (When() - t0) / (2 * LATENCYREPS);
277 #ifdef HAVE_GETRUSAGE
278 	getrusage(RUSAGE_SELF, &curr_rusage);
279 #endif
280 	free(args.buff);
281 	free(args.buff1);
282 
283 	if (args.tr) {
284 		SendTime(&args, &latency);
285 	} else {
286 		RecvTime(&args, &latency);
287 	}
288 	if (args.tr && printopt) {
289 		fprintf(stderr, "Latency: %.7f\n", latency);
290 		fprintf(stderr, "Now starting main loop\n");
291 	}
292 	tlast = latency;
293 	if (inc == 0) {
294 		/* Set a starting value for the message size increment. */
295 		inc = (start > 1) ? start / 2 : 1;
296 	}
297 
298 	/* Main loop of benchmark */
299 	for (nq = n = 0, len = start, errFlag = 0;
300 	     n < NSAMP - 3 && tlast < STOPTM && len <= end && !errFlag;
301 	     len = len + inc, nq++) {
302 		if (nq > 2 && !detailflag) {
303 			/*
304 			   This has the effect of exponentially increasing the block
305 			   size.  If detailflag is false, then the block size is
306 			   linearly increased (the increment is not adjusted).
307 			 */
308 			inc = ((nq % 2)) ? inc + inc : inc;
309 		}
310 
311 		/* This is a perturbation loop to test nearby values */
312 		for (pert = (!detailflag && inc > PERT + 1) ? -PERT : 0;
313 		     pert <= PERT;
314 		     n++, pert += (!detailflag
315 				   && inc > PERT + 1) ? PERT : PERT + 1) {
316 
317 			/* Calculate how many times to repeat the experiment. */
318 			if (args.tr) {
319 				nrepeat = MAX((RUNTM / ((double)args.bufflen /
320 							(args.bufflen - inc +
321 							 1.0) * tlast)),
322 					      TRIALS);
323 				SendRepeat(&args, nrepeat);
324 			} else {
325 				RecvRepeat(&args, &nrepeat);
326 			}
327 
328 			/* Allocate the buffer */
329 			args.bufflen = len + pert;
330 			if ((args.buff =
331 			     (char *)malloc(args.bufflen + bufalign)) ==
332 			    (char *)NULL) {
333 				fprintf(stderr, "Couldn't allocate memory\n");
334 				errFlag = -1;
335 				break;
336 			}
337 			if ((args.buff1 =
338 			     (char *)malloc(args.bufflen + bufalign)) ==
339 			    (char *)NULL) {
340 				fprintf(stderr, "Couldn't allocate memory\n");
341 				errFlag = -1;
342 				break;
343 			}
344 			/*
345 			   Possibly align the data buffer: make memtmp and memtmp1
346 			   point to the original blocks (so they can be freed later),
347 			   then adjust args.buff and args.buff1 if the user requested it.
348 			 */
349 			memtmp = args.buff;
350 			memtmp1 = args.buff1;
351 			if (bufalign != 0)
352 				args.buff += (bufalign -
353 					      ((intptr_t) args.buff %
354 					       bufalign) +
355 					      bufoffset) % bufalign;
356 
357 			if (bufalign != 0)
358 				args.buff1 += (bufalign -
359 					       ((intptr_t) args.buff1 %
360 						bufalign) +
361 					       bufoffset) % bufalign;
362 
363 			if (args.tr && printopt)
364 				fprintf(stderr, "%3d: %9d bytes %4d times --> ",
365 					n, args.bufflen, nrepeat);
366 
367 			/* Finally, we get to transmit or receive and time */
368 			if (args.tr) {
369 				/*
370 				   This is the transmitter: send the block TRIALS times, and
371 				   if we are not streaming, expect the receiver to return each
372 				   block.
373 				 */
374 				bwdata[n].t = LONGTIME;
375 				t2 = t1 = 0;
376 #ifdef HAVE_GETRUSAGE
377 				ut1 = ut2 = st1 = st2 = 0.0;
378 				best_user_time = best_sys_time = LONGTIME;
379 #endif
380 				for (i = 0; i < TRIALS; i++) {
381 					Sync(&args);
382 #ifdef HAVE_GETRUSAGE
383 					getrusage(RUSAGE_SELF, &prev_rusage);
384 #endif
385 					t0 = When();
386 					for (j = 0; j < nrepeat; j++) {
387 						if (asyncReceive && !streamopt) {
388 							PrepareToReceive(&args);
389 						}
390 						SendData(&args);
391 						if (!streamopt) {
392 							RecvData(&args);
393 						}
394 					}
395 					t = (When() -
396 					     t0) / ((1 + !streamopt) * nrepeat);
397 #ifdef HAVE_GETRUSAGE
398 					getrusage(RUSAGE_SELF, &curr_rusage);
399 					user_time =
400 					    ((curr_rusage.ru_utime.tv_sec -
401 					      prev_rusage.ru_utime.tv_sec) +
402 					     (double)
403 					     (curr_rusage.ru_utime.tv_usec -
404 					      prev_rusage.ru_utime.tv_usec) *
405 					     1.0E-6) / ((1 +
406 							 !streamopt) * nrepeat);
407 					sys_time =
408 					    ((curr_rusage.ru_stime.tv_sec -
409 					      prev_rusage.ru_stime.tv_sec) +
410 					     (double)
411 					     (curr_rusage.ru_stime.tv_usec -
412 					      prev_rusage.ru_stime.tv_usec) *
413 					     1.0E-6) / ((1 +
414 							 !streamopt) * nrepeat);
415 					ut2 += user_time * user_time;
416 					st2 += sys_time * sys_time;
417 					ut1 += user_time;
418 					st1 += sys_time;
419 					if ((user_time + sys_time) <
420 					    (best_user_time + best_sys_time)) {
421 						best_user_time = user_time;
422 						best_sys_time = sys_time;
423 					}
424 #endif
425 
426 					if (!streamopt) {
427 						t2 += t * t;
428 						t1 += t;
429 						bwdata[n].t =
430 						    MIN(bwdata[n].t, t);
431 					}
432 				}
433 				if (!streamopt)
434 					SendTime(&args, &bwdata[n].t);
435 				else
436 					RecvTime(&args, &bwdata[n].t);
437 
438 				if (!streamopt)
439 					bwdata[n].variance =
440 					    t2 / TRIALS -
441 					    t1 / TRIALS * t1 / TRIALS;
442 
443 #ifdef HAVE_GETRUSAGE
444 				ut_var =
445 				    ut2 / TRIALS -
446 				    (ut1 / TRIALS) * (ut1 / TRIALS);
447 				st_var =
448 				    st2 / TRIALS -
449 				    (st1 / TRIALS) * (st1 / TRIALS);
450 #endif
451 
452 			} else {
453 				/*
454 				   This is the receiver: receive the block TRIALS times, and
455 				   if we are not streaming, send the block back to the
456 				   sender.
457 				 */
458 				bwdata[n].t = LONGTIME;
459 				t2 = t1 = 0;
460 				for (i = 0; i < TRIALS; i++) {
461 					if (asyncReceive) {
462 						PrepareToReceive(&args);
463 					}
464 					Sync(&args);
465 					t0 = When();
466 					for (j = 0; j < nrepeat; j++) {
467 						RecvData(&args);
468 						if (asyncReceive
469 						    && (j < nrepeat - 1)) {
470 							PrepareToReceive(&args);
471 						}
472 						if (!streamopt)
473 							SendData(&args);
474 					}
475 					t = (When() -
476 					     t0) / ((1 + !streamopt) * nrepeat);
477 
478 					if (streamopt) {
479 						t2 += t * t;
480 						t1 += t;
481 						bwdata[n].t =
482 						    MIN(bwdata[n].t, t);
483 					}
484 				}
485 				if (streamopt)
486 					SendTime(&args, &bwdata[n].t);
487 				else
488 					RecvTime(&args, &bwdata[n].t);
489 
490 				if (streamopt)
491 					bwdata[n].variance =
492 					    t2 / TRIALS -
493 					    t1 / TRIALS * t1 / TRIALS;
494 
495 			}
496 			tlast = bwdata[n].t;
497 			bwdata[n].bits = args.bufflen * CHARSIZE;
498 			bwdata[n].bps =
499 			    bwdata[n].bits / (bwdata[n].t * 1024 * 1024);
500 			bwdata[n].repeat = nrepeat;
501 
502 			if (args.tr) {
503 				fprintf(out, "%.7f %.7f %d %d %.7f",
504 					bwdata[n].t, bwdata[n].bps,
505 					bwdata[n].bits, bwdata[n].bits / 8,
506 					bwdata[n].variance);
507 #ifdef HAVE_GETRUSAGE
508 				fprintf(out, " %.7f %.7f %.7f %.7f",
509 					ut1 / (double)TRIALS,
510 					st1 / (double)TRIALS, ut_var, st_var);
511 #endif
512 				fprintf(out, "\n");
513 			}
514 			fflush(out);
515 
516 			free(memtmp);
517 			free(memtmp1);
518 
519 			if (args.tr && printopt) {
520 				fprintf(stderr, " %6.3f Mbps in %.7f sec",
521 					bwdata[n].bps, tlast);
522 #ifdef HAVE_GETRUSAGE
523 				fprintf(stderr,
524 					", avg utime=%.7f avg stime=%.7f, ",
525 					ut1 / (double)TRIALS,
526 					st1 / (double)TRIALS);
527 				fprintf(stderr, "min utime=%.7f stime=%.7f, ",
528 					best_user_time, best_sys_time);
529 				fprintf(stderr, "utime var=%.7f stime var=%.7f",
530 					ut_var, st_var);
531 #endif
532 				fprintf(stderr, "\n");
533 			}
534 		}		/* End of perturbation loop */
535 
536 	}			/* End of main loop  */
537 
538 	if (args.tr)
539 		fclose(out);
540 
541 	CleanUp(&args);
542 	return (0);
543 }
544