1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import math
7import threading
8
9import common
10from autotest_lib.client.common_lib import env
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib import utils
13from autotest_lib.client.common_lib.cros import retry
14from autotest_lib.frontend.afe.json_rpc import proxy
15from autotest_lib.server import frontend
16try:
17    from chromite.lib import retry_util
18    from chromite.lib import timeout_util
19except ImportError:
20    logging.warn('Unable to import chromite.')
21    retry_util = None
22    timeout_util = None
23
24try:
25    from chromite.lib import metrics
26except ImportError:
27    logging.warn('Unable to import metrics from chromite.')
28    metrics = utils.metrics_mock
29
30
31def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
32    """Compute the number of retry attempts for use with chromite.retry_util.
33
34    @param backoff: The exponential backoff factor.
35    @param timeout_min: The maximum amount of time (in minutes) to sleep.
36    @param delay_sec: The amount to sleep (in seconds) between each attempt.
37
38    @return: The number of retry attempts in the case of exponential backoff.
39    """
40    # Estimate the max_retry in the case of exponential backoff:
41    # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
42    # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) )
43    # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
44    # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
45    total_sleep = timeout_min * 60
46    numerator = math.log10(1-(total_sleep/delay_sec)*(1-backoff))
47    denominator = math.log10(backoff)
48    return int(math.ceil(numerator/denominator))
49
50
51class RetryingAFE(frontend.AFE):
52    """Wrapper around frontend.AFE that retries all RPCs.
53
54    Timeout for retries and delay between retries are configurable.
55    """
56    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
57        """Constructor
58
59        @param timeout_min: timeout in minutes until giving up.
60        @param delay_sec: pre-jittered delay between retries in seconds.
61        """
62        self.timeout_min = timeout_min
63        self.delay_sec = delay_sec
64        super(RetryingAFE, self).__init__(**dargs)
65
66
67    def set_timeout(self, timeout_min):
68        """Set timeout minutes for the AFE server.
69
70        @param timeout_min: The timeout minutes for AFE server.
71        """
72        self.timeout_min = timeout_min
73
74
75    def run(self, call, **dargs):
76        """Method for running RPC call.
77
78        @param call: A string RPC call.
79        @param dargs: the parameters of the RPC call.
80        """
81        if retry_util is None:
82            raise ImportError('Unable to import chromite. Please consider to '
83                              'run build_externals to build site packages.')
84        # exc_retry: We retry if this exception is raised.
85        # blacklist: Exceptions that we raise immediately if caught.
86        exc_retry = Exception
87        blacklist = (ImportError, error.RPCException, proxy.JSONRPCException,
88                     timeout_util.TimeoutError, error.ControlFileNotFound)
89        backoff = 2
90        max_retry = convert_timeout_to_retry(backoff, self.timeout_min,
91                                             self.delay_sec)
92
93        def _run(self, call, **dargs):
94            return super(RetryingAFE, self).run(call, **dargs)
95
96        def handler(exc):
97            """Check if exc is an exc_retry or if it's blacklisted.
98
99            @param exc: An exception.
100
101            @return: True if exc is an exc_retry and is not
102                     blacklisted. False otherwise.
103            """
104            is_exc_to_check = isinstance(exc, exc_retry)
105            is_blacklisted = isinstance(exc, blacklist)
106            return is_exc_to_check and not is_blacklisted
107
108        # If the call is not in main thread, signal can't be used to abort the
109        # call. In that case, use a basic retry which does not enforce timeout
110        # if the process hangs.
111        @retry.retry(Exception, timeout_min=self.timeout_min,
112                     delay_sec=self.delay_sec,
113                     blacklist=[ImportError, error.RPCException,
114                                proxy.ValidationError])
115        def _run_in_child_thread(self, call, **dargs):
116            return super(RetryingAFE, self).run(call, **dargs)
117
118        if isinstance(threading.current_thread(), threading._MainThread):
119            # Set the keyword argument for GenericRetry
120            dargs['sleep'] = self.delay_sec
121            dargs['backoff_factor'] = backoff
122            # timeout_util.Timeout fundamentally relies on sigalrm, and doesn't
123            # work at all in wsgi environment (just emits logs spam). So, don't
124            # use it in wsgi.
125            try:
126                if env.IN_MOD_WSGI:
127                    return retry_util.GenericRetry(handler, max_retry, _run,
128                                                   self, call, **dargs)
129                with timeout_util.Timeout(self.timeout_min * 60):
130                    return retry_util.GenericRetry(handler, max_retry, _run,
131                                                   self, call, **dargs)
132            except timeout_util.TimeoutError:
133                c = metrics.Counter(
134                        'chromeos/autotest/retrying_afe/retry_timeout')
135                # Reserve field job_details for future use.
136                f = {'destination_server': self.server.split(':')[0],
137                     'call': call,
138                     'job_details': ''}
139                c.increment(fields=f)
140                raise
141        else:
142            return _run_in_child_thread(self, call, **dargs)
143
144
145class RetryingTKO(frontend.TKO):
146    """Wrapper around frontend.TKO that retries all RPCs.
147
148    Timeout for retries and delay between retries are configurable.
149    """
150    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
151        """Constructor
152
153        @param timeout_min: timeout in minutes until giving up.
154        @param delay_sec: pre-jittered delay between retries in seconds.
155        """
156        self.timeout_min = timeout_min
157        self.delay_sec = delay_sec
158        super(RetryingTKO, self).__init__(**dargs)
159
160
161    def run(self, call, **dargs):
162        """Method for running RPC call.
163
164        @param call: A string RPC call.
165        @param dargs: the parameters of the RPC call.
166        """
167        @retry.retry(Exception, timeout_min=self.timeout_min,
168                     delay_sec=self.delay_sec,
169                     blacklist=[ImportError, error.RPCException,
170                                proxy.ValidationError])
171        def _run(self, call, **dargs):
172            return super(RetryingTKO, self).run(call, **dargs)
173        return _run(self, call, **dargs)
174