1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Facilities for creating multiple test combinations.
16
17Here is an example of testing various optimizers in Eager and Graph mode:
18
19class AdditionExample(test.TestCase, parameterized.TestCase):
20  @combinations.generate(
21     combinations.combine(mode=["graph", "eager"],
22                          optimizer=[AdamOptimizer(),
23                                     GradientDescentOptimizer()]))
24  def testOptimizer(self, optimizer):
25    ... f(optimizer)...
26
27This will run `testOptimizer` 4 times with the specified optimizers: 2 in
28Eager and 2 in Graph mode.
29The test will be provided with arguments that match the arguments of combine
30by name.  It is necessary to request all arguments, except for `mode`, which is
31optional.
32
33`combine()` function is available for creating a cross product of various
34options.  `times()` function exists for creating a product of N `combine()`-ed
35results.  See below.
36"""
37
38from __future__ import absolute_import
39from __future__ import division
40from __future__ import print_function
41
42from collections import OrderedDict
43import sys
44import types
45import unittest
46from absl.testing import parameterized
47import six
48from tensorflow.contrib.distribute.python import mirrored_strategy as mirrored_lib
49from tensorflow.contrib.distribute.python import parameter_server_strategy
50from tensorflow.contrib.optimizer_v2 import adagrad as adagrad_v2
51from tensorflow.contrib.optimizer_v2 import adam as adam_v2
52from tensorflow.contrib.optimizer_v2 import gradient_descent as gradient_descent_v2
53from tensorflow.python.distribute import distribution_strategy_context
54from tensorflow.python.distribute import one_device_strategy as one_device_lib
55from tensorflow.python.distribute import tpu_strategy as tpu_lib
56from tensorflow.python.distribute.cluster_resolver import tpu_cluster_resolver
57from tensorflow.python.eager import context
58from tensorflow.python.framework import ops
59from tensorflow.python.keras.optimizer_v2 import adagrad as adagrad_keras_v2
60from tensorflow.python.keras.optimizer_v2 import adam as adam_keras_v2
61from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_keras_v2
62from tensorflow.python.keras.optimizer_v2 import rmsprop as rmsprop_keras_v2
63from tensorflow.python.tpu import device_assignment as device_assignment_lib
64from tensorflow.python.tpu import tpu_strategy_util
65from tensorflow.python.training import adagrad
66from tensorflow.python.training import adam
67from tensorflow.python.training import gradient_descent
68from tensorflow.python.training import rmsprop
69from tensorflow.python.util import tf_inspect
70
71
# Flags derived from the name of the running test binary (`sys.argv[0]`):
# each is True when that name contains the given marker substring.  The test
# wrapper in this module reads them to decide which tests to skip.
GPU_TEST = "test_gpu" in sys.argv[0]
TPU_TEST = "test_tpu" in sys.argv[0]
74
75
def generate(combinations):
  """Build a decorator that expands a test method or class over combinations.

  Args:
    combinations: a list of `OrderedDict`s, as created by `combine()` and
      `times()`.  Each dictionary describes the arguments of one test case.

  Restrictions:
   -- "mode" may be either "eager" or "graph"; "graph" is used when absent.
   -- every key of a combination must be a named argument of the test, except
      for "mode", "required_tpu" and "required_gpus", which the test may
      leave out.
   -- "distribution" is special and optional.  Its values are meant to be
      `NamedDistribution` instances; GPU/TPU requirements are then taken from
      that instance and must not also be given as "required_gpus" or
      "required_tpu".
   -- "required_tpu" is special and optional.  When set, the test is skipped
      if TPUs aren't available.
   -- "required_gpus" is special and optional.  When set, the test is skipped
      if the requested number of GPUs isn't available.

  Returns:
    a decorator that makes the test method, or every test method of the test
    class, run once per combination.

  Raises:
    AssertionError: (when the decorator is applied) if a combination is not
      an `OrderedDict`.
    ValueError: (when a generated test runs) if "mode" is neither "eager" nor
      "graph", or if the test method doesn't accept all of the arguments.
  """

  def _alnum_only(text):
    # Drop everything but letters and digits so the result can be embedded in
    # a test name.
    return "".join(filter(str.isalnum, text))

  def decorator(test_method_or_class):
    """The decorator to be returned."""

    # Attach a readable `testcase_name` to every combination so individual
    # cases can be selected with --test_filter.
    named_combinations = []
    for combination in combinations:
      # `combine()` and `times()` hand out OrderedDicts, which keeps the key
      # order, and therefore the generated names, stable.
      assert isinstance(combination, OrderedDict)
      suffix = ""
      for key, value in combination.items():
        suffix += "_{}_{}".format(_alnum_only(key), _alnum_only(str(value)))
      named = OrderedDict(combination)
      named["testcase_name"] = "_test{}".format(suffix)
      named_combinations.append(named)

    # Plain test method: wrap it and let absl expand it.
    if not isinstance(test_method_or_class, type):
      wrapped = _augment_with_special_arguments(test_method_or_class)
      return parameterized.named_parameters(*named_combinations)(wrapped)

    # Test class: replace each test method by its generated variants.
    test_class = test_method_or_class
    test_method_ids = {}
    test_class._test_method_ids = test_method_ids
    prefix = unittest.TestLoader.testMethodPrefix
    # Iterate over a snapshot, because the class is modified inside the loop.
    for attr_name, attr in six.iteritems(test_class.__dict__.copy()):
      is_test_method = (attr_name.startswith(prefix) and
                        isinstance(attr, types.FunctionType))
      if not is_test_method:
        continue
      delattr(test_class, attr_name)
      generated = {}
      parameterized._update_class_dict_for_param_test_case(
          test_class.__name__, generated, test_method_ids, attr_name,
          parameterized._ParameterizedTestIter(
              _augment_with_special_arguments(attr),
              named_combinations, parameterized._NAMED, attr_name))
      for generated_name, generated_method in six.iteritems(generated):
        setattr(test_class, generated_name, generated_method)

    return test_class

  return decorator
149
150
def _augment_with_special_arguments(test_method):
  """Wraps `test_method` so that the special combination arguments are handled.

  The wrapper consumes "mode", "required_tpu" and "required_gpus", reads the
  hardware requirements from "distribution" when one is given, skips the test
  when those requirements don't match the running test binary, and finally
  calls `test_method` inside the requested eager or graph context.

  Args:
    test_method: the test method to wrap.  Apart from `self` it must accept,
      by name, every remaining combination argument; accepting "mode" is
      optional.

  Returns:
    a function `decorated(self, **kwargs)` to be used in place of
    `test_method`.
  """
  def decorated(self, **kwargs):
    """A wrapped test method that treats some arguments in a special way.

    Raises:
      ValueError: if the combination has arguments the test doesn't accept, or
        if "mode" is neither "eager" nor "graph".
    """
    mode = kwargs.pop("mode", "graph")

    # "distribution" stays in `kwargs`: the test receives it (as a strategy).
    distribution = kwargs.get("distribution", None)
    required_tpu = kwargs.pop("required_tpu", False)
    required_gpus = kwargs.pop("required_gpus", None)

    if distribution:
      assert required_gpus is None, (
          "Do not use `required_gpus` and `distribution` together.")
      assert required_tpu is False, (
          "Do not use `required_tpu` and `distribution` together.")
      required_gpus = distribution.required_gpus
      required_tpu = distribution.required_tpu

    # TPU tests only run in TPU test binaries, and vice versa.
    if required_tpu and not TPU_TEST:
      self.skipTest("Test requires a TPU, but it's not available.")
    if not required_tpu and TPU_TEST:
      self.skipTest("Test that doesn't require a TPU.")

    if not required_gpus:
      if GPU_TEST:
        self.skipTest("Test that doesn't require GPUs.")
    elif context.num_gpus() < required_gpus:
      # TODO(priyag): Consider allowing tests in graph mode using soft
      # placement.
      self.skipTest(
          "{} GPUs are not available for this test. {} GPUs are available".
          format(required_gpus, context.num_gpus()))

    # At this point, `kwargs` no longer has `mode`, `required_gpus` or
    # `required_tpu`: they were popped above.  Every remaining key must be
    # accepted by the test; `mode` is the one argument the test is allowed to
    # accept or ignore.
    requested_arguments = tf_inspect.getfullargspec(test_method).args
    missing_arguments = set(list(kwargs.keys()) + ["self"]).difference(
        set(requested_arguments + ["mode"]))
    if missing_arguments:
      raise ValueError("The test is missing arguments {} .".format(
          missing_arguments))

    kwargs_to_pass = {}
    for arg in requested_arguments:
      if arg == "self":
        kwargs_to_pass[arg] = self
      elif arg == "mode":
        # `mode` was popped from `kwargs`, so looking it up there would raise
        # KeyError; hand the resolved value to tests that ask for it.
        kwargs_to_pass[arg] = mode
      else:
        kwargs_to_pass[arg] = kwargs[arg]

    # The strategy is created inside the mode context, right before the test
    # runs, so that it is built under the same eager/graph setting.
    if mode == "eager":
      with context.eager_mode():
        if distribution:
          kwargs_to_pass["distribution"] = distribution.strategy
        test_method(**kwargs_to_pass)
    elif mode == "graph":
      with ops.Graph().as_default(), context.graph_mode():
        if distribution:
          kwargs_to_pass["distribution"] = distribution.strategy
        test_method(**kwargs_to_pass)
    else:
      raise ValueError(
          "'mode' has to be either 'eager' or 'graph' and not {}".format(
              mode))
  return decorated
215
216
def combine(**kwargs):
  """Generate the cross product of the given keyword options.

  Two sets of returned combinations can be concatenated using +.  Their product
  can be computed using `times()`.

  Args:
    **kwargs: keyword arguments of form `option=[possibilities, ...]`
         or `option=the_only_possibility`.  Only `list` values are expanded;
         any other value (including a tuple) counts as a single possibility.

  Returns:
    a list of `OrderedDict`s, one per combination.  Keys are the keyword
    argument names in sorted order, and each key maps to one of the
    possibilities given for it.  The values of the first key (in sorted
    order) vary slowest.  Without any arguments, a list holding one empty
    `OrderedDict` is returned.
  """
  # Start from the single empty combination and extend it one option at a
  # time.  Visiting the options in sorted order keeps every dictionary sorted
  # by key and makes earlier keys vary slower than later ones.
  partial_results = [OrderedDict()]
  for option in sorted(kwargs):
    possibilities = kwargs[option]
    if not isinstance(possibilities, list):
      possibilities = [possibilities]

    extended = []
    for partial in partial_results:
      for possibility in possibilities:
        entry = OrderedDict(partial)
        entry[option] = possibility
        extended.append(entry)
    partial_results = extended

  return partial_results
252
253
def times(*combined):
  """Generate a product of N sets of combinations.

  times(combine(a=[1,2]), combine(b=[3,4])) == combine(a=[1,2], b=[3,4])

  Args:
    *combined: N lists of dictionaries that specify combinations.  At least
      one list is required.

  Returns:
    a list of dictionaries for each combination.  Entries of earlier inputs
    vary slowest.  A single input is returned as is.

  Raises:
    ValueError: if some of the inputs have overlapping keys.
  """
  assert combined

  # Fold from the right: the last input is the starting product, and every
  # earlier input is multiplied onto it.
  product = combined[-1]
  for left in reversed(combined[:-1]):
    merged = []
    for a in left:
      for b in product:
        if set(a) & set(b):
          raise ValueError("Keys need to not overlap: {} vs {}".format(
              a.keys(), b.keys()))
        entry = OrderedDict(a.items())
        entry.update(b.items())
        merged.append(entry)
    product = merged
  return product
285
286
class NamedObject(object):
  """A proxy that gives a wrapped object a good name for test names.

  `repr()` of the proxy is the given name.  Calls and attribute lookups that
  the proxy doesn't define itself are forwarded to the wrapped object.
  """

  def __init__(self, name, obj):
    """Creates the proxy.

    Args:
      name: the string returned by `repr()`.
      obj: the object to forward calls and attribute lookups to.
    """
    self._name = name
    self._obj = obj

  def __getattr__(self, name):
    # `__getattr__` only runs when normal lookup fails.  If the proxy's own
    # attributes are what's missing (e.g. on an instance that `copy` or
    # `pickle` created without running `__init__`), forwarding would read
    # `self._obj`, re-enter this method and recurse without bound.  Report
    # the attribute as missing instead.
    if name in ("_name", "_obj"):
      raise AttributeError(name)
    return getattr(self._obj, name)

  def __call__(self, *args, **kwargs):
    return self._obj(*args, **kwargs)

  def __repr__(self):
    return self._name
302
303
class NamedDistribution(object):
  """Pairs a DistributionStrategy factory with a good test name.

  Besides the name and the factory, an instance carries the hardware
  requirements (GPU count, TPU) of the strategy.
  """

  def __init__(self, name, distribution_fn, required_gpus=None,
               required_tpu=False):
    """Stores the name, the factory and the hardware requirements.

    Args:
      name: the string returned by `repr()`.
      distribution_fn: a callable without arguments; it is called on every
        access of `strategy`.
      required_gpus: the number of GPUs needed, or `None`.
      required_tpu: whether a TPU is needed.
    """
    self._name = name
    self._distribution_fn = distribution_fn
    self._required_tpu = required_tpu
    self._required_gpus = required_gpus

  @property
  def required_tpu(self):
    """The `required_tpu` value given at construction."""
    return self._required_tpu

  @property
  def required_gpus(self):
    """The `required_gpus` value given at construction."""
    return self._required_gpus

  @property
  def strategy(self):
    """The result of a fresh call of the factory."""
    create_strategy = self._distribution_fn
    return create_strategy()

  def __repr__(self):
    return self._name
328
329
def _get_tpu_strategy_creator(steps_per_run, use_single_core=False, **kwargs):
  """Returns a function without arguments that creates a `TPUStrategy`.

  Nothing TPU-related happens until the returned function is called.

  Args:
    steps_per_run: forwarded to `TPUStrategy` as `steps_per_run`.
    use_single_core: if True, the strategy gets a `DeviceAssignment` built
      with `SINGLE_CORE_ASSIGNMENT`; otherwise no device assignment is passed.
    **kwargs: further keyword arguments forwarded to `TPUStrategy`.

  Returns:
    a callable that, on each call, initializes the TPU system and returns a
    new `TPUStrategy`.
  """
  def _create_tpu_strategy():
    resolver = tpu_cluster_resolver.TPUClusterResolver("")
    topology = tpu_strategy_util.initialize_tpu_system(resolver)

    if use_single_core:
      single_core = device_assignment_lib.SINGLE_CORE_ASSIGNMENT
      device_assignment = device_assignment_lib.DeviceAssignment(
          topology, core_assignment=single_core)
    else:
      device_assignment = None

    return tpu_lib.TPUStrategy(
        resolver,
        steps_per_run=steps_per_run,
        device_assignment=device_assignment,
        **kwargs)

  return _create_tpu_strategy
345
346
# Predefined `NamedDistribution`s for use as values of the "distribution"
# combination argument.  Each one pairs a test-name fragment with a factory
# that is only called when the `strategy` property is read, plus the GPU/TPU
# requirements of the strategy.
# pylint: disable=g-long-lambda

# Strategies without GPU or TPU requirements.
default_strategy = NamedDistribution(
    "Default",
    distribution_strategy_context._get_default_strategy,  # pylint: disable=protected-access
    required_gpus=None)
one_device_strategy = NamedDistribution(
    "OneDeviceCPU", lambda: one_device_lib.OneDeviceStrategy("/cpu:0"),
    required_gpus=None)
# Single-device strategy placed on the first GPU.
one_device_strategy_gpu = NamedDistribution(
    "OneDeviceGPU", lambda: one_device_lib.OneDeviceStrategy("/gpu:0"),
    required_gpus=1)
# TPU strategies; they differ in `steps_per_run` (2 or 1) and in whether a
# single-core device assignment is used.
tpu_strategy = NamedDistribution(
    "TPU", _get_tpu_strategy_creator(steps_per_run=2),
    required_tpu=True)
tpu_strategy_one_step = NamedDistribution(
    "TPUOneStep", _get_tpu_strategy_creator(steps_per_run=1),
    required_tpu=True)
tpu_strategy_one_core = NamedDistribution(
    "TPUOneCore", _get_tpu_strategy_creator(
        steps_per_run=2, use_single_core=True),
    required_tpu=True)
tpu_strategy_one_step_one_core = NamedDistribution(
    "TPUOneStepOneCore", _get_tpu_strategy_creator(
        steps_per_run=1, use_single_core=True),
    required_tpu=True)

# `MirroredStrategy` over various device lists.
mirrored_strategy_with_one_cpu = NamedDistribution(
    "Mirrored1CPU",
    lambda: mirrored_lib.MirroredStrategy(["/cpu:0"]))
mirrored_strategy_with_one_gpu = NamedDistribution(
    "Mirrored1GPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0"]),
    required_gpus=1)
mirrored_strategy_with_gpu_and_cpu = NamedDistribution(
    "MirroredCPUAndGPU",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/cpu:0"]),
    required_gpus=1)
mirrored_strategy_with_two_gpus = NamedDistribution(
    "Mirrored2GPUs",
    lambda: mirrored_lib.MirroredStrategy(["/gpu:0", "/gpu:1"]),
    required_gpus=2)
# `CoreMirroredStrategy` over the same device lists as above.
core_mirrored_strategy_with_one_cpu = NamedDistribution(
    "CoreMirrored1CPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/cpu:0"]))
core_mirrored_strategy_with_one_gpu = NamedDistribution(
    "CoreMirrored1GPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0"]),
    required_gpus=1)
core_mirrored_strategy_with_gpu_and_cpu = NamedDistribution(
    "CoreMirroredCPUAndGPU",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0", "/cpu:0"]),
    required_gpus=1)
core_mirrored_strategy_with_two_gpus = NamedDistribution(
    "CoreMirrored2GPUs",
    lambda: mirrored_lib.CoreMirroredStrategy(["/gpu:0", "/gpu:1"]),
    required_gpus=2)
# `ParameterServerStrategy` with two GPUs per worker.
parameter_server_strategy_with_two_gpus = NamedDistribution(
    "ParameterServer2GPUs",
    lambda: parameter_server_strategy.ParameterServerStrategy(
        num_gpus_per_worker=2),
    required_gpus=2)
408
409
# Named optimizer factories.  Each `NamedObject` wraps a lambda, so calling it
# (e.g. `adam_optimizer_v1_fn()`) creates a new optimizer, while its `repr()`
# is the short name given here.

# Optimizers from `tensorflow.python.training`.
gradient_descent_optimizer_v1_fn = NamedObject(
    "GradientDescentV1", lambda: gradient_descent.GradientDescentOptimizer(0.2))
adagrad_optimizer_v1_fn = NamedObject(
    "AdagradV1", lambda: adagrad.AdagradOptimizer(0.001))
adam_optimizer_v1_fn = NamedObject("AdamV1",
                                   lambda: adam.AdamOptimizer(0.001, epsilon=1))
rmsprop_optimizer_v1_fn = NamedObject(
    "RmsPropV1", lambda: rmsprop.RMSPropOptimizer(0.001))

# Only gradient descent and Adagrad are part of the default V1 list.
optimizers_v1 = [gradient_descent_optimizer_v1_fn, adagrad_optimizer_v1_fn]

# Optimizers from `tensorflow.contrib.optimizer_v2`.
gradient_descent_optimizer_v2_fn = NamedObject(
    "GradientDescentV2",
    lambda: gradient_descent_v2.GradientDescentOptimizer(0.2))
adagrad_optimizer_v2_fn = NamedObject(
    "AdagradV2", lambda: adagrad_v2.AdagradOptimizer(0.001))
adam_optimizer_v2_fn = NamedObject(
    "AdamV2", lambda: adam_v2.AdamOptimizer(0.001, epsilon=1.0))

# Only gradient descent and Adagrad are part of the default V2 list.
optimizers_v2 = [gradient_descent_optimizer_v2_fn, adagrad_optimizer_v2_fn]

# Optimizers from `tensorflow.python.keras.optimizer_v2`.
gradient_descent_optimizer_keras_v2_fn = NamedObject(
    "GradientDescentKerasV2",
    lambda: gradient_descent_keras_v2.SGD(0.2))
adagrad_optimizer_keras_v2_fn = NamedObject(
    "AdagradKerasV2", lambda: adagrad_keras_v2.Adagrad(0.001))
adam_optimizer_keras_v2_fn = NamedObject(
    "AdamKerasV2", lambda: adam_keras_v2.Adam(0.001, epsilon=1.0))
rmsprop_optimizer_keras_v2_fn = NamedObject(
    "RmsPropKerasV2", lambda: rmsprop_keras_v2.RMSprop(0.001))

# Both values accepted for the "mode" combination argument.
graph_and_eager_modes = ["graph", "eager"]
442
443
def distributions_and_v1_optimizers():
  """A common set of combination with DistributionStrategies and Optimizers.

  Returns:
    the result of `combine()` over five predefined strategies (as
    `distribution`) and the optimizers of `optimizers_v1` (as `optimizer_fn`).
  """
  strategies = [
      one_device_strategy,
      mirrored_strategy_with_gpu_and_cpu,
      mirrored_strategy_with_two_gpus,
      core_mirrored_strategy_with_gpu_and_cpu,
      core_mirrored_strategy_with_two_gpus,
  ]
  return combine(distribution=strategies, optimizer_fn=optimizers_v1)
455
456
def distributions_and_v2_optimizers():
  """DistributionStrategies and V2 Optimizers.

  Returns:
    the result of `combine()` over five predefined strategies (as
    `distribution`) and the optimizers of `optimizers_v2` (as `optimizer_fn`).
  """
  strategies = [
      one_device_strategy,
      mirrored_strategy_with_gpu_and_cpu,
      mirrored_strategy_with_two_gpus,
      core_mirrored_strategy_with_gpu_and_cpu,
      core_mirrored_strategy_with_two_gpus,
  ]
  return combine(distribution=strategies, optimizer_fn=optimizers_v2)
468