1"""Autotest AFE Cleanup used by the scheduler"""
2
3import contextlib
4import logging
5import random
6import time
7
8from autotest_lib.client.common_lib import utils
9from autotest_lib.frontend.afe import models
10from autotest_lib.scheduler import scheduler_config
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import host_protections
13
14try:
15    from chromite.lib import metrics
16except ImportError:
17    metrics = utils.metrics_mock
18
19
# Common prefix of every metric path reported from this module (the cleanup
# duration timers and the error metrics).
_METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup'
21
22
class PeriodicCleanup(object):
    """Common scheduling logic for cleanup work that repeats on an interval.

    Subclasses provide the actual work by overriding _cleanup().
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """Record the cleanup schedule; the interval is timed from now.

        @param db: Database object, stored as self._db for subclasses.
        @param clean_interval_minutes: Number of minutes that must elapse
                                       between two periodic cleanup runs.
        @param run_at_initialize: True to also run the cleanup when
                                  initialize() is called.
        """
        self._run_at_initialize = run_at_initialize
        self.clean_interval_minutes = clean_interval_minutes
        self._db = db
        self._last_clean_time = time.time()


    def initialize(self):
        """Hook called by the scheduler at startup.

        Runs the cleanup once when run_at_initialize was requested. The timer
        consulted by run_cleanup_maybe() is left untouched.
        """
        if not self._run_at_initialize:
            return
        self._cleanup()


    def run_cleanup_maybe(self):
        """Run the cleanup if more than the configured interval has elapsed.

        The interval is measured from the later of construction time and the
        end of the previous run triggered by this method.
        """
        next_due = self._last_clean_time + self.clean_interval_minutes * 60
        if time.time() <= next_due:
            # Not due yet.
            return
        self._cleanup()
        self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method; subclasses must override it."""
        raise NotImplementedError()
55
56
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """Initialize UserCleanup.

        @param db: Database object used to run raw SQL statements.
        @param clean_interval_minutes: Minutes between two cleanup runs.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        # Dead-host reverification has its own period (see
        # _should_reverify_hosts_now), timed from construction.
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
    def _cleanup(self):
        """Run every periodic cleanup step, one after the other."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        """Abort jobs that timed out but still have incomplete queue entries.

        A job counts as timed out when created_on plus timeout_mins minutes
        is earlier than the database's NOW().
        """
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        # A job with several incomplete entries must only be listed once.
        jobs = query.distinct()
        if not jobs:
            return

        with _cleanup_warning_banner('timed out jobs', len(jobs)):
            for job in jobs:
                logging.warning('Aborting job %d due to job timeout', job.id)
                job.abort()
        _report_detected_errors('jobs_timed_out', len(jobs))


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.

        Only host queue entries that are neither complete nor already aborted
        are considered; the deadline is started_on plus the owning job's
        max_runtime_mins, compared against the database's NOW().
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        # The raw query only yields ids; load the model objects so that their
        # abort() method can be used.
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        hqes = query.distinct()
        if not hqes:
            return

        with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
            for queue_entry in hqes:
                logging.warning('Aborting entry %s due to max runtime',
                                queue_entry)
                queue_entry.abort()
        _report_detected_errors('hqes_past_max_runtime', len(hqes))


    def _check_for_db_inconsistencies(self):
        """Clean up the database inconsistencies this class can repair."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, invalid_model,
                                               relation_field, valid_model):
        """Detach invalid objects of one model from objects of another.

        Every invalid_model object flagged invalid=True that still has
        related valid_model objects is logged, counted, and has the relation
        named by relation_field cleared.

        @param invalid_model: Model class whose invalid objects are checked.
                              Models without an 'invalid' field are skipped.
        @param relation_field: Name of the attribute on invalid_model objects
                               holding the related manager to clear.
        @param valid_model: Model class on the other side of the relation.
        """
        if 'invalid' not in invalid_model.get_field_dict():
            return

        invalid_objects = list(invalid_model.objects.filter(invalid=True))
        # Bail out before populating relationships: there is nothing to
        # populate, check or clear when no object is flagged invalid.
        if not invalid_objects:
            return

        invalid_model.objects.populate_relationships(
                invalid_objects, valid_model, 'related_objects')

        num_objects_with_invalid_relations = 0
        errors = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_objects = invalid_object.related_objects
                related_list = ', '.join(str(x) for x in related_objects)
                num_objects_with_invalid_relations += 1
                errors.append('Invalid %s is related to: %s' %
                              (invalid_object, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()

        # Only log warnings after we're sure we've seen at least one invalid
        # model with some valid relations to avoid empty banners from getting
        # printed.
        if errors:
            invalid_model_name = invalid_model.__name__
            valid_model_name = valid_model.__name__
            banner = 'invalid %s related to valid %s' % (invalid_model_name,
                                                         valid_model_name)
            with _cleanup_warning_banner(banner, len(errors)):
                for error in errors:
                    logging.warning(error)
            # NOTE(review): exactly one error is appended per counted object,
            # so both metrics below always receive the same value -- confirm
            # whether the second was meant to count individual relations.
            _report_detected_errors(
                    'invalid_related_objects',
                    num_objects_with_invalid_relations,
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})
            _report_detected_errors(
                    'invalid_related_objects_relations',
                    len(errors),
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Run the one-way invalid-relation check in both directions.

        @param first_model: First model class of the relation.
        @param first_field: Relation attribute on first_model objects.
        @param second_model: Second model class of the relation.
        @param second_field: Relation attribute on second_model objects.
        """
        self._check_invalid_related_objects_one_way(
                first_model,
                first_field,
                second_model,
        )
        self._check_invalid_related_objects_one_way(
                second_model,
                second_field,
                first_model,
        )


    def _check_all_invalid_related_objects(self):
        """Check each known pair of related models for invalid relations."""
        # Each entry: (model, relation attribute on it, related model,
        # relation attribute on the related model).
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        for first_model, first_field, second_model, second_field in model_pairs:
            self._check_invalid_related_objects(
                    first_model,
                    first_field,
                    second_model,
                    second_field,
            )


    def _clear_inactive_blocks(self):
        """Delete ineligible-host-queue rows of jobs with no incomplete entry.

        A row of afe_ineligible_host_queues is removed when no host queue
        entry with the same job_id is still incomplete.
        """
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
                DELETE ihq FROM afe_ineligible_host_queues ihq
                WHERE NOT EXISTS
                    (SELECT job_id FROM afe_host_queue_entries hqe
                     WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        """Tell whether the dead-host reverify period has elapsed.

        @return: False when the configured reverify period is 0, otherwise
                 True once at least that many minutes have passed since the
                 last reverify.
        """
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify.

        @param hosts: List of hosts needing verification.
        @return: A random sample of reverify_max_hosts_at_once hosts when that
                 setting is positive and smaller than the number of hosts,
                 otherwise all of the hosts, sorted.
        """
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        """Schedule a VERIFY special task for hosts stuck in Repair Failed.

        Does nothing unless the reverify period has elapsed. Candidates are
        unlocked, valid hosts in the REPAIR_FAILED status whose protection is
        not DO_NOT_VERIFY; the number scheduled per run may be capped by
        _choose_subset_of_hosts_to_reverify.
        """
        if not self._should_reverify_hosts_now():
            return

        # The period restarts here, even if no host ends up being reverified.
        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
                     total_hosts)
        with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
            for host in hosts:
                logging.warning(host.hostname)
        _report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
        _report_detected_errors('dead_hosts_require_reverify', total_hosts)
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
           http://www.djangoproject.com/documentation/0.96/sessions/

        NOTE(review): the statement truncates the whole table, so every
        stored session is dropped, not only old ones -- confirm this is
        intended.
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)
264
265
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
       twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60  # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
    def _cleanup(self):
        """Run the daily checks, then the orphaned container cleanup."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        """Run every check for inconsistencies that are mostly only reported."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Report host queue entries that are both active and complete.

        Entries whose status is 'Aborted' are repaired by clearing their
        active flag; every other entry is only logged. Both the total number
        found and the number repaired are reported as metrics.
        """
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        num_bad_hqes = query.count()
        if num_bad_hqes == 0:
            return

        num_aborted = 0
        logging.warning('%d queue entries found with active=complete=1',
                        num_bad_hqes)
        with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
            for entry in query:
                if entry.status == 'Aborted':
                    entry.active = False
                    entry.save()
                    recovery_path = 'was also aborted, set active to False'
                    num_aborted += 1
                else:
                    recovery_path = 'can not recover'
                logging.warning('%s (recovery: %s)', entry.get_object_dict(),
                                recovery_path)
        _report_detected_errors('hqes_active_and_complete', num_bad_hqes)
        _report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)


    def _check_for_multiple_platform_hosts(self):
        """Log and report hosts that carry more than one platform label."""
        # One row per offending host: id, hostname, number of platform labels
        # and the concatenated label names.
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")

        if rows:
            logging.warning('Cleanup found hosts with multiple platforms')
            with _cleanup_warning_banner('hosts with multiple platforms',
                                         len(rows)):
                for row in rows:
                    logging.warning(' '.join(str(item) for item in row))
            _report_detected_errors('hosts_with_multiple_platforms', len(rows))


    def _check_for_no_platform_hosts(self):
        """Log and report valid hosts that have no platform label at all."""
        # The LEFT JOIN only matches platform labels, so a NULL host_id on
        # the joined side means the host has none.
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            with _cleanup_warning_banner('hosts with no platform', len(rows)):
                for row in rows:
                    logging.warning(row[0])
            _report_detected_errors('hosts_with_no_platform', len(rows))


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting for
        the script to finish, as the cleanup procedure could take minutes and the
        script output is logged.

        Nothing is queued when the AUTOSERV/enable_ssp_container setting is
        false.

        """
        # Ask for a boolean explicitly. NOTE(review): without a type the
        # value presumably comes back as the raw config string, in which case
        # a setting of 'False' would be truthy and this check could never
        # fire -- confirm against global_config.get_config_value.
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool)
        if not ssp_enabled:
            logging.info(
                    'Server-side packaging is not enabled, no need to clean '
                    'up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()
379
380
def _report_recovered_errors(metric_name, count, fields=None):
    """Reports a counter metric for recovered errors

    Defined under its own name so that the gauge reporter
    _report_detected_errors below does not rebind and hide it.

    @param metric_name: Name of the metric to report about.
    @param count: How many "errors" were fixed this cycle.
    @param fields: Optional fields to include with the metric.
    """
    # Build a fresh dict per call instead of sharing one mutable default.
    if fields is None:
        fields = {}
    m = '%s/errors_recovered/%s' % (_METRICS_PREFIX, metric_name)
    metrics.Counter(m).increment_by(count, fields=fields)


def _report_detected_errors(metric_name, gauge, fields=None):
    """Reports a gauge metric for errors detected

    @param metric_name: Name of the metric to report about.
    @param gauge: Outstanding number of unrecoverable errors of this type.
    @param fields: Optional fields to include with the metric.
    """
    # Build a fresh dict per call instead of sharing one mutable default.
    if fields is None:
        fields = {}
    m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
    metrics.Gauge(m).set(gauge, fields=fields)
401
402
@contextlib.contextmanager
def _cleanup_warning_banner(banner, error_count=None):
    """Bracket a group of logged errors with START / END warning lines.

    The END line is logged even when the managed block raises.

    @param banner: The identifying header to print for context.
    @param error_count: If not None, the number of errors detected; it is
                        appended to the header as ' (total: N)'.
    """
    header = (banner if error_count is None
              else banner + ' (total: %d)' % error_count)
    logging.warning('#### START: %s ####', header)
    try:
        yield
    finally:
        logging.warning('#### END: %s ####', header)
417