#!/usr/bin/python
"""
Autotest scheduler
"""

import datetime
import gc
import logging
import optparse
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, rpc_utils
from autotest_lib.scheduler import agent_task, drone_manager
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils
from autotest_lib.server import utils as server_utils
from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils


BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db_manager = None
_db = None
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)

def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that the scheduler is under '
                      'test and should use a dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and may use a database '
                            'that is not hosted on localhost. If not set, '
                            'the scheduler will fail when the database is '
                            'not on localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
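    # (The reporter is stopped via metadata_reporter.abort() in the shutdown
    # sequence at the end of this function.)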
    metadata_reporter.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            if minimum_tick_sec > curr_tick_sec:
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()


def handle_signal(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If the server database is enabled, check that this server has the
    # `scheduler` role. If it does not, an exception is raised and the
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    drone_list = system_utils.get_drones()
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host',
            default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
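
    Illustrative call (mirrors the use in AbstractQueueTask._command_line
    below; values are per-job and shown here only as an example):
        _autoserv_command_line(hostnames,
                               ['-P', execution_tag, '-n', control_path],
                               job=job, verbose=False)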
    """
    command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
                machines, results_directory=drone_manager.WORKING_DIRECTORY,
                extra_args=extra_args, job=job, queue_entry=queue_entry,
                verbose=verbose, in_lab=True)
    return command


class BaseDispatcher(object):


    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set, the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. The scheduler tick
        # will run a refresh task first; if there is any action left in the
        # queue, refresh will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def _log_tick_msg(self, msg):
        if self._tick_debug:
            logging.debug(msg)


    def _log_extra_msg(self, msg):
        if self._extra_debugging:
            logging.debug(msg)


    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
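
        The per-step _log_tick_msg() lines below are only emitted when
        tick_debug is enabled in the SCHEDULER section of the global config,
        e.g. (illustrative stanza):
            [SCHEDULER]
            tick_debug: True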
        """
        timer = autotest_stats.Timer('scheduler.tick')
        system_utils.DroneCache.refresh()
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to drone._calls should come after the drone refresh has
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated with it, there is
            # no need to keep it in the dictionary. Otherwise the scheduler
            # will keep an unnecessarily big dictionary until it is restarted.
            if not agent_dict[object_id]:
                agent_dict.pop(object_id)


    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to (1) a dict keyed by the queue_entry_ids given to it and
        (2) a dict keyed by the host_ids given to it. So theoretically, a host
        can have any number of agents associated with it, and each of them can
        have any special agent task, though in practice we never see > 1
        agent/task per host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqes in the specified states.

        Loosely this translates to taking an hqe in one of the specified
        states, say parsing, and getting an AgentTask for it, like the
        FinalReparseTask, through _get_agent_task_for_queue_entry. Each queue
        entry can only have one agent task at a time, but there might be
        multiple queue entries in the group.

        @return: A list of AgentTasks.
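
        For example, a Running entry maps to a QueueTask (or a
        HostlessQueueTask when hostless) and a Parsing entry maps to
        postjob_task.FinalReparseTask; see _get_agent_task_for_queue_entry
        below for the full mapping.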
        """
        # host queue entry statuses handled directly by AgentTasks
        # (Verifying is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it's passed on to the special
        agent task, which creates a HostQueueEntry and saves it as its
        queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise scheduler_lib.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        hqe. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not _inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', "
                "'Provisioning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next tick
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not _inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries


    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs
        # in Starting, Running, Gathering, Parsing or Archiving. To do this,
        # the execution_subdir is needed. Therefore it must be set before
        # entering one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, restoring agents after its restart
        # would fail.
        # Is there a way to get a status in one of these states without going
        # through this code? The following cases are possible:
        #   - If it's aborted before being started:
        #     the active bit will be 0, so there's nothing to parse; it will
        #     just be set to completed by _find_aborting. Critical statuses
        #     are skipped.
        #   - If it's aborted or it fails after being started:
        #     It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
            subject = 'Host with agents assigned to an HQE'
            message = ('HQE: %s assigned host %s, but the host has '
                       'agent: %s for queue_entry %s. The HQE '
                       'will have to try and acquire a host next tick ' %
                       (queue_entry, host.hostname, host_agent_task,
                        host_agent_task.queue_entry))
            email_manager.manager.enqueue_notify_email(subject, message)
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)


    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for them.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))

        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
        if not host_jobs:
            return
        if not _inline_host_acquisition:
            message = ('Found %s jobs that need hosts though '
                       '_inline_host_acquisition=%s. Will acquire hosts.' %
                       ([str(job) for job in host_jobs],
                        _inline_host_acquisition))
            email_manager.manager.enqueue_notify_email(
                    'Processing unexpected host acquisition requests', message)
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
                                       new_jobs_with_hosts)
        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
                                       new_jobs_need_hosts -
                                       new_jobs_with_hosts)


    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask, for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished.
        The dispatcher runs the agent_task, as well as other agents in its
        _agents member, through _handle_agents, by calling each Agent's
        tick().

        This method creates an agent for each HQE in one of (starting,
        running, gathering, parsing, archiving) states, and adds it to the
        dispatcher so it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor
            # complete. For all currently active tasks this will happen
            # through the agent, but we need to manually update the special
            # tasks that haven't started yet, because they don't have agents.
            models.SpecialTask.objects.filter(
                    is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # the epilog. The complete bit is set in several places, and in
        # general a hanging job will have is_active=1 is_complete=0, while a
        # special task which completed will have is_active=0 is_complete=1.
        # To check aborts we directly check active because the complete bit
        # is set in several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need an hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()


    def _can_start_agent(self, agent, have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        return True


    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs each agent's task through
        agent.tick(), leading to:
            agent.start
                prolog -> AgentTask's prolog
                    For each queue entry:
                        sets host status/status to Running
                        sets started_on in afe_host_queue_entries
                run -> AgentTask's run
                    Creates a PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone manager's execute actions.
            poll -> AgentTask's/BaseAgentTask's poll
                Checks the monitor's exit_code.
                Executes the epilog if the task is finished.
                Executes the AgentTask's _finish_task.
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
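
        Agents whose tasks would exceed the drone manager's process limit are
        skipped for the tick via _can_start_agent; zero-process agents are
        always allowed to start.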
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_tick)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_tick)
        logging.info('%d running processes. %d added this tick.',
                     _drone_manager.total_running_processes(),
                     num_started_this_tick)


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)


class Dispatcher(SiteDispatcher):
    pass


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent wraps
    around an AgentTask mainly to associate the AgentTask with the queue_entry
    and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True


class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _command_line(self):
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)
        if self.job.is_image_update_job():
            params += ['--image', self.job.update_image_path]

        return params


    @property
    def num_processes(self):
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._working_directory(),
                                       'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to
        # leave it in temporarily for backwards compatibility over upgrades.
        # delete soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()


class QueueTask(AbstractQueueTask):
    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.
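        # Illustrative effect (values depend on the job): a server-side job
        # with require_ssp not set to False gains '--require-ssp' and, when
        # the test_source_build keyval is present, '--test_source_build
        # <build>'; '--parent_job_id <id>' is added for jobs with a parent,
        # and '--verify_job_repo_url' is always appended.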
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']


class HostlessQueueTask(AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # When a job is added to the database, its initial status is always
        # Starting. In a scheduler tick, the scheduler finds all jobs in
        # Starting status and checks whether any of them can be started. If
        # the scheduler hits some limit, e.g., max_hostless_jobs_per_drone,
        # it will leave these jobs in Starting status. Otherwise, the jobs'
        # status will be changed to Running, and an autoserv process will be
        # started in a drone for each of these jobs.
        # If the entry is still in Starting status, the process has not
        # started yet, so there is no need to parse and collect logs. Without
        # this check, the scheduler would raise an exception because
        # execution_subdir for this queue entry does not have a value yet.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)


if __name__ == '__main__':
    main()
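
# Example invocation (illustrative; the results directory is site-specific):
#   monitor_db.py --recover-hosts /usr/local/autotest/results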