1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the 'License');
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an 'AS IS' BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import os
19import random
20import re
21import time
22import uuid
23
24from vts.runners.host import keys
25from vts.utils.python.gcs import gcs_api_utils
26from vts.utils.python.web import feature_utils
27"""
28Stage 1: FETCH_ONE_AND_FEED
29The stage 1 algorithm collects the corpus output generated from the fuzz test.
30Then, the algorithm chooses by random one of the generated seeds in the
31next round as input.
32
33Stage 2: FETCH_CRASH_AND_FEED
34The stage 2 algorithm classifies generated corpus output into two priorities:
35high priority and regular priority. Corpus strings created during a fuzz test
36run that revealed a crash will be given a high priority.
37On the other hand, corpus strings created during a fuzz test run that did
38not lead to a crash will be given the regular priority.
39
40Stage 3: FETCH_ALL_AND_REPEAT
41The stage 3 algorithm feeds the entire corpus body generated from the
42previous run as corpus seed input directory. This process will
43repeat {REPEAT_TIMES} times. After executing {REPEAT_TIMES},
44the scheduling algorithm will start a new session with an empty corpus seed
45and reset the counter to 0.
46"""
47FETCH_ONE_AND_FEED = 1
48FETCH_CRASH_AND_FEED = 2
49FETCH_ALL_AND_REPEAT = 3
50REPEAT_TIMES = 5
51
52SCHEDULING_ALGORITHM = FETCH_ALL_AND_REPEAT
53MEASURE_CORPUS = True
54CORPUS_STATES = [
55    'corpus_seed_high', 'corpus_seed', 'corpus_seed_low', 'corpus_inuse',
56    'corpus_complete', 'corpus_crash', 'corpus_error', 'corpus_trigger'
57]
58CORPUS_PRIORITIES = ['corpus_seed_high', 'corpus_seed', 'corpus_seed_low']
59
60
61class CorpusManager(feature_utils.Feature):
62    """Manages corpus for fuzzing.
63
64    Features include:
65    Fetching corpus input from GCS to host.
66    Uploading corpus output from host to GCS.
67    Classifying corpus output into different priorities.
68    Moving corpus between different states (seed, inuse, complete).
69
70    Attributes:
71        _TOGGLE_PARAM: String, the name of the parameter used to toggle the feature.
72        _REQUIRED_PARAMS: list, the list of parameter names that are required.
73        _OPTIONAL_PARAMS: list, the list of parameter names that are optional.
74        _key_path: string, path to the json path.
75        _bucket_name: string, name of the Google Cloud Storage bucket used.
76        _gcs_api_utils: GcsApiUtils object, used to communicate with GCS.
77        _gcs_path: string, path to the upper most level corpus directory in GCS.
78        _device_serial: string, serial number of the current target device.
79    """
80
81    _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_LOG_UPLOADING
82    _REQUIRED_PARAMS = [
83        keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
84        keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME
85    ]
86    _OPTIONAL_PARAMS = []
87
88    def __init__(self, user_params, dut):
89        """Initializes the gcs util provider.
90
91        Args:
92            user_params: A dictionary from parameter name (String) to parameter value.
93            dut: The Android device we are testing against.
94        """
95        self.ParseParameters(
96            toggle_param_name=self._TOGGLE_PARAM,
97            required_param_names=self._REQUIRED_PARAMS,
98            optional_param_names=self._OPTIONAL_PARAMS,
99            user_params=user_params)
100
101        if self.enabled:
102            self._key_path = self.service_key_json_path
103            self._bucket_name = self.fuzzing_gcs_bucket_name
104            self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
105                self._key_path, self._bucket_name)
106            self.enabled = self._gcs_api_utils.Enabled
107
108        branch = dut.build_alias.split('.')[0]
109        model = dut.product_type
110        self._gcs_path = os.path.join('corpus', branch, model)
111        self._device_serial = dut.serial
112
113    def FetchCorpusSeed(self, test_name, local_temp_dir):
114        """Fetches seed corpus of the corresponding test from the GCS directory.
115
116        Args:
117            test_name: string, name of the current fuzzing test.
118            local_temp_dir: string, path to temporary directory for this test
119                            on the host machine.
120
121        Returns:
122            inuse_seed, GCS file path of the seed in use for test case
123                        if fetch was successful.
124            None otherwise.
125        """
126        if self.enabled:
127            logging.debug('Attempting to fetch corpus seed for %s.', test_name)
128        else:
129            return None
130
131        if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
132            inuse_seed = self._FetchCorpusSeedFromPriority(
133                test_name, local_temp_dir, 'corpus_seed')
134            return inuse_seed
135        elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
136            for CORPUS_PRIORITY in CORPUS_PRIORITIES:
137                inuse_seed = self._FetchCorpusSeedFromPriority(
138                    test_name, local_temp_dir, CORPUS_PRIORITY)
139                if inuse_seed is not None:
140                    return inuse_seed
141            return None
142        elif SCHEDULING_ALGORITHM == FETCH_ALL_AND_REPEAT:
143            if self._gcs_api_utils.PrefixExists(
144                    self._GetDirPaths('corpus_lock', test_name)):
145                logging.error('test locked. skipping.')
146                return 'locked'
147            else:
148                self.add_lock(test_name, local_temp_dir)
149                self._FetchCorpusSeedDirectory(test_name, local_temp_dir)
150                return 'directory'
151
152    def _FetchCorpusSeedFromPriority(self, test_name, local_temp_dir,
153                                     CORPUS_PRIORITY):
154        """Fetches 1 seed corpus from a corpus seed directory with the given priority.
155
156        In GCS, moves the seed from corpus_seed directory to corpus_inuse directory.
157        From GCS to host, downloads 1 corpus seed from corpus_inuse directory
158        to {temp_dir}_{test_name}_corpus_seed in host machine.
159
160        Args:
161            test_name: string, name of the current fuzzing test.
162            local_temp_dir: string, path to temporary directory for this test
163                            on the host machine.
164            CORPUS_PRIORITY: string, priority of the given directory.
165
166        Returns:
167            inuse_seed, GCS file path of the seed in use for test case
168                        if fetch was successful.
169            None otherwise.
170        """
171        corpus_seed_dir = self._GetDirPaths(CORPUS_PRIORITY, test_name)
172        num_try = 0
173        while num_try < 10:
174            seed_list = self._gcs_api_utils.ListFilesWithPrefix(
175                corpus_seed_dir)
176
177            if len(seed_list) == 0:
178                logging.info('No corpus available to fetch from %s.',
179                             corpus_seed_dir)
180                return None
181
182            target_seed = seed_list[random.randint(0, len(seed_list) - 1)]
183            inuse_seed = self._GetFilePaths('corpus_inuse', test_name,
184                                            target_seed)
185            move_successful = self._gcs_api_utils.MoveFile(
186                target_seed, inuse_seed, False)
187
188            if move_successful:
189                local_dest_folder = self._gcs_api_utils.PrepareDownloadDestination(
190                    corpus_seed_dir, local_temp_dir)
191                dest_file_path = os.path.join(local_dest_folder,
192                                              os.path.basename(target_seed))
193                try:
194                    self._gcs_api_utils.DownloadFile(inuse_seed,
195                                                     dest_file_path)
196                    logging.info('Successfully fetched corpus seed from %s.',
197                                 corpus_seed_dir)
198                except:
199                    logging.error('Download failed, retrying.')
200                    continue
201                return inuse_seed
202            else:
203                num_try += 1
204                logging.debug('move try %d failed, retrying.', num_try)
205                continue
206
207    def _FetchCorpusSeedDirectory(self, test_name, local_temp_dir):
208        """Fetches an entire corpus directory generated from the previous run.
209
210        From GCS to host, downloads corpus seed from corpus_seed directory
211        to {temp_dir}_{test_name}_corpus_seed in host machine.
212
213        Args:
214            test_name: string, name of the current fuzzing test.
215            local_temp_dir: string, path to temporary directory for this test
216                            on the host machine.
217
218        Returns:
219            corpus_seed_dir, GCS directory of the seed directory for test case
220                             if fetch was successful.
221            None otherwise.
222        """
223        corpus_seed_dir = self._GetDirPaths('corpus_seed', test_name)
224        seed_count = self._gcs_api_utils.CountFiles(corpus_seed_dir)
225        if seed_count == 0:
226            logging.info('No corpus available to fetch from %s.',
227                         corpus_seed_dir)
228            return None
229
230        logging.info('Fetching %d corpus strings from %s', seed_count,
231                     corpus_seed_dir)
232        local_dest_folder = self._gcs_api_utils.PrepareDownloadDestination(
233            corpus_seed_dir, local_temp_dir)
234        for target_seed in self._gcs_api_utils.ListFilesWithPrefix(
235                corpus_seed_dir):
236            dest_file_path = os.path.join(local_dest_folder,
237                                          os.path.basename(target_seed))
238            self._gcs_api_utils.DownloadFile(target_seed, dest_file_path)
239        return corpus_seed_dir
240
241    def UploadCorpusOutDir(self, test_name, local_temp_dir):
242        """Uploads the corpus output source directory in host to GCS.
243
244        First, uploads the corpus output sorce directory in host to
245        its corresponding incoming directory in GCS.
246        Then, calls _ClassifyPriority function to classify each of
247        newly generated corpus by its priority.
248        Empty directory can be handled in the case no interesting corpus
249        was generated.
250
251        Args:
252            test_name: string, name of the current fuzzing test.
253            local_temp_dir: string, path to temporary directory for this test
254                            on the host machine.
255
256        Returns:
257            True if successfully uploaded.
258            False otherwise.
259        """
260        if self.enabled:
261            logging.debug('Attempting to upload corpus output for %s.',
262                          test_name)
263        else:
264            return False
265
266        local_corpus_out_dir = self._GetDirPaths('local_corpus_out', test_name,
267                                                 local_temp_dir)
268        incoming_parent_dir = self._GetDirPaths('incoming_parent', test_name,
269                                                local_temp_dir)
270        if self._gcs_api_utils.UploadDir(local_corpus_out_dir,
271                                         incoming_parent_dir):
272            logging.info('Successfully uploaded corpus output to %s.',
273                         incoming_parent_dir)
274            num_unique_corpus = self._ClassifyPriority(test_name,
275                                                       local_temp_dir)
276            if MEASURE_CORPUS:
277                self._UploadCorpusMeasure(test_name, local_temp_dir,
278                                          num_unique_corpus)
279            return True
280        else:
281            logging.error('Failed to upload corpus output for %s.', test_name)
282            return False
283
284    def _UploadCorpusMeasure(self, test_name, local_temp_dir,
285                             num_unique_corpus):
286        """Uploads the corpus measurement file to GCS.
287
288        Args:
289            test_name: string, name of the current fuzzing test.
290            local_temp_dir: string, path to temporary directory for this test
291                            on the host machine.
292            num_unique_corpus: integer, number of unique corpus generated.
293        """
294        local_measure_file = os.path.join(
295            local_temp_dir,
296            '%s_%s.txt' % (test_name, time.strftime('%Y-%m-%d-%H%M')))
297        with open(local_measure_file, 'w') as f:
298            f.write(str(num_unique_corpus))
299        remote_measure_file = os.path.join(
300            self._GetDirPaths('corpus_measure', test_name),
301            os.path.basename(local_measure_file))
302        self._gcs_api_utils.UploadFile(local_measure_file, remote_measure_file)
303
304    def InuseToDest(self, test_name, inuse_seed, destination):
305        """Moves a corpus from corpus_inuse to destination.
306
307        Destinations are as follows:
308        corpus_seed directory is the directory for corpus that are ready
309        to be used as input corpus seed.
310        corpus_complete directory is the directory for corpus that have
311        been used as an input, succeeded, and the test exited normally.
312        corpus_crash directory is the directory for corpus whose mutant have
313        caused a fuzz test crash.
314        corpus_error directory is the directory for corpus that have
315        caused an error in executing the fuzz test.
316
317        Args:
318            test_name: string, name of the current test.
319            inuse_seed: string, path to corpus seed currently in use.
320            destination: string, destination of the seed.
321
322        Returns:
323            True if move was successful.
324            False otherwise.
325        """
326        if not self.enabled:
327            return False
328
329        if self._gcs_api_utils.FileExists(inuse_seed):
330            if destination in CORPUS_STATES:
331                corpus_destination = self._GetFilePaths(
332                    destination, test_name, inuse_seed)
333                return self._gcs_api_utils.MoveFile(inuse_seed,
334                                                    corpus_destination, True)
335            else:
336                logging.error(
337                    'destination is not one of the predefined states')
338                return False
339        else:
340            logging.error('seed in use %s does not exist', inuse_seed)
341            return False
342
343    def _CorpusIsDuplicate(self, test_name, incoming_seed):
344        """Checks if the newly generated corpus is a duplicate corpus.
345
346        Args:
347            test_name: string, name of the current test.
348            incoming_seed: string, path to the incoming seed in GCS.
349
350        Returns:
351            True if the incoming corpus already exists in the GCS bucket.
352            False otherwise.
353        """
354        for file_type in CORPUS_STATES:
355            remote_corpus = self._GetFilePaths(file_type, test_name,
356                                               incoming_seed)
357            logging.debug(remote_corpus)
358            if self._gcs_api_utils.FileExists(remote_corpus):
359                logging.info('Corpus %s already exists.', remote_corpus)
360                return True
361        return False
362
363    def _ClassifyPriority(self, test_name, local_temp_dir):
364        """Calls the appropriate classification algorithm.
365
366        Args:
367            test_name: string, name of the current test.
368            local_temp_dir: string, path to temporary directory for this
369                            test on the host machine.
370
371        Returns:
372            num_unique_corpus: integer, number of unique corpus generated.
373        """
374        if SCHEDULING_ALGORITHM == FETCH_ONE_AND_FEED:
375            return self._ClassifyPriority1(test_name, local_temp_dir)
376        elif SCHEDULING_ALGORITHM == FETCH_CRASH_AND_FEED:
377            return self._ClassifyPriority2(test_name, local_temp_dir)
378        elif SCHEDULING_ALGORITHM == FETCH_ALL_AND_REPEAT:
379            num_unique_corpus = self._ClassifyPriority3(
380                test_name, local_temp_dir)
381            self.remove_lock(test_name)
382            return num_unique_corpus
383
384    def _ClassifyPriority1(self, test_name, local_temp_dir):
385        """Classifies each of newly genereated corpus into different priorities.
386
387        Uses 1 priority level: corpus_seed.
388        This algorithm is a naive implementation.
389
390        Args:
391            test_name: string, name of the current test.
392            local_temp_dir: string, path to temporary directory for this
393                            test on the host machine.
394
395        Returns:
396            num_unique_corpus: integer, number of unique corpus generated.
397        """
398        incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
399                                               local_temp_dir)
400        num_unique_corpus = 0
401        for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
402                incoming_child_dir):
403            if self._CorpusIsDuplicate(test_name, incoming_seed):
404                logging.info('Deleting duplicate corpus.')
405                self._gcs_api_utils.DeleteFile(incoming_seed)
406                continue
407            num_unique_corpus += 1
408            logging.info(
409                'Corpus string %s was classified as regular priority.',
410                incoming_seed)
411            corpus_destination = self._GetFilePaths('corpus_seed', test_name,
412                                                    incoming_seed)
413            self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
414                                         True)
415
416        return num_unique_corpus
417
418    def _ClassifyPriority2(self, test_name, local_temp_dir):
419        """Classifies each of newly genereated corpus into different priorities.
420
421        Uses 2 priority levels: corpus_seed_high, corpus_seed.
422        This algorithm uses crash occurrence as its classification criteria.
423
424        Args:
425            test_name: string, name of the current test.
426            local_temp_dir: string, path to temporary directory for this
427                            test on the host machine.
428
429        Returns:
430            num_unique_corpus: integer, number of unique corpus generated.
431        """
432        triggered_corpus = os.path.join(
433            self._GetDirPaths('local_corpus_trigger', test_name,
434                              local_temp_dir), 'crash_report')
435        high_priority = os.path.exists(triggered_corpus)
436        incoming_child_dir = self._GetDirPaths('incoming_child', test_name,
437                                               local_temp_dir)
438        num_unique_corpus = 0
439        for incoming_seed in self._gcs_api_utils.ListFilesWithPrefix(
440                incoming_child_dir):
441            if self._CorpusIsDuplicate(test_name, incoming_seed):
442                logging.info('Deleting duplicate corpus.')
443                self._gcs_api_utils.DeleteFile(incoming_seed)
444                continue
445
446            num_unique_corpus += 1
447            if high_priority:
448                logging.info(
449                    'corpus string %s was classified as high priority.',
450                    incoming_seed)
451                corpus_destination = self._GetFilePaths(
452                    'corpus_seed_high', test_name, incoming_seed)
453            else:
454                logging.info(
455                    'corpus string %s was classified as regular priority.',
456                    incoming_seed)
457                corpus_destination = self._GetFilePaths(
458                    'corpus_seed', test_name, incoming_seed)
459            self._gcs_api_utils.MoveFile(incoming_seed, corpus_destination,
460                                         True)
461
462        self._UploadTriggeredCorpus(test_name, local_temp_dir)
463
464        return num_unique_corpus
465
466    def _ClassifyPriority3(self, test_name, local_temp_dir):
467        """Classifies the newly genereated corpus body into priorities.
468
469        The stage 3 algorithm collects the corpus output generated
470        from the fuzz test. Then it will perform one of the following:
471
472        If we are still in the same fuzzing session:
473            - Move the seed generated from the previous run (ILight_corpus_seed)
474              to the corresponding (ILight_corpus_seed##) directiry.
475            - Move the seed generated from the current run (incoming/)
476              to the seed (ILight_corpus_seed) directory.
477        If we have reached the maximum number of runs within a session,
478        start a new session:
479            - Move the seed generated from the previous runs of this fuzz session
480              (ILight_corpus_seed, ILight_corpus_seed##)
481              to the completed (ILight_corpus_completed) directory.
482            - Move the seed generated from the current run (incoming/)
483              to the completed (ILight_corpus_completed) directory.
484
485        Args:
486            test_name: string, name of the current test.
487            local_temp_dir: string, path to temporary directory for this
488                            test on the host machine.
489
490        Returns:
491            num_unique_corpus: integer, number of unique corpus generated.
492        """
493        # check if directories seed_01, seed_02, ..., seed_{REPEAT_TIMES - 1} exist.
494        for i in range(1, REPEAT_TIMES):
495            last_corpus_dir = '%s_%02d' % (self._GetDirPaths(
496                'corpus_seed', test_name), i)
497            if not self._gcs_api_utils.PrefixExists(last_corpus_dir):
498                break
499
500        num_unique_corpus = 0
501        # if seed_01, ..., seed_{REPEAT_TIMES-2}
502        if i < REPEAT_TIMES - 1:
503            self._MoveCorpusDirectory(
504                test_name, local_temp_dir, 'corpus_seed', 'corpus_seed_%02d' % i)
505            num_unique_corpus = self._MoveCorpusDirectory(
506                test_name, local_temp_dir, 'incoming_child', 'corpus_seed')
507
508        # if seed_{REPEAT_TIMES-1}
509        else:
510            self._MoveCorpusDirectory(
511                test_name, local_temp_dir, 'corpus_seed', 'corpus_complete', False)
512            num_unique_corpus = self._MoveCorpusDirectory(
513                test_name, local_temp_dir, 'incoming_child', 'corpus_complete')
514
515        self._UploadTriggeredCorpus(test_name, local_temp_dir)
516
517        return num_unique_corpus
518
519    def _UploadTriggeredCorpus(self, test_name, local_temp_dir):
520        """Uploades the corpus that tiggered crash to GCS.
521
522        Args:
523            test_name: string, name of the current test.
524            local_temp_dir: string, path to temporary directory for this
525                            test on the host machine.
526        """
527        triggered_corpus = os.path.join(
528            self._GetDirPaths('local_corpus_trigger', test_name,
529                              local_temp_dir), 'crash_report')
530
531        if os.path.exists(triggered_corpus):
532            corpus_destination = self._GetFilePaths(
533                'corpus_trigger', test_name, triggered_corpus)
534            corpus_destination += str(uuid.uuid4())
535            self._gcs_api_utils.UploadFile(triggered_corpus,
536                                           corpus_destination)
537
538    def _MoveCorpusDirectory(self,
539                             test_name,
540                             local_temp_dir,
541                             src_dir,
542                             dest_dir,
543                             strict=True):
544        """Moves every corpus in the given src_dir to dest_dir.
545
546        Args:
547            test_name: string, name of the current test.
548            local_temp_dir: string, path to temporary directory for this
549                            test on the host machine.
550            src_dir: string, source directory of corpus strings in GCS.
551            dest_dir: string, destination directory of corpus strings in GCS.
552            strict: boolean, whether to use strict when calling ListFilesWithPrefix.
553
554        Returns:
555            num_unique_corpus: int, blah.
556        """
557        if strict:
558            logging.info('Moving %s to %s', src_dir, dest_dir)
559        else:
560            logging.info('Moving %s*  to %s', src_dir, dest_dir)
561
562        num_unique_corpus = 0
563        src_corpus_dir = self._GetDirPaths(src_dir, test_name, local_temp_dir)
564        for src_corpus in self._gcs_api_utils.ListFilesWithPrefix(
565                src_corpus_dir, strict=strict):
566            dest_corpus = self._GetFilePaths(dest_dir, test_name, src_corpus)
567            self._gcs_api_utils.MoveFile(src_corpus, dest_corpus, True)
568            num_unique_corpus += 1
569
570        return num_unique_corpus
571
572    def add_lock(self, test_name, local_temp_dir):
573        """Adds a locking mechanism to the GCS directory.
574
575        Args:
576            test_name: string, name of the current test.
577            local_temp_dir: string, path to temporary directory for this
578                            test on the host machine.
579        """
580        if self.enabled:
581            logging.info('adding a lock.')
582        else:
583            return None
584
585        # write empty file /tmp/tmpV1oPTp/HT7BF1A01613
586        local_lock_file = os.path.join(local_temp_dir, self._device_serial)
587        open(local_lock_file, 'a').close()
588        # move to corpus/PPR1/walleye/ILight/ILight_corpus_lock/HT7BF1A01613
589        self._gcs_api_utils.UploadFile(
590            local_lock_file, self._GetFilePaths('corpus_lock', test_name))
591
592    def remove_lock(self, test_name):
593        """Removes locking mechanism from the GCS directory.
594
595        Args:
596            test_name: string, name of the current test.
597        """
598        if self.enabled:
599            logging.info('removing a lock.')
600        else:
601            return None
602
603        # delete corpus/PPR1/walleye/ILight/ILight_corpus_lock/HT7BF1A01613
604        self._gcs_api_utils.DeleteFile(
605            self._GetFilePaths('corpus_lock', test_name))
606
607    def _GetDirPaths(self, dir_type, test_name, local_temp_dir=None):
608        """Generates the required directory path name for the given information.
609
610        Args:
611            dir_type: string, type of the directory requested.
612            test_name: string, name of the current test.
613            local_temp_dir: string, path to temporary directory for this
614                            test on the host machine.
615
616        Returns:
617            dir_path, generated directory path if dir_type supported.
618            Empty string if dir_type not supported.
619        """
620        dir_path = ''
621
622        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_seed
623        if dir_type in CORPUS_STATES:
624            dir_path = os.path.join(self._gcs_path, test_name,
625                                    '%s_%s' % (test_name, dir_type))
626        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_seed_01
627        elif re.match('corpus_seed_[0-9][0-9]$', dir_type):
628            dir_path = os.path.join(self._gcs_path, test_name,
629                                    '%s_%s' % (test_name, dir_type))
630        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_measure
631        elif dir_type == 'corpus_measure':
632            dir_path = os.path.join(self._gcs_path, test_name,
633                                    '%s_%s' % (test_name, dir_type))
634        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_lock
635        elif dir_type == 'corpus_lock':
636            dir_path = os.path.join(self._gcs_path, test_name,
637                                    '%s_%s' % (test_name, dir_type))
638        # ex: corpus/[build tag]/[device]/ILight/incoming/tmpV1oPTp
639        elif dir_type == 'incoming_parent':
640            dir_path = os.path.join(self._gcs_path, test_name, 'incoming',
641                                    os.path.basename(local_temp_dir))
642        # ex: corpus/[build tag]/[device]/ILight/incoming/tmpV1oPTp/ILight_corpus_out
643        elif dir_type == 'incoming_child':
644            dir_path = os.path.join(self._gcs_path, test_name, 'incoming',
645                                    os.path.basename(local_temp_dir),
646                                    '%s_corpus_out' % test_name)
647        # ex: /tmp/tmpV1oPTp/ILight_corpus_out
648        elif dir_type == 'local_corpus_out':
649            dir_path = os.path.join(local_temp_dir,
650                                    '%s_corpus_out' % test_name)
651        # ex: /tmp/tmpV1oPTp/ILight_corpus_trigger
652        elif dir_type == 'local_corpus_trigger':
653            dir_path = os.path.join(local_temp_dir,
654                                    '%s_corpus_trigger' % test_name)
655
656        return dir_path
657
658    def _GetFilePaths(self, file_type, test_name, seed=None):
659        """Generates the required file path name for the given information.
660
661        Args:
662            file_type: string, type of the file requested.
663            test_name: string, name of the current test.
664            seed: string, seed to base new file path name upon.
665
666        Returns:
667            file_path, generated file path if file_type supported.
668            Empty string if file_type not supported.
669        """
670        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_seed/20f5d9b8cd53881c9ff0205c9fdc5d283dc9fc68
671        if file_type in CORPUS_STATES:
672            file_path = os.path.join(
673                self._GetDirPaths(file_type, test_name),
674                os.path.basename(seed))
675            return file_path
676        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_seed_01/20f5d9b8cd53881c9ff0205c9fdc5d283dc9fc68
677        elif re.match('corpus_seed_[0-9][0-9]$', file_type):
678            file_path = os.path.join(
679                self._GetDirPaths(file_type, test_name),
680                os.path.basename(seed))
681            return file_path
682        # ex: corpus/[build tag]/[device]/ILight/ILight_corpus_lock/HT7BF1A01613
683        elif file_type == 'corpus_lock':
684            file_path = os.path.join(
685                self._GetDirPaths(file_type, test_name), self._device_serial)
686            return file_path
687        else:
688            logging.error('invalid file_type argument entered.')
689            return ''
690