1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 
36 #include "src/core/lib/gpr/host_port.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/interarrival.h"
41 #include "test/cpp/qps/usage_timer.h"
42 
43 namespace grpc {
44 namespace testing {
45 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)46 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
47     const std::shared_ptr<Channel>& ch) {
48   return BenchmarkService::NewStub(ch);
49 }
50 
51 class SynchronousClient
52     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
53  public:
SynchronousClient(const ClientConfig & config)54   SynchronousClient(const ClientConfig& config)
55       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
56             config, BenchmarkStubCreator) {
57     num_threads_ =
58         config.outstanding_rpcs_per_channel() * config.client_channels();
59     responses_.resize(num_threads_);
60     SetupLoadTest(config, num_threads_);
61   }
62 
~SynchronousClient()63   virtual ~SynchronousClient() {}
64 
65   virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
66   virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
67 
ThreadFunc(size_t thread_idx,Thread * t)68   void ThreadFunc(size_t thread_idx, Thread* t) override {
69     if (!InitThreadFuncImpl(thread_idx)) {
70       return;
71     }
72     for (;;) {
73       // run the loop body
74       HistogramEntry entry;
75       const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
76       t->UpdateHistogram(&entry);
77       if (!thread_still_ok || ThreadCompleted()) {
78         return;
79       }
80     }
81   }
82 
83  protected:
84   // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)85   bool WaitToIssue(int thread_idx) {
86     if (!closed_loop_) {
87       const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
88       // Avoid sleeping for too long continuously because we might
89       // need to terminate before then. This is an issue since
90       // exponential distribution can occasionally produce bad outliers
91       while (true) {
92         const gpr_timespec one_sec_delay =
93             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
94                          gpr_time_from_seconds(1, GPR_TIMESPAN));
95         if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
96           gpr_sleep_until(next_issue_time);
97           return true;
98         } else {
99           gpr_sleep_until(one_sec_delay);
100           if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
101             return false;
102           }
103         }
104       }
105     }
106     return true;
107   }
108 
109   size_t num_threads_;
110   std::vector<SimpleResponse> responses_;
111 };
112 
113 class SynchronousUnaryClient final : public SynchronousClient {
114  public:
SynchronousUnaryClient(const ClientConfig & config)115   SynchronousUnaryClient(const ClientConfig& config)
116       : SynchronousClient(config) {
117     StartThreads(num_threads_);
118   }
~SynchronousUnaryClient()119   ~SynchronousUnaryClient() {}
120 
InitThreadFuncImpl(size_t thread_idx)121   bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
122 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)123   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
124     if (!WaitToIssue(thread_idx)) {
125       return true;
126     }
127     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
128     double start = UsageTimer::Now();
129     GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
130     grpc::ClientContext context;
131     grpc::Status s =
132         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
133     if (s.ok()) {
134       entry->set_value((UsageTimer::Now() - start) * 1e9);
135     }
136     entry->set_status(s.error_code());
137     return true;
138   }
139 
140  private:
DestroyMultithreading()141   void DestroyMultithreading() override final { EndThreads(); }
142 };
143 
144 template <class StreamType>
145 class SynchronousStreamingClient : public SynchronousClient {
146  public:
SynchronousStreamingClient(const ClientConfig & config)147   SynchronousStreamingClient(const ClientConfig& config)
148       : SynchronousClient(config),
149         context_(num_threads_),
150         stream_(num_threads_),
151         stream_mu_(num_threads_),
152         shutdown_(num_threads_),
153         messages_per_stream_(config.messages_per_stream()),
154         messages_issued_(num_threads_) {
155     StartThreads(num_threads_);
156   }
~SynchronousStreamingClient()157   virtual ~SynchronousStreamingClient() {
158     CleanupAllStreams([this](size_t thread_idx) {
159       // Don't log any kind of error since we may have canceled this
160       stream_[thread_idx]->Finish().IgnoreError();
161     });
162   }
163 
164  protected:
165   std::vector<grpc::ClientContext> context_;
166   std::vector<std::unique_ptr<StreamType>> stream_;
167   // stream_mu_ is only needed when changing an element of stream_ or context_
168   std::vector<std::mutex> stream_mu_;
169   // use struct Bool rather than bool because vector<bool> is not concurrent
170   struct Bool {
171     bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool172     Bool() : val(false) {}
173   };
174   std::vector<Bool> shutdown_;
175   const int messages_per_stream_;
176   std::vector<int> messages_issued_;
177 
FinishStream(HistogramEntry * entry,size_t thread_idx)178   void FinishStream(HistogramEntry* entry, size_t thread_idx) {
179     Status s = stream_[thread_idx]->Finish();
180     // don't set the value since the stream is failed and shouldn't be timed
181     entry->set_status(s.error_code());
182     if (!s.ok()) {
183       std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
184       if (!shutdown_[thread_idx].val) {
185         gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
186                 thread_idx, s.error_message().c_str());
187       }
188     }
189     // Lock the stream_mu_ now because the client context could change
190     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
191     context_[thread_idx].~ClientContext();
192     new (&context_[thread_idx]) ClientContext();
193   }
194 
CleanupAllStreams(const std::function<void (size_t)> & cleaner)195   void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
196     std::vector<std::thread> cleanup_threads;
197     for (size_t i = 0; i < num_threads_; i++) {
198       cleanup_threads.emplace_back([this, i, cleaner] {
199         std::lock_guard<std::mutex> l(stream_mu_[i]);
200         shutdown_[i].val = true;
201         if (stream_[i]) {
202           cleaner(i);
203         }
204       });
205     }
206     for (auto& th : cleanup_threads) {
207       th.join();
208     }
209   }
210 
211  private:
DestroyMultithreading()212   void DestroyMultithreading() override final {
213     CleanupAllStreams(
214         [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
215     EndThreads();
216   }
217 };
218 
219 class SynchronousStreamingPingPongClient final
220     : public SynchronousStreamingClient<
221           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
222  public:
SynchronousStreamingPingPongClient(const ClientConfig & config)223   SynchronousStreamingPingPongClient(const ClientConfig& config)
224       : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()225   ~SynchronousStreamingPingPongClient() {
226     CleanupAllStreams(
227         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
228   }
229 
230  private:
InitThreadFuncImpl(size_t thread_idx)231   bool InitThreadFuncImpl(size_t thread_idx) override {
232     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
233     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
234     if (!shutdown_[thread_idx].val) {
235       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
236     } else {
237       return false;
238     }
239     messages_issued_[thread_idx] = 0;
240     return true;
241   }
242 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)243   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
244     if (!WaitToIssue(thread_idx)) {
245       return true;
246     }
247     GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
248     double start = UsageTimer::Now();
249     if (stream_[thread_idx]->Write(request_) &&
250         stream_[thread_idx]->Read(&responses_[thread_idx])) {
251       entry->set_value((UsageTimer::Now() - start) * 1e9);
252       // don't set the status since there isn't one yet
253       if ((messages_per_stream_ != 0) &&
254           (++messages_issued_[thread_idx] < messages_per_stream_)) {
255         return true;
256       } else if (messages_per_stream_ == 0) {
257         return true;
258       } else {
259         // Fall through to the below resetting code after finish
260       }
261     }
262     stream_[thread_idx]->WritesDone();
263     FinishStream(entry, thread_idx);
264     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
265     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
266     if (!shutdown_[thread_idx].val) {
267       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
268     } else {
269       stream_[thread_idx].reset();
270       return false;
271     }
272     messages_issued_[thread_idx] = 0;
273     return true;
274   }
275 };
276 
277 class SynchronousStreamingFromClientClient final
278     : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
279  public:
SynchronousStreamingFromClientClient(const ClientConfig & config)280   SynchronousStreamingFromClientClient(const ClientConfig& config)
281       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()282   ~SynchronousStreamingFromClientClient() {
283     CleanupAllStreams(
284         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
285   }
286 
287  private:
288   std::vector<double> last_issue_;
289 
InitThreadFuncImpl(size_t thread_idx)290   bool InitThreadFuncImpl(size_t thread_idx) override {
291     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
292     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
293     if (!shutdown_[thread_idx].val) {
294       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
295                                                       &responses_[thread_idx]);
296     } else {
297       return false;
298     }
299     last_issue_[thread_idx] = UsageTimer::Now();
300     return true;
301   }
302 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)303   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
304     // Figure out how to make histogram sensible if this is rate-paced
305     if (!WaitToIssue(thread_idx)) {
306       return true;
307     }
308     GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
309     if (stream_[thread_idx]->Write(request_)) {
310       double now = UsageTimer::Now();
311       entry->set_value((now - last_issue_[thread_idx]) * 1e9);
312       last_issue_[thread_idx] = now;
313       return true;
314     }
315     stream_[thread_idx]->WritesDone();
316     FinishStream(entry, thread_idx);
317     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
318     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
319     if (!shutdown_[thread_idx].val) {
320       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
321                                                       &responses_[thread_idx]);
322     } else {
323       stream_[thread_idx].reset();
324       return false;
325     }
326     return true;
327   }
328 };
329 
330 class SynchronousStreamingFromServerClient final
331     : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
332  public:
SynchronousStreamingFromServerClient(const ClientConfig & config)333   SynchronousStreamingFromServerClient(const ClientConfig& config)
334       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()335   ~SynchronousStreamingFromServerClient() {}
336 
337  private:
338   std::vector<double> last_recv_;
339 
InitThreadFuncImpl(size_t thread_idx)340   bool InitThreadFuncImpl(size_t thread_idx) override {
341     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
342     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
343     if (!shutdown_[thread_idx].val) {
344       stream_[thread_idx] =
345           stub->StreamingFromServer(&context_[thread_idx], request_);
346     } else {
347       return false;
348     }
349     last_recv_[thread_idx] = UsageTimer::Now();
350     return true;
351   }
352 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)353   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
354     GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
355     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
356       double now = UsageTimer::Now();
357       entry->set_value((now - last_recv_[thread_idx]) * 1e9);
358       last_recv_[thread_idx] = now;
359       return true;
360     }
361     FinishStream(entry, thread_idx);
362     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
363     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
364     if (!shutdown_[thread_idx].val) {
365       stream_[thread_idx] =
366           stub->StreamingFromServer(&context_[thread_idx], request_);
367     } else {
368       stream_[thread_idx].reset();
369       return false;
370     }
371     return true;
372   }
373 };
374 
375 class SynchronousStreamingBothWaysClient final
376     : public SynchronousStreamingClient<
377           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
378  public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)379   SynchronousStreamingBothWaysClient(const ClientConfig& config)
380       : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()381   ~SynchronousStreamingBothWaysClient() {
382     CleanupAllStreams(
383         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
384   }
385 
386  private:
InitThreadFuncImpl(size_t thread_idx)387   bool InitThreadFuncImpl(size_t thread_idx) override {
388     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
389     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
390     if (!shutdown_[thread_idx].val) {
391       stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
392     } else {
393       return false;
394     }
395     return true;
396   }
397 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)398   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
399     // TODO (vjpai): Do this
400     return true;
401   }
402 };
403 
CreateSynchronousClient(const ClientConfig & config)404 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
405   GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
406   switch (config.rpc_type()) {
407     case UNARY:
408       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
409     case STREAMING:
410       return std::unique_ptr<Client>(
411           new SynchronousStreamingPingPongClient(config));
412     case STREAMING_FROM_CLIENT:
413       return std::unique_ptr<Client>(
414           new SynchronousStreamingFromClientClient(config));
415     case STREAMING_FROM_SERVER:
416       return std::unique_ptr<Client>(
417           new SynchronousStreamingFromServerClient(config));
418     case STREAMING_BOTH_WAYS:
419       return std::unique_ptr<Client>(
420           new SynchronousStreamingBothWaysClient(config));
421     default:
422       assert(false);
423       return nullptr;
424   }
425 }
426 
427 }  // namespace testing
428 }  // namespace grpc
429