1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
21 
22 #include <grpc/impl/codegen/grpc_types.h>
23 #include <grpcpp/impl/codegen/byte_buffer.h>
24 #include <grpcpp/impl/codegen/call_hook.h>
25 #include <grpcpp/impl/codegen/completion_queue_tag.h>
26 #include <grpcpp/impl/codegen/core_codegen_interface.h>
27 #include <grpcpp/impl/codegen/rpc_service_method.h>
28 
29 namespace grpc {
30 
// Forward declarations. Where this header names these types it does so only
// through pointers or a friend declaration, so their full definitions are not
// needed here.
31 class AsyncGenericService;
32 class Channel;
33 class GenericServerContext;
34 class ServerCompletionQueue;
35 class ServerContext;
36 class ServerCredentials;
37 class Service;
38 
// Declared here, defined elsewhere. The inline code in this header calls
// through it (gpr_inf_future, grpc_call_cancel_with_status, grpc_call_unref).
39 extern CoreCodegenInterface* g_core_codegen_interface;
40 
41 /// Models a gRPC server.
42 ///
43 /// Servers are configured and started via \a grpc::ServerBuilder.
44 namespace internal {
45 class ServerAsyncStreamingInterface;
46 }  // namespace internal
47 
48 class ServerInterface : public internal::CallHook {
49  public:
~ServerInterface()50   virtual ~ServerInterface() {}
51 
52   /// \a Shutdown does the following things:
53   ///
54   /// 1. Shutdown the server: deactivate all listening ports, mark it in
55   ///    "shutdown mode" so that further call Request's or incoming RPC matches
56   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
57   ///    calls as failed (!ok). This refers to calls that have been requested
58   ///    at the server by the server-side library or application code but that
59   ///    have not yet been matched to incoming RPCs from the client. Note that
60   ///    this would even include default calls added automatically by the gRPC
61   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
62   ///
63   /// 2. Block until all rpc method handlers invoked automatically by the sync
64   ///    API finish.
65   ///
66   /// 3. If all pending calls complete (and all their operations are
67   ///    retrieved by Next) before \a deadline expires, this finishes
68   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
69   ///    with the server after \a deadline expires. In the case of the sync API,
70   ///    if the RPC function for a streaming call has already been started and
71   ///    takes a week to complete, the RPC function won't be forcefully
72   ///    terminated (since that would leave state corrupt and incomplete) and
73   ///    the method handler will just keep running (which will prevent the
74   ///    server from completing the "join" operation that it needs to do at
75   ///    shutdown time).
76   ///
77   /// All completion queue associated with the server (for example, for async
78   /// serving) must be shutdown *after* this method has returned:
79   /// See \a ServerBuilder::AddCompletionQueue for details.
80   /// They must also be drained (by repeated Next) after being shutdown.
81   ///
82   /// \param deadline How long to wait until pending rpcs are forcefully
83   /// terminated.
84   template <class T>
Shutdown(const T & deadline)85   void Shutdown(const T& deadline) {
86     ShutdownInternal(TimePoint<T>(deadline).raw_time());
87   }
88 
89   /// Shutdown the server without a deadline and forced cancellation.
90   ///
91   /// All completion queue associated with the server (for example, for async
92   /// serving) must be shutdown *after* this method has returned:
93   /// See \a ServerBuilder::AddCompletionQueue for details.
Shutdown()94   void Shutdown() {
95     ShutdownInternal(
96         g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC));
97   }
98 
99   /// Block waiting for all work to complete.
100   ///
101   /// \warning The server must be either shutting down or some other thread must
102   /// call \a Shutdown for this function to ever return.
103   virtual void Wait() = 0;
104 
105  protected:
106   friend class ::grpc::Service;
107 
108   /// Register a service. This call does not take ownership of the service.
109   /// The service must exist for the lifetime of the Server instance.
110   virtual bool RegisterService(const grpc::string* host, Service* service) = 0;
111 
112   /// Register a generic service. This call does not take ownership of the
113   /// service. The service must exist for the lifetime of the Server instance.
114   virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
115 
116   /// Tries to bind \a server to the given \a addr.
117   ///
118   /// It can be invoked multiple times.
119   ///
120   /// \param addr The address to try to bind to the server (eg, localhost:1234,
121   /// 192.168.1.1:31416, [::1]:27182, etc.).
122   /// \params creds The credentials associated with the server.
123   ///
124   /// \return bound port number on sucess, 0 on failure.
125   ///
126   /// \warning It's an error to call this method on an already started server.
127   virtual int AddListeningPort(const grpc::string& addr,
128                                ServerCredentials* creds) = 0;
129 
130   /// Start the server.
131   ///
132   /// \param cqs Completion queues for handling asynchronous services. The
133   /// caller is required to keep all completion queues live until the server is
134   /// destroyed.
135   /// \param num_cqs How many completion queues does \a cqs hold.
136   virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
137 
138   virtual void ShutdownInternal(gpr_timespec deadline) = 0;
139 
140   virtual int max_receive_message_size() const = 0;
141 
142   virtual grpc_server* server() = 0;
143 
144   virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
145                                 internal::Call* call) = 0;
146 
147   class BaseAsyncRequest : public internal::CompletionQueueTag {
148    public:
149     BaseAsyncRequest(ServerInterface* server, ServerContext* context,
150                      internal::ServerAsyncStreamingInterface* stream,
151                      CompletionQueue* call_cq, void* tag,
152                      bool delete_on_finalize);
153     virtual ~BaseAsyncRequest();
154 
155     bool FinalizeResult(void** tag, bool* status) override;
156 
157    protected:
158     ServerInterface* const server_;
159     ServerContext* const context_;
160     internal::ServerAsyncStreamingInterface* const stream_;
161     CompletionQueue* const call_cq_;
162     void* const tag_;
163     const bool delete_on_finalize_;
164     grpc_call* call_;
165   };
166 
167   class RegisteredAsyncRequest : public BaseAsyncRequest {
168    public:
169     RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
170                            internal::ServerAsyncStreamingInterface* stream,
171                            CompletionQueue* call_cq, void* tag);
172 
173     // uses BaseAsyncRequest::FinalizeResult
174 
175    protected:
176     void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
177                       ServerCompletionQueue* notification_cq);
178   };
179 
180   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
181    public:
NoPayloadAsyncRequest(void * registered_method,ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)182     NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
183                           ServerContext* context,
184                           internal::ServerAsyncStreamingInterface* stream,
185                           CompletionQueue* call_cq,
186                           ServerCompletionQueue* notification_cq, void* tag)
187         : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
188       IssueRequest(registered_method, nullptr, notification_cq);
189     }
190 
191     // uses RegisteredAsyncRequest::FinalizeResult
192   };
193 
194   template <class Message>
195   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
196    public:
PayloadAsyncRequest(void * registered_method,ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,Message * request)197     PayloadAsyncRequest(void* registered_method, ServerInterface* server,
198                         ServerContext* context,
199                         internal::ServerAsyncStreamingInterface* stream,
200                         CompletionQueue* call_cq,
201                         ServerCompletionQueue* notification_cq, void* tag,
202                         Message* request)
203         : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
204           registered_method_(registered_method),
205           server_(server),
206           context_(context),
207           stream_(stream),
208           call_cq_(call_cq),
209           notification_cq_(notification_cq),
210           tag_(tag),
211           request_(request) {
212       IssueRequest(registered_method, payload_.bbuf_ptr(), notification_cq);
213     }
214 
~PayloadAsyncRequest()215     ~PayloadAsyncRequest() {
216       payload_.Release();  // We do not own the payload_
217     }
218 
FinalizeResult(void ** tag,bool * status)219     bool FinalizeResult(void** tag, bool* status) override {
220       if (*status) {
221         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
222                                       payload_.bbuf_ptr(), request_)
223                                       .ok()) {
224           // If deserialization fails, we cancel the call and instantiate
225           // a new instance of ourselves to request another call.  We then
226           // return false, which prevents the call from being returned to
227           // the application.
228           g_core_codegen_interface->grpc_call_cancel_with_status(
229               call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
230           g_core_codegen_interface->grpc_call_unref(call_);
231           new PayloadAsyncRequest(registered_method_, server_, context_,
232                                   stream_, call_cq_, notification_cq_, tag_,
233                                   request_);
234           delete this;
235           return false;
236         }
237       }
238       return RegisteredAsyncRequest::FinalizeResult(tag, status);
239     }
240 
241    private:
242     void* const registered_method_;
243     ServerInterface* const server_;
244     ServerContext* const context_;
245     internal::ServerAsyncStreamingInterface* const stream_;
246     CompletionQueue* const call_cq_;
247     ServerCompletionQueue* const notification_cq_;
248     void* const tag_;
249     Message* const request_;
250     ByteBuffer payload_;
251   };
252 
253   class GenericAsyncRequest : public BaseAsyncRequest {
254    public:
255     GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
256                         internal::ServerAsyncStreamingInterface* stream,
257                         CompletionQueue* call_cq,
258                         ServerCompletionQueue* notification_cq, void* tag,
259                         bool delete_on_finalize);
260 
261     bool FinalizeResult(void** tag, bool* status) override;
262 
263    private:
264     grpc_call_details call_details_;
265   };
266 
267   template <class Message>
RequestAsyncCall(internal::RpcServiceMethod * method,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,Message * message)268   void RequestAsyncCall(internal::RpcServiceMethod* method,
269                         ServerContext* context,
270                         internal::ServerAsyncStreamingInterface* stream,
271                         CompletionQueue* call_cq,
272                         ServerCompletionQueue* notification_cq, void* tag,
273                         Message* message) {
274     GPR_CODEGEN_ASSERT(method);
275     new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
276                                      stream, call_cq, notification_cq, tag,
277                                      message);
278   }
279 
RequestAsyncCall(internal::RpcServiceMethod * method,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)280   void RequestAsyncCall(internal::RpcServiceMethod* method,
281                         ServerContext* context,
282                         internal::ServerAsyncStreamingInterface* stream,
283                         CompletionQueue* call_cq,
284                         ServerCompletionQueue* notification_cq, void* tag) {
285     GPR_CODEGEN_ASSERT(method);
286     new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
287                               call_cq, notification_cq, tag);
288   }
289 
RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)290   void RequestAsyncGenericCall(GenericServerContext* context,
291                                internal::ServerAsyncStreamingInterface* stream,
292                                CompletionQueue* call_cq,
293                                ServerCompletionQueue* notification_cq,
294                                void* tag) {
295     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
296                             tag, true);
297   }
298 };
299 
300 }  // namespace grpc
301 
302 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
303