1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <math.h>
18 #include <stdint.h>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <chrono>
23 #include <list>
24 #include <random>
25 #include <thread>
26 
27 #include "perfetto/base/time.h"
28 #include "perfetto/ext/base/file_utils.h"
29 #include "perfetto/ext/base/string_utils.h"
30 #include "perfetto/tracing.h"
31 
32 #include "protos/perfetto/config/stress_test_config.gen.h"
33 #include "protos/perfetto/trace/test_event.pbzero.h"
34 
35 using StressTestConfig = perfetto::protos::gen::StressTestConfig;
36 
37 namespace perfetto {
38 namespace {
39 
40 StressTestConfig* g_cfg;
41 
42 class StressTestDataSource : public DataSource<StressTestDataSource> {
43  public:
44   constexpr static BufferExhaustedPolicy kBufferExhaustedPolicy =
45       BufferExhaustedPolicy::kStall;
46 
47   void OnSetup(const SetupArgs& args) override;
48   void OnStart(const StartArgs&) override;
49   void OnStop(const StopArgs&) override;
50 
51  private:
52   class Worker {
53    public:
Worker(uint32_t id)54     explicit Worker(uint32_t id) : id_(id) {}
55     void Start();
56     void Stop();
~Worker()57     ~Worker() { Stop(); }
58 
59    private:
60     void WorkerMain(uint32_t worker_id);
61     void FillPayload(const StressTestConfig::WriterTiming&,
62                      uint32_t seq,
63                      uint32_t nesting,
64                      protos::pbzero::TestEvent::TestPayload*);
65 
66     const uint32_t id_;
67     std::thread thread_;
68     std::atomic<bool> quit_;
69     std::minstd_rand0 rnd_seq_;
70 
71     // Use a different engine for the generation of random value, keep rnd_seq_
72     // dedicated to generating deterministic sequences.
73     std::minstd_rand0 rnd_gen_;
74   };
75 
76   std::list<Worker> workers_;
77 };
78 
79 // Called before the tracing session starts.
OnSetup(const SetupArgs &)80 void StressTestDataSource::OnSetup(const SetupArgs&) {
81   for (uint32_t i = 0; i < std::max(g_cfg->num_threads(), 1u); ++i)
82     workers_.emplace_back(i);
83 }
84 
85 // Called when the tracing session starts.
OnStart(const StartArgs &)86 void StressTestDataSource::OnStart(const StartArgs&) {
87   for (auto& worker : workers_)
88     worker.Start();
89 }
90 
91 // Called when the tracing session ends.
OnStop(const StopArgs &)92 void StressTestDataSource::OnStop(const StopArgs&) {
93   for (auto& worker : workers_)
94     worker.Stop();
95   workers_.clear();
96 }
97 
Start()98 void StressTestDataSource::Worker::Start() {
99   quit_.store(false);
100   thread_ = std::thread(&StressTestDataSource::Worker::WorkerMain, this, id_);
101 }
102 
Stop()103 void StressTestDataSource::Worker::Stop() {
104   if (!thread_.joinable() || quit_)
105     return;
106   PERFETTO_DLOG("Stopping worker %u", id_);
107   quit_.store(true);
108   thread_.join();
109 }
110 
WorkerMain(uint32_t worker_id)111 void StressTestDataSource::Worker::WorkerMain(uint32_t worker_id) {
112   PERFETTO_DLOG("Worker %u starting", worker_id);
113   rnd_seq_ = std::minstd_rand0(0);
114   int64_t t_start = base::GetBootTimeNs().count();
115   int64_t num_msgs = 0;
116 
117   const int64_t max_msgs = g_cfg->max_events()
118                                ? static_cast<int64_t>(g_cfg->max_events())
119                                : INT64_MAX;
120   bool is_last = false;
121   while (!is_last) {
122     is_last = quit_ || ++num_msgs >= max_msgs;
123 
124     const int64_t now = base::GetBootTimeNs().count();
125     const auto elapsed_ms = static_cast<uint64_t>((now - t_start) / 1000000);
126 
127     const auto* timings = &g_cfg->steady_state_timings();
128     if (g_cfg->burst_period_ms() &&
129         elapsed_ms % g_cfg->burst_period_ms() >
130             (g_cfg->burst_period_ms() - g_cfg->burst_duration_ms())) {
131       timings = &g_cfg->burst_timings();
132     }
133     std::normal_distribution<> rate_dist{timings->rate_mean(),
134                                          timings->rate_stddev()};
135 
136     double period_ns = 1e9 / rate_dist(rnd_gen_);
137     period_ns = isnan(period_ns) || period_ns == 0.0 ? 1 : period_ns;
138     double expected_msgs = static_cast<double>(now - t_start) / period_ns;
139     int64_t delay_ns = 0;
140     if (static_cast<int64_t>(expected_msgs) < num_msgs)
141       delay_ns = static_cast<int64_t>(period_ns);
142     std::this_thread::sleep_for(
143         std::chrono::nanoseconds(static_cast<int64_t>(delay_ns)));
144 
145     StressTestDataSource::Trace([&](StressTestDataSource::TraceContext ctx) {
146       const uint32_t seq = static_cast<uint32_t>(rnd_seq_());
147       auto packet = ctx.NewTracePacket();
148       packet->set_timestamp(static_cast<uint64_t>(now));
149       auto* test_event = packet->set_for_testing();
150       test_event->set_seq_value(seq);
151       test_event->set_counter(static_cast<uint64_t>(num_msgs));
152       if (is_last)
153         test_event->set_is_last(true);
154 
155       FillPayload(*timings, seq, g_cfg->nesting(), test_event->set_payload());
156     });  // Trace().
157 
158   }  // while (!quit)
159   PERFETTO_DLOG("Worker done");
160 }
161 
FillPayload(const StressTestConfig::WriterTiming & timings,uint32_t seq,uint32_t nesting,protos::pbzero::TestEvent::TestPayload * payload)162 void StressTestDataSource::Worker::FillPayload(
163     const StressTestConfig::WriterTiming& timings,
164     uint32_t seq,
165     uint32_t nesting,
166     protos::pbzero::TestEvent::TestPayload* payload) {
167   // Write the payload in two halves, optionally with some delay in the
168   // middle.
169   std::normal_distribution<> msg_size_dist{timings.payload_mean(),
170                                            timings.payload_stddev()};
171   auto payload_size =
172       static_cast<uint32_t>(std::max(std::round(msg_size_dist(rnd_gen_)), 0.0));
173   std::string buf;
174   buf.resize(payload_size / 2);
175   for (size_t i = 0; i < buf.size(); ++i) {
176     buf[i] = static_cast<char>(33 + ((seq + i) % 64));  // Stay ASCII.
177   }
178   payload->add_str(buf);
179   payload->set_remaining_nesting_depth(nesting);
180   if (timings.payload_write_time_ms() > 0) {
181     std::this_thread::sleep_for(
182         std::chrono::milliseconds(timings.payload_write_time_ms()));
183   }
184 
185   if (nesting > 0)
186     FillPayload(timings, seq, nesting - 1, payload->add_nested());
187 
188   payload->add_str(buf);
189 }
190 }  // namespace
191 
// Declare and define the static members that the DataSource<> template needs
// for StressTestDataSource.
PERFETTO_DECLARE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource);
PERFETTO_DEFINE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource);
194 
195 }  // namespace perfetto
196 
main()197 int main() {
198   perfetto::TracingInitArgs args;
199   args.backends = perfetto::kSystemBackend;
200 
201   std::string config_blob;
202   if (isatty(fileno(stdin)))
203     PERFETTO_LOG("Reading StressTestConfig proto from stdin");
204   perfetto::base::ReadFileStream(stdin, &config_blob);
205 
206   StressTestConfig cfg;
207   perfetto::g_cfg = &cfg;
208   if (config_blob.empty() || !cfg.ParseFromString(config_blob))
209     PERFETTO_FATAL("A StressTestConfig blob must be passed into stdin");
210 
211   if (cfg.shmem_page_size_kb())
212     args.shmem_page_size_hint_kb = cfg.shmem_page_size_kb();
213   if (cfg.shmem_size_kb())
214     args.shmem_page_size_hint_kb = cfg.shmem_size_kb();
215 
216   perfetto::Tracing::Initialize(args);
217   perfetto::DataSourceDescriptor dsd;
218   dsd.set_name("perfetto.stress_test");
219   perfetto::StressTestDataSource::Register(dsd);
220 
221   for (;;) {
222     std::this_thread::sleep_for(std::chrono::seconds(30));
223   }
224 }
225