1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <fstream>
20 #include <memory>
21 #include <sstream>
22 #include <thread>
23 
24 #include <gflags/gflags.h>
25 #include <grpc/grpc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/security/server_credentials.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/transport/byte_stream.h"
35 #include "src/proto/grpc/testing/empty.pb.h"
36 #include "src/proto/grpc/testing/messages.pb.h"
37 #include "src/proto/grpc/testing/test.grpc.pb.h"
38 #include "test/cpp/interop/server_helper.h"
39 #include "test/cpp/util/test_config.h"
40 
41 DEFINE_bool(use_alts, false,
42             "Whether to use alts. Enable alts will disable tls.");
43 DEFINE_bool(use_tls, false, "Whether to use tls.");
44 DEFINE_string(custom_credentials_type, "", "User provided credentials type.");
45 DEFINE_int32(port, 0, "Server port.");
46 DEFINE_int32(max_send_message_size, -1, "The maximum send message size.");
47 
48 using grpc::Server;
49 using grpc::ServerBuilder;
50 using grpc::ServerContext;
51 using grpc::ServerCredentials;
52 using grpc::ServerReader;
53 using grpc::ServerReaderWriter;
54 using grpc::ServerWriter;
55 using grpc::SslServerCredentialsOptions;
56 using grpc::Status;
57 using grpc::WriteOptions;
58 using grpc::testing::InteropServerContextInspector;
59 using grpc::testing::Payload;
60 using grpc::testing::SimpleRequest;
61 using grpc::testing::SimpleResponse;
62 using grpc::testing::StreamingInputCallRequest;
63 using grpc::testing::StreamingInputCallResponse;
64 using grpc::testing::StreamingOutputCallRequest;
65 using grpc::testing::StreamingOutputCallResponse;
66 using grpc::testing::TestService;
67 
68 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
69 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
70 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
71 
MaybeEchoMetadata(ServerContext * context)72 void MaybeEchoMetadata(ServerContext* context) {
73   const auto& client_metadata = context->client_metadata();
74   GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
75   GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
76 
77   auto iter = client_metadata.find(kEchoInitialMetadataKey);
78   if (iter != client_metadata.end()) {
79     context->AddInitialMetadata(
80         kEchoInitialMetadataKey,
81         grpc::string(iter->second.begin(), iter->second.end()));
82   }
83   iter = client_metadata.find(kEchoTrailingBinMetadataKey);
84   if (iter != client_metadata.end()) {
85     context->AddTrailingMetadata(
86         kEchoTrailingBinMetadataKey,
87         grpc::string(iter->second.begin(), iter->second.end()));
88   }
89   // Check if client sent a magic key in the header that makes us echo
90   // back the user-agent (for testing purpose)
91   iter = client_metadata.find(kEchoUserAgentKey);
92   if (iter != client_metadata.end()) {
93     iter = client_metadata.find("user-agent");
94     if (iter != client_metadata.end()) {
95       context->AddInitialMetadata(
96           kEchoUserAgentKey,
97           grpc::string(iter->second.begin(), iter->second.end()));
98     }
99   }
100 }
101 
SetPayload(int size,Payload * payload)102 bool SetPayload(int size, Payload* payload) {
103   std::unique_ptr<char[]> body(new char[size]());
104   payload->set_body(body.get(), size);
105   return true;
106 }
107 
CheckExpectedCompression(const ServerContext & context,const bool compression_expected)108 bool CheckExpectedCompression(const ServerContext& context,
109                               const bool compression_expected) {
110   const InteropServerContextInspector inspector(context);
111   const grpc_compression_algorithm received_compression =
112       inspector.GetCallCompressionAlgorithm();
113 
114   if (compression_expected) {
115     if (received_compression == GRPC_COMPRESS_NONE) {
116       // Expected some compression, got NONE. This is an error.
117       gpr_log(GPR_ERROR,
118               "Expected compression but got uncompressed request from client.");
119       return false;
120     }
121     if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) {
122       gpr_log(GPR_ERROR,
123               "Failure: Requested compression in a compressable request, but "
124               "compression bit in message flags not set.");
125       return false;
126     }
127   } else {
128     // Didn't expect compression -> make sure the request is uncompressed
129     if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) {
130       gpr_log(GPR_ERROR,
131               "Failure: Didn't requested compression, but compression bit in "
132               "message flags set.");
133       return false;
134     }
135   }
136   return true;
137 }
138 
139 class TestServiceImpl : public TestService::Service {
140  public:
EmptyCall(ServerContext * context,const grpc::testing::Empty * request,grpc::testing::Empty * response)141   Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
142                    grpc::testing::Empty* response) {
143     MaybeEchoMetadata(context);
144     return Status::OK;
145   }
146 
147   // Response contains current timestamp. We ignore everything in the request.
CacheableUnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)148   Status CacheableUnaryCall(ServerContext* context,
149                             const SimpleRequest* request,
150                             SimpleResponse* response) {
151     gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
152     std::string timestamp = std::to_string((long long unsigned)ts.tv_nsec);
153     response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
154     context->AddInitialMetadata("cache-control", "max-age=60, public");
155     return Status::OK;
156   }
157 
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)158   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
159                    SimpleResponse* response) {
160     MaybeEchoMetadata(context);
161     if (request->has_response_compressed()) {
162       const bool compression_requested = request->response_compressed().value();
163       gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
164               compression_requested ? "enabled" : "disabled", __func__);
165       if (compression_requested) {
166         // Any level would do, let's go for HIGH because we are overachievers.
167         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
168       } else {
169         context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
170       }
171     }
172     if (!CheckExpectedCompression(*context,
173                                   request->expect_compressed().value())) {
174       return Status(grpc::StatusCode::INVALID_ARGUMENT,
175                     "Compressed request expectation not met.");
176     }
177     if (request->response_size() > 0) {
178       if (!SetPayload(request->response_size(), response->mutable_payload())) {
179         return Status(grpc::StatusCode::INVALID_ARGUMENT,
180                       "Error creating payload.");
181       }
182     }
183 
184     if (request->has_response_status()) {
185       return Status(
186           static_cast<grpc::StatusCode>(request->response_status().code()),
187           request->response_status().message());
188     }
189 
190     return Status::OK;
191   }
192 
StreamingOutputCall(ServerContext * context,const StreamingOutputCallRequest * request,ServerWriter<StreamingOutputCallResponse> * writer)193   Status StreamingOutputCall(
194       ServerContext* context, const StreamingOutputCallRequest* request,
195       ServerWriter<StreamingOutputCallResponse>* writer) {
196     StreamingOutputCallResponse response;
197     bool write_success = true;
198     for (int i = 0; write_success && i < request->response_parameters_size();
199          i++) {
200       if (!SetPayload(request->response_parameters(i).size(),
201                       response.mutable_payload())) {
202         return Status(grpc::StatusCode::INVALID_ARGUMENT,
203                       "Error creating payload.");
204       }
205       WriteOptions wopts;
206       if (request->response_parameters(i).has_compressed()) {
207         // Compress by default. Disabled on a per-message basis.
208         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
209         const bool compression_requested =
210             request->response_parameters(i).compressed().value();
211         gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
212                 compression_requested ? "enabled" : "disabled", __func__);
213         if (!compression_requested) {
214           wopts.set_no_compression();
215         }  // else, compression is already enabled via the context.
216       }
217       int time_us;
218       if ((time_us = request->response_parameters(i).interval_us()) > 0) {
219         // Sleep before response if needed
220         gpr_timespec sleep_time =
221             gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
222                          gpr_time_from_micros(time_us, GPR_TIMESPAN));
223         gpr_sleep_until(sleep_time);
224       }
225       write_success = writer->Write(response, wopts);
226     }
227     if (write_success) {
228       return Status::OK;
229     } else {
230       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
231     }
232   }
233 
StreamingInputCall(ServerContext * context,ServerReader<StreamingInputCallRequest> * reader,StreamingInputCallResponse * response)234   Status StreamingInputCall(ServerContext* context,
235                             ServerReader<StreamingInputCallRequest>* reader,
236                             StreamingInputCallResponse* response) {
237     StreamingInputCallRequest request;
238     int aggregated_payload_size = 0;
239     while (reader->Read(&request)) {
240       if (!CheckExpectedCompression(*context,
241                                     request.expect_compressed().value())) {
242         return Status(grpc::StatusCode::INVALID_ARGUMENT,
243                       "Compressed request expectation not met.");
244       }
245       if (request.has_payload()) {
246         aggregated_payload_size += request.payload().body().size();
247       }
248     }
249     response->set_aggregated_payload_size(aggregated_payload_size);
250     return Status::OK;
251   }
252 
FullDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)253   Status FullDuplexCall(
254       ServerContext* context,
255       ServerReaderWriter<StreamingOutputCallResponse,
256                          StreamingOutputCallRequest>* stream) {
257     MaybeEchoMetadata(context);
258     StreamingOutputCallRequest request;
259     StreamingOutputCallResponse response;
260     bool write_success = true;
261     while (write_success && stream->Read(&request)) {
262       if (request.has_response_status()) {
263         return Status(
264             static_cast<grpc::StatusCode>(request.response_status().code()),
265             request.response_status().message());
266       }
267       if (request.response_parameters_size() != 0) {
268         response.mutable_payload()->set_type(request.payload().type());
269         response.mutable_payload()->set_body(
270             grpc::string(request.response_parameters(0).size(), '\0'));
271         int time_us;
272         if ((time_us = request.response_parameters(0).interval_us()) > 0) {
273           // Sleep before response if needed
274           gpr_timespec sleep_time =
275               gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
276                            gpr_time_from_micros(time_us, GPR_TIMESPAN));
277           gpr_sleep_until(sleep_time);
278         }
279         write_success = stream->Write(response);
280       }
281     }
282     if (write_success) {
283       return Status::OK;
284     } else {
285       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
286     }
287   }
288 
HalfDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)289   Status HalfDuplexCall(
290       ServerContext* context,
291       ServerReaderWriter<StreamingOutputCallResponse,
292                          StreamingOutputCallRequest>* stream) {
293     std::vector<StreamingOutputCallRequest> requests;
294     StreamingOutputCallRequest request;
295     while (stream->Read(&request)) {
296       requests.push_back(request);
297     }
298 
299     StreamingOutputCallResponse response;
300     bool write_success = true;
301     for (unsigned int i = 0; write_success && i < requests.size(); i++) {
302       response.mutable_payload()->set_type(requests[i].payload().type());
303       if (requests[i].response_parameters_size() == 0) {
304         return Status(grpc::StatusCode::INTERNAL,
305                       "Request does not have response parameters.");
306       }
307       response.mutable_payload()->set_body(
308           grpc::string(requests[i].response_parameters(0).size(), '\0'));
309       write_success = stream->Write(response);
310     }
311     if (write_success) {
312       return Status::OK;
313     } else {
314       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
315     }
316   }
317 };
318 
RunServer(const std::shared_ptr<ServerCredentials> & creds)319 void grpc::testing::interop::RunServer(
320     const std::shared_ptr<ServerCredentials>& creds) {
321   RunServer(creds, FLAGS_port, nullptr, nullptr);
322 }
323 
RunServer(const std::shared_ptr<ServerCredentials> & creds,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)324 void grpc::testing::interop::RunServer(
325     const std::shared_ptr<ServerCredentials>& creds,
326     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
327         server_options) {
328   RunServer(creds, FLAGS_port, nullptr, std::move(server_options));
329 }
330 
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition)331 void grpc::testing::interop::RunServer(
332     const std::shared_ptr<ServerCredentials>& creds, const int port,
333     ServerStartedCondition* server_started_condition) {
334   RunServer(creds, port, server_started_condition, nullptr);
335 }
336 
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)337 void grpc::testing::interop::RunServer(
338     const std::shared_ptr<ServerCredentials>& creds, const int port,
339     ServerStartedCondition* server_started_condition,
340     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
341         server_options) {
342   GPR_ASSERT(port != 0);
343   std::ostringstream server_address;
344   server_address << "0.0.0.0:" << port;
345   TestServiceImpl service;
346 
347   SimpleRequest request;
348   SimpleResponse response;
349 
350   ServerBuilder builder;
351   builder.RegisterService(&service);
352   builder.AddListeningPort(server_address.str(), creds);
353   if (server_options != nullptr) {
354     for (size_t i = 0; i < server_options->size(); i++) {
355       builder.SetOption(std::move((*server_options)[i]));
356     }
357   }
358   if (FLAGS_max_send_message_size >= 0) {
359     builder.SetMaxSendMessageSize(FLAGS_max_send_message_size);
360   }
361   std::unique_ptr<Server> server(builder.BuildAndStart());
362   gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
363 
364   // Signal that the server has started.
365   if (server_started_condition) {
366     std::unique_lock<std::mutex> lock(server_started_condition->mutex);
367     server_started_condition->server_started = true;
368     server_started_condition->condition.notify_all();
369   }
370 
371   while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
372     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
373                                  gpr_time_from_seconds(5, GPR_TIMESPAN)));
374   }
375 }
376