1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import dbus, gobject, logging, os, stat
12from dbus.mainloop.glib import DBusGMainLoop
13import six
14from six.moves import zip
15
16import common
17
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import autotemp, error
20from autotest_lib.client.cros import dbus_util
21from autotest_lib.client.cros.mainloop import ExceptionForward
22from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
23
24
25"""This module contains several helper classes for writing tests to verify the
26CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
27to derive functional tests that interact with the CrosDisks server over DBus.
28"""
29
30
class ExceptionSuppressor(object):
    """Context manager that swallows (and logs) selected exception types.

    The exception classes to suppress are given to the constructor. When the
    code block under the with statement raises an exception whose type is one
    of those classes (or a sub-class of one), the exception is logged and
    discarded. Any other exception propagates unchanged.

    Example:

        with ExceptionSuppressor(OSError, IOError):
            # An exception, which is a sub-class of OSError or IOError, is
            # suppressed in the block code under the with statement.
    """

    def __init__(self, *args):
        # *args is already a tuple, which is the form issubclass() accepts.
        self.__suppressed_exc_types = args

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # A true return value tells the with statement to swallow the
        # exception; a false one lets it propagate.
        if not exc_type:
            return False
        if not issubclass(exc_type, self.__suppressed_exc_types):
            return False

        try:
            logging.exception('Suppressed exception: %s(%s)',
                              exc_type, exc_value)
        except Exception:
            # Logging is best effort; a logging failure must not replace the
            # exception that is being suppressed.
            pass
        return True
60
61
class DBusClient(object):
    """A base class of a DBus proxy client to test a DBus server.

    This class is expected to be used along with a GLib main loop and provides
    some convenient functions for testing the DBus API exposed by a DBus server.
    """

    def __init__(self, main_loop, bus, bus_name, object_path, timeout=None):
        """Initializes the instance.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
            bus_name: The bus name owned by the DBus server.
            object_path: The object path of the DBus server.
            timeout: Maximum time in seconds to wait for the DBus connection.
        """
        # Maps the name of each handled signal to the content of its latest
        # unconsumed emission (a dict keyed by argument name), or to None when
        # nothing is pending.
        self.__signal_content = {}
        self.main_loop = main_loop
        self.signal_timeout_in_seconds = 10
        logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
                      bus_name, object_path)
        self.proxy_object = dbus_util.get_dbus_object(bus, bus_name,
                                                      object_path, timeout)

    def clear_signal_content(self, signal_name):
        """Clears the content of the signal.

        Signals that are not being handled are left untouched.

        Args:
            signal_name: The name of the signal.
        """
        if signal_name in self.__signal_content:
            self.__signal_content[signal_name] = None

    def get_signal_content(self, signal_name):
        """Gets the content of a signal.

        Args:
            signal_name: The name of the signal.

        Returns:
            The content of a signal, or None if the signal is not being
            handled or no content is pending.
        """
        return self.__signal_content.get(signal_name)

    def handle_signal(self, interface, signal_name, argument_names=()):
        """Registers a signal handler to handle a given signal.

        Registering the same signal name a second time is a no-op.

        Args:
            interface: The DBus interface of the signal.
            signal_name: The name of the signal.
            argument_names: A list of argument names that the signal contains.
        """
        if signal_name in self.__signal_content:
            return

        self.__signal_content[signal_name] = None

        def signal_handler(*args):
            # Pair the positional signal arguments with their names; zip()
            # drops any surplus on either side.
            self.__signal_content[signal_name] = dict(zip(argument_names, args))

        logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
                      signal_name, ', '.join(argument_names), interface)
        self.proxy_object.connect_to_signal(signal_name, signal_handler,
                                            interface)

    def wait_for_signal(self, signal_name):
        """Waits for the reception of a signal.

        The stored content is consumed: it is reset to None before returning.

        Args:
            signal_name: The name of the signal to wait for.

        Returns:
            The content of the signal, or None if the signal is not being
            handled (see handle_signal).
        """
        if signal_name not in self.__signal_content:
            return None

        def check_signal_content():
            # Dispatch pending main loop events without blocking so that the
            # registered signal handlers get a chance to run.
            context = self.main_loop.get_context()
            while context.iteration(False):
                pass
            return self.__signal_content[signal_name] is not None

        logging.debug('Waiting for D-Bus signal "%s"', signal_name)
        # NOTE(review): presumably poll_for_condition raises when the timeout
        # expires -- confirm against autotest_lib.client.bin.utils.
        utils.poll_for_condition(condition=check_signal_content,
                                 desc='%s signal' % signal_name,
                                 timeout=self.signal_timeout_in_seconds)
        content = self.__signal_content[signal_name]
        logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
        self.__signal_content[signal_name] = None
        return content

    def expect_signal(self, signal_name, expected_content):
        """Waits for the reception of a signal and verifies its content.

        Args:
            signal_name: The name of the signal to wait for.
            expected_content: The expected content of the signal, which can be
                              partially specified. Only specified fields are
                              compared between the actual and expected content.

        Returns:
            The actual content of the signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the signal, or when
                            fields are expected but the signal is not being
                            handled.
        """
        actual_content = self.wait_for_signal(signal_name)
        logging.debug("%s signal: expected=%s actual=%s",
                      signal_name, expected_content, actual_content)
        for argument, expected_value in six.iteritems(expected_content):
            # wait_for_signal() returns None for a signal that is not being
            # handled; report that as a test failure rather than letting the
            # membership test below raise a TypeError.
            if actual_content is None or argument not in actual_content:
                raise error.TestFail(
                    ('%s signal missing "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))

            if actual_content[argument] != expected_value:
                raise error.TestFail(
                    ('%s signal not matched on "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))
        return actual_content
185
186
class CrosDisksClient(DBusClient):
    """A DBus proxy client for testing the CrosDisks DBus server.

    On construction the client subscribes to the FormatCompleted,
    MountCompleted and RenameCompleted signals so that tests can wait for and
    verify them.
    """

    CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    FORMAT_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')
    MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
        'status', 'source_path', 'source_type', 'mount_path',
    )
    RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
    RENAME_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')

    def __init__(self, main_loop, bus, timeout_seconds=None):
        """Initializes the instance.

        Args:
            main_loop: The GLib main loop.
            bus: The bus where the DBus server is connected to.
            timeout_seconds: Maximum time in seconds to wait for the DBus
                             connection.
        """
        super(CrosDisksClient, self).__init__(
            main_loop, bus, self.CROS_DISKS_BUS_NAME,
            self.CROS_DISKS_OBJECT_PATH, timeout_seconds)

        proxy = self.proxy_object
        self.interface = dbus.Interface(proxy, self.CROS_DISKS_INTERFACE)
        self.properties = dbus.Interface(proxy,
                                         self.DBUS_PROPERTIES_INTERFACE)

        # Completion signals tracked by this client, with the names given to
        # their positional arguments.
        completion_signals = (
            (self.FORMAT_COMPLETED_SIGNAL,
             self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.MOUNT_COMPLETED_SIGNAL,
             self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.RENAME_COMPLETED_SIGNAL,
             self.RENAME_COMPLETED_SIGNAL_ARGUMENTS),
        )
        for name, arguments in completion_signals:
            self.handle_signal(self.CROS_DISKS_INTERFACE, name, arguments)

    def enumerate_devices(self):
        """Invokes the CrosDisks EnumerateDevices method.

        Returns:
            A list of sysfs paths of devices that are recognized by
            CrosDisks.
        """
        return self.interface.EnumerateDevices()

    def get_device_properties(self, path):
        """Invokes the CrosDisks GetDeviceProperties method.

        Args:
            path: The device path.

        Returns:
            The properties of the device in a dictionary.
        """
        return self.interface.GetDeviceProperties(path)

    def format(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Format method.

        Any pending FormatCompleted signal content is cleared first.

        Args:
            path: The device path to format.
            filesystem_type: The filesystem type used for formatting the
                             device. None is sent as an empty string.
            options: A list of options used for formatting the device. None
                     is sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        option_list = [] if options is None else options
        self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
        self.interface.Format(path, fs_type,
                              dbus.Array(option_list, signature='s'))

    def wait_for_format_completion(self):
        """Waits for the CrosDisks FormatCompleted signal.

        Returns:
            The content of the FormatCompleted signal.
        """
        return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)

    def expect_format_completion(self, expected_content):
        """Waits for the CrosDisks FormatCompleted signal and verifies it.

        Args:
            expected_content: The expected content of the FormatCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the FormatCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the FormatCompleted
                            signal.
        """
        return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
                                  expected_content)

    def rename(self, path, volume_name=None):
        """Invokes the CrosDisks Rename method.

        Any pending RenameCompleted signal content is cleared first.

        Args:
            path: The device path to rename.
            volume_name: The new name used for renaming. None is sent as an
                         empty string.
        """
        new_name = '' if volume_name is None else volume_name
        self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
        self.interface.Rename(path, new_name)

    def wait_for_rename_completion(self):
        """Waits for the CrosDisks RenameCompleted signal.

        Returns:
            The content of the RenameCompleted signal.
        """
        return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)

    def expect_rename_completion(self, expected_content):
        """Waits for the CrosDisks RenameCompleted signal and verifies it.

        Args:
            expected_content: The expected content of the RenameCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the RenameCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the RenameCompleted
                            signal.
        """
        return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
                                  expected_content)

    def mount(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Mount method.

        Any pending MountCompleted signal content is cleared first.

        Args:
            path: The device path to mount.
            filesystem_type: The filesystem type used for mounting the device.
                             None is sent as an empty string.
            options: A list of options used for mounting the device. None is
                     sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        option_list = [] if options is None else options
        self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
        self.interface.Mount(path, fs_type,
                             dbus.Array(option_list, signature='s'))

    def unmount(self, path, options=None):
        """Invokes the CrosDisks Unmount method.

        Args:
            path: The device or mount path to unmount.
            options: A list of options used for unmounting the path. None is
                     sent as an empty list.

        Returns:
            The mount error code.
        """
        option_list = [] if options is None else options
        return self.interface.Unmount(path,
                                      dbus.Array(option_list, signature='s'))

    def wait_for_mount_completion(self):
        """Waits for the CrosDisks MountCompleted signal.

        Returns:
            The content of the MountCompleted signal.
        """
        return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)

    def expect_mount_completion(self, expected_content):
        """Waits for the CrosDisks MountCompleted signal and verifies it.

        Args:
            expected_content: The expected content of the MountCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the MountCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the MountCompleted
                            signal.
        """
        return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
                                  expected_content)

    def add_loopback_to_allowlist(self, path):
        """Adds a device by its path to the allowlist for testing.

        Args:
            path: path to the /dev/loopX device.
        """
        # Only the device name is kept; CrosDisks is given the sysfs path.
        device_name = os.path.basename(path)
        self.interface.AddDeviceToAllowlist(
            '/sys/devices/virtual/block/' + device_name)

    def remove_loopback_from_allowlist(self, path):
        """Removes a device by its path from the allowlist for testing.

        Args:
            path: path to the /dev/loopX device.
        """
        # Only the device name is kept; CrosDisks is given the sysfs path.
        device_name = os.path.basename(path)
        self.interface.RemoveDeviceFromAllowlist(
            '/sys/devices/virtual/block/' + device_name)
414
415
class CrosDisksTester(GenericTesterMainLoop):
    """A base tester class for testing the CrosDisks server.

    A derived class should override the get_tests method to return a list of
    test methods. The perform_one_test method invokes each test method in the
    list to verify some functionalities of CrosDisks server.
    """

    def __init__(self, test):
        """Connects to the system bus and creates the CrosDisks client.

        Args:
            test: The test object passed through to GenericTesterMainLoop.
        """
        # The DBus main loop is installed as the default before the bus
        # connection is opened.
        self.bus = dbus.SystemBus(
            mainloop=DBusGMainLoop(set_as_default=True))
        self.main_loop = gobject.MainLoop()
        super(CrosDisksTester, self).__init__(test, self.main_loop)
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus)

    def get_tests(self):
        """Returns a list of test methods to be invoked by perform_one_test.

        A derived class should override this method.

        Returns:
            A list of test methods.
        """
        return []

    @ExceptionForward
    def perform_one_test(self):
        """Exercises each test method in the list returned by get_tests.

        Each test method name is recorded as a remaining requirement and is
        marked completed once the method returns.
        """
        test_methods = self.get_tests()
        self.remaining_requirements = set(
            method.__name__ for method in test_methods)
        for method in test_methods:
            method()
            self.requirement_completed(method.__name__)

    def reconnect_client(self, timeout_seconds=None):
        """Reconnects the CrosDisks DBus client.

        The current client is replaced by a newly constructed one.

        Args:
            timeout_seconds: Maximum time in seconds to wait for the DBus
                             connection.
        """
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus,
                                          timeout_seconds)
459
460
class FilesystemTestObject(object):
    """A base class to represent a filesystem test object.

    A filesystem test object can be a file, directory or symbolic link.
    A derived class should override the _create and _verify methods to
    implement how the test object is created and verified, respectively, on a
    filesystem. The base implementations of both always report failure.
    """

    def __init__(self, path, content, mode):
        """Initializes the instance.

        Args:
            path: The relative path of the test object.
            content: The content of the test object.
            mode: The file permissions given to the test object.
        """
        self._path = path
        self._content = content
        self._mode = mode

    def create(self, base_dir):
        """Creates the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is created.

        Returns:
            True if the test object is created successfully or False otherwise.
        """
        if self._create(base_dir):
            return True

        logging.debug('Failed to create filesystem test object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def verify(self, base_dir):
        """Verifies the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is expected to be
                      found.

        Returns:
            True if the test object is found in the base directory and matches
            the expected content, or False otherwise.
        """
        if self._verify(base_dir):
            return True

        logging.error('Mismatched filesystem object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def _create(self, base_dir):
        # Hook for derived classes; the base class cannot create anything.
        return False

    def _verify(self, base_dir):
        # Hook for derived classes; the base class cannot verify anything.
        return False
518
519
class FilesystemTestDirectory(FilesystemTestObject):
    """A filesystem test object that represents a directory."""

    def __init__(self, path, content,
                 mode=(stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
                       stat.S_IROTH | stat.S_IXOTH),
                 strict=False):
        """Initializes the directory.

        Args:
            path: The name of this directory. An empty name makes the object
                  stand for the base directory itself.
            content: The list of items in this directory.
            mode: The file permissions given to this directory.
            strict: Whether verify() strictly compares directory contents for
                    equality. This flag only applies to this directory, and not
                    to any child directories.
        """
        super(FilesystemTestDirectory, self).__init__(path, content, mode)
        self._strict = strict

    def _create(self, base_dir):
        """Creates the directory (if named) and every item inside it.

        Returns:
            True if the directory exists and all items were created, or False
            otherwise. Creation stops at the first item that fails.
        """
        if self._path:
            target = os.path.join(base_dir, self._path)
            # Failures are only logged here; the isdir() check below decides
            # whether creation can continue.
            with ExceptionSuppressor(OSError):
                os.makedirs(target)
                os.chmod(target, self._mode)
        else:
            target = base_dir

        if not os.path.isdir(target):
            return False

        return all(child.create(target) for child in self._content)

    def _verify(self, base_dir):
        """Verifies the directory and every item inside it.

        Returns:
            True if the directory exists, all items verify and, in strict
            mode, no unexpected entries are present; False otherwise.
        """
        if self._path:
            target = os.path.join(base_dir, self._path)
        else:
            target = base_dir

        if not os.path.isdir(target):
            return False

        # A list (not a generator) so that every item is verified, and its
        # mismatch logged, even after an earlier item has failed.
        checked = [(child._path, child.verify(target))
                   for child in self._content]
        matched = all(outcome for _, outcome in checked)

        if self._strict:
            expected_names = set(name for name, _ in checked)
            for entry in os.listdir(target):
                if entry in expected_names:
                    continue
                logging.error('Unexpected filesystem entry "%s"',
                              os.path.join(target, entry))
                matched = False

        return matched
576
577
class FilesystemTestFile(FilesystemTestObject):
    """A filesystem test object that represents a file."""

    def __init__(self,
                 path,
                 content,
                 mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP \
                 | stat.S_IROTH,
                 mtime=None):
        """Initializes the file.

        Args:
            path: The name of this file.
            content: A byte string with the expected file contents. If None,
                     verification skips the content comparison.
            mode: The file permissions given to this file.
            mtime: If set, the expected file modification timestamp.
        """
        super(FilesystemTestFile, self).__init__(path, content, mode)
        self._mtime = mtime

    def _create(self, base_dir):
        """Writes the file content and applies the file permissions.

        Returns:
            True if the content was written, or False if an IOError was
            raised (and suppressed) while doing so. A failure to set the
            permissions is logged but does not fail the creation.
        """
        path = os.path.join(base_dir, self._path)
        with ExceptionSuppressor(IOError):
            with open(path, 'wb+') as f:
                f.write(self._content)
            with ExceptionSuppressor(OSError):
                os.chmod(path, self._mode)
            return True
        # Reached only when the suppressor above swallowed an exception.
        return False

    def _verify(self, base_dir):
        """Compares the file on disk with the expected content and mtime.

        Returns:
            True if every specified expectation matches, or False on a
            mismatch or when the file cannot be read or stat'ed.
        """
        path = os.path.join(base_dir, self._path)
        # os.stat() reports failures with OSError, which is a class distinct
        # from IOError on Python 2, so both are listed to make an unreadable
        # or missing file a verification failure on either version.
        with ExceptionSuppressor(IOError, OSError):
            result = True

            if self._content is not None:
                with open(path, 'rb') as f:
                    if f.read() != self._content:
                        logging.error('Mismatched file contents for "%s"',
                                      path)
                        result = False

            if self._mtime is not None:
                st = os.stat(path)
                if st.st_mtime != self._mtime:
                    # %s rather than %d: st_mtime is a float and truncating
                    # it would hide sub-second mismatches in the message.
                    logging.error(
                            'Mismatched file modification time for "%s": ' +
                            'want %s, got %s', path, self._mtime, st.st_mtime)
                    result = False

            return result

        # Reached only when the suppressor above swallowed an exception.
        return False
631
632
class DefaultFilesystemTestContent(FilesystemTestDirectory):
    """A default tree of test files and directories.

    The tree is rooted at the base directory itself (empty path) and holds a
    few small files, an empty file, nested directories and one 64 KiB file.
    """

    def __init__(self):
        # File contents are byte strings because FilesystemTestFile writes
        # and reads files in binary mode; on Python 3 a text string would
        # fail to be written and could never compare equal to the bytes read
        # back. On Python 2 the b'' prefix changes nothing.
        super(DefaultFilesystemTestContent, self).__init__('', [
            FilesystemTestFile('file1', b'0123456789'),
            FilesystemTestDirectory('dir1', [
                FilesystemTestFile('file1', b''),
                FilesystemTestFile('file2', b'abcdefg'),
                FilesystemTestDirectory('dir2', [
                    FilesystemTestFile('file3', b'abcdefg'),
                    FilesystemTestFile('file4', b'a' * 65536),
                ]),
            ]),
        ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
646
647
648class VirtualFilesystemImage(object):
649    def __init__(self, block_size, block_count, filesystem_type,
650                 *args, **kwargs):
651        """Initializes the instance.
652
653        Args:
654            block_size: The number of bytes of each block in the image.
655            block_count: The number of blocks in the image.
656            filesystem_type: The filesystem type to be given to the mkfs
657                             program for formatting the image.
658
659        Keyword Args:
660            mount_filesystem_type: The filesystem type to be given to the
661                                   mount program for mounting the image.
662            mkfs_options: A list of options to be given to the mkfs program.
663        """
664        self._block_size = block_size
665        self._block_count = block_count
666        self._filesystem_type = filesystem_type
667        self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
668        if self._mount_filesystem_type is None:
669            self._mount_filesystem_type = filesystem_type
670        self._mkfs_options = kwargs.get('mkfs_options')
671        if self._mkfs_options is None:
672            self._mkfs_options = []
673        self._image_file = None
674        self._loop_device = None
675        self._loop_device_stat = None
676        self._mount_dir = None
677
    def __del__(self):
        """Cleans up via clean() when the instance is finalized.

        Any Exception raised by clean() is logged and suppressed by
        ExceptionSuppressor instead of propagating out of the finalizer.
        """
        with ExceptionSuppressor(Exception):
            self.clean()
681
    def __enter__(self):
        """Creates the image file via create() and returns this instance."""
        self.create()
        return self
685
    def __exit__(self, exc_type, exc_value, traceback):
        """Cleans up via clean() when the with block is left.

        Returns:
            False, so an exception raised in the with block is not suppressed.
        """
        self.clean()
        return False
689
690    def _remove_temp_path(self, temp_path):
691        """Removes a temporary file or directory created using autotemp."""
692        if temp_path:
693            with ExceptionSuppressor(Exception):
694                path = temp_path.name
695                temp_path.clean()
696                logging.debug('Removed "%s"', path)
697
    def _remove_image_file(self):
        """Removes the image file if one has been created.

        The reference to the image file is reset to None afterwards.
        """
        self._remove_temp_path(self._image_file)
        self._image_file = None
702
    def _remove_mount_dir(self):
        """Removes the mount directory if one has been created.

        The reference to the mount directory is reset to None afterwards.
        """
        self._remove_temp_path(self._mount_dir)
        self._mount_dir = None
707
708    @property
709    def image_file(self):
710        """Gets the path of the image file.
711
712        Returns:
713            The path of the image file or None if no image file has been
714            created.
715        """
716        return self._image_file.name if self._image_file else None
717
    @property
    def loop_device(self):
        """Gets the loop device where the image file is attached to.

        The value is recorded by attach_to_loop_device() and reset by
        detach_from_loop_device().

        Returns:
            The path of the loop device where the image file is attached to or
            None if no loop device is attaching the image file.
        """
        return self._loop_device
727
728    @property
729    def mount_dir(self):
730        """Gets the directory where the image file is mounted to.
731
732        Returns:
733            The directory where the image file is mounted to or None if no
734            mount directory has been created.
735        """
736        return self._mount_dir.name if self._mount_dir else None
737
738    def create(self):
739        """Creates a zero-filled image file with the specified size.
740
741        The created image file is temporary and removed when clean()
742        is called.
743        """
744        self.clean()
745        self._image_file = autotemp.tempfile(unique_id='fsImage')
746        try:
747            logging.debug('Creating zero-filled image file at "%s"',
748                          self._image_file.name)
749            utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
750                      (self._image_file.name, self._block_size,
751                       self._block_count))
752        except error.CmdError as exc:
753            self._remove_image_file()
754            message = 'Failed to create filesystem image: %s' % exc
755            raise RuntimeError(message)
756
    def clean(self):
        """Removes the image file if one has been created.

        Before removal, the image file is detached from the loop device that
        it is attached to. Both steps do nothing when there is no loop device
        or no image file, respectively.
        """
        self.detach_from_loop_device()
        self._remove_image_file()
765
766    def attach_to_loop_device(self):
767        """Attaches the created image file to a loop device.
768
769        Creates the image file, if one has not been created, by calling
770        create().
771
772        Returns:
773            The path of the loop device where the image file is attached to.
774        """
775        if self._loop_device:
776            return self._loop_device
777
778        if not self._image_file:
779            self.create()
780
781        logging.debug('Attaching image file "%s" to loop device',
782                      self._image_file.name)
783        utils.run('losetup -f %s' % self._image_file.name)
784        output = utils.system_output('losetup -j %s' % self._image_file.name)
785        # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
786        self._loop_device = output.split(':')[0]
787        logging.debug('Attached image file "%s" to loop device "%s"',
788                      self._image_file.name, self._loop_device)
789
790        self._loop_device_stat = os.stat(self._loop_device)
791        logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
792                      self._loop_device,
793                      self._loop_device_stat.st_uid,
794                      self._loop_device_stat.st_gid,
795                      stat.S_IMODE(self._loop_device_stat.st_mode))
796        return self._loop_device
797
798    def detach_from_loop_device(self):
799        """Detaches the image file from the loop device."""
800        if not self._loop_device:
801            return
802
803        self.unmount()
804
805        logging.debug('Cleaning up remaining mount points of loop device "%s"',
806                      self._loop_device)
807        utils.run('umount -f %s' % self._loop_device, ignore_status=True)
808
809        logging.debug('Restore ownership/permissions of loop device "%s"',
810                      self._loop_device)
811        os.chmod(self._loop_device,
812                 stat.S_IMODE(self._loop_device_stat.st_mode))
813        os.chown(self._loop_device,
814                 self._loop_device_stat.st_uid, self._loop_device_stat.st_gid)
815
816        logging.debug('Detaching image file "%s" from loop device "%s"',
817                      self._image_file.name, self._loop_device)
818        utils.run('losetup -d %s' % self._loop_device)
819        self._loop_device = None
820
821    def format(self):
822        """Formats the image file as the specified filesystem."""
823        self.attach_to_loop_device()
824        try:
825            logging.debug('Formatting image file at "%s" as "%s" filesystem',
826                          self._image_file.name, self._filesystem_type)
827            utils.run('yes | mkfs -t %s %s %s' %
828                      (self._filesystem_type, ' '.join(self._mkfs_options),
829                       self._loop_device))
830            logging.debug('blkid: %s', utils.system_output(
831                'blkid -c /dev/null %s' % self._loop_device,
832                ignore_status=True))
833        except error.CmdError as exc:
834            message = 'Failed to format filesystem image: %s' % exc
835            raise RuntimeError(message)
836
837    def mount(self, options=None):
838        """Mounts the image file to a directory.
839
840        Args:
841            options: An optional list of mount options.
842        """
843        if self._mount_dir:
844            return self._mount_dir.name
845
846        if options is None:
847            options = []
848
849        options_arg = ','.join(options)
850        if options_arg:
851            options_arg = '-o ' + options_arg
852
853        self.attach_to_loop_device()
854        self._mount_dir = autotemp.tempdir(unique_id='fsImage')
855        try:
856            logging.debug('Mounting image file "%s" (%s) to directory "%s"',
857                          self._image_file.name, self._loop_device,
858                          self._mount_dir.name)
859            utils.run('mount -t %s %s %s %s' %
860                      (self._mount_filesystem_type, options_arg,
861                       self._loop_device, self._mount_dir.name))
862        except error.CmdError as exc:
863            self._remove_mount_dir()
864            message = ('Failed to mount virtual filesystem image "%s": %s' %
865                       (self._image_file.name, exc))
866            raise RuntimeError(message)
867        return self._mount_dir.name
868
869    def unmount(self):
870        """Unmounts the image file from the mounted directory."""
871        if not self._mount_dir:
872            return
873
874        try:
875            logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
876                          self._image_file.name, self._loop_device,
877                          self._mount_dir.name)
878            utils.run('umount %s' % self._mount_dir.name)
879        except error.CmdError as exc:
880            message = ('Failed to unmount virtual filesystem image "%s": %s' %
881                       (self._image_file.name, exc))
882            raise RuntimeError(message)
883        finally:
884            self._remove_mount_dir()
885
886    def get_volume_label(self):
887        """Gets volume name information of |self._loop_device|
888
889        @return a string with volume name if it exists.
890        """
891        # This script is run as root in a normal autotest run,
892        # so this works: It doesn't have access to the necessary info
893        # when run as a non-privileged user
894        cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
895        output = utils.system_output(cmd, ignore_status=True)
896
897        for line in output.splitlines():
898            udev_key, udev_val = line.split('=')
899
900            if udev_key == 'ID_FS_LABEL':
901                return udev_val
902
903        return None
904