/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
#define TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_

#define EIGEN_USE_THREADS

#include "third_party/eigen3/Eigen/Core"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using CPUDevice = Eigen::ThreadPoolDevice;

namespace functor {

// Compute reduction over the outer dimensions.
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , DN] where i belongs to [1, N].
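//
// A minimal usage sketch (illustrative only): `device`, `input`, `output`,
// `rows`, and `cols` are assumed to come from the surrounding OpKernel, and
// summation is just one possible choice of BinaryFunctor.
//
//   // Reduce a [rows, cols] float tensor over its rows into a [cols] output.
//   Eigen::DSizes<Eigen::Index, 2> dims(rows, cols);
//   functor::ReduceOuterDimensions<float, float, float,
//                                  Eigen::internal::scalar_sum_op<float>>
//       redux;
//   redux(device, dims, input, &output);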
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor>
struct ReduceOuterDimensions {
  ReduceOuterDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output) const {
    // Compute the inner and outer dims after reshaping into a 2d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, outer_dim = 1;
    for (int i = 0; i < num_dims - num_output_dims; ++i)
      outer_dim *= input_dims[i];
    for (int i = num_dims - num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == outer_dim) {
      // Nothing to do but pass the input through to the output.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Get the number of threads in the device's thread pool.
    const Eigen::Index num_threads = device.numThreads();

    // If the inner dim is large enough, parallelize along the inner dimension.
    // TODO(ezhulenev): There seem to be no benefits in going this route. Check
    // if this can be improved, or use a better heuristic?
    if (inner_dim > num_threads * 32) {
      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = num_threads;

      // Block size along the inner dimension.
      const Eigen::Index inner_block_size = Eigen::divup(inner_dim, num_blocks);
      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
      Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index> buffer(
          {inner_dim});
      buffer.setZero();
      AccumT* buffer_data = buffer.data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

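      // Each block owns a contiguous slice of the inner dimension and
      // accumulates every outer row of that slice into the shared buffer;
      // blocks never write to overlapping buffer ranges.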
      const auto compute = [inner_dim, outer_dim, num_blocks, inner_block_size,
                            input_data, buffer_data](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index inner_dim_start = start * inner_block_size;
        Eigen::Index inner_dim_limit = limit * inner_block_size;
        inner_dim_limit = std::min(inner_dim, inner_dim_limit);
        Eigen::Index my_job_len = inner_dim_limit - inner_dim_start;

        const InputT* my_job_start = input_data + inner_dim_start;
        Buffer buf(buffer_data + inner_dim_start, my_job_len);

        for (Eigen::Index i = 0; i < outer_dim; ++i) {
          auto in = Input(my_job_start + i * inner_dim, my_job_len);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_dim * inner_block_size;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Write final result to the output.
      output->template flat<OutputT>() =
          buffer.template cast<OutputT>().reshape(output_dims);
    } else {
      // Compute block size along the outer dimension for efficiency.
      const Eigen::Index parallel_cell_size = inner_dim;
      const Eigen::Index total_workload = outer_dim * inner_dim;
      const Eigen::Index max_parallelism = total_workload / parallel_cell_size;

      const Eigen::Index min_block_workload = 2000;
      const Eigen::Index min_block_size =
          Eigen::divup(min_block_workload, parallel_cell_size);
      const Eigen::Index max_num_blocks = std::min(
          max_parallelism, Eigen::divup(total_workload, min_block_size));

      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

      // Block size along the outer dimension.
      const Eigen::Index outer_block_size = Eigen::divup(outer_dim, num_blocks);

      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
      Tensor buffer(DataTypeToEnum<AccumT>::v(), {num_blocks, inner_dim});
      buffer.template flat<AccumT>().setZero();
      AccumT* buffer_data = buffer.template flat<AccumT>().data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

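      // Each block reduces a contiguous range of outer rows into its own
      // private row of the temporary buffer; the per-block rows are combined
      // after parallelFor completes.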
      const auto compute = [inner_dim, num_blocks, outer_block_size,
                            buffer_data, input_data, outer_dim](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index outer_dim_start = start * outer_block_size;
        Eigen::Index outer_dim_limit = limit * outer_block_size;
        outer_dim_limit = std::min(outer_dim, outer_dim_limit);

        Buffer buf(buffer_data + start * inner_dim, inner_dim);
        for (Eigen::Index i = outer_dim_start; i < outer_dim_limit; ++i) {
          auto in = Input(input_data + i * inner_dim, inner_dim);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_block_size * inner_dim;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Aggregate partial results from temporary buffer into first block.
      auto buf0 = Buffer(buffer_data, inner_dim);
      // Just reduce the per-block results sequentially, as the inner dimension
      // is not large in this case.
      for (int i = 1; i < num_blocks; ++i) {
        auto buf = Buffer(buffer_data + i * inner_dim, inner_dim);
        buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                          const decltype(buf)>(buf0, buf);
      }
      // Write final result to the output.
      output->template flat<OutputT>() =
          buf0.template cast<OutputT>().reshape(output_dims);
    }
  }
};

// Compute a reduction down to a contiguous ("serial") range of middle
// dimensions; the leading and trailing dimensions are reduced away.
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , Dj] where i and j belong to [1, N].
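//
// A minimal usage sketch (illustrative only): `device`, `input`, `output`,
// `n`, `c`, `h`, and `w` are assumed to come from the surrounding OpKernel,
// and the sum reducer / sum functor pair is just one possible choice of
// Reducer and BinaryFunctor.
//
//   // Reduce an [N, C, H, W] float tensor to a per-channel [C] output: the
//   // batch (outer) and spatial (inner) dimensions are reduced away and the
//   // channel (middle) dimension is kept, so axis_begin_dim == 1.
//   Eigen::DSizes<Eigen::Index, 4> dims(n, c, h, w);
//   functor::ReduceMiddleDimensions<float, float, float,
//                                   Eigen::internal::scalar_sum_op<float>,
//                                   Eigen::internal::SumReducer<float>>
//       redux;
//   redux(device, dims, input, &output, /*axis_begin_dim=*/1);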
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor, typename Reducer>
struct ReduceMiddleDimensions {
  ReduceMiddleDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output,
                  const int axis_begin_dim) const {
    // Compute the dims after reshaping into a 3d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, middle_dim = 1, outer_dim = 1;
    for (int i = 0; i < axis_begin_dim; ++i) outer_dim *= input_dims[i];
    for (int i = axis_begin_dim; i < axis_begin_dim + num_output_dims; ++i)
      middle_dim *= input_dims[i];
    for (int i = axis_begin_dim + num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == inner_dim * outer_dim) {
      // Nothing to do.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    } else if (1 == inner_dim) {
      // Equivalent to ReduceOuterDimensions.
      const ReduceOuterDimensions<InputT, AccumT, OutputT, BinaryFunctor> redux;
      redux(device, input_dims, input, output);
      return;
    }

    // Compute the number of blocks and the block size for efficiency.
    const Eigen::Index parallel_cell_size = inner_dim;
    const Eigen::Index max_parallelism = outer_dim * middle_dim;
    const Eigen::Index total_workload = max_parallelism * inner_dim;

    const Eigen::Index min_block_workload = 2000;
    const Eigen::Index min_block_size =
        Eigen::divup(min_block_workload, parallel_cell_size);
    const Eigen::Index max_num_blocks =
        std::min(max_parallelism, Eigen::divup(total_workload, min_block_size));

    // Do not create more blocks than there are threads in a pool.
    const Eigen::Index num_threads = device.numThreads();
    const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

    // Block size in elements of the flattened input.
    const Eigen::Index outer_block_size =
        Eigen::divup(total_workload, num_blocks);

    const InputT* input_data = input.template flat<InputT>().data();

    // Allocate temporary buffer for partial reductions.
    Eigen::Tensor<AccumT, 2> buffer(num_blocks, middle_dim);
    buffer.setZero();
    AccumT* buffer_data = buffer.data();

    using Buffer = Eigen::TensorMap<Eigen::Tensor<AccumT, 1>>;
    using Input = Eigen::TensorMap<Eigen::Tensor<const InputT, 1>>;

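    // The Reducer collapses a contiguous run of input elements into a single
    // scalar; the BinaryFunctor then accumulates that scalar into the block's
    // row of the temporary buffer.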
    Eigen::array<Eigen::Index, 1> reduction_axis = {0};
    Reducer reducer;
    const BinaryFunctor binary_op;

    const auto compute = [inner_dim, middle_dim, input_data, buffer_data,
                          total_workload, num_blocks, outer_block_size,
                          reduction_axis, reducer, binary_op](
                             Eigen::Index start, Eigen::Index limit) -> void {
      DCHECK(start >= 0 && limit <= num_blocks);
      Eigen::Index block_start = start * outer_block_size;
      Eigen::Index block_limit = limit * outer_block_size;
      block_limit = std::min(total_workload, block_limit);
      Buffer buf(buffer_data + start * middle_dim, middle_dim);

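      // The block covers the flat range [block_start, block_limit), which may
      // begin and end in the middle of an inner row. Split it into a partial
      // head row, a run of whole inner rows, and a partial tail row.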
      const Eigen::Index align_start =
          ((block_start + inner_dim - 1) / inner_dim) * inner_dim;
      const Eigen::Index align_end = (block_limit / inner_dim) * inner_dim;

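      // Reduce the (possibly empty) partial inner row at the head of the
      // block; `coordinate` is the middle-dimension index it contributes to.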
      Eigen::Index coordinate = block_start / inner_dim % middle_dim;
      Eigen::Tensor<AccumT, 0> reduced =
          Input(&input_data[block_start], align_start - block_start)
              .reduce(reduction_axis, reducer)
              .template cast<AccumT>();

      buf(coordinate) = binary_op(buf(coordinate), reduced(0));

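      // Reduce the whole inner rows in the aligned middle part of the block,
      // advancing the middle-dimension coordinate (with wrap-around) as we go.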
      coordinate = align_start / inner_dim % middle_dim;
      for (Eigen::Index i = align_start; i < align_end; i += inner_dim) {
        reduced = Input(&input_data[i], inner_dim)
                      .reduce(reduction_axis, reducer)
                      .template cast<AccumT>();
        buf(coordinate) = binary_op(buf(coordinate), reduced(0));
        ++coordinate;
        if (middle_dim == coordinate) coordinate = 0;
      }

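      // Reduce the (possibly empty) partial inner row at the tail of the block.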
      reduced = Input(&input_data[align_end], block_limit - align_end)
                    .reduce(reduction_axis, reducer)
                    .template cast<AccumT>();
      buf(coordinate) = binary_op(buf(coordinate), reduced(0));
    };

    // Compute cost of reducing a single block.
    const Eigen::Index compute_size = outer_block_size * inner_dim;
    const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
    const Eigen::TensorOpCost cost(
        compute_input_bytes,
        0,  // We'll be mostly writing to L1, assume store cost is 0
        compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

    device.parallelFor(num_blocks, cost, compute);

    using Output = Eigen::TensorMap<
        Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;
    // Aggregate partial results from temporary buffer into first block.
    auto buf0 = Output(buffer_data, middle_dim);
    // TODO(ezhulenev): Parallelize this loop for large inner dimensions?
    for (int i = 1; i < num_blocks; ++i) {
      auto buf = Output(buffer_data + i * middle_dim, middle_dim);
      buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                        const decltype(buf)>(buf0, buf);
    }

    // Write final result to the output.
    output->template flat<OutputT>() =
        buf0.template cast<OutputT>().reshape(output_dims);
  }
};

}  // namespace functor
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_