1#!/usr/bin/env python3 2# 3# Copyright 2016 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import json 18import logging 19import math 20import os 21import shlex 22import subprocess 23import threading 24import time 25 26from acts import context 27from acts import utils 28from acts.controllers.android_device import AndroidDevice 29from acts.controllers.utils_lib.ssh import connection 30from acts.controllers.utils_lib.ssh import settings 31from acts.event import event_bus 32from acts.event.decorators import subscribe_static 33from acts.event.event import TestClassBeginEvent 34from acts.event.event import TestClassEndEvent 35from acts.libs.proc import job 36 37ACTS_CONTROLLER_CONFIG_NAME = 'IPerfServer' 38ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers' 39KILOBITS = 1024 40MEGABITS = KILOBITS * 1024 41GIGABITS = MEGABITS * 1024 42BITS_IN_BYTE = 8 43 44 45def create(configs): 46 """ Factory method for iperf servers. 47 48 The function creates iperf servers based on at least one config. 49 If configs only specify a port number, a regular local IPerfServer object 50 will be created. If configs contains ssh settings or and AndroidDevice, 51 remote iperf servers will be started on those devices 52 53 Args: 54 configs: config parameters for the iperf server 55 """ 56 results = [] 57 for c in configs: 58 if type(c) in (str, int) and str(c).isdigit(): 59 results.append(IPerfServer(int(c))) 60 elif type(c) is dict and 'AndroidDevice' in c and 'port' in c: 61 results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port'])) 62 elif type(c) is dict and 'ssh_config' in c and 'port' in c: 63 results.append( 64 IPerfServerOverSsh(c['ssh_config'], 65 c['port'], 66 test_interface=c.get('test_interface'))) 67 else: 68 raise ValueError( 69 'Config entry %s in %s is not a valid IPerfServer ' 70 'config.' % (repr(c), configs)) 71 return results 72 73 74def get_info(iperf_servers): 75 """Placeholder for info about iperf servers 76 77 Returns: 78 None 79 """ 80 return None 81 82 83def destroy(iperf_server_list): 84 for iperf_server in iperf_server_list: 85 try: 86 iperf_server.stop() 87 except Exception: 88 logging.exception('Unable to properly clean up %s.' % iperf_server) 89 90 91class IPerfResult(object): 92 def __init__(self, result_path, reporting_speed_units='Mbytes'): 93 """Loads iperf result from file. 94 95 Loads iperf result from JSON formatted server log. File can be accessed 96 before or after server is stopped. Note that only the first JSON object 97 will be loaded and this funtion is not intended to be used with files 98 containing multiple iperf client runs. 99 """ 100 # if result_path isn't a path, treat it as JSON 101 self.reporting_speed_units = reporting_speed_units 102 if not os.path.exists(result_path): 103 self.result = json.loads(result_path) 104 else: 105 try: 106 with open(result_path, 'r') as f: 107 iperf_output = f.readlines() 108 if '}\n' in iperf_output: 109 iperf_output = iperf_output[:iperf_output.index('}\n' 110 ) + 1] 111 iperf_string = ''.join(iperf_output) 112 iperf_string = iperf_string.replace('nan', '0') 113 self.result = json.loads(iperf_string) 114 except ValueError: 115 with open(result_path, 'r') as f: 116 # Possibly a result from interrupted iperf run, 117 # skip first line and try again. 118 lines = f.readlines()[1:] 119 self.result = json.loads(''.join(lines)) 120 121 def _has_data(self): 122 """Checks if the iperf result has valid throughput data. 123 124 Returns: 125 True if the result contains throughput data. False otherwise. 126 """ 127 return ('end' in self.result) and ('sum_received' in self.result['end'] 128 or 'sum' in self.result['end']) 129 130 def _get_reporting_speed(self, network_speed_in_bits_per_second): 131 """Sets the units for the network speed reporting based on how the 132 object was initiated. Defaults to Megabytes per second. Currently 133 supported, bits per second (bits), kilobits per second (kbits), megabits 134 per second (mbits), gigabits per second (gbits), bytes per second 135 (bytes), kilobits per second (kbytes), megabits per second (mbytes), 136 gigabytes per second (gbytes). 137 138 Args: 139 network_speed_in_bits_per_second: The network speed from iperf in 140 bits per second. 141 142 Returns: 143 The value of the throughput in the appropriate units. 144 """ 145 speed_divisor = 1 146 if self.reporting_speed_units[1:].lower() == 'bytes': 147 speed_divisor = speed_divisor * BITS_IN_BYTE 148 if self.reporting_speed_units[0:1].lower() == 'k': 149 speed_divisor = speed_divisor * KILOBITS 150 if self.reporting_speed_units[0:1].lower() == 'm': 151 speed_divisor = speed_divisor * MEGABITS 152 if self.reporting_speed_units[0:1].lower() == 'g': 153 speed_divisor = speed_divisor * GIGABITS 154 return network_speed_in_bits_per_second / speed_divisor 155 156 def get_json(self): 157 """Returns the raw json output from iPerf.""" 158 return self.result 159 160 @property 161 def error(self): 162 return self.result.get('error', None) 163 164 @property 165 def avg_rate(self): 166 """Average UDP rate in MB/s over the entire run. 167 168 This is the average UDP rate observed at the terminal the iperf result 169 is pulled from. According to iperf3 documentation this is calculated 170 based on bytes sent and thus is not a good representation of the 171 quality of the link. If the result is not from a success run, this 172 property is None. 173 """ 174 if not self._has_data() or 'sum' not in self.result['end']: 175 return None 176 bps = self.result['end']['sum']['bits_per_second'] 177 return self._get_reporting_speed(bps) 178 179 @property 180 def avg_receive_rate(self): 181 """Average receiving rate in MB/s over the entire run. 182 183 This data may not exist if iperf was interrupted. If the result is not 184 from a success run, this property is None. 185 """ 186 if not self._has_data() or 'sum_received' not in self.result['end']: 187 return None 188 bps = self.result['end']['sum_received']['bits_per_second'] 189 return self._get_reporting_speed(bps) 190 191 @property 192 def avg_send_rate(self): 193 """Average sending rate in MB/s over the entire run. 194 195 This data may not exist if iperf was interrupted. If the result is not 196 from a success run, this property is None. 197 """ 198 if not self._has_data() or 'sum_sent' not in self.result['end']: 199 return None 200 bps = self.result['end']['sum_sent']['bits_per_second'] 201 return self._get_reporting_speed(bps) 202 203 @property 204 def instantaneous_rates(self): 205 """Instantaneous received rate in MB/s over entire run. 206 207 This data may not exist if iperf was interrupted. If the result is not 208 from a success run, this property is None. 209 """ 210 if not self._has_data(): 211 return None 212 intervals = [ 213 self._get_reporting_speed(interval['sum']['bits_per_second']) 214 for interval in self.result['intervals'] 215 ] 216 return intervals 217 218 @property 219 def std_deviation(self): 220 """Standard deviation of rates in MB/s over entire run. 221 222 This data may not exist if iperf was interrupted. If the result is not 223 from a success run, this property is None. 224 """ 225 return self.get_std_deviation(0) 226 227 def get_std_deviation(self, iperf_ignored_interval): 228 """Standard deviation of rates in MB/s over entire run. 229 230 This data may not exist if iperf was interrupted. If the result is not 231 from a success run, this property is None. A configurable number of 232 beginning (and the single last) intervals are ignored in the 233 calculation as they are inaccurate (e.g. the last is from a very small 234 interval) 235 236 Args: 237 iperf_ignored_interval: number of iperf interval to ignored in 238 calculating standard deviation 239 240 Returns: 241 The standard deviation. 242 """ 243 if not self._has_data(): 244 return None 245 instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval: 246 -1] 247 avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates) 248 sqd_deviations = ([(rate - avg_rate)**2 249 for rate in instantaneous_rates]) 250 std_dev = math.sqrt( 251 math.fsum(sqd_deviations) / (len(sqd_deviations) - 1)) 252 return std_dev 253 254 255class IPerfServerBase(object): 256 # Keeps track of the number of IPerfServer logs to prevent file name 257 # collisions. 258 __log_file_counter = 0 259 260 __log_file_lock = threading.Lock() 261 262 def __init__(self, port): 263 self._port = port 264 # TODO(markdr): We shouldn't be storing the log files in an array like 265 # this. Nobody should be reading this property either. Instead, the 266 # IPerfResult should be returned in stop() with all the necessary info. 267 # See aosp/1012824 for a WIP implementation. 268 self.log_files = [] 269 270 @property 271 def port(self): 272 raise NotImplementedError('port must be specified.') 273 274 @property 275 def started(self): 276 raise NotImplementedError('started must be specified.') 277 278 def start(self, extra_args='', tag=''): 279 """Starts an iperf3 server. 280 281 Args: 282 extra_args: A string representing extra arguments to start iperf 283 server with. 284 tag: Appended to log file name to identify logs from different 285 iperf runs. 286 """ 287 raise NotImplementedError('start() must be specified.') 288 289 def stop(self): 290 """Stops the iperf server. 291 292 Returns: 293 The name of the log file generated from the terminated session. 294 """ 295 raise NotImplementedError('stop() must be specified.') 296 297 def _get_full_file_path(self, tag=None): 298 """Returns the full file path for the IPerfServer log file. 299 300 Note: If the directory for the file path does not exist, it will be 301 created. 302 303 Args: 304 tag: The tag passed in to the server run. 305 """ 306 out_dir = self.log_path 307 308 with IPerfServerBase.__log_file_lock: 309 tags = [tag, IPerfServerBase.__log_file_counter] 310 out_file_name = 'IPerfServer,%s.log' % (','.join( 311 [str(x) for x in tags if x != '' and x is not None])) 312 IPerfServerBase.__log_file_counter += 1 313 314 file_path = os.path.join(out_dir, out_file_name) 315 self.log_files.append(file_path) 316 return file_path 317 318 @property 319 def log_path(self): 320 current_context = context.get_current_context() 321 full_out_dir = os.path.join(current_context.get_full_output_path(), 322 'IPerfServer%s' % self.port) 323 324 # Ensure the directory exists. 325 os.makedirs(full_out_dir, exist_ok=True) 326 327 return full_out_dir 328 329 330def _get_port_from_ss_output(ss_output, pid): 331 pid = str(pid) 332 lines = ss_output.split('\n') 333 for line in lines: 334 if pid in line: 335 # Expected format: 336 # tcp LISTEN 0 5 *:<PORT> *:* users:(("cmd",pid=<PID>,fd=3)) 337 return line.split()[4].split(':')[-1] 338 else: 339 raise ProcessLookupError('Could not find started iperf3 process.') 340 341 342class IPerfServer(IPerfServerBase): 343 """Class that handles iperf server commands on localhost.""" 344 def __init__(self, port=5201): 345 super().__init__(port) 346 self._hinted_port = port 347 self._current_log_file = None 348 self._iperf_process = None 349 self._last_opened_file = None 350 351 @property 352 def port(self): 353 return self._port 354 355 @property 356 def started(self): 357 return self._iperf_process is not None 358 359 def start(self, extra_args='', tag=''): 360 """Starts iperf server on local machine. 361 362 Args: 363 extra_args: A string representing extra arguments to start iperf 364 server with. 365 tag: Appended to log file name to identify logs from different 366 iperf runs. 367 """ 368 if self._iperf_process is not None: 369 return 370 371 self._current_log_file = self._get_full_file_path(tag) 372 373 # Run an iperf3 server on the hinted port with JSON output. 374 command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J'] 375 376 command.extend(shlex.split(extra_args)) 377 378 if self._last_opened_file: 379 self._last_opened_file.close() 380 self._last_opened_file = open(self._current_log_file, 'w') 381 self._iperf_process = subprocess.Popen(command, 382 stdout=self._last_opened_file, 383 stderr=subprocess.DEVNULL) 384 for attempts_left in reversed(range(3)): 385 try: 386 self._port = int( 387 _get_port_from_ss_output( 388 job.run('ss -l -p -n | grep iperf').stdout, 389 self._iperf_process.pid)) 390 break 391 except ProcessLookupError: 392 if attempts_left == 0: 393 raise 394 logging.debug('iperf3 process not started yet.') 395 time.sleep(.01) 396 397 def stop(self): 398 """Stops the iperf server. 399 400 Returns: 401 The name of the log file generated from the terminated session. 402 """ 403 if self._iperf_process is None: 404 return 405 406 if self._last_opened_file: 407 self._last_opened_file.close() 408 self._last_opened_file = None 409 410 self._iperf_process.terminate() 411 self._iperf_process = None 412 413 return self._current_log_file 414 415 def __del__(self): 416 self.stop() 417 418 419class IPerfServerOverSsh(IPerfServerBase): 420 """Class that handles iperf3 operations on remote machines.""" 421 def __init__(self, ssh_config, port, test_interface=None): 422 super().__init__(port) 423 ssh_settings = settings.from_config(ssh_config) 424 self._ssh_session = connection.SshConnection(ssh_settings) 425 426 self._iperf_pid = None 427 self._current_tag = None 428 self.hostname = ssh_settings.hostname 429 try: 430 # A test interface can only be found if an ip address is specified. 431 # A fully qualified hostname will return None for the 432 # test_interface. 433 self.test_interface = self._get_test_interface_based_on_ip( 434 test_interface) 435 except Exception: 436 self.test_interface = None 437 438 @property 439 def port(self): 440 return self._port 441 442 @property 443 def started(self): 444 return self._iperf_pid is not None 445 446 def _get_remote_log_path(self): 447 return 'iperf_server_port%s.log' % self.port 448 449 def _get_test_interface_based_on_ip(self, test_interface): 450 """Gets the test interface for a particular IP if the test interface 451 passed in test_interface is None 452 453 Args: 454 test_interface: Either a interface name, ie eth0, or None 455 456 Returns: 457 The name of the test interface. 458 """ 459 if test_interface: 460 return test_interface 461 return utils.get_interface_based_on_ip(self._ssh_session, 462 self.hostname) 463 464 def get_interface_ip_addresses(self, interface): 465 """Gets all of the ip addresses, ipv4 and ipv6, associated with a 466 particular interface name. 467 468 Args: 469 interface: The interface name on the device, ie eth0 470 471 Returns: 472 A list of dictionaries of the the various IP addresses: 473 ipv4_private_local_addresses: Any 192.168, 172.16, or 10 474 addresses 475 ipv4_public_addresses: Any IPv4 public addresses 476 ipv6_link_local_addresses: Any fe80:: addresses 477 ipv6_private_local_addresses: Any fd00:: addresses 478 ipv6_public_addresses: Any publicly routable addresses 479 """ 480 return utils.get_interface_ip_addresses(self._ssh_session, interface) 481 482 def renew_test_interface_ip_address(self): 483 """Renews the test interface's IP address. Necessary for changing 484 DHCP scopes during a test. 485 """ 486 utils.renew_linux_ip_address(self._ssh_session, self.test_interface) 487 488 def start(self, extra_args='', tag='', iperf_binary=None): 489 """Starts iperf server on specified machine and port. 490 491 Args: 492 extra_args: A string representing extra arguments to start iperf 493 server with. 494 tag: Appended to log file name to identify logs from different 495 iperf runs. 496 iperf_binary: Location of iperf3 binary. If none, it is assumed the 497 the binary is in the path. 498 """ 499 if self.started: 500 return 501 502 if not iperf_binary: 503 logging.debug('No iperf3 binary specified. ' 504 'Assuming iperf3 is in the path.') 505 iperf_binary = 'iperf3' 506 else: 507 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 508 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 509 510 cmd = '{cmd} {extra_flags} > {log_file}'.format( 511 cmd=iperf_command, 512 extra_flags=extra_args, 513 log_file=self._get_remote_log_path()) 514 515 job_result = self._ssh_session.run_async(cmd) 516 self._iperf_pid = job_result.stdout 517 self._current_tag = tag 518 519 def stop(self): 520 """Stops the iperf server. 521 522 Returns: 523 The name of the log file generated from the terminated session. 524 """ 525 if not self.started: 526 return 527 528 self._ssh_session.run_async('kill -9 {}'.format(str(self._iperf_pid))) 529 iperf_result = self._ssh_session.run('cat {}'.format( 530 self._get_remote_log_path())) 531 532 log_file = self._get_full_file_path(self._current_tag) 533 with open(log_file, 'w') as f: 534 f.write(iperf_result.stdout) 535 536 self._ssh_session.run_async('rm {}'.format( 537 self._get_remote_log_path())) 538 self._iperf_pid = None 539 return log_file 540 541 542# TODO(markdr): Remove this after automagic controller creation has been 543# removed. 544class _AndroidDeviceBridge(object): 545 """A helper class for connecting serial numbers to AndroidDevices.""" 546 547 _test_class = None 548 549 @staticmethod 550 @subscribe_static(TestClassBeginEvent) 551 def on_test_begin(event): 552 _AndroidDeviceBridge._test_class = event.test_class 553 554 @staticmethod 555 @subscribe_static(TestClassEndEvent) 556 def on_test_end(_): 557 _AndroidDeviceBridge._test_class = None 558 559 @staticmethod 560 def android_devices(): 561 """A dict of serial -> AndroidDevice, where AndroidDevice is a device 562 found in the current TestClass's controllers. 563 """ 564 if not _AndroidDeviceBridge._test_class: 565 return {} 566 return { 567 device.serial: device 568 for device in _AndroidDeviceBridge._test_class.android_devices 569 } 570 571 572event_bus.register_subscription( 573 _AndroidDeviceBridge.on_test_begin.subscription) 574event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription) 575 576 577class IPerfServerOverAdb(IPerfServerBase): 578 """Class that handles iperf3 operations over ADB devices.""" 579 def __init__(self, android_device_or_serial, port): 580 """Creates a new IPerfServerOverAdb object. 581 582 Args: 583 android_device_or_serial: Either an AndroidDevice object, or the 584 serial that corresponds to the AndroidDevice. Note that the 585 serial must be present in an AndroidDevice entry in the ACTS 586 config. 587 port: The port number to open the iperf server on. 588 """ 589 super().__init__(port) 590 self._android_device_or_serial = android_device_or_serial 591 592 self._iperf_process = None 593 self._current_tag = '' 594 595 @property 596 def port(self): 597 return self._port 598 599 @property 600 def started(self): 601 return self._iperf_process is not None 602 603 @property 604 def _android_device(self): 605 if isinstance(self._android_device_or_serial, AndroidDevice): 606 return self._android_device_or_serial 607 else: 608 return _AndroidDeviceBridge.android_devices()[ 609 self._android_device_or_serial] 610 611 def _get_device_log_path(self): 612 return '~/data/iperf_server_port%s.log' % self.port 613 614 def start(self, extra_args='', tag='', iperf_binary=None): 615 """Starts iperf server on an ADB device. 616 617 Args: 618 extra_args: A string representing extra arguments to start iperf 619 server with. 620 tag: Appended to log file name to identify logs from different 621 iperf runs. 622 iperf_binary: Location of iperf3 binary. If none, it is assumed the 623 the binary is in the path. 624 """ 625 if self._iperf_process is not None: 626 return 627 628 if not iperf_binary: 629 logging.debug('No iperf3 binary specified. ' 630 'Assuming iperf3 is in the path.') 631 iperf_binary = 'iperf3' 632 else: 633 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 634 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 635 636 self._iperf_process = self._android_device.adb.shell_nb( 637 '{cmd} {extra_flags} > {log_file}'.format( 638 cmd=iperf_command, 639 extra_flags=extra_args, 640 log_file=self._get_device_log_path())) 641 642 self._iperf_process_adb_pid = '' 643 while len(self._iperf_process_adb_pid) == 0: 644 self._iperf_process_adb_pid = self._android_device.adb.shell( 645 'pgrep iperf3 -n') 646 647 self._current_tag = tag 648 649 def stop(self): 650 """Stops the iperf server. 651 652 Returns: 653 The name of the log file generated from the terminated session. 654 """ 655 if self._iperf_process is None: 656 return 657 658 job.run('kill -9 {}'.format(self._iperf_process.pid)) 659 660 # TODO(markdr): update with definitive kill method 661 while True: 662 iperf_process_list = self._android_device.adb.shell('pgrep iperf3') 663 if iperf_process_list.find(self._iperf_process_adb_pid) == -1: 664 break 665 else: 666 self._android_device.adb.shell("kill -9 {}".format( 667 self._iperf_process_adb_pid)) 668 669 iperf_result = self._android_device.adb.shell('cat {}'.format( 670 self._get_device_log_path())) 671 672 log_file = self._get_full_file_path(self._current_tag) 673 with open(log_file, 'w') as f: 674 f.write(iperf_result) 675 676 self._android_device.adb.shell('rm {}'.format( 677 self._get_device_log_path())) 678 679 self._iperf_process = None 680 return log_file 681