1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import mock
18import os
19import unittest
20
21from vts.utils.python.fuzzer import corpus_manager
22
23dut = mock.Mock()
24dut.build_alias = 'Pie.118.wj12e'
25dut.product_type = 'Pixel3_XL'
26dut.serial = 'HT1178BBZWQ'
27
28
29class CorpusManagerTest(unittest.TestCase):
30    """Unit tests for corpus_manager module."""
31
32    def SetUp(self):
33        """Setup tasks."""
34        self.category = "category_default"
35        self.name = "name_default"
36
37    def testInitializationDisabled(self):
38        """Tests the disabled initilization of a CorpusManager object."""
39        _corpus_manager = corpus_manager.CorpusManager({}, dut)
40        self.assertEqual(_corpus_manager.enabled, False)
41
42    def testInitializationEnabled(self):
43        """Tests the enabled initilization of a CorpusManager object.
44
45        If we initially begin with enabled=True, it will automatically
46        attempt to connect GCS API.
47        """
48        _corpus_manager = corpus_manager.CorpusManager({}, dut)
49        _corpus_manager.enabled = True
50        self.assertEqual(_corpus_manager.enabled, True)
51        self.assertEqual(_corpus_manager._gcs_path, 'corpus/Pie/Pixel3_XL')
52        self.assertEqual(_corpus_manager._device_serial, 'HT1178BBZWQ')
53
54    def testFetchCorpusSeedEmpty_STAGE_1(self):
55        """Tests the FetchCorpusSeed function of a CorpusManager object.
56
57        This is tested with an empty seed directory for stage 1 algorithm.
58        """
59        corpus_manager.SCHEDULING_ALGORITHM = 1
60        _corpus_manager = corpus_manager.CorpusManager({}, dut)
61        _corpus_manager.enabled = True
62        _corpus_manager._gcs_api_utils = mock.MagicMock()
63        _corpus_manager._FetchCorpusSeedFromPriority = mock.MagicMock()
64        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
65        _corpus_manager._FetchCorpusSeedFromPriority.assert_called_with(
66            'ILight', '/tmp/tmpDir1', 'corpus_seed')
67
68    def testFetchCorpusSeedEmpty_STAGE_2(self):
69        """Tests the FetchCorpusSeed function of a CorpusManager object.
70
71        This is tested with an empty seed directory for stage 2 algorithm.
72        """
73        corpus_manager.SCHEDULING_ALGORITHM = 2
74        _corpus_manager = corpus_manager.CorpusManager({}, dut)
75        _corpus_manager.enabled = True
76        _corpus_manager._gcs_api_utils = mock.MagicMock()
77        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = []
78        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
79        self.assertEquals(
80            _corpus_manager._gcs_api_utils.ListFilesWithPrefix.call_count, 3)
81
82    def testFetchCorpusSeedEmpty_STAGE_3_LOCKED(self):
83        """Tests the FetchCorpusSeed function of a CorpusManager object.
84
85        This is tested with an empty seed directory for stage 3 algorithm.
86        """
87        corpus_manager.SCHEDULING_ALGORITHM = 3
88        _corpus_manager = corpus_manager.CorpusManager({}, dut)
89        _corpus_manager.enabled = True
90        _corpus_manager._gcs_api_utils = mock.MagicMock()
91        _corpus_manager._gcs_api_utils.PrefixExists_return_value = True
92        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = []
93        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
94        self.assertEquals(res, 'locked')
95
96    def testFetchCorpusSeedEmpty_STAGE_3_NOT_LOCKED(self):
97        """Tests the FetchCorpusSeed function of a CorpusManager object.
98
99        This is tested with an empty seed directory for stage 3 algorithm.
100        """
101        corpus_manager.SCHEDULING_ALGORITHM = 3
102        _corpus_manager = corpus_manager.CorpusManager({}, dut)
103        _corpus_manager.enabled = True
104        _corpus_manager._gcs_api_utils = mock.MagicMock()
105        _corpus_manager._gcs_api_utils.PrefixExists.return_value = False
106        _corpus_manager._gcs_api_utils.ContFiles.return_value = 0
107        _corpus_manager.add_lock = mock.MagicMock()
108        _corpus_manager._FetchCorpusSeedDirectory = mock.MagicMock()
109        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
110        self.assertEquals(res, 'directory')
111
112    def testFetchCorpusSeedValid_STAGE_1(self):
113        """Tests the FetchCorpusSeed function of a CorpusManager object.
114
115        This is tested with a valid seed directory for stage 1 algorithm.
116        """
117        corpus_manager.SCHEDULING_ALGORITHM = 1
118        _corpus_manager = corpus_manager.CorpusManager({}, dut)
119        _corpus_manager.enabled = True
120        _corpus_manager._gcs_api_utils = mock.MagicMock()
121        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
122            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
123        ]
124        _corpus_manager._gcs_api_utils.MoveFile.return_value = True
125        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
126        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
127            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
128        _corpus_manager._gcs_api_utils.MoveFile.assert_called()
129        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
130        )
131        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()
132
133    def testFetchCorpusSeedValid_STAGE_2(self):
134        """Tests the FetchCorpusSeed function of a CorpusManager object.
135
136        This is tested with a valid seed directory for stage 2 algorithm.
137        """
138        corpus_manager.SCHEDULING_ALGORITHM = 2
139        _corpus_manager = corpus_manager.CorpusManager({}, dut)
140        _corpus_manager.enabled = True
141        _corpus_manager._gcs_api_utils = mock.MagicMock()
142        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
143            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
144        ]
145        _corpus_manager._gcs_api_utils.MoveFile.return_value = True
146        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
147        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
148            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed_high')
149        _corpus_manager._gcs_api_utils.MoveFile.assert_called()
150        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
151        )
152        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()
153
154    def testFetchCorpusSeedValid_STAGE_3(self):
155        """Tests the FetchCorpusSeed function of a CorpusManager object.
156
157        This is tested with a valid seed directory for stage 3 algorithm.
158        """
159        corpus_manager.SCHEDULING_ALGORITHM = 3
160        _corpus_manager = corpus_manager.CorpusManager({}, dut)
161        _corpus_manager.enabled = True
162        _corpus_manager._gcs_api_utils = mock.MagicMock()
163        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
164            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
165        ]
166        _corpus_manager._gcs_api_utils.PrefixExists.return_value = False
167        _corpus_manager.add_lock = mock.MagicMock()
168        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
169        self.assertEquals(res, 'directory')
170        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
171        )
172        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
173            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
174        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()
175
176    def testUploadCorpusOutDir(self):
177        """Tests the UploadCorpusOutDir function of a CorpusManager object."""
178        corpus_manager.MEASURE_CORPUS = False
179        _corpus_manager = corpus_manager.CorpusManager({}, dut)
180        _corpus_manager.enabled = True
181        _corpus_manager._gcs_api_utils = mock.MagicMock()
182        _corpus_manager._gcs_api_utils.UploadDir.return_value = True
183        _corpus_manager._ClassifyPriority = mock.MagicMock()
184        _corpus_manager.UploadCorpusOutDir('ILight', '/tmp/tmpDir1')
185        _corpus_manager._gcs_api_utils.UploadDir.assert_called_with(
186            '/tmp/tmpDir1/ILight_corpus_out',
187            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1')
188        _corpus_manager._ClassifyPriority.assert_called_with(
189            'ILight', '/tmp/tmpDir1')
190
191    def testInuseToDestSeed(self):
192        """Tests the InuseToDest function of a CorpusManager object."""
193        _corpus_manager = corpus_manager.CorpusManager({}, dut)
194        _corpus_manager.enabled = True
195        _corpus_manager._gcs_api_utils = mock.MagicMock()
196
197        _corpus_manager.InuseToDest(
198            'ILight',
199            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
200            'corpus_seed')
201        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
202            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
203            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed/corpus_number_1',
204            True)
205
206        _corpus_manager.InuseToDest(
207            'ILight',
208            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
209            'corpus_complete')
210        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
211            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
212            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_complete/corpus_number_1',
213            True)
214
215        _corpus_manager.InuseToDest(
216            'ILight',
217            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
218            'corpus_crash')
219        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
220            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
221            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_crash/corpus_number_1',
222            True)
223
224    def test_ClassifyPriority1(self):
225        """Tests the _ClassifyPriority1 function of a CorpusManager object."""
226        corpus_manager.SCHEDULING_ALGORITHM = 1
227        _corpus_manager = corpus_manager.CorpusManager({}, dut)
228        _corpus_manager.enabled = True
229        _corpus_manager._gcs_api_utils = mock.MagicMock()
230        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
231            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
232        ]
233        _corpus_manager._CorpusIsDuplicate = mock.MagicMock()
234        _corpus_manager._CorpusIsDuplicate.return_value = False
235        _corpus_manager._ClassifyPriority1('ILight', '/tmp/tmpDir1')
236        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
237            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
238        self.assertEquals(_corpus_manager._gcs_api_utils.MoveFile.call_count,
239                          5)
240
241    def test_ClassifyPriority2(self):
242        """Tests the _ClassifyPriority2 function of a CorpusManager object."""
243        corpus_manager.SCHEDULING_ALGORITHM = 2
244        corpus_manager.os.path.exists = mock.MagicMock()
245        corpus_manager.os.path.exists.return_value = True
246        _corpus_manager = corpus_manager.CorpusManager({}, dut)
247        _corpus_manager.enabled = True
248        _corpus_manager._gcs_api_utils = mock.MagicMock()
249        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
250            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
251        ]
252        _corpus_manager._CorpusIsDuplicate = mock.MagicMock()
253        _corpus_manager._CorpusIsDuplicate.return_value = False
254        _corpus_manager._ClassifyPriority2('ILight', '/tmp/tmpDir1')
255        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
256            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
257        self.assertEquals(_corpus_manager._gcs_api_utils.MoveFile.call_count,
258                          5)
259
260    def test_ClassifyPriority3(self):
261        """Tests the _ClassifyPriority3 function of a CorpusManager object."""
262        corpus_manager.SCHEDULING_ALGORITHM = 3
263        corpus_manager.REPEAT_TIMES = 5
264        corpus_manager.os.path.exists = mock.MagicMock()
265        corpus_manager.os.path.exists.return_value = True
266        _corpus_manager = corpus_manager.CorpusManager({}, dut)
267        _corpus_manager.enabled = True
268        _corpus_manager._gcs_api_utils = mock.MagicMock()
269        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
270            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
271        ]
272        _corpus_manager._gcs_api_utils.PrefixExists.return_value = True
273        _corpus_manager._MoveCorpusDirectory = mock.MagicMock()
274        _corpus_manager._ClassifyPriority3('ILight', '/tmp/tmpDir1')
275        self.assertEquals(_corpus_manager._MoveCorpusDirectory.call_count, 2)
276        _corpus_manager._MoveCorpusDirectory.assert_called_with(
277            'ILight', '/tmp/tmpDir1', 'incoming_child', 'corpus_complete')
278
279    def test_MoveCorpusDirectory(self):
280        """Tests the _MoveCorpusDirectory function of a CorpusManager object."""
281        _corpus_manager = corpus_manager.CorpusManager({}, dut)
282        _corpus_manager.enabled = True
283        _corpus_manager._gcs_api_utils = mock.MagicMock()
284        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
285            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
286        ]
287        _corpus_manager._MoveCorpusDirectory('ILight', '/tmp/tmpDir1',
288                                             'corpus_seed', 'corpus_complete')
289        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
290            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed', strict=True)
291        self.assertEquals(_corpus_manager._gcs_api_utils.MoveFile.call_count,
292                          5)
293
294    def test_GetDirPaths(self):
295        """Tests the _GetDirPaths function of a CorpusManager object."""
296        _corpus_manager = corpus_manager.CorpusManager({}, dut)
297        self.assertEqual(
298            _corpus_manager._GetDirPaths('corpus_seed', 'ILight'),
299            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
300        self.assertEqual(
301            _corpus_manager._GetDirPaths('incoming_parent', 'ILight',
302                                         '/tmp/tmpDir1'),
303            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1')
304        self.assertEqual(
305            _corpus_manager._GetDirPaths('incoming_child', 'ILight',
306                                         '/tmp/tmpDir1'),
307            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
308        self.assertEqual(
309            _corpus_manager._GetDirPaths('corpus_seed', 'ILight'),
310            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
311
312    def test_GetFilePaths(self):
313        """Tests the _GetFilePaths function of a CorpusManager object."""
314        _corpus_manager = corpus_manager.CorpusManager({}, dut)
315        self.assertEqual(
316            _corpus_manager._GetFilePaths('corpus_seed', 'ILight',
317                                          'some_dir/corpus_number_1'),
318            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed/corpus_number_1')
319        self.assertEqual(
320            _corpus_manager._GetFilePaths('corpus_inuse', 'ILight',
321                                          'some_dir/corpus_number_1'),
322            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1')
323        self.assertEqual(
324            _corpus_manager._GetFilePaths('corpus_complete', 'ILight',
325                                          'somedir/corpus_number_1'),
326            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_complete/corpus_number_1'
327        )
328        self.assertEqual(
329            _corpus_manager._GetFilePaths('corpus_crash', 'ILight',
330                                          'somedir/corpus_number_1'),
331            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_crash/corpus_number_1')
332
333
334if __name__ == "__main__":
335    unittest.main()
336