1#!/usr/bin/python 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import base64 7import mox 8import unittest 9 10import common 11 12from autotest_lib.client.common_lib import utils 13from autotest_lib.site_utils import cloud_console_client 14from autotest_lib.site_utils import cloud_console_pb2 as cpcon 15from autotest_lib.site_utils import pubsub_utils 16 17class PubSubBasedClientTests(mox.MoxTestBase): 18 """Tests for the 'PubSubBasedClient'.""" 19 20 def setUp(self): 21 super(PubSubBasedClientTests, self).setUp() 22 self._pubsub_client_mock = self.mox.CreateMock( 23 pubsub_utils.PubSubClient) 24 self._stubs = mox.stubout.StubOutForTesting() 25 self._stubs.Set(cloud_console_client, '_create_pubsub_client', 26 lambda x: self._pubsub_client_mock) 27 28 def tearDown(self): 29 self._stubs.UnsetAll() 30 super(PubSubBasedClientTests, self).tearDown() 31 32 def test_create_test_result_notification(self): 33 """Tests the test result notification message.""" 34 self._console_client = cloud_console_client.PubSubBasedClient() 35 self.mox.StubOutWithMock(utils, 36 'get_moblab_serial_number') 37 utils.get_moblab_serial_number().AndReturn( 38 'PV120BB8JAC01E') 39 self.mox.StubOutWithMock(utils, 'get_moblab_id') 40 utils.get_moblab_id().AndReturn( 41 'c8386d92-9ad1-11e6-80f5-111111111111') 42 self.mox.ReplayAll() 43 console_client = cloud_console_client.PubSubBasedClient() 44 msg = console_client._create_test_job_offloaded_message( 45 'gs://test_bucket/123-moblab') 46 self.assertEquals(base64.b64encode( 47 cloud_console_client.LEGACY_TEST_OFFLOAD_MESSAGE), msg['data']) 48 self.assertEquals( 49 cloud_console_client.CURRENT_MESSAGE_VERSION, 50 msg['attributes'][cloud_console_client.LEGACY_ATTR_VERSION]) 51 self.assertEquals( 52 '1c:dc:d1:11:01:e1', 53 msg['attributes'][cloud_console_client.LEGACY_ATTR_MOBLAB_MAC] 54 ) 55 self.assertEquals( 56 'c8386d92-9ad1-11e6-80f5-111111111111', 57 msg['attributes'][cloud_console_client.LEGACY_ATTR_MOBLAB_ID]) 58 self.assertEquals( 59 'gs://test_bucket/123-moblab', 60 msg['attributes'][cloud_console_client.LEGACY_ATTR_GCS_URI]) 61 self.mox.VerifyAll() 62 63 def test_send_test_job_offloaded_message(self): 64 """Tests send job offloaded notification.""" 65 console_client = cloud_console_client.PubSubBasedClient( 66 pubsub_topic='test topic') 67 68 message = {'data': 'dummy data', 'attributes' : {'key' : 'value'}} 69 self.mox.StubOutWithMock(cloud_console_client.PubSubBasedClient, 70 '_create_test_job_offloaded_message') 71 console_client._create_test_job_offloaded_message( 72 'gs://test_bucket/123-moblab').AndReturn(message) 73 74 msg_ids = ['1'] 75 self._pubsub_client_mock.publish_notifications( 76 'test topic', [message]).AndReturn(msg_ids) 77 78 self.mox.ReplayAll() 79 result = console_client.send_test_job_offloaded_message( 80 'gs://test_bucket/123-moblab') 81 self.assertTrue(result) 82 self.mox.VerifyAll() 83 84 def test_send_heartbeat(self): 85 """Tests send heartbeat.""" 86 console_client = cloud_console_client.PubSubBasedClient( 87 pubsub_topic='test topic') 88 self.mox.StubOutWithMock(utils, 'get_moblab_id') 89 utils.get_moblab_id().AndReturn( 90 'c8386d92-9ad1-11e6-80f5-111111111111') 91 92 message = { 93 'attributes' : { 94 'ATTR_MOBLAB_ID': 'c8386d92-9ad1-11e6-80f5-111111111111', 95 'ATTR_MESSAGE_VERSION': '1', 96 'ATTR_MESSAGE_TYPE': 'MSG_MOBLAB_HEARTBEAT', 97 'ATTR_MOBLAB_MAC_ADDRESS': '8c:dc:d4:56:06:e7'}} 98 msg_ids = ['1'] 99 self._pubsub_client_mock.publish_notifications( 100 'test topic', [message]).AndReturn(msg_ids) 101 self.mox.ReplayAll() 102 console_client.send_heartbeat() 103 self.mox.VerifyAll() 104 105 def test_send_event(self): 106 """Tests send heartbeat.""" 107 console_client = cloud_console_client.PubSubBasedClient( 108 pubsub_topic='test topic') 109 self.mox.StubOutWithMock(utils, 'get_moblab_id') 110 utils.get_moblab_id().AndReturn( 111 'c8386d92-9ad1-11e6-80f5-111111111111') 112 113 message = { 114 'data': '\x08\x01\x12\x0ethis is a test', 115 'attributes' : { 116 'ATTR_MOBLAB_ID': 'c8386d92-9ad1-11e6-80f5-111111111111', 117 'ATTR_MESSAGE_VERSION': '1', 118 'ATTR_MESSAGE_TYPE': 'MSG_MOBLAB_REMOTE_EVENT', 119 'ATTR_MOBLAB_MAC_ADDRESS': '8c:dc:d4:56:06:e7'}} 120 msg_ids = ['1'] 121 self._pubsub_client_mock.publish_notifications( 122 'test topic', [message]).AndReturn(msg_ids) 123 self.mox.ReplayAll() 124 console_client.send_event( 125 cpcon.RemoteEventMessage.EVENT_MOBLAB_BOOT_COMPLETE, 126 'this is a test') 127 self.mox.VerifyAll() 128 129if __name__ == '__main__': 130 unittest.main() 131