1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Helper functions used in multiple unit tests."""
6
7import base64
8import json
9import mock
10import os
11import re
12import unittest
13import urllib
14
15from google.appengine.api import users
16from google.appengine.ext import deferred
17from google.appengine.ext import ndb
18from google.appengine.ext import testbed
19
20from dashboard import rietveld_service
21from dashboard import stored_object
22from dashboard import utils
23from dashboard.models import graph_data
24
# Parent of the directory holding this file. It is handed to the task queue
# stub as root_path in TestCase.setUp (presumably where queue.yaml lives --
# confirm against the repository layout).
_QUEUE_YAML_DIR = os.path.join(os.path.dirname(__file__), os.pardir)
26
27
class FakeRequestObject(object):
  """Stand-in for a request object, for use by datastore_hooks mocks.

  Attributes:
    registry: A dict that starts out empty; each instance gets its own.
    remote_addr: The value passed to the constructor (None by default).
  """

  def __init__(self, remote_addr=None):
    self.remote_addr = remote_addr
    self.registry = dict()
34
35
class FakeResponseObject(object):
  """Stand-in for a response, suitable as the return value of urlfetch mocks.

  Attributes:
    status_code: The status code given to the constructor.
    content: The content given to the constructor.
  """

  def __init__(self, status_code, content):
    self.status_code, self.content = status_code, content
42
43
class TestCase(unittest.TestCase):
  """Common base class for test cases.

  setUp activates an App Engine testbed with datastore, mail, memcache,
  task queue, user and urlfetch stubs, and replaces utils.IsInternalUser
  with a version that only consults cached values.

  Note that ExecuteTaskQueueTasks reads self.testapp, which this class never
  assigns; subclasses that call it must set self.testapp themselves.
  """

  def setUp(self):
    """Activates the testbed, initializes the stubs and patches utils."""
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_datastore_v3_stub()
    self.testbed.init_mail_stub()
    # Kept on the instance so that tests can reach the mail stub directly.
    self.mail_stub = self.testbed.get_stub(testbed.MAIL_SERVICE_NAME)
    self.testbed.init_memcache_stub()
    self.testbed.init_taskqueue_stub(root_path=_QUEUE_YAML_DIR)
    self.testbed.init_user_stub()
    self.testbed.init_urlfetch_stub()
    # Stays None unless a test calls PatchDatastoreHooksRequest.
    self.mock_get_request = None
    self._PatchIsInternalUser()

  def tearDown(self):
    """Deactivates the testbed that was activated in setUp."""
    self.testbed.deactivate()

  def _AddFakeRietveldConfig(self):
    """Sets up fake service account credentials for tests."""
    rietveld_service.RietveldConfig(
        id='default_rietveld_config',
        client_email='foo@bar.com',
        service_account_key='Fake Account Key',
        server_url='https://test-rietveld.appspot.com',
        internal_server_url='https://test-rietveld.appspot.com').put()

  def ExecuteTaskQueueTasks(self, handler_name, task_queue_name):
    """Executes all of the tasks on the queue until there are none left.

    Each task body is base64-decoded, URL-unquoted and POSTed to the handler
    through self.testapp, which the subclass must have set.

    Args:
      handler_name: The path that each task body is POSTed to.
      task_queue_name: The name of the task queue to drain.
    """
    task_queue = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
    # Take a snapshot and flush before running anything, so that the recursive
    # call below only sees tasks that were enqueued while handling a task.
    tasks = task_queue.GetTasks(task_queue_name)
    task_queue.FlushQueue(task_queue_name)
    for task in tasks:
      self.testapp.post(
          handler_name, urllib.unquote_plus(base64.b64decode(task['body'])))
      self.ExecuteTaskQueueTasks(handler_name, task_queue_name)

  def ExecuteDeferredTasks(self, task_queue_name):
    """Runs all deferred tasks on the queue until there are none left.

    Each task body is base64-decoded and passed to deferred.run.

    Args:
      task_queue_name: The name of the task queue to drain.
    """
    task_queue = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
    # Same snapshot-then-flush approach as in ExecuteTaskQueueTasks.
    tasks = task_queue.GetTasks(task_queue_name)
    task_queue.FlushQueue(task_queue_name)
    for task in tasks:
      deferred.run(base64.b64decode(task['body']))
      self.ExecuteDeferredTasks(task_queue_name)

  def SetCurrentUser(self, email, user_id='123456', is_admin=False):
    """Sets the user in the environment in the current testbed.

    Args:
      email: The email address to set as the current user's email.
      user_id: The user ID to set for the current user.
      is_admin: Whether the current user should be marked as an admin.
    """
    self.testbed.setup_env(
        user_is_admin=('1' if is_admin else '0'),
        user_email=email,
        user_id=user_id,
        overwrite=True)

  def UnsetCurrentUser(self):
    """Sets the user in the environment to have no email and be non-admin."""
    self.testbed.setup_env(
        user_is_admin='0', user_email='', user_id='', overwrite=True)

  def GetEmbeddedVariable(self, response, var_name):
    """Gets a variable embedded in a script element in a response.

    The first script element containing an assignment to the variable is
    used. If the variable was found but couldn't be parsed as JSON, this
    method has a side-effect of failing the test.

    Args:
      response: A webtest.TestResponse object.
      var_name: The name of the variable to fetch the value of. It is matched
          literally, so characters such as "." or "$" have no special meaning.

    Returns:
      A value obtained from de-JSON-ifying the embedded variable,
      or None if no such value could be found in the response.
    """
    # The name is escaped so that regex metacharacters in it are not
    # interpreted as pattern syntax (which could match a different variable).
    # Assume that the variable is all one line, with no line breaks.
    assignment_pattern = re.compile(
        re.escape(var_name) + r'\s*=\s*(.+);\s*$', re.MULTILINE)
    for script_element in response.html('script'):
      match = assignment_pattern.search(script_element.renderContents())
      if match:
        javascript_value = match.group(1)
        try:
          return json.loads(javascript_value)
        except ValueError:
          # self.fail always raises, so nothing after it would be reached.
          self.fail('Could not deserialize value of "%s" as JSON:\n%s' %
                    (var_name, javascript_value))
    return None

  def GetJsonValue(self, response, key):
    """Parses the response body as JSON and returns the value for a key.

    Args:
      response: An object whose body attribute is a JSON-encoded dict.
      key: The key to look up in the decoded dict.

    Returns:
      The value for the key, or None if the key is not present.
    """
    return json.loads(response.body).get(key)

  def PatchDatastoreHooksRequest(self, remote_addr=None):
    """This patches the request object to allow IP address to be set.

    It should be used by tests which check code that does IP address checking
    through datastore_hooks. The patch is undone automatically when the test
    finishes.

    Args:
      remote_addr: The address that the fake request should report.
    """
    get_request_patcher = mock.patch(
        'webapp2.get_request',
        mock.MagicMock(return_value=FakeRequestObject(remote_addr)))
    self.mock_get_request = get_request_patcher.start()
    self.addCleanup(get_request_patcher.stop)

  def _PatchIsInternalUser(self):
    """Sets up a fake version of utils.IsInternalUser to use in tests.

    This version doesn't try to make any requests to check whether the
    user is internal; it just checks for cached values and returns False
    if nothing is found. The patch is undone automatically when the test
    finishes.
    """
    def IsInternalUser():
      username = users.get_current_user()
      return bool(utils.GetCachedIsInternalUser(username))

    is_internal_user_patcher = mock.patch.object(
        utils, 'IsInternalUser', IsInternalUser)
    is_internal_user_patcher.start()
    self.addCleanup(is_internal_user_patcher.stop)
163
164
def AddTests(masters, bots, tests_dict):
  """Populates the mock datastore with Master, Bot and Test entities.

  Every bot is created under every master, and the complete tree of tests
  is created under every one of those bots.

  Args:
    masters: List of buildbot master names.
    bots: List of bot names.
    tests_dict: Nested dictionary of tests to add; keys are test names
        and values are nested dictionaries of tests to add.
  """
  for master in masters:
    master_key = graph_data.Master(id=master).put()
    for bot in bots:
      bot_key = graph_data.Bot(id=bot, parent=master_key).put()
      # The top level of the tree is created the same way as every deeper
      # level, just with the Bot key as the parent instead of a Test key.
      _AddSubtest(bot_key, tests_dict)
181
182
def _AddSubtest(parent_test_key, subtests_dict):
  """Recursively creates the Test entities described by a nested dict.

  Args:
    parent_test_key: The key to use as parent for the Tests at this level.
    subtests_dict: A dict of test names to dictionaries of subtests.
  """
  for child_name in subtests_dict:
    child_test = graph_data.Test(id=child_name, parent=parent_test_key)
    child_key = child_test.put()
    # Descend into this child's own subtests before moving to its sibling.
    _AddSubtest(child_key, subtests_dict[child_name])
193
194
def AddRows(test_path, rows):
  """Puts Row entities for the Test with the given test path.

  The Rows are created under the key that utils.GetTestContainerKey returns
  for the Test key.

  Args:
    test_path: Full test path of Test entity to add Rows to.
    rows: Either a dict mapping ID (revision) to properties, or a set of IDs.

  Returns:
    The list of Row entities which have been put, in order by ID.
  """
  container_key = utils.GetTestContainerKey(utils.TestKey(test_path))
  # A dict carries per-row properties; anything else is treated as bare IDs.
  if isinstance(rows, dict):
    add_rows = _AddRowsFromDict
  else:
    add_rows = _AddRowsFromIterable
  return add_rows(container_key, rows)
210
211
def _AddRowsFromDict(container_key, row_dict):
  """Puts one Row per entry of a dict mapping revision to Row properties.

  Args:
    container_key: The key to use as the parent of every Row.
    row_dict: A dict mapping Row ID to a dict of keyword arguments for Row.

  Returns:
    The list of Row entities which have been put, in order by ID.
  """
  new_rows = [
      graph_data.Row(id=revision, parent=container_key, **row_dict[revision])
      for revision in sorted(row_dict)
  ]
  ndb.put_multi(new_rows)
  return new_rows
220
221
def _AddRowsFromIterable(container_key, row_ids):
  """Puts one Row per ID, using the ID itself as the Row's value.

  Args:
    container_key: The key to use as the parent of every Row.
    row_ids: An iterable of ID numbers.

  Returns:
    The list of Row entities which have been put, in order by ID.
  """
  new_rows = [
      graph_data.Row(id=revision, parent=container_key, value=revision)
      for revision in sorted(row_ids)
  ]
  ndb.put_multi(new_rows)
  return new_rows
229
230
def SetIsInternalUser(user, is_internal_user):
  """Sets the cached "is internal user" value for the given user.

  This is the value that the fake IsInternalUser installed by
  TestCase._PatchIsInternalUser reads back through
  utils.GetCachedIsInternalUser.

  Args:
    user: The user to set the cached value for; passed straight through to
        utils.SetCachedIsInternalUser (presumably an email address or a
        users.User -- confirm against utils).
    is_internal_user: The value to cache for that user.
  """
  utils.SetCachedIsInternalUser(user, is_internal_user)
234
235
def SetSheriffDomains(domains):
  """Stores the given sheriff domains under utils.SHERIFF_DOMAINS_KEY.

  Args:
    domains: The value to store with stored_object.Set (presumably a list of
        domain names -- confirm against the code that reads this key).
  """
  stored_object.Set(utils.SHERIFF_DOMAINS_KEY, domains)
239
240
def SetIpWhitelist(ip_addresses):
  """Sets the list of whitelisted IP addresses.

  Args:
    ip_addresses: The value to store under utils.IP_WHITELIST_KEY with
        stored_object.Set.
  """
  stored_object.Set(utils.IP_WHITELIST_KEY, ip_addresses)
244