#!/usr/bin/python
#pylint: disable-msg=C0111

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation is responsible for the initiation of a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""


import argparse
import pickle, subprocess, os, shutil, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import email_manager, scheduler_config
from autotest_lib.server import hosts, subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
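
# For example (illustrative), a process spawned by this utility will have
# 'AUTOTEST_SCHEDULER_DARK_MARK=<our pid>' in its /proc/<pid>/environ, which
# is what _check_pid_for_dark_mark below looks for during recovery.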

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# Script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


_STATS_KEY = 'drone_utility'
timer = autotest_stats.Timer(_STATS_KEY)

class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
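
# For example (illustrative; the pidfile path is hypothetical),
# call('refresh', ['/results/108-debug/.autoserv_execute']) builds a
# _MethodCall that invokes DroneUtility.refresh with that list of pidfile
# paths when executed on the drone.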


class BaseDroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: until we have a better solution to
            # periodically clean up tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has performance issues when a
            # folder has a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it NO-OP's.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    @timer.decorate
    def _get_process_info(cls):
        """Parse ps output for all process information.

        @returns A generator of dicts with cls._PS_ARGS as keys and
            string values each representing a running process, e.g.:
            {
                'comm': command_name,
                'pgid': process group id,
                'ppid': parent process id,
                'pid': process id,
                'args': args the command was invoked with,
            }
        """
        @retry.retry(subprocess.CalledProcessError,
                     timeout_min=0.5, delay_sec=0.25)
        def run_ps():
            return subprocess.check_output(
                    ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)])

        ps_output = run_ps()
        # Split each line into the columns output by ps.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """Refreshes process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: A list of process info dictionaries as returned by
            _get_process_info.
        """
        # The open argument is used for test injection.
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    @timer.decorate
    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


    @timer.decorate
    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
        that exist.
        * all_processes: list of dicts, one per running process; each dict
        contains pid, pgid, ppid, comm, and args (see "man ps" for details).
        * autoserv_processes: likewise, but restricted to running autoserv
        processes.
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
        processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            # Element 0 of _get_process_info() is the header row from `ps`.
            'all_processes' : list(self._get_process_info())[1:],
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process has a handler for SIGTERM that lets it do some
        cleanup work. However, aborting a process with SIGTERM followed by
        SIGKILL has extra overhead, detailed in the following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent to the autoserv process so it can destroy the container
            # used by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors; return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default: kill the process with SIGKILL directly.
        return (signal.SIGKILL,)


    @timer.decorate
    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        kill_proc_key = 'kill_processes'
        autotest_stats.Gauge(_STATS_KEY).send('%s.%s' % (kill_proc_key, 'net'),
                                              len(process_list))
        try:
            logging.info('List of processes to be killed: %s', process_list)
            # Group the processes by the signal queue needed to kill them, so
            # each group can be nuked with a single utils.nuke_pids call.
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
            for name, count in sig_counts.iteritems():
                autotest_stats.Gauge(_STATS_KEY).send(
                        '%s.%s' % (kill_proc_key, name), count)
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after upgrading.
        To work around that, we'll look for a file at the path where the
        directory should be, and if we find one, we'll automatically convert it
        to a directory containing the old logfile.
        """
        # Move the file out of the way...
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # ...and move it into the new directory.
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # The path exists already, but as a file, not a directory.
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory' %
                              path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """Start a command as a detached child process.

        The spawned command (e.g. autoserv) is expected to create its own
        pidfile at working_directory/pidfile_name; any stale pidfile there is
        removed first. Output is appended to log_file when one is given.
        """
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents, is_retry=False):
        """Append the specified contents to the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permissions
                         have been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove the following retry when the
            # test container can be an unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and is thus owned by
            # root. If so, fix the file permissions and try again.
            if e.errno == 13 and not is_retry:
                logging.error('Error writing to file %s: %s. Will retry.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
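        # Illustrative example (paths hypothetical): copying '/results/1-x/'
        # to '/dest/1-x' copies each child of /results/1-x into /dest/1-x,
        # while copying '/results/1-x' (no trailing slash) copies the
        # directory itself to /dest/1-x.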
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # Copying a directory's contents into another directory.
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # Copied from shutil.copytree().
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers."""
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when the test
        # container can be an unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be directed to /dev/null; otherwise, the
        # exit of the drone_utility process would kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        logging.debug('sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            # The transfer is allowed to fail; leave a marker describing the
            # failure next to (or in place of) the source.
            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """Execute a batch of _MethodCalls, throttling async transfers.

        @param calls: An iterable of _MethodCall objects.

        @return: A dict with 'results' (one entry per call, in order) and
            any 'warnings' accumulated during execution.
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
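
# For example (illustrative; the host and user are hypothetical), a
# global_config entry like
#
#   [SCHEDULER]
#   drone1.example.com_username: chromeos-test
#
# makes create_host('drone1.example.com') connect as chromeos-test; without
# such an entry it falls back to the user running this process.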


def parse_input():
    # Read everything from stdin; the caller signals the end of input by
    # closing the pipe.
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


SiteDroneUtility = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_drone_utility',
        'SiteDroneUtility', BaseDroneUtility)


class DroneUtility(SiteDroneUtility):
    pass


def return_data(data):
    print pickle.dumps(data)


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    with timer.get_client('decode'):
        calls = parse_input()
    args = _parse_args(sys.argv[1:])
    if args.call_time is not None:
        autotest_stats.Gauge(_STATS_KEY).send('invocation_overhead',
                                              time.time() - args.call_time)

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    with timer.get_client('encode'):
        return_data(return_value)


if __name__ == '__main__':
    main()