// Use ALWAYS at the tag level. Control is performed manually during command
// line processing.
#define ATRACE_TAG ATRACE_TAG_ALWAYS
#include <utils/Trace.h>

#include <base/files/file_util.h>
#include <base/logging.h>
#include <base/strings/string_split.h>
#include <errno.h>
#include <getopt.h>
#include <pdx/client.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/default_transport/service_endpoint.h>
#include <pdx/rpc/buffer_wrapper.h>
#include <pdx/rpc/default_initialization_allocator.h>
#include <pdx/rpc/message_buffer.h>
#include <pdx/rpc/remote_method.h>
#include <pdx/rpc/serializable.h>
#include <pdx/service.h>
#include <sys/prctl.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <future>
#include <iomanip>
#include <ios>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

using android::pdx::Channel;
using android::pdx::ClientBase;
using android::pdx::Endpoint;
using android::pdx::ErrorStatus;
using android::pdx::Message;
using android::pdx::Service;
using android::pdx::ServiceBase;
using android::pdx::default_transport::ClientChannelFactory;
using android::pdx::Status;
using android::pdx::Transaction;
using android::pdx::rpc::BufferWrapper;
using android::pdx::rpc::DefaultInitializationAllocator;
using android::pdx::rpc::MessageBuffer;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodReturn;
using android::pdx::rpc::ReplyBuffer;
using android::pdx::rpc::Void;
using android::pdx::rpc::WrapBuffer;

namespace {

constexpr size_t kMaxMessageSize = 4096 * 1024;

std::string GetServicePath(const std::string& path, int instance_id) {
  return path + std::to_string(instance_id);
}

void SetThreadName(const std::string& name) {
  prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(name.c_str()), 0, 0, 0);
}

constexpr uint64_t kNanosPerSecond = 1000000000LLU;

uint64_t GetClockNs() {
  timespec t;
  clock_gettime(CLOCK_MONOTONIC, &t);
  return kNanosPerSecond * t.tv_sec + t.tv_nsec;
}

template <typename T>
ssize_t ssizeof(const T&) {
  return static_cast<ssize_t>(sizeof(T));
}

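// Tracks per-task scheduler statistics read from /proc/<task_id>/schedstat,
// which reports three values: time spent running on a CPU (ns), time spent
// waiting on a runqueue (ns), and the number of timeslices run. Update()
// records the deltas since the previous call; only the task id and the deltas
// are serialized, so the service can return them to the client via the Stats
// RPC below.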
class SchedStats {
 public:
  SchedStats() : SchedStats(gettid()) {}
  explicit SchedStats(pid_t task_id) : task_id_(task_id) {}
  SchedStats(const SchedStats&) = default;
  SchedStats& operator=(const SchedStats&) = default;

  void Update() {
    const std::string stats_path =
        "/proc/" + std::to_string(task_id_) + "/schedstat";

    std::string line;
    base::ReadFileToString(base::FilePath{stats_path}, &line);
    std::vector<std::string> stats = base::SplitString(
        line, " ", base::TRIM_WHITESPACE, base::SPLIT_WANT_ALL);

    CHECK_EQ(3u, stats.size());

    // Calculate the deltas since the last update. Each value is absolute since
    // the task started.
    uint64_t current_cpu_time_ns = std::stoull(stats[0]);
    uint64_t current_wait_ns = std::stoull(stats[1]);
    uint64_t current_timeslices = std::stoull(stats[2]);
    cpu_time_ns_ = current_cpu_time_ns - last_cpu_time_ns_;
    wait_ns_ = current_wait_ns - last_wait_ns_;
    timeslices_ = current_timeslices - last_timeslices_;
    last_cpu_time_ns_ = current_cpu_time_ns;
    last_wait_ns_ = current_wait_ns;
    last_timeslices_ = current_timeslices;
  }

  pid_t task_id() const { return task_id_; }
  uint64_t cpu_time_ns() const { return cpu_time_ns_; }
  uint64_t wait_ns() const { return wait_ns_; }
  uint64_t timeslices() const { return timeslices_; }

  double cpu_time_s() const {
    return static_cast<double>(cpu_time_ns_) / kNanosPerSecond;
  }
  double wait_s() const {
    return static_cast<double>(wait_ns_) / kNanosPerSecond;
  }

 private:
  int32_t task_id_;
  uint64_t cpu_time_ns_ = 0;
  uint64_t last_cpu_time_ns_ = 0;
  uint64_t wait_ns_ = 0;
  uint64_t last_wait_ns_ = 0;
  uint64_t timeslices_ = 0;
  uint64_t last_timeslices_ = 0;

  PDX_SERIALIZABLE_MEMBERS(SchedStats, task_id_, cpu_time_ns_, wait_ns_,
                           timeslices_);
};

// Opcodes for client/service protocol.
struct BenchmarkOps {
  enum : int {
    Nop,
    Read,
    Write,
    Echo,
    Stats,
    WriteVector,
    EchoVector,
    Quit,
  };
};

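// RPC method descriptors for the ops that go through the pdx serialization
// layer. Nop/Read/Write/Echo are sent as raw transactions with explicit
// send/receive buffers; Stats, WriteVector, and EchoVector use
// InvokeRemoteMethod on the client and DispatchRemoteMethod/RemoteMethodReturn
// on the service.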
struct BenchmarkRPC {
  PDX_REMOTE_METHOD(Stats, BenchmarkOps::Stats,
                    std::tuple<uint64_t, uint64_t, SchedStats>(Void));
  PDX_REMOTE_METHOD(WriteVector, BenchmarkOps::WriteVector,
                    int(const BufferWrapper<std::vector<uint8_t>> data));
  PDX_REMOTE_METHOD(EchoVector, BenchmarkOps::EchoVector,
                    BufferWrapper<std::vector<uint8_t>>(
                        const BufferWrapper<std::vector<uint8_t>> data));
};

struct BenchmarkResult {
  int thread_id = 0;
  int service_id = 0;
  double time_delta_s = 0.0;
  uint64_t bytes_sent = 0;
  SchedStats sched_stats = {};
};

// Global command line option values.
struct Options {
  bool verbose = false;
  int threads = 1;
  int opcode = BenchmarkOps::Read;
  int blocksize = 1;
  int count = 1;
  int instances = 1;
  int timeout = 1;
  int warmup = 0;
} ProgramOptions;

// Command line option names.
const char kOptionService[] = "service";
const char kOptionClient[] = "client";
const char kOptionVerbose[] = "verbose";
const char kOptionOpcode[] = "op";
const char kOptionBlocksize[] = "bs";
const char kOptionCount[] = "count";
const char kOptionThreads[] = "threads";
const char kOptionInstances[] = "instances";
const char kOptionTimeout[] = "timeout";
const char kOptionTrace[] = "trace";
const char kOptionWarmup[] = "warmup";

// getopt() long options.
static option long_options[] = {
    {kOptionService, required_argument, 0, 0},
    {kOptionClient, required_argument, 0, 0},
    {kOptionVerbose, no_argument, 0, 0},
    {kOptionOpcode, required_argument, 0, 0},
    {kOptionBlocksize, required_argument, 0, 0},
    {kOptionCount, required_argument, 0, 0},
    {kOptionThreads, required_argument, 0, 0},
    {kOptionInstances, required_argument, 0, 0},
    {kOptionTimeout, required_argument, 0, 0},
    {kOptionTrace, no_argument, 0, 0},
    {kOptionWarmup, required_argument, 0, 0},
    {0, 0, 0, 0},
};

// Parses the argument for kOptionOpcode and sets the value of
// ProgramOptions.opcode.
void ParseOpcodeOption(const std::string& argument) {
  if (argument == "read") {
    ProgramOptions.opcode = BenchmarkOps::Read;
  } else if (argument == "write") {
    ProgramOptions.opcode = BenchmarkOps::Write;
  } else if (argument == "echo") {
    ProgramOptions.opcode = BenchmarkOps::Echo;
  } else if (argument == "writevec") {
    ProgramOptions.opcode = BenchmarkOps::WriteVector;
  } else if (argument == "echovec") {
    ProgramOptions.opcode = BenchmarkOps::EchoVector;
  } else if (argument == "quit") {
    ProgramOptions.opcode = BenchmarkOps::Quit;
  } else if (argument == "nop") {
    ProgramOptions.opcode = BenchmarkOps::Nop;
  } else if (argument == "stats") {
    ProgramOptions.opcode = BenchmarkOps::Stats;
  } else {
    ProgramOptions.opcode = std::stoi(argument);
  }
}

// Implements the service side of the benchmark.
class BenchmarkService : public ServiceBase<BenchmarkService> {
 public:
  std::shared_ptr<Channel> OnChannelOpen(Message& message) override {
    VLOG(1) << "BenchmarkService::OnChannelOpen: cid="
            << message.GetChannelId();
    return nullptr;
  }

  void OnChannelClose(Message& message,
                      const std::shared_ptr<Channel>& /*channel*/) override {
    VLOG(1) << "BenchmarkService::OnChannelClose: cid="
            << message.GetChannelId();
  }

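  // Dispatches one benchmark message. Raw ops (Nop/Read/Write/Echo) reply
  // directly through Message::Reply after copying payload bytes with
  // ReadAll/WriteAll; Stats, WriteVector, and EchoVector are handed to the
  // RPC layer, and Quit cancels the endpoint so dispatch threads exit.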
  Status<void> HandleMessage(Message& message) override {
    ATRACE_NAME("BenchmarkService::HandleMessage");

    switch (message.GetOp()) {
      case BenchmarkOps::Nop:
        VLOG(1) << "BenchmarkService::HandleMessage: op=nop";
        {
          ATRACE_NAME("Reply");
          CHECK(message.Reply(0));
        }
        return {};

      case BenchmarkOps::Write: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=write send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetSendLength())
          status = message.ReadAll(send_buffer.data(), message.GetSendLength());

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetSendLength()));
        }
        return {};
      }

      case BenchmarkOps::Read: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=read send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetReceiveLength()) {
          status = message.WriteAll(receive_buffer.data(),
                                    message.GetReceiveLength());
        }

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetReceiveLength()));
        }
        return {};
      }

      case BenchmarkOps::Echo: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        Status<void> status;
        if (message.GetSendLength())
          status = message.ReadAll(send_buffer.data(), message.GetSendLength());

        if (!status) {
          CHECK(message.ReplyError(status.error()));
          return {};
        }

        if (message.GetSendLength()) {
          status =
              message.WriteAll(send_buffer.data(), message.GetSendLength());
        }

        {
          ATRACE_NAME("Reply");
          if (!status)
            CHECK(message.ReplyError(status.error()));
          else
            CHECK(message.Reply(message.GetSendLength()));
        }
        return {};
      }

      case BenchmarkOps::Stats: {
        VLOG(1) << "BenchmarkService::HandleMessage: op=stats send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        // Snapshot the stats when the message is received.
        const uint64_t receive_time_ns = GetClockNs();
        sched_stats_.Update();

        // Use the RPC system to return the results.
        RemoteMethodReturn<BenchmarkRPC::Stats>(
            message, BenchmarkRPC::Stats::Return{receive_time_ns, GetClockNs(),
                                                 sched_stats_});
        return {};
      }

      case BenchmarkOps::WriteVector:
        VLOG(1) << "BenchmarkService::HandleMessage: op=writevec send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        DispatchRemoteMethod<BenchmarkRPC::WriteVector>(
            *this, &BenchmarkService::OnWriteVector, message, kMaxMessageSize);
        return {};

      case BenchmarkOps::EchoVector:
        VLOG(1) << "BenchmarkService::HandleMessage: op=echovec send_length="
                << message.GetSendLength()
                << " receive_length=" << message.GetReceiveLength();

        DispatchRemoteMethod<BenchmarkRPC::EchoVector>(
            *this, &BenchmarkService::OnEchoVector, message, kMaxMessageSize);
        return {};

      case BenchmarkOps::Quit:
        Cancel();
        return ErrorStatus{ESHUTDOWN};

      default:
        VLOG(1) << "BenchmarkService::HandleMessage: default case; op="
                << message.GetOp();
        return Service::DefaultHandleMessage(message);
    }
  }

  // Updates the scheduler stats from procfs for this thread.
  void UpdateSchedStats() { sched_stats_.Update(); }

 private:
  friend BASE;

  explicit BenchmarkService(std::unique_ptr<Endpoint> endpoint)
      : BASE("BenchmarkService", std::move(endpoint)),
        send_buffer(kMaxMessageSize),
        receive_buffer(kMaxMessageSize) {}

  std::vector<uint8_t> send_buffer;
  std::vector<uint8_t> receive_buffer;

  // Each service thread has its own scheduler stats object.
  static thread_local SchedStats sched_stats_;

  using BufferType = BufferWrapper<
      std::vector<uint8_t, DefaultInitializationAllocator<uint8_t>>>;

  int OnWriteVector(Message&, const BufferType& data) { return data.size(); }
  BufferType OnEchoVector(Message&, BufferType&& data) {
    return std::move(data);
  }

  BenchmarkService(const BenchmarkService&) = delete;
  void operator=(const BenchmarkService&) = delete;
};

thread_local SchedStats BenchmarkService::sched_stats_;

// Implements the client side of the benchmark.
class BenchmarkClient : public ClientBase<BenchmarkClient> {
 public:
  int Nop() {
    ATRACE_NAME("BenchmarkClient::Nop");
    VLOG(1) << "BenchmarkClient::Nop";
    Transaction transaction{*this};
    return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Nop));
  }

  int Write(const void* buffer, size_t length) {
    ATRACE_NAME("BenchmarkClient::Write");
    VLOG(1) << "BenchmarkClient::Write: buffer=" << buffer
            << " length=" << length;
    Transaction transaction{*this};
    return ReturnStatusOrError(
        transaction.Send<int>(BenchmarkOps::Write, buffer, length, nullptr, 0));
    // return write(endpoint_fd(), buffer, length);
  }

  int Read(void* buffer, size_t length) {
    ATRACE_NAME("BenchmarkClient::Read");
    VLOG(1) << "BenchmarkClient::Read: buffer=" << buffer
            << " length=" << length;
    Transaction transaction{*this};
    return ReturnStatusOrError(
        transaction.Send<int>(BenchmarkOps::Read, nullptr, 0, buffer, length));
    // return read(endpoint_fd(), buffer, length);
  }

  int Echo(const void* send_buffer, size_t send_length, void* receive_buffer,
           size_t receive_length) {
    ATRACE_NAME("BenchmarkClient::Echo");
    VLOG(1) << "BenchmarkClient::Echo: send_buffer=" << send_buffer
            << " send_length=" << send_length
            << " receive_buffer=" << receive_buffer
            << " receive_length=" << receive_length;
    Transaction transaction{*this};
    return ReturnStatusOrError(
        transaction.Send<int>(BenchmarkOps::Echo, send_buffer, send_length,
                              receive_buffer, receive_length));
  }

  int Stats(std::tuple<uint64_t, uint64_t, SchedStats>* stats_out) {
    ATRACE_NAME("BenchmarkClient::Stats");
    VLOG(1) << "BenchmarkClient::Stats";

    auto status = InvokeRemoteMethodInPlace<BenchmarkRPC::Stats>(stats_out);
    return status ? 0 : -status.error();
  }

  int WriteVector(const BufferWrapper<std::vector<uint8_t>>& data) {
    ATRACE_NAME("BenchmarkClient::WriteVector");
    VLOG(1) << "BenchmarkClient::WriteVector";

    auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
    return ReturnStatusOrError(status);
  }

  template <typename T>
  int WriteVector(const BufferWrapper<T>& data) {
    ATRACE_NAME("BenchmarkClient::WriteVector");
    VLOG(1) << "BenchmarkClient::WriteVector";

    auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
    return ReturnStatusOrError(status);
  }

  template <typename T, typename U>
  int EchoVector(const BufferWrapper<T>& data, BufferWrapper<U>* data_out) {
    ATRACE_NAME("BenchmarkClient::EchoVector");
    VLOG(1) << "BenchmarkClient::EchoVector";

    MessageBuffer<ReplyBuffer>::Reserve(kMaxMessageSize - 1);
    auto status =
        InvokeRemoteMethodInPlace<BenchmarkRPC::EchoVector>(data_out, data);
    return status ? 0 : -status.error();
  }

  int Quit() {
    VLOG(1) << "BenchmarkClient::Quit";
    Transaction transaction{*this};
    return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Quit));
  }

 private:
  friend BASE;

  explicit BenchmarkClient(const std::string& service_path)
      : BASE(ClientChannelFactory::Create(service_path),
             ProgramOptions.timeout) {}

  BenchmarkClient(const BenchmarkClient&) = delete;
  void operator=(const BenchmarkClient&) = delete;
};

// Creates a benchmark service at |path| and dispatches messages.
int ServiceCommand(const std::string& path) {
  if (path.empty())
    return -EINVAL;

  // Start the requested number of dispatch threads.
  std::vector<std::thread> dispatch_threads;
  int service_count = ProgramOptions.instances;
  int service_id_counter = 0;
  int thread_id_counter = 0;
  std::atomic<bool> done(false);

  while (service_count--) {
    std::cerr << "Starting service instance " << service_id_counter
              << std::endl;
    auto service = BenchmarkService::Create(
        android::pdx::default_transport::Endpoint::CreateAndBindSocket(
            GetServicePath(path, service_id_counter),
            android::pdx::default_transport::Endpoint::kBlocking));
    if (!service) {
      std::cerr << "Failed to create service instance!!" << std::endl;
      done = true;
      break;
    }

    int thread_count = ProgramOptions.threads;
    while (thread_count--) {
      std::cerr << "Starting dispatch thread " << thread_id_counter
                << " service " << service_id_counter << std::endl;

      dispatch_threads.emplace_back(
          [&](const int thread_id, const int service_id,
              const std::shared_ptr<BenchmarkService>& local_service) {
            SetThreadName("service" + std::to_string(service_id));

            // Read the initial schedstats for this thread from procfs.
            local_service->UpdateSchedStats();

            ATRACE_NAME("BenchmarkService::Dispatch");
            while (!done) {
              auto ret = local_service->ReceiveAndDispatch();
              if (!ret) {
                if (ret.error() != ESHUTDOWN) {
                  std::cerr << "Error while dispatching message on thread "
                            << thread_id << " service " << service_id << ": "
                            << ret.GetErrorMessage() << std::endl;
                } else {
                  std::cerr << "Quitting thread " << thread_id << " service "
                            << service_id << std::endl;
                }
                done = true;
                return;
              }
            }
          },
          thread_id_counter++, service_id_counter, service);
    }

    service_id_counter++;
  }

  // Wait for the dispatch threads to exit.
  for (auto& thread : dispatch_threads) {
    thread.join();
  }

  return 0;
}

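// Runs the client side of the benchmark: spawns one client thread per
// service instance per --threads, synchronizes the threads' start with
// promise/future barriers, runs --count transactions of the selected op on
// each thread while recording per-iteration latency samples, then aggregates
// throughput, latency statistics, and scheduler stats from the results.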
int ClientCommand(const std::string& path) {
  // Start the requested number of client threads.
  std::vector<std::thread> client_threads;
  std::vector<std::future<BenchmarkResult>> client_results;
  int service_count = ProgramOptions.instances;
  int thread_id_counter = 0;
  int service_id_counter = 0;

  // Aggregate statistics, updated when worker threads exit.
  std::atomic<uint64_t> total_bytes(0);
  std::atomic<uint64_t> total_time_ns(0);

  // Samples for variance calculation.
  std::vector<uint64_t> latency_samples_ns(
      ProgramOptions.instances * ProgramOptions.threads * ProgramOptions.count);
  const size_t samples_per_thread = ProgramOptions.count;

  std::vector<uint8_t> send_buffer(ProgramOptions.blocksize);
  std::vector<uint8_t> receive_buffer(kMaxMessageSize);

  // Barriers for synchronizing thread start.
  std::vector<std::future<void>> ready_barrier_futures;
  std::promise<void> go_barrier_promise;
  std::future<void> go_barrier_future = go_barrier_promise.get_future();

  // Barrier for synchronizing thread tear down.
  std::promise<void> done_barrier_promise;
  std::future<void> done_barrier_future = done_barrier_promise.get_future();
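  // Start protocol: each worker signals its ready promise, the main thread
  // waits on every ready future, then sets go_barrier_promise so all workers
  // begin timing together. The done barrier keeps workers alive until the
  // main thread has read their result futures.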

  while (service_count--) {
    int thread_count = ProgramOptions.threads;
    while (thread_count--) {
      std::cerr << "Starting client thread " << thread_id_counter << " service "
                << service_id_counter << std::endl;

      std::promise<BenchmarkResult> result_promise;
      client_results.push_back(result_promise.get_future());

      std::promise<void> ready_barrier_promise;
      ready_barrier_futures.push_back(ready_barrier_promise.get_future());

      client_threads.emplace_back(
          [&](const int thread_id, const int service_id,
              std::promise<BenchmarkResult> result, std::promise<void> ready) {
            SetThreadName("client" + std::to_string(thread_id) + "/" +
                          std::to_string(service_id));

            ATRACE_NAME("BenchmarkClient::Dispatch");

            auto client =
                BenchmarkClient::Create(GetServicePath(path, service_id));
            if (!client) {
              std::cerr << "Failed to create client for service " << service_id
                        << std::endl;
              return -ENOMEM;
            }

            uint64_t* thread_samples =
                &latency_samples_ns[samples_per_thread * thread_id];

            // Per-thread statistics.
            uint64_t bytes_sent = 0;
            uint64_t time_start_ns;
            uint64_t time_end_ns;
            SchedStats sched_stats;

            // Signal ready and wait for go.
            ready.set_value();
            go_barrier_future.wait();

            // Warmup the scheduler.
            int warmup = ProgramOptions.warmup;
            while (warmup--) {
              for (int i = 0; i < 1000000; i++)
                ;
            }
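            // Note: this empty busy loop has no observable side effects, so an
            // optimizing compiler is free to elide it; if the warmup phase
            // matters for a measurement, verify in the generated code that the
            // loop survives or make the loop body perform visible work.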

            sched_stats.Update();
            time_start_ns = GetClockNs();

            int count = ProgramOptions.count;
            while (count--) {
              uint64_t iteration_start_ns = GetClockNs();

              switch (ProgramOptions.opcode) {
                case BenchmarkOps::Nop: {
                  const int ret = client->Nop();
                  if (ret < 0) {
                    std::cerr << "Failed to send nop: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else {
                    VLOG(1) << "Success";
                  }
                  break;
                }

                case BenchmarkOps::Read: {
                  const int ret = client->Read(receive_buffer.data(),
                                               ProgramOptions.blocksize);
                  if (ret < 0) {
                    std::cerr << "Failed to read: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else if (ret != ProgramOptions.blocksize) {
                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
                              << "; actual ret=" << ret << std::endl;
                    return -EINVAL;
                  } else {
                    VLOG(1) << "Success";
                    bytes_sent += ret;
                  }
                  break;
                }

                case BenchmarkOps::Write: {
                  const int ret =
                      client->Write(send_buffer.data(), send_buffer.size());
                  if (ret < 0) {
                    std::cerr << "Failed to write: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else if (ret != ProgramOptions.blocksize) {
                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
                              << "; actual ret=" << ret << std::endl;
                    return -EINVAL;
                  } else {
                    VLOG(1) << "Success";
                    bytes_sent += ret;
                  }
                  break;
                }

                case BenchmarkOps::Echo: {
                  const int ret = client->Echo(
                      send_buffer.data(), send_buffer.size(),
                      receive_buffer.data(), receive_buffer.size());
                  if (ret < 0) {
                    std::cerr << "Failed to echo: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else if (ret != ProgramOptions.blocksize) {
                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
                              << "; actual ret=" << ret << std::endl;
                    return -EINVAL;
                  } else {
                    VLOG(1) << "Success";
                    bytes_sent += ret * 2;
                  }
                  break;
                }

                case BenchmarkOps::Stats: {
                  std::tuple<uint64_t, uint64_t, SchedStats> stats;
                  const int ret = client->Stats(&stats);
                  if (ret < 0) {
                    std::cerr << "Failed to get stats: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else {
                    VLOG(1) << "Success";
                    std::cerr
                        << "Round trip: receive_time_ns=" << std::get<0>(stats)
                        << " reply_time_ns=" << std::get<1>(stats)
                        << " cpu_time_s=" << std::get<2>(stats).cpu_time_s()
                        << " wait_s=" << std::get<2>(stats).wait_s()
                        << std::endl;
                  }
                  break;
                }

                case BenchmarkOps::WriteVector: {
                  const int ret = client->WriteVector(
                      WrapBuffer(send_buffer.data(), ProgramOptions.blocksize));
                  if (ret < 0) {
                    std::cerr << "Failed to write vector: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else {
                    VLOG(1) << "Success";
                    bytes_sent += ret;
                  }
                  break;
                }

                case BenchmarkOps::EchoVector: {
                  thread_local BufferWrapper<std::vector<
                      uint8_t, DefaultInitializationAllocator<uint8_t>>>
                      response_buffer;
                  const int ret = client->EchoVector(
                      WrapBuffer(send_buffer.data(), ProgramOptions.blocksize),
                      &response_buffer);
                  if (ret < 0) {
                    std::cerr << "Failed to echo vector: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else {
                    VLOG(1) << "Success";
                    bytes_sent += send_buffer.size() + response_buffer.size();
                  }
                  break;
                }

                case BenchmarkOps::Quit: {
                  const int ret = client->Quit();
                  if (ret < 0 && ret != -ESHUTDOWN) {
                    std::cerr << "Failed to send quit: " << strerror(-ret)
                              << std::endl;
                    return ret;
                  } else {
                    VLOG(1) << "Success";
                  }
                  break;
                }

                default:
                  std::cerr
                      << "Invalid client operation: " << ProgramOptions.opcode
                      << std::endl;
                  return -EINVAL;
              }

              uint64_t iteration_end_ns = GetClockNs();
              uint64_t iteration_delta_ns =
                  iteration_end_ns - iteration_start_ns;
              thread_samples[count] = iteration_delta_ns;

              if (iteration_delta_ns > (kNanosPerSecond / 100)) {
                SchedStats stats = sched_stats;
                stats.Update();
                std::cerr << "Thread " << thread_id << " iteration_delta_s="
                          << (static_cast<double>(iteration_delta_ns) /
                              kNanosPerSecond)
                          << " " << stats.cpu_time_s() << " " << stats.wait_s()
                          << std::endl;
              }
            }

            time_end_ns = GetClockNs();
            sched_stats.Update();

            const double time_delta_s =
                static_cast<double>(time_end_ns - time_start_ns) /
                kNanosPerSecond;

            total_bytes += bytes_sent;
            total_time_ns += time_end_ns - time_start_ns;

            result.set_value(
                {thread_id, service_id, time_delta_s, bytes_sent, sched_stats});
            done_barrier_future.wait();

            return 0;
          },
          thread_id_counter++, service_id_counter, std::move(result_promise),
          std::move(ready_barrier_promise));
    }

    service_id_counter++;
  }

  // Wait for workers to be ready.
  std::cerr << "Waiting for workers to be ready..." << std::endl;
  for (auto& ready : ready_barrier_futures)
    ready.wait();

  // Signal workers to go.
  std::cerr << "Kicking off benchmark." << std::endl;
  go_barrier_promise.set_value();

  // Wait for all the worker threads to finish.
  for (auto& result : client_results)
    result.wait();

  // Report worker thread results.
  for (auto& result : client_results) {
    BenchmarkResult benchmark_result = result.get();
    std::cerr << std::fixed << "Thread " << benchmark_result.thread_id
              << " service " << benchmark_result.service_id << ":" << std::endl;
    std::cerr << "\t " << benchmark_result.bytes_sent << " bytes in "
              << benchmark_result.time_delta_s << " seconds ("
              << std::setprecision(0) << (benchmark_result.bytes_sent / 1024.0 /
                                          benchmark_result.time_delta_s)
              << " K/s; " << std::setprecision(3)
              << (ProgramOptions.count / benchmark_result.time_delta_s)
              << " txn/s; " << std::setprecision(9)
              << (benchmark_result.time_delta_s / ProgramOptions.count)
              << " s/txn)" << std::endl;
    std::cerr << "\tStats: " << benchmark_result.sched_stats.cpu_time_s() << " "
              << (benchmark_result.sched_stats.cpu_time_s() /
                  ProgramOptions.count)
              << " " << benchmark_result.sched_stats.wait_s() << " "
              << (benchmark_result.sched_stats.wait_s() / ProgramOptions.count)
              << " " << benchmark_result.sched_stats.timeslices() << std::endl;
  }

  // Signal worker threads to exit.
  done_barrier_promise.set_value();

  // Wait for the worker threads to exit.
  for (auto& thread : client_threads) {
    thread.join();
  }

  // Report aggregate results.
  const int total_threads = ProgramOptions.threads * ProgramOptions.instances;
  const int iterations = ProgramOptions.count;
  const double total_time_s =
      static_cast<double>(total_time_ns) / kNanosPerSecond;
  // This approximates the wall time it took to completely transfer all the
  // payloads.
  const double average_time_s = total_time_s / total_threads;

  const uint64_t min_sample_time_ns =
      *std::min_element(latency_samples_ns.begin(), latency_samples_ns.end());
  const double min_sample_time_s =
      static_cast<double>(min_sample_time_ns) / kNanosPerSecond;

  const uint64_t max_sample_time_ns =
      *std::max_element(latency_samples_ns.begin(), latency_samples_ns.end());
  const double max_sample_time_s =
      static_cast<double>(max_sample_time_ns) / kNanosPerSecond;

  const double total_sample_time_s =
      std::accumulate(latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
                      [](double s, uint64_t ns) {
                        return s + static_cast<double>(ns) / kNanosPerSecond;
                      });
  const double average_sample_time_s =
      total_sample_time_s / latency_samples_ns.size();

  const double sum_of_squared_deviations = std::accumulate(
      latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
      [&](double s, uint64_t ns) {
        const double delta =
            static_cast<double>(ns) / kNanosPerSecond - average_sample_time_s;
        return s + delta * delta;
      });
  const double variance = sum_of_squared_deviations / latency_samples_ns.size();
  const double standard_deviation = std::sqrt(variance);

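  // Percentiles are approximated with a fixed histogram: samples are binned
  // into 200 equal-width buckets spanning [min, max], and each percentile is
  // reported as the midpoint of the bucket where the cumulative count crosses
  // the percentile rank, so the resolution is (max - min) / 200.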
  const int num_buckets = 200;
  const uint64_t sample_range_ns = max_sample_time_ns - min_sample_time_ns;
  const uint64_t ns_per_bucket = sample_range_ns / num_buckets;
  std::array<uint64_t, num_buckets> sample_buckets = {{0}};

  // Count samples in each bucket range.
  for (uint64_t sample_ns : latency_samples_ns) {
    sample_buckets[(sample_ns - min_sample_time_ns) / (ns_per_bucket + 1)] += 1;
  }

  // Calculate population percentiles.
  const uint64_t percent_50 =
      static_cast<uint64_t>(latency_samples_ns.size() * 0.5);
  const uint64_t percent_90 =
      static_cast<uint64_t>(latency_samples_ns.size() * 0.9);
  const uint64_t percent_95 =
      static_cast<uint64_t>(latency_samples_ns.size() * 0.95);
  const uint64_t percent_99 =
      static_cast<uint64_t>(latency_samples_ns.size() * 0.99);

  uint64_t sample_count = 0;
  double latency_50th_percentile_s = 0.0, latency_90th_percentile_s = 0.0,
      latency_95th_percentile_s = 0.0, latency_99th_percentile_s = 0.0;
  for (int i = 0; i < num_buckets; i++) {
    // Report the midpoint of the bucket range as the value of the
    // corresponding percentile.
    const double bucket_midpoint_time_s =
        (ns_per_bucket * i + 0.5 * ns_per_bucket + min_sample_time_ns) /
        kNanosPerSecond;
    if (sample_count < percent_50 &&
        (sample_count + sample_buckets[i]) >= percent_50) {
      latency_50th_percentile_s = bucket_midpoint_time_s;
    }
    if (sample_count < percent_90 &&
        (sample_count + sample_buckets[i]) >= percent_90) {
      latency_90th_percentile_s = bucket_midpoint_time_s;
    }
    if (sample_count < percent_95 &&
        (sample_count + sample_buckets[i]) >= percent_95) {
      latency_95th_percentile_s = bucket_midpoint_time_s;
    }
    if (sample_count < percent_99 &&
        (sample_count + sample_buckets[i]) >= percent_99) {
      latency_99th_percentile_s = bucket_midpoint_time_s;
    }
    sample_count += sample_buckets[i];
  }

  std::cerr << std::fixed << "Total throughput over " << total_threads
            << " threads:\n\t " << total_bytes << " bytes in " << average_time_s
            << " seconds (" << std::setprecision(0)
            << (total_bytes / 1024.0 / average_time_s) << " K/s; "
            << std::setprecision(3)
            << (iterations * total_threads / average_time_s)
            << std::setprecision(9) << " txn/s; "
            << (average_time_s / (iterations * total_threads)) << " s/txn)"
            << std::endl;
  std::cerr << "Sample statistics: " << std::endl;
  std::cerr << total_sample_time_s << " s total sample time" << std::endl;
  std::cerr << average_sample_time_s << " s avg" << std::endl;
  std::cerr << standard_deviation << " s std dev" << std::endl;
  std::cerr << min_sample_time_s << " s min" << std::endl;
  std::cerr << max_sample_time_s << " s max" << std::endl;
  std::cerr << "Latency percentiles:" << std::endl;
  std::cerr << "50th: " << latency_50th_percentile_s << " s" << std::endl;
  std::cerr << "90th: " << latency_90th_percentile_s << " s" << std::endl;
  std::cerr << "95th: " << latency_95th_percentile_s << " s" << std::endl;
  std::cerr << "99th: " << latency_99th_percentile_s << " s" << std::endl;

  std::cout << total_time_ns << " " << std::fixed << std::setprecision(9)
            << average_sample_time_s << " " << std::fixed
            << std::setprecision(9) << standard_deviation << std::endl;
  return 0;
}

int Usage(const std::string& command_name) {
  // clang-format off
  std::cout << "Usage: " << command_name << " [options]" << std::endl;
  std::cout << "\t--verbose                   : Use verbose messages." << std::endl;
  std::cout << "\t--service <endpoint path>   : Start service at the given path." << std::endl;
  std::cout << "\t--client <endpoint path>    : Start client to the given path." << std::endl;
  std::cout << "\t--op <read | write | echo>  : Specify client operation mode." << std::endl;
  std::cout << "\t--bs <block size bytes>     : Specify block size to use." << std::endl;
  std::cout << "\t--count <count>             : Specify number of transactions to make." << std::endl;
  std::cout << "\t--instances <count>         : Specify number of service instances." << std::endl;
  std::cout << "\t--threads <count>           : Specify number of threads per instance." << std::endl;
  std::cout << "\t--timeout <timeout ms | -1> : Timeout to wait for services." << std::endl;
  std::cout << "\t--trace                     : Enable systrace logging." << std::endl;
  std::cout << "\t--warmup <iterations>       : Busy loops before running benchmarks." << std::endl;
  // clang-format on
  return -1;
}
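// Example invocations (a sketch; assumes the built binary is invoked as
// pdx_benchmarks and that <endpoint path> is a path the default transport can
// bind and connect to on the device):
//   pdx_benchmarks --service benchmark_endpoint --instances 1 --threads 2
//   pdx_benchmarks --client benchmark_endpoint --op echo --bs 4096 --count 10000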

}  // anonymous namespace

int main(int argc, char** argv) {
  logging::LoggingSettings logging_settings;
  logging_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
  logging::InitLogging(logging_settings);

  int getopt_code;
  int option_index;
  std::string option = "";
  std::string command = "";
  std::string command_argument = "";
  bool tracing_enabled = false;

  // Process command line options.
  while ((getopt_code =
              getopt_long(argc, argv, "", long_options, &option_index)) != -1) {
    option = long_options[option_index].name;
    VLOG(1) << "option=" << option;
    switch (getopt_code) {
      case 0:
        if (option == kOptionVerbose) {
          ProgramOptions.verbose = true;
          logging::SetMinLogLevel(-1);
        } else if (option == kOptionOpcode) {
          ParseOpcodeOption(optarg);
        } else if (option == kOptionBlocksize) {
          ProgramOptions.blocksize = std::stoi(optarg);
          if (ProgramOptions.blocksize < 0) {
            std::cerr << "Invalid blocksize argument: "
                      << ProgramOptions.blocksize << std::endl;
            return -EINVAL;
          }
        } else if (option == kOptionCount) {
          ProgramOptions.count = std::stoi(optarg);
          if (ProgramOptions.count < 1) {
            std::cerr << "Invalid count argument: " << ProgramOptions.count
                      << std::endl;
            return -EINVAL;
          }
        } else if (option == kOptionThreads) {
          ProgramOptions.threads = std::stoi(optarg);
          if (ProgramOptions.threads < 1) {
            std::cerr << "Invalid threads argument: " << ProgramOptions.threads
                      << std::endl;
            return -EINVAL;
          }
        } else if (option == kOptionInstances) {
          ProgramOptions.instances = std::stoi(optarg);
          if (ProgramOptions.instances < 1) {
            std::cerr << "Invalid instances argument: "
                      << ProgramOptions.instances << std::endl;
            return -EINVAL;
          }
        } else if (option == kOptionTimeout) {
          ProgramOptions.timeout = std::stoi(optarg);
        } else if (option == kOptionTrace) {
          tracing_enabled = true;
        } else if (option == kOptionWarmup) {
          ProgramOptions.warmup = std::stoi(optarg);
        } else {
          command = option;
          if (optarg)
            command_argument = optarg;
        }
        break;
    }
  }

  // Setup ATRACE/systrace based on command line.
  atrace_setup();
  atrace_set_tracing_enabled(tracing_enabled);

  VLOG(1) << "command=" << command << " command_argument=" << command_argument;

  if (command == "") {
    return Usage(argv[0]);
  } else if (command == kOptionService) {
    return ServiceCommand(command_argument);
  } else if (command == kOptionClient) {
    return ClientCommand(command_argument);
  } else {
    return Usage(argv[0]);
  }
}