1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <mutex>
21 #include <sstream>
22 #include <thread>
23 
24 #include <grpc/grpc.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 
35 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
36 #include "src/core/lib/gpr/env.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
40 #include "src/cpp/server/secure_server_credentials.h"
41 
42 #include "src/cpp/client/secure_credentials.h"
43 
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/end2end/test_service_impl.h"
47 
48 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
49 #include "src/proto/grpc/testing/echo.grpc.pb.h"
50 
51 #include <gmock/gmock.h>
52 #include <gtest/gtest.h>
53 
54 // TODO(dgq): Other scenarios in need of testing:
55 // - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
56 // - Test reception of invalid serverlist
57 // - Test pinging
58 // - Test against a non-LB server.
59 // - Random LB server closing the stream unexpectedly.
60 // - Test using DNS-resolvable names (localhost?)
61 // - Test handling of creation of faulty RR instance by having the LB return a
62 //   serverlist with non-existent backends after having initially returned a
63 //   valid one.
64 //
65 // Findings from end to end testing to be covered here:
66 // - Handling of LB servers restart, including reconnection after backing-off
67 //   retries.
68 // - Destruction of load balanced channel (and therefore of grpclb instance)
69 //   while:
70 //   1) the internal LB call is still active. This should work by virtue
71 //   of the weak reference the LB call holds. The call should be terminated as
72 //   part of the grpclb shutdown process.
73 //   2) the retry timer is active. Again, the weak reference it holds should
74 //   prevent a premature call to \a glb_destroy.
75 // - Restart of backend servers with no changes to serverlist. This exercises
76 //   the RR handover mechanism.
77 
78 using std::chrono::system_clock;
79 
80 using grpc::lb::v1::LoadBalanceRequest;
81 using grpc::lb::v1::LoadBalanceResponse;
82 using grpc::lb::v1::LoadBalancer;
83 
84 namespace grpc {
85 namespace testing {
86 namespace {
87 
88 template <typename ServiceType>
89 class CountedService : public ServiceType {
90  public:
request_count()91   size_t request_count() {
92     std::unique_lock<std::mutex> lock(mu_);
93     return request_count_;
94   }
95 
response_count()96   size_t response_count() {
97     std::unique_lock<std::mutex> lock(mu_);
98     return response_count_;
99   }
100 
IncreaseResponseCount()101   void IncreaseResponseCount() {
102     std::unique_lock<std::mutex> lock(mu_);
103     ++response_count_;
104   }
IncreaseRequestCount()105   void IncreaseRequestCount() {
106     std::unique_lock<std::mutex> lock(mu_);
107     ++request_count_;
108   }
109 
ResetCounters()110   void ResetCounters() {
111     std::unique_lock<std::mutex> lock(mu_);
112     request_count_ = 0;
113     response_count_ = 0;
114   }
115 
116  protected:
117   std::mutex mu_;
118 
119  private:
120   size_t request_count_ = 0;
121   size_t response_count_ = 0;
122 };
123 
124 using BackendService = CountedService<TestServiceImpl>;
125 using BalancerService = CountedService<LoadBalancer::Service>;
126 
127 const char g_kCallCredsMdKey[] = "Balancer should not ...";
128 const char g_kCallCredsMdValue[] = "... receive me";
129 
130 class BackendServiceImpl : public BackendService {
131  public:
BackendServiceImpl()132   BackendServiceImpl() {}
133 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)134   Status Echo(ServerContext* context, const EchoRequest* request,
135               EchoResponse* response) override {
136     // Backend should receive the call credentials metadata.
137     auto call_credentials_entry =
138         context->client_metadata().find(g_kCallCredsMdKey);
139     EXPECT_NE(call_credentials_entry, context->client_metadata().end());
140     if (call_credentials_entry != context->client_metadata().end()) {
141       EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
142     }
143     IncreaseRequestCount();
144     const auto status = TestServiceImpl::Echo(context, request, response);
145     IncreaseResponseCount();
146     return status;
147   }
148 
149   // Returns true on its first invocation, false otherwise.
Shutdown()150   bool Shutdown() {
151     std::unique_lock<std::mutex> lock(mu_);
152     const bool prev = !shutdown_;
153     shutdown_ = true;
154     gpr_log(GPR_INFO, "Backend: shut down");
155     return prev;
156   }
157 
158  private:
159   std::mutex mu_;
160   bool shutdown_ = false;
161 };
162 
Ip4ToPackedString(const char * ip_str)163 grpc::string Ip4ToPackedString(const char* ip_str) {
164   struct in_addr ip4;
165   GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
166   return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
167 }
168 
169 struct ClientStats {
170   size_t num_calls_started = 0;
171   size_t num_calls_finished = 0;
172   size_t num_calls_finished_with_client_failed_to_send = 0;
173   size_t num_calls_finished_known_received = 0;
174   std::map<grpc::string, size_t> drop_token_counts;
175 
operator +=grpc::testing::__anon5deae6e10111::ClientStats176   ClientStats& operator+=(const ClientStats& other) {
177     num_calls_started += other.num_calls_started;
178     num_calls_finished += other.num_calls_finished;
179     num_calls_finished_with_client_failed_to_send +=
180         other.num_calls_finished_with_client_failed_to_send;
181     num_calls_finished_known_received +=
182         other.num_calls_finished_known_received;
183     for (const auto& p : other.drop_token_counts) {
184       drop_token_counts[p.first] += p.second;
185     }
186     return *this;
187   }
188 };
189 
190 class BalancerServiceImpl : public BalancerService {
191  public:
192   using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
193   using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
194 
BalancerServiceImpl(int client_load_reporting_interval_seconds)195   explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
196       : client_load_reporting_interval_seconds_(
197             client_load_reporting_interval_seconds),
198         shutdown_(false) {}
199 
BalanceLoad(ServerContext * context,Stream * stream)200   Status BalanceLoad(ServerContext* context, Stream* stream) override {
201     // Balancer shouldn't receive the call credentials metadata.
202     EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
203               context->client_metadata().end());
204     gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
205     LoadBalanceRequest request;
206     std::vector<ResponseDelayPair> responses_and_delays;
207 
208     if (!stream->Read(&request)) {
209       goto done;
210     }
211     IncreaseRequestCount();
212     gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
213             request.DebugString().c_str());
214 
215     // TODO(juanlishen): Initial response should always be the first response.
216     if (client_load_reporting_interval_seconds_ > 0) {
217       LoadBalanceResponse initial_response;
218       initial_response.mutable_initial_response()
219           ->mutable_client_stats_report_interval()
220           ->set_seconds(client_load_reporting_interval_seconds_);
221       stream->Write(initial_response);
222     }
223 
224     {
225       std::unique_lock<std::mutex> lock(mu_);
226       responses_and_delays = responses_and_delays_;
227     }
228     for (const auto& response_and_delay : responses_and_delays) {
229       {
230         std::unique_lock<std::mutex> lock(mu_);
231         if (shutdown_) goto done;
232       }
233       SendResponse(stream, response_and_delay.first, response_and_delay.second);
234     }
235     {
236       std::unique_lock<std::mutex> lock(mu_);
237       if (shutdown_) goto done;
238       serverlist_cond_.wait(lock, [this] { return serverlist_ready_; });
239     }
240 
241     if (client_load_reporting_interval_seconds_ > 0) {
242       request.Clear();
243       if (stream->Read(&request)) {
244         gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
245                 this, request.DebugString().c_str());
246         GPR_ASSERT(request.has_client_stats());
247         // We need to acquire the lock here in order to prevent the notify_one
248         // below from firing before its corresponding wait is executed.
249         std::lock_guard<std::mutex> lock(mu_);
250         client_stats_.num_calls_started +=
251             request.client_stats().num_calls_started();
252         client_stats_.num_calls_finished +=
253             request.client_stats().num_calls_finished();
254         client_stats_.num_calls_finished_with_client_failed_to_send +=
255             request.client_stats()
256                 .num_calls_finished_with_client_failed_to_send();
257         client_stats_.num_calls_finished_known_received +=
258             request.client_stats().num_calls_finished_known_received();
259         for (const auto& drop_token_count :
260              request.client_stats().calls_finished_with_drop()) {
261           client_stats_
262               .drop_token_counts[drop_token_count.load_balance_token()] +=
263               drop_token_count.num_calls();
264         }
265         load_report_ready_ = true;
266         load_report_cond_.notify_one();
267       }
268     }
269   done:
270     gpr_log(GPR_INFO, "LB[%p]: done", this);
271     return Status::OK;
272   }
273 
add_response(const LoadBalanceResponse & response,int send_after_ms)274   void add_response(const LoadBalanceResponse& response, int send_after_ms) {
275     std::unique_lock<std::mutex> lock(mu_);
276     responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
277   }
278 
279   // Returns true on its first invocation, false otherwise.
Shutdown()280   bool Shutdown() {
281     NotifyDoneWithServerlists();
282     std::unique_lock<std::mutex> lock(mu_);
283     const bool prev = !shutdown_;
284     shutdown_ = true;
285     gpr_log(GPR_INFO, "LB[%p]: shut down", this);
286     return prev;
287   }
288 
BuildResponseForBackends(const std::vector<int> & backend_ports,const std::map<grpc::string,size_t> & drop_token_counts)289   static LoadBalanceResponse BuildResponseForBackends(
290       const std::vector<int>& backend_ports,
291       const std::map<grpc::string, size_t>& drop_token_counts) {
292     LoadBalanceResponse response;
293     for (const auto& drop_token_count : drop_token_counts) {
294       for (size_t i = 0; i < drop_token_count.second; ++i) {
295         auto* server = response.mutable_server_list()->add_servers();
296         server->set_drop(true);
297         server->set_load_balance_token(drop_token_count.first);
298       }
299     }
300     for (const int& backend_port : backend_ports) {
301       auto* server = response.mutable_server_list()->add_servers();
302       server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
303       server->set_port(backend_port);
304     }
305     return response;
306   }
307 
WaitForLoadReport()308   const ClientStats& WaitForLoadReport() {
309     std::unique_lock<std::mutex> lock(mu_);
310     load_report_cond_.wait(lock, [this] { return load_report_ready_; });
311     load_report_ready_ = false;
312     return client_stats_;
313   }
314 
NotifyDoneWithServerlists()315   void NotifyDoneWithServerlists() {
316     std::lock_guard<std::mutex> lock(mu_);
317     serverlist_ready_ = true;
318     serverlist_cond_.notify_all();
319   }
320 
321  private:
SendResponse(Stream * stream,const LoadBalanceResponse & response,int delay_ms)322   void SendResponse(Stream* stream, const LoadBalanceResponse& response,
323                     int delay_ms) {
324     gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
325     if (delay_ms > 0) {
326       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
327     }
328     gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
329             response.DebugString().c_str());
330     IncreaseResponseCount();
331     stream->Write(response);
332   }
333 
334   const int client_load_reporting_interval_seconds_;
335   std::vector<ResponseDelayPair> responses_and_delays_;
336   std::mutex mu_;
337   std::condition_variable load_report_cond_;
338   bool load_report_ready_ = false;
339   std::condition_variable serverlist_cond_;
340   bool serverlist_ready_ = false;
341   ClientStats client_stats_;
342   bool shutdown_;
343 };
344 
345 class GrpclbEnd2endTest : public ::testing::Test {
346  protected:
GrpclbEnd2endTest(int num_backends,int num_balancers,int client_load_reporting_interval_seconds)347   GrpclbEnd2endTest(int num_backends, int num_balancers,
348                     int client_load_reporting_interval_seconds)
349       : server_host_("localhost"),
350         num_backends_(num_backends),
351         num_balancers_(num_balancers),
352         client_load_reporting_interval_seconds_(
353             client_load_reporting_interval_seconds) {
354     // Make the backup poller poll very frequently in order to pick up
355     // updates from all the subchannels's FDs.
356     gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
357   }
358 
SetUp()359   void SetUp() override {
360     response_generator_ =
361         grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
362     // Start the backends.
363     for (size_t i = 0; i < num_backends_; ++i) {
364       backends_.emplace_back(new BackendServiceImpl());
365       backend_servers_.emplace_back(ServerThread<BackendService>(
366           "backend", server_host_, backends_.back().get()));
367     }
368     // Start the load balancers.
369     for (size_t i = 0; i < num_balancers_; ++i) {
370       balancers_.emplace_back(
371           new BalancerServiceImpl(client_load_reporting_interval_seconds_));
372       balancer_servers_.emplace_back(ServerThread<BalancerService>(
373           "balancer", server_host_, balancers_.back().get()));
374     }
375     ResetStub();
376   }
377 
TearDown()378   void TearDown() override {
379     for (size_t i = 0; i < backends_.size(); ++i) {
380       if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
381     }
382     for (size_t i = 0; i < balancers_.size(); ++i) {
383       if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
384     }
385   }
386 
SetNextResolutionAllBalancers()387   void SetNextResolutionAllBalancers() {
388     std::vector<AddressData> addresses;
389     for (size_t i = 0; i < balancer_servers_.size(); ++i) {
390       addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
391     }
392     SetNextResolution(addresses);
393   }
394 
ResetStub(int fallback_timeout=0,const grpc::string & expected_targets="")395   void ResetStub(int fallback_timeout = 0,
396                  const grpc::string& expected_targets = "") {
397     ChannelArguments args;
398     args.SetGrpclbFallbackTimeout(fallback_timeout);
399     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
400                     response_generator_.get());
401     if (!expected_targets.empty()) {
402       args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
403     }
404     std::ostringstream uri;
405     uri << "fake:///" << kApplicationTargetName_;
406     // TODO(dgq): templatize tests to run everything using both secure and
407     // insecure channel credentials.
408     grpc_channel_credentials* channel_creds =
409         grpc_fake_transport_security_credentials_create();
410     grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
411         g_kCallCredsMdKey, g_kCallCredsMdValue, false);
412     std::shared_ptr<ChannelCredentials> creds(
413         new SecureChannelCredentials(grpc_composite_channel_credentials_create(
414             channel_creds, call_creds, nullptr)));
415     grpc_call_credentials_unref(call_creds);
416     grpc_channel_credentials_unref(channel_creds);
417     channel_ = CreateCustomChannel(uri.str(), creds, args);
418     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
419   }
420 
ResetBackendCounters()421   void ResetBackendCounters() {
422     for (const auto& backend : backends_) backend->ResetCounters();
423   }
424 
WaitForLoadReports()425   ClientStats WaitForLoadReports() {
426     ClientStats client_stats;
427     for (const auto& balancer : balancers_) {
428       client_stats += balancer->WaitForLoadReport();
429     }
430     return client_stats;
431   }
432 
SeenAllBackends()433   bool SeenAllBackends() {
434     for (const auto& backend : backends_) {
435       if (backend->request_count() == 0) return false;
436     }
437     return true;
438   }
439 
SendRpcAndCount(int * num_total,int * num_ok,int * num_failure,int * num_drops)440   void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
441                        int* num_drops) {
442     const Status status = SendRpc();
443     if (status.ok()) {
444       ++*num_ok;
445     } else {
446       if (status.error_message() == "Call dropped by load balancing policy") {
447         ++*num_drops;
448       } else {
449         ++*num_failure;
450       }
451     }
452     ++*num_total;
453   }
454 
WaitForAllBackends(int num_requests_multiple_of=1)455   std::tuple<int, int, int> WaitForAllBackends(
456       int num_requests_multiple_of = 1) {
457     int num_ok = 0;
458     int num_failure = 0;
459     int num_drops = 0;
460     int num_total = 0;
461     while (!SeenAllBackends()) {
462       SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
463     }
464     while (num_total % num_requests_multiple_of != 0) {
465       SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
466     }
467     ResetBackendCounters();
468     gpr_log(GPR_INFO,
469             "Performed %d warm up requests (a multiple of %d) against the "
470             "backends. %d succeeded, %d failed, %d dropped.",
471             num_total, num_requests_multiple_of, num_ok, num_failure,
472             num_drops);
473     return std::make_tuple(num_ok, num_failure, num_drops);
474   }
475 
WaitForBackend(size_t backend_idx)476   void WaitForBackend(size_t backend_idx) {
477     do {
478       (void)SendRpc();
479     } while (backends_[backend_idx]->request_count() == 0);
480     ResetBackendCounters();
481   }
482 
483   struct AddressData {
484     int port;
485     bool is_balancer;
486     grpc::string balancer_name;
487   };
488 
CreateLbAddressesFromAddressDataList(const std::vector<AddressData> & address_data)489   grpc_lb_addresses* CreateLbAddressesFromAddressDataList(
490       const std::vector<AddressData>& address_data) {
491     grpc_lb_addresses* addresses =
492         grpc_lb_addresses_create(address_data.size(), nullptr);
493     for (size_t i = 0; i < address_data.size(); ++i) {
494       char* lb_uri_str;
495       gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
496       grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
497       GPR_ASSERT(lb_uri != nullptr);
498       grpc_lb_addresses_set_address_from_uri(
499           addresses, i, lb_uri, address_data[i].is_balancer,
500           address_data[i].balancer_name.c_str(), nullptr);
501       grpc_uri_destroy(lb_uri);
502       gpr_free(lb_uri_str);
503     }
504     return addresses;
505   }
506 
SetNextResolution(const std::vector<AddressData> & address_data)507   void SetNextResolution(const std::vector<AddressData>& address_data) {
508     grpc_core::ExecCtx exec_ctx;
509     grpc_lb_addresses* addresses =
510         CreateLbAddressesFromAddressDataList(address_data);
511     grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
512     grpc_channel_args fake_result = {1, &fake_addresses};
513     response_generator_->SetResponse(&fake_result);
514     grpc_lb_addresses_destroy(addresses);
515   }
516 
SetNextReresolutionResponse(const std::vector<AddressData> & address_data)517   void SetNextReresolutionResponse(
518       const std::vector<AddressData>& address_data) {
519     grpc_core::ExecCtx exec_ctx;
520     grpc_lb_addresses* addresses =
521         CreateLbAddressesFromAddressDataList(address_data);
522     grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
523     grpc_channel_args fake_result = {1, &fake_addresses};
524     response_generator_->SetReresolutionResponse(&fake_result);
525     grpc_lb_addresses_destroy(addresses);
526   }
527 
GetBackendPorts(const size_t start_index=0) const528   const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
529     std::vector<int> backend_ports;
530     for (size_t i = start_index; i < backend_servers_.size(); ++i) {
531       backend_ports.push_back(backend_servers_[i].port_);
532     }
533     return backend_ports;
534   }
535 
ScheduleResponseForBalancer(size_t i,const LoadBalanceResponse & response,int delay_ms)536   void ScheduleResponseForBalancer(size_t i,
537                                    const LoadBalanceResponse& response,
538                                    int delay_ms) {
539     balancers_.at(i)->add_response(response, delay_ms);
540   }
541 
SendRpc(EchoResponse * response=nullptr,int timeout_ms=1000)542   Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
543     const bool local_response = (response == nullptr);
544     if (local_response) response = new EchoResponse;
545     EchoRequest request;
546     request.set_message(kRequestMessage_);
547     ClientContext context;
548     context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
549     Status status = stub_->Echo(&context, request, response);
550     if (local_response) delete response;
551     return status;
552   }
553 
CheckRpcSendOk(const size_t times=1,const int timeout_ms=1000)554   void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000) {
555     for (size_t i = 0; i < times; ++i) {
556       EchoResponse response;
557       const Status status = SendRpc(&response, timeout_ms);
558       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
559                                << " message=" << status.error_message();
560       EXPECT_EQ(response.message(), kRequestMessage_);
561     }
562   }
563 
CheckRpcSendFailure()564   void CheckRpcSendFailure() {
565     const Status status = SendRpc();
566     EXPECT_FALSE(status.ok());
567   }
568 
569   template <typename T>
570   struct ServerThread {
ServerThreadgrpc::testing::__anon5deae6e10111::GrpclbEnd2endTest::ServerThread571     explicit ServerThread(const grpc::string& type,
572                           const grpc::string& server_host, T* service)
573         : type_(type), service_(service) {
574       std::mutex mu;
575       // We need to acquire the lock here in order to prevent the notify_one
576       // by ServerThread::Start from firing before the wait below is hit.
577       std::unique_lock<std::mutex> lock(mu);
578       port_ = grpc_pick_unused_port_or_die();
579       gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
580       std::condition_variable cond;
581       thread_.reset(new std::thread(
582           std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
583       cond.wait(lock);
584       gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
585     }
586 
Startgrpc::testing::__anon5deae6e10111::GrpclbEnd2endTest::ServerThread587     void Start(const grpc::string& server_host, std::mutex* mu,
588                std::condition_variable* cond) {
589       // We need to acquire the lock here in order to prevent the notify_one
590       // below from firing before its corresponding wait is executed.
591       std::lock_guard<std::mutex> lock(*mu);
592       std::ostringstream server_address;
593       server_address << server_host << ":" << port_;
594       ServerBuilder builder;
595       std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
596           grpc_fake_transport_security_server_credentials_create()));
597       builder.AddListeningPort(server_address.str(), creds);
598       builder.RegisterService(service_);
599       server_ = builder.BuildAndStart();
600       cond->notify_one();
601     }
602 
Shutdowngrpc::testing::__anon5deae6e10111::GrpclbEnd2endTest::ServerThread603     void Shutdown() {
604       gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
605       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
606       thread_->join();
607       gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
608     }
609 
610     int port_;
611     grpc::string type_;
612     std::unique_ptr<Server> server_;
613     T* service_;
614     std::unique_ptr<std::thread> thread_;
615   };
616 
617   const grpc::string server_host_;
618   const size_t num_backends_;
619   const size_t num_balancers_;
620   const int client_load_reporting_interval_seconds_;
621   std::shared_ptr<Channel> channel_;
622   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
623   std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
624   std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
625   std::vector<ServerThread<BackendService>> backend_servers_;
626   std::vector<ServerThread<BalancerService>> balancer_servers_;
627   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
628       response_generator_;
629   const grpc::string kRequestMessage_ = "Live long and prosper.";
630   const grpc::string kApplicationTargetName_ = "application_target_name";
631 };
632 
633 class SingleBalancerTest : public GrpclbEnd2endTest {
634  public:
SingleBalancerTest()635   SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
636 };
637 
TEST_F(SingleBalancerTest,Vanilla)638 TEST_F(SingleBalancerTest, Vanilla) {
639   SetNextResolutionAllBalancers();
640   const size_t kNumRpcsPerAddress = 100;
641   ScheduleResponseForBalancer(
642       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
643       0);
644   // Make sure that trying to connect works without a call.
645   channel_->GetState(true /* try_to_connect */);
646   // We need to wait for all backends to come online.
647   WaitForAllBackends();
648   // Send kNumRpcsPerAddress RPCs per server.
649   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
650 
651   // Each backend should have gotten 100 requests.
652   for (size_t i = 0; i < backends_.size(); ++i) {
653     EXPECT_EQ(kNumRpcsPerAddress,
654               backend_servers_[i].service_->request_count());
655   }
656   balancers_[0]->NotifyDoneWithServerlists();
657   // The balancer got a single request.
658   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
659   // and sent a single response.
660   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
661 
662   // Check LB policy name for the channel.
663   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
664 }
665 
TEST_F(SingleBalancerTest,SecureNaming)666 TEST_F(SingleBalancerTest, SecureNaming) {
667   ResetStub(0, kApplicationTargetName_ + ";lb");
668   SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}});
669   const size_t kNumRpcsPerAddress = 100;
670   ScheduleResponseForBalancer(
671       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
672       0);
673   // Make sure that trying to connect works without a call.
674   channel_->GetState(true /* try_to_connect */);
675   // We need to wait for all backends to come online.
676   WaitForAllBackends();
677   // Send kNumRpcsPerAddress RPCs per server.
678   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
679 
680   // Each backend should have gotten 100 requests.
681   for (size_t i = 0; i < backends_.size(); ++i) {
682     EXPECT_EQ(kNumRpcsPerAddress,
683               backend_servers_[i].service_->request_count());
684   }
685   balancers_[0]->NotifyDoneWithServerlists();
686   // The balancer got a single request.
687   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
688   // and sent a single response.
689   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
690   // Check LB policy name for the channel.
691   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
692 }
693 
TEST_F(SingleBalancerTest,SecureNamingDeathTest)694 TEST_F(SingleBalancerTest, SecureNamingDeathTest) {
695   ::testing::FLAGS_gtest_death_test_style = "threadsafe";
696   // Make sure that we blow up (via abort() from the security connector) when
697   // the name from the balancer doesn't match expectations.
698   ASSERT_DEATH(
699       {
700         ResetStub(0, kApplicationTargetName_ + ";lb");
701         SetNextResolution(
702             {AddressData{balancer_servers_[0].port_, true, "woops"}});
703         channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
704       },
705       "");
706 }
707 
TEST_F(SingleBalancerTest,InitiallyEmptyServerlist)708 TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
709   SetNextResolutionAllBalancers();
710   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
711   const int kCallDeadlineMs = kServerlistDelayMs * 2;
712   // First response is an empty serverlist, sent right away.
713   ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
714   // Send non-empty serverlist only after kServerlistDelayMs
715   ScheduleResponseForBalancer(
716       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
717       kServerlistDelayMs);
718 
719   const auto t0 = system_clock::now();
720   // Client will block: LB will initially send empty serverlist.
721   CheckRpcSendOk(1, kCallDeadlineMs);
722   const auto ellapsed_ms =
723       std::chrono::duration_cast<std::chrono::milliseconds>(
724           system_clock::now() - t0);
725   // but eventually, the LB sends a serverlist update that allows the call to
726   // proceed. The call delay must be larger than the delay in sending the
727   // populated serverlist but under the call's deadline (which is enforced by
728   // the call's deadline).
729   EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
730   balancers_[0]->NotifyDoneWithServerlists();
731   // The balancer got a single request.
732   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
733   // and sent two responses.
734   EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
735 }
736 
TEST_F(SingleBalancerTest,AllServersUnreachableFailFast)737 TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
738   SetNextResolutionAllBalancers();
739   const size_t kNumUnreachableServers = 5;
740   std::vector<int> ports;
741   for (size_t i = 0; i < kNumUnreachableServers; ++i) {
742     ports.push_back(grpc_pick_unused_port_or_die());
743   }
744   ScheduleResponseForBalancer(
745       0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
746   const Status status = SendRpc();
747   // The error shouldn't be DEADLINE_EXCEEDED.
748   EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
749   balancers_[0]->NotifyDoneWithServerlists();
750   // The balancer got a single request.
751   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
752   // and sent a single response.
753   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
754 }
755 
TEST_F(SingleBalancerTest,Fallback)756 TEST_F(SingleBalancerTest, Fallback) {
757   SetNextResolutionAllBalancers();
758   const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
759   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
760   const size_t kNumBackendInResolution = backends_.size() / 2;
761 
762   ResetStub(kFallbackTimeoutMs);
763   std::vector<AddressData> addresses;
764   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
765   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
766     addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
767   }
768   SetNextResolution(addresses);
769 
770   // Send non-empty serverlist only after kServerlistDelayMs.
771   ScheduleResponseForBalancer(
772       0,
773       BalancerServiceImpl::BuildResponseForBackends(
774           GetBackendPorts(kNumBackendInResolution /* start_index */), {}),
775       kServerlistDelayMs);
776 
777   // Wait until all the fallback backends are reachable.
778   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
779     WaitForBackend(i);
780   }
781 
782   // The first request.
783   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
784   CheckRpcSendOk(kNumBackendInResolution);
785   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
786 
787   // Fallback is used: each backend returned by the resolver should have
788   // gotten one request.
789   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
790     EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
791   }
792   for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
793     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
794   }
795 
796   // Wait until the serverlist reception has been processed and all backends
797   // in the serverlist are reachable.
798   for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
799     WaitForBackend(i);
800   }
801 
802   // Send out the second request.
803   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
804   CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
805   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
806 
807   // Serverlist is used: each backend returned by the balancer should
808   // have gotten one request.
809   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
810     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
811   }
812   for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
813     EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
814   }
815 
816   balancers_[0]->NotifyDoneWithServerlists();
817   // The balancer got a single request.
818   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
819   // and sent a single response.
820   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
821 }
822 
TEST_F(SingleBalancerTest,FallbackUpdate)823 TEST_F(SingleBalancerTest, FallbackUpdate) {
824   SetNextResolutionAllBalancers();
825   const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
826   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
827   const size_t kNumBackendInResolution = backends_.size() / 3;
828   const size_t kNumBackendInResolutionUpdate = backends_.size() / 3;
829 
830   ResetStub(kFallbackTimeoutMs);
831   std::vector<AddressData> addresses;
832   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
833   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
834     addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
835   }
836   SetNextResolution(addresses);
837 
838   // Send non-empty serverlist only after kServerlistDelayMs.
839   ScheduleResponseForBalancer(
840       0,
841       BalancerServiceImpl::BuildResponseForBackends(
842           GetBackendPorts(kNumBackendInResolution +
843                           kNumBackendInResolutionUpdate /* start_index */),
844           {}),
845       kServerlistDelayMs);
846 
847   // Wait until all the fallback backends are reachable.
848   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
849     WaitForBackend(i);
850   }
851 
852   // The first request.
853   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
854   CheckRpcSendOk(kNumBackendInResolution);
855   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
856 
857   // Fallback is used: each backend returned by the resolver should have
858   // gotten one request.
859   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
860     EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
861   }
862   for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
863     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
864   }
865 
866   addresses.clear();
867   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
868   for (size_t i = kNumBackendInResolution;
869        i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
870     addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
871   }
872   SetNextResolution(addresses);
873 
874   // Wait until the resolution update has been processed and all the new
875   // fallback backends are reachable.
876   for (size_t i = kNumBackendInResolution;
877        i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
878     WaitForBackend(i);
879   }
880 
881   // Send out the second request.
882   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
883   CheckRpcSendOk(kNumBackendInResolutionUpdate);
884   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
885 
886   // The resolution update is used: each backend in the resolution update should
887   // have gotten one request.
888   for (size_t i = 0; i < kNumBackendInResolution; ++i) {
889     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
890   }
891   for (size_t i = kNumBackendInResolution;
892        i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
893     EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
894   }
895   for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
896        i < backends_.size(); ++i) {
897     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
898   }
899 
900   // Wait until the serverlist reception has been processed and all backends
901   // in the serverlist are reachable.
902   for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
903        i < backends_.size(); ++i) {
904     WaitForBackend(i);
905   }
906 
907   // Send out the third request.
908   gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
909   CheckRpcSendOk(backends_.size() - kNumBackendInResolution -
910                  kNumBackendInResolutionUpdate);
911   gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
912 
913   // Serverlist is used: each backend returned by the balancer should
914   // have gotten one request.
915   for (size_t i = 0;
916        i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
917     EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
918   }
919   for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
920        i < backends_.size(); ++i) {
921     EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
922   }
923 
924   balancers_[0]->NotifyDoneWithServerlists();
925   // The balancer got a single request.
926   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
927   // and sent a single response.
928   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
929 }
930 
TEST_F(SingleBalancerTest,BackendsRestart)931 TEST_F(SingleBalancerTest, BackendsRestart) {
932   SetNextResolutionAllBalancers();
933   const size_t kNumRpcsPerAddress = 100;
934   ScheduleResponseForBalancer(
935       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
936       0);
937   // Make sure that trying to connect works without a call.
938   channel_->GetState(true /* try_to_connect */);
939   // Send kNumRpcsPerAddress RPCs per server.
940   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
941   balancers_[0]->NotifyDoneWithServerlists();
942   // The balancer got a single request.
943   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
944   // and sent a single response.
945   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
946   for (size_t i = 0; i < backends_.size(); ++i) {
947     if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
948   }
949   CheckRpcSendFailure();
950   for (size_t i = 0; i < num_backends_; ++i) {
951     backends_.emplace_back(new BackendServiceImpl());
952     backend_servers_.emplace_back(ServerThread<BackendService>(
953         "backend", server_host_, backends_.back().get()));
954   }
955   // The following RPC will fail due to the backend ports having changed. It
956   // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
957   // having gone into shutdown.
958   // TODO(dgq): implement the "backend restart" component as well. We need extra
959   // machinery to either update the LB responses "on the fly" or instruct
960   // backends which ports to restart on.
961   CheckRpcSendFailure();
962 }
963 
964 class UpdatesTest : public GrpclbEnd2endTest {
965  public:
UpdatesTest()966   UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
967 };
968 
TEST_F(UpdatesTest,UpdateBalancers)969 TEST_F(UpdatesTest, UpdateBalancers) {
970   SetNextResolutionAllBalancers();
971   const std::vector<int> first_backend{GetBackendPorts()[0]};
972   const std::vector<int> second_backend{GetBackendPorts()[1]};
973   ScheduleResponseForBalancer(
974       0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
975   ScheduleResponseForBalancer(
976       1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
977 
978   // Wait until the first backend is ready.
979   WaitForBackend(0);
980 
981   // Send 10 requests.
982   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
983   CheckRpcSendOk(10);
984   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
985 
986   // All 10 requests should have gone to the first backend.
987   EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
988 
989   balancers_[0]->NotifyDoneWithServerlists();
990   balancers_[1]->NotifyDoneWithServerlists();
991   balancers_[2]->NotifyDoneWithServerlists();
992   // Balancer 0 got a single request.
993   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
994   // and sent a single response.
995   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
996   EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
997   EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
998   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
999   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1000 
1001   std::vector<AddressData> addresses;
1002   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1003   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
1004   SetNextResolution(addresses);
1005   gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
1006 
1007   // Wait until update has been processed, as signaled by the second backend
1008   // receiving a request.
1009   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1010   WaitForBackend(1);
1011 
1012   backend_servers_[1].service_->ResetCounters();
1013   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1014   CheckRpcSendOk(10);
1015   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1016   // All 10 requests should have gone to the second backend.
1017   EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
1018 
1019   balancers_[0]->NotifyDoneWithServerlists();
1020   balancers_[1]->NotifyDoneWithServerlists();
1021   balancers_[2]->NotifyDoneWithServerlists();
1022   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1023   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1024   EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
1025   EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
1026   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1027   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1028 }
1029 
1030 // Send an update with the same set of LBs as the one in SetUp() in order to
1031 // verify that the LB channel inside grpclb keeps the initial connection (which
1032 // by definition is also present in the update).
TEST_F(UpdatesTest,UpdateBalancersRepeated)1033 TEST_F(UpdatesTest, UpdateBalancersRepeated) {
1034   SetNextResolutionAllBalancers();
1035   const std::vector<int> first_backend{GetBackendPorts()[0]};
1036   const std::vector<int> second_backend{GetBackendPorts()[0]};
1037 
1038   ScheduleResponseForBalancer(
1039       0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
1040   ScheduleResponseForBalancer(
1041       1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
1042 
1043   // Wait until the first backend is ready.
1044   WaitForBackend(0);
1045 
1046   // Send 10 requests.
1047   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1048   CheckRpcSendOk(10);
1049   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1050 
1051   // All 10 requests should have gone to the first backend.
1052   EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
1053 
1054   balancers_[0]->NotifyDoneWithServerlists();
1055   // Balancer 0 got a single request.
1056   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1057   // and sent a single response.
1058   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1059   EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
1060   EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
1061   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1062   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1063 
1064   std::vector<AddressData> addresses;
1065   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1066   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1067   addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""});
1068   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
1069   SetNextResolution(addresses);
1070   gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
1071 
1072   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1073   gpr_timespec deadline = gpr_time_add(
1074       gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
1075   // Send 10 seconds worth of RPCs
1076   do {
1077     CheckRpcSendOk();
1078   } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
1079   // grpclb continued using the original LB call to the first balancer, which
1080   // doesn't assign the second backend.
1081   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1082   balancers_[0]->NotifyDoneWithServerlists();
1083 
1084   addresses.clear();
1085   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1086   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1087   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
1088   SetNextResolution(addresses);
1089   gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
1090 
1091   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1092   deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1093                           gpr_time_from_millis(10000, GPR_TIMESPAN));
1094   // Send 10 seconds worth of RPCs
1095   do {
1096     CheckRpcSendOk();
1097   } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
1098   // grpclb continued using the original LB call to the first balancer, which
1099   // doesn't assign the second backend.
1100   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1101   balancers_[0]->NotifyDoneWithServerlists();
1102 }
1103 
TEST_F(UpdatesTest,UpdateBalancersDeadUpdate)1104 TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
1105   std::vector<AddressData> addresses;
1106   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1107   SetNextResolution(addresses);
1108   const std::vector<int> first_backend{GetBackendPorts()[0]};
1109   const std::vector<int> second_backend{GetBackendPorts()[1]};
1110 
1111   ScheduleResponseForBalancer(
1112       0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
1113   ScheduleResponseForBalancer(
1114       1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
1115 
1116   // Start servers and send 10 RPCs per server.
1117   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1118   CheckRpcSendOk(10);
1119   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1120   // All 10 requests should have gone to the first backend.
1121   EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
1122 
1123   // Kill balancer 0
1124   gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
1125   balancers_[0]->NotifyDoneWithServerlists();
1126   if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
1127   gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
1128 
1129   // This is serviced by the existing RR policy
1130   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1131   CheckRpcSendOk(10);
1132   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1133   // All 10 requests should again have gone to the first backend.
1134   EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
1135   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1136 
1137   balancers_[0]->NotifyDoneWithServerlists();
1138   balancers_[1]->NotifyDoneWithServerlists();
1139   balancers_[2]->NotifyDoneWithServerlists();
1140   // Balancer 0 got a single request.
1141   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1142   // and sent a single response.
1143   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1144   EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
1145   EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
1146   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1147   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1148 
1149   addresses.clear();
1150   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1151   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
1152   SetNextResolution(addresses);
1153   gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
1154 
1155   // Wait until update has been processed, as signaled by the second backend
1156   // receiving a request. In the meantime, the client continues to be serviced
1157   // (by the first backend) without interruption.
1158   EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
1159   WaitForBackend(1);
1160 
1161   // This is serviced by the updated RR policy
1162   backend_servers_[1].service_->ResetCounters();
1163   gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
1164   CheckRpcSendOk(10);
1165   gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
1166   // All 10 requests should have gone to the second backend.
1167   EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
1168 
1169   balancers_[0]->NotifyDoneWithServerlists();
1170   balancers_[1]->NotifyDoneWithServerlists();
1171   balancers_[2]->NotifyDoneWithServerlists();
1172   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1173   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1174   // The second balancer, published as part of the first update, may end up
1175   // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
1176   // firing races with the arrival of the update containing the second
1177   // balancer.
1178   EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
1179   EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
1180   EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
1181   EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
1182   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1183   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1184 }
1185 
TEST_F(UpdatesTest,ReresolveDeadBackend)1186 TEST_F(UpdatesTest, ReresolveDeadBackend) {
1187   ResetStub(500);
1188   // The first resolution contains the addresses of a balancer that never
1189   // responds, and a fallback backend.
1190   std::vector<AddressData> addresses;
1191   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1192   addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""});
1193   SetNextResolution(addresses);
1194   // The re-resolution result will contain the addresses of the same balancer
1195   // and a new fallback backend.
1196   addresses.clear();
1197   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1198   addresses.emplace_back(AddressData{backend_servers_[1].port_, false, ""});
1199   SetNextReresolutionResponse(addresses);
1200 
1201   // Start servers and send 10 RPCs per server.
1202   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1203   CheckRpcSendOk(10);
1204   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1205   // All 10 requests should have gone to the fallback backend.
1206   EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
1207 
1208   // Kill backend 0.
1209   gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
1210   if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
1211   gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
1212 
1213   // Wait until re-resolution has finished, as signaled by the second backend
1214   // receiving a request.
1215   WaitForBackend(1);
1216 
1217   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1218   CheckRpcSendOk(10);
1219   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1220   // All 10 requests should have gone to the second backend.
1221   EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
1222 
1223   balancers_[0]->NotifyDoneWithServerlists();
1224   balancers_[1]->NotifyDoneWithServerlists();
1225   balancers_[2]->NotifyDoneWithServerlists();
1226   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1227   EXPECT_EQ(0U, balancer_servers_[0].service_->response_count());
1228   EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
1229   EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
1230   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1231   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1232 }
1233 
1234 // TODO(juanlishen): Should be removed when the first response is always the
1235 // initial response. Currently, if client load reporting is not enabled, the
1236 // balancer doesn't send initial response. When the backend shuts down, an
1237 // unexpected re-resolution will happen. This test configuration is a workaround
1238 // for test ReresolveDeadBalancer.
1239 class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest {
1240  public:
UpdatesWithClientLoadReportingTest()1241   UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {}
1242 };
1243 
TEST_F(UpdatesWithClientLoadReportingTest,ReresolveDeadBalancer)1244 TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
1245   std::vector<AddressData> addresses;
1246   addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
1247   SetNextResolution(addresses);
1248   addresses.clear();
1249   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
1250   SetNextReresolutionResponse(addresses);
1251   const std::vector<int> first_backend{GetBackendPorts()[0]};
1252   const std::vector<int> second_backend{GetBackendPorts()[1]};
1253 
1254   ScheduleResponseForBalancer(
1255       0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
1256   ScheduleResponseForBalancer(
1257       1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
1258 
1259   // Start servers and send 10 RPCs per server.
1260   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1261   CheckRpcSendOk(10);
1262   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1263   // All 10 requests should have gone to the first backend.
1264   EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
1265 
1266   // Kill backend 0.
1267   gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
1268   if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
1269   gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
1270 
1271   CheckRpcSendFailure();
1272 
1273   // Balancer 0 got a single request.
1274   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1275   // and sent a single response.
1276   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1277   EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
1278   EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
1279   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1280   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1281 
1282   // Kill balancer 0.
1283   gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
1284   if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
1285   gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
1286 
1287   // Wait until re-resolution has finished, as signaled by the second backend
1288   // receiving a request.
1289   WaitForBackend(1);
1290 
1291   // This is serviced by the new serverlist.
1292   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1293   CheckRpcSendOk(10);
1294   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1295   // All 10 requests should have gone to the second backend.
1296   EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
1297 
1298   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1299   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1300   // After balancer 0 is killed, we restart an LB call immediately (because we
1301   // disconnect to a previously connected balancer). Although we will cancel
1302   // this call when the re-resolution update is done and another LB call restart
1303   // is needed, this old call may still succeed reaching the LB server if
1304   // re-resolution is slow. So balancer 1 may have received 2 requests and sent
1305   // 2 responses.
1306   EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
1307   EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
1308   EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
1309   EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
1310   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
1311   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
1312 }
1313 
TEST_F(SingleBalancerTest,Drop)1314 TEST_F(SingleBalancerTest, Drop) {
1315   SetNextResolutionAllBalancers();
1316   const size_t kNumRpcsPerAddress = 100;
1317   const int num_of_drop_by_rate_limiting_addresses = 1;
1318   const int num_of_drop_by_load_balancing_addresses = 2;
1319   const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
1320                                     num_of_drop_by_load_balancing_addresses;
1321   const int num_total_addresses = num_backends_ + num_of_drop_addresses;
1322   ScheduleResponseForBalancer(
1323       0,
1324       BalancerServiceImpl::BuildResponseForBackends(
1325           GetBackendPorts(),
1326           {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1327            {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1328       0);
1329   // Wait until all backends are ready.
1330   WaitForAllBackends();
1331   // Send kNumRpcsPerAddress RPCs for each server and drop address.
1332   size_t num_drops = 0;
1333   for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
1334     EchoResponse response;
1335     const Status status = SendRpc(&response);
1336     if (!status.ok() &&
1337         status.error_message() == "Call dropped by load balancing policy") {
1338       ++num_drops;
1339     } else {
1340       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1341                                << " message=" << status.error_message();
1342       EXPECT_EQ(response.message(), kRequestMessage_);
1343     }
1344   }
1345   EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
1346 
1347   // Each backend should have gotten 100 requests.
1348   for (size_t i = 0; i < backends_.size(); ++i) {
1349     EXPECT_EQ(kNumRpcsPerAddress,
1350               backend_servers_[i].service_->request_count());
1351   }
1352   // The balancer got a single request.
1353   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1354   // and sent a single response.
1355   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1356 }
1357 
TEST_F(SingleBalancerTest,DropAllFirst)1358 TEST_F(SingleBalancerTest, DropAllFirst) {
1359   SetNextResolutionAllBalancers();
1360   // All registered addresses are marked as "drop".
1361   const int num_of_drop_by_rate_limiting_addresses = 1;
1362   const int num_of_drop_by_load_balancing_addresses = 1;
1363   ScheduleResponseForBalancer(
1364       0,
1365       BalancerServiceImpl::BuildResponseForBackends(
1366           {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1367                {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1368       0);
1369   const Status status = SendRpc();
1370   EXPECT_FALSE(status.ok());
1371   EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
1372 }
1373 
TEST_F(SingleBalancerTest,DropAll)1374 TEST_F(SingleBalancerTest, DropAll) {
1375   SetNextResolutionAllBalancers();
1376   ScheduleResponseForBalancer(
1377       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
1378       0);
1379   const int num_of_drop_by_rate_limiting_addresses = 1;
1380   const int num_of_drop_by_load_balancing_addresses = 1;
1381   ScheduleResponseForBalancer(
1382       0,
1383       BalancerServiceImpl::BuildResponseForBackends(
1384           {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1385                {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1386       1000);
1387 
1388   // First call succeeds.
1389   CheckRpcSendOk();
1390   // But eventually, the update with only dropped servers is processed and calls
1391   // fail.
1392   Status status;
1393   do {
1394     status = SendRpc();
1395   } while (status.ok());
1396   EXPECT_FALSE(status.ok());
1397   EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
1398 }
1399 
1400 class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
1401  public:
SingleBalancerWithClientLoadReportingTest()1402   SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
1403 };
1404 
TEST_F(SingleBalancerWithClientLoadReportingTest,Vanilla)1405 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
1406   SetNextResolutionAllBalancers();
1407   const size_t kNumRpcsPerAddress = 100;
1408   ScheduleResponseForBalancer(
1409       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
1410       0);
1411   // Wait until all backends are ready.
1412   int num_ok = 0;
1413   int num_failure = 0;
1414   int num_drops = 0;
1415   std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
1416   // Send kNumRpcsPerAddress RPCs per server.
1417   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
1418   // Each backend should have gotten 100 requests.
1419   for (size_t i = 0; i < backends_.size(); ++i) {
1420     EXPECT_EQ(kNumRpcsPerAddress,
1421               backend_servers_[i].service_->request_count());
1422   }
1423   balancers_[0]->NotifyDoneWithServerlists();
1424   // The balancer got a single request.
1425   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1426   // and sent a single response.
1427   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1428 
1429   const ClientStats client_stats = WaitForLoadReports();
1430   EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
1431             client_stats.num_calls_started);
1432   EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
1433             client_stats.num_calls_finished);
1434   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
1435   EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + (num_ok + num_drops),
1436             client_stats.num_calls_finished_known_received);
1437   EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
1438 }
1439 
TEST_F(SingleBalancerWithClientLoadReportingTest,Drop)1440 TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
1441   SetNextResolutionAllBalancers();
1442   const size_t kNumRpcsPerAddress = 3;
1443   const int num_of_drop_by_rate_limiting_addresses = 2;
1444   const int num_of_drop_by_load_balancing_addresses = 1;
1445   const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
1446                                     num_of_drop_by_load_balancing_addresses;
1447   const int num_total_addresses = num_backends_ + num_of_drop_addresses;
1448   ScheduleResponseForBalancer(
1449       0,
1450       BalancerServiceImpl::BuildResponseForBackends(
1451           GetBackendPorts(),
1452           {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1453            {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1454       0);
1455   // Wait until all backends are ready.
1456   int num_warmup_ok = 0;
1457   int num_warmup_failure = 0;
1458   int num_warmup_drops = 0;
1459   std::tie(num_warmup_ok, num_warmup_failure, num_warmup_drops) =
1460       WaitForAllBackends(num_total_addresses /* num_requests_multiple_of */);
1461   const int num_total_warmup_requests =
1462       num_warmup_ok + num_warmup_failure + num_warmup_drops;
1463   size_t num_drops = 0;
1464   for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
1465     EchoResponse response;
1466     const Status status = SendRpc(&response);
1467     if (!status.ok() &&
1468         status.error_message() == "Call dropped by load balancing policy") {
1469       ++num_drops;
1470     } else {
1471       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1472                                << " message=" << status.error_message();
1473       EXPECT_EQ(response.message(), kRequestMessage_);
1474     }
1475   }
1476   EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
1477   // Each backend should have gotten 100 requests.
1478   for (size_t i = 0; i < backends_.size(); ++i) {
1479     EXPECT_EQ(kNumRpcsPerAddress,
1480               backend_servers_[i].service_->request_count());
1481   }
1482   balancers_[0]->NotifyDoneWithServerlists();
1483   // The balancer got a single request.
1484   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
1485   // and sent a single response.
1486   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
1487 
1488   const ClientStats client_stats = WaitForLoadReports();
1489   EXPECT_EQ(
1490       kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
1491       client_stats.num_calls_started);
1492   EXPECT_EQ(
1493       kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
1494       client_stats.num_calls_finished);
1495   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
1496   EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_ok,
1497             client_stats.num_calls_finished_known_received);
1498   // The number of warmup request is a multiple of the number of addresses.
1499   // Therefore, all addresses in the scheduled balancer response are hit the
1500   // same number of times.
1501   const int num_times_drop_addresses_hit =
1502       num_warmup_drops / num_of_drop_addresses;
1503   EXPECT_THAT(
1504       client_stats.drop_token_counts,
1505       ::testing::ElementsAre(
1506           ::testing::Pair("load_balancing",
1507                           (kNumRpcsPerAddress + num_times_drop_addresses_hit)),
1508           ::testing::Pair(
1509               "rate_limiting",
1510               (kNumRpcsPerAddress + num_times_drop_addresses_hit) * 2)));
1511 }
1512 
1513 }  // namespace
1514 }  // namespace testing
1515 }  // namespace grpc
1516 
main(int argc,char ** argv)1517 int main(int argc, char** argv) {
1518   grpc_init();
1519   grpc_test_init(argc, argv);
1520   ::testing::InitGoogleTest(&argc, argv);
1521   const auto result = RUN_ALL_TESTS();
1522   grpc_shutdown();
1523   return result;
1524 }
1525