1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A module to provide interface to OS services."""
6
7import datetime
8import os
9import re
10import struct
11
12import shell_wrapper
13
14
class OSInterfaceError(Exception):
    """Exception type raised by the OS interface layer of this module."""
18
class Crossystem(object):
    """A wrapper for the crossystem utility.

    Reading an attribute of an instance runs `crossystem <name>' and writing
    one runs `crossystem "<name>=<value>"', both through the OS interface
    object handed to init().
    """

    # Code dedicated for user triggering recovery mode through crossystem.
    USER_RECOVERY_REQUEST_CODE = '193'

    def init(self, os_if):
        """Init the instance.

        Args:
          os_if - the object used to run shell commands. It must provide
                  run_shell_command() and run_shell_command_get_output().
        """
        self.os_if = os_if

    def __getattr__(self, name):
        """Retrieve a crossystem attribute.

        Attempt to access crossystemobject.name will invoke `crossystem name'
        and return the first element of the command output (presumably the
        first line of stdout; confirm against the shell wrapper).

        Raises:
          AttributeError - if init() has not been called yet, or if name is
                           a Python special (double underscore) name.
        """
        # __getattr__ only runs after the regular lookup failed. If 'os_if'
        # itself is missing, reading self.os_if below would re-enter this
        # method forever, so fail explicitly. Special names are probed by
        # the interpreter and the standard library (copy, pickle, hasattr);
        # they must not be turned into shell commands.
        if name == 'os_if' or name.startswith('__'):
            raise AttributeError(name)
        return self.os_if.run_shell_command_get_output(
            'crossystem %s' % name)[0]

    def __setattr__(self, name, value):
        """Set a crossystem attribute.

        Assigning crossystemobject.name = value invokes
        `crossystem "name=value"'. The only attribute stored on the Python
        object itself is 'os_if'.
        """
        if name == 'os_if':
            self.__dict__[name] = value
            return
        self.os_if.run_shell_command('crossystem "%s=%s"' % (name, value))

    def request_recovery(self):
        """Request recovery mode next time the target reboots."""
        # Plain assignment is routed through __setattr__ above.
        self.recovery_request = self.USER_RECOVERY_REQUEST_CODE
49
50
class OSInterface(object):
    """An object to encapsulate OS services functions.

    Commands for the device under test (DUT) go through self.shell. When the
    Android tester marker file exists, self.shell is an AdbShell and a
    LocalShell is additionally kept in self.host_shell; otherwise self.shell
    is a LocalShell and there is no host shell.
    """

    ANDROID_TESTER_FILE = '/mnt/stateful_partition/.android_faft_tester'

    # Magic expected at the start of the blobs parsed by the retrieve_*()
    # methods. It is a bytes literal because struct returns bytes for 's'
    # fields; comparing against a text literal never matches on Python 3.
    _BLOB_MAGIC = b'CHROMEOS'

    # Blob header layout: 8 byte magic, 8 skipped bytes and a 64 bit offset
    # at which the preamble fields are unpacked.
    _BLOB_HEADER_FORMAT = '<8s8sQ'

    # Matches a trailing partition suffix such as '3' or 'p3'.
    _PART_SUFFIX_RE = re.compile(r'p?[0-9]+$')

    _DIGITS = tuple('0123456789')

    def __init__(self):
        """Object construction time initialization."""
        self.state_dir = None
        self.log_file = None
        self.cs = Crossystem()
        self.is_android = os.path.isfile(self.ANDROID_TESTER_FILE)
        if self.is_android:
            self.shell = shell_wrapper.AdbShell()
            self.host_shell = shell_wrapper.LocalShell()
        else:
            self.shell = shell_wrapper.LocalShell()
            self.host_shell = None

    def init(self, state_dir=None, log_file=None):
        """Initialize the OS interface object.

        Args:
          state_dir - a string, the name of the directory (as defined by the
                      caller). The contents of this directory persist over
                      system restarts and power cycles.
          log_file - a string, the name of the log file kept in the state
                     directory. An absolute path is used as is.

        Default argument values support unit testing.

        Raises:
          OSInterfaceError - if the state directory can not be created.
        """
        self.cs.init(self)
        self.state_dir = state_dir

        if self.state_dir:
            if not os.path.exists(self.state_dir):
                try:
                    os.mkdir(self.state_dir)
                except OSError as err:
                    raise OSInterfaceError(err)
            if log_file:
                # os.path.join() returns log_file unchanged when it is an
                # absolute path, so both cases are covered by one call.
                self.log_file = os.path.join(self.state_dir, log_file)

        # Initialize the shell. Should be after creating the log file.
        self.shell.init(self)
        if self.host_shell:
            self.host_shell.init(self)

    def _get_host_shell(self):
        """Return the host shell, raising OSInterfaceError if there is none."""
        if not self.host_shell:
            raise OSInterfaceError('There is no host for DUT.')
        return self.host_shell

    def has_host(self):
        """Return True if a host is connected to DUT."""
        return self.is_android

    def run_shell_command(self, cmd):
        """Run a shell command."""
        self.shell.run_command(cmd)

    def run_shell_command_get_status(self, cmd):
        """Run shell command and return its return code."""
        return self.shell.run_command_get_status(cmd)

    def run_shell_command_get_output(self, cmd):
        """Run shell command and return its console output."""
        return self.shell.run_command_get_output(cmd)

    def run_host_shell_command(self, cmd, block=True):
        """Run a shell command on the host.

        Raises:
          OSInterfaceError - if there is no host shell.
        """
        self._get_host_shell().run_command(cmd, block)

    def run_host_shell_command_get_status(self, cmd):
        """Run shell command on the host and return its return code.

        Raises:
          OSInterfaceError - if there is no host shell.
        """
        return self._get_host_shell().run_command_get_status(cmd)

    def run_host_shell_command_get_output(self, cmd):
        """Run shell command on the host and return its console output.

        Raises:
          OSInterfaceError - if there is no host shell.
        """
        return self._get_host_shell().run_command_get_output(cmd)

    def read_file(self, path):
        """Read the content of the file."""
        return self.shell.read_file(path)

    def write_file(self, path, data):
        """Write the data to the file."""
        self.shell.write_file(path, data)

    def append_file(self, path, data):
        """Append the data to the file."""
        self.shell.append_file(path, data)

    # NOTE: the helpers below interpolate paths into shell command lines
    # without quoting, so paths containing whitespace or shell
    # metacharacters are interpreted by the shell.

    def path_exists(self, path):
        """Return True if the path exists on DUT."""
        return self.run_shell_command_get_status('test -e %s' % path) == 0

    def is_dir(self, path):
        """Return True if the path is a directory."""
        return self.run_shell_command_get_status('test -d %s' % path) == 0

    def create_dir(self, path):
        """Create a new directory (and missing parents, as in `mkdir -p')."""
        return self.run_shell_command('mkdir -p %s' % path)

    def create_temp_file(self, prefix):
        """Create a temporary file with a prefix.

        Returns the first element of the mktemp output (presumably the path
        of the new file).
        """
        tmp_path = '/data/local/tmp' if self.is_android else '/tmp'
        cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix)
        return self.run_shell_command_get_output(cmd)[0]

    def copy_file(self, from_path, to_path):
        """Copy the file."""
        return self.run_shell_command('cp -f %s %s' % (from_path, to_path))

    def copy_dir(self, from_path, to_path):
        """Copy the directory."""
        return self.run_shell_command('cp -rf %s %s' % (from_path, to_path))

    def remove_file(self, path):
        """Remove the file."""
        return self.run_shell_command('rm -f %s' % path)

    def remove_dir(self, path):
        """Remove the directory."""
        return self.run_shell_command('rm -rf %s' % path)

    def get_file_size(self, path):
        """Get the size of the file, as an integer reported by `stat'."""
        cmd = 'stat -c %%s %s' % path
        return int(self.run_shell_command_get_output(cmd)[0])

    def target_hosted(self):
        """Return True if running on DUT.

        Always True on Android. Otherwise the first line of /etc/lsb-release
        must mention chromeos or chromiumos (case insensitive).
        """
        if self.is_android:
            return True
        # Only the first line matters; readline() returns '' for an empty
        # file, which simply fails the match below.
        with open('/etc/lsb-release', 'r') as lsb_file:
            signature = lsb_file.readline()
        match = re.search(r'chrom(ium|e)os', signature, re.IGNORECASE)
        return match is not None

    def state_dir_file(self, file_name):
        """Get a full path of a file in the state directory."""
        return os.path.join(self.state_dir, file_name)

    def wait_for_device(self, timeout):
        """Wait for an Android device to be connected."""
        return self.shell.wait_for_device(timeout)

    def wait_for_no_device(self, timeout):
        """Wait for no Android device to be connected (offline)."""
        return self.shell.wait_for_no_device(timeout)

    def log(self, text):
        """Append a timestamped line with text to the log file.

        The entire log (maintained across reboots) can be found in
        self.log_file. Nothing is written if the log file or the state
        directory are not set up yet.
        """
        if (not self.log_file or not self.state_dir or
                not os.path.exists(self.state_dir)):
            # Called before environment was initialized, ignore.
            return

        timestamp = datetime.datetime.now().strftime('%I:%M:%S %p:')

        with open(self.log_file, 'a') as log_f:
            log_f.write('%s %s\n' % (timestamp, text))
            # Flush Python's buffer, then ask the OS to push the data to
            # the storage device before the file is closed.
            log_f.flush()
            os.fdatasync(log_f.fileno())

    def is_removable_device(self, device):
        """Check if a certain storage device is removable.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Returns True if the device is removable, False if not. Always False
        on Android and when target_hosted() is False.
        """
        if self.is_android or not self.target_hosted():
            return False

        # Take the name following '/dev/' and drop the partition suffix
        # (trailing digits with an optional 'p' in front of them).
        base_dev = self.strip_part(device.split('/')[2])
        removable = int(self.read_file('/sys/block/%s/removable' % base_dev))

        return removable == 1

    def get_internal_disk(self, device):
        """Get the internal disk by given the current disk.

        If device is a removable device, the internal disk is the first
        existing one of /dev/mmcblk0, /dev/mmcblk1 and /dev/nvme0n1, falling
        back to /dev/sda. Otherwise the device itself, without its partition
        number, is returned.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Return internal kernel disk.
        """
        if not self.is_removable_device(device):
            return self.strip_part(device)

        for candidate in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'):
            if self.path_exists(candidate):
                return candidate
        return '/dev/sda'

    def get_root_part(self):
        """Return a string, the name of root device with partition number"""
        # FIXME(waihong): Android doesn't support dual kernel/root and misses
        # the related tools. Just return something that not break the existing
        # code.
        if self.is_android:
            return '/dev/mmcblk0p3'
        return self.run_shell_command_get_output('rootdev -s')[0]

    def get_root_dev(self):
        """Return a string, the name of root device without partition number"""
        return self.strip_part(self.get_root_part())

    def join_part(self, dev, part):
        """Return a concatenated string of device and partition number

        A 'p' separator is inserted when the device name ends with a digit
        (as in /dev/mmcblk0 -> /dev/mmcblk0p3).

        dev - a string, the device name.
        part - the partition number, a string or an integer.
        """
        part = str(part)
        if dev.endswith(self._DIGITS):
            return dev + 'p' + part
        return dev + part

    def strip_part(self, dev_with_part):
        """Return a stripped string without partition number

        The trailing digits, and a 'p' right in front of them if present,
        are removed. The argument is expected to include a partition number:
        a whole-disk name which itself ends with a digit (like /dev/mmcblk0)
        loses that digit as well.
        """
        return self._PART_SUFFIX_RE.sub('', dev_with_part)

    def _unpack_preamble(self, blob, preamble_format):
        """Unpack preamble_format at the offset stored in the blob header.

        Returns the tuple of unpacked fields, or None if the blob does not
        start with the expected magic.
        """
        magic, _, kb_size = struct.unpack_from(self._BLOB_HEADER_FORMAT, blob)
        if magic != self._BLOB_MAGIC:
            return None
        return struct.unpack_from(preamble_format, blob, kb_size)

    def retrieve_body_version(self, blob):
        """Given a blob, retrieve body version.

        Currently works for both, firmware and kernel blobs. Returns -1 in
        case the magic does not match (this could be a corrupted blob).
        """
        fields = self._unpack_preamble(blob, '<40sQ')
        if fields is None:
            return -1  # This could be a corrupted version case.
        return fields[1]

    def retrieve_datakey_version(self, blob):
        """Given a blob, retrieve firmware data key version.

        Currently works for both, firmware and kernel blobs. Returns -1 in
        case the magic does not match (this could be a corrupted blob).
        """
        # Unlike the other retrieve_*() methods the value is read at a fixed
        # position, right after the magic and 96 skipped bytes.
        magic, _, version = struct.unpack_from('<8s96sQ', blob)
        if magic != self._BLOB_MAGIC:
            return -1  # This could be a corrupted version case.
        return version

    def retrieve_kernel_subkey_version(self, blob):
        """Given a blob, retrieve kernel subkey version.

        It is in firmware vblock's preamble. Returns -1 if the magic does
        not match.
        """
        fields = self._unpack_preamble(blob, '<72sQ')
        if fields is None:
            return -1
        return fields[1]

    def retrieve_preamble_flags(self, blob):
        """Given a blob, retrieve preamble flags if available.

        It only works for firmware. If the version of preamble header is less
        than 2.1, no preamble flags supported, just returns 0. Returns -1 if
        the magic does not match.
        """
        fields = self._unpack_preamble(blob, '<32sII64sI')
        if fields is None:
            return -1  # This could be a corrupted version case.

        _, ver, subver, _, flags = fields
        if (ver, subver) >= (2, 1):
            return flags
        return 0  # Returns 0 if preamble flags not available.

    def read_partition(self, partition, size):
        """Read the requested partition, up to size bytes.

        The data is copied with dd into a temporary file in the state
        directory, read back and the temporary file is removed.
        """
        tmp_file = self.state_dir_file('part.tmp')
        try:
            self.run_shell_command('dd if=%s of=%s bs=1 count=%d' % (
                    partition, tmp_file, size))
            return self.read_file(tmp_file)
        finally:
            # Do not leave the temporary file behind when dd or the read
            # fails.
            self.remove_file(tmp_file)
375