#!/usr/bin/python
#pylint: disable-msg=C0111

"""Utility module that executes management commands on the drone.

1. This is the module responsible for orchestrating processes on a drone.
2. It receives instructions via stdin and replies via stdout.
3. Each invocation is responsible for the initiation of a set of batched calls.
4. The batched calls may be synchronous or asynchronous.
5. The caller is responsible for monitoring asynchronous calls through pidfiles.
"""


import argparse
import pickle, subprocess, os, shutil, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import drone_logging_config
from autotest_lib.scheduler import email_manager, scheduler_config
from autotest_lib.server import hosts, subcommand


# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery. Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

# script and log file for cleaning up orphaned lxc containers.
LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
                                  'lxc_cleanup.py')
LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
                                    'lxc_cleanup.log')


_STATS_KEY = 'drone_utility'
timer = autotest_stats.Timer(_STATS_KEY)

class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)
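

# A minimal sketch (not used in production) of how a caller might build a
# batch of calls for this module. The method names mirror real
# BaseDroneUtility methods below; the paths are invented for illustration.
def _example_build_batch():
    batch = [call('initialize', '/usr/local/autotest/results'),
             call('refresh', ['/results/123-debug/.autoserv_execute'])]
    # The pickled list is what a caller writes to this process's stdin;
    # execute_calls() runs each _MethodCall via execute_on() and the results
    # are pickled back to stdout (see main() and return_data() below).
    return pickle.dumps(batch)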


class BaseDroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: before we have a better solution to
            # periodically clean up tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has performance issues when a
            # folder contains a large number of files, e.g., drone_tmp.
            os.system('rm -rf %s' % temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it NO-OP's.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    @timer.decorate
    def _get_process_info(cls):
        """Parse ps output for all process information.

        @returns A generator of dicts with cls._PS_ARGS as keys and
            string values each representing a running process. eg:
            {
                'comm': command_name,
                'pgid': process group id,
                'ppid': parent process id,
                'pid': process id,
                'args': args the command was invoked with,
            }
            Note that the first dict yielded corresponds to the header row
            of `ps` output; callers that want only processes should skip it
            (see refresh()).
        """
        @retry.retry(subprocess.CalledProcessError,
                     timeout_min=0.5, delay_sec=0.25)
        def run_ps():
            return subprocess.check_output(
                    ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)])

        ps_output = run_ps()
        # Split each line into the columns output by ps.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """Refreshes process info for the given command_name.

        Examines ps output as returned by _get_process_info and returns
        the process dicts for processes matching the given command name.

        @param command_name: The name of the command, eg 'autoserv'.
        @param open: Substitute for the builtin open; used for test injection.
        @param site_check_parse: Optional site-specific predicate used to
            identify parse processes.

        @return: A list of process info dictionaries as returned by
            _get_process_info.
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    @timer.decorate
    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


    @timer.decorate
    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes. each dict contains pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            # element 0 of _get_process_info() is the headers from `ps`
            'all_processes' : list(self._get_process_info())[1:],
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results
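

    # For reference, a refresh() result looks roughly like this (the values
    # are illustrative, not real output):
    #   {'pidfiles': {'/results/123-debug/.autoserv_execute': '1234\n0\n'},
    #    'all_processes': [{'pid': '1234', 'pgid': '1234', 'ppid': '1',
    #                       'comm': 'autoserv', 'args': '... autoserv ...'},
    #                      ...],
    #    'autoserv_processes': [...],
    #    'parse_processes': [...],
    #    'pidfiles_second_read': {...}}
    # Reading pidfiles both before and after the process scan lets the caller
    # rule out the race where a process exits and writes its pidfile while
    # the process table is being read.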


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        An autoserv process installs a handler for SIGTERM so that it can do
        some cleanup work before exiting. However, aborting a process with
        SIGTERM followed by SIGKILL has overhead, detailed in the following
        CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's arguments to determine whether
        SIGTERM is required, and returns the signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For tests running with server-side packaging, SIGTERM needs to
            # be sent so the autoserv process can destroy the container used
            # by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors; return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default to killing the process with SIGKILL directly.
        return (signal.SIGKILL,)


    @timer.decorate
    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in
        process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        kill_proc_key = 'kill_processes'
        autotest_stats.Gauge(_STATS_KEY).send('%s.%s' % (kill_proc_key, 'net'),
                                              len(process_list))
        try:
            logging.info('List of processes to be killed: %s', process_list)
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill[signal_queue] = (
                        processes_to_kill.get(signal_queue, []) + [p])
            sig_counts = {}
            for signal_queue, processes in processes_to_kill.iteritems():
                sig_counts.update(utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue))
            for name, count in sig_counts.iteritems():
                autotest_stats.Gauge(_STATS_KEY).send(
                        '%s.%s' % (kill_proc_key, name), count)
        except error.AutoservRunError as e:
            self._warn('Error occurred when killing processes. Error: %s' % e)
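

    # For reference: kill_processes() groups processes by the queue returned
    # from get_signal_queue_to_kill(), so each batch resolves to one of two
    # escalations:
    #   (signal.SIGKILL,)                 # default: no cleanup handler needed
    #   (signal.SIGTERM, signal.SIGKILL)  # --require-ssp: container cleanup
    # The pids passed to utils.nuke_pids are negated, the POSIX convention
    # for signalling an entire process group rather than a single process.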


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only. This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file. Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after
        upgrading. To work around that, we'll look for a file at the path
        where the directory should be, and if we find one, we'll automatically
        convert it to a directory containing the old logfile.
        """
        # Move the file out of the way.
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # And move it into the new directory.
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # The path exists already, but as a file, not a directory.
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory' %
                              path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                        'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after the file permissions
                         have been corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove following retry when test
            # container can be unprivileged container.
            # If the write failed with 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permissions, and try again.
            if e.errno == 13 and not is_retry:
                logging.error('Error writing to file %s: %s. Will retry.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error writing to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file). That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # Copying a directory's contents to another directory.
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # Copied from shutil.copytree().
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same.

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)
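

    # Examples of the trailing-slash convention implemented by
    # copy_file_or_directory() (paths invented for illustration):
    #   copy_file_or_directory('/results/123/', '/archive/123')
    #       copies the *contents* of /results/123 into the existing
    #       directory /archive/123.
    #   copy_file_or_directory('/results/123', '/archive/123')
    #       copies the directory itself, creating /archive/123.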


    def cleanup_orphaned_containers(self):
        """Run the lxc_cleanup script to clean up orphaned containers."""
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
        # container can be unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be redirected to /dev/null; otherwise the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        subprocess.Popen(
                command, shell=False, stdin=None,
                stdout=open('/dev/null', 'w'),
                stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()
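

    # The methods above implement a simple throttled process pool:
    # run_async_command() forks a subcommand for each transfer, and
    # execute_calls() invokes _wait_for_some_async_commands() whenever
    # max_transfer_processes forks are outstanding, sleeping until the pool
    # drains below that limit.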


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        logging.debug('sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail:%s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def _parse_args(args):
    parser = argparse.ArgumentParser(description='Local drone process manager.')
    parser.add_argument('--call_time',
                        help='Time this process was invoked from the master',
                        default=None, type=float)
    return parser.parse_args(args)


SiteDroneUtility = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_drone_utility',
        'SiteDroneUtility', BaseDroneUtility)


class DroneUtility(SiteDroneUtility):
    pass


def return_data(data):
    print pickle.dumps(data)
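

# A minimal master-side sketch of the stdin/stdout protocol implemented
# below. This is illustrative only -- the real caller is the scheduler's
# drone_manager -- and the script path is an assumption for the example.
def _example_invoke_drone_utility(calls):
    """Run drone_utility.py locally with a list of _MethodCall objects."""
    drone_utility_path = os.path.join(common.autotest_dir, 'scheduler',
                                      'drone_utility.py')
    proc = subprocess.Popen(['python', drone_utility_path],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # parse_input() unpickles everything read from stdin; return_data()
    # prints back a pickled dict with 'results' and 'warnings' keys.
    output, _ = proc.communicate(pickle.dumps(calls))
    return pickle.loads(output)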


def main():
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    with timer.get_client('decode'):
        calls = parse_input()
    args = _parse_args(sys.argv[1:])
    if args.call_time is not None:
        autotest_stats.Gauge(_STATS_KEY).send('invocation_overhead',
                                              time.time() - args.call_time)

    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    with timer.get_client('encode'):
        return_data(return_value)


if __name__ == '__main__':
    main()