1 // Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 // =============================================================================
15 #include <algorithm>
16 #include <iterator>
17 #include <string>
18 #include <vector>
19
20 #include "tensorflow/core/framework/device_base.h"
21 #include "tensorflow/core/framework/op_kernel.h"
22 #include "tensorflow/core/framework/resource_mgr.h"
23 #include "tensorflow/core/framework/tensor.h"
24 #include "tensorflow/core/framework/tensor_shape.h"
25 #include "tensorflow/core/framework/types.h"
26 #include "tensorflow/core/kernels/boosted_trees/quantiles/quantile_stream_resource.h"
27 #include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_stream.h"
28 #include "tensorflow/core/kernels/boosted_trees/quantiles/weighted_quantiles_summary.h"
29 #include "tensorflow/core/lib/core/errors.h"
30 #include "tensorflow/core/lib/core/status.h"
31 #include "tensorflow/core/lib/strings/stringprintf.h"
32 #include "tensorflow/core/platform/logging.h"
33 #include "tensorflow/core/platform/types.h"
34 #include "tensorflow/core/util/work_sharder.h"
35
36 namespace tensorflow {
37
// Names of op inputs/outputs and attributes shared by the quantile kernels
// defined below.
const char* const kExampleWeightsName = "example_weights";
const char* const kMaxElementsName = "max_elements";
const char* const kGenerateQuantiles = "generate_quantiles";
const char* const kNumBucketsName = "num_buckets";
const char* const kEpsilonName = "epsilon";
const char* const kBucketBoundariesName = "bucket_boundaries";
const char* const kBucketsName = "buckets";
const char* const kSummariesName = "summaries";
const char* const kNumStreamsName = "num_streams";
const char* const kNumFeaturesName = "num_features";
const char* const kFloatFeaturesName = "float_values";
const char* const kResourceHandleName = "quantile_stream_resource_handle";

// Short aliases for the weighted-quantile stream/summary templates,
// instantiated for float values with float weights.
using QuantileStreamResource = BoostedTreesQuantileStreamResource;
using QuantileStream =
    boosted_trees::quantiles::WeightedQuantilesStream<float, float>;
using QuantileSummary =
    boosted_trees::quantiles::WeightedQuantilesSummary<float, float>;
using QuantileSummaryEntry =
    boosted_trees::quantiles::WeightedQuantilesSummary<float,
                                                       float>::SummaryEntry;
59
60 // Generates quantiles on a finalized QuantileStream.
GenerateBoundaries(const QuantileStream & stream,const int64 num_boundaries)61 std::vector<float> GenerateBoundaries(const QuantileStream& stream,
62 const int64 num_boundaries) {
63 std::vector<float> boundaries = stream.GenerateBoundaries(num_boundaries);
64
65 // Uniquify elements as we may get dupes.
66 auto end_it = std::unique(boundaries.begin(), boundaries.end());
67 boundaries.resize(std::distance(boundaries.begin(), end_it));
68 return boundaries;
69 }
70
71 // Generates quantiles on a finalized QuantileStream.
GenerateQuantiles(const QuantileStream & stream,const int64 num_quantiles)72 std::vector<float> GenerateQuantiles(const QuantileStream& stream,
73 const int64 num_quantiles) {
74 // Do not de-dup boundaries. Exactly num_quantiles+1 boundary values
75 // will be returned.
76 std::vector<float> boundaries = stream.GenerateQuantiles(num_quantiles - 1);
77 CHECK_EQ(boundaries.size(), num_quantiles);
78 return boundaries;
79 }
80
GetBuckets(const int32 feature,const OpInputList & buckets_list)81 std::vector<float> GetBuckets(const int32 feature,
82 const OpInputList& buckets_list) {
83 const auto& buckets = buckets_list[feature].flat<float>();
84 std::vector<float> buckets_vector(buckets.data(),
85 buckets.data() + buckets.size());
86 return buckets_vector;
87 }
88
// Registers the handle-producing op and the initialization predicate for the
// quantile stream resource.
REGISTER_RESOURCE_HANDLE_KERNEL(BoostedTreesQuantileStreamResource);

REGISTER_KERNEL_BUILDER(
    Name("IsBoostedTreesQuantileStreamResourceInitialized").Device(DEVICE_CPU),
    IsResourceInitialized<BoostedTreesQuantileStreamResource>);
94
95 class BoostedTreesCreateQuantileStreamResourceOp : public OpKernel {
96 public:
BoostedTreesCreateQuantileStreamResourceOp(OpKernelConstruction * const context)97 explicit BoostedTreesCreateQuantileStreamResourceOp(
98 OpKernelConstruction* const context)
99 : OpKernel(context) {
100 OP_REQUIRES_OK(context, context->GetAttr(kMaxElementsName, &max_elements_));
101 }
102
Compute(OpKernelContext * context)103 void Compute(OpKernelContext* context) override {
104 // Only create one, if one does not exist already. Report status for all
105 // other exceptions. If one already exists, it unrefs the new one.
106 // An epsilon value of zero could cause perfoamance issues and is therefore,
107 // disallowed.
108 const Tensor* epsilon_t;
109 OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t));
110 float epsilon = epsilon_t->scalar<float>()();
111 OP_REQUIRES(
112 context, epsilon > 0,
113 errors::InvalidArgument("An epsilon value of zero is not allowed."));
114
115 const Tensor* num_streams_t;
116 OP_REQUIRES_OK(context, context->input(kNumStreamsName, &num_streams_t));
117 int64 num_streams = num_streams_t->scalar<int64>()();
118
119 auto result =
120 new QuantileStreamResource(epsilon, max_elements_, num_streams);
121 auto status = CreateResource(context, HandleFromInput(context, 0), result);
122 if (!status.ok() && status.code() != tensorflow::error::ALREADY_EXISTS) {
123 OP_REQUIRES(context, false, status);
124 }
125 }
126
127 private:
128 // An upper bound on the number of entries that the summaries might have
129 // for a feature.
130 int64 max_elements_;
131 };
132
133 REGISTER_KERNEL_BUILDER(
134 Name("BoostedTreesCreateQuantileStreamResource").Device(DEVICE_CPU),
135 BoostedTreesCreateQuantileStreamResourceOp);
136
137 class BoostedTreesMakeQuantileSummariesOp : public OpKernel {
138 public:
BoostedTreesMakeQuantileSummariesOp(OpKernelConstruction * const context)139 explicit BoostedTreesMakeQuantileSummariesOp(
140 OpKernelConstruction* const context)
141 : OpKernel(context) {
142 OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
143 }
144
Compute(OpKernelContext * const context)145 void Compute(OpKernelContext* const context) override {
146 // Read float features list;
147 OpInputList float_features_list;
148 OP_REQUIRES_OK(
149 context, context->input_list(kFloatFeaturesName, &float_features_list));
150
151 // Parse example weights and get batch size.
152 const Tensor* example_weights_t;
153 OP_REQUIRES_OK(context,
154 context->input(kExampleWeightsName, &example_weights_t));
155 DCHECK(float_features_list.size() > 0) << "Got empty feature list";
156 auto example_weights = example_weights_t->flat<float>();
157 const int64 weight_size = example_weights.size();
158 const int64 batch_size = float_features_list[0].flat<float>().size();
159 OP_REQUIRES(
160 context, weight_size == 1 || weight_size == batch_size,
161 errors::InvalidArgument(strings::Printf(
162 "Weights should be a single value or same size as features.")));
163 const Tensor* epsilon_t;
164 OP_REQUIRES_OK(context, context->input(kEpsilonName, &epsilon_t));
165 float epsilon = epsilon_t->scalar<float>()();
166
167 OpOutputList summaries_output_list;
168 OP_REQUIRES_OK(
169 context, context->output_list(kSummariesName, &summaries_output_list));
170
171 auto do_quantile_summary_gen = [&](const int64 begin, const int64 end) {
172 // Iterating features.
173 for (int64 index = begin; index < end; index++) {
174 const auto feature_values = float_features_list[index].flat<float>();
175 QuantileStream stream(epsilon, batch_size + 1);
176 // Run quantile summary generation.
177 for (int64 j = 0; j < batch_size; j++) {
178 stream.PushEntry(feature_values(j), (weight_size > 1)
179 ? example_weights(j)
180 : example_weights(0));
181 }
182 stream.Finalize();
183 const auto summary_entry_list = stream.GetFinalSummary().GetEntryList();
184 Tensor* output_t;
185 OP_REQUIRES_OK(
186 context,
187 summaries_output_list.allocate(
188 index,
189 TensorShape({static_cast<int64>(summary_entry_list.size()), 4}),
190 &output_t));
191 auto output = output_t->matrix<float>();
192 for (auto row = 0; row < summary_entry_list.size(); row++) {
193 const auto& entry = summary_entry_list[row];
194 output(row, 0) = entry.value;
195 output(row, 1) = entry.weight;
196 output(row, 2) = entry.min_rank;
197 output(row, 3) = entry.max_rank;
198 }
199 }
200 };
201 // TODO(tanzheny): comment on the magic number.
202 const int64 kCostPerUnit = 500 * batch_size;
203 const DeviceBase::CpuWorkerThreads& worker_threads =
204 *context->device()->tensorflow_cpu_worker_threads();
205 Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
206 kCostPerUnit, do_quantile_summary_gen);
207 }
208
209 private:
210 int64 num_features_;
211 };
212
213 REGISTER_KERNEL_BUILDER(
214 Name("BoostedTreesMakeQuantileSummaries").Device(DEVICE_CPU),
215 BoostedTreesMakeQuantileSummariesOp);
216
217 class BoostedTreesQuantileStreamResourceAddSummariesOp : public OpKernel {
218 public:
BoostedTreesQuantileStreamResourceAddSummariesOp(OpKernelConstruction * const context)219 explicit BoostedTreesQuantileStreamResourceAddSummariesOp(
220 OpKernelConstruction* const context)
221 : OpKernel(context) {}
222
Compute(OpKernelContext * context)223 void Compute(OpKernelContext* context) override {
224 ResourceHandle handle;
225 OP_REQUIRES_OK(context,
226 HandleFromInput(context, kResourceHandleName, &handle));
227 QuantileStreamResource* stream_resource;
228 // Create a reference to the underlying resource using the handle.
229 OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
230 // Remove the reference at the end of this scope.
231 mutex_lock l(*stream_resource->mutex());
232 core::ScopedUnref unref_me(stream_resource);
233
234 OpInputList summaries_list;
235 OP_REQUIRES_OK(context,
236 context->input_list(kSummariesName, &summaries_list));
237 int32 num_streams = stream_resource->num_streams();
238 CHECK_EQ(static_cast<int>(num_streams), summaries_list.size());
239
240 auto do_quantile_add_summary = [&](const int64 begin, const int64 end) {
241 // Iterating all features.
242 for (int64 feature_idx = begin; feature_idx < end; ++feature_idx) {
243 const Tensor& summaries = summaries_list[feature_idx];
244 const auto summary_values = summaries.matrix<float>();
245 const auto& tensor_shape = summaries.shape();
246 const int64 entries_size = tensor_shape.dim_size(0);
247 CHECK_EQ(tensor_shape.dim_size(1), 4);
248 std::vector<QuantileSummaryEntry> summary_entries;
249 summary_entries.reserve(entries_size);
250 for (int64 i = 0; i < entries_size; i++) {
251 float value = summary_values(i, 0);
252 float weight = summary_values(i, 1);
253 float min_rank = summary_values(i, 2);
254 float max_rank = summary_values(i, 3);
255 QuantileSummaryEntry entry(value, weight, min_rank, max_rank);
256 summary_entries.push_back(entry);
257 }
258 stream_resource->stream(feature_idx)->PushSummary(summary_entries);
259 }
260 };
261
262 // TODO(tanzheny): comment on the magic number.
263 const int64 kCostPerUnit = 500 * num_streams;
264 const DeviceBase::CpuWorkerThreads& worker_threads =
265 *context->device()->tensorflow_cpu_worker_threads();
266 Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
267 kCostPerUnit, do_quantile_add_summary);
268 }
269 };
270
271 REGISTER_KERNEL_BUILDER(
272 Name("BoostedTreesQuantileStreamResourceAddSummaries").Device(DEVICE_CPU),
273 BoostedTreesQuantileStreamResourceAddSummariesOp);
274
275 class BoostedTreesQuantileStreamResourceDeserializeOp : public OpKernel {
276 public:
BoostedTreesQuantileStreamResourceDeserializeOp(OpKernelConstruction * const context)277 explicit BoostedTreesQuantileStreamResourceDeserializeOp(
278 OpKernelConstruction* const context)
279 : OpKernel(context) {
280 OP_REQUIRES_OK(context, context->GetAttr(kNumStreamsName, &num_features_));
281 }
282
Compute(OpKernelContext * context)283 void Compute(OpKernelContext* context) override {
284 QuantileStreamResource* streams_resource;
285 // Create a reference to the underlying resource using the handle.
286 OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0),
287 &streams_resource));
288 // Remove the reference at the end of this scope.
289 mutex_lock l(*streams_resource->mutex());
290 core::ScopedUnref unref_me(streams_resource);
291
292 OpInputList bucket_boundaries_list;
293 OP_REQUIRES_OK(context, context->input_list(kBucketBoundariesName,
294 &bucket_boundaries_list));
295
296 auto do_quantile_deserialize = [&](const int64 begin, const int64 end) {
297 // Iterating over all streams.
298 for (int64 stream_idx = begin; stream_idx < end; stream_idx++) {
299 const Tensor& bucket_boundaries_t = bucket_boundaries_list[stream_idx];
300 const auto& bucket_boundaries = bucket_boundaries_t.vec<float>();
301 std::vector<float> result;
302 result.reserve(bucket_boundaries.size());
303 for (size_t i = 0; i < bucket_boundaries.size(); ++i) {
304 result.push_back(bucket_boundaries(i));
305 }
306 streams_resource->set_boundaries(result, stream_idx);
307 }
308 };
309
310 // TODO(tanzheny): comment on the magic number.
311 const int64 kCostPerUnit = 500 * num_features_;
312 const DeviceBase::CpuWorkerThreads& worker_threads =
313 *context->device()->tensorflow_cpu_worker_threads();
314 Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
315 kCostPerUnit, do_quantile_deserialize);
316 }
317
318 private:
319 int64 num_features_;
320 };
321
322 REGISTER_KERNEL_BUILDER(
323 Name("BoostedTreesQuantileStreamResourceDeserialize").Device(DEVICE_CPU),
324 BoostedTreesQuantileStreamResourceDeserializeOp);
325
326 class BoostedTreesQuantileStreamResourceFlushOp : public OpKernel {
327 public:
BoostedTreesQuantileStreamResourceFlushOp(OpKernelConstruction * const context)328 explicit BoostedTreesQuantileStreamResourceFlushOp(
329 OpKernelConstruction* const context)
330 : OpKernel(context) {
331 OP_REQUIRES_OK(context,
332 context->GetAttr(kGenerateQuantiles, &generate_quantiles_));
333 }
334
Compute(OpKernelContext * context)335 void Compute(OpKernelContext* context) override {
336 ResourceHandle handle;
337 OP_REQUIRES_OK(context,
338 HandleFromInput(context, kResourceHandleName, &handle));
339 QuantileStreamResource* stream_resource;
340 // Create a reference to the underlying resource using the handle.
341 OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
342 // Remove the reference at the end of this scope.
343 mutex_lock l(*stream_resource->mutex());
344 core::ScopedUnref unref_me(stream_resource);
345
346 const Tensor* num_buckets_t;
347 OP_REQUIRES_OK(context, context->input(kNumBucketsName, &num_buckets_t));
348 const int64 num_buckets = num_buckets_t->scalar<int64>()();
349 const int64 num_streams = stream_resource->num_streams();
350
351 auto do_quantile_flush = [&](const int64 begin, const int64 end) {
352 // Iterating over all streams.
353 for (int64 stream_idx = begin; stream_idx < end; ++stream_idx) {
354 QuantileStream* stream = stream_resource->stream(stream_idx);
355 stream->Finalize();
356 stream_resource->set_boundaries(
357 generate_quantiles_ ? GenerateQuantiles(*stream, num_buckets)
358 : GenerateBoundaries(*stream, num_buckets),
359 stream_idx);
360 }
361 };
362
363 // TODO(tanzheny): comment on the magic number.
364 const int64 kCostPerUnit = 500 * num_streams;
365 const DeviceBase::CpuWorkerThreads& worker_threads =
366 *context->device()->tensorflow_cpu_worker_threads();
367 Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
368 kCostPerUnit, do_quantile_flush);
369
370 stream_resource->set_buckets_ready(true);
371 }
372
373 private:
374 bool generate_quantiles_;
375 };
376
377 REGISTER_KERNEL_BUILDER(
378 Name("BoostedTreesQuantileStreamResourceFlush").Device(DEVICE_CPU),
379 BoostedTreesQuantileStreamResourceFlushOp);
380
381 class BoostedTreesQuantileStreamResourceGetBucketBoundariesOp
382 : public OpKernel {
383 public:
BoostedTreesQuantileStreamResourceGetBucketBoundariesOp(OpKernelConstruction * const context)384 explicit BoostedTreesQuantileStreamResourceGetBucketBoundariesOp(
385 OpKernelConstruction* const context)
386 : OpKernel(context) {
387 OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
388 }
389
Compute(OpKernelContext * const context)390 void Compute(OpKernelContext* const context) override {
391 ResourceHandle handle;
392 OP_REQUIRES_OK(context,
393 HandleFromInput(context, kResourceHandleName, &handle));
394 QuantileStreamResource* stream_resource;
395 // Create a reference to the underlying resource using the handle.
396 OP_REQUIRES_OK(context, LookupResource(context, handle, &stream_resource));
397 // Remove the reference at the end of this scope.
398 mutex_lock l(*stream_resource->mutex());
399 core::ScopedUnref unref_me(stream_resource);
400
401 const int64 num_streams = stream_resource->num_streams();
402 CHECK_EQ(num_features_, num_streams);
403 OpOutputList bucket_boundaries_list;
404 OP_REQUIRES_OK(context, context->output_list(kBucketBoundariesName,
405 &bucket_boundaries_list));
406
407 auto do_quantile_get_buckets = [&](const int64 begin, const int64 end) {
408 // Iterating over all streams.
409 for (int64 stream_idx = begin; stream_idx < end; stream_idx++) {
410 const auto& boundaries = stream_resource->boundaries(stream_idx);
411 Tensor* bucket_boundaries_t = nullptr;
412 OP_REQUIRES_OK(context,
413 bucket_boundaries_list.allocate(
414 stream_idx, {static_cast<int64>(boundaries.size())},
415 &bucket_boundaries_t));
416 auto* quantiles_flat = bucket_boundaries_t->flat<float>().data();
417 memcpy(quantiles_flat, boundaries.data(),
418 sizeof(float) * boundaries.size());
419 }
420 };
421
422 // TODO(tanzheny): comment on the magic number.
423 const int64 kCostPerUnit = 500 * num_streams;
424 const DeviceBase::CpuWorkerThreads& worker_threads =
425 *context->device()->tensorflow_cpu_worker_threads();
426 Shard(worker_threads.num_threads, worker_threads.workers, num_streams,
427 kCostPerUnit, do_quantile_get_buckets);
428 }
429
430 private:
431 int64 num_features_;
432 };
433
434 REGISTER_KERNEL_BUILDER(
435 Name("BoostedTreesQuantileStreamResourceGetBucketBoundaries")
436 .Device(DEVICE_CPU),
437 BoostedTreesQuantileStreamResourceGetBucketBoundariesOp);
438
439 // Given the calculated quantiles thresholds and input data, this operation
440 // converts the input features into the buckets (categorical values), depending
441 // on which quantile they fall into.
442 class BoostedTreesBucketizeOp : public OpKernel {
443 public:
BoostedTreesBucketizeOp(OpKernelConstruction * const context)444 explicit BoostedTreesBucketizeOp(OpKernelConstruction* const context)
445 : OpKernel(context) {
446 OP_REQUIRES_OK(context, context->GetAttr(kNumFeaturesName, &num_features_));
447 }
448
Compute(OpKernelContext * const context)449 void Compute(OpKernelContext* const context) override {
450 // Read float features list;
451 OpInputList float_features_list;
452 OP_REQUIRES_OK(
453 context, context->input_list(kFloatFeaturesName, &float_features_list));
454 OpInputList bucket_boundaries_list;
455 OP_REQUIRES_OK(context, context->input_list(kBucketBoundariesName,
456 &bucket_boundaries_list));
457 OP_REQUIRES(context,
458 tensorflow::TensorShapeUtils::IsVector(
459 bucket_boundaries_list[0].shape()),
460 errors::InvalidArgument(
461 strings::Printf("Buckets should be flat vectors.")));
462 OpOutputList buckets_list;
463 OP_REQUIRES_OK(context, context->output_list(kBucketsName, &buckets_list));
464
465 auto do_quantile_get_quantiles = [&](const int64 begin, const int64 end) {
466 // Iterating over all resources
467 for (int64 feature_idx = begin; feature_idx < end; feature_idx++) {
468 const Tensor& values_tensor = float_features_list[feature_idx];
469 const int64 num_values = values_tensor.dim_size(0);
470
471 Tensor* output_t = nullptr;
472 OP_REQUIRES_OK(context,
473 buckets_list.allocate(
474 feature_idx, TensorShape({num_values}), &output_t));
475 auto output = output_t->flat<int32>();
476
477 const std::vector<float>& bucket_boundaries_vector =
478 GetBuckets(feature_idx, bucket_boundaries_list);
479 auto flat_values = values_tensor.flat<float>();
480 const auto& iter_begin = bucket_boundaries_vector.begin();
481 const auto& iter_end = bucket_boundaries_vector.end();
482 for (int64 instance = 0; instance < num_values; instance++) {
483 if (iter_begin == iter_end) {
484 output(instance) = 0;
485 continue;
486 }
487 const float value = flat_values(instance);
488 auto bucket_iter = std::lower_bound(iter_begin, iter_end, value);
489 if (bucket_iter == iter_end) {
490 --bucket_iter;
491 }
492 const int32 bucket = static_cast<int32>(bucket_iter - iter_begin);
493 // Bucket id.
494 output(instance) = bucket;
495 }
496 }
497 };
498
499 // TODO(tanzheny): comment on the magic number.
500 const int64 kCostPerUnit = 500 * num_features_;
501 const DeviceBase::CpuWorkerThreads& worker_threads =
502 *context->device()->tensorflow_cpu_worker_threads();
503 Shard(worker_threads.num_threads, worker_threads.workers, num_features_,
504 kCostPerUnit, do_quantile_get_quantiles);
505 }
506
507 private:
508 int64 num_features_;
509 };
510
511 REGISTER_KERNEL_BUILDER(Name("BoostedTreesBucketize").Device(DEVICE_CPU),
512 BoostedTreesBucketizeOp);
513
514 } // namespace tensorflow
515