1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/contrib/cloud/kernels/bigquery_table_accessor.h"
16 
17 #include "tensorflow/core/example/feature.pb.h"
18 #include "tensorflow/core/lib/strings/numbers.h"
19 
20 namespace tensorflow {
21 namespace {
22 
23 constexpr size_t kBufferSize = 1024 * 1024;  // In bytes.
24 const string kBigQueryEndPoint = "https://www.googleapis.com/bigquery/v2";
25 
IsPartitionEmpty(const BigQueryTablePartition & partition)26 bool IsPartitionEmpty(const BigQueryTablePartition& partition) {
27   if (partition.end_index() != -1 &&
28       partition.end_index() < partition.start_index()) {
29     return true;
30   }
31   return false;
32 }
33 
ParseJson(StringPiece json,Json::Value * result)34 Status ParseJson(StringPiece json, Json::Value* result) {
35   Json::Reader reader;
36   if (!reader.parse(string(json), *result)) {
37     return errors::Internal("Couldn't parse JSON response from BigQuery.");
38   }
39   return Status::OK();
40 }
41 
ParseColumnType(const string & type,BigQueryTableAccessor::ColumnType * enum_type)42 Status ParseColumnType(const string& type,
43                        BigQueryTableAccessor::ColumnType* enum_type) {
44   if (type == "RECORD") {
45     *enum_type = BigQueryTableAccessor::ColumnType::kRecord;
46   } else if (type == "STRING") {
47     *enum_type = BigQueryTableAccessor::ColumnType::kString;
48   } else if (type == "BYTES") {
49     *enum_type = BigQueryTableAccessor::ColumnType::kBytes;
50   } else if (type == "INTEGER") {
51     *enum_type = BigQueryTableAccessor::ColumnType::kInteger;
52   } else if (type == "FLOAT") {
53     *enum_type = BigQueryTableAccessor::ColumnType::kFloat;
54   } else if (type == "BOOLEAN") {
55     *enum_type = BigQueryTableAccessor::ColumnType::kBoolean;
56   } else if (type == "TIMESTAMP") {
57     *enum_type = BigQueryTableAccessor::ColumnType::kTimestamp;
58   } else if (type == "DATE") {
59     *enum_type = BigQueryTableAccessor::ColumnType::kDate;
60   } else if (type == "TIME") {
61     *enum_type = BigQueryTableAccessor::ColumnType::kTime;
62   } else if (type == "DATETIME") {
63     *enum_type = BigQueryTableAccessor::ColumnType::kDatetime;
64   } else {
65     return errors::Internal(
66         strings::StrCat("Could not parse column type ", type));
67   }
68   return Status::OK();
69 }
70 
71 }  // namespace
72 
New(const string & project_id,const string & dataset_id,const string & table_id,int64 timestamp_millis,int64 row_buffer_size,const string & end_point,const std::vector<string> & columns,const BigQueryTablePartition & partition,std::unique_ptr<BigQueryTableAccessor> * accessor)73 Status BigQueryTableAccessor::New(
74     const string& project_id, const string& dataset_id, const string& table_id,
75     int64 timestamp_millis, int64 row_buffer_size, const string& end_point,
76     const std::vector<string>& columns, const BigQueryTablePartition& partition,
77     std::unique_ptr<BigQueryTableAccessor>* accessor) {
78   return New(project_id, dataset_id, table_id, timestamp_millis,
79              row_buffer_size, end_point, columns, partition, nullptr, nullptr,
80              accessor);
81 }
82 
New(const string & project_id,const string & dataset_id,const string & table_id,int64 timestamp_millis,int64 row_buffer_size,const string & end_point,const std::vector<string> & columns,const BigQueryTablePartition & partition,std::unique_ptr<AuthProvider> auth_provider,std::shared_ptr<HttpRequest::Factory> http_request_factory,std::unique_ptr<BigQueryTableAccessor> * accessor)83 Status BigQueryTableAccessor::New(
84     const string& project_id, const string& dataset_id, const string& table_id,
85     int64 timestamp_millis, int64 row_buffer_size, const string& end_point,
86     const std::vector<string>& columns, const BigQueryTablePartition& partition,
87     std::unique_ptr<AuthProvider> auth_provider,
88     std::shared_ptr<HttpRequest::Factory> http_request_factory,
89     std::unique_ptr<BigQueryTableAccessor>* accessor) {
90   if (timestamp_millis <= 0) {
91     return errors::InvalidArgument(
92         "Cannot use zero or negative timestamp to query a table.");
93   }
94   const string& big_query_end_point =
95       end_point.empty() ? kBigQueryEndPoint : end_point;
96   if (auth_provider == nullptr && http_request_factory == nullptr) {
97     http_request_factory = std::make_shared<CurlHttpRequest::Factory>();
98     auto compute_engine_metadata_client =
99         std::make_shared<ComputeEngineMetadataClient>(http_request_factory);
100     auth_provider = std::unique_ptr<AuthProvider>(
101         new GoogleAuthProvider(compute_engine_metadata_client));
102   }
103 
104   accessor->reset(new BigQueryTableAccessor(
105       project_id, dataset_id, table_id, timestamp_millis, row_buffer_size,
106       big_query_end_point, columns, partition, std::move(auth_provider),
107       std::move(http_request_factory)));
108 
109   return (*accessor)->ReadSchema();
110 }
111 
BigQueryTableAccessor(const string & project_id,const string & dataset_id,const string & table_id,int64 timestamp_millis,int64 row_buffer_size,const string & end_point,const std::vector<string> & columns,const BigQueryTablePartition & partition,std::unique_ptr<AuthProvider> auth_provider,std::shared_ptr<HttpRequest::Factory> http_request_factory)112 BigQueryTableAccessor::BigQueryTableAccessor(
113     const string& project_id, const string& dataset_id, const string& table_id,
114     int64 timestamp_millis, int64 row_buffer_size, const string& end_point,
115     const std::vector<string>& columns, const BigQueryTablePartition& partition,
116     std::unique_ptr<AuthProvider> auth_provider,
117     std::shared_ptr<HttpRequest::Factory> http_request_factory)
118     : project_id_(project_id),
119       dataset_id_(dataset_id),
120       table_id_(table_id),
121       timestamp_millis_(timestamp_millis),
122       columns_(columns.begin(), columns.end()),
123       bigquery_end_point_(end_point),
124       partition_(partition),
125       auth_provider_(std::move(auth_provider)),
126       http_request_factory_(std::move(http_request_factory)) {
127   row_buffer_.resize(row_buffer_size);
128   Reset();
129 }
130 
SetPartition(const BigQueryTablePartition & partition)131 Status BigQueryTableAccessor::SetPartition(
132     const BigQueryTablePartition& partition) {
133   if (partition.start_index() < 0) {
134     return errors::InvalidArgument("Start index cannot be negative.");
135   }
136   partition_ = partition;
137   Reset();
138   return Status::OK();
139 }
140 
Reset()141 void BigQueryTableAccessor::Reset() {
142   first_buffered_row_index_ = partition_.start_index();
143   next_row_in_buffer_ = -1;
144   next_page_token_ = "";
145 }
146 
ReadRow(int64 * row_id,Example * example)147 Status BigQueryTableAccessor::ReadRow(int64* row_id, Example* example) {
148   if (Done()) {
149     return errors::OutOfRange("Reached end of table ", FullTableName());
150   }
151 
152   // If the next row is already fetched and cached, return the row from the
153   // buffer. Otherwise, fill up the row buffer from BigQuery and return a row.
154   if (next_row_in_buffer_ != -1 &&
155       next_row_in_buffer_ < ComputeMaxResultsArg()) {
156     *row_id = first_buffered_row_index_ + next_row_in_buffer_;
157     *example = row_buffer_[next_row_in_buffer_];
158     next_row_in_buffer_++;
159   } else {
160     string auth_token;
161     TF_RETURN_IF_ERROR(
162         AuthProvider::GetToken(auth_provider_.get(), &auth_token));
163 
164     std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
165     std::vector<char> output_buffer;
166     output_buffer.reserve(kBufferSize);
167 
168     // The first time that we access BigQuery there is no page token. After that
169     // we use the page token (which returns rows faster).
170     if (!next_page_token_.empty()) {
171       request->SetUri(strings::StrCat(
172           BigQueryUriPrefix(), "data?maxResults=", ComputeMaxResultsArg(),
173           "&pageToken=", request->EscapeString(next_page_token_)));
174       first_buffered_row_index_ += row_buffer_.size();
175     } else {
176       request->SetUri(strings::StrCat(
177           BigQueryUriPrefix(), "data?maxResults=", ComputeMaxResultsArg(),
178           "&startIndex=", first_buffered_row_index_));
179     }
180     request->AddAuthBearerHeader(auth_token);
181     request->SetResultBuffer(&output_buffer);
182     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading rows from ",
183                                     FullTableName());
184 
185     // Parse the returned row.
186     StringPiece response_piece =
187         StringPiece(&output_buffer[0], output_buffer.size());
188     Json::Value root;
189     TF_RETURN_IF_ERROR(ParseJson(response_piece, &root));
190     for (unsigned int i = 0; i < root["rows"].size(); ++i) {
191       row_buffer_[i].Clear();
192       TF_RETURN_IF_ERROR(
193           ParseColumnValues(root["rows"][i], schema_root_, &row_buffer_[i]));
194     }
195 
196     next_page_token_ = root["pageToken"].asString();
197     *row_id = first_buffered_row_index_;
198     *example = row_buffer_[0];
199     next_row_in_buffer_ = 1;
200   }
201   return Status::OK();
202 }
203 
ComputeMaxResultsArg()204 int64 BigQueryTableAccessor::ComputeMaxResultsArg() {
205   if (partition_.end_index() == -1) {
206     return row_buffer_.size();
207   }
208   if (IsPartitionEmpty(partition_)) {
209     return 0;
210   }
211   return std::min(static_cast<int64>(row_buffer_.size()),
212                   static_cast<int64>(partition_.end_index() -
213                                      partition_.start_index() + 1));
214 }
215 
ParseColumnValues(const Json::Value & value,const SchemaNode & root_schema_node,Example * example)216 Status BigQueryTableAccessor::ParseColumnValues(
217     const Json::Value& value, const SchemaNode& root_schema_node,
218     Example* example) {
219   if (value.empty()) {
220     return Status::OK();
221   }
222   if (value["f"].isNull()) {
223     return Status::OK();
224   }
225   int value_index = 0;
226   for (const auto& schema_node : root_schema_node.schema_nodes) {
227     if (value["f"][value_index].isNull()) {
228       value_index++;
229       continue;
230     }
231 
232     if (schema_node.type == ColumnType::kRecord) {
233       TF_RETURN_IF_ERROR(ParseColumnValues(value["f"][value_index]["v"],
234                                            schema_node, example));
235     } else {
236       // Append the column value only if user has requested the column.
237       if (columns_.empty() ||
238           columns_.find(schema_node.name) != columns_.end()) {
239         TF_RETURN_IF_ERROR(AppendValueToExample(schema_node.name,
240                                                 value["f"][value_index]["v"],
241                                                 schema_node.type, example));
242       }
243     }
244     value_index++;
245   }
246   return Status::OK();
247 }
248 
ReadSchema()249 Status BigQueryTableAccessor::ReadSchema() {
250   string auth_token;
251   TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
252 
253   // Send a request to read the schema.
254   std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
255   std::vector<char> output_buffer;
256   output_buffer.reserve(kBufferSize);
257   request->SetUri(BigQueryUriPrefix());
258   request->AddAuthBearerHeader(auth_token);
259   request->SetResultBuffer(&output_buffer);
260   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading schema for ",
261                                   FullTableName());
262 
263   // Parse the schema.
264   StringPiece response_piece =
265       StringPiece(&output_buffer[0], output_buffer.size());
266 
267   Json::Value root;
268   TF_RETURN_IF_ERROR(ParseJson(response_piece, &root));
269   const auto& columns = root["schema"]["fields"];
270   string column_name_prefix = "";
271   schema_root_ = {"", ColumnType::kNone};
272   TF_RETURN_IF_ERROR(
273       ExtractColumnType(columns, column_name_prefix, &schema_root_));
274   if (root["numRows"].isNull()) {
275     return errors::Internal("Number of rows cannot be extracted for table ",
276                             FullTableName());
277   }
278   strings::safe_strto64(root["numRows"].asString().c_str(), &total_num_rows_);
279   return Status::OK();
280 }
281 
ExtractColumnType(const Json::Value & columns,const string & column_name_prefix,SchemaNode * root)282 Status BigQueryTableAccessor::ExtractColumnType(
283     const Json::Value& columns, const string& column_name_prefix,
284     SchemaNode* root) {
285   for (auto columns_it = columns.begin(); columns_it != columns.end();
286        ++columns_it) {
287     if ((*columns_it)["mode"].asString() == "REPEATED") {
288       return errors::Unimplemented(strings::StrCat(
289           "Tables with repeated columns are not supported: ", FullTableName()));
290     }
291     ColumnType type;
292     const string current_column_name = strings::StrCat(
293         column_name_prefix, (*columns_it)["name"].asString().c_str());
294     TF_RETURN_IF_ERROR(
295         ParseColumnType((*columns_it)["type"].asString().c_str(), &type));
296     root->schema_nodes.emplace_back(current_column_name, type);
297     if (type == ColumnType::kRecord) {
298       const auto new_prefix = strings::StrCat(current_column_name, ".");
299       TF_RETURN_IF_ERROR(ExtractColumnType((*columns_it)["fields"], new_prefix,
300                                            &root->schema_nodes.back()));
301     }
302   }
303   return Status::OK();
304 }
305 
AppendValueToExample(const string & column_name,const Json::Value & column_value,const BigQueryTableAccessor::ColumnType type,Example * example)306 Status BigQueryTableAccessor::AppendValueToExample(
307     const string& column_name, const Json::Value& column_value,
308     const BigQueryTableAccessor::ColumnType type, Example* example) {
309   if (column_value.isNull()) {
310     return Status::OK();
311   }
312   auto& feature =
313       (*example->mutable_features()->mutable_feature())[column_name];
314 
315   switch (type) {
316     case BigQueryTableAccessor::ColumnType::kNone:
317     case BigQueryTableAccessor::ColumnType::kRecord:
318       return errors::Unimplemented("Cannot append type to an example.");
319     case BigQueryTableAccessor::ColumnType::kTimestamp:
320     case BigQueryTableAccessor::ColumnType::kDate:
321     case BigQueryTableAccessor::ColumnType::kTime:
322     case BigQueryTableAccessor::ColumnType::kDatetime:
323     case BigQueryTableAccessor::ColumnType::kString:
324     case BigQueryTableAccessor::ColumnType::kBytes:
325       feature.mutable_bytes_list()->add_value(column_value.asString());
326       break;
327     case BigQueryTableAccessor::ColumnType::kBoolean:
328       feature.mutable_int64_list()->add_value(
329           column_value.asString() == "false" ? 0 : 1);
330       break;
331     case BigQueryTableAccessor::ColumnType::kInteger:
332       int64 column_value_int64;
333       if (!strings::safe_strto64(column_value.asString().c_str(),
334                                  &column_value_int64)) {
335         return errors::Internal("Cannot convert value to integer ",
336                                 column_value.asString().c_str());
337       }
338       feature.mutable_int64_list()->add_value(column_value_int64);
339       break;
340     case BigQueryTableAccessor::ColumnType::kFloat:
341       // BigQuery float is actually a double.
342       double column_value_double;
343       if (!strings::safe_strtod(column_value.asString().c_str(),
344                                 &column_value_double)) {
345         return errors::Internal("Cannot convert value to double: ",
346                                 column_value.asString().c_str());
347       }
348       feature.mutable_float_list()->add_value(
349           static_cast<float>(column_value_double));
350       break;
351   }
352   return Status::OK();
353 }
354 
BigQueryUriPrefix()355 string BigQueryTableAccessor::BigQueryTableAccessor::BigQueryUriPrefix() {
356   CurlHttpRequest request;
357   return strings::StrCat(bigquery_end_point_, "/projects/",
358                          request.EscapeString(project_id_), "/datasets/",
359                          request.EscapeString(dataset_id_), "/tables/",
360                          request.EscapeString(table_id_), "/");
361 }
362 
Done()363 bool BigQueryTableAccessor::Done() {
364   return (total_num_rows_ <= first_buffered_row_index_ + next_row_in_buffer_) ||
365          IsPartitionEmpty(partition_) ||
366          (partition_.end_index() != -1 &&
367           partition_.end_index() <
368               first_buffered_row_index_ + next_row_in_buffer_);
369 }
370 
371 }  // namespace tensorflow
372