# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for oauth2client.multistore_file."""

import datetime
import errno
import os
import stat
import tempfile

import mock
import unittest2

from oauth2client import client
from oauth2client import util
from oauth2client.contrib import locked_file
from oauth2client.contrib import multistore_file

# Reserve a unique temporary path shared by the tests below; only the name
# is needed, so the descriptor is closed immediately.
_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)


def _remove_test_file():
    """Best-effort removal of FILENAME; a missing file is not an error."""
    try:
        # Several tests leave the file at mode 0o400. Make it writable again
        # first so the unlink is not refused where read-only files are
        # protected from deletion.
        os.chmod(FILENAME, 0o600)
        os.unlink(FILENAME)
    except OSError:
        pass


class _MockLockedFile(object):
    """Locked-file stand-in whose ``open_and_lock`` always raises.

    Args:
        filename_str: value returned by ``filename()``.
        error_class: exception class raised by ``open_and_lock``.
        error_code: first argument passed to ``error_class`` (an errno).
    """

    def __init__(self, filename_str, error_class, error_code):
        self.filename_str = filename_str
        self.error_class = error_class
        self.error_code = error_code
        # Flipped to True on the first open_and_lock() call so tests can
        # verify the lock attempt actually happened.
        self.open_and_lock_called = False

    def open_and_lock(self):
        """Record the call, then raise the configured error."""
        self.open_and_lock_called = True
        raise self.error_class(self.error_code, '')

    def is_locked(self):
        """Always report the file as unlocked."""
        return False

    def filename(self):
        """Return the filename given at construction."""
        return self.filename_str


class Test__dict_to_tuple_key(unittest2.TestCase):
    """Tests for multistore_file._dict_to_tuple_key."""

    def test_key_conversions(self):
        key1, val1 = 'somekey', 'some value'
        key2, val2 = 'another', 'something else'
        key3, val3 = 'onemore', 'foo'
        test_dict = {
            key1: val1,
            key2: val2,
            key3: val3,
        }
        tuple_key = multistore_file._dict_to_tuple_key(test_dict)

        # the resulting key should be naturally sorted
        expected_output = (
            (key2, val2),
            (key3, val3),
            (key1, val1),
        )
        self.assertTupleEqual(expected_output, tuple_key)
        # check we get the original dictionary back
        self.assertDictEqual(test_dict, dict(tuple_key))


class MultistoreFileTests(unittest2.TestCase):
    """Tests for the multistore_file storage helpers and _MultiStore."""

    def setUp(self):
        # Start every test without a pre-existing credentials file.
        _remove_test_file()

    def tearDown(self):
        _remove_test_file()

    def _create_test_credentials(self, client_id='some_client_id',
                                 expiration=None):
        """Build an OAuth2Credentials object filled with dummy values.

        Args:
            client_id: client ID to embed in the credentials.
            expiration: token expiry; when falsy, the current UTC time
                is used.

        Returns:
            A client.OAuth2Credentials instance.
        """
        access_token = 'foo'
        client_secret = 'cOuDdkfjxxnv+'
        refresh_token = '1/0/a.df219fjls0'
        token_expiry = expiration or datetime.datetime.utcnow()
        token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
        user_agent = 'refresh_checker/1.0'

        return client.OAuth2Credentials(
            access_token, client_id, client_secret,
            refresh_token, token_expiry, token_uri,
            user_agent)

    def test_lock_file_raises_ioerror(self):
        filehandle, filename = tempfile.mkstemp()
        os.close(filehandle)

        try:
            for error_code in (errno.EDEADLK, errno.ENOSYS, errno.ENOLCK,
                               errno.EACCES):
                for error_class in (IOError, OSError):
                    multistore = multistore_file._MultiStore(filename)
                    multistore._file = _MockLockedFile(
                        filename, error_class, error_code)
                    # Should not raise though the underlying file class did.
                    multistore._lock()
                    self.assertTrue(multistore._file.open_and_lock_called)
        finally:
            os.unlink(filename)

    def test_lock_file_raise_unexpected_error(self):
        filehandle, filename = tempfile.mkstemp()
        os.close(filehandle)

        try:
            multistore = multistore_file._MultiStore(filename)
            multistore._file = _MockLockedFile(filename, IOError, errno.EBUSY)
            with self.assertRaises(IOError):
                multistore._lock()
            self.assertTrue(multistore._file.open_and_lock_called)
        finally:
            os.unlink(filename)

    def test_read_only_file_fail_lock(self):
        credentials = self._create_test_credentials()

        open(FILENAME, 'a+b').close()
        os.chmod(FILENAME, 0o400)

        store = multistore_file.get_credential_storage(
            FILENAME,
            credentials.client_id,
            credentials.user_agent,
            ['some-scope', 'some-other-scope'])

        store.put(credentials)
        if os.name == 'posix':  # pragma: NO COVER
            self.assertTrue(store._multistore._read_only)
        os.chmod(FILENAME, 0o600)

    def test_read_only_file_fail_lock_no_warning(self):
        open(FILENAME, 'a+b').close()
        os.chmod(FILENAME, 0o400)

        multistore = multistore_file._MultiStore(FILENAME)

        with mock.patch.object(multistore_file.logger, 'warn') as mock_warn:
            multistore._warn_on_readonly = False
            multistore._lock()
            self.assertFalse(mock_warn.called)

    def test_lock_skip_refresh(self):
        with open(FILENAME, 'w') as f:
            f.write('123')
        os.chmod(FILENAME, 0o400)

        multistore = multistore_file._MultiStore(FILENAME)

        refresh_patch = mock.patch.object(
            multistore, '_refresh_data_cache')

        with refresh_patch as refresh_mock:
            multistore._data = {}
            multistore._lock()
            self.assertFalse(refresh_mock.called)

    @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
    def test_multistore_no_symbolic_link_files(self):
        symlink_path = FILENAME + 'sym'
        os.symlink(FILENAME, symlink_path)
        # Register removal right away so the link is cleaned up even if
        # creating the storage object below fails.
        self.addCleanup(os.unlink, symlink_path)

        store = multistore_file.get_credential_storage(
            symlink_path,
            'some_client_id',
            'user-agent/1.0',
            ['some-scope', 'some-other-scope'])
        with self.assertRaises(
                locked_file.CredentialsFileSymbolicLinkError):
            store.get()

    def test_multistore_non_existent_file(self):
        store = multistore_file.get_credential_storage(
            FILENAME,
            'some_client_id',
            'user-agent/1.0',
            ['some-scope', 'some-other-scope'])

        credentials = store.get()
        self.assertIsNone(credentials)

    def test_multistore_file(self):
        credentials = self._create_test_credentials()

        store = multistore_file.get_credential_storage(
            FILENAME,
            credentials.client_id,
            credentials.user_agent,
            ['some-scope', 'some-other-scope'])

        # Save credentials
        store.put(credentials)
        credentials = store.get()

        self.assertIsNotNone(credentials)
        self.assertEqual('foo', credentials.access_token)

        # Delete credentials
        store.delete()
        credentials = store.get()

        self.assertIsNone(credentials)

        if os.name == 'posix':  # pragma: NO COVER
            self.assertEqual(
                0o600, stat.S_IMODE(os.stat(FILENAME).st_mode))

    def test_multistore_file_custom_key(self):
        credentials = self._create_test_credentials()

        custom_key = {'myapp': 'testing', 'clientid': 'some client'}
        store = multistore_file.get_credential_storage_custom_key(
            FILENAME, custom_key)

        store.put(credentials)
        stored_credentials = store.get()

        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        store.delete()
        stored_credentials = store.get()

        self.assertIsNone(stored_credentials)

    def test_multistore_file_custom_string_key(self):
        credentials = self._create_test_credentials()

        # store with string key
        store = multistore_file.get_credential_storage_custom_string_key(
            FILENAME, 'mykey')

        store.put(credentials)
        stored_credentials = store.get()

        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        # try retrieving with a dictionary; a string key is expected to be
        # stored as {'key': <string>}, so this must find the same entry.
        store = multistore_file.get_credential_storage_custom_key(
            FILENAME, {'key': 'mykey'})
        stored_credentials = store.get()
        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        store.delete()
        stored_credentials = store.get()

        self.assertIsNone(stored_credentials)

    def test_multistore_file_backwards_compatibility(self):
        credentials = self._create_test_credentials()
        scopes = ['scope1', 'scope2']

        # store the credentials using the legacy key method
        store = multistore_file.get_credential_storage(
            FILENAME, 'client_id', 'user_agent', scopes)
        store.put(credentials)

        # retrieve the credentials using a custom key that matches the
        # legacy key
        key = {'clientId': 'client_id', 'userAgent': 'user_agent',
               'scope': util.scopes_to_string(scopes)}
        store = multistore_file.get_credential_storage_custom_key(
            FILENAME, key)
        stored_credentials = store.get()

        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

    def test_multistore_file_get_all_keys(self):
        # start with no keys
        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([], keys)

        # store credentials
        credentials = self._create_test_credentials(client_id='client1')
        custom_key = {'myapp': 'testing', 'clientid': 'client1'}
        store1 = multistore_file.get_credential_storage_custom_key(
            FILENAME, custom_key)
        store1.put(credentials)

        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([custom_key], keys)

        # store more credentials
        credentials = self._create_test_credentials(client_id='client2')
        string_key = 'string_key'
        store2 = multistore_file.get_credential_storage_custom_string_key(
            FILENAME, string_key)
        store2.put(credentials)

        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual(2, len(keys))
        self.assertIn(custom_key, keys)
        self.assertIn({'key': string_key}, keys)

        # back to no keys
        store1.delete()
        store2.delete()
        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([], keys)

    def _refresh_data_cache_helper(self):
        """Return a _MultiStore and an unstarted patch of its JSON reader.

        Returns:
            A ``(multistore, json_patch)`` tuple; entering ``json_patch``
            replaces ``multistore._locked_json_read`` with a mock.
        """
        multistore = multistore_file._MultiStore(FILENAME)
        json_patch = mock.patch.object(multistore, '_locked_json_read')

        return multistore, json_patch

    def test__refresh_data_cache_bad_json(self):
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.side_effect = ValueError('')
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__refresh_data_cache_bad_version(self):
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {}
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__refresh_data_cache_newer_version(self):
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {'file_version': 5}
            with self.assertRaises(multistore_file.NewerCredentialStoreError):
                multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)

    def test__refresh_data_cache_bad_credentials(self):
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {
                'file_version': 1,
                'data': [
                    {'lol': 'this is a bad credential object.'}
                ]}
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__delete_credential_nonexistent(self):
        multistore = multistore_file._MultiStore(FILENAME)

        with mock.patch.object(multistore, '_write') as write_mock:
            multistore._data = {}
            multistore._delete_credential('nonexistent_key')
            self.assertTrue(write_mock.called)