1#!/usr/bin/env python
2#
3# Copyright 2016 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import os
8import unittest
9
10import mox
11
12from apiclient import discovery
13from oauth2client.client import ApplicationDefaultCredentialsError
14from oauth2client.client import GoogleCredentials
15from googleapiclient.errors import UnknownApiNameOrVersion
16
17import common
18import pubsub_utils
19
20
class MockedPubSub(object):
    """A mocked PubSub handle.

    Stands in for the chained client call
    projects().topics().publish(...).execute(...). Every link of the chain
    returns this same object, and publish()/execute() check the arguments
    they receive against the values supplied at construction time.
    """

    def __init__(self, test, topic, msg, retry, ret_val=None,
            raise_except=False):
        """Records the expectations for the mocked calls.

        @param test: Test case object whose assertEqual reports mismatches.
        @param topic: Topic that publish() is expected to receive.
        @param msg: Message expected as the first entry of body['messages'].
        @param retry: Retry count that execute() is expected to receive.
        @param ret_val: Value returned by execute().
        @param raise_except: If True, execute() raises Exception instead of
                returning ret_val.
        """
        self.test = test
        self.topic = topic
        self.msg = msg
        self.retry = retry
        self.ret_val = ret_val
        self.raise_except = raise_except

    def projects(self):
        """Mocked PubSub projects.

        @returns: This object, so the call chain can continue.
        """
        return self

    def topics(self):
        """Mocked PubSub topics.

        @returns: This object, so the call chain can continue.
        """
        return self

    def publish(self, topic, body):
        """Mocked PubSub publish method.

        Checks the topic and the first message of the body against the
        expected values.

        @param topic: PubSub topic string.
        @param body: PubSub notification body.

        @returns: This object, so execute() can be chained.
        """
        self.test.assertEqual(self.topic, topic)
        self.test.assertEqual(self.msg, body['messages'][0])
        return self

    def execute(self, num_retries):
        """Mocked PubSub execute method.

        @param num_retries: Number of retries.

        @returns: The ret_val given to the constructor.
        @raises Exception: If raise_except was set in the constructor.
        """
        # The expected retry count is stored as self.retry and the canned
        # response as self.ret_val; use those attribute names here.
        self.test.assertEqual(self.retry, num_retries)
        if self.raise_except:
            raise Exception()
        return self.ret_val
59
60
def _create_sample_message():
    """Builds the pubsub message fixture used by the tests.

    @returns: A new dict holding a 'data' payload and an 'attributes' dict
            with a single entry.
    """
    return {
        'data': 'sample data',
        'attributes': {'var': 'value'},
    }
69
70
class PubSubTests(mox.MoxTestBase):
    """Tests for pubsub related functions."""

    def test_get_pubsub_service_no_service_account(self):
        """No service is returned when the service account file is missing."""
        self.mox.StubOutWithMock(os.path, 'isfile')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    def test_get_pubsub_service_with_invalid_service_account(self):
        """No service is returned when loading the credentials fails."""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
                        ApplicationDefaultCredentialsError())
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    # This test needs its own name: it used to share the name of the test
    # above, which shadowed that test so it was never run.
    def test_get_pubsub_service_with_invalid_api(self):
        """No service is returned when discovery.build rejects the API."""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        credentials = self.mox.CreateMock(GoogleCredentials)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
        credentials.create_scoped_required().AndReturn(True)
        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
                credentials)
        self.mox.StubOutWithMock(discovery, 'build')
        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
                pubsub_utils.PUBSUB_VERSION,
                credentials=credentials).AndRaise(UnknownApiNameOrVersion())
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    def test_get_pubsub_service_with_service_account(self):
        """A service is returned when credentials and API build succeed."""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        credentials = self.mox.CreateMock(GoogleCredentials)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
        credentials.create_scoped_required().AndReturn(True)
        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
                credentials)
        self.mox.StubOutWithMock(discovery, 'build')
        # Any non-None value works as the built service for this check.
        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
                pubsub_utils.PUBSUB_VERSION,
                credentials=credentials).AndReturn(1)
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNotNone(pubsub)
        self.mox.VerifyAll()

    def test_publish_notifications(self):
        """Tests publish notifications.

        The mocked service checks the topic, the first message and the retry
        count that publish_notifications passes along.
        """
        self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
        msg = _create_sample_message()
        pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
            self,
            'test_topic',
            msg,
            pubsub_utils._PUBSUB_NUM_RETRIES,
            # The response maps 'messageIds' to the published ids; a
            # one-element tuple is used instead of a list for brevity.
            ret_val={'messageIds': ('123',)}))

        self.mox.ReplayAll()
        pubsub_utils.publish_notifications(
                'test_topic', [msg])
        self.mox.VerifyAll()
153
154
# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
157