1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit tests for oauth2client.clientsecrets."""
16
17import errno
18from io import StringIO
19import os
20import tempfile
21
22import unittest2
23
24import oauth2client
25from oauth2client import _helpers
26from oauth2client import clientsecrets
27
28
29__author__ = 'jcgregorio@google.com (Joe Gregorio)'
30
31
32DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
33VALID_FILE = os.path.join(DATA_DIR, 'client_secrets.json')
34INVALID_FILE = os.path.join(DATA_DIR, 'unfilled_client_secrets.json')
35NONEXISTENT_FILE = os.path.join(
36    os.path.dirname(__file__), 'afilethatisntthere.json')
37
38
39class Test__validate_clientsecrets(unittest2.TestCase):
40
41    def test_with_none(self):
42        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
43            clientsecrets._validate_clientsecrets(None)
44
45    def test_with_other_than_one_key(self):
46        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
47            clientsecrets._validate_clientsecrets({})
48        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
49            clientsecrets._validate_clientsecrets({'one': 'val', 'two': 'val'})
50
51    def test_with_non_dictionary(self):
52        non_dict = [None]
53        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
54            clientsecrets._validate_clientsecrets(non_dict)
55
56    def test_invalid_client_type(self):
57        fake_type = 'fake_type'
58        self.assertNotEqual(fake_type, clientsecrets.TYPE_WEB)
59        self.assertNotEqual(fake_type, clientsecrets.TYPE_INSTALLED)
60        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
61            clientsecrets._validate_clientsecrets({fake_type: None})
62
63    def test_missing_required_type_web(self):
64        required = clientsecrets.VALID_CLIENT[
65            clientsecrets.TYPE_WEB]['required']
66        # We will certainly have less than all 5 keys.
67        self.assertEqual(len(required), 5)
68
69        clientsecrets_dict = {
70            clientsecrets.TYPE_WEB: {'not_required': None},
71        }
72        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
73            clientsecrets._validate_clientsecrets(clientsecrets_dict)
74
75    def test_string_not_configured_type_web(self):
76        string_props = clientsecrets.VALID_CLIENT[
77            clientsecrets.TYPE_WEB]['string']
78
79        self.assertTrue('client_id' in string_props)
80        clientsecrets_dict = {
81            clientsecrets.TYPE_WEB: {
82                'client_id': '[[template]]',
83                'client_secret': 'seekrit',
84                'redirect_uris': None,
85                'auth_uri': None,
86                'token_uri': None,
87            },
88        }
89        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
90            clientsecrets._validate_clientsecrets(clientsecrets_dict)
91
92    def test_missing_required_type_installed(self):
93        required = clientsecrets.VALID_CLIENT[
94            clientsecrets.TYPE_INSTALLED]['required']
95        # We will certainly have less than all 5 keys.
96        self.assertEqual(len(required), 5)
97
98        clientsecrets_dict = {
99            clientsecrets.TYPE_INSTALLED: {'not_required': None},
100        }
101        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
102            clientsecrets._validate_clientsecrets(clientsecrets_dict)
103
104    def test_string_not_configured_type_installed(self):
105        string_props = clientsecrets.VALID_CLIENT[
106            clientsecrets.TYPE_INSTALLED]['string']
107
108        self.assertTrue('client_id' in string_props)
109        clientsecrets_dict = {
110            clientsecrets.TYPE_INSTALLED: {
111                'client_id': '[[template]]',
112                'client_secret': 'seekrit',
113                'redirect_uris': None,
114                'auth_uri': None,
115                'token_uri': None,
116            },
117        }
118        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
119            clientsecrets._validate_clientsecrets(clientsecrets_dict)
120
121    def test_success_type_web(self):
122        client_info = {
123            'client_id': 'eye-dee',
124            'client_secret': 'seekrit',
125            'redirect_uris': None,
126            'auth_uri': None,
127            'token_uri': None,
128        }
129        clientsecrets_dict = {
130            clientsecrets.TYPE_WEB: client_info,
131        }
132        result = clientsecrets._validate_clientsecrets(clientsecrets_dict)
133        self.assertEqual(result, (clientsecrets.TYPE_WEB, client_info))
134
135    def test_success_type_installed(self):
136        client_info = {
137            'client_id': 'eye-dee',
138            'client_secret': 'seekrit',
139            'redirect_uris': None,
140            'auth_uri': None,
141            'token_uri': None,
142        }
143        clientsecrets_dict = {
144            clientsecrets.TYPE_INSTALLED: client_info,
145        }
146        result = clientsecrets._validate_clientsecrets(clientsecrets_dict)
147        self.assertEqual(result, (clientsecrets.TYPE_INSTALLED, client_info))
148
149
150class Test__loadfile(unittest2.TestCase):
151
152    def test_success(self):
153        client_type, client_info = clientsecrets._loadfile(VALID_FILE)
154        expected_client_info = {
155            'client_id': 'foo_client_id',
156            'client_secret': 'foo_client_secret',
157            'redirect_uris': [],
158            'auth_uri': oauth2client.GOOGLE_AUTH_URI,
159            'token_uri': oauth2client.GOOGLE_TOKEN_URI,
160            'revoke_uri': oauth2client.GOOGLE_REVOKE_URI,
161        }
162        self.assertEqual(client_type, clientsecrets.TYPE_WEB)
163        self.assertEqual(client_info, expected_client_info)
164
165    def test_non_existent(self):
166        path = os.path.join(DATA_DIR, 'fake.json')
167        self.assertFalse(os.path.exists(path))
168        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
169            clientsecrets._loadfile(path)
170
171    def test_bad_json(self):
172        filename = tempfile.mktemp()
173        with open(filename, 'wb') as file_obj:
174            file_obj.write(b'[')
175        with self.assertRaises(ValueError):
176            clientsecrets._loadfile(filename)
177
178
179class OAuth2CredentialsTests(unittest2.TestCase):
180
181    def test_validate_error(self):
182        payload = (
183            b'{'
184            b'  "web": {'
185            b'    "client_id": "[[CLIENT ID REQUIRED]]",'
186            b'    "client_secret": "[[CLIENT SECRET REQUIRED]]",'
187            b'    "redirect_uris": ["http://localhost:8080/oauth2callback"],'
188            b'    "auth_uri": "",'
189            b'    "token_uri": ""'
190            b'  }'
191            b'}')
192        ERRORS = [
193            ('{}', 'Invalid'),
194            ('{"foo": {}}', 'Unknown'),
195            ('{"web": {}}', 'Missing'),
196            ('{"web": {"client_id": "dkkd"}}', 'Missing'),
197            (payload, 'Property'),
198        ]
199        for src, match in ERRORS:
200            # Ensure that it is unicode
201            src = _helpers._from_bytes(src)
202            # Test load(s)
203            with self.assertRaises(
204                    clientsecrets.InvalidClientSecretsError) as exc_manager:
205                clientsecrets.loads(src)
206
207            self.assertTrue(str(exc_manager.exception).startswith(match))
208
209            # Test loads(fp)
210            with self.assertRaises(
211                    clientsecrets.InvalidClientSecretsError) as exc_manager:
212                fp = StringIO(src)
213                clientsecrets.load(fp)
214
215            self.assertTrue(str(exc_manager.exception).startswith(match))
216
217    def test_load_by_filename_missing_file(self):
218        with self.assertRaises(
219                clientsecrets.InvalidClientSecretsError) as exc_manager:
220            clientsecrets._loadfile(NONEXISTENT_FILE)
221
222        self.assertEquals(exc_manager.exception.args[1], NONEXISTENT_FILE)
223        self.assertEquals(exc_manager.exception.args[3], errno.ENOENT)
224
225
226class CachedClientsecretsTests(unittest2.TestCase):
227
228    class CacheMock(object):
229        def __init__(self):
230            self.cache = {}
231            self.last_get_ns = None
232            self.last_set_ns = None
233
234        def get(self, key, namespace=''):
235            # ignoring namespace for easier testing
236            self.last_get_ns = namespace
237            return self.cache.get(key, None)
238
239        def set(self, key, value, namespace=''):
240            # ignoring namespace for easier testing
241            self.last_set_ns = namespace
242            self.cache[key] = value
243
244    def setUp(self):
245        self.cache_mock = self.CacheMock()
246
247    def test_cache_miss(self):
248        client_type, client_info = clientsecrets.loadfile(
249            VALID_FILE, cache=self.cache_mock)
250        self.assertEqual('web', client_type)
251        self.assertEqual('foo_client_secret', client_info['client_secret'])
252
253        cached = self.cache_mock.cache[VALID_FILE]
254        self.assertEqual({client_type: client_info}, cached)
255
256        # make sure we're using non-empty namespace
257        ns = self.cache_mock.last_set_ns
258        self.assertTrue(bool(ns))
259        # make sure they're equal
260        self.assertEqual(ns, self.cache_mock.last_get_ns)
261
262    def test_cache_hit(self):
263        self.cache_mock.cache[NONEXISTENT_FILE] = {'web': 'secret info'}
264
265        client_type, client_info = clientsecrets.loadfile(
266            NONEXISTENT_FILE, cache=self.cache_mock)
267        self.assertEqual('web', client_type)
268        self.assertEqual('secret info', client_info)
269        # make sure we didn't do any set() RPCs
270        self.assertEqual(None, self.cache_mock.last_set_ns)
271
272    def test_validation(self):
273        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
274            clientsecrets.loadfile(INVALID_FILE, cache=self.cache_mock)
275
276    def test_without_cache(self):
277        # this also ensures loadfile() is backward compatible
278        client_type, client_info = clientsecrets.loadfile(VALID_FILE)
279        self.assertEqual('web', client_type)
280        self.assertEqual('foo_client_secret', client_info['client_secret'])
281