1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import json
6import logging
7import os
8
9from telemetry.core import util
10from telemetry.internal.backends import codepen_credentials_backend
11from telemetry.internal.backends import facebook_credentials_backend
12from telemetry.internal.backends import google_credentials_backend
13from telemetry.testing import options_for_unittests
14
15
class CredentialsError(Exception):
  """Signals a credentials problem, e.g. an unrecognized credentials type."""
  pass
18
19
class BrowserCredentials(object):
  """Registry of login backends and the credentials available to them.

  Credentials are merged, per credentials type, from three sources in
  increasing order of precedence:
    1. the JSON file at `credentials_path` (if set and present),
    2. `~/.telemetry-credentials` (skipped when
       `options_for_unittests.GetCopy()` returns something truthy),
    3. entries registered programmatically through `Add()`.
  """

  def __init__(self, backends=None):
    """Creates the registry.

    Args:
      backends: Optional iterable of backend objects, each exposing a
          `credentials_type` attribute. When None, the built-in CodePen,
          Facebook and Google backends are registered.
    """
    self._credentials = {}
    self._credentials_path = None
    self._extra_credentials = {}

    if backends is None:
      backends = [
          codepen_credentials_backend.CodePenCredentialsBackend(),
          facebook_credentials_backend.FacebookCredentialsBackend(),
          facebook_credentials_backend.FacebookCredentialsBackend2(),
          google_credentials_backend.GoogleCredentialsBackend(),
          google_credentials_backend.GoogleCredentialsBackend2()]

    # Later backends with the same credentials_type replace earlier ones.
    self._backends = dict(
        (backend.credentials_type, backend) for backend in backends)

  def _GetBackend(self, credentials_type):
    """Returns the backend registered for |credentials_type|.

    Raises:
      CredentialsError: if no backend handles |credentials_type|.
    """
    if credentials_type not in self._backends:
      # Interpolate explicitly: exceptions do not apply %-formatting to
      # their arguments the way logging calls do.
      raise CredentialsError(
          'Unrecognized credentials type: %s' % (credentials_type,))
    return self._backends[credentials_type]

  def AddBackend(self, backend):
    """Registers an additional backend; its type must not already exist."""
    assert backend.credentials_type not in self._backends
    self._backends[backend.credentials_type] = backend

  def IsLoggedIn(self, credentials_type):
    """Returns whether the backend reports being logged in.

    Returns False without consulting the backend when no credentials are
    available for |credentials_type|.

    Raises:
      CredentialsError: if |credentials_type| has no registered backend.
    """
    backend = self._GetBackend(credentials_type)
    if credentials_type not in self._credentials:
      return False
    return backend.IsLoggedIn()

  def CanLogin(self, credentials_type):
    """Returns True if credentials are available for |credentials_type|.

    Raises:
      CredentialsError: if |credentials_type| has no registered backend.
    """
    self._GetBackend(credentials_type)
    return credentials_type in self._credentials

  def LoginNeeded(self, tab, credentials_type):
    """Asks the backend to perform a login on |tab|.

    Returns False when no credentials are available for |credentials_type|;
    otherwise returns whatever the backend's LoginNeeded() returns.

    Raises:
      CredentialsError: if |credentials_type| has no registered backend.
    """
    backend = self._GetBackend(credentials_type)
    if credentials_type not in self._credentials:
      return False
    # Imported locally rather than at module level -- presumably to avoid an
    # import cycle with telemetry.page; confirm before hoisting.
    from telemetry.page import action_runner
    runner = action_runner.ActionRunner(tab)
    return backend.LoginNeeded(
        tab, runner, self._credentials[credentials_type])

  def LoginNoLongerNeeded(self, tab, credentials_type):
    """Tells the backend that |tab| no longer needs to be logged in."""
    assert credentials_type in self._backends
    self._backends[credentials_type].LoginNoLongerNeeded(tab)

  @property
  def credentials_path(self):
    """Path of the JSON credentials file, or None if unset."""
    return self._credentials_path

  @credentials_path.setter
  def credentials_path(self, credentials_path):
    # Changing the path invalidates the merged view, so rebuild it.
    self._credentials_path = credentials_path
    self._RebuildCredentials()

  def Add(self, credentials_type, data):
    """Registers extra credential fields for |credentials_type|.

    Args:
      credentials_type: Key under which the fields are stored.
      data: Dict of fields to add; no key may already have been added for
          this credentials type.
    """
    extra = self._extra_credentials.setdefault(credentials_type, {})
    for k, v in data.items():
      assert k not in extra
      extra[k] = v
    self._RebuildCredentials()

  def _ResetLoggedInState(self):
    """Makes the backends think we're not logged in even though we are.
    Should only be used in unit tests to simulate --dont-override-profile.
    """
    for backend in self._backends.values():
      # pylint: disable=protected-access
      backend._ResetLoggedInState()

  def _RebuildCredentials(self):
    """Recomputes self._credentials from the file, home-dir and extra sources."""
    credentials = {}
    if (self._credentials_path is not None and
        os.path.exists(self._credentials_path)):
      with open(self._credentials_path, 'r') as f:
        credentials = json.load(f)

    # TODO(nduca): use system keychain, if possible.
    homedir_credentials_path = os.path.expanduser('~/.telemetry-credentials')
    homedir_credentials = {}

    if (not options_for_unittests.GetCopy() and
        os.path.exists(homedir_credentials_path)):
      logging.info("Found ~/.telemetry-credentials. Its contents will be used "
                   "when no other credentials can be found.")
      with open(homedir_credentials_path, 'r') as f:
        homedir_credentials = json.load(f)

    # Layer the sources from lowest to highest precedence.
    # NOTE(review): despite the wording of the log message above, entries from
    # ~/.telemetry-credentials replace same-typed entries from the
    # credentials_path file. Existing precedence is kept as-is.
    merged = dict(credentials)
    for k in homedir_credentials:
      logging.info("Will use ~/.telemetry-credentials for %s logins.", k)
      merged[k] = homedir_credentials[k]
    merged.update(self._extra_credentials)
    self._credentials = merged

  def WarnIfMissingCredentials(self, page):
    """Logs a warning if |page| needs credentials that are not available.

    Args:
      page: Object exposing `credentials` (the credentials type, falsy when
          none is required) and `credentials_path`.

    Raises:
      CredentialsError: if |page.credentials| has no registered backend.
    """
    if not page.credentials or self.CanLogin(page.credentials):
      return

    files_to_tweak = []
    if page.credentials_path:
      files_to_tweak.append(page.credentials_path)
    files_to_tweak.append('~/.telemetry-credentials')

    example_credentials_file = os.path.join(
        util.GetTelemetryDir(), 'examples', 'credentials_example.json')

    logging.warning("""
        Credentials for %s were not found. page %s will not be tested.

        To fix this, either follow the instructions to authenticate to gsutil
        here:
        http://www.chromium.org/developers/telemetry/upload_to_cloud_storage,

        or add your own credentials to:
            %s
        An example credentials file you can copy from is here:
            %s\n""", page.credentials, page, ' or '.join(files_to_tweak),
                    example_credentials_file)
151