1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import json 6import logging 7import os 8 9from telemetry.core import util 10from telemetry.internal.backends import codepen_credentials_backend 11from telemetry.internal.backends import facebook_credentials_backend 12from telemetry.internal.backends import google_credentials_backend 13from telemetry.testing import options_for_unittests 14 15 16class CredentialsError(Exception): 17 """Error that can be thrown when logging in.""" 18 19 20class BrowserCredentials(object): 21 def __init__(self, backends=None): 22 self._credentials = {} 23 self._credentials_path = None 24 self._extra_credentials = {} 25 26 if backends is None: 27 backends = [ 28 codepen_credentials_backend.CodePenCredentialsBackend(), 29 facebook_credentials_backend.FacebookCredentialsBackend(), 30 facebook_credentials_backend.FacebookCredentialsBackend2(), 31 google_credentials_backend.GoogleCredentialsBackend(), 32 google_credentials_backend.GoogleCredentialsBackend2()] 33 34 self._backends = {} 35 for backend in backends: 36 self._backends[backend.credentials_type] = backend 37 38 def AddBackend(self, backend): 39 assert backend.credentials_type not in self._backends 40 self._backends[backend.credentials_type] = backend 41 42 def IsLoggedIn(self, credentials_type): 43 if credentials_type not in self._backends: 44 raise CredentialsError( 45 'Unrecognized credentials type: %s', credentials_type) 46 if credentials_type not in self._credentials: 47 return False 48 return self._backends[credentials_type].IsLoggedIn() 49 50 def CanLogin(self, credentials_type): 51 if credentials_type not in self._backends: 52 raise CredentialsError( 53 'Unrecognized credentials type: %s', credentials_type) 54 return credentials_type in self._credentials 55 56 def LoginNeeded(self, tab, credentials_type): 57 if credentials_type not in self._backends: 58 raise CredentialsError( 59 'Unrecognized credentials type: %s', credentials_type) 60 if credentials_type not in self._credentials: 61 return False 62 runner = tab.action_runner 63 return self._backends[credentials_type].LoginNeeded( 64 tab, runner, self._credentials[credentials_type]) 65 66 def LoginNoLongerNeeded(self, tab, credentials_type): 67 assert credentials_type in self._backends 68 self._backends[credentials_type].LoginNoLongerNeeded(tab) 69 70 @property 71 def credentials_path(self): 72 return self._credentials_path 73 74 @credentials_path.setter 75 def credentials_path(self, credentials_path): 76 self._credentials_path = credentials_path 77 self._RebuildCredentials() 78 79 def Add(self, credentials_type, data): 80 if credentials_type not in self._extra_credentials: 81 self._extra_credentials[credentials_type] = {} 82 for k, v in data.items(): 83 assert k not in self._extra_credentials[credentials_type] 84 self._extra_credentials[credentials_type][k] = v 85 self._RebuildCredentials() 86 87 def _ResetLoggedInState(self): 88 """Makes the backends think we're not logged in even though we are. 89 Should only be used in unit tests to simulate --dont-override-profile. 90 """ 91 for backend in self._backends.keys(): 92 # pylint: disable=protected-access 93 self._backends[backend]._ResetLoggedInState() 94 95 def _RebuildCredentials(self): 96 credentials = {} 97 if self._credentials_path == None: 98 pass 99 elif os.path.exists(self._credentials_path): 100 with open(self._credentials_path, 'r') as f: 101 credentials = json.loads(f.read()) 102 103 # TODO(nduca): use system keychain, if possible. 104 homedir_credentials_path = os.path.expanduser('~/.telemetry-credentials') 105 homedir_credentials = {} 106 107 if (not options_for_unittests.GetCopy() and 108 os.path.exists(homedir_credentials_path)): 109 logging.info("Found ~/.telemetry-credentials. Its contents will be used " 110 "when no other credentials can be found.") 111 with open(homedir_credentials_path, 'r') as f: 112 homedir_credentials = json.loads(f.read()) 113 114 self._credentials = {} 115 all_keys = set(credentials.keys()).union( 116 homedir_credentials.keys()).union( 117 self._extra_credentials.keys()) 118 119 for k in all_keys: 120 if k in credentials: 121 self._credentials[k] = credentials[k] 122 if k in homedir_credentials: 123 logging.info("Will use ~/.telemetry-credentials for %s logins." % k) 124 self._credentials[k] = homedir_credentials[k] 125 if k in self._extra_credentials: 126 self._credentials[k] = self._extra_credentials[k] 127 128 def WarnIfMissingCredentials(self, page): 129 if page.credentials and not self.CanLogin(page.credentials): 130 files_to_tweak = [] 131 if page.credentials_path: 132 files_to_tweak.append(page.credentials_path) 133 files_to_tweak.append('~/.telemetry-credentials') 134 135 example_credentials_file = os.path.join( 136 util.GetTelemetryDir(), 'examples', 'credentials_example.json') 137 138 logging.warning(""" 139 Credentials for %s were not found. page %s will not be tested. 140 141 To fix this, either follow the instructions to authenticate to gsutil 142 here: 143 http://www.chromium.org/developers/telemetry/upload_to_cloud_storage, 144 145 or add your own credentials to: 146 %s 147 An example credentials file you can copy from is here: 148 %s\n""" % (page.credentials, page, ' or '.join(files_to_tweak), 149 example_credentials_file)) 150