1#!/usr/bin/python
2"""
3Autotest scheduler
4"""
5
6import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
14
15import common
16from autotest_lib.frontend import setup_django_environment
17
18import django.db
19
20from autotest_lib.client.common_lib import control_data
21from autotest_lib.client.common_lib import global_config
22from autotest_lib.client.common_lib import utils
23from autotest_lib.client.common_lib.cros.graphite import autotest_stats
24from autotest_lib.frontend.afe import models, rpc_utils
25from autotest_lib.scheduler import agent_task, drone_manager
26from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
28from autotest_lib.scheduler import postjob_task
29from autotest_lib.scheduler import query_managers
30from autotest_lib.scheduler import scheduler_lib
31from autotest_lib.scheduler import scheduler_models
32from autotest_lib.scheduler import status_server, scheduler_config
33from autotest_lib.server import autoserv_utils
34from autotest_lib.server import system_utils
35from autotest_lib.server import utils as server_utils
36from autotest_lib.site_utils import metadata_reporter
37from autotest_lib.site_utils import server_manager_utils
38
39
40BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
41PID_FILE_PREFIX = 'monitor_db'
42
43RESULTS_DIR = '.'
44AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
45
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
48AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
49AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
50
51if AUTOTEST_SERVER_DIR not in sys.path:
52    sys.path.insert(0, AUTOTEST_SERVER_DIR)
53
54# error message to leave in results dir when an autoserv process disappears
55# mysteriously
56_LOST_PROCESS_ERROR = """\
57Autoserv failed abnormally during execution for this job, probably due to a
58system error on the Autotest server.  Full results may not be available.  Sorry.
59"""
60
61_db_manager = None
62_db = None
63_shutdown = False
64
65# These 2 globals are replaced for testing
66_autoserv_directory = autoserv_utils.autoserv_directory
67_autoserv_path = autoserv_utils.autoserv_path
68_testing_mode = False
69_drone_manager = None
70_inline_host_acquisition = global_config.global_config.get_config_value(
71        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
72        default=True)
73
74_enable_ssp_container = global_config.global_config.get_config_value(
75        'AUTOSERV', 'enable_ssp_container', type=bool,
76        default=True)
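
# For reference, these values map to entries in the global config. An
# illustrative (not authoritative) snippet of global_config.ini:
#
#   [SCHEDULER]
#   inline_host_acquisition: True
#
#   [AUTOSERV]
#   enable_ssp_container: True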
77
78def _site_init_monitor_db_dummy():
79    return {}
80
81
82def _verify_default_drone_set_exists():
83    if (models.DroneSet.drone_sets_enabled() and
84            not models.DroneSet.default_drone_set_name()):
85        raise scheduler_lib.SchedulerError(
86                'Drone sets are enabled, but no default is set')
87
88
89def _sanity_check():
90    """Make sure the configs are consistent before starting the scheduler"""
91    _verify_default_drone_set_exists()
92
93
94def main():
95    try:
96        try:
97            main_without_exception_handling()
98        except SystemExit:
99            raise
100        except:
101            logging.exception('Exception escaping in monitor_db')
102            raise
103    finally:
104        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
105
106
107def main_without_exception_handling():
108    scheduler_lib.setup_logging(
109            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
110            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
111    usage = 'usage: %prog [options] results_dir'
112    parser = optparse.OptionParser(usage)
113    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
114                      action='store_true')
115    parser.add_option('--test', help='Indicate that scheduler is under ' +
116                      'test and should use dummy autoserv and no parsing',
117                      action='store_true')
118    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and can use a database '
                            'that is not hosted on localhost. If not set, the '
                            'scheduler will fail if the database is not on '
                            'localhost.'),
124                      action='store_true', default=False)
125    (options, args) = parser.parse_args()
126    if len(args) != 1:
127        parser.print_usage()
128        return
129
130    scheduler_lib.check_production_settings(options)
131
132    scheduler_enabled = global_config.global_config.get_config_value(
133        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
134
135    if not scheduler_enabled:
136        logging.error("Scheduler not enabled, set enable_scheduler to true in "
137                      "the global_config's SCHEDULER section to enable it. "
138                      "Exiting.")
139        sys.exit(1)
140
141    global RESULTS_DIR
142    RESULTS_DIR = args[0]
143
144    site_init = utils.import_site_function(__file__,
145        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
146        _site_init_monitor_db_dummy)
147    site_init()
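
    # The site hook above is optional: import_site_function falls back to
    # _site_init_monitor_db_dummy if no site module is present. A minimal
    # site-specific override (hypothetical file
    # autotest_lib/scheduler/site_monitor_db.py) would look like:
    #
    #     def site_init_monitor_db():
    #         # perform any site-specific setup here
    #         return {}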
148
    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
152    os.chdir(RESULTS_DIR)
153
    # This is helpful for debugging why processes launched by the scheduler
    # are misbehaving.
156    logging.info('os.environ: %s', os.environ)
157
158    if options.test:
159        global _autoserv_path
160        _autoserv_path = 'autoserv_dummy'
161        global _testing_mode
162        _testing_mode = True
163
164    server = status_server.StatusServer()
165    server.start()
166
167    # Start the thread to report metadata.
168    metadata_reporter.start()
169
170    try:
171        initialize()
172        dispatcher = Dispatcher()
173        dispatcher.initialize(recover_hosts=options.recover_hosts)
174        minimum_tick_sec = global_config.global_config.get_config_value(
175                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
176
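        # Illustrative example: with minimum_tick_sec = 30 and a tick that
        # takes 12 seconds, the loop below sleeps for the remaining 18 seconds
        # before starting the next tick.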
177        while not _shutdown and not server._shutdown_scheduler:
178            start = time.time()
179            dispatcher.tick()
180            curr_tick_sec = time.time() - start
181            if (minimum_tick_sec > curr_tick_sec):
182                time.sleep(minimum_tick_sec - curr_tick_sec)
183            else:
184                time.sleep(0.0001)
185    except Exception:
186        email_manager.manager.log_stacktrace(
187            "Uncaught exception; terminating monitor_db")
188
189    metadata_reporter.abort()
190    email_manager.manager.send_queued_emails()
191    server.shutdown()
192    _drone_manager.shutdown()
193    _db_manager.disconnect()
194
195
196def handle_signal(signum, frame):
197    global _shutdown
198    _shutdown = True
199    logging.info("Shutdown request received.")
200
201
202def initialize():
203    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
204    logging.info("My PID is %d", os.getpid())
205
206    if utils.program_is_alive(PID_FILE_PREFIX):
207        logging.critical("monitor_db already running, aborting!")
208        sys.exit(1)
209    utils.write_pid(PID_FILE_PREFIX)
210
211    if _testing_mode:
212        global_config.global_config.override_config_value(
213            scheduler_lib.DB_CONFIG_SECTION, 'database',
214            'stresstest_autotest_web')
215
    # If the server database is enabled, check if this server has the
    # `scheduler` role. If it does not, an exception is raised and the
    # scheduler will not continue to run.
219    if server_manager_utils.use_server_db():
220        server_manager_utils.confirm_server_has_role(hostname='localhost',
221                                                     role='scheduler')
222
223    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
224    global _db_manager
225    _db_manager = scheduler_lib.ConnectionManager()
226    global _db
227    _db = _db_manager.get_connection()
228    logging.info("Setting signal handler")
229    signal.signal(signal.SIGINT, handle_signal)
230    signal.signal(signal.SIGTERM, handle_signal)
231
232    initialize_globals()
233    scheduler_models.initialize()
234
235    drone_list = system_utils.get_drones()
236    results_host = global_config.global_config.get_config_value(
237        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
238    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
239
240    logging.info("Connected! Running...")
241
242
243def initialize_globals():
244    global _drone_manager
245    _drone_manager = drone_manager.instance()
246
247
248def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
249                           verbose=True):
250    """
251    @returns The autoserv command line as a list of executable + parameters.
252
253    @param machines - string - A machine or comma separated list of machines
254            for the (-m) flag.
255    @param extra_args - list - Additional arguments to pass to autoserv.
256    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
257            and client -c or server -s parameters will be added.
258    @param queue_entry - A HostQueueEntry object - If supplied and no Job
259            object was supplied, this will be used to lookup the Job object.
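
    Example (illustrative sketch; 'entry' is a hypothetical HostQueueEntry):
        cmd = _autoserv_command_line('host1,host2', extra_args=[],
                                     queue_entry=entry)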
260    """
261    command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
262            machines, results_directory=drone_manager.WORKING_DIRECTORY,
263            extra_args=extra_args, job=job, queue_entry=queue_entry,
264            verbose=verbose, in_lab=True)
265    return command
266
267
268class BaseDispatcher(object):
269
270
271    def __init__(self):
272        self._agents = []
273        self._last_clean_time = time.time()
274        user_cleanup_time = scheduler_config.config.clean_interval_minutes
275        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
276                _db, user_cleanup_time)
277        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
278                _db, _drone_manager)
279        self._host_agents = {}
280        self._queue_entry_agents = {}
281        self._tick_count = 0
282        self._last_garbage_stats_time = time.time()
283        self._seconds_between_garbage_stats = 60 * (
284                global_config.global_config.get_config_value(
285                        scheduler_config.CONFIG_SECTION,
286                        'gc_stats_interval_mins', type=int, default=6*60))
287        self._tick_debug = global_config.global_config.get_config_value(
288                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
289                default=False)
290        self._extra_debugging = global_config.global_config.get_config_value(
291                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
292                default=False)
293
        # If _inline_host_acquisition is set, the scheduler will acquire and
        # release hosts against jobs inline with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
298        self._job_query_manager = query_managers.AFEJobQueryManager()
299        self._host_scheduler = (host_scheduler.BaseHostScheduler()
300                                if _inline_host_acquisition else
301                                host_scheduler.DummyHostScheduler())
302
303
304    def initialize(self, recover_hosts=True):
305        self._periodic_cleanup.initialize()
306        self._24hr_upkeep.initialize()
307        # Execute all actions queued in the cleanup tasks. Scheduler tick will
308        # run a refresh task first. If there is any action in the queue, refresh
309        # will raise an exception.
310        _drone_manager.execute_actions()
311
312        # always recover processes
313        self._recover_processes()
314
315        if recover_hosts:
316            self._recover_hosts()
317
318
319    def _log_tick_msg(self, msg):
320        if self._tick_debug:
321            logging.debug(msg)
322
323
324    def _log_extra_msg(self, msg):
325        if self._extra_debugging:
326            logging.debug(msg)
327
328
329    def tick(self):
330        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where most of the tick
        time is being spent.
334        """
335        timer = autotest_stats.Timer('scheduler.tick')
336        system_utils.DroneCache.refresh()
        self._log_tick_msg('Starting new tick; calling _garbage_collection().')
338        self._garbage_collection()
339        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
340        _drone_manager.trigger_refresh()
341        self._log_tick_msg('Calling _process_recurring_runs().')
342        self._process_recurring_runs()
343        self._log_tick_msg('Calling _schedule_delay_tasks().')
344        self._schedule_delay_tasks()
345        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
346        self._schedule_running_host_queue_entries()
347        self._log_tick_msg('Calling _schedule_special_tasks().')
348        self._schedule_special_tasks()
349        self._log_tick_msg('Calling _schedule_new_jobs().')
350        self._schedule_new_jobs()
351        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
352        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to drone._calls should happen after the drone refresh
        # has completed and before drone_manager.execute_actions at the end of
        # the tick.
359        self._log_tick_msg('Calling _run_cleanup().')
360        self._run_cleanup()
361        self._log_tick_msg('Calling _find_aborting().')
362        self._find_aborting()
363        self._log_tick_msg('Calling _find_aborted_special_tasks().')
364        self._find_aborted_special_tasks()
365        self._log_tick_msg('Calling _handle_agents().')
366        self._handle_agents()
367        self._log_tick_msg('Calling _host_scheduler.tick().')
368        self._host_scheduler.tick()
369        self._log_tick_msg('Calling _drone_manager.execute_actions().')
370        _drone_manager.execute_actions()
371        self._log_tick_msg('Calling '
372                           'email_manager.manager.send_queued_emails().')
373        with timer.get_client('email_manager_send_queued_emails'):
374            email_manager.manager.send_queued_emails()
375        self._log_tick_msg('Calling django.db.reset_queries().')
376        with timer.get_client('django_db_reset_queries'):
377            django.db.reset_queries()
378        self._tick_count += 1
379
380
381    def _run_cleanup(self):
382        self._periodic_cleanup.run_cleanup_maybe()
383        self._24hr_upkeep.run_cleanup_maybe()
384
385
386    def _garbage_collection(self):
387        threshold_time = time.time() - self._seconds_between_garbage_stats
388        if threshold_time < self._last_garbage_stats_time:
389            # Don't generate these reports very often.
390            return
391
392        self._last_garbage_stats_time = time.time()
393        # Force a full level 0 collection (because we can, it doesn't hurt
394        # at this interval).
395        gc.collect()
396        logging.info('Logging garbage collector stats on tick %d.',
397                     self._tick_count)
398        gc_stats._log_garbage_collector_stats()
399
400
401    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
402        for object_id in object_ids:
403            agent_dict.setdefault(object_id, set()).add(agent)
404
405
406    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
407        for object_id in object_ids:
408            assert object_id in agent_dict
409            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated with it, there is
            # no need to keep it in the dictionary. Otherwise, the scheduler
            # will keep an unnecessarily large dictionary until it is
            # restarted.
413            if not agent_dict[object_id]:
414                agent_dict.pop(object_id)
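
        # Illustrative sketch of the bookkeeping above (values hypothetical):
        #   agent_dict == {10: set([agent_a]), 11: set([agent_a, agent_b])}
        #   _unregister_agent_for_ids(agent_dict, [10], agent_a)
        #   => agent_dict == {11: set([agent_a, agent_b])}  # empty keys popped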
415
416
417    def add_agent_task(self, agent_task):
418        """
        Creates an agent for the task and adds it to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. Every agent we create is
        registered in two dicts: one keyed by the queue_entry_ids given to it
        and one keyed by the host_ids given to it. So theoretically a host can
        have any number of agents associated with it, and each of them can have
        any special agent task, though in practice we never see more than one
        agent/task per host at any time.
428
429        @param agent_task: A SpecialTask for the agent to manage.
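
        Example (illustrative; 'special_task' is a hypothetical
        models.SpecialTask row):
            agent_task = prejob_task.ResetTask(task=special_task)
            dispatcher.add_agent_task(agent_task)
            # the wrapping Agent is now registered under the task's host_ids
            # and queue_entry_ids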
430        """
431        agent = Agent(agent_task)
432        self._agents.append(agent)
433        agent.dispatcher = self
434        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
435        self._register_agent_for_ids(self._queue_entry_agents,
436                                     agent.queue_entry_ids, agent)
437
438
439    def get_agents_for_entry(self, queue_entry):
440        """
441        Find agents corresponding to the specified queue_entry.
442        """
443        return list(self._queue_entry_agents.get(queue_entry.id, set()))
444
445
446    def host_has_agent(self, host):
447        """
448        Determine if there is currently an Agent present using this host.
449        """
450        return bool(self._host_agents.get(host.id, None))
451
452
453    def remove_agent(self, agent):
454        self._agents.remove(agent)
455        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
456                                       agent)
457        self._unregister_agent_for_ids(self._queue_entry_agents,
458                                       agent.queue_entry_ids, agent)
459
460
461    def _host_has_scheduled_special_task(self, host):
462        return bool(models.SpecialTask.objects.filter(host__id=host.id,
463                                                      is_active=False,
464                                                      is_complete=False))
465
466
467    def _recover_processes(self):
468        agent_tasks = self._create_recovery_agent_tasks()
469        self._register_pidfiles(agent_tasks)
470        _drone_manager.refresh()
471        self._recover_tasks(agent_tasks)
472        self._recover_pending_entries()
473        self._check_for_unrecovered_verifying_entries()
474        self._reverify_remaining_hosts()
475        # reinitialize drones after killing orphaned processes, since they can
476        # leave around files when they die
477        _drone_manager.execute_actions()
478        _drone_manager.reinitialize_drones()
479
480
481    def _create_recovery_agent_tasks(self):
482        return (self._get_queue_entry_agent_tasks()
483                + self._get_special_task_agent_tasks(is_active=True))
484
485
486    def _get_queue_entry_agent_tasks(self):
487        """
        Get agent tasks for all HQEs in the specified states.

        Loosely this translates to taking an HQE in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.
495
496        @return: A list of AgentTasks.
497        """
498        # host queue entry statuses handled directly by AgentTasks (Verifying is
499        # handled through SpecialTasks, so is not listed here)
500        statuses = (models.HostQueueEntry.Status.STARTING,
501                    models.HostQueueEntry.Status.RUNNING,
502                    models.HostQueueEntry.Status.GATHERING,
503                    models.HostQueueEntry.Status.PARSING,
504                    models.HostQueueEntry.Status.ARCHIVING)
505        status_list = ','.join("'%s'" % status for status in statuses)
506        queue_entries = scheduler_models.HostQueueEntry.fetch(
507                where='status IN (%s)' % status_list)
508        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
509                'running', len(queue_entries))
510
511        agent_tasks = []
512        used_queue_entries = set()
513        for entry in queue_entries:
514            if self.get_agents_for_entry(entry):
515                # already being handled
516                continue
517            if entry in used_queue_entries:
518                # already picked up by a synchronous job
519                continue
520            agent_task = self._get_agent_task_for_queue_entry(entry)
521            agent_tasks.append(agent_task)
522            used_queue_entries.update(agent_task.queue_entries)
523        return agent_tasks
524
525
526    def _get_special_task_agent_tasks(self, is_active=False):
527        special_tasks = models.SpecialTask.objects.filter(
528                is_active=is_active, is_complete=False)
529        return [self._get_agent_task_for_special_task(task)
530                for task in special_tasks]
531
532
533    def _get_agent_task_for_queue_entry(self, queue_entry):
534        """
535        Construct an AgentTask instance for the given active HostQueueEntry.
536
537        @param queue_entry: a HostQueueEntry
538        @return: an AgentTask to run the queue entry
539        """
540        task_entries = queue_entry.job.get_group_entries(queue_entry)
541        self._check_for_duplicate_host_entries(task_entries)
542
543        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
544                                  models.HostQueueEntry.Status.RUNNING):
545            if queue_entry.is_hostless():
546                return HostlessQueueTask(queue_entry=queue_entry)
547            return QueueTask(queue_entries=task_entries)
548        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
549            return postjob_task.GatherLogsTask(queue_entries=task_entries)
550        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
551            return postjob_task.FinalReparseTask(queue_entries=task_entries)
552        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
553            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
554
555        raise scheduler_lib.SchedulerError(
556                '_get_agent_task_for_queue_entry got entry with '
557                'invalid status %s: %s' % (queue_entry.status, queue_entry))
558
559
560    def _check_for_duplicate_host_entries(self, task_entries):
561        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
562                             models.HostQueueEntry.Status.ARCHIVING)
563        for task_entry in task_entries:
564            using_host = (task_entry.host is not None
565                          and task_entry.status not in non_host_statuses)
566            if using_host:
567                self._assert_host_has_no_agent(task_entry)
568
569
570    def _assert_host_has_no_agent(self, entry):
571        """
572        @param entry: a HostQueueEntry or a SpecialTask
573        """
574        if self.host_has_agent(entry.host):
575            agent = tuple(self._host_agents.get(entry.host.id))[0]
576            raise scheduler_lib.SchedulerError(
577                    'While scheduling %s, host %s already has a host agent %s'
578                    % (entry, entry.host, agent.task))
579
580
581    def _get_agent_task_for_special_task(self, special_task):
582        """
        Construct an AgentTask to run the given SpecialTask.

        A special task is scheduled through _schedule_special_tasks, but only
        if the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null HQE. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains an HQE, it is passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.
593
594        @param special_task: a models.SpecialTask instance
595        @returns an AgentTask to run this SpecialTask
596        """
597        self._assert_host_has_no_agent(special_task)
598
599        special_agent_task_classes = (prejob_task.CleanupTask,
600                                      prejob_task.VerifyTask,
601                                      prejob_task.RepairTask,
602                                      prejob_task.ResetTask,
603                                      prejob_task.ProvisionTask)
604
605        for agent_task_class in special_agent_task_classes:
606            if agent_task_class.TASK_TYPE == special_task.task:
607                return agent_task_class(task=special_task)
608
609        raise scheduler_lib.SchedulerError(
610                'No AgentTask class for task', str(special_task))
611
612
613    def _register_pidfiles(self, agent_tasks):
614        for agent_task in agent_tasks:
615            agent_task.register_necessary_pidfiles()
616
617
618    def _recover_tasks(self, agent_tasks):
619        orphans = _drone_manager.get_orphaned_autoserv_processes()
620
621        for agent_task in agent_tasks:
622            agent_task.recover()
623            if agent_task.monitor and agent_task.monitor.has_process():
624                orphans.discard(agent_task.monitor.get_process())
625            self.add_agent_task(agent_task)
626
627        self._check_for_remaining_orphan_processes(orphans)
628
629
630    def _get_unassigned_entries(self, status):
631        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
632                                                           % status):
633            if entry.status == status and not self.get_agents_for_entry(entry):
634                # The status can change during iteration, e.g., if job.run()
635                # sets a group of queue entries to Starting
636                yield entry
637
638
639    def _check_for_remaining_orphan_processes(self, orphans):
640        if not orphans:
641            return
642        subject = 'Unrecovered orphan autoserv processes remain'
643        message = '\n'.join(str(process) for process in orphans)
644        email_manager.manager.enqueue_notify_email(subject, message)
645
646        die_on_orphans = global_config.global_config.get_config_value(
647            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
648
649        if die_on_orphans:
650            raise RuntimeError(subject + '\n' + message)
651
652
653    def _recover_pending_entries(self):
654        for entry in self._get_unassigned_entries(
655                models.HostQueueEntry.Status.PENDING):
656            logging.info('Recovering Pending entry %s', entry)
657            entry.on_pending()
658
659
660    def _check_for_unrecovered_verifying_entries(self):
661        queue_entries = scheduler_models.HostQueueEntry.fetch(
662                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
663        unrecovered_hqes = []
664        for queue_entry in queue_entries:
665            special_tasks = models.SpecialTask.objects.filter(
666                    task__in=(models.SpecialTask.Task.CLEANUP,
667                              models.SpecialTask.Task.VERIFY),
668                    queue_entry__id=queue_entry.id,
669                    is_complete=False)
670            if special_tasks.count() == 0:
671                unrecovered_hqes.append(queue_entry)
672
673        if unrecovered_hqes:
674            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
675            raise scheduler_lib.SchedulerError(
676                    '%d unrecovered verifying host queue entries:\n%s' %
677                    (len(unrecovered_hqes), message))
678
679
680    def _schedule_special_tasks(self):
681        """
682        Execute queued SpecialTasks that are ready to run on idle hosts.
683
        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an HQE.
        This method translates SpecialTasks to the appropriate AgentTask and
        adds them to the dispatcher's agent list, so _handle_agents can execute
        them.
689        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All HQE tasks will already have
        # leased hosts, and we don't want to run frontend tasks until the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active HQEs.
695        for task in self._job_query_manager.get_prioritized_special_tasks(
696                only_tasks_with_leased_hosts=not _inline_host_acquisition):
697            if self.host_has_agent(task.host):
698                continue
699            self.add_agent_task(self._get_agent_task_for_special_task(task))
700
701
702    def _reverify_remaining_hosts(self):
703        # recover active hosts that have not yet been recovered, although this
704        # should never happen
705        message = ('Recovering active host %s - this probably indicates a '
706                   'scheduler bug')
707        self._reverify_hosts_where(
708                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
709                print_message=message)
710
711
712    def _reverify_hosts_where(self, where,
713                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
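        # e.g. when called from _recover_hosts this expands to:
        #   "locked = 0 AND invalid = 0 AND status = 'Repair Failed'"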
715        for host in scheduler_models.Host.fetch(where=full_where):
716            if self.host_has_agent(host):
717                # host has already been recovered in some way
718                continue
719            if self._host_has_scheduled_special_task(host):
720                # host will have a special task scheduled on the next tick
721                continue
722            if print_message:
723                logging.info(print_message, host.hostname)
724            models.SpecialTask.objects.create(
725                    task=models.SpecialTask.Task.CLEANUP,
726                    host=models.Host.objects.get(id=host.id))
727
728
729    def _recover_hosts(self):
730        # recover "Repair Failed" hosts
731        message = 'Reverifying dead host %s'
732        self._reverify_hosts_where("status = 'Repair Failed'",
733                                   print_message=message)
734
735
736    def _refresh_pending_queue_entries(self):
737        """
        Look up the pending HostQueueEntries and return them.
740
741        @returns A list of pending HostQueueEntries sorted in priority order.
742        """
743        queue_entries = self._job_query_manager.get_pending_queue_entries(
744                only_hostless=not _inline_host_acquisition)
745        if not queue_entries:
746            return []
747        return queue_entries
748
749
750    def _schedule_hostless_job(self, queue_entry):
751        """Schedule a hostless (suite) job.
752
753        @param queue_entry: The queue_entry representing the hostless job.
754        """
755        self.add_agent_task(HostlessQueueTask(queue_entry))
756
757        # Need to set execution_subdir before setting the status:
758        # After a restart of the scheduler, agents will be restored for HQEs in
759        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
760        # execution_subdir is needed. Therefore it must be set before entering
761        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, restoring agents upon its restart
        # would fail.
        # Is there a way to get a status in one of these states without going
        # through this code? The following cases are possible:
767        # - If it's aborted before being started:
768        #     active bit will be 0, so there's nothing to parse, it will just be
769        #     set to completed by _find_aborting. Critical statuses are skipped.
770        # - If it's aborted or it fails after being started:
771        #     It was started, so this code was executed.
772        queue_entry.update_field('execution_subdir', 'hostless')
773        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
774
775
776    def _schedule_host_job(self, host, queue_entry):
777        """Schedules a job on the given host.
778
779        1. Assign the host to the hqe, if it isn't already assigned.
780        2. Create a SpecialAgentTask for the hqe.
781        3. Activate the hqe.
782
783        @param queue_entry: The job to schedule.
784        @param host: The host to schedule the job on.
785        """
786        if self.host_has_agent(host):
787            host_agent_task = list(self._host_agents.get(host.id))[0].task
788            subject = 'Host with agents assigned to an HQE'
789            message = ('HQE: %s assigned host %s, but the host has '
790                       'agent: %s for queue_entry %s. The HQE '
791                       'will have to try and acquire a host next tick ' %
792                       (queue_entry, host.hostname, host_agent_task,
793                        host_agent_task.queue_entry))
794            email_manager.manager.enqueue_notify_email(subject, message)
795        else:
796            self._host_scheduler.schedule_host_job(host, queue_entry)
797
798
799    def _schedule_new_jobs(self):
800        """
        Find any new HQEs and call schedule_pre_job_tasks for each of them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
808        """
809        queue_entries = self._refresh_pending_queue_entries()
810
811        key = 'scheduler.jobs_per_tick'
812        new_hostless_jobs = 0
813        new_jobs_with_hosts = 0
814        new_jobs_need_hosts = 0
815        host_jobs = []
816        logging.debug('Processing %d queue_entries', len(queue_entries))
817
818        for queue_entry in queue_entries:
819            if queue_entry.is_hostless():
820                self._schedule_hostless_job(queue_entry)
821                new_hostless_jobs = new_hostless_jobs + 1
822            else:
823                host_jobs.append(queue_entry)
824                new_jobs_need_hosts = new_jobs_need_hosts + 1
825
826        autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
827        if not host_jobs:
828            return
829        if not _inline_host_acquisition:
830            message = ('Found %s jobs that need hosts though '
831                       '_inline_host_acquisition=%s. Will acquire hosts.' %
832                       ([str(job) for job in host_jobs],
833                        _inline_host_acquisition))
834            email_manager.manager.enqueue_notify_email(
835                    'Processing unexpected host acquisition requests', message)
836        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
837        for host_assignment in jobs_with_hosts:
838            self._schedule_host_job(host_assignment.host, host_assignment.job)
839            new_jobs_with_hosts = new_jobs_with_hosts + 1
840
841        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
842                                       new_jobs_with_hosts)
843        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
844                                       new_jobs_need_hosts -
845                                       new_jobs_with_hosts)
846
847
848    def _schedule_running_host_queue_entries(self):
849        """
850        Adds agents to the dispatcher.
851
        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask, for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling each Agent's tick().
858
859        This method creates an agent for each HQE in one of (starting, running,
860        gathering, parsing, archiving) states, and adds it to the dispatcher so
861        it is handled by _handle_agents.
862        """
863        for agent_task in self._get_queue_entry_agent_tasks():
864            self.add_agent_task(agent_task)
865
866
867    def _schedule_delay_tasks(self):
868        for entry in scheduler_models.HostQueueEntry.fetch(
869                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
870            task = entry.job.schedule_delayed_callback_task(entry)
871            if task:
872                self.add_agent_task(task)
873
874
875    def _find_aborting(self):
876        """
877        Looks through the afe_host_queue_entries for an aborted entry.
878
        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an RPC from the AFE to abort_host_queue_entries.
882        """
883        jobs_to_stop = set()
884        for entry in scheduler_models.HostQueueEntry.fetch(
885                where='aborted=1 and complete=0'):
886
887            # If the job is running on a shard, let the shard handle aborting
888            # it and sync back the right status.
889            if entry.job.shard_id is not None and not server_utils.is_shard():
890                logging.info('Waiting for shard %s to abort hqe %s',
891                        entry.job.shard_id, entry)
892                continue
893
894            logging.info('Aborting %s', entry)
895
896            # The task would have started off with both is_complete and
897            # is_active = False. Aborted tasks are neither active nor complete.
898            # For all currently active tasks this will happen through the agent,
899            # but we need to manually update the special tasks that haven't
900            # started yet, because they don't have agents.
901            models.SpecialTask.objects.filter(is_active=False,
902                queue_entry_id=entry.id).update(is_complete=True)
903
904            for agent in self.get_agents_for_entry(entry):
905                agent.abort()
906            entry.abort(self)
907            jobs_to_stop.add(entry.job)
908        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
909        for job in jobs_to_stop:
910            job.stop_if_necessary()
911
912
913    def _find_aborted_special_tasks(self):
914        """
915        Find SpecialTasks that have been marked for abortion.
916
917        Poll the database looking for SpecialTasks that are active
918        and have been marked for abortion, then abort them.
919        """
920
921        # The completed and active bits are very important when it comes
922        # to scheduler correctness. The active bit is set through the prolog
923        # of a special task, and reset through the cleanup method of the
924        # SpecialAgentTask. The cleanup is called both through the abort and
925        # epilog. The complete bit is set in several places, and in general
926        # a hanging job will have is_active=1 is_complete=0, while a special
927        # task which completed will have is_active=0 is_complete=1. To check
928        # aborts we directly check active because the complete bit is set in
929        # several places, including the epilog of agent tasks.
930        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
931                                                          is_aborted=True)
932        for task in aborted_tasks:
933            # There are 2 ways to get the agent associated with a task,
934            # through the host and through the hqe. A special task
935            # always needs a host, but doesn't always need a hqe.
936            for agent in self._host_agents.get(task.host.id, []):
937                if isinstance(agent.task, agent_task.SpecialAgentTask):
938
                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # HQE, etc.; however, it doesn't actually kill the
                    # monitor process or set the 'done' bit. Epilogs
943                    # assume that the job failed, and that the monitor
944                    # process has already written an exit code. The
945                    # done bit is a necessary condition for
946                    # _handle_agents to schedule any more special
947                    # tasks against the host, and it must be set
948                    # in addition to is_active, is_complete and success.
949                    agent.task.epilog()
950                    agent.task.abort()
951
952
953    def _can_start_agent(self, agent, have_reached_limit):
954        # always allow zero-process agents to run
955        if agent.task.num_processes == 0:
956            return True
957        # don't allow any nonzero-process agents to run after we've reached a
958        # limit (this avoids starvation of many-process agents)
959        if have_reached_limit:
960            return False
961        # total process throttling
962        max_runnable_processes = _drone_manager.max_runnable_processes(
963                agent.task.owner_username,
964                agent.task.get_drone_hostnames_allowed())
965        if agent.task.num_processes > max_runnable_processes:
966            return False
967        return True
968
969
970    def _handle_agents(self):
971        """
972        Handles agents of the dispatcher.
973
974        Appropriate Agents are added to the dispatcher through
975        _schedule_running_host_queue_entries. These agents each
976        have a task. This method runs the agents task through
977        agent.tick() leading to:
978            agent.start
979                prolog -> AgentTasks prolog
980                          For each queue entry:
981                            sets host status/status to Running
982                            set started_on in afe_host_queue_entries
983                run    -> AgentTasks run
984                          Creates PidfileRunMonitor
985                          Queues the autoserv command line for this AgentTask
                          via the drone manager. These commands are executed
                          through the drone manager's execute_actions.
                poll   -> AgentTasks/BaseAgentTask poll
                          checks the monitor's exit_code.
                          Executes epilog if task is finished.
                          Executes AgentTasks _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check, in its finish method, whether the monitor process
                has exited, and set the success member of the task based
                on this exit code.
1002        """
1003        num_started_this_tick = 0
1004        num_finished_this_tick = 0
1005        have_reached_limit = False
1006        # iterate over copy, so we can remove agents during iteration
1007        logging.debug('Handling %d Agents', len(self._agents))
1008        for agent in list(self._agents):
1009            self._log_extra_msg('Processing Agent with Host Ids: %s and '
1010                                'queue_entry ids:%s' % (agent.host_ids,
1011                                agent.queue_entry_ids))
1012            if not agent.started:
1013                if not self._can_start_agent(agent, have_reached_limit):
1014                    have_reached_limit = True
1015                    logging.debug('Reached Limit of allowed running Agents.')
1016                    continue
1017                num_started_this_tick += agent.task.num_processes
1018                self._log_extra_msg('Starting Agent')
1019            agent.tick()
1020            self._log_extra_msg('Agent tick completed.')
1021            if agent.is_done():
1022                num_finished_this_tick += agent.task.num_processes
1023                self._log_extra_msg("Agent finished")
1024                self.remove_agent(agent)
1025        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
1026                'agents_started', num_started_this_tick)
1027        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
1028                'agents_finished', num_finished_this_tick)
1029        logging.info('%d running processes. %d added this tick.',
1030                     _drone_manager.total_running_processes(),
1031                     num_started_this_tick)
1032
1033
1034    def _process_recurring_runs(self):
1035        recurring_runs = models.RecurringRun.objects.filter(
1036            start_date__lte=datetime.datetime.now())
1037        for rrun in recurring_runs:
1038            # Create job from template
1039            job = rrun.job
1040            info = rpc_utils.get_job_info(job)
1041            options = job.get_object_dict()
1042
1043            host_objects = info['hosts']
1044            one_time_hosts = info['one_time_hosts']
1045            metahost_objects = info['meta_hosts']
1046            dependencies = info['dependencies']
1047            atomic_group = info['atomic_group']
1048
1049            for host in one_time_hosts or []:
1050                this_host = models.Host.create_one_time_host(host.hostname)
1051                host_objects.append(this_host)
1052
1053            try:
1054                rpc_utils.create_new_job(owner=rrun.owner.login,
1055                                         options=options,
1056                                         host_objects=host_objects,
1057                                         metahost_objects=metahost_objects,
1058                                         atomic_group=atomic_group)
1059
            except Exception as ex:
1061                logging.exception(ex)
1062                #TODO send email
1063
1064            if rrun.loop_count == 1:
1065                rrun.delete()
1066            else:
1067                if rrun.loop_count != 0: # if not infinite loop
1068                    # calculate new start_date
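                    # e.g. (illustrative) loop_period=86400 pushes start_date
                    # forward by one day and loop_count drops by one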
1069                    difference = datetime.timedelta(seconds=rrun.loop_period)
1070                    rrun.start_date = rrun.start_date + difference
1071                    rrun.loop_count -= 1
1072                    rrun.save()
1073
1074
1075SiteDispatcher = utils.import_site_class(
1076    __file__, 'autotest_lib.scheduler.site_monitor_db',
1077    'SiteDispatcher', BaseDispatcher)
1078
1079class Dispatcher(SiteDispatcher):
1080    pass
1081
1082
1083class Agent(object):
1084    """
1085    An agent for use by the Dispatcher class to perform a task.  An agent wraps
1086    around an AgentTask mainly to associate the AgentTask with the queue_entry
1087    and host ids.
1088
1089    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
1092        is_done() - Returns True if the task is finished.
1093        abort() - Called when an abort has been requested.  The task must
1094                set its aborted attribute to True if it actually aborted.
1095
1096    The following attributes are required on all task objects:
1097        aborted - bool, True if this task was aborted.
1098        success - bool, True if this task succeeded.
1099        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1100        host_ids - A sequence of Host ids this task represents.
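
    A minimal task satisfying this interface (an illustrative sketch only, not
    a real scheduler task) might look like:

        class NoopTask(object):
            def __init__(self):
                self.aborted = False
                self.success = True
                self.queue_entry_ids = []
                self.host_ids = []

            def poll(self):
                pass

            def is_done(self):
                return True

            def abort(self):
                self.aborted = True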
1101    """
1102
1103
1104    def __init__(self, task):
1105        """
1106        @param task: An instance of an AgentTask.
1107        """
1108        self.task = task
1109
        # This is filled in by Dispatcher.add_agent_task()
1111        self.dispatcher = None
1112
1113        self.queue_entry_ids = task.queue_entry_ids
1114        self.host_ids = task.host_ids
1115
1116        self.started = False
1117        self.finished = False
1118
1119
1120    def tick(self):
1121        self.started = True
1122        if not self.finished:
1123            self.task.poll()
1124            if self.task.is_done():
1125                self.finished = True
1126
1127
1128    def is_done(self):
1129        return self.finished
1130
1131
1132    def abort(self):
1133        if self.task:
1134            self.task.abort()
1135            if self.task.aborted:
1136                # tasks can choose to ignore aborts
1137                self.finished = True
1138
1139
1140class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
1141    """
1142    Common functionality for QueueTask and HostlessQueueTask
1143    """
1144    def __init__(self, queue_entries):
1145        super(AbstractQueueTask, self).__init__()
1146        self.job = queue_entries[0].job
1147        self.queue_entries = queue_entries
1148
1149
1150    def _keyval_path(self):
1151        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1152
1153
1154    def _write_control_file(self, execution_path):
1155        control_path = _drone_manager.attach_file_to_execution(
1156                execution_path, self.job.control_file)
1157        return control_path
1158
1159
1160    # TODO: Refactor into autoserv_utils. crbug.com/243090
1161    def _command_line(self):
1162        execution_path = self.queue_entries[0].execution_path()
1163        control_path = self._write_control_file(execution_path)
1164        hostnames = ','.join(entry.host.hostname
1165                             for entry in self.queue_entries
1166                             if not entry.is_hostless())
1167
1168        execution_tag = self.queue_entries[0].execution_tag()
1169        params = _autoserv_command_line(
1170            hostnames,
1171            ['-P', execution_tag, '-n',
1172             _drone_manager.absolute_path(control_path)],
1173            job=self.job, verbose=False)
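        # Illustrative shape of the resulting parameter list (paths and tags
        # below are hypothetical; the exact flags come from autoserv_utils):
        #   ['.../server/autoserv', ..., '-P', '123-user/host1', '-n',
        #    '/usr/local/autotest/results/drone_tmp/control', ...]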
1174        if self.job.is_image_update_job():
1175            params += ['--image', self.job.update_image_path]
1176
1177        return params
1178
1179
1180    @property
1181    def num_processes(self):
1182        return len(self.queue_entries)
1183
1184
1185    @property
1186    def owner_username(self):
1187        return self.job.owner
1188
1189
1190    def _working_directory(self):
1191        return self._get_consistent_execution_path(self.queue_entries)
1192
1193
1194    def prolog(self):
1195        queued_key, queued_time = self._job_queued_keyval(self.job)
1196        keyval_dict = self.job.keyval_dict()
1197        keyval_dict[queued_key] = queued_time
1198        group_name = self.queue_entries[0].get_group_name()
1199        if group_name:
1200            keyval_dict['host_group_name'] = group_name
1201        self._write_keyvals_before_job(keyval_dict)
1202        for queue_entry in self.queue_entries:
1203            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
1204            queue_entry.set_started_on_now()
1205
1206
1207    def _write_lost_process_error_file(self):
1208        error_file_path = os.path.join(self._working_directory(), 'job_failure')
1209        _drone_manager.write_lines_to_file(error_file_path,
1210                                           [_LOST_PROCESS_ERROR])
1211
1212
1213    def _finish_task(self):
1214        if not self.monitor:
1215            return
1216
1217        self._write_job_finished()
1218
1219        if self.monitor.lost_process:
1220            self._write_lost_process_error_file()
1221
1222
1223    def _write_status_comment(self, comment):
1224        _drone_manager.write_lines_to_file(
1225            os.path.join(self._working_directory(), 'status.log'),
1226            ['INFO\t----\t----\t' + comment],
1227            paired_with_process=self.monitor.get_process())
1228
1229
1230    def _log_abort(self):
1231        if not self.monitor or not self.monitor.has_process():
1232            return
1233
1234        # build up sets of all the aborted_by and aborted_on values
1235        aborted_by, aborted_on = set(), set()
1236        for queue_entry in self.queue_entries:
1237            if queue_entry.aborted_by:
1238                aborted_by.add(queue_entry.aborted_by)
1239                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1240                aborted_on.add(t)
1241
1242        # extract some actual, unique aborted by value and write it out
1243        # TODO(showard): this conditional is now obsolete, we just need to leave
1244        # it in temporarily for backwards compatibility over upgrades.  delete
1245        # soon.
1246        assert len(aborted_by) <= 1
1247        if len(aborted_by) == 1:
1248            aborted_by_value = aborted_by.pop()
1249            aborted_on_value = max(aborted_on)
1250        else:
1251            aborted_by_value = 'autotest_system'
1252            aborted_on_value = int(time.time())
1253
1254        self._write_keyval_after_job("aborted_by", aborted_by_value)
1255        self._write_keyval_after_job("aborted_on", aborted_on_value)
1256
1257        aborted_on_string = str(datetime.datetime.fromtimestamp(
1258            aborted_on_value))
1259        self._write_status_comment('Job aborted by %s on %s' %
1260                                   (aborted_by_value, aborted_on_string))
1261
1262
1263    def abort(self):
1264        super(AbstractQueueTask, self).abort()
1265        self._log_abort()
1266        self._finish_task()
1267
1268
1269    def epilog(self):
1270        super(AbstractQueueTask, self).epilog()
1271        self._finish_task()
1272
1273
1274class QueueTask(AbstractQueueTask):
1275    def __init__(self, queue_entries):
1276        super(QueueTask, self).__init__(queue_entries)
1277        self._set_ids(queue_entries=queue_entries)
1278
1279
1280    def prolog(self):
1281        self._check_queue_entry_statuses(
1282                self.queue_entries,
1283                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1284                                      models.HostQueueEntry.Status.RUNNING),
1285                allowed_host_statuses=(models.Host.Status.PENDING,
1286                                       models.Host.Status.RUNNING))
1287
1288        super(QueueTask, self).prolog()
1289
1290        for queue_entry in self.queue_entries:
1291            self._write_host_keyvals(queue_entry.host)
1292            queue_entry.host.set_status(models.Host.Status.RUNNING)
1293            queue_entry.host.update_field('dirty', 1)
1294
1295
1296    def _finish_task(self):
1297        super(QueueTask, self)._finish_task()
1298
1299        for queue_entry in self.queue_entries:
1300            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
1301            queue_entry.host.set_status(models.Host.Status.RUNNING)
1302
1303
1304    def _command_line(self):
1305        invocation = super(QueueTask, self)._command_line()
1306        # Check if server-side packaging is needed.
1307        if (_enable_ssp_container and
1308            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
1309            self.job.require_ssp != False):
1310            invocation += ['--require-ssp']
1311            keyval_dict = self.job.keyval_dict()
1312            test_source_build = keyval_dict.get('test_source_build', None)
1313            if test_source_build:
1314                invocation += ['--test_source_build', test_source_build]
1315        if self.job.parent_job_id:
1316            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
1317        return invocation + ['--verify_job_repo_url']
1318
1319
1320class HostlessQueueTask(AbstractQueueTask):
1321    def __init__(self, queue_entry):
1322        super(HostlessQueueTask, self).__init__([queue_entry])
1323        self.queue_entry_ids = [queue_entry.id]
1324
1325
1326    def prolog(self):
1327        super(HostlessQueueTask, self).prolog()
1328
1329
1330    def _finish_task(self):
1331        super(HostlessQueueTask, self)._finish_task()
1332
        # When a job is added to the database, its initial status is always
        # Starting. In a scheduler tick, the scheduler finds all jobs in
        # Starting status and checks if any of them can be started. If the
        # scheduler hits some limit, e.g., max_hostless_jobs_per_drone, it will
        # leave these jobs in Starting status. Otherwise, the jobs' status will
        # be changed to Running, and an autoserv process will be started on a
        # drone for each of these jobs.
        # If the entry is still in status Starting, the process has not started
        # yet. Therefore, there is no need to parse and collect logs. Without
        # this check, an exception would be raised by the scheduler as
        # execution_subdir for this queue entry does not have a value yet.
1344        hqe = self.queue_entries[0]
1345        if hqe.status != models.HostQueueEntry.Status.STARTING:
1346            hqe.set_status(models.HostQueueEntry.Status.PARSING)
1347
1348
1349if __name__ == '__main__':
1350    main()
1351