1#!/usr/bin/env python2
2# pylint: disable=missing-docstring
3
4import datetime
5import mox
6import unittest
7
8import common
9from autotest_lib.client.common_lib import control_data
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import priorities
13from autotest_lib.client.common_lib.cros import dev_server
14from autotest_lib.client.common_lib.test_utils import mock
15from autotest_lib.frontend import setup_django_environment
16from autotest_lib.frontend.afe import frontend_test_utils
17from autotest_lib.frontend.afe import model_logic
18from autotest_lib.frontend.afe import models
19from autotest_lib.frontend.afe import rpc_interface
20from autotest_lib.frontend.afe import rpc_utils
21from autotest_lib.server import frontend
22from autotest_lib.server import utils as server_utils
23from autotest_lib.server.cros import provision
24from autotest_lib.server.cros.dynamic_suite import constants
25from autotest_lib.server.cros.dynamic_suite import control_file_getter
26from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
27from autotest_lib.server.cros.dynamic_suite import suite_common
28
29
# Control-type names taken from control_data; CLIENT is what the tests in this
# file pass as `control_type` to rpc_interface.create_job.
CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
SERVER = control_data.CONTROL_TYPE_NAMES.SERVER

# Short alias for models.HostQueueEntry.Status.
_hqe_status = models.HostQueueEntry.Status
34
35
class ShardHeartbeatTest(mox.MoxTestBase, unittest.TestCase):
    """Reusable scenarios for testing rpc_interface.shard_heartbeat.

    The class defines no test* methods of its own. Subclasses build the
    shard/host/label fixtures and then call the _test*Helper methods.
    self._create_job is not defined here and has to be supplied by the
    subclass (or one of its mixins).
    """

    _PRIORITY = priorities.Priority.DEFAULT


    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
                                          upload_jobs=(), upload_hqes=(),
                                          known_jobs=(), known_hosts=(),
                                          **kwargs):
        """Send one heartbeat on behalf of a shard and verify the reply.

        @param shard_hostname: Hostname of the shard sending the heartbeat.
        @param upload_jobs: Forwarded to shard_heartbeat as `jobs`.
        @param upload_hqes: Forwarded to shard_heartbeat as `hqes`.
        @param known_jobs: Job objects whose ids are reported as known.
        @param known_hosts: Host objects whose ids and statuses are reported
                            as known.
        @param kwargs: Expectations handed to
                       _assert_shard_heartbeat_response (jobs, hosts, hqes,
                       incorrect_host_ids).

        @returns: shard_hostname, unchanged.
        """
        retval = rpc_interface.shard_heartbeat(
            shard_hostname=shard_hostname,
            jobs=upload_jobs, hqes=upload_hqes,
            known_job_ids=[job.id for job in known_jobs],
            known_host_ids=[host.id for host in known_hosts],
            known_host_statuses=[host.status for host in known_hosts])

        self._assert_shard_heartbeat_response(shard_hostname, retval,
                                              **kwargs)

        return shard_hostname


    def _assert_shard_heartbeat_response(self, shard_hostname, retval,
                                         jobs=None, hosts=None, hqes=None,
                                         incorrect_host_ids=None):
        """Check a shard_heartbeat return value against the expectations.

        Every expectation defaults to "nothing", i.e. the corresponding part
        of the response must be empty. Order is significant in all
        comparisons.

        @param shard_hostname: Hostname every returned job must be assigned to.
        @param retval: Dict returned by rpc_interface.shard_heartbeat; its
                       'hosts', 'jobs' and 'incorrect_host_ids' keys are read.
        @param jobs: Job objects expected in retval['jobs'].
        @param hosts: Host objects expected in retval['hosts'].
        @param hqes: Host queue entries expected across the
                     'hostqueueentry_set' of all returned jobs.
        @param incorrect_host_ids: Expected retval['incorrect_host_ids'].
        """
        # None (rather than a shared mutable list) marks "expect nothing".
        jobs = [] if jobs is None else jobs
        hosts = [] if hosts is None else hosts
        hqes = [] if hqes is None else hqes
        if incorrect_host_ids is None:
            incorrect_host_ids = []

        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
        retval_incorrect_hosts = retval['incorrect_host_ids']

        expected_jobs = [
            (job.id, job.name, shard_hostname) for job in jobs]
        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
                         for job in retval_jobs]
        self.assertEqual(returned_jobs, expected_jobs)

        expected_hosts = [(host.id, host.hostname) for host in hosts]
        returned_hosts = [(host['id'], host['hostname'])
                          for host in retval_hosts]
        self.assertEqual(returned_hosts, expected_hosts)

        # Queue entries are nested inside each serialized job.
        retval_hqes = []
        for job in retval_jobs:
            retval_hqes += job['hostqueueentry_set']

        expected_hqes = [hqe.id for hqe in hqes]
        returned_hqes = [hqe['id'] for hqe in retval_hqes]
        self.assertEqual(returned_hqes, expected_hqes)

        self.assertEqual(retval_incorrect_hosts, incorrect_host_ids)


    def _createJobForLabel(self, label):
        """Create a client job that targets `label`; return its Job model.

        The label is used both as the meta host and as the sole dependency.
        """
        job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY,
                                          control_file='foo',
                                          control_type=CLIENT,
                                          meta_hosts=[label.name],
                                          dependencies=(label.name,))
        return models.Job.objects.get(id=job_id)


    def _testShardHeartbeatFetchHostlessJobHelper(self, host1):
        """Create a hostless job and ensure it's not assigned to a shard."""
        # Both objects are created only for their database side effects; the
        # heartbeat below must return neither a job nor anything tied to them.
        models.Label.objects.create(name='bluetooth', platform=False)
        self._create_job(hostless=True)

        # Hostless jobs should be executed by the global scheduler.
        self._do_heartbeat_and_assert_response(hosts=[host1])


    def _testShardHeartbeatIncorrectHostsHelper(self, host1):
        """Ensure that hosts that don't belong to shard are determined."""
        host2 = models.Host.objects.create(hostname='test_host2', leased=False)

        # host2 should not belong to shard1. Ensure that if shard1 thinks host2
        # is a known host, then it is returned as invalid.
        self._do_heartbeat_and_assert_response(known_hosts=[host1, host2],
                                               incorrect_host_ids=[host2.id])


    def _testShardHeartbeatLabelRemovalRaceHelper(self, shard1, host1, label1):
        """Ensure correctness if label removed during heartbeat."""
        host2 = models.Host.objects.create(hostname='test_host2', leased=False)
        host2.labels.add(label1)
        self.assertIsNone(host2.shard)

        # In the middle of the assign_to_shard call, remove label1 from shard1.
        self.mox.StubOutWithMock(models.Host, '_assign_to_shard_nothing_helper')
        def remove_label():
            rpc_interface.remove_board_from_shard(shard1.hostname, label1.name)

        models.Host._assign_to_shard_nothing_helper().WithSideEffects(
            remove_label)
        self.mox.ReplayAll()

        # host1 is reported back as incorrect and host2 must stay unassigned.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1], hosts=[], incorrect_host_ids=[host1.id])
        host2 = models.Host.smart_get(host2.id)
        self.assertIsNone(host2.shard)


    def _testShardRetrieveJobsHelper(self, shard1, host1, label1, shard2,
                                     host2, label2):
        """Create jobs and retrieve them."""
        # should never be returned by heartbeat
        leased_host = models.Host.objects.create(hostname='leased_host',
                                                 leased=True)

        leased_host.labels.add(label1)

        job1 = self._createJobForLabel(label1)

        job2 = self._createJobForLabel(label2)

        job_completed = self._createJobForLabel(label1)
        # Job is already being run, so don't sync it
        job_completed.hostqueueentry_set.update(complete=True)
        job_completed.hostqueueentry_set.create(complete=False)

        job_active = self._createJobForLabel(label1)
        # Job is already started, so don't sync it
        job_active.hostqueueentry_set.update(active=True)
        job_active.hostqueueentry_set.create(complete=False, active=False)

        # The default shard ('shard1') only gets job1 and host1.
        self._do_heartbeat_and_assert_response(
            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())

        self._do_heartbeat_and_assert_response(
            shard_hostname=shard2.hostname,
            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())

        host3 = models.Host.objects.create(hostname='test_host3', leased=False)
        host3.labels.add(label1)

        # Only the newly labelled host3 is sent once job1/host1 are known.
        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1], hosts=[host3])


    def _testResendJobsAfterFailedHeartbeatHelper(self, shard1, host1, label1):
        """Create jobs, retrieve them, fail on client, fetch them again."""
        job1 = self._createJobForLabel(label1)

        self._do_heartbeat_and_assert_response(
            jobs=[job1],
            hqes=job1.hostqueueentry_set.all(), hosts=[host1])

        # Make sure it's resubmitted when the shard does not report job1 in
        # known_jobs (the shard failed to persist the previous response).
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])

        # Now it worked, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_jobs=[job1], known_hosts=[host1])

        job1 = models.Job.objects.get(pk=job1.id)
        job1.hostqueueentry_set.all().update(complete=True)

        # Job is completed, make sure it's not sent again
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1])

        job2 = self._createJobForLabel(label1)

        # job2's creation was later, it should be returned now.
        self._do_heartbeat_and_assert_response(
            known_hosts=[host1],
            jobs=[job2], hqes=job2.hostqueueentry_set.all())

        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1])

        job2 = models.Job.objects.get(pk=job2.pk)
        job2.hostqueueentry_set.update(aborted=True)
        # Setting a job to a complete status will set the shard_id to None in
        # scheduler_models. We have to emulate that here, because we use Django
        # models in tests.
        job2.shard = None
        job2.save()

        # An aborted job is sent again even though the shard already knows it.
        self._do_heartbeat_and_assert_response(
            known_jobs=[job2], known_hosts=[host1],
            jobs=[job2],
            hqes=job2.hostqueueentry_set.all())

        models.Test.objects.create(name='platform_BootPerfServer:shard',
                                   test_type=1)
        # read_file is stubbed with no recorded calls before deleting the
        # shard.
        self.mox.StubOutWithMock(server_utils, 'read_file')
        self.mox.ReplayAll()
        rpc_interface.delete_shard(hostname=shard1.hostname)

        self.assertRaises(
            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)

        # Deleting the shard must detach both its jobs and its labels.
        job1 = models.Job.objects.get(pk=job1.id)
        label1 = models.Label.objects.get(pk=label1.id)

        self.assertIsNone(job1.shard)
        self.assertEqual(len(label1.shard_set.all()), 0)


    def _testResendHostsAfterFailedHeartbeatHelper(self, host1):
        """Check that master accepts resending updated records after failure."""
        # Send the host
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Send it again because previous one didn't persist correctly
        self._do_heartbeat_and_assert_response(hosts=[host1])

        # Now it worked, make sure it isn't sent again
        self._do_heartbeat_and_assert_response(known_hosts=[host1])
250
251
class RpcInterfaceTestWithStaticAttribute(
        mox.MoxTestBase, unittest.TestCase,
        frontend_test_utils.FrontendTestMixin):
    """Host-attribute RPC tests run with RESPECT_STATIC_ATTRIBUTES enabled."""

    def setUp(self):
        """Build the common frontend fixtures and switch the flag on."""
        super(RpcInterfaceTestWithStaticAttribute, self).setUp()
        self._frontend_common_setup()
        self.god = mock.mock_god()
        self.old_respect_static_config = rpc_interface.RESPECT_STATIC_ATTRIBUTES
        # The flag lives in two modules; remember the models value on its own
        # so tearDown does not overwrite it with rpc_interface's value.
        self._old_models_respect_static_config = getattr(
                models, 'RESPECT_STATIC_ATTRIBUTES',
                self.old_respect_static_config)
        rpc_interface.RESPECT_STATIC_ATTRIBUTES = True
        models.RESPECT_STATIC_ATTRIBUTES = True


    def tearDown(self):
        """Undo stubs and fixtures, then restore both module-level flags."""
        self.god.unstub_all()
        self._frontend_common_teardown()
        global_config.global_config.reset_config_values()
        rpc_interface.RESPECT_STATIC_ATTRIBUTES = self.old_respect_static_config
        models.RESPECT_STATIC_ATTRIBUTES = (
                self._old_models_respect_static_config)


    def _fake_host_with_static_attributes(self):
        """Create 'test_host' with two regular and two static attributes.

        'test_attribute1' exists as both a regular and a static attribute;
        'static_attribute1' exists only as a static attribute.
        """
        host1 = models.Host.objects.create(hostname='test_host')
        host1.set_attribute('test_attribute1', 'test_value1')
        host1.set_attribute('test_attribute2', 'test_value2')
        self._set_static_attribute(host1, 'test_attribute1', 'static_value1')
        self._set_static_attribute(host1, 'static_attribute1', 'static_value2')
        host1.save()
        return host1


    def test_get_hosts(self):
        """get_hosts reports static values in the host's attributes."""
        host1 = self._fake_host_with_static_attributes()
        hosts = rpc_interface.get_hosts(hostname=host1.hostname)
        host = hosts[0]

        self.assertEqual(host['hostname'], 'test_host')
        self.assertEqual(host['acls'], ['Everyone'])
        # Respect the value of static attributes.
        self.assertEqual(host['attributes'],
                         {'test_attribute1': 'static_value1',
                          'test_attribute2': 'test_value2',
                          'static_attribute1': 'static_value2'})


    def test_get_host_attribute_with_static(self):
        """get_host_attribute returns the static value where one exists."""
        host1 = models.Host.objects.create(hostname='test_host1')
        host1.set_attribute('test_attribute1', 'test_value1')
        self._set_static_attribute(host1, 'test_attribute1', 'static_value1')
        host2 = models.Host.objects.create(hostname='test_host2')
        host2.set_attribute('test_attribute1', 'test_value1')
        host2.set_attribute('test_attribute2', 'test_value2')

        attributes = rpc_interface.get_host_attribute(
                'test_attribute1',
                hostname__in=['test_host1', 'test_host2'])
        hosts = [attr['host'] for attr in attributes]
        values = [attr['value'] for attr in attributes]
        self.assertEqual(set(hosts),
                         set(['test_host1', 'test_host2']))
        self.assertEqual(set(values),
                         set(['test_value1', 'static_value1']))


    def test_get_hosts_by_attribute_without_static(self):
        """Hosts with only regular attributes are matched by value."""
        host1 = models.Host.objects.create(hostname='test_host1')
        host1.set_attribute('test_attribute1', 'test_value1')
        host2 = models.Host.objects.create(hostname='test_host2')
        host2.set_attribute('test_attribute1', 'test_value1')

        hosts = rpc_interface.get_hosts_by_attribute(
                'test_attribute1', 'test_value1')
        self.assertEqual(set(hosts),
                         set(['test_host1', 'test_host2']))


    def test_get_hosts_by_attribute_with_static(self):
        """Matching by attribute value takes static attributes into account."""
        host1 = models.Host.objects.create(hostname='test_host1')
        host1.set_attribute('test_attribute1', 'test_value1')
        self._set_static_attribute(host1, 'test_attribute1', 'test_value1')
        host2 = models.Host.objects.create(hostname='test_host2')
        host2.set_attribute('test_attribute1', 'test_value1')
        self._set_static_attribute(host2, 'test_attribute1', 'static_value1')
        host3 = models.Host.objects.create(hostname='test_host3')
        self._set_static_attribute(host3, 'test_attribute1', 'test_value1')
        host4 = models.Host.objects.create(hostname='test_host4')
        host4.set_attribute('test_attribute1', 'test_value1')
        host5 = models.Host.objects.create(hostname='test_host5')
        host5.set_attribute('test_attribute1', 'temp_value1')
        self._set_static_attribute(host5, 'test_attribute1', 'test_value1')

        hosts = rpc_interface.get_hosts_by_attribute(
                'test_attribute1', 'test_value1')
        # host1: matched, it has the same value for test_attribute1.
        # host2: not matched, it has a new value in
        #        afe_static_host_attributes for test_attribute1.
        # host3: matched, it has a corresponding entry in
        #        afe_host_attributes for test_attribute1.
        # host4: matched, test_attribute1 is not replaced by static
        #        attribute.
        # host5: matched, it has an updated & matched value for
        #        test_attribute1 in afe_static_host_attributes.
        self.assertEqual(set(hosts),
                         set(['test_host1', 'test_host3',
                              'test_host4', 'test_host5']))
356
357
class RpcInterfaceTestWithStaticLabel(ShardHeartbeatTest,
                                      frontend_test_utils.FrontendTestMixin):
    """Label and shard RPC tests run with RESPECT_STATIC_LABELS enabled."""

    # Label names that _createShardAndHostWithStaticLabel additionally
    # registers as replaced/static labels.
    _STATIC_LABELS = ['board:lumpy']

    def setUp(self):
        """Build the common frontend fixtures and switch the flag on."""
        super(RpcInterfaceTestWithStaticLabel, self).setUp()
        self._frontend_common_setup()
        self.god = mock.mock_god()
        self.old_respect_static_config = rpc_interface.RESPECT_STATIC_LABELS
        # The flag lives in two modules; remember the models value on its own
        # so tearDown does not overwrite it with rpc_interface's value.
        self._old_models_respect_static_config = getattr(
                models, 'RESPECT_STATIC_LABELS',
                self.old_respect_static_config)
        rpc_interface.RESPECT_STATIC_LABELS = True
        models.RESPECT_STATIC_LABELS = True


    def tearDown(self):
        """Undo stubs and fixtures, then restore both module-level flags."""
        self.god.unstub_all()
        self._frontend_common_teardown()
        global_config.global_config.reset_config_values()
        rpc_interface.RESPECT_STATIC_LABELS = self.old_respect_static_config
        models.RESPECT_STATIC_LABELS = self._old_models_respect_static_config


    def _fake_host_with_static_labels(self):
        """Create 'test_host' carrying a regular and a replaced label.

        The regular label 'static_platform' (platform=False) is marked as
        replaced and shadowed by a StaticLabel of the same name that is a
        platform.
        """
        host1 = models.Host.objects.create(hostname='test_host')
        label1 = models.Label.objects.create(
                name='non_static_label1', platform=False)
        non_static_platform = models.Label.objects.create(
                name='static_platform', platform=False)
        static_platform = models.StaticLabel.objects.create(
                name='static_platform', platform=True)
        models.ReplacedLabel.objects.create(label_id=non_static_platform.id)
        host1.static_labels.add(static_platform)
        host1.labels.add(non_static_platform)
        host1.labels.add(label1)
        host1.save()
        return host1


    def test_get_hosts(self):
        """get_hosts lists all labels and takes platform from static labels."""
        host1 = self._fake_host_with_static_labels()
        hosts = rpc_interface.get_hosts(hostname=host1.hostname)
        host = hosts[0]

        self.assertEqual(host['hostname'], 'test_host')
        self.assertEqual(host['acls'], ['Everyone'])
        # Respect all labels in afe_hosts_labels.
        self.assertEqual(host['labels'],
                         ['non_static_label1', 'static_platform'])
        # Respect static labels.
        self.assertEqual(host['platform'], 'static_platform')


    def test_get_hosts_multiple_labels(self):
        """Filtering by a regular plus a static label finds the host."""
        self._fake_host_with_static_labels()
        hosts = rpc_interface.get_hosts(
                multiple_labels=['non_static_label1', 'static_platform'])
        host = hosts[0]
        self.assertEqual(host['hostname'], 'test_host')


    def test_delete_static_label(self):
        """delete_label refuses to delete the 'static' label."""
        label1 = models.Label.smart_get('static')

        host2 = models.Host.objects.all()[1]
        shard1 = models.Shard.objects.create(hostname='shard1')
        host2.shard = shard1
        host2.labels.add(label1)
        host2.save()

        # No calls are recorded on the mock AFE, so check_playback below
        # fails if the RPC tried to use it.
        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        self.assertRaises(error.UnmodifiableLabelException,
                          rpc_interface.delete_label,
                          label1.id)

        self.god.check_playback()


    def test_modify_static_label(self):
        """modify_label refuses to change 'static' and leaves it untouched."""
        label1 = models.Label.smart_get('static')
        self.assertEqual(label1.invalid, 0)

        host2 = models.Host.objects.all()[1]
        shard1 = models.Shard.objects.create(hostname='shard1')
        host2.shard = shard1
        host2.labels.add(label1)
        host2.save()

        # No calls are recorded on the mock AFE, so check_playback below
        # fails if the RPC tried to use it.
        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
                                                  'MockAFE')
        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)

        self.assertRaises(error.UnmodifiableLabelException,
                          rpc_interface.modify_label,
                          label1.id,
                          invalid=1)

        self.assertEqual(models.Label.smart_get('static').invalid, 0)
        self.god.check_playback()


    def test_multiple_platforms_add_non_static_to_static(self):
        """Adding a non-static platform to a host with a static one fails."""
        static_platform = models.StaticLabel.objects.create(
                name='static_platform', platform=True)
        non_static_platform = models.Label.objects.create(
                name='static_platform', platform=True)
        models.ReplacedLabel.objects.create(label_id=non_static_platform.id)
        # Second platform label; only referenced by name below.
        models.Label.objects.create(name='platform2', platform=True)
        host1 = models.Host.objects.create(hostname='test_host')
        host1.static_labels.add(static_platform)
        host1.labels.add(non_static_platform)
        host1.save()

        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.label_add_hosts, id='platform2',
                          hosts=['test_host'])
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.host_add_labels,
                          id='test_host', labels=['platform2'])
        # make sure the platform didn't get added
        platforms = rpc_interface.get_labels(
            host__hostname__in=['test_host'], platform=True)
        self.assertEqual(len(platforms), 1)


    def test_multiple_platforms_add_static_to_non_static(self):
        """Adding a static platform to a host with a non-static one fails."""
        platform1 = models.Label.objects.create(
                name='static_platform', platform=True)
        models.ReplacedLabel.objects.create(label_id=platform1.id)
        # Static counterpart of platform1; only referenced by name below.
        models.StaticLabel.objects.create(
                name='static_platform', platform=True)
        platform2 = models.Label.objects.create(
                name='platform2', platform=True)

        host1 = models.Host.objects.create(hostname='test_host')
        host1.labels.add(platform2)
        host1.save()

        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.label_add_hosts,
                          id='static_platform',
                          hosts=['test_host'])
        self.assertRaises(model_logic.ValidationError,
                          rpc_interface.host_add_labels,
                          id='test_host', labels=['static_platform'])
        # make sure the platform didn't get added
        platforms = rpc_interface.get_labels(
            host__hostname__in=['test_host'], platform=True)
        self.assertEqual(len(platforms), 1)


    def test_label_remove_hosts(self):
        """label_remove_hosts refuses to detach hosts from a static label."""
        label = models.Label.smart_get('static')
        static_label = models.StaticLabel.objects.create(name='static')

        host1 = models.Host.objects.create(hostname='test_host')
        host1.labels.add(label)
        host1.static_labels.add(static_label)
        host1.save()

        self.assertRaises(error.UnmodifiableLabelException,
                          rpc_interface.label_remove_hosts,
                          id='static', hosts=['test_host'])


    def test_host_remove_labels(self):
        """host_remove_labels drops regular labels but keeps static ones."""
        label = models.Label.smart_get('static')
        label1 = models.Label.smart_get('label1')
        label2 = models.Label.smart_get('label2')
        static_label = models.StaticLabel.objects.create(name='static')

        host1 = models.Host.objects.create(hostname='test_host')
        host1.labels.add(label)
        host1.labels.add(label1)
        host1.labels.add(label2)
        host1.static_labels.add(static_label)
        host1.save()

        rpc_interface.host_remove_labels(
                'test_host', ['static', 'label1'])
        labels = rpc_interface.get_labels(host__hostname__in=['test_host'])
        # Only non_static label 'label1' is removed.
        self.assertEqual(len(labels), 2)
        self.assertEqual(labels[0].get('name'), 'label2')


    def test_remove_board_from_shard(self):
        """Removing a static board label from a shard frees its hosts."""
        label = models.Label.smart_get('static')
        static_label = models.StaticLabel.objects.create(name='static')

        shard = models.Shard.objects.create(hostname='test_shard')
        shard.labels.add(label)

        host = models.Host.objects.create(hostname='test_host',
                                          leased=False,
                                          shard=shard)
        host.static_labels.add(static_label)
        host.save()

        rpc_interface.remove_board_from_shard(shard.hostname, label.name)
        # Re-read both objects to observe what the RPC stored.
        host1 = models.Host.smart_get(host.id)
        shard1 = models.Shard.smart_get(shard.id)
        self.assertIsNone(host1.shard)
        self.assertItemsEqual(shard1.labels.all(), [])


    def test_check_job_dependencies_success(self):
        """check_job_dependencies accepts a dependency on a static label."""
        static_label = models.StaticLabel.objects.create(name='static')

        host = models.Host.objects.create(hostname='test_host')
        host.static_labels.add(static_label)
        host.save()

        host1 = models.Host.smart_get(host.id)
        rpc_utils.check_job_dependencies([host1], ['static'])


    def test_check_job_dependencies_fail(self):
        """check_job_dependencies rejects a host lacking the static label."""
        label = models.Label.smart_get('static')
        # The static label exists but is deliberately not attached to the
        # host; the host only carries the regular label of the same name.
        models.StaticLabel.objects.create(name='static')

        host = models.Host.objects.create(hostname='test_host')
        host.labels.add(label)
        host.save()

        host1 = models.Host.smart_get(host.id)
        self.assertRaises(model_logic.ValidationError,
                          rpc_utils.check_job_dependencies,
                          [host1],
                          ['static'])


    def test_check_job_metahost_dependencies_success(self):
        """Metahost checks pass when the host carries the static label."""
        label1 = models.Label.smart_get('label1')
        label2 = models.Label.smart_get('label2')
        label = models.Label.smart_get('static')
        static_label = models.StaticLabel.objects.create(name='static')

        host = models.Host.objects.create(hostname='test_host')
        host.static_labels.add(static_label)
        host.labels.add(label1)
        host.labels.add(label2)
        host.save()

        # Static label used as a metahost, then as a dependency.
        rpc_utils.check_job_metahost_dependencies(
                [label1, label], [label2.name])
        rpc_utils.check_job_metahost_dependencies(
                [label1], [label2.name, static_label.name])


    def test_check_job_metahost_dependencies_fail(self):
        """Metahost checks fail when no host carries the static label."""
        label1 = models.Label.smart_get('label1')
        label2 = models.Label.smart_get('label2')
        label = models.Label.smart_get('static')
        static_label = models.StaticLabel.objects.create(name='static')

        host = models.Host.objects.create(hostname='test_host')
        host.labels.add(label1)
        host.labels.add(label2)
        host.save()

        # Static label used as a metahost, then as a dependency.
        self.assertRaises(error.NoEligibleHostException,
                          rpc_utils.check_job_metahost_dependencies,
                          [label1, label], [label2.name])
        self.assertRaises(error.NoEligibleHostException,
                          rpc_utils.check_job_metahost_dependencies,
                          [label1], [label2.name, static_label.name])


    def _createShardAndHostWithStaticLabel(self,
                                           shard_hostname='shard1',
                                           host_hostname='test_host1',
                                           label_name='board:lumpy'):
        """Create a shard, a label on it, and an unleased host with both.

        When label_name is listed in _STATIC_LABELS the label is also marked
        as replaced and a StaticLabel of the same name is attached to the
        host.

        @returns: Tuple (shard, host, label); label is the regular Label.
        """
        label = models.Label.objects.create(name=label_name)

        shard = models.Shard.objects.create(hostname=shard_hostname)
        shard.labels.add(label)

        host = models.Host.objects.create(hostname=host_hostname, leased=False,
                                          shard=shard)
        host.labels.add(label)
        if label_name in self._STATIC_LABELS:
            models.ReplacedLabel.objects.create(label_id=label.id)
            static_label = models.StaticLabel.objects.create(name=label_name)
            host.static_labels.add(static_label)

        return shard, host, label


    def testShardHeartbeatFetchHostlessJob(self):
        """Run the hostless-job heartbeat scenario with a static label."""
        _, host1, _ = self._createShardAndHostWithStaticLabel(
                host_hostname='test_host1')
        self._testShardHeartbeatFetchHostlessJobHelper(host1)


    def testShardHeartbeatIncorrectHosts(self):
        """Run the incorrect-hosts heartbeat scenario with a static label."""
        _, host1, _ = self._createShardAndHostWithStaticLabel(
                host_hostname='test_host1')
        self._testShardHeartbeatIncorrectHostsHelper(host1)


    def testShardHeartbeatLabelRemovalRace(self):
        """Run the label-removal race scenario with a static label."""
        shard1, host1, label1 = self._createShardAndHostWithStaticLabel(
                host_hostname='test_host1')
        self._testShardHeartbeatLabelRemovalRaceHelper(shard1, host1, label1)


    def testShardRetrieveJobs(self):
        """Run the job-retrieval scenario with one static-label shard.

        'board:grumpy' is not in _STATIC_LABELS, so the second shard uses a
        regular label.
        """
        shard1, host1, label1 = self._createShardAndHostWithStaticLabel()
        shard2, host2, label2 = self._createShardAndHostWithStaticLabel(
            'shard2', 'test_host2', 'board:grumpy')
        self._testShardRetrieveJobsHelper(shard1, host1, label1,
                                          shard2, host2, label2)


    def testResendJobsAfterFailedHeartbeat(self):
        """Run the job-resend scenario with a static label."""
        shard1, host1, label1 = self._createShardAndHostWithStaticLabel()
        self._testResendJobsAfterFailedHeartbeatHelper(shard1, host1, label1)


    def testResendHostsAfterFailedHeartbeat(self):
        """Run the host-resend scenario with a static label."""
        _, host1, _ = self._createShardAndHostWithStaticLabel(
                host_hostname='test_host1')
        self._testResendHostsAfterFailedHeartbeatHelper(host1)
692
693
694class RpcInterfaceTest(unittest.TestCase,
695                       frontend_test_utils.FrontendTestMixin):
    def setUp(self):
        """Run the shared frontend setup and create a fresh mock god."""
        self._frontend_common_setup()
        # Stored on self so tearDown can undo any stubs installed through it.
        self.god = mock.mock_god()
699
700
    def tearDown(self):
        """Remove stubs, run the shared frontend teardown, reset config."""
        self.god.unstub_all()
        self._frontend_common_teardown()
        # Discard any global_config values overridden during the test.
        global_config.global_config.reset_config_values()
705
706
707    def test_validation(self):
708        # omit a required field
709        self.assertRaises(model_logic.ValidationError, rpc_interface.add_label,
710                          name=None)
711        # violate uniqueness constraint
712        self.assertRaises(model_logic.ValidationError, rpc_interface.add_host,
713                          hostname='host1')
714
715
716    def test_multiple_platforms(self):
717        platform2 = models.Label.objects.create(name='platform2', platform=True)
718        self.assertRaises(model_logic.ValidationError,
719                          rpc_interface. label_add_hosts, id='platform2',
720                          hosts=['host1', 'host2'])
721        self.assertRaises(model_logic.ValidationError,
722                          rpc_interface.host_add_labels,
723                          id='host1', labels=['platform2'])
724        # make sure the platform didn't get added
725        platforms = rpc_interface.get_labels(
726            host__hostname__in=['host1', 'host2'], platform=True)
727        self.assertEquals(len(platforms), 1)
728        self.assertEquals(platforms[0]['name'], 'myplatform')
729
730
731    def _check_hostnames(self, hosts, expected_hostnames):
732        self.assertEquals(set(host['hostname'] for host in hosts),
733                          set(expected_hostnames))
734
735
    def test_ping_db(self):
        """ping_db() is expected to return [True] against the test DB."""
        self.assertEquals(rpc_interface.ping_db(), [True])
738
739
740    def test_get_hosts_by_attribute(self):
741        host1 = models.Host.objects.create(hostname='test_host1')
742        host1.set_attribute('test_attribute1', 'test_value1')
743        host2 = models.Host.objects.create(hostname='test_host2')
744        host2.set_attribute('test_attribute1', 'test_value1')
745
746        hosts = rpc_interface.get_hosts_by_attribute(
747                'test_attribute1', 'test_value1')
748        self.assertEquals(set(hosts),
749                          set(['test_host1', 'test_host2']))
750
751
752    def test_get_host_attribute(self):
753        host1 = models.Host.objects.create(hostname='test_host1')
754        host1.set_attribute('test_attribute1', 'test_value1')
755        host2 = models.Host.objects.create(hostname='test_host2')
756        host2.set_attribute('test_attribute1', 'test_value1')
757
758        attributes = rpc_interface.get_host_attribute(
759                'test_attribute1',
760                hostname__in=['test_host1', 'test_host2'])
761        hosts = [attr['host'] for attr in attributes]
762        values = [attr['value'] for attr in attributes]
763        self.assertEquals(set(hosts),
764                          set(['test_host1', 'test_host2']))
765        self.assertEquals(set(values), set(['test_value1']))
766
767
768    def test_get_hosts(self):
769        hosts = rpc_interface.get_hosts()
770        self._check_hostnames(hosts, [host.hostname for host in self.hosts])
771
772        hosts = rpc_interface.get_hosts(hostname='host1')
773        self._check_hostnames(hosts, ['host1'])
774        host = hosts[0]
775        self.assertEquals(sorted(host['labels']), ['label1', 'myplatform'])
776        self.assertEquals(host['platform'], 'myplatform')
777        self.assertEquals(host['acls'], ['my_acl'])
778        self.assertEquals(host['attributes'], {})
779
780
781    def test_get_hosts_multiple_labels(self):
782        hosts = rpc_interface.get_hosts(
783                multiple_labels=['myplatform', 'label1'])
784        self._check_hostnames(hosts, ['host1'])
785
786
787    def test_job_keyvals(self):
788        keyval_dict = {'mykey': 'myvalue'}
789        job_id = rpc_interface.create_job(name='test',
790                                          priority=priorities.Priority.DEFAULT,
791                                          control_file='foo',
792                                          control_type=CLIENT,
793                                          hosts=['host1'],
794                                          keyvals=keyval_dict)
795        jobs = rpc_interface.get_jobs(id=job_id)
796        self.assertEquals(len(jobs), 1)
797        self.assertEquals(jobs[0]['keyvals'], keyval_dict)
798
799
800    def test_get_jobs_summary(self):
801        job = self._create_job(hosts=xrange(1, 4))
802        entries = list(job.hostqueueentry_set.all())
803        entries[1].status = _hqe_status.FAILED
804        entries[1].save()
805        entries[2].status = _hqe_status.FAILED
806        entries[2].aborted = True
807        entries[2].save()
808
809        # Mock up tko_rpc_interface.get_status_counts.
810        self.god.stub_function_to_return(rpc_interface.tko_rpc_interface,
811                                         'get_status_counts',
812                                         None)
813
814        job_summaries = rpc_interface.get_jobs_summary(id=job.id)
815        self.assertEquals(len(job_summaries), 1)
816        summary = job_summaries[0]
817        self.assertEquals(summary['status_counts'], {'Queued': 1,
818                                                     'Failed': 2})
819
820
821    def _check_job_ids(self, actual_job_dicts, expected_jobs):
822        self.assertEquals(
823                set(job_dict['id'] for job_dict in actual_job_dicts),
824                set(job.id for job in expected_jobs))
825
826
827    def test_get_jobs_status_filters(self):
828        HqeStatus = models.HostQueueEntry.Status
829        def create_two_host_job():
830            return self._create_job(hosts=[1, 2])
831        def set_hqe_statuses(job, first_status, second_status):
832            entries = job.hostqueueentry_set.all()
833            entries[0].update_object(status=first_status)
834            entries[1].update_object(status=second_status)
835
836        queued = create_two_host_job()
837
838        queued_and_running = create_two_host_job()
839        set_hqe_statuses(queued_and_running, HqeStatus.QUEUED,
840                           HqeStatus.RUNNING)
841
842        running_and_complete = create_two_host_job()
843        set_hqe_statuses(running_and_complete, HqeStatus.RUNNING,
844                           HqeStatus.COMPLETED)
845
846        complete = create_two_host_job()
847        set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED)
848
849        started_but_inactive = create_two_host_job()
850        set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED,
851                           HqeStatus.COMPLETED)
852
853        parsing = create_two_host_job()
854        set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING)
855
856        self._check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued])
857        self._check_job_ids(rpc_interface.get_jobs(running=True),
858                      [queued_and_running, running_and_complete,
859                       started_but_inactive, parsing])
860        self._check_job_ids(rpc_interface.get_jobs(finished=True), [complete])
861
862
863    def test_get_jobs_type_filters(self):
864        self.assertRaises(AssertionError, rpc_interface.get_jobs,
865                          suite=True, sub=True)
866        self.assertRaises(AssertionError, rpc_interface.get_jobs,
867                          suite=True, standalone=True)
868        self.assertRaises(AssertionError, rpc_interface.get_jobs,
869                          standalone=True, sub=True)
870
871        parent_job = self._create_job(hosts=[1])
872        child_jobs = self._create_job(hosts=[1, 2],
873                                      parent_job_id=parent_job.id)
874        standalone_job = self._create_job(hosts=[1])
875
876        self._check_job_ids(rpc_interface.get_jobs(suite=True), [parent_job])
877        self._check_job_ids(rpc_interface.get_jobs(sub=True), [child_jobs])
878        self._check_job_ids(rpc_interface.get_jobs(standalone=True),
879                            [standalone_job])
880
881
882    def _create_job_helper(self, **kwargs):
883        return rpc_interface.create_job(name='test',
884                                        priority=priorities.Priority.DEFAULT,
885                                        control_file='control file',
886                                        control_type=SERVER, **kwargs)
887
888
889    def test_one_time_hosts(self):
890        job = self._create_job_helper(one_time_hosts=['testhost'])
891        host = models.Host.objects.get(hostname='testhost')
892        self.assertEquals(host.invalid, True)
893        self.assertEquals(host.labels.count(), 0)
894        self.assertEquals(host.aclgroup_set.count(), 0)
895
896
897    def test_create_job_duplicate_hosts(self):
898        self.assertRaises(model_logic.ValidationError, self._create_job_helper,
899                          hosts=[1, 1])
900
901
902    def test_create_unrunnable_metahost_job(self):
903        self.assertRaises(error.NoEligibleHostException,
904                          self._create_job_helper, meta_hosts=['unused'])
905
906
907    def test_create_hostless_job(self):
908        job_id = self._create_job_helper(hostless=True)
909        job = models.Job.objects.get(pk=job_id)
910        queue_entries = job.hostqueueentry_set.all()
911        self.assertEquals(len(queue_entries), 1)
912        self.assertEquals(queue_entries[0].host, None)
913        self.assertEquals(queue_entries[0].meta_host, None)
914
915
916    def _setup_special_tasks(self):
917        host = self.hosts[0]
918
919        job1 = self._create_job(hosts=[1])
920        job2 = self._create_job(hosts=[1])
921
922        entry1 = job1.hostqueueentry_set.all()[0]
923        entry1.update_object(started_on=datetime.datetime(2009, 1, 2),
924                             execution_subdir='host1')
925        entry2 = job2.hostqueueentry_set.all()[0]
926        entry2.update_object(started_on=datetime.datetime(2009, 1, 3),
927                             execution_subdir='host1')
928
929        self.task1 = models.SpecialTask.objects.create(
930                host=host, task=models.SpecialTask.Task.VERIFY,
931                time_started=datetime.datetime(2009, 1, 1), # ran before job 1
932                is_complete=True, requested_by=models.User.current_user())
933        self.task2 = models.SpecialTask.objects.create(
934                host=host, task=models.SpecialTask.Task.VERIFY,
935                queue_entry=entry2, # ran with job 2
936                is_active=True, requested_by=models.User.current_user())
937        self.task3 = models.SpecialTask.objects.create(
938                host=host, task=models.SpecialTask.Task.VERIFY,
939                requested_by=models.User.current_user()) # not yet run
940
941
942    def test_get_special_tasks(self):
943        self._setup_special_tasks()
944        tasks = rpc_interface.get_special_tasks(host__hostname='host1',
945                                                queue_entry__isnull=True)
946        self.assertEquals(len(tasks), 2)
947        self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY)
948        self.assertEquals(tasks[0]['is_active'], False)
949        self.assertEquals(tasks[0]['is_complete'], True)
950
951
952    def test_get_latest_special_task(self):
953        # a particular usage of get_special_tasks()
954        self._setup_special_tasks()
955        self.task2.time_started = datetime.datetime(2009, 1, 2)
956        self.task2.save()
957
958        tasks = rpc_interface.get_special_tasks(
959                host__hostname='host1', task=models.SpecialTask.Task.VERIFY,
960                time_started__isnull=False, sort_by=['-time_started'],
961                query_limit=1)
962        self.assertEquals(len(tasks), 1)
963        self.assertEquals(tasks[0]['id'], 2)
964
965
966    def _common_entry_check(self, entry_dict):
967        self.assertEquals(entry_dict['host']['hostname'], 'host1')
968        self.assertEquals(entry_dict['job']['id'], 2)
969
970
971    def test_get_host_queue_entries_and_special_tasks(self):
972        self._setup_special_tasks()
973
974        host = self.hosts[0].id
975        entries_and_tasks = (
976                rpc_interface.get_host_queue_entries_and_special_tasks(host))
977
978        paths = [entry['execution_path'] for entry in entries_and_tasks]
979        self.assertEquals(paths, ['hosts/host1/3-verify',
980                                  '2-autotest_system/host1',
981                                  'hosts/host1/2-verify',
982                                  '1-autotest_system/host1',
983                                  'hosts/host1/1-verify'])
984
985        verify2 = entries_and_tasks[2]
986        self._common_entry_check(verify2)
987        self.assertEquals(verify2['type'], 'Verify')
988        self.assertEquals(verify2['status'], 'Running')
989        self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify')
990
991        entry2 = entries_and_tasks[1]
992        self._common_entry_check(entry2)
993        self.assertEquals(entry2['type'], 'Job')
994        self.assertEquals(entry2['status'], 'Queued')
995        self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00')
996
997
998    def _create_hqes_and_start_time_index_entries(self):
999        shard = models.Shard.objects.create(hostname='shard')
1000        job = self._create_job(shard=shard, control_file='foo')
1001        HqeStatus = models.HostQueueEntry.Status
1002
1003        models.HostQueueEntry(
1004            id=1, job=job, started_on='2017-01-01',
1005            status=HqeStatus.QUEUED).save()
1006        models.HostQueueEntry(
1007            id=2, job=job, started_on='2017-01-02',
1008            status=HqeStatus.QUEUED).save()
1009        models.HostQueueEntry(
1010            id=3, job=job, started_on='2017-01-03',
1011            status=HqeStatus.QUEUED).save()
1012
1013        models.HostQueueEntryStartTimes(
1014            insert_time='2017-01-03', highest_hqe_id=3).save()
1015        models.HostQueueEntryStartTimes(
1016            insert_time='2017-01-02', highest_hqe_id=2).save()
1017        models.HostQueueEntryStartTimes(
1018            insert_time='2017-01-01', highest_hqe_id=1).save()
1019
1020    def test_get_host_queue_entries_by_insert_time(self):
1021        """Check the insert_time_after and insert_time_before constraints."""
1022        self._create_hqes_and_start_time_index_entries()
1023        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1024            insert_time_after='2017-01-01')
1025        self.assertEquals(len(hqes), 3)
1026
1027        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1028            insert_time_after='2017-01-02')
1029        self.assertEquals(len(hqes), 2)
1030
1031        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1032            insert_time_after='2017-01-03')
1033        self.assertEquals(len(hqes), 1)
1034
1035        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1036            insert_time_before='2017-01-01')
1037        self.assertEquals(len(hqes), 1)
1038
1039        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1040            insert_time_before='2017-01-02')
1041        self.assertEquals(len(hqes), 2)
1042
1043        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1044            insert_time_before='2017-01-03')
1045        self.assertEquals(len(hqes), 3)
1046
1047
1048    def test_get_host_queue_entries_by_insert_time_with_missing_index_row(self):
1049        """Shows that the constraints are approximate.
1050
1051        The query may return rows which are actually outside of the bounds
1052        given, if the index table does not have an entry for the specific time.
1053        """
1054        self._create_hqes_and_start_time_index_entries()
1055        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1056            insert_time_before='2016-12-01')
1057        self.assertEquals(len(hqes), 1)
1058
1059    def test_get_hqe_by_insert_time_with_before_and_after(self):
1060        self._create_hqes_and_start_time_index_entries()
1061        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1062            insert_time_before='2017-01-02',
1063            insert_time_after='2017-01-02')
1064        self.assertEquals(len(hqes), 1)
1065
1066    def test_get_hqe_by_insert_time_and_id_constraint(self):
1067        self._create_hqes_and_start_time_index_entries()
1068        # The time constraint is looser than the id constraint, so the time
1069        # constraint should take precedence.
1070        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1071            insert_time_before='2017-01-02',
1072            id__lte=1)
1073        self.assertEquals(len(hqes), 1)
1074
1075        # Now make the time constraint tighter than the id constraint.
1076        hqes = rpc_interface.get_host_queue_entries_by_insert_time(
1077            insert_time_before='2017-01-01',
1078            id__lte=42)
1079        self.assertEquals(len(hqes), 1)
1080
1081    def test_view_invalid_host(self):
1082        # RPCs used by View Host page should work for invalid hosts
1083        self._create_job_helper(hosts=[1])
1084        host = self.hosts[0]
1085        host.delete()
1086
1087        self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1',
1088                                                         valid_only=False))
1089        data = rpc_interface.get_hosts(hostname='host1', valid_only=False)
1090        self.assertEquals(1, len(data))
1091
1092        self.assertEquals(1, rpc_interface.get_num_host_queue_entries(
1093                host__hostname='host1'))
1094        data = rpc_interface.get_host_queue_entries(host__hostname='host1')
1095        self.assertEquals(1, len(data))
1096
1097        count = rpc_interface.get_num_host_queue_entries_and_special_tasks(
1098                host=host.id)
1099        self.assertEquals(1, count)
1100        data = rpc_interface.get_host_queue_entries_and_special_tasks(
1101                host=host.id)
1102        self.assertEquals(1, len(data))
1103
1104
1105    def test_reverify_hosts(self):
1106        hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2])
1107        self.assertEquals(hostname_list, ['host1', 'host2'])
1108        tasks = rpc_interface.get_special_tasks()
1109        self.assertEquals(len(tasks), 2)
1110        self.assertEquals(set(task['host']['id'] for task in tasks),
1111                          set([1, 2]))
1112
1113        task = tasks[0]
1114        self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY)
1115        self.assertEquals(task['requested_by'], 'autotest_system')
1116
1117
1118    def test_repair_hosts(self):
1119        hostname_list = rpc_interface.repair_hosts(id__in=[1, 2])
1120        self.assertEquals(hostname_list, ['host1', 'host2'])
1121        tasks = rpc_interface.get_special_tasks()
1122        self.assertEquals(len(tasks), 2)
1123        self.assertEquals(set(task['host']['id'] for task in tasks),
1124                          set([1, 2]))
1125
1126        task = tasks[0]
1127        self.assertEquals(task['task'], models.SpecialTask.Task.REPAIR)
1128        self.assertEquals(task['requested_by'], 'autotest_system')
1129
1130
1131    def _modify_host_helper(self, on_shard=False, host_on_shard=False):
1132        shard_hostname = 'shard1'
1133        if on_shard:
1134            global_config.global_config.override_config_value(
1135                'SHARD', 'shard_hostname', shard_hostname)
1136
1137        host = models.Host.objects.all()[0]
1138        if host_on_shard:
1139            shard = models.Shard.objects.create(hostname=shard_hostname)
1140            host.shard = shard
1141            host.save()
1142
1143        self.assertFalse(host.locked)
1144
1145        self.god.stub_class_method(frontend.AFE, 'run')
1146
1147        if host_on_shard and not on_shard:
1148            mock_afe = self.god.create_mock_class_obj(
1149                    frontend_wrappers.RetryingAFE, 'MockAFE')
1150            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1151
1152            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
1153                    server=shard_hostname, user=None)
1154            mock_afe2.run.expect_call('modify_host_local', id=host.id,
1155                    locked=True, lock_reason='_modify_host_helper lock',
1156                    lock_time=datetime.datetime(2015, 12, 15))
1157        elif on_shard:
1158            mock_afe = self.god.create_mock_class_obj(
1159                    frontend_wrappers.RetryingAFE, 'MockAFE')
1160            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1161
1162            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
1163                    server=server_utils.get_global_afe_hostname(), user=None)
1164            mock_afe2.run.expect_call('modify_host', id=host.id,
1165                    locked=True, lock_reason='_modify_host_helper lock',
1166                    lock_time=datetime.datetime(2015, 12, 15))
1167
1168        rpc_interface.modify_host(id=host.id, locked=True,
1169                                  lock_reason='_modify_host_helper lock',
1170                                  lock_time=datetime.datetime(2015, 12, 15))
1171
1172        host = models.Host.objects.get(pk=host.id)
1173        if on_shard:
1174            # modify_host on shard does nothing but routing the RPC to master.
1175            self.assertFalse(host.locked)
1176        else:
1177            self.assertTrue(host.locked)
1178        self.god.check_playback()
1179
1180
1181    def test_modify_host_on_master_host_on_master(self):
1182        """Call modify_host to master for host in master."""
1183        self._modify_host_helper()
1184
1185
1186    def test_modify_host_on_master_host_on_shard(self):
1187        """Call modify_host to master for host in shard."""
1188        self._modify_host_helper(host_on_shard=True)
1189
1190
1191    def test_modify_host_on_shard(self):
1192        """Call modify_host to shard for host in shard."""
1193        self._modify_host_helper(on_shard=True, host_on_shard=True)
1194
1195
1196    def test_modify_hosts_on_master_host_on_shard(self):
1197        """Ensure calls to modify_hosts are correctly forwarded to shards."""
1198        host1 = models.Host.objects.all()[0]
1199        host2 = models.Host.objects.all()[1]
1200
1201        shard1 = models.Shard.objects.create(hostname='shard1')
1202        host1.shard = shard1
1203        host1.save()
1204
1205        shard2 = models.Shard.objects.create(hostname='shard2')
1206        host2.shard = shard2
1207        host2.save()
1208
1209        self.assertFalse(host1.locked)
1210        self.assertFalse(host2.locked)
1211
1212        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
1213                                                  'MockAFE')
1214        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1215
1216        # The statuses of one host might differ on master and shard.
1217        # Filters are always applied on the master. So the host on the shard
1218        # will be affected no matter what his status is.
1219        filters_to_use = {'status': 'Ready'}
1220
1221        mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
1222                server='shard2', user=None)
1223        mock_afe2.run.expect_call(
1224            'modify_hosts_local',
1225            host_filter_data={'id__in': [shard1.id, shard2.id]},
1226            update_data={'locked': True,
1227                         'lock_reason': 'Testing forward to shard',
1228                         'lock_time' : datetime.datetime(2015, 12, 15) })
1229
1230        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
1231                server='shard1', user=None)
1232        mock_afe1.run.expect_call(
1233            'modify_hosts_local',
1234            host_filter_data={'id__in': [shard1.id, shard2.id]},
1235            update_data={'locked': True,
1236                         'lock_reason': 'Testing forward to shard',
1237                         'lock_time' : datetime.datetime(2015, 12, 15)})
1238
1239        rpc_interface.modify_hosts(
1240                host_filter_data={'status': 'Ready'},
1241                update_data={'locked': True,
1242                             'lock_reason': 'Testing forward to shard',
1243                             'lock_time' : datetime.datetime(2015, 12, 15) })
1244
1245        host1 = models.Host.objects.get(pk=host1.id)
1246        self.assertTrue(host1.locked)
1247        host2 = models.Host.objects.get(pk=host2.id)
1248        self.assertTrue(host2.locked)
1249        self.god.check_playback()
1250
1251
1252    def test_delete_host(self):
1253        """Ensure an RPC is made on delete a host, if it is on a shard."""
1254        host1 = models.Host.objects.all()[0]
1255        shard1 = models.Shard.objects.create(hostname='shard1')
1256        host1.shard = shard1
1257        host1.save()
1258        host1_id = host1.id
1259
1260        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
1261                                                 'MockAFE')
1262        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1263
1264        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
1265                server='shard1', user=None)
1266        mock_afe1.run.expect_call('delete_host', id=host1.id)
1267
1268        rpc_interface.delete_host(id=host1.id)
1269
1270        self.assertRaises(models.Host.DoesNotExist,
1271                          models.Host.smart_get, host1_id)
1272
1273        self.god.check_playback()
1274
1275
1276    def test_delete_shard(self):
1277        """Ensure the RPC can delete a shard."""
1278        host1 = models.Host.objects.all()[0]
1279        shard1 = models.Shard.objects.create(hostname='shard1')
1280        host1.shard = shard1
1281        host1.save()
1282
1283        rpc_interface.delete_shard(hostname=shard1.hostname)
1284
1285        host1 = models.Host.smart_get(host1.id)
1286        self.assertIsNone(host1.shard)
1287        self.assertRaises(models.Shard.DoesNotExist,
1288                          models.Shard.smart_get, shard1.hostname)
1289
1290
1291    def test_modify_label(self):
1292        label1 = models.Label.objects.all()[0]
1293        self.assertEqual(label1.invalid, 0)
1294
1295        host2 = models.Host.objects.all()[1]
1296        shard1 = models.Shard.objects.create(hostname='shard1')
1297        host2.shard = shard1
1298        host2.labels.add(label1)
1299        host2.save()
1300
1301        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
1302                                                  'MockAFE')
1303        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1304
1305        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
1306                server='shard1', user=None)
1307        mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1)
1308
1309        rpc_interface.modify_label(label1.id, invalid=1)
1310
1311        self.assertEqual(models.Label.objects.all()[0].invalid, 1)
1312        self.god.check_playback()
1313
1314
1315    def test_delete_label(self):
1316        label1 = models.Label.objects.all()[0]
1317
1318        host2 = models.Host.objects.all()[1]
1319        shard1 = models.Shard.objects.create(hostname='shard1')
1320        host2.shard = shard1
1321        host2.labels.add(label1)
1322        host2.save()
1323
1324        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
1325                                                  'MockAFE')
1326        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
1327
1328        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
1329                server='shard1', user=None)
1330        mock_afe1.run.expect_call('delete_label', id=label1.id)
1331
1332        rpc_interface.delete_label(id=label1.id)
1333
1334        self.assertRaises(models.Label.DoesNotExist,
1335                          models.Label.smart_get, label1.id)
1336        self.god.check_playback()
1337
1338
1339    def test_get_image_for_job_with_keyval_build(self):
1340        keyval_dict = {'build': 'cool-image'}
1341        job_id = rpc_interface.create_job(name='test',
1342                                          priority=priorities.Priority.DEFAULT,
1343                                          control_file='foo',
1344                                          control_type=CLIENT,
1345                                          hosts=['host1'],
1346                                          keyvals=keyval_dict)
1347        job = models.Job.objects.get(id=job_id)
1348        self.assertIsNotNone(job)
1349        image = rpc_interface._get_image_for_job(job, True)
1350        self.assertEquals('cool-image', image)
1351
1352
1353    def test_get_image_for_job_with_keyval_builds(self):
1354        keyval_dict = {'builds': {'cros-version': 'cool-image'}}
1355        job_id = rpc_interface.create_job(name='test',
1356                                          priority=priorities.Priority.DEFAULT,
1357                                          control_file='foo',
1358                                          control_type=CLIENT,
1359                                          hosts=['host1'],
1360                                          keyvals=keyval_dict)
1361        job = models.Job.objects.get(id=job_id)
1362        self.assertIsNotNone(job)
1363        image = rpc_interface._get_image_for_job(job, True)
1364        self.assertEquals('cool-image', image)
1365
1366
1367    def test_get_image_for_job_with_control_build(self):
1368        CONTROL_FILE = """build='cool-image'
1369        """
1370        job_id = rpc_interface.create_job(name='test',
1371                                          priority=priorities.Priority.DEFAULT,
1372                                          control_file='foo',
1373                                          control_type=CLIENT,
1374                                          hosts=['host1'])
1375        job = models.Job.objects.get(id=job_id)
1376        self.assertIsNotNone(job)
1377        job.control_file = CONTROL_FILE
1378        image = rpc_interface._get_image_for_job(job, True)
1379        self.assertEquals('cool-image', image)
1380
1381
1382    def test_get_image_for_job_with_control_builds(self):
1383        CONTROL_FILE = """builds={'cros-version': 'cool-image'}
1384        """
1385        job_id = rpc_interface.create_job(name='test',
1386                                          priority=priorities.Priority.DEFAULT,
1387                                          control_file='foo',
1388                                          control_type=CLIENT,
1389                                          hosts=['host1'])
1390        job = models.Job.objects.get(id=job_id)
1391        self.assertIsNotNone(job)
1392        job.control_file = CONTROL_FILE
1393        image = rpc_interface._get_image_for_job(job, True)
1394        self.assertEquals('cool-image', image)
1395
1396
1397class ExtraRpcInterfaceTest(frontend_test_utils.FrontendTestMixin,
1398                            ShardHeartbeatTest):
1399    """Unit tests for functions originally in site_rpc_interface.py.
1400
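    @var _NAME: fake suite name.
    @var _BOARD: fake board to reimage.
    @var _BUILD: fake build with which to reimage.
    @var _BUILDS: fake builds dict mapping provision.CROS_VERSION_PREFIX to
                  _BUILD.
    @var _PRIORITY: fake priority with which to reimage.
    @var _TIMEOUT: fake timeout in hours (multiplied by 60 where minutes are
                   expected).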
1405    """
1406    _NAME = 'name'
1407    _BOARD = 'link'
1408    _BUILD = 'link-release/R36-5812.0.0'
1409    _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD}
1410    _PRIORITY = priorities.Priority.DEFAULT
1411    _TIMEOUT = 24
1412
1413
    def setUp(self):
        """Run the parent setUp, then build the suite name, mock and DB."""
        super(ExtraRpcInterfaceTest, self).setUp()
        # Canonical form of the fake suite name _NAME.
        self._SUITE_NAME = suite_common.canonicalize_suite_name(
            self._NAME)
        # Mock ImageServer instance handed out by _setupDevserver().
        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
        self._frontend_common_setup(fill_data=False)
1420
1421
    def tearDown(self):
        """Tear down the frontend fixture created in setUp()."""
        self._frontend_common_teardown()
1424
1425
    def _setupDevserver(self):
        """Stub out dev_server.ImageServer and record one resolve() call.

        After the stub is installed, dev_server.resolve(self._BUILD) is
        recorded with the mock self.dev_server as its return value.
        """
        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
        dev_server.resolve(self._BUILD).AndReturn(self.dev_server)
1429
1430
1431    def _mockDevServerGetter(self, get_control_file=True):
1432        self._setupDevserver()
1433        if get_control_file:
1434          self.getter = self.mox.CreateMock(
1435              control_file_getter.DevServerGetter)
1436          self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
1437                                   'create')
1438          control_file_getter.DevServerGetter.create(
1439              mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)
1440
1441
1442    def _mockRpcUtils(self, to_return, control_file_substring=''):
1443        """Fake out the autotest rpc_utils module with a mockable class.
1444
1445        @param to_return: the value that rpc_utils.create_job_common() should
1446                          be mocked out to return.
1447        @param control_file_substring: A substring that is expected to appear
1448                                       in the control file output string that
1449                                       is passed to create_job_common.
1450                                       Default: ''
1451        """
1452        download_started_time = constants.DOWNLOAD_STARTED_TIME
1453        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
1454        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
1455        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
1456                                    mox.StrContains(self._BUILD)),
1457                            priority=self._PRIORITY,
1458                            timeout_mins=self._TIMEOUT*60,
1459                            max_runtime_mins=self._TIMEOUT*60,
1460                            control_type='Server',
1461                            control_file=mox.And(mox.StrContains(self._BOARD),
1462                                                 mox.StrContains(self._BUILD),
1463                                                 mox.StrContains(
1464                                                     control_file_substring)),
1465                            hostless=True,
1466                            keyvals=mox.And(mox.In(download_started_time),
1467                                            mox.In(payload_finished_time))
1468                            ).AndReturn(to_return)
1469
1470
1471    def testStageBuildFail(self):
1472        """Ensure that a failure to stage the desired build fails the RPC."""
1473        self._setupDevserver()
1474
1475        self.dev_server.hostname = 'mox_url'
1476        self.dev_server.stage_artifacts(
1477                image=self._BUILD, artifacts=['test_suites']).AndRaise(
1478                dev_server.DevServerException())
1479        self.mox.ReplayAll()
1480        self.assertRaises(error.StageControlFileFailure,
1481                          rpc_interface.create_suite_job,
1482                          name=self._NAME,
1483                          board=self._BOARD,
1484                          builds=self._BUILDS,
1485                          pool=None)
1486
1487
1488    def testGetControlFileFail(self):
1489        """Ensure that a failure to get needed control file fails the RPC."""
1490        self._mockDevServerGetter()
1491
1492        self.dev_server.hostname = 'mox_url'
1493        self.dev_server.stage_artifacts(
1494                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1495
1496        self.getter.get_control_file_contents_by_name(
1497            self._SUITE_NAME).AndReturn(None)
1498        self.mox.ReplayAll()
1499        self.assertRaises(error.ControlFileEmpty,
1500                          rpc_interface.create_suite_job,
1501                          name=self._NAME,
1502                          board=self._BOARD,
1503                          builds=self._BUILDS,
1504                          pool=None)
1505
1506
1507    def testGetControlFileListFail(self):
1508        """Ensure that a failure to get needed control file fails the RPC."""
1509        self._mockDevServerGetter()
1510
1511        self.dev_server.hostname = 'mox_url'
1512        self.dev_server.stage_artifacts(
1513                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1514
1515        self.getter.get_control_file_contents_by_name(
1516            self._SUITE_NAME).AndRaise(error.NoControlFileList())
1517        self.mox.ReplayAll()
1518        self.assertRaises(error.NoControlFileList,
1519                          rpc_interface.create_suite_job,
1520                          name=self._NAME,
1521                          board=self._BOARD,
1522                          builds=self._BUILDS,
1523                          pool=None)
1524
1525
1526    def testCreateSuiteJobFail(self):
1527        """Ensure that failure to schedule the suite job fails the RPC."""
1528        self._mockDevServerGetter()
1529
1530        self.dev_server.hostname = 'mox_url'
1531        self.dev_server.stage_artifacts(
1532                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1533
1534        self.getter.get_control_file_contents_by_name(
1535            self._SUITE_NAME).AndReturn('f')
1536
1537        self.dev_server.url().AndReturn('mox_url')
1538        self._mockRpcUtils(-1)
1539        self.mox.ReplayAll()
1540        self.assertEquals(
1541            rpc_interface.create_suite_job(name=self._NAME,
1542                                           board=self._BOARD,
1543                                           builds=self._BUILDS, pool=None),
1544            -1)
1545
1546
1547    def testCreateSuiteJobSuccess(self):
1548        """Ensures that success results in a successful RPC."""
1549        self._mockDevServerGetter()
1550
1551        self.dev_server.hostname = 'mox_url'
1552        self.dev_server.stage_artifacts(
1553                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1554
1555        self.getter.get_control_file_contents_by_name(
1556            self._SUITE_NAME).AndReturn('f')
1557
1558        self.dev_server.url().AndReturn('mox_url')
1559        job_id = 5
1560        self._mockRpcUtils(job_id)
1561        self.mox.ReplayAll()
1562        self.assertEquals(
1563            rpc_interface.create_suite_job(name=self._NAME,
1564                                           board=self._BOARD,
1565                                           builds=self._BUILDS,
1566                                           pool=None),
1567            job_id)
1568
1569
1570    def testCreateSuiteJobNoHostCheckSuccess(self):
1571        """Ensures that success results in a successful RPC."""
1572        self._mockDevServerGetter()
1573
1574        self.dev_server.hostname = 'mox_url'
1575        self.dev_server.stage_artifacts(
1576                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1577
1578        self.getter.get_control_file_contents_by_name(
1579            self._SUITE_NAME).AndReturn('f')
1580
1581        self.dev_server.url().AndReturn('mox_url')
1582        job_id = 5
1583        self._mockRpcUtils(job_id)
1584        self.mox.ReplayAll()
1585        self.assertEquals(
1586          rpc_interface.create_suite_job(name=self._NAME,
1587                                         board=self._BOARD,
1588                                         builds=self._BUILDS,
1589                                         pool=None, check_hosts=False),
1590          job_id)
1591
1592
1593    def testCreateSuiteJobControlFileSupplied(self):
1594        """Ensure we can supply the control file to create_suite_job."""
1595        self._mockDevServerGetter(get_control_file=False)
1596
1597        self.dev_server.hostname = 'mox_url'
1598        self.dev_server.stage_artifacts(
1599                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
1600        self.dev_server.url().AndReturn('mox_url')
1601        job_id = 5
1602        self._mockRpcUtils(job_id)
1603        self.mox.ReplayAll()
1604        self.assertEquals(
1605            rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
1606                                                           self._BUILD),
1607                                           board=None,
1608                                           builds=self._BUILDS,
1609                                           pool=None,
1610                                           control_file='CONTROL FILE'),
1611            job_id)
1612
1613
1614    def _get_records_for_sending_to_master(self):
1615        return [{'control_file': 'foo',
1616                 'control_type': 1,
1617                 'created_on': datetime.datetime(2014, 8, 21),
1618                 'drone_set': None,
1619                 'email_list': '',
1620                 'max_runtime_hrs': 72,
1621                 'max_runtime_mins': 1440,
1622                 'name': 'dummy',
1623                 'owner': 'autotest_system',
1624                 'parse_failed_repair': True,
1625                 'priority': 40,
1626                 'reboot_after': 0,
1627                 'reboot_before': 1,
1628                 'run_reset': True,
1629                 'run_verify': False,
1630                 'synch_count': 0,
1631                 'test_retry': 0,
1632                 'timeout': 24,
1633                 'timeout_mins': 1440,
1634                 'id': 1
1635                 }], [{
1636                    'aborted': False,
1637                    'active': False,
1638                    'complete': False,
1639                    'deleted': False,
1640                    'execution_subdir': '',
1641                    'finished_on': None,
1642                    'started_on': None,
1643                    'status': 'Queued',
1644                    'id': 1
1645                }]
1646
1647
1648    def _send_records_to_master_helper(
1649        self, jobs, hqes, shard_hostname='host1',
1650        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
1651        job_id = rpc_interface.create_job(
1652                name='dummy',
1653                priority=self._PRIORITY,
1654                control_file='foo',
1655                control_type=SERVER,
1656                hostless=True)
1657        job = models.Job.objects.get(pk=job_id)
1658        shard = models.Shard.objects.create(hostname='host1')
1659        job.shard = shard
1660        job.save()
1661
1662        if aborted:
1663            job.hostqueueentry_set.update(aborted=True)
1664            job.shard = None
1665            job.save()
1666
1667        hqe = job.hostqueueentry_set.all()[0]
1668        if not exception_to_throw:
1669            self._do_heartbeat_and_assert_response(
1670                shard_hostname=shard_hostname,
1671                upload_jobs=jobs, upload_hqes=hqes)
1672        else:
1673            self.assertRaises(
1674                exception_to_throw,
1675                self._do_heartbeat_and_assert_response,
1676                shard_hostname=shard_hostname,
1677                upload_jobs=jobs, upload_hqes=hqes)
1678
1679
1680    def testSendingRecordsToMaster(self):
1681        """Send records to the master and ensure they are persisted."""
1682        jobs, hqes = self._get_records_for_sending_to_master()
1683        hqes[0]['status'] = 'Completed'
1684        self._send_records_to_master_helper(
1685            jobs=jobs, hqes=hqes, exception_to_throw=None)
1686
1687        # Check the entry was actually written to db
1688        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
1689                         'Completed')
1690
1691
1692    def testSendingRecordsToMasterAbortedOnMaster(self):
1693        """Send records to the master and ensure they are persisted."""
1694        jobs, hqes = self._get_records_for_sending_to_master()
1695        hqes[0]['status'] = 'Completed'
1696        self._send_records_to_master_helper(
1697            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)
1698
1699        # Check the entry was actually written to db
1700        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
1701                         'Completed')
1702
1703
1704    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
1705        """Ensure records belonging to different shard are silently rejected."""
1706        shard1 = models.Shard.objects.create(hostname='shard1')
1707        shard2 = models.Shard.objects.create(hostname='shard2')
1708        job1 = self._create_job(shard=shard1, control_file='foo1')
1709        job2 = self._create_job(shard=shard2, control_file='foo2')
1710        job1_id = job1.id
1711        job2_id = job2.id
1712        hqe1 = models.HostQueueEntry.objects.create(job=job1)
1713        hqe2 = models.HostQueueEntry.objects.create(job=job2)
1714        hqe1_id = hqe1.id
1715        hqe2_id = hqe2.id
1716        job1_record = job1.serialize(include_dependencies=False)
1717        job2_record = job2.serialize(include_dependencies=False)
1718        hqe1_record = hqe1.serialize(include_dependencies=False)
1719        hqe2_record = hqe2.serialize(include_dependencies=False)
1720
1721        # Prepare a bogus job record update from the wrong shard. The update
1722        # should not throw an exception. Non-bogus jobs in the same update
1723        # should happily update.
1724        job1_record.update({'control_file': 'bar1'})
1725        job2_record.update({'control_file': 'bar2'})
1726        hqe1_record.update({'status': 'Aborted'})
1727        hqe2_record.update({'status': 'Aborted'})
1728        self._do_heartbeat_and_assert_response(
1729            shard_hostname='shard2', upload_jobs=[job1_record, job2_record],
1730            upload_hqes=[hqe1_record, hqe2_record])
1731
1732        # Job and HQE record for wrong job should not be modified, because the
1733        # rpc came from the wrong shard. Job and HQE record for valid job are
1734        # modified.
1735        self.assertEqual(models.Job.objects.get(id=job1_id).control_file,
1736                         'foo1')
1737        self.assertEqual(models.Job.objects.get(id=job2_id).control_file,
1738                         'bar2')
1739        self.assertEqual(models.HostQueueEntry.objects.get(id=hqe1_id).status,
1740                         '')
1741        self.assertEqual(models.HostQueueEntry.objects.get(id=hqe2_id).status,
1742                         'Aborted')
1743
1744
1745    def testSendingRecordsToMasterNotExistingJob(self):
1746        """Ensure update for non existing job gets rejected."""
1747        jobs, hqes = self._get_records_for_sending_to_master()
1748        jobs[0]['id'] = 3
1749
1750        self._send_records_to_master_helper(
1751            jobs=jobs, hqes=hqes)
1752
1753
1754    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
1755                                     host_hostname='host1',
1756                                     label_name='board:lumpy'):
1757        """Create a label, host, shard, and assign host to shard."""
1758        try:
1759            label = models.Label.objects.create(name=label_name)
1760        except:
1761            label = models.Label.smart_get(label_name)
1762
1763        shard = models.Shard.objects.create(hostname=shard_hostname)
1764        shard.labels.add(label)
1765
1766        host = models.Host.objects.create(hostname=host_hostname, leased=False,
1767                                          shard=shard)
1768        host.labels.add(label)
1769
1770        return shard, host, label
1771
1772
1773    def testShardLabelRemovalInvalid(self):
1774        """Ensure you cannot remove the wrong label from shard."""
1775        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1776        stumpy_label = models.Label.objects.create(
1777                name='board:stumpy', platform=True)
1778        with self.assertRaises(error.RPCException):
1779            rpc_interface.remove_board_from_shard(
1780                    shard1.hostname, stumpy_label.name)
1781
1782
1783    def testShardHeartbeatLabelRemoval(self):
1784        """Ensure label removal from shard works."""
1785        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1786
1787        self.assertEqual(host1.shard, shard1)
1788        self.assertItemsEqual(shard1.labels.all(), [lumpy_label])
1789        rpc_interface.remove_board_from_shard(
1790                shard1.hostname, lumpy_label.name)
1791        host1 = models.Host.smart_get(host1.id)
1792        shard1 = models.Shard.smart_get(shard1.id)
1793        self.assertEqual(host1.shard, None)
1794        self.assertItemsEqual(shard1.labels.all(), [])
1795
1796
1797    def testCreateListShard(self):
1798        """Retrieve a list of all shards."""
1799        lumpy_label = models.Label.objects.create(name='board:lumpy',
1800                                                  platform=True)
1801        stumpy_label = models.Label.objects.create(name='board:stumpy',
1802                                                  platform=True)
1803        peppy_label = models.Label.objects.create(name='board:peppy',
1804                                                  platform=True)
1805
1806        shard_id = rpc_interface.add_shard(
1807            hostname='host1', labels='board:lumpy,board:stumpy')
1808        self.assertRaises(error.RPCException,
1809                          rpc_interface.add_shard,
1810                          hostname='host1', labels='board:lumpy,board:stumpy')
1811        self.assertRaises(model_logic.ValidationError,
1812                          rpc_interface.add_shard,
1813                          hostname='host1', labels='board:peppy')
1814        shard = models.Shard.objects.get(pk=shard_id)
1815        self.assertEqual(shard.hostname, 'host1')
1816        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
1817        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
1818
1819        self.assertEqual(rpc_interface.get_shards(),
1820                         [{'labels': ['board:lumpy','board:stumpy'],
1821                           'hostname': 'host1',
1822                           'id': 1}])
1823
1824
1825    def testAddBoardsToShard(self):
1826        """Add boards to a given shard."""
1827        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1828        stumpy_label = models.Label.objects.create(name='board:stumpy',
1829                                                   platform=True)
1830        shard_id = rpc_interface.add_board_to_shard(
1831            hostname='shard1', labels='board:stumpy')
1832        # Test whether raise exception when board label does not exist.
1833        self.assertRaises(models.Label.DoesNotExist,
1834                          rpc_interface.add_board_to_shard,
1835                          hostname='shard1', labels='board:test')
1836        # Test whether raise exception when board already sharded.
1837        self.assertRaises(error.RPCException,
1838                          rpc_interface.add_board_to_shard,
1839                          hostname='shard1', labels='board:lumpy')
1840        shard = models.Shard.objects.get(pk=shard_id)
1841        self.assertEqual(shard.hostname, 'shard1')
1842        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
1843        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
1844
1845        self.assertEqual(rpc_interface.get_shards(),
1846                         [{'labels': ['board:lumpy','board:stumpy'],
1847                           'hostname': 'shard1',
1848                           'id': 1}])
1849
1850
1851    def testShardHeartbeatFetchHostlessJob(self):
1852        shard1, host1, label1 = self._createShardAndHostWithLabel()
1853        self._testShardHeartbeatFetchHostlessJobHelper(host1)
1854
1855
1856    def testShardHeartbeatIncorrectHosts(self):
1857        shard1, host1, label1 = self._createShardAndHostWithLabel()
1858        self._testShardHeartbeatIncorrectHostsHelper(host1)
1859
1860
1861    def testShardHeartbeatLabelRemovalRace(self):
1862        shard1, host1, label1 = self._createShardAndHostWithLabel()
1863        self._testShardHeartbeatLabelRemovalRaceHelper(shard1, host1, label1)
1864
1865
1866    def testShardRetrieveJobs(self):
1867        shard1, host1, label1 = self._createShardAndHostWithLabel()
1868        shard2, host2, label2 = self._createShardAndHostWithLabel(
1869                'shard2', 'host2', 'board:grumpy')
1870        self._testShardRetrieveJobsHelper(shard1, host1, label1,
1871                                          shard2, host2, label2)
1872
1873
1874    def testResendJobsAfterFailedHeartbeat(self):
1875        shard1, host1, label1 = self._createShardAndHostWithLabel()
1876        self._testResendJobsAfterFailedHeartbeatHelper(shard1, host1, label1)
1877
1878
1879    def testResendHostsAfterFailedHeartbeat(self):
1880        shard1, host1, label1 = self._createShardAndHostWithLabel()
1881        self._testResendHostsAfterFailedHeartbeatHelper(host1)
1882
1883
# Run the unittest runner when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
1886