1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <mutex>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/time.h>
26 #include <grpcpp/channel.h>
27 #include <grpcpp/client_context.h>
28 #include <grpcpp/create_channel.h>
29 #include <grpcpp/resource_quota.h>
30 #include <grpcpp/security/auth_metadata_processor.h>
31 #include <grpcpp/security/credentials.h>
32 #include <grpcpp/security/server_credentials.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
36 
37 #include "src/core/lib/gpr/env.h"
38 #include "src/core/lib/security/credentials/credentials.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/end2end/test_service_impl.h"
44 #include "test/cpp/util/string_ref_helper.h"
45 #include "test/cpp/util/test_credentials_provider.h"
46 
47 #include <gtest/gtest.h>
48 
49 using grpc::testing::EchoRequest;
50 using grpc::testing::EchoResponse;
51 using grpc::testing::kTlsCredentialsType;
52 using std::chrono::system_clock;
53 
54 namespace grpc {
55 namespace testing {
56 namespace {
57 
CheckIsLocalhost(const grpc::string & addr)58 bool CheckIsLocalhost(const grpc::string& addr) {
59   const grpc::string kIpv6("ipv6:[::1]:");
60   const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
61   const grpc::string kIpv4("ipv4:127.0.0.1:");
62   return addr.substr(0, kIpv4.size()) == kIpv4 ||
63          addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
64          addr.substr(0, kIpv6.size()) == kIpv6;
65 }
66 
67 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
68 
69 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
70  public:
71   static const char kGoodMetadataKey[];
72   static const char kBadMetadataKey[];
73 
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful)74   TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
75                                 const grpc::string_ref& metadata_value,
76                                 bool is_blocking, bool is_successful)
77       : metadata_key_(metadata_key.data(), metadata_key.length()),
78         metadata_value_(metadata_value.data(), metadata_value.length()),
79         is_blocking_(is_blocking),
80         is_successful_(is_successful) {}
81 
IsBlocking() const82   bool IsBlocking() const override { return is_blocking_; }
83 
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<grpc::string,grpc::string> * metadata)84   Status GetMetadata(
85       grpc::string_ref service_url, grpc::string_ref method_name,
86       const grpc::AuthContext& channel_auth_context,
87       std::multimap<grpc::string, grpc::string>* metadata) override {
88     EXPECT_GT(service_url.length(), 0UL);
89     EXPECT_GT(method_name.length(), 0UL);
90     EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
91     EXPECT_TRUE(metadata != nullptr);
92     if (is_successful_) {
93       metadata->insert(std::make_pair(metadata_key_, metadata_value_));
94       return Status::OK;
95     } else {
96       return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
97     }
98   }
99 
100  private:
101   grpc::string metadata_key_;
102   grpc::string metadata_value_;
103   bool is_blocking_;
104   bool is_successful_;
105 };
106 
107 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
108     "TestPluginMetadata";
109 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
110     "test-plugin-metadata";
111 
112 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
113  public:
114   static const char kGoodGuy[];
115 
TestAuthMetadataProcessor(bool is_blocking)116   TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {}
117 
GetCompatibleClientCreds()118   std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
119     return MetadataCredentialsFromPlugin(
120         std::unique_ptr<MetadataCredentialsPlugin>(
121             new TestMetadataCredentialsPlugin(
122                 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
123                 is_blocking_, true)));
124   }
125 
GetIncompatibleClientCreds()126   std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
127     return MetadataCredentialsFromPlugin(
128         std::unique_ptr<MetadataCredentialsPlugin>(
129             new TestMetadataCredentialsPlugin(
130                 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
131                 is_blocking_, true)));
132   }
133 
134   // Interface implementation
IsBlocking() const135   bool IsBlocking() const override { return is_blocking_; }
136 
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)137   Status Process(const InputMetadata& auth_metadata, AuthContext* context,
138                  OutputMetadata* consumed_auth_metadata,
139                  OutputMetadata* response_metadata) override {
140     EXPECT_TRUE(consumed_auth_metadata != nullptr);
141     EXPECT_TRUE(context != nullptr);
142     EXPECT_TRUE(response_metadata != nullptr);
143     auto auth_md =
144         auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
145     EXPECT_NE(auth_md, auth_metadata.end());
146     string_ref auth_md_value = auth_md->second;
147     if (auth_md_value == kGoodGuy) {
148       context->AddProperty(kIdentityPropName, kGoodGuy);
149       context->SetPeerIdentityPropertyName(kIdentityPropName);
150       consumed_auth_metadata->insert(std::make_pair(
151           string(auth_md->first.data(), auth_md->first.length()),
152           string(auth_md->second.data(), auth_md->second.length())));
153       return Status::OK;
154     } else {
155       return Status(StatusCode::UNAUTHENTICATED,
156                     string("Invalid principal: ") +
157                         string(auth_md_value.data(), auth_md_value.length()));
158     }
159   }
160 
161  private:
162   static const char kIdentityPropName[];
163   bool is_blocking_;
164 };
165 
166 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
167 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
168 
169 class Proxy : public ::grpc::testing::EchoTestService::Service {
170  public:
Proxy(const std::shared_ptr<Channel> & channel)171   Proxy(const std::shared_ptr<Channel>& channel)
172       : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
173 
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)174   Status Echo(ServerContext* server_context, const EchoRequest* request,
175               EchoResponse* response) override {
176     std::unique_ptr<ClientContext> client_context =
177         ClientContext::FromServerContext(*server_context);
178     return stub_->Echo(client_context.get(), *request, response);
179   }
180 
181  private:
182   std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_;
183 };
184 
185 class TestServiceImplDupPkg
186     : public ::grpc::testing::duplicate::EchoTestService::Service {
187  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)188   Status Echo(ServerContext* context, const EchoRequest* request,
189               EchoResponse* response) override {
190     response->set_message("no package");
191     return Status::OK;
192   }
193 };
194 
195 class TestScenario {
196  public:
TestScenario(bool proxy,bool inproc_stub,const grpc::string & creds_type)197   TestScenario(bool proxy, bool inproc_stub, const grpc::string& creds_type)
198       : use_proxy(proxy), inproc(inproc_stub), credentials_type(creds_type) {}
199   void Log() const;
200   bool use_proxy;
201   bool inproc;
202   const grpc::string credentials_type;
203 };
204 
operator <<(std::ostream & out,const TestScenario & scenario)205 static std::ostream& operator<<(std::ostream& out,
206                                 const TestScenario& scenario) {
207   return out << "TestScenario{use_proxy="
208              << (scenario.use_proxy ? "true" : "false")
209              << ", inproc=" << (scenario.inproc ? "true" : "false")
210              << ", credentials='" << scenario.credentials_type << "'}";
211 }
212 
Log() const213 void TestScenario::Log() const {
214   std::ostringstream out;
215   out << *this;
216   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
217 }
218 
219 class End2endTest : public ::testing::TestWithParam<TestScenario> {
220  protected:
End2endTest()221   End2endTest()
222       : is_server_started_(false),
223         kMaxMessageSize_(8192),
224         special_service_("special"),
225         first_picked_port_(0) {
226     GetParam().Log();
227   }
228 
TearDown()229   void TearDown() override {
230     if (is_server_started_) {
231       server_->Shutdown();
232       if (proxy_server_) proxy_server_->Shutdown();
233     }
234     if (first_picked_port_ > 0) {
235       grpc_recycle_unused_port(first_picked_port_);
236     }
237   }
238 
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)239   void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
240     int port = grpc_pick_unused_port_or_die();
241     first_picked_port_ = port;
242     server_address_ << "127.0.0.1:" << port;
243     // Setup server
244     BuildAndStartServer(processor);
245   }
246 
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)247   void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
248     if (is_server_started_) {
249       server_->Shutdown();
250       BuildAndStartServer(processor);
251     }
252   }
253 
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)254   void BuildAndStartServer(
255       const std::shared_ptr<AuthMetadataProcessor>& processor) {
256     ServerBuilder builder;
257     ConfigureServerBuilder(&builder);
258     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
259         GetParam().credentials_type);
260     if (GetParam().credentials_type != kInsecureCredentialsType) {
261       server_creds->SetAuthMetadataProcessor(processor);
262     }
263     builder.AddListeningPort(server_address_.str(), server_creds);
264     builder.RegisterService(&service_);
265     builder.RegisterService("foo.test.youtube.com", &special_service_);
266     builder.RegisterService(&dup_pkg_service_);
267 
268     builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
269     builder.SetSyncServerOption(
270         ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
271 
272     server_ = builder.BuildAndStart();
273     is_server_started_ = true;
274   }
275 
ConfigureServerBuilder(ServerBuilder * builder)276   virtual void ConfigureServerBuilder(ServerBuilder* builder) {
277     builder->SetMaxMessageSize(
278         kMaxMessageSize_);  // For testing max message size.
279   }
280 
ResetChannel()281   void ResetChannel() {
282     if (!is_server_started_) {
283       StartServer(std::shared_ptr<AuthMetadataProcessor>());
284     }
285     EXPECT_TRUE(is_server_started_);
286     ChannelArguments args;
287     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
288         GetParam().credentials_type, &args);
289     if (!user_agent_prefix_.empty()) {
290       args.SetUserAgentPrefix(user_agent_prefix_);
291     }
292     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
293 
294     if (!GetParam().inproc) {
295       channel_ =
296           CreateCustomChannel(server_address_.str(), channel_creds, args);
297     } else {
298       channel_ = server_->InProcessChannel(args);
299     }
300   }
301 
ResetStub()302   void ResetStub() {
303     ResetChannel();
304     if (GetParam().use_proxy) {
305       proxy_service_.reset(new Proxy(channel_));
306       int port = grpc_pick_unused_port_or_die();
307       std::ostringstream proxyaddr;
308       proxyaddr << "localhost:" << port;
309       ServerBuilder builder;
310       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
311       builder.RegisterService(proxy_service_.get());
312 
313       builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
314       builder.SetSyncServerOption(
315           ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
316 
317       proxy_server_ = builder.BuildAndStart();
318 
319       channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
320     }
321 
322     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
323   }
324 
325   bool is_server_started_;
326   std::shared_ptr<Channel> channel_;
327   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
328   std::unique_ptr<Server> server_;
329   std::unique_ptr<Server> proxy_server_;
330   std::unique_ptr<Proxy> proxy_service_;
331   std::ostringstream server_address_;
332   const int kMaxMessageSize_;
333   TestServiceImpl service_;
334   TestServiceImpl special_service_;
335   TestServiceImplDupPkg dup_pkg_service_;
336   grpc::string user_agent_prefix_;
337   int first_picked_port_;
338 };
339 
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)340 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
341                     bool with_binary_metadata) {
342   EchoRequest request;
343   EchoResponse response;
344   request.set_message("Hello hello hello hello");
345 
346   for (int i = 0; i < num_rpcs; ++i) {
347     ClientContext context;
348     if (with_binary_metadata) {
349       char bytes[8] = {'\0', '\1', '\2', '\3',
350                        '\4', '\5', '\6', static_cast<char>(i)};
351       context.AddMetadata("custom-bin", grpc::string(bytes, 8));
352     }
353     context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
354     Status s = stub->Echo(&context, request, &response);
355     EXPECT_EQ(response.message(), request.message());
356     EXPECT_TRUE(s.ok());
357   }
358 }
359 
360 // This class is for testing scenarios where RPCs are cancelled on the server
361 // by calling ServerContext::TryCancel()
362 class End2endServerTryCancelTest : public End2endTest {
363  protected:
364   // Helper for testing client-streaming RPCs which are cancelled on the server.
365   // Depending on the value of server_try_cancel parameter, this will test one
366   // of the following three scenarios:
367   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
368   //   any messages from the client
369   //
370   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
371   //   messages from the client
372   //
373   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
374   //   the messages from the client
375   //
376   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)377   void TestRequestStreamServerCancel(
378       ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
379     ResetStub();
380     EchoRequest request;
381     EchoResponse response;
382     ClientContext context;
383 
384     // Send server_try_cancel value in the client metadata
385     context.AddMetadata(kServerTryCancelRequest,
386                         grpc::to_string(server_try_cancel));
387 
388     auto stream = stub_->RequestStream(&context, &response);
389 
390     int num_msgs_sent = 0;
391     while (num_msgs_sent < num_msgs_to_send) {
392       request.set_message("hello");
393       if (!stream->Write(request)) {
394         break;
395       }
396       num_msgs_sent++;
397     }
398     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
399 
400     stream->WritesDone();
401     Status s = stream->Finish();
402 
403     // At this point, we know for sure that RPC was cancelled by the server
404     // since we passed server_try_cancel value in the metadata. Depending on the
405     // value of server_try_cancel, the RPC might have been cancelled by the
406     // server at different stages. The following validates our expectations of
407     // number of messages sent in various cancellation scenarios:
408 
409     switch (server_try_cancel) {
410       case CANCEL_BEFORE_PROCESSING:
411       case CANCEL_DURING_PROCESSING:
412         // If the RPC is cancelled by server before / during messages from the
413         // client, it means that the client most likely did not get a chance to
414         // send all the messages it wanted to send. i.e num_msgs_sent <=
415         // num_msgs_to_send
416         EXPECT_LE(num_msgs_sent, num_msgs_to_send);
417         break;
418 
419       case CANCEL_AFTER_PROCESSING:
420         // If the RPC was cancelled after all messages were read by the server,
421         // the client did get a chance to send all its messages
422         EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
423         break;
424 
425       default:
426         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
427                 server_try_cancel);
428         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
429                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
430         break;
431     }
432 
433     EXPECT_FALSE(s.ok());
434     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
435   }
436 
437   // Helper for testing server-streaming RPCs which are cancelled on the server.
438   // Depending on the value of server_try_cancel parameter, this will test one
439   // of the following three scenarios:
440   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
441   //   any messages to the client
442   //
443   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
444   //   messages to the client
445   //
446   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
447   //   the messages to the client
448   //
449   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)450   void TestResponseStreamServerCancel(
451       ServerTryCancelRequestPhase server_try_cancel) {
452     ResetStub();
453     EchoRequest request;
454     EchoResponse response;
455     ClientContext context;
456 
457     // Send server_try_cancel in the client metadata
458     context.AddMetadata(kServerTryCancelRequest,
459                         grpc::to_string(server_try_cancel));
460 
461     request.set_message("hello");
462     auto stream = stub_->ResponseStream(&context, request);
463 
464     int num_msgs_read = 0;
465     while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
466       if (!stream->Read(&response)) {
467         break;
468       }
469       EXPECT_EQ(response.message(),
470                 request.message() + grpc::to_string(num_msgs_read));
471       num_msgs_read++;
472     }
473     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
474 
475     Status s = stream->Finish();
476 
477     // Depending on the value of server_try_cancel, the RPC might have been
478     // cancelled by the server at different stages. The following validates our
479     // expectations of number of messages read in various cancellation
480     // scenarios:
481     switch (server_try_cancel) {
482       case CANCEL_BEFORE_PROCESSING:
483         // Server cancelled before sending any messages. Which means the client
484         // wouldn't have read any
485         EXPECT_EQ(num_msgs_read, 0);
486         break;
487 
488       case CANCEL_DURING_PROCESSING:
489         // Server cancelled while writing messages. Client must have read less
490         // than or equal to the expected number of messages
491         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
492         break;
493 
494       case CANCEL_AFTER_PROCESSING:
495         // Even though the Server cancelled after writing all messages, the RPC
496         // may be cancelled before the Client got a chance to read all the
497         // messages.
498         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
499         break;
500 
501       default: {
502         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
503                 server_try_cancel);
504         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
505                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
506         break;
507       }
508     }
509 
510     EXPECT_FALSE(s.ok());
511     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
512   }
513 
514   // Helper for testing bidirectional-streaming RPCs which are cancelled on the
515   // server. Depending on the value of server_try_cancel parameter, this will
516   // test one of the following three scenarios:
517   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
518   //   writing any messages from/to the client
519   //
520   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
521   //   writing messages from/to the client
522   //
523   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
524   //   all the messages from/to the client
525   //
526   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)527   void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
528                                   int num_messages) {
529     ResetStub();
530     EchoRequest request;
531     EchoResponse response;
532     ClientContext context;
533 
534     // Send server_try_cancel in the client metadata
535     context.AddMetadata(kServerTryCancelRequest,
536                         grpc::to_string(server_try_cancel));
537 
538     auto stream = stub_->BidiStream(&context);
539 
540     int num_msgs_read = 0;
541     int num_msgs_sent = 0;
542     while (num_msgs_sent < num_messages) {
543       request.set_message("hello " + grpc::to_string(num_msgs_sent));
544       if (!stream->Write(request)) {
545         break;
546       }
547       num_msgs_sent++;
548 
549       if (!stream->Read(&response)) {
550         break;
551       }
552       num_msgs_read++;
553 
554       EXPECT_EQ(response.message(), request.message());
555     }
556     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
557     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
558 
559     stream->WritesDone();
560     Status s = stream->Finish();
561 
562     // Depending on the value of server_try_cancel, the RPC might have been
563     // cancelled by the server at different stages. The following validates our
564     // expectations of number of messages read in various cancellation
565     // scenarios:
566     switch (server_try_cancel) {
567       case CANCEL_BEFORE_PROCESSING:
568         EXPECT_EQ(num_msgs_read, 0);
569         break;
570 
571       case CANCEL_DURING_PROCESSING:
572         EXPECT_LE(num_msgs_sent, num_messages);
573         EXPECT_LE(num_msgs_read, num_msgs_sent);
574         break;
575 
576       case CANCEL_AFTER_PROCESSING:
577         EXPECT_EQ(num_msgs_sent, num_messages);
578 
579         // The Server cancelled after reading the last message and after writing
580         // the message to the client. However, the RPC cancellation might have
581         // taken effect before the client actually read the response.
582         EXPECT_LE(num_msgs_read, num_msgs_sent);
583         break;
584 
585       default:
586         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
587                 server_try_cancel);
588         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
589                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
590         break;
591     }
592 
593     EXPECT_FALSE(s.ok());
594     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
595   }
596 };
597 
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)598 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
599   ResetStub();
600   EchoRequest request;
601   EchoResponse response;
602   ClientContext context;
603 
604   context.AddMetadata(kServerTryCancelRequest,
605                       grpc::to_string(CANCEL_BEFORE_PROCESSING));
606   Status s = stub_->Echo(&context, request, &response);
607   EXPECT_FALSE(s.ok());
608   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
609 }
610 
611 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)612 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
613   TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
614 }
615 
616 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)617 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
618   TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
619 }
620 
621 // Server to cancel after reading all the requests but before returning to the
622 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)623 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
624   TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
625 }
626 
627 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)628 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
629   TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
630 }
631 
632 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)633 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
634   TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
635 }
636 
637 // Server to cancel after writing all the respones to the stream but before
638 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)639 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
640   TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
641 }
642 
643 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)644 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
645   TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
646 }
647 
648 // Server to cancel while reading/writing requests/responses on the stream in
649 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)650 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
651   TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
652 }
653 
654 // Server to cancel after reading/writing all requests/responses on the stream
655 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)656 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
657   TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
658 }
659 
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)660 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
661   // User-Agent is an HTTP header for HTTP transports only
662   if (GetParam().inproc) {
663     return;
664   }
665   user_agent_prefix_ = "custom_prefix";
666   ResetStub();
667   EchoRequest request;
668   EchoResponse response;
669   request.set_message("Hello hello hello hello");
670   request.mutable_param()->set_echo_metadata(true);
671 
672   ClientContext context;
673   Status s = stub_->Echo(&context, request, &response);
674   EXPECT_EQ(response.message(), request.message());
675   EXPECT_TRUE(s.ok());
676   const auto& trailing_metadata = context.GetServerTrailingMetadata();
677   auto iter = trailing_metadata.find("user-agent");
678   EXPECT_TRUE(iter != trailing_metadata.end());
679   grpc::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
680   EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
681 }
682 
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)683 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
684   ResetStub();
685   std::vector<std::thread> threads;
686   threads.reserve(10);
687   for (int i = 0; i < 10; ++i) {
688     threads.emplace_back(SendRpc, stub_.get(), 10, true);
689   }
690   for (int i = 0; i < 10; ++i) {
691     threads[i].join();
692   }
693 }
694 
TEST_P(End2endTest,MultipleRpcs)695 TEST_P(End2endTest, MultipleRpcs) {
696   ResetStub();
697   std::vector<std::thread> threads;
698   threads.reserve(10);
699   for (int i = 0; i < 10; ++i) {
700     threads.emplace_back(SendRpc, stub_.get(), 10, false);
701   }
702   for (int i = 0; i < 10; ++i) {
703     threads[i].join();
704   }
705 }
706 
TEST_P(End2endTest,ReconnectChannel)707 TEST_P(End2endTest, ReconnectChannel) {
708   if (GetParam().inproc) {
709     return;
710   }
711   int poller_slowdown_factor = 1;
712   // It needs 2 pollset_works to reconnect the channel with polling engine
713   // "poll"
714   char* s = gpr_getenv("GRPC_POLL_STRATEGY");
715   if (s != nullptr && 0 == strcmp(s, "poll")) {
716     poller_slowdown_factor = 2;
717   }
718   gpr_free(s);
719   ResetStub();
720   SendRpc(stub_.get(), 1, false);
721   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
722   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
723   // reconnect the channel.
724   gpr_sleep_until(gpr_time_add(
725       gpr_now(GPR_CLOCK_REALTIME),
726       gpr_time_from_millis(
727           300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
728           GPR_TIMESPAN)));
729   SendRpc(stub_.get(), 1, false);
730 }
731 
TEST_P(End2endTest,RequestStreamOneRequest)732 TEST_P(End2endTest, RequestStreamOneRequest) {
733   ResetStub();
734   EchoRequest request;
735   EchoResponse response;
736   ClientContext context;
737 
738   auto stream = stub_->RequestStream(&context, &response);
739   request.set_message("hello");
740   EXPECT_TRUE(stream->Write(request));
741   stream->WritesDone();
742   Status s = stream->Finish();
743   EXPECT_EQ(response.message(), request.message());
744   EXPECT_TRUE(s.ok());
745   EXPECT_TRUE(context.debug_error_string().empty());
746 }
747 
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)748 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
749   ResetStub();
750   EchoRequest request;
751   EchoResponse response;
752   ClientContext context;
753 
754   context.set_initial_metadata_corked(true);
755   auto stream = stub_->RequestStream(&context, &response);
756   request.set_message("hello");
757   stream->WriteLast(request, WriteOptions());
758   Status s = stream->Finish();
759   EXPECT_EQ(response.message(), request.message());
760   EXPECT_TRUE(s.ok());
761 }
762 
TEST_P(End2endTest,RequestStreamTwoRequests)763 TEST_P(End2endTest, RequestStreamTwoRequests) {
764   ResetStub();
765   EchoRequest request;
766   EchoResponse response;
767   ClientContext context;
768 
769   auto stream = stub_->RequestStream(&context, &response);
770   request.set_message("hello");
771   EXPECT_TRUE(stream->Write(request));
772   EXPECT_TRUE(stream->Write(request));
773   stream->WritesDone();
774   Status s = stream->Finish();
775   EXPECT_EQ(response.message(), "hellohello");
776   EXPECT_TRUE(s.ok());
777 }
778 
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)779 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
780   ResetStub();
781   EchoRequest request;
782   EchoResponse response;
783   ClientContext context;
784 
785   auto stream = stub_->RequestStream(&context, &response);
786   request.set_message("hello");
787   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
788   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
789   stream->WritesDone();
790   Status s = stream->Finish();
791   EXPECT_EQ(response.message(), "hellohello");
792   EXPECT_TRUE(s.ok());
793 }
794 
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)795 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
796   ResetStub();
797   EchoRequest request;
798   EchoResponse response;
799   ClientContext context;
800 
801   context.set_initial_metadata_corked(true);
802   auto stream = stub_->RequestStream(&context, &response);
803   request.set_message("hello");
804   EXPECT_TRUE(stream->Write(request));
805   stream->WriteLast(request, WriteOptions());
806   Status s = stream->Finish();
807   EXPECT_EQ(response.message(), "hellohello");
808   EXPECT_TRUE(s.ok());
809 }
810 
TEST_P(End2endTest,ResponseStream)811 TEST_P(End2endTest, ResponseStream) {
812   ResetStub();
813   EchoRequest request;
814   EchoResponse response;
815   ClientContext context;
816   request.set_message("hello");
817 
818   auto stream = stub_->ResponseStream(&context, request);
819   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
820     EXPECT_TRUE(stream->Read(&response));
821     EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
822   }
823   EXPECT_FALSE(stream->Read(&response));
824 
825   Status s = stream->Finish();
826   EXPECT_TRUE(s.ok());
827 }
828 
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)829 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
830   ResetStub();
831   EchoRequest request;
832   EchoResponse response;
833   ClientContext context;
834   request.set_message("hello");
835   context.AddMetadata(kServerUseCoalescingApi, "1");
836 
837   auto stream = stub_->ResponseStream(&context, request);
838   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
839     EXPECT_TRUE(stream->Read(&response));
840     EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
841   }
842   EXPECT_FALSE(stream->Read(&response));
843 
844   Status s = stream->Finish();
845   EXPECT_TRUE(s.ok());
846 }
847 
848 // This was added to prevent regression from issue:
849 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)850 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
851   ResetStub();
852   EchoRequest request;
853   EchoResponse response;
854   ClientContext context;
855   request.set_message("hello");
856   context.AddMetadata(kServerUseCoalescingApi, "1");
857   // We will only send one message, forcing everything (init metadata, message,
858   // trailing) to be coalesced together.
859   context.AddMetadata(kServerResponseStreamsToSend, "1");
860 
861   auto stream = stub_->ResponseStream(&context, request);
862   EXPECT_TRUE(stream->Read(&response));
863   EXPECT_EQ(response.message(), request.message() + "0");
864 
865   EXPECT_FALSE(stream->Read(&response));
866 
867   Status s = stream->Finish();
868   EXPECT_TRUE(s.ok());
869 }
870 
TEST_P(End2endTest,BidiStream)871 TEST_P(End2endTest, BidiStream) {
872   ResetStub();
873   EchoRequest request;
874   EchoResponse response;
875   ClientContext context;
876   grpc::string msg("hello");
877 
878   auto stream = stub_->BidiStream(&context);
879 
880   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
881     request.set_message(msg + grpc::to_string(i));
882     EXPECT_TRUE(stream->Write(request));
883     EXPECT_TRUE(stream->Read(&response));
884     EXPECT_EQ(response.message(), request.message());
885   }
886 
887   stream->WritesDone();
888   EXPECT_FALSE(stream->Read(&response));
889   EXPECT_FALSE(stream->Read(&response));
890 
891   Status s = stream->Finish();
892   EXPECT_TRUE(s.ok());
893 }
894 
TEST_P(End2endTest,BidiStreamWithCoalescingApi)895 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
896   ResetStub();
897   EchoRequest request;
898   EchoResponse response;
899   ClientContext context;
900   context.AddMetadata(kServerFinishAfterNReads, "3");
901   context.set_initial_metadata_corked(true);
902   grpc::string msg("hello");
903 
904   auto stream = stub_->BidiStream(&context);
905 
906   request.set_message(msg + "0");
907   EXPECT_TRUE(stream->Write(request));
908   EXPECT_TRUE(stream->Read(&response));
909   EXPECT_EQ(response.message(), request.message());
910 
911   request.set_message(msg + "1");
912   EXPECT_TRUE(stream->Write(request));
913   EXPECT_TRUE(stream->Read(&response));
914   EXPECT_EQ(response.message(), request.message());
915 
916   request.set_message(msg + "2");
917   stream->WriteLast(request, WriteOptions());
918   EXPECT_TRUE(stream->Read(&response));
919   EXPECT_EQ(response.message(), request.message());
920 
921   EXPECT_FALSE(stream->Read(&response));
922   EXPECT_FALSE(stream->Read(&response));
923 
924   Status s = stream->Finish();
925   EXPECT_TRUE(s.ok());
926 }
927 
928 // This was added to prevent regression from issue:
929 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)930 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
931   ResetStub();
932   EchoRequest request;
933   EchoResponse response;
934   ClientContext context;
935   context.AddMetadata(kServerFinishAfterNReads, "1");
936   context.set_initial_metadata_corked(true);
937   grpc::string msg("hello");
938 
939   auto stream = stub_->BidiStream(&context);
940 
941   request.set_message(msg + "0");
942   stream->WriteLast(request, WriteOptions());
943   EXPECT_TRUE(stream->Read(&response));
944   EXPECT_EQ(response.message(), request.message());
945 
946   EXPECT_FALSE(stream->Read(&response));
947   EXPECT_FALSE(stream->Read(&response));
948 
949   Status s = stream->Finish();
950   EXPECT_TRUE(s.ok());
951 }
952 
953 // Talk to the two services with the same name but different package names.
954 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)955 TEST_P(End2endTest, DiffPackageServices) {
956   ResetStub();
957   EchoRequest request;
958   EchoResponse response;
959   request.set_message("Hello");
960 
961   ClientContext context;
962   Status s = stub_->Echo(&context, request, &response);
963   EXPECT_EQ(response.message(), request.message());
964   EXPECT_TRUE(s.ok());
965 
966   std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
967       grpc::testing::duplicate::EchoTestService::NewStub(channel_));
968   ClientContext context2;
969   s = dup_pkg_stub->Echo(&context2, request, &response);
970   EXPECT_EQ("no package", response.message());
971   EXPECT_TRUE(s.ok());
972 }
973 
CancelRpc(ClientContext * context,int delay_us,TestServiceImpl * service)974 void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
975   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
976                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
977   while (!service->signal_client()) {
978   }
979   context->TryCancel();
980 }
981 
TEST_P(End2endTest,CancelRpcBeforeStart)982 TEST_P(End2endTest, CancelRpcBeforeStart) {
983   ResetStub();
984   EchoRequest request;
985   EchoResponse response;
986   ClientContext context;
987   request.set_message("hello");
988   context.TryCancel();
989   Status s = stub_->Echo(&context, request, &response);
990   EXPECT_EQ("", response.message());
991   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
992 }
993 
994 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)995 TEST_P(End2endTest, ClientCancelsRequestStream) {
996   ResetStub();
997   EchoRequest request;
998   EchoResponse response;
999   ClientContext context;
1000   request.set_message("hello");
1001 
1002   auto stream = stub_->RequestStream(&context, &response);
1003   EXPECT_TRUE(stream->Write(request));
1004   EXPECT_TRUE(stream->Write(request));
1005 
1006   context.TryCancel();
1007 
1008   Status s = stream->Finish();
1009   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1010 
1011   EXPECT_EQ(response.message(), "");
1012 }
1013 
1014 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1015 TEST_P(End2endTest, ClientCancelsResponseStream) {
1016   ResetStub();
1017   EchoRequest request;
1018   EchoResponse response;
1019   ClientContext context;
1020   request.set_message("hello");
1021 
1022   auto stream = stub_->ResponseStream(&context, request);
1023 
1024   EXPECT_TRUE(stream->Read(&response));
1025   EXPECT_EQ(response.message(), request.message() + "0");
1026   EXPECT_TRUE(stream->Read(&response));
1027   EXPECT_EQ(response.message(), request.message() + "1");
1028 
1029   context.TryCancel();
1030 
1031   // The cancellation races with responses, so there might be zero or
1032   // one responses pending, read till failure
1033 
1034   if (stream->Read(&response)) {
1035     EXPECT_EQ(response.message(), request.message() + "2");
1036     // Since we have cancelled, we expect the next attempt to read to fail
1037     EXPECT_FALSE(stream->Read(&response));
1038   }
1039 
1040   Status s = stream->Finish();
1041   // The final status could be either of CANCELLED or OK depending on
1042   // who won the race.
1043   EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1044 }
1045 
1046 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1047 TEST_P(End2endTest, ClientCancelsBidi) {
1048   ResetStub();
1049   EchoRequest request;
1050   EchoResponse response;
1051   ClientContext context;
1052   grpc::string msg("hello");
1053 
1054   auto stream = stub_->BidiStream(&context);
1055 
1056   request.set_message(msg + "0");
1057   EXPECT_TRUE(stream->Write(request));
1058   EXPECT_TRUE(stream->Read(&response));
1059   EXPECT_EQ(response.message(), request.message());
1060 
1061   request.set_message(msg + "1");
1062   EXPECT_TRUE(stream->Write(request));
1063 
1064   context.TryCancel();
1065 
1066   // The cancellation races with responses, so there might be zero or
1067   // one responses pending, read till failure
1068 
1069   if (stream->Read(&response)) {
1070     EXPECT_EQ(response.message(), request.message());
1071     // Since we have cancelled, we expect the next attempt to read to fail
1072     EXPECT_FALSE(stream->Read(&response));
1073   }
1074 
1075   Status s = stream->Finish();
1076   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1077 }
1078 
TEST_P(End2endTest,RpcMaxMessageSize)1079 TEST_P(End2endTest, RpcMaxMessageSize) {
1080   ResetStub();
1081   EchoRequest request;
1082   EchoResponse response;
1083   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1084   request.mutable_param()->set_server_die(true);
1085 
1086   ClientContext context;
1087   Status s = stub_->Echo(&context, request, &response);
1088   EXPECT_FALSE(s.ok());
1089 }
1090 
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1091 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1092                       gpr_event* ev) {
1093   EchoResponse resp;
1094   gpr_event_set(ev, (void*)1);
1095   while (stream->Read(&resp)) {
1096     gpr_log(GPR_INFO, "Read message");
1097   }
1098 }
1099 
1100 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1101 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1102   ResetStub();
1103   ClientContext context;
1104   gpr_event ev;
1105   gpr_event_init(&ev);
1106   auto stream = stub_->BidiStream(&context);
1107   std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1108   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1109   stream->WritesDone();
1110   reader_thread.join();
1111   Status s = stream->Finish();
1112   EXPECT_TRUE(s.ok());
1113 }
1114 
TEST_P(End2endTest,ChannelState)1115 TEST_P(End2endTest, ChannelState) {
1116   if (GetParam().inproc) {
1117     return;
1118   }
1119 
1120   ResetStub();
1121   // Start IDLE
1122   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1123 
1124   // Did not ask to connect, no state change.
1125   CompletionQueue cq;
1126   std::chrono::system_clock::time_point deadline =
1127       std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1128   channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1129   void* tag;
1130   bool ok = true;
1131   cq.Next(&tag, &ok);
1132   EXPECT_FALSE(ok);
1133 
1134   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1135   EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1136                                            gpr_inf_future(GPR_CLOCK_REALTIME)));
1137   auto state = channel_->GetState(false);
1138   EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1139 }
1140 
1141 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1142 TEST_P(End2endTest, ChannelStateTimeout) {
1143   if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1144       GetParam().inproc) {
1145     return;
1146   }
1147   int port = grpc_pick_unused_port_or_die();
1148   std::ostringstream server_address;
1149   server_address << "127.0.0.1:" << port;
1150   // Channel to non-existing server
1151   auto channel =
1152       CreateChannel(server_address.str(), InsecureChannelCredentials());
1153   // Start IDLE
1154   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1155 
1156   auto state = GRPC_CHANNEL_IDLE;
1157   for (int i = 0; i < 10; i++) {
1158     channel->WaitForStateChange(
1159         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1160     state = channel->GetState(false);
1161   }
1162 }
1163 
1164 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1165 TEST_P(End2endTest, NonExistingService) {
1166   ResetChannel();
1167   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1168   stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1169 
1170   EchoRequest request;
1171   EchoResponse response;
1172   request.set_message("Hello");
1173 
1174   ClientContext context;
1175   Status s = stub->Unimplemented(&context, request, &response);
1176   EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1177   EXPECT_EQ("", s.error_message());
1178 }
1179 
1180 // Ask the server to send back a serialized proto in trailer.
1181 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1182 TEST_P(End2endTest, BinaryTrailerTest) {
1183   ResetStub();
1184   EchoRequest request;
1185   EchoResponse response;
1186   ClientContext context;
1187 
1188   request.mutable_param()->set_echo_metadata(true);
1189   DebugInfo* info = request.mutable_param()->mutable_debug_info();
1190   info->add_stack_entries("stack_entry_1");
1191   info->add_stack_entries("stack_entry_2");
1192   info->add_stack_entries("stack_entry_3");
1193   info->set_detail("detailed debug info");
1194   grpc::string expected_string = info->SerializeAsString();
1195   request.set_message("Hello");
1196 
1197   Status s = stub_->Echo(&context, request, &response);
1198   EXPECT_FALSE(s.ok());
1199   auto trailers = context.GetServerTrailingMetadata();
1200   EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1201   auto iter = trailers.find(kDebugInfoTrailerKey);
1202   EXPECT_EQ(expected_string, iter->second);
1203   // Parse the returned trailer into a DebugInfo proto.
1204   DebugInfo returned_info;
1205   EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1206 }
1207 
TEST_P(End2endTest,ExpectErrorTest)1208 TEST_P(End2endTest, ExpectErrorTest) {
1209   ResetStub();
1210 
1211   std::vector<ErrorStatus> expected_status;
1212   expected_status.emplace_back();
1213   expected_status.back().set_code(13);  // INTERNAL
1214   // No Error message or details
1215 
1216   expected_status.emplace_back();
1217   expected_status.back().set_code(13);  // INTERNAL
1218   expected_status.back().set_error_message("text error message");
1219   expected_status.back().set_binary_error_details("text error details");
1220 
1221   expected_status.emplace_back();
1222   expected_status.back().set_code(13);  // INTERNAL
1223   expected_status.back().set_error_message("text error message");
1224   expected_status.back().set_binary_error_details(
1225       "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1226 
1227   for (auto iter = expected_status.begin(); iter != expected_status.end();
1228        ++iter) {
1229     EchoRequest request;
1230     EchoResponse response;
1231     ClientContext context;
1232     request.set_message("Hello");
1233     auto* error = request.mutable_param()->mutable_expected_error();
1234     error->set_code(iter->code());
1235     error->set_error_message(iter->error_message());
1236     error->set_binary_error_details(iter->binary_error_details());
1237 
1238     Status s = stub_->Echo(&context, request, &response);
1239     EXPECT_FALSE(s.ok());
1240     EXPECT_EQ(iter->code(), s.error_code());
1241     EXPECT_EQ(iter->error_message(), s.error_message());
1242     EXPECT_EQ(iter->binary_error_details(), s.error_details());
1243     EXPECT_TRUE(context.debug_error_string().find("created") !=
1244                 std::string::npos);
1245     EXPECT_TRUE(context.debug_error_string().find("file") != std::string::npos);
1246     EXPECT_TRUE(context.debug_error_string().find("line") != std::string::npos);
1247     EXPECT_TRUE(context.debug_error_string().find("status") !=
1248                 std::string::npos);
1249     EXPECT_TRUE(context.debug_error_string().find("13") != std::string::npos);
1250   }
1251 }
1252 
1253 //////////////////////////////////////////////////////////////////////////
1254 // Test with and without a proxy.
1255 class ProxyEnd2endTest : public End2endTest {
1256  protected:
1257 };
1258 
TEST_P(ProxyEnd2endTest,SimpleRpc)1259 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1260   ResetStub();
1261   SendRpc(stub_.get(), 1, false);
1262 }
1263 
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1264 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1265   ResetStub();
1266   EchoRequest request;
1267   EchoResponse response;
1268 
1269   ClientContext context;
1270   Status s = stub_->Echo(&context, request, &response);
1271   EXPECT_TRUE(s.ok());
1272 }
1273 
TEST_P(ProxyEnd2endTest,MultipleRpcs)1274 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1275   ResetStub();
1276   std::vector<std::thread> threads;
1277   threads.reserve(10);
1278   for (int i = 0; i < 10; ++i) {
1279     threads.emplace_back(SendRpc, stub_.get(), 10, false);
1280   }
1281   for (int i = 0; i < 10; ++i) {
1282     threads[i].join();
1283   }
1284 }
1285 
1286 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1287 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1288   ResetStub();
1289   EchoRequest request;
1290   EchoResponse response;
1291   request.set_message("Hello");
1292   request.mutable_param()->set_skip_cancelled_check(true);
1293   // Let server sleep for 40 ms first to guarantee expiry.
1294   // 40 ms might seem a bit extreme but the timer manager would have been just
1295   // initialized (when ResetStub() was called) and there are some warmup costs
1296   // i.e the timer thread many not have even started. There might also be other
1297   // delays in the timer manager thread (in acquiring locks, timer data
1298   // structure manipulations, starting backup timer threads) that add to the
1299   // delays. 40ms is still not enough in some cases but this significantly
1300   // reduces the test flakes
1301   request.mutable_param()->set_server_sleep_us(40 * 1000);
1302 
1303   ClientContext context;
1304   std::chrono::system_clock::time_point deadline =
1305       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1306   context.set_deadline(deadline);
1307   Status s = stub_->Echo(&context, request, &response);
1308   EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1309 }
1310 
1311 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1312 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1313   ResetStub();
1314   EchoRequest request;
1315   EchoResponse response;
1316   request.set_message("Hello");
1317 
1318   ClientContext context;
1319   std::chrono::system_clock::time_point deadline =
1320       std::chrono::system_clock::now() + std::chrono::hours(1);
1321   context.set_deadline(deadline);
1322   Status s = stub_->Echo(&context, request, &response);
1323   EXPECT_EQ(response.message(), request.message());
1324   EXPECT_TRUE(s.ok());
1325 }
1326 
1327 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1328 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1329   ResetStub();
1330   EchoRequest request;
1331   EchoResponse response;
1332   request.set_message("Hello");
1333   request.mutable_param()->set_echo_deadline(true);
1334 
1335   ClientContext context;
1336   std::chrono::system_clock::time_point deadline =
1337       std::chrono::system_clock::now() + std::chrono::seconds(100);
1338   context.set_deadline(deadline);
1339   Status s = stub_->Echo(&context, request, &response);
1340   EXPECT_EQ(response.message(), request.message());
1341   EXPECT_TRUE(s.ok());
1342   gpr_timespec sent_deadline;
1343   Timepoint2Timespec(deadline, &sent_deadline);
1344   // We want to allow some reasonable error given:
1345   // - request_deadline() only has 1sec resolution so the best we can do is +-1
1346   // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1347   // can end up being off by 2 in one direction.
1348   EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1349   EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1350 }
1351 
1352 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1353 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1354   ResetStub();
1355   EchoRequest request;
1356   EchoResponse response;
1357   request.set_message("Hello");
1358   request.mutable_param()->set_echo_deadline(true);
1359 
1360   ClientContext context;
1361   Status s = stub_->Echo(&context, request, &response);
1362   EXPECT_EQ(response.message(), request.message());
1363   EXPECT_TRUE(s.ok());
1364   EXPECT_EQ(response.param().request_deadline(),
1365             gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1366 }
1367 
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1368 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1369   ResetStub();
1370   EchoRequest request;
1371   EchoResponse response;
1372   request.set_message("Hello");
1373 
1374   ClientContext context;
1375   Status s = stub_->Unimplemented(&context, request, &response);
1376   EXPECT_FALSE(s.ok());
1377   EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1378   EXPECT_EQ(s.error_message(), "");
1379   EXPECT_EQ(response.message(), "");
1380 }
1381 
1382 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1383 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1384   ResetStub();
1385   EchoRequest request;
1386   EchoResponse response;
1387   request.set_message("Hello");
1388   const int kCancelDelayUs = 10 * 1000;
1389   request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1390 
1391   ClientContext context;
1392   std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_);
1393   Status s = stub_->Echo(&context, request, &response);
1394   cancel_thread.join();
1395   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1396   EXPECT_EQ(s.error_message(), "Cancelled");
1397 }
1398 
1399 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1400 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1401   ResetStub();
1402   EchoRequest request;
1403   EchoResponse response;
1404   request.set_message("Hello");
1405   request.mutable_param()->set_server_cancel_after_us(1000);
1406 
1407   ClientContext context;
1408   Status s = stub_->Echo(&context, request, &response);
1409   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1410   EXPECT_TRUE(s.error_message().empty());
1411 }
1412 
1413 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1414 TEST_P(ProxyEnd2endTest, HugeResponse) {
1415   ResetStub();
1416   EchoRequest request;
1417   EchoResponse response;
1418   request.set_message("huge response");
1419   const size_t kResponseSize = 1024 * (1024 + 10);
1420   request.mutable_param()->set_response_message_length(kResponseSize);
1421 
1422   ClientContext context;
1423   std::chrono::system_clock::time_point deadline =
1424       std::chrono::system_clock::now() + std::chrono::seconds(20);
1425   context.set_deadline(deadline);
1426   Status s = stub_->Echo(&context, request, &response);
1427   EXPECT_EQ(kResponseSize, response.message().size());
1428   EXPECT_TRUE(s.ok());
1429 }
1430 
TEST_P(ProxyEnd2endTest,Peer)1431 TEST_P(ProxyEnd2endTest, Peer) {
1432   // Peer is not meaningful for inproc
1433   if (GetParam().inproc) {
1434     return;
1435   }
1436   ResetStub();
1437   EchoRequest request;
1438   EchoResponse response;
1439   request.set_message("hello");
1440   request.mutable_param()->set_echo_peer(true);
1441 
1442   ClientContext context;
1443   Status s = stub_->Echo(&context, request, &response);
1444   EXPECT_EQ(response.message(), request.message());
1445   EXPECT_TRUE(s.ok());
1446   EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1447   EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1448 }
1449 
1450 //////////////////////////////////////////////////////////////////////////
1451 class SecureEnd2endTest : public End2endTest {
1452  protected:
SecureEnd2endTest()1453   SecureEnd2endTest() {
1454     GPR_ASSERT(!GetParam().use_proxy);
1455     GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
1456   }
1457 };
1458 
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1459 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1460   ResetStub();
1461 
1462   EchoRequest request;
1463   EchoResponse response;
1464   request.set_message("Hello");
1465 
1466   ClientContext context;
1467   context.set_authority("foo.test.youtube.com");
1468   Status s = stub_->Echo(&context, request, &response);
1469   EXPECT_EQ(response.message(), request.message());
1470   EXPECT_TRUE(response.has_param());
1471   EXPECT_EQ("special", response.param().host());
1472   EXPECT_TRUE(s.ok());
1473 }
1474 
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const grpc::string & key,const grpc::string & value)1475 bool MetadataContains(
1476     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1477     const grpc::string& key, const grpc::string& value) {
1478   int count = 0;
1479 
1480   for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1481            metadata.begin();
1482        iter != metadata.end(); ++iter) {
1483     if (ToString(iter->first) == key && ToString(iter->second) == value) {
1484       count++;
1485     }
1486   }
1487   return count == 1;
1488 }
1489 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1490 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1491   auto* processor = new TestAuthMetadataProcessor(true);
1492   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1493   ResetStub();
1494   EchoRequest request;
1495   EchoResponse response;
1496   ClientContext context;
1497   context.set_credentials(processor->GetCompatibleClientCreds());
1498   request.set_message("Hello");
1499   request.mutable_param()->set_echo_metadata(true);
1500   request.mutable_param()->set_expected_client_identity(
1501       TestAuthMetadataProcessor::kGoodGuy);
1502   request.mutable_param()->set_expected_transport_security_type(
1503       GetParam().credentials_type);
1504 
1505   Status s = stub_->Echo(&context, request, &response);
1506   EXPECT_EQ(request.message(), response.message());
1507   EXPECT_TRUE(s.ok());
1508 
1509   // Metadata should have been consumed by the processor.
1510   EXPECT_FALSE(MetadataContains(
1511       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1512       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1513 }
1514 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1515 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1516   auto* processor = new TestAuthMetadataProcessor(true);
1517   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1518   ResetStub();
1519   EchoRequest request;
1520   EchoResponse response;
1521   ClientContext context;
1522   context.set_credentials(processor->GetIncompatibleClientCreds());
1523   request.set_message("Hello");
1524 
1525   Status s = stub_->Echo(&context, request, &response);
1526   EXPECT_FALSE(s.ok());
1527   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1528 }
1529 
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1530 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1531   ResetStub();
1532   EchoRequest request;
1533   EchoResponse response;
1534   ClientContext context;
1535   std::shared_ptr<CallCredentials> creds =
1536       GoogleIAMCredentials("fake_token", "fake_selector");
1537   context.set_credentials(creds);
1538   request.set_message("Hello");
1539   request.mutable_param()->set_echo_metadata(true);
1540 
1541   Status s = stub_->Echo(&context, request, &response);
1542   EXPECT_EQ(request.message(), response.message());
1543   EXPECT_TRUE(s.ok());
1544   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1545                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1546                                "fake_token"));
1547   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1548                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1549                                "fake_selector"));
1550 }
1551 
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1552 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1553   ResetStub();
1554   EchoRequest request;
1555   EchoResponse response;
1556   ClientContext context;
1557   std::shared_ptr<CallCredentials> creds1 =
1558       GoogleIAMCredentials("fake_token1", "fake_selector1");
1559   context.set_credentials(creds1);
1560   std::shared_ptr<CallCredentials> creds2 =
1561       GoogleIAMCredentials("fake_token2", "fake_selector2");
1562   context.set_credentials(creds2);
1563   request.set_message("Hello");
1564   request.mutable_param()->set_echo_metadata(true);
1565 
1566   Status s = stub_->Echo(&context, request, &response);
1567   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1568                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1569                                "fake_token2"));
1570   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1571                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1572                                "fake_selector2"));
1573   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1574                                 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1575                                 "fake_token1"));
1576   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1577                                 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1578                                 "fake_selector1"));
1579   EXPECT_EQ(request.message(), response.message());
1580   EXPECT_TRUE(s.ok());
1581 }
1582 
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1583 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1584   ResetStub();
1585   EchoRequest request;
1586   EchoResponse response;
1587   ClientContext context;
1588   context.set_credentials(
1589       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1590           new TestMetadataCredentialsPlugin(
1591               TestMetadataCredentialsPlugin::kBadMetadataKey,
1592               "Does not matter, will fail the key is invalid.", false, true))));
1593   request.set_message("Hello");
1594 
1595   Status s = stub_->Echo(&context, request, &response);
1596   EXPECT_FALSE(s.ok());
1597   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1598 }
1599 
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1600 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1601   ResetStub();
1602   EchoRequest request;
1603   EchoResponse response;
1604   ClientContext context;
1605   context.set_credentials(
1606       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1607           new TestMetadataCredentialsPlugin(
1608               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1609               "With illegal \n value.", false, true))));
1610   request.set_message("Hello");
1611 
1612   Status s = stub_->Echo(&context, request, &response);
1613   EXPECT_FALSE(s.ok());
1614   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1615 }
1616 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)1617 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
1618   ResetStub();
1619   EchoRequest request;
1620   EchoResponse response;
1621   ClientContext context;
1622   context.set_credentials(
1623       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1624           new TestMetadataCredentialsPlugin(
1625               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1626               "Does not matter, will fail anyway (see 3rd param)", false,
1627               false))));
1628   request.set_message("Hello");
1629 
1630   Status s = stub_->Echo(&context, request, &response);
1631   EXPECT_FALSE(s.ok());
1632   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1633   EXPECT_EQ(s.error_message(),
1634             grpc::string("Getting metadata from plugin failed with error: ") +
1635                 kTestCredsPluginErrorMsg);
1636 }
1637 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)1638 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
1639   auto* processor = new TestAuthMetadataProcessor(false);
1640   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1641   ResetStub();
1642   EchoRequest request;
1643   EchoResponse response;
1644   ClientContext context;
1645   context.set_credentials(processor->GetCompatibleClientCreds());
1646   request.set_message("Hello");
1647   request.mutable_param()->set_echo_metadata(true);
1648   request.mutable_param()->set_expected_client_identity(
1649       TestAuthMetadataProcessor::kGoodGuy);
1650   request.mutable_param()->set_expected_transport_security_type(
1651       GetParam().credentials_type);
1652 
1653   Status s = stub_->Echo(&context, request, &response);
1654   EXPECT_EQ(request.message(), response.message());
1655   EXPECT_TRUE(s.ok());
1656 
1657   // Metadata should have been consumed by the processor.
1658   EXPECT_FALSE(MetadataContains(
1659       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1660       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1661 }
1662 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)1663 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
1664   auto* processor = new TestAuthMetadataProcessor(false);
1665   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1666   ResetStub();
1667   EchoRequest request;
1668   EchoResponse response;
1669   ClientContext context;
1670   context.set_credentials(processor->GetIncompatibleClientCreds());
1671   request.set_message("Hello");
1672 
1673   Status s = stub_->Echo(&context, request, &response);
1674   EXPECT_FALSE(s.ok());
1675   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1676 }
1677 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)1678 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
1679   ResetStub();
1680   EchoRequest request;
1681   EchoResponse response;
1682   ClientContext context;
1683   context.set_credentials(
1684       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1685           new TestMetadataCredentialsPlugin(
1686               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1687               "Does not matter, will fail anyway (see 3rd param)", true,
1688               false))));
1689   request.set_message("Hello");
1690 
1691   Status s = stub_->Echo(&context, request, &response);
1692   EXPECT_FALSE(s.ok());
1693   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1694   EXPECT_EQ(s.error_message(),
1695             grpc::string("Getting metadata from plugin failed with error: ") +
1696                 kTestCredsPluginErrorMsg);
1697 }
1698 
TEST_P(SecureEnd2endTest,CompositeCallCreds)1699 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
1700   ResetStub();
1701   EchoRequest request;
1702   EchoResponse response;
1703   ClientContext context;
1704   const char kMetadataKey1[] = "call-creds-key1";
1705   const char kMetadataKey2[] = "call-creds-key2";
1706   const char kMetadataVal1[] = "call-creds-val1";
1707   const char kMetadataVal2[] = "call-creds-val2";
1708 
1709   context.set_credentials(CompositeCallCredentials(
1710       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1711           new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, true,
1712                                             true))),
1713       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1714           new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, true,
1715                                             true)))));
1716   request.set_message("Hello");
1717   request.mutable_param()->set_echo_metadata(true);
1718 
1719   Status s = stub_->Echo(&context, request, &response);
1720   EXPECT_TRUE(s.ok());
1721   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1722                                kMetadataKey1, kMetadataVal1));
1723   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1724                                kMetadataKey2, kMetadataVal2));
1725 }
1726 
TEST_P(SecureEnd2endTest,ClientAuthContext)1727 TEST_P(SecureEnd2endTest, ClientAuthContext) {
1728   ResetStub();
1729   EchoRequest request;
1730   EchoResponse response;
1731   request.set_message("Hello");
1732   request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
1733                                                   kTlsCredentialsType);
1734   request.mutable_param()->set_expected_transport_security_type(
1735       GetParam().credentials_type);
1736   ClientContext context;
1737   Status s = stub_->Echo(&context, request, &response);
1738   EXPECT_EQ(response.message(), request.message());
1739   EXPECT_TRUE(s.ok());
1740 
1741   std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
1742   std::vector<grpc::string_ref> tst =
1743       auth_ctx->FindPropertyValues("transport_security_type");
1744   ASSERT_EQ(1u, tst.size());
1745   EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
1746   if (GetParam().credentials_type == kTlsCredentialsType) {
1747     EXPECT_EQ("x509_subject_alternative_name",
1748               auth_ctx->GetPeerIdentityPropertyName());
1749     EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
1750     EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
1751     EXPECT_EQ("waterzooi.test.google.be",
1752               ToString(auth_ctx->GetPeerIdentity()[1]));
1753     EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
1754     EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
1755   }
1756 }
1757 
1758 class ResourceQuotaEnd2endTest : public End2endTest {
1759  public:
ResourceQuotaEnd2endTest()1760   ResourceQuotaEnd2endTest()
1761       : server_resource_quota_("server_resource_quota") {}
1762 
ConfigureServerBuilder(ServerBuilder * builder)1763   virtual void ConfigureServerBuilder(ServerBuilder* builder) override {
1764     builder->SetResourceQuota(server_resource_quota_);
1765   }
1766 
1767  private:
1768   ResourceQuota server_resource_quota_;
1769 };
1770 
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)1771 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
1772   ResetStub();
1773 
1774   EchoRequest request;
1775   EchoResponse response;
1776   request.set_message("Hello");
1777 
1778   ClientContext context;
1779   Status s = stub_->Echo(&context, request, &response);
1780   EXPECT_EQ(response.message(), request.message());
1781   EXPECT_TRUE(s.ok());
1782 }
1783 
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc)1784 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
1785                                               bool test_insecure,
1786                                               bool test_secure,
1787                                               bool test_inproc) {
1788   std::vector<TestScenario> scenarios;
1789   std::vector<grpc::string> credentials_types;
1790   if (test_secure) {
1791     credentials_types =
1792         GetCredentialsProvider()->GetSecureCredentialsTypeList();
1793   }
1794   auto insec_ok = [] {
1795     // Only allow insecure credentials type when it is registered with the
1796     // provider. User may create providers that do not have insecure.
1797     return GetCredentialsProvider()->GetChannelCredentials(
1798                kInsecureCredentialsType, nullptr) != nullptr;
1799   };
1800   if (test_insecure && insec_ok()) {
1801     credentials_types.push_back(kInsecureCredentialsType);
1802   }
1803   GPR_ASSERT(!credentials_types.empty());
1804   for (const auto& cred : credentials_types) {
1805     scenarios.emplace_back(false, false, cred);
1806     if (use_proxy) {
1807       scenarios.emplace_back(true, false, cred);
1808     }
1809   }
1810   if (test_inproc && insec_ok()) {
1811     scenarios.emplace_back(false, true, kInsecureCredentialsType);
1812   }
1813   return scenarios;
1814 }
1815 
1816 INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
1817                         ::testing::ValuesIn(CreateTestScenarios(false, true,
1818                                                                 true, true)));
1819 
1820 INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest,
1821                         ::testing::ValuesIn(CreateTestScenarios(false, true,
1822                                                                 true, true)));
1823 
1824 INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
1825                         ::testing::ValuesIn(CreateTestScenarios(true, true,
1826                                                                 true, true)));
1827 
1828 INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest,
1829                         ::testing::ValuesIn(CreateTestScenarios(false, false,
1830                                                                 true, false)));
1831 
1832 INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
1833                         ::testing::ValuesIn(CreateTestScenarios(false, true,
1834                                                                 true, true)));
1835 
1836 }  // namespace
1837 }  // namespace testing
1838 }  // namespace grpc
1839 
main(int argc,char ** argv)1840 int main(int argc, char** argv) {
1841   gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
1842   grpc_test_init(argc, argv);
1843   ::testing::InitGoogleTest(&argc, argv);
1844   return RUN_ALL_TESTS();
1845 }
1846