1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Host scheduler.
9
10If run as a standalone service, the host scheduler ensures the following:
11    1. Hosts will not be assigned to multiple hqes simultaneously. The process
12       of assignment in this case refers to the modification of the host_id
13       column of a row in the afe_host_queue_entries table, to reflect the host
14       id of a leased host matching the dependencies of the job.
15    2. Hosts that are not being used by active hqes or incomplete special tasks
16       will be released back to the available hosts pool, for acquisition by
17       subsequent hqes.
18In addition to these guarantees, the host scheduler also confirms that no 2
19active hqes/special tasks are assigned the same host, and sets the leased bit
20for hosts needed by frontend special tasks. The need for the latter is only
21apparent when viewed in the context of the job-scheduler (monitor_db), which
22runs special tasks only after their hosts have been leased.
23
** Support minimum duts requirement for suites (non-inline mode) **
25
26Each suite can specify the minimum number of duts it requires by
27dropping a 'suite_min_duts' job keyval which defaults to 0.
28
29When suites are competing for duts, if any suite has not got minimum duts
30it requires, the host scheduler will try to meet the requirement first,
31even if other suite may have higher priority or earlier timestamp. Once
all suites' minimum duts requirements have been fulfilled, the host
33scheduler will allocate the rest of duts based on job priority and suite job id.
34This is to prevent low priority suites from starving when sharing pool with
35high-priority suites.
36
37Note:
38    1. Prevent potential starvation:
39       We need to carefully choose |suite_min_duts| for both low and high
40       priority suites. If a high priority suite didn't specify it but a low
41       priority one does, the high priority suite can be starved!
42    2. Restart requirement:
43       Restart host scheduler if you manually released a host by setting
44       leased=0 in db. This is needed because host scheduler maintains internal
45       state of host assignment for suites.
46    3. Exchanging duts triggers provisioning:
47       TODO(fdeng): There is a chance two suites can exchange duts,
48       if the two suites are for different builds, the exchange
49       will trigger provisioning. This can be optimized by preferring getting
50       hosts with the same build.
51"""
52
53import argparse
54import collections
55import datetime
56import logging
57import os
58import signal
59import sys
60import time
61
62import common
63from autotest_lib.client.common_lib import utils
64from autotest_lib.frontend import setup_django_environment
65
66# This import needs to come earlier to avoid using autotest's version of
67# httplib2, which is out of date.
68try:
69    from chromite.lib import metrics
70    from chromite.lib import ts_mon_config
71except ImportError:
72    metrics = utils.metrics_mock
73    ts_mon_config = utils.metrics_mock
74
75from autotest_lib.client.common_lib import global_config
76from autotest_lib.scheduler import email_manager
77from autotest_lib.scheduler import query_managers
78from autotest_lib.scheduler import rdb_lib
79from autotest_lib.scheduler import rdb_utils
80from autotest_lib.scheduler import scheduler_lib
81from autotest_lib.scheduler import scheduler_models
82from autotest_lib.site_utils import server_manager_utils
83
84
# Connection manager for the scheduler database. Created by initialize() and
# disconnected when main() exits.
_db_manager = None
# Set to True by handle_signal() and _lifetime_expired(); main() checks it
# before every tick to decide whether to keep looping.
_shutdown = False
# SCHEDULER/tick_pause_sec from the global config (int, defaults to 5).
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
# SCHEDULER/inline_host_acquisition from the global config (bool, defaults to
# True). main() exits right away when this is True.
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
# Common prefix of every metric name reported from this module.
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
92
class SuiteRecorder(object):
    """Recording the host assignment for suites.

    The recorder holds two things:
        * suite_host_num, records how many duts a suite is holding,
          which is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, records which host is assigned to which
          suite, it is a map <host_id -> suite_job_id>
    Both maps are updated together whenever a host is assigned to or
    released by a job, so that every host in hosts_to_suites is counted
    exactly once in suite_host_num.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M memory on
    64-bit machine with python 2.7)

    """


    def __init__(self, job_query_manager):
        """Initialize both maps from the job query manager.

        @param job_query_manager: Object providing
                get_suite_host_assignment() and get_min_duts_of_suites().
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def _forget_host(self, host_id):
        """Drop |host_id| from both maps.

        The caller must make sure |host_id| is in hosts_to_suites.

        @param host_id: Id of a host currently present in hosts_to_suites.

        @returns: A tuple (suite_job_id, remaining) where remaining is the
                  number of hosts the suite still holds after the removal.
        """
        parent_job_id = self.hosts_to_suites.pop(host_id)
        # Tolerate a missing or already-zero counter instead of raising:
        # the two maps may have drifted apart.
        remaining = self.suite_host_num.get(parent_job_id, 0) - 1
        if remaining > 0:
            self.suite_host_num[parent_job_id] = remaining
        else:
            self.suite_host_num.pop(parent_job_id, None)
            remaining = 0
        return parent_job_id, remaining


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        Entries whose job has no parent job are ignored. An entry whose host
        is already recorded for the same suite is logged as an error and
        otherwise ignored.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        host_id = queue_entry.host_id
        if self.hosts_to_suites.get(host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        if host_id in self.hosts_to_suites:
            # The host is still recorded for another suite. Overwriting the
            # map entry alone would leave that suite's counter too high for
            # good, because record_release() only decrements the suite the
            # host currently maps to.
            stale_parent_id, _ = self._forget_host(host_id)
            logging.warning('Host %s was still recorded for suite %s when '
                            'assigned to suite %s; dropping the stale record',
                            queue_entry.host.hostname, stale_parent_id,
                            parent_id)
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with host releasing event.

        Hosts that are not present in hosts_to_suites are skipped.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id not in self.hosts_to_suites:
                continue
            parent_job_id, count = self._forget_host(host.id)
            logging.debug(
                    'Suite %d releases host %s, currently holding %d hosts',
                    parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out min duts to request.

        Given a set ids of suite jobs, figure out minimum duts to request for
        each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary, the key is suite_job_id, the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            # Never request a negative number of duts when the suite already
            # holds more than its minimum.
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts
186
187
class BaseHostScheduler(object):
    """Host acquisition logic shared by the host schedulers.

    The class bundles the core pieces the scheduler needs to run jobs on
    hosts. On its own tick it only releases hosts back to the rdb; every
    other action has to be instigated by the job scheduler.
    """


    host_assignment = collections.namedtuple(
            'host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Hand unused hosts back to the RDB.

        Clears the leased bit of every host reported by the host query
        manager as unused and healthy.

        @return a list of hosts that are released.
        """
        unused_hosts = self.host_query_manager.find_unused_healty_hosts()
        hostnames = [unused.hostname for unused in unused_hosts]
        if not hostnames:
            # Nothing to unlease, so skip the update entirely.
            return unused_hosts
        self.host_query_manager.set_leased(False, hostname__in=hostnames)
        return unused_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a host
            and it is not |host|.
        """
        assigned_host_id = queue_entry.host_id
        if assigned_host_id is None:
            queue_entry.set_host(host)
        elif host.id != assigned_host_id:
            message = ('The rdb returned host: %s '
                       'but the job:%s was already assigned a host: %s. ' %
                       (host.hostname, queue_entry.job_id,
                        queue_entry.host.hostname))
            raise rdb_utils.RDBException(message)
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manager all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        Forwards the jobs that need hosts to the rdb. Subclasses may override
        this method to pipe more arguments to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of host_assignment tuples (host, job), one for
            each queue entry that was given a host.
        """
        acquired = self.acquire_hosts(host_jobs)
        for candidate, entry in zip(acquired, host_jobs):
            if not candidate:
                # No host could be found for this entry.
                continue
            yield self.host_assignment(candidate, entry)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()
288
289
class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeps track of how many hosts each suite is holding and of which
        # suite every host is assigned to.
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Only the suite bookkeeping is updated. The time-in-queue value that
        used to be computed here was never reported anywhere, so it is no
        longer computed.

        @param host: A Host object. Unused; kept so existing callers keep
                     working.
        @param queue_entry: A HostQueueEntry object.
        """
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts.

        Fetches the pending queue entries, asks for a host for every entry
        that is not hostless, schedules each entry that got one and records
        the assignment. Counters and gauges about the outcome are reported
        under _METRICS_PREFIX.
        """

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        # Entries that asked for a host in this pass but did not get one.
        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased hosts here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each suite and pipes it to the rdb
        together with the jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: Whatever rdb_lib.acquire_hosts returns for |host_jobs| and
            the per-suite minimum duts.
        """
        parent_job_ids = {q.job.parent_job_id
                          for q in host_jobs if q.job.parent_job_id}
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        """Run one scheduling pass.

        In order: lease hosts for frontend tasks, schedule new jobs, release
        unused hosts, update the suite records with the released hosts and
        send queued emails.
        """
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
394
395
class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler whose constructor and tick do nothing."""

    def __init__(self):
        """Do nothing.

        BaseHostScheduler.__init__ is not called, so no host query manager
        is created for this object.
        """
        return


    def tick(self):
        """Do nothing; no hosts are released on tick."""
        return
405
406
def handle_signal(signum, frame):
    """Signal handler that requests a shutdown so we don't crash mid-tick.

    @param signum: Number of the received signal. Unused.
    @param frame: Stack frame active when the signal arrived. Unused.
    """
    global _shutdown
    # Only raise the flag here; main() checks it between ticks.
    _shutdown = True
    logging.info('Shutdown request received.')
412
413
def initialize(testing=False):
    """Initialize the host scheduler.

    Creates the module-level connection manager, sets up logging, installs
    handle_signal for SIGINT and SIGTERM and initializes scheduler_models.

    @param testing: If True, initialize the file database from
                    rdb_testing_utils before anything else.
    """
    global _db_manager
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        helper_cls = rdb_testing_utils.FileDatabaseHelper
        helper_cls().initialize_database_for_testing(
                db_file_path=helper_cls.DB_FILE)
    _db_manager = scheduler_lib.ConnectionManager()
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    scheduler_lib.setup_logging(
            log_dir, None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)
    scheduler_models.initialize()
431
432
def parse_arguments(argv):
    """
    Parse the host scheduler's command line.

    @param argv: argument list to parse
    @returns:    parsed arguments, with attributes testing, production,
                 lifetime_hours and metrics_file.
    """
    # (flag, add_argument keyword arguments), in the order they are registered.
    option_table = (
        ('--testing', {
            'action': 'store_true',
            'default': False,
            'help': 'Start the host scheduler in testing mode.',
        }),
        ('--production', {
            'action': 'store_true',
            'default': False,
            'help': ('Indicate that scheduler is running in production'
                     ' environment and it can use database that is not'
                     ' hosted in localhost. If it is set to False, '
                     'scheduler will fail if database is not in '
                     'localhost.'),
        }),
        ('--lifetime-hours', {
            'type': float,
            'default': None,
            'help': ('If provided, number of hours the scheduler should run '
                     'for. At the expiry of this time, the process will exit '
                     'gracefully.'),
        }),
        ('--metrics-file', {
            'type': str,
            'default': None,
            'help': ('If provided, drop metrics to this local file instead of '
                     'reporting to ts_mon'),
        }),
    )
    parser = argparse.ArgumentParser(description='Host scheduler.')
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    return parser.parse_args(argv)
468
469
def main():
    """Run the host scheduler until a shutdown is requested.

    Exits immediately with status 0 when inline host acquisition is enabled
    in the config. Otherwise parses the command line, checks the production
    settings and (when the server database is in use) the host_scheduler
    role, initializes the process and ticks a HostScheduler in a loop until
    _shutdown is set or the requested lifetime expires.

    Queued emails are sent and the database connection is closed on the way
    out, whether or not an exception occurred.

    @raises server_manager_utils.ServerActionError: Re-raised untouched when
        the server role check fails.
    @raises Exception: Any other exception is re-raised after the
        uncaught_exception counter has been incremented.
    """
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If server database is enabled, check if the server has role
        # `host_scheduler`. If the server does not have host_scheduler role,
        # exception will be raised and host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                # Pad short ticks up to the configured minimum; after a long
                # tick still sleep for a very short time.
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request recieved. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for host-scheduler role. Thus do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        # Nested so that a failure while sending emails cannot skip the
        # disconnect and leave the database connection open.
        try:
            email_manager.manager.send_queued_emails()
        finally:
            if _db_manager:
                _db_manager.disconnect()
521
522
def _lifetime_expired(lifetime_hours, process_start_time):
    """Tell whether the process has outlived its allowed lifetime.

    When the lifetime is exceeded the global _shutdown is set as well, so
    that any background processes also take the cue to exit.

    @param lifetime_hours: Allowed lifetime in hours, or None for no limit.
    @param process_start_time: Start of the process, as returned by
                               time.time().

    @returns: True if the lifetime is exceeded, False otherwise.
    """
    global _shutdown
    if lifetime_hours is None:
        return False
    elapsed_sec = time.time() - process_start_time
    allowed_sec = lifetime_hours * 3600
    if not elapsed_sec > allowed_sec:
        return False
    logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                 lifetime_hours)
    _shutdown = True
    return True
538
539
# Start the host scheduler only when this file is executed as a script.
if __name__ == '__main__':
    main()
542