1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7
8from autotest_lib.client.bin import site_utils, test
9from autotest_lib.client.common_lib import error, utils
10
class platform_RootPartitionsNotMounted(test.test):
    """Checks that no process has a root partition of the root device mounted.

    The test enumerates the rootfs partitions of the root device with cgpt,
    then inspects /proc/<pid>/mounts of every active process (except
    update_engine) and fails if any of those partitions shows up as a mount
    source.
    """
    version = 1

    _CGPT_PATH = '/usr/bin/cgpt'
    _ROOTDEV_PATH = '/usr/bin/rootdev'
    _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine'

    def get_root_partitions(self, device):
        """Gets a list of root partitions of a device.

        Gets a list of root partitions of a device by calling
        `cgpt find -t rootfs <device>`.

        Args:
            device: The device, specified by its device file, to examine.

        Returns:
            A list of root partitions, specified by their device file,
            (e.g. /dev/sda1) of the given device. The list is empty if cgpt
            printed nothing.
        """
        cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device)
        output = utils.run(cgpt_command).stdout
        # Drop blank lines so that empty output yields [] rather than [''],
        # which lets callers detect "no root partition" with a truth test.
        stripped_lines = (line.strip() for line in output.splitlines())
        return [line for line in stripped_lines if line]

    def get_mounted_devices(self, mounts_file):
        """Gets a set of mounted devices from a given mounts file.

        Gets a set of device files that are currently mounted. This method
        parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the
        entries with a source path under /dev/.

        Args:
            mounts_file: Path of the mounts file to parse.

        Returns:
            A set of device file names (e.g. /dev/sda1). The set is empty if
            the mounts file cannot be read.
        """
        try:
            with open(mounts_file) as mounts:
                entries = mounts.readlines()
        except (IOError, OSError):
            # Best effort: the file may be unreadable or may have vanished
            # (e.g. the owning process exited); treat that as "no mounts".
            return set()

        # The mount source is the first space-separated field of each entry.
        sources = (entry.split(' ', 1)[0] for entry in entries)
        return set(node for node in sources if node.startswith('/dev/'))

    def get_process_executable(self, pid):
        """Gets the executable path of a given process ID.

        Args:
            pid: Target process ID.

        Returns:
            The executable path of the given process ID, or an empty string
            if the /proc/<pid>/exe link cannot be read.
        """
        try:
            return os.readlink('/proc/%s/exe' % pid)
        except OSError:
            return ''

    def get_process_list(self, excluded_executables=None):
        """Gets a list of process IDs of active processes.

        Gets a list of process IDs of active processes by looking into /proc
        and filters out those processes with an executable path that is
        excluded. Processes whose executable path cannot be determined are
        skipped as well.

        Args:
            excluded_executables: A list of executable paths to exclude.
                    Defaults to excluding nothing.

        Returns:
            A list of process IDs (as strings) of active processes.
        """
        if excluded_executables is None:
            excluded_executables = ()

        processes = []
        for path in os.listdir('/proc'):
            # Only the all-digit entries under /proc are process IDs.
            if not path.isdigit():
                continue
            process_exe = self.get_process_executable(path)
            if process_exe and process_exe not in excluded_executables:
                processes.append(path)
        return processes

    def run_once(self):
        """Fails if any active process has a root partition mounted.

        Raises:
            error.TestNAError: If the test is not run as root, a required
                    tool is missing, or the root device, root partitions or
                    process list cannot be determined.
            error.TestFail: If a root partition is found among the mounted
                    devices of an examined process.
        """
        if os.geteuid() != 0:
            raise error.TestNAError('This test needs to be run under root')

        # NOTE(review): rootdev is not invoked directly here; presumably
        # site_utils.get_root_device() relies on it -- confirm.
        for path in [self._CGPT_PATH, self._ROOTDEV_PATH]:
            if not os.path.isfile(path):
                raise error.TestNAError('%s not found' % path)

        root_device = site_utils.get_root_device()
        if not root_device:
            raise error.TestNAError('Could not find the root device')
        logging.debug('Root device: %s', root_device)

        root_partitions = self.get_root_partitions(root_device)
        if not root_partitions:
            raise error.TestNAError('Could not find any root partition')
        logging.debug('Root partitions: %s', ', '.join(root_partitions))

        # update_engine is excluded from the check.
        processes = self.get_process_list([self._UPDATE_ENGINE_PATH])
        if not processes:
            raise error.TestNAError('Could not find any process')
        logging.debug('Active processes: %s', ', '.join(processes))

        for process in processes:
            mounts_file = '/proc/%s/mounts' % process
            mounted_devices = self.get_mounted_devices(mounts_file)
            for partition in root_partitions:
                if partition in mounted_devices:
                    # The executable path is only needed for the failure
                    # message, so look it up lazily.
                    process_exe = self.get_process_executable(process)
                    raise error.TestFail(
                            'Root partition "%s" is mounted by process %s (%s)'
                            % (partition, process, process_exe))
122