1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Linq;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using Grpc.Core.Interceptors;
25 using Grpc.Core.Internal;
26 using Grpc.Core.Logging;
27 using Grpc.Core.Utils;
28 
29 namespace Grpc.Core.Internal
30 {
31     internal interface IServerCallHandler
32     {
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)33         Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
Intercept(Interceptor interceptor)34         IServerCallHandler Intercept(Interceptor interceptor);
35     }
36 
37     internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
38         where TRequest : class
39         where TResponse : class
40     {
41         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnaryServerCallHandler<TRequest, TResponse>>();
42 
43         readonly Method<TRequest, TResponse> method;
44         readonly UnaryServerMethod<TRequest, TResponse> handler;
45 
UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)46         public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
47         {
48             this.method = method;
49             this.handler = handler;
50         }
51 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)52         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
53         {
54             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
55                 method.ResponseMarshaller.Serializer,
56                 method.RequestMarshaller.Deserializer,
57                 newRpc.Server);
58 
59             asyncCall.Initialize(newRpc.Call, cq);
60             var finishedTask = asyncCall.ServerSideCallAsync();
61             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
62             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
63 
64             Status status;
65             AsyncCallServer<TRequest,TResponse>.ResponseWithFlags? responseWithFlags = null;
66             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
67             try
68             {
69                 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
70                 var request = requestStream.Current;
71                 var response = await handler(request, context).ConfigureAwait(false);
72                 status = context.Status;
73                 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
74             }
75             catch (Exception e)
76             {
77                 if (!(e is RpcException))
78                 {
79                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
80                 }
81                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
82             }
83             try
84             {
85                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
86             }
87             catch (Exception)
88             {
89                 asyncCall.Cancel();
90                 throw;
91             }
92             await finishedTask.ConfigureAwait(false);
93         }
94 
Intercept(Interceptor interceptor)95         public IServerCallHandler Intercept(Interceptor interceptor)
96         {
97             return new UnaryServerCallHandler<TRequest, TResponse>(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler));
98         }
99     }
100 
101     internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
102         where TRequest : class
103         where TResponse : class
104     {
105         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerStreamingServerCallHandler<TRequest, TResponse>>();
106 
107         readonly Method<TRequest, TResponse> method;
108         readonly ServerStreamingServerMethod<TRequest, TResponse> handler;
109 
ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)110         public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
111         {
112             this.method = method;
113             this.handler = handler;
114         }
115 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)116         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
117         {
118             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
119                 method.ResponseMarshaller.Serializer,
120                 method.RequestMarshaller.Deserializer,
121                 newRpc.Server);
122 
123             asyncCall.Initialize(newRpc.Call, cq);
124             var finishedTask = asyncCall.ServerSideCallAsync();
125             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
126             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
127 
128             Status status;
129             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
130             try
131             {
132                 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
133                 var request = requestStream.Current;
134                 await handler(request, responseStream, context).ConfigureAwait(false);
135                 status = context.Status;
136             }
137             catch (Exception e)
138             {
139                 if (!(e is RpcException))
140                 {
141                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
142                 }
143                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
144             }
145 
146             try
147             {
148                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
149             }
150             catch (Exception)
151             {
152                 asyncCall.Cancel();
153                 throw;
154             }
155             await finishedTask.ConfigureAwait(false);
156         }
157 
Intercept(Interceptor interceptor)158         public IServerCallHandler Intercept(Interceptor interceptor)
159         {
160             return new ServerStreamingServerCallHandler<TRequest, TResponse>(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler));
161         }
162     }
163 
164     internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
165         where TRequest : class
166         where TResponse : class
167     {
168         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientStreamingServerCallHandler<TRequest, TResponse>>();
169 
170         readonly Method<TRequest, TResponse> method;
171         readonly ClientStreamingServerMethod<TRequest, TResponse> handler;
172 
ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)173         public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
174         {
175             this.method = method;
176             this.handler = handler;
177         }
178 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)179         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
180         {
181             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
182                 method.ResponseMarshaller.Serializer,
183                 method.RequestMarshaller.Deserializer,
184                 newRpc.Server);
185 
186             asyncCall.Initialize(newRpc.Call, cq);
187             var finishedTask = asyncCall.ServerSideCallAsync();
188             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
189             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
190 
191             Status status;
192             AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null;
193             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
194             try
195             {
196                 var response = await handler(requestStream, context).ConfigureAwait(false);
197                 status = context.Status;
198                 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
199             }
200             catch (Exception e)
201             {
202                 if (!(e is RpcException))
203                 {
204                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
205                 }
206                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
207             }
208 
209             try
210             {
211                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
212             }
213             catch (Exception)
214             {
215                 asyncCall.Cancel();
216                 throw;
217             }
218             await finishedTask.ConfigureAwait(false);
219         }
220 
Intercept(Interceptor interceptor)221         public IServerCallHandler Intercept(Interceptor interceptor)
222         {
223             return new ClientStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler));
224         }
225     }
226 
227     internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
228         where TRequest : class
229         where TResponse : class
230     {
231         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexStreamingServerCallHandler<TRequest, TResponse>>();
232 
233         readonly Method<TRequest, TResponse> method;
234         readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;
235 
DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)236         public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
237         {
238             this.method = method;
239             this.handler = handler;
240         }
241 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)242         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
243         {
244             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
245                 method.ResponseMarshaller.Serializer,
246                 method.RequestMarshaller.Deserializer,
247                 newRpc.Server);
248 
249             asyncCall.Initialize(newRpc.Call, cq);
250             var finishedTask = asyncCall.ServerSideCallAsync();
251             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
252             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
253 
254             Status status;
255             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
256             try
257             {
258                 await handler(requestStream, responseStream, context).ConfigureAwait(false);
259                 status = context.Status;
260             }
261             catch (Exception e)
262             {
263                 if (!(e is RpcException))
264                 {
265                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
266                 }
267                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
268             }
269             try
270             {
271                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
272             }
273             catch (Exception)
274             {
275                 asyncCall.Cancel();
276                 throw;
277             }
278             await finishedTask.ConfigureAwait(false);
279         }
280 
Intercept(Interceptor interceptor)281         public IServerCallHandler Intercept(Interceptor interceptor)
282         {
283             return new DuplexStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler));
284         }
285     }
286 
287     internal class UnimplementedMethodCallHandler : IServerCallHandler
288     {
289         public static readonly UnimplementedMethodCallHandler Instance = new UnimplementedMethodCallHandler();
290 
291         DuplexStreamingServerCallHandler<byte[], byte[]> callHandlerImpl;
292 
UnimplementedMethodCallHandler()293         public UnimplementedMethodCallHandler()
294         {
295             var marshaller = new Marshaller<byte[]>((payload) => payload, (payload) => payload);
296             var method = new Method<byte[], byte[]>(MethodType.DuplexStreaming, "", "", marshaller, marshaller);
297             this.callHandlerImpl = new DuplexStreamingServerCallHandler<byte[], byte[]>(method, new DuplexStreamingServerMethod<byte[], byte[]>(UnimplementedMethod));
298         }
299 
300         /// <summary>
301         /// Handler used for unimplemented method.
302         /// </summary>
UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)303         private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)
304         {
305             ctx.Status = new Status(StatusCode.Unimplemented, "");
306             return TaskUtils.CompletedTask;
307         }
308 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)309         public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
310         {
311             return callHandlerImpl.HandleCall(newRpc, cq);
312         }
313 
Intercept(Interceptor interceptor)314         public IServerCallHandler Intercept(Interceptor interceptor)
315         {
316             return this;  // Do not intercept unimplemented methods.
317         }
318     }
319 
320     internal static class HandlerUtils
321     {
GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)322         public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)
323         {
324             var rpcException = e as RpcException;
325             if (rpcException != null)
326             {
327                 // There are two sources of metadata entries on the server-side:
328                 // 1. serverCallContext.ResponseTrailers
329                 // 2. trailers in RpcException thrown by user code in server side handler.
330                 // As metadata allows duplicate keys, the logical thing to do is
331                 // to just merge trailers from RpcException into serverCallContext.ResponseTrailers.
332                 foreach (var entry in rpcException.Trailers)
333                 {
334                     callContextResponseTrailers.Add(entry);
335                 }
336                 // use the status thrown by handler.
337                 return rpcException.Status;
338             }
339 
340             return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
341         }
342 
GetWriteFlags(WriteOptions writeOptions)343         public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
344         {
345             return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
346         }
347 
348         public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
349             where TRequest : class
350             where TResponse : class
351         {
352             DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
353 
354             return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline,
355                 newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
356         }
357     }
358 }
359