1# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Tests for keras.layers.preprocessing.reduction."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21from absl.testing import parameterized
22import numpy as np
23
24from tensorflow.python import keras
25
26from tensorflow.python.keras import keras_parameterized
27from tensorflow.python.keras.layers.preprocessing import reduction
28from tensorflow.python.ops.ragged import ragged_factory_ops
29from tensorflow.python.platform import test
30
31
def _reduction_case(reduction_name, expected):
  """Builds one `named_parameters` entry for a single reduction mode.

  Args:
    reduction_name: Reduction string; used both as the test-case name and as
      the `reduction_str` argument of the test method.
    expected: Nested list handed to the test method as `expected_output`.

  Returns:
    A dict with the keys `testcase_name`, `reduction_str` and
    `expected_output`.
  """
  return dict(
      testcase_name=reduction_name,
      reduction_str=reduction_name,
      expected_output=expected)


def _ragged_values():
  """Returns the ragged data batch shared by the ragged tests."""
  return ragged_factory_ops.constant([[[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]],
                                      [[3.0, 1.0], [1.0, 2.0]]])


def _dense_values():
  """Returns the dense data batch shared by the dense tests.

  The second example ends with an all-zero row.
  """
  return np.array([[[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]],
                   [[3.0, 1.0], [1.0, 2.0], [0.0, 0.0]]])


@keras_parameterized.run_all_keras_modes
class ReductionTest(keras_parameterized.TestCase):
  """Exercises `reduction.Reduction` on ragged and dense inputs."""

  @parameterized.named_parameters(
      _reduction_case("max", [[3.0, 3.0], [3.0, 2.0]]),
      _reduction_case("mean", [[2.0, 2.0], [2.0, 1.5]]),
      _reduction_case("min", [[1.0, 1.0], [1.0, 1.0]]),
      _reduction_case("prod", [[6.0, 6.0], [3.0, 2.0]]),
      _reduction_case("sum", [[6.0, 6.0], [4.0, 3.0]]))
  def test_unweighted_ragged_reduction(self, reduction_str, expected_output):
    """Checks each unweighted reduction of the ragged batch."""
    ragged_batch = _ragged_values()
    data_in = keras.Input(shape=(None, None), ragged=True)

    layer = reduction.Reduction(reduction=reduction_str)
    model = keras.Model(data_in, layer(data_in))

    self.assertAllClose(expected_output, model.predict(ragged_batch))

  @parameterized.named_parameters(
      _reduction_case("max", [[4.0, 4.0], [1.5, 6.0]]),
      _reduction_case("mean", [[2.0, 2.0], [1.666667, 1.75]]),
      _reduction_case("min", [[1.0, 1.0], [1.0, 1.0]]),
      _reduction_case("prod", [[12.0, 12.0], [1.5, 6.0]]),
      _reduction_case("sum", [[8.0, 8.0], [2.5, 7.0]]),
      _reduction_case("sqrtn", [[3.265986, 3.265986], [2.236067, 2.213594]]))
  def test_weighted_ragged_reduction(self, reduction_str, expected_output):
    """Checks each reduction of the ragged batch with same-rank weights."""
    ragged_batch = _ragged_values()
    data_in = keras.Input(shape=(None, None), ragged=True)

    ragged_weights = ragged_factory_ops.constant(
        [[[1.0, 1.0], [2.0, 2.0], [1.0, 1.0]], [[0.5, 1.0], [1.0, 3.0]]])
    weights_in = keras.Input(shape=(None, None), ragged=True)

    layer = reduction.Reduction(reduction=reduction_str)
    reduced = layer(data_in, weights=weights_in)
    model = keras.Model([data_in, weights_in], reduced)

    self.assertAllClose(expected_output,
                        model.predict([ragged_batch, ragged_weights]))

  def test_weighted_ragged_reduction_with_different_dimensionality(self):
    """Checks `mean` on ragged data when weights have one fewer dimension."""
    ragged_batch = _ragged_values()
    data_in = keras.Input(shape=(None, None), ragged=True)

    # One weight per inner row rather than one per element.
    ragged_weights = ragged_factory_ops.constant([[1.0, 2.0, 1.0], [1.0, 1.0]])
    weights_in = keras.Input(shape=(None,), ragged=True)

    layer = reduction.Reduction(reduction="mean")
    reduced = layer(data_in, weights=weights_in)
    model = keras.Model([data_in, weights_in], reduced)

    self.assertAllClose([[2.0, 2.0], [2.0, 1.5]],
                        model.predict([ragged_batch, ragged_weights]))

  @parameterized.named_parameters(
      _reduction_case("max", [[3.0, 3.0], [3.0, 2.0]]),
      _reduction_case("mean", [[2.0, 2.0], [1.333333, 1.0]]),
      _reduction_case("min", [[1.0, 1.0], [0.0, 0.0]]),
      _reduction_case("prod", [[6.0, 6.0], [0.0, 0.0]]),
      _reduction_case("sum", [[6.0, 6.0], [4.0, 3.0]]))
  def test_unweighted_dense_reduction(self, reduction_str, expected_output):
    """Checks each unweighted reduction of the dense batch."""
    dense_batch = _dense_values()
    data_in = keras.Input(shape=(None, None))

    layer = reduction.Reduction(reduction=reduction_str)
    model = keras.Model(data_in, layer(data_in))

    self.assertAllClose(expected_output, model.predict(dense_batch))

  @parameterized.named_parameters(
      _reduction_case("max", [[4.0, 4.0], [1.5, 6.0]]),
      _reduction_case("mean", [[2.0, 2.0], [1.666667, 1.75]]),
      _reduction_case("min", [[1.0, 1.0], [0.0, 0.0]]),
      _reduction_case("prod", [[12.0, 12.0], [0.0, 0.0]]),
      _reduction_case("sum", [[8.0, 8.0], [2.5, 7.0]]),
      _reduction_case("sqrtn", [[3.265986, 3.265986], [2.236067, 2.213594]]))
  def test_weighted_dense_reduction(self, reduction_str, expected_output):
    """Checks each reduction of the dense batch with same-rank weights."""
    dense_batch = _dense_values()
    data_in = keras.Input(shape=(None, None))

    dense_weights = np.array([[[1.0, 1.0], [2.0, 2.0], [1.0, 1.0]],
                              [[0.5, 1.0], [1.0, 3.0], [0.0, 0.0]]])
    weights_in = keras.Input(shape=(None, None))

    layer = reduction.Reduction(reduction=reduction_str)
    reduced = layer(data_in, weights=weights_in)
    model = keras.Model([data_in, weights_in], reduced)

    self.assertAllClose(expected_output,
                        model.predict([dense_batch, dense_weights]))

  def test_weighted_dense_reduction_with_different_dimensionality(self):
    """Checks `mean` on dense data when weights have one fewer dimension."""
    dense_batch = _dense_values()
    data_in = keras.Input(shape=(None, None))

    # One weight per inner row; the all-zero data row gets a zero weight.
    dense_weights = np.array([[1.0, 2.0, 1.0], [1.0, 1.0, 0.0]])
    weights_in = keras.Input(shape=(None,))

    layer = reduction.Reduction(reduction="mean")
    reduced = layer(data_in, weights=weights_in)
    model = keras.Model([data_in, weights_in], reduced)

    self.assertAllClose([[2.0, 2.0], [2.0, 1.5]],
                        model.predict([dense_batch, dense_weights]))

  def test_sqrtn_fails_on_unweighted_ragged(self):
    """Expects a ValueError naming `sqrtn` for unweighted ragged input."""
    data_in = keras.Input(shape=(None, None), ragged=True)
    with self.assertRaisesRegex(ValueError, ".*sqrtn.*"):
      # Construction and invocation both stay inside the context manager so
      # the error is caught wherever the layer raises it.
      layer = reduction.Reduction(reduction="sqrtn")
      layer(data_in)

  def test_sqrtn_fails_on_unweighted_dense(self):
    """Expects a ValueError naming `sqrtn` for unweighted dense input."""
    data_in = keras.Input(shape=(None, None))
    with self.assertRaisesRegex(ValueError, ".*sqrtn.*"):
      # Construction and invocation both stay inside the context manager so
      # the error is caught wherever the layer raises it.
      layer = reduction.Reduction(reduction="sqrtn")
      layer(data_in)
229
# When executed as a script, hand control to `test.main()` from
# `tensorflow.python.platform`.
if __name__ == "__main__":
  test.main()
232