import heapq
import os
import logging

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import drone_utility
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import thread_lib

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# Name of the directory, beneath each drone's installation directory, that
# holds results on that drone.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Sentinel that may appear in a command argv; execute_command() replaces
# every occurrence with the absolute working directory.
WORKING_DIRECTORY = object()


# Known pidfile names; ALL_PIDFILE_NAMES collects all of them.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
    AUTOSERV_PID_FILE,
    CRASHINFO_PID_FILE,
    PARSER_PID_FILE,
    ARCHIVER_PID_FILE,
)

# Read once at import time from the scheduler config section; selects the
# threaded task queue implementation for drone work.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
    scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
    type=bool, default=True)


class DroneManagerError(Exception):
    """Raised when the drone manager cannot satisfy a request."""


class CustomEquals(object):
    """Mixin providing equality and hashing based on a subclass key.

    Subclasses implement _id() to return a hashable key. Two objects compare
    equal only when they are instances of the same type (as seen from the
    left operand) and their keys are equal.
    """

    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        return hash(self._id())


class Process(CustomEquals):
    """A process on a given host.

    Identity is (hostname, pid); the parent pid is informational only.
    """

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        base_repr = super(Process, self).__repr__()
        return base_repr + '<%s>' % self
class PidfileId(CustomEquals):
    """Identifies a pidfile by its path; equality and hashing use the path."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return str(self.path)


class _PidfileInfo(object):
    """Bookkeeping kept for every registered pidfile."""
    # Number of refreshes since the pidfile was registered or last read;
    # see _drop_old_pidfiles() and _reset_pidfile_age().
    age = 0
    # Processes accounted to this execution (set by execute_command() or
    # declare_process_count()); None when never declared.
    num_processes = None


class PidfileContents(object):
    """Parsed contents of a pidfile.

    Attributes left as None were not present in the pidfile.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """Return True if a process was recorded and has no exit status yet.

        An exit status of 0 means the process finished, so the check is
        against None rather than truthiness (matching how
        BaseDroneManager._compute_active_processes decides a pidfile's
        process is still running).
        """
        return self.process is not None and self.exit_status is None


class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed."""
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        # Human-readable description of why the pidfile is invalid.
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error


class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


    def __lt__(self, other):
        # heapq orders with '<'; defining __lt__ explicitly keeps the heap
        # ordering working without relying solely on __cmp__.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()


class BaseDroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """


    # Minimum time to wait before next email
    # about a drone hitting process limit is sent.
    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
    _STATS_KEY = 'drone_manager'

    _ACTIVE_PROCESS_GAUGE = metrics.Gauge(
            'chromeos/autotest/drone/active_processes')


    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps (hostname, pid) to process info for all processes running on
        # all drones
        self._all_processes = {}
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles, but read after the processes were checked
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []
        # Record of process-limit notifications; cleared (again) by
        # refresh_drone_configs(). Initialized here so the attribute always
        # exists.
        self._notify_record = {}
        # A threaded task queue used to refresh drones asynchronously.
        if _THREADED_DRONE_MANAGER:
            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
                    name='%s.refresh_queue' % self._STATS_KEY)
        else:
            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """Set up drones and the results repository drone.

        @param base_results_dir: absolute path of the base results directory.
        @param drone_hostnames: hostnames of the drones to add.
        @param results_repository_hostname: hostname of the results drone.

        @raises DroneManagerError: if no drone could be added.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' on every drone, timing each call."""
        for drone in self.get_drones():
            with metrics.SecondsTimer(
                    'chromeos/autotest/drone_manager/'
                    'reinitialize_drones_duration',
                    fields={'drone': drone.hostname}):
                drone.call('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """Create the drone for |hostname| and, if one is returned, register
        and initialize it."""
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            self._drones[drone.hostname] = drone
            drone.call('initialize', self.absolute_path(''))


    def _remove_drone(self, hostname):
        """Forget the drone for |hostname|; no-op if it is unknown."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        # Import server_manager_utils is delayed rather than at the beginning of
        # this module. The reason is that test_that imports drone_manager when
        # importing autoserv_utils. The import is done before test_that setup
        # django (test_that only setup django in setup_local_afe, since it's
        # not needed when test_that runs the test in a lab duts through :lab:
        # option. Therefore, if server_manager_utils is imported at the
        # beginning of this module, test_that will fail since django is not
        # setup yet.
        from autotest_lib.site_utils import server_manager_utils
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            if server_manager_utils.use_server_db():
                server = server_manager_utils.get_servers(hostname=hostname)[0]
                attributes = dict((a.attribute, a.value)
                                  for a in server.attributes.all())
                drone.enabled = (
                        int(attributes.get('disabled', 0)) == 0)
                drone.max_processes = int(
                        attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
                allowed_users = attributes.get('users', None)
            else:
                disabled = config.get_config_value(
                        section, '%s_disabled' % hostname, default='')
                drone.enabled = not bool(disabled)
                drone.max_processes = config.get_config_value(
                        section, '%s_max_processes' % hostname, type=int,
                        default=scheduler_config.config.max_processes_per_drone)

                allowed_users = config.get_config_value(
                        section, '%s_users' % hostname, default=None)
            if allowed_users:
                drone.allowed_users = set(allowed_users.split())
            else:
                drone.allowed_users = None
            logging.info('Drone %s.max_processes: %s', hostname,
                         drone.max_processes)
            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
            logging.info('Drone %s.allowed_users: %s', hostname,
                         drone.allowed_users)
            logging.info('Drone %s.support_ssp: %s', hostname,
                         drone.support_ssp)

        self._reorder_drone_queue() # max_processes may have changed
        # Clear notification record about reaching max_processes limit.
        self._notify_record = {}


    def get_drones(self):
        """Return an iterator over all registered drones."""
        return self._drones.itervalues()


    def cleanup_orphaned_containers(self):
        """Queue cleanup_orphaned_containers call at each drone.
        """
        for drone in self._drones.values():
            logging.info('Queue cleanup_orphaned_containers at %s',
                         drone.hostname)
            drone.queue_call('cleanup_orphaned_containers')


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """Age every registered pidfile, unregistering ones that exceeded the
        configured maximum number of refreshes."""
        # Read the limit once; it does not change within this loop.
        max_refreshes = self._get_max_pidfile_refreshes()
        # iterate over a copy since the dict is modified in unregister_pidfile()
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Clear per-refresh state (process and pidfile snapshots, queue)."""
        self._process_set = set()
        self._all_processes = {}
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _parse_pidfile(self, drone, raw_contents):
        """Parse raw pidfile contents.

        @param drone: The drone on which this pidfile was found.
        @param raw_contents: The raw contents of a pidfile, eg:
                "pid\nexit_status\nnum_tests_failed\n".

        @returns: A PidfileContents (empty when raw_contents is empty), or an
                InvalidPidfile when the contents cannot be parsed.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse each raw pidfile from |drone| into |store_in_dict|, keyed
        by PidfileId."""
        for pidfile_path, raw_contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone,
                                                            raw_contents)


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """Recount |drone|.active_processes from running pidfiles on it and
        publish the count to the active-process gauge."""
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if not (is_running and on_this_drone):
                continue
            # The pidfile may have been unregistered after the refresh was
            # queued; skip it instead of raising KeyError.
            info = self._registered_pidfile_info.get(pidfile_id)
            if info is not None and info.num_processes is not None:
                drone.active_processes += info.num_processes
        self._ACTIVE_PROCESS_GAUGE.set(
                drone.active_processes,
                fields={'drone_hostname': drone.hostname})


    def _check_drone_process_limit(self, drone):
        """
        Report the fraction of |drone|'s process limit in use to a metric.

        @param drone: A Drone object.
        """
        try:
            percent = float(drone.active_processes) / drone.max_processes
        except ZeroDivisionError:
            # A drone with max_processes == 0 is reported as fully saturated.
            percent = 100
        metrics.Float('chromeos/autotest/drone/active_process_percentage'
                      ).set(percent, fields={'drone_hostname': drone.hostname})


    def trigger_refresh(self):
        """Triggers a drone manager refresh.

        @raises DroneManagerError: If a drone has un-executed calls.
            Since they will get clobbered when we queue refresh calls.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        # Named drone_list to avoid shadowing the imported 'drones' module.
        drone_list = list(self.get_drones())
        for drone in drone_list:
            calls = drone.get_calls()
            if calls:
                raise DroneManagerError('Drone %s has un-executed calls: %s '
                                        'which might get corrupted through '
                                        'this invocation' %
                                        (drone, [str(call) for call in calls]))
            drone.queue_call('refresh', pidfile_paths)
        logging.info("Invoking drone refresh.")
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
            self._refresh_task_queue.execute(drone_list, wait=False)


    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/sync_refresh_duration'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will contain at most 3
        # lines of the following: pid, exit code, number of tests. Each pidfile
        # is identified by a PidfileId object, which contains a unique pidfile
        # path (unique because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone managers _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone_managers._registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            results = results_list[0]

            for process_info in results['all_processes']:
                if process_info['comm'] == 'autoserv':
                    self._add_autoserv_process(drone, process_info)
                drone_pid = drone.hostname, int(process_info['pid'])
                self._all_processes[drone_pid] = process_info

            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)
            self._check_drone_process_limit(drone)


    def refresh(self):
        """Refresh all drones."""
        with metrics.SecondsTimer(
                'chromeos/autotest/drone_manager/refresh_duration'):
            self.trigger_refresh()
            self.sync_refresh()


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        # Invoke calls queued on all drones since the last call to execute
        # and wait for them to return.
        if _THREADED_DRONE_MANAGER:
            thread_lib.ThreadedTaskQueue(
                    name='%s.execute_queue' % self._STATS_KEY).execute(
                            self._drones.values())
        else:
            drone_task_queue.DroneTaskQueue().execute(self._drones.values())

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            m = 'chromeos/autotest/errors/results_repository_failed'
            metrics.Counter(m).increment(
                    fields={'drone_hostname': self._results_drone.hostname})
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        return {process for process in self._process_set
                if process.ppid == 1}


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        """Return the sum of active_processes over all drones."""
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                          wrapper.drone.hostname in
                                                drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        """Return the first drone with the lowest used_capacity()."""
        return min(drones, key=lambda drone: drone.used_capacity())


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed,
                                    require_ssp=False):
        """Choose a drone to execute command.

        @param num_processes: Number of processes needed for execution.
        @param username: Name of the user to execute the command.
        @param drone_hostnames_allowed: A list of names of drone allowed.
        @param require_ssp: Require server-side packaging to execute the,
                            command, default to False.

        @return: A drone object to be used for execution, or None if no
                 drone is usable.
        """
        # cycle through drones is order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        usable_drones = []
        # Drones do not support server-side packaging, used as backup if no
        # drone is found to run command requires server-side packaging.
        no_ssp_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue
            if require_ssp and not drone.support_ssp:
                logging.debug('Drone %s does not support server-side '
                              'packaging.', drone.hostname)
                no_ssp_drones.append(drone)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            # Drones are all over loaded, pick the one with least load.
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)
        elif not drone_to_use and require_ssp and no_ssp_drones:
            # No drone supports server-side packaging, choose the least loaded.
            drone_to_use = self._least_loaded_drone(no_ssp_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """Replace, in place, every WORKING_DIRECTORY item of |command|."""
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on

        @returns: the PidfileId registered for the new process.
        @raises DroneManagerError: if no drone is available.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            require_ssp = '--require-ssp' in command
            drone = self._choose_drone_for_execution(
                    num_processes, username, drone_hostnames_allowed,
                    require_ssp=require_ssp)
            # Enable --warn-no-ssp option for autoserv to log a warning and run
            # the command without using server-side packaging. The drone may
            # be None here; that case is reported below.
            if drone and require_ssp and not drone.support_ssp:
                command.append('--warn-no-ssp')

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info("command = %s", command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """Return the PidfileId for |pidfile_name| under |execution_tag|."""
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        """Stop monitoring |pidfile_id|; no-op if it is not registered."""
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        """Set the process count for a registered pidfile."""
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        if process in self._process_set:
            return True

        drone_pid = process.hostname, process.pid
        if drone_pid in self._all_processes:
            logging.error('Process %s found, but not an autoserv process. '
                    'Is %s', process, self._all_processes[drone_pid])
            return True

        return False


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """Return |path| joined onto the results repository directory, or
        onto the drone results directory by default."""
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
                      'destination_path: %s, to_results_repository: %s',
                      process, source_path, destination_path,
                      to_results_repository)
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """Queue writes on |drone| for files attached to |results_dir|,
        removing them from the pending set."""
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)


SiteDroneManager = utils.import_site_class(
   __file__, 'autotest_lib.scheduler.site_drone_manager',
   'SiteDroneManager', BaseDroneManager)


class DroneManager(SiteDroneManager):
    pass


_the_instance = None

def instance():
    """Return the process-wide DroneManager, creating it on first use."""
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """Replace the process-wide DroneManager instance."""
    global _the_instance
    _the_instance = instance