1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import logging
7import math
8import numbers
9import re
10import time
11import os.path
12
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.common_lib.cros import path_utils
15
16
17class NetperfResult(object):
18    """Encapsulates logic to parse and represent netperf results."""
19
20    @staticmethod
21    def from_netperf_results(test_type, results, duration_seconds):
22        """Parse the text output of netperf and return a NetperfResult.
23
24        @param test_type string one of NetperfConfig.TEST_TYPE_* below.
25        @param results string raw results from netperf.
26        @param duration_seconds float number of seconds the test ran for.
27        @return NetperfResult result.
28
29        """
30        lines = results.splitlines()
31
32        # Include only results lines, which should start with a number. This
33        # helps eliminate inconsistent output, e.g., from benign warnings
34        # like:
35        #   catcher: timer popped with times_up != 0
36        lines = [l for l in lines if re.match('[0-9]+', l.strip())]
37
38        if test_type in NetperfConfig.TCP_STREAM_TESTS:
39            """Parses the following (works for both TCP_STREAM, TCP_MAERTS and
40            TCP_SENDFILE) and returns a singleton containing throughput.
41
42            TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to \
43            foo.bar.com (10.10.10.3) port 0 AF_INET
44            Recv   Send    Send
45            Socket Socket  Message  Elapsed
46            Size   Size    Size     Time     Throughput
47            bytes  bytes   bytes    secs.    10^6bits/sec
48
49            87380  16384  16384    2.00      941.28
50            """
51            if len(lines) < 1:
52                return None
53
54            result = NetperfResult(test_type, duration_seconds,
55                                   throughput=float(lines[0].split()[4]))
56        elif test_type in NetperfConfig.UDP_STREAM_TESTS:
57            """Parses the following and returns a tuple containing throughput
58            and the number of errors.
59
60            UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
61            to foo.bar.com (10.10.10.3) port 0 AF_INET
62            Socket  Message  Elapsed      Messages
63            Size    Size     Time         Okay Errors   Throughput
64            bytes   bytes    secs            #      #   10^6bits/sec
65
66            129024   65507   2.00         3673      0     961.87
67            131072           2.00         3673            961.87
68            """
69            if len(lines) < 1:
70                return None
71
72            udp_tokens = lines[0].split()
73            result = NetperfResult(test_type, duration_seconds,
74                                   throughput=float(udp_tokens[5]),
75                                   errors=float(udp_tokens[4]))
76        elif test_type in NetperfConfig.REQUEST_RESPONSE_TESTS:
77            """Parses the following which works for both rr (TCP and UDP)
78            and crr tests and returns a singleton containing transfer rate.
79
80            TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
81            to foo.bar.com (10.10.10.3) port 0 AF_INET
82            Local /Remote
83            Socket Size   Request  Resp.   Elapsed  Trans.
84            Send   Recv   Size     Size    Time     Rate
85            bytes  Bytes  bytes    bytes   secs.    per sec
86
87            16384  87380  1        1       2.00     14118.53
88            16384  87380
89            """
90            if len(lines) < 1:
91                return None
92
93            result = NetperfResult(test_type, duration_seconds,
94                                   transaction_rate=float(lines[0].split()[5]))
95        else:
96            raise error.TestFail('Invalid netperf test type: %r.' % test_type)
97
98        logging.info('%r', result)
99        return result
100
101
102    @staticmethod
103    def _get_stats(samples, field_name):
104        if any(map(lambda x: getattr(x, field_name) is None, samples)):
105            return (None, None)
106
107        values = map(lambda x: getattr(x, field_name), samples)
108        N = len(samples)
109        mean = math.fsum(values) / N
110        deviation = None
111        if N > 1:
112            differences = map(lambda x: math.pow(mean - x, 2), values)
113            deviation = math.sqrt(math.fsum(differences) / (N - 1))
114        return mean, deviation
115
116
117    @staticmethod
118    def from_samples(samples):
119        """Build an averaged NetperfResult from |samples|.
120
121        Calculate an representative sample with averaged values
122        and standard deviation from samples.
123
124        @param samples list of NetperfResult objects.
125        @return NetperfResult object.
126
127        """
128        if len(set([x.test_type for x in samples])) != 1:
129            # We have either no samples or multiple test types.
130            return None
131
132        duration_seconds, duration_seconds_dev = NetperfResult._get_stats(
133                samples, 'duration_seconds')
134        throughput, throughput_dev = NetperfResult._get_stats(
135                samples, 'throughput')
136        errors, errors_dev = NetperfResult._get_stats(samples, 'errors')
137        transaction_rate, transaction_rate_dev = NetperfResult._get_stats(
138                samples, 'transaction_rate')
139        return NetperfResult(
140                samples[0].test_type,
141                duration_seconds, duration_seconds_dev=duration_seconds_dev,
142                throughput=throughput, throughput_dev=throughput_dev,
143                errors=errors, errors_dev=errors_dev,
144                transaction_rate=transaction_rate,
145                transaction_rate_dev=transaction_rate_dev)
146
147
148    @property
149    def human_readable_tag(self):
150        """@return string human readable test description."""
151        return NetperfConfig.test_type_to_human_readable_tag(self.test_type)
152
153
154    @property
155    def tag(self):
156        """@return string very short test description."""
157        return NetperfConfig.test_type_to_tag(self.test_type)
158
159
160    def __init__(self, test_type, duration_seconds, duration_seconds_dev=None,
161                 throughput=None, throughput_dev=None,
162                 errors=None, errors_dev=None,
163                 transaction_rate=None, transaction_rate_dev=None):
164        """Construct a NetperfResult.
165
166        @param duration_seconds float how long the test took.
167        @param throughput float test throughput in Mbps.
168        @param errors int number of UDP errors in test.
169        @param transaction_rate float transactions per second.
170
171        """
172        self.test_type = test_type
173        self.duration_seconds = duration_seconds
174        self.duration_seconds_dev = duration_seconds_dev
175        self.throughput = throughput
176        self.throughput_dev = throughput_dev
177        self.errors = errors
178        self.errors_dev = errors_dev
179        self.transaction_rate = transaction_rate
180        self.transaction_rate_dev = transaction_rate_dev
181        if throughput is None and transaction_rate is None and errors is None:
182            logging.error('Created a NetperfResult with no data.')
183
184
185    def __repr__(self):
186        fields = ['test_type=%s' % self.test_type]
187        fields += ['%s=%0.2f' % item
188                   for item in vars(self).iteritems()
189                   if item[1] is not None
190                   and isinstance(item[1], numbers.Number)]
191        return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))
192
193
194    def all_deviations_less_than_fraction(self, fraction):
195        """Check that this result is "acurate" enough.
196
197        We say that a NetperfResult is "acurate" enough when for each
198        measurement X with standard deviation d(X), d(X)/X <= |fraction|.
199
200        @param fraction float used in constraint above.
201        @return True on above condition.
202
203        """
204        for measurement in ['throughput', 'errors', 'transaction_rate']:
205            value = getattr(self, measurement)
206            dev = getattr(self, measurement + '_dev')
207            if value is None or dev is None:
208                continue
209
210            if not dev and not value:
211                # 0/0 is undefined, but take this to be good for our purposes.
212                continue
213
214            if dev and not value:
215                # Deviation is non-zero, but the average is 0.  Deviation
216                # as a fraction of the value is undefined but in theory
217                # a "very large number."
218                return False
219
220            if dev / value > fraction:
221                return False
222
223        return True
224
225
226    def get_keyval(self, prefix='', suffix=''):
227        ret = {}
228        if prefix:
229            prefix = prefix + '_'
230        if suffix:
231            suffix = '_' + suffix
232
233        for measurement in ['throughput', 'errors', 'transaction_rate']:
234            value = getattr(self, measurement)
235            dev = getattr(self, measurement + '_dev')
236            if dev is None:
237                margin = ''
238            else:
239                margin = '+-%0.2f' % dev
240            if value is not None:
241                ret[prefix + measurement + suffix] = '%0.2f%s' % (value, margin)
242        return ret
243
244
245class NetperfAssertion(object):
246    """Defines a set of expectations for netperf results."""
247
248    def _passes(self, result, field):
249        value = getattr(result, field)
250        deviation = getattr(result, field + '_dev')
251        bounds = getattr(self, field + '_bounds')
252        if bounds[0] is None and bounds[1] is None:
253            return True
254
255        if value is None:
256            # We have bounds requirements, but no value to check?
257            return False
258
259        if bounds[0] is not None and bounds[0] > value + deviation:
260            return False
261
262        if bounds[1] is not None and bounds[1] < value - deviation:
263            return False
264
265        return True
266
267
268    def __init__(self, duration_seconds_min=None, duration_seconds_max=None,
269                 throughput_min=None, throughput_max=None,
270                 error_min=None, error_max=None,
271                 transaction_rate_min=None, transaction_rate_max=None):
272        """Construct a NetperfAssertion.
273
274        Leaving bounds undefined sets them to values which are permissive.
275
276        @param duration_seconds_min float minimal test duration in seconds.
277        @param duration_seconds_max float maximal test duration in seconds.
278        @param throughput_min float minimal throughput in Mbps.
279        @param throughput_max float maximal throughput in Mbps.
280        @param error_min int minimal number of UDP frame errors.
281        @param error_max int max number of UDP frame errors.
282        @param transaction_rate_min float minimal number of transactions
283                per second.
284        @param transaction_rate_max float max number of transactions per second.
285
286        """
287        Bound = collections.namedtuple('Bound', ['lower', 'upper'])
288        self.duration_seconds_bounds = Bound(duration_seconds_min,
289                                             duration_seconds_max)
290        self.throughput_bounds = Bound(throughput_min, throughput_max)
291        self.errors_bounds = Bound(error_min, error_max)
292        self.transaction_rate_bounds = Bound(transaction_rate_min,
293                                             transaction_rate_max)
294
295
296    def passes(self, result):
297        """Check that a result matches the given assertion.
298
299        @param result NetperfResult object produced by a test.
300        @return True iff all this assertion passes for the give result.
301
302        """
303        passed = [self._passes(result, field)
304                  for field in ['duration_seconds', 'throughput',
305                                'errors', 'transaction_rate']]
306        if all(passed):
307            return True
308
309        return False
310
311
312    def __repr__(self):
313        fields = {'duration_seconds_min': self.duration_seconds_bounds.lower,
314                  'duration_seconds_max': self.duration_seconds_bounds.upper,
315                  'throughput_min': self.throughput_bounds.lower,
316                  'throughput_max': self.throughput_bounds.upper,
317                  'error_min': self.errors_bounds.lower,
318                  'error_max': self.errors_bounds.upper,
319                  'transaction_rate_min': self.transaction_rate_bounds.lower,
320                  'transaction_rate_max': self.transaction_rate_bounds.upper}
321        return '%s(%s)' % (self.__class__.__name__,
322                           ', '.join(['%s=%r' % item
323                                      for item in fields.iteritems()
324                                      if item[1] is not None]))
325
326
327class NetperfConfig(object):
328    """Defines a single netperf run."""
329
330    DEFAULT_TEST_TIME = 10
331    # Measures how many times we can connect, request a byte, and receive a
332    # byte per second.
333    TEST_TYPE_TCP_CRR = 'TCP_CRR'
334    # MAERTS is stream backwards.  Measure bitrate of a stream from the netperf
335    # server to the client.
336    TEST_TYPE_TCP_MAERTS = 'TCP_MAERTS'
337    # Measures how many times we can request a byte and receive a byte per
338    # second.
339    TEST_TYPE_TCP_RR = 'TCP_RR'
340    # This is like a TCP_STREAM test except that the netperf client will use
341    # a platform dependent call like sendfile() rather than the simple send()
342    # call.  This can result in better performance.
343    TEST_TYPE_TCP_SENDFILE = 'TCP_SENDFILE'
344    # Measures throughput sending bytes from the client to the server in a
345    # TCP stream.
346    TEST_TYPE_TCP_STREAM = 'TCP_STREAM'
347    # Measures how many times we can request a byte from the client and receive
348    # a byte from the server.  If any datagram is dropped, the client or server
349    # will block indefinitely.  This failure is not evident except as a low
350    # transaction rate.
351    TEST_TYPE_UDP_RR = 'UDP_RR'
352    # Test UDP throughput sending from the client to the server.  There is no
353    # flow control here, and generally sending is easier that receiving, so
354    # there will be two types of throughput, both receiving and sending.
355    TEST_TYPE_UDP_STREAM = 'UDP_STREAM'
356    # This isn't a real test type, but we can emulate a UDP stream from the
357    # server to the DUT by running the netperf server on the DUT and the
358    # client on the server and then doing a UDP_STREAM test.
359    TEST_TYPE_UDP_MAERTS = 'UDP_MAERTS'
360    # Different kinds of tests have different output formats.
361    REQUEST_RESPONSE_TESTS = [ TEST_TYPE_TCP_CRR,
362                               TEST_TYPE_TCP_RR,
363                               TEST_TYPE_UDP_RR ]
364    TCP_STREAM_TESTS = [ TEST_TYPE_TCP_MAERTS,
365                         TEST_TYPE_TCP_SENDFILE,
366                         TEST_TYPE_TCP_STREAM ]
367    UDP_STREAM_TESTS = [ TEST_TYPE_UDP_STREAM,
368                         TEST_TYPE_UDP_MAERTS ]
369
370    SHORT_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_crr',
371                   TEST_TYPE_TCP_MAERTS: 'tcp_rx',
372                   TEST_TYPE_TCP_RR: 'tcp_rr',
373                   TEST_TYPE_TCP_SENDFILE: 'tcp_stx',
374                   TEST_TYPE_TCP_STREAM: 'tcp_tx',
375                   TEST_TYPE_UDP_RR: 'udp_rr',
376                   TEST_TYPE_UDP_STREAM: 'udp_tx',
377                   TEST_TYPE_UDP_MAERTS: 'udp_rx' }
378
379    READABLE_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_connect_roundtrip_rate',
380                      TEST_TYPE_TCP_MAERTS: 'tcp_downstream',
381                      TEST_TYPE_TCP_RR: 'tcp_roundtrip_rate',
382                      TEST_TYPE_TCP_SENDFILE: 'tcp_upstream_sendfile',
383                      TEST_TYPE_TCP_STREAM: 'tcp_upstream',
384                      TEST_TYPE_UDP_RR: 'udp_roundtrip',
385                      TEST_TYPE_UDP_STREAM: 'udp_upstream',
386                      TEST_TYPE_UDP_MAERTS: 'udp_downstream' }
387
388
389    @staticmethod
390    def _assert_is_valid_test_type(test_type):
391        """Assert that |test_type| is one of TEST_TYPE_* above.
392
393        @param test_type string test type.
394
395        """
396        if (test_type not in NetperfConfig.REQUEST_RESPONSE_TESTS and
397            test_type not in NetperfConfig.TCP_STREAM_TESTS and
398            test_type not in NetperfConfig.UDP_STREAM_TESTS):
399            raise error.TestFail('Invalid netperf test type: %r.' % test_type)
400
401
402    @staticmethod
403    def test_type_to_tag(test_type):
404        """Convert a test type to a concise unique tag.
405
406        @param test_type string, one of TEST_TYPE_* above.
407        @return string very short test description.
408
409        """
410        return NetperfConfig.SHORT_TAGS.get(test_type, 'unknown')
411
412
413    @staticmethod
414    def test_type_to_human_readable_tag(test_type):
415        """Convert a test type to a unique human readable tag.
416
417        @param test_type string, one of TEST_TYPE_* above.
418        @return string human readable test description.
419
420        """
421        return NetperfConfig.READABLE_TAGS.get(test_type, 'unknown')
422
423    @property
424    def human_readable_tag(self):
425        """@return string human readable test description."""
426        return self.test_type_to_human_readable_tag(self.test_type)
427
428
429    @property
430    def netperf_test_type(self):
431        """@return string test type suitable for passing to netperf."""
432        if self.test_type == self.TEST_TYPE_UDP_MAERTS:
433            return self.TEST_TYPE_UDP_STREAM
434
435        return self.test_type
436
437
438    @property
439    def server_serves(self):
440        """False iff the server and DUT should switch roles for running netperf.
441
442        @return True iff netserv should be run on server host.  When false
443                this indicates that the DUT should run netserv and netperf
444                should be run on the server against the client.
445
446        """
447        return self.test_type != self.TEST_TYPE_UDP_MAERTS
448
449
450    @property
451    def tag(self):
452        """@return string very short test description."""
453        return self.test_type_to_tag(self.test_type)
454
455
456    def __init__(self, test_type, test_time=DEFAULT_TEST_TIME):
457        """Construct a NetperfConfig.
458
459        @param test_type string one of TEST_TYPE_* above.
460        @param test_time int number of seconds to run the test for.
461
462        """
463        self.test_type = test_type
464        self.test_time = test_time
465        self._assert_is_valid_test_type(self.netperf_test_type)
466
467
468    def __repr__(self):
469        return '%s(test_type=%r, test_time=%r' % (
470                self.__class__.__name__,
471                self.test_type,
472                self.test_time)
473
474
475class NetperfRunner(object):
476    """Delegate to run netperf on a client/server pair."""
477
478    NETPERF_DATA_PORT = 12866
479    NETPERF_PORT = 12865
480    NETSERV_STARTUP_WAIT_TIME = 3
481    NETPERF_COMMAND_TIMEOUT_MARGIN = 60
482
483
484    def __init__(self, client_proxy, server_proxy, config):
485        """Construct a NetperfRunner.
486
487        @param client WiFiClient object.
488        @param server LinuxSystem object.
489
490        """
491        self._client_proxy = client_proxy
492        self._server_proxy = server_proxy
493        if config.server_serves:
494            self._server_host = server_proxy.host
495            self._client_host = client_proxy.host
496            self._target_ip = server_proxy.wifi_ip
497        else:
498            self._server_host = client_proxy.host
499            self._client_host = server_proxy.host
500            self._target_ip = client_proxy.wifi_ip
501        self._command_netserv = path_utils.must_be_installed(
502                'netserver', host=self._server_host)
503        self._command_netperf = path_utils.must_be_installed(
504                'netperf', host=self._client_host)
505        self._config = config
506
507
508    def __enter__(self):
509        self._restart_netserv()
510        return self
511
512
513    def __exit__(self, exc_type, exc_value, traceback):
514        self._client_proxy.firewall_cleanup()
515        self._kill_netserv()
516
517
518    def _kill_netserv(self):
519        """Kills any existing netserv process on the serving host."""
520        self._server_host.run('pkill %s' %
521                              os.path.basename(self._command_netserv),
522                              ignore_status=True)
523
524
525    def _restart_netserv(self):
526        logging.info('Starting netserver...')
527        self._kill_netserv()
528        self._server_host.run('%s -p %d' %
529                              (self._command_netserv, self.NETPERF_PORT))
530        startup_time = time.time()
531        self._client_proxy.firewall_open('tcp', self._server_proxy.wifi_ip)
532        self._client_proxy.firewall_open('udp', self._server_proxy.wifi_ip)
533        # Wait for the netserv to come up.
534        while time.time() - startup_time < self.NETSERV_STARTUP_WAIT_TIME:
535            time.sleep(0.1)
536
537
538    def run(self, ignore_failures=False, retry_count=3):
539        """Run netperf and take a performance measurement.
540
541        @param ignore_failures bool True iff netperf runs that fail should be
542                ignored.  If this happens, run will return a None value rather
543                than a NetperfResult.
544        @param retry_count int number of times to retry the netperf command if
545                it fails due to an internal timeout within netperf.
546        @return NetperfResult summarizing a netperf run.
547
548        """
549        netperf = '%s -H %s -p %s -t %s -l %d -- -P 0,%d' % (
550                self._command_netperf,
551                self._target_ip,
552                self.NETPERF_PORT,
553                self._config.netperf_test_type,
554                self._config.test_time,
555                self.NETPERF_DATA_PORT)
556        logging.debug('Running netperf client.')
557        logging.info('Running netperf for %d seconds.', self._config.test_time)
558        timeout = self._config.test_time + self.NETPERF_COMMAND_TIMEOUT_MARGIN
559        for _ in range(retry_count):
560            start_time = time.time()
561            result = self._client_host.run(netperf, ignore_status=True,
562                                           ignore_timeout=ignore_failures,
563                                           timeout=timeout)
564            if not result:
565                logging.info('Retrying netperf after empty result.')
566                continue
567
568            # Exit retry loop on success.
569            if not result.exit_status:
570                break
571
572            # Only retry for known retryable conditions.
573            if 'Interrupted system call' in result.stderr:
574                logging.info('Retrying netperf after internal timeout error.')
575                continue
576
577            if 'establish the control connection' in result.stdout:
578                logging.info('Restarting netserv after client failed connect.')
579                self._restart_netserv()
580                continue
581
582            # We are in an unhandled error case.
583            logging.info('Retrying netperf after an unknown error.')
584
585        if ignore_failures and (result is None or result.exit_status):
586            return None
587
588        if result is None:
589            raise error.TestFail("No results; cmd: %s", netperf)
590
591        if result.exit_status:
592            raise error.CmdError(netperf, result,
593                                 "Command returned non-zero exit status")
594
595        duration = time.time() - start_time
596        return NetperfResult.from_netperf_results(
597                self._config.test_type, result.stdout, duration)
598