/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient
 * is silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated sub-class of
 * ConditionalAccumulatorBase. It implements the virtual arithmetic methods that
 * are used for aggregating, averaging, allocating, and returning indexed
 * slices.
 */
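// Illustrative usage sketch (not part of the implementation; in practice the
// accumulator is driven through the sparse accumulator apply/take gradient
// ops). Suppose two non-stale sparse gradients are applied and a take request
// asks for two gradients; the take unblocks once both have been aggregated
// and returns the element-wise average of the accumulated indexed slices. The
// numbers below are hypothetical:
//
//   gradient A: indices = [0, 2], values = [[1, 1], [2, 2]]
//   gradient B: indices = [2, 3], values = [[4, 4], [6, 6]]
//
//   returned:   indices = [0, 2, 3]
//               values  = [[1, 1], [3, 3], [6, 6]]
//
// Row 2 is averaged over the two gradients that touched it, while rows 0 and 3
// were each touched by only one gradient (see DivideAccumGradByCounter below).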
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name, const string& reduction_type)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name, reduction_type) {
    accum_idx_vec_ = nullptr;
    count_element_ = nullptr;
    accum_val_ = nullptr;
    accum_val_persistent_ = new PersistentTensor();
  }

  ~SparseConditionalAccumulator() override {
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    if (count_element_ != nullptr) delete count_element_;
    if (accum_val_persistent_ != nullptr) delete accum_val_persistent_;
    // Do not delete accum_val_! Will be automatically garbage collected
  }

 protected:
  std::vector<int64>* accum_idx_vec_ = nullptr;
  std::vector<int>* count_element_ = nullptr;

  Tensor* accum_val_ = nullptr;
  PersistentTensor* accum_val_persistent_ = nullptr;

  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;

  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64 grad_val_dims = tensor_val->dims();
    int64 grad_dims = grad_val_dims;

    // Compare with provided shape
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64>();
      for (int64 i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64 i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i, " exceeded limits of shape",
              "; index is ", tensor_idx->vec<int64>()(i), " exceeded ",
              shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64 accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64 i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64 i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return Status::OK();
  }

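  // Initializes the accumulator from the first gradient received: copies the
  // gradient's indices into accum_idx_vec_, copies its values into a newly
  // allocated persistent tensor, and sets each entry of count_element_ to 1.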
  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 nnz = grad_idx->dim_size(0);

    // Assign indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = new std::vector<int64>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64>()(i));
    }

    // Assign values to accum_val_tensor
    // TODO(b/32704451): Don't just ignore the ::tensorflow::Status object!
    ctx->allocate_persistent(dtype_, grad_val->shape(), accum_val_persistent_,
                             &accum_val_)
        .IgnoreError();
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    if (count_element_ != nullptr) {
      delete count_element_;
    }
    count_element_ = new std::vector<int>(nnz, 1);

    // Do not need shape; assume that the op has checked that the shapes match,
    // so grad's shape == shape_
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 accum_nnz = accum_idx_vec_->size();
    const int64 grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) Do a pass over the inputs, and append values and indices to vectors
    std::vector<std::tuple<Source, int64, int64>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be ordered along
    // increasing dimension number.
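    // Illustrative example only: merging accumulated indices [0, 2] with
    // gradient indices [2, 3] produces entries_to_copy =
    //   (from_accum, 0, -1), (from_accum_and_grad, 1, 0), (from_grad, -1, 1),
    // i.e. the sum has indices [0, 2, 3] and sum_nnz == 3.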
    int64 i = 0, j = 0;
    int64 sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_, grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor
    std::vector<int64>* sum_indices_vec = new std::vector<int64>();
    sum_indices_vec->reserve(sum_nnz);

    std::vector<int>* sum_counts = new std::vector<int>();
    sum_counts->reserve(sum_nnz);

    Tensor* sum_tensor = nullptr;
    PersistentTensor* tensor_sum_persistent = new PersistentTensor();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(
        ctx, ctx->allocate_persistent(dtype_, sum_shape, tensor_sum_persistent,
                                      &sum_tensor));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64 num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64 idx_a = std::get<1>(entries_to_copy[i]);
      const int64 idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep output, i.e., switch pointers to point to new data structures
    // representing the sum
    // Indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = sum_indices_vec;
    // Values
    accum_val_ = sum_tensor;
    delete accum_val_persistent_;
    accum_val_persistent_ = tensor_sum_persistent;
    // Counts
    if (count_element_ != nullptr) delete count_element_;
    count_element_ = sum_counts;

    // No need to copy shape, since shape remains the same after sum
  }

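  // Divides each accumulated row slice by the number of gradients that
  // contributed to that row (count_element_), not by the global gradient
  // counter. Illustrative example only: a row accumulated from two gradients
  // as [2, 2] + [4, 4] = [6, 6] has count 2 and averages to [3, 3], while a
  // row touched by a single gradient is left unchanged (divided by 1).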
  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64 nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(
        &accum_flat(0,0), &accum_flat(nnz,0), &accum_flat(0,0),
        std::bind2nd(std::divides<T>(),
                     TypeConverter<T, int>::ConvertUToT(this->counter_)));
    */

    // Option 2: average element-wise
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64 i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
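    // has_known_shape indicates whether the op received an explicit
    // gradient_shape input; when it is false, no shape tensor is read and
    // ValidateShape skips the comparison against a provided shape.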
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64 nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64>* a_idx, const Tensor* b_idx,
                 const int64 a_row, const int64 b_row) {
    const int64 a = a_idx->at(a_row);
    const int64 b = b_idx->vec<int64>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64 nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64 accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
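    // Illustrative example only: with shape_ == [-1, 3] and accumulated values
    // of shape [nnz, 3], the returned shape tensor is [-1, 3].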
    shape_tensor->flat<int64>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64 i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_