1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_SERVER_H
20 #define GRPCPP_SERVER_H
21 
22 #include <condition_variable>
23 #include <list>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include <grpc/compression.h>
29 #include <grpcpp/completion_queue.h>
30 #include <grpcpp/impl/call.h>
31 #include <grpcpp/impl/codegen/grpc_library.h>
32 #include <grpcpp/impl/codegen/server_interface.h>
33 #include <grpcpp/impl/rpc_service_method.h>
34 #include <grpcpp/security/server_credentials.h>
35 #include <grpcpp/support/channel_arguments.h>
36 #include <grpcpp/support/config.h>
37 #include <grpcpp/support/status.h>
38 
39 struct grpc_server;
40 
41 namespace grpc {
42 
43 class AsyncGenericService;
44 class HealthCheckServiceInterface;
45 class ServerContext;
46 class ServerInitializer;
47 
48 /// Represents a gRPC server.
49 ///
50 /// Use a \a grpc::ServerBuilder to create, configure, and start
51 /// \a Server instances.
52 class Server : public ServerInterface, private GrpcLibraryCodegen {
53  public:
54   ~Server();
55 
56   /// Block until the server shuts down.
57   ///
58   /// \warning The server must be either shutting down or some other thread must
59   /// call \a Shutdown for this function to ever return.
60   void Wait() override;
61 
62   /// Global callbacks are a set of hooks that are called when server
63   /// events occur.  \a SetGlobalCallbacks method is used to register
64   /// the hooks with gRPC.  Note that
65   /// the \a GlobalCallbacks instance will be shared among all
66   /// \a Server instances in an application and can be set exactly
67   /// once per application.
68   class GlobalCallbacks {
69    public:
~GlobalCallbacks()70     virtual ~GlobalCallbacks() {}
71     /// Called before server is created.
UpdateArguments(ChannelArguments * args)72     virtual void UpdateArguments(ChannelArguments* args) {}
73     /// Called before application callback for each synchronous server request
74     virtual void PreSynchronousRequest(ServerContext* context) = 0;
75     /// Called after application callback for each synchronous server request
76     virtual void PostSynchronousRequest(ServerContext* context) = 0;
77     /// Called before server is started.
PreServerStart(Server * server)78     virtual void PreServerStart(Server* server) {}
79     /// Called after a server port is added.
AddPort(Server * server,const grpc::string & addr,ServerCredentials * creds,int port)80     virtual void AddPort(Server* server, const grpc::string& addr,
81                          ServerCredentials* creds, int port) {}
82   };
83   /// Set the global callback object. Can only be called once per application.
84   /// Does not take ownership of callbacks, and expects the pointed to object
85   /// to be alive until all server objects in the process have been destroyed.
86   /// The same \a GlobalCallbacks object will be used throughout the
87   /// application and is shared among all \a Server objects.
88   static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
89 
90   /// Returns a \em raw pointer to the underlying \a grpc_server instance.
91   /// EXPERIMENTAL:  for internal/test use only
92   grpc_server* c_server();
93 
94   /// Returns the health check service.
GetHealthCheckService()95   HealthCheckServiceInterface* GetHealthCheckService() const {
96     return health_check_service_.get();
97   }
98 
99   /// Establish a channel for in-process communication
100   std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args);
101 
102  protected:
103   /// Register a service. This call does not take ownership of the service.
104   /// The service must exist for the lifetime of the Server instance.
105   bool RegisterService(const grpc::string* host, Service* service) override;
106 
107   /// Try binding the server to the given \a addr endpoint
108   /// (port, and optionally including IP address to bind to).
109   ///
110   /// It can be invoked multiple times. Should be used before
111   /// starting the server.
112   ///
113   /// \param addr The address to try to bind to the server (eg, localhost:1234,
114   /// 192.168.1.1:31416, [::1]:27182, etc.).
115   /// \param creds The credentials associated with the server.
116   ///
117   /// \return bound port number on success, 0 on failure.
118   ///
119   /// \warning It is an error to call this method on an already started server.
120   int AddListeningPort(const grpc::string& addr,
121                        ServerCredentials* creds) override;
122 
123   /// NOTE: This is *NOT* a public API. The server constructors are supposed to
124   /// be used by \a ServerBuilder class only. The constructor will be made
125   /// 'private' very soon.
126   ///
127   /// Server constructors. To be used by \a ServerBuilder only.
128   ///
129   /// \param max_message_size Maximum message length that the channel can
130   /// receive.
131   ///
132   /// \param args The channel args
133   ///
134   /// \param sync_server_cqs The completion queues to use if the server is a
135   /// synchronous server (or a hybrid server). The server polls for new RPCs on
136   /// these queues
137   ///
138   /// \param min_pollers The minimum number of polling threads per server
139   /// completion queue (in param sync_server_cqs) to use for listening to
140   /// incoming requests (used only in case of sync server)
141   ///
142   /// \param max_pollers The maximum number of polling threads per server
143   /// completion queue (in param sync_server_cqs) to use for listening to
144   /// incoming requests (used only in case of sync server)
145   ///
146   /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
147   /// server completion queues passed via sync_server_cqs param.
148   Server(int max_message_size, ChannelArguments* args,
149          std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
150              sync_server_cqs,
151          int min_pollers, int max_pollers, int sync_cq_timeout_msec,
152          grpc_resource_quota* server_rq = nullptr);
153 
154   /// Start the server.
155   ///
156   /// \param cqs Completion queues for handling asynchronous services. The
157   /// caller is required to keep all completion queues live until the server is
158   /// destroyed.
159   /// \param num_cqs How many completion queues does \a cqs hold.
160   void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
161 
server()162   grpc_server* server() override { return server_; };
163 
164  private:
165   friend class AsyncGenericService;
166   friend class ServerBuilder;
167   friend class ServerInitializer;
168 
169   class SyncRequest;
170   class UnimplementedAsyncRequest;
171   class UnimplementedAsyncResponse;
172 
173   /// SyncRequestThreadManager is an implementation of ThreadManager. This class
174   /// is responsible for polling for incoming RPCs and calling the RPC handlers.
175   /// This is only used in case of a Sync server (i.e a server exposing a sync
176   /// interface)
177   class SyncRequestThreadManager;
178 
179   /// Register a generic service. This call does not take ownership of the
180   /// service. The service must exist for the lifetime of the Server instance.
181   void RegisterAsyncGenericService(AsyncGenericService* service) override;
182 
183   void PerformOpsOnCall(internal::CallOpSetInterface* ops,
184                         internal::Call* call) override;
185 
186   void ShutdownInternal(gpr_timespec deadline) override;
187 
max_receive_message_size()188   int max_receive_message_size() const override {
189     return max_receive_message_size_;
190   };
191 
192   ServerInitializer* initializer();
193 
194   const int max_receive_message_size_;
195 
196   /// The following completion queues are ONLY used in case of Sync API
197   /// i.e. if the server has any services with sync methods. The server uses
198   /// these completion queues to poll for new RPCs
199   std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
200       sync_server_cqs_;
201 
202   /// List of \a ThreadManager instances (one for each cq in
203   /// the \a sync_server_cqs)
204   std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
205 
206   // Server status
207   std::mutex mu_;
208   bool started_;
209   bool shutdown_;
210   bool shutdown_notified_;  // Was notify called on the shutdown_cv_
211 
212   std::condition_variable shutdown_cv_;
213 
214   std::shared_ptr<GlobalCallbacks> global_callbacks_;
215 
216   std::vector<grpc::string> services_;
217   bool has_generic_service_;
218 
219   // Pointer to the wrapped grpc_server.
220   grpc_server* server_;
221 
222   std::unique_ptr<ServerInitializer> server_initializer_;
223 
224   std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
225   bool health_check_service_disabled_;
226 
227   // A special handler for resource exhausted in sync case
228   std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
229 };
230 
231 }  // namespace grpc
232 
233 #endif  // GRPCPP_SERVER_H
234