# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import shutil
import zipfile

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import autotemp, error
from autotest_lib.client.cros.cros_disks import CrosDisksTester
from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage
from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent
from collections import deque


class CrosDisksArchiveTester(CrosDisksTester):
    """A tester to verify archive support in CrosDisks.

    For each configured archive type, an archive file is placed on a
    FAT-formatted virtual filesystem image, the image and then the archive
    are mounted via CrosDisks, and the mounted archive content is verified.
    """
    def __init__(self, test, archive_types):
        """Initializes the tester.

        Args:
            test: The autotest test object; its bindir attribute is used to
                  locate the 'data' directory holding precanned archives.
            archive_types: An iterable of archive type names (e.g. 'zip',
                           'rar') to be tested.
        """
        super(CrosDisksArchiveTester, self).__init__(test)
        self._data_dir = os.path.join(test.bindir, 'data')
        self._archive_types = archive_types

    def _find_all_files(self, root_dir):
        """Returns all files under a directory and its sub-directories.

        This is a generator that performs a breadth-first-search of
        all files under a specified directory and its sub-directories.
        Directories themselves are not yielded, only non-directory entries.

        Args:
            root_dir: The root directory where the search starts from.
        Yields:
            Path of any found file relative to the root directory.
        """
        # The empty string stands for root_dir itself, so that every
        # yielded path is relative to root_dir.
        dirs_to_explore = deque([''])
        while dirs_to_explore:
            current_dir = dirs_to_explore.popleft()
            for path in os.listdir(os.path.join(root_dir, current_dir)):
                expanded_path = os.path.join(root_dir, current_dir, path)
                relative_path = os.path.join(current_dir, path)
                if os.path.isdir(expanded_path):
                    dirs_to_explore.append(relative_path)
                else:
                    yield relative_path

    def _make_zip_archive(self, archive_path, root_dir,
                          compression=zipfile.ZIP_DEFLATED):
        """Archives a specified directory into a ZIP file.

        The created ZIP file contains all files and sub-directories
        under the specified root directory, but not the root directory
        itself.

        Args:
            archive_path: Path of the output archive.
            root_dir: The root directory to archive.
            compression: The ZIP compression method.
        """
        # ZipFile in Python 2.6 does not work with the 'with' statement,
        # so use try/finally to guarantee the archive is closed (and its
        # central directory flushed) even if adding a file fails.
        archive = zipfile.ZipFile(archive_path, 'w', compression)
        try:
            for path in self._find_all_files(root_dir):
                # Store each file under its path relative to root_dir.
                archive.write(os.path.join(root_dir, path), path)
        finally:
            archive.close()

    def _make_rar_archive(self, archive_path, root_dir):
        """Archives a specified directory into a RAR file.

        The created RAR file is meant to contain all files and
        sub-directories under the specified root directory, but not the
        root directory itself. See the note below: root_dir is currently
        ignored and a precanned RAR file is copied instead.

        Args:
            archive_path: Path of the output archive.
            root_dir: The root directory to archive (currently unused).
        """
        # DESPICABLE HACK: app-arch/rar provides only pre-compiled rar binaries
        # for x86/amd64. As a workaround, we pretend the RAR creation here
        # using a precanned RAR file.
        # NOTE(review): the precanned test.rar presumably holds the same
        # content that DefaultFilesystemTestContent creates -- confirm.
        shutil.copyfile(os.path.join(self._data_dir, 'test.rar'), archive_path)

    def _make_archive(self, archive_type, archive_path, root_dir):
        """Archives a specified directory into an archive of specified type.

        The created archive file contains all files and sub-directories
        under the specified root directory, but not the root directory
        itself.

        Args:
            archive_type: Type of the output archive ('zip' or 'rar').
            archive_path: Path of the output archive.
            root_dir: The root directory to archive.
        Raises:
            error.TestFail: If the archive type is not supported.
        """
        if archive_type == 'zip':
            self._make_zip_archive(archive_path, root_dir)
        elif archive_type == 'rar':
            self._make_rar_archive(archive_path, root_dir)
        else:
            # Use formatting rather than string concatenation so that a
            # non-string archive type still results in TestFail instead of
            # a TypeError raised while building the message.
            raise error.TestFail(
                    "Unsupported archive type %s" % (archive_type,))

    def _test_archive(self, archive_type):
        """Tests mounting an archive of a specified type via CrosDisks.

        Args:
            archive_type: Type of the archive to test; it is also used as
                          the file extension of the archive ('test.<type>').
        Raises:
            error.TestFail: If the test content cannot be created or the
                            content of the mounted archive fails to verify.
        """
        # Create the archive file content in a temporary directory.
        # NOTE(review): the temporary directory is not explicitly removed
        # here; presumably autotemp cleans it up automatically -- confirm.
        archive_dir = autotemp.tempdir(unique_id='CrosDisks')
        test_content = DefaultFilesystemTestContent()
        if not test_content.create(archive_dir.name):
            raise error.TestFail("Failed to create archive test content")

        # Create a FAT-formatted virtual filesystem image containing an
        # archive file to help simulate mounting an archive file on a
        # removable drive.
        with VirtualFilesystemImage(
                block_size=1024,
                block_count=65536,
                filesystem_type='vfat',
                mkfs_options=['-F', '32', '-n', 'ARCHIVE']) as image:
            image.format()
            image.mount(options=['sync'])
            # Create the archive file on the virtual filesystem image.
            archive_name = 'test.' + archive_type
            archive_path = os.path.join(image.mount_dir, archive_name)
            self._make_archive(archive_type, archive_path, archive_dir.name)
            # Unmount the image so that it can be mounted via CrosDisks.
            image.unmount()

            # Mount the virtual filesystem image via CrosDisks.
            device_file = image.loop_device
            self.cros_disks.mount(device_file, '',
                                  ["ro", "nodev", "noexec", "nosuid"])
            result = self.cros_disks.expect_mount_completion({
                'status': 0,
                'source_path': device_file
            })

            # Mount the archive file on the mounted filesystem via CrosDisks.
            # The archive path is now relative to where CrosDisks mounted
            # the image, as reported in the mount completion result.
            archive_path = os.path.join(result['mount_path'], archive_name)
            expected_mount_path = os.path.join('/media/archive', archive_name)
            self.cros_disks.mount(archive_path)
            result = self.cros_disks.expect_mount_completion({
                'status': 0,
                'source_path': archive_path,
                'mount_path': expected_mount_path
            })

            # Verify the content of the mounted archive file.
            if not test_content.verify(expected_mount_path):
                raise error.TestFail("Failed to verify filesystem test content")

            # Unmount in reverse order: the archive first, then the device
            # that hosts the archive file.
            self.cros_disks.unmount(expected_mount_path, ['lazy'])
            self.cros_disks.unmount(device_file, ['lazy'])

    def test_archives(self):
        """Tests every archive type given at construction, in order."""
        for archive_type in self._archive_types:
            self._test_archive(archive_type)

    def get_tests(self):
        """Returns the list of test methods provided by this tester.

        NOTE(review): presumably consumed by CrosDisksTester.run -- confirm.
        """
        return [self.test_archives]


class platform_CrosDisksArchive(test.test):
    """Autotest test that verifies archive support in CrosDisks."""
    version = 1

    def run_once(self, *args, **kwargs):
        """Runs the archive tests.

        Args:
            *args: Positional arguments forwarded to the tester's run().
            **kwargs: Keyword arguments forwarded to the tester's run();
                      must contain 'archive_types', the archive types to
                      test.
        """
        tester = CrosDisksArchiveTester(self, kwargs['archive_types'])
        tester.run(*args, **kwargs)