import collections
import heapq
import os
import Queue
import time
import threading
import traceback
import logging

import common
from autotest_lib.client.common_lib import error, global_config, utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager, drone_utility, drones
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib


# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

WORKING_DIRECTORY = object() # see execute_command()


AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)


class DroneManagerError(Exception):
    """Raised for drone manager failures (no drones, un-executed calls)."""
    pass


class CustomEquals(object):
    """
    Base class providing equality and hashing based on the value of _id().

    Subclasses must override _id() to return a hashable identity.
    """
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._id() == other._id()


    def __ne__(self, other):
        return not self == other


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    """
    A process on a drone, identified by (hostname, pid).

    ppid is stored but does not take part in equality or hashing.
    """
    def __init__(self, hostname, pid, ppid=None):
        self.hostname = hostname
        self.pid = pid
        self.ppid = ppid

    def _id(self):
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        return super(Process, self).__repr__() + '<%s>' % self


class PidfileId(CustomEquals):
    """Identifies a pidfile by its path."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    """Bookkeeping kept for each registered pidfile."""
    # incremented by _drop_old_pidfiles(), zeroed by _reset_pidfile_age()
    age = 0
    # process count declared for this pidfile; None until declared
    num_processes = None


class PidfileContents(object):
    """Parsed contents of a pidfile; all fields are None until known."""
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """
        @returns: True if a process is recorded and no exit status has been
                written yet.
        """
        # Compare against None explicitly: an exit status of 0 means the
        # process finished successfully, not that it is still running.
        return bool(self.process) and self.exit_status is None


class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed."""
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


class BaseDroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'
    _timer = autotest_stats.Timer(_STATS_KEY)


    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # holds the list of all processes running on all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # map drone hostname to time stamp of email that
        # has been sent about the drone hitting process limit.
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Set up the drones and the results repository drone.

        @param base_results_dir: absolute path of the base results directory.
        @param drone_hostnames: hostnames of the drones to add.
        @param results_repository_hostname: hostname of the results
                repository.

        @raises DroneManagerError: if no drone could be added.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' on every drone with the base results dir."""
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        """Call shutdown() on every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """
        Look up the drone for hostname and, if one is returned, record it
        and call 'initialize' on it.
        """
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            self._drones[drone.hostname] = drone
            drone.call('initialize', self.absolute_path(''))


    def _remove_drone(self, hostname):
        """Forget the drone with the given hostname, if it is known."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = dict([(a.attribute, a.value)
                                   for a in server.attributes.all()])
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            # allowed_users is a whitespace-separated string, or empty/None
            # for "no restriction recorded"
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        """
        @returns: an iterator over all known Drone objects.
        """
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        """
        @returns: the Drone whose hostname matches process.hostname.
        """
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """
        @returns: the Drone running the process recorded in the given
                pidfile. The pidfile must already have a process recorded.
        """
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """
        Age every registered pidfile by one and unregister those whose age
        already exceeds the configured maximum.
        """
        # iterate over a copy since the dict is modified in
        # unregister_pidfile()
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > self._get_max_pidfile_refreshes():
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Discard all state gathered by the previous refresh."""
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        """
        Call the given method on every drone, timing each call.

        @returns: a dict mapping each drone to the result of its call.
        """
        all_results = {}
        for drone in self.get_drones():
            with self._timer.get_client(
                    '%s.%s' % (drone.hostname.replace('.', '_'), method)):
                all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
            "pid\nexit_status\nnum_tests_failed\n".

        @returns: A PidfileContents (empty if raw_contents is empty), or an
            InvalidPidfile if there are more than 3 lines or a line is not
            an integer.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """
        Parse each raw pidfile and store the result under its PidfileId.

        @param drone: The drone the pidfiles were read from.
        @param pidfiles: dict mapping pidfile path to raw contents.
        @param store_in_dict: dict to fill, PidfileId -> parsed contents.
        """
        for pidfile_path, contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            contents = self._parse_pidfile(drone, contents)
            store_in_dict[pidfile_id] = contents


    def _add_process(self, drone, process_info):
        """Record a Process built from process_info's 'pid' and 'ppid'."""
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        """Record an autoserv process, skipping non-root ones."""
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        """Push the drone onto the capacity-ordered heap."""
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        """Restore the heap invariant after drone loads have changed."""
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """
        Recompute drone.active_processes from the pidfiles read in the last
        refresh and report the value to stats.

        A pidfile counts towards the drone if it has no exit status yet and
        its process is on this drone; it contributes its declared
        num_processes, if any.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info[pidfile_id]
                if info.num_processes is not None:
                    drone.active_processes += info.num_processes
        autotest_stats.Gauge(self._STATS_KEY).send(
                '%s.%s' % (drone.hostname.replace('.', '_'),
                           'active_processes'), drone.active_processes)


    def _check_drone_process_limit(self, drone):
        """
        Notify if the number of processes on |drone| is approaching limit.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # max_processes == 0: use a large ratio so the warning fires
            percent = 100
        max_percent = scheduler_config.config.max_processes_warning_threshold
        if percent >= max_percent:
            message = ('Drone %s is hitting %s of process limit.' %
                       (drone.hostname, format(percent, '.2%')))
            logging.warning(message)
            last_notified = self._notify_record.get(drone.hostname, 0)
            now = time.time()
            # send at most one email per drone per NOTIFY_INTERVAL
            if last_notified + BaseDroneManager.NOTIFY_INTERVAL < now:
                body = ('Active processes/Process limit: %d/%d (%s)' %
                        (drone.active_processes, drone.max_processes,
                         format(percent, '.2%')))
                email_manager.manager.enqueue_notify_email(message, body)
                self._notify_record[drone.hostname] = now


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        # named all_drones so the imported drones module is not shadowed
        all_drones = list(self.get_drones())
        for drone in all_drones:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with self._timer.get_client('trigger_refresh'):
            self._refresh_task_queue.execute(all_drones, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfile_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with self._timer.get_client('sync_refresh'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will always contain at
        # most 3 lines of the following: pid, exit code, number of tests.
        # Each pidfile is identified by a PidfileId object, which contains a
        # unique pidfile path (unique because it contains the job id) making
        # it hashable.
        # All pidfiles are stored in the drone managers _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]
            drone_hostname = drone.hostname.replace('.', '_')

            with self._timer.get_client('%s.results' % drone_hostname):
                for process_info in results['all_processes']:
                    if process_info['comm'] == 'autoserv':
                        self._add_autoserv_process(drone, process_info)
                    drone_pid = drone.hostname, int(process_info['pid'])
                    self._all_processes[drone_pid] = process_info

                for process_info in results['parse_processes']:
                    self._add_process(drone, process_info)

            with self._timer.get_client('%s.pidfiles' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles'],
                                       self._pidfiles)
            with self._timer.get_client('%s.pidfiles_second' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles_second_read'],
                                       self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with self._timer.get_client('refresh'):
            self.trigger_refresh()
            self.sync_refresh()


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            # best effort: report by email and drop the queued calls
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            email_manager.manager.enqueue_notify_email(
                'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        """Create the directory at path (and parents) if it is missing."""
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        """
        @returns: the sum of active_processes over all drones.
        """
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                  drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        # the leading 0 keeps the result non-negative for overloaded drones
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        """
        @param drones: non-empty list of Drone objects.

        @returns: the drone with the smallest used_capacity(); the earliest
                one in the list wins ties.
        """
        drone_to_use = drones[0]
        for drone in drones[1:]:
            if drone.used_capacity() < drone_to_use.used_capacity():
                drone_to_use = drone
        return drone_to_use


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.
        @param require_ssp: Require server-side packaging to execute the,
                            command, default to False.

        @return: A drone object to be used for execution, or None if no
                 drone is usable.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones do not support server-side packaging, used as backup if no
        # drone is found to run command requires server-side packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging, choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """
        Replace, in place, every WORKING_DIRECTORY item in command with
        working_directory.
        """
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.  The list is modified in place.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on

        @returns: the PidfileId registered for the new process.

        @raises DroneManagerError: if no drone is available.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging.
            # drone may be None here; that case is reported just below.
            if drone and require_ssp and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """
        @returns: the PidfileId for pidfile_name under the absolute path of
                execution_tag.
        """
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        """Zero the age of the pidfile if it is registered."""
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        """Stop tracking the given pidfile; a no-op if not registered."""
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        """
        Set the number of processes accounted to a registered pidfile.
        """
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        Also resets the age of the pidfile.  An empty PidfileContents is
        returned for a pidfile that was not read in the last refresh.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                          'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """
        @param path: path relative to the results directory.
        @param on_results_repository: if True, resolve against the base
                results dir; otherwise against the drone results directory
                under drones.AUTOTEST_INSTALL_DIR.

        @returns: the joined path.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """
        Copy source_path to destination_path from the drone running process.

        With to_results_repository the source drone's send_file_to() is
        called with the results drone as target; otherwise a
        'copy_file_or_directory' call is queued on the source drone.
        """
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """
        Queue a 'write_to_file' call on drone for every file attached to
        results_dir, removing them from the pending set.
        """
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


SiteDroneManager = utils.import_site_class(
   __file__, 'autotest_lib.scheduler.site_drone_manager',
   'SiteDroneManager', BaseDroneManager)


class DroneManager(SiteDroneManager):
    pass


_the_instance = None

def instance():
    """
    @returns: the module-wide DroneManager, created on first use.
    """
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    global _the_instance
    _the_instance = instance