1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19import math
20import os
21import shlex
22import subprocess
23import threading
24import time
25
26from acts import context
27from acts import utils
28from acts.controllers.android_device import AndroidDevice
29from acts.controllers.utils_lib.ssh import connection
30from acts.controllers.utils_lib.ssh import settings
31from acts.event import event_bus
32from acts.event.decorators import subscribe_static
33from acts.event.event import TestClassBeginEvent
34from acts.event.event import TestClassEndEvent
35from acts.libs.proc import job
36
# Controller identifiers for this module. Presumably read by the ACTS
# controller loader to find the config section and the attribute name under
# which created servers are exposed -- confirm against the loader.
ACTS_CONTROLLER_CONFIG_NAME = 'IPerfServer'
ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'

# Divisors for scaling a bits-per-second value into larger reporting units.
# Every prefix step is a factor of 1024 (not 1000).
BITS_IN_BYTE = 8
KILOBITS = 1024
MEGABITS = 1024 * KILOBITS
GIGABITS = 1024 * MEGABITS
43
44
def create(configs):
    """Factory method for iperf servers.

    One server object is created per config entry. The supported entry
    forms are:

      * a port number, given as an int or as a string of digits: a local
        IPerfServer is created on that port.
      * a dict containing 'AndroidDevice' and 'port': an IPerfServerOverAdb
        is created.
      * a dict containing 'ssh_config' and 'port': an IPerfServerOverSsh is
        created; an optional 'test_interface' entry is forwarded to it.

    Subclasses of str, int and dict (for example OrderedDict) are accepted
    as well.

    Args:
        configs: An iterable of config entries for the iperf servers.

    Returns:
        A list with one iperf server object per entry, in the order given.

    Raises:
        ValueError: If an entry does not match any of the supported forms.
    """
    results = []
    for c in configs:
        # isinstance() rather than an exact type() comparison so that dict
        # subclasses produced by config loaders are not rejected.
        if isinstance(c, (str, int)) and str(c).isdigit():
            # str(c).isdigit() also filters out negative ints and bools.
            results.append(IPerfServer(int(c)))
        elif isinstance(c, dict) and 'AndroidDevice' in c and 'port' in c:
            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
        elif isinstance(c, dict) and 'ssh_config' in c and 'port' in c:
            results.append(
                IPerfServerOverSsh(c['ssh_config'],
                                   c['port'],
                                   test_interface=c.get('test_interface')))
        else:
            raise ValueError(
                'Config entry %s in %s is not a valid IPerfServer '
                'config.' % (repr(c), configs))
    return results
72
73
def get_info(iperf_servers):
    """Stub for reporting information about iperf servers.

    Nothing is collected for iperf servers; the argument is ignored.

    Args:
        iperf_servers: The iperf server objects (unused).

    Returns:
        None
    """
81
82
def destroy(iperf_server_list):
    """Stops every iperf server in the list on a best-effort basis.

    A server whose stop() raises is logged (with traceback) and skipped, so
    one failing server does not prevent the others from being stopped.

    Args:
        iperf_server_list: The iperf server objects to shut down.
    """
    for server in iperf_server_list:
        try:
            server.stop()
        except Exception:
            # Deliberately broad: cleanup must continue with the next server.
            message = 'Unable to properly clean up %s.' % server
            logging.exception(message)
89
90
class IPerfResult(object):
    """Parsed JSON result of a single iperf run.

    Rates are reported in the units selected by reporting_speed_units when
    the object is created.
    """

    def __init__(self, result_path, reporting_speed_units='Mbytes'):
        """Loads iperf result from file.

        Loads iperf result from JSON formatted server log. File can be accessed
        before or after server is stopped. Note that only the first JSON object
        will be loaded and this function is not intended to be used with files
        containing multiple iperf client runs.

        Args:
            result_path: Path to the JSON formatted log. If no file exists at
                this path, the value itself is parsed as JSON text.
            reporting_speed_units: Units used by the rate properties. See
                _get_reporting_speed for the supported values.

        Raises:
            ValueError: If the content cannot be parsed as JSON.
        """
        self.reporting_speed_units = reporting_speed_units
        # if result_path isn't a path, treat it as JSON
        if not os.path.exists(result_path):
            self.result = json.loads(result_path)
        else:
            try:
                with open(result_path, 'r') as f:
                    iperf_output = f.readlines()
                if '}\n' in iperf_output:
                    # A line holding only '}' closes the first top-level
                    # JSON object; anything after it is dropped.
                    first_object_end = iperf_output.index('}\n') + 1
                    iperf_output = iperf_output[:first_object_end]
                iperf_string = ''.join(iperf_output)
                # A bare lowercase nan is not accepted by json.loads, so it
                # is replaced with 0 before parsing.
                iperf_string = iperf_string.replace('nan', '0')
                self.result = json.loads(iperf_string)
            except ValueError:
                with open(result_path, 'r') as f:
                    # Possibly a result from interrupted iperf run,
                    # skip first line and try again.
                    lines = f.readlines()[1:]
                self.result = json.loads(''.join(lines))

    def _has_data(self):
        """Checks if the iperf result has valid throughput data.

        Returns:
            True if the result contains throughput data. False otherwise.
        """
        return ('end' in self.result) and ('sum_received' in self.result['end']
                                           or 'sum' in self.result['end'])

    def _get_reporting_speed(self, network_speed_in_bits_per_second):
        """Converts a bits-per-second value into the reporting units.

        The units are taken from reporting_speed_units (case insensitive),
        which defaults to megabytes per second. Supported values: bits per
        second (bits), kilobits per second (kbits), megabits per second
        (mbits), gigabits per second (gbits), bytes per second (bytes),
        kilobytes per second (kbytes), megabytes per second (mbytes) and
        gigabytes per second (gbytes). Each prefix step is a factor of 1024.

        Args:
            network_speed_in_bits_per_second: The network speed from iperf in
                bits per second.

        Returns:
            The value of the throughput in the appropriate units.
        """
        units = self.reporting_speed_units.lower()
        speed_divisor = 1
        # 'bytes' on its own has no prefix character, so it has to be matched
        # explicitly; otherwise it would be reported in bits per second.
        if units == 'bytes' or units[1:] == 'bytes':
            speed_divisor = speed_divisor * BITS_IN_BYTE
        prefix = units[0:1]
        if prefix == 'k':
            speed_divisor = speed_divisor * KILOBITS
        if prefix == 'm':
            speed_divisor = speed_divisor * MEGABITS
        if prefix == 'g':
            speed_divisor = speed_divisor * GIGABITS
        return network_speed_in_bits_per_second / speed_divisor

    def get_json(self):
        """Returns the raw json output from iPerf."""
        return self.result

    @property
    def error(self):
        """The 'error' entry of the iperf result, or None if there is none."""
        return self.result.get('error', None)

    @property
    def avg_rate(self):
        """Average UDP rate over the entire run, in the reporting units.

        This is the average UDP rate observed at the terminal the iperf result
        is pulled from. According to iperf3 documentation this is calculated
        based on bytes sent and thus is not a good representation of the
        quality of the link. If the result is not from a success run, this
        property is None.
        """
        if not self._has_data() or 'sum' not in self.result['end']:
            return None
        bps = self.result['end']['sum']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_receive_rate(self):
        """Average receiving rate over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_received' not in self.result['end']:
            return None
        bps = self.result['end']['sum_received']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_send_rate(self):
        """Average sending rate over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_sent' not in self.result['end']:
            return None
        bps = self.result['end']['sum_sent']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def instantaneous_rates(self):
        """Per-interval rates over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data():
            return None
        intervals = [
            self._get_reporting_speed(interval['sum']['bits_per_second'])
            for interval in self.result['intervals']
        ]
        return intervals

    @property
    def std_deviation(self):
        """Standard deviation of the per-interval rates over the entire run.

        Equivalent to get_std_deviation(0). This data may not exist if iperf
        was interrupted. If the result is not from a success run, this
        property is None.
        """
        return self.get_std_deviation(0)

    def get_std_deviation(self, iperf_ignored_interval):
        """Sample standard deviation of the per-interval rates.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, the return value is None. A configurable number of
        beginning (and the single last) intervals are ignored in the
        calculation as they are inaccurate (e.g. the last is from a very small
        interval)

        Args:
            iperf_ignored_interval: number of leading iperf intervals to
                ignore when calculating the standard deviation

        Returns:
            The standard deviation in the reporting units, or None if the
            result has no throughput data or fewer than two intervals remain
            after the ignored ones are removed.
        """
        if not self._has_data():
            return None
        instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval:
                                                       -1]
        if len(instantaneous_rates) < 2:
            # The sample standard deviation divides by (n - 1) and is
            # undefined for fewer than two samples.
            return None
        avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates)
        sqd_deviations = ([(rate - avg_rate)**2
                           for rate in instantaneous_rates])
        std_dev = math.sqrt(
            math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
        return std_dev
253
254
class IPerfServerBase(object):
    """Base class for iperf servers.

    Subclasses provide port, started, start() and stop(); this class supplies
    the naming and bookkeeping of the local log files.
    """

    # Serializes access to the counter below.
    __log_file_lock = threading.Lock()

    # Number of log file names handed out so far, shared by all servers so
    # that file names never collide.
    __log_file_counter = 0

    def __init__(self, port):
        """Stores the port and starts with an empty list of log files.

        Args:
            port: The port number for the iperf server.
        """
        # TODO(markdr): We shouldn't be storing the log files in an array like
        # this. Nobody should be reading this property either. Instead, the
        # IPerfResult should be returned in stop() with all the necessary info.
        # See aosp/1012824 for a WIP implementation.
        self.log_files = []
        self._port = port

    @property
    def port(self):
        """The port of the server. Must be provided by the subclass."""
        raise NotImplementedError('port must be specified.')

    @property
    def started(self):
        """Whether the server is running. Must be provided by the subclass."""
        raise NotImplementedError('started must be specified.')

    def start(self, extra_args='', tag=''):
        """Starts an iperf3 server. Must be provided by the subclass.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        raise NotImplementedError('start() must be specified.')

    def stop(self):
        """Stops the iperf server. Must be provided by the subclass.

        Returns:
            The name of the log file generated from the terminated session.
        """
        raise NotImplementedError('stop() must be specified.')

    def _get_full_file_path(self, tag=None):
        """Builds a new, unique log file path and records it in log_files.

        The file name has the form 'IPerfServer,<tag>,<counter>.log'; the tag
        part is left out when tag is None or the empty string.

        Note: If the directory for the file path does not exist, it will be
        created.

        Args:
            tag: The tag passed in to the server run.

        Returns:
            The full path of the log file.
        """
        directory = self.log_path

        with IPerfServerBase.__log_file_lock:
            name_parts = []
            for part in (tag, IPerfServerBase.__log_file_counter):
                if part != '' and part is not None:
                    name_parts.append(str(part))
            out_file_name = 'IPerfServer,%s.log' % ','.join(name_parts)
            IPerfServerBase.__log_file_counter += 1

        file_path = os.path.join(directory, out_file_name)
        self.log_files.append(file_path)
        return file_path

    @property
    def log_path(self):
        """Directory for this server's logs; created if it does not exist.

        The directory is 'IPerfServer<port>' inside the full output path of
        the current context.
        """
        base_dir = context.get_current_context().get_full_output_path()
        server_dir_name = 'IPerfServer%s' % self.port
        full_out_dir = os.path.join(base_dir, server_dir_name)

        # Ensure the directory exists.
        os.makedirs(full_out_dir, exist_ok=True)

        return full_out_dir
328
329
def _get_port_from_ss_output(ss_output, pid):
    """Finds the local port the process with the given pid is listening on.

    Each line of `ss -l -p -n` output is expected to look like:
        tcp LISTEN  0 5 *:<PORT>  *:* users:(("cmd",pid=<PID>,fd=3))

    The pid is matched as a complete field of the users:(...) column, so a
    pid of 12 does not match a line for pid 123 or a line whose port number
    happens to contain "12". The older `("cmd",<PID>,<FD>)` layout (assumed
    to be what older ss versions print) is accepted as well.

    Args:
        ss_output: The text output of ss.
        pid: The process id to look for (int or str).

    Returns:
        The port as a string, taken from the fifth whitespace-separated
        column (the local address) of the first matching line.

    Raises:
        ProcessLookupError: If no line belongs to the given pid.
    """
    pid = str(pid)
    pid_markers = ('pid=%s,' % pid, ',%s,' % pid)
    for line in ss_output.split('\n'):
        if any(marker in line for marker in pid_markers):
            # The local address column is <ADDR>:<PORT>; the port is whatever
            # follows the last colon (this also covers IPv6 addresses).
            return line.split()[4].split(':')[-1]
    raise ProcessLookupError('Could not find started iperf3 process.')
340
341
class IPerfServer(IPerfServerBase):
    """Class that handles iperf server commands on localhost."""

    # Seconds to wait for iperf3 to exit after SIGTERM before it is killed.
    _STOP_TIMEOUT_SECONDS = 5

    def __init__(self, port=5201):
        """Creates a local iperf server controller; nothing is started yet.

        Args:
            port: The port to request from iperf3. The port property is
                updated with the actual listening port once the server has
                been started.
        """
        super().__init__(port)
        self._hinted_port = port
        self._current_log_file = None
        self._iperf_process = None
        self._last_opened_file = None

    @property
    def port(self):
        """The hinted port, or the actual listening port after start()."""
        return self._port

    @property
    def started(self):
        """True while an iperf3 process started by this object is tracked."""
        return self._iperf_process is not None

    def start(self, extra_args='', tag=''):
        """Starts iperf server on local machine.

        Does nothing if the server has already been started. The JSON output
        of iperf3 is written to a new local log file.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.

        Raises:
            ProcessLookupError: If the listening port of the new process
                could not be found in the ss output after three attempts.
        """
        if self._iperf_process is not None:
            return

        self._current_log_file = self._get_full_file_path(tag)

        # Run an iperf3 server on the hinted port with JSON output.
        command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J']

        command.extend(shlex.split(extra_args))

        if self._last_opened_file:
            self._last_opened_file.close()
        self._last_opened_file = open(self._current_log_file, 'w')
        self._iperf_process = subprocess.Popen(command,
                                               stdout=self._last_opened_file,
                                               stderr=subprocess.DEVNULL)
        # The process may not be listening yet right after Popen returns, so
        # the port lookup is retried a few times.
        for attempts_left in reversed(range(3)):
            try:
                self._port = int(
                    _get_port_from_ss_output(
                        job.run('ss -l -p -n | grep iperf').stdout,
                        self._iperf_process.pid))
                break
            except ProcessLookupError:
                if attempts_left == 0:
                    raise
                logging.debug('iperf3 process not started yet.')
                time.sleep(.01)

    def stop(self):
        """Stops the iperf server and waits for the process to exit.

        Waiting ensures that, once this method returns, iperf3 no longer
        holds the port or writes to the log file, and that the child process
        has been reaped. A process that does not exit within
        _STOP_TIMEOUT_SECONDS of being terminated is killed.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not running.
        """
        if self._iperf_process is None:
            return

        if self._last_opened_file:
            self._last_opened_file.close()
            self._last_opened_file = None

        # Clear the handle first so the object reads as stopped even if
        # waiting for the process raises.
        process = self._iperf_process
        self._iperf_process = None

        process.terminate()
        try:
            process.wait(timeout=self._STOP_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

        return self._current_log_file

    def __del__(self):
        """Stops the server when the object is garbage collected."""
        self.stop()
417
418
class IPerfServerOverSsh(IPerfServerBase):
    """Class that handles iperf3 operations on remote machines."""

    def __init__(self, ssh_config, port, test_interface=None):
        """Opens the SSH connection settings and resolves the test interface.

        Args:
            ssh_config: The ssh config, converted with settings.from_config.
            port: The port number to run the iperf server on.
            test_interface: Optional interface name, ie eth0. If omitted, the
                interface is looked up from the hostname of the ssh settings.
        """
        super().__init__(port)
        ssh_settings = settings.from_config(ssh_config)
        self._ssh_session = connection.SshConnection(ssh_settings)

        self._iperf_pid = None
        self._current_tag = None
        self.hostname = ssh_settings.hostname
        try:
            # A test interface can only be found if an ip address is specified.
            # A fully qualified hostname will return None for the
            # test_interface.
            self.test_interface = self._get_test_interface_based_on_ip(
                test_interface)
        except Exception:
            # Interface detection is best effort; keep going without one, but
            # leave a trace of the reason instead of hiding it completely.
            logging.debug('Unable to determine the test interface for %s.',
                          self.hostname,
                          exc_info=True)
            self.test_interface = None

    @property
    def port(self):
        """The port the remote iperf server is run on."""
        return self._port

    @property
    def started(self):
        """True while a pid from a started iperf server is stored."""
        return self._iperf_pid is not None

    def _get_remote_log_path(self):
        """Returns the path of the iperf log file on the remote machine."""
        return 'iperf_server_port%s.log' % self.port

    def _get_test_interface_based_on_ip(self, test_interface):
        """Gets the test interface for a particular IP if the test interface
            passed in test_interface is None

        Args:
            test_interface: Either a interface name, ie eth0, or None

        Returns:
            The name of the test interface.
        """
        if test_interface:
            return test_interface
        return utils.get_interface_based_on_ip(self._ssh_session,
                                               self.hostname)

    def get_interface_ip_addresses(self, interface):
        """Gets all of the ip addresses, ipv4 and ipv6, associated with a
           particular interface name.

        Args:
            interface: The interface name on the device, ie eth0

        Returns:
            A list of dictionaries of the various IP addresses:
                ipv4_private_local_addresses: Any 192.168, 172.16, or 10
                    addresses
                ipv4_public_addresses: Any IPv4 public addresses
                ipv6_link_local_addresses: Any fe80:: addresses
                ipv6_private_local_addresses: Any fd00:: addresses
                ipv6_public_addresses: Any publicly routable addresses
        """
        return utils.get_interface_ip_addresses(self._ssh_session, interface)

    def renew_test_interface_ip_address(self):
        """Renews the test interface's IP address.  Necessary for changing
           DHCP scopes during a test.
        """
        utils.renew_linux_ip_address(self._ssh_session, self.test_interface)

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on specified machine and port.

        Does nothing if the server has already been started. The JSON output
        is redirected to a log file on the remote machine.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self.started:
            return

        if not iperf_binary:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        else:
            logging.debug('Using iperf3 binary located at %s', iperf_binary)
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        cmd = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=iperf_command,
            extra_flags=extra_args,
            log_file=self._get_remote_log_path())

        job_result = self._ssh_session.run_async(cmd)
        # The stdout of the async job is kept as the pid to kill in stop().
        self._iperf_pid = job_result.stdout
        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the remote process, copies the remote log into a new local log
        file and removes the remote log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not running.
        """
        if not self.started:
            return

        self._ssh_session.run_async('kill -9 {}'.format(str(self._iperf_pid)))
        iperf_result = self._ssh_session.run('cat {}'.format(
            self._get_remote_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result.stdout)

        self._ssh_session.run_async('rm {}'.format(
            self._get_remote_log_path()))
        self._iperf_pid = None
        return log_file
540
541
542# TODO(markdr): Remove this after automagic controller creation has been
543# removed.
class _AndroidDeviceBridge(object):
    """A helper class for connecting serial numbers to AndroidDevices.

    The class remembers the test class that is currently running (via the
    TestClassBeginEvent / TestClassEndEvent handlers below) so that serials
    can be resolved against that test class's android_devices.
    """

    # The test class between its begin and end events; None otherwise.
    _test_class = None

    @staticmethod
    @subscribe_static(TestClassBeginEvent)
    def on_test_begin(event):
        """Records the test class carried by the begin event."""
        _AndroidDeviceBridge._test_class = event.test_class

    @staticmethod
    @subscribe_static(TestClassEndEvent)
    def on_test_end(_):
        """Clears the recorded test class."""
        _AndroidDeviceBridge._test_class = None

    @staticmethod
    def android_devices():
        """A dict of serial -> AndroidDevice, where AndroidDevice is a device
        found in the current TestClass's controllers.

        Returns an empty dict when no test class is recorded.
        """
        test_class = _AndroidDeviceBridge._test_class
        if not test_class:
            return {}
        devices_by_serial = {}
        for device in test_class.android_devices:
            devices_by_serial[device.serial] = device
        return devices_by_serial


# Register both handlers with the event bus at import time.
event_bus.register_subscription(
    _AndroidDeviceBridge.on_test_begin.subscription)
event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)
575
576
class IPerfServerOverAdb(IPerfServerBase):
    """Class that handles iperf3 operations over ADB devices."""

    def __init__(self, android_device_or_serial, port):
        """Creates a new IPerfServerOverAdb object.

        Args:
            android_device_or_serial: Either an AndroidDevice object, or the
                serial that corresponds to the AndroidDevice. Note that the
                serial must be present in an AndroidDevice entry in the ACTS
                config.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._android_device_or_serial = android_device_or_serial

        self._current_tag = ''
        self._iperf_process = None

    @property
    def port(self):
        """The port the iperf server is opened on."""
        return self._port

    @property
    def started(self):
        """True while a started iperf process is being tracked."""
        return self._iperf_process is not None

    @property
    def _android_device(self):
        """The AndroidDevice to run on.

        A serial is resolved through _AndroidDeviceBridge on every access.
        """
        target = self._android_device_or_serial
        if not isinstance(target, AndroidDevice):
            return _AndroidDeviceBridge.android_devices()[target]
        return target

    def _get_device_log_path(self):
        """Returns the path of the iperf log file on the device."""
        return '~/data/iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on an ADB device.

        Does nothing if the server has already been started. After launching
        iperf3, the device is polled with pgrep until it reports a pid.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self._iperf_process is not None:
            return

        if iperf_binary:
            logging.debug('Using iperf3 binary located at %s' % iperf_binary)
        else:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        self._iperf_process = self._android_device.adb.shell_nb(
            '{cmd} {extra_flags} > {log_file}'.format(
                cmd=iperf_command,
                extra_flags=extra_args,
                log_file=self._get_device_log_path()))

        # Poll until pgrep reports the newest iperf3 process on the device.
        self._iperf_process_adb_pid = ''
        while True:
            self._iperf_process_adb_pid = self._android_device.adb.shell(
                'pgrep iperf3 -n')
            if len(self._iperf_process_adb_pid) != 0:
                break

        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the local process handle and the iperf3 process on the device,
        copies the device log into a new local log file and removes the log
        from the device.

        Returns:
            The name of the log file generated from the terminated session.
        """
        if self._iperf_process is None:
            return

        job.run('kill -9 {}'.format(self._iperf_process.pid))

        # TODO(markdr): update with definitive kill method
        # Keep killing until the recorded pid no longer appears in the pgrep
        # output from the device.
        while True:
            iperf_process_list = self._android_device.adb.shell('pgrep iperf3')
            if self._iperf_process_adb_pid not in iperf_process_list:
                break
            self._android_device.adb.shell("kill -9 {}".format(
                self._iperf_process_adb_pid))

        device_log = self._android_device.adb.shell('cat {}'.format(
            self._get_device_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as local_log:
            local_log.write(device_log)

        self._android_device.adb.shell('rm {}'.format(
            self._get_device_log_path()))

        self._iperf_process = None
        return log_file
681