1#!/usr/bin/env python 2# Copyright 2010 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Defines input readers for MapReduce.""" 17 18 19 20__all__ = [ 21 "AbstractDatastoreInputReader", 22 "ALLOW_CHECKPOINT", 23 "BadReaderParamsError", 24 "BlobstoreLineInputReader", 25 "BlobstoreZipInputReader", 26 "BlobstoreZipLineInputReader", 27 "COUNTER_IO_READ_BYTES", 28 "COUNTER_IO_READ_MSEC", 29 "DatastoreEntityInputReader", 30 "DatastoreInputReader", 31 "DatastoreKeyInputReader", 32 "GoogleCloudStorageInputReader", 33 "GoogleCloudStorageRecordInputReader", 34 "RandomStringInputReader", 35 "RawDatastoreInputReader", 36 "Error", 37 "InputReader", 38 "LogInputReader", 39 "NamespaceInputReader", 40 ] 41 42# pylint: disable=g-bad-name 43# pylint: disable=protected-access 44 45import base64 46import copy 47import logging 48import pickle 49import random 50import string 51import StringIO 52import time 53import zipfile 54 55from google.net.proto import ProtocolBuffer 56from google.appengine.ext import ndb 57 58from google.appengine.api import datastore 59from google.appengine.api import logservice 60from google.appengine.api.logservice import log_service_pb 61from google.appengine.ext import blobstore 62from google.appengine.ext import db 63from google.appengine.ext import key_range 64from google.appengine.ext.db import metadata 65from mapreduce import context 66from mapreduce import datastore_range_iterators as db_iters 67from mapreduce import errors 68from mapreduce import json_util 69from mapreduce import key_ranges 70from mapreduce import kv_pb 71from mapreduce import model 72from mapreduce import namespace_range 73from mapreduce import operation 74from mapreduce import property_range 75from mapreduce import records 76from mapreduce import util 77 78# pylint: disable=g-import-not-at-top 79# TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime. 80try: 81 # Check if the full cloudstorage package exists. The stub part is in runtime. 82 cloudstorage = None 83 import cloudstorage 84 if hasattr(cloudstorage, "_STUB"): 85 cloudstorage = None 86except ImportError: 87 pass # CloudStorage library not available 88 89# Attempt to load cloudstorage from the bundle (availble in some tests) 90if cloudstorage is None: 91 try: 92 import cloudstorage 93 except ImportError: 94 pass # CloudStorage library really not available 95 96 97# Classes moved to errors module. Copied here for compatibility. 98Error = errors.Error 99BadReaderParamsError = errors.BadReaderParamsError 100 101 102# Counter name for number of bytes read. 103COUNTER_IO_READ_BYTES = "io-read-bytes" 104 105# Counter name for milliseconds spent reading data. 106COUNTER_IO_READ_MSEC = "io-read-msec" 107 108# Special value that can be yielded by InputReaders if they want to give the 109# framework an opportunity to save the state of the mapreduce without having 110# to yield an actual value to the handler. 111ALLOW_CHECKPOINT = object() 112 113 114class InputReader(json_util.JsonMixin): 115 """Abstract base class for input readers. 116 117 InputReaders have the following properties: 118 * They are created by using the split_input method to generate a set of 119 InputReaders from a MapperSpec. 120 * They generate inputs to the mapper via the iterator interface. 121 * After creation, they can be serialized and resumed using the JsonMixin 122 interface. 123 * They are cast to string for a user-readable description; it may be 124 valuable to implement __str__. 125 """ 126 127 # When expand_parameters is False, then value yielded by reader is passed 128 # to handler as is. If it's true, then *value is passed, expanding arguments 129 # and letting handler be a multi-parameter function. 130 expand_parameters = False 131 132 # Mapreduce parameters. 133 _APP_PARAM = "_app" 134 NAMESPACE_PARAM = "namespace" 135 NAMESPACES_PARAM = "namespaces" # Obsolete. 136 137 def __iter__(self): 138 return self 139 140 def next(self): 141 """Returns the next input from this input reader as a key, value pair. 142 143 Returns: 144 The next input from this input reader. 145 """ 146 raise NotImplementedError("next() not implemented in %s" % self.__class__) 147 148 @classmethod 149 def from_json(cls, input_shard_state): 150 """Creates an instance of the InputReader for the given input shard state. 151 152 Args: 153 input_shard_state: The InputReader state as a dict-like object. 154 155 Returns: 156 An instance of the InputReader configured using the values of json. 157 """ 158 raise NotImplementedError("from_json() not implemented in %s" % cls) 159 160 def to_json(self): 161 """Returns an input shard state for the remaining inputs. 162 163 Returns: 164 A json-izable version of the remaining InputReader. 165 """ 166 raise NotImplementedError("to_json() not implemented in %s" % 167 self.__class__) 168 169 @classmethod 170 def split_input(cls, mapper_spec): 171 """Returns a list of input readers. 172 173 This method creates a list of input readers, each for one shard. 174 It attempts to split inputs among readers evenly. 175 176 Args: 177 mapper_spec: model.MapperSpec specifies the inputs and additional 178 parameters to define the behavior of input readers. 179 180 Returns: 181 A list of InputReaders. None or [] when no input data can be found. 182 """ 183 raise NotImplementedError("split_input() not implemented in %s" % cls) 184 185 @classmethod 186 def validate(cls, mapper_spec): 187 """Validates mapper spec and all mapper parameters. 188 189 Input reader parameters are expected to be passed as "input_reader" 190 subdictionary in mapper_spec.params. 191 192 Pre 1.6.4 API mixes input reader parameters with all other parameters. Thus 193 to be compatible, input reader check mapper_spec.params as well and 194 issue a warning if "input_reader" subdicationary is not present. 195 196 Args: 197 mapper_spec: The MapperSpec for this InputReader. 198 199 Raises: 200 BadReaderParamsError: required parameters are missing or invalid. 201 """ 202 if mapper_spec.input_reader_class() != cls: 203 raise BadReaderParamsError("Input reader class mismatch") 204 205 206def _get_params(mapper_spec, allowed_keys=None, allow_old=True): 207 """Obtain input reader parameters. 208 209 Utility function for input readers implementation. Fetches parameters 210 from mapreduce specification giving appropriate usage warnings. 211 212 Args: 213 mapper_spec: The MapperSpec for the job 214 allowed_keys: set of all allowed keys in parameters as strings. If it is not 215 None, then parameters are expected to be in a separate "input_reader" 216 subdictionary of mapper_spec parameters. 217 allow_old: Allow parameters to exist outside of the input_reader 218 subdictionary for compatability. 219 220 Returns: 221 mapper parameters as dict 222 223 Raises: 224 BadReaderParamsError: if parameters are invalid/missing or not allowed. 225 """ 226 if "input_reader" not in mapper_spec.params: 227 message = ("Input reader's parameters should be specified in " 228 "input_reader subdictionary.") 229 if not allow_old or allowed_keys: 230 raise errors.BadReaderParamsError(message) 231 params = mapper_spec.params 232 params = dict((str(n), v) for n, v in params.iteritems()) 233 else: 234 if not isinstance(mapper_spec.params.get("input_reader"), dict): 235 raise errors.BadReaderParamsError( 236 "Input reader parameters should be a dictionary") 237 params = mapper_spec.params.get("input_reader") 238 params = dict((str(n), v) for n, v in params.iteritems()) 239 if allowed_keys: 240 params_diff = set(params.keys()) - allowed_keys 241 if params_diff: 242 raise errors.BadReaderParamsError( 243 "Invalid input_reader parameters: %s" % ",".join(params_diff)) 244 return params 245 246 247class AbstractDatastoreInputReader(InputReader): 248 """Abstract class for datastore input readers.""" 249 250 # Number of entities to fetch at once while doing scanning. 251 _BATCH_SIZE = 50 252 253 # Maximum number of shards we'll create. 254 _MAX_SHARD_COUNT = 256 255 256 # Factor for additional ranges to split when using inequality filters. 257 _OVERSPLIT_FACTOR = 1 258 259 # The maximum number of namespaces that will be sharded by datastore key 260 # before switching to a strategy where sharding is done lexographically by 261 # namespace. 262 MAX_NAMESPACES_FOR_KEY_SHARD = 10 263 264 # reader parameters. 265 ENTITY_KIND_PARAM = "entity_kind" 266 KEYS_ONLY_PARAM = "keys_only" 267 BATCH_SIZE_PARAM = "batch_size" 268 KEY_RANGE_PARAM = "key_range" 269 FILTERS_PARAM = "filters" 270 OVERSPLIT_FACTOR_PARAM = "oversplit_factor" 271 272 _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator 273 274 def __init__(self, iterator): 275 """Create new DatastoreInputReader object. 276 277 This is internal constructor. Use split_input to create readers instead. 278 279 Args: 280 iterator: an iterator that generates objects for this input reader. 281 """ 282 self._iter = iterator 283 284 def __iter__(self): 285 """Yields whatever internal iterator yields.""" 286 for o in self._iter: 287 yield o 288 289 def __str__(self): 290 """Returns the string representation of this InputReader.""" 291 return repr(self._iter) 292 293 def to_json(self): 294 """Serializes input reader to json compatible format. 295 296 Returns: 297 all the data in json-compatible map. 298 """ 299 return self._iter.to_json() 300 301 @classmethod 302 def from_json(cls, json): 303 """Create new DatastoreInputReader from json, encoded by to_json. 304 305 Args: 306 json: json representation of DatastoreInputReader. 307 308 Returns: 309 an instance of DatastoreInputReader with all data deserialized from json. 310 """ 311 return cls(db_iters.RangeIteratorFactory.from_json(json)) 312 313 @classmethod 314 def _get_query_spec(cls, mapper_spec): 315 """Construct a model.QuerySpec from model.MapperSpec.""" 316 params = _get_params(mapper_spec) 317 entity_kind = params[cls.ENTITY_KIND_PARAM] 318 filters = params.get(cls.FILTERS_PARAM) 319 app = params.get(cls._APP_PARAM) 320 ns = params.get(cls.NAMESPACE_PARAM) 321 322 return model.QuerySpec( 323 entity_kind=cls._get_raw_entity_kind(entity_kind), 324 keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)), 325 filters=filters, 326 batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)), 327 oversplit_factor=int(params.get(cls.OVERSPLIT_FACTOR_PARAM, 328 cls._OVERSPLIT_FACTOR)), 329 model_class_path=entity_kind, 330 app=app, 331 ns=ns) 332 333 @classmethod 334 def split_input(cls, mapper_spec): 335 """Inherit doc.""" 336 shard_count = mapper_spec.shard_count 337 query_spec = cls._get_query_spec(mapper_spec) 338 339 namespaces = None 340 if query_spec.ns is not None: 341 k_ranges = cls._to_key_ranges_by_shard( 342 query_spec.app, [query_spec.ns], shard_count, query_spec) 343 else: 344 ns_keys = namespace_range.get_namespace_keys( 345 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1) 346 # No namespace means the app may have some data but those data are not 347 # visible yet. Just return. 348 if not ns_keys: 349 return 350 # If the number of ns is small, we shard each ns by key and assign each 351 # shard a piece of a ns. 352 elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD: 353 namespaces = [ns_key.name() or "" for ns_key in ns_keys] 354 k_ranges = cls._to_key_ranges_by_shard( 355 query_spec.app, namespaces, shard_count, query_spec) 356 # When number of ns is large, we can only split lexicographically by ns. 357 else: 358 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count, 359 contiguous=False, 360 can_query=lambda: True, 361 _app=query_spec.app) 362 k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range) 363 for ns_range in ns_ranges] 364 365 iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator( 366 r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges] 367 368 return [cls(i) for i in iters] 369 370 @classmethod 371 def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec): 372 """Get a list of key_ranges.KeyRanges objects, one for each shard. 373 374 This method uses scatter index to split each namespace into pieces 375 and assign those pieces to shards. 376 377 Args: 378 app: app_id in str. 379 namespaces: a list of namespaces in str. 380 shard_count: number of shards to split. 381 query_spec: model.QuerySpec. 382 383 Returns: 384 a list of key_ranges.KeyRanges objects. 385 """ 386 key_ranges_by_ns = [] 387 # Split each ns into n splits. If a ns doesn't have enough scatter to 388 # split into n, the last few splits are None. 389 for namespace in namespaces: 390 ranges = cls._split_ns_by_scatter( 391 shard_count, 392 namespace, 393 query_spec.entity_kind, 394 query_spec.filters, 395 app) 396 # The nth split of each ns will be assigned to the nth shard. 397 # Shuffle so that None are not all by the end. 398 random.shuffle(ranges) 399 key_ranges_by_ns.append(ranges) 400 401 # KeyRanges from different namespaces might be very different in size. 402 # Use round robin to make sure each shard can have at most one split 403 # or a None from a ns. 404 ranges_by_shard = [[] for _ in range(shard_count)] 405 for ranges in key_ranges_by_ns: 406 for i, k_range in enumerate(ranges): 407 if k_range: 408 ranges_by_shard[i].append(k_range) 409 410 key_ranges_by_shard = [] 411 for ranges in ranges_by_shard: 412 if ranges: 413 key_ranges_by_shard.append(key_ranges.KeyRangesFactory.create_from_list( 414 ranges)) 415 return key_ranges_by_shard 416 417 @classmethod 418 def _split_ns_by_scatter(cls, 419 shard_count, 420 namespace, 421 raw_entity_kind, 422 filters, 423 app): 424 """Split a namespace by scatter index into key_range.KeyRange. 425 426 TODO(user): Power this with key_range.KeyRange.compute_split_points. 427 428 Args: 429 shard_count: number of shards. 430 namespace: namespace name to split. str. 431 raw_entity_kind: low level datastore API entity kind. 432 app: app id in str. 433 434 Returns: 435 A list of key_range.KeyRange objects. If there are not enough entities to 436 splits into requested shards, the returned list will contain KeyRanges 437 ordered lexicographically with any Nones appearing at the end. 438 """ 439 if shard_count == 1: 440 # With one shard we don't need to calculate any split points at all. 441 return [key_range.KeyRange(namespace=namespace, _app=app)] 442 443 ds_query = datastore.Query(kind=raw_entity_kind, 444 namespace=namespace, 445 _app=app, 446 keys_only=True) 447 ds_query.Order("__scatter__") 448 oversampling_factor = 32 449 random_keys = None 450 if filters: 451 ds_query_with_filters = copy.copy(ds_query) 452 for (key, op, value) in filters: 453 ds_query_with_filters.update({'%s %s' % (key, op): value}) 454 try: 455 random_keys = ds_query_with_filters.Get(shard_count * 456 oversampling_factor) 457 except db.NeedIndexError, why: 458 logging.warning('Need to add an index for optimal mapreduce-input' 459 ' splitting:\n%s' % why) 460 # We'll try again without the filter. We hope the filter 461 # will filter keys uniformly across the key-name space! 462 463 if not random_keys: 464 random_keys = ds_query.Get(shard_count * oversampling_factor) 465 466 if not random_keys: 467 # There are no entities with scatter property. We have no idea 468 # how to split. 469 return ([key_range.KeyRange(namespace=namespace, _app=app)] + 470 [None] * (shard_count - 1)) 471 472 random_keys.sort() 473 474 if len(random_keys) >= shard_count: 475 # We've got a lot of scatter values. Sample them down. 476 random_keys = cls._choose_split_points(random_keys, shard_count) 477 478 k_ranges = [] 479 480 k_ranges.append(key_range.KeyRange( 481 key_start=None, 482 key_end=random_keys[0], 483 direction=key_range.KeyRange.ASC, 484 include_start=False, 485 include_end=False, 486 namespace=namespace, 487 _app=app)) 488 489 for i in range(0, len(random_keys) - 1): 490 k_ranges.append(key_range.KeyRange( 491 key_start=random_keys[i], 492 key_end=random_keys[i+1], 493 direction=key_range.KeyRange.ASC, 494 include_start=True, 495 include_end=False, 496 namespace=namespace, 497 _app=app)) 498 499 k_ranges.append(key_range.KeyRange( 500 key_start=random_keys[-1], 501 key_end=None, 502 direction=key_range.KeyRange.ASC, 503 include_start=True, 504 include_end=False, 505 namespace=namespace, 506 _app=app)) 507 508 if len(k_ranges) < shard_count: 509 # We need to have as many shards as it was requested. Add some Nones. 510 k_ranges += [None] * (shard_count - len(k_ranges)) 511 return k_ranges 512 513 @classmethod 514 def _choose_split_points(cls, sorted_keys, shard_count): 515 """Returns the best split points given a random set of datastore.Keys.""" 516 assert len(sorted_keys) >= shard_count 517 index_stride = len(sorted_keys) / float(shard_count) 518 return [sorted_keys[int(round(index_stride * i))] 519 for i in range(1, shard_count)] 520 521 @classmethod 522 def validate(cls, mapper_spec): 523 """Inherit docs.""" 524 params = _get_params(mapper_spec) 525 if cls.ENTITY_KIND_PARAM not in params: 526 raise BadReaderParamsError("Missing input reader parameter 'entity_kind'") 527 if cls.BATCH_SIZE_PARAM in params: 528 try: 529 batch_size = int(params[cls.BATCH_SIZE_PARAM]) 530 if batch_size < 1: 531 raise BadReaderParamsError("Bad batch size: %s" % batch_size) 532 except ValueError, e: 533 raise BadReaderParamsError("Bad batch size: %s" % e) 534 if cls.OVERSPLIT_FACTOR_PARAM in params: 535 try: 536 oversplit_factor = int(params[cls.OVERSPLIT_FACTOR_PARAM]) 537 if oversplit_factor < 1: 538 raise BadReaderParamsError("Bad oversplit factor:" 539 " %s" % oversplit_factor) 540 except ValueError, e: 541 raise BadReaderParamsError("Bad oversplit factor: %s" % e) 542 try: 543 bool(params.get(cls.KEYS_ONLY_PARAM, False)) 544 except: 545 raise BadReaderParamsError("keys_only expects a boolean value but got %s", 546 params[cls.KEYS_ONLY_PARAM]) 547 if cls.NAMESPACE_PARAM in params: 548 if not isinstance(params[cls.NAMESPACE_PARAM], 549 (str, unicode, type(None))): 550 raise BadReaderParamsError( 551 "Expected a single namespace string") 552 if cls.NAMESPACES_PARAM in params: 553 raise BadReaderParamsError("Multiple namespaces are no longer supported") 554 if cls.FILTERS_PARAM in params: 555 filters = params[cls.FILTERS_PARAM] 556 if not isinstance(filters, list): 557 raise BadReaderParamsError("Expected list for filters parameter") 558 for f in filters: 559 if not isinstance(f, (tuple, list)): 560 raise BadReaderParamsError("Filter should be a tuple or list: %s", f) 561 if len(f) != 3: 562 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f) 563 prop, op, _ = f 564 if not isinstance(prop, basestring): 565 raise BadReaderParamsError("Property should be string: %s", prop) 566 if not isinstance(op, basestring): 567 raise BadReaderParamsError("Operator should be string: %s", op) 568 569 @classmethod 570 def _get_raw_entity_kind(cls, entity_kind_or_model_classpath): 571 """Returns the entity kind to use with low level datastore calls. 572 573 Args: 574 entity_kind_or_model_classpath: user specified entity kind or model 575 classpath. 576 577 Returns: 578 the entity kind in str to use with low level datastore calls. 579 """ 580 return entity_kind_or_model_classpath 581 582 583class RawDatastoreInputReader(AbstractDatastoreInputReader): 584 """Iterates over an entity kind and yields datastore.Entity.""" 585 586 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator 587 588 @classmethod 589 def validate(cls, mapper_spec): 590 """Inherit docs.""" 591 super(RawDatastoreInputReader, cls).validate(mapper_spec) 592 params = _get_params(mapper_spec) 593 entity_kind = params[cls.ENTITY_KIND_PARAM] 594 if "." in entity_kind: 595 logging.warning( 596 ". detected in entity kind %s specified for reader %s." 597 "Assuming entity kind contains the dot.", 598 entity_kind, cls.__name__) 599 if cls.FILTERS_PARAM in params: 600 filters = params[cls.FILTERS_PARAM] 601 for f in filters: 602 if f[1] != "=": 603 raise BadReaderParamsError( 604 "Only equality filters are supported: %s", f) 605 606 607class DatastoreInputReader(AbstractDatastoreInputReader): 608 """Iterates over a Model and yields model instances. 609 610 Supports both db.model and ndb.model. 611 """ 612 613 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator 614 615 @classmethod 616 def _get_raw_entity_kind(cls, model_classpath): 617 entity_type = util.for_name(model_classpath) 618 if isinstance(entity_type, db.Model): 619 return entity_type.kind() 620 elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)): 621 # pylint: disable=protected-access 622 return entity_type._get_kind() 623 else: 624 return util.get_short_name(model_classpath) 625 626 @classmethod 627 def validate(cls, mapper_spec): 628 """Inherit docs.""" 629 super(DatastoreInputReader, cls).validate(mapper_spec) 630 params = _get_params(mapper_spec) 631 entity_kind = params[cls.ENTITY_KIND_PARAM] 632 # Fail fast if Model cannot be located. 633 try: 634 model_class = util.for_name(entity_kind) 635 except ImportError, e: 636 raise BadReaderParamsError("Bad entity kind: %s" % e) 637 if cls.FILTERS_PARAM in params: 638 filters = params[cls.FILTERS_PARAM] 639 if issubclass(model_class, db.Model): 640 cls._validate_filters(filters, model_class) 641 else: 642 cls._validate_filters_ndb(filters, model_class) 643 property_range.PropertyRange(filters, entity_kind) 644 645 @classmethod 646 def _validate_filters(cls, filters, model_class): 647 """Validate user supplied filters. 648 649 Validate filters are on existing properties and filter values 650 have valid semantics. 651 652 Args: 653 filters: user supplied filters. Each filter should be a list or tuple of 654 format (<property_name_as_str>, <query_operator_as_str>, 655 <value_of_certain_type>). Value type is up to the property's type. 656 model_class: the db.Model class for the entity type to apply filters on. 657 658 Raises: 659 BadReaderParamsError: if any filter is invalid in any way. 660 """ 661 if not filters: 662 return 663 664 properties = model_class.properties() 665 666 for f in filters: 667 prop, _, val = f 668 if prop not in properties: 669 raise errors.BadReaderParamsError( 670 "Property %s is not defined for entity type %s", 671 prop, model_class.kind()) 672 673 # Validate the value of each filter. We need to know filters have 674 # valid value to carry out splits. 675 try: 676 properties[prop].validate(val) 677 except db.BadValueError, e: 678 raise errors.BadReaderParamsError(e) 679 680 @classmethod 681 # pylint: disable=protected-access 682 def _validate_filters_ndb(cls, filters, model_class): 683 """Validate ndb.Model filters.""" 684 if not filters: 685 return 686 687 properties = model_class._properties 688 689 690 for idx, f in enumerate(filters): 691 prop, ineq, val = f 692 if prop not in properties: 693 raise errors.BadReaderParamsError( 694 "Property %s is not defined for entity type %s", 695 prop, model_class._get_kind()) 696 697 # Attempt to cast the value to a KeyProperty if appropriate. 698 # This enables filtering against keys. 699 try: 700 if (isinstance(val, basestring) and 701 isinstance(properties[prop], 702 (ndb.KeyProperty, ndb.ComputedProperty))): 703 val = ndb.Key(urlsafe=val) 704 filters[idx] = [prop, ineq, val] 705 except: 706 pass 707 708 # Validate the value of each filter. We need to know filters have 709 # valid value to carry out splits. 710 try: 711 properties[prop]._do_validate(val) 712 except db.BadValueError, e: 713 raise errors.BadReaderParamsError(e) 714 715 @classmethod 716 def split_input(cls, mapper_spec): 717 """Inherit docs.""" 718 shard_count = mapper_spec.shard_count 719 query_spec = cls._get_query_spec(mapper_spec) 720 721 if not property_range.should_shard_by_property_range(query_spec.filters): 722 return super(DatastoreInputReader, cls).split_input(mapper_spec) 723 724 # Artificially increase the number of shards to get a more even split. 725 # For example, if we are creating 7 shards for one week of data based on a 726 # Day property and the data points tend to be clumped on certain days (say, 727 # Monday and Wednesday), instead of assigning each shard a single day of 728 # the week, we will split each day into "oversplit_factor" pieces, and 729 # assign each shard "oversplit_factor" pieces with "1 / oversplit_factor" 730 # the work, so that the data from Monday and Wednesday is more evenly 731 # spread across all shards. 732 oversplit_factor = query_spec.oversplit_factor 733 oversplit_shard_count = oversplit_factor * shard_count 734 p_range = property_range.PropertyRange(query_spec.filters, 735 query_spec.model_class_path) 736 p_ranges = p_range.split(oversplit_shard_count) 737 738 # User specified a namespace. 739 if query_spec.ns is not None: 740 ns_range = namespace_range.NamespaceRange( 741 namespace_start=query_spec.ns, 742 namespace_end=query_spec.ns, 743 _app=query_spec.app) 744 ns_ranges = [copy.copy(ns_range) for _ in p_ranges] 745 else: 746 ns_keys = namespace_range.get_namespace_keys( 747 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1) 748 if not ns_keys: 749 return 750 # User doesn't specify ns but the number of ns is small. 751 # We still split by property range. 752 if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD: 753 ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app) 754 for _ in p_ranges] 755 # Lots of namespaces. Split by ns. 756 else: 757 ns_ranges = namespace_range.NamespaceRange.split(n=oversplit_shard_count, 758 contiguous=False, 759 can_query=lambda: True, 760 _app=query_spec.app) 761 p_ranges = [copy.copy(p_range) for _ in ns_ranges] 762 763 assert len(p_ranges) == len(ns_ranges) 764 765 iters = [ 766 db_iters.RangeIteratorFactory.create_property_range_iterator( 767 p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)] 768 769 # Reduce the number of ranges back down to the shard count. 770 # It's possible that we didn't split into enough shards even 771 # after oversplitting, in which case we don't need to do anything. 772 if len(iters) > shard_count: 773 # We cycle through the iterators and chain them together, e.g. 774 # if we look at the indices chained together, we get: 775 # Shard #0 gets 0, num_shards, 2 * num_shards, ... 776 # Shard #1 gets 1, num_shards + 1, 2 * num_shards + 1, ... 777 # Shard #2 gets 2, num_shards + 2, 2 * num_shards + 2, ... 778 # and so on. This should split fairly evenly. 779 iters = [ 780 db_iters.RangeIteratorFactory.create_multi_property_range_iterator( 781 [iters[i] for i in xrange(start_index, len(iters), shard_count)] 782 ) for start_index in xrange(shard_count) 783 ] 784 785 return [cls(i) for i in iters] 786 787 788class DatastoreKeyInputReader(RawDatastoreInputReader): 789 """Iterate over an entity kind and yields datastore.Key.""" 790 791 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator 792 793 794# For backward compatibility. 795DatastoreEntityInputReader = RawDatastoreInputReader 796 797 798# TODO(user): Remove this after the only dependency GroomerMarkReader is 799class _OldAbstractDatastoreInputReader(InputReader): 800 """Abstract base class for classes that iterate over datastore entities. 801 802 Concrete subclasses must implement _iter_key_range(self, k_range). See the 803 docstring for that method for details. 804 """ 805 806 # Number of entities to fetch at once while doing scanning. 807 _BATCH_SIZE = 50 808 809 # Maximum number of shards we'll create. 810 _MAX_SHARD_COUNT = 256 811 812 # __scatter__ oversampling factor 813 _OVERSAMPLING_FACTOR = 32 814 815 # The maximum number of namespaces that will be sharded by datastore key 816 # before switching to a strategy where sharding is done lexographically by 817 # namespace. 818 MAX_NAMESPACES_FOR_KEY_SHARD = 10 819 820 # Mapreduce parameters. 821 ENTITY_KIND_PARAM = "entity_kind" 822 KEYS_ONLY_PARAM = "keys_only" 823 BATCH_SIZE_PARAM = "batch_size" 824 KEY_RANGE_PARAM = "key_range" 825 NAMESPACE_RANGE_PARAM = "namespace_range" 826 CURRENT_KEY_RANGE_PARAM = "current_key_range" 827 FILTERS_PARAM = "filters" 828 829 # TODO(user): Add support for arbitrary queries. It's not possible to 830 # support them without cursors since right now you can't even serialize query 831 # definition. 832 # pylint: disable=redefined-outer-name 833 def __init__(self, 834 entity_kind, 835 key_ranges=None, 836 ns_range=None, 837 batch_size=_BATCH_SIZE, 838 current_key_range=None, 839 filters=None): 840 """Create new AbstractDatastoreInputReader object. 841 842 This is internal constructor. Use split_query in a concrete class instead. 843 844 Args: 845 entity_kind: entity kind as string. 846 key_ranges: a sequence of key_range.KeyRange instances to process. Only 847 one of key_ranges or ns_range can be non-None. 848 ns_range: a namespace_range.NamespaceRange to process. Only one of 849 key_ranges or ns_range can be non-None. 850 batch_size: size of read batch as int. 851 current_key_range: the current key_range.KeyRange being processed. 852 filters: optional list of filters to apply to the query. Each filter is 853 a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>). 854 User filters are applied first. 855 """ 856 assert key_ranges is not None or ns_range is not None, ( 857 "must specify one of 'key_ranges' or 'ns_range'") 858 assert key_ranges is None or ns_range is None, ( 859 "can't specify both 'key_ranges ' and 'ns_range'") 860 861 self._entity_kind = entity_kind 862 # Reverse the KeyRanges so they can be processed in order as a stack of 863 # work items. 864 self._key_ranges = key_ranges and list(reversed(key_ranges)) 865 866 self._ns_range = ns_range 867 self._batch_size = int(batch_size) 868 self._current_key_range = current_key_range 869 self._filters = filters 870 871 @classmethod 872 def _get_raw_entity_kind(cls, entity_kind): 873 if "." in entity_kind: 874 logging.warning( 875 ". detected in entity kind %s specified for reader %s." 876 "Assuming entity kind contains the dot.", 877 entity_kind, cls.__name__) 878 return entity_kind 879 880 def __iter__(self): 881 """Iterates over the given KeyRanges or NamespaceRange. 882 883 This method iterates over the given KeyRanges or NamespaceRange and sets 884 the self._current_key_range to the KeyRange currently being processed. It 885 then delegates to the _iter_key_range method to yield that actual 886 results. 887 888 Yields: 889 Forwards the objects yielded by the subclasses concrete _iter_key_range() 890 method. The caller must consume the result yielded because self.to_json() 891 will not include it. 892 """ 893 if self._key_ranges is not None: 894 for o in self._iter_key_ranges(): 895 yield o 896 elif self._ns_range is not None: 897 for o in self._iter_ns_range(): 898 yield o 899 else: 900 assert False, "self._key_ranges and self._ns_range are both None" 901 902 def _iter_key_ranges(self): 903 """Iterates over self._key_ranges, delegating to self._iter_key_range().""" 904 while True: 905 if self._current_key_range is None: 906 if self._key_ranges: 907 self._current_key_range = self._key_ranges.pop() 908 # The most recently popped key_range may be None, so continue here 909 # to find the next keyrange that's valid. 910 continue 911 else: 912 break 913 914 for key, o in self._iter_key_range( 915 copy.deepcopy(self._current_key_range)): 916 # The caller must consume yielded values so advancing the KeyRange 917 # before yielding is safe. 918 self._current_key_range.advance(key) 919 yield o 920 self._current_key_range = None 921 922 def _iter_ns_range(self): 923 """Iterates over self._ns_range, delegating to self._iter_key_range().""" 924 while True: 925 if self._current_key_range is None: 926 query = self._ns_range.make_datastore_query() 927 namespace_result = query.Get(1) 928 if not namespace_result: 929 break 930 931 namespace = namespace_result[0].name() or "" 932 self._current_key_range = key_range.KeyRange( 933 namespace=namespace, _app=self._ns_range.app) 934 yield ALLOW_CHECKPOINT 935 936 for key, o in self._iter_key_range( 937 copy.deepcopy(self._current_key_range)): 938 # The caller must consume yielded values so advancing the KeyRange 939 # before yielding is safe. 940 self._current_key_range.advance(key) 941 yield o 942 943 if (self._ns_range.is_single_namespace or 944 self._current_key_range.namespace == self._ns_range.namespace_end): 945 break 946 self._ns_range = self._ns_range.with_start_after( 947 self._current_key_range.namespace) 948 self._current_key_range = None 949 950 def _iter_key_range(self, k_range): 951 """Yields a db.Key and the value that should be yielded by self.__iter__(). 952 953 Args: 954 k_range: The key_range.KeyRange to iterate over. 955 956 Yields: 957 A 2-tuple containing the last db.Key processed and the value that should 958 be yielded by __iter__. The returned db.Key will be used to determine the 959 InputReader's current position in self._current_key_range. 960 """ 961 raise NotImplementedError("_iter_key_range() not implemented in %s" % 962 self.__class__) 963 964 def __str__(self): 965 """Returns the string representation of this InputReader.""" 966 if self._ns_range is None: 967 return repr(self._key_ranges) 968 else: 969 return repr(self._ns_range) 970 971 @classmethod 972 def _choose_split_points(cls, sorted_keys, shard_count): 973 """Returns the best split points given a random set of db.Keys.""" 974 assert len(sorted_keys) >= shard_count 975 index_stride = len(sorted_keys) / float(shard_count) 976 return [sorted_keys[int(round(index_stride * i))] 977 for i in range(1, shard_count)] 978 979 # TODO(user): use query splitting functionality when it becomes available 980 # instead. 981 @classmethod 982 def _split_input_from_namespace(cls, app, namespace, entity_kind, 983 shard_count): 984 """Helper for _split_input_from_params. 985 986 If there are not enough Entities to make all of the given shards, the 987 returned list of KeyRanges will include Nones. The returned list will 988 contain KeyRanges ordered lexographically with any Nones appearing at the 989 end. 990 991 Args: 992 app: the app. 993 namespace: the namespace. 994 entity_kind: entity kind as string. 995 shard_count: the number of shards. 996 997 Returns: 998 KeyRange objects. 999 """ 1000 1001 raw_entity_kind = cls._get_raw_entity_kind(entity_kind) 1002 if shard_count == 1: 1003 # With one shard we don't need to calculate any splitpoints at all. 1004 return [key_range.KeyRange(namespace=namespace, _app=app)] 1005 1006 ds_query = datastore.Query(kind=raw_entity_kind, 1007 namespace=namespace, 1008 _app=app, 1009 keys_only=True) 1010 ds_query.Order("__scatter__") 1011 random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR) 1012 1013 if not random_keys: 1014 # There are no entities with scatter property. We have no idea 1015 # how to split. 1016 return ([key_range.KeyRange(namespace=namespace, _app=app)] + 1017 [None] * (shard_count - 1)) 1018 1019 random_keys.sort() 1020 1021 if len(random_keys) >= shard_count: 1022 # We've got a lot of scatter values. Sample them down. 1023 random_keys = cls._choose_split_points(random_keys, shard_count) 1024 1025 # pylint: disable=redefined-outer-name 1026 key_ranges = [] 1027 1028 key_ranges.append(key_range.KeyRange( 1029 key_start=None, 1030 key_end=random_keys[0], 1031 direction=key_range.KeyRange.ASC, 1032 include_start=False, 1033 include_end=False, 1034 namespace=namespace, 1035 _app=app)) 1036 1037 for i in range(0, len(random_keys) - 1): 1038 key_ranges.append(key_range.KeyRange( 1039 key_start=random_keys[i], 1040 key_end=random_keys[i+1], 1041 direction=key_range.KeyRange.ASC, 1042 include_start=True, 1043 include_end=False, 1044 namespace=namespace, 1045 _app=app)) 1046 1047 key_ranges.append(key_range.KeyRange( 1048 key_start=random_keys[-1], 1049 key_end=None, 1050 direction=key_range.KeyRange.ASC, 1051 include_start=True, 1052 include_end=False, 1053 namespace=namespace, 1054 _app=app)) 1055 1056 if len(key_ranges) < shard_count: 1057 # We need to have as many shards as it was requested. Add some Nones. 1058 key_ranges += [None] * (shard_count - len(key_ranges)) 1059 1060 return key_ranges 1061 1062 @classmethod 1063 def _split_input_from_params(cls, app, namespaces, entity_kind_name, 1064 params, shard_count): 1065 """Return input reader objects. Helper for split_input.""" 1066 # pylint: disable=redefined-outer-name 1067 key_ranges = [] # KeyRanges for all namespaces 1068 for namespace in namespaces: 1069 key_ranges.extend( 1070 cls._split_input_from_namespace(app, 1071 namespace, 1072 entity_kind_name, 1073 shard_count)) 1074 1075 # Divide the KeyRanges into shard_count shards. The KeyRanges for different 1076 # namespaces might be very different in size so the assignment of KeyRanges 1077 # to shards is done round-robin. 1078 shared_ranges = [[] for _ in range(shard_count)] 1079 for i, k_range in enumerate(key_ranges): 1080 shared_ranges[i % shard_count].append(k_range) 1081 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)) 1082 1083 return [cls(entity_kind_name, 1084 key_ranges=key_ranges, 1085 ns_range=None, 1086 batch_size=batch_size) 1087 for key_ranges in shared_ranges if key_ranges] 1088 1089 @classmethod 1090 def validate(cls, mapper_spec): 1091 """Validates mapper spec and all mapper parameters. 1092 1093 Args: 1094 mapper_spec: The MapperSpec for this InputReader. 1095 1096 Raises: 1097 BadReaderParamsError: required parameters are missing or invalid. 1098 """ 1099 if mapper_spec.input_reader_class() != cls: 1100 raise BadReaderParamsError("Input reader class mismatch") 1101 params = _get_params(mapper_spec) 1102 if cls.ENTITY_KIND_PARAM not in params: 1103 raise BadReaderParamsError("Missing mapper parameter 'entity_kind'") 1104 if cls.BATCH_SIZE_PARAM in params: 1105 try: 1106 batch_size = int(params[cls.BATCH_SIZE_PARAM]) 1107 if batch_size < 1: 1108 raise BadReaderParamsError("Bad batch size: %s" % batch_size) 1109 except ValueError, e: 1110 raise BadReaderParamsError("Bad batch size: %s" % e) 1111 if cls.NAMESPACE_PARAM in params: 1112 if not isinstance(params[cls.NAMESPACE_PARAM], 1113 (str, unicode, type(None))): 1114 raise BadReaderParamsError( 1115 "Expected a single namespace string") 1116 if cls.NAMESPACES_PARAM in params: 1117 raise BadReaderParamsError("Multiple namespaces are no longer supported") 1118 if cls.FILTERS_PARAM in params: 1119 filters = params[cls.FILTERS_PARAM] 1120 if not isinstance(filters, list): 1121 raise BadReaderParamsError("Expected list for filters parameter") 1122 for f in filters: 1123 if not isinstance(f, (tuple, list)): 1124 raise BadReaderParamsError("Filter should be a tuple or list: %s", f) 1125 if len(f) != 3: 1126 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f) 1127 if not isinstance(f[0], basestring): 1128 raise BadReaderParamsError("First element should be string: %s", f) 1129 if f[1] != "=": 1130 raise BadReaderParamsError( 1131 "Only equality filters are supported: %s", f) 1132 1133 @classmethod 1134 def split_input(cls, mapper_spec): 1135 """Splits query into shards without fetching query results. 1136 1137 Tries as best as it can to split the whole query result set into equal 1138 shards. Due to difficulty of making the perfect split, resulting shards' 1139 sizes might differ significantly from each other. 1140 1141 Args: 1142 mapper_spec: MapperSpec with params containing 'entity_kind'. 1143 May have 'namespace' in the params as a string containing a single 1144 namespace. If specified then the input reader will only yield values 1145 in the given namespace. If 'namespace' is not given then values from 1146 all namespaces will be yielded. May also have 'batch_size' in the params 1147 to specify the number of entities to process in each batch. 1148 1149 Returns: 1150 A list of InputReader objects. If the query results are empty then the 1151 empty list will be returned. Otherwise, the list will always have a length 1152 equal to number_of_shards but may be padded with Nones if there are too 1153 few results for effective sharding. 1154 """ 1155 params = _get_params(mapper_spec) 1156 entity_kind_name = params[cls.ENTITY_KIND_PARAM] 1157 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)) 1158 shard_count = mapper_spec.shard_count 1159 namespace = params.get(cls.NAMESPACE_PARAM) 1160 app = params.get(cls._APP_PARAM) 1161 filters = params.get(cls.FILTERS_PARAM) 1162 1163 if namespace is None: 1164 # It is difficult to efficiently shard large numbers of namespaces because 1165 # there can be an arbitrary number of them. So the strategy is: 1166 # 1. if there are a small number of namespaces in the datastore then 1167 # generate one KeyRange per namespace per shard and assign each shard a 1168 # KeyRange for every namespace. This should lead to nearly perfect 1169 # sharding. 1170 # 2. if there are a large number of namespaces in the datastore then 1171 # generate one NamespaceRange per worker. This can lead to very bad 1172 # sharding because namespaces can contain very different numbers of 1173 # entities and each NamespaceRange may contain very different numbers 1174 # of namespaces. 1175 namespace_query = datastore.Query("__namespace__", 1176 keys_only=True, 1177 _app=app) 1178 namespace_keys = namespace_query.Get( 1179 limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1) 1180 1181 if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD: 1182 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count, 1183 contiguous=True, 1184 _app=app) 1185 return [cls(entity_kind_name, 1186 key_ranges=None, 1187 ns_range=ns_range, 1188 batch_size=batch_size, 1189 filters=filters) 1190 for ns_range in ns_ranges] 1191 elif not namespace_keys: 1192 return [cls(entity_kind_name, 1193 key_ranges=None, 1194 ns_range=namespace_range.NamespaceRange(_app=app), 1195 batch_size=shard_count, 1196 filters=filters)] 1197 else: 1198 namespaces = [namespace_key.name() or "" 1199 for namespace_key in namespace_keys] 1200 else: 1201 namespaces = [namespace] 1202 1203 readers = cls._split_input_from_params( 1204 app, namespaces, entity_kind_name, params, shard_count) 1205 if filters: 1206 for reader in readers: 1207 reader._filters = filters 1208 return readers 1209 1210 def to_json(self): 1211 """Serializes all the data in this query range into json form. 1212 1213 Returns: 1214 all the data in json-compatible map. 1215 """ 1216 if self._key_ranges is None: 1217 key_ranges_json = None 1218 else: 1219 key_ranges_json = [] 1220 for k in self._key_ranges: 1221 if k: 1222 key_ranges_json.append(k.to_json()) 1223 else: 1224 key_ranges_json.append(None) 1225 1226 if self._ns_range is None: 1227 namespace_range_json = None 1228 else: 1229 namespace_range_json = self._ns_range.to_json_object() 1230 1231 if self._current_key_range is None: 1232 current_key_range_json = None 1233 else: 1234 current_key_range_json = self._current_key_range.to_json() 1235 1236 json_dict = {self.KEY_RANGE_PARAM: key_ranges_json, 1237 self.NAMESPACE_RANGE_PARAM: namespace_range_json, 1238 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json, 1239 self.ENTITY_KIND_PARAM: self._entity_kind, 1240 self.BATCH_SIZE_PARAM: self._batch_size, 1241 self.FILTERS_PARAM: self._filters} 1242 return json_dict 1243 1244 @classmethod 1245 def from_json(cls, json): 1246 """Create new DatastoreInputReader from the json, encoded by to_json. 1247 1248 Args: 1249 json: json map representation of DatastoreInputReader. 1250 1251 Returns: 1252 an instance of DatastoreInputReader with all data deserialized from json. 1253 """ 1254 if json[cls.KEY_RANGE_PARAM] is None: 1255 # pylint: disable=redefined-outer-name 1256 key_ranges = None 1257 else: 1258 key_ranges = [] 1259 for k in json[cls.KEY_RANGE_PARAM]: 1260 if k: 1261 key_ranges.append(key_range.KeyRange.from_json(k)) 1262 else: 1263 key_ranges.append(None) 1264 1265 if json[cls.NAMESPACE_RANGE_PARAM] is None: 1266 ns_range = None 1267 else: 1268 ns_range = namespace_range.NamespaceRange.from_json_object( 1269 json[cls.NAMESPACE_RANGE_PARAM]) 1270 1271 if json[cls.CURRENT_KEY_RANGE_PARAM] is None: 1272 current_key_range = None 1273 else: 1274 current_key_range = key_range.KeyRange.from_json( 1275 json[cls.CURRENT_KEY_RANGE_PARAM]) 1276 1277 return cls( 1278 json[cls.ENTITY_KIND_PARAM], 1279 key_ranges, 1280 ns_range, 1281 json[cls.BATCH_SIZE_PARAM], 1282 current_key_range, 1283 filters=json.get(cls.FILTERS_PARAM)) 1284 1285 1286class BlobstoreLineInputReader(InputReader): 1287 """Input reader for a newline delimited blob in Blobstore.""" 1288 1289 # TODO(user): Should we set this based on MAX_BLOB_FETCH_SIZE? 1290 _BLOB_BUFFER_SIZE = 64000 1291 1292 # Maximum number of shards to allow. 1293 _MAX_SHARD_COUNT = 256 1294 1295 # Maximum number of blobs to allow. 1296 _MAX_BLOB_KEYS_COUNT = 246 1297 1298 # Mapreduce parameters. 1299 BLOB_KEYS_PARAM = "blob_keys" 1300 1301 # Serialization parmaeters. 1302 INITIAL_POSITION_PARAM = "initial_position" 1303 END_POSITION_PARAM = "end_position" 1304 BLOB_KEY_PARAM = "blob_key" 1305 1306 def __init__(self, blob_key, start_position, end_position): 1307 """Initializes this instance with the given blob key and character range. 1308 1309 This BlobstoreInputReader will read from the first record starting after 1310 strictly after start_position until the first record ending at or after 1311 end_position (exclusive). As an exception, if start_position is 0, then 1312 this InputReader starts reading at the first record. 1313 1314 Args: 1315 blob_key: the BlobKey that this input reader is processing. 1316 start_position: the position to start reading at. 1317 end_position: a position in the last record to read. 1318 """ 1319 self._blob_key = blob_key 1320 self._blob_reader = blobstore.BlobReader(blob_key, 1321 self._BLOB_BUFFER_SIZE, 1322 start_position) 1323 self._end_position = end_position 1324 self._has_iterated = False 1325 self._read_before_start = bool(start_position) 1326 1327 def next(self): 1328 """Returns the next input from as an (offset, line) tuple.""" 1329 self._has_iterated = True 1330 1331 if self._read_before_start: 1332 self._blob_reader.readline() 1333 self._read_before_start = False 1334 start_position = self._blob_reader.tell() 1335 1336 if start_position > self._end_position: 1337 raise StopIteration() 1338 1339 line = self._blob_reader.readline() 1340 1341 if not line: 1342 raise StopIteration() 1343 1344 return start_position, line.rstrip("\n") 1345 1346 def to_json(self): 1347 """Returns an json-compatible input shard spec for remaining inputs.""" 1348 new_pos = self._blob_reader.tell() 1349 if self._has_iterated: 1350 new_pos -= 1 1351 return {self.BLOB_KEY_PARAM: self._blob_key, 1352 self.INITIAL_POSITION_PARAM: new_pos, 1353 self.END_POSITION_PARAM: self._end_position} 1354 1355 def __str__(self): 1356 """Returns the string representation of this BlobstoreLineInputReader.""" 1357 return "blobstore.BlobKey(%r):[%d, %d]" % ( 1358 self._blob_key, self._blob_reader.tell(), self._end_position) 1359 1360 @classmethod 1361 def from_json(cls, json): 1362 """Instantiates an instance of this InputReader for the given shard spec.""" 1363 return cls(json[cls.BLOB_KEY_PARAM], 1364 json[cls.INITIAL_POSITION_PARAM], 1365 json[cls.END_POSITION_PARAM]) 1366 1367 @classmethod 1368 def validate(cls, mapper_spec): 1369 """Validates mapper spec and all mapper parameters. 1370 1371 Args: 1372 mapper_spec: The MapperSpec for this InputReader. 1373 1374 Raises: 1375 BadReaderParamsError: required parameters are missing or invalid. 1376 """ 1377 if mapper_spec.input_reader_class() != cls: 1378 raise BadReaderParamsError("Mapper input reader class mismatch") 1379 params = _get_params(mapper_spec) 1380 if cls.BLOB_KEYS_PARAM not in params: 1381 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input") 1382 blob_keys = params[cls.BLOB_KEYS_PARAM] 1383 if isinstance(blob_keys, basestring): 1384 # This is a mechanism to allow multiple blob keys (which do not contain 1385 # commas) in a single string. It may go away. 1386 blob_keys = blob_keys.split(",") 1387 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT: 1388 raise BadReaderParamsError("Too many 'blob_keys' for mapper input") 1389 if not blob_keys: 1390 raise BadReaderParamsError("No 'blob_keys' specified for mapper input") 1391 for blob_key in blob_keys: 1392 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key)) 1393 if not blob_info: 1394 raise BadReaderParamsError("Could not find blobinfo for key %s" % 1395 blob_key) 1396 1397 @classmethod 1398 def split_input(cls, mapper_spec): 1399 """Returns a list of shard_count input_spec_shards for input_spec. 1400 1401 Args: 1402 mapper_spec: The mapper specification to split from. Must contain 1403 'blob_keys' parameter with one or more blob keys. 1404 1405 Returns: 1406 A list of BlobstoreInputReaders corresponding to the specified shards. 1407 """ 1408 params = _get_params(mapper_spec) 1409 blob_keys = params[cls.BLOB_KEYS_PARAM] 1410 if isinstance(blob_keys, basestring): 1411 # This is a mechanism to allow multiple blob keys (which do not contain 1412 # commas) in a single string. It may go away. 1413 blob_keys = blob_keys.split(",") 1414 1415 blob_sizes = {} 1416 for blob_key in blob_keys: 1417 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key)) 1418 blob_sizes[blob_key] = blob_info.size 1419 1420 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count) 1421 shards_per_blob = shard_count // len(blob_keys) 1422 if shards_per_blob == 0: 1423 shards_per_blob = 1 1424 1425 chunks = [] 1426 for blob_key, blob_size in blob_sizes.items(): 1427 blob_chunk_size = blob_size // shards_per_blob 1428 for i in xrange(shards_per_blob - 1): 1429 chunks.append(BlobstoreLineInputReader.from_json( 1430 {cls.BLOB_KEY_PARAM: blob_key, 1431 cls.INITIAL_POSITION_PARAM: blob_chunk_size * i, 1432 cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)})) 1433 chunks.append(BlobstoreLineInputReader.from_json( 1434 {cls.BLOB_KEY_PARAM: blob_key, 1435 cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1), 1436 cls.END_POSITION_PARAM: blob_size})) 1437 return chunks 1438 1439 1440class BlobstoreZipInputReader(InputReader): 1441 """Input reader for files from a zip archive stored in the Blobstore. 1442 1443 Each instance of the reader will read the TOC, from the end of the zip file, 1444 and then only the contained files which it is responsible for. 1445 """ 1446 1447 # Maximum number of shards to allow. 1448 _MAX_SHARD_COUNT = 256 1449 1450 # Mapreduce parameters. 1451 BLOB_KEY_PARAM = "blob_key" 1452 START_INDEX_PARAM = "start_index" 1453 END_INDEX_PARAM = "end_index" 1454 1455 def __init__(self, blob_key, start_index, end_index, 1456 _reader=blobstore.BlobReader): 1457 """Initializes this instance with the given blob key and file range. 1458 1459 This BlobstoreZipInputReader will read from the file with index start_index 1460 up to but not including the file with index end_index. 1461 1462 Args: 1463 blob_key: the BlobKey that this input reader is processing. 1464 start_index: the index of the first file to read. 1465 end_index: the index of the first file that will not be read. 1466 _reader: a callable that returns a file-like object for reading blobs. 1467 Used for dependency injection. 1468 """ 1469 self._blob_key = blob_key 1470 self._start_index = start_index 1471 self._end_index = end_index 1472 self._reader = _reader 1473 self._zip = None 1474 self._entries = None 1475 1476 def next(self): 1477 """Returns the next input from this input reader as (ZipInfo, opener) tuple. 1478 1479 Returns: 1480 The next input from this input reader, in the form of a 2-tuple. 1481 The first element of the tuple is a zipfile.ZipInfo object. 1482 The second element of the tuple is a zero-argument function that, when 1483 called, returns the complete body of the file. 1484 """ 1485 if not self._zip: 1486 self._zip = zipfile.ZipFile(self._reader(self._blob_key)) 1487 # Get a list of entries, reversed so we can pop entries off in order 1488 self._entries = self._zip.infolist()[self._start_index:self._end_index] 1489 self._entries.reverse() 1490 if not self._entries: 1491 raise StopIteration() 1492 entry = self._entries.pop() 1493 self._start_index += 1 1494 return (entry, lambda: self._read(entry)) 1495 1496 def _read(self, entry): 1497 """Read entry content. 1498 1499 Args: 1500 entry: zip file entry as zipfile.ZipInfo. 1501 Returns: 1502 Entry content as string. 1503 """ 1504 start_time = time.time() 1505 content = self._zip.read(entry.filename) 1506 1507 ctx = context.get() 1508 if ctx: 1509 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx) 1510 operation.counters.Increment( 1511 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx) 1512 1513 return content 1514 1515 @classmethod 1516 def from_json(cls, json): 1517 """Creates an instance of the InputReader for the given input shard state. 1518 1519 Args: 1520 json: The InputReader state as a dict-like object. 1521 1522 Returns: 1523 An instance of the InputReader configured using the values of json. 1524 """ 1525 return cls(json[cls.BLOB_KEY_PARAM], 1526 json[cls.START_INDEX_PARAM], 1527 json[cls.END_INDEX_PARAM]) 1528 1529 def to_json(self): 1530 """Returns an input shard state for the remaining inputs. 1531 1532 Returns: 1533 A json-izable version of the remaining InputReader. 1534 """ 1535 return {self.BLOB_KEY_PARAM: self._blob_key, 1536 self.START_INDEX_PARAM: self._start_index, 1537 self.END_INDEX_PARAM: self._end_index} 1538 1539 def __str__(self): 1540 """Returns the string representation of this BlobstoreZipInputReader.""" 1541 return "blobstore.BlobKey(%r):[%d, %d]" % ( 1542 self._blob_key, self._start_index, self._end_index) 1543 1544 @classmethod 1545 def validate(cls, mapper_spec): 1546 """Validates mapper spec and all mapper parameters. 1547 1548 Args: 1549 mapper_spec: The MapperSpec for this InputReader. 1550 1551 Raises: 1552 BadReaderParamsError: required parameters are missing or invalid. 1553 """ 1554 if mapper_spec.input_reader_class() != cls: 1555 raise BadReaderParamsError("Mapper input reader class mismatch") 1556 params = _get_params(mapper_spec) 1557 if cls.BLOB_KEY_PARAM not in params: 1558 raise BadReaderParamsError("Must specify 'blob_key' for mapper input") 1559 blob_key = params[cls.BLOB_KEY_PARAM] 1560 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key)) 1561 if not blob_info: 1562 raise BadReaderParamsError("Could not find blobinfo for key %s" % 1563 blob_key) 1564 1565 @classmethod 1566 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader): 1567 """Returns a list of input shard states for the input spec. 1568 1569 Args: 1570 mapper_spec: The MapperSpec for this InputReader. Must contain 1571 'blob_key' parameter with one blob key. 1572 _reader: a callable that returns a file-like object for reading blobs. 1573 Used for dependency injection. 1574 1575 Returns: 1576 A list of InputReaders spanning files within the zip. 1577 """ 1578 params = _get_params(mapper_spec) 1579 blob_key = params[cls.BLOB_KEY_PARAM] 1580 zip_input = zipfile.ZipFile(_reader(blob_key)) 1581 zfiles = zip_input.infolist() 1582 total_size = sum(x.file_size for x in zfiles) 1583 num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT) 1584 size_per_shard = total_size // num_shards 1585 1586 # Break the list of files into sublists, each of approximately 1587 # size_per_shard bytes. 1588 shard_start_indexes = [0] 1589 current_shard_size = 0 1590 for i, fileinfo in enumerate(zfiles): 1591 current_shard_size += fileinfo.file_size 1592 if current_shard_size >= size_per_shard: 1593 shard_start_indexes.append(i + 1) 1594 current_shard_size = 0 1595 1596 if shard_start_indexes[-1] != len(zfiles): 1597 shard_start_indexes.append(len(zfiles)) 1598 1599 return [cls(blob_key, start_index, end_index, _reader) 1600 for start_index, end_index 1601 in zip(shard_start_indexes, shard_start_indexes[1:])] 1602 1603 1604class BlobstoreZipLineInputReader(InputReader): 1605 """Input reader for newline delimited files in zip archives from Blobstore. 1606 1607 This has the same external interface as the BlobstoreLineInputReader, in that 1608 it takes a list of blobs as its input and yields lines to the reader. 1609 However the blobs themselves are expected to be zip archives of line delimited 1610 files instead of the files themselves. 1611 1612 This is useful as many line delimited files gain greatly from compression. 1613 """ 1614 1615 # Maximum number of shards to allow. 1616 _MAX_SHARD_COUNT = 256 1617 1618 # Maximum number of blobs to allow. 1619 _MAX_BLOB_KEYS_COUNT = 246 1620 1621 # Mapreduce parameters. 1622 BLOB_KEYS_PARAM = "blob_keys" 1623 1624 # Serialization parameters. 1625 BLOB_KEY_PARAM = "blob_key" 1626 START_FILE_INDEX_PARAM = "start_file_index" 1627 END_FILE_INDEX_PARAM = "end_file_index" 1628 OFFSET_PARAM = "offset" 1629 1630 def __init__(self, blob_key, start_file_index, end_file_index, offset, 1631 _reader=blobstore.BlobReader): 1632 """Initializes this instance with the given blob key and file range. 1633 1634 This BlobstoreZipLineInputReader will read from the file with index 1635 start_file_index up to but not including the file with index end_file_index. 1636 It will return lines starting at offset within file[start_file_index] 1637 1638 Args: 1639 blob_key: the BlobKey that this input reader is processing. 1640 start_file_index: the index of the first file to read within the zip. 1641 end_file_index: the index of the first file that will not be read. 1642 offset: the byte offset within blob_key.zip[start_file_index] to start 1643 reading. The reader will continue to the end of the file. 1644 _reader: a callable that returns a file-like object for reading blobs. 1645 Used for dependency injection. 1646 """ 1647 self._blob_key = blob_key 1648 self._start_file_index = start_file_index 1649 self._end_file_index = end_file_index 1650 self._initial_offset = offset 1651 self._reader = _reader 1652 self._zip = None 1653 self._entries = None 1654 self._filestream = None 1655 1656 @classmethod 1657 def validate(cls, mapper_spec): 1658 """Validates mapper spec and all mapper parameters. 1659 1660 Args: 1661 mapper_spec: The MapperSpec for this InputReader. 1662 1663 Raises: 1664 BadReaderParamsError: required parameters are missing or invalid. 1665 """ 1666 if mapper_spec.input_reader_class() != cls: 1667 raise BadReaderParamsError("Mapper input reader class mismatch") 1668 params = _get_params(mapper_spec) 1669 if cls.BLOB_KEYS_PARAM not in params: 1670 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input") 1671 1672 blob_keys = params[cls.BLOB_KEYS_PARAM] 1673 if isinstance(blob_keys, basestring): 1674 # This is a mechanism to allow multiple blob keys (which do not contain 1675 # commas) in a single string. It may go away. 1676 blob_keys = blob_keys.split(",") 1677 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT: 1678 raise BadReaderParamsError("Too many 'blob_keys' for mapper input") 1679 if not blob_keys: 1680 raise BadReaderParamsError("No 'blob_keys' specified for mapper input") 1681 for blob_key in blob_keys: 1682 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key)) 1683 if not blob_info: 1684 raise BadReaderParamsError("Could not find blobinfo for key %s" % 1685 blob_key) 1686 1687 @classmethod 1688 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader): 1689 """Returns a list of input readers for the input spec. 1690 1691 Args: 1692 mapper_spec: The MapperSpec for this InputReader. Must contain 1693 'blob_keys' parameter with one or more blob keys. 1694 _reader: a callable that returns a file-like object for reading blobs. 1695 Used for dependency injection. 1696 1697 Returns: 1698 A list of InputReaders spanning the subfiles within the blobs. 1699 There will be at least one reader per blob, but it will otherwise 1700 attempt to keep the expanded size even. 1701 """ 1702 params = _get_params(mapper_spec) 1703 blob_keys = params[cls.BLOB_KEYS_PARAM] 1704 if isinstance(blob_keys, basestring): 1705 # This is a mechanism to allow multiple blob keys (which do not contain 1706 # commas) in a single string. It may go away. 1707 blob_keys = blob_keys.split(",") 1708 1709 blob_files = {} 1710 total_size = 0 1711 for blob_key in blob_keys: 1712 zip_input = zipfile.ZipFile(_reader(blob_key)) 1713 blob_files[blob_key] = zip_input.infolist() 1714 total_size += sum(x.file_size for x in blob_files[blob_key]) 1715 1716 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count) 1717 1718 # We can break on both blob key and file-within-zip boundaries. 1719 # A shard will span at minimum a single blob key, but may only 1720 # handle a few files within a blob. 1721 1722 size_per_shard = total_size // shard_count 1723 1724 readers = [] 1725 for blob_key in blob_keys: 1726 bfiles = blob_files[blob_key] 1727 current_shard_size = 0 1728 start_file_index = 0 1729 next_file_index = 0 1730 for fileinfo in bfiles: 1731 next_file_index += 1 1732 current_shard_size += fileinfo.file_size 1733 if current_shard_size >= size_per_shard: 1734 readers.append(cls(blob_key, start_file_index, next_file_index, 0, 1735 _reader)) 1736 current_shard_size = 0 1737 start_file_index = next_file_index 1738 if current_shard_size != 0: 1739 readers.append(cls(blob_key, start_file_index, next_file_index, 0, 1740 _reader)) 1741 1742 return readers 1743 1744 def next(self): 1745 """Returns the next line from this input reader as (lineinfo, line) tuple. 1746 1747 Returns: 1748 The next input from this input reader, in the form of a 2-tuple. 1749 The first element of the tuple describes the source, it is itself 1750 a tuple (blobkey, filenumber, byteoffset). 1751 The second element of the tuple is the line found at that offset. 1752 """ 1753 if not self._filestream: 1754 if not self._zip: 1755 self._zip = zipfile.ZipFile(self._reader(self._blob_key)) 1756 # Get a list of entries, reversed so we can pop entries off in order 1757 self._entries = self._zip.infolist()[self._start_file_index: 1758 self._end_file_index] 1759 self._entries.reverse() 1760 if not self._entries: 1761 raise StopIteration() 1762 entry = self._entries.pop() 1763 value = self._zip.read(entry.filename) 1764 self._filestream = StringIO.StringIO(value) 1765 if self._initial_offset: 1766 self._filestream.seek(self._initial_offset) 1767 self._filestream.readline() 1768 1769 start_position = self._filestream.tell() 1770 line = self._filestream.readline() 1771 1772 if not line: 1773 # Done with this file in the zip. Move on to the next file. 1774 self._filestream.close() 1775 self._filestream = None 1776 self._start_file_index += 1 1777 self._initial_offset = 0 1778 return self.next() 1779 1780 return ((self._blob_key, self._start_file_index, start_position), 1781 line.rstrip("\n")) 1782 1783 def _next_offset(self): 1784 """Return the offset of the next line to read.""" 1785 if self._filestream: 1786 offset = self._filestream.tell() 1787 if offset: 1788 offset -= 1 1789 else: 1790 offset = self._initial_offset 1791 1792 return offset 1793 1794 def to_json(self): 1795 """Returns an input shard state for the remaining inputs. 1796 1797 Returns: 1798 A json-izable version of the remaining InputReader. 1799 """ 1800 1801 return {self.BLOB_KEY_PARAM: self._blob_key, 1802 self.START_FILE_INDEX_PARAM: self._start_file_index, 1803 self.END_FILE_INDEX_PARAM: self._end_file_index, 1804 self.OFFSET_PARAM: self._next_offset()} 1805 1806 @classmethod 1807 def from_json(cls, json, _reader=blobstore.BlobReader): 1808 """Creates an instance of the InputReader for the given input shard state. 1809 1810 Args: 1811 json: The InputReader state as a dict-like object. 1812 _reader: For dependency injection. 1813 1814 Returns: 1815 An instance of the InputReader configured using the values of json. 1816 """ 1817 return cls(json[cls.BLOB_KEY_PARAM], 1818 json[cls.START_FILE_INDEX_PARAM], 1819 json[cls.END_FILE_INDEX_PARAM], 1820 json[cls.OFFSET_PARAM], 1821 _reader) 1822 1823 def __str__(self): 1824 """Returns the string representation of this reader. 1825 1826 Returns: 1827 string blobkey:[start file num, end file num]:current offset. 1828 """ 1829 return "blobstore.BlobKey(%r):[%d, %d]:%d" % ( 1830 self._blob_key, self._start_file_index, self._end_file_index, 1831 self._next_offset()) 1832 1833 1834class RandomStringInputReader(InputReader): 1835 """RandomStringInputReader generates random strings as output. 1836 1837 Primary usage is to populate output with testing entries. 1838 """ 1839 1840 # Total number of entries this reader should generate. 1841 COUNT = "count" 1842 # Length of the generated strings. 1843 STRING_LENGTH = "string_length" 1844 1845 DEFAULT_STRING_LENGTH = 10 1846 1847 def __init__(self, count, string_length): 1848 """Initialize input reader. 1849 1850 Args: 1851 count: number of entries this shard should generate. 1852 string_length: the length of generated random strings. 1853 """ 1854 self._count = count 1855 self._string_length = string_length 1856 1857 def __iter__(self): 1858 ctx = context.get() 1859 1860 while self._count: 1861 self._count -= 1 1862 start_time = time.time() 1863 content = "".join(random.choice(string.ascii_lowercase) 1864 for _ in range(self._string_length)) 1865 if ctx: 1866 operation.counters.Increment( 1867 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx) 1868 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx) 1869 yield content 1870 1871 @classmethod 1872 def split_input(cls, mapper_spec): 1873 params = _get_params(mapper_spec) 1874 count = params[cls.COUNT] 1875 string_length = cls.DEFAULT_STRING_LENGTH 1876 if cls.STRING_LENGTH in params: 1877 string_length = params[cls.STRING_LENGTH] 1878 1879 shard_count = mapper_spec.shard_count 1880 count_per_shard = count // shard_count 1881 1882 mr_input_readers = [ 1883 cls(count_per_shard, string_length) for _ in range(shard_count)] 1884 1885 left = count - count_per_shard*shard_count 1886 if left > 0: 1887 mr_input_readers.append(cls(left, string_length)) 1888 1889 return mr_input_readers 1890 1891 @classmethod 1892 def validate(cls, mapper_spec): 1893 if mapper_spec.input_reader_class() != cls: 1894 raise BadReaderParamsError("Mapper input reader class mismatch") 1895 1896 params = _get_params(mapper_spec) 1897 if cls.COUNT not in params: 1898 raise BadReaderParamsError("Must specify %s" % cls.COUNT) 1899 if not isinstance(params[cls.COUNT], int): 1900 raise BadReaderParamsError("%s should be an int but is %s" % 1901 (cls.COUNT, type(params[cls.COUNT]))) 1902 if params[cls.COUNT] <= 0: 1903 raise BadReaderParamsError("%s should be a positive int") 1904 if cls.STRING_LENGTH in params and not ( 1905 isinstance(params[cls.STRING_LENGTH], int) and 1906 params[cls.STRING_LENGTH] > 0): 1907 raise BadReaderParamsError("%s should be a positive int but is %s" % 1908 (cls.STRING_LENGTH, params[cls.STRING_LENGTH])) 1909 if (not isinstance(mapper_spec.shard_count, int) or 1910 mapper_spec.shard_count <= 0): 1911 raise BadReaderParamsError( 1912 "shard_count should be a positive int but is %s" % 1913 mapper_spec.shard_count) 1914 1915 @classmethod 1916 def from_json(cls, json): 1917 return cls(json[cls.COUNT], json[cls.STRING_LENGTH]) 1918 1919 def to_json(self): 1920 return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length} 1921 1922 1923# TODO(user): This reader always produces only one shard, because 1924# namespace entities use the mix of ids/names, and KeyRange-based splitting 1925# doesn't work satisfactory in this case. 1926# It's possible to implement specific splitting functionality for the reader 1927# instead of reusing generic one. Meanwhile 1 shard is enough for our 1928# applications. 1929class NamespaceInputReader(InputReader): 1930 """An input reader to iterate over namespaces. 1931 1932 This reader yields namespace names as string. 1933 It will always produce only one shard. 1934 """ 1935 1936 NAMESPACE_RANGE_PARAM = "namespace_range" 1937 BATCH_SIZE_PARAM = "batch_size" 1938 _BATCH_SIZE = 10 1939 1940 def __init__(self, ns_range, batch_size=_BATCH_SIZE): 1941 self.ns_range = ns_range 1942 self._batch_size = batch_size 1943 1944 def to_json(self): 1945 """Serializes all the data in this query range into json form. 1946 1947 Returns: 1948 all the data in json-compatible map. 1949 """ 1950 return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(), 1951 self.BATCH_SIZE_PARAM: self._batch_size} 1952 1953 @classmethod 1954 def from_json(cls, json): 1955 """Create new DatastoreInputReader from the json, encoded by to_json. 1956 1957 Args: 1958 json: json map representation of DatastoreInputReader. 1959 1960 Returns: 1961 an instance of DatastoreInputReader with all data deserialized from json. 1962 """ 1963 return cls( 1964 namespace_range.NamespaceRange.from_json_object( 1965 json[cls.NAMESPACE_RANGE_PARAM]), 1966 json[cls.BATCH_SIZE_PARAM]) 1967 1968 @classmethod 1969 def validate(cls, mapper_spec): 1970 """Validates mapper spec. 1971 1972 Args: 1973 mapper_spec: The MapperSpec for this InputReader. 1974 1975 Raises: 1976 BadReaderParamsError: required parameters are missing or invalid. 1977 """ 1978 if mapper_spec.input_reader_class() != cls: 1979 raise BadReaderParamsError("Input reader class mismatch") 1980 params = _get_params(mapper_spec) 1981 if cls.BATCH_SIZE_PARAM in params: 1982 try: 1983 batch_size = int(params[cls.BATCH_SIZE_PARAM]) 1984 if batch_size < 1: 1985 raise BadReaderParamsError("Bad batch size: %s" % batch_size) 1986 except ValueError, e: 1987 raise BadReaderParamsError("Bad batch size: %s" % e) 1988 1989 @classmethod 1990 def split_input(cls, mapper_spec): 1991 """Returns a list of input readers for the input spec. 1992 1993 Args: 1994 mapper_spec: The MapperSpec for this InputReader. 1995 1996 Returns: 1997 A list of InputReaders. 1998 """ 1999 batch_size = int(_get_params(mapper_spec).get( 2000 cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)) 2001 shard_count = mapper_spec.shard_count 2002 namespace_ranges = namespace_range.NamespaceRange.split(shard_count, 2003 contiguous=True) 2004 return [NamespaceInputReader(ns_range, batch_size) 2005 for ns_range in namespace_ranges] 2006 2007 def __iter__(self): 2008 while True: 2009 keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size) 2010 if not keys: 2011 break 2012 2013 for key in keys: 2014 namespace = metadata.Namespace.key_to_namespace(key) 2015 self.ns_range = self.ns_range.with_start_after(namespace) 2016 yield namespace 2017 2018 def __str__(self): 2019 return repr(self.ns_range) 2020 2021 2022class LogInputReader(InputReader): 2023 """Input reader for a time range of logs via the Logs Reader API. 2024 2025 The number of input shards may be specified by the SHARDS_PARAM mapper 2026 parameter. A starting and ending time (in seconds since the Unix epoch) are 2027 required to generate time ranges over which to shard the input. 2028 """ 2029 # Parameters directly mapping to those available via logservice.fetch(). 2030 START_TIME_PARAM = "start_time" 2031 END_TIME_PARAM = "end_time" 2032 MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level" 2033 INCLUDE_INCOMPLETE_PARAM = "include_incomplete" 2034 INCLUDE_APP_LOGS_PARAM = "include_app_logs" 2035 VERSION_IDS_PARAM = "version_ids" 2036 MODULE_VERSIONS_PARAM = "module_versions" 2037 2038 # Semi-hidden parameters used only internally or for privileged applications. 2039 _OFFSET_PARAM = "offset" 2040 _PROTOTYPE_REQUEST_PARAM = "prototype_request" 2041 2042 _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM, 2043 MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM, 2044 INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM, 2045 MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM]) 2046 _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM]) 2047 2048 def __init__(self, 2049 start_time=None, 2050 end_time=None, 2051 minimum_log_level=None, 2052 include_incomplete=False, 2053 include_app_logs=False, 2054 version_ids=None, 2055 module_versions=None, 2056 **kwargs): 2057 """Constructor. 2058 2059 Args: 2060 start_time: The earliest request completion or last-update time of logs 2061 that should be mapped over, in seconds since the Unix epoch. 2062 end_time: The latest request completion or last-update time that logs 2063 should be mapped over, in seconds since the Unix epoch. 2064 minimum_log_level: An application log level which serves as a filter on 2065 the requests mapped over--requests with no application log at or above 2066 the specified level will be omitted, even if include_app_logs is False. 2067 include_incomplete: Whether or not to include requests that have started 2068 but not yet finished, as a boolean. Defaults to False. 2069 include_app_logs: Whether or not to include application level logs in the 2070 mapped logs, as a boolean. Defaults to False. 2071 version_ids: A list of version ids whose logs should be read. This can not 2072 be used with module_versions 2073 module_versions: A list of tuples containing a module and version id 2074 whose logs should be read. This can not be used with version_ids 2075 **kwargs: A dictionary of keywords associated with this input reader. 2076 """ 2077 InputReader.__init__(self) # pylint: disable=non-parent-init-called 2078 2079 # The rule for __params is that its contents will always be suitable as 2080 # input to logservice.fetch(). 2081 self.__params = dict(kwargs) 2082 2083 if start_time is not None: 2084 self.__params[self.START_TIME_PARAM] = start_time 2085 if end_time is not None: 2086 self.__params[self.END_TIME_PARAM] = end_time 2087 if minimum_log_level is not None: 2088 self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level 2089 if include_incomplete is not None: 2090 self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete 2091 if include_app_logs is not None: 2092 self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs 2093 if version_ids: 2094 self.__params[self.VERSION_IDS_PARAM] = version_ids 2095 if module_versions: 2096 self.__params[self.MODULE_VERSIONS_PARAM] = module_versions 2097 2098 # Any submitted prototype_request will be in encoded form. 2099 if self._PROTOTYPE_REQUEST_PARAM in self.__params: 2100 prototype_request = log_service_pb.LogReadRequest( 2101 self.__params[self._PROTOTYPE_REQUEST_PARAM]) 2102 self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request 2103 2104 def __iter__(self): 2105 """Iterates over logs in a given range of time. 2106 2107 Yields: 2108 A RequestLog containing all the information for a single request. 2109 """ 2110 for log in logservice.fetch(**self.__params): 2111 self.__params[self._OFFSET_PARAM] = log.offset 2112 yield log 2113 2114 @classmethod 2115 def from_json(cls, json): 2116 """Creates an instance of the InputReader for the given input shard's state. 2117 2118 Args: 2119 json: The InputReader state as a dict-like object. 2120 2121 Returns: 2122 An instance of the InputReader configured using the given JSON parameters. 2123 """ 2124 # Strip out unrecognized parameters, as introduced by b/5960884. 2125 params = dict((str(k), v) for k, v in json.iteritems() 2126 if k in cls._PARAMS) 2127 2128 # This is not symmetric with to_json() wrt. PROTOTYPE_REQUEST_PARAM because 2129 # the constructor parameters need to be JSON-encodable, so the decoding 2130 # needs to happen there anyways. 2131 if cls._OFFSET_PARAM in params: 2132 params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM]) 2133 return cls(**params) 2134 2135 def to_json(self): 2136 """Returns an input shard state for the remaining inputs. 2137 2138 Returns: 2139 A JSON serializable version of the remaining input to read. 2140 """ 2141 2142 params = dict(self.__params) # Shallow copy. 2143 if self._PROTOTYPE_REQUEST_PARAM in params: 2144 prototype_request = params[self._PROTOTYPE_REQUEST_PARAM] 2145 params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode() 2146 if self._OFFSET_PARAM in params: 2147 params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM]) 2148 return params 2149 2150 @classmethod 2151 def split_input(cls, mapper_spec): 2152 """Returns a list of input readers for the given input specification. 2153 2154 Args: 2155 mapper_spec: The MapperSpec for this InputReader. 2156 2157 Returns: 2158 A list of InputReaders. 2159 """ 2160 params = _get_params(mapper_spec) 2161 shard_count = mapper_spec.shard_count 2162 2163 # Pick out the overall start and end times and time step per shard. 2164 start_time = params[cls.START_TIME_PARAM] 2165 end_time = params[cls.END_TIME_PARAM] 2166 seconds_per_shard = (end_time - start_time) / shard_count 2167 2168 # Create a LogInputReader for each shard, modulating the params as we go. 2169 shards = [] 2170 for _ in xrange(shard_count - 1): 2171 params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] + 2172 seconds_per_shard) 2173 shards.append(LogInputReader(**params)) 2174 params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM] 2175 2176 # Create a final shard to complete the time range. 2177 params[cls.END_TIME_PARAM] = end_time 2178 return shards + [LogInputReader(**params)] 2179 2180 @classmethod 2181 def validate(cls, mapper_spec): 2182 """Validates the mapper's specification and all necessary parameters. 2183 2184 Args: 2185 mapper_spec: The MapperSpec to be used with this InputReader. 2186 2187 Raises: 2188 BadReaderParamsError: If the user fails to specify both a starting time 2189 and an ending time, or if the starting time is later than the ending 2190 time. 2191 """ 2192 if mapper_spec.input_reader_class() != cls: 2193 raise errors.BadReaderParamsError("Input reader class mismatch") 2194 2195 params = _get_params(mapper_spec, allowed_keys=cls._PARAMS) 2196 if (cls.VERSION_IDS_PARAM not in params and 2197 cls.MODULE_VERSIONS_PARAM not in params): 2198 raise errors.BadReaderParamsError("Must specify a list of version ids or " 2199 "module/version ids for mapper input") 2200 if (cls.VERSION_IDS_PARAM in params and 2201 cls.MODULE_VERSIONS_PARAM in params): 2202 raise errors.BadReaderParamsError("Can not supply both version ids or " 2203 "module/version ids. Use only one.") 2204 if (cls.START_TIME_PARAM not in params or 2205 params[cls.START_TIME_PARAM] is None): 2206 raise errors.BadReaderParamsError("Must specify a starting time for " 2207 "mapper input") 2208 if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None: 2209 params[cls.END_TIME_PARAM] = time.time() 2210 2211 if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]: 2212 raise errors.BadReaderParamsError("The starting time cannot be later " 2213 "than or the same as the ending time.") 2214 2215 if cls._PROTOTYPE_REQUEST_PARAM in params: 2216 try: 2217 params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest( 2218 params[cls._PROTOTYPE_REQUEST_PARAM]) 2219 except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError): 2220 raise errors.BadReaderParamsError("The prototype request must be " 2221 "parseable as a LogReadRequest.") 2222 2223 # Pass the parameters to logservice.fetch() to verify any underlying 2224 # constraints on types or values. This only constructs an iterator, it 2225 # doesn't trigger any requests for actual log records. 2226 try: 2227 logservice.fetch(**params) 2228 except logservice.InvalidArgumentError, e: 2229 raise errors.BadReaderParamsError("One or more parameters are not valid " 2230 "inputs to logservice.fetch(): %s" % e) 2231 2232 def __str__(self): 2233 """Returns the string representation of this LogInputReader.""" 2234 params = [] 2235 for key in sorted(self.__params.keys()): 2236 value = self.__params[key] 2237 if key is self._PROTOTYPE_REQUEST_PARAM: 2238 params.append("%s='%s'" % (key, value)) 2239 elif key is self._OFFSET_PARAM: 2240 params.append("%s='%s'" % (key, value)) 2241 else: 2242 params.append("%s=%s" % (key, value)) 2243 2244 return "LogInputReader(%s)" % ", ".join(params) 2245 2246 2247class _GoogleCloudStorageInputReader(InputReader): 2248 """Input reader from Google Cloud Storage using the cloudstorage library. 2249 2250 This class is expected to be subclassed with a reader that understands 2251 user-level records. 2252 2253 Required configuration in the mapper_spec.input_reader dictionary. 2254 BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or 2255 suffixed such as directories. 2256 OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be 2257 in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be 2258 treated as prefix and all objects with matching names will be read. 2259 Entries should not start with a slash unless that is part of the object's 2260 name. An example list could be: 2261 ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"] 2262 To retrieve all files "*" will match every object in the bucket. If a file 2263 is listed twice or is covered by multiple prefixes it will be read twice, 2264 there is no deduplication. 2265 2266 Optional configuration in the mapper_sec.input_reader dictionary. 2267 BUFFER_SIZE_PARAM: the size of the read buffer for each file handle. 2268 DELIMITER_PARAM: if specified, turn on the shallow splitting mode. 2269 The delimiter is used as a path separator to designate directory 2270 hierarchy. Matching of prefixes from OBJECT_NAME_PARAM 2271 will stop at the first directory instead of matching 2272 all files under the directory. This allows MR to process bucket with 2273 hundreds of thousands of files. 2274 FAIL_ON_MISSING_INPUT: if specified and True, the MR will fail if any of 2275 the input files are missing. Missing files will be skipped otherwise. 2276 """ 2277 2278 # Supported parameters 2279 BUCKET_NAME_PARAM = "bucket_name" 2280 OBJECT_NAMES_PARAM = "objects" 2281 BUFFER_SIZE_PARAM = "buffer_size" 2282 DELIMITER_PARAM = "delimiter" 2283 FAIL_ON_MISSING_INPUT = "fail_on_missing_input" 2284 2285 # Internal parameters 2286 _ACCOUNT_ID_PARAM = "account_id" 2287 2288 # Other internal configuration constants 2289 _JSON_PICKLE = "pickle" 2290 _JSON_FAIL_ON_MISSING_INPUT = "fail_on_missing_input" 2291 _STRING_MAX_FILES_LISTED = 10 # Max files shown in the str representation 2292 2293 # Input reader can also take in start and end filenames and do 2294 # listbucket. This saves space but has two cons. 2295 # 1. Files to read are less well defined: files can be added or removed over 2296 # the lifetime of the MR job. 2297 # 2. A shard has to process files from a contiguous namespace. 2298 # May introduce staggering shard. 2299 def __init__(self, filenames, index=0, buffer_size=None, _account_id=None, 2300 delimiter=None): 2301 """Initialize a GoogleCloudStorageInputReader instance. 2302 2303 Args: 2304 filenames: A list of Google Cloud Storage filenames of the form 2305 '/bucket/objectname'. 2306 index: Index of the next filename to read. 2307 buffer_size: The size of the read buffer, None to use default. 2308 _account_id: Internal use only. See cloudstorage documentation. 2309 delimiter: Delimiter used as path separator. See class doc for details. 2310 """ 2311 self._filenames = filenames 2312 self._index = index 2313 self._buffer_size = buffer_size 2314 self._account_id = _account_id 2315 self._delimiter = delimiter 2316 self._bucket = None 2317 self._bucket_iter = None 2318 2319 # True iff we should fail on missing input (see class doc above). Set to 2320 # None in constructor and overwritten in split_input and from_json. 2321 # fail_on_missing_input is not parameter of the constructor to avoid 2322 # breaking classes inheriting from _GoogleCloudStorageInputReader and 2323 # overriding the constructor. 2324 self._fail_on_missing_input = None 2325 2326 def _next_file(self): 2327 """Find next filename. 2328 2329 self._filenames may need to be expanded via listbucket. 2330 2331 Returns: 2332 None if no more file is left. Filename otherwise. 2333 """ 2334 while True: 2335 if self._bucket_iter: 2336 try: 2337 return self._bucket_iter.next().filename 2338 except StopIteration: 2339 self._bucket_iter = None 2340 self._bucket = None 2341 if self._index >= len(self._filenames): 2342 return 2343 filename = self._filenames[self._index] 2344 self._index += 1 2345 if self._delimiter is None or not filename.endswith(self._delimiter): 2346 return filename 2347 self._bucket = cloudstorage.listbucket(filename, 2348 delimiter=self._delimiter) 2349 self._bucket_iter = iter(self._bucket) 2350 2351 @classmethod 2352 def get_params(cls, mapper_spec, allowed_keys=None, allow_old=True): 2353 params = _get_params(mapper_spec, allowed_keys, allow_old) 2354 # Use the bucket_name defined in mapper_spec params if one was not defined 2355 # specifically in the input_reader params. 2356 if (mapper_spec.params.get(cls.BUCKET_NAME_PARAM) is not None and 2357 params.get(cls.BUCKET_NAME_PARAM) is None): 2358 params[cls.BUCKET_NAME_PARAM] = mapper_spec.params[cls.BUCKET_NAME_PARAM] 2359 return params 2360 2361 @classmethod 2362 def validate(cls, mapper_spec): 2363 """Validate mapper specification. 2364 2365 Args: 2366 mapper_spec: an instance of model.MapperSpec 2367 2368 Raises: 2369 BadReaderParamsError: if the specification is invalid for any reason such 2370 as missing the bucket name or providing an invalid bucket name. 2371 """ 2372 reader_spec = cls.get_params(mapper_spec, allow_old=False) 2373 2374 # Bucket Name is required 2375 if cls.BUCKET_NAME_PARAM not in reader_spec: 2376 raise errors.BadReaderParamsError( 2377 "%s is required for Google Cloud Storage" % 2378 cls.BUCKET_NAME_PARAM) 2379 try: 2380 cloudstorage.validate_bucket_name( 2381 reader_spec[cls.BUCKET_NAME_PARAM]) 2382 except ValueError, error: 2383 raise errors.BadReaderParamsError("Bad bucket name, %s" % (error)) 2384 2385 # Object Name(s) are required 2386 if cls.OBJECT_NAMES_PARAM not in reader_spec: 2387 raise errors.BadReaderParamsError( 2388 "%s is required for Google Cloud Storage" % 2389 cls.OBJECT_NAMES_PARAM) 2390 filenames = reader_spec[cls.OBJECT_NAMES_PARAM] 2391 if not isinstance(filenames, list): 2392 raise errors.BadReaderParamsError( 2393 "Object name list is not a list but a %s" % 2394 filenames.__class__.__name__) 2395 for filename in filenames: 2396 if not isinstance(filename, basestring): 2397 raise errors.BadReaderParamsError( 2398 "Object name is not a string but a %s" % 2399 filename.__class__.__name__) 2400 if cls.DELIMITER_PARAM in reader_spec: 2401 delimiter = reader_spec[cls.DELIMITER_PARAM] 2402 if not isinstance(delimiter, basestring): 2403 raise errors.BadReaderParamsError( 2404 "%s is not a string but a %s" % 2405 (cls.DELIMITER_PARAM, type(delimiter))) 2406 2407 @classmethod 2408 def split_input(cls, mapper_spec): 2409 """Returns a list of input readers. 2410 2411 An equal number of input files are assigned to each shard (+/- 1). If there 2412 are fewer files than shards, fewer than the requested number of shards will 2413 be used. Input files are currently never split (although for some formats 2414 could be and may be split in a future implementation). 2415 2416 Args: 2417 mapper_spec: an instance of model.MapperSpec. 2418 2419 Returns: 2420 A list of InputReaders. None when no input data can be found. 2421 """ 2422 reader_spec = cls.get_params(mapper_spec, allow_old=False) 2423 bucket = reader_spec[cls.BUCKET_NAME_PARAM] 2424 filenames = reader_spec[cls.OBJECT_NAMES_PARAM] 2425 delimiter = reader_spec.get(cls.DELIMITER_PARAM) 2426 account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM) 2427 buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM) 2428 fail_on_missing_input = reader_spec.get(cls.FAIL_ON_MISSING_INPUT) 2429 2430 # Gather the complete list of files (expanding wildcards) 2431 all_filenames = [] 2432 for filename in filenames: 2433 if filename.endswith("*"): 2434 all_filenames.extend( 2435 [file_stat.filename for file_stat in cloudstorage.listbucket( 2436 "/" + bucket + "/" + filename[:-1], delimiter=delimiter, 2437 _account_id=account_id)]) 2438 else: 2439 all_filenames.append("/%s/%s" % (bucket, filename)) 2440 2441 # Split into shards 2442 readers = [] 2443 for shard in range(0, mapper_spec.shard_count): 2444 shard_filenames = all_filenames[shard::mapper_spec.shard_count] 2445 if shard_filenames: 2446 reader = cls( 2447 shard_filenames, buffer_size=buffer_size, _account_id=account_id, 2448 delimiter=delimiter) 2449 reader._fail_on_missing_input = fail_on_missing_input 2450 readers.append(reader) 2451 return readers 2452 2453 @classmethod 2454 def from_json(cls, state): 2455 obj = pickle.loads(state[cls._JSON_PICKLE]) 2456 # fail_on_missing_input might not be set - default to False. 2457 obj._fail_on_missing_input = state.get( 2458 cls._JSON_FAIL_ON_MISSING_INPUT, False) 2459 if obj._bucket: 2460 obj._bucket_iter = iter(obj._bucket) 2461 return obj 2462 2463 def to_json(self): 2464 before_iter = self._bucket_iter 2465 self._bucket_iter = None 2466 try: 2467 return { 2468 self._JSON_PICKLE: pickle.dumps(self), 2469 # self._fail_on_missing_input gets pickled but we save it separately 2470 # and override it in from_json to deal with version flipping. 2471 self._JSON_FAIL_ON_MISSING_INPUT: 2472 getattr(self, "_fail_on_missing_input", False) 2473 } 2474 return {self._JSON_PICKLE: pickle.dumps(self)} 2475 finally: 2476 self._bucket_itr = before_iter 2477 2478 def next(self): 2479 """Returns the next input from this input reader, a block of bytes. 2480 2481 Non existent files will be logged and skipped. The file might have been 2482 removed after input splitting. 2483 2484 Returns: 2485 The next input from this input reader in the form of a cloudstorage 2486 ReadBuffer that supports a File-like interface (read, readline, seek, 2487 tell, and close). An error may be raised if the file can not be opened. 2488 2489 Raises: 2490 StopIteration: The list of files has been exhausted. 2491 """ 2492 options = {} 2493 if self._buffer_size: 2494 options["read_buffer_size"] = self._buffer_size 2495 if self._account_id: 2496 options["_account_id"] = self._account_id 2497 while True: 2498 filename = self._next_file() 2499 if filename is None: 2500 raise StopIteration() 2501 try: 2502 start_time = time.time() 2503 handle = cloudstorage.open(filename, **options) 2504 2505 ctx = context.get() 2506 if ctx: 2507 operation.counters.Increment( 2508 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx) 2509 2510 return handle 2511 except cloudstorage.NotFoundError: 2512 # Fail the job if we're strict on missing input. 2513 if getattr(self, "_fail_on_missing_input", False): 2514 raise errors.FailJobError( 2515 "File missing in GCS, aborting: %s" % filename) 2516 # Move on otherwise. 2517 logging.warning("File %s may have been removed. Skipping file.", 2518 filename) 2519 2520 def __str__(self): 2521 # Only show a limited number of files individually for readability 2522 num_files = len(self._filenames) 2523 if num_files > self._STRING_MAX_FILES_LISTED: 2524 names = "%s...%s + %d not shown" % ( 2525 ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]), 2526 self._filenames[-1], 2527 num_files - self._STRING_MAX_FILES_LISTED) 2528 else: 2529 names = ",".join(self._filenames) 2530 2531 if self._index > num_files: 2532 status = "EOF" 2533 else: 2534 status = "Next %s (%d of %d)" % ( 2535 self._filenames[self._index], 2536 self._index + 1, # +1 for human 1-indexing 2537 num_files) 2538 return "CloudStorage [%s, %s]" % (status, names) 2539 2540 2541GoogleCloudStorageInputReader = _GoogleCloudStorageInputReader 2542 2543 2544class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader): 2545 """Read data from a Google Cloud Storage file using LevelDB format. 2546 2547 See the _GoogleCloudStorageOutputWriter for additional configuration options. 2548 """ 2549 2550 def __getstate__(self): 2551 result = self.__dict__.copy() 2552 # record reader may not exist if reader has not been used 2553 if "_record_reader" in result: 2554 # RecordsReader has no buffering, it can safely be reconstructed after 2555 # deserialization 2556 result.pop("_record_reader") 2557 return result 2558 2559 def next(self): 2560 """Returns the next input from this input reader, a record. 2561 2562 Returns: 2563 The next input from this input reader in the form of a record read from 2564 an LevelDB file. 2565 2566 Raises: 2567 StopIteration: The ordered set records has been exhausted. 2568 """ 2569 while True: 2570 if not hasattr(self, "_cur_handle") or self._cur_handle is None: 2571 # If there are no more files, StopIteration is raised here 2572 self._cur_handle = super(_GoogleCloudStorageRecordInputReader, 2573 self).next() 2574 if not hasattr(self, "_record_reader") or self._record_reader is None: 2575 self._record_reader = records.RecordsReader(self._cur_handle) 2576 2577 try: 2578 start_time = time.time() 2579 content = self._record_reader.read() 2580 2581 ctx = context.get() 2582 if ctx: 2583 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx) 2584 operation.counters.Increment( 2585 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx) 2586 return content 2587 2588 except EOFError: 2589 self._cur_handle = None 2590 self._record_reader = None 2591 2592 2593GoogleCloudStorageRecordInputReader = _GoogleCloudStorageRecordInputReader 2594 2595 2596class _ReducerReader(_GoogleCloudStorageRecordInputReader): 2597 """Reader to read KeyValues records from GCS.""" 2598 2599 expand_parameters = True 2600 2601 def __init__(self, filenames, index=0, buffer_size=None, _account_id=None, 2602 delimiter=None): 2603 super(_ReducerReader, self).__init__(filenames, index, buffer_size, 2604 _account_id, delimiter) 2605 self.current_key = None 2606 self.current_values = None 2607 2608 def __iter__(self): 2609 ctx = context.get() 2610 combiner = None 2611 2612 if ctx: 2613 combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec") 2614 if combiner_spec: 2615 combiner = util.handler_for_name(combiner_spec) 2616 2617 try: 2618 while True: 2619 binary_record = super(_ReducerReader, self).next() 2620 proto = kv_pb.KeyValues() 2621 proto.ParseFromString(binary_record) 2622 2623 to_yield = None 2624 if self.current_key is not None and self.current_key != proto.key(): 2625 to_yield = (self.current_key, self.current_values) 2626 self.current_key = None 2627 self.current_values = None 2628 2629 if self.current_key is None: 2630 self.current_key = proto.key() 2631 self.current_values = [] 2632 2633 if combiner: 2634 combiner_result = combiner( 2635 self.current_key, proto.value_list(), self.current_values) 2636 2637 if not util.is_generator(combiner_result): 2638 raise errors.BadCombinerOutputError( 2639 "Combiner %s should yield values instead of returning them " 2640 "(%s)" % (combiner, combiner_result)) 2641 2642 self.current_values = [] 2643 for value in combiner_result: 2644 if isinstance(value, operation.Operation): 2645 value(ctx) 2646 else: 2647 # With combiner the current values always come from the combiner. 2648 self.current_values.append(value) 2649 2650 # Check-point after each combiner call is run only when there's 2651 # nothing that needs to be yielded below. Otherwise allowing a 2652 # check-point here would cause the current to_yield data to be lost. 2653 if not to_yield: 2654 yield ALLOW_CHECKPOINT 2655 else: 2656 # Without combiner we just accumulate values. 2657 self.current_values.extend(proto.value_list()) 2658 2659 if to_yield: 2660 yield to_yield 2661 # Check-point after each key is yielded. 2662 yield ALLOW_CHECKPOINT 2663 except StopIteration: 2664 pass 2665 2666 # There may be some accumulated values left at the end of an input file 2667 # so be sure to yield those too. 2668 if self.current_key is not None: 2669 to_yield = (self.current_key, self.current_values) 2670 self.current_key = None 2671 self.current_values = None 2672 yield to_yield 2673 2674 @staticmethod 2675 def encode_data(data): 2676 """Encodes the given data, which may have include raw bytes. 2677 2678 Works around limitations in JSON encoding, which cannot handle raw bytes. 2679 2680 Args: 2681 data: the data to encode. 2682 2683 Returns: 2684 The data encoded. 2685 """ 2686 return base64.b64encode(pickle.dumps(data)) 2687 2688 @staticmethod 2689 def decode_data(data): 2690 """Decodes data encoded with the encode_data function.""" 2691 return pickle.loads(base64.b64decode(data)) 2692 2693 def to_json(self): 2694 """Returns an input shard state for the remaining inputs. 2695 2696 Returns: 2697 A json-izable version of the remaining InputReader. 2698 """ 2699 result = super(_ReducerReader, self).to_json() 2700 result["current_key"] = self.encode_data(self.current_key) 2701 result["current_values"] = self.encode_data(self.current_values) 2702 return result 2703 2704 @classmethod 2705 def from_json(cls, json): 2706 """Creates an instance of the InputReader for the given input shard state. 2707 2708 Args: 2709 json: The InputReader state as a dict-like object. 2710 2711 Returns: 2712 An instance of the InputReader configured using the values of json. 2713 """ 2714 result = super(_ReducerReader, cls).from_json(json) 2715 result.current_key = _ReducerReader.decode_data(json["current_key"]) 2716 result.current_values = _ReducerReader.decode_data(json["current_values"]) 2717 return result 2718