1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Diagnostics;
21 using System.IO;
22 using System.Runtime.CompilerServices;
23 using System.Runtime.InteropServices;
24 using System.Threading;
25 using System.Threading.Tasks;
26 using Grpc.Core.Internal;
27 using Grpc.Core.Utils;
28 
29 namespace Grpc.Core.Internal
30 {
31     /// <summary>
32     /// Manages server side native call lifecycle.
33     /// </summary>
34     internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
35     {
36         readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
37         readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
38         readonly Server server;
39 
AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server)40         public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
41         {
42             this.server = GrpcPreconditions.CheckNotNull(server);
43         }
44 
Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)45         public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
46         {
47             call.Initialize(completionQueue);
48 
49             server.AddCallReference(this);
50             InitializeInternal(call);
51         }
52 
53         /// <summary>
54         /// Only for testing purposes.
55         /// </summary>
InitializeForTesting(INativeCall call)56         public void InitializeForTesting(INativeCall call)
57         {
58             server.AddCallReference(this);
59             InitializeInternal(call);
60         }
61 
62         /// <summary>
63         /// Starts a server side call.
64         /// </summary>
ServerSideCallAsync()65         public Task ServerSideCallAsync()
66         {
67             lock (myLock)
68             {
69                 GrpcPreconditions.CheckNotNull(call);
70 
71                 started = true;
72 
73                 call.StartServerSide(ReceiveCloseOnServerCallback);
74                 return finishedServersideTcs.Task;
75             }
76         }
77 
78         /// <summary>
79         /// Sends a streaming response. Only one pending send action is allowed at any given time.
80         /// </summary>
SendMessageAsync(TResponse msg, WriteFlags writeFlags)81         public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
82         {
83             return SendMessageInternalAsync(msg, writeFlags);
84         }
85 
86         /// <summary>
87         /// Receives a streaming request. Only one pending read action is allowed at any given time.
88         /// </summary>
ReadMessageAsync()89         public Task<TRequest> ReadMessageAsync()
90         {
91             return ReadMessageInternalAsync();
92         }
93 
94         /// <summary>
95         /// Initiates sending a initial metadata.
96         /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
97         /// to make things simpler.
98         /// </summary>
SendInitialMetadataAsync(Metadata headers)99         public Task SendInitialMetadataAsync(Metadata headers)
100         {
101             lock (myLock)
102             {
103                 GrpcPreconditions.CheckNotNull(headers, "metadata");
104 
105                 GrpcPreconditions.CheckState(started);
106                 GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
107                 GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
108 
109                 var earlyResult = CheckSendAllowedOrEarlyResult();
110                 if (earlyResult != null)
111                 {
112                     return earlyResult;
113                 }
114 
115                 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
116                 {
117                     call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
118                 }
119 
120                 this.initialMetadataSent = true;
121                 streamingWriteTcs = new TaskCompletionSource<object>();
122                 return streamingWriteTcs.Task;
123             }
124         }
125 
126         /// <summary>
127         /// Sends call result status, indicating we are done with writes.
128         /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
129         /// </summary>
SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)130         public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)
131         {
132             byte[] payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response) : null;
133             var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags);
134 
135             lock (myLock)
136             {
137                 GrpcPreconditions.CheckState(started);
138                 GrpcPreconditions.CheckState(!disposed);
139                 GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
140 
141                 using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
142                 {
143                     call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
144                         payload, writeFlags);
145                 }
146                 halfcloseRequested = true;
147                 initialMetadataSent = true;
148                 sendStatusFromServerTcs = new TaskCompletionSource<object>();
149                 if (optionalWrite.HasValue)
150                 {
151                     streamingWritesCounter++;
152                 }
153                 return sendStatusFromServerTcs.Task;
154             }
155         }
156 
157         /// <summary>
158         /// Gets cancellation token that gets cancelled once close completion
159         /// is received and the cancelled flag is set.
160         /// </summary>
161         public CancellationToken CancellationToken
162         {
163             get
164             {
165                 return cancellationTokenSource.Token;
166             }
167         }
168 
169         public string Peer
170         {
171             get
172             {
173                 return call.GetPeer();
174             }
175         }
176 
177         protected override bool IsClient
178         {
179             get { return false; }
180         }
181 
GetRpcExceptionClientOnly()182         protected override Exception GetRpcExceptionClientOnly()
183         {
184             throw new InvalidOperationException("Call be only called for client calls");
185         }
186 
OnAfterReleaseResourcesLocked()187         protected override void OnAfterReleaseResourcesLocked()
188         {
189             server.RemoveCallReference(this);
190         }
191 
CheckSendAllowedOrEarlyResult()192         protected override Task CheckSendAllowedOrEarlyResult()
193         {
194             GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
195             GrpcPreconditions.CheckState(!finished, "Already finished.");
196             GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
197             GrpcPreconditions.CheckState(!disposed);
198 
199             return null;
200         }
201 
202         /// <summary>
203         /// Handles the server side close completion.
204         /// </summary>
HandleFinishedServerside(bool success, bool cancelled)205         private void HandleFinishedServerside(bool success, bool cancelled)
206         {
207             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
208             // success will be always set to true.
209             bool releasedResources;
210             lock (myLock)
211             {
212                 finished = true;
213                 if (streamingReadTcs == null)
214                 {
215                     // if there's no pending read, readingDone=true will dispose now.
216                     // if there is a pending read, we will dispose once that read finishes.
217                     readingDone = true;
218                     streamingReadTcs = new TaskCompletionSource<TRequest>();
219                     streamingReadTcs.SetResult(default(TRequest));
220                 }
221                 releasedResources = ReleaseResourcesIfPossible();
222             }
223 
224             if (releasedResources)
225             {
226                 OnAfterReleaseResourcesUnlocked();
227             }
228 
229             if (cancelled)
230             {
231                 cancellationTokenSource.Cancel();
232             }
233 
234             finishedServersideTcs.SetResult(null);
235         }
236 
237         IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
238 
IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)239         void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
240         {
241             HandleFinishedServerside(success, cancelled);
242         }
243 
244         ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
245 
ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)246         void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
247         {
248             HandleSendStatusFromServerFinished(success);
249         }
250 
251         public struct ResponseWithFlags
252         {
ResponseWithFlagsGrpc.Core.Internal.AsyncCallServer.ResponseWithFlags253             public ResponseWithFlags(TResponse response, WriteFlags writeFlags)
254             {
255                 this.Response = response;
256                 this.WriteFlags = writeFlags;
257             }
258 
259             public TResponse Response { get; }
260             public WriteFlags WriteFlags { get; }
261         }
262     }
263 }
264