#!/usr/bin/env python
"""Helper iterators for input_readers.DatastoreInputReader."""



# pylint: disable=g-bad-name
import itertools
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import db
from google.appengine.ext import key_range
from mapreduce import json_util
from mapreduce import key_ranges
from mapreduce import model
from mapreduce import namespace_range
from mapreduce import property_range
from mapreduce import util

__all__ = [
    "RangeIteratorFactory",
    "RangeIterator",
    "AbstractKeyRangeIterator",
    "KeyRangeModelIterator",
    "KeyRangeEntityIterator",
    "KeyRangeKeyIterator",
    "KeyRangeEntityProtoIterator"]


class RangeIteratorFactory(object):
  """Factory to create RangeIterator."""

  @classmethod
  def create_property_range_iterator(cls,
                                     p_range,
                                     ns_range,
                                     query_spec):
    """Create a _PropertyRangeModelIterator.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.

    Returns:
      a RangeIterator.
    """
    return _PropertyRangeModelIterator(p_range,
                                       ns_range,
                                       query_spec)

  @classmethod
  def create_multi_property_range_iterator(cls,
                                           p_range_iters):
    """Create a RangeIterator that chains several property range iterators.

    Args:
      p_range_iters: a list of RangeIterator objects to chain together.
        Note that deserialization restores every item as a
        _PropertyRangeModelIterator.

    Returns:
      a RangeIterator.
    """
    return _MultiPropertyRangeModelIterator(p_range_iters)

  @classmethod
  def create_key_ranges_iterator(cls,
                                 k_ranges,
                                 query_spec,
                                 key_range_iter_cls):
    """Create a _KeyRangesIterator.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.

    Returns:
      a RangeIterator.
    """
    return _KeyRangesIterator(k_ranges, query_spec, key_range_iter_cls)

  @classmethod
  def from_json(cls, json):
    """Recreate a RangeIterator from the output of its to_json().

    Args:
      json: a json-compatible dict produced by RangeIterator.to_json(). Its
        "name" entry selects the concrete RangeIterator class.

    Returns:
      a RangeIterator.

    Raises:
      KeyError: if json["name"] is not a known RangeIterator class name.
    """
    return _RANGE_ITERATORS[json["name"]].from_json(json)


class RangeIterator(json_util.JsonMixin):
  """Interface for DatastoreInputReader helpers.

  Technically, RangeIterator is a container. It contains all datastore
  entities that fall under a certain range (key range or property range).
  It implements __iter__, which returns a generator that can iterate
  through entities. It also implements marshalling logic. Marshalling
  saves the state of the container so that any new generator created
  can resume where the old generator left off.

  Caveats:
    1. Calling next() on the generators may also modify the container.
    2. Marshalling after StopIteration is raised has undefined behavior.
  """

  def __iter__(self):
    """Iter.

    Yields:
      Iterates over datastore entities and yields some kind of value
      for each entity.
    """
    raise NotImplementedError()

  def __repr__(self):
    raise NotImplementedError()

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map.
    """
    raise NotImplementedError()

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    raise NotImplementedError()


class _PropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a property range."""

  def __init__(self, p_range, ns_range, query_spec):
    """Init.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._property_range = p_range
    self._ns_range = ns_range
    self._query_spec = query_spec
    # Position to resume from inside the first namespace iterated; either a
    # datastore_query.Cursor, a string, or None (restored by from_json).
    self._cursor = None
    # The in-flight db.Query, or the iterator returned by the non-db query's
    # iter(); None when no namespace is currently being iterated.
    self._query = None

  def __repr__(self):
    return "PropertyRangeIterator for %s" % str(self._property_range)

  def __iter__(self):
    """Iterate over entities.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for ns in self._ns_range:
      self._query = self._property_range.make_query(ns)
      if isinstance(self._query, db.Query):
        if self._cursor:
          self._query.with_cursor(self._cursor)
        for model_instance in self._query.run(
            batch_size=self._query_spec.batch_size,
            keys_only=self._query_spec.keys_only):
          yield model_instance
      else:
        # Non-db (ndb) query: keep the iterator itself so to_json can read
        # cursor_after() from it.
        self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                       keys_only=self._query_spec.keys_only,
                                       start_cursor=self._cursor,
                                       produce_cursors=True)
        for model_instance in self._query:
          yield model_instance
      # This namespace is finished: the saved cursor only applied to it, and
      # the persisted namespace range is narrowed so that a serialized state
      # resumes after this namespace.
      self._query = None
      self._cursor = None
      if ns != self._ns_range.namespace_end:
        self._ns_range = self._ns_range.with_start_after(ns)

  def to_json(self):
    """Inherit doc."""
    # A live query knows the most recent position; otherwise fall back to
    # the cursor this object was created or restored with.
    cursor = self._cursor
    if self._query is not None:
      if isinstance(self._query, db.Query):
        cursor = self._query.cursor()
      else:
        cursor = self._query.cursor_after()

    if cursor is None or isinstance(cursor, basestring):
      cursor_object = False
    else:
      cursor_object = True
      cursor = cursor.to_websafe_string()

    return {"property_range": self._property_range.to_json(),
            "query_spec": self._query_spec.to_json(),
            "cursor": cursor,
            "ns_range": self._ns_range.to_json_object(),
            "name": self.__class__.__name__,
            "cursor_object": cursor_object}

  # TODO(user): cursor (de)serialization is duplicated in several places.
  # In the long run, the datastore adaptor refactor will take care of this,
  # as we will only need to deal with the low-level datastore API after that.
  # Thus we will not add Cursor as a json primitive that MR should understand.
  @classmethod
  def from_json(cls, json):
    """Inherit doc."""
    obj = cls(property_range.PropertyRange.from_json(json["property_range"]),
              namespace_range.NamespaceRange.from_json_object(json["ns_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    cursor = json["cursor"]
    # lint bug. Class method can access protected fields.
    # pylint: disable=protected-access
    if cursor and json["cursor_object"]:
      obj._cursor = datastore_query.Cursor.from_websafe_string(cursor)
    else:
      obj._cursor = cursor
    return obj


class _MultiPropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a list of disjoint property ranges."""

  def __init__(self, p_range_iters):
    """Init.

    Args:
      p_range_iters: a list of _PropertyRangeModelIterator objects to chain
        together.
    """
    self._iters = p_range_iters

  def __repr__(self):
    return "MultiPropertyRangeIterator combining %s" % str(
        [str(it) for it in self._iters])

  def __iter__(self):
    """Iterate over entities.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for model_instance in itertools.chain.from_iterable(self._iters):
      yield model_instance

  def to_json(self):
    """Inherit doc.

    The result maps "name" and "num_ranges" to this iterator's class name and
    the number of sub-iterators, and str(i) to the i-th sub-iterator's json.
    When there is at least one sub-iterator, the "query_spec" and "name"
    entries are removed from each sub-iterator's json and stored once at the
    top level as "query_spec" and "item_name" (the last sub-iterator's values
    win, so they are presumably identical across sub-iterators).
    """
    json = {"name": self.__class__.__name__,
            "num_ranges": len(self._iters)}

    for i, p_range_iter in enumerate(self._iters):
      json_item = p_range_iter.to_json()
      # Move the shared entries one level up to save space.
      json["query_spec"] = json_item.pop("query_spec")
      json["item_name"] = json_item.pop("name")
      json[str(i)] = json_item

    return json

  @classmethod
  def from_json(cls, json):
    """Inherit doc. The input dict is not modified."""
    num_ranges = int(json["num_ranges"])
    # Absent when the serialized iterator had no sub-iterators.
    query_spec = json.get("query_spec")
    item_name = json.get("item_name")

    p_range_iters = []
    for i in xrange(num_ranges):
      # Copy so that putting query_spec/name back does not mutate the caller's
      # json.
      json_item = dict(json[str(i)])
      json_item["query_spec"] = query_spec
      json_item["name"] = item_name
      p_range_iters.append(_PropertyRangeModelIterator.from_json(json_item))

    return cls(p_range_iters)


class _KeyRangesIterator(RangeIterator):
  """Create an iterator over a key_ranges.KeyRanges object."""

  def __init__(self,
               k_ranges,
               query_spec,
               key_range_iter_cls):
    """Init.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.
    """
    self._key_ranges = k_ranges
    self._query_spec = query_spec
    self._key_range_iter_cls = key_range_iter_cls
    # Iterator over the key range currently being processed, or None.
    self._current_iter = None
    # Not read anywhere in this module; kept so existing attribute access
    # does not break.
    self._current_key_range = None

  def __repr__(self):
    return "KeyRangesIterator for %s" % str(self._key_ranges)

  def __iter__(self):
    """Iterate over every key range in turn.

    Finishes the current (possibly restored) per-range iterator first, then
    creates a key_range_iter_cls iterator for each remaining key range.

    Yields:
      whatever key_range_iter_cls yields.
    """
    while True:
      if self._current_iter is not None:
        for o in self._current_iter:
          yield o

      # Only the next() call signals exhaustion of the key ranges; keep the
      # iterator construction outside the try so its errors are not masked.
      try:
        k_range = self._key_ranges.next()
      except StopIteration:
        self._current_iter = None
        break
      self._current_iter = self._key_range_iter_cls(k_range,
                                                    self._query_spec)

  def to_json(self):
    """Inherit doc."""
    current_iter = None
    if self._current_iter is not None:
      current_iter = self._current_iter.to_json()

    return {"key_ranges": self._key_ranges.to_json(),
            "query_spec": self._query_spec.to_json(),
            "current_iter": current_iter,
            "key_range_iter_cls": self._key_range_iter_cls.__name__,
            "name": self.__class__.__name__}

  @classmethod
  def from_json(cls, json):
    """Inherit doc."""
    key_range_iter_cls = _KEY_RANGE_ITERATORS[json["key_range_iter_cls"]]
    obj = cls(key_ranges.KeyRangesFactory.from_json(json["key_ranges"]),
              model.QuerySpec.from_json(json["query_spec"]),
              key_range_iter_cls)

    current_iter = None
    if json["current_iter"]:
      current_iter = key_range_iter_cls.from_json(json["current_iter"])
    # pylint: disable=protected-access
    obj._current_iter = current_iter
    return obj


# A map from class name to class of all RangeIterators.
_RANGE_ITERATORS = {
    _PropertyRangeModelIterator.__name__: _PropertyRangeModelIterator,
    _MultiPropertyRangeModelIterator.__name__: _MultiPropertyRangeModelIterator,
    _KeyRangesIterator.__name__: _KeyRangesIterator
    }


class AbstractKeyRangeIterator(json_util.JsonMixin):
  """Iterates over a single key_range.KeyRange and yields value for each key.

  All subclasses do the same thing: iterate over a single KeyRange.
  They do so using different APIs (db, ndb, datastore) to return entities
  of different types (db model, ndb model, datastore entity, raw proto).
  """

  def __init__(self, k_range, query_spec):
    """Init.

    Args:
      k_range: a key_range.KeyRange object that defines the entity keys to
        operate on. KeyRange object already contains a namespace.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._key_range = k_range
    self._query_spec = query_spec
    # Position to resume from; restored by from_json.
    self._cursor = None
    # Query (or results iterator) created by __iter__; None before iteration.
    self._query = None

  def __iter__(self):
    """Iter."""
    raise NotImplementedError()

  def _get_cursor(self):
    """Get cursor on current query iterator for serialization."""
    raise NotImplementedError()

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map.
    """
    cursor = self._get_cursor()
    cursor_object = False
    if cursor and isinstance(cursor, datastore_query.Cursor):
      cursor = cursor.to_websafe_string()
      cursor_object = True
    return {"key_range": self._key_range.to_json(),
            "query_spec": self._query_spec.to_json(),
            "cursor": cursor,
            "cursor_object": cursor_object}

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    obj = cls(key_range.KeyRange.from_json(json["key_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    cursor = json["cursor"]
    # lint bug. Class method can access protected fields.
    # pylint: disable=protected-access
    if cursor and json["cursor_object"]:
      obj._cursor = datastore_query.Cursor.from_websafe_string(cursor)
    else:
      obj._cursor = cursor
    return obj


class KeyRangeModelIterator(AbstractKeyRangeIterator):
  """Yields db/ndb model entities with a key range."""

  def __iter__(self):
    """Yields model instances of query_spec.model_class_path in the range."""
    self._query = self._key_range.make_ascending_query(
        util.for_name(self._query_spec.model_class_path),
        filters=self._query_spec.filters)

    if isinstance(self._query, db.Query):
      if self._cursor:
        self._query.with_cursor(self._cursor)
      for model_instance in self._query.run(
          batch_size=self._query_spec.batch_size,
          keys_only=self._query_spec.keys_only):
        yield model_instance
    else:
      # Non-db (ndb) query: keep the iterator itself so _get_cursor can read
      # cursor_after() from it.
      self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                     keys_only=self._query_spec.keys_only,
                                     start_cursor=self._cursor,
                                     produce_cursors=True)
      for model_instance in self._query:
        yield model_instance

  def _get_cursor(self):
    """Returns the live query's cursor, or the stored one if not started."""
    if self._query is None:
      return self._cursor

    if isinstance(self._query, db.Query):
      return self._query.cursor()
    else:
      return self._query.cursor_after()


class KeyRangeEntityIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity type within a key range."""

  # When True, the query is always keys-only regardless of query_spec.
  _KEYS_ONLY = False

  def __iter__(self):
    """Yields results of a datastore query over query_spec.entity_kind."""
    self._query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    for entity in self._query.Run(config=datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        keys_only=self._query_spec.keys_only or self._KEYS_ONLY,
        start_cursor=self._cursor)):
      yield entity

  def _get_cursor(self):
    """Returns the live query's cursor, or the stored one if not started."""
    if self._query is None:
      return self._cursor
    return self._query.GetCursor()


class KeyRangeKeyIterator(KeyRangeEntityIterator):
  """Yields datastore.Key type within a key range."""

  _KEYS_ONLY = True


class KeyRangeEntityProtoIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity's raw proto within a key range."""

  def __iter__(self):
    """Yields raw entity protos for query_spec.entity_kind in the range."""
    query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    # get a connection without adapter.
    connection = datastore_rpc.Connection()
    query_options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        start_cursor=self._cursor,
        produce_cursors=True)

    # Transform datastore.Query:
    # datastore.Query -> datastore_query.Query -> datastore_query.Batcher ->
    # datastore_query.ResultsIterator
    self._query = datastore_query.ResultsIterator(
        query.GetQuery().run(connection, query_options))
    for entity_proto in self._query:
      yield entity_proto

  def _get_cursor(self):
    """Returns the live iterator's cursor, or the stored one if not started."""
    if self._query is None:
      return self._cursor
    return self._query.cursor()


# TODO(user): update this map automatically using metaclass if needed.
# Ideally, we want a parameter in datastore input reader to control
# the return type.
_KEY_RANGE_ITERATORS = {
    KeyRangeModelIterator.__name__: KeyRangeModelIterator,
    KeyRangeEntityIterator.__name__: KeyRangeEntityIterator,
    KeyRangeKeyIterator.__name__: KeyRangeKeyIterator,
    KeyRangeEntityProtoIterator.__name__: KeyRangeEntityProtoIterator
}