1#!/usr/bin/python 2#pylint: disable-msg=C0111 3 4"""Utility module that executes management commands on the drone. 5 61. This is the module responsible for orchestrating processes on a drone. 72. It receives instructions via stdin and replies via stdout. 83. Each invocation is responsible for the initiation of a set of batched calls. 94. The batched calls may be synchronous or asynchronous. 105. The caller is responsible for monitoring asynchronous calls through pidfiles. 11""" 12 13 14import argparse 15import pickle, subprocess, os, shutil, sys, time, signal, getpass 16import datetime, traceback, tempfile, itertools, logging 17import common 18from autotest_lib.client.common_lib import utils, global_config, error 19from autotest_lib.client.common_lib import logging_manager 20from autotest_lib.client.common_lib.cros import retry 21from autotest_lib.scheduler import drone_logging_config 22from autotest_lib.scheduler import email_manager, scheduler_config 23from autotest_lib.server import hosts, subcommand 24 25 26# An environment variable we add to the environment to enable us to 27# distinguish processes we started from those that were started by 28# something else during recovery. Name credit goes to showard. ;) 29DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK' 30 31_TEMPORARY_DIRECTORY = 'drone_tmp' 32_TRANSFER_FAILED_FILE = '.transfer_failed' 33 34# script and log file for cleaning up orphaned lxc containers. 35LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils', 36 'lxc_cleanup.py') 37LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs', 38 'lxc_cleanup.log') 39 40 41class _MethodCall(object): 42 def __init__(self, method, args, kwargs): 43 self._method = method 44 self._args = args 45 self._kwargs = kwargs 46 47 48 def execute_on(self, drone_utility): 49 method = getattr(drone_utility, self._method) 50 return method(*self._args, **self._kwargs) 51 52 53 def __str__(self): 54 args = ', '.join(repr(arg) for arg in self._args) 55 kwargs = ', '.join('%s=%r' % (key, value) for key, value in 56 self._kwargs.iteritems()) 57 full_args = ', '.join(item for item in (args, kwargs) if item) 58 return '%s(%s)' % (self._method, full_args) 59 60 61def call(method, *args, **kwargs): 62 return _MethodCall(method, args, kwargs) 63 64 65class BaseDroneUtility(object): 66 """ 67 This class executes actual OS calls on the drone machine. 68 69 All paths going into and out of this class are absolute. 70 """ 71 _WARNING_DURATION = 400 72 73 def __init__(self): 74 # Tattoo ourselves so that all of our spawn bears our mark. 75 os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid())) 76 77 self.warnings = [] 78 self._subcommands = [] 79 80 81 def initialize(self, results_dir): 82 temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY) 83 if os.path.exists(temporary_directory): 84 # TODO crbug.com/391111: before we have a better solution to 85 # periodically cleanup tmp files, we have to use rm -rf to delete 86 # the whole folder. shutil.rmtree has performance issue when a 87 # folder has large amount of files, e.g., drone_tmp. 88 os.system('rm -rf %s' % temporary_directory) 89 self._ensure_directory_exists(temporary_directory) 90 # TODO (sbasi) crbug.com/345011 - Remove this configuration variable 91 # and clean up build_externals so it NO-OP's. 92 build_externals = global_config.global_config.get_config_value( 93 scheduler_config.CONFIG_SECTION, 'drone_build_externals', 94 default=True, type=bool) 95 if build_externals: 96 build_extern_cmd = os.path.join(common.autotest_dir, 97 'utils', 'build_externals.py') 98 utils.run(build_extern_cmd) 99 100 101 def _warn(self, warning): 102 self.warnings.append(warning) 103 104 105 @staticmethod 106 def _check_pid_for_dark_mark(pid, open=open): 107 try: 108 env_file = open('/proc/%s/environ' % pid, 'rb') 109 except EnvironmentError: 110 return False 111 try: 112 env_data = env_file.read() 113 finally: 114 env_file.close() 115 return DARK_MARK_ENVIRONMENT_VAR in env_data 116 117 118 _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args') 119 120 121 @classmethod 122 def _get_process_info(cls): 123 """Parse ps output for all process information. 124 125 @returns A generator of dicts with cls._PS_ARGS as keys and 126 string values each representing a running process. eg: 127 { 128 'comm': command_name, 129 'pgid': process group id, 130 'ppid': parent process id, 131 'pid': process id, 132 'args': args the command was invoked with, 133 } 134 """ 135 @retry.retry(subprocess.CalledProcessError, 136 timeout_min=0.5, delay_sec=0.25) 137 def run_ps(): 138 return subprocess.check_output( 139 ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)]) 140 141 ps_output = run_ps() 142 # split each line into the columns output by ps 143 split_lines = [line.split(None, 4) for line in ps_output.splitlines()] 144 return (dict(itertools.izip(cls._PS_ARGS, line_components)) 145 for line_components in split_lines) 146 147 148 def _refresh_processes(self, command_name, open=open, 149 site_check_parse=None): 150 """Refreshes process info for the given command_name. 151 152 Examines ps output as returned by get_process_info and returns 153 the process dicts for processes matching the given command name. 154 155 @param command_name: The name of the command, eg 'autoserv'. 156 157 @return: A list of process info dictionaries as returned by 158 _get_process_info. 159 """ 160 # The open argument is used for test injection. 161 check_mark = global_config.global_config.get_config_value( 162 'SCHEDULER', 'check_processes_for_dark_mark', bool, False) 163 processes = [] 164 for info in self._get_process_info(): 165 is_parse = (site_check_parse and site_check_parse(info)) 166 if info['comm'] == command_name or is_parse: 167 if (check_mark and not 168 self._check_pid_for_dark_mark(info['pid'], open=open)): 169 self._warn('%(comm)s process pid %(pid)s has no ' 170 'dark mark; ignoring.' % info) 171 continue 172 processes.append(info) 173 174 return processes 175 176 177 def _read_pidfiles(self, pidfile_paths): 178 pidfiles = {} 179 for pidfile_path in pidfile_paths: 180 if not os.path.exists(pidfile_path): 181 continue 182 try: 183 file_object = open(pidfile_path, 'r') 184 pidfiles[pidfile_path] = file_object.read() 185 file_object.close() 186 except IOError: 187 continue 188 return pidfiles 189 190 191 def refresh(self, pidfile_paths): 192 """ 193 pidfile_paths should be a list of paths to check for pidfiles. 194 195 Returns a dict containing: 196 * pidfiles: dict mapping pidfile paths to file contents, for pidfiles 197 that exist. 198 * autoserv_processes: list of dicts corresponding to running autoserv 199 processes. each dict contain pid, pgid, ppid, comm, and args (see 200 "man ps" for details). 201 * parse_processes: likewise, for parse processes. 202 * pidfiles_second_read: same info as pidfiles, but gathered after the 203 processes are scanned. 204 """ 205 site_check_parse = utils.import_site_function( 206 __file__, 'autotest_lib.scheduler.site_drone_utility', 207 'check_parse', lambda x: False) 208 results = { 209 'pidfiles' : self._read_pidfiles(pidfile_paths), 210 # element 0 of _get_process_info() is the headers from `ps` 211 'all_processes' : list(self._get_process_info())[1:], 212 'autoserv_processes' : self._refresh_processes('autoserv'), 213 'parse_processes' : self._refresh_processes( 214 'parse', site_check_parse=site_check_parse), 215 'pidfiles_second_read' : self._read_pidfiles(pidfile_paths), 216 } 217 return results 218 219 220 def get_signal_queue_to_kill(self, process): 221 """Get the signal queue needed to kill a process. 222 223 autoserv process has a handle on SIGTERM, in which it can do some 224 cleanup work. However, abort a process with SIGTERM then SIGKILL has 225 its overhead, detailed in following CL: 226 https://chromium-review.googlesource.com/230323 227 This method checks the process's argument and determine if SIGTERM is 228 required, and returns signal queue accordingly. 229 230 @param process: A drone_manager.Process object to be killed. 231 232 @return: The signal queue needed to kill a process. 233 234 """ 235 signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL) 236 try: 237 ps_output = subprocess.check_output( 238 ['/bin/ps', '-p', str(process.pid), '-o', 'args']) 239 # For test running with server-side packaging, SIGTERM needs to be 240 # sent for autoserv process to destroy container used by the test. 241 if '--require-ssp' in ps_output: 242 logging.debug('PID %d requires SIGTERM to abort to cleanup ' 243 'container.', process.pid) 244 return signal_queue_with_sigterm 245 except subprocess.CalledProcessError: 246 # Ignore errors, return the signal queue with SIGTERM to be safe. 247 return signal_queue_with_sigterm 248 # Default to kill the process with SIGKILL directly. 249 return (signal.SIGKILL,) 250 251 252 def kill_processes(self, process_list): 253 """Send signals escalating in severity to the processes in process_list. 254 255 @param process_list: A list of drone_manager.Process objects 256 representing the processes to kill. 257 """ 258 try: 259 logging.info('List of process to be killed: %s', process_list) 260 processes_to_kill = {} 261 for p in process_list: 262 signal_queue = self.get_signal_queue_to_kill(p) 263 processes_to_kill[signal_queue] = ( 264 processes_to_kill.get(signal_queue, []) + [p]) 265 sig_counts = {} 266 for signal_queue, processes in processes_to_kill.iteritems(): 267 sig_counts.update(utils.nuke_pids( 268 [-process.pid for process in processes], 269 signal_queue=signal_queue)) 270 except error.AutoservRunError as e: 271 self._warn('Error occured when killing processes. Error: %s' % e) 272 273 274 def _convert_old_host_log(self, log_path): 275 """ 276 For backwards compatibility only. This can safely be removed in the 277 future. 278 279 The scheduler used to create files at results/hosts/<hostname>, and 280 append all host logs to that file. Now, it creates directories at 281 results/hosts/<hostname>, and places individual timestamped log files 282 into that directory. 283 284 This can be a problem the first time the scheduler runs after upgrading. 285 To work around that, we'll look for a file at the path where the 286 directory should be, and if we find one, we'll automatically convert it 287 to a directory containing the old logfile. 288 """ 289 # move the file out of the way 290 temp_dir = tempfile.mkdtemp(suffix='.convert_host_log') 291 base_name = os.path.basename(log_path) 292 temp_path = os.path.join(temp_dir, base_name) 293 os.rename(log_path, temp_path) 294 295 os.mkdir(log_path) 296 297 # and move it into the new directory 298 os.rename(temp_path, os.path.join(log_path, 'old_log')) 299 os.rmdir(temp_dir) 300 301 302 def _ensure_directory_exists(self, path): 303 if os.path.isdir(path): 304 return 305 306 if os.path.exists(path): 307 # path exists already, but as a file, not a directory 308 if '/hosts/' in path: 309 self._convert_old_host_log(path) 310 return 311 else: 312 raise IOError('Path %s exists as a file, not a directory') 313 314 os.makedirs(path) 315 316 317 def execute_command(self, command, working_directory, log_file, 318 pidfile_name): 319 out_file = None 320 if log_file: 321 self._ensure_directory_exists(os.path.dirname(log_file)) 322 try: 323 out_file = open(log_file, 'a') 324 separator = ('*' * 80) + '\n' 325 out_file.write('\n' + separator) 326 out_file.write("%s> %s\n" % (time.strftime("%X %x"), command)) 327 out_file.write(separator) 328 except (OSError, IOError): 329 email_manager.manager.log_stacktrace( 330 'Error opening log file %s' % log_file) 331 332 if not out_file: 333 out_file = open('/dev/null', 'w') 334 335 in_devnull = open('/dev/null', 'r') 336 337 self._ensure_directory_exists(working_directory) 338 pidfile_path = os.path.join(working_directory, pidfile_name) 339 if os.path.exists(pidfile_path): 340 self._warn('Pidfile %s already exists' % pidfile_path) 341 os.remove(pidfile_path) 342 343 subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT, 344 stdin=in_devnull) 345 out_file.close() 346 in_devnull.close() 347 348 349 def write_to_file(self, file_path, contents, is_retry=False): 350 """Write the specified contents to the end of the given file. 351 352 @param file_path: Path to the file. 353 @param contents: Content to be written to the file. 354 @param is_retry: True if this is a retry after file permission be 355 corrected. 356 """ 357 self._ensure_directory_exists(os.path.dirname(file_path)) 358 try: 359 file_object = open(file_path, 'a') 360 file_object.write(contents) 361 file_object.close() 362 except IOError as e: 363 # TODO(dshi): crbug.com/459344 Remove following retry when test 364 # container can be unprivileged container. 365 # If write failed with error 'Permission denied', one possible cause 366 # is that the file was created in a container and thus owned by 367 # root. If so, fix the file permission, and try again. 368 if e.errno == 13 and not is_retry: 369 logging.error('Error write to file %s: %s. Will be retried.', 370 file_path, e) 371 utils.run('sudo chown %s "%s"' % (os.getuid(), file_path)) 372 utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path)) 373 self.write_to_file(file_path, contents, is_retry=True) 374 else: 375 self._warn('Error write to file %s: %s' % (file_path, e)) 376 377 378 def copy_file_or_directory(self, source_path, destination_path): 379 """ 380 This interface is designed to match server.hosts.abstract_ssh.get_file 381 (and send_file). That is, if the source_path ends with a slash, the 382 contents of the directory are copied; otherwise, the directory iself is 383 copied. 384 """ 385 if self._same_file(source_path, destination_path): 386 return 387 self._ensure_directory_exists(os.path.dirname(destination_path)) 388 if source_path.endswith('/'): 389 # copying a directory's contents to another directory 390 assert os.path.isdir(source_path) 391 assert os.path.isdir(destination_path) 392 for filename in os.listdir(source_path): 393 self.copy_file_or_directory( 394 os.path.join(source_path, filename), 395 os.path.join(destination_path, filename)) 396 elif os.path.isdir(source_path): 397 try: 398 shutil.copytree(source_path, destination_path, symlinks=True) 399 except shutil.Error: 400 # Ignore copy directory error due to missing files. The cause 401 # of this behavior is that, gs_offloader zips up folders with 402 # too many files. There is a race condition that repair job 403 # tries to copy provision job results to the test job result 404 # folder, meanwhile gs_offloader is uploading the provision job 405 # result and zipping up folders which contains too many files. 406 pass 407 elif os.path.islink(source_path): 408 # copied from shutil.copytree() 409 link_to = os.readlink(source_path) 410 os.symlink(link_to, destination_path) 411 else: 412 shutil.copy(source_path, destination_path) 413 414 415 def _same_file(self, source_path, destination_path): 416 """Checks if the source and destination are the same 417 418 Returns True if the destination is the same as the source, False 419 otherwise. Also returns False if the destination does not exist. 420 """ 421 if not os.path.exists(destination_path): 422 return False 423 return os.path.samefile(source_path, destination_path) 424 425 426 def cleanup_orphaned_containers(self): 427 """Run lxc_cleanup script to clean up orphaned container. 428 """ 429 # The script needs to run with sudo as the containers are privileged. 430 # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test 431 # container can be unprivileged container. 432 command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l', 433 LXC_CLEANUP_LOG_FILE] 434 logging.info('Running %s', command) 435 # stdout and stderr needs to be direct to /dev/null, otherwise existing 436 # of drone_utils process will kill lxc_cleanup script. 437 subprocess.Popen( 438 command, shell=False, stdin=None, stdout=open('/dev/null', 'w'), 439 stderr=open('/dev/null', 'a'), preexec_fn=os.setpgrp) 440 441 442 def wait_for_all_async_commands(self): 443 for subproc in self._subcommands: 444 subproc.fork_waitfor() 445 self._subcommands = [] 446 447 448 def _poll_async_commands(self): 449 still_running = [] 450 for subproc in self._subcommands: 451 if subproc.poll() is None: 452 still_running.append(subproc) 453 self._subcommands = still_running 454 455 456 def _wait_for_some_async_commands(self): 457 self._poll_async_commands() 458 max_processes = scheduler_config.config.max_transfer_processes 459 while len(self._subcommands) >= max_processes: 460 time.sleep(1) 461 self._poll_async_commands() 462 463 464 def run_async_command(self, function, args): 465 subproc = subcommand.subcommand(function, args) 466 self._subcommands.append(subproc) 467 subproc.fork_start() 468 469 470 def _sync_get_file_from(self, hostname, source_path, destination_path): 471 logging.debug('_sync_get_file_from hostname: %s, source_path: %s,' 472 'destination_path: %s', hostname, source_path, 473 destination_path) 474 self._ensure_directory_exists(os.path.dirname(destination_path)) 475 host = create_host(hostname) 476 host.get_file(source_path, destination_path, delete_dest=True) 477 478 479 def get_file_from(self, hostname, source_path, destination_path): 480 self.run_async_command(self._sync_get_file_from, 481 (hostname, source_path, destination_path)) 482 483 484 def sync_send_file_to(self, hostname, source_path, destination_path, 485 can_fail): 486 logging.debug('sync_send_file_to. hostname: %s, source_path: %s, ' 487 'destination_path: %s, can_fail:%s', hostname, 488 source_path, destination_path, can_fail) 489 host = create_host(hostname) 490 try: 491 host.run('mkdir -p ' + os.path.dirname(destination_path)) 492 host.send_file(source_path, destination_path, delete_dest=True) 493 except error.AutoservError: 494 if not can_fail: 495 raise 496 497 if os.path.isdir(source_path): 498 failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE) 499 file_object = open(failed_file, 'w') 500 try: 501 file_object.write('%s:%s\n%s\n%s' % 502 (hostname, destination_path, 503 datetime.datetime.now(), 504 traceback.format_exc())) 505 finally: 506 file_object.close() 507 else: 508 copy_to = destination_path + _TRANSFER_FAILED_FILE 509 self._ensure_directory_exists(os.path.dirname(copy_to)) 510 self.copy_file_or_directory(source_path, copy_to) 511 512 513 def send_file_to(self, hostname, source_path, destination_path, 514 can_fail=False): 515 self.run_async_command(self.sync_send_file_to, 516 (hostname, source_path, destination_path, 517 can_fail)) 518 519 520 def _report_long_execution(self, calls, duration): 521 call_count = {} 522 for call in calls: 523 call_count.setdefault(call._method, 0) 524 call_count[call._method] += 1 525 call_summary = '\n'.join('%d %s' % (count, method) 526 for method, count in call_count.iteritems()) 527 self._warn('Execution took %f sec\n%s' % (duration, call_summary)) 528 529 530 def execute_calls(self, calls): 531 results = [] 532 start_time = time.time() 533 max_processes = scheduler_config.config.max_transfer_processes 534 for method_call in calls: 535 results.append(method_call.execute_on(self)) 536 if len(self._subcommands) >= max_processes: 537 self._wait_for_some_async_commands() 538 self.wait_for_all_async_commands() 539 540 duration = time.time() - start_time 541 if duration > self._WARNING_DURATION: 542 self._report_long_execution(calls, duration) 543 544 warnings = self.warnings 545 self.warnings = [] 546 return dict(results=results, warnings=warnings) 547 548 549def create_host(hostname): 550 username = global_config.global_config.get_config_value( 551 'SCHEDULER', hostname + '_username', default=getpass.getuser()) 552 return hosts.SSHHost(hostname, user=username) 553 554 555def parse_input(): 556 input_chunks = [] 557 chunk_of_input = sys.stdin.read() 558 while chunk_of_input: 559 input_chunks.append(chunk_of_input) 560 chunk_of_input = sys.stdin.read() 561 pickled_input = ''.join(input_chunks) 562 563 try: 564 return pickle.loads(pickled_input) 565 except Exception: 566 separator = '*' * 50 567 raise ValueError('Unpickling input failed\n' 568 'Input: %r\n' 569 'Exception from pickle:\n' 570 '%s\n%s\n%s' % 571 (pickled_input, separator, traceback.format_exc(), 572 separator)) 573 574 575def _parse_args(args): 576 parser = argparse.ArgumentParser(description='Local drone process manager.') 577 parser.add_argument('--call_time', 578 help='Time this process was invoked from the master', 579 default=None, type=float) 580 return parser.parse_args(args) 581 582 583SiteDroneUtility = utils.import_site_class( 584 __file__, 'autotest_lib.scheduler.site_drone_utility', 585 'SiteDroneUtility', BaseDroneUtility) 586 587 588class DroneUtility(SiteDroneUtility): 589 pass 590 591 592def return_data(data): 593 print pickle.dumps(data) 594 595 596def main(): 597 logging_manager.configure_logging( 598 drone_logging_config.DroneLoggingConfig()) 599 calls = parse_input() 600 args = _parse_args(sys.argv[1:]) 601 602 drone_utility = DroneUtility() 603 return_value = drone_utility.execute_calls(calls) 604 return_data(return_value) 605 606 607if __name__ == '__main__': 608 main() 609