1#!/usr/bin/env python
2"""Helpers iterators for input_readers.DatastoreInputReader."""
3
4
5
6# pylint: disable=g-bad-name
7import itertools
8from google.appengine.datastore import datastore_query
9from google.appengine.datastore import datastore_rpc
10from google.appengine.ext import db
11from google.appengine.ext import key_range
12from mapreduce import json_util
13from mapreduce import key_ranges
14from mapreduce import model
15from mapreduce import namespace_range
16from mapreduce import property_range
17from mapreduce import util
18
# Names exported by a star-import of this module.  The underscore-prefixed
# iterator classes are not listed; RangeIteratorFactory creates them.
__all__ = [
    "RangeIteratorFactory",
    "RangeIterator",
    "AbstractKeyRangeIterator",
    "KeyRangeModelIterator",
    "KeyRangeEntityIterator",
    "KeyRangeKeyIterator",
    "KeyRangeEntityProtoIterator",
]
27
28
class RangeIteratorFactory(object):
  """Single entry point for building and restoring RangeIterator objects.

  The create_* classmethods construct the private iterator classes of this
  module, and from_json() rebuilds whichever iterator produced a given
  serialized state.
  """

  @classmethod
  def create_property_range_iterator(cls,
                                     p_range,
                                     ns_range,
                                     query_spec):
    """Build an iterator over the entities matching a property range.

    Args:
      p_range: a property_range.PropertyRange object describing the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object describing the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.

    Returns:
      a RangeIterator (a _PropertyRangeModelIterator instance).
    """
    iterator = _PropertyRangeModelIterator(p_range, ns_range, query_spec)
    return iterator

  @classmethod
  def create_multi_property_range_iterator(cls,
                                           p_range_iters):
    """Build an iterator that chains several property range iterators.

    Args:
      p_range_iters: a list of RangeIterator objects to chain together.

    Returns:
      a RangeIterator (a _MultiPropertyRangeModelIterator instance).
    """
    iterator = _MultiPropertyRangeModelIterator(p_range_iters)
    return iterator

  @classmethod
  def create_key_ranges_iterator(cls,
                                 k_ranges,
                                 query_spec,
                                 key_range_iter_cls):
    """Build an iterator over a collection of key ranges.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class used to iterate over each single key
        range. Whatever that class yields is what the returned iterator
        yields.

    Returns:
      a RangeIterator (a _KeyRangesIterator instance).
    """
    iterator = _KeyRangesIterator(k_ranges, query_spec, key_range_iter_cls)
    return iterator

  @classmethod
  def from_json(cls, json):
    """Rebuild a RangeIterator from the map produced by its to_json().

    Args:
      json: a json-compatible map whose "name" entry is the class name of
        the iterator that was serialized.

    Returns:
      the RangeIterator restored by the from_json() of the matching class.

    Raises:
      KeyError: if "name" is missing from json or is not a key of
        _RANGE_ITERATORS.
    """
    iterator_cls = _RANGE_ITERATORS[json["name"]]
    return iterator_cls.from_json(json)
89
90
class RangeIterator(json_util.JsonMixin):
  """Interface implemented by the DatastoreInputReader helper iterators.

  A RangeIterator acts as a container of all the datastore entities that
  fall within some range (a key range or a property range). __iter__
  returns a generator over those entities, while to_json/from_json marshal
  the container's state so that a generator created from a restored
  container resumes where the previous generator left off.

  Caveats:
    1. Calling next() on a generator may also modify the container.
    2. Marshalling after StopIteration is raised has undefined behavior.
  """

  def __iter__(self):
    """Iterate over the entities in the range.

    Subclasses must override this method.

    Yields:
      Some kind of value for each datastore entity iterated over.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  def __repr__(self):
    # Subclasses must override; the base class has nothing to describe.
    raise NotImplementedError()

  def to_json(self):
    """Marshal the complete iterator state.

    Subclasses must override this method.

    Returns:
      a json-compatible map holding all state.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the map returned by to_json.

    Subclasses must override this method.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()
130
131
class _PropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a property range.

  Namespaces from the namespace range are visited one at a time; for each
  one a query is built from the property range and its results are yielded.
  """

  def __init__(self, p_range, ns_range, query_spec):
    """Init.

    Args:
      p_range: a property_range.PropertyRange object describing the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object describing the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._property_range = p_range
    self._ns_range = ns_range
    self._query_spec = query_spec
    # Cursor to resume from (set by from_json) and the query currently being
    # iterated (set by __iter__); to_json reads both.
    self._cursor = None
    self._query = None

  def __repr__(self):
    described_range = str(self._property_range)
    return "PropertyRangeIterator for %s" % described_range

  def __iter__(self):
    """Iterate over entities, one namespace after another.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for namespace in self._ns_range:
      self._query = self._property_range.make_query(namespace)
      if not isinstance(self._query, db.Query):
        # Anything that is not a db.Query is handled through .iter(); the
        # resulting iterator replaces self._query so that to_json can ask
        # it for cursor_after().
        self._query = self._query.iter(
            batch_size=self._query_spec.batch_size,
            keys_only=self._query_spec.keys_only,
            start_cursor=self._cursor,
            produce_cursors=True)
        results = self._query
      else:
        if self._cursor:
          self._query.with_cursor(self._cursor)
        results = self._query.run(
            batch_size=self._query_spec.batch_size,
            keys_only=self._query_spec.keys_only)

      for entity in results:
        yield entity

      # This namespace is done: drop its query and cursor, and move the
      # saved namespace range past it unless it was the last namespace.
      self._query = None
      self._cursor = None
      if namespace != self._ns_range.namespace_end:
        self._ns_range = self._ns_range.with_start_after(namespace)

  def to_json(self):
    """Marshal the property range, query spec, namespace range and cursor.

    Returns:
      a json-compatible map. "cursor_object" records whether "cursor" had to
      be converted with to_websafe_string(), so that from_json can convert
      it back.
    """
    active_query = self._query
    if active_query is None:
      cursor = self._cursor
    elif isinstance(active_query, db.Query):
      cursor = active_query.cursor()
    else:
      cursor = active_query.cursor_after()

    # None and plain strings are stored as they are; any other cursor value
    # is turned into its websafe string form.
    cursor_object = not (cursor is None or isinstance(cursor, basestring))
    if cursor_object:
      cursor = cursor.to_websafe_string()

    property_range_json = self._property_range.to_json()
    query_spec_json = self._query_spec.to_json()
    ns_range_json = self._ns_range.to_json_object()
    return {
        "property_range": property_range_json,
        "query_spec": query_spec_json,
        "cursor": cursor,
        "ns_range": ns_range_json,
        "name": self.__class__.__name__,
        "cursor_object": cursor_object,
    }

  # TODO(user): it sucks we need to handle cursor_to_str in many places.
  # In the long run, datastore adaptor refactor will take care of this as
  # we will only need to deal with low level datastore API after that.
  # Thus we will not add Cursor as a json primitive MR should understand.
  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the map returned by to_json.

    Args:
      json: a map with the keys written by to_json.

    Returns:
      a new instance whose cursor is the one that was saved.
    """
    p_range = property_range.PropertyRange.from_json(json["property_range"])
    ns_range = namespace_range.NamespaceRange.from_json_object(
        json["ns_range"])
    query_spec = model.QuerySpec.from_json(json["query_spec"])
    obj = cls(p_range, ns_range, query_spec)

    cursor = json["cursor"]
    if cursor and json["cursor_object"]:
      cursor = datastore_query.Cursor.from_websafe_string(cursor)
    # lint bug. Class method can access protected fields.
    # pylint: disable=protected-access
    obj._cursor = cursor
    return obj
222
223
class _MultiPropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a list of disjoint property ranges.

  The wrapped iterators are consumed one after another, in list order.
  """

  def __init__(self, p_range_iters):
    """Init.

    Args:
      p_range_iters: a list of _PropertyRangeModelIterator objects to chain
        together. The list may be empty.
    """
    self._iters = p_range_iters

  def __repr__(self):
    return "MultiPropertyRangeIterator combining %s" % str(
        [str(it) for it in self._iters])

  def __iter__(self):
    """Iterate over entities.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    # NOTE(review): iteration always starts from the first wrapped iterator,
    # so resuming after from_json relies on already-finished iterators
    # yielding nothing more -- TODO confirm against
    # _PropertyRangeModelIterator.
    for model_instance in itertools.chain.from_iterable(self._iters):
      yield model_instance

  def to_json(self):
    """Marshal every wrapped iterator into one json-compatible map.

    Each wrapped iterator's own map is stored under its list index (as a
    string) with its "query_spec" and "name" entries removed. Those two
    values are stored once at the top level ("query_spec" and "item_name"),
    taken from the last wrapped iterator.

    Returns:
      all states in json-compatible map. With no wrapped iterators,
      "query_spec" and "item_name" are None.
    """
    json = {"name": self.__class__.__name__,
            "num_ranges": len(self._iters)}

    # Pre-set so that an empty iterator list still serializes instead of
    # failing on unbound locals below.
    query_spec = None
    item_name = None
    for i, p_range_iter in enumerate(self._iters):
      json_item = p_range_iter.to_json()
      # Delete and move one level up
      query_spec = json_item.pop("query_spec")
      item_name = json_item.pop("name")
      json[str(i)] = json_item
    # Store once to save space
    json["query_spec"] = query_spec
    json["item_name"] = item_name

    return json

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the map returned by to_json.

    Args:
      json: a map with the keys written by to_json. It is not modified.

    Returns:
      a new instance wrapping one _PropertyRangeModelIterator per saved
      range.
    """
    num_ranges = int(json["num_ranges"])
    query_spec = json["query_spec"]
    item_name = json["item_name"]

    p_range_iters = []
    for i in xrange(num_ranges):
      # Work on a shallow copy so the caller's map is left untouched when
      # query_spec and name are placed back into each iterator's state.
      json_item = dict(json[str(i)])
      json_item["query_spec"] = query_spec
      json_item["name"] = item_name
      p_range_iters.append(_PropertyRangeModelIterator.from_json(json_item))

    return cls(p_range_iters)
285
286
class _KeyRangesIterator(RangeIterator):
  """Iterates over every key range of a key_ranges.KeyRanges object.

  Each key range is handed to a fresh instance of the configured single
  key range iterator class, and whatever that instance yields is yielded.
  """

  def __init__(self,
               k_ranges,
               query_spec,
               key_range_iter_cls):
    """Init.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class used to iterate over each single key
        range. Whatever that class yields is what this iterator yields.
    """
    self._key_ranges = k_ranges
    self._query_spec = query_spec
    self._key_range_iter_cls = key_range_iter_cls
    # Iterator over the key range currently being processed, if any.
    self._current_iter = None
    # Not read or updated anywhere else in this class.
    self._current_key_range = None

  def __repr__(self):
    described_ranges = str(self._key_ranges)
    return "KeyRangesIterator for %s" % described_ranges

  def __iter__(self):
    """Drain the current key range iterator, then each remaining key range.

    Yields:
      whatever the single key range iterator class yields.
    """
    while True:
      active_iter = self._current_iter
      if active_iter:
        for item in active_iter:
          yield item

      try:
        next_range = self._key_ranges.next()
        self._current_iter = self._key_range_iter_cls(next_range,
                                                      self._query_spec)
      except StopIteration:
        # No key ranges left: clear the current iterator and finish.
        self._current_iter = None
        return

  def to_json(self):
    """Marshal the key ranges, query spec and in-progress iterator.

    Returns:
      a json-compatible map. "current_iter" is None when no single key
      range iterator is active; "key_range_iter_cls" holds the class name
      of the single key range iterator.
    """
    if self._current_iter:
      current_iter_json = self._current_iter.to_json()
    else:
      current_iter_json = None

    key_ranges_json = self._key_ranges.to_json()
    query_spec_json = self._query_spec.to_json()
    return {
        "key_ranges": key_ranges_json,
        "query_spec": query_spec_json,
        "current_iter": current_iter_json,
        "key_range_iter_cls": self._key_range_iter_cls.__name__,
        "name": self.__class__.__name__,
    }

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the map returned by to_json.

    Args:
      json: a map with the keys written by to_json.

    Returns:
      a new instance, with its in-progress single key range iterator
      restored when one was saved.

    Raises:
      KeyError: if the saved class name is not a key of
        _KEY_RANGE_ITERATORS.
    """
    iter_cls = _KEY_RANGE_ITERATORS[json["key_range_iter_cls"]]
    k_ranges = key_ranges.KeyRangesFactory.from_json(json["key_ranges"])
    query_spec = model.QuerySpec.from_json(json["query_spec"])
    obj = cls(k_ranges, query_spec, iter_cls)

    saved_iter = json["current_iter"]
    # pylint: disable=protected-access
    if saved_iter:
      obj._current_iter = iter_cls.from_json(saved_iter)
    else:
      obj._current_iter = None
    return obj
352
353
# Registry of all RangeIterator classes, keyed by class name.
# RangeIteratorFactory.from_json uses the "name" entry of a serialized
# iterator to find the class to restore here.
_RANGE_ITERATORS = dict(
    (iterator_cls.__name__, iterator_cls)
    for iterator_cls in (_PropertyRangeModelIterator,
                         _MultiPropertyRangeModelIterator,
                         _KeyRangesIterator))
360
361
class AbstractKeyRangeIterator(json_util.JsonMixin):
  """Base class for iterators over a single key_range.KeyRange.

  Every subclass iterates over one KeyRange and yields a value per key.
  They differ in the API used (db, ndb, datastore) and therefore in the
  type of value returned (db model, ndb model, datastore entity, raw proto).
  Subclasses provide __iter__ and _get_cursor; marshalling is shared here.
  """

  def __init__(self, k_range, query_spec):
    """Init.

    Args:
      k_range: a key_range.KeyRange object that defines the entity keys to
        operate on. KeyRange object already contains a namespace.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._key_range = k_range
    self._query_spec = query_spec
    # Cursor to resume from (set by from_json) and the query being iterated
    # (set by the subclass __iter__).
    self._cursor = None
    self._query = None

  def __iter__(self):
    """Iterate over the key range. Subclasses must override."""
    raise NotImplementedError()

  def _get_cursor(self):
    """Return the cursor to serialize. Subclasses must override."""
    raise NotImplementedError()

  def to_json(self):
    """Marshal the key range, query spec and current cursor.

    Returns:
      a json-compatible map. "cursor_object" is True when the cursor was a
      datastore_query.Cursor stored in its websafe string form.
    """
    cursor = self._get_cursor()
    if cursor and isinstance(cursor, datastore_query.Cursor):
      saved_cursor, cursor_object = cursor.to_websafe_string(), True
    else:
      saved_cursor, cursor_object = cursor, False

    key_range_json = self._key_range.to_json()
    query_spec_json = self._query_spec.to_json()
    return {
        "key_range": key_range_json,
        "query_spec": query_spec_json,
        "cursor": saved_cursor,
        "cursor_object": cursor_object,
    }

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the map returned by to_json.

    Args:
      json: a map with the keys written by to_json.

    Returns:
      a new instance of cls whose cursor is the one that was saved.
    """
    k_range = key_range.KeyRange.from_json(json["key_range"])
    query_spec = model.QuerySpec.from_json(json["query_spec"])
    obj = cls(k_range, query_spec)

    cursor = json["cursor"]
    if cursor and json["cursor_object"]:
      cursor = datastore_query.Cursor.from_websafe_string(cursor)
    # lint bug. Class method can access protected fields.
    # pylint: disable=protected-access
    obj._cursor = cursor
    return obj
421
422
class KeyRangeModelIterator(AbstractKeyRangeIterator):
  """Yields db/ndb model entities with a key range."""

  def __iter__(self):
    """Run an ascending query over the key range and yield its results.

    Yields:
      whatever the db or ndb query produces for the model class named by
      the query spec.
    """
    model_class = util.for_name(self._query_spec.model_class_path)
    self._query = self._key_range.make_ascending_query(
        model_class, filters=self._query_spec.filters)

    if not isinstance(self._query, db.Query):
      # Anything that is not a db.Query is handled through .iter(); the
      # resulting iterator replaces self._query so that _get_cursor can ask
      # it for cursor_after().
      self._query = self._query.iter(
          batch_size=self._query_spec.batch_size,
          keys_only=self._query_spec.keys_only,
          start_cursor=self._cursor,
          produce_cursors=True)
      results = self._query
    else:
      if self._cursor:
        self._query.with_cursor(self._cursor)
      results = self._query.run(
          batch_size=self._query_spec.batch_size,
          keys_only=self._query_spec.keys_only)

    for entity in results:
      yield entity

  def _get_cursor(self):
    """Return the live query's cursor, or the saved one if no query ran."""
    active_query = self._query
    if active_query is None:
      return self._cursor
    if isinstance(active_query, db.Query):
      return active_query.cursor()
    return active_query.cursor_after()
454
455
class KeyRangeEntityIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity type within a key range."""

  # Subclasses set this to True to force a keys-only query regardless of
  # the query spec.
  _KEYS_ONLY = False

  def __iter__(self):
    """Run an ascending datastore query over the key range.

    Yields:
      each result of the query, started from the saved cursor if any.
    """
    self._query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        keys_only=self._query_spec.keys_only or self._KEYS_ONLY,
        start_cursor=self._cursor)
    for entity in self._query.Run(config=options):
      yield entity

  def _get_cursor(self):
    """Return the live query's cursor, or the saved one if no query ran."""
    if self._query is not None:
      return self._query.GetCursor()
    return self._cursor
474
475
class KeyRangeKeyIterator(KeyRangeEntityIterator):
  """Yields datastore.Key type within a key range.

  Identical to KeyRangeEntityIterator except that the query is always run
  with keys_only enabled.
  """

  # Read by the inherited __iter__ when it builds the query options.
  _KEYS_ONLY = True
480
481
class KeyRangeEntityProtoIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity's raw proto within a key range."""

  def __iter__(self):
    """Run an ascending query over a connection that has no adapter.

    Yields:
      each result produced by the results iterator.
    """
    ascending_query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    # get a connection without adapter.
    raw_connection = datastore_rpc.Connection()
    options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        start_cursor=self._cursor,
        produce_cursors=True)

    # Conversion chain:
    # datastore.Query -> datastore_query.Query -> datastore_query.Batcher ->
    # datastore_query.ResultsIterator
    batcher = ascending_query.GetQuery().run(raw_connection, options)
    self._query = datastore_query.ResultsIterator(batcher)
    for entity_proto in self._query:
      yield entity_proto

  def _get_cursor(self):
    """Return the results iterator's cursor, or the saved one if none ran."""
    if self._query is not None:
      return self._query.cursor()
    return self._cursor
507
508
# Registry of the single key range iterator classes, keyed by class name.
# _KeyRangesIterator.from_json looks up the saved "key_range_iter_cls" here.
# TODO(user): update this map automatically using metaclass if needed.
# Ideally, we want a parameter in datastore input reader to control
# the return type.
_KEY_RANGE_ITERATORS = dict(
    (iterator_cls.__name__, iterator_cls)
    for iterator_cls in (KeyRangeModelIterator,
                         KeyRangeEntityIterator,
                         KeyRangeKeyIterator,
                         KeyRangeEntityProtoIterator))
518