1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Monitor jobs and abort them as necessary.
6
7This daemon does a number of upkeep tasks:
8
9* When a process owning a job crashes, job_aborter will mark the job as
10  aborted in the database and clean up its lease files.
11
12* When a job is marked aborted in the database, job_aborter will signal
13  the process owning the job to abort.
14
15See also http://goto.google.com/monitor_db_per_job_refactor
16"""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import print_function
21
22import argparse
23import logging
24import sys
25import time
26
27from lucifer import autotest
28from lucifer import handoffs
29from lucifer import leasing
30from lucifer import loglib
31
# Module-wide logger, named after this module.
logger = logging.getLogger(name=__name__)
33
34
def main(args):
    """Run the job_aborter daemon.

    Parses the command line, configures logging, sets up the ts_mon
    global state and then runs _main_loop(), which is never expected
    to return.

    @param args: list of command line args (without the program name)
    @raises AssertionError: if the main loop ever returns normally.
    """

    parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
    parser.add_argument('--jobdir', required=True)
    loglib.add_logging_options(parser)
    args = parser.parse_args(args)
    loglib.configure_logging_with_args(parser, args)
    logger.info('Starting with args: %r', args)

    autotest.monkeypatch()
    ts_mon_config = autotest.chromite_load('ts_mon_config')
    with ts_mon_config.SetupTsMonGlobalState('job_aborter'):
        _main_loop(jobdir=args.jobdir)
    # _main_loop() loops forever, so reaching this line is a bug.  Raise
    # explicitly instead of `assert False`: assert statements are removed
    # under `python -O`, which would let main() return silently.
    raise AssertionError('job_aborter main loop exited unexpectedly')
53
54
def _main_loop(jobdir):
    """Run upkeep rounds forever; this function never returns normally.

    Each round logs a debug tick, sends a tick metric, runs
    _main_loop_body(), commits the current Django transaction and then
    sleeps for 20 seconds.  The starting metric is sent once, before
    the first round.

    @param jobdir: job directory handed to _main_loop_body().
    """
    db_transaction = autotest.deps_load('django.db.transaction')

    @db_transaction.commit_manually
    def flush_transaction():
        """Flush transaction https://stackoverflow.com/questions/3346124/"""
        # NOTE(review): presumably this ends the open transaction so the
        # next round's queries see fresh rows rather than a stale
        # snapshot -- confirm against the linked answer.
        db_transaction.commit()

    loop_metrics = _Metrics()
    loop_metrics.send_starting()
    while True:
        logger.debug('Tick')
        loop_metrics.send_tick()
        _main_loop_body(loop_metrics, jobdir)
        flush_transaction()
        time.sleep(20)
71
72
def _main_loop_body(metrics, jobdir):
    """Perform one round of job_aborter upkeep.

    @param metrics: _Metrics instance.
    @param jobdir: directory whose leases are inspected.
    """
    # Snapshot the non-expired leases once; every decision in this round
    # is made against this snapshot.
    active_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if not lease.expired():
            active_leases[lease.id] = lease

    _mark_expired_jobs_failed(metrics, active_leases)
    _abort_timed_out_jobs(active_leases)
    _abort_jobs_marked_aborting(active_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into lucifer
84
85
def _mark_expired_jobs_failed(metrics, active_leases):
    """Clean up jobs whose handoff has lost its lease.

    Expired jobs are jobs that have an incomplete JobHandoff and no
    active lease.  Such jobs were handed off to a job_reporter, but that
    job_reporter has crashed.  Their handoffs are passed to
    handoffs.clean_up() and handoffs.mark_complete(), and their count is
    reported through the expired_jobs metric.

    @param metrics: _Metrics instance.
    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    expired_ids = []
    for handoff in handoffs.incomplete():
        job_id = handoff.job_id
        logger.debug('Found handoff: %d', job_id)
        if job_id in active_leases:
            # Still owned by a live job_reporter; nothing to do.
            continue
        logger.info('Handoff %d is missing active lease; cleaning up',
                    job_id)
        expired_ids.append(job_id)
    handoffs.clean_up(expired_ids)
    handoffs.mark_complete(expired_ids)
    metrics.send_expired_jobs(len(expired_ids))
108
109
def _abort_timed_out_jobs(active_leases):
    """Signal abort to timed out jobs that still hold an active lease.

    Timed out jobs without an active lease are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    for job in _timed_out_jobs_queryset():
        if job.id not in active_leases:
            continue
        logger.info('Job %d is timed out; aborting', job.id)
        active_leases[job.id].maybe_abort()
119
120
def _abort_jobs_marked_aborting(active_leases):
    """Signal abort to leased jobs marked aborting in the Autotest database.

    Aborting jobs without an active lease are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    # Lazily produce ids so each job is handled as the queryset yields it.
    aborting_ids = (job.id for job in _aborting_jobs_queryset())
    for job_id in aborting_ids:
        if job_id in active_leases:
            logger.info('Job %d is marked for aborting; aborting', job_id)
            active_leases[job_id].maybe_abort()
130
131
def _abort_special_tasks_marked_aborted():
    """Placeholder for aborting special tasks; currently does nothing."""
    # TODO(crbug.com/748234): Special tasks not implemented yet.  This
    # would abort jobs running on the behalf of special tasks and thus
    # need to check a different database table.
137
138
def _clean_up_expired_leases(jobdir):
    """Clean up the files left behind by expired leases.

    Only active leases matter to job_aborter, so the stale files of
    expired leases can be removed.

    @param jobdir: directory whose leases are scanned.
    """
    expired_leases = (
            lease for lease in leasing.leases_iter(jobdir)
            if lease.expired()
    )
    for stale in expired_leases:
        stale.cleanup()
148
149
def _timed_out_jobs_queryset():
    """Return a QuerySet of Jobs that have run past their timeout.

    A job qualifies when it has at least one incomplete HostQueueEntry
    and the raw SQL condition created_on + timeout_mins minutes < NOW()
    holds.  Duplicate rows from the join are removed with distinct().

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    timed_out_clause = 'created_on + INTERVAL timeout_mins MINUTE < NOW()'
    incomplete_jobs = models.Job.objects.filter(
            hostqueueentry__complete=False)
    return incomplete_jobs.extra(where=[timed_out_clause]).distinct()
162
163
def _aborting_jobs_queryset():
    """Return a QuerySet of aborting Jobs.

    A job is aborting when at least one of its HostQueueEntries is
    marked aborted but is not yet complete.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # Both conditions go in a single filter() call so that they must hold
    # for the *same* HostQueueEntry.  Chaining two filter() calls across
    # this multi-valued relation would let each condition match a
    # different entry (e.g. one aborted-but-complete entry plus another
    # non-aborted incomplete one), and would also add a second join.
    return (
            models.Job.objects
            .filter(hostqueueentry__aborted=True,
                    hostqueueentry__complete=False)
            .distinct()
    )
176
177
class _Metrics(object):

    """Sends job_aborter monitoring metrics.

    Every metric is a counter named under chromeos/lucifer/job_aborter.
    """

    def __init__(self):
        metrics_lib = autotest.chromite_load('metrics')
        prefix = 'chromeos/lucifer/job_aborter'

        def make_counter(suffix):
            """Create a counter named prefix + suffix."""
            return metrics_lib.Counter(prefix + suffix)

        self._starting_m = make_counter('/start')
        self._tick_m = make_counter('/tick')
        self._expired_m = make_counter('/expired_jobs')

    def send_starting(self):
        """Count one daemon start."""
        self._starting_m.increment()

    def send_tick(self):
        """Count one main-loop tick."""
        self._tick_m.increment()

    def send_expired_jobs(self, count):
        """Add count to the expired_jobs counter.

        @param count: number of expired jobs found (may be 0).
        """
        self._expired_m.increment_by(count)
200
201
if __name__ == '__main__':
    # main() takes the argument list without the program name.
    cli_args = sys.argv[1:]
    main(cli_args)
204