1#pylint: disable-msg=C0111
2
3"""
4Postjob task.
5
6Postjob tasks are responsible for setting the final status of the HQE
7and Host, and scheduling additional special agents such as cleanup,
8if necessary.
9"""
10
11import os
12
13from autotest_lib.client.common_lib import utils
14from autotest_lib.frontend.afe import models, model_attributes
15from autotest_lib.scheduler import agent_task, drones, drone_manager
16from autotest_lib.scheduler import email_manager, pidfile_monitor
17from autotest_lib.scheduler import scheduler_config
18from autotest_lib.server import autoserv_utils
19
20try:
21    from chromite.lib import metrics
22except ImportError:
23    metrics = utils.metrics_mock
24
25
26_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
27
28
29class PostJobTask(agent_task.AgentTask):
30    def __init__(self, queue_entries, log_file_name):
31        super(PostJobTask, self).__init__(log_file_name=log_file_name)
32
33        self.queue_entries = queue_entries
34
35        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
36        self._autoserv_monitor.attach_to_existing_process(
37                self._working_directory())
38
39
40    def _command_line(self):
41        # Do we need testing_mode?
42        return self._generate_command(
43                self._drone_manager.absolute_path(self._working_directory()))
44
45
46    def _generate_command(self, results_dir):
47        raise NotImplementedError('Subclasses must override this')
48
49
50    @property
51    def owner_username(self):
52        return self.queue_entries[0].job.owner
53
54
55    def _working_directory(self):
56        return self._get_consistent_execution_path(self.queue_entries)
57
58
59    def _paired_with_monitor(self):
60        return self._autoserv_monitor
61
62
63    def _job_was_aborted(self):
64        was_aborted = None
65        for queue_entry in self.queue_entries:
66            queue_entry.update_from_database()
67            if was_aborted is None: # first queue entry
68                was_aborted = bool(queue_entry.aborted)
69            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
70                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
71                           for entry in self.queue_entries]
72                email_manager.manager.enqueue_notify_email(
73                        'Inconsistent abort state',
74                        'Queue entries have inconsistent abort state:\n' +
75                        '\n'.join(entries))
76                # don't crash here, just assume true
77                return True
78        return was_aborted
79
80
81    def _final_status(self):
82        if self._job_was_aborted():
83            return models.HostQueueEntry.Status.ABORTED
84
85        # we'll use a PidfileRunMonitor to read the autoserv exit status
86        if self._autoserv_monitor.exit_code() == 0:
87            return models.HostQueueEntry.Status.COMPLETED
88        return models.HostQueueEntry.Status.FAILED
89
90
91    def _set_all_statuses(self, status):
92        for queue_entry in self.queue_entries:
93            queue_entry.set_status(status)
94
95
96    def abort(self):
97        # override AgentTask.abort() to avoid killing the process and ending
98        # the task.  post-job tasks continue when the job is aborted.
99        pass
100
101
102    def _pidfile_label(self):
103        # '.autoserv_execute' -> 'autoserv'
104        return self._pidfile_name()[1:-len('_execute')]
105
106
107class SelfThrottledPostJobTask(PostJobTask):
108    """
109    PostJobTask that maintains its own process limit.
110
111    We throttle tasks like parsing because we don't want them to
112    hold up tests. At the same time we don't wish to build up load
113    that will take forever to parse.
114    """
115    _num_running_processes = 0
116    # Last known limit of max processes, used to check whether
117    # max processes config has been changed.
118    _last_known_max_processes = 0
119    # Whether an email should be sent to notifiy process limit being hit.
120    _notification_on = True
121    # Once process limit is hit, an email will be sent.
122    # To prevent spams, do not send another email until
123    # it drops to lower than the following level.
124    REVIVE_NOTIFICATION_THRESHOLD = 0.80
125
126    @classmethod
127    def _gauge_metrics(cls):
128        """Report to monarch the number of running processes."""
129        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
130        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})
131
132
133    @classmethod
134    def _increment_running_processes(cls):
135        cls._num_running_processes += 1
136        cls._gauge_metrics()
137
138
139    @classmethod
140    def _decrement_running_processes(cls):
141        cls._num_running_processes -= 1
142        cls._gauge_metrics()
143
144
145    @classmethod
146    def _max_processes(cls):
147        raise NotImplementedError
148
149
150    @classmethod
151    def _can_run_new_process(cls):
152        return cls._num_running_processes < cls._max_processes()
153
154
155    def _process_started(self):
156        return bool(self.monitor)
157
158
159    def tick(self):
160        # override tick to keep trying to start until the process count goes
161        # down and we can, at which point we revert to default behavior
162        if self._process_started():
163            super(SelfThrottledPostJobTask, self).tick()
164        else:
165            self._try_starting_process()
166
167
168    def run(self):
169        # override run() to not actually run unless we can
170        self._try_starting_process()
171
172
173    @classmethod
174    def _notify_process_limit_hit(cls):
175        """Send an email to notify that process limit is hit."""
176        if cls._notification_on:
177            subject = '%s: hitting max process limit.' % cls.__name__
178            message = ('Running processes/Max processes: %d/%d'
179                       % (cls._num_running_processes, cls._max_processes()))
180            email_manager.manager.enqueue_notify_email(subject, message)
181            cls._notification_on = False
182
183
184    @classmethod
185    def _reset_notification_switch_if_necessary(cls):
186        """Reset _notification_on if necessary.
187
188        Set _notification_on to True on the following cases:
189        1) If the limit of max processes configuration changes;
190        2) If _notification_on is False and the number of running processes
191           drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
192
193        """
194        if cls._last_known_max_processes != cls._max_processes():
195            cls._notification_on = True
196            cls._last_known_max_processes = cls._max_processes()
197            return
198        percentage = float(cls._num_running_processes) / cls._max_processes()
199        if (not cls._notification_on and
200            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
201            cls._notification_on = True
202
203
204    def _try_starting_process(self):
205        self._reset_notification_switch_if_necessary()
206        if not self._can_run_new_process():
207            self._notify_process_limit_hit()
208            return
209
210        # actually run the command
211        super(SelfThrottledPostJobTask, self).run()
212        if self._process_started():
213            self._increment_running_processes()
214
215
216    def finished(self, success):
217        super(SelfThrottledPostJobTask, self).finished(success)
218        if self._process_started():
219            self._decrement_running_processes()
220
221
222class GatherLogsTask(PostJobTask):
223    """
224    Task responsible for
225    * gathering uncollected logs (if Autoserv crashed hard or was killed)
226    * copying logs to the results repository
227    * spawning CleanupTasks for hosts, if necessary
228    * spawning a FinalReparseTask for the job
229    * setting the final status of the host, directly or through a cleanup
230    """
231    def __init__(self, queue_entries, recover_run_monitor=None):
232        self._job = queue_entries[0].job
233        super(GatherLogsTask, self).__init__(
234            queue_entries, log_file_name='.collect_crashinfo.log')
235        self._set_ids(queue_entries=queue_entries)
236
237
238    # TODO: Refactor into autoserv_utils. crbug.com/243090
239    def _generate_command(self, results_dir):
240        host_list = ','.join(queue_entry.host.hostname
241                             for queue_entry in self.queue_entries)
242        return [autoserv_utils.autoserv_path , '-p',
243                '--pidfile-label=%s' % self._pidfile_label(),
244                '--use-existing-results', '--collect-crashinfo',
245                '-m', host_list, '-r', results_dir]
246
247
248    @property
249    def num_processes(self):
250        return len(self.queue_entries)
251
252
253    def _pidfile_name(self):
254        return drone_manager.CRASHINFO_PID_FILE
255
256
257    def prolog(self):
258        self._check_queue_entry_statuses(
259                self.queue_entries,
260                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
261                allowed_host_statuses=(models.Host.Status.RUNNING,))
262
263        super(GatherLogsTask, self).prolog()
264
265
266    def epilog(self):
267        super(GatherLogsTask, self).epilog()
268        self._parse_results(self.queue_entries)
269        final_success, num_tests_failed = self._get_monitor_info()
270        reset_after_failure = (
271                not self._job.run_reset and (
272                    not final_success or num_tests_failed > 0))
273        self._reboot_hosts(final_success, num_tests_failed, reset_after_failure)
274        if reset_after_failure:
275            m = metrics.Counter('chromeos/autotest/scheduler/postjob_tasks/'
276                                'reset_after_failure')
277            m.increment(fields={'autoserv_process_success': final_success,
278                                'num_tests_failed': num_tests_failed > 0})
279            self._reset_after_failure()
280
281
282    def _get_monitor_info(self):
283        """Read monitor info from pidfile.
284
285        @returns: a tuple including
286            final_success: whether the monitor is successfully finished;
287            num_tests_failed: how many failed tests in the process.
288        """
289        if self._autoserv_monitor.has_process():
290            final_success = (self._final_status() ==
291                             models.HostQueueEntry.Status.COMPLETED)
292            num_tests_failed = self._autoserv_monitor.num_tests_failed()
293        else:
294            final_success = False
295            num_tests_failed = 0
296
297        return final_success, num_tests_failed
298
299
300    def _reboot_hosts(self, final_success, num_tests_failed,
301                      reset_after_failure):
302        """Reboot hosts by scheduling a CLEANUP task on host if needed.
303
304        @param final_success: whether the monitor successfully exits.
305        @param num_tests_failed: how many failed tests in total.
306        @param reset_after_failure: whether to schedule RESET task later.
307        """
308        reboot_after = self._job.reboot_after
309        do_reboot = (
310                # always reboot after aborted jobs
311                self._final_status() == models.HostQueueEntry.Status.ABORTED
312                or reboot_after == model_attributes.RebootAfter.ALWAYS
313                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
314                    and final_success and num_tests_failed == 0)
315                or (num_tests_failed > 0 and not reset_after_failure))
316
317        for queue_entry in self.queue_entries:
318            if do_reboot:
319                # don't pass the queue entry to the CleanupTask. if the cleanup
320                # fails, the job doesn't care -- it's over.
321                models.SpecialTask.objects.create(
322                        host=models.Host.objects.get(id=queue_entry.host.id),
323                        task=models.SpecialTask.Task.CLEANUP,
324                        requested_by=self._job.owner_model())
325            else:
326                queue_entry.host.set_status(models.Host.Status.READY)
327
328
329    def _reset_after_failure(self):
330        """Queue a RESET job for the host if job fails.
331
332        The current hqe entry is not passed into the RESET job.
333        """
334        for queue_entry in self.queue_entries:
335            models.SpecialTask.objects.create(
336                        host=models.Host.objects.get(id=queue_entry.host.id),
337                        task=models.SpecialTask.Task.RESET,
338                        requested_by=self._job.owner_model())
339
340
341    def run(self):
342        autoserv_exit_code = self._autoserv_monitor.exit_code()
343        # only run if Autoserv exited due to some signal. if we have no exit
344        # code, assume something bad (and signal-like) happened.
345        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
346            super(GatherLogsTask, self).run()
347        else:
348            self.finished(True)
349
350
351class FinalReparseTask(SelfThrottledPostJobTask):
352    def __init__(self, queue_entries):
353        super(FinalReparseTask, self).__init__(queue_entries,
354                                               log_file_name='.parse.log')
355        # don't use _set_ids, since we don't want to set the host_ids
356        self.queue_entry_ids = [entry.id for entry in queue_entries]
357
358
359    def _generate_command(self, results_dir):
360        return [_parser_path, '--detach', '--write-pidfile',
361                '--record-duration', '--suite-report', '-l', '2', '-r', '-o',
362                results_dir]
363
364
365    @property
366    def num_processes(self):
367        return 0 # don't include parser processes in accounting
368
369
370    def _pidfile_name(self):
371        return drone_manager.PARSER_PID_FILE
372
373
374    @classmethod
375    def _max_processes(cls):
376        return scheduler_config.config.max_parse_processes
377
378
379    def prolog(self):
380        self._check_queue_entry_statuses(
381                self.queue_entries,
382                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
383
384        super(FinalReparseTask, self).prolog()
385
386
387    def epilog(self):
388        super(FinalReparseTask, self).epilog()
389        self._set_all_statuses(self._final_status())
390