1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Oauth2client tests
16
17Unit tests for oauth2client.
18"""
19
20import base64
21import contextlib
22import copy
23import datetime
24import json
25import os
26import socket
27import sys
28import tempfile
29
30import httplib2
31import mock
32import six
33from six.moves import http_client
34from six.moves import urllib
35import unittest2
36
37import oauth2client
38from oauth2client import _helpers
39from oauth2client import client
40from oauth2client import clientsecrets
41from oauth2client import service_account
42from oauth2client import util
43from .http_mock import CacheMock
44from .http_mock import HttpMock
45from .http_mock import HttpMockSequence
46
47__author__ = 'jcgregorio@google.com (Joe Gregorio)'
48
49DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
50
51
52# TODO(craigcitro): This is duplicated from
53# googleapiclient.test_discovery; consolidate these definitions.
54def assertUrisEqual(testcase, expected, actual):
55    """Test that URIs are the same, up to reordering of query parameters."""
56    expected = urllib.parse.urlparse(expected)
57    actual = urllib.parse.urlparse(actual)
58    testcase.assertEqual(expected.scheme, actual.scheme)
59    testcase.assertEqual(expected.netloc, actual.netloc)
60    testcase.assertEqual(expected.path, actual.path)
61    testcase.assertEqual(expected.params, actual.params)
62    testcase.assertEqual(expected.fragment, actual.fragment)
63    expected_query = urllib.parse.parse_qs(expected.query)
64    actual_query = urllib.parse.parse_qs(actual.query)
65    for name in expected_query.keys():
66        testcase.assertEqual(expected_query[name], actual_query[name])
67    for name in actual_query.keys():
68        testcase.assertEqual(expected_query[name], actual_query[name])
69
70
71def datafile(filename):
72    return os.path.join(DATA_DIR, filename)
73
74
75def load_and_cache(existing_file, fakename, cache_mock):
76    client_type, client_info = clientsecrets._loadfile(datafile(existing_file))
77    cache_mock.cache[fakename] = {client_type: client_info}
78
79
80class CredentialsTests(unittest2.TestCase):
81
82    def test_to_from_json(self):
83        credentials = client.Credentials()
84        json = credentials.to_json()
85        client.Credentials.new_from_json(json)
86
87    def test_authorize_abstract(self):
88        credentials = client.Credentials()
89        http = object()
90        with self.assertRaises(NotImplementedError):
91            credentials.authorize(http)
92
93    def test_refresh_abstract(self):
94        credentials = client.Credentials()
95        http = object()
96        with self.assertRaises(NotImplementedError):
97            credentials.refresh(http)
98
99    def test_revoke_abstract(self):
100        credentials = client.Credentials()
101        http = object()
102        with self.assertRaises(NotImplementedError):
103            credentials.revoke(http)
104
105    def test_apply_abstract(self):
106        credentials = client.Credentials()
107        headers = {}
108        with self.assertRaises(NotImplementedError):
109            credentials.apply(headers)
110
111    def test__to_json_basic(self):
112        credentials = client.Credentials()
113        json_payload = credentials._to_json([])
114        # str(bytes) in Python2 and str(unicode) in Python3
115        self.assertIsInstance(json_payload, str)
116        payload = json.loads(json_payload)
117        expected_payload = {
118            '_class': client.Credentials.__name__,
119            '_module': client.Credentials.__module__,
120            'token_expiry': None,
121        }
122        self.assertEqual(payload, expected_payload)
123
124    def test__to_json_with_strip(self):
125        credentials = client.Credentials()
126        credentials.foo = 'bar'
127        credentials.baz = 'quux'
128        to_strip = ['foo']
129        json_payload = credentials._to_json(to_strip)
130        # str(bytes) in Python2 and str(unicode) in Python3
131        self.assertIsInstance(json_payload, str)
132        payload = json.loads(json_payload)
133        expected_payload = {
134            '_class': client.Credentials.__name__,
135            '_module': client.Credentials.__module__,
136            'token_expiry': None,
137            'baz': credentials.baz,
138        }
139        self.assertEqual(payload, expected_payload)
140
141    def test__to_json_to_serialize(self):
142        credentials = client.Credentials()
143        to_serialize = {
144            'foo': b'bar',
145            'baz': u'quux',
146            'st': set(['a', 'b']),
147        }
148        orig_vals = to_serialize.copy()
149        json_payload = credentials._to_json([], to_serialize=to_serialize)
150        # str(bytes) in Python2 and str(unicode) in Python3
151        self.assertIsInstance(json_payload, str)
152        payload = json.loads(json_payload)
153        expected_payload = {
154            '_class': client.Credentials.__name__,
155            '_module': client.Credentials.__module__,
156            'token_expiry': None,
157        }
158        expected_payload.update(to_serialize)
159        # Special-case the set.
160        expected_payload['st'] = list(expected_payload['st'])
161        # Special-case the bytes.
162        expected_payload['foo'] = u'bar'
163        self.assertEqual(payload, expected_payload)
164        # Make sure the method call didn't modify our dictionary.
165        self.assertEqual(to_serialize, orig_vals)
166
167    @mock.patch.object(client.Credentials, '_to_json',
168                       return_value=object())
169    def test_to_json(self, to_json):
170        credentials = client.Credentials()
171        self.assertEqual(credentials.to_json(), to_json.return_value)
172        to_json.assert_called_once_with(
173            client.Credentials.NON_SERIALIZED_MEMBERS)
174
175    def test_new_from_json_no_data(self):
176        creds_data = {}
177        json_data = json.dumps(creds_data)
178        with self.assertRaises(KeyError):
179            client.Credentials.new_from_json(json_data)
180
181    def test_new_from_json_basic_data(self):
182        creds_data = {
183            '_module': 'oauth2client.client',
184            '_class': 'Credentials',
185        }
186        json_data = json.dumps(creds_data)
187        credentials = client.Credentials.new_from_json(json_data)
188        self.assertIsInstance(credentials, client.Credentials)
189
190    def test_new_from_json_old_name(self):
191        creds_data = {
192            '_module': 'oauth2client.googleapiclient.client',
193            '_class': 'Credentials',
194        }
195        json_data = json.dumps(creds_data)
196        credentials = client.Credentials.new_from_json(json_data)
197        self.assertIsInstance(credentials, client.Credentials)
198
199    def test_new_from_json_bad_module(self):
200        creds_data = {
201            '_module': 'oauth2client.foobar',
202            '_class': 'Credentials',
203        }
204        json_data = json.dumps(creds_data)
205        with self.assertRaises(ImportError):
206            client.Credentials.new_from_json(json_data)
207
208    def test_new_from_json_bad_class(self):
209        creds_data = {
210            '_module': 'oauth2client.client',
211            '_class': 'NopeNotCredentials',
212        }
213        json_data = json.dumps(creds_data)
214        with self.assertRaises(AttributeError):
215            client.Credentials.new_from_json(json_data)
216
217    def test_from_json(self):
218        unused_data = {}
219        credentials = client.Credentials.from_json(unused_data)
220        self.assertIsInstance(credentials, client.Credentials)
221        self.assertEqual(credentials.__dict__, {})
222
223
224class TestStorage(unittest2.TestCase):
225
226    def test_locked_get_abstract(self):
227        storage = client.Storage()
228        with self.assertRaises(NotImplementedError):
229            storage.locked_get()
230
231    def test_locked_put_abstract(self):
232        storage = client.Storage()
233        credentials = object()
234        with self.assertRaises(NotImplementedError):
235            storage.locked_put(credentials)
236
237    def test_locked_delete_abstract(self):
238        storage = client.Storage()
239        with self.assertRaises(NotImplementedError):
240            storage.locked_delete()
241
242
243@contextlib.contextmanager
244def mock_module_import(module):
245    """Place a dummy objects in sys.modules to mock an import test."""
246    parts = module.split('.')
247    entries = ['.'.join(parts[:i + 1]) for i in range(len(parts))]
248    for entry in entries:
249        sys.modules[entry] = object()
250
251    try:
252        yield
253
254    finally:
255        for entry in entries:
256            del sys.modules[entry]
257
258
259class GoogleCredentialsTests(unittest2.TestCase):
260
261    def setUp(self):
262        self.os_name = os.name
263        client.SETTINGS.env_name = None
264
265    def tearDown(self):
266        self.reset_env('SERVER_SOFTWARE')
267        self.reset_env(client.GOOGLE_APPLICATION_CREDENTIALS)
268        self.reset_env('APPDATA')
269        os.name = self.os_name
270
271    def reset_env(self, env):
272        """Set the environment variable 'env' to 'value'."""
273        os.environ.pop(env, None)
274
275    def validate_service_account_credentials(self, credentials):
276        self.assertIsInstance(
277            credentials, service_account.ServiceAccountCredentials)
278        self.assertEqual('123', credentials.client_id)
279        self.assertEqual('dummy@google.com',
280                         credentials._service_account_email)
281        self.assertEqual('ABCDEF', credentials._private_key_id)
282        self.assertEqual('', credentials._scopes)
283
284    def validate_google_credentials(self, credentials):
285        self.assertIsInstance(credentials, client.GoogleCredentials)
286        self.assertEqual(None, credentials.access_token)
287        self.assertEqual('123', credentials.client_id)
288        self.assertEqual('secret', credentials.client_secret)
289        self.assertEqual('alabalaportocala', credentials.refresh_token)
290        self.assertEqual(None, credentials.token_expiry)
291        self.assertEqual(oauth2client.GOOGLE_TOKEN_URI, credentials.token_uri)
292        self.assertEqual('Python client library', credentials.user_agent)
293
294    def get_a_google_credentials_object(self):
295        return client.GoogleCredentials(None, None, None, None,
296                                        None, None, None, None)
297
298    def test_create_scoped_required(self):
299        self.assertFalse(
300            self.get_a_google_credentials_object().create_scoped_required())
301
302    def test_create_scoped(self):
303        credentials = self.get_a_google_credentials_object()
304        self.assertEqual(credentials, credentials.create_scoped(None))
305        self.assertEqual(credentials,
306                         credentials.create_scoped(['dummy_scope']))
307
308    @mock.patch.object(client.GoogleCredentials,
309                       '_implicit_credentials_from_files',
310                       return_value=None)
311    @mock.patch.object(client.GoogleCredentials,
312                       '_implicit_credentials_from_gce')
313    @mock.patch.object(client, '_in_gae_environment',
314                       return_value=True)
315    @mock.patch.object(client, '_get_application_default_credential_GAE',
316                       return_value=object())
317    def test_get_application_default_in_gae(self, gae_adc, in_gae,
318                                            from_gce, from_files):
319        credentials = client.GoogleCredentials.get_application_default()
320        self.assertEqual(credentials, gae_adc.return_value)
321        from_files.assert_called_once_with()
322        in_gae.assert_called_once_with()
323        from_gce.assert_not_called()
324
325    @mock.patch.object(client.GoogleCredentials,
326                       '_implicit_credentials_from_gae',
327                       return_value=None)
328    @mock.patch.object(client.GoogleCredentials,
329                       '_implicit_credentials_from_files',
330                       return_value=None)
331    @mock.patch.object(client, '_in_gce_environment',
332                       return_value=True)
333    @mock.patch.object(client, '_get_application_default_credential_GCE',
334                       return_value=object())
335    def test_get_application_default_in_gce(self, gce_adc, in_gce,
336                                            from_files, from_gae):
337        credentials = client.GoogleCredentials.get_application_default()
338        self.assertEqual(credentials, gce_adc.return_value)
339        in_gce.assert_called_once_with()
340        from_gae.assert_called_once_with()
341        from_files.assert_called_once_with()
342
343    def test_environment_check_gae_production(self):
344        with mock_module_import('google.appengine'):
345            self._environment_check_gce_helper(
346                server_software='Google App Engine/XYZ')
347
348    def test_environment_check_gae_local(self):
349        with mock_module_import('google.appengine'):
350            self._environment_check_gce_helper(
351                server_software='Development/XYZ')
352
353    def test_environment_check_fastpath(self):
354        with mock_module_import('google.appengine'):
355            self._environment_check_gce_helper(
356                server_software='Development/XYZ')
357
358    def test_environment_caching(self):
359        os.environ['SERVER_SOFTWARE'] = 'Development/XYZ'
360        with mock_module_import('google.appengine'):
361            self.assertTrue(client._in_gae_environment())
362            os.environ['SERVER_SOFTWARE'] = ''
363            # Even though we no longer pass the environment check, it
364            # is cached.
365            self.assertTrue(client._in_gae_environment())
366
367    def _environment_check_gce_helper(self, status_ok=True, socket_error=False,
368                                      server_software=''):
369        response = mock.MagicMock()
370        if status_ok:
371            response.status = http_client.OK
372            response.getheader = mock.MagicMock(
373                name='getheader',
374                return_value=client._DESIRED_METADATA_FLAVOR)
375        else:
376            response.status = http_client.NOT_FOUND
377
378        connection = mock.MagicMock()
379        connection.getresponse = mock.MagicMock(name='getresponse',
380                                                return_value=response)
381        if socket_error:
382            connection.getresponse.side_effect = socket.error()
383
384        with mock.patch('oauth2client.client.os') as os_module:
385            os_module.environ = {client._SERVER_SOFTWARE: server_software}
386            with mock.patch('oauth2client.client.six') as six_module:
387                http_client_module = six_module.moves.http_client
388                http_client_module.HTTPConnection = mock.MagicMock(
389                    name='HTTPConnection', return_value=connection)
390
391                if server_software == '':
392                    self.assertFalse(client._in_gae_environment())
393                else:
394                    self.assertTrue(client._in_gae_environment())
395
396                if status_ok and not socket_error and server_software == '':
397                    self.assertTrue(client._in_gce_environment())
398                else:
399                    self.assertFalse(client._in_gce_environment())
400
401                if server_software == '':
402                    http_client_module.HTTPConnection.assert_called_once_with(
403                        client._GCE_METADATA_HOST,
404                        timeout=client.GCE_METADATA_TIMEOUT)
405                    connection.getresponse.assert_called_once_with()
406                    # Remaining calls are not "getresponse"
407                    headers = {
408                        client._METADATA_FLAVOR_HEADER: (
409                            client._DESIRED_METADATA_FLAVOR),
410                    }
411                    self.assertEqual(connection.method_calls, [
412                        mock.call.request('GET', '/',
413                                          headers=headers),
414                        mock.call.close(),
415                    ])
416                    self.assertEqual(response.method_calls, [])
417                    if status_ok and not socket_error:
418                        response.getheader.assert_called_once_with(
419                            client._METADATA_FLAVOR_HEADER)
420                else:
421                    self.assertEqual(
422                        http_client_module.HTTPConnection.mock_calls, [])
423                    self.assertEqual(connection.getresponse.mock_calls, [])
424                    # Remaining calls are not "getresponse"
425                    self.assertEqual(connection.method_calls, [])
426                    self.assertEqual(response.method_calls, [])
427                    self.assertEqual(response.getheader.mock_calls, [])
428
429    def test_environment_check_gce_production(self):
430        self._environment_check_gce_helper(status_ok=True)
431
432    def test_environment_check_gce_prod_with_working_gae_imports(self):
433        with mock_module_import('google.appengine'):
434            self._environment_check_gce_helper(status_ok=True)
435
436    def test_environment_check_gce_timeout(self):
437        self._environment_check_gce_helper(socket_error=True)
438
439    def test_environ_check_gae_module_unknown(self):
440        with mock_module_import('google.appengine'):
441            self._environment_check_gce_helper(status_ok=False)
442
443    def test_environment_check_unknown(self):
444        self._environment_check_gce_helper(status_ok=False)
445
446    def test_get_environment_variable_file(self):
447        environment_variable_file = datafile(
448            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
449        os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = (
450            environment_variable_file)
451        self.assertEqual(environment_variable_file,
452                         client._get_environment_variable_file())
453
454    def test_get_environment_variable_file_error(self):
455        nonexistent_file = datafile('nonexistent')
456        os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file
457        expected_err_msg = (
458            'File {0} \(pointed by {1} environment variable\) does not '
459            'exist!'.format(
460                nonexistent_file, client.GOOGLE_APPLICATION_CREDENTIALS))
461        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
462                                     expected_err_msg):
463            client._get_environment_variable_file()
464
465    @mock.patch.dict(os.environ, {}, clear=True)
466    def test_get_environment_variable_file_without_env_var(self):
467        self.assertIsNone(client._get_environment_variable_file())
468
469    @mock.patch('os.name', new='nt')
470    @mock.patch.dict(os.environ, {'APPDATA': DATA_DIR}, clear=True)
471    def test_get_well_known_file_on_windows(self):
472        well_known_file = datafile(
473            os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY,
474                         client._WELL_KNOWN_CREDENTIALS_FILE))
475        self.assertEqual(well_known_file, client._get_well_known_file())
476
477    @mock.patch('os.name', new='nt')
478    @mock.patch.dict(os.environ, {'SystemDrive': 'G:'}, clear=True)
479    def test_get_well_known_file_on_windows_without_appdata(self):
480        well_known_file = os.path.join('G:', '\\',
481                                       client._CLOUDSDK_CONFIG_DIRECTORY,
482                                       client._WELL_KNOWN_CREDENTIALS_FILE)
483        self.assertEqual(well_known_file, client._get_well_known_file())
484
485    @mock.patch.dict(os.environ,
486                     {client._CLOUDSDK_CONFIG_ENV_VAR: 'CUSTOM_DIR'},
487                     clear=True)
488    def test_get_well_known_file_with_custom_config_dir(self):
489        CUSTOM_DIR = os.environ[client._CLOUDSDK_CONFIG_ENV_VAR]
490        EXPECTED_FILE = os.path.join(CUSTOM_DIR,
491                                     client._WELL_KNOWN_CREDENTIALS_FILE)
492        well_known_file = client._get_well_known_file()
493        self.assertEqual(well_known_file, EXPECTED_FILE)
494
495    def test_get_adc_from_file_service_account(self):
496        credentials_file = datafile(
497            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
498        credentials = client._get_application_default_credential_from_file(
499            credentials_file)
500        self.validate_service_account_credentials(credentials)
501
502    def test_save_to_well_known_file_service_account(self):
503        credential_file = datafile(
504            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
505        credentials = client._get_application_default_credential_from_file(
506            credential_file)
507        temp_credential_file = datafile(
508            os.path.join('gcloud',
509                         'temp_well_known_file_service_account.json'))
510        client.save_to_well_known_file(credentials, temp_credential_file)
511        with open(temp_credential_file) as f:
512            d = json.load(f)
513        self.assertEqual('service_account', d['type'])
514        self.assertEqual('123', d['client_id'])
515        self.assertEqual('dummy@google.com', d['client_email'])
516        self.assertEqual('ABCDEF', d['private_key_id'])
517        os.remove(temp_credential_file)
518
519    @mock.patch('os.path.isdir', return_value=False)
520    def test_save_well_known_file_with_non_existent_config_dir(self,
521                                                               isdir_mock):
522        credential_file = datafile(
523            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
524        credentials = client._get_application_default_credential_from_file(
525            credential_file)
526        with self.assertRaises(OSError):
527            client.save_to_well_known_file(credentials)
528        config_dir = os.path.join(os.path.expanduser('~'), '.config', 'gcloud')
529        isdir_mock.assert_called_once_with(config_dir)
530
531    def test_get_adc_from_file_authorized_user(self):
532        credentials_file = datafile(os.path.join(
533            'gcloud',
534            'application_default_credentials_authorized_user.json'))
535        credentials = client._get_application_default_credential_from_file(
536            credentials_file)
537        self.validate_google_credentials(credentials)
538
539    def test_save_to_well_known_file_authorized_user(self):
540        credentials_file = datafile(os.path.join(
541            'gcloud',
542            'application_default_credentials_authorized_user.json'))
543        credentials = client._get_application_default_credential_from_file(
544            credentials_file)
545        temp_credential_file = datafile(
546            os.path.join('gcloud',
547                         'temp_well_known_file_authorized_user.json'))
548        client.save_to_well_known_file(credentials, temp_credential_file)
549        with open(temp_credential_file) as f:
550            d = json.load(f)
551        self.assertEqual('authorized_user', d['type'])
552        self.assertEqual('123', d['client_id'])
553        self.assertEqual('secret', d['client_secret'])
554        self.assertEqual('alabalaportocala', d['refresh_token'])
555        os.remove(temp_credential_file)
556
557    def test_get_application_default_credential_from_malformed_file_1(self):
558        credentials_file = datafile(
559            os.path.join('gcloud',
560                         'application_default_credentials_malformed_1.json'))
561        expected_err_msg = (
562            "'type' field should be defined \(and have one of the '{0}' or "
563            "'{1}' values\)".format(client.AUTHORIZED_USER,
564                                    client.SERVICE_ACCOUNT))
565
566        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
567                                     expected_err_msg):
568            client._get_application_default_credential_from_file(
569                credentials_file)
570
571    def test_get_application_default_credential_from_malformed_file_2(self):
572        credentials_file = datafile(
573            os.path.join('gcloud',
574                         'application_default_credentials_malformed_2.json'))
575        expected_err_msg = (
576            'The following field\(s\) must be defined: private_key_id')
577        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
578                                     expected_err_msg):
579            client._get_application_default_credential_from_file(
580                credentials_file)
581
582    def test_get_application_default_credential_from_malformed_file_3(self):
583        credentials_file = datafile(
584            os.path.join('gcloud',
585                         'application_default_credentials_malformed_3.json'))
586        with self.assertRaises(ValueError):
587            client._get_application_default_credential_from_file(
588                credentials_file)
589
590    def test_raise_exception_for_missing_fields(self):
591        missing_fields = ['first', 'second', 'third']
592        expected_err_msg = ('The following field\(s\) must be defined: ' +
593                            ', '.join(missing_fields))
594        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
595                                     expected_err_msg):
596            client._raise_exception_for_missing_fields(missing_fields)
597
598    def test_raise_exception_for_reading_json(self):
599        credential_file = 'any_file'
600        extra_help = ' be good'
601        error = client.ApplicationDefaultCredentialsError('stuff happens')
602        expected_err_msg = ('An error was encountered while reading '
603                            'json file: ' + credential_file +
604                            extra_help + ': ' + str(error))
605        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
606                                     expected_err_msg):
607            client._raise_exception_for_reading_json(
608                credential_file, extra_help, error)
609
610    @mock.patch('oauth2client.client._in_gce_environment')
611    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
612    @mock.patch('oauth2client.client._get_environment_variable_file')
613    @mock.patch('oauth2client.client._get_well_known_file')
614    def test_get_adc_from_env_var_service_account(self, *stubs):
615        # Set up stubs.
616        get_well_known, get_env_file, in_gae, in_gce = stubs
617        get_env_file.return_value = datafile(
618            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
619
620        credentials = client.GoogleCredentials.get_application_default()
621        self.validate_service_account_credentials(credentials)
622
623        get_env_file.assert_called_once_with()
624        get_well_known.assert_not_called()
625        in_gae.assert_not_called()
626        in_gce.assert_not_called()
627
628    def test_env_name(self):
629        self.assertEqual(None, client.SETTINGS.env_name)
630        self.test_get_adc_from_env_var_service_account()
631        self.assertEqual(client.DEFAULT_ENV_NAME, client.SETTINGS.env_name)
632
633    @mock.patch('oauth2client.client._in_gce_environment')
634    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
635    @mock.patch('oauth2client.client._get_environment_variable_file')
636    @mock.patch('oauth2client.client._get_well_known_file')
637    def test_get_adc_from_env_var_authorized_user(self, *stubs):
638        # Set up stubs.
639        get_well_known, get_env_file, in_gae, in_gce = stubs
640        get_env_file.return_value = datafile(os.path.join(
641            'gcloud',
642            'application_default_credentials_authorized_user.json'))
643
644        credentials = client.GoogleCredentials.get_application_default()
645        self.validate_google_credentials(credentials)
646
647        get_env_file.assert_called_once_with()
648        get_well_known.assert_not_called()
649        in_gae.assert_not_called()
650        in_gce.assert_not_called()
651
652    @mock.patch('oauth2client.client._in_gce_environment')
653    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
654    @mock.patch('oauth2client.client._get_environment_variable_file')
655    @mock.patch('oauth2client.client._get_well_known_file')
656    def test_get_adc_from_env_var_malformed_file(self, *stubs):
657        # Set up stubs.
658        get_well_known, get_env_file, in_gae, in_gce = stubs
659        get_env_file.return_value = datafile(
660            os.path.join('gcloud',
661                         'application_default_credentials_malformed_3.json'))
662
663        expected_err = client.ApplicationDefaultCredentialsError
664        with self.assertRaises(expected_err) as exc_manager:
665            client.GoogleCredentials.get_application_default()
666
667        self.assertTrue(str(exc_manager.exception).startswith(
668            'An error was encountered while reading json file: ' +
669            get_env_file.return_value + ' (pointed to by ' +
670            client.GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):'))
671
672        get_env_file.assert_called_once_with()
673        get_well_known.assert_not_called()
674        in_gae.assert_not_called()
675        in_gce.assert_not_called()
676
677    @mock.patch('oauth2client.client._in_gce_environment', return_value=False)
678    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
679    @mock.patch('oauth2client.client._get_environment_variable_file',
680                return_value=None)
681    @mock.patch('oauth2client.client._get_well_known_file',
682                return_value='BOGUS_FILE')
683    def test_get_adc_env_not_set_up(self, *stubs):
684        # Unpack stubs.
685        get_well_known, get_env_file, in_gae, in_gce = stubs
686        # Make sure the well-known file actually doesn't exist.
687        self.assertFalse(os.path.exists(get_well_known.return_value))
688
689        expected_err = client.ApplicationDefaultCredentialsError
690        with self.assertRaises(expected_err) as exc_manager:
691            client.GoogleCredentials.get_application_default()
692
693        self.assertEqual(client.ADC_HELP_MSG, str(exc_manager.exception))
694        get_env_file.assert_called_once_with()
695        get_well_known.assert_called_once_with()
696        in_gae.assert_called_once_with()
697        in_gce.assert_called_once_with()
698
699    @mock.patch('oauth2client.client._in_gce_environment', return_value=False)
700    @mock.patch('oauth2client.client._in_gae_environment', return_value=False)
701    @mock.patch('oauth2client.client._get_environment_variable_file',
702                return_value=None)
703    @mock.patch('oauth2client.client._get_well_known_file')
704    def test_get_adc_env_from_well_known(self, *stubs):
705        # Unpack stubs.
706        get_well_known, get_env_file, in_gae, in_gce = stubs
707        # Make sure the well-known file is an actual file.
708        get_well_known.return_value = __file__
709        # Make sure the well-known file actually doesn't exist.
710        self.assertTrue(os.path.exists(get_well_known.return_value))
711
712        method_name = \
713            'oauth2client.client._get_application_default_credential_from_file'
714        result_creds = object()
715        with mock.patch(method_name,
716                        return_value=result_creds) as get_from_file:
717            result = client.GoogleCredentials.get_application_default()
718            self.assertEqual(result, result_creds)
719            get_from_file.assert_called_once_with(__file__)
720
721        get_env_file.assert_called_once_with()
722        get_well_known.assert_called_once_with()
723        in_gae.assert_not_called()
724        in_gce.assert_not_called()
725
726    def test_from_stream_service_account(self):
727        credentials_file = datafile(
728            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
729        credentials = self.get_a_google_credentials_object().from_stream(
730            credentials_file)
731        self.validate_service_account_credentials(credentials)
732
733    def test_from_stream_authorized_user(self):
734        credentials_file = datafile(os.path.join(
735            'gcloud',
736            'application_default_credentials_authorized_user.json'))
737        credentials = self.get_a_google_credentials_object().from_stream(
738            credentials_file)
739        self.validate_google_credentials(credentials)
740
741    def test_from_stream_missing_file(self):
742        credentials_filename = None
743        expected_err_msg = (r'The parameter passed to the from_stream\(\) '
744                            r'method should point to a file.')
745        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
746                                     expected_err_msg):
747            self.get_a_google_credentials_object().from_stream(
748                credentials_filename)
749
750    def test_from_stream_malformed_file_1(self):
751        credentials_file = datafile(
752            os.path.join('gcloud',
753                         'application_default_credentials_malformed_1.json'))
754        expected_err_msg = (
755            'An error was encountered while reading json file: ' +
756            credentials_file +
757            ' \(provided as parameter to the from_stream\(\) method\): ' +
758            "'type' field should be defined \(and have one of the '" +
759            client.AUTHORIZED_USER + "' or '" + client.SERVICE_ACCOUNT +
760            "' values\)")
761        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
762                                     expected_err_msg):
763            self.get_a_google_credentials_object().from_stream(
764                credentials_file)
765
766    def test_from_stream_malformed_file_2(self):
767        credentials_file = datafile(
768            os.path.join('gcloud',
769                         'application_default_credentials_malformed_2.json'))
770        expected_err_msg = (
771            'An error was encountered while reading json file: ' +
772            credentials_file +
773            ' \(provided as parameter to the from_stream\(\) method\): '
774            'The following field\(s\) must be defined: '
775            'private_key_id')
776        with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
777                                     expected_err_msg):
778            self.get_a_google_credentials_object().from_stream(
779                credentials_file)
780
781    def test_from_stream_malformed_file_3(self):
782        credentials_file = datafile(
783            os.path.join('gcloud',
784                         'application_default_credentials_malformed_3.json'))
785        with self.assertRaises(client.ApplicationDefaultCredentialsError):
786            self.get_a_google_credentials_object().from_stream(
787                credentials_file)
788
789    def test_to_from_json_authorized_user(self):
790        filename = 'application_default_credentials_authorized_user.json'
791        credentials_file = datafile(os.path.join('gcloud', filename))
792        creds = client.GoogleCredentials.from_stream(credentials_file)
793        json = creds.to_json()
794        creds2 = client.GoogleCredentials.from_json(json)
795
796        self.assertEqual(creds.__dict__, creds2.__dict__)
797
798    def test_to_from_json_service_account(self):
799        credentials_file = datafile(
800            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
801        creds1 = client.GoogleCredentials.from_stream(credentials_file)
802        # Convert to and then back from json.
803        creds2 = client.GoogleCredentials.from_json(creds1.to_json())
804
805        creds1_vals = creds1.__dict__
806        creds1_vals.pop('_signer')
807        creds2_vals = creds2.__dict__
808        creds2_vals.pop('_signer')
809        self.assertEqual(creds1_vals, creds2_vals)
810
811    def test_to_from_json_service_account_scoped(self):
812        credentials_file = datafile(
813            os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
814        creds1 = client.GoogleCredentials.from_stream(credentials_file)
815        creds1 = creds1.create_scoped(['dummy_scope'])
816        # Convert to and then back from json.
817        creds2 = client.GoogleCredentials.from_json(creds1.to_json())
818
819        creds1_vals = creds1.__dict__
820        creds1_vals.pop('_signer')
821        creds2_vals = creds2.__dict__
822        creds2_vals.pop('_signer')
823        self.assertEqual(creds1_vals, creds2_vals)
824
825    def test_parse_expiry(self):
826        dt = datetime.datetime(2016, 1, 1)
827        parsed_expiry = client._parse_expiry(dt)
828        self.assertEqual('2016-01-01T00:00:00Z', parsed_expiry)
829
830    def test_bad_expiry(self):
831        dt = object()
832        parsed_expiry = client._parse_expiry(dt)
833        self.assertEqual(None, parsed_expiry)
834
835
836class DummyDeleteStorage(client.Storage):
837    delete_called = False
838
839    def locked_delete(self):
840        self.delete_called = True
841
842
843def _token_revoke_test_helper(testcase, status, revoke_raise,
844                              valid_bool_value, token_attr):
845    current_store = getattr(testcase.credentials, 'store', None)
846
847    dummy_store = DummyDeleteStorage()
848    testcase.credentials.set_store(dummy_store)
849
850    actual_do_revoke = testcase.credentials._do_revoke
851    testcase.token_from_revoke = None
852
853    def do_revoke_stub(http_request, token):
854        testcase.token_from_revoke = token
855        return actual_do_revoke(http_request, token)
856    testcase.credentials._do_revoke = do_revoke_stub
857
858    http = HttpMock(headers={'status': status})
859    if revoke_raise:
860        testcase.assertRaises(client.TokenRevokeError,
861                              testcase.credentials.revoke, http)
862    else:
863        testcase.credentials.revoke(http)
864
865    testcase.assertEqual(getattr(testcase.credentials, token_attr),
866                         testcase.token_from_revoke)
867    testcase.assertEqual(valid_bool_value, testcase.credentials.invalid)
868    testcase.assertEqual(valid_bool_value, dummy_store.delete_called)
869
870    testcase.credentials.set_store(current_store)
871
872
873class BasicCredentialsTests(unittest2.TestCase):
874
875    def setUp(self):
876        access_token = 'foo'
877        client_id = 'some_client_id'
878        client_secret = 'cOuDdkfjxxnv+'
879        refresh_token = '1/0/a.df219fjls0'
880        token_expiry = datetime.datetime.utcnow()
881        user_agent = 'refresh_checker/1.0'
882        self.credentials = client.OAuth2Credentials(
883            access_token, client_id, client_secret,
884            refresh_token, token_expiry, oauth2client.GOOGLE_TOKEN_URI,
885            user_agent, revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
886            scopes='foo', token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
887
888        # Provoke a failure if @util.positional is not respected.
889        self.old_positional_enforcement = (
890            util.positional_parameters_enforcement)
891        util.positional_parameters_enforcement = (
892            util.POSITIONAL_EXCEPTION)
893
894    def tearDown(self):
895        util.positional_parameters_enforcement = (
896            self.old_positional_enforcement)
897
898    def test_token_refresh_success(self):
899        for status_code in client.REFRESH_STATUS_CODES:
900            token_response = {'access_token': '1/3w', 'expires_in': 3600}
901            http = HttpMockSequence([
902                ({'status': status_code}, b''),
903                ({'status': '200'}, json.dumps(token_response).encode(
904                    'utf-8')),
905                ({'status': '200'}, 'echo_request_headers'),
906            ])
907            http = self.credentials.authorize(http)
908            resp, content = http.request('http://example.com')
909            self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
910            self.assertFalse(self.credentials.access_token_expired)
911            self.assertEqual(token_response, self.credentials.token_response)
912
913    def test_recursive_authorize(self):
914        """Tests that OAuth2Credentials doesn't intro. new method constraints.
915
916        Formerly, OAuth2Credentials.authorize monkeypatched the request method
917        of its httplib2.Http argument with a wrapper annotated with
918        @util.positional(1). Since the original method has no such annotation,
919        that meant that the wrapper was violating the contract of the original
920        method by adding a new requirement to it. And in fact the wrapper
921        itself doesn't even respect that requirement. So before the removal of
922        the annotation, this test would fail.
923        """
924        token_response = {'access_token': '1/3w', 'expires_in': 3600}
925        encoded_response = json.dumps(token_response).encode('utf-8')
926        http = HttpMockSequence([
927            ({'status': '200'}, encoded_response),
928        ])
929        http = self.credentials.authorize(http)
930        http = self.credentials.authorize(http)
931        http.request('http://example.com')
932
933    def test_token_refresh_failure(self):
934        for status_code in client.REFRESH_STATUS_CODES:
935            http = HttpMockSequence([
936                ({'status': status_code}, b''),
937                ({'status': http_client.BAD_REQUEST},
938                 b'{"error":"access_denied"}'),
939            ])
940            http = self.credentials.authorize(http)
941            with self.assertRaises(
942                    client.HttpAccessTokenRefreshError) as exc_manager:
943                http.request('http://example.com')
944            self.assertEqual(http_client.BAD_REQUEST,
945                             exc_manager.exception.status)
946            self.assertTrue(self.credentials.access_token_expired)
947            self.assertEqual(None, self.credentials.token_response)
948
949    def test_token_revoke_success(self):
950        _token_revoke_test_helper(
951            self, '200', revoke_raise=False,
952            valid_bool_value=True, token_attr='refresh_token')
953
954    def test_token_revoke_failure(self):
955        _token_revoke_test_helper(
956            self, '400', revoke_raise=True,
957            valid_bool_value=False, token_attr='refresh_token')
958
959    def test_token_revoke_fallback(self):
960        original_credentials = self.credentials.to_json()
961        self.credentials.refresh_token = None
962        _token_revoke_test_helper(
963            self, '200', revoke_raise=False,
964            valid_bool_value=True, token_attr='access_token')
965        self.credentials = self.credentials.from_json(original_credentials)
966
967    def test_non_401_error_response(self):
968        http = HttpMockSequence([
969            ({'status': '400'}, b''),
970        ])
971        http = self.credentials.authorize(http)
972        resp, content = http.request('http://example.com')
973        self.assertEqual(http_client.BAD_REQUEST, resp.status)
974        self.assertEqual(None, self.credentials.token_response)
975
976    def test_to_from_json(self):
977        json = self.credentials.to_json()
978        instance = client.OAuth2Credentials.from_json(json)
979        self.assertEqual(client.OAuth2Credentials, type(instance))
980        instance.token_expiry = None
981        self.credentials.token_expiry = None
982
983        self.assertEqual(instance.__dict__, self.credentials.__dict__)
984
985    def test_from_json_token_expiry(self):
986        data = json.loads(self.credentials.to_json())
987        data['token_expiry'] = None
988        instance = client.OAuth2Credentials.from_json(json.dumps(data))
989        self.assertIsInstance(instance, client.OAuth2Credentials)
990
991    def test_from_json_bad_token_expiry(self):
992        data = json.loads(self.credentials.to_json())
993        data['token_expiry'] = 'foobar'
994        instance = client.OAuth2Credentials.from_json(json.dumps(data))
995        self.assertIsInstance(instance, client.OAuth2Credentials)
996
997    def test_unicode_header_checks(self):
998        access_token = u'foo'
999        client_id = u'some_client_id'
1000        client_secret = u'cOuDdkfjxxnv+'
1001        refresh_token = u'1/0/a.df219fjls0'
1002        token_expiry = str(datetime.datetime.utcnow())
1003        token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
1004        revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
1005        user_agent = u'refresh_checker/1.0'
1006        credentials = client.OAuth2Credentials(
1007            access_token, client_id, client_secret, refresh_token,
1008            token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)
1009
1010        # First, test that we correctly encode basic objects, making sure
1011        # to include a bytes object. Note that oauth2client will normalize
1012        # everything to bytes, no matter what python version we're in.
1013        http = credentials.authorize(HttpMock())
1014        headers = {u'foo': 3, b'bar': True, 'baz': b'abc'}
1015        cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'}
1016        http.request(u'http://example.com', method=u'GET', headers=headers)
1017        for k, v in cleaned_headers.items():
1018            self.assertTrue(k in http.headers)
1019            self.assertEqual(v, http.headers[k])
1020
1021        # Next, test that we do fail on unicode.
1022        unicode_str = six.unichr(40960) + 'abcd'
1023        with self.assertRaises(client.NonAsciiHeaderError):
1024            http.request(u'http://example.com', method=u'GET',
1025                         headers={u'foo': unicode_str})
1026
1027    def test_no_unicode_in_request_params(self):
1028        access_token = u'foo'
1029        client_id = u'some_client_id'
1030        client_secret = u'cOuDdkfjxxnv+'
1031        refresh_token = u'1/0/a.df219fjls0'
1032        token_expiry = str(datetime.datetime.utcnow())
1033        token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
1034        revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
1035        user_agent = u'refresh_checker/1.0'
1036        credentials = client.OAuth2Credentials(
1037            access_token, client_id, client_secret, refresh_token,
1038            token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)
1039
1040        http = HttpMock()
1041        http = credentials.authorize(http)
1042        http.request(u'http://example.com', method=u'GET',
1043                     headers={u'foo': u'bar'})
1044        for k, v in six.iteritems(http.headers):
1045            self.assertIsInstance(k, six.binary_type)
1046            self.assertIsInstance(v, six.binary_type)
1047
1048        # Test again with unicode strings that can't simply be converted
1049        # to ASCII.
1050        with self.assertRaises(client.NonAsciiHeaderError):
1051            http.request(
1052                u'http://example.com', method=u'GET',
1053                headers={u'foo': u'\N{COMET}'})
1054
1055        self.credentials.token_response = 'foobar'
1056        instance = client.OAuth2Credentials.from_json(
1057            self.credentials.to_json())
1058        self.assertEqual('foobar', instance.token_response)
1059
1060    def test__expires_in_no_expiry(self):
1061        credentials = client.OAuth2Credentials(None, None, None, None,
1062                                               None, None, None)
1063        self.assertIsNone(credentials.token_expiry)
1064        self.assertIsNone(credentials._expires_in())
1065
1066    @mock.patch('oauth2client.client._UTCNOW')
1067    def test__expires_in_expired(self, utcnow):
1068        credentials = client.OAuth2Credentials(None, None, None, None,
1069                                               None, None, None)
1070        credentials.token_expiry = datetime.datetime.utcnow()
1071        now = credentials.token_expiry + datetime.timedelta(seconds=1)
1072        self.assertLess(credentials.token_expiry, now)
1073        utcnow.return_value = now
1074        self.assertEqual(credentials._expires_in(), 0)
1075        utcnow.assert_called_once_with()
1076
1077    @mock.patch('oauth2client.client._UTCNOW')
1078    def test__expires_in_not_expired(self, utcnow):
1079        credentials = client.OAuth2Credentials(None, None, None, None,
1080                                               None, None, None)
1081        credentials.token_expiry = datetime.datetime.utcnow()
1082        seconds = 1234
1083        now = credentials.token_expiry - datetime.timedelta(seconds=seconds)
1084        self.assertLess(now, credentials.token_expiry)
1085        utcnow.return_value = now
1086        self.assertEqual(credentials._expires_in(), seconds)
1087        utcnow.assert_called_once_with()
1088
1089    @mock.patch('oauth2client.client._UTCNOW')
1090    def test_get_access_token(self, utcnow):
1091        # Configure the patch.
1092        seconds = 11
1093        NOW = datetime.datetime(1992, 12, 31, second=seconds)
1094        utcnow.return_value = NOW
1095
1096        lifetime = 2  # number of seconds in which the token expires
1097        EXPIRY_TIME = datetime.datetime(1992, 12, 31,
1098                                        second=seconds + lifetime)
1099
1100        token1 = u'first_token'
1101        token_response_first = {
1102            'access_token': token1,
1103            'expires_in': lifetime,
1104        }
1105        token2 = u'second_token'
1106        token_response_second = {
1107            'access_token': token2,
1108            'expires_in': lifetime,
1109        }
1110        http = HttpMockSequence([
1111            ({'status': '200'}, json.dumps(token_response_first).encode(
1112                'utf-8')),
1113            ({'status': '200'}, json.dumps(token_response_second).encode(
1114                'utf-8')),
1115        ])
1116
1117        # Use the current credentials but unset the expiry and
1118        # the access token.
1119        credentials = copy.deepcopy(self.credentials)
1120        credentials.access_token = None
1121        credentials.token_expiry = None
1122
1123        # Get Access Token, First attempt.
1124        self.assertEqual(credentials.access_token, None)
1125        self.assertFalse(credentials.access_token_expired)
1126        self.assertEqual(credentials.token_expiry, None)
1127        token = credentials.get_access_token(http=http)
1128        self.assertEqual(credentials.token_expiry, EXPIRY_TIME)
1129        self.assertEqual(token1, token.access_token)
1130        self.assertEqual(lifetime, token.expires_in)
1131        self.assertEqual(token_response_first, credentials.token_response)
1132        # Two utcnow calls are expected:
1133        # - get_access_token() -> _do_refresh_request (setting expires in)
1134        # - get_access_token() -> _expires_in()
1135        expected_utcnow_calls = [mock.call()] * 2
1136        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
1137
1138        # Get Access Token, Second Attempt (not expired)
1139        self.assertEqual(credentials.access_token, token1)
1140        self.assertFalse(credentials.access_token_expired)
1141        token = credentials.get_access_token(http=http)
1142        # Make sure no refresh occurred since the token was not expired.
1143        self.assertEqual(token1, token.access_token)
1144        self.assertEqual(lifetime, token.expires_in)
1145        self.assertEqual(token_response_first, credentials.token_response)
1146        # Three more utcnow calls are expected:
1147        # - access_token_expired
1148        # - get_access_token() -> access_token_expired
1149        # - get_access_token -> _expires_in
1150        expected_utcnow_calls = [mock.call()] * (2 + 3)
1151        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
1152
1153        # Get Access Token, Third Attempt (force expiration)
1154        self.assertEqual(credentials.access_token, token1)
1155        credentials.token_expiry = NOW  # Manually force expiry.
1156        self.assertTrue(credentials.access_token_expired)
1157        token = credentials.get_access_token(http=http)
1158        # Make sure refresh occurred since the token was not expired.
1159        self.assertEqual(token2, token.access_token)
1160        self.assertEqual(lifetime, token.expires_in)
1161        self.assertFalse(credentials.access_token_expired)
1162        self.assertEqual(token_response_second,
1163                         credentials.token_response)
1164        # Five more utcnow calls are expected:
1165        # - access_token_expired
1166        # - get_access_token -> access_token_expired
1167        # - get_access_token -> _do_refresh_request
1168        # - get_access_token -> _expires_in
1169        # - access_token_expired
1170        expected_utcnow_calls = [mock.call()] * (2 + 3 + 5)
1171        self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
1172
1173    @mock.patch.object(client.OAuth2Credentials, 'refresh')
1174    @mock.patch.object(client.OAuth2Credentials, '_expires_in',
1175                       return_value=1835)
1176    def test_get_access_token_without_http(self, expires_in, refresh_mock):
1177        credentials = client.OAuth2Credentials(None, None, None, None,
1178                                               None, None, None)
1179        # Make sure access_token_expired returns True
1180        credentials.invalid = True
1181        # Specify a token so we can use it in the response.
1182        credentials.access_token = 'ya29-s3kr3t'
1183
1184        with mock.patch('httplib2.Http',
1185                        return_value=object) as http_kls:
1186            token_info = credentials.get_access_token()
1187            expires_in.assert_called_once_with()
1188            refresh_mock.assert_called_once_with(http_kls.return_value)
1189
1190        self.assertIsInstance(token_info, client.AccessTokenInfo)
1191        self.assertEqual(token_info.access_token,
1192                         credentials.access_token)
1193        self.assertEqual(token_info.expires_in,
1194                         expires_in.return_value)
1195
1196    @mock.patch.object(client.OAuth2Credentials, 'refresh')
1197    @mock.patch.object(client.OAuth2Credentials, '_expires_in',
1198                       return_value=1835)
1199    def test_get_access_token_with_http(self, expires_in, refresh_mock):
1200        credentials = client.OAuth2Credentials(None, None, None, None,
1201                                               None, None, None)
1202        # Make sure access_token_expired returns True
1203        credentials.invalid = True
1204        # Specify a token so we can use it in the response.
1205        credentials.access_token = 'ya29-s3kr3t'
1206
1207        http_obj = object()
1208        token_info = credentials.get_access_token(http_obj)
1209        self.assertIsInstance(token_info, client.AccessTokenInfo)
1210        self.assertEqual(token_info.access_token,
1211                         credentials.access_token)
1212        self.assertEqual(token_info.expires_in,
1213                         expires_in.return_value)
1214
1215        expires_in.assert_called_once_with()
1216        refresh_mock.assert_called_once_with(http_obj)
1217
1218    @mock.patch.object(client.OAuth2Credentials,
1219                       '_generate_refresh_request_headers',
1220                       return_value=object())
1221    @mock.patch.object(client.OAuth2Credentials,
1222                       '_generate_refresh_request_body',
1223                       return_value=object())
1224    @mock.patch('oauth2client.client.logger')
1225    def _do_refresh_request_test_helper(self, response, content,
1226                                        error_msg, logger, gen_body,
1227                                        gen_headers, store=None):
1228        credentials = client.OAuth2Credentials(None, None, None, None,
1229                                               None, None, None)
1230        credentials.store = store
1231        http_request = mock.Mock()
1232        http_request.return_value = response, content
1233
1234        with self.assertRaises(
1235                client.HttpAccessTokenRefreshError) as exc_manager:
1236            credentials._do_refresh_request(http_request)
1237
1238        self.assertEqual(exc_manager.exception.args, (error_msg,))
1239        self.assertEqual(exc_manager.exception.status, response.status)
1240        http_request.assert_called_once_with(None, body=gen_body.return_value,
1241                                             headers=gen_headers.return_value,
1242                                             method='POST')
1243
1244        call1 = mock.call('Refreshing access_token')
1245        failure_template = 'Failed to retrieve access token: %s'
1246        call2 = mock.call(failure_template, content)
1247        self.assertEqual(logger.info.mock_calls, [call1, call2])
1248        if store is not None:
1249            store.locked_put.assert_called_once_with(credentials)
1250
1251    def test__do_refresh_request_non_json_failure(self):
1252        response = httplib2.Response({
1253            'status': int(http_client.BAD_REQUEST),
1254        })
1255        content = u'Bad request'
1256        error_msg = 'Invalid response {0}.'.format(int(response.status))
1257        self._do_refresh_request_test_helper(response, content, error_msg)
1258
1259    def test__do_refresh_request_basic_failure(self):
1260        response = httplib2.Response({
1261            'status': int(http_client.INTERNAL_SERVER_ERROR),
1262        })
1263        content = u'{}'
1264        error_msg = 'Invalid response {0}.'.format(int(response.status))
1265        self._do_refresh_request_test_helper(response, content, error_msg)
1266
1267    def test__do_refresh_request_failure_w_json_error(self):
1268        response = httplib2.Response({
1269            'status': http_client.BAD_GATEWAY,
1270        })
1271        error_msg = 'Hi I am an error not a bearer'
1272        content = json.dumps({'error': error_msg})
1273        self._do_refresh_request_test_helper(response, content, error_msg)
1274
1275    def test__do_refresh_request_failure_w_json_error_and_store(self):
1276        response = httplib2.Response({
1277            'status': http_client.BAD_GATEWAY,
1278        })
1279        error_msg = 'Where are we going wearer?'
1280        content = json.dumps({'error': error_msg})
1281        store = mock.MagicMock()
1282        self._do_refresh_request_test_helper(response, content, error_msg,
1283                                             store=store)
1284
1285    def test__do_refresh_request_failure_w_json_error_and_desc(self):
1286        response = httplib2.Response({
1287            'status': http_client.SERVICE_UNAVAILABLE,
1288        })
1289        base_error = 'Ruckus'
1290        error_desc = 'Can you describe the ruckus'
1291        content = json.dumps({
1292            'error': base_error,
1293            'error_description': error_desc,
1294        })
1295        error_msg = '{0}: {1}'.format(base_error, error_desc)
1296        self._do_refresh_request_test_helper(response, content, error_msg)
1297
1298    @mock.patch('oauth2client.client.logger')
1299    def _do_revoke_test_helper(self, response, content,
1300                               error_msg, logger, store=None):
1301        credentials = client.OAuth2Credentials(
1302            None, None, None, None, None, None, None,
1303            revoke_uri=oauth2client.GOOGLE_REVOKE_URI)
1304        credentials.store = store
1305        http_request = mock.Mock()
1306        http_request.return_value = response, content
1307        token = u's3kr3tz'
1308
1309        if response.status == http_client.OK:
1310            self.assertFalse(credentials.invalid)
1311            self.assertIsNone(credentials._do_revoke(http_request, token))
1312            self.assertTrue(credentials.invalid)
1313            if store is not None:
1314                store.delete.assert_called_once_with()
1315        else:
1316            self.assertFalse(credentials.invalid)
1317            with self.assertRaises(client.TokenRevokeError) as exc_manager:
1318                credentials._do_revoke(http_request, token)
1319            # Make sure invalid was not flipped on.
1320            self.assertFalse(credentials.invalid)
1321            self.assertEqual(exc_manager.exception.args, (error_msg,))
1322            if store is not None:
1323                store.delete.assert_not_called()
1324
1325        revoke_uri = oauth2client.GOOGLE_REVOKE_URI + '?token=' + token
1326        http_request.assert_called_once_with(revoke_uri)
1327
1328        logger.info.assert_called_once_with('Revoking token')
1329
1330    def test__do_revoke_success(self):
1331        response = httplib2.Response({
1332            'status': http_client.OK,
1333        })
1334        self._do_revoke_test_helper(response, b'', None)
1335
1336    def test__do_revoke_success_with_store(self):
1337        response = httplib2.Response({
1338            'status': http_client.OK,
1339        })
1340        store = mock.MagicMock()
1341        self._do_revoke_test_helper(response, b'', None, store=store)
1342
1343    def test__do_revoke_non_json_failure(self):
1344        response = httplib2.Response({
1345            'status': http_client.BAD_REQUEST,
1346        })
1347        content = u'Bad request'
1348        error_msg = 'Invalid response {0}.'.format(response.status)
1349        self._do_revoke_test_helper(response, content, error_msg)
1350
1351    def test__do_revoke_basic_failure(self):
1352        response = httplib2.Response({
1353            'status': http_client.INTERNAL_SERVER_ERROR,
1354        })
1355        content = u'{}'
1356        error_msg = 'Invalid response {0}.'.format(response.status)
1357        self._do_revoke_test_helper(response, content, error_msg)
1358
1359    def test__do_revoke_failure_w_json_error(self):
1360        response = httplib2.Response({
1361            'status': http_client.BAD_GATEWAY,
1362        })
1363        error_msg = 'Hi I am an error not a bearer'
1364        content = json.dumps({'error': error_msg})
1365        self._do_revoke_test_helper(response, content, error_msg)
1366
1367    def test__do_revoke_failure_w_json_error_and_store(self):
1368        response = httplib2.Response({
1369            'status': http_client.BAD_GATEWAY,
1370        })
1371        error_msg = 'Where are we going wearer?'
1372        content = json.dumps({'error': error_msg})
1373        store = mock.MagicMock()
1374        self._do_revoke_test_helper(response, content, error_msg,
1375                                    store=store)
1376
1377    @mock.patch('oauth2client.client.logger')
1378    def _do_retrieve_scopes_test_helper(self, response, content,
1379                                        error_msg, logger, scopes=None):
1380        credentials = client.OAuth2Credentials(
1381            None, None, None, None, None, None, None,
1382            token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
1383        http_request = mock.Mock()
1384        http_request.return_value = response, content
1385        token = u's3kr3tz'
1386
1387        if response.status == http_client.OK:
1388            self.assertEqual(credentials.scopes, set())
1389            self.assertIsNone(
1390                credentials._do_retrieve_scopes(http_request, token))
1391            self.assertEqual(credentials.scopes, scopes)
1392        else:
1393            self.assertEqual(credentials.scopes, set())
1394            with self.assertRaises(client.Error) as exc_manager:
1395                credentials._do_retrieve_scopes(http_request, token)
1396            # Make sure scopes were not changed.
1397            self.assertEqual(credentials.scopes, set())
1398            self.assertEqual(exc_manager.exception.args, (error_msg,))
1399
1400        token_uri = client._update_query_params(
1401            oauth2client.GOOGLE_TOKEN_INFO_URI,
1402            {'fields': 'scope', 'access_token': token})
1403        self.assertEqual(len(http_request.mock_calls), 1)
1404        scopes_call = http_request.mock_calls[0]
1405        call_args = scopes_call[1]
1406        self.assertEqual(len(call_args), 1)
1407        called_uri = call_args[0]
1408        assertUrisEqual(self, token_uri, called_uri)
1409        logger.info.assert_called_once_with('Refreshing scopes')
1410
1411    def test__do_retrieve_scopes_success_bad_json(self):
1412        response = httplib2.Response({
1413            'status': http_client.OK,
1414        })
1415        invalid_json = b'{'
1416        with self.assertRaises(ValueError):
1417            self._do_retrieve_scopes_test_helper(response, invalid_json, None)
1418
1419    def test__do_retrieve_scopes_success(self):
1420        response = httplib2.Response({
1421            'status': http_client.OK,
1422        })
1423        content = b'{"scope": "foo bar"}'
1424        self._do_retrieve_scopes_test_helper(response, content, None,
1425                                             scopes=set(['foo', 'bar']))
1426
1427    def test__do_retrieve_scopes_non_json_failure(self):
1428        response = httplib2.Response({
1429            'status': http_client.BAD_REQUEST,
1430        })
1431        content = u'Bad request'
1432        error_msg = 'Invalid response {0}.'.format(response.status)
1433        self._do_retrieve_scopes_test_helper(response, content, error_msg)
1434
1435    def test__do_retrieve_scopes_basic_failure(self):
1436        response = httplib2.Response({
1437            'status': http_client.INTERNAL_SERVER_ERROR,
1438        })
1439        content = u'{}'
1440        error_msg = 'Invalid response {0}.'.format(response.status)
1441        self._do_retrieve_scopes_test_helper(response, content, error_msg)
1442
1443    def test__do_retrieve_scopes_failure_w_json_error(self):
1444        response = httplib2.Response({
1445            'status': http_client.BAD_GATEWAY,
1446        })
1447        error_msg = 'Error desc I sit at a desk'
1448        content = json.dumps({'error_description': error_msg})
1449        self._do_retrieve_scopes_test_helper(response, content, error_msg)
1450
1451    def test_has_scopes(self):
1452        self.assertTrue(self.credentials.has_scopes('foo'))
1453        self.assertTrue(self.credentials.has_scopes(['foo']))
1454        self.assertFalse(self.credentials.has_scopes('bar'))
1455        self.assertFalse(self.credentials.has_scopes(['bar']))
1456
1457        self.credentials.scopes = set(['foo', 'bar'])
1458        self.assertTrue(self.credentials.has_scopes('foo'))
1459        self.assertTrue(self.credentials.has_scopes('bar'))
1460        self.assertFalse(self.credentials.has_scopes('baz'))
1461        self.assertTrue(self.credentials.has_scopes(['foo', 'bar']))
1462        self.assertFalse(self.credentials.has_scopes(['foo', 'baz']))
1463
1464        self.credentials.scopes = set([])
1465        self.assertFalse(self.credentials.has_scopes('foo'))
1466
1467    def test_retrieve_scopes(self):
1468        info_response_first = {'scope': 'foo bar'}
1469        info_response_second = {'error_description': 'abcdef'}
1470        http = HttpMockSequence([
1471            ({'status': '200'}, json.dumps(info_response_first).encode(
1472                'utf-8')),
1473            ({'status': '400'}, json.dumps(info_response_second).encode(
1474                'utf-8')),
1475            ({'status': '500'}, b''),
1476        ])
1477
1478        self.credentials.retrieve_scopes(http)
1479        self.assertEqual(set(['foo', 'bar']), self.credentials.scopes)
1480
1481        with self.assertRaises(client.Error):
1482            self.credentials.retrieve_scopes(http)
1483
1484        with self.assertRaises(client.Error):
1485            self.credentials.retrieve_scopes(http)
1486
1487    def test_refresh_updates_id_token(self):
1488        for status_code in client.REFRESH_STATUS_CODES:
1489            body = {'foo': 'bar'}
1490            body_json = json.dumps(body).encode('ascii')
1491            payload = base64.urlsafe_b64encode(body_json).strip(b'=')
1492            jwt = b'stuff.' + payload + b'.signature'
1493
1494            token_response = (b'{'
1495                              b'  "access_token":"1/3w",'
1496                              b'  "expires_in":3600,'
1497                              b'  "id_token": "' + jwt + b'"'
1498                              b'}')
1499            http = HttpMockSequence([
1500                ({'status': status_code}, b''),
1501                ({'status': '200'}, token_response),
1502                ({'status': '200'}, 'echo_request_headers'),
1503            ])
1504            http = self.credentials.authorize(http)
1505            resp, content = http.request('http://example.com')
1506            self.assertEqual(self.credentials.id_token, body)
1507
1508
1509class AccessTokenCredentialsTests(unittest2.TestCase):
1510
1511    def setUp(self):
1512        access_token = 'foo'
1513        user_agent = 'refresh_checker/1.0'
1514        self.credentials = client.AccessTokenCredentials(
1515            access_token, user_agent,
1516            revoke_uri=oauth2client.GOOGLE_REVOKE_URI)
1517
1518    def test_token_refresh_success(self):
1519        for status_code in client.REFRESH_STATUS_CODES:
1520            http = HttpMockSequence([
1521                ({'status': status_code}, b''),
1522            ])
1523            http = self.credentials.authorize(http)
1524            with self.assertRaises(client.AccessTokenCredentialsError):
1525                resp, content = http.request('http://example.com')
1526
1527    def test_token_revoke_success(self):
1528        _token_revoke_test_helper(
1529            self, '200', revoke_raise=False,
1530            valid_bool_value=True, token_attr='access_token')
1531
1532    def test_token_revoke_failure(self):
1533        _token_revoke_test_helper(
1534            self, '400', revoke_raise=True,
1535            valid_bool_value=False, token_attr='access_token')
1536
1537    def test_non_401_error_response(self):
1538        http = HttpMockSequence([
1539            ({'status': '400'}, b''),
1540        ])
1541        http = self.credentials.authorize(http)
1542        resp, content = http.request('http://example.com')
1543        self.assertEqual(http_client.BAD_REQUEST, resp.status)
1544
1545    def test_auth_header_sent(self):
1546        http = HttpMockSequence([
1547            ({'status': '200'}, 'echo_request_headers'),
1548        ])
1549        http = self.credentials.authorize(http)
1550        resp, content = http.request('http://example.com')
1551        self.assertEqual(b'Bearer foo', content[b'Authorization'])
1552
1553
1554class TestAssertionCredentials(unittest2.TestCase):
1555    assertion_text = 'This is the assertion'
1556    assertion_type = 'http://www.google.com/assertionType'
1557
1558    class AssertionCredentialsTestImpl(client.AssertionCredentials):
1559
1560        def _generate_assertion(self):
1561            return TestAssertionCredentials.assertion_text
1562
1563    def setUp(self):
1564        user_agent = 'fun/2.0'
1565        self.credentials = self.AssertionCredentialsTestImpl(
1566            self.assertion_type, user_agent=user_agent)
1567
1568    def test__generate_assertion_abstract(self):
1569        credentials = client.AssertionCredentials(None)
1570        with self.assertRaises(NotImplementedError):
1571            credentials._generate_assertion()
1572
1573    def test_assertion_body(self):
1574        body = urllib.parse.parse_qs(
1575            self.credentials._generate_refresh_request_body())
1576        self.assertEqual(self.assertion_text, body['assertion'][0])
1577        self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer',
1578                         body['grant_type'][0])
1579
1580    def test_assertion_refresh(self):
1581        http = HttpMockSequence([
1582            ({'status': '200'}, b'{"access_token":"1/3w"}'),
1583            ({'status': '200'}, 'echo_request_headers'),
1584        ])
1585        http = self.credentials.authorize(http)
1586        resp, content = http.request('http://example.com')
1587        self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
1588
1589    def test_token_revoke_success(self):
1590        _token_revoke_test_helper(
1591            self, '200', revoke_raise=False,
1592            valid_bool_value=True, token_attr='access_token')
1593
1594    def test_token_revoke_failure(self):
1595        _token_revoke_test_helper(
1596            self, '400', revoke_raise=True,
1597            valid_bool_value=False, token_attr='access_token')
1598
1599    def test_sign_blob_abstract(self):
1600        credentials = client.AssertionCredentials(None)
1601        with self.assertRaises(NotImplementedError):
1602            credentials.sign_blob(b'blob')
1603
1604
1605class UpdateQueryParamsTest(unittest2.TestCase):
1606    def test_update_query_params_no_params(self):
1607        uri = 'http://www.google.com'
1608        updated = client._update_query_params(uri, {'a': 'b'})
1609        self.assertEqual(updated, uri + '?a=b')
1610
1611    def test_update_query_params_existing_params(self):
1612        uri = 'http://www.google.com?x=y'
1613        updated = client._update_query_params(uri, {'a': 'b', 'c': 'd&'})
1614        hardcoded_update = uri + '&a=b&c=d%26'
1615        assertUrisEqual(self, updated, hardcoded_update)
1616
1617
1618class ExtractIdTokenTest(unittest2.TestCase):
1619    """Tests client._extract_id_token()."""
1620
1621    def test_extract_success(self):
1622        body = {'foo': 'bar'}
1623        body_json = json.dumps(body).encode('ascii')
1624        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
1625        jwt = b'stuff.' + payload + b'.signature'
1626
1627        extracted = client._extract_id_token(jwt)
1628        self.assertEqual(extracted, body)
1629
1630    def test_extract_failure(self):
1631        body = {'foo': 'bar'}
1632        body_json = json.dumps(body).encode('ascii')
1633        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
1634        jwt = b'stuff.' + payload
1635        with self.assertRaises(client.VerifyJwtTokenError):
1636            client._extract_id_token(jwt)
1637
1638
1639class OAuth2WebServerFlowTest(unittest2.TestCase):
1640
1641    def setUp(self):
1642        self.flow = client.OAuth2WebServerFlow(
1643            client_id='client_id+1',
1644            client_secret='secret+1',
1645            scope='foo',
1646            redirect_uri=client.OOB_CALLBACK_URN,
1647            user_agent='unittest-sample/1.0',
1648            revoke_uri='dummy_revoke_uri',
1649        )
1650
1651    def test_construct_authorize_url(self):
1652        authorize_url = self.flow.step1_get_authorize_url(state='state+1')
1653
1654        parsed = urllib.parse.urlparse(authorize_url)
1655        q = urllib.parse.parse_qs(parsed[4])
1656        self.assertEqual('client_id+1', q['client_id'][0])
1657        self.assertEqual('code', q['response_type'][0])
1658        self.assertEqual('foo', q['scope'][0])
1659        self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0])
1660        self.assertEqual('offline', q['access_type'][0])
1661        self.assertEqual('state+1', q['state'][0])
1662
1663    def test_override_flow_via_kwargs(self):
1664        """Passing kwargs to override defaults."""
1665        flow = client.OAuth2WebServerFlow(
1666            client_id='client_id+1',
1667            client_secret='secret+1',
1668            scope='foo',
1669            redirect_uri=client.OOB_CALLBACK_URN,
1670            user_agent='unittest-sample/1.0',
1671            access_type='online',
1672            response_type='token'
1673        )
1674        authorize_url = flow.step1_get_authorize_url()
1675
1676        parsed = urllib.parse.urlparse(authorize_url)
1677        q = urllib.parse.parse_qs(parsed[4])
1678        self.assertEqual('client_id+1', q['client_id'][0])
1679        self.assertEqual('token', q['response_type'][0])
1680        self.assertEqual('foo', q['scope'][0])
1681        self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0])
1682        self.assertEqual('online', q['access_type'][0])
1683
1684    def test__oauth2_web_server_flow_params(self):
1685        params = client._oauth2_web_server_flow_params({})
1686        self.assertEqual(params['access_type'], 'offline')
1687        self.assertEqual(params['response_type'], 'code')
1688
1689        params = client._oauth2_web_server_flow_params({
1690            'approval_prompt': 'force'})
1691        self.assertEqual(params['prompt'], 'consent')
1692        self.assertNotIn('approval_prompt', params)
1693
1694        params = client._oauth2_web_server_flow_params({
1695            'approval_prompt': 'other'})
1696        self.assertEqual(params['approval_prompt'], 'other')
1697
1698    @mock.patch('oauth2client.client.logger')
1699    def test_step1_get_authorize_url_redirect_override(self, logger):
1700        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
1701                                          redirect_uri=client.OOB_CALLBACK_URN)
1702        alt_redirect = 'foo:bar'
1703        self.assertEqual(flow.redirect_uri, client.OOB_CALLBACK_URN)
1704        result = flow.step1_get_authorize_url(redirect_uri=alt_redirect)
1705        # Make sure the redirect value was updated.
1706        self.assertEqual(flow.redirect_uri, alt_redirect)
1707        query_params = {
1708            'client_id': flow.client_id,
1709            'redirect_uri': alt_redirect,
1710            'scope': flow.scope,
1711            'access_type': 'offline',
1712            'response_type': 'code',
1713        }
1714        expected = client._update_query_params(flow.auth_uri, query_params)
1715        assertUrisEqual(self, expected, result)
1716        # Check stubs.
1717        self.assertEqual(logger.warning.call_count, 1)
1718
1719    def test_step1_get_authorize_url_without_redirect(self):
1720        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
1721                                          redirect_uri=None)
1722        with self.assertRaises(ValueError):
1723            flow.step1_get_authorize_url(redirect_uri=None)
1724
1725    def test_step1_get_authorize_url_without_login_hint(self):
1726        login_hint = 'There are wascally wabbits nearby'
1727        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo',
1728                                          redirect_uri=client.OOB_CALLBACK_URN,
1729                                          login_hint=login_hint)
1730        result = flow.step1_get_authorize_url()
1731        query_params = {
1732            'client_id': flow.client_id,
1733            'login_hint': login_hint,
1734            'redirect_uri': client.OOB_CALLBACK_URN,
1735            'scope': flow.scope,
1736            'access_type': 'offline',
1737            'response_type': 'code',
1738        }
1739        expected = client._update_query_params(flow.auth_uri, query_params)
1740        assertUrisEqual(self, expected, result)
1741
1742    def test_step1_get_device_and_user_codes_wo_device_uri(self):
1743        flow = client.OAuth2WebServerFlow('CID', scope='foo', device_uri=None)
1744        with self.assertRaises(ValueError):
1745            flow.step1_get_device_and_user_codes()
1746
1747    def _step1_get_device_and_user_codes_helper(
1748            self, extra_headers=None, user_agent=None, default_http=False,
1749            content=None):
1750        flow = client.OAuth2WebServerFlow('CID', scope='foo',
1751                                          user_agent=user_agent)
1752        device_code = 'bfc06756-062e-430f-9f0f-460ca44724e5'
1753        user_code = '5faf2780-fc83-11e5-9bc2-00c2c63e5792'
1754        ver_url = 'http://foo.bar'
1755        if content is None:
1756            content = json.dumps({
1757                'device_code': device_code,
1758                'user_code': user_code,
1759                'verification_url': ver_url,
1760            })
1761        http = HttpMockSequence([
1762            ({'status': http_client.OK}, content),
1763        ])
1764        if default_http:
1765            with mock.patch('httplib2.Http', return_value=http):
1766                result = flow.step1_get_device_and_user_codes()
1767        else:
1768            result = flow.step1_get_device_and_user_codes(http=http)
1769
1770        expected = client.DeviceFlowInfo(
1771            device_code, user_code, None, ver_url, None)
1772        self.assertEqual(result, expected)
1773        self.assertEqual(len(http.requests), 1)
1774        self.assertEqual(
1775            http.requests[0]['uri'], oauth2client.GOOGLE_DEVICE_URI)
1776        body = http.requests[0]['body']
1777        self.assertEqual(urllib.parse.parse_qs(body),
1778                         {'client_id': [flow.client_id],
1779                          'scope': [flow.scope]})
1780        headers = {'content-type': 'application/x-www-form-urlencoded'}
1781        if extra_headers is not None:
1782            headers.update(extra_headers)
1783        self.assertEqual(http.requests[0]['headers'], headers)
1784
1785    def test_step1_get_device_and_user_codes(self):
1786        self._step1_get_device_and_user_codes_helper()
1787
1788    def test_step1_get_device_and_user_codes_w_user_agent(self):
1789        user_agent = 'spiderman'
1790        extra_headers = {'user-agent': user_agent}
1791        self._step1_get_device_and_user_codes_helper(
1792            user_agent=user_agent, extra_headers=extra_headers)
1793
1794    def test_step1_get_device_and_user_codes_w_default_http(self):
1795        self._step1_get_device_and_user_codes_helper(default_http=True)
1796
1797    def test_step1_get_device_and_user_codes_bad_payload(self):
1798        non_json_content = b'{'
1799        with self.assertRaises(client.OAuth2DeviceCodeError):
1800            self._step1_get_device_and_user_codes_helper(
1801                content=non_json_content)
1802
1803    def _step1_get_device_and_user_codes_fail_helper(self, status,
1804                                                     content, error_msg):
1805        flow = client.OAuth2WebServerFlow('CID', scope='foo')
1806        http = HttpMockSequence([
1807            ({'status': status}, content),
1808        ])
1809        with self.assertRaises(client.OAuth2DeviceCodeError) as exc_manager:
1810            flow.step1_get_device_and_user_codes(http=http)
1811
1812        self.assertEqual(exc_manager.exception.args, (error_msg,))
1813
1814    def test_step1_get_device_and_user_codes_non_json_failure(self):
1815        status = int(http_client.BAD_REQUEST)
1816        content = 'Nope not JSON.'
1817        error_msg = 'Invalid response {0}.'.format(status)
1818        self._step1_get_device_and_user_codes_fail_helper(status, content,
1819                                                          error_msg)
1820
1821    def test_step1_get_device_and_user_codes_basic_failure(self):
1822        status = int(http_client.INTERNAL_SERVER_ERROR)
1823        content = b'{}'
1824        error_msg = 'Invalid response {0}.'.format(status)
1825        self._step1_get_device_and_user_codes_fail_helper(status, content,
1826                                                          error_msg)
1827
1828    def test_step1_get_device_and_user_codes_failure_w_json_error(self):
1829        status = int(http_client.BAD_GATEWAY)
1830        base_error = 'ZOMG user codes failure.'
1831        content = json.dumps({'error': base_error})
1832        error_msg = 'Invalid response {0}. Error: {1}'.format(status,
1833                                                              base_error)
1834        self._step1_get_device_and_user_codes_fail_helper(status, content,
1835                                                          error_msg)
1836
1837    def test_step2_exchange_no_input(self):
1838        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo')
1839        with self.assertRaises(ValueError):
1840            flow.step2_exchange()
1841
1842    def test_step2_exchange_code_and_device_flow(self):
1843        flow = client.OAuth2WebServerFlow('client_id+1', scope='foo')
1844        with self.assertRaises(ValueError):
1845            flow.step2_exchange(code='code', device_flow_info='dfi')
1846
1847    def test_scope_is_required(self):
1848        with self.assertRaises(TypeError):
1849            client.OAuth2WebServerFlow('client_id+1')
1850
1851    def test_exchange_failure(self):
1852        http = HttpMockSequence([
1853            ({'status': '400'}, b'{"error":"invalid_request"}'),
1854        ])
1855
1856        with self.assertRaises(client.FlowExchangeError):
1857            self.flow.step2_exchange(code='some random code', http=http)
1858
1859    def test_urlencoded_exchange_failure(self):
1860        http = HttpMockSequence([
1861            ({'status': '400'}, b'error=invalid_request'),
1862        ])
1863
1864        with self.assertRaisesRegexp(client.FlowExchangeError,
1865                                     'invalid_request'):
1866            self.flow.step2_exchange(code='some random code', http=http)
1867
1868    def test_exchange_failure_with_json_error(self):
1869        # Some providers have 'error' attribute as a JSON object
1870        # in place of regular string.
1871        # This test makes sure no strange object-to-string coversion
1872        # exceptions are being raised instead of FlowExchangeError.
1873        payload = (b'{'
1874                   b'  "error": {'
1875                   b'    "message": "Error validating verification code.",'
1876                   b'    "type": "OAuthException"'
1877                   b'  }'
1878                   b'}')
1879        http = HttpMockSequence([({'status': '400'}, payload)])
1880
1881        with self.assertRaises(client.FlowExchangeError):
1882            self.flow.step2_exchange(code='some random code', http=http)
1883
1884    def _exchange_success_test_helper(self, code=None, device_flow_info=None):
1885        payload = (b'{'
1886                   b'  "access_token":"SlAV32hkKG",'
1887                   b'  "expires_in":3600,'
1888                   b'  "refresh_token":"8xLOxBtZp8"'
1889                   b'}')
1890        http = HttpMockSequence([({'status': '200'}, payload)])
1891        credentials = self.flow.step2_exchange(
1892            code=code, device_flow_info=device_flow_info, http=http)
1893        self.assertEqual('SlAV32hkKG', credentials.access_token)
1894        self.assertNotEqual(None, credentials.token_expiry)
1895        self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
1896        self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
1897        self.assertEqual(set(['foo']), credentials.scopes)
1898
1899    def test_exchange_success(self):
1900        self._exchange_success_test_helper(code='some random code')
1901
1902    def test_exchange_success_with_device_flow_info(self):
1903        device_flow_info = client.DeviceFlowInfo(
1904            'some random code', None, None, None, None)
1905        self._exchange_success_test_helper(device_flow_info=device_flow_info)
1906
1907    def test_exchange_success_binary_code(self):
1908        binary_code = b'some random code'
1909        access_token = 'SlAV32hkKG'
1910        expires_in = '3600'
1911        refresh_token = '8xLOxBtZp8'
1912        revoke_uri = 'dummy_revoke_uri'
1913
1914        payload = ('{'
1915                   '  "access_token":"' + access_token + '",'
1916                   '  "expires_in":' + expires_in + ','
1917                   '  "refresh_token":"' + refresh_token + '"'
1918                   '}')
1919        http = HttpMockSequence(
1920            [({'status': '200'}, _helpers._to_bytes(payload))])
1921        credentials = self.flow.step2_exchange(code=binary_code, http=http)
1922        self.assertEqual(access_token, credentials.access_token)
1923        self.assertIsNotNone(credentials.token_expiry)
1924        self.assertEqual(refresh_token, credentials.refresh_token)
1925        self.assertEqual(revoke_uri, credentials.revoke_uri)
1926        self.assertEqual(set(['foo']), credentials.scopes)
1927
1928    def test_exchange_dictlike(self):
1929        class FakeDict(object):
1930            def __init__(self, d):
1931                self.d = d
1932
1933            def __getitem__(self, name):
1934                return self.d[name]
1935
1936            def __contains__(self, name):
1937                return name in self.d
1938
1939        code = 'some random code'
1940        not_a_dict = FakeDict({'code': code})
1941        payload = (b'{'
1942                   b'  "access_token":"SlAV32hkKG",'
1943                   b'  "expires_in":3600,'
1944                   b'  "refresh_token":"8xLOxBtZp8"'
1945                   b'}')
1946        http = HttpMockSequence([({'status': '200'}, payload)])
1947
1948        credentials = self.flow.step2_exchange(code=not_a_dict, http=http)
1949        self.assertEqual('SlAV32hkKG', credentials.access_token)
1950        self.assertNotEqual(None, credentials.token_expiry)
1951        self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
1952        self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
1953        self.assertEqual(set(['foo']), credentials.scopes)
1954        request_code = urllib.parse.parse_qs(
1955            http.requests[0]['body'])['code'][0]
1956        self.assertEqual(code, request_code)
1957
1958    def test_exchange_using_authorization_header(self):
1959        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=',
1960        flow = client.OAuth2WebServerFlow(
1961            client_id='client_id+1',
1962            authorization_header=auth_header,
1963            scope='foo',
1964            redirect_uri=client.OOB_CALLBACK_URN,
1965            user_agent='unittest-sample/1.0',
1966            revoke_uri='dummy_revoke_uri',
1967        )
1968        http = HttpMockSequence([
1969            ({'status': '200'}, b'access_token=SlAV32hkKG'),
1970        ])
1971
1972        credentials = flow.step2_exchange(code='some random code', http=http)
1973        self.assertEqual('SlAV32hkKG', credentials.access_token)
1974
1975        test_request = http.requests[0]
1976        # Did we pass the Authorization header?
1977        self.assertEqual(test_request['headers']['Authorization'], auth_header)
1978        # Did we omit client_secret from POST body?
1979        self.assertTrue('client_secret' not in test_request['body'])
1980
1981    def test_urlencoded_exchange_success(self):
1982        http = HttpMockSequence([
1983            ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'),
1984        ])
1985
1986        credentials = self.flow.step2_exchange(code='some random code',
1987                                               http=http)
1988        self.assertEqual('SlAV32hkKG', credentials.access_token)
1989        self.assertNotEqual(None, credentials.token_expiry)
1990
1991    def test_urlencoded_expires_param(self):
1992        http = HttpMockSequence([
1993            # Note the 'expires=3600' where you'd normally
1994            # have if named 'expires_in'
1995            ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'),
1996        ])
1997
1998        credentials = self.flow.step2_exchange(code='some random code',
1999                                               http=http)
2000        self.assertNotEqual(None, credentials.token_expiry)
2001
2002    def test_exchange_no_expires_in(self):
2003        payload = (b'{'
2004                   b'  "access_token":"SlAV32hkKG",'
2005                   b'  "refresh_token":"8xLOxBtZp8"'
2006                   b'}')
2007        http = HttpMockSequence([({'status': '200'}, payload)])
2008
2009        credentials = self.flow.step2_exchange(code='some random code',
2010                                               http=http)
2011        self.assertEqual(None, credentials.token_expiry)
2012
2013    def test_urlencoded_exchange_no_expires_in(self):
2014        http = HttpMockSequence([
2015            # This might be redundant but just to make sure
2016            # urlencoded access_token gets parsed correctly
2017            ({'status': '200'}, b'access_token=SlAV32hkKG'),
2018        ])
2019
2020        credentials = self.flow.step2_exchange(code='some random code',
2021                                               http=http)
2022        self.assertEqual(None, credentials.token_expiry)
2023
2024    def test_exchange_fails_if_no_code(self):
2025        payload = (b'{'
2026                   b'  "access_token":"SlAV32hkKG",'
2027                   b'  "refresh_token":"8xLOxBtZp8"'
2028                   b'}')
2029        http = HttpMockSequence([({'status': '200'}, payload)])
2030
2031        code = {'error': 'thou shall not pass'}
2032        with self.assertRaisesRegexp(
2033                client.FlowExchangeError, 'shall not pass'):
2034            self.flow.step2_exchange(code=code, http=http)
2035
2036    def test_exchange_id_token_fail(self):
2037        payload = (b'{'
2038                   b'  "access_token":"SlAV32hkKG",'
2039                   b'  "refresh_token":"8xLOxBtZp8",'
2040                   b'  "id_token": "stuff.payload"'
2041                   b'}')
2042        http = HttpMockSequence([({'status': '200'}, payload)])
2043
2044        with self.assertRaises(client.VerifyJwtTokenError):
2045            self.flow.step2_exchange(code='some random code', http=http)
2046
2047    def test_exchange_id_token(self):
2048        body = {'foo': 'bar'}
2049        body_json = json.dumps(body).encode('ascii')
2050        payload = base64.urlsafe_b64encode(body_json).strip(b'=')
2051        jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' +
2052               base64.urlsafe_b64encode(b'signature'))
2053
2054        payload = (b'{'
2055                   b'  "access_token":"SlAV32hkKG",'
2056                   b'  "refresh_token":"8xLOxBtZp8",'
2057                   b'  "id_token": "' + jwt + b'"'
2058                   b'}')
2059        http = HttpMockSequence([({'status': '200'}, payload)])
2060        credentials = self.flow.step2_exchange(code='some random code',
2061                                               http=http)
2062        self.assertEqual(credentials.id_token, body)
2063
2064
2065class FlowFromCachedClientsecrets(unittest2.TestCase):
2066
2067    def test_flow_from_clientsecrets_cached(self):
2068        cache_mock = CacheMock()
2069        load_and_cache('client_secrets.json', 'some_secrets', cache_mock)
2070
2071        flow = client.flow_from_clientsecrets(
2072            'some_secrets', '', redirect_uri='oob', cache=cache_mock)
2073        self.assertEqual('foo_client_secret', flow.client_secret)
2074
2075    @mock.patch('oauth2client.clientsecrets.loadfile')
2076    def _flow_from_clientsecrets_success_helper(self, loadfile_mock,
2077                                                device_uri=None,
2078                                                revoke_uri=None):
2079        client_type = clientsecrets.TYPE_WEB
2080        client_info = {
2081            'auth_uri': 'auth_uri',
2082            'token_uri': 'token_uri',
2083            'client_id': 'client_id',
2084            'client_secret': 'client_secret',
2085        }
2086        if revoke_uri is not None:
2087            client_info['revoke_uri'] = revoke_uri
2088        loadfile_mock.return_value = client_type, client_info
2089        filename = object()
2090        scope = ['baz']
2091        cache = object()
2092
2093        if device_uri is not None:
2094            result = client.flow_from_clientsecrets(
2095                filename, scope, cache=cache, device_uri=device_uri)
2096            self.assertEqual(result.device_uri, device_uri)
2097        else:
2098            result = client.flow_from_clientsecrets(
2099                filename, scope, cache=cache)
2100
2101        self.assertIsInstance(result, client.OAuth2WebServerFlow)
2102        loadfile_mock.assert_called_once_with(filename, cache=cache)
2103
2104    def test_flow_from_clientsecrets_success(self):
2105        self._flow_from_clientsecrets_success_helper()
2106
2107    def test_flow_from_clientsecrets_success_w_device_uri(self):
2108        device_uri = 'http://device.uri'
2109        self._flow_from_clientsecrets_success_helper(device_uri=device_uri)
2110
2111    def test_flow_from_clientsecrets_success_w_revoke_uri(self):
2112        revoke_uri = 'http://revoke.uri'
2113        self._flow_from_clientsecrets_success_helper(revoke_uri=revoke_uri)
2114
2115    @mock.patch('oauth2client.clientsecrets.loadfile',
2116                side_effect=clientsecrets.InvalidClientSecretsError)
2117    def test_flow_from_clientsecrets_invalid(self, loadfile_mock):
2118        filename = object()
2119        cache = object()
2120        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
2121            client.flow_from_clientsecrets(
2122                filename, None, cache=cache, message=None)
2123        loadfile_mock.assert_called_once_with(filename, cache=cache)
2124
2125    @mock.patch('oauth2client.clientsecrets.loadfile',
2126                side_effect=clientsecrets.InvalidClientSecretsError)
2127    @mock.patch('sys.exit')
2128    def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit,
2129                                                   loadfile_mock):
2130        filename = object()
2131        cache = object()
2132        message = 'hi mom'
2133
2134        client.flow_from_clientsecrets(
2135            filename, None, cache=cache, message=message)
2136        sys_exit.assert_called_once_with(message)
2137        loadfile_mock.assert_called_once_with(filename, cache=cache)
2138
2139    @mock.patch('oauth2client.clientsecrets.loadfile',
2140                side_effect=clientsecrets.InvalidClientSecretsError('foobar'))
2141    @mock.patch('sys.exit')
2142    def test_flow_from_clientsecrets_invalid_w_msg_and_text(self, sys_exit,
2143                                                            loadfile_mock):
2144        filename = object()
2145        cache = object()
2146        message = 'hi mom'
2147        expected = ('The client secrets were invalid: '
2148                    '\n{0}\n{1}'.format('foobar', 'hi mom'))
2149
2150        client.flow_from_clientsecrets(
2151            filename, None, cache=cache, message=message)
2152        sys_exit.assert_called_once_with(expected)
2153        loadfile_mock.assert_called_once_with(filename, cache=cache)
2154
2155    @mock.patch('oauth2client.clientsecrets.loadfile')
2156    def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock):
2157        client_type = 'UNKNOWN'
2158        loadfile_mock.return_value = client_type, None
2159        filename = object()
2160        cache = object()
2161
2162        err_msg = ('This OAuth 2.0 flow is unsupported: '
2163                   '{0!r}'.format(client_type))
2164        with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError,
2165                                     err_msg):
2166            client.flow_from_clientsecrets(filename, None, cache=cache)
2167
2168        loadfile_mock.assert_called_once_with(filename, cache=cache)
2169
2170
2171class CredentialsFromCodeTests(unittest2.TestCase):
2172
2173    def setUp(self):
2174        self.client_id = 'client_id_abc'
2175        self.client_secret = 'secret_use_code'
2176        self.scope = 'foo'
2177        self.code = '12345abcde'
2178        self.redirect_uri = 'postmessage'
2179
2180    def test_exchange_code_for_token(self):
2181        token = 'asdfghjkl'
2182        payload = json.dumps({'access_token': token, 'expires_in': 3600})
2183        http = HttpMockSequence([
2184            ({'status': '200'}, payload.encode('utf-8')),
2185        ])
2186        credentials = client.credentials_from_code(
2187            self.client_id, self.client_secret, self.scope,
2188            self.code, http=http, redirect_uri=self.redirect_uri)
2189        self.assertEqual(credentials.access_token, token)
2190        self.assertNotEqual(None, credentials.token_expiry)
2191        self.assertEqual(set(['foo']), credentials.scopes)
2192
2193    def test_exchange_code_for_token_fail(self):
2194        http = HttpMockSequence([
2195            ({'status': '400'}, b'{"error":"invalid_request"}'),
2196        ])
2197
2198        with self.assertRaises(client.FlowExchangeError):
2199            client.credentials_from_code(
2200                self.client_id, self.client_secret, self.scope,
2201                self.code, http=http, redirect_uri=self.redirect_uri)
2202
2203    def test_exchange_code_and_file_for_token(self):
2204        payload = (b'{'
2205                   b'  "access_token":"asdfghjkl",'
2206                   b'  "expires_in":3600'
2207                   b'}')
2208        http = HttpMockSequence([({'status': '200'}, payload)])
2209        credentials = client.credentials_from_clientsecrets_and_code(
2210            datafile('client_secrets.json'), self.scope,
2211            self.code, http=http)
2212        self.assertEqual(credentials.access_token, 'asdfghjkl')
2213        self.assertNotEqual(None, credentials.token_expiry)
2214        self.assertEqual(set(['foo']), credentials.scopes)
2215
2216    def test_exchange_code_and_cached_file_for_token(self):
2217        http = HttpMockSequence([
2218            ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'),
2219        ])
2220        cache_mock = CacheMock()
2221        load_and_cache('client_secrets.json', 'some_secrets', cache_mock)
2222
2223        credentials = client.credentials_from_clientsecrets_and_code(
2224            'some_secrets', self.scope,
2225            self.code, http=http, cache=cache_mock)
2226        self.assertEqual(credentials.access_token, 'asdfghjkl')
2227        self.assertEqual(set(['foo']), credentials.scopes)
2228
2229    def test_exchange_code_and_file_for_token_fail(self):
2230        http = HttpMockSequence([
2231            ({'status': '400'}, b'{"error":"invalid_request"}'),
2232        ])
2233
2234        with self.assertRaises(client.FlowExchangeError):
2235            client.credentials_from_clientsecrets_and_code(
2236                datafile('client_secrets.json'), self.scope,
2237                self.code, http=http)
2238
2239
2240class Test__save_private_file(unittest2.TestCase):
2241
2242    def _save_helper(self, filename):
2243        contents = []
2244        contents_str = '[]'
2245        client._save_private_file(filename, contents)
2246        with open(filename, 'r') as f:
2247            stored_contents = f.read()
2248        self.assertEqual(stored_contents, contents_str)
2249
2250        stat_mode = os.stat(filename).st_mode
2251        # Octal 777, only last 3 positions matter for permissions mask.
2252        stat_mode &= 0o777
2253        self.assertEqual(stat_mode, 0o600)
2254
2255    def test_new(self):
2256        filename = tempfile.mktemp()
2257        self.assertFalse(os.path.exists(filename))
2258        self._save_helper(filename)
2259
2260    def test_existing(self):
2261        filename = tempfile.mktemp()
2262        with open(filename, 'w') as f:
2263            f.write('a bunch of nonsense longer than []')
2264        self.assertTrue(os.path.exists(filename))
2265        self._save_helper(filename)
2266
2267
2268class Test__get_application_default_credential_GAE(unittest2.TestCase):
2269
2270    @mock.patch.dict('sys.modules', {
2271        'oauth2client.contrib.appengine': mock.Mock()})
2272    def test_it(self):
2273        gae_mod = sys.modules['oauth2client.contrib.appengine']
2274        gae_mod.AppAssertionCredentials = creds_kls = mock.Mock()
2275        creds_kls.return_value = object()
2276        credentials = client._get_application_default_credential_GAE()
2277        self.assertEqual(credentials, creds_kls.return_value)
2278        creds_kls.assert_called_once_with([])
2279
2280
2281class Test__get_application_default_credential_GCE(unittest2.TestCase):
2282
2283    @mock.patch.dict('sys.modules', {
2284        'oauth2client.contrib.gce': mock.Mock()})
2285    def test_it(self):
2286        gce_mod = sys.modules['oauth2client.contrib.gce']
2287        gce_mod.AppAssertionCredentials = creds_kls = mock.Mock()
2288        creds_kls.return_value = object()
2289        credentials = client._get_application_default_credential_GCE()
2290        self.assertEqual(credentials, creds_kls.return_value)
2291        creds_kls.assert_called_once_with()
2292
2293
2294class Test__require_crypto_or_die(unittest2.TestCase):
2295
2296    @mock.patch.object(client, 'HAS_CRYPTO', new=True)
2297    def test_with_crypto(self):
2298        self.assertIsNone(client._require_crypto_or_die())
2299
2300    @mock.patch.object(client, 'HAS_CRYPTO', new=False)
2301    def test_without_crypto(self):
2302        with self.assertRaises(client.CryptoUnavailableError):
2303            client._require_crypto_or_die()
2304
2305
2306class TestDeviceFlowInfo(unittest2.TestCase):
2307
2308    DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4'
2309    USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792'
2310    VER_URL = 'http://foo.bar'
2311
2312    def test_FromResponse(self):
2313        response = {
2314            'device_code': self.DEVICE_CODE,
2315            'user_code': self.USER_CODE,
2316            'verification_url': self.VER_URL,
2317        }
2318        result = client.DeviceFlowInfo.FromResponse(response)
2319        expected_result = client.DeviceFlowInfo(
2320            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None)
2321        self.assertEqual(result, expected_result)
2322
2323    def test_FromResponse_fallback_to_uri(self):
2324        response = {
2325            'device_code': self.DEVICE_CODE,
2326            'user_code': self.USER_CODE,
2327            'verification_uri': self.VER_URL,
2328        }
2329        result = client.DeviceFlowInfo.FromResponse(response)
2330        expected_result = client.DeviceFlowInfo(
2331            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, None)
2332        self.assertEqual(result, expected_result)
2333
2334    def test_FromResponse_missing_url(self):
2335        response = {
2336            'device_code': self.DEVICE_CODE,
2337            'user_code': self.USER_CODE,
2338        }
2339        with self.assertRaises(client.OAuth2DeviceCodeError):
2340            client.DeviceFlowInfo.FromResponse(response)
2341
2342    @mock.patch('oauth2client.client._UTCNOW')
2343    def test_FromResponse_with_expires_in(self, utcnow):
2344        expires_in = 23
2345        response = {
2346            'device_code': self.DEVICE_CODE,
2347            'user_code': self.USER_CODE,
2348            'verification_url': self.VER_URL,
2349            'expires_in': expires_in,
2350        }
2351        now = datetime.datetime(1999, 1, 1, 12, 30, 27)
2352        expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in)
2353        utcnow.return_value = now
2354
2355        result = client.DeviceFlowInfo.FromResponse(response)
2356        expected_result = client.DeviceFlowInfo(
2357            self.DEVICE_CODE, self.USER_CODE, None, self.VER_URL, expire)
2358        self.assertEqual(result, expected_result)
2359