1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Mapreduce execution context.
18
19Mapreduce context provides handler code with information about
20current mapreduce execution and organizes utility data flow
21from handlers such as counters, log messages, mutation pools.
22"""
23
24
25
26__all__ = ["get",
27           "Pool",
28           "Context",
29           "COUNTER_MAPPER_CALLS",
30           "COUNTER_MAPPER_WALLTIME_MS",
31           "DATASTORE_DEADLINE",
32           "MAX_ENTITY_COUNT",
33          ]
34
35import heapq
36import logging
37import threading
38
39try:
40  from google.appengine.ext import ndb
41except ImportError:
42  ndb = None
43from google.appengine.api import datastore
44from google.appengine.ext import db
45from google.appengine.runtime import apiproxy_errors
46
47
48# Maximum number of items. Pool will be flushed when reaches this amount.
49# Datastore API is smart to group entities into as few RPCs as possible without
50# exceeding RPC size. So in theory, MAX_ENTITY_COUNT can be as big as
51# the instance's memory can hold.
52# This number is just an estimate.
53# TODO(user): Do batching by entity size if cheap. b/10427424
54MAX_ENTITY_COUNT = 20
55
56# Deadline in seconds for mutation pool datastore operations.
57DATASTORE_DEADLINE = 15
58
59# The name of the counter which counts all mapper calls.
60COUNTER_MAPPER_CALLS = "mapper-calls"
61
62# Total walltime in msec given to mapper process. This is not just mapper
63# hundler function, but includes all i/o overhead.
64COUNTER_MAPPER_WALLTIME_MS = "mapper-walltime-ms"
65
66
67# pylint: disable=protected-access
68# pylint: disable=g-bad-name
69
70
71def _normalize_entity(value):
72  """Return an entity from an entity or model instance."""
73  if ndb is not None and isinstance(value, ndb.Model):
74    return None
75  if getattr(value, "_populate_internal_entity", None):
76    return value._populate_internal_entity()
77  return value
78
79
80def _normalize_key(value):
81  """Return a key from an entity, model instance, key, or key string."""
82  if ndb is not None and isinstance(value, (ndb.Model, ndb.Key)):
83    return None
84  if getattr(value, "key", None):
85    return value.key()
86  elif isinstance(value, basestring):
87    return datastore.Key(value)
88  else:
89    return value
90
91
92class _ItemList(object):
93  """A buffer that holds arbitrary items and auto flushes them when full.
94
95  Callers of this class provides the logic on how to flush.
96  This class takes care of the common logic of when to flush and when to retry.
97
98  Properties:
99    items: list of objects.
100    length: length of item list.
101    size: aggregate item size in bytes.
102  """
103
104  DEFAULT_RETRIES = 3
105  _LARGEST_ITEMS_TO_LOG = 5
106
107  def __init__(self,
108               max_entity_count,
109               flush_function,
110               timeout_retries=DEFAULT_RETRIES,
111               repr_function=None):
112    """Constructor.
113
114    Args:
115      max_entity_count: maximum number of entities before flushing it to db.
116      flush_function: a function that can flush the items. The function is
117        called with a list of items as the first argument, a dict of options
118        as second argument. Currently options can contain {"deadline": int}.
119        see self.flush on how the function is called.
120      timeout_retries: how many times to retry upon timeouts.
121      repr_function: a function that turns an item into meaningful
122        representation. For debugging large items.
123    """
124    self.items = []
125    self.__max_entity_count = int(max_entity_count)
126    self.__flush_function = flush_function
127    self.__repr_function = repr_function
128    self.__timeout_retries = int(timeout_retries)
129
130  def __str__(self):
131    return "ItemList of with %s items" % len(self.items)
132
133  def append(self, item):
134    """Add new item to the list.
135
136    If needed, append will first flush existing items and clear existing items.
137
138    Args:
139      item: an item to add to the list.
140    """
141    if self.should_flush():
142      self.flush()
143    self.items.append(item)
144
145  def flush(self):
146    """Force a flush."""
147    if not self.items:
148      return
149
150    retry = 0
151    options = {"deadline": DATASTORE_DEADLINE}
152    while retry <= self.__timeout_retries:
153      try:
154        self.__flush_function(self.items, options)
155        self.clear()
156        break
157      except db.Timeout, e:
158        logging.warning(e)
159        logging.warning("Flushing '%s' timed out. Will retry for the %s time.",
160                        self, retry)
161        retry += 1
162        options["deadline"] *= 2
163      except apiproxy_errors.RequestTooLargeError:
164        self._log_largest_items()
165        raise
166    else:
167      raise
168
169  def _log_largest_items(self):
170    if not self.__repr_function:
171      logging.error("Got RequestTooLargeError but can't interpret items in "
172                    "_ItemList %s.", self)
173      return
174
175    sizes = [len(self.__repr_function(i)) for i in self.items]
176    largest = heapq.nlargest(self._LARGEST_ITEMS_TO_LOG,
177                             zip(sizes, self.items),
178                             lambda t: t[0])
179    # Set field for for test only.
180    self._largest = [(s, self.__repr_function(i)) for s, i in largest]
181    logging.error("Got RequestTooLargeError. Largest items: %r", self._largest)
182
183  def clear(self):
184    """Clear item list."""
185    self.items = []
186
187  def should_flush(self):
188    """Whether to flush before append the next entity.
189
190    Returns:
191      True to flush. False other.
192    """
193    return len(self.items) >= self.__max_entity_count
194
195
196class Pool(object):
197  """Mutation pool accumulates changes to perform them in patch.
198
199  Any Pool subclass should not be public. Instead, Pool should define an
200  operation.Operation class and let user uses that. For example, in a map
201  function, user can do:
202
203  def map(foo):
204    yield OperationOnMyPool(any_argument)
205
206  Since Operation is a callable object, Mapreduce library will invoke
207  any Operation object that is yielded with context.Context instance.
208  The operation object can then access MyPool from Context.get_pool.
209  """
210
211  def flush(self):
212    """Flush all changes."""
213    raise NotImplementedError()
214
215
216class _MutationPool(Pool):
217  """Mutation pool accumulates datastore changes to perform them in batch.
218
219  Properties:
220    puts: _ItemList of entities to put to datastore.
221    deletes: _ItemList of keys to delete from datastore.
222    ndb_puts: _ItemList of ndb entities to put to datastore.
223    ndb_deletes: _ItemList of ndb keys to delete from datastore.
224  """
225
226  def __init__(self,
227               max_entity_count=MAX_ENTITY_COUNT,
228               mapreduce_spec=None):
229    """Constructor.
230
231    Args:
232      max_entity_count: maximum number of entities before flushing it to db.
233      mapreduce_spec: An optional instance of MapperSpec.
234    """
235    self.max_entity_count = max_entity_count
236    params = mapreduce_spec.params if mapreduce_spec is not None else {}
237    self.force_writes = bool(params.get("force_ops_writes", False))
238    self.puts = _ItemList(max_entity_count,
239                          self._flush_puts,
240                          repr_function=self._db_repr)
241    self.deletes = _ItemList(max_entity_count,
242                             self._flush_deletes)
243    self.ndb_puts = _ItemList(max_entity_count,
244                              self._flush_ndb_puts,
245                              repr_function=self._ndb_repr)
246    self.ndb_deletes = _ItemList(max_entity_count,
247                                 self._flush_ndb_deletes)
248
249  def put(self, entity):
250    """Registers entity to put to datastore.
251
252    Args:
253      entity: an entity or model instance to put.
254    """
255    actual_entity = _normalize_entity(entity)
256    if actual_entity is None:
257      return self.ndb_put(entity)
258    self.puts.append(actual_entity)
259
260  def ndb_put(self, entity):
261    """Like put(), but for NDB entities."""
262    assert ndb is not None and isinstance(entity, ndb.Model)
263    self.ndb_puts.append(entity)
264
265  def delete(self, entity):
266    """Registers entity to delete from datastore.
267
268    Args:
269      entity: an entity, model instance, or key to delete.
270    """
271    key = _normalize_key(entity)
272    if key is None:
273      return self.ndb_delete(entity)
274    self.deletes.append(key)
275
276  def ndb_delete(self, entity_or_key):
277    """Like delete(), but for NDB entities/keys."""
278    if ndb is not None and isinstance(entity_or_key, ndb.Model):
279      key = entity_or_key.key
280    else:
281      key = entity_or_key
282    self.ndb_deletes.append(key)
283
284  def flush(self):
285    """Flush(apply) all changed to datastore."""
286    self.puts.flush()
287    self.deletes.flush()
288    self.ndb_puts.flush()
289    self.ndb_deletes.flush()
290
291  @classmethod
292  def _db_repr(cls, entity):
293    """Converts entity to a readable repr.
294
295    Args:
296      entity: datastore.Entity or datastore_types.Key.
297
298    Returns:
299      Proto in str.
300    """
301    return str(entity._ToPb())
302
303  @classmethod
304  def _ndb_repr(cls, entity):
305    """Converts entity to a readable repr.
306
307    Args:
308      entity: ndb.Model
309
310    Returns:
311      Proto in str.
312    """
313    return str(entity._to_pb())
314
315  def _flush_puts(self, items, options):
316    """Flush all puts to datastore."""
317    datastore.Put(items, config=self._create_config(options))
318
319  def _flush_deletes(self, items, options):
320    """Flush all deletes to datastore."""
321    datastore.Delete(items, config=self._create_config(options))
322
323  def _flush_ndb_puts(self, items, options):
324    """Flush all NDB puts to datastore."""
325    assert ndb is not None
326    ndb.put_multi(items, config=self._create_config(options))
327
328  def _flush_ndb_deletes(self, items, options):
329    """Flush all deletes to datastore."""
330    assert ndb is not None
331    ndb.delete_multi(items, config=self._create_config(options))
332
333  def _create_config(self, options):
334    """Creates datastore Config.
335
336    Returns:
337      A datastore_rpc.Configuration instance.
338    """
339    return datastore.CreateConfig(deadline=options["deadline"],
340                                  force_writes=self.force_writes)
341
342
343class _Counters(Pool):
344  """Regulates access to counters.
345
346  Counters Pool is a str to int map. It is saved as part of ShardState so it
347  is flushed when ShardState commits to datastore successfully.
348  """
349
350  def __init__(self, shard_state):
351    """Constructor.
352
353    Args:
354      shard_state: current mapreduce shard state as model.ShardState.
355    """
356    self._shard_state = shard_state
357
358  def increment(self, counter_name, delta=1):
359    """Increment counter value.
360
361    Args:
362      counter_name: name of the counter as string.
363      delta: increment delta as int.
364    """
365    self._shard_state.counters_map.increment(counter_name, delta)
366
367  def flush(self):
368    """Flush unsaved counter values."""
369    pass
370
371
372# TODO(user): Define what fields should be public.
373class Context(object):
374  """MapReduce execution context.
375
376  The main purpose of Context is to facilitate IO. User code, input reader,
377  and output writer code can plug in pools (see Pool class) to Context to
378  batch operations.
379
380  There is a single Context instance associated with each worker thread.
381  It can be accessed via context.get(). handlers.MapperWorkerHandler creates
382  this instance before any IO code (input_reader, output_writer, user functions)
383  is called.
384
385  Each Pool decides how to batch and when to flush.
386  Context and all its pools are flushed by the end of a slice.
387  Upon error in slice execution, what is flushed is undefined. (See _Counters
388  for an exception).
389
390  Properties:
391    mapreduce_spec: current mapreduce specification as model.MapreduceSpec.
392  """
393
394  # Current context instance
395  _local = threading.local()
396
397  def __init__(self, mapreduce_spec, shard_state, task_retry_count=0):
398    """Constructor.
399
400    Args:
401      mapreduce_spec: mapreduce specification as model.MapreduceSpec.
402      shard_state: an instance of model.ShardState. This has to be the same
403        instance as the one MapperWorkerHandler mutates. All mutations are
404        flushed to datastore in the end of the slice.
405      task_retry_count: how many times this task has been retried.
406    """
407    self._shard_state = shard_state
408    self.mapreduce_spec = mapreduce_spec
409    # TODO(user): Create a hierarchy of Context classes. Certain fields
410    # like task_retry_count only makes sense in TaskAttemptContext.
411    self.task_retry_count = task_retry_count
412
413    if self.mapreduce_spec:
414      self.mapreduce_id = self.mapreduce_spec.mapreduce_id
415    else:
416      # Only in tests
417      self.mapreduce_id = None
418    if shard_state:
419      self.shard_id = shard_state.get_shard_id()
420    else:
421      # Only in tests
422      self.shard_id = None
423
424    # TODO(user): Allow user to specify max entity count for the pool
425    # as they know how big their entities are.
426    self._mutation_pool = _MutationPool(mapreduce_spec=mapreduce_spec)
427    self._counters = _Counters(shard_state)
428    # TODO(user): Remove this after fixing
429    # keyhole/dataeng/imagery/feeds/client_lib.py in another CL.
430    self.counters = self._counters
431
432    self._pools = {}
433    self.register_pool("mutation_pool", self._mutation_pool)
434    self.register_pool("counters", self.counters)
435
436  def flush(self):
437    """Flush all information recorded in context."""
438    for pool in self._pools.values():
439      pool.flush()
440
441  def register_pool(self, key, pool):
442    """Register an arbitrary pool to be flushed together with this context.
443
444    Args:
445      key: pool key as string.
446      pool: a pool instance.
447    """
448    self._pools[key] = pool
449
450  def get_pool(self, key):
451    """Obtains an instance of registered pool.
452
453    Args:
454      key: pool key as string.
455
456    Returns:
457      an instance of the pool registered earlier, or None.
458    """
459    return self._pools.get(key, None)
460
461  @classmethod
462  def _set(cls, context):
463    """Set current context instance.
464
465    Args:
466      context: new context as Context or None.
467    """
468    cls._local._context_instance = context
469
470
471# This method is intended for user code to access context instance.
472# MR framework should still try to take context as an explicit argument
473# whenever possible (dependency injection).
474def get():
475  """Get current context instance.
476
477  Returns:
478    current context as Context.
479  """
480  if not hasattr(Context._local, "_context_instance") :
481    return None
482  return Context._local._context_instance
483