#!/usr/bin/env python3
#
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#

"""A tool for running puffin tests in a corpus of deflate compressed files."""

import argparse
import filecmp
import logging
import os
import subprocess
import sys
import tempfile

_PUFFHUFF = 'puffhuff'
_PUFFDIFF = 'puffdiff'
TESTS = (_PUFFHUFF, _PUFFDIFF)


class Error(Exception):
  """Puffin general processing error."""


def ParseArguments(argv):
  """Parses and validates command line arguments.

  Args:
    argv: command line arguments to parse (without the program name).

  Returns:
    The parsed arguments namespace with the attributes corpus,
    disabled_tests, cache_size and debug.

  Raises:
    Error: if the corpus path is not an existing directory.
  """
  parser = argparse.ArgumentParser()

  parser.add_argument('corpus', metavar='CORPUS',
                      help='A corpus directory containing compressed files')
  parser.add_argument('-d', '--disabled_tests', default=(), metavar='',
                      nargs='*',
                      help=('Space separated list of tests to disable. '
                            'Allowed options include: ' + ', '.join(TESTS)),
                      choices=TESTS)
  parser.add_argument('--cache_size', type=int, metavar='SIZE',
                      help='The size (in bytes) of the cache for puffpatch '
                      'operations.')
  parser.add_argument('--debug', action='store_true',
                      help='Turns on verbosity.')

  # Parse command-line arguments.
  args = parser.parse_args(argv)

  if not os.path.isdir(args.corpus):
    raise Error('Corpus directory {} is non-existent or inaccessible'
                .format(args.corpus))
  return args


def _RunPuffin(operation, src_file, dst_file, patch_file=None,
               cache_size=None):
  """Runs one puffin operation and fails loudly on a non-zero exit status.

  Args:
    operation: value passed to puffin's --operation flag.
    src_file: path passed as --src_file.
    dst_file: path passed as --dst_file.
    patch_file: optional path passed as --patch_file.
    cache_size: optional integer passed as --cache_size. Zero is forwarded;
      only None omits the flag.

  Raises:
    Error: if the puffin process exits with a non-zero status.
  """
  cmd = ['puffin',
         '--operation={}'.format(operation),
         '--src_file={}'.format(src_file),
         '--dst_file={}'.format(dst_file)]
  if patch_file is not None:
    cmd.append('--patch_file={}'.format(patch_file))
  # Compare against None so that an explicit size of 0 is not dropped.
  if cache_size is not None:
    cmd.append('--cache_size={}'.format(cache_size))

  if subprocess.call(cmd) != 0:
    raise Error('Puffin failed to do {} command: {}'
                .format(operation, cmd))


def _SameContent(path_a, path_b):
  """Returns True if both files have identical bytes.

  shallow=False forces a content comparison; the default only compares
  os.stat() signatures (type, size, mtime) when those happen to match.
  """
  return filecmp.cmp(path_a, path_b, shallow=False)


def _TestPuffHuff(files):
  """Runs puffhuff on every file and checks the output equals the input.

  Args:
    files: paths of the corpus files to process.

  Raises:
    Error: if puffin fails or the generated file differs from its source.
  """
  operation = _PUFFHUFF
  for src in files:
    with tempfile.NamedTemporaryFile() as tgt_file:
      logging.debug('Running %s on %s', operation, src)
      _RunPuffin(operation, src, tgt_file.name)

      if not _SameContent(src, tgt_file.name):
        raise Error('The generated file {} is not equivalent to the original '
                    'file {} after {} operation.'
                    .format(tgt_file.name, src, operation))


def _TestPuffDiff(files, cache_size=None):
  """Runs puffdiff then puffpatch on every ordered pair of corpus files.

  For each (src, tgt) pair, including a file paired with itself, a patch is
  generated with puffdiff, applied to src with puffpatch, and the result is
  compared against tgt.

  Args:
    files: paths of the corpus files to process.
    cache_size: optional cache size forwarded to the puffpatch operation.

  Raises:
    Error: if puffin fails or the patched file differs from the target.
  """
  for src in files:
    for tgt in files:
      with tempfile.NamedTemporaryFile() as patch, \
           tempfile.NamedTemporaryFile() as new_tgt:

        operation = 'puffdiff'
        logging.debug('Running %s on %s (%d) and %s (%d)',
                      operation,
                      os.path.basename(src), os.stat(src).st_size,
                      os.path.basename(tgt), os.stat(tgt).st_size)
        _RunPuffin(operation, src, tgt, patch_file=patch.name)

        logging.debug('Patch size is: %d', os.stat(patch.name).st_size)

        operation = 'puffpatch'
        logging.debug('Running %s on src file %s and patch %s',
                      operation, os.path.basename(src), patch.name)
        _RunPuffin(operation, src, new_tgt.name, patch_file=patch.name,
                   cache_size=cache_size)

        if not _SameContent(tgt, new_tgt.name):
          raise Error('The generated file {} is not equivalent to the '
                      'original file {} after puffpatch operation.'
                      .format(new_tgt.name, tgt))


def main(argv):
  """The main function.

  Args:
    argv: full argument vector; argv[0] is the program name and is skipped.

  Returns:
    0 when every enabled test passes.

  Raises:
    Error: on an invalid corpus directory, a failing puffin invocation or a
      mismatch between generated and expected files.
  """
  args = ParseArguments(argv[1:])

  if args.debug:
    logging.getLogger().setLevel(logging.DEBUG)

  # Construct the list of regular files in the corpus. os.listdir() returns
  # entries in arbitrary order, so sort to keep runs reproducible.
  candidates = (os.path.join(args.corpus, name)
                for name in sorted(os.listdir(args.corpus)))
  files = [path for path in candidates if os.path.isfile(path)]

  if _PUFFHUFF not in args.disabled_tests:
    _TestPuffHuff(files)

  if _PUFFDIFF not in args.disabled_tests:
    _TestPuffDiff(files, cache_size=args.cache_size)

  return 0


if __name__ == '__main__':
  sys.exit(main(sys.argv))