1 
2 /*
3  * Copyright (C) 2020 The Android Open Source Project
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include "src/trace_processor/rpc/query_result_serializer.h"
19 
20 #include <deque>
21 #include <ostream>
22 #include <random>
23 #include <string>
24 #include <vector>
25 
26 #include "perfetto/ext/base/string_utils.h"
27 #include "perfetto/trace_processor/basic_types.h"
28 #include "perfetto/trace_processor/trace_processor.h"
29 #include "test/gtest_and_gmock.h"
30 
31 #include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
32 
33 namespace perfetto {
34 namespace trace_processor {
35 
36 // For ASSERT_THAT(ElementsAre(...))
operator ==(const SqlValue & a,const SqlValue & b)37 inline bool operator==(const SqlValue& a, const SqlValue& b) {
38   if (a.type != b.type)
39     return false;
40   if (a.type == SqlValue::kString)
41     return strcmp(a.string_value, b.string_value) == 0;
42   if (a.type == SqlValue::kBytes) {
43     if (a.bytes_count != b.bytes_count)
44       return false;
45     return memcmp(a.bytes_value, b.bytes_value, a.bytes_count) == 0;
46   }
47   return a.long_value == b.long_value;
48 }
49 
operator <<(std::ostream & stream,const SqlValue & v)50 inline std::ostream& operator<<(std::ostream& stream, const SqlValue& v) {
51   stream << "SqlValue{";
52   switch (v.type) {
53     case SqlValue::kString:
54       return stream << "\"" << v.string_value << "\"}";
55     case SqlValue::kBytes:
56       return stream << "Bytes[" << v.bytes_count << "]:"
57                     << base::ToHex(reinterpret_cast<const char*>(v.bytes_value),
58                                    v.bytes_count)
59                     << "}";
60     case SqlValue::kLong:
61       return stream << "Long " << v.long_value << "}";
62     case SqlValue::kDouble:
63       return stream << "Double " << v.double_value << "}";
64     case SqlValue::kNull:
65       return stream << "NULL}";
66   }
67   return stream;
68 }
69 
70 namespace {
71 
72 using ::testing::ElementsAre;
73 using BatchProto = protos::pbzero::QueryResult::CellsBatch;
74 using ResultProto = protos::pbzero::QueryResult;
75 
RunQueryChecked(TraceProcessor * tp,const std::string & query)76 void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
77   auto iter = tp->ExecuteQuery(query);
78   iter.Next();
79   ASSERT_TRUE(iter.Status().ok()) << iter.Status().message();
80 }
81 
82 // Implements a minimal deserializer for QueryResultSerializer.
83 class TestDeserializer {
84  public:
85   void SerializeAndDeserialize(QueryResultSerializer*);
86   void DeserializeBuffer(const uint8_t* start, size_t size);
87 
88   std::vector<std::string> columns;
89   std::vector<SqlValue> cells;
90   std::string error;
91   bool eof_reached = false;
92 
93  private:
94   std::vector<std::unique_ptr<char[]>> copied_buf_;
95 };
96 
SerializeAndDeserialize(QueryResultSerializer * serializer)97 void TestDeserializer::SerializeAndDeserialize(
98     QueryResultSerializer* serializer) {
99   std::vector<uint8_t> buf;
100   error.clear();
101   for (eof_reached = false; !eof_reached;) {
102     serializer->Serialize(&buf);
103     DeserializeBuffer(buf.data(), buf.size());
104     buf.clear();
105   }
106 }
107 
DeserializeBuffer(const uint8_t * start,size_t size)108 void TestDeserializer::DeserializeBuffer(const uint8_t* start, size_t size) {
109   ResultProto::Decoder result(start, size);
110   error += result.error().ToStdString();
111   for (auto it = result.column_names(); it; ++it)
112     columns.push_back(it->as_std_string());
113 
114   for (auto batch_it = result.batch(); batch_it; ++batch_it) {
115     ASSERT_FALSE(eof_reached);
116     auto batch_bytes = batch_it->as_bytes();
117 
118     ResultProto::CellsBatch::Decoder batch(batch_bytes.data, batch_bytes.size);
119     eof_reached = batch.is_last_batch();
120     std::deque<int64_t> varints;
121     std::deque<double> doubles;
122     std::deque<std::string> blobs;
123 
124     bool parse_error = false;
125     for (auto it = batch.varint_cells(&parse_error); it; ++it)
126       varints.emplace_back(*it);
127 
128     for (auto it = batch.float64_cells(&parse_error); it; ++it)
129       doubles.emplace_back(*it);
130 
131     for (auto it = batch.blob_cells(); it; ++it)
132       blobs.emplace_back((*it).ToStdString());
133 
134     std::string merged_strings = batch.string_cells().ToStdString();
135     std::deque<std::string> strings;
136     for (size_t pos = 0; pos < merged_strings.size();) {
137       // Will return npos for the last string, but it's fine
138       size_t next_sep = merged_strings.find('\0', pos);
139       strings.emplace_back(merged_strings.substr(pos, next_sep - pos));
140       pos = next_sep == std::string::npos ? next_sep : next_sep + 1;
141     }
142 
143     uint32_t num_cells = 0;
144     for (auto it = batch.cells(&parse_error); it; ++it, ++num_cells) {
145       uint8_t cell_type = static_cast<uint8_t>(*it);
146       switch (cell_type) {
147         case BatchProto::CELL_INVALID:
148           break;
149         case BatchProto::CELL_NULL:
150           cells.emplace_back(SqlValue());
151           break;
152         case BatchProto::CELL_VARINT:
153           ASSERT_GT(varints.size(), 0u);
154           cells.emplace_back(SqlValue::Long(varints.front()));
155           varints.pop_front();
156           break;
157         case BatchProto::CELL_FLOAT64:
158           ASSERT_GT(doubles.size(), 0u);
159           cells.emplace_back(SqlValue::Double(doubles.front()));
160           doubles.pop_front();
161           break;
162         case BatchProto::CELL_STRING: {
163           ASSERT_GT(strings.size(), 0u);
164           const std::string& str = strings.front();
165           copied_buf_.emplace_back(new char[str.size() + 1]);
166           char* new_buf = copied_buf_.back().get();
167           memcpy(new_buf, str.c_str(), str.size() + 1);
168           cells.emplace_back(SqlValue::String(new_buf));
169           strings.pop_front();
170           break;
171         }
172         case BatchProto::CELL_BLOB: {
173           ASSERT_GT(blobs.size(), 0u);
174           auto bytes = blobs.front();
175           copied_buf_.emplace_back(new char[bytes.size()]);
176           memcpy(copied_buf_.back().get(), bytes.data(), bytes.size());
177           cells.emplace_back(
178               SqlValue::Bytes(copied_buf_.back().get(), bytes.size()));
179           blobs.pop_front();
180           break;
181         }
182         default:
183           FAIL() << "Unknown cell type " << cell_type;
184       }
185 
186       EXPECT_FALSE(parse_error);
187     }
188     if (columns.empty()) {
189       EXPECT_EQ(num_cells, 0u);
190     } else {
191       EXPECT_EQ(num_cells % columns.size(), 0u);
192     }
193   }
194 }
195 
// A single row with one column per supported value kind must round-trip with
// the right column names and the right cell values.
TEST(QueryResultSerializerTest, ShortBatch) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  QueryResultSerializer serializer(tp->ExecuteQuery(
      "select 1 as i8, 128 as i16, 100000 as i32, 42001001001 as i64, 1e9 as "
      "f64, 'a_string' as str, cast('a_blob' as blob) as blb"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_THAT(decoded.columns,
              ElementsAre("i8", "i16", "i32", "i64", "f64", "str", "blb"));
  EXPECT_THAT(
      decoded.cells,
      ElementsAre(SqlValue::Long(1), SqlValue::Long(128),
                  SqlValue::Long(100000), SqlValue::Long(42001001001),
                  SqlValue::Double(1e9), SqlValue::String("a_string"),
                  SqlValue::Bytes("a_blob", 6)));
}
214 
// Serializes a result with many rows using the serializer's default batch
// limits and verifies every decoded cell.
TEST(QueryResultSerializerTest, LongBatch) {
  // Must match the window_dur (with quantum=1) configured below.
  static constexpr uint32_t kNumRows = 8192;
  static constexpr uint32_t kNumCols = 4;

  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  RunQueryChecked(tp.get(), "create virtual table win using window;");
  RunQueryChecked(tp.get(),
                  "update win set window_start=0, window_dur=8192, quantum=1 "
                  "where rowid = 0");

  auto iter = tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
  QueryResultSerializer ser(std::move(iter));

  TestDeserializer deser;
  deser.SerializeAndDeserialize(&ser);

  ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(deser.cells.size(), kNumCols * kNumRows);

  // Check all the rows, not just a prefix, so that a corruption which only
  // shows up in the later part of the output cannot go unnoticed.
  for (uint32_t row = 0; row < kNumRows; row++) {
    uint32_t cell = row * kNumCols;
    ASSERT_EQ(deser.cells[cell].type, SqlValue::kString);
    ASSERT_STREQ(deser.cells[cell].string_value, "x");

    ASSERT_EQ(deser.cells[cell + 1].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 1].long_value, row);

    ASSERT_EQ(deser.cells[cell + 2].type, SqlValue::kDouble);
    ASSERT_EQ(deser.cells[cell + 2].double_value, 1.0);

    ASSERT_EQ(deser.cells[cell + 3].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 3].long_value, row);
  }
}
247 
// 1024 rows of 4 columns, serialized with batch limits (1024, 32) set through
// set_batch_size_for_testing(). All 4096 cells must still be decoded.
TEST(QueryResultSerializerTest, BatchSaturatingBinaryPayload) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  RunQueryChecked(tp.get(), "create virtual table win using window;");
  RunQueryChecked(tp.get(),
                  "update win set window_start=0, window_dur=1024, quantum=1 "
                  "where rowid = 0");

  QueryResultSerializer serializer(tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
  serializer.set_batch_size_for_testing(1024, 32);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 1024 * 4u);
}
266 
// 4 rows of 4 columns, serialized with batch limits (16, 4096) set through
// set_batch_size_for_testing(). Exactly 16 cells must be decoded.
TEST(QueryResultSerializerTest, BatchSaturatingNumCells) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  RunQueryChecked(tp.get(), "create virtual table win using window;");
  RunQueryChecked(tp.get(),
                  "update win set window_start=0, window_dur=4, quantum=1 "
                  "where rowid = 0");

  QueryResultSerializer serializer(tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
  serializer.set_batch_size_for_testing(16, 4096);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 16u);
}
285 
// Round-trips 32 single-column rows holding a mix of NULLs, blobs and strings.
// Payload sizes are a pseudo-random multiple (0 to 3) of 32 KiB. Every decoded
// cell is compared against the value that was inserted.
TEST(QueryResultSerializerTest, LargeStringAndBlobs) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(tp.get(), "create table tab (colz);");

  std::minstd_rand0 rnd_engine(0);
  // |expected| holds raw pointers into these strings: a deque keeps existing
  // elements in place when new ones are appended.
  std::deque<std::string> payloads;
  std::vector<SqlValue> expected;
  std::string sql_values;

  for (size_t n = 0; n < 32; n++) {
    const size_t len = (rnd_engine() % 4) * 32 * 1024;
    std::string payload(len, '\0');
    for (size_t i = 0; i < len; i++)
      payload[i] = 'A' + ((n * 11 + i) % 25);

    switch (n % 4) {
      case 0:  // NULL: the payload generated above is simply dropped.
        sql_values += "(NULL),";
        expected.emplace_back(SqlValue());
        break;
      case 1:  // Blob, spelled as a hex literal.
        sql_values += "(X'" + base::ToHex(payload) + "'),";
        payloads.emplace_back(std::move(payload));
        expected.emplace_back(
            SqlValue::Bytes(payloads.back().data(), payloads.back().size()));
        break;
      default:  // String.
        sql_values += "('" + payload + "'),";
        payloads.emplace_back(std::move(payload));
        expected.emplace_back(SqlValue::String(payloads.back().c_str()));
        break;
    }
  }
  sql_values.pop_back();  // Drop the trailing comma.
  RunQueryChecked(tp.get(), "insert into tab (colz) values " + sql_values);

  QueryResultSerializer serializer(tp->ExecuteQuery("select colz from tab"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_EQ(decoded.cells.size(), expected.size());
  for (size_t i = 0; i < expected.size(); i++) {
    EXPECT_EQ(decoded.cells[i], expected[i]) << "Cell " << i;
  }
}
328 
TEST(QueryResultSerializerTest,RandomSizes)329 TEST(QueryResultSerializerTest, RandomSizes) {
330   auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
331   static constexpr uint32_t kNumCells = 3 * 1000;
332 
333   RunQueryChecked(tp.get(), "create table tab (a, b, c);");
334   std::vector<SqlValue> expected;
335   expected.reserve(kNumCells);
336   std::deque<std::string> string_buf;  // Needs stable pointers
337   std::minstd_rand0 rnd_engine(0);
338   std::string insert_values;
339 
340   for (uint32_t i = 0; i < kNumCells; i++) {
341     const uint32_t col = i % 3;
342     if (col == 0)
343       insert_values += "(";
344     int type = rnd_engine() % 5;
345     if (type == 0) {
346       expected.emplace_back(SqlValue());  // NULL
347       insert_values += "NULL";
348     } else if (type == 1) {
349       expected.emplace_back(SqlValue::Long(static_cast<long>(rnd_engine())));
350       insert_values += std::to_string(expected.back().long_value);
351     } else if (type == 2) {
352       expected.emplace_back(
353           SqlValue::Double(static_cast<double>(rnd_engine())));
354       insert_values += std::to_string(expected.back().double_value);
355     } else if (type == 3 || type == 4) {
356       size_t len = (rnd_engine() % 5) * 32;
357       std::string rndstr;
358       rndstr.resize(len);
359       for (size_t n = 0; n < len; n++)
360         rndstr[n] = static_cast<char>(rnd_engine() % 256);
361       auto rndstr_hex = base::ToHex(rndstr);
362       if (type == 3) {
363         insert_values += "\"" + rndstr_hex + "\"";
364         string_buf.emplace_back(std::move(rndstr_hex));
365         expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
366 
367       } else {
368         insert_values += "X'" + rndstr_hex + "'";
369         string_buf.emplace_back(std::move(rndstr));
370         expected.emplace_back(SqlValue::Bytes(string_buf.back().data(),
371                                               string_buf.back().size()));
372       }
373     }
374 
375     if (col < 2) {
376       insert_values += ",";
377     } else {
378       insert_values += "),";
379       if (insert_values.size() > 1024 * 1024 || i == kNumCells - 1) {
380         insert_values[insert_values.size() - 1] = ';';
381         auto query = "insert into tab (a,b,c) values " + insert_values;
382         insert_values = "";
383         RunQueryChecked(tp.get(), query);
384       }
385     }
386   }
387 
388   // Serialize and de-serialize with different batch and payload sizes.
389   for (int rep = 0; rep < 10; rep++) {
390     auto iter = tp->ExecuteQuery("select * from tab");
391     QueryResultSerializer ser(std::move(iter));
392     uint32_t cells_per_batch = 1 << (rnd_engine() % 8 + 2);
393     uint32_t binary_payload_size = 1 << (rnd_engine() % 8 + 8);
394     ser.set_batch_size_for_testing(cells_per_batch, binary_payload_size);
395     TestDeserializer deser;
396     deser.SerializeAndDeserialize(&ser);
397     ASSERT_EQ(deser.cells.size(), expected.size());
398     for (size_t i = 0; i < expected.size(); i++) {
399       EXPECT_EQ(deser.cells[i], expected[i]) << "Cell " << i;
400     }
401   }
402 }
403 
// A statement that cannot even be started must yield no cells, the error
// string and still terminate the stream with a last batch.
TEST(QueryResultSerializerTest, ErrorBeforeStartingQuery) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  QueryResultSerializer serializer(
      tp->ExecuteQuery("insert into incomplete_input"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_EQ(decoded.cells.size(), 0u);
  EXPECT_EQ(decoded.error, "incomplete input");
  EXPECT_TRUE(decoded.eof_reached);
}
414 
// The query is expected to yield two rows ("a", "b") and then to fail on the
// third one: the cells decoded so far must be kept, the error must be
// non-empty and the stream must still terminate with a last batch.
TEST(QueryResultSerializerTest, ErrorAfterSomeResults) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(tp.get(), "create table tab (x)");
  RunQueryChecked(tp.get(), "insert into tab (x) values (0), (1), ('error')");

  QueryResultSerializer serializer(
      tp->ExecuteQuery("select str_split('a;b', ';', x) as s from tab"));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_NE(decoded.error, "");
  EXPECT_THAT(decoded.cells,
              ElementsAre(SqlValue::String("a"), SqlValue::String("b")));
  EXPECT_TRUE(decoded.eof_reached);
}
428 
// A statement that returns no rows must serialize to an error-free, cell-free
// result that still terminates with a last batch, and must actually have been
// executed.
TEST(QueryResultSerializerTest, NoResultQuery) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());

  // Step 1: run the row-less statement through the serializer.
  {
    QueryResultSerializer serializer(tp->ExecuteQuery("create table tab (x)"));
    TestDeserializer decoded;
    decoded.SerializeAndDeserialize(&serializer);
    EXPECT_EQ(decoded.error, "");
    EXPECT_EQ(decoded.cells.size(), 0u);
    EXPECT_TRUE(decoded.eof_reached);
  }

  // Step 2: the table must now exist, i.e. selecting from it must succeed and
  // produce the single count(*) cell.
  {
    QueryResultSerializer serializer(
        tp->ExecuteQuery("select count(*) from tab"));
    TestDeserializer decoded;
    decoded.SerializeAndDeserialize(&serializer);
    EXPECT_EQ(decoded.error, "");
    EXPECT_EQ(decoded.cells.size(), 1u);
    EXPECT_TRUE(decoded.eof_reached);
  }
}
452 
453 }  // namespace
454 }  // namespace trace_processor
455 }  // namespace perfetto
456