1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/channel.h>
20 
21 #include <chrono>
22 #include <condition_variable>
23 #include <memory>
24 #include <mutex>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/slice.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/sync.h>
31 #include <grpc/support/time.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/completion_queue.h>
34 #include <grpcpp/impl/call.h>
35 #include <grpcpp/impl/codegen/completion_queue_tag.h>
36 #include <grpcpp/impl/grpc_library.h>
37 #include <grpcpp/impl/rpc_method.h>
38 #include <grpcpp/security/credentials.h>
39 #include <grpcpp/support/channel_arguments.h>
40 #include <grpcpp/support/config.h>
41 #include <grpcpp/support/status.h>
42 #include <grpcpp/support/time.h>
43 #include "src/core/lib/gpr/env.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/memory.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/profiling/timers.h"
48 #include "src/core/lib/surface/completion_queue.h"
49 
50 namespace grpc {
51 
52 static internal::GrpcLibraryInitializer g_gli_initializer;
Channel(const grpc::string & host,grpc_channel * channel)53 Channel::Channel(const grpc::string& host, grpc_channel* channel)
54     : host_(host), c_channel_(channel) {
55   g_gli_initializer.summon();
56 }
57 
~Channel()58 Channel::~Channel() {
59   grpc_channel_destroy(c_channel_);
60   if (callback_cq_ != nullptr) {
61     callback_cq_->Shutdown();
62   }
63 }
64 
65 namespace {
66 
GetChannelInfoField(grpc_channel * channel,grpc_channel_info * channel_info,char *** channel_info_field)67 grpc::string GetChannelInfoField(grpc_channel* channel,
68                                  grpc_channel_info* channel_info,
69                                  char*** channel_info_field) {
70   char* value = nullptr;
71   memset(channel_info, 0, sizeof(*channel_info));
72   *channel_info_field = &value;
73   grpc_channel_get_info(channel, channel_info);
74   if (value == nullptr) return "";
75   grpc::string result = value;
76   gpr_free(value);
77   return result;
78 }
79 
80 }  // namespace
81 
GetLoadBalancingPolicyName() const82 grpc::string Channel::GetLoadBalancingPolicyName() const {
83   grpc_channel_info channel_info;
84   return GetChannelInfoField(c_channel_, &channel_info,
85                              &channel_info.lb_policy_name);
86 }
87 
GetServiceConfigJSON() const88 grpc::string Channel::GetServiceConfigJSON() const {
89   grpc_channel_info channel_info;
90   return GetChannelInfoField(c_channel_, &channel_info,
91                              &channel_info.service_config_json);
92 }
93 
94 namespace experimental {
95 
ChannelResetConnectionBackoff(Channel * channel)96 void ChannelResetConnectionBackoff(Channel* channel) {
97   grpc_channel_reset_connect_backoff(channel->c_channel_);
98 }
99 
100 }  // namespace experimental
101 
// Creates a core call for `method` on this channel, bound to `cq`, and
// attaches it to `context`.
//
// The pre-registered path is taken only when the method carries a channel tag
// and the context does not override the authority. Otherwise the call is
// created by name, with the host chosen as: the context's authority if set,
// else this channel's host if non-empty, else no host at all.
//
// Returns an internal::Call wrapping the new core call, this channel and `cq`.
internal::Call Channel::CreateCall(const internal::RpcMethod& method,
                                   ClientContext* context,
                                   CompletionQueue* cq) {
  grpc_call* c_call = nullptr;
  const bool use_registered_method =
      method.channel_tag() && context->authority().empty();
  if (!use_registered_method) {
    // Pick the host: a per-call authority wins over the channel-level host.
    const char* host_str = nullptr;
    if (!context->authority().empty()) {
      host_str = context->authority_.c_str();
    } else if (!host_.empty()) {
      host_str = host_.c_str();
    }
    const bool has_host = (host_str != nullptr);

    grpc_slice method_slice = SliceFromCopiedString(method.name());
    // Only initialized (and only read) when a host was selected above.
    grpc_slice host_slice;
    if (has_host) {
      host_slice = SliceFromCopiedString(host_str);
    }

    c_call = grpc_channel_create_call(
        c_channel_, context->propagate_from_call_,
        context->propagation_options_.c_bitmask(), cq->cq(), method_slice,
        has_host ? &host_slice : nullptr, context->raw_deadline(), nullptr);

    // Drop our references to the temporary slices now that the call exists.
    grpc_slice_unref(method_slice);
    if (has_host) {
      grpc_slice_unref(host_slice);
    }
  } else {
    c_call = grpc_channel_create_registered_call(
        c_channel_, context->propagate_from_call_,
        context->propagation_options_.c_bitmask(), cq->cq(),
        method.channel_tag(), context->raw_deadline(), nullptr);
  }

  grpc_census_call_set_context(c_call, context->census_context());
  // The context receives a shared reference to this channel along with the
  // call handle.
  context->set_call(c_call, shared_from_this());
  return internal::Call(c_call, this, cq);
}
138 
PerformOpsOnCall(internal::CallOpSetInterface * ops,internal::Call * call)139 void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
140                                internal::Call* call) {
141   static const size_t MAX_OPS = 8;
142   size_t nops = 0;
143   grpc_op cops[MAX_OPS];
144   ops->FillOps(call->call(), cops, &nops);
145   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops,
146                                                    ops->cq_tag(), nullptr));
147 }
148 
RegisterMethod(const char * method)149 void* Channel::RegisterMethod(const char* method) {
150   return grpc_channel_register_call(
151       c_channel_, method, host_.empty() ? nullptr : host_.c_str(), nullptr);
152 }
153 
GetState(bool try_to_connect)154 grpc_connectivity_state Channel::GetState(bool try_to_connect) {
155   return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
156 }
157 
158 namespace {
159 
160 class TagSaver final : public internal::CompletionQueueTag {
161  public:
TagSaver(void * tag)162   explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver()163   ~TagSaver() override {}
FinalizeResult(void ** tag,bool * status)164   bool FinalizeResult(void** tag, bool* status) override {
165     *tag = tag_;
166     delete this;
167     return true;
168   }
169 
170  private:
171   void* tag_;
172 };
173 
174 }  // namespace
175 
NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,gpr_timespec deadline,CompletionQueue * cq,void * tag)176 void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
177                                       gpr_timespec deadline,
178                                       CompletionQueue* cq, void* tag) {
179   TagSaver* tag_saver = new TagSaver(tag);
180   grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
181                                         cq->cq(), tag_saver);
182 }
183 
WaitForStateChangeImpl(grpc_connectivity_state last_observed,gpr_timespec deadline)184 bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
185                                      gpr_timespec deadline) {
186   CompletionQueue cq;
187   bool ok = false;
188   void* tag = nullptr;
189   NotifyOnStateChangeImpl(last_observed, deadline, &cq, nullptr);
190   cq.Next(&tag, &ok);
191   GPR_ASSERT(tag == nullptr);
192   return ok;
193 }
194 
195 namespace {
196 class ShutdownCallback : public grpc_experimental_completion_queue_functor {
197  public:
ShutdownCallback()198   ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
199   // TakeCQ takes ownership of the cq into the shutdown callback
200   // so that the shutdown callback will be responsible for destroying it
TakeCQ(CompletionQueue * cq)201   void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
202 
203   // The Run function will get invoked by the completion queue library
204   // when the shutdown is actually complete
Run(grpc_experimental_completion_queue_functor * cb,int)205   static void Run(grpc_experimental_completion_queue_functor* cb, int) {
206     auto* callback = static_cast<ShutdownCallback*>(cb);
207     delete callback->cq_;
208     grpc_core::Delete(callback);
209   }
210 
211  private:
212   CompletionQueue* cq_ = nullptr;
213 };
214 }  // namespace
215 
CallbackCQ()216 CompletionQueue* Channel::CallbackCQ() {
217   // TODO(vjpai): Consider using a single global CQ for the default CQ
218   // if there is no explicit per-channel CQ registered
219   std::lock_guard<std::mutex> l(mu_);
220   if (callback_cq_ == nullptr) {
221     auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
222     callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
223         GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
224         shutdown_callback});
225 
226     // Transfer ownership of the new cq to its own shutdown callback
227     shutdown_callback->TakeCQ(callback_cq_);
228   }
229   return callback_cq_;
230 }
231 
232 }  // namespace grpc
233