#!/usr/bin/python
"""Unit tests for the scheduler: dispatching, throttling, pidfile monitoring,
agents and agent tasks (monitor_db, pidfile_monitor, agent_task)."""

import gc
import time
import unittest

import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

# Copied onto the test database's `debug` attribute in _set_monitor_stubs.
_DEBUG = False


class DummyAgentTask(object):
    """Minimal AgentTask stand-in: one process, no drone-hostname limit."""
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        """Return None, i.e. no restriction on which drones may be used."""
        return None


class DummyAgent(object):
    """Agent stand-in that records tick() calls and has a settable done flag."""
    started = False
    _is_done = False
    host_ids = ()
    # Class-level dict shared by all instances; no test here mutates it.
    hostnames = {}
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        """Mark this agent as started."""
        self.started = True


    def is_done(self):
        """Return the flag last set with set_done() (initially False)."""
        return self._is_done


    def set_done(self, done):
        """Set the value is_done() will return."""
        self._is_done = done


class IsRow(mock.argument_comparator):
    """Mock argument comparator matching a row whose first field is row_id."""
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    """Mock argument comparator matching a monitor_db.Agent whose queue
    holds exactly one task, equal to the given task."""
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


    def __str__(self):
        return 'agent with task %s' % (self._task,)


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    """Give agent_or_task host/queue-entry ids and a hostname map.

    Args:
        agent_or_task: object to annotate in place.
        id_list: ids to use; defaults to a new empty list.

    host_ids and queue_entry_ids are bound to the *same* list object, and
    every id maps to the placeholder hostname '192.168.1.1'.
    """
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
    agent_or_task.hostnames = dict((host_id, '192.168.1.1')
                                   for host_id in id_list)


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """Fixture wiring a test database and stubbed drone manager into
    monitor_db/scheduler_models, plus a fresh Dispatcher per test."""
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute raw SQL against the test database."""
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        """Install a fake global config, the test database and drone-manager
        stubs, then (re)initialize monitor_db and scheduler_models globals."""
        self.mock_config = global_config.FakeGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)
        self.god.stub_with(monitor_db, '_db', self._database)

        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')
        self.god.stub_with(drone_manager.instance(), 'initialize',
                           lambda *args: None)
        self.god.stub_with(drone_manager.instance(), 'execute_actions',
                           lambda *args: None)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._set_global_config_values()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        # Run the mixin teardown even if disconnecting fails, so one broken
        # test does not leave its fixture behind for the next one.
        try:
            self._database.disconnect()
        finally:
            self._frontend_common_teardown()


    def _set_global_config_values(self):
        """Set global_config values to suit unittest needs."""
        self.mock_config.set_config_value(
                'SCHEDULER', 'inline_host_acquisition', True)


    def _update_hqe(self, set, where=''):
        """Run an UPDATE on afe_host_queue_entries.

        Args:
            set: raw SQL SET clause. Callers pass it by keyword, so the name
                is kept even though it shadows the builtin.
            where: optional raw SQL WHERE clause.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        """Return queued, inactive, incomplete HQEs, highest priority first.

        Installed in place of BaseDispatcher._get_pending_queue_entries.
        """
        # NOTE(review): ifnull(nullif(x, NULL), x) reduces to x; presumably
        # kept to mirror the production query verbatim.
        query_string = ('afe_jobs.priority DESC, '
                        'ifnull(nullif(host_id, NULL), host_id) DESC, '
                        'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                        'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
    """Scheduling tests that record (job_id, host_id) pairs instead of
    running pre-job tasks."""
    # Rebound to a fresh list in setUp so tests never share records.
    _jobs_scheduled = []


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling; fail if the same pair is scheduled twice."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was scheduled and consume its record."""
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected\n'
                        'Jobs scheduled: %s' %
                        (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set.

        Matching records are consumed whether or not the assertion passes.
        """
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling was not consumed by an assert."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        """Move host_id into meta_host for every HQE of the given jobs."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        """Set locked=1 on the given host."""
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        """Tick the host scheduler, then try to schedule new jobs twice."""
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        """Two single-host jobs land on their own hosts (metahost or not)."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        """Higher priority wins a host; otherwise earlier jobs go first."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready and unlocked get scheduled; invalid
        hosts still accept non-metahost entries but not metahost ones.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled."""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        """A host with no ACL group membership receives no job."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""

        self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')

        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status
        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles the total number of running processes
    (DroneManager.max_runnable_processes is faked from _MAX_RUNNING).
    """
    _MAX_RUNNING = 3
    # NOTE(review): not referenced by any test in this class.
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        # scheduler_config.config is module-global: restore the previous
        # value after the test so the override cannot leak into other tests.
        config = scheduler_config.config
        self.addCleanup(setattr, config, 'max_processes_per_drone',
                        config.max_processes_per_drone)
        config.max_processes_per_drone = self._MAX_RUNNING

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            # Capacity left = limit minus processes of started, unfinished
            # agents.
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        """Create num_agents DummyAgents and hand the dispatcher a copy."""
        self._agents = [DummyAgent() for _ in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_ticks(self):
        """Let the dispatcher handle its agents four times."""
        for _ in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        """Assert each indexed agent's started flag equals is_started."""
        for i in indexes:
            self.assertTrue(self._agents[i].started == is_started,
                            'Agent %d %sstarted' %
                            (i, 'not ' if is_started else ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_ticks()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
    """Tests PidfileRunMonitor against a mocked DroneManager."""
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
                drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        """Replacement for pidfile_monitor._get_pidfile_timeout_secs."""
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        """Expect one get_pidfile_contents call returning the given values.

        When pid is None the returned contents carry no process.
        """
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Compare the monitor's state with expectations, then verify that
        all recorded mock calls were played back."""
        if expected_pid is None:
            self.assertEqual(self.monitor._state.process, None)
        else:
            self.assertEqual(self.monitor._state.process.pid, expected_pid)
        self.assertEqual(self.monitor._state.exit_status, expected_exit_status)
        self.assertEqual(self.monitor._state.num_tests_failed,
                         expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        """Return None while the process has no exit status, else the
        class's num_tests_failed."""
        if expected_exit_status is None:
            return None
        return self.num_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        """Expect one is_process_running call returning is_running."""
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        # Pretend the monitor started just past the pidfile timeout.
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    """Tests monitor_db.Agent driving mocked AgentTasks."""
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        """Return a mocked AgentTask with one process and empty id lists."""
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        """Wrap task in an Agent attached to the mock dispatcher."""
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        """Tick agent until it reports done."""
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        """Abort an agent before its first tick; when ignore_abort is set
        the task stays un-aborted and must be polled to completion."""
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
    """Tests running jobs and scheduling their host queue entries."""
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        """Run job 1 via HQE 1, schedule running HQEs and check the result.

        Args:
            expect_agent: whether the dispatcher should now hold an agent.
            expect_starting: expect HQE 1 to end up Starting.
            expect_pending: expect Pending (ignored if expect_starting).
                With neither flag set, Verifying is expected.

        Returns:
            The first agent's task when expect_agent, else None.
        """
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        self.assertTrue(queue_entry.job is job)
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_running_host_queue_entries()
        # Don't index blindly: with no agent the expect_agent=False branch
        # below must still be reachable instead of raising IndexError.
        agents = self._dispatcher._agents
        agent = agents[0] if agents else None

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEqual(expected_status, actual_status)

        if not expect_agent:
            self.assertEqual(agent, None)
            return

        self.assertTrue(isinstance(agent, monitor_db.Agent))
        self.assertTrue(agent.task)
        return agent.task


    def test_run_synchronous_ready(self):
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assertTrue(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEqual(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEqual(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        self.assertTrue(queue_entry.job is job)
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assertTrue(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(scheduler_lib.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to previously schedule hostless job, and no additional agent
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))


class TopLevelFunctionsTest(unittest.TestCase):
    """Tests for module-level helpers in monitor_db."""
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        # Command lines are compared as sets, so argument order is ignored.
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          '--lab', 'True',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    """Tests drone-set selection in agent_task.AgentTask."""
    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        """Create four drones in sets '1' and '2', an empty set '3', and one
        job per set plus one with no drone set, all on the first host.

        Returns:
            ((hqe_1, hqe_2, hqe_3, hqe_4), a fresh AgentTask).
        """
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = [models.Drone.objects.create(hostname=str(x))
                  for x in xrange(4)]

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed([]))
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                # Deliberately bypass SpecialAgentTask.__init__ and run only
                # the next initializer in the MRO.
                super(agent_task.SpecialAgentTask, self).__init__()

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        """Expect one DroneSet.get_default() call and return the sentinel
        that the default set's get_drone_hostnames() yields."""
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        # The get_default() expectation is still registered (and verified by
        # check_playback), but the user's own drone set must win.
        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_abort_HostlessQueueTask(self):
        hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
                                         'HostQueueEntry')
        # If hqe is still in STARTING status, aborting the task should finish
        # without changing hqe's status.
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()

        # If hqe is in RUNNING status, aborting the task should change hqe's
        # status to Parsing, so FinalReparseTask can be scheduled.
        hqe.set_status.expect_call('Parsing')
        hqe.status = models.HostQueueEntry.Status.RUNNING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()


if __name__ == '__main__':
    unittest.main()