1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7 8from autotest_lib.client.bin import utils as bin_utils 9from autotest_lib.client.bin import test 10from autotest_lib.client.common_lib import error, utils 11 12class platform_RootPartitionsNotMounted(test.test): 13 version = 1 14 15 _CGPT_PATH = '/usr/bin/cgpt' 16 _ROOTDEV_PATH = '/usr/bin/rootdev' 17 _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine' 18 19 def get_root_partitions(self, device): 20 """Gets a list of root partitions of a device. 21 22 Gets a list of root partitions of a device by calling 23 `cgpt find -t rootfs <device>`. 24 25 Args: 26 device: The device, specified by its device file, to examine. 27 28 Returns: 29 A list of root partitions, specified by their device file, 30 (e.g. /dev/sda1) of the given device. 31 """ 32 cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device) 33 return utils.run(cgpt_command).stdout.strip('\n').split('\n') 34 35 def get_mounted_devices(self, mounts_file): 36 """Gets a set of mounted devices from a given mounts file. 37 38 Gets a set of device files that are currently mounted. This method 39 parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the 40 entries with a source path under /dev/. 41 42 Returns: 43 A set of device file names (e.g. /dev/sda1) 44 """ 45 mounted_devices = set() 46 try: 47 entries = open(mounts_file).readlines() 48 except: 49 entries = [] 50 for entry in entries: 51 node = entry.split(' ')[0] 52 if node.startswith('/dev/'): 53 mounted_devices.add(node) 54 return mounted_devices 55 56 def get_process_executable(self, pid): 57 """Gets the executable path of a given process ID. 58 59 Args: 60 pid: Target process ID. 61 62 Returns: 63 The executable path of the given process ID or None on error. 64 """ 65 try: 66 return os.readlink('/proc/%s/exe' % pid) 67 except: 68 return "" 69 70 def get_process_list(self, excluded_executables=[]): 71 """Gets a list of process IDs of active processes. 72 73 Gets a list of process IDs of active processes by looking into /proc 74 and filters out those processes with a executable path that is 75 excluded. 76 77 Args: 78 excluded_executables: A list of executable paths to exclude. 79 80 Returns: 81 A list of process IDs of active processes. 82 """ 83 processes = [] 84 for path in os.listdir('/proc'): 85 if not path.isdigit(): continue 86 process_exe = self.get_process_executable(path) 87 if process_exe and process_exe not in excluded_executables: 88 processes.append(path) 89 return processes 90 91 def run_once(self): 92 if os.geteuid() != 0: 93 raise error.TestNAError('This test needs to be run under root') 94 95 for path in [self._CGPT_PATH, self._ROOTDEV_PATH]: 96 if not os.path.isfile(path): 97 raise error.TestNAError('%s not found' % path) 98 99 root_device = bin_utils.get_root_device() 100 if not root_device: 101 raise error.TestNAError('Could not find the root device') 102 logging.debug('Root device: %s' % root_device) 103 104 root_partitions = self.get_root_partitions(root_device) 105 if not root_partitions: 106 raise error.TestNAError('Could not find any root partition') 107 logging.debug('Root partitions: %s' % ', '.join(root_partitions)) 108 109 processes = self.get_process_list([self._UPDATE_ENGINE_PATH]) 110 if not processes: 111 raise error.TestNAError('Could not find any process') 112 logging.debug('Active processes: %s' % ', '.join(processes)) 113 114 for process in processes: 115 process_exe = self.get_process_executable(process) 116 mounts_file = '/proc/%s/mounts' % process 117 mounted_devices = self.get_mounted_devices(mounts_file) 118 for partition in root_partitions: 119 if partition in mounted_devices: 120 raise error.TestFail( 121 'Root partition "%s" is mounted by process %s (%s)' 122 % (partition, process, process_exe)) 123