#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for frontend/afe/site_rpc_interface.py."""


import __builtin__
import datetime
import mox
import StringIO
import unittest

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models, model_logic, rpc_utils
from autotest_lib.client.common_lib import control_data, error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import lsbrelease_utils
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend.afe import rpc_interface, site_rpc_interface
from autotest_lib.server import utils
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.hosts import moblab_host


CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
SERVER = control_data.CONTROL_TYPE_NAMES.SERVER


class SiteRpcInterfaceTest(mox.MoxTestBase,
                           frontend_test_utils.FrontendTestMixin):
    """Unit tests for functions in site_rpc_interface.py.

    @var _NAME: fake suite name.
    @var _BOARD: fake board to reimage.
    @var _BUILD: fake build with which to reimage.
    @var _PRIORITY: fake priority with which to reimage.
    @var _TIMEOUT: fake suite timeout in hours; create_job_common is
                   expected to receive it converted to minutes.
    """
    _NAME = 'name'
    _BOARD = 'link'
    _BUILD = 'link-release/R36-5812.0.0'
    _PRIORITY = priorities.Priority.DEFAULT
    _TIMEOUT = 24


    def setUp(self):
        super(SiteRpcInterfaceTest, self).setUp()
        self._SUITE_NAME = site_rpc_interface.canonicalize_suite_name(
                self._NAME)
        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
        self._frontend_common_setup(fill_data=False)


    def tearDown(self):
        self._frontend_common_teardown()


    def _stubModuleAttr(self, attr_name, replacement):
        """Replace an attribute of site_rpc_interface for this test only.

        Plain assignment (site_rpc_interface.os = ...) would leak the
        replacement into every test that runs afterwards. Going through
        mox's stubout registry means mox.UnsetStubs(), which MoxTestBase
        runs after each test, restores the original object.

        @param attr_name: Name of the module-level attribute to replace.
        @param replacement: Object to install in its place.

        @returns replacement, so callers can record expectations on it.
        """
        self.mox.stubs.Set(site_rpc_interface, attr_name, replacement)
        return replacement


    def _setupDevserver(self):
        """Make ImageServer.resolve(_BUILD) return the mock devserver."""
        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
        dev_server.ImageServer.resolve(self._BUILD).AndReturn(self.dev_server)


    def _mockDevServerGetter(self, get_control_file=True):
        """Mock devserver resolution and, optionally, the control file getter.

        @param get_control_file: If True, also stub DevServerGetter.create so
                                 it returns self.getter, a mock
                                 DevServerGetter.
        """
        self._setupDevserver()
        if get_control_file:
            self.getter = self.mox.CreateMock(
                    control_file_getter.DevServerGetter)
            self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
                                     'create')
            control_file_getter.DevServerGetter.create(
                    mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)


    def _mockStageTestSuites(self):
        """Record a successful staging of the build's test_suites artifact."""
        self.dev_server.url().AndReturn('mox_url')
        self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
        self.dev_server.stage_artifacts(self._BUILD,
                                        ['test_suites']).AndReturn(True)


    def _mockSuiteControlFileLookup(self):
        """Record the getter lookup of the suite's control file.

        Requires _mockDevServerGetter() to have created self.getter.

        @returns the recorded getter call, so the caller can chain
                 AndReturn()/AndRaise() onto it.
        """
        self.dev_server.url().AndReturn('mox_url')
        self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
        return self.getter.get_control_file_contents_by_name(self._SUITE_NAME)


    def _mockRpcUtils(self, to_return, control_file_substring=''):
        """Fake out the autotest rpc_utils module with a mockable class.

        @param to_return: the value that rpc_utils.create_job_common() should
                          be mocked out to return.
        @param control_file_substring: A substring that is expected to appear
                                       in the control file output string that
                                       is passed to create_job_common.
                                       Default: ''
        """
        download_started_time = constants.DOWNLOAD_STARTED_TIME
        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
                                    mox.StrContains(self._BUILD)),
                            priority=self._PRIORITY,
                            timeout_mins=self._TIMEOUT*60,
                            max_runtime_mins=self._TIMEOUT*60,
                            control_type='Server',
                            control_file=mox.And(mox.StrContains(self._BOARD),
                                                 mox.StrContains(self._BUILD),
                                                 mox.StrContains(
                                                     control_file_substring)),
                            hostless=True,
                            keyvals=mox.And(mox.In(download_started_time),
                                            mox.In(payload_finished_time))
                            ).AndReturn(to_return)


    def testStageBuildFail(self):
        """Ensure that a failure to stage the desired build fails the RPC."""
        self._setupDevserver()

        self.dev_server.url().AndReturn('mox_url')
        self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
        self.dev_server.stage_artifacts(
                self._BUILD, ['test_suites']).AndRaise(
                dev_server.DevServerException())
        self.mox.ReplayAll()
        self.assertRaises(error.StageControlFileFailure,
                          site_rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          build=self._BUILD,
                          pool=None)


    def testGetControlFileFail(self):
        """Ensure that an empty suite control file fails the RPC."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndReturn(None)
        self.mox.ReplayAll()
        self.assertRaises(error.ControlFileEmpty,
                          site_rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          build=self._BUILD,
                          pool=None)


    def testGetControlFileListFail(self):
        """Ensure that a failure to list control files fails the RPC."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndRaise(error.NoControlFileList())
        self.mox.ReplayAll()
        self.assertRaises(error.NoControlFileList,
                          site_rpc_interface.create_suite_job,
                          name=self._NAME,
                          board=self._BOARD,
                          build=self._BUILD,
                          pool=None)


    def testBadNumArgument(self):
        """Ensure we handle bad values for the |num| argument."""
        # Non-integer values, including a numeric string, must be rejected.
        for bad_num in ('goo', [], '5'):
            self.assertRaises(error.SuiteArgumentException,
                              site_rpc_interface.create_suite_job,
                              name=self._NAME,
                              board=self._BOARD,
                              build=self._BUILD,
                              pool=None,
                              num=bad_num)


    def testCreateSuiteJobFail(self):
        """Ensure that failure to schedule the suite job fails the RPC."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        self._mockRpcUtils(-1)
        self.mox.ReplayAll()
        self.assertEqual(
                site_rpc_interface.create_suite_job(name=self._NAME,
                                                    board=self._BOARD,
                                                    build=self._BUILD,
                                                    pool=None),
                -1)


    def testCreateSuiteJobSuccess(self):
        """Ensures that success results in a successful RPC."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEqual(
                site_rpc_interface.create_suite_job(name=self._NAME,
                                                    board=self._BOARD,
                                                    build=self._BUILD,
                                                    pool=None),
                job_id)


    def testCreateSuiteJobNoHostCheckSuccess(self):
        """Ensures that success with check_hosts=False is a successful RPC."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEqual(
                site_rpc_interface.create_suite_job(name=self._NAME,
                                                    board=self._BOARD,
                                                    build=self._BUILD,
                                                    pool=None,
                                                    check_hosts=False),
                job_id)


    def testCreateSuiteIntegerNum(self):
        """Ensures an integer |num| is written into the suite control file."""
        self._mockDevServerGetter()
        self._mockStageTestSuites()
        self._mockSuiteControlFileLookup().AndReturn('f')

        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id, control_file_substring='num=17')
        self.mox.ReplayAll()
        self.assertEqual(
                site_rpc_interface.create_suite_job(name=self._NAME,
                                                    board=self._BOARD,
                                                    build=self._BUILD,
                                                    pool=None,
                                                    check_hosts=False,
                                                    num=17),
                job_id)


    def testCreateSuiteJobControlFileSupplied(self):
        """Ensure we can supply the control file to create_suite_job."""
        self._mockDevServerGetter(get_control_file=False)
        self._mockStageTestSuites()
        self.dev_server.url().AndReturn('mox_url')
        job_id = 5
        self._mockRpcUtils(job_id)
        self.mox.ReplayAll()
        self.assertEqual(
                site_rpc_interface.create_suite_job(
                        name='%s/%s' % (self._NAME, self._BUILD),
                        board=None,
                        build=self._BUILD,
                        pool=None,
                        control_file='CONTROL FILE'),
                job_id)


    def setIsMoblab(self, is_moblab):
        """Set utils.is_moblab result.

        @param is_moblab: Value to have utils.is_moblab to return.
        """
        self.mox.StubOutWithMock(utils, 'is_moblab')
        utils.is_moblab().AndReturn(is_moblab)


    def testMoblabOnlyDecorator(self):
        """Ensure the moblab only decorator gates functions properly."""
        self.setIsMoblab(False)
        self.mox.ReplayAll()
        self.assertRaises(error.RPCException,
                          site_rpc_interface.get_config_values)


    def testGetConfigValues(self):
        """Ensure that the config object is properly converted to a dict."""
        self.setIsMoblab(True)
        config_mock = self._stubModuleAttr('_CONFIG',
                                           self.mox.CreateMockAnything())
        config_mock.get_sections().AndReturn(['section1', 'section2'])
        config_mock.config = self.mox.CreateMockAnything()
        config_mock.config.items('section1').AndReturn([('item1', 'value1'),
                                                        ('item2', 'value2')])
        config_mock.config.items('section2').AndReturn([('item3', 'value3'),
                                                        ('item4', 'value4')])

        expected = {'section1' : [('item1', 'value1'),
                                  ('item2', 'value2')],
                    'section2' : [('item3', 'value3'),
                                  ('item4', 'value4')]}
        # Stub the serializer so this call is recorded as an expectation.
        # Without the stub it simply runs the real function, and the dict the
        # RPC builds is never checked.
        self.mox.StubOutWithMock(rpc_utils, 'prepare_for_serialization')
        rpc_utils.prepare_for_serialization(expected).AndReturn(expected)
        self.mox.ReplayAll()
        self.assertEqual(site_rpc_interface.get_config_values(), expected)


    def _mockReadFile(self, path, lines=()):
        """Mock out reading a file line by line.

        Requires __builtin__.open to already be stubbed with mox.

        @param path: Path of the file we are mock reading.
        @param lines: lines of the mock file that will be returned when
                      readline() is called.
        """
        mock_file = self.mox.CreateMockAnything()
        for line in lines:
            mock_file.readline().AndReturn(line)
        # The final readline() has no recorded value, so it returns None;
        # presumably the reader treats that falsy value as end of file.
        mock_file.readline()
        mock_file.close()
        open(path).AndReturn(mock_file)


    def testUpdateConfig(self):
        """Ensure that updating the config works as expected."""
        self.setIsMoblab(True)
        mock_os = self._stubModuleAttr('os', self.mox.CreateMockAnything())
        mock_os.path = self.mox.CreateMockAnything()

        self.mox.StubOutWithMock(__builtin__, 'open')
        self._mockReadFile(global_config.DEFAULT_CONFIG_FILE)

        self.mox.StubOutWithMock(lsbrelease_utils, 'is_moblab')
        lsbrelease_utils.is_moblab().AndReturn(True)

        self._mockReadFile(global_config.DEFAULT_MOBLAB_FILE,
                           ['[section1]', 'item1: value1'])

        mock_os.path.exists(
                site_rpc_interface._CONFIG.shadow_file).AndReturn(True)
        mock_shadow_file = self.mox.CreateMockAnything()
        shadow_file_contents = StringIO.StringIO()
        mock_shadow_file.__enter__().AndReturn(shadow_file_contents)
        mock_shadow_file.__exit__(mox.IgnoreArg(), mox.IgnoreArg(),
                                  mox.IgnoreArg())
        open(site_rpc_interface._CONFIG.shadow_file,
             'w').AndReturn(mock_shadow_file)
        mock_os.system('sudo reboot')

        self.mox.ReplayAll()
        site_rpc_interface.update_config_handler(
                {'section1' : [('item1', 'value1'),
                               ('item2', 'value2')],
                 'section2' : [('item3', 'value3'),
                               ('item4', 'value4')]})

        # item1 should not be in the new shadow config as its updated value
        # matches the original config's value.
        self.assertEqual(
                shadow_file_contents.getvalue(),
                '[section2]\nitem3 = value3\nitem4 = value4\n\n'
                '[section1]\nitem2 = value2\n\n')


    def testResetConfig(self):
        """Ensure that reset opens the shadow_config file for writing."""
        self.setIsMoblab(True)
        config_mock = self._stubModuleAttr('_CONFIG',
                                           self.mox.CreateMockAnything())
        config_mock.shadow_file = 'shadow_config.ini'
        self.mox.StubOutWithMock(__builtin__, 'open')
        mock_file = self.mox.CreateMockAnything()
        file_contents = self.mox.CreateMockAnything()
        mock_file.__enter__().AndReturn(file_contents)
        mock_file.__exit__(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
        open(config_mock.shadow_file, 'w').AndReturn(mock_file)
        mock_os = self._stubModuleAttr('os', self.mox.CreateMockAnything())
        mock_os.system('sudo reboot')
        self.mox.ReplayAll()
        site_rpc_interface.reset_config_settings()


    def testSetBotoKey(self):
        """Ensure that the botokey path supplied is copied correctly."""
        self.setIsMoblab(True)
        boto_key = '/tmp/boto'
        # Replace the whole os module first. Assigning .path on the real
        # module would clobber os.path for the entire process.
        mock_os = self._stubModuleAttr('os', self.mox.CreateMockAnything())
        mock_os.path = self.mox.CreateMockAnything()
        mock_os.path.exists(boto_key).AndReturn(True)
        mock_shutil = self._stubModuleAttr('shutil',
                                           self.mox.CreateMockAnything())
        mock_shutil.copyfile(
                boto_key, site_rpc_interface.MOBLAB_BOTO_LOCATION)
        self.mox.ReplayAll()
        site_rpc_interface.set_boto_key(boto_key)


    def testSetLaunchControlKey(self):
        """Ensure that the Launch Control key path supplied is copied correctly.
        """
        self.setIsMoblab(True)
        launch_control_key = '/tmp/launch_control'
        mock_os = self._stubModuleAttr('os', self.mox.CreateMockAnything())
        mock_os.path = self.mox.CreateMockAnything()
        mock_os.path.exists(launch_control_key).AndReturn(True)
        mock_shutil = self._stubModuleAttr('shutil',
                                           self.mox.CreateMockAnything())
        mock_shutil.copyfile(
                launch_control_key,
                moblab_host.MOBLAB_LAUNCH_CONTROL_KEY_LOCATION)
        mock_os.system('sudo restart moblab-devserver-init')
        self.mox.ReplayAll()
        site_rpc_interface.set_launch_control_key(launch_control_key)


    def _get_records_for_sending_to_master(self):
        """Return fake (jobs, hqes) record lists as a shard would upload them.

        Each list holds one record with id 1. That id is expected to match
        the job and host queue entry created by
        _send_records_to_master_helper in the otherwise empty test database.
        """
        return [{'control_file': 'foo',
                 'control_type': 1,
                 'created_on': datetime.datetime(2014, 8, 21),
                 'drone_set': None,
                 'email_list': '',
                 'max_runtime_hrs': 72,
                 'max_runtime_mins': 1440,
                 'name': 'dummy',
                 'owner': 'autotest_system',
                 'parse_failed_repair': True,
                 'priority': 40,
                 'reboot_after': 0,
                 'reboot_before': 1,
                 'run_reset': True,
                 'run_verify': False,
                 'synch_count': 0,
                 'test_retry': 10,
                 'timeout': 24,
                 'timeout_mins': 1440,
                 'id': 1
                 }], [{
                    'aborted': False,
                    'active': False,
                    'complete': False,
                    'deleted': False,
                    'execution_subdir': '',
                    'finished_on': None,
                    'started_on': None,
                    'status': 'Queued',
                    'id': 1
                }]


    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
                                          upload_jobs=(), upload_hqes=(),
                                          known_jobs=(), known_hosts=(),
                                          **kwargs):
        """Send a shard heartbeat and check what the master returns.

        @param shard_hostname: Hostname of the shard sending the heartbeat.
        @param upload_jobs: Job records the shard uploads.
        @param upload_hqes: Host queue entry records the shard uploads.
        @param known_jobs: Job objects the shard reports as already known.
        @param known_hosts: Host objects the shard reports as already known;
                            their ids and statuses are sent.
        @param kwargs: Expected jobs/hosts/hqes, forwarded to
                       _assert_shard_heartbeat_response.

        @returns shard_hostname.
        """
        known_job_ids = [job.id for job in known_jobs]
        known_host_ids = [host.id for host in known_hosts]
        known_host_statuses = [host.status for host in known_hosts]

        retval = site_rpc_interface.shard_heartbeat(
            shard_hostname=shard_hostname,
            jobs=upload_jobs, hqes=upload_hqes,
            known_job_ids=known_job_ids, known_host_ids=known_host_ids,
            known_host_statuses=known_host_statuses)

        self._assert_shard_heartbeat_response(shard_hostname, retval,
                                              **kwargs)

        return shard_hostname


    def _assert_shard_heartbeat_response(self, shard_hostname, retval,
                                         jobs=(), hosts=(), hqes=()):
        """Assert a heartbeat response contains exactly the expected records.

        Order matters: each comparison is list equality.

        @param shard_hostname: Hostname every returned job's shard must have.
        @param retval: Response dict from shard_heartbeat.
        @param jobs: Expected job objects.
        @param hosts: Expected host objects.
        @param hqes: Expected host queue entries, across all returned jobs.
        """
        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']

        expected_jobs = [
            (job.id, job.name, shard_hostname) for job in jobs]
        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
                         for job in retval_jobs]
        self.assertEqual(returned_jobs, expected_jobs)

        expected_hosts = [(host.id, host.hostname) for host in hosts]
        returned_hosts = [(host['id'], host['hostname'])
                          for host in retval_hosts]
        self.assertEqual(returned_hosts, expected_hosts)

        # HQEs are returned nested inside their jobs; flatten before comparing.
        retval_hqes = []
        for job in retval_jobs:
            retval_hqes += job['hostqueueentry_set']

        expected_hqes = [hqe.id for hqe in hqes]
        returned_hqes = [hqe['id'] for hqe in retval_hqes]
        self.assertEqual(returned_hqes, expected_hqes)


    def _send_records_to_master_helper(
        self, jobs, hqes, shard_hostname='host1',
        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
        """Create a job on shard 'host1', then upload records in a heartbeat.

        @param jobs: Job records to upload.
        @param hqes: Host queue entry records to upload.
        @param shard_hostname: Hostname the heartbeat claims to come from.
        @param exception_to_throw: Exception the heartbeat must raise, or
                                   None if it must succeed.
        @param aborted: If True, mark the job aborted on the master and
                        detach it from its shard before the upload.
        """
        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
                                          control_file='foo',
                                          control_type=SERVER,
                                          test_retry=10, hostless=True)
        job = models.Job.objects.get(pk=job_id)
        shard = models.Shard.objects.create(hostname='host1')
        job.shard = shard
        job.save()

        if aborted:
            job.hostqueueentry_set.update(aborted=True)
            job.shard = None
            job.save()

        if not exception_to_throw:
            self._do_heartbeat_and_assert_response(
                shard_hostname=shard_hostname,
                upload_jobs=jobs, upload_hqes=hqes)
        else:
            self.assertRaises(
                exception_to_throw,
                self._do_heartbeat_and_assert_response,
                shard_hostname=shard_hostname,
                upload_jobs=jobs, upload_hqes=hqes)


    def testSendingRecordsToMaster(self):
        """Send records to the master and ensure they are persisted."""
        jobs, hqes = self._get_records_for_sending_to_master()
        hqes[0]['status'] = 'Completed'
        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes, exception_to_throw=None)

        # Check the entry was actually written to db
        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
                         'Completed')


    def testSendingRecordsToMasterAbortedOnMaster(self):
        """Ensure records are persisted even if the job was aborted on master."""
        jobs, hqes = self._get_records_for_sending_to_master()
        hqes[0]['status'] = 'Completed'
        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)

        # Check the entry was actually written to db
        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
                         'Completed')


    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
        """Ensure records that belong to a different shard are rejected."""
        jobs, hqes = self._get_records_for_sending_to_master()
        models.Shard.objects.create(hostname='other_shard')
        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes, shard_hostname='other_shard')


    def testSendingRecordsToMasterJobHqeWithoutJob(self):
        """Ensure update for hqe without update for its job gets rejected."""
        _, hqes = self._get_records_for_sending_to_master()
        self._send_records_to_master_helper(
            jobs=[], hqes=hqes)


    def testSendingRecordsToMasterNotExistingJob(self):
        """Ensure update for non existing job gets rejected."""
        jobs, hqes = self._get_records_for_sending_to_master()
        jobs[0]['id'] = 3

        self._send_records_to_master_helper(
            jobs=jobs, hqes=hqes)


    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
                                     host_hostname='host1',
                                     label_name='board:lumpy'):
        """Create a label, plus a shard and an unleased host carrying it.

        @returns (shard, host, label) tuple.
        """
        label = models.Label.objects.create(name=label_name)

        shard = models.Shard.objects.create(hostname=shard_hostname)
        shard.labels.add(label)

        host = models.Host.objects.create(hostname=host_hostname, leased=False)
        host.labels.add(label)

        return shard, host, label


    def _createJobForLabel(self, label):
        """Create a client job that targets and depends on |label|.

        @returns the created models.Job.
        """
        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
                                          control_file='foo',
                                          control_type=CLIENT,
                                          meta_hosts=[label.name],
                                          dependencies=(label.name,))
        return models.Job.objects.get(id=job_id)


    def testShardHeartbeatFetchHostlessJob(self):
        """Create a hostless job and ensure it's not assigned to a shard."""
        _, host1, _ = self._createShardAndHostWithLabel(
            'shard1', 'host1', 'board:lumpy')

        models.Label.objects.create(name='bluetooth', platform=False)

        self._create_job(hostless=True)

        # Hostless jobs should be executed by the global scheduler.
        self._do_heartbeat_and_assert_response(hosts=[host1])


    def testShardRetrieveJobs(self):
        """Create jobs and retrieve them."""
        # should never be returned by heartbeat
        leased_host = models.Host.objects.create(hostname='leased_host',
                                                 leased=True)

        _, host1, lumpy_label = self._createShardAndHostWithLabel()
        shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
            'shard2', 'host2', 'board:grumpy')

        leased_host.labels.add(lumpy_label)

        job1 = self._createJobForLabel(lumpy_label)

        job2 = self._createJobForLabel(grumpy_label)

        job_completed = self._createJobForLabel(lumpy_label)
        # Job is already being run, so don't sync it
        job_completed.hostqueueentry_set.update(complete=True)
        job_completed.hostqueueentry_set.create(complete=False)

        job_active = self._createJobForLabel(lumpy_label)
        # Job is already started, so don't sync it
        job_active.hostqueueentry_set.update(active=True)
        job_active.hostqueueentry_set.create(complete=False, active=False)

        self._do_heartbeat_and_assert_response(
            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())

        self._do_heartbeat_and_assert_response(
            shard_hostname=shard2.hostname,
            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())

        host3 = models.Host.objects.create(hostname='host3', leased=False)
        host3.labels.add(lumpy_label)

        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1], hosts=[host3])


    def testResendJobsAfterFailedHeartbeat(self):
        """Create jobs, retrieve them, fail on client, fetch them again."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        job1 = self._createJobForLabel(lumpy_label)

        self._do_heartbeat_and_assert_response(
            jobs=[job1],
            hqes=job1.hostqueueentry_set.all(), hosts=[host1])

        # The shard does not report job1 as known, so it must be resubmitted.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])

        # Now it worked, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1])

        job1 = models.Job.objects.get(pk=job1.id)
        job1.hostqueueentry_set.all().update(complete=True)

        # Job is completed, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1])

        job2 = self._createJobForLabel(lumpy_label)

        # job2's creation was later, it should be returned now.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job2], hqes=job2.hostqueueentry_set.all())

        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1])

        job2 = models.Job.objects.get(pk=job2.pk)
        job2.hostqueueentry_set.update(aborted=True)
        # Setting a job to a complete status will set the shard_id to None in
        # scheduler_models. We have to emulate that here, because we use Django
        # models in tests.
        job2.shard = None
        job2.save()

        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1],
            jobs=[job2],
            hqes=job2.hostqueueentry_set.all())

        site_rpc_interface.delete_shard(hostname=shard1.hostname)

        self.assertRaises(
            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)

        job1 = models.Job.objects.get(pk=job1.id)
        lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
        host1 = models.Host.objects.get(pk=host1.id)

        self.assertIsNone(job1.shard)
        self.assertEqual(len(lumpy_label.shard_set.all()), 0)
        self.assertIsNone(host1.shard)
        self.assertEqual([s.task for s in host1.specialtask_set.all()],
                         ['Repair'])


    def testCreateListShard(self):
        """Retrieve a list of all shards."""
        lumpy_label = models.Label.objects.create(name='board:lumpy',
                                                  platform=True)
        stumpy_label = models.Label.objects.create(name='board:stumpy',
                                                   platform=True)

        shard_id = site_rpc_interface.add_shard(
            hostname='host1', labels='board:lumpy,board:stumpy')
        self.assertRaises(model_logic.ValidationError,
                          site_rpc_interface.add_shard,
                          hostname='host1', labels='board:lumpy,board:stumpy')
        shard = models.Shard.objects.get(pk=shard_id)
        self.assertEqual(shard.hostname, 'host1')
        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))

        self.assertEqual(site_rpc_interface.get_shards(),
                         [{'labels': ['board:lumpy','board:stumpy'],
                           'hostname': 'host1',
                           'id': 1}])


    def testResendHostsAfterFailedHeartbeat(self):
        """Check that master accepts resending updated records after failure."""
        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()

        # Send the host
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Send it again because previous one didn't persist correctly
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Now it worked, make sure it isn't sent again
        self._do_heartbeat_and_assert_response(known_hosts=[host1])


if __name__ == '__main__':
    unittest.main()