1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19import math
20import os
21import shlex
22import subprocess
23import threading
24import time
25
26from acts import context
27from acts import utils
28from acts.controllers.android_device import AndroidDevice
29from acts.controllers.utils_lib.ssh import connection
30from acts.controllers.utils_lib.ssh import settings
31from acts.event import event_bus
32from acts.event.decorators import subscribe_static
33from acts.event.event import TestClassBeginEvent
34from acts.event.event import TestClassEndEvent
35from acts.libs.proc import job
36
37MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer'
38ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'
39KILOBITS = 1024
40MEGABITS = KILOBITS * 1024
41GIGABITS = MEGABITS * 1024
42BITS_IN_BYTE = 8
43
44
45def create(configs):
46    """ Factory method for iperf servers.
47
48    The function creates iperf servers based on at least one config.
49    If configs only specify a port number, a regular local IPerfServer object
50    will be created. If configs contains ssh settings or and AndroidDevice,
51    remote iperf servers will be started on those devices
52
53    Args:
54        configs: config parameters for the iperf server
55    """
56    results = []
57    for c in configs:
58        if type(c) in (str, int) and str(c).isdigit():
59            results.append(IPerfServer(int(c)))
60        elif type(c) is dict and 'AndroidDevice' in c and 'port' in c:
61            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
62        elif type(c) is dict and 'ssh_config' in c and 'port' in c:
63            results.append(
64                IPerfServerOverSsh(c['ssh_config'],
65                                   c['port'],
66                                   test_interface=c.get('test_interface'),
67                                   use_killall=c.get('use_killall')))
68        else:
69            raise ValueError(
70                'Config entry %s in %s is not a valid IPerfServer '
71                'config.' % (repr(c), configs))
72    return results
73
74
75def get_info(iperf_servers):
76    """Placeholder for info about iperf servers
77
78    Returns:
79        None
80    """
81    return None
82
83
84def destroy(iperf_server_list):
85    for iperf_server in iperf_server_list:
86        try:
87            iperf_server.stop()
88        except Exception:
89            logging.exception('Unable to properly clean up %s.' % iperf_server)
90
91
92class IPerfResult(object):
93    def __init__(self, result_path, reporting_speed_units='Mbytes'):
94        """Loads iperf result from file.
95
96        Loads iperf result from JSON formatted server log. File can be accessed
97        before or after server is stopped. Note that only the first JSON object
98        will be loaded and this funtion is not intended to be used with files
99        containing multiple iperf client runs.
100        """
101        # if result_path isn't a path, treat it as JSON
102        self.reporting_speed_units = reporting_speed_units
103        if not os.path.exists(result_path):
104            self.result = json.loads(result_path)
105        else:
106            try:
107                with open(result_path, 'r') as f:
108                    iperf_output = f.readlines()
109                    if '}\n' in iperf_output:
110                        iperf_output = iperf_output[:iperf_output.index('}\n'
111                                                                        ) + 1]
112                    iperf_string = ''.join(iperf_output)
113                    iperf_string = iperf_string.replace('nan', '0')
114                    self.result = json.loads(iperf_string)
115            except ValueError:
116                with open(result_path, 'r') as f:
117                    # Possibly a result from interrupted iperf run,
118                    # skip first line and try again.
119                    lines = f.readlines()[1:]
120                    self.result = json.loads(''.join(lines))
121
122    def _has_data(self):
123        """Checks if the iperf result has valid throughput data.
124
125        Returns:
126            True if the result contains throughput data. False otherwise.
127        """
128        return ('end' in self.result) and ('sum_received' in self.result['end']
129                                           or 'sum' in self.result['end'])
130
131    def _get_reporting_speed(self, network_speed_in_bits_per_second):
132        """Sets the units for the network speed reporting based on how the
133        object was initiated.  Defaults to Megabytes per second.  Currently
134        supported, bits per second (bits), kilobits per second (kbits), megabits
135        per second (mbits), gigabits per second (gbits), bytes per second
136        (bytes), kilobits per second (kbytes), megabits per second (mbytes),
137        gigabytes per second (gbytes).
138
139        Args:
140            network_speed_in_bits_per_second: The network speed from iperf in
141                bits per second.
142
143        Returns:
144            The value of the throughput in the appropriate units.
145        """
146        speed_divisor = 1
147        if self.reporting_speed_units[1:].lower() == 'bytes':
148            speed_divisor = speed_divisor * BITS_IN_BYTE
149        if self.reporting_speed_units[0:1].lower() == 'k':
150            speed_divisor = speed_divisor * KILOBITS
151        if self.reporting_speed_units[0:1].lower() == 'm':
152            speed_divisor = speed_divisor * MEGABITS
153        if self.reporting_speed_units[0:1].lower() == 'g':
154            speed_divisor = speed_divisor * GIGABITS
155        return network_speed_in_bits_per_second / speed_divisor
156
157    def get_json(self):
158        """Returns the raw json output from iPerf."""
159        return self.result
160
161    @property
162    def error(self):
163        return self.result.get('error', None)
164
165    @property
166    def avg_rate(self):
167        """Average UDP rate in MB/s over the entire run.
168
169        This is the average UDP rate observed at the terminal the iperf result
170        is pulled from. According to iperf3 documentation this is calculated
171        based on bytes sent and thus is not a good representation of the
172        quality of the link. If the result is not from a success run, this
173        property is None.
174        """
175        if not self._has_data() or 'sum' not in self.result['end']:
176            return None
177        bps = self.result['end']['sum']['bits_per_second']
178        return self._get_reporting_speed(bps)
179
180    @property
181    def avg_receive_rate(self):
182        """Average receiving rate in MB/s over the entire run.
183
184        This data may not exist if iperf was interrupted. If the result is not
185        from a success run, this property is None.
186        """
187        if not self._has_data() or 'sum_received' not in self.result['end']:
188            return None
189        bps = self.result['end']['sum_received']['bits_per_second']
190        return self._get_reporting_speed(bps)
191
192    @property
193    def avg_send_rate(self):
194        """Average sending rate in MB/s over the entire run.
195
196        This data may not exist if iperf was interrupted. If the result is not
197        from a success run, this property is None.
198        """
199        if not self._has_data() or 'sum_sent' not in self.result['end']:
200            return None
201        bps = self.result['end']['sum_sent']['bits_per_second']
202        return self._get_reporting_speed(bps)
203
204    @property
205    def instantaneous_rates(self):
206        """Instantaneous received rate in MB/s over entire run.
207
208        This data may not exist if iperf was interrupted. If the result is not
209        from a success run, this property is None.
210        """
211        if not self._has_data():
212            return None
213        intervals = [
214            self._get_reporting_speed(interval['sum']['bits_per_second'])
215            for interval in self.result['intervals']
216        ]
217        return intervals
218
219    @property
220    def std_deviation(self):
221        """Standard deviation of rates in MB/s over entire run.
222
223        This data may not exist if iperf was interrupted. If the result is not
224        from a success run, this property is None.
225        """
226        return self.get_std_deviation(0)
227
228    def get_std_deviation(self, iperf_ignored_interval):
229        """Standard deviation of rates in MB/s over entire run.
230
231        This data may not exist if iperf was interrupted. If the result is not
232        from a success run, this property is None. A configurable number of
233        beginning (and the single last) intervals are ignored in the
234        calculation as they are inaccurate (e.g. the last is from a very small
235        interval)
236
237        Args:
238            iperf_ignored_interval: number of iperf interval to ignored in
239            calculating standard deviation
240
241        Returns:
242            The standard deviation.
243        """
244        if not self._has_data():
245            return None
246        instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval:
247                                                       -1]
248        avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates)
249        sqd_deviations = ([(rate - avg_rate)**2
250                           for rate in instantaneous_rates])
251        std_dev = math.sqrt(
252            math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
253        return std_dev
254
255
256class IPerfServerBase(object):
257    # Keeps track of the number of IPerfServer logs to prevent file name
258    # collisions.
259    __log_file_counter = 0
260
261    __log_file_lock = threading.Lock()
262
263    def __init__(self, port):
264        self._port = port
265        # TODO(markdr): We shouldn't be storing the log files in an array like
266        # this. Nobody should be reading this property either. Instead, the
267        # IPerfResult should be returned in stop() with all the necessary info.
268        # See aosp/1012824 for a WIP implementation.
269        self.log_files = []
270
271    @property
272    def port(self):
273        raise NotImplementedError('port must be specified.')
274
275    @property
276    def started(self):
277        raise NotImplementedError('started must be specified.')
278
279    def start(self, extra_args='', tag=''):
280        """Starts an iperf3 server.
281
282        Args:
283            extra_args: A string representing extra arguments to start iperf
284                server with.
285            tag: Appended to log file name to identify logs from different
286                iperf runs.
287        """
288        raise NotImplementedError('start() must be specified.')
289
290    def stop(self):
291        """Stops the iperf server.
292
293        Returns:
294            The name of the log file generated from the terminated session.
295        """
296        raise NotImplementedError('stop() must be specified.')
297
298    def _get_full_file_path(self, tag=None):
299        """Returns the full file path for the IPerfServer log file.
300
301        Note: If the directory for the file path does not exist, it will be
302        created.
303
304        Args:
305            tag: The tag passed in to the server run.
306        """
307        out_dir = self.log_path
308
309        with IPerfServerBase.__log_file_lock:
310            tags = [tag, IPerfServerBase.__log_file_counter]
311            out_file_name = 'IPerfServer,%s.log' % (','.join(
312                [str(x) for x in tags if x != '' and x is not None]))
313            IPerfServerBase.__log_file_counter += 1
314
315        file_path = os.path.join(out_dir, out_file_name)
316        self.log_files.append(file_path)
317        return file_path
318
319    @property
320    def log_path(self):
321        current_context = context.get_current_context()
322        full_out_dir = os.path.join(current_context.get_full_output_path(),
323                                    'IPerfServer%s' % self.port)
324
325        # Ensure the directory exists.
326        os.makedirs(full_out_dir, exist_ok=True)
327
328        return full_out_dir
329
330
331def _get_port_from_ss_output(ss_output, pid):
332    pid = str(pid)
333    lines = ss_output.split('\n')
334    for line in lines:
335        if pid in line:
336            # Expected format:
337            # tcp LISTEN  0 5 *:<PORT>  *:* users:(("cmd",pid=<PID>,fd=3))
338            return line.split()[4].split(':')[-1]
339    else:
340        raise ProcessLookupError('Could not find started iperf3 process.')
341
342
343class IPerfServer(IPerfServerBase):
344    """Class that handles iperf server commands on localhost."""
345    def __init__(self, port=5201):
346        super().__init__(port)
347        self._hinted_port = port
348        self._current_log_file = None
349        self._iperf_process = None
350        self._last_opened_file = None
351
352    @property
353    def port(self):
354        return self._port
355
356    @property
357    def started(self):
358        return self._iperf_process is not None
359
360    def start(self, extra_args='', tag=''):
361        """Starts iperf server on local machine.
362
363        Args:
364            extra_args: A string representing extra arguments to start iperf
365                server with.
366            tag: Appended to log file name to identify logs from different
367                iperf runs.
368        """
369        if self._iperf_process is not None:
370            return
371
372        self._current_log_file = self._get_full_file_path(tag)
373
374        # Run an iperf3 server on the hinted port with JSON output.
375        command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J']
376
377        command.extend(shlex.split(extra_args))
378
379        if self._last_opened_file:
380            self._last_opened_file.close()
381        self._last_opened_file = open(self._current_log_file, 'w')
382        self._iperf_process = subprocess.Popen(command,
383                                               stdout=self._last_opened_file,
384                                               stderr=subprocess.DEVNULL)
385        for attempts_left in reversed(range(3)):
386            try:
387                self._port = int(
388                    _get_port_from_ss_output(
389                        job.run('ss -l -p -n | grep iperf').stdout,
390                        self._iperf_process.pid))
391                break
392            except ProcessLookupError:
393                if attempts_left == 0:
394                    raise
395                logging.debug('iperf3 process not started yet.')
396                time.sleep(.01)
397
398    def stop(self):
399        """Stops the iperf server.
400
401        Returns:
402            The name of the log file generated from the terminated session.
403        """
404        if self._iperf_process is None:
405            return
406
407        if self._last_opened_file:
408            self._last_opened_file.close()
409            self._last_opened_file = None
410
411        self._iperf_process.terminate()
412        self._iperf_process = None
413
414        return self._current_log_file
415
416    def __del__(self):
417        self.stop()
418
419
420class IPerfServerOverSsh(IPerfServerBase):
421    """Class that handles iperf3 operations on remote machines."""
422    def __init__(self,
423                 ssh_config,
424                 port,
425                 test_interface=None,
426                 use_killall=False):
427        super().__init__(port)
428        self.ssh_settings = settings.from_config(ssh_config)
429        self._ssh_session = None
430        self.start_ssh()
431
432        self._iperf_pid = None
433        self._current_tag = None
434        self.hostname = self.ssh_settings.hostname
435        self._use_killall = str(use_killall).lower() == 'true'
436        try:
437            # A test interface can only be found if an ip address is specified.
438            # A fully qualified hostname will return None for the
439            # test_interface.
440            self.test_interface = self._get_test_interface_based_on_ip(
441                test_interface)
442        except Exception:
443            self.test_interface = None
444
445    @property
446    def port(self):
447        return self._port
448
449    @property
450    def started(self):
451        return self._iperf_pid is not None
452
453    def _get_remote_log_path(self):
454        return '/tmp/iperf_server_port%s.log' % self.port
455
456    def _get_test_interface_based_on_ip(self, test_interface):
457        """Gets the test interface for a particular IP if the test interface
458            passed in test_interface is None
459
460        Args:
461            test_interface: Either a interface name, ie eth0, or None
462
463        Returns:
464            The name of the test interface.
465        """
466        if test_interface:
467            return test_interface
468        return utils.get_interface_based_on_ip(self._ssh_session,
469                                               self.hostname)
470
471    def get_interface_ip_addresses(self, interface):
472        """Gets all of the ip addresses, ipv4 and ipv6, associated with a
473           particular interface name.
474
475        Args:
476            interface: The interface name on the device, ie eth0
477
478        Returns:
479            A list of dictionaries of the the various IP addresses:
480                ipv4_private_local_addresses: Any 192.168, 172.16, or 10
481                    addresses
482                ipv4_public_addresses: Any IPv4 public addresses
483                ipv6_link_local_addresses: Any fe80:: addresses
484                ipv6_private_local_addresses: Any fd00:: addresses
485                ipv6_public_addresses: Any publicly routable addresses
486        """
487        if not self._ssh_session:
488            self.start_ssh()
489
490        return utils.get_interface_ip_addresses(self._ssh_session, interface)
491
492    def renew_test_interface_ip_address(self):
493        """Renews the test interface's IP address.  Necessary for changing
494           DHCP scopes during a test.
495        """
496        if not self._ssh_session:
497            self.start_ssh()
498        utils.renew_linux_ip_address(self._ssh_session, self.test_interface)
499
500    def start(self, extra_args='', tag='', iperf_binary=None):
501        """Starts iperf server on specified machine and port.
502
503        Args:
504            extra_args: A string representing extra arguments to start iperf
505                server with.
506            tag: Appended to log file name to identify logs from different
507                iperf runs.
508            iperf_binary: Location of iperf3 binary. If none, it is assumed the
509                the binary is in the path.
510        """
511        if self.started:
512            return
513
514        if not self._ssh_session:
515            self.start_ssh()
516        if not iperf_binary:
517            logging.debug('No iperf3 binary specified.  '
518                          'Assuming iperf3 is in the path.')
519            iperf_binary = 'iperf3'
520        else:
521            logging.debug('Using iperf3 binary located at %s' % iperf_binary)
522        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)
523
524        cmd = '{cmd} {extra_flags} > {log_file}'.format(
525            cmd=iperf_command,
526            extra_flags=extra_args,
527            log_file=self._get_remote_log_path())
528
529        job_result = self._ssh_session.run_async(cmd)
530        self._iperf_pid = job_result.stdout
531        self._current_tag = tag
532
533    def stop(self):
534        """Stops the iperf server.
535
536        Returns:
537            The name of the log file generated from the terminated session.
538        """
539        if not self.started:
540            return
541
542        if self._use_killall:
543            self._ssh_session.run('killall iperf3', ignore_status=True)
544        else:
545            self._ssh_session.run_async('kill -9 {}'.format(
546                str(self._iperf_pid)))
547
548        iperf_result = self._ssh_session.run('cat {}'.format(
549            self._get_remote_log_path()))
550
551        log_file = self._get_full_file_path(self._current_tag)
552        with open(log_file, 'w') as f:
553            f.write(iperf_result.stdout)
554
555        self._ssh_session.run_async('rm {}'.format(
556            self._get_remote_log_path()))
557        self._iperf_pid = None
558        return log_file
559
560    def start_ssh(self):
561        """Starts an ssh session to the iperf server."""
562        if not self._ssh_session:
563            self._ssh_session = connection.SshConnection(self.ssh_settings)
564
565    def close_ssh(self):
566        """Closes the ssh session to the iperf server, if one exists, preventing
567        connection reset errors when rebooting server device.
568        """
569        if self.started:
570            self.stop()
571        if self._ssh_session:
572            self._ssh_session.close()
573            self._ssh_session = None
574
575
576# TODO(markdr): Remove this after automagic controller creation has been
577# removed.
578class _AndroidDeviceBridge(object):
579    """A helper class for connecting serial numbers to AndroidDevices."""
580
581    _test_class = None
582
583    @staticmethod
584    @subscribe_static(TestClassBeginEvent)
585    def on_test_begin(event):
586        _AndroidDeviceBridge._test_class = event.test_class
587
588    @staticmethod
589    @subscribe_static(TestClassEndEvent)
590    def on_test_end(_):
591        _AndroidDeviceBridge._test_class = None
592
593    @staticmethod
594    def android_devices():
595        """A dict of serial -> AndroidDevice, where AndroidDevice is a device
596        found in the current TestClass's controllers.
597        """
598        if not _AndroidDeviceBridge._test_class:
599            return {}
600        return {
601            device.serial: device
602            for device in _AndroidDeviceBridge._test_class.android_devices
603        }
604
605
606event_bus.register_subscription(
607    _AndroidDeviceBridge.on_test_begin.subscription)
608event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)
609
610
611class IPerfServerOverAdb(IPerfServerBase):
612    """Class that handles iperf3 operations over ADB devices."""
613    def __init__(self, android_device_or_serial, port):
614        """Creates a new IPerfServerOverAdb object.
615
616        Args:
617            android_device_or_serial: Either an AndroidDevice object, or the
618                serial that corresponds to the AndroidDevice. Note that the
619                serial must be present in an AndroidDevice entry in the ACTS
620                config.
621            port: The port number to open the iperf server on.
622        """
623        super().__init__(port)
624        self._android_device_or_serial = android_device_or_serial
625
626        self._iperf_process = None
627        self._current_tag = ''
628
629    @property
630    def port(self):
631        return self._port
632
633    @property
634    def started(self):
635        return self._iperf_process is not None
636
637    @property
638    def _android_device(self):
639        if isinstance(self._android_device_or_serial, AndroidDevice):
640            return self._android_device_or_serial
641        else:
642            return _AndroidDeviceBridge.android_devices()[
643                self._android_device_or_serial]
644
645    def _get_device_log_path(self):
646        return '~/data/iperf_server_port%s.log' % self.port
647
648    def start(self, extra_args='', tag='', iperf_binary=None):
649        """Starts iperf server on an ADB device.
650
651        Args:
652            extra_args: A string representing extra arguments to start iperf
653                server with.
654            tag: Appended to log file name to identify logs from different
655                iperf runs.
656            iperf_binary: Location of iperf3 binary. If none, it is assumed the
657                the binary is in the path.
658        """
659        if self._iperf_process is not None:
660            return
661
662        if not iperf_binary:
663            logging.debug('No iperf3 binary specified.  '
664                          'Assuming iperf3 is in the path.')
665            iperf_binary = 'iperf3'
666        else:
667            logging.debug('Using iperf3 binary located at %s' % iperf_binary)
668        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)
669
670        self._iperf_process = self._android_device.adb.shell_nb(
671            '{cmd} {extra_flags} > {log_file}'.format(
672                cmd=iperf_command,
673                extra_flags=extra_args,
674                log_file=self._get_device_log_path()))
675
676        self._iperf_process_adb_pid = ''
677        while len(self._iperf_process_adb_pid) == 0:
678            self._iperf_process_adb_pid = self._android_device.adb.shell(
679                'pgrep iperf3 -n')
680
681        self._current_tag = tag
682
683    def stop(self):
684        """Stops the iperf server.
685
686        Returns:
687            The name of the log file generated from the terminated session.
688        """
689        if self._iperf_process is None:
690            return
691
692        job.run('kill -9 {}'.format(self._iperf_process.pid))
693
694        # TODO(markdr): update with definitive kill method
695        while True:
696            iperf_process_list = self._android_device.adb.shell('pgrep iperf3')
697            if iperf_process_list.find(self._iperf_process_adb_pid) == -1:
698                break
699            else:
700                self._android_device.adb.shell("kill -9 {}".format(
701                    self._iperf_process_adb_pid))
702
703        iperf_result = self._android_device.adb.shell('cat {}'.format(
704            self._get_device_log_path()))
705
706        log_file = self._get_full_file_path(self._current_tag)
707        with open(log_file, 'w') as f:
708            f.write(iperf_result)
709
710        self._android_device.adb.shell('rm {}'.format(
711            self._get_device_log_path()))
712
713        self._iperf_process = None
714        return log_file
715