1#!/usr/bin/python
2#
3# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Updates all unlocked hosts in Autotest lab in parallel at a given rate.
8
9Used to update all hosts, or only those of a given platform, in the Autotest
10lab to a given version. Allows a configurable number of updates to be started in
11parallel. Updates can also be staggered to reduce load."""
12
13import logging
14import os
15import subprocess
16import sys
17import threading
18import time
19import traceback
20
21from collections import deque
22from optparse import OptionParser
23
24
# How many hosts are updated in parallel unless overridden.
DEFAULT_CONCURRENCY = 10


# Staggering is off by default (no delay between starting updaters).
DEFAULT_STAGGER = 0


# Where the ChromeOS checkout is expected to live by default.
DEFAULT_GCLIENT_ROOT = '/usr/local/google/home/${USER}/chromeos/chromeos'


# Default directory for individual host logs, stamped with the UTC time at
# which this module was loaded. Each host will have its own file. E.g.
# <default_log_path>/<host>.log
_LOG_TIMESTAMP = time.strftime('%Y-%m-%d-%H-%M', time.gmtime())
DEFAULT_LOG_PATH = '/tmp/mass_update_logs/%s/' % _LOG_TIMESTAMP


# Directory holding the Autotest cli executable.
AUTOTEST_LOCATION = '/home/chromeos-test/autotest/cli'


# Seconds to sleep between checks while waiting for threads to complete.
DEFAULT_SLEEP = 10


# Seconds to wait before declaring an update as failed.
DEFAULT_TIMEOUT = 2400
53
54
class MassUpdateStatus(object):
  """Used to track status for all updates.

  Attributes:
    ssh_failures: list of hosts for which the SSH check failed.
    update_failures: list of hosts whose update failed for any other reason.
    successful_updates: count of hosts that were updated successfully.
  """

  def __init__(self):
    # Created per instance: lists declared on the class body would be shared
    # by every MassUpdateStatus object.
    self.ssh_failures = []
    self.update_failures = []
    self.successful_updates = 0
60
61
class UpdateThread(threading.Thread):
  """Responsible for ssh-test, locking, imaging, and unlocking a host.

  Uses the atest CLI for host control and the image_to_live script to actually
  update the host. Each thread will continue to process hosts until the queue
  is empty.

  Args:
    options: parsed command line options, passed on to the SSH check and the
        imaging step.
    hosts: queue of host names shared by all updater threads; must provide
        popleft() raising IndexError when empty (e.g. collections.deque).
    status: shared object whose ssh_failures / update_failures lists and
        successful_updates counter are updated with each host's result."""

  _SUCCESS = 0            # Update was successful.
  _SSH_FAILURE = 1        # Could not SSH to host or related SSH failure.
  _UPDATE_FAILURE = 2     # Update failed for any reason other than SSH.

  # Shared by every updater thread. Serializes the read-modify-write on the
  # status counter, which several threads would otherwise interleave.
  _status_lock = threading.Lock()

  def __init__(self, options, hosts, status):
    threading.Thread.__init__(self)
    self._options = options
    self._hosts = hosts
    self._status = status
    self._logger = logging.getLogger()

  def run(self):
    """Processes hosts from the shared queue until it is drained."""
    while True:
      # Pop first and treat IndexError as "queue drained". Testing for
      # emptiness and popping afterwards lets another thread take the last
      # host in between the two steps.
      try:
        host = self._hosts.popleft()
      except IndexError:
        return

      self._ProcessHost(host)

  def _ProcessHost(self, host):
    """Runs the SSH check, lock, image and unlock sequence for one host."""
    status = UpdateThread._UPDATE_FAILURE
    lock_attempted = False

    self._logger.info('Updating host %s', host)
    try:
      try:
        if not CheckSSH(host=host, options=self._options):
          status = UpdateThread._SSH_FAILURE
        else:
          lock_attempted = True
          if LockHost(host) and ImageHost(host=host, options=self._options):
            status = UpdateThread._SUCCESS
      finally:
        # Record the outcome even when one of the steps above raised.
        self._RecordResult(host, status)

        # Only release hosts this thread tried to lock, so a lock placed by
        # someone else on an unreachable host is left alone.
        if lock_attempted:
          UnlockHost(host)
    except Exception:
      # Keep the thread alive so the remaining hosts still get processed.
      self._logger.warning(
          'Exception encountered during update. Skipping host %s.', host,
          exc_info=True)

  def _RecordResult(self, host, status):
    """Logs the outcome for a host and stores it in the shared status."""
    if status == UpdateThread._SUCCESS:
      self._logger.info('Completed update for host %s successfully.', host)
      with UpdateThread._status_lock:
        self._status.successful_updates += 1
    elif status == UpdateThread._SSH_FAILURE:
      self._logger.info('Failed to SSH to host %s.', host)
      self._status.ssh_failures.append(host)
    else:
      self._logger.info('Failed to update host %s.', host)
      self._status.update_failures.append(host)
109
110
def CheckSSH(host, options):
  """Uses the ssh_test script to ensure SSH access to a host is available.

  Args:
    host: name of the host to test.
    options: parsed options; options.gclient is the ChromeOS checkout that
        contains src/scripts/ssh_test.sh.

  Returns true if an SSH connection to the host was successful, i.e. the
  script exited with status 0."""
  ssh_test = subprocess.Popen(
      '%s/src/scripts/ssh_test.sh --remote=%s' % (options.gclient, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE)

  # communicate() drains both pipes while waiting. A bare wait() can block
  # forever once the child has filled one of the pipe buffers.
  ssh_test.communicate()
  return ssh_test.returncode == 0
120
121
def ImageHost(host, options, timeout=None):
  """Uses the image_to_live script to update a host.

  The script's stdout is written to <options.log>/<host>.log and its stderr
  to <options.log>/<host>.log.err.

  Args:
    host: name of the host to update.
    options: parsed options; uses options.log (log directory),
        options.gclient (ChromeOS checkout) and options.url (update URL).
    timeout: number of seconds handed to the alarm wrapper script. Defaults
        to DEFAULT_TIMEOUT when not given.

  Returns true if the imaging process was successful."""
  if timeout is None:
    timeout = DEFAULT_TIMEOUT

  log_path = os.path.join(options.log, host + '.log')
  command = ('/usr/local/scripts/alarm %d %s/src/scripts/image_to_live.sh '
             '--update_url %s --remote %s' % (timeout, options.gclient,
                                              options.url, host))

  # The with blocks close both log files even if starting the process fails.
  with open(log_path, 'w') as log_file:
    with open(log_path + '.err', 'w') as log_file_err:
      exit_code = subprocess.Popen(command,
                                   shell=True,
                                   stdout=log_file,
                                   stderr=log_file_err).wait()

  return exit_code == 0
141
142
def LockHost(host):
  """Locks a host using the atest CLI.

  Locking a host tells Autotest that the host shouldn't be scheduled for
  any other tasks. A failure is logged.

  Args:
    host: name of the host to lock.

  Returns true if the locking process was successful."""
  atest = subprocess.Popen(
      '%s/atest host mod -l %s' % (AUTOTEST_LOCATION, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE)

  # Drain the pipes while waiting; wait() alone can deadlock if atest writes
  # more than a pipe buffer holds.
  atest.communicate()
  success = atest.returncode == 0

  if not success:
    logging.getLogger().info('Failed to lock host %s.', host)

  return success
158
159
def UnlockHost(host):
  """Unlocks a host using the atest CLI.

  Unlocking a host tells Autotest that the host is okay to be scheduled
  for other tasks. A failure is logged.

  Args:
    host: name of the host to unlock.

  Returns true if the unlocking process was successful."""
  atest = subprocess.Popen(
      '%s/atest host mod -u %s' % (AUTOTEST_LOCATION, host),
      shell=True,
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE)

  # Drain the pipes while waiting; wait() alone can deadlock if atest writes
  # more than a pipe buffer holds.
  atest.communicate()
  success = atest.returncode == 0

  if not success:
    logging.getLogger().info('Failed to unlock host %s.', host)

  return success
175
176
def GetHostQueue(options):
  """Returns a queue containing unlocked hosts retrieved from the atest CLI.

  If options.label has been specified only unlocked hosts with the specified
  label will be returned.

  Args:
    options: parsed options; options.label optionally restricts the listing.

  Returns a deque of host names, in the order atest listed them."""
  cmd = ('%s/atest host list --unlocked -s Ready -a acl_cros_test'
         % AUTOTEST_LOCATION)

  if options.label:
    cmd += ' -b ' + options.label

  # atest host list will return a tabular data set. Use sed to remove the first
  # line which contains column labels we don't need. Then since the first column
  # contains the host name, use awk to extract it
  cmd += " | sed '1d' | awk '{print $1}'"

  # universal_newlines makes the pipe deliver text rather than bytes, so the
  # split below works regardless of the Python version.
  stdout = subprocess.Popen(cmd,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True).communicate()[0]

  # Drop blank lines and surrounding whitespace.
  return deque(item.strip() for item in stdout.split('\n') if item.strip())
199
200
def ParseOptions():
  """Parses and validates the command line.

  Returns the parsed options object with attributes label, concurrent,
  gclient, log, stagger and url. Exits through parser.error() if no update
  URL was given or a numeric option is out of range."""
  usage = 'usage: %prog --url=<update url> [options]'
  parser = OptionParser(usage)
  parser.add_option('-b', '--label', dest='label',
                    help='Only update hosts with the specified label.')
  # type='int' makes optparse convert (and reject non-numeric) values, so
  # callers always see integers rather than raw command line strings.
  parser.add_option('-c', '--concurrent', dest='concurrent', type='int',
                    default=DEFAULT_CONCURRENCY,
                    help=('Number of hosts to be updated concurrently. '
                          'Defaults to %d hosts.') % DEFAULT_CONCURRENCY)
  parser.add_option('-g', '--gclient', dest='gclient',
                    default=DEFAULT_GCLIENT_ROOT,
                    help=('Location of ChromeOS checkout. defaults to %s'
                          % DEFAULT_GCLIENT_ROOT))
  parser.add_option('-l', '--log', dest='log',
                    default=DEFAULT_LOG_PATH,
                    help=('Where to put individual host log files. '
                          'Defaults to %s' % DEFAULT_LOG_PATH))
  parser.add_option('-s', '--stagger', dest='stagger', type='int',
                    default=DEFAULT_STAGGER,
                    help=('Attempt to stagger updates. Waits the given amount '
                          'of time in minutes before starting each updater. '
                          'Updates will still overlap if the value is set as a '
                          'multiple of the update period.'))
  parser.add_option('-u', '--url', dest='url',
                    help='Update URL. Points to build for updating hosts.')

  options = parser.parse_args()[0]

  if options.url is None:
    parser.error('An update URL must be provided.')

  # Without at least one updater no host would ever be processed.
  if options.concurrent < 1:
    parser.error('The number of concurrent updates must be at least 1.')

  if options.stagger < 0:
    parser.error('The stagger time must not be negative.')

  return options
233
234
def InitializeLogging():
  """Set up the root logger so INFO messages are emitted with a timestamp.

  A stream handler using the '<time> - <message>' format is attached to the
  root logger. Returns the root logger for convenience."""
  console = logging.StreamHandler()
  console.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
  console.setLevel(logging.INFO)

  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  root_logger.addHandler(console)
  return root_logger
247
248
def main():
  """Updates all selected hosts in parallel and logs a summary.

  Parses the command line, fetches the queue of unlocked hosts, starts the
  requested number of UpdateThread workers (optionally staggered) and waits
  for them to finish. Exits with status 1 if the process is interrupted or
  an unexpected error occurs; the summary is logged either way."""
  options = ParseOptions()
  hosts = GetHostQueue(options)
  logger = InitializeLogging()
  status = MassUpdateStatus()

  # Values given on the command line may arrive as strings. Convert once so
  # the comparison and arithmetic below are always numeric.
  concurrent = int(options.concurrent)
  stagger_seconds = int(options.stagger) * 60

  # Create log folder if it doesn't exist.
  if not os.path.exists(options.log):
    os.makedirs(options.log)

  logger.info('Starting update using URL %s', options.url)
  logger.info('Individual host logs can be found under %s', options.log)

  if not hosts:
    logger.warning('No unlocked hosts were found to update.')

  workers = []
  completed = False
  try:
    # Spawn processing threads which will handle lock, update, and unlock.
    for _ in range(concurrent):
      worker = UpdateThread(hosts=hosts, options=options, status=status)
      worker.start()
      workers.append(worker)

      # Stagger threads if the option has been enabled.
      if stagger_seconds > 0:
        time.sleep(stagger_seconds)

    # Wait for all hosts to be processed and threads to complete. NOTE: Polling
    # rather than join()ing here because join() does not behave properly with
    # CTRL-C and KeyboardInterrupt. Only the workers started above are
    # checked, so unrelated threads cannot keep this loop spinning.
    while any(worker.is_alive() for worker in workers):
      time.sleep(DEFAULT_SLEEP)

    completed = True
  except (KeyboardInterrupt, Exception):
    traceback.print_exc()
    logger.warning(
        'Update process aborted. Some machines may be left locked or updating.')
    sys.exit(1)
  finally:
    failures = len(status.ssh_failures) + len(status.update_failures)

    logger.info(
        'Mass updating complete. %d hosts updated successfully, %d failed.',
        status.successful_updates, failures)

    logger.info(('-' * 25) + '[ SUMMARY ]' + ('-' * 25))

    for host in status.ssh_failures:
      logger.info('Failed to SSH to host %s.', host)

    for host in status.update_failures:
      logger.info('Failed to update host %s.', host)

    # An aborted run may have left hosts unprocessed, so only claim full
    # success when every worker ran to completion.
    if completed and failures == 0:
      logger.info('All hosts updated successfully.')

    logger.info('-' * 61)


if __name__ == '__main__':
  main()
303