1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <atomic>
20 #include <memory>
21 #include <mutex>
22 #include <random>
23 #include <sstream>
24 #include <thread>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
36 
37 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
38 #include "src/core/lib/gprpp/ref_counted_ptr.h"
39 #include "src/core/lib/gprpp/thd.h"
40 #include "src/core/lib/iomgr/sockaddr.h"
41 
42 #include "test/core/util/port.h"
43 #include "test/core/util/test_config.h"
44 #include "test/cpp/end2end/test_service_impl.h"
45 
46 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
47 #include "src/proto/grpc/testing/echo.grpc.pb.h"
48 
49 using grpc::lb::v1::LoadBalanceRequest;
50 using grpc::lb::v1::LoadBalanceResponse;
51 using grpc::lb::v1::LoadBalancer;
52 
53 namespace grpc {
54 namespace testing {
55 namespace {
56 
// Topology of the stress test: how many backend servers and fake balancers
// are started, and how many client threads issue RPCs concurrently.
constexpr size_t kNumBackends = 10;
constexpr size_t kNumBalancers = 5;
constexpr size_t kNumClientThreads = 100;
// Pause, in milliseconds, between two consecutive resolver updates pushed by
// the test driver.
constexpr int kResolutionUpdateIntervalMs = 50;
// Pause, in milliseconds, between two consecutive serverlists sent on one
// balancer stream.
constexpr int kServerlistUpdateIntervalMs = 10;
// Time, in seconds, during which the driver keeps updating the resolution.
constexpr int kTestDurationSec = 30;
63 
64 using BackendServiceImpl = TestServiceImpl;
65 
66 class BalancerServiceImpl : public LoadBalancer::Service {
67  public:
68   using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
69 
BalancerServiceImpl(const std::vector<int> & all_backend_ports)70   explicit BalancerServiceImpl(const std::vector<int>& all_backend_ports)
71       : all_backend_ports_(all_backend_ports) {}
72 
BalanceLoad(ServerContext * context,Stream * stream)73   Status BalanceLoad(ServerContext* context, Stream* stream) override {
74     gpr_log(GPR_INFO, "LB[%p]: Start BalanceLoad.", this);
75     LoadBalanceRequest request;
76     stream->Read(&request);
77     while (!shutdown_) {
78       stream->Write(BuildRandomResponseForBackends());
79       std::this_thread::sleep_for(
80           std::chrono::milliseconds(kServerlistUpdateIntervalMs));
81     }
82     gpr_log(GPR_INFO, "LB[%p]: Finish BalanceLoad.", this);
83     return Status::OK;
84   }
85 
Shutdown()86   void Shutdown() { shutdown_ = true; }
87 
88  private:
Ip4ToPackedString(const char * ip_str)89   grpc::string Ip4ToPackedString(const char* ip_str) {
90     struct in_addr ip4;
91     GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
92     return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
93   }
94 
BuildRandomResponseForBackends()95   LoadBalanceResponse BuildRandomResponseForBackends() {
96     // Generate a random serverlist with varying size (if N =
97     // all_backend_ports_.size(), num_non_drop_entry is in [0, 2N],
98     // num_drop_entry is in [0, N]), order, duplicate, and drop rate.
99     size_t num_non_drop_entry =
100         std::rand() % (all_backend_ports_.size() * 2 + 1);
101     size_t num_drop_entry = std::rand() % (all_backend_ports_.size() + 1);
102     std::vector<int> random_backend_indices;
103     for (size_t i = 0; i < num_non_drop_entry; ++i) {
104       random_backend_indices.push_back(std::rand() % all_backend_ports_.size());
105     }
106     for (size_t i = 0; i < num_drop_entry; ++i) {
107       random_backend_indices.push_back(-1);
108     }
109     std::shuffle(random_backend_indices.begin(), random_backend_indices.end(),
110                  std::mt19937(std::random_device()()));
111     // Build the response according to the random list generated above.
112     LoadBalanceResponse response;
113     for (int index : random_backend_indices) {
114       auto* server = response.mutable_server_list()->add_servers();
115       if (index < 0) {
116         server->set_drop(true);
117         server->set_load_balance_token("load_balancing");
118       } else {
119         server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
120         server->set_port(all_backend_ports_[index]);
121       }
122     }
123     return response;
124   }
125 
126   std::atomic_bool shutdown_{false};
127   const std::vector<int> all_backend_ports_;
128 };
129 
130 class ClientChannelStressTest {
131  public:
Run()132   void Run() {
133     Start();
134     // Keep updating resolution for the test duration.
135     gpr_log(GPR_INFO, "Start updating resolution.");
136     const auto wait_duration =
137         std::chrono::milliseconds(kResolutionUpdateIntervalMs);
138     std::vector<AddressData> addresses;
139     auto start_time = std::chrono::steady_clock::now();
140     while (true) {
141       if (std::chrono::duration_cast<std::chrono::seconds>(
142               std::chrono::steady_clock::now() - start_time)
143               .count() > kTestDurationSec) {
144         break;
145       }
146       // Generate a random subset of balancers.
147       addresses.clear();
148       for (const auto& balancer_server : balancer_servers_) {
149         // Select each address with probability of 0.8.
150         if (std::rand() % 10 < 8) {
151           addresses.emplace_back(AddressData{balancer_server.port_, true, ""});
152         }
153       }
154       std::shuffle(addresses.begin(), addresses.end(),
155                    std::mt19937(std::random_device()()));
156       SetNextResolution(addresses);
157       std::this_thread::sleep_for(wait_duration);
158     }
159     gpr_log(GPR_INFO, "Finish updating resolution.");
160     Shutdown();
161   }
162 
163  private:
164   template <typename T>
165   struct ServerThread {
ServerThreadgrpc::testing::__anone8ea55010111::ClientChannelStressTest::ServerThread166     explicit ServerThread(const grpc::string& type,
167                           const grpc::string& server_host, T* service)
168         : type_(type), service_(service) {
169       std::mutex mu;
170       // We need to acquire the lock here in order to prevent the notify_one
171       // by ServerThread::Start from firing before the wait below is hit.
172       std::unique_lock<std::mutex> lock(mu);
173       port_ = grpc_pick_unused_port_or_die();
174       gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
175       std::condition_variable cond;
176       thread_.reset(new std::thread(
177           std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
178       cond.wait(lock);
179       gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
180     }
181 
Startgrpc::testing::__anone8ea55010111::ClientChannelStressTest::ServerThread182     void Start(const grpc::string& server_host, std::mutex* mu,
183                std::condition_variable* cond) {
184       // We need to acquire the lock here in order to prevent the notify_one
185       // below from firing before its corresponding wait is executed.
186       std::lock_guard<std::mutex> lock(*mu);
187       std::ostringstream server_address;
188       server_address << server_host << ":" << port_;
189       ServerBuilder builder;
190       builder.AddListeningPort(server_address.str(),
191                                InsecureServerCredentials());
192       builder.RegisterService(service_);
193       server_ = builder.BuildAndStart();
194       cond->notify_one();
195     }
196 
Shutdowngrpc::testing::__anone8ea55010111::ClientChannelStressTest::ServerThread197     void Shutdown() {
198       gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
199       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
200       thread_->join();
201       gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
202     }
203 
204     int port_;
205     grpc::string type_;
206     std::unique_ptr<Server> server_;
207     T* service_;
208     std::unique_ptr<std::thread> thread_;
209   };
210 
211   struct AddressData {
212     int port;
213     bool is_balancer;
214     grpc::string balancer_name;
215   };
216 
SetNextResolution(const std::vector<AddressData> & address_data)217   void SetNextResolution(const std::vector<AddressData>& address_data) {
218     grpc_core::ExecCtx exec_ctx;
219     grpc_lb_addresses* addresses =
220         grpc_lb_addresses_create(address_data.size(), nullptr);
221     for (size_t i = 0; i < address_data.size(); ++i) {
222       char* lb_uri_str;
223       gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
224       grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
225       GPR_ASSERT(lb_uri != nullptr);
226       grpc_lb_addresses_set_address_from_uri(
227           addresses, i, lb_uri, address_data[i].is_balancer,
228           address_data[i].balancer_name.c_str(), nullptr);
229       grpc_uri_destroy(lb_uri);
230       gpr_free(lb_uri_str);
231     }
232     grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
233     grpc_channel_args fake_result = {1, &fake_addresses};
234     response_generator_->SetResponse(&fake_result);
235     grpc_lb_addresses_destroy(addresses);
236   }
237 
KeepSendingRequests()238   void KeepSendingRequests() {
239     gpr_log(GPR_INFO, "Start sending requests.");
240     while (!shutdown_) {
241       ClientContext context;
242       context.set_deadline(grpc_timeout_milliseconds_to_deadline(1000));
243       EchoRequest request;
244       request.set_message("test");
245       EchoResponse response;
246       {
247         std::lock_guard<std::mutex> lock(stub_mutex_);
248         stub_->Echo(&context, request, &response);
249       }
250     }
251     gpr_log(GPR_INFO, "Finish sending requests.");
252   }
253 
CreateStub()254   void CreateStub() {
255     ChannelArguments args;
256     response_generator_ =
257         grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
258     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
259                     response_generator_.get());
260     std::ostringstream uri;
261     uri << "fake:///servername_not_used";
262     channel_ =
263         CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
264     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
265   }
266 
Start()267   void Start() {
268     // Start the backends.
269     std::vector<int> backend_ports;
270     for (size_t i = 0; i < kNumBackends; ++i) {
271       backends_.emplace_back(new BackendServiceImpl());
272       backend_servers_.emplace_back(ServerThread<BackendServiceImpl>(
273           "backend", server_host_, backends_.back().get()));
274       backend_ports.push_back(backend_servers_.back().port_);
275     }
276     // Start the load balancers.
277     for (size_t i = 0; i < kNumBalancers; ++i) {
278       balancers_.emplace_back(new BalancerServiceImpl(backend_ports));
279       balancer_servers_.emplace_back(ServerThread<BalancerServiceImpl>(
280           "balancer", server_host_, balancers_.back().get()));
281     }
282     // Start sending RPCs in multiple threads.
283     CreateStub();
284     for (size_t i = 0; i < kNumClientThreads; ++i) {
285       client_threads_.emplace_back(
286           std::thread(&ClientChannelStressTest::KeepSendingRequests, this));
287     }
288   }
289 
Shutdown()290   void Shutdown() {
291     shutdown_ = true;
292     for (size_t i = 0; i < client_threads_.size(); ++i) {
293       client_threads_[i].join();
294     }
295     for (size_t i = 0; i < balancers_.size(); ++i) {
296       balancers_[i]->Shutdown();
297       balancer_servers_[i].Shutdown();
298     }
299     for (size_t i = 0; i < backends_.size(); ++i) {
300       backend_servers_[i].Shutdown();
301     }
302   }
303 
304   std::atomic_bool shutdown_{false};
305   const grpc::string server_host_ = "localhost";
306   std::shared_ptr<Channel> channel_;
307   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
308   std::mutex stub_mutex_;
309   std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
310   std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
311   std::vector<ServerThread<BackendServiceImpl>> backend_servers_;
312   std::vector<ServerThread<BalancerServiceImpl>> balancer_servers_;
313   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
314       response_generator_;
315   std::vector<std::thread> client_threads_;
316 };
317 
318 }  // namespace
319 }  // namespace testing
320 }  // namespace grpc
321 
main(int argc,char ** argv)322 int main(int argc, char** argv) {
323   grpc_init();
324   grpc_test_init(argc, argv);
325   grpc::testing::ClientChannelStressTest test;
326   test.Run();
327   grpc_shutdown();
328   return 0;
329 }
330