1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Threading;
21 using System.Threading.Tasks;
22 using Grpc.Core.Logging;
23 using Grpc.Core.Profiling;
24 using Grpc.Core.Utils;
25 
26 namespace Grpc.Core.Internal
27 {
28     /// <summary>
29     /// Manages client side native call lifecycle.
30     /// </summary>
31     internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
32     {
33         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
34 
35         readonly CallInvocationDetails<TRequest, TResponse> details;
36         readonly INativeCall injectedNativeCall;  // for testing
37 
38         bool registeredWithChannel;
39 
40         // Dispose of to de-register cancellation token registration
41         IDisposable cancellationTokenRegistration;
42 
43         // Completion of a pending unary response if not null.
44         TaskCompletionSource<TResponse> unaryResponseTcs;
45 
46         // Completion of a streaming response call if not null.
47         TaskCompletionSource<object> streamingResponseCallFinishedTcs;
48 
49         // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
50         // Response headers set here once received.
51         TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
52 
53         // Set after status is received. Used for both unary and streaming response calls.
54         ClientSideStatus? finishedStatus;
55 
AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)56         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
57             : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
58         {
59             this.details = callDetails.WithOptions(callDetails.Options.Normalize());
60             this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
61         }
62 
63         /// <summary>
64         /// This constructor should only be used for testing.
65         /// </summary>
AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall)66         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
67         {
68             this.injectedNativeCall = injectedNativeCall;
69         }
70 
71         // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
72         // it is reusing fair amount of code in this class, so we are leaving it here.
73         /// <summary>
74         /// Blocking unary request - unary response call.
75         /// </summary>
UnaryCall(TRequest msg)76         public TResponse UnaryCall(TRequest msg)
77         {
78             var profiler = Profilers.ForCurrentThread();
79 
80             using (profiler.NewScope("AsyncCall.UnaryCall"))
81             using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
82             {
83                 bool callStartedOk = false;
84                 try
85                 {
86                     unaryResponseTcs = new TaskCompletionSource<TResponse>();
87 
88                     lock (myLock)
89                     {
90                         GrpcPreconditions.CheckState(!started);
91                         started = true;
92                         Initialize(cq);
93 
94                         halfcloseRequested = true;
95                         readingDone = true;
96                     }
97 
98                     byte[] payload = UnsafeSerialize(msg);
99 
100                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
101                     {
102                         var ctx = details.Channel.Environment.BatchContextPool.Lease();
103                         try
104                         {
105                             call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
106                             callStartedOk = true;
107 
108                             var ev = cq.Pluck(ctx.Handle);
109                             bool success = (ev.success != 0);
110                             try
111                             {
112                                 using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
113                                 {
114                                     HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
115                                 }
116                             }
117                             catch (Exception e)
118                             {
119                                 Logger.Error(e, "Exception occurred while invoking completion delegate.");
120                             }
121                         }
122                         finally
123                         {
124                             ctx.Recycle();
125                         }
126                     }
127                 }
128                 finally
129                 {
130                     if (!callStartedOk)
131                     {
132                         lock (myLock)
133                         {
134                             OnFailedToStartCallLocked();
135                         }
136                     }
137                 }
138 
139                 // Once the blocking call returns, the result should be available synchronously.
140                 // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
141                 return unaryResponseTcs.Task.GetAwaiter().GetResult();
142             }
143         }
144 
145         /// <summary>
146         /// Starts a unary request - unary response call.
147         /// </summary>
UnaryCallAsync(TRequest msg)148         public Task<TResponse> UnaryCallAsync(TRequest msg)
149         {
150             lock (myLock)
151             {
152                 bool callStartedOk = false;
153                 try
154                 {
155                     GrpcPreconditions.CheckState(!started);
156                     started = true;
157 
158                     Initialize(details.Channel.CompletionQueue);
159 
160                     halfcloseRequested = true;
161                     readingDone = true;
162 
163                     byte[] payload = UnsafeSerialize(msg);
164 
165                     unaryResponseTcs = new TaskCompletionSource<TResponse>();
166                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
167                     {
168                         call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
169                         callStartedOk = true;
170                     }
171 
172                     return unaryResponseTcs.Task;
173                 }
174                 finally
175                 {
176                     if (!callStartedOk)
177                     {
178                         OnFailedToStartCallLocked();
179                     }
180                 }
181             }
182         }
183 
184         /// <summary>
185         /// Starts a streamed request - unary response call.
186         /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
187         /// </summary>
ClientStreamingCallAsync()188         public Task<TResponse> ClientStreamingCallAsync()
189         {
190             lock (myLock)
191             {
192                 bool callStartedOk = false;
193                 try
194                 {
195                     GrpcPreconditions.CheckState(!started);
196                     started = true;
197 
198                     Initialize(details.Channel.CompletionQueue);
199 
200                     readingDone = true;
201 
202                     unaryResponseTcs = new TaskCompletionSource<TResponse>();
203                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
204                     {
205                         call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
206                         callStartedOk = true;
207                     }
208 
209                     return unaryResponseTcs.Task;
210                 }
211                 finally
212                 {
213                     if (!callStartedOk)
214                     {
215                         OnFailedToStartCallLocked();
216                     }
217                 }
218             }
219         }
220 
221         /// <summary>
222         /// Starts a unary request - streamed response call.
223         /// </summary>
StartServerStreamingCall(TRequest msg)224         public void StartServerStreamingCall(TRequest msg)
225         {
226             lock (myLock)
227             {
228                 bool callStartedOk = false;
229                 try
230                 {
231                     GrpcPreconditions.CheckState(!started);
232                     started = true;
233 
234                     Initialize(details.Channel.CompletionQueue);
235 
236                     halfcloseRequested = true;
237 
238                     byte[] payload = UnsafeSerialize(msg);
239 
240                     streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
241                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
242                     {
243                         call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
244                         callStartedOk = true;
245                     }
246                     call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
247                 }
248                 finally
249                 {
250                     if (!callStartedOk)
251                     {
252                         OnFailedToStartCallLocked();
253                     }
254                 }
255             }
256         }
257 
258         /// <summary>
259         /// Starts a streaming request - streaming response call.
260         /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
261         /// </summary>
StartDuplexStreamingCall()262         public void StartDuplexStreamingCall()
263         {
264             lock (myLock)
265             {
266                 bool callStartedOk = false;
267                 try
268                 {
269                     GrpcPreconditions.CheckState(!started);
270                     started = true;
271 
272                     Initialize(details.Channel.CompletionQueue);
273 
274                     streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
275                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
276                     {
277                         call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
278                         callStartedOk = true;
279                     }
280                     call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
281                 }
282                 finally
283                 {
284                     if (!callStartedOk)
285                     {
286                         OnFailedToStartCallLocked();
287                     }
288                 }
289             }
290         }
291 
292         /// <summary>
293         /// Sends a streaming request. Only one pending send action is allowed at any given time.
294         /// </summary>
SendMessageAsync(TRequest msg, WriteFlags writeFlags)295         public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
296         {
297             return SendMessageInternalAsync(msg, writeFlags);
298         }
299 
300         /// <summary>
301         /// Receives a streaming response. Only one pending read action is allowed at any given time.
302         /// </summary>
ReadMessageAsync()303         public Task<TResponse> ReadMessageAsync()
304         {
305             return ReadMessageInternalAsync();
306         }
307 
308         /// <summary>
309         /// Sends halfclose, indicating client is done with streaming requests.
310         /// Only one pending send action is allowed at any given time.
311         /// </summary>
SendCloseFromClientAsync()312         public Task SendCloseFromClientAsync()
313         {
314             lock (myLock)
315             {
316                 GrpcPreconditions.CheckState(started);
317 
318                 var earlyResult = CheckSendPreconditionsClientSide();
319                 if (earlyResult != null)
320                 {
321                     return earlyResult;
322                 }
323 
324                 if (disposed || finished)
325                 {
326                     // In case the call has already been finished by the serverside,
327                     // the halfclose has already been done implicitly, so just return
328                     // completed task here.
329                     halfcloseRequested = true;
330                     return TaskUtils.CompletedTask;
331                 }
332                 call.StartSendCloseFromClient(SendCompletionCallback);
333 
334                 halfcloseRequested = true;
335                 streamingWriteTcs = new TaskCompletionSource<object>();
336                 return streamingWriteTcs.Task;
337             }
338         }
339 
340         /// <summary>
341         /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise.
342         /// </summary>
343         public Task StreamingResponseCallFinishedTask
344         {
345             get
346             {
347                 return streamingResponseCallFinishedTcs.Task;
348             }
349         }
350 
351         /// <summary>
352         /// Get the task that completes once response headers are received.
353         /// </summary>
354         public Task<Metadata> ResponseHeadersAsync
355         {
356             get
357             {
358                 return responseHeadersTcs.Task;
359             }
360         }
361 
362         /// <summary>
363         /// Gets the resulting status if the call has already finished.
364         /// Throws InvalidOperationException otherwise.
365         /// </summary>
GetStatus()366         public Status GetStatus()
367         {
368             lock (myLock)
369             {
370                 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
371                 return finishedStatus.Value.Status;
372             }
373         }
374 
375         /// <summary>
376         /// Gets the trailing metadata if the call has already finished.
377         /// Throws InvalidOperationException otherwise.
378         /// </summary>
GetTrailers()379         public Metadata GetTrailers()
380         {
381             lock (myLock)
382             {
383                 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
384                 return finishedStatus.Value.Trailers;
385             }
386         }
387 
388         public CallInvocationDetails<TRequest, TResponse> Details
389         {
390             get
391             {
392                 return this.details;
393             }
394         }
395 
OnAfterReleaseResourcesLocked()396         protected override void OnAfterReleaseResourcesLocked()
397         {
398             if (registeredWithChannel)
399             {
400                 details.Channel.RemoveCallReference(this);
401                 registeredWithChannel = false;
402             }
403         }
404 
OnAfterReleaseResourcesUnlocked()405         protected override void OnAfterReleaseResourcesUnlocked()
406         {
407             // If cancellation callback is in progress, this can block
408             // so we need to do this outside of call's lock to prevent
409             // deadlock.
410             // See https://github.com/grpc/grpc/issues/14777
411             // See https://github.com/dotnet/corefx/issues/14903
412             cancellationTokenRegistration?.Dispose();
413         }
414 
415         protected override bool IsClient
416         {
417             get { return true; }
418         }
419 
GetRpcExceptionClientOnly()420         protected override Exception GetRpcExceptionClientOnly()
421         {
422             return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers);
423         }
424 
CheckSendAllowedOrEarlyResult()425         protected override Task CheckSendAllowedOrEarlyResult()
426         {
427             var earlyResult = CheckSendPreconditionsClientSide();
428             if (earlyResult != null)
429             {
430                 return earlyResult;
431             }
432 
433             if (finishedStatus.HasValue)
434             {
435                 // throwing RpcException if we already received status on client
436                 // side makes the most sense.
437                 // Note that this throws even for StatusCode.OK.
438                 // Writing after the call has finished is not a programming error because server can close
439                 // the call anytime, so don't throw directly, but let the write task finish with an error.
440                 var tcs = new TaskCompletionSource<object>();
441                 tcs.SetException(new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers));
442                 return tcs.Task;
443             }
444 
445             return null;
446         }
447 
CheckSendPreconditionsClientSide()448         private Task CheckSendPreconditionsClientSide()
449         {
450             GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
451             GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
452 
453             if (cancelRequested)
454             {
455                 // Return a cancelled task.
456                 var tcs = new TaskCompletionSource<object>();
457                 tcs.SetCanceled();
458                 return tcs.Task;
459             }
460 
461             return null;
462         }
463 
Initialize(CompletionQueueSafeHandle cq)464         private void Initialize(CompletionQueueSafeHandle cq)
465         {
466             var call = CreateNativeCall(cq);
467 
468             details.Channel.AddCallReference(this);
469             registeredWithChannel = true;
470             InitializeInternal(call);
471 
472             RegisterCancellationCallback();
473         }
474 
OnFailedToStartCallLocked()475         private void OnFailedToStartCallLocked()
476         {
477             ReleaseResources();
478 
479             // We need to execute the hook that disposes the cancellation token
480             // registration, but it cannot be done from under a lock.
481             // To make things simple, we just schedule the unregistering
482             // on a threadpool.
483             // - Once the native call is disposed, the Cancel() calls are ignored anyway
484             // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
485             //   when something goes very bad when initializing a call and that should
486             //   never happen when gRPC is used correctly.
487             ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
488         }
489 
CreateNativeCall(CompletionQueueSafeHandle cq)490         private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
491         {
492             if (injectedNativeCall != null)
493             {
494                 return injectedNativeCall;  // allows injecting a mock INativeCall in tests.
495             }
496 
497             var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
498 
499             var credentials = details.Options.Credentials;
500             using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
501             {
502                 var result = details.Channel.Handle.CreateCall(
503                              parentCall, ContextPropagationToken.DefaultMask, cq,
504                              details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
505                 return result;
506             }
507         }
508 
509         // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
RegisterCancellationCallback()510         private void RegisterCancellationCallback()
511         {
512             var token = details.Options.CancellationToken;
513             if (token.CanBeCanceled)
514             {
515                 cancellationTokenRegistration = token.Register(() => this.Cancel());
516             }
517         }
518 
519         /// <summary>
520         /// Gets WriteFlags set in callDetails.Options.WriteOptions
521         /// </summary>
GetWriteFlagsForCall()522         private WriteFlags GetWriteFlagsForCall()
523         {
524             var writeOptions = details.Options.WriteOptions;
525             return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
526         }
527 
528         /// <summary>
529         /// Handles receive status completion for calls with streaming response.
530         /// </summary>
HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)531         private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
532         {
533             // TODO(jtattermusch): handle success==false
534             responseHeadersTcs.SetResult(responseHeaders);
535         }
536 
537         /// <summary>
538         /// Handler for unary response completion.
539         /// </summary>
HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)540         private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
541         {
542             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
543             // success will be always set to true.
544 
545             TaskCompletionSource<object> delayedStreamingWriteTcs = null;
546             TResponse msg = default(TResponse);
547             var deserializeException = TryDeserialize(receivedMessage, out msg);
548 
549             bool releasedResources;
550             lock (myLock)
551             {
552                 finished = true;
553 
554                 if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
555                 {
556                     receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
557                 }
558                 finishedStatus = receivedStatus;
559 
560                 if (isStreamingWriteCompletionDelayed)
561                 {
562                     delayedStreamingWriteTcs = streamingWriteTcs;
563                     streamingWriteTcs = null;
564                 }
565 
566                 releasedResources = ReleaseResourcesIfPossible();
567             }
568 
569             if (releasedResources)
570             {
571                 OnAfterReleaseResourcesUnlocked();
572             }
573 
574             responseHeadersTcs.SetResult(responseHeaders);
575 
576             if (delayedStreamingWriteTcs != null)
577             {
578                 delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
579             }
580 
581             var status = receivedStatus.Status;
582             if (status.StatusCode != StatusCode.OK)
583             {
584                 unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers));
585                 return;
586             }
587 
588             unaryResponseTcs.SetResult(msg);
589         }
590 
591         /// <summary>
592         /// Handles receive status completion for calls with streaming response.
593         /// </summary>
HandleFinished(bool success, ClientSideStatus receivedStatus)594         private void HandleFinished(bool success, ClientSideStatus receivedStatus)
595         {
596             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
597             // success will be always set to true.
598 
599             TaskCompletionSource<object> delayedStreamingWriteTcs = null;
600 
601             bool releasedResources;
602             lock (myLock)
603             {
604                 finished = true;
605                 finishedStatus = receivedStatus;
606                 if (isStreamingWriteCompletionDelayed)
607                 {
608                     delayedStreamingWriteTcs = streamingWriteTcs;
609                     streamingWriteTcs = null;
610                 }
611 
612                 releasedResources = ReleaseResourcesIfPossible();
613             }
614 
615             if (releasedResources)
616             {
617                 OnAfterReleaseResourcesUnlocked();
618             }
619 
620             if (delayedStreamingWriteTcs != null)
621             {
622                 delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
623             }
624 
625             var status = receivedStatus.Status;
626             if (status.StatusCode != StatusCode.OK)
627             {
628                 streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers));
629                 return;
630             }
631 
632             streamingResponseCallFinishedTcs.SetResult(null);
633         }
634 
635         IUnaryResponseClientCallback UnaryResponseClientCallback => this;
636 
IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)637         void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
638         {
639             HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
640         }
641 
642         IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
643 
IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)644         void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
645         {
646             HandleFinished(success, receivedStatus);
647         }
648 
649         IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;
650 
IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)651         void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
652         {
653             HandleReceivedResponseHeaders(success, responseHeaders);
654         }
655     }
656 }
657