1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Diagnostics;
22 using System.Linq;
23 using System.Threading;
24 using System.Threading.Tasks;
25 using Grpc.Core;
26 using Grpc.Core.Internal;
27 using Grpc.Core.Profiling;
28 using Grpc.Core.Utils;
29 using NUnit.Framework;
30 
31 namespace Grpc.Core.Tests
32 {
33     public class CallCancellationTest
34     {
35         const string Host = "127.0.0.1";
36 
37         MockServiceHelper helper;
38         Server server;
39         Channel channel;
40 
41         [SetUp]
Init()42         public void Init()
43         {
44             helper = new MockServiceHelper(Host);
45             server = helper.GetServer();
46             server.Start();
47             channel = helper.GetChannel();
48         }
49 
50         [TearDown]
Cleanup()51         public void Cleanup()
52         {
53             channel.ShutdownAsync().Wait();
54             server.ShutdownAsync().Wait();
55         }
56 
57         [Test]
ClientStreamingCall_CancelAfterBegin()58         public async Task ClientStreamingCall_CancelAfterBegin()
59         {
60             var barrier = new TaskCompletionSource<object>();
61 
62             helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
63             {
64                 barrier.SetResult(null);
65                 await requestStream.ToListAsync();
66                 return "";
67             });
68 
69             var cts = new CancellationTokenSource();
70             var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
71 
72             await barrier.Task;  // make sure the handler has started.
73             cts.Cancel();
74 
75             try
76             {
77                 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
78                 await call.ResponseAsync;
79                 Assert.Fail();
80             }
81             catch (RpcException ex)
82             {
83                 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
84             }
85         }
86 
87         [Test]
ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()88         public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
89         {
90             var handlerStartedBarrier = new TaskCompletionSource<object>();
91             var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
92             var successTcs = new TaskCompletionSource<string>();
93 
94             helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
95             {
96                 handlerStartedBarrier.SetResult(null);
97 
98                 // wait for cancellation to be delivered.
99                 context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
100                 await cancelNotificationReceivedBarrier.Task;
101 
102                 var moveNextResult = await requestStream.MoveNext();
103                 successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
104                 return "";
105             });
106 
107             var cts = new CancellationTokenSource();
108             var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
109 
110             await handlerStartedBarrier.Task;
111             cts.Cancel();
112 
113             try
114             {
115                 await call.ResponseAsync;
116                 Assert.Fail();
117             }
118             catch (RpcException ex)
119             {
120                 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
121             }
122             Assert.AreEqual("SUCCESS", await successTcs.Task);
123         }
124 
125         [Test]
ClientStreamingCall_CancelServerSideRead()126         public async Task ClientStreamingCall_CancelServerSideRead()
127         {
128             helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
129             {
130                 var cts = new CancellationTokenSource();
131                 var moveNextTask = requestStream.MoveNext(cts.Token);
132                 cts.Cancel();
133                 await moveNextTask;
134                 return "";
135             });
136 
137             var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
138             try
139             {
140                 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
141                 await call.ResponseAsync;
142                 Assert.Fail();
143             }
144             catch (RpcException ex)
145             {
146                 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
147             }
148         }
149 
150         [Test]
ServerStreamingCall_CancelClientSideRead()151         public async Task ServerStreamingCall_CancelClientSideRead()
152         {
153             helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
154             {
155                 await responseStream.WriteAsync("abc");
156                 while (!context.CancellationToken.IsCancellationRequested)
157                 {
158                     await Task.Delay(10);
159                 }
160             });
161 
162             var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
163             await call.ResponseStream.MoveNext();
164             Assert.AreEqual("abc", call.ResponseStream.Current);
165 
166             var cts = new CancellationTokenSource();
167             var moveNextTask = call.ResponseStream.MoveNext(cts.Token);
168             cts.Cancel();
169 
170             try
171             {
172                 // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
173                 await moveNextTask;
174                 Assert.Fail();
175             }
176             catch (RpcException ex)
177             {
178                 Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
179             }
180         }
181     }
182 }
183