1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import argparse
9import httplib
10import logging
11import os
12import random
13import signal
14import time
15import urllib2
16
17import common
18
19from autotest_lib.frontend import setup_django_environment
20from autotest_lib.client.common_lib import error
21from autotest_lib.client.common_lib import global_config
22from autotest_lib.frontend.afe import models
23from autotest_lib.scheduler import email_manager
24from autotest_lib.scheduler import scheduler_lib
25from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
26from autotest_lib.server import utils as server_utils
27from chromite.lib import timeout_util
28from django.db import transaction
29
30try:
31    from chromite.lib import metrics
32    from chromite.lib import ts_mon_config
33except ImportError:
34    metrics = server_utils.metrics_mock
35    ts_mon_config = server_utils.metrics_mock
36
37
38"""
39Autotest shard client
40
41The shard client can be run as standalone service. It periodically polls the
42master in a heartbeat, retrieves new jobs and hosts and inserts them into the
43local database.
44
45A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to the
47global AFE, which will then complete the following actions:
48
491. Find the previously created (with atest) record for the shard. Shards are
50   identified by their hostnames, specified in the shadow_config.
512. Take the records that were sent in the heartbeat and insert them into the
52   global database.
53   - This is to set the status of jobs to completed in the master database after
54     they were run by a slave. This is necessary so one can just look at the
55     master's afe to see the statuses of all jobs. Otherwise one would have to
56     check the tko tables or the individual slave AFEs.
573. Find labels that have been assigned to this shard.
584. Assign hosts that:
59   - have the specified label
60   - aren't leased
61   - have an id which is not in the known_host_ids which were sent in the
62     heartbeat request.
635. Assign jobs that:
64   - depend on the specified label
65   - haven't been assigned before
66   - aren't started yet
67   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
69     heartbeat request.
706. Serialize the chosen jobs and hosts.
71   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
72     and many more. Details about this can be found around
73     model_logic.serialize()
747. Send these objects to the slave.
75
76
77On the client side, this will happen:
781. Deserialize the objects sent from the master and persist them to the local
79   database.
802. monitor_db on the shard will pick up these jobs and schedule them on the
81   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
834. The shard_client will pick up all jobs where shard_id=NULL and will
84   send them to the master in the request of the next heartbeat.
85   - The master will persist them as described earlier.
86   - the shard_id will be set back to the shard's id, so the record won't be
87     uploaded again.
88   The heartbeat request will also contain the ids of incomplete jobs and the
89   ids of all hosts. This is used to not send objects repeatedly. For more
90   information on this and alternatives considered
91   see rpc_interface.shard_heartbeat.
92"""
93
94
# Name of the RPC on the global AFE that the heartbeat invokes.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

# Passed to frontend_wrappers.RetryingAFE as timeout_min and delay_sec.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# The ShardClient instance assigned in main_without_exception_handling();
# handle_signal() uses it to request a shutdown. None until it is created.
_heartbeat_client = None
101
102
class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create a shard client.

        @param global_afe_hostname: Hostname of the global AFE that heartbeats
                                    are sent to.
        @param shard_hostname: Hostname identifying this shard to the master.
        @param tick_pause_sec: Nominal pause between two ticks, in seconds.
                               loop() fuzzes it by +/- 10%.
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        # Lazily fetched by the `shard` property.
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize serialized objects into the local database.

        Each item is deserialized in its own transaction. A failure on one
        item is logged and counted, is rolled back, and does not stop the
        remaining items from being processed.

        @param serialized_list: A list of serialized objects.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            # The try must wrap the transaction block rather than sit inside
            # it. commit_on_success() rolls back only when an exception
            # escapes it. Swallowing the error inside would commit whatever
            # partial rows deserialize() wrote before failing.
            try:
                with transaction.commit_on_success():
                    djmodel.deserialize(serialized)
            except Exception as e:
                logging.error('Deserializing a %s fails: %s, Error: %s',
                              message, serialized, e)
                metrics.Counter(
                    'chromeos/autotest/shard_client/deserialization_failed'
                    ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts, jobs and suite keyvals including their
        dependencies and saves them to the local database. Afterwards, any job
        the master sent that is already complete locally is marked for
        re-upload.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs'
                                   and optionally 'suite_keyvals', as returned
                                   by the `shard_heartbeat` rpc call. A missing
                                   'suite_keyvals' entry is treated as empty.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response.get('suite_keyvals', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = {kv['job_id']
                                   for kv in suite_keyvals_serialized}
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        # The ids are collected (and de-duplicated, since the join on
        # hostqueueentry yields one row per complete HQE) before the update.
        # This avoids iterating a queryset whose rows were just modified.
        completed_job_ids = sorted(set(models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True
                ).values_list('id', flat=True)))
        if completed_job_ids:
            models.Job.objects.filter(
                    id__in=completed_job_ids).update(shard=None)
            job_ids_repr = ', '.join(str(job_id)
                                     for job_id in completed_job_ids)
            logging.warning('Following completed jobs are reset shard_id to '
                            'NULL to be uploaded to master again: %s',
                            job_ids_repr)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the locally completed jobs that must be sent to the master.

        @returns: A list of models.Job objects.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))
        # Re-query by primary key: this collapses the duplicates produced by
        # the hostqueueentry join above.
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        """Point the given jobs back at this shard so they aren't re-uploaded.

        @param job_ids: Ids of the jobs that were sent in a heartbeat.
        """
        if not job_ids:
            # Nothing to mark; also skips the DB lookup in self.shard.
            return
        # self.shard might be None if no jobs were downloaded yet.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Return all host queue entries of the given jobs as one flat list.

        @param jobs: Iterable of models.Job objects.
        """
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet: a dict of keyword arguments for the
                 `shard_heartbeat` RPC.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message):
        """Log a failed heartbeat and bump the failure counter.

        @param log_message: Description of the failure, appended to the log.
        """
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment()


    # NOTE(review): 'heatbeat' in this timer name looks like a typo. It is kept
    # because renaming it would change the exported metric name.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the returned
        objects in the local database.

        On the network/RPC errors handled below, the failure is recorded via
        _heartbeat_failure() and nothing is marked as uploaded. The same jobs
        are therefore picked up again by the next heartbeat.
        """
        heartbeat_metrics_prefix = 'chromeos/autotest/shard_client/heartbeat/'

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        metrics.Gauge(heartbeat_metrics_prefix + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
            return
        except urllib2.URLError as e:
            self._heartbeat_failure("URLError: %s" % e.reason)
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure("HTTPException: %s" % e)
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure("TimeoutError: %s" % e)
            return

        metrics.Gauge(heartbeat_metrics_prefix + 'response_size').set(
            len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        self.do_heartbeat()
        metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True
355
356
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick.

    Asks the running shard client to stop after its current tick. The signal
    handlers are installed before the client is created, so a signal can
    arrive while _heartbeat_client is still None. In that case there is no
    tick to finish. Exit immediately instead of failing with an
    AttributeError on None.

    @param signum: Number of the received signal.
    @param frame: Current stack frame (unused).
    """
    client = _heartbeat_client
    if client is None:
        logging.info('Signal %d received before the shard client started; '
                     'exiting.', signum)
        raise SystemExit(0)
    client.shutdown()
360
361
def _get_shard_hostname_and_ensure_running_on_shard():
    """Return the local shard's hostname as read from the global config.

    @returns: The value of the SHARD/shard_hostname config entry.

    @raises error.HeartbeatOnlyAllowedInShardModeException: if the entry is
            unset or empty, i.e. when not running on a shard.
    """
    config = global_config.global_config
    shard_hostname = config.get_config_value('SHARD', 'shard_hostname',
                                             default=None)
    if shard_hostname:
        return shard_hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
        'To run the shard client, shard_hostname must neither be None nor '
        'empty.')
377
378
def _get_tick_pause_sec():
    """Return the pause between two ticks, in seconds, from the global config.

    @returns: The SHARD/heartbeat_pause_sec config entry as a float.
    """
    config = global_config.global_config
    return config.get_config_value('SHARD', 'heartbeat_pause_sec', type=float)
383
384
def get_shard_client():
    """Build a ShardClient configured from the global configuration.

    @returns: A ShardClient instance.

    @raises error.HeartbeatOnlyAllowedInShardModeException: if the shard
            hostname is not configured (not running on a shard).
    """
    # Arguments are evaluated left to right: global AFE hostname, then the
    # shard hostname check, then the tick pause.
    return ShardClient(server_utils.get_global_afe_hostname(),
                       _get_shard_hostname_and_ensure_running_on_shard(),
                       _get_tick_pause_sec())
396
397
def main():
    """Entry point: set up metrics, run the shard client, report crashes.

    Any uncaught exception is logged, recorded as a stacktrace e-mail and
    re-raised. Queued e-mails are sent on every exit path.
    """
    ts_mon_config.SetupTsMonGlobalState('shard_client')

    try:
        metrics.Counter('chromeos/autotest/shard_client/start').increment()
        main_without_exception_handling()
    except Exception:
        crash_message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(crash_message)
        logging.exception(crash_message)
        raise
    finally:
        email_manager.manager.send_queued_emails()
411
412
def main_without_exception_handling():
    """Parse arguments, set up logging and signals, then run the client loop."""
    global _heartbeat_client

    # No options are defined. Parsing still provides --help and rejects
    # unexpected arguments.
    argparse.ArgumentParser(description='Shard client.').parse_args()

    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    scheduler_lib.setup_logging(log_dir, None,
                                timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)

    logging.info("Starting shard client.")
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()
429
430
# Run the shard client only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
433