1# Copyright 2014 Google Inc. All rights reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Oauth2client tests 16 17Unit tests for oauth2client. 18""" 19 20import base64 21import contextlib 22import copy 23import datetime 24import json 25import os 26import socket 27import sys 28import tempfile 29 30import httplib2 31import mock 32import six 33from six.moves import http_client 34from six.moves import urllib 35import unittest2 36 37import oauth2client 38from oauth2client import _helpers 39from oauth2client import client 40from oauth2client import clientsecrets 41from oauth2client import service_account 42from oauth2client import util 43from .http_mock import CacheMock 44from .http_mock import HttpMock 45from .http_mock import HttpMockSequence 46 47__author__ = 'jcgregorio@google.com (Joe Gregorio)' 48 49DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') 50 51 52# TODO(craigcitro): This is duplicated from 53# googleapiclient.test_discovery; consolidate these definitions. 54def assertUrisEqual(testcase, expected, actual): 55 """Test that URIs are the same, up to reordering of query parameters.""" 56 expected = urllib.parse.urlparse(expected) 57 actual = urllib.parse.urlparse(actual) 58 testcase.assertEqual(expected.scheme, actual.scheme) 59 testcase.assertEqual(expected.netloc, actual.netloc) 60 testcase.assertEqual(expected.path, actual.path) 61 testcase.assertEqual(expected.params, actual.params) 62 testcase.assertEqual(expected.fragment, actual.fragment) 63 expected_query = urllib.parse.parse_qs(expected.query) 64 actual_query = urllib.parse.parse_qs(actual.query) 65 for name in expected_query.keys(): 66 testcase.assertEqual(expected_query[name], actual_query[name]) 67 for name in actual_query.keys(): 68 testcase.assertEqual(expected_query[name], actual_query[name]) 69 70 71def datafile(filename): 72 return os.path.join(DATA_DIR, filename) 73 74 75def load_and_cache(existing_file, fakename, cache_mock): 76 client_type, client_info = clientsecrets._loadfile(datafile(existing_file)) 77 cache_mock.cache[fakename] = {client_type: client_info} 78 79 80class CredentialsTests(unittest2.TestCase): 81 82 def test_to_from_json(self): 83 credentials = client.Credentials() 84 json = credentials.to_json() 85 client.Credentials.new_from_json(json) 86 87 def test_authorize_abstract(self): 88 credentials = client.Credentials() 89 http = object() 90 with self.assertRaises(NotImplementedError): 91 credentials.authorize(http) 92 93 def test_refresh_abstract(self): 94 credentials = client.Credentials() 95 http = object() 96 with self.assertRaises(NotImplementedError): 97 credentials.refresh(http) 98 99 def test_revoke_abstract(self): 100 credentials = client.Credentials() 101 http = object() 102 with self.assertRaises(NotImplementedError): 103 credentials.revoke(http) 104 105 def test_apply_abstract(self): 106 credentials = client.Credentials() 107 headers = {} 108 with self.assertRaises(NotImplementedError): 109 credentials.apply(headers) 110 111 def test__to_json_basic(self): 112 credentials = client.Credentials() 113 json_payload = credentials._to_json([]) 114 # str(bytes) in Python2 and str(unicode) in Python3 115 self.assertIsInstance(json_payload, str) 116 payload = json.loads(json_payload) 117 expected_payload = { 118 '_class': client.Credentials.__name__, 119 '_module': client.Credentials.__module__, 120 'token_expiry': None, 121 } 122 self.assertEqual(payload, expected_payload) 123 124 def test__to_json_with_strip(self): 125 credentials = client.Credentials() 126 credentials.foo = 'bar' 127 credentials.baz = 'quux' 128 to_strip = ['foo'] 129 json_payload = credentials._to_json(to_strip) 130 # str(bytes) in Python2 and str(unicode) in Python3 131 self.assertIsInstance(json_payload, str) 132 payload = json.loads(json_payload) 133 expected_payload = { 134 '_class': client.Credentials.__name__, 135 '_module': client.Credentials.__module__, 136 'token_expiry': None, 137 'baz': credentials.baz, 138 } 139 self.assertEqual(payload, expected_payload) 140 141 def test__to_json_to_serialize(self): 142 credentials = client.Credentials() 143 to_serialize = { 144 'foo': b'bar', 145 'baz': u'quux', 146 'st': set(['a', 'b']), 147 } 148 orig_vals = to_serialize.copy() 149 json_payload = credentials._to_json([], to_serialize=to_serialize) 150 # str(bytes) in Python2 and str(unicode) in Python3 151 self.assertIsInstance(json_payload, str) 152 payload = json.loads(json_payload) 153 expected_payload = { 154 '_class': client.Credentials.__name__, 155 '_module': client.Credentials.__module__, 156 'token_expiry': None, 157 } 158 expected_payload.update(to_serialize) 159 # Special-case the set. 160 expected_payload['st'] = list(expected_payload['st']) 161 # Special-case the bytes. 162 expected_payload['foo'] = u'bar' 163 self.assertEqual(payload, expected_payload) 164 # Make sure the method call didn't modify our dictionary. 165 self.assertEqual(to_serialize, orig_vals) 166 167 @mock.patch.object(client.Credentials, '_to_json', 168 return_value=object()) 169 def test_to_json(self, to_json): 170 credentials = client.Credentials() 171 self.assertEqual(credentials.to_json(), to_json.return_value) 172 to_json.assert_called_once_with( 173 client.Credentials.NON_SERIALIZED_MEMBERS) 174 175 def test_new_from_json_no_data(self): 176 creds_data = {} 177 json_data = json.dumps(creds_data) 178 with self.assertRaises(KeyError): 179 client.Credentials.new_from_json(json_data) 180 181 def test_new_from_json_basic_data(self): 182 creds_data = { 183 '_module': 'oauth2client.client', 184 '_class': 'Credentials', 185 } 186 json_data = json.dumps(creds_data) 187 credentials = client.Credentials.new_from_json(json_data) 188 self.assertIsInstance(credentials, client.Credentials) 189 190 def test_new_from_json_old_name(self): 191 creds_data = { 192 '_module': 'oauth2client.googleapiclient.client', 193 '_class': 'Credentials', 194 } 195 json_data = json.dumps(creds_data) 196 credentials = client.Credentials.new_from_json(json_data) 197 self.assertIsInstance(credentials, client.Credentials) 198 199 def test_new_from_json_bad_module(self): 200 creds_data = { 201 '_module': 'oauth2client.foobar', 202 '_class': 'Credentials', 203 } 204 json_data = json.dumps(creds_data) 205 with self.assertRaises(ImportError): 206 client.Credentials.new_from_json(json_data) 207 208 def test_new_from_json_bad_class(self): 209 creds_data = { 210 '_module': 'oauth2client.client', 211 '_class': 'NopeNotCredentials', 212 } 213 json_data = json.dumps(creds_data) 214 with self.assertRaises(AttributeError): 215 client.Credentials.new_from_json(json_data) 216 217 def test_from_json(self): 218 unused_data = {} 219 credentials = client.Credentials.from_json(unused_data) 220 self.assertIsInstance(credentials, client.Credentials) 221 self.assertEqual(credentials.__dict__, {}) 222 223 224class TestStorage(unittest2.TestCase): 225 226 def test_locked_get_abstract(self): 227 storage = client.Storage() 228 with self.assertRaises(NotImplementedError): 229 storage.locked_get() 230 231 def test_locked_put_abstract(self): 232 storage = client.Storage() 233 credentials = object() 234 with self.assertRaises(NotImplementedError): 235 storage.locked_put(credentials) 236 237 def test_locked_delete_abstract(self): 238 storage = client.Storage() 239 with self.assertRaises(NotImplementedError): 240 storage.locked_delete() 241 242 243@contextlib.contextmanager 244def mock_module_import(module): 245 """Place a dummy objects in sys.modules to mock an import test.""" 246 parts = module.split('.') 247 entries = ['.'.join(parts[:i + 1]) for i in range(len(parts))] 248 for entry in entries: 249 sys.modules[entry] = object() 250 251 try: 252 yield 253 254 finally: 255 for entry in entries: 256 del sys.modules[entry] 257 258 259class GoogleCredentialsTests(unittest2.TestCase): 260 261 def setUp(self): 262 self.os_name = os.name 263 client.SETTINGS.env_name = None 264 265 def tearDown(self): 266 self.reset_env('SERVER_SOFTWARE') 267 self.reset_env(client.GOOGLE_APPLICATION_CREDENTIALS) 268 self.reset_env('APPDATA') 269 os.name = self.os_name 270 271 def reset_env(self, env): 272 """Set the environment variable 'env' to 'value'.""" 273 os.environ.pop(env, None) 274 275 def validate_service_account_credentials(self, credentials): 276 self.assertIsInstance( 277 credentials, service_account.ServiceAccountCredentials) 278 self.assertEqual('123', credentials.client_id) 279 self.assertEqual('dummy@google.com', 280 credentials._service_account_email) 281 self.assertEqual('ABCDEF', credentials._private_key_id) 282 self.assertEqual('', credentials._scopes) 283 284 def validate_google_credentials(self, credentials): 285 self.assertIsInstance(credentials, client.GoogleCredentials) 286 self.assertEqual(None, credentials.access_token) 287 self.assertEqual('123', credentials.client_id) 288 self.assertEqual('secret', credentials.client_secret) 289 self.assertEqual('alabalaportocala', credentials.refresh_token) 290 self.assertEqual(None, credentials.token_expiry) 291 self.assertEqual(oauth2client.GOOGLE_TOKEN_URI, credentials.token_uri) 292 self.assertEqual('Python client library', credentials.user_agent) 293 294 def get_a_google_credentials_object(self): 295 return client.GoogleCredentials(None, None, None, None, 296 None, None, None, None) 297 298 def test_create_scoped_required(self): 299 self.assertFalse( 300 self.get_a_google_credentials_object().create_scoped_required()) 301 302 def test_create_scoped(self): 303 credentials = self.get_a_google_credentials_object() 304 self.assertEqual(credentials, credentials.create_scoped(None)) 305 self.assertEqual(credentials, 306 credentials.create_scoped(['dummy_scope'])) 307 308 @mock.patch.object(client.GoogleCredentials, 309 '_implicit_credentials_from_files', 310 return_value=None) 311 @mock.patch.object(client.GoogleCredentials, 312 '_implicit_credentials_from_gce') 313 @mock.patch.object(client, '_in_gae_environment', 314 return_value=True) 315 @mock.patch.object(client, '_get_application_default_credential_GAE', 316 return_value=object()) 317 def test_get_application_default_in_gae(self, gae_adc, in_gae, 318 from_gce, from_files): 319 credentials = client.GoogleCredentials.get_application_default() 320 self.assertEqual(credentials, gae_adc.return_value) 321 from_files.assert_called_once_with() 322 in_gae.assert_called_once_with() 323 from_gce.assert_not_called() 324 325 @mock.patch.object(client.GoogleCredentials, 326 '_implicit_credentials_from_gae', 327 return_value=None) 328 @mock.patch.object(client.GoogleCredentials, 329 '_implicit_credentials_from_files', 330 return_value=None) 331 @mock.patch.object(client, '_in_gce_environment', 332 return_value=True) 333 @mock.patch.object(client, '_get_application_default_credential_GCE', 334 return_value=object()) 335 def test_get_application_default_in_gce(self, gce_adc, in_gce, 336 from_files, from_gae): 337 credentials = client.GoogleCredentials.get_application_default() 338 self.assertEqual(credentials, gce_adc.return_value) 339 in_gce.assert_called_once_with() 340 from_gae.assert_called_once_with() 341 from_files.assert_called_once_with() 342 343 def test_environment_check_gae_production(self): 344 with mock_module_import('google.appengine'): 345 self._environment_check_gce_helper( 346 server_software='Google App Engine/XYZ') 347 348 def test_environment_check_gae_local(self): 349 with mock_module_import('google.appengine'): 350 self._environment_check_gce_helper( 351 server_software='Development/XYZ') 352 353 def test_environment_check_fastpath(self): 354 with mock_module_import('google.appengine'): 355 self._environment_check_gce_helper( 356 server_software='Development/XYZ') 357 358 def test_environment_caching(self): 359 os.environ['SERVER_SOFTWARE'] = 'Development/XYZ' 360 with mock_module_import('google.appengine'): 361 self.assertTrue(client._in_gae_environment()) 362 os.environ['SERVER_SOFTWARE'] = '' 363 # Even though we no longer pass the environment check, it 364 # is cached. 365 self.assertTrue(client._in_gae_environment()) 366 367 def _environment_check_gce_helper(self, status_ok=True, socket_error=False, 368 server_software=''): 369 response = mock.MagicMock() 370 if status_ok: 371 response.status = http_client.OK 372 response.getheader = mock.MagicMock( 373 name='getheader', 374 return_value=client._DESIRED_METADATA_FLAVOR) 375 else: 376 response.status = http_client.NOT_FOUND 377 378 connection = mock.MagicMock() 379 connection.getresponse = mock.MagicMock(name='getresponse', 380 return_value=response) 381 if socket_error: 382 connection.getresponse.side_effect = socket.error() 383 384 with mock.patch('oauth2client.client.os') as os_module: 385 os_module.environ = {client._SERVER_SOFTWARE: server_software} 386 with mock.patch('oauth2client.client.six') as six_module: 387 http_client_module = six_module.moves.http_client 388 http_client_module.HTTPConnection = mock.MagicMock( 389 name='HTTPConnection', return_value=connection) 390 391 if server_software == '': 392 self.assertFalse(client._in_gae_environment()) 393 else: 394 self.assertTrue(client._in_gae_environment()) 395 396 if status_ok and not socket_error and server_software == '': 397 self.assertTrue(client._in_gce_environment()) 398 else: 399 self.assertFalse(client._in_gce_environment()) 400 401 if server_software == '': 402 http_client_module.HTTPConnection.assert_called_once_with( 403 client._GCE_METADATA_HOST, 404 timeout=client.GCE_METADATA_TIMEOUT) 405 connection.getresponse.assert_called_once_with() 406 # Remaining calls are not "getresponse" 407 headers = { 408 client._METADATA_FLAVOR_HEADER: ( 409 client._DESIRED_METADATA_FLAVOR), 410 } 411 self.assertEqual(connection.method_calls, [ 412 mock.call.request('GET', '/', 413 headers=headers), 414 mock.call.close(), 415 ]) 416 self.assertEqual(response.method_calls, []) 417 if status_ok and not socket_error: 418 response.getheader.assert_called_once_with( 419 client._METADATA_FLAVOR_HEADER) 420 else: 421 self.assertEqual( 422 http_client_module.HTTPConnection.mock_calls, []) 423 self.assertEqual(connection.getresponse.mock_calls, []) 424 # Remaining calls are not "getresponse" 425 self.assertEqual(connection.method_calls, []) 426 self.assertEqual(response.method_calls, []) 427 self.assertEqual(response.getheader.mock_calls, []) 428 429 def test_environment_check_gce_production(self): 430 self._environment_check_gce_helper(status_ok=True) 431 432 def test_environment_check_gce_prod_with_working_gae_imports(self): 433 with mock_module_import('google.appengine'): 434 self._environment_check_gce_helper(status_ok=True) 435 436 def test_environment_check_gce_timeout(self): 437 self._environment_check_gce_helper(socket_error=True) 438 439 def test_environ_check_gae_module_unknown(self): 440 with mock_module_import('google.appengine'): 441 self._environment_check_gce_helper(status_ok=False) 442 443 def test_environment_check_unknown(self): 444 self._environment_check_gce_helper(status_ok=False) 445 446 def test_get_environment_variable_file(self): 447 environment_variable_file = datafile( 448 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 449 os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = ( 450 environment_variable_file) 451 self.assertEqual(environment_variable_file, 452 client._get_environment_variable_file()) 453 454 def test_get_environment_variable_file_error(self): 455 nonexistent_file = datafile('nonexistent') 456 os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file 457 expected_err_msg = ( 458 'File {0} \(pointed by {1} environment variable\) does not ' 459 'exist!'.format( 460 nonexistent_file, client.GOOGLE_APPLICATION_CREDENTIALS)) 461 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 462 expected_err_msg): 463 client._get_environment_variable_file() 464 465 @mock.patch.dict(os.environ, {}, clear=True) 466 def test_get_environment_variable_file_without_env_var(self): 467 self.assertIsNone(client._get_environment_variable_file()) 468 469 @mock.patch('os.name', new='nt') 470 @mock.patch.dict(os.environ, {'APPDATA': DATA_DIR}, clear=True) 471 def test_get_well_known_file_on_windows(self): 472 well_known_file = datafile( 473 os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY, 474 client._WELL_KNOWN_CREDENTIALS_FILE)) 475 self.assertEqual(well_known_file, client._get_well_known_file()) 476 477 @mock.patch('os.name', new='nt') 478 @mock.patch.dict(os.environ, {'SystemDrive': 'G:'}, clear=True) 479 def test_get_well_known_file_on_windows_without_appdata(self): 480 well_known_file = os.path.join('G:', '\\', 481 client._CLOUDSDK_CONFIG_DIRECTORY, 482 client._WELL_KNOWN_CREDENTIALS_FILE) 483 self.assertEqual(well_known_file, client._get_well_known_file()) 484 485 @mock.patch.dict(os.environ, 486 {client._CLOUDSDK_CONFIG_ENV_VAR: 'CUSTOM_DIR'}, 487 clear=True) 488 def test_get_well_known_file_with_custom_config_dir(self): 489 CUSTOM_DIR = os.environ[client._CLOUDSDK_CONFIG_ENV_VAR] 490 EXPECTED_FILE = os.path.join(CUSTOM_DIR, 491 client._WELL_KNOWN_CREDENTIALS_FILE) 492 well_known_file = client._get_well_known_file() 493 self.assertEqual(well_known_file, EXPECTED_FILE) 494 495 def test_get_adc_from_file_service_account(self): 496 credentials_file = datafile( 497 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 498 credentials = client._get_application_default_credential_from_file( 499 credentials_file) 500 self.validate_service_account_credentials(credentials) 501 502 def test_save_to_well_known_file_service_account(self): 503 credential_file = datafile( 504 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 505 credentials = client._get_application_default_credential_from_file( 506 credential_file) 507 temp_credential_file = datafile( 508 os.path.join('gcloud', 509 'temp_well_known_file_service_account.json')) 510 client.save_to_well_known_file(credentials, temp_credential_file) 511 with open(temp_credential_file) as f: 512 d = json.load(f) 513 self.assertEqual('service_account', d['type']) 514 self.assertEqual('123', d['client_id']) 515 self.assertEqual('dummy@google.com', d['client_email']) 516 self.assertEqual('ABCDEF', d['private_key_id']) 517 os.remove(temp_credential_file) 518 519 @mock.patch('os.path.isdir', return_value=False) 520 def test_save_well_known_file_with_non_existent_config_dir(self, 521 isdir_mock): 522 credential_file = datafile( 523 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 524 credentials = client._get_application_default_credential_from_file( 525 credential_file) 526 with self.assertRaises(OSError): 527 client.save_to_well_known_file(credentials) 528 config_dir = os.path.join(os.path.expanduser('~'), '.config', 'gcloud') 529 isdir_mock.assert_called_once_with(config_dir) 530 531 def test_get_adc_from_file_authorized_user(self): 532 credentials_file = datafile(os.path.join( 533 'gcloud', 534 'application_default_credentials_authorized_user.json')) 535 credentials = client._get_application_default_credential_from_file( 536 credentials_file) 537 self.validate_google_credentials(credentials) 538 539 def test_save_to_well_known_file_authorized_user(self): 540 credentials_file = datafile(os.path.join( 541 'gcloud', 542 'application_default_credentials_authorized_user.json')) 543 credentials = client._get_application_default_credential_from_file( 544 credentials_file) 545 temp_credential_file = datafile( 546 os.path.join('gcloud', 547 'temp_well_known_file_authorized_user.json')) 548 client.save_to_well_known_file(credentials, temp_credential_file) 549 with open(temp_credential_file) as f: 550 d = json.load(f) 551 self.assertEqual('authorized_user', d['type']) 552 self.assertEqual('123', d['client_id']) 553 self.assertEqual('secret', d['client_secret']) 554 self.assertEqual('alabalaportocala', d['refresh_token']) 555 os.remove(temp_credential_file) 556 557 def test_get_application_default_credential_from_malformed_file_1(self): 558 credentials_file = datafile( 559 os.path.join('gcloud', 560 'application_default_credentials_malformed_1.json')) 561 expected_err_msg = ( 562 "'type' field should be defined \(and have one of the '{0}' or " 563 "'{1}' values\)".format(client.AUTHORIZED_USER, 564 client.SERVICE_ACCOUNT)) 565 566 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 567 expected_err_msg): 568 client._get_application_default_credential_from_file( 569 credentials_file) 570 571 def test_get_application_default_credential_from_malformed_file_2(self): 572 credentials_file = datafile( 573 os.path.join('gcloud', 574 'application_default_credentials_malformed_2.json')) 575 expected_err_msg = ( 576 'The following field\(s\) must be defined: private_key_id') 577 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 578 expected_err_msg): 579 client._get_application_default_credential_from_file( 580 credentials_file) 581 582 def test_get_application_default_credential_from_malformed_file_3(self): 583 credentials_file = datafile( 584 os.path.join('gcloud', 585 'application_default_credentials_malformed_3.json')) 586 with self.assertRaises(ValueError): 587 client._get_application_default_credential_from_file( 588 credentials_file) 589 590 def test_raise_exception_for_missing_fields(self): 591 missing_fields = ['first', 'second', 'third'] 592 expected_err_msg = ('The following field\(s\) must be defined: ' + 593 ', '.join(missing_fields)) 594 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 595 expected_err_msg): 596 client._raise_exception_for_missing_fields(missing_fields) 597 598 def test_raise_exception_for_reading_json(self): 599 credential_file = 'any_file' 600 extra_help = ' be good' 601 error = client.ApplicationDefaultCredentialsError('stuff happens') 602 expected_err_msg = ('An error was encountered while reading ' 603 'json file: ' + credential_file + 604 extra_help + ': ' + str(error)) 605 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 606 expected_err_msg): 607 client._raise_exception_for_reading_json( 608 credential_file, extra_help, error) 609 610 @mock.patch('oauth2client.client._in_gce_environment') 611 @mock.patch('oauth2client.client._in_gae_environment', return_value=False) 612 @mock.patch('oauth2client.client._get_environment_variable_file') 613 @mock.patch('oauth2client.client._get_well_known_file') 614 def test_get_adc_from_env_var_service_account(self, *stubs): 615 # Set up stubs. 616 get_well_known, get_env_file, in_gae, in_gce = stubs 617 get_env_file.return_value = datafile( 618 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 619 620 credentials = client.GoogleCredentials.get_application_default() 621 self.validate_service_account_credentials(credentials) 622 623 get_env_file.assert_called_once_with() 624 get_well_known.assert_not_called() 625 in_gae.assert_not_called() 626 in_gce.assert_not_called() 627 628 def test_env_name(self): 629 self.assertEqual(None, client.SETTINGS.env_name) 630 self.test_get_adc_from_env_var_service_account() 631 self.assertEqual(client.DEFAULT_ENV_NAME, client.SETTINGS.env_name) 632 633 @mock.patch('oauth2client.client._in_gce_environment') 634 @mock.patch('oauth2client.client._in_gae_environment', return_value=False) 635 @mock.patch('oauth2client.client._get_environment_variable_file') 636 @mock.patch('oauth2client.client._get_well_known_file') 637 def test_get_adc_from_env_var_authorized_user(self, *stubs): 638 # Set up stubs. 639 get_well_known, get_env_file, in_gae, in_gce = stubs 640 get_env_file.return_value = datafile(os.path.join( 641 'gcloud', 642 'application_default_credentials_authorized_user.json')) 643 644 credentials = client.GoogleCredentials.get_application_default() 645 self.validate_google_credentials(credentials) 646 647 get_env_file.assert_called_once_with() 648 get_well_known.assert_not_called() 649 in_gae.assert_not_called() 650 in_gce.assert_not_called() 651 652 @mock.patch('oauth2client.client._in_gce_environment') 653 @mock.patch('oauth2client.client._in_gae_environment', return_value=False) 654 @mock.patch('oauth2client.client._get_environment_variable_file') 655 @mock.patch('oauth2client.client._get_well_known_file') 656 def test_get_adc_from_env_var_malformed_file(self, *stubs): 657 # Set up stubs. 658 get_well_known, get_env_file, in_gae, in_gce = stubs 659 get_env_file.return_value = datafile( 660 os.path.join('gcloud', 661 'application_default_credentials_malformed_3.json')) 662 663 expected_err = client.ApplicationDefaultCredentialsError 664 with self.assertRaises(expected_err) as exc_manager: 665 client.GoogleCredentials.get_application_default() 666 667 self.assertTrue(str(exc_manager.exception).startswith( 668 'An error was encountered while reading json file: ' + 669 get_env_file.return_value + ' (pointed to by ' + 670 client.GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):')) 671 672 get_env_file.assert_called_once_with() 673 get_well_known.assert_not_called() 674 in_gae.assert_not_called() 675 in_gce.assert_not_called() 676 677 @mock.patch('oauth2client.client._in_gce_environment', return_value=False) 678 @mock.patch('oauth2client.client._in_gae_environment', return_value=False) 679 @mock.patch('oauth2client.client._get_environment_variable_file', 680 return_value=None) 681 @mock.patch('oauth2client.client._get_well_known_file', 682 return_value='BOGUS_FILE') 683 def test_get_adc_env_not_set_up(self, *stubs): 684 # Unpack stubs. 685 get_well_known, get_env_file, in_gae, in_gce = stubs 686 # Make sure the well-known file actually doesn't exist. 687 self.assertFalse(os.path.exists(get_well_known.return_value)) 688 689 expected_err = client.ApplicationDefaultCredentialsError 690 with self.assertRaises(expected_err) as exc_manager: 691 client.GoogleCredentials.get_application_default() 692 693 self.assertEqual(client.ADC_HELP_MSG, str(exc_manager.exception)) 694 get_env_file.assert_called_once_with() 695 get_well_known.assert_called_once_with() 696 in_gae.assert_called_once_with() 697 in_gce.assert_called_once_with() 698 699 @mock.patch('oauth2client.client._in_gce_environment', return_value=False) 700 @mock.patch('oauth2client.client._in_gae_environment', return_value=False) 701 @mock.patch('oauth2client.client._get_environment_variable_file', 702 return_value=None) 703 @mock.patch('oauth2client.client._get_well_known_file') 704 def test_get_adc_env_from_well_known(self, *stubs): 705 # Unpack stubs. 706 get_well_known, get_env_file, in_gae, in_gce = stubs 707 # Make sure the well-known file is an actual file. 708 get_well_known.return_value = __file__ 709 # Make sure the well-known file actually doesn't exist. 710 self.assertTrue(os.path.exists(get_well_known.return_value)) 711 712 method_name = \ 713 'oauth2client.client._get_application_default_credential_from_file' 714 result_creds = object() 715 with mock.patch(method_name, 716 return_value=result_creds) as get_from_file: 717 result = client.GoogleCredentials.get_application_default() 718 self.assertEqual(result, result_creds) 719 get_from_file.assert_called_once_with(__file__) 720 721 get_env_file.assert_called_once_with() 722 get_well_known.assert_called_once_with() 723 in_gae.assert_not_called() 724 in_gce.assert_not_called() 725 726 def test_from_stream_service_account(self): 727 credentials_file = datafile( 728 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 729 credentials = self.get_a_google_credentials_object().from_stream( 730 credentials_file) 731 self.validate_service_account_credentials(credentials) 732 733 def test_from_stream_authorized_user(self): 734 credentials_file = datafile(os.path.join( 735 'gcloud', 736 'application_default_credentials_authorized_user.json')) 737 credentials = self.get_a_google_credentials_object().from_stream( 738 credentials_file) 739 self.validate_google_credentials(credentials) 740 741 def test_from_stream_missing_file(self): 742 credentials_filename = None 743 expected_err_msg = (r'The parameter passed to the from_stream\(\) ' 744 r'method should point to a file.') 745 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 746 expected_err_msg): 747 self.get_a_google_credentials_object().from_stream( 748 credentials_filename) 749 750 def test_from_stream_malformed_file_1(self): 751 credentials_file = datafile( 752 os.path.join('gcloud', 753 'application_default_credentials_malformed_1.json')) 754 expected_err_msg = ( 755 'An error was encountered while reading json file: ' + 756 credentials_file + 757 ' \(provided as parameter to the from_stream\(\) method\): ' + 758 "'type' field should be defined \(and have one of the '" + 759 client.AUTHORIZED_USER + "' or '" + client.SERVICE_ACCOUNT + 760 "' values\)") 761 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 762 expected_err_msg): 763 self.get_a_google_credentials_object().from_stream( 764 credentials_file) 765 766 def test_from_stream_malformed_file_2(self): 767 credentials_file = datafile( 768 os.path.join('gcloud', 769 'application_default_credentials_malformed_2.json')) 770 expected_err_msg = ( 771 'An error was encountered while reading json file: ' + 772 credentials_file + 773 ' \(provided as parameter to the from_stream\(\) method\): ' 774 'The following field\(s\) must be defined: ' 775 'private_key_id') 776 with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError, 777 expected_err_msg): 778 self.get_a_google_credentials_object().from_stream( 779 credentials_file) 780 781 def test_from_stream_malformed_file_3(self): 782 credentials_file = datafile( 783 os.path.join('gcloud', 784 'application_default_credentials_malformed_3.json')) 785 with self.assertRaises(client.ApplicationDefaultCredentialsError): 786 self.get_a_google_credentials_object().from_stream( 787 credentials_file) 788 789 def test_to_from_json_authorized_user(self): 790 filename = 'application_default_credentials_authorized_user.json' 791 credentials_file = datafile(os.path.join('gcloud', filename)) 792 creds = client.GoogleCredentials.from_stream(credentials_file) 793 json = creds.to_json() 794 creds2 = client.GoogleCredentials.from_json(json) 795 796 self.assertEqual(creds.__dict__, creds2.__dict__) 797 798 def test_to_from_json_service_account(self): 799 credentials_file = datafile( 800 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 801 creds1 = client.GoogleCredentials.from_stream(credentials_file) 802 # Convert to and then back from json. 803 creds2 = client.GoogleCredentials.from_json(creds1.to_json()) 804 805 creds1_vals = creds1.__dict__ 806 creds1_vals.pop('_signer') 807 creds2_vals = creds2.__dict__ 808 creds2_vals.pop('_signer') 809 self.assertEqual(creds1_vals, creds2_vals) 810 811 def test_to_from_json_service_account_scoped(self): 812 credentials_file = datafile( 813 os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE)) 814 creds1 = client.GoogleCredentials.from_stream(credentials_file) 815 creds1 = creds1.create_scoped(['dummy_scope']) 816 # Convert to and then back from json. 817 creds2 = client.GoogleCredentials.from_json(creds1.to_json()) 818 819 creds1_vals = creds1.__dict__ 820 creds1_vals.pop('_signer') 821 creds2_vals = creds2.__dict__ 822 creds2_vals.pop('_signer') 823 self.assertEqual(creds1_vals, creds2_vals) 824 825 def test_parse_expiry(self): 826 dt = datetime.datetime(2016, 1, 1) 827 parsed_expiry = client._parse_expiry(dt) 828 self.assertEqual('2016-01-01T00:00:00Z', parsed_expiry) 829 830 def test_bad_expiry(self): 831 dt = object() 832 parsed_expiry = client._parse_expiry(dt) 833 self.assertEqual(None, parsed_expiry) 834 835 836class DummyDeleteStorage(client.Storage): 837 delete_called = False 838 839 def locked_delete(self): 840 self.delete_called = True 841 842 843def _token_revoke_test_helper(testcase, status, revoke_raise, 844 valid_bool_value, token_attr): 845 current_store = getattr(testcase.credentials, 'store', None) 846 847 dummy_store = DummyDeleteStorage() 848 testcase.credentials.set_store(dummy_store) 849 850 actual_do_revoke = testcase.credentials._do_revoke 851 testcase.token_from_revoke = None 852 853 def do_revoke_stub(http_request, token): 854 testcase.token_from_revoke = token 855 return actual_do_revoke(http_request, token) 856 testcase.credentials._do_revoke = do_revoke_stub 857 858 http = HttpMock(headers={'status': status}) 859 if revoke_raise: 860 testcase.assertRaises(client.TokenRevokeError, 861 testcase.credentials.revoke, http) 862 else: 863 testcase.credentials.revoke(http) 864 865 testcase.assertEqual(getattr(testcase.credentials, token_attr), 866 testcase.token_from_revoke) 867 testcase.assertEqual(valid_bool_value, testcase.credentials.invalid) 868 testcase.assertEqual(valid_bool_value, dummy_store.delete_called) 869 870 testcase.credentials.set_store(current_store) 871 872 873class BasicCredentialsTests(unittest2.TestCase): 874 875 def setUp(self): 876 access_token = 'foo' 877 client_id = 'some_client_id' 878 client_secret = 'cOuDdkfjxxnv+' 879 refresh_token = '1/0/a.df219fjls0' 880 token_expiry = datetime.datetime.utcnow() 881 user_agent = 'refresh_checker/1.0' 882 self.credentials = client.OAuth2Credentials( 883 access_token, client_id, client_secret, 884 refresh_token, token_expiry, oauth2client.GOOGLE_TOKEN_URI, 885 user_agent, revoke_uri=oauth2client.GOOGLE_REVOKE_URI, 886 scopes='foo', token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI) 887 888 # Provoke a failure if @util.positional is not respected. 889 self.old_positional_enforcement = ( 890 util.positional_parameters_enforcement) 891 util.positional_parameters_enforcement = ( 892 util.POSITIONAL_EXCEPTION) 893 894 def tearDown(self): 895 util.positional_parameters_enforcement = ( 896 self.old_positional_enforcement) 897 898 def test_token_refresh_success(self): 899 for status_code in client.REFRESH_STATUS_CODES: 900 token_response = {'access_token': '1/3w', 'expires_in': 3600} 901 http = HttpMockSequence([ 902 ({'status': status_code}, b''), 903 ({'status': '200'}, json.dumps(token_response).encode( 904 'utf-8')), 905 ({'status': '200'}, 'echo_request_headers'), 906 ]) 907 http = self.credentials.authorize(http) 908 resp, content = http.request('http://example.com') 909 self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) 910 self.assertFalse(self.credentials.access_token_expired) 911 self.assertEqual(token_response, self.credentials.token_response) 912 913 def test_recursive_authorize(self): 914 """Tests that OAuth2Credentials doesn't intro. new method constraints. 915 916 Formerly, OAuth2Credentials.authorize monkeypatched the request method 917 of its httplib2.Http argument with a wrapper annotated with 918 @util.positional(1). Since the original method has no such annotation, 919 that meant that the wrapper was violating the contract of the original 920 method by adding a new requirement to it. And in fact the wrapper 921 itself doesn't even respect that requirement. So before the removal of 922 the annotation, this test would fail. 923 """ 924 token_response = {'access_token': '1/3w', 'expires_in': 3600} 925 encoded_response = json.dumps(token_response).encode('utf-8') 926 http = HttpMockSequence([ 927 ({'status': '200'}, encoded_response), 928 ]) 929 http = self.credentials.authorize(http) 930 http = self.credentials.authorize(http) 931 http.request('http://example.com') 932 933 def test_token_refresh_failure(self): 934 for status_code in client.REFRESH_STATUS_CODES: 935 http = HttpMockSequence([ 936 ({'status': status_code}, b''), 937 ({'status': http_client.BAD_REQUEST}, 938 b'{"error":"access_denied"}'), 939 ]) 940 http = self.credentials.authorize(http) 941 with self.assertRaises( 942 client.HttpAccessTokenRefreshError) as exc_manager: 943 http.request('http://example.com') 944 self.assertEqual(http_client.BAD_REQUEST, 945 exc_manager.exception.status) 946 self.assertTrue(self.credentials.access_token_expired) 947 self.assertEqual(None, self.credentials.token_response) 948 949 def test_token_revoke_success(self): 950 _token_revoke_test_helper( 951 self, '200', revoke_raise=False, 952 valid_bool_value=True, token_attr='refresh_token') 953 954 def test_token_revoke_failure(self): 955 _token_revoke_test_helper( 956 self, '400', revoke_raise=True, 957 valid_bool_value=False, token_attr='refresh_token') 958 959 def test_token_revoke_fallback(self): 960 original_credentials = self.credentials.to_json() 961 self.credentials.refresh_token = None 962 _token_revoke_test_helper( 963 self, '200', revoke_raise=False, 964 valid_bool_value=True, token_attr='access_token') 965 self.credentials = self.credentials.from_json(original_credentials) 966 967 def test_non_401_error_response(self): 968 http = HttpMockSequence([ 969 ({'status': '400'}, b''), 970 ]) 971 http = self.credentials.authorize(http) 972 resp, content = http.request('http://example.com') 973 self.assertEqual(http_client.BAD_REQUEST, resp.status) 974 self.assertEqual(None, self.credentials.token_response) 975 976 def test_to_from_json(self): 977 json = self.credentials.to_json() 978 instance = client.OAuth2Credentials.from_json(json) 979 self.assertEqual(client.OAuth2Credentials, type(instance)) 980 instance.token_expiry = None 981 self.credentials.token_expiry = None 982 983 self.assertEqual(instance.__dict__, self.credentials.__dict__) 984 985 def test_from_json_token_expiry(self): 986 data = json.loads(self.credentials.to_json()) 987 data['token_expiry'] = None 988 instance = client.OAuth2Credentials.from_json(json.dumps(data)) 989 self.assertIsInstance(instance, client.OAuth2Credentials) 990 991 def test_from_json_bad_token_expiry(self): 992 data = json.loads(self.credentials.to_json()) 993 data['token_expiry'] = 'foobar' 994 instance = client.OAuth2Credentials.from_json(json.dumps(data)) 995 self.assertIsInstance(instance, client.OAuth2Credentials) 996 997 def test_unicode_header_checks(self): 998 access_token = u'foo' 999 client_id = u'some_client_id' 1000 client_secret = u'cOuDdkfjxxnv+' 1001 refresh_token = u'1/0/a.df219fjls0' 1002 token_expiry = str(datetime.datetime.utcnow()) 1003 token_uri = str(oauth2client.GOOGLE_TOKEN_URI) 1004 revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI) 1005 user_agent = u'refresh_checker/1.0' 1006 credentials = client.OAuth2Credentials( 1007 access_token, client_id, client_secret, refresh_token, 1008 token_expiry, token_uri, user_agent, revoke_uri=revoke_uri) 1009 1010 # First, test that we correctly encode basic objects, making sure 1011 # to include a bytes object. Note that oauth2client will normalize 1012 # everything to bytes, no matter what python version we're in. 1013 http = credentials.authorize(HttpMock()) 1014 headers = {u'foo': 3, b'bar': True, 'baz': b'abc'} 1015 cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'} 1016 http.request(u'http://example.com', method=u'GET', headers=headers) 1017 for k, v in cleaned_headers.items(): 1018 self.assertTrue(k in http.headers) 1019 self.assertEqual(v, http.headers[k]) 1020 1021 # Next, test that we do fail on unicode. 1022 unicode_str = six.unichr(40960) + 'abcd' 1023 with self.assertRaises(client.NonAsciiHeaderError): 1024 http.request(u'http://example.com', method=u'GET', 1025 headers={u'foo': unicode_str}) 1026 1027 def test_no_unicode_in_request_params(self): 1028 access_token = u'foo' 1029 client_id = u'some_client_id' 1030 client_secret = u'cOuDdkfjxxnv+' 1031 refresh_token = u'1/0/a.df219fjls0' 1032 token_expiry = str(datetime.datetime.utcnow()) 1033 token_uri = str(oauth2client.GOOGLE_TOKEN_URI) 1034 revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI) 1035 user_agent = u'refresh_checker/1.0' 1036 credentials = client.OAuth2Credentials( 1037 access_token, client_id, client_secret, refresh_token, 1038 token_expiry, token_uri, user_agent, revoke_uri=revoke_uri) 1039 1040 http = HttpMock() 1041 http = credentials.authorize(http) 1042 http.request(u'http://example.com', method=u'GET', 1043 headers={u'foo': u'bar'}) 1044 for k, v in six.iteritems(http.headers): 1045 self.assertIsInstance(k, six.binary_type) 1046 self.assertIsInstance(v, six.binary_type) 1047 1048 # Test again with unicode strings that can't simply be converted 1049 # to ASCII. 1050 with self.assertRaises(client.NonAsciiHeaderError): 1051 http.request( 1052 u'http://example.com', method=u'GET', 1053 headers={u'foo': u'\N{COMET}'}) 1054 1055 self.credentials.token_response = 'foobar' 1056 instance = client.OAuth2Credentials.from_json( 1057 self.credentials.to_json()) 1058 self.assertEqual('foobar', instance.token_response) 1059 1060 def test__expires_in_no_expiry(self): 1061 credentials = client.OAuth2Credentials(None, None, None, None, 1062 None, None, None) 1063 self.assertIsNone(credentials.token_expiry) 1064 self.assertIsNone(credentials._expires_in()) 1065 1066 @mock.patch('oauth2client.client._UTCNOW') 1067 def test__expires_in_expired(self, utcnow): 1068 credentials = client.OAuth2Credentials(None, None, None, None, 1069 None, None, None) 1070 credentials.token_expiry = datetime.datetime.utcnow() 1071 now = credentials.token_expiry + datetime.timedelta(seconds=1) 1072 self.assertLess(credentials.token_expiry, now) 1073 utcnow.return_value = now 1074 self.assertEqual(credentials._expires_in(), 0) 1075 utcnow.assert_called_once_with() 1076 1077 @mock.patch('oauth2client.client._UTCNOW') 1078 def test__expires_in_not_expired(self, utcnow): 1079 credentials = client.OAuth2Credentials(None, None, None, None, 1080 None, None, None) 1081 credentials.token_expiry = datetime.datetime.utcnow() 1082 seconds = 1234 1083 now = credentials.token_expiry - datetime.timedelta(seconds=seconds) 1084 self.assertLess(now, credentials.token_expiry) 1085 utcnow.return_value = now 1086 self.assertEqual(credentials._expires_in(), seconds) 1087 utcnow.assert_called_once_with() 1088 1089 @mock.patch('oauth2client.client._UTCNOW') 1090 def test_get_access_token(self, utcnow): 1091 # Configure the patch. 1092 seconds = 11 1093 NOW = datetime.datetime(1992, 12, 31, second=seconds) 1094 utcnow.return_value = NOW 1095 1096 lifetime = 2 # number of seconds in which the token expires 1097 EXPIRY_TIME = datetime.datetime(1992, 12, 31, 1098 second=seconds + lifetime) 1099 1100 token1 = u'first_token' 1101 token_response_first = { 1102 'access_token': token1, 1103 'expires_in': lifetime, 1104 } 1105 token2 = u'second_token' 1106 token_response_second = { 1107 'access_token': token2, 1108 'expires_in': lifetime, 1109 } 1110 http = HttpMockSequence([ 1111 ({'status': '200'}, json.dumps(token_response_first).encode( 1112 'utf-8')), 1113 ({'status': '200'}, json.dumps(token_response_second).encode( 1114 'utf-8')), 1115 ]) 1116 1117 # Use the current credentials but unset the expiry and 1118 # the access token. 1119 credentials = copy.deepcopy(self.credentials) 1120 credentials.access_token = None 1121 credentials.token_expiry = None 1122 1123 # Get Access Token, First attempt. 1124 self.assertEqual(credentials.access_token, None) 1125 self.assertFalse(credentials.access_token_expired) 1126 self.assertEqual(credentials.token_expiry, None) 1127 token = credentials.get_access_token(http=http) 1128 self.assertEqual(credentials.token_expiry, EXPIRY_TIME) 1129 self.assertEqual(token1, token.access_token) 1130 self.assertEqual(lifetime, token.expires_in) 1131 self.assertEqual(token_response_first, credentials.token_response) 1132 # Two utcnow calls are expected: 1133 # - get_access_token() -> _do_refresh_request (setting expires in) 1134 # - get_access_token() -> _expires_in() 1135 expected_utcnow_calls = [mock.call()] * 2 1136 self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) 1137 1138 # Get Access Token, Second Attempt (not expired) 1139 self.assertEqual(credentials.access_token, token1) 1140 self.assertFalse(credentials.access_token_expired) 1141 token = credentials.get_access_token(http=http) 1142 # Make sure no refresh occurred since the token was not expired. 1143 self.assertEqual(token1, token.access_token) 1144 self.assertEqual(lifetime, token.expires_in) 1145 self.assertEqual(token_response_first, credentials.token_response) 1146 # Three more utcnow calls are expected: 1147 # - access_token_expired 1148 # - get_access_token() -> access_token_expired 1149 # - get_access_token -> _expires_in 1150 expected_utcnow_calls = [mock.call()] * (2 + 3) 1151 self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) 1152 1153 # Get Access Token, Third Attempt (force expiration) 1154 self.assertEqual(credentials.access_token, token1) 1155 credentials.token_expiry = NOW # Manually force expiry. 1156 self.assertTrue(credentials.access_token_expired) 1157 token = credentials.get_access_token(http=http) 1158 # Make sure refresh occurred since the token was not expired. 1159 self.assertEqual(token2, token.access_token) 1160 self.assertEqual(lifetime, token.expires_in) 1161 self.assertFalse(credentials.access_token_expired) 1162 self.assertEqual(token_response_second, 1163 credentials.token_response) 1164 # Five more utcnow calls are expected: 1165 # - access_token_expired 1166 # - get_access_token -> access_token_expired 1167 # - get_access_token -> _do_refresh_request 1168 # - get_access_token -> _expires_in 1169 # - access_token_expired 1170 expected_utcnow_calls = [mock.call()] * (2 + 3 + 5) 1171 self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) 1172 1173 @mock.patch.object(client.OAuth2Credentials, 'refresh') 1174 @mock.patch.object(client.OAuth2Credentials, '_expires_in', 1175 return_value=1835) 1176 def test_get_access_token_without_http(self, expires_in, refresh_mock): 1177 credentials = client.OAuth2Credentials(None, None, None, None, 1178 None, None, None) 1179 # Make sure access_token_expired returns True 1180 credentials.invalid = True 1181 # Specify a token so we can use it in the response. 1182 credentials.access_token = 'ya29-s3kr3t' 1183 1184 with mock.patch('httplib2.Http', 1185 return_value=object) as http_kls: 1186 token_info = credentials.get_access_token() 1187 expires_in.assert_called_once_with() 1188 refresh_mock.assert_called_once_with(http_kls.return_value) 1189 1190 self.assertIsInstance(token_info, client.AccessTokenInfo) 1191 self.assertEqual(token_info.access_token, 1192 credentials.access_token) 1193 self.assertEqual(token_info.expires_in, 1194 expires_in.return_value) 1195 1196 @mock.patch.object(client.OAuth2Credentials, 'refresh') 1197 @mock.patch.object(client.OAuth2Credentials, '_expires_in', 1198 return_value=1835) 1199 def test_get_access_token_with_http(self, expires_in, refresh_mock): 1200 credentials = client.OAuth2Credentials(None, None, None, None, 1201 None, None, None) 1202 # Make sure access_token_expired returns True 1203 credentials.invalid = True 1204 # Specify a token so we can use it in the response. 1205 credentials.access_token = 'ya29-s3kr3t' 1206 1207 http_obj = object() 1208 token_info = credentials.get_access_token(http_obj) 1209 self.assertIsInstance(token_info, client.AccessTokenInfo) 1210 self.assertEqual(token_info.access_token, 1211 credentials.access_token) 1212 self.assertEqual(token_info.expires_in, 1213 expires_in.return_value) 1214 1215 expires_in.assert_called_once_with() 1216 refresh_mock.assert_called_once_with(http_obj) 1217 1218 @mock.patch.object(client.OAuth2Credentials, 1219 '_generate_refresh_request_headers', 1220 return_value=object()) 1221 @mock.patch.object(client.OAuth2Credentials, 1222 '_generate_refresh_request_body', 1223 return_value=object()) 1224 @mock.patch('oauth2client.client.logger') 1225 def _do_refresh_request_test_helper(self, response, content, 1226 error_msg, logger, gen_body, 1227 gen_headers, store=None): 1228 credentials = client.OAuth2Credentials(None, None, None, None, 1229 None, None, None) 1230 credentials.store = store 1231 http_request = mock.Mock() 1232 http_request.return_value = response, content 1233 1234 with self.assertRaises( 1235 client.HttpAccessTokenRefreshError) as exc_manager: 1236 credentials._do_refresh_request(http_request) 1237 1238 self.assertEqual(exc_manager.exception.args, (error_msg,)) 1239 self.assertEqual(exc_manager.exception.status, response.status) 1240 http_request.assert_called_once_with(None, body=gen_body.return_value, 1241 headers=gen_headers.return_value, 1242 method='POST') 1243 1244 call1 = mock.call('Refreshing access_token') 1245 failure_template = 'Failed to retrieve access token: %s' 1246 call2 = mock.call(failure_template, content) 1247 self.assertEqual(logger.info.mock_calls, [call1, call2]) 1248 if store is not None: 1249 store.locked_put.assert_called_once_with(credentials) 1250 1251 def test__do_refresh_request_non_json_failure(self): 1252 response = httplib2.Response({ 1253 'status': int(http_client.BAD_REQUEST), 1254 }) 1255 content = u'Bad request' 1256 error_msg = 'Invalid response {0}.'.format(int(response.status)) 1257 self._do_refresh_request_test_helper(response, content, error_msg) 1258 1259 def test__do_refresh_request_basic_failure(self): 1260 response = httplib2.Response({ 1261 'status': int(http_client.INTERNAL_SERVER_ERROR), 1262 }) 1263 content = u'{}' 1264 error_msg = 'Invalid response {0}.'.format(int(response.status)) 1265 self._do_refresh_request_test_helper(response, content, error_msg) 1266 1267 def test__do_refresh_request_failure_w_json_error(self): 1268 response = httplib2.Response({ 1269 'status': http_client.BAD_GATEWAY, 1270 }) 1271 error_msg = 'Hi I am an error not a bearer' 1272 content = json.dumps({'error': error_msg}) 1273 self._do_refresh_request_test_helper(response, content, error_msg) 1274 1275 def test__do_refresh_request_failure_w_json_error_and_store(self): 1276 response = httplib2.Response({ 1277 'status': http_client.BAD_GATEWAY, 1278 }) 1279 error_msg = 'Where are we going wearer?' 1280 content = json.dumps({'error': error_msg}) 1281 store = mock.MagicMock() 1282 self._do_refresh_request_test_helper(response, content, error_msg, 1283 store=store) 1284 1285 def test__do_refresh_request_failure_w_json_error_and_desc(self): 1286 response = httplib2.Response({ 1287 'status': http_client.SERVICE_UNAVAILABLE, 1288 }) 1289 base_error = 'Ruckus' 1290 error_desc = 'Can you describe the ruckus' 1291 content = json.dumps({ 1292 'error': base_error, 1293 'error_description': error_desc, 1294 }) 1295 error_msg = '{0}: {1}'.format(base_error, error_desc) 1296 self._do_refresh_request_test_helper(response, content, error_msg) 1297 1298 @mock.patch('oauth2client.client.logger') 1299 def _do_revoke_test_helper(self, response, content, 1300 error_msg, logger, store=None): 1301 credentials = client.OAuth2Credentials( 1302 None, None, None, None, None, None, None, 1303 revoke_uri=oauth2client.GOOGLE_REVOKE_URI) 1304 credentials.store = store 1305 http_request = mock.Mock() 1306 http_request.return_value = response, content 1307 token = u's3kr3tz' 1308 1309 if response.status == http_client.OK: 1310 self.assertFalse(credentials.invalid) 1311 self.assertIsNone(credentials._do_revoke(http_request, token)) 1312 self.assertTrue(credentials.invalid) 1313 if store is not None: 1314 store.delete.assert_called_once_with() 1315 else: 1316 self.assertFalse(credentials.invalid) 1317 with self.assertRaises(client.TokenRevokeError) as exc_manager: 1318 credentials._do_revoke(http_request, token) 1319 # Make sure invalid was not flipped on. 1320 self.assertFalse(credentials.invalid) 1321 self.assertEqual(exc_manager.exception.args, (error_msg,)) 1322 if store is not None: 1323 store.delete.assert_not_called() 1324 1325 revoke_uri = oauth2client.GOOGLE_REVOKE_URI + '?token=' + token 1326 http_request.assert_called_once_with(revoke_uri) 1327 1328 logger.info.assert_called_once_with('Revoking token') 1329 1330 def test__do_revoke_success(self): 1331 response = httplib2.Response({ 1332 'status': http_client.OK, 1333 }) 1334 self._do_revoke_test_helper(response, b'', None) 1335 1336 def test__do_revoke_success_with_store(self): 1337 response = httplib2.Response({ 1338 'status': http_client.OK, 1339 }) 1340 store = mock.MagicMock() 1341 self._do_revoke_test_helper(response, b'', None, store=store) 1342 1343 def test__do_revoke_non_json_failure(self): 1344 response = httplib2.Response({ 1345 'status': http_client.BAD_REQUEST, 1346 }) 1347 content = u'Bad request' 1348 error_msg = 'Invalid response {0}.'.format(response.status) 1349 self._do_revoke_test_helper(response, content, error_msg) 1350 1351 def test__do_revoke_basic_failure(self): 1352 response = httplib2.Response({ 1353 'status': http_client.INTERNAL_SERVER_ERROR, 1354 }) 1355 content = u'{}' 1356 error_msg = 'Invalid response {0}.'.format(response.status) 1357 self._do_revoke_test_helper(response, content, error_msg) 1358 1359 def test__do_revoke_failure_w_json_error(self): 1360 response = httplib2.Response({ 1361 'status': http_client.BAD_GATEWAY, 1362 }) 1363 error_msg = 'Hi I am an error not a bearer' 1364 content = json.dumps({'error': error_msg}) 1365 self._do_revoke_test_helper(response, content, error_msg) 1366 1367 def test__do_revoke_failure_w_json_error_and_store(self): 1368 response = httplib2.Response({ 1369 'status': http_client.BAD_GATEWAY, 1370 }) 1371 error_msg = 'Where are we going wearer?' 1372 content = json.dumps({'error': error_msg}) 1373 store = mock.MagicMock() 1374 self._do_revoke_test_helper(response, content, error_msg, 1375 store=store) 1376 1377 @mock.patch('oauth2client.client.logger') 1378 def _do_retrieve_scopes_test_helper(self, response, content, 1379 error_msg, logger, scopes=None): 1380 credentials = client.OAuth2Credentials( 1381 None, None, None, None, None, None, None, 1382 token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI) 1383 http_request = mock.Mock() 1384 http_request.return_value = response, content 1385 token = u's3kr3tz' 1386 1387 if response.status == http_client.OK: 1388 self.assertEqual(credentials.scopes, set()) 1389 self.assertIsNone( 1390 credentials._do_retrieve_scopes(http_request, token)) 1391 self.assertEqual(credentials.scopes, scopes) 1392 else: 1393 self.assertEqual(credentials.scopes, set()) 1394 with self.assertRaises(client.Error) as exc_manager: 1395 credentials._do_retrieve_scopes(http_request, token) 1396 # Make sure scopes were not changed. 1397 self.assertEqual(credentials.scopes, set()) 1398 self.assertEqual(exc_manager.exception.args, (error_msg,)) 1399 1400 token_uri = client._update_query_params( 1401 oauth2client.GOOGLE_TOKEN_INFO_URI, 1402 {'fields': 'scope', 'access_token': token}) 1403 self.assertEqual(len(http_request.mock_calls), 1) 1404 scopes_call = http_request.mock_calls[0] 1405 call_args = scopes_call[1] 1406 self.assertEqual(len(call_args), 1) 1407 called_uri = call_args[0] 1408 assertUrisEqual(self, token_uri, called_uri) 1409 logger.info.assert_called_once_with('Refreshing scopes') 1410 1411 def test__do_retrieve_scopes_success_bad_json(self): 1412 response = httplib2.Response({ 1413 'status': http_client.OK, 1414 }) 1415 invalid_json = b'{' 1416 with self.assertRaises(ValueError): 1417 self._do_retrieve_scopes_test_helper(response, invalid_json, None) 1418 1419 def test__do_retrieve_scopes_success(self): 1420 response = httplib2.Response({ 1421 'status': http_client.OK, 1422 }) 1423 content = b'{"scope": "foo bar"}' 1424 self._do_retrieve_scopes_test_helper(response, content, None, 1425 scopes=set(['foo', 'bar'])) 1426 1427 def test__do_retrieve_scopes_non_json_failure(self): 1428 response = httplib2.Response({ 1429 'status': http_client.BAD_REQUEST, 1430 }) 1431 content = u'Bad request' 1432 error_msg = 'Invalid response {0}.'.format(response.status) 1433 self._do_retrieve_scopes_test_helper(response, content, error_msg) 1434 1435 def test__do_retrieve_scopes_basic_failure(self): 1436 response = httplib2.Response({ 1437 'status': http_client.INTERNAL_SERVER_ERROR, 1438 }) 1439 content = u'{}' 1440 error_msg = 'Invalid response {0}.'.format(response.status) 1441 self._do_retrieve_scopes_test_helper(response, content, error_msg) 1442 1443 def test__do_retrieve_scopes_failure_w_json_error(self): 1444 response = httplib2.Response({ 1445 'status': http_client.BAD_GATEWAY, 1446 }) 1447 error_msg = 'Error desc I sit at a desk' 1448 content = json.dumps({'error_description': error_msg}) 1449 self._do_retrieve_scopes_test_helper(response, content, error_msg) 1450 1451 def test_has_scopes(self): 1452 self.assertTrue(self.credentials.has_scopes('foo')) 1453 self.assertTrue(self.credentials.has_scopes(['foo'])) 1454 self.assertFalse(self.credentials.has_scopes('bar')) 1455 self.assertFalse(self.credentials.has_scopes(['bar'])) 1456 1457 self.credentials.scopes = set(['foo', 'bar']) 1458 self.assertTrue(self.credentials.has_scopes('foo')) 1459 self.assertTrue(self.credentials.has_scopes('bar')) 1460 self.assertFalse(self.credentials.has_scopes('baz')) 1461 self.assertTrue(self.credentials.has_scopes(['foo', 'bar'])) 1462 self.assertFalse(self.credentials.has_scopes(['foo', 'baz'])) 1463 1464 self.credentials.scopes = set([]) 1465 self.assertFalse(self.credentials.has_scopes('foo')) 1466 1467 def test_retrieve_scopes(self): 1468 info_response_first = {'scope': 'foo bar'} 1469 info_response_second = {'error_description': 'abcdef'} 1470 http = HttpMockSequence([ 1471 ({'status': '200'}, json.dumps(info_response_first).encode( 1472 'utf-8')), 1473 ({'status': '400'}, json.dumps(info_response_second).encode( 1474 'utf-8')), 1475 ({'status': '500'}, b''), 1476 ]) 1477 1478 self.credentials.retrieve_scopes(http) 1479 self.assertEqual(set(['foo', 'bar']), self.credentials.scopes) 1480 1481 with self.assertRaises(client.Error): 1482 self.credentials.retrieve_scopes(http) 1483 1484 with self.assertRaises(client.Error): 1485 self.credentials.retrieve_scopes(http) 1486 1487 def test_refresh_updates_id_token(self): 1488 for status_code in client.REFRESH_STATUS_CODES: 1489 body = {'foo': 'bar'} 1490 body_json = json.dumps(body).encode('ascii') 1491 payload = base64.urlsafe_b64encode(body_json).strip(b'=') 1492 jwt = b'stuff.' + payload + b'.signature' 1493 1494 token_response = (b'{' 1495 b' "access_token":"1/3w",' 1496 b' "expires_in":3600,' 1497 b' "id_token": "' + jwt + b'"' 1498 b'}') 1499 http = HttpMockSequence([ 1500 ({'status': status_code}, b''), 1501 ({'status': '200'}, token_response), 1502 ({'status': '200'}, 'echo_request_headers'), 1503 ]) 1504 http = self.credentials.authorize(http) 1505 resp, content = http.request('http://example.com') 1506 self.assertEqual(self.credentials.id_token, body) 1507 1508 1509class AccessTokenCredentialsTests(unittest2.TestCase): 1510 1511 def setUp(self): 1512 access_token = 'foo' 1513 user_agent = 'refresh_checker/1.0' 1514 self.credentials = client.AccessTokenCredentials( 1515 access_token, user_agent, 1516 revoke_uri=oauth2client.GOOGLE_REVOKE_URI) 1517 1518 def test_token_refresh_success(self): 1519 for status_code in client.REFRESH_STATUS_CODES: 1520 http = HttpMockSequence([ 1521 ({'status': status_code}, b''), 1522 ]) 1523 http = self.credentials.authorize(http) 1524 with self.assertRaises(client.AccessTokenCredentialsError): 1525 resp, content = http.request('http://example.com') 1526 1527 def test_token_revoke_success(self): 1528 _token_revoke_test_helper( 1529 self, '200', revoke_raise=False, 1530 valid_bool_value=True, token_attr='access_token') 1531 1532 def test_token_revoke_failure(self): 1533 _token_revoke_test_helper( 1534 self, '400', revoke_raise=True, 1535 valid_bool_value=False, token_attr='access_token') 1536 1537 def test_non_401_error_response(self): 1538 http = HttpMockSequence([ 1539 ({'status': '400'}, b''), 1540 ]) 1541 http = self.credentials.authorize(http) 1542 resp, content = http.request('http://example.com') 1543 self.assertEqual(http_client.BAD_REQUEST, resp.status) 1544 1545 def test_auth_header_sent(self): 1546 http = HttpMockSequence([ 1547 ({'status': '200'}, 'echo_request_headers'), 1548 ]) 1549 http = self.credentials.authorize(http) 1550 resp, content = http.request('http://example.com') 1551 self.assertEqual(b'Bearer foo', content[b'Authorization']) 1552 1553 1554class TestAssertionCredentials(unittest2.TestCase): 1555 assertion_text = 'This is the assertion' 1556 assertion_type = 'http://www.google.com/assertionType' 1557 1558 class AssertionCredentialsTestImpl(client.AssertionCredentials): 1559 1560 def _generate_assertion(self): 1561 return TestAssertionCredentials.assertion_text 1562 1563 def setUp(self): 1564 user_agent = 'fun/2.0' 1565 self.credentials = self.AssertionCredentialsTestImpl( 1566 self.assertion_type, user_agent=user_agent) 1567 1568 def test__generate_assertion_abstract(self): 1569 credentials = client.AssertionCredentials(None) 1570 with self.assertRaises(NotImplementedError): 1571 credentials._generate_assertion() 1572 1573 def test_assertion_body(self): 1574 body = urllib.parse.parse_qs( 1575 self.credentials._generate_refresh_request_body()) 1576 self.assertEqual(self.assertion_text, body['assertion'][0]) 1577 self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer', 1578 body['grant_type'][0]) 1579 1580 def test_assertion_refresh(self): 1581 http = HttpMockSequence([ 1582 ({'status': '200'}, b'{"access_token":"1/3w"}'), 1583 ({'status': '200'}, 'echo_request_headers'), 1584 ]) 1585 http = self.credentials.authorize(http) 1586 resp, content = http.request('http://example.com') 1587 self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) 1588 1589 def test_token_revoke_success(self): 1590 _token_revoke_test_helper( 1591 self, '200', revoke_raise=False, 1592 valid_bool_value=True, token_attr='access_token') 1593 1594 def test_token_revoke_failure(self): 1595 _token_revoke_test_helper( 1596 self, '400', revoke_raise=True, 1597 valid_bool_value=False, token_attr='access_token') 1598 1599 def test_sign_blob_abstract(self): 1600 credentials = client.AssertionCredentials(None) 1601 with self.assertRaises(NotImplementedError): 1602 credentials.sign_blob(b'blob') 1603 1604 1605class UpdateQueryParamsTest(unittest2.TestCase): 1606 def test_update_query_params_no_params(self): 1607 uri = 'http://www.google.com' 1608 updated = client._update_query_params(uri, {'a': 'b'}) 1609 self.assertEqual(updated, uri + '?a=b') 1610 1611 def test_update_query_params_existing_params(self): 1612 uri = 'http://www.google.com?x=y' 1613 updated = client._update_query_params(uri, {'a': 'b', 'c': 'd&'}) 1614 hardcoded_update = uri + '&a=b&c=d%26' 1615 assertUrisEqual(self, updated, hardcoded_update) 1616 1617 1618class ExtractIdTokenTest(unittest2.TestCase): 1619 """Tests client._extract_id_token().""" 1620 1621 def test_extract_success(self): 1622 body = {'foo': 'bar'} 1623 body_json = json.dumps(body).encode('ascii') 1624 payload = base64.urlsafe_b64encode(body_json).strip(b'=') 1625 jwt = b'stuff.' + payload + b'.signature' 1626 1627 extracted = client._extract_id_token(jwt) 1628 self.assertEqual(extracted, body) 1629 1630 def test_extract_failure(self): 1631 body = {'foo': 'bar'} 1632 body_json = json.dumps(body).encode('ascii') 1633 payload = base64.urlsafe_b64encode(body_json).strip(b'=') 1634 jwt = b'stuff.' + payload 1635 with self.assertRaises(client.VerifyJwtTokenError): 1636 client._extract_id_token(jwt) 1637 1638 1639class OAuth2WebServerFlowTest(unittest2.TestCase): 1640 1641 def setUp(self): 1642 self.flow = client.OAuth2WebServerFlow( 1643 client_id='client_id+1', 1644 client_secret='secret+1', 1645 scope='foo', 1646 redirect_uri=client.OOB_CALLBACK_URN, 1647 user_agent='unittest-sample/1.0', 1648 revoke_uri='dummy_revoke_uri', 1649 ) 1650 1651 def test_construct_authorize_url(self): 1652 authorize_url = self.flow.step1_get_authorize_url(state='state+1') 1653 1654 parsed = urllib.parse.urlparse(authorize_url) 1655 q = urllib.parse.parse_qs(parsed[4]) 1656 self.assertEqual('client_id+1', q['client_id'][0]) 1657 self.assertEqual('code', q['response_type'][0]) 1658 self.assertEqual('foo', q['scope'][0]) 1659 self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0]) 1660 self.assertEqual('offline', q['access_type'][0]) 1661 self.assertEqual('state+1', q['state'][0]) 1662 1663 def test_override_flow_via_kwargs(self): 1664 """Passing kwargs to override defaults.""" 1665 flow = client.OAuth2WebServerFlow( 1666 client_id='client_id+1', 1667 client_secret='secret+1', 1668 scope='foo', 1669 redirect_uri=client.OOB_CALLBACK_URN, 1670 user_agent='unittest-sample/1.0', 1671 access_type='online', 1672 response_type='token' 1673 ) 1674 authorize_url = flow.step1_get_authorize_url() 1675 1676 parsed = urllib.parse.urlparse(authorize_url) 1677 q = urllib.parse.parse_qs(parsed[4]) 1678 self.assertEqual('client_id+1', q['client_id'][0]) 1679 self.assertEqual('token', q['response_type'][0]) 1680 self.assertEqual('foo', q['scope'][0]) 1681 self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0]) 1682 self.assertEqual('online', q['access_type'][0]) 1683 1684 def test__oauth2_web_server_flow_params(self): 1685 params = client._oauth2_web_server_flow_params({}) 1686 self.assertEqual(params['access_type'], 'offline') 1687 self.assertEqual(params['response_type'], 'code') 1688 1689 params = client._oauth2_web_server_flow_params({ 1690 'approval_prompt': 'force'}) 1691 self.assertEqual(params['prompt'], 'consent') 1692 self.assertNotIn('approval_prompt', params) 1693 1694 params = client._oauth2_web_server_flow_params({ 1695 'approval_prompt': 'other'}) 1696 self.assertEqual(params['approval_prompt'], 'other') 1697 1698 @mock.patch('oauth2client.client.logger') 1699 def test_step1_get_authorize_url_redirect_override(self, logger): 1700 flow = client.OAuth2WebServerFlow('client_id+1', scope='foo', 1701 redirect_uri=client.OOB_CALLBACK_URN) 1702 alt_redirect = 'foo:bar' 1703 self.assertEqual(flow.redirect_uri, client.OOB_CALLBACK_URN) 1704 result = flow.step1_get_authorize_url(redirect_uri=alt_redirect) 1705 # Make sure the redirect value was updated. 1706 self.assertEqual(flow.redirect_uri, alt_redirect) 1707 query_params = { 1708 'client_id': flow.client_id, 1709 'redirect_uri': alt_redirect, 1710 'scope': flow.scope, 1711 'access_type': 'offline', 1712 'response_type': 'code', 1713 } 1714 expected = client._update_query_params(flow.auth_uri, query_params) 1715 assertUrisEqual(self, expected, result) 1716 # Check stubs. 1717 self.assertEqual(logger.warning.call_count, 1) 1718 1719 def test_step1_get_authorize_url_without_redirect(self): 1720 flow = client.OAuth2WebServerFlow('client_id+1', scope='foo', 1721 redirect_uri=None) 1722 with self.assertRaises(ValueError): 1723 flow.step1_get_authorize_url(redirect_uri=None) 1724 1725 def test_step1_get_authorize_url_without_login_hint(self): 1726 login_hint = 'There are wascally wabbits nearby' 1727 flow = client.OAuth2WebServerFlow('client_id+1', scope='foo', 1728 redirect_uri=client.OOB_CALLBACK_URN, 1729 login_hint=login_hint) 1730 result = flow.step1_get_authorize_url() 1731 query_params = { 1732 'client_id': flow.client_id, 1733 'login_hint': login_hint, 1734 'redirect_uri': client.OOB_CALLBACK_URN, 1735 'scope': flow.scope, 1736 'access_type': 'offline', 1737 'response_type': 'code', 1738 } 1739 expected = client._update_query_params(flow.auth_uri, query_params) 1740 assertUrisEqual(self, expected, result) 1741 1742 def test_step1_get_device_and_user_codes_wo_device_uri(self): 1743 flow = client.OAuth2WebServerFlow('CID', scope='foo', device_uri=None) 1744 with self.assertRaises(ValueError): 1745 flow.step1_get_device_and_user_codes() 1746 1747 def _step1_get_device_and_user_codes_helper( 1748 self, extra_headers=None, user_agent=None, default_http=False, 1749 content=None): 1750 flow = client.OAuth2WebServerFlow('CID', scope='foo', 1751 user_agent=user_agent) 1752 device_code = 'bfc06756-062e-430f-9f0f-460ca44724e5' 1753 user_code = '5faf2780-fc83-11e5-9bc2-00c2c63e5792' 1754 ver_url = 'http://foo.bar' 1755 if content is None: 1756 content = json.dumps({ 1757 'device_code': device_code, 1758 'user_code': user_code, 1759 'verification_url': ver_url, 1760 }) 1761 http = HttpMockSequence([ 1762 ({'status': http_client.OK}, content), 1763 ]) 1764 if default_http: 1765 with mock.patch('httplib2.Http', return_value=http): 1766 result = flow.step1_get_device_and_user_codes() 1767 else: 1768 result = flow.step1_get_device_and_user_codes(http=http) 1769 1770 expected = client.DeviceFlowInfo( 1771 device_code, user_code, None, ver_url, None) 1772 self.assertEqual(result, expected) 1773 self.assertEqual(len(http.requests), 1) 1774 self.assertEqual( 1775 http.requests[0]['uri'], oauth2client.GOOGLE_DEVICE_URI) 1776 body = http.requests[0]['body'] 1777 self.assertEqual(urllib.parse.parse_qs(body), 1778 {'client_id': [flow.client_id], 1779 'scope': [flow.scope]}) 1780 headers = {'content-type': 'application/x-www-form-urlencoded'} 1781 if extra_headers is not None: 1782 headers.update(extra_headers) 1783 self.assertEqual(http.requests[0]['headers'], headers) 1784 1785 def test_step1_get_device_and_user_codes(self): 1786 self._step1_get_device_and_user_codes_helper() 1787 1788 def test_step1_get_device_and_user_codes_w_user_agent(self): 1789 user_agent = 'spiderman' 1790 extra_headers = {'user-agent': user_agent} 1791 self._step1_get_device_and_user_codes_helper( 1792 user_agent=user_agent, extra_headers=extra_headers) 1793 1794 def test_step1_get_device_and_user_codes_w_default_http(self): 1795 self._step1_get_device_and_user_codes_helper(default_http=True) 1796 1797 def test_step1_get_device_and_user_codes_bad_payload(self): 1798 non_json_content = b'{' 1799 with self.assertRaises(client.OAuth2DeviceCodeError): 1800 self._step1_get_device_and_user_codes_helper( 1801 content=non_json_content) 1802 1803 def _step1_get_device_and_user_codes_fail_helper(self, status, 1804 content, error_msg): 1805 flow = client.OAuth2WebServerFlow('CID', scope='foo') 1806 http = HttpMockSequence([ 1807 ({'status': status}, content), 1808 ]) 1809 with self.assertRaises(client.OAuth2DeviceCodeError) as exc_manager: 1810 flow.step1_get_device_and_user_codes(http=http) 1811 1812 self.assertEqual(exc_manager.exception.args, (error_msg,)) 1813 1814 def test_step1_get_device_and_user_codes_non_json_failure(self): 1815 status = int(http_client.BAD_REQUEST) 1816 content = 'Nope not JSON.' 1817 error_msg = 'Invalid response {0}.'.format(status) 1818 self._step1_get_device_and_user_codes_fail_helper(status, content, 1819 error_msg) 1820 1821 def test_step1_get_device_and_user_codes_basic_failure(self): 1822 status = int(http_client.INTERNAL_SERVER_ERROR) 1823 content = b'{}' 1824 error_msg = 'Invalid response {0}.'.format(status) 1825 self._step1_get_device_and_user_codes_fail_helper(status, content, 1826 error_msg) 1827 1828 def test_step1_get_device_and_user_codes_failure_w_json_error(self): 1829 status = int(http_client.BAD_GATEWAY) 1830 base_error = 'ZOMG user codes failure.' 1831 content = json.dumps({'error': base_error}) 1832 error_msg = 'Invalid response {0}. Error: {1}'.format(status, 1833 base_error) 1834 self._step1_get_device_and_user_codes_fail_helper(status, content, 1835 error_msg) 1836 1837 def test_step2_exchange_no_input(self): 1838 flow = client.OAuth2WebServerFlow('client_id+1', scope='foo') 1839 with self.assertRaises(ValueError): 1840 flow.step2_exchange() 1841 1842 def test_step2_exchange_code_and_device_flow(self): 1843 flow = client.OAuth2WebServerFlow('client_id+1', scope='foo') 1844 with self.assertRaises(ValueError): 1845 flow.step2_exchange(code='code', device_flow_info='dfi') 1846 1847 def test_scope_is_required(self): 1848 with self.assertRaises(TypeError): 1849 client.OAuth2WebServerFlow('client_id+1') 1850 1851 def test_exchange_failure(self): 1852 http = HttpMockSequence([ 1853 ({'status': '400'}, b'{"error":"invalid_request"}'), 1854 ]) 1855 1856 with self.assertRaises(client.FlowExchangeError): 1857 self.flow.step2_exchange(code='some random code', http=http) 1858 1859 def test_urlencoded_exchange_failure(self): 1860 http = HttpMockSequence([ 1861 ({'status': '400'}, b'error=invalid_request'), 1862 ]) 1863 1864 with self.assertRaisesRegexp(client.FlowExchangeError, 1865 'invalid_request'): 1866 self.flow.step2_exchange(code='some random code', http=http) 1867 1868 def test_exchange_failure_with_json_error(self): 1869 # Some providers have 'error' attribute as a JSON object 1870 # in place of regular string. 1871 # This test makes sure no strange object-to-string coversion 1872 # exceptions are being raised instead of FlowExchangeError. 1873 payload = (b'{' 1874 b' "error": {' 1875 b' "message": "Error validating verification code.",' 1876 b' "type": "OAuthException"' 1877 b' }' 1878 b'}') 1879 http = HttpMockSequence([({'status': '400'}, payload)]) 1880 1881 with self.assertRaises(client.FlowExchangeError): 1882 self.flow.step2_exchange(code='some random code', http=http) 1883 1884 def _exchange_success_test_helper(self, code=None, device_flow_info=None): 1885 payload = (b'{' 1886 b' "access_token":"SlAV32hkKG",' 1887 b' "expires_in":3600,' 1888 b' "refresh_token":"8xLOxBtZp8"' 1889 b'}') 1890 http = HttpMockSequence([({'status': '200'}, payload)]) 1891 credentials = self.flow.step2_exchange( 1892 code=code, device_flow_info=device_flow_info, http=http) 1893 self.assertEqual('SlAV32hkKG', credentials.access_token) 1894 self.assertNotEqual(None, credentials.token_expiry) 1895 self.assertEqual('8xLOxBtZp8', credentials.refresh_token) 1896 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) 1897 self.assertEqual(set(['foo']), credentials.scopes) 1898 1899 def test_exchange_success(self): 1900 self._exchange_success_test_helper(code='some random code') 1901 1902 def test_exchange_success_with_device_flow_info(self): 1903 device_flow_info = client.DeviceFlowInfo( 1904 'some random code', None, None, None, None) 1905 self._exchange_success_test_helper(device_flow_info=device_flow_info) 1906 1907 def test_exchange_success_binary_code(self): 1908 binary_code = b'some random code' 1909 access_token = 'SlAV32hkKG' 1910 expires_in = '3600' 1911 refresh_token = '8xLOxBtZp8' 1912 revoke_uri = 'dummy_revoke_uri' 1913 1914 payload = ('{' 1915 ' "access_token":"' + access_token + '",' 1916 ' "expires_in":' + expires_in + ',' 1917 ' "refresh_token":"' + refresh_token + '"' 1918 '}') 1919 http = HttpMockSequence( 1920 [({'status': '200'}, _helpers._to_bytes(payload))]) 1921 credentials = self.flow.step2_exchange(code=binary_code, http=http) 1922 self.assertEqual(access_token, credentials.access_token) 1923 self.assertIsNotNone(credentials.token_expiry) 1924 self.assertEqual(refresh_token, credentials.refresh_token) 1925 self.assertEqual(revoke_uri, credentials.revoke_uri) 1926 self.assertEqual(set(['foo']), credentials.scopes) 1927 1928 def test_exchange_dictlike(self): 1929 class FakeDict(object): 1930 def __init__(self, d): 1931 self.d = d 1932 1933 def __getitem__(self, name): 1934 return self.d[name] 1935 1936 def __contains__(self, name): 1937 return name in self.d 1938 1939 code = 'some random code' 1940 not_a_dict = FakeDict({'code': code}) 1941 payload = (b'{' 1942 b' "access_token":"SlAV32hkKG",' 1943 b' "expires_in":3600,' 1944 b' "refresh_token":"8xLOxBtZp8"' 1945 b'}') 1946 http = HttpMockSequence([({'status': '200'}, payload)]) 1947 1948 credentials = self.flow.step2_exchange(code=not_a_dict, http=http) 1949 self.assertEqual('SlAV32hkKG', credentials.access_token) 1950 self.assertNotEqual(None, credentials.token_expiry) 1951 self.assertEqual('8xLOxBtZp8', credentials.refresh_token) 1952 self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) 1953 self.assertEqual(set(['foo']), credentials.scopes) 1954 request_code = urllib.parse.parse_qs( 1955 http.requests[0]['body'])['code'][0] 1956 self.assertEqual(code, request_code) 1957 1958 def test_exchange_using_authorization_header(self): 1959 auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=', 1960 flow = client.OAuth2WebServerFlow( 1961 client_id='client_id+1', 1962 authorization_header=auth_header, 1963 scope='foo', 1964 redirect_uri=client.OOB_CALLBACK_URN, 1965 user_agent='unittest-sample/1.0', 1966 revoke_uri='dummy_revoke_uri', 1967 ) 1968 http = HttpMockSequence([ 1969 ({'status': '200'}, b'access_token=SlAV32hkKG'), 1970 ]) 1971 1972 credentials = flow.step2_exchange(code='some random code', http=http) 1973 self.assertEqual('SlAV32hkKG', credentials.access_token) 1974 1975 test_request = http.requests[0] 1976 # Did we pass the Authorization header? 1977 self.assertEqual(test_request['headers']['Authorization'], auth_header) 1978 # Did we omit client_secret from POST body? 1979 self.assertTrue('client_secret' not in test_request['body']) 1980 1981 def test_urlencoded_exchange_success(self): 1982 http = HttpMockSequence([ 1983 ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'), 1984 ]) 1985 1986 credentials = self.flow.step2_exchange(code='some random code', 1987 http=http) 1988 self.assertEqual('SlAV32hkKG', credentials.access_token) 1989 self.assertNotEqual(None, credentials.token_expiry) 1990 1991 def test_urlencoded_expires_param(self): 1992 http = HttpMockSequence([ 1993 # Note the 'expires=3600' where you'd normally 1994 # have if named 'expires_in' 1995 ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'), 1996 ]) 1997 1998 credentials = self.flow.step2_exchange(code='some random code', 1999 http=http) 2000 self.assertNotEqual(None, credentials.token_expiry) 2001 2002 def test_exchange_no_expires_in(self): 2003 payload = (b'{' 2004 b' "access_token":"SlAV32hkKG",' 2005 b' "refresh_token":"8xLOxBtZp8"' 2006 b'}') 2007 http = HttpMockSequence([({'status': '200'}, payload)]) 2008 2009 credentials = self.flow.step2_exchange(code='some random code', 2010 http=http) 2011 self.assertEqual(None, credentials.token_expiry) 2012 2013 def test_urlencoded_exchange_no_expires_in(self): 2014 http = HttpMockSequence([ 2015 # This might be redundant but just to make sure 2016 # urlencoded access_token gets parsed correctly 2017 ({'status': '200'}, b'access_token=SlAV32hkKG'), 2018 ]) 2019 2020 credentials = self.flow.step2_exchange(code='some random code', 2021 http=http) 2022 self.assertEqual(None, credentials.token_expiry) 2023 2024 def test_exchange_fails_if_no_code(self): 2025 payload = (b'{' 2026 b' "access_token":"SlAV32hkKG",' 2027 b' "refresh_token":"8xLOxBtZp8"' 2028 b'}') 2029 http = HttpMockSequence([({'status': '200'}, payload)]) 2030 2031 code = {'error': 'thou shall not pass'} 2032 with self.assertRaisesRegexp( 2033 client.FlowExchangeError, 'shall not pass'): 2034 self.flow.step2_exchange(code=code, http=http) 2035 2036 def test_exchange_id_token_fail(self): 2037 payload = (b'{' 2038 b' "access_token":"SlAV32hkKG",' 2039 b' "refresh_token":"8xLOxBtZp8",' 2040 b' "id_token": "stuff.payload"' 2041 b'}') 2042 http = HttpMockSequence([({'status': '200'}, payload)]) 2043 2044 with self.assertRaises(client.VerifyJwtTokenError): 2045 self.flow.step2_exchange(code='some random code', http=http) 2046 2047 def test_exchange_id_token(self): 2048 body = {'foo': 'bar'} 2049 body_json = json.dumps(body).encode('ascii') 2050 payload = base64.urlsafe_b64encode(body_json).strip(b'=') 2051 jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' + 2052 base64.urlsafe_b64encode(b'signature')) 2053 2054 payload = (b'{' 2055 b' "access_token":"SlAV32hkKG",' 2056 b' "refresh_token":"8xLOxBtZp8",' 2057 b' "id_token": "' + jwt + b'"' 2058 b'}') 2059 http = HttpMockSequence([({'status': '200'}, payload)]) 2060 credentials = self.flow.step2_exchange(code='some random code', 2061 http=http) 2062 self.assertEqual(credentials.id_token, body) 2063 2064 2065class FlowFromCachedClientsecrets(unittest2.TestCase): 2066 2067 def test_flow_from_clientsecrets_cached(self): 2068 cache_mock = CacheMock() 2069 load_and_cache('client_secrets.json', 'some_secrets', cache_mock) 2070 2071 flow = client.flow_from_clientsecrets( 2072 'some_secrets', '', redirect_uri='oob', cache=cache_mock) 2073 self.assertEqual('foo_client_secret', flow.client_secret) 2074 2075 @mock.patch('oauth2client.clientsecrets.loadfile') 2076 def _flow_from_clientsecrets_success_helper(self, loadfile_mock, 2077 device_uri=None, 2078 revoke_uri=None): 2079 client_type = clientsecrets.TYPE_WEB 2080 client_info = { 2081 'auth_uri': 'auth_uri', 2082 'token_uri': 'token_uri', 2083 'client_id': 'client_id', 2084 'client_secret': 'client_secret', 2085 } 2086 if revoke_uri is not None: 2087 client_info['revoke_uri'] = revoke_uri 2088 loadfile_mock.return_value = client_type, client_info 2089 filename = object() 2090 scope = ['baz'] 2091 cache = object() 2092 2093 if device_uri is not None: 2094 result = client.flow_from_clientsecrets( 2095 filename, scope, cache=cache, device_uri=device_uri) 2096 self.assertEqual(result.device_uri, device_uri) 2097 else: 2098 result = client.flow_from_clientsecrets( 2099 filename, scope, cache=cache) 2100 2101 self.assertIsInstance(result, client.OAuth2WebServerFlow) 2102 loadfile_mock.assert_called_once_with(filename, cache=cache) 2103 2104 def test_flow_from_clientsecrets_success(self): 2105 self._flow_from_clientsecrets_success_helper() 2106 2107 def test_flow_from_clientsecrets_success_w_device_uri(self): 2108 device_uri = 'http://device.uri' 2109 self._flow_from_clientsecrets_success_helper(device_uri=device_uri) 2110 2111 def test_flow_from_clientsecrets_success_w_revoke_uri(self): 2112 revoke_uri = 'http://revoke.uri' 2113 self._flow_from_clientsecrets_success_helper(revoke_uri=revoke_uri) 2114 2115 @mock.patch('oauth2client.clientsecrets.loadfile', 2116 side_effect=clientsecrets.InvalidClientSecretsError) 2117 def test_flow_from_clientsecrets_invalid(self, loadfile_mock): 2118 filename = object() 2119 cache = object() 2120 with self.assertRaises(clientsecrets.InvalidClientSecretsError): 2121 client.flow_from_clientsecrets( 2122 filename, None, cache=cache, message=None) 2123 loadfile_mock.assert_called_once_with(filename, cache=cache) 2124 2125 @mock.patch('oauth2client.clientsecrets.loadfile', 2126 side_effect=clientsecrets.InvalidClientSecretsError) 2127 @mock.patch('sys.exit') 2128 def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit, 2129 loadfile_mock): 2130 filename = object() 2131 cache = object() 2132 message = 'hi mom' 2133 2134 client.flow_from_clientsecrets( 2135 filename, None, cache=cache, message=message) 2136 sys_exit.assert_called_once_with(message) 2137 loadfile_mock.assert_called_once_with(filename, cache=cache) 2138 2139 @mock.patch('oauth2client.clientsecrets.loadfile', 2140 side_effect=clientsecrets.InvalidClientSecretsError('foobar')) 2141 @mock.patch('sys.exit') 2142 def test_flow_from_clientsecrets_invalid_w_msg_and_text(self, sys_exit, 2143 loadfile_mock): 2144 filename = object() 2145 cache = object() 2146 message = 'hi mom' 2147 expected = ('The client secrets were invalid: ' 2148 '\n{0}\n{1}'.format('foobar', 'hi mom')) 2149 2150 client.flow_from_clientsecrets( 2151 filename, None, cache=cache, message=message) 2152 sys_exit.assert_called_once_with(expected) 2153 loadfile_mock.assert_called_once_with(filename, cache=cache) 2154 2155 @mock.patch('oauth2client.clientsecrets.loadfile') 2156 def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock): 2157 client_type = 'UNKNOWN' 2158 loadfile_mock.return_value = client_type, None 2159 filename = object() 2160 cache = object() 2161 2162 err_msg = ('This OAuth 2.0 flow is unsupported: ' 2163 '{0!r}'.format(client_type)) 2164 with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError, 2165 err_msg): 2166 client.flow_from_clientsecrets(filename, None, cache=cache) 2167 2168 loadfile_mock.assert_called_once_with(filename, cache=cache) 2169 2170 2171class CredentialsFromCodeTests(unittest2.TestCase): 2172 2173 def setUp(self): 2174 self.client_id = 'client_id_abc' 2175 self.client_secret = 'secret_use_code' 2176 self.scope = 'foo' 2177 self.code = '12345abcde' 2178 self.redirect_uri = 'postmessage' 2179 2180 def test_exchange_code_for_token(self): 2181 token = 'asdfghjkl' 2182 payload = json.dumps({'access_token': token, 'expires_in': 3600}) 2183 http = HttpMockSequence([ 2184 ({'status': '200'}, payload.encode('utf-8')), 2185 ]) 2186 credentials = client.credentials_from_code( 2187 self.client_id, self.client_secret, self.scope, 2188 self.code, http=http, redirect_uri=self.redirect_uri) 2189 self.assertEqual(credentials.access_token, token) 2190 self.assertNotEqual(None, credentials.token_expiry) 2191 self.assertEqual(set(['foo']), credentials.scopes) 2192 2193 def test_exchange_code_for_token_fail(self): 2194 http = HttpMockSequence([ 2195 ({'status': '400'}, b'{"error":"invalid_request"}'), 2196 ]) 2197 2198 with self.assertRaises(client.FlowExchangeError): 2199 client.credentials_from_code( 2200 self.client_id, self.client_secret, self.scope, 2201 self.code, http=http, redirect_uri=self.redirect_uri) 2202 2203 def test_exchange_code_and_file_for_token(self): 2204 payload = (b'{' 2205 b' "access_token":"asdfghjkl",' 2206 b' "expires_in":3600' 2207 b'}') 2208 http = HttpMockSequence([({'status': '200'}, payload)]) 2209 credentials = client.credentials_from_clientsecrets_and_code( 2210 datafile('client_secrets.json'), self.scope, 2211 self.code, http=http) 2212 self.assertEqual(credentials.access_token, 'asdfghjkl') 2213 self.assertNotEqual(None, credentials.token_expiry) 2214 self.assertEqual(set(['foo']), credentials.scopes) 2215 2216 def test_exchange_code_and_cached_file_for_token(self): 2217 http = HttpMockSequence([ 2218 ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'), 2219 ]) 2220 cache_mock = CacheMock() 2221 load_and_cache('client_secrets.json', 'some_secrets', cache_mock) 2222 2223 credentials = client.credentials_from_clientsecrets_and_code( 2224 'some_secrets', self.scope, 2225 self.code, http=http, cache=cache_mock) 2226 self.assertEqual(credentials.access_token, 'asdfghjkl') 2227 self.assertEqual(set(['foo']), credentials.scopes) 2228 2229 def test_exchange_code_and_file_for_token_fail(self): 2230 http = HttpMockSequence([ 2231 ({'status': '400'}, b'{"error":"invalid_request"}'), 2232 ]) 2233 2234 with self.assertRaises(client.FlowExchangeError): 2235 client.credentials_from_clientsecrets_and_code( 2236 datafile('client_secrets.json'), self.scope, 2237 self.code, http=http) 2238 2239 2240class Test__save_private_file(unittest2.TestCase): 2241 2242 def _save_helper(self, filename): 2243 contents = [] 2244 contents_str = '[]' 2245 client._save_private_file(filename, contents) 2246 with open(filename, 'r') as f: 2247 stored_contents = f.read() 2248 self.assertEqual(stored_contents, contents_str) 2249 2250 stat_mode = os.stat(filename).st_mode 2251 # Octal 777, only last 3 positions matter for permissions mask. 2252 stat_mode &= 0o777 2253 self.assertEqual(stat_mode, 0o600) 2254 2255 def test_new(self): 2256 filename = tempfile.mktemp() 2257 self.assertFalse(os.path.exists(filename)) 2258 self._save_helper(filename) 2259 2260 def test_existing(self): 2261 filename = tempfile.mktemp() 2262 with open(filename, 'w') as f: 2263 f.write('a bunch of nonsense longer than []') 2264 self.assertTrue(os.path.exists(filename)) 2265 self._save_helper(filename) 2266 2267 2268class Test__get_application_default_credential_GAE(unittest2.TestCase): 2269 2270 @mock.patch.dict('sys.modules', { 2271 'oauth2client.contrib.appengine': mock.Mock()}) 2272 def test_it(self): 2273 gae_mod = sys.modules['oauth2client.contrib.appengine'] 2274 gae_mod.AppAssertionCredentials = creds_kls = mock.Mock() 2275 creds_kls.return_value = object() 2276 credentials = client._get_application_default_credential_GAE() 2277 self.assertEqual(credentials, creds_kls.return_value) 2278 creds_kls.assert_called_once_with([]) 2279 2280 2281class Test__get_application_default_credential_GCE(unittest2.TestCase): 2282 2283 @mock.patch.dict('sys.modules', { 2284 'oauth2client.contrib.gce': mock.Mock()}) 2285 def test_it(self): 2286 gce_mod = sys.modules['oauth2client.contrib.gce'] 2287 gce_mod.AppAssertionCredentials = creds_kls = mock.Mock() 2288 creds_kls.return_value = object() 2289 credentials = client._get_application_default_credential_GCE() 2290 self.assertEqual(credentials, creds_kls.return_value) 2291 creds_kls.assert_called_once_with() 2292 2293 2294class Test__require_crypto_or_die(unittest2.TestCase): 2295 2296 @mock.patch.object(client, 'HAS_CRYPTO', new=True) 2297 def test_with_crypto(self): 2298 self.assertIsNone(client._require_crypto_or_die()) 2299 2300 @mock.patch.object(client, 'HAS_CRYPTO', new=False) 2301 def test_without_crypto(self): 2302 with self.assertRaises(client.CryptoUnavailableError): 2303 client._require_crypto_or_die() 2304 2305 2306class TestDeviceFlowInfo(unittest2.TestCase): 2307 2308 DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4' 2309 USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792' 2310 VER_URL = 'http://foo.bar' 2311 2312 def test_FromResponse(self): 2313 response = { 2314 'device_code': self.DEVICE_CODE, 2315 'user_code': self.USER_CODE, 2316 'verification_url': self.VER_URL, 2317 } 2318 result = client.DeviceFlowInfo.FromResponse(response) 2319 expected_result = client.DeviceFlowInfo( 2320 self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None) 2321 self.assertEqual(result, expected_result) 2322 2323 def test_FromResponse_fallback_to_uri(self): 2324 response = { 2325 'device_code': self.DEVICE_CODE, 2326 'user_code': self.USER_CODE, 2327 'verification_uri': self.VER_URL, 2328 } 2329 result = client.DeviceFlowInfo.FromResponse(response) 2330 expected_result = client.DeviceFlowInfo( 2331 self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None) 2332 self.assertEqual(result, expected_result) 2333 2334 def test_FromResponse_missing_url(self): 2335 response = { 2336 'device_code': self.DEVICE_CODE, 2337 'user_code': self.USER_CODE, 2338 } 2339 with self.assertRaises(client.OAuth2DeviceCodeError): 2340 client.DeviceFlowInfo.FromResponse(response) 2341 2342 @mock.patch('oauth2client.client._UTCNOW') 2343 def test_FromResponse_with_expires_in(self, utcnow): 2344 expires_in = 23 2345 response = { 2346 'device_code': self.DEVICE_CODE, 2347 'user_code': self.USER_CODE, 2348 'verification_url': self.VER_URL, 2349 'expires_in': expires_in, 2350 } 2351 now = datetime.datetime(1999, 1, 1, 12, 30, 27) 2352 expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in) 2353 utcnow.return_value = now 2354 2355 result = client.DeviceFlowInfo.FromResponse(response) 2356 expected_result = client.DeviceFlowInfo( 2357 self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, expire) 2358 self.assertEqual(result, expected_result) 2359