1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit tests for oauth2client.contrib.gce."""
16
17import datetime
18import json
19
20import httplib2
21import mock
22from six.moves import http_client
23from tests.contrib.test_metadata import request_mock
24import unittest2
25
26from oauth2client import client
27from oauth2client.contrib import gce
28
29__author__ = 'jcgregorio@google.com (Joe Gregorio)'
30
31SERVICE_ACCOUNT_INFO = {
32    'scopes': ['a', 'b'],
33    'email': 'a@example.com',
34    'aliases': ['default']
35}
36
37
38class AppAssertionCredentialsTests(unittest2.TestCase):
39
40    def test_constructor(self):
41        credentials = gce.AppAssertionCredentials()
42        self.assertIsNone(credentials.assertion_type, None)
43        self.assertIsNone(credentials.service_account_email)
44        self.assertIsNone(credentials.scopes)
45        self.assertTrue(credentials.invalid)
46
47    @mock.patch('warnings.warn')
48    def test_constructor_with_scopes(self, warn_mock):
49        scope = 'http://example.com/a http://example.com/b'
50        scopes = scope.split()
51        credentials = gce.AppAssertionCredentials(scopes=scopes)
52        self.assertEqual(credentials.scopes, None)
53        self.assertEqual(credentials.assertion_type, None)
54        warn_mock.assert_called_once_with(gce._SCOPES_WARNING)
55
56    def test_to_json(self):
57        credentials = gce.AppAssertionCredentials()
58        with self.assertRaises(NotImplementedError):
59            credentials.to_json()
60
61    def test_from_json(self):
62        with self.assertRaises(NotImplementedError):
63            gce.AppAssertionCredentials.from_json({})
64
65    @mock.patch('oauth2client.contrib._metadata.get_token',
66                side_effect=[('A', datetime.datetime.min),
67                             ('B', datetime.datetime.max)])
68    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
69                return_value=SERVICE_ACCOUNT_INFO)
70    def test_refresh_token(self, get_info, get_token):
71        http_request = mock.MagicMock()
72        http_mock = mock.MagicMock(request=http_request)
73        credentials = gce.AppAssertionCredentials()
74        credentials.invalid = False
75        credentials.service_account_email = 'a@example.com'
76        self.assertIsNone(credentials.access_token)
77        credentials.get_access_token(http=http_mock)
78        self.assertEqual(credentials.access_token, 'A')
79        self.assertTrue(credentials.access_token_expired)
80        get_token.assert_called_with(http_request,
81                                     service_account='a@example.com')
82        credentials.get_access_token(http=http_mock)
83        self.assertEqual(credentials.access_token, 'B')
84        self.assertFalse(credentials.access_token_expired)
85        get_token.assert_called_with(http_request,
86                                     service_account='a@example.com')
87        get_info.assert_not_called()
88
89    def test_refresh_token_failed_fetch(self):
90        http_request = request_mock(
91            http_client.NOT_FOUND,
92            'application/json',
93            json.dumps({'access_token': 'a', 'expires_in': 100})
94        )
95        credentials = gce.AppAssertionCredentials()
96        credentials.invalid = False
97        credentials.service_account_email = 'a@example.com'
98        with self.assertRaises(client.HttpAccessTokenRefreshError):
99            credentials._refresh(http_request)
100
101    def test_serialization_data(self):
102        credentials = gce.AppAssertionCredentials()
103        with self.assertRaises(NotImplementedError):
104            getattr(credentials, 'serialization_data')
105
106    def test_create_scoped_required(self):
107        credentials = gce.AppAssertionCredentials()
108        self.assertFalse(credentials.create_scoped_required())
109
110    def test_sign_blob_not_implemented(self):
111        credentials = gce.AppAssertionCredentials([])
112        with self.assertRaises(NotImplementedError):
113            credentials.sign_blob(b'blob')
114
115    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
116                return_value=SERVICE_ACCOUNT_INFO)
117    def test_retrieve_scopes(self, metadata):
118        http_request = mock.MagicMock()
119        http_mock = mock.MagicMock(request=http_request)
120        credentials = gce.AppAssertionCredentials()
121        self.assertTrue(credentials.invalid)
122        self.assertIsNone(credentials.scopes)
123        scopes = credentials.retrieve_scopes(http_mock)
124        self.assertEqual(scopes, SERVICE_ACCOUNT_INFO['scopes'])
125        self.assertFalse(credentials.invalid)
126        credentials.retrieve_scopes(http_mock)
127        # Assert scopes weren't refetched
128        metadata.assert_called_once_with(http_request,
129                                         service_account='default')
130
131    @mock.patch('oauth2client.contrib._metadata.get_service_account_info',
132                side_effect=httplib2.HttpLib2Error('No Such Email'))
133    def test_retrieve_scopes_bad_email(self, metadata):
134        http_request = mock.MagicMock()
135        http_mock = mock.MagicMock(request=http_request)
136        credentials = gce.AppAssertionCredentials(email='b@example.com')
137        with self.assertRaises(httplib2.HttpLib2Error):
138            credentials.retrieve_scopes(http_mock)
139
140        metadata.assert_called_once_with(http_request,
141                                         service_account='b@example.com')
142
143    def test_save_to_well_known_file(self):
144        import os
145        ORIGINAL_ISDIR = os.path.isdir
146        try:
147            os.path.isdir = lambda path: True
148            credentials = gce.AppAssertionCredentials()
149            with self.assertRaises(NotImplementedError):
150                client.save_to_well_known_file(credentials)
151        finally:
152            os.path.isdir = ORIGINAL_ISDIR
153