#!/usr/bin/env python # # Copyright (C) 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import re import uuid from vts.runners.host import asserts from vts.runners.host import base_test from vts.runners.host import const from vts.runners.host import test_runner from vts.utils.python.controllers import android_device from vts.utils.python.file import target_file_utils class KernelApiSysfsTest(base_test.BaseTestClass): '''Test cases which check sysfs files.''' def setUpClass(self): self.dut = self.android_devices[0] self.shell = self.dut.shell def ConvertToInteger(self, text): '''Check whether a given text is interger. Args: text: object, usually a string representing the content of a file Returns: bool, True if is integer ''' try: return int(text) except ValueError as e: logging.exception(e) asserts.fail('Content "%s" is not integer' % text) def MatchRegex(self, regex, string): '''Check whether a string completely matches a given regex. Assertions will fail if given string is not a complete match. Args: regex: string, regex pattern to match string: string, given string for matching ''' pattern = re.compile(regex) match = pattern.match(string) message = 'String "%s" is not a complete match of regex "%s".' % ( string, regex) asserts.assertTrue(match is not None, message) asserts.assertEqual(match.start(), 0, message) asserts.assertEqual(match.end(), len(string), message) def GetPathPermission(self, path, assert_if_absent): '''Get the permission bits of a path, catching IOError.''' permission = '' try: permission = target_file_utils.GetPermission(path, self.shell) except IOError as e: if not assert_if_absent: return None logging.exception(e) asserts.fail('Path "%s" does not exist or has invalid ' 'permission bits' % path) return permission def IsReadOnly(self, path, assert_if_absent=True): '''Check whether a given path is read only. Assertion will fail if given path does not exist or is not read only. ''' permission = self.GetPathPermission(path, assert_if_absent) if permission is None and not assert_if_absent: return asserts.assertTrue(target_file_utils.IsReadOnly(permission), 'path %s is not read only' % path) def IsReadWrite(self, path, assert_if_absent=True): '''Check whether a given path is read-write. Assertion will fail if given path does not exist or is not read-write. ''' permission = self.GetPathPermission(path, assert_if_absent) if permission is None and not assert_if_absent: return asserts.assertTrue(target_file_utils.IsReadWrite(permission), 'path %s is not read write' % path) def tryReadFileContent(self, f, shell): '''Attempt to read a file. If the file does not exist None will be returned. ''' try: content = target_file_utils.ReadFileContent(f, self.shell) except IOError as e: return None return content def testAndroidUSB(self): '''Check for the existence of required files in /sys/class/android_usb. ''' state = '/sys/class/android_usb/android0/state' self.IsReadOnly(state) contents = target_file_utils.ReadFileContent(state, self.shell).strip() asserts.assertTrue(contents in ['DISCONNECTED', 'CONNECTED', 'CONFIGURED'], '%s does not contain an expected string' % state) def testCpuOnlineFormat(self): '''Check the format of cpu online file. Confirm /sys/devices/system/cpu/online exists and is read-only. Parse contents to ensure it is a comma-separated series of ranges (%d-%d) and/or integers. ''' filepath = '/sys/devices/system/cpu/online' self.IsReadOnly(filepath) content = target_file_utils.ReadFileContent(filepath, self.shell) regex = '(\d+(-\d+)?)(,\d+(-\d+)?)*' if content.endswith('\n'): content = content[:-1] self.MatchRegex(regex, content) def testPerCpuCpufreq(self): '''Check each cpu's scaling_cur_freq, scaling_min_freq, scaling_max_freq, scaling_available_frequencies, and time_in_state files. ''' f = '/sys/devices/system/cpu/present' self.IsReadOnly(f) present_cpus = target_file_utils.ReadFileContent(f, self.shell) cpu_ranges = present_cpus.split(',') cpu_list = [] for r in cpu_ranges: m = re.match(r'(\d+)(-\d+)?', r) asserts.assertTrue(m is not None, 'malformatted range in /sys/devices/system/cpu/present') start_cpu = int(m.group(1)) if m.group(2) is None: end_cpu = start_cpu else: end_cpu = int(m.group(2)[1:]) cpu_list += range(start_cpu, end_cpu+1) for cpu in cpu_list: f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_cur_freq' % cpu self.IsReadOnly(f, False) content = self.tryReadFileContent(f, self.shell) if content is not None: self.ConvertToInteger(content) f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_min_freq' % cpu self.IsReadWrite(f, False) content = self.tryReadFileContent(f, self.shell) if content is not None: self.ConvertToInteger(content) f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_max_freq' % cpu self.IsReadWrite(f, False) content = self.tryReadFileContent(f, self.shell) if content is not None: self.ConvertToInteger(content) f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_available_frequencies' % cpu self.IsReadOnly(f, False) content = self.tryReadFileContent(f, self.shell) if content is not None: content = content.rstrip() avail_freqs = content.split(' ') for x in avail_freqs: self.ConvertToInteger(x) f = '/sys/devices/system/cpu/cpu%s/cpufreq/stats/time_in_state' % cpu self.IsReadOnly(f, False) content = self.tryReadFileContent(f, self.shell) if content is not None: for line in content: values = line.split() for v in values: try: unused = int(v) except ValueError as e: asserts.fail("Malformatted time_in_state file at %s" % f) def testLastResumeReason(self): '''Check /sys/kernel/wakeup_reasons/last_resume_reason.''' filepath = '/sys/kernel/wakeup_reasons/last_resume_reason' self.IsReadOnly(filepath) def testKernelMax(self): '''Check the value of /sys/devices/system/cpu/kernel_max.''' filepath = '/sys/devices/system/cpu/kernel_max' self.IsReadOnly(filepath) content = target_file_utils.ReadFileContent(filepath, self.shell) self.ConvertToInteger(content) def testNetMTU(self): '''Check for /sys/class/net/*/mtu.''' dirlist = target_file_utils.FindFiles(self.shell, '/sys/class/net', '*', '-maxdepth 1 -type l') for entry in dirlist: mtufile = entry + "/mtu" self.IsReadWrite(mtufile) content = target_file_utils.ReadFileContent(mtufile, self.shell) self.ConvertToInteger(content) def testRtcHctosys(self): '''Check that at least one rtc exists with hctosys = 1.''' rtclist = target_file_utils.FindFiles(self.shell, '/sys/class/rtc', 'rtc*', '-maxdepth 1 -type l') for entry in rtclist: content = target_file_utils.ReadFileContent(entry + "/hctosys", self.shell) try: hctosys = int(content) except ValueError as e: continue if hctosys == 1: return asserts.fail("No RTC with hctosys=1 present") def testWakeLock(self): '''Check that locking and unlocking a wake lock works.''' _WAKE_LOCK_PATH = '/sys/power/wake_lock' _WAKE_UNLOCK_PATH = '/sys/power/wake_unlock' lock_name = 'KernelApiSysfsTestWakeLock' + uuid.uuid4().hex # Enable wake lock self.shell.Execute('echo %s > %s' % (lock_name, _WAKE_LOCK_PATH)) # Confirm wake lock is enabled results = self.shell.Execute('cat %s' % _WAKE_LOCK_PATH) active_sources = results[const.STDOUT][0].split() asserts.assertTrue(lock_name in active_sources, 'active wake lock not reported in %s' % _WAKE_LOCK_PATH) # Disable wake lock self.shell.Execute('echo %s > %s' % (lock_name, _WAKE_UNLOCK_PATH)) # Confirm wake lock is no longer enabled results = self.shell.Execute('cat %s' % _WAKE_LOCK_PATH) active_sources = results[const.STDOUT][0].split() asserts.assertTrue(lock_name not in active_sources, 'inactive wake lock reported in %s' % _WAKE_LOCK_PATH) results = self.shell.Execute('cat %s' % _WAKE_UNLOCK_PATH) inactive_sources = results[const.STDOUT][0].split() asserts.assertTrue(lock_name in inactive_sources, 'inactive wake lock not reported in %s' % _WAKE_UNLOCK_PATH) def testWakeupCount(self): filepath = '/sys/power/wakeup_count' self.IsReadWrite(filepath) def testSysPowerState(self): '''/sys/power/state controls the system sleep states.''' filepath = '/sys/power/state' self.IsReadWrite(filepath) content = target_file_utils.ReadFileContent(filepath, self.shell) allowed_states = ['freeze', 'mem', 'disk', 'standby'] for state in content.split(): if state not in allowed_states: asserts.fail("Invalid system power state: %s" % state) if __name__ == "__main__": test_runner.main()