1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import contextlib 6import logging 7import time 8from multiprocessing import pool 9 10import base_event, board_enumerator, build_event 11import task, timed_event 12 13import common 14from autotest_lib.client.common_lib.cros.graphite import autotest_stats 15from autotest_lib.server import utils 16 17POOL_SIZE = 32 18 19_timer = autotest_stats.Timer('suite_scheduler') 20 21class Driver(object): 22 """Implements the main loop of the suite_scheduler. 23 24 @var EVENT_CLASSES: list of the event classes Driver supports. 25 @var _LOOP_INTERVAL_SECONDS: seconds to wait between loop iterations. 26 27 @var _scheduler: a DedupingScheduler, used to schedule jobs with the AFE. 28 @var _enumerator: a BoardEnumerator, used to list plaforms known to 29 the AFE 30 @var _events: dict of BaseEvents to be handled each time through main loop. 31 """ 32 33 EVENT_CLASSES = [timed_event.Nightly, timed_event.Weekly, 34 build_event.NewBuild] 35 _LOOP_INTERVAL_SECONDS = 5 * 60 36 37 38 def __init__(self, scheduler, enumerator, is_sanity=False): 39 """Constructor 40 41 @param scheduler: an instance of deduping_scheduler.DedupingScheduler. 42 @param enumerator: an instance of board_enumerator.BoardEnumerator. 43 @param is_sanity: Set to True if the driver is created for sanity check. 44 Default is set to False. 45 """ 46 self._scheduler = scheduler 47 self._enumerator = enumerator 48 task.TotMilestoneManager.is_sanity = is_sanity 49 50 51 def RereadAndReprocessConfig(self, config, mv): 52 """Re-read config, re-populate self._events and recreate task lists. 53 54 @param config: an instance of ForgivingConfigParser. 55 @param mv: an instance of ManifestVersions. 56 """ 57 config.reread() 58 new_events = self._CreateEventsWithTasks(config, mv) 59 for keyword, event in self._events.iteritems(): 60 event.Merge(new_events[keyword]) 61 62 63 def SetUpEventsAndTasks(self, config, mv): 64 """Populate self._events and create task lists from config. 65 66 @param config: an instance of ForgivingConfigParser. 67 @param mv: an instance of ManifestVersions. 68 """ 69 self._events = self._CreateEventsWithTasks(config, mv) 70 71 72 def _CreateEventsWithTasks(self, config, mv): 73 """Create task lists from config, and assign to newly-minted events. 74 75 Calling multiple times should start afresh each time. 76 77 @param config: an instance of ForgivingConfigParser. 78 @param mv: an instance of ManifestVersions. 79 """ 80 events = {} 81 for klass in self.EVENT_CLASSES: 82 events[klass.KEYWORD] = klass.CreateFromConfig(config, mv) 83 84 tasks = self.TasksFromConfig(config) 85 for keyword, task_list in tasks.iteritems(): 86 if keyword in events: 87 events[keyword].tasks = task_list 88 else: 89 logging.warning('%s, is an unknown keyword.', keyword) 90 return events 91 92 93 def TasksFromConfig(self, config): 94 """Generate a dict of {event_keyword: [tasks]} mappings from |config|. 95 96 For each section in |config| that encodes a Task, instantiate a Task 97 object. Determine the event that Task is supposed to run_on and 98 append the object to a list associated with the appropriate event 99 keyword. Return a dictionary of these keyword: list of task mappings. 100 101 @param config: a ForgivingConfigParser containing tasks to be parsed. 102 @return dict of {event_keyword: [tasks]} mappings. 103 @raise MalformedConfigEntry on a task parsing error. 104 """ 105 tasks = {} 106 for section in config.sections(): 107 if not base_event.HonoredSection(section): 108 try: 109 keyword, new_task = task.Task.CreateFromConfigSection( 110 config, section) 111 except task.MalformedConfigEntry as e: 112 logging.warning('%s is malformed: %s', section, e) 113 continue 114 tasks.setdefault(keyword, []).append(new_task) 115 return tasks 116 117 118 def RunForever(self, config, mv): 119 """Main loop of the scheduler. Runs til the process is killed. 120 121 @param config: an instance of ForgivingConfigParser. 122 @param mv: an instance of manifest_versions.ManifestVersions. 123 """ 124 for event in self._events.itervalues(): 125 event.Prepare() 126 while True: 127 try: 128 self.HandleEventsOnce(mv) 129 except board_enumerator.EnumeratorException as e: 130 logging.warning('Failed to enumerate boards: %r', e) 131 with _timer.get_client('manifest_versions_update'): 132 mv.Update() 133 with _timer.get_client('tot_milestone_manager_refresh'): 134 task.TotMilestoneManager().refresh() 135 time.sleep(self._LOOP_INTERVAL_SECONDS) 136 self.RereadAndReprocessConfig(config, mv) 137 138 139 @staticmethod 140 def HandleBoard(inputs): 141 """Handle event based on given inputs. 142 143 @param inputs: A dictionary of the arguments needed to handle an event. 144 Keys include: 145 scheduler: a DedupingScheduler, used to schedule jobs with the AFE. 146 event: An event object to be handled. 147 board: Name of the board. 148 """ 149 scheduler = inputs['scheduler'] 150 event = inputs['event'] 151 board = inputs['board'] 152 153 logging.info('Handling %s event for board %s', event.keyword, board) 154 branch_builds = event.GetBranchBuildsForBoard(board) 155 event.Handle(scheduler, branch_builds, board) 156 logging.info('Finished handling %s event for board %s', event.keyword, 157 board) 158 159 160 @_timer.decorate 161 def HandleEventsOnce(self, mv): 162 """One turn through the loop. Separated out for unit testing. 163 164 @param mv: an instance of manifest_versions.ManifestVersions. 165 @raise EnumeratorException if we can't enumerate any supported boards. 166 """ 167 boards = self._enumerator.Enumerate() 168 logging.info('%d boards currently in the lab: %r', len(boards), boards) 169 thread_pool = pool.ThreadPool(POOL_SIZE) 170 with contextlib.closing(thread_pool): 171 for e in self._events.itervalues(): 172 if not e.ShouldHandle(): 173 continue 174 logging.info('Handling %s event for %d boards', e.keyword, 175 len(boards)) 176 args = [] 177 for board in boards: 178 args.append({'scheduler': self._scheduler, 179 'event': e, 180 'board': board}) 181 thread_pool.map(self.HandleBoard, args) 182 logging.info('Finished handling %s event for %d boards', 183 e.keyword, len(boards)) 184 e.UpdateCriteria() 185 186 187 def ForceEventsOnceForBuild(self, keywords, build_name): 188 """Force events with provided keywords to happen, with given build. 189 190 @param keywords: iterable of event keywords to force 191 @param build_name: instead of looking up builds to test, test this one. 192 """ 193 board, type, milestone, manifest = utils.ParseBuildName(build_name) 194 branch_builds = {task.PickBranchName(type, milestone): [build_name]} 195 logging.info('Testing build R%s-%s on %s', milestone, manifest, board) 196 197 for e in self._events.itervalues(): 198 if e.keyword in keywords: 199 e.Handle(self._scheduler, branch_builds, board, force=True) 200