/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/tracing/core/startup_trace_writer.h"

#include <numeric>

#include "perfetto/base/logging.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/trace/trace_packet.pbzero.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"

using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;

namespace perfetto {

namespace {

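// Requests a new chunk from |arbiter| with a header identifying |writer_id|
// and |chunk_id|. If |fragmenting_packet| is true, the new chunk continues a
// packet that was fragmented at the end of the previous chunk, so its first
// packet is counted upfront and marked with
// kFirstPacketContinuesFromPrevChunk.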
SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter,
                                WriterID writer_id,
                                ChunkID chunk_id,
                                bool fragmenting_packet) {
  ChunkHeader::Packets packets = {};
  if (fragmenting_packet) {
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy
  // it into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(writer_id, std::memory_order_relaxed);
  header.chunk_id.store(chunk_id, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  return arbiter->GetNewChunk(header);
}

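// Reads the used ranges of a ScatteredHeapBuffer's slices sequentially so
// that their contents can be copied into SMB chunk payloads. A minimal usage
// sketch (illustrative only; |buffer|, |chunk|, and |kNumBytes| are
// hypothetical):
//
//   LocalBufferReader reader(&buffer);
//   size_t n = reader.ReadBytes(&chunk, kNumBytes, /*cur_payload_size=*/0);
//   PERFETTO_DCHECK(n == kNumBytes);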
class LocalBufferReader {
 public:
  explicit LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
      : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}

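  // Copies |num_bytes| from the local buffer into |target_chunk|'s payload,
  // starting at offset |cur_payload_size|. Returns the number of bytes
  // copied, which may be smaller than |num_bytes| if the local buffer runs
  // out of data.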
  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
                   size_t num_bytes,
                   size_t cur_payload_size) {
    PERFETTO_CHECK(target_chunk->payload_size() >=
                   num_bytes + cur_payload_size);
    uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
    size_t bytes_read = 0;
    while (bytes_read < num_bytes) {
      if (cur_slice_ == buffer_slices_.end())
        return bytes_read;

      auto cur_slice_range = cur_slice_->GetUsedRange();

      // Move on to the next slice if the current one is exhausted.
      if (cur_slice_range.size() == cur_slice_offset_) {
        cur_slice_offset_ = 0;
        cur_slice_++;
        continue;
      }

      size_t read_size = std::min(num_bytes - bytes_read,
                                  cur_slice_range.size() - cur_slice_offset_);
      memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
             read_size);
      cur_slice_offset_ += read_size;
      bytes_read += read_size;

      // Should have either read all of the current slice or completed reading
      // now.
      PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() ||
                      bytes_read == num_bytes);
    }
    return bytes_read;
  }

  size_t TotalUsedSize() const {
    size_t used_size = 0;
    for (const auto& slice : buffer_slices_) {
      used_size += slice.GetUsedRange().size();
    }
    return used_size;
  }

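  // Returns true if every used byte of the local buffer has been consumed,
  // i.e. the reader is past the last slice or positioned at the end of the
  // last slice with no unread data remaining.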
  bool DidReadAllData() const {
    if (cur_slice_ == buffer_slices_.end())
      return true;

    const auto next_slice = cur_slice_ + 1;
    return next_slice == buffer_slices_.end() &&
           cur_slice_->GetUsedRange().size() == cur_slice_offset_;
  }

 private:
  const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;

  // Iterator pointing to the slice in |buffer_slices_| that we're currently
  // reading from.
  std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator
      cur_slice_;
  // Read offset into the current slice, in bytes.
  size_t cur_slice_offset_ = 0;
};

}  // namespace

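// Unbound writer: trace packets are accumulated in a local heap buffer until
// BindToArbiter() succeeds, at which point the buffered data is committed to
// the shared memory buffer and all further writes are proxied to a real
// TraceWriter.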
StartupTraceWriter::StartupTraceWriter(
    std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
    : registry_handle_(std::move(registry_handle)),
      memory_buffer_(new protozero::ScatteredHeapBuffer()),
      memory_stream_writer_(
          new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
  memory_buffer_->set_writer(memory_stream_writer_.get());
  PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_);
}

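// Writer that is bound at construction time: all calls are proxied directly
// to |trace_writer| and the local buffer is never used.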
StartupTraceWriter::StartupTraceWriter(
    std::unique_ptr<TraceWriter> trace_writer)
    : was_bound_(true), trace_writer_(std::move(trace_writer)) {}

StartupTraceWriter::~StartupTraceWriter() {
  if (registry_handle_)
    registry_handle_->OnWriterDestroyed(this);
}

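// Binds this writer to |arbiter|, committing any locally buffered packets to
// |target_buffer|. Returns false without binding if the writer thread
// currently has a write in progress, in which case the caller may retry
// later.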
bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
                                       BufferID target_buffer) {
  // Create (and, if binding fails, destroy) the trace writer without holding
  // the lock, since creation posts a task and task posting may itself trigger
  // a trace event, which would cause a deadlock. This may create a few more
  // trace writers than necessary when a concurrent write is in progress
  // (other than causing some computational overhead, this is not
  // problematic).
  auto trace_writer = arbiter->CreateTraceWriter(target_buffer);

  {
    std::lock_guard<std::mutex> lock(lock_);

    PERFETTO_DCHECK(!trace_writer_);

    // Can't bind while the writer thread is writing.
    if (write_in_progress_)
      return false;

    // If there's a pending trace packet, it should have been completed by the
    // writer thread before |write_in_progress_| was reset.
    if (cur_packet_) {
      PERFETTO_DCHECK(cur_packet_->is_finalized());
      cur_packet_.reset();
    }

    trace_writer_ = std::move(trace_writer);
    ChunkID next_chunk_id = CommitLocalBufferChunks(
        arbiter, trace_writer_->writer_id(), target_buffer);

    // The real TraceWriter should start writing at the subsequent chunk ID.
    bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
    PERFETTO_DCHECK(success);

    memory_stream_writer_.reset();
    memory_buffer_.reset();
  }

  return true;
}

TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);

  // Check if we are already bound without grabbing the lock. This is an
  // optimization to avoid any locking in the common case where the proxy was
  // bound some time ago.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(!cur_packet_);
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->NewTracePacket();
  }

  // Now grab the lock and safely check whether we are still unbound.
  {
    std::unique_lock<std::mutex> lock(lock_);
    if (trace_writer_) {
      PERFETTO_DCHECK(!cur_packet_);
      // Set the |was_bound_| flag to avoid locking in future calls to
      // NewTracePacket().
      was_bound_ = true;
      // Don't hold the lock while calling NewTracePacket() on |trace_writer_|.
      // This is safe because |trace_writer_| remains valid once set. It also
      // avoids deadlocks that may be caused by holding the lock while waiting
      // for a new SMB chunk in |trace_writer_|.
      lock.unlock();
      return trace_writer_->NewTracePacket();
    }
    // Not bound. Make sure it stays this way until the TracePacketHandle goes
    // out of scope by setting |write_in_progress_|.
    PERFETTO_DCHECK(!write_in_progress_);
    write_in_progress_ = true;
  }

  // Write to the local buffer.
  if (cur_packet_) {
    // If we hit this DCHECK, the caller called NewTracePacket() without
    // finalizing the previous packet first.
    PERFETTO_DCHECK(cur_packet_->is_finalized());
  } else {
    cur_packet_.reset(new protos::pbzero::TracePacket());
  }
  cur_packet_->Reset(memory_stream_writer_.get());
  TraceWriter::TracePacketHandle handle(cur_packet_.get());
  // |this| outlives the packet handle.
  handle.set_finalization_listener(this);
  return handle;
}

void StartupTraceWriter::Flush(std::function<void()> callback) {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // It's fine to check |was_bound_| instead of acquiring the lock because
  // |trace_writer_| only needs flushing after the first trace packet has been
  // written to it, by which point |was_bound_| is already set.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->Flush(std::move(callback));
  }

  // Can't flush while unbound; just run the callback immediately.
  if (callback)
    callback();
}

WriterID StartupTraceWriter::writer_id() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // This is a const method, so we can't acquire the lock. Instead, we only
  // proxy to |trace_writer_| once the first packet has been written to it and
  // |was_bound_| is set.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->writer_id();
  }
  return 0;
}

uint64_t StartupTraceWriter::written() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // This is a const method, so we can't acquire the lock. Instead, we only
  // proxy to |trace_writer_| once the first packet has been written to it and
  // |was_bound_| is set.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->written();
  }
  return 0;
}

size_t StartupTraceWriter::used_buffer_size() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  if (PERFETTO_LIKELY(was_bound_))
    return 0;

  std::lock_guard<std::mutex> lock(lock_);
  if (trace_writer_)
    return 0;

  size_t used_size = 0;
  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  for (const auto& slice : memory_buffer_->slices()) {
    used_size += slice.GetUsedRange().size();
  }
  return used_size;
}

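// Invoked on the writer thread when the current packet is finalized, e.g.
// when its TracePacketHandle goes out of scope.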
void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) {
  PERFETTO_DCHECK(cur_packet_.get() == message);
  PERFETTO_DCHECK(cur_packet_->is_finalized());
  // The packet is already finalized, so Finalize() is a no-op here apart from
  // returning the packet's size.
  uint32_t packet_size = cur_packet_->Finalize();
  packet_sizes_.push_back(packet_size);

  // The write is complete; reset the flag to allow binding.
  std::lock_guard<std::mutex> lock(lock_);
  PERFETTO_DCHECK(write_in_progress_);
  write_in_progress_ = false;
}

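// Copies the locally buffered packets into newly acquired SMB chunks and
// commits them to |target_buffer|. Packets that don't fit into the remaining
// payload of a chunk are split into fragments across chunks, each fragment
// prefixed by its own size header. A worked example (chunk payload size
// hypothetical): with a 100-byte payload and 4-byte fragment headers, a
// 150-byte packet becomes a 96-byte fragment in one chunk (flagged
// kLastPacketContinuesOnNextChunk) followed by a 54-byte fragment in the
// next chunk (flagged kFirstPacketContinuesFromPrevChunk). Returns the
// ChunkID at which the bound TraceWriter should continue writing.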
ChunkID StartupTraceWriter::CommitLocalBufferChunks(
    SharedMemoryArbiterImpl* arbiter,
    WriterID writer_id,
    BufferID target_buffer) {
  // TODO(eseckler): Write and commit these chunks asynchronously. This would
  // require that the service is informed of the missing initial chunks, e.g.
  // by committing our first chunk here before the new trace writer has a
  // chance to commit its first chunk. Otherwise the service wouldn't know to
  // wait for our chunks.

  if (packet_sizes_.empty() || !writer_id)
    return 0;

  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  LocalBufferReader local_buffer_reader(memory_buffer_.get());

  PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
                  std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
                                  static_cast<size_t>(0u)));

  ChunkID next_chunk_id = 0;
  SharedMemoryABI::Chunk cur_chunk =
      NewChunk(arbiter, writer_id, next_chunk_id++, false);

  size_t max_payload_size = cur_chunk.payload_size();
  size_t cur_payload_size = 0;
  uint16_t cur_num_packets = 0;
  size_t total_num_packets = packet_sizes_.size();
  PatchList empty_patch_list;
  for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) {
    uint32_t packet_size = packet_sizes_[packet_idx];
    uint32_t remaining_packet_size = packet_size;
    ++cur_num_packets;
    do {
      uint32_t fragment_size = static_cast<uint32_t>(
          std::min(static_cast<size_t>(remaining_packet_size),
                   max_payload_size - cur_payload_size -
                       SharedMemoryABI::kPacketHeaderSize));
      // Write the packet header, i.e. the fragment size, as a redundant
      // varint so that it always occupies kPacketHeaderSize bytes.
      protozero::proto_utils::WriteRedundantVarInt(
          fragment_size, cur_chunk.payload_begin() + cur_payload_size);
      cur_payload_size += SharedMemoryABI::kPacketHeaderSize;

      // Copy the fragment's content into the chunk.
      size_t bytes_read = local_buffer_reader.ReadBytes(
          &cur_chunk, fragment_size, cur_payload_size);
      PERFETTO_DCHECK(bytes_read == fragment_size);

      cur_payload_size += fragment_size;
      remaining_packet_size -= fragment_size;

      bool last_write =
          packet_idx == total_num_packets - 1 && remaining_packet_size == 0;

      // Return the current chunk if we've filled its payload (no room left
      // for another fragment header plus payload), reached the maximum number
      // of packets, or wrote everything we wanted to.
      bool return_chunk =
          cur_payload_size >=
              max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
          cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;

      if (return_chunk) {
        auto new_packet_count =
            cur_chunk.IncreasePacketCountTo(cur_num_packets);
        PERFETTO_DCHECK(new_packet_count == cur_num_packets);

        bool is_fragmenting = remaining_packet_size > 0;
        if (is_fragmenting) {
          PERFETTO_DCHECK(cur_payload_size == max_payload_size);
          cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
        }

        arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
                                      &empty_patch_list);

        // Avoid creating a new chunk after the last write.
        if (!last_write) {
          cur_chunk =
              NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting);
          max_payload_size = cur_chunk.payload_size();
          cur_payload_size = 0;
          cur_num_packets = is_fragmenting ? 1 : 0;
        } else {
          PERFETTO_DCHECK(!is_fragmenting);
        }
      }
    } while (remaining_packet_size > 0);
  }

  // The last chunk should have been returned.
  PERFETTO_DCHECK(!cur_chunk.is_valid());
  // We should have read all data from the local buffer.
  PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());

  return next_chunk_id;
}

}  // namespace perfetto