1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCXX_CHANNEL_FILTER_H
20 #define GRPCXX_CHANNEL_FILTER_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpcpp/impl/codegen/config.h>
25 
26 #include <functional>
27 #include <vector>
28 
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/surface/channel_init.h"
31 #include "src/core/lib/transport/metadata_batch.h"
32 
33 /// An interface to define filters.
34 ///
35 /// To define a filter, implement a subclass of each of \c CallData and
36 /// \c ChannelData. Then register the filter using something like this:
37 /// \code{.cpp}
38 ///   RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>(
39 ///       "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
40 /// \endcode
41 
42 namespace grpc {
43 
44 /// A C++ wrapper for the \c grpc_metadata_batch struct.
45 class MetadataBatch {
46  public:
47   /// Borrows a pointer to \a batch, but does NOT take ownership.
48   /// The caller must ensure that \a batch continues to exist for as
49   /// long as the MetadataBatch object does.
MetadataBatch(grpc_metadata_batch * batch)50   explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {}
51 
batch()52   grpc_metadata_batch* batch() const { return batch_; }
53 
54   /// Adds metadata and returns the newly allocated storage.
55   /// The caller takes ownership of the result, which must exist for the
56   /// lifetime of the gRPC call.
57   grpc_linked_mdelem* AddMetadata(const string& key, const string& value);
58 
59   class const_iterator : public std::iterator<std::bidirectional_iterator_tag,
60                                               const grpc_mdelem> {
61    public:
62     const grpc_mdelem& operator*() const { return elem_->md; }
63     const grpc_mdelem operator->() const { return elem_->md; }
64 
65     const_iterator& operator++() {
66       elem_ = elem_->next;
67       return *this;
68     }
69     const_iterator operator++(int) {
70       const_iterator tmp(*this);
71       operator++();
72       return tmp;
73     }
74     const_iterator& operator--() {
75       elem_ = elem_->prev;
76       return *this;
77     }
78     const_iterator operator--(int) {
79       const_iterator tmp(*this);
80       operator--();
81       return tmp;
82     }
83 
84     bool operator==(const const_iterator& other) const {
85       return elem_ == other.elem_;
86     }
87     bool operator!=(const const_iterator& other) const {
88       return elem_ != other.elem_;
89     }
90 
91    private:
92     friend class MetadataBatch;
const_iterator(grpc_linked_mdelem * elem)93     explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {}
94 
95     grpc_linked_mdelem* elem_;
96   };
97 
begin()98   const_iterator begin() const { return const_iterator(batch_->list.head); }
end()99   const_iterator end() const { return const_iterator(nullptr); }
100 
101  private:
102   grpc_metadata_batch* batch_;  // Not owned.
103 };
104 
105 /// A C++ wrapper for the \c grpc_transport_op struct.
106 class TransportOp {
107  public:
108   /// Borrows a pointer to \a op, but does NOT take ownership.
109   /// The caller must ensure that \a op continues to exist for as
110   /// long as the TransportOp object does.
TransportOp(grpc_transport_op * op)111   explicit TransportOp(grpc_transport_op* op) : op_(op) {}
112 
op()113   grpc_transport_op* op() const { return op_; }
114 
115   // TODO(roth): Add a C++ wrapper for grpc_error?
disconnect_with_error()116   grpc_error* disconnect_with_error() const {
117     return op_->disconnect_with_error;
118   }
send_goaway()119   bool send_goaway() const { return op_->goaway_error != GRPC_ERROR_NONE; }
120 
121   // TODO(roth): Add methods for additional fields as needed.
122 
123  private:
124   grpc_transport_op* op_;  // Not owned.
125 };
126 
127 /// A C++ wrapper for the \c grpc_transport_stream_op_batch struct.
128 class TransportStreamOpBatch {
129  public:
130   /// Borrows a pointer to \a op, but does NOT take ownership.
131   /// The caller must ensure that \a op continues to exist for as
132   /// long as the TransportStreamOpBatch object does.
TransportStreamOpBatch(grpc_transport_stream_op_batch * op)133   explicit TransportStreamOpBatch(grpc_transport_stream_op_batch* op)
134       : op_(op),
135         send_initial_metadata_(
136             op->send_initial_metadata
137                 ? op->payload->send_initial_metadata.send_initial_metadata
138                 : nullptr),
139         send_trailing_metadata_(
140             op->send_trailing_metadata
141                 ? op->payload->send_trailing_metadata.send_trailing_metadata
142                 : nullptr),
143         recv_initial_metadata_(
144             op->recv_initial_metadata
145                 ? op->payload->recv_initial_metadata.recv_initial_metadata
146                 : nullptr),
147         recv_trailing_metadata_(
148             op->recv_trailing_metadata
149                 ? op->payload->recv_trailing_metadata.recv_trailing_metadata
150                 : nullptr) {}
151 
op()152   grpc_transport_stream_op_batch* op() const { return op_; }
153 
on_complete()154   grpc_closure* on_complete() const { return op_->on_complete; }
set_on_complete(grpc_closure * closure)155   void set_on_complete(grpc_closure* closure) { op_->on_complete = closure; }
156 
send_initial_metadata()157   MetadataBatch* send_initial_metadata() {
158     return op_->send_initial_metadata ? &send_initial_metadata_ : nullptr;
159   }
send_trailing_metadata()160   MetadataBatch* send_trailing_metadata() {
161     return op_->send_trailing_metadata ? &send_trailing_metadata_ : nullptr;
162   }
recv_initial_metadata()163   MetadataBatch* recv_initial_metadata() {
164     return op_->recv_initial_metadata ? &recv_initial_metadata_ : nullptr;
165   }
recv_trailing_metadata()166   MetadataBatch* recv_trailing_metadata() {
167     return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr;
168   }
169 
send_initial_metadata_flags()170   uint32_t* send_initial_metadata_flags() const {
171     return op_->send_initial_metadata ? &op_->payload->send_initial_metadata
172                                              .send_initial_metadata_flags
173                                       : nullptr;
174   }
175 
recv_initial_metadata_ready()176   grpc_closure* recv_initial_metadata_ready() const {
177     return op_->recv_initial_metadata
178                ? op_->payload->recv_initial_metadata.recv_initial_metadata_ready
179                : nullptr;
180   }
set_recv_initial_metadata_ready(grpc_closure * closure)181   void set_recv_initial_metadata_ready(grpc_closure* closure) {
182     op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure;
183   }
184 
send_message()185   grpc_core::OrphanablePtr<grpc_core::ByteStream>* send_message() const {
186     return op_->send_message ? &op_->payload->send_message.send_message
187                              : nullptr;
188   }
set_send_message(grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message)189   void set_send_message(
190       grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message) {
191     op_->send_message = true;
192     op_->payload->send_message.send_message = std::move(send_message);
193   }
194 
recv_message()195   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message() const {
196     return op_->recv_message ? op_->payload->recv_message.recv_message
197                              : nullptr;
198   }
set_recv_message(grpc_core::OrphanablePtr<grpc_core::ByteStream> * recv_message)199   void set_recv_message(
200       grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message) {
201     op_->recv_message = true;
202     op_->payload->recv_message.recv_message = recv_message;
203   }
204 
get_census_context()205   census_context* get_census_context() const {
206     return static_cast<census_context*>(
207         op_->payload->context[GRPC_CONTEXT_TRACING].value);
208   }
209 
get_peer_string()210   const gpr_atm* get_peer_string() const {
211     if (op_->send_initial_metadata &&
212         op_->payload->send_initial_metadata.peer_string != nullptr) {
213       return op_->payload->send_initial_metadata.peer_string;
214     } else if (op_->recv_initial_metadata &&
215                op_->payload->recv_initial_metadata.peer_string != nullptr) {
216       return op_->payload->recv_initial_metadata.peer_string;
217     } else {
218       return nullptr;
219     }
220   }
221 
222  private:
223   grpc_transport_stream_op_batch* op_;  // Not owned.
224   MetadataBatch send_initial_metadata_;
225   MetadataBatch send_trailing_metadata_;
226   MetadataBatch recv_initial_metadata_;
227   MetadataBatch recv_trailing_metadata_;
228 };
229 
230 /// Represents channel data.
231 class ChannelData {
232  public:
ChannelData()233   ChannelData() {}
~ChannelData()234   virtual ~ChannelData() {}
235 
236   // TODO(roth): Come up with a more C++-like API for the channel element.
237 
238   /// Initializes the channel data.
Init(grpc_channel_element * elem,grpc_channel_element_args * args)239   virtual grpc_error* Init(grpc_channel_element* elem,
240                            grpc_channel_element_args* args) {
241     return GRPC_ERROR_NONE;
242   }
243 
244   // Called before destruction.
Destroy(grpc_channel_element * elem)245   virtual void Destroy(grpc_channel_element* elem) {}
246 
247   virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op);
248 
249   virtual void GetInfo(grpc_channel_element* elem,
250                        const grpc_channel_info* channel_info);
251 };
252 
253 /// Represents call data.
254 class CallData {
255  public:
CallData()256   CallData() {}
~CallData()257   virtual ~CallData() {}
258 
259   // TODO(roth): Come up with a more C++-like API for the call element.
260 
261   /// Initializes the call data.
Init(grpc_call_element * elem,const grpc_call_element_args * args)262   virtual grpc_error* Init(grpc_call_element* elem,
263                            const grpc_call_element_args* args) {
264     return GRPC_ERROR_NONE;
265   }
266 
267   // Called before destruction.
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)268   virtual void Destroy(grpc_call_element* elem,
269                        const grpc_call_final_info* final_info,
270                        grpc_closure* then_call_closure) {}
271 
272   /// Starts a new stream operation.
273   virtual void StartTransportStreamOpBatch(grpc_call_element* elem,
274                                            TransportStreamOpBatch* op);
275 
276   /// Sets a pollset or pollset set.
277   virtual void SetPollsetOrPollsetSet(grpc_call_element* elem,
278                                       grpc_polling_entity* pollent);
279 };
280 
281 namespace internal {
282 
283 // Defines static members for passing to C core.
284 // Members of this class correspond to the members of the C
285 // grpc_channel_filter struct.
286 template <typename ChannelDataType, typename CallDataType>
287 class ChannelFilter final {
288  public:
289   static const size_t channel_data_size = sizeof(ChannelDataType);
290 
InitChannelElement(grpc_channel_element * elem,grpc_channel_element_args * args)291   static grpc_error* InitChannelElement(grpc_channel_element* elem,
292                                         grpc_channel_element_args* args) {
293     // Construct the object in the already-allocated memory.
294     ChannelDataType* channel_data = new (elem->channel_data) ChannelDataType();
295     return channel_data->Init(elem, args);
296   }
297 
DestroyChannelElement(grpc_channel_element * elem)298   static void DestroyChannelElement(grpc_channel_element* elem) {
299     ChannelDataType* channel_data =
300         static_cast<ChannelDataType*>(elem->channel_data);
301     channel_data->Destroy(elem);
302     channel_data->~ChannelDataType();
303   }
304 
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)305   static void StartTransportOp(grpc_channel_element* elem,
306                                grpc_transport_op* op) {
307     ChannelDataType* channel_data =
308         static_cast<ChannelDataType*>(elem->channel_data);
309     TransportOp op_wrapper(op);
310     channel_data->StartTransportOp(elem, &op_wrapper);
311   }
312 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * channel_info)313   static void GetChannelInfo(grpc_channel_element* elem,
314                              const grpc_channel_info* channel_info) {
315     ChannelDataType* channel_data =
316         static_cast<ChannelDataType*>(elem->channel_data);
317     channel_data->GetInfo(elem, channel_info);
318   }
319 
320   static const size_t call_data_size = sizeof(CallDataType);
321 
InitCallElement(grpc_call_element * elem,const grpc_call_element_args * args)322   static grpc_error* InitCallElement(grpc_call_element* elem,
323                                      const grpc_call_element_args* args) {
324     // Construct the object in the already-allocated memory.
325     CallDataType* call_data = new (elem->call_data) CallDataType();
326     return call_data->Init(elem, args);
327   }
328 
DestroyCallElement(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)329   static void DestroyCallElement(grpc_call_element* elem,
330                                  const grpc_call_final_info* final_info,
331                                  grpc_closure* then_call_closure) {
332     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
333     call_data->Destroy(elem, final_info, then_call_closure);
334     call_data->~CallDataType();
335   }
336 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)337   static void StartTransportStreamOpBatch(grpc_call_element* elem,
338                                           grpc_transport_stream_op_batch* op) {
339     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
340     TransportStreamOpBatch op_wrapper(op);
341     call_data->StartTransportStreamOpBatch(elem, &op_wrapper);
342   }
343 
SetPollsetOrPollsetSet(grpc_call_element * elem,grpc_polling_entity * pollent)344   static void SetPollsetOrPollsetSet(grpc_call_element* elem,
345                                      grpc_polling_entity* pollent) {
346     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
347     call_data->SetPollsetOrPollsetSet(elem, pollent);
348   }
349 };
350 
351 struct FilterRecord {
352   grpc_channel_stack_type stack_type;
353   int priority;
354   std::function<bool(const grpc_channel_args&)> include_filter;
355   grpc_channel_filter filter;
356 };
357 extern std::vector<FilterRecord>* channel_filters;
358 
359 void ChannelFilterPluginInit();
360 void ChannelFilterPluginShutdown();
361 
362 }  // namespace internal
363 
364 /// Registers a new filter.
365 /// Must be called by only one thread at a time.
366 /// The \a include_filter argument specifies a function that will be called
367 /// to determine at run-time whether or not to add the filter. If the
368 /// value is nullptr, the filter will be added unconditionally.
369 template <typename ChannelDataType, typename CallDataType>
RegisterChannelFilter(const char * name,grpc_channel_stack_type stack_type,int priority,std::function<bool (const grpc_channel_args &)> include_filter)370 void RegisterChannelFilter(
371     const char* name, grpc_channel_stack_type stack_type, int priority,
372     std::function<bool(const grpc_channel_args&)> include_filter) {
373   // If we haven't been called before, initialize channel_filters and
374   // call grpc_register_plugin().
375   if (internal::channel_filters == nullptr) {
376     grpc_register_plugin(internal::ChannelFilterPluginInit,
377                          internal::ChannelFilterPluginShutdown);
378     internal::channel_filters = new std::vector<internal::FilterRecord>();
379   }
380   // Add an entry to channel_filters. The filter will be added when the
381   // C-core initialization code calls ChannelFilterPluginInit().
382   typedef internal::ChannelFilter<ChannelDataType, CallDataType> FilterType;
383   internal::FilterRecord filter_record = {
384       stack_type,
385       priority,
386       include_filter,
387       {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp,
388        FilterType::call_data_size, FilterType::InitCallElement,
389        FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement,
390        FilterType::channel_data_size, FilterType::InitChannelElement,
391        FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}};
392   internal::channel_filters->push_back(filter_record);
393 }
394 
395 }  // namespace grpc
396 
397 #endif  // GRPCXX_CHANNEL_FILTER_H
398