#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up load
    that will take forever to parse.
    """
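    # How the throttling works (summary of the overrides below):
    #   run()  -> _try_starting_process(): only starts the underlying process
    #             while the class-wide count is below _max_processes().
    #   tick() -> keeps retrying _try_starting_process() until a slot frees
    #             up, then reverts to the normal AgentTask tick behavior.
    #   finished() -> decrements the class-wide count if a process was started.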
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To avoid spamming, do not send another email until the running-process
    # count drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80

    @classmethod
    def _gauge_metrics(cls):
        """Report the number of running processes to Monarch."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The limit of max processes in the configuration changes;
        2) _notification_on is False and the number of running processes
           drops below the fraction of the limit defined by
           REVIVE_NOTIFICATION_THRESHOLD.
        """
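        # Note: the percentage computation below assumes _max_processes()
        # returns a positive value; a limit of zero would raise
        # ZeroDivisionError here.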
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
                percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)
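        # In short: reboot (via a CLEANUP special task) after aborted jobs, on
        # RebootAfter.ALWAYS, on a fully successful IF_ALL_TESTS_PASSED job,
        # or whenever any test failed; otherwise mark the host Ready directly.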
        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    """Throttled task that runs the TKO parser over the job's results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '--record-duration',
                '--suite-report', '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Throttled task that archives job results by running the
    archive_results.control.srv control file through autoserv."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())