1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import logging
7import math
8import numbers
9import time
10import os.path
11
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.common_lib.cros import path_utils
14
15
class NetperfResult(object):
    """Encapsulates logic to parse and represent netperf results."""

    @staticmethod
    def from_netperf_results(test_type, results, duration_seconds):
        """Parse the text output of netperf and return a NetperfResult.

        @param test_type string one of NetperfConfig.TEST_TYPE_* below.
        @param results string raw results from netperf.
        @param duration_seconds float number of seconds the test ran for.
        @return NetperfResult result, or None if |results| has too few lines
                to contain a data row.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        lines = results.splitlines()
        if test_type in NetperfConfig.TCP_STREAM_TESTS:
            # Parses the following (works for TCP_STREAM, TCP_MAERTS and
            # TCP_SENDFILE) and extracts the throughput column.
            #
            # TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to
            # foo.bar.com (10.10.10.3) port 0 AF_INET
            # Recv   Send    Send
            # Socket Socket  Message  Elapsed
            # Size   Size    Size     Time     Throughput
            # bytes  bytes   bytes    secs.    10^6bits/sec
            #
            # 87380  16384  16384    2.00      941.28
            if len(lines) < 7:
                return None

            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(lines[6].split()[4]))
        elif test_type in NetperfConfig.UDP_STREAM_TESTS:
            # Parses the following and extracts the throughput and the
            # number of errors from the first data row.
            #
            # UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0
            # AF_INET to foo.bar.com (10.10.10.3) port 0 AF_INET
            # Socket  Message  Elapsed      Messages
            # Size    Size     Time         Okay Errors   Throughput
            # bytes   bytes    secs            #      #   10^6bits/sec
            #
            # 129024   65507   2.00         3673      0     961.87
            # 131072           2.00         3673            961.87
            if len(lines) < 6:
                return None

            udp_tokens = lines[5].split()
            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(udp_tokens[5]),
                                   errors=float(udp_tokens[4]))
        elif test_type in NetperfConfig.REQUEST_RESPONSE_TESTS:
            # Parses the following, which works for both rr (TCP and UDP)
            # and crr tests, and extracts the transaction rate.
            #
            # TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET
            # to foo.bar.com (10.10.10.3) port 0 AF_INET
            # Local /Remote
            # Socket Size   Request  Resp.   Elapsed  Trans.
            # Send   Recv   Size     Size    Time     Rate
            # bytes  Bytes  bytes    bytes   secs.    per sec
            #
            # 16384  87380  1        1       2.00     14118.53
            # 16384  87380
            if len(lines) < 7:
                return None

            result = NetperfResult(test_type, duration_seconds,
                                   transaction_rate=float(lines[6].split()[5]))
        else:
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)

        logging.info('%r', result)
        return result


    @staticmethod
    def _get_stats(samples, field_name):
        """Compute the mean and sample standard deviation of one field.

        @param samples list of NetperfResult objects.
        @param field_name string name of the attribute to aggregate.
        @return tuple (mean, deviation).  Both are None when |samples| is
                empty or any sample has no value for the field.  The
                deviation is None when there is only one sample.

        """
        # Materialize the values once: they are traversed several times
        # below, which a one-shot iterator would not survive.
        values = [getattr(sample, field_name) for sample in samples]
        if not values or any(value is None for value in values):
            return (None, None)

        count = len(values)
        mean = math.fsum(values) / count
        deviation = None
        if count > 1:
            # Sample standard deviation (Bessel's correction: N - 1).
            squared_diffs = [math.pow(mean - value, 2) for value in values]
            deviation = math.sqrt(math.fsum(squared_diffs) / (count - 1))
        return mean, deviation


    @staticmethod
    def from_samples(samples):
        """Build an averaged NetperfResult from |samples|.

        Calculate a representative sample with averaged values
        and standard deviation from samples.

        @param samples list of NetperfResult objects.
        @return NetperfResult object, or None if |samples| is empty or
                mixes several test types.

        """
        if len(set(sample.test_type for sample in samples)) != 1:
            # We have either no samples or multiple test types.
            return None

        duration_seconds, duration_seconds_dev = NetperfResult._get_stats(
                samples, 'duration_seconds')
        throughput, throughput_dev = NetperfResult._get_stats(
                samples, 'throughput')
        errors, errors_dev = NetperfResult._get_stats(samples, 'errors')
        transaction_rate, transaction_rate_dev = NetperfResult._get_stats(
                samples, 'transaction_rate')
        return NetperfResult(
                samples[0].test_type,
                duration_seconds, duration_seconds_dev=duration_seconds_dev,
                throughput=throughput, throughput_dev=throughput_dev,
                errors=errors, errors_dev=errors_dev,
                transaction_rate=transaction_rate,
                transaction_rate_dev=transaction_rate_dev)


    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return NetperfConfig.test_type_to_human_readable_tag(self.test_type)


    @property
    def tag(self):
        """@return string very short test description."""
        return NetperfConfig.test_type_to_tag(self.test_type)


    def __init__(self, test_type, duration_seconds, duration_seconds_dev=None,
                 throughput=None, throughput_dev=None,
                 errors=None, errors_dev=None,
                 transaction_rate=None, transaction_rate_dev=None):
        """Construct a NetperfResult.

        Each *_dev parameter is the standard deviation of the matching
        measurement, or None when it is not known.

        @param test_type string test type this result belongs to.
        @param duration_seconds float how long the test took.
        @param throughput float test throughput in Mbps.
        @param errors int number of UDP errors in test.
        @param transaction_rate float transactions per second.

        """
        self.test_type = test_type
        self.duration_seconds = duration_seconds
        self.duration_seconds_dev = duration_seconds_dev
        self.throughput = throughput
        self.throughput_dev = throughput_dev
        self.errors = errors
        self.errors_dev = errors_dev
        self.transaction_rate = transaction_rate
        self.transaction_rate_dev = transaction_rate_dev
        if throughput is None and transaction_rate is None and errors is None:
            logging.error('Created a NetperfResult with no data.')


    def __repr__(self):
        fields = ['test_type=%s' % self.test_type]
        # Sort the attributes so the representation is deterministic, and
        # use items() so this works where dict.iteritems() is unavailable.
        fields += ['%s=%0.2f' % item
                   for item in sorted(vars(self).items())
                   if item[1] is not None
                   and isinstance(item[1], numbers.Number)]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))


    def all_deviations_less_than_fraction(self, fraction):
        """Check that this result is "accurate" enough.

        We say that a NetperfResult is "accurate" enough when for each
        measurement X with standard deviation d(X), d(X)/X <= |fraction|.
        Measurements without a value or without a deviation are skipped.

        @param fraction float used in constraint above.
        @return True on above condition.

        """
        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            dev = getattr(self, measurement + '_dev')
            if value is None or dev is None:
                continue

            if not dev and not value:
                # 0/0 is undefined, but take this to be good for our purposes.
                continue

            if dev and not value:
                # Deviation is non-zero, but the average is 0.  Deviation
                # as a fraction of the value is undefined but in theory
                # a "very large number."
                return False

            if dev / value > fraction:
                return False

        return True


    def get_keyval(self, prefix='', suffix=''):
        """Format the available measurements as a keyval dictionary.

        @param prefix string prepended to each key with a '_' separator.
        @param suffix string appended to each key with a '_' separator.
        @return dict mapping measurement names to strings of the form
                'value' or 'value+-deviation', with two decimal places.

        """
        ret = {}
        if prefix:
            prefix = prefix + '_'
        if suffix:
            suffix = '_' + suffix

        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            if value is None:
                continue

            dev = getattr(self, measurement + '_dev')
            margin = '' if dev is None else '+-%0.2f' % dev
            ret[prefix + measurement + suffix] = '%0.2f%s' % (value, margin)
        return ret
235
236
class NetperfAssertion(object):
    """Defines a set of expectations for netperf results."""

    # (lower, upper) pair for one measurement; None means "unbounded".
    _Bound = collections.namedtuple('Bound', ['lower', 'upper'])

    def _passes(self, result, field):
        """Check a single measurement of |result| against its bounds.

        The measurement passes if the interval [value - deviation,
        value + deviation] overlaps the allowed range.

        @param result NetperfResult object produced by a test.
        @param field string name of the measurement to check.
        @return True iff the measurement satisfies the bounds for |field|.

        """
        value = getattr(result, field)
        deviation = getattr(result, field + '_dev')
        bounds = getattr(self, field + '_bounds')
        if bounds[0] is None and bounds[1] is None:
            return True

        if value is None:
            # We have bounds requirements, but no value to check?
            return False

        if deviation is None:
            # Results from a single run carry no deviation; treat the
            # value as exact rather than failing on None arithmetic.
            deviation = 0

        if bounds[0] is not None and bounds[0] > value + deviation:
            return False

        if bounds[1] is not None and bounds[1] < value - deviation:
            return False

        return True


    def __init__(self, duration_seconds_min=None, duration_seconds_max=None,
                 throughput_min=None, throughput_max=None,
                 error_min=None, error_max=None,
                 transaction_rate_min=None, transaction_rate_max=None):
        """Construct a NetperfAssertion.

        Leaving bounds undefined sets them to values which are permissive.

        @param duration_seconds_min float minimal test duration in seconds.
        @param duration_seconds_max float maximal test duration in seconds.
        @param throughput_min float minimal throughput in Mbps.
        @param throughput_max float maximal throughput in Mbps.
        @param error_min int minimal number of UDP frame errors.
        @param error_max int max number of UDP frame errors.
        @param transaction_rate_min float minimal number of transactions
                per second.
        @param transaction_rate_max float max number of transactions per second.

        """
        self.duration_seconds_bounds = self._Bound(duration_seconds_min,
                                                   duration_seconds_max)
        self.throughput_bounds = self._Bound(throughput_min, throughput_max)
        self.errors_bounds = self._Bound(error_min, error_max)
        self.transaction_rate_bounds = self._Bound(transaction_rate_min,
                                                   transaction_rate_max)


    def passes(self, result):
        """Check that a result matches the given assertion.

        @param result NetperfResult object produced by a test.
        @return True iff all of this assertion's bounds pass for the given
                result.

        """
        return all(self._passes(result, field)
                   for field in ['duration_seconds', 'throughput',
                                 'errors', 'transaction_rate'])


    def __repr__(self):
        # A list of pairs keeps the output in constructor-argument order.
        fields = [('duration_seconds_min', self.duration_seconds_bounds.lower),
                  ('duration_seconds_max', self.duration_seconds_bounds.upper),
                  ('throughput_min', self.throughput_bounds.lower),
                  ('throughput_max', self.throughput_bounds.upper),
                  ('error_min', self.errors_bounds.lower),
                  ('error_max', self.errors_bounds.upper),
                  ('transaction_rate_min', self.transaction_rate_bounds.lower),
                  ('transaction_rate_max', self.transaction_rate_bounds.upper)]
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join(['%s=%r' % item
                                      for item in fields
                                      if item[1] is not None]))
317
318
class NetperfConfig(object):
    """Defines a single netperf run."""

    DEFAULT_TEST_TIME = 10
    # Measures how many times we can connect, request a byte, and receive a
    # byte per second.
    TEST_TYPE_TCP_CRR = 'TCP_CRR'
    # MAERTS is stream backwards.  Measure bitrate of a stream from the netperf
    # server to the client.
    TEST_TYPE_TCP_MAERTS = 'TCP_MAERTS'
    # Measures how many times we can request a byte and receive a byte per
    # second.
    TEST_TYPE_TCP_RR = 'TCP_RR'
    # This is like a TCP_STREAM test except that the netperf client will use
    # a platform dependent call like sendfile() rather than the simple send()
    # call.  This can result in better performance.
    TEST_TYPE_TCP_SENDFILE = 'TCP_SENDFILE'
    # Measures throughput sending bytes from the client to the server in a
    # TCP stream.
    TEST_TYPE_TCP_STREAM = 'TCP_STREAM'
    # Measures how many times we can request a byte from the client and receive
    # a byte from the server.  If any datagram is dropped, the client or server
    # will block indefinitely.  This failure is not evident except as a low
    # transaction rate.
    TEST_TYPE_UDP_RR = 'UDP_RR'
    # Test UDP throughput sending from the client to the server.  There is no
    # flow control here, and generally sending is easier than receiving, so
    # there will be two types of throughput, both receiving and sending.
    TEST_TYPE_UDP_STREAM = 'UDP_STREAM'
    # This isn't a real test type, but we can emulate a UDP stream from the
    # server to the DUT by running the netperf server on the DUT and the
    # client on the server and then doing a UDP_STREAM test.
    TEST_TYPE_UDP_MAERTS = 'UDP_MAERTS'
    # Different kinds of tests have different output formats.
    REQUEST_RESPONSE_TESTS = [ TEST_TYPE_TCP_CRR,
                               TEST_TYPE_TCP_RR,
                               TEST_TYPE_UDP_RR ]
    TCP_STREAM_TESTS = [ TEST_TYPE_TCP_MAERTS,
                         TEST_TYPE_TCP_SENDFILE,
                         TEST_TYPE_TCP_STREAM ]
    UDP_STREAM_TESTS = [ TEST_TYPE_UDP_STREAM,
                         TEST_TYPE_UDP_MAERTS ]

    # Concise tags used to label results of each test type.
    SHORT_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_crr',
                   TEST_TYPE_TCP_MAERTS: 'tcp_rx',
                   TEST_TYPE_TCP_RR: 'tcp_rr',
                   TEST_TYPE_TCP_SENDFILE: 'tcp_stx',
                   TEST_TYPE_TCP_STREAM: 'tcp_tx',
                   TEST_TYPE_UDP_RR: 'udp_rr',
                   TEST_TYPE_UDP_STREAM: 'udp_tx',
                   TEST_TYPE_UDP_MAERTS: 'udp_rx' }

    # Longer, human readable tags for each test type.
    READABLE_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_connect_roundtrip_rate',
                      TEST_TYPE_TCP_MAERTS: 'tcp_downstream',
                      TEST_TYPE_TCP_RR: 'tcp_roundtrip_rate',
                      TEST_TYPE_TCP_SENDFILE: 'tcp_upstream_sendfile',
                      TEST_TYPE_TCP_STREAM: 'tcp_upstream',
                      TEST_TYPE_UDP_RR: 'udp_roundtrip',
                      TEST_TYPE_UDP_STREAM: 'udp_upstream',
                      TEST_TYPE_UDP_MAERTS: 'udp_downstream' }


    @staticmethod
    def _assert_is_valid_test_type(test_type):
        """Assert that |test_type| is one of TEST_TYPE_* above.

        @param test_type string test type.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        known_types = (NetperfConfig.REQUEST_RESPONSE_TESTS +
                       NetperfConfig.TCP_STREAM_TESTS +
                       NetperfConfig.UDP_STREAM_TESTS)
        if test_type not in known_types:
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)


    @staticmethod
    def test_type_to_tag(test_type):
        """Convert a test type to a concise unique tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string very short test description, or 'unknown' for an
                unrecognized test type.

        """
        return NetperfConfig.SHORT_TAGS.get(test_type, 'unknown')


    @staticmethod
    def test_type_to_human_readable_tag(test_type):
        """Convert a test type to a unique human readable tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string human readable test description, or 'unknown' for an
                unrecognized test type.

        """
        return NetperfConfig.READABLE_TAGS.get(test_type, 'unknown')


    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return self.test_type_to_human_readable_tag(self.test_type)


    @property
    def netperf_test_type(self):
        """@return string test type suitable for passing to netperf."""
        # UDP_MAERTS is emulated by running a UDP_STREAM test with the
        # roles of the two hosts swapped (see server_serves).
        if self.test_type == self.TEST_TYPE_UDP_MAERTS:
            return self.TEST_TYPE_UDP_STREAM

        return self.test_type


    @property
    def server_serves(self):
        """False iff the server and DUT should switch roles for running netperf.

        @return True iff netserv should be run on server host.  When false
                this indicates that the DUT should run netserv and netperf
                should be run on the server against the client.

        """
        return self.test_type != self.TEST_TYPE_UDP_MAERTS


    @property
    def tag(self):
        """@return string very short test description."""
        return self.test_type_to_tag(self.test_type)


    def __init__(self, test_type, test_time=DEFAULT_TEST_TIME):
        """Construct a NetperfConfig.

        @param test_type string one of TEST_TYPE_* above.
        @param test_time int number of seconds to run the test for.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        self.test_type = test_type
        self.test_time = test_time
        self._assert_is_valid_test_type(self.netperf_test_type)


    def __repr__(self):
        return '%s(test_type=%r, test_time=%r)' % (
                self.__class__.__name__,
                self.test_type,
                self.test_time)
465
466
class NetperfRunner(object):
    """Delegate to run netperf on a client/server pair."""

    NETPERF_DATA_PORT = 12866
    NETPERF_PORT = 12865
    # Seconds to wait after launching netserver before using it.
    NETSERV_STARTUP_WAIT_TIME = 3
    # Extra seconds granted to the netperf command beyond the test time.
    NETPERF_COMMAND_TIMEOUT_MARGIN = 120


    def __init__(self, client_proxy, server_proxy, config):
        """Construct a NetperfRunner.

        @param client_proxy WiFiClient object.
        @param server_proxy LinuxSystem object.
        @param config NetperfConfig object describing the run.

        """
        self._client_proxy = client_proxy
        self._server_proxy = server_proxy
        if config.server_serves:
            self._server_host = server_proxy.host
            self._client_host = client_proxy.host
            self._target_ip = server_proxy.wifi_ip
        else:
            # Roles are swapped: netserver runs on the client proxy's host
            # and netperf runs on the server proxy's host.
            self._server_host = client_proxy.host
            self._client_host = server_proxy.host
            self._target_ip = client_proxy.wifi_ip
        self._command_netserv = path_utils.must_be_installed(
                'netserver', host=self._server_host)
        self._command_netperf = path_utils.must_be_installed(
                'netperf', host=self._client_host)
        self._config = config


    def __enter__(self):
        self._restart_netserv()
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        # Always stop netserver, even if the firewall cleanup fails.
        try:
            self._client_proxy.firewall_cleanup()
        finally:
            self._kill_netserv()


    def _kill_netserv(self):
        """Kills any existing netserv process on the serving host."""
        self._server_host.run('pkill %s' %
                              os.path.basename(self._command_netserv),
                              ignore_status=True)


    def _restart_netserv(self):
        """Kill any running netserv, start a new one and wait for it."""
        logging.info('Starting netserver...')
        self._kill_netserv()
        self._server_host.run('%s -p %d >/dev/null 2>&1' %
                              (self._command_netserv, self.NETPERF_PORT))
        startup_time = time.time()
        self._client_proxy.firewall_open('tcp', self._server_proxy.wifi_ip)
        self._client_proxy.firewall_open('udp', self._server_proxy.wifi_ip)
        # Wait for the netserv to come up.  The time spent opening the
        # firewall counts towards the startup wait.
        remaining = (self.NETSERV_STARTUP_WAIT_TIME -
                     (time.time() - startup_time))
        if remaining > 0:
            time.sleep(remaining)


    def run(self, ignore_failures=False, retry_count=3):
        """Run netperf and take a performance measurement.

        @param ignore_failures bool True iff netperf runs that fail should be
                ignored.  If this happens, run will return a None value rather
                than a NetperfResult.
        @param retry_count int maximum number of times to attempt the
                netperf command.
        @return NetperfResult summarizing a netperf run, or None if the run
                failed and |ignore_failures| is set.
        @raises error.CmdError if netperf exits with a non-zero status and
                |ignore_failures| is not set.
        @raises error.TestFail if netperf produced no result at all and
                |ignore_failures| is not set.

        """
        netperf = '%s -H %s -p %s -t %s -l %d -- -P 0,%d' % (
                self._command_netperf,
                self._target_ip,
                self.NETPERF_PORT,
                self._config.netperf_test_type,
                self._config.test_time,
                self.NETPERF_DATA_PORT)
        logging.debug('Running netperf client.')
        logging.info('Running netperf for %d seconds.', self._config.test_time)
        timeout = self._config.test_time + self.NETPERF_COMMAND_TIMEOUT_MARGIN
        # Pre-initialize so that the code after the loop is well defined
        # even if no attempt is made (retry_count <= 0).
        result = None
        start_time = time.time()
        for _attempt in range(retry_count):
            start_time = time.time()
            result = self._client_host.run(netperf, ignore_status=True,
                                           ignore_timeout=ignore_failures,
                                           timeout=timeout)
            if not result:
                logging.info('Retrying netperf after empty result.')
                continue

            # Exit retry loop on success.
            if not result.exit_status:
                break

            # Only retry for known retryable conditions.
            if 'Interrupted system call' in result.stderr:
                logging.info('Retrying netperf after internal timeout error.')
                continue

            if 'establish the control connection' in result.stdout:
                logging.info('Restarting netserv after client failed connect.')
                self._restart_netserv()
                continue

            # We are in an unhandled error case.
            logging.info('Retrying netperf after an unknown error.')

        # The duration covers only the last attempt.
        duration = time.time() - start_time
        if result is None:
            # Every attempt came back empty, or no attempt was made at all.
            # Check this before touching result.exit_status.
            if ignore_failures:
                return None

            raise error.TestFail('netperf did not produce any result.')

        if result.exit_status:
            if ignore_failures:
                return None

            raise error.CmdError(netperf, result,
                                 "Command returned non-zero exit status")

        return NetperfResult.from_netperf_results(
                self._config.test_type, result.stdout, duration)
587