1#pylint: disable-msg=C0111
2
3""" This is the module for everything related to the AgentTask.
4
The BaseAgentTask imposes an interface through which the scheduler can monitor
a process. Examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
8Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.
14
15    At a high level:
16        agents finished = tasks done
17        agent polls till finished
18            task polls till done
19                task sets done
20        agent is removed from dispatcher
21AgentTasks:
22    Basic AgentTasks are created when an hqe changes state. Examples of these
23    are the QueueTask, which is created when a hqe goes into the Starting state
24    and the FinalReparseTask, which is created when the hqe goes into parsing.
25SpecialAgentTasks:
26    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
27    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.
28
Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
32At a high level:
33    task poll
34        start
35            prolog
36        tick till we get an exit code
37        finished(exit==0)
38            done=True
39            epilog
40                cleanup
41                    set is_active, is_complete, success (checked in scheduler)
42
43The first special task for an HQE is usually Reset.
- poll: The first poll will start the task, polls thereafter will call the
        task's tick method. A started task will have the started bit set.
46- start: Call prolog, run the process and set the start bit.
47    - prolog: Usually where one puts any model state changes that happen before
48              the actual task. Different per Task. Examples of things that might
49              happen in a prolog:
50                  - state of Host, HQE (to something like Resetting)
51                  - delete any unwanted queued special tasks
52                  - register a pidfile
53                  - set the is_active bit on the special task
54    - run:
55        - create a PidfileRunMonitor
56        - pass the autoserv command, working directory etc to drone manager.
57          This will start the actual autoserv process.
58   - set the start bit: so subsequent polls do not 'start' again
59
- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished we call finished with true/false
        depending on autoserv exit code.
65
66        - finished: sets the done and success values, then calls epilog. The
67                    done bit is important because the Agent polls this bit to
68                    measure the success or failure of its task.
69
70            - epilog: Is generally where we set status of the Host/HQE again,
71                      requeue any other task that needs to run after this one
72                      and perform cleanup. Just like the prolog, this step is
73                      different per task.
74
75                      - cleanup: Sets the is_active and is_complete and success
76                                 states on the tasks model. Also uses the
77                                 drone_manager to:
78                                    unregister the pidfile
79                                    copy results of the task
80                                 (Note this is not to be confused with the
81                                  special task called cleanup).
82
83                      The actions we take in the epilog are based on the
84                      success/failure of the autoserv process set in cleanup,
85                      eg: if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
92BaseAgentTask
93 |--->SpecialAgentTask (prejob_task.py)
94      |--->RepairTask
95      |--->PreJobTask
96           |--->Verify, Cleanup, Reset, Provision
97
98 |--->AbstractQueueTask (monitor_db.py)
99      |--->QueueTask
100      |--->HostlessQueueTask
101
102 |--->PostJobTask (postjob_task.py)
103      |--->GatherLogsTask
104      |--->SelfThrottledPostJobTask
105            |--->FinalReparseTask
106            |--->ArchiveResultsTask
107
108"""
109
110import logging
111import os
112import urllib
113import time
114
115from autotest_lib.client.common_lib import global_config
116from autotest_lib.client.common_lib import utils
117from autotest_lib.frontend.afe import models
118from autotest_lib.scheduler import drone_manager, pidfile_monitor
119from autotest_lib.scheduler import scheduler_lib
120from autotest_lib.scheduler import rdb_lib
121from autotest_lib.scheduler import scheduler_models
122from autotest_lib.server import autoserv_utils
123from autotest_lib.server import system_utils
124
125try:
126    from chromite.lib import metrics
127except ImportError:
128    metrics = utils.metrics_mock
129
130
# Shared global_config object used for the configuration lookups below.
CONFIG = global_config.global_config
# Passed as nice_level to the monitor's run() call in BaseAgentTask.run().
AUTOSERV_NICE_LEVEL = 10

# Boolean read from the 'CROS' config section (False when unset). It is the
# default for the enable_drone_in_subnet argument of
# BaseAgentTask.get_drone_hostnames_allowed().
ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)
137
138
class BaseAgentTask(object):
    """Base class for the tasks an Agent polls on behalf of the scheduler.

    A task is driven through poll(): the first poll calls start() (prolog()
    followed by run()), later polls call tick() until the monitored process
    reports an exit code, at which point finished() records the outcome and
    calls epilog(), which in turn calls cleanup().

    Subclasses must override _command_line(), _working_directory() and
    owner_username. prolog(), epilog(), num_processes, _pidfile_name() and
    _paired_with_monitor() may be overridden when needed.
    """

    class _NullMonitor(object):
        """Placeholder returned by _paired_with_monitor() for unpaired tasks.

        It has no pidfile and always reports a process, so
        _check_paired_results_exist() passes and no paired pidfile is
        registered.
        """
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        # Lifecycle flags: started is set by start()/recover()/finished(),
        # done by finished()/abort() and aborted by abort().
        self.done = False
        self.started = False
        # None until finished() records the outcome.
        self.success = None
        self.aborted = False
        # Monitor of the task's process; created by run() or recover().
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue entry ids and hostnames this task works on.

        @param host: host object with id and hostname attributes. Only used
                (and then required) when no usable queue_entries are given.
        @param queue_entries: list of queue entries, each with an id and a
                host. An empty list, None or [None] means "no queue entries".
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = {entry.host.id: entry.host.hostname
                              for entry in queue_entries}
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        """Advance the task: start it on the first call, tick it afterwards.

        start() may already finish the task (see run()), in which case no
        tick happens.
        """
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitor for an exit code and finish the task if present.

        An exit code of 0 counts as success; None means the process has not
        reported an exit code yet and nothing happens.
        """
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        """@returns: True once finished() or abort() has marked the task done.
        """
        return self.done


    def finished(self, success):
        """Record the task's outcome and run epilog(); later calls are no-ops.

        @param success: whether the task succeeded.
        """
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        Run before the task's process is started.  To be overridden.

        The base implementation registers the pidfiles this task needs.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        """
        @returns: path of the log file inside the working directory, or None
                when no log file name was given at construction.
        """
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Ask the monitor to copy the log file to the results repository.

        Does nothing when there is no monitor or no log file configured.
        """
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        Run after the task has finished.  To be overridden.

        The base implementation calls cleanup() and logs the outcome.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        """Run prolog() and run() unless already started, then mark started.
        """
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any), mark the task done and aborted
        and run cleanup().  epilog() is not called.
        """
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Return the execution path shared by all the given entries.

        @param execution_entries: non-empty list of objects with an
                execution_path() method.
        @returns: the execution path of the first entry; asserts that every
                other entry reports the same path.
        """
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Ask a monitor to copy the entries' results directory to the results
        repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: monitor to copy through; self.monitor is used when
                omitted.  If a monitor is given explicitly and it has no
                process, nothing is copied.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Set every given queue entry's status to PARSING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        """Set every given queue entry's status to ARCHIVING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        """Fail the task if the paired monitor has no process.

        @returns: True when the paired monitor has a process.  Otherwise an
                error metric is incremented, the task is finished as failed
                and False is returned.
        """
        if not self._paired_with_monitor().has_process():
            metrics.Counter(
                'chromeos/autotest/errors/scheduler/no_paired_results'
            ).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        """Create self.monitor; asserts that no monitor exists yet."""
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        """Create the monitor and start the task's process through it.

        Returns without starting anything if the paired results check fails
        (the task has then already been finished as failed).
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        """Work out which drones may run this task.

        Two filters are combined: the restricted-subnet filter (drones that
        can serve all of the task's hosts) and, when drone sets are enabled,
        the drone set of the job or of its owner.

        @param restricted_subnets: restricted subnets, as accepted by
                utils.get_restricted_subnet().
        @param enable_drone_in_subnet: whether restricted-subnet filtering is
                applied at all.
        @returns: None if every drone is allowed, an empty set if no drone is
                allowed, otherwise the allowed drone hostnames.
        """
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the list of hosts exists both in
                # restricted and unrestricted subnet. No drone can work in such
                # case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If host is not in restricted subnet, use the unrestricted drones only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            # NOTE(review): the subnet filter is not applied on this path nor
            # on the "job has no drone set" path below - confirm intended.
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        # Compare against None rather than truthiness: an empty set means
        # "no drone allowed" and must not fall back to the whole drone set,
        # matching what is returned above when drone sets are disabled.
        if filtered_drones is not None:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.

        @param obj_with_owner: object the user was looked up from; only used
                in the log message when there is no user.
        @param user: user object with drone_set and login attributes, or a
                false value when there is no owner.
        @returns: the drone hostnames of the chosen drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        """Register this task's pidfile, and the paired monitor's pidfile if
        it has one, with the drone manager.
        """
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Try to attach to an already running process for this task.

        If a process is found the task is marked started; otherwise the
        monitor is dropped again so the task can be started normally.
        Returns early if the paired results check fails.
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """Validate the statuses of queue entries (and their hosts).

        @param queue_entries: queue entries to check.
        @param allowed_hqe_statuses: statuses a queue entry may be in.
        @param allowed_host_statuses: (optional) statuses an entry's host may
                be in; host statuses are not checked when None.
        @raises scheduler_lib.SchedulerError: on the first entry whose status,
                or whose host status, is not allowed.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))
512
513
# Resolve the 'SiteAgentTask' class from
# autotest_lib.scheduler.site_monitor_db, passing BaseAgentTask as the base
# class (presumably the fallback when no site class exists - see
# utils.import_site_class for the exact semantics).
SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    """Task base class layered on SiteAgentTask; adds no behavior itself."""
    pass
520
521
class TaskWithJobKeyvals(object):
    """Mixin for AgentTasks that write job keyval files.

    The class it is mixed into is expected to supply self.monitor,
    self._drone_manager and self._working_directory(); subclasses must also
    implement _keyval_path().
    """
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """@returns: a single 'key=value' line, without a trailing newline."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of the keyval file.  Subclasses must override this."""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Write one keyval line to the keyval file via the drone manager.

        Nothing is written when the monitor has no process.

        @param field: keyval name.
        @param value: keyval value.
        """
        assert self.monitor
        if self.monitor.has_process():
            path = self._keyval_path()
            lines = [self._format_keyval(field, value)]
            self._drone_manager.write_lines_to_file(
                path, lines,
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """
        @param job: object whose created_on attribute has a timetuple() method.
        @returns: ('job_queued', <created_on as integer seconds via mktime>).
        """
        queued_seconds = int(time.mktime(job.created_on.timetuple()))
        return 'job_queued', queued_seconds


    def _write_job_finished(self):
        """Write a 'job_finished' keyval holding the current integer time."""
        finished_seconds = int(time.time())
        self._write_keyval_after_job("job_finished", finished_seconds)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach a keyval file built from keyval_dict to the execution.

        @param keyval_dict: mapping of keyval names to values.
        @param keyval_path: path of the file to attach.
        """
        formatted = [self._format_keyval(name, val)
                     for name, val in keyval_dict.iteritems()]
        # always end with a newline to allow additional keyvals to be written
        keyval_contents = '\n'.join(formatted) + '\n'
        self._drone_manager.attach_file_to_execution(
                self._working_directory(), keyval_contents,
                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Attach keyval_dict as the file at this task's _keyval_path()."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Attach a host_keyvals/<hostname> file with platform and labels.

        @param host: object with a hostname attribute and a
                platform_and_labels() method returning (platform, labels).
        """
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        # Labels are URL-quoted before being joined with commas.
        quoted_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(quoted_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
572
573
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    AgentTask that is backed by a SpecialTask entry in the DB.

    Concrete subclasses must set TASK_TYPE.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None
    _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count'
    _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut'
    _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations'


    def __init__(self, task, extra_command_args):
        """
        @param task: the SpecialTask (frontend/afe/models.py) this agent task
                runs.
        @param extra_command_args: extra arguments handed to the autoserv
                command line builder in _command_line().
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        task_queue_entry = task.queue_entry
        if task_queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task_queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        # This is of type SpecialTask (as defined in frontend/afe/models.py)
        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()
        self._milestone = ''


    def get_metadata(self):
        """Build a dictionary describing this task.

        The dictionary holds the task id, task name and hostname, plus the
        job metadata from scheduler_models.get_job_metadata() when the task
        has a queue entry.  The caller stores it as the host's metadata.

        @return: the metadata dictionary, or an empty dictionary (after
                 logging an error) if an attribute could not be accessed.
        """
        try:
            special_task = self.task
            metadata = {'task_id': special_task.id,
                        'task_name': special_task.task,
                        'hostname': special_task.host.hostname}
            if special_task.queue_entry:
                job_metadata = scheduler_models.get_job_metadata(
                        special_task.queue_entry.job)
                metadata.update(job_metadata)
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        """@returns: path of the keyval file in the task's working directory.
        """
        working_dir = self._working_directory()
        return os.path.join(working_dir, self._KEYVAL_FILE)


    def _command_line(self):
        """@returns: the autoserv command line for this task's host."""
        return autoserv_utils._autoserv_command_line(
                self.host.hostname, self._extra_command_args,
                queue_entry=self.queue_entry, in_lab=True)


    def _working_directory(self):
        """@returns: the special task's execution path."""
        return self.task.execution_path()


    @property
    def owner_username(self):
        """Login of the user who requested the task, or None if there is none.
        """
        requester = self.task.requested_by
        return requester.login if requester else None


    def prolog(self):
        """Run the base prolog, activate the task and write host keyvals."""
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail the queue entry unless it is a metahost entry or is no longer
        in the QUEUED status after being refreshed from the database.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        still_queued = (self.queue_entry.status ==
                        models.HostQueueEntry.Status.QUEUED)
        if not still_queued:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    def epilog(self):
        """Run the base epilog, then emit the task status metrics."""
        super(SpecialAgentTask, self).epilog()
        self._emit_special_task_status_metric()


    def _emit_special_task_status_metric(self):
        """Emit count, duration and per-DUT metrics for this special task.

        The duration is only reported when the task has both a start and a
        finish time.
        """
        succeeded = bool(self.success)
        board = str(self.host.board)

        fields = {'type': self.TASK_TYPE,
                  'success': succeeded,
                  'board': board,
                  'milestone': self._milestone}
        metrics.Counter(self._COUNT_METRIC).increment(fields=fields)

        if self.task.time_finished and self.task.time_started:
            elapsed = self.task.time_finished - self.task.time_started
            metrics.SecondsDistribution(self._DURATION_METRIC).add(
                elapsed.total_seconds(), fields=fields)

        dut_fields = {'type': self.TASK_TYPE,
                      'success': succeeded,
                      'board': board,
                      'dut_host_name': self.host.hostname}
        metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields)

    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        """Unconditionally fail this task's queue entry.

        Writes the job_queued and job_finished keyvals, copies the task's
        results into the queue entry's execution path, registers the
        autoserv pidfile there, moves the entry to parsing or archiving and
        finishes every incomplete special task of the entry as failed.
        """
        entry = self.queue_entry
        entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if entry.job.parse_failed_repair:
            self._parse_results([entry])
        else:
            self._archive_results([entry])

        # Also fail all other special tasks that have not yet run for this HQE
        incomplete_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=entry.id,
                is_complete=0)
        for incomplete_task in incomplete_tasks:
            incomplete_task.finish(False)


    def cleanup(self):
        """Run the base cleanup, finish the task row, then copy results and
        unregister the pidfile through the monitor when it has them.
        """
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        monitor = self.monitor
        if not monitor:
            return
        if monitor.has_process():
            self._copy_results([self.task])
        if monitor.pidfile_id is not None:
            self._drone_manager.unregister_pidfile(monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Delete this host's queued special tasks of the given type.

        Only tasks that are neither active nor complete and that have no
        queue entry are deleted.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to leave this agent task's own special task
            out of the deletion.

        """
        removable_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            removable_tasks = removable_tasks.exclude(id=self.task.id)
        removable_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        label_names = set(label.name for label in task.queue_entry.job.labels)
        return ['--job-labels', ','.join(label_names)]
778