1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import json
6import logging
7import os
8
9from telemetry.core import util
10from telemetry.internal.backends import codepen_credentials_backend
11from telemetry.internal.backends import facebook_credentials_backend
12from telemetry.internal.backends import google_credentials_backend
13from telemetry.testing import options_for_unittests
14
15
16class CredentialsError(Exception):
17  """Error that can be thrown when logging in."""
18
19
20class BrowserCredentials(object):
21  def __init__(self, backends=None):
22    self._credentials = {}
23    self._credentials_path = None
24    self._extra_credentials = {}
25
26    if backends is None:
27      backends = [
28        codepen_credentials_backend.CodePenCredentialsBackend(),
29        facebook_credentials_backend.FacebookCredentialsBackend(),
30        facebook_credentials_backend.FacebookCredentialsBackend2(),
31        google_credentials_backend.GoogleCredentialsBackend(),
32        google_credentials_backend.GoogleCredentialsBackend2()]
33
34    self._backends = {}
35    for backend in backends:
36      self._backends[backend.credentials_type] = backend
37
38  def AddBackend(self, backend):
39    assert backend.credentials_type not in self._backends
40    self._backends[backend.credentials_type] = backend
41
42  def IsLoggedIn(self, credentials_type):
43    if credentials_type not in self._backends:
44      raise CredentialsError(
45          'Unrecognized credentials type: %s', credentials_type)
46    if credentials_type not in self._credentials:
47      return False
48    return self._backends[credentials_type].IsLoggedIn()
49
50  def CanLogin(self, credentials_type):
51    if credentials_type not in self._backends:
52      raise CredentialsError(
53          'Unrecognized credentials type: %s', credentials_type)
54    return credentials_type in self._credentials
55
56  def LoginNeeded(self, tab, credentials_type):
57    if credentials_type not in self._backends:
58      raise CredentialsError(
59          'Unrecognized credentials type: %s', credentials_type)
60    if credentials_type not in self._credentials:
61      return False
62    runner = tab.action_runner
63    return self._backends[credentials_type].LoginNeeded(
64      tab, runner, self._credentials[credentials_type])
65
66  def LoginNoLongerNeeded(self, tab, credentials_type):
67    assert credentials_type in self._backends
68    self._backends[credentials_type].LoginNoLongerNeeded(tab)
69
70  @property
71  def credentials_path(self):
72    return self._credentials_path
73
74  @credentials_path.setter
75  def credentials_path(self, credentials_path):
76    self._credentials_path = credentials_path
77    self._RebuildCredentials()
78
79  def Add(self, credentials_type, data):
80    if credentials_type not in self._extra_credentials:
81      self._extra_credentials[credentials_type] = {}
82    for k, v in data.items():
83      assert k not in self._extra_credentials[credentials_type]
84      self._extra_credentials[credentials_type][k] = v
85    self._RebuildCredentials()
86
87  def _ResetLoggedInState(self):
88    """Makes the backends think we're not logged in even though we are.
89    Should only be used in unit tests to simulate --dont-override-profile.
90    """
91    for backend in self._backends.keys():
92      # pylint: disable=protected-access
93      self._backends[backend]._ResetLoggedInState()
94
95  def _RebuildCredentials(self):
96    credentials = {}
97    if self._credentials_path == None:
98      pass
99    elif os.path.exists(self._credentials_path):
100      with open(self._credentials_path, 'r') as f:
101        credentials = json.loads(f.read())
102
103    # TODO(nduca): use system keychain, if possible.
104    homedir_credentials_path = os.path.expanduser('~/.telemetry-credentials')
105    homedir_credentials = {}
106
107    if (not options_for_unittests.GetCopy() and
108        os.path.exists(homedir_credentials_path)):
109      logging.info("Found ~/.telemetry-credentials. Its contents will be used "
110                   "when no other credentials can be found.")
111      with open(homedir_credentials_path, 'r') as f:
112        homedir_credentials = json.loads(f.read())
113
114    self._credentials = {}
115    all_keys = set(credentials.keys()).union(
116      homedir_credentials.keys()).union(
117      self._extra_credentials.keys())
118
119    for k in all_keys:
120      if k in credentials:
121        self._credentials[k] = credentials[k]
122      if k in homedir_credentials:
123        logging.info("Will use ~/.telemetry-credentials for %s logins." % k)
124        self._credentials[k] = homedir_credentials[k]
125      if k in self._extra_credentials:
126        self._credentials[k] = self._extra_credentials[k]
127
128  def WarnIfMissingCredentials(self, page):
129    if page.credentials and not self.CanLogin(page.credentials):
130      files_to_tweak = []
131      if page.credentials_path:
132        files_to_tweak.append(page.credentials_path)
133      files_to_tweak.append('~/.telemetry-credentials')
134
135      example_credentials_file = os.path.join(
136          util.GetTelemetryDir(), 'examples', 'credentials_example.json')
137
138      logging.warning("""
139        Credentials for %s were not found. page %s will not be tested.
140
141        To fix this, either follow the instructions to authenticate to gsutil
142        here:
143        http://www.chromium.org/developers/telemetry/upload_to_cloud_storage,
144
145        or add your own credentials to:
146            %s
147        An example credentials file you can copy from is here:
148            %s\n""" % (page.credentials, page, ' or '.join(files_to_tweak),
149                       example_credentials_file))
150