#!/usr/bin/python
# pylint: disable=missing-docstring
"""Unit tests for the scheduler's monitor_db module and its collaborators.

Covers dispatcher scheduling and throttling, the pidfile run monitor,
Agent abort handling, job scheduling through lucifer, the autoserv
command line builder and AgentTask drone-set selection.
"""

import time
import unittest

# NOTE(review): `common` and `setup_django_environment` are not referenced
# below; presumably they are imported for their import-time side effects.
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import luciferlib
from autotest_lib.scheduler import monitor_db, drone_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

# Copied onto the test database connection's `debug` attribute.
_DEBUG = False


class DummyAgentTask(object):
    """Minimal stand-in for the task attached to a DummyAgent."""
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        return None


class DummyAgent(object):
    """Fake agent that only records whether it was ticked and is done."""
    started = False
    _is_done = False
    host_ids = ()
    # Class-level dict shared by all instances; nothing in this file
    # mutates it.
    hostnames = {}
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        """Mark the agent as started."""
        self.started = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done


class IsRow(mock.argument_comparator):
    """Argument comparator matching a row whose first element is row_id."""

    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    """Argument comparator matching an Agent queueing exactly one task."""

    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    """Set host_ids, queue_entry_ids and hostnames on an agent or task.

    @param agent_or_task: object to set the attributes on.
    @param id_list: ids used for both host_ids and queue_entry_ids (the
            same list object is assigned to both); defaults to a new
            empty list. Every id is mapped to the hostname '192.168.1.1'.
    """
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
    agent_or_task.hostnames = dict((host_id, '192.168.1.1')
                                   for host_id in id_list)


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """Base class wiring monitor_db to a test database and stubbed drones."""
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute a raw SQL statement on the test database."""
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        """Stub global config, database handles and the drone manager."""
        self.mock_config = global_config.FakeGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)
        self.god.stub_with(monitor_db, '_db', self._database)

        self.god.stub_with(monitor_db.Dispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')
        self.god.stub_with(drone_manager.instance(), 'initialize',
                           lambda *args: None)
        self.god.stub_with(drone_manager.instance(), 'execute_actions',
                           lambda *args: None)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._set_global_config_values()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _set_global_config_values(self):
        """Set global_config values to suit unittest needs."""
        self.mock_config.set_config_value(
                'SCHEDULER', 'inline_host_acquisition', True)


    def _update_hqe(self, set, where=''):
        """Run an UPDATE on afe_host_queue_entries.

        @param set: SQL text placed after SET. The name shadows the builtin
                but callers pass it by keyword, so it is kept.
        @param where: optional SQL text placed after WHERE; when empty, all
                rows are updated.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        """Return queued, inactive, incomplete HostQueueEntries in order.

        Installed in place of Dispatcher._get_pending_queue_entries by
        _set_monitor_stubs().
        """
        query_string = ('afe_jobs.priority DESC, '
                        'ifnull(nullif(host_id, NULL), host_id) DESC, '
                        'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                        'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
    """Test which (job, host) pairs the dispatcher decides to schedule."""
    # Rebound to a fresh list per test in setUp().
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling, failing if the same pair was seen before."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was scheduled and consume its record."""
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected\n'
                        'Jobs scheduled: %s' %
                        (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been consumed."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        """Move host_id into meta_host for the entries of the given jobs."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        """Tick the host scheduler, then schedule new jobs twice."""
        self._dispatcher._host_scheduler.tick()
        for _ in range(2):  # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        """Basic scheduling: two single-host jobs each get their host."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1, 2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1)  # higher priority
        self._assert_job_scheduled_on(2, 2)  # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.

        Non-metahost entries are the exception: they are expected to be
        scheduled on an invalid host.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled."""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        """A host removed from all ACL groups must not be scheduled."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""

        self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')

        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status
        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        # Stub the limit rather than assigning it, so the shared
        # scheduler_config.config object does not keep this value after the
        # test. This relies on the inherited teardown undoing self.god
        # stubs, as every other stub in this file already does.
        self.god.stub_with(scheduler_config.config,
                           'max_processes_per_drone', self._MAX_RUNNING)

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            # Remaining capacity = limit minus processes of agents that
            # have started and are not yet done.
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        """Create num_agents DummyAgents and hand a copy to the dispatcher."""
        self._agents = [DummyAgent() for _ in range(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_ticks(self):
        for _ in range(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        """Assert each indexed agent's `started` flag equals is_started."""
        for i in indexes:
            self.assertTrue(self._agents[i].started == is_started,
                            'Agent %d %sstarted' %
                            (i, 'not ' if is_started else ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_ticks()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
    """Test PidfileRunMonitor against a mocked DroneManager."""
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        """Expect one get_pidfile_contents call returning these contents.

        @param pid: when not None, contents.process is set to a Process on
                'myhost' with this pid.
        @param exit_code: value for contents.exit_status.
        @param tests_failed: value for contents.num_tests_failed.
        @param use_second_read: expected use_second_read keyword argument.
        """
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Compare the monitor's state to expectations; verify playback."""
        if expected_pid is None:
            self.assertEqual(self.monitor._state.process, None)
        else:
            self.assertEqual(self.monitor._state.process.pid, expected_pid)
        self.assertEqual(self.monitor._state.exit_status,
                         expected_exit_status)
        self.assertEqual(self.monitor._state.num_tests_failed,
                         expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        """Return None for a None exit status, else num_tests_failed."""
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True)  # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        # Backdate the start time to just past the (stubbed) timeout.
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    """Test monitor_db.Agent abort handling with mocked tasks."""

    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        """Return a mock AgentTask with one process and empty id lists."""
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        """Tick the agent until it reports being done."""
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        """Abort an agent before its first tick.

        @param ignore_abort: when True the task reports aborted=False, so
                the test expects it to be polled once and finish with
                success; otherwise the task reports aborted=True.
        """
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
    """Test running jobs and handing starting jobs over to lucifer."""

    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False, lucifer=False):
        """Run job 1 via queue entry 1 and check the resulting status.

        @param expect_agent: when False, the first dispatcher agent is
                expected to be None.
        @param expect_starting: expect the entry to end up STARTING.
        @param expect_pending: expect PENDING (only consulted when
                expect_starting is False); otherwise VERIFYING is expected.
        @param lucifer: when True, return right after the status check
                without inspecting agents.

        @returns the task of the first dispatcher agent, or None when
                lucifer is True or expect_agent is False.
        """
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_running_host_queue_entries()

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEqual(expected_status, actual_status)

        if lucifer:
            # TODO(ayatane): Lucifer ruins the fragile expectations
            # here.  In particular, there won't be any agents.
            return
        agent = self._dispatcher._agents[0]
        if not expect_agent:
            self.assertEqual(agent, None)
            return

        self.assertTrue(isinstance(agent, monitor_db.Agent))
        self.assertTrue(agent.task)
        return agent.task


    def test_run_synchronous_ready(self):
        job = self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")
        self._test_run_helper(expect_starting=True, lucifer=True)
        self._assert_lucifer_called_with(job)


    def test_schedule_running_host_queue_entries_fail(self):
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assertTrue(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
            self._dispatcher._host_agents, [queue_entry.host.id],
            dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        # Verify that it doesn't raise any error.
        self._dispatcher._schedule_running_host_queue_entries()


    def test_schedule_hostless_job(self):
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self._assert_lucifer_called_with(job)


    def _assert_lucifer_called_with(self, job):
        """Assert _send_to_lucifer() spawns a handler for the given job.

        luciferlib.spawn_starting_job_handler is replaced by a recording
        fake for the duration of the call and always restored afterwards.

        @param job: job expected as the second argument of the first
                recorded spawn_starting_job_handler call.
        """
        calls = []
        # Create a thing we can assign attributes on.
        fake_drone = lambda: None
        fake_drone.hostname = lambda: 'localhost'
        def fake(manager, job):
            calls.append((manager, job))
            return fake_drone
        old = luciferlib.spawn_starting_job_handler
        try:
            luciferlib.spawn_starting_job_handler = fake
            self._dispatcher._send_to_lucifer()
        finally:
            luciferlib.spawn_starting_job_handler = old
        # Fail with a readable message instead of an IndexError below.
        self.assertTrue(calls, 'spawn_starting_job_handler was never called')
        self.assertEqual(calls[0][1], job)


class TopLevelFunctionsTest(unittest.TestCase):
    """Test module-level helper functions of monitor_db."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        # Command lines are compared as sets, so argument order and
        # duplicates are not checked.
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          '--lab', 'True',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    """Test AgentTask drone-set selection and HostlessQueueTask abort."""

    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        """Create drones, drone sets and one job per drone-set scenario.

        Drone set '1' holds drones '0' and '1', set '2' holds drones '2'
        and '3', and set '3' is empty. Jobs 1-3 use sets '1'-'3'; job 4 has
        its drone_set cleared.

        @returns a tuple ((hqe_1, hqe_2, hqe_3, hqe_4), task) holding the
                first queue entry of each job and a new AgentTask.
        """
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in range(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0', '1')),
                         task.get_drone_hostnames_allowed([]))
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                # super() is anchored at SpecialAgentTask, so that class's
                # own __init__ is skipped.
                super(agent_task.SpecialAgentTask, self).__init__()

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        """Expect one DroneSet.get_default() call returning a mock set.

        @returns the object the mock drone set's get_drone_hostnames()
                returns.
        """
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_abort_HostlessQueueTask(self):
        hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
                                         'HostQueueEntry')
        # If hqe is still in STARTING status, aborting the task should finish
        # without changing hqe's status.
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()

        # If hqe is in RUNNING status, aborting the task should change hqe's
        # status to Parsing, so FinalReparseTask can be scheduled.
        hqe.set_status.expect_call('Parsing')
        hqe.status = models.HostQueueEntry.Status.RUNNING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()


if __name__ == '__main__':
    unittest.main()