# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separate thread to avoid the
performance overhead of sending data to Elasticsearch over HTTP.
"""

import logging
import Queue
import threading
import time

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up django. Otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

# Maximum number of metadata entries to buffer in the queue.
_MAX_METADATA_QUEUE_SIZE = 1000000
# Maximum number of entries to upload in a single bulk post.
_MAX_UPLOAD_SIZE = 50000
# Number of seconds that uploads may fail continuously. After that, the upload
# batch size is reduced to _MIN_RETRY_ENTRIES and an email alert is sent.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous upload failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()


def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error will be logged the first time the queue
    becomes full. The call does not wait or raise a Queue.Full exception, so
    there is no performance overhead for the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert that metadata upload has been failing continuously.

    The alert is only sent from a server that uses the server database and has
    the scheduler role, i.e., the scheduler itself rather than a shard.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only send the email alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until being aborted.
    """
    # Time when uploads started failing. None if the last upload succeeded.
    first_failed_upload = None
    # True if an email alert was sent when uploads had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                # Uploads have been failing for too long. Throttle back to a
                # small batch and send the email alert once.
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                # Grow the batch size exponentially, up to _MAX_UPLOAD_SIZE.
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
            while (not metadata_queue.empty() and
                   len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warning('Failed to upload %d entries of metadata, '
                                    'they will be retried later.',
                                    len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
                    # Put the failed entries back in the queue to be retried.
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS seconds for existing
    data to be uploaded.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    _abort.wait(_REPORT_INTERVAL_SECONDS)
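
# Example usage (a minimal sketch; the import path and the metadata fields
# shown here are assumptions for illustration, not taken from this file):
#
#     from autotest_lib.site_utils import metadata_reporter
#
#     # Start the daemon reporter thread once, early in the process.
#     metadata_reporter.start()
#
#     # Queue metadata entries (dictionaries) from anywhere in the process;
#     # the call never blocks, even when the queue is full.
#     metadata_reporter.queue({'_type': 'some_metadata_type',
#                              'hostname': 'some_host'})
#
#     # On shutdown, give the thread up to _REPORT_INTERVAL_SECONDS to flush.
#     metadata_reporter.abort()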