1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Tests for oauth2client.contrib.keyring_storage."""
16
17import datetime
18import threading
19
20import keyring
21import mock
22import unittest2
23
24import oauth2client
25from oauth2client import client
26from oauth2client.contrib import keyring_storage
27
28
29__author__ = 'jcgregorio@google.com (Joe Gregorio)'
30
31
32class KeyringStorageTests(unittest2.TestCase):
33
34    def test_constructor(self):
35        service_name = 'my_unit_test'
36        user_name = 'me'
37        store = keyring_storage.Storage(service_name, user_name)
38        self.assertEqual(store._service_name, service_name)
39        self.assertEqual(store._user_name, user_name)
40        lock_type = type(threading.Lock())
41        self.assertIsInstance(store._lock, lock_type)
42
43    def test_acquire_lock(self):
44        store = keyring_storage.Storage('my_unit_test', 'me')
45        store._lock = lock = _FakeLock()
46        self.assertEqual(lock._acquire_count, 0)
47        store.acquire_lock()
48        self.assertEqual(lock._acquire_count, 1)
49
50    def test_release_lock(self):
51        store = keyring_storage.Storage('my_unit_test', 'me')
52        store._lock = lock = _FakeLock()
53        self.assertEqual(lock._release_count, 0)
54        store.release_lock()
55        self.assertEqual(lock._release_count, 1)
56
57    def test_locked_get(self):
58        service_name = 'my_unit_test'
59        user_name = 'me'
60        mock_content = (object(), 'mock_content')
61        mock_return_creds = mock.MagicMock()
62        mock_return_creds.set_store = set_store = mock.MagicMock(
63            name='set_store')
64        with mock.patch.object(keyring, 'get_password',
65                               return_value=mock_content,
66                               autospec=True) as get_password:
67            class_name = 'oauth2client.client.Credentials'
68            with mock.patch(class_name) as MockCreds:
69                MockCreds.new_from_json = new_from_json = mock.MagicMock(
70                    name='new_from_json', return_value=mock_return_creds)
71                store = keyring_storage.Storage(service_name, user_name)
72                credentials = store.locked_get()
73                new_from_json.assert_called_once_with(mock_content)
74                get_password.assert_called_once_with(service_name, user_name)
75                self.assertEqual(credentials, mock_return_creds)
76                set_store.assert_called_once_with(store)
77
78    def test_locked_put(self):
79        service_name = 'my_unit_test'
80        user_name = 'me'
81        store = keyring_storage.Storage(service_name, user_name)
82        with mock.patch.object(keyring, 'set_password',
83                               return_value=None,
84                               autospec=True) as set_password:
85            credentials = mock.MagicMock()
86            to_json_ret = object()
87            credentials.to_json = to_json = mock.MagicMock(
88                name='to_json', return_value=to_json_ret)
89            store.locked_put(credentials)
90            to_json.assert_called_once_with()
91            set_password.assert_called_once_with(service_name, user_name,
92                                                 to_json_ret)
93
94    def test_locked_delete(self):
95        service_name = 'my_unit_test'
96        user_name = 'me'
97        store = keyring_storage.Storage(service_name, user_name)
98        with mock.patch.object(keyring, 'set_password',
99                               return_value=None,
100                               autospec=True) as set_password:
101            store.locked_delete()
102            set_password.assert_called_once_with(service_name, user_name, '')
103
104    def test_get_with_no_credentials_stored(self):
105        with mock.patch.object(keyring, 'get_password',
106                               return_value=None,
107                               autospec=True) as get_password:
108            store = keyring_storage.Storage('my_unit_test', 'me')
109            credentials = store.get()
110            self.assertEquals(None, credentials)
111            get_password.assert_called_once_with('my_unit_test', 'me')
112
113    def test_get_with_malformed_json_credentials_stored(self):
114        with mock.patch.object(keyring, 'get_password',
115                               return_value='{',
116                               autospec=True) as get_password:
117            store = keyring_storage.Storage('my_unit_test', 'me')
118            credentials = store.get()
119            self.assertEquals(None, credentials)
120            get_password.assert_called_once_with('my_unit_test', 'me')
121
122    def test_get_and_set_with_json_credentials_stored(self):
123        access_token = 'foo'
124        client_id = 'some_client_id'
125        client_secret = 'cOuDdkfjxxnv+'
126        refresh_token = '1/0/a.df219fjls0'
127        token_expiry = datetime.datetime.utcnow()
128        user_agent = 'refresh_checker/1.0'
129
130        credentials = client.OAuth2Credentials(
131            access_token, client_id, client_secret,
132            refresh_token, token_expiry, oauth2client.GOOGLE_TOKEN_URI,
133            user_agent)
134
135        # Setting autospec on a mock with an iterable side_effect is
136        # currently broken (http://bugs.python.org/issue17826), so instead
137        # we patch twice.
138        with mock.patch.object(keyring, 'get_password',
139                               return_value=None,
140                               autospec=True) as get_password:
141            with mock.patch.object(keyring, 'set_password',
142                                   return_value=None,
143                                   autospec=True) as set_password:
144                store = keyring_storage.Storage('my_unit_test', 'me')
145                self.assertEquals(None, store.get())
146
147                store.put(credentials)
148
149                set_password.assert_called_once_with(
150                    'my_unit_test', 'me', credentials.to_json())
151                get_password.assert_called_once_with('my_unit_test', 'me')
152
153        with mock.patch.object(keyring, 'get_password',
154                               return_value=credentials.to_json(),
155                               autospec=True) as get_password:
156            restored = store.get()
157            self.assertEqual('foo', restored.access_token)
158            self.assertEqual('some_client_id', restored.client_id)
159            get_password.assert_called_once_with('my_unit_test', 'me')
160
161
162class _FakeLock(object):
163
164    _acquire_count = 0
165    _release_count = 0
166
167    def acquire(self):
168        self._acquire_count += 1
169
170    def release(self):
171        self._release_count += 1
172