1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import shutil
8import zipfile
9
10from autotest_lib.client.bin import test
11from autotest_lib.client.common_lib import autotemp, error
12from autotest_lib.client.cros.cros_disks import CrosDisksTester
13from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage
14from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent
15from collections import deque
16
17
18class CrosDisksArchiveTester(CrosDisksTester):
19    """A tester to verify archive support in CrosDisks.
20    """
21    def __init__(self, test, archive_types):
22        super(CrosDisksArchiveTester, self).__init__(test)
23        self._data_dir = os.path.join(test.bindir, 'data')
24        self._archive_types = archive_types
25
26    def _find_all_files(self, root_dir):
27        """Returns all files under a directory and its sub-directories.
28
29           This is a generator that performs a breadth-first-search of
30           all files under a specified directory and its sub-directories.
31
32        Args:
33            root_dir: The root directory where the search starts from.
34        Yields:
35            Path of any found file relative to the root directory.
36        """
37        dirs_to_explore = deque([''])
38        while len(dirs_to_explore) > 0:
39            current_dir = dirs_to_explore.popleft()
40            for path in os.listdir(os.path.join(root_dir, current_dir)):
41                expanded_path = os.path.join(root_dir, current_dir, path)
42                relative_path = os.path.join(current_dir, path)
43                if os.path.isdir(expanded_path):
44                    dirs_to_explore.append(relative_path)
45                else:
46                    yield relative_path
47
48    def _make_zip_archive(self, archive_path, root_dir,
49                         compression=zipfile.ZIP_DEFLATED):
50        """Archives a specified directory into a ZIP file.
51
52           The created ZIP file contains all files and sub-directories
53           under the specified root directory, but not the root directory
54           itself.
55
56        Args:
57            archive_path: Path of the output archive.
58            root_dir: The root directory to archive.
59            compression: The ZIP compression method.
60        """
61        # ZipFile in Python 2.6 does not work with the 'with' statement.
62        archive = zipfile.ZipFile(archive_path, 'w', compression)
63        for path in self._find_all_files(root_dir):
64            archive.write(os.path.join(root_dir, path), path)
65        archive.close()
66
67    def _make_rar_archive(self, archive_path, root_dir):
68        """Archives a specified directory into a RAR file.
69
70           The created RAR file contains all files and sub-directories
71           under the specified root directory, but not the root directory
72           itself.
73
74        Args:
75            archive_path: Path of the output archive.
76            root_dir: The root directory to archive.
77        """
78        # DESPICABLE HACK: app-arch/rar provides only pre-compiled rar binaries
79        # for x86/amd64. As a workaround, we pretend the RAR creation here
80        # using a precanned RAR file.
81        shutil.copyfile(os.path.join(self._data_dir, 'test.rar'), archive_path)
82
83    def _make_archive(self, archive_type, archive_path, root_dir):
84        """Archives a specified directory into an archive of specified type.
85
86           The created archive file contains all files and sub-directories
87           under the specified root directory, but not the root directory
88           itself.
89
90        Args:
91            archive_type: Type of the output archive.
92            archive_path: Path of the output archive.
93            root_dir: The root directory to archive.
94        """
95        if archive_type in ['zip']:
96            self._make_zip_archive(archive_path, root_dir)
97        elif archive_type in ['rar']:
98            self._make_rar_archive(archive_path, root_dir)
99        else:
100            raise error.TestFail("Unsupported archive type " + archive_type)
101
102    def _test_archive(self, archive_type):
103        # Create the archive file content in a temporary directory.
104        archive_dir = autotemp.tempdir(unique_id='CrosDisks')
105        test_content = DefaultFilesystemTestContent()
106        if not test_content.create(archive_dir.name):
107            raise error.TestFail("Failed to create archive test content")
108
109        # Create a FAT-formatted virtual filesystem image containing an
110        # archive file to help stimulate mounting an archive file on a
111        # removable drive.
112        with VirtualFilesystemImage(
113                block_size=1024,
114                block_count=65536,
115                filesystem_type='vfat',
116                mkfs_options=[ '-F', '32', '-n', 'ARCHIVE' ]) as image:
117            image.format()
118            image.mount(options=['sync'])
119            # Create the archive file on the virtual filesystem image.
120            archive_name = 'test.' + archive_type
121            archive_path = os.path.join(image.mount_dir, archive_name)
122            self._make_archive(archive_type, archive_path, archive_dir.name)
123            image.unmount()
124
125            # Mount the virtual filesystem image via CrosDisks.
126            device_file = image.loop_device
127            self.cros_disks.mount(device_file, '',
128                                  [ "ro", "nodev", "noexec", "nosuid" ])
129            result = self.cros_disks.expect_mount_completion({
130                'status': 0,
131                'source_path': device_file
132            })
133
134            # Mount the archive file on the mounted filesystem via CrosDisks.
135            archive_path = os.path.join(result['mount_path'], archive_name)
136            expected_mount_path = os.path.join('/media/archive', archive_name)
137            self.cros_disks.mount(archive_path)
138            result = self.cros_disks.expect_mount_completion({
139                'status': 0,
140                'source_path': archive_path,
141                'mount_path': expected_mount_path
142            })
143
144            # Verify the content of the mounted archive file.
145            if not test_content.verify(expected_mount_path):
146                raise error.TestFail("Failed to verify filesystem test content")
147
148            self.cros_disks.unmount(expected_mount_path, ['lazy'])
149            self.cros_disks.unmount(device_file, ['lazy'])
150
151    def test_archives(self):
152        for archive_type in self._archive_types:
153            self._test_archive(archive_type)
154
155    def get_tests(self):
156        return [self.test_archives]
157
158
159class platform_CrosDisksArchive(test.test):
160    version = 1
161
162    def run_once(self, *args, **kwargs):
163        tester = CrosDisksArchiveTester(self, kwargs['archive_types'])
164        tester.run(*args, **kwargs)
165