/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient
 * is silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated sub-class of
 * ConditionalAccumulatorBase. It implements the virtual arithmetic methods that
 * are used for aggregating, averaging, allocating, and returning indexed
 * slices.
 */
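// Example (for illustration): suppose two gradients are applied before
// TryTakeGrad is satisfied, with slices (the first dimension indexes slices):
//   gradient A: indices = [0, 2], values = [[1.0], [2.0]]
//   gradient B: indices = [2, 3], values = [[4.0], [6.0]]
// The accumulator then holds indices = [0, 2, 3],
// values = [[1.0], [6.0], [6.0]] and per-slice counts = [1, 2, 1]. Assuming
// the mean reduction is in effect, TryTakeGrad divides each slice by its own
// count (see DivideAccumGradByCounter below), yielding
// values = [[1.0], [3.0], [6.0]].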
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name, const string& reduction_type)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name, reduction_type) {
    accum_idx_vec_ = nullptr;
    count_element_ = nullptr;
    accum_val_ = nullptr;
    accum_val_persistent_ = new PersistentTensor();
  }

  ~SparseConditionalAccumulator() override {
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    if (count_element_ != nullptr) delete count_element_;
    if (accum_val_persistent_ != nullptr) delete accum_val_persistent_;
    // Do not delete accum_val_! Will be automatically garbage collected
  }

 protected:
  std::vector<int64>* accum_idx_vec_ = nullptr;
  std::vector<int>* count_element_ = nullptr;

  Tensor* accum_val_ = nullptr;
  PersistentTensor* accum_val_persistent_ = nullptr;

  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;
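  // The SliceT / SliceConstT typedefs above view one row of the flattened
  // values matrix as a rank-1 Eigen tensor, so individual slices can be read
  // and assigned in place without copying the full accumulated tensor.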

  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64 grad_val_dims = tensor_val->dims();
    int64 grad_dims = grad_val_dims;

    // Compare with provided shape
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64>();
      for (int64 i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64 i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i,
              " exceeds limits of shape; index is ",
              tensor_idx->vec<int64>()(i), ", limit is ", shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64 accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64 i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64 i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return Status::OK();
  }

  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 nnz = grad_idx->dim_size(0);

    // Assign indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = new std::vector<int64>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64>()(i));
    }

    // Assign values to accum_val_tensor
    // TODO(b/32704451): Don't just ignore the ::tensorflow::Status object!
    ctx->allocate_persistent(dtype_, grad_val->shape(), accum_val_persistent_,
                             &accum_val_)
        .IgnoreError();
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    if (count_element_ != nullptr) {
      delete count_element_;
    }
    count_element_ = new std::vector<int>(nnz, 1);

    // Do not need shape; assume that the op has checked that the shapes match,
    // so grad's shape == shape_
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64 accum_nnz = accum_idx_vec_->size();
    const int64 grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) Do a pass over the inputs, and append values and indices to vectors
    std::vector<std::tuple<Source, int64, int64>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be ordered along
    // increasing dimension number.
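    // For example, merging accumulated indices [0, 2, 5] with gradient
    // indices [2, 4] produces entries_to_copy =
    //   (from_accum, 0, -1), (from_accum_and_grad, 1, 0),
    //   (from_grad, -1, 1), (from_accum, 2, -1)
    // i.e., sum_nnz == 4 and summed indices [0, 2, 4, 5].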
    int64 i = 0, j = 0;
    int64 sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_, grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor
    std::vector<int64>* sum_indices_vec = new std::vector<int64>();
    sum_indices_vec->reserve(sum_nnz);

    std::vector<int>* sum_counts = new std::vector<int>();
    sum_counts->reserve(sum_nnz);

    Tensor* sum_tensor = nullptr;
    PersistentTensor* tensor_sum_persistent = new PersistentTensor();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(
        ctx, ctx->allocate_persistent(dtype_, sum_shape, tensor_sum_persistent,
                                      &sum_tensor));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64 num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64 idx_a = std::get<1>(entries_to_copy[i]);
      const int64 idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep output, i.e., switch pointers to point to new data structures
    // representing the sum
    // Indices
    if (accum_idx_vec_ != nullptr) delete accum_idx_vec_;
    accum_idx_vec_ = sum_indices_vec;
    // Values
    accum_val_ = sum_tensor;
    delete accum_val_persistent_;
    accum_val_persistent_ = tensor_sum_persistent;
    // Counts
    if (count_element_ != nullptr) delete count_element_;
    count_element_ = sum_counts;

    // No need to copy shape, since shape remains the same after sum.
  }

  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64 nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(
        &accum_flat(0,0), &accum_flat(nnz,0), &accum_flat(0,0),
        std::bind2nd(std::divides<T>(),
                     TypeConverter<T, int>::ConvertUToT(this->counter_)));
    */

    // Option 2: average element-wise
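    // Each slice is divided by its own count, i.e., the number of accumulated
    // gradients that actually touched that slice, rather than by the global
    // counter. A slice that appears in only some of the gradients is therefore
    // averaged over those gradients alone.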
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64 i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be a vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64 nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64>* a_idx, const Tensor* b_idx,
                 const int64 a_row, const int64 b_row) {
    const int64 a = a_idx->at(a_row);
    const int64 b = b_idx->vec<int64>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64 nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64 accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
    shape_tensor->flat<int64>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64 i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_