# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Tests for tensorflow.ctc_ops.ctc_loss_op."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import itertools

import numpy as np
from six.moves import zip_longest

from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import ctc_ops
from tensorflow.python.platform import test


def grouper(iterable, n, fillvalue=None):
  """Collect data into fixed-length chunks or blocks."""
  # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
  args = [iter(iterable)] * n
  return zip_longest(fillvalue=fillvalue, *args)


def flatten(list_of_lists):
  """Flatten one level of nesting."""
  return itertools.chain.from_iterable(list_of_lists)


class CTCGreedyDecoderTest(test.TestCase):

  def _testCTCDecoder(self,
                      decoder,
                      inputs,
                      seq_lens,
                      log_prob_truth,
                      decode_truth,
                      expected_err_re=None,
                      **decoder_args):
    inputs_t = [ops.convert_to_tensor(x) for x in inputs]
    # Convert the length max_time Python list of [batch_size x depth] tensors
    # into a single [max_time x batch_size x depth] tensor.
    inputs_t = array_ops.stack(inputs_t)

    with self.cached_session(use_gpu=False) as sess:
      decoded_list, log_probability = decoder(
          inputs_t, sequence_length=seq_lens, **decoder_args)
      decoded_unwrapped = list(
          flatten([(st.indices, st.values, st.dense_shape) for st in
                   decoded_list]))

      if expected_err_re is None:
        outputs = sess.run(decoded_unwrapped + [log_probability])

        # Group outputs into (ix, vals, shape) tuples
        output_sparse_tensors = list(grouper(outputs[:-1], 3))

        output_log_probability = outputs[-1]

        # Check that the number of decoded outputs (top_paths) matches
        self.assertEqual(len(output_sparse_tensors), len(decode_truth))

        # For each SparseTensor tuple, compare (ix, vals, shape)
        for out_st, truth_st, tf_st in zip(output_sparse_tensors, decode_truth,
                                           decoded_list):
          self.assertAllEqual(out_st[0], truth_st[0])  # ix
          self.assertAllEqual(out_st[1], truth_st[1])  # vals
          self.assertAllEqual(out_st[2], truth_st[2])  # shape
          # Compare the shapes of the components with the truth. The
          # `None` elements are not known statically.
          self.assertEqual([None, truth_st[0].shape[1]],
                           tf_st.indices.get_shape().as_list())
          self.assertEqual([None], tf_st.values.get_shape().as_list())
          self.assertShapeEqual(truth_st[2], tf_st.dense_shape)

        # Make sure decoded probabilities match
        self.assertAllClose(output_log_probability, log_prob_truth, atol=1e-6)
      else:
        with self.assertRaisesOpError(expected_err_re):
          sess.run(decoded_unwrapped + [log_probability])

  @test_util.run_deprecated_v1
  def testCTCGreedyDecoder(self):
    """Test two batch entries - best path decoder."""
    max_time_steps = 6
    # depth == 4
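    # The greedy (best path) decoder takes the per-frame argmax, merges
    # repeated labels, and then drops blanks; for these ops the blank label is
    # the last class (index depth - 1 == 3 here).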

    seq_len_0 = 4
    input_prob_matrix_0 = np.asarray(
        [[1.0, 0.0, 0.0, 0.0],  # t=0
         [0.0, 0.0, 0.4, 0.6],  # t=1
         [0.0, 0.0, 0.4, 0.6],  # t=2
         [0.0, 0.9, 0.1, 0.0],  # t=3
         [0.0, 0.0, 0.0, 0.0],  # t=4 (ignored)
         [0.0, 0.0, 0.0, 0.0]],  # t=5 (ignored)
        dtype=np.float32)
    input_log_prob_matrix_0 = np.log(input_prob_matrix_0)
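    # Over the first seq_len_0 = 4 frames the argmax labels are [0, 3, 3, 1];
    # dropping the blanks (label 3) gives the expected decoding [0, 1].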

    seq_len_1 = 5
    # dimensions are time x depth

    input_prob_matrix_1 = np.asarray(
        [[0.1, 0.9, 0.0, 0.0],  # t=0
         [0.0, 0.9, 0.1, 0.0],  # t=1
         [0.0, 0.0, 0.1, 0.9],  # t=2
         [0.0, 0.9, 0.1, 0.1],  # t=3
         [0.9, 0.1, 0.0, 0.0],  # t=4
         [0.0, 0.0, 0.0, 0.0]],  # t=5 (ignored)
        dtype=np.float32)
    input_log_prob_matrix_1 = np.log(input_prob_matrix_1)
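    # Over the first seq_len_1 = 5 frames the argmax labels are
    # [1, 1, 3, 1, 0]; merging the repeated 1s and dropping the blank gives
    # [1, 1, 0].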

    # len max_time_steps array of batch_size x depth matrices
    inputs = [
        np.vstack(
            [input_log_prob_matrix_0[t, :], input_log_prob_matrix_1[t, :]])
        for t in range(max_time_steps)
    ]

    # batch_size length vector of sequence_lengths
    seq_lens = np.array([seq_len_0, seq_len_1], dtype=np.int32)

    # batch_size length vector of negative log probabilities
    log_prob_truth = np.array([
        np.sum(-np.log([1.0, 0.6, 0.6, 0.9])),
        np.sum(-np.log([0.9, 0.9, 0.9, 0.9, 0.9]))
    ], np.float32)[:, np.newaxis]
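    # These are the negative sums of the per-frame argmax log probabilities,
    # approximately [[1.1270], [0.5268]].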

    # decode_truth: one SparseTensor (ix, vals, shape)
    decode_truth = [
        (
            np.array(
                [
                    [0, 0],  # batch 0, 2 outputs
                    [0, 1],
                    [1, 0],  # batch 1, 3 outputs
                    [1, 1],
                    [1, 2]
                ],
                dtype=np.int64),
            np.array(
                [
                    0, 1,  # batch 0
                    1, 1, 0  # batch 1
                ],
                dtype=np.int64),
            # shape is batch x max_decoded_length
            np.array([2, 3], dtype=np.int64)),
    ]

    self._testCTCDecoder(ctc_ops.ctc_greedy_decoder, inputs, seq_lens,
                         log_prob_truth, decode_truth)

  @test_util.run_deprecated_v1
  def testCTCDecoderBeamSearch(self):
    """Test one batch, two beams - hibernating beam search."""
    # max_time_steps == 7 (seq_len_0 frames plus two zero-padding frames)
    depth = 6
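    # As with the greedy decoder, the blank label is the last class (index 5).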

    seq_len_0 = 5
    input_prob_matrix_0 = np.asarray(
        [
            [0.30999, 0.309938, 0.0679938, 0.0673362, 0.0708352, 0.173908],
            [0.215136, 0.439699, 0.0370931, 0.0393967, 0.0381581, 0.230517],
            [0.199959, 0.489485, 0.0233221, 0.0251417, 0.0233289, 0.238763],
            [0.279611, 0.452966, 0.0204795, 0.0209126, 0.0194803, 0.20655],
            [0.51286, 0.288951, 0.0243026, 0.0220788, 0.0219297, 0.129878],
            # Random entry added in at time=5
            [0.155251, 0.164444, 0.173517, 0.176138, 0.169979, 0.160671]
        ],
        dtype=np.float32)
    # Add an arbitrary offset - this is fine: the decoder treats the inputs as
    # unnormalized logits, and a constant shift within a frame does not change
    # the implied per-frame distribution.
    input_prob_matrix_0 = input_prob_matrix_0 + 2.0

    # len max_time_steps array of batch_size x depth matrices
    inputs = (
        [input_prob_matrix_0[t, :][np.newaxis, :] for t in range(seq_len_0)]
        # Pad past seq_len_0 with zero frames; the decoder ignores them.
        + 2 * [np.zeros((1, depth), dtype=np.float32)])

    # batch_size length vector of sequence_lengths
    seq_lens = np.array([seq_len_0], dtype=np.int32)

    # batch_size length vector of log probabilities
    log_prob_truth = np.array(
        [
            -5.811451,  # output beam 0
            -6.63339  # output beam 1
        ],
        np.float32)[np.newaxis, :]

    # decode_truth: two SparseTensors, (ix, values, shape)
    decode_truth = [
        # beam 0, batch 0, two outputs decoded
        (np.array([[0, 0], [0, 1]], dtype=np.int64),
         np.array([1, 0], dtype=np.int64),
         np.array([1, 2], dtype=np.int64)),
        # beam 1, batch 0, one output decoded
        (np.array([[0, 0]], dtype=np.int64),
         np.array([1], dtype=np.int64),
         np.array([1, 1], dtype=np.int64)),
    ]
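    # The dense_shape of each tuple is [batch_size, max_decoded_length]: beam 0
    # decodes to the label sequence [1, 0] and beam 1 to [1].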

    # Test correct decoding.
    self._testCTCDecoder(
        ctc_ops.ctc_beam_search_decoder,
        inputs,
        seq_lens,
        log_prob_truth,
        decode_truth,
        beam_width=2,
        top_paths=2)

    # Requesting more paths than the beam width allows.
    with self.assertRaisesRegexp(errors.InvalidArgumentError,
                                 (".*requested more paths than the beam "
                                  "width.*")):
      self._testCTCDecoder(
          ctc_ops.ctc_beam_search_decoder,
          inputs,
          seq_lens,
          log_prob_truth,
          decode_truth,
          beam_width=2,
          top_paths=3)


if __name__ == "__main__":
  test.main()