#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tool to validate code in prod branch before pushing to lab.

The script runs the push_to_prod suite to verify that code in the prod branch
is ready to be pushed. Link to design document:
https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit

To verify whether the prod branch can be pushed to the lab, run the following
command on the chromeos-staging-master2.hot server:
/usr/local/autotest/site_utils/test_push.py -e someone@company.com

The script uses the latest gandof stable build as the test build by default.

"""

import argparse
import ast
import datetime
import getpass
import multiprocessing
import os
import re
import subprocess
import sys
import time
import traceback
import urllib2

import common
try:
    from autotest_lib.frontend import setup_django_environment
    from autotest_lib.frontend.afe import models
    from autotest_lib.frontend.afe import rpc_utils
except ImportError:
    # Unittests may not have the Django database configured and will fail to
    # import.
    pass
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.server import constants
from autotest_lib.server import site_utils
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.site_utils import test_push_common
AUTOTEST_DIR = common.autotest_dir
CONFIG = global_config.global_config

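# RPC clients for the AFE and TKO frontends; the Retrying* wrappers retry
# transient RPC failures using the given timeout and delay settings.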
AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)

MAIL_FROM = 'chromeos-test@google.com'
BUILD_REGEX = r'R[\d]+-[\d]+\.[\d]+\.[\d]+'
RUN_SUITE_COMMAND = 'run_suite.py'
PUSH_TO_PROD_SUITE = 'push_to_prod'
DUMMY_SUITE = 'dummy'
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
DEFAULT_NUM_DUTS = (
        ('gandof', 4),
        ('quawks', 2),
)

SUITE_JOB_START_INFO_REGEX = (r'^.*Created suite job:.*'
                              r'tab_id=view_job&object_id=(\d+)$')

URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)

# Some tests could be extra / missing or have mismatched results for various
# reasons. Add such tests to this list and explain the reason.
_IGNORED_TESTS = [
    # test_push uses a stable image build to test, which is quite behind ToT.
    # The following expectations are correct at ToT, but need to be ignored
    # until the stable image is recent enough.

    # TODO(pprabhu): Remove once R70 is stable.
    'dummy_Fail.RetrySuccess',
    'dummy_Fail.RetryFail',
]

# Multiprocessing proxy objects that are used to share data between background
# suite-running processes and the main process. The multiprocessing-compatible
# versions are initialized in _main.
_run_suite_output = []
_all_suite_ids = []

DEFAULT_SERVICE_RESPAWN_LIMIT = 2


class TestPushException(Exception):
    """Exception to be raised when the test to push to prod failed."""
    pass

@retry.retry(TestPushException, timeout_min=5, delay_sec=30)
def check_dut_inventory(required_num_duts, pool):
    """Check the DUT inventory for each platform in the specified pool.

    @param required_num_duts: a dict specifying the number of DUTs each
                              platform requires in order to finish push tests.
    @param pool: the pool used by test_push.
    @raise TestPushException: if the number of DUTs is less than the
                              requirement.
    """
    print 'Checking DUT inventory...'
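    # Count Ready, unlocked DUTs in the test_push pool, grouped by platform.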
    pool_label = constants.Labels.POOL_PREFIX + pool
    hosts = AFE.run('get_hosts', status='Ready', locked=False)
    hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    platforms = [host['platform'] for host in hosts]
    current_inventory = {p: platforms.count(p) for p in platforms}
    error_msg = ''
    for platform, req_num in required_num_duts.items():
        curr_num = current_inventory.get(platform, 0)
        if curr_num < req_num:
            error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
                          ' now' % (req_num, platform, pool, curr_num))
    if error_msg:
        raise TestPushException('Not enough DUTs to run push tests. %s' %
                                error_msg)


def powerwash_dut_to_test_repair(hostname, timeout):
    """Powerwash a DUT to test the repair workflow.

    @param hostname: hostname of the DUT.
    @param timeout: seconds to wait for the powerwash test before aborting it.
    @raise TestPushException: if the DUT fails to run the test.
    """
    t = models.Test.objects.get(name='platform_Powerwash')
    c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
    job_id = rpc_utils.create_job_common(
             'powerwash', priority=priorities.Priority.SUPER,
             control_type='Server', control_file=c, hosts=[hostname])

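    # Poll TKO for test results; abort the job if it produces no results
    # before the timeout.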
    end = time.time() + timeout
    while not TKO.get_job_test_statuses_from_db(job_id):
        if time.time() >= end:
            AFE.run('abort_host_queue_entries', job=job_id)
            raise TestPushException(
                'Powerwash test on %s timed out after %ds, aborting it.' %
                (hostname, timeout))
        time.sleep(10)
    verify_test_results(job_id,
                        test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
    # Kick off verify; verify will fail and a repair should be triggered.
    AFE.reverify_hosts(hostnames=[hostname])


def reverify_all_push_duts():
    """Reverify all the push DUTs."""
    print 'Reverifying all DUTs.'
    hosts = [h.hostname for h in AFE.get_hosts()]
    AFE.reverify_hosts(hostnames=hosts)


def parse_arguments(argv):
    """Parse arguments for the test_push tool.

    @param argv   Argument vector, as for `sys.argv`, including the
                  command name in `argv[0]`.
    @return: Parsed arguments.

    """
    parser = argparse.ArgumentParser(prog=argv[0])
    parser.add_argument('-b', '--board', dest='board', default='gandof',
                        help='Default is gandof.')
    parser.add_argument('-sb', '--shard_board', dest='shard_board',
                        default='quawks',
                        help='Default is quawks.')
    parser.add_argument('-i', '--build', dest='build', default=None,
                        help='Default is the latest stable build of the given '
                             'board. Must be a stable build, otherwise the AU '
                             'test will fail. '
                             '(ex: gandof-release/R54-8743.25.0)')
    parser.add_argument('-si', '--shard_build', dest='shard_build',
                        default=None,
                        help='Default is the latest stable build of the given '
                             'board. Must be a stable build, otherwise the AU '
                             'test will fail.')
    parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
                        default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
                        help='Time in mins to wait before aborting the jobs '
                             'we are waiting on. Only for the asynchronous '
                             'suites triggered by the create_and_return flag.')
    parser.add_argument('-ud', '--num_duts', dest='num_duts',
                        default=dict(DEFAULT_NUM_DUTS),
                        type=ast.literal_eval,
                        help="Python dict literal that specifies the required"
                        " number of DUTs for each board. E.g. {'gandof': 4}")
    parser.add_argument('-c', '--continue_on_failure', action='store_true',
                        dest='continue_on_failure',
                        help='All tests continue to run when there is a '
                             'failure.')
    parser.add_argument('-sl', '--service_respawn_limit', type=int,
                        default=DEFAULT_SERVICE_RESPAWN_LIMIT,
                        help='If a service is respawned more than this many '
                             'times, the test push is considered failed.')

    arguments = parser.parse_args(argv[1:])

    # Get the latest stable build as the default build.
    version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
    if not arguments.build:
        arguments.build = version_map.get_image_name(arguments.board)
    if not arguments.shard_build:
        arguments.shard_build = version_map.get_image_name(
            arguments.shard_board)
    return arguments


def do_run_suite(suite_name, arguments, use_shard=False,
                 create_and_return=False):
    """Call run_suite to run a suite job, and return the suite job id.

    The script waits for the suite job to finish before returning the suite
    job id. It also echoes the run_suite output to stdout.

    @param suite_name: Name of a suite, e.g., dummy.
    @param arguments: Arguments for the run_suite command.
    @param use_shard: If True, the suite is scheduled for the shard board.
    @param create_and_return: If True, run_suite just creates the suite, prints
                              the job id, then finishes immediately.

    @return: Suite job ID.

    """
    if use_shard:
        board = arguments.shard_board
        build = arguments.shard_build
    else:
        board = arguments.board
        build = arguments.build

    # Remove the cros-version label to force provision.
    hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
                          locked=False)
    for host in hosts:
        labels_to_remove = [
                l for l in host.labels
                if l.startswith(provision.CROS_VERSION_PREFIX)]
        if labels_to_remove:
            AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)

        # Test the repair workflow on shards; the powerwash test will time out
        # after 7 minutes.
        if use_shard and not create_and_return:
            powerwash_dut_to_test_repair(host.hostname, timeout=420)

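    # Build the run_suite command line; the -c flag asks run_suite to create
    # the suite and return without waiting for it to finish.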
    current_dir = os.path.dirname(os.path.realpath(__file__))
    cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
           '-s', suite_name,
           '-b', board,
           '-i', build,
           '-p', arguments.pool,
           '--minimum_duts', str(arguments.num_duts[board])]
    if create_and_return:
        cmd += ['-c']

    suite_job_id = None

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

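    # Stream run_suite output, echoing each line and capturing the suite job
    # ID from the 'Created suite job' line.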
    while True:
        line = proc.stdout.readline()

        # Break when the run_suite process has completed.
        if not line and proc.poll() is not None:
            break
        print line.rstrip()
        _run_suite_output.append(line.rstrip())

        if not suite_job_id:
            m = re.match(SUITE_JOB_START_INFO_REGEX, line)
            if m and m.group(1):
                suite_job_id = int(m.group(1))
                _all_suite_ids.append(suite_job_id)

    if not suite_job_id:
        raise TestPushException('Failed to retrieve suite job ID.')

    # If create_and_return is specified, run_suite returns as soon as the
    # suite is created, so poll the AFE here until the suite job finishes or
    # the timeout is hit.
    if create_and_return:
        end = time.time() + arguments.timeout_min * 60
        while not AFE.get_jobs(id=suite_job_id, finished=True):
            if time.time() < end:
                time.sleep(10)
            else:
                AFE.run('abort_host_queue_entries', job=suite_job_id)
                raise TestPushException(
                        'Asynchronous suite triggered by the '
                        'create_and_return flag has timed out after %d mins. '
                        'Aborting it.' % arguments.timeout_min)

    print 'Suite job %s is completed.' % suite_job_id
    return suite_job_id


def check_dut_image(build, suite_job_id):
    """Confirm all DUTs used for the suite are imaged to the expected build.

    @param build: Expected build to be imaged.
    @param suite_job_id: Job ID of the suite job.
    @raise TestPushException: If a DUT does not have the expected build imaged.
    """
    print 'Checking image installed in DUTs...'
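    # Collect the hostname of every DUT that ran a child job of the suite.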
    job_ids = [job.id for job in
               models.Job.objects.filter(parent_job_id=suite_job_id)]
    hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
            for job_id in job_ids]
    hostnames = set([hqe.host.hostname for hqe in hqes])
    for hostname in hostnames:
        found_build = site_utils.get_build_from_afe(hostname, AFE)
        if found_build != build:
            raise TestPushException('DUT is not imaged properly. Host %s has '
                                    'build %s, while build %s is expected.' %
                                    (hostname, found_build, build))


def test_suite(suite_name, expected_results, arguments, use_shard=False,
               create_and_return=False):
    """Call run_suite to start a suite job and verify results.

    @param suite_name: Name of a suite, e.g., dummy.
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for the run_suite command.
    @param use_shard: If True, the suite is scheduled for the shard board.
    @param create_and_return: If True, run_suite just creates the suite, prints
                              the job id, then finishes immediately.
    """
    suite_job_id = do_run_suite(suite_name, arguments, use_shard,
                                create_and_return)

    # Confirm all DUTs used for the suite are imaged to the expected build.
    # hqe.host_id for jobs running on a shard is not synced back to the master
    # db; therefore, skip verifying the DUT build for jobs running on a shard.
    build_expected = arguments.build
    if not use_shard:
        check_dut_image(build_expected, suite_job_id)

    # Verify that the test results match the expected results.
    verify_test_results(suite_job_id, expected_results)


def verify_test_results(job_id, expected_results):
    """Verify the test results against the expected results.

    @param job_id: ID of the running job. For a suite job, it is the
                   suite_job_id.
    @param expected_results: A dictionary of test name to test result.
    @raise TestPushException: If verification fails.
    """
    print 'Comparing test results...'
    test_views = site_utils.get_test_views_from_tko(job_id, TKO)
    summary = test_push_common.summarize_push(test_views, expected_results,
                                              _IGNORED_TESTS)

    # Test that the link to the log can be loaded.
    job_name = '%s-%s' % (job_id, getpass.getuser())
    log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    try:
        urllib2.urlopen(log_link).read()
    except urllib2.URLError:
        summary.append('Failed to load page for link to log: %s.' % log_link)

    if summary:
        raise TestPushException('\n'.join(summary))

def test_suite_wrapper(queue, suite_name, expected_results, arguments,
                       use_shard=False, create_and_return=False):
    """Wrapper to call test_suite. Handles exceptions and pipes them to the
    parent process.

    @param queue: Queue to save exceptions to, to be accessed by the parent
                  process.
    @param suite_name: Name of a suite, e.g., dummy.
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for the run_suite command.
    @param use_shard: If True, the suite is scheduled for the shard board.
    @param create_and_return: If True, run_suite just creates the suite, prints
                              the job id, then finishes immediately.
    """
    try:
        test_suite(suite_name, expected_results, arguments, use_shard,
                   create_and_return)
    except Exception:
        # Storing the whole exc_info leads to a PicklingError.
        except_type, except_value, tb = sys.exc_info()
        queue.put((except_type, except_value, traceback.extract_tb(tb)))


def check_queue(queue):
    """Check the queue for any exception being raised.

    @param queue: Queue used to store exceptions for the parent process to
                  access.
    @raise: Any exception found in the queue.
    """
    if queue.empty():
        return
    exc_info = queue.get()
    # Print the original traceback, then re-raise the exception.
    print 'Original stack trace of the exception:\n%s' % exc_info[2]
    raise exc_info[0](exc_info[1])


def _run_test_suites(arguments):
    """Run the actual tests that comprise the test_push."""
    # Using the daemon flag will kill child processes when the parent process
    # fails.
    use_daemon = not arguments.continue_on_failure
    queue = multiprocessing.Queue()

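    # Run the push_to_prod suite in a child process so that it can run in
    # parallel with the asynchronous dummy suite started below.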
    push_to_prod_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, PUSH_TO_PROD_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS, arguments))
    push_to_prod_suite.daemon = use_daemon
    push_to_prod_suite.start()

    # Suite test with the --create_and_return flag.
    asynchronous_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, DUMMY_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
                  arguments, True, True))
    asynchronous_suite.daemon = True
    asynchronous_suite.start()

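    # Poll both child processes until they finish, surfacing any exception
    # they reported through the queue.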
    while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
        check_queue(queue)
        time.sleep(5)
    check_queue(queue)
    push_to_prod_suite.join()
    asynchronous_suite.join()


def check_service_crash(respawn_limit, start_time):
  """Check whether the scheduler or host_scheduler crashed during testing.

  Since the testing push is kicked off at the beginning of a given hour, the
  way to check whether a service crashed is to check whether the number of
  times the service was respawned during the testing push exceeds the
  respawn_limit.

  @param respawn_limit: The maximum number of times the service is allowed to
                        be respawned.
  @param start_time: The time that the testing push was kicked off.
  """
  def _parse(filename_prefix, filename):
    """Helper method to parse the time of the log.

    @param filename_prefix: The prefix of the filename.
    @param filename: The name of the log file.
    """
    return datetime.datetime.strptime(filename[len(filename_prefix):],
                                      "%Y-%m-%d-%H.%M.%S")

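  # Each (re)spawn of a service creates a new timestamped log file, so the
  # respawn count is taken to be the number of such log files created between
  # start_time and now.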
  services = ['scheduler', 'host_scheduler']
  logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
  curr_time = datetime.datetime.now()

  error_msg = ''
  for service in services:
    log_prefix = '%s.log.' % service
    respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
                        and start_time <= _parse(log_prefix, l) <= curr_time)

    if respawn_count > respawn_limit:
      error_msg += ('%s has been respawned %s times during the testing push '
                    'at %s. It has very likely crashed. Please check!\n' %
                    (service, respawn_count,
                     start_time.strftime("%Y-%m-%d-%H")))
  if error_msg:
    raise TestPushException(error_msg)


_SUCCESS_MSG = """
All staging tests completed successfully.

Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""


def _main(arguments):
    """Run tests and promote repo branches if the tests succeed.

    @param arguments: command line arguments.
    """

    # TODO: Use chromite.lib.parallel.Manager instead, to work around the
    # too-long-tmp-path problem.
    mpmanager = multiprocessing.Manager()
    # These are globals used by other functions in this module to communicate
    # back from worker processes.
    global _run_suite_output
    _run_suite_output = mpmanager.list()
    global _all_suite_ids
    _all_suite_ids = mpmanager.list()

    try:
        start_time = datetime.datetime.now()
        reverify_all_push_duts()
        time.sleep(15) # Wait for the verify tests to start.
        check_dut_inventory(arguments.num_duts, arguments.pool)
        _run_test_suites(arguments)
        check_service_crash(arguments.service_respawn_limit, start_time)
        print _SUCCESS_MSG
    except Exception:
        # Abort running jobs unless flagged to continue when there is a
        # failure.
        if not arguments.continue_on_failure:
            for suite_id in _all_suite_ids:
                if AFE.get_jobs(id=suite_id, finished=False):
                    AFE.run('abort_host_queue_entries', job=suite_id)
        raise


def main():
    """Entry point."""
    arguments = parse_arguments(sys.argv)
    _main(arguments)


if __name__ == '__main__':
    main()