1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/contrib/bigtable/kernels/test_kernels/bigtable_test_client.h"
17
18 #include "external/com_github_googleapis_googleapis/google/bigtable/v2/data.pb.h"
19 #include "google/protobuf/wrappers.pb.h"
20 #include "re2/re2.h"
21 #include "tensorflow/core/lib/strings/stringprintf.h"
22 #include "tensorflow/core/util/ptr_util.h"
23 // #include "util/task/codes.pb.h"
24
25 namespace tensorflow {
26 namespace {
27
UpdateRow(const::google::bigtable::v2::Mutation & mut,std::map<string,string> * row)28 void UpdateRow(const ::google::bigtable::v2::Mutation& mut,
29 std::map<string, string>* row) {
30 if (mut.has_set_cell()) {
31 CHECK(mut.set_cell().timestamp_micros() >= -1)
32 << "Timestamp_micros: " << mut.set_cell().timestamp_micros();
33 auto col =
34 strings::Printf("%s:%s", mut.set_cell().family_name().c_str(),
35 string(mut.set_cell().column_qualifier()).c_str());
36 (*row)[col] = string(mut.set_cell().value());
37 } else if (mut.has_delete_from_column()) {
38 auto col = strings::Printf(
39 "%s:%s", mut.delete_from_column().family_name().c_str(),
40 string(mut.delete_from_column().column_qualifier()).c_str());
41 row->erase(col);
42 } else if (mut.has_delete_from_family()) {
43 auto itr = row->lower_bound(mut.delete_from_family().family_name());
44 auto prefix =
45 strings::Printf("%s:", mut.delete_from_family().family_name().c_str());
46 while (itr != row->end() && itr->first.substr(0, prefix.size()) == prefix) {
47 row->erase(itr);
48 }
49 } else if (mut.has_delete_from_row()) {
50 row->clear();
51 } else {
52 LOG(ERROR) << "Unknown mutation: " << mut.ShortDebugString();
53 }
54 }
55
56 } // namespace
57
58 class SampleRowKeysResponse : public grpc::ClientReaderInterface<
59 google::bigtable::v2::SampleRowKeysResponse> {
60 public:
SampleRowKeysResponse(BigtableTestClient * client)61 explicit SampleRowKeysResponse(BigtableTestClient* client)
62 : client_(client) {}
63
NextMessageSize(uint32_t * sz)64 bool NextMessageSize(uint32_t* sz) override {
65 mutex_lock l(mu_);
66 mutex_lock l2(client_->mu_);
67 if (num_messages_sent_ * 2 < client_->table_.rows.size()) {
68 *sz = 10000; // A sufficiently high enough value to not worry about.
69 return true;
70 }
71 return false;
72 }
73
Read(google::bigtable::v2::SampleRowKeysResponse * resp)74 bool Read(google::bigtable::v2::SampleRowKeysResponse* resp) override {
75 // Send every other key from the table.
76 mutex_lock l(mu_);
77 mutex_lock l2(client_->mu_);
78 *resp = google::bigtable::v2::SampleRowKeysResponse();
79 auto itr = client_->table_.rows.begin();
80 for (uint64 i = 0; i < 2 * num_messages_sent_; ++i) {
81 ++itr;
82 if (itr == client_->table_.rows.end()) {
83 return false;
84 }
85 }
86 resp->set_row_key(itr->first);
87 resp->set_offset_bytes(100 * num_messages_sent_);
88 num_messages_sent_++;
89 return true;
90 }
91
Finish()92 grpc::Status Finish() override { return grpc::Status::OK; }
93
WaitForInitialMetadata()94 void WaitForInitialMetadata() override {} // Do nothing.
95
96 private:
97 mutex mu_;
98 int64 num_messages_sent_ GUARDED_BY(mu_) = 0;
99 BigtableTestClient* client_; // Not owned.
100 };
101
102 class ReadRowsResponse : public grpc::ClientReaderInterface<
103 google::bigtable::v2::ReadRowsResponse> {
104 public:
ReadRowsResponse(BigtableTestClient * client,google::bigtable::v2::ReadRowsRequest const & request)105 ReadRowsResponse(BigtableTestClient* client,
106 google::bigtable::v2::ReadRowsRequest const& request)
107 : client_(client), request_(request) {}
108
NextMessageSize(uint32_t * sz)109 bool NextMessageSize(uint32_t* sz) override {
110 mutex_lock l(mu_);
111 if (sent_first_message_) {
112 return false;
113 }
114 *sz = 10000000; // A sufficiently high enough value to not worry about.
115 return true;
116 }
117
Read(google::bigtable::v2::ReadRowsResponse * resp)118 bool Read(google::bigtable::v2::ReadRowsResponse* resp) override {
119 mutex_lock l(mu_);
120 if (sent_first_message_) {
121 return false;
122 }
123 sent_first_message_ = true;
124 RowFilter filter = MakeRowFilter();
125
126 mutex_lock l2(client_->mu_);
127 *resp = google::bigtable::v2::ReadRowsResponse();
128 // Send all contents in first response.
129 for (auto itr = client_->table_.rows.begin();
130 itr != client_->table_.rows.end(); ++itr) {
131 if (filter.AllowRow(itr->first)) {
132 ::google::bigtable::v2::ReadRowsResponse_CellChunk* chunk = nullptr;
133 bool sent_first = false;
134 for (auto col_itr = itr->second.columns.begin();
135 col_itr != itr->second.columns.end(); ++col_itr) {
136 if (filter.AllowColumn(col_itr->first)) {
137 chunk = resp->add_chunks();
138 if (!sent_first) {
139 sent_first = true;
140 chunk->set_row_key(itr->first);
141 }
142 auto colon_idx = col_itr->first.find(":");
143 CHECK(colon_idx != string::npos)
144 << "No ':' found in: " << col_itr->first;
145 chunk->mutable_family_name()->set_value(
146 string(col_itr->first, 0, colon_idx));
147 chunk->mutable_qualifier()->set_value(
148 string(col_itr->first, ++colon_idx));
149 if (!filter.strip_values) {
150 chunk->set_value(col_itr->second);
151 }
152 if (filter.only_one_column) {
153 break;
154 }
155 }
156 }
157 if (sent_first) {
158 // We are sending this row, so set the commit flag on the last chunk.
159 chunk->set_commit_row(true);
160 }
161 }
162 }
163 return true;
164 }
165
Finish()166 grpc::Status Finish() override { return grpc::Status::OK; }
167
WaitForInitialMetadata()168 void WaitForInitialMetadata() override {} // Do nothing.
169
170 private:
171 struct RowFilter {
172 std::set<string> row_set;
173 std::vector<std::pair<string, string>> row_ranges;
174 double row_sample = 0.0; // Note: currently ignored.
175 std::unique_ptr<RE2> col_filter;
176 bool strip_values = false;
177 bool only_one_column = false;
178
AllowRowtensorflow::ReadRowsResponse::RowFilter179 bool AllowRow(const string& row) {
180 if (row_set.find(row) != row_set.end()) {
181 return true;
182 }
183 for (const auto& range : row_ranges) {
184 if (range.first <= row && range.second > row) {
185 return true;
186 }
187 }
188 return false;
189 }
190
AllowColumntensorflow::ReadRowsResponse::RowFilter191 bool AllowColumn(const string& col) {
192 if (col_filter) {
193 return RE2::FullMatch(col, *col_filter);
194 } else {
195 return true;
196 }
197 }
198 };
199
MakeRowFilter()200 RowFilter MakeRowFilter() {
201 RowFilter filter;
202 for (auto i = request_.rows().row_keys().begin();
203 i != request_.rows().row_keys().end(); ++i) {
204 filter.row_set.insert(string(*i));
205 }
206 for (auto i = request_.rows().row_ranges().begin();
207 i != request_.rows().row_ranges().end(); ++i) {
208 if (i->start_key_case() !=
209 google::bigtable::v2::RowRange::kStartKeyClosed ||
210 i->end_key_case() != google::bigtable::v2::RowRange::kEndKeyOpen) {
211 LOG(WARNING) << "Skipping row range that cannot be processed: "
212 << i->ShortDebugString();
213 continue;
214 }
215 filter.row_ranges.emplace_back(std::make_pair(
216 string(i->start_key_closed()), string(i->end_key_open())));
217 }
218 if (request_.filter().has_chain()) {
219 string family_filter;
220 string qualifier_filter;
221 for (auto i = request_.filter().chain().filters().begin();
222 i != request_.filter().chain().filters().end(); ++i) {
223 switch (i->filter_case()) {
224 case google::bigtable::v2::RowFilter::kFamilyNameRegexFilter:
225 family_filter = i->family_name_regex_filter();
226 break;
227 case google::bigtable::v2::RowFilter::kColumnQualifierRegexFilter:
228 qualifier_filter = i->column_qualifier_regex_filter();
229 break;
230 case google::bigtable::v2::RowFilter::kCellsPerColumnLimitFilter:
231 if (i->cells_per_column_limit_filter() != 1) {
232 LOG(ERROR) << "Unexpected cells_per_column_limit_filter: "
233 << i->cells_per_column_limit_filter();
234 }
235 break;
236 case google::bigtable::v2::RowFilter::kStripValueTransformer:
237 filter.strip_values = i->strip_value_transformer();
238 break;
239 case google::bigtable::v2::RowFilter::kRowSampleFilter:
240 LOG(INFO) << "Ignoring row sample directive.";
241 break;
242 case google::bigtable::v2::RowFilter::kPassAllFilter:
243 break;
244 case google::bigtable::v2::RowFilter::kCellsPerRowLimitFilter:
245 filter.only_one_column = true;
246 break;
247 default:
248 LOG(WARNING) << "Ignoring unknown filter type: "
249 << i->ShortDebugString();
250 }
251 }
252 if (family_filter.empty() || qualifier_filter.empty()) {
253 LOG(WARNING) << "Missing regex!";
254 } else {
255 string regex = strings::Printf("%s:%s", family_filter.c_str(),
256 qualifier_filter.c_str());
257 filter.col_filter.reset(new RE2(regex));
258 }
259 } else {
260 LOG(WARNING) << "Read request did not have a filter chain specified: "
261 << request_.filter().DebugString();
262 }
263 return filter;
264 }
265
266 mutex mu_;
267 bool sent_first_message_ GUARDED_BY(mu_) = false;
268 BigtableTestClient* client_; // Not owned.
269 const google::bigtable::v2::ReadRowsRequest request_;
270 };
271
272 class MutateRowsResponse : public grpc::ClientReaderInterface<
273 google::bigtable::v2::MutateRowsResponse> {
274 public:
MutateRowsResponse(size_t num_successes)275 explicit MutateRowsResponse(size_t num_successes)
276 : num_successes_(num_successes) {}
277
NextMessageSize(uint32_t * sz)278 bool NextMessageSize(uint32_t* sz) override {
279 mutex_lock l(mu_);
280 if (sent_first_message_) {
281 return false;
282 }
283 *sz = 10000000; // A sufficiently high enough value to not worry about.
284 return true;
285 }
286
Read(google::bigtable::v2::MutateRowsResponse * resp)287 bool Read(google::bigtable::v2::MutateRowsResponse* resp) override {
288 mutex_lock l(mu_);
289 if (sent_first_message_) {
290 return false;
291 }
292 sent_first_message_ = true;
293 *resp = google::bigtable::v2::MutateRowsResponse();
294 for (size_t i = 0; i < num_successes_; ++i) {
295 auto entry = resp->add_entries();
296 entry->set_index(i);
297 }
298 return true;
299 }
300
Finish()301 grpc::Status Finish() override { return grpc::Status::OK; }
302
WaitForInitialMetadata()303 void WaitForInitialMetadata() override {} // Do nothing.
304
305 private:
306 const size_t num_successes_;
307
308 mutex mu_;
309 bool sent_first_message_ = false;
310 };
311
MutateRow(grpc::ClientContext * context,google::bigtable::v2::MutateRowRequest const & request,google::bigtable::v2::MutateRowResponse * response)312 grpc::Status BigtableTestClient::MutateRow(
313 grpc::ClientContext* context,
314 google::bigtable::v2::MutateRowRequest const& request,
315 google::bigtable::v2::MutateRowResponse* response) {
316 mutex_lock l(mu_);
317 auto* row = &table_.rows[string(request.row_key())];
318 for (int i = 0; i < request.mutations_size(); ++i) {
319 UpdateRow(request.mutations(i), &row->columns);
320 }
321 *response = google::bigtable::v2::MutateRowResponse();
322 return grpc::Status::OK;
323 }
CheckAndMutateRow(grpc::ClientContext * context,google::bigtable::v2::CheckAndMutateRowRequest const & request,google::bigtable::v2::CheckAndMutateRowResponse * response)324 grpc::Status BigtableTestClient::CheckAndMutateRow(
325 grpc::ClientContext* context,
326 google::bigtable::v2::CheckAndMutateRowRequest const& request,
327 google::bigtable::v2::CheckAndMutateRowResponse* response) {
328 return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
329 "CheckAndMutateRow not implemented.");
330 }
ReadModifyWriteRow(grpc::ClientContext * context,google::bigtable::v2::ReadModifyWriteRowRequest const & request,google::bigtable::v2::ReadModifyWriteRowResponse * response)331 grpc::Status BigtableTestClient::ReadModifyWriteRow(
332 grpc::ClientContext* context,
333 google::bigtable::v2::ReadModifyWriteRowRequest const& request,
334 google::bigtable::v2::ReadModifyWriteRowResponse* response) {
335 return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
336 "ReadModifyWriteRow not implemented.");
337 }
338 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
339 google::bigtable::v2::ReadModifyWriteRowResponse>>
AsyncReadModifyWriteRow(grpc::ClientContext * context,google::bigtable::v2::ReadModifyWriteRowRequest const & request,grpc::CompletionQueue * cq)340 BigtableTestClient::AsyncReadModifyWriteRow(
341 grpc::ClientContext* context,
342 google::bigtable::v2::ReadModifyWriteRowRequest const& request,
343 grpc::CompletionQueue* cq) {
344 LOG(WARNING) << "Call to AsyncReadModifyWriteRow:" << __func__
345 << "(); this will likely cause a crash!";
346 return nullptr;
347 }
348
349 std::unique_ptr<
350 grpc::ClientReaderInterface<google::bigtable::v2::ReadRowsResponse>>
ReadRows(grpc::ClientContext * context,google::bigtable::v2::ReadRowsRequest const & request)351 BigtableTestClient::ReadRows(
352 grpc::ClientContext* context,
353 google::bigtable::v2::ReadRowsRequest const& request) {
354 return MakeUnique<ReadRowsResponse>(this, request);
355 }
356
357 std::unique_ptr<
358 grpc::ClientReaderInterface<google::bigtable::v2::SampleRowKeysResponse>>
SampleRowKeys(grpc::ClientContext * context,google::bigtable::v2::SampleRowKeysRequest const & request)359 BigtableTestClient::SampleRowKeys(
360 grpc::ClientContext* context,
361 google::bigtable::v2::SampleRowKeysRequest const& request) {
362 return MakeUnique<SampleRowKeysResponse>(this);
363 }
364 std::unique_ptr<
365 grpc::ClientReaderInterface<google::bigtable::v2::MutateRowsResponse>>
MutateRows(grpc::ClientContext * context,google::bigtable::v2::MutateRowsRequest const & request)366 BigtableTestClient::MutateRows(
367 grpc::ClientContext* context,
368 google::bigtable::v2::MutateRowsRequest const& request) {
369 mutex_lock l(mu_);
370 for (auto i = request.entries().begin(); i != request.entries().end(); ++i) {
371 auto* row = &table_.rows[string(i->row_key())];
372 for (auto mut = i->mutations().begin(); mut != i->mutations().end();
373 ++mut) {
374 UpdateRow(*mut, &row->columns);
375 }
376 }
377 return MakeUnique<MutateRowsResponse>(request.entries_size());
378 }
379
380 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
381 google::bigtable::v2::MutateRowResponse>>
AsyncMutateRow(grpc::ClientContext * context,google::bigtable::v2::MutateRowRequest const & request,grpc::CompletionQueue * cq)382 BigtableTestClient::AsyncMutateRow(
383 grpc::ClientContext* context,
384 google::bigtable::v2::MutateRowRequest const& request,
385 grpc::CompletionQueue* cq) {
386 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
387 << "(); this will likely cause a crash!";
388 return nullptr;
389 }
390
391 std::unique_ptr<::grpc::ClientAsyncReaderInterface<
392 ::google::bigtable::v2::SampleRowKeysResponse>>
AsyncSampleRowKeys(::grpc::ClientContext * context,const::google::bigtable::v2::SampleRowKeysRequest & request,::grpc::CompletionQueue * cq,void * tag)393 BigtableTestClient::AsyncSampleRowKeys(
394 ::grpc::ClientContext* context,
395 const ::google::bigtable::v2::SampleRowKeysRequest& request,
396 ::grpc::CompletionQueue* cq, void* tag) {
397 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
398 << "(); this will likely cause a crash!";
399 return nullptr;
400 }
401
402 std::unique_ptr<::grpc::ClientAsyncReaderInterface<
403 ::google::bigtable::v2::MutateRowsResponse>>
AsyncMutateRows(::grpc::ClientContext * context,const::google::bigtable::v2::MutateRowsRequest & request,::grpc::CompletionQueue * cq,void * tag)404 BigtableTestClient::AsyncMutateRows(
405 ::grpc::ClientContext* context,
406 const ::google::bigtable::v2::MutateRowsRequest& request,
407 ::grpc::CompletionQueue* cq, void* tag) {
408 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
409 << "(); this will likely cause a crash!";
410 return nullptr;
411 }
412
413 std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<
414 google::bigtable::v2::CheckAndMutateRowResponse>>
AsyncCheckAndMutateRow(grpc::ClientContext * context,const google::bigtable::v2::CheckAndMutateRowRequest & request,grpc::CompletionQueue * cq)415 BigtableTestClient::AsyncCheckAndMutateRow(
416 grpc::ClientContext* context,
417 const google::bigtable::v2::CheckAndMutateRowRequest& request,
418 grpc::CompletionQueue* cq) {
419 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
420 << "(); this will likely cause a crash!";
421 return nullptr;
422 }
423
424 std::unique_ptr<
425 grpc::ClientAsyncReaderInterface<google::bigtable::v2::ReadRowsResponse>>
AsyncReadRows(grpc::ClientContext * context,const google::bigtable::v2::ReadRowsRequest & request,grpc::CompletionQueue * cq,void * tag)426 BigtableTestClient::AsyncReadRows(
427 grpc::ClientContext* context,
428 const google::bigtable::v2::ReadRowsRequest& request,
429 grpc::CompletionQueue* cq, void* tag) {
430 LOG(WARNING) << "Call to InMemoryDataClient::" << __func__
431 << "(); this will likely cause a crash!";
432 return nullptr;
433 }
434
Channel()435 std::shared_ptr<grpc::Channel> BigtableTestClient::Channel() {
436 LOG(WARNING) << "Call to InMemoryDataClient::Channel(); this will likely "
437 "cause a crash!";
438 return nullptr;
439 }
440 } // namespace tensorflow
441