/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
#define TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_

#define EIGEN_USE_THREADS

#include <algorithm>

#include "third_party/eigen3/Eigen/Core"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using CPUDevice = Eigen::ThreadPoolDevice;

namespace functor {

// Compute reduction over all outer dimensions.
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , DN] where i belongs to the set [1, N]
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor>
struct ReduceOuterDimensions {
  ReduceOuterDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output) const {
    // Compute inner and outer dim after reshaping into a 2d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, outer_dim = 1;
    for (int i = 0; i < num_dims - num_output_dims; ++i)
      outer_dim *= input_dims[i];
    for (int i = num_dims - num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == outer_dim) {
      // Nothing to reduce; just cast and reshape the input into the output.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Get the number of threads in the device thread pool.
    const Eigen::Index num_threads = device.numThreads();

    // If there is enough parallelism along the inner dimension, split the work
    // along the inner dimension instead of the outer one.
    // TODO(ezhulenev): There seem to be no benefits in going this route. Check
    // if this can be improved, or use a better heuristic?
    if (inner_dim > num_threads * 32) {
      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = num_threads;

      // Block size along the inner dimension.
      const Eigen::Index inner_block_size = Eigen::divup(inner_dim, num_blocks);
      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
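      // Each block owns a disjoint [inner_block_size] slice of this buffer and
      // accumulates all `outer_dim` rows into it, so blocks never write to
      // overlapping memory and need no synchronization.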
      Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index> buffer(
          {inner_dim});
      buffer.setZero();
      AccumT* buffer_data = buffer.data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, outer_dim, num_blocks, inner_block_size,
                            input_data, buffer_data](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index inner_dim_start = start * inner_block_size;
        Eigen::Index inner_dim_limit = limit * inner_block_size;
        inner_dim_limit = std::min(inner_dim, inner_dim_limit);
        Eigen::Index my_job_len = inner_dim_limit - inner_dim_start;

        const InputT* my_job_start = input_data + inner_dim_start;
        Buffer buf(buffer_data + inner_dim_start, my_job_len);

        for (Eigen::Index i = 0; i < outer_dim; ++i) {
          auto in = Input(my_job_start + i * inner_dim, my_job_len);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_dim * inner_block_size;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Write final result to the output.
      output->template flat<OutputT>() =
          buffer.template cast<OutputT>().reshape(output_dims);
    } else {
      // Compute block size along the outer dimension for efficiency.
      const Eigen::Index parallel_cell_size = inner_dim;
      const Eigen::Index total_workload = outer_dim * inner_dim;
      const Eigen::Index max_parallelism = total_workload / parallel_cell_size;

      const Eigen::Index min_block_workload = 2000;
      const Eigen::Index min_block_size =
          Eigen::divup(min_block_workload, parallel_cell_size);
      const Eigen::Index max_num_blocks = std::min(
          max_parallelism, Eigen::divup(total_workload, min_block_size));

      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

      // Block size along the outer dimension.
      const Eigen::Index outer_block_size = Eigen::divup(outer_dim, num_blocks);

      const InputT* input_data = input.template flat<InputT>().data();

      // Allocate temporary buffer for partial reductions.
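      // Each block reduces a contiguous range of outer rows into its own
      // [inner_dim] row of this buffer; the per-block rows are combined after
      // parallelFor completes.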
      Tensor buffer(DataTypeToEnum<AccumT>::v(), {num_blocks, inner_dim});
      buffer.template flat<AccumT>().setZero();
      AccumT* buffer_data = buffer.template flat<AccumT>().data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, num_blocks, outer_block_size,
                            buffer_data, input_data, outer_dim](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index outer_dim_start = start * outer_block_size;
        Eigen::Index outer_dim_limit = limit * outer_block_size;
        outer_dim_limit = std::min(outer_dim, outer_dim_limit);

        Buffer buf(buffer_data + start * inner_dim, inner_dim);
        for (Eigen::Index i = outer_dim_start; i < outer_dim_limit; ++i) {
          auto in = Input(input_data + i * inner_dim, inner_dim);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_block_size * inner_dim;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Aggregate partial results from the temporary buffer into the first
      // block. Just accumulate the rows sequentially, as the inner dimension
      // is not large in this case.
      auto buf0 = Buffer(buffer_data, inner_dim);
      for (int i = 1; i < num_blocks; ++i) {
        auto buf = Buffer(buffer_data + i * inner_dim, inner_dim);
        buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                          const decltype(buf)>(buf0, buf);
      }
      // Write final result to the output.
      output->template flat<OutputT>() =
          buf0.template cast<OutputT>().reshape(output_dims);
    }
  }
};

// Compute reduction down to some contiguous middle dimensions (like an axis).
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , Dj] where i & j belong to the set [1, N].
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor, typename Reducer>
struct ReduceMiddleDimensions {
  ReduceMiddleDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output,
                  const int axis_begin_dim) const {
    // Compute dims after reshaping into a 3d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, middle_dim = 1, outer_dim = 1;
    for (int i = 0; i < axis_begin_dim; ++i) outer_dim *= input_dims[i];
    for (int i = axis_begin_dim; i < axis_begin_dim + num_output_dims; ++i)
      middle_dim *= input_dims[i];
    for (int i = axis_begin_dim + num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == inner_dim * outer_dim) {
      // Nothing to do.
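      // Both the outer and inner dimensions are 1, so there is nothing to
      // reduce over; just cast and reshape the input into the output.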
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    } else if (1 == inner_dim) {
      // Equivalent to ReduceOuterDimensions.
      const ReduceOuterDimensions<InputT, AccumT, OutputT, BinaryFunctor> redux;
      redux(device, input_dims, input, output);
      return;
    }

    // Compute block size along the outer dimension for efficiency.
    const Eigen::Index parallel_cell_size = inner_dim;
    const Eigen::Index max_parallelism = outer_dim * middle_dim;
    const Eigen::Index total_workload = max_parallelism * inner_dim;

    const Eigen::Index min_block_workload = 2000;
    const Eigen::Index min_block_size =
        Eigen::divup(min_block_workload, parallel_cell_size);
    const Eigen::Index max_num_blocks =
        std::min(max_parallelism, Eigen::divup(total_workload, min_block_size));

    // Do not create more blocks than there are threads in a pool.
    const Eigen::Index num_threads = device.numThreads();
    const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

    // Block size (in elements) along the flattened workload.
    const Eigen::Index outer_block_size =
        Eigen::divup(total_workload, num_blocks);

    const InputT* input_data = input.template flat<InputT>().data();

    // Allocate temporary buffer for partial reductions.
    Eigen::Tensor<AccumT, 2> buffer(num_blocks, middle_dim);
    buffer.setZero();
    AccumT* buffer_data = buffer.data();

    using Buffer = Eigen::TensorMap<Eigen::Tensor<AccumT, 1>>;
    using Input = Eigen::TensorMap<Eigen::Tensor<const InputT, 1>>;

    Eigen::array<Eigen::Index, 1> reduction_axis = {0};
    Reducer reducer;
    const BinaryFunctor binary_op;

    const auto compute = [inner_dim, middle_dim, input_data, buffer_data,
                          total_workload, num_blocks, outer_block_size,
                          reduction_axis, reducer, binary_op](
                             Eigen::Index start, Eigen::Index limit) -> void {
      DCHECK(start >= 0 && limit <= num_blocks);
      Eigen::Index block_start = start * outer_block_size;
      Eigen::Index block_limit = limit * outer_block_size;
      block_limit = std::min(total_workload, block_limit);
      Buffer buf(buffer_data + start * middle_dim, middle_dim);

      // Align the block boundaries to whole inner-dimension rows.
      const int align_start =
          ((block_start + inner_dim - 1) / inner_dim) * inner_dim;
      const int align_end = (block_limit / inner_dim) * inner_dim;

      // Reduce the (possibly partial) row at the front of the block.
      Eigen::Index coordinate = block_start / inner_dim % middle_dim;
      Eigen::Tensor<AccumT, 0> reduced =
          Input(&input_data[block_start], align_start - block_start)
              .reduce(reduction_axis, reducer)
              .template cast<AccumT>();

      buf(coordinate) = binary_op(buf(coordinate), reduced(0));

      // Reduce the aligned full rows in the middle of the block.
      coordinate = align_start / inner_dim % middle_dim;
      for (int i = align_start; i < align_end; i += inner_dim) {
        reduced = Input(&input_data[i], inner_dim)
                      .reduce(reduction_axis, reducer)
                      .template cast<AccumT>();
        buf(coordinate) = binary_op(buf(coordinate), reduced(0));
        ++coordinate;
        if (middle_dim == coordinate) coordinate = 0;
      }

      // Reduce the (possibly partial) row at the tail of the block.
      reduced = Input(&input_data[align_end], block_limit - align_end)
                    .reduce(reduction_axis, reducer)
                    .template cast<AccumT>();
      buf(coordinate) = binary_op(buf(coordinate), reduced(0));
    };

    // Compute cost of reducing a single block.
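    // The per-block cost estimate below lets Eigen's parallelFor decide how
    // many blocks to schedule on each thread.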
    const Eigen::Index compute_size = outer_block_size * inner_dim;
    const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
    const Eigen::TensorOpCost cost(
        compute_input_bytes,
        0,  // We'll be mostly writing to L1, assume store cost is 0
        compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

    device.parallelFor(num_blocks, cost, compute);

    using Output = Eigen::TensorMap<
        Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;
    // Aggregate partial results from temporary buffer into first block.
    auto buf0 = Output(buffer_data, middle_dim);
    // TODO(ezhulenev): Parallelize this loop for large inner dimensions?
    for (int i = 1; i < num_blocks; ++i) {
      auto buf = Output(buffer_data + i * middle_dim, middle_dim);
      buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                        const decltype(buf)>(buf0, buf);
    }

    // Write final result to the output.
    output->template flat<OutputT>() =
        buf0.template cast<OutputT>().reshape(output_dims);
  }
};

}  // namespace functor
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
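// Usage sketch (illustrative only; the thread pool, tensor shapes, and sum
// functor below are hypothetical and not provided by this header):
//
//   Eigen::ThreadPool pool(4);
//   Eigen::ThreadPoolDevice device(&pool, 4);
//   Tensor in(DT_FLOAT, TensorShape({8, 4, 16}));  // filled elsewhere
//   Tensor out(DT_FLOAT, TensorShape({16}));
//   Eigen::DSizes<Eigen::Index, 3> dims(8, 4, 16);
//   functor::ReduceOuterDimensions<float, float, float,
//                                  Eigen::internal::scalar_sum_op<float>>()(
//       device, dims, in, &out);  // out(k) = sum over i, j of in(i, j, k)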