1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import json 6import logging 7import os 8 9from telemetry.core import util 10from telemetry.internal.backends import codepen_credentials_backend 11from telemetry.internal.backends import facebook_credentials_backend 12from telemetry.internal.backends import google_credentials_backend 13from telemetry.testing import options_for_unittests 14 15 16class CredentialsError(Exception): 17 """Error that can be thrown when logging in.""" 18 19 20class BrowserCredentials(object): 21 def __init__(self, backends=None): 22 self._credentials = {} 23 self._credentials_path = None 24 self._extra_credentials = {} 25 26 if backends is None: 27 backends = [ 28 codepen_credentials_backend.CodePenCredentialsBackend(), 29 facebook_credentials_backend.FacebookCredentialsBackend(), 30 facebook_credentials_backend.FacebookCredentialsBackend2(), 31 google_credentials_backend.GoogleCredentialsBackend(), 32 google_credentials_backend.GoogleCredentialsBackend2()] 33 34 self._backends = {} 35 for backend in backends: 36 self._backends[backend.credentials_type] = backend 37 38 def AddBackend(self, backend): 39 assert backend.credentials_type not in self._backends 40 self._backends[backend.credentials_type] = backend 41 42 def IsLoggedIn(self, credentials_type): 43 if credentials_type not in self._backends: 44 raise CredentialsError( 45 'Unrecognized credentials type: %s', credentials_type) 46 if credentials_type not in self._credentials: 47 return False 48 return self._backends[credentials_type].IsLoggedIn() 49 50 def CanLogin(self, credentials_type): 51 if credentials_type not in self._backends: 52 raise CredentialsError( 53 'Unrecognized credentials type: %s', credentials_type) 54 return credentials_type in self._credentials 55 56 def LoginNeeded(self, tab, credentials_type): 57 if credentials_type not in self._backends: 58 raise CredentialsError( 59 'Unrecognized credentials type: %s', credentials_type) 60 if credentials_type not in self._credentials: 61 return False 62 from telemetry.page import action_runner 63 runner = action_runner.ActionRunner(tab) 64 return self._backends[credentials_type].LoginNeeded( 65 tab, runner, self._credentials[credentials_type]) 66 67 def LoginNoLongerNeeded(self, tab, credentials_type): 68 assert credentials_type in self._backends 69 self._backends[credentials_type].LoginNoLongerNeeded(tab) 70 71 @property 72 def credentials_path(self): 73 return self._credentials_path 74 75 @credentials_path.setter 76 def credentials_path(self, credentials_path): 77 self._credentials_path = credentials_path 78 self._RebuildCredentials() 79 80 def Add(self, credentials_type, data): 81 if credentials_type not in self._extra_credentials: 82 self._extra_credentials[credentials_type] = {} 83 for k, v in data.items(): 84 assert k not in self._extra_credentials[credentials_type] 85 self._extra_credentials[credentials_type][k] = v 86 self._RebuildCredentials() 87 88 def _ResetLoggedInState(self): 89 """Makes the backends think we're not logged in even though we are. 90 Should only be used in unit tests to simulate --dont-override-profile. 91 """ 92 for backend in self._backends.keys(): 93 # pylint: disable=protected-access 94 self._backends[backend]._ResetLoggedInState() 95 96 def _RebuildCredentials(self): 97 credentials = {} 98 if self._credentials_path == None: 99 pass 100 elif os.path.exists(self._credentials_path): 101 with open(self._credentials_path, 'r') as f: 102 credentials = json.loads(f.read()) 103 104 # TODO(nduca): use system keychain, if possible. 105 homedir_credentials_path = os.path.expanduser('~/.telemetry-credentials') 106 homedir_credentials = {} 107 108 if (not options_for_unittests.GetCopy() and 109 os.path.exists(homedir_credentials_path)): 110 logging.info("Found ~/.telemetry-credentials. Its contents will be used " 111 "when no other credentials can be found.") 112 with open(homedir_credentials_path, 'r') as f: 113 homedir_credentials = json.loads(f.read()) 114 115 self._credentials = {} 116 all_keys = set(credentials.keys()).union( 117 homedir_credentials.keys()).union( 118 self._extra_credentials.keys()) 119 120 for k in all_keys: 121 if k in credentials: 122 self._credentials[k] = credentials[k] 123 if k in homedir_credentials: 124 logging.info("Will use ~/.telemetry-credentials for %s logins." % k) 125 self._credentials[k] = homedir_credentials[k] 126 if k in self._extra_credentials: 127 self._credentials[k] = self._extra_credentials[k] 128 129 def WarnIfMissingCredentials(self, page): 130 if page.credentials and not self.CanLogin(page.credentials): 131 files_to_tweak = [] 132 if page.credentials_path: 133 files_to_tweak.append(page.credentials_path) 134 files_to_tweak.append('~/.telemetry-credentials') 135 136 example_credentials_file = os.path.join( 137 util.GetTelemetryDir(), 'examples', 'credentials_example.json') 138 139 logging.warning(""" 140 Credentials for %s were not found. page %s will not be tested. 141 142 To fix this, either follow the instructions to authenticate to gsutil 143 here: 144 http://www.chromium.org/developers/telemetry/upload_to_cloud_storage, 145 146 or add your own credentials to: 147 %s 148 An example credentials file you can copy from is here: 149 %s\n""" % (page.credentials, page, ' or '.join(files_to_tweak), 150 example_credentials_file)) 151