#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests.  At the same time we don't want to build up a
    backlog that will take forever to parse.  A minimal subclass
    sketch follows the class attributes below.
    """
    _num_running_processes = 0
    # Last known value of the max processes limit, used to detect whether the
    # max processes config has changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.  To prevent spam,
    # do not send another email until the process count drops below the
    # following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80
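
    # Illustrative sketch only (not used by the scheduler): a new throttled
    # post-job task overrides _max_processes(), plus the usual PostJobTask
    # hooks such as _generate_command() and _pidfile_name(), as
    # FinalReparseTask and ArchiveResultsTask do below.  The class name and
    # limit here are hypothetical.
    #
    #   class ExampleThrottledTask(SelfThrottledPostJobTask):
    #       @classmethod
    #       def _max_processes(cls):
    #           return 5  # hypothetical; real tasks read scheduler_config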

    @classmethod
    def _gauge_metrics(cls):
        """Report to monarch the number of running processes."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count drops
        # enough that we can; at that point, revert to the default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The configured max processes limit changes;
        2) _notification_on is False and the number of running processes
           drops below the level defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
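        # Worked example with hypothetical numbers: with a limit of 5
        # processes and REVIVE_NOTIFICATION_THRESHOLD = 0.80, a muted
        # notification is re-enabled once fewer than 4 processes are
        # running (3/5 = 0.6 < 0.8), or immediately if the configured
        # limit changes.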
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]
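        # For illustration only: with two hypothetical hosts host1 and host2,
        # the command above is roughly
        #   autoserv -p --pidfile-label=<label> --use-existing-results \
        #       --collect-crashinfo -m host1,host2 -r <results_dir>
        # where <label> comes from _pidfile_label(), derived from
        # drone_manager.CRASHINFO_PID_FILE.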


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '--record-duration',
                '--suite-report', '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())