#!/usr/bin/python
#pylint: disable-msg=C0111

import datetime
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import monitor_db
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

# When True, the test database connection is put into debug mode.
_DEBUG = False


class BaseSchedulerModelsTest(unittest.TestCase,
                              frontend_test_utils.FrontendTestMixin):
    """Shared fixture for the scheduler_models tests.

    Runs the common frontend test setup and replaces scheduler_models._db
    with a test database connection for the duration of each test.
    """
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute a raw SQL statement against the test database.

        @param sql: The SQL statement to execute.
        """
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        """Connect the test database and stub it into scheduler_models."""
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        self.god.stub_with(scheduler_models, '_db', self._database)


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()


    def tearDown(self):
        # Always run the frontend teardown, even if disconnecting fails, so
        # one broken test cannot leak state into the tests that follow.
        try:
            self._database.disconnect()
        finally:
            self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        """Run an UPDATE against afe_host_queue_entries.

        @param set: SQL fragment placed after SET.
        @param where: Optional SQL fragment placed after WHERE; when empty
                the update has no WHERE clause.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


class DelayedCallTaskTest(unittest.TestCase):
    """Tests for scheduler_models.DelayedCallTask."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_delayed_call(self):
        test_time = self.god.create_mock_function('time')
        test_time.expect_call().and_return(33)
        test_time.expect_call().and_return(34.01)
        test_time.expect_call().and_return(34.99)
        test_time.expect_call().and_return(35.01)
        def test_callback():
            test_callback.calls += 1
        test_callback.calls = 0
        delay_task = scheduler_models.DelayedCallTask(
                delay_seconds=2, callback=test_callback,
                now_func=test_time)  # time 33
        self.assertEqual(35, delay_task.end_time)
        delay_task.poll()  # activates the task and polls it once, time 34.01
        self.assertEqual(0, test_callback.calls, "callback called early")
        delay_task.poll()  # time 34.99
        self.assertEqual(0, test_callback.calls, "callback called early")
        delay_task.poll()  # time 35.01
        self.assertEqual(1, test_callback.calls)
        self.assertTrue(delay_task.is_done())
        self.assertTrue(delay_task.success)
        self.assertFalse(delay_task.aborted)
        self.god.check_playback()


    def test_delayed_call_abort(self):
        delay_task = scheduler_models.DelayedCallTask(
                delay_seconds=987654, callback=lambda : None)
        delay_task.abort()
        self.assertTrue(delay_task.aborted)
        self.assertTrue(delay_task.is_done())
        self.assertFalse(delay_task.success)
        self.god.check_playback()


class DBObjectTest(BaseSchedulerModelsTest):
    """Tests for scheduler_models.DBObject behavior via its subclasses."""

    def test_compare_fields_in_row(self):
        host = scheduler_models.Host(id=1)
        fields = list(host._fields)
        row_data = [getattr(host, fieldname) for fieldname in fields]
        self.assertEqual({}, host._compare_fields_in_row(row_data))
        row_data[fields.index('hostname')] = 'spam'
        self.assertEqual({'hostname': ('host1', 'spam')},
                         host._compare_fields_in_row(row_data))
        row_data[fields.index('id')] = 23
        self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
                         host._compare_fields_in_row(row_data))


    def test_compare_fields_in_row_datetime_ignores_microseconds(self):
        # Plain decimal literals: the legacy leading-zero octal form (07) is
        # a syntax error on Python 3 and has the same value here.
        datetime_with_us = datetime.datetime(2009, 10, 7, 12, 34, 56, 7890)
        datetime_without_us = datetime.datetime(2009, 10, 7, 12, 34, 56, 0)
        class TestTable(scheduler_models.DBObject):
            _table_name = 'test_table'
            _fields = ('id', 'test_datetime')
        tt = TestTable(row=[1, datetime_without_us])
        self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))


    def test_always_query(self):
        host_a = scheduler_models.Host(id=2)
        self.assertEqual(host_a.hostname, 'host2')
        self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '
                       'WHERE id=2')
        host_b = scheduler_models.Host(id=2, always_query=True)
        self.assertTrue(host_a is host_b, 'Cached instance not returned.')
        self.assertEqual(host_a.hostname, 'host2-updated',
                         'Database was not re-queried')

        # If either of these are called, a query was made when it shouldn't be.
        host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
        host_a._update_fields_from_row = host_a._compare_fields_in_row
        host_c = scheduler_models.Host(id=2, always_query=False)
        self.assertTrue(host_a is host_c, 'Cached instance not returned')


    def test_delete(self):
        host = scheduler_models.Host(id=3)
        host.delete()
        # Constructing the deleted host must fail with or without a re-query.
        self.assertRaises(scheduler_models.DBError, scheduler_models.Host,
                          id=3, always_query=False)
        self.assertRaises(scheduler_models.DBError, scheduler_models.Host,
                          id=3, always_query=True)


    def test_save(self):
        # Dummy Job to avoid creating one in the HostQueueEntry __init__.
        class MockJob(object):
            def __init__(self, id):
                pass
            def tag(self):
                return 'MockJob'
        self.god.stub_with(scheduler_models, 'Job', MockJob)
        hqe = scheduler_models.HostQueueEntry(
                new_record=True,
                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None,
                     None])
        hqe.save()
        new_id = hqe.id
        # Force a re-query and verify that the correct data was stored.
        scheduler_models.DBObject._clear_instance_cache()
        hqe = scheduler_models.HostQueueEntry(id=new_id)
        self.assertEqual(hqe.id, new_id)
        self.assertEqual(hqe.job_id, 1)
        self.assertEqual(hqe.host_id, 2)
        self.assertEqual(hqe.status, 'Queued')
        self.assertEqual(hqe.meta_host, None)
        self.assertEqual(hqe.active, False)
        self.assertEqual(hqe.complete, False)
        self.assertEqual(hqe.deleted, False)
        self.assertEqual(hqe.execution_subdir, '.')
        self.assertEqual(hqe.atomic_group_id, None)
        self.assertEqual(hqe.started_on, None)
        self.assertEqual(hqe.finished_on, None)


class HostTest(BaseSchedulerModelsTest):
    """Tests for scheduler_models.Host."""

    def test_cmp_for_sort(self):
        expected_order = [
                'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
                'host10', 'host11', 'yolkfolk']
        hostname_idx = list(scheduler_models.Host._fields).index('hostname')
        row = [None] * len(scheduler_models.Host._fields)
        hosts = []
        for hostname in expected_order:
            row[hostname_idx] = hostname
            hosts.append(scheduler_models.Host(row=row, new_record=True))

        host1 = hosts[expected_order.index('Host1')]
        host010 = hosts[expected_order.index('HOST010')]
        host10 = hosts[expected_order.index('host10')]
        host3 = hosts[expected_order.index('host3')]
        alice = hosts[expected_order.index('alice')]
        self.assertEqual(0, scheduler_models.Host.cmp_for_sort(host10, host10))
        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host10, host010))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host010, host10))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host10))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host010))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host10))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host010))
        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, host1))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host3))
        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(alice, host3))
        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, alice))
        self.assertEqual(0, scheduler_models.Host.cmp_for_sort(alice, alice))

        hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
        self.assertEqual(expected_order, [h.hostname for h in hosts])

        # Sorting from the reversed order must give the same result.
        hosts.reverse()
        hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
        self.assertEqual(expected_order, [h.hostname for h in hosts])


class HostQueueEntryTest(BaseSchedulerModelsTest):
    """Tests for scheduler_models.HostQueueEntry."""

    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
        """Create a job and return its single scheduler HostQueueEntry.

        @param dependency_labels: Labels to add to the job's
                dependency_labels.
        @param create_job_kwargs: Passed through to self._create_job().
        @return The one HostQueueEntry fetched for the new job; the test
                fails if the job does not have exactly one.
        """
        job = self._create_job(**create_job_kwargs)
        for label in dependency_labels:
            job.dependency_labels.add(label)
        hqes = list(scheduler_models.HostQueueEntry.fetch(
                where='job_id=%d' % job.id))
        self.assertEqual(1, len(hqes))
        return hqes[0]


    def _check_hqe_labels(self, hqe, expected_labels):
        """Assert that hqe.get_labels() yields exactly the expected names.

        @param hqe: The HostQueueEntry to inspect.
        @param expected_labels: Iterable of expected label names; order and
                duplicates are ignored.
        """
        expected_labels = set(expected_labels)
        label_names = set(label.name for label in hqe.get_labels())
        self.assertEqual(expected_labels, label_names)


    def test_get_labels_empty(self):
        hqe = self._create_hqe(hosts=[1])
        labels = list(hqe.get_labels())
        self.assertEqual([], labels)


    def test_get_labels_metahost(self):
        hqe = self._create_hqe(metahosts=[2])
        self._check_hqe_labels(hqe, ['label2'])


    def test_get_labels_dependancies(self):
        hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
                               metahosts=[1])
        self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])


    def setup_abort_test(self, agent_finished=True):
        """Setup the variables for testing abort method.

        @param agent_finished: True to mock agent is finished before aborting
                               the hqe.
        @return hqe, dispatcher: Mock object of hqe and dispatcher to be used
                               to test abort method.
        """
        hqe = self._create_hqe(hosts=[1])
        hqe.aborted = True
        hqe.complete = False
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.started_on = datetime.datetime.now()

        dispatcher = self.god.create_mock_class(monitor_db.BaseDispatcher,
                                                'BaseDispatcher')
        agent = self.god.create_mock_class(monitor_db.Agent, 'Agent')
        dispatcher.get_agents_for_entry.expect_call(hqe).and_return([agent])
        agent.is_done.expect_call().and_return(agent_finished)
        return hqe, dispatcher


    def test_abort_fail_with_unfinished_agent(self):
        """abort should fail if the hqe still has agent not finished.
        """
        hqe, dispatcher = self.setup_abort_test(agent_finished=False)
        self.assertIsNone(hqe.finished_on)
        with self.assertRaises(AssertionError):
            hqe.abort(dispatcher)
        self.god.check_playback()
        # abort failed, finished_on should not be set
        self.assertIsNone(hqe.finished_on)


    def test_abort_success(self):
        """abort should succeed if all agents for the hqe are finished.
        """
        hqe, dispatcher = self.setup_abort_test(agent_finished=True)
        self.assertIsNone(hqe.finished_on)
        hqe.abort(dispatcher)
        self.god.check_playback()
        self.assertIsNotNone(hqe.finished_on)


    def test_set_finished_on(self):
        """Test that finished_on is set when hqe completes."""
        for status in host_queue_entry_states.Status.values:
            hqe = self._create_hqe(hosts=[1])
            hqe.started_on = datetime.datetime.now()
            hqe.job.update_field('shard_id', 3)
            self.assertIsNone(hqe.finished_on)
            hqe.set_status(status)
            if status in host_queue_entry_states.COMPLETE_STATUSES:
                self.assertIsNotNone(hqe.finished_on)
                self.assertIsNone(hqe.job.shard_id)
            else:
                self.assertIsNone(hqe.finished_on)
                self.assertEqual(hqe.job.shard_id, 3)


class JobTest(BaseSchedulerModelsTest):
    """Tests for scheduler_models.Job and its pre-job special tasks."""

    def setUp(self):
        super(JobTest, self).setUp()
        # Initialized up front so the stub below never hits a missing
        # attribute if a special task is created before
        # _test_pre_job_tasks_helper() has run.
        self._tasks = []

        def _mock_create(**kwargs):
            # Save the task and record it so tests can inspect what was
            # created.
            task = models.SpecialTask(**kwargs)
            task.save()
            self._tasks.append(task)
        self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)


    def _test_pre_job_tasks_helper(self,
                            reboot_before=model_attributes.RebootBefore.ALWAYS):
        """
        Calls HQE._do_schedule_pre_job_tasks() and returns the list of special
        tasks created through the stubbed SpecialTask.objects.create.

        @param reboot_before: Value assigned to the in-memory job's
                reboot_before before the tasks are scheduled.
        @return List of the special tasks recorded during the call.
        """
        self._tasks = []
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        queue_entry.job.reboot_before = reboot_before
        queue_entry._do_schedule_pre_job_tasks()
        return self._tasks


    def test_job_request_abort(self):
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        job.request_abort()
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for hqe in django_hqes:
            self.assertTrue(hqe.aborted)


    def test__atomic_and_has_started__on_atomic(self):
        self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job.fetch('id = 1')[0]
        self.assertFalse(job._atomic_and_has_started())

        self._update_hqe("status='Pending'")
        self.assertFalse(job._atomic_and_has_started())
        self._update_hqe("status='Verifying'")
        self.assertFalse(job._atomic_and_has_started())
        self._update_hqe("status='Failed'")
        self.assertFalse(job._atomic_and_has_started())
        self._update_hqe("status='Stopped'")
        self.assertFalse(job._atomic_and_has_started())

        self._update_hqe("status='Starting'")
        self.assertTrue(job._atomic_and_has_started())
        self._update_hqe("status='Completed'")
        self.assertTrue(job._atomic_and_has_started())
        # NOTE(review): nothing is asserted after this update, and the
        # expected result for 'Aborted' is not evident from this file --
        # confirm the intended value and add an assertion.
        self._update_hqe("status='Aborted'")


    def test__atomic_and_has_started__not_atomic(self):
        self._create_job(hosts=[1, 2])
        job = scheduler_models.Job.fetch('id = 1')[0]
        self.assertFalse(job._atomic_and_has_started())
        self._update_hqe("status='Starting'")
        self.assertFalse(job._atomic_and_has_started())


    def _check_special_tasks(self, tasks, task_types):
        """Assert that the created tasks match the expected descriptions.

        @param tasks: Special tasks that were created.
        @param task_types: Sequence of (task_type, queue_entry_id) pairs,
                one per expected task. A false queue_entry_id (e.g. None)
                skips the queue entry check for that task.
        """
        self.assertEqual(len(tasks), len(task_types))
        for task, (task_type, queue_entry_id) in zip(tasks, task_types):
            self.assertEqual(task.task, task_type)
            self.assertEqual(task.host.id, 1)
            if queue_entry_id:
                self.assertEqual(task.queue_entry.id, queue_entry_id)


    def test_run_asynchronous(self):
        self._create_job(hosts=[1, 2])

        tasks = self._test_pre_job_tasks_helper()

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])


    def test_run_asynchronous_skip_verify(self):
        job = self._create_job(hosts=[1, 2])
        job.run_verify = False
        job.save()

        tasks = self._test_pre_job_tasks_helper()

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])


    def test_run_synchronous_verify(self):
        self._create_job(hosts=[1, 2], synchronous=True)

        tasks = self._test_pre_job_tasks_helper()

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])


    def test_run_synchronous_skip_verify(self):
        job = self._create_job(hosts=[1, 2], synchronous=True)
        job.run_verify = False
        job.save()

        tasks = self._test_pre_job_tasks_helper()

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])


    def test_run_asynchronous_do_not_reset(self):
        job = self._create_job(hosts=[1, 2])
        job.run_reset = False
        job.run_verify = False
        job.save()

        tasks = self._test_pre_job_tasks_helper()

        self.assertEqual(tasks, [])


    def test_run_synchronous_do_not_reset_no_RebootBefore(self):
        job = self._create_job(hosts=[1, 2], synchronous=True)
        job.reboot_before = model_attributes.RebootBefore.NEVER
        job.save()

        tasks = self._test_pre_job_tasks_helper(
                reboot_before=model_attributes.RebootBefore.NEVER)

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])


    # This test used to be a second definition of
    # test_run_asynchronous_do_not_reset, which silently replaced the test of
    # that name above so it never ran. The name now mirrors the synchronous
    # variant.
    def test_run_asynchronous_do_not_reset_no_RebootBefore(self):
        job = self._create_job(hosts=[1, 2], synchronous=False)
        job.reboot_before = model_attributes.RebootBefore.NEVER
        job.save()

        tasks = self._test_pre_job_tasks_helper(
                reboot_before=model_attributes.RebootBefore.NEVER)

        self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])


    def test_run_atomic_group_already_started(self):
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Starting', execution_subdir=''")

        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        # A real assertion rather than a bare assert, which is skipped when
        # Python runs with -O.
        self.assertIs(queue_entry.job, job)
        self.assertEqual(None, job.run(queue_entry))

        self.god.check_playback()


    def test_reboot_before_always(self):
        job = self._create_job(hosts=[1])
        job.reboot_before = model_attributes.RebootBefore.ALWAYS
        job.save()

        tasks = self._test_pre_job_tasks_helper()

        self._check_special_tasks(tasks, [
                (models.SpecialTask.Task.RESET, None)
            ])


    def _test_reboot_before_if_dirty_helper(self):
        """Create an IF_DIRTY job on host 1 and expect a single RESET task."""
        job = self._create_job(hosts=[1])
        job.reboot_before = model_attributes.RebootBefore.IF_DIRTY
        job.save()

        tasks = self._test_pre_job_tasks_helper()
        task_types = [(models.SpecialTask.Task.RESET, None)]

        self._check_special_tasks(tasks, task_types)


    def test_reboot_before_if_dirty(self):
        models.Host.smart_get(1).update_object(dirty=True)
        self._test_reboot_before_if_dirty_helper()


    def test_reboot_before_not_dirty(self):
        models.Host.smart_get(1).update_object(dirty=False)
        self._test_reboot_before_if_dirty_helper()


    def test_next_group_name(self):
        django_job = self._create_job(metahosts=[1])
        job = scheduler_models.Job(id=django_job.id)
        self.assertEqual('group0', job._next_group_name())

        for hqe in django_job.hostqueueentry_set.filter():
            hqe.execution_subdir = 'my_rack.group0'
            hqe.save()
        self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))


if __name__ == '__main__':
    unittest.main()