1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <grpc/grpc_security.h>
22 #include <grpc/slice.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/string_util.h>
26 
27 #include "src/core/ext/filters/client_channel/parse_address.h"
28 #include "src/core/ext/filters/client_channel/uri_parser.h"
29 #include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
30 #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/channel/context.h"
33 #include "src/core/lib/iomgr/resolve_address.h"
34 #include "src/core/lib/iomgr/sockaddr_posix.h"
35 #include "src/core/lib/iomgr/socket_utils.h"
36 #include "src/core/lib/security/context/security_context.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/surface/call.h"
39 
40 namespace grpc {
41 
42 constexpr char kEncodedIpv4AddressLengthString[] = "08";
43 constexpr char kEncodedIpv6AddressLengthString[] = "32";
44 constexpr char kEmptyAddressLengthString[] = "00";
45 constexpr size_t kLengthPrefixSize = 2;
46 
Init(grpc_channel_element *,grpc_channel_element_args * args)47 grpc_error* ServerLoadReportingChannelData::Init(
48     grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
49   GPR_ASSERT(!args->is_last);
50   // Find and record the peer_identity.
51   const grpc_auth_context* auth_context =
52       grpc_find_auth_context_in_args(args->channel_args);
53   if (auth_context != nullptr &&
54       grpc_auth_context_peer_is_authenticated(auth_context)) {
55     grpc_auth_property_iterator auth_it =
56         grpc_auth_context_peer_identity(auth_context);
57     const grpc_auth_property* auth_property =
58         grpc_auth_property_iterator_next(&auth_it);
59     if (auth_property != nullptr) {
60       peer_identity_ = auth_property->value;
61       peer_identity_len_ = auth_property->value_length;
62     }
63   }
64   return GRPC_ERROR_NONE;
65 }
66 
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)67 void ServerLoadReportingCallData::Destroy(
68     grpc_call_element* elem, const grpc_call_final_info* final_info,
69     grpc_closure* then_call_closure) {
70   ServerLoadReportingChannelData* chand =
71       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
72   // Only record an end if we've recorded its corresponding start, which is
73   // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
74   // that we attempt to record the call end before we have recorded the call
75   // start, because the data needed for recording the start comes from the
76   // initial metadata, which may not be ready before the call finishes.
77   if (client_ip_and_lr_token_ != nullptr) {
78     opencensus::stats::Record(
79         {{::grpc::load_reporter::MeasureEndCount(), 1},
80          {::grpc::load_reporter::MeasureEndBytesSent(),
81           final_info->stats.transport_stream_stats.outgoing.data_bytes},
82          {::grpc::load_reporter::MeasureEndBytesReceived(),
83           final_info->stats.transport_stream_stats.incoming.data_bytes},
84          {::grpc::load_reporter::MeasureEndLatencyMs(),
85           gpr_time_to_millis(final_info->stats.latency)}},
86         {{::grpc::load_reporter::TagKeyToken(),
87           {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
88          {::grpc::load_reporter::TagKeyHost(),
89           {target_host_, target_host_len_}},
90          {::grpc::load_reporter::TagKeyUserId(),
91           {chand->peer_identity(), chand->peer_identity_len()}},
92          {::grpc::load_reporter::TagKeyStatus(),
93           GetStatusTagForStatus(final_info->final_status)}});
94     gpr_free(client_ip_and_lr_token_);
95   }
96   gpr_free(target_host_);
97   grpc_slice_unref_internal(service_method_);
98 }
99 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)100 void ServerLoadReportingCallData::StartTransportStreamOpBatch(
101     grpc_call_element* elem, TransportStreamOpBatch* op) {
102   GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
103   if (op->recv_initial_metadata() != nullptr) {
104     // Save some fields to use when initial metadata is ready.
105     peer_string_ = op->get_peer_string();
106     recv_initial_metadata_ =
107         op->op()->payload->recv_initial_metadata.recv_initial_metadata;
108     original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
109     // Substitute the original closure for the wrapper closure.
110     op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
111   } else if (op->send_trailing_metadata() != nullptr) {
112     GRPC_LOG_IF_ERROR(
113         "server_load_reporting_filter",
114         grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
115                                    SendTrailingMetadataFilter, elem,
116                                    "send_trailing_metadata filtering error"));
117   }
118   grpc_call_next_op(elem, op->op());
119 }
120 
GetCensusSafeClientIpString(char ** client_ip_string,size_t * size)121 void ServerLoadReportingCallData::GetCensusSafeClientIpString(
122     char** client_ip_string, size_t* size) {
123   // Find the client URI string.
124   const char* client_uri_str =
125       reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
126   if (client_uri_str == nullptr) {
127     gpr_log(GPR_ERROR,
128             "Unable to extract client URI string (peer string) from gRPC "
129             "metadata.");
130     *client_ip_string = nullptr;
131     *size = 0;
132     return;
133   }
134   // Parse the client URI string into grpc_uri.
135   grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
136   if (client_uri == nullptr) {
137     gpr_log(GPR_ERROR,
138             "Unable to parse the client URI string (peer string) to a client "
139             "URI.");
140     *client_ip_string = nullptr;
141     *size = 0;
142     return;
143   }
144   // Parse the client URI into grpc_resolved_address.
145   grpc_resolved_address resolved_address;
146   bool success = grpc_parse_uri(client_uri, &resolved_address);
147   grpc_uri_destroy(client_uri);
148   if (!success) {
149     gpr_log(GPR_ERROR,
150             "Unable to parse client URI into a grpc_resolved_address.");
151     *client_ip_string = nullptr;
152     *size = 0;
153     return;
154   }
155   // Convert the socket address in the grpc_resolved_address into a hex string
156   // according to the address family.
157   grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
158   if (addr->sa_family == GRPC_AF_INET) {
159     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
160     gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
161     *size = 8;
162   } else if (addr->sa_family == GRPC_AF_INET6) {
163     grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
164     *client_ip_string = static_cast<char*>(gpr_malloc(32 + 1));
165     uint32_t* addr6_next_long = reinterpret_cast<uint32_t*>(&addr6->sin6_addr);
166     for (size_t i = 0; i < 4; ++i) {
167       snprintf(*client_ip_string + 8 * i, 8 + 1, "%08x",
168                grpc_ntohl(*addr6_next_long++));
169     }
170     *size = 32;
171   } else {
172     GPR_UNREACHABLE_CODE();
173   }
174 }
175 
StoreClientIpAndLrToken(const char * lr_token,size_t lr_token_len)176 void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
177                                                           size_t lr_token_len) {
178   char* client_ip;
179   size_t client_ip_len;
180   GetCensusSafeClientIpString(&client_ip, &client_ip_len);
181   client_ip_and_lr_token_len_ =
182       kLengthPrefixSize + client_ip_len + lr_token_len;
183   client_ip_and_lr_token_ = static_cast<char*>(
184       gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
185   char* cur_pos = client_ip_and_lr_token_;
186   // Store the IP length prefix.
187   if (client_ip_len == 0) {
188     strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
189   } else if (client_ip_len == 8) {
190     strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
191   } else if (client_ip_len == 32) {
192     strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
193   } else {
194     GPR_UNREACHABLE_CODE();
195   }
196   cur_pos += kLengthPrefixSize;
197   // Store the IP.
198   if (client_ip_len != 0) {
199     strncpy(cur_pos, client_ip, client_ip_len);
200   }
201   gpr_free(client_ip);
202   cur_pos += client_ip_len;
203   // Store the LR token.
204   if (lr_token_len != 0) {
205     strncpy(cur_pos, lr_token, lr_token_len);
206   }
207   GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
208              client_ip_and_lr_token_len_);
209 }
210 
RecvInitialMetadataFilter(void * user_data,grpc_mdelem md)211 grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
212     void* user_data, grpc_mdelem md) {
213   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
214   ServerLoadReportingCallData* calld =
215       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
216   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
217     calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
218   } else if (calld->target_host_ == nullptr &&
219              grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
220     grpc_slice target_host_slice = GRPC_MDVALUE(md);
221     calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
222     calld->target_host_ =
223         reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
224     for (size_t i = 0; i < calld->target_host_len_; ++i) {
225       calld->target_host_[i] = static_cast<char>(
226           tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
227     }
228   } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_TOKEN)) {
229     if (calld->client_ip_and_lr_token_ == nullptr) {
230       calld->StoreClientIpAndLrToken(
231           reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
232           GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
233     }
234     return GRPC_FILTERED_REMOVE();
235   }
236   return GRPC_FILTERED_MDELEM(md);
237 }
238 
RecvInitialMetadataReady(void * arg,grpc_error * err)239 void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
240                                                            grpc_error* err) {
241   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
242   ServerLoadReportingCallData* calld =
243       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
244   ServerLoadReportingChannelData* chand =
245       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
246   if (err == GRPC_ERROR_NONE) {
247     GRPC_LOG_IF_ERROR(
248         "server_load_reporting_filter",
249         grpc_metadata_batch_filter(calld->recv_initial_metadata_,
250                                    RecvInitialMetadataFilter, elem,
251                                    "recv_initial_metadata filtering error"));
252     // If the LB token was not found in the recv_initial_metadata, only the
253     // client IP part will be recorded (with an empty LB token).
254     if (calld->client_ip_and_lr_token_ == nullptr) {
255       calld->StoreClientIpAndLrToken(nullptr, 0);
256     }
257     opencensus::stats::Record(
258         {{::grpc::load_reporter::MeasureStartCount(), 1}},
259         {{::grpc::load_reporter::TagKeyToken(),
260           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
261          {::grpc::load_reporter::TagKeyHost(),
262           {calld->target_host_, calld->target_host_len_}},
263          {::grpc::load_reporter::TagKeyUserId(),
264           {chand->peer_identity(), chand->peer_identity_len()}}});
265   }
266   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
267                    GRPC_ERROR_REF(err));
268 }
269 
Init(grpc_call_element * elem,const grpc_call_element_args * args)270 grpc_error* ServerLoadReportingCallData::Init(
271     grpc_call_element* elem, const grpc_call_element_args* args) {
272   service_method_ = grpc_empty_slice();
273   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
274                     elem, grpc_schedule_on_exec_ctx);
275   return GRPC_ERROR_NONE;
276 }
277 
SendTrailingMetadataFilter(void * user_data,grpc_mdelem md)278 grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
279     void* user_data, grpc_mdelem md) {
280   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
281   ServerLoadReportingCallData* calld =
282       reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
283   ServerLoadReportingChannelData* chand =
284       reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
285   if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
286     const grpc_slice value = GRPC_MDVALUE(md);
287     const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
288     if (cost_entry_size < sizeof(double)) {
289       gpr_log(GPR_ERROR,
290               "Cost metadata value too small (%zu bytes) to hold valid data. "
291               "Ignoring.",
292               cost_entry_size);
293       return GRPC_FILTERED_REMOVE();
294     }
295     const double* cost_entry_ptr =
296         reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
297     double cost_value = *cost_entry_ptr++;
298     const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
299     const size_t cost_name_len = cost_entry_size - sizeof(double);
300     opencensus::stats::Record(
301         {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
302         {{::grpc::load_reporter::TagKeyToken(),
303           {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
304          {::grpc::load_reporter::TagKeyHost(),
305           {calld->target_host_, calld->target_host_len_}},
306          {::grpc::load_reporter::TagKeyUserId(),
307           {chand->peer_identity(), chand->peer_identity_len()}},
308          {::grpc::load_reporter::TagKeyMetricName(),
309           {cost_name, cost_name_len}}});
310     return GRPC_FILTERED_REMOVE();
311   }
312   return GRPC_FILTERED_MDELEM(md);
313 }
314 
GetStatusTagForStatus(grpc_status_code status)315 const char* ServerLoadReportingCallData::GetStatusTagForStatus(
316     grpc_status_code status) {
317   switch (status) {
318     case GRPC_STATUS_OK:
319       return ::grpc::load_reporter::kCallStatusOk;
320     case GRPC_STATUS_UNKNOWN:
321     case GRPC_STATUS_DEADLINE_EXCEEDED:
322     case GRPC_STATUS_UNIMPLEMENTED:
323     case GRPC_STATUS_INTERNAL:
324     case GRPC_STATUS_UNAVAILABLE:
325     case GRPC_STATUS_DATA_LOSS:
326       return ::grpc::load_reporter::kCallStatusServerError;
327     default:
328       return ::grpc::load_reporter::kCallStatusClientError;
329   }
330 }
331 
332 namespace {
MaybeAddServerLoadReportingFilter(const grpc_channel_args & args)333 bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
334   return grpc_channel_arg_get_bool(
335       grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
336 }
337 }  // namespace
338 
339 // TODO(juanlishen): We should register the filter during grpc initialization
340 // time once OpenCensus is compatible with our build system. For now, we force
341 // registration of the server load reporting filter at static initialization
342 // time if we build with the filter target.
343 struct ServerLoadReportingFilterStaticRegistrar {
ServerLoadReportingFilterStaticRegistrargrpc::ServerLoadReportingFilterStaticRegistrar344   ServerLoadReportingFilterStaticRegistrar() {
345     static std::atomic_bool registered{false};
346     if (registered) return;
347     RegisterChannelFilter<ServerLoadReportingChannelData,
348                           ServerLoadReportingCallData>(
349         "server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
350         MaybeAddServerLoadReportingFilter);
351     // Access measures to ensure they are initialized. Otherwise, we can't
352     // create any valid view before the first RPC.
353     ::grpc::load_reporter::MeasureStartCount();
354     ::grpc::load_reporter::MeasureEndCount();
355     ::grpc::load_reporter::MeasureEndBytesSent();
356     ::grpc::load_reporter::MeasureEndBytesReceived();
357     ::grpc::load_reporter::MeasureEndLatencyMs();
358     ::grpc::load_reporter::MeasureOtherCallMetric();
359     registered = true;
360   }
361 } server_load_reporting_filter_static_registrar;
362 
363 }  // namespace grpc
364