#!/usr/bin/python #pylint: disable-msg=C0111 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Host scheduler. If run as a standalone service, the host scheduler ensures the following: 1. Hosts will not be assigned to multiple hqes simultaneously. The process of assignment in this case refers to the modification of the host_id column of a row in the afe_host_queue_entries table, to reflect the host id of a leased host matching the dependencies of the job. 2. Hosts that are not being used by active hqes or incomplete special tasks will be released back to the available hosts pool, for acquisition by subsequent hqes. In addition to these guarantees, the host scheduler also confirms that no 2 active hqes/special tasks are assigned the same host, and sets the leased bit for hosts needed by frontend special tasks. The need for the latter is only apparent when viewed in the context of the job-scheduler (monitor_db), which runs special tasks only after their hosts have been leased. ** Suport minimum duts requirement for suites (non-inline mode) ** Each suite can specify the minimum number of duts it requires by dropping a 'suite_min_duts' job keyval which defaults to 0. When suites are competing for duts, if any suite has not got minimum duts it requires, the host scheduler will try to meet the requirement first, even if other suite may have higher priority or earlier timestamp. Once all suites' minimum duts requirement have been fullfilled, the host scheduler will allocate the rest of duts based on job priority and suite job id. This is to prevent low priority suites from starving when sharing pool with high-priority suites. Note: 1. Prevent potential starvation: We need to carefully choose |suite_min_duts| for both low and high priority suites. If a high priority suite didn't specify it but a low priority one does, the high priority suite can be starved! 2. Restart requirement: Restart host scheduler if you manually released a host by setting leased=0 in db. This is needed because host scheduler maintains internal state of host assignment for suites. 3. Exchanging duts triggers provisioning: TODO(fdeng): There is a chance two suites can exchange duts, if the two suites are for different builds, the exchange will trigger provisioning. This can be optimized by preferring getting hosts with the same build. """ import argparse import collections import datetime import logging import os import signal import sys import time import common from autotest_lib.client.common_lib import utils from autotest_lib.frontend import setup_django_environment # This import needs to come earlier to avoid using autotest's version of # httplib2, which is out of date. try: from chromite.lib import metrics from chromite.lib import ts_mon_config except ImportError: metrics = utils.metrics_mock ts_mon_config = utils.metrics_mock from autotest_lib.client.common_lib import global_config from autotest_lib.scheduler import email_manager from autotest_lib.scheduler import query_managers from autotest_lib.scheduler import rdb_lib from autotest_lib.scheduler import rdb_utils from autotest_lib.scheduler import scheduler_lib from autotest_lib.scheduler import scheduler_models from autotest_lib.site_utils import server_manager_utils _db_manager = None _shutdown = False _tick_pause_sec = global_config.global_config.get_config_value( 'SCHEDULER', 'tick_pause_sec', type=int, default=5) _monitor_db_host_acquisition = global_config.global_config.get_config_value( 'SCHEDULER', 'inline_host_acquisition', type=bool, default=True) _METRICS_PREFIX = 'chromeos/autotest/host_scheduler' class SuiteRecorder(object): """Recording the host assignment for suites. The recorder holds two things: * suite_host_num, records how many duts a suite is holding, which is a map num_of_hosts> * hosts_to_suites, records which host is assigned to which suite, it is a map suite_job_id> The two datastructure got updated when a host is assigned to or released by a job. The reason to maintain hosts_to_suites is that, when a host is released, we need to know which suite it was leased to. Querying the db for the latest completed job that has run on a host is slow. Therefore, we go with an alternative: keeping a map in memory (for 10K hosts, the map should take less than 1M memory on 64-bit machine with python 2.7) """ def __init__(self, job_query_manager): """Initialize. @param job_queue_manager: A JobQueueryManager object. """ self.job_query_manager = job_query_manager self.suite_host_num, self.hosts_to_suites = ( self.job_query_manager.get_suite_host_assignment()) def record_assignment(self, queue_entry): """Record that the hqe has got a host. @param queue_entry: A scheduler_models.HostQueueEntry object which has got a host. """ parent_id = queue_entry.job.parent_job_id if not parent_id: return if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id: logging.error('HQE (id: %d, parent_job_id: %d, host: %s) ' 'seems already recorded', queue_entry.id, parent_id, queue_entry.host.hostname) return num_hosts = self.suite_host_num.get(parent_id, 0) self.suite_host_num[parent_id] = num_hosts + 1 self.hosts_to_suites[queue_entry.host_id] = parent_id logging.debug('Suite %d got host %s, currently holding %d hosts', parent_id, queue_entry.host.hostname, self.suite_host_num[parent_id]) def record_release(self, hosts): """Update the record with host releasing event. @param hosts: A list of scheduler_models.Host objects. """ for host in hosts: if host.id in self.hosts_to_suites: parent_job_id = self.hosts_to_suites.pop(host.id) count = self.suite_host_num[parent_job_id] - 1 if count == 0: del self.suite_host_num[parent_job_id] else: self.suite_host_num[parent_job_id] = count logging.debug( 'Suite %d releases host %s, currently holding %d hosts', parent_job_id, host.hostname, count) def get_min_duts(self, suite_job_ids): """Figure out min duts to request. Given a set ids of suite jobs, figure out minimum duts to request for each suite. It is determined by two factors: min_duts specified for each suite in its job keyvals, and how many duts a suite is currently holding. @param suite_job_ids: A set of suite job ids. @returns: A dictionary, the key is suite_job_id, the value is the minimum number of duts to request. """ suite_min_duts = self.job_query_manager.get_min_duts_of_suites( suite_job_ids) for parent_id in suite_job_ids: min_duts = suite_min_duts.get(parent_id, 0) cur_duts = self.suite_host_num.get(parent_id, 0) suite_min_duts[parent_id] = max(0, min_duts - cur_duts) logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s', suite_min_duts) return suite_min_duts class BaseHostScheduler(object): """Base class containing host acquisition logic. This class contains all the core host acquisition logic needed by the scheduler to run jobs on hosts. It is only capable of releasing hosts back to the rdb through its tick, any other action must be instigated by the job scheduler. """ host_assignment = collections.namedtuple('host_assignment', ['host', 'job']) def __init__(self): self.host_query_manager = query_managers.AFEHostQueryManager() def _release_hosts(self): """Release hosts to the RDB. Release all hosts that are ready and are currently not being used by an active hqe, and don't have a new special task scheduled against them. @return a list of hosts that are released. """ release_hosts = self.host_query_manager.find_unused_healty_hosts() release_hostnames = [host.hostname for host in release_hosts] if release_hostnames: self.host_query_manager.set_leased( False, hostname__in=release_hostnames) return release_hosts @classmethod def schedule_host_job(cls, host, queue_entry): """Schedule a job on a host. Scheduling a job involves: 1. Setting the active bit on the queue_entry. 2. Scheduling a special task on behalf of the queue_entry. Performing these actions will lead the job scheduler through a chain of events, culminating in running the test and collecting results from the host. @param host: The host against which to schedule the job. @param queue_entry: The queue_entry to schedule. """ if queue_entry.host_id is None: queue_entry.set_host(host) elif host.id != queue_entry.host_id: raise rdb_utils.RDBException('The rdb returned host: %s ' 'but the job:%s was already assigned a host: %s. ' % (host.hostname, queue_entry.job_id, queue_entry.host.hostname)) queue_entry.update_field('active', True) # TODO: crbug.com/373936. The host scheduler should only be assigning # jobs to hosts, but the criterion we use to release hosts depends # on it not being used by an active hqe. Since we're activating the # hqe here, we also need to schedule its first prejob task. OTOH, # we could converge to having the host scheduler manager all special # tasks, since their only use today is to verify/cleanup/reset a host. logging.info('Scheduling pre job tasks for entry: %s', queue_entry) queue_entry.schedule_pre_job_tasks() def acquire_hosts(self, host_jobs): """Accquire hosts for given jobs. This method sends jobs that need hosts to rdb. Child class can override this method to pipe more args to rdb. @param host_jobs: A list of queue entries that either require hosts, or require host assignment validation through the rdb. @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper for each host acquired on behalf of a queue_entry, or None if a host wasn't found. """ return rdb_lib.acquire_hosts(host_jobs) def find_hosts_for_jobs(self, host_jobs): """Find and verify hosts for a list of jobs. @param host_jobs: A list of queue entries that either require hosts, or require host assignment validation through the rdb. @return: A generator of tuples of the form (host, queue_entry) for each valid host-queue_entry assignment. """ hosts = self.acquire_hosts(host_jobs) for host, job in zip(hosts, host_jobs): if host: yield self.host_assignment(host, job) def tick(self): """Schedule core host management activities.""" self._release_hosts() class HostScheduler(BaseHostScheduler): """A scheduler capable managing host acquisition for new jobs.""" def __init__(self): super(HostScheduler, self).__init__() self.job_query_manager = query_managers.AFEJobQueryManager() # Keeping track on how many hosts each suite is holding # {suite_job_id: num_hosts} self._suite_recorder = SuiteRecorder(self.job_query_manager) def _record_host_assignment(self, host, queue_entry): """Record that |host| is assigned to |queue_entry|. Record: 1. How long it takes to assign a host to a job in metadata db. 2. Record host assignment of a suite. @param host: A Host object. @param queue_entry: A HostQueueEntry object. """ secs_in_queued = (datetime.datetime.now() - queue_entry.job.created_on).total_seconds() self._suite_recorder.record_assignment(queue_entry) @metrics.SecondsTimerDecorator( '%s/schedule_jobs_duration' % _METRICS_PREFIX) def _schedule_jobs(self): """Schedule new jobs against hosts.""" new_jobs_with_hosts = 0 queue_entries = self.job_query_manager.get_pending_queue_entries( only_hostless=False) unverified_host_jobs = [job for job in queue_entries if not job.is_hostless()] if unverified_host_jobs: for acquisition in self.find_hosts_for_jobs(unverified_host_jobs): self.schedule_host_job(acquisition.host, acquisition.job) self._record_host_assignment(acquisition.host, acquisition.job) new_jobs_with_hosts += 1 metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX ).increment_by(new_jobs_with_hosts) num_jobs_without_hosts = (len(unverified_host_jobs) - new_jobs_with_hosts) metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX ).set(num_jobs_without_hosts) metrics.Counter('%s/tick' % _METRICS_PREFIX).increment() @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX) def _lease_hosts_of_frontend_tasks(self): """Lease hosts of tasks scheduled through the frontend.""" # We really don't need to get all the special tasks here, just the ones # without hqes, but reusing the method used by the scheduler ensures # we prioritize the same way. lease_hostnames = [ task.host.hostname for task in self.job_query_manager.get_prioritized_special_tasks( only_tasks_with_leased_hosts=False) if task.queue_entry_id is None and not task.host.leased] # Leasing a leased hosts here shouldn't be a problem: # 1. The only way a host can be leased is if it's been assigned to # an active hqe or another similar frontend task, but doing so will # have already precluded it from the list of tasks returned by the # job_query_manager. # 2. The unleasing is done based on global conditions. Eg: Even if a # task has already leased a host and we lease it again, the # host scheduler won't release the host till both tasks are complete. if lease_hostnames: self.host_query_manager.set_leased( True, hostname__in=lease_hostnames) def acquire_hosts(self, host_jobs): """Override acquire_hosts. This method overrides the method in parent class. It figures out a set of suites that |host_jobs| belong to; and get min_duts requirement for each suite. It pipes min_duts for each suite to rdb. """ parent_job_ids = set([q.job.parent_job_id for q in host_jobs if q.job.parent_job_id]) suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids) return rdb_lib.acquire_hosts(host_jobs, suite_min_duts) @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX) def tick(self): logging.info('Calling new tick.') logging.info('Leasing hosts for frontend tasks.') self._lease_hosts_of_frontend_tasks() logging.info('Finding hosts for new jobs.') self._schedule_jobs() logging.info('Releasing unused hosts.') released_hosts = self._release_hosts() logging.info('Updating suite assignment with released hosts') self._suite_recorder.record_release(released_hosts) logging.info('Calling email_manager.') email_manager.manager.send_queued_emails() class DummyHostScheduler(BaseHostScheduler): """A dummy host scheduler that doesn't acquire or release hosts.""" def __init__(self): pass def tick(self): pass def handle_signal(signum, frame): """Sigint handler so we don't crash mid-tick.""" global _shutdown _shutdown = True logging.info("Shutdown request received.") def initialize(testing=False): """Initialize the host scheduler.""" if testing: # Don't import testing utilities unless we're in testing mode, # as the database imports have side effects. from autotest_lib.scheduler import rdb_testing_utils rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing( db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE) global _db_manager _db_manager = scheduler_lib.ConnectionManager() scheduler_lib.setup_logging( os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None), None, timestamped_logfile_prefix='host_scheduler') logging.info("Setting signal handler") signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) scheduler_models.initialize() def parse_arguments(argv): """ Parse command line arguments @param argv: argument list to parse @returns: parsed arguments. """ parser = argparse.ArgumentParser(description='Host scheduler.') parser.add_argument('--testing', action='store_true', default=False, help='Start the host scheduler in testing mode.') parser.add_argument('--production', help=('Indicate that scheduler is running in production' ' environment and it can use database that is not' ' hosted in localhost. If it is set to False, ' 'scheduler will fail if database is not in ' 'localhost.'), action='store_true', default=False) parser.add_argument( '--lifetime-hours', type=float, default=None, help='If provided, number of hours the scheduler should run for. ' 'At the expiry of this time, the process will exit ' 'gracefully.', ) parser.add_argument( '--metrics-file', help='If provided, drop metrics to this local file instead of ' 'reporting to ts_mon', type=str, default=None, ) options = parser.parse_args(argv) return options def main(): if _monitor_db_host_acquisition: logging.info('Please set inline_host_acquisition=False in the shadow ' 'config before starting the host scheduler.') sys.exit(0) try: options = parse_arguments(sys.argv[1:]) scheduler_lib.check_production_settings(options) # If server database is enabled, check if the server has role # `host_scheduler`. If the server does not have host_scheduler role, # exception will be raised and host scheduler will not continue to run. if server_manager_utils.use_server_db(): server_manager_utils.confirm_server_has_role(hostname='localhost', role='host_scheduler') initialize(options.testing) with ts_mon_config.SetupTsMonGlobalState( 'autotest_host_scheduler', indirect=True, debug_file=options.metrics_file, ): metrics.Counter('%s/start' % _METRICS_PREFIX).increment() process_start_time = time.time() host_scheduler = HostScheduler() minimum_tick_sec = global_config.global_config.get_config_value( 'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float) while not _shutdown: if _lifetime_expired(options.lifetime_hours, process_start_time): break start = time.time() host_scheduler.tick() curr_tick_sec = time.time() - start if (minimum_tick_sec > curr_tick_sec): time.sleep(minimum_tick_sec - curr_tick_sec) else: time.sleep(0.0001) logging.info('Shutdown request recieved. Bye! Bye!') except server_manager_utils.ServerActionError: # This error is expected when the server is not in primary status # for host-scheduler role. Thus do not send email for it. raise except Exception: metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment() raise finally: email_manager.manager.send_queued_emails() if _db_manager: _db_manager.disconnect() def _lifetime_expired(lifetime_hours, process_start_time): """Returns True if we've expired the process lifetime, False otherwise. Also sets the global _shutdown so that any background processes also take the cue to exit. """ if lifetime_hours is None: return False if time.time() - process_start_time > lifetime_hours * 3600: logging.info('Process lifetime %0.3f hours exceeded. Shutting down.', lifetime_hours) global _shutdown _shutdown = True return True return False if __name__ == '__main__': main()