#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls
the master in a heartbeat, retrieves new jobs and hosts, and inserts them
into the local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests
to the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards
   are identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into
   the global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's AFE to see the statuses of all jobs. Otherwise one
     would have to check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the
   local database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard client will pick up all jobs where shard_id=NULL and send them
   to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't
     be uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and
   the ids of all hosts. This is used to not send objects repeatedly. For
   more information on this and alternatives considered, see
   rpc_interface.shard_heartbeat.
"""
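
# For orientation, a heartbeat request and response roughly take the shapes
# below. This is a sketch derived from _heartbeat_packet() and
# process_heartbeat_response() in this file; all concrete values are made up.
#
#   request = {
#       'shard_hostname': 'shard-1.example.com',
#       'known_job_ids': [10, 11],           # incomplete jobs on this shard
#       'known_host_ids': [3, 4],            # all valid hosts on this shard
#       'known_host_statuses': ['Ready', 'Running'],
#       'jobs': [...],                       # serialized completed jobs
#       'hqes': [...],                       # their host queue entries
#   }
#   response = {
#       'hosts': [...],                      # serialized Host objects
#       'jobs': [...],                       # serialized Job objects
#       'suite_keyvals': [...],              # serialized JobKeyval objects
#   }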


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

_heartbeat_client = None


class ShardClient(object):
    """Performs client-side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize a list of JSON-formatted objects into the database.

        Each object is deserialized in its own Django transaction, so a
        single bad record does not roll back the others.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                    ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs'
                                   and 'suite_keyvals', as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite keyvals for jobs %s',
                     list(parent_jobs_with_keyval))

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warning('Reset shard_id to NULL for the following '
                            'completed jobs so they will be uploaded to the '
                            'master again: %s', job_ids_repr)
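
    # Worked example of the re-sync above, with made-up ids: suppose job 42
    # completed on this shard, so its shard_id became NULL and it was
    # uploaded in heartbeat N. If that upload was silently dropped, the
    # master still considers job 42 assigned here and may send it again in
    # heartbeat N+1. The filter above then matches it (known id, complete
    # host queue entries) and resets shard_id to NULL, so the job is
    # uploaded once more in heartbeat N+2.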


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It
        will not exist before that time.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this
                # shard. This is okay because then there is nothing to
                # offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantics see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs, we would at worst upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        reasonably small.

        For hosts, the host status is sent to the master in addition to the
        host id, so the master can sync host statuses.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
            hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses
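
    # For example (made-up data): with incomplete jobs 5 and 7 known on this
    # shard and two valid hosts, _get_known_jobs_and_hosts() returns
    # ([5, 7], [3, 4], ['Ready', 'Running']).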


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message):
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: retrieve new jobs.

        This function executes a `shard_heartbeat` RPC and processes the
        response by storing the returned objects in the local database.
        """
        heartbeat_metrics_prefix = 'chromeos/autotest/shard_client/heartbeat/'

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        metrics.Gauge(heartbeat_metrics_prefix + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
            return
        except urllib2.URLError as e:
            self._heartbeat_failure("URLError: %s" % e.reason)
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure("HTTPException: %s" % e)
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure("TimeoutError: %s" % e)
            return

        metrics.Gauge(heartbeat_metrics_prefix + 'response_size').set(
            len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        self.do_heartbeat()
        metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phase lock of shards; e.g.
            # for tick_pause_sec=60 the fuzz term lies in [-6, +6).
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception when not running on a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run anywhere
            other than on a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None '
            'nor empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)
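

# An illustrative shadow_config fragment that satisfies the two lookups
# above; the section and key names come from the get_config_value() calls,
# while the values are made up:
#
#   [SHARD]
#   shard_hostname: shard-1.example.com
#   heartbeat_pause_sec: 60
#
# With that in place, a minimal manual driver (what main() below does,
# minus logging and signal handling) would be:
#
#   client = get_shard_client()
#   client.tick()    # perform a single heartbeat
#   client.loop()    # or: tick repeatedly until shutdown() is called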


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns: A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    ts_mon_config.SetupTsMonGlobalState('shard_client')

    try:
        metrics.Counter('chromeos/autotest/shard_client/start').increment()
        main_without_exception_handling()
    except Exception:
        message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        raise
    finally:
        email_manager.manager.send_queued_emails()


def main_without_exception_handling():
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.parse_args()

    scheduler_lib.setup_logging(
        os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()


if __name__ == '__main__':
    main()