#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Mapreduce execution context.

Mapreduce context provides handler code with information about the
current mapreduce execution and organizes the flow of utility data
produced by handlers, such as counters, log messages, and mutation pools.
"""


__all__ = ["get",
           "Pool",
           "Context",
           "COUNTER_MAPPER_CALLS",
           "COUNTER_MAPPER_WALLTIME_MS",
           "DATASTORE_DEADLINE",
           "MAX_ENTITY_COUNT",
           ]

import heapq
import logging
import threading

try:
  from google.appengine.ext import ndb
except ImportError:
  ndb = None
from google.appengine.api import datastore
from google.appengine.ext import db
from google.appengine.runtime import apiproxy_errors


# Maximum number of items. The pool will be flushed when it reaches this
# amount. The datastore API is smart enough to group entities into as few RPCs
# as possible without exceeding the RPC size limit, so in theory
# MAX_ENTITY_COUNT can be as big as the instance's memory can hold.
# This number is just an estimate.
# TODO(user): Do batching by entity size if cheap. b/10427424
MAX_ENTITY_COUNT = 20

# Deadline in seconds for mutation pool datastore operations.
DATASTORE_DEADLINE = 15

# The name of the counter which counts all mapper calls.
COUNTER_MAPPER_CALLS = "mapper-calls"

# Total walltime in msec given to the mapper process. This is not just the
# mapper handler function, but includes all I/O overhead.
COUNTER_MAPPER_WALLTIME_MS = "mapper-walltime-ms"


# pylint: disable=protected-access
# pylint: disable=g-bad-name


def _normalize_entity(value):
  """Return an entity from an entity or model instance.

  Returns None for ndb model instances, which signals callers to use the
  ndb code path instead.
  """
  if ndb is not None and isinstance(value, ndb.Model):
    return None
  if getattr(value, "_populate_internal_entity", None):
    return value._populate_internal_entity()
  return value


def _normalize_key(value):
  """Return a key from an entity, model instance, key, or key string.

  Returns None for ndb model instances and keys, which signals callers to
  use the ndb code path instead.
  """
  if ndb is not None and isinstance(value, (ndb.Model, ndb.Key)):
    return None
  if getattr(value, "key", None):
    return value.key()
  elif isinstance(value, basestring):
    return datastore.Key(value)
  else:
    return value


class _ItemList(object):
  """A buffer that holds arbitrary items and auto flushes them when full.

  Callers of this class provide the logic on how to flush.
  This class takes care of the common logic of when to flush and when to retry.

  Properties:
    items: list of objects.
    length: length of item list.
    size: aggregate item size in bytes.
  """

  DEFAULT_RETRIES = 3
  _LARGEST_ITEMS_TO_LOG = 5

  def __init__(self,
               max_entity_count,
               flush_function,
               timeout_retries=DEFAULT_RETRIES,
               repr_function=None):
    """Constructor.

    Args:
      max_entity_count: maximum number of entities before flushing to the db.
      flush_function: a function that can flush the items. The function is
        called with a list of items as the first argument and a dict of
        options as the second argument. Currently options can contain
        {"deadline": int}. See self.flush for how the function is called.
      timeout_retries: how many times to retry upon timeouts.
      repr_function: a function that turns an item into a meaningful
        representation. For debugging large items.
    """
    self.items = []
    self.__max_entity_count = int(max_entity_count)
    self.__flush_function = flush_function
    self.__repr_function = repr_function
    self.__timeout_retries = int(timeout_retries)

  def __str__(self):
    return "ItemList with %s items" % len(self.items)

  def append(self, item):
    """Add a new item to the list.

    If needed, append will first flush and clear the existing items.

    Args:
      item: an item to add to the list.
    """
    if self.should_flush():
      self.flush()
    self.items.append(item)

  def flush(self):
    """Force a flush."""
    if not self.items:
      return

    retry = 0
    options = {"deadline": DATASTORE_DEADLINE}
    while retry <= self.__timeout_retries:
      try:
        self.__flush_function(self.items, options)
        self.clear()
        break
      except db.Timeout as e:
        logging.warning(e)
        logging.warning("Flushing '%s' timed out. Will retry for the %s time.",
                        self, retry)
        retry += 1
        options["deadline"] *= 2
      except apiproxy_errors.RequestTooLargeError:
        self._log_largest_items()
        raise
    else:
      # Retries exhausted: re-raise the last db.Timeout. (Python 2 keeps the
      # handled exception available to a bare raise in the same frame.)
      raise

  def _log_largest_items(self):
    if not self.__repr_function:
      logging.error("Got RequestTooLargeError but can't interpret items in "
                    "_ItemList %s.", self)
      return

    sizes = [len(self.__repr_function(i)) for i in self.items]
    largest = heapq.nlargest(self._LARGEST_ITEMS_TO_LOG,
                             zip(sizes, self.items),
                             key=lambda t: t[0])
    # Set field for test only.
    self._largest = [(s, self.__repr_function(i)) for s, i in largest]
    logging.error("Got RequestTooLargeError. Largest items: %r", self._largest)

  def clear(self):
    """Clear the item list."""
    self.items = []

  def should_flush(self):
    """Whether to flush before appending the next entity.

    Returns:
      True to flush. False otherwise.
    """
    return len(self.items) >= self.__max_entity_count
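

# A minimal sketch of how a caller might wire an _ItemList to a flush
# callback. The names below (_flush_lines, write_lines) are illustrative
# assumptions, not part of this module:
#
#   def _flush_lines(items, options):
#     # options carries {"deadline": int}; a real callback should honor it.
#     write_lines(items, deadline=options["deadline"])
#
#   buffered = _ItemList(MAX_ENTITY_COUNT, _flush_lines, repr_function=str)
#   buffered.append("line 1")   # flushes automatically once the list is full
#   buffered.flush()            # force the final flush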


class Pool(object):
  """Mutation pool accumulates changes to perform them in batch.

  Pool subclasses should not be public. Instead, a Pool should define an
  operation.Operation class and let users use that. For example, in a map
  function, the user can do:

    def map(foo):
      yield OperationOnMyPool(any_argument)

  Since an Operation is a callable object, the Mapreduce library will invoke
  any yielded Operation object with the current context.Context instance.
  The operation object can then access MyPool from Context.get_pool.
  """

  def flush(self):
    """Flush all changes."""
    raise NotImplementedError()
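

# A minimal sketch of the pattern described above. The names _LogPool,
# AppendLog, and the "log_pool" key are illustrative assumptions, not part of
# the library; a real operation would subclass the operation.Operation base
# class referenced in the Pool docstring.
#
#   class _LogPool(Pool):
#     def __init__(self):
#       self.lines = []
#
#     def append(self, line):
#       self.lines.append(line)
#
#     def flush(self):
#       logging.info("\n".join(self.lines))
#       self.lines = []
#
#   class AppendLog(object):  # stand-in for an operation.Operation subclass
#     def __init__(self, line):
#       self.line = line
#
#     def __call__(self, ctx):  # invoked by the framework with the Context
#       pool = ctx.get_pool("log_pool")
#       if pool is None:
#         pool = _LogPool()
#         ctx.register_pool("log_pool", pool)
#       pool.append(self.line)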


class _MutationPool(Pool):
  """Mutation pool accumulates datastore changes to perform them in batch.

  Properties:
    puts: _ItemList of entities to put to datastore.
    deletes: _ItemList of keys to delete from datastore.
    ndb_puts: _ItemList of ndb entities to put to datastore.
    ndb_deletes: _ItemList of ndb keys to delete from datastore.
  """

  def __init__(self,
               max_entity_count=MAX_ENTITY_COUNT,
               mapreduce_spec=None):
    """Constructor.

    Args:
      max_entity_count: maximum number of entities before flushing to the db.
      mapreduce_spec: An optional instance of MapperSpec.
    """
    self.max_entity_count = max_entity_count
    params = mapreduce_spec.params if mapreduce_spec is not None else {}
    self.force_writes = bool(params.get("force_ops_writes", False))
    self.puts = _ItemList(max_entity_count,
                          self._flush_puts,
                          repr_function=self._db_repr)
    self.deletes = _ItemList(max_entity_count,
                             self._flush_deletes)
    self.ndb_puts = _ItemList(max_entity_count,
                              self._flush_ndb_puts,
                              repr_function=self._ndb_repr)
    self.ndb_deletes = _ItemList(max_entity_count,
                                 self._flush_ndb_deletes)

  def put(self, entity):
    """Registers entity to put to datastore.

    Args:
      entity: an entity or model instance to put.
    """
    actual_entity = _normalize_entity(entity)
    if actual_entity is None:
      # _normalize_entity returns None for ndb models; use the ndb path.
      return self.ndb_put(entity)
    self.puts.append(actual_entity)

  def ndb_put(self, entity):
    """Like put(), but for NDB entities."""
    assert ndb is not None and isinstance(entity, ndb.Model)
    self.ndb_puts.append(entity)

  def delete(self, entity):
    """Registers entity to delete from datastore.

    Args:
      entity: an entity, model instance, or key to delete.
    """
    key = _normalize_key(entity)
    if key is None:
      # _normalize_key returns None for ndb models/keys; use the ndb path.
      return self.ndb_delete(entity)
    self.deletes.append(key)

  def ndb_delete(self, entity_or_key):
    """Like delete(), but for NDB entities/keys."""
    if ndb is not None and isinstance(entity_or_key, ndb.Model):
      key = entity_or_key.key
    else:
      key = entity_or_key
    self.ndb_deletes.append(key)

  def flush(self):
    """Flush (apply) all changes to the datastore."""
    self.puts.flush()
    self.deletes.flush()
    self.ndb_puts.flush()
    self.ndb_deletes.flush()

  @classmethod
  def _db_repr(cls, entity):
    """Converts entity to a readable repr.

    Args:
      entity: datastore.Entity or datastore_types.Key.

    Returns:
      Proto in str.
    """
    return str(entity._ToPb())

  @classmethod
  def _ndb_repr(cls, entity):
    """Converts entity to a readable repr.

    Args:
      entity: ndb.Model

    Returns:
      Proto in str.
    """
    return str(entity._to_pb())

  def _flush_puts(self, items, options):
    """Flush all puts to datastore."""
    datastore.Put(items, config=self._create_config(options))

  def _flush_deletes(self, items, options):
    """Flush all deletes to datastore."""
    datastore.Delete(items, config=self._create_config(options))

  def _flush_ndb_puts(self, items, options):
    """Flush all NDB puts to datastore."""
    assert ndb is not None
    ndb.put_multi(items, config=self._create_config(options))

  def _flush_ndb_deletes(self, items, options):
    """Flush all NDB deletes to datastore."""
    assert ndb is not None
    ndb.delete_multi(items, config=self._create_config(options))

  def _create_config(self, options):
    """Creates a datastore Config.

    Args:
      options: a dict of flush options; currently only {"deadline": int}.

    Returns:
      A datastore_rpc.Configuration instance.
    """
    return datastore.CreateConfig(deadline=options["deadline"],
                                  force_writes=self.force_writes)
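

# A minimal usage sketch, assuming handler or output-writer code that already
# holds the current Context (ctx, see Context/get() below) and hypothetical
# db/ndb model instances:
#
#   pool = ctx.get_pool("mutation_pool")
#   pool.put(my_db_entity)    # buffered in pool.puts
#   pool.put(my_ndb_entity)   # routed to pool.ndb_puts automatically
#   pool.delete(stale_key)    # a key, key string, or model instance all work
#   # Buffered work is written when the pool (or the whole context) flushes.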


class _Counters(Pool):
  """Regulates access to counters.

  The counters pool is a str-to-int map. It is saved as part of ShardState, so
  it is flushed when ShardState commits to datastore successfully.
  """

  def __init__(self, shard_state):
    """Constructor.

    Args:
      shard_state: current mapreduce shard state as model.ShardState.
    """
    self._shard_state = shard_state

  def increment(self, counter_name, delta=1):
    """Increment counter value.

    Args:
      counter_name: name of the counter as string.
      delta: increment delta as int.
    """
    self._shard_state.counters_map.increment(counter_name, delta)

  def flush(self):
    """Flush unsaved counter values."""
    pass


# TODO(user): Define what fields should be public.
class Context(object):
  """MapReduce execution context.

  The main purpose of Context is to facilitate IO. User code, input reader,
  and output writer code can plug pools (see the Pool class) into Context to
  batch operations.

  There is a single Context instance associated with each worker thread.
  It can be accessed via context.get(). handlers.MapperWorkerHandler creates
  this instance before any IO code (input_reader, output_writer, user
  functions) is called.

  Each Pool decides how to batch and when to flush.
  Context and all its pools are flushed by the end of a slice.
  Upon error in slice execution, what is flushed is undefined. (See _Counters
  for an exception.)

  Properties:
    mapreduce_spec: current mapreduce specification as model.MapreduceSpec.
  """

  # Current context instance
  _local = threading.local()

  def __init__(self, mapreduce_spec, shard_state, task_retry_count=0):
    """Constructor.

    Args:
      mapreduce_spec: mapreduce specification as model.MapreduceSpec.
      shard_state: an instance of model.ShardState. This has to be the same
        instance as the one MapperWorkerHandler mutates. All mutations are
        flushed to the datastore at the end of the slice.
      task_retry_count: how many times this task has been retried.
    """
    self._shard_state = shard_state
    self.mapreduce_spec = mapreduce_spec
    # TODO(user): Create a hierarchy of Context classes. Certain fields
    # like task_retry_count only make sense in TaskAttemptContext.
    self.task_retry_count = task_retry_count

    if self.mapreduce_spec:
      self.mapreduce_id = self.mapreduce_spec.mapreduce_id
    else:
      # Only in tests
      self.mapreduce_id = None
    if shard_state:
      self.shard_id = shard_state.get_shard_id()
    else:
      # Only in tests
      self.shard_id = None

    # TODO(user): Allow user to specify max entity count for the pool
    # as they know how big their entities are.
    self._mutation_pool = _MutationPool(mapreduce_spec=mapreduce_spec)
    self._counters = _Counters(shard_state)
    # TODO(user): Remove this after fixing
    # keyhole/dataeng/imagery/feeds/client_lib.py in another CL.
    self.counters = self._counters

    self._pools = {}
    self.register_pool("mutation_pool", self._mutation_pool)
    self.register_pool("counters", self.counters)

  def flush(self):
    """Flush all information recorded in context."""
    for pool in self._pools.values():
      pool.flush()

  def register_pool(self, key, pool):
    """Register an arbitrary pool to be flushed together with this context.

    Args:
      key: pool key as string.
      pool: a pool instance.
    """
    self._pools[key] = pool

  def get_pool(self, key):
    """Obtains an instance of a registered pool.

    Args:
      key: pool key as string.

    Returns:
      an instance of the pool registered earlier, or None.
    """
    return self._pools.get(key, None)

  @classmethod
  def _set(cls, context):
    """Set the current context instance.

    Args:
      context: new context as Context or None.
    """
    cls._local._context_instance = context


# This function is intended for user code to access the context instance.
# The MR framework should still try to take context as an explicit argument
# whenever possible (dependency injection).
def get():
  """Get the current context instance.

  Returns:
    current context as Context.
  """
  if not hasattr(Context._local, "_context_instance"):
    return None
  return Context._local._context_instance
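

# A minimal sketch of how user mapper code typically reaches this module. The
# import path, entity attributes, and counter name below are illustrative
# assumptions, not guarantees about any particular deployment.
#
#   from mapreduce import context
#
#   def my_map(entity):
#     ctx = context.get()
#     ctx.get_pool("counters").increment("entities-seen")
#     if entity.is_stale:
#       ctx.get_pool("mutation_pool").delete(entity.key())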