1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5#pylint: disable-msg=C0111 6 7import os 8import logging 9import time 10 11from autotest_lib.client.common_lib import global_config 12from autotest_lib.frontend.afe import models 13from autotest_lib.scheduler import email_manager 14from autotest_lib.scheduler import scheduler_config, scheduler_models 15 16 17# Override default parser with our site parser. 18def parser_path(install_dir): 19 """Return site implementation of parser. 20 21 @param install_dir: installation directory. 22 """ 23 return os.path.join(install_dir, 'tko', 'site_parse') 24 25 26class SiteAgentTask(object): 27 """ 28 SiteAgentTask subclasses BaseAgentTask in monitor_db. 29 """ 30 31 32 def _archive_results(self, queue_entries): 33 """ 34 Set the status of queue_entries to ARCHIVING. 35 36 This method sets the status of the queue_entries to ARCHIVING 37 if the enable_archiving flag is true in global_config.ini. 38 Otherwise, it bypasses the archiving step and sets the queue entries 39 to the final status of current step. 40 """ 41 enable_archiving = global_config.global_config.get_config_value( 42 scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool) 43 # Set the status of the queue entries to archiving or self final status 44 if enable_archiving: 45 status = models.HostQueueEntry.Status.ARCHIVING 46 else: 47 status = self._final_status() 48 49 for queue_entry in self.queue_entries: 50 queue_entry.set_status(status) 51 52 53 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, 54 allowed_host_statuses=None): 55 """ 56 Forked from monitor_db.py 57 """ 58 class_name = self.__class__.__name__ 59 for entry in queue_entries: 60 if entry.status not in allowed_hqe_statuses: 61 # In the orignal code, here we raise an exception. In an 62 # effort to prevent downtime we will instead abort the job and 63 # send out an email notifying us this has occured. 64 error_message = ('%s attempting to start entry with invalid ' 65 'status %s: %s. Aborting Job: %s.' 66 % (class_name, entry.status, entry, 67 entry.job)) 68 logging.error(error_message) 69 email_manager.manager.enqueue_notify_email( 70 'Job Aborted - Invalid Host Queue Entry Status', 71 error_message) 72 entry.job.request_abort() 73 invalid_host_status = ( 74 allowed_host_statuses is not None 75 and entry.host.status not in allowed_host_statuses) 76 if invalid_host_status: 77 # In the orignal code, here we raise an exception. In an 78 # effort to prevent downtime we will instead abort the job and 79 # send out an email notifying us this has occured. 80 error_message = ('%s attempting to start on queue entry with ' 81 'invalid host status %s: %s. Aborting Job: %s' 82 % (class_name, entry.host.status, entry, 83 entry.job)) 84 logging.error(error_message) 85 email_manager.manager.enqueue_notify_email( 86 'Job Aborted - Invalid Host Status', error_message) 87 entry.job.request_abort() 88 89 90class SiteDispatcher(object): 91 """ 92 SiteDispatcher subclasses BaseDispatcher in monitor_db. 93 """ 94 DEFAULT_REQUESTED_BY_USER_ID = 1 95 96 97 def _reverify_hosts_where(self, where, 98 print_message='Reverifying host %s'): 99 """ 100 This is an altered version of _reverify_hosts_where the class to 101 models.SpecialTask.objects.create passes in an argument for 102 requested_by, in order to allow the Reset task to be created 103 properly. 104 """ 105 full_where='locked = 0 AND invalid = 0 AND ' + where 106 for host in scheduler_models.Host.fetch(where=full_where): 107 if self.host_has_agent(host): 108 # host has already been recovered in some way 109 continue 110 if self._host_has_scheduled_special_task(host): 111 # host will have a special task scheduled on the next cycle 112 continue 113 if print_message: 114 logging.error(print_message, host.hostname) 115 try: 116 user = models.User.objects.get(login='autotest_system') 117 except models.User.DoesNotExist: 118 user = models.User.objects.get( 119 id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID) 120 models.SpecialTask.objects.create( 121 task=models.SpecialTask.Task.RESET, 122 host=models.Host.objects.get(id=host.id), 123 requested_by=user) 124 125 126 def _check_for_unrecovered_verifying_entries(self): 127 # Verify is replaced by Reset. 128 queue_entries = scheduler_models.HostQueueEntry.fetch( 129 where='status = "%s"' % models.HostQueueEntry.Status.RESETTING) 130 for queue_entry in queue_entries: 131 special_tasks = models.SpecialTask.objects.filter( 132 task__in=(models.SpecialTask.Task.CLEANUP, 133 models.SpecialTask.Task.VERIFY, 134 models.SpecialTask.Task.RESET), 135 queue_entry__id=queue_entry.id, 136 is_complete=False) 137 if special_tasks.count() == 0: 138 logging.error('Unrecovered Resetting host queue entry: %s. ' 139 'Setting status to Queued.', str(queue_entry)) 140 # Essentially this host queue entry was set to be Verifying 141 # however no special task exists for entry. This occurs if the 142 # scheduler dies between changing the status and creating the 143 # special task. By setting it to queued, the job can restart 144 # from the beginning and proceed correctly. This is much more 145 # preferable than having monitor_db not launching. 146 queue_entry.set_status('Queued') 147