#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using Grpc.Core.Internal;
using NUnit.Framework;

namespace Grpc.Core.Internal.Tests
{
    /// <summary>
    /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
    /// Each test drives the <c>AsyncCall</c> under test by invoking the completion callbacks
    /// captured by <c>FakeNativeCall</c> and then checks the resulting tasks, status and disposal state.
    /// </summary>
    public class AsyncCallTest
    {
        private Channel channel;
        private FakeNativeCall fakeCall;
        private AsyncCall<string, string> asyncCall;

        /// <summary>
        /// Creates a fresh channel, fake native call and <c>AsyncCall</c> before every test.
        /// </summary>
        [SetUp]
        public void Init()
        {
            channel = new Channel("localhost", ChannelCredentials.Insecure);

            fakeCall = new FakeNativeCall();

            var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
            asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
        }

        /// <summary>
        /// Shuts the channel down after every test, blocking until the shutdown task completes.
        /// </summary>
        [TearDown]
        public void Cleanup()
        {
            channel.ShutdownAsync().Wait();
        }

        /// <summary>
        /// Starting a second unary call on the same <c>AsyncCall</c> throws <c>InvalidOperationException</c>.
        /// </summary>
        [Test]
        public void AsyncUnary_CanBeStartedOnlyOnce()
        {
            asyncCall.UnaryCallAsync("request1");
            Assert.Throws<InvalidOperationException>(
                () => asyncCall.UnaryCallAsync("abc"));
        }

        /// <summary>
        /// Streaming reads and writes are rejected with <c>InvalidOperationException</c> on a unary call.
        /// </summary>
        [Test]
        public void AsyncUnary_StreamingOperationsNotAllowed()
        {
            asyncCall.UnaryCallAsync("request1");
            Assert.ThrowsAsync<InvalidOperationException>(
                async () => await asyncCall.ReadMessageAsync());
            Assert.Throws<InvalidOperationException>(
                () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
        }

        /// <summary>
        /// A unary call completes successfully once the fake delivers an OK status with a response payload.
        /// </summary>
        [Test]
        public void AsyncUnary_Success()
        {
            var resultTask = asyncCall.UnaryCallAsync("request1");
            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
        }

        /// <summary>
        /// A non-OK status delivered for a unary call surfaces as an <c>RpcException</c> with that status code.
        /// </summary>
        [Test]
        public void AsyncUnary_NonSuccessStatusCode()
        {
            var resultTask = asyncCall.UnaryCallAsync("request1");
            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.InvalidArgument),
                null,
                new Metadata());

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
        }

        /// <summary>
        /// An OK status accompanied by a null response payload is reported as an error to the caller.
        /// </summary>
        [Test]
        public void AsyncUnary_NullResponsePayload()
        {
            var resultTask = asyncCall.UnaryCallAsync("request1");
            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                null,
                new Metadata());

            // failure to deserialize will result in Internal status.
            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
        }

        /// <summary>
        /// When serializing the request throws, the async unary call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources()
        {
            string nullRequest = null;  // will throw when serializing
            Assert.Throws<ArgumentNullException>(() => asyncCall.UnaryCallAsync(nullRequest));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// When starting the native call fails, the async unary call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void AsyncUnary_StartCallFailureDoesntLeakResources()
        {
            fakeCall.MakeStartCallFail();
            Assert.Throws<InvalidOperationException>(() => asyncCall.UnaryCallAsync("request1"));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// When serializing the request throws, the blocking unary call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void SyncUnary_RequestSerializationExceptionDoesntLeakResources()
        {
            string nullRequest = null;  // will throw when serializing
            Assert.Throws<ArgumentNullException>(() => asyncCall.UnaryCall(nullRequest));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// When starting the native call fails, the blocking unary call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void SyncUnary_StartCallFailureDoesntLeakResources()
        {
            fakeCall.MakeStartCallFail();
            Assert.Throws<InvalidOperationException>(() => asyncCall.UnaryCall("request1"));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// Streaming reads are rejected with <c>InvalidOperationException</c> on a client-streaming call.
        /// </summary>
        [Test]
        public void ClientStreaming_StreamingReadNotAllowed()
        {
            asyncCall.ClientStreamingCallAsync();
            Assert.ThrowsAsync<InvalidOperationException>(
                async () => await asyncCall.ReadMessageAsync());
        }

        /// <summary>
        /// A client-streaming call with no requests written succeeds when an OK status and payload arrive.
        /// </summary>
        [Test]
        public void ClientStreaming_NoRequest_Success()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
        }

        /// <summary>
        /// A client-streaming call with no requests written fails with the delivered non-OK status code.
        /// </summary>
        [Test]
        public void ClientStreaming_NoRequest_NonSuccessStatusCode()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.InvalidArgument),
                null,
                new Metadata());

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
        }

        /// <summary>
        /// Two writes followed by a completion, each acknowledged by the fake, lead to a successful response.
        /// </summary>
        [Test]
        public void ClientStreaming_MoreRequests_Success()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");
            fakeCall.SendCompletionCallback.OnSendCompletion(true);
            writeTask.Wait();

            var writeTask2 = requestStream.WriteAsync("request2");
            fakeCall.SendCompletionCallback.OnSendCompletion(true);
            writeTask2.Wait();

            var completeTask = requestStream.CompleteAsync();
            fakeCall.SendCompletionCallback.OnSendCompletion(true);
            completeTask.Wait();

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
        }

        /// <summary>
        /// A failed write stays pending until the call status arrives and then throws <c>RpcException</c> with that status.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteFailureThrowsRpcException()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");
            fakeCall.SendCompletionCallback.OnSendCompletion(false);

            // The write will wait for call to finish to receive the status code.
            Assert.IsFalse(writeTask.IsCompleted);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.Internal),
                null,
                new Metadata());

            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
        }

        /// <summary>
        /// Same as <c>ClientStreaming_WriteFailureThrowsRpcException</c>, but the call status is delivered
        /// before the failed send completion.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteFailureThrowsRpcException2()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.Internal),
                null,
                new Metadata());

            fakeCall.SendCompletionCallback.OnSendCompletion(false);

            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
        }

        /// <summary>
        /// While a failed write is still pending, another write is rejected; once the status has arrived,
        /// both the pending write and any later write throw <c>RpcException</c> with that status.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteFailureThrowsRpcException3()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");
            fakeCall.SendCompletionCallback.OnSendCompletion(false);

            // Until the delayed write completion has been triggered,
            // we still act as if there was an active write.
            Assert.Throws<InvalidOperationException>(() => requestStream.WriteAsync("request2"));

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.Internal),
                null,
                new Metadata());

            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

            // Following attempts to write keep delivering the same status
            var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
            Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
        }

        /// <summary>
        /// A write attempted after the call finished with OK status throws <c>RpcException</c> carrying the OK status.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);

            var writeTask = requestStream.WriteAsync("request1");
            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(Status.DefaultSuccess, ex.Status);
        }

        /// <summary>
        /// A write attempted after the call finished with a non-OK status throws <c>RpcException</c> with that status code.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);

            var writeTask = requestStream.WriteAsync("request1");
            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
        }

        /// <summary>
        /// Writing after <c>CompleteAsync</c> has been requested throws <c>InvalidOperationException</c>;
        /// the call itself still finishes successfully.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            // The completion task is intentionally not awaited; only its effect on later writes is under test.
            requestStream.CompleteAsync();

            Assert.Throws<InvalidOperationException>(() => requestStream.WriteAsync("request1"));

            fakeCall.SendCompletionCallback.OnSendCompletion(true);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
        }

        /// <summary>
        /// Completing the request stream after the call has already finished does not throw.
        /// </summary>
        [Test]
        public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
                CreateResponsePayload(),
                new Metadata());

            AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
            Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
        }

        /// <summary>
        /// A write attempted after cancellation was requested throws <c>TaskCanceledException</c>.
        /// </summary>
        [Test]
        public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
        {
            var resultTask = asyncCall.ClientStreamingCallAsync();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);

            asyncCall.Cancel();
            Assert.IsTrue(fakeCall.IsCancelled);

            var writeTask = requestStream.WriteAsync("request1");
            Assert.ThrowsAsync<TaskCanceledException>(async () => await writeTask);

            fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                CreateClientSideStatus(StatusCode.Cancelled),
                null,
                new Metadata());

            AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
        }

        /// <summary>
        /// When starting the native call fails, the client-streaming call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void ClientStreaming_StartCallFailureDoesntLeakResources()
        {
            fakeCall.MakeStartCallFail();
            Assert.Throws<InvalidOperationException>(() => asyncCall.ClientStreamingCallAsync());
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// Streaming writes are rejected with <c>InvalidOperationException</c> on a server-streaming call.
        /// </summary>
        [Test]
        public void ServerStreaming_StreamingSendNotAllowed()
        {
            asyncCall.StartServerStreamingCall("request1");
            Assert.Throws<InvalidOperationException>(
                () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
        }

        /// <summary>
        /// A server-streaming call with no responses succeeds when headers, a null message and then an OK status arrive.
        /// </summary>
        [Test]
        public void ServerStreaming_NoResponse_Success1()
        {
            asyncCall.StartServerStreamingCall("request1");
            var responseStream = new ClientResponseStream<string, string>(asyncCall);
            var readTask = responseStream.MoveNext();

            fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);

            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
        }

        /// <summary>
        /// Same as <c>ServerStreaming_NoResponse_Success1</c>, but the status is delivered before the null message.
        /// </summary>
        [Test]
        public void ServerStreaming_NoResponse_Success2()
        {
            asyncCall.StartServerStreamingCall("request1");
            var responseStream = new ClientResponseStream<string, string>(asyncCall);
            var readTask = responseStream.MoveNext();

            // try alternative order of completions
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
        }

        /// <summary>
        /// A failed read followed by a non-OK status makes the pending <c>MoveNext</c> throw <c>RpcException</c>.
        /// </summary>
        [Test]
        public void ServerStreaming_NoResponse_ReadFailure()
        {
            asyncCall.StartServerStreamingCall("request1");
            var responseStream = new ClientResponseStream<string, string>(asyncCall);
            var readTask = responseStream.MoveNext();

            fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);  // after a failed read, we rely on C core to deliver appropriate status code.
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));

            AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
        }

        /// <summary>
        /// Two responses are read one after another, then the stream ends with an OK status.
        /// </summary>
        [Test]
        public void ServerStreaming_MoreResponses_Success()
        {
            asyncCall.StartServerStreamingCall("request1");
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var readTask1 = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
            Assert.IsTrue(readTask1.Result);
            Assert.AreEqual("response1", responseStream.Current);

            var readTask2 = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
            Assert.IsTrue(readTask2.Result);
            Assert.AreEqual("response1", responseStream.Current);

            var readTask3 = responseStream.MoveNext();
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
        }

        /// <summary>
        /// When serializing the request throws, the server-streaming call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources()
        {
            string nullRequest = null;  // will throw when serializing
            Assert.Throws<ArgumentNullException>(() => asyncCall.StartServerStreamingCall(nullRequest));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);

            // Initiating a read on the call that failed to start must not throw synchronously;
            // the returned task is intentionally not observed.
            var responseStream = new ClientResponseStream<string, string>(asyncCall);
            responseStream.MoveNext();
        }

        /// <summary>
        /// When starting the native call fails, the server-streaming call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void ServerStreaming_StartCallFailureDoesntLeakResources()
        {
            fakeCall.MakeStartCallFail();
            Assert.Throws<InvalidOperationException>(() => asyncCall.StartServerStreamingCall("request1"));
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// A duplex call with no requests and no responses finishes successfully.
        /// </summary>
        [Test]
        public void DuplexStreaming_NoRequestNoResponse_Success()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var writeTask1 = requestStream.CompleteAsync();
            fakeCall.SendCompletionCallback.OnSendCompletion(true);
            Assert.DoesNotThrowAsync(async () => await writeTask1);

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
        }

        /// <summary>
        /// A write attempted after the duplex call finished with OK status throws <c>RpcException</c> carrying the OK status.
        /// </summary>
        [Test]
        public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);

            var writeTask = requestStream.WriteAsync("request1");
            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(Status.DefaultSuccess, ex.Status);
        }

        /// <summary>
        /// Completing the request stream after the duplex call has already finished does not throw.
        /// </summary>
        [Test]
        public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));

            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);

            Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
        }

        /// <summary>
        /// A failed duplex write stays pending until the call status arrives and then throws <c>RpcException</c> with that status.
        /// </summary>
        [Test]
        public void DuplexStreaming_WriteFailureThrowsRpcException()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");
            fakeCall.SendCompletionCallback.OnSendCompletion(false);

            // The write will wait for call to finish to receive the status code.
            Assert.IsFalse(writeTask.IsCompleted);

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));

            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);

            AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
        }

        /// <summary>
        /// Same as <c>DuplexStreaming_WriteFailureThrowsRpcException</c>, but the call status is delivered
        /// before the failed send completion.
        /// </summary>
        [Test]
        public void DuplexStreaming_WriteFailureThrowsRpcException2()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var writeTask = requestStream.WriteAsync("request1");

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
            fakeCall.SendCompletionCallback.OnSendCompletion(false);

            var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
            Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);

            AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
        }

        /// <summary>
        /// A duplex write attempted after cancellation was requested throws <c>TaskCanceledException</c>.
        /// </summary>
        [Test]
        public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
        {
            asyncCall.StartDuplexStreamingCall();
            var requestStream = new ClientRequestStream<string, string>(asyncCall);
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            asyncCall.Cancel();
            Assert.IsTrue(fakeCall.IsCancelled);

            var writeTask = requestStream.WriteAsync("request1");
            Assert.ThrowsAsync<TaskCanceledException>(async () => await writeTask);

            var readTask = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));

            AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
        }

        /// <summary>
        /// A read started after cancellation was requested can still deliver a message before the Cancelled status arrives.
        /// </summary>
        [Test]
        public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
        {
            asyncCall.StartDuplexStreamingCall();
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            asyncCall.Cancel();
            Assert.IsTrue(fakeCall.IsCancelled);

            var readTask1 = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
            Assert.IsTrue(readTask1.Result);
            Assert.AreEqual("response1", responseStream.Current);

            var readTask2 = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));

            AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
        }

        /// <summary>
        /// A read started before cancellation was requested can still deliver a message before the Cancelled status arrives.
        /// </summary>
        [Test]
        public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
        {
            asyncCall.StartDuplexStreamingCall();
            var responseStream = new ClientResponseStream<string, string>(asyncCall);

            var readTask1 = responseStream.MoveNext();  // initiate the read before cancel request
            asyncCall.Cancel();
            Assert.IsTrue(fakeCall.IsCancelled);

            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
            Assert.IsTrue(readTask1.Result);
            Assert.AreEqual("response1", responseStream.Current);

            var readTask2 = responseStream.MoveNext();
            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
            fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));

            AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
        }

        /// <summary>
        /// When starting the native call fails, the duplex call releases its channel reference and native call.
        /// </summary>
        [Test]
        public void DuplexStreaming_StartCallFailureDoesntLeakResources()
        {
            fakeCall.MakeStartCallFail();
            Assert.Throws<InvalidOperationException>(() => asyncCall.StartDuplexStreamingCall());
            Assert.AreEqual(0, channel.GetCallReferenceCount());
            Assert.IsTrue(fakeCall.IsDisposed);
        }

        /// <summary>
        /// Builds a <c>ClientSideStatus</c> with the given status code, an empty detail string and empty trailers.
        /// </summary>
        /// <param name="statusCode">Status code to report.</param>
        /// <returns>The newly created status.</returns>
        private static ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
        {
            return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
        }

        /// <summary>
        /// Serializes the string "response1" with the string marshaller, producing the payload used as a fake response.
        /// </summary>
        /// <returns>The serialized response bytes.</returns>
        private static byte[] CreateResponsePayload()
        {
            return Marshallers.StringMarshaller.Serializer("response1");
        }

        /// <summary>
        /// Asserts that a unary-response call finished successfully: the result task is complete with "response1",
        /// the native call is disposed, status is OK, and both response headers and trailers are empty.
        /// </summary>
        private static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
        {
            Assert.IsTrue(resultTask.IsCompleted);
            Assert.IsTrue(fakeCall.IsDisposed);

            Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
            Assert.AreEqual(0, asyncCall.GetTrailers().Count);
            Assert.AreEqual("response1", resultTask.Result);
        }

        /// <summary>
        /// Asserts that a streaming-response call finished successfully: the <c>MoveNext</c> task is complete
        /// with result <c>false</c>, the native call is disposed, status is OK and trailers are empty.
        /// </summary>
        private static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
        {
            Assert.IsTrue(moveNextTask.IsCompleted);
            Assert.IsTrue(fakeCall.IsDisposed);

            Assert.IsFalse(moveNextTask.Result);
            Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
            Assert.AreEqual(0, asyncCall.GetTrailers().Count);
        }

        /// <summary>
        /// Asserts that a unary-response call finished with an error: the result task is complete and throws
        /// <c>RpcException</c> with <paramref name="expectedStatusCode"/>, the native call is disposed,
        /// the call status matches, and both response headers and trailers are empty.
        /// </summary>
        private static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
        {
            Assert.IsTrue(resultTask.IsCompleted);
            Assert.IsTrue(fakeCall.IsDisposed);

            Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
            var ex = Assert.ThrowsAsync<RpcException>(async () => await resultTask);
            Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
            Assert.AreEqual(0, asyncCall.GetTrailers().Count);
        }

        /// <summary>
        /// Asserts that a streaming-response call finished with an error: the <c>MoveNext</c> task is complete and
        /// throws <c>RpcException</c> with <paramref name="expectedStatusCode"/>, the native call is disposed,
        /// the call status matches and trailers are empty.
        /// </summary>
        private static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
        {
            Assert.IsTrue(moveNextTask.IsCompleted);
            Assert.IsTrue(fakeCall.IsDisposed);

            var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
            Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
            Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
            Assert.AreEqual(0, asyncCall.GetTrailers().Count);
        }
    }
}