# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module reports metadata in a separate thread to avoid the performance
overhead of sending data to Elasticsearch over HTTP.
"""
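
# A minimal usage sketch (hypothetical caller code; in production the
# scheduler invokes these hooks at startup and teardown):
#
#     metadata_reporter.start()
#     metadata_reporter.queue({'time_recorded': time.time()})
#     ...
#     metadata_reporter.abort()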

import logging
import Queue
import threading
import time

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up django. Otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for data to
# upload.
_REPORT_INTERVAL_SECONDS = 5

# Maximum number of metadata entries to buffer; new entries are dropped when
# the queue is full.
_MAX_METADATA_QUEUE_SIZE = 1000000
# Maximum number of entries to upload in a single bulk request.
_MAX_UPLOAD_SIZE = 50000
# The number of seconds uploads may fail continuously. After that, the upload
# size is reduced to _MIN_RETRY_ENTRIES.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous upload failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
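# For illustration with the values above: each cycle the batch size doubles
# (10, 20, 40, ...) up to 50000 entries per upload; once uploads have been
# failing continuously for 600 seconds, it resets to 10 entries until an
# upload succeeds again.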
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
# Set by the reporting thread when it exits, so abort() can wait for the
# thread to finish instead of returning immediately.
_abort_completed = threading.Event()
_queue_full = threading.Event()

def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call does not block or raise Queue.Full, so it adds no
    performance overhead to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
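
    Example (hypothetical entry; the fields shown are illustrative only,
    callers decide what to record):
        queue({'hostname': 'localhost',
               'time_recorded': time.time()})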
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert that metadata upload has been failing.

    The alert is only sent if the server database is in use and this server
    has the scheduler role, i.e., it is not sent from shards.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only email alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until being aborted.
    """
    # Time when uploads first started failing. None if the last upload
    # succeeded.
    first_failed_upload = None
    # True if an email alert was sent after uploads had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        # Run until abort() sets the _abort event.
        while not _abort.is_set():
            start_time = time.time()
            data_list = []
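            # If uploads have been failing continuously for longer than
            # _MAX_UPLOAD_FAIL_DURATION, fall back to the minimum batch size
            # and send a one-time email alert; otherwise grow the batch size
            # exponentially up to _MAX_UPLOAD_SIZE.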
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
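            # Drain up to upload_size entries from the queue without blocking;
            # anything left over is picked up in a later iteration.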
            while not metadata_queue.empty() and len(data_list) < upload_size:
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
                    # Requeue the failed entries so they are retried in a
                    # later iteration.
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
            sleep_time = _REPORT_INTERVAL_SECONDS - (time.time() - start_time)
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        # Signal abort() that the thread has finished, even if it died with
        # an error.
        _abort_completed.set()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    _abort_completed.clear()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call waits up to _REPORT_INTERVAL_SECONDS seconds for the reporting
    thread to finish uploading existing data and exit.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Wait on _abort_completed rather than _abort: _abort was just set, so
    # waiting on it would return immediately.
    _abort_completed.wait(_REPORT_INTERVAL_SECONDS)