1 // Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 // =============================================================================
15 #include <algorithm>
16 #include <iterator>
17 #include <string>
18 #include <vector>
19 
20 #include "tensorflow/core/framework/device_base.h"
21 #include "tensorflow/core/framework/op_kernel.h"
22 #include "tensorflow/core/framework/resource_mgr.h"
23 #include "tensorflow/core/framework/tensor.h"
24 #include "tensorflow/core/framework/tensor_shape.h"
25 #include "tensorflow/core/framework/types.h"
26 #include "tensorflow/core/kernels/boosted_trees/quantiles/quantile_stream_resource.h"
27 #include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_stream.h"
28 #include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_summary.h"
29 #include "tensorflow/core/lib/core/errors.h"
30 #include "tensorflow/core/lib/core/status.h"
31 #include "tensorflow/core/lib/strings/stringprintf.h"
32 #include "tensorflow/core/platform/logging.h"
33 #include "tensorflow/core/platform/types.h"
34 #include "tensorflow/core/util/work_sharder.h"
35 
36 namespace tensorflow {
37 
// Names of the inputs/outputs/attributes shared by the quantile-stream
// kernels defined in this file. Kept as file-scope constants so the kernel
// implementations and the op registrations cannot drift apart.
const char* const kExampleWeightsName = "example_weights";
const char* const kMaxElementsName = "max_elements";
const char* const kGenerateQuantiles = "generate_quantiles";
const char* const kNumBucketsName = "num_buckets";
const char* const kEpsilonName = "epsilon";
const char* const kBucketBoundariesName = "bucket_boundaries";
const char* const kBucketsName = "buckets";
const char* const kSummariesName = "summaries";
const char* const kNumStreamsName = "num_streams";
const char* const kNumFeaturesName = "num_features";
const char* const kFloatFeaturesName = "float_values";
const char* const kResourceHandleName = "quantile_stream_resource_handle";

// Short aliases for the weighted-quantiles machinery (float values with
// float weights) used throughout this file.
using QuantileStreamResource = BoostedTreesQuantileStreamResource;
using QuantileStream =
    boosted_trees::quantiles::WeightedQuantilesStream<float, float>;
using QuantileSummary =
    boosted_trees::quantiles::WeightedQuantilesSummary<float, float>;
using QuantileSummaryEntry =
    boosted_trees::quantiles::WeightedQuantilesSummary<float,
                                                       float>::SummaryEntry;
59 
60 // Generates quantiles on a finalized QuantileStream.
GenerateBoundaries(const QuantileStream & stream,const int64 num_boundaries)61 std::vector<float> GenerateBoundaries(const QuantileStream& stream,
62                                       const int64 num_boundaries) {
63   std::vector<float> boundaries = stream.GenerateBoundaries(num_boundaries);
64 
65   // Uniquify elements as we may get dupes.
66   auto end_it = std::unique(boundaries.begin(), boundaries.end());
67   boundaries.resize(std::distance(boundaries.begin(), end_it));
68   return boundaries;
69 }
70 
71 // Generates quantiles on a finalized QuantileStream.
GenerateQuantiles(const QuantileStream & stream,const int64 num_quantiles)72 std::vector<float> GenerateQuantiles(const QuantileStream& stream,
73                                      const int64 num_quantiles) {
74   // Do not de-dup boundaries. Exactly num_quantiles+1 boundary values
75   // will be returned.
76   std::vector<float> boundaries = stream.GenerateQuantiles(num_quantiles - 1);
77   CHECK_EQ(boundaries.size(), num_quantiles);
78   return boundaries;
79 }
80 
GetBuckets(const int32 feature,const OpInputList & buckets_list)81 std::vector<float> GetBuckets(const int32 feature,
82                               const OpInputList& buckets_list) {
83   const auto& buckets = buckets_list[feature].flat<float>();
84   std::vector<float> buckets_vector(buckets.data(),
85                                     buckets.data() + buckets.size());
86   return buckets_vector;
87 }
88 
// Registers the resource-handle op and the initialization-check op for the
// quantile stream resource.
REGISTER_RESOURCE_HANDLE_KERNEL(BoostedTreesQuantileStreamResource);

REGISTER_KERNEL_BUILDER(
    Name("IsBoostedTreesQuantileStreamResourceInitialized").Device(DEVICE_CPU),
    IsResourceInitialized<BoostedTreesQuantileStreamResource>);
94 
95 class BoostedTreesCreateQuantileStreamResourceOp : public OpKernel {
96  public:
BoostedTreesCreateQuantileStreamResourceOp(OpKernelConstruction * const context)97   explicit BoostedTreesCreateQuantileStreamResourceOp(
98       OpKernelConstruction* const context)
99       : OpKernel(context) {
100     OP_REQUIRES_OK(context, context->GetAttr(kMaxElementsName, &max_elements_));
101   }
102 
Compute(OpKernelContext * context)103   void Compute(OpKernelContext* context) override {
104     // Only create one, if one does not exist already. Report status for all
105     // other exceptions. If one already exists, it unrefs the new one.
106     // An epsilon value of zero could cause perfoamance issues and is therefore,
107     // disallowed.
108     const Tensor* epsilon_t;
109     OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t));
110     float epsilon = epsilon_t->scalar<float>()();
111     OP_REQUIRES(
112         context, epsilon > 0,
113         errors::InvalidArgument("An epsilon value of zero is not allowed."));
114 
115     const Tensor* num_streams_t;
116     OP_REQUIRES_OK(context, context->input(kNumStreamsName, &num_streams_t));
117     int64 num_streams = num_streams_t->scalar<int64>()();
118 
119     auto result =
120         new QuantileStreamResource(epsilon, max_elements_, num_streams);
121     auto status = CreateResource(context, HandleFromInput(context, 0), result);
122     if (!status.ok() && status.code() != tensorflow::error::ALREADY_EXISTS) {
123       OP_REQUIRES(context, false, status);
124     }
125   }
126 
127  private:
128   // An upper bound on the number of entries that the summaries might have
129   // for a feature.
130   int64 max_elements_;
131 };
132 
// CPU kernel registration for resource creation.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesCreateQuantileStreamResource").Device(DEVICE_CPU),
    BoostedTreesCreateQuantileStreamResourceOp);
136 
137 class BoostedTreesMakeQuantileSummariesOp : public OpKernel {
138  public:
BoostedTreesMakeQuantileSummariesOp(OpKernelConstruction * const context)139   explicit BoostedTreesMakeQuantileSummariesOp(
140       OpKernelConstruction* const context)
141       : OpKernel(context) {
142     OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
143   }
144 
Compute(OpKernelContext * const context)145   void Compute(OpKernelContext* const context) override {
146     // Read float features list;
147     OpInputList float_features_list;
148     OP_REQUIRES_OK(
149         context, context->input_list(kFloatFeaturesName, &float_features_list));
150 
151     // Parse example weights and get batch size.
152     const Tensor* example_weights_t;
153     OP_REQUIRES_OK(context,
154                    context->input(kExampleWeightsName, &example_weights_t));
155     DCHECK(float_features_list.size() > 0) << "Got empty feature list";
156     auto example_weights = example_weights_t->flat<float>();
157     const int64 weight_size = example_weights.size();
158     const int64 batch_size = float_features_list[0].flat<float>().size();
159     OP_REQUIRES(
160         context, weight_size == 1 || weight_size == batch_size,
161         errors::InvalidArgument(strings::Printf(
162             "Weights should be a single value or same size as features.")));
163     const Tensor* epsilon_t;
164     OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t));
165     float epsilon = epsilon_t->scalar<float>()();
166 
167     OpOutputList summaries_output_list;
168     OP_REQUIRES_OK(
169         context, context->output_list(kSummariesName, &summaries_output_list));
170 
171     auto do_quantile_summary_gen = [&](const int64 begin, const int64 end) {
172       // Iterating features.
173       for (int64 index = begin; index < end; index++) {
174         const auto feature_values = float_features_list[index].flat<float>();
175         QuantileStream stream(epsilon, batch_size + 1);
176         // Run quantile summary generation.
177         for (int64 j = 0; j < batch_size; j++) {
178           stream.PushEntry(feature_values(j), (weight_size > 1)
179                                                   ? example_weights(j)
180                                                   : example_weights(0));
181         }
182         stream.Finalize();
183         const auto summary_entry_list = stream.GetFinalSummary().GetEntryList();
184         Tensor* output_t;
185         OP_REQUIRES_OK(
186             context,
187             summaries_output_list.allocate(
188                 index,
189                 TensorShape({static_cast<int64>(summary_entry_list.size()), 4}),
190                 &output_t));
191         auto output = output_t->matrix<float>();
192         for (auto row = 0; row < summary_entry_list.size(); row++) {
193           const auto& entry = summary_entry_list[row];
194           output(row, 0) = entry.value;
195           output(row, 1) = entry.weight;
196           output(row, 2) = entry.min_rank;
197           output(row, 3) = entry.max_rank;
198         }
199       }
200     };
201     // TODO(tanzheny): comment on the magic number.
202     const int64 kCostPerUnit = 500 * batch_size;
203     const DeviceBase::CpuWorkerThreads& worker_threads =
204         *context->device()->tensorflow_cpu_worker_threads();
205     Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
206           kCostPerUnit, do_quantile_summary_gen);
207   }
208 
209  private:
210   int64 num_features_;
211 };
212 
// CPU kernel registration for summary generation.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesMakeQuantileSummaries").Device(DEVICE_CPU),
    BoostedTreesMakeQuantileSummariesOp);
216 
217 class BoostedTreesQuantileStreamResourceAddSummariesOp : public OpKernel {
218  public:
BoostedTreesQuantileStreamResourceAddSummariesOp(OpKernelConstruction * const context)219   explicit BoostedTreesQuantileStreamResourceAddSummariesOp(
220       OpKernelConstruction* const context)
221       : OpKernel(context) {}
222 
Compute(OpKernelContext * context)223   void Compute(OpKernelContext* context) override {
224     ResourceHandle handle;
225     OP_REQUIRES_OK(context,
226                    HandleFromInput(context, kResourceHandleName, &handle));
227     QuantileStreamResource* stream_resource;
228     // Create a reference to the underlying resource using the handle.
229     OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
230     // Remove the reference at the end of this scope.
231     mutex_lock l(*stream_resource->mutex());
232     core::ScopedUnref unref_me(stream_resource);
233 
234     OpInputList summaries_list;
235     OP_REQUIRES_OK(context,
236                    context->input_list(kSummariesName, &summaries_list));
237     int32 num_streams = stream_resource->num_streams();
238     CHECK_EQ(static_cast<int>(num_streams), summaries_list.size());
239 
240     auto do_quantile_add_summary = [&](const int64 begin, const int64 end) {
241       // Iterating all features.
242       for (int64 feature_idx = begin; feature_idx < end; ++feature_idx) {
243         const Tensor& summaries = summaries_list[feature_idx];
244         const auto summary_values = summaries.matrix<float>();
245         const auto& tensor_shape = summaries.shape();
246         const int64 entries_size = tensor_shape.dim_size(0);
247         CHECK_EQ(tensor_shape.dim_size(1), 4);
248         std::vector<QuantileSummaryEntry> summary_entries;
249         summary_entries.reserve(entries_size);
250         for (int64 i = 0; i < entries_size; i++) {
251           float value = summary_values(i, 0);
252           float weight = summary_values(i, 1);
253           float min_rank = summary_values(i, 2);
254           float max_rank = summary_values(i, 3);
255           QuantileSummaryEntry entry(value, weight, min_rank, max_rank);
256           summary_entries.push_back(entry);
257         }
258         stream_resource->stream(feature_idx)->PushSummary(summary_entries);
259       }
260     };
261 
262     // TODO(tanzheny): comment on the magic number.
263     const int64 kCostPerUnit = 500 * num_streams;
264     const DeviceBase::CpuWorkerThreads& worker_threads =
265         *context->device()->tensorflow_cpu_worker_threads();
266     Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
267           kCostPerUnit, do_quantile_add_summary);
268   }
269 };
270 
// CPU kernel registration for adding summaries to the resource.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesQuantileStreamResourceAddSummaries").Device(DEVICE_CPU),
    BoostedTreesQuantileStreamResourceAddSummariesOp);
274 
275 class BoostedTreesQuantileStreamResourceDeserializeOp : public OpKernel {
276  public:
BoostedTreesQuantileStreamResourceDeserializeOp(OpKernelConstruction * const context)277   explicit BoostedTreesQuantileStreamResourceDeserializeOp(
278       OpKernelConstruction* const context)
279       : OpKernel(context) {
280     OP_REQUIRES_OK(context, context->GetAttr(kNumStreamsName, &num_features_));
281   }
282 
Compute(OpKernelContext * context)283   void Compute(OpKernelContext* context) override {
284     QuantileStreamResource* streams_resource;
285     // Create a reference to the underlying resource using the handle.
286     OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0),
287                                            &streams_resource));
288     // Remove the reference at the end of this scope.
289     mutex_lock l(*streams_resource->mutex());
290     core::ScopedUnref unref_me(streams_resource);
291 
292     OpInputList bucket_boundaries_list;
293     OP_REQUIRES_OK(context, context->input_list(kBucketBoundariesName,
294                                                 &bucket_boundaries_list));
295 
296     auto do_quantile_deserialize = [&](const int64 begin, const int64 end) {
297       // Iterating over all streams.
298       for (int64 stream_idx = begin; stream_idx < end; stream_idx++) {
299         const Tensor& bucket_boundaries_t = bucket_boundaries_list[stream_idx];
300         const auto& bucket_boundaries = bucket_boundaries_t.vec<float>();
301         std::vector<float> result;
302         result.reserve(bucket_boundaries.size());
303         for (size_t i = 0; i < bucket_boundaries.size(); ++i) {
304           result.push_back(bucket_boundaries(i));
305         }
306         streams_resource->set_boundaries(result, stream_idx);
307       }
308     };
309 
310     // TODO(tanzheny): comment on the magic number.
311     const int64 kCostPerUnit = 500 * num_features_;
312     const DeviceBase::CpuWorkerThreads& worker_threads =
313         *context->device()->tensorflow_cpu_worker_threads();
314     Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
315           kCostPerUnit, do_quantile_deserialize);
316   }
317 
318  private:
319   int64 num_features_;
320 };
321 
// CPU kernel registration for deserializing bucket boundaries.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesQuantileStreamResourceDeserialize").Device(DEVICE_CPU),
    BoostedTreesQuantileStreamResourceDeserializeOp);
325 
326 class BoostedTreesQuantileStreamResourceFlushOp : public OpKernel {
327  public:
BoostedTreesQuantileStreamResourceFlushOp(OpKernelConstruction * const context)328   explicit BoostedTreesQuantileStreamResourceFlushOp(
329       OpKernelConstruction* const context)
330       : OpKernel(context) {
331     OP_REQUIRES_OK(context,
332                    context->GetAttr(kGenerateQuantiles, &generate_quantiles_));
333   }
334 
Compute(OpKernelContext * context)335   void Compute(OpKernelContext* context) override {
336     ResourceHandle handle;
337     OP_REQUIRES_OK(context,
338                    HandleFromInput(context, kResourceHandleName, &handle));
339     QuantileStreamResource* stream_resource;
340     // Create a reference to the underlying resource using the handle.
341     OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
342     // Remove the reference at the end of this scope.
343     mutex_lock l(*stream_resource->mutex());
344     core::ScopedUnref unref_me(stream_resource);
345 
346     const Tensor* num_buckets_t;
347     OP_REQUIRES_OK(context, context->input(kNumBucketsName, &num_buckets_t));
348     const int64 num_buckets = num_buckets_t->scalar<int64>()();
349     const int64 num_streams = stream_resource->num_streams();
350 
351     auto do_quantile_flush = [&](const int64 begin, const int64 end) {
352       // Iterating over all streams.
353       for (int64 stream_idx = begin; stream_idx < end; ++stream_idx) {
354         QuantileStream* stream = stream_resource->stream(stream_idx);
355         stream->Finalize();
356         stream_resource->set_boundaries(
357             generate_quantiles_ ? GenerateQuantiles(*stream, num_buckets)
358                                 : GenerateBoundaries(*stream, num_buckets),
359             stream_idx);
360       }
361     };
362 
363     // TODO(tanzheny): comment on the magic number.
364     const int64 kCostPerUnit = 500 * num_streams;
365     const DeviceBase::CpuWorkerThreads& worker_threads =
366         *context->device()->tensorflow_cpu_worker_threads();
367     Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
368           kCostPerUnit, do_quantile_flush);
369 
370     stream_resource->set_buckets_ready(true);
371   }
372 
373  private:
374   bool generate_quantiles_;
375 };
376 
// CPU kernel registration for flushing streams into bucket boundaries.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesQuantileStreamResourceFlush").Device(DEVICE_CPU),
    BoostedTreesQuantileStreamResourceFlushOp);
380 
381 class BoostedTreesQuantileStreamResourceGetBucketBoundariesOp
382     : public OpKernel {
383  public:
BoostedTreesQuantileStreamResourceGetBucketBoundariesOp(OpKernelConstruction * const context)384   explicit BoostedTreesQuantileStreamResourceGetBucketBoundariesOp(
385       OpKernelConstruction* const context)
386       : OpKernel(context) {
387     OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
388   }
389 
Compute(OpKernelContext * const context)390   void Compute(OpKernelContext* const context) override {
391     ResourceHandle handle;
392     OP_REQUIRES_OK(context,
393                    HandleFromInput(context, kResourceHandleName, &handle));
394     QuantileStreamResource* stream_resource;
395     // Create a reference to the underlying resource using the handle.
396     OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
397     // Remove the reference at the end of this scope.
398     mutex_lock l(*stream_resource->mutex());
399     core::ScopedUnref unref_me(stream_resource);
400 
401     const int64 num_streams = stream_resource->num_streams();
402     CHECK_EQ(num_features_, num_streams);
403     OpOutputList bucket_boundaries_list;
404     OP_REQUIRES_OK(context, context->output_list(kBucketBoundariesName,
405                                                  &bucket_boundaries_list));
406 
407     auto do_quantile_get_buckets = [&](const int64 begin, const int64 end) {
408       // Iterating over all streams.
409       for (int64 stream_idx = begin; stream_idx < end; stream_idx++) {
410         const auto& boundaries = stream_resource->boundaries(stream_idx);
411         Tensor* bucket_boundaries_t = nullptr;
412         OP_REQUIRES_OK(context,
413                        bucket_boundaries_list.allocate(
414                            stream_idx, {static_cast<int64>(boundaries.size())},
415                            &bucket_boundaries_t));
416         auto* quantiles_flat = bucket_boundaries_t->flat<float>().data();
417         memcpy(quantiles_flat, boundaries.data(),
418                sizeof(float) * boundaries.size());
419       }
420     };
421 
422     // TODO(tanzheny): comment on the magic number.
423     const int64 kCostPerUnit = 500 * num_streams;
424     const DeviceBase::CpuWorkerThreads& worker_threads =
425         *context->device()->tensorflow_cpu_worker_threads();
426     Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
427           kCostPerUnit, do_quantile_get_buckets);
428   }
429 
430  private:
431   int64 num_features_;
432 };
433 
// CPU kernel registration for reading back bucket boundaries.
REGISTER_KERNEL_BUILDER(
    Name("BoostedTreesQuantileStreamResourceGetBucketBoundaries")
        .Device(DEVICE_CPU),
    BoostedTreesQuantileStreamResourceGetBucketBoundariesOp);
438 
439 // Given the calculated quantiles thresholds and input data, this operation
440 // converts the input features into the buckets (categorical values), depending
441 // on which quantile they fall into.
442 class BoostedTreesBucketizeOp : public OpKernel {
443  public:
BoostedTreesBucketizeOp(OpKernelConstruction * const context)444   explicit BoostedTreesBucketizeOp(OpKernelConstruction* const context)
445       : OpKernel(context) {
446     OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
447   }
448 
Compute(OpKernelContext * const context)449   void Compute(OpKernelContext* const context) override {
450     // Read float features list;
451     OpInputList float_features_list;
452     OP_REQUIRES_OK(
453         context, context->input_list(kFloatFeaturesName, &float_features_list));
454     OpInputList bucket_boundaries_list;
455     OP_REQUIRES_OK(context, context->input_list(kBucketBoundariesName,
456                                                 &bucket_boundaries_list));
457     OP_REQUIRES(context,
458                 tensorflow::TensorShapeUtils::IsVector(
459                     bucket_boundaries_list[0].shape()),
460                 errors::InvalidArgument(
461                     strings::Printf("Buckets should be flat vectors.")));
462     OpOutputList buckets_list;
463     OP_REQUIRES_OK(context, context->output_list(kBucketsName, &buckets_list));
464 
465     auto do_quantile_get_quantiles = [&](const int64 begin, const int64 end) {
466       // Iterating over all resources
467       for (int64 feature_idx = begin; feature_idx < end; feature_idx++) {
468         const Tensor& values_tensor = float_features_list[feature_idx];
469         const int64 num_values = values_tensor.dim_size(0);
470 
471         Tensor* output_t = nullptr;
472         OP_REQUIRES_OK(context,
473                        buckets_list.allocate(
474                            feature_idx, TensorShape({num_values}), &output_t));
475         auto output = output_t->flat<int32>();
476 
477         const std::vector<float>& bucket_boundaries_vector =
478             GetBuckets(feature_idx, bucket_boundaries_list);
479         auto flat_values = values_tensor.flat<float>();
480         const auto& iter_begin = bucket_boundaries_vector.begin();
481         const auto& iter_end = bucket_boundaries_vector.end();
482         for (int64 instance = 0; instance < num_values; instance++) {
483           if (iter_begin == iter_end) {
484             output(instance) = 0;
485             continue;
486           }
487           const float value = flat_values(instance);
488           auto bucket_iter = std::lower_bound(iter_begin, iter_end, value);
489           if (bucket_iter == iter_end) {
490             --bucket_iter;
491           }
492           const int32 bucket = static_cast<int32>(bucket_iter - iter_begin);
493           // Bucket id.
494           output(instance) = bucket;
495         }
496       }
497     };
498 
499     // TODO(tanzheny): comment on the magic number.
500     const int64 kCostPerUnit = 500 * num_features_;
501     const DeviceBase::CpuWorkerThreads& worker_threads =
502         *context->device()->tensorflow_cpu_worker_threads();
503     Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
504           kCostPerUnit, do_quantile_get_quantiles);
505   }
506 
507  private:
508   int64 num_features_;
509 };
510 
// CPU kernel registration for bucketizing features.
REGISTER_KERNEL_BUILDER(Name("BoostedTreesBucketize").Device(DEVICE_CPU),
                        BoostedTreesBucketizeOp);
513 
514 }  // namespace tensorflow
515