#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions for use with the mapreduce library."""

# pylint: disable=g-bad-name


__all__ = [
    "create_datastore_write_config",
    "for_name",
    "get_queue_name",
    "get_short_name",
    "handler_for_name",
    "is_generator",
    "parse_bool",
    "total_seconds",
    "try_serialize_handler",
    "try_deserialize_handler",
    "CALLBACK_MR_ID_TASK_HEADER",
    "strip_prefix_from_items"
    ]

import inspect
import os
import pickle
import random
import sys
import time
import types

from google.appengine.ext import ndb

from google.appengine.datastore import datastore_rpc
from mapreduce import parameters

# Taskqueue task header for the mapreduce id. Used internally by MR.
_MR_ID_TASK_HEADER = "AE-MR-ID"
_MR_SHARD_ID_TASK_HEADER = "AE-MR-SHARD-ID"

# Callback task MR ID task header.
CALLBACK_MR_ID_TASK_HEADER = "Mapreduce-Id"


# Ridiculous future UNIX epoch time, roughly 500 years from now.
_FUTURE_TIME = 2**34


def _get_descending_key(gettime=time.time):
  """Returns a key name lexically ordered by time descending.

  This lets us have a key name for use with Datastore entities that returns
  rows in time-descending order when scanned in lexically ascending order,
  allowing us to bypass index building for descending indexes.

  Args:
    gettime: Used for testing.

  Returns:
    A string with a time descending key.
  """
  now_descending = int((_FUTURE_TIME - gettime()) * 100)
  request_id_hash = os.environ.get("REQUEST_ID_HASH")
  if not request_id_hash:
    request_id_hash = str(random.getrandbits(32))
  return "%d%s" % (now_descending, request_id_hash)
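

# Illustrative sketch only, not part of the library API: demonstrates the
# ordering property documented above using hypothetical timestamps. Because
# the numeric prefix shrinks as time advances, keys for newer times sort
# first in an ascending lexical scan.
def _example_descending_key_ordering():
  older = _get_descending_key(gettime=lambda: 1000.0)
  newer = _get_descending_key(gettime=lambda: 2000.0)
  assert newer < older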


def _get_task_host():
  """Get the Host header value for all mr tasks.

  The task Host header determines which module and version this task is
  routed to.

  The current version id format is: v7.368834058928280579
  The current module id is just the module's name. It could be "default".
  The default version hostname is app_id.appspot.com.

  Returns:
    A complete host name in the format version.module.app_id.appspot.com, or
    just version.app_id.appspot.com if the module is the default module. The
    reason is that if an app doesn't have modules enabled and the url is
    "version.default.app_id", "version" is ignored and "default" is used as
    the version. If the "default" version doesn't exist, the url is routed to
    the default version.
  """
  version = os.environ["CURRENT_VERSION_ID"].split(".")[0]
  default_host = os.environ["DEFAULT_VERSION_HOSTNAME"]
  module = os.environ["CURRENT_MODULE_ID"]
  if os.environ["CURRENT_MODULE_ID"] == "default":
    return "%s.%s" % (version, default_host)
  return "%s.%s.%s" % (version, module, default_host)


def _get_task_headers(map_job_id,
                      mr_id_header_key=_MR_ID_TASK_HEADER):
  """Get headers for all mr tasks.

  Args:
    map_job_id: map job id.
    mr_id_header_key: the key to set the mr id with.

  Returns:
    A dictionary of all headers.
  """
  return {mr_id_header_key: map_job_id,
          "Host": _get_task_host()}


def _enum(**enums):
  """Helper to create an enum."""
  return type("Enum", (), enums)


def get_queue_name(queue_name):
  """Determine which queue MR should run on.

  How to choose the queue:
  1. If the user provided one, use that.
  2. If we are starting a mr from taskqueue, inherit that queue.
     If it's a special queue, fall back to the default queue.
  3. Default queue.

  If the user is using any MR pipeline interface, pipeline.start takes a
  "queue_name" argument. The pipeline will run on that queue and MR will
  simply inherit the queue_name.

  Args:
    queue_name: queue_name from the user. May be None.

  Returns:
    The queue name to run on.
  """
  if queue_name:
    return queue_name
  queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
                              parameters.config.QUEUE_NAME)
  if len(queue_name) > 1 and queue_name[0:2] == "__":
    # We are currently in some special queue. E.g. __cron.
    return parameters.config.QUEUE_NAME
  else:
    return queue_name


def total_seconds(td):
  """Convert a timedelta to seconds.

  This is patterned after timedelta.total_seconds, which is only
  available in Python 2.7.

  Args:
    td: a timedelta object.

  Returns:
    total seconds within a timedelta. Rounded up to whole seconds.
  """
  secs = td.seconds + td.days * 24 * 3600
  if td.microseconds:
    secs += 1
  return secs
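

# Illustrative sketch only, not part of the library API: unlike
# datetime.timedelta.total_seconds(), any non-zero microsecond component
# rounds the result up to the next whole second.
def _example_total_seconds():
  import datetime
  assert total_seconds(datetime.timedelta(days=1)) == 24 * 3600
  assert total_seconds(datetime.timedelta(seconds=1, microseconds=1)) == 2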


def for_name(fq_name, recursive=False):
  """Find the class/function/method specified by its fully qualified name.

  A fully qualified name can be specified as:
    * <module_name>.<class_name>
    * <module_name>.<function_name>
    * <module_name>.<class_name>.<method_name> (an unbound method will be
      returned in this case).

  for_name works by doing __import__ for <module_name>, and looks for
  <class_name>/<function_name> in the module's __dict__/attrs. If the fully
  qualified name doesn't contain '.', the current module will be used.

  Args:
    fq_name: fully qualified name of something to find.
    recursive: run recursively or not.

  Returns:
    class object or None if fq_name is None.

  Raises:
    ImportError: when the specified module could not be loaded or the class
      was not found in the module.
  """
#  if "." not in fq_name:
#    raise ImportError("'%s' is not a full-qualified name" % fq_name)

  if fq_name is None:
    return

  fq_name = str(fq_name)
  module_name = __name__
  short_name = fq_name

  if fq_name.rfind(".") >= 0:
    (module_name, short_name) = (fq_name[:fq_name.rfind(".")],
                                 fq_name[fq_name.rfind(".") + 1:])

  try:
    result = __import__(module_name, None, None, [short_name])
    return result.__dict__[short_name]
  except KeyError:
    # If we're recursively inside a for_name() chain, then we want to raise
    # this error as a key error so we can report the actual source of the
    # problem. If we're *not* recursively being called, that means the
    # module was found and the specific item could not be loaded, and thus
    # we want to raise an ImportError directly.
    if recursive:
      raise
    else:
      raise ImportError("Could not find '%s' on path '%s'" % (
          short_name, module_name))
  except ImportError:
    # module_name is not actually a module. Try for_name on it to figure
    # out what it is.
    try:
      module = for_name(module_name, recursive=True)
      if hasattr(module, short_name):
        return getattr(module, short_name)
      else:
        # The module was found, but the function component is missing.
        raise KeyError()
    except KeyError:
      raise ImportError("Could not find '%s' on path '%s'" % (
          short_name, module_name))
    except ImportError:
      # This means recursive import attempts failed, thus we will raise the
      # first ImportError we encountered, since it's likely the most accurate.
      pass
    # Raise the original import error that caused all of this, since it is
    # likely the real cause of the overall problem.
    raise


def handler_for_name(fq_name):
  """Resolves and instantiates a handler by fully qualified name.

  First resolves the name using a for_name call. Then, if it resolves to a
  class, instantiates the class; if it resolves to a method, instantiates the
  class and binds the method to the instance.

  Args:
    fq_name: fully qualified name of something to find.

  Returns:
    handler instance which is ready to be called.
  """
  resolved_name = for_name(fq_name)
  if isinstance(resolved_name, (type, types.ClassType)):
    # Create a new instance if this is a type.
    return resolved_name()
  elif isinstance(resolved_name, types.MethodType):
    # Bind the method to a new instance of its class.
    return getattr(resolved_name.im_class(), resolved_name.__name__)
  else:
    return resolved_name


def try_serialize_handler(handler):
  """Try to serialize a map/reduce handler.

  Args:
    handler: handler function/instance. Handler can be a function or an
      instance of a callable class. In the latter case, the handler will
      be serialized across slices to allow users to save state.

  Returns:
    serialized handler string or None.
  """
  if (isinstance(handler, types.InstanceType) or  # old style class
      (isinstance(handler, object) and  # new style class
       not inspect.isfunction(handler) and
       not inspect.ismethod(handler)) and
      hasattr(handler, "__call__")):
    return pickle.dumps(handler)
  return None


def try_deserialize_handler(serialized_handler):
  """Reverse function of try_serialize_handler.

  Args:
    serialized_handler: serialized handler str or None.

  Returns:
    handler instance or None.
  """
  if serialized_handler:
    return pickle.loads(serialized_handler)


def is_generator(obj):
  """Return true if the object is a generator or a generator function.

  Generator function objects provide the same attributes as functions.
  See isfunction.__doc__ for the attribute listing.

  Adapted from Python 2.6.

  Args:
    obj: an object to test.

  Returns:
    True if the object is a generator or a generator function.
  """
  if isinstance(obj, types.GeneratorType):
    return True

  CO_GENERATOR = 0x20
  return bool(((inspect.isfunction(obj) or inspect.ismethod(obj)) and
               obj.func_code.co_flags & CO_GENERATOR))


def get_short_name(fq_name):
  """Returns the last component of the name."""
  return fq_name.split(".")[-1]
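

# Illustrative sketch only, not part of the library API: resolves standard
# library names purely as examples of the fully qualified name formats that
# for_name and handler_for_name accept.
def _example_name_resolution():
  import collections
  import os.path
  # <module_name>.<function_name>
  assert for_name("os.path.join") is os.path.join
  # <module_name>.<class_name>
  assert for_name("collections.Counter") is collections.Counter
  # handler_for_name instantiates a resolved class.
  handler = handler_for_name("collections.Counter")
  assert isinstance(handler, collections.Counter)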


def parse_bool(obj):
  """Return true if the object represents a truth value, false otherwise.

  For bool and numeric objects, uses Python's built-in bool function. For
  str objects, checks the string against a list of possible truth values.

  Args:
    obj: object to determine the boolean value of; expected to be a bool,
      a numeric value, or a str.

  Returns:
    Boolean value according to section 5.1 of the Python docs if the object
    is not a str. For str objects, returns True if the str is in
    TRUTH_VALUE_SET and False otherwise.
    http://docs.python.org/library/stdtypes.html
  """
  if type(obj) is str:
    TRUTH_VALUE_SET = ["true", "1", "yes", "t", "on"]
    return obj.lower() in TRUTH_VALUE_SET
  else:
    return bool(obj)


def create_datastore_write_config(mapreduce_spec):
  """Creates a datastore config to use in write operations.

  Args:
    mapreduce_spec: current mapreduce specification as MapreduceSpec.

  Returns:
    an instance of datastore_rpc.Configuration to use for all write
    operations in the mapreduce.
  """
  force_writes = parse_bool(mapreduce_spec.params.get("force_writes", "false"))
  if force_writes:
    return datastore_rpc.Configuration(force_writes=force_writes)
  else:
    # dev server doesn't support force_writes.
    return datastore_rpc.Configuration()


def _set_ndb_cache_policy():
  """Tell NDB to never cache anything in memcache or in-process.

  This ensures that entities fetched from Datastore input_readers via NDB
  will not bloat up the request memory size and Datastore Puts will avoid
  doing calls to memcache. Without this you get soft memory limit exits,
  which hurts overall throughput.
  """
  ndb_ctx = ndb.get_context()
  ndb_ctx.set_cache_policy(lambda key: False)
  ndb_ctx.set_memcache_policy(lambda key: False)


def _obj_to_path(obj):
  """Returns the fully qualified path to the object.

  Args:
    obj: must be a new-style top-level class or a top-level function.
      Inner functions and static methods are not supported.

  Returns:
    Fully qualified path to the object.

  Raises:
    TypeError: when the argument obj has an unsupported type.
    ValueError: when obj can't be discovered at the top level.
  """
  if obj is None:
    return obj

  if inspect.isclass(obj) or inspect.isfunction(obj):
    fetched = getattr(sys.modules[obj.__module__], obj.__name__, None)
    if fetched is None:
      raise ValueError(
          "Object %r must be defined on the top level of a module." % obj)
    return "%s.%s" % (obj.__module__, obj.__name__)
  raise TypeError("Unexpected type %s." % type(obj))


def strip_prefix_from_items(prefix, items):
  """Strips the prefix from each of the items if it is present.

  Args:
    prefix: the string that you wish to strip from the beginning of each
      of the items.
    items: a list of strings that may or may not contain the prefix you want
      to strip out.

  Returns:
    items_no_prefix: a copy of the list of items (same order) without the
      prefix (if present).
  """
  items_no_prefix = []
  for item in items:
    if item.startswith(prefix):
      items_no_prefix.append(item[len(prefix):])
    else:
      items_no_prefix.append(item)
  return items_no_prefix
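

# Illustrative sketch only, not part of the library API: exercises the public
# helpers defined above with made-up values.
def _example_public_helpers():
  # String values are matched against the truth-value list, case-insensitively.
  assert parse_bool("Yes") is True
  assert parse_bool("0") is False
  # Non-string values fall through to bool().
  assert parse_bool(0) is False
  # The prefix is stripped only where present; order is preserved.
  assert strip_prefix_from_items("/base/", ["/base/a", "b"]) == ["a", "b"]
  assert get_short_name("foo.bar.baz") == "baz"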