1 // std
2 #include <algorithm>
3 #include <cstdlib>
4 #include <iostream>
5 #include <mutex>
6 #include <queue>
7 #include <random>
8 #include <sstream>
9 #include <string>
10 #include <thread>
11 
12 // gnu-c
13 #include <sys/types.h>
14 #include <unistd.h>
15 
16 // usdt_sample_lib1
17 #include "usdt_sample_lib1/lib1.h"
18 
// Prints the command-line synopsis followed by one explanatory line per
// positional argument (including its default) to std::cout.
//
// argc: accepted for signature symmetry with main(); not consulted.
// argv: argv[0] is echoed as the program name in the synopsis line.
void print_usage(int argc, char** argv)
{
    static_cast<void>(argc);

    // Positional arguments as they appear after the program name.
    static const char* const kSynopsisArgs =
        " <InputPrefix> <InputMinimum (1-50)> <InputMaximum (1-50)> <CallsPerSec (1-50)> <MinimumLatencyMs (1-50)> <MaximumLatencyMs (1-50)>";

    // One help line per positional argument, in command-line order.
    static const char* const kArgumentHelp[] = {
        "InputPrefix: Prefix of the input string to the operation. Default: dummy",
        "InputMinimum: Minimum number to make the input string to the operation somewhat unique. Default: 1",
        "InputMaximum: Maximum number to make the input string to the operation somewhat unique. Default: 50",
        "CallsPerSec: Rate of calls to the operation. Default: 10",
        "MinimumLatencyMs: Minimum latency to apply to the operation. Default: 20",
        "MaximumLatencyMs: Maximum latency to apply to the operation. Default: 40",
    };

    std::cout << "Usage:" << std::endl;
    std::cout << argv[0] << kSynopsisArgs << std::endl;

    for (const char* helpLine : kArgumentHelp) {
        std::cout << helpLine << std::endl;
    }
}
32 
main(int argc,char ** argv)33 int main(int argc, char** argv)
34 {
35     std::string inputPrefix("dummy");
36     std::uint32_t inputMinimum = 1;
37     std::uint32_t inputMaximum = 50;
38     std::uint32_t callsPerSec = 10;
39     std::uint32_t minLatMs = 20;
40     std::uint32_t maxLatMs = 40;
41 
42     try {
43         if (argc > 1) {
44             inputPrefix = argv[1];
45         }
46 
47         if (argc > 2) {
48             inputMinimum = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[2]))));
49         }
50 
51         if (argc > 3) {
52             inputMaximum = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[3]))));
53         }
54 
55         if (argc > 4) {
56             callsPerSec = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[4]))));
57         }
58 
59         if (argc > 5) {
60             minLatMs = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[5]))));
61         }
62 
63         if (argc > 6) {
64             maxLatMs = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[6]))));
65         }
66     }
67     catch (const std::exception& exc) {
68         std::cout << "Exception while reading arguments: " << exc.what() << std::endl;
69         print_usage(argc, argv);
70         return -1;
71     }
72     catch (...) {
73         std::cout << "Unknown exception while reading arguments." << std::endl;
74         print_usage(argc, argv);
75         return -1;
76     }
77 
78     if (inputMinimum > inputMaximum) {
79         std::cout << "InputMinimum must be smaller than InputMaximum." << std::endl;
80         print_usage(argc, argv);
81         return -1;
82     }
83 
84     if (minLatMs > maxLatMs) {
85         std::cout << "MinimumLatencyMs must be smaller than MaximumLatencyMs." << std::endl;
86         print_usage(argc, argv);
87         return -1;
88     }
89 
90     std::cout << "Applying the following parameters:" << std::endl
91               << "Input prefix: " << inputPrefix << "." << std::endl
92               << "Input range: [" << inputMinimum << ", " << inputMaximum << "]." << std::endl
93               << "Calls Per Second: " << callsPerSec << "." << std::endl
94               << "Latency range: [" << minLatMs << ", " << maxLatMs << "] ms." << std::endl;
95 
96     const int sleepTimeMs = 1000 / callsPerSec;
97     OperationProvider op(minLatMs, maxLatMs);
98 
99     std::mutex queueMutex;
100     std::queue<std::shared_future<OperationResponse>> responseQueue;
101 
102     auto dequeueFuture = std::async(std::launch::async, [&]() {
103         while (true) {
104             bool empty = false;
105             {
106                 std::lock_guard<std::mutex> lg(queueMutex);
107                 empty = responseQueue.empty();
108             }
109 
110             if (empty) {
111                 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
112                 continue;
113             }
114 
115             responseQueue.front().get();
116 
117             // std::cout << "Removing item from queue." << std::endl;
118             std::lock_guard<std::mutex> lg(queueMutex);
119             responseQueue.pop();
120         }
121     });
122 
123     std::random_device rd;
124     std::uniform_int_distribution<> dis(inputMinimum, inputMaximum);
125 
126     std::cout << "You can now run the bcc scripts, see usdt_sample.md for examples." << std::endl;
127     std::cout << "pid: " << ::getpid() << std::endl;
128     std::cout << "Press ctrl-c to exit." << std::endl;
129     while (true) {
130         std::ostringstream inputOss;
131         inputOss << inputPrefix << "_" << dis(rd);
132         auto responseFuture = op.executeAsync(OperationRequest(inputOss.str()));
133 
134         {
135             std::lock_guard<std::mutex> lg(queueMutex);
136             responseQueue.push(responseFuture);
137         }
138 
139         // For a sample application, this is good enough to simulate callsPerSec.
140         std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
141     }
142 
143     dequeueFuture.get();
144     return 0;
145 }
146