1# Copyright 2013 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing,
10# software distributed under the License is distributed on an
11# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12# either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14
15"""Util functions and classes for cloudstorage_api."""
16
17
18
# Public API of this module; all other names are implementation details.
__all__ = ['set_default_retry_params',
           'RetryParams',
          ]
22
23import copy
24import httplib
25import logging
26import math
27import os
28import threading
29import time
30import urllib
31
32
33try:
34  from google.appengine.api import app_identity
35  from google.appengine.api import urlfetch
36  from google.appengine.datastore import datastore_rpc
37  from google.appengine.ext import ndb
38  from google.appengine.ext.ndb import eventloop
39  from google.appengine.ext.ndb import tasklets
40  from google.appengine.ext.ndb import utils
41  from google.appengine import runtime
42  from google.appengine.runtime import apiproxy_errors
43except ImportError:
44  from google.appengine.api import app_identity
45  from google.appengine.api import urlfetch
46  from google.appengine.datastore import datastore_rpc
47  from google.appengine import runtime
48  from google.appengine.runtime import apiproxy_errors
49  from google.appengine.ext import ndb
50  from google.appengine.ext.ndb import eventloop
51  from google.appengine.ext.ndb import tasklets
52  from google.appengine.ext.ndb import utils
53
54
# Transient App Engine failures that warrant an automatic retry.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
                         apiproxy_errors.Error,
                         app_identity.InternalError,
                         app_identity.BackendDeadlineExceeded)

# Per-thread storage for the default RetryParams; written by
# set_default_retry_params() and read by _get_default_retry_params().
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
62
63
def set_default_retry_params(retry_params):
  """Install retry_params as the default for the current thread and request.

  A shallow copy is stored, so later mutation of the caller's instance
  does not affect the stored default.
  """
  params_snapshot = copy.copy(retry_params)
  _thread_local_settings.default_retry_params = params_snapshot
67
68
def _get_default_retry_params():
  """Get default RetryParams for current request and current thread.

  Returns:
    A new instance of the default RetryParams.
  """
  stored = getattr(_thread_local_settings, 'default_retry_params', None)
  # A default set during an earlier request is stale; fall back to a
  # fresh RetryParams in that case.
  if stored is not None and stored.belong_to_current_request():
    return copy.copy(stored)
  return RetryParams()
80
81
def _quote_filename(filename):
  """Percent-encode a filename for use as a URI path.

  Args:
    filename: user provided filename. /bucket/filename.

  Returns:
    The filename properly quoted to use as URI's path component.
  """
  quoted = urllib.quote(filename)
  return quoted
92
93
def _unquote_filename(filename):
  """Decode a percent-encoded URI path back into its filename.

  Inverse of _quote_filename.

  Args:
    filename: a quoted filename. /bucket/some%20filename.

  Returns:
    The filename unquoted.
  """
  unquoted = urllib.unquote(filename)
  return unquoted
106
107
def _should_retry(resp):
  """Given a urlfetch response, decide whether to retry that request."""
  status = resp.status_code
  # Retry on 408 (request timeout) and on any 5xx server error.
  return status == httplib.REQUEST_TIMEOUT or 500 <= status < 600
113
114
class _RetryWrapper(object):
  """A wrapper that wraps retry logic around any tasklet."""

  def __init__(self,
               retry_params,
               retriable_exceptions=_RETRIABLE_EXCEPTIONS,
               should_retry=lambda r: False):
    """Init.

    Args:
      retry_params: an RetryParams instance.
      retriable_exceptions: a list of exception classes that are retriable.
      should_retry: a function that takes a result from the tasklet and returns
        a boolean. True if the result should be retried.
    """
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry

  @ndb.tasklet
  def run(self, tasklet, **kwds):
    """Run a tasklet with retry.

    The retry should be transparent to the caller: if no results
    are successful, the exception or result from the last retry is returned
    to the caller.

    Args:
      tasklet: the tasklet to run.
      **kwds: keywords arguments to run the tasklet.

    Raises:
      The exception from running the tasklet.

    Returns:
      The result from running the tasklet.
    """
    start_time = time.time()
    # Attempt counter; the first attempt is 1 (matches RetryParams.delay).
    n = 1

    while True:
      # Per-attempt bookkeeping: exactly one of (got_result, e) records the
      # outcome of this attempt. e stays None if the tasklet did not raise.
      e = None
      result = None
      got_result = False

      try:
        result = yield tasklet(**kwds)
        got_result = True
        if not self.should_retry(result):
          # Done: ndb.Return is how a tasklet delivers its final value.
          raise ndb.Return(result)
      except runtime.DeadlineExceededError:
        # The request itself is out of time; retrying cannot help, so
        # propagate immediately instead of sleeping.
        logging.debug(
            'Tasklet has exceeded request deadline after %s seconds total',
            time.time() - start_time)
        raise
      except self.retriable_exceptions, e:
        # Transient failure; fall through to the delay/retry logic below.
        pass

      if n == 1:
        logging.debug('Tasklet is %r', tasklet)

      # delay() returns a non-positive value once the retry budget
      # (attempt count or total elapsed time) is exhausted.
      delay = self.retry_params.delay(n, start_time)

      if delay <= 0:
        logging.debug(
            'Tasklet failed after %s attempts and %s seconds in total',
            n, time.time() - start_time)
        if got_result:
          # Out of retries but the last attempt produced a (retry-worthy)
          # result: return it rather than raising.
          raise ndb.Return(result)
        elif e is not None:
          # Out of retries after an exception: re-raise the last one.
          raise e
        else:
          assert False, 'Should never reach here.'

      if got_result:
        logging.debug(
            'Got result %r from tasklet.', result)
      else:
        logging.debug(
            'Got exception "%r" from tasklet.', e)
      logging.debug('Retry in %s seconds.', delay)
      n += 1
      yield tasklets.sleep(delay)
198
199
class RetryParams(object):
  """Retry configuration parameters."""

  _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=3,
               max_retries=6,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False,
               _user_agent=None):
    """Initialize retry parameters.

    Each instance is unique per request per thread.

    The library retries according to these settings when App Engine
    Server can't call urlfetch, when urlfetch times out, or when the
    response is a 408 or in the 500-600 range.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. Automatically capped
        by max_retries.
      max_retries: max number of times to retry. 0 disables retry.
      max_retry_period: max total seconds spent on retry. Retry stops
        once this period has passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds, or None to let
        the urlfetch module pick the value.
      save_access_token: persist access token to datastore to avoid
        excessive usage of the GetAccessToken API. The token is usually
        cached in process and in memcache; in some cases memcache isn't
        very reliable.
      _user_agent: the user agent string to use in requests.
    """
    self.backoff_factor = self._check('backoff_factor', backoff_factor)
    self.initial_delay = self._check('initial_delay', initial_delay)
    self.max_delay = self._check('max_delay', max_delay)
    self.max_retry_period = self._check('max_retry_period', max_retry_period)
    self.max_retries = self._check('max_retries', max_retries, True, int)
    self.min_retries = self._check('min_retries', min_retries, True, int)
    # min can never exceed max; silently clamp rather than raise.
    self.min_retries = min(self.min_retries, self.max_retries)

    if urlfetch_timeout is None:
      self.urlfetch_timeout = None
    else:
      self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = self._check('save_access_token', save_access_token,
                                         True, bool)
    self._user_agent = _user_agent or self._DEFAULT_USER_AGENT

    # Remembered so belong_to_current_request() can detect reuse across
    # requests.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  def __eq__(self, other):
    return (isinstance(other, self.__class__) and
            self.__dict__ == other.__dict__)

  def __ne__(self, other):
    return not (self == other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Validate one init argument.

    Args:
      name: name of the argument, for error messages.
      val: value. Has to be a non negative number.
      can_be_zero: whether value can be zero.
      val_type: Python type of the value.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    acceptable_types = [val_type]
    if val_type is float:
      acceptable_types.append(int)

    # Exact type check on purpose: isinstance would let bool pass as int.
    if type(val) not in acceptable_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if not can_be_zero and val == 0:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    # True iff this instance was created during the current request.
    return self._request_id == os.getenv('REQUEST_LOG_ID')

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    if n > self.max_retries:
      return -1
    if (n > self.min_retries and
        time.time() - start_time > self.max_retry_period):
      return -1
    # Exponential backoff, capped at max_delay.
    backoff = math.pow(self.backoff_factor, n-1) * self.initial_delay
    return min(backoff, self.max_delay)
319
320
def _run_until_rpc():
  """Eagerly evaluate tasklets until it is blocking on some RPC.

  Usually the ndb eventloop isn't run until some code calls
  future.get_result().

  Calling an async tasklet evaluates its code into a generator, enqueues
  a _help_tasklet_along callback onto the eventloop's current queue, and
  returns a future. When the eventloop runs that callback, it pulls one
  yielded value from the generator: if the value is another future, a
  _on_future_complete callback re-invokes _help_tasklet_along when that
  future fulfills; if the value is an RPC, _on_rpc_complete does the same
  when the RPC fulfills. _help_tasklet_along therefore drills down the
  chain of futures until some future is blocked by an RPC, while the
  eventloop runs all callbacks and constantly checks pending RPC status.
  """
  loop = eventloop.get_event_loop()
  # run0() runs one queued callback; drain the queue until only RPC waits
  # remain.
  while loop.current:
    loop.run0()
342
343
def _eager_tasklet(tasklet):
  """Decorator to turn tasklet to run eagerly."""

  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwargs):
    future = tasklet(*args, **kwargs)
    # Drive the event loop right away so work proceeds until the tasklet
    # actually blocks on an RPC, instead of waiting for get_result().
    _run_until_rpc()
    return future

  return eager_wrapper
354