1 /*****************************************************************************/
2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */
3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */
4 /* */
5 /* This program is free software; you can redistribute it and/or modify */
6 /* it under the terms of the GNU General Public License as published by */
7 /* the Free Software Foundation. You should have received a copy of the */
8 /* GNU General Public License along with this program; if not, write to the */
9 /* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
10 /* */
11 /* Files needed for use: */
12 /* * netpipe.c ---- Driver source */
13 /* * netpipe.h ---- General include file */
14 /* * TCP.c ---- TCP calls source */
15 /* * TCP.h ---- Include file for TCP calls and data structs */
16 /* * MPI.c ---- MPI calls source */
17 /* * MPI.h ---- Include file for MPI calls and data structs */
18 /* * PVM.c ---- PVM calls source */
19 /* * PVM.h ---- Include file for PVM calls and data structs */
20 /*****************************************************************************/
21 #include "netpipe.h"
22
23 extern char *optarg;
24
main(int argc,char * argv[])25 int main(int argc, char *argv[])
26 {
27 FILE *out; /* Output data file */
28 char s[255]; /* Generic string */
29 char *memtmp;
30 char *memtmp1;
31
32 int c, /* option index */
33 i, j, n, nq, /* Loop indices */
34 asyncReceive = 0, /* Pre-post a receive buffer? */
35 bufoffset = 0, /* Align buffer to this */
36 bufalign = 16 * 1024, /* Boundary to align buffer to */
37 errFlag, /* Error occurred in inner testing loop */
38 nrepeat, /* Number of time to do the transmission */
39 len, /* Number of bytes to be transmitted */
40 inc = 0, /* Increment value */
41 trans = -1, /* Transmitter flag. 1 if transmitting. */
42 detailflag = 0, /* Set to examine the signature curve detail */
43 bufszflag = 0, /* Set to change the TCP socket buffer size */
44 pert, /* Perturbation value */
45 start = 1, /* Starting value for signature curve */
46 end = MAXINT, /* Ending value for signature curve */
47 streamopt = 0, /* Streaming mode flag */
48 printopt = 0; /* Debug print statements flag */
49
50 ArgStruct args; /* Argumentsfor all the calls */
51
52 double t, t0, t1, t2, /* Time variables */
53 tlast, /* Time for the last transmission */
54 latency; /* Network message latency */
55
56 Data bwdata[NSAMP]; /* Bandwidth curve data */
57
58 short port = DEFPORT; /* Port number for connection */
59 #ifdef HAVE_GETRUSAGE
60 struct rusage prev_rusage, curr_rusage; /* Resource usage */
61 double user_time, sys_time; /* User & system time used */
62 double best_user_time, best_sys_time; /* Total user & system time used */
63 double ut1, ut2, st1, st2; /* User & system time ctrs for variance */
64 double ut_var, st_var; /* Variance in user & system time */
65 #endif
66
67 #ifdef MPI
68 MPI_Init(&argc, &argv);
69 #endif
70
71 strcpy(s, "NetPIPE.out");
72 #ifndef MPI
73 if (argc < 2)
74 PrintUsage();
75 #endif
76
77 /* Parse the arguments. See Usage for description */
78 while ((c = getopt(argc, argv, "Pstrh:p:o:A:O:l:u:i:b:a")) != -1) {
79 switch (c) {
80 case 'o':
81 strcpy(s, optarg);
82 break;
83
84 case 't':
85 trans = 1;
86 break;
87
88 case 'r':
89 trans = 0;
90 break;
91
92 case 's':
93 streamopt = 1;
94 break;
95
96 case 'l': /*detailflag = 1; */
97 start = atoi(optarg);
98 if (start < 1) {
99 fprintf(stderr, "Need a starting value >= 1\n");
100 exit(743);
101 }
102 break;
103
104 case 'u': /*detailflag = 1; */
105 end = atoi(optarg);
106 break;
107
108 case 'i':
109 detailflag = 1;
110 inc = atoi(optarg);
111 break;
112
113 case 'b':
114 bufszflag = 1;
115 #ifdef TCP
116 args.prot.rcvbufsz = atoi(optarg);
117 args.prot.sndbufsz = args.prot.rcvbufsz;
118 #endif
119 break;
120
121 case 'P':
122 printopt = 1;
123 break;
124
125 case 'A':
126 bufalign = atoi(optarg);
127 break;
128
129 case 'O':
130 bufoffset = atoi(optarg);
131 break;
132
133 case 'p':
134 port = atoi(optarg);
135 break;
136
137 case 'h':
138 if (trans == 1) {
139 args.host = (char *)malloc(strlen(optarg) + 1);
140 strcpy(args.host, optarg);
141 } else {
142 fprintf(stderr,
143 "Error: -t must be specified before -h\n");
144 exit(-11);
145 }
146 break;
147
148 case 'a':
149 asyncReceive = 1;
150 break;
151
152 default:
153 PrintUsage();
154 exit(-12);
155 }
156 }
157 if (start > end) {
158 fprintf(stderr, "Start MUST be LESS than end\n");
159 exit(420132);
160 }
161 #if defined(TCP) || defined(PVM)
162 /*
163 It should be explicitly specified whether this is the transmitter
164 or the receiver.
165 */
166 if (trans < 0) {
167 fprintf(stderr, "Error: either -t or -r must be specified\n");
168 exit(-11);
169 }
170 #endif
171
172 args.nbuff = TRIALS;
173 args.tr = trans;
174 args.port = port;
175
176 #if defined(TCP)
177 if (!bufszflag) {
178 args.prot.sndbufsz = 0;
179 args.prot.rcvbufsz = 0;
180 } else
181 fprintf(stderr, "Send and Recv Buffers are %d bytes\n",
182 args.prot.sndbufsz);
183 #endif
184
185 Setup(&args);
186 Establish(&args);
187
188 if (args.tr) {
189 if ((out = fopen(s, "w")) == NULL) {
190 fprintf(stderr, "Can't open %s for output\n", s);
191 exit(1);
192 }
193 } else
194 out = stdout;
195
196 args.bufflen = 1;
197 args.buff = (char *)malloc(args.bufflen);
198 args.buff1 = (char *)malloc(args.bufflen);
199 if (asyncReceive)
200 PrepareToReceive(&args);
201 Sync(&args);
202 t0 = When();
203 t0 = When();
204 t0 = When();
205 #ifdef HAVE_GETRUSAGE
206 getrusage(RUSAGE_SELF, &prev_rusage);
207 #endif
208 t0 = When();
209 for (i = 0; i < LATENCYREPS; i++) {
210 if (args.tr) {
211 SendData(&args);
212 RecvData(&args);
213 if (asyncReceive && (i < LATENCYREPS - 1)) {
214 PrepareToReceive(&args);
215 }
216 } else {
217 RecvData(&args);
218 if (asyncReceive && (i < LATENCYREPS - 1)) {
219 PrepareToReceive(&args);
220 }
221 SendData(&args);
222 }
223 }
224 latency = (When() - t0) / (2 * LATENCYREPS);
225 #ifdef HAVE_GETRUSAGE
226 getrusage(RUSAGE_SELF, &curr_rusage);
227 #endif
228 free(args.buff);
229 free(args.buff1);
230
231 if (args.tr) {
232 SendTime(&args, &latency);
233 } else {
234 RecvTime(&args, &latency);
235 }
236 if (args.tr && printopt) {
237 fprintf(stderr, "Latency: %.7f\n", latency);
238 fprintf(stderr, "Now starting main loop\n");
239 }
240 tlast = latency;
241 if (inc == 0) {
242 /* Set a starting value for the message size increment. */
243 inc = (start > 1) ? start / 2 : 1;
244 }
245
246 /* Main loop of benchmark */
247 for (nq = n = 0, len = start, errFlag = 0;
248 n < NSAMP - 3 && tlast < STOPTM && len <= end && !errFlag;
249 len = len + inc, nq++) {
250 if (nq > 2 && !detailflag) {
251 /*
252 This has the effect of exponentially increasing the block
253 size. If detailflag is false, then the block size is
254 linearly increased (the increment is not adjusted).
255 */
256 inc = ((nq % 2)) ? inc + inc : inc;
257 }
258
259 /* This is a perturbation loop to test nearby values */
260 for (pert = (!detailflag && inc > PERT + 1) ? -PERT : 0;
261 pert <= PERT;
262 n++, pert += (!detailflag
263 && inc > PERT + 1) ? PERT : PERT + 1) {
264
265 /* Calculate how many times to repeat the experiment. */
266 if (args.tr) {
267 nrepeat = MAX((RUNTM / ((double)args.bufflen /
268 (args.bufflen - inc +
269 1.0) * tlast)),
270 TRIALS);
271 SendRepeat(&args, nrepeat);
272 } else {
273 RecvRepeat(&args, &nrepeat);
274 }
275
276 /* Allocate the buffer */
277 args.bufflen = len + pert;
278 if ((args.buff =
279 (char *)malloc(args.bufflen + bufalign)) ==
280 (char *)NULL) {
281 fprintf(stderr, "Couldn't allocate memory\n");
282 errFlag = -1;
283 break;
284 }
285 if ((args.buff1 =
286 (char *)malloc(args.bufflen + bufalign)) ==
287 (char *)NULL) {
288 fprintf(stderr, "Couldn't allocate memory\n");
289 errFlag = -1;
290 break;
291 }
292 /*
293 Possibly align the data buffer: make memtmp and memtmp1
294 point to the original blocks (so they can be freed later),
295 then adjust args.buff and args.buff1 if the user requested it.
296 */
297 memtmp = args.buff;
298 memtmp1 = args.buff1;
299 if (bufalign != 0)
300 args.buff += (bufalign -
301 ((int)(*args.buff) % bufalign) +
302 bufoffset) % bufalign;
303
304 if (bufalign != 0)
305 args.buff1 += (bufalign -
306 ((int)(*args.buff1) % bufalign) +
307 bufoffset) % bufalign;
308
309 if (args.tr && printopt)
310 fprintf(stderr, "%3d: %9d bytes %4d times --> ",
311 n, args.bufflen, nrepeat);
312
313 /* Finally, we get to transmit or receive and time */
314 if (args.tr) {
315 /*
316 This is the transmitter: send the block TRIALS times, and
317 if we are not streaming, expect the receiver to return each
318 block.
319 */
320 bwdata[n].t = LONGTIME;
321 t2 = t1 = 0;
322 #ifdef HAVE_GETRUSAGE
323 ut1 = ut2 = st1 = st2 = 0.0;
324 best_user_time = best_sys_time = LONGTIME;
325 #endif
326 for (i = 0; i < TRIALS; i++) {
327 Sync(&args);
328 #ifdef HAVE_GETRUSAGE
329 getrusage(RUSAGE_SELF, &prev_rusage);
330 #endif
331 t0 = When();
332 for (j = 0; j < nrepeat; j++) {
333 if (asyncReceive && !streamopt) {
334 PrepareToReceive(&args);
335 }
336 SendData(&args);
337 if (!streamopt) {
338 RecvData(&args);
339 }
340 }
341 t = (When() -
342 t0) / ((1 + !streamopt) * nrepeat);
343 #ifdef HAVE_GETRUSAGE
344 getrusage(RUSAGE_SELF, &curr_rusage);
345 user_time =
346 ((curr_rusage.ru_utime.tv_sec -
347 prev_rusage.ru_utime.tv_sec) +
348 (double)
349 (curr_rusage.ru_utime.tv_usec -
350 prev_rusage.ru_utime.tv_usec) *
351 1.0E-6) / ((1 +
352 !streamopt) * nrepeat);
353 sys_time =
354 ((curr_rusage.ru_stime.tv_sec -
355 prev_rusage.ru_stime.tv_sec) +
356 (double)
357 (curr_rusage.ru_stime.tv_usec -
358 prev_rusage.ru_stime.tv_usec) *
359 1.0E-6) / ((1 +
360 !streamopt) * nrepeat);
361 ut2 += user_time * user_time;
362 st2 += sys_time * sys_time;
363 ut1 += user_time;
364 st1 += sys_time;
365 if ((user_time + sys_time) <
366 (best_user_time + best_sys_time)) {
367 best_user_time = user_time;
368 best_sys_time = sys_time;
369 }
370 #endif
371
372 if (!streamopt) {
373 t2 += t * t;
374 t1 += t;
375 bwdata[n].t =
376 MIN(bwdata[n].t, t);
377 }
378 }
379 if (!streamopt)
380 SendTime(&args, &bwdata[n].t);
381 else
382 RecvTime(&args, &bwdata[n].t);
383
384 if (!streamopt)
385 bwdata[n].variance =
386 t2 / TRIALS -
387 t1 / TRIALS * t1 / TRIALS;
388
389 #ifdef HAVE_GETRUSAGE
390 ut_var =
391 ut2 / TRIALS -
392 (ut1 / TRIALS) * (ut1 / TRIALS);
393 st_var =
394 st2 / TRIALS -
395 (st1 / TRIALS) * (st1 / TRIALS);
396 #endif
397
398 } else {
399 /*
400 This is the receiver: receive the block TRIALS times, and
401 if we are not streaming, send the block back to the
402 sender.
403 */
404 bwdata[n].t = LONGTIME;
405 t2 = t1 = 0;
406 for (i = 0; i < TRIALS; i++) {
407 if (asyncReceive) {
408 PrepareToReceive(&args);
409 }
410 Sync(&args);
411 t0 = When();
412 for (j = 0; j < nrepeat; j++) {
413 RecvData(&args);
414 if (asyncReceive
415 && (j < nrepeat - 1)) {
416 PrepareToReceive(&args);
417 }
418 if (!streamopt)
419 SendData(&args);
420 }
421 t = (When() -
422 t0) / ((1 + !streamopt) * nrepeat);
423
424 if (streamopt) {
425 t2 += t * t;
426 t1 += t;
427 bwdata[n].t =
428 MIN(bwdata[n].t, t);
429 }
430 }
431 if (streamopt)
432 SendTime(&args, &bwdata[n].t);
433 else
434 RecvTime(&args, &bwdata[n].t);
435
436 if (streamopt)
437 bwdata[n].variance =
438 t2 / TRIALS -
439 t1 / TRIALS * t1 / TRIALS;
440
441 }
442 tlast = bwdata[n].t;
443 bwdata[n].bits = args.bufflen * CHARSIZE;
444 bwdata[n].bps =
445 bwdata[n].bits / (bwdata[n].t * 1024 * 1024);
446 bwdata[n].repeat = nrepeat;
447
448 if (args.tr) {
449 fprintf(out, "%.7f %.7f %d %d %.7f",
450 bwdata[n].t, bwdata[n].bps,
451 bwdata[n].bits, bwdata[n].bits / 8,
452 bwdata[n].variance);
453 #ifdef HAVE_GETRUSAGE
454 fprintf(out, " %.7f %.7f %.7f %.7f",
455 ut1 / (double)TRIALS,
456 st1 / (double)TRIALS, ut_var, st_var);
457 #endif
458 fprintf(out, "\n");
459 }
460 fflush(out);
461
462 free(memtmp);
463 free(memtmp1);
464
465 if (args.tr && printopt) {
466 fprintf(stderr, " %6.2f Mbps in %.7f sec",
467 bwdata[n].bps, tlast);
468 #ifdef HAVE_GETRUSAGE
469 fprintf(stderr,
470 ", avg utime=%.7f avg stime=%.7f, ",
471 ut1 / (double)TRIALS,
472 st1 / (double)TRIALS);
473 fprintf(stderr, "min utime=%.7f stime=%.7f, ",
474 best_user_time, best_sys_time);
475 fprintf(stderr, "utime var=%.7f stime var=%.7f",
476 ut_var, st_var);
477 #endif
478 fprintf(stderr, "\n");
479 }
480 } /* End of perturbation loop */
481
482 } /* End of main loop */
483
484 if (args.tr)
485 fclose(out);
486
487 CleanUp(&args);
488 return (0);
489 }
490
/*
 * Return the current time of day in seconds as a double, combining the
 * seconds and microseconds reported by gettimeofday().
 */
double When(void)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    return (double)now.tv_sec + (double)now.tv_usec * 1e-6;
}
498
/*
 * Print the summary of command-line options to stdout and terminate the
 * program with exit status -12.  This function does not return.
 * The TCP-only options (-b, -h, -p) are listed only when TCP is defined.
 */
void PrintUsage(void)
{
    printf("\n NETPIPE USAGE \n\n");
    printf("A: specify buffers alignment e.g.: <-A 1024>\n");
    printf("a: asynchronous receive (a.k.a. preposted receive)\n");
#if defined(TCP)
    printf("b: specify send and receive buffer sizes e.g. <-b 32768>\n");
    printf("h: specify hostname <-h host>\n");
#endif
    printf("i: specify increment step size e.g. <-i 64>\n");
    /* The example previously showed -i, which is the increment option. */
    printf("l: lower bound start value e.g. <-l 1>\n");
    printf("O: specify buffer offset e.g. <-O 127>\n");
    printf("o: specify output filename <-o fn>\n");
    printf("P: print on screen\n");
#if defined(TCP)
    printf("p: specify port e.g. <-p 5150>\n");
#endif
    printf("r: receiver\n");
    printf("s: stream option\n");
    printf("t: transmitter\n");
    printf("u: upper bound stop value e.g. <-u 1048576>\n");
    printf("\n");
    exit(-12);
}
523