# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Monitor jobs and abort them as necessary.

This daemon does a number of upkeep tasks:

* When a process owning a job crashes, job_aborter will mark the job as
  aborted in the database and clean up its lease files.

* When a job is marked aborted in the database, job_aborter will signal
  the process owning the job to abort.

See also http://goto.google.com/monitor_db_per_job_refactor
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import logging
import sys
import time

from lucifer import autotest
from lucifer import handoffs
from lucifer import leasing
from lucifer import loglib

logger = logging.getLogger(__name__)


def main(args):
    """Main function

    Parses the command line, configures logging, sets up ts_mon metrics
    state and then runs the main loop.  The main loop is not expected
    to return, so this function only ever exits by raising.

    @param args: list of command line args
    @raises AssertionError: if the main loop unexpectedly returns.
    """

    # The module docstring doubles as the --help description.
    parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
    parser.add_argument('--jobdir', required=True)
    loglib.add_logging_options(parser)
    args = parser.parse_args(args)
    loglib.configure_logging_with_args(parser, args)
    logger.info('Starting with args: %r', args)

    autotest.monkeypatch()
    ts_mon_config = autotest.chromite_load('ts_mon_config')
    with ts_mon_config.SetupTsMonGlobalState('job_aborter'):
        _main_loop(jobdir=args.jobdir)
        # _main_loop() loops forever, so reaching this point is a bug.
        # Raise explicitly instead of `assert False`: assert statements
        # are stripped when Python runs with -O, which would let the
        # daemon exit silently.
        raise AssertionError('cannot exit normally')


def _main_loop(jobdir):
    """Run the job_aborter upkeep tasks forever.

    Each iteration sends a tick metric, runs one pass of the upkeep
    tasks, flushes the database transaction and then sleeps.

    @param jobdir: job lease directory, as given by --jobdir.
    """
    transaction = autotest.deps_load('django.db.transaction')

    @transaction.commit_manually
    def flush_transaction():
        """Flush transaction https://stackoverflow.com/questions/3346124/"""
        # NOTE(review): presumably this ends the current transaction so
        # that the next tick's queries observe fresh rows -- see the
        # link above; confirm against the database isolation level.
        transaction.commit()

    metrics = _Metrics()
    metrics.send_starting()
    while True:
        logger.debug('Tick')
        metrics.send_tick()
        _main_loop_body(metrics, jobdir)
        flush_transaction()
        # Pause between upkeep passes (seconds).
        time.sleep(20)

def _main_loop_body(metrics, jobdir):
    """Run a single pass of all upkeep tasks.

    @param metrics: _Metrics instance.
    @param jobdir: job lease directory to scan for leases.
    """
    # Snapshot the leases that are still held, keyed by lease id.
    live_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if lease.expired():
            continue
        live_leases[lease.id] = lease

    _mark_expired_jobs_failed(metrics, live_leases)
    _abort_timed_out_jobs(live_leases)
    _abort_jobs_marked_aborting(live_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into lucifer


def _mark_expired_jobs_failed(metrics, active_leases):
    """Mark expired jobs failed.

    Expired jobs are jobs that have an incomplete JobHandoff and that do
    not have an active lease.  These jobs have been handed off to a
    job_reporter, but that job_reporter has crashed.  These jobs are
    marked failed in the database.

    @param metrics: _Metrics instance.
    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    orphaned_ids = []
    for handoff in handoffs.incomplete():
        job_id = handoff.job_id
        logger.debug('Found handoff: %d', job_id)
        if job_id in active_leases:
            # Still owned by a live process; nothing to do.
            continue
        logger.info('Handoff %d is missing active lease; cleaning up',
                    job_id)
        orphaned_ids.append(job_id)
    handoffs.clean_up(orphaned_ids)
    handoffs.mark_complete(orphaned_ids)
    metrics.send_expired_jobs(len(orphaned_ids))


def _abort_timed_out_jobs(active_leases):
    """Send abort to timed out jobs.

    Only jobs that currently have an active lease are signalled.

    @param active_leases: dict mapping job ids to Leases.
    """
    for job in _timed_out_jobs_queryset():
        if job.id not in active_leases:
            continue
        logger.info('Job %d is timed out; aborting', job.id)
        active_leases[job.id].maybe_abort()


def _abort_jobs_marked_aborting(active_leases):
    """Send abort to jobs marked aborting in Autotest database.

    Only jobs that currently have an active lease are signalled.

    @param active_leases: dict mapping job ids to Leases.
    """
    for job in _aborting_jobs_queryset():
        if job.id not in active_leases:
            continue
        logger.info('Job %d is marked for aborting; aborting', job.id)
        active_leases[job.id].maybe_abort()


def _abort_special_tasks_marked_aborted():
    """Placeholder for aborting special tasks; currently does nothing."""
    # TODO(crbug.com/748234): Special tasks not implemented yet.  This
    # would abort jobs running on the behalf of special tasks and thus
    # need to check a different database table.


def _clean_up_expired_leases(jobdir):
    """Clean up files for expired leases.

    We only care about active leases, so we can remove the stale files
    for expired leases.

    @param jobdir: job lease directory to scan for leases.
    """
    # Lazy generator: each lease is checked and cleaned up in turn.
    stale_leases = (
        lease for lease in leasing.leases_iter(jobdir) if lease.expired()
    )
    for lease in stale_leases:
        lease.cleanup()


def _timed_out_jobs_queryset():
    """Return a QuerySet of timed out Jobs.

    Selects jobs that have an incomplete host queue entry and whose
    creation time plus timeout_mins lies in the past.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    incomplete = models.Job.objects.filter(hostqueueentry__complete=False)
    overdue = incomplete.extra(
        where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    return overdue.distinct()


def _aborting_jobs_queryset():
    """Return a QuerySet of aborting Jobs.

    Selects jobs that have an aborted host queue entry and an
    incomplete host queue entry.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # NOTE(review): kept as two chained filter() calls on purpose.  If
    # hostqueueentry is a multi-valued relation, Django treats chained
    # filters differently from one combined filter() -- confirm before
    # merging them.
    aborted = models.Job.objects.filter(hostqueueentry__aborted=True)
    still_running = aborted.filter(hostqueueentry__complete=False)
    return still_running.distinct()


class _Metrics(object):

    """Class for sending job_aborter metrics."""

    def __init__(self):
        """Create the counters used by job_aborter."""
        metrics = autotest.chromite_load('metrics')
        prefix = 'chromeos/lucifer/job_aborter'

        def make_counter(suffix):
            """Return a Counter named prefix + suffix."""
            return metrics.Counter(prefix + suffix)

        self._starting_m = make_counter('/start')
        self._tick_m = make_counter('/tick')
        self._expired_m = make_counter('/expired_jobs')

    def send_starting(self):
        """Send starting metric."""
        self._starting_m.increment()

    def send_tick(self):
        """Send tick metric."""
        self._tick_m.increment()

    def send_expired_jobs(self, count):
        """Send expired_jobs metric.

        @param count: amount to increment the expired jobs counter by.
        """
        self._expired_m.increment_by(count)


if __name__ == '__main__':
    main(sys.argv[1:])