#pylint: disable-msg=C0111

""" This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process; Examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    schedulers tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatchers queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent till the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when a hqe goes into the Starting state
    and the FinalReparseTask, which is created when the hqe goes into parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agents interface, as follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
-poll: The first poll will start the task, polls thereafter will call the tasks
       tick method. A started task will have the started bit set.
    - start: Call prolog, run the process and set the start bit.
        - prolog: Usually where one puts any model state changes that happen
                  before the actual task. Different per Task. Examples of
                  things that might happen in a prolog:
                    - state of Host, HQE (to something like Resetting)
                    - delete any unwanted queued special tasks
                    - register a pidfile
                    - set the is_active bit on the special task
        - run:
            - create a PidfileRunMonitor
            - pass the autoserv command, working directory etc to drone manager.
              This will start the actual autoserv process.
        - set the start bit: so subsequent polls do not 'start' again

    - tick: For as long as a started tasks done bit is not set, a poll will lead
            to a tick. The tick monitors the pid file of the autoserv process
            running on the drone through the PidfileRunMonitor created in run.
            If the autoserv process has finished we call finished with
            true/false depending on autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

            - epilog: Is generally where we set status of the Host/HQE again,
                      requeue any other task that needs to run after this one
                      and perform cleanup. Just like the prolog, this step is
                      different per task.

                      - cleanup: Sets the is_active and is_complete and success
                                 states on the tasks model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note this is not to be confused with the
                                  special task called cleanup).

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      eg: if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that change the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask
           |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class BaseAgentTask(object):
    """Base class for a unit of work the scheduler drives through an Agent.

    A task is polled repeatedly: the first poll starts it (prolog + run),
    later polls tick it until its monitor reports an exit code, at which
    point finished() records the outcome and runs epilog()/cleanup().
    """

    class _NullMonitor(object):
        """Stand-in returned by _paired_with_monitor() by default.

        It has no pidfile and always reports that a process exists, so
        _check_paired_results_exist() succeeds for unpaired tasks.
        """
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """Populate host_ids, queue_entry_ids and hostnames.

        @param host: host to use when no usable queue entries are given.
        @param queue_entries: list of queue entries; when it is non-empty and
                not [None], ids and hostnames are taken from the entries and
                their hosts. Otherwise `host` is required.
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        """Start the task if it has not started, then tick it unless done."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Finish the task once the monitor reports an exit code.

        An exit code of 0 counts as success; while the exit code is None the
        task is still running and nothing happens.
        """
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        """@returns True once the task has finished or been aborted."""
        return self.done


    def finished(self, success):
        """Record the outcome of the task and run its epilog.

        Only the first call has any effect; later calls return immediately.

        @param success: whether the task succeeded.
        """
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden. Runs before the process is started; the base
        implementation registers the pidfiles the task needs.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        """@returns path of the log file in the working directory, or None
                if no log file name was configured."""
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Ask the monitor to copy the log file to the results repository.

        Does nothing when there is no monitor or no log file configured.
        """
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden. Runs once the task has finished; the base
        implementation calls cleanup() and logs the outcome.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        """Run prolog() and run() if not yet started, then mark as started."""
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any), mark the task done and
        aborted, and run cleanup()."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Return the execution path shared by all the given entries.

        @param execution_entries: non-empty list of objects with an
                execution_path() method; all must return the same path.
        """
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the entries' execution directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: monitor to copy through; defaults to self.monitor.
                If given and it has no process, nothing is copied.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Move every given queue entry to the PARSING status."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        """Move every given queue entry to the ARCHIVING status."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses. To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        """Fail the task if the paired monitor has no process.

        @returns True if the paired process exists; otherwise bumps an error
                counter, calls finished(False) and returns False.
        """
        if not self._paired_with_monitor().has_process():
            metrics.Counter(
                'chromeos/autotest/errors/scheduler/no_paired_results'
            ).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        """Create the PidfileRunMonitor for this task; must not exist yet."""
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        """Create the monitor and use it to launch the task's command."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        """Work out which drones may run this task.

        Two filters are combined: restricted subnets (a DUT in a restricted
        subnet may only use drones in that subnet, other DUTs only use
        unrestricted drones) and drone sets (of the job, its owner, or the
        global default).

        @param restricted_subnets: restricted subnets to check the task's
                hostnames against; a false value disables subnet filtering.
        @param enable_drone_in_subnet: whether subnet filtering is enabled.

        @returns The drone hostnames allowed to run the task. An empty set
                means no drone may run it; None (only possible when drone
                sets are disabled) means every drone is allowed.
        """
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the list of hosts exists both in
                # restricted and unrestricted subnet. No drone can work in such
                # case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map=system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

            # If filtered_drones is an empty set, that means no drone is
            # allowed to run the task. This is different from None, which
            # means all drones are allowed.
            if filtered_drones == set():
                logging.error('DUT(s) is in restricted subnet, but no '
                              'drone is available to run the test.')
                return filtered_drones

        # If host is not in restricted subnet, use the unrestricted drones only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        # NOTE(review): the default-drone-set returns below do not apply
        # filtered_drones; confirm whether that is intended.
        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        # Compare against None rather than truthiness: an empty set means
        # "no drone allowed" and must not fall back to the whole drone set.
        if filtered_drones is not None:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.

        @param obj_with_owner: object the user owns; only used in the warning
                logged when there is no user.
        @param user: the owning user, or a false value if there is none.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        """Register this task's pidfile, and the paired one if there is one,
        with the drone manager."""
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Re-attach to a process that is already running for this task.

        If no such process is found the monitor is dropped again so that the
        task can be started normally later.
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """Validate the statuses of queue entries (and their hosts).

        @param queue_entries: entries to check.
        @param allowed_hqe_statuses: statuses an entry may be in.
        @param allowed_host_statuses: statuses an entry's host may be in;
                None skips the host check.

        @raises scheduler_lib.SchedulerError: on the first entry or host
                whose status is not allowed.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """@returns the 'key=value' line for one keyval."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Write one keyval line to the keyval file, paired with the
        monitored process. Does nothing if the monitor has no process."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns ('job_queued', <job creation time as an int timestamp>)."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Write the job_finished keyval using the current time."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach a keyval file with the given contents to the execution.

        @param keyval_dict: mapping of keys to values to write.
        @param keyval_path: path of the keyval file to attach.
        """
        # items() rather than iteritems() so this works on Python 2 and 3.
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.items())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Attach the given keyvals at this task's own keyval path."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Attach a host_keyvals/<hostname> file holding the host's platform
        and its URL-quoted, comma-separated labels."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [ urllib.quote(label) for label in all_labels ]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None
    _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count'
    _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut'
    _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations'


    def __init__(self, task, extra_command_args):
        """
        @param task: the SpecialTask this agent task runs.
        @param extra_command_args: extra arguments for the autoserv command
                line.
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        # This is of type SpecialTask (as defined in frontend/afe/models.py)
        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()
        self._milestone = ''


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like id,
        name and related job information. The value will be stored in metadata
        database.
        @return: A dictionary containing the task id, name and related job id.
                 If some attributes are failed to be accessed, an empty
                 dictionary will be returned, and error will be logged.
        """
        try:
            metadata = {'task_id':self.task.id, 'task_name':self.task.task,
                        'hostname':self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        """@returns path of the keyval file in the working directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        """@returns the autoserv command line for this task's host."""
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry,
                                                     in_lab=True)


    def _working_directory(self):
        """@returns the special task's execution path."""
        return self.task.execution_path()


    @property
    def owner_username(self):
        """Login of the user who requested the task, or None if unknown."""
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        """Register pidfiles, activate the special task and write the host
        keyvals."""
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail the queue entry unless it is a metahost entry or is no longer
        in the QUEUED status."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    def epilog(self):
        """Run the base epilog, then emit the special task metrics."""
        super(SpecialAgentTask, self).epilog()
        self._emit_special_task_status_metric()


    def _emit_special_task_status_metric(self):
        """Increments an accumulator associated with this special task."""
        fields = {'type': self.TASK_TYPE,
                  'success': bool(self.success),
                  'board': str(self.host.board),
                  'milestone': self._milestone}
        metrics.Counter(self._COUNT_METRIC).increment(
            fields=fields)

        # The duration is only known when both timestamps were recorded.
        if (self.task.time_finished and self.task.time_started):
            duration = (self.task.time_finished -
                        self.task.time_started).total_seconds()
            metrics.SecondsDistribution(self._DURATION_METRIC).add(
                    duration, fields=fields)

        dut_fields = {
            'type': self.TASK_TYPE,
            'success': bool(self.success),
            'board': str(self.host.board),
            'dut_host_name': self.host.hostname
        }
        metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields)

    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        """Unconditionally fail the queue entry.

        Writes the job_queued/job_finished keyvals, copies this task's
        results into the queue entry's execution directory, moves the entry
        to parsing or archiving, and finishes (as failed) every incomplete
        special task of the entry.
        """
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        """Finish the special task, copy its results and unregister its
        pidfile."""
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove a type of special task in all tasks, keep last one if needed.

        Only tasks for this host that are neither active nor complete and
        have no queue entry are deleted.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
            the same as of special_task_to_remove. In practice this excludes
            this agent task's own special task from the deletion.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        # Sort so the generated command line is deterministic.
        return ['--job-labels', ','.join(sorted(labels))]