1#!/usr/bin/env python
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Standalone service to monitor AFE servers and report to ts_mon"""
7import sys
8import time
9import logging
10import multiprocessing
11import urllib2
12
13import common
14from autotest_lib.client.common_lib import global_config
15from autotest_lib.frontend.afe.json_rpc import proxy
16from autotest_lib.server import frontend
17# import needed to setup host_attributes
18# pylint: disable=unused-import
19from autotest_lib.server import site_host_attributes
20from autotest_lib.site_utils import server_manager_utils
21from chromite.lib import commandline
22from chromite.lib import metrics
23from chromite.lib import ts_mon_config
24
# Every ts_mon metric emitted by this monitor lives under this prefix.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Per-rpc timing; fields: target_hostname, command, success, failure_reason.
METRIC_RPC_CALL_DURATIONS = '%s/rpc_call_durations' % METRIC_ROOT
# Incremented once per polling pass over all servers.
METRIC_TICK = '%s/tick' % METRIC_ROOT
# Incremented when monitoring a host raises an unhandled exception.
METRIC_MONITOR_ERROR = '%s/afe_monitor_error' % METRIC_ROOT
29
# Maps exception types raised by rpc calls to the failure_reason label that
# is recorded for the call. Exceptions not covered here are labeled
# 'Unknown' and re-raised by AfeMonitor.run_cmd.
FAILURE_REASONS = {
    proxy.JSONRPCException: 'JSONRPCException',
}
33
def afe_rpc_call(hostname):
    """Perform one rpc call set against a single server.

    Used as the pool.map() worker in poll_rpc_servers(). Every exception is
    logged and counted on METRIC_MONITOR_ERROR instead of propagating, so
    one misbehaving host cannot abort the polling pass for all servers.

    @param hostname: server's hostname to poll
    """
    try:
        # Build the monitor inside the try as well: constructing the AFE
        # client can fail too, and an exception escaping a pool worker is
        # re-raised by pool.map() in the polling loop.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
46
47
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keeps a shared list of shards in sync with the current shard set.

    Every `period` seconds, fetches server_manager_utils.get_shards() and
    adds/removes entries of `shards` in place while holding `shards_lock`.
    Loops until stop_event is set, or forever if stop_event is None.

    @param shards: list of shards to be updated in place
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while stop_event is None or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        try:
            new_shards = set(server_manager_utils.get_shards())
        except Exception:
            # A transient lookup failure must not kill this long-running
            # updater; keep the current list and retry next period.
            logging.exception('Failed to fetch the list of shards')
        else:
            with shards_lock:
                current_shards = set(shards)
                rm_shards = current_shards - new_shards
                add_shards = new_shards - current_shards

                for s in rm_shards:
                    shards.remove(s)

                if add_shards:
                    shards.extend(add_shards)

            if rm_shards:
                logging.info('Servers left production: %s', str(rm_shards))

            if add_shards:
                logging.info('Servers entered production: %s',
                        str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            if stop_event is None:
                time.sleep(wait_time)
            else:
                # Returns early once stop is requested, so close() takes
                # effect promptly instead of after up to `period` seconds.
                stop_event.wait(wait_time)
84
85
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    Every `period` seconds, runs afe_rpc_call() on each distinct entry of
    `servers` and `shards` using a multiprocessing.Pool, then increments
    METRIC_TICK. Loops until stop_event is set, or forever if it is None.
    The worker pool is torn down when the loop exits.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll; None means poll servers only
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
    try:
        while stop_event is None or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # Snapshot under the lock; the shard list may be mutated
                # concurrently by another process holding the same lock.
                all_servers = set(servers)
                if shards is not None:
                    all_servers.update(shards)

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            try:
                pool.map(afe_rpc_call, all_servers)
            except Exception:
                # Per-host errors are handled inside afe_rpc_call; anything
                # reaching here is unexpected, so log it and keep polling.
                logging.exception('Server polling pass failed')

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                if stop_event is None:
                    time.sleep(wait_time)
                else:
                    # Wake up as soon as a stop is requested.
                    stop_event.wait(wait_time)
    finally:
        # Don't leak the pool's worker processes once polling ends.
        pool.terminate()
        pool.join()
113
114
class RpcFlightRecorder(object):
    """Monitors a list of AFE servers, and optionally their shards.

    start() launches a 'poll_rpc_servers' process and, when with_shards is
    set, an 'update_shards' process. Both share a manager-backed shard list
    and a manager-backed RLock created in __init__.
    """

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: seconds between polls of all services
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        # Manager-backed so the child processes operate on shared objects.
        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        # Filled in by the update_shards process, read by the poller.
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=[self._shards, self._servers_lock],
                    kwargs={'stop_event': self._stop_event})
            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=[self._servers, self._servers_lock],
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})
        self._poll_rpc_server_ps.start()

    def close(self):
        """Signal all sub processes to stop; does not wait for them."""
        self._stop_event.set()

    def terminate(self):
        """Forcibly stop and reap all sub processes, then stop the manager."""
        self.close()
        processes = [ps for ps in (self._poll_rpc_server_ps,
                                   self._update_shards_ps)
                     if ps is not None]
        for ps in processes:
            ps.terminate()
        # Reap the terminated children so they don't linger as zombies.
        for ps in processes:
            ps.join()

        if self._manager:
            self._manager.shutdown()

    # Backward-compatible alias for the original (misspelled) method name.
    termitate = terminate

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process's join() in turn, so the
                        total wait can exceed timeout"""
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
189
190def _failed(fields, msg_str, reason, err=None):
191    """Mark current run failed
192
193    @param fields, ts_mon fields to mark as failed
194    @param msg_str, message string to be filled
195    @param reason: why it failed
196    @param err: optional error to log more debug info
197    """
198    fields['success'] = False
199    fields['failure_reason'] = reason
200    logging.warning("%s failed - %s", msg_str, reason)
201    if err:
202        logging.debug("%s fail_err - %s", msg_str, str(err))
203
class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend.

    Each rpc is timed with metrics.SecondsTimer on METRIC_RPC_CALL_DURATIONS
    using fields target_hostname, command, success and failure_reason.
    """

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        # Base ts_mon fields shared by every rpc recorded for this host.
        self._metric_fields = {'target_hostname': self._hostname}

    @staticmethod
    def _failure_reason(exc):
        """Return the FAILURE_REASONS label for exc, or None if unknown.

        Walks the exception's MRO most-specific first, so a subclass of a
        listed type gets that type's label. This matches the isinstance
        semantics the `except urllib2.HTTPError` clause already has, instead
        of recognizing only the exact listed type.

        @param exc: exception raised by an rpc call
        """
        for klass in type(exc).__mro__:
            if klass in FAILURE_REASONS:
                return FAILURE_REASONS[klass]
        return None

    def run_cmd(self, cmd, expected=None):
        """Runs rpc command and log metrics

        `f` below is the value yielded by metrics.SecondsTimer. _failed()
        flips its success/failure_reason entries, presumably so the recorded
        duration carries the outcome (confirm against chromite.lib.metrics).

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc; None accepts any result
        @raises: any exception other than urllib2.HTTPError that is not
                 classified by FAILURE_REASONS is marked 'Unknown', logged
                 and re-raised.
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                fields=dict(metric_fields), scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib2.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                reason = self._failure_reason(e)
                _failed(f, msg_str,
                        'Unknown' if reason is None else reason, err=e)

                if reason is None:
                    # Unexpected error: propagate so the caller (e.g.
                    # afe_rpc_call) can count it as a monitor error.
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Run the health-check rpcs against the server.

        Calls get_server_time (any response accepted) and then ping_db
        (expects [True]). Outcomes are reported via metrics and logs;
        nothing is returned. An unclassified exception from the first call
        propagates and skips the second.
        """
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])
257
258
def get_parser():
    """Returns argparse parser for this script's command line."""
    parser = commandline.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')

    # The value is a period (seconds between polling passes), not a rate.
    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Seconds between polls of AFE servers')

    parser.add_argument('--no-shards', action='store_false', dest='with_shards',
                        help='Disable shard updating')

    return parser
273
274
def main(argv):
    """Main function

    Parses argv and defaults --afe to SERVER/global_afe_hostname from the
    global config. It then runs an RpcFlightRecorder until its processes
    exit. The recorder is torn down on the way out, including when join()
    is interrupted (e.g. KeyboardInterrupt).

    @param argv: commandline arguments passed, including argv[0]
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                        'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)
        try:
            flight_recorder.start()
            flight_recorder.join()
        finally:
            # Stop the child processes and the manager even if join() is
            # interrupted, so nothing is left running after we exit.
            flight_recorder.termitate()


if __name__ == '__main__':
    main(sys.argv)
300