1""" 2Autotest AFE Cleanup used by the scheduler 3""" 4 5 6import logging 7import random 8import time 9 10from autotest_lib.client.common_lib import utils 11from autotest_lib.frontend.afe import models 12from autotest_lib.scheduler import email_manager 13from autotest_lib.scheduler import scheduler_config 14from autotest_lib.client.common_lib import global_config 15from autotest_lib.client.common_lib import host_protections 16 17try: 18 from chromite.lib import metrics 19except ImportError: 20 metrics = utils.metrics_mock 21 22 23class PeriodicCleanup(object): 24 """Base class to schedule periodical cleanup work. 25 """ 26 27 def __init__(self, db, clean_interval_minutes, run_at_initialize=False): 28 self._db = db 29 self.clean_interval_minutes = clean_interval_minutes 30 self._last_clean_time = time.time() 31 self._run_at_initialize = run_at_initialize 32 33 34 def initialize(self): 35 """Method called by scheduler at the startup. 36 """ 37 if self._run_at_initialize: 38 self._cleanup() 39 40 41 def run_cleanup_maybe(self): 42 """Test if cleanup method should be called. 43 """ 44 should_cleanup = (self._last_clean_time + 45 self.clean_interval_minutes * 60 46 < time.time()) 47 if should_cleanup: 48 self._cleanup() 49 self._last_clean_time = time.time() 50 51 52 def _cleanup(self): 53 """Abrstract cleanup method.""" 54 raise NotImplementedError 55 56 57class UserCleanup(PeriodicCleanup): 58 """User cleanup that is controlled by the global config variable 59 clean_interval_minutes in the SCHEDULER section. 60 """ 61 62 def __init__(self, db, clean_interval_minutes): 63 super(UserCleanup, self).__init__(db, clean_interval_minutes) 64 self._last_reverify_time = time.time() 65 66 67 @metrics.SecondsTimerDecorator( 68 'chromeos/autotest/scheduler/cleanup/user/durations') 69 def _cleanup(self): 70 logging.info('Running periodic cleanup') 71 self._abort_timed_out_jobs() 72 self._abort_jobs_past_max_runtime() 73 self._clear_inactive_blocks() 74 self._check_for_db_inconsistencies() 75 self._reverify_dead_hosts() 76 self._django_session_cleanup() 77 78 79 def _abort_timed_out_jobs(self): 80 msg = 'Aborting all jobs that have timed out and are not complete' 81 logging.info(msg) 82 query = models.Job.objects.filter(hostqueueentry__complete=False).extra( 83 where=['created_on + INTERVAL timeout_mins MINUTE < NOW()']) 84 for job in query.distinct(): 85 logging.warning('Aborting job %d due to job timeout', job.id) 86 job.abort() 87 88 89 def _abort_jobs_past_max_runtime(self): 90 """ 91 Abort executions that have started and are past the job's max runtime. 92 """ 93 logging.info('Aborting all jobs that have passed maximum runtime') 94 rows = self._db.execute(""" 95 SELECT hqe.id FROM afe_host_queue_entries AS hqe 96 WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS 97 (select * from afe_jobs where hqe.job_id=afe_jobs.id and 98 hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW()) 99 """) 100 query = models.HostQueueEntry.objects.filter( 101 id__in=[row[0] for row in rows]) 102 for queue_entry in query.distinct(): 103 logging.warning('Aborting entry %s due to max runtime', queue_entry) 104 queue_entry.abort() 105 106 107 def _check_for_db_inconsistencies(self): 108 logging.info('Cleaning db inconsistencies') 109 self._check_all_invalid_related_objects() 110 111 112 def _check_invalid_related_objects_one_way(self, first_model, 113 relation_field, second_model): 114 if 'invalid' not in first_model.get_field_dict(): 115 return [] 116 invalid_objects = list(first_model.objects.filter(invalid=True)) 117 first_model.objects.populate_relationships(invalid_objects, 118 second_model, 119 'related_objects') 120 error_lines = [] 121 for invalid_object in invalid_objects: 122 if invalid_object.related_objects: 123 related_list = ', '.join(str(related_object) for related_object 124 in invalid_object.related_objects) 125 error_lines.append('Invalid %s %s is related to %ss: %s' 126 % (first_model.__name__, invalid_object, 127 second_model.__name__, related_list)) 128 related_manager = getattr(invalid_object, relation_field) 129 related_manager.clear() 130 return error_lines 131 132 133 def _check_invalid_related_objects(self, first_model, first_field, 134 second_model, second_field): 135 errors = self._check_invalid_related_objects_one_way( 136 first_model, first_field, second_model) 137 errors.extend(self._check_invalid_related_objects_one_way( 138 second_model, second_field, first_model)) 139 return errors 140 141 142 def _check_all_invalid_related_objects(self): 143 model_pairs = ((models.Host, 'labels', models.Label, 'host_set'), 144 (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'), 145 (models.AclGroup, 'users', models.User, 'aclgroup_set'), 146 (models.Test, 'dependency_labels', models.Label, 147 'test_set')) 148 errors = [] 149 for first_model, first_field, second_model, second_field in model_pairs: 150 errors.extend(self._check_invalid_related_objects( 151 first_model, first_field, second_model, second_field)) 152 153 if errors: 154 m = 'chromeos/autotest/scheduler/cleanup/invalid_models_cleaned' 155 metrics.Counter(m).increment_by(len(errors)) 156 logging.warn('Cleaned invalid models due to errors: %s' 157 % ('\n'.join(errors))) 158 159 def _clear_inactive_blocks(self): 160 msg = 'Clear out blocks for all completed jobs.' 161 logging.info(msg) 162 # this would be simpler using NOT IN (subquery), but MySQL 163 # treats all IN subqueries as dependent, so this optimizes much 164 # better 165 self._db.execute(""" 166 DELETE ihq FROM afe_ineligible_host_queues ihq 167 WHERE NOT EXISTS 168 (SELECT job_id FROM afe_host_queue_entries hqe 169 WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""") 170 171 172 def _should_reverify_hosts_now(self): 173 reverify_period_sec = (scheduler_config.config.reverify_period_minutes 174 * 60) 175 if reverify_period_sec == 0: 176 return False 177 return (self._last_reverify_time + reverify_period_sec) <= time.time() 178 179 180 def _choose_subset_of_hosts_to_reverify(self, hosts): 181 """Given hosts needing verification, return a subset to reverify.""" 182 max_at_once = scheduler_config.config.reverify_max_hosts_at_once 183 if (max_at_once > 0 and len(hosts) > max_at_once): 184 return random.sample(hosts, max_at_once) 185 return sorted(hosts) 186 187 188 def _reverify_dead_hosts(self): 189 if not self._should_reverify_hosts_now(): 190 return 191 192 self._last_reverify_time = time.time() 193 logging.info('Checking for dead hosts to reverify') 194 hosts = models.Host.objects.filter( 195 status=models.Host.Status.REPAIR_FAILED, 196 locked=False, 197 invalid=False) 198 hosts = hosts.exclude( 199 protection=host_protections.Protection.DO_NOT_VERIFY) 200 if not hosts: 201 return 202 203 hosts = list(hosts) 204 total_hosts = len(hosts) 205 hosts = self._choose_subset_of_hosts_to_reverify(hosts) 206 logging.info('Reverifying dead hosts (%d of %d) %s', len(hosts), 207 total_hosts, ', '.join(host.hostname for host in hosts)) 208 for host in hosts: 209 models.SpecialTask.schedule_special_task( 210 host=host, task=models.SpecialTask.Task.VERIFY) 211 212 213 def _django_session_cleanup(self): 214 """Clean up django_session since django doesn't for us. 215 http://www.djangoproject.com/documentation/0.96/sessions/ 216 """ 217 logging.info('Deleting old sessions from django_session') 218 sql = 'TRUNCATE TABLE django_session' 219 self._db.execute(sql) 220 221 222class TwentyFourHourUpkeep(PeriodicCleanup): 223 """Cleanup that runs at the startup of monitor_db and every subsequent 224 twenty four hours. 225 """ 226 227 228 def __init__(self, db, drone_manager, run_at_initialize=True): 229 """Initialize TwentyFourHourUpkeep. 230 231 @param db: Database connection object. 232 @param drone_manager: DroneManager to access drones. 233 @param run_at_initialize: True to run cleanup when scheduler starts. 234 Default is set to True. 235 236 """ 237 self.drone_manager = drone_manager 238 clean_interval_minutes = 24 * 60 # 24 hours 239 super(TwentyFourHourUpkeep, self).__init__( 240 db, clean_interval_minutes, run_at_initialize=run_at_initialize) 241 242 243 @metrics.SecondsTimerDecorator( 244 'chromeos/autotest/scheduler/cleanup/daily/durations') 245 def _cleanup(self): 246 logging.info('Running 24 hour clean up') 247 self._check_for_uncleanable_db_inconsistencies() 248 self._cleanup_orphaned_containers() 249 250 251 def _check_for_uncleanable_db_inconsistencies(self): 252 logging.info('Checking for uncleanable DB inconsistencies') 253 self._check_for_active_and_complete_queue_entries() 254 self._check_for_multiple_platform_hosts() 255 self._check_for_no_platform_hosts() 256 257 258 def _check_for_active_and_complete_queue_entries(self): 259 query = models.HostQueueEntry.objects.filter(active=True, complete=True) 260 if query.count() != 0: 261 subject = ('%d queue entries found with active=complete=1' 262 % query.count()) 263 lines = [] 264 for entry in query: 265 lines.append(str(entry.get_object_dict())) 266 if entry.status == 'Aborted': 267 logging.error('Aborted entry: %s is both active and ' 268 'complete. Setting active value to False.', 269 str(entry)) 270 entry.active = False 271 entry.save() 272 self._send_inconsistency_message(subject, lines) 273 274 275 def _check_for_multiple_platform_hosts(self): 276 rows = self._db.execute(""" 277 SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count, 278 GROUP_CONCAT(afe_labels.name) 279 FROM afe_hosts 280 INNER JOIN afe_hosts_labels ON 281 afe_hosts.id = afe_hosts_labels.host_id 282 INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id 283 WHERE afe_labels.platform 284 GROUP BY afe_hosts.id 285 HAVING platform_count > 1 286 ORDER BY hostname""") 287 if rows: 288 subject = '%s hosts with multiple platforms' % self._db.rowcount 289 lines = [' '.join(str(item) for item in row) 290 for row in rows] 291 self._send_inconsistency_message(subject, lines) 292 293 294 def _check_for_no_platform_hosts(self): 295 rows = self._db.execute(""" 296 SELECT hostname 297 FROM afe_hosts 298 LEFT JOIN afe_hosts_labels 299 ON afe_hosts.id = afe_hosts_labels.host_id 300 AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels 301 WHERE platform) 302 WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""") 303 if rows: 304 logging.warning('%s hosts with no platform\n%s', self._db.rowcount, 305 ', '.join(row[0] for row in rows)) 306 307 308 def _send_inconsistency_message(self, subject, lines): 309 logging.error(subject) 310 message = '\n'.join(lines) 311 if len(message) > 5000: 312 message = message[:5000] + '\n(truncated)\n' 313 email_manager.manager.enqueue_notify_email(subject, message) 314 315 316 def _cleanup_orphaned_containers(self): 317 """Cleanup orphaned containers in each drone. 318 319 The function queues a lxc_cleanup call in each drone without waiting for 320 the script to finish, as the cleanup procedure could take minutes and the 321 script output is logged. 322 323 """ 324 ssp_enabled = global_config.global_config.get_config_value( 325 'AUTOSERV', 'enable_ssp_container') 326 if not ssp_enabled: 327 logging.info('Server-side packaging is not enabled, no need to clean' 328 ' up orphaned containers.') 329 return 330 self.drone_manager.cleanup_orphaned_containers() 331