1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Diagnostics;
21 using System.IO;
22 using System.Runtime.CompilerServices;
23 using System.Runtime.InteropServices;
24 using System.Threading;
25 using System.Threading.Tasks;
26 
27 using Grpc.Core.Internal;
28 using Grpc.Core.Logging;
29 using Grpc.Core.Profiling;
30 using Grpc.Core.Utils;
31 
32 namespace Grpc.Core.Internal
33 {
34     /// <summary>
35     /// Base for handling both client side and server side calls.
36     /// Manages native call lifecycle and provides convenience methods.
37     /// </summary>
38     internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
39     {
40         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
41         protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
42 
43         readonly Func<TWrite, byte[]> serializer;
44         readonly Func<byte[], TRead> deserializer;
45 
46         protected readonly object myLock = new object();
47 
48         protected INativeCall call;
49         protected bool disposed;
50 
51         protected bool started;
52         protected bool cancelRequested;
53 
54         protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
55         protected TaskCompletionSource<object> streamingWriteTcs;  // Completion of a pending streaming write or send close from client if not null.
56         protected TaskCompletionSource<object> sendStatusFromServerTcs;
57         protected bool isStreamingWriteCompletionDelayed;  // Only used for the client side.
58 
59         protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
60         protected bool halfcloseRequested;  // True if send close have been initiated.
61         protected bool finished;  // True if close has been received from the peer.
62 
63         protected bool initialMetadataSent;
64         protected long streamingWritesCounter;  // Number of streaming send operations started so far.
65 
AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)66         public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
67         {
68             this.serializer = GrpcPreconditions.CheckNotNull(serializer);
69             this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
70         }
71 
72         /// <summary>
73         /// Requests cancelling the call.
74         /// </summary>
Cancel()75         public void Cancel()
76         {
77             lock (myLock)
78             {
79                 GrpcPreconditions.CheckState(started);
80                 cancelRequested = true;
81 
82                 if (!disposed)
83                 {
84                     call.Cancel();
85                 }
86             }
87         }
88 
89         /// <summary>
90         /// Requests cancelling the call with given status.
91         /// </summary>
CancelWithStatus(Status status)92         protected void CancelWithStatus(Status status)
93         {
94             lock (myLock)
95             {
96                 cancelRequested = true;
97 
98                 if (!disposed)
99                 {
100                     call.CancelWithStatus(status);
101                 }
102             }
103         }
104 
InitializeInternal(INativeCall call)105         protected void InitializeInternal(INativeCall call)
106         {
107             lock (myLock)
108             {
109                 this.call = call;
110             }
111         }
112 
113         /// <summary>
114         /// Initiates sending a message. Only one send operation can be active at a time.
115         /// </summary>
SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)116         protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
117         {
118             byte[] payload = UnsafeSerialize(msg);
119 
120             lock (myLock)
121             {
122                 GrpcPreconditions.CheckState(started);
123                 var earlyResult = CheckSendAllowedOrEarlyResult();
124                 if (earlyResult != null)
125                 {
126                     return earlyResult;
127                 }
128 
129                 call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);
130 
131                 initialMetadataSent = true;
132                 streamingWritesCounter++;
133                 streamingWriteTcs = new TaskCompletionSource<object>();
134                 return streamingWriteTcs.Task;
135             }
136         }
137 
138         /// <summary>
139         /// Initiates reading a message. Only one read operation can be active at a time.
140         /// </summary>
ReadMessageInternalAsync()141         protected Task<TRead> ReadMessageInternalAsync()
142         {
143             lock (myLock)
144             {
145                 GrpcPreconditions.CheckState(started);
146                 if (readingDone)
147                 {
148                     // the last read that returns null or throws an exception is idempotent
149                     // and maintains its state.
150                     GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
151                     return streamingReadTcs.Task;
152                 }
153 
154                 GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
155                 GrpcPreconditions.CheckState(!disposed);
156 
157                 call.StartReceiveMessage(ReceivedMessageCallback);
158                 streamingReadTcs = new TaskCompletionSource<TRead>();
159                 return streamingReadTcs.Task;
160             }
161         }
162 
163         /// <summary>
164         /// If there are no more pending actions and no new actions can be started, releases
165         /// the underlying native resources.
166         /// </summary>
ReleaseResourcesIfPossible()167         protected bool ReleaseResourcesIfPossible()
168         {
169             if (!disposed && call != null)
170             {
171                 bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
172                 if (noMoreSendCompletions && readingDone && finished)
173                 {
174                     ReleaseResources();
175                     return true;
176                 }
177             }
178             return false;
179         }
180 
181         protected abstract bool IsClient
182         {
183             get;
184         }
185 
186         /// <summary>
187         /// Returns an exception to throw for a failed send operation.
188         /// It is only allowed to call this method for a call that has already finished.
189         /// </summary>
GetRpcExceptionClientOnly()190         protected abstract Exception GetRpcExceptionClientOnly();
191 
ReleaseResources()192         protected void ReleaseResources()
193         {
194             if (call != null)
195             {
196                 call.Dispose();
197             }
198             disposed = true;
199             OnAfterReleaseResourcesLocked();
200         }
201 
OnAfterReleaseResourcesLocked()202         protected virtual void OnAfterReleaseResourcesLocked()
203         {
204         }
205 
OnAfterReleaseResourcesUnlocked()206         protected virtual void OnAfterReleaseResourcesUnlocked()
207         {
208         }
209 
210         /// <summary>
211         /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
212         /// logic by directly returning the write operation result task. Normally, null is returned.
213         /// </summary>
CheckSendAllowedOrEarlyResult()214         protected abstract Task CheckSendAllowedOrEarlyResult();
215 
UnsafeSerialize(TWrite msg)216         protected byte[] UnsafeSerialize(TWrite msg)
217         {
218             return serializer(msg);
219         }
220 
TryDeserialize(byte[] payload, out TRead msg)221         protected Exception TryDeserialize(byte[] payload, out TRead msg)
222         {
223             try
224             {
225                 msg = deserializer(payload);
226                 return null;
227             }
228             catch (Exception e)
229             {
230                 msg = default(TRead);
231                 return e;
232             }
233         }
234 
235         /// <summary>
236         /// Handles send completion (including SendCloseFromClient).
237         /// </summary>
HandleSendFinished(bool success)238         protected void HandleSendFinished(bool success)
239         {
240             bool delayCompletion = false;
241             TaskCompletionSource<object> origTcs = null;
242             bool releasedResources;
243             lock (myLock)
244             {
245                 if (!success && !finished && IsClient) {
246                     // We should be setting this only once per call, following writes will be short circuited
247                     // because they cannot start until the entire call finishes.
248                     GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
249 
250                     // leave streamingWriteTcs set, it will be completed once call finished.
251                     isStreamingWriteCompletionDelayed = true;
252                     delayCompletion = true;
253                 }
254                 else
255                 {
256                     origTcs = streamingWriteTcs;
257                     streamingWriteTcs = null;
258                 }
259 
260                 releasedResources = ReleaseResourcesIfPossible();
261             }
262 
263             if (releasedResources)
264             {
265                 OnAfterReleaseResourcesUnlocked();
266             }
267 
268             if (!success)
269             {
270                 if (!delayCompletion)
271                 {
272                     if (IsClient)
273                     {
274                         GrpcPreconditions.CheckState(finished);  // implied by !success && !delayCompletion && IsClient
275                         origTcs.SetException(GetRpcExceptionClientOnly());
276                     }
277                     else
278                     {
279                         origTcs.SetException (new IOException("Error sending from server."));
280                     }
281                 }
282                 // if delayCompletion == true, postpone SetException until call finishes.
283             }
284             else
285             {
286                 origTcs.SetResult(null);
287             }
288         }
289 
290         /// <summary>
291         /// Handles send status from server completion.
292         /// </summary>
HandleSendStatusFromServerFinished(bool success)293         protected void HandleSendStatusFromServerFinished(bool success)
294         {
295             bool releasedResources;
296             lock (myLock)
297             {
298                 releasedResources = ReleaseResourcesIfPossible();
299             }
300 
301             if (releasedResources)
302             {
303                 OnAfterReleaseResourcesUnlocked();
304             }
305 
306             if (!success)
307             {
308                 sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
309             }
310             else
311             {
312                 sendStatusFromServerTcs.SetResult(null);
313             }
314         }
315 
316         /// <summary>
317         /// Handles streaming read completion.
318         /// </summary>
HandleReadFinished(bool success, byte[] receivedMessage)319         protected void HandleReadFinished(bool success, byte[] receivedMessage)
320         {
321             // if success == false, received message will be null. It that case we will
322             // treat this completion as the last read an rely on C core to handle the failed
323             // read (e.g. deliver approriate statusCode on the clientside).
324 
325             TRead msg = default(TRead);
326             var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
327 
328             TaskCompletionSource<TRead> origTcs = null;
329             bool releasedResources;
330             lock (myLock)
331             {
332                 origTcs = streamingReadTcs;
333                 if (receivedMessage == null)
334                 {
335                     // This was the last read.
336                     readingDone = true;
337                 }
338 
339                 if (deserializeException != null && IsClient)
340                 {
341                     readingDone = true;
342 
343                     // TODO(jtattermusch): it might be too late to set the status
344                     CancelWithStatus(DeserializeResponseFailureStatus);
345                 }
346 
347                 if (!readingDone)
348                 {
349                     streamingReadTcs = null;
350                 }
351 
352                 releasedResources = ReleaseResourcesIfPossible();
353             }
354 
355             if (releasedResources)
356             {
357                 OnAfterReleaseResourcesUnlocked();
358             }
359 
360             if (deserializeException != null && !IsClient)
361             {
362                 origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
363                 return;
364             }
365             origTcs.SetResult(msg);
366         }
367 
368         protected ISendCompletionCallback SendCompletionCallback => this;
369 
ISendCompletionCallback.OnSendCompletion(bool success)370         void ISendCompletionCallback.OnSendCompletion(bool success)
371         {
372             HandleSendFinished(success);
373         }
374 
375         IReceivedMessageCallback ReceivedMessageCallback => this;
376 
IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)377         void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
378         {
379             HandleReadFinished(success, receivedMessage);
380         }
381     }
382 }
383