1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/server_builder.h>
20 
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/impl/service_type.h>
24 #include <grpcpp/resource_quota.h>
25 #include <grpcpp/server.h>
26 
27 #include <utility>
28 
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/cpp/server/thread_pool_interface.h"
31 
32 namespace grpc {
33 
34 static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
35     g_plugin_factory_list;
36 static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
37 
do_plugin_list_init(void)38 static void do_plugin_list_init(void) {
39   g_plugin_factory_list =
40       new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
41 }
42 
ServerBuilder()43 ServerBuilder::ServerBuilder()
44     : max_receive_message_size_(INT_MIN),
45       max_send_message_size_(INT_MIN),
46       sync_server_settings_(SyncServerSettings()),
47       resource_quota_(nullptr),
48       generic_service_(nullptr) {
49   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
50   for (auto it = g_plugin_factory_list->begin();
51        it != g_plugin_factory_list->end(); it++) {
52     auto& factory = *it;
53     plugins_.emplace_back(factory());
54   }
55 
56   // all compression algorithms enabled by default.
57   enabled_compression_algorithms_bitset_ =
58       (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
59   memset(&maybe_default_compression_level_, 0,
60          sizeof(maybe_default_compression_level_));
61   memset(&maybe_default_compression_algorithm_, 0,
62          sizeof(maybe_default_compression_algorithm_));
63 }
64 
~ServerBuilder()65 ServerBuilder::~ServerBuilder() {
66   if (resource_quota_ != nullptr) {
67     grpc_resource_quota_unref(resource_quota_);
68   }
69 }
70 
AddCompletionQueue(bool is_frequently_polled)71 std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
72     bool is_frequently_polled) {
73   ServerCompletionQueue* cq = new ServerCompletionQueue(
74       is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
75   cqs_.push_back(cq);
76   return std::unique_ptr<ServerCompletionQueue>(cq);
77 }
78 
RegisterService(Service * service)79 ServerBuilder& ServerBuilder::RegisterService(Service* service) {
80   services_.emplace_back(new NamedService(service));
81   return *this;
82 }
83 
RegisterService(const grpc::string & addr,Service * service)84 ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr,
85                                               Service* service) {
86   services_.emplace_back(new NamedService(addr, service));
87   return *this;
88 }
89 
RegisterAsyncGenericService(AsyncGenericService * service)90 ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
91     AsyncGenericService* service) {
92   if (generic_service_) {
93     gpr_log(GPR_ERROR,
94             "Adding multiple AsyncGenericService is unsupported for now. "
95             "Dropping the service %p",
96             (void*)service);
97   } else {
98     generic_service_ = service;
99   }
100   return *this;
101 }
102 
SetOption(std::unique_ptr<ServerBuilderOption> option)103 ServerBuilder& ServerBuilder::SetOption(
104     std::unique_ptr<ServerBuilderOption> option) {
105   options_.push_back(std::move(option));
106   return *this;
107 }
108 
SetSyncServerOption(ServerBuilder::SyncServerOption option,int val)109 ServerBuilder& ServerBuilder::SetSyncServerOption(
110     ServerBuilder::SyncServerOption option, int val) {
111   switch (option) {
112     case NUM_CQS:
113       sync_server_settings_.num_cqs = val;
114       break;
115     case MIN_POLLERS:
116       sync_server_settings_.min_pollers = val;
117       break;
118     case MAX_POLLERS:
119       sync_server_settings_.max_pollers = val;
120       break;
121     case CQ_TIMEOUT_MSEC:
122       sync_server_settings_.cq_timeout_msec = val;
123       break;
124   }
125   return *this;
126 }
127 
SetCompressionAlgorithmSupportStatus(grpc_compression_algorithm algorithm,bool enabled)128 ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
129     grpc_compression_algorithm algorithm, bool enabled) {
130   if (enabled) {
131     GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
132   } else {
133     GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
134   }
135   return *this;
136 }
137 
SetDefaultCompressionLevel(grpc_compression_level level)138 ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
139     grpc_compression_level level) {
140   maybe_default_compression_level_.level = level;
141   return *this;
142 }
143 
SetDefaultCompressionAlgorithm(grpc_compression_algorithm algorithm)144 ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
145     grpc_compression_algorithm algorithm) {
146   maybe_default_compression_algorithm_.is_set = true;
147   maybe_default_compression_algorithm_.algorithm = algorithm;
148   return *this;
149 }
150 
SetResourceQuota(const grpc::ResourceQuota & resource_quota)151 ServerBuilder& ServerBuilder::SetResourceQuota(
152     const grpc::ResourceQuota& resource_quota) {
153   if (resource_quota_ != nullptr) {
154     grpc_resource_quota_unref(resource_quota_);
155   }
156   resource_quota_ = resource_quota.c_resource_quota();
157   grpc_resource_quota_ref(resource_quota_);
158   return *this;
159 }
160 
AddListeningPort(const grpc::string & addr_uri,std::shared_ptr<ServerCredentials> creds,int * selected_port)161 ServerBuilder& ServerBuilder::AddListeningPort(
162     const grpc::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
163     int* selected_port) {
164   const grpc::string uri_scheme = "dns:";
165   grpc::string addr = addr_uri;
166   if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
167     size_t pos = uri_scheme.size();
168     while (addr_uri[pos] == '/') ++pos;  // Skip slashes.
169     addr = addr_uri.substr(pos);
170   }
171   Port port = {addr, std::move(creds), selected_port};
172   ports_.push_back(port);
173   return *this;
174 }
175 
BuildAndStart()176 std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
177   ChannelArguments args;
178   for (auto option = options_.begin(); option != options_.end(); ++option) {
179     (*option)->UpdateArguments(&args);
180     (*option)->UpdatePlugins(&plugins_);
181   }
182 
183   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
184     (*plugin)->UpdateServerBuilder(this);
185     (*plugin)->UpdateChannelArguments(&args);
186   }
187 
188   if (max_receive_message_size_ >= -1) {
189     args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
190   }
191 
192   // The default message size is -1 (max), so no need to explicitly set it for
193   // -1.
194   if (max_send_message_size_ >= 0) {
195     args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
196   }
197 
198   args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
199               enabled_compression_algorithms_bitset_);
200   if (maybe_default_compression_level_.is_set) {
201     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
202                 maybe_default_compression_level_.level);
203   }
204   if (maybe_default_compression_algorithm_.is_set) {
205     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
206                 maybe_default_compression_algorithm_.algorithm);
207   }
208 
209   if (resource_quota_ != nullptr) {
210     args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
211                               grpc_resource_quota_arg_vtable());
212   }
213 
214   // == Determine if the server has any syncrhonous methods ==
215   bool has_sync_methods = false;
216   for (auto it = services_.begin(); it != services_.end(); ++it) {
217     if ((*it)->service->has_synchronous_methods()) {
218       has_sync_methods = true;
219       break;
220     }
221   }
222 
223   if (!has_sync_methods) {
224     for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
225       if ((*plugin)->has_sync_methods()) {
226         has_sync_methods = true;
227         break;
228       }
229     }
230   }
231 
232   // If this is a Sync server, i.e a server expositing sync API, then the server
233   // needs to create some completion queues to listen for incoming requests.
234   // 'sync_server_cqs' are those internal completion queues.
235   //
236   // This is different from the completion queues added to the server via
237   // ServerBuilder's AddCompletionQueue() method (those completion queues
238   // are in 'cqs_' member variable of ServerBuilder object)
239   std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
240       sync_server_cqs(std::make_shared<
241                       std::vector<std::unique_ptr<ServerCompletionQueue>>>());
242 
243   int num_frequently_polled_cqs = 0;
244   for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
245     if ((*it)->IsFrequentlyPolled()) {
246       num_frequently_polled_cqs++;
247     }
248   }
249 
250   const bool is_hybrid_server =
251       has_sync_methods && num_frequently_polled_cqs > 0;
252 
253   if (has_sync_methods) {
254     grpc_cq_polling_type polling_type =
255         is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
256 
257     // Create completion queues to listen to incoming rpc requests
258     for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
259       sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
260     }
261   }
262 
263   std::unique_ptr<Server> server(new Server(
264       max_receive_message_size_, &args, sync_server_cqs,
265       sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
266       sync_server_settings_.cq_timeout_msec, resource_quota_));
267 
268   if (has_sync_methods) {
269     // This is a Sync server
270     gpr_log(GPR_INFO,
271             "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
272             "%d, CQ timeout (msec): %d",
273             sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
274             sync_server_settings_.max_pollers,
275             sync_server_settings_.cq_timeout_msec);
276   }
277 
278   ServerInitializer* initializer = server->initializer();
279 
280   // Register all the completion queues with the server. i.e
281   //  1. sync_server_cqs: internal completion queues created IF this is a sync
282   //     server
283   //  2. cqs_: Completion queues added via AddCompletionQueue() call
284 
285   for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
286     grpc_server_register_completion_queue(server->server_, (*it)->cq(),
287                                           nullptr);
288     num_frequently_polled_cqs++;
289   }
290 
291   // cqs_ contains the completion queue added by calling the ServerBuilder's
292   // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
293   // calling Next() or AsyncNext()) and hence are not safe to be used for
294   // listening to incoming channels. Such completion queues must be registered
295   // as non-listening queues
296   for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
297     grpc_server_register_completion_queue(server->server_, (*it)->cq(),
298                                           nullptr);
299   }
300 
301   if (num_frequently_polled_cqs == 0) {
302     gpr_log(GPR_ERROR,
303             "At least one of the completion queues must be frequently polled");
304     return nullptr;
305   }
306 
307   for (auto service = services_.begin(); service != services_.end();
308        service++) {
309     if (!server->RegisterService((*service)->host.get(), (*service)->service)) {
310       return nullptr;
311     }
312   }
313 
314   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
315     (*plugin)->InitServer(initializer);
316   }
317 
318   if (generic_service_) {
319     server->RegisterAsyncGenericService(generic_service_);
320   } else {
321     for (auto it = services_.begin(); it != services_.end(); ++it) {
322       if ((*it)->service->has_generic_methods()) {
323         gpr_log(GPR_ERROR,
324                 "Some methods were marked generic but there is no "
325                 "generic service registered.");
326         return nullptr;
327       }
328     }
329   }
330 
331   bool added_port = false;
332   for (auto port = ports_.begin(); port != ports_.end(); port++) {
333     int r = server->AddListeningPort(port->addr, port->creds.get());
334     if (!r) {
335       if (added_port) server->Shutdown();
336       return nullptr;
337     }
338     added_port = true;
339     if (port->selected_port != nullptr) {
340       *port->selected_port = r;
341     }
342   }
343 
344   auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
345   server->Start(cqs_data, cqs_.size());
346 
347   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
348     (*plugin)->Finish(initializer);
349   }
350 
351   return server;
352 }
353 
InternalAddPluginFactory(std::unique_ptr<ServerBuilderPlugin> (* CreatePlugin)())354 void ServerBuilder::InternalAddPluginFactory(
355     std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
356   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
357   (*g_plugin_factory_list).push_back(CreatePlugin);
358 }
359 
EnableWorkaround(grpc_workaround_list id)360 ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
361   switch (id) {
362     case GRPC_WORKAROUND_ID_CRONET_COMPRESSION:
363       return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1);
364     default:
365       gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id);
366       return *this;
367   }
368 }
369 
370 }  // namespace grpc
371