#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import unittest

from vts.utils.python.fuzzer import corpus_manager

# Fake device under test shared by every test case. The expected GCS paths
# asserted below ('corpus/Pie/Pixel3_XL/...') are derived from these values.
dut = mock.Mock()
dut.build_alias = 'Pie.118.wj12e'
dut.product_type = 'Pixel3_XL'
dut.serial = 'HT1178BBZWQ'


class CorpusManagerTest(unittest.TestCase):
    """Unit tests for corpus_manager module."""

    def setUp(self):
        """Setup tasks.

        Named setUp (not SetUp) so that unittest actually runs it before
        each test method.
        """
        self.category = "category_default"
        self.name = "name_default"

    def _CreateEnabledManager(self):
        """Creates an enabled CorpusManager backed by a mocked GCS client.

        Returns:
            CorpusManager object with enabled=True and _gcs_api_utils
            replaced by a MagicMock, so no real GCS call is made.
        """
        _corpus_manager = corpus_manager.CorpusManager({}, dut)
        _corpus_manager.enabled = True
        _corpus_manager._gcs_api_utils = mock.MagicMock()
        return _corpus_manager

    def _PatchPathExists(self, exists):
        """Patches os.path.exists as seen by corpus_manager for one test.

        The patch is undone automatically when the test finishes, so the
        real os.path.exists is not left replaced for later tests.

        Args:
            exists: bool, value the patched os.path.exists returns.
        """
        patcher = mock.patch.object(
            corpus_manager.os.path, 'exists', return_value=exists)
        patcher.start()
        self.addCleanup(patcher.stop)

    def testInitializationDisabled(self):
        """Tests the disabled initialization of a CorpusManager object."""
        _corpus_manager = corpus_manager.CorpusManager({}, dut)
        self.assertEqual(_corpus_manager.enabled, False)

    def testInitializationEnabled(self):
        """Tests the enabled initialization of a CorpusManager object.

        If we initially begin with enabled=True, it will automatically
        attempt to connect GCS API, so the object is created disabled and
        enabled afterwards.
        """
        _corpus_manager = corpus_manager.CorpusManager({}, dut)
        _corpus_manager.enabled = True
        self.assertEqual(_corpus_manager.enabled, True)
        self.assertEqual(_corpus_manager._gcs_path, 'corpus/Pie/Pixel3_XL')
        self.assertEqual(_corpus_manager._device_serial, 'HT1178BBZWQ')

    def testFetchCorpusSeedEmpty_STAGE_1(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with an empty seed directory for stage 1 algorithm.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 1
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._FetchCorpusSeedFromPriority = mock.MagicMock()
        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        _corpus_manager._FetchCorpusSeedFromPriority.assert_called_with(
            'ILight', '/tmp/tmpDir1', 'corpus_seed')

    def testFetchCorpusSeedEmpty_STAGE_2(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with an empty seed directory for stage 2 algorithm.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 2
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = []
        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        self.assertEqual(
            _corpus_manager._gcs_api_utils.ListFilesWithPrefix.call_count, 3)

    def testFetchCorpusSeedEmpty_STAGE_3_LOCKED(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with an empty seed directory for stage 3 algorithm,
        with PrefixExists reporting True; 'locked' is expected.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 3
        _corpus_manager = self._CreateEnabledManager()
        # Configure the mocked call result explicitly; the previous
        # 'PrefixExists_return_value' attribute configured nothing.
        _corpus_manager._gcs_api_utils.PrefixExists.return_value = True
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = []
        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        self.assertEqual(res, 'locked')

    def testFetchCorpusSeedEmpty_STAGE_3_NOT_LOCKED(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with an empty seed directory for stage 3 algorithm,
        with PrefixExists reporting False; 'directory' is expected.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 3
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.PrefixExists.return_value = False
        # NOTE(review): 'ContFiles' may be a misspelling of the real GCS
        # helper name - confirm against gcs_api_utils before renaming.
        _corpus_manager._gcs_api_utils.ContFiles.return_value = 0
        _corpus_manager.add_lock = mock.MagicMock()
        _corpus_manager._FetchCorpusSeedDirectory = mock.MagicMock()
        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        self.assertEqual(res, 'directory')

    def testFetchCorpusSeedValid_STAGE_1(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with a valid seed directory for stage 1 algorithm.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 1
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._gcs_api_utils.MoveFile.return_value = True
        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
        _corpus_manager._gcs_api_utils.MoveFile.assert_called()
        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
        )
        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()

    def testFetchCorpusSeedValid_STAGE_2(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with a valid seed directory for stage 2 algorithm.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 2
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._gcs_api_utils.MoveFile.return_value = True
        _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed_high')
        _corpus_manager._gcs_api_utils.MoveFile.assert_called()
        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
        )
        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()

    def testFetchCorpusSeedValid_STAGE_3(self):
        """Tests the FetchCorpusSeed function of a CorpusManager object.

        This is tested with a valid seed directory for stage 3 algorithm.
        """
        corpus_manager.SCHEDULING_ALGORITHM = 3
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._gcs_api_utils.PrefixExists.return_value = False
        _corpus_manager.add_lock = mock.MagicMock()
        res = _corpus_manager.FetchCorpusSeed('ILight', '/tmp/tmpDir1')
        self.assertEqual(res, 'directory')
        _corpus_manager._gcs_api_utils.PrepareDownloadDestination.assert_called(
        )
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
        _corpus_manager._gcs_api_utils.DownloadFile.assert_called()

    def testUploadCorpusOutDir(self):
        """Tests the UploadCorpusOutDir function of a CorpusManager object."""
        corpus_manager.MEASURE_CORPUS = False
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.UploadDir.return_value = True
        _corpus_manager._ClassifyPriority = mock.MagicMock()
        _corpus_manager.UploadCorpusOutDir('ILight', '/tmp/tmpDir1')
        _corpus_manager._gcs_api_utils.UploadDir.assert_called_with(
            '/tmp/tmpDir1/ILight_corpus_out',
            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1')
        _corpus_manager._ClassifyPriority.assert_called_with(
            'ILight', '/tmp/tmpDir1')

    def testInuseToDestSeed(self):
        """Tests the InuseToDest function of a CorpusManager object.

        Moves the same in-use corpus to the seed, complete and crash
        directories in turn and checks the MoveFile arguments each time.
        """
        _corpus_manager = self._CreateEnabledManager()

        _corpus_manager.InuseToDest(
            'ILight',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus_seed')
        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed/corpus_number_1',
            True)

        _corpus_manager.InuseToDest(
            'ILight',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus_complete')
        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_complete/corpus_number_1',
            True)

        _corpus_manager.InuseToDest(
            'ILight',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus_crash')
        _corpus_manager._gcs_api_utils.MoveFile.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1',
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_crash/corpus_number_1',
            True)

    def test_ClassifyPriority1(self):
        """Tests the _ClassifyPriority1 function of a CorpusManager object."""
        corpus_manager.SCHEDULING_ALGORITHM = 1
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._CorpusIsDuplicate = mock.MagicMock()
        _corpus_manager._CorpusIsDuplicate.return_value = False
        _corpus_manager._ClassifyPriority1('ILight', '/tmp/tmpDir1')
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
        # One MoveFile call is expected per listed corpus file.
        self.assertEqual(_corpus_manager._gcs_api_utils.MoveFile.call_count,
                         5)

    def test_ClassifyPriority2(self):
        """Tests the _ClassifyPriority2 function of a CorpusManager object."""
        corpus_manager.SCHEDULING_ALGORITHM = 2
        self._PatchPathExists(True)
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._CorpusIsDuplicate = mock.MagicMock()
        _corpus_manager._CorpusIsDuplicate.return_value = False
        _corpus_manager._ClassifyPriority2('ILight', '/tmp/tmpDir1')
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
        # One MoveFile call is expected per listed corpus file.
        self.assertEqual(_corpus_manager._gcs_api_utils.MoveFile.call_count,
                         5)

    def test_ClassifyPriority3(self):
        """Tests the _ClassifyPriority3 function of a CorpusManager object."""
        corpus_manager.SCHEDULING_ALGORITHM = 3
        corpus_manager.REPEAT_TIMES = 5
        self._PatchPathExists(True)
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._gcs_api_utils.PrefixExists.return_value = True
        _corpus_manager._MoveCorpusDirectory = mock.MagicMock()
        _corpus_manager._ClassifyPriority3('ILight', '/tmp/tmpDir1')
        self.assertEqual(_corpus_manager._MoveCorpusDirectory.call_count, 2)
        # assert_called_with only checks the most recent call.
        _corpus_manager._MoveCorpusDirectory.assert_called_with(
            'ILight', '/tmp/tmpDir1', 'incoming_child', 'corpus_complete')

    def test_MoveCorpusDirectory(self):
        """Tests the _MoveCorpusDirectory function of a CorpusManager object."""
        _corpus_manager = self._CreateEnabledManager()
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.return_value = [
            'dir/file1', 'dir/file2', 'dir/file3', 'dir/file4', 'dir/file5'
        ]
        _corpus_manager._MoveCorpusDirectory('ILight', '/tmp/tmpDir1',
                                             'corpus_seed', 'corpus_complete')
        _corpus_manager._gcs_api_utils.ListFilesWithPrefix.assert_called_with(
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed', strict=True)
        # One MoveFile call is expected per listed corpus file.
        self.assertEqual(_corpus_manager._gcs_api_utils.MoveFile.call_count,
                         5)

    def test_GetDirPaths(self):
        """Tests the _GetDirPaths function of a CorpusManager object."""
        _corpus_manager = corpus_manager.CorpusManager({}, dut)
        self.assertEqual(
            _corpus_manager._GetDirPaths('corpus_seed', 'ILight'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')
        self.assertEqual(
            _corpus_manager._GetDirPaths('incoming_parent', 'ILight',
                                         '/tmp/tmpDir1'),
            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1')
        self.assertEqual(
            _corpus_manager._GetDirPaths('incoming_child', 'ILight',
                                         '/tmp/tmpDir1'),
            'corpus/Pie/Pixel3_XL/ILight/incoming/tmpDir1/ILight_corpus_out')
        self.assertEqual(
            _corpus_manager._GetDirPaths('corpus_seed', 'ILight'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed')

    def test_GetFilePaths(self):
        """Tests the _GetFilePaths function of a CorpusManager object."""
        _corpus_manager = corpus_manager.CorpusManager({}, dut)
        self.assertEqual(
            _corpus_manager._GetFilePaths('corpus_seed', 'ILight',
                                          'some_dir/corpus_number_1'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_seed/corpus_number_1')
        self.assertEqual(
            _corpus_manager._GetFilePaths('corpus_inuse', 'ILight',
                                          'some_dir/corpus_number_1'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_inuse/corpus_number_1')
        self.assertEqual(
            _corpus_manager._GetFilePaths('corpus_complete', 'ILight',
                                          'somedir/corpus_number_1'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_complete/corpus_number_1'
        )
        self.assertEqual(
            _corpus_manager._GetFilePaths('corpus_crash', 'ILight',
                                          'somedir/corpus_number_1'),
            'corpus/Pie/Pixel3_XL/ILight/ILight_corpus_crash/corpus_number_1')


if __name__ == "__main__":
    unittest.main()