#!/usr/bin/python
#pylint: disable-msg=C0111

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation is responsible for the initiation of a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""


import argparse
import pickle, subprocess, os, shutil, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import email_manager, scheduler_config
from autotest_lib.server import hosts, subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery.  Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
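

# Illustrative example of building a batch for execute_calls() with the
# helper above (hypothetical arguments):
#     calls = [call('write_to_file', '/results/1-job/status.log', 'line\n'),
#              call('refresh', ['/results/1-job/.autoserv_execute'])]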


class BaseDroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: before we have a better solution to
            # periodically clean up tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has performance issues when a
            # folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it no-ops.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        # The open argument is used for test injection.
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """Parse ps output for all process information.

        @returns A generator of dicts with cls._PS_ARGS as keys and
            string values each representing a running process, e.g.:
            {
                'comm': command_name,
                'pgid': process group id,
                'ppid': parent process id,
                'pid': process id,
                'args': args the command was invoked with,
            }
        """
        @retry.retry(subprocess.CalledProcessError,
                     timeout_min=0.5, delay_sec=0.25)
        def run_ps():
            return subprocess.check_output(
                    ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)])

        ps_output = run_ps()
        # Split each line into the columns output by ps.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)
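
    # Illustrative example of the parsing in _get_process_info(), given
    # hypothetical `ps x -o pid,pgid,ppid,comm,args` output:
    #
    #       PID  PGID  PPID COMMAND COMMAND
    #       823   823     1 sshd    /usr/sbin/sshd -D
    #
    # split(None, 4) keeps the full argument string in one piece, so the
    # second row becomes:
    #     {'pid': '823', 'pgid': '823', 'ppid': '1',
    #      'comm': 'sshd', 'args': '/usr/sbin/sshd -D'}
    # The header row also becomes a dict, which is why refresh() below skips
    # element 0 of the generator.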


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """Refreshes process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param command_name: The name of the command, e.g. 'autoserv'.

        @return: A list of process info dictionaries as returned by
            _get_process_info.
        """
        # The open argument is used for test injection.
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
        that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
        processes.  Each dict contains pid, pgid, ppid, comm, and args (see
        "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
        processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            # Element 0 of _get_process_info() is the header row from `ps`.
            'all_processes' : list(self._get_process_info())[1:],
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        The autoserv process installs a handler for SIGTERM, which lets it do
        some cleanup work. However, aborting a process with SIGTERM followed
        by SIGKILL has overhead, detailed in the following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent so the autoserv process can destroy the container used
            # by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM on abort to clean up '
                              'its container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors; return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default: kill the process with SIGKILL directly.
        return (signal.SIGKILL,)
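
    # Illustrative grouping performed by kill_processes() below, assuming one
    # autoserv process started with --require-ssp and one without:
    #     {(signal.SIGTERM, signal.SIGKILL): [ssp_autoserv_process],
    #      (signal.SIGKILL,): [plain_autoserv_process]}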


    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
        except error.AutoservRunError as e:
            self._warn('Error occurred while killing processes. Error: %s' % e)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after upgrading.
        To work around that, we'll look for a file at the path where the
        directory should be, and if we find one, we'll automatically convert it
        to a directory containing the old logfile.
        """
        # Move the file out of the way.
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # And move it into the new directory.
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # The path exists already, but as a file, not a directory.
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory' %
                              path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permissions
                         have been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove the following retry when the
            # test container can be an unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permissions and try again.
            if e.errno == 13 and not is_retry:
                logging.error('Error writing to file %s: %s. Will retry.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # Copying a directory's contents into another directory.
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore copy errors caused by missing files. These arise from
                # a race with gs_offloader: a repair job may try to copy
                # provision job results into the test job's result folder
                # while gs_offloader is uploading the provision job result and
                # zipping up folders that contain too many files.
                pass
        elif os.path.islink(source_path):
            # Copied from shutil.copytree().
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)
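
    # Illustrative examples of the trailing-slash semantics above
    # (hypothetical paths):
    #     copy_file_or_directory('/results/1-job/', '/dest/1-job')
    #         # copies each entry of /results/1-job into the existing
    #         # /dest/1-job directory
    #     copy_file_or_directory('/results/1-job', '/dest/1-job')
    #         # creates /dest/1-job as a copy of the whole directory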


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same file.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers.
        """
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when the test
        # container can be an unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be redirected to /dev/null; otherwise, the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None, stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        logging.debug('sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail: %s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
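
    # Illustrative shape of the value pickled back to the caller by main(),
    # assuming a batch of [call('initialize', ...), call('refresh', [...])]:
    #     {'results': [None,
    #                  {'pidfiles': {...}, 'all_processes': [...],
    #                   'autoserv_processes': [...], 'parse_processes': [...],
    #                   'pidfiles_second_read': {...}}],
    #      'warnings': []}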


def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


SiteDroneUtility = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_drone_utility',
        'SiteDroneUtility', BaseDroneUtility)


class DroneUtility(SiteDroneUtility):
    pass


def return_data(data):
    print pickle.dumps(data)


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    args = _parse_args(sys.argv[1:])

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()