# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for telemetry.wpr.archive_info.WprArchiveInfo."""
import copy
import io
import json
import os
import shutil
import tempfile
import unittest

from py_utils import cloud_storage  # pylint: disable=import-error

from telemetry.page import page
from telemetry.testing import system_stub
from telemetry.wpr import archive_info

try:
  # Python 2: matches both str and unicode.
  _STRING_TYPES = basestring  # pylint: disable=undefined-variable
except NameError:
  # Python 3 has no basestring.
  _STRING_TYPES = str


class MockPage(page.Page):
  """Page whose platform-specific flag can be set directly by the test."""

  def __init__(self, url, name=None, platform_specific=False):
    super(MockPage, self).__init__(url, None, name=name)
    self._platform_specific = platform_specific


page1 = MockPage('http://www.foo.com/', 'Foo')
page2 = MockPage('http://www.bar.com/', 'Bar', True)
page3 = MockPage('http://www.baz.com/', platform_specific=True)
pageNew1 = MockPage('http://www.new.com/', 'New')
pageNew2 = MockPage('http://www.newer.com/', 'Newer', True)
recording1 = 'data_001.wpr'
recording2 = 'data_002.wpr'
recording3 = 'data_003.wpr'
recording4 = 'data_004.wpr'
recording5 = 'data_005.wpr'
_DEFAULT_PLATFORM = archive_info._DEFAULT_PLATFORM

default_archives_info_contents_dict = {
    "platform_specific": True,
    "archives": {
        "Foo": {
            _DEFAULT_PLATFORM: recording1
        },
        "Bar": {
            _DEFAULT_PLATFORM: recording2
        },
        "http://www.baz.com/": {
            _DEFAULT_PLATFORM: recording1,
            "win": recording2,
            "mac": recording3,
            "linux": recording4,
            "android": recording5
        }
    }
}

default_archive_info_contents = json.dumps(default_archives_info_contents_dict)
# The .wpr files created next to the metadata file by createArchiveInfo().
default_wpr_files = [
    recording1, recording2, recording3, recording4, recording5]
_BASE_ARCHIVE = {
    u'platform_specific': True,
    u'description': (u'Describes the Web Page Replay archives for a'
                     u' story set. Don\'t edit by hand! Use record_wpr for'
                     u' updating.'),
    u'archives': {},
}


class WprArchiveInfoTest(unittest.TestCase):
  """Tests WprArchiveInfo against a temp directory and a stubbed cloud_storage.
  """

  if not hasattr(unittest.TestCase, 'assertItemsEqual'):
    # Python 3 renamed assertItemsEqual to assertCountEqual.
    assertItemsEqual = unittest.TestCase.assertCountEqual

  def setUp(self):
    self.tmp_dir = tempfile.mkdtemp()
    # Set file for the metadata.
    self.story_set_archive_info_file = os.path.join(
        self.tmp_dir, 'info.json')
    self.overrides = system_stub.Override(archive_info, ['cloud_storage'])

  def tearDown(self):
    # Restore the stubbed module even if removing the temp directory fails, so
    # a cleanup error cannot leak the stub into other tests.
    try:
      shutil.rmtree(self.tmp_dir)
    finally:
      self.overrides.Restore()

  def createArchiveInfo(
      self, archive_data=default_archive_info_contents,
      cloud_storage_bucket=cloud_storage.PUBLIC_BUCKET, wpr_files=None):
    """Writes the metadata and .wpr fixtures, then loads a WprArchiveInfo.

    Args:
      archive_data: string written to the metadata file (and used as the
          content of every fixture .wpr file).
      cloud_storage_bucket: bucket passed through to WprArchiveInfo.FromFile.
      wpr_files: list of file names to create in the temp directory; defaults
          to default_wpr_files.

    Returns:
      The result of archive_info.WprArchiveInfo.FromFile for the metadata file.
    """
    # Cannot set lists as a default parameter, so doing it this way.
    if wpr_files is None:
      wpr_files = default_wpr_files

    with open(self.story_set_archive_info_file, 'w') as f:
      f.write(archive_data)

    assert isinstance(wpr_files, list)
    for wpr_file in wpr_files:
      assert isinstance(wpr_file, _STRING_TYPES)
      with open(os.path.join(self.tmp_dir, wpr_file), 'w') as f:
        f.write(archive_data)
    return archive_info.WprArchiveInfo.FromFile(
        self.story_set_archive_info_file, cloud_storage_bucket)

  def assertWprFileDoesNotExist(self, file_name):
    """Asserts neither the .wpr file nor its .sha1 file is in the temp dir."""
    sha_file = file_name + '.sha1'
    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, file_name)))

  def assertWprFileDoesExist(self, file_name):
    """Asserts both the .wpr file and its .sha1 file are in the temp dir."""
    sha_file = file_name + '.sha1'
    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, file_name)))

  def assertNoTrailingSpaces(self, file_path):
    """Asserts that no line of the given text file ends with a space."""
    # io.open translates newlines universally on both Python 2 and 3; the
    # legacy 'rU' mode of open() is rejected by Python 3.11+.
    with io.open(file_path, 'r') as f:
      for line in f:
        self.assertFalse(line.rstrip('\n').endswith(' '))

  def testInitNotPlatformSpecific(self):
    # createArchiveInfo() writes the '{}' metadata itself before loading it.
    with self.assertRaises(AssertionError):
      self.createArchiveInfo(archive_data='{}')

  def testDownloadArchivesIfNeededAllNeeded(self):
    test_archive_info = self.createArchiveInfo()
    cloud_storage_stub = self.overrides.cloud_storage
    # Every remote hash differs from the local one, so all four listed
    # recordings are expected to be fetched.
    cloud_storage_stub.SetRemotePathsForTesting(
        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
                                       recording2: "dummyhash2_old",
                                       recording3: "dummyhash3_old",
                                       recording4: "dummyhash4_old"}})
    cloud_storage_stub.SetCalculatedHashesForTesting(
        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
         os.path.join(self.tmp_dir, recording2): "dummyhash2",
         os.path.join(self.tmp_dir, recording3): "dummyhash3",
         os.path.join(self.tmp_dir, recording4): "dummyhash4"})

    test_archive_info.DownloadArchivesIfNeeded()
    self.assertItemsEqual(cloud_storage_stub.downloaded_files,
                          [recording1, recording2, recording3, recording4])

  def testDownloadArchivesIfNeededOneNeeded(self):
    test_archive_info = self.createArchiveInfo()
    cloud_storage_stub = self.overrides.cloud_storage
    # Only the first hash doesn't match, so only recording1 is expected to be
    # fetched.
    cloud_storage_stub.SetRemotePathsForTesting(
        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
                                       recording2: "dummyhash2",
                                       recording3: "dummyhash3",
                                       recording4: "dummyhash4"}})
    cloud_storage_stub.SetCalculatedHashesForTesting(
        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
         os.path.join(self.tmp_dir, recording2): "dummyhash2",
         os.path.join(self.tmp_dir, recording3): "dummyhash3",
         os.path.join(self.tmp_dir, recording4): "dummyhash4"})
    test_archive_info.DownloadArchivesIfNeeded()
    self.assertItemsEqual(cloud_storage_stub.downloaded_files, [recording1])

  def testDownloadArchivesIfNeededNonDefault(self):
    data = {
        'platform_specific': True,
        'archives': {
            'http://www.baz.com/': {
                _DEFAULT_PLATFORM: 'data_001.wpr',
                'win': 'data_002.wpr',
                'linux': 'data_004.wpr',
                'mac': 'data_003.wpr',
                'android': 'data_005.wpr'},
            'Foo': {_DEFAULT_PLATFORM: 'data_003.wpr'},
            'Bar': {_DEFAULT_PLATFORM: 'data_002.wpr'}
        }
    }
    test_archive_info = self.createArchiveInfo(
        archive_data=json.dumps(data, separators=(',', ': ')))
    cloud_storage_stub = self.overrides.cloud_storage
    # The remote hashes of recording1 and recording4 differ from the local
    # ones; exactly those two are expected to be fetched for 'linux'.
    cloud_storage_stub.SetRemotePathsForTesting(
        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
                                       recording2: "dummyhash2",
                                       recording3: "dummyhash3",
                                       recording4: "dummyhash4_old"}})
    cloud_storage_stub.SetCalculatedHashesForTesting(
        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
         os.path.join(self.tmp_dir, recording2): "dummyhash2",
         os.path.join(self.tmp_dir, recording3): "dummyhash3",
         os.path.join(self.tmp_dir, recording4): "dummyhash4"})
    test_archive_info.DownloadArchivesIfNeeded(target_platforms=['linux'])
    self.assertItemsEqual(cloud_storage_stub.downloaded_files,
                          [recording1, recording4])

  def testDownloadArchivesIfNeededNoBucket(self):
    test_archive_info = self.createArchiveInfo(cloud_storage_bucket=None)
    cloud_storage_stub = self.overrides.cloud_storage
    # recording4's remote hash differs, but with no bucket configured nothing
    # is expected to be downloaded.
    cloud_storage_stub.SetRemotePathsForTesting(
        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1",
                                       recording2: "dummyhash2",
                                       recording3: "dummyhash3",
                                       recording4: "dummyhash4_old"}})
    cloud_storage_stub.SetCalculatedHashesForTesting(
        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
         os.path.join(self.tmp_dir, recording2): "dummyhash2",
         os.path.join(self.tmp_dir, recording3): "dummyhash3",
         os.path.join(self.tmp_dir, recording4): "dummyhash4"})
    test_archive_info.DownloadArchivesIfNeeded()
    self.assertItemsEqual(cloud_storage_stub.downloaded_files, [])

  def testWprFilePathForStoryDefault(self):
    test_archive_info = self.createArchiveInfo()
    self.assertEqual(
        test_archive_info.WprFilePathForStory(page1),
        os.path.join(self.tmp_dir, recording1))
    self.assertEqual(
        test_archive_info.WprFilePathForStory(page2),
        os.path.join(self.tmp_dir, recording2))
    self.assertEqual(
        test_archive_info.WprFilePathForStory(page3),
        os.path.join(self.tmp_dir, recording1))

  def testWprFilePathForStoryMac(self):
    test_archive_info = self.createArchiveInfo()
    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'mac'),
                     os.path.join(self.tmp_dir, recording1))
    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'mac'),
                     os.path.join(self.tmp_dir, recording2))
    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'mac'),
                     os.path.join(self.tmp_dir, recording3))

  def testWprFilePathForStoryWin(self):
    test_archive_info = self.createArchiveInfo()
    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'win'),
                     os.path.join(self.tmp_dir, recording1))
    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'win'),
                     os.path.join(self.tmp_dir, recording2))
    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'win'),
                     os.path.join(self.tmp_dir, recording2))

  def testWprFilePathForStoryAndroid(self):
    test_archive_info = self.createArchiveInfo()
    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'android'),
                     os.path.join(self.tmp_dir, recording1))
    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'android'),
                     os.path.join(self.tmp_dir, recording2))
    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'android'),
                     os.path.join(self.tmp_dir, recording5))

  def testWprFilePathForStoryLinux(self):
    test_archive_info = self.createArchiveInfo()
    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'linux'),
                     os.path.join(self.tmp_dir, recording1))
    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'linux'),
                     os.path.join(self.tmp_dir, recording2))
    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'linux'),
                     os.path.join(self.tmp_dir, recording4))

  def testWprFilePathForStoryBadStory(self):
    test_archive_info = self.createArchiveInfo()
    self.assertIsNone(test_archive_info.WprFilePathForStory(pageNew1))

  def testAddRecordedStoriesNoStories(self):
    test_archive_info = self.createArchiveInfo()
    # Deep copy: a shallow copy would share the nested 'archives' dicts with
    # the live object, so in-place mutations could never be detected below.
    old_data = copy.deepcopy(test_archive_info._data)
    test_archive_info.AddNewTemporaryRecording()
    test_archive_info.AddRecordedStories(None)
    self.assertDictEqual(old_data, test_archive_info._data)

  def testAddRecordedStoriesDefault(self):
    test_archive_info = self.createArchiveInfo()
    self.assertWprFileDoesNotExist('data_006.wpr')

    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
    hash_dictionary = {expected_archive_file_path: 'filehash'}
    cloud_storage_stub = self.overrides.cloud_storage
    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)

    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')

    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
    test_archive_info.AddRecordedStories([page2, page3])

    with open(self.story_set_archive_info_file, 'r') as f:
      archive_file_contents = json.load(f)

    expected_archive_contents = _BASE_ARCHIVE.copy()
    expected_archive_contents['archives'] = {
        page1.display_name: {
            _DEFAULT_PLATFORM: recording1
        },
        page2.display_name: {
            _DEFAULT_PLATFORM: 'data_006.wpr'
        },
        page3.display_name: {
            _DEFAULT_PLATFORM: u'data_006.wpr',
            'linux': recording4,
            'mac': recording3,
            'win': recording2,
            'android': recording5
        }
    }

    self.assertDictEqual(expected_archive_contents, archive_file_contents)
    # Ensure the saved JSON does not contain trailing spaces.
    self.assertNoTrailingSpaces(self.story_set_archive_info_file)
    self.assertWprFileDoesExist('data_006.wpr')

  def testAddRecordedStoriesNotDefault(self):
    test_archive_info = self.createArchiveInfo()
    self.assertWprFileDoesNotExist('data_006.wpr')
    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
    hash_dictionary = {expected_archive_file_path: 'filehash'}
    cloud_storage_stub = self.overrides.cloud_storage
    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)

    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')
    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
    test_archive_info.AddRecordedStories([page2, page3],
                                         target_platform='android')

    with open(self.story_set_archive_info_file, 'r') as f:
      archive_file_contents = json.load(f)

    expected_archive_contents = _BASE_ARCHIVE.copy()
    expected_archive_contents['archives'] = {
        page1.display_name: {
            _DEFAULT_PLATFORM: recording1
        },
        page2.display_name: {
            _DEFAULT_PLATFORM: recording2,
            'android': 'data_006.wpr'
        },
        page3.display_name: {
            _DEFAULT_PLATFORM: recording1,
            'linux': recording4,
            'mac': recording3,
            'win': recording2,
            'android': 'data_006.wpr'
        },
    }

    self.assertDictEqual(expected_archive_contents, archive_file_contents)
    # Ensure the saved JSON does not contain trailing spaces.
    self.assertNoTrailingSpaces(self.story_set_archive_info_file)
    self.assertWprFileDoesExist('data_006.wpr')

  def testAddRecordedStoriesNewPage(self):
    test_archive_info = self.createArchiveInfo()
    self.assertWprFileDoesNotExist('data_006.wpr')
    self.assertWprFileDoesNotExist('data_007.wpr')
    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    expected_archive_file_path1 = os.path.join(self.tmp_dir, 'data_006.wpr')
    expected_archive_file_path2 = os.path.join(self.tmp_dir, 'data_007.wpr')
    hash_dictionary = {
        expected_archive_file_path1: 'filehash',
        expected_archive_file_path2: 'filehash2'
    }
    cloud_storage_stub = self.overrides.cloud_storage
    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)

    with open(new_temp_recording, 'w') as f:
      f.write('wpr data')
    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
    test_archive_info.AddRecordedStories([pageNew1])

    with open(new_temp_recording, 'w') as f:
      f.write('wpr data2')

    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
    test_archive_info.AddRecordedStories([pageNew2], target_platform='android')

    with open(self.story_set_archive_info_file, 'r') as f:
      archive_file_contents = json.load(f)

    expected_archive_contents = _BASE_ARCHIVE.copy()
    expected_archive_contents['archives'] = {
        page1.display_name: {
            _DEFAULT_PLATFORM: recording1
        },
        page2.display_name: {
            _DEFAULT_PLATFORM: recording2,
        },
        page3.display_name: {
            _DEFAULT_PLATFORM: recording1,
            'linux': recording4,
            'mac': recording3,
            'win': recording2,
            'android': recording5
        },
        pageNew1.display_name: {
            _DEFAULT_PLATFORM: 'data_006.wpr'
        },
        pageNew2.display_name: {
            _DEFAULT_PLATFORM: 'data_007.wpr',
            'android': 'data_007.wpr'
        }
    }

    self.assertDictEqual(expected_archive_contents, archive_file_contents)
    # Ensure the saved JSON does not contain trailing spaces.
    self.assertNoTrailingSpaces(self.story_set_archive_info_file)
    self.assertWprFileDoesExist('data_006.wpr')
    self.assertWprFileDoesExist('data_007.wpr')