1#! /usr/bin/python
2
3# Copyright 2015 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""
8Manage swarming bots.
9
10* Launch bots, e.g. 200 bots:
11    $ swarming_bots.py launch --working_dir WORKING_DIR --id_range '1-200'
12
13* Kill bot 1-200:
14    $ swarming_bots.py kill --working_dir WORKING_DIR --id_range '1-200'
15
16* Check bot 1-200, start if not running:
17    $ swarming_bots.py check --working_dir WORKING_DIR --id_range '1-200'
18
* The hierarchy of the working dir looks like
20  WORKING_DIR
21    |-- bot_0
22    |   |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
23    |   |-- bot_config.log
24    |   |-- swarming_bot.log
25    |   |-- swarming_bot.zip
26    |   |-- swarming_bot.pid
27    |-- bot_1
28        |-- 092b5bd4562f579711823f61e311de37247c853a-cacert.pem
29        |-- bot_config.log
30        |-- swarming_bot.log
31        |-- swarming_bot.zip
        |-- swarming_bot.pid
  Note bot_config.py:get_dimensions() will rely on the bot number
  in the path to generate the bot id.
35
36* TODO (fdeng):
37    ** Restart a bot given a bot id.
38"""
39import argparse
40import logging
41import logging.handlers
42import os
43import re
44import shutil
45import signal
46import subprocess
47import sys
48import threading
49import time
50import urllib
51
52import common
53
54from autotest_lib.client.common_lib import global_config
55
56
# Format applied to every log record written by this script.
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# Size limit and number of backups handed to the rotating log file handler
# when a log file is requested.
LOG_FILE_SIZE = 1024 * 5000 # 5000 KB
LOG_FILE_BACKUPCOUNT = 5
# Default for --swarming_proxy: the 'swarming_proxy' value of the 'CROS'
# section of the autotest global config, or None if it is not configured.
DEFAULT_SWARMING_PROXY = global_config.global_config.get_config_value(
        'CROS', "swarming_proxy", default=None)
# Regex for the --id_range argument, e.g. "1-200"; group 1 is the first
# bot id and group 2 the last one.
ID_RANGE_FMT = r'(\d+)-(\d+)'
# How long SwarmingBot.kill() keeps polling for the bot to exit after
# sending SIGTERM.
KILL_PROC_TIMEOUT_SECS = 3600 * 3 # 3 hours
# Upper bound of the (doubling) interval between two such polls.
MAX_KILL_PROC_SLEEP_SECS = 60
65
66
class BotManagementError(Exception):
    """Base class of every bot management related error in this module."""
69
70
class PidMisMatchError(BotManagementError):
    """Signals that the recorded pid differs from the pid pgrep reports."""

    def __init__(self, known_pid, new_pid):
        """Remember both pids and build the error message from them.

        @param known_pid: pid the caller had on record (read from the pid
                          file); may be None if no valid pid was recorded.
        @param new_pid: pid of the process actually found by pgrep.

        """
        super(PidMisMatchError, self).__init__(
                'pid does not match, pid: %s, found %s' % (known_pid, new_pid))
        self.known_pid = known_pid
        self.new_pid = new_pid
86
87
class DuplicateBotError(BotManagementError):
    """Raised when pgrep finds more than one process for the same bot id."""
90
91
class SwarmingBot(object):
    """A single swarming bot process and its working directory.

    Each bot lives in <parent_dir>/bot_<bot_id>. The pid of the bot process
    is recorded in a pid file inside that directory and is cross-checked
    against the output of pgrep every time the bot status is queried.
    """


    PID_FILE = 'swarming_bot.pid'
    BOT_DIR_FORMAT = 'bot_%s'
    BOT_FILENAME = 'swarming_bot.zip'
    # Used to search for bot process
    # The process may bootstrap itself into swarming_bot.1.zip and swarming_bot.2.zip
    BOT_CMD_PATTERN = 'swarming_bot.*zip start_bot'


    def __init__(self, bot_id, parent_dir, swarming_proxy):
        """Initialize.

        The current state of the bot is probed right away (this runs
        pgrep), so self.pid reflects reality once the constructor returns.

        @param bot_id: An integer.
        @param parent_dir: The directory that holds the working directories
                           of all bots. This bot uses the sub directory
                           bot_<bot_id> to store bot code, log file, and
                           any file generated by the bot at run time.
        @param swarming_proxy: URL to the swarming instance.

        @raises DuplicateBotError: if several processes are found for
                                   this bot.
        """
        self.bot_id = bot_id
        self.swarming_proxy = swarming_proxy
        self.parent_dir = os.path.abspath(os.path.expanduser(parent_dir))
        self.bot_dir = os.path.join(self.parent_dir,
                                    self.BOT_DIR_FORMAT % self.bot_id)
        self.pid_file = os.path.join(self.bot_dir, self.PID_FILE)
        self.pid = None
        self._refresh_pid()
        if self.pid is None:
            logging.debug('[Bot %s] Initialize: bot is not running',
                          self.bot_id)
        else:
            logging.debug('[Bot %s] Initialize: bot is running '
                          'as process %s', self.bot_id, self.pid)


    def _write_pid(self):
        """Write self.pid to the pid file, replacing any previous content."""
        with open(self.pid_file, 'w') as f:
            f.write(str(self.pid))


    def _cleanup_pid(self):
        """Reset self.pid to None and delete the pid file if it exists."""
        self.pid = None
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)


    def _is_process_running(self):
        """Check with pgrep whether the bot process is running.

        @returns True if exactly one matching process exists and its pid
                 equals self.pid, False if pgrep finds no matching process.

        @raises DuplicateBotError: if more than one process matches.
        @raises PidMisMatchError: if the single matching process has a pid
                                  different from self.pid.
        @raises subprocess.CalledProcessError: if pgrep fails for a reason
                other than "nothing matched".
        """
        # Match "<python> <bot_dir>/swarming_bot*zip start_bot" against the
        # full command line of every process.
        pattern = os.path.join(self.bot_dir, self.BOT_CMD_PATTERN)
        pattern = '%s %s' % (sys.executable, pattern)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[Bot %s] check process running: %s',
                      self.bot_id, str(cmd))
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as e:
            # Exit status 1 is how pgrep reports that nothing matched.
            if e.returncode == 1:
                return False
            raise
        pids = output.splitlines()
        if len(pids) > 1:
            raise DuplicateBotError('Multiple processes (pid: %s) detected for Bot %s'
                                    % (str(pids), self.bot_id))
        pid = int(pids[0])
        if pid != self.pid:
            raise PidMisMatchError(self.pid, pid)
        return True


    def _refresh_pid(self):
        """Check process status and update self.pid accordingly.

        The pid file is re-read first. If no bot process is running, the
        pid and the pid file are cleaned up. If a process is running under
        a different pid than the recorded one, the pid found by pgrep wins
        and the pid file is rewritten.

        @raises DuplicateBotError: if several processes are found for
                                   this bot.
        """
        # Reload pid from pid file.
        if os.path.exists(self.pid_file):
            with open(self.pid_file) as f:
                try:
                    self.pid = int(f.readline().strip())
                except ValueError:
                    # Empty or corrupted pid file: treat the pid as unknown
                    # and let the pgrep check below sort it out.
                    self.pid = None
        try:
            if not self._is_process_running():
                self._cleanup_pid()
        except PidMisMatchError as e:
            logging.error('[Bot %s] %s, updating pid file',
                          self.bot_id, str(e))
            self.pid = e.new_pid
            self._write_pid()


    def is_running(self):
        """Refresh the process status and return whether the bot is running."""
        self._refresh_pid()
        return bool(self.pid)


    def ensure_running(self):
        """Start the swarming bot unless it is already running.

        The bot directory is wiped and recreated, the bot code is downloaded
        from <swarming_proxy>/bot_code and started with the bot directory
        as its working directory. The pid of the new process is written to
        the pid file.
        """
        if self.is_running():
            logging.info(
                    '[Bot %s] Skip start, bot is already running (pid %s).',
                    self.bot_id, self.pid)
            return
        logging.debug('[Bot %s] Bootstrap bot in %s', self.bot_id, self.bot_dir)
        if os.path.exists(self.bot_dir):
            shutil.rmtree(self.bot_dir)
        os.makedirs(self.bot_dir)
        dest = os.path.join(self.bot_dir, self.BOT_FILENAME)
        logging.debug('[Bot %s] Getting bot code from: %s/bot_code',
                      self.bot_id, self.swarming_proxy)
        urllib.urlretrieve('%s/bot_code' % self.swarming_proxy, dest)
        cmd = [sys.executable, self.BOT_FILENAME]
        logging.debug('[Bot %s] Calling command: %s', self.bot_id, cmd)
        # Nobody ever reads the output of the bot, and the bot outlives this
        # script. Discard the output instead of connecting it to a pipe: an
        # unread pipe blocks the bot once its buffer is full and breaks as
        # soon as this script exits.
        with open(os.devnull, 'w') as devnull:
            process = subprocess.Popen(
                    cmd, stdout=devnull, stderr=subprocess.STDOUT,
                    cwd=self.bot_dir)
        self.pid = process.pid
        self._write_pid()
        logging.info('[Bot %s] Created bot (pid: %d)', self.bot_id, self.pid)


    def kill(self):
        """Kill the bot.

        Sends SIGTERM and then polls, with a doubling interval capped at
        MAX_KILL_PROC_SLEEP_SECS, until the bot is gone or
        KILL_PROC_TIMEOUT_SECS have passed. A timeout is only logged; the
        process is never sent SIGKILL.

        @raises BotManagementError: if anything goes wrong while signalling
                                    or polling the bot.
        """
        if not self.is_running():
            logging.info('[Bot %s] Skip killing bot, Bot is not running',
                          self.bot_id)
            return
        try:
            logging.info('[Bot %s] killing bot (pid: %d)',
                          self.bot_id, self.pid)
            os.kill(self.pid, signal.SIGTERM)
            start = time.time()
            sleep = 1
            while time.time() - start < KILL_PROC_TIMEOUT_SECS:
                if not self.is_running():
                    return
                sleep = min(sleep * 2, MAX_KILL_PROC_SLEEP_SECS)
                logging.debug('[Bot %s] Waiting %d secs for bot to finish'
                              ' any running task and exist.',
                              self.bot_id, sleep)
                time.sleep(sleep)
            else:
                # Only reached when the loop ran into the timeout; a bot
                # that exited in time leaves through the return above.
                logging.error(
                        '[Bot %s] Failed to kill pid %s within %d secs, '
                        'the bot may be running a long running task, you '
                        'can retry the script. SIGKILL the process is not '
                        'recommended, it might lead to unexpected error.',
                        self.bot_id, self.pid, KILL_PROC_TIMEOUT_SECS)
        except Exception as e:
            raise BotManagementError('[Bot %s] %s' % (self.bot_id, str(e)))
247
248
class BotManager(object):
    """Manages a group of swarming bots that share one working directory."""


    # pgrep pattern matching the command line of any bot started from a
    # sub directory of the working directory.
    CHECK_BOTS_PATTERN = '{executable} {working_dir}.*{bot_cmd_pattern}'


    def __init__(self, bot_ids, working_dir, swarming_proxy):
        """Initialize.

        A SwarmingBot is created for every id, which probes the state of
        each bot right away.

        @param bot_ids: a set of integers.
        @param working_dir: Working directory of the bots.
                            Store temporary files.
        @param swarming_proxy: The swarming instance to talk to.
        """
        self.bot_ids = bot_ids
        self.working_dir = os.path.abspath(os.path.expanduser(working_dir))
        self.bots = [SwarmingBot(bid, self.working_dir, swarming_proxy)
                     for bid in bot_ids]


    def launch(self):
        """Launch every bot that is not running yet.

        A bot that fails with a BotManagementError is logged and skipped
        so that the remaining bots are still started.
        """
        for bot in self.bots:
            try:
                bot.ensure_running()
            except BotManagementError as e:
                logging.error('[BotManager] Failed to start Bot %s: %s',
                              bot.bot_id, str(e))
        # If we let the process exit immediately, the last process won't
        # be launched sometimes. So sleep for 3 secs.
        # The right way is to query the server until all bots are seen
        # by the server by visiting
        # https://SWARMING_PROXY/swarming/api/v1/client/bots
        # However, this would require oauth authentication (install
        # oauth library and install credentials).
        logging.info('Wait 3 seconds for process creation to complete.')
        time.sleep(3)


    def kill(self):
        """Kill running bots, one thread per bot, and wait for all of them.

        Ctrl-c stops the waiting; the threads are daemon threads, so they
        do not keep the script alive afterwards.
        """
        # Start threads to kill bots.
        threads = []
        for bot in self.bots:
            t = threading.Thread(target=bot.kill)
            t.daemon = True
            threads.append(t)
            t.start()
        # Wait on our own threads only, so that an unrelated thread living
        # in this process cannot keep us waiting. Poll instead of joining
        # to stay responsive to Ctrl-c.
        try:
            while any(t.is_alive() for t in threads):
                time.sleep(0.1)
        except KeyboardInterrupt:
            msg = 'Ctrl-c recieved! Bots status not confirmed. Exit.'
            logging.error(msg)
            print(msg)


    def check(self):
        """Check running bots, start the missing ones.

        The number of bot processes found below the working directory is
        compared with the number of managed bot ids; launch() is called if
        fewer processes than ids are found. Note that the count covers every
        bot under the working directory, not only the managed ids.

        @raises subprocess.CalledProcessError: if pgrep fails for a reason
                other than "nothing matched".
        """
        pattern = self.CHECK_BOTS_PATTERN.format(
                executable=sys.executable, working_dir=self.working_dir,
                bot_cmd_pattern=SwarmingBot.BOT_CMD_PATTERN)
        cmd = ['pgrep', '-f', pattern]
        logging.debug('[BotManager] Check bot counts: %s', str(cmd))
        try:
            output = subprocess.check_output(cmd)
            bot_count = len(output.splitlines())
        except subprocess.CalledProcessError as e:
            # Exit status 1 is how pgrep reports that nothing matched.
            if e.returncode == 1:
                bot_count = 0
            else:
                raise
        missing_count = len(self.bot_ids) - bot_count
        logging.info(
                '[BotManager] Check bot counts: %d bots running, missing: %d',
                bot_count, missing_count)
        if missing_count > 0:
            logging.info('[BotManager] Checking all bots')
            self.launch()
329
330
def _parse_range(id_range):
    """Convert an id range to a set of bot ids.

    @param id_range: A range of integers "<first>-<last>", e.g. "1-200".
                     Both ends are inclusive.

    @returns a set of bot ids, e.g. set([1, 2, ..., 200])

    @raises ValueError: if id_range is not exactly of the form
                        "<first>-<last>", or if first is larger than last.
    """
    m = re.match(ID_RANGE_FMT, id_range)
    # re.match only anchors at the start of the string, so additionally
    # require that everything was consumed; otherwise input like '1-200x'
    # or '1-2-3' would be silently accepted.
    if not m or m.end() != len(id_range):
        raise ValueError('Could not parse %s' % id_range)
    first, last = int(m.group(1)), int(m.group(2))
    if first > last:
        # A reversed range would otherwise yield an empty set and the
        # script would silently do nothing.
        raise ValueError(
                'Invalid range %s: %d is larger than %d'
                % (id_range, first, last))
    return set(range(first, last + 1))
343
344
def _parse_args(args):
    """Parse the command line of this script.

    @param args: Argument list passed from main.

    @return: The argparse namespace produced by parser.parse_args, with the
             attributes action, id_range, working_dir, swarming_proxy,
             log_file and verbose.
    """
    parser = argparse.ArgumentParser(
            description='Launch swarming bots on a autotest server')

    # The positional action comes first, then the options in the order in
    # which they are listed in the help output.
    parser.add_argument(
            'action', choices=('launch', 'kill', 'check'),
            help=('launch: launch bots. '
                  'kill: kill bots. '
                  'check: check if bots are running, if not, starting bots.'))

    option_specs = [
        (('-r', '--id_range'),
         dict(type=str, dest='id_range', required=True,
              help='A range of integer, each bot created will be labeled '
                   'with an id from this range. E.g. "1-200"')),
        (('-d', '--working_dir'),
         dict(type=str, dest='working_dir', required=True,
              help='A working directory where bots will store files '
                   'generated at runtime')),
        (('-p', '--swarming_proxy'),
         dict(type=str, dest='swarming_proxy',
              default=DEFAULT_SWARMING_PROXY,
              help='The URL of the swarming instance to talk to, '
                   'Default to the one specified in global config')),
        (('-f', '--log_file'),
         dict(dest='log_file', required=False,
              help='Path to the log file.')),
        (('-v', '--verbose'),
         dict(dest='verbose', action='store_true',
              help='Verbose mode')),
    ]
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)

    return parser.parse_args(args)
380
381
def _setup_logging(verbose, log_file):
    """Configure the root logger of this script.

    @param verbose: bool, if True, log at DEBUG level, otherwise INFO level.
    @param log_file: path to the log file; log records go to a rotating
                     file there. If empty or None, a stream handler is
                     used instead.
    """
    if log_file:
        handler = logging.handlers.RotatingFileHandler(
                filename=log_file, maxBytes=LOG_FILE_SIZE,
                backupCount=LOG_FILE_BACKUPCOUNT)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(LOGGING_FORMAT))

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    root_logger.addHandler(handler)
400
401
def main(args):
    """Parse the command line and run the requested action on the bots.

    @param args: A list of command line arguments, without the program
                 name.

    @returns 0 on success, 1 if no swarming proxy is configured or the id
             range is invalid.
    """
    args = _parse_args(args)
    _setup_logging(args.verbose, args.log_file)

    if not args.swarming_proxy:
        logging.error(
                'No swarming proxy instance specified. '
                'Specify swarming_proxy in [CROS] in shadow_config, '
                'or use --swarming_proxy')
        return 1
    swarming_proxy = args.swarming_proxy
    if not swarming_proxy.startswith('https://'):
        swarming_proxy = 'https://' + swarming_proxy

    try:
        bot_ids = _parse_range(args.id_range)
    except ValueError as e:
        logging.error('Invalid --id_range: %s', e)
        return 1

    logging.info('Connecting to %s', swarming_proxy)
    # Hand over the normalized URL (with scheme), not the raw argument:
    # the bots download their code from '<swarming_proxy>/bot_code'.
    m = BotManager(bot_ids, args.working_dir, swarming_proxy)

    if args.action == 'launch':
        m.launch()
    elif args.action == 'kill':
        m.kill()
    elif args.action == 'check':
        m.check()
    return 0
431
432
# Script entry point: pass the command line (without the program name) to
# main() and use its return value as the exit status of the process.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
435