1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Diagnostics; 22 using System.Linq; 23 using System.Threading; 24 using System.Threading.Tasks; 25 using Grpc.Core; 26 using Grpc.Core.Internal; 27 using Grpc.Core.Profiling; 28 using Grpc.Core.Utils; 29 using NUnit.Framework; 30 31 namespace Grpc.Core.Tests 32 { 33 public class CallCancellationTest 34 { 35 const string Host = "127.0.0.1"; 36 37 MockServiceHelper helper; 38 Server server; 39 Channel channel; 40 41 [SetUp] Init()42 public void Init() 43 { 44 helper = new MockServiceHelper(Host); 45 server = helper.GetServer(); 46 server.Start(); 47 channel = helper.GetChannel(); 48 } 49 50 [TearDown] Cleanup()51 public void Cleanup() 52 { 53 channel.ShutdownAsync().Wait(); 54 server.ShutdownAsync().Wait(); 55 } 56 57 [Test] ClientStreamingCall_CancelAfterBegin()58 public async Task ClientStreamingCall_CancelAfterBegin() 59 { 60 var barrier = new TaskCompletionSource<object>(); 61 62 helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => 63 { 64 barrier.SetResult(null); 65 await requestStream.ToListAsync(); 66 return ""; 67 }); 68 69 var cts = new CancellationTokenSource(); 70 var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); 71 72 await barrier.Task; // make sure the handler has started. 73 cts.Cancel(); 74 75 try 76 { 77 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. 78 await call.ResponseAsync; 79 Assert.Fail(); 80 } 81 catch (RpcException ex) 82 { 83 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); 84 } 85 } 86 87 [Test] ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()88 public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() 89 { 90 var handlerStartedBarrier = new TaskCompletionSource<object>(); 91 var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); 92 var successTcs = new TaskCompletionSource<string>(); 93 94 helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => 95 { 96 handlerStartedBarrier.SetResult(null); 97 98 // wait for cancellation to be delivered. 99 context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); 100 await cancelNotificationReceivedBarrier.Task; 101 102 var moveNextResult = await requestStream.MoveNext(); 103 successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); 104 return ""; 105 }); 106 107 var cts = new CancellationTokenSource(); 108 var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); 109 110 await handlerStartedBarrier.Task; 111 cts.Cancel(); 112 113 try 114 { 115 await call.ResponseAsync; 116 Assert.Fail(); 117 } 118 catch (RpcException ex) 119 { 120 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); 121 } 122 Assert.AreEqual("SUCCESS", await successTcs.Task); 123 } 124 125 [Test] ClientStreamingCall_CancelServerSideRead()126 public async Task ClientStreamingCall_CancelServerSideRead() 127 { 128 helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => 129 { 130 var cts = new CancellationTokenSource(); 131 var moveNextTask = requestStream.MoveNext(cts.Token); 132 cts.Cancel(); 133 await moveNextTask; 134 return ""; 135 }); 136 137 var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); 138 try 139 { 140 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. 141 await call.ResponseAsync; 142 Assert.Fail(); 143 } 144 catch (RpcException ex) 145 { 146 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); 147 } 148 } 149 150 [Test] ServerStreamingCall_CancelClientSideRead()151 public async Task ServerStreamingCall_CancelClientSideRead() 152 { 153 helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => 154 { 155 await responseStream.WriteAsync("abc"); 156 while (!context.CancellationToken.IsCancellationRequested) 157 { 158 await Task.Delay(10); 159 } 160 }); 161 162 var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); 163 await call.ResponseStream.MoveNext(); 164 Assert.AreEqual("abc", call.ResponseStream.Current); 165 166 var cts = new CancellationTokenSource(); 167 var moveNextTask = call.ResponseStream.MoveNext(cts.Token); 168 cts.Cancel(); 169 170 try 171 { 172 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. 173 await moveNextTask; 174 Assert.Fail(); 175 } 176 catch (RpcException ex) 177 { 178 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); 179 } 180 } 181 } 182 } 183