1""" 2Autotest AFE Cleanup used by the scheduler 3""" 4 5 6import time, logging, random 7 8from autotest_lib.frontend.afe import models 9from autotest_lib.scheduler import email_manager 10from autotest_lib.scheduler import scheduler_config 11from autotest_lib.client.common_lib import global_config 12from autotest_lib.client.common_lib import host_protections 13from autotest_lib.client.common_lib.cros.graphite import autotest_stats 14 15 16class PeriodicCleanup(object): 17 """Base class to schedule periodical cleanup work. 18 """ 19 20 def __init__(self, db, clean_interval_minutes, run_at_initialize=False): 21 self._db = db 22 self.clean_interval_minutes = clean_interval_minutes 23 self._last_clean_time = time.time() 24 self._run_at_initialize = run_at_initialize 25 26 27 def initialize(self): 28 """Method called by scheduler at the startup. 29 """ 30 if self._run_at_initialize: 31 self._cleanup() 32 33 34 def run_cleanup_maybe(self): 35 """Test if cleanup method should be called. 36 """ 37 should_cleanup = (self._last_clean_time + 38 self.clean_interval_minutes * 60 39 < time.time()) 40 if should_cleanup: 41 self._cleanup() 42 self._last_clean_time = time.time() 43 44 45 def _cleanup(self): 46 """Abrstract cleanup method.""" 47 raise NotImplementedError 48 49 50class UserCleanup(PeriodicCleanup): 51 """User cleanup that is controlled by the global config variable 52 clean_interval_minutes in the SCHEDULER section. 53 """ 54 timer = autotest_stats.Timer('monitor_db_cleanup.user_cleanup') 55 56 57 def __init__(self, db, clean_interval_minutes): 58 super(UserCleanup, self).__init__(db, clean_interval_minutes) 59 self._last_reverify_time = time.time() 60 61 62 @timer.decorate 63 def _cleanup(self): 64 logging.info('Running periodic cleanup') 65 self._abort_timed_out_jobs() 66 self._abort_jobs_past_max_runtime() 67 self._clear_inactive_blocks() 68 self._check_for_db_inconsistencies() 69 self._reverify_dead_hosts() 70 self._django_session_cleanup() 71 72 73 @timer.decorate 74 def _abort_timed_out_jobs(self): 75 msg = 'Aborting all jobs that have timed out and are not complete' 76 logging.info(msg) 77 query = models.Job.objects.filter(hostqueueentry__complete=False).extra( 78 where=['created_on + INTERVAL timeout_mins MINUTE < NOW()']) 79 for job in query.distinct(): 80 logging.warning('Aborting job %d due to job timeout', job.id) 81 job.abort() 82 83 84 @timer.decorate 85 def _abort_jobs_past_max_runtime(self): 86 """ 87 Abort executions that have started and are past the job's max runtime. 88 """ 89 logging.info('Aborting all jobs that have passed maximum runtime') 90 rows = self._db.execute(""" 91 SELECT hqe.id 92 FROM afe_host_queue_entries AS hqe 93 INNER JOIN afe_jobs ON (hqe.job_id = afe_jobs.id) 94 WHERE NOT hqe.complete AND NOT hqe.aborted AND 95 hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < 96 NOW()""") 97 query = models.HostQueueEntry.objects.filter( 98 id__in=[row[0] for row in rows]) 99 for queue_entry in query.distinct(): 100 logging.warning('Aborting entry %s due to max runtime', queue_entry) 101 queue_entry.abort() 102 103 104 @timer.decorate 105 def _check_for_db_inconsistencies(self): 106 logging.info('Cleaning db inconsistencies') 107 self._check_all_invalid_related_objects() 108 109 110 def _check_invalid_related_objects_one_way(self, first_model, 111 relation_field, second_model): 112 if 'invalid' not in first_model.get_field_dict(): 113 return [] 114 invalid_objects = list(first_model.objects.filter(invalid=True)) 115 first_model.objects.populate_relationships(invalid_objects, 116 second_model, 117 'related_objects') 118 error_lines = [] 119 for invalid_object in invalid_objects: 120 if invalid_object.related_objects: 121 related_list = ', '.join(str(related_object) for related_object 122 in invalid_object.related_objects) 123 error_lines.append('Invalid %s %s is related to %ss: %s' 124 % (first_model.__name__, invalid_object, 125 second_model.__name__, related_list)) 126 related_manager = getattr(invalid_object, relation_field) 127 related_manager.clear() 128 return error_lines 129 130 131 def _check_invalid_related_objects(self, first_model, first_field, 132 second_model, second_field): 133 errors = self._check_invalid_related_objects_one_way( 134 first_model, first_field, second_model) 135 errors.extend(self._check_invalid_related_objects_one_way( 136 second_model, second_field, first_model)) 137 return errors 138 139 140 def _check_all_invalid_related_objects(self): 141 model_pairs = ((models.Host, 'labels', models.Label, 'host_set'), 142 (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'), 143 (models.AclGroup, 'users', models.User, 'aclgroup_set'), 144 (models.Test, 'dependency_labels', models.Label, 145 'test_set')) 146 errors = [] 147 for first_model, first_field, second_model, second_field in model_pairs: 148 errors.extend(self._check_invalid_related_objects( 149 first_model, first_field, second_model, second_field)) 150 151 if errors: 152 subject = ('%s relationships to invalid models, cleaned all' % 153 len(errors)) 154 message = '\n'.join(errors) 155 logging.warning(subject) 156 logging.warning(message) 157 email_manager.manager.enqueue_notify_email(subject, message) 158 159 160 @timer.decorate 161 def _clear_inactive_blocks(self): 162 msg = 'Clear out blocks for all completed jobs.' 163 logging.info(msg) 164 # this would be simpler using NOT IN (subquery), but MySQL 165 # treats all IN subqueries as dependent, so this optimizes much 166 # better 167 self._db.execute(""" 168 DELETE ihq FROM afe_ineligible_host_queues ihq 169 LEFT JOIN (SELECT DISTINCT job_id FROM afe_host_queue_entries 170 WHERE NOT complete) hqe 171 USING (job_id) WHERE hqe.job_id IS NULL""") 172 173 174 def _should_reverify_hosts_now(self): 175 reverify_period_sec = (scheduler_config.config.reverify_period_minutes 176 * 60) 177 if reverify_period_sec == 0: 178 return False 179 return (self._last_reverify_time + reverify_period_sec) <= time.time() 180 181 182 def _choose_subset_of_hosts_to_reverify(self, hosts): 183 """Given hosts needing verification, return a subset to reverify.""" 184 max_at_once = scheduler_config.config.reverify_max_hosts_at_once 185 if (max_at_once > 0 and len(hosts) > max_at_once): 186 return random.sample(hosts, max_at_once) 187 return sorted(hosts) 188 189 190 @timer.decorate 191 def _reverify_dead_hosts(self): 192 if not self._should_reverify_hosts_now(): 193 return 194 195 self._last_reverify_time = time.time() 196 logging.info('Checking for dead hosts to reverify') 197 hosts = models.Host.objects.filter( 198 status=models.Host.Status.REPAIR_FAILED, 199 locked=False, 200 invalid=False) 201 hosts = hosts.exclude( 202 protection=host_protections.Protection.DO_NOT_VERIFY) 203 if not hosts: 204 return 205 206 hosts = list(hosts) 207 total_hosts = len(hosts) 208 hosts = self._choose_subset_of_hosts_to_reverify(hosts) 209 logging.info('Reverifying dead hosts (%d of %d) %s', len(hosts), 210 total_hosts, ', '.join(host.hostname for host in hosts)) 211 for host in hosts: 212 models.SpecialTask.schedule_special_task( 213 host=host, task=models.SpecialTask.Task.VERIFY) 214 215 216 @timer.decorate 217 def _django_session_cleanup(self): 218 """Clean up django_session since django doesn't for us. 219 http://www.djangoproject.com/documentation/0.96/sessions/ 220 """ 221 logging.info('Deleting old sessions from django_session') 222 sql = 'TRUNCATE TABLE django_session' 223 self._db.execute(sql) 224 225 226class TwentyFourHourUpkeep(PeriodicCleanup): 227 """Cleanup that runs at the startup of monitor_db and every subsequent 228 twenty four hours. 229 """ 230 timer = autotest_stats.Timer('monitor_db_cleanup.twentyfourhour_cleanup') 231 232 233 def __init__(self, db, drone_manager, run_at_initialize=True): 234 """Initialize TwentyFourHourUpkeep. 235 236 @param db: Database connection object. 237 @param drone_manager: DroneManager to access drones. 238 @param run_at_initialize: True to run cleanup when scheduler starts. 239 Default is set to True. 240 241 """ 242 self.drone_manager = drone_manager 243 clean_interval_minutes = 24 * 60 # 24 hours 244 super(TwentyFourHourUpkeep, self).__init__( 245 db, clean_interval_minutes, run_at_initialize=run_at_initialize) 246 247 248 @timer.decorate 249 def _cleanup(self): 250 logging.info('Running 24 hour clean up') 251 self._check_for_uncleanable_db_inconsistencies() 252 self._cleanup_orphaned_containers() 253 254 255 @timer.decorate 256 def _check_for_uncleanable_db_inconsistencies(self): 257 logging.info('Checking for uncleanable DB inconsistencies') 258 self._check_for_active_and_complete_queue_entries() 259 self._check_for_multiple_platform_hosts() 260 self._check_for_no_platform_hosts() 261 self._check_for_multiple_atomic_group_hosts() 262 263 264 @timer.decorate 265 def _check_for_active_and_complete_queue_entries(self): 266 query = models.HostQueueEntry.objects.filter(active=True, complete=True) 267 if query.count() != 0: 268 subject = ('%d queue entries found with active=complete=1' 269 % query.count()) 270 lines = [] 271 for entry in query: 272 lines.append(str(entry.get_object_dict())) 273 if entry.status == 'Aborted': 274 logging.error('Aborted entry: %s is both active and ' 275 'complete. Setting active value to False.', 276 str(entry)) 277 entry.active = False 278 entry.save() 279 self._send_inconsistency_message(subject, lines) 280 281 282 @timer.decorate 283 def _check_for_multiple_platform_hosts(self): 284 rows = self._db.execute(""" 285 SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count, 286 GROUP_CONCAT(afe_labels.name) 287 FROM afe_hosts 288 INNER JOIN afe_hosts_labels ON 289 afe_hosts.id = afe_hosts_labels.host_id 290 INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id 291 WHERE afe_labels.platform 292 GROUP BY afe_hosts.id 293 HAVING platform_count > 1 294 ORDER BY hostname""") 295 if rows: 296 subject = '%s hosts with multiple platforms' % self._db.rowcount 297 lines = [' '.join(str(item) for item in row) 298 for row in rows] 299 self._send_inconsistency_message(subject, lines) 300 301 302 @timer.decorate 303 def _check_for_no_platform_hosts(self): 304 rows = self._db.execute(""" 305 SELECT hostname 306 FROM afe_hosts 307 LEFT JOIN afe_hosts_labels 308 ON afe_hosts.id = afe_hosts_labels.host_id 309 AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels 310 WHERE platform) 311 WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""") 312 if rows: 313 logging.warning('%s hosts with no platform\n%s', self._db.rowcount, 314 ', '.join(row[0] for row in rows)) 315 316 317 @timer.decorate 318 def _check_for_multiple_atomic_group_hosts(self): 319 rows = self._db.execute(""" 320 SELECT afe_hosts.id, hostname, 321 COUNT(DISTINCT afe_atomic_groups.name) AS atomic_group_count, 322 GROUP_CONCAT(afe_labels.name), 323 GROUP_CONCAT(afe_atomic_groups.name) 324 FROM afe_hosts 325 INNER JOIN afe_hosts_labels ON 326 afe_hosts.id = afe_hosts_labels.host_id 327 INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id 328 INNER JOIN afe_atomic_groups ON 329 afe_labels.atomic_group_id = afe_atomic_groups.id 330 WHERE NOT afe_hosts.invalid AND NOT afe_labels.invalid 331 GROUP BY afe_hosts.id 332 HAVING atomic_group_count > 1 333 ORDER BY hostname""") 334 if rows: 335 subject = '%s hosts with multiple atomic groups' % self._db.rowcount 336 lines = [' '.join(str(item) for item in row) 337 for row in rows] 338 self._send_inconsistency_message(subject, lines) 339 340 341 def _send_inconsistency_message(self, subject, lines): 342 logging.error(subject) 343 message = '\n'.join(lines) 344 if len(message) > 5000: 345 message = message[:5000] + '\n(truncated)\n' 346 email_manager.manager.enqueue_notify_email(subject, message) 347 348 349 @timer.decorate 350 def _cleanup_orphaned_containers(self): 351 """Cleanup orphaned containers in each drone. 352 353 The function queues a lxc_cleanup call in each drone without waiting for 354 the script to finish, as the cleanup procedure could take minutes and the 355 script output is logged. 356 357 """ 358 ssp_enabled = global_config.global_config.get_config_value( 359 'AUTOSERV', 'enable_ssp_container') 360 if not ssp_enabled: 361 logging.info('Server-side packaging is not enabled, no need to clean' 362 ' up orphaned containers.') 363 return 364 self.drone_manager.cleanup_orphaned_containers() 365