1#!/usr/bin/env python
2# Copyright 2010 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Utility functions for use with the mapreduce library."""
17
18# pylint: disable=g-bad-name
19
20
21
# Names exported via `from mapreduce.util import *`. Everything else in this
# module (the underscore-prefixed helpers) is internal to the mapreduce
# library and may change without notice.
__all__ = [
    "create_datastore_write_config",
    "for_name",
    "get_queue_name",
    "get_short_name",
    "handler_for_name",
    "is_generator",
    "parse_bool",
    "total_seconds",
    "try_serialize_handler",
    "try_deserialize_handler",
    "CALLBACK_MR_ID_TASK_HEADER",
    "strip_prefix_from_items"
    ]
36
37import inspect
38import os
39import pickle
40import random
41import sys
42import time
43import types
44
45from google.appengine.ext import ndb
46
47from google.appengine.datastore import datastore_rpc
48from mapreduce import parameters
49
# Taskqueue task headers used internally by MR to tag every task with the
# mapreduce id and shard id it belongs to.
_MR_ID_TASK_HEADER = "AE-MR-ID"
_MR_SHARD_ID_TASK_HEADER = "AE-MR-SHARD-ID"

# Header carrying the mapreduce id on user-facing callback tasks (exported
# through __all__, unlike the internal headers above).
CALLBACK_MR_ID_TASK_HEADER = "Mapreduce-Id"
56
57
# An absurdly distant UNIX timestamp (roughly 500 years out) used as the
# anchor for descending key generation below.
_FUTURE_TIME = 2**34


def _get_descending_key(gettime=time.time):
  """Returns a key name lexically ordered by time descending.

  This lets us have a key name for use with Datastore entities which returns
  rows in time descending order when it is scanned in lexically ascending
  order, allowing us to bypass index building for descending indexes.

  Args:
    gettime: time source returning seconds since the epoch; injectable for
      testing. Defaults to time.time.

  Returns:
    A string with a time descending key.
  """
  # Subtracting from a far-future anchor makes later times yield smaller
  # numbers, i.e. lexically earlier keys.
  reverse_stamp = int((_FUTURE_TIME - gettime()) * 100)
  # Append per-request entropy so two keys minted in the same tick differ.
  suffix = os.environ.get("REQUEST_ID_HASH") or str(random.getrandbits(32))
  return "%d%s" % (reverse_stamp, suffix)
80
81
def _get_task_host():
  """Get the Host header value for all mr tasks.

  The task's Host header determines which instance the task is routed to.

  Current version id format is: v7.368834058928280579
  Current module id is just the module's name. It could be "default".
  Default version hostname is app_id.appspot.com.

  Returns:
    A complete host name of the form version.module.app_id.appspot.com, or
  just version.app_id.appspot.com when the module is the default module.
  The reason is that if an app doesn't have modules enabled and the url is
  "version.default.app_id", "version" is ignored and "default" is used as
  version. If "default" version doesn't exist, the url is routed to the
  default version.
  """
  version = os.environ["CURRENT_VERSION_ID"].split(".")[0]
  module = os.environ["CURRENT_MODULE_ID"]
  hostname = os.environ["DEFAULT_VERSION_HOSTNAME"]
  if module == "default":
    # Omit the module component for the default module; see docstring.
    return "%s.%s" % (version, hostname)
  return "%s.%s.%s" % (version, module, hostname)
105
106
def _get_task_headers(map_job_id,
                      mr_id_header_key=_MR_ID_TASK_HEADER):
  """Get headers for all mr tasks.

  Args:
    map_job_id: map job id.
    mr_id_header_key: the header key under which to record the mr id.

  Returns:
    A dictionary of all headers.
  """
  headers = {"Host": _get_task_host()}
  headers[mr_id_header_key] = map_job_id
  return headers
120
121
def _enum(**enums):
  """Build a simple enum-like class whose attributes are the given values."""
  return type("Enum", (), dict(enums))
125
126
def get_queue_name(queue_name):
  """Determine which queue MR should run on.

  How to choose the queue:
  1. If user provided one, use that.
  2. If we are starting a mr from taskqueue, inherit that queue.
     If it's a special queue, fall back to the default queue.
  3. Default queue.

  If user is using any MR pipeline interface, pipeline.start takes a
  "queue_name" argument. The pipeline will run on that queue and MR will
  simply inherit the queue_name.

  Args:
    queue_name: queue_name from user. Maybe None.

  Returns:
    The queue name to run on.
  """
  if queue_name:
    return queue_name
  inherited = os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
                             parameters.config.QUEUE_NAME)
  if inherited.startswith("__"):
    # We are currently in some special queue, e.g. __cron; those cannot be
    # reused, so fall back to the configured default.
    return parameters.config.QUEUE_NAME
  return inherited
155
156
def total_seconds(td):
  """convert a timedelta to seconds.

  This is patterned after timedelta.total_seconds, which is only
  available in python 27.

  Args:
    td: a timedelta object.

  Returns:
    total seconds within a timedelta. Rounded up to seconds.
  """
  whole_seconds = td.days * 86400 + td.seconds
  # Any fractional second counts as a full one (round up).
  return whole_seconds + 1 if td.microseconds else whole_seconds
173
174
def for_name(fq_name, recursive=False):
  """Find class/function/method specified by its fully qualified name.

  Fully qualified can be specified as:
    * <module_name>.<class_name>
    * <module_name>.<function_name>
    * <module_name>.<class_name>.<method_name> (an unbound method will be
      returned in this case).

  for_name works by doing __import__ for <module_name>, and looks for
  <class_name>/<function_name> in module's __dict__/attrs. If fully qualified
  name doesn't contain '.', the current module will be used.

  Args:
    fq_name: fully qualified name of something to find.
    recursive: True only when for_name calls itself to resolve the dotted
      module path one level at a time; it changes which exception type is
      raised on a missing attribute (KeyError instead of ImportError) so the
      outermost call can report the real failure point.

  Returns:
    class object or None if fq_name is None.

  Raises:
    ImportError: when specified module could not be loaded or the class
    was not found in the module.
  """
#  if "." not in fq_name:
#    raise ImportError("'%s' is not a full-qualified name" % fq_name)

  if fq_name is None:
    return

  fq_name = str(fq_name)
  module_name = __name__
  short_name = fq_name

  # Split "a.b.c" into module path "a.b" and trailing name "c". Without a
  # dot, the name is looked up in this module itself.
  if fq_name.rfind(".") >= 0:
    (module_name, short_name) = (fq_name[:fq_name.rfind(".")],
                                 fq_name[fq_name.rfind(".") + 1:])

  try:
    # The non-empty fromlist makes __import__ return the leaf module
    # ("a.b") rather than the top-level package ("a").
    result = __import__(module_name, None, None, [short_name])
    return result.__dict__[short_name]
  except KeyError:
    # If we're recursively inside a for_name() chain, then we want to raise
    # this error as a key error so we can report the actual source of the
    # problem. If we're *not* recursively being called, that means the
    # module was found and the specific item could not be loaded, and thus
    # we want to raise an ImportError directly.
    if recursive:
      raise
    else:
      raise ImportError("Could not find '%s' on path '%s'" % (
          short_name, module_name))
  except ImportError:
    # module_name is not actually a module. Try for_name for it to figure
    # out what's this. This handles the <module>.<class>.<method> case,
    # where module_name is really <module>.<class>.
    try:
      module = for_name(module_name, recursive=True)
      if hasattr(module, short_name):
        return getattr(module, short_name)
      else:
        # The module was found, but the function component is missing.
        raise KeyError()
    except KeyError:
      raise ImportError("Could not find '%s' on path '%s'" % (
          short_name, module_name))
    except ImportError:
      # This means recursive import attempts failed, thus we will raise the
      # first ImportError we encountered, since it's likely the most accurate.
      pass
    # Raise the original import error that caused all of this, since it is
    # likely the real cause of the overall problem.
    raise
247
248
def handler_for_name(fq_name):
  """Resolves and instantiates handler by fully qualified name.

  First resolves the name using for_name call. Then if it resolves to a class,
  instantiates the class; if it resolves to a method, instantiates the class
  and binds the method to a fresh instance.

  Args:
    fq_name: fully qualified name of something to find.

  Returns:
    handler instance which is ready to be called.
  """
  resolved = for_name(fq_name)
  if isinstance(resolved, (type, types.ClassType)):
    # A class (new- or old-style): instantiate it.
    return resolved()
  if isinstance(resolved, types.MethodType):
    # An unbound method: instantiate its class and bind the method to it.
    return getattr(resolved.im_class(), resolved.__name__)
  # Plain function or already-built callable: use as-is.
  return resolved
271
272
def try_serialize_handler(handler):
  """Try to serialize map/reduce handler.

  Args:
    handler: handler function/instance. Handler can be a function or an
      instance of a callable class. In the latter case, the handler will
      be serialized across slices to allow users to save states.

  Returns:
    serialized handler string or None.
  """
  # NOTE(review): by Python operator precedence this condition parses as
  # `old_style_instance or (new_style_non_function and has __call__)`, so an
  # old-style instance is pickled without the __call__ check. Presumably the
  # intent was to require __call__ in both cases — confirm before changing.
  if (isinstance(handler, types.InstanceType) or  # old style class
      (isinstance(handler, object) and  # new style class
       not inspect.isfunction(handler) and
       not inspect.ismethod(handler)) and
      hasattr(handler, "__call__")):
    return pickle.dumps(handler)
  # Plain functions/methods are not pickled; they are re-resolved by name.
  return None
291
292
def try_deserialize_handler(serialized_handler):
  """Reverse function of try_serialize_handler.

  Args:
    serialized_handler: serialized handler str or None.

  Returns:
    handler instance or None.
  """
  if not serialized_handler:
    return None
  return pickle.loads(serialized_handler)
304
305
def is_generator(obj):
  """Return true if the object is generator or generator function.

  Generator function objects provides same attributes as functions.
  See isfunction.__doc__ for attributes listing.

  Adapted from Python 2.6.

  Args:
    obj: an object to test.

  Returns:
    true if the object is generator function.
  """
  if isinstance(obj, types.GeneratorType):
    return True

  if not (inspect.isfunction(obj) or inspect.ismethod(obj)):
    return False
  # The CO_GENERATOR flag on the code object marks a generator function.
  CO_GENERATOR = 0x20
  return bool(obj.func_code.co_flags & CO_GENERATOR)
326
327
def get_short_name(fq_name):
  """Returns the last dot-separated component of the name."""
  return fq_name.rsplit(".", 1)[-1]
331
332
def parse_bool(obj):
  """Return true if the object represents a truth value, false otherwise.

  For bool and numeric objects, uses Python's built-in bool function.  For
  str objects, checks string against a list of possible truth values.

  Args:
    obj: object to determine boolean value of; expected

  Returns:
    Boolean value according to 5.1 of Python docs if object is not a str
      object.  For str objects, return True if str is in TRUTH_VALUE_SET
      and False otherwise.
    http://docs.python.org/library/stdtypes.html
  """
  if type(obj) is not str:
    return bool(obj)
  # Case-insensitive membership test against the accepted truthy spellings.
  return obj.lower() in ("true", "1", "yes", "t", "on")
353
354
def create_datastore_write_config(mapreduce_spec):
  """Creates datastore config to use in write operations.

  Args:
    mapreduce_spec: current mapreduce specification as MapreduceSpec.

  Returns:
    an instance of datastore_rpc.Configuration to use for all write
    operations in the mapreduce.
  """
  if parse_bool(mapreduce_spec.params.get("force_writes", "false")):
    return datastore_rpc.Configuration(force_writes=True)
  # The dev server doesn't support force_writes, so only pass it when asked.
  return datastore_rpc.Configuration()
371
372
def _set_ndb_cache_policy():
  """Tell NDB to never cache anything in memcache or in-process.

  This ensures that entities fetched from Datastore input_readers via NDB
  will not bloat up the request memory size and Datastore Puts will avoid
  doing calls to memcache. Without this you get soft memory limit exits,
  which hurts overall throughput.
  """
  never_cache = lambda key: False
  context = ndb.get_context()
  context.set_cache_policy(never_cache)
  context.set_memcache_policy(never_cache)
384
385
def _obj_to_path(obj):
  """Returns the fully qualified path to the object.

  Args:
    obj: obj must be a new style top level class, or a top level function.
      No inner function or static method.

  Returns:
    Fully qualified path to the object, or None if obj is None.

  Raises:
    TypeError: when argument obj has unsupported type.
    ValueError: when obj can't be discovered on the top level.
  """
  if obj is None:
    return None

  if not (inspect.isclass(obj) or inspect.isfunction(obj)):
    raise TypeError("Unexpected type %s." % type(obj))
  # Verify the object is reachable by name at the top level of its module;
  # otherwise the returned path could not be resolved back to the object.
  if getattr(sys.modules[obj.__module__], obj.__name__, None) is None:
    raise ValueError(
        "Object %r must be defined on the top level of a module." % obj)
  return "%s.%s" % (obj.__module__, obj.__name__)
410
411
def strip_prefix_from_items(prefix, items):
  """Strips out the prefix from each of the items if it is present.

  Args:
    prefix: the string that you wish to strip from the beginning of each
      of the items.
    items: a list of strings that may or may not contain the prefix you want
      to strip out.

  Returns:
    A new list, in the same order, where each item has had the leading
    prefix removed if it was present.
  """
  offset = len(prefix)
  return [item[offset:] if item.startswith(prefix) else item
          for item in items]
432