1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Unittest for suite_runner."""
9
10from __future__ import print_function
11
12import json
13
14import unittest
15import unittest.mock as mock
16
17import suite_runner
18import label
19
20from benchmark import Benchmark
21
22from cros_utils import command_executer
23from cros_utils import logger
24from machine_manager import MockCrosMachine
25
26
27class SuiteRunnerTest(unittest.TestCase):
28  """Class of SuiteRunner test."""
29  mock_json = mock.Mock(spec=json)
30  mock_cmd_exec = mock.Mock(spec=command_executer.CommandExecuter)
31  mock_cmd_term = mock.Mock(spec=command_executer.CommandTerminator)
32  mock_logger = mock.Mock(spec=logger.Logger)
33  mock_label = label.MockLabel('lumpy', 'build', 'lumpy_chromeos_image', '', '',
34                               '/tmp/chromeos', 'lumpy',
35                               ['lumpy1.cros', 'lumpy.cros2'], '', '', False,
36                               'average', 'gcc', False, '')
37  telemetry_crosperf_bench = Benchmark(
38      'b1_test',  # name
39      'octane',  # test_name
40      '',  # test_args
41      3,  # iterations
42      False,  # rm_chroot_tmp
43      'record -e cycles',  # perf_args
44      'telemetry_Crosperf',  # suite
45      True)  # show_all_results
46
47  crosperf_wrapper_bench = Benchmark(
48      'b2_test',  # name
49      'webgl',  # test_name
50      '',  # test_args
51      3,  # iterations
52      False,  # rm_chroot_tmp
53      '',  # perf_args
54      'crosperf_Wrapper')  # suite
55
56  tast_bench = Benchmark(
57      'b3_test',  # name
58      'platform.ReportDiskUsage',  # test_name
59      '',  # test_args
60      1,  # iterations
61      False,  # rm_chroot_tmp
62      '',  # perf_args
63      'tast')  # suite
64
65  def __init__(self, *args, **kwargs):
66    super(SuiteRunnerTest, self).__init__(*args, **kwargs)
67    self.skylab_run_args = []
68    self.test_that_args = []
69    self.tast_args = []
70    self.call_skylab_run = False
71    self.call_test_that_run = False
72    self.call_tast_run = False
73
74  def setUp(self):
75    self.runner = suite_runner.SuiteRunner(
76        {}, self.mock_logger, 'verbose', self.mock_cmd_exec, self.mock_cmd_term)
77
78  def test_get_profiler_args(self):
79    input_str = ("--profiler=custom_perf --profiler_args='perf_options"
80                 '="record -a -e cycles,instructions"\'')
81    output_str = ("profiler=custom_perf profiler_args='record -a -e "
82                  "cycles,instructions'")
83    res = suite_runner.GetProfilerArgs(input_str)
84    self.assertEqual(res, output_str)
85
86  def test_get_dut_config_args(self):
87    dut_config = {'enable_aslr': False, 'top_interval': 1.0}
88    output_str = ('dut_config='
89                  "'"
90                  '{"enable_aslr": '
91                  'false, "top_interval": 1.0}'
92                  "'"
93                  '')
94    res = suite_runner.GetDutConfigArgs(dut_config)
95    self.assertEqual(res, output_str)
96
97  def test_run(self):
98
99    def reset():
100      self.test_that_args = []
101      self.skylab_run_args = []
102      self.tast_args = []
103      self.call_test_that_run = False
104      self.call_skylab_run = False
105      self.call_tast_run = False
106
107    def FakeSkylabRun(test_label, benchmark, test_args, profiler_args):
108      self.skylab_run_args = [test_label, benchmark, test_args, profiler_args]
109      self.call_skylab_run = True
110      return 'Ran FakeSkylabRun'
111
112    def FakeTestThatRun(machine, test_label, benchmark, test_args,
113                        profiler_args):
114      self.test_that_args = [
115          machine, test_label, benchmark, test_args, profiler_args
116      ]
117      self.call_test_that_run = True
118      return 'Ran FakeTestThatRun'
119
120    def FakeTastRun(machine, test_label, benchmark):
121      self.tast_args = [machine, test_label, benchmark]
122      self.call_tast_run = True
123      return 'Ran FakeTastRun'
124
125    self.runner.Skylab_Run = FakeSkylabRun
126    self.runner.Test_That_Run = FakeTestThatRun
127    self.runner.Tast_Run = FakeTastRun
128
129    self.runner.dut_config['enable_aslr'] = False
130    self.runner.dut_config['cooldown_time'] = 0
131    self.runner.dut_config['governor'] = 'fake_governor'
132    self.runner.dut_config['cpu_freq_pct'] = 65
133    self.runner.dut_config['intel_pstate'] = 'no_hwp'
134    machine = 'fake_machine'
135    cros_machine = MockCrosMachine(machine, self.mock_label.chromeos_root,
136                                   self.mock_logger)
137    test_args = ''
138    profiler_args = ''
139
140    # Test skylab run for telemetry_Crosperf and crosperf_Wrapper benchmarks.
141    self.mock_label.skylab = True
142    reset()
143    self.runner.Run(cros_machine, self.mock_label, self.crosperf_wrapper_bench,
144                    test_args, profiler_args)
145    self.assertTrue(self.call_skylab_run)
146    self.assertFalse(self.call_test_that_run)
147    self.assertEqual(self.skylab_run_args,
148                     [self.mock_label, self.crosperf_wrapper_bench, '', ''])
149
150    reset()
151    self.runner.Run(cros_machine, self.mock_label,
152                    self.telemetry_crosperf_bench, test_args, profiler_args)
153    self.assertTrue(self.call_skylab_run)
154    self.assertFalse(self.call_test_that_run)
155    self.assertEqual(self.skylab_run_args,
156                     [self.mock_label, self.telemetry_crosperf_bench, '', ''])
157
158    # Test test_that run for telemetry_Crosperf and crosperf_Wrapper benchmarks.
159    self.mock_label.skylab = False
160    reset()
161    self.runner.Run(cros_machine, self.mock_label, self.crosperf_wrapper_bench,
162                    test_args, profiler_args)
163    self.assertTrue(self.call_test_that_run)
164    self.assertFalse(self.call_skylab_run)
165    self.assertEqual(
166        self.test_that_args,
167        ['fake_machine', self.mock_label, self.crosperf_wrapper_bench, '', ''])
168
169    reset()
170    self.runner.Run(cros_machine, self.mock_label,
171                    self.telemetry_crosperf_bench, test_args, profiler_args)
172    self.assertTrue(self.call_test_that_run)
173    self.assertFalse(self.call_skylab_run)
174    self.assertEqual(self.test_that_args, [
175        'fake_machine', self.mock_label, self.telemetry_crosperf_bench, '', ''
176    ])
177
178    # Test tast run for tast benchmarks.
179    reset()
180    self.runner.Run(cros_machine, self.mock_label, self.tast_bench, '', '')
181    self.assertTrue(self.call_tast_run)
182    self.assertFalse(self.call_test_that_run)
183    self.assertFalse(self.call_skylab_run)
184    self.assertEqual(self.tast_args,
185                     ['fake_machine', self.mock_label, self.tast_bench])
186
187  def test_gen_test_args(self):
188    test_args = '--iterations=2'
189    perf_args = 'record -a -e cycles'
190
191    # Test crosperf_Wrapper benchmarks arg list generation
192    args_list = ["test_args='--iterations=2'", "dut_config='{}'", 'test=webgl']
193    res = self.runner.GenTestArgs(self.crosperf_wrapper_bench, test_args, '')
194    self.assertCountEqual(res, args_list)
195
196    # Test telemetry_Crosperf benchmarks arg list generation
197    args_list = [
198        "test_args='--iterations=2'", "dut_config='{}'", 'test=octane',
199        'run_local=False'
200    ]
201    args_list.append(suite_runner.GetProfilerArgs(perf_args))
202    res = self.runner.GenTestArgs(self.telemetry_crosperf_bench, test_args,
203                                  perf_args)
204    self.assertCountEqual(res, args_list)
205
206  @mock.patch.object(command_executer.CommandExecuter, 'CrosRunCommand')
207  @mock.patch.object(command_executer.CommandExecuter,
208                     'ChrootRunCommandWOutput')
209  def test_tast_run(self, mock_chroot_runcmd, mock_cros_runcmd):
210    mock_chroot_runcmd.return_value = 0
211    self.mock_cmd_exec.ChrootRunCommandWOutput = mock_chroot_runcmd
212    self.mock_cmd_exec.CrosRunCommand = mock_cros_runcmd
213    res = self.runner.Tast_Run('lumpy1.cros', self.mock_label, self.tast_bench)
214    self.assertEqual(mock_cros_runcmd.call_count, 1)
215    self.assertEqual(mock_chroot_runcmd.call_count, 1)
216    self.assertEqual(res, 0)
217    self.assertEqual(mock_cros_runcmd.call_args_list[0][0],
218                     ('rm -rf /usr/local/autotest/results/*',))
219    args_list = mock_chroot_runcmd.call_args_list[0][0]
220    args_dict = mock_chroot_runcmd.call_args_list[0][1]
221    self.assertEqual(len(args_list), 2)
222    self.assertEqual(args_dict['command_terminator'], self.mock_cmd_term)
223
224  @mock.patch.object(command_executer.CommandExecuter, 'CrosRunCommand')
225  @mock.patch.object(command_executer.CommandExecuter,
226                     'ChrootRunCommandWOutput')
227  @mock.patch.object(logger.Logger, 'LogFatal')
228  def test_test_that_run(self, mock_log_fatal, mock_chroot_runcmd,
229                         mock_cros_runcmd):
230    mock_log_fatal.side_effect = SystemExit()
231    self.runner.logger.LogFatal = mock_log_fatal
232    # Test crosperf_Wrapper benchmarks cannot take perf_args
233    raised_exception = False
234    try:
235      self.runner.Test_That_Run('lumpy1.cros', self.mock_label,
236                                self.crosperf_wrapper_bench, '',
237                                'record -a -e cycles')
238    except SystemExit:
239      raised_exception = True
240    self.assertTrue(raised_exception)
241
242    mock_chroot_runcmd.return_value = 0
243    self.mock_cmd_exec.ChrootRunCommandWOutput = mock_chroot_runcmd
244    self.mock_cmd_exec.CrosRunCommand = mock_cros_runcmd
245    res = self.runner.Test_That_Run('lumpy1.cros', self.mock_label,
246                                    self.crosperf_wrapper_bench,
247                                    '--iterations=2', '')
248    self.assertEqual(mock_cros_runcmd.call_count, 1)
249    self.assertEqual(mock_chroot_runcmd.call_count, 1)
250    self.assertEqual(res, 0)
251    self.assertEqual(mock_cros_runcmd.call_args_list[0][0],
252                     ('rm -rf /usr/local/autotest/results/*',))
253    args_list = mock_chroot_runcmd.call_args_list[0][0]
254    args_dict = mock_chroot_runcmd.call_args_list[0][1]
255    self.assertEqual(len(args_list), 2)
256    self.assertEqual(args_dict['command_terminator'], self.mock_cmd_term)
257
258  @mock.patch.object(command_executer.CommandExecuter, 'RunCommandWOutput')
259  @mock.patch.object(json, 'loads')
260  def test_skylab_run_client(self, mock_json_loads, mock_runcmd):
261
262    def FakeDownloadResult(l, task_id):
263      if l and task_id:
264        self.assertEqual(task_id, '12345')
265        return 0
266
267    mock_runcmd.return_value = (
268        0,
269        'Created Swarming task https://swarming/task/b12345',
270        '',
271    )
272    self.mock_cmd_exec.RunCommandWOutput = mock_runcmd
273
274    mock_json_loads.return_value = {
275        'child-results': [{
276            'success': True,
277            'task-run-url': 'https://swarming/task?id=12345'
278        }]
279    }
280    self.mock_json.loads = mock_json_loads
281
282    self.mock_label.skylab = True
283    self.runner.DownloadResult = FakeDownloadResult
284    res = self.runner.Skylab_Run(self.mock_label, self.crosperf_wrapper_bench,
285                                 '', '')
286    ret_tup = (0, '\nResults placed in tmp/swarming-12345\n', '')
287    self.assertEqual(res, ret_tup)
288    self.assertEqual(mock_runcmd.call_count, 2)
289
290    args_list = mock_runcmd.call_args_list[0][0]
291    args_dict = mock_runcmd.call_args_list[0][1]
292    self.assertEqual(len(args_list), 1)
293    self.assertEqual(args_dict['command_terminator'], self.mock_cmd_term)
294
295    args_list = mock_runcmd.call_args_list[1][0]
296    self.assertEqual(args_list[0], ('skylab wait-task 12345'))
297    self.assertEqual(args_dict['command_terminator'], self.mock_cmd_term)
298
299
300if __name__ == '__main__':
301  unittest.main()
302