1#!/usr/bin/env python 2# 3# Copyright 2016 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import os 8import unittest 9 10import mox 11 12from apiclient import discovery 13from oauth2client.client import ApplicationDefaultCredentialsError 14from oauth2client.client import GoogleCredentials 15from googleapiclient.errors import UnknownApiNameOrVersion 16 17import common 18import pubsub_utils 19 20 21class MockedPubSub(object): 22 """A mocked PubSub handle.""" 23 def __init__(self, test, topic, msg, retry, ret_val=None, 24 raise_except=False): 25 self.test = test 26 self.topic = topic 27 self.msg = msg 28 self.retry = retry 29 self.ret_val = ret_val 30 self.raise_except = raise_except 31 32 def projects(self): 33 """Mocked PubSub projects.""" 34 return self 35 36 def topics(self): 37 """Mocked PubSub topics.""" 38 return self 39 40 def publish(self, topic, body): 41 """Mocked PubSub publish method. 42 43 @param topic: PubSub topic string. 44 @param body: PubSub notification body. 45 """ 46 self.test.assertEquals(self.topic, topic) 47 self.test.assertEquals(self.msg, body['messages'][0]) 48 return self 49 50 def execute(self, num_retries): 51 """Mocked PubSub execute method. 52 53 @param num_retries: Number of retries. 54 """ 55 self.test.assertEquals(self.num_retries, num_retries) 56 if self.raise_except: 57 raise Exception() 58 return self.ret 59 60 61def _create_sample_message(): 62 """Creates a sample pubsub message.""" 63 msg_payload = {'data': 'sample data'} 64 msg_attributes = {} 65 msg_attributes['var'] = 'value' 66 msg_payload['attributes'] = msg_attributes 67 68 return msg_payload 69 70 71class PubSubTests(mox.MoxTestBase): 72 """Tests for pubsub related functios.""" 73 74 def test_get_pubsub_service_no_service_account(self): 75 """Test getting the pubsub service""" 76 self.mox.StubOutWithMock(os.path, 'isfile') 77 os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False) 78 self.mox.ReplayAll() 79 pubsub = pubsub_utils._get_pubsub_service() 80 self.assertIsNone(pubsub) 81 self.mox.VerifyAll() 82 83 def test_get_pubsub_service_with_invalid_service_account(self): 84 """Test getting the pubsub service""" 85 self.mox.StubOutWithMock(os.path, 'isfile') 86 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 87 os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 88 credentials = self.mox.CreateMock(GoogleCredentials) 89 GoogleCredentials.from_stream( 90 pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise( 91 ApplicationDefaultCredentialsError()) 92 self.mox.ReplayAll() 93 pubsub = pubsub_utils._get_pubsub_service() 94 self.assertIsNone(pubsub) 95 self.mox.VerifyAll() 96 97 def test_get_pubsub_service_with_invalid_service_account(self): 98 """Test getting the pubsub service""" 99 self.mox.StubOutWithMock(os.path, 'isfile') 100 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 101 os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 102 credentials = self.mox.CreateMock(GoogleCredentials) 103 GoogleCredentials.from_stream( 104 pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 105 credentials.create_scoped_required().AndReturn(True) 106 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 107 credentials) 108 self.mox.StubOutWithMock(discovery, 'build') 109 discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME, 110 pubsub_utils.PUBSUB_VERSION, 111 credentials=credentials).AndRaise(UnknownApiNameOrVersion()) 112 self.mox.ReplayAll() 113 pubsub = pubsub_utils._get_pubsub_service() 114 self.assertIsNone(pubsub) 115 self.mox.VerifyAll() 116 117 def test_get_pubsub_service_with_service_account(self): 118 """Test getting the pubsub service""" 119 self.mox.StubOutWithMock(os.path, 'isfile') 120 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 121 os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 122 credentials = self.mox.CreateMock(GoogleCredentials) 123 GoogleCredentials.from_stream( 124 pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 125 credentials.create_scoped_required().AndReturn(True) 126 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 127 credentials) 128 self.mox.StubOutWithMock(discovery, 'build') 129 discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME, 130 pubsub_utils.PUBSUB_VERSION, 131 credentials=credentials).AndReturn(1) 132 self.mox.ReplayAll() 133 pubsub = pubsub_utils._get_pubsub_service() 134 self.assertIsNotNone(pubsub) 135 self.mox.VerifyAll() 136 137 def test_publish_notifications(self): 138 """Tests publish notifications.""" 139 self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service') 140 msg = _create_sample_message() 141 pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub( 142 self, 143 'test_topic', 144 msg, 145 pubsub_utils._PUBSUB_NUM_RETRIES, 146 # use tuple ('123') instead of list just for easy to write the test. 147 ret_val = {'messageIds', ('123')})) 148 149 self.mox.ReplayAll() 150 pubsub_utils.publish_notifications( 151 'test_topic', [msg]) 152 self.mox.VerifyAll() 153 154 155if __name__ == '__main__': 156 unittest.main() 157