# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for oauth2client.contrib.gce."""

import datetime
import json

import httplib2
import mock
from six.moves import http_client
from tests.contrib.test_metadata import request_mock
import unittest2

from oauth2client import client
from oauth2client.contrib import gce

__author__ = 'jcgregorio@google.com (Joe Gregorio)'

# Canned payload returned by the patched
# ``_metadata.get_service_account_info`` in the tests below.
SERVICE_ACCOUNT_INFO = {
    'scopes': ['a', 'b'],
    'email': 'a@example.com',
    'aliases': ['default']
}


class AppAssertionCredentialsTests(unittest2.TestCase):
    """Tests for ``gce.AppAssertionCredentials``."""

    def test_constructor(self):
        """A default-constructed credential has no state and is invalid."""
        credentials = gce.AppAssertionCredentials()
        self.assertIsNone(credentials.assertion_type)
        self.assertIsNone(credentials.service_account_email)
        self.assertIsNone(credentials.scopes)
        self.assertTrue(credentials.invalid)

    @mock.patch('warnings.warn')
    def test_constructor_with_scopes(self, warn_mock):
        """Passing ``scopes`` warns once and the scopes are not stored."""
        scope = 'http://example.com/a http://example.com/b'
        scopes = scope.split()
        credentials = gce.AppAssertionCredentials(scopes=scopes)
        self.assertIsNone(credentials.scopes)
        self.assertIsNone(credentials.assertion_type)
        warn_mock.assert_called_once_with(gce._SCOPES_WARNING)

    def test_to_json(self):
        """``to_json`` raises NotImplementedError."""
        credentials = gce.AppAssertionCredentials()
        with self.assertRaises(NotImplementedError):
            credentials.to_json()

    def test_from_json(self):
        """``from_json`` raises NotImplementedError."""
        with self.assertRaises(NotImplementedError):
            gce.AppAssertionCredentials.from_json({})

    @mock.patch('oauth2client.contrib._metadata.get_token',
                side_effect=[('A', datetime.datetime.min),
                             ('B', datetime.datetime.max)])
    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
                return_value=SERVICE_ACCOUNT_INFO)
    def test_refresh_token(self, get_info, get_token):
        """``get_access_token`` fetches a token and replaces an expired one.

        The patched ``get_token`` first yields token 'A' with the minimum
        datetime as expiry (already expired), then token 'B' with the
        maximum datetime (not expired). The service account info endpoint
        must not be consulted in either call.
        """
        http_request = mock.MagicMock()
        http_mock = mock.MagicMock(request=http_request)
        credentials = gce.AppAssertionCredentials()
        credentials.invalid = False
        credentials.service_account_email = 'a@example.com'
        self.assertIsNone(credentials.access_token)

        # First call: no token yet, so one is fetched ('A', expired).
        credentials.get_access_token(http=http_mock)
        self.assertEqual(credentials.access_token, 'A')
        self.assertTrue(credentials.access_token_expired)
        get_token.assert_called_with(http_request,
                                     service_account='a@example.com')

        # Second call: 'A' is expired, so 'B' is fetched in its place.
        credentials.get_access_token(http=http_mock)
        self.assertEqual(credentials.access_token, 'B')
        self.assertFalse(credentials.access_token_expired)
        get_token.assert_called_with(http_request,
                                     service_account='a@example.com')

        get_info.assert_not_called()

    def test_refresh_token_failed_fetch(self):
        """A NOT_FOUND response makes ``_refresh`` raise a refresh error."""
        http_request = request_mock(
            http_client.NOT_FOUND,
            'application/json',
            json.dumps({'access_token': 'a', 'expires_in': 100})
        )
        credentials = gce.AppAssertionCredentials()
        credentials.invalid = False
        credentials.service_account_email = 'a@example.com'
        with self.assertRaises(client.HttpAccessTokenRefreshError):
            credentials._refresh(http_request)

    def test_serialization_data(self):
        """Reading ``serialization_data`` raises NotImplementedError."""
        credentials = gce.AppAssertionCredentials()
        with self.assertRaises(NotImplementedError):
            getattr(credentials, 'serialization_data')

    def test_create_scoped_required(self):
        """``create_scoped_required`` reports False."""
        credentials = gce.AppAssertionCredentials()
        self.assertFalse(credentials.create_scoped_required())

    def test_sign_blob_not_implemented(self):
        """``sign_blob`` raises NotImplementedError."""
        credentials = gce.AppAssertionCredentials([])
        with self.assertRaises(NotImplementedError):
            credentials.sign_blob(b'blob')

    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
                return_value=SERVICE_ACCOUNT_INFO)
    def test_retrieve_scopes(self, metadata):
        """``retrieve_scopes`` returns the metadata scopes and caches them.

        After the first call the credential is no longer invalid, and a
        second call must not hit the metadata helper again.
        """
        http_request = mock.MagicMock()
        http_mock = mock.MagicMock(request=http_request)
        credentials = gce.AppAssertionCredentials()
        self.assertTrue(credentials.invalid)
        self.assertIsNone(credentials.scopes)
        scopes = credentials.retrieve_scopes(http_mock)
        self.assertEqual(scopes, SERVICE_ACCOUNT_INFO['scopes'])
        self.assertFalse(credentials.invalid)
        credentials.retrieve_scopes(http_mock)
        # Assert scopes weren't refetched
        metadata.assert_called_once_with(http_request,
                                         service_account='default')

    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
                side_effect=httplib2.HttpLib2Error('No Such Email'))
    def test_retrieve_scopes_bad_email(self, metadata):
        """An error from the metadata helper propagates to the caller."""
        http_request = mock.MagicMock()
        http_mock = mock.MagicMock(request=http_request)
        credentials = gce.AppAssertionCredentials(email='b@example.com')
        with self.assertRaises(httplib2.HttpLib2Error):
            credentials.retrieve_scopes(http_mock)

        metadata.assert_called_once_with(http_request,
                                         service_account='b@example.com')

    def test_save_to_well_known_file(self):
        """Saving GCE credentials to the well-known file is unsupported.

        ``os.path.isdir`` is patched to always return True, presumably so
        the outcome does not depend on which directories exist on the
        machine running the test. The patch is undone automatically when
        the ``with`` block exits, even if the assertion fails.
        """
        with mock.patch('os.path.isdir', return_value=True):
            credentials = gce.AppAssertionCredentials()
            with self.assertRaises(NotImplementedError):
                client.save_to_well_known_file(credentials)