1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import datetime
6import difflib
7import hashlib
8import logging
9import operator
10import os
11import re
12import traceback
13import sys
14
15import common
16
17from autotest_lib.frontend.afe.json_rpc import proxy
18from autotest_lib.client.common_lib import control_data
19from autotest_lib.client.common_lib import enum
20from autotest_lib.client.common_lib import error
21from autotest_lib.client.common_lib import global_config
22from autotest_lib.client.common_lib import priorities
23from autotest_lib.client.common_lib import site_utils
24from autotest_lib.client.common_lib import time_utils
25from autotest_lib.client.common_lib import utils
26from autotest_lib.frontend.afe.json_rpc import proxy
27from autotest_lib.server.cros import provision
28from autotest_lib.server.cros.dynamic_suite import constants
29from autotest_lib.server.cros.dynamic_suite import control_file_getter
30from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
31from autotest_lib.server.cros.dynamic_suite import job_status
32from autotest_lib.server.cros.dynamic_suite import reporting
33from autotest_lib.server.cros.dynamic_suite import reporting_utils
34from autotest_lib.server.cros.dynamic_suite import tools
35from autotest_lib.server.cros.dynamic_suite.job_status import Status
36
# chromite is imported on a best-effort basis. When the import succeeds, its
# cros_logging module is bound to the name `logging`, replacing the standard
# library `logging` module imported near the top of this file.
try:
    from chromite.lib import boolparse_lib
    from chromite.lib import cros_logging as logging
except ImportError:
    # Not fatal: a hint is printed and the import of this module carries on.
    # `boolparse_lib` stays undefined in that case, so the predicate built by
    # Suite.matches_attribute_expression_predicate raises NameError when it is
    # called.
    print 'Unable to import chromite.'
    print 'This script must be either:'
    print '  - Be run in the chroot.'
    print '  - (not yet supported) be run after running '
    print '    ../utils/build_externals.py'
46
# Names of the suites singled out for bug filing.
# NOTE(review): not referenced in the visible part of this file; presumably
# consulted when deciding whether failures should be filed as bugs - confirm.
_FILE_BUG_SUITES = ['au', 'bvt', 'bvt-cq', 'bvt-inline', 'paygen_au_beta',
                    'paygen_au_canary', 'paygen_au_dev', 'paygen_au_stable',
                    'sanity', 'push_to_prod']
# Looked up once, at import time, from the global config value
# 'SCHEDULER' / 'drone_installation_directory'. Suite.create_from_predicates
# uses it as the directory to search for control files when run_prod_code is
# set.
_AUTOTEST_DIR = global_config.global_config.get_config_value(
        'SCHEDULER', 'drone_installation_directory')
52
53
class RetryHandler(object):
    """Bookkeeping for job retries within a suite.

    @var _retry_map: Retry history, keyed by afe job id:
            {job_id: {'state': RetryHandler.States, 'retry_max': int}}
            - state: where the job stands with respect to retrying.
                NOT_ATTEMPTED:
                    Nothing has been done about the job yet.
                ATTEMPTED:
                    We tried to schedule a retry job; the scheduling itself
                    may have failed (e.g. because of an rpc error). Failing
                    to schedule a retry is not the same thing as a retry job
                    failing. Scheduling a retry is attempted only once per
                    job. For example, with JOB_RETRIES=5, if the second
                    retry job fails and creating the third retry job hits an
                    rpc error, all further retries are given up.
                RETRIED:
                    A retry job was scheduled successfully.
            - retry_max: how many more times the job may be retried, with
                the retries that already happened taken into account.
    @var _retry_level: A retry may only be triggered for a result that is
            worse than this level.
    @var _max_retries: Suite-wide retry budget. No matter how often an
            individual test has been retried, the total number of retries
            in the suite cannot exceed it.
    """

    States = enum.Enum('NOT_ATTEMPTED', 'ATTEMPTED', 'RETRIED',
                       start_value=1, step=1)

    def __init__(self, initial_jobs_to_tests, retry_level='WARN',
                 max_retries=None):
        """Set up the retry map from the originally scheduled jobs.

        Only jobs whose test has job_retries > 0 get an entry.

        @param initial_jobs_to_tests: A dictionary mapping a job id to a
                ControlData object, for the jobs originally scheduled by
                the suite.
        @param retry_level: A retry may only be triggered for a result that
                is worse than this level.
        @param max_retries: Integer, maximum total number of retries allowed
                for the suite. None (the default) means no limit.
        """
        self._retry_map = {}
        self._retry_level = retry_level
        if max_retries is None:
            # "No limit" is modelled as the largest plain int.
            max_retries = sys.maxint
        self._max_retries = max_retries
        for afe_job_id, control_data in initial_jobs_to_tests.items():
            allowed = control_data.job_retries
            if allowed > 0:
                self.add_job(new_job_id=afe_job_id, retry_max=allowed)


    def add_job(self, new_job_id, retry_max):
        """Register a newly created job in the retry map.

        The job starts out in state NOT_ATTEMPTED.

        @param new_job_id: The afe_job_id of the newly created job.
        @param retry_max: How many times the test may be retried if the
                          job fails.

        @raises ValueError if new_job_id already has a retry map entry.

        """
        if new_job_id in self._retry_map:
            raise ValueError('add_job called when job is already in retry map.')

        record = dict(state=self.States.NOT_ATTEMPTED, retry_max=retry_max)
        self._retry_map[new_job_id] = record


    def suite_max_reached(self):
        """Tell whether the suite-wide retry budget is used up."""
        remaining = self._max_retries
        return remaining <= 0


    def should_retry(self, result):
        """Decide whether the job behind |result| should be retried.

        A retry is wanted only when every one of these holds:
        a) The suite-wide retry budget is not exhausted.
        b) The test was actually executed; a job that was aborted before
           it ever reached 'Running' is not retried.
        c) The result is worse than |self._retry_level| (default 'WARN').
        d) The job has an entry in the retry map, i.e. the test asks for
           retries.
        e) No retry has been attempted for the job yet, i.e. its state is
           NOT_ATTEMPTED. If a test has JOB_RETRIES=5 and scheduling one of
           its retries hit an rpc error, all further retries are given up.
        f) The job still has retries left, i.e. retry_max > 0.

        @param result: A result, encapsulating the status of the job.

        @returns: True if we should retry the job.

        """
        if self.suite_max_reached():
            return False
        if not result.test_executed:
            return False
        threshold = job_status.Status(self._retry_level, '', 'reason')
        if not result.is_worse_than(threshold):
            return False

        record = self._retry_map.get(result.id)
        if record is None:
            return False
        return (record['state'] == self.States.NOT_ATTEMPTED and
                record['retry_max'] > 0)


    def add_retry(self, old_job_id, new_job_id):
        """Record that |new_job_id| was scheduled as a retry of |old_job_id|.

        Marks the old job RETRIED, registers the new job with one retry
        fewer than the old job had, and charges one retry to the suite-wide
        budget.

        @param old_job_id: The afe_job_id of the job that is retried.
        @param new_job_id: The afe_job_id of the retry job.

        @raises KeyError if old_job_id isn't in the retry map.
        @raises ValueError if we have already retried or made an attempt
                to retry the old job.

        """
        retried = self._retry_map[old_job_id]
        if retried['state'] != self.States.NOT_ATTEMPTED:
            message = ('We have already retried or attempted to retry job %d' %
                       old_job_id)
            raise ValueError(message)
        retried['state'] = self.States.RETRIED
        remaining_for_new_job = retried['retry_max'] - 1
        self.add_job(new_job_id=new_job_id, retry_max=remaining_for_new_job)
        self._max_retries -= 1


    def set_attempted(self, job_id):
        """Move a job from NOT_ATTEMPTED to ATTEMPTED.

        @param job_id: afe_job_id of a job.

        @raises KeyError if job_id isn't in the retry map.
        @raises ValueError if the current state is not NOT_ATTEMPTED.

        """
        record = self._retry_map[job_id]
        state_now = record['state']
        if state_now != self.States.NOT_ATTEMPTED:
            # Each job is supposed to be retried, or attempted to be
            # retried, only once; anything else is a caller error.
            raise ValueError('Unexpected state transition: %s -> %s' %
                             (self.States.get_string(state_now),
                              self.States.get_string(self.States.ATTEMPTED)))
        record['state'] = self.States.ATTEMPTED


    def has_following_retry(self, result):
        """Tell whether a retry follows the job behind |result|.

        For the job id result.id the cases are:
        - test not executed -> no following retry
        - no retry map entry -> retry not required, no following retry
        - retry map entry present:
            - already retried -> has a following retry
            - not retried yet -> whatever should_retry(result) says:
                - retry_max == 0 -> this was the last retry job
                - retry_max > 0
                    - attempted, but scheduling the retry failed due to an
                      rpc error -> no more retry
                    - not attempted -> a retry follows if the test failed

        @param result: A result, encapsulating the status of the job.

        @returns: True, if there will be a following retry.
                  False otherwise.

        """
        executed = result.test_executed
        if not executed:
            # Hand back the falsy value itself, as an `and` chain would.
            return executed
        record = self._retry_map.get(result.id)
        if record is None:
            return False
        return (record['state'] == self.States.RETRIED or
                self.should_retry(result))


    def get_retry_max(self, job_id):
        """Look up how many more times a job may be retried.

        @param job_id: afe_job_id of a job.

        @returns: An int, the number of times the job can still be retried.
        @raises KeyError if job_id isn't in the retry map.

        """
        record = self._retry_map[job_id]
        return record['retry_max']
252
253
254class Suite(object):
255    """
256    A suite of tests, defined by some predicate over control file variables.
257
    Given a place to search for control files and a predicate to match the
    desired tests, can gather tests and fire off jobs to run them, and then
    wait for results.
261
262    @var _predicate: a function that should return True when run over a
263         ControlData representation of a control file that should be in
264         this Suite.
265    @var _tag: a string with which to tag jobs run in this suite.
266    @var _builds: the builds on which we're running this suite.
267    @var _afe: an instance of AFE as defined in server/frontend.py.
268    @var _tko: an instance of TKO as defined in server/frontend.py.
269    @var _jobs: currently scheduled jobs, if any.
    @var _jobs_to_tests: a dictionary that maps job ids to tests represented
                         as ControlData objects.
272    @var _cf_getter: a control_file_getter.ControlFileGetter
273    @var _retry: a bool value indicating whether jobs should be retried on
274                 failure.
275    @var _retry_handler: a RetryHandler object.
276
277    """
278
279
280    @staticmethod
281    def create_ds_getter(build, devserver):
282        """
283        @param build: the build on which we're running this suite.
284        @param devserver: the devserver which contains the build.
285        @return a FileSystemGetter instance that looks under |autotest_dir|.
286        """
287        return control_file_getter.DevServerGetter(build, devserver)
288
289
290    @staticmethod
291    def create_fs_getter(autotest_dir):
292        """
293        @param autotest_dir: the place to find autotests.
294        @return a FileSystemGetter instance that looks under |autotest_dir|.
295        """
296        # currently hard-coded places to look for tests.
297        subpaths = ['server/site_tests', 'client/site_tests',
298                    'server/tests', 'client/tests']
299        directories = [os.path.join(autotest_dir, p) for p in subpaths]
300        return control_file_getter.FileSystemGetter(directories)
301
302
303    @staticmethod
304    def parse_tag(tag):
305        """Splits a string on ',' optionally surrounded by whitespace.
306        @param tag: string to split.
307        """
308        return map(lambda x: x.strip(), tag.split(','))
309
310
311    @staticmethod
312    def name_in_tag_predicate(name):
313        """Returns predicate that takes a control file and looks for |name|.
314
315        Builds a predicate that takes in a parsed control file (a ControlData)
316        and returns True if the SUITE tag is present and contains |name|.
317
318        @param name: the suite name to base the predicate on.
319        @return a callable that takes a ControlData and looks for |name| in that
320                ControlData object's suite member.
321        """
322        return lambda t: (hasattr(t, 'suite') and
323                          name in Suite.parse_tag(t.suite))
324
325
326    @staticmethod
327    def name_in_tag_similarity_predicate(name):
328        """Returns predicate that takes a control file and gets the similarity
329        of the suites in the control file and the given name.
330
331        Builds a predicate that takes in a parsed control file (a ControlData)
332        and returns a list of tuples of (suite name, ratio), where suite name
333        is each suite listed in the control file, and ratio is the similarity
334        between each suite and the given name.
335
336        @param name: the suite name to base the predicate on.
337        @return a callable that takes a ControlData and returns a list of tuples
338                of (suite name, ratio), where suite name is each suite listed in
339                the control file, and ratio is the similarity between each suite
340                and the given name.
341        """
342        return lambda t: ((None, 0) if not hasattr(t, 'suite') else
343                          [(suite,
344                            difflib.SequenceMatcher(a=suite, b=name).ratio())
345                           for suite in Suite.parse_tag(t.suite)])
346
347
348    @staticmethod
349    def not_in_blacklist_predicate(blacklist):
350        """Returns predicate that takes a control file and looks for its
351        path to not be in given blacklist.
352
353        @param blacklist: A list of strings both paths on control_files that
354                          should be blacklisted.
355
356        @return a callable that takes a ControlData and looks for it to be
357                absent from blacklist.
358        """
359        return lambda t: hasattr(t, 'path') and \
360                         not any(b.endswith(t.path) for b in blacklist)
361
362
363    @staticmethod
364    def test_name_equals_predicate(test_name):
365        """Returns predicate that matched based on a test's name.
366
367        Builds a predicate that takes in a parsed control file (a ControlData)
368        and returns True if the test name is equal to |test_name|.
369
370        @param test_name: the test name to base the predicate on.
371        @return a callable that takes a ControlData and looks for |test_name|
372                in that ControlData's name.
373        """
374        return lambda t: hasattr(t, 'name') and test_name == t.name
375
376
377    @staticmethod
378    def test_name_matches_pattern_predicate(test_name_pattern):
379        """Returns predicate that matches based on a test's name pattern.
380
381        Builds a predicate that takes in a parsed control file (a ControlData)
382        and returns True if the test name matches the given regular expression.
383
384        @param test_name_pattern: regular expression (string) to match against
385                                  test names.
386        @return a callable that takes a ControlData and returns
387                True if the name fields matches the pattern.
388        """
389        return lambda t: hasattr(t, 'name') and re.match(test_name_pattern,
390                                                         t.name)
391
392
393    @staticmethod
394    def test_file_matches_pattern_predicate(test_file_pattern):
395        """Returns predicate that matches based on a test's file name pattern.
396
397        Builds a predicate that takes in a parsed control file (a ControlData)
398        and returns True if the test's control file name matches the given
399        regular expression.
400
401        @param test_file_pattern: regular expression (string) to match against
402                                  control file names.
403        @return a callable that takes a ControlData and and returns
404                True if control file name matches the pattern.
405        """
406        return lambda t: hasattr(t, 'path') and re.match(test_file_pattern,
407                                                         t.path)
408
409
410    @staticmethod
411    def matches_attribute_expression_predicate(test_attr_boolstr):
412        """Returns predicate that matches based on boolean expression of
413        attributes.
414
415        Builds a predicate that takes in a parsed control file (a ControlData)
416        ans returns True if the test attributes satisfy the given attribute
417        boolean expression.
418
419        @param test_attr_boolstr: boolean expression of the attributes to be
420                                  test, like 'system:all and interval:daily'.
421
422        @return a callable that takes a ControlData and returns True if the test
423                attributes satisfy the given boolean expression.
424        """
425        return lambda t: boolparse_lib.BoolstrResult(
426            test_attr_boolstr, t.attributes)
427
428    @staticmethod
429    def test_name_similarity_predicate(test_name):
430        """Returns predicate that matched based on a test's name.
431
432        Builds a predicate that takes in a parsed control file (a ControlData)
433        and returns a tuple of (test name, ratio), where ratio is the similarity
434        between the test name and the given test_name.
435
436        @param test_name: the test name to base the predicate on.
437        @return a callable that takes a ControlData and returns a tuple of
438                (test name, ratio), where ratio is the similarity between the
439                test name and the given test_name.
440        """
441        return lambda t: ((None, 0) if not hasattr(t, 'name') else
442                (t.name,
443                 difflib.SequenceMatcher(a=t.name, b=test_name).ratio()))
444
445
446    @staticmethod
447    def test_file_similarity_predicate(test_file_pattern):
448        """Returns predicate that gets the similarity based on a test's file
449        name pattern.
450
451        Builds a predicate that takes in a parsed control file (a ControlData)
452        and returns a tuple of (file path, ratio), where ratio is the
453        similarity between the test file name and the given test_file_pattern.
454
455        @param test_file_pattern: regular expression (string) to match against
456                                  control file names.
457        @return a callable that takes a ControlData and and returns a tuple of
458                (file path, ratio), where ratio is the similarity between the
459                test file name and the given test_file_pattern.
460        """
461        return lambda t: ((None, 0) if not hasattr(t, 'path') else
462                (t.path, difflib.SequenceMatcher(a=t.path,
463                                                 b=test_file_pattern).ratio()))
464
465
466    @staticmethod
467    def list_all_suites(build, devserver, cf_getter=None):
468        """
469        Parses all ControlData objects with a SUITE tag and extracts all
470        defined suite names.
471
472        @param build: the build on which we're running this suite.
473        @param devserver: the devserver which contains the build.
474        @param cf_getter: control_file_getter.ControlFileGetter. Defaults to
475                          using DevServerGetter.
476
477        @return list of suites
478        """
479        if cf_getter is None:
480            cf_getter = Suite.create_ds_getter(build, devserver)
481
482        suites = set()
483        predicate = lambda t: hasattr(t, 'suite')
484        for test in Suite.find_and_parse_tests(cf_getter, predicate,
485                                               add_experimental=True):
486            suites.update(Suite.parse_tag(test.suite))
487        return list(suites)
488
489
490    @staticmethod
491    def get_test_source_build(builds, **dargs):
492        """Get the build of test code.
493
494        Get the test source build from arguments. If parameter
495        `test_source_build` is set and has a value, return its value. Otherwise
496        returns the ChromeOS build name if it exists. If ChromeOS build is not
497        specified either, raise SuiteArgumentException.
498
499        @param builds: the builds on which we're running this suite. It's a
500                       dictionary of version_prefix:build.
501        @param **dargs: Any other Suite constructor parameters, as described
502                        in Suite.__init__ docstring.
503
504        @return: The build contains the test code.
505        @raise: SuiteArgumentException if both test_source_build and ChromeOS
506                build are not specified.
507
508        """
509        if dargs.get('test_source_build', None):
510            return dargs['test_source_build']
511        test_source_build = builds.get(provision.CROS_VERSION_PREFIX, None)
512        if not test_source_build:
513            raise error.SuiteArgumentException(
514                    'test_source_build must be specified if CrOS build is not '
515                    'specified.')
516        return test_source_build
517
518
519    @staticmethod
520    def create_from_predicates(predicates, builds, board, devserver,
521                               cf_getter=None, name='ad_hoc_suite',
522                               run_prod_code=False, **dargs):
523        """
524        Create a Suite using a given predicate test filters.
525
526        Uses supplied predicate(s) to instantiate a Suite. Looks for tests in
527        |autotest_dir| and will schedule them using |afe|.  Pulls control files
528        from the default dev server. Results will be pulled from |tko| upon
529        completion.
530
531        @param predicates: A list of callables that accept ControlData
532                           representations of control files. A test will be
533                           included in suite if all callables in this list
534                           return True on the given control file.
535        @param builds: the builds on which we're running this suite. It's a
536                       dictionary of version_prefix:build.
537        @param board: the board on which we're running this suite.
538        @param devserver: the devserver which contains the build.
539        @param cf_getter: control_file_getter.ControlFileGetter. Defaults to
540                          using DevServerGetter.
541        @param name: name of suite. Defaults to 'ad_hoc_suite'
542        @param run_prod_code: If true, the suite will run the tests that
543                              lives in prod aka the test code currently on the
544                              lab servers.
545        @param **dargs: Any other Suite constructor parameters, as described
546                        in Suite.__init__ docstring.
547        @return a Suite instance.
548        """
549        if cf_getter is None:
550            build = Suite.get_test_source_build(builds, **dargs)
551            if run_prod_code:
552                cf_getter = Suite.create_fs_getter(_AUTOTEST_DIR)
553            else:
554                cf_getter = Suite.create_ds_getter(build, devserver)
555
556        return Suite(predicates,
557                     name, builds, board, cf_getter, run_prod_code, **dargs)
558
559
560    @staticmethod
561    def create_from_name(name, builds, board, devserver, cf_getter=None,
562                         **dargs):
563        """
564        Create a Suite using a predicate based on the SUITE control file var.
565
566        Makes a predicate based on |name| and uses it to instantiate a Suite
567        that looks for tests in |autotest_dir| and will schedule them using
568        |afe|.  Pulls control files from the default dev server.
569        Results will be pulled from |tko| upon completion.
570
571        @param name: a value of the SUITE control file variable to search for.
572        @param builds: the builds on which we're running this suite. It's a
573                       dictionary of version_prefix:build.
574        @param board: the board on which we're running this suite.
575        @param devserver: the devserver which contains the build.
576        @param cf_getter: control_file_getter.ControlFileGetter. Defaults to
577                          using DevServerGetter.
578        @param **dargs: Any other Suite constructor parameters, as described
579                        in Suite.__init__ docstring.
580        @return a Suite instance.
581        """
582        if cf_getter is None:
583            build = Suite.get_test_source_build(builds, **dargs)
584            cf_getter = Suite.create_ds_getter(build, devserver)
585
586        return Suite([Suite.name_in_tag_predicate(name)],
587                     name, builds, board, cf_getter, **dargs)
588
589
590    def __init__(self, predicates, tag, builds, board, cf_getter,
591                 run_prod_code=False, afe=None, tko=None, pool=None,
592                 results_dir=None, max_runtime_mins=24*60, timeout_mins=24*60,
593                 file_bugs=False, file_experimental_bugs=False,
594                 suite_job_id=None, ignore_deps=False, extra_deps=[],
595                 priority=priorities.Priority.DEFAULT, forgiving_parser=True,
596                 wait_for_results=True, job_retry=False,
597                 max_retries=sys.maxint, offload_failures_only=False,
598                 test_source_build=None):
599        """
600        Constructor
601
602        @param predicates: A list of callables that accept ControlData
603                           representations of control files. A test will be
604                           included in suite is all callables in this list
605                           return True on the given control file.
606        @param tag: a string with which to tag jobs run in this suite.
607        @param builds: the builds on which we're running this suite.
608        @param board: the board on which we're running this suite.
609        @param cf_getter: a control_file_getter.ControlFileGetter
610        @param afe: an instance of AFE as defined in server/frontend.py.
611        @param tko: an instance of TKO as defined in server/frontend.py.
612        @param pool: Specify the pool of machines to use for scheduling
613                purposes.
614        @param run_prod_code: If true, the suite will run the test code that
615                              lives in prod aka the test code currently on the
616                              lab servers.
617        @param results_dir: The directory where the job can write results to.
618                            This must be set if you want job_id of sub-jobs
619                            list in the job keyvals.
620        @param max_runtime_mins: Maximum suite runtime, in minutes.
621        @param timeout: Maximum job lifetime, in hours.
622        @param suite_job_id: Job id that will act as parent id to all sub jobs.
623                             Default: None
624        @param ignore_deps: True if jobs should ignore the DEPENDENCIES
625                            attribute and skip applying of dependency labels.
626                            (Default:False)
627        @param extra_deps: A list of strings which are the extra DEPENDENCIES
628                           to add to each test being scheduled.
629        @param priority: Integer priority level.  Higher is more important.
630        @param wait_for_results: Set to False to run the suite job without
631                                 waiting for test jobs to finish. Default is
632                                 True.
633        @param job_retry: A bool value indicating whether jobs should be retired
634                          on failure. If True, the field 'JOB_RETRIES' in
635                          control files will be respected. If False, do not
636                          retry.
637        @param max_retries: Maximum retry limit at suite level.
638                            Regardless how many times each individual test
639                            has been retried, the total number of retries
640                            happening in the suite can't exceed _max_retries.
641                            Default to sys.maxint.
642        @param offload_failures_only: Only enable gs_offloading for failed
643                                      jobs.
644        @param test_source_build: Build that contains the server-side test code.
645
646        """
647        def combined_predicate(test):
648            #pylint: disable-msg=C0111
649            return all((f(test) for f in predicates))
650        self._predicate = combined_predicate
651
652        self._tag = tag
653        self._builds = builds
654        self._board = board
655        self._cf_getter = cf_getter
656        self._results_dir = results_dir
657        self._afe = afe or frontend_wrappers.RetryingAFE(timeout_min=30,
658                                                         delay_sec=10,
659                                                         debug=False)
660        self._tko = tko or frontend_wrappers.RetryingTKO(timeout_min=30,
661                                                         delay_sec=10,
662                                                         debug=False)
663        self._pool = pool
664        self._jobs = []
665        self._jobs_to_tests = {}
666        self._tests = Suite.find_and_parse_tests(self._cf_getter,
667                        self._predicate, self._tag, add_experimental=True,
668                        forgiving_parser=forgiving_parser,
669                        run_prod_code=run_prod_code)
670
671        self._max_runtime_mins = max_runtime_mins
672        self._timeout_mins = timeout_mins
673        self._file_bugs = file_bugs
674        self._file_experimental_bugs = file_experimental_bugs
675        self._suite_job_id = suite_job_id
676        self._ignore_deps = ignore_deps
677        self._extra_deps = extra_deps
678        self._priority = priority
679        self._job_retry=job_retry
680        self._max_retries = max_retries
681        # RetryHandler to be initialized in schedule()
682        self._retry_handler = None
683        self.wait_for_results = wait_for_results
684        self._offload_failures_only = offload_failures_only
685        self._test_source_build = test_source_build
686
687
    @property
    def tests(self):
        """
        A list of ControlData objects in the suite, with added |text| attr.

        This is a read-only view of |self._tests|, which the constructor
        fills in from Suite.find_and_parse_tests().

        @returns: The object stored in |self._tests| (not a copy).
        """
        return self._tests
694
695
696    def stable_tests(self):
697        """
698        |self.tests|, filtered for non-experimental tests.
699        """
700        return filter(lambda t: not t.experimental, self.tests)
701
702
703    def unstable_tests(self):
704        """
705        |self.tests|, filtered for experimental tests.
706        """
707        return filter(lambda t: t.experimental, self.tests)
708
709
710    def _create_job(self, test, retry_for=None):
711        """
712        Thin wrapper around frontend.AFE.create_job().
713
714        @param test: ControlData object for a test to run.
715        @param retry_for: If the to-be-created job is a retry for an
716                          old job, the afe_job_id of the old job will
717                          be passed in as |retry_for|, which will be
718                          recorded in the new job's keyvals.
719        @returns: A frontend.Job object with an added test_name member.
720                  test_name is used to preserve the higher level TEST_NAME
721                  name of the job.
722        """
723        if self._ignore_deps:
724            job_deps = []
725        else:
726            job_deps = list(test.dependencies)
727        if self._extra_deps:
728            job_deps.extend(self._extra_deps)
729        if self._pool:
730            job_deps.append(self._pool)
731
732        # TODO(beeps): Comletely remove the concept of a metahost.
733        # Currently we use this to distinguis a job scheduled through
734        # the afe from a suite job, as only the latter will get requeued
735        # when a special task fails.
736        job_deps.append(self._board)
737        # JOB_BUILD_KEY is default to use CrOS image, if it's not available,
738        # take the first build in the builds dictionary.
739        # test_source_build is saved to job_keyvals so scheduler can retrieve
740        # the build name from database when compiling autoserv commandline.
741        # This avoid a database change to add a new field in afe_jobs.
742        build = self._builds.get(provision.CROS_VERSION_PREFIX,
743                                 self._builds.values()[0])
744        keyvals={constants.JOB_BUILD_KEY: build,
745                 constants.JOB_SUITE_KEY: self._tag,
746                 constants.JOB_EXPERIMENTAL_KEY: test.experimental,
747                 constants.JOB_BUILDS_KEY: self._builds}
748        # Only add `test_source_build` to job keyvals if the build is different
749        # from the CrOS build or the job uses more than one build, e.g., both
750        # firmware and CrOS will be updated in the dut.
751        # This is for backwards compatibility, so the update Autotest code can
752        # compile an autoserv command line to run in a SSP container using
753        # previous builds.
754        if (self._test_source_build and
755            (build != self._test_source_build or len(self._builds) > 1)):
756            keyvals[constants.JOB_TEST_SOURCE_BUILD_KEY] = (
757                    self._test_source_build)
758            for prefix, build in self._builds.iteritems():
759                if prefix == provision.FW_RW_VERSION_PREFIX:
760                    keyvals[constants.FWRW_BUILD]= build
761                elif prefix == provision.FW_RO_VERSION_PREFIX:
762                    keyvals[constants.FWRO_BUILD] = build
763        # Add suite job id to keyvals so tko parser can read it from keyval file
764        if self._suite_job_id:
765            keyvals[constants.PARENT_JOB_ID] = self._suite_job_id
766        if retry_for:
767            # We drop the old job's id in the new job's keyval file
768            # so that later our tko parser can figure out the retring
769            # relationship and invalidate the results of the old job
770            # in tko database.
771            keyvals[constants.RETRY_ORIGINAL_JOB_ID] = retry_for
772        if self._offload_failures_only:
773            keyvals[constants.JOB_OFFLOAD_FAILURES_KEY] = True
774
775        test_obj = self._afe.create_job(
776            control_file=test.text,
777            name=tools.create_job_name(self._test_source_build or build,
778                                       self._tag, test.name),
779            control_type=test.test_type.capitalize(),
780            meta_hosts=[self._board]*test.sync_count,
781            dependencies=job_deps,
782            keyvals=keyvals,
783            max_runtime_mins=self._max_runtime_mins,
784            timeout_mins=self._timeout_mins,
785            parent_job_id=self._suite_job_id,
786            test_retry=test.retries,
787            priority=self._priority,
788            synch_count=test.sync_count,
789            require_ssp=test.require_ssp)
790
791        setattr(test_obj, 'test_name', test.name)
792
793        return test_obj
794
795
796    def _schedule_test(self, record, test, retry_for=None, ignore_errors=False):
797        """Schedule a single test and return the job.
798
799        Schedule a single test by creating a job.
800        And then update relevant data structures that are used to
801        keep track of all running jobs.
802
803        Emit TEST_NA if it failed to schedule the test due to
804        NoEligibleHostException or a non-existent board label.
805
806        @param record: A callable to use for logging.
807                       prototype: record(base_job.status_log_entry)
808        @param test: ControlData for a test to run.
809        @param retry_for: If we are scheduling a test to retry an
810                          old job, the afe_job_id of the old job
811                          will be passed in as |retry_for|.
812        @param ignore_errors: If True, when an rpc error occur, ignore
813                             the error and will return None.
814                             If False, rpc errors will be raised.
815
816        @returns: A frontend.Job object if the test is successfully scheduled.
817                  Returns None if scheduling failed due to
818                  NoEligibleHostException or a non-existent board label.
819                  Returns None if it encounters other rpc errors we don't know
820                  how to handle and ignore_errors is False.
821
822        """
823        msg = 'Scheduling %s' % test.name
824        if retry_for:
825            msg = msg + ', to retry afe job %d' % retry_for
826        logging.debug(msg)
827        begin_time_str = datetime.datetime.now().strftime(time_utils.TIME_FMT)
828        try:
829            job = self._create_job(test, retry_for=retry_for)
830        except error.NoEligibleHostException:
831            logging.debug('%s not applicable for this board/pool. '
832                          'Emitting TEST_NA.', test.name)
833            Status('TEST_NA', test.name, 'Unsatisfiable DEPENDENCIES',
834                   begin_time_str=begin_time_str).record_all(record)
835        except proxy.ValidationError as e:
836            # The goal here is to treat a dependency on a
837            # non-existent board label the same as a
838            # dependency on a board that exists, but for which
839            # there's no hardware.
840            #
841            # As of this writing, the particular case we
842            # want looks like this:
843            #  1) e.problem_keys is a dictionary
844            #  2) e.problem_keys['meta_hosts'] exists as
845            #     the only key in the dictionary.
846            #  3) e.problem_keys['meta_hosts'] matches this
847            #     pattern: "Label "board:.*" not found"
848            #
849            # We check for conditions 1) and 2) on the
850            # theory that they're relatively immutable.
851            # We don't check condition 3) because it seems
852            # likely to be a maintenance burden, and for the
853            # times when we're wrong, being right shouldn't
854            # matter enough (we _hope_).
855            #
856            # If we don't recognize the error, we pass
857            # the buck to the outer try in this function,
858            # which immediately fails the suite.
859            if (not isinstance(e.problem_keys, dict) or
860                    len(e.problem_keys) != 1 or
861                    'meta_hosts' not in e.problem_keys):
862                raise e
863            logging.debug('Validation error: %s', str(e))
864            logging.debug('Assuming label not found')
865            Status('TEST_NA', test.name, e.problem_keys.values()[0],
866                   begin_time_str=begin_time_str).record_all(record)
867        except (error.RPCException, proxy.JSONRPCException) as e:
868            if retry_for:
869                # Mark that we've attempted to retry the old job.
870                self._retry_handler.set_attempted(job_id=retry_for)
871            if ignore_errors:
872                logging.error('Failed to schedule test: %s, Reason: %s',
873                              test.name, e)
874            else:
875                raise e
876        else:
877            self._jobs.append(job)
878            self._jobs_to_tests[job.id] = test
879            if retry_for:
880                # A retry job was just created, record it.
881                self._retry_handler.add_retry(
882                        old_job_id=retry_for, new_job_id=job.id)
883                retry_count = (test.job_retries -
884                               self._retry_handler.get_retry_max(job.id))
885                logging.debug('Job %d created to retry job %d. '
886                              'Have retried for %d time(s)',
887                              job.id, retry_for, retry_count)
888            if self._results_dir:
889                self._remember_provided_job_id(job)
890            return job
891        return None
892
893
894    def schedule(self, record, add_experimental=True):
895        #pylint: disable-msg=C0111
896        """
897        Schedule jobs using |self._afe|.
898
899        frontend.Job objects representing each scheduled job will be put in
900        |self._jobs|.
901
902        @param record: A callable to use for logging.
903                       prototype: record(base_job.status_log_entry)
904        @param add_experimental: schedule experimental tests as well, or not.
905        @returns: The number of tests that were scheduled.
906        """
907        logging.debug('Discovered %d stable tests.', len(self.stable_tests()))
908        logging.debug('Discovered %d unstable tests.',
909                      len(self.unstable_tests()))
910        n_scheduled = 0
911
912        Status('INFO', 'Start %s' % self._tag).record_result(record)
913        try:
914            tests = self.stable_tests()
915            if add_experimental:
916                for test in self.unstable_tests():
917                    if not test.name.startswith(constants.EXPERIMENTAL_PREFIX):
918                        test.name = constants.EXPERIMENTAL_PREFIX + test.name
919                    tests.append(test)
920
921            for test in tests:
922                if self._schedule_test(record, test):
923                    n_scheduled += 1
924        except Exception:  # pylint: disable=W0703
925            logging.error(traceback.format_exc())
926            Status('FAIL', self._tag,
927                   'Exception while scheduling suite').record_result(record)
928
929        if self._job_retry:
930            self._retry_handler = RetryHandler(
931                    initial_jobs_to_tests=self._jobs_to_tests,
932                    max_retries=self._max_retries)
933        return n_scheduled
934
935
936    def should_report(self, result):
937        """
938        Returns True if this failure requires to be reported.
939
940        @param result: A result, encapsulating the status of the failed job.
941        @return: True if we should report this failure.
942        """
943        if self._job_retry and self._retry_handler.has_following_retry(result):
944            return False
945
946        is_not_experimental = (
947            constants.EXPERIMENTAL_PREFIX not in result._test_name and
948            constants.EXPERIMENTAL_PREFIX not in result._job_name)
949
950        return (self._file_bugs and result.test_executed and
951                (is_not_experimental or self._file_experimental_bugs) and
952                not result.is_testna() and
953                result.is_worse_than(job_status.Status('GOOD', '', 'reason')))
954
955
956    def wait(self, record, bug_template={}):
957        """
958        Polls for the job statuses, using |record| to print status when each
959        completes.
960
961        @param record: callable that records job status.
962                 prototype:
963                   record(base_job.status_log_entry)
964        @param bug_template: A template dictionary specifying the default bug
965                             filing options for failures in this suite.
966        """
967        if self._file_bugs:
968            bug_reporter = reporting.Reporter()
969        try:
970            if self._suite_job_id:
971                results_generator = job_status.wait_for_child_results(
972                        self._afe, self._tko, self._suite_job_id)
973            else:
974                logging.warning('Unknown suite_job_id, falling back to less '
975                             'efficient results_generator.')
976                results_generator = job_status.wait_for_results(self._afe,
977                                                                self._tko,
978                                                                self._jobs)
979            template = reporting_utils.BugTemplate(bug_template)
980            for result in results_generator:
981                result.record_all(record)
982                if (self._results_dir and
983                    job_status.is_for_infrastructure_fail(result)):
984                    self._remember_provided_job_id(result)
985                elif (self._results_dir and isinstance(result, Status)):
986                    self._remember_test_status_job_id(result)
987
988                if self._job_retry and self._retry_handler.should_retry(result):
989                    new_job = self._schedule_test(
990                            record=record, test=self._jobs_to_tests[result.id],
991                            retry_for=result.id, ignore_errors=True)
992                    if new_job:
993                        results_generator.send([new_job])
994
995                # TODO (fdeng): If the suite times out before a retry could
996                # finish, we would lose the chance to file a bug for the
997                # original job.
998                if self.should_report(result):
999                    job_views = self._tko.run('get_detailed_test_views',
1000                                              afe_job_id=result.id)
1001                    # Use the CrOS build for bug filing. If CrOS build is not
1002                    # specified, use the first build in the builds dictionary.
1003                    build = self._builds.get(provision.CROS_VERSION_PREFIX,
1004                                             self._builds.values()[0])
1005                    failure = reporting.TestBug(build,
1006                            site_utils.get_chrome_version(job_views),
1007                            self._tag,
1008                            result)
1009
1010                    # Try to merge with bug template in test control file.
1011                    try:
1012                        test_data = self._jobs_to_tests[result.id]
1013                        merged_template = template.finalize_bug_template(
1014                                test_data.bug_template)
1015                    except AttributeError:
1016                        # Test control file does not have bug template defined.
1017                        merged_template = bug_template
1018                    except reporting_utils.InvalidBugTemplateException as e:
1019                        merged_template = {}
1020                        logging.error('Merging bug templates failed with '
1021                                      'error: %s An empty bug template will '
1022                                      'be used.', e)
1023
1024                    # File bug when failure is one of the _FILE_BUG_SUITES,
1025                    # otherwise send an email to the owner anc cc.
1026                    if self._tag in _FILE_BUG_SUITES:
1027                        bug_id, bug_count = bug_reporter.report(failure,
1028                                                                merged_template)
1029
1030                        # We use keyvals to communicate bugs filed with
1031                        # run_suite.
1032                        if bug_id is not None:
1033                            bug_keyvals = tools.create_bug_keyvals(
1034                                    result.id, result.test_name,
1035                                    (bug_id, bug_count))
1036                            try:
1037                                utils.write_keyval(self._results_dir,
1038                                                   bug_keyvals)
1039                            except ValueError:
1040                                logging.error('Unable to log bug keyval for:%s',
1041                                              result.test_name)
1042                    else:
1043                        reporting.send_email(failure, merged_template)
1044
1045        except Exception:  # pylint: disable=W0703
1046            logging.error(traceback.format_exc())
1047            Status('FAIL', self._tag,
1048                   'Exception waiting for results').record_result(record)
1049
1050
1051    def abort(self):
1052        """
1053        Abort all scheduled test jobs.
1054        """
1055        if self._jobs:
1056            job_ids = [job.id for job in self._jobs]
1057            self._afe.run('abort_host_queue_entries', job__id__in=job_ids)
1058
1059
1060    def _remember_provided_job_id(self, job):
1061        """
1062        Record provided job as a suite job keyval, for later referencing.
1063
1064        @param job: some representation of a job, including id, test_name
1065                    and owner
1066        """
1067        if job.id and job.owner and job.test_name:
1068            job_id_owner = '%s-%s' % (job.id, job.owner)
1069            logging.debug('Adding job keyval for %s=%s',
1070                          job.test_name, job_id_owner)
1071            utils.write_keyval(
1072                self._results_dir,
1073                {hashlib.md5(job.test_name).hexdigest(): job_id_owner})
1074
1075
1076    def _remember_test_status_job_id(self, status):
1077        """
1078        Record provided status as a test status keyval, for later referencing.
1079
1080        @param status: Test status, including properties such as id, test_name
1081                       and owner.
1082        """
1083        if status.id and status.owner and status.test_name:
1084            test_id_owner = '%s-%s' % (status.id, status.owner)
1085            logging.debug('Adding status keyval for %s=%s',
1086                          status.test_name, test_id_owner)
1087            utils.write_keyval(
1088                self._results_dir,
1089                {hashlib.md5(status.test_name).hexdigest(): test_id_owner})
1090
1091
1092    @staticmethod
1093    def find_all_tests(cf_getter, suite_name='', add_experimental=False,
1094                       forgiving_parser=True, run_prod_code=False):
1095        """
1096        Function to scan through all tests and find all tests.
1097
1098        Looks at control files returned by _cf_getter.get_control_file_list()
1099        for tests that pass self._predicate(). When this method is called
1100        with a file system ControlFileGetter, it performs a full parse of the
1101        root directory associated with the getter. This is the case when it's
1102        invoked from suite_preprocessor. When it's invoked with a devserver
1103        getter it looks up the suite_name in a suite to control file map
1104        generated at build time, and parses the relevant control files alone.
1105        This lookup happens on the devserver, so as far as this method is
1106        concerned, both cases are equivalent.
1107
1108        @param cf_getter: a control_file_getter.ControlFileGetter used to list
1109               and fetch the content of control files
1110        @param suite_name: If specified, this method will attempt to restrain
1111                           the search space to just this suite's control files.
1112        @param add_experimental: add tests with experimental attribute set.
1113        @param forgiving_parser: If False, will raise ControlVariableExceptions
1114                                 if any are encountered when parsing control
1115                                 files. Note that this can raise an exception
1116                                 for syntax errors in unrelated files, because
1117                                 we parse them before applying the predicate.
1118        @param run_prod_code: If true, the suite will run the test code that
1119                              lives in prod aka the test code currently on the
1120                              lab servers by disabling SSP for the discovered
1121                              tests.
1122
1123        @raises ControlVariableException: If forgiving_parser is False and there
1124                                          is a syntax error in a control file.
1125
1126        @returns a dictionary of ControlData objects that based on given
1127                 parameters.
1128        """
1129        logging.debug('Getting control file list for suite: %s', suite_name)
1130        tests = {}
1131        files = cf_getter.get_control_file_list(suite_name=suite_name)
1132
1133        logging.debug('Parsing control files ...')
1134        matcher = re.compile(r'[^/]+/(deps|profilers)/.+')
1135        for file in filter(lambda f: not matcher.match(f), files):
1136            text = cf_getter.get_control_file_contents(file)
1137            try:
1138                found_test = control_data.parse_control_string(
1139                        text, raise_warnings=True)
1140                if not add_experimental and found_test.experimental:
1141                    continue
1142                found_test.text = text
1143                found_test.path = file
1144                if run_prod_code:
1145                    found_test.require_ssp = False
1146                tests[file] = found_test
1147            except control_data.ControlVariableException, e:
1148                if not forgiving_parser:
1149                    msg = "Failed parsing %s\n%s" % (file, e)
1150                    raise control_data.ControlVariableException(msg)
1151                logging.warning("Skipping %s\n%s", file, e)
1152            except Exception, e:
1153                logging.error("Bad %s\n%s", file, e)
1154        return tests
1155
1156
1157    @staticmethod
1158    def find_and_parse_tests(cf_getter, predicate, suite_name='',
1159                             add_experimental=False, forgiving_parser=True,
1160                             run_prod_code=False):
1161        """
1162        Function to scan through all tests and find eligible tests.
1163
1164        Search through all tests based on given cf_getter, suite_name,
1165        add_experimental and forgiving_parser, return the tests that match
1166        given predicate.
1167
1168        @param cf_getter: a control_file_getter.ControlFileGetter used to list
1169               and fetch the content of control files
1170        @param predicate: a function that should return True when run over a
1171               ControlData representation of a control file that should be in
1172               this Suite.
1173        @param suite_name: If specified, this method will attempt to restrain
1174                           the search space to just this suite's control files.
1175        @param add_experimental: add tests with experimental attribute set.
1176        @param forgiving_parser: If False, will raise ControlVariableExceptions
1177                                 if any are encountered when parsing control
1178                                 files. Note that this can raise an exception
1179                                 for syntax errors in unrelated files, because
1180                                 we parse them before applying the predicate.
1181        @param run_prod_code: If true, the suite will run the test code that
1182                              lives in prod aka the test code currently on the
1183                              lab servers by disabling SSP for the discovered
1184                              tests.
1185
1186        @raises ControlVariableException: If forgiving_parser is False and there
1187                                          is a syntax error in a control file.
1188
1189        @return list of ControlData objects that should be run, with control
1190                file text added in |text| attribute. Results are sorted based
1191                on the TIME setting in control file, slowest test comes first.
1192        """
1193        tests = Suite.find_all_tests(cf_getter, suite_name, add_experimental,
1194                                     forgiving_parser,
1195                                     run_prod_code=run_prod_code)
1196        logging.debug('Parsed %s control files.', len(tests))
1197        tests = [test for test in tests.itervalues() if predicate(test)]
1198        tests.sort(key=lambda t:
1199                   control_data.ControlData.get_test_time_index(t.time),
1200                   reverse=True)
1201        return tests
1202
1203
1204    @staticmethod
1205    def find_possible_tests(cf_getter, predicate, suite_name='', count=10):
1206        """
1207        Function to scan through all tests and find possible tests.
1208
1209        Search through all tests based on given cf_getter, suite_name,
1210        add_experimental and forgiving_parser. Use the given predicate to
1211        calculate the similarity and return the top 10 matches.
1212
1213        @param cf_getter: a control_file_getter.ControlFileGetter used to list
1214               and fetch the content of control files
1215        @param predicate: a function that should return a tuple of (name, ratio)
1216               when run over a ControlData representation of a control file that
1217               should be in this Suite. `name` is the key to be compared, e.g.,
1218               a suite name or test name. `ratio` is a value between [0,1]
1219               indicating the similarity of `name` and the value to be compared.
1220        @param suite_name: If specified, this method will attempt to restrain
1221                           the search space to just this suite's control files.
1222        @param count: Number of suggestions to return, default to 10.
1223
1224        @return list of top names that similar to the given test, sorted by
1225                match ratio.
1226        """
1227        tests = Suite.find_all_tests(cf_getter, suite_name,
1228                                     add_experimental=True,
1229                                     forgiving_parser=True)
1230        logging.debug('Parsed %s control files.', len(tests))
1231        similarities = {}
1232        for test in tests.itervalues():
1233            ratios = predicate(test)
1234            # Some predicates may return a list of tuples, e.g.,
1235            # name_in_tag_similarity_predicate. Convert all returns to a list.
1236            if not isinstance(ratios, list):
1237                ratios = [ratios]
1238            for name, ratio in ratios:
1239                similarities[name] = ratio
1240        return [s[0] for s in
1241                sorted(similarities.items(), key=operator.itemgetter(1),
1242                       reverse=True)][:count]
1243