# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Tests for LSTM layer."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21from absl.testing import parameterized
22import numpy as np
23
24from tensorflow.python import keras
25from tensorflow.python.eager import context
26from tensorflow.python.keras import keras_parameterized
27from tensorflow.python.keras import testing_utils
28from tensorflow.python.platform import test
29from tensorflow.python.training import adam
30from tensorflow.python.training import gradient_descent
31
32
33@keras_parameterized.run_all_keras_modes
34class LSTMLayerTest(keras_parameterized.TestCase):
35
36  def test_return_sequences_LSTM(self):
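    # With `return_sequences=True` the layer should return the full output
    # sequence of shape (num_samples, timesteps, units).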
    num_samples = 2
    timesteps = 3
    embedding_dim = 4
    units = 2
    testing_utils.layer_test(
        keras.layers.LSTM,
        kwargs={'units': units,
                'return_sequences': True},
        input_shape=(num_samples, timesteps, embedding_dim))

  def test_static_shape_inference_LSTM(self):
    # Regression test for GitHub issue 15165: the time and feature dimensions
    # of the LSTM output should be statically inferred.
    timesteps = 3
    embedding_dim = 4
    units = 2

    model = keras.models.Sequential()
    inputs = keras.layers.Dense(embedding_dim,
                                input_shape=(timesteps, embedding_dim))
    model.add(inputs)
    layer = keras.layers.LSTM(units, return_sequences=True)
    model.add(layer)
    outputs = model.layers[-1].output
    self.assertEqual(outputs.get_shape().as_list(), [None, timesteps, units])

  def test_dynamic_behavior_LSTM(self):
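    # `input_shape=(None, embedding_dim)` leaves the time dimension
    # unspecified, so the layer must handle sequences of arbitrary length.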
    num_samples = 2
    timesteps = 3
    embedding_dim = 4
    units = 2
    layer = keras.layers.LSTM(units, input_shape=(None, embedding_dim))
    model = keras.models.Sequential()
    model.add(layer)
    model.compile(
        'rmsprop', 'mse', run_eagerly=testing_utils.should_run_eagerly())

    x = np.random.random((num_samples, timesteps, embedding_dim))
    y = np.random.random((num_samples, units))
    model.train_on_batch(x, y)

  def test_dropout_LSTM(self):
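    # Exercises input dropout together with recurrent (state-to-state)
    # dropout.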
    num_samples = 2
    timesteps = 3
    embedding_dim = 4
    units = 2
    testing_utils.layer_test(
        keras.layers.LSTM,
        kwargs={'units': units,
                'dropout': 0.1,
                'recurrent_dropout': 0.1},
        input_shape=(num_samples, timesteps, embedding_dim))

  @parameterized.parameters([0, 1, 2])
  def test_implementation_mode_LSTM(self, implementation_mode):
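    # `implementation` selects among mathematically equivalent ways of
    # batching the gate computations; every mode should build and run.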
    num_samples = 2
    timesteps = 3
    embedding_dim = 4
    units = 2
    testing_utils.layer_test(
        keras.layers.LSTM,
        kwargs={'units': units,
                'implementation': implementation_mode},
        input_shape=(num_samples, timesteps, embedding_dim))

  def test_constraints_LSTM(self):
    embedding_dim = 4
    layer_class = keras.layers.LSTM
    k_constraint = keras.constraints.max_norm(0.01)
    r_constraint = keras.constraints.max_norm(0.01)
    b_constraint = keras.constraints.max_norm(0.01)
    layer = layer_class(
        5,
        return_sequences=False,
        weights=None,
        input_shape=(None, embedding_dim),
        kernel_constraint=k_constraint,
        recurrent_constraint=r_constraint,
        bias_constraint=b_constraint)
    layer.build((None, None, embedding_dim))
    self.assertEqual(layer.cell.kernel.constraint, k_constraint)
    self.assertEqual(layer.cell.recurrent_kernel.constraint, r_constraint)
    self.assertEqual(layer.cell.bias.constraint, b_constraint)

  def test_with_masking_layer_LSTM(self):
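    # Masked timesteps produced by the Masking layer should be skipped by
    # the LSTM without breaking fit().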
    layer_class = keras.layers.LSTM
    inputs = np.random.random((2, 3, 4))
    targets = np.abs(np.random.random((2, 3, 5)))
    targets /= targets.sum(axis=-1, keepdims=True)
    model = keras.models.Sequential()
    model.add(keras.layers.Masking(input_shape=(3, 4)))
    model.add(layer_class(units=5, return_sequences=True, unroll=False))
    model.compile(
        loss='categorical_crossentropy',
        optimizer='rmsprop',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)

  def test_masking_with_stacking_LSTM(self):
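    # Masks should propagate correctly through an RNN layer wrapping
    # stacked LSTMCells.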
    inputs = np.random.random((2, 3, 4))
    targets = np.abs(np.random.random((2, 3, 5)))
    targets /= targets.sum(axis=-1, keepdims=True)
    model = keras.models.Sequential()
    model.add(keras.layers.Masking(input_shape=(3, 4)))
    lstm_cells = [keras.layers.LSTMCell(10), keras.layers.LSTMCell(5)]
    model.add(keras.layers.RNN(lstm_cells, return_sequences=True, unroll=False))
    model.compile(
        loss='categorical_crossentropy',
        optimizer='rmsprop',
        run_eagerly=testing_utils.should_run_eagerly())
    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)

  def test_from_config_LSTM(self):
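    # get_config()/from_config() should round-trip the layer configuration,
    # including the `stateful` flag.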
    layer_class = keras.layers.LSTM
    for stateful in (False, True):
      l1 = layer_class(units=1, stateful=stateful)
      l2 = layer_class.from_config(l1.get_config())
      assert l1.get_config() == l2.get_config()

  def test_specify_initial_state_keras_tensor(self):
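    # An LSTM has two states (hidden state h and cell state c), so
    # `initial_state` here is a list of two Keras Input tensors.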
    num_states = 2
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    # Test with Keras tensor
    inputs = keras.Input((timesteps, embedding_dim))
    initial_state = [keras.Input((units,)) for _ in range(num_states)]
    layer = keras.layers.LSTM(units)
    if len(initial_state) == 1:
      output = layer(inputs, initial_state=initial_state[0])
    else:
      output = layer(inputs, initial_state=initial_state)
    assert initial_state[0] in layer._inbound_nodes[0].input_tensors

    model = keras.models.Model([inputs] + initial_state, output)
    model.compile(loss='categorical_crossentropy',
                  optimizer=adam.AdamOptimizer(),
                  run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.random.random((num_samples, timesteps, embedding_dim))
    initial_state = [np.random.random((num_samples, units))
                     for _ in range(num_states)]
    targets = np.random.random((num_samples, units))
    model.train_on_batch([inputs] + initial_state, targets)

  def test_specify_initial_state_non_keras_tensor(self):
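    # Initial states that are backend variables (not Keras Inputs) are not
    # model inputs, so the model is fed only the main input tensor.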
    num_states = 2
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    # Test with non-Keras tensor
    inputs = keras.Input((timesteps, embedding_dim))
    initial_state = [keras.backend.random_normal_variable(
        (num_samples, units), 0, 1)
                     for _ in range(num_states)]
    layer = keras.layers.LSTM(units)
    output = layer(inputs, initial_state=initial_state)

    model = keras.models.Model(inputs, output)
    model.compile(loss='categorical_crossentropy',
                  optimizer=adam.AdamOptimizer(),
                  run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.random.random((num_samples, timesteps, embedding_dim))
    targets = np.random.random((num_samples, units))
    model.train_on_batch(inputs, targets)

  def test_reset_states_with_values(self):
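    # reset_states() with no arguments should zero the states; passing
    # explicit arrays should install those values instead.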
    num_states = 2
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    layer = keras.layers.LSTM(units, stateful=True)
    layer.build((num_samples, timesteps, embedding_dim))
    layer.reset_states()
    assert len(layer.states) == num_states
    assert layer.states[0] is not None
    self.assertAllClose(
        keras.backend.eval(layer.states[0]),
        np.zeros(keras.backend.int_shape(layer.states[0])),
        atol=1e-4)
    state_shapes = [keras.backend.int_shape(state) for state in layer.states]
    values = [np.ones(shape) for shape in state_shapes]
    if len(values) == 1:
      values = values[0]
    layer.reset_states(values)
    self.assertAllClose(
        keras.backend.eval(layer.states[0]),
        np.ones(keras.backend.int_shape(layer.states[0])),
        atol=1e-4)

    # Test with invalid data
    with self.assertRaises(ValueError):
      layer.reset_states([1] * (len(layer.states) + 1))

  def test_specify_state_with_masking(self):
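    # Passing explicit initial states should still work when a Masking
    # layer is present in the graph.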
    num_states = 2
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    inputs = keras.Input((timesteps, embedding_dim))
    _ = keras.layers.Masking()(inputs)
    initial_state = [keras.Input((units,)) for _ in range(num_states)]
    output = keras.layers.LSTM(units)(inputs, initial_state=initial_state)

    model = keras.models.Model([inputs] + initial_state, output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer='rmsprop',
        run_eagerly=testing_utils.should_run_eagerly())

    inputs = np.random.random((num_samples, timesteps, embedding_dim))
    initial_state = [np.random.random((num_samples, units))
                     for _ in range(num_states)]
    targets = np.random.random((num_samples, units))
    model.train_on_batch([inputs] + initial_state, targets)

  def test_return_state(self):
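    # With `return_state=True` the layer returns [output, h, c]; since the
    # layer is stateful, the predicted state should match the stored state.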
    num_states = 2
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
    layer = keras.layers.LSTM(units, return_state=True, stateful=True)
    outputs = layer(inputs)
    state = outputs[1:]
    assert len(state) == num_states
    model = keras.models.Model(inputs, state[0])

    inputs = np.random.random((num_samples, timesteps, embedding_dim))
    state = model.predict(inputs)
    self.assertAllClose(keras.backend.eval(layer.states[0]), state, atol=1e-4)

  def test_state_reuse(self):
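    # The state tensors returned by one LSTM can be fed directly as the
    # `initial_state` of a downstream LSTM.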
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2

    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
    layer = keras.layers.LSTM(units, return_state=True, return_sequences=True)
    outputs = layer(inputs)
    output, state = outputs[0], outputs[1:]
    output = keras.layers.LSTM(units)(output, initial_state=state)
    model = keras.models.Model(inputs, output)

    inputs = np.random.random((num_samples, timesteps, embedding_dim))
    outputs = model.predict(inputs)

  def test_initial_states_as_other_inputs(self):
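    # Initial states may be passed as extra entries in the inputs list
    # itself (main input first, then one tensor per state).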
    timesteps = 3
    embedding_dim = 4
    units = 3
    num_samples = 2
    num_states = 2
    layer_class = keras.layers.LSTM

    # Test with Keras tensor
    main_inputs = keras.Input((timesteps, embedding_dim))
    initial_state = [keras.Input((units,)) for _ in range(num_states)]
    inputs = [main_inputs] + initial_state

    layer = layer_class(units)
    output = layer(inputs)
    assert initial_state[0] in layer._inbound_nodes[0].input_tensors

    model = keras.models.Model(inputs, output)
    model.compile(loss='categorical_crossentropy',
                  optimizer=adam.AdamOptimizer(),
                  run_eagerly=testing_utils.should_run_eagerly())

    main_inputs = np.random.random((num_samples, timesteps, embedding_dim))
    initial_state = [np.random.random((num_samples, units))
                     for _ in range(num_states)]
    targets = np.random.random((num_samples, units))
    model.train_on_batch([main_inputs] + initial_state, targets)

  def test_regularizers_LSTM(self):
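    # The kernel, recurrent, and bias regularizers register their losses as
    # soon as the layer is built; the activity regularizer adds one more
    # loss once the layer is called on an input.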
    embedding_dim = 4
    layer_class = keras.layers.LSTM
    layer = layer_class(
        5,
        return_sequences=False,
        weights=None,
        input_shape=(None, embedding_dim),
        kernel_regularizer=keras.regularizers.l1(0.01),
        recurrent_regularizer=keras.regularizers.l1(0.01),
        bias_regularizer='l2',
        activity_regularizer='l1')
    layer.build((None, None, 2))
    self.assertEqual(len(layer.losses), 3)
    x = keras.backend.variable(np.ones((2, 3, 2)))
    layer(x)
    if context.executing_eagerly():
      self.assertEqual(len(layer.losses), 4)
    else:
      self.assertEqual(len(layer.get_losses_for(x)), 1)

  def test_statefulness_LSTM(self):
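    # A stateful layer carries its final states over to the next batch, so
    # repeated predictions on the same input differ until states are reset.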
    num_samples = 2
    timesteps = 3
    embedding_dim = 4
    units = 2
    layer_class = keras.layers.LSTM
    model = keras.models.Sequential()
    model.add(
        keras.layers.Embedding(
            4,
            embedding_dim,
            mask_zero=True,
            input_length=timesteps,
            batch_input_shape=(num_samples, timesteps)))
    layer = layer_class(
        units, return_sequences=False, stateful=True, weights=None)
    model.add(layer)
    model.compile(optimizer=gradient_descent.GradientDescentOptimizer(0.01),
                  loss='mse', run_eagerly=testing_utils.should_run_eagerly())
    out1 = model.predict(np.ones((num_samples, timesteps)))
    self.assertEqual(out1.shape, (num_samples, units))

    # train once so that the states change
    model.train_on_batch(
        np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
    out2 = model.predict(np.ones((num_samples, timesteps)))

    # if the state is not reset, output should be different
    self.assertNotEqual(out1.max(), out2.max())

    # check that output changes after states are reset
    # (even though the model itself didn't change)
    layer.reset_states()
    out3 = model.predict(np.ones((num_samples, timesteps)))
    self.assertNotEqual(out2.max(), out3.max())

    # check that container-level reset_states() works
    model.reset_states()
    out4 = model.predict(np.ones((num_samples, timesteps)))
    self.assertAllClose(out3, out4, atol=1e-5)

    # check that the call to `predict` updated the states
    out5 = model.predict(np.ones((num_samples, timesteps)))
    self.assertNotEqual(out4.max(), out5.max())

    # Check masking
    layer.reset_states()

    left_padded_input = np.ones((num_samples, timesteps))
    left_padded_input[0, :1] = 0
    left_padded_input[1, :2] = 0
    out6 = model.predict(left_padded_input)

    layer.reset_states()

    right_padded_input = np.ones((num_samples, timesteps))
    right_padded_input[0, -1:] = 0
    right_padded_input[1, -2:] = 0
    out7 = model.predict(right_padded_input)

    self.assertAllClose(out7, out6, atol=1e-5)


if __name__ == '__main__':
  test.main()