1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
import argparse
import datetime
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.core.exceptions import MultipleObjectsReturned
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
    from infra_libs import ts_mon
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock
    # ts_mon is used by ShardClient._report_job_time_distribution; without
    # this fallback that method raises NameError when infra_libs is missing.
    ts_mon = server_utils.metrics_mock
40
41
42"""
43Autotest shard client
44
The shard client can be run as a standalone service. It periodically polls the
46master in a heartbeat, retrieves new jobs and hosts and inserts them into the
47local database.
48
49A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to the
51global AFE, which will then complete the following actions:
52
531. Find the previously created (with atest) record for the shard. Shards are
54   identified by their hostnames, specified in the shadow_config.
552. Take the records that were sent in the heartbeat and insert them into the
56   global database.
57   - This is to set the status of jobs to completed in the master database after
58     they were run by a slave. This is necessary so one can just look at the
59     master's afe to see the statuses of all jobs. Otherwise one would have to
60     check the tko tables or the individual slave AFEs.
613. Find labels that have been assigned to this shard.
624. Assign hosts that:
63   - have the specified label
64   - aren't leased
65   - have an id which is not in the known_host_ids which were sent in the
66     heartbeat request.
675. Assign jobs that:
68   - depend on the specified label
69   - haven't been assigned before
70   - aren't started yet
71   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
73     heartbeat request.
746. Serialize the chosen jobs and hosts.
75   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
76     and many more. Details about this can be found around
77     model_logic.serialize()
787. Send these objects to the slave.
79
80
81On the client side, this will happen:
821. Deserialize the objects sent from the master and persist them to the local
83   database.
842. monitor_db on the shard will pick up these jobs and schedule them on the
85   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
874. The shard_client will pick up all jobs where shard_id=NULL and will
88   send them to the master in the request of the next heartbeat.
89   - The master will persist them as described earlier.
90   - the shard_id will be set back to the shard's id, so the record won't be
91     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids and statuses of all valid hosts. The ids are used to avoid sending
   objects repeatedly. For more information on this and alternatives
   considered, see rpc_interface.shard_heartbeat.
96"""
97
98
# RPC invoked on the global AFE for every heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
# Prefix shared by the heartbeat-related metric names emitted below.
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

# Retry parameters passed to frontend_wrappers.RetryingAFE.
RPC_TIMEOUT_MIN = 30
RPC_DELAY_SEC = 5

# Cap on the number of completed jobs uploaded by a single heartbeat.
MAX_UPLOAD_JOBS = 1000

# Process-wide ShardClient; assigned in main_without_exception_handling() and
# read by handle_signal().
_heartbeat_client = None
109
110
class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create a shard client.

        @param global_afe_hostname: Hostname of the global AFE that heartbeat
                                    RPCs are sent to.
        @param shard_hostname: Hostname identifying this shard; sent as
                               'shard_hostname' in every heartbeat.
        @param tick_pause_sec: Nominal pause, in seconds, between two ticks of
                               loop().
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        # Cached Shard model instance; populated lazily by the `shard` property.
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.
        Each entry is handled in its own transaction block; an entry that fails
        to deserialize is logged and counted in a metric, and the remaining
        entries are still processed.

        @param serialized_list: A list of JSON-formatted data or python dict
                                literals.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        logging.info('Deserializing %s %ss', len(serialized_list), message)
        for i, serialized in enumerate(serialized_list, 1):
            if i % 100 == 1:
                logging.info('Progress: at entry %s', i)
            with transaction.commit_on_success():
                # NOTE(review): the exception is caught inside the transaction
                # block, so commit_on_success exits normally and commits any
                # writes made before the failure. Confirm that partial
                # deserialization is intended.
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s fails: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()
        logging.info('Done deserializing %ss', message)


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts, jobs and suite keyvals including their
        dependencies and saves them to the local database. It then deletes
        the hosts the master lists as incorrect, and re-marks for upload any
        received job that is already complete locally.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs' and
                                   'suite_keyvals' (and optionally
                                   'incorrect_host_ids'), as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set(kv['job_id']
                                      for kv in suite_keyvals_serialized)
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        # The ids are fetched once and de-duplicated: the HQE join yields one
        # row per completed HQE.
        completed_job_ids = list(
                models.Job.objects.filter(
                        id__in=job_ids, hostqueueentry__complete=True
                ).values_list('id', flat=True).distinct())
        if completed_job_ids:
            models.Job.objects.filter(
                    id__in=completed_job_ids).update(shard=None)
            job_ids_repr = ', '.join(str(job_id)
                                     for job_id in completed_job_ids)
            logging.warning('Following completed jobs are reset shard_id to '
                            'NULL to be uploaded to master again: %s',
                            job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from database will be
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        try:
            models.Host.objects.filter(id__in=incorrect_host_ids).delete()
        except MultipleObjectsReturned:
            logging.exception('Failed to remove incorrect hosts %s',
                              incorrect_host_ids)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the local jobs that must be reported to the master.

        @returns: A list of Job objects with no shard set that have at least
                  one completed host queue entry.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))
        # Re-query by primary key so each job appears once even if several
        # of its HQEs matched the join above.
        return list(models.Job.objects.filter(pk__in=job_ids).all())


    def _mark_jobs_as_uploaded(self, job_ids):
        """Set the shard of the given jobs back to this shard.

        @param job_ids: ids of the jobs that were uploaded to the master.
        """
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Return all host queue entries belonging to |jobs|, in job order."""
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, the status of every valid host is sent in addition to its
        id, so the master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        jobs = models.Job.objects.filter(hostqueueentry__complete=False)
        # A job with several incomplete HQEs matches once per HQE through the
        # join; report each id only once.
        job_ids = list(jobs.values_list('id', flat=True).distinct())
        self._report_job_time_distribution(jobs)

        host_ids = []
        host_statuses = []
        for host in models.Host.objects.filter(invalid=0):
            host_ids.append(host.id)
            host_statuses.append(host.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.
        At most MAX_UPLOAD_JOBS completed jobs are uploaded per heartbeat, and
        only the HQEs of those jobs are included.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        max_print = 100
        logging.info('Known jobs (first %s): %s', max_print,
                     known_job_ids[:max_print])
        logging.info('Total known jobs: %s', len(known_job_ids))

        job_objs = self._get_jobs_to_upload()
        # Throttle before serializing so that the uploaded HQEs match the
        # uploaded jobs exactly and no work is wasted on dropped jobs.
        if len(job_objs) > MAX_UPLOAD_JOBS:
            logging.info('Throttling number of jobs to upload from %s to %s.',
                         len(job_objs), MAX_UPLOAD_JOBS)
            job_objs = job_objs[:MAX_UPLOAD_JOBS]

        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs,
                'hqes': hqes}


    def _report_job_time_distribution(self, jobs):
        """Report distribution of job durations to monarch.

        @param jobs: Iterable of Job objects; each contributes the number of
                     whole seconds since its created_on (clamped at 0).
        """
        jobs_time_distribution = metrics.Distribution(
                _METRICS_PREFIX + 'known_jobs_durations')
        now = datetime.datetime.now()

        # The type expected by the .set(...) of a distribution is a
        # distribution.
        dist = ts_mon.Distribution(ts_mon.GeometricBucketer())
        for job in jobs:
            duration = int(
                    max(0, (now - job.created_on).total_seconds()))
            dist.add(duration)
        jobs_time_distribution.set(dist)

    def _report_packet_metrics(self, packet):
        """Report stats about outgoing packet to monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
                len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
                len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
                len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        """Log a failed heartbeat and count it by failure type.

        @param log_message: Details appended to the error log line.
        @param failure_type_str: Value of the 'failure_type' metric field.
        """
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the returned
        objects in the local database. Jobs in the packet are marked as
        uploaded only after the RPC succeeds.

        Returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
            logging.info('Finished heartbeat upload.')
        except urllib2.HTTPError as e:
            # HTTPError subclasses URLError, so it must be caught first.
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
            len(str(response)))
        logging.info('Marking jobs as uploaded.')
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        logging.info('Processing heartbeat response.')
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or lifetime expires.

        Between ticks it sleeps tick_pause_sec with +/- 10% random jitter. A
        shutdown requested during a tick ends the loop without that sleep.

        @param lifetime_hours: (float) hours to loop for; None means no limit.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            if self._shutdown_requested:
                # Don't wait out a full pause before honouring a shutdown
                # request that arrived while the tick was running.
                break
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                implies no deadline.
        @param loop_start_time: time.time() value taken when looping started.
        @returns True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
            or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False
467
468
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick.

    Asks the running shard client to stop after its current tick. If the
    signal arrives before the client has been created, there is no tick to
    finish. The handler then exits right away instead of failing with an
    AttributeError on None.

    @param signum: Number of the received signal.
    @param frame: Current stack frame (unused).
    @raises SystemExit: with status 128 + signum if no client exists yet.
    """
    if _heartbeat_client is None:
        logging.info('Received signal %s before the shard client was '
                     'created. Exiting.', signum)
        raise SystemExit(128 + signum)
    _heartbeat_client.shutdown()
472
473
def _get_shard_hostname_and_ensure_running_on_shard():
    """Return this shard's hostname as read from the global configuration.

    The value comes from the 'shard_hostname' option of the 'SHARD' section.
    A missing or empty value means we are not running on a shard.

    @returns The configured shard hostname.
    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    config = global_config.global_config
    hostname = config.get_config_value('SHARD', 'shard_hostname',
                                       default=None)
    if hostname:
        return hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
489
490
def _get_tick_pause_sec():
    """Return the pause between two ticks, in seconds, as a float.

    Read from the 'heartbeat_pause_sec' option of the 'SHARD' section of the
    global configuration.
    """
    config = global_config.global_config
    pause_sec = config.get_config_value('SHARD', 'heartbeat_pause_sec',
                                        type=float)
    return pause_sec
495
496
def get_shard_client():
    """Build a ShardClient configured from the global configuration.

    The global AFE hostname, this shard's hostname and the tick pause are all
    read from configuration. Reading the shard hostname raises if we are not
    running on a shard.

    @returns A shard client instance.
    """
    return ShardClient(
            global_afe_hostname=server_utils.get_global_afe_hostname(),
            shard_hostname=_get_shard_hostname_and_ensure_running_on_shard(),
            tick_pause_sec=_get_tick_pause_sec())
508
509
def main():
    """Command-line entry point of the shard client.

    Parses the options, sets up ts_mon, and runs the client. Any uncaught
    exception is counted, logged and reported via the email manager before
    being re-raised. Queued emails are always flushed on the way out.
    """
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            default=None,
            type=float,
            help=('If provided, number of hours we should run for. At the '
                  'expiry of this time, the process will exit gracefully.'),
    )
    parser.add_argument(
            '--metrics-file',
            default=None,
            type=str,
            help=('If provided, drop metrics to this local file instead of '
                  'reporting to ts_mon'),
    )
    options = parser.parse_args()

    ts_mon_context = ts_mon_config.SetupTsMonGlobalState(
            'shard_client', indirect=True, debug_file=options.metrics_file)
    with ts_mon_context:
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception:
            metrics.Counter(
                    'chromeos/autotest/shard_client/uncaught_exception'
                    ).increment()
            failure_message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(failure_message)
            logging.exception(failure_message)
            raise
        finally:
            email_manager.manager.send_queued_emails()
546
547
def main_without_exception_handling(options):
    """Set up logging and signal handling, then run the shard client loop.

    @param options: Parsed command-line options; options.lifetime_hours is
                    passed to ShardClient.loop().
    """
    global _heartbeat_client

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Starting shard client.")
    # Create the client before installing the signal handlers, so that
    # handle_signal never runs while _heartbeat_client is still unset.
    _heartbeat_client = get_shard_client()

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    _heartbeat_client.loop(options.lifetime_hours)
561
562
if __name__ == "__main__":
    # Run the client only when executed as a script, not when imported.
    main()
565