#pylint: disable-msg=C0111

"""This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process; examples of such processes include Verify, Cleanup and the
QueueTasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an HQE changes state. Examples of these
    are the QueueTask, which is created when an HQE goes into the Starting
    state, and the FinalReparseTask, which is created when the HQE goes into
    Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows, at a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen
              before the actual task. Different per task. Examples of things
              that might happen in a prolog:
              - set the state of the Host/HQE (to something like Resetting)
              - delete any unwanted queued special tasks
              - register a pidfile
              - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pidfile of the autoserv process
        running on the drone through the PidfileRunMonitor created in prolog.
        If the autoserv process has finished, we call finished with True/False
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

            - epilog: is generally where we set the status of the Host/HQE
                      again, requeue any other task that needs to run after
                      this one and perform cleanup. Just like the prolog, this
                      step is different per task.

                      - cleanup: Sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                     unregister the pidfile
                                     copy results of the task
                                 (Note this is not to be confused with the
                                 special task called Cleanup.)

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      e.g. if reset failed we will enqueue a repair, but if
                      all is well the epilog will just return. Prejob task
                      epilogs also have an on_pending method that changes the
                      status of the HQE to Pending/Starting, which gets picked
                      up in the scheduler.

By this point the is_done flag is set, which results in the Agent noticing
that the task has finished and unregistering it from the dispatcher.
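
A rough, illustrative sketch of this flow (hypothetical pseudo-code; names are
simplified, and the real driving loop lives in the scheduler's Dispatcher and
Agent code, not in this module):

    task = SomeAgentTask()          # e.g. created for an HQE or special task
    while not task.is_done():
        task.poll()                 # first poll calls start(); later polls tick()
    # After finished() has run: task.success reflects the autoserv exit code,
    # and epilog()/cleanup() have updated the Host/HQE/SpecialTask models.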

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask
           |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
205 """ 206 assert not self.monitor 207 self.register_necessary_pidfiles() 208 209 210 def _log_file(self): 211 if not self._log_file_name: 212 return None 213 return os.path.join(self._working_directory(), self._log_file_name) 214 215 216 def cleanup(self): 217 log_file = self._log_file() 218 if self.monitor and log_file: 219 self.monitor.try_copy_to_results_repository(log_file) 220 221 222 def epilog(self): 223 """ 224 To be overridden. 225 """ 226 self.cleanup() 227 logging.info("%s finished with success=%s", type(self).__name__, 228 self.success) 229 230 231 def start(self): 232 if not self.started: 233 self.prolog() 234 self.run() 235 236 self.started = True 237 238 239 def abort(self): 240 if self.monitor: 241 self.monitor.kill() 242 self.done = True 243 self.aborted = True 244 self.cleanup() 245 246 247 def _get_consistent_execution_path(self, execution_entries): 248 first_execution_path = execution_entries[0].execution_path() 249 for execution_entry in execution_entries[1:]: 250 assert execution_entry.execution_path() == first_execution_path, ( 251 '%s (%s) != %s (%s)' % (execution_entry.execution_path(), 252 execution_entry, 253 first_execution_path, 254 execution_entries[0])) 255 return first_execution_path 256 257 258 def _copy_results(self, execution_entries, use_monitor=None): 259 """ 260 @param execution_entries: list of objects with execution_path() method 261 """ 262 if use_monitor is not None and not use_monitor.has_process(): 263 return 264 265 assert len(execution_entries) > 0 266 if use_monitor is None: 267 assert self.monitor 268 use_monitor = self.monitor 269 assert use_monitor.has_process() 270 execution_path = self._get_consistent_execution_path(execution_entries) 271 results_path = execution_path + '/' 272 use_monitor.try_copy_to_results_repository(results_path) 273 274 275 def _parse_results(self, queue_entries): 276 for queue_entry in queue_entries: 277 queue_entry.set_status(models.HostQueueEntry.Status.PARSING) 278 279 280 def _archive_results(self, queue_entries): 281 for queue_entry in queue_entries: 282 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING) 283 284 285 def _command_line(self): 286 """ 287 Return the command line to run. Must be overridden. 288 """ 289 raise NotImplementedError 290 291 292 @property 293 def num_processes(self): 294 """ 295 Return the number of processes forked by this BaseAgentTask's process. 296 It may only be approximate. To be overridden if necessary. 297 """ 298 return 1 299 300 301 def _paired_with_monitor(self): 302 """ 303 If this BaseAgentTask's process must run on the same machine as some 304 previous process, this method should be overridden to return a 305 PidfileRunMonitor for that process. 306 """ 307 return self._NullMonitor() 308 309 310 @property 311 def owner_username(self): 312 """ 313 Return login of user responsible for this task. May be None. Must be 314 overridden. 315 """ 316 raise NotImplementedError 317 318 319 def _working_directory(self): 320 """ 321 Return the directory where this BaseAgentTask's process executes. 322 Must be overridden. 323 """ 324 raise NotImplementedError 325 326 327 def _pidfile_name(self): 328 """ 329 Return the name of the pidfile this BaseAgentTask's process uses. To be 330 overridden if necessary. 
331 """ 332 return drone_manager.AUTOSERV_PID_FILE 333 334 335 def _check_paired_results_exist(self): 336 if not self._paired_with_monitor().has_process(): 337 metadata = { 338 '_type': 'scheduler_error', 339 'error': 'No paired results in task', 340 'task': str(self), 341 'pidfile_id': str(self._paired_with_monitor().pidfile_id)} 342 autotest_stats.Counter('no_paired_results_in_task', 343 metadata=metadata).increment() 344 self.finished(False) 345 return False 346 return True 347 348 349 def _create_monitor(self): 350 assert not self.monitor 351 self.monitor = pidfile_monitor.PidfileRunMonitor() 352 353 354 def run(self): 355 if not self._check_paired_results_exist(): 356 return 357 358 self._create_monitor() 359 self.monitor.run( 360 self._command_line(), self._working_directory(), 361 num_processes=self.num_processes, 362 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(), 363 pidfile_name=self._pidfile_name(), 364 paired_with_pidfile=self._paired_with_monitor().pidfile_id, 365 username=self.owner_username, 366 drone_hostnames_allowed=self.get_drone_hostnames_allowed()) 367 368 369 def get_drone_hostnames_allowed( 370 self, restricted_subnets=utils.RESTRICTED_SUBNETS, 371 enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET): 372 filtered_drones = None 373 has_unrestricted_host = False 374 if (self.hostnames and restricted_subnets and enable_drone_in_subnet): 375 for hostname in self.hostnames.values(): 376 subnet = utils.get_restricted_subnet(hostname, 377 restricted_subnets) 378 379 # Return an empty set if the list of hosts exists both in 380 # restricted and unrestricted subnet. No drone can work in such 381 # case. 382 if ((not subnet and filtered_drones is not None) or 383 (subnet and has_unrestricted_host)): 384 logging.error('The test has some DUT in restricted subnet, ' 385 'but some in unrestricted subnet. Therefore, ' 386 'no drone is available to run the test.') 387 return set() 388 389 if not subnet: 390 has_unrestricted_host = True 391 continue 392 393 server_ip_map=system_utils.DroneCache.get_drone_ip_map() 394 filtered_drones_for_host = set( 395 utils.get_servers_in_same_subnet( 396 subnet[0], subnet[1], 397 server_ip_map=server_ip_map)) 398 logging.info('DUT %s is in restricted subnet, drone can only ' 399 'be chosen from %s', hostname, 400 filtered_drones_for_host) 401 if filtered_drones is None: 402 filtered_drones = filtered_drones_for_host 403 else: 404 filtered_drones = set.intersection( 405 filtered_drones, filtered_drones_for_host) 406 407 # If filtered_drones is an empty set, that means no drone is 408 # allowed to run the task. This is different fron None, which 409 # means all drones are allowed. 410 if filtered_drones == set(): 411 logging.error('DUT(s) is in restricted subnet, but no ' 412 'drone is available to run the test.') 413 return filtered_drones 414 415 # If host is not in restricted subnet, use the unrestricted drones only. 
    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the hosts are split between
                # restricted and unrestricted subnets. No drone can work in
                # such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUTs in a restricted '
                                  'subnet, but others in an unrestricted '
                                  'subnet. Therefore, no drone is available '
                                  'to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in a restricted subnet, drone can '
                             'only be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) are in a restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If no host is in a restricted subnet, use the unrestricted drones
        # only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass

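
# A minimal, hypothetical sketch of a concrete task, shown here only to
# document the override points that BaseAgentTask.run() relies on
# (_command_line, _working_directory and owner_username). It is not used by
# the scheduler; real tasks such as QueueTask and the SpecialAgentTasks below
# derive these values from their database models rather than hard-coding them.
class _ExampleAgentTask(AgentTask):
    """Illustrative only: runs a fixed command in an assumed results dir."""

    def _command_line(self):
        # Any command the drone manager can execute; /bin/true is a stand-in.
        return ['/bin/true']

    def _working_directory(self):
        # Assumed example path; real tasks compute this from the HQE or
        # SpecialTask execution path.
        return 'hosts/example-host/1-example'

    @property
    def owner_username(self):
        return None
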
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [ urllib.quote(label) for label in all_labels ]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)

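
# For orientation (the values below are made up): the keyval files written by
# TaskWithJobKeyvals are plain text, one 'key=value' pair per line, e.g.
#
#     job_queued=1437171337
#     job_finished=1437171467
#
# and the per-host keyval written by _write_host_keyvals() looks like
#
#     platform=link
#     labels=board%3Alink,pool%3Abvt
#
# (labels are URL-quoted, hence '%3A' for ':').
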
609 """ 610 try: 611 metadata = {'task_id':self.task.id, 'task_name':self.task.task, 612 'hostname':self.task.host.hostname} 613 if self.task.queue_entry: 614 job = self.task.queue_entry.job 615 metadata.update( 616 scheduler_models.get_job_metadata(job)) 617 return metadata 618 except AttributeError as e: 619 logging.error('Task has missing attribute: %s', e) 620 return {} 621 622 623 def _keyval_path(self): 624 return os.path.join(self._working_directory(), self._KEYVAL_FILE) 625 626 627 def _command_line(self): 628 return autoserv_utils._autoserv_command_line(self.host.hostname, 629 self._extra_command_args, 630 queue_entry=self.queue_entry, 631 in_lab=True) 632 633 634 def _working_directory(self): 635 return self.task.execution_path() 636 637 638 @property 639 def owner_username(self): 640 if self.task.requested_by: 641 return self.task.requested_by.login 642 return None 643 644 645 def prolog(self): 646 super(SpecialAgentTask, self).prolog() 647 self.task.activate() 648 self._write_host_keyvals(self.host) 649 650 651 def _fail_queue_entry(self): 652 assert self.queue_entry 653 654 if self.queue_entry.meta_host: 655 return # don't fail metahost entries, they'll be reassigned 656 657 self.queue_entry.update_from_database() 658 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED: 659 return # entry has been aborted 660 661 self._actually_fail_queue_entry() 662 663 664 # TODO(milleral): http://crbug.com/268607 665 # All this used to be a part of _fail_queue_entry. The 666 # exact semantics of when one should and should not be failing a queue 667 # entry need to be worked out, because provisioning has placed us in a 668 # case where we want to fail a queue entry that could be requeued, 669 # which makes us fail the two above if statements, and thus 670 # _fail_queue_entry() would exit early and have no effect. 671 # What's left here with _actually_fail_queue_entry is a hack to be able to 672 # bypass the checks and unconditionally execute the code. 
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove all queued special tasks of a given type, optionally keeping
        the last one.

        @param special_task_to_remove: type of special task to be removed, e.g.,
                                       models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
                              the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
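
    # For illustration (hypothetical labels): a job labelled 'board:lumpy' and
    # 'pool:bvt' would make _generate_autoserv_label_args() return
    # ['--job-labels', 'board:lumpy,pool:bvt']. Note that set iteration order,
    # and thus the order of the joined labels, is not guaranteed.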