1# Copyright 2013 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, 10# software distributed under the License is distributed on an 11# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 12# either express or implied. See the License for the specific 13# language governing permissions and limitations under the License. 14 15"""Util functions and classes for cloudstorage_api.""" 16 17 18 19__all__ = ['set_default_retry_params', 20 'RetryParams', 21 ] 22 23import copy 24import httplib 25import logging 26import math 27import os 28import threading 29import time 30import urllib 31 32 33try: 34 from google.appengine.api import app_identity 35 from google.appengine.api import urlfetch 36 from google.appengine.datastore import datastore_rpc 37 from google.appengine.ext import ndb 38 from google.appengine.ext.ndb import eventloop 39 from google.appengine.ext.ndb import tasklets 40 from google.appengine.ext.ndb import utils 41 from google.appengine import runtime 42 from google.appengine.runtime import apiproxy_errors 43except ImportError: 44 from google.appengine.api import app_identity 45 from google.appengine.api import urlfetch 46 from google.appengine.datastore import datastore_rpc 47 from google.appengine import runtime 48 from google.appengine.runtime import apiproxy_errors 49 from google.appengine.ext import ndb 50 from google.appengine.ext.ndb import eventloop 51 from google.appengine.ext.ndb import tasklets 52 from google.appengine.ext.ndb import utils 53 54 55_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError, 56 apiproxy_errors.Error, 57 app_identity.InternalError, 58 app_identity.BackendDeadlineExceeded) 59 60_thread_local_settings = threading.local() 
# Only initializes the attribute for the thread that imports this module;
# other threads see no attribute at all (see _get_default_retry_params).
_thread_local_settings.default_retry_params = None


def set_default_retry_params(retry_params):
  """Set a default RetryParams for the current thread and current request.

  A shallow copy of retry_params is stored, so later changes to the
  argument's attributes do not affect the stored default.

  Args:
    retry_params: a RetryParams instance.
  """
  _thread_local_settings.default_retry_params = copy.copy(retry_params)


def _get_default_retry_params():
  """Get default RetryParams for current request and current thread.

  Falls back to a fresh RetryParams() when no default was set on this
  thread, or when the stored default was created during a different
  request (see RetryParams.belong_to_current_request).

  Returns:
    A new instance of the default RetryParams.
  """
  # getattr with a default is required: threading.local attributes are
  # per-thread, so threads other than the importing one have no
  # 'default_retry_params' attribute until set_default_retry_params runs.
  default = getattr(_thread_local_settings, 'default_retry_params', None)
  if default is None or not default.belong_to_current_request():
    return RetryParams()
  else:
    return copy.copy(default)


def _quote_filename(filename):
  """Quotes filename to use as a valid URI path.

  Args:
    filename: user provided filename. /bucket/filename.

  Returns:
    The filename properly quoted to use as URI's path component.
  """
  return urllib.quote(filename)


def _unquote_filename(filename):
  """Unquotes a valid URI path back to its filename.

  This is the opposite of _quote_filename.

  Args:
    filename: a quoted filename. /bucket/some%20filename.

  Returns:
    The filename unquoted.
  """
  return urllib.unquote(filename)


def _should_retry(resp):
  """Given a urlfetch response, decide whether to retry that request.

  Args:
    resp: an object with an integer ``status_code`` attribute.

  Returns:
    True for 408 (Request Timeout) and for any 5xx status code.
  """
  return (resp.status_code == httplib.REQUEST_TIMEOUT or
          500 <= resp.status_code < 600)


class _RetryWrapper(object):
  """A wrapper that wraps retry logic around any tasklet."""

  def __init__(self,
               retry_params,
               retriable_exceptions=_RETRIABLE_EXCEPTIONS,
               should_retry=lambda r: False):
    """Init.

    Args:
      retry_params: a RetryParams instance. Its delay() method decides
        whether to retry and how long to wait before each retry.
      retriable_exceptions: an exception class or a tuple of exception
        classes that are retriable. It is used directly in an ``except``
        clause, so it must not be a list.
      should_retry: a function that takes a result from the tasklet and returns
        a boolean. True if the result should be retried.
    """
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry

  @ndb.tasklet
  def run(self, tasklet, **kwds):
    """Run a tasklet with retry.

    The retry should be transparent to the caller: if no results
    are successful, the exception or result from the last retry is returned
    to the caller.

    Args:
      tasklet: the tasklet to run.
      **kwds: keywords arguments to run the tasklet.

    Raises:
      The exception from running the tasklet. runtime.DeadlineExceededError
      is never retried and is re-raised immediately.

    Returns:
      The result from running the tasklet.
    """
    start_time = time.time()
    n = 1

    while True:
      e = None
      result = None
      got_result = False

      try:
        result = yield tasklet(**kwds)
        got_result = True
        if not self.should_retry(result):
          raise ndb.Return(result)
      except runtime.DeadlineExceededError:
        logging.debug(
            'Tasklet has exceeded request deadline after %s seconds total',
            time.time() - start_time)
        raise
      except self.retriable_exceptions as err:
        # Rebind explicitly: on Python 3 the "as" target is unbound when the
        # except clause ends, but e is still needed below.
        e = err

      if n == 1:
        logging.debug('Tasklet is %r', tasklet)

      delay = self.retry_params.delay(n, start_time)

      # A non-positive delay means RetryParams says to give up: surface the
      # last result or the last exception to the caller.
      if delay <= 0:
        logging.debug(
            'Tasklet failed after %s attempts and %s seconds in total',
            n, time.time() - start_time)
        if got_result:
          raise ndb.Return(result)
        elif e is not None:
          raise e
        else:
          # Explicit raise (not assert) so the guard survives python -O.
          raise AssertionError('Should never reach here.')

      if got_result:
        logging.debug(
            'Got result %r from tasklet.', result)
      else:
        logging.debug(
            'Got exception "%r" from tasklet.', e)
      logging.debug('Retry in %s seconds.', delay)
      n += 1
      yield tasklets.sleep(delay)


class RetryParams(object):
  """Retry configuration parameters."""

  _DEFAULT_USER_AGENT = 'App Engine Python GCS Client'

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=3,
               max_retries=6,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False,
               _user_agent=None):
    """Init.

    This object is unique per request per thread.

    Library will retry according to this setting when App Engine Server
    can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    5xx (500-599) response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.
      _user_agent: The user agent string that you want to use in your requests.

    Raises:
      TypeError: an argument has the wrong type.
      ValueError: an argument is negative, NaN, or zero where zero is not
        allowed.
    """
    self.backoff_factor = self._check('backoff_factor', backoff_factor)
    self.initial_delay = self._check('initial_delay', initial_delay)
    self.max_delay = self._check('max_delay', max_delay)
    self.max_retry_period = self._check('max_retry_period', max_retry_period)
    self.max_retries = self._check('max_retries', max_retries, True, int)
    self.min_retries = self._check('min_retries', min_retries, True, int)
    if self.min_retries > self.max_retries:
      self.min_retries = self.max_retries

    self.urlfetch_timeout = None
    if urlfetch_timeout is not None:
      self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = self._check('save_access_token', save_access_token,
                                         True, bool)
    self._user_agent = _user_agent or self._DEFAULT_USER_AGENT

    # Remembered so belong_to_current_request() can detect reuse of this
    # object in a later request.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  # NOTE(review): __eq__ is overridden but __hash__ is not, so under
  # Python 2 two equal instances may still hash differently.
  def __eq__(self, other):
    if not isinstance(other, self.__class__):
      return False
    return self.__dict__ == other.__dict__

  def __ne__(self, other):
    return not self.__eq__(other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Check init arguments.

    Args:
      name: name of the argument. For logging purpose.
      val: value. Value has to be a non-negative number (not NaN).
      can_be_zero: whether value can be zero.
      val_type: Python type of the value. float also accepts int.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    valid_types = [val_type]
    if val_type is float:
      valid_types.append(int)

    # Exact type match on purpose: e.g. bool is rejected where int is wanted.
    if type(val) not in valid_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    # NaN slips past both comparisons below (every comparison with NaN is
    # False), and would silently disable time limits or produce NaN delays.
    if val_type is float and math.isnan(val):
      raise ValueError(
          'Value for parameter %s can not be NaN' % name)
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if not can_be_zero and val == 0:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    """True if this object was created while handling the current request.

    Compares the REQUEST_LOG_ID environment variable now with the value
    captured in __init__.
    """
    return os.getenv('REQUEST_LOG_ID') == self._request_id

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    if (n > self.max_retries or
        (n > self.min_retries and
         time.time() - start_time > self.max_retry_period)):
      return -1
    try:
      backoff = math.pow(self.backoff_factor, n - 1)
    except OverflowError:
      # backoff_factor ** (n - 1) exceeds the float range (only possible
      # when backoff_factor > 1); the capped delay is max_delay regardless.
      return self.max_delay
    return min(backoff * self.initial_delay, self.max_delay)


def _run_until_rpc():
  """Eagerly evaluate tasklets until it is blocking on some RPC.

  Usually ndb eventloop el isn't run until some code calls future.get_result().

  When an async tasklet is called, the tasklet wrapper evaluates the tasklet
  code into a generator, enqueues a callback _help_tasklet_along onto
  the el.current queue, and returns a future.

  _help_tasklet_along, when called by the el, will
  get one yielded value from the generator. If the value is another future,
  set up a callback _on_future_complete to invoke _help_tasklet_along
  when the dependent future fulfills. If the value is a RPC, set up a
  callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
  Thus _help_tasklet_along drills down
  the chain of futures until some future is blocked by RPC. El runs
  all callbacks and constantly checks pending RPC status.

  This function only drains el.current; it returns as soon as that queue
  is empty.
  """
  el = eventloop.get_event_loop()
  while el.current:
    el.run0()


def _eager_tasklet(tasklet):
  """Decorator to turn tasklet to run eagerly.

  The wrapped call creates the future as usual, then runs the event loop's
  immediate callbacks (_run_until_rpc) before returning that future.
  """

  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwds):
    fut = tasklet(*args, **kwds)
    _run_until_rpc()
    return fut

  return eager_wrapper