1#!/usr/bin/python 2#pylint: disable-msg=C0111 3 4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 5# Use of this source code is governed by a BSD-style license that can be 6# found in the LICENSE file. 7 8"""Host scheduler. 9 10If run as a standalone service, the host scheduler ensures the following: 11 1. Hosts will not be assigned to multiple hqes simultaneously. The process 12 of assignment in this case refers to the modification of the host_id 13 column of a row in the afe_host_queue_entries table, to reflect the host 14 id of a leased host matching the dependencies of the job. 15 2. Hosts that are not being used by active hqes or incomplete special tasks 16 will be released back to the available hosts pool, for acquisition by 17 subsequent hqes. 18In addition to these guarantees, the host scheduler also confirms that no 2 19active hqes/special tasks are assigned the same host, and sets the leased bit 20for hosts needed by frontend special tasks. The need for the latter is only 21apparent when viewed in the context of the job-scheduler (monitor_db), which 22runs special tasks only after their hosts have been leased. 23 24** Suport minimum duts requirement for suites (non-inline mode) ** 25 26Each suite can specify the minimum number of duts it requires by 27dropping a 'suite_min_duts' job keyval which defaults to 0. 28 29When suites are competing for duts, if any suite has not got minimum duts 30it requires, the host scheduler will try to meet the requirement first, 31even if other suite may have higher priority or earlier timestamp. Once 32all suites' minimum duts requirement have been fullfilled, the host 33scheduler will allocate the rest of duts based on job priority and suite job id. 34This is to prevent low priority suites from starving when sharing pool with 35high-priority suites. 36 37Note: 38 1. Prevent potential starvation: 39 We need to carefully choose |suite_min_duts| for both low and high 40 priority suites. If a high priority suite didn't specify it but a low 41 priority one does, the high priority suite can be starved! 42 2. Restart requirement: 43 Restart host scheduler if you manually released a host by setting 44 leased=0 in db. This is needed because host scheduler maintains internal 45 state of host assignment for suites. 46 3. Exchanging duts triggers provisioning: 47 TODO(fdeng): There is a chance two suites can exchange duts, 48 if the two suites are for different builds, the exchange 49 will trigger provisioning. This can be optimized by preferring getting 50 hosts with the same build. 51""" 52 53import argparse 54import collections 55import datetime 56import logging 57import os 58import signal 59import sys 60import time 61 62import common 63from autotest_lib.frontend import setup_django_environment 64 65from autotest_lib.client.common_lib import global_config 66from autotest_lib.client.common_lib.cros.graphite import autotest_stats 67from autotest_lib.scheduler import email_manager 68from autotest_lib.scheduler import query_managers 69from autotest_lib.scheduler import rdb_lib 70from autotest_lib.scheduler import rdb_utils 71from autotest_lib.scheduler import scheduler_lib 72from autotest_lib.scheduler import scheduler_models 73from autotest_lib.site_utils import job_overhead 74from autotest_lib.site_utils import metadata_reporter 75from autotest_lib.site_utils import server_manager_utils 76 77_db_manager = None 78_shutdown = False 79_tick_pause_sec = global_config.global_config.get_config_value( 80 'SCHEDULER', 'tick_pause_sec', type=int, default=5) 81_monitor_db_host_acquisition = global_config.global_config.get_config_value( 82 'SCHEDULER', 'inline_host_acquisition', type=bool, default=True) 83 84 85class SuiteRecorder(object): 86 """Recording the host assignment for suites. 87 88 The recorder holds two things: 89 * suite_host_num, records how many duts a suite is holding, 90 which is a map <suite_job_id -> num_of_hosts> 91 * hosts_to_suites, records which host is assigned to which 92 suite, it is a map <host_id -> suite_job_id> 93 The two datastructure got updated when a host is assigned to or released 94 by a job. 95 96 The reason to maintain hosts_to_suites is that, when a host is released, 97 we need to know which suite it was leased to. Querying the db for the 98 latest completed job that has run on a host is slow. Therefore, we go with 99 an alternative: keeping a <host id, suite job id> map 100 in memory (for 10K hosts, the map should take less than 1M memory on 101 64-bit machine with python 2.7) 102 103 """ 104 105 106 _timer = autotest_stats.Timer('suite_recorder') 107 108 109 def __init__(self, job_query_manager): 110 """Initialize. 111 112 @param job_queue_manager: A JobQueueryManager object. 113 """ 114 self.job_query_manager = job_query_manager 115 self.suite_host_num, self.hosts_to_suites = ( 116 self.job_query_manager.get_suite_host_assignment()) 117 118 119 def record_assignment(self, queue_entry): 120 """Record that the hqe has got a host. 121 122 @param queue_entry: A scheduler_models.HostQueueEntry object which has 123 got a host. 124 """ 125 parent_id = queue_entry.job.parent_job_id 126 if not parent_id: 127 return 128 if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id: 129 logging.error('HQE (id: %d, parent_job_id: %d, host: %s) ' 130 'seems already recorded', queue_entry.id, 131 parent_id, queue_entry.host.hostname) 132 return 133 num_hosts = self.suite_host_num.get(parent_id, 0) 134 self.suite_host_num[parent_id] = num_hosts + 1 135 self.hosts_to_suites[queue_entry.host_id] = parent_id 136 logging.debug('Suite %d got host %s, currently holding %d hosts', 137 parent_id, queue_entry.host.hostname, 138 self.suite_host_num[parent_id]) 139 140 141 def record_release(self, hosts): 142 """Update the record with host releasing event. 143 144 @param hosts: A list of scheduler_models.Host objects. 145 """ 146 for host in hosts: 147 if host.id in self.hosts_to_suites: 148 parent_job_id = self.hosts_to_suites.pop(host.id) 149 count = self.suite_host_num[parent_job_id] - 1 150 if count == 0: 151 del self.suite_host_num[parent_job_id] 152 else: 153 self.suite_host_num[parent_job_id] = count 154 logging.debug( 155 'Suite %d releases host %s, currently holding %d hosts', 156 parent_job_id, host.hostname, count) 157 158 159 def get_min_duts(self, suite_job_ids): 160 """Figure out min duts to request. 161 162 Given a set ids of suite jobs, figure out minimum duts to request for 163 each suite. It is determined by two factors: min_duts specified 164 for each suite in its job keyvals, and how many duts a suite is 165 currently holding. 166 167 @param suite_job_ids: A set of suite job ids. 168 169 @returns: A dictionary, the key is suite_job_id, the value 170 is the minimum number of duts to request. 171 """ 172 suite_min_duts = self.job_query_manager.get_min_duts_of_suites( 173 suite_job_ids) 174 for parent_id in suite_job_ids: 175 min_duts = suite_min_duts.get(parent_id, 0) 176 cur_duts = self.suite_host_num.get(parent_id, 0) 177 suite_min_duts[parent_id] = max(0, min_duts - cur_duts) 178 logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s', 179 suite_min_duts) 180 return suite_min_duts 181 182 183class BaseHostScheduler(object): 184 """Base class containing host acquisition logic. 185 186 This class contains all the core host acquisition logic needed by the 187 scheduler to run jobs on hosts. It is only capable of releasing hosts 188 back to the rdb through its tick, any other action must be instigated by 189 the job scheduler. 190 """ 191 192 193 _timer = autotest_stats.Timer('base_host_scheduler') 194 host_assignment = collections.namedtuple('host_assignment', ['host', 'job']) 195 196 197 def __init__(self): 198 self.host_query_manager = query_managers.AFEHostQueryManager() 199 200 201 @_timer.decorate 202 def _release_hosts(self): 203 """Release hosts to the RDB. 204 205 Release all hosts that are ready and are currently not being used by an 206 active hqe, and don't have a new special task scheduled against them. 207 208 @return a list of hosts that are released. 209 """ 210 release_hosts = self.host_query_manager.find_unused_healty_hosts() 211 release_hostnames = [host.hostname for host in release_hosts] 212 if release_hostnames: 213 self.host_query_manager.set_leased( 214 False, hostname__in=release_hostnames) 215 return release_hosts 216 217 218 @classmethod 219 def schedule_host_job(cls, host, queue_entry): 220 """Schedule a job on a host. 221 222 Scheduling a job involves: 223 1. Setting the active bit on the queue_entry. 224 2. Scheduling a special task on behalf of the queue_entry. 225 Performing these actions will lead the job scheduler through a chain of 226 events, culminating in running the test and collecting results from 227 the host. 228 229 @param host: The host against which to schedule the job. 230 @param queue_entry: The queue_entry to schedule. 231 """ 232 if queue_entry.host_id is None: 233 queue_entry.set_host(host) 234 elif host.id != queue_entry.host_id: 235 raise rdb_utils.RDBException('The rdb returned host: %s ' 236 'but the job:%s was already assigned a host: %s. ' % 237 (host.hostname, queue_entry.job_id, 238 queue_entry.host.hostname)) 239 queue_entry.update_field('active', True) 240 241 # TODO: crbug.com/373936. The host scheduler should only be assigning 242 # jobs to hosts, but the criterion we use to release hosts depends 243 # on it not being used by an active hqe. Since we're activating the 244 # hqe here, we also need to schedule its first prejob task. OTOH, 245 # we could converge to having the host scheduler manager all special 246 # tasks, since their only use today is to verify/cleanup/reset a host. 247 logging.info('Scheduling pre job tasks for entry: %s', queue_entry) 248 queue_entry.schedule_pre_job_tasks() 249 250 251 def acquire_hosts(self, host_jobs): 252 """Accquire hosts for given jobs. 253 254 This method sends jobs that need hosts to rdb. 255 Child class can override this method to pipe more args 256 to rdb. 257 258 @param host_jobs: A list of queue entries that either require hosts, 259 or require host assignment validation through the rdb. 260 261 @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper 262 for each host acquired on behalf of a queue_entry, 263 or None if a host wasn't found. 264 """ 265 return rdb_lib.acquire_hosts(host_jobs) 266 267 268 def find_hosts_for_jobs(self, host_jobs): 269 """Find and verify hosts for a list of jobs. 270 271 @param host_jobs: A list of queue entries that either require hosts, 272 or require host assignment validation through the rdb. 273 @return: A list of tuples of the form (host, queue_entry) for each 274 valid host-queue_entry assignment. 275 """ 276 jobs_with_hosts = [] 277 hosts = self.acquire_hosts(host_jobs) 278 for host, job in zip(hosts, host_jobs): 279 if host: 280 jobs_with_hosts.append(self.host_assignment(host, job)) 281 return jobs_with_hosts 282 283 284 @_timer.decorate 285 def tick(self): 286 """Schedule core host management activities.""" 287 self._release_hosts() 288 289 290class HostScheduler(BaseHostScheduler): 291 """A scheduler capable managing host acquisition for new jobs.""" 292 293 _timer = autotest_stats.Timer('host_scheduler') 294 295 296 def __init__(self): 297 super(HostScheduler, self).__init__() 298 self.job_query_manager = query_managers.AFEJobQueryManager() 299 # Keeping track on how many hosts each suite is holding 300 # {suite_job_id: num_hosts} 301 self._suite_recorder = SuiteRecorder(self.job_query_manager) 302 303 304 def _record_host_assignment(self, host, queue_entry): 305 """Record that |host| is assigned to |queue_entry|. 306 307 Record: 308 1. How long it takes to assign a host to a job in metadata db. 309 2. Record host assignment of a suite. 310 311 @param host: A Host object. 312 @param queue_entry: A HostQueueEntry object. 313 """ 314 secs_in_queued = (datetime.datetime.now() - 315 queue_entry.job.created_on).total_seconds() 316 job_overhead.record_state_duration( 317 queue_entry.job_id, host.hostname, 318 job_overhead.STATUS.QUEUED, secs_in_queued) 319 self._suite_recorder.record_assignment(queue_entry) 320 321 322 @_timer.decorate 323 def _schedule_jobs(self): 324 """Schedule new jobs against hosts.""" 325 326 key = 'host_scheduler.jobs_per_tick' 327 new_jobs_with_hosts = 0 328 queue_entries = self.job_query_manager.get_pending_queue_entries( 329 only_hostless=False) 330 unverified_host_jobs = [job for job in queue_entries 331 if not job.is_hostless()] 332 if not unverified_host_jobs: 333 return 334 for acquisition in self.find_hosts_for_jobs(unverified_host_jobs): 335 self.schedule_host_job(acquisition.host, acquisition.job) 336 self._record_host_assignment(acquisition.host, acquisition.job) 337 new_jobs_with_hosts += 1 338 autotest_stats.Gauge(key).send('new_jobs_with_hosts', 339 new_jobs_with_hosts) 340 autotest_stats.Gauge(key).send('new_jobs_without_hosts', 341 len(unverified_host_jobs) - 342 new_jobs_with_hosts) 343 344 345 @_timer.decorate 346 def _lease_hosts_of_frontend_tasks(self): 347 """Lease hosts of tasks scheduled through the frontend.""" 348 # We really don't need to get all the special tasks here, just the ones 349 # without hqes, but reusing the method used by the scheduler ensures 350 # we prioritize the same way. 351 lease_hostnames = [ 352 task.host.hostname for task in 353 self.job_query_manager.get_prioritized_special_tasks( 354 only_tasks_with_leased_hosts=False) 355 if task.queue_entry_id is None and not task.host.leased] 356 # Leasing a leased hosts here shouldn't be a problem: 357 # 1. The only way a host can be leased is if it's been assigned to 358 # an active hqe or another similar frontend task, but doing so will 359 # have already precluded it from the list of tasks returned by the 360 # job_query_manager. 361 # 2. The unleasing is done based on global conditions. Eg: Even if a 362 # task has already leased a host and we lease it again, the 363 # host scheduler won't release the host till both tasks are complete. 364 if lease_hostnames: 365 self.host_query_manager.set_leased( 366 True, hostname__in=lease_hostnames) 367 368 369 def acquire_hosts(self, host_jobs): 370 """Override acquire_hosts. 371 372 This method overrides the method in parent class. 373 It figures out a set of suites that |host_jobs| belong to; 374 and get min_duts requirement for each suite. 375 It pipes min_duts for each suite to rdb. 376 377 """ 378 parent_job_ids = set([q.job.parent_job_id 379 for q in host_jobs if q.job.parent_job_id]) 380 suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids) 381 return rdb_lib.acquire_hosts(host_jobs, suite_min_duts) 382 383 384 @_timer.decorate 385 def tick(self): 386 logging.info('Calling new tick.') 387 logging.info('Leasing hosts for frontend tasks.') 388 self._lease_hosts_of_frontend_tasks() 389 logging.info('Finding hosts for new jobs.') 390 self._schedule_jobs() 391 logging.info('Releasing unused hosts.') 392 released_hosts = self._release_hosts() 393 logging.info('Updating suite assignment with released hosts') 394 self._suite_recorder.record_release(released_hosts) 395 logging.info('Calling email_manager.') 396 email_manager.manager.send_queued_emails() 397 398 399class DummyHostScheduler(BaseHostScheduler): 400 """A dummy host scheduler that doesn't acquire or release hosts.""" 401 402 def __init__(self): 403 pass 404 405 406 def tick(self): 407 pass 408 409 410def handle_signal(signum, frame): 411 """Sigint handler so we don't crash mid-tick.""" 412 global _shutdown 413 _shutdown = True 414 logging.info("Shutdown request received.") 415 416 417def initialize(testing=False): 418 """Initialize the host scheduler.""" 419 if testing: 420 # Don't import testing utilities unless we're in testing mode, 421 # as the database imports have side effects. 422 from autotest_lib.scheduler import rdb_testing_utils 423 rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing( 424 db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE) 425 global _db_manager 426 _db_manager = scheduler_lib.ConnectionManager() 427 scheduler_lib.setup_logging( 428 os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None), 429 None, timestamped_logfile_prefix='host_scheduler') 430 logging.info("Setting signal handler") 431 signal.signal(signal.SIGINT, handle_signal) 432 signal.signal(signal.SIGTERM, handle_signal) 433 scheduler_models.initialize() 434 435 436def parse_arguments(argv): 437 """ 438 Parse command line arguments 439 440 @param argv: argument list to parse 441 @returns: parsed arguments. 442 """ 443 parser = argparse.ArgumentParser(description='Host scheduler.') 444 parser.add_argument('--testing', action='store_true', default=False, 445 help='Start the host scheduler in testing mode.') 446 parser.add_argument('--production', 447 help=('Indicate that scheduler is running in production' 448 ' environment and it can use database that is not' 449 ' hosted in localhost. If it is set to False, ' 450 'scheduler will fail if database is not in ' 451 'localhost.'), 452 action='store_true', default=False) 453 options = parser.parse_args(argv) 454 455 return options 456 457 458def main(): 459 if _monitor_db_host_acquisition: 460 logging.info('Please set inline_host_acquisition=False in the shadow ' 461 'config before starting the host scheduler.') 462 # The upstart job for the host scheduler understands exit(0) to mean 463 # 'don't respawn'. This is desirable when the job scheduler is acquiring 464 # hosts inline. 465 sys.exit(0) 466 try: 467 options = parse_arguments(sys.argv[1:]) 468 scheduler_lib.check_production_settings(options) 469 470 # If server database is enabled, check if the server has role 471 # `host_scheduler`. If the server does not have host_scheduler role, 472 # exception will be raised and host scheduler will not continue to run. 473 if server_manager_utils.use_server_db(): 474 server_manager_utils.confirm_server_has_role(hostname='localhost', 475 role='host_scheduler') 476 477 initialize(options.testing) 478 479 # Start the thread to report metadata. 480 metadata_reporter.start() 481 482 host_scheduler = HostScheduler() 483 minimum_tick_sec = global_config.global_config.get_config_value( 484 'SCHEDULER', 'minimum_tick_sec', type=float) 485 while not _shutdown: 486 start = time.time() 487 host_scheduler.tick() 488 curr_tick_sec = time.time() - start 489 if (minimum_tick_sec > curr_tick_sec): 490 time.sleep(minimum_tick_sec - curr_tick_sec) 491 else: 492 time.sleep(0.0001) 493 except Exception: 494 email_manager.manager.log_stacktrace( 495 'Uncaught exception; terminating host_scheduler.') 496 raise 497 finally: 498 email_manager.manager.send_queued_emails() 499 if _db_manager: 500 _db_manager.disconnect() 501 metadata_reporter.abort() 502 503 504if __name__ == '__main__': 505 main() 506