1# Copyright 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Helper functions used in multiple unit tests.""" 6 7import base64 8import json 9import mock 10import os 11import re 12import unittest 13import urllib 14 15from google.appengine.api import users 16from google.appengine.ext import deferred 17from google.appengine.ext import ndb 18from google.appengine.ext import testbed 19 20from dashboard import rietveld_service 21from dashboard import stored_object 22from dashboard import utils 23from dashboard.models import graph_data 24 25_QUEUE_YAML_DIR = os.path.join(os.path.dirname(__file__), '..') 26 27 28class FakeRequestObject(object): 29 """Fake Request object which can be used by datastore_hooks mocks.""" 30 31 def __init__(self, remote_addr=None): 32 self.registry = {} 33 self.remote_addr = remote_addr 34 35 36class FakeResponseObject(object): 37 """Fake Response Object which can be returned by urlfetch mocks.""" 38 39 def __init__(self, status_code, content): 40 self.status_code = status_code 41 self.content = content 42 43 44class TestCase(unittest.TestCase): 45 """Common base class for test cases.""" 46 47 def setUp(self): 48 self.testbed = testbed.Testbed() 49 self.testbed.activate() 50 self.testbed.init_datastore_v3_stub() 51 self.testbed.init_mail_stub() 52 self.mail_stub = self.testbed.get_stub(testbed.MAIL_SERVICE_NAME) 53 self.testbed.init_memcache_stub() 54 self.testbed.init_taskqueue_stub(root_path=_QUEUE_YAML_DIR) 55 self.testbed.init_user_stub() 56 self.testbed.init_urlfetch_stub() 57 self.mock_get_request = None 58 self._PatchIsInternalUser() 59 60 def tearDown(self): 61 self.testbed.deactivate() 62 63 def _AddFakeRietveldConfig(self): 64 """Sets up fake service account credentials for tests.""" 65 rietveld_service.RietveldConfig( 66 id='default_rietveld_config', 67 client_email='foo@bar.com', 68 service_account_key='Fake Account Key', 69 server_url='https://test-rietveld.appspot.com', 70 internal_server_url='https://test-rietveld.appspot.com').put() 71 72 def ExecuteTaskQueueTasks(self, handler_name, task_queue_name): 73 """Executes all of the tasks on the queue until there are none left.""" 74 task_queue = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME) 75 tasks = task_queue.GetTasks(task_queue_name) 76 task_queue.FlushQueue(task_queue_name) 77 for task in tasks: 78 self.testapp.post( 79 handler_name, urllib.unquote_plus(base64.b64decode(task['body']))) 80 self.ExecuteTaskQueueTasks(handler_name, task_queue_name) 81 82 def ExecuteDeferredTasks(self, task_queue_name): 83 task_queue = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME) 84 tasks = task_queue.GetTasks(task_queue_name) 85 task_queue.FlushQueue(task_queue_name) 86 for task in tasks: 87 deferred.run(base64.b64decode(task['body'])) 88 self.ExecuteDeferredTasks(task_queue_name) 89 90 def SetCurrentUser(self, email, user_id='123456', is_admin=False): 91 """Sets the user in the environment in the current testbed.""" 92 self.testbed.setup_env( 93 user_is_admin=('1' if is_admin else '0'), 94 user_email=email, 95 user_id=user_id, 96 overwrite=True) 97 98 def UnsetCurrentUser(self): 99 """Sets the user in the environment to have no email and be non-admin.""" 100 self.testbed.setup_env( 101 user_is_admin='0', user_email='', user_id='', overwrite=True) 102 103 def GetEmbeddedVariable(self, response, var_name): 104 """Gets a variable embedded in a script element in a response. 105 106 If the variable was found but couldn't be parsed as JSON, this method 107 has a side-effect of failing the test. 108 109 Args: 110 response: A webtest.TestResponse object. 111 var_name: The name of the variable to fetch the value of. 112 113 Returns: 114 A value obtained from de-JSON-ifying the embedded variable, 115 or None if no such value could be found in the response. 116 """ 117 scripts_elements = response.html('script') 118 for script_element in scripts_elements: 119 contents = script_element.renderContents() 120 # Assume that the variable is all one line, with no line breaks. 121 match = re.search(var_name + r'\s*=\s*(.+);\s*$', contents, 122 re.MULTILINE) 123 if match: 124 javascript_value = match.group(1) 125 try: 126 return json.loads(javascript_value) 127 except ValueError: 128 self.fail('Could not deserialize value of "%s" as JSON:\n%s' % 129 (var_name, javascript_value)) 130 return None 131 return None 132 133 def GetJsonValue(self, response, key): 134 return json.loads(response.body).get(key) 135 136 def PatchDatastoreHooksRequest(self, remote_addr=None): 137 """This patches the request object to allow IP address to be set. 138 139 It should be used by tests which check code that does IP address checking 140 through datastore_hooks. 141 """ 142 get_request_patcher = mock.patch( 143 'webapp2.get_request', 144 mock.MagicMock(return_value=FakeRequestObject(remote_addr))) 145 self.mock_get_request = get_request_patcher.start() 146 self.addCleanup(get_request_patcher.stop) 147 148 def _PatchIsInternalUser(self): 149 """Sets up a fake version of utils.IsInternalUser to use in tests. 150 151 This version doesn't try to make any requests to check whether the 152 user is internal; it just checks for cached values and returns False 153 if nothing is found. 154 """ 155 def IsInternalUser(): 156 username = users.get_current_user() 157 return bool(utils.GetCachedIsInternalUser(username)) 158 159 is_internal_user_patcher = mock.patch.object( 160 utils, 'IsInternalUser', IsInternalUser) 161 is_internal_user_patcher.start() 162 self.addCleanup(is_internal_user_patcher.stop) 163 164 165def AddTests(masters, bots, tests_dict): 166 """Adds data to the mock datastore. 167 168 Args: 169 masters: List of buildbot master names. 170 bots: List of bot names. 171 tests_dict: Nested dictionary of tests to add; keys are test names 172 and values are nested dictionaries of tests to add. 173 """ 174 for master_name in masters: 175 master_key = graph_data.Master(id=master_name).put() 176 for bot_name in bots: 177 bot_key = graph_data.Bot(id=bot_name, parent=master_key).put() 178 for test_name in tests_dict: 179 test_key = graph_data.Test(id=test_name, parent=bot_key).put() 180 _AddSubtest(test_key, tests_dict[test_name]) 181 182 183def _AddSubtest(parent_test_key, subtests_dict): 184 """Helper function to recursively add sub-Tests to a Test. 185 186 Args: 187 parent_test_key: A Test key. 188 subtests_dict: A dict of test names to dictionaries of subtests. 189 """ 190 for test_name in subtests_dict: 191 test_key = graph_data.Test(id=test_name, parent=parent_test_key).put() 192 _AddSubtest(test_key, subtests_dict[test_name]) 193 194 195def AddRows(test_path, rows): 196 """Adds Rows to a given test. 197 198 Args: 199 test_path: Full test path of Test entity to add Rows to. 200 rows: Either a dict mapping ID (revision) to properties, or a set of IDs. 201 202 Returns: 203 The list of Row entities which have been put, in order by ID. 204 """ 205 test_key = utils.TestKey(test_path) 206 container_key = utils.GetTestContainerKey(test_key) 207 if isinstance(rows, dict): 208 return _AddRowsFromDict(container_key, rows) 209 return _AddRowsFromIterable(container_key, rows) 210 211 212def _AddRowsFromDict(container_key, row_dict): 213 """Adds a set of Rows given a dict of revisions to properties.""" 214 rows = [] 215 for int_id in sorted(row_dict): 216 rows.append( 217 graph_data.Row(id=int_id, parent=container_key, **row_dict[int_id])) 218 ndb.put_multi(rows) 219 return rows 220 221 222def _AddRowsFromIterable(container_key, row_ids): 223 """Adds a set of Rows given an iterable of ID numbers.""" 224 rows = [] 225 for int_id in sorted(row_ids): 226 rows.append(graph_data.Row(id=int_id, parent=container_key, value=int_id)) 227 ndb.put_multi(rows) 228 return rows 229 230 231def SetIsInternalUser(user, is_internal_user): 232 """Sets the domain that users who can access internal data belong to.""" 233 utils.SetCachedIsInternalUser(user, is_internal_user) 234 235 236def SetSheriffDomains(domains): 237 """Sets the domain that users who can access internal data belong to.""" 238 stored_object.Set(utils.SHERIFF_DOMAINS_KEY, domains) 239 240 241def SetIpWhitelist(ip_addresses): 242 """Sets the list of whitelisted IP addresses.""" 243 stored_object.Set(utils.IP_WHITELIST_KEY, ip_addresses) 244