1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright 2019 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Script to image a ChromeOS device.
9
This script images a remote ChromeOS device with a specific image.
11"""
12
13from __future__ import print_function
14
15__author__ = 'asharif@google.com (Ahmad Sharif)'
16
17import argparse
18import filecmp
19import getpass
20import glob
21import os
22import re
23import shutil
24import sys
25import tempfile
26import time
27
28from cros_utils import command_executer
29from cros_utils import locks
30from cros_utils import logger
31from cros_utils import misc
32from cros_utils.file_utils import FileUtils
33
# Path on the remote device where DoImage stores the checksum of the image
# it last flashed; DoImage reads it back to decide whether to re-image.
checksum_file = '/usr/local/osimage_checksum_file'
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a lock path kept for other tooling -- confirm before removing.
lock_file = '/tmp/image_chromeos_lock/image_chromeos_lock'
36
37
def Usage(parser, message):
  """Report a command-line usage error, print the help text, and exit.

  Args:
    parser: The argparse.ArgumentParser whose help text is printed.
    message: Description of what was wrong with the arguments.

  Raises:
    SystemExit: Always, with exit status 1.
  """
  print('ERROR: %s' % message)
  parser.print_help()
  # A usage error is a failure: exiting with 0 would make wrapping scripts
  # believe the device was imaged successfully.
  sys.exit(1)
42
43
def CheckForCrosFlash(chromeos_root, remote, log_level):
  """Check that the remote can import the python modules cros flash needs.

  Runs a python import of cherrypy and ctypes on the remote and reports a
  fatal error through the logger when the command returns 255 (treated as
  an ssh failure) or any other non-zero status (modules missing).

  Args:
    chromeos_root: ChromeOS checkout handed to CrosRunCommand.
    remote: The device to probe.
    log_level: Log level used for the command executer.
  """
  executer = command_executer.GetCommandExecuter(log_level=log_level)

  # Check to see if remote machine has cherrypy, ctypes
  status = executer.CrosRunCommand(
      "python -c 'import cherrypy, ctypes'",
      chromeos_root=chromeos_root,
      machine=remote)

  ssh_failed_msg = 'Failed ssh to %s (for checking cherrypy)' % remote
  logger.GetLogger().LogFatalIf(status == 255, ssh_failed_msg)

  missing_modules_msg = ("Failed to find cherrypy or ctypes on remote '{}', "
                         'cros flash cannot work.').format(remote)
  logger.GetLogger().LogFatalIf(status != 0, missing_modules_msg)
56
57
def DisableCrosBeeps(chromeos_root, remote, log_level):
  """Try to switch off the beeps chromebooks make after rebooting.

  Runs set_gbb_flags.sh on the remote.  A non-zero exit status is only
  logged (together with the command output); nothing is returned.

  Args:
    chromeos_root: ChromeOS checkout handed to CrosRunCommandWOutput.
    remote: The device on which to run the command.
    log_level: Log level used for the command executer.
  """
  executer = command_executer.GetCommandExecuter(log_level=log_level)

  logger.GetLogger().LogOutput('Trying to disable beeping.')
  status, output, _ = executer.CrosRunCommandWOutput(
      '/usr/share/vboot/bin/set_gbb_flags.sh 0x1',
      chromeos_root=chromeos_root,
      machine=remote)
  if status == 0:
    return

  logger.GetLogger().LogOutput(output)
  logger.GetLogger().LogOutput('Failed to disable beeps.')
70
71
def FindChromeOSImage(image_file, chromeos_root):
  """Find path for ChromeOS image inside chroot.

  This function could be called with image paths that are either inside
  or outside the chroot.  In either case the path needs to be translated
  to a real/absolute path inside the chroot.
  Example input paths:
  /usr/local/google/home/uname/chromeos/chroot/tmp/my-test-images/image
  ~/trunk/src/build/images/board/latest/image
  /tmp/peppy-release/R67-1235.0.0/image

  Corresponding example output paths:
  /tmp/my-test-images/image
  /home/uname/trunk/src/build/images/board/latest/image
  /tmp/peppy-release/R67-1235.0.0/image

  Args:
    image_file: Path of the image, as seen from inside or outside the chroot.
    chromeos_root: Path of the ChromeOS checkout; it is resolved with
      os.path.realpath before being compared against image_file.

  Returns:
    The path to use for the image inside the chroot.  image_file is returned
    unchanged when it lies neither under the chroot dir nor under the
    checkout.
  """
  # Full path of the chromeos root, excluding 'chroot'.
  real_chromeos_root = os.path.realpath(chromeos_root)
  # Full path of the chroot dir, including 'chroot'.
  real_chroot_dir = os.path.join(real_chromeos_root, 'chroot')

  def _PathBelow(directory):
    """Return the part of image_file below directory, or None if outside."""
    directory = directory.rstrip(os.sep)
    # Match whole path components at the *start* of the path only.  A plain
    # substring search would also hit '/backup/<root>/...' or '<root>2/...'
    # and then slice off an unrelated prefix.
    if image_file == directory or image_file.startswith(directory + os.sep):
      return image_file[len(directory):]
    return None

  # Inside the chroot dir: drop that prefix, the rest is already correct.
  below_chroot = _PathBelow(real_chroot_dir)
  if below_chroot is not None:
    return below_chroot

  # Inside the checkout but outside the chroot: the checkout is visible in
  # the chroot as '/home/<username>/trunk'.
  below_root = _PathBelow(real_chromeos_root)
  if below_root is not None:
    return '/home/%s/trunk%s' % (getpass.getuser(), below_root)

  # Otherwise assume the path is already internal, so leave it alone.
  return image_file
110
111
def _CreateImageArgParser():
  """Build the argparse parser describing DoImage's command-line flags."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '-c',
      '--chromeos_root',
      dest='chromeos_root',
      help='Target directory for ChromeOS installation.')
  parser.add_argument('-r', '--remote', dest='remote', help='Target device.')
  parser.add_argument('-i', '--image', dest='image', help='Image binary file.')
  parser.add_argument(
      '-b', '--board', dest='board', help='Target board override.')
  parser.add_argument(
      '-f',
      '--force',
      dest='force',
      action='store_true',
      default=False,
      help='Force an image even if it is non-test.')
  parser.add_argument(
      '-n',
      '--no_lock',
      dest='no_lock',
      default=False,
      action='store_true',
      help='Do not attempt to lock remote before imaging.  '
      'This option should only be used in cases where the '
      'exclusive lock has already been acquired (e.g. in '
      'a script that calls this one).')
  parser.add_argument(
      '-l',
      '--logging_level',
      dest='log_level',
      default='verbose',
      help='Amount of logging to be used. Valid levels are '
      "'quiet', 'average', and 'verbose'.")
  parser.add_argument('-a', '--image_args', dest='image_args')
  return parser


def DoImage(argv):
  """Image a remote ChromeOS device, re-flashing only when necessary.

  For a local image file the image checksum is compared with the checksum
  stored on the device; the device is re-imaged with 'cros flash' only when
  they differ, otherwise it is just rebooted.  An 'xbuddy://' image is always
  flashed.  Unless --no_lock is given, the remote is locked for the duration
  of the operation and released afterwards.

  Args:
    argv: Full argument vector; argv[0] is skipped and the rest is parsed
      with the flags defined in _CreateImageArgParser.

  Returns:
    The status of the last relevant command: for a local image, the status
    of writing the checksum file (or of reading it when no re-image was
    needed); for an xbuddy image, 0 if the machine came back up, else 1.

  Raises:
    RuntimeError: If the lock on the remote could not be acquired.
  """
  parser = _CreateImageArgParser()
  options = parser.parse_args(argv[1:])

  if options.log_level not in command_executer.LOG_LEVEL:
    Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'")
  log_level = options.log_level

  # Common initializations
  cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
  l = logger.GetLogger()

  if options.chromeos_root is None:
    Usage(parser, '--chromeos_root must be set')

  if options.remote is None:
    Usage(parser, '--remote must be set')

  options.chromeos_root = os.path.expanduser(options.chromeos_root)

  if options.board is None:
    board = cmd_executer.CrosLearnBoard(options.chromeos_root, options.remote)
  else:
    board = options.board

  if options.image is None:
    # Default to the latest locally built image, preferring the test image.
    images_dir = misc.GetImageDir(options.chromeos_root, board)
    image = os.path.join(images_dir, 'latest', 'chromiumos_test_image.bin')
    if not os.path.exists(image):
      image = os.path.join(images_dir, 'latest', 'chromiumos_image.bin')
    is_xbuddy_image = False
  else:
    image = options.image
    is_xbuddy_image = image.startswith('xbuddy://')
    if not is_xbuddy_image:
      image = os.path.expanduser(image)

  if not is_xbuddy_image:
    image = os.path.realpath(image)
    if not os.path.exists(image):
      Usage(parser, 'Image file: ' + image + ' does not exist!')

  local_image = not is_xbuddy_image
  should_unlock = False
  # Directory holding the temporary copy of the image that LocateOrCopyImage
  # may create.  It is tracked here so that it is removed even when imaging
  # aborts part-way; image copies are large and would otherwise pile up.
  temp_image_dir = None

  try:
    if not options.no_lock:
      try:
        locks.AcquireLock(options.remote.split(), options.chromeos_root)
        should_unlock = True
      except Exception as e:
        raise RuntimeError('Error acquiring machine: %s' % str(e))

    reimage = False
    if local_image:
      image_checksum = FileUtils().Md5File(image, log_level=log_level)

      command = 'cat ' + checksum_file
      ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput(
          command, chromeos_root=options.chromeos_root, machine=options.remote)

      device_checksum = device_checksum.strip()
      image_checksum = str(image_checksum)

      l.LogOutput('Image checksum: ' + image_checksum)
      l.LogOutput('Device checksum: ' + device_checksum)

      if image_checksum != device_checksum:
        found, located_image = LocateOrCopyImage(
            options.chromeos_root, image, board=board)
        if not found:
          temp_image_dir = os.path.dirname(located_image)

        reimage = True
        l.LogOutput('Checksums do not match. Re-imaging...')

        chroot_image = FindChromeOSImage(located_image, options.chromeos_root)

        is_test_image = IsImageModdedForTest(options.chromeos_root,
                                             chroot_image, log_level)

        if not is_test_image and not options.force:
          logger.GetLogger().LogFatal('Have to pass --force to image a '
                                      'non-test image!')
    else:
      reimage = True
      l.LogOutput('Using non-local image; Re-imaging...')

    if reimage:
      # If the device has /tmp mounted as noexec, image_to_live.sh can fail.
      command = 'mount -o remount,rw,exec /tmp'
      cmd_executer.CrosRunCommand(
          command, chromeos_root=options.chromeos_root, machine=options.remote)

      # Check to see if cros flash will work for the remote machine.
      CheckForCrosFlash(options.chromeos_root, options.remote, log_level)

      # Disable the annoying chromebook beeps after reboot.
      DisableCrosBeeps(options.chromeos_root, options.remote, log_level)

      cros_flash_args = [
          'cros', 'flash',
          '--board=%s' % board, '--clobber-stateful', options.remote
      ]
      # A local image is referenced by its path inside the chroot; an xbuddy
      # image is passed through as given.
      cros_flash_args.append(chroot_image if local_image else image)

      command = ' '.join(cros_flash_args)

      # Workaround for crosbug.com/35684.
      os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600)

      if log_level == 'average':
        cmd_executer.SetLogLevel('verbose')

      # Flash the device, allowing up to three attempts in total.
      max_attempts = 3
      for attempt in range(1, max_attempts + 1):
        if log_level == 'quiet':
          l.LogOutput('CMD : %s' % command)
        ret = cmd_executer.ChrootRunCommand(
            options.chromeos_root, command, command_timeout=1800)
        if ret == 0 or attempt == max_attempts:
          break
        if log_level == 'quiet':
          l.LogOutput('Imaging failed. Retry # %d.' % attempt)

      if log_level == 'average':
        cmd_executer.SetLogLevel(log_level)

      logger.GetLogger().LogFatalIf(ret, 'Image command failed')

      # Unfortunately cros_image_to_target.py sometimes returns early when the
      # machine isn't fully up yet.
      machine_up = EnsureMachineUp(options.chromeos_root, options.remote,
                                   log_level)

      if local_image:
        if log_level == 'average':
          l.LogOutput('Verifying image.')
        # NOTE(review): the checksum is recorded on the device before the
        # image is verified; if verification then fails, a later run with the
        # same image will see matching checksums and skip re-imaging.
        command = 'echo %s > %s && chmod -w %s' % (image_checksum,
                                                   checksum_file, checksum_file)
        ret = cmd_executer.CrosRunCommand(
            command,
            chromeos_root=options.chromeos_root,
            machine=options.remote)
        logger.GetLogger().LogFatalIf(ret, 'Writing checksum failed.')

        successfully_imaged = VerifyChromeChecksum(
            options.chromeos_root, chroot_image, options.remote, log_level)
        logger.GetLogger().LogFatalIf(not successfully_imaged,
                                      'Image verification failed!')
        TryRemountPartitionAsRW(options.chromeos_root, options.remote,
                                log_level)
      else:
        # For a non-local image the result of EnsureMachineUp is what this
        # function returns, translated to an exit status.
        ret = 0 if machine_up else 1

      if temp_image_dir is not None:
        l.LogOutput('Deleting temp image dir: %s' % temp_image_dir)
        shutil.rmtree(temp_image_dir)
        temp_image_dir = None
      l.LogOutput('Image updated.')
    else:
      # 'ret' keeps the status of reading the checksum file from the device.
      l.LogOutput('Checksums match, skip image update and reboot.')
      command = 'reboot && exit'
      cmd_executer.CrosRunCommand(
          command, chromeos_root=options.chromeos_root, machine=options.remote)
      # Wait 30s after reboot.
      time.sleep(30)

  finally:
    if temp_image_dir is not None:
      # Imaging did not complete; still remove the temporary image copy.
      # Errors are ignored so that the lock below is always released.
      shutil.rmtree(temp_image_dir, ignore_errors=True)
    if should_unlock:
      locks.ReleaseLock(options.remote.split(), options.chromeos_root)

  return ret
332
333
def LocateOrCopyImage(chromeos_root, image, board=None):
  """Return a path under chromeos_root holding the given image.

  The image is used in place if it already lives under chromeos_root.
  Otherwise the existing build directories are searched for an identical
  file (per filecmp.cmp); if none is found the image is copied into a fresh
  temporary directory under src/build/images/<board>.

  Args:
    chromeos_root: Path of the ChromeOS checkout.
    image: Path of the image file.
    board: Board whose build directory is searched and used for the copy;
      when None, all boards are searched and the copy goes directly under
      src/build/images.

  Returns:
    A two-element list [found, path].  found is True when an existing file
    under chromeos_root is used, and False when path is a new copy that the
    caller is responsible for deleting (together with its directory).
  """
  chromeos_root_realpath = os.path.realpath(chromeos_root)
  image = os.path.realpath(image)

  if image.startswith('%s/' % chromeos_root_realpath):
    return [True, image]

  # First search within the existing build dirs for any matching files.
  board_glob = '*' if board is None else board
  images_glob = (
      '%s/src/build/images/%s/*/*.bin' % (chromeos_root_realpath, board_glob))
  for potential_image in glob.glob(images_glob):
    if filecmp.cmp(potential_image, image):
      logger.GetLogger().LogOutput(
          'Found matching image %s in chromeos_root.' % potential_image)
      return [True, potential_image]

  # We did not find an image. Copy it in the src dir and return the copied
  # file.
  if board is None:
    board = ''
  base_dir = '%s/src/build/images/%s' % (chromeos_root_realpath, board)
  if not os.path.isdir(base_dir):
    os.makedirs(base_dir)
  # Pass the directory via 'dir' rather than folding it into 'prefix'.
  temp_dir = tempfile.mkdtemp(prefix='tmp', dir=base_dir)
  new_image = os.path.join(temp_dir, os.path.basename(image))
  logger.GetLogger().LogOutput(
      'No matching image found. Copying %s to %s' % (image, new_image))
  try:
    shutil.copyfile(image, new_image)
  except Exception:
    # Do not leave an empty or partial temp dir behind if the copy fails.
    shutil.rmtree(temp_dir, ignore_errors=True)
    raise
  return [False, new_image]
367
368
def GetImageMountCommand(image, rootfs_mp, stateful_mp):
  """Build the shell command that mounts an image with mount_gpt_image.sh.

  Args:
    image: Path of the image; split into its directory (--from) and file
      name (--image).
    rootfs_mp: Mount point passed as --rootfs_mountpt.
    stateful_mp: Mount point passed as --stateful_mountpt.

  Returns:
    The command string, which first changes into
    /mnt/host/source/src/scripts and then runs the mount script.
  """
  template = ('cd /mnt/host/source/src/scripts &&'
              './mount_gpt_image.sh --from=%s --image=%s'
              ' --safe --read_only'
              ' --rootfs_mountpt=%s'
              ' --stateful_mountpt=%s')
  directory, filename = os.path.split(image)
  return template % (directory, filename, rootfs_mp, stateful_mp)
379
380
def MountImage(chromeos_root,
               image,
               rootfs_mp,
               stateful_mp,
               log_level,
               unmount=False,
               extra_commands=''):
  """Mount or unmount an image inside the chroot and return the output.

  Args:
    chromeos_root: ChromeOS checkout whose chroot runs the command.
    image: Path of the image, as used by GetImageMountCommand.
    rootfs_mp: Mount point for the root filesystem.
    stateful_mp: Mount point for the stateful partition.
    log_level: Log level used for the command executer.
    unmount: When true, '--unmount' is appended to the mount command.
    extra_commands: Optional shell text run after the mount command,
      separated from it by ';'.

  Returns:
    The standard output of the combined command.  A non-zero status is
    reported through the logger's LogFatalIf.
  """
  executer = command_executer.GetCommandExecuter(log_level=log_level)

  shell_cmd = GetImageMountCommand(image, rootfs_mp, stateful_mp)
  if unmount:
    shell_cmd = '%s --unmount' % shell_cmd
  if extra_commands:
    shell_cmd = '%s ; %s' % (shell_cmd, extra_commands)

  status, stdout, _ = executer.ChrootRunCommandWOutput(chromeos_root,
                                                       shell_cmd)
  logger.GetLogger().LogFatalIf(status, 'Mount/unmount command failed!')
  return stdout
397
398
def IsImageModdedForTest(chromeos_root, image, log_level):
  """Tell whether an image's lsb-release marks it as a test image.

  Mounts the image in the chroot on two freshly created temp directories,
  greps etc/lsb-release for a CHROMEOS_RELEASE_TRACK line containing 'test'
  (case-insensitive), then unmounts the image again.

  Args:
    chromeos_root: ChromeOS checkout whose chroot is used.
    image: Path of the image inside the chroot.
    log_level: Requested log level; anything but 'verbose' is treated as
      'quiet' for the commands run here.

  Returns:
    The re match object for 'test' in the command output, or None when
    there is no match (so the result is usable as a truth value).
  """
  effective_level = log_level if log_level == 'verbose' else 'quiet'
  executer = command_executer.GetCommandExecuter(log_level=effective_level)

  # Create the two mount points first, then clean up their names.
  raw_dirs = []
  for _unused in ('rootfs', 'stateful'):
    _, created, _ = executer.ChrootRunCommandWOutput(chromeos_root,
                                                     'mktemp -d')
    raw_dirs.append(created)
  rootfs_mp = raw_dirs[0].strip()
  stateful_mp = raw_dirs[1].strip()

  lsb_release_file = os.path.join(rootfs_mp, 'etc/lsb-release')
  grep_cmd = 'grep CHROMEOS_RELEASE_TRACK %s | grep -i test' % lsb_release_file
  mount_output = MountImage(
      chromeos_root,
      image,
      rootfs_mp,
      stateful_mp,
      effective_level,
      extra_commands=grep_cmd)
  test_match = re.search('test', mount_output, re.IGNORECASE)

  MountImage(
      chromeos_root,
      image,
      rootfs_mp,
      stateful_mp,
      effective_level,
      unmount=True)
  return test_match
422
423
def VerifyChromeChecksum(chromeos_root, image, remote, log_level):
  """Compare the md5sum of chrome in the image with the one on the device.

  The image is mounted in the chroot on two freshly created temp
  directories, md5sum is run on opt/google/chrome/chrome inside it, and the
  image is unmounted again.  The same md5sum is then run on the remote.

  Args:
    chromeos_root: ChromeOS checkout whose chroot is used.
    image: Path of the image inside the chroot.
    remote: The device whose chrome binary is checked.
    log_level: Log level used for the command executer and for mounting.

  Returns:
    True if both checksums could be read and are equal, False otherwise.
  """
  command = 'mktemp -d'
  cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
  _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command)
  _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput(
      chromeos_root, command)
  rootfs_mp = rootfs_mp.strip()
  stateful_mp = stateful_mp.strip()
  chrome_file = '%s/opt/google/chrome/chrome' % rootfs_mp
  extra = 'md5sum %s' % chrome_file
  out = MountImage(
      chromeos_root,
      image,
      rootfs_mp,
      stateful_mp,
      log_level,
      extra_commands=extra)
  # Unmount before looking at the output, so that unexpected output cannot
  # leave the image mounted.
  MountImage(
      chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True)

  command = 'md5sum /opt/google/chrome/chrome'
  _, device_out, _ = cmd_executer.CrosRunCommandWOutput(
      command, chromeos_root=chromeos_root, machine=remote)

  # md5sum prints '<checksum>  <file>'; the first field is the checksum.
  image_fields = out.split()
  device_fields = device_out.split()
  if not image_fields or not device_fields:
    # A command that produced no output gives nothing to compare; report a
    # mismatch instead of failing with an IndexError.
    return False
  return image_fields[0] == device_fields[0]
450
451
# Remount partition as writable.
# TODO: auto-detect if an image is built using --noenable_rootfs_verification.
def TryRemountPartitionAsRW(chromeos_root, remote, log_level):
  """Attempt to remount '/' on the remote as read-write.

  The outcome is only logged: a warning when the remount command returns a
  non-zero status, a plain message otherwise.

  Args:
    chromeos_root: ChromeOS checkout handed to CrosRunCommand.
    remote: The device on which to remount the partition.
    log_level: Log level used for the command executer.
  """
  log = logger.GetLogger()
  executer = command_executer.GetCommandExecuter(log_level=log_level)

  status = executer.CrosRunCommand(
      'sudo mount -o remount,rw /',
      chromeos_root=chromeos_root,
      machine=remote,
      terminated_timeout=10)

  if not status:
    log.LogOutput('Re-mounted partition as writable.')
    return

  # Safe to ignore, hence only a warning.
  log.LogWarning('Failed to remount partition as rw, '
                 'probably the image was not built with '
                 '"--noenable_rootfs_verification", '
                 'you can safely ignore this.')
469
470
def EnsureMachineUp(chromeos_root, remote, log_level, timeout=600,
                    retry_delay=1):
  """Wait until a trivial command can be run on the remote.

  Repeatedly runs an 'echo' on the remote until it returns a zero status or
  the timeout expires.

  Args:
    chromeos_root: ChromeOS checkout handed to CrosRunCommand.
    remote: The device to wait for.
    log_level: Log level used for the command executer.
    timeout: Seconds to keep trying before giving up.
    retry_delay: Seconds to pause after a failed attempt, so that a remote
      which rejects connections immediately is not probed in a tight loop.

  Returns:
    True once the command succeeds, False if the timeout was reached first
    (an error is logged in that case).
  """
  l = logger.GetLogger()
  cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
  magic = 'abcdefghijklmnopqrstuvwxyz'
  command = 'echo %s' % magic
  start_time = time.time()
  while True:
    if time.time() - start_time > timeout:
      l.LogError(
          'Timeout of %ss reached. Machine still not up. Aborting.' % timeout)
      return False
    ret = cmd_executer.CrosRunCommand(
        command, chromeos_root=chromeos_root, machine=remote)
    if not ret:
      return True
    time.sleep(retry_delay)
488
489
if __name__ == '__main__':
  # Run the imaging flow and use its return value as the exit status.
  sys.exit(DoImage(sys.argv))
493