1#!/usr/bin/python
2
3#pylint: disable=C0111
4
5"""
6Autotest scheduler
7"""
8
9import datetime
10import functools
11import logging
12import optparse
13import os
14import signal
15import sys
16import time
17
18import common
19from autotest_lib.frontend import setup_django_environment
20
21import django.db
22
23from autotest_lib.client.common_lib import control_data
24from autotest_lib.client.common_lib import global_config
25from autotest_lib.client.common_lib import utils
26from autotest_lib.frontend.afe import models
27from autotest_lib.scheduler import agent_task, drone_manager
28from autotest_lib.scheduler import email_manager, host_scheduler
29from autotest_lib.scheduler import luciferlib
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
31from autotest_lib.scheduler import postjob_task
32from autotest_lib.scheduler import query_managers
33from autotest_lib.scheduler import scheduler_lib
34from autotest_lib.scheduler import scheduler_models
35from autotest_lib.scheduler import scheduler_config
36from autotest_lib.server import autoserv_utils
37from autotest_lib.server import system_utils
38from autotest_lib.server import utils as server_utils
39from autotest_lib.site_utils import server_manager_utils
40
41try:
42    from chromite.lib import metrics
43    from chromite.lib import ts_mon_config
44except ImportError:
45    metrics = utils.metrics_mock
46    ts_mon_config = utils.metrics_mock
47
48
# Prefix handed to the utils pid-file helpers (write_pid, program_is_alive,
# delete_pid_file_if_exists) used elsewhere in this module.
PID_FILE_PREFIX = 'monitor_db'

# Replaced with the results_dir command line argument at start-up.
RESULTS_DIR = '.'
# Default to the parent of the directory holding this file; the AUTOTEST_DIR
# environment variable, when set, overrides it.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# dict.has_key() is deprecated (and gone in Python 3); the `in` operator is
# the equivalent membership test.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Put the server directory at the front of the import path, once.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)
61
62# error message to leave in results dir when an autoserv process disappears
63# mysteriously
64_LOST_PROCESS_ERROR = """\
65Autoserv failed abnormally during execution for this job, probably due to a
66system error on the Autotest server.  Full results may not be available.  Sorry.
67"""
68
69_db_manager = None
70_db = None
71_shutdown = False
72
73# These 2 globals are replaced for testing
74_autoserv_directory = autoserv_utils.autoserv_directory
75_autoserv_path = autoserv_utils.autoserv_path
76_testing_mode = False
77_drone_manager = None
78
79
def _verify_default_drone_set_exists():
    """Raise if drone sets are enabled but no default drone set is named.

    @raises scheduler_lib.SchedulerError: if
            models.DroneSet.drone_sets_enabled() is true while
            models.DroneSet.default_drone_set_name() is falsy.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
85
86
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    Currently the only check is _verify_default_drone_set_exists(); any
    exception it raises propagates to the caller.
    """
    _verify_default_drone_set_exists()
90
91
def main():
    """Run the scheduler, logging any escaping exception before re-raising.

    SystemExit passes through without being logged. Whatever happens, the
    scheduler's pid file is removed on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # A deliberate exit is not an error worth a traceback in the log.
        raise
    except:
        # Deliberately bare: everything else is logged and then re-raised.
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
103
104
def main_without_exception_handling():
    """Parse the command line and run the scheduler tick loop.

    Expects exactly one positional argument, the results directory; otherwise
    the usage string is printed and the function returns. Exits with status 1
    if the scheduler is disabled in the global config.

    The loop ticks the Dispatcher until the module-level _shutdown flag is set
    (by a signal or by lifetime expiry). ServerActionError and any other
    Exception raised during start-up or ticking are logged and end the loop;
    queued emails are then sent and the drone manager / database connection
    are shut down if they were ever created.
    """
    global RESULTS_DIR, _autoserv_path, _testing_mode

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    parser.add_option(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
                                             indirect=True,
                                             debug_file=options.metrics_file):
        try:
            metrics.Counter('chromeos/autotest/scheduler/start').increment()
            process_start_time = time.time()
            initialize()
            dispatcher = Dispatcher()
            dispatcher.initialize(recover_hosts=options.recover_hosts)
            minimum_tick_sec = global_config.global_config.get_config_value(
                    scheduler_config.CONFIG_SECTION, 'minimum_tick_sec',
                    type=float)

            # TODO(crbug.com/837680): Force creating the current user.
            # This is a dirty hack to work around a race; see bug.
            models.User.current_user()

            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break

                start = time.time()
                dispatcher.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    # Pad short ticks out to the configured minimum.
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
        except server_manager_utils.ServerActionError as e:
            # This error is expected when the server is not in primary status
            # for scheduler role. Thus do not send email for it.
            logging.exception(e)
        except Exception:
            logging.exception('Uncaught exception, terminating monitor_db.')
            metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
                            ).increment()

    email_manager.manager.send_queued_emails()
    # initialize() can fail (e.g. with the ServerActionError handled above)
    # before it assigns these globals; calling through None here would turn a
    # handled error into an AttributeError.
    if _drone_manager is not None:
        _drone_manager.shutdown()
    if _db_manager is not None:
        _db_manager.disconnect()
211
212
def handle_signal(signum, frame):
    """Signal handler: request a shutdown by setting the global _shutdown.

    The main loop in main_without_exception_handling() checks _shutdown
    before each tick.

    @param signum: Signal number (unused).
    @param frame: Current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
217
218
def _lifetime_expired(lifetime_hours, process_start_time):
    """Report whether the process has outlived its configured lifetime.

    On expiry the global _shutdown flag is also set, so that any background
    processes take the cue to exit as well.

    @param lifetime_hours: Allowed lifetime in hours, or None for no limit.
    @param process_start_time: Start time as returned by time.time().
    @returns True if the lifetime has been exceeded, False otherwise.
    """
    global _shutdown

    if lifetime_hours is None:
        return False

    elapsed_sec = time.time() - process_start_time
    allowed_sec = lifetime_hours * 3600
    if not elapsed_sec > allowed_sec:
        return False

    logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                 lifetime_hours)
    _shutdown = True
    return True
234
235
def initialize():
    """One-time process start-up, run before the Dispatcher is created.

    Exits with status 1 if another monitor_db is already alive; otherwise
    writes the pid file, connects to the database, installs the
    SIGINT/SIGTERM handlers and initializes the drone manager. Sets the
    module globals _db_manager, _db and (via initialize_globals)
    _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start a second scheduler against the same pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    # --test runs point the scheduler at a separate database.
    if _testing_mode:
        global_config.global_config.override_config_value(
            scheduler_lib.DB_CONFIG_SECTION, 'database',
            'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    # Prepend the server directory to PATH for this process.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
275
276
def initialize_globals():
    """Bind the module-level _drone_manager to drone_manager.instance()."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
280
281
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for a job run in the lab.

    Delegates to autoserv_utils.autoserv_run_job_command, using the
    module-level _autoserv_directory and drone_manager.WORKING_DIRECTORY as
    the results directory.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name and client -c or
            server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - Forwarded unchanged to autoserv_run_job_command.

    @returns The autoserv command line as a list of executable + parameters.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory,
            machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args,
            job=job,
            queue_entry=queue_entry,
            verbose=verbose,
            in_lab=True)
300
def _calls_log_tick_msg(func):
    """Decorator that traces methods called by Dispatcher.tick.

    Before each call, 'Starting <function name>' is passed to the instance's
    _log_tick_msg; the wrapped method's return value is passed through.

    @param func: The method to wrap; its first argument must be an object
            providing _log_tick_msg.
    @returns The wrapping function.
    """
    @functools.wraps(func)
    def traced(self, *call_args, **call_kwargs):
        message = 'Starting %s' % func.__name__
        self._log_tick_msg(message)
        return func(self, *call_args, **call_kwargs)

    return traced
309
310
311class Dispatcher(object):
312
313
314    def __init__(self):
315        self._agents = []
316        self._last_clean_time = time.time()
317        user_cleanup_time = scheduler_config.config.clean_interval_minutes
318        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
319                _db, user_cleanup_time)
320        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
321                _db, _drone_manager)
322        self._host_agents = {}
323        self._queue_entry_agents = {}
324        self._tick_count = 0
325        self._tick_debug = global_config.global_config.get_config_value(
326                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
327                default=False)
328        self._extra_debugging = global_config.global_config.get_config_value(
329                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
330                default=False)
331        self._inline_host_acquisition = (
332                global_config.global_config.get_config_value(
333                        scheduler_config.CONFIG_SECTION,
334                        'inline_host_acquisition', type=bool, default=True))
335
336        # If _inline_host_acquisition is set the scheduler will acquire and
337        # release hosts against jobs inline, with the tick. Otherwise the
338        # scheduler will only focus on jobs that already have hosts, and
339        # will not explicitly unlease a host when a job finishes using it.
340        self._job_query_manager = query_managers.AFEJobQueryManager()
341        self._host_scheduler = (host_scheduler.BaseHostScheduler()
342                                if self._inline_host_acquisition else
343                                host_scheduler.DummyHostScheduler())
344
345
346    def initialize(self, recover_hosts=True):
347        self._periodic_cleanup.initialize()
348        self._24hr_upkeep.initialize()
349        # Execute all actions queued in the cleanup tasks. Scheduler tick will
350        # run a refresh task first. If there is any action in the queue, refresh
351        # will raise an exception.
352        _drone_manager.execute_actions()
353
354        # always recover processes
355        self._recover_processes()
356
357        if recover_hosts:
358            self._recover_hosts()
359
360
    # TODO(pprabhu) Drop this metric once tick_times has been verified.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/tick_durations/tick')
    def tick(self):
        """
        Run one scheduler cycle.

        Each major step runs inside its own breakdown_timer.Step so that the
        'chromeos/autotest/scheduler/tick_times' metric shows where the tick
        time is being spent. The steps run strictly in the order written
        below; self._tick_count and the tick counter metric are incremented
        once all steps have run.
        """
        with metrics.RuntimeBreakdownTimer(
            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
            self._log_tick_msg('New tick')
            system_utils.DroneCache.refresh()

            with breakdown_timer.Step('trigger_refresh'):
                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
                _drone_manager.trigger_refresh()
            with breakdown_timer.Step('schedule_running_host_queue_entries'):
                self._schedule_running_host_queue_entries()
            with breakdown_timer.Step('schedule_special_tasks'):
                self._schedule_special_tasks()
            with breakdown_timer.Step('schedule_new_jobs'):
                self._schedule_new_jobs()
            with breakdown_timer.Step('gather_tick_metrics'):
                self._gather_tick_metrics()
            with breakdown_timer.Step('sync_refresh'):
                self._log_tick_msg('Starting _drone_manager.sync_refresh')
                _drone_manager.sync_refresh()
            # The lucifer hand-off step only runs when lucifer is enabled.
            if luciferlib.is_lucifer_enabled():
                with breakdown_timer.Step('send_to_lucifer'):
                    self._send_to_lucifer()
            # _run_cleanup must be called between drone_manager.sync_refresh,
            # and drone_manager.execute_actions, as sync_refresh will clear the
            # calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to the drone._calls, should be after
            # drone refresh is completed and before
            # drone_manager.execute_actions at the end of the tick.
            with breakdown_timer.Step('run_cleanup'):
                self._run_cleanup()
            with breakdown_timer.Step('find_aborting'):
                self._find_aborting()
            with breakdown_timer.Step('find_aborted_special_tasks'):
                self._find_aborted_special_tasks()
            with breakdown_timer.Step('handle_agents'):
                self._handle_agents()
            with breakdown_timer.Step('host_scheduler_tick'):
                self._log_tick_msg('Starting _host_scheduler.tick')
                self._host_scheduler.tick()
            with breakdown_timer.Step('drones_execute_actions'):
                self._log_tick_msg('Starting _drone_manager.execute_actions')
                _drone_manager.execute_actions()
            with breakdown_timer.Step('send_queued_emails'):
                self._log_tick_msg(
                    'Starting email_manager.manager.send_queued_emails')
                email_manager.manager.send_queued_emails()
            with breakdown_timer.Step('db_reset_queries'):
                self._log_tick_msg('Starting django.db.reset_queries')
                django.db.reset_queries()

            self._tick_count += 1
            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
422
423
424    @_calls_log_tick_msg
425    def _run_cleanup(self):
426        self._periodic_cleanup.run_cleanup_maybe()
427        self._24hr_upkeep.run_cleanup_maybe()
428
429
430    def _gather_tick_metrics(self):
431        """Gather metrics during tick, after all tasks have been scheduled."""
432        metrics.Gauge(
433            'chromeos/autotest/scheduler/agent_count'
434        ).set(len(self._agents))
435
436
437    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
438        for object_id in object_ids:
439            agent_dict.setdefault(object_id, set()).add(agent)
440
441
442    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
443        for object_id in object_ids:
444            assert object_id in agent_dict
445            agent_dict[object_id].remove(agent)
446            # If an ID has no more active agent associated, there is no need to
447            # keep it in the dictionary. Otherwise, scheduler will keep an
448            # unnecessarily big dictionary until being restarted.
449            if not agent_dict[object_id]:
450                agent_dict.pop(object_id)
451
452
453    def add_agent_task(self, agent_task):
454        """
455        Creates and adds an agent to the dispatchers list.
456
457        In creating the agent we also pass on all the queue_entry_ids and
458        host_ids from the special agent task. For every agent we create, we
459        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
460        against the host_ids given to it. So theoritically, a host can have any
461        number of agents associated with it, and each of them can have any
462        special agent task, though in practice we never see > 1 agent/task per
463        host at any time.
464
465        @param agent_task: A SpecialTask for the agent to manage.
466        """
467        if luciferlib.is_enabled_for('STARTING'):
468            # TODO(crbug.com/810141): Transition code.  After running at
469            # STARTING for a while, these tasks should no longer exist.
470            if (isinstance(agent_task, postjob_task.GatherLogsTask)
471                # TODO(crbug.com/811877): Don't skip split HQE parsing.
472                or (isinstance(agent_task, postjob_task.FinalReparseTask)
473                    and not luciferlib.is_split_job(
474                            agent_task.queue_entries[0].id))):
475                return
476            if isinstance(agent_task, AbstractQueueTask):
477                # If Lucifer already owns the job, ignore the agent.
478                if luciferlib.is_lucifer_owned_by_id(agent_task.job.id):
479                    return
480                # If the job isn't started yet, let Lucifer own it.
481                if not agent_task.started:
482                    return
483                # Otherwise, this is a STARTING job that Autotest owned
484                # before Lucifer was enabled for STARTING.  Allow the
485                # scheduler to recover the agent task normally.
486
487        agent = Agent(agent_task)
488        self._agents.append(agent)
489        agent.dispatcher = self
490        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
491        self._register_agent_for_ids(self._queue_entry_agents,
492                                     agent.queue_entry_ids, agent)
493
494
495    def get_agents_for_entry(self, queue_entry):
496        """
497        Find agents corresponding to the specified queue_entry.
498        """
499        return list(self._queue_entry_agents.get(queue_entry.id, set()))
500
501
502    def host_has_agent(self, host):
503        """
504        Determine if there is currently an Agent present using this host.
505        """
506        return bool(self._host_agents.get(host.id, None))
507
508
509    def remove_agent(self, agent):
510        self._agents.remove(agent)
511        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
512                                       agent)
513        self._unregister_agent_for_ids(self._queue_entry_agents,
514                                       agent.queue_entry_ids, agent)
515
516
517    def _host_has_scheduled_special_task(self, host):
518        return bool(models.SpecialTask.objects.filter(host__id=host.id,
519                                                      is_active=False,
520                                                      is_complete=False))
521
522
    def _recover_processes(self):
        """Run the start-up recovery sequence.

        Called unconditionally from Dispatcher.initialize(). The steps below
        run in a fixed order: agent tasks are created and their pidfiles
        registered before the drone manager refresh, and only then are the
        tasks recovered.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
535
536
537    def _create_recovery_agent_tasks(self):
538        return (self._get_queue_entry_agent_tasks()
539                + self._get_special_task_agent_tasks(is_active=True))
540
541
542    def _get_queue_entry_agent_tasks(self):
543        """
544        Get agent tasks for all hqe in the specified states.
545
546        Loosely this translates to taking a hqe in one of the specified states,
547        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
548        through _get_agent_task_for_queue_entry. Each queue entry can only have
549        one agent task at a time, but there might be multiple queue entries in
550        the group.
551
552        @return: A list of AgentTasks.
553        """
554        # host queue entry statuses handled directly by AgentTasks
555        # (Verifying is handled through SpecialTasks, so is not
556        # listed here)
557        statuses = (models.HostQueueEntry.Status.STARTING,
558                    models.HostQueueEntry.Status.RUNNING,
559                    models.HostQueueEntry.Status.GATHERING,
560                    models.HostQueueEntry.Status.PARSING)
561        status_list = ','.join("'%s'" % status for status in statuses)
562        queue_entries = scheduler_models.HostQueueEntry.fetch(
563                where='status IN (%s)' % status_list)
564
565        agent_tasks = []
566        used_queue_entries = set()
567        hqe_count_by_status = {}
568        for entry in queue_entries:
569            try:
570                hqe_count_by_status[entry.status] = (
571                    hqe_count_by_status.get(entry.status, 0) + 1)
572                if self.get_agents_for_entry(entry):
573                    # already being handled
574                    continue
575                if entry in used_queue_entries:
576                    # already picked up by a synchronous job
577                    continue
578                try:
579                    agent_task = self._get_agent_task_for_queue_entry(entry)
580                except scheduler_lib.SchedulerError:
581                    # Probably being handled by lucifer crbug.com/809773
582                    continue
583                agent_tasks.append(agent_task)
584                used_queue_entries.update(agent_task.queue_entries)
585            except scheduler_lib.MalformedRecordError as e:
586                logging.exception('Skipping agent task for a malformed hqe.')
587                # TODO(akeshet): figure out a way to safely permanently discard
588                # this errant HQE. It appears that calling entry.abort() is not
589                # sufficient, as that already makes some assumptions about
590                # record sanity that may be violated. See crbug.com/739530 for
591                # context.
592                m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
593                metrics.Counter(m).increment()
594
595        for status, count in hqe_count_by_status.iteritems():
596            metrics.Gauge(
597                'chromeos/autotest/scheduler/active_host_queue_entries'
598            ).set(count, fields={'status': status})
599
600        return agent_tasks
601
602
603    def _get_special_task_agent_tasks(self, is_active=False):
604        special_tasks = models.SpecialTask.objects.filter(
605                is_active=is_active, is_complete=False)
606        agent_tasks = []
607        for task in special_tasks:
608          try:
609              agent_tasks.append(self._get_agent_task_for_special_task(task))
610          except scheduler_lib.MalformedRecordError as e:
611              logging.exception('Skipping agent task for malformed special '
612                                'task.')
613              m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
614              metrics.Counter(m).increment()
615        return agent_tasks
616
617
618    def _get_agent_task_for_queue_entry(self, queue_entry):
619        """
620        Construct an AgentTask instance for the given active HostQueueEntry.
621
622        @param queue_entry: a HostQueueEntry
623        @return: an AgentTask to run the queue entry
624        """
625        task_entries = queue_entry.job.get_group_entries(queue_entry)
626        self._check_for_duplicate_host_entries(task_entries)
627
628        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
629                                  models.HostQueueEntry.Status.RUNNING):
630            if queue_entry.is_hostless():
631                return HostlessQueueTask(queue_entry=queue_entry)
632            return QueueTask(queue_entries=task_entries)
633        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
634            return postjob_task.GatherLogsTask(queue_entries=task_entries)
635        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
636            return postjob_task.FinalReparseTask(queue_entries=task_entries)
637
638        raise scheduler_lib.MalformedRecordError(
639                '_get_agent_task_for_queue_entry got entry with '
640                'invalid status %s: %s' % (queue_entry.status, queue_entry))
641
642
643    def _check_for_duplicate_host_entries(self, task_entries):
644        non_host_statuses = {models.HostQueueEntry.Status.PARSING}
645        for task_entry in task_entries:
646            using_host = (task_entry.host is not None
647                          and task_entry.status not in non_host_statuses)
648            if using_host:
649                self._assert_host_has_no_agent(task_entry)
650
651
652    def _assert_host_has_no_agent(self, entry):
653        """
654        @param entry: a HostQueueEntry or a SpecialTask
655        """
656        if self.host_has_agent(entry.host):
657            agent = tuple(self._host_agents.get(entry.host.id))[0]
658            raise scheduler_lib.MalformedRecordError(
659                    'While scheduling %s, host %s already has a host agent %s'
660                    % (entry, entry.host, agent.task))
661
662
663    def _get_agent_task_for_special_task(self, special_task):
664        """
665        Construct an AgentTask class to run the given SpecialTask and add it
666        to this dispatcher.
667
668        A special task is created through schedule_special_tasks, but only if
669        the host doesn't already have an agent. This happens through
670        add_agent_task. All special agent tasks are given a host on creation,
671        and a Null hqe. To create a SpecialAgentTask object, you need a
672        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
673        object contains a hqe it's passed on to the special agent task, which
674        creates a HostQueueEntry and saves it as it's queue_entry.
675
676        @param special_task: a models.SpecialTask instance
677        @returns an AgentTask to run this SpecialTask
678        """
679        self._assert_host_has_no_agent(special_task)
680
681        special_agent_task_classes = (prejob_task.CleanupTask,
682                                      prejob_task.VerifyTask,
683                                      prejob_task.RepairTask,
684                                      prejob_task.ResetTask,
685                                      prejob_task.ProvisionTask)
686
687        for agent_task_class in special_agent_task_classes:
688            if agent_task_class.TASK_TYPE == special_task.task:
689                return agent_task_class(task=special_task)
690
691        raise scheduler_lib.MalformedRecordError(
692                'No AgentTask class for task', str(special_task))
693
694
695    def _register_pidfiles(self, agent_tasks):
696        for agent_task in agent_tasks:
697            agent_task.register_necessary_pidfiles()
698
699
700    def _recover_tasks(self, agent_tasks):
701        orphans = _drone_manager.get_orphaned_autoserv_processes()
702
703        for agent_task in agent_tasks:
704            agent_task.recover()
705            if agent_task.monitor and agent_task.monitor.has_process():
706                orphans.discard(agent_task.monitor.get_process())
707            self.add_agent_task(agent_task)
708
709        self._check_for_remaining_orphan_processes(orphans)
710
711
712    def _get_unassigned_entries(self, status):
713        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
714                                                           % status):
715            if entry.status == status and not self.get_agents_for_entry(entry):
716                # The status can change during iteration, e.g., if job.run()
717                # sets a group of queue entries to Starting
718                yield entry
719
720
721    def _check_for_remaining_orphan_processes(self, orphans):
722        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
723        metrics.Gauge(m).set(len(orphans))
724
725        if not orphans:
726            return
727        subject = 'Unrecovered orphan autoserv processes remain'
728        message = '\n'.join(str(process) for process in orphans)
729        die_on_orphans = global_config.global_config.get_config_value(
730            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
731
732        if die_on_orphans:
733            raise RuntimeError(subject + '\n' + message)
734
735
736    def _recover_pending_entries(self):
737        for entry in self._get_unassigned_entries(
738                models.HostQueueEntry.Status.PENDING):
739            logging.info('Recovering Pending entry %s', entry)
740            try:
741                entry.on_pending()
742            except scheduler_lib.MalformedRecordError as e:
743                logging.exception(
744                        'Skipping agent task for malformed special task.')
745                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
746                metrics.Counter(m).increment()
747
748
749    def _check_for_unrecovered_verifying_entries(self):
750        # Verify is replaced by Reset.
751        queue_entries = scheduler_models.HostQueueEntry.fetch(
752                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
753        for queue_entry in queue_entries:
754            special_tasks = models.SpecialTask.objects.filter(
755                    task__in=(models.SpecialTask.Task.CLEANUP,
756                              models.SpecialTask.Task.VERIFY,
757                              models.SpecialTask.Task.RESET),
758                    queue_entry__id=queue_entry.id,
759                    is_complete=False)
760            if special_tasks.count() == 0:
761                logging.error('Unrecovered Resetting host queue entry: %s. ',
762                              str(queue_entry))
763                # Essentially this host queue entry was set to be Verifying
764                # however no special task exists for entry. This occurs if the
765                # scheduler dies between changing the status and creating the
766                # special task. By setting it to queued, the job can restart
767                # from the beginning and proceed correctly. This is much more
768                # preferable than having monitor_db not launching.
769                logging.info('Setting host status for %s to Ready',
770                             str(queue_entry.host))
771                # Let's at least run a cleanup/reset before reusing this DUT.
772                queue_entry.host.update_field('dirty', 1)
773                queue_entry.host.set_status(models.Host.Status.READY)
774                logging.info('Setting status for HQE %s to Queued.',
775                             str(queue_entry))
776                queue_entry.set_status('Queued')
777
778
779    @_calls_log_tick_msg
780    def _schedule_special_tasks(self):
781        """
782        Execute queued SpecialTasks that are ready to run on idle hosts.
783
784        Special tasks include PreJobTasks like verify, reset and cleanup.
785        They are created through _schedule_new_jobs and associated with a hqe
786        This method translates SpecialTasks to the appropriate AgentTask and
787        adds them to the dispatchers agents list, so _handle_agents can execute
788        them.
789        """
790        # When the host scheduler is responsible for acquisition we only want
791        # to run tasks with leased hosts. All hqe tasks will already have
792        # leased hosts, and we don't want to run frontend tasks till the host
793        # scheduler has vetted the assignment. Note that this doesn't include
794        # frontend tasks with hosts leased by other active hqes.
795        for task in self._job_query_manager.get_prioritized_special_tasks(
796                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
797            if self.host_has_agent(task.host):
798                continue
799            try:
800                self.add_agent_task(self._get_agent_task_for_special_task(task))
801            except scheduler_lib.MalformedRecordError:
802                logging.exception('Skipping schedule for malformed '
803                                  'special task.')
804                m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
805                metrics.Counter(m).increment()
806
807
808    def _reverify_remaining_hosts(self):
809        # recover active hosts that have not yet been recovered, although this
810        # should never happen
811        message = ('Recovering active host %s - this probably indicates a '
812                   'scheduler bug')
813        self._reverify_hosts_where(
814                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
815                print_message=message)
816
817
    # Id of the User row used as requested_by for the Reset tasks created by
    # _reverify_hosts_where when no user with login 'autotest_system' exists.
    DEFAULT_REQUESTED_BY_USER_ID = 1
819
820
821    def _reverify_hosts_where(self, where,
822                              print_message='Reverifying host %s'):
823        full_where = 'locked = 0 AND invalid = 0 AND %s' % where
824        for host in scheduler_models.Host.fetch(where=full_where):
825            if self.host_has_agent(host):
826                # host has already been recovered in some way
827                continue
828            if self._host_has_scheduled_special_task(host):
829                # host will have a special task scheduled on the next cycle
830                continue
831            if host.shard_id is not None and not server_utils.is_shard():
832                # I am master and host is owned by a shard, ignore it.
833                continue
834            if print_message:
835                logging.error(print_message, host.hostname)
836            try:
837                user = models.User.objects.get(login='autotest_system')
838            except models.User.DoesNotExist:
839                user = models.User.objects.get(
840                        id=self.DEFAULT_REQUESTED_BY_USER_ID)
841            models.SpecialTask.objects.create(
842                    task=models.SpecialTask.Task.RESET,
843                    host=models.Host.objects.get(id=host.id),
844                    requested_by=user)
845
846
847    def _recover_hosts(self):
848        # recover "Repair Failed" hosts
849        message = 'Reverifying dead host %s'
850        self._reverify_hosts_where("status = 'Repair Failed'",
851                                   print_message=message)
852
853
854    def _refresh_pending_queue_entries(self):
855        """
856        Lookup the pending HostQueueEntries and call our HostScheduler
857        refresh() method given that list.  Return the list.
858
859        @returns A list of pending HostQueueEntries sorted in priority order.
860        """
861        queue_entries = self._job_query_manager.get_pending_queue_entries(
862                only_hostless=not self._inline_host_acquisition)
863        if not queue_entries:
864            return []
865        return queue_entries
866
867
868    def _schedule_hostless_job(self, queue_entry):
869        """Schedule a hostless (suite) job.
870
871        @param queue_entry: The queue_entry representing the hostless job.
872        """
873        if not luciferlib.is_enabled_for('STARTING'):
874            self.add_agent_task(HostlessQueueTask(queue_entry))
875
876        # Need to set execution_subdir before setting the status:
877        # After a restart of the scheduler, agents will be restored for HQEs in
878        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
879        # execution_subdir is needed. Therefore it must be set before entering
880        # one of these states.
881        # Otherwise, if the scheduler was interrupted between setting the status
882        # and the execution_subdir, upon it's restart restoring agents would
883        # fail.
884        # Is there a way to get a status in one of these states without going
885        # through this code? Following cases are possible:
886        # - If it's aborted before being started:
887        #     active bit will be 0, so there's nothing to parse, it will just be
888        #     set to completed by _find_aborting. Critical statuses are skipped.
889        # - If it's aborted or it fails after being started:
890        #     It was started, so this code was executed.
891        queue_entry.update_field('execution_subdir', 'hostless')
892        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
893
894
895    def _schedule_host_job(self, host, queue_entry):
896        """Schedules a job on the given host.
897
898        1. Assign the host to the hqe, if it isn't already assigned.
899        2. Create a SpecialAgentTask for the hqe.
900        3. Activate the hqe.
901
902        @param queue_entry: The job to schedule.
903        @param host: The host to schedule the job on.
904        """
905        if self.host_has_agent(host):
906            host_agent_task = list(self._host_agents.get(host.id))[0].task
907        else:
908            self._host_scheduler.schedule_host_job(host, queue_entry)
909
910
911    @_calls_log_tick_msg
912    def _schedule_new_jobs(self):
913        """
914        Find any new HQEs and call schedule_pre_job_tasks for it.
915
916        This involves setting the status of the HQE and creating a row in the
917        db corresponding the the special task, through
918        scheduler_models._queue_special_task. The new db row is then added as
919        an agent to the dispatcher through _schedule_special_tasks and
920        scheduled for execution on the drone through _handle_agents.
921        """
922        queue_entries = self._refresh_pending_queue_entries()
923
924        key = 'scheduler.jobs_per_tick'
925        new_hostless_jobs = 0
926        new_jobs_with_hosts = 0
927        new_jobs_need_hosts = 0
928        host_jobs = []
929        logging.debug('Processing %d queue_entries', len(queue_entries))
930
931        for queue_entry in queue_entries:
932            if queue_entry.is_hostless():
933                self._schedule_hostless_job(queue_entry)
934                new_hostless_jobs = new_hostless_jobs + 1
935            else:
936                host_jobs.append(queue_entry)
937                new_jobs_need_hosts = new_jobs_need_hosts + 1
938
939        metrics.Counter(
940            'chromeos/autotest/scheduler/scheduled_jobs_hostless'
941        ).increment_by(new_hostless_jobs)
942
943        if not host_jobs:
944            return
945
946        if not self._inline_host_acquisition:
947          # In this case, host_scheduler is responsible for scheduling
948          # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
949          # since host_scheduler assumes it is the single process scheduling
950          # host jobs.
951          metrics.Gauge(
952              'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
953                  len(host_jobs))
954          return
955
956        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
957        for host_assignment in jobs_with_hosts:
958            self._schedule_host_job(host_assignment.host, host_assignment.job)
959            new_jobs_with_hosts = new_jobs_with_hosts + 1
960
961        metrics.Counter(
962            'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
963        ).increment_by(new_jobs_with_hosts)
964
965
966    @_calls_log_tick_msg
967    def _send_to_lucifer(self):
968        """
969        Hand off ownership of a job to lucifer component.
970        """
971        self._send_starting_to_lucifer()
972        self._send_parsing_to_lucifer()
973
974
975    # TODO(crbug.com/748234): This is temporary to enable toggling
976    # lucifer rollouts with an option.
977    def _send_starting_to_lucifer(self):
978        Status = models.HostQueueEntry.Status
979        queue_entries_qs = (models.HostQueueEntry.objects
980                            .filter(status=Status.STARTING))
981        for queue_entry in queue_entries_qs:
982            if self.get_agents_for_entry(queue_entry):
983                continue
984            job = queue_entry.job
985            if luciferlib.is_lucifer_owned(job):
986                continue
987            try:
988                drone = luciferlib.spawn_starting_job_handler(
989                        manager=_drone_manager,
990                        job=job)
991            except Exception:
992                logging.exception('Error when sending job to Lucifer')
993                models.HostQueueEntry.abort_host_queue_entries(
994                        job.hostqueueentry_set.all())
995            else:
996                models.JobHandoff.objects.create(
997                        job=job, drone=drone.hostname())
998
999
1000    # TODO(crbug.com/748234): This is temporary to enable toggling
1001    # lucifer rollouts with an option.
1002    def _send_parsing_to_lucifer(self):
1003        Status = models.HostQueueEntry.Status
1004        queue_entries_qs = (models.HostQueueEntry.objects
1005                            .filter(status=Status.PARSING))
1006        for queue_entry in queue_entries_qs:
1007            # If this HQE already has an agent, let monitor_db continue
1008            # owning it.
1009            if self.get_agents_for_entry(queue_entry):
1010                continue
1011            job = queue_entry.job
1012            if luciferlib.is_lucifer_owned(job):
1013                continue
1014            # TODO(crbug.com/811877): Ignore split HQEs.
1015            if luciferlib.is_split_job(queue_entry.id):
1016                continue
1017            task = postjob_task.PostJobTask(
1018                    [queue_entry], log_file_name='/dev/null')
1019            pidfile_id = task._autoserv_monitor.pidfile_id
1020            autoserv_exit = task._autoserv_monitor.exit_code()
1021            try:
1022                drone = luciferlib.spawn_parsing_job_handler(
1023                        manager=_drone_manager,
1024                        job=job,
1025                        autoserv_exit=autoserv_exit,
1026                        pidfile_id=pidfile_id)
1027                models.JobHandoff.objects.create(job=job,
1028                                                 drone=drone.hostname())
1029            except drone_manager.DroneManagerError as e:
1030                logging.warning(
1031                    'Fail to get drone for job %s, skipping lucifer. Error: %s',
1032                    job.id, e)
1033
1034
1035
1036    @_calls_log_tick_msg
1037    def _schedule_running_host_queue_entries(self):
1038        """
1039        Adds agents to the dispatcher.
1040
1041        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
1042        QueueTask for example, will have a job with a control file, and
1043        the agent will have methods that poll, abort and check if the queue
1044        task is finished. The dispatcher runs the agent_task, as well as
1045        other agents in it's _agents member, through _handle_agents, by
1046        calling the Agents tick().
1047
1048        This method creates an agent for each HQE in one of (starting, running,
1049        gathering, parsing) states, and adds it to the dispatcher so
1050        it is handled by _handle_agents.
1051        """
1052        for agent_task in self._get_queue_entry_agent_tasks():
1053            self.add_agent_task(agent_task)
1054
1055
1056    @_calls_log_tick_msg
1057    def _find_aborting(self):
1058        """
1059        Looks through the afe_host_queue_entries for an aborted entry.
1060
1061        The aborted bit is set on an HQE in many ways, the most common
1062        being when a user requests an abort through the frontend, which
1063        results in an rpc from the afe to abort_host_queue_entries.
1064        """
1065        jobs_to_stop = set()
1066        for entry in scheduler_models.HostQueueEntry.fetch(
1067                where='aborted=1 and complete=0'):
1068            if (luciferlib.is_enabled_for('STARTING')
1069                and luciferlib.is_lucifer_owned_by_id(entry.job.id)):
1070                continue
1071
1072            # If the job is running on a shard, let the shard handle aborting
1073            # it and sync back the right status.
1074            if entry.job.shard_id is not None and not server_utils.is_shard():
1075                # Due to crbug.com/894162, we abort jobs that 1hr beyond
1076                # timeout on master.
1077                create_on = time.mktime(entry.job.created_on.timetuple())
1078                wait_threshold = entry.job.timeout_mins * 60 + 3600
1079                abort_anyway = wait_threshold < time.time() - create_on
1080                if abort_anyway:
1081                    logging.info('Aborting %s on master due to '
1082                                 'the job 1 hour beyond timeout.', entry)
1083                else:
1084                    logging.info('Waiting for shard %s to abort hqe %s',
1085                            entry.job.shard_id, entry)
1086                    continue
1087
1088            logging.info('Aborting %s', entry)
1089
1090            # The task would have started off with both is_complete and
1091            # is_active = False. Aborted tasks are neither active nor complete.
1092            # For all currently active tasks this will happen through the agent,
1093            # but we need to manually update the special tasks that haven't
1094            # started yet, because they don't have agents.
1095            models.SpecialTask.objects.filter(is_active=False,
1096                queue_entry_id=entry.id).update(is_complete=True)
1097
1098            for agent in self.get_agents_for_entry(entry):
1099                agent.abort()
1100            entry.abort(self)
1101            jobs_to_stop.add(entry.job)
1102        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
1103        for job in jobs_to_stop:
1104            job.stop_if_necessary()
1105
1106
1107    @_calls_log_tick_msg
1108    def _find_aborted_special_tasks(self):
1109        """
1110        Find SpecialTasks that have been marked for abortion.
1111
1112        Poll the database looking for SpecialTasks that are active
1113        and have been marked for abortion, then abort them.
1114        """
1115
1116        # The completed and active bits are very important when it comes
1117        # to scheduler correctness. The active bit is set through the prolog
1118        # of a special task, and reset through the cleanup method of the
1119        # SpecialAgentTask. The cleanup is called both through the abort and
1120        # epilog. The complete bit is set in several places, and in general
1121        # a hanging job will have is_active=1 is_complete=0, while a special
1122        # task which completed will have is_active=0 is_complete=1. To check
1123        # aborts we directly check active because the complete bit is set in
1124        # several places, including the epilog of agent tasks.
1125        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
1126                                                          is_aborted=True)
1127        for task in aborted_tasks:
1128            # There are 2 ways to get the agent associated with a task,
1129            # through the host and through the hqe. A special task
1130            # always needs a host, but doesn't always need a hqe.
1131            for agent in self._host_agents.get(task.host.id, []):
1132                if isinstance(agent.task, agent_task.SpecialAgentTask):
1133
1134                    # The epilog preforms critical actions such as
1135                    # queueing the next SpecialTask, requeuing the
1136                    # hqe etc, however it doesn't actually kill the
1137                    # monitor process and set the 'done' bit. Epilogs
1138                    # assume that the job failed, and that the monitor
1139                    # process has already written an exit code. The
1140                    # done bit is a necessary condition for
1141                    # _handle_agents to schedule any more special
1142                    # tasks against the host, and it must be set
1143                    # in addition to is_active, is_complete and success.
1144                    agent.task.epilog()
1145                    agent.task.abort()
1146
1147
1148    def _can_start_agent(self, agent, have_reached_limit):
1149        # always allow zero-process agents to run
1150        if agent.task.num_processes == 0:
1151            return True
1152        # don't allow any nonzero-process agents to run after we've reached a
1153        # limit (this avoids starvation of many-process agents)
1154        if have_reached_limit:
1155            return False
1156        # total process throttling
1157        max_runnable_processes = _drone_manager.max_runnable_processes(
1158                agent.task.owner_username,
1159                agent.task.get_drone_hostnames_allowed())
1160        if agent.task.num_processes > max_runnable_processes:
1161            return False
1162        return True
1163
1164
1165    @_calls_log_tick_msg
1166    def _handle_agents(self):
1167        """
1168        Handles agents of the dispatcher.
1169
1170        Appropriate Agents are added to the dispatcher through
1171        _schedule_running_host_queue_entries. These agents each
1172        have a task. This method runs the agents task through
1173        agent.tick() leading to:
1174            agent.start
1175                prolog -> AgentTasks prolog
1176                          For each queue entry:
1177                            sets host status/status to Running
1178                            set started_on in afe_host_queue_entries
1179                run    -> AgentTasks run
1180                          Creates PidfileRunMonitor
1181                          Queues the autoserv command line for this AgentTask
1182                          via the drone manager. These commands are executed
1183                          through the drone managers execute actions.
1184                poll   -> AgentTasks/BaseAgentTask poll
1185                          checks the monitors exit_code.
1186                          Executes epilog if task is finished.
1187                          Executes AgentTasks _finish_task
1188                finish_task is usually responsible for setting the status
1189                of the HQE/host, and updating it's active and complete fileds.
1190
1191            agent.is_done
1192                Removed the agent from the dispatchers _agents queue.
1193                Is_done checks the finished bit on the agent, that is
1194                set based on the Agents task. During the agents poll
1195                we check to see if the monitor process has exited in
1196                it's finish method, and set the success member of the
1197                task based on this exit code.
1198        """
1199        num_started_this_tick = 0
1200        num_finished_this_tick = 0
1201        have_reached_limit = False
1202        # iterate over copy, so we can remove agents during iteration
1203        logging.debug('Handling %d Agents', len(self._agents))
1204        for agent in list(self._agents):
1205            self._log_extra_msg('Processing Agent with Host Ids: %s and '
1206                                'queue_entry ids:%s' % (agent.host_ids,
1207                                agent.queue_entry_ids))
1208            if not agent.started:
1209                if not self._can_start_agent(agent, have_reached_limit):
1210                    have_reached_limit = True
1211                    logging.debug('Reached Limit of allowed running Agents.')
1212                    continue
1213                num_started_this_tick += agent.task.num_processes
1214                self._log_extra_msg('Starting Agent')
1215            agent.tick()
1216            self._log_extra_msg('Agent tick completed.')
1217            if agent.is_done():
1218                num_finished_this_tick += agent.task.num_processes
1219                self._log_extra_msg("Agent finished")
1220                self.remove_agent(agent)
1221
1222        metrics.Counter(
1223            'chromeos/autotest/scheduler/agent_processes_started'
1224        ).increment_by(num_started_this_tick)
1225        metrics.Counter(
1226            'chromeos/autotest/scheduler/agent_processes_finished'
1227        ).increment_by(num_finished_this_tick)
1228        num_agent_processes = _drone_manager.total_running_processes()
1229        metrics.Gauge(
1230            'chromeos/autotest/scheduler/agent_processes'
1231        ).set(num_agent_processes)
1232        logging.info('%d running processes. %d added this tick.',
1233                     num_agent_processes, num_started_this_tick)
1234
1235
1236    def _log_tick_msg(self, msg):
1237        if self._tick_debug:
1238            logging.debug(msg)
1239
1240
1241    def _log_extra_msg(self, msg):
1242        if self._extra_debugging:
1243            logging.debug(msg)
1244
1245
class Agent(object):
    """
    Wrapper used by the Dispatcher class to run a single task.

    An agent mainly exists to associate an AgentTask with the ids of the
    queue entries and hosts it works on, and to track whether the task has
    been started and whether it has finished.

    Methods every task object must provide:
        poll() - Called periodically so the task can check its status and
                update its internal state.
        is_done() - Returns True once the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    Attributes every task object must provide:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        # Lifecycle flags: set by tick() and abort().
        self.started = False
        self.finished = False

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None


    def tick(self):
        """
        Mark the agent started and poll its task unless already finished.

        The agent becomes finished as soon as the task reports is_done().
        """
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """
        @returns True once the agent has finished.
        """
        return self.finished


    def abort(self):
        """
        Forward an abort request to the task, if there is one.

        The agent only becomes finished when the task reports that it
        actually aborted, since tasks can choose to ignore aborts.
        """
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
1301
1302
class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
    """
    Behaviour shared by QueueTask and HostlessQueueTask.

    The task covers a group of HostQueueEntries; the job of the first entry
    is treated as the job of the whole task.
    """
    def __init__(self, queue_entries):
        """
        @param queue_entries: Non-empty sequence of HostQueueEntries.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        """
        @returns Path of the keyval file inside the working directory.
        """
        working_dir = self._working_directory()
        return os.path.join(working_dir, self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """
        Attach the job's control file to the given execution.

        @param execution_path: Execution path the file is attached to.

        @returns The path returned by the drone manager for the file.
        """
        return _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        """
        Build the autoserv command line for this task.

        The control file is written out as a side effect.

        @returns The command line produced by _autoserv_command_line.
        """
        first_entry = self.queue_entries[0]
        control_path = self._write_control_file(first_entry.execution_path())
        # Hostless entries contribute no machine to the host list.
        machines = ','.join(hqe.host.hostname
                            for hqe in self.queue_entries
                            if not hqe.is_hostless())

        tag = first_entry.execution_tag()
        extra_args = ['-P', tag, '-n',
                      _drone_manager.absolute_path(control_path)]
        return _autoserv_command_line(
            machines, extra_args, job=self.job, verbose=False)


    @property
    def num_processes(self):
        """One process is accounted for per queue entry."""
        return len(self.queue_entries)


    @property
    def owner_username(self):
        """The owner of the task's job."""
        return self.job.owner


    def _working_directory(self):
        """
        @returns The execution path shared by all of the queue entries.
        """
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """
        Write the pre-job keyvals and move every queue entry to Running.
        """
        queued_key, queued_time = self._job_queued_keyval(self.job)
        job_keyvals = self.job.keyval_dict()
        job_keyvals[queued_key] = queued_time
        self._write_keyvals_before_job(job_keyvals)

        running = models.HostQueueEntry.Status.RUNNING
        for hqe in self.queue_entries:
            hqe.set_status(running)
            hqe.set_started_on_now()


    def _write_lost_process_error_file(self):
        """
        Leave a 'job_failure' file holding the lost-process error text in
        the working directory.
        """
        failure_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(failure_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """
        Record that the job finished; a no-op if there is no monitor.

        If the monitor lost track of its process the lost-process error
        file is written as well.
        """
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        """
        Append an INFO line carrying comment to the job's status.log.

        @param comment: Text of the status line.
        """
        status_log = os.path.join(self._working_directory(), 'status.log')
        _drone_manager.write_lines_to_file(
            status_log,
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """
        Record who aborted the job, and when, in keyvals and status.log.

        Nothing is written unless the task has a monitor with a process.
        """
        if not (self.monitor and self.monitor.has_process()):
            return

        # Collect the distinct aborted_by and aborted_on values.
        abort_users = set()
        abort_times = set()
        for hqe in self.queue_entries:
            if not hqe.aborted_by:
                continue
            abort_users.add(hqe.aborted_by)
            abort_times.add(int(time.mktime(hqe.aborted_on.timetuple())))

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to
        # leave it in temporarily for backwards compatibility over upgrades.
        # delete soon.
        assert len(abort_users) <= 1
        if len(abort_users) == 1:
            who = abort_users.pop()
            when = max(abort_times)
        else:
            who = 'autotest_system'
            when = int(time.time())

        self._write_keyval_after_job("aborted_by", who)
        self._write_keyval_after_job("aborted_on", when)

        when_text = str(datetime.datetime.fromtimestamp(when))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (who, when_text))


    def abort(self):
        """
        Abort the task, log the abort and run the finishing steps.
        """
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        """
        Run the parent epilog followed by the finishing steps.
        """
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
1430
1431
class QueueTask(AbstractQueueTask):
    """
    Queue task for entries that run on hosts.
    """
    def __init__(self, queue_entries):
        """
        @param queue_entries: Non-empty sequence of HostQueueEntries.
        """
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)
        # Read from the AUTOSERV config section; enabled by default.
        config = global_config.global_config
        self._enable_ssp_container = config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool,
                default=True)


    def prolog(self):
        """
        Check entry and host statuses, then mark entries and hosts Running.

        Host keyvals are written for every entry and each host is flagged
        dirty.
        """
        hqe_status = models.HostQueueEntry.Status
        host_status = models.Host.Status
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(hqe_status.STARTING,
                                      hqe_status.RUNNING),
                allowed_host_statuses=(host_status.PENDING,
                                       host_status.RUNNING))

        super(QueueTask, self).prolog()

        for hqe in self.queue_entries:
            self._write_host_keyvals(hqe.host)
            hqe.host.set_status(models.Host.Status.RUNNING)
            hqe.host.update_field('dirty', 1)


    def _finish_task(self):
        """
        Run the common finishing steps, then move every entry to Gathering
        while keeping its host in Running.
        """
        super(QueueTask, self)._finish_task()

        gathering = models.HostQueueEntry.Status.GATHERING
        for hqe in self.queue_entries:
            hqe.set_status(gathering)
            hqe.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """
        Extend the base command line with the options specific to host jobs.

        @returns The autoserv command line for this task.
        """
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed. Only an explicit False
        # in require_ssp opts out; an unset (None) value still requires it.
        wants_ssp = (
            self._enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False)
        if wants_ssp:
            invocation += ['--require-ssp']
            source_build = self.job.keyval_dict().get('test_source_build',
                                                      None)
            if source_build:
                invocation += ['--test_source_build', source_build]
        parent_id = self.job.parent_job_id
        if parent_id:
            invocation += ['--parent_job_id', str(parent_id)]
        return invocation + ['--verify_job_repo_url']
1480
1481
class HostlessQueueTask(AbstractQueueTask):
    """
    Queue task wrapping a single host queue entry; unlike QueueTask it never
    touches a host object.
    """

    def __init__(self, queue_entry):
        """
        @param queue_entry: the single host queue entry this task covers.
        """
        entries = [queue_entry]
        super(HostlessQueueTask, self).__init__(entries)
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """Delegate to the parent prolog; no extra setup is done here."""
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """
        Run the parent _finish_task(), then move the entry to Parsing unless
        it is still in Starting status.
        """
        super(HostlessQueueTask, self)._finish_task()

        # A job always enters the database in Starting status.  On each tick
        # the scheduler looks at all Starting jobs and checks whether they
        # can be started.  If it hits a limit (e.g.
        # max_hostless_jobs_per_drone) the jobs are left in Starting;
        # otherwise they move to Running and an autoserv process is launched
        # on a drone for each of them.
        # An entry that is still Starting therefore has no process yet, so
        # there are no logs to parse or collect.  Skipping it also avoids an
        # exception in the scheduler, because execution_subdir has no value
        # for such an entry yet.
        statuses = models.HostQueueEntry.Status
        entry = self.queue_entries[0]
        if entry.status == statuses.STARTING:
            return
        entry.set_status(statuses.PARSING)
1509
1510
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
1513