1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7
8from autotest_lib.client.bin import utils as bin_utils
9from autotest_lib.client.bin import test
10from autotest_lib.client.common_lib import error, utils
11
class platform_RootPartitionsNotMounted(test.test):
    """Checks that no running process has a root partition mounted.

    The test enumerates the rootfs partitions of the root device with
    `cgpt find -t rootfs`, then inspects /proc/<pid>/mounts of every active
    process (except update_engine) and fails if any of those partitions
    appears there as a mount source.
    """
    version = 1

    _CGPT_PATH = '/usr/bin/cgpt'
    _ROOTDEV_PATH = '/usr/bin/rootdev'
    _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine'

    def get_root_partitions(self, device):
        """Gets a list of root partitions of a device.

        Gets a list of root partitions of a device by calling
        `cgpt find -t rootfs <device>`.

        Args:
            device: The device, specified by its device file, to examine.

        Returns:
            A list of root partitions, specified by their device file,
            (e.g. /dev/sda1) of the given device. The list is empty if cgpt
            prints nothing.
        """
        cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device)
        output = utils.run(cgpt_command).stdout
        # Drop blank lines so that empty output yields [] rather than [''],
        # which would defeat the caller's "no root partition" check.
        stripped_lines = (line.strip() for line in output.splitlines())
        return [line for line in stripped_lines if line]

    def get_mounted_devices(self, mounts_file):
        """Gets a set of mounted devices from a given mounts file.

        Gets a set of device files that are currently mounted. This method
        parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the
        entries with a source path under /dev/.

        Args:
            mounts_file: Path of the mounts file to parse.

        Returns:
            A set of device file names (e.g. /dev/sda1). The set is empty if
            the mounts file cannot be read.
        """
        mounted_devices = set()
        try:
            with open(mounts_file) as mounts:
                for entry in mounts:
                    # The mount source is the first space-separated field.
                    node = entry.split(' ', 1)[0]
                    if node.startswith('/dev/'):
                        mounted_devices.add(node)
        except (IOError, OSError) as e:
            # Best effort: an unreadable mounts file is reported as "nothing
            # mounted" (presumably the process exited in the meantime).
            logging.debug('Could not read %s: %s', mounts_file, e)
            return set()
        return mounted_devices

    def get_process_executable(self, pid):
        """Gets the executable path of a given process ID.

        Args:
            pid: Target process ID.

        Returns:
            The executable path of the given process ID, or an empty string
            if the /proc/<pid>/exe link cannot be read.
        """
        try:
            return os.readlink('/proc/%s/exe' % pid)
        except OSError:
            return ""

    def get_process_list(self, excluded_executables=()):
        """Gets a list of process IDs of active processes.

        Gets a list of process IDs of active processes by looking into /proc
        and filters out those processes with a executable path that is
        excluded. Processes whose executable path cannot be resolved are
        skipped as well.

        Args:
            excluded_executables: A sequence of executable paths to exclude.

        Returns:
            A list of process IDs (as strings) of active processes.
        """
        processes = []
        for path in os.listdir('/proc'):
            # Only the all-digit entries of /proc are process directories.
            if not path.isdigit():
                continue
            process_exe = self.get_process_executable(path)
            if process_exe and process_exe not in excluded_executables:
                processes.append(path)
        return processes

    def run_once(self):
        """Runs the test.

        Raises:
            error.TestNAError: If not running as root, a required tool is
                    missing, or the root device, its root partitions or the
                    active processes cannot be determined.
            error.TestFail: If a root partition is listed as a mount source
                    in the mounts file of any examined process.
        """
        if os.geteuid() != 0:
            raise error.TestNAError('This test needs to be run under root')

        for path in [self._CGPT_PATH, self._ROOTDEV_PATH]:
            if not os.path.isfile(path):
                raise error.TestNAError('%s not found' % path)

        root_device = bin_utils.get_root_device()
        if not root_device:
            raise error.TestNAError('Could not find the root device')
        logging.debug('Root device: %s', root_device)

        root_partitions = self.get_root_partitions(root_device)
        if not root_partitions:
            raise error.TestNAError('Could not find any root partition')
        logging.debug('Root partitions: %s', ', '.join(root_partitions))

        # update_engine is excluded from the check.
        processes = self.get_process_list([self._UPDATE_ENGINE_PATH])
        if not processes:
            raise error.TestNAError('Could not find any process')
        logging.debug('Active processes: %s', ', '.join(processes))

        for process in processes:
            mounts_file = '/proc/%s/mounts' % process
            mounted_devices = self.get_mounted_devices(mounts_file)
            for partition in root_partitions:
                if partition in mounted_devices:
                    # Resolve the executable only when it is needed for the
                    # failure message.
                    process_exe = self.get_process_executable(process)
                    raise error.TestFail(
                            'Root partition "%s" is mounted by process %s (%s)'
                            % (partition, process, process_exe))
123