1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import contextlib
6import logging
7import time
8from multiprocessing import pool
9
10import base_event, board_enumerator, build_event
11import task, timed_event
12
13import common
14from autotest_lib.client.common_lib.cros.graphite import autotest_stats
15from autotest_lib.server import utils
16
# Number of worker threads in the pool.ThreadPool that
# Driver.HandleEventsOnce() creates to handle boards concurrently.
POOL_SIZE = 32

# Stats timer for this module.  Used to time Driver.HandleEventsOnce() (as a
# decorator) and individual steps of Driver.RunForever() (as a context
# manager).
_timer = autotest_stats.Timer('suite_scheduler')
20
class Driver(object):
    """Implements the main loop of the suite_scheduler.

    SetUpEventsAndTasks() must be called before any method that reads
    self._events (RereadAndReprocessConfig, RunForever, HandleEventsOnce,
    ForceEventsOnceForBuild); the constructor does not create that attribute.

    @var EVENT_CLASSES: list of the event classes Driver supports.
    @var _LOOP_INTERVAL_SECONDS: seconds to wait between loop iterations.

    @var _scheduler: a DedupingScheduler, used to schedule jobs with the AFE.
    @var _enumerator: a BoardEnumerator, used to list platforms known to
                      the AFE
    @var _events: dict of {event_keyword: event object} to be handled each
                  time through main loop.
    """

    EVENT_CLASSES = [timed_event.Nightly, timed_event.Weekly,
                     build_event.NewBuild]
    _LOOP_INTERVAL_SECONDS = 5 * 60


    def __init__(self, scheduler, enumerator, is_sanity=False):
        """Constructor

        Note that |is_sanity| is stored on the task.TotMilestoneManager class
        itself, not on this Driver instance.

        @param scheduler: an instance of deduping_scheduler.DedupingScheduler.
        @param enumerator: an instance of board_enumerator.BoardEnumerator.
        @param is_sanity: Set to True if the driver is created for sanity check.
                          Default is set to False.
        """
        self._scheduler = scheduler
        self._enumerator = enumerator
        task.TotMilestoneManager.is_sanity = is_sanity


    def RereadAndReprocessConfig(self, config, mv):
        """Re-read config and merge freshly created events into self._events.

        A new set of events (with their task lists) is built from the re-read
        config, and each existing event is asked to Merge() the new event that
        shares its keyword.  The existing event objects are kept.

        @param config: an instance of ForgivingConfigParser.
        @param mv: an instance of ManifestVersions.
        """
        config.reread()
        new_events = self._CreateEventsWithTasks(config, mv)
        for keyword, event in self._events.iteritems():
            event.Merge(new_events[keyword])


    def SetUpEventsAndTasks(self, config, mv):
        """Populate self._events and create task lists from config.

        @param config: an instance of ForgivingConfigParser.
        @param mv: an instance of ManifestVersions.
        """
        self._events = self._CreateEventsWithTasks(config, mv)


    def _CreateEventsWithTasks(self, config, mv):
        """Create task lists from config, and assign to newly-minted events.

        Calling multiple times should start afresh each time.

        @param config: an instance of ForgivingConfigParser.
        @param mv: an instance of ManifestVersions.
        @return dict of {event_keyword: event object}, one entry per class in
                EVENT_CLASSES.
        """
        events = {}
        for klass in self.EVENT_CLASSES:
            events[klass.KEYWORD] = klass.CreateFromConfig(config, mv)

        tasks = self.TasksFromConfig(config)
        for keyword, task_list in tasks.iteritems():
            if keyword in events:
                events[keyword].tasks = task_list
            else:
                # Tasks that name an event we do not support are dropped.
                logging.warning('%s, is an unknown keyword.', keyword)
        return events


    def TasksFromConfig(self, config):
        """Generate a dict of {event_keyword: [tasks]} mappings from |config|.

        For each section in |config| that encodes a Task, instantiate a Task
        object.  Determine the event that Task is supposed to run_on and
        append the object to a list associated with the appropriate event
        keyword.  Return a dictionary of these keyword: list of task mappings.

        Sections for which base_event.HonoredSection() returns True are not
        treated as tasks.  A section that fails to parse is logged as a
        warning and skipped; MalformedConfigEntry is not propagated.

        @param config: a ForgivingConfigParser containing tasks to be parsed.
        @return dict of {event_keyword: [tasks]} mappings.
        """
        tasks = {}
        for section in config.sections():
            if base_event.HonoredSection(section):
                continue
            try:
                keyword, new_task = task.Task.CreateFromConfigSection(
                    config, section)
            except task.MalformedConfigEntry as e:
                logging.warning('%s is malformed: %s', section, e)
                continue
            tasks.setdefault(keyword, []).append(new_task)
        return tasks


    def RunForever(self, config, mv):
        """Main loop of the scheduler.  Runs til the process is killed.

        Each iteration handles events once, updates |mv|, refreshes the
        TotMilestoneManager, sleeps _LOOP_INTERVAL_SECONDS and then re-reads
        the config.  An EnumeratorException raised while handling events is
        logged and the loop carries on; any other exception propagates to
        the caller.

        @param config: an instance of ForgivingConfigParser.
        @param mv: an instance of manifest_versions.ManifestVersions.
        """
        for event in self._events.itervalues():
            event.Prepare()
        while True:
            try:
                self.HandleEventsOnce(mv)
            except board_enumerator.EnumeratorException as e:
                logging.warning('Failed to enumerate boards: %r', e)
            with _timer.get_client('manifest_versions_update'):
                mv.Update()
            with _timer.get_client('tot_milestone_manager_refresh'):
                task.TotMilestoneManager().refresh()
            time.sleep(self._LOOP_INTERVAL_SECONDS)
            self.RereadAndReprocessConfig(config, mv)


    @staticmethod
    def HandleBoard(inputs):
        """Handle event based on given inputs.

        Takes a single dict argument so that it can be used directly with
        ThreadPool.map(), which passes one item per call.

        @param inputs: A dictionary of the arguments needed to handle an event.
            Keys include:
            scheduler: a DedupingScheduler, used to schedule jobs with the AFE.
            event: An event object to be handled.
            board: Name of the board.
        """
        scheduler = inputs['scheduler']
        event = inputs['event']
        board = inputs['board']

        logging.info('Handling %s event for board %s', event.keyword, board)
        branch_builds = event.GetBranchBuildsForBoard(board)
        event.Handle(scheduler, branch_builds, board)
        logging.info('Finished handling %s event for board %s', event.keyword,
                     board)


    @_timer.decorate
    def HandleEventsOnce(self, mv):
        """One turn through the loop.  Separated out for unit testing.

        For every event whose ShouldHandle() is true, HandleBoard() is run for
        each enumerated board on a pool of POOL_SIZE threads, after which the
        event's UpdateCriteria() is called.

        @param mv: an instance of manifest_versions.ManifestVersions.
                   Not used by this method's body.
        @raise EnumeratorException if we can't enumerate any supported boards.
        """
        boards = self._enumerator.Enumerate()
        logging.info('%d boards currently in the lab: %r', len(boards), boards)
        thread_pool = pool.ThreadPool(POOL_SIZE)
        # closing() guarantees thread_pool.close() runs even if a handler
        # raises out of map().
        with contextlib.closing(thread_pool):
            for e in self._events.itervalues():
                if not e.ShouldHandle():
                    continue
                logging.info('Handling %s event for %d boards', e.keyword,
                             len(boards))
                args = [{'scheduler': self._scheduler,
                         'event': e,
                         'board': board} for board in boards]
                # map() blocks until every board has been handled, so events
                # are processed one at a time.
                thread_pool.map(self.HandleBoard, args)
                logging.info('Finished handling %s event for %d boards',
                             e.keyword, len(boards))
                e.UpdateCriteria()


    def ForceEventsOnceForBuild(self, keywords, build_name):
        """Force events with provided keywords to happen, with given build.

        The board, build type, milestone and manifest are parsed out of
        |build_name|; matching events are handled for that board only, with
        force=True.

        @param keywords: iterable of event keywords to force
        @param build_name: instead of looking up builds to test, test this one.
        """
        # Named build_type rather than type to avoid shadowing the builtin.
        board, build_type, milestone, manifest = utils.ParseBuildName(
            build_name)
        branch_builds = {
            task.PickBranchName(build_type, milestone): [build_name]}
        logging.info('Testing build R%s-%s on %s', milestone, manifest, board)

        for e in self._events.itervalues():
            if e.keyword in keywords:
                e.Handle(self._scheduler, branch_builds, board, force=True)
200