1#!/usr/bin/env python2
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Standalone service to monitor AFE servers and report to ts_mon"""
7
8import argparse
9import sys
10import time
11import logging
12import multiprocessing
13from six.moves import urllib
14
15import common
16from autotest_lib.client.common_lib import global_config
17from autotest_lib.frontend.afe.json_rpc import proxy
18from autotest_lib.server import frontend
19# import needed to setup host_attributes
20# pylint: disable=unused-import
21from autotest_lib.server import site_host_attributes
22from autotest_lib.site_utils import server_manager_utils
23from chromite.lib import metrics
24from chromite.lib import ts_mon_config
25
# Prefix for all blackbox AFE rpc monitoring metrics reported to ts_mon.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Distribution of per-command rpc call durations (see AfeMonitor.run_cmd).
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
# Heartbeat counter, bumped once per completed polling cycle.
METRIC_TICK = METRIC_ROOT + '/tick'
# Counter of unexpected exceptions raised while polling a host.
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

# Maps known exception types to the short reason string reported in the
# metric's 'failure_reason' field; any other type is reported as 'Unknown'.
FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
        }
34
def afe_rpc_call(hostname):
    """Perform one rpc call set on server.

    Any exception is counted in METRIC_MONITOR_ERROR and logged rather
    than propagated, so a single bad host cannot kill the polling pool.

    @param hostname: server's hostname to poll
    """
    afe_monitor = AfeMonitor(hostname)
    try:
        afe_monitor.run()
    except Exception:
        # Deliberately broad: this runs inside a multiprocessing pool
        # worker; record the failure and keep the worker alive.
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
47
48
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keep the shared list of shards in sync with production.

    Polls server_manager_utils every `period` seconds and reconciles
    `shards` in place so other processes observe the changes.

    @param shards: shared list of shards to be updated in place
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())

        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            # Mutate the shared list in place so the multiprocessing
            # proxy propagates the change; iterating an empty set is a
            # no-op, so no emptiness guard is needed.
            for s in rm_shards:
                shards.remove(s)
            if add_shards:
                shards.extend(add_shards)

        # Log outside the lock to keep the critical section short.
        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))

        if add_shards:
            logging.info('Servers entered production: %s',
                    str(add_shards))

        # Sleep only the remainder of the period so polls keep a fixed
        # cadence regardless of how long the update itself took.
        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)
85
86
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: optional list of shards to poll
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # Guard the union: the documented default shards=None
                # would otherwise raise TypeError in set.union().
                all_servers = set(servers).union(shards or [])

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            # Sleep only the remainder of the period to keep a fixed
            # polling cadence.
            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                time.sleep(wait_time)
    finally:
        # Ensure worker processes do not outlive the poller.
        pool.close()
        pool.join()
114
115
class RpcFlightRecorder(object):
    """Monitors a list of AFE"""

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        # Manager-backed containers so the child processes share state.
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()

        # Subprocess handles, populated by start().
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=(self._shards, self._servers_lock),
                    kwargs={'stop_event': self._stop_event})
            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=(self._servers, self._servers_lock),
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})
        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process, so could be >timeout"""
        # Join the poller first, then the shard updater (same order as
        # they matter for shutdown).
        for proc in (self._poll_rpc_server_ps, self._update_shards_ps):
            if proc:
                proc.join(timeout)
177
178def _failed(fields, msg_str, reason, err=None):
179    """Mark current run failed
180
181    @param fields, ts_mon fields to mark as failed
182    @param msg_str, message string to be filled
183    @param reason: why it failed
184    @param err: optional error to log more debug info
185    """
186    fields['success'] = False
187    fields['failure_reason'] = reason
188    logging.warning("%s failed - %s", msg_str, reason)
189    if err:
190        logging.debug("%s fail_err - %s", msg_str, str(err))
191
class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        self._metric_fields = {'target_hostname': self._hostname}

    def run_cmd(self, cmd, expected=None):
        """Run one rpc command and record metric fields for the outcome.

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc
        """
        # Start from the per-host fields and assume success; _failed()
        # flips these on the timer's mutable field dict if anything
        # goes wrong.
        fields = dict(self._metric_fields)
        fields.update(command=cmd, success=True, failure_reason='')

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                fields=dict(fields), scale=0.001) as f:
            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')
            except urllib.error.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)
            except Exception as e:
                # Known exception types get a stable reason string;
                # anything unexpected is recorded then re-raised.
                reason = FAILURE_REASONS.get(type(e), 'Unknown')
                _failed(f, msg_str, reason, err=e)
                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])
245
246
def get_parser():
    """Returns argparse parser"""
    parser = argparse.ArgumentParser(description=__doc__)

    # (flags, kwargs) specs for every supported command-line option.
    arg_specs = [
        (('-a', '--afe'),
         {'action': 'append', 'default': [],
          'help': 'Autotest FrontEnd server to monitor'}),
        (('-p', '--poll-period'),
         {'type': int, 'default': 60,
          'help': 'Frequency to poll AFE servers'}),
        (('--no-shards',),
         {'action': 'store_false', 'dest': 'with_shards',
          'help': 'Disable shard updating'}),
    ]
    for flags, kwargs in arg_specs:
        parser.add_argument(*flags, **kwargs)

    return parser
261
262
def main(argv):
    """Main function

    @param argv: commandline arguments passed
    """
    options = get_parser().parse_args(argv[1:])

    # When no AFE was given, fall back to the global AFE hostname from
    # the shadow config.
    if not options.afe:
        default_afe = global_config.global_config.get_config_value(
                        'SERVER', 'global_afe_hostname', default='cautotest')
        options.afe = [default_afe]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        recorder = RpcFlightRecorder(options.afe,
                                     with_shards=options.with_shards,
                                     poll_period=options.poll_period)
        recorder.start()
        recorder.join()
284
285
# Script entry point; main() strips argv[0] itself.
if __name__ == '__main__':
    main(sys.argv)
288