1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18
19import scapy.all as scapy
20
21from acts import asserts
22from acts import utils
23from acts.metrics.loggers.blackbox import BlackboxMetricLogger
24from acts_contrib.test_utils.power import IperfHelper as IPH
25from acts_contrib.test_utils.power import plot_utils
26import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL
27from acts_contrib.test_utils.tel import tel_test_utils as telutils
28
29
class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest):
    """ Cellular traffic power test.

    Inherits from PowerCellularLabBaseTest. Parses config specific
    to this kind of test. Contains methods to start data traffic
    between a local instance of iPerf and one running in the dut.

    """

    # Keywords for test name parameters
    PARAM_DIRECTION = 'direction'
    PARAM_DIRECTION_UL = 'ul'
    PARAM_DIRECTION_DL = 'dl'
    PARAM_DIRECTION_DL_UL = 'dlul'
    PARAM_BANDWIDTH_LIMIT = 'blimit'

    # Iperf waiting time, in seconds. It is added to the iPerf run duration
    # and to the sleep that follows the power measurement.
    IPERF_MARGIN = 10

    def __init__(self, controllers):
        """ Class initialization.

        Sets test parameters to initial values.

        Args:
            controllers: forwarded unchanged to the parent class initializer.
        """

        super().__init__(controllers)

        # These variables are passed to iPerf when starting data
        # traffic with the -b parameter to limit throughput on
        # the application layer.
        self.bandwidth_limit_dl = None
        self.bandwidth_limit_ul = None

        # Throughput obtained from iPerf
        self.iperf_results = {}

        # Blackbox metrics loggers

        self.dl_tput_logger = BlackboxMetricLogger.for_test_case(
            metric_name='avg_dl_tput')
        self.ul_tput_logger = BlackboxMetricLogger.for_test_case(
            metric_name='avg_ul_tput')

    def setup_class(self):
        """ Executed once before the test cases of this class.

        Unpacks the user parameters used by this class and verifies that a
        packet sender controller is available.

        Raises:
            RuntimeError: if no 'packet_senders' attribute has been set.
        """
        super().setup_class()

        # Unpack test parameters used in this class
        self.unpack_userparams(tcp_window_fraction=0, tcp_dumps=False)

        # Verify that at least one PacketSender controller has been initialized
        if not hasattr(self, 'packet_senders'):
            raise RuntimeError('At least one packet sender controller needs '
                               'to be defined in the test config files.')

    def setup_test(self):
        """ Executed before every test case.

        Parses test configuration from the test name and prepares
        the simulation for measurement.

        Returns:
            True if the parent setup succeeded and the parameters could be
            parsed, False otherwise.
        """

        # Reset results at the start of the test
        self.iperf_results = {}

        # Call parent method first to setup simulation
        if not super().setup_test():
            return False

        # Traffic direction

        values = self.consume_parameter(self.PARAM_DIRECTION, 1)
        valid_directions = [
            self.PARAM_DIRECTION_DL, self.PARAM_DIRECTION_UL,
            self.PARAM_DIRECTION_DL_UL
        ]

        if not values:
            self.log.warning("The keyword {} was not included in the testname "
                             "parameters. Setting to {} by default.".format(
                                 self.PARAM_DIRECTION,
                                 self.PARAM_DIRECTION_DL_UL))
            self.traffic_direction = self.PARAM_DIRECTION_DL_UL
        elif values[1] in valid_directions:
            self.traffic_direction = values[1]
        else:
            self.log.error("The test name has to include parameter {} "
                           "followed by {}/{}/{}.".format(
                               self.PARAM_DIRECTION, self.PARAM_DIRECTION_UL,
                               self.PARAM_DIRECTION_DL,
                               self.PARAM_DIRECTION_DL_UL))
            return False

        # Bandwidth limit

        values = self.consume_parameter(self.PARAM_BANDWIDTH_LIMIT, 2)

        if values:
            # The values are kept exactly as they were given in the test
            # name; start_iperf_traffic decides whether they are a limit.
            self.bandwidth_limit_dl = values[1]
            self.bandwidth_limit_ul = values[2]
        else:
            self.bandwidth_limit_dl = 0
            self.bandwidth_limit_ul = 0
            # Falling back to a default is not a failure, so this is logged
            # as a warning like the traffic direction default above.
            self.log.warning(
                "No bandwidth limit was indicated in the test parameters. "
                "Setting to default value of 0 (no limit to bandwidth). To set "
                "a different value include parameter '{}' followed by two "
                "strings indicating downlink and uplink bandwidth limits for "
                "iPerf.".format(self.PARAM_BANDWIDTH_LIMIT))

        # No errors when parsing parameters
        return True

    def teardown_test(self):
        """Tear down necessary objects after test case is finished.

        Logs the measured throughput values and, when the simulation provides
        them, the maximum throughput thresholds. The iPerf servers are
        stopped even if one of the previous steps raises.
        """

        try:
            super().teardown_test()

            dl_tput = self.iperf_results.get('DL', 0)
            ul_tput = self.iperf_results.get('UL', 0)

            # Log the throughput values to Blackbox
            self.dl_tput_logger.metric_value = dl_tput
            self.ul_tput_logger.metric_value = ul_tput

            # Log the throughput values to Spanner
            self.power_logger.set_dl_tput(dl_tput)
            self.power_logger.set_ul_tput(ul_tput)

            try:
                dl_max_throughput = (
                    self.simulation.maximum_downlink_throughput())
                ul_max_throughput = self.simulation.maximum_uplink_throughput()
                self.power_logger.set_dl_tput_threshold(dl_max_throughput)
                self.power_logger.set_ul_tput_threshold(ul_max_throughput)
            except NotImplementedError as e:
                self.log.error("%s Downlink/uplink thresholds will not be "
                               "logged in the power proto" % e)
        finally:
            # Always release the local iPerf servers so a failure above does
            # not leave them running.
            for ips in self.iperf_servers:
                ips.stop()

    def power_tel_traffic_test(self):
        """ Measures power and throughput during data transmission.

        Measurement step in this test. Starts iPerf client in the DUT and then
        initiates power measurement. After that, DUT is connected again and
        the result from iPerf is collected. Pass or fail is decided with a
        threshold value.

        Returns:
            A tuple with the average current and the dictionary of iPerf
            results produced by get_iperf_results.
        """

        # Start data traffic
        iperf_helpers = self.start_tel_traffic(self.dut)

        # Measure power
        self.collect_power_data()

        # Wait for iPerf to finish
        time.sleep(self.IPERF_MARGIN + 2)

        # Collect throughput measurement
        self.iperf_results = self.get_iperf_results(self.dut, iperf_helpers)

        # Check if power measurement is below the required value
        self.pass_fail_check(self.avg_current)

        return self.avg_current, self.iperf_results

    def get_iperf_results(self, device, iperf_helpers):
        """ Pulls iperf results from the device.

        Args:
            device: the device from which iperf results need to be pulled.
            iperf_helpers: the iperf helper objects returned when the traffic
                was started, one per traffic direction.

        Returns:
            a dictionary containing DL/UL throughput in Mbit/s.
        """

        # Pull TCP logs if enabled. The dump is started on self.dut in
        # start_tel_traffic, so it is stopped on self.dut as well.
        if self.tcp_dumps:
            self.log.info('Pulling TCP dumps.')
            telutils.stop_adb_tcpdump(self.dut)
            telutils.get_tcpdump_log(self.dut)

        throughput = {}

        for iph in iperf_helpers:

            self.log.info("Getting {} throughput results.".format(
                iph.traffic_direction))

            iperf_result = iph.process_iperf_results(device, self.log,
                                                     self.iperf_servers,
                                                     self.test_name)

            throughput[iph.traffic_direction] = iperf_result

        return throughput

    def check_throughput_results(self, iperf_results):
        """ Checks throughput results.

        Compares the obtained throughput with the expected value
        provided by the simulation class. The check is skipped for
        simulation classes that raise NotImplementedError.

        Args:
            iperf_results: dictionary mapping 'UL' / 'DL' to the measured
                throughput.

        Raises:
            RuntimeError: if a key other than 'UL' or 'DL' is found.
        """

        for direction, throughput in iperf_results.items():
            try:
                if direction == "UL":
                    expected_t = self.simulation.maximum_uplink_throughput()
                elif direction == "DL":
                    expected_t = self.simulation.maximum_downlink_throughput()
                else:
                    raise RuntimeError("Unexpected traffic direction value.")
            except NotImplementedError:
                # Some simulation classes might not have implemented the max
                # throughput calculation yet.
                self.log.debug("Expected throughput is not available for the "
                               "current simulation class.")
            else:

                self.log.info(
                    "The expected {} throughput is {} Mbit/s.".format(
                        direction, expected_t))
                asserts.assert_true(
                    0.90 < throughput / expected_t < 1.10,
                    "{} throughput differed more than 10% from the expected "
                    "value! ({}/{} = {})".format(
                        direction, round(throughput, 3), round(expected_t, 3),
                        round(throughput / expected_t, 3)))

    def pass_fail_check(self, average_current=None):
        """ Checks power consumption and throughput.

        Uses the base class method to check power consumption. Also, compares
        the obtained throughput with the expected value provided by the
        simulation class.

        Args:
            average_current: value forwarded to the base class check.
        """
        self.check_throughput_results(self.iperf_results)
        super().pass_fail_check(average_current)

    def start_tel_traffic(self, client_host):
        """ Starts iPerf in the indicated device and initiates traffic.

        Starts the required iperf clients and servers according to the traffic
        pattern config in the current test.

        Args:
            client_host: device handler in which to start the iperf client.

        Returns:
            A list of iperf helpers.

        Raises:
            RuntimeError: if the ping from the device to the host fails.
        """
        # The iPerf server is hosted in this computer
        self.iperf_server_address = scapy.get_if_addr(
            self.packet_senders[0].interface)

        self.log.info('Testing IP connectivity with ping.')
        if not utils.adb_shell_ping(
                client_host, count=10, dest_ip=self.iperf_server_address):
            raise RuntimeError('Ping between DUT and host failed.')

        # Start iPerf traffic
        iperf_helpers = []

        # If the tcp_window_fraction parameter was set, calculate the TCP
        # window size from the peak throughput.
        ul_tcp_window = None
        dl_tcp_window = None
        if self.tcp_window_fraction == 0:
            self.log.info("tcp_window_fraction was not indicated. "
                          "Disabling fixed TCP window.")
        else:
            try:
                max_dl_tput = self.simulation.maximum_downlink_throughput()
                max_ul_tput = self.simulation.maximum_uplink_throughput()
                dl_tcp_window = max_dl_tput / self.tcp_window_fraction
                ul_tcp_window = max_ul_tput / self.tcp_window_fraction
            except NotImplementedError:
                self.log.error("Maximum downlink/uplink throughput method not "
                               "implemented for %s." %
                               type(self.simulation).__name__)

        if self.traffic_direction in [
                self.PARAM_DIRECTION_DL, self.PARAM_DIRECTION_DL_UL
        ]:
            # Downlink traffic. The server index is the number of helpers
            # started so far, so every direction gets its own server.
            iperf_helpers.append(
                self.start_iperf_traffic(client_host,
                                         server_idx=len(iperf_helpers),
                                         traffic_direction='DL',
                                         window=dl_tcp_window,
                                         bandwidth=self.bandwidth_limit_dl))

        if self.traffic_direction in [
                self.PARAM_DIRECTION_UL, self.PARAM_DIRECTION_DL_UL
        ]:
            # Uplink traffic
            iperf_helpers.append(
                self.start_iperf_traffic(client_host,
                                         server_idx=len(iperf_helpers),
                                         traffic_direction='UL',
                                         window=ul_tcp_window,
                                         bandwidth=self.bandwidth_limit_ul))

        # Enable TCP logger.
        if self.tcp_dumps:
            self.log.info('Enabling TCP logger.')
            telutils.start_adb_tcpdump(self.dut)

        return iperf_helpers

    @staticmethod
    def _is_bandwidth_limited(bandwidth):
        """ Tells whether a bandwidth value requests a throughput limit.

        The value can be a number or a string taken from the test name, so
        it cannot be compared against zero directly.

        Args:
            bandwidth: a number, a string or None.

        Returns:
            False for None, for numbers that are not greater than zero and
            for empty strings or strings holding such a number. True
            otherwise, including strings that are not plain numbers.
        """
        if bandwidth is None:
            return False

        if isinstance(bandwidth, str):
            text = bandwidth.strip()
            try:
                return float(text) > 0
            except ValueError:
                # Not a plain number (for instance a value carrying a unit
                # suffix): any non-empty text counts as a limit.
                return bool(text)

        return bandwidth > 0

    def start_iperf_traffic(self,
                            client_host,
                            server_idx,
                            traffic_direction,
                            bandwidth=0,
                            window=None):
        """Starts iPerf data traffic.

        Starts an iperf client in an android device and a server locally.

        Args:
            client_host: device handler in which to start the iperf client
            server_idx: id of the iperf server to connect to
            traffic_direction: has to be either 'UL' or 'DL'
            bandwidth: bandwidth limit for data traffic. Can be a number or
                a string; zero (or '0') means no limit.
            window: the tcp window. if None, no window will be passed to iperf

        Returns:
            An IperfHelper object for the started client/server pair.
        """

        # Start the server locally
        self.iperf_servers[server_idx].start()

        config = {
            'traffic_type': 'TCP',
            # The traffic has to outlast the whole power measurement.
            'duration':
            self.mon_duration + self.mon_offset + self.IPERF_MARGIN,
            'start_meas_time': 4,
            'server_idx': server_idx,
            'port': self.iperf_servers[server_idx].port,
            'traffic_direction': traffic_direction,
            'window': window
        }

        # If bandwidth is equal to zero then no bandwidth requirements are
        # set. The value is forwarded untouched to the helper.
        if self._is_bandwidth_limited(bandwidth):
            config['bandwidth'] = bandwidth

        iph = IPH.IperfHelper(config)

        # Start the client in the android device
        client_host.adb.shell_nb(
            "nohup >/dev/null 2>&1 sh -c 'iperf3 -c {} {} "
            "&'".format(self.iperf_server_address, iph.iperf_args))

        self.log.info('{} iPerf started on port {}.'.format(
            traffic_direction, iph.port))

        return iph
390
391
class PowerTelRvRTest(PowerTelTrafficTest):
    """ Gets Range vs Rate curves while measuring power consumption.

    Uses PowerTelTrafficTest as a base class.
    """

    # Test name configuration keywords
    PARAM_SWEEP = "sweep"
    PARAM_SWEEP_UPLINK = "uplink"
    PARAM_SWEEP_DOWNLINK = "downlink"

    # Sweep values. Need to be set before starting test by test
    # function or child class.
    downlink_power_sweep = None
    uplink_power_sweep = None

    def setup_test(self):
        """ Executed before every test case.

        Parses test configuration from the test name and prepares
        the simulation for measurement.

        Returns:
            True if the parent setup succeeded and the sweep parameter is
            one of the supported values, False otherwise.
        """

        # Call parent method first to setup simulation
        if not super().setup_test():
            return False

        # Get which power value to sweep from config. The value is validated
        # explicitly instead of relying on a catch-all exception handler, so
        # unrelated errors are not hidden.
        values = self.consume_parameter(self.PARAM_SWEEP, 1)
        sweep = values[1] if values and len(values) > 1 else None

        if sweep not in (self.PARAM_SWEEP_UPLINK, self.PARAM_SWEEP_DOWNLINK):
            self.log.error(
                "The test name has to include parameter {} followed by "
                "either {} or {}.".format(self.PARAM_SWEEP,
                                          self.PARAM_SWEEP_DOWNLINK,
                                          self.PARAM_SWEEP_UPLINK))
            return False

        self.sweep = sweep
        return True

    def power_tel_rvr_test(self):
        """ Main function for the RvR test.

        Produces the RvR curve according to the indicated sweep values. For
        every power value the swept power is set in the simulation and the
        traffic test is executed. The sweep values, currents and throughputs
        are logged at the end.

        Raises:
            RuntimeError: if the sweep direction is not valid or the sweep
                values for that direction have not been set.
        """

        sweep = getattr(self, 'sweep', None)

        if sweep == self.PARAM_SWEEP_DOWNLINK:
            sweep_range = self.downlink_power_sweep
            tput_key = 'DL'
        elif sweep == self.PARAM_SWEEP_UPLINK:
            sweep_range = self.uplink_power_sweep
            tput_key = 'UL'
        else:
            raise RuntimeError(
                "Unexpected sweep direction: {}.".format(sweep))

        if sweep_range is None:
            raise RuntimeError(
                "The {} power sweep values need to be set before starting "
                "the test.".format(sweep))

        current = []
        throughput = []

        for pw in sweep_range:

            # NOTE(review): these calls pass self.simulation.bts1 as the first
            # argument, while PowerTelTxPowerSweepTest in this file calls
            # set_uplink_tx_power with the power value only. Confirm which
            # signature the simulation class actually exposes.
            if sweep == self.PARAM_SWEEP_DOWNLINK:
                self.simulation.set_downlink_rx_power(self.simulation.bts1, pw)
            else:
                self.simulation.set_uplink_tx_power(self.simulation.bts1, pw)

            i, t = self.power_tel_traffic_test()

            # The results are a dictionary keyed by traffic direction
            # ('DL' / 'UL'). Use the throughput of the swept direction and
            # fall back to the first available result when that direction
            # was not part of the traffic.
            tput = t.get(tput_key)
            if tput is None and t:
                tput = next(iter(t.values()))

            self.log.info("---------------------")
            self.log.info("{} -- {} --".format(sweep, pw))
            self.log.info("{} ----- {}".format(i, tput))
            self.log.info("---------------------")

            current.append(i)
            throughput.append(tput)

        self.log.info("Sweep values: {}".format(sweep_range))
        self.log.info("Currents: {}".format(current))
        self.log.info("Throughputs: {}".format(throughput))
473
474
class PowerTelTxPowerSweepTest(PowerTelTrafficTest):
    """ Gets Average Current vs Tx Power plot.

    Uses PowerTelTrafficTest as a base class.
    """

    # Test config keywords
    KEY_TX_STEP = 'step'
    KEY_UP_TOLERANCE = 'up_tolerance'
    KEY_DOWN_TOLERANCE = 'down_tolerance'

    # Test name parameters
    PARAM_TX_POWER_SWEEP = 'sweep'

    def setup_class(self):
        """ Executed once before the test cases of this class.

        Unpacks the step and tolerance user parameters, which are read later
        as self.step, self.up_tolerance and self.down_tolerance.
        """
        super().setup_class()
        self.unpack_userparams(
            [self.KEY_TX_STEP, self.KEY_UP_TOLERANCE, self.KEY_DOWN_TOLERANCE])

    def setup_test(self):
        """ Executed before every test case.

        Parses test configuration from the test name and prepares
        the simulation for measurement.

        Returns:
            True if the parent setup succeeded and the start / end power
            values could be parsed, False otherwise.
        """
        # Call parent method first to setup simulation
        if not super().setup_test():
            return False

        # Determine power range to sweep from test case params
        try:
            values = self.consume_parameter(self.PARAM_TX_POWER_SWEEP, 2)

            # The keyword itself plus the two values are expected.
            if not values or len(values) != 3:
                raise ValueError('Not enough params specified for sweep.')

            # Negative values are written with an 'n' prefix in the test
            # name, which is turned into a minus sign before parsing.
            self.start_dbm = int(values[1].replace('n', '-'))
            self.end_dbm = int(values[2].replace('n', '-'))
        except ValueError as e:
            self.log.error("Unable to parse test param sweep: {}".format(e))
            return False

        return True

    def pass_fail_check(self, currents=None, txs=None, iperf_results=None):
        """ Compares the obtained throughput with the expected
        value provided by the simulation class. Also, ensures
        consecutive currents do not increase or decrease beyond
        specified tolerance

        When only the first argument is given the call is forwarded to the
        parent class check, so the measurement methods inherited from
        PowerTelTrafficTest can still call it with a single average current.

        Args:
            currents: list of currents, one per sweep step. When txs and
                iperf_results are omitted, the value passed to the parent
                class check instead.
            txs: list of Tx power values, one per sweep step.
            iperf_results: list of iPerf result dictionaries, one per sweep
                step.
        """
        if txs is None and iperf_results is None:
            super().pass_fail_check(currents)
            return

        for iperf_result in iperf_results:
            self.check_throughput_results(iperf_result)

        # x = reference current value, y = next current value, i = index of x
        for i, (x, y) in enumerate(zip(currents, currents[1:])):
            # A reference of zero makes the relative change undefined; fail
            # with a clear message instead of a division error.
            asserts.assert_true(
                x != 0,
                "Current measured at {} dBm is 0, the relative change to "
                "the next step cannot be calculated.".format(txs[i]))
            measured_change = (y - x) / x * 100
            asserts.assert_true(
                -self.down_tolerance < measured_change < self.up_tolerance,
                "Current went from {} to {} ({}%) between {} dBm and {} dBm. "
                "Tolerance range: -{}% to {}%".format(x, y, measured_change,
                                                      txs[i], txs[i + 1],
                                                      self.down_tolerance,
                                                      self.up_tolerance))

    def create_power_plot(self, currents, txs):
        """ Creates average current vs tx power plot

        Args:
            currents: list of currents, one per sweep step.
            txs: list of Tx power values, one per sweep step.
        """
        title = '{}_{}_{}_tx_power_sweep'.format(
            self.test_name, self.dut.model, self.dut.build_info['build_id'])

        plot_utils.monsoon_tx_power_sweep_plot(self.mon_info.data_path, title,
                                               currents, txs)

    def power_tel_tx_sweep(self):
        """ Main function for the Tx power sweep test.

        Produces a plot of power consumption vs tx power. For every power
        value from start_dbm to end_dbm (inclusive) the traffic is started
        and the power, the measured Tx power and the throughput are
        collected. The checks are run once the sweep is finished.
        """
        currents = []
        txs = []
        iperf_results = []
        for tx in range(self.start_dbm, self.end_dbm + 1, self.step):

            self.simulation.set_uplink_tx_power(tx)

            iperf_helpers = self.start_tel_traffic(self.dut)

            # Measure power
            self.collect_power_data()

            # Wait for iPerf to finish
            time.sleep(self.IPERF_MARGIN + 2)

            # Collect throughput measurement
            iperf_result = self.get_iperf_results(self.dut, iperf_helpers)

            currents.append(self.avg_current)

            # Get the actual Tx power as measured from the callbox side
            measured_tx = self.simulation.get_measured_ul_power()

            txs.append(measured_tx)
            iperf_results.append(iperf_result)

        self.create_power_plot(currents, txs)
        self.pass_fail_check(currents, txs, iperf_results)
581