1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/contrib/bigtable/kernels/test_kernels/bigtable_test_client.h"
17 
18 #include "external/com_github_googleapis_googleapis/google/bigtable/v2/data.pb.h"
19 #include "google/protobuf/wrappers.pb.h"
20 #include "re2/re2.h"
21 #include "tensorflow/core/lib/strings/stringprintf.h"
22 #include "tensorflow/core/util/ptr_util.h"
23 // #include "util/task/codes.pb.h"
24 
25 namespace tensorflow {
26 namespace {
27 
UpdateRow(const::google::bigtable::v2::Mutation & mut,std::map<string,string> * row)28 void UpdateRow(const ::google::bigtable::v2::Mutation& mut,
29                std::map<string, string>* row) {
30   if (mut.has_set_cell()) {
31     CHECK(mut.set_cell().timestamp_micros() >= -1)
32         << "Timestamp_micros: " << mut.set_cell().timestamp_micros();
33     auto col =
34         strings::Printf("%s:%s", mut.set_cell().family_name().c_str(),
35                         string(mut.set_cell().column_qualifier()).c_str());
36     (*row)[col] = string(mut.set_cell().value());
37   } else if (mut.has_delete_from_column()) {
38     auto col = strings::Printf(
39         "%s:%s", mut.delete_from_column().family_name().c_str(),
40         string(mut.delete_from_column().column_qualifier()).c_str());
41     row->erase(col);
42   } else if (mut.has_delete_from_family()) {
43     auto itr = row->lower_bound(mut.delete_from_family().family_name());
44     auto prefix =
45         strings::Printf("%s:", mut.delete_from_family().family_name().c_str());
46     while (itr != row->end() && itr->first.substr(0, prefix.size()) == prefix) {
47       row->erase(itr);
48     }
49   } else if (mut.has_delete_from_row()) {
50     row->clear();
51   } else {
52     LOG(ERROR) << "Unknown mutation: " << mut.ShortDebugString();
53   }
54 }
55 
56 }  // namespace
57 
58 class SampleRowKeysResponse : public grpc::ClientReaderInterface<
59                                   google::bigtable::v2::SampleRowKeysResponse> {
60  public:
SampleRowKeysResponse(BigtableTestClient * client)61   explicit SampleRowKeysResponse(BigtableTestClient* client)
62       : client_(client) {}
63 
NextMessageSize(uint32_t * sz)64   bool NextMessageSize(uint32_t* sz) override {
65     mutex_lock l(mu_);
66     mutex_lock l2(client_->mu_);
67     if (num_messages_sent_ * 2 < client_->table_.rows.size()) {
68       *sz = 10000;  // A sufficiently high enough value to not worry about.
69       return true;
70     }
71     return false;
72   }
73 
Read(google::bigtable::v2::SampleRowKeysResponse * resp)74   bool Read(google::bigtable::v2::SampleRowKeysResponse* resp) override {
75     // Send every other key from the table.
76     mutex_lock l(mu_);
77     mutex_lock l2(client_->mu_);
78     *resp = google::bigtable::v2::SampleRowKeysResponse();
79     auto itr = client_->table_.rows.begin();
80     for (uint64 i = 0; i < 2 * num_messages_sent_; ++i) {
81       ++itr;
82       if (itr == client_->table_.rows.end()) {
83         return false;
84       }
85     }
86     resp->set_row_key(itr->first);
87     resp->set_offset_bytes(100 * num_messages_sent_);
88     num_messages_sent_++;
89     return true;
90   }
91 
Finish()92   grpc::Status Finish() override { return grpc::Status::OK; }
93 
WaitForInitialMetadata()94   void WaitForInitialMetadata() override {}  // Do nothing.
95 
96  private:
97   mutex mu_;
98   int64 num_messages_sent_ GUARDED_BY(mu_) = 0;
99   BigtableTestClient* client_;  // Not owned.
100 };
101 
102 class ReadRowsResponse : public grpc::ClientReaderInterface<
103                              google::bigtable::v2::ReadRowsResponse> {
104  public:
ReadRowsResponse(BigtableTestClient * client,google::bigtable::v2::ReadRowsRequest const & request)105   ReadRowsResponse(BigtableTestClient* client,
106                    google::bigtable::v2::ReadRowsRequest const& request)
107       : client_(client), request_(request) {}
108 
NextMessageSize(uint32_t * sz)109   bool NextMessageSize(uint32_t* sz) override {
110     mutex_lock l(mu_);
111     if (sent_first_message_) {
112       return false;
113     }
114     *sz = 10000000;  // A sufficiently high enough value to not worry about.
115     return true;
116   }
117 
Read(google::bigtable::v2::ReadRowsResponse * resp)118   bool Read(google::bigtable::v2::ReadRowsResponse* resp) override {
119     mutex_lock l(mu_);
120     if (sent_first_message_) {
121       return false;
122     }
123     sent_first_message_ = true;
124     RowFilter filter = MakeRowFilter();
125 
126     mutex_lock l2(client_->mu_);
127     *resp = google::bigtable::v2::ReadRowsResponse();
128     // Send all contents in first response.
129     for (auto itr = client_->table_.rows.begin();
130          itr != client_->table_.rows.end(); ++itr) {
131       if (filter.AllowRow(itr->first)) {
132         ::google::bigtable::v2::ReadRowsResponse_CellChunk* chunk = nullptr;
133         bool sent_first = false;
134         for (auto col_itr = itr->second.columns.begin();
135              col_itr != itr->second.columns.end(); ++col_itr) {
136           if (filter.AllowColumn(col_itr->first)) {
137             chunk = resp->add_chunks();
138             if (!sent_first) {
139               sent_first = true;
140               chunk->set_row_key(itr->first);
141             }
142             auto colon_idx = col_itr->first.find(":");
143             CHECK(colon_idx != string::npos)
144                 << "No ':' found in: " << col_itr->first;
145             chunk->mutable_family_name()->set_value(
146                 string(col_itr->first, 0, colon_idx));
147             chunk->mutable_qualifier()->set_value(
148                 string(col_itr->first, ++colon_idx));
149             if (!filter.strip_values) {
150               chunk->set_value(col_itr->second);
151             }
152             if (filter.only_one_column) {
153               break;
154             }
155           }
156         }
157         if (sent_first) {
158           // We are sending this row, so set the commit flag on the last chunk.
159           chunk->set_commit_row(true);
160         }
161       }
162     }
163     return true;
164   }
165 
Finish()166   grpc::Status Finish() override { return grpc::Status::OK; }
167 
WaitForInitialMetadata()168   void WaitForInitialMetadata() override {}  // Do nothing.
169 
170  private:
171   struct RowFilter {
172     std::set<string> row_set;
173     std::vector<std::pair<string, string>> row_ranges;
174     double row_sample = 0.0;  // Note: currently ignored.
175     std::unique_ptr<RE2> col_filter;
176     bool strip_values = false;
177     bool only_one_column = false;
178 
AllowRowtensorflow::ReadRowsResponse::RowFilter179     bool AllowRow(const string& row) {
180       if (row_set.find(row) != row_set.end()) {
181         return true;
182       }
183       for (const auto& range : row_ranges) {
184         if (range.first <= row && range.second > row) {
185           return true;
186         }
187       }
188       return false;
189     }
190 
AllowColumntensorflow::ReadRowsResponse::RowFilter191     bool AllowColumn(const string& col) {
192       if (col_filter) {
193         return RE2::FullMatch(col, *col_filter);
194       } else {
195         return true;
196       }
197     }
198   };
199 
MakeRowFilter()200   RowFilter MakeRowFilter() {
201     RowFilter filter;
202     for (auto i = request_.rows().row_keys().begin();
203          i != request_.rows().row_keys().end(); ++i) {
204       filter.row_set.insert(string(*i));
205     }
206     for (auto i = request_.rows().row_ranges().begin();
207          i != request_.rows().row_ranges().end(); ++i) {
208       if (i->start_key_case() !=
209               google::bigtable::v2::RowRange::kStartKeyClosed ||
210           i->end_key_case() != google::bigtable::v2::RowRange::kEndKeyOpen) {
211         LOG(WARNING) << "Skipping row range that cannot be processed: "
212                      << i->ShortDebugString();
213         continue;
214       }
215       filter.row_ranges.emplace_back(std::make_pair(
216           string(i->start_key_closed()), string(i->end_key_open())));
217     }
218     if (request_.filter().has_chain()) {
219       string family_filter;
220       string qualifier_filter;
221       for (auto i = request_.filter().chain().filters().begin();
222            i != request_.filter().chain().filters().end(); ++i) {
223         switch (i->filter_case()) {
224           case google::bigtable::v2::RowFilter::kFamilyNameRegexFilter:
225             family_filter = i->family_name_regex_filter();
226             break;
227           case google::bigtable::v2::RowFilter::kColumnQualifierRegexFilter:
228             qualifier_filter = i->column_qualifier_regex_filter();
229             break;
230           case google::bigtable::v2::RowFilter::kCellsPerColumnLimitFilter:
231             if (i->cells_per_column_limit_filter() != 1) {
232               LOG(ERROR) << "Unexpected cells_per_column_limit_filter: "
233                          << i->cells_per_column_limit_filter();
234             }
235             break;
236           case google::bigtable::v2::RowFilter::kStripValueTransformer:
237             filter.strip_values = i->strip_value_transformer();
238             break;
239           case google::bigtable::v2::RowFilter::kRowSampleFilter:
240             LOG(INFO) << "Ignoring row sample directive.";
241             break;
242           case google::bigtable::v2::RowFilter::kPassAllFilter:
243             break;
244           case google::bigtable::v2::RowFilter::kCellsPerRowLimitFilter:
245             filter.only_one_column = true;
246             break;
247           default:
248             LOG(WARNING) << "Ignoring unknown filter type: "
249                          << i->ShortDebugString();
250         }
251       }
252       if (family_filter.empty() || qualifier_filter.empty()) {
253         LOG(WARNING) << "Missing regex!";
254       } else {
255         string regex = strings::Printf("%s:%s", family_filter.c_str(),
256                                        qualifier_filter.c_str());
257         filter.col_filter.reset(new RE2(regex));
258       }
259     } else {
260       LOG(WARNING) << "Read request did not have a filter chain specified: "
261                    << request_.filter().DebugString();
262     }
263     return filter;
264   }
265 
266   mutex mu_;
267   bool sent_first_message_ GUARDED_BY(mu_) = false;
268   BigtableTestClient* client_;  // Not owned.
269   const google::bigtable::v2::ReadRowsRequest request_;
270 };
271 
272 class MutateRowsResponse : public grpc::ClientReaderInterface<
273                                google::bigtable::v2::MutateRowsResponse> {
274  public:
MutateRowsResponse(size_t num_successes)275   explicit MutateRowsResponse(size_t num_successes)
276       : num_successes_(num_successes) {}
277 
NextMessageSize(uint32_t * sz)278   bool NextMessageSize(uint32_t* sz) override {
279     mutex_lock l(mu_);
280     if (sent_first_message_) {
281       return false;
282     }
283     *sz = 10000000;  // A sufficiently high enough value to not worry about.
284     return true;
285   }
286 
Read(google::bigtable::v2::MutateRowsResponse * resp)287   bool Read(google::bigtable::v2::MutateRowsResponse* resp) override {
288     mutex_lock l(mu_);
289     if (sent_first_message_) {
290       return false;
291     }
292     sent_first_message_ = true;
293     *resp = google::bigtable::v2::MutateRowsResponse();
294     for (size_t i = 0; i < num_successes_; ++i) {
295       auto entry = resp->add_entries();
296       entry->set_index(i);
297     }
298     return true;
299   }
300 
Finish()301   grpc::Status Finish() override { return grpc::Status::OK; }
302 
WaitForInitialMetadata()303   void WaitForInitialMetadata() override {}  // Do nothing.
304 
305  private:
306   const size_t num_successes_;
307 
308   mutex mu_;
309   bool sent_first_message_ = false;
310 };
311 
MutateRow(grpc::ClientContext * context,google::bigtable::v2::MutateRowRequest const & request,google::bigtable::v2::MutateRowResponse * response)312 grpc::Status BigtableTestClient::MutateRow(
313     grpc::ClientContext* context,
314     google::bigtable::v2::MutateRowRequest const& request,
315     google::bigtable::v2::MutateRowResponse* response) {
316   mutex_lock l(mu_);
317   auto* row = &table_.rows[string(request.row_key())];
318   for (int i = 0; i < request.mutations_size(); ++i) {
319     UpdateRow(request.mutations(i), &row->columns);
320   }
321   *response = google::bigtable::v2::MutateRowResponse();
322   return grpc::Status::OK;
323 }
CheckAndMutateRow(grpc::ClientContext * context,google::bigtable::v2::CheckAndMutateRowRequest const & request,google::bigtable::v2::CheckAndMutateRowResponse * response)324 grpc::Status BigtableTestClient::CheckAndMutateRow(
325     grpc::ClientContext* context,
326     google::bigtable::v2::CheckAndMutateRowRequest const& request,
327     google::bigtable::v2::CheckAndMutateRowResponse* response) {
328   return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
329                       "CheckAndMutateRow not implemented.");
330 }
ReadModifyWriteRow(grpc::ClientContext * context,google::bigtable::v2::ReadModifyWriteRowRequest const & request,google::bigtable::v2::ReadModifyWriteRowResponse * response)331 grpc::Status BigtableTestClient::ReadModifyWriteRow(
332     grpc::ClientContext* context,
333     google::bigtable::v2::ReadModifyWriteRowRequest const& request,
334     google::bigtable::v2::ReadModifyWriteRowResponse* response) {
335   return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
336                       "ReadModifyWriteRow not implemented.");
337 }
338 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
339     google::bigtable::v2::ReadModifyWriteRowResponse>>
AsyncReadModifyWriteRow(grpc::ClientContext * context,google::bigtable::v2::ReadModifyWriteRowRequest const & request,grpc::CompletionQueue * cq)340 BigtableTestClient::AsyncReadModifyWriteRow(
341     grpc::ClientContext* context,
342     google::bigtable::v2::ReadModifyWriteRowRequest const& request,
343     grpc::CompletionQueue* cq) {
344   LOG(WARNING) << "Call to AsyncReadModifyWriteRow:" << __func__
345                << "(); this will likely cause a crash!";
346   return nullptr;
347 }
348 
349 std::unique_ptr<
350     grpc::ClientReaderInterface<google::bigtable::v2::ReadRowsResponse>>
ReadRows(grpc::ClientContext * context,google::bigtable::v2::ReadRowsRequest const & request)351 BigtableTestClient::ReadRows(
352     grpc::ClientContext* context,
353     google::bigtable::v2::ReadRowsRequest const& request) {
354   return MakeUnique<ReadRowsResponse>(this, request);
355 }
356 
357 std::unique_ptr<
358     grpc::ClientReaderInterface<google::bigtable::v2::SampleRowKeysResponse>>
SampleRowKeys(grpc::ClientContext * context,google::bigtable::v2::SampleRowKeysRequest const & request)359 BigtableTestClient::SampleRowKeys(
360     grpc::ClientContext* context,
361     google::bigtable::v2::SampleRowKeysRequest const& request) {
362   return MakeUnique<SampleRowKeysResponse>(this);
363 }
364 std::unique_ptr<
365     grpc::ClientReaderInterface<google::bigtable::v2::MutateRowsResponse>>
MutateRows(grpc::ClientContext * context,google::bigtable::v2::MutateRowsRequest const & request)366 BigtableTestClient::MutateRows(
367     grpc::ClientContext* context,
368     google::bigtable::v2::MutateRowsRequest const& request) {
369   mutex_lock l(mu_);
370   for (auto i = request.entries().begin(); i != request.entries().end(); ++i) {
371     auto* row = &table_.rows[string(i->row_key())];
372     for (auto mut = i->mutations().begin(); mut != i->mutations().end();
373          ++mut) {
374       UpdateRow(*mut, &row->columns);
375     }
376   }
377   return MakeUnique<MutateRowsResponse>(request.entries_size());
378 }
379 
380 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
381     google::bigtable::v2::MutateRowResponse>>
AsyncMutateRow(grpc::ClientContext * context,google::bigtable::v2::MutateRowRequest const & request,grpc::CompletionQueue * cq)382 BigtableTestClient::AsyncMutateRow(
383     grpc::ClientContext* context,
384     google::bigtable::v2::MutateRowRequest const& request,
385     grpc::CompletionQueue* cq) {
386   LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
387                << "(); this will likely cause a crash!";
388   return nullptr;
389 }
390 
391 std::unique_ptr<::grpc::ClientAsyncReaderInterface<
392     ::google::bigtable::v2::SampleRowKeysResponse>>
AsyncSampleRowKeys(::grpc::ClientContext * context,const::google::bigtable::v2::SampleRowKeysRequest & request,::grpc::CompletionQueue * cq,void * tag)393 BigtableTestClient::AsyncSampleRowKeys(
394     ::grpc::ClientContext* context,
395     const ::google::bigtable::v2::SampleRowKeysRequest& request,
396     ::grpc::CompletionQueue* cq, void* tag) {
397   LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
398                << "(); this will likely cause a crash!";
399   return nullptr;
400 }
401 
402 std::unique_ptr<::grpc::ClientAsyncReaderInterface<
403     ::google::bigtable::v2::MutateRowsResponse>>
AsyncMutateRows(::grpc::ClientContext * context,const::google::bigtable::v2::MutateRowsRequest & request,::grpc::CompletionQueue * cq,void * tag)404 BigtableTestClient::AsyncMutateRows(
405     ::grpc::ClientContext* context,
406     const ::google::bigtable::v2::MutateRowsRequest& request,
407     ::grpc::CompletionQueue* cq, void* tag) {
408   LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
409                << "(); this will likely cause a crash!";
410   return nullptr;
411 }
412 
413 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
414     google::bigtable::v2::CheckAndMutateRowResponse>>
AsyncCheckAndMutateRow(grpc::ClientContext * context,const google::bigtable::v2::CheckAndMutateRowRequest & request,grpc::CompletionQueue * cq)415 BigtableTestClient::AsyncCheckAndMutateRow(
416     grpc::ClientContext* context,
417     const google::bigtable::v2::CheckAndMutateRowRequest& request,
418     grpc::CompletionQueue* cq) {
419   LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
420                << "(); this will likely cause a crash!";
421   return nullptr;
422 }
423 
424 std::unique_ptr<
425     grpc::ClientAsyncReaderInterface<google::bigtable::v2::ReadRowsResponse>>
AsyncReadRows(grpc::ClientContext * context,const google::bigtable::v2::ReadRowsRequest & request,grpc::CompletionQueue * cq,void * tag)426 BigtableTestClient::AsyncReadRows(
427     grpc::ClientContext* context,
428     const google::bigtable::v2::ReadRowsRequest& request,
429     grpc::CompletionQueue* cq, void* tag) {
430   LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
431                << "(); this will likely cause a crash!";
432   return nullptr;
433 }
434 
Channel()435 std::shared_ptr<grpc::Channel> BigtableTestClient::Channel() {
436   LOG(WARNING) << "Call to InMemoryDataClient::Channel(); this will likely "
437                   "cause a crash!";
438   return nullptr;
439 }
440 }  // namespace tensorflow
441