1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/server_context.h>
20 
21 #include <algorithm>
22 #include <mutex>
23 #include <utility>
24 
25 #include <grpc/compression.h>
26 #include <grpc/grpc.h>
27 #include <grpc/load_reporting.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpcpp/completion_queue.h>
31 #include <grpcpp/impl/call.h>
32 #include <grpcpp/support/time.h>
33 
34 #include "src/core/lib/surface/call.h"
35 
36 namespace grpc {
37 
38 // CompletionOp
39 
40 class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
41  public:
42   // initial refs: one in the server context, one in the cq
CompletionOp()43   CompletionOp()
44       : has_tag_(false),
45         tag_(nullptr),
46         refs_(2),
47         finalized_(false),
48         cancelled_(0) {}
49 
50   void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override;
51   bool FinalizeResult(void** tag, bool* status) override;
52 
CheckCancelled(CompletionQueue * cq)53   bool CheckCancelled(CompletionQueue* cq) {
54     cq->TryPluck(this);
55     return CheckCancelledNoPluck();
56   }
CheckCancelledAsync()57   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
58 
set_tag(void * tag)59   void set_tag(void* tag) {
60     has_tag_ = true;
61     tag_ = tag;
62   }
63 
64   /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
cq_tag()65   void* cq_tag() override { return this; }
66 
67   void Unref();
68 
69  private:
CheckCancelledNoPluck()70   bool CheckCancelledNoPluck() {
71     std::lock_guard<std::mutex> g(mu_);
72     return finalized_ ? (cancelled_ != 0) : false;
73   }
74 
75   bool has_tag_;
76   void* tag_;
77   std::mutex mu_;
78   int refs_;
79   bool finalized_;
80   int cancelled_;
81 };
82 
Unref()83 void ServerContext::CompletionOp::Unref() {
84   std::unique_lock<std::mutex> lock(mu_);
85   if (--refs_ == 0) {
86     lock.unlock();
87     delete this;
88   }
89 }
90 
FillOps(grpc_call * call,grpc_op * ops,size_t * nops)91 void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops,
92                                           size_t* nops) {
93   ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
94   ops->data.recv_close_on_server.cancelled = &cancelled_;
95   ops->flags = 0;
96   ops->reserved = nullptr;
97   *nops = 1;
98 }
99 
FinalizeResult(void ** tag,bool * status)100 bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
101   std::unique_lock<std::mutex> lock(mu_);
102   finalized_ = true;
103   bool ret = false;
104   if (has_tag_) {
105     *tag = tag_;
106     ret = true;
107   }
108   if (!*status) cancelled_ = 1;
109   if (--refs_ == 0) {
110     lock.unlock();
111     delete this;
112   }
113   return ret;
114 }
115 
116 // ServerContext body
117 
ServerContext()118 ServerContext::ServerContext()
119     : completion_op_(nullptr),
120       has_notify_when_done_tag_(false),
121       async_notify_when_done_tag_(nullptr),
122       deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
123       call_(nullptr),
124       cq_(nullptr),
125       sent_initial_metadata_(false),
126       compression_level_set_(false),
127       has_pending_ops_(false) {}
128 
ServerContext(gpr_timespec deadline,grpc_metadata_array * arr)129 ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
130     : completion_op_(nullptr),
131       has_notify_when_done_tag_(false),
132       async_notify_when_done_tag_(nullptr),
133       deadline_(deadline),
134       call_(nullptr),
135       cq_(nullptr),
136       sent_initial_metadata_(false),
137       compression_level_set_(false),
138       has_pending_ops_(false) {
139   std::swap(*client_metadata_.arr(), *arr);
140 }
141 
~ServerContext()142 ServerContext::~ServerContext() {
143   if (call_) {
144     grpc_call_unref(call_);
145   }
146   if (completion_op_) {
147     completion_op_->Unref();
148   }
149 }
150 
BeginCompletionOp(internal::Call * call)151 void ServerContext::BeginCompletionOp(internal::Call* call) {
152   GPR_ASSERT(!completion_op_);
153   completion_op_ = new CompletionOp();
154   if (has_notify_when_done_tag_) {
155     completion_op_->set_tag(async_notify_when_done_tag_);
156   }
157   call->PerformOps(completion_op_);
158 }
159 
GetCompletionOpTag()160 internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
161   return static_cast<internal::CompletionQueueTag*>(completion_op_);
162 }
163 
AddInitialMetadata(const grpc::string & key,const grpc::string & value)164 void ServerContext::AddInitialMetadata(const grpc::string& key,
165                                        const grpc::string& value) {
166   initial_metadata_.insert(std::make_pair(key, value));
167 }
168 
AddTrailingMetadata(const grpc::string & key,const grpc::string & value)169 void ServerContext::AddTrailingMetadata(const grpc::string& key,
170                                         const grpc::string& value) {
171   trailing_metadata_.insert(std::make_pair(key, value));
172 }
173 
TryCancel() const174 void ServerContext::TryCancel() const {
175   grpc_call_error err = grpc_call_cancel_with_status(
176       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
177   if (err != GRPC_CALL_OK) {
178     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
179   }
180 }
181 
IsCancelled() const182 bool ServerContext::IsCancelled() const {
183   if (has_notify_when_done_tag_) {
184     // when using async API, but the result is only valid
185     // if the tag has already been delivered at the completion queue
186     return completion_op_ && completion_op_->CheckCancelledAsync();
187   } else {
188     // when using sync API
189     return completion_op_ && completion_op_->CheckCancelled(cq_);
190   }
191 }
192 
set_compression_algorithm(grpc_compression_algorithm algorithm)193 void ServerContext::set_compression_algorithm(
194     grpc_compression_algorithm algorithm) {
195   compression_algorithm_ = algorithm;
196   const char* algorithm_name = nullptr;
197   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
198     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
199             algorithm);
200     abort();
201   }
202   GPR_ASSERT(algorithm_name != nullptr);
203   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
204 }
205 
peer() const206 grpc::string ServerContext::peer() const {
207   grpc::string peer;
208   if (call_) {
209     char* c_peer = grpc_call_get_peer(call_);
210     peer = c_peer;
211     gpr_free(c_peer);
212   }
213   return peer;
214 }
215 
census_context() const216 const struct census_context* ServerContext::census_context() const {
217   return grpc_census_call_get_context(call_);
218 }
219 
SetLoadReportingCosts(const std::vector<grpc::string> & cost_data)220 void ServerContext::SetLoadReportingCosts(
221     const std::vector<grpc::string>& cost_data) {
222   if (call_ == nullptr) return;
223   for (const auto& cost_datum : cost_data) {
224     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
225   }
226 }
227 
228 }  // namespace grpc
229