1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef TEST_QPS_CLIENT_H
20 #define TEST_QPS_CLIENT_H
21 
22 #include <stdlib.h>
23 
24 #include <condition_variable>
25 #include <mutex>
26 #include <unordered_map>
27 #include <vector>
28 
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/support/byte_buffer.h>
33 #include <grpcpp/support/channel_arguments.h>
34 #include <grpcpp/support/slice.h>
35 
36 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
37 #include "src/proto/grpc/testing/payloads.pb.h"
38 
39 #include "src/core/lib/gpr/env.h"
40 #include "src/cpp/util/core_stats.h"
41 #include "test/cpp/qps/histogram.h"
42 #include "test/cpp/qps/interarrival.h"
43 #include "test/cpp/qps/qps_worker.h"
44 #include "test/cpp/qps/server.h"
45 #include "test/cpp/qps/usage_timer.h"
46 #include "test/cpp/util/create_test_channel.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48 
// Prefix of a server target string that selects an in-process server instead
// of a network connection. The remainder of the target is parsed as an index
// into g_inproc_servers (see ClientImpl::ClientChannelInfo).
#define INPROC_NAME_PREFIX "qpsinproc:"
50 
51 namespace grpc {
52 namespace testing {
53 
54 template <class RequestType>
55 class ClientRequestCreator {
56  public:
ClientRequestCreator(RequestType * req,const PayloadConfig &)57   ClientRequestCreator(RequestType* req, const PayloadConfig&) {
58     // this template must be specialized
59     // fail with an assertion rather than a compile-time
60     // check since these only happen at the beginning anyway
61     GPR_ASSERT(false);
62   }
63 };
64 
65 template <>
66 class ClientRequestCreator<SimpleRequest> {
67  public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)68   ClientRequestCreator(SimpleRequest* req,
69                        const PayloadConfig& payload_config) {
70     if (payload_config.has_bytebuf_params()) {
71       GPR_ASSERT(false);  // not appropriate for this specialization
72     } else if (payload_config.has_simple_params()) {
73       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
74       req->set_response_size(payload_config.simple_params().resp_size());
75       req->mutable_payload()->set_type(
76           grpc::testing::PayloadType::COMPRESSABLE);
77       int size = payload_config.simple_params().req_size();
78       std::unique_ptr<char[]> body(new char[size]);
79       req->mutable_payload()->set_body(body.get(), size);
80     } else if (payload_config.has_complex_params()) {
81       GPR_ASSERT(false);  // not appropriate for this specialization
82     } else {
83       // default should be simple proto without payloads
84       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
85       req->set_response_size(0);
86       req->mutable_payload()->set_type(
87           grpc::testing::PayloadType::COMPRESSABLE);
88     }
89   }
90 };
91 
92 template <>
93 class ClientRequestCreator<ByteBuffer> {
94  public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)95   ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
96     if (payload_config.has_bytebuf_params()) {
97       std::unique_ptr<char[]> buf(
98           new char[payload_config.bytebuf_params().req_size()]);
99       Slice slice(buf.get(), payload_config.bytebuf_params().req_size());
100       *req = ByteBuffer(&slice, 1);
101     } else {
102       GPR_ASSERT(false);  // not appropriate for this specialization
103     }
104   }
105 };
106 
// Result of a single request, handed to Client::Thread::UpdateHistogram().
//
// Carries an optional latency value and an optional status code. Each field
// has a "used" flag that only its setter turns on. Unset fields read as 0
// instead of an indeterminate value.
class HistogramEntry final {
 public:
  HistogramEntry()
      : value_used_(false), value_(0.0), status_used_(false), status_(0) {}

  // True once set_value() has been called.
  bool value_used() const { return value_used_; }
  // Recorded latency value; 0 if set_value() was never called.
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  // True once set_status() has been called.
  bool status_used() const { return status_used_; }
  // Recorded status code; 0 if set_status() was never called.
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  bool value_used_;
  double value_;
  bool status_used_;
  int status_;
};
129 
// Request counts keyed by the status code recorded in HistogramEntry::status().
using StatusHistogram = std::unordered_map<int, int64_t>;

// Adds every count in `from` to the matching entry of `to`. Codes not yet
// present in `to` start from zero. `from` is left unchanged.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  for (const auto& code_and_count : from) {
    (*to)[code_and_count.first] += code_and_count.second;
  }
}
139 
140 class Client {
141  public:
Client()142   Client()
143       : timer_(new UsageTimer),
144         interarrival_timer_(),
145         started_requests_(false),
146         last_reset_poll_count_(0) {
147     gpr_event_init(&start_requests_);
148   }
~Client()149   virtual ~Client() {}
150 
Mark(bool reset)151   ClientStats Mark(bool reset) {
152     Histogram latencies;
153     StatusHistogram statuses;
154     UsageTimer::Result timer_result;
155 
156     MaybeStartRequests();
157 
158     int cur_poll_count = GetPollCount();
159     int poll_count = cur_poll_count - last_reset_poll_count_;
160     if (reset) {
161       std::vector<Histogram> to_merge(threads_.size());
162       std::vector<StatusHistogram> to_merge_status(threads_.size());
163 
164       for (size_t i = 0; i < threads_.size(); i++) {
165         threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
166       }
167       std::unique_ptr<UsageTimer> timer(new UsageTimer);
168       timer_.swap(timer);
169       for (size_t i = 0; i < threads_.size(); i++) {
170         latencies.Merge(to_merge[i]);
171         MergeStatusHistogram(to_merge_status[i], &statuses);
172       }
173       timer_result = timer->Mark();
174       last_reset_poll_count_ = cur_poll_count;
175     } else {
176       // merge snapshots of each thread histogram
177       for (size_t i = 0; i < threads_.size(); i++) {
178         threads_[i]->MergeStatsInto(&latencies, &statuses);
179       }
180       timer_result = timer_->Mark();
181     }
182 
183     // Print the median latency per interval for one thread.
184     // If the number of warmup seconds is x, then the first x + 1 numbers in the
185     // vector are from the warmup period and should be discarded.
186     if (median_latency_collection_interval_seconds_ > 0) {
187       std::vector<double> medians_per_interval =
188           threads_[0]->GetMedianPerIntervalList();
189       gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
190       gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
191       for (size_t j = 0; j < medians_per_interval.size(); j++) {
192         gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
193       }
194     }
195 
196     grpc_stats_data core_stats;
197     grpc_stats_collect(&core_stats);
198 
199     ClientStats stats;
200     latencies.FillProto(stats.mutable_latencies());
201     for (StatusHistogram::const_iterator it = statuses.begin();
202          it != statuses.end(); ++it) {
203       RequestResultCount* rrc = stats.add_request_results();
204       rrc->set_status_code(it->first);
205       rrc->set_count(it->second);
206     }
207     stats.set_time_elapsed(timer_result.wall);
208     stats.set_time_system(timer_result.system);
209     stats.set_time_user(timer_result.user);
210     stats.set_cq_poll_count(poll_count);
211     CoreStatsToProto(core_stats, stats.mutable_core_stats());
212     return stats;
213   }
214 
215   // Must call AwaitThreadsCompletion before destructor to avoid a race
216   // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()217   void AwaitThreadsCompletion() {
218     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
219     DestroyMultithreading();
220     std::unique_lock<std::mutex> g(thread_completion_mu_);
221     while (threads_remaining_ != 0) {
222       threads_complete_.wait(g);
223     }
224   }
225 
226   // Returns the interval (in seconds) between collecting latency medians. If 0,
227   // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()228   double GetLatencyCollectionIntervalInSeconds() {
229     return median_latency_collection_interval_seconds_;
230   }
231 
GetPollCount()232   virtual int GetPollCount() {
233     // For sync client.
234     return 0;
235   }
236 
237  protected:
238   bool closed_loop_;
239   gpr_atm thread_pool_done_;
240   double median_latency_collection_interval_seconds_;  // In seconds
241 
StartThreads(size_t num_threads)242   void StartThreads(size_t num_threads) {
243     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
244     threads_remaining_ = num_threads;
245     for (size_t i = 0; i < num_threads; i++) {
246       threads_.emplace_back(new Thread(this, i));
247     }
248   }
249 
EndThreads()250   void EndThreads() {
251     MaybeStartRequests();
252     threads_.clear();
253   }
254 
255   virtual void DestroyMultithreading() = 0;
256 
SetupLoadTest(const ClientConfig & config,size_t num_threads)257   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
258     // Set up the load distribution based on the number of threads
259     const auto& load = config.load_params();
260 
261     std::unique_ptr<RandomDistInterface> random_dist;
262     switch (load.load_case()) {
263       case LoadParams::kClosedLoop:
264         // Closed-loop doesn't use random dist at all
265         break;
266       case LoadParams::kPoisson:
267         random_dist.reset(
268             new ExpDist(load.poisson().offered_load() / num_threads));
269         break;
270       default:
271         GPR_ASSERT(false);
272     }
273 
274     // Set closed_loop_ based on whether or not random_dist is set
275     if (!random_dist) {
276       closed_loop_ = true;
277     } else {
278       closed_loop_ = false;
279       // set up interarrival timer according to random dist
280       interarrival_timer_.init(*random_dist, num_threads);
281       const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
282       for (size_t i = 0; i < num_threads; i++) {
283         next_time_.push_back(gpr_time_add(
284             now,
285             gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
286       }
287     }
288   }
289 
NextIssueTime(int thread_idx)290   gpr_timespec NextIssueTime(int thread_idx) {
291     const gpr_timespec result = next_time_[thread_idx];
292     next_time_[thread_idx] =
293         gpr_time_add(next_time_[thread_idx],
294                      gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
295                                          GPR_TIMESPAN));
296     return result;
297   }
NextIssuer(int thread_idx)298   std::function<gpr_timespec()> NextIssuer(int thread_idx) {
299     return closed_loop_ ? std::function<gpr_timespec()>()
300                         : std::bind(&Client::NextIssueTime, this, thread_idx);
301   }
302 
303   class Thread {
304    public:
Thread(Client * client,size_t idx)305     Thread(Client* client, size_t idx)
306         : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
307 
~Thread()308     ~Thread() { impl_.join(); }
309 
BeginSwap(Histogram * n,StatusHistogram * s)310     void BeginSwap(Histogram* n, StatusHistogram* s) {
311       std::lock_guard<std::mutex> g(mu_);
312       n->Swap(&histogram_);
313       s->swap(statuses_);
314     }
315 
MergeStatsInto(Histogram * hist,StatusHistogram * s)316     void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
317       std::unique_lock<std::mutex> g(mu_);
318       hist->Merge(histogram_);
319       MergeStatusHistogram(statuses_, s);
320     }
321 
GetMedianPerIntervalList()322     std::vector<double> GetMedianPerIntervalList() {
323       return medians_each_interval_list_;
324     }
325 
UpdateHistogram(HistogramEntry * entry)326     void UpdateHistogram(HistogramEntry* entry) {
327       std::lock_guard<std::mutex> g(mu_);
328       if (entry->value_used()) {
329         histogram_.Add(entry->value());
330         if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
331           histogram_per_interval_.Add(entry->value());
332           double now = UsageTimer::Now();
333           if ((now - interval_start_time_) >=
334               client_->GetLatencyCollectionIntervalInSeconds()) {
335             // Record the median latency of requests from the last interval.
336             // Divide by 1e3 to get microseconds.
337             medians_each_interval_list_.push_back(
338                 histogram_per_interval_.Percentile(50) / 1e3);
339             histogram_per_interval_.Reset();
340             interval_start_time_ = now;
341           }
342         }
343       }
344       if (entry->status_used()) {
345         statuses_[entry->status()]++;
346       }
347     }
348 
349    private:
350     Thread(const Thread&);
351     Thread& operator=(const Thread&);
352 
ThreadFunc()353     void ThreadFunc() {
354       int wait_loop = 0;
355       while (!gpr_event_wait(
356           &client_->start_requests_,
357           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
358                        gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
359         gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
360                 idx_, wait_loop);
361         wait_loop++;
362       }
363 
364       client_->ThreadFunc(idx_, this);
365       client_->CompleteThread();
366     }
367 
368     std::mutex mu_;
369     Histogram histogram_;
370     StatusHistogram statuses_;
371     Client* client_;
372     const size_t idx_;
373     std::thread impl_;
374     // The following are used only if
375     // median_latency_collection_interval_seconds_ is greater than 0
376     Histogram histogram_per_interval_;
377     std::vector<double> medians_each_interval_list_;
378     double interval_start_time_;
379   };
380 
ThreadCompleted()381   bool ThreadCompleted() {
382     return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
383   }
384 
385   virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
386 
387   std::vector<std::unique_ptr<Thread>> threads_;
388   std::unique_ptr<UsageTimer> timer_;
389 
390   InterarrivalTimer interarrival_timer_;
391   std::vector<gpr_timespec> next_time_;
392 
393   std::mutex thread_completion_mu_;
394   size_t threads_remaining_;
395   std::condition_variable threads_complete_;
396 
397   gpr_event start_requests_;
398   bool started_requests_;
399 
400   int last_reset_poll_count_;
401 
MaybeStartRequests()402   void MaybeStartRequests() {
403     if (!started_requests_) {
404       started_requests_ = true;
405       gpr_event_set(&start_requests_, (void*)1);
406     }
407   }
408 
CompleteThread()409   void CompleteThread() {
410     std::lock_guard<std::mutex> g(thread_completion_mu_);
411     threads_remaining_--;
412     if (threads_remaining_ == 0) {
413       threads_complete_.notify_all();
414     }
415   }
416 };
417 
418 template <class StubType, class RequestType>
419 class ClientImpl : public Client {
420  public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)421   ClientImpl(const ClientConfig& config,
422              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
423                  create_stub)
424       : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
425     for (int i = 0; i < config.client_channels(); i++) {
426       channels_.emplace_back(
427           config.server_targets(i % config.server_targets_size()), config,
428           create_stub_, i);
429     }
430     std::vector<std::unique_ptr<std::thread>> connecting_threads;
431     for (auto& c : channels_) {
432       connecting_threads.emplace_back(c.WaitForReady());
433     }
434     for (auto& t : connecting_threads) {
435       t->join();
436     }
437     median_latency_collection_interval_seconds_ =
438         config.median_latency_collection_interval_millis() / 1e3;
439     ClientRequestCreator<RequestType> create_req(&request_,
440                                                  config.payload_config());
441   }
~ClientImpl()442   virtual ~ClientImpl() {}
443 
444  protected:
445   const int cores_;
446   RequestType request_;
447 
448   class ClientChannelInfo {
449    public:
ClientChannelInfo(const grpc::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)450     ClientChannelInfo(
451         const grpc::string& target, const ClientConfig& config,
452         std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
453             create_stub,
454         int shard) {
455       ChannelArguments args;
456       args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
457       set_channel_args(config, &args);
458 
459       grpc::string type;
460       if (config.has_security_params() &&
461           config.security_params().cred_type().empty()) {
462         type = kTlsCredentialsType;
463       } else {
464         type = config.security_params().cred_type();
465       }
466 
467       grpc::string inproc_pfx(INPROC_NAME_PREFIX);
468       if (target.find(inproc_pfx) != 0) {
469         channel_ = CreateTestChannel(
470             target, type, config.security_params().server_host_override(),
471             !config.security_params().use_test_ca(),
472             std::shared_ptr<CallCredentials>(), args);
473         gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
474         is_inproc_ = false;
475       } else {
476         grpc::string tgt = target;
477         tgt.erase(0, inproc_pfx.length());
478         int srv_num = std::stoi(tgt);
479         channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
480         is_inproc_ = true;
481       }
482       stub_ = create_stub(channel_);
483     }
get_channel()484     Channel* get_channel() { return channel_.get(); }
get_stub()485     StubType* get_stub() { return stub_.get(); }
486 
WaitForReady()487     std::unique_ptr<std::thread> WaitForReady() {
488       return std::unique_ptr<std::thread>(new std::thread([this]() {
489         if (!is_inproc_) {
490           int connect_deadline = 10;
491           /* Allow optionally overriding connect_deadline in order
492            * to deal with benchmark environments in which the server
493            * can take a long time to become ready. */
494           char* channel_connect_timeout_str =
495               gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
496           if (channel_connect_timeout_str != nullptr &&
497               strcmp(channel_connect_timeout_str, "") != 0) {
498             connect_deadline = atoi(channel_connect_timeout_str);
499           }
500           gpr_log(GPR_INFO,
501                   "Waiting for up to %d seconds for the channel %p to connect",
502                   connect_deadline, channel_.get());
503           gpr_free(channel_connect_timeout_str);
504           GPR_ASSERT(channel_->WaitForConnected(gpr_time_add(
505               gpr_now(GPR_CLOCK_REALTIME),
506               gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN))));
507           gpr_log(GPR_INFO, "Channel %p connected!", channel_.get());
508         }
509       }));
510     }
511 
512    private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)513     void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
514       for (const auto& channel_arg : config.channel_args()) {
515         if (channel_arg.value_case() == ChannelArg::kStrValue) {
516           args->SetString(channel_arg.name(), channel_arg.str_value());
517         } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
518           args->SetInt(channel_arg.name(), channel_arg.int_value());
519         } else {
520           gpr_log(GPR_ERROR, "Empty channel arg value.");
521         }
522       }
523     }
524 
525     std::shared_ptr<Channel> channel_;
526     std::unique_ptr<StubType> stub_;
527     bool is_inproc_;
528   };
529   std::vector<ClientChannelInfo> channels_;
530   std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
531       create_stub_;
532 };
533 
534 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
535 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
536 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
537     const ClientConfig& args);
538 
539 }  // namespace testing
540 }  // namespace grpc
541 
542 #endif
543