1#!/usr/bin/env python 2"""Parameters to control Mapreduce.""" 3 4__all__ = ["CONFIG_NAMESPACE", 5 "config"] 6 7import pickle 8 9 10# To break circular dependency and more. 11# pylint: disable=g-import-not-at-top 12 13 14# For the mapreduce in python 25 runtime, this import will fail. 15# TODO(user): Remove all pipeline import protections after 25 mr defunct. 16try: 17 from pipeline import util as pipeline_util 18except ImportError: 19 pipeline_util = None 20 21from google.appengine.api import lib_config 22 23CONFIG_NAMESPACE = "mapreduce" 24 25 26# pylint: disable=protected-access 27# pylint: disable=invalid-name 28 29 30class _JobConfigMeta(type): 31 """Metaclass that controls class creation.""" 32 33 _OPTIONS = "_options" 34 _REQUIRED = "_required" 35 36 def __new__(mcs, classname, bases, class_dict): 37 """Creates a _Config class and modifies its class dict. 38 39 Args: 40 classname: name of the class. 41 bases: a list of base classes. 42 class_dict: original class dict. 43 44 Returns: 45 A new _Config class. The modified class will have two fields. 46 _options field is a dict from option name to _Option objects. 47 _required field is a set of required option names. 48 """ 49 options = {} 50 required = set() 51 for name, option in class_dict.iteritems(): 52 if isinstance(option, _Option): 53 options[name] = option 54 if option.required: 55 required.add(name) 56 57 for name in options: 58 class_dict.pop(name) 59 class_dict[mcs._OPTIONS] = options 60 class_dict[mcs._REQUIRED] = required 61 cls = type.__new__(mcs, classname, bases, class_dict) 62 63 # Handle inheritance of _Config. 64 if object not in bases: 65 parent_options = {} 66 # Update options from the root down. 67 for c in reversed(cls.__mro__): 68 if mcs._OPTIONS in c.__dict__: 69 # Children override parent. 70 parent_options.update(c.__dict__[mcs._OPTIONS]) 71 if mcs._REQUIRED in c.__dict__: 72 required.update(c.__dict__[mcs._REQUIRED]) 73 for k, v in parent_options.iteritems(): 74 if k not in options: 75 options[k] = v 76 return cls 77 78 79class _Option(object): 80 """An option for _Config.""" 81 82 def __init__(self, kind, required=False, default_factory=None, 83 can_be_none=False): 84 """Init. 85 86 Args: 87 kind: type of the option. 88 required: whether user is required to supply a value. 89 default_factory: a factory, when called, returns the default value. 90 can_be_none: whether value can be None. 91 92 Raises: 93 ValueError: if arguments aren't compatible. 94 """ 95 if required and default_factory is not None: 96 raise ValueError("No default_factory value when option is required.") 97 self.kind = kind 98 self.required = required 99 self.default_factory = default_factory 100 self.can_be_none = can_be_none 101 102 103class _Config(object): 104 """Root class for all per job configuration.""" 105 106 __metaclass__ = _JobConfigMeta 107 108 def __init__(self, _lenient=False, **kwds): 109 """Init. 110 111 Args: 112 _lenient: When true, no option is required. 113 **kwds: keyword arguments for options and their values. 114 """ 115 self._verify_keys(kwds, _lenient) 116 self._set_values(kwds, _lenient) 117 118 def _verify_keys(self, kwds, _lenient): 119 keys = set() 120 for k in kwds: 121 if k not in self._options: 122 raise ValueError("Option %s is not supported." % (k)) 123 keys.add(k) 124 if not _lenient: 125 missing = self._required - keys 126 if missing: 127 raise ValueError("Options %s are required." % tuple(missing)) 128 129 def _set_values(self, kwds, _lenient): 130 for k, option in self._options.iteritems(): 131 v = kwds.get(k) 132 if v is None and option.default_factory: 133 v = option.default_factory() 134 setattr(self, k, v) 135 if _lenient: 136 continue 137 if v is None and option.can_be_none: 138 continue 139 if isinstance(v, type) and not issubclass(v, option.kind): 140 raise TypeError( 141 "Expect subclass of %r for option %s. Got %r" % ( 142 option.kind, k, v)) 143 if not isinstance(v, type) and not isinstance(v, option.kind): 144 raise TypeError("Expect type %r for option %s. Got %r" % ( 145 option.kind, k, v)) 146 147 def __eq__(self, other): 148 if not isinstance(other, self.__class__): 149 return False 150 return other.__dict__ == self.__dict__ 151 152 def __repr__(self): 153 return str(self.__dict__) 154 155 def to_json(self): 156 return {"config": pickle.dumps(self)} 157 158 @classmethod 159 def from_json(cls, json): 160 return pickle.loads(json["config"]) 161 162 163# TODO(user): Make more of these private. 164class _ConfigDefaults(object): 165 """Default configs. 166 167 Do not change parameters whose names begin with _. 168 169 SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up. 170 171 TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. Task 172 is any taskqueue task created by MR framework. A task is dropped 173 when its X-AppEngine-TaskExecutionCount is bigger than this number. 174 Dropping a task will cause abort on the entire MR job. 175 176 TASK_MAX_DATA_PROCESSING_ATTEMPTS: 177 Max times to execute a task when previous task attempts failed during 178 data processing stage. An MR work task has three major stages: 179 initial setup, data processing, and final checkpoint. 180 Setup stage should be allowed to be retried more times than data processing 181 stage: setup failures are caused by unavailable GAE services while 182 data processing failures are mostly due to user function error out on 183 certain input data. Thus, set TASK_MAX_ATTEMPTS higher than this parameter. 184 185 QUEUE_NAME: Default queue for MR. 186 187 SHARD_COUNT: Default shard count. 188 189 PROCESSING_RATE_PER_SEC: Default rate of processed entities per second. 190 191 BASE_PATH : Base path of mapreduce and pipeline handlers. 192 """ 193 194 SHARD_MAX_ATTEMPTS = 4 195 196 # Arbitrary big number. 197 TASK_MAX_ATTEMPTS = 31 198 199 TASK_MAX_DATA_PROCESSING_ATTEMPTS = 11 200 201 QUEUE_NAME = "default" 202 203 SHARD_COUNT = 8 204 205 # Maximum number of mapper calls per second. 206 # This parameter is useful for testing to force short slices. 207 # Maybe make this a private constant instead. 208 # If people want to rate limit their jobs, they can reduce shard count. 209 PROCESSING_RATE_PER_SEC = 1000000 210 211 # This path will be changed by build process when this is a part of SDK. 212 BASE_PATH = "/mapreduce" 213 214 # TODO(user): find a proper value for this. 215 # The amount of time to perform scanning in one slice. New slice will be 216 # scheduled as soon as current one takes this long. 217 _SLICE_DURATION_SEC = 15 218 219 # Delay between consecutive controller callback invocations. 220 _CONTROLLER_PERIOD_SEC = 2 221 222 223# TODO(user): changes this name to app_config 224config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__) 225 226 227# The following are constants that depends on the value of _config. 228# They are constants because _config is completely initialized on the first 229# request of an instance and will never change until user deploy a new version. 230_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline" 231# See b/11341023 for context. 232_GCS_URLFETCH_TIMEOUT_SEC = 30 233# If a lock has been held longer than this value, mapreduce will start to use 234# logs API to check if the request has ended. 235_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1 236# In rare occasions, Logs API misses log entries. Thus 237# if a lock has been held longer than this timeout, mapreduce assumes the 238# request holding the lock has died, regardless of Logs API. 239# 10 mins is taskqueue task timeout on a frontend. 240_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5) 241