1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/cpp/qps/qps_worker.h"
20 
21 #include <memory>
22 #include <mutex>
23 #include <sstream>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/cpu.h>
31 #include <grpc/support/log.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/security/server_credentials.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
36 
37 #include "src/core/lib/gpr/host_port.h"
38 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
39 #include "test/core/util/grpc_profiler.h"
40 #include "test/core/util/histogram.h"
41 #include "test/cpp/qps/client.h"
42 #include "test/cpp/qps/qps_server_builder.h"
43 #include "test/cpp/qps/server.h"
44 #include "test/cpp/util/create_test_channel.h"
45 #include "test/cpp/util/test_credentials_provider.h"
46 
47 namespace grpc {
48 namespace testing {
49 
CreateClient(const ClientConfig & config)50 static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
51   gpr_log(GPR_INFO, "Starting client of type %s %s %d",
52           ClientType_Name(config.client_type()).c_str(),
53           RpcType_Name(config.rpc_type()).c_str(),
54           config.payload_config().has_bytebuf_params());
55 
56   switch (config.client_type()) {
57     case ClientType::SYNC_CLIENT:
58       return CreateSynchronousClient(config);
59     case ClientType::ASYNC_CLIENT:
60       return config.payload_config().has_bytebuf_params()
61                  ? CreateGenericAsyncStreamingClient(config)
62                  : CreateAsyncClient(config);
63     default:
64       abort();
65   }
66   abort();
67 }
68 
CreateServer(const ServerConfig & config)69 static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
70   gpr_log(GPR_INFO, "Starting server of type %s",
71           ServerType_Name(config.server_type()).c_str());
72 
73   switch (config.server_type()) {
74     case ServerType::SYNC_SERVER:
75       return CreateSynchronousServer(config);
76     case ServerType::ASYNC_SERVER:
77       return CreateAsyncServer(config);
78     case ServerType::ASYNC_GENERIC_SERVER:
79       return CreateAsyncGenericServer(config);
80     default:
81       abort();
82   }
83   abort();
84 }
85 
86 class ScopedProfile final {
87  public:
ScopedProfile(const char * filename,bool enable)88   ScopedProfile(const char* filename, bool enable) : enable_(enable) {
89     if (enable_) grpc_profiler_start(filename);
90   }
~ScopedProfile()91   ~ScopedProfile() {
92     if (enable_) grpc_profiler_stop();
93   }
94 
95  private:
96   const bool enable_;
97 };
98 
99 class WorkerServiceImpl final : public WorkerService::Service {
100  public:
WorkerServiceImpl(int server_port,QpsWorker * worker)101   WorkerServiceImpl(int server_port, QpsWorker* worker)
102       : acquired_(false), server_port_(server_port), worker_(worker) {}
103 
RunClient(ServerContext * ctx,ServerReaderWriter<ClientStatus,ClientArgs> * stream)104   Status RunClient(
105       ServerContext* ctx,
106       ServerReaderWriter<ClientStatus, ClientArgs>* stream) override {
107     InstanceGuard g(this);
108     if (!g.Acquired()) {
109       return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
110     }
111 
112     ScopedProfile profile("qps_client.prof", false);
113     Status ret = RunClientBody(ctx, stream);
114     gpr_log(GPR_INFO, "RunClient: Returning");
115     return ret;
116   }
117 
RunServer(ServerContext * ctx,ServerReaderWriter<ServerStatus,ServerArgs> * stream)118   Status RunServer(
119       ServerContext* ctx,
120       ServerReaderWriter<ServerStatus, ServerArgs>* stream) override {
121     InstanceGuard g(this);
122     if (!g.Acquired()) {
123       return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
124     }
125 
126     ScopedProfile profile("qps_server.prof", false);
127     Status ret = RunServerBody(ctx, stream);
128     gpr_log(GPR_INFO, "RunServer: Returning");
129     return ret;
130   }
131 
CoreCount(ServerContext * ctx,const CoreRequest *,CoreResponse * resp)132   Status CoreCount(ServerContext* ctx, const CoreRequest*,
133                    CoreResponse* resp) override {
134     resp->set_cores(gpr_cpu_num_cores());
135     return Status::OK;
136   }
137 
QuitWorker(ServerContext * ctx,const Void *,Void *)138   Status QuitWorker(ServerContext* ctx, const Void*, Void*) override {
139     InstanceGuard g(this);
140     if (!g.Acquired()) {
141       return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
142     }
143 
144     worker_->MarkDone();
145     return Status::OK;
146   }
147 
148  private:
149   // Protect against multiple clients using this worker at once.
150   class InstanceGuard {
151    public:
InstanceGuard(WorkerServiceImpl * impl)152     InstanceGuard(WorkerServiceImpl* impl)
153         : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard()154     ~InstanceGuard() {
155       if (acquired_) {
156         impl_->ReleaseInstance();
157       }
158     }
159 
Acquired() const160     bool Acquired() const { return acquired_; }
161 
162    private:
163     WorkerServiceImpl* const impl_;
164     const bool acquired_;
165   };
166 
TryAcquireInstance()167   bool TryAcquireInstance() {
168     std::lock_guard<std::mutex> g(mu_);
169     if (acquired_) return false;
170     acquired_ = true;
171     return true;
172   }
173 
ReleaseInstance()174   void ReleaseInstance() {
175     std::lock_guard<std::mutex> g(mu_);
176     GPR_ASSERT(acquired_);
177     acquired_ = false;
178   }
179 
RunClientBody(ServerContext * ctx,ServerReaderWriter<ClientStatus,ClientArgs> * stream)180   Status RunClientBody(ServerContext* ctx,
181                        ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
182     ClientArgs args;
183     if (!stream->Read(&args)) {
184       return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
185     }
186     if (!args.has_setup()) {
187       return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
188     }
189     gpr_log(GPR_INFO, "RunClientBody: about to create client");
190     auto client = CreateClient(args.setup());
191     if (!client) {
192       return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
193     }
194     gpr_log(GPR_INFO, "RunClientBody: client created");
195     ClientStatus status;
196     if (!stream->Write(status)) {
197       return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
198     }
199     gpr_log(GPR_INFO, "RunClientBody: creation status reported");
200     while (stream->Read(&args)) {
201       gpr_log(GPR_INFO, "RunClientBody: Message read");
202       if (!args.has_mark()) {
203         gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
204         return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
205       }
206       *status.mutable_stats() = client->Mark(args.mark().reset());
207       if (!stream->Write(status)) {
208         return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
209       }
210       gpr_log(GPR_INFO, "RunClientBody: Mark response given");
211     }
212 
213     gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
214     client->AwaitThreadsCompletion();
215 
216     gpr_log(GPR_INFO, "RunClientBody: Returning");
217     return Status::OK;
218   }
219 
RunServerBody(ServerContext * ctx,ServerReaderWriter<ServerStatus,ServerArgs> * stream)220   Status RunServerBody(ServerContext* ctx,
221                        ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
222     ServerArgs args;
223     if (!stream->Read(&args)) {
224       return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
225     }
226     if (!args.has_setup()) {
227       return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
228     }
229     if (server_port_ > 0) {
230       args.mutable_setup()->set_port(server_port_);
231     }
232     gpr_log(GPR_INFO, "RunServerBody: about to create server");
233     auto server = CreateServer(args.setup());
234     if (g_inproc_servers != nullptr) {
235       g_inproc_servers->push_back(server.get());
236     }
237     if (!server) {
238       return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
239     }
240     gpr_log(GPR_INFO, "RunServerBody: server created");
241     ServerStatus status;
242     status.set_port(server->port());
243     status.set_cores(server->cores());
244     if (!stream->Write(status)) {
245       return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
246     }
247     gpr_log(GPR_INFO, "RunServerBody: creation status reported");
248     while (stream->Read(&args)) {
249       gpr_log(GPR_INFO, "RunServerBody: Message read");
250       if (!args.has_mark()) {
251         gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
252         return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
253       }
254       *status.mutable_stats() = server->Mark(args.mark().reset());
255       if (!stream->Write(status)) {
256         return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
257       }
258       gpr_log(GPR_INFO, "RunServerBody: Mark response given");
259     }
260 
261     gpr_log(GPR_INFO, "RunServerBody: Returning");
262     return Status::OK;
263   }
264 
265   std::mutex mu_;
266   bool acquired_;
267   int server_port_;
268   QpsWorker* worker_;
269 };
270 
QpsWorker(int driver_port,int server_port,const grpc::string & credential_type)271 QpsWorker::QpsWorker(int driver_port, int server_port,
272                      const grpc::string& credential_type) {
273   impl_.reset(new WorkerServiceImpl(server_port, this));
274   gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
275 
276   std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
277   if (driver_port >= 0) {
278     char* server_address = nullptr;
279     gpr_join_host_port(&server_address, "::", driver_port);
280     builder->AddListeningPort(
281         server_address,
282         GetCredentialsProvider()->GetServerCredentials(credential_type));
283     gpr_free(server_address);
284   }
285   builder->RegisterService(impl_.get());
286 
287   server_ = builder->BuildAndStart();
288 }
289 
~QpsWorker()290 QpsWorker::~QpsWorker() {}
291 
Done() const292 bool QpsWorker::Done() const {
293   return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0));
294 }
MarkDone()295 void QpsWorker::MarkDone() {
296   gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1));
297 }
298 }  // namespace testing
299 }  // namespace grpc
300