1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import json
5import os
6import shutil
7import tempfile
8import unittest
9
10from py_utils import cloud_storage  # pylint: disable=import-error
11
12from telemetry.page import page
13from telemetry.testing import system_stub
14from telemetry.wpr import archive_info
15
16
class MockPage(page.Page):
  """Page subclass whose `_platform_specific` attribute is set by the caller."""
  def __init__(self, url, name=None, platform_specific=False):
    # None is passed as the second positional argument of Page.__init__;
    # presumably that is the owning page set -- TODO confirm against page.Page.
    super(MockPage, self).__init__(url, None, name=name)
    # Assigned after the base initializer has run, so this is the value left
    # on the instance.
    self._platform_specific = platform_specific
21
# Stories shared by the tests. page3 is created without a name; pageNew1 and
# pageNew2 are not listed in the default archive metadata.
page1 = MockPage('http://www.foo.com/', name='Foo')
page2 = MockPage('http://www.bar.com/', name='Bar', platform_specific=True)
page3 = MockPage('http://www.baz.com/', platform_specific=True)
pageNew1 = MockPage('http://www.new.com/', name='New')
pageNew2 = MockPage('http://www.newer.com/', name='Newer',
                    platform_specific=True)

# File names of the recordings referenced by the fixtures.
recording1 = 'data_001.wpr'
recording2 = 'data_002.wpr'
recording3 = 'data_003.wpr'
recording4 = 'data_004.wpr'
recording5 = 'data_005.wpr'

# Used in the fixtures as the key of each story's default recording, next to
# explicit platform keys such as 'win' or 'mac'.
_DEFAULT_PLATFORM = archive_info._DEFAULT_PLATFORM
33
# Default archive metadata: 'Foo' and 'Bar' only have a default recording,
# while the baz URL additionally has one recording per named platform.
default_archives_info_contents_dict = {
    'platform_specific': True,
    'archives': {
        'Foo': {_DEFAULT_PLATFORM: recording1},
        'Bar': {_DEFAULT_PLATFORM: recording2},
        'http://www.baz.com/': {
            _DEFAULT_PLATFORM: recording1,
            'win': recording2,
            'mac': recording3,
            'linux': recording4,
            'android': recording5,
        },
    },
}
52
# JSON text of the default metadata, and the names of the recording files that
# accompany it.
default_archive_info_contents = json.dumps(default_archives_info_contents_dict)
default_wpr_files = [
    recording1, recording2, recording3, recording4, recording5]
57_BASE_ARCHIVE = {
58        u'platform_specific': True,
59        u'description': (u'Describes the Web Page Replay archives for a'
60                         u' story set. Don\'t edit by hand! Use record_wpr for'
61                         u' updating.'),
62        u'archives': {},
63}
64
65
66class WprArchiveInfoTest(unittest.TestCase):
67  def setUp(self):
68    self.tmp_dir = tempfile.mkdtemp()
69    # Set file for the metadata.
70    self.story_set_archive_info_file = os.path.join(
71        self.tmp_dir, 'info.json')
72    self.overrides = system_stub.Override(archive_info, ['cloud_storage'])
73
74  def tearDown(self):
75    shutil.rmtree(self.tmp_dir)
76    self.overrides.Restore()
77
78  def createArchiveInfo(
79      self, archive_data=default_archive_info_contents,
80      cloud_storage_bucket=cloud_storage.PUBLIC_BUCKET, wpr_files=None):
81
82    # Cannot set lists as a default parameter, so doing it this way.
83    if wpr_files is None:
84      wpr_files = default_wpr_files
85
86    with open(self.story_set_archive_info_file, 'w') as f:
87      f.write(archive_data)
88
89    assert isinstance(wpr_files, list)
90    for wpr_file in wpr_files:
91      assert isinstance(wpr_file, basestring)
92      with open(os.path.join(self.tmp_dir, wpr_file), 'w') as f:
93        f.write(archive_data)
94    return archive_info.WprArchiveInfo.FromFile(
95        self.story_set_archive_info_file, cloud_storage_bucket)
96
97  def testInitNotPlatformSpecific(self):
98    with open(self.story_set_archive_info_file, 'w') as f:
99      f.write('{}')
100    with self.assertRaises(AssertionError):
101      self.createArchiveInfo(archive_data='{}')
102
103
104  def testDownloadArchivesIfNeededAllNeeded(self):
105    test_archive_info = self.createArchiveInfo()
106    cloud_storage_stub = self.overrides.cloud_storage
107    # Second hash doesn't match, need to fetch it.
108    cloud_storage_stub.SetRemotePathsForTesting(
109        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
110                                       recording2: "dummyhash2_old",
111                                       recording3: "dummyhash3_old",
112                                       recording4: "dummyhash4_old"}})
113    cloud_storage_stub.SetCalculatedHashesForTesting(
114        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
115         os.path.join(self.tmp_dir, recording2): "dummyhash2",
116         os.path.join(self.tmp_dir, recording3): "dummyhash3",
117         os.path.join(self.tmp_dir, recording4): "dummyhash4",})
118
119    test_archive_info.DownloadArchivesIfNeeded()
120    self.assertItemsEqual(cloud_storage_stub.downloaded_files,
121                          [recording1, recording2, recording3, recording4])
122
123
124  def testDownloadArchivesIfNeededOneNeeded(self):
125    test_archive_info = self.createArchiveInfo()
126    cloud_storage_stub = self.overrides.cloud_storage
127    # Second hash doesn't match, need to fetch it.
128    cloud_storage_stub.SetRemotePathsForTesting(
129        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
130                                       recording2: "dummyhash2",
131                                       recording3: "dummyhash3",
132                                       recording4: "dummyhash4"}})
133    cloud_storage_stub.SetCalculatedHashesForTesting(
134        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
135         os.path.join(self.tmp_dir, recording2): "dummyhash2",
136         os.path.join(self.tmp_dir, recording3): "dummyhash3",
137         os.path.join(self.tmp_dir, recording4): "dummyhash4",})
138    test_archive_info.DownloadArchivesIfNeeded()
139    self.assertItemsEqual(cloud_storage_stub.downloaded_files, [recording1])
140
141  def testDownloadArchivesIfNeededNonDefault(self):
142    data = {
143        'platform_specific': True,
144        'archives': {
145            'http://www.baz.com/': {
146                _DEFAULT_PLATFORM: 'data_001.wpr',
147                'win': 'data_002.wpr',
148                'linux': 'data_004.wpr',
149                'mac': 'data_003.wpr',
150                'android': 'data_005.wpr'},
151            'Foo': {_DEFAULT_PLATFORM: 'data_003.wpr'},
152            'Bar': {_DEFAULT_PLATFORM: 'data_002.wpr'}
153        }
154    }
155    test_archive_info = self.createArchiveInfo(
156        archive_data=json.dumps(data, separators=(',', ': ')))
157    cloud_storage_stub = self.overrides.cloud_storage
158    # Second hash doesn't match, need to fetch it.
159    cloud_storage_stub.SetRemotePathsForTesting(
160        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1_old",
161                                       recording2: "dummyhash2",
162                                       recording3: "dummyhash3",
163                                       recording4: "dummyhash4_old"}})
164    cloud_storage_stub.SetCalculatedHashesForTesting(
165        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
166         os.path.join(self.tmp_dir, recording2): "dummyhash2",
167         os.path.join(self.tmp_dir, recording3): "dummyhash3",
168         os.path.join(self.tmp_dir, recording4): "dummyhash4",})
169    test_archive_info.DownloadArchivesIfNeeded(target_platforms=['linux'])
170    self.assertItemsEqual(cloud_storage_stub.downloaded_files,
171                          [recording1, recording4])
172
173  def testDownloadArchivesIfNeededNoBucket(self):
174    test_archive_info = self.createArchiveInfo(cloud_storage_bucket=None)
175    cloud_storage_stub = self.overrides.cloud_storage
176    # Second hash doesn't match, need to fetch it.
177    cloud_storage_stub.SetRemotePathsForTesting(
178        {cloud_storage.PUBLIC_BUCKET: {recording1: "dummyhash1",
179                                       recording2: "dummyhash2",
180                                       recording3: "dummyhash3",
181                                       recording4: "dummyhash4_old"}})
182    cloud_storage_stub.SetCalculatedHashesForTesting(
183        {os.path.join(self.tmp_dir, recording1): "dummyhash1",
184         os.path.join(self.tmp_dir, recording2): "dummyhash2",
185         os.path.join(self.tmp_dir, recording3): "dummyhash3",
186         os.path.join(self.tmp_dir, recording4): "dummyhash4",})
187    test_archive_info.DownloadArchivesIfNeeded()
188    self.assertItemsEqual(cloud_storage_stub.downloaded_files, [])
189
190  def testWprFilePathForStoryDefault(self):
191    test_archive_info = self.createArchiveInfo()
192    self.assertEqual(
193        test_archive_info.WprFilePathForStory(page1),
194        os.path.join(self.tmp_dir, recording1))
195    self.assertEqual(
196        test_archive_info.WprFilePathForStory(page2),
197        os.path.join(self.tmp_dir, recording2))
198    self.assertEqual(
199        test_archive_info.WprFilePathForStory(page3),
200        os.path.join(self.tmp_dir, recording1))
201
202  def testWprFilePathForStoryMac(self):
203    test_archive_info = self.createArchiveInfo()
204    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'mac'),
205                     os.path.join(self.tmp_dir, recording1))
206    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'mac'),
207                     os.path.join(self.tmp_dir, recording2))
208    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'mac'),
209                     os.path.join(self.tmp_dir, recording3))
210
211  def testWprFilePathForStoryWin(self):
212    test_archive_info = self.createArchiveInfo()
213    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'win'),
214                     os.path.join(self.tmp_dir, recording1))
215    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'win'),
216                     os.path.join(self.tmp_dir, recording2))
217    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'win'),
218                     os.path.join(self.tmp_dir, recording2))
219
220  def testWprFilePathForStoryAndroid(self):
221    test_archive_info = self.createArchiveInfo()
222    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'android'),
223                     os.path.join(self.tmp_dir, recording1))
224    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'android'),
225                     os.path.join(self.tmp_dir, recording2))
226    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'android'),
227                     os.path.join(self.tmp_dir, recording5))
228
229  def testWprFilePathForStoryLinux(self):
230    test_archive_info = self.createArchiveInfo()
231    self.assertEqual(test_archive_info.WprFilePathForStory(page1, 'linux'),
232                     os.path.join(self.tmp_dir, recording1))
233    self.assertEqual(test_archive_info.WprFilePathForStory(page2, 'linux'),
234                     os.path.join(self.tmp_dir, recording2))
235    self.assertEqual(test_archive_info.WprFilePathForStory(page3, 'linux'),
236                     os.path.join(self.tmp_dir, recording4))
237
238  def testWprFilePathForStoryBadStory(self):
239    test_archive_info = self.createArchiveInfo()
240    self.assertIsNone(test_archive_info.WprFilePathForStory(pageNew1))
241
242
243  def testAddRecordedStoriesNoStories(self):
244    test_archive_info = self.createArchiveInfo()
245    old_data = test_archive_info._data.copy()
246    test_archive_info.AddNewTemporaryRecording()
247    test_archive_info.AddRecordedStories(None)
248    self.assertDictEqual(old_data, test_archive_info._data)
249
250  def assertWprFileDoesNotExist(self, file_name):
251    sha_file = file_name + '.sha1'
252    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
253    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
254
255  def assertWprFileDoesExist(self, file_name):
256    sha_file = file_name + '.sha1'
257    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, sha_file)))
258    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, file_name)))
259
260  def testAddRecordedStoriesDefault(self):
261    test_archive_info = self.createArchiveInfo()
262    self.assertWprFileDoesNotExist('data_006.wpr')
263
264    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
265    expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
266    hash_dictionary = {expected_archive_file_path: 'filehash'}
267    cloud_storage_stub = self.overrides.cloud_storage
268    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
269
270    with open(new_temp_recording, 'w') as f:
271      f.write('wpr data')
272
273    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
274    test_archive_info.AddRecordedStories([page2, page3])
275
276    with open(self.story_set_archive_info_file, 'r') as f:
277      archive_file_contents = json.load(f)
278
279    expected_archive_contents = _BASE_ARCHIVE.copy()
280    expected_archive_contents['archives'] = {
281        page1.display_name: {
282            _DEFAULT_PLATFORM: recording1
283        },
284        page2.display_name: {
285            _DEFAULT_PLATFORM: 'data_006.wpr'
286        },
287        page3.display_name: {
288           _DEFAULT_PLATFORM: u'data_006.wpr',
289           'linux': recording4,
290           'mac': recording3,
291           'win': recording2,
292           'android': recording5
293        }
294    }
295
296    self.assertDictEqual(expected_archive_contents, archive_file_contents)
297    # Ensure the saved JSON does not contain trailing spaces.
298    with open(self.story_set_archive_info_file, 'rU') as f:
299      for line in f:
300        self.assertFalse(line.rstrip('\n').endswith(' '))
301    self.assertWprFileDoesExist('data_006.wpr')
302
303  def testAddRecordedStoriesNotDefault(self):
304    test_archive_info = self.createArchiveInfo()
305    self.assertWprFileDoesNotExist('data_006.wpr')
306    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
307    expected_archive_file_path = os.path.join(self.tmp_dir, 'data_006.wpr')
308    hash_dictionary = {expected_archive_file_path: 'filehash'}
309    cloud_storage_stub = self.overrides.cloud_storage
310    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
311
312    with open(new_temp_recording, 'w') as f:
313      f.write('wpr data')
314    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
315    test_archive_info.AddRecordedStories([page2, page3],
316                                         target_platform='android')
317
318    with open(self.story_set_archive_info_file, 'r') as f:
319      archive_file_contents = json.load(f)
320
321    expected_archive_contents = _BASE_ARCHIVE.copy()
322    expected_archive_contents['archives'] = {
323        page1.display_name: {
324            _DEFAULT_PLATFORM: recording1
325        },
326        page2.display_name: {
327            _DEFAULT_PLATFORM: recording2,
328            'android': 'data_006.wpr'
329        },
330        page3.display_name: {
331           _DEFAULT_PLATFORM: recording1,
332           'linux': recording4,
333           'mac': recording3,
334           'win': recording2,
335           'android': 'data_006.wpr'
336        },
337    }
338
339    self.assertDictEqual(expected_archive_contents, archive_file_contents)
340    # Ensure the saved JSON does not contain trailing spaces.
341    with open(self.story_set_archive_info_file, 'rU') as f:
342      for line in f:
343        self.assertFalse(line.rstrip('\n').endswith(' '))
344    self.assertWprFileDoesExist('data_006.wpr')
345
346
347
348  def testAddRecordedStoriesNewPage(self):
349    test_archive_info = self.createArchiveInfo()
350    self.assertWprFileDoesNotExist('data_006.wpr')
351    self.assertWprFileDoesNotExist('data_007.wpr')
352    new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
353    expected_archive_file_path1 = os.path.join(self.tmp_dir, 'data_006.wpr')
354    expected_archive_file_path2 = os.path.join(self.tmp_dir, 'data_007.wpr')
355    hash_dictionary = {
356        expected_archive_file_path1: 'filehash',
357        expected_archive_file_path2: 'filehash2'
358    }
359    cloud_storage_stub = self.overrides.cloud_storage
360    cloud_storage_stub.SetCalculatedHashesForTesting(hash_dictionary)
361
362    with open(new_temp_recording, 'w') as f:
363      f.write('wpr data')
364    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
365    test_archive_info.AddRecordedStories([pageNew1])
366
367    with open(new_temp_recording, 'w') as f:
368      f.write('wpr data2')
369
370    test_archive_info.AddNewTemporaryRecording(new_temp_recording)
371    test_archive_info.AddRecordedStories([pageNew2], target_platform='android')
372
373    with open(self.story_set_archive_info_file, 'r') as f:
374      archive_file_contents = json.load(f)
375
376
377    expected_archive_contents = _BASE_ARCHIVE.copy()
378    expected_archive_contents['archives'] = {
379        page1.display_name: {
380            _DEFAULT_PLATFORM: recording1
381        },
382        page2.display_name: {
383            _DEFAULT_PLATFORM: recording2,
384        },
385        page3.display_name: {
386           _DEFAULT_PLATFORM: recording1,
387           'linux': recording4,
388           'mac': recording3,
389           'win': recording2,
390           'android': recording5
391        },
392        pageNew1.display_name: {
393          _DEFAULT_PLATFORM: 'data_006.wpr'
394        },
395        pageNew2.display_name: {
396          _DEFAULT_PLATFORM: 'data_007.wpr',
397          'android': 'data_007.wpr'
398        }
399    }
400
401    self.assertDictEqual(expected_archive_contents, archive_file_contents)
402    # Ensure the saved JSON does not contain trailing spaces.
403    with open(self.story_set_archive_info_file, 'rU') as f:
404      for line in f:
405        self.assertFalse(line.rstrip('\n').endswith(' '))
406    self.assertWprFileDoesExist('data_006.wpr')
407    self.assertWprFileDoesExist('data_007.wpr')
408