1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Utilities for unit-testing Keras."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import threading
22
23import numpy as np
24
25from tensorflow.python import keras
26from tensorflow.python.eager import context
27from tensorflow.python.framework import tensor_shape
28from tensorflow.python.framework import test_util
29from tensorflow.python.keras.optimizer_v2 import adadelta as adadelta_v2
30from tensorflow.python.keras.optimizer_v2 import adagrad as adagrad_v2
31from tensorflow.python.keras.optimizer_v2 import adam as adam_v2
32from tensorflow.python.keras.optimizer_v2 import adamax as adamax_v2
33from tensorflow.python.keras.optimizer_v2 import gradient_descent as gradient_descent_v2
34from tensorflow.python.keras.optimizer_v2 import nadam as nadam_v2
35from tensorflow.python.keras.optimizer_v2 import rmsprop as rmsprop_v2
36from tensorflow.python.util import tf_contextlib
37from tensorflow.python.util import tf_inspect
38
39
40def get_test_data(train_samples,
41                  test_samples,
42                  input_shape,
43                  num_classes,
44                  random_seed=None):
45  """Generates test data to train a model on.
46
47  Arguments:
48    train_samples: Integer, how many training samples to generate.
49    test_samples: Integer, how many test samples to generate.
50    input_shape: Tuple of integers, shape of the inputs.
51    num_classes: Integer, number of classes for the data and targets.
52    random_seed: Integer, random seed used by numpy to generate data.
53
54  Returns:
55    A tuple of Numpy arrays: `(x_train, y_train), (x_test, y_test)`.
56  """
57  if random_seed is not None:
58    np.random.seed(random_seed)
59  num_sample = train_samples + test_samples
60  templates = 2 * num_classes * np.random.random((num_classes,) + input_shape)
61  y = np.random.randint(0, num_classes, size=(num_sample,))
62  x = np.zeros((num_sample,) + input_shape, dtype=np.float32)
63  for i in range(num_sample):
64    x[i] = templates[y[i]] + np.random.normal(loc=0, scale=1., size=input_shape)
65  return ((x[:train_samples], y[:train_samples]),
66          (x[train_samples:], y[train_samples:]))
67
68
69@test_util.use_deterministic_cudnn
70def layer_test(layer_cls, kwargs=None, input_shape=None, input_dtype=None,
71               input_data=None, expected_output=None,
72               expected_output_dtype=None):
73  """Test routine for a layer with a single input and single output.
74
75  Arguments:
76    layer_cls: Layer class object.
77    kwargs: Optional dictionary of keyword arguments for instantiating the
78      layer.
79    input_shape: Input shape tuple.
80    input_dtype: Data type of the input data.
81    input_data: Numpy array of input data.
82    expected_output: Shape tuple for the expected shape of the output.
83    expected_output_dtype: Data type expected for the output.
84
85  Returns:
86    The output data (Numpy array) returned by the layer, for additional
87    checks to be done by the calling code.
88
89  Raises:
90    ValueError: if `input_shape is None`.
91  """
92  if input_data is None:
93    if input_shape is None:
94      raise ValueError('input_shape is None')
95    if not input_dtype:
96      input_dtype = 'float32'
97    input_data_shape = list(input_shape)
98    for i, e in enumerate(input_data_shape):
99      if e is None:
100        input_data_shape[i] = np.random.randint(1, 4)
101    input_data = 10 * np.random.random(input_data_shape)
102    if input_dtype[:5] == 'float':
103      input_data -= 0.5
104    input_data = input_data.astype(input_dtype)
105  elif input_shape is None:
106    input_shape = input_data.shape
107  if input_dtype is None:
108    input_dtype = input_data.dtype
109  if expected_output_dtype is None:
110    expected_output_dtype = input_dtype
111
112  # instantiation
113  kwargs = kwargs or {}
114  layer = layer_cls(**kwargs)
115
116  # test get_weights , set_weights at layer level
117  weights = layer.get_weights()
118  layer.set_weights(weights)
119
120  # test and instantiation from weights
121  if 'weights' in tf_inspect.getargspec(layer_cls.__init__):
122    kwargs['weights'] = weights
123    layer = layer_cls(**kwargs)
124
125  # test in functional API
126  x = keras.layers.Input(shape=input_shape[1:], dtype=input_dtype)
127  y = layer(x)
128  if keras.backend.dtype(y) != expected_output_dtype:
129    raise AssertionError('When testing layer %s, for input %s, found output '
130                         'dtype=%s but expected to find %s.\nFull kwargs: %s' %
131                         (layer_cls.__name__,
132                          x,
133                          keras.backend.dtype(y),
134                          expected_output_dtype,
135                          kwargs))
136  # check shape inference
137  model = keras.models.Model(x, y)
138  expected_output_shape = tuple(
139      layer.compute_output_shape(
140          tensor_shape.TensorShape(input_shape)).as_list())
141  actual_output = model.predict(input_data)
142  actual_output_shape = actual_output.shape
143  for expected_dim, actual_dim in zip(expected_output_shape,
144                                      actual_output_shape):
145    if expected_dim is not None:
146      if expected_dim != actual_dim:
147        raise AssertionError(
148            'When testing layer %s, for input %s, found output_shape='
149            '%s but expected to find %s.\nFull kwargs: %s' %
150            (layer_cls.__name__,
151             x,
152             actual_output_shape,
153             expected_output_shape,
154             kwargs))
155  if expected_output is not None:
156    np.testing.assert_allclose(actual_output, expected_output, rtol=1e-3)
157
158  # test serialization, weight setting at model level
159  model_config = model.get_config()
160  recovered_model = keras.models.Model.from_config(model_config)
161  if model.weights:
162    weights = model.get_weights()
163    recovered_model.set_weights(weights)
164    output = recovered_model.predict(input_data)
165    np.testing.assert_allclose(output, actual_output, rtol=2e-3)
166
167  # test training mode (e.g. useful for dropout tests)
168  # Rebuild the model to avoid the graph being reused between predict() and
169  # train(). This was causing some error for layer with Defun as it body.
170  # See b/120160788 for more details. This should be mitigated after 2.0.
171  model = keras.models.Model(x, layer(x))
172  if _thread_local_data.run_eagerly is not None:
173    model.compile(
174        'rmsprop',
175        'mse',
176        weighted_metrics=['acc'],
177        run_eagerly=should_run_eagerly())
178  else:
179    model.compile('rmsprop', 'mse', weighted_metrics=['acc'])
180  model.train_on_batch(input_data, actual_output)
181
182  # test as first layer in Sequential API
183  layer_config = layer.get_config()
184  layer_config['batch_input_shape'] = input_shape
185  layer = layer.__class__.from_config(layer_config)
186
187  model = keras.models.Sequential()
188  model.add(layer)
189  actual_output = model.predict(input_data)
190  actual_output_shape = actual_output.shape
191  for expected_dim, actual_dim in zip(expected_output_shape,
192                                      actual_output_shape):
193    if expected_dim is not None:
194      if expected_dim != actual_dim:
195        raise AssertionError(
196            'When testing layer %s **after deserialization**, '
197            'for input %s, found output_shape='
198            '%s but expected to find inferred shape %s.\nFull kwargs: %s' %
199            (layer_cls.__name__,
200             x,
201             actual_output_shape,
202             expected_output_shape,
203             kwargs))
204  if expected_output is not None:
205    np.testing.assert_allclose(actual_output, expected_output, rtol=1e-3)
206
207  # test serialization, weight setting at model level
208  model_config = model.get_config()
209  recovered_model = keras.models.Sequential.from_config(model_config)
210  if model.weights:
211    weights = model.get_weights()
212    recovered_model.set_weights(weights)
213    output = recovered_model.predict(input_data)
214    np.testing.assert_allclose(output, actual_output, rtol=2e-3)
215
216  # for further checks in the caller function
217  return actual_output
218
219
220_thread_local_data = threading.local()
221_thread_local_data.model_type = None
222_thread_local_data.run_eagerly = None
223
224
225@tf_contextlib.contextmanager
226def model_type_scope(value):
227  """Provides a scope within which the model type to test is equal to `value`.
228
229  The model type gets restored to its original value upon exiting the scope.
230
231  Arguments:
232     value: model type value
233
234  Yields:
235    The provided value.
236  """
237  previous_value = _thread_local_data.model_type
238  try:
239    _thread_local_data.model_type = value
240    yield value
241  finally:
242    # Restore model type to initial value.
243    _thread_local_data.model_type = previous_value
244
245
246@tf_contextlib.contextmanager
247def run_eagerly_scope(value):
248  """Provides a scope within which we compile models to run eagerly or not.
249
250  The boolean gets restored to its original value upon exiting the scope.
251
252  Arguments:
253     value: Bool specifying if we should run models eagerly in the active test.
254     Should be True or False.
255
256  Yields:
257    The provided value.
258  """
259  previous_value = _thread_local_data.run_eagerly
260  try:
261    _thread_local_data.run_eagerly = value
262    yield value
263  finally:
264    # Restore model type to initial value.
265    _thread_local_data.run_eagerly = previous_value
266
267
268def should_run_eagerly():
269  """Returns whether the models we are testing should be run eagerly."""
270  if _thread_local_data.run_eagerly is None:
271    raise ValueError('Cannot call `should_run_eagerly()` outside of a '
272                     '`run_eagerly_scope()` or `run_all_keras_modes` '
273                     'decorator.')
274
275  return _thread_local_data.run_eagerly and context.executing_eagerly()
276
277
278def get_model_type():
279  """Gets the model type that should be tested."""
280  if _thread_local_data.model_type is None:
281    raise ValueError('Cannot call `get_model_type()` outside of a '
282                     '`model_type_scope()` or `run_with_all_model_types` '
283                     'decorator.')
284
285  return _thread_local_data.model_type
286
287
288def get_small_sequential_mlp(num_hidden, num_classes, input_dim=None):
289  model = keras.models.Sequential()
290  if input_dim:
291    model.add(keras.layers.Dense(num_hidden, activation='relu',
292                                 input_dim=input_dim))
293  else:
294    model.add(keras.layers.Dense(num_hidden, activation='relu'))
295  activation = 'sigmoid' if num_classes == 1 else 'softmax'
296  model.add(keras.layers.Dense(num_classes, activation=activation))
297  return model
298
299
300def get_small_functional_mlp(num_hidden, num_classes, input_dim):
301  inputs = keras.Input(shape=(input_dim,))
302  outputs = keras.layers.Dense(num_hidden, activation='relu')(inputs)
303  activation = 'sigmoid' if num_classes == 1 else 'softmax'
304  outputs = keras.layers.Dense(num_classes, activation=activation)(outputs)
305  return keras.Model(inputs, outputs)
306
307
308class _SmallSubclassMLP(keras.Model):
309  """A subclass model based small MLP."""
310
311  def __init__(self, num_hidden, num_classes):
312    super(_SmallSubclassMLP, self).__init__()
313    self.layer_a = keras.layers.Dense(num_hidden, activation='relu')
314    activation = 'sigmoid' if num_classes == 1 else 'softmax'
315    self.layer_b = keras.layers.Dense(num_classes, activation=activation)
316
317  def call(self, inputs, **kwargs):
318    x = self.layer_a(inputs)
319    return self.layer_b(x)
320
321
322class _SmallSubclassMLPCustomBuild(keras.Model):
323  """A subclass model small MLP that uses a custom build method."""
324
325  def __init__(self, num_hidden, num_classes):
326    super(_SmallSubclassMLPCustomBuild, self).__init__()
327    self.layer_a = None
328    self.layer_b = None
329    self.num_hidden = num_hidden
330    self.num_classes = num_classes
331
332  def build(self, input_shape):
333    self.layer_a = keras.layers.Dense(self.num_hidden, activation='relu')
334    activation = 'sigmoid' if self.num_classes == 1 else 'softmax'
335    self.layer_b = keras.layers.Dense(self.num_classes, activation=activation)
336
337  def call(self, inputs, **kwargs):
338    x = self.layer_a(inputs)
339    return self.layer_b(x)
340
341
342def get_small_subclass_mlp(num_hidden, num_classes):
343  return _SmallSubclassMLP(num_hidden, num_classes)
344
345
346def get_small_subclass_mlp_with_custom_build(num_hidden, num_classes):
347  return _SmallSubclassMLPCustomBuild(num_hidden, num_classes)
348
349
350def get_small_mlp(num_hidden, num_classes, input_dim):
351  """Get a small mlp of the model type specified by `get_model_type`."""
352  model_type = get_model_type()
353  if model_type == 'subclass':
354    return get_small_subclass_mlp(num_hidden, num_classes)
355  if model_type == 'subclass_custom_build':
356    return get_small_subclass_mlp_with_custom_build(num_hidden, num_classes)
357  if model_type == 'sequential':
358    return get_small_sequential_mlp(num_hidden, num_classes, input_dim)
359  if model_type == 'functional':
360    return get_small_functional_mlp(num_hidden, num_classes, input_dim)
361  raise ValueError('Unknown model type {}'.format(model_type))
362
363
364class _SubclassModel(keras.Model):
365  """A Keras subclass model."""
366
367  def __init__(self, layers):
368    super(_SubclassModel, self).__init__()
369    # Note that clone and build doesn't support lists of layers in subclassed
370    # models. Adding each layer directly here.
371    for i, layer in enumerate(layers):
372      setattr(self, self._layer_name_for_i(i), layer)
373
374    self.num_layers = len(layers)
375
376  def _layer_name_for_i(self, i):
377    return 'layer{}'.format(i)
378
379  def call(self, inputs, **kwargs):
380    x = inputs
381    for i in range(self.num_layers):
382      layer = getattr(self, self._layer_name_for_i(i))
383      x = layer(x)
384    return x
385
386
387class _SubclassModelCustomBuild(keras.Model):
388  """A Keras subclass model that uses a custom build method."""
389
390  def __init__(self, layer_generating_func):
391    super(_SubclassModelCustomBuild, self).__init__()
392    self.all_layers = None
393    self._layer_generating_func = layer_generating_func
394
395  def build(self, input_shape):
396    layers = []
397    for layer in self._layer_generating_func():
398      layers.append(layer)
399    self.all_layers = layers
400
401  def call(self, inputs, **kwargs):
402    x = inputs
403    for layer in self.all_layers:
404      x = layer(x)
405    return x
406
407
408def get_model_from_layers(layers, input_shape=None):
409  """Builds a model from a sequence of layers."""
410  model_type = get_model_type()
411  if model_type == 'subclass':
412    return _SubclassModel(layers)
413
414  if model_type == 'subclass_custom_build':
415    layer_generating_func = lambda: layers
416    return _SubclassModelCustomBuild(layer_generating_func)
417
418  if model_type == 'sequential':
419    model = keras.models.Sequential()
420    if input_shape:
421      model.add(keras.layers.InputLayer(input_shape=input_shape))
422    for layer in layers:
423      model.add(layer)
424    return model
425
426  if model_type == 'functional':
427    if not input_shape:
428      raise ValueError('Cannot create a functional model from layers with no '
429                       'input shape.')
430    inputs = keras.Input(shape=input_shape)
431    outputs = inputs
432    for layer in layers:
433      outputs = layer(outputs)
434    return keras.Model(inputs, outputs)
435
436  raise ValueError('Unknown model type {}'.format(model_type))
437
438
439class _MultiIOSubclassModel(keras.Model):
440  """Multi IO Keras subclass model."""
441
442  def __init__(self, branch_a, branch_b, shared_input_branch=None,
443               shared_output_branch=None):
444    super(_MultiIOSubclassModel, self).__init__()
445    self._shared_input_branch = shared_input_branch
446    self._branch_a = branch_a
447    self._branch_b = branch_b
448    self._shared_output_branch = shared_output_branch
449
450  def call(self, inputs, **kwargs):
451    if self._shared_input_branch:
452      for layer in self._shared_input_branch:
453        inputs = layer(inputs)
454      a = inputs
455      b = inputs
456    else:
457      a, b = inputs
458
459    for layer in self._branch_a:
460      a = layer(a)
461    for layer in self._branch_b:
462      b = layer(b)
463    outs = [a, b]
464
465    if self._shared_output_branch:
466      for layer in self._shared_output_branch:
467        outs = layer(outs)
468
469    return outs
470
471
472class _MultiIOSubclassModelCustomBuild(keras.Model):
473  """Multi IO Keras subclass model that uses a custom build method."""
474
475  def __init__(self, branch_a_func, branch_b_func,
476               shared_input_branch_func=None,
477               shared_output_branch_func=None):
478    super(_MultiIOSubclassModelCustomBuild, self).__init__()
479    self._shared_input_branch_func = shared_input_branch_func
480    self._branch_a_func = branch_a_func
481    self._branch_b_func = branch_b_func
482    self._shared_output_branch_func = shared_output_branch_func
483
484    self._shared_input_branch = None
485    self._branch_a = None
486    self._branch_b = None
487    self._shared_output_branch = None
488
489  def build(self, input_shape):
490    if self._shared_input_branch_func():
491      self._shared_input_branch = self._shared_input_branch_func()
492    self._branch_a = self._branch_a_func()
493    self._branch_b = self._branch_b_func()
494
495    if self._shared_output_branch_func():
496      self._shared_output_branch = self._shared_output_branch_func()
497
498  def call(self, inputs, **kwargs):
499    if self._shared_input_branch:
500      for layer in self._shared_input_branch:
501        inputs = layer(inputs)
502      a = inputs
503      b = inputs
504    else:
505      a, b = inputs
506
507    for layer in self._branch_a:
508      a = layer(a)
509    for layer in self._branch_b:
510      b = layer(b)
511    outs = a, b
512
513    if self._shared_output_branch:
514      for layer in self._shared_output_branch:
515        outs = layer(outs)
516
517    return outs
518
519
520def get_multi_io_model(
521    branch_a,
522    branch_b,
523    shared_input_branch=None,
524    shared_output_branch=None):
525  """Builds a multi-io model that contains two branches.
526
527  The produced model will be of the type specified by `get_model_type`.
528
529  To build a two-input, two-output model:
530    Specify a list of layers for branch a and branch b, but do not specify any
531    shared input branch or shared output branch. The resulting model will apply
532    each branch to a different input, to produce two outputs.
533
534    The first value in branch_a must be the Keras 'Input' layer for branch a,
535    and the first value in branch_b must be the Keras 'Input' layer for
536    branch b.
537
538    example usage:
539    ```
540    branch_a = [Input(shape=(2,), name='a'), Dense(), Dense()]
541    branch_b = [Input(shape=(3,), name='b'), Dense(), Dense()]
542
543    model = get_multi_io_model(branch_a, branch_b)
544    ```
545
546  To build a two-input, one-output model:
547    Specify a list of layers for branch a and branch b, and specify a
548    shared output branch. The resulting model will apply
549    each branch to a different input. It will then apply the shared output
550    branch to a tuple containing the intermediate outputs of each branch,
551    to produce a single output. The first layer in the shared_output_branch
552    must be able to merge a tuple of two tensors.
553
554    The first value in branch_a must be the Keras 'Input' layer for branch a,
555    and the first value in branch_b must be the Keras 'Input' layer for
556    branch b.
557
558    example usage:
559    ```
560    input_branch_a = [Input(shape=(2,), name='a'), Dense(), Dense()]
561    input_branch_b = [Input(shape=(3,), name='b'), Dense(), Dense()]
562    shared_output_branch = [Concatenate(), Dense(), Dense()]
563
564    model = get_multi_io_model(input_branch_a, input_branch_b,
565                               shared_output_branch=shared_output_branch)
566    ```
567  To build a one-input, two-output model:
568    Specify a list of layers for branch a and branch b, and specify a
569    shared input branch. The resulting model will take one input, and apply
570    the shared input branch to it. It will then respectively apply each branch
571    to that intermediate result in parallel, to produce two outputs.
572
573    The first value in the shared_input_branch must be the Keras 'Input' layer
574    for the whole model. Branch a and branch b should not contain any Input
575    layers.
576
577    example usage:
578    ```
579    shared_input_branch = [Input(shape=(2,), name='in'), Dense(), Dense()]
580    output_branch_a = [Dense(), Dense()]
581    output_branch_b = [Dense(), Dense()]
582
583
584    model = get_multi_io_model(output__branch_a, output_branch_b,
585                               shared_input_branch=shared_input_branch)
586    ```
587
588  Args:
589    branch_a: A sequence of layers for branch a of the model.
590    branch_b: A sequence of layers for branch b of the model.
591    shared_input_branch: An optional sequence of layers to apply to a single
592      input, before applying both branches to that intermediate result. If set,
593      the model will take only one input instead of two. Defaults to None.
594    shared_output_branch: An optional sequence of layers to merge the
595      intermediate results produced by branch a and branch b. If set,
596      the model will produce only one output instead of two. Defaults to None.
597
598  Returns:
599    A multi-io model of the type specified by `get_model_type`, specified
600    by the different branches.
601  """
602  # Extract the functional inputs from the layer lists
603  if shared_input_branch:
604    inputs = shared_input_branch[0]
605    shared_input_branch = shared_input_branch[1:]
606  else:
607    inputs = branch_a[0], branch_b[0]
608    branch_a = branch_a[1:]
609    branch_b = branch_b[1:]
610
611  model_type = get_model_type()
612  if model_type == 'subclass':
613    return _MultiIOSubclassModel(branch_a, branch_b, shared_input_branch,
614                                 shared_output_branch)
615
616  if model_type == 'subclass_custom_build':
617    return _MultiIOSubclassModelCustomBuild((lambda: branch_a),
618                                            (lambda: branch_b),
619                                            (lambda: shared_input_branch),
620                                            (lambda: shared_output_branch))
621
622  if model_type == 'sequential':
623    raise ValueError('Cannot use `get_multi_io_model` to construct '
624                     'sequential models')
625
626  if model_type == 'functional':
627    if shared_input_branch:
628      a_and_b = inputs
629      for layer in shared_input_branch:
630        a_and_b = layer(a_and_b)
631      a = a_and_b
632      b = a_and_b
633    else:
634      a, b = inputs
635
636    for layer in branch_a:
637      a = layer(a)
638    for layer in branch_b:
639      b = layer(b)
640    outputs = a, b
641
642    if shared_output_branch:
643      for layer in shared_output_branch:
644        outputs = layer(outputs)
645
646    return keras.Model(inputs, outputs)
647
648  raise ValueError('Unknown model type {}'.format(model_type))
649
650
651_V2_OPTIMIZER_MAP = {
652    'adadelta': adadelta_v2.Adadelta,
653    'adagrad': adagrad_v2.Adagrad,
654    'adam': adam_v2.Adam,
655    'adamax': adamax_v2.Adamax,
656    'nadam': nadam_v2.Nadam,
657    'rmsprop': rmsprop_v2.RMSprop,
658    'sgd': gradient_descent_v2.SGD
659}
660
661
662def get_v2_optimizer(name, **kwargs):
663  """Get the v2 optimizer requested.
664
665  This is only necessary until v2 are the default, as we are testing in Eager,
666  and Eager + v1 optimizers fail tests. When we are in v2, the strings alone
667  should be sufficient, and this mapping can theoretically be removed.
668
669  Args:
670    name: string name of Keras v2 optimizer.
671    **kwargs: any kwargs to pass to the optimizer constructor.
672
673  Returns:
674    Initialized Keras v2 optimizer.
675
676  Raises:
677    ValueError: if an unknown name was passed.
678  """
679  try:
680    return _V2_OPTIMIZER_MAP[name](**kwargs)
681  except KeyError:
682    raise ValueError(
683        'Could not find requested v2 optimizer: {}\nValid choices: {}'.format(
684            name, list(_V2_OPTIMIZER_MAP.keys())))
685