1#!/usr/bin/env python
2# Copyright 2010 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Defines input readers for MapReduce."""
17
18
19
20__all__ = [
21    "AbstractDatastoreInputReader",
22    "ALLOW_CHECKPOINT",
23    "BadReaderParamsError",
24    "BlobstoreLineInputReader",
25    "BlobstoreZipInputReader",
26    "BlobstoreZipLineInputReader",
27    "COUNTER_IO_READ_BYTES",
28    "COUNTER_IO_READ_MSEC",
29    "DatastoreEntityInputReader",
30    "DatastoreInputReader",
31    "DatastoreKeyInputReader",
32    "GoogleCloudStorageInputReader",
33    "GoogleCloudStorageRecordInputReader",
34    "RandomStringInputReader",
35    "RawDatastoreInputReader",
36    "Error",
37    "InputReader",
38    "LogInputReader",
39    "NamespaceInputReader",
40    ]
41
42# pylint: disable=g-bad-name
43# pylint: disable=protected-access
44
45import base64
46import copy
47import logging
48import pickle
49import random
50import string
51import StringIO
52import time
53import zipfile
54
55from google.net.proto import ProtocolBuffer
56from google.appengine.ext import ndb
57
58from google.appengine.api import datastore
59from google.appengine.api import logservice
60from google.appengine.api.logservice import log_service_pb
61from google.appengine.ext import blobstore
62from google.appengine.ext import db
63from google.appengine.ext import key_range
64from google.appengine.ext.db import metadata
65from mapreduce import context
66from mapreduce import datastore_range_iterators as db_iters
67from mapreduce import errors
68from mapreduce import json_util
69from mapreduce import key_ranges
70from mapreduce import kv_pb
71from mapreduce import model
72from mapreduce import namespace_range
73from mapreduce import operation
74from mapreduce import property_range
75from mapreduce import records
76from mapreduce import util
77
78# pylint: disable=g-import-not-at-top
# TODO(user): Clean up imports if/when cloudstorage becomes part of runtime.
80try:
81  # Check if the full cloudstorage package exists. The stub part is in runtime.
82  cloudstorage = None
83  import cloudstorage
84  if hasattr(cloudstorage, "_STUB"):
85    cloudstorage = None
86except ImportError:
87  pass  # CloudStorage library not available
88
# Attempt to load cloudstorage from the bundle (available in some tests).
90if cloudstorage is None:
91  try:
92    import cloudstorage
93  except ImportError:
94    pass  # CloudStorage library really not available
95
96
97# Classes moved to errors module. Copied here for compatibility.
98Error = errors.Error
99BadReaderParamsError = errors.BadReaderParamsError
100
101
102# Counter name for number of bytes read.
103COUNTER_IO_READ_BYTES = "io-read-bytes"
104
105# Counter name for milliseconds spent reading data.
106COUNTER_IO_READ_MSEC = "io-read-msec"
107
108# Special value that can be yielded by InputReaders if they want to give the
109# framework an opportunity to save the state of the mapreduce without having
110# to yield an actual value to the handler.
111ALLOW_CHECKPOINT = object()
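
# Illustrative sketch (hypothetical custom reader, not part of this module):
# a reader's __iter__ may yield ALLOW_CHECKPOINT between real values so the
# framework gets a chance to save shard state, e.g.:
#
#   def __iter__(self):
#     for count, item in enumerate(self._scan()):  # _scan() is hypothetical
#       if count and count % 1000 == 0:
#         yield ALLOW_CHECKPOINT
#       yield item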
112
113
114class InputReader(json_util.JsonMixin):
115  """Abstract base class for input readers.
116
117  InputReaders have the following properties:
118   * They are created by using the split_input method to generate a set of
119     InputReaders from a MapperSpec.
120   * They generate inputs to the mapper via the iterator interface.
121   * After creation, they can be serialized and resumed using the JsonMixin
122     interface.
123   * They are cast to string for a user-readable description; it may be
124     valuable to implement __str__.
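
  Example (illustrative sketch; MyReader, mapper_spec and handle are
  hypothetical, and the framework normally drives these calls):

    readers = MyReader.split_input(mapper_spec)   # one reader per shard
    for reader in readers:
      state = reader.to_json()                    # checkpoint shard state
      reader = MyReader.from_json(state)          # resume from the checkpoint
      for value in reader:
        handle(value)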
125  """
126
  # When expand_parameters is False, the value yielded by the reader is passed
  # to the handler as is. If it is True, the value is unpacked (*value),
  # letting the handler be a multi-parameter function.
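  #
  # For example (illustrative): with expand_parameters True, a reader that
  # yields ("key1", "value1") can feed a handler defined as
  #
  #   def process(key, value):
  #     ...
  #
  # while with expand_parameters False the handler receives the whole tuple as
  # a single argument.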
130  expand_parameters = False
131
132  # Mapreduce parameters.
133  _APP_PARAM = "_app"
134  NAMESPACE_PARAM = "namespace"
135  NAMESPACES_PARAM = "namespaces"  # Obsolete.
136
137  def __iter__(self):
138    return self
139
140  def next(self):
141    """Returns the next input from this input reader as a key, value pair.
142
143    Returns:
144      The next input from this input reader.
145    """
146    raise NotImplementedError("next() not implemented in %s" % self.__class__)
147
148  @classmethod
149  def from_json(cls, input_shard_state):
150    """Creates an instance of the InputReader for the given input shard state.
151
152    Args:
153      input_shard_state: The InputReader state as a dict-like object.
154
155    Returns:
156      An instance of the InputReader configured using the values of json.
157    """
158    raise NotImplementedError("from_json() not implemented in %s" % cls)
159
160  def to_json(self):
161    """Returns an input shard state for the remaining inputs.
162
163    Returns:
164      A json-izable version of the remaining InputReader.
165    """
166    raise NotImplementedError("to_json() not implemented in %s" %
167                              self.__class__)
168
169  @classmethod
170  def split_input(cls, mapper_spec):
171    """Returns a list of input readers.
172
173    This method creates a list of input readers, each for one shard.
174    It attempts to split inputs among readers evenly.
175
176    Args:
177      mapper_spec: model.MapperSpec specifies the inputs and additional
178        parameters to define the behavior of input readers.
179
180    Returns:
181      A list of InputReaders. None or [] when no input data can be found.
182    """
183    raise NotImplementedError("split_input() not implemented in %s" % cls)
184
185  @classmethod
186  def validate(cls, mapper_spec):
187    """Validates mapper spec and all mapper parameters.
188
189    Input reader parameters are expected to be passed as "input_reader"
190    subdictionary in mapper_spec.params.
191
    The pre-1.6.4 API mixes input reader parameters with all other parameters.
    For compatibility, input readers check mapper_spec.params as well and
    issue a warning if the "input_reader" subdictionary is not present.
195
196    Args:
197      mapper_spec: The MapperSpec for this InputReader.
198
199    Raises:
200      BadReaderParamsError: required parameters are missing or invalid.
201    """
202    if mapper_spec.input_reader_class() != cls:
203      raise BadReaderParamsError("Input reader class mismatch")
204
205
206def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
207  """Obtain input reader parameters.
208
209  Utility function for input readers implementation. Fetches parameters
210  from mapreduce specification giving appropriate usage warnings.
211
212  Args:
213    mapper_spec: The MapperSpec for the job
214    allowed_keys: set of all allowed keys in parameters as strings. If it is not
215      None, then parameters are expected to be in a separate "input_reader"
216      subdictionary of mapper_spec parameters.
217    allow_old: Allow parameters to exist outside of the input_reader
      subdictionary for compatibility.
219
220  Returns:
221    mapper parameters as dict
222
223  Raises:
224    BadReaderParamsError: if parameters are invalid/missing or not allowed.
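
  Example (illustrative):

    # mapper_spec.params == {"input_reader": {"entity_kind": "Foo"}, "x": 1}
    params = _get_params(mapper_spec, allowed_keys=set(["entity_kind"]))
    # params == {"entity_kind": "Foo"}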
225  """
226  if "input_reader" not in mapper_spec.params:
227    message = ("Input reader's parameters should be specified in "
228               "input_reader subdictionary.")
229    if not allow_old or allowed_keys:
230      raise errors.BadReaderParamsError(message)
231    params = mapper_spec.params
232    params = dict((str(n), v) for n, v in params.iteritems())
233  else:
234    if not isinstance(mapper_spec.params.get("input_reader"), dict):
235      raise errors.BadReaderParamsError(
236          "Input reader parameters should be a dictionary")
237    params = mapper_spec.params.get("input_reader")
238    params = dict((str(n), v) for n, v in params.iteritems())
239    if allowed_keys:
240      params_diff = set(params.keys()) - allowed_keys
241      if params_diff:
242        raise errors.BadReaderParamsError(
243            "Invalid input_reader parameters: %s" % ",".join(params_diff))
244  return params
245
246
247class AbstractDatastoreInputReader(InputReader):
248  """Abstract class for datastore input readers."""
249
250  # Number of entities to fetch at once while doing scanning.
251  _BATCH_SIZE = 50
252
253  # Maximum number of shards we'll create.
254  _MAX_SHARD_COUNT = 256
255
256  # Factor for additional ranges to split when using inequality filters.
257  _OVERSPLIT_FACTOR = 1
258
259  # The maximum number of namespaces that will be sharded by datastore key
  # before switching to a strategy where sharding is done lexicographically by
261  # namespace.
262  MAX_NAMESPACES_FOR_KEY_SHARD = 10
263
264  # reader parameters.
265  ENTITY_KIND_PARAM = "entity_kind"
266  KEYS_ONLY_PARAM = "keys_only"
267  BATCH_SIZE_PARAM = "batch_size"
268  KEY_RANGE_PARAM = "key_range"
269  FILTERS_PARAM = "filters"
270  OVERSPLIT_FACTOR_PARAM = "oversplit_factor"
271
272  _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator
273
274  def __init__(self, iterator):
    """Creates a new AbstractDatastoreInputReader object.

    This is an internal constructor. Use split_input to create readers
    instead.
278
279    Args:
280      iterator: an iterator that generates objects for this input reader.
281    """
282    self._iter = iterator
283
284  def __iter__(self):
285    """Yields whatever internal iterator yields."""
286    for o in self._iter:
287      yield o
288
289  def __str__(self):
290    """Returns the string representation of this InputReader."""
291    return repr(self._iter)
292
293  def to_json(self):
294    """Serializes input reader to json compatible format.
295
296    Returns:
297      all the data in json-compatible map.
298    """
299    return self._iter.to_json()
300
301  @classmethod
302  def from_json(cls, json):
303    """Create new DatastoreInputReader from json, encoded by to_json.
304
305    Args:
306      json: json representation of DatastoreInputReader.
307
308    Returns:
309      an instance of DatastoreInputReader with all data deserialized from json.
310    """
311    return cls(db_iters.RangeIteratorFactory.from_json(json))
312
313  @classmethod
314  def _get_query_spec(cls, mapper_spec):
315    """Construct a model.QuerySpec from model.MapperSpec."""
316    params = _get_params(mapper_spec)
317    entity_kind = params[cls.ENTITY_KIND_PARAM]
318    filters = params.get(cls.FILTERS_PARAM)
319    app = params.get(cls._APP_PARAM)
320    ns = params.get(cls.NAMESPACE_PARAM)
321
322    return model.QuerySpec(
323        entity_kind=cls._get_raw_entity_kind(entity_kind),
324        keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
325        filters=filters,
326        batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
327        oversplit_factor=int(params.get(cls.OVERSPLIT_FACTOR_PARAM,
328                                        cls._OVERSPLIT_FACTOR)),
329        model_class_path=entity_kind,
330        app=app,
331        ns=ns)
332
333  @classmethod
334  def split_input(cls, mapper_spec):
335    """Inherit doc."""
336    shard_count = mapper_spec.shard_count
337    query_spec = cls._get_query_spec(mapper_spec)
338
339    namespaces = None
340    if query_spec.ns is not None:
341      k_ranges = cls._to_key_ranges_by_shard(
342          query_spec.app, [query_spec.ns], shard_count, query_spec)
343    else:
344      ns_keys = namespace_range.get_namespace_keys(
345          query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
      # No namespace keys means the app may have some data, but that data is
      # not visible yet. Just return.
348      if not ns_keys:
349        return
350      # If the number of ns is small, we shard each ns by key and assign each
351      # shard a piece of a ns.
352      elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
353        namespaces = [ns_key.name() or "" for ns_key in ns_keys]
354        k_ranges = cls._to_key_ranges_by_shard(
355            query_spec.app, namespaces, shard_count, query_spec)
356      # When number of ns is large, we can only split lexicographically by ns.
357      else:
358        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
359                                                         contiguous=False,
360                                                         can_query=lambda: True,
361                                                         _app=query_spec.app)
362        k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range)
363                    for ns_range in ns_ranges]
364
365    iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator(
366        r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges]
367
368    return [cls(i) for i in iters]
369
370  @classmethod
371  def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec):
372    """Get a list of key_ranges.KeyRanges objects, one for each shard.
373
374    This method uses scatter index to split each namespace into pieces
375    and assign those pieces to shards.
376
377    Args:
378      app: app_id in str.
379      namespaces: a list of namespaces in str.
380      shard_count: number of shards to split.
381      query_spec: model.QuerySpec.
382
383    Returns:
384      a list of key_ranges.KeyRanges objects.
385    """
386    key_ranges_by_ns = []
387    # Split each ns into n splits. If a ns doesn't have enough scatter to
388    # split into n, the last few splits are None.
389    for namespace in namespaces:
390      ranges = cls._split_ns_by_scatter(
391          shard_count,
392          namespace,
393          query_spec.entity_kind,
394          query_spec.filters,
395          app)
396      # The nth split of each ns will be assigned to the nth shard.
      # Shuffle so that the Nones are not all clustered at the end.
398      random.shuffle(ranges)
399      key_ranges_by_ns.append(ranges)
400
401    # KeyRanges from different namespaces might be very different in size.
    # Assign them round robin so that each shard gets at most one split
    # (or one None) from each namespace.
404    ranges_by_shard = [[] for _ in range(shard_count)]
405    for ranges in key_ranges_by_ns:
406      for i, k_range in enumerate(ranges):
407        if k_range:
408          ranges_by_shard[i].append(k_range)
409
410    key_ranges_by_shard = []
411    for ranges in ranges_by_shard:
412      if ranges:
413        key_ranges_by_shard.append(key_ranges.KeyRangesFactory.create_from_list(
414            ranges))
415    return key_ranges_by_shard
416
417  @classmethod
418  def _split_ns_by_scatter(cls,
419                           shard_count,
420                           namespace,
421                           raw_entity_kind,
422                           filters,
423                           app):
424    """Split a namespace by scatter index into key_range.KeyRange.
425
426    TODO(user): Power this with key_range.KeyRange.compute_split_points.
427
    Args:
      shard_count: number of shards.
      namespace: namespace name to split. str.
      raw_entity_kind: low level datastore API entity kind.
      filters: optional list of filters to apply to the scatter query.
      app: app id in str.

    Returns:
      A list of key_range.KeyRange objects. If there are not enough entities
      to split into the requested number of shards, the returned list will
      contain KeyRanges ordered lexicographically with any Nones appearing at
      the end.
438    """
439    if shard_count == 1:
440      # With one shard we don't need to calculate any split points at all.
441      return [key_range.KeyRange(namespace=namespace, _app=app)]
442
443    ds_query = datastore.Query(kind=raw_entity_kind,
444                               namespace=namespace,
445                               _app=app,
446                               keys_only=True)
447    ds_query.Order("__scatter__")
448    oversampling_factor = 32
449    random_keys = None
450    if filters:
451      ds_query_with_filters = copy.copy(ds_query)
452      for (key, op, value) in filters:
453        ds_query_with_filters.update({'%s %s' % (key, op): value})
454        try:
455          random_keys = ds_query_with_filters.Get(shard_count *
456                                                  oversampling_factor)
457        except db.NeedIndexError, why:
458          logging.warning('Need to add an index for optimal mapreduce-input'
459                          ' splitting:\n%s' % why)
460          # We'll try again without the filter.  We hope the filter
461          # will filter keys uniformly across the key-name space!
462
463    if not random_keys:
464      random_keys = ds_query.Get(shard_count * oversampling_factor)
465
466    if not random_keys:
467      # There are no entities with scatter property. We have no idea
468      # how to split.
469      return ([key_range.KeyRange(namespace=namespace, _app=app)] +
470              [None] * (shard_count - 1))
471
472    random_keys.sort()
473
474    if len(random_keys) >= shard_count:
475      # We've got a lot of scatter values. Sample them down.
476      random_keys = cls._choose_split_points(random_keys, shard_count)
477
478    k_ranges = []
479
480    k_ranges.append(key_range.KeyRange(
481        key_start=None,
482        key_end=random_keys[0],
483        direction=key_range.KeyRange.ASC,
484        include_start=False,
485        include_end=False,
486        namespace=namespace,
487        _app=app))
488
489    for i in range(0, len(random_keys) - 1):
490      k_ranges.append(key_range.KeyRange(
491          key_start=random_keys[i],
492          key_end=random_keys[i+1],
493          direction=key_range.KeyRange.ASC,
494          include_start=True,
495          include_end=False,
496          namespace=namespace,
497          _app=app))
498
499    k_ranges.append(key_range.KeyRange(
500        key_start=random_keys[-1],
501        key_end=None,
502        direction=key_range.KeyRange.ASC,
503        include_start=True,
504        include_end=False,
505        namespace=namespace,
506        _app=app))
507
508    if len(k_ranges) < shard_count:
      # We need to have as many shards as were requested. Add some Nones.
510      k_ranges += [None] * (shard_count - len(k_ranges))
511    return k_ranges
512
513  @classmethod
514  def _choose_split_points(cls, sorted_keys, shard_count):
515    """Returns the best split points given a random set of datastore.Keys."""
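    # Worked example (illustrative): with 8 sampled keys and shard_count=4 the
    # stride is 2.0, so indices round(2.0 * 1) = 2, round(2.0 * 2) = 4 and
    # round(2.0 * 3) = 6 are chosen, i.e. shard_count - 1 == 3 split points.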
516    assert len(sorted_keys) >= shard_count
517    index_stride = len(sorted_keys) / float(shard_count)
518    return [sorted_keys[int(round(index_stride * i))]
519            for i in range(1, shard_count)]
520
521  @classmethod
522  def validate(cls, mapper_spec):
523    """Inherit docs."""
524    params = _get_params(mapper_spec)
525    if cls.ENTITY_KIND_PARAM not in params:
526      raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
527    if cls.BATCH_SIZE_PARAM in params:
528      try:
529        batch_size = int(params[cls.BATCH_SIZE_PARAM])
530        if batch_size < 1:
531          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
532      except ValueError, e:
533        raise BadReaderParamsError("Bad batch size: %s" % e)
534    if cls.OVERSPLIT_FACTOR_PARAM in params:
535      try:
536        oversplit_factor = int(params[cls.OVERSPLIT_FACTOR_PARAM])
537        if oversplit_factor < 1:
538          raise BadReaderParamsError("Bad oversplit factor:"
539                                     " %s" % oversplit_factor)
540      except ValueError, e:
541        raise BadReaderParamsError("Bad oversplit factor: %s" % e)
542    try:
543      bool(params.get(cls.KEYS_ONLY_PARAM, False))
544    except:
545      raise BadReaderParamsError("keys_only expects a boolean value but got %s",
546                                 params[cls.KEYS_ONLY_PARAM])
547    if cls.NAMESPACE_PARAM in params:
548      if not isinstance(params[cls.NAMESPACE_PARAM],
549                        (str, unicode, type(None))):
550        raise BadReaderParamsError(
551            "Expected a single namespace string")
552    if cls.NAMESPACES_PARAM in params:
553      raise BadReaderParamsError("Multiple namespaces are no longer supported")
554    if cls.FILTERS_PARAM in params:
555      filters = params[cls.FILTERS_PARAM]
556      if not isinstance(filters, list):
557        raise BadReaderParamsError("Expected list for filters parameter")
558      for f in filters:
559        if not isinstance(f, (tuple, list)):
560          raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
561        if len(f) != 3:
562          raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
563        prop, op, _ = f
564        if not isinstance(prop, basestring):
565          raise BadReaderParamsError("Property should be string: %s", prop)
566        if not isinstance(op, basestring):
567          raise BadReaderParamsError("Operator should be string: %s", op)
568
569  @classmethod
570  def _get_raw_entity_kind(cls, entity_kind_or_model_classpath):
571    """Returns the entity kind to use with low level datastore calls.
572
573    Args:
574      entity_kind_or_model_classpath: user specified entity kind or model
575        classpath.
576
577    Returns:
578      the entity kind in str to use with low level datastore calls.
579    """
580    return entity_kind_or_model_classpath
581
582
583class RawDatastoreInputReader(AbstractDatastoreInputReader):
584  """Iterates over an entity kind and yields datastore.Entity."""
585
586  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator
587
588  @classmethod
589  def validate(cls, mapper_spec):
590    """Inherit docs."""
591    super(RawDatastoreInputReader, cls).validate(mapper_spec)
592    params = _get_params(mapper_spec)
593    entity_kind = params[cls.ENTITY_KIND_PARAM]
594    if "." in entity_kind:
595      logging.warning(
          ". detected in entity kind %s specified for reader %s. "
597          "Assuming entity kind contains the dot.",
598          entity_kind, cls.__name__)
599    if cls.FILTERS_PARAM in params:
600      filters = params[cls.FILTERS_PARAM]
601      for f in filters:
602        if f[1] != "=":
603          raise BadReaderParamsError(
604              "Only equality filters are supported: %s", f)
605
606
607class DatastoreInputReader(AbstractDatastoreInputReader):
608  """Iterates over a Model and yields model instances.
609
610  Supports both db.model and ndb.model.
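
  Example input_reader parameters (an illustrative sketch; the model path and
  filter value are hypothetical):

    "input_reader": {
        "entity_kind": "mymodule.models.FooModel",
        "batch_size": 100,
        "filters": [("status", "=", "active")],
    }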
611  """
612
613  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator
614
615  @classmethod
616  def _get_raw_entity_kind(cls, model_classpath):
617    entity_type = util.for_name(model_classpath)
618    if isinstance(entity_type, db.Model):
619      return entity_type.kind()
620    elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
621      # pylint: disable=protected-access
622      return entity_type._get_kind()
623    else:
624      return util.get_short_name(model_classpath)
625
626  @classmethod
627  def validate(cls, mapper_spec):
628    """Inherit docs."""
629    super(DatastoreInputReader, cls).validate(mapper_spec)
630    params = _get_params(mapper_spec)
631    entity_kind = params[cls.ENTITY_KIND_PARAM]
632    # Fail fast if Model cannot be located.
633    try:
634      model_class = util.for_name(entity_kind)
635    except ImportError, e:
636      raise BadReaderParamsError("Bad entity kind: %s" % e)
637    if cls.FILTERS_PARAM in params:
638      filters = params[cls.FILTERS_PARAM]
639      if issubclass(model_class, db.Model):
640        cls._validate_filters(filters, model_class)
641      else:
642        cls._validate_filters_ndb(filters, model_class)
643      property_range.PropertyRange(filters, entity_kind)
644
645  @classmethod
646  def _validate_filters(cls, filters, model_class):
647    """Validate user supplied filters.
648
649    Validate filters are on existing properties and filter values
650    have valid semantics.
651
652    Args:
653      filters: user supplied filters. Each filter should be a list or tuple of
654        format (<property_name_as_str>, <query_operator_as_str>,
655        <value_of_certain_type>). Value type is up to the property's type.
656      model_class: the db.Model class for the entity type to apply filters on.
657
658    Raises:
659      BadReaderParamsError: if any filter is invalid in any way.
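
    Example of a well-formed filters value (illustrative; the property names
    and values are hypothetical and must match the model's properties):

      [("status", "=", "active"), ("count", ">", 5)]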
660    """
661    if not filters:
662      return
663
664    properties = model_class.properties()
665
666    for f in filters:
667      prop, _, val = f
668      if prop not in properties:
669        raise errors.BadReaderParamsError(
670            "Property %s is not defined for entity type %s",
671            prop, model_class.kind())
672
673      # Validate the value of each filter. We need to know filters have
674      # valid value to carry out splits.
675      try:
676        properties[prop].validate(val)
677      except db.BadValueError, e:
678        raise errors.BadReaderParamsError(e)
679
680  @classmethod
681  # pylint: disable=protected-access
682  def _validate_filters_ndb(cls, filters, model_class):
683    """Validate ndb.Model filters."""
684    if not filters:
685      return
686
687    properties = model_class._properties
688
689
690    for idx, f in enumerate(filters):
691      prop, ineq, val = f
692      if prop not in properties:
693        raise errors.BadReaderParamsError(
694            "Property %s is not defined for entity type %s",
695            prop, model_class._get_kind())
696
697      # Attempt to cast the value to a KeyProperty if appropriate.
698      # This enables filtering against keys.
699      try:
700        if (isinstance(val, basestring) and
701            isinstance(properties[prop],
702              (ndb.KeyProperty, ndb.ComputedProperty))):
703          val = ndb.Key(urlsafe=val)
704          filters[idx] = [prop, ineq, val]
705      except:
706        pass
707
708      # Validate the value of each filter. We need to know filters have
709      # valid value to carry out splits.
710      try:
711        properties[prop]._do_validate(val)
712      except db.BadValueError, e:
713        raise errors.BadReaderParamsError(e)
714
715  @classmethod
716  def split_input(cls, mapper_spec):
717    """Inherit docs."""
718    shard_count = mapper_spec.shard_count
719    query_spec = cls._get_query_spec(mapper_spec)
720
721    if not property_range.should_shard_by_property_range(query_spec.filters):
722      return super(DatastoreInputReader, cls).split_input(mapper_spec)
723
724    # Artificially increase the number of shards to get a more even split.
725    # For example, if we are creating 7 shards for one week of data based on a
726    # Day property and the data points tend to be clumped on certain days (say,
727    # Monday and Wednesday), instead of assigning each shard a single day of
728    # the week, we will split each day into "oversplit_factor" pieces, and
729    # assign each shard "oversplit_factor" pieces with "1 / oversplit_factor"
730    # the work, so that the data from Monday and Wednesday is more evenly
731    # spread across all shards.
732    oversplit_factor = query_spec.oversplit_factor
733    oversplit_shard_count = oversplit_factor * shard_count
734    p_range = property_range.PropertyRange(query_spec.filters,
735                                           query_spec.model_class_path)
736    p_ranges = p_range.split(oversplit_shard_count)
737
738    # User specified a namespace.
739    if query_spec.ns is not None:
740      ns_range = namespace_range.NamespaceRange(
741          namespace_start=query_spec.ns,
742          namespace_end=query_spec.ns,
743          _app=query_spec.app)
744      ns_ranges = [copy.copy(ns_range) for _ in p_ranges]
745    else:
746      ns_keys = namespace_range.get_namespace_keys(
747          query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
748      if not ns_keys:
749        return
750      # User doesn't specify ns but the number of ns is small.
751      # We still split by property range.
752      if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
753        ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app)
754                     for _ in p_ranges]
755      # Lots of namespaces. Split by ns.
756      else:
757        ns_ranges = namespace_range.NamespaceRange.split(n=oversplit_shard_count,
758                                                         contiguous=False,
759                                                         can_query=lambda: True,
760                                                         _app=query_spec.app)
761        p_ranges = [copy.copy(p_range) for _ in ns_ranges]
762
763    assert len(p_ranges) == len(ns_ranges)
764
765    iters = [
766        db_iters.RangeIteratorFactory.create_property_range_iterator(
767            p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]
768
769    # Reduce the number of ranges back down to the shard count.
770    # It's possible that we didn't split into enough shards even
771    # after oversplitting, in which case we don't need to do anything.
772    if len(iters) > shard_count:
773      # We cycle through the iterators and chain them together, e.g.
774      # if we look at the indices chained together, we get:
775      # Shard #0 gets 0, num_shards, 2 * num_shards, ...
776      # Shard #1 gets 1, num_shards + 1, 2 * num_shards + 1, ...
777      # Shard #2 gets 2, num_shards + 2, 2 * num_shards + 2, ...
778      # and so on. This should split fairly evenly.
779      iters = [
780        db_iters.RangeIteratorFactory.create_multi_property_range_iterator(
781          [iters[i] for i in xrange(start_index, len(iters), shard_count)]
782        ) for start_index in xrange(shard_count)
783      ]
784
785    return [cls(i) for i in iters]
786
787
788class DatastoreKeyInputReader(RawDatastoreInputReader):
  """Iterates over an entity kind and yields datastore.Key."""
790
791  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator
792
793
794# For backward compatibility.
795DatastoreEntityInputReader = RawDatastoreInputReader
796
797
798# TODO(user): Remove this after the only dependency GroomerMarkReader is
799class _OldAbstractDatastoreInputReader(InputReader):
800  """Abstract base class for classes that iterate over datastore entities.
801
802  Concrete subclasses must implement _iter_key_range(self, k_range). See the
803  docstring for that method for details.
804  """
805
806  # Number of entities to fetch at once while doing scanning.
807  _BATCH_SIZE = 50
808
809  # Maximum number of shards we'll create.
810  _MAX_SHARD_COUNT = 256
811
812  # __scatter__ oversampling factor
813  _OVERSAMPLING_FACTOR = 32
814
815  # The maximum number of namespaces that will be sharded by datastore key
816  # before switching to a strategy where sharding is done lexographically by
817  # namespace.
818  MAX_NAMESPACES_FOR_KEY_SHARD = 10
819
820  # Mapreduce parameters.
821  ENTITY_KIND_PARAM = "entity_kind"
822  KEYS_ONLY_PARAM = "keys_only"
823  BATCH_SIZE_PARAM = "batch_size"
824  KEY_RANGE_PARAM = "key_range"
825  NAMESPACE_RANGE_PARAM = "namespace_range"
826  CURRENT_KEY_RANGE_PARAM = "current_key_range"
827  FILTERS_PARAM = "filters"
828
829  # TODO(user): Add support for arbitrary queries. It's not possible to
830  # support them without cursors since right now you can't even serialize query
831  # definition.
832  # pylint: disable=redefined-outer-name
833  def __init__(self,
834               entity_kind,
835               key_ranges=None,
836               ns_range=None,
837               batch_size=_BATCH_SIZE,
838               current_key_range=None,
839               filters=None):
840    """Create new AbstractDatastoreInputReader object.
841
    This is an internal constructor. Use split_query in a concrete class
    instead.
843
844    Args:
845      entity_kind: entity kind as string.
846      key_ranges: a sequence of key_range.KeyRange instances to process. Only
847          one of key_ranges or ns_range can be non-None.
848      ns_range: a namespace_range.NamespaceRange to process. Only one of
849          key_ranges or ns_range can be non-None.
850      batch_size: size of read batch as int.
851      current_key_range: the current key_range.KeyRange being processed.
852      filters: optional list of filters to apply to the query. Each filter is
853        a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
854        User filters are applied first.
855    """
856    assert key_ranges is not None or ns_range is not None, (
857        "must specify one of 'key_ranges' or 'ns_range'")
858    assert key_ranges is None or ns_range is None, (
859        "can't specify both 'key_ranges ' and 'ns_range'")
860
861    self._entity_kind = entity_kind
862    # Reverse the KeyRanges so they can be processed in order as a stack of
863    # work items.
864    self._key_ranges = key_ranges and list(reversed(key_ranges))
865
866    self._ns_range = ns_range
867    self._batch_size = int(batch_size)
868    self._current_key_range = current_key_range
869    self._filters = filters
870
871  @classmethod
872  def _get_raw_entity_kind(cls, entity_kind):
873    if "." in entity_kind:
874      logging.warning(
          ". detected in entity kind %s specified for reader %s. "
876          "Assuming entity kind contains the dot.",
877          entity_kind, cls.__name__)
878    return entity_kind
879
880  def __iter__(self):
881    """Iterates over the given KeyRanges or NamespaceRange.
882
883    This method iterates over the given KeyRanges or NamespaceRange and sets
884    the self._current_key_range to the KeyRange currently being processed. It
    then delegates to the _iter_key_range method to yield the actual
886    results.
887
888    Yields:
889      Forwards the objects yielded by the subclasses concrete _iter_key_range()
890      method. The caller must consume the result yielded because self.to_json()
891      will not include it.
892    """
893    if self._key_ranges is not None:
894      for o in self._iter_key_ranges():
895        yield o
896    elif self._ns_range is not None:
897      for o in self._iter_ns_range():
898        yield o
899    else:
900      assert False, "self._key_ranges and self._ns_range are both None"
901
902  def _iter_key_ranges(self):
903    """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
904    while True:
905      if self._current_key_range is None:
906        if self._key_ranges:
907          self._current_key_range = self._key_ranges.pop()
908          # The most recently popped key_range may be None, so continue here
909          # to find the next keyrange that's valid.
910          continue
911        else:
912          break
913
914      for key, o in self._iter_key_range(
915          copy.deepcopy(self._current_key_range)):
916        # The caller must consume yielded values so advancing the KeyRange
917        # before yielding is safe.
918        self._current_key_range.advance(key)
919        yield o
920      self._current_key_range = None
921
922  def _iter_ns_range(self):
923    """Iterates over self._ns_range, delegating to self._iter_key_range()."""
924    while True:
925      if self._current_key_range is None:
926        query = self._ns_range.make_datastore_query()
927        namespace_result = query.Get(1)
928        if not namespace_result:
929          break
930
931        namespace = namespace_result[0].name() or ""
932        self._current_key_range = key_range.KeyRange(
933            namespace=namespace, _app=self._ns_range.app)
934        yield ALLOW_CHECKPOINT
935
936      for key, o in self._iter_key_range(
937          copy.deepcopy(self._current_key_range)):
938        # The caller must consume yielded values so advancing the KeyRange
939        # before yielding is safe.
940        self._current_key_range.advance(key)
941        yield o
942
943      if (self._ns_range.is_single_namespace or
944          self._current_key_range.namespace == self._ns_range.namespace_end):
945        break
946      self._ns_range = self._ns_range.with_start_after(
947          self._current_key_range.namespace)
948      self._current_key_range = None
949
950  def _iter_key_range(self, k_range):
951    """Yields a db.Key and the value that should be yielded by self.__iter__().
952
953    Args:
954      k_range: The key_range.KeyRange to iterate over.
955
956    Yields:
957      A 2-tuple containing the last db.Key processed and the value that should
958      be yielded by __iter__. The returned db.Key will be used to determine the
959      InputReader's current position in self._current_key_range.
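
    Example (illustrative subclass sketch; batching and error handling
    omitted):

      def _iter_key_range(self, k_range):
        query = k_range.make_ascending_datastore_query(
            self._entity_kind, filters=self._filters)
        for entity in query.Run():
          yield entity.key(), entity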
960    """
961    raise NotImplementedError("_iter_key_range() not implemented in %s" %
962                              self.__class__)
963
964  def __str__(self):
965    """Returns the string representation of this InputReader."""
966    if self._ns_range is None:
967      return repr(self._key_ranges)
968    else:
969      return repr(self._ns_range)
970
971  @classmethod
972  def _choose_split_points(cls, sorted_keys, shard_count):
973    """Returns the best split points given a random set of db.Keys."""
974    assert len(sorted_keys) >= shard_count
975    index_stride = len(sorted_keys) / float(shard_count)
976    return [sorted_keys[int(round(index_stride * i))]
977            for i in range(1, shard_count)]
978
979  # TODO(user): use query splitting functionality when it becomes available
980  # instead.
981  @classmethod
982  def _split_input_from_namespace(cls, app, namespace, entity_kind,
983                                  shard_count):
984    """Helper for _split_input_from_params.
985
986    If there are not enough Entities to make all of the given shards, the
987    returned list of KeyRanges will include Nones. The returned list will
    contain KeyRanges ordered lexicographically with any Nones appearing at the
989    end.
990
991    Args:
992      app: the app.
993      namespace: the namespace.
994      entity_kind: entity kind as string.
995      shard_count: the number of shards.
996
997    Returns:
998      KeyRange objects.
999    """
1000
1001    raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
1002    if shard_count == 1:
      # With one shard we don't need to calculate any split points at all.
1004      return [key_range.KeyRange(namespace=namespace, _app=app)]
1005
1006    ds_query = datastore.Query(kind=raw_entity_kind,
1007                               namespace=namespace,
1008                               _app=app,
1009                               keys_only=True)
1010    ds_query.Order("__scatter__")
1011    random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
1012
1013    if not random_keys:
1014      # There are no entities with scatter property. We have no idea
1015      # how to split.
1016      return ([key_range.KeyRange(namespace=namespace, _app=app)] +
1017              [None] * (shard_count - 1))
1018
1019    random_keys.sort()
1020
1021    if len(random_keys) >= shard_count:
1022      # We've got a lot of scatter values. Sample them down.
1023      random_keys = cls._choose_split_points(random_keys, shard_count)
1024
1025    # pylint: disable=redefined-outer-name
1026    key_ranges = []
1027
1028    key_ranges.append(key_range.KeyRange(
1029        key_start=None,
1030        key_end=random_keys[0],
1031        direction=key_range.KeyRange.ASC,
1032        include_start=False,
1033        include_end=False,
1034        namespace=namespace,
1035        _app=app))
1036
1037    for i in range(0, len(random_keys) - 1):
1038      key_ranges.append(key_range.KeyRange(
1039          key_start=random_keys[i],
1040          key_end=random_keys[i+1],
1041          direction=key_range.KeyRange.ASC,
1042          include_start=True,
1043          include_end=False,
1044          namespace=namespace,
1045          _app=app))
1046
1047    key_ranges.append(key_range.KeyRange(
1048        key_start=random_keys[-1],
1049        key_end=None,
1050        direction=key_range.KeyRange.ASC,
1051        include_start=True,
1052        include_end=False,
1053        namespace=namespace,
1054        _app=app))
1055
1056    if len(key_ranges) < shard_count:
      # We need to have as many shards as were requested. Add some Nones.
1058      key_ranges += [None] * (shard_count - len(key_ranges))
1059
1060    return key_ranges
1061
1062  @classmethod
1063  def _split_input_from_params(cls, app, namespaces, entity_kind_name,
1064                               params, shard_count):
1065    """Return input reader objects. Helper for split_input."""
1066    # pylint: disable=redefined-outer-name
1067    key_ranges = []  # KeyRanges for all namespaces
1068    for namespace in namespaces:
1069      key_ranges.extend(
1070          cls._split_input_from_namespace(app,
1071                                          namespace,
1072                                          entity_kind_name,
1073                                          shard_count))
1074
1075    # Divide the KeyRanges into shard_count shards. The KeyRanges for different
1076    # namespaces might be very different in size so the assignment of KeyRanges
1077    # to shards is done round-robin.
1078    shared_ranges = [[] for _ in range(shard_count)]
1079    for i, k_range in enumerate(key_ranges):
1080      shared_ranges[i % shard_count].append(k_range)
1081    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1082
1083    return [cls(entity_kind_name,
1084                key_ranges=key_ranges,
1085                ns_range=None,
1086                batch_size=batch_size)
1087            for key_ranges in shared_ranges if key_ranges]
1088
1089  @classmethod
1090  def validate(cls, mapper_spec):
1091    """Validates mapper spec and all mapper parameters.
1092
1093    Args:
1094      mapper_spec: The MapperSpec for this InputReader.
1095
1096    Raises:
1097      BadReaderParamsError: required parameters are missing or invalid.
1098    """
1099    if mapper_spec.input_reader_class() != cls:
1100      raise BadReaderParamsError("Input reader class mismatch")
1101    params = _get_params(mapper_spec)
1102    if cls.ENTITY_KIND_PARAM not in params:
1103      raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
1104    if cls.BATCH_SIZE_PARAM in params:
1105      try:
1106        batch_size = int(params[cls.BATCH_SIZE_PARAM])
1107        if batch_size < 1:
1108          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
1109      except ValueError, e:
1110        raise BadReaderParamsError("Bad batch size: %s" % e)
1111    if cls.NAMESPACE_PARAM in params:
1112      if not isinstance(params[cls.NAMESPACE_PARAM],
1113                        (str, unicode, type(None))):
1114        raise BadReaderParamsError(
1115            "Expected a single namespace string")
1116    if cls.NAMESPACES_PARAM in params:
1117      raise BadReaderParamsError("Multiple namespaces are no longer supported")
1118    if cls.FILTERS_PARAM in params:
1119      filters = params[cls.FILTERS_PARAM]
1120      if not isinstance(filters, list):
1121        raise BadReaderParamsError("Expected list for filters parameter")
1122      for f in filters:
1123        if not isinstance(f, (tuple, list)):
1124          raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
1125        if len(f) != 3:
1126          raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
1127        if not isinstance(f[0], basestring):
1128          raise BadReaderParamsError("First element should be string: %s", f)
1129        if f[1] != "=":
1130          raise BadReaderParamsError(
1131              "Only equality filters are supported: %s", f)
1132
1133  @classmethod
1134  def split_input(cls, mapper_spec):
1135    """Splits query into shards without fetching query results.
1136
1137    Tries as best as it can to split the whole query result set into equal
    shards. Due to the difficulty of making a perfect split, the resulting
    shards' sizes might differ significantly from each other.
1140
1141    Args:
1142      mapper_spec: MapperSpec with params containing 'entity_kind'.
1143        May have 'namespace' in the params as a string containing a single
1144        namespace. If specified then the input reader will only yield values
1145        in the given namespace. If 'namespace' is not given then values from
1146        all namespaces will be yielded. May also have 'batch_size' in the params
1147        to specify the number of entities to process in each batch.
1148
1149    Returns:
1150      A list of InputReader objects. If the query results are empty then the
1151      empty list will be returned. Otherwise, the list will always have a length
      equal to the requested shard count, but may be padded with Nones if there
      are too few results for effective sharding.
1154    """
1155    params = _get_params(mapper_spec)
1156    entity_kind_name = params[cls.ENTITY_KIND_PARAM]
1157    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1158    shard_count = mapper_spec.shard_count
1159    namespace = params.get(cls.NAMESPACE_PARAM)
1160    app = params.get(cls._APP_PARAM)
1161    filters = params.get(cls.FILTERS_PARAM)
1162
1163    if namespace is None:
1164      # It is difficult to efficiently shard large numbers of namespaces because
1165      # there can be an arbitrary number of them. So the strategy is:
1166      # 1. if there are a small number of namespaces in the datastore then
1167      #    generate one KeyRange per namespace per shard and assign each shard a
1168      #    KeyRange for every namespace. This should lead to nearly perfect
1169      #    sharding.
1170      # 2. if there are a large number of namespaces in the datastore then
1171      #    generate one NamespaceRange per worker. This can lead to very bad
1172      #    sharding because namespaces can contain very different numbers of
1173      #    entities and each NamespaceRange may contain very different numbers
1174      #    of namespaces.
1175      namespace_query = datastore.Query("__namespace__",
1176                                        keys_only=True,
1177                                        _app=app)
1178      namespace_keys = namespace_query.Get(
1179          limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
1180
1181      if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
1182        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
1183                                                         contiguous=True,
1184                                                         _app=app)
1185        return [cls(entity_kind_name,
1186                    key_ranges=None,
1187                    ns_range=ns_range,
1188                    batch_size=batch_size,
1189                    filters=filters)
1190                for ns_range in ns_ranges]
1191      elif not namespace_keys:
1192        return [cls(entity_kind_name,
1193                    key_ranges=None,
1194                    ns_range=namespace_range.NamespaceRange(_app=app),
1195                    batch_size=shard_count,
1196                    filters=filters)]
1197      else:
1198        namespaces = [namespace_key.name() or ""
1199                      for namespace_key in namespace_keys]
1200    else:
1201      namespaces = [namespace]
1202
1203    readers = cls._split_input_from_params(
1204        app, namespaces, entity_kind_name, params, shard_count)
1205    if filters:
1206      for reader in readers:
1207        reader._filters = filters
1208    return readers
1209
1210  def to_json(self):
1211    """Serializes all the data in this query range into json form.
1212
1213    Returns:
1214      all the data in json-compatible map.
1215    """
1216    if self._key_ranges is None:
1217      key_ranges_json = None
1218    else:
1219      key_ranges_json = []
1220      for k in self._key_ranges:
1221        if k:
1222          key_ranges_json.append(k.to_json())
1223        else:
1224          key_ranges_json.append(None)
1225
1226    if self._ns_range is None:
1227      namespace_range_json = None
1228    else:
1229      namespace_range_json = self._ns_range.to_json_object()
1230
1231    if self._current_key_range is None:
1232      current_key_range_json = None
1233    else:
1234      current_key_range_json = self._current_key_range.to_json()
1235
1236    json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
1237                 self.NAMESPACE_RANGE_PARAM: namespace_range_json,
1238                 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
1239                 self.ENTITY_KIND_PARAM: self._entity_kind,
1240                 self.BATCH_SIZE_PARAM: self._batch_size,
1241                 self.FILTERS_PARAM: self._filters}
1242    return json_dict
1243
1244  @classmethod
1245  def from_json(cls, json):
1246    """Create new DatastoreInputReader from the json, encoded by to_json.
1247
1248    Args:
1249      json: json map representation of DatastoreInputReader.
1250
1251    Returns:
1252      an instance of DatastoreInputReader with all data deserialized from json.
1253    """
1254    if json[cls.KEY_RANGE_PARAM] is None:
1255      # pylint: disable=redefined-outer-name
1256      key_ranges = None
1257    else:
1258      key_ranges = []
1259      for k in json[cls.KEY_RANGE_PARAM]:
1260        if k:
1261          key_ranges.append(key_range.KeyRange.from_json(k))
1262        else:
1263          key_ranges.append(None)
1264
1265    if json[cls.NAMESPACE_RANGE_PARAM] is None:
1266      ns_range = None
1267    else:
1268      ns_range = namespace_range.NamespaceRange.from_json_object(
1269          json[cls.NAMESPACE_RANGE_PARAM])
1270
1271    if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
1272      current_key_range = None
1273    else:
1274      current_key_range = key_range.KeyRange.from_json(
1275          json[cls.CURRENT_KEY_RANGE_PARAM])
1276
1277    return cls(
1278        json[cls.ENTITY_KIND_PARAM],
1279        key_ranges,
1280        ns_range,
1281        json[cls.BATCH_SIZE_PARAM],
1282        current_key_range,
1283        filters=json.get(cls.FILTERS_PARAM))
1284
1285
1286class BlobstoreLineInputReader(InputReader):
1287  """Input reader for a newline delimited blob in Blobstore."""
1288
1289  # TODO(user): Should we set this based on MAX_BLOB_FETCH_SIZE?
1290  _BLOB_BUFFER_SIZE = 64000
1291
1292  # Maximum number of shards to allow.
1293  _MAX_SHARD_COUNT = 256
1294
1295  # Maximum number of blobs to allow.
1296  _MAX_BLOB_KEYS_COUNT = 246
1297
1298  # Mapreduce parameters.
1299  BLOB_KEYS_PARAM = "blob_keys"
1300
  # Serialization parameters.
1302  INITIAL_POSITION_PARAM = "initial_position"
1303  END_POSITION_PARAM = "end_position"
1304  BLOB_KEY_PARAM = "blob_key"
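
  # Illustrative input_reader configuration sketch (the blob keys below are
  # placeholders); each shard then yields (byte_offset, line) pairs:
  #
  #   "input_reader": {
  #       "blob_keys": ["<blob-key-1>", "<blob-key-2>"],
  #   }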
1305
1306  def __init__(self, blob_key, start_position, end_position):
1307    """Initializes this instance with the given blob key and character range.
1308
    This BlobstoreLineInputReader will read from the first record starting
    strictly after start_position until the first record ending at or after
    end_position (exclusive). As an exception, if start_position is 0, then
1312    this InputReader starts reading at the first record.
1313
1314    Args:
1315      blob_key: the BlobKey that this input reader is processing.
1316      start_position: the position to start reading at.
1317      end_position: a position in the last record to read.
1318    """
1319    self._blob_key = blob_key
1320    self._blob_reader = blobstore.BlobReader(blob_key,
1321                                             self._BLOB_BUFFER_SIZE,
1322                                             start_position)
1323    self._end_position = end_position
1324    self._has_iterated = False
1325    self._read_before_start = bool(start_position)
1326
1327  def next(self):
    """Returns the next input as an (offset, line) tuple."""
1329    self._has_iterated = True
1330
1331    if self._read_before_start:
1332      self._blob_reader.readline()
1333      self._read_before_start = False
1334    start_position = self._blob_reader.tell()
1335
1336    if start_position > self._end_position:
1337      raise StopIteration()
1338
1339    line = self._blob_reader.readline()
1340
1341    if not line:
1342      raise StopIteration()
1343
1344    return start_position, line.rstrip("\n")
1345
1346  def to_json(self):
    """Returns a json-compatible input shard spec for the remaining inputs."""
1348    new_pos = self._blob_reader.tell()
1349    if self._has_iterated:
1350      new_pos -= 1
1351    return {self.BLOB_KEY_PARAM: self._blob_key,
1352            self.INITIAL_POSITION_PARAM: new_pos,
1353            self.END_POSITION_PARAM: self._end_position}
1354
1355  def __str__(self):
1356    """Returns the string representation of this BlobstoreLineInputReader."""
1357    return "blobstore.BlobKey(%r):[%d, %d]" % (
1358        self._blob_key, self._blob_reader.tell(), self._end_position)
1359
1360  @classmethod
1361  def from_json(cls, json):
    """Creates an instance of this InputReader for the given shard spec."""
1363    return cls(json[cls.BLOB_KEY_PARAM],
1364               json[cls.INITIAL_POSITION_PARAM],
1365               json[cls.END_POSITION_PARAM])
1366
1367  @classmethod
1368  def validate(cls, mapper_spec):
1369    """Validates mapper spec and all mapper parameters.
1370
1371    Args:
1372      mapper_spec: The MapperSpec for this InputReader.
1373
1374    Raises:
1375      BadReaderParamsError: required parameters are missing or invalid.
1376    """
1377    if mapper_spec.input_reader_class() != cls:
1378      raise BadReaderParamsError("Mapper input reader class mismatch")
1379    params = _get_params(mapper_spec)
1380    if cls.BLOB_KEYS_PARAM not in params:
1381      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1382    blob_keys = params[cls.BLOB_KEYS_PARAM]
1383    if isinstance(blob_keys, basestring):
1384      # This is a mechanism to allow multiple blob keys (which do not contain
1385      # commas) in a single string. It may go away.
1386      blob_keys = blob_keys.split(",")
1387    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1388      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1389    if not blob_keys:
1390      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1391    for blob_key in blob_keys:
1392      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1393      if not blob_info:
1394        raise BadReaderParamsError("Could not find blobinfo for key %s" %
1395                                   blob_key)
1396
1397  @classmethod
1398  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the given blob keys.
1400
1401    Args:
1402      mapper_spec: The mapper specification to split from. Must contain
1403          'blob_keys' parameter with one or more blob keys.
1404
1405    Returns:
      A list of BlobstoreLineInputReaders corresponding to the specified
      shards.
1407    """
1408    params = _get_params(mapper_spec)
1409    blob_keys = params[cls.BLOB_KEYS_PARAM]
1410    if isinstance(blob_keys, basestring):
1411      # This is a mechanism to allow multiple blob keys (which do not contain
1412      # commas) in a single string. It may go away.
1413      blob_keys = blob_keys.split(",")
1414
1415    blob_sizes = {}
1416    for blob_key in blob_keys:
1417      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1418      blob_sizes[blob_key] = blob_info.size
1419
1420    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1421    shards_per_blob = shard_count // len(blob_keys)
1422    if shards_per_blob == 0:
1423      shards_per_blob = 1
1424
1425    chunks = []
1426    for blob_key, blob_size in blob_sizes.items():
1427      blob_chunk_size = blob_size // shards_per_blob
1428      for i in xrange(shards_per_blob - 1):
1429        chunks.append(BlobstoreLineInputReader.from_json(
1430            {cls.BLOB_KEY_PARAM: blob_key,
1431             cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
1432             cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
1433      chunks.append(BlobstoreLineInputReader.from_json(
1434          {cls.BLOB_KEY_PARAM: blob_key,
1435           cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
1436           cls.END_POSITION_PARAM: blob_size}))
1437    return chunks
1438
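# A minimal illustrative sketch (commented out, not part of the library): the
# per-shard state produced by BlobstoreLineInputReader.split_input() can be
# round-tripped through from_json()/to_json(). The blob key below is a
# hypothetical placeholder; iterating the reader yields the blob's lines
# paired with their byte offsets.
#
#   state = {BlobstoreLineInputReader.BLOB_KEY_PARAM: "<blob-key>",
#            BlobstoreLineInputReader.INITIAL_POSITION_PARAM: 0,
#            BlobstoreLineInputReader.END_POSITION_PARAM: 1024}
#   reader = BlobstoreLineInputReader.from_json(state)
#   for offset, line in reader:
#     pass  # Process one line of the blob.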
1439
1440class BlobstoreZipInputReader(InputReader):
1441  """Input reader for files from a zip archive stored in the Blobstore.
1442
  Each instance of the reader reads the TOC (stored at the end of the zip
  file) and then only those contained files for which it is responsible.
1445  """
1446
1447  # Maximum number of shards to allow.
1448  _MAX_SHARD_COUNT = 256
1449
1450  # Mapreduce parameters.
1451  BLOB_KEY_PARAM = "blob_key"
1452  START_INDEX_PARAM = "start_index"
1453  END_INDEX_PARAM = "end_index"
1454
1455  def __init__(self, blob_key, start_index, end_index,
1456               _reader=blobstore.BlobReader):
1457    """Initializes this instance with the given blob key and file range.
1458
1459    This BlobstoreZipInputReader will read from the file with index start_index
1460    up to but not including the file with index end_index.
1461
1462    Args:
1463      blob_key: the BlobKey that this input reader is processing.
1464      start_index: the index of the first file to read.
1465      end_index: the index of the first file that will not be read.
1466      _reader: a callable that returns a file-like object for reading blobs.
1467          Used for dependency injection.
1468    """
1469    self._blob_key = blob_key
1470    self._start_index = start_index
1471    self._end_index = end_index
1472    self._reader = _reader
1473    self._zip = None
1474    self._entries = None
1475
1476  def next(self):
1477    """Returns the next input from this input reader as (ZipInfo, opener) tuple.
1478
1479    Returns:
1480      The next input from this input reader, in the form of a 2-tuple.
1481      The first element of the tuple is a zipfile.ZipInfo object.
1482      The second element of the tuple is a zero-argument function that, when
1483      called, returns the complete body of the file.
1484    """
1485    if not self._zip:
1486      self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1487      # Get a list of entries, reversed so we can pop entries off in order
1488      self._entries = self._zip.infolist()[self._start_index:self._end_index]
1489      self._entries.reverse()
1490    if not self._entries:
1491      raise StopIteration()
1492    entry = self._entries.pop()
1493    self._start_index += 1
1494    return (entry, lambda: self._read(entry))
1495
1496  def _read(self, entry):
1497    """Read entry content.
1498
1499    Args:
1500      entry: zip file entry as zipfile.ZipInfo.
1501    Returns:
1502      Entry content as string.
1503    """
1504    start_time = time.time()
1505    content = self._zip.read(entry.filename)
1506
1507    ctx = context.get()
1508    if ctx:
1509      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1510      operation.counters.Increment(
1511          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1512
1513    return content
1514
1515  @classmethod
1516  def from_json(cls, json):
1517    """Creates an instance of the InputReader for the given input shard state.
1518
1519    Args:
1520      json: The InputReader state as a dict-like object.
1521
1522    Returns:
1523      An instance of the InputReader configured using the values of json.
1524    """
1525    return cls(json[cls.BLOB_KEY_PARAM],
1526               json[cls.START_INDEX_PARAM],
1527               json[cls.END_INDEX_PARAM])
1528
1529  def to_json(self):
1530    """Returns an input shard state for the remaining inputs.
1531
1532    Returns:
1533      A json-izable version of the remaining InputReader.
1534    """
1535    return {self.BLOB_KEY_PARAM: self._blob_key,
1536            self.START_INDEX_PARAM: self._start_index,
1537            self.END_INDEX_PARAM: self._end_index}
1538
1539  def __str__(self):
1540    """Returns the string representation of this BlobstoreZipInputReader."""
1541    return "blobstore.BlobKey(%r):[%d, %d]" % (
1542        self._blob_key, self._start_index, self._end_index)
1543
1544  @classmethod
1545  def validate(cls, mapper_spec):
1546    """Validates mapper spec and all mapper parameters.
1547
1548    Args:
1549      mapper_spec: The MapperSpec for this InputReader.
1550
1551    Raises:
1552      BadReaderParamsError: required parameters are missing or invalid.
1553    """
1554    if mapper_spec.input_reader_class() != cls:
1555      raise BadReaderParamsError("Mapper input reader class mismatch")
1556    params = _get_params(mapper_spec)
1557    if cls.BLOB_KEY_PARAM not in params:
1558      raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
1559    blob_key = params[cls.BLOB_KEY_PARAM]
1560    blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1561    if not blob_info:
1562      raise BadReaderParamsError("Could not find blobinfo for key %s" %
1563                                 blob_key)
1564
1565  @classmethod
1566  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1567    """Returns a list of input shard states for the input spec.
1568
1569    Args:
1570      mapper_spec: The MapperSpec for this InputReader. Must contain
1571          'blob_key' parameter with one blob key.
1572      _reader: a callable that returns a file-like object for reading blobs.
1573          Used for dependency injection.
1574
1575    Returns:
1576      A list of InputReaders spanning files within the zip.
1577    """
1578    params = _get_params(mapper_spec)
1579    blob_key = params[cls.BLOB_KEY_PARAM]
1580    zip_input = zipfile.ZipFile(_reader(blob_key))
1581    zfiles = zip_input.infolist()
1582    total_size = sum(x.file_size for x in zfiles)
1583    num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
1584    size_per_shard = total_size // num_shards
1585
1586    # Break the list of files into sublists, each of approximately
1587    # size_per_shard bytes.
1588    shard_start_indexes = [0]
1589    current_shard_size = 0
1590    for i, fileinfo in enumerate(zfiles):
1591      current_shard_size += fileinfo.file_size
1592      if current_shard_size >= size_per_shard:
1593        shard_start_indexes.append(i + 1)
1594        current_shard_size = 0
1595
1596    if shard_start_indexes[-1] != len(zfiles):
1597      shard_start_indexes.append(len(zfiles))
1598
1599    return [cls(blob_key, start_index, end_index, _reader)
1600            for start_index, end_index
1601            in zip(shard_start_indexes, shard_start_indexes[1:])]
1602
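# A minimal illustrative sketch (commented out): a mapper handler for
# BlobstoreZipInputReader receives (zipfile.ZipInfo, opener) tuples, where
# opener is a zero-argument callable that returns the member's full contents
# (see next() above). The handler name is a hypothetical placeholder.
#
#   def process_zip_member(input_tuple):
#     zip_info, opener = input_tuple
#     body = opener()  # Reads the entire member into memory.
#     yield "%s: %d bytes\n" % (zip_info.filename, len(body))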
1603
1604class BlobstoreZipLineInputReader(InputReader):
1605  """Input reader for newline delimited files in zip archives from Blobstore.
1606
  This has the same external interface as BlobstoreLineInputReader: it takes
  a list of blobs as its input and yields lines to the mapper. However, the
  blobs themselves are expected to be zip archives of line-delimited files
  rather than the files themselves.

  This is useful because many line-delimited files benefit greatly from
  compression.
1613  """
1614
1615  # Maximum number of shards to allow.
1616  _MAX_SHARD_COUNT = 256
1617
1618  # Maximum number of blobs to allow.
1619  _MAX_BLOB_KEYS_COUNT = 246
1620
1621  # Mapreduce parameters.
1622  BLOB_KEYS_PARAM = "blob_keys"
1623
1624  # Serialization parameters.
1625  BLOB_KEY_PARAM = "blob_key"
1626  START_FILE_INDEX_PARAM = "start_file_index"
1627  END_FILE_INDEX_PARAM = "end_file_index"
1628  OFFSET_PARAM = "offset"
1629
1630  def __init__(self, blob_key, start_file_index, end_file_index, offset,
1631               _reader=blobstore.BlobReader):
1632    """Initializes this instance with the given blob key and file range.
1633
    This BlobstoreZipLineInputReader will read from the file with index
    start_file_index up to but not including the file with index
    end_file_index. It will return lines starting at offset within
    file[start_file_index].
1637
1638    Args:
1639      blob_key: the BlobKey that this input reader is processing.
1640      start_file_index: the index of the first file to read within the zip.
1641      end_file_index: the index of the first file that will not be read.
1642      offset: the byte offset within blob_key.zip[start_file_index] to start
1643        reading. The reader will continue to the end of the file.
1644      _reader: a callable that returns a file-like object for reading blobs.
1645          Used for dependency injection.
1646    """
1647    self._blob_key = blob_key
1648    self._start_file_index = start_file_index
1649    self._end_file_index = end_file_index
1650    self._initial_offset = offset
1651    self._reader = _reader
1652    self._zip = None
1653    self._entries = None
1654    self._filestream = None
1655
1656  @classmethod
1657  def validate(cls, mapper_spec):
1658    """Validates mapper spec and all mapper parameters.
1659
1660    Args:
1661      mapper_spec: The MapperSpec for this InputReader.
1662
1663    Raises:
1664      BadReaderParamsError: required parameters are missing or invalid.
1665    """
1666    if mapper_spec.input_reader_class() != cls:
1667      raise BadReaderParamsError("Mapper input reader class mismatch")
1668    params = _get_params(mapper_spec)
1669    if cls.BLOB_KEYS_PARAM not in params:
1670      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1671
1672    blob_keys = params[cls.BLOB_KEYS_PARAM]
1673    if isinstance(blob_keys, basestring):
1674      # This is a mechanism to allow multiple blob keys (which do not contain
1675      # commas) in a single string. It may go away.
1676      blob_keys = blob_keys.split(",")
1677    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1678      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1679    if not blob_keys:
1680      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1681    for blob_key in blob_keys:
1682      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1683      if not blob_info:
1684        raise BadReaderParamsError("Could not find blobinfo for key %s" %
1685                                   blob_key)
1686
1687  @classmethod
1688  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1689    """Returns a list of input readers for the input spec.
1690
1691    Args:
1692      mapper_spec: The MapperSpec for this InputReader. Must contain
1693          'blob_keys' parameter with one or more blob keys.
1694      _reader: a callable that returns a file-like object for reading blobs.
1695          Used for dependency injection.
1696
1697    Returns:
1698      A list of InputReaders spanning the subfiles within the blobs.
1699      There will be at least one reader per blob, but it will otherwise
1700      attempt to keep the expanded size even.
1701    """
1702    params = _get_params(mapper_spec)
1703    blob_keys = params[cls.BLOB_KEYS_PARAM]
1704    if isinstance(blob_keys, basestring):
1705      # This is a mechanism to allow multiple blob keys (which do not contain
1706      # commas) in a single string. It may go away.
1707      blob_keys = blob_keys.split(",")
1708
1709    blob_files = {}
1710    total_size = 0
1711    for blob_key in blob_keys:
1712      zip_input = zipfile.ZipFile(_reader(blob_key))
1713      blob_files[blob_key] = zip_input.infolist()
1714      total_size += sum(x.file_size for x in blob_files[blob_key])
1715
1716    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1717
1718    # We can break on both blob key and file-within-zip boundaries.
1719    # A shard will span at minimum a single blob key, but may only
1720    # handle a few files within a blob.
1721
1722    size_per_shard = total_size // shard_count
1723
1724    readers = []
1725    for blob_key in blob_keys:
1726      bfiles = blob_files[blob_key]
1727      current_shard_size = 0
1728      start_file_index = 0
1729      next_file_index = 0
1730      for fileinfo in bfiles:
1731        next_file_index += 1
1732        current_shard_size += fileinfo.file_size
1733        if current_shard_size >= size_per_shard:
1734          readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1735                             _reader))
1736          current_shard_size = 0
1737          start_file_index = next_file_index
1738      if current_shard_size != 0:
1739        readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1740                           _reader))
1741
1742    return readers
1743
1744  def next(self):
1745    """Returns the next line from this input reader as (lineinfo, line) tuple.
1746
1747    Returns:
1748      The next input from this input reader, in the form of a 2-tuple.
1749      The first element of the tuple describes the source, it is itself
1750        a tuple (blobkey, filenumber, byteoffset).
1751      The second element of the tuple is the line found at that offset.
1752    """
1753    if not self._filestream:
1754      if not self._zip:
1755        self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1756        # Get a list of entries, reversed so we can pop entries off in order
1757        self._entries = self._zip.infolist()[self._start_file_index:
1758                                             self._end_file_index]
1759        self._entries.reverse()
1760      if not self._entries:
1761        raise StopIteration()
1762      entry = self._entries.pop()
1763      value = self._zip.read(entry.filename)
1764      self._filestream = StringIO.StringIO(value)
1765      if self._initial_offset:
1766        self._filestream.seek(self._initial_offset)
1767        self._filestream.readline()
1768
1769    start_position = self._filestream.tell()
1770    line = self._filestream.readline()
1771
1772    if not line:
1773      # Done with this file in the zip. Move on to the next file.
1774      self._filestream.close()
1775      self._filestream = None
1776      self._start_file_index += 1
1777      self._initial_offset = 0
1778      return self.next()
1779
1780    return ((self._blob_key, self._start_file_index, start_position),
1781            line.rstrip("\n"))
1782
1783  def _next_offset(self):
1784    """Return the offset of the next line to read."""
1785    if self._filestream:
1786      offset = self._filestream.tell()
1787      if offset:
1788        offset -= 1
1789    else:
1790      offset = self._initial_offset
1791
1792    return offset
1793
1794  def to_json(self):
1795    """Returns an input shard state for the remaining inputs.
1796
1797    Returns:
1798      A json-izable version of the remaining InputReader.
1799    """
1800
1801    return {self.BLOB_KEY_PARAM: self._blob_key,
1802            self.START_FILE_INDEX_PARAM: self._start_file_index,
1803            self.END_FILE_INDEX_PARAM: self._end_file_index,
1804            self.OFFSET_PARAM: self._next_offset()}
1805
1806  @classmethod
1807  def from_json(cls, json, _reader=blobstore.BlobReader):
1808    """Creates an instance of the InputReader for the given input shard state.
1809
1810    Args:
1811      json: The InputReader state as a dict-like object.
1812      _reader: For dependency injection.
1813
1814    Returns:
1815      An instance of the InputReader configured using the values of json.
1816    """
1817    return cls(json[cls.BLOB_KEY_PARAM],
1818               json[cls.START_FILE_INDEX_PARAM],
1819               json[cls.END_FILE_INDEX_PARAM],
1820               json[cls.OFFSET_PARAM],
1821               _reader)
1822
1823  def __str__(self):
1824    """Returns the string representation of this reader.
1825
1826    Returns:
1827      string blobkey:[start file num, end file num]:current offset.
1828    """
1829    return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
1830        self._blob_key, self._start_file_index, self._end_file_index,
1831        self._next_offset())
1832
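# A minimal illustrative sketch (commented out): a mapper handler for
# BlobstoreZipLineInputReader receives ((blob_key, file_index, byte_offset),
# line) tuples, with the trailing newline already stripped (see next() above).
# The handler name is a hypothetical placeholder.
#
#   def process_zipped_line(input_tuple):
#     (blob_key, file_index, byte_offset), line = input_tuple
#     if line.startswith("ERROR"):
#       yield "%s:%d:%d\n" % (blob_key, file_index, byte_offset)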
1833
1834class RandomStringInputReader(InputReader):
1835  """RandomStringInputReader generates random strings as output.
1836
1837  Primary usage is to populate output with testing entries.
1838  """
1839
1840  # Total number of entries this reader should generate.
1841  COUNT = "count"
1842  # Length of the generated strings.
1843  STRING_LENGTH = "string_length"
1844
1845  DEFAULT_STRING_LENGTH = 10
1846
1847  def __init__(self, count, string_length):
1848    """Initialize input reader.
1849
1850    Args:
1851      count: number of entries this shard should generate.
1852      string_length: the length of generated random strings.
1853    """
1854    self._count = count
1855    self._string_length = string_length
1856
1857  def __iter__(self):
1858    ctx = context.get()
1859
1860    while self._count:
1861      self._count -= 1
1862      start_time = time.time()
1863      content = "".join(random.choice(string.ascii_lowercase)
1864                        for _ in range(self._string_length))
1865      if ctx:
1866        operation.counters.Increment(
1867            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1868        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1869      yield content
1870
1871  @classmethod
1872  def split_input(cls, mapper_spec):
1873    params = _get_params(mapper_spec)
1874    count = params[cls.COUNT]
1875    string_length = cls.DEFAULT_STRING_LENGTH
1876    if cls.STRING_LENGTH in params:
1877      string_length = params[cls.STRING_LENGTH]
1878
1879    shard_count = mapper_spec.shard_count
1880    count_per_shard = count // shard_count
1881
1882    mr_input_readers = [
1883        cls(count_per_shard, string_length) for _ in range(shard_count)]
1884
1885    left = count - count_per_shard*shard_count
1886    if left > 0:
1887      mr_input_readers.append(cls(left, string_length))
1888
1889    return mr_input_readers
1890
1891  @classmethod
1892  def validate(cls, mapper_spec):
1893    if mapper_spec.input_reader_class() != cls:
1894      raise BadReaderParamsError("Mapper input reader class mismatch")
1895
1896    params = _get_params(mapper_spec)
1897    if cls.COUNT not in params:
1898      raise BadReaderParamsError("Must specify %s" % cls.COUNT)
1899    if not isinstance(params[cls.COUNT], int):
1900      raise BadReaderParamsError("%s should be an int but is %s" %
1901                                 (cls.COUNT, type(params[cls.COUNT])))
1902    if params[cls.COUNT] <= 0:
      raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
1904    if cls.STRING_LENGTH in params and not (
1905        isinstance(params[cls.STRING_LENGTH], int) and
1906        params[cls.STRING_LENGTH] > 0):
1907      raise BadReaderParamsError("%s should be a positive int but is %s" %
1908                                 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
1909    if (not isinstance(mapper_spec.shard_count, int) or
1910        mapper_spec.shard_count <= 0):
1911      raise BadReaderParamsError(
1912          "shard_count should be a positive int but is %s" %
1913          mapper_spec.shard_count)
1914
1915  @classmethod
1916  def from_json(cls, json):
1917    return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
1918
1919  def to_json(self):
1920    return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
1921
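# A minimal illustrative sketch (commented out): mapper parameters for
# RandomStringInputReader when populating test output. COUNT is required;
# STRING_LENGTH is optional and defaults to DEFAULT_STRING_LENGTH. The values
# below are arbitrary.
#
#   mapper_parameters = {
#       RandomStringInputReader.COUNT: 1000,
#       RandomStringInputReader.STRING_LENGTH: 12,
#   }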
1922
# TODO(user): This reader always produces only one shard, because namespace
# entities use a mix of ids/names, and KeyRange-based splitting doesn't work
# satisfactorily in this case.
# It's possible to implement splitting functionality specific to this reader
# instead of reusing the generic one. Meanwhile, one shard is enough for our
# applications.
1929class NamespaceInputReader(InputReader):
1930  """An input reader to iterate over namespaces.
1931
1932  This reader yields namespace names as string.
1933  It will always produce only one shard.
1934  """
1935
1936  NAMESPACE_RANGE_PARAM = "namespace_range"
1937  BATCH_SIZE_PARAM = "batch_size"
1938  _BATCH_SIZE = 10
1939
1940  def __init__(self, ns_range, batch_size=_BATCH_SIZE):
1941    self.ns_range = ns_range
1942    self._batch_size = batch_size
1943
1944  def to_json(self):
1945    """Serializes all the data in this query range into json form.
1946
1947    Returns:
1948      all the data in json-compatible map.
1949    """
1950    return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
1951            self.BATCH_SIZE_PARAM: self._batch_size}
1952
1953  @classmethod
1954  def from_json(cls, json):
1955    """Create new DatastoreInputReader from the json, encoded by to_json.
1956
1957    Args:
1958      json: json map representation of DatastoreInputReader.
1959
1960    Returns:
1961      an instance of DatastoreInputReader with all data deserialized from json.
1962    """
1963    return cls(
1964        namespace_range.NamespaceRange.from_json_object(
1965            json[cls.NAMESPACE_RANGE_PARAM]),
1966        json[cls.BATCH_SIZE_PARAM])
1967
1968  @classmethod
1969  def validate(cls, mapper_spec):
1970    """Validates mapper spec.
1971
1972    Args:
1973      mapper_spec: The MapperSpec for this InputReader.
1974
1975    Raises:
1976      BadReaderParamsError: required parameters are missing or invalid.
1977    """
1978    if mapper_spec.input_reader_class() != cls:
1979      raise BadReaderParamsError("Input reader class mismatch")
1980    params = _get_params(mapper_spec)
1981    if cls.BATCH_SIZE_PARAM in params:
1982      try:
1983        batch_size = int(params[cls.BATCH_SIZE_PARAM])
1984        if batch_size < 1:
1985          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
1986      except ValueError, e:
1987        raise BadReaderParamsError("Bad batch size: %s" % e)
1988
1989  @classmethod
1990  def split_input(cls, mapper_spec):
1991    """Returns a list of input readers for the input spec.
1992
1993    Args:
1994      mapper_spec: The MapperSpec for this InputReader.
1995
1996    Returns:
1997      A list of InputReaders.
1998    """
1999    batch_size = int(_get_params(mapper_spec).get(
2000        cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
2001    shard_count = mapper_spec.shard_count
2002    namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
2003                                                            contiguous=True)
2004    return [NamespaceInputReader(ns_range, batch_size)
2005            for ns_range in namespace_ranges]
2006
2007  def __iter__(self):
2008    while True:
2009      keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
2010      if not keys:
2011        break
2012
2013      for key in keys:
2014        namespace = metadata.Namespace.key_to_namespace(key)
2015        self.ns_range = self.ns_range.with_start_after(namespace)
2016        yield namespace
2017
2018  def __str__(self):
2019    return repr(self.ns_range)
2020
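# A minimal illustrative sketch (commented out): a mapper handler for
# NamespaceInputReader receives bare namespace names as strings. The handler
# name is a hypothetical placeholder.
#
#   def process_namespace(namespace):
#     yield "%s\n" % namespace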
2021
2022class LogInputReader(InputReader):
2023  """Input reader for a time range of logs via the Logs Reader API.
2024
2025  The number of input shards may be specified by the SHARDS_PARAM mapper
2026  parameter.  A starting and ending time (in seconds since the Unix epoch) are
2027  required to generate time ranges over which to shard the input.
2028  """
2029  # Parameters directly mapping to those available via logservice.fetch().
2030  START_TIME_PARAM = "start_time"
2031  END_TIME_PARAM = "end_time"
2032  MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
2033  INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
2034  INCLUDE_APP_LOGS_PARAM = "include_app_logs"
2035  VERSION_IDS_PARAM = "version_ids"
2036  MODULE_VERSIONS_PARAM = "module_versions"
2037
2038  # Semi-hidden parameters used only internally or for privileged applications.
2039  _OFFSET_PARAM = "offset"
2040  _PROTOTYPE_REQUEST_PARAM = "prototype_request"
2041
2042  _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
2043                       MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
2044                       INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
2045                       MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
2046  _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
2047
2048  def __init__(self,
2049               start_time=None,
2050               end_time=None,
2051               minimum_log_level=None,
2052               include_incomplete=False,
2053               include_app_logs=False,
2054               version_ids=None,
2055               module_versions=None,
2056               **kwargs):
2057    """Constructor.
2058
2059    Args:
2060      start_time: The earliest request completion or last-update time of logs
2061        that should be mapped over, in seconds since the Unix epoch.
2062      end_time: The latest request completion or last-update time that logs
2063        should be mapped over, in seconds since the Unix epoch.
2064      minimum_log_level: An application log level which serves as a filter on
2065        the requests mapped over--requests with no application log at or above
2066        the specified level will be omitted, even if include_app_logs is False.
2067      include_incomplete: Whether or not to include requests that have started
2068        but not yet finished, as a boolean.  Defaults to False.
2069      include_app_logs: Whether or not to include application level logs in the
2070        mapped logs, as a boolean.  Defaults to False.
      version_ids: A list of version ids whose logs should be read. This
        cannot be used with module_versions.
      module_versions: A list of tuples containing a module and version id
        whose logs should be read. This cannot be used with version_ids.
2075      **kwargs: A dictionary of keywords associated with this input reader.
2076    """
2077    InputReader.__init__(self)  # pylint: disable=non-parent-init-called
2078
2079    # The rule for __params is that its contents will always be suitable as
2080    # input to logservice.fetch().
2081    self.__params = dict(kwargs)
2082
2083    if start_time is not None:
2084      self.__params[self.START_TIME_PARAM] = start_time
2085    if end_time is not None:
2086      self.__params[self.END_TIME_PARAM] = end_time
2087    if minimum_log_level is not None:
2088      self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
2089    if include_incomplete is not None:
2090      self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
2091    if include_app_logs is not None:
2092      self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
2093    if version_ids:
2094      self.__params[self.VERSION_IDS_PARAM] = version_ids
2095    if module_versions:
2096      self.__params[self.MODULE_VERSIONS_PARAM] = module_versions
2097
2098    # Any submitted prototype_request will be in encoded form.
2099    if self._PROTOTYPE_REQUEST_PARAM in self.__params:
2100      prototype_request = log_service_pb.LogReadRequest(
2101          self.__params[self._PROTOTYPE_REQUEST_PARAM])
2102      self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2103
2104  def __iter__(self):
2105    """Iterates over logs in a given range of time.
2106
2107    Yields:
2108      A RequestLog containing all the information for a single request.
2109    """
2110    for log in logservice.fetch(**self.__params):
2111      self.__params[self._OFFSET_PARAM] = log.offset
2112      yield log
2113
2114  @classmethod
2115  def from_json(cls, json):
2116    """Creates an instance of the InputReader for the given input shard's state.
2117
2118    Args:
2119      json: The InputReader state as a dict-like object.
2120
2121    Returns:
2122      An instance of the InputReader configured using the given JSON parameters.
2123    """
2124    # Strip out unrecognized parameters, as introduced by b/5960884.
2125    params = dict((str(k), v) for k, v in json.iteritems()
2126                  if k in cls._PARAMS)
2127
    # This is not symmetric with to_json() w.r.t. PROTOTYPE_REQUEST_PARAM
    # because the constructor parameters need to be JSON-encodable, so the
    # decoding needs to happen in the constructor anyway.
2131    if cls._OFFSET_PARAM in params:
2132      params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
2133    return cls(**params)
2134
2135  def to_json(self):
2136    """Returns an input shard state for the remaining inputs.
2137
2138    Returns:
2139      A JSON serializable version of the remaining input to read.
2140    """
2141
2142    params = dict(self.__params)  # Shallow copy.
2143    if self._PROTOTYPE_REQUEST_PARAM in params:
2144      prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
2145      params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
2146    if self._OFFSET_PARAM in params:
2147      params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
2148    return params
2149
2150  @classmethod
2151  def split_input(cls, mapper_spec):
2152    """Returns a list of input readers for the given input specification.
2153
2154    Args:
2155      mapper_spec: The MapperSpec for this InputReader.
2156
2157    Returns:
2158      A list of InputReaders.
2159    """
2160    params = _get_params(mapper_spec)
2161    shard_count = mapper_spec.shard_count
2162
2163    # Pick out the overall start and end times and time step per shard.
2164    start_time = params[cls.START_TIME_PARAM]
2165    end_time = params[cls.END_TIME_PARAM]
2166    seconds_per_shard = (end_time - start_time) / shard_count
2167
2168    # Create a LogInputReader for each shard, modulating the params as we go.
2169    shards = []
2170    for _ in xrange(shard_count - 1):
2171      params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
2172                                    seconds_per_shard)
2173      shards.append(LogInputReader(**params))
2174      params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
2175
2176    # Create a final shard to complete the time range.
2177    params[cls.END_TIME_PARAM] = end_time
2178    return shards + [LogInputReader(**params)]
2179
2180  @classmethod
2181  def validate(cls, mapper_spec):
2182    """Validates the mapper's specification and all necessary parameters.
2183
2184    Args:
2185      mapper_spec: The MapperSpec to be used with this InputReader.
2186
2187    Raises:
2188      BadReaderParamsError: If the user fails to specify both a starting time
2189        and an ending time, or if the starting time is later than the ending
2190        time.
2191    """
2192    if mapper_spec.input_reader_class() != cls:
2193      raise errors.BadReaderParamsError("Input reader class mismatch")
2194
2195    params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
2196    if (cls.VERSION_IDS_PARAM not in params and
2197        cls.MODULE_VERSIONS_PARAM not in params):
2198      raise errors.BadReaderParamsError("Must specify a list of version ids or "
2199                                        "module/version ids for mapper input")
2200    if (cls.VERSION_IDS_PARAM in params and
2201        cls.MODULE_VERSIONS_PARAM in params):
2202      raise errors.BadReaderParamsError("Can not supply both version ids or "
2203                                        "module/version ids. Use only one.")
2204    if (cls.START_TIME_PARAM not in params or
2205        params[cls.START_TIME_PARAM] is None):
2206      raise errors.BadReaderParamsError("Must specify a starting time for "
2207                                        "mapper input")
2208    if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
2209      params[cls.END_TIME_PARAM] = time.time()
2210
2211    if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
2212      raise errors.BadReaderParamsError("The starting time cannot be later "
2213                                        "than or the same as the ending time.")
2214
2215    if cls._PROTOTYPE_REQUEST_PARAM in params:
2216      try:
2217        params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
2218            params[cls._PROTOTYPE_REQUEST_PARAM])
2219      except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
2220        raise errors.BadReaderParamsError("The prototype request must be "
2221                                          "parseable as a LogReadRequest.")
2222
2223    # Pass the parameters to logservice.fetch() to verify any underlying
2224    # constraints on types or values.  This only constructs an iterator, it
2225    # doesn't trigger any requests for actual log records.
2226    try:
2227      logservice.fetch(**params)
2228    except logservice.InvalidArgumentError, e:
2229      raise errors.BadReaderParamsError("One or more parameters are not valid "
2230                                        "inputs to logservice.fetch(): %s" % e)
2231
2232  def __str__(self):
2233    """Returns the string representation of this LogInputReader."""
2234    params = []
2235    for key in sorted(self.__params.keys()):
2236      value = self.__params[key]
      if key in (self._PROTOTYPE_REQUEST_PARAM, self._OFFSET_PARAM):
        params.append("%s='%s'" % (key, value))
      else:
        params.append("%s=%s" % (key, value))
2243
2244    return "LogInputReader(%s)" % ", ".join(params)
2245
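# A minimal illustrative sketch (commented out): mapper parameters for
# LogInputReader. Exactly one of version_ids or module_versions may be given,
# and a start_time is required; end_time defaults to the current time during
# validate(). The module/version pair below is a hypothetical placeholder.
#
#   now = time.time()
#   mapper_parameters = {
#       LogInputReader.START_TIME_PARAM: now - 24 * 3600,
#       LogInputReader.END_TIME_PARAM: now,
#       LogInputReader.MODULE_VERSIONS_PARAM: [("default", "1")],
#       LogInputReader.INCLUDE_APP_LOGS_PARAM: True,
#   }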
2246
2247class _GoogleCloudStorageInputReader(InputReader):
2248  """Input reader from Google Cloud Storage using the cloudstorage library.
2249
2250  This class is expected to be subclassed with a reader that understands
2251  user-level records.
2252
  Required configuration in the mapper_spec.input_reader dictionary:
    BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
      suffixes such as directories).
    OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
      in the BUCKET_NAME_PARAM bucket. If a name ends with a "*" it will be
      treated as a prefix and all objects with matching names will be read.
      Entries should not start with a slash unless that is part of the object's
      name. An example list could be:
      ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
      A single "*" matches every object in the bucket. If a file is listed
      twice or is covered by multiple prefixes it will be read twice; there is
      no deduplication.

  Optional configuration in the mapper_spec.input_reader dictionary:
    BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
    DELIMITER_PARAM: if specified, turns on shallow splitting mode.
      The delimiter is used as a path separator to designate directory
      hierarchy. Matching of prefixes from OBJECT_NAMES_PARAM
      will stop at the first directory instead of matching
      all files under the directory. This allows MR to process a bucket with
      hundreds of thousands of files.
    FAIL_ON_MISSING_INPUT: if specified and True, the MR will fail if any of
      the input files are missing. Missing files will be skipped otherwise.
2276  """
2277
2278  # Supported parameters
2279  BUCKET_NAME_PARAM = "bucket_name"
2280  OBJECT_NAMES_PARAM = "objects"
2281  BUFFER_SIZE_PARAM = "buffer_size"
2282  DELIMITER_PARAM = "delimiter"
2283  FAIL_ON_MISSING_INPUT = "fail_on_missing_input"
2284
2285  # Internal parameters
2286  _ACCOUNT_ID_PARAM = "account_id"
2287
2288  # Other internal configuration constants
2289  _JSON_PICKLE = "pickle"
2290  _JSON_FAIL_ON_MISSING_INPUT = "fail_on_missing_input"
2291  _STRING_MAX_FILES_LISTED = 10  # Max files shown in the str representation
2292
  # The input reader could also take start and end filenames and do
  # listbucket. This saves space but has two cons:
  # 1. The files to read are less well defined: files can be added or removed
  #    over the lifetime of the MR job.
  # 2. A shard has to process files from a contiguous namespace, which may
  #    introduce straggler shards.
2299  def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
2300               delimiter=None):
2301    """Initialize a GoogleCloudStorageInputReader instance.
2302
2303    Args:
2304      filenames: A list of Google Cloud Storage filenames of the form
2305        '/bucket/objectname'.
2306      index: Index of the next filename to read.
2307      buffer_size: The size of the read buffer, None to use default.
2308      _account_id: Internal use only. See cloudstorage documentation.
2309      delimiter: Delimiter used as path separator. See class doc for details.
2310    """
2311    self._filenames = filenames
2312    self._index = index
2313    self._buffer_size = buffer_size
2314    self._account_id = _account_id
2315    self._delimiter = delimiter
2316    self._bucket = None
2317    self._bucket_iter = None
2318
    # True iff we should fail on missing input (see class doc above). Set to
    # None in the constructor and overwritten in split_input and from_json.
    # fail_on_missing_input is not a parameter of the constructor, to avoid
    # breaking classes that inherit from _GoogleCloudStorageInputReader and
    # override the constructor.
2324    self._fail_on_missing_input = None
2325
2326  def _next_file(self):
2327    """Find next filename.
2328
2329    self._filenames may need to be expanded via listbucket.
2330
2331    Returns:
2332      None if no more file is left. Filename otherwise.
2333    """
2334    while True:
2335      if self._bucket_iter:
2336        try:
2337          return self._bucket_iter.next().filename
2338        except StopIteration:
2339          self._bucket_iter = None
2340          self._bucket = None
2341      if self._index >= len(self._filenames):
2342        return
2343      filename = self._filenames[self._index]
2344      self._index += 1
2345      if self._delimiter is None or not filename.endswith(self._delimiter):
2346        return filename
2347      self._bucket = cloudstorage.listbucket(filename,
2348                                             delimiter=self._delimiter)
2349      self._bucket_iter = iter(self._bucket)
2350
2351  @classmethod
2352  def get_params(cls, mapper_spec, allowed_keys=None, allow_old=True):
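    """Returns input reader parameters for this reader.

    Falls back to the bucket_name defined in mapper_spec.params when the
    input_reader parameters do not define one.

    Args:
      mapper_spec: an instance of model.MapperSpec.
      allowed_keys: forwarded to _get_params.
      allow_old: forwarded to _get_params.

    Returns:
      A dict of parameters for this input reader.
    """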
2353    params = _get_params(mapper_spec, allowed_keys, allow_old)
2354    # Use the bucket_name defined in mapper_spec params if one was not defined
2355    # specifically in the input_reader params.
2356    if (mapper_spec.params.get(cls.BUCKET_NAME_PARAM) is not None and
2357        params.get(cls.BUCKET_NAME_PARAM) is None):
2358      params[cls.BUCKET_NAME_PARAM] = mapper_spec.params[cls.BUCKET_NAME_PARAM]
2359    return params
2360
2361  @classmethod
2362  def validate(cls, mapper_spec):
2363    """Validate mapper specification.
2364
2365    Args:
2366      mapper_spec: an instance of model.MapperSpec
2367
2368    Raises:
2369      BadReaderParamsError: if the specification is invalid for any reason such
2370        as missing the bucket name or providing an invalid bucket name.
2371    """
2372    reader_spec = cls.get_params(mapper_spec, allow_old=False)
2373
2374    # Bucket Name is required
2375    if cls.BUCKET_NAME_PARAM not in reader_spec:
2376      raise errors.BadReaderParamsError(
2377          "%s is required for Google Cloud Storage" %
2378          cls.BUCKET_NAME_PARAM)
2379    try:
2380      cloudstorage.validate_bucket_name(
2381          reader_spec[cls.BUCKET_NAME_PARAM])
2382    except ValueError, error:
2383      raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))
2384
2385    # Object Name(s) are required
2386    if cls.OBJECT_NAMES_PARAM not in reader_spec:
2387      raise errors.BadReaderParamsError(
2388          "%s is required for Google Cloud Storage" %
2389          cls.OBJECT_NAMES_PARAM)
2390    filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2391    if not isinstance(filenames, list):
2392      raise errors.BadReaderParamsError(
2393          "Object name list is not a list but a %s" %
2394          filenames.__class__.__name__)
2395    for filename in filenames:
2396      if not isinstance(filename, basestring):
2397        raise errors.BadReaderParamsError(
2398            "Object name is not a string but a %s" %
2399            filename.__class__.__name__)
2400    if cls.DELIMITER_PARAM in reader_spec:
2401      delimiter = reader_spec[cls.DELIMITER_PARAM]
2402      if not isinstance(delimiter, basestring):
2403        raise errors.BadReaderParamsError(
2404            "%s is not a string but a %s" %
2405            (cls.DELIMITER_PARAM, type(delimiter)))
2406
2407  @classmethod
2408  def split_input(cls, mapper_spec):
2409    """Returns a list of input readers.
2410
2411    An equal number of input files are assigned to each shard (+/- 1). If there
2412    are fewer files than shards, fewer than the requested number of shards will
2413    be used. Input files are currently never split (although for some formats
2414    could be and may be split in a future implementation).
2415
2416    Args:
2417      mapper_spec: an instance of model.MapperSpec.
2418
2419    Returns:
2420      A list of InputReaders. None when no input data can be found.
2421    """
2422    reader_spec = cls.get_params(mapper_spec, allow_old=False)
2423    bucket = reader_spec[cls.BUCKET_NAME_PARAM]
2424    filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2425    delimiter = reader_spec.get(cls.DELIMITER_PARAM)
2426    account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
2427    buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)
2428    fail_on_missing_input = reader_spec.get(cls.FAIL_ON_MISSING_INPUT)
2429
2430    # Gather the complete list of files (expanding wildcards)
2431    all_filenames = []
2432    for filename in filenames:
2433      if filename.endswith("*"):
2434        all_filenames.extend(
2435            [file_stat.filename for file_stat in cloudstorage.listbucket(
2436                "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
2437                _account_id=account_id)])
2438      else:
2439        all_filenames.append("/%s/%s" % (bucket, filename))
2440
2441    # Split into shards
2442    readers = []
2443    for shard in range(0, mapper_spec.shard_count):
2444      shard_filenames = all_filenames[shard::mapper_spec.shard_count]
2445      if shard_filenames:
2446        reader = cls(
2447            shard_filenames, buffer_size=buffer_size, _account_id=account_id,
2448            delimiter=delimiter)
2449        reader._fail_on_missing_input = fail_on_missing_input
2450        readers.append(reader)
2451    return readers
2452
2453  @classmethod
2454  def from_json(cls, state):
2455    obj = pickle.loads(state[cls._JSON_PICKLE])
2456    # fail_on_missing_input might not be set - default to False.
2457    obj._fail_on_missing_input = state.get(
2458        cls._JSON_FAIL_ON_MISSING_INPUT, False)
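    # Recreate the bucket listing iterator that to_json() drops before
    # pickling (the iterator itself is not reliably picklable).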
2459    if obj._bucket:
2460      obj._bucket_iter = iter(obj._bucket)
2461    return obj
2462
2463  def to_json(self):
2464    before_iter = self._bucket_iter
2465    self._bucket_iter = None
2466    try:
2467      return {
2468          self._JSON_PICKLE: pickle.dumps(self),
2469          # self._fail_on_missing_input gets pickled but we save it separately
2470          # and override it in from_json to deal with version flipping.
2471          self._JSON_FAIL_ON_MISSING_INPUT:
2472              getattr(self, "_fail_on_missing_input", False)
2473      }
    finally:
      self._bucket_iter = before_iter
2477
2478  def next(self):
2479    """Returns the next input from this input reader, a block of bytes.
2480
    Nonexistent files are logged and skipped unless _fail_on_missing_input is
    set, in which case the job fails. The file might have been removed after
    input splitting.

    Returns:
      The next input from this input reader in the form of a cloudstorage
      ReadBuffer that supports a file-like interface (read, readline, seek,
      tell, and close). An error may be raised if the file cannot be opened.
2488
2489    Raises:
2490      StopIteration: The list of files has been exhausted.
2491    """
2492    options = {}
2493    if self._buffer_size:
2494      options["read_buffer_size"] = self._buffer_size
2495    if self._account_id:
2496      options["_account_id"] = self._account_id
2497    while True:
2498      filename = self._next_file()
2499      if filename is None:
2500        raise StopIteration()
2501      try:
2502        start_time = time.time()
2503        handle = cloudstorage.open(filename, **options)
2504
2505        ctx = context.get()
2506        if ctx:
2507          operation.counters.Increment(
2508              COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2509
2510        return handle
2511      except cloudstorage.NotFoundError:
2512        # Fail the job if we're strict on missing input.
2513        if getattr(self, "_fail_on_missing_input", False):
2514          raise errors.FailJobError(
2515              "File missing in GCS, aborting: %s" % filename)
2516        # Move on otherwise.
2517        logging.warning("File %s may have been removed. Skipping file.",
2518                        filename)
2519
2520  def __str__(self):
2521    # Only show a limited number of files individually for readability
2522    num_files = len(self._filenames)
2523    if num_files > self._STRING_MAX_FILES_LISTED:
2524      names = "%s...%s + %d not shown" % (
2525          ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
2526          self._filenames[-1],
2527          num_files - self._STRING_MAX_FILES_LISTED)
2528    else:
2529      names = ",".join(self._filenames)
2530
    if self._index >= num_files:
2532      status = "EOF"
2533    else:
2534      status = "Next %s (%d of %d)" % (
2535          self._filenames[self._index],
2536          self._index + 1,  # +1 for human 1-indexing
2537          num_files)
2538    return "CloudStorage [%s, %s]" % (status, names)
2539
2540
2541GoogleCloudStorageInputReader = _GoogleCloudStorageInputReader
2542
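# A minimal illustrative sketch (commented out): input reader parameters for
# GoogleCloudStorageInputReader. The bucket and object names are hypothetical;
# the trailing "*" turns an entry into a prefix match. Each item yielded to
# the mapper is a cloudstorage file handle (see next() above).
#
#   input_reader_parameters = {
#       GoogleCloudStorageInputReader.BUCKET_NAME_PARAM: "my-bucket",
#       GoogleCloudStorageInputReader.OBJECT_NAMES_PARAM: [
#           "logs/2015-01-01.txt",
#           "incoming/input-*",
#       ],
#       GoogleCloudStorageInputReader.BUFFER_SIZE_PARAM: 1024 * 1024,
#   }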
2543
2544class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
2545  """Read data from a Google Cloud Storage file using LevelDB format.
2546
2547  See the _GoogleCloudStorageOutputWriter for additional configuration options.
2548  """
2549
2550  def __getstate__(self):
2551    result = self.__dict__.copy()
    # The record reader may not exist if the reader has not been used yet.
    if "_record_reader" in result:
      # RecordsReader has no buffering; it can safely be reconstructed after
      # deserialization.
2556      result.pop("_record_reader")
2557    return result
2558
2559  def next(self):
2560    """Returns the next input from this input reader, a record.
2561
2562    Returns:
2563      The next input from this input reader in the form of a record read from
      a LevelDB file.
2565
2566    Raises:
      StopIteration: The ordered set of records has been exhausted.
2568    """
2569    while True:
2570      if not hasattr(self, "_cur_handle") or self._cur_handle is None:
2571        # If there are no more files, StopIteration is raised here
2572        self._cur_handle = super(_GoogleCloudStorageRecordInputReader,
2573                                 self).next()
2574      if not hasattr(self, "_record_reader") or self._record_reader is None:
2575        self._record_reader = records.RecordsReader(self._cur_handle)
2576
2577      try:
2578        start_time = time.time()
2579        content = self._record_reader.read()
2580
2581        ctx = context.get()
2582        if ctx:
2583          operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
2584          operation.counters.Increment(
2585              COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2586        return content
2587
2588      except EOFError:
2589        self._cur_handle = None
2590        self._record_reader = None
2591
2592
2593GoogleCloudStorageRecordInputReader = _GoogleCloudStorageRecordInputReader
2594
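# A minimal illustrative sketch (commented out): each item yielded by
# GoogleCloudStorageRecordInputReader is the raw byte payload of one record
# from a LevelDB-format file; interpreting those bytes is up to the mapper.
# The handler name is a hypothetical placeholder.
#
#   def process_record(record_bytes):
#     yield "%d\n" % len(record_bytes)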
2595
2596class _ReducerReader(_GoogleCloudStorageRecordInputReader):
2597  """Reader to read KeyValues records from GCS."""
2598
2599  expand_parameters = True
2600
2601  def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
2602               delimiter=None):
2603    super(_ReducerReader, self).__init__(filenames, index, buffer_size,
2604                                         _account_id, delimiter)
2605    self.current_key = None
2606    self.current_values = None
2607
2608  def __iter__(self):
2609    ctx = context.get()
2610    combiner = None
2611
2612    if ctx:
2613      combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
2614      if combiner_spec:
2615        combiner = util.handler_for_name(combiner_spec)
2616
2617    try:
2618      while True:
2619        binary_record = super(_ReducerReader, self).next()
2620        proto = kv_pb.KeyValues()
2621        proto.ParseFromString(binary_record)
2622
2623        to_yield = None
2624        if self.current_key is not None and self.current_key != proto.key():
2625          to_yield = (self.current_key, self.current_values)
2626          self.current_key = None
2627          self.current_values = None
2628
2629        if self.current_key is None:
2630          self.current_key = proto.key()
2631          self.current_values = []
2632
2633        if combiner:
2634          combiner_result = combiner(
2635              self.current_key, proto.value_list(), self.current_values)
2636
2637          if not util.is_generator(combiner_result):
2638            raise errors.BadCombinerOutputError(
2639                "Combiner %s should yield values instead of returning them "
2640                "(%s)" % (combiner, combiner_result))
2641
2642          self.current_values = []
2643          for value in combiner_result:
2644            if isinstance(value, operation.Operation):
2645              value(ctx)
2646            else:
2647              # With combiner the current values always come from the combiner.
2648              self.current_values.append(value)
2649
2650          # Check-point after each combiner call is run only when there's
2651          # nothing that needs to be yielded below. Otherwise allowing a
2652          # check-point here would cause the current to_yield data to be lost.
2653          if not to_yield:
2654            yield ALLOW_CHECKPOINT
2655        else:
2656          # Without combiner we just accumulate values.
2657          self.current_values.extend(proto.value_list())
2658
2659        if to_yield:
2660          yield to_yield
2661          # Check-point after each key is yielded.
2662          yield ALLOW_CHECKPOINT
2663    except StopIteration:
2664      pass
2665
2666    # There may be some accumulated values left at the end of an input file
2667    # so be sure to yield those too.
2668    if self.current_key is not None:
2669      to_yield = (self.current_key, self.current_values)
2670      self.current_key = None
2671      self.current_values = None
2672      yield to_yield
2673
2674  @staticmethod
2675  def encode_data(data):
2676    """Encodes the given data, which may have include raw bytes.
2677
2678    Works around limitations in JSON encoding, which cannot handle raw bytes.
2679
2680    Args:
2681      data: the data to encode.
2682
2683    Returns:
2684      The data encoded.
2685    """
2686    return base64.b64encode(pickle.dumps(data))
2687
2688  @staticmethod
2689  def decode_data(data):
2690    """Decodes data encoded with the encode_data function."""
2691    return pickle.loads(base64.b64decode(data))
2692
2693  def to_json(self):
2694    """Returns an input shard state for the remaining inputs.
2695
2696    Returns:
2697      A json-izable version of the remaining InputReader.
2698    """
2699    result = super(_ReducerReader, self).to_json()
2700    result["current_key"] = self.encode_data(self.current_key)
2701    result["current_values"] = self.encode_data(self.current_values)
2702    return result
2703
2704  @classmethod
2705  def from_json(cls, json):
2706    """Creates an instance of the InputReader for the given input shard state.
2707
2708    Args:
2709      json: The InputReader state as a dict-like object.
2710
2711    Returns:
2712      An instance of the InputReader configured using the values of json.
2713    """
2714    result = super(_ReducerReader, cls).from_json(json)
2715    result.current_key = _ReducerReader.decode_data(json["current_key"])
2716    result.current_values = _ReducerReader.decode_data(json["current_values"])
2717    return result
2718
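# A minimal illustrative sketch (commented out): a combiner usable with
# _ReducerReader via the "combiner_spec" mapper parameter. The combiner must
# be a generator taking (key, new_values, previously_combined_values) and
# yielding the combined values (see __iter__ above). The function name and
# the integer-summing behavior are hypothetical.
#
#   def sum_combiner(key, new_values, previously_combined_values):
#     total = sum(int(v) for v in previously_combined_values)
#     total += sum(int(v) for v in new_values)
#     yield str(total)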