1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22 
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/ext/health_check_service_server_builder_option.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34 
35 #include "src/core/lib/gpr/env.h"
36 #include "src/core/lib/gpr/tls.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/string_ref_helper.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45 
46 #include <gtest/gtest.h>
47 
48 using grpc::testing::EchoRequest;
49 using grpc::testing::EchoResponse;
50 using grpc::testing::kTlsCredentialsType;
51 using std::chrono::system_clock;
52 
53 namespace grpc {
54 namespace testing {
55 
56 namespace {
57 
// Packs a small integer into an opaque completion-queue tag. The resulting
// pointer only carries the integer's value and is never dereferenced; use
// detag() to get the integer back.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
// Recovers the integer value that was packed into a completion-queue tag
// pointer.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
60 
61 class Verifier {
62  public:
Verifier()63   Verifier() : lambda_run_(false) {}
64   // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)65   Verifier& Expect(int i, bool expect_ok) {
66     return ExpectUnless(i, expect_ok, false);
67   }
68   // ExpectUnless sets the expected ok value for a specific tag
69   // unless the tag was already marked seen (as a result of ExpectMaybe)
ExpectUnless(int i,bool expect_ok,bool seen)70   Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
71     if (!seen) {
72       expectations_[tag(i)] = expect_ok;
73     }
74     return *this;
75   }
76   // ExpectMaybe sets the expected ok value for a specific tag, but does not
77   // require it to appear
78   // If it does, sets *seen to true
ExpectMaybe(int i,bool expect_ok,bool * seen)79   Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
80     if (!*seen) {
81       maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
82     }
83     return *this;
84   }
85 
86   // Next waits for 1 async tag to complete, checks its
87   // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)88   int Next(CompletionQueue* cq, bool ignore_ok) {
89     bool ok;
90     void* got_tag;
91     EXPECT_TRUE(cq->Next(&got_tag, &ok));
92     GotTag(got_tag, ok, ignore_ok);
93     return detag(got_tag);
94   }
95 
96   template <typename T>
DoOnceThenAsyncNext(CompletionQueue * cq,void ** got_tag,bool * ok,T deadline,std::function<void (void)> lambda)97   CompletionQueue::NextStatus DoOnceThenAsyncNext(
98       CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
99       std::function<void(void)> lambda) {
100     if (lambda_run_) {
101       return cq->AsyncNext(got_tag, ok, deadline);
102     } else {
103       lambda_run_ = true;
104       return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
105     }
106   }
107 
108   // Verify keeps calling Next until all currently set
109   // expected tags are complete
Verify(CompletionQueue * cq)110   void Verify(CompletionQueue* cq) { Verify(cq, false); }
111 
112   // This version of Verify allows optionally ignoring the
113   // outcome of the expectation
Verify(CompletionQueue * cq,bool ignore_ok)114   void Verify(CompletionQueue* cq, bool ignore_ok) {
115     GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
116     while (!expectations_.empty()) {
117       Next(cq, ignore_ok);
118     }
119   }
120 
121   // This version of Verify stops after a certain deadline
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline)122   void Verify(CompletionQueue* cq,
123               std::chrono::system_clock::time_point deadline) {
124     if (expectations_.empty()) {
125       bool ok;
126       void* got_tag;
127       EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
128                 CompletionQueue::TIMEOUT);
129     } else {
130       while (!expectations_.empty()) {
131         bool ok;
132         void* got_tag;
133         EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
134                   CompletionQueue::GOT_EVENT);
135         GotTag(got_tag, ok, false);
136       }
137     }
138   }
139 
140   // This version of Verify stops after a certain deadline, and uses the
141   // DoThenAsyncNext API
142   // to call the lambda
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline,const std::function<void (void)> & lambda)143   void Verify(CompletionQueue* cq,
144               std::chrono::system_clock::time_point deadline,
145               const std::function<void(void)>& lambda) {
146     if (expectations_.empty()) {
147       bool ok;
148       void* got_tag;
149       EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
150                 CompletionQueue::TIMEOUT);
151     } else {
152       while (!expectations_.empty()) {
153         bool ok;
154         void* got_tag;
155         EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
156                   CompletionQueue::GOT_EVENT);
157         GotTag(got_tag, ok, false);
158       }
159     }
160   }
161 
162  private:
GotTag(void * got_tag,bool ok,bool ignore_ok)163   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
164     auto it = expectations_.find(got_tag);
165     if (it != expectations_.end()) {
166       if (!ignore_ok) {
167         EXPECT_EQ(it->second, ok);
168       }
169       expectations_.erase(it);
170     } else {
171       auto it2 = maybe_expectations_.find(got_tag);
172       if (it2 != maybe_expectations_.end()) {
173         if (it2->second.seen != nullptr) {
174           EXPECT_FALSE(*it2->second.seen);
175           *it2->second.seen = true;
176         }
177         if (!ignore_ok) {
178           EXPECT_EQ(it2->second.ok, ok);
179         }
180       } else {
181         gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
182         abort();
183       }
184     }
185   }
186 
187   struct MaybeExpect {
188     bool ok;
189     bool* seen;
190   };
191 
192   std::map<void*, bool> expectations_;
193   std::map<void*, MaybeExpect> maybe_expectations_;
194   bool lambda_run_;
195 };
196 
plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin> & plugin)197 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
198   return plugin->has_sync_methods();
199 }
200 
201 // This class disables the server builder plugins that may add sync services to
202 // the server. If there are sync services, UnimplementedRpc test will triger
203 // the sync unknown rpc routine on the server side, rather than the async one
204 // that needs to be tested here.
205 class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
206  public:
UpdateArguments(ChannelArguments * arg)207   void UpdateArguments(ChannelArguments* arg) override {}
208 
UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>> * plugins)209   void UpdatePlugins(
210       std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
211     plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
212                                   plugin_has_sync_methods),
213                    plugins->end());
214   }
215 };
216 
217 class TestScenario {
218  public:
TestScenario(bool inproc_stub,const grpc::string & creds_type,bool hcs,const grpc::string & content)219   TestScenario(bool inproc_stub, const grpc::string& creds_type, bool hcs,
220                const grpc::string& content)
221       : inproc(inproc_stub),
222         health_check_service(hcs),
223         credentials_type(creds_type),
224         message_content(content) {}
225   void Log() const;
226   bool inproc;
227   bool health_check_service;
228   const grpc::string credentials_type;
229   const grpc::string message_content;
230 };
231 
operator <<(std::ostream & out,const TestScenario & scenario)232 static std::ostream& operator<<(std::ostream& out,
233                                 const TestScenario& scenario) {
234   return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
235              << ", credentials='" << scenario.credentials_type
236              << ", health_check_service="
237              << (scenario.health_check_service ? "true" : "false")
238              << "', message_size=" << scenario.message_content.size() << "}";
239 }
240 
Log() const241 void TestScenario::Log() const {
242   std::ostringstream out;
243   out << *this;
244   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
245 }
246 
247 class HealthCheck : public health::v1::Health::Service {};
248 
249 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
250  protected:
AsyncEnd2endTest()251   AsyncEnd2endTest() { GetParam().Log(); }
252 
SetUp()253   void SetUp() override {
254     port_ = grpc_pick_unused_port_or_die();
255     server_address_ << "localhost:" << port_;
256 
257     // Setup server
258     BuildAndStartServer();
259   }
260 
TearDown()261   void TearDown() override {
262     server_->Shutdown();
263     void* ignored_tag;
264     bool ignored_ok;
265     cq_->Shutdown();
266     while (cq_->Next(&ignored_tag, &ignored_ok))
267       ;
268     stub_.reset();
269     grpc_recycle_unused_port(port_);
270   }
271 
BuildAndStartServer()272   void BuildAndStartServer() {
273     ServerBuilder builder;
274     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
275         GetParam().credentials_type);
276     builder.AddListeningPort(server_address_.str(), server_creds);
277     service_.reset(new grpc::testing::EchoTestService::AsyncService());
278     builder.RegisterService(service_.get());
279     if (GetParam().health_check_service) {
280       builder.RegisterService(&health_check_);
281     }
282     cq_ = builder.AddCompletionQueue();
283 
284     // TODO(zyc): make a test option to choose wheather sync plugins should be
285     // deleted
286     std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
287         new ServerBuilderSyncPluginDisabler());
288     builder.SetOption(move(sync_plugin_disabler));
289     server_ = builder.BuildAndStart();
290   }
291 
ResetStub()292   void ResetStub() {
293     ChannelArguments args;
294     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
295         GetParam().credentials_type, &args);
296     std::shared_ptr<Channel> channel =
297         !(GetParam().inproc)
298             ? CreateCustomChannel(server_address_.str(), channel_creds, args)
299             : server_->InProcessChannel(args);
300     stub_ = grpc::testing::EchoTestService::NewStub(channel);
301   }
302 
SendRpc(int num_rpcs)303   void SendRpc(int num_rpcs) {
304     for (int i = 0; i < num_rpcs; i++) {
305       EchoRequest send_request;
306       EchoRequest recv_request;
307       EchoResponse send_response;
308       EchoResponse recv_response;
309       Status recv_status;
310 
311       ClientContext cli_ctx;
312       ServerContext srv_ctx;
313       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
314 
315       send_request.set_message(GetParam().message_content);
316       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
317           stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
318 
319       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
320                             cq_.get(), cq_.get(), tag(2));
321 
322       response_reader->Finish(&recv_response, &recv_status, tag(4));
323 
324       Verifier().Expect(2, true).Verify(cq_.get());
325       EXPECT_EQ(send_request.message(), recv_request.message());
326 
327       send_response.set_message(recv_request.message());
328       response_writer.Finish(send_response, Status::OK, tag(3));
329       Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
330 
331       EXPECT_EQ(send_response.message(), recv_response.message());
332       EXPECT_TRUE(recv_status.ok());
333     }
334   }
335 
336   std::unique_ptr<ServerCompletionQueue> cq_;
337   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
338   std::unique_ptr<Server> server_;
339   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
340   HealthCheck health_check_;
341   std::ostringstream server_address_;
342   int port_;
343 };
344 
// A single unary echo RPC on a fresh stub.
TEST_P(AsyncEnd2endTest, SimpleRpc) {
  const int kNumRpcs = 1;
  ResetStub();
  SendRpc(kNumRpcs);
}
349 
// Ten unary echo RPCs, one after another, on the same stub.
TEST_P(AsyncEnd2endTest, SequentialRpcs) {
  const int kNumRpcs = 10;
  ResetStub();
  SendRpc(kNumRpcs);
}
354 
// Sends an RPC, restarts the server on the same address, and checks that the
// existing stub can send another RPC once the channel had time to reconnect.
TEST_P(AsyncEnd2endTest, ReconnectChannel) {
  // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
  if (GetParam().inproc) {
    return;
  }

  // It needs 2 pollset_works to reconnect the channel with polling engine
  // "poll", so the wait below is doubled for that engine.
  char* poll_strategy = gpr_getenv("GRPC_POLL_STRATEGY");
  const bool uses_poll =
      poll_strategy != nullptr && strcmp(poll_strategy, "poll") == 0;
  gpr_free(poll_strategy);
  const int poller_slowdown_factor = uses_poll ? 2 : 1;

  ResetStub();
  SendRpc(1);

  // Take the server down and drain the old completion queue.
  server_->Shutdown();
  cq_->Shutdown();
  void* drained_tag;
  bool drained_ok;
  while (cq_->Next(&drained_tag, &drained_ok)) {
  }

  BuildAndStartServer();

  // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
  // reconnect the channel.
  const auto reconnect_wait = gpr_time_from_millis(
      300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
      GPR_TIMESPAN);
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), reconnect_wait));
  SendRpc(1);
}
386 
387 // We do not need to protect notify because the use is synchronized.
ServerWait(Server * server,int * notify)388 void ServerWait(Server* server, int* notify) {
389   server->Wait();
390   *notify = 1;
391 }
// Checks that Server::Wait() on another thread keeps blocking while RPCs are
// served and returns once the server is shut down.
TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
  int wait_returned = 0;
  std::thread waiter(&ServerWait, server_.get(), &wait_returned);

  ResetStub();
  SendRpc(1);
  EXPECT_EQ(0, wait_returned);  // Wait() must still be blocked here

  server_->Shutdown();
  waiter.join();
  EXPECT_EQ(1, wait_returned);
}
402 
// Calls Server::Wait() on the test thread while another thread shuts the
// server down; Wait() has to return for the test to finish.
TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
  ResetStub();
  SendRpc(1);

  std::thread shutdown_thread([this]() { server_->Shutdown(); });
  server_->Wait();
  shutdown_thread.join();
}
410 
411 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,AsyncNextRpc)412 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
413   ResetStub();
414 
415   EchoRequest send_request;
416   EchoRequest recv_request;
417   EchoResponse send_response;
418   EchoResponse recv_response;
419   Status recv_status;
420 
421   ClientContext cli_ctx;
422   ServerContext srv_ctx;
423   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
424 
425   send_request.set_message(GetParam().message_content);
426   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
427       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
428 
429   std::chrono::system_clock::time_point time_now(
430       std::chrono::system_clock::now());
431   std::chrono::system_clock::time_point time_limit(
432       std::chrono::system_clock::now() + std::chrono::seconds(10));
433   Verifier().Verify(cq_.get(), time_now);
434   Verifier().Verify(cq_.get(), time_now);
435 
436   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
437                         cq_.get(), tag(2));
438   response_reader->Finish(&recv_response, &recv_status, tag(4));
439 
440   Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
441   EXPECT_EQ(send_request.message(), recv_request.message());
442 
443   send_response.set_message(recv_request.message());
444   response_writer.Finish(send_response, Status::OK, tag(3));
445   Verifier().Expect(3, true).Expect(4, true).Verify(
446       cq_.get(), std::chrono::system_clock::time_point::max());
447 
448   EXPECT_EQ(send_response.message(), recv_response.message());
449   EXPECT_TRUE(recv_status.ok());
450 }
451 
452 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,DoThenAsyncNextRpc)453 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
454   ResetStub();
455 
456   EchoRequest send_request;
457   EchoRequest recv_request;
458   EchoResponse send_response;
459   EchoResponse recv_response;
460   Status recv_status;
461 
462   ClientContext cli_ctx;
463   ServerContext srv_ctx;
464   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
465 
466   send_request.set_message(GetParam().message_content);
467   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
468       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
469 
470   std::chrono::system_clock::time_point time_now(
471       std::chrono::system_clock::now());
472   std::chrono::system_clock::time_point time_limit(
473       std::chrono::system_clock::now() + std::chrono::seconds(10));
474   Verifier().Verify(cq_.get(), time_now);
475   Verifier().Verify(cq_.get(), time_now);
476 
477   auto resp_writer_ptr = &response_writer;
478   auto lambda_2 = [&, this, resp_writer_ptr]() {
479     service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
480                           cq_.get(), tag(2));
481   };
482   response_reader->Finish(&recv_response, &recv_status, tag(4));
483 
484   Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
485   EXPECT_EQ(send_request.message(), recv_request.message());
486 
487   send_response.set_message(recv_request.message());
488   auto lambda_3 = [resp_writer_ptr, send_response]() {
489     resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
490   };
491   Verifier().Expect(3, true).Expect(4, true).Verify(
492       cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
493 
494   EXPECT_EQ(send_response.message(), recv_response.message());
495   EXPECT_TRUE(recv_status.ok());
496 }
497 
498 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreaming)499 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
500   ResetStub();
501 
502   EchoRequest send_request;
503   EchoRequest recv_request;
504   EchoResponse send_response;
505   EchoResponse recv_response;
506   Status recv_status;
507   ClientContext cli_ctx;
508   ServerContext srv_ctx;
509   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
510 
511   send_request.set_message(GetParam().message_content);
512   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
513       stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
514 
515   service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
516                                  tag(2));
517 
518   Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
519 
520   cli_stream->Write(send_request, tag(3));
521   srv_stream.Read(&recv_request, tag(4));
522   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
523   EXPECT_EQ(send_request.message(), recv_request.message());
524 
525   cli_stream->Write(send_request, tag(5));
526   srv_stream.Read(&recv_request, tag(6));
527   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
528 
529   EXPECT_EQ(send_request.message(), recv_request.message());
530   cli_stream->WritesDone(tag(7));
531   srv_stream.Read(&recv_request, tag(8));
532   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
533 
534   send_response.set_message(recv_request.message());
535   srv_stream.Finish(send_response, Status::OK, tag(9));
536   cli_stream->Finish(&recv_status, tag(10));
537   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
538 
539   EXPECT_EQ(send_response.message(), recv_response.message());
540   EXPECT_TRUE(recv_status.ok());
541 }
542 
543 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreamingWithCoalescingApi)544 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
545   ResetStub();
546 
547   EchoRequest send_request;
548   EchoRequest recv_request;
549   EchoResponse send_response;
550   EchoResponse recv_response;
551   Status recv_status;
552   ClientContext cli_ctx;
553   ServerContext srv_ctx;
554   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
555 
556   send_request.set_message(GetParam().message_content);
557   cli_ctx.set_initial_metadata_corked(true);
558   // tag:1 never comes up since no op is performed
559   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
560       stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
561 
562   service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
563                                  tag(2));
564 
565   cli_stream->Write(send_request, tag(3));
566 
567   bool seen3 = false;
568 
569   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
570 
571   srv_stream.Read(&recv_request, tag(4));
572 
573   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
574 
575   EXPECT_EQ(send_request.message(), recv_request.message());
576 
577   cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
578   srv_stream.Read(&recv_request, tag(6));
579   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
580   EXPECT_EQ(send_request.message(), recv_request.message());
581 
582   srv_stream.Read(&recv_request, tag(7));
583   Verifier().Expect(7, false).Verify(cq_.get());
584 
585   send_response.set_message(recv_request.message());
586   srv_stream.Finish(send_response, Status::OK, tag(8));
587   cli_stream->Finish(&recv_status, tag(9));
588   Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
589 
590   EXPECT_EQ(send_response.message(), recv_response.message());
591   EXPECT_TRUE(recv_status.ok());
592 }
593 
594 // One ping, two pongs.
TEST_P(AsyncEnd2endTest,SimpleServerStreaming)595 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
596   ResetStub();
597 
598   EchoRequest send_request;
599   EchoRequest recv_request;
600   EchoResponse send_response;
601   EchoResponse recv_response;
602   Status recv_status;
603   ClientContext cli_ctx;
604   ServerContext srv_ctx;
605   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
606 
607   send_request.set_message(GetParam().message_content);
608   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
609       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
610 
611   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
612                                   cq_.get(), cq_.get(), tag(2));
613 
614   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
615   EXPECT_EQ(send_request.message(), recv_request.message());
616 
617   send_response.set_message(recv_request.message());
618   srv_stream.Write(send_response, tag(3));
619   cli_stream->Read(&recv_response, tag(4));
620   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
621   EXPECT_EQ(send_response.message(), recv_response.message());
622 
623   srv_stream.Write(send_response, tag(5));
624   cli_stream->Read(&recv_response, tag(6));
625   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
626   EXPECT_EQ(send_response.message(), recv_response.message());
627 
628   srv_stream.Finish(Status::OK, tag(7));
629   cli_stream->Read(&recv_response, tag(8));
630   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
631 
632   cli_stream->Finish(&recv_status, tag(9));
633   Verifier().Expect(9, true).Verify(cq_.get());
634 
635   EXPECT_TRUE(recv_status.ok());
636 }
637 
638 // One ping, two pongs. Using WriteAndFinish API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWAF)639 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
640   ResetStub();
641 
642   EchoRequest send_request;
643   EchoRequest recv_request;
644   EchoResponse send_response;
645   EchoResponse recv_response;
646   Status recv_status;
647   ClientContext cli_ctx;
648   ServerContext srv_ctx;
649   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
650 
651   send_request.set_message(GetParam().message_content);
652   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
653       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
654 
655   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
656                                   cq_.get(), cq_.get(), tag(2));
657 
658   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
659   EXPECT_EQ(send_request.message(), recv_request.message());
660 
661   send_response.set_message(recv_request.message());
662   srv_stream.Write(send_response, tag(3));
663   cli_stream->Read(&recv_response, tag(4));
664   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
665   EXPECT_EQ(send_response.message(), recv_response.message());
666 
667   srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
668   cli_stream->Read(&recv_response, tag(6));
669   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
670   EXPECT_EQ(send_response.message(), recv_response.message());
671 
672   cli_stream->Read(&recv_response, tag(7));
673   Verifier().Expect(7, false).Verify(cq_.get());
674 
675   cli_stream->Finish(&recv_status, tag(8));
676   Verifier().Expect(8, true).Verify(cq_.get());
677 
678   EXPECT_TRUE(recv_status.ok());
679 }
680 
681 // One ping, two pongs. Using WriteLast API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWL)682 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
683   ResetStub();
684 
685   EchoRequest send_request;
686   EchoRequest recv_request;
687   EchoResponse send_response;
688   EchoResponse recv_response;
689   Status recv_status;
690   ClientContext cli_ctx;
691   ServerContext srv_ctx;
692   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
693 
694   send_request.set_message(GetParam().message_content);
695   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
696       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
697 
698   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
699                                   cq_.get(), cq_.get(), tag(2));
700 
701   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
702   EXPECT_EQ(send_request.message(), recv_request.message());
703 
704   send_response.set_message(recv_request.message());
705   srv_stream.Write(send_response, tag(3));
706   cli_stream->Read(&recv_response, tag(4));
707   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
708   EXPECT_EQ(send_response.message(), recv_response.message());
709 
710   srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
711   cli_stream->Read(&recv_response, tag(6));
712   srv_stream.Finish(Status::OK, tag(7));
713   Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
714   EXPECT_EQ(send_response.message(), recv_response.message());
715 
716   cli_stream->Read(&recv_response, tag(8));
717   Verifier().Expect(8, false).Verify(cq_.get());
718 
719   cli_stream->Finish(&recv_status, tag(9));
720   Verifier().Expect(9, true).Verify(cq_.get());
721 
722   EXPECT_TRUE(recv_status.ok());
723 }
724 
725 // One ping, one pong.
TEST_P(AsyncEnd2endTest,SimpleBidiStreaming)726 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
727   ResetStub();
728 
729   EchoRequest send_request;
730   EchoRequest recv_request;
731   EchoResponse send_response;
732   EchoResponse recv_response;
733   Status recv_status;
734   ClientContext cli_ctx;
735   ServerContext srv_ctx;
736   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
737 
738   send_request.set_message(GetParam().message_content);
739   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
740       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
741 
742   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
743                               tag(2));
744 
745   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
746 
747   cli_stream->Write(send_request, tag(3));
748   srv_stream.Read(&recv_request, tag(4));
749   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
750   EXPECT_EQ(send_request.message(), recv_request.message());
751 
752   send_response.set_message(recv_request.message());
753   srv_stream.Write(send_response, tag(5));
754   cli_stream->Read(&recv_response, tag(6));
755   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
756   EXPECT_EQ(send_response.message(), recv_response.message());
757 
758   cli_stream->WritesDone(tag(7));
759   srv_stream.Read(&recv_request, tag(8));
760   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
761 
762   srv_stream.Finish(Status::OK, tag(9));
763   cli_stream->Finish(&recv_status, tag(10));
764   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
765 
766   EXPECT_TRUE(recv_status.ok());
767 }
768 
769 // One ping, one pong. Using server:WriteAndFinish api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWAF)770 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
771   ResetStub();
772 
773   EchoRequest send_request;
774   EchoRequest recv_request;
775   EchoResponse send_response;
776   EchoResponse recv_response;
777   Status recv_status;
778   ClientContext cli_ctx;
779   ServerContext srv_ctx;
780   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
781 
782   send_request.set_message(GetParam().message_content);
783   cli_ctx.set_initial_metadata_corked(true);
784   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
785       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
786 
787   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
788                               tag(2));
789 
790   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
791 
792   bool seen3 = false;
793 
794   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
795 
796   srv_stream.Read(&recv_request, tag(4));
797 
798   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
799   EXPECT_EQ(send_request.message(), recv_request.message());
800 
801   srv_stream.Read(&recv_request, tag(5));
802   Verifier().Expect(5, false).Verify(cq_.get());
803 
804   send_response.set_message(recv_request.message());
805   srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
806   cli_stream->Read(&recv_response, tag(7));
807   Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
808   EXPECT_EQ(send_response.message(), recv_response.message());
809 
810   cli_stream->Finish(&recv_status, tag(8));
811   Verifier().Expect(8, true).Verify(cq_.get());
812 
813   EXPECT_TRUE(recv_status.ok());
814 }
815 
816 // One ping, one pong. Using server:WriteLast api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWL)817 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
818   ResetStub();
819 
820   EchoRequest send_request;
821   EchoRequest recv_request;
822   EchoResponse send_response;
823   EchoResponse recv_response;
824   Status recv_status;
825   ClientContext cli_ctx;
826   ServerContext srv_ctx;
827   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
828 
829   send_request.set_message(GetParam().message_content);
830   cli_ctx.set_initial_metadata_corked(true);
831   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
832       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
833 
834   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
835                               tag(2));
836 
837   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
838 
839   bool seen3 = false;
840 
841   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
842 
843   srv_stream.Read(&recv_request, tag(4));
844 
845   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
846   EXPECT_EQ(send_request.message(), recv_request.message());
847 
848   srv_stream.Read(&recv_request, tag(5));
849   Verifier().Expect(5, false).Verify(cq_.get());
850 
851   send_response.set_message(recv_request.message());
852   srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
853   srv_stream.Finish(Status::OK, tag(7));
854   cli_stream->Read(&recv_response, tag(8));
855   Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
856   EXPECT_EQ(send_response.message(), recv_response.message());
857 
858   cli_stream->Finish(&recv_status, tag(9));
859   Verifier().Expect(9, true).Verify(cq_.get());
860 
861   EXPECT_TRUE(recv_status.ok());
862 }
863 
864 // Metadata tests
// Sends three client metadata entries on a unary Echo call and checks that
// the server-side context exposes each of them with the value that was sent.
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  std::pair<grpc::string, grpc::string> meta1("key1", "val1");
  std::pair<grpc::string, grpc::string> meta2("key2", "val2");
  std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
  cli_ctx.AddMetadata(meta1.first, meta1.second);
  cli_ctx.AddMetadata(meta2.first, meta2.second);
  cli_ctx.AddMetadata(meta3.first, meta3.second);

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  const auto& client_initial_metadata = srv_ctx.client_metadata();
  const std::pair<grpc::string, grpc::string>* const expected_entries[] = {
      &meta1, &meta2, &meta3};
  for (const auto* expected : expected_entries) {
    const auto it = client_initial_metadata.find(expected->first);
    // Report a missing key as a test failure rather than dereferencing
    // end(); keep going so the RPC below still completes.
    if (it == client_initial_metadata.end()) {
      ADD_FAILURE() << "client initial metadata is missing key \""
                    << expected->first << "\"";
      continue;
    }
    EXPECT_EQ(expected->second, ToString(it->second));
  }
  // Three entries were added above, so at least three must arrive.
  EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(3));

  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}
910 
// The server attaches two initial-metadata entries to a unary Echo call and
// the client checks that exactly those two entries arrive.
TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  std::pair<grpc::string, grpc::string> meta1("key1", "val1");
  std::pair<grpc::string, grpc::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->ReadInitialMetadata(tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());
  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  const std::pair<grpc::string, grpc::string>* const expected_entries[] = {
      &meta1, &meta2};
  for (const auto* expected : expected_entries) {
    const auto it = server_initial_metadata.find(expected->first);
    // Report a missing key as a test failure rather than dereferencing
    // end(); keep going so the RPC below still completes.
    if (it == server_initial_metadata.end()) {
      ADD_FAILURE() << "server initial metadata is missing key \""
                    << expected->first << "\"";
      continue;
    }
    EXPECT_EQ(expected->second, ToString(it->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());

  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(5));
  response_reader->Finish(&recv_response, &recv_status, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}
955 
// The server attaches two trailing-metadata entries to a unary Echo call and
// the client checks that exactly those two entries arrive with the status.
TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  std::pair<grpc::string, grpc::string> meta1("key1", "val1");
  std::pair<grpc::string, grpc::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(5));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Verify(cq_.get());

  send_response.set_message(recv_request.message());
  srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
  srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
  response_writer.Finish(send_response, Status::OK, tag(4));

  Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());

  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
  const std::pair<grpc::string, grpc::string>* const expected_entries[] = {
      &meta1, &meta2};
  for (const auto* expected : expected_entries) {
    const auto it = server_trailing_metadata.find(expected->first);
    // Report a missing key as a test failure rather than dereferencing
    // end().
    if (it == server_trailing_metadata.end()) {
      ADD_FAILURE() << "server trailing metadata is missing key \""
                    << expected->first << "\"";
      continue;
    }
    EXPECT_EQ(expected->second, ToString(it->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
}
1000 
TEST_P(AsyncEnd2endTest,MetadataRpc)1001 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1002   ResetStub();
1003 
1004   EchoRequest send_request;
1005   EchoRequest recv_request;
1006   EchoResponse send_response;
1007   EchoResponse recv_response;
1008   Status recv_status;
1009 
1010   ClientContext cli_ctx;
1011   ServerContext srv_ctx;
1012   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1013 
1014   send_request.set_message(GetParam().message_content);
1015   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
1016   std::pair<grpc::string, grpc::string> meta2(
1017       "key2-bin",
1018       grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1019   std::pair<grpc::string, grpc::string> meta3("key3", "val3");
1020   std::pair<grpc::string, grpc::string> meta6(
1021       "key4-bin",
1022       grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1023                    14));
1024   std::pair<grpc::string, grpc::string> meta5("key5", "val5");
1025   std::pair<grpc::string, grpc::string> meta4(
1026       "key6-bin",
1027       grpc::string(
1028           "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1029 
1030   cli_ctx.AddMetadata(meta1.first, meta1.second);
1031   cli_ctx.AddMetadata(meta2.first, meta2.second);
1032 
1033   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1034       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1035   response_reader->ReadInitialMetadata(tag(4));
1036 
1037   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1038                         cq_.get(), tag(2));
1039   Verifier().Expect(2, true).Verify(cq_.get());
1040   EXPECT_EQ(send_request.message(), recv_request.message());
1041   const auto& client_initial_metadata = srv_ctx.client_metadata();
1042   EXPECT_EQ(meta1.second,
1043             ToString(client_initial_metadata.find(meta1.first)->second));
1044   EXPECT_EQ(meta2.second,
1045             ToString(client_initial_metadata.find(meta2.first)->second));
1046   EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
1047 
1048   srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1049   srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1050   response_writer.SendInitialMetadata(tag(3));
1051   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1052   const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1053   EXPECT_EQ(meta3.second,
1054             ToString(server_initial_metadata.find(meta3.first)->second));
1055   EXPECT_EQ(meta4.second,
1056             ToString(server_initial_metadata.find(meta4.first)->second));
1057   EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
1058 
1059   send_response.set_message(recv_request.message());
1060   srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1061   srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1062   response_writer.Finish(send_response, Status::OK, tag(5));
1063   response_reader->Finish(&recv_response, &recv_status, tag(6));
1064 
1065   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1066 
1067   EXPECT_EQ(send_response.message(), recv_response.message());
1068   EXPECT_TRUE(recv_status.ok());
1069   const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1070   EXPECT_EQ(meta5.second,
1071             ToString(server_trailing_metadata.find(meta5.first)->second));
1072   EXPECT_EQ(meta6.second,
1073             ToString(server_trailing_metadata.find(meta6.first)->second));
1074   EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
1075 }
1076 
1077 // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_P(AsyncEnd2endTest,ServerCheckCancellation)1078 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1079   ResetStub();
1080 
1081   EchoRequest send_request;
1082   EchoRequest recv_request;
1083   EchoResponse send_response;
1084   EchoResponse recv_response;
1085   Status recv_status;
1086 
1087   ClientContext cli_ctx;
1088   ServerContext srv_ctx;
1089   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1090 
1091   send_request.set_message(GetParam().message_content);
1092   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1093       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1094   response_reader->Finish(&recv_response, &recv_status, tag(4));
1095 
1096   srv_ctx.AsyncNotifyWhenDone(tag(5));
1097   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1098                         cq_.get(), tag(2));
1099 
1100   Verifier().Expect(2, true).Verify(cq_.get());
1101   EXPECT_EQ(send_request.message(), recv_request.message());
1102 
1103   cli_ctx.TryCancel();
1104   Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1105   EXPECT_TRUE(srv_ctx.IsCancelled());
1106 
1107   EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1108 }
1109 
1110 // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_P(AsyncEnd2endTest,ServerCheckDone)1111 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1112   ResetStub();
1113 
1114   EchoRequest send_request;
1115   EchoRequest recv_request;
1116   EchoResponse send_response;
1117   EchoResponse recv_response;
1118   Status recv_status;
1119 
1120   ClientContext cli_ctx;
1121   ServerContext srv_ctx;
1122   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1123 
1124   send_request.set_message(GetParam().message_content);
1125   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1126       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1127   response_reader->Finish(&recv_response, &recv_status, tag(4));
1128 
1129   srv_ctx.AsyncNotifyWhenDone(tag(5));
1130   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1131                         cq_.get(), tag(2));
1132 
1133   Verifier().Expect(2, true).Verify(cq_.get());
1134   EXPECT_EQ(send_request.message(), recv_request.message());
1135 
1136   send_response.set_message(recv_request.message());
1137   response_writer.Finish(send_response, Status::OK, tag(3));
1138   Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1139   EXPECT_FALSE(srv_ctx.IsCancelled());
1140 
1141   EXPECT_EQ(send_response.message(), recv_response.message());
1142   EXPECT_TRUE(recv_status.ok());
1143 }
1144 
// Calls a method through the UnimplementedEchoService stub and expects the
// call to fail with UNIMPLEMENTED and an empty error message.
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
  ChannelArguments channel_args;
  const auto& creds = GetCredentialsProvider()->GetChannelCredentials(
      GetParam().credentials_type, &channel_args);

  // In-process scenarios get their channel from the server itself; all
  // others connect to the server address.
  std::shared_ptr<Channel> channel;
  if (GetParam().inproc) {
    channel = server_->InProcessChannel(channel_args);
  } else {
    channel = CreateCustomChannel(server_address_.str(), creds, channel_args);
  }
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub>
      unimplemented_stub(
          grpc::testing::UnimplementedEchoService::NewStub(channel));

  EchoRequest request_out;
  EchoResponse response_in;
  Status client_status;

  ClientContext client_ctx;
  request_out.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> reader(
      unimplemented_stub->AsyncUnimplemented(&client_ctx, request_out,
                                             cq_.get()));

  reader->Finish(&response_in, &client_status, tag(4));
  Verifier().Expect(4, true).Verify(cq_.get());

  EXPECT_EQ(StatusCode::UNIMPLEMENTED, client_status.error_code());
  EXPECT_EQ("", client_status.error_message());
}
1170 
1171 // This class is for testing scenarios where RPCs are cancelled on the server
1172 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1173 // API to check for cancellation
1174 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1175  protected:
1176   typedef enum {
1177     DO_NOT_CANCEL = 0,
1178     CANCEL_BEFORE_PROCESSING,
1179     CANCEL_DURING_PROCESSING,
1180     CANCEL_AFTER_PROCESSING
1181   } ServerTryCancelRequestPhase;
1182 
1183   // Helper for testing client-streaming RPCs which are cancelled on the server.
1184   // Depending on the value of server_try_cancel parameter, this will test one
1185   // of the following three scenarios:
1186   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1187   //   any messages from the client
1188   //
1189   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1190   //   messages from the client
1191   //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1193   //   messages from the client (but before sending any status back to the
1194   //   client)
TestClientStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1195   void TestClientStreamingServerCancel(
1196       ServerTryCancelRequestPhase server_try_cancel) {
1197     ResetStub();
1198 
1199     EchoRequest recv_request;
1200     EchoResponse send_response;
1201     EchoResponse recv_response;
1202     Status recv_status;
1203 
1204     ClientContext cli_ctx;
1205     ServerContext srv_ctx;
1206     ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1207 
1208     // Initiate the 'RequestStream' call on client
1209     CompletionQueue cli_cq;
1210 
1211     std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1212         stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1213 
1214     // On the server, request to be notified of 'RequestStream' calls
1215     // and receive the 'RequestStream' call just made by the client
1216     srv_ctx.AsyncNotifyWhenDone(tag(11));
1217     service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1218                                    tag(2));
1219     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1220     Verifier().Expect(2, true).Verify(cq_.get());
1221     t1.join();
1222 
1223     bool expected_server_cq_result = true;
1224     bool expected_client_cq_result = true;
1225 
1226     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1227       srv_ctx.TryCancel();
1228       Verifier().Expect(11, true).Verify(cq_.get());
1229       EXPECT_TRUE(srv_ctx.IsCancelled());
1230 
1231       // Since cancellation is done before server reads any results, we know
1232       // for sure that all server cq results will return false from this
1233       // point forward
1234       expected_server_cq_result = false;
1235       expected_client_cq_result = false;
1236     }
1237 
1238     bool ignore_client_cq_result =
1239         (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1240         (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1241 
1242     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1243                             &ignore_client_cq_result] {
1244       EchoRequest send_request;
1245       // Client sends 3 messages (tags 3, 4 and 5)
1246       for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1247         send_request.set_message("Ping " + grpc::to_string(tag_idx));
1248         cli_stream->Write(send_request, tag(tag_idx));
1249         Verifier()
1250             .Expect(tag_idx, expected_client_cq_result)
1251             .Verify(&cli_cq, ignore_client_cq_result);
1252       }
1253       cli_stream->WritesDone(tag(6));
1254       // Ignore ok on WritesDone since cancel can affect it
1255       Verifier()
1256           .Expect(6, expected_client_cq_result)
1257           .Verify(&cli_cq, ignore_client_cq_result);
1258     });
1259 
1260     bool ignore_cq_result = false;
1261     bool want_done_tag = false;
1262     std::thread* server_try_cancel_thd = nullptr;
1263 
1264     auto verif = Verifier();
1265 
1266     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1267       server_try_cancel_thd =
1268           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1269       // Server will cancel the RPC in a parallel thread while reading the
1270       // requests from the client. Since the cancellation can happen at anytime,
1271       // some of the cq results (i.e those until cancellation) might be true but
1272       // its non deterministic. So better to ignore the cq results
1273       ignore_cq_result = true;
1274       // Expect that we might possibly see the done tag that
1275       // indicates cancellation completion in this case
1276       want_done_tag = true;
1277       verif.Expect(11, true);
1278     }
1279 
1280     // Server reads 3 messages (tags 6, 7 and 8)
1281     // But if want_done_tag is true, we might also see tag 11
1282     for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1283       srv_stream.Read(&recv_request, tag(tag_idx));
1284       // Note that we'll add something to the verifier and verify that
1285       // something was seen, but it might be tag 11 and not what we
1286       // just added
1287       int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1288                         .Next(cq_.get(), ignore_cq_result);
1289       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1290       if (got_tag == 11) {
1291         EXPECT_TRUE(srv_ctx.IsCancelled());
1292         want_done_tag = false;
1293         // Now get the other entry that we were waiting on
1294         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1295       }
1296     }
1297 
1298     cli_thread.join();
1299 
1300     if (server_try_cancel_thd != nullptr) {
1301       server_try_cancel_thd->join();
1302       delete server_try_cancel_thd;
1303     }
1304 
1305     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1306       srv_ctx.TryCancel();
1307       want_done_tag = true;
1308       verif.Expect(11, true);
1309     }
1310 
1311     if (want_done_tag) {
1312       verif.Verify(cq_.get());
1313       EXPECT_TRUE(srv_ctx.IsCancelled());
1314       want_done_tag = false;
1315     }
1316 
1317     // The RPC has been cancelled at this point for sure (i.e irrespective of
1318     // the value of `server_try_cancel` is). So, from this point forward, we
1319     // know that cq results are supposed to return false on server.
1320 
1321     // Server sends the final message and cancelled status (but the RPC is
1322     // already cancelled at this point. So we expect the operation to fail)
1323     srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1324     Verifier().Expect(9, false).Verify(cq_.get());
1325 
1326     // Client will see the cancellation
1327     cli_stream->Finish(&recv_status, tag(10));
1328     Verifier().Expect(10, true).Verify(&cli_cq);
1329     EXPECT_FALSE(recv_status.ok());
1330     EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1331 
1332     cli_cq.Shutdown();
1333     void* dummy_tag;
1334     bool dummy_ok;
1335     while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1336     }
1337   }
1338 
1339   // Helper for testing server-streaming RPCs which are cancelled on the server.
1340   // Depending on the value of server_try_cancel parameter, this will test one
1341   // of the following three scenarios:
1342   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1343   //   any messages to the client
1344   //
1345   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1346   //   messages to the client
1347   //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after sending all
1349   //   messages to the client (but before sending any status back to the
1350   //   client)
TestServerStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1351   void TestServerStreamingServerCancel(
1352       ServerTryCancelRequestPhase server_try_cancel) {
1353     ResetStub();
1354 
1355     EchoRequest send_request;
1356     EchoRequest recv_request;
1357     EchoResponse send_response;
1358     Status recv_status;
1359     ClientContext cli_ctx;
1360     ServerContext srv_ctx;
1361     ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1362 
1363     send_request.set_message("Ping");
1364     // Initiate the 'ResponseStream' call on the client
1365     CompletionQueue cli_cq;
1366     std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1367         stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1368     // On the server, request to be notified of 'ResponseStream' calls and
1369     // receive the call just made by the client
1370     srv_ctx.AsyncNotifyWhenDone(tag(11));
1371     service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1372                                     cq_.get(), cq_.get(), tag(2));
1373 
1374     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1375     Verifier().Expect(2, true).Verify(cq_.get());
1376     t1.join();
1377 
1378     EXPECT_EQ(send_request.message(), recv_request.message());
1379 
1380     bool expected_cq_result = true;
1381     bool ignore_cq_result = false;
1382     bool want_done_tag = false;
1383     bool expected_client_cq_result = true;
1384     bool ignore_client_cq_result =
1385         (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1386 
1387     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1388       srv_ctx.TryCancel();
1389       Verifier().Expect(11, true).Verify(cq_.get());
1390       EXPECT_TRUE(srv_ctx.IsCancelled());
1391 
1392       // We know for sure that all cq results will be false from this point
1393       // since the server cancelled the RPC
1394       expected_cq_result = false;
1395       expected_client_cq_result = false;
1396     }
1397 
1398     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1399                             &ignore_client_cq_result] {
1400       // Client attempts to read the three messages from the server
1401       for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1402         EchoResponse recv_response;
1403         cli_stream->Read(&recv_response, tag(tag_idx));
1404         Verifier()
1405             .Expect(tag_idx, expected_client_cq_result)
1406             .Verify(&cli_cq, ignore_client_cq_result);
1407       }
1408     });
1409 
1410     std::thread* server_try_cancel_thd = nullptr;
1411 
1412     auto verif = Verifier();
1413 
1414     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1415       server_try_cancel_thd =
1416           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1417 
1418       // Server will cancel the RPC in a parallel thread while writing responses
1419       // to the client. Since the cancellation can happen at anytime, some of
1420       // the cq results (i.e those until cancellation) might be true but it is
1421       // non deterministic. So better to ignore the cq results
1422       ignore_cq_result = true;
1423       // Expect that we might possibly see the done tag that
1424       // indicates cancellation completion in this case
1425       want_done_tag = true;
1426       verif.Expect(11, true);
1427     }
1428 
1429     // Server sends three messages (tags 3, 4 and 5)
1430     // But if want_done tag is true, we might also see tag 11
1431     for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1432       send_response.set_message("Pong " + grpc::to_string(tag_idx));
1433       srv_stream.Write(send_response, tag(tag_idx));
1434       // Note that we'll add something to the verifier and verify that
1435       // something was seen, but it might be tag 11 and not what we
1436       // just added
1437       int got_tag = verif.Expect(tag_idx, expected_cq_result)
1438                         .Next(cq_.get(), ignore_cq_result);
1439       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1440       if (got_tag == 11) {
1441         EXPECT_TRUE(srv_ctx.IsCancelled());
1442         want_done_tag = false;
1443         // Now get the other entry that we were waiting on
1444         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1445       }
1446     }
1447 
1448     if (server_try_cancel_thd != nullptr) {
1449       server_try_cancel_thd->join();
1450       delete server_try_cancel_thd;
1451     }
1452 
1453     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1454       srv_ctx.TryCancel();
1455       want_done_tag = true;
1456       verif.Expect(11, true);
1457     }
1458 
1459     if (want_done_tag) {
1460       verif.Verify(cq_.get());
1461       EXPECT_TRUE(srv_ctx.IsCancelled());
1462       want_done_tag = false;
1463     }
1464 
1465     cli_thread.join();
1466 
1467     // The RPC has been cancelled at this point for sure (i.e irrespective of
1468     // the value of `server_try_cancel` is). So, from this point forward, we
1469     // know that cq results are supposed to return false on server.
1470 
1471     // Server finishes the stream (but the RPC is already cancelled)
1472     srv_stream.Finish(Status::CANCELLED, tag(9));
1473     Verifier().Expect(9, false).Verify(cq_.get());
1474 
1475     // Client will see the cancellation
1476     cli_stream->Finish(&recv_status, tag(10));
1477     Verifier().Expect(10, true).Verify(&cli_cq);
1478     EXPECT_FALSE(recv_status.ok());
1479     EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1480 
1481     cli_cq.Shutdown();
1482     void* dummy_tag;
1483     bool dummy_ok;
1484     while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1485     }
1486   }
1487 
  // Helper for testing bidirectional-streaming RPCs which are cancelled on the
1489   // server.
1490   //
1491   // Depending on the value of server_try_cancel parameter, this will
1492   // test one of the following three scenarios:
1493   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1494   //   writing any messages from/to the client
1495   //
1496   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1497   //   messages from the client
1498   //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1500   //   messages from the client (but before sending any status back to the
1501   //   client)
TestBidiStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1502   void TestBidiStreamingServerCancel(
1503       ServerTryCancelRequestPhase server_try_cancel) {
1504     ResetStub();
1505 
1506     EchoRequest send_request;
1507     EchoRequest recv_request;
1508     EchoResponse send_response;
1509     EchoResponse recv_response;
1510     Status recv_status;
1511     ClientContext cli_ctx;
1512     ServerContext srv_ctx;
1513     ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1514 
1515     // Initiate the call from the client side
1516     std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1517         cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1518 
1519     // On the server, request to be notified of the 'BidiStream' call and
1520     // receive the call just made by the client
1521     srv_ctx.AsyncNotifyWhenDone(tag(11));
1522     service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1523                                 tag(2));
1524     Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1525 
1526     auto verif = Verifier();
1527 
1528     // Client sends the first and the only message
1529     send_request.set_message("Ping");
1530     cli_stream->Write(send_request, tag(3));
1531     verif.Expect(3, true);
1532 
1533     bool expected_cq_result = true;
1534     bool ignore_cq_result = false;
1535     bool want_done_tag = false;
1536 
1537     int got_tag, got_tag2;
1538     bool tag_3_done = false;
1539 
1540     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1541       srv_ctx.TryCancel();
1542       verif.Expect(11, true);
1543       // We know for sure that all server cq results will be false from
1544       // this point since the server cancelled the RPC. However, we can't
1545       // say for sure about the client
1546       expected_cq_result = false;
1547       ignore_cq_result = true;
1548 
1549       do {
1550         got_tag = verif.Next(cq_.get(), ignore_cq_result);
1551         GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1552         if (got_tag == 3) {
1553           tag_3_done = true;
1554         }
1555       } while (got_tag != 11);
1556       EXPECT_TRUE(srv_ctx.IsCancelled());
1557     }
1558 
1559     std::thread* server_try_cancel_thd = nullptr;
1560 
1561     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1562       server_try_cancel_thd =
1563           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1564 
1565       // Since server is going to cancel the RPC in a parallel thread, some of
1566       // the cq results (i.e those until the cancellation) might be true. Since
1567       // that number is non-deterministic, it is better to ignore the cq results
1568       ignore_cq_result = true;
1569       // Expect that we might possibly see the done tag that
1570       // indicates cancellation completion in this case
1571       want_done_tag = true;
1572       verif.Expect(11, true);
1573     }
1574 
1575     srv_stream.Read(&recv_request, tag(4));
1576     verif.Expect(4, expected_cq_result);
1577     got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1578     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1579     GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1580                (got_tag == 11 && want_done_tag));
1581     GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1582                (got_tag2 == 11 && want_done_tag));
1583     // If we get 3 and 4, we don't need to wait for 11, but if
1584     // we get 11, we should also clear 3 and 4
1585     if (got_tag + got_tag2 != 7) {
1586       EXPECT_TRUE(srv_ctx.IsCancelled());
1587       want_done_tag = false;
1588       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1589       GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1590     }
1591 
1592     send_response.set_message("Pong");
1593     srv_stream.Write(send_response, tag(5));
1594     verif.Expect(5, expected_cq_result);
1595 
1596     cli_stream->Read(&recv_response, tag(6));
1597     verif.Expect(6, expected_cq_result);
1598     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1599     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1600     GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1601                (got_tag == 11 && want_done_tag));
1602     GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1603                (got_tag2 == 11 && want_done_tag));
1604     // If we get 5 and 6, we don't need to wait for 11, but if
1605     // we get 11, we should also clear 5 and 6
1606     if (got_tag + got_tag2 != 11) {
1607       EXPECT_TRUE(srv_ctx.IsCancelled());
1608       want_done_tag = false;
1609       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1610       GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1611     }
1612 
1613     // This is expected to succeed in all cases
1614     cli_stream->WritesDone(tag(7));
1615     verif.Expect(7, true);
1616     // TODO(vjpai): Consider whether the following is too flexible
1617     // or whether it should just be reset to ignore_cq_result
1618     bool ignore_cq_wd_result =
1619         ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1620     got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1621     GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1622     if (got_tag == 11) {
1623       EXPECT_TRUE(srv_ctx.IsCancelled());
1624       want_done_tag = false;
1625       // Now get the other entry that we were waiting on
1626       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1627     }
1628 
1629     // This is expected to fail in all cases i.e for all values of
1630     // server_try_cancel. This is because at this point, either there are no
1631     // more msgs from the client (because client called WritesDone) or the RPC
1632     // is cancelled on the server
1633     srv_stream.Read(&recv_request, tag(8));
1634     verif.Expect(8, false);
1635     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1636     GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1637     if (got_tag == 11) {
1638       EXPECT_TRUE(srv_ctx.IsCancelled());
1639       want_done_tag = false;
1640       // Now get the other entry that we were waiting on
1641       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1642     }
1643 
1644     if (server_try_cancel_thd != nullptr) {
1645       server_try_cancel_thd->join();
1646       delete server_try_cancel_thd;
1647     }
1648 
1649     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1650       srv_ctx.TryCancel();
1651       want_done_tag = true;
1652       verif.Expect(11, true);
1653     }
1654 
1655     if (want_done_tag) {
1656       verif.Verify(cq_.get());
1657       EXPECT_TRUE(srv_ctx.IsCancelled());
1658       want_done_tag = false;
1659     }
1660 
1661     // The RPC has been cancelled at this point for sure (i.e irrespective of
1662     // the value of `server_try_cancel` is). So, from this point forward, we
1663     // know that cq results are supposed to return false on server.
1664 
1665     srv_stream.Finish(Status::CANCELLED, tag(9));
1666     Verifier().Expect(9, false).Verify(cq_.get());
1667 
1668     cli_stream->Finish(&recv_status, tag(10));
1669     Verifier().Expect(10, true).Verify(cq_.get());
1670     EXPECT_FALSE(recv_status.ok());
1671     EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1672   }
1673 };
1674 
// Client-streaming server-cancel scenario, phase CANCEL_BEFORE_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1678 
// Client-streaming server-cancel scenario, phase CANCEL_DURING_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1682 
// Client-streaming server-cancel scenario, phase CANCEL_AFTER_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1686 
// Server-streaming server-cancel scenario, phase CANCEL_BEFORE_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1690 
// Server-streaming server-cancel scenario, phase CANCEL_DURING_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1694 
// Server-streaming server-cancel scenario, phase CANCEL_AFTER_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1698 
// Bidi-streaming server-cancel scenario: the server cancels before any
// message is read or written on its side.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1702 
// Bidi-streaming server-cancel scenario: the server cancels from a separate
// thread while the stream operations are in flight.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1706 
// Bidi-streaming server-cancel scenario: the server cancels after the stream
// operations have completed but before it sends a status.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1710 
CreateTestScenarios(bool test_secure,bool test_message_size_limit)1711 std::vector<TestScenario> CreateTestScenarios(bool test_secure,
1712                                               bool test_message_size_limit) {
1713   std::vector<TestScenario> scenarios;
1714   std::vector<grpc::string> credentials_types;
1715   std::vector<grpc::string> messages;
1716 
1717   auto insec_ok = [] {
1718     // Only allow insecure credentials type when it is registered with the
1719     // provider. User may create providers that do not have insecure.
1720     return GetCredentialsProvider()->GetChannelCredentials(
1721                kInsecureCredentialsType, nullptr) != nullptr;
1722   };
1723 
1724   if (insec_ok()) {
1725     credentials_types.push_back(kInsecureCredentialsType);
1726   }
1727   auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1728   for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1729     credentials_types.push_back(*sec);
1730   }
1731   GPR_ASSERT(!credentials_types.empty());
1732 
1733   messages.push_back("Hello");
1734   if (test_message_size_limit) {
1735     for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1736          k *= 32) {
1737       grpc::string big_msg;
1738       for (size_t i = 0; i < k * 1024; ++i) {
1739         char c = 'a' + (i % 26);
1740         big_msg += c;
1741       }
1742       messages.push_back(big_msg);
1743     }
1744     messages.push_back(
1745         grpc::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 10, 'a'));
1746   }
1747 
1748   // TODO (sreek) Renable tests with health check service after the issue
1749   // https://github.com/grpc/grpc/issues/11223 is resolved
1750   for (auto health_check_service : {false}) {
1751     for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1752       for (auto cred = credentials_types.begin();
1753            cred != credentials_types.end(); ++cred) {
1754         scenarios.emplace_back(false, *cred, health_check_service, *msg);
1755       }
1756       if (insec_ok()) {
1757         scenarios.emplace_back(true, kInsecureCredentialsType,
1758                                health_check_service, *msg);
1759       }
1760     }
1761   }
1762   return scenarios;
1763 }
1764 
// The general suite is instantiated with test_message_size_limit enabled, so
// its scenarios include the large payloads in addition to "Hello".
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
                        ::testing::ValuesIn(CreateTestScenarios(true, true)));
// The server-cancel suite is instantiated with test_message_size_limit
// disabled, so each of its scenarios only carries the short "Hello" payload.
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
                        AsyncEnd2endServerTryCancelTest,
                        ::testing::ValuesIn(CreateTestScenarios(false, false)));
1770 
1771 }  // namespace
1772 }  // namespace testing
1773 }  // namespace grpc
1774 
main(int argc,char ** argv)1775 int main(int argc, char** argv) {
1776   // Change the backup poll interval from 5s to 100ms to speed up the
1777   // ReconnectChannel test
1778   gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "100");
1779   grpc_test_init(argc, argv);
1780   ::testing::InitGoogleTest(&argc, argv);
1781   int ret = RUN_ALL_TESTS();
1782   return ret;
1783 }
1784