1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Definition of a CrOS suite in skylab.
6
This file is a simplification of dynamic_suite.suite, without the features
that are not needed for skylab suites.
9
10Suite class in this file mainly has 2 features:
11    1. Integrate parameters from control file & passed in arguments.
12    2. Find proper child tests for a given suite.
13
14Use case:
15    See _run_suite() in skylab_suite.run_suite_skylab.
16"""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import print_function
21
22import collections
23import logging
24import os
25
26from lucifer import autotest
27from skylab_suite import errors
28from skylab_suite import swarming_lib
29
30
# Parameters describing a suite to run. Suite.__init__ copies most of these
# fields onto the suite object; ProvisionSuite additionally reads
# suite_args['num_required'].
SuiteSpec = collections.namedtuple(
        'SuiteSpec',
        (
                'builds', 'suite_name', 'suite_file_name',
                'test_source_build', 'suite_args', 'priority',
                'board', 'model', 'pool',
                'job_keyvals', 'minimum_duts', 'timeout_mins',
                'quota_account',
        ))
48
# Parameters consumed by SuiteHandler.__init__ to control waiting, timeouts,
# retries and the provision requirement of a suite run.
SuiteHandlerSpec = collections.namedtuple(
        'SuiteHandlerSpec',
        (
                'suite_name', 'wait', 'suite_id',
                'timeout_mins', 'passed_mins',
                'test_retry', 'max_retries',
                'provision_num_required',
        ))
61
# Per-child-test bookkeeping that SuiteHandler stores against a swarming task
# id: the TestSpec itself plus its retry state.
TestHandlerSpec = collections.namedtuple(
        'TestHandlerSpec',
        ('test_spec', 'remaining_retries', 'previous_retried_ids'))
69
# Everything needed to schedule a single child test; instances are built by
# Suite._create_test_spec.
TestSpec = collections.namedtuple(
        'TestSpec',
        (
                'test', 'priority', 'board', 'model', 'pool', 'build',
                'keyvals',
                # TODO(akeshet): Determine why this is necessary
                # (can't this just be specified as its own dimension?) and
                # delete it if it isn't necessary.
                'bot_id',
                'dut_name',
                'expiration_secs', 'grace_period_secs',
                'execution_timeout_secs', 'io_timeout_secs',
                'quota_account',
        ))
91
92
class SuiteHandler(object):
    """The class for handling a CrOS suite run.

    Its responsibility includes handling retries for child tests.
    """

    def __init__(self, specs, client):
        """Initialize a suite handler.

        @param specs: A SuiteHandlerSpec object.
        @param client: The client used to query child tasks; it must provide
                       get_child_tasks(suite_id).
        """
        self._suite_name = specs.suite_name
        self._wait = specs.wait
        self._timeout_mins = specs.timeout_mins
        self._provision_num_required = specs.provision_num_required
        self._test_retry = specs.test_retry
        self._max_retries = specs.max_retries
        self.passed_mins = specs.passed_mins

        # The swarming task id of the suite that this suite_handler is handling.
        self._suite_id = specs.suite_id
        # The swarming task id of current run_suite_skylab process. It could be
        # different from self._suite_id if a suite_id is passed in.
        self._task_id = os.environ.get('SWARMING_TASK_ID')
        self._task_to_test_maps = {}
        self.successfully_provisioned_duts = set()
        self._client = client

        # It only maintains the swarming task of the final run of each
        # child task, i.e. it doesn't include failed swarming tasks of
        # each child task which will get retried later.
        self._active_child_tasks = []

        # Child tasks picked for retry by the latest handle_results() call.
        # Initialized here so that is_finished_waiting() can be called safely
        # before the first handle_results().
        self.retried_tasks = []

    def should_wait(self):
        """Return whether to wait for a suite's result."""
        return self._wait

    def is_provision(self):
        """Return whether the suite handler is for provision suite."""
        return self._suite_name == 'provision'

    def set_suite_id(self, suite_id):
        """Set swarming task id for a suite.

        @param suite_id: The swarming task id of this suite.
        """
        self._suite_id = suite_id

    def add_test_by_task_id(self, task_id, test_handler_spec):
        """Record a child test and its swarming task id.

        @param task_id: the swarming task id of a child test.
        @param test_handler_spec: a TestHandlerSpec object.
        """
        self._task_to_test_maps[task_id] = test_handler_spec

    def get_test_by_task_id(self, task_id):
        """Get a child test by its swarming task id.

        @param task_id: the swarming task id of a child test.

        @return the TestHandlerSpec recorded for |task_id|.

        @raises KeyError if |task_id| has not been recorded.
        """
        return self._task_to_test_maps[task_id]

    def remove_test_by_task_id(self, task_id):
        """Delete a child test by its swarming task id.

        Unknown task ids are silently ignored.

        @param task_id: the swarming task id of a child test.
        """
        self._task_to_test_maps.pop(task_id, None)

    def set_max_retries(self, max_retries):
        """Set the max retries for a suite.

        @param max_retries: The current maximum retries to set.
        """
        self._max_retries = max_retries

    @property
    def task_to_test_maps(self):
        """Get the task_to_test_maps of a suite."""
        return self._task_to_test_maps

    @property
    def timeout_mins(self):
        """Get the timeout minutes of a suite."""
        return self._timeout_mins

    @property
    def suite_id(self):
        """Get the swarming task id of a suite."""
        return self._suite_id

    @property
    def task_id(self):
        """Get swarming task id of current process."""
        return self._task_id

    @property
    def max_retries(self):
        """Get the max num of retries of a suite."""
        return self._max_retries

    def get_active_child_tasks(self, suite_id):
        """Get the child tasks which is actively monitored by a suite.

        The active child tasks list includes tasks which are currently running
        or finished without following retries. E.g.
        Suite task X:
            child task 1: x1 (first try x1_1, second try x1_2)
            child task 2: x2 (first try: x2_1)
        The final active child task list will include task x1_2 and x2_1, won't
        include x1_1 since it's a task which is finished but get retried later.

        @param suite_id: The swarming task id of the suite whose child tasks
                         are queried.

        @return a list of the child task results whose task id is currently
                recorded in this handler.
        """
        all_tasks = self._client.get_child_tasks(suite_id)
        return [t for t in all_tasks if t['task_id'] in self._task_to_test_maps]

    def handle_results(self, suite_id):
        """Handle child tasks' results.

        Refreshes the list of active child tasks and records in
        |self.retried_tasks| the ones that should be retried.

        @param suite_id: The swarming task id of the suite to check.
        """
        self._active_child_tasks = self.get_active_child_tasks(suite_id)
        self.retried_tasks = [t for t in self._active_child_tasks
                              if self._should_retry(t)]
        logging.info('Found %d tests to be retried.', len(self.retried_tasks))

    def _check_all_tasks_finished(self):
        """Check whether all tasks are finished, including retried tasks.

        @return True if every active child task is in a finished state and no
                task is pending a retry.
        """
        finished_tasks = [t for t in self._active_child_tasks if
                          t['state'] in swarming_lib.TASK_FINISHED_STATUS]
        logging.info('%d/%d child tasks finished, %d got retried.',
                     len(finished_tasks), len(self._active_child_tasks),
                     len(self.retried_tasks))
        return (len(finished_tasks) == len(self._active_child_tasks)
                and not self.retried_tasks)

    def _set_successful_provisioned_duts(self):
        """Set successfully provisioned duts.

        Adds the DUT name of every successfully completed active child task
        to |self.successfully_provisioned_duts|; tasks without a DUT name are
        skipped.
        """
        for t in self._active_child_tasks:
            if (swarming_lib.get_task_final_state(t) ==
                swarming_lib.TASK_COMPLETED_SUCCESS):
                dut_name = self.get_test_by_task_id(
                        t['task_id']).test_spec.dut_name
                if dut_name:
                    self.successfully_provisioned_duts.add(dut_name)

    def is_provision_successfully_finished(self):
        """Check whether provision succeeds.

        @return True if the number of successfully provisioned DUTs reaches
                the required minimum.
        """
        logging.info('Found %d successfully provisioned duts, '
                     'the minimum requirement is %d',
                     len(self.successfully_provisioned_duts),
                     self._provision_num_required)
        return (len(self.successfully_provisioned_duts) >=
                self._provision_num_required)

    def is_finished_waiting(self):
        """Check whether the suite should finish its waiting.

        A provision suite stops waiting as soon as enough DUTs are
        provisioned, or when all its tasks are finished. Any other suite
        stops waiting only when all its tasks are finished.
        """
        if self.is_provision():
            self._set_successful_provisioned_duts()
            return (self.is_provision_successfully_finished() or
                    self._check_all_tasks_finished())

        return self._check_all_tasks_finished()

    def _should_retry(self, test_result):
        """Check whether a test should be retried.

        We will retry a test if:
            1. The test-level retry is enabled for this suite.
            2. The test fails.
            3. The test is currently monitored by the suite, i.e.
               it's not a previous retried test.
            4. The test has remaining retries based on JOB_RETRIES in
               its control file.
            5. The suite-level max retries isn't hit.

        @param test_result: A json test result from swarming API.

        @return True if we should retry the test.
        """
        task_id = test_result['task_id']
        state = test_result['state']
        is_failure = test_result['failure']
        return (self._test_retry and
                ((state == swarming_lib.TASK_COMPLETED and is_failure)
                 or (state in swarming_lib.TASK_STATUS_TO_RETRY))
                and (task_id in self._task_to_test_maps)
                and (self._task_to_test_maps[task_id].remaining_retries > 0)
                and (self._max_retries > 0))
275
276
class Suite(object):
    """The class for a CrOS suite.

    A suite is built from a SuiteSpec. prepare() stages the suite artifacts,
    loads the suite control file, checks that enough bots are available and
    fills |self.test_specs| with one TestSpec per child test.
    """
    EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS

    def __init__(self, spec, client):
        """Initialize a suite.

        @param spec: A SuiteSpec object.
        @param client: A swarming_lib.Client instance.
        """
        # Set by _stage_suite_artifacts(); read through the |ds| property.
        self._ds = None

        self.control_file = ''
        self.test_specs = []
        self.builds = spec.builds
        self.test_source_build = spec.test_source_build
        self.suite_name = spec.suite_name
        self.suite_file_name = spec.suite_file_name
        self.priority = spec.priority
        self.board = spec.board
        self.model = spec.model
        self.pool = spec.pool
        self.job_keyvals = spec.job_keyvals
        self.minimum_duts = spec.minimum_duts
        self.timeout_mins = spec.timeout_mins
        self.quota_account = spec.quota_account
        self._client = client

    @property
    def ds(self):
        """Getter for private |self._ds| property.

        This ensures that once self.ds is called, there's a devserver ready
        for it.

        @raises errors.InValidPropertyError if the suite artifacts have not
                been staged yet.
        """
        if self._ds is None:
            raise errors.InValidPropertyError(
                'Property self.ds is None. Please call stage_suite_artifacts() '
                'before calling it.')

        return self._ds

    def _get_cros_build(self):
        """Return the CrOS build of this suite.

        Prefers the entry of |self.builds| keyed by
        provision.CROS_VERSION_PREFIX and otherwise falls back to the first
        value in |self.builds|.

        @raises IndexError if |self.builds| is empty.
        """
        provision = autotest.load('server.cros.provision')
        # list() keeps the indexing valid on Python 3, where dict.values()
        # returns a view that cannot be subscripted.
        return self.builds.get(provision.CROS_VERSION_PREFIX,
                               list(self.builds.values())[0])

    def _create_suite_keyvals(self):
        """Build the keyvals shared by all child tests of this suite.

        @return a dict holding the build/suite keyvals, the firmware build
                keyvals when applicable, and the entries of
                |self.job_keyvals| listed in constants.INHERITED_KEYVALS.
        """
        constants = autotest.load('server.cros.dynamic_suite.constants')
        provision = autotest.load('server.cros.provision')
        cros_build = self._get_cros_build()
        keyvals = {
                constants.JOB_BUILD_KEY: cros_build,
                constants.JOB_SUITE_KEY: self.suite_name,
                constants.JOB_BUILDS_KEY: self.builds
        }
        if (cros_build != self.test_source_build or
            len(self.builds) > 1):
            keyvals[constants.JOB_TEST_SOURCE_BUILD_KEY] = (
                    self.test_source_build)
            # items() instead of iteritems() so this also runs on Python 3.
            for prefix, build in self.builds.items():
                if prefix == provision.FW_RW_VERSION_PREFIX:
                    keyvals[constants.FWRW_BUILD] = build
                elif prefix == provision.FW_RO_VERSION_PREFIX:
                    keyvals[constants.FWRO_BUILD] = build

        for key in self.job_keyvals:
            if key in constants.INHERITED_KEYVALS:
                keyvals[key] = self.job_keyvals[key]

        return keyvals

    def prepare(self):
        """Prepare a suite job for execution.

        On success |self.test_specs| holds the TestSpec of every child test.

        @raises errors.NoAvailableDUTsError if fewer bots than
                |self.minimum_duts| are available.
        """
        self._stage_suite_artifacts()
        self._parse_suite_args()
        keyvals = self._create_suite_keyvals()
        available_bots = self._get_available_bots()
        if len(available_bots) < self.minimum_duts:
            raise errors.NoAvailableDUTsError(
                    self.board, self.pool, len(available_bots),
                    self.minimum_duts)

        tests = self._find_tests(available_bots_num=len(available_bots))
        self.test_specs = self._get_test_specs(tests, available_bots, keyvals)

    def _create_test_spec(self, test, keyvals, bot_id='', dut_name=''):
        """Create the TestSpec of a child test.

        @param test: The child test to run.
        @param keyvals: The suite keyvals to attach to the test.
        @param bot_id: The id of the bot to run the test on, if any.
        @param dut_name: The name of the DUT to run the test on, if any.

        @return a TestSpec object. Both its expiration and its execution
                timeout are derived from |self.timeout_mins|.
        """
        return TestSpec(
                test=test,
                priority=self.priority,
                board=self.board,
                model=self.model,
                pool=self.pool,
                build=self.test_source_build,
                bot_id=bot_id,
                dut_name=dut_name,
                keyvals=keyvals,
                expiration_secs=self.timeout_mins * 60,
                grace_period_secs=swarming_lib.DEFAULT_TIMEOUT_SECS,
                execution_timeout_secs=self.timeout_mins * 60,
                io_timeout_secs=swarming_lib.DEFAULT_TIMEOUT_SECS,
                quota_account=self.quota_account,
        )

    def _get_test_specs(self, tests, available_bots, keyvals):
        """Create one TestSpec per child test.

        @param tests: The child tests of the suite.
        @param available_bots: Unused here; accepted so that subclasses can
                               bind tests to specific bots.
        @param keyvals: The suite keyvals to attach to every test.

        @return a list of TestSpec objects.
        """
        return [self._create_test_spec(test, keyvals) for test in tests]

    def _stage_suite_artifacts(self):
        """Stage suite control files and suite-to-tests mapping file.

        Artifacts are staged for |self.test_source_build|; the devserver
        returned by the staging call is kept in |self._ds|.
        """
        suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
        ds, _ = suite_common.stage_build_artifacts(self.test_source_build)
        self._ds = ds

    def _parse_suite_args(self):
        """Load the suite control file into |self.control_file|.

        The control file is fetched for |self.test_source_build| and
        |self.suite_file_name| through the staged devserver.

        NOTE(review): despite its name, this method does not parse any suite
        args; it only loads the control file.
        """
        suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
        self.control_file = suite_common.get_control_file_by_build(
                self.test_source_build, self.ds, self.suite_file_name)

    def _find_tests(self, available_bots_num=0):
        """Fetch the child tests.

        @param available_bots_num: Unused here; accepted so that subclasses
                                   can size their test list by it.

        @return the tests retrieved for |self.suite_name|, filtered by the
                suite-name-in-tag predicate.
        """
        control_file_getter = autotest.load(
                'server.cros.dynamic_suite.control_file_getter')
        suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
        cf_getter = control_file_getter.DevServerGetter(
                self.test_source_build, self.ds)
        tests = suite_common.retrieve_for_suite(
                cf_getter, self.suite_name)
        return suite_common.filter_tests(
                tests, suite_common.name_in_tag_predicate(self.suite_name))

    def _get_available_bots(self):
        """Get available bots for suites.

        Queries the bots of the skylab drone pool matching |self.board| and
        the dependencies derived from |self.pool|.

        @return a list of the queried bots that are available.
        """
        dimensions = {'pool': swarming_lib.SKYLAB_DRONE_POOL,
                      'label-board': self.board}
        swarming_pool_deps = swarming_lib.task_dependencies_from_labels(
            ['pool:%s' % self.pool])
        dimensions.update(swarming_pool_deps)
        bots = self._client.query_bots_list(dimensions)
        return [bot for bot in bots if swarming_lib.bot_available(bot)]
425
426
class ProvisionSuite(Suite):
    """The class for a CrOS provision suite.

    Its child tests are copies of the 'dummy_Pass' test, and each copy is
    bound to one of the available bots where possible.
    """
    EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS

    def __init__(self, spec, client):
        """Initialize a provision suite.

        @param spec: A SuiteSpec object; suite_args must contain
                     'num_required'.
        @param client: A swarming_lib.Client instance.
        """
        super(ProvisionSuite, self).__init__(spec, client)
        self._num_required = spec.suite_args['num_required']

    def _find_tests(self, available_bots_num=0):
        """Fetch the child tests for provision suite.

        @param available_bots_num: The number of currently available bots.

        @return a list holding one copy of the dummy test per test to run.

        @raises errors.NoAvailableDUTsError if fewer bots than required are
                available.
        """
        getter_module = autotest.load(
                'server.cros.dynamic_suite.control_file_getter')
        common = autotest.load('server.cros.dynamic_suite.suite_common')
        getter = getter_module.DevServerGetter(
                self.test_source_build, self.ds)
        provision_test = common.retrieve_control_data_for_test(
                getter, 'dummy_Pass')
        logging.info('Get %d available DUTs for provision.', available_bots_num)

        # Enough bots: schedule as many copies as the larger of the two counts.
        if not (available_bots_num < self._num_required):
            copies = max(self._num_required, available_bots_num)
            return [provision_test] * copies

        logging.warning('Not enough available DUTs for provision.')
        raise errors.NoAvailableDUTsError(
                self.board, self.pool, available_bots_num,
                self._num_required)

    def _get_test_specs(self, tests, available_bots, keyvals):
        """Create the TestSpecs, pairing tests with bots by position.

        @param tests: The child tests of the suite.
        @param available_bots: The available bots; the i-th test is bound to
                               the i-th bot.
        @param keyvals: The suite keyvals to attach to every test.

        @return a list of TestSpec objects. Tests beyond the number of
                available bots get a spec without bot id or DUT name.
        """
        bots_count = len(available_bots)
        specs = []
        for position, test in enumerate(tests):
            if position >= bots_count:
                # No bot left for this test: leave it unbound.
                specs.append(self._create_test_spec(test, keyvals))
                continue

            bot = available_bots[position]
            bot_id = bot['bot_id']
            dut_name = swarming_lib.get_task_dut_name(bot['dimensions'])
            specs.append(self._create_test_spec(
                    test, keyvals, bot_id=bot_id, dut_name=dut_name))

        return specs
466