1#!/usr/bin/env python
2#
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import re
20import uuid
21
22from vts.runners.host import asserts
23from vts.runners.host import base_test
24from vts.runners.host import const
25from vts.runners.host import test_runner
26from vts.utils.python.controllers import android_device
27from vts.utils.python.file import target_file_utils
28
29
30class KernelApiSysfsTest(base_test.BaseTestClass):
31    '''Test cases which check sysfs files.'''
32
33    def setUpClass(self):
34        self.dut = self.android_devices[0]
35        self.shell = self.dut.shell
36
37    def ConvertToInteger(self, text):
38        '''Check whether a given text is interger.
39
40        Args:
41            text: object, usually a string representing the content of a file
42
43        Returns:
44            bool, True if is integer
45        '''
46        try:
47            return int(text)
48        except ValueError as e:
49            logging.exception(e)
50            asserts.fail('Content "%s" is not integer' % text)
51
52    def MatchRegex(self, regex, string):
53        '''Check whether a string completely matches a given regex.
54
55        Assertions will fail if given string is not a complete match.
56
57        Args:
58            regex: string, regex pattern to match
59            string: string, given string for matching
60        '''
61        pattern = re.compile(regex)
62        match = pattern.match(string)
63        message = 'String "%s" is not a complete match of regex "%s".' % (
64            string, regex)
65        asserts.assertTrue(match is not None, message)
66        asserts.assertEqual(match.start(), 0, message)
67        asserts.assertEqual(match.end(), len(string), message)
68
69    def GetPathPermission(self, path, assert_if_absent):
70        '''Get the permission bits of a path, catching IOError.'''
71        permission = ''
72        try:
73            permission = target_file_utils.GetPermission(path, self.shell)
74        except IOError as e:
75            if not assert_if_absent:
76                return None
77            logging.exception(e)
78            asserts.fail('Path "%s" does not exist or has invalid '
79                         'permission bits' % path)
80        return permission
81
82    def IsReadOnly(self, path, assert_if_absent=True):
83        '''Check whether a given path is read only.
84
85        Assertion will fail if given path does not exist or is not read only.
86        '''
87        permission = self.GetPathPermission(path, assert_if_absent)
88        if permission is None and not assert_if_absent:
89            return
90        asserts.assertTrue(target_file_utils.IsReadOnly(permission),
91                'path %s is not read only' % path)
92
93    def IsReadWrite(self, path, assert_if_absent=True):
94        '''Check whether a given path is read-write.
95
96        Assertion will fail if given path does not exist or is not read-write.
97        '''
98        permission = self.GetPathPermission(path, assert_if_absent)
99        if permission is None and not assert_if_absent:
100            return
101        asserts.assertTrue(target_file_utils.IsReadWrite(permission),
102                'path %s is not read write' % path)
103
104    def tryReadFileContent(self, f, shell):
105        '''Attempt to read a file.
106
107        If the file does not exist None will be returned.
108        '''
109        try:
110            content = target_file_utils.ReadFileContent(f, self.shell)
111        except IOError as e:
112            return None
113        return content
114
115    def testAndroidUSB(self):
116        '''Check for the existence of required files in /sys/class/android_usb.
117        '''
118        state = '/sys/class/android_usb/android0/state'
119        self.IsReadOnly(state)
120        contents = target_file_utils.ReadFileContent(state, self.shell).strip()
121        asserts.assertTrue(contents in
122                ['DISCONNECTED', 'CONNECTED', 'CONFIGURED'],
123                '%s does not contain an expected string' % state)
124
125    def testCpuOnlineFormat(self):
126        '''Check the format of cpu online file.
127
128        Confirm /sys/devices/system/cpu/online exists and is read-only.
129        Parse contents to ensure it is a comma-separated series of ranges
130        (%d-%d) and/or integers.
131        '''
132        filepath = '/sys/devices/system/cpu/online'
133        self.IsReadOnly(filepath)
134        content = target_file_utils.ReadFileContent(filepath, self.shell)
135        regex = '(\d+(-\d+)?)(,\d+(-\d+)?)*'
136        if content.endswith('\n'):
137            content = content[:-1]
138        self.MatchRegex(regex, content)
139
140    def testPerCpuCpufreq(self):
141        '''Check each cpu's scaling_cur_freq, scaling_min_freq, scaling_max_freq,
142        scaling_available_frequencies, and time_in_state files.
143        '''
144        f = '/sys/devices/system/cpu/present'
145        self.IsReadOnly(f)
146        present_cpus = target_file_utils.ReadFileContent(f, self.shell)
147        cpu_ranges = present_cpus.split(',')
148        cpu_list = []
149
150        for r in cpu_ranges:
151            m = re.match(r'(\d+)(-\d+)?', r)
152            asserts.assertTrue(m is not None,
153                    'malformatted range in /sys/devices/system/cpu/present')
154            start_cpu = int(m.group(1))
155            if m.group(2) is None:
156                end_cpu = start_cpu
157            else:
158                end_cpu = int(m.group(2)[1:])
159            cpu_list += range(start_cpu, end_cpu+1)
160
161        for cpu in cpu_list:
162            f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_cur_freq' % cpu
163            self.IsReadOnly(f, False)
164            content = self.tryReadFileContent(f, self.shell)
165            if content is not None:
166                self.ConvertToInteger(content)
167
168            f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_min_freq' % cpu
169            self.IsReadWrite(f, False)
170            content = self.tryReadFileContent(f, self.shell)
171            if content is not None:
172                self.ConvertToInteger(content)
173
174            f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_max_freq' % cpu
175            self.IsReadWrite(f, False)
176            content = self.tryReadFileContent(f, self.shell)
177            if content is not None:
178                self.ConvertToInteger(content)
179
180            f = '/sys/devices/system/cpu/cpu%s/cpufreq/scaling_available_frequencies' % cpu
181            self.IsReadOnly(f, False)
182            content = self.tryReadFileContent(f, self.shell)
183            if content is not None:
184                content = content.rstrip()
185                avail_freqs = content.split(' ')
186                for x in avail_freqs:
187                    self.ConvertToInteger(x)
188
189            f = '/sys/devices/system/cpu/cpu%s/cpufreq/stats/time_in_state' % cpu
190            self.IsReadOnly(f, False)
191            content = self.tryReadFileContent(f, self.shell)
192            if content is not None:
193                for line in content:
194                    values = line.split()
195                    for v in values:
196                        try:
197                            unused = int(v)
198                        except ValueError as e:
199                            asserts.fail("Malformatted time_in_state file at %s" % f)
200
201    def testLastResumeReason(self):
202        '''Check /sys/kernel/wakeup_reasons/last_resume_reason.'''
203        filepath = '/sys/kernel/wakeup_reasons/last_resume_reason'
204        self.IsReadOnly(filepath)
205
206    def testKernelMax(self):
207        '''Check the value of /sys/devices/system/cpu/kernel_max.'''
208        filepath = '/sys/devices/system/cpu/kernel_max'
209        self.IsReadOnly(filepath)
210        content = target_file_utils.ReadFileContent(filepath, self.shell)
211        self.ConvertToInteger(content)
212
213    def testNetMTU(self):
214        '''Check for /sys/class/net/*/mtu.'''
215        dirlist = target_file_utils.FindFiles(self.shell, '/sys/class/net',
216                '*', '-maxdepth 1 -type l')
217        for entry in dirlist:
218            mtufile = entry + "/mtu"
219            self.IsReadWrite(mtufile)
220            content = target_file_utils.ReadFileContent(mtufile, self.shell)
221            self.ConvertToInteger(content)
222
223    def testRtcHctosys(self):
224        '''Check that at least one rtc exists with hctosys = 1.'''
225        rtclist = target_file_utils.FindFiles(self.shell, '/sys/class/rtc',
226                'rtc*', '-maxdepth 1 -type l')
227        for entry in rtclist:
228            content = target_file_utils.ReadFileContent(entry + "/hctosys",
229                    self.shell)
230            try:
231                hctosys = int(content)
232            except ValueError as e:
233                continue
234            if hctosys == 1:
235                return
236        asserts.fail("No RTC with hctosys=1 present")
237
238    def testWakeLock(self):
239        '''Check that locking and unlocking a wake lock works.'''
240        _WAKE_LOCK_PATH = '/sys/power/wake_lock'
241        _WAKE_UNLOCK_PATH = '/sys/power/wake_unlock'
242        lock_name = 'KernelApiSysfsTestWakeLock' + uuid.uuid4().hex
243
244        # Enable wake lock
245        self.shell.Execute('echo %s > %s' % (lock_name, _WAKE_LOCK_PATH))
246
247        # Confirm wake lock is enabled
248        results = self.shell.Execute('cat %s' % _WAKE_LOCK_PATH)
249        active_sources = results[const.STDOUT][0].split()
250        asserts.assertTrue(lock_name in active_sources,
251                'active wake lock not reported in %s' % _WAKE_LOCK_PATH)
252
253        # Disable wake lock
254        self.shell.Execute('echo %s > %s' % (lock_name, _WAKE_UNLOCK_PATH))
255
256        # Confirm wake lock is no longer enabled
257        results = self.shell.Execute('cat %s' % _WAKE_LOCK_PATH)
258        active_sources = results[const.STDOUT][0].split()
259        asserts.assertTrue(lock_name not in active_sources,
260                'inactive wake lock reported in %s' % _WAKE_LOCK_PATH)
261        results = self.shell.Execute('cat %s' % _WAKE_UNLOCK_PATH)
262        inactive_sources = results[const.STDOUT][0].split()
263        asserts.assertTrue(lock_name in inactive_sources,
264                'inactive wake lock not reported in %s' % _WAKE_UNLOCK_PATH)
265
266    def testWakeupCount(self):
267        filepath = '/sys/power/wakeup_count'
268        self.IsReadWrite(filepath)
269
270    def testSysPowerState(self):
271        '''/sys/power/state controls the system sleep states.'''
272        filepath = '/sys/power/state'
273        self.IsReadWrite(filepath)
274        content = target_file_utils.ReadFileContent(filepath, self.shell)
275        allowed_states = ['freeze', 'mem', 'disk', 'standby']
276        for state in content.split():
277            if state not in allowed_states:
278                asserts.fail("Invalid system power state: %s" % state)
279
280if __name__ == "__main__":
281    test_runner.main()
282